A three-layer agentic framework I built from scratch that autonomously generates, validates, and self-corrects structured content, running for 30+ hours without human intervention, across provider quota cycles, with per-output cost tracking baked in.
I needed to generate structured educational content for 3,247 algorithm problems across three programming languages and three content flavors to match different learning styles. Doing this manually would require a team of 10+ content engineers working for a year.
The naive approach (prompt an LLM with "generate content for problem X" and accept the output) fails at production scale. Outputs are inconsistent, schema violations creep in, edge cases break downstream renderers, and cost runs unbounded. I've seen teams burn tens of thousands of dollars on "AI content pipelines" that produce unusable output because they treated the LLM as a magic box instead of architecting around its failure modes.
The AI Factory separates concerns across three distinct layers. Each layer has a single responsibility, making the system debuggable, extensible, and easy to reason about under failure.
The core observe → decide → act loop. One LLM, one conversation state, tool-use via the Anthropic SDK. This is the atomic unit of work.
A ThreadPoolExecutor dispatches N parallel workers, each running an independent single-agent loop with isolated state. Handles model routing and retry logic.
Each tool is dual-registered: an API schema for LLM consumption and a runtime implementation for execution. Adding a new tool requires zero changes to the orchestration layer.
Every agent system in the world is a variation on this pattern:
import anthropic

def run_agent(problem_code: str, *, model: str, dry_run: bool, verbose: bool) -> dict:
    client = anthropic.Anthropic()
    messages = [{"role": "user", "content": initial_prompt(problem_code)}]
    turns = 0
    while turns < MAX_TURNS:
        turns += 1
        # 1. LLM decides what to do
        response = client.messages.create(
            model=model,
            system=SYSTEM_PROMPT,
            messages=messages,
            tools=TOOL_DEFINITIONS,
            max_tokens=16384,
        )
        # 2. Agent signals completion
        if response.stop_reason == "end_turn":
            return {"success": True, "problem": problem_code, "turns": turns}
        # 3. Execute tool calls
        if response.stop_reason == "tool_use":
            tool_results = []
            for block in response.content:
                if block.type == "tool_use":
                    result = TOOL_IMPLEMENTATIONS[block.name](block.input)
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": str(result),
                    })
            # 4. Feed results back and loop
            messages.append({"role": "assistant", "content": response.content})
            messages.append({"role": "user", "content": tool_results})
    return {"success": False, "problem": problem_code, "turns": turns,
            "message": f"exceeded MAX_TURNS ({MAX_TURNS})"}

agent.py: the core agentic loop
Hard cap on loop iterations (20 by default). Prevents runaway agents that get stuck in validation-repair spirals.
File I/O tools enforce path containment. The agent literally cannot escape its designated directory.
Every call logs input tokens, output tokens, and dollar cost. Unit economics by default, not an afterthought.
The --dry-run flag short-circuits destructive tool calls. I can validate the full loop without side effects.
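The per-call cost logging can hang off the same response object the loop already receives. A minimal sketch, assuming Claude-style usage metadata (`response.usage.input_tokens` / `output_tokens`) and illustrative per-token prices; `PRICES` is a hypothetical table, not published rates:

```python
# Hypothetical per-million-token prices; real rates come from the provider's
# pricing page and change over time.
PRICES = {
    "claude-sonnet": {"input": 3.00, "output": 15.00},
}

def track_cost(model: str, input_tokens: int, output_tokens: int) -> float:
    """Compute and log the dollar cost of a single LLM call."""
    p = PRICES.get(model, {"input": 0.0, "output": 0.0})
    cost = (input_tokens / 1_000_000) * p["input"] \
         + (output_tokens / 1_000_000) * p["output"]
    print(f"{model}: {input_tokens} in / {output_tokens} out -> ${cost:.4f}")
    return cost
```

In the agent loop this would run once per `client.messages.create` call, with the per-call costs summed into the per-problem result dict, which is what makes unit economics a default rather than an afterthought.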
One agent is a toy. Production throughput comes from running many in parallel, with shared failure handling and intelligent model routing. The orchestrator is where that happens.
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_orchestrator(*, start, limit, workers, model, dry_run, verbose):
    problems = json.loads(list_problems(start=start, limit=limit))
    if not problems:
        return {"total": 0, "success": 0, "failed": 0}
    results = {"total": len(problems), "success": 0, "failed": 0,
               "succeeded": [], "failed_list": []}
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = {
            executor.submit(run_agent, problem,
                            model=model, dry_run=dry_run, verbose=verbose): problem
            for problem in problems
        }
        for future in as_completed(futures):
            problem = futures[future]
            try:
                result = future.result()
                if result["success"]:
                    results["success"] += 1
                    results["succeeded"].append(problem)
                else:
                    results["failed"] += 1
                    results["failed_list"].append({
                        "problem": problem,
                        "error": result.get("message", "agent did not complete"),
                    })
            except Exception as e:
                results["failed"] += 1
                results["failed_list"].append({"problem": problem, "error": str(e)})
    return results

orchestrator.py: parallel worker dispatch
Not every task deserves the most expensive model. I benchmarked each capability against cost and made routing decisions per workload type.
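The shape of that routing is a simple mapping from task type to model tier. A sketch with hypothetical model names and task types; the real assignments came from the benchmark numbers, not from this table:

```python
# Hypothetical routing table: task type -> model tier. Illustrative only;
# the production assignments were driven by per-capability benchmarks.
MODEL_ROUTING = {
    "generate_content": "big-model",    # hardest task: full structured generation
    "repair_validation": "mid-model",   # targeted fixes against a failure report
    "judge_quality": "mid-model",       # LLM-as-judge scoring
    "classify_problem": "small-model",  # cheap, high-volume triage
}

def pick_model(task_type: str) -> str:
    """Route a task to its model, defaulting to the cheapest tier."""
    return MODEL_ROUTING.get(task_type, "small-model")
```

Because the agent loop takes `model` as a parameter, routing stays a one-line decision at dispatch time rather than logic threaded through the loop itself.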
Tools are how LLMs interact with the world. I use a dual-registration pattern: each tool is defined once as an API schema (for the LLM) and once as a runtime implementation (for execution). This separation keeps the LLM's view of the tool decoupled from how it's actually wired up.
# Runtime implementations
def read_file(path: str) -> str:
    """Read a file and return its contents."""
    full = (CONTENT_DIR / path).resolve()
    # Enforce sandbox: path must stay within CONTENT_DIR. is_relative_to
    # avoids the classic startswith prefix bug (/content-other passing a
    # check for /content).
    if not full.is_relative_to(CONTENT_DIR.resolve()):
        return f"ERROR: Access denied: path must be under {CONTENT_DIR}"
    try:
        return full.read_text(encoding='utf-8')[:50000]  # Cap at 50K chars
    except FileNotFoundError:
        return f"ERROR: File not found: {path}"

def write_file(path: str, content: str) -> str:
    """Write content to a file (sandboxed)."""
    full = (CONTENT_DIR / path).resolve()
    if not full.is_relative_to(CONTENT_DIR.resolve()):
        return "ERROR: Access denied"
    full.parent.mkdir(parents=True, exist_ok=True)
    full.write_text(content, encoding='utf-8')
    return f"OK: Written {len(content)} chars to {path}"

# API schemas: what the LLM sees
TOOL_DEFINITIONS = [
    {
        "name": "read_file",
        "description": "Read a file from the content directory.",
        "input_schema": {
            "type": "object",
            "properties": {
                "path": {"type": "string", "description": "Relative path"}
            },
            "required": ["path"]
        }
    },
    # ... more tools
]

# Runtime dispatch map: what actually executes
TOOL_IMPLEMENTATIONS = {
    "read_file": lambda args: read_file(args["path"]),
    "write_file": lambda args: write_file(args["path"], args["content"]),
    "validate_visualization": lambda args: validate_visualization(args["path"]),
}

tools.py: the dual-registration pattern
This is the piece everyone skips when they build AI pipelines, and it's why their pipelines fail in production. Every output must pass a structured validation check before it reaches downstream consumers.
Top-level keys, nested structure, required fields all present.
Minimum counts enforced: 3+ test cases, 3 languages, 3 flavors.
JavaScript, Python, and Java variants all present and parseable.
Every required field populated β no empty strings, no placeholders.
Specific checks for bugs we've seen before (valueSlots nesting, etc.).
LLM-as-judge scoring for helpfulness and pedagogical correctness.
import json

def validate_visualization(path: str) -> str:
    try:
        with open(path) as f:
            data = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError) as e:
        return f"VALIDATION FAILED:\n  • {e}"
    issues = []
    # Top-level schema check
    for key in ['testCases', 'annotatedCode', 'algorithmMeta', 'thinkingContent']:
        if key not in data:
            issues.append(f"Missing required key: {key}")
    # Cardinality: at least 3 test cases
    tcs = data.get('testCases', [])
    if len(tcs) < 3:
        issues.append(f"Need 3+ test cases, found {len(tcs)}")
    # Per-state structural completeness
    for i, tc in enumerate(tcs):
        for j, state in enumerate(tc.get('states', [])):
            for field in ['step', 'codeLineId', 'phase', 'description',
                          'variables', 'dataStructureState', 'pointers',
                          'annotation', 'calculation']:
                if field not in state:
                    issues.append(f"testCase[{i}].states[{j}] missing '{field}'")
    # Language coverage
    ac = data.get('annotatedCode', {})
    for lang in ['javascript', 'python', 'java']:
        if lang not in ac:
            issues.append(f"Missing annotatedCode.{lang}")
        elif 'valueSlots' not in ac[lang]:
            # Known bug: LLMs sometimes put valueSlots at the wrong nesting level
            issues.append(f"annotatedCode.{lang} missing valueSlots "
                          f"(should be inside lang dict)")
    # Thinking content flavors
    thinking = data.get('thinkingContent', {})
    for flavor in ['technical', 'fun', 'spiritual']:
        if flavor not in thinking:
            issues.append(f"Missing thinkingContent.{flavor}")
    if issues:
        return "VALIDATION FAILED:\n" + "\n".join(f"  • {issue}" for issue in issues)
    total_states = sum(len(tc.get('states', [])) for tc in tcs)
    return f"VALID: {len(tcs)} test cases, {total_states} states"

validate_visualization: a slice of the Report Card logic
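The LLM-as-judge check from the Report Card list is the one piece the slice above doesn't show. One way to structure it is to keep the network call at the edge and make the scoring logic deterministic and testable; the prompt wording and threshold here are illustrative, not the production values:

```python
import json

# Hypothetical judge prompt; the real rubric is workload-specific.
JUDGE_PROMPT = """Score this educational content from 1-10 on helpfulness
and pedagogical correctness. Reply with JSON: {{"score": N, "reason": "..."}}

{content}"""

def parse_judge_reply(reply: str, threshold: int = 7) -> tuple[bool, str]:
    """Parse the judge model's JSON reply into a pass/fail plus a reason."""
    try:
        verdict = json.loads(reply)
        passed = int(verdict["score"]) >= threshold
        return passed, verdict.get("reason", "")
    except (json.JSONDecodeError, KeyError, TypeError, ValueError):
        # A judge that can't follow its own output format is a failure too.
        return False, f"unparseable judge reply: {reply[:100]}"
```

The reply itself would come from a cheaper model via `client.messages.create`; only the parsing and thresholding need to be exact, so that's the part under test.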
Autonomous systems need explicit stop conditions. I built circuit breakers at four levels, each addressing a different failure mode.
All I/O tools enforce path containment. The agent literally cannot read or write outside its designated directory.
MAX_TURNS = 20 per problem. Prevents runaway loops where the agent keeps retrying a broken pattern.
MAX_RETRIES = 3 per validation failure. Stops the "validate → fix → re-validate → fix" death spiral.
External actions (publishing, deletion, sending) require explicit human approval. Nothing destructive runs autonomously.
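The MAX_RETRIES breaker can be expressed as a thin wrapper around any validate/fix pair. A minimal sketch, where `validate` and `fix` are stand-ins for the real tool implementations (the `"VALID:"` prefix convention matches the validator's output format):

```python
MAX_RETRIES = 3

def validate_with_retries(validate, fix, max_retries: int = MAX_RETRIES) -> bool:
    """Run validate; on failure hand the report to fix, up to max_retries times.

    Returns True on a passing validation, False once the budget is spent,
    which is what breaks the validate/fix/re-validate death spiral.
    """
    for _ in range(max_retries):
        report = validate()
        if report.startswith("VALID:"):
            return True
        fix(report)  # hand the failure report back to the agent to repair
    return False
```

Note the check is against `"VALID:"`, not `"VALID"`: the failure string starts with "VALIDATION", so a bare-prefix check would pass failures.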
Every architectural decision is a trade-off. Here are the choices I made and the alternatives I rejected.
Chose: Custom 300-line Python framework. Rejected: LangChain, CrewAI, AutoGen.
Framework abstractions obscure what's happening, making production debugging painful. I needed explicit control over retry logic, model fallback, cost tracking per request, and per-tenant observability β all of which required working around frameworks rather than through them. For a team new to agents, I'd recommend LangGraph. For my production use case, custom was the right call. The trade-off is velocity versus control, and you earn the right to build custom by first understanding why the frameworks exist.
Chose: ThreadPoolExecutor. Rejected: asyncio.
Both work. Threading is simpler when the Anthropic SDK handles I/O blocking internally and each worker runs a sequential agent loop. Async adds complexity (coroutines, event loops, context propagation) without proportional benefit for this workload. If I were building a high-concurrency HTTP server I'd pick async. For a batch pipeline with N independent workers, threading wins on simplicity.
Chose: File system as the source of truth. No database. Rejected: Postgres or Redis for agent state.
The content directory (where agents read/write) doubles as the audit log and the resume-from-failure state. After a crash I can see exactly which problems completed, which partially completed, and which never started, just by listing files. Adding a database would add operational surface area for no benefit. If this were a multi-tenant production system I'd add a database for coordination; for a single-tenant pipeline, plain files win.
After generating 1,600+ AI-authored solutions and proving the full visualization pipeline on 200+ problems, I stopped the pipeline. Not because it failed, but because the infrastructure was proven and continuing would be a pure token-budget decision with no new learning.
Numbers from real runs are rounded for clarity and measured at the pipeline level, not cherry-picked from best runs.
What this removes for a team: the bottleneck of human review on high-volume AI-generated output. With Report Cards and sandboxed tool execution, agents can run overnight without someone watching them β which means the team's time goes to the work AI can't do yet, not to babysitting the work AI can.
How it scales beyond a solo build: every layer of this architecture is reusable across products. The same orchestrator powered both live products at Zen Algorithms β CosmicKeys and WatchAlgo β plus an MVP build of an AI-first professional network (ZoomedIn.us), demonstrating reusability across fundamentally different domains. Within a company, the same pattern would let one platform team serve many product teams β each one writing its own tool definitions and quality criteria while sharing the orchestration core. That's how an AI platform multiplies across an engineering organization instead of being duplicated per team.
The leadership insight: AI-native development isn't about one person being 10x faster. It's about building reusable agentic infrastructure that makes every engineer on a team 10x faster at the work that matters. The AI Factory is the shape that infrastructure takes when you design it for production, not for demos.