使用 Spark 将 RDD 转换为 DataFrame

一则或许对你有用的小广告

欢迎加入小哈的星球 ,你将获得:专属的项目实战 / 1v1 提问 / Java 学习路线 / 学习打卡 / 每月赠书 / 社群讨论

  • 新项目:《从零手撸:仿小红书(微服务架构)》 正在持续爆肝中,基于 Spring Cloud Alibaba + Spring Boot 3.x + JDK 17...点击查看项目介绍 ;
  • 《从零手撸:前后端分离博客项目(全栈开发)》 2 期已完结,演示链接: http://116.62.199.48/ ;

截止目前, 星球 内专栏累计输出 54w+ 字,讲解图 2476+ 张,还在持续爆肝中.. 后续还会上新更多项目,目标是将 Java 领域典型的项目都整一波,如秒杀系统, 在线商城, IM 即时通讯,权限管理,Spring Cloud Alibaba 微服务等等,已有 1900+ 小伙伴加入学习 ,欢迎点击围观

正如我在之前的博客文章中提到的,我一直 在使用 Databricks Spark CSV 库 ,并希望获取一个 CSV 文件,清理它,然后写出一个包含一些列的新 CSV 文件。

我首先处理 CSV 文件并将其写入临时表:


 import org.apache.spark.sql.{SQLContext, Row, DataFrame}

val sqlContext = new SQLContext(sc) val crimeFile = "Crimes_-_2001_to_present.csv" sqlContext.load("com.databricks.spark.csv", Map("path" -> crimeFile, "header" -> "true")).registerTempTable("crimes")

我想达到可以调用以下将 DataFrame 写入磁盘的函数的程度:


 import org.apache.spark.sql.{SQLContext, Row, DataFrame}

val sqlContext = new SQLContext(sc) val crimeFile = "Crimes_-_2001_to_present.csv" sqlContext.load("com.databricks.spark.csv", Map("path" -> crimeFile, "header" -> "true")).registerTempTable("crimes")

第一个文件只需要包含主要的犯罪类型,我们可以使用以下查询提取它:


 import org.apache.spark.sql.{SQLContext, Row, DataFrame}

val sqlContext = new SQLContext(sc) val crimeFile = "Crimes_-_2001_to_present.csv" sqlContext.load("com.databricks.spark.csv", Map("path" -> crimeFile, "header" -> "true")).registerTempTable("crimes")

一些主要类型有我想去掉的尾随空格。据我所知,Spark 的 SQL 变体没有 LTRIM 或 RTRIM 函数,但我们可以映射“行”并改用字符串“trim”函数:


 import org.apache.spark.sql.{SQLContext, Row, DataFrame}

val sqlContext = new SQLContext(sc) val crimeFile = "Crimes_-_2001_to_present.csv" sqlContext.load("com.databricks.spark.csv", Map("path" -> crimeFile, "header" -> "true")).registerTempTable("crimes")

现在我们有一个行的 RDD,我们需要再次将其转换回 DataFrame。 'sqlContext' 有一个我们可以使用的函数:


 import org.apache.spark.sql.{SQLContext, Row, DataFrame}

val sqlContext = new SQLContext(sc) val crimeFile = "Crimes_-_2001_to_present.csv" sqlContext.load("com.databricks.spark.csv", Map("path" -> crimeFile, "header" -> "true")).registerTempTable("crimes")

这些是我们可以选择的签名:


如果我们想传入一个 Row 类型的 RDD,我们将不得不定义一个 StructType,或者我们可以将每一行转换成更强类型的东西:


 import org.apache.spark.sql.{SQLContext, Row, DataFrame}

val sqlContext = new SQLContext(sc) val crimeFile = "Crimes_-_2001_to_present.csv" sqlContext.load("com.databricks.spark.csv", Map("path" -> crimeFile, "header" -> "true")).registerTempTable("crimes")

太好了,我们已经有了我们的 DataFrame,我们现在可以像这样将其插入到“createFile”函数中:


 import org.apache.spark.sql.{SQLContext, Row, DataFrame}

val sqlContext = new SQLContext(sc) val crimeFile = "Crimes_-_2001_to_present.csv" sqlContext.load("com.databricks.spark.csv", Map("path" -> crimeFile, "header" -> "true")).registerTempTable("crimes")

我们实际上可以 做得更好

由于我们有一个特定类的 RDD,我们可以使用“rddToDataFrameHolder”隐式函数,然后使用“DataFrameHolder”上的“toDF”函数。这是代码的样子:


 import org.apache.spark.sql.{SQLContext, Row, DataFrame}

val sqlContext = new SQLContext(sc) val crimeFile = "Crimes_-_2001_to_present.csv" sqlContext.load("com.databricks.spark.csv", Map("path" -> crimeFile, "header" -> "true")).registerTempTable("crimes")

我们完成了!