Skip to main content

Overview

Streaming provides immediate response feedback, improving user experience for long-form answers. Benefits:
  • Better UX: Users see responses as they’re generated
  • Lower perceived latency: Feels faster than waiting for complete response
  • Progress indication: Users know the system is working
  • Cancellation: Stop generation early if needed

Basic Streaming

JavaScript/TypeScript

import { TrainlyClient } from "@trainly/react";

// Configure the client from environment variables.
const trainly = new TrainlyClient({
  apiKey: process.env.TRAINLY_API_KEY!,
  chatId: process.env.TRAINLY_CHAT_ID!,
});

const stream = await trainly.queryStream({
  question: "Explain quantum computing in detail",
});

// Dispatch on the chunk type as events arrive from the stream.
for await (const event of stream) {
  switch (event.type) {
    case "content":
      process.stdout.write(event.data); // Write as it arrives
      break;
    case "context":
      console.log("\nSources:", event.data.length);
      break;
    case "end":
      console.log("\nComplete!");
      break;
    case "error":
      console.error("Error:", event.data);
      break;
  }
}

Python

import trainly

# Build a client bound to a specific API key and chat.
client = trainly.Client(
    api_key="tk_your_key",
    chat_id="chat_abc123"
)

stream = client.query_stream(question="Explain quantum computing")

# Consume the stream, printing content as soon as it arrives.
for event in stream:
    event_type = event["type"]
    if event_type == "content":
        print(event["data"], end="", flush=True)
    elif event_type == "context":
        print(f"\nSources: {len(event['data'])}")
    elif event_type == "end":
        print("\nComplete!")

React Integration

Streaming Chat Component

'use client';

import { useState } from 'react';
import { TrainlyClient } from '@trainly/react';

export function StreamingChat() {
  // Conversation history; role is 'user' or 'assistant'.
  const [messages, setMessages] = useState<Array<{ role: string; content: string }>>([]);
  const [input, setInput] = useState('');
  const [loading, setLoading] = useState(false);

  // NOTE: all queries go through the server-side /api/query route so the
  // Trainly API key never ships to the browser. Do NOT instantiate
  // TrainlyClient here with a NEXT_PUBLIC_* key — any value with that prefix
  // is bundled into client JS and visible to every visitor.

  /** Submit the current input and stream the assistant's reply into state. */
  async function handleSubmit(e: React.FormEvent) {
    e.preventDefault();
    const question = input.trim();
    if (!question) return;

    // Add user message and clear the input box.
    setMessages(prev => [...prev, { role: 'user', content: question }]);
    setInput('');
    setLoading(true);

    try {
      // Call your API route (not directly in client)
      const response = await fetch('/api/query', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ question }),
      });

      if (!response.ok || !response.body) {
        throw new Error(`Request failed with status ${response.status}`);
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let answer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // { stream: true } keeps multi-byte UTF-8 sequences that span chunk
        // boundaries intact instead of emitting replacement characters.
        answer += decoder.decode(value, { stream: true });
        const current = answer;

        // Update the trailing assistant message (or append one) without
        // mutating objects held in the previous state snapshot.
        setMessages(prev => {
          const last = prev[prev.length - 1];
          if (last?.role === 'assistant') {
            return [...prev.slice(0, -1), { ...last, content: current }];
          }
          return [...prev, { role: 'assistant', content: current }];
        });
      }
    } catch (error) {
      console.error('Stream error:', error);
      setMessages(prev => [...prev, {
        role: 'assistant',
        content: 'Sorry, an error occurred. Please try again.'
      }]);
    } finally {
      setLoading(false);
    }
  }

  return (
    <div className="flex flex-col h-[600px]">
      {/* Messages */}
      <div className="flex-1 overflow-y-auto space-y-4 p-4">
        {messages.map((msg, i) => (
          <div
            key={i}
            className={`flex ${msg.role === 'user' ? 'justify-end' : 'justify-start'}`}
          >
            <div
              className={`max-w-[80%] p-3 rounded-lg ${
                msg.role === 'user'
                  ? 'bg-blue-500 text-white'
                  : 'bg-gray-200 text-gray-900'
              }`}
            >
              {msg.content}
            </div>
          </div>
        ))}
        {loading && (
          <div className="flex justify-start">
            <div className="bg-gray-200 p-3 rounded-lg">
              <div className="animate-pulse">Thinking...</div>
            </div>
          </div>
        )}
      </div>

      {/* Input */}
      <form onSubmit={handleSubmit} className="border-t p-4">
        <div className="flex gap-2">
          <input
            type="text"
            value={input}
            onChange={e => setInput(e.target.value)}
            placeholder="Ask a question..."
            disabled={loading}
            className="flex-1 p-2 border rounded focus:outline-none focus:ring-2 focus:ring-blue-500"
          />
          <button
            type="submit"
            disabled={loading}
            className="px-4 py-2 bg-blue-500 text-white rounded hover:bg-blue-600 disabled:opacity-50 disabled:cursor-not-allowed"
          >
            {loading ? 'Sending...' : 'Send'}
          </button>
        </div>
      </form>
    </div>
  );
}

