PySpark SQL Quick Start
In this PySpark tutorial blog, we will discuss PySpark, SQLContext, and HiveContext.
What is Spark?
Spark is an open-source distributed computing platform developed to work with huge volumes of data and real-time data processing.
Spark is fast because of its ability to compute in memory, whereas a popular framework like Hadoop relies on disk-based computing. Hadoop processes data by reading input from disk, while Spark processes data in memory. Spark can be up to 100 times faster for in-memory computation and about 10 times faster for disk-based computation.
Spark is suitable for both real-time and batch processing, whereas Hadoop is primarily used for batch processing. Because Spark can process real-time data, it is a popular choice for data analytics in the big data field. Spark also provides multiple built-in interfaces for streaming, machine learning, SQL, and graph processing, whereas Hadoop requires external frameworks such as Sqoop, Pig, and Hive.
Spark SQL is Spark's module for working with structured data. Because it knows the structure of the data and the operations to be performed on it, Spark SQL can handle the computation efficiently.
PySpark is the Python interface to Spark; it allows users to interact with data using the Python programming language.
Spark-SQL provides several ways to interact with data.
- SQL
- DataFrame/ Dataset
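To illustrate the two interfaces, here is a minimal sketch (assuming a local SparkSession and a small in-memory dataset created only for this example) that runs the same filter first as a SQL query and then through the DataFrame API.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('interfaces-demo').master('local').getOrCreate()

# Small in-memory dataset used only for illustration
df = spark.createDataFrame([(100, 'Rick'), (101, 'Jason')], ['id', 'name'])

# SQL interface: register a temporary view and query it with SQL
df.createOrReplaceTempView('people')
spark.sql("SELECT name FROM people WHERE id > 100").show()

# DataFrame interface: the same filter expressed with DataFrame methods
df.filter(df.id > 100).select('name').show()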
SQLContext
SQLContext is the class used to access Spark's relational capabilities when working with Spark SQL. To use Spark SQL, the user instantiates the SQLContext class and passes the SparkSession (spark) object into it. In this blog, you will find examples of PySpark SQLContext.
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

spark = SparkSession.builder.appName('spark-sql').master('local').getOrCreate()
sqlContext = SQLContext(spark)
Let’s understand SQLContext by loading structured data.
Below is the sample CSV data:
id,name,country,email
100,Rick,Netherlands,rickgr@gmail.com
101,Jason,Aus,json.j@gmail.com
102,Maggie,Usa,mg@hotmail.com
103,Eugine,Denmark,eu@gmail.com
104,Jacob,Usa,jacob@hotmail.com
110,,Aus,john@gmail.com
112,Negan,Ind,Negan@gmail.com
Load the CSV file
from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext

spark = SparkSession.builder.appName('spark-sql').master('local').getOrCreate()
sqlContext = SQLContext(spark)

filepath = 'data.csv'
df = sqlContext.read.load(filepath,
                          format='com.databricks.spark.csv',
                          header='true',
                          inferSchema='true')
Users can also load CSV data as shown below.
filepath = 'data.csv'
df = sqlContext.read.csv(filepath, header='true', inferSchema='true')
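On recent Spark versions (2.0 and later), the same CSV can also be read directly through the SparkSession's built-in reader, so the sketch below should be equivalent:

# Equivalent read via the SparkSession's DataFrameReader (Spark 2.x+)
df = spark.read.csv(filepath, header=True, inferSchema=True)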
Load a JSON file
Below is the sample data in the JSON file.
[{"id": 100,"name": "Rick","country": "Netherlands"},
{"id": 101,"name": "Jason","country": "Aus"},
{"id": 102,"name": "Maggie","country": "Usa"},
{"id": 104,"name": "Eugine","country": "Denmark”},
{"id": 105,"name": "Jacob", ”country": "Usa"}]
filepath = 'data.json'
df = sqlContext.read.json(filepath, multiLine=True)
registerTempTable()
registerTempTable() registers the DataFrame as an in-memory temporary table whose scope is tied to the SparkSession that created it. Once the table is registered, the user can perform SQL-like operations on it.
spark = SparkSession.builder.appName('spark-sql').master('local').getOrCreate()
sqlContext = SQLContext(spark)

filepath = 'data.csv'
df = sqlContext.read.load(filepath,
                          format='com.databricks.spark.csv',
                          header='true',
                          inferSchema='true')
df.registerTempTable("emp")
In the above example, we have created a temp table called 'emp' from the original dataset. This table can be used for further analysis.
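Note that registerTempTable() is deprecated in Spark 2.0 and later in favour of createOrReplaceTempView(), which behaves the same way for the purposes of this tutorial:

# Equivalent call on Spark 2.0+; registers the same session-scoped temp view
df.createOrReplaceTempView("emp")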
Basic Operations
Select:
filepath = 'data.csv'
df = sqlContext.read.csv(filepath, header='true', inferSchema='true')
df.registerTempTable('emp')

selectall = sqlContext.sql("SELECT * from emp")
selectall.show()
Output:
+---+------+-----------+-----------------+
| id| name| country| email|
+---+------+-----------+-----------------+
|100| Rick|Netherlands| rickgr@gmail.com|
|101| Jason| Aus| json.j@gmail.com|
|102|Maggie| Usa| mg@hotmail.com|
|103|Eugine| Denmark| eu@gmail.com|
|104| Jacob| Usa|jacob@hotmail.com|
|110| null| Aus| john@gmail.com|
|112| Negan| Ind| Negan@gmail.com|
+---+------+-----------+-----------------+
Verify the schema of the table:
selectall.printSchema()
root
|-- id: integer (nullable = true)
|-- name: string (nullable = true)
|-- country: string (nullable = true)
|-- email: string (nullable = true)
Describe table:
describe = sqlContext.sql("describe emp") describe.show()
Output:
+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
| id| int| null|
| name| string| null|
| country| string| null|
| email| string| null|
+--------+---------+-------+
Show tables:
gettables= sqlContext.sql("SHOW TABLES") gettables.show()
Output:
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| | emp| true|
+--------+---------+-----------+
Create a new table:
newtable= sqlContext.sql("Create table newtable (tempcol INT) USING CSV") gettables= sqlContext.sql("SHOW TABLES") gettables.show()
Output:
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default| newtable| false|
| | emp| true|
+--------+---------+-----------+
Drop table:
droptable= sqlContext.sql("DROP table newtable") gettables= sqlContext.sql("SHOW TABLES") gettables.show()
Output:
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| | emp| true|
+--------+---------+-----------+
Filter table:
select= sqlContext.sql("SELECT * from emp where id = 100") select.show()
Output:
+---+----+-----------+----------------+
| id|name| country| email|
+---+----+-----------+----------------+
|100|Rick|Netherlands|rickgr@gmail.com|
+---+----+-----------+----------------+
selectall= sqlContext.sql("SELECT * from emp where id > 100") selectall.show()
Output:
+---+------+-------+-----------------+
| id| name|country| email|
+---+------+-------+-----------------+
|101| Jason| Aus| json.j@gmail.com|
|102|Maggie| Usa| mg@hotmail.com|
|103|Eugine|Denmark| eu@gmail.com|
|104| Jacob| Usa|jacob@hotmail.com|
|110| null| Aus| john@gmail.com|
|112| Negan| Ind| Negan@gmail.com|
+---+------+-------+-----------------+
Insert into a table:
sqlContext.sql("insert overwrite table emp values (122,'SAM','IND')") insert= sqlContext.sql("SELECT * from emp") insert.show()
This approach can be extended to most relational operations.
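For example, aggregate queries run the same way; the sketch below (using the same 'emp' temp table) counts employees per country:

# Group employees by country and count them
counts = sqlContext.sql("SELECT country, COUNT(*) AS cnt FROM emp GROUP BY country")
counts.show()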
HiveContext
To work with Hive, we have to instantiate SparkSession with Hive support, including connectivity to a persistent Hive metastore. In older versions of Spark, you had to use the HiveContext class to work with Hive; it uses the Spark SQL execution engine to work with data stored in Hive.
Spark SQL uses a Hive Metastore to manage the metadata of persistent relational entities (e.g. databases, tables, columns, partitions) in a relational database (for fast access).
The first step is to instantiate SparkSession with Hive support and provide a spark-warehouse path in the config like below.
spark = SparkSession.builder.appName("Spark-Hive")\ .config("spark.sql.warehouse.dir", "/users/spark-warehouse")\ .enableHiveSupport()\ .getOrCreate()
spark.sql.warehouse.dir sets the directory in which the databases are located.
Create a Hive table:
sqlContext = SQLContext(spark) sqlContext.sql("CREATE TABLE IF NOT EXISTS emp (id INT, name STRING, country STRING,email STRING)") sqlContext.sql("LOAD DATA LOCAL INPATH 'data1' INTO TABLE emp")
Save data to Hive:
data = [(100,"Rick","Netherlands"), (101,'Jason','Aus')] df = spark.createDataFrame(data) df.write.saveAsTable('empdata') empdf = spark.sql('SELECT * FROM empdata') empdf.show()
Output:
+---+-----+-----------+
| _1| _2| _3|
+---+-----+-----------+
|100| Rick|Netherlands|
|101|Jason| Aus|
+---+-----+-----------+
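The columns come out as _1, _2, _3 because no schema was supplied to createDataFrame. To get readable column names, a list of column names can be passed in; the sketch below uses a hypothetical table name empdata_named purely for illustration:

# Passing column names so the saved table gets a readable schema
data = [(100, "Rick", "Netherlands"), (101, "Jason", "Aus")]
df = spark.createDataFrame(data, ['id', 'name', 'country'])
df.write.saveAsTable('empdata_named')   # hypothetical table name
spark.sql('SELECT * FROM empdata_named').show()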
Show tables:
spark.sql('show tables').show()
Output:
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default| emp| false|
| default| empdata| false|
| default| example| false|
+--------+---------+-----------+
Show Databases:
spark.sql('show databases').show()
Output:
+------------+
|databaseName|
+------------+
| default|
+------------+
Find the schema of a table:
emp_df = spark.sql('SELECT * FROM empdata')
emp_df.printSchema()
Output:
root
|-- _1: long (nullable = true)
|-- _2: string (nullable = true)
|-- _3: string (nullable = true)
As with SQLContext, most relational operations can be used here as well.
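For instance, the Hive-backed table created above can be filtered just like the temp tables in the SQLContext section; a minimal sketch against 'empdata' (whose columns are the auto-generated _1, _2, _3):

# Filter the Hive-backed table; _1 and _2 are the auto-generated column names
spark.sql("SELECT _2 AS name FROM empdata WHERE _1 > 100").show()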
In this PySpark tutorial blog, you learned the basic commands to handle data. In the next chapter, we will describe DataFrame and Dataset.