Resilient generation

This section covers the foundational patterns for building resilient LLM-powered tasks: reusable environments, traced function calls, and retry strategies.

Two environments

This example uses two task environments with different characteristics:

  1. llm_env (reusable): For tasks that make many LLM calls in a loop or process batches in parallel. Container reuse avoids cold starts.
  2. driver_env (standard): For orchestration tasks that fan out work to other tasks but don’t make LLM calls themselves.

Reusable environment for LLM work

When processing a batch of topics, each topic goes through multiple LLM calls (generate, critique, revise, repeat). With 5 topics × ~7 calls each, that’s ~35 LLM calls. ReusePolicy keeps containers warm to handle this efficiently:

generate.py
from datetime import timedelta

import flyte

# Reusable environment for tasks that make many LLM calls in a loop.
# The ReusePolicy keeps containers warm, reducing cold start latency for iterative work.
llm_env = flyte.TaskEnvironment(
    name="llm-worker",
    secrets=[] if MOCK_MODE else [flyte.Secret(key="openai-api-key", as_env_var="OPENAI_API_KEY")],
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "unionai-reuse>=0.1.10",
        "openai>=1.0.0",
        "pydantic>=2.0.0",
    ),
    resources=flyte.Resources(cpu=1, memory="2Gi"),
    reusable=flyte.ReusePolicy(
        replicas=2,              # Keep 2 container instances ready
        concurrency=4,           # Allow 4 concurrent tasks per container
        scaledown_ttl=timedelta(minutes=5),   # Wait 5 min before scaling down
        idle_ttl=timedelta(minutes=30),       # Shut down after 30 min idle
    ),
    cache="auto",
)
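
The MOCK_MODE flag referenced above is defined elsewhere in generate.py. One plausible definition, shown here only as an assumption so the snippet reads in isolation, is a module-level flag read from an environment variable:

import os

# Assumed definition (not shown in this section): use canned responses when no API key is set.
MOCK_MODE = os.environ.get("MOCK_MODE", "0") == "1"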

ReusePolicy parameters

  • replicas: Number of container instances to keep ready (or a (min, max) tuple)
  • concurrency: Maximum number of tasks each container runs at once
  • scaledown_ttl: Minimum wait before a replica is scaled down
  • idle_ttl: Time after which idle containers shut down completely

The configuration above keeps 2 containers ready, allows 4 concurrent tasks per container, waits 5 minutes before scaling down, and shuts down after 30 minutes of inactivity.

Both scaledown_ttl and idle_ttl must be at least 30 seconds.
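
Because replicas also accepts a (min, max) tuple, the same policy can autoscale with demand. A minimal sketch using the parameters shown above (the name autoscaling_policy is illustrative):

# Sketch: autoscale between 1 and 4 warm containers instead of keeping a fixed count.
autoscaling_policy = flyte.ReusePolicy(
    replicas=(1, 4),                       # (min, max) warm containers
    concurrency=4,                         # Still 4 concurrent tasks per container
    scaledown_ttl=timedelta(seconds=30),   # 30 seconds is the minimum allowed value
    idle_ttl=timedelta(minutes=10),
)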

Standard environment for orchestration

The driver environment doesn’t need container reuse—it just coordinates work. The depends_on parameter declares that tasks in this environment call tasks in llm_env, ensuring both environments are deployed together:

generate.py
# Standard environment for orchestration tasks that don't need container reuse.
# depends_on declares that this environment's tasks call tasks in llm_env.
driver_env = flyte.TaskEnvironment(
    name="driver",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "pydantic>=2.0.0",
    ),
    resources=flyte.Resources(cpu=1, memory="1Gi"),
    depends_on=[llm_env],
)
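
A driver task in this environment typically just awaits the LLM-side tasks and gathers the results. The sketch below is illustrative rather than the example's actual driver: generate_reports is a hypothetical task, and it assumes refine_report is called with only a topic:

import asyncio

# Hypothetical driver task: fan out one refine_report call per topic and collect results.
@driver_env.task
async def generate_reports(topics: list[str]) -> list[str]:
    results = await asyncio.gather(*(refine_report(topic) for topic in topics))
    return list(results)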

Traced LLM calls

The @flyte.trace decorator provides automatic checkpointing at the function level. When a traced function completes successfully, its result is cached. If the task fails and restarts, previously completed traced calls return their cached results instead of re-executing.

generate.py
@flyte.trace
async def call_llm(prompt: str, system: str, json_mode: bool = False) -> str:
    """
    Make an LLM call with automatic checkpointing.

    The @flyte.trace decorator provides:
    - Automatic caching of results for identical inputs
    - Recovery from failures without re-running successful calls
    - Full observability in the Flyte UI

    Args:
        prompt: The user prompt to send
        system: The system prompt defining the LLM's role
        json_mode: Whether to request JSON output

    Returns:
        The LLM's response text
    """
    # Use mock responses for testing without API keys
    if MOCK_MODE:
        import asyncio
        await asyncio.sleep(0.5)  # Simulate API latency

        if "critique" in prompt.lower() or "critic" in system.lower():
            # Return good score if draft has been revised (contains revision marker)
            if "[REVISED]" in prompt:
                return MOCK_CRITIQUE_GOOD
            return MOCK_CRITIQUE_NEEDS_WORK
        elif "summary" in system.lower():
            return MOCK_SUMMARY
        elif "revis" in system.lower():
            # Return revised version with marker
            return MOCK_REPORT.replace("## Introduction", "[REVISED]\n\n## Introduction")
        else:
            return MOCK_REPORT

    from openai import AsyncOpenAI

    client = AsyncOpenAI()

    kwargs = {
        "model": "gpt-4o-mini",
        "messages": [
            {"role": "system", "content": system},
            {"role": "user", "content": prompt},
        ],
        "max_tokens": 2000,
    }

    if json_mode:
        kwargs["response_format"] = {"type": "json_object"}

    response = await client.chat.completions.create(**kwargs)
    return response.choices[0].message.content

Benefits of tracing

  1. Cost savings: Failed tasks don’t re-run expensive API calls that already succeeded
  2. Faster recovery: Resuming from checkpoints skips completed work
  3. Observability: Each traced call appears in the Flyte UI with timing data

When to use @flyte.trace

Use @flyte.trace for:

  • LLM API calls (OpenAI, Anthropic, etc.)
  • External API requests (see the sketch below)
  • Any expensive operation you don’t want to repeat on retry

Don’t use @flyte.trace for:

  • Simple computations (overhead outweighs benefit)
  • Operations with side effects that shouldn’t be skipped
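
As a concrete illustration of the "external API requests" bullet above, any slow external call can be wrapped the same way as call_llm. The helper below is hypothetical (not part of the example project) and assumes the httpx library is installed:

import flyte
import httpx

# Hypothetical helper: trace a slow external request so a task retry reuses the
# cached response instead of calling the API again.
@flyte.trace
async def fetch_reference_data(url: str) -> dict:
    async with httpx.AsyncClient(timeout=30) as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()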

Traced helper functions

The LLM-calling functions are decorated with @flyte.trace rather than being separate tasks. This keeps the architecture simple while still providing checkpointing:

generate.py
@flyte.trace
async def generate_initial_draft(topic: str) -> str:
    """
    Generate the initial report draft.

    The @flyte.trace decorator provides checkpointing - if the task fails
    after this completes, it won't re-run on retry.

    Args:
        topic: The topic to write about

    Returns:
        The initial draft in markdown format
    """
    print(f"Generating initial draft for topic: {topic}")

    prompt = f"Write a comprehensive report on the following topic:\n\n{topic}"
    draft = await call_llm(prompt, GENERATOR_SYSTEM_PROMPT)

    print(f"Generated initial draft ({len(draft)} characters)")
    return draft

These traced functions run inside the refine_report task. If the task fails and retries, completed traced calls return cached results instead of re-executing.

Retry strategies

The task that orchestrates the LLM calls uses retries to handle transient failures:

@llm_env.task(retries=3)
async def refine_report(topic: str, ...) -> str:
    # Traced functions are called here
    draft = await generate_initial_draft(topic)
    ...

Configuring retries

You can specify retries as a simple integer:

@llm_env.task(retries=3)
async def my_task():
    ...

Or use RetryStrategy for more control:

@llm_env.task(retries=flyte.RetryStrategy(count=3))
async def my_task():
    ...

Combining tracing with retries

When you combine @flyte.trace with task-level retries, you get the best of both mechanisms:

  1. Task fails after completing some traced calls
  2. Flyte retries the task
  3. Previously completed traced calls return cached results
  4. Only the failed operation (and subsequent ones) re-execute

This pattern is essential for multi-step LLM workflows where you don’t want to re-run the entire chain when a single call fails.
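
A simplified sketch of the shape this takes (the real refine_report has more parameters and an iterative critique loop; refine_once below is a hypothetical, stripped-down stand-in):

@llm_env.task(retries=3)
async def refine_once(topic: str) -> str:
    # Each await below is a traced call, so a failure in a later step never
    # re-runs the earlier ones on retry.
    draft = await generate_initial_draft(topic)
    critique_json = await call_llm(
        f"Critique this report draft:\n\n{draft}",
        CRITIC_SYSTEM_PROMPT,
        json_mode=True,
    )
    revised = await call_llm(
        f"Feedback:\n{critique_json}\n\nOriginal draft:\n{draft}",
        REVISER_SYSTEM_PROMPT,
    )
    return revised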

Structured prompts

The example uses a separate prompts.py module for system prompts and Pydantic models:

prompts.py
GENERATOR_SYSTEM_PROMPT = """You are an expert report writer. Generate a well-structured,
informative report on the given topic. The report should include:

1. An engaging introduction that sets context
2. Clear sections with descriptive headings
3. Specific facts, examples, or data points where relevant
4. A conclusion that summarizes key takeaways

Write in a professional but accessible tone. Use markdown formatting for structure.
Aim for approximately 500-800 words."""

CRITIC_SYSTEM_PROMPT = """You are a demanding but fair editor reviewing a report draft.
Evaluate the report on these criteria:

- Clarity: Is the writing clear and easy to follow?
- Structure: Is it well-organized with logical flow?
- Depth: Does it provide sufficient detail and insight?
- Accuracy: Are claims supported and reasonable?
- Engagement: Is it interesting to read?

Provide your response as JSON matching this schema:
{
    "score": <1-10 integer>,
    "strengths": ["strength 1", "strength 2", ...],
    "improvements": ["improvement 1", "improvement 2", ...],
    "summary": "brief overall assessment"
}

Be specific in your feedback. A score of 8+ means the report is ready for publication."""

REVISER_SYSTEM_PROMPT = """You are an expert editor revising a report based on feedback.
Your task is to improve the report by addressing the specific improvements requested
while preserving its strengths.

Guidelines:
- Address each improvement point specifically
- Maintain the original voice and style
- Keep the same overall structure unless restructuring is requested
- Preserve any content that was praised as a strength
- Ensure the revised version is cohesive and flows well

Return only the revised report in markdown format, no preamble or explanation."""

SUMMARY_SYSTEM_PROMPT = """Create a concise executive summary (2-3 paragraphs) of the
following report. Capture the key points and main takeaways. Write in a professional
tone suitable for busy executives who need the essential information quickly."""

Pydantic models for structured output

LLM responses can be unpredictable. Using Pydantic models with JSON mode ensures you get structured, validated data:

prompts.py
from pydantic import BaseModel, Field


class Critique(BaseModel):
    """Structured critique response from the LLM."""

    score: int = Field(
        ge=1,
        le=10,
        description="Quality score from 1-10, where 10 is publication-ready",
    )
    strengths: list[str] = Field(
        description="List of strengths in the current draft",
    )
    improvements: list[str] = Field(
        description="Specific improvements needed",
    )
    summary: str = Field(
        description="Brief summary of the critique",
    )

The Critique model validates that:

  • score is an integer between 1 and 10
  • strengths and improvements are lists of strings
  • All required fields are present

If the LLM returns malformed JSON, Pydantic raises a validation error, which triggers a retry (if configured).
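
A hedged sketch of how that validation might look in generate.py (critique_draft is a hypothetical helper, not the example's exact code):

from prompts import CRITIC_SYSTEM_PROMPT, Critique

@flyte.trace
async def critique_draft(draft: str) -> Critique:
    # Request JSON output and validate it; a malformed response raises
    # pydantic.ValidationError, which surfaces as a task failure and triggers a retry.
    raw = await call_llm(
        f"Critique this report draft:\n\n{draft}",
        CRITIC_SYSTEM_PROMPT,
        json_mode=True,
    )
    return Critique.model_validate_json(raw)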

Next steps

With resilient generation in place, you’re ready to build the agentic refinement loop.