This guide covers advanced programming patterns and techniques for working with Flyte tasks.
Task Forwarding
When one task calls another task using the normal invocation syntax (e.g., await inner_task(x)), Flyte creates a durable action that’s recorded in the UI with data passed through the metadata store. However, if you want to execute a task in the same Python VM without this overhead, use the .forward() method.
When to use: You want to avoid durability overhead and execute task logic directly in the current VM.
import flyte

env = flyte.TaskEnvironment("my-env")

@env.task
async def inner_task(x: int) -> int:
    return x + 1

@env.task
async def outer_task(x: int) -> int:
    # Executes in same VM, no durable action created
    v = await inner_task.forward(x=10)
    # Creates a durable action, recorded in UI
    return await inner_task(v)

The .forward() method works with both sync and async tasks:
@env.task
def sync_inner_task(x: int) -> int:
    return x + 1

@env.task
def sync_outer_task(x: int) -> int:
    # Direct execution, no remote call
    v = sync_inner_task.forward(x=10)
    return sync_inner_task(v)

Passing Tasks and Functions as Arguments
You can pass both Flyte tasks and regular Python functions as arguments to other tasks. Flyte handles this through pickling, so the code appears as pickled data in the UI.
import typing

import flyte

env = flyte.TaskEnvironment("udfs")

@env.task
async def add_one_udf(x: int) -> int:
    return x + 1

# Regular async function (not a task)
async def fn_add_two_udf(x: int) -> int:
    return x + 2

@env.task
async def run_udf(x: int, udf: typing.Callable[[int], typing.Awaitable[int]]) -> int:
    return await udf(x)

@env.task
async def main() -> list[int]:
    # Pass a Flyte task as an argument
    result_one = await run_udf(5, add_one_udf)
    # Pass a regular function as an argument
    result_two = await run_udf(5, fn_add_two_udf)
    return [result_one, result_two]

Note: Both tasks and regular functions are serialized via pickling when passed as arguments.
Custom Action Names
By default, actions in the UI use the task’s function name. You can provide custom, user-friendly names using the short_name parameter.
Set at Task Definition
import flyte

env = flyte.TaskEnvironment("friendly_names")

@env.task(short_name="my_task")
async def some_task() -> str:
    return "Hello, Flyte!"

Override at Call Time
@env.task(short_name="entrypoint")
async def main() -> str:
    # Uses the default short_name "my_task"
    s = await some_task()
    # Overrides to use "my_name" for this specific action
    return s + await some_task.override(short_name="my_name")()

This is useful when the same task is called multiple times in different contexts, making the UI more readable.
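For example, here is a minimal sketch, reusing env from this section, in which a hypothetical process_region task is called once per region and each action is given a region-specific name via override():

@env.task(short_name="process")
async def process_region(region: str) -> str:
    return f"Processed {region}"

@env.task
async def process_all() -> list[str]:
    results = []
    for region in ["us-east", "eu-west", "ap-south"]:
        # Each call appears in the UI under its own region-specific short name
        results.append(await process_region.override(short_name=f"process-{region}")(region))
    return results

Without the override, all three actions would show up under the same default short_name.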
Invoking Async Functions from Sync Tasks
When migrating from Flyte 1.x to 2.0, you may have legacy sync code that needs to call async functions. Use nest_asyncio.apply() to enable asyncio.run() within sync tasks.
import asyncio

import nest_asyncio

import flyte

env = flyte.TaskEnvironment(
    "async_in_sync",
    image=flyte.Image.from_debian_base().with_pip_packages("nest_asyncio"),
)

# Apply at module level
nest_asyncio.apply()

async def async_function() -> str:
    await asyncio.sleep(1)
    return "done"

@env.task
def sync_task() -> str:
    # Now you can use asyncio.run() in a sync task
    return asyncio.run(async_function())

Important:
- Call nest_asyncio.apply() at the module level before defining tasks
- Add nest_asyncio to your image dependencies
- This is particularly useful during migration when you have mixed sync/async code
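Building on the snippet above (reusing env and the module-level nest_asyncio.apply()), here is a minimal sketch of a sync task that runs several async helpers concurrently with asyncio.gather() inside asyncio.run(). The fetch_a, fetch_b, and fetch_all helpers are illustrative, not part of the Flyte API:

async def fetch_a() -> str:
    await asyncio.sleep(1)
    return "a"

async def fetch_b() -> str:
    await asyncio.sleep(1)
    return "b"

async def fetch_all() -> list[str]:
    # Run both helpers concurrently
    return list(await asyncio.gather(fetch_a(), fetch_b()))

@env.task
def sync_fan_out() -> list[str]:
    # asyncio.run() works here because nest_asyncio.apply() was called at module level
    return asyncio.run(fetch_all())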
Async and Sync Task Interoperability
When migrating from older sync-based code to async tasks, or when working with mixed codebases, you often need to call sync tasks from async parent tasks. Flyte provides the .aio method on every task (even sync ones) to enable this.
Calling Sync Tasks from Async Tasks
Every sync task automatically has an .aio property that returns an async-compatible version:
import flyte

env = flyte.TaskEnvironment("mixed-tasks")

@env.task
def sync_task(x: int) -> str:
    """Legacy sync task"""
    return f"Processed {x}"

@env.task
async def async_task(x: int) -> str:
    """New async task that calls legacy sync task"""
    # Use .aio to call sync task from async context
    result = await sync_task.aio(x)
    return result

Using with flyte.map.aio()
When you need to call sync tasks in parallel from an async context, use flyte.map.aio():
from typing import List

import flyte

env = flyte.TaskEnvironment("map-example")

@env.task
def sync_process(x: int) -> str:
    """Synchronous processing task"""
    return f"Task {x}"

@env.task
async def async_main(n: int) -> List[str]:
    """Async task that maps over sync task"""
    results = []
    # Map over sync task from async context
    async for result in flyte.map.aio(sync_process, range(n)):
        if isinstance(result, Exception):
            raise result
        results.append(result)
    return results

Why this matters: This pattern is powerful when migrating from Flyte 1.x or integrating legacy sync tasks with new async code. You don’t need to rewrite all sync tasks at once—they can be called seamlessly from async contexts.
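For a small, fixed number of calls, asyncio.gather() combined with .aio is an alternative to flyte.map.aio(). A minimal sketch, reusing env and sync_process from the example above (the async_fixed_fan_out task and the literal inputs are illustrative):

import asyncio

@env.task
async def async_fixed_fan_out() -> list[str]:
    # Launch a handful of calls to the legacy sync task concurrently via .aio
    results = await asyncio.gather(
        sync_process.aio(1),
        sync_process.aio(2),
        sync_process.aio(3),
    )
    return list(results)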
Using AnyIO in Async Tasks
Flyte async tasks support anyio for structured concurrency as an alternative to asyncio.gather().
import anyio

import aioresult

import flyte

env = flyte.TaskEnvironment(
    "anyio_example",
    image=flyte.Image.from_debian_base().with_pip_packages("anyio", "aioresult"),
)

@env.task
async def process_item(x: int) -> int:
    return x * 2

@env.task
async def batch_process(items: list[int]) -> list[int]:
    captured_results = []
    async with anyio.create_task_group() as tg:
        # Start multiple tasks concurrently
        for item in items:
            captured_results.append(
                aioresult.ResultCapture.start_soon(tg, process_item, item)
            )
    # Extract results
    return [r.result() for r in captured_results]

Note: You can use anyio’s task groups, timeouts, and other structured concurrency primitives within Flyte async tasks.
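For instance, here is a minimal sketch of an anyio timeout, reusing env and process_item from the example above. The batch_process_with_timeout name, the five-second limit, and the TimeoutError handling are illustrative choices, not part of the example above:

@env.task
async def batch_process_with_timeout(items: list[int]) -> list[int]:
    captured_results = []
    # Cancel the whole task group if it runs longer than 5 seconds
    with anyio.move_on_after(5) as scope:
        async with anyio.create_task_group() as tg:
            for item in items:
                captured_results.append(
                    aioresult.ResultCapture.start_soon(tg, process_item, item)
                )
    if scope.cancelled_caught:
        # One possible way to surface the timeout to the caller
        raise TimeoutError("batch did not finish within 5 seconds")
    return [r.result() for r in captured_results]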