Apache Spark和Azure Databricks允许使用DataFrame进行数据查询。
DataFrames从Resilient Distributed Datasets(RDD)派生而来,它非常像关系数据库中的表,有列名和数据类型。DataFrame是优化过的表格,比如在读取json文件时可以将数组对象和层状结构作为列的值进行处理。
| SQL | DataFrame (Python) | 
|---|---|
SELECT col_1 FROM myTable | 
      df.select(col("col_1")) | 
    
DESCRIBE myTable | 
      df.printSchema() | 
    
SELECT * FROM myTable WHERE col_1 > 0 | 
      df.filter(col("col_1") > 0) | 
    
..GROUP BY col_2 | 
      ..groupBy(col("col_2")) | 
    
..ORDER BY col_2 | 
      ..orderBy(col("col_2")) | 
    
..WHERE year(col_3) > 1990 | 
      ..filter(year(col("col_3")) > 1990) | 
    
SELECT * FROM myTable LIMIT 10 | 
      df.limit(10) | 
    
display(myTable) (text format) | 
      df.show() | 
    
display(myTable) (html format) | 
      display(df) | 
    
从.parquet文件创建DataFrame
ipGeocodeDF = spark.read.parquet("/mnt/training/ip-geocode.parquet")
从.csv文件创建DataFrame
bikeShareingDayDF = (spark.read.option("inferSchema","true")
                    .option("header","true")
                    .csv("/mnt/training/bikeSharing/data-001/day.csv"))
从.json文件创建DataFrame
databricksBlogDF = spark.read.option("inferSchema","true").option("header","true").json("/mnt/training/databricks-blog.json")
DataFrame变量创建完成后, 使用show()方法显示DataFrame中的数据,使用show(n)方法显示前n行数据。同时,也可以使用display(df)方法以HTML格式显示数据表格。
步骤一:使用spark.read.parquet()从.parquet文件创建DataFrame变量。
peopleDF = spark.read.parquet("/mnt/training/dataframes/people-10m.parquet")
display(peopleDF)
步骤二:按照年份比较给定的两个名字(Donna和Dorthy)在1990年以后出生的女孩子中的使用量。
from pyspark.sql.functions import col
dordonDF = (peopleDF 
  .select(year("birthDate").alias("birthYear"), "firstName") 
  .filter((col("firstName") == 'Donna') | (col("firstName") == 'Dorothy')) 
  .filter("gender == 'F' ") 
  .filter(year("birthDate") > 1990) 
  .orderBy("birthYear") 
  .groupBy("birthYear", "firstName") 
  .count()
)
display(dordonDF)
show()和display()
使用show()或show(n)显示DataFrame数据
myDF.show()
-- 显示前10行数据
myDF.show(10)
使用display()和display(df.limit(n))以HTML表格显示DataFrame数据
display(myDF)
-- 以HTML显示前10行数据
display(myDF.limit(10))
select()和filter()
使用select()方法选取列,可以使用alias()方法修改列名。
from pyspark.sql.functions import col
myDF = peopleDF 
  	.select("firstName","middleName","lastName",col("birthDate").alias("date"),"gender") 
使用filter()方法根据条件过滤数据。
from pyspark.sql.functions import year
myDF = peopleDF
	.filter("gender='F'")
	.filter(year("birthDate") > "1990")
orderBy()和groupBy()
使用orderBy()方法按照某列排序。
-- 升序
myDF = peopleDF.orderBy("birthYear")
-- 使用desc方法降序
from pyspark.sql.functions import desc
myDF = peopleDF.orderBy(desc("birthYear"))
使用groupBy()和count()方法按某列进行数量统计。
myDF = (peopleDF.select(year("birthDate").alias("birthYear")).groupBy("birthYear").count())
使用agg()方法给统计出的列取别名。
from pyspark.sql.functions import count
myDF = (peopleDF.select(year("birthDate").alias("birthYear")).groupBy("birthYear").agg(count("birthYear").alias("total")))
使用distinct()方法获取不重复数据。
myDF = (peopleDF.select("firstName").distinct())
peopleDF是一个已知的DataFrame,该DataFrame的定义如下:
root
 |-- id: integer (nullable = true)
 |-- firstName: string (nullable = true)
 |-- middleName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birthDate: timestamp (nullable = true)
 |-- ssn: string (nullable = true)
 |-- salary: integer (nullable = true)
请从peopleDF 中筛选出10个使用最多的女性名字(firstName),并将结果保存到新的变量名为top10FemaleFirstNameDF的DataFrame中。
注意: 该语句需要导入python的count方法和desc方法。
limit(10)方法agg()方法和alias()方法desc()方法示例代码:
from pyspark.sql.functions import count, desc
top10FemaleFirstNamesDF = (peopleDF
                           .select("firstName")
                           .filter("gender='F'")
                           .groupBy("firstName")
                           .agg(count("firstName").alias("total"))
                           .orderBy(desc("total"))
                           .limit(10))
基于Step 1中创建的top10FemaleFirstNamesDFDataFrame创建一个Temporary View, View的变量名为Top10FemaleFirstNames, 并且将View的内容显示出来。
注意: Temporary View是的生命周期是一个Spark Session,它不会被保存下来。
createOrReplaceTempView()方法spark.sql()应用SQL语句到View中示例代码:
top10FemaleNamesDF.createOrReplaceTempView("Top10FemaleFirstNames")
resultDF = spark.sql("select * from Top10FemaleFirstNames")
display(resultDF)
peopleDF中的salary列中,有些值是负数,需要使用abs()方法将salary列中的值修改为大于0的数。
from pyspark.sql.functions import abs
peopleWithFixedSalariesDF = peopleDF.withColumn('salary',abs(peopleDF.salary))
基于peopleWithFixedSalariesDF 创建一个变量名为peopleWithFixedSalaries20KDF的DataFrame,要求:
from pyspark.sql.functions import col
peopleWithFixedSalaries20KDF = peopleWithFixedSalariesDF.filter("salary >= 20000").withColumn("salary10k", round(col("salary")/10000))
display(peopleWithFixedSalaries20KDF)
基于peopleDF,返回一个新的变量名为carenDF的DataFrame,要求:
from pyspark.sql.functions import month
carenDF = peopleDF.filter("gender='F'").filter("firstName = 'Caren'").filter(year("birthDate") < 1980).agg(count("id").alias("total"))
display(carenDF)