flyte.durable
Flyte durable utilities.
This module provides deterministic, crash-resilient replacements for time-related functions.
Calling `time.time()`, `time.sleep()`, or `asyncio.sleep()` directly introduces non-determinism.
The utilities here persist state across crashes and restarts, making workflows durable.
- `sleep`: a durable replacement for `time.sleep` / `asyncio.sleep`
- `time`: a durable replacement for `time.time`
- `now`: a durable replacement for `datetime.now`
Directory
Methods
| Method | Description |
|---|---|
| `now()` | Returns the current time for every unique invocation of `durable_time`. |
| `sleep()` | `durable_sleep` enables the process to sleep for `seconds` seconds even if the process recovers from a crash. |
| `time()` | Returns the current time for every unique invocation of `durable_time`. |
Methods
now()
The default invocation is synchronous and blocks.
To call it asynchronously, use `.aio()` on the function itself, e.g. `result = await now.aio()`.

    def now()

Returns the current time for every unique invocation of `durable_time`. If the same invocation is encountered again,
the previously returned time is returned, ensuring determinism.
Similar to `datetime.now()`, but durable.

Returns: `datetime.datetime`
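
The calling convention described above (blocking by default, awaitable through `.aio()`) can be illustrated with a small standard-library sketch. The `SyncWithAio` wrapper and the `utc_now` function are hypothetical names invented for this illustration; this is not how Flyte implements the convention, only a picture of what the two call styles look like from the caller's side.

```python
import asyncio
from datetime import datetime, timezone


class SyncWithAio:
    """Hypothetical wrapper: blocks when called directly, awaitable via .aio()."""

    def __init__(self, coro_fn):
        self._coro_fn = coro_fn

    def __call__(self, *args, **kwargs):
        # Sync path: run the coroutine to completion and block the caller.
        return asyncio.run(self._coro_fn(*args, **kwargs))

    def aio(self, *args, **kwargs):
        # Async path: return the coroutine so the caller can await it.
        return self._coro_fn(*args, **kwargs)


async def _utc_now():
    # Stand-in body; a durable implementation would consult persisted state.
    return datetime.now(timezone.utc)


utc_now = SyncWithAio(_utc_now)


async def main():
    # Inside async code, use the .aio() form.
    return await utc_now.aio()


blocking_result = utc_now()           # sync call, blocks until done
awaited_result = asyncio.run(main())  # async call through .aio()
```

Note that the direct call in this sketch uses `asyncio.run`, so it only works when no event loop is already running; inside a coroutine the `.aio()` form is the one to use.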
sleep()
The default invocation is synchronous and blocks.
To call it asynchronously, use `.aio()` on the function itself, e.g. `await sleep.aio(3600)`.

    def sleep(
        seconds: float,
    )

`durable_sleep` lets the process sleep for `seconds` seconds even if the process recovers from a crash.
This method can be invoked multiple times. If the process crashes, the invocation of `durable_sleep` behaves
as if the process had been sleeping since the first time this method was invoked.
Examples:

    import flyte.durable

    env = flyte.TaskEnvironment("env")

    @env.task
    async def main():
        # Do something (my_work is a placeholder for your own code)
        my_work()
        # Now we need to sleep for 1 hour before proceeding.
        # Even if the process crashes, it will resume and sleep for only
        # 1 hour in aggregate. If the scheduling takes longer, it
        # will simply return immediately.
        await flyte.durable.sleep.aio(3600)
        # Thing to be done after 1 hour
        my_work()

| Parameter | Type | Description |
|---|---|---|
| `seconds` | `float` | Time to sleep for, in seconds. |
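
The "sleeping since the first invocation" behaviour can be sketched with plain Python. This is a minimal illustration under stated assumptions, not Flyte's implementation: the `store` dict stands in for whatever state survives a crash, and `sleep_with_checkpoint`, `remaining_sleep`, and the `"sleep-1"` key are hypothetical names. The clock and sleeper are injected so the example runs instantly.

```python
import time


def remaining_sleep(first_invoked_at: float, seconds: float, now: float) -> float:
    """Seconds still left to sleep, measured from the first invocation."""
    elapsed = now - first_invoked_at
    return max(0.0, seconds - elapsed)


def sleep_with_checkpoint(store, key, seconds, clock=time.time, sleeper=time.sleep):
    # `store` is a hypothetical stand-in for state persisted across crashes.
    # The first invocation records its start time; later ones reuse it.
    first_invoked_at = store.setdefault(key, clock())
    remaining = remaining_sleep(first_invoked_at, seconds, clock())
    if remaining > 0:
        sleeper(remaining)
    return remaining


store = {}
slept = []  # records how long each attempt would have slept

# First attempt starts at t=1000 and needs the full hour.
first = sleep_with_checkpoint(store, "sleep-1", 3600,
                              clock=lambda: 1000.0, sleeper=slept.append)
# Retry 40 minutes later: only the remaining 20 minutes are slept.
second = sleep_with_checkpoint(store, "sleep-1", 3600,
                               clock=lambda: 3400.0, sleeper=slept.append)
# Retry after more than an hour has passed: returns immediately.
third = sleep_with_checkpoint(store, "sleep-1", 3600,
                              clock=lambda: 5000.0, sleeper=slept.append)
```

The key design point is that the deadline is anchored to the recorded first invocation rather than to the time of the current attempt, so repeated attempts never add up to more than `seconds` in total.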
time()
The default invocation is synchronous and blocks.
To call it asynchronously, use `.aio()` on the function itself, e.g. `result = await time.aio()`.

    def time()

Returns the current time for every unique invocation of `durable_time`. If the same invocation is encountered again,
the previously returned time is returned, ensuring determinism.
Similar to `time.time()`, but durable.

Returns: `float`
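
The idea of returning the previously recorded time when the same invocation is seen again can be sketched as a small record-and-replay journal. This is only an illustration: the `RecordedClock` class and the explicit `invocation_key` argument are hypothetical, and how Flyte actually identifies a unique invocation is not specified here.

```python
import time


class RecordedClock:
    """Hypothetical journal holding one recorded timestamp per invocation key."""

    def __init__(self, journal=None):
        # `journal` stands in for state that is persisted across restarts.
        self.journal = {} if journal is None else journal

    def time(self, invocation_key: str) -> float:
        if invocation_key not in self.journal:
            # First time this invocation is seen: read the real clock and record it.
            self.journal[invocation_key] = time.time()
        # Replays of the same invocation get the recorded value back.
        return self.journal[invocation_key]


clock = RecordedClock()
t1 = clock.time("step-1")
t1_replayed = clock.time("step-1")  # same invocation, same value
t2 = clock.time("step-2")           # new invocation, fresh clock read

# Simulate a restart: a new clock object rebuilt from the persisted journal.
restarted = RecordedClock(journal=dict(clock.journal))
t1_after_restart = restarted.time("step-1")
```

Because the value is read from the journal on replay, code that branches on the returned time takes the same path after a restart as it did on the first run.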