Overview
Streaming provides immediate response feedback, improving the user experience for long-form answers. Benefits:
- Better UX: Users see responses as they’re generated
- Lower perceived latency: Feels faster than waiting for complete response
- Progress indication: Users know the system is working
- Cancellation: Stop generation early if needed
Basic Streaming
JavaScript/TypeScript
Copy
// Minimal streaming example: print content chunks to stdout as they arrive.
import { TrainlyClient } from "@trainly/react";

const trainly = new TrainlyClient({
  apiKey: process.env.TRAINLY_API_KEY!,
  chatId: process.env.TRAINLY_CHAT_ID!,
});

const stream = await trainly.queryStream({
  question: "Explain quantum computing in detail",
});

for await (const chunk of stream) {
  switch (chunk.type) {
    case "content":
      process.stdout.write(chunk.data); // Write as it arrives
      break;
    case "context":
      console.log("\nSources:", chunk.data.length);
      break;
    case "end":
      console.log("\nComplete!");
      break;
    case "error":
      console.error("Error:", chunk.data);
      break;
  }
}
Python
Copy
# Basic synchronous streaming: print the answer incrementally as chunks arrive.
import trainly

client = trainly.Client(
    api_key="tk_your_key",
    chat_id="chat_abc123"
)

stream = client.query_stream(question="Explain quantum computing")

for chunk in stream:
    kind = chunk["type"]
    if kind == "content":
        # flush=True so partial lines show up immediately
        print(chunk["data"], end="", flush=True)
    elif kind == "context":
        print(f"\nSources: {len(chunk['data'])}")
    elif kind == "end":
        print("\nComplete!")
React Integration
Streaming Chat Component
Copy
'use client';

import { useState } from 'react';

/**
 * Streaming chat UI.
 *
 * All Trainly calls go through the server-side /api/query route, so no
 * API key is ever shipped to the browser. (The original instantiated an
 * unused TrainlyClient here with a NEXT_PUBLIC_ key — dead code that also
 * exposed the key to the client bundle; removed.)
 *
 * The trailing assistant message is updated in place as chunks arrive
 * from the streamed response body.
 */
export function StreamingChat() {
  const [messages, setMessages] = useState<Array<{ role: string; content: string }>>([]);
  const [input, setInput] = useState('');
  const [loading, setLoading] = useState(false);

  async function handleSubmit(e: React.FormEvent) {
    e.preventDefault();
    if (!input.trim()) return;
    const question = input;

    // Add user message and clear the input box.
    setMessages(prev => [...prev, { role: 'user', content: question }]);
    setInput('');
    setLoading(true);

    try {
      // Call your API route (not the Trainly API directly in the client).
      const response = await fetch('/api/query', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ question }),
      });
      if (!response.ok || !response.body) {
        throw new Error(`Request failed with status ${response.status}`);
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let answer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        // { stream: true } keeps multi-byte characters that are split
        // across network chunks from being mangled.
        answer += decoder.decode(value, { stream: true });

        // Update the trailing assistant message, or append one on the
        // first chunk. Replace the object rather than mutating state.
        setMessages(prev => {
          const newMessages = [...prev];
          const last = newMessages[newMessages.length - 1];
          if (last?.role === 'assistant') {
            newMessages[newMessages.length - 1] = { ...last, content: answer };
          } else {
            newMessages.push({ role: 'assistant', content: answer });
          }
          return newMessages;
        });
      }
    } catch (error) {
      console.error('Stream error:', error);
      setMessages(prev => [...prev, {
        role: 'assistant',
        content: 'Sorry, an error occurred. Please try again.'
      }]);
    } finally {
      setLoading(false);
    }
  }

  return (
    <div className="flex flex-col h-[600px]">
      {/* Messages */}
      <div className="flex-1 overflow-y-auto space-y-4 p-4">
        {messages.map((msg, i) => (
          <div
            key={i}
            className={`flex ${msg.role === 'user' ? 'justify-end' : 'justify-start'}`}
          >
            <div
              className={`max-w-[80%] p-3 rounded-lg ${
                msg.role === 'user'
                  ? 'bg-blue-500 text-white'
                  : 'bg-gray-200 text-gray-900'
              }`}
            >
              {msg.content}
            </div>
          </div>
        ))}
        {loading && (
          <div className="flex justify-start">
            <div className="bg-gray-200 p-3 rounded-lg">
              <div className="animate-pulse">Thinking...</div>
            </div>
          </div>
        )}
      </div>
      {/* Input */}
      <form onSubmit={handleSubmit} className="border-t p-4">
        <div className="flex gap-2">
          <input
            type="text"
            value={input}
            onChange={e => setInput(e.target.value)}
            placeholder="Ask a question..."
            disabled={loading}
            className="flex-1 p-2 border rounded focus:outline-none focus:ring-2 focus:ring-blue-500"
          />
          <button
            type="submit"
            disabled={loading}
            className="px-4 py-2 bg-blue-500 text-white rounded hover:bg-blue-600 disabled:opacity-50 disabled:cursor-not-allowed"
          >
            {loading ? 'Sending...' : 'Send'}
          </button>
        </div>
      </form>
    </div>
  );
}
Custom Hook
Copy
import { useState, useCallback, useRef } from 'react';

