PySpark UDF

Spark allows users to define their own functions, called UDFs (user-defined functions), for requirements that the built-in functions do not cover. This chapter demonstrates how to define and use a UDF in PySpark, with several examples.

Example – 1:

Let’s use the below sample data to understand UDF in PySpark.

id,name,birthyear
100,Rick,2000
101,Jason,1998
102,Maggie,1999
104,Eugine,2001
105,Jacob,1985
112,Negan,2001

Let’s create a UDF in Spark to calculate the age of each person.

The calculate_age function below is the UDF that computes the age from the birth year.

Step-1: Define a UDF function.

 
import datetime

def calculate_age(birthyear):
    now = datetime.datetime.now()
    return now.year - birthyear

Step-2: Register the UDF.

After defining the function, the next step is to register it with the SparkSession so that it can be called from Spark SQL.

 
spark.udf.register("calculateage", calculate_age)

Step-3: Use the UDF (Approach-1).

Registration is only needed for Spark SQL. To call the function on DataFrame columns, wrap it with F.udf:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.appName('spark-sql').master("local").getOrCreate()
# Wrap the plain Python function so it can be applied to columns
calculate_age_udf = F.udf(calculate_age, IntegerType())
df = spark.read.csv('udf.csv', header='true', inferSchema='true')
udf_df = df.withColumn('age', calculate_age_udf(df['birthyear']))
udf_df.show()

Output:

+---+------+---------+---+
| id|  name|birthyear|age|
+---+------+---------+---+
|100|  Rick|     2000| 19|
|101| Jason|     1998| 21|
|102|Maggie|     1999| 20|
|104|Eugine|     2001| 18|
|105| Jacob|     1985| 34|
|112| Negan|     2001| 18|
+---+------+---------+---+

Another way of defining a UDF (Approach-2).

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.appName('spark-sql').master("local").getOrCreate()
df = spark.read.csv('udf.csv', header='true', inferSchema='true')
# F.udf wraps the plain Python function directly; no lambda needed
calculateage = F.udf(calculate_age, IntegerType())
udf_df = df.select('name', 'birthyear', calculateage('birthyear').alias('age'))
udf_df.show()

Output:

+------+---------+---+
|  name|birthyear|age|
+------+---------+---+
|  Rick|     2000| 19|
| Jason|     1998| 21|
|Maggie|     1999| 20|
|Eugine|     2001| 18|
| Jacob|     1985| 34|
| Negan|     2001| 18|
+------+---------+---+

Use of UDF in SQL:

 
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.appName('spark-sql').master("local").getOrCreate()
spark.udf.register("calculateage", calculate_age, IntegerType())
df = spark.read.csv('udf.csv', header='true', inferSchema='true')
# createOrReplaceTempView replaces the deprecated registerTempTable
df.createOrReplaceTempView("emp")
spark.sql("select name, calculateage(birthyear) as age from emp").show()

Output:

+------+---+
|  name|age|
+------+---+
|  Rick| 19|
| Jason| 21|
|Maggie| 20|
|Eugine| 18|
| Jacob| 34|
| Negan| 18|
+------+---+

Example – 2:

Let’s take one more example, using the dataset below.

+---------+
|      arr|
+---------+
|[2, 3, 4]|
|   [5, 6]|
+---------+
Create a Spark DataFrame from a pandas DataFrame.

import pandas as pd
from pyspark.sql import SparkSession

df_pandas = pd.DataFrame({'arr': [[2, 3, 4], [5, 6]]})
spark = SparkSession.builder.appName('spark-sql').master("local").getOrCreate()
df = spark.createDataFrame(df_pandas)

Step-1: Define a UDF function to calculate the square of the above data.

import numpy as np

def square(x):
    # Square each element and convert the NumPy array back to a list
    return np.square(x).tolist()
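Since square is plain Python, it can be sanity-checked without Spark; np.square squares each element and tolist() converts the result back to a Python list:

```python
import numpy as np

def square(x):
    # Element-wise square, then back to plain Python ints
    return np.square(x).tolist()

print(square([2, 3, 4]))  # [4, 9, 16]
print(square([5, 6]))     # [25, 36]
```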

Step-2: Use the UDF as a function.

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, IntegerType

sq = F.udf(square, ArrayType(IntegerType()))
df.select('arr', sq('arr').alias('arr_sq')).show()

Output:

+---------+----------+
|      arr|    arr_sq|
+---------+----------+
|[2, 3, 4]|[4, 9, 16]|
|   [5, 6]|  [25, 36]|
+---------+----------+

However, Spark UDFs are not efficient: Spark treats a UDF as a black box and cannot optimize the code inside it, and every row must be serialized between the JVM and the Python worker. Prefer Spark's built-in functions whenever they can express the same logic. In this module, you learned how to define, register, and use a PySpark UDF through several examples.
