Remote tasks
Remote tasks let you use previously deployed tasks without importing their code or dependencies. This enables teams to share and reuse tasks without managing complex dependency chains or container images.
Prerequisites
Remote tasks must be deployed before you can use them. See the task deployment guide for details.
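For orientation, a task environment is deployed with the flyte deploy CLI command (used again in the complete example later on this page) or its Python counterpart, flyte.deploy(). The block below is a minimal sketch, assuming a module spark_env.py that defines a task environment object named env:
```python
import flyte

# Assumption for this sketch: spark_env.py defines
# `env = flyte.TaskEnvironment(name="spark_env")` with tasks attached to it.
from spark_env import env

flyte.init_from_config()

# Deploy the environment so its tasks can later be referenced remotely.
# CLI equivalent: flyte deploy spark_env/
flyte.deploy(env)
```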
Basic usage
Use flyte.remote.Task.get() to reference a deployed task:
import flyte
import flyte.remote
env = flyte.TaskEnvironment(name="my_env")
# Get the latest version of a deployed task
data_processor = flyte.remote.Task.get(
"data_team.spark_analyzer",
auto_version="latest"
)
# Use it in your task
@env.task
async def my_task(data_path: str) -> flyte.io.DataFrame:
# Call the reference task like any other task
result = await data_processor(input_path=data_path)
    return result
You can run this directly without deploying it:
flyte run my_workflow.py my_task --data_path s3://my-bucket/data.parquet
Understanding lazy loading
Remote tasks use lazy loading to keep module imports fast and enable flexible client configuration. When you call flyte.remote.Task.get(), it returns a lazy reference that doesn’t fetch the task from the server until it is first used.
When tasks are fetched
The remote task is fetched from the server only when:
- You call flyte.run() with the task
- You call flyte.deploy() with code that uses the task
- You invoke the task with the () operator inside another task
- You explicitly call .fetch() on the lazy reference
import flyte
import flyte.remote
# This does NOT make a network call - returns a lazy reference
data_processor = flyte.remote.Task.get(
"data_team.spark_analyzer",
auto_version="latest"
)
# The task is fetched here when you invoke it
run = flyte.run(data_processor, input_path="s3://my-bucket/data.parquet")
Benefits of lazy loading
Fast module loading: Since no network calls are made during import, your Python modules load quickly even when referencing many remote tasks.
Late binding: You can call flyte.init() after importing remote tasks, and the correct client will be bound when the task is actually invoked:
import flyte
import flyte.remote
# Load remote task reference at module level
data_processor = flyte.remote.Task.get(
"data_team.spark_analyzer",
auto_version="latest"
)
# Initialize the client later
flyte.init_from_config()
# The task uses the client configured above
run = flyte.run(data_processor, input_path="s3://data.parquet")
Error handling
Because of lazy loading, if a referenced task doesn’t exist, you won’t get an error when calling get(). Instead, the error occurs during invocation, raising a flyte.errors.ReferenceTaskError:
import flyte
import flyte.remote
import flyte.errors
# This succeeds even if the task doesn't exist
data_processor = flyte.remote.Task.get(
"nonexistent.task",
auto_version="latest"
)
try:
# Error occurs here during invocation
run = flyte.run(data_processor, input_path="s3://data.parquet")
except flyte.errors.ReferenceTaskError as e:
print(f"Task not found or invocation failed: {e}")
# Handle the error - perhaps use a fallback task
    # or notify the user that the task needs to be deployed
You can also catch errors when using remote tasks within other tasks:
@env.task
async def pipeline_with_fallback(data_path: str) -> dict:
try:
# Try to use the remote task
result = await data_processor(input_path=data_path)
return {"status": "success", "result": result}
except flyte.errors.ReferenceTaskError as e:
# Fallback to local processing
print(f"Remote task failed: {e}, using local fallback")
return {"status": "fallback", "result": local_process(data_path)}Eager fetching with fetch()
While lazy loading is convenient, you can explicitly fetch a task upfront using the fetch() method. This is useful for:
- Catching errors early: Validate that the task exists before execution starts
- Caching: Avoid the network call on first invocation when running multiple times
- Service initialization: Pre-load tasks when your service starts
import flyte
import flyte.remote
import flyte.errors
# Get the lazy reference
data_processor = flyte.remote.Task.get(
"data_team.spark_analyzer",
auto_version="latest"
)
try:
# Eagerly fetch the task details
task_details = data_processor.fetch()
# Now the task is cached - subsequent calls won't hit the remote service
# You can pass either the original reference or task_details to flyte.run
run1 = flyte.run(data_processor, input_path="s3://data1.parquet")
run2 = flyte.run(task_details, input_path="s3://data2.parquet")
except flyte.errors.ReferenceTaskError as e:
print(f"Task validation failed at startup: {e}")
    # Handle the error before any execution attempts
For async contexts, use await fetch.aio():
import flyte
import flyte.errors
import flyte.remote
async def initialize_service():
processor_ref = flyte.remote.Task.get(
"data_team.spark_analyzer",
auto_version="latest"
)
try:
# Fetch asynchronously
task_details = await processor_ref.fetch.aio()
print(f"Task {task_details.name} loaded successfully")
return processor_ref # Return the cached reference
except flyte.errors.ReferenceTaskError as e:
print(f"Failed to load task: {e}")
raise
# Initialize once at service startup
cached_processor = None
async def startup():
global cached_processor
cached_processor = await initialize_service()
# Later in your service
async def process_request(data_path: str):
# The task is already cached from initialization
# No network call on first invocation
run = flyte.run(cached_processor, input_path=data_path)
    return run
When to use eager fetching:
- Service startup: Fetch all remote tasks during initialization to validate they exist and cache them
- Multiple invocations: If you’ll invoke the same task many times, fetch once to cache it
- Fail-fast validation: Catch configuration errors before execution begins
When lazy loading is better:
- Single-use tasks: If you only invoke the task once, lazy loading is simpler
- Import-time overhead: Keep imports fast by deferring network calls
- Conditional usage: If the task may not be needed, don’t fetch it upfront
Module-level vs dynamic loading
Module-level loading (recommended): Load remote tasks at the module level for cleaner, more maintainable code:
import flyte.remote
# Module-level - clear and maintainable
data_processor = flyte.remote.Task.get(
"data_team.spark_analyzer",
auto_version="latest"
)
@env.task
async def my_task(data_path: str):
    return await data_processor(input_path=data_path)
Dynamic loading: You can also load remote tasks dynamically within a task if needed:
@env.task
async def dynamic_pipeline(task_name: str, data_path: str):
# Load the task based on runtime parameters
processor = flyte.remote.Task.get(
f"data_team.{task_name}",
auto_version="latest"
)
try:
result = await processor(input_path=data_path)
return result
except flyte.errors.ReferenceTaskError as e:
        raise ValueError(f"Task {task_name} not found: {e}")
Complete example
This example shows how different teams can collaborate using remote tasks.
Team A: Spark environment
Team A maintains Spark-based data processing tasks:
# spark_env.py
from dataclasses import dataclass
import flyte
env = flyte.TaskEnvironment(name="spark_env")
@dataclass
class AnalysisResult:
mean_value: float
std_dev: float
@env.task
async def analyze_data(data_path: str) -> AnalysisResult:
# Spark code here (not shown)
return AnalysisResult(mean_value=42.5, std_dev=3.2)
@env.task
async def compute_score(result: AnalysisResult) -> float:
# More Spark processing
    return result.mean_value / result.std_dev
Deploy the Spark environment:
flyte deploy spark_env/
Team B: ML environment
Team B maintains PyTorch-based ML tasks:
# ml_env.py
from pydantic import BaseModel
import flyte
env = flyte.TaskEnvironment(name="ml_env")
class PredictionRequest(BaseModel):
feature_x: float
feature_y: float
class Prediction(BaseModel):
score: float
confidence: float
model_version: str
@env.task
async def run_inference(request: PredictionRequest) -> Prediction:
# PyTorch model inference (not shown)
return Prediction(
score=request.feature_x * 2.5,
confidence=0.95,
model_version="v2.1"
    )
Deploy the ML environment:
flyte deploy ml_env/
Team C: Orchestration
Team C builds a workflow using remote tasks from both teams without needing Spark or PyTorch dependencies:
# orchestration_env.py
import flyte
import flyte.remote
env = flyte.TaskEnvironment(name="orchestration")
# Reference tasks from other teams
analyze_data = flyte.remote.Task.get(
"spark_env.analyze_data",
auto_version="latest"
)
compute_score = flyte.remote.Task.get(
"spark_env.compute_score",
auto_version="latest"
)
run_inference = flyte.remote.Task.get(
"ml_env.run_inference",
auto_version="latest"
)
@env.task
async def orchestrate_pipeline(data_path: str) -> float:
# Use Spark tasks without Spark dependencies
analysis = await analyze_data(data_path=data_path)
# Access attributes from the result
# (Flyte creates a fake type that allows attribute access)
print(f"Analysis: mean={analysis.mean_value}, std={analysis.std_dev}")
data_score = await compute_score(result=analysis)
# Use ML task without PyTorch dependencies
# Pass Pydantic models as dictionaries
prediction = await run_inference(
request={
"feature_x": analysis.mean_value,
"feature_y": data_score
}
)
# Access Pydantic model attributes
print(f"Prediction: {prediction.score} (confidence: {prediction.confidence})")
    return prediction.score
Run the orchestration task directly (no deployment needed):
Using Python API:
if __name__ == "__main__":
flyte.init_from_config()
run = flyte.run(
orchestrate_pipeline,
data_path="s3://my-bucket/data.parquet"
)
print(f"Execution URL: {run.url}")
# You can wait for the execution
run.wait()
# You can then retrieve the outputs
print(f"Pipeline result: {run.outputs()}")Using CLI:
flyte run orchestration_env.py orchestrate_pipeline --data_path s3://my-bucket/data.parquet
Invoke remote tasks in a script
You can also run any remote task directly from a script in a similar way:
import flyte
import flyte.models
import flyte.remote
flyte.init_from_config()
# Fetch the task
remote_task = flyte.remote.Task.get("package-example.calculate_average", auto_version="latest")
# Create a run. Note that keyword arguments are currently required. In the future,
# positional arguments may be accepted based on declaration order, but keyword
# arguments are still recommended.
run = flyte.run(remote_task, numbers=[1.0, 2.0, 3.0])
print(f"Execution URL: {run.url}")
# You can view the current phase
print(f"Current Phase: {run.phase}")
# You can wait for the execution
run.wait()
# Only available after flyte >= 2.0.0b39
print(f"Current phase: {run.phase}")
# Phases can be compared against flyte.models.ActionPhase values
if run.phase == flyte.models.ActionPhase.SUCCEEDED:
    print("Run completed!")
# You can then retrieve the outputs
print(f"Pipeline result: {run.outputs()}")Why use remote tasks?
Remote tasks solve common collaboration and dependency management challenges:
Cross-team collaboration: Team A has deployed a Spark task that analyzes large datasets. Team B needs this analysis for their ML pipeline but doesn’t want to learn Spark internals, install Spark dependencies, or build Spark-enabled container images. With remote tasks, Team B simply references Team A’s deployed task.
Platform reusability: Platform teams can create common, reusable tasks (data validation, feature engineering, model serving) that other teams can use without duplicating code or managing complex dependencies.
Microservices for data workflows: Remote tasks work like microservices for long-running tasks or agents, enabling secure sharing while maintaining isolation.
When to use remote tasks
Use remote tasks when you need to:
- Use functionality from another team without their dependencies
- Share common tasks across your organization
- Build reusable platform components
- Avoid dependency conflicts between different parts of your workflow
- Create modular, maintainable data pipelines
How remote tasks work
Security model
Remote tasks run in the caller’s project and domain using the caller’s compute resources, but execute with the callee’s service accounts, IAM roles, and secrets. This ensures:
- Tasks are secure from misuse
- Resource usage is properly attributed
- Authentication and authorization are maintained
- Collaboration remains safe and controlled
Type system
Remote tasks use Flyte’s default types as inputs and outputs. Flyte’s type system seamlessly translates data between tasks without requiring the original dependencies:
| Remote Task Type | Flyte Type |
|---|---|
| DataFrames (pandas, polars, spark, etc.) | flyte.io.DataFrame |
| Object store files | flyte.io.File |
| Object store directories | flyte.io.Dir |
| Pydantic models | Dictionary (Flyte creates a representation) |
Any DataFrame type (pandas, polars, spark) automatically becomes flyte.io.DataFrame, allowing seamless data exchange between tasks using different DataFrame libraries. You can also write custom integrations or explore Flyte’s plugin system for additional types.
For Pydantic models specifically, you don’t need the exact model locally. Pass a dictionary as input, and Flyte will handle the translation.
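As a minimal sketch of these rules (the task names and the request parameters here are hypothetical), a remote task’s DataFrame output arrives as flyte.io.DataFrame, and a Pydantic-typed input is supplied as a plain dictionary:
```python
import flyte
import flyte.remote

env = flyte.TaskEnvironment(name="type_demo")

# Hypothetical remote tasks, used only to illustrate type translation.
feature_builder = flyte.remote.Task.get("data_team.build_features", auto_version="latest")
scorer = flyte.remote.Task.get("ml_team.score_features", auto_version="latest")

@env.task
async def type_translation_demo(data_path: str) -> float:
    # The callee may build a pandas, polars, or Spark DataFrame internally;
    # on this side it arrives as a flyte.io.DataFrame.
    features: flyte.io.DataFrame = await feature_builder(input_path=data_path)

    # The callee declares a Pydantic model input; pass a dictionary and let
    # Flyte translate it (no need to import the original model class).
    return await scorer(features=features, request={"threshold": 0.8})
```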
Versioning options
Reference tasks support flexible versioning:
Specific version:
task = flyte.remote.Task.get(
"team_a.process_data",
version="v1.2.3"
)
Latest version (auto_version="latest"):
# Always use the most recently deployed version
task = flyte.remote.Task.get(
"team_a.process_data",
auto_version="latest"
)
Current version (auto_version="current"):
# Use the same version as the calling task's deployment
# Useful when all environments deploy with the same version
# Can only be used from within a task context
task = flyte.remote.Task.get(
"team_a.process_data",
auto_version="current"
)
Customizing remote tasks
Remote tasks can be customized by overriding various properties without modifying the original deployed task. This allows you to adjust resource requirements, retry strategies, caching behavior, and more based on your specific use case.
Available overrides
The override() method on remote tasks accepts the following parameters:
- short_name (str): A short name for the task instance
- resources (flyte.Resources): CPU, memory, GPU, and storage limits
- retries (int | flyte.RetryStrategy): Number of retries or retry strategy
- timeout (flyte.TimeoutType): Task execution timeout
- env_vars (Dict[str, str]): Environment variables to set
- secrets (flyte.SecretRequest): Secrets to inject
- max_inline_io_bytes (int): Maximum size for inline IO in bytes
- cache (flyte.Cache): Cache behavior and settings
- queue (str): Execution queue to use
Override examples
Increase resources for a specific use case:
import flyte
import flyte.remote
# Get the base task
data_processor = flyte.remote.Task.get(
"data_team.spark_analyzer",
auto_version="latest"
)
# Override with more resources for large dataset processing
large_data_processor = data_processor.override(
resources=flyte.Resources(
cpu="16",
memory="64Gi",
storage="200Gi"
)
)
@env.task
async def process_large_dataset(data_path: str):
# Use the customized version
    return await large_data_processor(input_path=data_path)
Add retries and timeout:
# Override with retries and timeout for unreliable operations
reliable_processor = data_processor.override(
retries=3,
timeout="2h"
)
@env.task
async def robust_pipeline(data_path: str):
    return await reliable_processor(input_path=data_path)
Configure caching:
# Override cache settings
cached_processor = data_processor.override(
cache=flyte.Cache(
behavior="override",
version_override="v2",
serialize=True
)
)
Set environment variables and secrets:
# Override with custom environment and secrets
custom_processor = data_processor.override(
env_vars={
"LOG_LEVEL": "DEBUG",
"REGION": "us-west-2"
},
secrets=flyte.SecretRequest(
secrets={"api_key": "my-secret-key"}
)
)
Multiple overrides:
# Combine multiple overrides
production_processor = data_processor.override(
short_name="prod_spark_analyzer",
resources=flyte.Resources(cpu="8", memory="32Gi"),
retries=5,
timeout="4h",
env_vars={"ENV": "production"},
queue="high-priority"
)
@env.task
async def production_pipeline(data_path: str):
    return await production_processor(input_path=data_path)
Chain overrides
You can chain multiple override() calls to incrementally adjust settings:
# Start with base task
processor = flyte.remote.Task.get("data_team.analyzer", auto_version="latest")
# Add resources
processor = processor.override(resources=flyte.Resources(cpu="4", memory="16Gi"))
# Add retries for production
if is_production:
processor = processor.override(retries=5, timeout="2h")
# Use the customized task
result = await processor(input_path="s3://data.parquet")
Best practices
1. Use meaningful task names
Remote tasks are accessed by name, so use clear, descriptive naming:
# Good
customer_segmentation = flyte.remote.Task.get("ml_platform.customer_segmentation")
# Avoid
task1 = flyte.remote.Task.get("team_a.task1")
2. Document task interfaces
Since remote tasks abstract away implementation details, clear documentation of inputs, outputs, and behavior is essential:
@env.task
async def process_customer_data(
customer_ids: list[str],
date_range: tuple[str, str]
) -> flyte.io.DataFrame:
"""
Process customer data for the specified date range.
Args:
customer_ids: List of customer IDs to process
date_range: Tuple of (start_date, end_date) in YYYY-MM-DD format
Returns:
DataFrame with processed customer features
"""
    ...
3. Prefer module-level loading
Load remote tasks at the module level rather than inside functions for cleaner code:
import flyte.remote
# Good - module level
data_processor = flyte.remote.Task.get("team.processor", auto_version="latest")
@env.task
async def my_task(data: str):
    return await data_processor(input=data)
This approach:
- Makes dependencies clear and discoverable
- Reduces code duplication
- Works well with lazy loading (no performance penalty)
Dynamic loading within tasks is also supported when you need runtime flexibility.
4. Handle versioning thoughtfully
- Use auto_version="latest" during development for rapid iteration
- Use specific versions in production for stability and reproducibility (see the sketch below)
- Use auto_version="current" when coordinating multi-environment deployments
5. Deploy remote tasks first
Always deploy the remote tasks before using them. Tasks that reference them can be run directly without deployment:
# Deploy the Spark environment first
flyte deploy spark_env/
# Deploy the ML environment
flyte deploy ml_env/
# Now you can run the orchestration task directly (no deployment needed)
flyte run orchestration_env.py orchestrate_pipeline
If you want to deploy the orchestration task as well (for scheduled runs or to be referenced by other tasks), deploy it after its dependencies:
flyte deploy orchestration_env/
Limitations
- Lazy error detection: Because of lazy loading, errors about missing or invalid tasks only occur during invocation, not when calling get(). You’ll receive a flyte.errors.ReferenceTaskError if the task doesn’t exist or can’t be invoked.
- Type fidelity: While Flyte translates types seamlessly, you work with Flyte’s representation of Pydantic models, not the exact original types.
- Deployment order: Referenced tasks must be deployed before tasks that reference them can be invoked.
- Context requirement: Using auto_version="current" requires running within a task context.
- Dictionary inputs: Pydantic models must be passed as dictionaries, which loses compile-time type checking.
- No positional arguments: Remote tasks currently support only keyword arguments (this may change in future versions).
Next steps
- Learn about task deployment
- Explore task environments and configuration