Streaming Support
Why Streaming?
When an LLM streams its response chunk by chunk, you shouldn't have to wait for the full text before storing memory. rememberStream() provides:
- Automatic buffering - No manual stream consumption
- Full feature parity - Embeddings, facts, graph sync all supported
- Type-safe - ReadableStream and AsyncIterable support
- Edge compatible - Works in Vercel Edge, Cloudflare Workers
Quick Start
// Vercel AI SDK (streamText)
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
import { Cortex } from '@cortexmemory/sdk';

const cortex = new Cortex({ convexUrl: process.env.CONVEX_URL! });
const response = await streamText({
model: openai('gpt-4'),
messages: [{ role: 'user', content: 'What is AI?' }],
});
const result = await cortex.memory.rememberStream({
memorySpaceId: 'user-space',
conversationId: 'conv-123',
userMessage: 'What is AI?',
responseStream: response.textStream,
userId: 'user-123',
userName: 'Alice',
});
console.log(result.fullResponse); // Complete response
console.log(result.memories); // Stored memories
// OpenAI SDK (stream: true)
import OpenAI from 'openai';
const openai = new OpenAI();
const stream = await openai.chat.completions.create({
model: 'gpt-4',
messages: [{ role: 'user', content: 'Hello!' }],
stream: true,
});
const result = await cortex.memory.rememberStream({
memorySpaceId: 'user-space',
conversationId: 'conv-456',
userMessage: 'Hello!',
responseStream: stream,
userId: 'user-123',
userName: 'Alice',
});
// Custom async generator
async function* generateResponse() {
yield 'Hello ';
await new Promise(resolve => setTimeout(resolve, 100));
yield 'World!';
}
const result = await cortex.memory.rememberStream({
memorySpaceId: 'user-space',
conversationId: 'conv-789',
userMessage: 'Say hello',
responseStream: generateResponse(),
userId: 'user-123',
userName: 'Alice',
});
console.log(result.fullResponse); // "Hello World!"
Why Not Manual Buffering?
// Manual stream consumption - tedious!
let fullResponse = '';
for await (const chunk of stream) {
fullResponse += chunk.choices[0]?.delta?.content || '';
}
// Separate storage step
await cortex.memory.remember({
memorySpaceId: 'agent-1',
userMessage: 'Hello!',
agentResponse: fullResponse,
userId: 'user-1',
userName: 'User',
});
// One call - automatic buffering + storage
const result = await cortex.memory.rememberStream({
memorySpaceId: 'agent-1',
conversationId: 'conv-123',
userMessage: 'Hello!',
responseStream: stream,
userId: 'user-1',
userName: 'User',
});
console.log('Stored:', result.fullResponse);
// All layers updated automatically
With Fact Extraction
// Automatic extraction with LLM configured
const cortex = new Cortex({
convexUrl: process.env.CONVEX_URL!,
llm: openai('gpt-4'), // Enables automatic fact extraction
});
const result = await cortex.memory.rememberStream({
memorySpaceId: 'user-space',
conversationId: 'conv-123',
userMessage: 'My favorite color is blue and I love pizza',
responseStream: stream,
userId: 'user-123',
userName: 'Alice',
// Facts automatically extracted from conversation + response
});
console.log(`Extracted ${result.facts.length} facts`);
result.facts.forEach(f => console.log(`- ${f.fact}`));
Custom Extraction
// Override automatic extraction
const result = await cortex.memory.rememberStream({
memorySpaceId: 'user-space',
conversationId: 'conv-123',
userMessage: 'I prefer TypeScript',
responseStream: stream,
userId: 'user-123',
userName: 'Alice',
extractFacts: async (userMsg, agentResp) => [{
fact: 'User prefers TypeScript',
factType: 'preference',
confidence: 95,
}],
});
With Embeddings
import { embed } from 'ai';
import { openai } from '@ai-sdk/openai';
const result = await cortex.memory.rememberStream({
memorySpaceId: 'user-space',
conversationId: 'conv-123',
userMessage: 'What is machine learning?',
responseStream: stream,
userId: 'user-123',
userName: 'Alice',
generateEmbedding: async (text) => {
const { embedding } = await embed({
model: openai.embedding('text-embedding-3-small'),
value: text,
});
return embedding;
},
});
Edge Runtime Support
// app/api/chat/route.ts
import { Cortex } from '@cortexmemory/sdk';
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';
export const runtime = 'edge';
const cortex = new Cortex({
convexUrl: process.env.CONVEX_URL!,
});
export async function POST(req: Request) {
const { message, userId } = await req.json();
const response = await streamText({
model: openai('gpt-4'),
messages: [{ role: 'user', content: message }],
});
// Store in background (non-blocking)
cortex.memory.rememberStream({
memorySpaceId: 'edge-chat',
conversationId: `conv-${userId}-${Date.now()}`,
userMessage: message,
responseStream: response.textStream,
userId,
userName: 'User',
}).catch(console.error);
// Return streaming response to client
return response.toDataStreamResponse();
}
Supported Stream Types
| Type | Source | Example |
|---|---|---|
| ReadableStream | Web Streams API | Vercel AI SDK, fetch() |
| AsyncIterable | JavaScript async generators | OpenAI SDK, LangChain |
| Custom generators | Your code | async function* |
All three work seamlessly with rememberStream().
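For example, a hand-built ReadableStream behaves the same as one produced by an SDK. The sketch below reuses the Quick Start setup; the conversation and user IDs are placeholders.
// A ReadableStream of text chunks built by hand - any Web Streams source works the same way
const manualStream = new ReadableStream<string>({
  start(controller) {
    controller.enqueue('Hello ');
    controller.enqueue('from a ReadableStream!');
    controller.close();
  },
});

const result = await cortex.memory.rememberStream({
  memorySpaceId: 'user-space',
  conversationId: 'conv-readable',
  userMessage: 'Say hello',
  responseStream: manualStream,
  userId: 'user-123',
  userName: 'Alice',
});

console.log(result.fullResponse); // "Hello from a ReadableStream!"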
Streaming Options
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| storePartialResponse | boolean | No | false | Store in-progress memories during streaming |
| partialResponseInterval | number | No | 5000 | How often to update the partial memory (ms) |
| progressiveFactExtraction | boolean | No | false | Extract facts incrementally during streaming |
| generateResumeToken | boolean | No | false | Enable stream resumption on failure |
| streamTimeout | number | No | 30000 | Maximum stream duration (ms) |
| partialFailureHandling | string | No | 'best-effort' | One of 'store-partial', 'rollback', 'retry', 'best-effort' |
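As a sketch of how these fit together, the options can be passed in the same second-argument position used in the hooks and error-recovery examples below; the specific values here are illustrative, not recommendations.
// Illustrative combination of the streaming options above, passed as the second argument
const result = await cortex.memory.rememberStream({
  memorySpaceId: 'user-space',
  conversationId: 'conv-123',
  userMessage: 'Explain the plan in detail',
  responseStream: stream,
  userId: 'user-123',
  userName: 'Alice',
}, {
  storePartialResponse: true,       // keep an in-progress memory while streaming
  partialResponseInterval: 2000,    // refresh the partial memory every 2 seconds
  progressiveFactExtraction: false, // wait for the full response before extracting facts
  streamTimeout: 60000,             // abort streams that run longer than 60 seconds
  partialFailureHandling: 'store-partial',
});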
Streaming Hooks
Monitor streaming progress with real-time callbacks:
const result = await cortex.memory.rememberStream({
memorySpaceId: 'user-space',
conversationId: 'conv-123',
userMessage: 'Explain quantum computing',
responseStream: stream,
userId: 'user-123',
userName: 'Alice',
}, {
hooks: {
onChunk: (event) => {
// Real-time chunk updates
websocket.send({ type: 'chunk', data: event.chunk });
},
onProgress: (event) => {
// Progress tracking
console.log(`Processed: ${event.bytesProcessed} bytes`);
updateProgressBar(event.bytesProcessed);
},
onComplete: (event) => {
console.log(`Done! ${event.totalChunks} chunks in ${event.durationMs}ms`);
},
onError: (error) => {
console.error('Stream error:', error);
},
},
});
Stream Metrics
Comprehensive Tracking
Every stream provides detailed metrics for monitoring and cost analysis.
const result = await cortex.memory.rememberStream({
// ... params
});
console.log(result.streamMetrics);
// {
// totalChunks: 42,
// streamDurationMs: 1234,
// firstChunkLatency: 89,
// averageChunkSize: 24,
// totalBytesProcessed: 1008,
// chunksPerSecond: 34,
// estimatedTokens: 156,
// estimatedCost: 0.003,
// }
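As one illustrative use, these metrics can drive simple logging or cost alerts; the $0.01 threshold below is an arbitrary placeholder, not a recommendation.
// Illustrative monitoring based on streamMetrics; the threshold is a placeholder
const { totalChunks, streamDurationMs, estimatedCost } = result.streamMetrics;
console.log(`Stream: ${totalChunks} chunks in ${streamDurationMs}ms (~$${estimatedCost})`);

if (estimatedCost > 0.01) {
  console.warn('Unusually expensive stream for conv-123');
}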
Error Recovery
Resume Interrupted Streams
Enable generateResumeToken to recover from stream failures.
try {
const result = await cortex.memory.rememberStream({
memorySpaceId: 'user-space',
conversationId: 'conv-123',
userMessage: 'Long explanation...',
responseStream: stream,
userId: 'user-123',
userName: 'Alice',
}, {
partialFailureHandling: 'store-partial',
generateResumeToken: true,
streamTimeout: 30000,
});
} catch (error) {
if (error instanceof ResumableStreamError) {
console.log('Stream interrupted. Resume token:', error.resumeToken);
// Save resumeToken for later retry
}
}
Best Practices
Non-Blocking Storage
// Don't await if you don't need the result
cortex.memory.rememberStream({
// ... params
}).catch(console.error);
// Continue with other work immediately
Use in Background
// Store memory while returning stream to client
const response = await streamText({...});
// Fire-and-forget storage
cortex.memory.rememberStream({
responseStream: response.textStream,
// ...
}).catch(console.error);
// Return stream to client immediately
return response.toDataStreamResponse();
Stream Once Only
ReadableStream and AsyncIterable can only be consumed once. Don't try to reuse streams:
// Bad: Can't consume same stream twice
await cortex.memory.rememberStream({ responseStream: stream, ... });
await cortex.memory.rememberStream({ responseStream: stream, ... }); // Fails!
// Good: Use separate streams or tee the stream
const [stream1, stream2] = stream.tee();
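For instance, teeing lets one branch feed the client while the other feeds memory storage. This sketch assumes it runs inside a request handler (like the edge route above) with response.textStream from the Vercel AI SDK, and pipes the client branch through a TextEncoderStream so it can be returned as a Response body.
// Split the text stream: one branch for the client, one for memory storage
const [clientStream, memoryStream] = response.textStream.tee();

// Fire-and-forget storage from the second branch
cortex.memory.rememberStream({
  memorySpaceId: 'user-space',
  conversationId: 'conv-123',
  userMessage: 'What is AI?',
  responseStream: memoryStream,
  userId: 'user-123',
  userName: 'Alice',
}).catch(console.error);

// Return the first branch to the caller as a streaming response
return new Response(clientStream.pipeThrough(new TextEncoderStream()));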
Performance
Stream Consumption:
- Small (100 chars): < 1ms overhead
- Medium (1K chars): < 5ms overhead
- Large (10K chars): < 20ms overhead
Memory Usage:
- ~16 bytes per character
- 10K char response ≈ 160KB RAM
Storage performance is identical to remember().