Skip to main content

Installation

Install the Trainly SDK using pip or poetry:
pip install trainly

Quick Start

from trainly import TrainlyClient
import os

trainly = TrainlyClient(
    api_key=os.getenv("TRAINLY_API_KEY"),
    chat_id=os.getenv("TRAINLY_CHAT_ID")
)

# Ask a question
response = trainly.query("What are the main findings?")

print("Answer:", response.answer)
print("Citations:", len(response.context))

Initialization

Basic Initialization

from trainly import TrainlyClient
import os
from dotenv import load_dotenv

load_dotenv()

api_key = os.getenv("TRAINLY_API_KEY")
chat_id = os.getenv("TRAINLY_CHAT_ID")

if not api_key or not chat_id:
    raise ValueError("TRAINLY_API_KEY and TRAINLY_CHAT_ID must be set")

client = TrainlyClient(api_key=api_key, chat_id=chat_id)
print(f"Client initialized with chat_id: {client.chat_id}")
print(f"Base URL: {client.base_url}")
print(f"Timeout: {client.timeout}s")

Querying Documents

Basic Query

from trainly import TrainlyClient, QueryResponse

client = TrainlyClient(
    api_key=os.getenv("TRAINLY_API_KEY"),
    chat_id=os.getenv("TRAINLY_CHAT_ID")
)

response = client.query("What is the main topic?")

assert isinstance(response, QueryResponse), "Response should be QueryResponse"
assert hasattr(response, 'answer'), "Response should have answer"
assert isinstance(response.answer, str), "Answer should be a string"
assert len(response.answer) > 0, "Answer should not be empty"

print(f"Answer length: {len(response.answer)} characters")
print(f"Context chunks: {len(response.context)}")

if response.usage:
    print(f"Token usage: {response.usage.total_tokens} total tokens")

Query with Custom Parameters

response = client.query(
    question="Explain the key concepts",
    model="gpt-4o-mini",
    temperature=0.7,
    max_tokens=500,
    include_context=True
)

assert isinstance(response, QueryResponse), "Response should be QueryResponse"
print(f"Model: {response.model or 'default'}")
print(f"Answer preview: {response.answer[:100]}...")

Query without Context

response = client.query(
    question="What are the findings?",
    include_context=False
)

assert isinstance(response, QueryResponse), "Response should be QueryResponse"
print(f"Context chunks returned: {len(response.context)}")

Streaming Responses

Streaming Query

from trainly import TrainlyClient

chunks = []
content_parts = []

for chunk in client.query_stream("Explain the methodology"):
    chunks.append(chunk)
    if chunk.is_content:
        content_parts.append(chunk.data)
    elif chunk.is_end:
        print("Received end marker")

assert len(chunks) > 0, "Should receive at least one chunk"
assert any(chunk.is_end for chunk in chunks), "Should receive end marker"

full_content = "".join(content_parts)
assert len(full_content) > 0, "Should receive content"

print(f"Total chunks received: {len(chunks)}")
print(f"Content length: {len(full_content)} characters")

File Management

Upload Files

from trainly import TrainlyClient, UploadResult
import tempfile
import os

client = TrainlyClient(
    api_key=os.getenv("TRAINLY_API_KEY"),
    chat_id=os.getenv("TRAINLY_CHAT_ID")
)

# Create a temporary test file
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
    f.write("This is a test document for Trainly SDK testing.\n")
    f.write("It contains multiple lines of text.\n")
    f.write("The content should be searchable after upload.")
    temp_path = f.name

try:
    result = client.upload_file(temp_path)

    assert isinstance(result, UploadResult), "Result should be UploadResult"
    assert result.success is True, "Upload should be successful"
    assert result.file_id is not None, "File ID should be returned"
    assert result.size_bytes > 0, "File size should be greater than 0"

    print(f"Filename: {result.filename}")
    print(f"File ID: {result.file_id}")
    print(f"Size: {result.size_bytes} bytes")
    print(f"Status: {result.processing_status}")
finally:
    # Clean up temp file
    os.unlink(temp_path)

Upload File with Scope Values

import tempfile
import os

with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
    f.write("Test document with scope values")
    temp_path = f.name