interface UseStreamingQueryOptions {
  onChunk?: (chunk: string) => void;
  onComplete?: (fullAnswer: string) => void;
  onError?: (error: Error) => void;
}

/**
 * React hook that streams an answer from the /api/query route.
 *
 * Returns { answer, loading, error, query }. `answer` grows as chunks
 * arrive; `query(question)` starts a new stream.
 */
export function useStreamingQuery(options: UseStreamingQueryOptions = {}) {
  const [answer, setAnswer] = useState('');
  const [loading, setLoading] = useState(false);
  const [error, setError] = useState<Error | null>(null);

  // Keep the latest callbacks in a ref so `query` has a stable identity.
  // With `options` in the dependency array (as before), callers passing an
  // inline object literal got a brand-new `query` on every render.
  const optionsRef = useRef(options);
  optionsRef.current = options;

  const query = useCallback(async (question: string) => {
    setLoading(true);
    setError(null);
    setAnswer('');
    try {
      const response = await fetch('/api/query', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ question }),
      });
      if (!response.ok || !response.body) {
        throw new Error(`Request failed with status ${response.status}`);
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let fullAnswer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        // { stream: true } preserves multi-byte characters split across chunks.
        const chunk = decoder.decode(value, { stream: true });
        fullAnswer += chunk;
        setAnswer(fullAnswer);
        optionsRef.current.onChunk?.(chunk);
      }
      optionsRef.current.onComplete?.(fullAnswer);
    } catch (err) {
      // catch variables are `unknown`; normalize to Error before use.
      const error = err instanceof Error ? err : new Error(String(err));
      setError(error);
      optionsRef.current.onError?.(error);
    } finally {
      setLoading(false);
    }
  }, []);

  return { answer, loading, error, query };
}
// Example consumer of useStreamingQuery.
function MyComponent() {
  const { answer, loading, query } = useStreamingQuery({
    onChunk: (chunk) => console.log('Received:', chunk),
    onComplete: (full) => console.log('Done:', full),
  });

  const ask = () => query('What is AI?');

  return (
    <div>
      <button onClick={ask} disabled={loading}>
        Ask
      </button>
      <p>{answer}</p>
    </div>
  );
}
Next.js API Route
Server-side streaming endpoint.
Copy
// app/api/query/route.ts
import { TrainlyClient } from '@trainly/react';
import { NextRequest } from 'next/server';

// Server-side client: the API key never reaches the browser.
const trainly = new TrainlyClient({
  apiKey: process.env.TRAINLY_API_KEY!,
  chatId: process.env.TRAINLY_CHAT_ID!,
});

/**
 * POST /api/query — proxies a question to Trainly and streams the
 * plain-text answer back chunk by chunk.
 */
export async function POST(request: NextRequest) {
  const { question } = await request.json();
  const stream = await trainly.queryStream({ question });
  const encoder = new TextEncoder();

  return new Response(
    new ReadableStream({
      async start(controller) {
        try {
          for await (const chunk of stream) {
            if (chunk.type === 'content') {
              controller.enqueue(encoder.encode(chunk.data));
            } else if (chunk.type === 'end') {
              controller.close();
              // Stop reading: enqueue() after close() would throw.
              return;
            } else if (chunk.type === 'error') {
              controller.error(new Error(chunk.data));
              return;
            }
          }
          // Stream finished without an explicit 'end' chunk; close so the
          // client's reader sees done=true instead of hanging forever.
          controller.close();
        } catch (error) {
          controller.error(error);
        }
      },
    }),
    {
      headers: {
        'Content-Type': 'text/plain; charset=utf-8',
        'Transfer-Encoding': 'chunked',
      },
    }
  );
}
Express.js Streaming
Copy
import express from 'express';
import { TrainlyClient } from '@trainly/react';

