Retries and timeouts
Flyte provides robust error handling through configurable retry strategies and timeout controls. These parameters help ensure task reliability and prevent resource waste from runaway processes.
Retries
The retries parameter controls how many times a failed task should be retried before giving up.
A “retry” is any attempt after the initial attempt.
In other words, retries=3 means the task may be attempted up to 4 times in total (1 initial + 3 retries).
The retries parameter can be configured in either the @env.task decorator or using override when invoking the task.
It cannot be configured in the TaskEnvironment definition.
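The counting rule above (1 initial attempt plus N retries) can be sketched in plain Python. This is only an illustration of the semantics, not how Flyte implements retries; `run_with_retries` and `always_fails` are hypothetical names introduced for this sketch.

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def run_with_retries(fn: Callable[[], T], retries: int) -> T:
    """Call fn once, then retry up to `retries` more times on failure.

    Hypothetical helper for illustration only; not part of Flyte.
    """
    if retries < 0:
        raise ValueError("retries must be >= 0")
    total_attempts = 1 + retries  # 1 initial attempt + N retries
    last_error: Optional[Exception] = None
    for _ in range(total_attempts):
        try:
            return fn()
        except Exception as exc:
            last_error = exc  # remember the failure and try again
    assert last_error is not None
    raise last_error  # every attempt failed


calls = {"count": 0}


def always_fails() -> str:
    calls["count"] += 1
    raise RuntimeError("Task failed!")


try:
    run_with_retries(always_fails, retries=3)
except RuntimeError:
    pass

print(calls["count"])  # 4 attempts: 1 initial + 3 retries
```

A function that never succeeds is called four times with retries=3, which matches the "1 initial + 3 retries" rule.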
The code for the examples below can be found on GitHub.
Retry example
First, we import the required modules and set up a task environment:
import random
from datetime import timedelta

import flyte

env = flyte.TaskEnvironment(name="my-env")

Then we configure our task to retry up to 3 times if it fails (for a total of 4 attempts). We also define the driver task main that calls the retry task:
@env.task(retries=3)
async def retry() -> str:
    if random.random() < 0.7:  # 70% failure rate
        raise Exception("Task failed!")
    return "Success!"


@env.task
async def main() -> list[str]:
    results = []
    try:
        results.append(await retry())
    except Exception as e:
        results.append(f"Failed: {e}")
    try:
        results.append(await retry.override(retries=5)())
    except Exception as e:
        results.append(f"Failed: {e}")
    return results

Note that we call retry twice: first without any override, and then with an override to increase the retries to 5 (for a total of 6 attempts).
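To see what the extra retries buy in this example, the short calculation below estimates how likely it is that every attempt fails. It assumes each attempt fails independently with the 70% rate used in the retry task; `probability_all_attempts_fail` is a hypothetical helper, not a Flyte function.

```python
def probability_all_attempts_fail(failure_rate: float, retries: int) -> float:
    """Chance that the initial attempt and every retry all fail.

    Assumes attempts are independent, as with random.random() in the example.
    """
    total_attempts = 1 + retries  # 1 initial attempt + N retries
    return failure_rate ** total_attempts


default_failure = probability_all_attempts_fail(0.7, retries=3)   # 0.7 ** 4
override_failure = probability_all_attempts_fail(0.7, retries=5)  # 0.7 ** 6

print(f"retries=3: {default_failure:.4f}")   # retries=3: 0.2401
print(f"retries=5: {override_failure:.4f}")  # retries=5: 0.1176
```

Under these assumptions, the first call still fails about 24% of the time, and the overridden call about 12% of the time.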
Finally, we configure Flyte and invoke the main task:
if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()

Timeouts
The timeout parameter sets limits on how long a task can run, preventing resource waste from stuck processes.
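It accepts three formats: an integer number of seconds, a timedelta object, or a Timeout object that sets separate limits for execution time and queue time.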
The timeout parameter can be configured in either the @env.task decorator or using override when invoking the task.
It cannot be configured in the TaskEnvironment definition.
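In the example later in this section, an integer timeout is read as a number of seconds, so 60 and timedelta(minutes=1) describe the same limit. The sketch below makes that equivalence explicit; `to_timedelta` is a hypothetical helper written for illustration and is not part of the Flyte API.

```python
from datetime import timedelta
from typing import Union


def to_timedelta(timeout: Union[int, timedelta]) -> timedelta:
    """Hypothetical helper: treat an int as seconds, pass a timedelta through."""
    if isinstance(timeout, timedelta):
        return timeout
    return timedelta(seconds=timeout)


# Both spellings describe the same one-minute limit.
print(to_timedelta(60) == to_timedelta(timedelta(minutes=1)))  # True
```

Which form to use is a readability choice: integers are compact for short limits, while timedelta avoids mental arithmetic for limits measured in minutes or hours.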
The code for the example below can be found on GitHub.
Timeout example
First, we import the required modules and set up a task environment:
import random
from datetime import timedelta
import asyncio

import flyte
from flyte import Timeout

env = flyte.TaskEnvironment(name="my-env")

Our first task sets a timeout using seconds as an integer:
@env.task(timeout=60)  # 60 seconds
async def timeout_seconds() -> str:
    await asyncio.sleep(random.randint(0, 120))  # Random wait between 0 and 120 seconds
    return "timeout_seconds completed"

We can also set a timeout using a timedelta object for more readable durations:
@env.task(timeout=timedelta(minutes=1))
async def timeout_timedelta() -> str:
    await asyncio.sleep(random.randint(0, 120))  # Random wait between 0 and 120 seconds
    return "timeout_timedelta completed"

You can also set separate timeouts for maximum execution time and maximum queue time using the Timeout class:
@env.task(timeout=Timeout(
    max_runtime=timedelta(minutes=1),  # Max execution time per attempt
    max_queued_time=timedelta(minutes=1)  # Max time in queue before starting
))
async def timeout_advanced() -> str:
    await asyncio.sleep(random.randint(0, 120))  # Random wait between 0 and 120 seconds
    return "timeout_advanced completed"

You can also combine retries and timeouts for resilience and resource control:
@env.task(
    retries=3,
    timeout=Timeout(
        max_runtime=timedelta(minutes=1),
        max_queued_time=timedelta(minutes=1)
    )
)
async def timeout_with_retry() -> str:
    await asyncio.sleep(random.randint(0, 120))  # Random wait between 0 and 120 seconds
    return "timeout_with_retry completed"

Here we specify:
- Up to 3 retry attempts.
- Each attempt times out after 1 minute.
- Task fails if queued for more than 1 minute.
- Total possible runtime: 1 minute queue + (1 minute × 4 attempts), because retries=3 allows 1 initial attempt plus 3 retries.
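The time budget for this configuration can be worked out with timedelta arithmetic. The sketch below follows the rule from the Retries section (retries=3 means 1 initial attempt plus 3 retries, so 4 attempts) and assumes the queue limit is counted once, as in the formula above; `worst_case_runtime` is a hypothetical helper, not a Flyte function.

```python
from datetime import timedelta


def worst_case_runtime(retries: int, max_runtime: timedelta) -> timedelta:
    """Upper bound on execution time if every attempt runs until it times out."""
    total_attempts = 1 + retries  # 1 initial attempt + N retries
    return total_attempts * max_runtime


runtime_bound = worst_case_runtime(retries=3, max_runtime=timedelta(minutes=1))
# Assumption: the queue limit is added once on top of the execution time.
total_bound = timedelta(minutes=1) + runtime_bound

print(runtime_bound)  # 0:04:00
print(total_bound)    # 0:05:00
```

A bound like this is useful when sizing an outer deadline: the caller should expect to wait at least this long before the task reports a final failure.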
We define the main driver task that calls all the timeout tasks concurrently and returns their outputs as a list. The return value for failed tasks will indicate failure:
@env.task
async def main() -> list[str]:
    tasks = [
        timeout_seconds(),
        timeout_seconds.override(timeout=120)(),  # Override to 120 seconds
        timeout_timedelta(),
        timeout_advanced(),
        timeout_with_retry(),
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    output = []
    for r in results:
        if isinstance(r, Exception):
            output.append(f"Failed: {r}")
        else:
            output.append(r)
    return output

Note that we also demonstrate overriding the timeout for timeout_seconds to 120 seconds when calling it.
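The gather-and-inspect pattern used in main can be tried without a Flyte backend. The sketch below uses only the standard library: asyncio.wait_for stands in for a task timeout (it is not how Flyte enforces timeouts), and `work` and `collect` are hypothetical names chosen for this illustration.

```python
import asyncio


async def work(name: str, duration: float) -> str:
    """Stand-in for a task body that takes `duration` seconds."""
    await asyncio.sleep(duration)
    return f"{name} completed"


async def collect() -> list[str]:
    coros = [
        asyncio.wait_for(work("fast", 0.01), timeout=0.5),   # finishes in time
        asyncio.wait_for(work("slow", 0.5), timeout=0.05),   # exceeds its limit
    ]
    # return_exceptions=True returns failures as values instead of raising,
    # so one timed-out call does not hide the results of the others.
    results = await asyncio.gather(*coros, return_exceptions=True)
    output = []
    for r in results:
        if isinstance(r, Exception):
            output.append(f"Failed: {type(r).__name__}")
        else:
            output.append(r)
    return output


output = asyncio.run(collect())
print(output)  # ['fast completed', 'Failed: TimeoutError']
```

The sketch reports the exception type rather than its message because a standard-library timeout error carries an empty message.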
Finally, we configure Flyte and invoke the main task:
if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()

Proper retry and timeout configuration ensures your Flyte workflows are both reliable and efficient, handling transient failures gracefully while preventing resource waste.