try:
    result = client.upload_file(
        temp_path,
        scope_values={
            "test_category": "sdk_testing",
            "test_id": "test_123"
        }
    )

    assert result.success is True, "Upload should be successful"
    print("File uploaded with scope values")
finally:
    os.unlink(temp_path)

List Files

from trainly import TrainlyClient, FileListResult

result = client.list_files()

assert isinstance(result, FileListResult), "Result should be FileListResult"
assert result.success is True, "List should be successful"
assert isinstance(result.files, list), "Files should be a list"

print(f"Total files: {result.total_files}")
print(f"Total size: {result.total_size_bytes} bytes")

if result.files:
    print(f"Sample file: {result.files[0].filename}")
    print(f"  - File ID: {result.files[0].file_id}")
    print(f"  - Size: {result.files[0].size_bytes} bytes")
    print(f"  - Chunks: {result.files[0].chunk_count}")

Delete File

from trainly import TrainlyClient, FileDeleteResult

file_id = "your_file_id_here"

if not file_id:
    print("Skipping delete test - no file ID available")
else:
    result = client.delete_file(file_id)

    assert isinstance(result, FileDeleteResult), "Result should be FileDeleteResult"
    assert result.success is True, "Delete should be successful"

    print(f"Deleted file: {result.filename}")
    print(f"Chunks deleted: {result.chunks_deleted}")
    print(f"Size freed: {result.size_bytes_freed} bytes")

Error Handling

Basic Error Handling

from trainly import TrainlyClient, TrainlyError

# Test 1: Empty file ID for delete
try:
    client.delete_file("")
    print("Should have raised error for empty file ID")
except TrainlyError as e:
    print(f"Correctly raised error for empty file ID: {e}")

# Test 2: Non-existent file upload
try:
    client.upload_file("/nonexistent/file.pdf")
    print("Should have raised error for non-existent file")
except TrainlyError as e:
    print(f"Correctly raised error for non-existent file: {e}")

Context Manager

Using Context Manager

from trainly import TrainlyClient, QueryResponse
import os

api_key = os.getenv("TRAINLY_API_KEY")
chat_id = os.getenv("TRAINLY_CHAT_ID")

with TrainlyClient(api_key=api_key, chat_id=chat_id) as client:
    response = client.query("Test question")
    assert isinstance(response, QueryResponse)

print("Context manager works correctly")

File Lifecycle

Complete File Lifecycle

from trainly import TrainlyClient
import tempfile
import os

# Upload
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
    f.write("Lifecycle test document content")
    temp_path = f.name

try:
    upload_result = client.upload_file(temp_path)
    file_id = upload_result.file_id
    print(f"Uploaded file: {upload_result.filename}")

    # List and verify
    list_result = client.list_files()
    file_ids = [f.file_id for f in list_result.files]
    assert file_id in file_ids, "File should appear in list"
    print("File appears in file list")

    # Query about the file
    response = client.query("What is in the test document?")
    print("Query successful after upload")

    # Delete
    delete_result = client.delete_file(file_id)
    assert delete_result.success is True
    print("File deleted successfully")
finally:
    os.unlink(temp_path)

Query Specific File Content

Upload and Verify Content

from trainly import TrainlyClient
import tempfile
import os
import time

# First, delete all existing files to start with a clean slate
print("Cleaning up: Deleting all existing files...")
list_result = client.list_files()
if list_result.files:
    for file_info in list_result.files:
        try:
            client.delete_file(file_info.file_id)
            print(f"  Deleted: {file_info.filename}")
        except Exception as e:
            print(f"  Could not delete {file_info.filename}: {e}")
    print(f"Cleaned up {len(list_result.files)} existing files")
else:
    print("No existing files to clean up")

# Wait a moment for deletions to propagate
time.sleep(5)

# Create a file with very specific information
specific_content = """
Product Information Document

Product Name: UltraSound Pro 5000 Headphones
Model Number: USP-5000-BLK
Release Date: September 22, 2024
Price: $349.95

Key Specifications:
- Driver Size: 50mm neodymium drivers
- Frequency Response: 20Hz - 40kHz
- Impedance: 32 ohms
- Battery Life: 48 hours with ANC on
- Charging: USB-C fast charging (10 min = 6 hours)
- Weight: 285 grams

Features:
- Active Noise Cancellation (ANC) with 8 microphones
- Bluetooth 5.3 with multipoint connectivity
- Spatial audio support
- Premium leather ear cushions
- Foldable design with carrying case

Warranty: 2 years manufacturer warranty
"""

