Parallel outputs
After refining the report, the pipeline generates multiple output formats in
parallel. This section demonstrates how to use asyncio.gather for concurrent
execution within a task.
The formatting functions
The pipeline generates three outputs: markdown, HTML, and an executive summary.
Only generate_summary uses @flyte.trace because it makes an LLM call.
The markdown and HTML functions are simple, deterministic transformations that
don’t benefit from checkpointing:
async def format_as_markdown(content: str) -> str:
    """Format the report as clean markdown."""
    from datetime import datetime

    # Content is already markdown, but we could add a TOC, metadata, etc.
    return f"""---
title: Generated Report
date: {datetime.now().isoformat()}
---
{content}
"""
async def format_as_html(content: str) -> str:
    """Convert the report to HTML."""
    # Simple markdown to HTML conversion
    import re

    html = content
    # Convert headers
    html = re.sub(r"^### (.+)$", r"<h3>\1</h3>", html, flags=re.MULTILINE)
    html = re.sub(r"^## (.+)$", r"<h2>\1</h2>", html, flags=re.MULTILINE)
    html = re.sub(r"^# (.+)$", r"<h1>\1</h1>", html, flags=re.MULTILINE)
    # Convert bold/italic
    html = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", html)
    html = re.sub(r"\*(.+?)\*", r"<em>\1</em>", html)
    # Convert paragraphs
    html = re.sub(r"\n\n", r"</p><p>", html)
    return f"""<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Generated Report</title>
<style>
body {{ font-family: system-ui, sans-serif; max-width: 800px; margin: 2rem auto; padding: 0 1rem; }}
h1, h2, h3 {{ color: #333; }}
p {{ line-height: 1.6; }}
</style>
</head>
<body>
<p>{html}</p>
</body>
</html>
"""
@flyte.trace
async def generate_summary(content: str) -> str:
    """Generate an executive summary of the report."""
    return await call_llm(content, SUMMARY_SYSTEM_PROMPT)
When to trace and when not to
Use @flyte.trace for operations that are expensive, non-deterministic, or
call external APIs (like generate_summary). Skip it for cheap, deterministic
transformations (like format_as_markdown and format_as_html) where
re-running on retry is trivial.
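As a minimal sketch of this rule of thumb (fetch_citations and slugify are hypothetical helpers; fetch_citations reuses the tutorial's call_llm function):

@flyte.trace
async def fetch_citations(topic: str) -> str:
    # External LLM call: expensive and non-deterministic, so checkpoint it.
    # On a retry, the recorded result can be replayed instead of re-calling.
    return await call_llm(topic, "List three sources relevant to this topic.")

def slugify(title: str) -> str:
    # Cheap, deterministic transform: re-running it on retry costs nothing.
    return title.lower().replace(" ", "-")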
Parallel execution with asyncio.gather
The format_outputs task runs all formatters concurrently:
@llm_env.task
async def format_outputs(content: str) -> Dir:
    """
    Generate multiple output formats in parallel.

    Uses asyncio.gather to run all formatting operations concurrently,
    maximizing efficiency when each operation is I/O-bound.

    Args:
        content: The final report content

    Returns:
        Directory containing all formatted outputs
    """
    print("Generating output formats in parallel...")

    with flyte.group("formatting"):
        # Run all formatting operations in parallel
        markdown, html, summary = await asyncio.gather(
            format_as_markdown(content),
            format_as_html(content),
            generate_summary(content),
        )

    # Write outputs to a directory
    output_dir = tempfile.mkdtemp()
    with open(os.path.join(output_dir, "report.md"), "w") as f:
        f.write(markdown)
    with open(os.path.join(output_dir, "report.html"), "w") as f:
        f.write(html)
    with open(os.path.join(output_dir, "summary.txt"), "w") as f:
        f.write(summary)

    print(f"Created outputs in {output_dir}")
    return await Dir.from_local(output_dir)
How asyncio.gather works
asyncio.gather takes multiple coroutines and runs them concurrently:
markdown, html, summary = await asyncio.gather(
    format_as_markdown(content),  # Starts immediately
    format_as_html(content),      # Starts immediately
    generate_summary(content),    # Starts immediately
)
# All three run concurrently; results are returned in order

Without gather, these would run sequentially:
# Sequential (slower)
markdown = await format_as_markdown(content) # Wait for completion
html = await format_as_html(content) # Then start this
summary = await generate_summary(content) # Then start this

When to use asyncio.gather
Use asyncio.gather when:
- Operations are independent (don’t depend on each other’s results)
- Operations are I/O-bound (API calls, file operations)
- You want to minimize total execution time
Don’t use asyncio.gather when:
- Operations depend on each other
- Operations are CPU-bound (use process pools instead)
- Order of execution matters for side effects
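The standalone sketch below makes both rules concrete (fetch and crunch are hypothetical stand-ins). gather suits the independent, I/O-bound case; passing return_exceptions=True keeps one failed coroutine from cancelling its siblings. CPU-bound work goes to a process pool instead:

import asyncio
from concurrent.futures import ProcessPoolExecutor

async def fetch(url: str) -> str:
    await asyncio.sleep(0.1)  # stand-in for an HTTP request (I/O-bound)
    return f"response from {url}"

def crunch(data: bytes) -> int:
    return sum(data)  # stand-in for heavy CPU work

async def main() -> None:
    # Independent, I/O-bound coroutines: run them concurrently.
    results = await asyncio.gather(
        fetch("https://example.com/a"),
        fetch("https://example.com/b"),
        return_exceptions=True,
    )
    print(results)

    # CPU-bound work: offload to a process pool so the event loop stays free.
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        print(await loop.run_in_executor(pool, crunch, b"payload"))

if __name__ == "__main__":
    asyncio.run(main())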
Grouping parallel operations
The parallel formatting is wrapped in a group for UI clarity:
with flyte.group("formatting"):
    markdown, html, summary = await asyncio.gather(...)

In the Flyte UI, the traced call within the group is visible:
format_outputs
└── formatting
    ├── format_as_markdown
    ├── format_as_html
    └── generate_summary (traced)

Collecting outputs in a directory
The formatted outputs are written to a temporary directory and returned as a
Dir artifact:
output_dir = tempfile.mkdtemp()
with open(os.path.join(output_dir, "report.md"), "w") as f:
    f.write(markdown)
with open(os.path.join(output_dir, "report.html"), "w") as f:
    f.write(html)
with open(os.path.join(output_dir, "summary.txt"), "w") as f:
    f.write(summary)
return await Dir.from_local(output_dir)

The Dir.from_local() call uploads the directory to Union.ai’s
artifact storage, making it available to downstream tasks or applications.
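As a rough sketch of the consuming side, a downstream task can accept the Dir as an input. This assumes Dir exposes an async walk() iterator yielding file objects with a name attribute; check the flyte.io reference for the exact API in your SDK version:

@driver_env.task
async def list_report_files(report_dir: Dir) -> list[str]:
    # Hypothetical consumer: collect the names of the uploaded files.
    names = []
    async for f in report_dir.walk():  # assumed API; see note above
        names.append(f.name)
    return names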
The batch pipeline
The batch pipeline processes multiple topics in parallel, demonstrating where
ReusePolicy truly shines:
@driver_env.task
async def report_batch_pipeline(
    topics: list[str],
    max_iterations: int = 3,
    quality_threshold: int = 8,
) -> list[Dir]:
    """
    Generate reports for multiple topics in parallel.

    This is where ReusePolicy shines: with N topics, each going through
    up to max_iterations refinement cycles, the reusable container pool
    handles potentially N × 7 LLM calls efficiently without cold starts.

    Args:
        topics: List of topics to write about
        max_iterations: Maximum refinement cycles per topic
        quality_threshold: Minimum quality score to accept

    Returns:
        List of directories, each containing a report's formatted outputs
    """
    print(f"Starting batch pipeline for {len(topics)} topics...")

    # Fan out: refine all reports in parallel
    # Each refine_report makes 2-7 LLM calls, all hitting the reusable pool
    with flyte.group("refine_all"):
        reports = await asyncio.gather(*[
            refine_report(topic, max_iterations, quality_threshold)
            for topic in topics
        ])

    print(f"All {len(reports)} reports refined, formatting outputs...")

    # Fan out: format all reports in parallel
    with flyte.group("format_all"):
        outputs = await asyncio.gather(*[
            format_outputs(report)
            for report in reports
        ])

    print(f"Batch pipeline complete! Generated {len(outputs)} reports.")
    return outputs
Pipeline flow
- Fan out refine_all: Process all topics in parallel using asyncio.gather
- Fan out format_all: Format all reports in parallel
- Return list of Dirs: Each directory contains one report’s outputs
With 5 topics, each making ~7 LLM calls, the reusable container pool handles ~35 LLM calls efficiently without cold starts.
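If the batch grows far beyond the pool's capacity, you can cap how many refinements are in flight at once with a semaphore. A minimal sketch (refine_all_bounded and the max_in_flight knob are illustrative, not part of the example code):

async def refine_all_bounded(topics: list[str], max_in_flight: int = 16) -> list[str]:
    # Cap concurrent refinements so a huge batch doesn't oversubscribe the pool.
    sem = asyncio.Semaphore(max_in_flight)

    async def refine_one(topic: str) -> str:
        async with sem:
            return await refine_report(topic, 3, 8)

    return await asyncio.gather(*[refine_one(t) for t in topics])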
Running the pipeline
To run the batch pipeline:
if __name__ == "__main__":
    flyte.init_from_config()

    # Multiple topics to generate reports for
    topics = [
        "The Impact of Large Language Models on Software Development",
        "Edge Computing: Bringing AI to IoT Devices",
        "Quantum Computing: Current State and Near-Term Applications",
        "The Rise of Rust in Systems Programming",
        "WebAssembly: The Future of Browser-Based Applications",
    ]

    print(f"Submitting batch run for {len(topics)} topics...")
    import sys
    sys.stdout.flush()

    # Run the batch pipeline - this will generate all reports in parallel,
    # with the reusable container pool handling 5 topics × ~7 LLM calls each
    run = flyte.run(
        report_batch_pipeline,
        topics=topics,
        max_iterations=3,
        quality_threshold=8,
    )

    print(f"Batch report generation run URL: {run.url}")
    sys.stdout.flush()

    print("Waiting for pipeline to complete (Ctrl+C to skip)...")
    try:
        run.wait()
        print(f"Pipeline complete! Outputs: {run.outputs()}")
    except KeyboardInterrupt:
        print(f"\nSkipped waiting. Check status at: {run.url}")

Run it with:

uv run generate.py

The pipeline will:
- Process all topics in parallel (each with iterative refinement)
- Format all reports in parallel
- Return a list of directories, each containing a report’s outputs
Cost optimization tips
1. Choose the right model
The example uses gpt-4o-mini for cost efficiency. For higher quality (at higher
cost), you could use gpt-4o or gpt-4-turbo:
response = await client.chat.completions.create(
    model="gpt-4o",  # More capable, more expensive
    ...
)

2. Tune iteration parameters
Fewer iterations mean fewer API calls:
run = flyte.run(
    report_batch_pipeline,
    topics=["Topic A", "Topic B"],
    max_iterations=2,  # Limit iterations
    quality_threshold=7,  # Accept slightly lower quality
)

3. Use caching effectively
The cache="auto" setting on the environment caches task outputs. Running the
same pipeline with the same inputs returns cached results instantly:
llm_env = flyte.TaskEnvironment(
    ...
    cache="auto",  # Cache task outputs
)

4. Scale the batch
The batch pipeline already processes topics in parallel. To handle larger batches,
adjust the ReusePolicy:
reusable=flyte.ReusePolicy(
    replicas=4,  # More containers for larger batches
    concurrency=4,  # Tasks per container
    ...
)

With 4 replicas × 4 concurrency = 16 slots, you can process 16 topics’ refinement tasks concurrently.
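As a quick sizing check (replicas_needed is a hypothetical helper, not part of the example):

import math

def replicas_needed(num_tasks: int, concurrency: int) -> int:
    # Smallest number of containers whose combined slots cover all tasks at once.
    return math.ceil(num_tasks / concurrency)

print(replicas_needed(16, 4))  # 4 containers × 4 slots = 16 concurrent tasks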
Next steps
Learn how to deploy a serving app that connects to the pipeline outputs and provides an interactive UI for report generation.