get-started-azure-databricks

DataFrame入门

使用DataFrame

Apache Spark和Azure Databricks允许使用DataFrame进行数据查询。

DataFrames从Resilient Distributed Datasets(RDD)派生而来,它非常像关系数据库中的表,有列名和数据类型。DataFrame是优化过的表格,比如在读取json文件时可以将数组对象和层状结构作为列的值进行处理。

DataFrame语法与SQL语法的对应表

SQL DataFrame (Python)
SELECT col_1 FROM myTable df.select(col("col_1"))
DESCRIBE myTable df.printSchema()
SELECT * FROM myTable WHERE col_1 > 0 df.filter(col("col_1") > 0)
..GROUP BY col_2 ..groupBy(col("col_2"))
..ORDER BY col_2 ..orderBy(col("col_2"))
..WHERE year(col_3) > 1990 ..filter(year(col("col_3")) > 1990)
SELECT * FROM myTable LIMIT 10 df.limit(10)
display(myTable) (text format) df.show()
display(myTable) (html format) display(df)

访问数据

.parquet文件创建DataFrame

ipGeocodeDF = spark.read.parquet("/mnt/training/ip-geocode.parquet")

.csv文件创建DataFrame

bikeShareingDayDF = (spark.read.option("inferSchema","true")
                    .option("header","true")
                    .csv("/mnt/training/bikeSharing/data-001/day.csv"))

.json文件创建DataFrame

databricksBlogDF = spark.read.option("inferSchema","true").option("header","true").json("/mnt/training/databricks-blog.json")

DataFrame变量创建完成后, 使用show()方法显示DataFrame中的数据,使用show(n)方法显示前n行数据。同时,也可以使用display(df)方法以HTML格式显示数据表格。

应用举例

步骤一:使用spark.read.parquet().parquet文件创建DataFrame变量。

peopleDF = spark.read.parquet("/mnt/training/dataframes/people-10m.parquet")
display(peopleDF)

步骤二:按照年份比较给定的两个名字(Donna和Dorthy)在1990年以后出生的女孩子中的使用量。

from pyspark.sql.functions import col
dordonDF = (peopleDF 
  .select(year("birthDate").alias("birthYear"), "firstName") 
  .filter((col("firstName") == 'Donna') | (col("firstName") == 'Dorothy')) 
  .filter("gender == 'F' ") 
  .filter(year("birthDate") > 1990) 
  .orderBy("birthYear") 
  .groupBy("birthYear", "firstName") 
  .count()
)
display(dordonDF)

DataFrame的常用方法

show()display()

使用show()show(n)显示DataFrame数据

myDF.show()

-- 显示前10行数据
myDF.show(10)

使用display()display(df.limit(n))以HTML表格显示DataFrame数据

display(myDF)

-- 以HTML显示前10行数据
display(myDF.limit(10))

select()filter()

使用select()方法选取列,可以使用alias()方法修改列名。

from pyspark.sql.functions import col

myDF = peopleDF 
  	.select("firstName","middleName","lastName",col("birthDate").alias("date"),"gender") 

使用filter()方法根据条件过滤数据。

from pyspark.sql.functions import year

myDF = peopleDF
	.filter("gender='F'")
	.filter(year("birthDate") > "1990")

orderBy()groupBy()

使用orderBy()方法按照某列排序。

-- 升序
myDF = peopleDF.orderBy("birthYear")

-- 使用desc方法降序
from pyspark.sql.functions import desc
myDF = peopleDF.orderBy(desc("birthYear"))

使用groupBy()count()方法按某列进行数量统计。

myDF = (peopleDF.select(year("birthDate").alias("birthYear")).groupBy("birthYear").count())

使用agg()方法给统计出的列取别名。

from pyspark.sql.functions import count
myDF = (peopleDF.select(year("birthDate").alias("birthYear")).groupBy("birthYear").agg(count("birthYear").alias("total")))

使用distinct()方法获取不重复数据。

myDF = (peopleDF.select("firstName").distinct())

Exercise 1

Step 1

peopleDF是一个已知的DataFrame,该DataFrame的定义如下:

root
 |-- id: integer (nullable = true)
 |-- firstName: string (nullable = true)
 |-- middleName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birthDate: timestamp (nullable = true)
 |-- ssn: string (nullable = true)
 |-- salary: integer (nullable = true)

请从peopleDF 中筛选出10个使用最多的女性名字(firstName),并将结果保存到新的变量名为top10FemaleFirstNameDF的DataFrame中。

注意: 该语句需要导入python的count方法和desc方法。

示例代码:

from pyspark.sql.functions import count, desc

top10FemaleFirstNamesDF = (peopleDF
                           .select("firstName")
                           .filter("gender='F'")
                           .groupBy("firstName")
                           .agg(count("firstName").alias("total"))
                           .orderBy(desc("total"))
                           .limit(10))

Step 2

基于Step 1中创建的top10FemaleFirstNamesDFDataFrame创建一个Temporary View, View的变量名为Top10FemaleFirstNames, 并且将View的内容显示出来。

注意: Temporary View是的生命周期是一个Spark Session,它不会被保存下来。

示例代码:

top10FemaleNamesDF.createOrReplaceTempView("Top10FemaleFirstNames")
resultDF = spark.sql("select * from Top10FemaleFirstNames")
display(resultDF)

Exercise 2

peopleDF中的salary列中,有些值是负数,需要使用abs()方法将salary列中的值修改为大于0的数。

from pyspark.sql.functions import abs
peopleWithFixedSalariesDF = peopleDF.withColumn('salary',abs(peopleDF.salary))

Step 1

基于peopleWithFixedSalariesDF 创建一个变量名为peopleWithFixedSalaries20KDF的DataFrame,要求:

from pyspark.sql.functions import col
peopleWithFixedSalaries20KDF = peopleWithFixedSalariesDF.filter("salary >= 20000").withColumn("salary10k", round(col("salary")/10000))

display(peopleWithFixedSalaries20KDF)

Step 2

基于peopleDF,返回一个新的变量名为carenDF的DataFrame,要求:

from pyspark.sql.functions import month
carenDF = peopleDF.filter("gender='F'").filter("firstName = 'Caren'").filter(year("birthDate") < 1980).agg(count("id").alias("total"))
display(carenDF)