Serving app

The final piece is a serving application that displays generated reports and provides an interactive interface. This demonstrates how to connect apps to pipeline outputs using RunOutput.

App environment configuration

The AppEnvironment defines how the Streamlit application runs and connects to the batch report pipeline:

serve.py
# Define the app environment
env = AppEnvironment(
    name="report-generator-app",
    description="Interactive report generator with AI-powered refinement",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "streamlit>=1.41.0",
    ),
    args=["streamlit", "run", "app.py", "--server.port", "8080"],
    port=8080,
    resources=flyte.Resources(cpu=1, memory="2Gi"),
    parameters=[
        # Connect to the batch pipeline output (list of report directories)
        Parameter(
            name="reports",
            value=RunOutput(
                task_name="driver.report_batch_pipeline",
                type="directory",
            ),
            download=True,
            env_var="REPORTS_PATH",
        ),
    ],
    include=["app.py"],
    requires_auth=False,
)

Key configuration

Setting      Purpose
args         Command to run the Streamlit app
port         Port the app listens on
parameters   Inputs to the app, including pipeline connections
include      Additional files to bundle with the app

Connecting to pipeline output with RunOutput

The RunOutput parameter connects the app to the batch pipeline’s output:

Parameter(
    name="reports",
    value=RunOutput(
        task_name="driver.report_batch_pipeline",
        type="directory",
    ),
    download=True,
    env_var="REPORTS_PATH",
)

This configuration:

  1. Finds the latest run of report_batch_pipeline in the driver environment
  2. Downloads the output to local storage (download=True)
  3. Sets an environment variable with the path (REPORTS_PATH)

The app can then scan this directory for all generated reports.
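
From the app's point of view, the result of this configuration is simply an environment variable that points at a local directory. A minimal sketch of what the app can rely on, using only the standard library (the helper name `list_report_dirs` is hypothetical and is not part of the example project):

```python
import os


def list_report_dirs(env_var: str = "REPORTS_PATH") -> list[str]:
    """Return sorted subdirectory paths under the directory named by env_var.

    Hypothetical helper for illustration. Returns an empty list if the
    variable is unset or does not point at an existing directory.
    """
    root = os.environ.get(env_var)
    if not root or not os.path.isdir(root):
        return []
    return sorted(
        os.path.join(root, entry)
        for entry in os.listdir(root)
        if os.path.isdir(os.path.join(root, entry))
    )


if __name__ == "__main__":
    # Print every candidate report directory the app could load
    for path in list_report_dirs():
        print(path)
```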

The Streamlit application

The app loads and displays all generated reports from the batch pipeline:

app.py
# Imports used throughout app.py
import os

import streamlit as st


def load_report_from_dir(report_dir: str) -> dict | None:
    """Load a single report from a directory."""
    if not os.path.isdir(report_dir):
        return None

    report = {"path": report_dir, "name": os.path.basename(report_dir)}

    md_path = os.path.join(report_dir, "report.md")
    if os.path.exists(md_path):
        with open(md_path) as f:
            report["markdown"] = f.read()

    html_path = os.path.join(report_dir, "report.html")
    if os.path.exists(html_path):
        with open(html_path) as f:
            report["html"] = f.read()

    summary_path = os.path.join(report_dir, "summary.txt")
    if os.path.exists(summary_path):
        with open(summary_path) as f:
            report["summary"] = f.read()

    # Only return if we found at least markdown content
    return report if "markdown" in report else None


def load_all_reports() -> list[dict]:
    """Load all reports from the batch pipeline output."""
    reports_path = os.environ.get("REPORTS_PATH")
    if not reports_path or not os.path.exists(reports_path):
        return []

    reports = []

    # Check if this is a single report directory (has report.md directly)
    if os.path.exists(os.path.join(reports_path, "report.md")):
        report = load_report_from_dir(reports_path)
        if report:
            report["name"] = "Report"
            reports.append(report)
    else:
        # Batch output: scan subdirectories for reports
        for entry in sorted(os.listdir(reports_path)):
            entry_path = os.path.join(reports_path, entry)
            report = load_report_from_dir(entry_path)
            if report:
                reports.append(report)

    return reports
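
To exercise these loaders without deploying, one option is to build a throwaway directory that mimics the layout they expect and point REPORTS_PATH at it. The sketch below assumes the layout implied by the loader code: one subdirectory per report containing report.md, plus the optional report.html and summary.txt. The function name `make_fake_reports`, the `report_<n>` directory names, and the sample topics are made up for illustration.

```python
import os
import tempfile


def make_fake_reports(root: str, topics: list[str]) -> list[str]:
    """Create one report directory per topic under root and return the paths."""
    paths = []
    for i, topic in enumerate(topics):
        report_dir = os.path.join(root, f"report_{i}")
        os.makedirs(report_dir, exist_ok=True)
        # report.md is the only file the loader treats as required
        with open(os.path.join(report_dir, "report.md"), "w") as f:
            f.write(f"# {topic}\n\nPlaceholder body.\n")
        with open(os.path.join(report_dir, "report.html"), "w") as f:
            f.write(f"<h1>{topic}</h1><p>Placeholder body.</p>\n")
        with open(os.path.join(report_dir, "summary.txt"), "w") as f:
            f.write(f"Summary of {topic}.\n")
        paths.append(report_dir)
    return paths


if __name__ == "__main__":
    root = tempfile.mkdtemp(prefix="fake-reports-")
    make_fake_reports(root, ["Topic A", "Topic B"])
    # Same variable the deployed app receives from the RunOutput parameter
    print(f"export REPORTS_PATH={root}")
```

With REPORTS_PATH exported to the printed directory, running `streamlit run app.py` locally should list both placeholder reports.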

Displaying multiple reports

The app provides a sidebar for selecting between reports when multiple are available:

app.py
reports = load_all_reports()

if reports:
    # Sidebar for report selection if multiple reports
    if len(reports) > 1:
        st.sidebar.header("Select Report")
        report_names = [f"Report {i+1}: {r['name']}" for i, r in enumerate(reports)]
        selected_idx = st.sidebar.selectbox(
            "Choose a report to view:",
            range(len(reports)),
            format_func=lambda i: report_names[i],
        )
        selected_report = reports[selected_idx]
        st.sidebar.markdown(f"**Viewing {len(reports)} reports**")
    else:
        selected_report = reports[0]

    st.header(f"Generated Report: {selected_report['name']}")

    # Summary section
    if "summary" in selected_report:
        with st.expander("Executive Summary", expanded=True):
            st.write(selected_report["summary"])

    # Tabbed view for different formats
    tab_md, tab_html = st.tabs(["Markdown", "HTML Preview"])

    with tab_md:
        st.markdown(selected_report.get("markdown", ""))

    with tab_html:
        if "html" in selected_report:
            st.components.v1.html(selected_report["html"], height=600, scrolling=True)

    # Download options
    st.subheader("Download")
    col1, col2, col3 = st.columns(3)

    with col1:
        if "markdown" in selected_report:
            st.download_button(
                label="Download Markdown",
                data=selected_report["markdown"],
                file_name="report.md",
                mime="text/markdown",
            )

    with col2:
        if "html" in selected_report:
            st.download_button(
                label="Download HTML",
                data=selected_report["html"],
                file_name="report.html",
                mime="text/html",
            )

    with col3:
        if "summary" in selected_report:
            st.download_button(
                label="Download Summary",
                data=selected_report["summary"],
                file_name="summary.txt",
                mime="text/plain",
            )

else:
    st.info("No reports generated yet. Run the batch pipeline to create reports.")

Features:

  • Report selector: Sidebar navigation when multiple reports exist
  • Executive summary: Expandable section with key takeaways
  • Tabbed views: Switch between Markdown and HTML preview
  • Download buttons: Export the report as Markdown or HTML, or the summary as plain text
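
Note that the download buttons in the code above use the same file names (report.md, report.html, summary.txt) for every report. If distinct names are preferred, a small helper could derive them from the report's name. This is an optional variation rather than part of the example app, and `download_name` is a hypothetical helper:

```python
import re


def download_name(report_name: str, extension: str) -> str:
    """Build a filesystem-friendly download file name from a report name."""
    # Lowercase, then collapse every run of non-alphanumerics into one hyphen
    slug = re.sub(r"[^a-z0-9]+", "-", report_name.lower()).strip("-")
    # Fall back to a generic name if nothing usable remains
    return f"{slug or 'report'}.{extension}"
```

It could then be passed to a button as `file_name=download_name(selected_report["name"], "md")`.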

Generation instructions

The app includes instructions for generating new reports:

app.py
st.divider()
st.header("Generate New Reports")
st.write("""
To generate reports, run the batch pipeline:

```bash
uv run generate.py
```

This generates reports for multiple topics in parallel, demonstrating
how ReusePolicy efficiently handles many concurrent LLM calls.
""")

# Show pipeline parameters info
with st.expander("Pipeline Parameters"):
    st.markdown("""
    **Available parameters:**

    | Parameter | Default | Description |
    |-----------|---------|-------------|
    | `topics` | (required) | List of topics to write about |
    | `max_iterations` | 3 | Maximum refinement cycles per topic |
    | `quality_threshold` | 8 | Minimum score (1-10) to accept |

    **Example:**
    ```python
    run = flyte.run(
        report_batch_pipeline,
        topics=[
            "The Future of Renewable Energy",
            "Advances in Quantum Computing",
            "The Rise of Edge AI",
        ],
        max_iterations=3,
        quality_threshold=8,
    )
    ```
    """)

Deploying the app

To deploy the report generator application:

serve.py
if __name__ == "__main__":
    flyte.init_from_config()

    # Deploy the report generator app
    print("Deploying report generator app...")
    deployment = flyte.serve(env)
    print(f"App deployed at: {deployment.url}")

Then run the script:

uv run serve.py

The deployment process:

  1. Builds a container image with the app code
  2. Deploys the app to Union.ai
  3. Connects to the latest pipeline output
  4. Returns the app URL

Workflow: Generate then view

The typical workflow is:

  1. Run the batch pipeline to generate reports:

    uv run generate.py
  2. Deploy or refresh the app to view results:

    uv run serve.py
  3. Access the app at the provided URL and browse all generated reports

The app picks up the latest pipeline run each time it starts or is redeployed, so after generating a new batch you can rerun serve.py to see the most recent results.

Automatic updates with RunOutput

The RunOutput connection is evaluated at app startup. Each time the app restarts or redeploys, it fetches the latest batch pipeline output.

For real-time updates without redeployment, you could:

  1. Poll for new runs using the Flyte API
  2. Implement a webhook that triggers app refresh
  3. Use a database to track run status
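
The first option, polling, could look roughly like the sketch below. It deliberately avoids naming any Flyte API call: `fetch_latest_run_id` is a hypothetical callable you would implement yourself to return an identifier for the latest pipeline run, and `on_new_run` is a hypothetical callback that reloads the reports. Neither name comes from the example project.

```python
import time
from typing import Callable, Optional


def poll_for_new_runs(
    fetch_latest_run_id: Callable[[], Optional[str]],
    on_new_run: Callable[[str], None],
    interval_seconds: float = 30.0,
    max_polls: Optional[int] = None,
) -> None:
    """Call on_new_run whenever the latest run identifier changes.

    The first identifier seen also counts as a change. Pass max_polls to
    stop after a fixed number of checks; by default the loop runs forever.
    """
    last_seen: Optional[str] = None
    polls = 0
    while max_polls is None or polls < max_polls:
        run_id = fetch_latest_run_id()
        # Ignore failed lookups (None) and identifiers already handled
        if run_id is not None and run_id != last_seen:
            last_seen = run_id
            on_new_run(run_id)
        polls += 1
        # Sleep only if another poll will follow
        if max_polls is None or polls < max_polls:
            time.sleep(interval_seconds)
```

Passing the lookup in as a callable keeps the loop independent of how runs are actually queried, which also makes it straightforward to test with a stub.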

Complete example structure

Here’s the full project structure:

advanced-project/
├── generate.py    # Main pipeline with agentic refinement
├── prompts.py     # System prompts and Pydantic models
├── serve.py       # App deployment configuration
└── app.py         # Streamlit user interface

Running the complete example

  1. Set up the secret:

    flyte secret create openai-api-key
  2. Run the pipeline:

    cd /path/to/unionai-examples/v2/user-guide/advanced-project
    uv run generate.py
  3. Deploy the app:

    uv run serve.py
  4. Open the app URL and view your generated reports

Summary

This example demonstrated:

Feature           What it does
ReusePolicy       Keeps containers warm for high-throughput batch processing
@flyte.trace      Checkpoints LLM calls for recovery and observability
RetryStrategy     Handles transient API failures gracefully
flyte.group       Organizes parallel batches and iterations in the UI
asyncio.gather    Fans out to process multiple topics concurrently
Pydantic models   Structured LLM outputs
AppEnvironment    Deploys interactive Streamlit apps
RunOutput         Connects apps to pipeline outputs

These patterns form the foundation for building production-grade AI workflows that are resilient, observable, and cost-efficient at scale.