Installation
Install the Trainly SDK using pip or poetry:
pip install trainly
Quick Start
Copy
import os

from trainly import TrainlyClient

# The API key and the target chat are both read from the environment.
client = TrainlyClient(
    api_key=os.environ.get("TRAINLY_API_KEY"),
    chat_id=os.environ.get("TRAINLY_CHAT_ID"),
)

# Ask a question
result = client.query("What are the main findings?")
print("Answer:", result.answer)
print("Citations:", len(result.context))
Initialization
Basic Initialization
Copy
import os

from dotenv import load_dotenv
from trainly import TrainlyClient

# load_dotenv() runs before the environment lookups below.
load_dotenv()

api_key = os.getenv("TRAINLY_API_KEY")
chat_id = os.getenv("TRAINLY_CHAT_ID")

# Fail fast with a clear message rather than building a client from
# missing or empty settings.
if not api_key or not chat_id:
    raise ValueError("TRAINLY_API_KEY and TRAINLY_CHAT_ID must be set")

client = TrainlyClient(api_key=api_key, chat_id=chat_id)

# Show the settings the client ended up with.
print(f"Client initialized with chat_id: {client.chat_id}")
print(f"Base URL: {client.base_url}")
print(f"Timeout: {client.timeout}s")
Querying Documents
Basic Query
Copy
import os

from trainly import TrainlyClient, QueryResponse

client = TrainlyClient(
    api_key=os.getenv("TRAINLY_API_KEY"),
    chat_id=os.getenv("TRAINLY_CHAT_ID"),
)

response = client.query("What is the main topic?")

# Sanity-check the shape of the response before using it.
assert isinstance(response, QueryResponse), "Response should be QueryResponse"
assert hasattr(response, 'answer'), "Response should have answer"
assert isinstance(response.answer, str), "Answer should be a string"
assert len(response.answer) > 0, "Answer should not be empty"

print(f"Answer length: {len(response.answer)} characters")
print(f"Context chunks: {len(response.context)}")

# Usage is only reported when the response carries it.
if response.usage:
    print(f"Token usage: {response.usage.total_tokens} total tokens")
Query with Custom Parameters
Copy
# Override the model and generation settings for a single query.
response = client.query(
    question="Explain the key concepts",
    model="gpt-4o-mini",
    temperature=0.7,
    max_tokens=500,
    include_context=True,
)
assert isinstance(response, QueryResponse), "Response should be QueryResponse"

# Fall back to the label 'default' when the response names no model.
model_label = response.model or 'default'
print(f"Model: {model_label}")

# Only the first 100 characters of the answer are shown.
answer_preview = response.answer[:100]
print(f"Answer preview: {answer_preview}...")
Query without Context
Copy
# Run the query with include_context switched off.
response = client.query(question="What are the findings?", include_context=False)
assert isinstance(response, QueryResponse), "Response should be QueryResponse"

context_count = len(response.context)
print(f"Context chunks returned: {context_count}")
Streaming Responses
Streaming Query
Copy
from trainly import TrainlyClient

chunks = []
content_parts = []

# Consume the stream: keep every chunk, and collect the payload of the
# content chunks so the full answer can be reassembled afterwards.
for chunk in client.query_stream("Explain the methodology"):
    chunks.append(chunk)
    if chunk.is_content:
        content_parts.append(chunk.data)
    elif chunk.is_end:
        print("Received end marker")

assert len(chunks) > 0, "Should receive at least one chunk"
assert any(chunk.is_end for chunk in chunks), "Should receive end marker"

full_content = "".join(content_parts)
assert len(full_content) > 0, "Should receive content"

print(f"Total chunks received: {len(chunks)}")
print(f"Content length: {len(full_content)} characters")
File Management
Upload Files
Copy
import os
import tempfile

from trainly import TrainlyClient, UploadResult

client = TrainlyClient(
    api_key=os.getenv("TRAINLY_API_KEY"),
    chat_id=os.getenv("TRAINLY_CHAT_ID"),
)

# Create a temporary test file. delete=False keeps it on disk after the
# `with` block closes it, so it can be uploaded by path.
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
    f.write("This is a test document for Trainly SDK testing.\n")
    f.write("It contains multiple lines of text.\n")
    f.write("The content should be searchable after upload.")
    temp_path = f.name

try:
    result = client.upload_file(temp_path)

    assert isinstance(result, UploadResult), "Result should be UploadResult"
    assert result.success is True, "Upload should be successful"
    assert result.file_id is not None, "File ID should be returned"
    assert result.size_bytes > 0, "File size should be greater than 0"

    print(f"Filename: {result.filename}")
    print(f"File ID: {result.file_id}")
    print(f"Size: {result.size_bytes} bytes")
    print(f"Status: {result.processing_status}")
finally:
    # Clean up temp file, whether or not the upload succeeded
    os.unlink(temp_path)
Upload File with Scope Values
Copy
import os
import tempfile