Custom Hook

import { useState, useCallback } from 'react';

interface UseStreamingQueryOptions {
  onChunk?: (chunk: string) => void;
  onComplete?: (fullAnswer: string) => void;
  onError?: (error: Error) => void;
}

/**
 * React hook for issuing a streaming query against /api/query.
 * Returns { answer, loading, error, query }; `answer` grows as chunks arrive.
 *
 * NOTE: `query` is memoized on `options` — pass a stable (memoized) options
 * object, or the callback is re-created on every render.
 */
export function useStreamingQuery(options: UseStreamingQueryOptions = {}) {
  const [answer, setAnswer] = useState('');
  const [loading, setLoading] = useState(false);
  const [error, setError] = useState<Error | null>(null);

  const query = useCallback(async (question: string) => {
    setLoading(true);
    setError(null);
    setAnswer('');

    try {
      const response = await fetch('/api/query', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ question }),
      });

      if (!response.ok || !response.body) {
        throw new Error(`Request failed with status ${response.status}`);
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let fullAnswer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // { stream: true } preserves multi-byte characters split across chunks.
        const chunk = decoder.decode(value, { stream: true });
        fullAnswer += chunk;

        setAnswer(fullAnswer);
        options.onChunk?.(chunk);
      }

      // Flush any partial character still buffered inside the decoder.
      const tail = decoder.decode();
      if (tail) {
        fullAnswer += tail;
        setAnswer(fullAnswer);
        options.onChunk?.(tail);
      }

      options.onComplete?.(fullAnswer);
    } catch (err) {
      // `err` is unknown under strict TS — normalize instead of asserting.
      const error = err instanceof Error ? err : new Error(String(err));
      setError(error);
      options.onError?.(error);
    } finally {
      setLoading(false);
    }
  }, [options]);

  return { answer, loading, error, query };
}

// Usage
function MyComponent() {
  const { answer, loading, query } = useStreamingQuery({
    onChunk: (chunk) => console.log('Received:', chunk),
    onComplete: (full) => console.log('Done:', full),
  });

  return (
    <div>
      <button onClick={() => query('What is AI?')} disabled={loading}>
        Ask
      </button>
      <p>{answer}</p>
    </div>
  );
}

Next.js API Route

Server-side streaming endpoint.
// app/api/query/route.ts
import { TrainlyClient } from '@trainly/react';
import { NextRequest } from 'next/server';

// Server-side Trainly client. Uses secret env vars (no NEXT_PUBLIC_ prefix),
// so the API key never reaches the browser bundle.
const trainly = new TrainlyClient({
  apiKey: process.env.TRAINLY_API_KEY!,
  chatId: process.env.TRAINLY_CHAT_ID!,
});

/**
 * POST /api/query — proxies a streaming Trainly query to the browser as a
 * chunked plain-text response. Expects a JSON body: { question: string }.
 */
export async function POST(request: NextRequest) {
  const { question } = await request.json();

  // Reject malformed input before opening an upstream stream.
  if (typeof question !== 'string' || !question.trim()) {
    return new Response('Missing "question"', { status: 400 });
  }

  const stream = await trainly.queryStream({ question });
  const encoder = new TextEncoder();

  return new Response(
    new ReadableStream({
      async start(controller) {
        // Track completion so we never enqueue/close/error after the
        // controller is settled — doing so throws and masks the real cause.
        let settled = false;
        try {
          for await (const chunk of stream) {
            if (settled) break; // ignore anything after the terminal chunk
            if (chunk.type === 'content') {
              controller.enqueue(encoder.encode(chunk.data));
            } else if (chunk.type === 'end') {
              controller.close();
              settled = true;
            } else if (chunk.type === 'error') {
              controller.error(new Error(chunk.data));
              settled = true;
            }
          }
          // Close even when the upstream iterator finishes without an
          // explicit 'end' chunk; otherwise the HTTP response hangs open.
          if (!settled) controller.close();
        } catch (error) {
          if (!settled) controller.error(error);
        }
      },
    }),
    {
      headers: {
        'Content-Type': 'text/plain; charset=utf-8',
        'Transfer-Encoding': 'chunked',
      },
    }
  );
}

