当前位置:
首页
文章
前端
详情

SparkSQL查询程序的两种方法,及其对比

import包:

import org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.rdd.RDDimport org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}import org.apache.spark.sql.{DataFrame, Row, SQLContext}

样例类:

case class Person(id:Int,name:String,age:Int)

主函数:def main(args: Array[String]): Unit = {  val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local")  val sparkContext = new SparkContext(sparkConf)  val sqlContext = new SQLContext(sparkContext)  val rdd: RDD[String] = sparkContext.textFile("C:\\Users\\dummy\\Desktop\\person.txt")  val lineRdd: RDD[Array[String]] = rdd.map(_.split(" "))  InferringSchema(lineRdd,sqlContext)  SpecifyingSchema(lineRdd,sqlContext)  sparkContext.stop()}第一种方法:(需要创建样例类)

/**  * 通过反射推断Schema  * @param lineRdd  * @param sqlContext  */def InferringSchema(lineRdd: RDD[Array[String]],sqlContext:SQLContext): Unit ={  //将RDD和case class关联  val personRdd: RDD[Person] = lineRdd.map(x=>Person(x(0).toInt,x(1),x(2).toInt))  //导入隐式转换,如果不导入无法将RDD转换成DataFrame  import sqlContext.implicits._  //将RDD转换成DataFrame  val personDF: DataFrame = personRdd.toDF()  personDF.show()  //注册一张临时表  //personDF.registerTempTable("person")  //val personDF2: DataFrame = sqlContext.sql("select * from person")  //将结果以JSON的方式存储到指定位置  //personDF2.write.json("C:\\Users\\dummy\\Desktop\\out")  //personDF2.show()}

第二种方法:

/**  * 通过StructType直接指定Schema  * @param lineRdd  * @param sqlContext  */def SpecifyingSchema(lineRdd: RDD[Array[String]],sqlContext:SQLContext): Unit ={  //通过StructType直接指定每个字段的schema  val schema=StructType(    List(      /**StructField只需传入前面两个参数即可        * name: String,        * dataType: DataType,        * nullable: Boolean = true,        * metadata: Metadata = Metadata.empty)        */      StructField("id",IntegerType),      StructField("name",StringType),      StructField("age",IntegerType)    )  )  val rowRdd: RDD[Row] = lineRdd.map(x=>Row(x(0).toInt,x(1),x(2).toInt))  val personDF: DataFrame = sqlContext.createDataFrame(rowRdd,schema)  //personDF.show()  personDF.registerTempTable("person")  val personDF2: DataFrame = sqlContext.sql("select * from person")  //personDF2.write.json("C:\\Users\\dummy\\Desktop\\out")  personDF2.show()}对比:

SparkSQL查询程序的两种方法,及其对比

免责申明:本站发布的内容(图片、视频和文字)以转载和分享为主,文章观点不代表本站立场,如涉及侵权请联系站长邮箱:xbc-online@qq.com进行反馈,一经查实,将立刻删除涉嫌侵权内容。