Overview

Trainly supports Server-Sent Events (SSE) for streaming AI responses in real time. Instead of waiting for the complete answer, streaming sends response chunks as they are generated, which improves the experience for longer responses and can reduce perceived latency by up to 80%.

Why Use Streaming?

Better UX

Users see responses immediately, not after waiting 5+ seconds

Perceived Performance

First token arrives in ~500ms vs 5s+ for full response

Progressive Rendering

Render markdown as it arrives for smooth experience

Cancellable

Users can cancel long-running queries

Basic Streaming

JavaScript/TypeScript

import { TrainlyClient } from "@trainly/react";

const trainly = new TrainlyClient({
  apiKey: process.env.TRAINLY_API_KEY!,
  chatId: process.env.TRAINLY_CHAT_ID!,
});

async function streamQuery(question: string) {
  const stream = await trainly.queryStream({ question });

  for await (const chunk of stream) {
    if (chunk.type === "content") {
      // Append text chunk
      process.stdout.write(chunk.data);
    } else if (chunk.type === "context") {
      // Citations available
      console.log(`\nUsing ${chunk.data.length} sources`);
    } else if (chunk.type === "end") {
      console.log("\n\nComplete!");
    } else if (chunk.type === "error") {
      console.error("Error:", chunk.data);
    }
  }
}

// Usage
await streamQuery("Explain the methodology in detail");

Python

import os

from trainly import TrainlyClient

trainly = TrainlyClient(
    api_key=os.getenv("TRAINLY_API_KEY"),
    chat_id=os.getenv("TRAINLY_CHAT_ID")
)

def stream_query(question: str):
    for chunk in trainly.query_stream(question=question):
        if chunk.type == "content":
            print(chunk.data, end="", flush=True)
        elif chunk.type == "context":
            print(f"\nUsing {len(chunk.data)} sources")
        elif chunk.type == "end":
            print("\n\nComplete!")
        elif chunk.type == "error":
            print(f"\nError: {chunk.data}")

# Usage
stream_query("Explain the methodology in detail")

Raw REST API

curl -X POST https://api.trainlyai.com/v1/chat_abc123/answer_question_stream \
  -H "Authorization: Bearer tk_your_api_key" \
  -H "Content-Type: application/json" \
  -H "Accept: text/event-stream" \
  -d '{"question": "What are the findings?"}' \
  --no-buffer

Response (SSE Format):
data: {"type": "context", "data": [...]}

data: {"type": "content", "data": "Based"}

data: {"type": "content", "data": " on"}

data: {"type": "content", "data": " the"}

data: {"type": "end"}

React Integration

Streaming Chat Component

"use client";

import { useState, useRef, useEffect } from "react";
import { TrainlyClient } from "@trainly/react";
import ReactMarkdown from "react-markdown";

interface Message {
  role: "user" | "assistant";
  content: string;
  streaming?: boolean;
}

export function StreamingChat() {
  const [messages, setMessages] = useState<Message[]>([]);
  const [input, setInput] = useState("");
  const [streaming, setStreaming] = useState(false);
  const abortControllerRef = useRef<AbortController | null>(null);

  const trainly = new TrainlyClient({
    apiKey: process.env.NEXT_PUBLIC_TRAINLY_API_KEY!,
    chatId: process.env.NEXT_PUBLIC_TRAINLY_CHAT_ID!,
  });

  async function handleSubmit(e: React.FormEvent) {
    e.preventDefault();

    if (!input.trim() || streaming) return;

    // Add user message
    const userMessage: Message = {
      role: "user",
      content: input,
    };

    setMessages((prev) => [...prev, userMessage]);
    setInput("");
    setStreaming(true);

    // Add placeholder for assistant message
    setMessages((prev) => [
      ...prev,
      {
        role: "assistant",
        content: "",
        streaming: true,
      },
    ]);

    try {
      // Create abort controller for cancellation
      abortControllerRef.current = new AbortController();

      const stream = await trainly.queryStream({
        question: input,
        signal: abortControllerRef.current.signal,
      });

      let fullResponse = "";

      for await (const chunk of stream) {
        if (chunk.type === "content") {
          fullResponse += chunk.data;

          // Update the last message (assistant)
          setMessages((prev) => {
            const updated = [...prev];
            updated[updated.length - 1] = {
              role: "assistant",
              content: fullResponse,
              streaming: true,
            };
            return updated;
          });
        } else if (chunk.type === "end") {
          // Mark streaming complete
          setMessages((prev) => {
            const updated = [...prev];
            updated[updated.length - 1] = {
              role: "assistant",
              content: fullResponse,
              streaming: false,
            };
            return updated;
          });

          setStreaming(false);
        }
      }
    } catch (error: any) {
      if (error.name === "AbortError") {
        console.log("Stream cancelled by user");
      } else {
        console.error("Streaming error:", error);

        // Show error message
        setMessages((prev) => {
          const updated = [...prev];
          updated[updated.length - 1] = {
            role: "assistant",
            content: "Sorry, an error occurred. Please try again.",
            streaming: false,
          };
          return updated;
        });
      }

      setStreaming(false);
    } finally {
      abortControllerRef.current = null;
    }
  }

  function handleCancel() {
    if (abortControllerRef.current) {
      abortControllerRef.current.abort();
    }
  }

  return (
    <div className="flex flex-col h-screen max-w-4xl mx-auto p-4">
      <div className="flex-1 overflow-y-auto space-y-4 mb-4">
        {messages.map((message, i) => (
          <div
            key={i}
            className={`flex ${message.role === "user" ? "justify-end" : "justify-start"}`}
          >
            <div
              className={`max-w-[80%] rounded-lg p-4 ${
                message.role === "user"
                  ? "bg-blue-500 text-white"
                  : "bg-gray-100 text-gray-900"
              }`}
            >
              <ReactMarkdown>{message.content}</ReactMarkdown>
              {message.streaming && (
                <span className="inline-block w-2 h-4 bg-current animate-pulse ml-1" />
              )}
            </div>
          </div>
        ))}
      </div>

      <form onSubmit={handleSubmit} className="flex gap-2">
        <input
          type="text"
          value={input}
          onChange={(e) => setInput(e.target.value)}
          placeholder="Ask a question..."
          disabled={streaming}
          className="flex-1 px-4 py-2 border rounded-lg"
        />

        {streaming ? (
          <button
            type="button"
            onClick={handleCancel}
            className="px-6 py-2 bg-red-500 text-white rounded-lg"
          >
            Cancel
          </button>
        ) : (
          <button
            type="submit"
            disabled={!input.trim()}
            className="px-6 py-2 bg-blue-500 text-white rounded-lg disabled:opacity-50"
          >
            Send
          </button>
        )}
      </form>
    </div>
  );
}

Advanced Streaming Patterns

Typed Chunks with TypeScript

type StreamChunk =
  | {
      type: "context";
      data: Array<{ chunk_id: string; chunk_text: string; score: number }>;
    }
  | { type: "content"; data: string }
  | { type: "end" }
  | { type: "error"; data: string };

async function typedStream(question: string): Promise<string> {
  const stream = await trainly.queryStream({ question });

  let fullResponse = "";
  let citationCount = 0;

  for await (const chunk of stream) {
    switch (chunk.type) {
      case "context":
        citationCount = chunk.data.length;
        console.log(`Using ${citationCount} citations`);
        break;

      case "content":
        fullResponse += chunk.data;
        process.stdout.write(chunk.data);
        break;

      case "end":
        console.log("\n\nStream complete!");
        return fullResponse;

      case "error":
        throw new Error(chunk.data);
    }
  }

  return fullResponse;
}

Stream with Progress

interface StreamProgress {
  chunksReceived: number;
  charactersReceived: number;
  estimatedProgress: number;
  citationCount: number;
}

async function streamWithProgress(
  question: string,
  onProgress: (progress: StreamProgress) => void,
) {
  const stream = await trainly.queryStream({ question });

  const progress: StreamProgress = {
    chunksReceived: 0,
    charactersReceived: 0,
    estimatedProgress: 0,
    citationCount: 0,
  };

  for await (const chunk of stream) {
    if (chunk.type === "context") {
      progress.citationCount = chunk.data.length;
    } else if (chunk.type === "content") {
      progress.chunksReceived += 1;
      progress.charactersReceived += chunk.data.length;

      // Estimate progress (typical response is ~500 chars)
      progress.estimatedProgress = Math.min(
        (progress.charactersReceived / 500) * 100,
        99,
      );

      onProgress({ ...progress });

      process.stdout.write(chunk.data);
    } else if (chunk.type === "end") {
      progress.estimatedProgress = 100;
      onProgress({ ...progress });
    }
  }
}

// Usage
await streamWithProgress("Explain the methodology", (progress) => {
  console.log(`Progress: ${progress.estimatedProgress.toFixed(0)}%`);
  console.log(`Characters: ${progress.charactersReceived}`);
});

Cancellable Streams

async function cancellableStream(
  question: string,
  onCancel: () => void,
): Promise<{ cancel: () => void }> {
  const abortController = new AbortController();

  // Start streaming
  const streamPromise = (async () => {
    try {
      const stream = await trainly.queryStream({
        question,
        signal: abortController.signal,
      });

      for await (const chunk of stream) {
        if (chunk.type === "content") {
          console.log(chunk.data);
        } else if (chunk.type === "end") {
          console.log("Complete!");
        }
      }
    } catch (error: any) {
      if (error.name === "AbortError") {
        console.log("Stream cancelled");
        onCancel();
      } else {
        throw error;
      }
    }
  })();

  // The stream runs in the background; it is intentionally not awaited here
  void streamPromise;

  // Return cancel function
  return {
    cancel: () => abortController.abort(),
  };
}

// Usage
const stream = await cancellableStream("Long question...", () => {
  console.log("User cancelled the stream");
});

// Cancel after 5 seconds
setTimeout(() => stream.cancel(), 5000);

Python Async Streaming

AsyncIO Streaming

import asyncio
import os

from trainly import AsyncTrainlyClient

async def stream_async(question: str):
    trainly = AsyncTrainlyClient(
        api_key=os.getenv("TRAINLY_API_KEY"),
        chat_id=os.getenv("TRAINLY_CHAT_ID")
    )

    async for chunk in trainly.query_stream(question=question):
        if chunk.type == "content":
            print(chunk.data, end="", flush=True)
        elif chunk.type == "end":
            print("\n\nComplete!")

# Run
asyncio.run(stream_async("What are the findings?"))

Concurrent Streams

import asyncio
import os

from trainly import AsyncTrainlyClient

async def process_multiple_streams(questions: list):
    trainly = AsyncTrainlyClient(
        api_key=os.getenv("TRAINLY_API_KEY"),
        chat_id=os.getenv("TRAINLY_CHAT_ID")
    )

    async def process_stream(question: str, index: int):
        print(f"\n[Stream {index}] Starting: {question}")
        answer = ""

        async for chunk in trainly.query_stream(question=question):
            if chunk.type == "content":
                answer += chunk.data
            elif chunk.type == "end":
                print(f"\n[Stream {index}] Complete: {len(answer)} chars")

        return answer

    # Run all streams concurrently
    tasks = [
        process_stream(q, i)
        for i, q in enumerate(questions, 1)
    ]

    answers = await asyncio.gather(*tasks)
    return answers

# Usage
questions = [
    "What is the introduction?",
    "What is the methodology?",
    "What are the conclusions?"
]

answers = asyncio.run(process_multiple_streams(questions))

Framework Integration

Next.js Streaming Route

// app/api/stream/route.ts
import { TrainlyClient } from "@trainly/react";

const trainly = new TrainlyClient({
  apiKey: process.env.TRAINLY_API_KEY!,
  chatId: process.env.TRAINLY_CHAT_ID!,
});

export async function POST(request: Request) {
  const { question } = await request.json();

  // Create a TransformStream for SSE
  const encoder = new TextEncoder();

  const stream = new ReadableStream({
    async start(controller) {
      try {
        const trainlyStream = await trainly.queryStream({ question });

        for await (const chunk of trainlyStream) {
          // Send as SSE
          const sseData = `data: ${JSON.stringify(chunk)}\n\n`;
          controller.enqueue(encoder.encode(sseData));
        }

        controller.close();
      } catch (error: any) {
        const errorData = `data: ${JSON.stringify({
          type: "error",
          data: error.message,
        })}\n\n`;
        controller.enqueue(encoder.encode(errorData));
        controller.close();
      }
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}
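
A browser client can then call this route and read the SSE events directly, which keeps the Trainly API key on the server. A minimal sketch follows; the parsing mirrors the raw SSE example earlier on this page, and the onToken callback is illustrative.

// Sketch: client-side consumer for the /api/stream route above.
// The browser only sees SSE events; the Trainly API key stays on the server.
async function askViaRoute(question: string, onToken: (text: string) => void) {
  const response = await fetch("/api/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ question }),
  });

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const events = buffer.split("\n\n");
    buffer = events.pop() ?? "";

    for (const event of events) {
      if (!event.startsWith("data: ")) continue;
      const chunk = JSON.parse(event.slice(6));
      if (chunk.type === "content") onToken(chunk.data);
    }
  }
}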

React Hook for Streaming

// hooks/useTrainlyStream.ts
import { useState, useCallback, useRef, useMemo } from 'react';
import { TrainlyClient } from '@trainly/react';

interface UseStreamOptions {
  onComplete?: (answer: string) => void;
  onError?: (error: Error) => void;
}

export function useTrainlyStream(options?: UseStreamOptions) {
  const [answer, setAnswer] = useState('');
  const [loading, setLoading] = useState(false);
  const [error, setError] = useState<Error | null>(null);
  const [citations, setCitations] = useState<any[]>([]);
  const abortControllerRef = useRef<AbortController | null>(null);

  // Memoize the client so the stream callback is not recreated on every render
  const trainly = useMemo(
    () =>
      new TrainlyClient({
        apiKey: process.env.NEXT_PUBLIC_TRAINLY_API_KEY!,
        chatId: process.env.NEXT_PUBLIC_TRAINLY_CHAT_ID!
      }),
    []
  );

  const stream = useCallback(async (question: string) => {
    setLoading(true);
    setError(null);
    setAnswer('');
    setCitations([]);

    abortControllerRef.current = new AbortController();

    try {
      const stream = await trainly.queryStream({
        question,
        signal: abortControllerRef.current.signal
      });

      let fullAnswer = '';

      for await (const chunk of stream) {
        if (chunk.type === 'content') {
          fullAnswer += chunk.data;
          setAnswer(fullAnswer);
        } else if (chunk.type === 'context') {
          setCitations(chunk.data);
        } else if (chunk.type === 'end') {
          options?.onComplete?.(fullAnswer);
        }
      }

    } catch (err: any) {
      if (err.name !== 'AbortError') {
        setError(err);
        options?.onError?.(err);
      }
    } finally {
      setLoading(false);
      abortControllerRef.current = null;
    }
  }, [trainly, options]);

  const cancel = useCallback(() => {
    abortControllerRef.current?.abort();
  }, []);

  return {
    answer,
    loading,
    error,
    citations,
    stream,
    cancel
  };
}

// Usage in component
function ChatComponent() {
  const { answer, loading, stream, cancel } = useTrainlyStream({
    onComplete: (answer) => console.log('Stream complete:', answer.length),
    onError: (error) => console.error('Stream error:', error)
  });

  return (
    <div>
      <button onClick={() => stream('What is AI?')}>
        Ask Question
      </button>

      {loading && (
        <button onClick={cancel}>Cancel</button>
      )}

      <div className="answer">
        <ReactMarkdown>{answer}</ReactMarkdown>
      </div>
    </div>
  );
}

Vue 3 Streaming Component

<template>
  <div class="streaming-chat">
    <div class="messages">
      <div v-for="message in messages" :key="message.id" :class="message.role">
        <div v-html="renderMarkdown(message.content)" />
        <span v-if="message.streaming" class="cursor"></span>
      </div>
    </div>

    <form @submit.prevent="handleSubmit">
      <input
        v-model="input"
        placeholder="Ask a question..."
        :disabled="streaming"
      />

      <button v-if="streaming" @click="cancel" type="button">Cancel</button>
      <button v-else type="submit" :disabled="!input.trim()">Send</button>
    </form>
  </div>
</template>

<script setup lang="ts">
import { ref } from "vue";
import { TrainlyClient } from "@trainly/react";
import { marked } from "marked";

const trainly = new TrainlyClient({
  apiKey: import.meta.env.VITE_TRAINLY_API_KEY,
  chatId: import.meta.env.VITE_TRAINLY_CHAT_ID,
});

const messages = ref<any[]>([]);
const input = ref("");
const streaming = ref(false);
let abortController: AbortController | null = null;

async function handleSubmit() {
  if (!input.value.trim() || streaming.value) return;

  const question = input.value;

  // Add user message
  messages.value.push({
    id: Date.now(),
    role: "user",
    content: question,
  });

  input.value = "";
  streaming.value = true;

  // Add assistant message
  const assistantMessage = {
    id: Date.now() + 1,
    role: "assistant",
    content: "",
    streaming: true,
  };
  messages.value.push(assistantMessage);

  try {
    abortController = new AbortController();

    const stream = await trainly.queryStream({
      question,
      signal: abortController.signal,
    });

    for await (const chunk of stream) {
      if (chunk.type === "content") {
        assistantMessage.content += chunk.data;
      } else if (chunk.type === "end") {
        assistantMessage.streaming = false;
      }
    }
  } catch (error) {
    if (error.name !== "AbortError") {
      console.error("Streaming error:", error);
      assistantMessage.content = "Error occurred. Please try again.";
    }
  } finally {
    streaming.value = false;
    abortController = null;
  }
}

function cancel() {
  abortController?.abort();
}

function renderMarkdown(content: string) {
  return marked(content);
}
</script>

Server-Side Streaming

Express SSE Endpoint

const express = require("express");
const { TrainlyClient } = require("@trainly/react");

const app = express();
const trainly = new TrainlyClient({
  apiKey: process.env.TRAINLY_API_KEY,
  chatId: process.env.TRAINLY_CHAT_ID,
});

app.post("/api/stream", async (req, res) => {
  const { question } = req.body;

  // Set SSE headers
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  try {
    const stream = await trainly.queryStream({ question });

    for await (const chunk of stream) {
      // Send SSE event
      res.write(`data: ${JSON.stringify(chunk)}\n\n`);
    }

    res.end();
  } catch (error) {
    res.write(
      `data: ${JSON.stringify({
        type: "error",
        data: error.message,
      })}\n\n`,
    );
    res.end();
  }
});

// Handle client disconnect (register this before your routes so it runs for them)
app.use((req, res, next) => {
  req.on("close", () => {
    console.log("Client disconnected");
  });
  next();
});
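
To stop generation once the browser goes away, you can also abort the upstream stream from inside the handler. The sketch below reuses the AbortSignal pattern from the earlier examples (it assumes queryStream accepts a signal, as shown above); the route name is illustrative.

// Sketch: abort the upstream Trainly stream when the client disconnects.
// Assumes queryStream accepts an AbortSignal, as in the earlier examples.
app.post("/api/stream-cancellable", async (req, res) => {
  const { question } = req.body;

  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");

  const abortController = new AbortController();
  req.on("close", () => abortController.abort());

  try {
    const stream = await trainly.queryStream({
      question,
      signal: abortController.signal,
    });

    for await (const chunk of stream) {
      res.write(`data: ${JSON.stringify(chunk)}\n\n`);
    }
  } catch (error) {
    // An abort error here is expected when the client has disconnected
  } finally {
    res.end();
  }
});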

FastAPI Streaming

import json
import os

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from trainly import AsyncTrainlyClient

app = FastAPI()

# Async client so the endpoint can use `async for` over the stream
trainly = AsyncTrainlyClient(
    api_key=os.getenv("TRAINLY_API_KEY"),
    chat_id=os.getenv("TRAINLY_CHAT_ID")
)

@app.post("/api/stream")
async def stream_query(question: str):
    async def generate():
        try:
            async for chunk in trainly.query_stream(question=question):
                # Format as SSE
                data = json.dumps({
                    "type": chunk.type,
                    "data": chunk.data
                })
                yield f"data: {data}\n\n"

        except Exception as e:
            error_data = json.dumps({
                "type": "error",
                "data": str(e)
            })
            yield f"data: {error_data}\n\n"

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
    )

Real-Time Features

Live Collaboration

// Collaborative document Q&A
import { io, Socket } from "socket.io-client";
import { TrainlyClient } from "@trainly/react";

class CollaborativeChat {
  private socket: Socket;
  private trainly: TrainlyClient;

  constructor(roomId: string) {
    this.socket = io("https://your-server.com");
    this.trainly = new TrainlyClient({
      apiKey: process.env.TRAINLY_API_KEY!,
      chatId: process.env.TRAINLY_CHAT_ID!,
    });

    this.socket.emit("join-room", roomId);
    this.setupListeners();
  }

  setupListeners() {
    this.socket.on("user-query", async (data) => {
      console.log(`${data.userName} asked: ${data.question}`);

      // Broadcast streaming response to all users
      const stream = await this.trainly.queryStream({
        question: data.question,
      });

      for await (const chunk of stream) {
        this.socket.emit("stream-chunk", {
          queryId: data.queryId,
          chunk,
        });
      }
    });
  }

  async askQuestion(question: string) {
    const queryId = crypto.randomUUID();

    // Broadcast question to room
    this.socket.emit("user-query", {
      queryId,
      question,
      userName: "Current User",
      timestamp: Date.now(),
    });
  }
}

// Usage
const chat = new CollaborativeChat("room_123");
await chat.askQuestion("What is the methodology?");

Real-Time Updates

// Notify users when new documents are uploaded
import { EventEmitter } from "events";

class DocumentWatcher extends EventEmitter {
  private trainly: TrainlyClient;
  private knownFiles: Set<string>;

  constructor(trainly: TrainlyClient) {
    super();
    this.trainly = trainly;
    this.knownFiles = new Set();

    this.startWatching();
  }

  async startWatching() {
    // Check every 30 seconds
    setInterval(async () => {
      const files = await this.trainly.listFiles();

      for (const file of files.files) {
        if (!this.knownFiles.has(file.file_id)) {
          this.knownFiles.add(file.file_id);

          // Emit new document event
          this.emit("new-document", {
            fileId: file.file_id,
            filename: file.filename,
            size: file.size_bytes,
          });
        }
      }
    }, 30000);
  }
}

// Usage
const watcher = new DocumentWatcher(trainly);

watcher.on("new-document", (doc) => {
  console.log("New document uploaded:", doc.filename);

  // Notify all connected clients via WebSocket
  io.emit("document-uploaded", doc);
});

Performance Optimization

Chunked Processing

// Process stream in chunks for better performance
async function processStreamInChunks(question: string) {
  const stream = await trainly.queryStream({ question });

  let buffer = "";
  const chunkSize = 50; // characters

  for await (const chunk of stream) {
    if (chunk.type === "content") {
      buffer += chunk.data;

      // Process complete chunks
      while (buffer.length >= chunkSize) {
        const processChunk = buffer.slice(0, chunkSize);
        buffer = buffer.slice(chunkSize);

        // Render chunk (e.g., update DOM)
        renderChunk(processChunk);

        // Small delay for smooth rendering
        await new Promise((resolve) => setTimeout(resolve, 10));
      }
    } else if (chunk.type === "end") {
      // Process remaining buffer
      if (buffer) {
        renderChunk(buffer);
      }
    }
  }
}

function renderChunk(text: string) {
  const element = document.getElementById("answer");
  if (element) {
    element.textContent += text;
  }
}

Backpressure Handling

import asyncio
from collections import deque

class StreamBuffer:
    """Buffer to handle backpressure in streams"""

    def __init__(self, max_size: int = 1000):
        self.buffer = deque(maxlen=max_size)
        self.lock = asyncio.Lock()

    async def push(self, chunk: dict):
        """Add chunk to buffer, waiting while it is full (backpressure)"""
        while True:
            async with self.lock:
                if len(self.buffer) < self.buffer.maxlen:
                    self.buffer.append(chunk)
                    return
            # Buffer full - release the lock and wait for the consumer to drain it
            await asyncio.sleep(0.1)

    async def pop(self) -> dict | None:
        """Get the next chunk, or None if the buffer is empty"""
        async with self.lock:
            if not self.buffer:
                return None
            return self.buffer.popleft()

async def buffered_stream(question: str):
    buffer = StreamBuffer(max_size=100)

    # Producer task
    async def produce():
        async for chunk in trainly.query_stream(question=question):
            await buffer.push(chunk)
        await buffer.push({"type": "end"})

    # Consumer task
    async def consume():
        while True:
            chunk = await buffer.pop()
            if not chunk:
                await asyncio.sleep(0.01)
                continue

            if chunk["type"] == "content":
                print(chunk["data"], end="", flush=True)
            elif chunk["type"] == "end":
                break

    # Run producer and consumer concurrently
    await asyncio.gather(produce(), consume())

Error Handling

Robust Stream Handling

async function robustStream(
  question: string,
  maxRetries: number = 3,
): Promise<string> {
  let attempt = 0;

  while (attempt < maxRetries) {
    try {
      const stream = await trainly.queryStream({ question });
      let fullResponse = "";

      for await (const chunk of stream) {
        if (chunk.type === "content") {
          fullResponse += chunk.data;
          process.stdout.write(chunk.data);
        } else if (chunk.type === "end") {
          console.log("\n\nSuccess!");
          return fullResponse;
        } else if (chunk.type === "error") {
          throw new Error(chunk.data);
        }
      }

      return fullResponse;
    } catch (error: any) {
      attempt++;

      if (error.name === "AbortError") {
        console.log("Stream cancelled");
        throw error;
      }

      if (attempt < maxRetries) {
        const delay = Math.pow(2, attempt) * 1000;
        console.log(
          `\nRetrying in ${delay}ms... (attempt ${attempt}/${maxRetries})`,
        );
        await new Promise((resolve) => setTimeout(resolve, delay));
      } else {
        console.error("All retry attempts failed");
        throw error;
      }
    }
  }

  throw new Error("Stream failed after all retries");
}

Graceful Degradation

async function queryWithFallback(question: string): Promise<string> {
  try {
    // Try streaming first
    console.log("Attempting streaming...");
    const stream = await trainly.queryStream({ question });

    let response = "";
    for await (const chunk of stream) {
      if (chunk.type === "content") {
        response += chunk.data;
      }
    }

    return response;
  } catch (streamError) {
    console.log("Streaming failed, falling back to standard query...");

    try {
      // Fallback to standard query
      const response = await trainly.query({ question });
      return response.answer;
    } catch (queryError) {
      console.error("Both streaming and standard query failed");
      throw queryError;
    }
  }
}

Best Practices

Always handle network disconnections gracefully:
let reconnectAttempts = 0;
const maxReconnects = 3;

async function streamWithReconnect(question: string) {
  try {
    await streamQuery(question);
    reconnectAttempts = 0; // Reset on success
  } catch (error) {
    if (reconnectAttempts < maxReconnects) {
      reconnectAttempts++;
      console.log(`Reconnecting... (${reconnectAttempts}/${maxReconnects})`);
      await new Promise(r => setTimeout(r, 1000 * reconnectAttempts));
      return streamWithReconnect(question);
    }
    throw error;
  }
}

Show users that streaming is active:
{streaming && (
  <div className="flex items-center gap-2 text-blue-500">
    <Spinner />
    <span>Generating response...</span>
  </div>
)}

Always allow users to cancel long streams:
const abortController = new AbortController();

// Cancel button
<button onClick={() => abortController.abort()}>
  Cancel
</button>

Prevent memory leaks with bounded buffers:
const MAX_BUFFER_SIZE = 10000; // characters
let buffer = '';

for await (const chunk of stream) {
  buffer += chunk.data;

  if (buffer.length > MAX_BUFFER_SIZE) {
    // Flush or truncate
    processBuffer(buffer);
    buffer = '';
  }
}

Testing Streaming

Unit Tests

import { describe, it, expect, vi } from "vitest";
import { TrainlyClient } from "@trainly/react";

describe("Streaming", () => {
  it("should stream response chunks", async () => {
    const trainly = new TrainlyClient({
      apiKey: "tk_test",
      chatId: "chat_test",
    });

    // Mock streaming response
    const mockChunks = [
      { type: "context", data: [] },
      { type: "content", data: "Hello" },
      { type: "content", data: " world" },
      { type: "end" },
    ];

    global.fetch = vi.fn().mockResolvedValue({
      ok: true,
      body: {
        getReader: () => ({
          read: async () => {
            const chunk = mockChunks.shift();
            if (!chunk) return { done: true };

            return {
              done: false,
              value: new TextEncoder().encode(
                `data: ${JSON.stringify(chunk)}\n\n`,
              ),
            };
          },
        }),
      },
    });

    const stream = await trainly.queryStream({
      question: "Test",
    });

    const chunks = [];
    for await (const chunk of stream) {
      chunks.push(chunk);
    }

    expect(chunks).toHaveLength(4);
    expect(chunks[0].type).toBe("context");
    expect(chunks[1].type).toBe("content");
    expect(chunks[3].type).toBe("end");
  });
});

Performance Metrics

Target Streaming Metrics

Metric                  Target              Notes
Time to First Byte      < 500ms             First chunk arrives quickly
Chunk Frequency         10-20 chunks/sec    Smooth streaming
Total Stream Time       < 10s               Complete response time
Error Rate              < 0.1%              Very rare failures
Cancellation Success    100%                Always cancellable
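
To see how a deployment tracks against these targets, you can instrument the stream loop itself. A minimal sketch reusing queryStream from the examples above; the logged labels are illustrative.

// Sketch: measure time-to-first-byte, total stream time, and chunk frequency.
async function measureStream(question: string) {
  const start = Date.now();
  let firstChunkAt: number | null = null;
  let contentChunks = 0;

  const stream = await trainly.queryStream({ question });

  for await (const chunk of stream) {
    if (chunk.type === "content") {
      if (firstChunkAt === null) firstChunkAt = Date.now();
      contentChunks += 1;
    }
  }

  const totalMs = Date.now() - start;
  const ttfbMs = (firstChunkAt ?? Date.now()) - start;

  console.log(`Time to first byte: ${ttfbMs}ms`);
  console.log(`Total stream time: ${totalMs}ms`);
  console.log(
    `Chunk frequency: ${(contentChunks / (totalMs / 1000)).toFixed(1)} chunks/sec`,
  );
}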

Next Steps