# Serving app

The final piece is a serving application that displays generated reports and
provides an interactive interface. This demonstrates how to connect apps to
pipeline outputs using `RunOutput`.
## App environment configuration

The `AppEnvironment` defines how the Streamlit application runs and connects to
the batch report pipeline:
```python
# Define the app environment
env = AppEnvironment(
    name="report-generator-app",
    description="Interactive report generator with AI-powered refinement",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "streamlit>=1.41.0",
    ),
    args=["streamlit", "run", "app.py", "--server.port", "8080"],
    port=8080,
    resources=flyte.Resources(cpu=1, memory="2Gi"),
    parameters=[
        # Connect to the batch pipeline output (list of report directories)
        Parameter(
            name="reports",
            value=RunOutput(
                task_name="driver.report_batch_pipeline",
                type="directory",
            ),
            download=True,
            env_var="REPORTS_PATH",
        ),
    ],
    include=["app.py"],
    requires_auth=False,
)
```
### Key configuration

| Setting | Purpose |
|---|---|
| `args` | Command to run the Streamlit app |
| `port` | Port the app listens on |
| `parameters` | Inputs to the app, including pipeline connections |
| `include` | Additional files to bundle with the app |
## Connecting to pipeline output with RunOutput

The `RunOutput` parameter connects the app to the batch pipeline’s output:
```python
Parameter(
    name="reports",
    value=RunOutput(
        task_name="driver.report_batch_pipeline",
        type="directory",
    ),
    download=True,
    env_var="REPORTS_PATH",
)
```

This configuration:

- Finds the latest run of `report_batch_pipeline` in the `driver` environment
- Downloads the output to local storage (`download=True`)
- Sets an environment variable with the path (`REPORTS_PATH`)
The app can then scan this directory for all generated reports.
## The Streamlit application

The app loads and displays all generated reports from the batch pipeline:
```python
import os


def load_report_from_dir(report_dir: str) -> dict | None:
    """Load a single report from a directory."""
    if not os.path.isdir(report_dir):
        return None
    report = {"path": report_dir, "name": os.path.basename(report_dir)}
    md_path = os.path.join(report_dir, "report.md")
    if os.path.exists(md_path):
        with open(md_path) as f:
            report["markdown"] = f.read()
    html_path = os.path.join(report_dir, "report.html")
    if os.path.exists(html_path):
        with open(html_path) as f:
            report["html"] = f.read()
    summary_path = os.path.join(report_dir, "summary.txt")
    if os.path.exists(summary_path):
        with open(summary_path) as f:
            report["summary"] = f.read()
    # Only return if we found at least markdown content
    return report if "markdown" in report else None


def load_all_reports() -> list[dict]:
    """Load all reports from the batch pipeline output."""
    reports_path = os.environ.get("REPORTS_PATH")
    if not reports_path or not os.path.exists(reports_path):
        return []
    reports = []
    # Check if this is a single report directory (has report.md directly)
    if os.path.exists(os.path.join(reports_path, "report.md")):
        report = load_report_from_dir(reports_path)
        if report:
            report["name"] = "Report"
            reports.append(report)
    else:
        # Batch output: scan subdirectories for reports
        for entry in sorted(os.listdir(reports_path)):
            entry_path = os.path.join(reports_path, entry)
            report = load_report_from_dir(entry_path)
            if report:
                reports.append(report)
    return reports
```
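The batch layout that `load_all_reports` expects can be checked locally with a quick, self-contained sketch. The directory and topic names below are hypothetical, not part of the example project:

```python
import os
import tempfile

# Hypothetical smoke test for the batch layout: a parent directory with
# one subdirectory per topic, each containing a report.md.
with tempfile.TemporaryDirectory() as reports_path:
    report_dir = os.path.join(reports_path, "topic-1")
    os.makedirs(report_dir)
    with open(os.path.join(report_dir, "report.md"), "w") as f:
        f.write("# Renewable Energy\n")

    # No report.md at the top level, so the app takes the batch branch
    # and scans subdirectories instead.
    assert not os.path.exists(os.path.join(reports_path, "report.md"))
    found = [
        entry
        for entry in sorted(os.listdir(reports_path))
        if os.path.exists(os.path.join(reports_path, entry, "report.md"))
    ]
    print(found)  # → ['topic-1']
```

A single-report run would instead place `report.md` at the top level, which is why `load_all_reports` checks for that file first.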
### Displaying multiple reports

The app provides a sidebar for selecting between reports when multiple are available:
```python
import streamlit as st

reports = load_all_reports()

if reports:
    # Sidebar for report selection if multiple reports
    if len(reports) > 1:
        st.sidebar.header("Select Report")
        report_names = [f"Report {i+1}: {r['name']}" for i, r in enumerate(reports)]
        selected_idx = st.sidebar.selectbox(
            "Choose a report to view:",
            range(len(reports)),
            format_func=lambda i: report_names[i],
        )
        selected_report = reports[selected_idx]
        st.sidebar.markdown(f"**Viewing {len(reports)} reports**")
    else:
        selected_report = reports[0]

    st.header(f"Generated Report: {selected_report['name']}")

    # Summary section
    if "summary" in selected_report:
        with st.expander("Executive Summary", expanded=True):
            st.write(selected_report["summary"])

    # Tabbed view for different formats
    tab_md, tab_html = st.tabs(["Markdown", "HTML Preview"])
    with tab_md:
        st.markdown(selected_report.get("markdown", ""))
    with tab_html:
        if "html" in selected_report:
            st.components.v1.html(selected_report["html"], height=600, scrolling=True)

    # Download options
    st.subheader("Download")
    col1, col2, col3 = st.columns(3)
    with col1:
        if "markdown" in selected_report:
            st.download_button(
                label="Download Markdown",
                data=selected_report["markdown"],
                file_name="report.md",
                mime="text/markdown",
            )
    with col2:
        if "html" in selected_report:
            st.download_button(
                label="Download HTML",
                data=selected_report["html"],
                file_name="report.html",
                mime="text/html",
            )
    with col3:
        if "summary" in selected_report:
            st.download_button(
                label="Download Summary",
                data=selected_report["summary"],
                file_name="summary.txt",
                mime="text/plain",
            )
else:
    st.info("No reports generated yet. Run the batch pipeline to create reports.")
```
Features:

- **Report selector**: Sidebar navigation when multiple reports exist
- **Executive summary**: Expandable section with key takeaways
- **Tabbed views**: Switch between Markdown and HTML preview
- **Download buttons**: Export in any format
## Generation instructions

The app includes instructions for generating new reports:
````python
st.divider()
st.header("Generate New Reports")
st.write("""
To generate reports, run the batch pipeline:

```bash
uv run generate.py
```

This generates reports for multiple topics in parallel, demonstrating
how ReusePolicy efficiently handles many concurrent LLM calls.
""")

# Show pipeline parameters info
with st.expander("Pipeline Parameters"):
    st.markdown("""
**Available parameters:**

| Parameter | Default | Description |
|-----------|---------|-------------|
| `topics` | (required) | List of topics to write about |
| `max_iterations` | 3 | Maximum refinement cycles per topic |
| `quality_threshold` | 8 | Minimum score (1-10) to accept |

**Example:**

```python
run = flyte.run(
    report_batch_pipeline,
    topics=[
        "The Future of Renewable Energy",
        "Advances in Quantum Computing",
        "The Rise of Edge AI",
    ],
    max_iterations=3,
    quality_threshold=8,
)
```
""")
````
## Deploying the app

To deploy the report generator application:
```python
if __name__ == "__main__":
    flyte.init_from_config()

    # Deploy the report generator app
    print("Deploying report generator app...")
    deployment = flyte.serve(env)
    print(f"App deployed at: {deployment.url}")
```

Run it with:

```bash
uv run serve.py
```

The deployment process:
- Builds a container image with the app code
- Deploys the app to Union.ai
- Connects to the latest pipeline output
- Returns the app URL
## Workflow: Generate then view

The typical workflow is:

1. Run the batch pipeline to generate reports:

   ```bash
   uv run generate.py
   ```

2. Deploy or refresh the app to view results:

   ```bash
   uv run serve.py
   ```

3. Access the app at the provided URL and browse all generated reports.
The app automatically picks up the latest pipeline run, so you can generate new batches and always see the most recent results.
## Automatic updates with RunOutput

The `RunOutput` connection is evaluated at app startup. Each time the app
restarts or redeploys, it fetches the latest batch pipeline output.
For real-time updates without redeployment, you could:
- Poll for new runs using the Flyte API
- Implement a webhook that triggers app refresh
- Use a database to track run status
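If you take the polling route, one ingredient is detecting that the downloaded reports directory has changed since the last check. A minimal sketch, assuming the app polls on a timer; the helper name `latest_mtime` is ours, not part of the Flyte or Streamlit API:

```python
import os


def latest_mtime(path: str) -> float:
    """Return the newest modification time of any file under `path`.

    A poller can remember this value between checks and trigger a
    refresh (e.g. st.rerun in Streamlit) when it increases.
    """
    newest = os.path.getmtime(path)
    for root, _dirs, files in os.walk(path):
        for name in files:
            newest = max(newest, os.path.getmtime(os.path.join(root, name)))
    return newest
```

Note that this only detects changes in the already-downloaded directory; picking up an entirely new pipeline run still requires re-resolving `RunOutput` (i.e. a restart or an API query).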
## Complete example structure

Here’s the full project structure:
```
advanced-project/
├── generate.py   # Main pipeline with agentic refinement
├── prompts.py    # System prompts and Pydantic models
├── serve.py      # App deployment configuration
└── app.py        # Streamlit user interface
```

## Running the complete example
1. Set up the secret:

   ```bash
   flyte secret create openai-api-key
   ```

2. Run the pipeline:

   ```bash
   cd /path/to/unionai-examples/v2/user-guide/advanced-project
   uv run generate.py
   ```

3. Deploy the app:

   ```bash
   uv run serve.py
   ```

4. Open the app URL and view your generated reports.
## Summary

This example demonstrated:

| Feature | What it does |
|---|---|
| `ReusePolicy` | Keeps containers warm for high-throughput batch processing |
| `@flyte.trace` | Checkpoints LLM calls for recovery and observability |
| `RetryStrategy` | Handles transient API failures gracefully |
| `flyte.group` | Organizes parallel batches and iterations in the UI |
| `asyncio.gather` | Fans out to process multiple topics concurrently |
| Pydantic models | Structured LLM outputs |
| `AppEnvironment` | Deploys interactive Streamlit apps |
| `RunOutput` | Connects apps to pipeline outputs |
These patterns form the foundation for building production-grade AI workflows that are resilient, observable, and cost-efficient at scale.