|
spark读取kafka后写入redis package com .prince .demo .test
import com .typesafe .config .ConfigFactory
import org .apache .kafka .common .serialization .StringDeserializer
import org .apache .log4j.{Level, Logger}
import org .apache .spark .streaming.{Seconds, StreamingContext}
import org .apache .spark .streaming .kafka010 .ConsumerStrategies .Subscribe
import org .apache .spark .streaming .kafka010 .KafkaUtils
import org .apache .spark .streaming .kafka010 .LocationStrategies .PreferConsistent
import org .apache .spark .sql .SparkSession
import redis .clients .jedis .Jedis
/**
* Created by prince on 2017/9/13.
*/
object SparkStreamingWriteRedis {
Logger .getLogger( "org") .setLevel(Level .WARN)
def main(args: Array[String]): Unit = {
val spark = SparkSession .builder .appName( "SparkStreamingWriteRedis") .master( "local[*]") .getOrCreate()
val sparkContext = spark .sparkContext
val ssc = new StreamingContext(sparkContext, Seconds( 1))
implicit val conf = ConfigFactory .load
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> conf .getString( "kafka.brokers"),
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> conf .getString( "kafka.group"),
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java .lang .Boolean))
val topic = conf .getString( "kafka.topics")
val topics = Array(topic)
val stream = KafkaUtils
.createDirectStream(ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
val input = stream .flatMap(line => {
Some(line .value .toString)
})
input .foreachRDD(rdd => {
rdd .foreachPartition(part => {
val jedis = new Jedis( "192.168.1.97", 6379, 3000)
jedis .auth( "123456")
part .foreach( x => {
jedis .lpush( "test_key", x)
jedis .close()
})
})
})
ssc .start()
ssc .awaitTermination()
}
} |
----------------------------
原文链接:https://blog.csdn.net/qq_39869388/article/details/80366380
程序猿的技术大观园:www.javathinker.net
[这个贴子最后由 flybird 在 2020-03-22 12:05:09 重新编辑]
|
网站系统异常
系统异常信息 |
Request URL:
http://www.javathinker.net/WEB-INF/lybbs/jsp/topic.jsp?postID=2937
java.lang.NullPointerException
如果你不知道错误发生的原因,请把上面完整的信息提交给本站管理人员。
|
|