DataFrames
By default, return values in Python are materialized, meaning the actual data is downloaded and loaded into memory. This applies to simple types like integers, as well as more complex types like DataFrames.
To avoid downloading large datasets into memory, Flyte V2 exposes flyte.io.DataFrame: a thin, uniform wrapper type for DataFrame-style objects that allows you to pass a reference to the data, rather than the fully materialized contents.
The flyte.io.DataFrame type provides serialization support for common engines such as pandas, polars, pyarrow, and dask, enabling you to move data between different DataFrame backends.
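
To make the difference between a materialized value and a reference concrete, here is a minimal standard-library sketch. TableRef, produce, pass_through, and demo are hypothetical names invented for illustration; this is not how flyte.io.DataFrame is implemented, only the general idea of handing around a location and loading the data on request.

```python
import csv
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class TableRef:
    """Hypothetical stand-in for a DataFrame reference: a location plus a format."""

    uri: str
    fmt: str = "csv"

    def load(self) -> list[dict[str, str]]:
        # Data is read only here, when a consumer explicitly asks for it.
        with open(self.uri, newline="") as fh:
            return list(csv.DictReader(fh))


def produce(directory: Path) -> TableRef:
    # The producer writes rows to storage and hands back only the reference.
    path = directory / "employees.csv"
    with open(path, "w", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=["employee_id", "name"])
        writer.writeheader()
        writer.writerows(
            [
                {"employee_id": 1001, "name": "Alice"},
                {"employee_id": 1002, "name": "Bob"},
            ]
        )
    return TableRef(uri=str(path))


def pass_through(ref: TableRef) -> TableRef:
    # An intermediate step forwards the reference without touching the data.
    return ref


def demo() -> list[dict[str, str]]:
    with tempfile.TemporaryDirectory() as tmp:
        ref = pass_through(produce(Path(tmp)))
        return ref.load()  # materialization happens only at this point
```

The intermediate step never opens the file, which is the property that makes passing a reference cheap for large datasets.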
Setting up the environment and sample data
For our example, we start by setting up the task environment with the required dependencies and creating some sample data.

from typing import Annotated

import numpy as np
import pandas as pd

import flyte
import flyte.io

# Task environment: container image with the DataFrame dependencies, plus resources.
env = flyte.TaskEnvironment(
    "dataframe_usage",
    image=flyte.Image.from_debian_base().with_pip_packages("pandas", "pyarrow", "numpy"),
    resources=flyte.Resources(cpu="1", memory="2Gi"),
)

# Core employee records, keyed by employee_id.
BASIC_EMPLOYEE_DATA = {
    "employee_id": range(1001, 1009),
    "name": ["Alice", "Bob", "Charlie", "Diana", "Ethan", "Fiona", "George", "Hannah"],
    "department": ["HR", "Engineering", "Engineering", "Marketing", "Finance", "Finance", "HR", "Engineering"],
    "hire_date": pd.to_datetime(
        ["2018-01-15", "2019-03-22", "2020-07-10", "2017-11-01", "2021-06-05", "2018-09-13", "2022-01-07", "2020-12-30"]
    ),
}

# Additional attributes for the same employees, including missing values and list columns.
ADDL_EMPLOYEE_DATA = {
    "employee_id": range(1001, 1009),
    "salary": [55000, 75000, 72000, 50000, 68000, 70000, np.nan, 80000],
    "bonus_pct": [0.05, 0.10, 0.07, 0.04, np.nan, 0.08, 0.03, 0.09],
    "full_time": [True, True, True, False, True, True, False, True],
    "projects": [
        ["Recruiting", "Onboarding"],
        ["Platform", "API"],
        ["API", "Data Pipeline"],
        ["SEO", "Ads"],
        ["Budget", "Forecasting"],
        ["Auditing"],
        [],
        ["Platform", "Security", "Data Pipeline"],
    ],
}

Create a raw dataframe
Now, let’s create a task that returns a native Pandas DataFrame:

@env.task
async def create_raw_dataframe() -> pd.DataFrame:
    return pd.DataFrame(BASIC_EMPLOYEE_DATA)

This is the most basic use case for passing DataFrames (of all kinds, not just Pandas). We simply create the DataFrame as normal and return it.
Because the task has been declared to return a supported native DataFrame type (in this case pandas.DataFrame), Flyte will automatically detect it, serialize it correctly, and upload it at task completion, enabling it to be passed transparently to the next task.
Flyte supports auto-serialization for the following DataFrame types:
pandas.DataFrame
pyarrow.Table
dask.dataframe.DataFrame
polars.DataFrame
flyte.io.DataFrame (see below)
Create a flyte.io.DataFrame
Alternatively, you can create a flyte.io.DataFrame object directly from a native object with the from_df method:

@env.task
async def create_flyte_dataframe() -> Annotated[flyte.io.DataFrame, "parquet"]:
    pd_df = pd.DataFrame(ADDL_EMPLOYEE_DATA)
    # Wrap the native pandas object in Flyte's generic DataFrame type.
    fdf = flyte.io.DataFrame.from_df(pd_df)
    return fdf

The flyte.io.DataFrame class creates a thin wrapper around objects of any standard DataFrame type. It serves as a generic “any dataframe type” (a concept that Python itself does not currently offer).
As with native DataFrame types, Flyte will automatically serialize and upload the data at task completion.
The advantage of the unified flyte.io.DataFrame wrapper is that you can be explicit about the storage format that makes sense for your use case by using an Annotated type whose second argument encodes the format or other lightweight hints. For example, the return type Annotated[flyte.io.DataFrame, "parquet"] in the task above specifies that the DataFrame should be stored as Parquet.
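
For readers unfamiliar with Annotated, the following standard-library sketch shows how such a hint can be recovered from a function signature at runtime. FrameWrapper is a hypothetical placeholder standing in for flyte.io.DataFrame so the snippet runs without Flyte installed, and return_hint is an illustrative helper; how Flyte itself reads the annotation is not described on this page.

```python
from typing import Annotated, get_args, get_origin, get_type_hints


class FrameWrapper:
    """Hypothetical placeholder standing in for flyte.io.DataFrame."""


def make_frame() -> Annotated[FrameWrapper, "parquet"]:
    return FrameWrapper()


def plain() -> int:
    return 0


def return_hint(fn) -> tuple[type, tuple]:
    """Return (base type, metadata) for the function's return annotation."""
    # include_extras=True keeps the Annotated metadata instead of stripping it.
    hint = get_type_hints(fn, include_extras=True)["return"]
    if get_origin(hint) is Annotated:
        base, *extras = get_args(hint)
        return base, tuple(extras)
    # No Annotated wrapper: there is no metadata to report.
    return hint, ()
```

Because the metadata is just an extra argument on the type, the function body stays unchanged when the format hint changes.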
Automatically convert between types
You can leverage Flyte to automatically download and convert the dataframe between types when needed:

@env.task
async def join_data(raw_dataframe: pd.DataFrame, flyte_dataframe: pd.DataFrame) -> flyte.io.DataFrame:
    joined_df = raw_dataframe.merge(flyte_dataframe, on="employee_id", how="inner")
    return flyte.io.DataFrame.from_df(joined_df)

This task takes two dataframes as input. We’ll pass one raw Pandas dataframe, and one flyte.io.DataFrame.
Flyte automatically converts the flyte.io.DataFrame to a Pandas DataFrame (since we declared that as the input type) before passing it to the task.
The actual download and conversion happen only when we access the data, in this case, when we do the merge.
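
Once both inputs are ordinary Pandas objects, the join itself is plain pandas. The sketch below reproduces the same merge call outside of Flyte on a three-row subset of the sample data (the subset and the join_frames helper are chosen here for illustration only); it assumes pandas and numpy are installed.

```python
import numpy as np
import pandas as pd

# Three rows taken from the sample data above; George has a missing salary.
basic = pd.DataFrame(
    {
        "employee_id": [1001, 1002, 1007],
        "name": ["Alice", "Bob", "George"],
    }
)
extra = pd.DataFrame(
    {
        "employee_id": [1001, 1002, 1007],
        "salary": [55000, 75000, np.nan],
    }
)


def join_frames(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    # Inner join: keep only employee_id values present in both frames.
    return left.merge(right, on="employee_id", how="inner")


joined = join_frames(basic, extra)
```

Missing values survive the join unchanged, so any cleaning has to happen explicitly before or after the merge.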
Downloading DataFrames
When a task receives a flyte.io.DataFrame, you can request a concrete backend representation. For example, to download as a pandas DataFrame:

@env.task
async def download_data(joined_df: flyte.io.DataFrame):
    # Request the data as a pandas DataFrame and load all of it into memory.
    downloaded = await joined_df.open(pd.DataFrame).all()
    print("Downloaded Data:\n", downloaded)

The open() call delegates to the DataFrame handler for the stored format and converts to the requested in-memory type.
Run the example
Finally, we can define a main function to run the tasks defined above and a __main__ block to execute the workflow:

@env.task
async def main():
    raw_df = await create_raw_dataframe()
    flyte_df = await create_flyte_dataframe()
    joined_df = await join_data(raw_df, flyte_df)
    await download_data(joined_df)


if __name__ == "__main__":
    flyte.init_from_config()
    r = flyte.run(main)
    print(r.name)
    print(r.url)
    r.wait()
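
The main task above awaits each step in turn, although the two creation steps do not depend on each other. As a sketch of an alternative, the snippet below uses asyncio.gather to start two independent coroutines together. The functions here are plain asyncio stand-ins with hypothetical names, not Flyte tasks; it assumes that tasks awaited from a parent task behave like ordinary awaitables, and whether Flyte then schedules them in parallel is not established by this page.

```python
import asyncio


async def create_raw() -> str:
    # Stand-in for create_raw_dataframe(); the sleep simulates work.
    await asyncio.sleep(0.01)
    return "raw"


async def create_wrapped() -> str:
    # Stand-in for create_flyte_dataframe().
    await asyncio.sleep(0.01)
    return "wrapped"


async def join(a: str, b: str) -> str:
    # Stand-in for join_data(); needs both upstream results.
    return f"{a}+{b}"


async def main_concurrent() -> str:
    # gather() runs both awaitables concurrently and returns results
    # in the order the awaitables were passed in.
    raw, wrapped = await asyncio.gather(create_raw(), create_wrapped())
    return await join(raw, wrapped)
```

The join step still waits for both inputs, so the data dependencies of the original main are preserved.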