当前位置:
首页
文章
前端
详情

Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS

温馨提示:如果使用电脑查看图片不清晰,可以使用手机打开文章单击文中的图片放大查看高清原图。

Fayson的github:

https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1.文档编写目的

在前面的文章Fayson介绍了一些关于Spark2Streaming的示例如《Spark2Streaming读Kerberos环境的Kafka并写数据到HBase》、《Spark2Streaming读Kerberos环境的Kafka并写数据到Kudu》及《Spark2Streaming读Kerberos环境的Kafka并写数据到Hive》,本篇文章Fayson主要介绍如何使用Spark2Streaming访问Kerberos环境的Kafka并将接收到的Kafka数据逐条写入HDFS,在介绍本篇文章前,你可能需要知道:

《如何在CDH集群启用Kerberos》

《如何在Redhat7.3的CDH5.14中启用Kerberos》

《如何在Redhat7.4的CDH5.15中启用Kerberos》

《如何通过Cloudera Manager为Kafka启用Kerberos及使用》

示例架构图如下:

Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS

示例详细流程图如下:

Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS

  • 内容概述:

1.环境准备

2.Spark2Streaming示例开发

3.示例运行

4.总结

  • 测试环境:

1.CM5.14.3/CDH5.14.2

2.CDK2.2.0(Apache Kafka0.10.2)

3.SPARK2.2.0

4.操作系统版本为Redhat7.3

5.采用root用户进行操作

6.集群已启用Kerberos

2.环境准备

1.准备访问Kafka的Keytab文件,使用xst命令导出keytab文件

[root@cdh01 ~]# kadmin.local kadmin.local:  xst -norandkey -k fayson.keytab fayson@FAYSON.COM

(可左右滑动)

Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS

使用klist命令检查导出的keytab文件是否正确

[root@cdh01 ~]# klist -ek fayson.keytab

(可左右滑动)

Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS

2.准备jaas.cof文件内容如下:

KafkaClient {  com.sun.security.auth.module.Krb5LoginModule required  useKeyTab=true  keyTab="/data/disk1/0286-kafka-shell/conf/fayson.keytab"  principal="fayson@FAYSON.COM";};Client {  com.sun.security.auth.module.Krb5LoginModule required  useKeyTab=true  storeKey=true  keyTab="/data/disk1/0286-kafka-shell/conf/fayson.keytab"  principal="fayson@FAYSON.COM";};

(可左右滑动)

Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS

将fayson.keytab和jaas.conf文件拷贝至集群的所有节点统一的/data/disk1/0286-kafka-shell/conf目录下。

3.准备向Kerberos环境发送数据的脚本,关于脚本这里就不在过多的介绍前面很多文章都有介绍,具体可以参考Fayson的GitHub:

https://github.com/fayson/cdhproject/tree/master/kafkademo/0286-kafka-shell

(可左右滑动)

Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS

根据需要将conf下面的配置文件修改为自己集群的环境即可,发送至Kafka的JSON数据示例如下:

{   "occupation": "生产工作、运输工作和部分体力劳动者",   "address": "台东东二路16号-8-8",   "city": "长治",   "marriage": "1",   "sex": "1",   "name": "仲淑兰",   "mobile_phone_num": "13607268580",   "bank_name": "广州银行31",   "id": "510105197906185179",   "child_num": "1",   "fix_phone_num": "15004170180"}

(可左右滑动)

4.登录CM进入SPARK2服务的配置项将spark_kafka_version的kafka版本修改为0.10

Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS

3.SparkStreaming示例开发

1.使用maven创建scala语言的spark2demo工程,pom.xml依赖如下

<dependency>    <groupId>org.apache.hadoop</groupId>    <artifactId>hadoop-client</artifactId>    <version>2.6.0-cdh5.11.2</version></dependency><dependency>    <groupId>org.apache.hadoop</groupId>    <artifactId>hadoop-common</artifactId>    <version>2.6.0-cdh5.11.2</version></dependency><dependency>    <groupId>org.apache.spark</groupId>    <artifactId>spark-core_2.11</artifactId>    <version>2.2.0.cloudera2</version></dependency><dependency>    <groupId>org.apache.spark</groupId>    <artifactId>spark-sql_2.11</artifactId>    <version>2.2.0.cloudera2</version></dependency><dependency>    <groupId>org.apache.spark</groupId>    <artifactId>spark-streaming_2.11</artifactId>    <version>2.2.0.cloudera2</version></dependency><dependency>    <groupId>org.apache.spark</groupId>    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>    <version>2.2.0.cloudera2</version></dependency><dependency>    <groupId>org.scala-lang</groupId>    <artifactId>scala-library</artifactId>    <version>2.11.8</version></dependency>

