Grouping actions

Groups are an organizational feature in Flyte that lets you logically cluster related task invocations (called “actions”) for better visualization and management in the UI. Groups organize task executions into manageable, hierarchical structures, whether you’re working with large fanouts or smaller, logically related sets of operations.

What are groups?

A group is a named, logical unit of task invocations in the Flyte UI. Any combination of invocations can be placed in a group: a large fanout, a sequence of steps, or a mix of both.

The problem groups solve

Without groups, complex workflows can become visually overwhelming in the Flyte UI:

  • Multiple task executions appear as separate nodes, making it hard to see the high-level structure
  • Related operations are scattered throughout the workflow graph
  • Debugging and monitoring become difficult when dealing with many individual task executions

Groups solve this by:

  • Organizing actions: Multiple task executions within a group are presented as a hierarchical “folder” structure
  • Improving UI visualization: Instead of many individual nodes cluttering the view, you see logical groups that can be collapsed or expanded
  • Aggregating status information: Groups show aggregated run status (success/failure) of their contained actions when you hover over them in the UI
  • Maintaining execution parallelism: Tasks still run concurrently as normal, but are organized for display
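
To make the idea of aggregated status concrete, here is a minimal sketch of how per-action results could be rolled up into one summary for a group. It is an illustration only, not how Flyte computes what it shows on hover; the function name summarize_group and the status strings "succeeded" and "failed" are assumptions made for this example.

```python
from collections import Counter

# Hypothetical status labels; the names Flyte uses internally may differ.
def summarize_group(statuses):
    """Count how many actions in one group succeeded or failed."""
    counts = Counter(statuses)
    return {
        "total": len(statuses),
        "succeeded": counts["succeeded"],
        "failed": counts["failed"],
    }

# Four actions in one group, one of which failed.
summary = summarize_group(["succeeded", "failed", "succeeded", "succeeded"])
```

The summary describes the group as a whole, while each action keeps its own status and outputs.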

How groups work

Groups are declared using the flyte.group context manager. Any task invocations that occur within the with flyte.group() block are automatically associated with that group:

with flyte.group("my-group-name"):
    # All task invocations here belong to "my-group-name"
    result1 = await task_a(data)
    result2 = await task_b(data)
    result3 = await task_c(data)

The key points about groups:

  1. Context-based: Use the with flyte.group("name"): context manager.
  2. Organizational tool: Task invocations within the context are grouped together in the UI.
  3. UI folders: Groups appear as collapsible/expandable folders in the Flyte UI run tree.
  4. Status aggregation: Hover over a group in the UI to see aggregated success/failure information.
  5. Execution unchanged: Tasks run exactly as they would without the group, whether sequentially or in parallel; groups only affect organization and visualization.

Important: Groups do not aggregate outputs. Each task execution still produces its own individual outputs. Groups are purely for organization and UI presentation.
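
The context-based behavior described above can be pictured with a small, standard-library-only sketch. It is not flyte.group and makes no claim about how Flyte implements groups; the names group, run_action, and recorded_actions are hypothetical stand-ins. It shows two things: invocations made inside the with block are tagged with the group name, and each invocation still returns its own output.

```python
import asyncio
import contextvars
from contextlib import contextmanager

# Hypothetical stand-in for illustration only; this is NOT flyte.group.
_current_group = contextvars.ContextVar("current_group", default=None)
recorded_actions = []  # (group name, action name) pairs, in call order

@contextmanager
def group(name):
    """Mark everything invoked inside the with block as belonging to `name`."""
    token = _current_group.set(name)
    try:
        yield
    finally:
        _current_group.reset(token)  # restore the previous group on exit

async def run_action(action_name, value):
    """Stand-in for a task invocation: record its group, return its own output."""
    recorded_actions.append((_current_group.get(), action_name))
    return value * 2

async def main():
    with group("my-group-name"):
        result1 = await run_action("task_a", 1)
        result2 = await run_action("task_b", 2)
    # Outside the block, the invocation belongs to no group.
    result3 = await run_action("task_c", 3)
    return [result1, result2, result3]

results = asyncio.run(main())
```

Each call returns its own value; the group affects only the label attached to the invocation, never its output.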

Common grouping patterns

Sequential operations

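Group related sequential operations that logically belong together. The examples in this section assume that env is a task environment and that process_data and process_item are tasks defined elsewhere in the same module: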

@env.task
async def data_pipeline(raw_data: str) -> str:
    with flyte.group("data-validation"):
        validated_data = await process_data(raw_data, "validate_schema")
        validated_data = await process_data(validated_data, "check_quality")
        validated_data = await process_data(validated_data, "remove_duplicates")

    with flyte.group("feature-engineering"):
        features = await process_data(validated_data, "extract_features")
        features = await process_data(features, "normalize_features")
        features = await process_data(features, "select_features")

    with flyte.group("model-training"):
        model = await process_data(features, "train_model")
        model = await process_data(model, "validate_model")
        final_model = await process_data(model, "save_model")

    return final_model

Parallel processing with groups

Groups work well with parallel execution patterns:

import asyncio

@env.task
async def parallel_processing_example(n: int) -> str:
    tasks = []

    with flyte.group("parallel-processing"):
        # Collect all task invocations first
        for i in range(n):
            tasks.append(process_item(i, "transform"))

        # Execute all tasks in parallel
        results = await asyncio.gather(*tasks)

    # Convert to string for consistent return type
    return f"parallel_results: {results}"
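
As a sanity check on the claim that grouping leaves concurrency untouched, the following standard-library sketch logs the order of events when coroutines are gathered inside a with block. The group function here is a hypothetical stand-in that only logs entry and exit; it is not flyte.group, and process_item is a plain coroutine rather than a Flyte task.

```python
import asyncio
from contextlib import contextmanager

events = []  # ordered log of what happened

@contextmanager
def group(name):
    # Hypothetical stand-in for flyte.group: it only logs entry and exit.
    events.append(f"enter:{name}")
    try:
        yield
    finally:
        events.append(f"exit:{name}")

async def process_item(i):
    # Stand-in for a task invocation that takes a little time.
    events.append(f"start:{i}")
    await asyncio.sleep(0.01)
    events.append(f"end:{i}")
    return i * i

async def main(n):
    with group("parallel-processing"):
        # Creating the coroutines does not run them yet.
        pending = [process_item(i) for i in range(n)]
        # gather schedules them together and waits for all of them.
        results = await asyncio.gather(*pending)
    return results

results = asyncio.run(main(3))
```

In the log, every item starts before any item ends, so the with block has not serialized the work. Results come back in the order the coroutines were passed to asyncio.gather.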

Multi-phase workflows

Use groups to organize different phases of complex workflows:

@env.task
async def multi_phase_workflow(data_size: int) -> str:
    # First phase: data preprocessing
    preprocessed = []
    with flyte.group("preprocessing"):
        for i in range(data_size):
            preprocessed.append(process_item(i, "preprocess"))
        phase1_results = await asyncio.gather(*preprocessed)

    # Second phase: main processing
    processed = []
    with flyte.group("main-processing"):
        for result in phase1_results:
            processed.append(process_item(result, "transform"))
        phase2_results = await asyncio.gather(*processed)

    # Third phase: postprocessing
    postprocessed = []
    with flyte.group("postprocessing"):
        for result in phase2_results:
            postprocessed.append(process_item(result, "postprocess"))
        final_results = await asyncio.gather(*postprocessed)

    # Convert to string for consistent return type
    return f"multi_phase_results: {final_results}"

Nested groups

Groups can be nested to create hierarchical organization:

@env.task
async def hierarchical_example(raw_data: str) -> str:
    with flyte.group("machine-learning-pipeline"):
        with flyte.group("data-preparation"):
            cleaned_data = await process_data(raw_data, "clean_data")
            split_data = await process_data(cleaned_data, "split_dataset")

        with flyte.group("model-experiments"):
            with flyte.group("hyperparameter-tuning"):
                best_params = await process_data(split_data, "tune_hyperparameters")

            with flyte.group("model-training"):
                model = await process_data(best_params, "train_final_model")
    return model

Conditional grouping

Groups can be used with conditional logic:

@env.task
async def conditional_processing(use_advanced_features: bool, input_data: str) -> str:
    base_result = await process_data(input_data, "basic_processing")

    if use_advanced_features:
        with flyte.group("advanced-features"):
            enhanced_result = await process_data(base_result, "advanced_processing")
            optimized_result = await process_data(enhanced_result, "optimize_result")
            return optimized_result
    else:
        with flyte.group("basic-features"):
            simple_result = await process_data(base_result, "simple_processing")
            return simple_result

Key insights

Groups are an organizational and UI visualization tool. They do not change how your tasks execute and do not aggregate task outputs; they collect related task invocations (actions) into collapsible, folder-like structures. Aggregated success/failure status is visible when you hover over a group folder in the UI.

This makes Flyte workflows easier to understand and maintain, especially when they involve multiple logical phases or large numbers of task executions. Because groups appear as “folders” in the UI’s call stack tree, you can collapse the sections you are not working on and still see their aggregated status on hover.