Resilient generation
This section covers the foundational patterns for building resilient LLM-powered tasks: reusable environments, traced function calls, and retry strategies.
Two environments
This example uses two task environments with different characteristics:
- llm_env (reusable): For tasks that make many LLM calls in a loop or process batches in parallel. Container reuse avoids cold starts.
- driver_env (standard): For orchestration tasks that fan out work to other tasks but don't make LLM calls themselves.
Reusable environment for LLM work
When processing a batch of topics, each topic goes through multiple LLM calls
(generate, critique, revise, repeat). With 5 topics × ~7 calls each, that’s ~35
LLM calls. ReusePolicy keeps containers warm to handle this efficiently:
from datetime import timedelta

import flyte

# Reusable environment for tasks that make many LLM calls in a loop.
# The ReusePolicy keeps containers warm, reducing cold start latency for iterative work.
# MOCK_MODE (defined elsewhere in this example) toggles mock responses for keyless testing.
llm_env = flyte.TaskEnvironment(
    name="llm-worker",
    secrets=[] if MOCK_MODE else [flyte.Secret(key="openai-api-key", as_env_var="OPENAI_API_KEY")],
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "unionai-reuse>=0.1.10",
        "openai>=1.0.0",
        "pydantic>=2.0.0",
    ),
    resources=flyte.Resources(cpu=1, memory="2Gi"),
    reusable=flyte.ReusePolicy(
        replicas=2,  # Keep 2 container instances ready
        concurrency=4,  # Allow 4 concurrent tasks per container
        scaledown_ttl=timedelta(minutes=5),  # Wait 5 min before scaling down
        idle_ttl=timedelta(minutes=30),  # Shut down after 30 min idle
    ),
    cache="auto",
)
ReusePolicy parameters
| Parameter | Description |
|---|---|
| replicas | Number of container instances to keep ready (or a (min, max) tuple) |
| concurrency | Maximum tasks per container at once |
| scaledown_ttl | Minimum wait before scaling down a replica |
| idle_ttl | Time after which idle containers shut down completely |
The configuration above keeps 2 containers ready, allows 4 concurrent tasks per container, waits 5 minutes before scaling down, and shuts down after 30 minutes of inactivity.
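As the table notes, replicas also accepts a (min, max) tuple for autoscaling. A minimal sketch of that form, with illustrative values:

# Autoscaling variant (illustrative values): scale between 1 and 4 warm containers
# instead of pinning a fixed count.
autoscaling_policy = flyte.ReusePolicy(
    replicas=(1, 4),  # (min, max) tuple instead of a fixed replica count
    concurrency=4,
    scaledown_ttl=timedelta(minutes=5),
    idle_ttl=timedelta(minutes=30),
)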
Note: scaledown_ttl and idle_ttl must be at least 30 seconds.

Standard environment for orchestration
The driver environment doesn’t need container reuse—it just coordinates work.
The depends_on parameter declares that tasks in this environment call tasks
in llm_env, ensuring both environments are deployed together:
# Standard environment for orchestration tasks that don't need container reuse.
# depends_on declares that this environment's tasks call tasks in llm_env.
driver_env = flyte.TaskEnvironment(
    name="driver",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "pydantic>=2.0.0",
    ),
    resources=flyte.Resources(cpu=1, memory="1Gi"),
    depends_on=[llm_env],
)
Traced LLM calls
The @flyte.trace decorator provides automatic checkpointing at the function level.
When a traced function completes successfully, its result is cached. If the task
fails and restarts, previously completed traced calls return their cached results
instead of re-executing.
@flyte.trace
async def call_llm(prompt: str, system: str, json_mode: bool = False) -> str:
    """
    Make an LLM call with automatic checkpointing.

    The @flyte.trace decorator provides:
    - Automatic caching of results for identical inputs
    - Recovery from failures without re-running successful calls
    - Full observability in the Flyte UI

    Args:
        prompt: The user prompt to send
        system: The system prompt defining the LLM's role
        json_mode: Whether to request JSON output

    Returns:
        The LLM's response text
    """
    # Use mock responses for testing without API keys
    if MOCK_MODE:
        import asyncio

        await asyncio.sleep(0.5)  # Simulate API latency
        if "critique" in prompt.lower() or "critic" in system.lower():
            # Return a good score if the draft has been revised (contains revision marker)
            if "[REVISED]" in prompt:
                return MOCK_CRITIQUE_GOOD
            return MOCK_CRITIQUE_NEEDS_WORK
        elif "summary" in system.lower():
            return MOCK_SUMMARY
        elif "revis" in system.lower():
            # Return a revised version with marker
            return MOCK_REPORT.replace("## Introduction", "[REVISED]\n\n## Introduction")
        else:
            return MOCK_REPORT

    from openai import AsyncOpenAI

    client = AsyncOpenAI()
    kwargs = {
        "model": "gpt-4o-mini",
        "messages": [
            {"role": "system", "content": system},
            {"role": "user", "content": prompt},
        ],
        "max_tokens": 2000,
    }
    if json_mode:
        kwargs["response_format"] = {"type": "json_object"}
    response = await client.chat.completions.create(**kwargs)
    return response.choices[0].message.content
Benefits of tracing
- Cost savings: Failed tasks don’t re-run expensive API calls that already succeeded
- Faster recovery: Resuming from checkpoints skips completed work
- Observability: Each traced call appears in the Flyte UI with timing data
When to use @flyte.trace
Use @flyte.trace for:
- LLM API calls (OpenAI, Anthropic, etc.)
- External API requests
- Any expensive operation you don’t want to repeat on retry
Don’t use @flyte.trace for:
- Simple computations (overhead outweighs benefit)
- Operations with side effects that shouldn’t be skipped
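For example, a sketch of both sides of that line (the helper names and the httpx dependency are illustrative, not part of this example):

import httpx

@flyte.trace
async def fetch_reference_data(url: str) -> str:
    # Network-bound and potentially costly: worth checkpointing so a
    # task retry skips fetches that already succeeded.
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.text

def word_count(text: str) -> int:
    # Cheap, pure computation: tracing overhead outweighs any benefit,
    # so this stays an ordinary function.
    return len(text.split())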
Traced helper functions
The LLM-calling functions are decorated with @flyte.trace rather than being
separate tasks. This keeps the architecture simple while still providing
checkpointing:
@flyte.trace
async def generate_initial_draft(topic: str) -> str:
    """
    Generate the initial report draft.

    The @flyte.trace decorator provides checkpointing - if the task fails
    after this completes, it won't re-run on retry.

    Args:
        topic: The topic to write about

    Returns:
        The initial draft in markdown format
    """
    print(f"Generating initial draft for topic: {topic}")
    prompt = f"Write a comprehensive report on the following topic:\n\n{topic}"
    draft = await call_llm(prompt, GENERATOR_SYSTEM_PROMPT)
    print(f"Generated initial draft ({len(draft)} characters)")
    return draft
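A second traced helper might wrap the critique step the same way, pairing call_llm's JSON mode with the Critique model described below (a sketch; the full example's helper may differ):

@flyte.trace
async def critique_draft(draft: str) -> Critique:
    """Critique a draft and return a validated Critique (illustrative sketch)."""
    prompt = f"Critique the following report draft:\n\n{draft}"
    raw = await call_llm(prompt, CRITIC_SYSTEM_PROMPT, json_mode=True)
    # Pydantic validates the JSON; malformed output raises a ValidationError.
    return Critique.model_validate_json(raw)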
These traced functions run inside the refine_report task. If the task fails
and retries, completed traced calls return cached results instead of re-executing.
Retry strategies
The task that orchestrates the LLM calls uses retries to handle transient failures:
@llm_env.task(retries=3)
async def refine_report(topic: str, ...) -> str:
    # Traced functions are called here
    draft = await generate_initial_draft(topic)
    ...

Configuring retries
You can specify retries as a simple integer:
@llm_env.task(retries=3)
async def my_task():
    ...

Or use RetryStrategy for more control:
@llm_env.task(retries=flyte.RetryStrategy(count=3))
async def my_task():
    ...

Combining tracing with retries
When you combine @flyte.trace with task-level retries, you get the best of both mechanisms:

1. The task fails after completing some traced calls
2. Flyte retries the task
3. Previously completed traced calls return cached results
4. Only the failed operation (and subsequent ones) re-execute
This pattern is essential for multi-step LLM workflows where you don’t want to re-run the entire chain when a single call fails.
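A sketch of the shape this takes in practice (the task body is illustrative, not the full refine_report from this example):

@llm_env.task(retries=3)
async def refine_once(topic: str) -> str:
    draft = await generate_initial_draft(topic)  # traced: cached once it succeeds
    critique = await critique_draft(draft)       # traced: cached once it succeeds
    # If the call below fails, the retry replays the task body, but the two
    # completed traced calls above return cached results instead of re-running.
    prompt = f"Revise this report:\n\n{draft}\n\nFeedback: {critique.summary}"
    return await call_llm(prompt, REVISER_SYSTEM_PROMPT)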
Structured prompts
The example uses a separate prompts.py module for system prompts and Pydantic models:
GENERATOR_SYSTEM_PROMPT = """You are an expert report writer. Generate a well-structured,
informative report on the given topic. The report should include:
1. An engaging introduction that sets context
2. Clear sections with descriptive headings
3. Specific facts, examples, or data points where relevant
4. A conclusion that summarizes key takeaways
Write in a professional but accessible tone. Use markdown formatting for structure.
Aim for approximately 500-800 words."""
CRITIC_SYSTEM_PROMPT = """You are a demanding but fair editor reviewing a report draft.
Evaluate the report on these criteria:
- Clarity: Is the writing clear and easy to follow?
- Structure: Is it well-organized with logical flow?
- Depth: Does it provide sufficient detail and insight?
- Accuracy: Are claims supported and reasonable?
- Engagement: Is it interesting to read?
Provide your response as JSON matching this schema:
{
"score": <1-10 integer>,
"strengths": ["strength 1", "strength 2", ...],
"improvements": ["improvement 1", "improvement 2", ...],
"summary": "brief overall assessment"
}
Be specific in your feedback. A score of 8+ means the report is ready for publication."""
REVISER_SYSTEM_PROMPT = """You are an expert editor revising a report based on feedback.
Your task is to improve the report by addressing the specific improvements requested
while preserving its strengths.
Guidelines:
- Address each improvement point specifically
- Maintain the original voice and style
- Keep the same overall structure unless restructuring is requested
- Preserve any content that was praised as a strength
- Ensure the revised version is cohesive and flows well
Return only the revised report in markdown format, no preamble or explanation."""
SUMMARY_SYSTEM_PROMPT = """Create a concise executive summary (2-3 paragraphs) of the
following report. Capture the key points and main takeaways. Write in a professional
tone suitable for busy executives who need the essential information quickly."""
Pydantic models for structured output
LLM responses can be unpredictable. Using Pydantic models with JSON mode ensures you get structured, validated data:
from pydantic import BaseModel, Field

class Critique(BaseModel):
    """Structured critique response from the LLM."""

    score: int = Field(
        ge=1,
        le=10,
        description="Quality score from 1-10, where 10 is publication-ready",
    )
    strengths: list[str] = Field(
        description="List of strengths in the current draft",
    )
    improvements: list[str] = Field(
        description="Specific improvements needed",
    )
    summary: str = Field(
        description="Brief summary of the critique",
    )
The Critique model validates that:

- score is an integer between 1 and 10
- strengths and improvements are lists of strings
- All required fields are present
If the LLM returns malformed JSON, Pydantic raises a validation error, which triggers a retry (if configured).
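A quick illustration with hand-written payloads (both JSON strings below are made up for the demo):

from pydantic import ValidationError

good = '{"score": 8, "strengths": ["clear"], "improvements": ["add data"], "summary": "solid"}'
bad = '{"score": 15, "strengths": "clear"}'  # score out of range, wrong type, missing fields

critique = Critique.model_validate_json(good)  # returns a validated Critique instance
try:
    Critique.model_validate_json(bad)
except ValidationError as err:
    print(err)  # reports the range violation and the missing/mistyped fields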
Next steps
With resilient generation in place, you’re ready to build the agentic refinement loop.