(可左右滑动)

2.在resources下创建0292.properties配置文件,内容如下:

kafka.brokers=cdh02.fayson.com:9092,cdh03.fayson.com:9092,cdh04.fayson.com:9092kafka.topics=kafka_hdfs_topic

(可左右滑动)

3.创建Kafka2Spark2HDFS.scala文件,内容如下:

package com.cloudera.streamingimport java.io.{File, FileInputStream}import java.util.Propertiesimport org.apache.commons.lang.StringUtilsimport org.apache.hadoop.conf.Configurationimport org.apache.hadoop.fs.{FileSystem, Path}import org.apache.kafka.common.serialization.StringDeserializerimport org.apache.log4j.{Level, Logger}import org.apache.spark.SparkConfimport org.apache.spark.sql.SparkSessionimport org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}import org.apache.spark.streaming.{Seconds, StreamingContext}import scala.util.parsing.json.JSON/**  * package: com.cloudera.streaming  * describe: Kerberos环境中Spark2Streaming应用实时读取Kafka数据,解析后存入HDFS  * creat_user: Fayson   * email: htechinfo@163.com  * creat_date: 2018/7/17  * creat_time: 下午11:08  * 公众号:Hadoop实操  */object Kafka2Spark2HDFS {  Logger.getLogger("com").setLevel(Level.ERROR) //设置日志级别  var confPath: String = System.getProperty("user.dir") + File.separator + "conf/0292.properties"  def main(args: Array[String]): Unit = {    //加载配置文件    val properties = new Properties()    val file = new File(confPath)    if(!file.exists()) {      System.out.println(Kafka2Spark2Hive.getClass.getClassLoader.getResource("0292.properties"))      val in = Kafka2Spark2Hive.getClass.getClassLoader.getResourceAsStream("0292.properties")      properties.load(in);    } else {      properties.load(new FileInputStream(confPath))    }    val brokers = properties.getProperty("kafka.brokers")    val topics = properties.getProperty("kafka.topics")    println("kafka.brokers:" + brokers)    println("kafka.topics:" + topics)    if(StringUtils.isEmpty(brokers)|| StringUtils.isEmpty(topics)) {      println("未配置Kafka信息...")      System.exit(0)    }    val topicsSet = topics.split(",").toSet    val spark = SparkSession.builder().appName("Kafka2Spark2HDFS-kerberos").config(new SparkConf()).getOrCreate()    val ssc = new StreamingContext(spark.sparkContext, Seconds(5)) //设置Spark时间窗口,每5s处理一次    val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers      , "auto.offset.reset" -> "latest"      , "security.protocol" -> "SASL_PLAINTEXT"      , "sasl.kerberos.service.name" -> "kafka"      , "key.deserializer" -> classOf[StringDeserializer]      , "value.deserializer" -> classOf[StringDeserializer]      , "group.id" -> "testgroup"    )    val dStream = KafkaUtils.createDirectStream[String, String](ssc,      LocationStrategies.PreferConsistent,      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))    dStream.foreachRDD(rdd => {      val newrdd = rdd.map(line => {        val jsonObj =  JSON.parseFull(line.value())        val map:Map[String,Any] = jsonObj.get.asInstanceOf[Map[String, Any]]        //将Map数据转为以","隔开的字符串        val userInfoStr = map.get("id").get.asInstanceOf[String].concat(",")          .concat(map.get("name").get.asInstanceOf[String]).concat(",")          .concat(map.get("sex").get.asInstanceOf[String]).concat(",")          .concat(map.get("city").get.asInstanceOf[String]).concat(",")          .concat(map.get("occupation").get.asInstanceOf[String]).concat(",")          .concat(map.get("mobile_phone_num").get.asInstanceOf[String]).concat(",")          .concat(map.get("fix_phone_num").get.asInstanceOf[String]).concat(",")          .concat(map.get("bank_name").get.asInstanceOf[String]).concat(",")          .concat(map.get("address").get.asInstanceOf[String]).concat(",")          .concat(map.get("marriage").get.asInstanceOf[String]).concat(",")          .concat(map.get("child_num").get.asInstanceOf[String])        userInfoStr      })      //将解析好的数据已流的方式写入HDFS,未使用RDD的方式可以避免数据被覆盖      newrdd.foreachPartition(partitionrecord => {        val conf = new Configuration()        val fs = FileSystem.get(conf)        val path =  new Path("/tmp/kafka-data/test.txt")        val outputStream = if (fs.exists(path)){          fs.append(path)        }else{          fs.create(path)        }        partitionrecord.foreach(line => outputStream.write((line + "\n").getBytes("UTF-8")))        outputStream.close()      })    })    ssc.start()    ssc.awaitTermination()  }}

(可左右滑动)

4.使用mvn命令编译工程,注意由于是scala工程编译时mvn命令要加scala:compile

mvn clean scala:compile package

(可左右滑动)

Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS

5.将编译好的spark2-demo-1.0-SNAPSHOT.jar包上传至服务

Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS

在conf目录下新增0292.properties配置文件,内容如下:

Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS

4.示例运行

1.使用spark2-submit命令向集群提交SparkStreaming作业

spark2-submit --class com.cloudera.streaming.Kafka2Spark2HDFS \  --master yarn \  --deploy-mode client \  --executor-memory 2g \  --executor-cores 2 \  --driver-memory 2g \  --num-executors 2 \  --queue default  \  --principal fayson@FAYSON.COM \  --keytab /data/disk1/spark2streaming-kafka-hdfs/conf/fayson.keytab \  --driver-java-options "-Djava.security.auth.login.config=/data/disk1/spark2streaming-kafka-hdfs/conf/jaas.conf" \  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/data/disk1/spark2streaming-kafka-hdfs/conf/jaas.conf" \  spark2-demo-1.0-SNAPSHOT.jar

(可左右滑动)

Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS

通过CM查看作业是否提交成功

Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS

Spark2的UI界面

Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS

2.运行脚本向Kafka的Kafka_hdfs_topic生产消息,重复执行三次

Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS

3.使用hdfs命令查看数据是否已写入/tmp/kafka-data/test.txt文件

Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS

查看写入的数据量,共1800条

Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS

5.总结

1.在前面的文章Fayson也有介绍Java访问Kerberos环境的Kafka,需要使用到jaas.conf文件,这里的jaas.conf文件Fayson通过spark2-submit的方式指定,注意我们的jaas.conf文件及keytab需要在集群的所有节点存在,因为Driver和Executor是随机在集群的节点上启动的。

2.同样在scala代码中访问Kafka是也一样需要添加Kerberos相关的配置security.protocol和sasl.kerberos.service.name参数。

3.Spark2默认的kafka版本为0.9需要通过CM将默认的Kafka版本修改为0.10

4.在本篇文章中,Fayson将接受到的Kafka JSON数据转换为以逗号分割的字符串,将字符串数据以流的方式写入指定的HDFS文件。

5.本篇文章主要使用FileSystem对象以流的方式将Kafka消息逐条写入HDFS指定的数据问题,该方式可以追加的写入数据。

GitHub地址如下:

https://github.com/fayson/cdhproject/tree/master/spark2demo/spark2streaming-kafka-hdfs

https://github.com/fayson/cdhproject/blob/master/spark2demo/src/main/scala/com/cloudera/streaming/Kafka2Spark2HDFS.scala

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

Spark2Streaming读Kerberos环境的Kafka并写数据到HDFS

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操

本文分享自微信公众号 - Hadoop实操(gh_c4c535955d0f)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

免责申明:本站发布的内容(图片、视频和文字)以转载和分享为主,文章观点不代表本站立场,如涉及侵权请联系站长邮箱:xbc-online@qq.com进行反馈,一经查实,将立刻删除涉嫌侵权内容。