# delete=False keeps the file on disk after the `with` block closes it,
# so it can be uploaded by path.
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
    f.write("Test document with scope values")
    temp_path = f.name

try:
    # scope_values is passed alongside the file path as a dict of
    # string keys to string values.
    result = client.upload_file(
        temp_path,
        scope_values={
            "test_category": "sdk_testing",
            "test_id": "test_123",
        },
    )
    assert result.success is True, "Upload should be successful"
    print("File uploaded with scope values")
finally:
    # Remove the local temp file regardless of the upload outcome.
    os.unlink(temp_path)
List Files
Copy
from trainly import TrainlyClient, FileListResult

result = client.list_files()

assert isinstance(result, FileListResult), "Result should be FileListResult"
assert result.success is True, "List should be successful"
assert isinstance(result.files, list), "Files should be a list"

print(f"Total files: {result.total_files}")
print(f"Total size: {result.total_size_bytes} bytes")

# Show details for the first entry only, and only when the list is non-empty.
if result.files:
    sample = result.files[0]
    print(f"Sample file: {sample.filename}")
    print(f" - File ID: {sample.file_id}")
    print(f" - Size: {sample.size_bytes} bytes")
    print(f" - Chunks: {sample.chunk_count}")
Delete File
Copy
from trainly import TrainlyClient, FileDeleteResult

# Replace with the ID of a file you uploaded earlier.
file_id = "your_file_id_here"

if not file_id:
    print("Skipping delete test - no file ID available")
else:
    result = client.delete_file(file_id)

    assert isinstance(result, FileDeleteResult), "Result should be FileDeleteResult"
    assert result.success is True, "Delete should be successful"

    print(f"Deleted file: {result.filename}")
    print(f"Chunks deleted: {result.chunks_deleted}")
    print(f"Size freed: {result.size_bytes_freed} bytes")
Error Handling
Basic Error Handling
Copy
from trainly import TrainlyClient, TrainlyError

# Test 1: Empty file ID for delete
try:
    client.delete_file("")
    # Only reached if no exception was raised.
    print("Should have raised error for empty file ID")
except TrainlyError as e:
    print(f"Correctly raised error for empty file ID: {e}")

# Test 2: Non-existent file upload
try:
    client.upload_file("/nonexistent/file.pdf")
    # Only reached if no exception was raised.
    print("Should have raised error for non-existent file")
except TrainlyError as e:
    print(f"Correctly raised error for non-existent file: {e}")
Context Manager
Using Context Manager
Copy
import os

from trainly import TrainlyClient, QueryResponse

api_key = os.getenv("TRAINLY_API_KEY")
chat_id = os.getenv("TRAINLY_CHAT_ID")

# The client is used as a context manager; `client` is bound only for
# the duration of the `with` block.
with TrainlyClient(api_key=api_key, chat_id=chat_id) as client:
    response = client.query("Test question")
    assert isinstance(response, QueryResponse)

# Reached only if the block above ran and exited without raising.
print("Context manager works correctly")
File Lifecycle
Complete File Lifecycle
Copy
import os
import tempfile

from trainly import TrainlyClient

# Upload
# delete=False keeps the file on disk after the `with` block closes it,
# so it can be uploaded by path.
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
    f.write("Lifecycle test document content")
    temp_path = f.name

try:
    upload_result = client.upload_file(temp_path)
    file_id = upload_result.file_id
    print(f"Uploaded file: {upload_result.filename}")

    # List and verify
    list_result = client.list_files()
    file_ids = [entry.file_id for entry in list_result.files]
    assert file_id in file_ids, "File should appear in list"
    print("File appears in file list")

    # Query about the file
    response = client.query("What is in the test document?")
    print("Query successful after upload")

    # Delete
    delete_result = client.delete_file(file_id)
    assert delete_result.success is True
    print("File deleted successfully")
finally:
    # Remove the local temp file regardless of how the steps above went.
    os.unlink(temp_path)
Query Specific File Content
Upload and Verify Content
Copy
import os
import tempfile
import time

from trainly import TrainlyClient

# First, delete all existing files to start with a clean slate
print("Cleaning up: Deleting all existing files...")
list_result = client.list_files()
if list_result.files:
    for file_info in list_result.files:
        # Best-effort cleanup: one failed delete must not stop the rest.
        try:
            client.delete_file(file_info.file_id)
            print(f" Deleted: {file_info.filename}")
        except Exception as e:
            print(f" Could not delete {file_info.filename}: {e}")
    print(f"Cleaned up {len(list_result.files)} existing files")
else:
    print("No existing files to clean up")

# Wait a moment for deletions to propagate
time.sleep(5)

