Overview
Trainly supports Server-Sent Events (SSE) for streaming AI responses in real time. Streaming sends response chunks as they are generated, reducing perceived latency by up to 80% and providing a better user experience, especially for longer responses.
Why Use Streaming?
- Better UX: users see responses immediately instead of waiting 5+ seconds.
- Perceived performance: the first token arrives in ~500 ms versus 5 s+ for the full response.
- Progressive rendering: markdown can be rendered as it arrives for a smooth experience.
- Cancellable: users can cancel long-running queries.
Basic Streaming
JavaScript/TypeScript
import { TrainlyClient } from "@trainly/react";
const trainly = new TrainlyClient({
apiKey: process.env.TRAINLY_API_KEY!,
chatId: process.env.TRAINLY_CHAT_ID!,
});
async function streamQuery(question: string) {
const stream = await trainly.queryStream({ question });
for await (const chunk of stream) {
if (chunk.type === "content") {
// Append text chunk
process.stdout.write(chunk.data);
} else if (chunk.type === "context") {
// Citations available
console.log(`\nUsing ${chunk.data.length} sources`);
} else if (chunk.type === "end") {
console.log("\n\nComplete!");
} else if (chunk.type === "error") {
console.error("Error:", chunk.data);
}
}
}
// Usage
await streamQuery("Explain the methodology in detail");
Python
import os

from trainly import TrainlyClient
trainly = TrainlyClient(
api_key=os.getenv("TRAINLY_API_KEY"),
chat_id=os.getenv("TRAINLY_CHAT_ID")
)
def stream_query(question: str):
for chunk in trainly.query_stream(question=question):
if chunk.type == "content":
print(chunk.data, end="", flush=True)
elif chunk.type == "context":
print(f"\nUsing {len(chunk.data)} sources")
elif chunk.type == "end":
print("\n\nComplete!")
elif chunk.type == "error":
print(f"\nError: {chunk.data}")
# Usage
stream_query("Explain the methodology in detail")
Raw REST API
curl -X POST https://api.trainlyai.com/v1/chat_abc123/answer_question_stream \
-H "Authorization: Bearer tk_your_api_key" \
-H "Content-Type: application/json" \
-H "Accept: text/event-stream" \
-d '{"question": "What are the findings?"}' \
--no-buffer
The response arrives as a stream of SSE events:
data: {"type": "context", "data": [...]}
data: {"type": "content", "data": "Based"}
data: {"type": "content", "data": " on"}
data: {"type": "content", "data": " the"}
data: {"type": "end"}
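If you are not using an SDK, you can parse the SSE stream yourself. The following is a minimal sketch in TypeScript, assuming the endpoint, key placeholder, and chunk shapes shown in the curl example above; replace them with your own values.

// Consume the raw SSE stream without an SDK (Node 18+ or the browser).
async function streamRaw(question: string): Promise<void> {
  const response = await fetch(
    "https://api.trainlyai.com/v1/chat_abc123/answer_question_stream",
    {
      method: "POST",
      headers: {
        Authorization: "Bearer tk_your_api_key",
        "Content-Type": "application/json",
        Accept: "text/event-stream",
      },
      body: JSON.stringify({ question }),
    },
  );

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });

    // SSE events are separated by a blank line
    const events = buffer.split("\n\n");
    buffer = events.pop() ?? ""; // keep any partial event for the next read

    for (const event of events) {
      const line = event.trim();
      if (!line.startsWith("data:")) continue;
      const chunk = JSON.parse(line.slice("data:".length).trim());
      if (chunk.type === "content") process.stdout.write(chunk.data);
      else if (chunk.type === "end") console.log("\nComplete!");
    }
  }
}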
React Integration
Streaming Chat Component
"use client";
import { useState, useRef, useEffect } from "react";
import { TrainlyClient } from "@trainly/react";
import ReactMarkdown from "react-markdown";
interface Message {
role: "user" | "assistant";
content: string;
streaming?: boolean;
}
export function StreamingChat() {
const [messages, setMessages] = useState<Message[]>([]);
const [input, setInput] = useState("");
const [streaming, setStreaming] = useState(false);
const abortControllerRef = useRef<AbortController | null>(null);
const trainly = new TrainlyClient({
apiKey: process.env.NEXT_PUBLIC_TRAINLY_API_KEY!,
chatId: process.env.NEXT_PUBLIC_TRAINLY_CHAT_ID!,
});
async function handleSubmit(e: React.FormEvent) {
e.preventDefault();
if (!input.trim() || streaming) return;
// Add user message
const userMessage: Message = {
role: "user",
content: input,
};
setMessages((prev) => [...prev, userMessage]);
setInput("");
setStreaming(true);
// Add placeholder for assistant message
setMessages((prev) => [
...prev,
{
role: "assistant",
content: "",
streaming: true,
},
]);
try {
// Create abort controller for cancellation
abortControllerRef.current = new AbortController();
const stream = await trainly.queryStream({
question: input,
signal: abortControllerRef.current.signal,
});
let fullResponse = "";
for await (const chunk of stream) {
if (chunk.type === "content") {
fullResponse += chunk.data;
// Update the last message (assistant)
setMessages((prev) => {
const updated = [...prev];
updated[updated.length - 1] = {
role: "assistant",
content: fullResponse,
streaming: true,
};
return updated;
});
} else if (chunk.type === "end") {
// Mark streaming complete
setMessages((prev) => {
const updated = [...prev];
updated[updated.length - 1] = {
role: "assistant",
content: fullResponse,
streaming: false,
};
return updated;
});
setStreaming(false);
}
}
} catch (error: any) {
if (error.name === "AbortError") {
console.log("Stream cancelled by user");
} else {
console.error("Streaming error:", error);
// Show error message
setMessages((prev) => {
const updated = [...prev];
updated[updated.length - 1] = {
role: "assistant",
content: "Sorry, an error occurred. Please try again.",
streaming: false,
};
return updated;
});
}
setStreaming(false);
} finally {
abortControllerRef.current = null;
}
}
function handleCancel() {
if (abortControllerRef.current) {
abortControllerRef.current.abort();
}
}
return (
<div className="flex flex-col h-screen max-w-4xl mx-auto p-4">
<div className="flex-1 overflow-y-auto space-y-4 mb-4">
{messages.map((message, i) => (
<div
key={i}
className={`flex ${message.role === "user" ? "justify-end" : "justify-start"}`}
>
<div
className={`max-w-[80%] rounded-lg p-4 ${
message.role === "user"
? "bg-blue-500 text-white"
: "bg-gray-100 text-gray-900"
}`}
>
<ReactMarkdown>{message.content}</ReactMarkdown>
{message.streaming && (
<span className="inline-block w-2 h-4 bg-current animate-pulse ml-1" />
)}
</div>
</div>
))}
</div>
<form onSubmit={handleSubmit} className="flex gap-2">
<input
type="text"
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder="Ask a question..."
disabled={streaming}
className="flex-1 px-4 py-2 border rounded-lg"
/>
{streaming ? (
<button
type="button"
onClick={handleCancel}
className="px-6 py-2 bg-red-500 text-white rounded-lg"
>
Cancel
</button>
) : (
<button
type="submit"
disabled={!input.trim()}
className="px-6 py-2 bg-blue-500 text-white rounded-lg disabled:opacity-50"
>
Send
</button>
)}
</form>
</div>
);
}
Advanced Streaming Patterns
Typed Chunks with TypeScript
type StreamChunk =
| {
type: "context";
data: Array<{ chunk_id: string; chunk_text: string; score: number }>;
}
| { type: "content"; data: string }
| { type: "end" }
| { type: "error"; data: string };
async function typedStream(question: string): Promise<string> {
const stream = await trainly.queryStream({ question });
let fullResponse = "";
let citationCount = 0;
for await (const chunk of stream) {
switch (chunk.type) {
case "context":
citationCount = chunk.data.length;
console.log(`Using ${citationCount} citations`);
break;
case "content":
fullResponse += chunk.data;
process.stdout.write(chunk.data);
break;
case "end":
console.log("\n\nStream complete!");
return fullResponse;
case "error":
throw new Error(chunk.data);
}
}
return fullResponse;
}
Stream with Progress
interface StreamProgress {
chunksReceived: number;
charactersReceived: number;
estimatedProgress: number;
citationCount: number;
}
async function streamWithProgress(
question: string,
onProgress: (progress: StreamProgress) => void,
) {
const stream = await trainly.queryStream({ question });
const progress: StreamProgress = {
chunksReceived: 0,
charactersReceived: 0,
estimatedProgress: 0,
citationCount: 0,
};
for await (const chunk of stream) {
if (chunk.type === "context") {
progress.citationCount = chunk.data.length;
} else if (chunk.type === "content") {
progress.chunksReceived += 1;
progress.charactersReceived += chunk.data.length;
// Estimate progress (typical response is ~500 chars)
progress.estimatedProgress = Math.min(
(progress.charactersReceived / 500) * 100,
99,
);
onProgress({ ...progress });
process.stdout.write(chunk.data);
} else if (chunk.type === "end") {
progress.estimatedProgress = 100;
onProgress({ ...progress });
}
}
}
// Usage
await streamWithProgress("Explain the methodology", (progress) => {
console.log(`Progress: ${progress.estimatedProgress.toFixed(0)}%`);
console.log(`Characters: ${progress.charactersReceived}`);
});
Cancellable Streams
async function cancellableStream(
question: string,
onCancel: () => void,
): Promise<{ cancel: () => void }> {
const abortController = new AbortController();
// Start streaming
const streamPromise = (async () => {
try {
const stream = await trainly.queryStream({
question,
signal: abortController.signal,
});
for await (const chunk of stream) {
if (chunk.type === "content") {
console.log(chunk.data);
} else if (chunk.type === "end") {
console.log("Complete!");
}
}
} catch (error: any) {
if (error.name === "AbortError") {
console.log("Stream cancelled");
onCancel();
} else {
throw error;
}
}
})();
// Return cancel function
return {
cancel: () => abortController.abort(),
};
}
// Usage
const stream = await cancellableStream("Long question...", () => {
console.log("User cancelled the stream");
});
// Cancel after 5 seconds
setTimeout(() => stream.cancel(), 5000);
Python Async Streaming
AsyncIO Streaming
import asyncio
import os

from trainly import AsyncTrainlyClient
async def stream_async(question: str):
trainly = AsyncTrainlyClient(
api_key=os.getenv("TRAINLY_API_KEY"),
chat_id=os.getenv("TRAINLY_CHAT_ID")
)
async for chunk in trainly.query_stream(question=question):
if chunk.type == "content":
print(chunk.data, end="", flush=True)
elif chunk.type == "end":
print("\n\nComplete!")
# Run
asyncio.run(stream_async("What are the findings?"))
Concurrent Streams
import asyncio
import os

from trainly import AsyncTrainlyClient
async def process_multiple_streams(questions: list):
trainly = AsyncTrainlyClient(
api_key=os.getenv("TRAINLY_API_KEY"),
chat_id=os.getenv("TRAINLY_CHAT_ID")
)
async def process_stream(question: str, index: int):
print(f"\n[Stream {index}] Starting: {question}")
answer = ""
async for chunk in trainly.query_stream(question=question):
if chunk.type == "content":
answer += chunk.data
elif chunk.type == "end":
print(f"\n[Stream {index}] Complete: {len(answer)} chars")
return answer
# Run all streams concurrently
tasks = [
process_stream(q, i)
for i, q in enumerate(questions, 1)
]
answers = await asyncio.gather(*tasks)
return answers
# Usage
questions = [
"What is the introduction?",
"What is the methodology?",
"What are the conclusions?"
]
answers = asyncio.run(process_multiple_streams(questions))
Framework Integration
Next.js Streaming Route
// app/api/stream/route.ts
import { TrainlyClient } from "@trainly/react";
const trainly = new TrainlyClient({
apiKey: process.env.TRAINLY_API_KEY!,
chatId: process.env.TRAINLY_CHAT_ID!,
});
export async function POST(request: Request) {
const { question } = await request.json();
// Create a TransformStream for SSE
const encoder = new TextEncoder();
const stream = new ReadableStream({
async start(controller) {
try {
const trainlyStream = await trainly.queryStream({ question });
for await (const chunk of trainlyStream) {
// Send as SSE
const sseData = `data: ${JSON.stringify(chunk)}\n\n`;
controller.enqueue(encoder.encode(sseData));
}
controller.close();
      } catch (error: any) {
const errorData = `data: ${JSON.stringify({
type: "error",
data: error.message,
})}\n\n`;
controller.enqueue(encoder.encode(errorData));
controller.close();
}
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
},
});
}
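On the client, this route can be consumed with fetch and a stream reader, which keeps the Trainly API key on the server instead of exposing it in the browser. Below is a minimal sketch; consumeStreamRoute and onChunk are illustrative names, not part of the SDK.

// Reads the `data: {...}\n\n` events written by the route above and passes
// each parsed chunk to a caller-supplied callback.
export async function consumeStreamRoute(
  question: string,
  onChunk: (chunk: { type: string; data?: unknown }) => void,
) {
  const response = await fetch("/api/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ question }),
  });

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });

    // SSE events are separated by a blank line
    const events = buffer.split("\n\n");
    buffer = events.pop() ?? "";

    for (const event of events) {
      const line = event.trim();
      if (line.startsWith("data:")) {
        onChunk(JSON.parse(line.slice("data:".length).trim()));
      }
    }
  }
}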
React Hook for Streaming
// hooks/useTrainlyStream.ts
import { useState, useCallback, useMemo, useRef } from 'react';
import { TrainlyClient } from '@trainly/react';
interface UseStreamOptions {
onComplete?: (answer: string) => void;
onError?: (error: Error) => void;
}
export function useTrainlyStream(options?: UseStreamOptions) {
const [answer, setAnswer] = useState('');
const [loading, setLoading] = useState(false);
const [error, setError] = useState<Error | null>(null);
const [citations, setCitations] = useState<any[]>([]);
const abortControllerRef = useRef<AbortController | null>(null);
  // Memoize the client so it is not recreated on every render
  const trainly = useMemo(
    () =>
      new TrainlyClient({
        apiKey: process.env.NEXT_PUBLIC_TRAINLY_API_KEY!,
        chatId: process.env.NEXT_PUBLIC_TRAINLY_CHAT_ID!
      }),
    []
  );
const stream = useCallback(async (question: string) => {
setLoading(true);
setError(null);
setAnswer('');
setCitations([]);
abortControllerRef.current = new AbortController();
try {
const stream = await trainly.queryStream({
question,
signal: abortControllerRef.current.signal
});
let fullAnswer = '';
for await (const chunk of stream) {
if (chunk.type === 'content') {
fullAnswer += chunk.data;
setAnswer(fullAnswer);
} else if (chunk.type === 'context') {
setCitations(chunk.data);
} else if (chunk.type === 'end') {
options?.onComplete?.(fullAnswer);
}
}
} catch (err: any) {
if (err.name !== 'AbortError') {
setError(err);
options?.onError?.(err);
}
} finally {
setLoading(false);
abortControllerRef.current = null;
}
}, [trainly, options]);
const cancel = useCallback(() => {
abortControllerRef.current?.abort();
}, []);
return {
answer,
loading,
error,
citations,
stream,
cancel
};
}
// Usage in component
function ChatComponent() {
const { answer, loading, stream, cancel } = useTrainlyStream({
onComplete: (answer) => console.log('Stream complete:', answer.length),
onError: (error) => console.error('Stream error:', error)
});
return (
<div>
<button onClick={() => stream('What is AI?')}>
Ask Question
</button>
{loading && (
<button onClick={cancel}>Cancel</button>
)}
<div className="answer">
<ReactMarkdown>{answer}</ReactMarkdown>
</div>
</div>
);
}
Vue 3 Streaming Component
<template>
<div class="streaming-chat">
<div class="messages">
<div v-for="message in messages" :key="message.id" :class="message.role">
<div v-html="renderMarkdown(message.content)" />
<span v-if="message.streaming" class="cursor">▋</span>
</div>
</div>
<form @submit.prevent="handleSubmit">
<input
v-model="input"
placeholder="Ask a question..."
:disabled="streaming"
/>
<button v-if="streaming" @click="cancel" type="button">Cancel</button>
<button v-else type="submit" :disabled="!input.trim()">Send</button>
</form>
</div>
</template>
<script setup lang="ts">
import { ref, reactive } from "vue";
import { TrainlyClient } from "@trainly/react";
import { marked } from "marked";
const trainly = new TrainlyClient({
apiKey: import.meta.env.VITE_TRAINLY_API_KEY,
chatId: import.meta.env.VITE_TRAINLY_CHAT_ID,
});
const messages = ref([]);
const input = ref("");
const streaming = ref(false);
let abortController: AbortController | null = null;
async function handleSubmit() {
if (!input.value.trim() || streaming.value) return;
const question = input.value;
// Add user message
messages.value.push({
id: Date.now(),
role: "user",
content: question,
});
input.value = "";
streaming.value = true;
  // Add assistant message; use reactive() so mutations made while streaming
  // still trigger updates after the object is pushed into the list
  const assistantMessage = reactive({
    id: Date.now() + 1,
    role: "assistant",
    content: "",
    streaming: true,
  });
  messages.value.push(assistantMessage);
try {
abortController = new AbortController();
const stream = await trainly.queryStream({
question,
signal: abortController.signal,
});
for await (const chunk of stream) {
if (chunk.type === "content") {
assistantMessage.content += chunk.data;
} else if (chunk.type === "end") {
assistantMessage.streaming = false;
}
}
  } catch (error) {
    if (error.name !== "AbortError") {
      console.error("Streaming error:", error);
      assistantMessage.content = "An error occurred. Please try again.";
    }
    assistantMessage.streaming = false;
} finally {
streaming.value = false;
abortController = null;
}
}
function cancel() {
abortController?.abort();
}
function renderMarkdown(content: string) {
return marked(content);
}
</script>
Server-Side Streaming
Express SSE Endpoint
const express = require("express");
const { TrainlyClient } = require("@trainly/react");
const app = express();
app.use(express.json());
const trainly = new TrainlyClient({
apiKey: process.env.TRAINLY_API_KEY,
chatId: process.env.TRAINLY_CHAT_ID,
});
app.post("/api/stream", async (req, res) => {
const { question } = req.body;
// Set SSE headers
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
try {
const stream = await trainly.queryStream({ question });
for await (const chunk of stream) {
// Send SSE event
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
}
res.end();
} catch (error) {
res.write(
`data: ${JSON.stringify({
type: "error",
data: error.message,
})}\n\n`,
);
res.end();
}
});
// Log client disconnects (register this middleware BEFORE the routes above
// so it runs for streaming requests)
app.use((req, res, next) => {
  req.on("close", () => {
    console.log("Client disconnected");
  });
  next();
});

app.listen(3000, () => console.log("SSE endpoint listening on port 3000"));
FastAPI Streaming
import json
import os

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from trainly import AsyncTrainlyClient

app = FastAPI()

trainly = AsyncTrainlyClient(
    api_key=os.getenv("TRAINLY_API_KEY"),
    chat_id=os.getenv("TRAINLY_CHAT_ID")
)
@app.post("/api/stream")
async def stream_query(question: str):
async def generate():
try:
async for chunk in trainly.query_stream(question=question):
# Format as SSE
data = json.dumps({
"type": chunk.type,
"data": chunk.data
})
yield f"data: {data}\n\n"
except Exception as e:
error_data = json.dumps({
"type": "error",
"data": str(e)
})
yield f"data: {error_data}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive"
}
)
Real-Time Features
Live Collaboration
// Collaborative document Q&A
import { io, Socket } from "socket.io-client";
import { TrainlyClient } from "@trainly/react";
class CollaborativeChat {
private socket: Socket;
private trainly: TrainlyClient;
constructor(roomId: string) {
this.socket = io("https://your-server.com");
this.trainly = new TrainlyClient({
apiKey: process.env.TRAINLY_API_KEY!,
chatId: process.env.TRAINLY_CHAT_ID!,
});
this.socket.emit("join-room", roomId);
this.setupListeners();
}
setupListeners() {
this.socket.on("user-query", async (data) => {
console.log(`${data.userName} asked: ${data.question}`);
// Broadcast streaming response to all users
const stream = await this.trainly.queryStream({
question: data.question,
});
for await (const chunk of stream) {
this.socket.emit("stream-chunk", {
queryId: data.queryId,
chunk,
});
}
});
}
async askQuestion(question: string) {
const queryId = crypto.randomUUID();
// Broadcast question to room
this.socket.emit("user-query", {
queryId,
question,
userName: "Current User",
timestamp: Date.now(),
});
}
}
// Usage
const chat = new CollaborativeChat("room_123");
await chat.askQuestion("What is the methodology?");
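The receiving side is not shown above. A minimal sketch, assuming the server relays the "stream-chunk" events to the other clients in the room; the UI helper names are hypothetical:

// Runs on every other client in the room; assumes the server re-broadcasts
// the "stream-chunk" events emitted by CollaborativeChat above.
socket.on("stream-chunk", ({ queryId, chunk }) => {
  if (chunk.type === "content") {
    appendToSharedAnswer(queryId, chunk.data); // hypothetical UI helper
  } else if (chunk.type === "end") {
    markAnswerComplete(queryId); // hypothetical UI helper
  }
});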
Real-Time Updates
// Notify users when new documents are uploaded
import { EventEmitter } from "events";
class DocumentWatcher extends EventEmitter {
private trainly: TrainlyClient;
private knownFiles: Set<string>;
constructor(trainly: TrainlyClient) {
super();
this.trainly = trainly;
this.knownFiles = new Set();
this.startWatching();
}
async startWatching() {
// Check every 30 seconds
setInterval(async () => {
const files = await this.trainly.listFiles();
for (const file of files.files) {
if (!this.knownFiles.has(file.file_id)) {
this.knownFiles.add(file.file_id);
// Emit new document event
this.emit("new-document", {
fileId: file.file_id,
filename: file.filename,
size: file.size_bytes,
});
}
}
}, 30000);
}
}
// Usage
const watcher = new DocumentWatcher(trainly);
watcher.on("new-document", (doc) => {
console.log("New document uploaded:", doc.filename);
// Notify all connected clients via WebSocket
io.emit("document-uploaded", doc);
});
Performance Optimization
Chunked Processing
// Process stream in chunks for better performance
async function processStreamInChunks(question: string) {
const stream = await trainly.queryStream({ question });
let buffer = "";
const chunkSize = 50; // characters
for await (const chunk of stream) {
if (chunk.type === "content") {
buffer += chunk.data;
// Process complete chunks
while (buffer.length >= chunkSize) {
const processChunk = buffer.slice(0, chunkSize);
buffer = buffer.slice(chunkSize);
// Render chunk (e.g., update DOM)
renderChunk(processChunk);
// Small delay for smooth rendering
await new Promise((resolve) => setTimeout(resolve, 10));
}
} else if (chunk.type === "end") {
// Process remaining buffer
if (buffer) {
renderChunk(buffer);
}
}
}
}
function renderChunk(text: string) {
const element = document.getElementById("answer");
if (element) {
element.textContent += text;
}
}
Backpressure Handling
import asyncio
from collections import deque
class StreamBuffer:
"""Buffer to handle backpressure in streams"""
def __init__(self, max_size: int = 1000):
self.buffer = deque(maxlen=max_size)
self.lock = asyncio.Lock()
    async def push(self, chunk: dict):
        """Add chunk to buffer, waiting while it is full (backpressure)"""
        while True:
            async with self.lock:
                if len(self.buffer) < self.buffer.maxlen:
                    self.buffer.append(chunk)
                    return
            # Buffer full - release the lock and let the consumer drain it
            await asyncio.sleep(0.1)
async def pop(self) -> dict:
"""Get chunk from buffer"""
async with self.lock:
if not self.buffer:
return None
return self.buffer.popleft()
async def buffered_stream(question: str):
buffer = StreamBuffer(max_size=100)
# Producer task
async def produce():
async for chunk in trainly.query_stream(question=question):
await buffer.push(chunk)
await buffer.push({"type": "end"})
# Consumer task
async def consume():
while True:
chunk = await buffer.pop()
if not chunk:
await asyncio.sleep(0.01)
continue
if chunk["type"] == "content":
print(chunk["data"], end="", flush=True)
elif chunk["type"] == "end":
break
# Run producer and consumer concurrently
await asyncio.gather(produce(), consume())
Error Handling
Robust Stream Handling
async function robustStream(
question: string,
maxRetries: number = 3,
): Promise<string> {
let attempt = 0;
while (attempt < maxRetries) {
try {
const stream = await trainly.queryStream({ question });
let fullResponse = "";
for await (const chunk of stream) {
if (chunk.type === "content") {
fullResponse += chunk.data;
process.stdout.write(chunk.data);
} else if (chunk.type === "end") {
console.log("\n\nSuccess!");
return fullResponse;
} else if (chunk.type === "error") {
throw new Error(chunk.data);
}
}
return fullResponse;
} catch (error: any) {
attempt++;
if (error.name === "AbortError") {
console.log("Stream cancelled");
throw error;
}
if (attempt < maxRetries) {
const delay = Math.pow(2, attempt) * 1000;
console.log(
`\nRetrying in ${delay}ms... (attempt ${attempt}/${maxRetries})`,
);
await new Promise((resolve) => setTimeout(resolve, delay));
} else {
console.error("All retry attempts failed");
throw error;
}
}
}
throw new Error("Stream failed after all retries");
}
Graceful Degradation
async function queryWithFallback(question: string): Promise<string> {
try {
// Try streaming first
console.log("Attempting streaming...");
const stream = await trainly.queryStream({ question });
let response = "";
for await (const chunk of stream) {
if (chunk.type === "content") {
response += chunk.data;
}
}
return response;
} catch (streamError) {
console.log("Streaming failed, falling back to standard query...");
try {
// Fallback to standard query
const response = await trainly.query({ question });
return response.answer;
} catch (queryError) {
console.error("Both streaming and standard query failed");
throw queryError;
}
}
}
Best Practices
Handle Disconnections
Always handle network disconnections gracefully:
let reconnectAttempts = 0;
const maxReconnects = 3;
async function streamWithReconnect(question: string) {
try {
await streamQuery(question);
reconnectAttempts = 0; // Reset on success
} catch (error) {
if (reconnectAttempts < maxReconnects) {
reconnectAttempts++;
console.log(`Reconnecting... (${reconnectAttempts}/${maxReconnects})`);
await new Promise(r => setTimeout(r, 1000 * reconnectAttempts));
return streamWithReconnect(question);
}
throw error;
}
}
Provide Feedback
Show users that streaming is active:
{streaming && (
<div className="flex items-center gap-2 text-blue-500">
<Spinner />
<span>Generating response...</span>
</div>
)}
Enable Cancellation
Always allow users to cancel long streams:
const abortController = new AbortController();
// Cancel button
<button onClick={() => abortController.abort()}>
Cancel
</button>
Buffer Management
Prevent memory leaks with bounded buffers:
const MAX_BUFFER_SIZE = 10000; // characters
let buffer = '';
for await (const chunk of stream) {
  if (chunk.type !== 'content') continue;
  buffer += chunk.data;
if (buffer.length > MAX_BUFFER_SIZE) {
// Flush or truncate
processBuffer(buffer);
buffer = '';
}
}
Testing Streaming
Unit Tests
import { describe, it, expect, vi } from "vitest";
import { TrainlyClient } from "@trainly/react";
describe("Streaming", () => {
it("should stream response chunks", async () => {
const trainly = new TrainlyClient({
apiKey: "tk_test",
chatId: "chat_test",
});
// Mock streaming response
const mockChunks = [
{ type: "context", data: [] },
{ type: "content", data: "Hello" },
{ type: "content", data: " world" },
{ type: "end" },
];
global.fetch = vi.fn().mockResolvedValue({
ok: true,
body: {
getReader: () => ({
read: async () => {
const chunk = mockChunks.shift();
if (!chunk) return { done: true };
return {
done: false,
value: new TextEncoder().encode(
`data: ${JSON.stringify(chunk)}\n\n`,
),
};
},
}),
},
});
const stream = await trainly.queryStream({
question: "Test",
});
const chunks = [];
for await (const chunk of stream) {
chunks.push(chunk);
}
expect(chunks).toHaveLength(4);
expect(chunks[0].type).toBe("context");
expect(chunks[1].type).toBe("content");
expect(chunks[3].type).toBe("end");
});
});
Performance Metrics
Target Streaming Metrics
| Metric | Target | Notes |
|---|---|---|
| Time to First Byte | < 500ms | First chunk arrives quickly |
| Chunk Frequency | 10-20 chunks/sec | Smooth streaming |
| Total Stream Time | < 10s | Complete response time |
| Error Rate | < 0.1% | Very rare failures |
| Cancellation Success | 100% | Always cancellable |
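To spot-check a deployment against these targets, you can time the stream on the client. The following is a rough sketch, assuming the trainly.queryStream client configured earlier in this guide; the metric names mirror the table above.

// Measure time to first token, total stream time, and chunk rate.
async function measureStream(question: string) {
  const start = performance.now();
  let firstTokenMs: number | null = null;
  let contentChunks = 0;

  const stream = await trainly.queryStream({ question });
  for await (const chunk of stream) {
    if (chunk.type === "content") {
      if (firstTokenMs === null) firstTokenMs = performance.now() - start;
      contentChunks += 1;
    } else if (chunk.type === "end") {
      const totalMs = performance.now() - start;
      console.log(`Time to first token: ${firstTokenMs?.toFixed(0)} ms`);
      console.log(`Total stream time: ${totalMs.toFixed(0)} ms`);
      console.log(
        `Chunk rate: ${(contentChunks / (totalMs / 1000)).toFixed(1)} chunks/sec`,
      );
    }
  }
}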