Traces

The @flyte.trace decorator provides fine-grained observability and resumption capabilities for functions called within your Flyte workflows. Traces are used on helper functions that tasks call to perform specific operations like API calls, data processing, or computations. Traces are particularly useful for managing the challenges of non-deterministic behavior in workflows, allowing you to track execution details and resume from failures.

What are traced functions for?

At the top level, Flyte workflows are composed of tasks. But it is also common practice to break down complex task logic into smaller, reusable functions by defining helper functions that tasks call to perform specific operations.

Any helper functions defined or imported into the same file as a task definition are automatically uploaded to the Flyte environment alongside the task when it is deployed.

At the task level, observability and resumption of failed executions is provided by caching, but what if you want these capabilities at a more granular level, for the individual operations that tasks perform?

This is where traced functions come in. By decorating helper functions with @flyte.trace, you enable:

  • Detailed observability: Track execution time, inputs/outputs, and errors for each function call.
  • Fine-grained resumption: If a workflow fails, resume from the last successful traced function instead of re-running the entire task. Each traced function is effectively a checkpoint within its task.

Here is an example:

import asyncio

import flyte

env = flyte.TaskEnvironment("env")


@flyte.trace
async def call_llm(prompt: str) -> str:
    await asyncio.sleep(0.1)
    return f"LLM response for: {prompt}"


@flyte.trace
async def process_data(data: str) -> dict:
    await asyncio.sleep(0.2)
    return {"processed": data, "status": "completed"}


@env.task
async def research_workflow(topic: str) -> dict:
    llm_result = await call_llm(f"Generate research plan for: {topic}")
    processed_data = await process_data(llm_result)
    return {"topic": topic, "result": processed_data}

What Gets Traced

Traces capture detailed execution information:

  • Execution time: How long each function call takes.
  • Inputs and outputs: Function parameters and return values.
  • Checkpoints: State that enables workflow resumption.
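The kind of information a trace records can be sketched with a toy wrapper that captures duration, inputs, and outputs. This is an illustration of the idea only, not Flyte's internals; the `trace` decorator below is a hypothetical stand-in so the snippet runs anywhere:

```python
import asyncio
import time

trace_log: list = []

# Toy stand-in for a tracing wrapper: records duration, inputs, and
# outputs for each call. An illustration only, not Flyte's internals.
def trace(fn):
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = await fn(*args, **kwargs)
        trace_log.append({
            "function": fn.__name__,
            "inputs": args,
            "output": result,
            "duration_s": time.perf_counter() - start,
        })
        return result
    return wrapper

@trace
async def double(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * 2

asyncio.run(double(21))
# trace_log now holds one entry with the function name, inputs,
# output, and elapsed time.
```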

Errors are not recorded

Only successful trace executions are recorded in the checkpoint system. When a traced function fails, the exception propagates up to your task code where you can handle it with standard error handling patterns.
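Because the exception propagates to the task, you can wrap traced calls in ordinary Python error handling, for example a retry loop. A minimal sketch (the `trace` decorator here is a no-op stand-in so the snippet runs without a Flyte deployment; in real code you would use `@flyte.trace`):

```python
import asyncio

# No-op stand-in for @flyte.trace so this sketch runs anywhere.
def trace(fn):
    return fn

@trace
async def flaky_api_call(attempts: list) -> str:
    # Fails on the first two calls, succeeds on the third.
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionError("transient failure")
    return "ok"

async def task_with_retries() -> str:
    # The traced function's exception propagates here, where we
    # handle it with a standard retry loop.
    attempts: list = []
    for _ in range(5):
        try:
            return await flaky_api_call(attempts)
        except ConnectionError:
            await asyncio.sleep(0)  # back off before retrying
    raise RuntimeError("all retries failed")

result = asyncio.run(task_with_retries())
```

Only the final, successful call would be checkpointed; the failed attempts leave no trace record.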

Supported Function Types

The trace decorator works with:

  • Asynchronous functions: Functions defined with async def.
  • Async generators: Asynchronous functions that yield values.

Currently, tracing only works for asynchronous functions (including async generators). Tracing of synchronous functions and generators is coming soon.

@flyte.trace
async def async_api_call(topic: str) -> dict:
    # Asynchronous API call
    await asyncio.sleep(0.1)
    return {"data": ["item1", "item2", "item3"], "status": "success"}

@flyte.trace
async def stream_data(items: list[str]):
    # Async generator function for streaming
    for item in items:
        await asyncio.sleep(0.02)
        yield f"Processing: {item}"

@flyte.trace
async def async_stream_llm(prompt: str):
    # Async generator for streaming LLM responses
    chunks = ["Research shows", " that machine learning", " continues to evolve."]
    for chunk in chunks:
        await asyncio.sleep(0.05)
        yield chunk

@env.task
async def streaming_workflow(topic: str) -> dict:
    llm_result = await async_api_call(topic)

    # Collect async generator results
    processed_data = []
    async for item in stream_data(llm_result["data"]):
        processed_data.append(item)

    llm_stream = []
    async for chunk in async_stream_llm(f"Summarize research on {topic}"):
        llm_stream.append(chunk)

    return {
        "topic": topic,
        "processed_data": processed_data,
        "llm_summary": "".join(llm_stream)
    }

Task Orchestration Pattern

The typical Flyte workflow follows this pattern:

@flyte.trace
async def search_web(query: str) -> list[dict]:
    # Search the web and return results
    await asyncio.sleep(0.1)
    return [{"title": f"Article about {query}", "content": f"Content on {query}"}]

@flyte.trace
async def summarize_content(content: str) -> str:
    # Summarize content using LLM
    await asyncio.sleep(0.1)
    return f"Summary of {len(content.split())} words"

@flyte.trace
async def extract_insights(summaries: list[str]) -> dict:
    # Extract insights from summaries
    await asyncio.sleep(0.1)
    return {"insights": ["key theme 1", "key theme 2"], "count": len(summaries)}

@env.task
async def research_pipeline(topic: str) -> dict:
    # Each helper function creates a checkpoint
    search_results = await search_web(f"research on {topic}")

    summaries = []
    for result in search_results:
        summary = await summarize_content(result["content"])
        summaries.append(summary)

    final_insights = await extract_insights(summaries)

    return {
        "topic": topic,
        "insights": final_insights,
        "sources_count": len(search_results)
    }

Benefits of this pattern:

  • If search_web succeeds but summarize_content fails, resumption skips the search step.
  • Each operation is independently observable and debuggable.
  • Clear separation between workflow coordination (the task) and execution (the traced functions).

Relationship to Caching and Checkpointing

Understanding how traces work with Flyte’s other execution features:

  • Task caching: Scoped to the entire task execution (@env.task). Skips re-running a task with the same inputs. Enabled by default (cache="auto").
  • Traces: Scoped to individual helper functions. Provide observability and fine-grained resumption. Enabled manually with @flyte.trace.
  • Checkpointing: Scoped to workflow state. Resumes workflows from failure points. Automatic whenever traces are used.

How They Work Together

@flyte.trace
async def traced_data_cleaning(dataset_id: str) -> list[str]:
    # Creates checkpoint after successful execution.
    await asyncio.sleep(0.2)
    return [f"cleaned_record_{i}_{dataset_id}" for i in range(100)]

@flyte.trace
async def traced_feature_extraction(data: list[str]) -> dict:
    # Creates checkpoint after successful execution.
    await asyncio.sleep(0.3)
    return {
        "features": [f"feature_{i}" for i in range(10)],
        "feature_count": len(data),
        "processed_samples": len(data)
    }

@flyte.trace
async def traced_model_training(features: dict) -> dict:
    # Creates checkpoint after successful execution.
    await asyncio.sleep(0.4)
    sample_count = features["processed_samples"]
    # Mock accuracy based on sample count
    accuracy = min(0.95, 0.7 + (sample_count / 1000))
    return {
        "accuracy": accuracy,
        "epochs": 50,
        "model_size": "125MB"
    }

@env.task(cache="auto")  # Task-level caching enabled
async def data_pipeline(dataset_id: str) -> dict:
    # 1. If this exact task with these inputs ran before,
    #    the entire task result is returned from cache

    # 2. If not cached, execution begins and each traced function
    #    creates checkpoints for resumption
    cleaned_data = await traced_data_cleaning(dataset_id)      # Checkpoint 1
    features = await traced_feature_extraction(cleaned_data)   # Checkpoint 2
    model_results = await traced_model_training(features)      # Checkpoint 3

    # 3. If workflow fails at step 3, resumption will:
    #    - Skip traced_data_cleaning (checkpointed)
    #    - Skip traced_feature_extraction (checkpointed)
    #    - Re-run only traced_model_training

    return {"dataset_id": dataset_id, "accuracy": model_results["accuracy"]}

Execution Flow

  1. Task Submission: Task is submitted with input parameters
  2. Cache Check: Flyte checks if identical task execution exists in cache
  3. Cache Hit: If cached, return cached result immediately (no traces needed)
  4. Cache Miss: Begin fresh execution
  5. Trace Checkpoints: Each @flyte.trace function creates resumption points
  6. Failure Recovery: If workflow fails, resume from last successful checkpoint
  7. Task Completion: Final result is cached for future identical inputs
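The checkpoint-and-replay behavior in steps 4 through 6 can be sketched with a toy in-memory store. This is an illustration of the resumption idea only, not Flyte's actual implementation; the `trace` wrapper and `checkpoints` dict are hypothetical stand-ins:

```python
import asyncio
import json

# Toy in-memory checkpoint store -- an illustration of the replay
# idea, not Flyte's actual implementation.
checkpoints: dict = {}
calls: list = []  # records which function bodies actually ran

def trace(fn):
    async def wrapper(*args):
        key = (fn.__name__, json.dumps(args))
        if key in checkpoints:
            # Resumption: a completed step is replayed from its
            # checkpoint instead of being re-executed.
            return checkpoints[key]
        calls.append(fn.__name__)
        result = await fn(*args)
        checkpoints[key] = result  # checkpoint only on success
        return result
    return wrapper

@trace
async def step_one(x: int) -> int:
    return x + 1

@trace
async def step_two(x: int) -> int:
    return x * 2

async def pipeline(x: int) -> int:
    a = await step_one(x)
    return await step_two(a)

first = asyncio.run(pipeline(10))   # both steps execute
second = asyncio.run(pipeline(10))  # both steps replay from checkpoints
```

On the second run neither function body executes again, mirroring how a resumed workflow skips every traced call that already checkpointed.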