Streaming Support

Why Streaming?

When an LLM streams its response chunk by chunk, you don't want to buffer everything yourself and wait until the end to store memory. rememberStream() provides:

  • Automatic buffering - No manual stream consumption
  • Full feature parity - Embeddings, facts, graph sync all supported
  • Type-safe - ReadableStream and AsyncIterable support
  • Edge compatible - Works in Vercel Edge, Cloudflare Workers

Quick Start

import { Cortex } from '@cortexmemory/sdk';
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';

const cortex = new Cortex({
  convexUrl: process.env.CONVEX_URL!,
});

const response = await streamText({
  model: openai('gpt-4'),
  messages: [{ role: 'user', content: 'What is AI?' }],
});

const result = await cortex.memory.rememberStream({
  memorySpaceId: 'user-space',
  conversationId: 'conv-123',
  userMessage: 'What is AI?',
  responseStream: response.textStream,
  userId: 'user-123',
  userName: 'Alice',
});

console.log(result.fullResponse); // Complete response
console.log(result.memories);     // Stored memories

Why Not Manual Buffering?

// Manual stream consumption - tedious!
let fullResponse = '';
for await (const chunk of stream) {
  fullResponse += chunk.choices[0]?.delta?.content || '';
}

// Separate storage step
await cortex.memory.remember({
  memorySpaceId: 'agent-1',
  userMessage: 'Hello!',
  agentResponse: fullResponse,
  userId: 'user-1',
  userName: 'User',
});

With Fact Extraction

// Automatic extraction with LLM configured
const cortex = new Cortex({
  convexUrl: process.env.CONVEX_URL!,
  llm: openai('gpt-4'), // Enables automatic fact extraction
});

const result = await cortex.memory.rememberStream({
  memorySpaceId: 'user-space',
  conversationId: 'conv-123',
  userMessage: 'My favorite color is blue and I love pizza',
  responseStream: stream,
  userId: 'user-123',
  userName: 'Alice',
  // Facts automatically extracted from conversation + response
});

console.log(`Extracted ${result.facts.length} facts`);
result.facts.forEach(f => console.log(`- ${f.fact}`));

Custom Extraction

// Override automatic extraction
const result = await cortex.memory.rememberStream({
  memorySpaceId: 'user-space',
  conversationId: 'conv-123',
  userMessage: 'I prefer TypeScript',
  responseStream: stream,
  userId: 'user-123',
  userName: 'Alice',

  extractFacts: async (userMsg, agentResp) => [{
    fact: 'User prefers TypeScript',
    factType: 'preference',
    confidence: 95,
  }],
});

With Embeddings

import { embed } from 'ai';
import { openai } from '@ai-sdk/openai';

const result = await cortex.memory.rememberStream({
  memorySpaceId: 'user-space',
  conversationId: 'conv-123',
  userMessage: 'What is machine learning?',
  responseStream: stream,
  userId: 'user-123',
  userName: 'Alice',

  generateEmbedding: async (text) => {
    const { embedding } = await embed({
      model: openai.embedding('text-embedding-3-small'),
      value: text,
    });
    return embedding;
  },
});

Edge Runtime Support

// app/api/chat/route.ts
import { Cortex } from '@cortexmemory/sdk';
import { streamText } from 'ai';
import { openai } from '@ai-sdk/openai';

export const runtime = 'edge';

const cortex = new Cortex({
  convexUrl: process.env.CONVEX_URL!,
});

export async function POST(req: Request) {
  const { message, userId } = await req.json();

  const response = await streamText({
    model: openai('gpt-4'),
    messages: [{ role: 'user', content: message }],
  });

  // Store in background (non-blocking)
  cortex.memory.rememberStream({
    memorySpaceId: 'edge-chat',
    conversationId: `conv-${userId}-${Date.now()}`,
    userMessage: message,
    responseStream: response.textStream,
    userId,
    userName: 'User',
  }).catch(console.error);

  // Return streaming response to client
  return response.toDataStreamResponse();
}

Supported Stream Types

  • ReadableStream - Web Streams API (e.g. Vercel AI SDK, fetch())
  • AsyncIterable - JavaScript async generators (e.g. OpenAI SDK, LangChain)
  • Custom generators - your own code (async function*)

All three work seamlessly with rememberStream().
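
For instance, a custom async generator can adapt any chunk format into plain text before handing it to rememberStream(). A minimal sketch, assuming an OpenAI-style chunk stream in a variable named openaiStream (the variable name and chunk shape are illustrative, not part of the SDK):

// Adapt an OpenAI-style chunk stream into an AsyncIterable<string>
async function* toTextChunks(chunks: AsyncIterable<any>) {
  for await (const chunk of chunks) {
    // Assumes the OpenAI SDK's streaming chunk shape
    yield chunk.choices[0]?.delta?.content ?? '';
  }
}

const result = await cortex.memory.rememberStream({
  memorySpaceId: 'user-space',
  conversationId: 'conv-123',
  userMessage: 'What is AI?',
  responseStream: toTextChunks(openaiStream), // custom generator
  userId: 'user-123',
  userName: 'Alice',
});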


Streaming Options

All parameters are optional:

  • storePartialResponse (boolean, default false) - Store in-progress memories during streaming
  • partialResponseInterval (number, default 5000) - How often to update the partial memory, in ms
  • progressiveFactExtraction (boolean, default false) - Extract facts incrementally during streaming
  • generateResumeToken (boolean, default false) - Enable stream resumption on failure
  • streamTimeout (number, default 30000) - Maximum stream duration, in ms
  • partialFailureHandling (string, default 'best-effort') - One of 'store-partial', 'rollback', 'retry', or 'best-effort'
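
These options are passed in the second argument to rememberStream(), as the error-recovery example further down also shows. A minimal sketch with illustrative values:

const result = await cortex.memory.rememberStream({
  memorySpaceId: 'user-space',
  conversationId: 'conv-123',
  userMessage: 'Walk me through the migration plan',
  responseStream: stream,
  userId: 'user-123',
  userName: 'Alice',
}, {
  // Persist an in-progress memory every 2 seconds while the stream runs
  storePartialResponse: true,
  partialResponseInterval: 2000,
  // Abort streams that take longer than 60 seconds
  streamTimeout: 60000,
  // Keep whatever was stored if the stream fails midway
  partialFailureHandling: 'store-partial',
});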

Streaming Hooks

Monitor streaming progress with real-time callbacks:

const result = await cortex.memory.rememberStream({
  memorySpaceId: 'user-space',
  conversationId: 'conv-123',
  userMessage: 'Explain quantum computing',
  responseStream: stream,
  userId: 'user-123',
  userName: 'Alice',
}, {
  hooks: {
    onChunk: (event) => {
      // Real-time chunk updates
      websocket.send({ type: 'chunk', data: event.chunk });
    },
    onProgress: (event) => {
      // Progress tracking
      console.log(`Processed: ${event.bytesProcessed} bytes`);
      updateProgressBar(event.bytesProcessed);
    },
    onComplete: (event) => {
      console.log(`Done! ${event.totalChunks} chunks in ${event.durationMs}ms`);
    },
    onError: (error) => {
      console.error('Stream error:', error);
    },
  },
});

Stream Metrics

Comprehensive Tracking

Every stream provides detailed metrics for monitoring and cost analysis.

const result = await cortex.memory.rememberStream({
  // ... params
});

console.log(result.streamMetrics);
// {
//   totalChunks: 42,
//   streamDurationMs: 1234,
//   firstChunkLatency: 89,
//   averageChunkSize: 24,
//   totalBytesProcessed: 1008,
//   chunksPerSecond: 34,
//   estimatedTokens: 156,
//   estimatedCost: 0.003,
// }

Error Recovery

Resume Interrupted Streams

Enable generateResumeToken to recover from stream failures.

try {
  const result = await cortex.memory.rememberStream({
    memorySpaceId: 'user-space',
    conversationId: 'conv-123',
    userMessage: 'Long explanation...',
    responseStream: stream,
    userId: 'user-123',
    userName: 'Alice',
  }, {
    partialFailureHandling: 'store-partial',
    generateResumeToken: true,
    streamTimeout: 30000,
  });
} catch (error) {
  if (error instanceof ResumableStreamError) {
    console.log('Stream interrupted. Resume token:', error.resumeToken);
    // Save resumeToken for later retry
  }
}

Best Practices

Non-Blocking Storage

// Don't await if you don't need the result
cortex.memory.rememberStream({
  // ... params
}).catch(console.error);

// Continue with other work immediately

Use in Background

// Store memory while returning stream to client
const response = await streamText({...});

// Fire-and-forget storage
cortex.memory.rememberStream({
  responseStream: response.textStream,
  // ...
}).catch(console.error);

// Return stream to client immediately
return response.toDataStreamResponse();

Stream Once Only

ReadableStream and AsyncIterable can only be consumed once. Don't try to reuse streams:

// Bad: Can't consume same stream twice
await cortex.memory.rememberStream({ responseStream: stream, ... });
await cortex.memory.rememberStream({ responseStream: stream, ... }); // Fails!

// Good: Use separate streams or tee the stream
const [stream1, stream2] = stream.tee();
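
With the Vercel AI SDK, response.textStream is a ReadableStream of text, so tee() yields two independent branches: one for storage, one for the client. A sketch building on the edge example above; returning the second branch via TextEncoderStream is an illustrative alternative to toDataStreamResponse(), not an SDK requirement:

const response = await streamText({
  model: openai('gpt-4'),
  messages: [{ role: 'user', content: message }],
});

// One branch for memory storage, one for the client
const [forMemory, forClient] = response.textStream.tee();

// Fire-and-forget storage on the first branch
cortex.memory.rememberStream({
  memorySpaceId: 'edge-chat',
  conversationId: `conv-${userId}-${Date.now()}`,
  userMessage: message,
  responseStream: forMemory,
  userId,
  userName: 'User',
}).catch(console.error);

// Stream the second branch to the client as plain text
return new Response(forClient.pipeThrough(new TextEncoderStream()));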

Performance

Stream Consumption:

  • Small (100 chars): < 1ms overhead
  • Medium (1K chars): < 5ms overhead
  • Large (10K chars): < 20ms overhead

Memory Usage:

  • ~16 bytes per character
  • 10K char response ≈ 160KB RAM

Storage performance is identical to remember().


Next Steps