# Create a file with very specific information
specific_content = """
Product Information Document
Product Name: UltraSound Pro 5000 Headphones
Model Number: USP-5000-BLK
Release Date: September 22, 2024
Price: $349.95
Key Specifications:
- Driver Size: 50mm neodymium drivers
- Frequency Response: 20Hz - 40kHz
- Impedance: 32 ohms
- Battery Life: 48 hours with ANC on
- Charging: USB-C fast charging (10 min = 6 hours)
- Weight: 285 grams
Features:
- Active Noise Cancellation (ANC) with 8 microphones
- Bluetooth 5.3 with multipoint connectivity
- Spatial audio support
- Premium leather ear cushions
- Foldable design with carrying case
Warranty: 2 years manufacturer warranty
"""

# delete=False keeps the file on disk after the `with` block closes it,
# so it can be uploaded by path.
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
    f.write(specific_content)
    temp_path = f.name

# Tracks the uploaded file so the `finally` block can delete it if any
# step fails before the explicit delete in Test 6.
file_id = None

try:
    # Upload the file
    upload_result = client.upload_file(temp_path)
    file_id = upload_result.file_id
    print(f"Uploaded file with specific content: {upload_result.filename}")

    # Wait a moment for processing (if needed)
    time.sleep(30)
    print("File processed and indexed, continuing with tests...")

    # Test 1: Query about the product name
    print("Testing query about product name...")
    response1 = client.query("What is the product name mentioned in the document?")
    # "UltraSound" also matches the full name "UltraSound Pro 5000".
    assert any(token in response1.answer for token in ("USP-5000", "UltraSound")), (
        f"Answer should contain product name. Got: {response1.answer}"
    )
    print("✓ Product name query verified")

    # Test 2: Query about the price
    print("Testing query about price...")
    response2 = client.query("What is the price of the UltraSound Pro 5000?")
    # "349" also matches "$349" and "$349.95".
    assert "349" in response2.answer, (
        f"Answer should contain price. Got: {response2.answer}"
    )
    print("✓ Price query verified")

    # Test 3: Query about specifications
    print("Testing query about battery life...")
    response3 = client.query("What is the battery life of the UltraSound Pro 5000 headphones?")
    assert "48" in response3.answer, (
        f"Answer should contain '48' hours. Got: {response3.answer}"
    )
    print("✓ Battery life specification query verified")

    # Test 4: Query about release date
    print("Testing query about release date...")
    response4 = client.query("When was the UltraSound Pro 5000 released?")
    # "September" also matches "September 22".
    assert "September" in response4.answer or "2024" in response4.answer, (
        f"Answer should contain release date. Got: {response4.answer}"
    )
    print("✓ Release date query verified")

    # Test 5: Query about warranty
    print("Testing query about warranty...")
    response5 = client.query("What is the warranty period for the UltraSound Pro 5000?")
    warranty_lower = response5.answer.lower()
    assert "2" in response5.answer and ("year" in warranty_lower or "yr" in warranty_lower), (
        f"Answer should contain '2 years'. Got: {response5.answer}"
    )
    print("✓ Warranty query verified")
    print(f"Response: {response5.answer}")

    print("All content verification checks passed!")

    # Test 6: Delete the file and verify AI no longer knows the content
    print("Testing query after file deletion...")
    delete_result = client.delete_file(file_id)
    assert delete_result.success is True, "File deletion should succeed"
    print(f"✓ File deleted: {delete_result.filename}")

    # Wait a moment for deletion to propagate
    time.sleep(5)

    # Query about the deleted content - AI should not know
    response_after_delete = client.query("What is the price of the UltraSound Pro 5000?")

    # Check that the AI indicates it doesn't know or can't find the information
    no_knowledge_indicators = [
        "don't know",
        "do not know",
        "don't have",
        "do not have",
        "not found",
        "no information",
        "cannot find",
        "can't find",
        "unable to find",
        "not available",
        "not mentioned",
        "don't see",
        "do not see",
    ]
    answer_lower = response_after_delete.answer.lower()
    has_no_knowledge = any(indicator in answer_lower for indicator in no_knowledge_indicators)

    # Also check that it doesn't contain the specific price anymore
    does_not_contain_price = (
        "$349" not in response_after_delete.answer
        and "349.95" not in response_after_delete.answer
    )

    # This outcome is reported rather than asserted, so a stale answer
    # prints a warning instead of failing the run.
    if has_no_knowledge or does_not_contain_price:
        print("✓ AI correctly indicates it doesn't know after file deletion")
        print(f"Response after deletion: {response_after_delete.answer}")
    else:
        print(f"AI still claims to know the information after deletion. Response: {response_after_delete.answer}")
        print("This might indicate the file hasn't been fully removed from the index yet")

    file_id = None  # Clear file_id since we already deleted it
finally:
    # Clean up
    os.unlink(temp_path)
    # file_id is still set only if the explicit delete above was not completed.
    if file_id:
        try:
            client.delete_file(file_id)
            print(f"Cleaned up test file: {file_id}")
        except Exception as e:
            print(f"Could not delete test file: {e}")
Type Hints
Copy
from trainly import TrainlyClient, TrainlyError
from trainly.models import (
QueryResponse,
UploadResult,
FileListResult,
FileDeleteResult,
StreamChunk,
)