pandas user-defined functions
A pandas user-defined function (UDF), also known as a vectorized UDF, is a user-defined function that uses Apache Arrow to transfer data and pandas to work with the data. pandas UDFs allow vectorized operations that can increase performance up to 100x compared to row-at-a-time Python UDFs.
For background information, see the blog post New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0.
You define a pandas UDF using the keyword pandas_udf as a decorator and wrap the function with a Python type hint. This article describes the different types of pandas UDFs and shows how to use pandas UDFs with type hints.
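For example, a minimal sketch of the decorator form (the to_upper name and the string return type are illustrative, not part of any API):

import pandas as pd
from pyspark.sql.functions import pandas_udf

# Minimal sketch: the Python type hints (pd.Series -> pd.Series) mark this
# as a Series to Series pandas UDF, and "string" is the Spark return type.
@pandas_udf("string")
def to_upper(s: pd.Series) -> pd.Series:
    return s.str.upper()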
Series to Series UDF
You use a Series to Series pandas UDF to vectorize scalar operations. You can use them with APIs such as select and withColumn.
The Python function should take a pandas Series as an input and return a pandas Series of the same length, and you should specify these in the Python type hints. Spark runs a pandas UDF by splitting columns into batches, calling the function for each batch as a subset of the data, then concatenating the results.
The following example shows how to create a pandas UDF that computes the product of 2 columns.
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+
Iterator of Series to Iterator of Series UDF
An iterator UDF is the same as a scalar pandas UDF except:
- The Python function:
  - Takes an iterator of batches instead of a single input batch as input.
  - Returns an iterator of output batches instead of a single output batch.
- The length of the entire output in the iterator should be the same as the length of the entire input.
- The wrapped pandas UDF takes a single Spark column as an input.

You should specify the Python type hint as Iterator[pandas.Series] -> Iterator[pandas.Series].
This pandas UDF is useful when the UDF execution requires initializing some state, for example, loading a machine learning model file to apply inference to every input batch.
The following example shows how to create a pandas UDF with iterator support.
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+
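As a sketch of the model-inference pattern mentioned above, the following assumes a hypothetical load_model helper, an illustrative model path, and a scikit-learn-style predict method; the model is loaded once before iterating over the batches, then reused for every batch:

from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def predict_udf(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Hypothetical helper and path: load the model once, not once per batch.
    model = load_model("/dbfs/models/my_model.pkl")
    for features in batch_iter:
        # Apply inference to each input batch and yield a pandas Series
        # of the same length as the batch.
        yield pd.Series(model.predict(features.to_numpy().reshape(-1, 1)))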
Iterator of multiple Series to Iterator of Series UDF
An Iterator of multiple Series to Iterator of Series UDF has characteristics and restrictions similar to the Iterator of Series to Iterator of Series UDF. The specified function takes an iterator of batches and outputs an iterator of batches. It is also useful when the UDF execution requires initializing some state.
The differences are:
- The underlying Python function takes an iterator of a tuple of pandas Series.
- The wrapped pandas UDF takes multiple Spark columns as an input.
- You specify the type hints as Iterator[Tuple[pandas.Series, ...]] -> Iterator[pandas.Series].
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+
Series to scalar UDF
Series to scalar pandas UDFs are similar to Spark aggregate functions. A Series to scalar pandas UDF defines an aggregation from one or more pandas Series to a scalar value, where each pandas Series represents a Spark column. You use a Series to scalar pandas UDF with APIs such as select, withColumn, groupBy.agg, and pyspark.sql.Window.
You express the type hint as pandas.Series, ... -> Any. The return type should be a primitive data type, and the returned scalar can be either a Python primitive type, for example, int or float, or a NumPy data type such as numpy.int64 or numpy.float64. Any should ideally be a specific scalar type.
This type of UDF does not support partial aggregation, and all data for each group is loaded into memory.
The following example shows how to use this type of UDF to compute mean with select, groupBy, and window operations:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+
For detailed usage, see pyspark.sql.functions.pandas_udf.
Usage
Setting Arrow batch size
Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to high memory usage in the JVM. To avoid possible out of memory exceptions, you can adjust the size of the Arrow record batches by setting the spark.sql.execution.arrow.maxRecordsPerBatch configuration to an integer that determines the maximum number of rows for each batch. The default value is 10,000 records per batch. If the number of columns is large, the value should be adjusted accordingly. Using this limit, each data partition is divided into one or more record batches for processing.
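For example, to lower the batch size for a wide DataFrame, you can set the configuration on the active session; a minimal sketch, with an illustrative value:

# Reduce the maximum number of rows per Arrow record batch.
# The value 5000 is illustrative, not a recommendation.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")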
Timestamp with time zone semantics
Spark internally stores timestamps as UTC values, and timestamp data brought in without a specified time zone is converted as local time to UTC with microsecond resolution.
When timestamp data is exported or displayed in Spark, the session time zone is used to localize the timestamp values. The session time zone is set with the spark.sql.session.timeZone configuration and defaults to the JVM system local time zone. pandas uses a datetime64 type with nanosecond resolution, datetime64[ns], with optional time zone on a per-column basis.
When timestamp data is transferred from Spark to pandas, it is converted to nanoseconds and each column is converted to the Spark session time zone then localized to that time zone, which removes the time zone and displays values as local time. This occurs when calling toPandas() or pandas_udf with timestamp columns.
When timestamp data is transferred from pandas to Spark, it is converted to UTC microseconds. This occurs when calling createDataFrame with a pandas DataFrame or when returning a timestamp from a pandas UDF. These conversions are done automatically to ensure Spark has data in the expected format, so it is not necessary to do any of these conversions yourself. Any nanosecond values are truncated.
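The following sketch illustrates these semantics; it assumes an existing SparkSession named spark, and the time zone and timestamp values are illustrative:

import pandas as pd

# The session time zone controls how timestamps are localized when they
# move between Spark and pandas (illustrative value).
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

# Spark -> pandas: values arrive as timezone-naive datetime64[ns],
# expressed in the session time zone.
df = spark.sql("SELECT timestamp '2020-01-01 00:00:00' AS ts")
print(df.toPandas()["ts"].dtype)  # datetime64[ns]

# pandas -> Spark: nanosecond timestamps are converted to UTC microseconds,
# so sub-microsecond precision is truncated.
pdf = pd.DataFrame({"ts": [pd.Timestamp("2020-01-01 00:00:00.123456789")]})
spark.createDataFrame(pdf).show(truncate=False)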
A standard UDF loads timestamp data as Python datetime objects, which is different from a pandas timestamp. To get the best performance, we recommend that you use pandas time series functionality when working with timestamps in a pandas UDF. For details, see Time Series / Date functionality.
Example notebook
The following notebook illustrates the performance improvements you can achieve with pandas UDFs:
pandas UDFs benchmark notebook