agent.run(files=[File(...)]) handles one document. Production runs see folders, queues, and nightly drops. Three primitives cover the production shape: concurrent runs for ad-hoc batches, background runs for long jobs, scheduled runs for nightly intake.
Concurrent batch over a list
The simplest batch is a folder of files. agent.arun is async, so a semaphore plus asyncio.gather is enough.
Set concurrency to the slowest of: your rate quota, your DB write throughput, your memory budget.
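A minimal sketch of the semaphore-plus-gather pattern follows. To keep it runnable without credentials, `fake_run` is a hypothetical stand-in for a call such as agent.arun(files=[File(...)]), and `run_batch` and `max_concurrency` are illustrative names, not part of the library.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Sequence


async def run_batch(
    paths: Sequence[str],
    run_one: Callable[[str], Awaitable[Any]],
    max_concurrency: int = 4,
) -> List[Any]:
    """Run run_one over every path with at most max_concurrency in flight."""
    sem = asyncio.Semaphore(max_concurrency)

    async def guarded(path: str) -> Any:
        # The semaphore blocks here once max_concurrency runs are active.
        async with sem:
            return await run_one(path)

    # return_exceptions=True keeps one failed document from cancelling the
    # rest; failures come back as exception objects in the result list.
    return await asyncio.gather(
        *(guarded(p) for p in paths), return_exceptions=True
    )


async def fake_run(path: str) -> str:
    # Hypothetical stand-in for the real agent call.
    await asyncio.sleep(0.01)
    return f"extracted:{path}"


if __name__ == "__main__":
    print(asyncio.run(run_batch(["a.pdf", "b.pdf", "c.pdf"], fake_run, 2)))
```

asyncio.gather returns results in input order, so each result can be matched back to its file by position.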
Background runs for long jobs
Sync runs hold an HTTP connection until the model returns. For multi-page contracts or scanned PDFs that take minutes, start the run in the background and poll. The run's state is persisted to db, so the agent process can restart and a different process can poll the same run_id. That is the durability property: state lives in the database, not in the calling process.
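The polling side can be sketched as a loop with a deadline. Here `fetch_status` is a hypothetical callable that reads the run's status from the db by run_id, and the terminal status names are assumptions; substitute whatever your deployment actually reports.

```python
import asyncio
import time
from typing import Awaitable, Callable

# Assumed terminal states; adjust to the statuses your runs report.
TERMINAL = {"completed", "failed", "cancelled"}


async def poll_run(
    fetch_status: Callable[[str], Awaitable[str]],
    run_id: str,
    interval: float = 1.0,
    timeout: float = 600.0,
) -> str:
    """Poll fetch_status(run_id) until a terminal status or the deadline."""
    deadline = time.monotonic() + timeout
    while True:
        status = await fetch_status(run_id)
        if status in TERMINAL:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"run {run_id} still {status!r} after {timeout}s")
        await asyncio.sleep(interval)


if __name__ == "__main__":
    seen = {"n": 0}

    async def demo_fetch(run_id: str) -> str:
        # Hypothetical stand-in: reports "running" twice, then "completed".
        seen["n"] += 1
        return "completed" if seen["n"] >= 3 else "running"

    print(asyncio.run(poll_run(demo_fetch, "run-123", interval=0.01)))
```

Because the loop only needs the run_id, it can run in a different process from the one that started the run.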
Scheduled batch with retries
For nightly intake (an SFTP drop, a Drive folder, a queue), put an AgentOS in front of your agent and let the scheduler fire the run on cron. ScheduleManager writes to the same db the AgentOS polls.
if_exists="update" makes the call idempotent. Re-running the bootstrap script does not create duplicates. The scheduler retries on HTTP failure with the configured delay, and every fire writes a row to agno_schedule_runs with status and timing.
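To make the idempotency claim concrete, here is a sketch of create-or-update semantics using SQLite's upsert. It is not the library's implementation: the `schedules` table, its columns, the `upsert_schedule` helper, and the endpoint path are all hypothetical and exist only to show why a second bootstrap run updates the row instead of adding one.

```python
import sqlite3


def upsert_schedule(conn: sqlite3.Connection, name: str, cron: str, endpoint: str) -> None:
    """Insert a schedule, or update it in place if the name already exists."""
    conn.execute(
        """
        INSERT INTO schedules (name, cron, endpoint) VALUES (?, ?, ?)
        ON CONFLICT(name) DO UPDATE SET
            cron = excluded.cron,
            endpoint = excluded.endpoint
        """,
        (name, cron, endpoint),
    )
    conn.commit()


# Hypothetical stand-in schema: the unique name is what makes re-runs safe.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE schedules ("
    "name TEXT PRIMARY KEY, cron TEXT NOT NULL, endpoint TEXT NOT NULL)"
)

# Running the bootstrap twice, the second time with a changed cron expression.
upsert_schedule(conn, "nightly-intake", "0 2 * * *", "/agents/extractor/runs")
upsert_schedule(conn, "nightly-intake", "0 3 * * *", "/agents/extractor/runs")

rows = conn.execute("SELECT name, cron FROM schedules").fetchall()
print(rows)  # one row, carrying the latest cron expression
```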
Pattern comparison
| Pattern | When to reach for it | Process lifetime |
|---|---|---|
| asyncio.gather over agent.arun | One-time backfill, a fixed list of files | One process, end-to-end |
| agent.arun(background=True) + poll | Single long document, restart-tolerant | State in db, process can restart |
| AgentOS(scheduler=True) + ScheduleManager | Recurring intake (nightly, hourly) | Long-running AgentOS process |
| Workflow with Loop / Parallel steps | Multi-step pipelines per document | Either ad-hoc or scheduled |
Workflow runs are served at /workflows/<id>/runs and are scheduled with the same ScheduleManager.create call. See Workflows.
Observability
Every scheduled fire creates a row in agno_schedule_runs with the schedule id, attempt number, status, and the run_id of the underlying agent run. To see the last day of activity, query that table for rows from the past 24 hours.
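A sketch of such a query, run here against an in-memory SQLite stand-in. The column names (`schedule_id`, `attempt`, `status`, `run_id`, `triggered_at`) are assumptions derived from the description above, not a confirmed schema, and timestamps are assumed to be stored as UTC ISO-8601 strings so that string comparison matches time order.

```python
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import List, Tuple


def recent_fires(
    conn: sqlite3.Connection,
    now: datetime,
    window: timedelta = timedelta(days=1),
) -> List[Tuple]:
    """Return fires inside the window, newest first."""
    cutoff = (now - window).isoformat()
    return conn.execute(
        """
        SELECT schedule_id, attempt, status, run_id
        FROM agno_schedule_runs
        WHERE triggered_at >= ?
        ORDER BY triggered_at DESC
        """,
        (cutoff,),
    ).fetchall()


if __name__ == "__main__":
    # Hypothetical stand-in table with the assumed columns.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE agno_schedule_runs (schedule_id TEXT, attempt INTEGER, "
        "status TEXT, run_id TEXT, triggered_at TEXT)"
    )
    now = datetime.now(timezone.utc)
    conn.execute(
        "INSERT INTO agno_schedule_runs VALUES (?, ?, ?, ?, ?)",
        ("nightly-intake", 1, "success", "run-1", now.isoformat()),
    )
    for row in recent_fires(conn, now):
        print(row)
```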
Retries of the same fire share a schedule_id and carry an incrementing attempt. That is the audit trail you can hand to ops.
Production checklist
| Concern | What to add |
|---|---|
| Idempotency per document | Pass a deterministic session_id (e.g. document hash) so re-runs upsert. |
| Dead-letter queue | After max_retries, the row stays in agno_schedule_runs with status="failed". Read it and route to a manual queue. |
| Per-provider rate limiting | The asyncio.Semaphore is enough for one provider. For mixed providers, run one semaphore per provider. |
| Storage of inputs | File(url=...) keeps the URL but not the bytes. If retention matters, store the source PDF before extraction. |
| Authoritative cost | RunMetrics.cost is populated when the provider returns it. For exact reconciliation, attach a token-rate table downstream. |
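For the idempotency row above, one way to derive a deterministic session_id is to hash the document bytes. This is a sketch under assumptions: `session_id_for`, the `doc` prefix, and the 32-character truncation are illustrative choices, not library conventions.

```python
import hashlib


def session_id_for(document_bytes: bytes, prefix: str = "doc") -> str:
    """Derive a stable id from content, so identical files map to one session."""
    digest = hashlib.sha256(document_bytes).hexdigest()
    # Truncated for readability; keep the full digest if collisions are a concern.
    return f"{prefix}-{digest[:32]}"


if __name__ == "__main__":
    first = session_id_for(b"%PDF-1.7 example bytes")
    second = session_id_for(b"%PDF-1.7 example bytes")
    print(first == second)  # same bytes give the same id
```

Hashing the content rather than the filename means a renamed copy of the same document still resolves to the same session.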
Next steps
| Task | Guide |
|---|---|
| Pause on low-confidence fields | Human routing and eval |
| Compose multiple agents into a pipeline | Workflows |
| See the workflow + scheduler integration | Scheduling |