|
Spark SQL支持两种方式将现有RDD转换为DataFrame。- 第一种方法使用反射来推断RDD的schema并创建DataSet然后将其转化为DataFrame。这种基于反射方法十分简便,但是前提是在您编写Spark应用程序时就已经知道RDD的schema类型。
- 第二种方法是通过编程接口,使用您构建的StructType,然后将其应用于现有RDD。虽然此方法很麻烦,但它允许您在运行之前并不知道列及其类型的情况下构建DataSet
方法如下
1.将RDD转换成Rows
2.按照第一步Rows的结构定义StructType
3.基于rows和StructType使用createDataFrame创建相应的DF |
测试数据为order.data
1 小王 电视 12 2015-08-01 09:08:31
1 小王 冰箱 24 2015-08-01 09:08:14
2 小李 空调 12 2015-09-02 09:01:31 |
代码如下:
object RDD2DF {
/**
* 主要有两种方式
* 第一种是在已经知道schema已经知道的情况下,我们使用反射把RDD转换成DS,进而转换成DF
* 第二种是你不能提前定义好case class,例如数据的结构是以String类型存在的。我们使用接口自定义一个schema
* @param args
*/
def main(args: Array[String]): Unit = {
val spark=SparkSession.builder()
.appName("DFDemo")
.master("local[2]")
.getOrCreate()
// rdd2DFFunc1(spark)
rdd2DFFunc2(spark)
spark.stop()
}
/**
* 提前定义好case class
* @param spark
*/
def rdd2DFFunc1(spark:SparkSession): Unit ={
import spark.implicits._
val orderRDD=spark.sparkContext.textFile("F:\\JAVA\\WorkSpace\\spark\\src\\main\\resources\\order.data")
val orderDF=orderRDD.map(_.split("\t"))
.map(attributes=>Order(attributes(0),attributes(1),attributes(2),attributes(3),attributes(4)))
.toDF()
orderDF.show()
Thread.sleep(1000000)
}
/**
*总结:第二种方式就是通过最基础的DF接口方法,将
* @param spark
*/
def rdd2DFFunc2(spark:SparkSession): Unit ={
//TODO: 1.将RDD转换成Rows 2.按照第一步Rows的结构定义StructType 3.基于rows和StructType使用createDataFrame创建相应的DF
val orderRDD=spark.sparkContext.textFile("F:\\JAVA\\WorkSpace\\spark\\src\\main\\resources\\order.data")
//TODO: 1.将RDD转换成Rows
val rowsRDD=orderRDD
// .filter((str:String)=>{val arr=str.split("\t");val res=arr(1)!="小李";res})
.map(_.split("\t"))
.map(attributes=>Row(attributes(0).trim,attributes(1),attributes(2),attributes(3).trim,attributes(4)))
//TODO: 2.按照第一步Rows的结构定义StructType
val schemaString="id|name|commodity|age|date"
val fields=schemaString.split("\\|")
.map(filedName=>StructField(filedName,StringType,nullable = true))
val schema=StructType(fields)
//TODO: 3.基于rows和StructType使用createDataFrame创建相应的DF
val orderDF= spark.createDataFrame(rowsRDD,schema)
orderDF.show()
orderDF.groupBy("name").count().show()
orderDF.select("name","commodity").show()
Thread.sleep(10000000)
}
}
case class Order(id:String,name:String,commodity:String,age:String,date:String) |
生产中创建DataFrame代码举例 在实际生产环境中,我们其实选择的是方式二这种进行创建DataFrame的,因为我们生产中很难提前定义case class ,因为业务处理之后字段常常会发生意想不到的变化,所以一定要掌握这种方法。
测试数据 baidu CN A E [01/May/2018:02:15:52 +0800] 2 61.237.59.0 - 112.29.213.35:80 0 movieshow2000.edu.chinaren.com GET http://movieshow2000.edu.chinaren.com/user_upload/15316339776271455.mp4 HTTP/1.1 - bytes 13869056-13885439/25136186 TCP_HIT/206 112.29.213.35 video/mp4 16374 16384 -:0 0 0 - - - 11451601 - "JSP3/2.0.14" "-" "-" "-" http - 2 v1.go2yd.com 0.002 25136186 16384 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 1531818470104-11451601-112.29.213.66#2705261172 644514568
baidu CN A E [01/May/2018:02:25:33 +0800] 2 61.232.37.228 - 112.29.213.35:80 0 github.com GET http://github.com/user_upload/15316339776271/44y.mp4 HTTP/1.1 - bytes 13869056-13885439/25136186 TCP_HIT/206 112.29.213.35 video/mp4 83552 16384 -:0 0 0 - - - 11451601 - "JSP3/2.0.14" "-" "-" "-" http - 2 v1.go2yd.com 0.002 25136186 16384 - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 1531818470104-11451601-112.29.213.66#2705261172 644514568 |
Schema方法类 import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
object LogConverUtil {
private val struct=StructType(
Array(
StructField("domain",StringType)
,StructField("url",StringType)
,StructField("pv",LongType)
,StructField("traffic",LongType)
,StructField("date",StringType)
)
)
def getStruct():StructType={
struct
}
def parseLog(logLine:String): Row ={
val sourceFormat=new SimpleDateFormat("[dd/MMM/yyyy:hh:mm:ss +0800]",Locale.ENGLISH)
val targetFormat=new SimpleDateFormat("yyyyMMddhh")
try{
val fields=logLine.split("\t")
val domain=fields(10)
val url=fields(12)
val pv=1L
val traffic=fields(19).trim.toLong
val date=getFormatedDate(fields(4),sourceFormat,targetFormat)
Row(domain,url,pv,traffic,date)
}catch {
case e:Exception=>Row(0)
}
}
/**
*
* @param sourceDate Log中的未格式化日期 [01/May/2018:01:09:45 +0800]
* @return 按照需求格式化字段 2018050101
*/
def getFormatedDate(sourceDate: String, sourceFormat: SimpleDateFormat, targetFormat: SimpleDateFormat) = {
val targetTime=targetFormat.format(sourceFormat.parse(sourceDate))
targetTime
}
} |
RDD2DataFrame主类 import org.apache.spark.sql.SparkSession
object SparkCleanJob {
def main(args: Array[String]): Unit = {
val spark=SparkSession.builder()
.master("local[2]")
.appName("SparkCleanJob")
.getOrCreate()
val logRDD=spark.sparkContext.textFile("file:///D:/baidu.log")
// logRDD.take(2).foreach(println(_))
//调用LogConverUtil里的parseLog方法和getStruct方法获得Rows对象和StructType对象
val logDF=spark.createDataFrame(logRDD.map(LogConverUtil.parseLog(_)),LogConverUtil.getStruct())
logDF.show(false)
logDF.printSchema()
}
} |
结果 注:除了这种使用RDD读取文本进而转化成DataFrame之外,我们也会使用自定义DefaultSource来直接将text转化成DataFrame
----------------------------
原文链接:https://blog.51cto.com/14309075/2402582
程序猿的技术大观园:www.javathinker.net
[这个贴子最后由 flybird 在 2020-01-21 22:00:20 重新编辑]
|
|