const app = express();
app.use(express.json());

const trainly = new TrainlyClient({
  apiKey: process.env.TRAINLY_API_KEY!,
  chatId: process.env.TRAINLY_CHAT_ID!,
});

/**
 * POST /api/stream — relays Trainly chunks as Server-Sent Events.
 * Each content chunk is one `data:` frame; `[DONE]` marks completion.
 */
app.post('/api/stream', async (req, res) => {
  const { question } = req.body;

  // Set SSE headers
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  try {
    const stream = await trainly.queryStream({ question });
    for await (const chunk of stream) {
      if (chunk.type === 'content') {
        res.write(`data: ${JSON.stringify(chunk)}\n\n`);
      } else if (chunk.type === 'end') {
        res.write('data: [DONE]\n\n');
        break;
      }
    }
  } catch (error) {
    // Catch variables are `unknown`; narrow before reading .message.
    const message = error instanceof Error ? error.message : String(error);
    res.write(`data: ${JSON.stringify({ type: 'error', data: message })}\n\n`);
  } finally {
    // Always terminate the response — the original never called res.end()
    // when the stream finished without an explicit 'end' chunk, leaving
    // the connection open.
    res.end();
  }
});

app.listen(3000);
Python Async Streaming
Copy
# Async streaming with the AsyncClient.
import trainly
import asyncio

client = trainly.AsyncClient(
    api_key="tk_your_key",
    chat_id="chat_abc123"
)


async def stream_query(question: str):
    """Async streaming example"""
    async for chunk in client.query_stream_async(question=question):
        kind = chunk["type"]
        if kind == "content":
            print(chunk["data"], end="", flush=True)
        elif kind == "end":
            print("\nDone!")


# Run
asyncio.run(stream_query("Explain AI"))
FastAPI Streaming
Copy
# FastAPI endpoint that relays the streamed answer as plain text.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import trainly

app = FastAPI()

client = trainly.Client(
    api_key="tk_your_key",
    chat_id="chat_abc123"
)


@app.post("/stream")
async def stream_query(question: str):
    """Stream the answer for *question* back to the caller."""

    async def generate():
        # Forward content chunks until the stream signals the end.
        for chunk in client.query_stream(question=question):
            if chunk["type"] == "content":
                yield chunk["data"]
            elif chunk["type"] == "end":
                break

    return StreamingResponse(
        generate(),
        media_type="text/plain"
    )
Advanced Patterns
Cancellable Streams
Copy
/**
 * Hook that starts a streamed query and lets the caller cancel it
 * mid-flight. Starting a new stream aborts any stream in progress.
 */
function useCancellableStream() {
  const abortControllerRef = useRef<AbortController | null>(null);

  async function startStream(question: string) {
    // Abort a previous in-flight stream, then install a fresh controller.
    abortControllerRef.current?.abort();
    const controller = new AbortController();
    abortControllerRef.current = controller;

    const response = await fetch('/api/query', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ question }),
      signal: controller.signal, // aborting cancels the fetch
    });

    // Handle stream...
  }

  const cancel = () => abortControllerRef.current?.abort();

  return { startStream, cancel };
}
Progress Tracking
Copy
/**
 * Stream a query while logging chunk count, total characters, and
 * throughput (chars/sec) after every content chunk.
 */
async function streamWithProgress(question: string) {
  const stream = await trainly.queryStream({ question });
  const startTime = Date.now();
  let chunkCount = 0;
  let totalChars = 0;

  for await (const chunk of stream) {
    if (chunk.type !== 'content') continue;
    chunkCount += 1;
    totalChars += chunk.data.length;
    const elapsed = Date.now() - startTime;
    const charsPerSec = totalChars / (elapsed / 1000);
    console.log(`Chunks: ${chunkCount} | Chars: ${totalChars} | Speed: ${charsPerSec.toFixed(0)} chars/sec`);
  }
}
Retry on Stream Failure
Copy
/**
 * Stream a query with retries, exponential backoff, and a non-streaming
 * fallback once retries are exhausted.
 *
 * @param question   The question to ask.
 * @param maxRetries Number of streaming attempts before falling back.
 * @returns The accumulated answer text.
 */