Express.js Streaming

import express from 'express';
import { TrainlyClient } from '@trainly/react';

const app = express();
app.use(express.json());

// Server-side client — credentials come from (secret) environment variables.
const trainly = new TrainlyClient({
  apiKey: process.env.TRAINLY_API_KEY!,
  chatId: process.env.TRAINLY_CHAT_ID!,
});

/**
 * POST /api/stream — relays a Trainly query as Server-Sent Events.
 * Each event is a JSON-encoded chunk; the sentinel `[DONE]` marks completion.
 */
app.post('/api/stream', async (req, res) => {
  const { question } = req.body;

  // Set SSE headers and flush them so the client starts listening immediately.
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');
  res.flushHeaders();

  try {
    const stream = await trainly.queryStream({ question });

    for await (const chunk of stream) {
      if (chunk.type === 'content') {
        res.write(`data: ${JSON.stringify(chunk)}\n\n`);
      } else if (chunk.type === 'end') {
        res.write('data: [DONE]\n\n');
        break; // stop iterating — writing after res.end() would throw
      }
    }
  } catch (error) {
    // `error` is `unknown` under strict TS; extract a safe message.
    const message = error instanceof Error ? error.message : String(error);
    res.write(`data: ${JSON.stringify({ type: 'error', data: message })}\n\n`);
  } finally {
    // Always terminate the response, even if the stream finished without an
    // explicit 'end' chunk (the original could leave the connection hanging).
    res.end();
  }
});

app.listen(3000);

Python Async Streaming

import trainly
import asyncio

client = trainly.AsyncClient(
    api_key="tk_your_key",
    chat_id="chat_abc123"
)

async def stream_query(question: str):
    """Stream an answer asynchronously, printing content as it arrives."""
    stream = client.query_stream_async(question=question)
    async for event in stream:
        event_type = event["type"]
        if event_type == "content":
            print(event["data"], end="", flush=True)
        elif event_type == "end":
            print("\nDone!")

# Run
asyncio.run(stream_query("Explain AI"))

FastAPI Streaming

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import trainly

app = FastAPI()

client = trainly.Client(
    api_key="tk_your_key",
    chat_id="chat_abc123"
)

@app.post("/stream")
async def stream_query(question: str):
    """Relay a Trainly query as a plain-text streaming response."""

    def generate():
        # Deliberately a *sync* generator: Starlette iterates sync generators
        # in a worker thread, so the blocking client.query_stream() loop never
        # stalls the event loop. (The original used `async def` here, which
        # ran the blocking iteration on the event loop itself.)
        stream = client.query_stream(question=question)
        for chunk in stream:
            if chunk["type"] == "content":
                yield chunk["data"]
            elif chunk["type"] == "end":
                break

    return StreamingResponse(
        generate(),
        media_type="text/plain"
    )

Advanced Patterns

Cancellable Streams

/**
 * Hook managing a cancellable streaming request. Starting a new stream
 * aborts any in-flight one; `cancel()` aborts manually.
 */
function useCancellableStream() {
  const abortControllerRef = useRef<AbortController | null>(null);

  async function startStream(question: string) {
    // Cancel previous stream if running
    abortControllerRef.current?.abort();

    // Create new abort controller
    const controller = new AbortController();
    abortControllerRef.current = controller;

    try {
      const response = await fetch('/api/query', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ question }),
        signal: controller.signal, // Pass abort signal
      });

      // Handle stream...
    } catch (error) {
      // Aborting rejects fetch with an AbortError — that is expected when
      // the user cancels or a newer stream supersedes this one. Swallow it;
      // rethrow anything else.
      if (error instanceof Error && error.name === 'AbortError') return;
      throw error;
    } finally {
      // Clear the ref only if it still points at *this* controller
      // (a newer stream may already have replaced it).
      if (abortControllerRef.current === controller) {
        abortControllerRef.current = null;
      }
    }
  }

  function cancel() {
    abortControllerRef.current?.abort();
  }

  return { startStream, cancel };
}

Progress Tracking

/**
 * Stream a query while logging running throughput statistics per chunk.
 * @param question - user question forwarded to trainly.queryStream
 */
async function streamWithProgress(question: string) {
  const stream = await trainly.queryStream({ question });

  let chunkCount = 0;
  let totalChars = 0;
  const startTime = Date.now();

  for await (const chunk of stream) {
    if (chunk.type === 'content') {
      chunkCount++;
      totalChars += chunk.data.length;

      // Clamp elapsed to >= 1ms: the first chunk can land within the same
      // millisecond as startTime, and dividing by zero yields Infinity.
      const elapsed = Math.max(Date.now() - startTime, 1);
      const charsPerSec = totalChars / (elapsed / 1000);

      console.log(`Chunks: ${chunkCount} | Chars: ${totalChars} | Speed: ${charsPerSec.toFixed(0)} chars/sec`);
    }
  }
}

