Apache Spark和Azure Databricks允许使用DataFrame进行数据查询。
DataFrames从Resilient Distributed Datasets(RDD)派生而来,它非常像关系数据库中的表,有列名和数据类型。DataFrame是优化过的表格,比如在读取json文件时可以将数组对象和层状结构作为列的值进行处理。
| SQL | DataFrame (Python) |
|---|---|
SELECT col_1 FROM myTable |
df.select(col("col_1")) |
DESCRIBE myTable |
df.printSchema() |
SELECT * FROM myTable WHERE col_1 > 0 |
df.filter(col("col_1") > 0) |
..GROUP BY col_2 |
..groupBy(col("col_2")) |
..ORDER BY col_2 |
..orderBy(col("col_2")) |
..WHERE year(col_3) > 1990 |
..filter(year(col("col_3")) > 1990) |
SELECT * FROM myTable LIMIT 10 |
df.limit(10) |
display(myTable) (text format) |
df.show() |
display(myTable) (html format) |
display(df) |
从.parquet文件创建DataFrame
ipGeocodeDF = spark.read.parquet("/mnt/training/ip-geocode.parquet")
从.csv文件创建DataFrame
bikeShareingDayDF = (spark.read.option("inferSchema","true")
.option("header","true")
.csv("/mnt/training/bikeSharing/data-001/day.csv"))
从.json文件创建DataFrame
databricksBlogDF = spark.read.option("inferSchema","true").option("header","true").json("/mnt/training/databricks-blog.json")
DataFrame变量创建完成后, 使用show()方法显示DataFrame中的数据,使用show(n)方法显示前n行数据。同时,也可以使用display(df)方法以HTML格式显示数据表格。
步骤一:使用spark.read.parquet()从.parquet文件创建DataFrame变量。
peopleDF = spark.read.parquet("/mnt/training/dataframes/people-10m.parquet")
display(peopleDF)
步骤二:按照年份比较给定的两个名字(Donna和Dorthy)在1990年以后出生的女孩子中的使用量。
from pyspark.sql.functions import col
dordonDF = (peopleDF
.select(year("birthDate").alias("birthYear"), "firstName")
.filter((col("firstName") == 'Donna') | (col("firstName") == 'Dorothy'))
.filter("gender == 'F' ")
.filter(year("birthDate") > 1990)
.orderBy("birthYear")
.groupBy("birthYear", "firstName")
.count()
)
display(dordonDF)
show()和display()
使用show()或show(n)显示DataFrame数据
myDF.show()
-- 显示前10行数据
myDF.show(10)
使用display()和display(df.limit(n))以HTML表格显示DataFrame数据
display(myDF)
-- 以HTML显示前10行数据
display(myDF.limit(10))
select()和filter()
使用select()方法选取列,可以使用alias()方法修改列名。
from pyspark.sql.functions import col
myDF = peopleDF
.select("firstName","middleName","lastName",col("birthDate").alias("date"),"gender")
使用filter()方法根据条件过滤数据。
from pyspark.sql.functions import year
myDF = peopleDF
.filter("gender='F'")
.filter(year("birthDate") > "1990")
orderBy()和groupBy()
使用orderBy()方法按照某列排序。
-- 升序
myDF = peopleDF.orderBy("birthYear")
-- 使用desc方法降序
from pyspark.sql.functions import desc
myDF = peopleDF.orderBy(desc("birthYear"))
使用groupBy()和count()方法按某列进行数量统计。
myDF = (peopleDF.select(year("birthDate").alias("birthYear")).groupBy("birthYear").count())
使用agg()方法给统计出的列取别名。
from pyspark.sql.functions import count
myDF = (peopleDF.select(year("birthDate").alias("birthYear")).groupBy("birthYear").agg(count("birthYear").alias("total")))
使用distinct()方法获取不重复数据。
myDF = (peopleDF.select("firstName").distinct())
peopleDF是一个已知的DataFrame,该DataFrame的定义如下:
root
|-- id: integer (nullable = true)
|-- firstName: string (nullable = true)
|-- middleName: string (nullable = true)
|-- lastName: string (nullable = true)
|-- gender: string (nullable = true)
|-- birthDate: timestamp (nullable = true)
|-- ssn: string (nullable = true)
|-- salary: integer (nullable = true)
请从peopleDF 中筛选出10个使用最多的女性名字(firstName),并将结果保存到新的变量名为top10FemaleFirstNameDF的DataFrame中。
注意: 该语句需要导入python的count方法和desc方法。
limit(10)方法agg()方法和alias()方法desc()方法示例代码:
from pyspark.sql.functions import count, desc
top10FemaleFirstNamesDF = (peopleDF
.select("firstName")
.filter("gender='F'")
.groupBy("firstName")
.agg(count("firstName").alias("total"))
.orderBy(desc("total"))
.limit(10))
基于Step 1中创建的top10FemaleFirstNamesDFDataFrame创建一个Temporary View, View的变量名为Top10FemaleFirstNames, 并且将View的内容显示出来。
注意: Temporary View是的生命周期是一个Spark Session,它不会被保存下来。
createOrReplaceTempView()方法spark.sql()应用SQL语句到View中示例代码:
top10FemaleNamesDF.createOrReplaceTempView("Top10FemaleFirstNames")
resultDF = spark.sql("select * from Top10FemaleFirstNames")
display(resultDF)
peopleDF中的salary列中,有些值是负数,需要使用abs()方法将salary列中的值修改为大于0的数。
from pyspark.sql.functions import abs
peopleWithFixedSalariesDF = peopleDF.withColumn('salary',abs(peopleDF.salary))
基于peopleWithFixedSalariesDF 创建一个变量名为peopleWithFixedSalaries20KDF的DataFrame,要求:
from pyspark.sql.functions import col
peopleWithFixedSalaries20KDF = peopleWithFixedSalariesDF.filter("salary >= 20000").withColumn("salary10k", round(col("salary")/10000))
display(peopleWithFixedSalaries20KDF)
基于peopleDF,返回一个新的变量名为carenDF的DataFrame,要求:
from pyspark.sql.functions import month
carenDF = peopleDF.filter("gender='F'").filter("firstName = 'Caren'").filter(year("birthDate") < 1980).agg(count("id").alias("total"))
display(carenDF)