|
flume+spark streaming+redis完整篇 一.前言
本篇是用flume作为数据源,spark streaming来实时处理,然后把结果存在redis供查询.
本篇介绍的是一个实时统计网站访问的pv的例子.
本篇采用的各种版本如下 scala-2.10.4 spark-1.6.1 flume-1.6.0
本篇采用的spark集群为sdandalone模式
二.数据源flume配置
flume的详细说明请自行百度.这里的flume source采用http,并且使用json handler来处理.
source 配置:
a1.sources.r2.type = http
a1.sources.r2.bind=10.8.23.58
a1.sources.r2.port = 5140
a1.sources.r2.channels = c3
a1.sources.r2.handler = org.apache.flume.source.http.JSONHandler
channel因为测试,所以选择的是内存方式,实际根据情况,建议使用flie模式,并且配置checkpoint防止数据丢失.
channel配置:
a1.channels.c3.type = memory
a1.channels.c3.capacity = 100
a1.channels.c3.transactionCapacity = 100
sink有两种,如下
flume=>spark streaming有两种方式
1.推模式:这种模式比较简单,直接连接上对应的avro端口即可,但是有个最大的问题,你必须先启动spark streaming任务,然后观察这个端口开在哪台spark节点上,并且每次启动都会随机到某一个节点,然后再去改flume的配置,往那台机器上发数据,这就比较蛋疼,如果你有100台flume,那会让人疯狂的.因此这个模式,只有你的flume数量比较少的情况下适用.
推模式的sink配置:
a1.sinks.k3.type = avro
a1.sinks.k3.channel = c3
a1.sinks.k3.hostname = 10.8.23.32
a1.sinks.k3.port = 4545
2.拉模式:
这里建议先使用推模式,等推模式跑通了,再切换到拉模式. 拉模式相比推模式稍微复杂点,主要复杂在flume的配置,官网有很详细的说明,我这里会把用到的都描述出来.
首先需要把3个jar放到flume/lib下.(分别是:spark-streaming-flume-sink_2.10-1.6.1.jar ,scala-library-2.10.5.jar, commons-lang3-3.3.2.jar,在最后的网盘地址里面有)
拉模式的sink配置
a1.sinks = spark
a1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.spark.hostname = 10.2.23.58
a1.sinks.spark.port = 4545
a1.sinks.spark.channel = c3
至此 flume配置完成.可以启动观察一下.
三.代码开发
object PvStatistic {
def main (args: Array[ String ]): Unit = {
val masterUrl = "spark://10.8.23.112:7077"
val conf = new SparkConf().setMaster(masterUrl).setAppName( "PvStatistic" )
val ssc = new StreamingContext(conf , Seconds ( 15 ))
// 推模式
// val flumeStream = FlumeUtils.createStream(ssc, "10.8.23.58", 4545, StorageLevel.MEMORY_ONLY_SER_2)
// 拉模式
val flumeStream = FlumeUtils. createPollingStream (ssc , "10.8.23.58" , 4545 , StorageLevel. MEMORY_ONLY_SER_2 )
flumeStream.foreachRDD(rdd => {
rdd.foreachPartition(it=>{
val jedis = RedisClient. pool .getResource
it.foreach(event=>{
val sensorInfo = new String(event. event .getBody.array()) //单行记录
// println(sensorInfo)
val json = JSONObject. fromObject (sensorInfo) ;
val url=json.getString( "url" )
jedis.hincrBy( "Spark:PV" , url , 1 ) ;
})
RedisClient. pool .returnResource(jedis)
})
})
ssc.start()
ssc.awaitTermination()
}
两种模式对于数据源的获取只是方法不一样,处理逻辑都一样.
需要的jar包 我直接贴pom文件
<dependency>
<groupId>org.apache.spark </groupId>
<artifactId>spark-core_2.10 </artifactId>
<version>1.6.1 </version>
<scope>provided </scope>
</dependency>
<dependency>
<groupId>org.apache.spark </groupId>
<artifactId>spark-streaming_2.10 </artifactId>
<version>1.6.1 </version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume_2.10 -->
<dependency>
<groupId>org.apache.spark </groupId>
<artifactId>spark-streaming-flume_2.10 </artifactId>
<version>1.6.1 </version>
</dependency>
<dependency>
<groupId>redis.clients </groupId>
<artifactId>jedis </artifactId>
<version>2.8.1 </version>
</dependency>
<dependency>
<groupId>net.sf.json-lib </groupId>
<artifactId>json-lib </artifactId>
<version>2.4 </version>
<classifier>jdk15 </classifier>
</dependency>
完成以后就是打包上传到spark主节点了,这里提一下,因为需要很多依赖包,所以建议直接打成一个包含所有依赖的jar.
推荐一个maven插件来做这个事.
<plugin>
<groupId>org.apache.maven.plugins </groupId>
<artifactId>maven-shade-plugin </artifactId>
<version>2.3 </version>
<executions>
<execution>
<phase>package </phase>
<goals>
<goal>shade </goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:* </artifact>
<excludes>
<exclude>META-INF/*.SF </exclude>
<exclude>META-INF/*.DSA </exclude>
<exclude>META-INF/*.RSA </exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation= "org.apache.maven.plugins.shade.resource.ManifestResourceTransformer" >
<mainClass>com.defonds.RsaEncryptor </mainClass>
</transformer>
<transformer
implementation= "org.apache.maven.plugins.shade.resource.AppendingTransformer" >
<resource>META-INF/spring.handlers </resource>
</transformer>
<transformer
implementation= "org.apache.maven.plugins.shade.resource.AppendingTransformer" >
<resource>META-INF/spring.schemas </resource>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
有了这个插件,直接maven clean package打包就行了,打出来的包包含所有依赖文件.
然后把这个包上传到spark主节点,在spark/bin下执行
spark-submit --master spark://cdh112:7077 --class com.dome.PvStatistic /opt/spark/job/analyseSys-1.0-SNAPSHOT.jar
然后可以向之前配置的flume的接收端口发送http请求测试了
发送内容:
{
"body":"{\"url\": \"http://20160926 16:02\"}",
"headers":{"v1":"log"}
}
在redis里面可以看到 SparkPv开头的key下面 每个访问地址的访问次数.
上面提到的jar包
链接:http://pan.baidu.com/s/1kUEJJJx 密码:9fxv
最后感谢一下群内的各位兄弟以及群主aDog~ 有兴趣交流的可以加入QQ群:459898801
----------------------------
原文链接:https://blog.csdn.net/ghost06211/article/details/52667958
程序猿的技术大观园:www.javathinker.net
[这个贴子最后由 flybird 在 2020-03-22 21:27:01 重新编辑]
|
网站系统异常
系统异常信息 |
Request URL:
http://www.javathinker.net/WEB-INF/lybbs/jsp/topic.jsp?postID=2947
java.lang.NullPointerException
如果你不知道错误发生的原因,请把上面完整的信息提交给本站管理人员。
|
|