Retry on Stream Failure

/**
 * Stream a query with retries, exponential backoff, and a final fallback to
 * the non-streaming API.
 * @param question   - user question
 * @param maxRetries - attempts before falling back (default 3)
 * @returns the complete answer text
 */
async function robustStream(question: string, maxRetries: number = 3) {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      const stream = await trainly.queryStream({ question });
      let answer = '';

      for await (const chunk of stream) {
        if (chunk.type === 'content') {
          answer += chunk.data;
        } else if (chunk.type === 'end') {
          return answer; // explicit completion marker
        }
      }

      // BUG FIX: the iterator may finish without an explicit 'end' chunk.
      // The accumulated answer is still complete — return it instead of
      // silently discarding it and burning a retry (the original did that).
      return answer;
    } catch (error) {
      console.error(`Stream attempt ${attempt} failed:`, error);

      if (attempt === maxRetries) {
        // All retries exhausted, fall back to non-streaming
        const response = await trainly.query({ question });
        return response.answer;
      }

      // Exponential backoff: 2s, 4s, 8s, ...
      await new Promise(resolve => setTimeout(resolve, Math.pow(2, attempt) * 1000));
    }
  }

  throw new Error('Stream failed after retries');
}

Error Handling

/**
 * Stream a query with full error handling and a graceful fallback to the
 * non-streaming API, finally degrading to a user-friendly message.
 * @returns the answer text, or an apology string if everything fails
 */
async function safeStream(question: string) {
  try {
    const stream = await trainly.queryStream({ question });
    let answer = '';

    for await (const chunk of stream) {
      if (chunk.type === 'content') {
        answer += chunk.data;
      } else if (chunk.type === 'error') {
        throw new Error(chunk.data); // surface in-band stream errors
      } else if (chunk.type === 'end') {
        return answer;
      }
    }

    // BUG FIX: the iterator can finish without an explicit 'end' chunk; the
    // accumulated answer is still valid. The original fell out of the loop
    // here and returned undefined.
    return answer;
  } catch (error) {
    console.error('Stream error:', error);

    // Graceful fallback to non-streaming
    try {
      const response = await trainly.query({ question });
      return response.answer;
    } catch (fallbackError) {
      return 'Unable to process your question. Please try again.';
    }
  }
}

Best Practices

Use streaming for:
  • Long-form content generation (>100 words)
  • Interactive chat interfaces
  • Real-time user feedback needed
  • Mobile apps (progressive rendering)
Use non-streaming for:
  • Short queries (<50 words expected)
  • Batch processing
  • Analytics/logging (need complete response)
  • API integrations requiring full response
Performance tips:
  • Stream chunks immediately, don’t buffer
  • Use async/await for better concurrency
  • Implement backpressure handling for slow clients
  • Monitor streaming latency (P50, P95, P99)
  • Target first chunk <500ms
Error-handling tips:
  • Always implement fallback to non-streaming
  • Handle network interruptions gracefully
  • Show user-friendly error messages
  • Log stream failures for monitoring
  • Retry with exponential backoff
UX tips:
  • Show loading indicator before first chunk
  • Animate text appearance for smoothness
  • Allow users to stop generation
  • Display sources after content
  • Provide feedback during streaming

Performance Targets

Streaming Metrics:
  • First chunk latency: <500ms (time to first content)
  • Chunk throughput: >20 chunks/second
  • Total latency: <5 seconds for 500-word response
  • Stream stability: <1% failure rate
Monitor with analytics:
const traces = await trainly.analytics.getQueryTraces({ limit: 100 });

// Project each trace down to the fields streaming dashboards care about.
const streamingMetrics = traces.map(t => ({
  firstChunkLatency: t.timing?.retrieval_ms || 0,
  totalLatency: t.duration_ms,
  success: t.status === 'completed',
}));

// Guard against an empty trace list — dividing by zero would yield NaN.
const sampleCount = streamingMetrics.length || 1;
const avgFirstChunk = streamingMetrics.reduce((sum, m) => sum + m.firstChunkLatency, 0) / sampleCount;
const avgTotal = streamingMetrics.reduce((sum, m) => sum + m.totalLatency, 0) / sampleCount;

console.log(`First chunk: ${avgFirstChunk}ms, Total: ${avgTotal}ms`);

Next Steps