async function robustStream(question: string, maxRetries: number = 3) {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      const stream = await trainly.queryStream({ question });
      let answer = '';
      for await (const chunk of stream) {
        if (chunk.type === 'content') {
          answer += chunk.data;
        } else if (chunk.type === 'end') {
          return answer; // Success
        }
      }
      // The stream completed normally but without an explicit 'end' chunk.
      // The original fell through and retried here — re-running a query
      // that already succeeded. Return what arrived instead.
      return answer;
    } catch (error) {
      console.error(`Stream attempt ${attempt} failed:`, error);
      if (attempt === maxRetries) {
        // All retries exhausted, fall back to non-streaming
        const response = await trainly.query({ question });
        return response.answer;
      }
      // Exponential backoff: 2s, 4s, 8s, ...
      await new Promise(resolve => setTimeout(resolve, Math.pow(2, attempt) * 1000));
    }
  }
  throw new Error('Stream failed after retries');
}
Error Handling
Copy
/**
 * Stream a query with layered error handling: stream errors fall back to
 * a non-streaming query, and that fallback failing yields a user-facing
 * message rather than a thrown error.
 */
async function safeStream(question: string) {
  try {
    const stream = await trainly.queryStream({ question });
    let answer = '';
    for await (const chunk of stream) {
      if (chunk.type === 'content') {
        answer += chunk.data;
      } else if (chunk.type === 'error') {
        throw new Error(chunk.data);
      } else if (chunk.type === 'end') {
        return answer;
      }
    }
    // Stream closed without an explicit 'end' chunk — return the content
    // collected so far. The original fell off the loop and returned
    // `undefined` in this case.
    return answer;
  } catch (error) {
    console.error('Stream error:', error);
    // Graceful fallback to non-streaming
    try {
      const response = await trainly.query({ question });
      return response.answer;
    } catch (fallbackError) {
      return 'Unable to process your question. Please try again.';
    }
  }
}
Best Practices
When to Use Streaming
Use streaming for:
- Long-form content generation (>100 words)
- Interactive chat interfaces
- Real-time user feedback needed
- Mobile apps (progressive rendering)
Avoid streaming for:
- Short queries (<50 words expected)
- Batch processing
- Analytics/logging (need complete response)
- API integrations requiring full response
Performance
Performance
- Stream chunks immediately, don’t buffer
- Use async/await for better concurrency
- Implement backpressure handling for slow clients
- Monitor streaming latency (P50, P95, P99)
- Target first chunk <500ms
Error Handling
Error Handling
- Always implement fallback to non-streaming
- Handle network interruptions gracefully
- Show user-friendly error messages
- Log stream failures for monitoring
- Retry with exponential backoff
User Experience
User Experience
- Show loading indicator before first chunk
- Animate text appearance for smoothness
- Allow users to stop generation
- Display sources after content
- Provide feedback during streaming
Performance Targets
Streaming Metrics:
- First chunk latency: <500ms (time to first content)
- Chunk throughput: >20 chunks/second
- Total latency: <5 seconds for 500-word response
- Stream stability: <1% failure rate
Copy
// Pull recent query traces and report average streaming latencies.
const traces = await trainly.analytics.getQueryTraces({ limit: 100 });

const streamingMetrics = traces.map(t => ({
  firstChunkLatency: t.timing?.retrieval_ms || 0,
  totalLatency: t.duration_ms,
  success: t.status === 'completed',
}));

// Guard against an empty trace list — dividing by zero length yields NaN
// averages in the log output.
if (streamingMetrics.length === 0) {
  console.log('No query traces available yet.');
} else {
  const avgFirstChunk =
    streamingMetrics.reduce((sum, m) => sum + m.firstChunkLatency, 0) / streamingMetrics.length;
  const avgTotal =
    streamingMetrics.reduce((sum, m) => sum + m.totalLatency, 0) / streamingMetrics.length;
  console.log(`First chunk: ${avgFirstChunk}ms, Total: ${avgTotal}ms`);
}