
Overnight batch classifications

What you'll build

A batch classifier. You have 50,000 customer-support tickets sitting in a CSV. You write each row as one line of a JSONL file in OpenAI's batch request format, upload it via POST /v1/files, create a batch via POST /v1/batches, then poll every minute until the batch completes (usually within hours). The output file contains one JSONL row per request; you download it, parse it, and merge the results back into your DB. Batch requests bill at 0.5x the sync rate.

What you need

  • An FC API key with scope api:write
  • pip install openai
  • A corpus of inputs (we use a list of ticket strings below)

Full code

python
# classify_tickets.py
import io, json, os, time
from openai import OpenAI

client = OpenAI(api_key=os.environ["FC_API_KEY"], base_url="https://api.fightclub.pro/v1")

# 1. Prepare the JSONL batch input.
tickets = [
    {"id": "t_001", "body": "My login is broken, please help"},
    {"id": "t_002", "body": "Can I get a refund for last month?"},
    {"id": "t_003", "body": "Your pricing page has a typo on it"},
    # ... imagine 50,000 rows here
]

SYSTEM = (
    "Classify the ticket into one of: account, billing, bug, feature_request, other. "
    "Respond with JUST the label."
)

buf = io.BytesIO()
for t in tickets:
    row = {
        "custom_id": t["id"],
        "method": "POST",
        "url": "/v1/chat/completions",
        "body": {
            "model": "fc:openai/gpt-4o-mini",
            "messages": [
                {"role": "system", "content": SYSTEM},
                {"role": "user", "content": t["body"]},
            ],
            "max_tokens": 8,
        },
    }
    buf.write((json.dumps(row) + "\n").encode())
buf.seek(0)

# 2. Upload the JSONL file.
uploaded = client.files.create(file=("tickets.jsonl", buf), purpose="batch")
print(f"file: {uploaded.id} ({uploaded.bytes} bytes)")

# 3. Create the batch.
batch = client.batches.create(
    input_file_id=uploaded.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
    metadata={"source": "ticket_backlog_apr21"},
)
print(f"batch: {batch.id} status={batch.status}")

# 4. Poll until terminal.
while batch.status in ("validating", "in_progress", "finalizing"):
    time.sleep(60)
    batch = client.batches.retrieve(batch.id)
    print(f"  {batch.status} {batch.request_counts.completed}/{batch.request_counts.total}")

if batch.status != "completed":
    raise SystemExit(f"batch ended: {batch.status}; error file: {batch.error_file_id}")

# 5. Download the output JSONL.
out_content = client.files.content(batch.output_file_id).read().decode()
classifications = {}
for line in out_content.splitlines():
    row = json.loads(line)
    cid = row["custom_id"]
    label = row["response"]["body"]["choices"][0]["message"]["content"].strip()
    classifications[cid] = label

print(f"classified {len(classifications)} tickets")
# merge back into your ticket DB here...

Walkthrough

The custom_id field on each input row is how you correlate output rows back to your records. Ringside does not reorder rows, but some may fail (rate limits, upstream errors); those appear in the file referenced by batch.error_file_id, with the same custom_id. Always join on custom_id, never on position.
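As a rough illustration of joining on custom_id, the sketch below merges an output file and an error file back onto the original records. The helper name join_batch_results and the "error" field on error rows are assumptions made for this example; the page only confirms that error rows carry the same custom_id. The sample output rows are deliberately out of order to show that position plays no part in the join.

```python
import json

def join_batch_results(records, output_jsonl, error_jsonl=""):
    """Attach a label or an error to each record, keyed by custom_id."""
    by_id = {r["id"]: dict(r, label=None, error=None) for r in records}

    for line in output_jsonl.splitlines():
        if not line.strip():
            continue
        row = json.loads(line)
        rec = by_id.get(row["custom_id"])
        if rec is not None:
            content = row["response"]["body"]["choices"][0]["message"]["content"]
            rec["label"] = content.strip()

    for line in error_jsonl.splitlines():
        if not line.strip():
            continue
        row = json.loads(line)
        rec = by_id.get(row["custom_id"])
        if rec is not None:
            # Assumed shape: error rows expose an "error" object.
            rec["error"] = row.get("error")

    return by_id

records = [
    {"id": "t_001", "body": "My login is broken, please help"},
    {"id": "t_002", "body": "Can I get a refund for last month?"},
    {"id": "t_003", "body": "Your pricing page has a typo on it"},
]

def _ok_row(cid, label):
    # Builds a minimal output row in the shape the full code above parses.
    body = {"choices": [{"message": {"content": label}}]}
    return json.dumps({"custom_id": cid, "response": {"body": body}})

output_jsonl = "\n".join([_ok_row("t_003", "bug"), _ok_row("t_001", "account")])
# Hypothetical error row; the error code is made up for the example.
error_jsonl = json.dumps({"custom_id": "t_002", "error": {"code": "rate_limited"}})

joined = join_batch_results(records, output_jsonl, error_jsonl)
for cid, rec in joined.items():
    print(cid, rec["label"], rec["error"])
```

Records left with label None are the ones to retry or inspect, which is easier to spot than a silently shorter result dict.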

Polling every 60 seconds is fine: batches are rate-limit-free and you're not billed for polls. For jobs that might run overnight, prefer subscribing to the batch.completed webhook instead of polling. The event payload includes output_file_id and request_counts.
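A webhook consumer could look roughly like the sketch below. The envelope shape is an assumption: a top-level "type" field and a "data" object are hypothetical, and only output_file_id and request_counts are named on this page. Signature verification and the HTTP server are left out; check the webhook reference for the real payload before relying on this.

```python
def handle_batch_event(event):
    """Return (output_file_id, request_counts) for batch.completed, else None.

    Assumed envelope: {"type": ..., "data": {...}}. Only the two fields
    inside "data" are documented on this page.
    """
    if event.get("type") != "batch.completed":
        return None
    data = event.get("data", {})
    return data.get("output_file_id"), data.get("request_counts", {})

# Hypothetical payload, with made-up ids, for illustration only.
sample_event = {
    "type": "batch.completed",
    "data": {
        "output_file_id": "file_example_123",
        "request_counts": {"total": 3, "completed": 3, "failed": 0},
    },
}

result = handle_batch_event(sample_event)
ignored = handle_batch_event({"type": "some.other.event", "data": {}})
```

Once the handler returns a file id, the download and parse step is the same as step 5 in the full code.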

Pricing: batch children are billed at 0.5x the sync markup. If a sync chat request would cost $0.02, the batch equivalent is $0.01. This shows up as api_usage_events.batch_markup_multiplier = 0.5 and rolls into /v1/margin correctly.
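The arithmetic can be sketched as follows. The function name batch_cost is hypothetical, and applying the $0.02 figure to all 50,000 tickets is purely illustrative; real per-request costs depend on token counts.

```python
from decimal import Decimal

# Multiplier stated on this page for batch requests.
BATCH_MULTIPLIER = Decimal("0.5")

def batch_cost(sync_cost_per_request, n_requests=1):
    """Estimate batch cost from the equivalent sync cost per request."""
    return Decimal(sync_cost_per_request) * BATCH_MULTIPLIER * n_requests

per_request = batch_cost("0.02")          # one request
backlog = batch_cost("0.02", 50_000)      # illustrative 50,000-ticket backlog
sync_backlog = Decimal("0.02") * 50_000   # same backlog at the sync rate

print(f"per request: ${per_request:.2f}")
print(f"backlog: ${backlog:.2f} batch vs ${sync_backlog:.2f} sync")
```

Decimal is used instead of float so that currency amounts do not pick up binary rounding noise.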

24h is the only completion_window value supported in v2.1; tunable windows are a v2.2 item.

Run it

bash
export FC_API_KEY=sk_live_xxx
python classify_tickets.py

You'll see status transitions validating → in_progress → finalizing → completed over ~10 minutes for 3 rows, longer for 50k.

What's next