DynamicBatcher
Package: flyte.extras
Batches records from many concurrent producers and runs them through a single async processing function, maximizing resource utilization.
The batcher runs two internal loops:

- Aggregation loop: drains the submission queue and assembles cost-budgeted batches, respecting `target_batch_cost`, `max_batch_size`, and `batch_timeout_s`.
- Processing loop: pulls assembled batches and calls `process_fn`, resolving each record's `asyncio.Future`.
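The toy model below uses plain `asyncio` to show how the work is split between the two loops. It is deliberately simplified and is not the `flyte.extras` implementation. `ToyBatcher` and its methods are hypothetical. Batches are bounded only by count and timeout, with no cost budget and no `min_batch_size`. Shutdown through a `None` sentinel is also an assumption of this sketch.

```python
import asyncio


class ToyBatcher:
    """Hypothetical two-loop batcher: a teaching model, not flyte.extras."""

    def __init__(self, process_fn, max_batch_size=4, batch_timeout_s=0.05, max_queue_size=100):
        self._process_fn = process_fn
        self._max_batch_size = max_batch_size
        self._timeout = batch_timeout_s
        self._queue = asyncio.Queue(maxsize=max_queue_size)  # submitted (record, future) pairs
        self._batches = asyncio.Queue(maxsize=1)  # one prefetched batch between the loops
        self._tasks = []

    async def start(self):
        self._tasks = [
            asyncio.create_task(self._aggregate()),
            asyncio.create_task(self._process()),
        ]

    async def submit(self, record):
        fut = asyncio.get_running_loop().create_future()
        await self._queue.put((record, fut))  # awaits while the queue is full (backpressure)
        return fut

    async def stop(self):
        await self._queue.put(None)  # sentinel lands behind every earlier submission
        await asyncio.gather(*self._tasks)  # both loops drain their work and exit

    async def _aggregate(self):
        loop = asyncio.get_running_loop()
        shutting_down = False
        while not shutting_down:
            item = await self._queue.get()
            if item is None:
                break
            batch = [item]
            deadline = loop.time() + self._timeout
            while len(batch) < self._max_batch_size:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break  # timeout fired: dispatch a partial batch
                try:
                    item = await asyncio.wait_for(self._queue.get(), remaining)
                except asyncio.TimeoutError:
                    break
                if item is None:
                    shutting_down = True  # flush this batch, then exit
                    break
                batch.append(item)
            await self._batches.put(batch)
        await self._batches.put(None)  # tell the processing loop to exit

    async def _process(self):
        while (batch := await self._batches.get()) is not None:
            records = [record for record, _ in batch]
            try:
                results = await self._process_fn(records)
                if len(results) != len(records):
                    raise ValueError("process_fn must return one result per record")
            except Exception as exc:
                for _, fut in batch:
                    if not fut.done():
                        fut.set_exception(exc)
                continue
            for (_, fut), result in zip(batch, results):  # same order as the input batch
                if not fut.done():
                    fut.set_result(result)


async def demo():
    batch_sizes = []

    async def double(batch):
        batch_sizes.append(len(batch))
        return [x * 2 for x in batch]

    batcher = ToyBatcher(double, max_batch_size=4)
    await batcher.start()
    futures = [await batcher.submit(i) for i in range(10)]
    await batcher.stop()
    return [f.result() for f in futures], batch_sizes


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

When the bounded `asyncio.Queue` fills up, `submit` awaits. This is the same backpressure idea the reference describes. The one-slot `_batches` queue plays roughly the role of `prefetch_batches=1`.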
Type Parameters:
RecordT: The input record type produced by your tasks.
ResultT: The per-record output type returned by process_fn.
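The constructor signature refers to `ProcessFn` and `CostEstimatorFn`, but this page does not show their definitions. The aliases in this sketch are an assumption, reconstructed from the documented shapes (`async def f(batch: list[RecordT]) -> list[ResultT]` and `(RecordT) -> int`). They are paired with a hypothetical order-preserving `process_fn` and a hypothetical cost estimator for string records.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

RecordT = TypeVar("RecordT")
ResultT = TypeVar("ResultT")

# Assumed aliases matching the documented shapes; flyte.extras may define them differently.
ProcessFn = Callable[[list[RecordT]], Awaitable[list[ResultT]]]
CostEstimatorFn = Callable[[RecordT], int]


async def count_chars(batch: list[str]) -> list[int]:
    # Hypothetical process_fn: exactly one result per record, in input order.
    return [len(text) for text in batch]


def char_cost(record: str) -> int:
    # Hypothetical cost estimator: longer strings cost more, minimum cost 1.
    return max(1, len(record))


process_fn: ProcessFn[str, int] = count_chars
cost_estimator: CostEstimatorFn[str] = char_cost

if __name__ == "__main__":
    print(asyncio.run(process_fn(["a", "bbb", ""])))  # [1, 3, 0]
    print(cost_estimator(""))  # 1
```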
Parameters
```python
class DynamicBatcher(
    process_fn: ProcessFn[RecordT, ResultT],
    cost_estimator: CostEstimatorFn[RecordT] | None,
    target_batch_cost: int,
    max_batch_size: int,
    min_batch_size: int,
    batch_timeout_s: float,
    max_queue_size: int,
    prefetch_batches: int,
    default_cost: int,
)
```

| Parameter | Type | Description |
|---|---|---|
| `process_fn` | `ProcessFn[RecordT, ResultT]` | `async def f(batch: list[RecordT]) -> list[ResultT]`. Must return results in the same order as the input batch. |
| `cost_estimator` | `CostEstimatorFn[RecordT] \| None` | Optional `(RecordT) -> int` function. When provided, it is called to estimate the cost of each submitted record. Otherwise the batcher falls back to `record.estimate_cost()` if the record implements `CostEstimator`, and then to `default_cost`. |
| `target_batch_cost` | `int` | Cost budget per batch. The aggregator fills batches up to this limit before dispatching. |
| `max_batch_size` | `int` | Hard cap on records per batch, regardless of the cost budget. |
| `min_batch_size` | `int` | Minimum number of records before dispatching. Ignored when the timeout fires or shutdown is in progress. |
| `batch_timeout_s` | `float` | Maximum number of seconds to wait for a full batch. Lower values reduce idle time but may produce smaller batches. |
| `max_queue_size` | `int` | Size of the bounded submission queue. When the queue is full, `submit` awaits (backpressure). |
| `prefetch_batches` | `int` | Number of pre-assembled batches buffered between the aggregation and processing loops. |
| `default_cost` | `int` | Fallback cost when no estimator is available. |
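One way to picture how `target_batch_cost` and `max_batch_size` interact is greedy packing. The sketch below is a hypothetical offline version of that policy, and `pack_batches` is not part of `flyte.extras`. It closes a batch when adding the next record would exceed the cost budget, or when the batch already holds `max_batch_size` records. This page does not say how the real aggregator handles a single record whose cost exceeds the budget, so the sketch assumes such a record gets a batch of its own.

```python
def pack_batches(costs: list[int], target_batch_cost: int, max_batch_size: int) -> list[list[int]]:
    """Greedily group record indices into batches (hypothetical policy, max_batch_size >= 1)."""
    batches: list[list[int]] = []
    current: list[int] = []
    current_cost = 0
    for index, cost in enumerate(costs):
        over_budget = bool(current) and current_cost + cost > target_batch_cost
        full = len(current) >= max_batch_size
        if over_budget or full:
            batches.append(current)  # dispatch the batch assembled so far
            current, current_cost = [], 0
        current.append(index)
        current_cost += cost
    if current:
        batches.append(current)  # flush the remainder
    return batches


print(pack_batches([4, 4, 4, 1, 1, 1, 1], target_batch_cost=10, max_batch_size=3))
# [[0, 1], [2, 3, 4], [5, 6]]
```

The first split comes from the cost budget (4 + 4 + 4 > 10). The second comes from `max_batch_size=3`.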
Properties
| Property | Type | Description |
|---|---|---|
| `is_running` | `bool` | Whether the aggregation and processing loops are active. |
| `stats` | `BatchStats` | Current `BatchStats` snapshot. |
Methods
| Method | Description |
|---|---|
| `start()` | Start the aggregation and processing loops. |
| `stop()` | Graceful shutdown: process all enqueued work, then stop. |
| `submit()` | Submit a single record for batched processing. |
| `submit_batch()` | Convenience: submit multiple records and return their futures. |
start()
```python
def start()
```

Start the aggregation and processing loops.

Raises

| Exception | Description |
|---|---|
| `RuntimeError` | If the batcher is already running. |
stop()
```python
def stop()
```

Graceful shutdown: process all enqueued work, then stop. Blocks until every pending future is resolved.
submit()
```python
def submit(
    record: RecordT,
    estimated_cost: int | None,
) -> asyncio.Future[ResultT]
```

Submit a single record for batched processing. Returns an `asyncio.Future` that resolves once the batch containing this record has been processed.

| Parameter | Type | Description |
|---|---|---|
| `record` | `RecordT` | The input record. |
| `estimated_cost` | `int \| None` | Optional explicit cost. When omitted, the batcher tries `cost_estimator`, then `record.estimate_cost()`, then `default_cost`. |
Returns
A future whose result is the corresponding entry from the list returned by `process_fn`.

Raises

| Exception | Description |
|---|---|
| `RuntimeError` | If the batcher is not running. |
If the internal queue is full this coroutine awaits until space is available, providing natural backpressure to fast producers.
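The cost lookup order described for `estimated_cost` can be written out as a small function. `resolve_cost` and `Prompt` are hypothetical names used only for illustration. The `getattr` duck-typing check is an assumption that stands in for whatever `CostEstimator` check the library actually performs.

```python
from typing import Any, Callable, Optional


def resolve_cost(
    record: Any,
    *,
    default_cost: int,
    estimated_cost: Optional[int] = None,
    cost_estimator: Optional[Callable[[Any], int]] = None,
) -> int:
    """Hypothetical helper mirroring the documented lookup order."""
    if estimated_cost is not None:  # 1. explicit cost passed to submit()
        return estimated_cost
    if cost_estimator is not None:  # 2. batcher-level cost_estimator
        return cost_estimator(record)
    estimate = getattr(record, "estimate_cost", None)
    if callable(estimate):  # 3. record provides its own estimate_cost()
        return estimate()
    return default_cost  # 4. fallback


class Prompt:
    """Hypothetical record type that knows its own cost."""

    def __init__(self, text: str) -> None:
        self.text = text

    def estimate_cost(self) -> int:
        return len(self.text.split())  # one unit per whitespace-separated word
```

For example, `resolve_cost(Prompt("one two three"), default_cost=1)` falls through to the record's own `estimate_cost()`. A plain string with no estimator falls back to `default_cost`.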
submit_batch()
```python
def submit_batch(
    records: Sequence[RecordT],
    estimated_cost: Sequence[int] | None,
) -> list[asyncio.Future[ResultT]]
```

Convenience: submit multiple records and return their futures.

| Parameter | Type | Description |
|---|---|---|
| `records` | `Sequence[RecordT]` | Sequence of input records. |
| `estimated_cost` | `Sequence[int] \| None` | Optional per-record cost estimates. When provided, its length must match `records`. |
Returns: List of futures, one per record.
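A common pattern is to submit a group of records and then await all of their futures together with `asyncio.gather`. A standalone snippet cannot run a real batcher, so `StubBatcher` below is a hypothetical stand-in that resolves each future immediately. Two details are assumptions of the sketch rather than documented behavior: `submit_batch` is treated as a coroutine, and a length mismatch raises `ValueError`.

```python
import asyncio
from typing import Optional, Sequence


class StubBatcher:
    """Hypothetical stand-in: each future is resolved immediately."""

    async def submit(self, record: int, estimated_cost: Optional[int] = None) -> "asyncio.Future[int]":
        fut = asyncio.get_running_loop().create_future()
        fut.set_result(record * 10)  # pretend process_fn multiplied each record by 10
        return fut

    async def submit_batch(
        self, records: Sequence[int], estimated_cost: Optional[Sequence[int]] = None
    ) -> "list[asyncio.Future[int]]":
        if estimated_cost is not None and len(estimated_cost) != len(records):
            raise ValueError("estimated_cost must have one entry per record")
        costs = list(estimated_cost) if estimated_cost is not None else [None] * len(records)
        return [await self.submit(r, c) for r, c in zip(records, costs)]


async def main() -> list:
    batcher = StubBatcher()
    futures = await batcher.submit_batch([1, 2, 3], estimated_cost=[5, 5, 5])
    # Await all futures together; gather returns results in submission order.
    return await asyncio.gather(*futures)


if __name__ == "__main__":
    print(asyncio.run(main()))  # [10, 20, 30]
```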