## Overview
This guide covers best practices for building production applications with Trainly, including security, performance, error handling, and cost optimization. Following these practices will help you build robust, scalable applications.
## Security
### API Key Management
**Never expose API keys in client-side code or public repositories.**
#### ✅ Secure Patterns

**Environment variables**

```bash
# .env.local (never commit this file!)
TRAINLY_API_KEY=tk_your_api_key
TRAINLY_CHAT_ID=chat_abc123
```

```typescript
// Load securely on the server
const trainly = new TrainlyClient({
  apiKey: process.env.TRAINLY_API_KEY,
  chatId: process.env.TRAINLY_CHAT_ID
});
```
**Server-side only**

```typescript
// ✅ GOOD - API route (Next.js)
// app/api/query/route.ts
import { TrainlyClient } from '@trainly/react';

const trainly = new TrainlyClient({
  apiKey: process.env.TRAINLY_API_KEY!, // Server-side only
  chatId: process.env.TRAINLY_CHAT_ID!
});

export async function POST(request: Request) {
  const { question } = await request.json();
  const response = await trainly.query({ question });
  return Response.json(response);
}
```
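Because the key stays on the server, the browser only ever calls your own API route. A minimal client-side caller for the route above might look like this sketch (the `/api/query` path follows the example route; adapt it to your app):

```typescript
// ✅ GOOD - The browser never sees the Trainly API key
async function askQuestion(question: string) {
  const res = await fetch("/api/query", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ question }),
  });
  if (!res.ok) {
    throw new Error(`Query failed with status ${res.status}`);
  }
  return res.json();
}
```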
**Secret management services**

```python
# Using AWS Secrets Manager
import json

import boto3
from trainly import TrainlyClient

def get_secret(secret_name):
    client = boto3.client('secretsmanager')
    response = client.get_secret_value(SecretId=secret_name)
    return json.loads(response['SecretString'])

secrets = get_secret('trainly-credentials')
trainly = TrainlyClient(
    api_key=secrets['api_key'],
    chat_id=secrets['chat_id']
)
```
#### ❌ Insecure Patterns (Avoid)

**Client-side exposure**

```typescript
// ❌ BAD - Exposed in browser
const trainly = new TrainlyClient({
  apiKey: 'tk_your_api_key', // Visible to users!
  chatId: 'chat_abc123'
});
```
**Hardcoded keys**

```python
# ❌ BAD - Hardcoded in source
trainly = TrainlyClient(
    api_key="tk_abc123...",  # In source control!
    chat_id="chat_xyz"
)
```
**Logging sensitive data**

```typescript
// ❌ BAD - Logging API keys
console.log('Config:', {
  apiKey: process.env.TRAINLY_API_KEY, // Don't log!
  chatId: chatId
});
```
### Input Validation

Always validate and sanitize user input:

```typescript
function validateQuestion(question: string): string {
  // Remove excessive whitespace
  question = question.trim();

  // Check length
  if (question.length === 0) {
    throw new Error("Question cannot be empty");
  }
  if (question.length > 5000) {
    throw new Error("Question too long (max 5000 characters)");
  }

  // Basic XSS prevention
  const sanitized = question
    .replace(/<script>/gi, "")
    .replace(/<\/script>/gi, "")
    .replace(/javascript:/gi, "");

  return sanitized;
}

// Usage
try {
  const cleanQuestion = validateQuestion(userInput);
  const response = await trainly.query({ question: cleanQuestion });
} catch (error) {
  console.error("Validation failed:", error.message);
}
```
### Rate Limiting

Implement client-side rate limiting:

```python
from time import time, sleep
from collections import deque

class RateLimiter:
    def __init__(self, max_requests: int = 60, window_seconds: int = 60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = deque()

    def wait_if_needed(self):
        """Wait if rate limit would be exceeded"""
        now = time()

        # Remove old requests outside window
        while self.requests and self.requests[0] < now - self.window_seconds:
            self.requests.popleft()

        # Check if at limit
        if len(self.requests) >= self.max_requests:
            # Calculate wait time
            oldest = self.requests[0]
            wait_time = self.window_seconds - (now - oldest)
            if wait_time > 0:
                print(f"Rate limit reached. Waiting {wait_time:.1f}s...")
                sleep(wait_time)

        # Record the current request (re-read the clock in case we slept)
        self.requests.append(time())

# Usage
limiter = RateLimiter(max_requests=60, window_seconds=60)

def rate_limited_query(question: str):
    limiter.wait_if_needed()
    return trainly.query(question=question)
```
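For TypeScript services, here is a minimal sketch of the same sliding-window idea, assuming the shared `trainly` client from the earlier examples:

```typescript
// A minimal sliding-window rate limiter (same approach as above)
class RateLimiter {
  private requests: number[] = [];

  constructor(
    private maxRequests: number = 60,
    private windowMs: number = 60_000,
  ) {}

  async waitIfNeeded(): Promise<void> {
    const now = Date.now();
    // Drop timestamps that have left the window
    this.requests = this.requests.filter((t) => t > now - this.windowMs);

    if (this.requests.length >= this.maxRequests) {
      const waitMs = this.requests[0] + this.windowMs - now;
      if (waitMs > 0) {
        await new Promise((resolve) => setTimeout(resolve, waitMs));
      }
    }
    this.requests.push(Date.now());
  }
}

// Usage
const limiter = new RateLimiter(60, 60_000);

async function rateLimitedQuery(question: string) {
  await limiter.waitIfNeeded();
  return trainly.query({ question });
}
```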
## Performance Optimization

### Caching Strategies
**In-memory cache**

```typescript
import { LRUCache } from 'lru-cache';

const queryCache = new LRUCache<string, QueryResponse>({
  max: 100,
  ttl: 1000 * 60 * 5, // 5 minutes
  updateAgeOnGet: true
});

async function cachedQuery(question: string, model: string = 'gpt-4o-mini') {
  const cacheKey = `query:${question}:${model}`;

  // Check cache
  let response = queryCache.get(cacheKey);
  if (response) {
    console.log('Cache hit!');
    return response;
  }

  // Query API
  response = await trainly.query({ question, model });

  // Cache response
  queryCache.set(cacheKey, response);
  return response;
}
```
**Redis cache**

```typescript
import Redis from 'ioredis';
import { createHash } from 'crypto';

const redis = new Redis(process.env.REDIS_URL);

// Hash the question so arbitrary text makes a safe, fixed-length key
function hash(text: string): string {
  return createHash('sha256').update(text).digest('hex');
}

async function cachedQuery(question: string) {
  const cacheKey = `trainly:query:${hash(question)}`;

  // Try cache
  const cached = await redis.get(cacheKey);
  if (cached) {
    return JSON.parse(cached);
  }

  // Query API
  const response = await trainly.query({ question });

  // Cache for 5 minutes
  await redis.setex(
    cacheKey,
    300,
    JSON.stringify(response)
  );

  return response;
}
```
**Database cache**

```python
import json
from datetime import datetime, timedelta

from sqlalchemy import Column, String, DateTime, Text
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class QueryCache(Base):
    __tablename__ = 'query_cache'

    question = Column(String, primary_key=True)
    answer = Column(Text)
    context = Column(Text)
    created_at = Column(DateTime, default=datetime.utcnow)
    expires_at = Column(DateTime)

def cached_query(question: str, ttl_minutes: int = 5):
    # Check database cache (assumes a configured SQLAlchemy `session`)
    cached = session.query(QueryCache).filter(
        QueryCache.question == question,
        QueryCache.expires_at > datetime.utcnow()
    ).first()

    if cached:
        return {
            "answer": cached.answer,
            "context": json.loads(cached.context)
        }

    # Query API
    response = trainly.query(question=question)

    # Save to database
    cache_entry = QueryCache(
        question=question,
        answer=response.answer,
        context=json.dumps([c.dict() for c in response.context]),
        expires_at=datetime.utcnow() + timedelta(minutes=ttl_minutes)
    )
    session.add(cache_entry)
    session.commit()

    return response
```
### Connection Pooling

Reuse client instances:

```typescript
// ✅ GOOD - Singleton pattern
class TrainlyService {
  private static instance: TrainlyClient;

  static getInstance(): TrainlyClient {
    if (!this.instance) {
      this.instance = new TrainlyClient({
        apiKey: process.env.TRAINLY_API_KEY!,
        chatId: process.env.TRAINLY_CHAT_ID!,
      });
    }
    return this.instance;
  }
}

// Use everywhere
const trainly = TrainlyService.getInstance();
const response = await trainly.query({ question });
```
```python
# ✅ GOOD - Module-level singleton
# trainly_service.py
import os

from trainly import TrainlyClient

_trainly_client = None

def get_trainly_client() -> TrainlyClient:
    global _trainly_client
    if _trainly_client is None:
        _trainly_client = TrainlyClient(
            api_key=os.getenv("TRAINLY_API_KEY"),
            chat_id=os.getenv("TRAINLY_CHAT_ID")
        )
    return _trainly_client

# Use everywhere
trainly = get_trainly_client()
response = trainly.query(question="...")
```
### Async Operations

Use async for concurrent operations:

```typescript
// Process multiple queries concurrently
async function batchProcess(questions: string[]) {
  const promises = questions.map((question) => trainly.query({ question }));
  const responses = await Promise.all(promises);
  return responses;
}

// Usage
const questions = [
  "What is the introduction about?",
  "What is the methodology?",
  "What are the conclusions?",
];
const responses = await batchProcess(questions);
```
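Note that `Promise.all` rejects as soon as any single query fails. When partial results are acceptable, a variation using `Promise.allSettled` keeps the successful responses; this sketch reuses the `QueryResponse` type from the caching example:

```typescript
// Keep successful responses even if some queries fail
async function batchProcessSettled(questions: string[]) {
  const results = await Promise.allSettled(
    questions.map((question) => trainly.query({ question })),
  );

  const failures = results.filter((r) => r.status === "rejected").length;
  if (failures > 0) {
    console.warn(`${failures} of ${questions.length} queries failed`);
  }

  return results
    .filter(
      (r): r is PromiseFulfilledResult<QueryResponse> =>
        r.status === "fulfilled",
    )
    .map((r) => r.value);
}
```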
```python
import asyncio
from trainly import AsyncTrainlyClient

async def batch_process(questions: list):
    trainly = AsyncTrainlyClient(
        api_key="tk_key",
        chat_id="chat_id"
    )

    # Run queries concurrently
    tasks = [
        trainly.query(question=q)
        for q in questions
    ]
    responses = await asyncio.gather(*tasks)
    return responses

# Usage
questions = [
    'What is the introduction about?',
    'What is the methodology?',
    'What are the conclusions?'
]
responses = asyncio.run(batch_process(questions))
```
## Error Handling

### Comprehensive Error Handling
```typescript
import {
  TrainlyClient,
  TrainlyError,
  RateLimitError,
  AuthenticationError,
  ValidationError,
  NetworkError,
} from "@trainly/react";

async function robustQuery(question: string, maxRetries: number = 3) {
  let lastError: Error;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await trainly.query({ question });
    } catch (error) {
      lastError = error;

      if (error instanceof RateLimitError) {
        // Respect retry-after header
        const delay = error.retryAfter * 1000 || 60000;
        console.log(`Rate limited. Waiting ${delay}ms...`);
        await new Promise((resolve) => setTimeout(resolve, delay));
        continue;
      }

      if (error instanceof AuthenticationError) {
        // Don't retry auth errors
        console.error("Authentication failed:", error.message);
        throw error;
      }

      if (error instanceof ValidationError) {
        // Don't retry validation errors
        console.error("Invalid request:", error.message);
        throw error;
      }

      if (error instanceof NetworkError && attempt < maxRetries) {
        // Retry network errors with exponential backoff
        const delay = Math.pow(2, attempt) * 1000;
        console.log(`Network error. Retrying in ${delay}ms...`);
        await new Promise((resolve) => setTimeout(resolve, delay));
        continue;
      }

      if (
        error instanceof TrainlyError &&
        error.status >= 500 &&
        attempt < maxRetries
      ) {
        // Retry server errors
        const delay = Math.pow(2, attempt) * 1000;
        console.log(`Server error. Retrying in ${delay}ms...`);
        await new Promise((resolve) => setTimeout(resolve, delay));
        continue;
      }

      // Don't retry other errors
      throw error;
    }
  }

  throw lastError!;
}
```
### Circuit Breaker Pattern
```python
from datetime import datetime, timedelta
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        timeout_seconds: int = 60,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.timeout = timedelta(seconds=timeout_seconds)
        self.success_threshold = success_threshold

        self.state = CircuitState.CLOSED
        self.failures = 0
        self.successes = 0
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        """Execute function with circuit breaker protection"""
        if self.state == CircuitState.OPEN:
            # Check if timeout has passed
            if datetime.now() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
                self.successes = 0
            else:
                raise Exception("Circuit breaker is OPEN - service unavailable")

        try:
            result = func(*args, **kwargs)

            # Success
            if self.state == CircuitState.HALF_OPEN:
                self.successes += 1
                if self.successes >= self.success_threshold:
                    self.state = CircuitState.CLOSED
                    self.failures = 0

            return result

        except Exception:
            self.failures += 1
            self.last_failure_time = datetime.now()

            if self.failures >= self.failure_threshold:
                self.state = CircuitState.OPEN
                print(f"⚠️ Circuit breaker opened after {self.failures} failures")

            raise

# Usage
circuit_breaker = CircuitBreaker(
    failure_threshold=5,
    timeout_seconds=60
)

def query_with_circuit_breaker(question: str):
    return circuit_breaker.call(
        trainly.query,
        question=question
    )
```
## Monitoring & Observability

### Logging Best Practices
```typescript
import winston from "winston";

// Configure structured logging
const logger = winston.createLogger({
  level: "info",
  format: winston.format.combine(
    winston.format.timestamp(),
    winston.format.json(),
  ),
  transports: [
    new winston.transports.File({ filename: "error.log", level: "error" }),
    new winston.transports.File({ filename: "combined.log" }),
  ],
});

// Log Trainly operations
async function loggedQuery(question: string, userId: string) {
  const startTime = Date.now();

  logger.info("Trainly query started", {
    userId,
    question: question.substring(0, 100), // Truncate for privacy
    timestamp: new Date().toISOString(),
  });

  try {
    const response = await trainly.query({ question });
    const duration = Date.now() - startTime;

    logger.info("Trainly query completed", {
      userId,
      duration,
      model: response.model,
      tokens: response.usage.total_tokens,
      citationCount: response.context.length,
      timestamp: new Date().toISOString(),
    });

    return response;
  } catch (error) {
    const duration = Date.now() - startTime;

    logger.error("Trainly query failed", {
      userId,
      duration,
      error: error.message,
      errorType: error.constructor.name,
      status: error.status,
      timestamp: new Date().toISOString(),
    });

    throw error;
  }
}
```
### Metrics & Analytics
```python
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, List, Optional

# Example credit multipliers matching the model costs cited under
# Model Selection Strategy below (1x gpt-4o-mini, 15x gpt-4o)
MODEL_MULTIPLIERS = {
    "gpt-4o-mini": 1,
    "gpt-4o": 15,
}

@dataclass
class QueryMetrics:
    timestamp: datetime
    user_id: str
    question_length: int
    answer_length: int
    model: str
    tokens: int
    credits: float
    duration_ms: float
    success: bool
    error: Optional[str] = None

class MetricsCollector:
    def __init__(self):
        self.metrics: List[QueryMetrics] = []

    def record_query(
        self,
        user_id: str,
        question: str,
        response: Any,
        duration_ms: float,
        success: bool,
        error: Optional[str] = None
    ):
        """Record query metrics"""
        # Calculate credits used
        multiplier = MODEL_MULTIPLIERS.get(response.model if success else "gpt-4o-mini", 1)
        tokens = response.usage.total_tokens if success else 0
        credits = (tokens / 1000) * multiplier

        metric = QueryMetrics(
            timestamp=datetime.now(),
            user_id=user_id,
            question_length=len(question),
            answer_length=len(response.answer) if success else 0,
            model=response.model if success else "unknown",
            tokens=tokens,
            credits=credits,
            duration_ms=duration_ms,
            success=success,
            error=error
        )
        self.metrics.append(metric)

        # Optional: Send to analytics service
        self.send_to_analytics(metric)

    def send_to_analytics(self, metric: QueryMetrics):
        """Send metrics to your analytics service"""
        # Integrate with DataDog, New Relic, etc.
        pass

    def get_stats(self) -> dict:
        """Get aggregated statistics"""
        if not self.metrics:
            return {}

        successful = [m for m in self.metrics if m.success]
        failed = [m for m in self.metrics if not m.success]

        return {
            "total_queries": len(self.metrics),
            "successful": len(successful),
            "failed": len(failed),
            "success_rate": len(successful) / len(self.metrics),
            "avg_duration_ms": sum(m.duration_ms for m in self.metrics) / len(self.metrics),
            "total_credits": sum(m.credits for m in successful),
            "avg_tokens": sum(m.tokens for m in successful) / len(successful) if successful else 0
        }

# Usage
metrics = MetricsCollector()

start = time.time()
try:
    response = trainly.query(question="What is AI?")
    duration = (time.time() - start) * 1000
    metrics.record_query(
        user_id="user_123",
        question="What is AI?",
        response=response,
        duration_ms=duration,
        success=True
    )
except Exception as e:
    duration = (time.time() - start) * 1000
    metrics.record_query(
        user_id="user_123",
        question="What is AI?",
        response=None,
        duration_ms=duration,
        success=False,
        error=str(e)
    )

# Get statistics
stats = metrics.get_stats()
print(f"Success rate: {stats['success_rate'] * 100:.1f}%")
print(f"Avg duration: {stats['avg_duration_ms']:.0f}ms")
print(f"Total credits: {stats['total_credits']:.2f}")
```
## Cost Optimization

### Model Selection Strategy
```typescript
function selectOptimalModel(
  questionComplexity: "simple" | "medium" | "complex",
): string {
  const modelStrategy = {
    simple: "gpt-4o-mini", // 1x - Quick facts, definitions
    medium: "gpt-4o-mini", // 1x - Most use cases
    complex: "gpt-4o", // 15x - Deep analysis
  };
  return modelStrategy[questionComplexity];
}

// Detect complexity
function assessComplexity(question: string): "simple" | "medium" | "complex" {
  const length = question.length;
  const keywords = ["analyze", "compare", "explain in detail", "comprehensive"];

  if (
    length < 50 &&
    !keywords.some((k) => question.toLowerCase().includes(k))
  ) {
    return "simple";
  }

  if (keywords.some((k) => question.toLowerCase().includes(k))) {
    return "complex";
  }

  return "medium";
}

// Usage
async function smartQuery(question: string) {
  const complexity = assessComplexity(question);
  const model = selectOptimalModel(complexity);

  console.log(`Using ${model} for ${complexity} query`);

  return await trainly.query({
    question,
    model,
  });
}
```
### Token Optimization
```python
def optimize_query(question: str):
    """Optimize token usage"""
    # Truncate very long questions
    if len(question) > 1000:
        question = question[:1000] + "..."

    # Adjust max_tokens based on question
    if len(question) < 50:
        max_tokens = 200   # Short answer expected
    elif len(question) < 200:
        max_tokens = 500   # Medium answer
    else:
        max_tokens = 1000  # Long answer

    response = trainly.query(
        question=question,
        max_tokens=max_tokens,
        # Lower temperature for more focused answers = fewer tokens
        temperature=0.5
    )

    return response
```
## Production Deployment

### Health Checks
```typescript
// health-check.ts
// Assumes `redis` and `db` clients are configured elsewhere in your app.
export async function healthCheck(): Promise<{
  status: "healthy" | "degraded" | "unhealthy";
  checks: Record<string, boolean>;
  timestamp: number;
}> {
  const checks: Record<string, boolean> = {};

  try {
    // Check Trainly connectivity
    const response = await fetch("https://api.trainlyai.com/v1/health");
    checks.trainly = response.ok;
  } catch {
    checks.trainly = false;
  }

  try {
    // Check Redis cache
    await redis.ping();
    checks.redis = true;
  } catch {
    checks.redis = false;
  }

  try {
    // Check database
    await db.query("SELECT 1");
    checks.database = true;
  } catch {
    checks.database = false;
  }

  // Determine overall status
  const allHealthy = Object.values(checks).every((v) => v);
  const someHealthy = Object.values(checks).some((v) => v);

  return {
    status: allHealthy ? "healthy" : someHealthy ? "degraded" : "unhealthy",
    checks,
    timestamp: Date.now(),
  };
}

// Express endpoint: healthy and degraded both return 200, unhealthy returns 503
app.get("/health", async (req, res) => {
  const health = await healthCheck();
  const statusCode = health.status === "unhealthy" ? 503 : 200;
  res.status(statusCode).json(health);
});
```
### Graceful Shutdown
```typescript
// server.ts
// Assumes an Express `app` and an HTTP `server` created elsewhere.
import { TrainlyClient } from "@trainly/react";

const trainly = new TrainlyClient({
  apiKey: process.env.TRAINLY_API_KEY!,
  chatId: process.env.TRAINLY_CHAT_ID!,
});

let isShuttingDown = false;
let ongoingRequests = 0;

// Handle shutdown signals
process.on("SIGTERM", gracefulShutdown);
process.on("SIGINT", gracefulShutdown);

async function gracefulShutdown() {
  if (isShuttingDown) return;
  isShuttingDown = true;

  console.log("Received shutdown signal. Gracefully shutting down...");

  // Stop accepting new requests
  server.close(() => {
    console.log("Closed server");
  });

  // Wait for ongoing requests to complete (with timeout)
  await Promise.race([
    waitForOngoingRequests(),
    new Promise((resolve) => setTimeout(resolve, 30000)), // 30s timeout
  ]);

  // Cleanup
  await trainly.close();

  console.log("Shutdown complete");
  process.exit(0);
}

function waitForOngoingRequests(): Promise<void> {
  return new Promise((resolve) => {
    const check = setInterval(() => {
      if (ongoingRequests === 0) {
        clearInterval(check);
        resolve();
      }
    }, 100);
  });
}

// Track requests
app.use((req, res, next) => {
  ongoingRequests++;
  res.on("finish", () => ongoingRequests--);
  next();
});
```
## Testing

### Unit Tests
```typescript
// trainly.test.ts
import { TrainlyClient } from "@trainly/react";
import { describe, it, expect, vi } from "vitest";

describe("TrainlyClient", () => {
  it("should query successfully", async () => {
    const trainly = new TrainlyClient({
      apiKey: "tk_test_key",
      chatId: "chat_test_123",
    });

    // Mock the fetch
    global.fetch = vi.fn().mockResolvedValue({
      ok: true,
      json: async () => ({
        answer: "Test answer",
        context: [],
        chat_id: "chat_test_123",
        model: "gpt-4o-mini",
        usage: { total_tokens: 100 },
      }),
    });

    const response = await trainly.query({
      question: "Test question",
    });

    expect(response.answer).toBe("Test answer");
    expect(response.model).toBe("gpt-4o-mini");
  });

  it("should handle rate limits", async () => {
    const trainly = new TrainlyClient({
      apiKey: "tk_test_key",
      chatId: "chat_test_123",
    });

    global.fetch = vi.fn().mockResolvedValue({
      ok: false,
      status: 429,
      headers: new Map([["Retry-After", "60"]]),
    });

    await expect(trainly.query({ question: "Test" })).rejects.toThrow(
      "Rate limit exceeded",
    );
  });
});
```
### Integration Tests
```python
# test_trainly_integration.py
import os

import pytest
from trainly import TrainlyClient

@pytest.fixture
def trainly_client():
    return TrainlyClient(
        api_key=os.getenv("TRAINLY_TEST_API_KEY"),
        chat_id=os.getenv("TRAINLY_TEST_CHAT_ID")
    )

def test_query_basic(trainly_client):
    response = trainly_client.query(
        question="What is 2+2?"
    )
    assert response.answer
    assert response.chat_id
    assert response.usage.total_tokens > 0

def test_query_with_custom_model(trainly_client):
    response = trainly_client.query(
        question="Explain AI",
        model="gpt-4o"
    )
    assert response.model == "gpt-4o"
    assert len(response.answer) > 0

def test_file_upload(trainly_client):
    with open("test_file.txt", "rb") as file:
        result = trainly_client.upload_file(
            file=file,
            filename="test_file.txt"
        )
    assert result.success
    assert result.file_id

    # Cleanup
    trainly_client.delete_file(result.file_id)

@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_async_query():
    from trainly import AsyncTrainlyClient

    trainly = AsyncTrainlyClient(
        api_key=os.getenv("TRAINLY_TEST_API_KEY"),
        chat_id=os.getenv("TRAINLY_TEST_CHAT_ID")
    )
    response = await trainly.query(question="Test")
    assert response.answer
```
## Scalability

### Load Balancing
```typescript
// multi-instance-client.ts
class LoadBalancedTrainlyService {
  private clients: TrainlyClient[];
  private currentIndex: number = 0;

  constructor(configs: Array<{ apiKey: string; chatId: string }>) {
    this.clients = configs.map((config) => new TrainlyClient(config));
  }

  getNextClient(): TrainlyClient {
    // Round-robin load balancing
    const client = this.clients[this.currentIndex];
    this.currentIndex = (this.currentIndex + 1) % this.clients.length;
    return client;
  }

  async query(question: string) {
    const client = this.getNextClient();
    return await client.query({ question });
  }
}

// Usage with multiple API keys
const service = new LoadBalancedTrainlyService([
  { apiKey: "tk_key_1", chatId: "chat_1" },
  { apiKey: "tk_key_2", chatId: "chat_2" },
  { apiKey: "tk_key_3", chatId: "chat_3" },
]);

const response = await service.query("What is AI?");
```
### Queueing
```python
import os
from queue import Queue
from threading import Thread

from trainly import TrainlyClient

class TrainlyWorkerPool:
    def __init__(self, num_workers: int = 5):
        self.queue = Queue()
        self.workers = []
        self.trainly = TrainlyClient(
            api_key=os.getenv("TRAINLY_API_KEY"),
            chat_id=os.getenv("TRAINLY_CHAT_ID")
        )

        # Start workers
        for _ in range(num_workers):
            worker = Thread(target=self._worker, daemon=True)
            worker.start()
            self.workers.append(worker)

    def _worker(self):
        """Worker thread to process queries"""
        while True:
            task = self.queue.get()
            if task is None:
                # Mark the shutdown sentinel done so join() can't hang
                self.queue.task_done()
                break

            question, callback = task
            try:
                response = self.trainly.query(question=question)
                callback(response, None)
            except Exception as e:
                callback(None, e)
            finally:
                self.queue.task_done()

    def submit(self, question: str, callback):
        """Submit query to queue"""
        self.queue.put((question, callback))

    def wait_completion(self):
        """Wait for all tasks to complete"""
        self.queue.join()

    def shutdown(self):
        """Shutdown worker pool"""
        for _ in self.workers:
            self.queue.put(None)
        for worker in self.workers:
            worker.join()

# Usage
pool = TrainlyWorkerPool(num_workers=5)

def handle_response(response, error):
    if error:
        print(f"Error: {error}")
    else:
        print(f"Answer: {response.answer}")

# Submit multiple queries
for question in questions:
    pool.submit(question, handle_response)

# Wait for completion
pool.wait_completion()
pool.shutdown()
```
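On the Node side, the same pattern can be sketched with a promise queue. This example assumes the third-party `p-queue` package and the shared `trainly` client from earlier:

```typescript
import PQueue from "p-queue";

// Cap concurrent Trainly calls at 5, mirroring the worker pool above
const queue = new PQueue({ concurrency: 5 });

async function processQuestions(questions: string[]) {
  return Promise.all(
    questions.map((question) => queue.add(() => trainly.query({ question }))),
  );
}
```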
## Compliance & Privacy

### GDPR Compliance
```typescript
class GDPRCompliantTrainlyService {
  private trainly: TrainlyClient;

  constructor(trainly: TrainlyClient) {
    this.trainly = trainly;
  }

  async queryWithConsent(
    question: string,
    userId: string,
    hasConsent: boolean
  ) {
    if (!hasConsent) {
      throw new Error('User has not consented to data processing');
    }

    // Log consent
    await this.logConsent(userId, 'query', question);

    return await this.trainly.query({ question });
  }

  // Export all user data for a GDPR access request
  async exportUserData(userId: string) {
    const files = await this.trainly.listFiles();

    // Filter to user's files
    const userFiles = files.files.filter(f =>
      f.file_id.includes(userId)
    );

    return {
      userId,
      files: userFiles,
      exportDate: new Date().toISOString(),
      format: 'json'
    };
  }

  // Delete all user data for a GDPR erasure request
  async deleteUserData(userId: string) {
    const files = await this.trainly.listFiles();

    // Delete user's files
    let filesDeleted = 0;
    for (const file of files.files) {
      if (file.file_id.includes(userId)) {
        await this.trainly.deleteFile(file.file_id);
        filesDeleted++;
      }
    }

    return {
      userId,
      filesDeleted,
      deletionDate: new Date().toISOString()
    };
  }

  private async logConsent(
    userId: string,
    action: string,
    details: string
  ) {
    // Log to compliance database (assumes `db` and the request context
    // `req` are available in your framework)
    await db.consentLog.create({
      userId,
      action,
      details: details.substring(0, 100),
      timestamp: new Date(),
      ipAddress: req.ip
    });
  }
}
```
## Monitoring Checklist

1. **Set up logging**: Configure structured logging with appropriate levels.
2. **Track metrics**: Monitor query latency, success rate, and credit usage.
3. **Set alerts**: Alert on high error rates, slow responses, or credit exhaustion.
4. **Build a dashboard**: Visualize key metrics in one place.
5. **Track errors**: Integrate with Sentry, Rollbar, or similar services.
6. **Monitor uptime**: Watch API availability with an external service.
## Performance Benchmarks

### Target Metrics
| Metric | Target | Alert Threshold |
|---|---|---|
| Response Time (p95) | < 2s | > 5s |
| Success Rate | > 99% | < 95% |
| Cache Hit Rate | > 60% | < 40% |
| Credits/Query | < 1.0 | > 5.0 |
| Error Rate | < 1% | > 5% |
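As a sketch, these thresholds can be evaluated against whatever aggregated stats you already collect. The `Stats` shape below is hypothetical; map it from your own metrics pipeline (for example, the `MetricsCollector` above) and wire the alerts to your notification channel:

```typescript
// Hypothetical stats shape; populate it from your own metrics pipeline
interface Stats {
  successRate: number;    // 0..1
  p95DurationMs: number;
  cacheHitRate: number;   // 0..1
  creditsPerQuery: number;
}

function checkAlerts(stats: Stats): string[] {
  const alerts: string[] = [];
  if (stats.p95DurationMs > 5000) alerts.push("p95 response time above 5s");
  if (stats.successRate < 0.95) alerts.push("Success rate below 95%");
  if (stats.cacheHitRate < 0.4) alerts.push("Cache hit rate below 40%");
  if (stats.creditsPerQuery > 5.0) alerts.push("Credits per query above 5.0");
  return alerts;
}
```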
## Optimization Checklist

- ✅ **Implement caching**: Cache common queries for 5+ minutes.
- ✅ **Use appropriate models**: Don't use GPT-4 for simple queries.
- ✅ **Set token limits**: Use `max_tokens` to control response length.
- ✅ **Scope filtering**: Filter to relevant documents only.
- ✅ **Connection pooling**: Reuse client instances.
- ✅ **Async operations**: Use async for concurrent queries.
- ✅ **Error handling**: Implement retry logic with backoff.
- ✅ **Monitoring**: Track all key metrics.