with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
    f.write(specific_content)
    temp_path = f.name

file_id = None
try:
    # Upload the file
    upload_result = client.upload_file(temp_path)
    file_id = upload_result.file_id
    print(f"Uploaded file with specific content: {upload_result.filename}")

    # Wait a moment for processing (if needed)
    time.sleep(30)
    print("File processed and indexed, continuing with tests...")

    # Test 1: Query about the product name
    print("Testing query about product name...")
    response1 = client.query("What is the product name mentioned in the document?")
    assert "UltraSound Pro 5000" in response1.answer or "USP-5000" in response1.answer or "UltraSound" in response1.answer, \
        f"Answer should contain product name. Got: {response1.answer}"
    print("✓ Product name query verified")

    # Test 2: Query about the price
    print("Testing query about price...")
    response2 = client.query("What is the price of the UltraSound Pro 5000?")
    assert "$349" in response2.answer or "349" in response2.answer, \
        f"Answer should contain price. Got: {response2.answer}"
    print("✓ Price query verified")

    # Test 3: Query about specifications
    print("Testing query about battery life...")
    response3 = client.query("What is the battery life of the UltraSound Pro 5000 headphones?")
    assert "48" in response3.answer, \
        f"Answer should contain '48' hours. Got: {response3.answer}"
    print("✓ Battery life specification query verified")

    # Test 4: Query about release date
    print("Testing query about release date...")
    response4 = client.query("When was the UltraSound Pro 5000 released?")
    assert "September" in response4.answer or "2024" in response4.answer or "September 22" in response4.answer, \
        f"Answer should contain release date. Got: {response4.answer}"
    print("✓ Release date query verified")

    # Test 5: Query about warranty
    print("Testing query about warranty...")
    response5 = client.query("What is the warranty period for the UltraSound Pro 5000?")
    assert "2" in response5.answer and ("year" in response5.answer.lower() or "yr" in response5.answer.lower()), \
        f"Answer should contain '2 years'. Got: {response5.answer}"
    print("✓ Warranty query verified")
    print(f"Response: {response5.answer}")

    print("All content verification checks passed!")

    # Test 6: Delete the file and verify AI no longer knows the content
    print("Testing query after file deletion...")
    delete_result = client.delete_file(file_id)
    assert delete_result.success is True, "File deletion should succeed"
    print(f"✓ File deleted: {delete_result.filename}")

    # Wait a moment for deletion to propagate
    time.sleep(5)

    # Query about the deleted content - AI should not know
    response_after_delete = client.query("What is the price of the UltraSound Pro 5000?")

    # Check that the AI indicates it doesn't know or can't find the information
    no_knowledge_indicators = [
        "don't know",
        "do not know",
        "don't have",
        "do not have",
        "not found",
        "no information",
        "cannot find",
        "can't find",
        "unable to find",
        "not available",
        "not mentioned",
        "don't see",
        "do not see"
    ]

    answer_lower = response_after_delete.answer.lower()
    has_no_knowledge = any(indicator in answer_lower for indicator in no_knowledge_indicators)

    # Also check that it doesn't contain the specific price anymore
    does_not_contain_price = "$349" not in response_after_delete.answer and "349.95" not in response_after_delete.answer

    if has_no_knowledge or does_not_contain_price:
        print("✓ AI correctly indicates it doesn't know after file deletion")
        print(f"Response after deletion: {response_after_delete.answer}")
    else:
        print(f"AI still claims to know the information after deletion. Response: {response_after_delete.answer}")
        print("This might indicate the file hasn't been fully removed from the index yet")

    file_id = None  # Clear file_id since we already deleted it

finally:
    # Clean up
    os.unlink(temp_path)
    if file_id:
        try:
            client.delete_file(file_id)
            print(f"Cleaned up test file: {file_id}")
        except Exception as e:
            print(f"Could not delete test file: {e}")

Type Hints

from trainly import TrainlyClient, TrainlyError
from trainly.models import (
    QueryResponse,
    UploadResult,
    FileListResult,
    FileDeleteResult,
    StreamChunk,
)

Next Steps