PySpark UDF
Spark allows users to define their own functions (UDFs) for cases the built-in functions do not cover. This chapter demonstrates how to define and use a UDF in PySpark and walks through several examples.
Example – 1:
Let’s use the sample data below to understand UDFs in PySpark.
id,name,birthyear
100,Rick,2000
101,Jason,1998
102,Maggie,1999
104,Eugine,2001
105,Jacob,1985
112,Negan,2001
Let’s create a UDF in Spark to calculate the age of each person.
The 'calculate_age' function is the UDF that computes a person's age from the birth year.
Step-1: Define a UDF function.
import datetime

def calculate_age(birthyear):
    now = datetime.datetime.now()
    return now.year - birthyear
Step-2: Register the UDF.
After defining the function, register it with Spark so it can be referenced by name.
spark.udf.register("calculateage", calculate_age)
Step-3: Use the UDF (Approach-1).
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.appName('spark-sql').master("local").getOrCreate()
# spark.udf.register returns a callable UDF that can be applied to DataFrame columns
calculateage = spark.udf.register("calculateage", calculate_age, IntegerType())
df = spark.read.csv('udf.csv', header='true', inferSchema='true')
udf_df = df.withColumn('age', calculateage(df['birthyear']))
udf_df.show()
Output:
+---+------+---------+---+
| id|  name|birthyear|age|
+---+------+---------+---+
|100|  Rick|     2000| 19|
|101| Jason|     1998| 21|
|102|Maggie|     1999| 20|
|104|Eugine|     2001| 18|
|105| Jacob|     1985| 34|
|112| Negan|     2001| 18|
+---+------+---------+---+
Another way of defining a UDF (Approach-2) is to wrap the function with pyspark.sql.functions.udf.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.appName('spark-sql').master("local").getOrCreate()
df = spark.read.csv('udf.csv', header='true', inferSchema='true')
calculateage = F.udf(calculate_age, IntegerType())
udf_df = df.select('name', 'birthyear', calculateage('birthyear').alias('age'))
udf_df.show()
Output:
+------+---------+---+
| name|birthyear|age|
+------+---------+---+
| Rick| 2000| 19|
| Jason| 1998| 21|
|Maggie| 1999| 20|
|Eugine| 2001| 18|
| Jacob| 1985| 34|
| Negan| 2001| 18|
+------+---------+---+
Use of UDF in SQL:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('spark-sql').master("local").getOrCreate()
spark.udf.register("calculateage", calculate_age)
df = spark.read.csv('udf.csv', header='true', inferSchema='true')
# createOrReplaceTempView replaces the deprecated registerTempTable
df.createOrReplaceTempView("emp")
spark.sql("select name, calculateage(birthyear) as age from emp").show()
Output:
+------+---+
| name|age|
+------+---+
| Rick| 19|
| Jason| 21|
|Maggie| 20|
|Eugine| 18|
| Jacob| 34|
| Negan| 18|
+------+---+
Example – 2:
Let’s look at one more example of a UDF, using the dataset below.
+---------+
| arr|
+---------+
|[2, 3, 4]|
| [5, 6]|
+---------+
Create a Spark DataFrame from a pandas DataFrame.
import pandas as pd
from pyspark.sql import SparkSession

df_pandas = pd.DataFrame({'arr': [[2, 3, 4], [5, 6]]})
spark = SparkSession.builder.appName('spark-sql').master("local").getOrCreate()
df = spark.createDataFrame(df_pandas)
Step-1: Define a UDF function to calculate the square of each element in the array column.
import numpy as np

def square(x):
    # np.square is element-wise; convert back to a plain Python list for Spark
    return np.square(x).tolist()
Step-2: Use the UDF as a function.
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType

sq = F.udf(square, ArrayType(IntegerType()))
df.select('arr', sq('arr').alias('arr_sq')).show()
Output:
+---------+----------+
| arr| arr_sq|
+---------+----------+
|[2, 3, 4]|[4, 9, 16]|
| [5, 6]| [25, 36]|
+---------+----------+
However, Spark UDFs are not efficient: Spark treats a UDF as a black box, so the Catalyst optimizer cannot optimize the expression, and every row has to be serialized to a Python worker and back. Prefer built-in functions where one exists. In this module, you learned how to create a PySpark UDF and saw several PySpark UDF examples.