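Spark Streaming is a high-throughput, fault-tolerant stream processing system for real-time data. It can ingest data from many sources (such as Kafka, Flume, Twitter, ZeroMQ, and TCP sockets), apply complex operations such as map, reduce, join, and window, and save the results to external file systems, databases, or live dashboards.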
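Key characteristics of Spark Streaming:
- It breaks a streaming computation into a series of short batch jobs
- Failed or slow tasks are re-executed in parallel on other nodes
- Strong fault tolerance (based on RDD lineage)
- It uses the same semantics as RDDs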
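The first point can be illustrated without Spark at all. The following is a minimal plain-Java sketch, not Spark's implementation: it groups hypothetical event timestamps into fixed-length batches and counts the events in each one, which is roughly what an event count per batch interval amounts to. The class name, method name, and sample timestamps are invented for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustrative only: groups event timestamps into fixed-length batches. */
final class MicroBatchSketch {
    private MicroBatchSketch() {
    }

    /**
     * Counts events per batch. A batch is identified by its start time,
     * i.e. the event timestamp rounded down to a multiple of the interval.
     */
    static Map<Long, Long> countPerBatch(long[] eventTimesMs, long batchIntervalMs) {
        if (batchIntervalMs <= 0) {
            throw new IllegalArgumentException("batchIntervalMs must be positive");
        }
        Map<Long, Long> counts = new TreeMap<Long, Long>();
        for (long t : eventTimesMs) {
            long batchStart = Math.floorDiv(t, batchIntervalMs) * batchIntervalMs;
            Long previous = counts.get(batchStart);
            counts.put(batchStart, previous == null ? 1L : previous + 1L);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical event times in milliseconds
        long[] events = {100L, 4900L, 5000L, 5200L, 12000L};
        // With a 5000 ms interval the batches start at 0, 5000 and 10000
        System.out.println(countPerBatch(events, 5000L));
    }
}
```

One difference worth keeping in mind: this sketch only lists batches that contain events, whereas Spark Streaming runs a job for every interval, including intervals in which nothing arrived.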
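This post combines Spark Streaming with Flume NG. Using JavaFlumeEventCount from the Spark source tree as a reference, we create a Maven project, package it, and run it on a Spark standalone cluster.
The job needs Spark Streaming's Flume integration package. Add the following Maven dependency to pom.xml: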
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-flume_2.10</artifactId>
  <version>1.1.0</version>
</dependency>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>test</groupId>
  <artifactId>hq</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
          <compilerVersion>1.6</compilerVersion>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <classpathPrefix>.</classpathPrefix>
              <mainClass>JavaFlumeEventCount</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.10</artifactId>
      <version>1.1.0</version>
    </dependency>
  </dependencies>
</project>
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public final class JavaFlumeEventCount {
    private JavaFlumeEventCount() {
    }

    public static void main(String[] args) {
        // Arguments: <host> <port> <batch interval in milliseconds>
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        Duration batchInterval = new Duration(Integer.parseInt(args[2]));

        SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);

        // Receive events sent by the Flume avro sink to host:port
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
                FlumeUtils.createStream(ssc, host, port);

        flumeStream.count();
        // Count the events in each batch and print one line per batch
        flumeStream.count().map(new Function<Long, String>() {
            private static final long serialVersionUID = -572435064083746235L;

            public String call(Long in) {
                return "Received " + in + " flume events.";
            }
        }).print();

        ssc.start();
        ssc.awaitTermination();
    }
}
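Besides the jar you build yourself, two more jars are needed at run time: spark-streaming-flume_2.10-1.1.0.jar and flume-ng-sdk-1.4.0.jar (I am using Flume NG 1.4.0). flume-ng-sdk ships with the Flume distribution, which can be downloaded from the official Flume website; spark-streaming-flume is the Maven artifact declared in pom.xml.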
[ebupt@eb174 test]$ /opt/spark-hadoop/bin/spark-submit --master local[4] --name FlumeStreaming --class JavaFlumeEventCount --jars spark-streaming-flume_2.10-1.1.0.jar,flume-ng-sdk-1.4.0.jar hq.jar localhost 10086 5000
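Here Spark runs in local mode; local[4] means local mode with four worker threads (typically one per CPU core). A receiver-based stream like this one keeps one thread busy permanently, so local mode needs at least two threads.
localhost 10086 is the host and port that receive the data; they must match the destination configured on the Flume sender side.
Note: run spark-submit --help for an explanation of the options. Adjust the memory settings as needed to avoid OOM errors. The --jars option accepts several jars at once, separated by commas. The main class must be followed by three arguments: host, port, and batch interval in milliseconds (5000 in the command above).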
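Because the three arguments are read positionally, a missing or malformed value would otherwise surface as an ArrayIndexOutOfBoundsException or NumberFormatException. The sketch below shows one way to validate them up front. It is a hypothetical helper, not part of the Spark example: the class name FlumeCountArgs and its error messages are assumptions made for illustration.

```java
/** Illustrative only: validates the three arguments that follow the jar name. */
final class FlumeCountArgs {
    final String host;
    final int port;
    final long batchIntervalMs;

    private FlumeCountArgs(String host, int port, long batchIntervalMs) {
        this.host = host;
        this.port = port;
        this.batchIntervalMs = batchIntervalMs;
    }

    /** Parses <host> <port> <batchIntervalMs>, rejecting anything else. */
    static FlumeCountArgs parse(String[] args) {
        if (args == null || args.length != 3) {
            throw new IllegalArgumentException(
                    "Usage: JavaFlumeEventCount <host> <port> <batchIntervalMs>");
        }
        int port;
        long interval;
        try {
            port = Integer.parseInt(args[1]);
            interval = Long.parseLong(args[2]);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("port and batch interval must be integers", e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        if (interval <= 0) {
            throw new IllegalArgumentException("batch interval must be positive: " + interval);
        }
        return new FlumeCountArgs(args[0], port, interval);
    }

    public static void main(String[] args) {
        FlumeCountArgs parsed = parse(new String[] {"localhost", "10086", "5000"});
        System.out.println(parsed.host + ":" + parsed.port + " every " + parsed.batchIntervalMs + " ms");
    }
}
```

Failing fast on the driver keeps the usage message close to the mistake instead of burying it in the Spark startup log.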
#Agent5
#List the sources, sinks and channels for the agent
agent5.sources = source1
agent5.sinks = hdfs01
agent5.channels = channel1

#set channel for sources and sinks
agent5.sources.source1.channels = channel1
agent5.sinks.hdfs01.channel = channel1

#properties of the source
agent5.sources.source1.type = spooldir
agent5.sources.source1.spoolDir = /home/hadoop/huangq/spark-flumeng-data/
agent5.sources.source1.ignorePattern = .*(\\.index|\\.tmp|\\.xml)$
agent5.sources.source1.fileSuffix = .1
agent5.sources.source1.fileHeader = true
agent5.sources.source1.fileHeaderKey = filename

# set interceptors
agent5.sources.source1.interceptors = i1 i2
agent5.sources.source1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent5.sources.source1.interceptors.i1.preserveExisting = false
agent5.sources.source1.interceptors.i1.hostHeader = hostname
agent5.sources.source1.interceptors.i1.useIP=false
agent5.sources.source1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

#properties of mem-channel-1
agent5.channels.channel1.type = memory
agent5.channels.channel1.capacity = 100000
agent5.channels.channel1.transactionCapacity = 100000
agent5.channels.channel1.keep-alive = 30

#properties of sink
agent5.sinks.hdfs01.type = avro
agent5.sinks.hdfs01.hostname = localhost
agent5.sinks.hdfs01.port = 10086
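The avro sink's hostname and port have to agree with the host and port passed to the Spark job, and a mismatch is easy to miss. The sketch below checks this, assuming the agent configuration can be read with java.util.Properties (it uses key = value lines and # comments). The class and method names are hypothetical, and the config text is passed in as a string so that the example stays self-contained.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Illustrative only: checks that a Flume avro sink points at a given host and port. */
final class FlumeSinkCheck {
    private FlumeSinkCheck() {
    }

    static boolean sinkTargets(String confText, String agent, String sink, String host, int port) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            // Reading from an in-memory string is not expected to fail
            throw new IllegalStateException(e);
        }
        String prefix = agent + ".sinks." + sink + ".";
        return "avro".equals(value(props, prefix + "type"))
                && host.equals(value(props, prefix + "hostname"))
                && String.valueOf(port).equals(value(props, prefix + "port"));
    }

    /** Properties.load keeps trailing whitespace in values, so trim it here. */
    private static String value(Properties props, String key) {
        String raw = props.getProperty(key);
        return raw == null ? null : raw.trim();
    }

    public static void main(String[] args) {
        String conf = "agent5.sinks.hdfs01.type = avro\n"
                + "agent5.sinks.hdfs01.hostname = localhost\n"
                + "agent5.sinks.hdfs01.port = 10086\n";
        System.out.println(sinkTargets(conf, "agent5", "hdfs01", "localhost", 10086));
    }
}
```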
Start flume-ng:
[hadoop@eb170 flume]$ bin/flume-ng agent -n agent5 -c conf -f conf/spark-flumeng.conf
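If the Flume SDK jar is not specified, the job fails with java.lang.NoClassDefFoundError: Lorg/apache/flume/source/avro/AvroFlumeEvent; (class not found). This class lives in the Flume SDK jar; pass the jar's location in the --jars option to fix it.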
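A quick way to confirm whether a class such as AvroFlumeEvent is visible to the running JVM is to try loading it by name. The following is a small diagnostic sketch with hypothetical class and method names; the class name it checks is derived from the error message above by dropping the leading L and trailing ; of the JVM descriptor and replacing slashes with dots.

```java
/** Illustrative only: reports whether a class can be loaded by name. */
final class ClasspathCheck {
    private ClasspathCheck() {
    }

    static boolean isOnClasspath(String className) {
        // Prefer the context class loader and fall back to this class's own loader
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClasspathCheck.class.getClassLoader();
        }
        try {
            // 'false' means: locate the class without running its static initializers
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        } catch (LinkageError e) {
            // The class was found but one of its own dependencies is missing or incompatible
            return false;
        }
    }

    public static void main(String[] args) {
        String name = "org.apache.flume.source.avro.AvroFlumeEvent";
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```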
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/
15/12/5 19:00:44 INFO SecurityManager: Changing view acls to: ebupt,
15/12/5 19:00:44 INFO SecurityManager: Changing modify acls to: ebupt,
15/12/5 19:00:44 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ebupt, ); users with modify permissions: Set(ebupt, )
15/12/5 19:00:45 INFO Slf4jLogger: Slf4jLogger started
15/12/5 19:00:45 INFO Remoting: Starting remoting
15/12/5 19:00:45 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@eb174:51147]
15/12/5 19:00:45 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@eb174:51147]
15/12/5 19:00:45 INFO Utils: Successfully started service 'sparkDriver' on port 51147.
15/12/5 19:00:45 INFO SparkEnv: Registering MapOutputTracker
15/12/5 19:00:45 INFO SparkEnv: Registering BlockManagerMaster
....
.....
15/12/5 19:09:21 INFO DAGScheduler: Missing parents: List()
15/12/5 19:09:21 INFO DAGScheduler: Submitting Stage 145 (MappedRDD[291] at map at MappedDStream.scala:35), which has no missing parents
15/12/5 19:09:21 INFO MemoryStore: ensureFreeSpace(3400) called with curMem=13047, maxMem=278302556
15/12/5 19:09:21 INFO MemoryStore: Block broadcast_110 stored as values in memory (estimated size 3.3 KB, free 265.4 MB)
15/12/5 19:09:21 INFO MemoryStore: ensureFreeSpace(2020) called with curMem=16447, maxMem=278302556
15/12/5 19:09:21 INFO MemoryStore: Block broadcast_110_piece0 stored as bytes in memory (estimated size 2020.0 B, free 265.4 MB)
15/12/5 19:09:21 INFO BlockManagerInfo: Added broadcast_110_piece0 in memory on eb174 (size: 2020.0 B, free: 265.4 MB)
15/12/5 19:09:21 INFO BlockManagerMaster: Updated info of block broadcast_110_piece0
15/12/5 19:09:21 INFO DAGScheduler: Submitting 1 missing tasks from Stage 145 (MappedRDD[291] at map at MappedDStream.scala:35)
15/12/5 19:09:21 INFO TaskSchedulerImpl: Adding task set 145.0 with 1 tasks
15/12/5 19:09:21 INFO TaskSetManager: Starting task 0.0 in stage 145.0 (TID 190, eb175, PROCESS_LOCAL, 1132 bytes)
15/12/5 19:09:21 INFO BlockManagerInfo: Added broadcast_110_piece0 in memory on eb175 (size: 2020.0 B, free: 519.6 MB)
15/12/5 19:09:21 INFO TaskSetManager: Finished task 0.0 in stage 145.0 (TID 190) in 25 ms on eb175 (1/1)
15/12/5 19:09:21 INFO DAGScheduler: Stage 145 (take at DStream.scala:608) finished in 0.026 s
15/12/5 19:09:21 INFO TaskSchedulerImpl: Removed TaskSet 145.0, whose tasks have all completed, from pool
15/12/5 19:09:21 INFO SparkContext: Job finished: take at DStream.scala:608, took 0.036589357 s
-------------------------------------------
Time: 1413198560000 ms
-------------------------------------------
Received 35300 rows of flume events.

15/12/5 19:09:55 INFO JobScheduler: Finished job streaming job 1413198595000 ms.0 from job set of time 1413198595000 ms
15/12/5 19:09:55 INFO JobScheduler: Total delay: 0.126 s for time 1413198595000 ms (execution: 0.112 s)
15/12/5 19:09:55 INFO MappedRDD: Removing RDD 339 from persistence list
15/12/5 19:09:55 INFO BlockManager: Removing RDD 339
15/12/5 19:09:55 INFO MappedRDD: Removing RDD 338 from persistence list
15/12/5 19:09:55 INFO BlockManager: Removing RDD 338
15/12/5 19:09:55 INFO MappedRDD: Removing RDD 337 from persistence list
15/12/5 19:09:55 INFO BlockManager: Removing RDD 337
15/12/5 19:09:55 INFO ShuffledRDD: Removing RDD 336 from persistence list
15/12/5 19:09:55 INFO BlockManager: Removing RDD 336
15/12/5 19:09:55 INFO UnionRDD: Removing RDD 335 from persistence list
15/12/5 19:09:55 INFO BlockManager: Removing RDD 335
15/12/5 19:09:55 INFO MappedRDD: Removing RDD 333 from persistence list
15/12/5 19:09:55 INFO BlockManager: Removing RDD 333
15/12/5 19:09:55 INFO BlockRDD: Removing RDD 332 from persistence list
15/12/5 19:09:55 INFO BlockManager: Removing RDD 332
...
...
15/12/5 19:10:00 INFO TaskSchedulerImpl: Adding task set 177.0 with 1 tasks
15/12/5 19:10:00 INFO TaskSetManager: Starting task 0.0 in stage 177.0 (TID 215, eb175, PROCESS_LOCAL, 1132 bytes)
15/12/5 19:10:00 INFO BlockManagerInfo: Added broadcast_134_piece0 in memory on eb175 (size: 2021.0 B, free: 530.2 MB)
15/12/5 19:10:00 INFO TaskSetManager: Finished task 0.0 in stage 177.0 (TID 215) in 24 ms on eb175 (1/1)
15/12/5 19:10:00 INFO DAGScheduler: Stage 177 (take at DStream.scala:608) finished in 0.024 s
15/12/5 19:10:00 INFO TaskSchedulerImpl: Removed TaskSet 177.0, whose tasks have all completed, from pool
15/12/5 19:10:00 INFO SparkContext: Job finished: take at DStream.scala:608, took 0.033844743 s
-------------------------------------------
Time: 1413198600000 ms
-------------------------------------------
Received 0 rows of flume events.
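The Time values in the output are batch times in milliseconds since the Unix epoch. The sketch below converts one to a readable UTC instant and checks that two batch times are a whole number of batch intervals apart. It assumes Java 8 or later for java.time, and the class and method names are hypothetical.

```java
import java.time.Instant;

/** Illustrative only: helpers for reading the batch times printed by the job. */
final class BatchTimeSketch {
    private BatchTimeSketch() {
    }

    /** Formats a batch time (epoch milliseconds) as an ISO-8601 UTC instant. */
    static String toUtc(long batchTimeMs) {
        return Instant.ofEpochMilli(batchTimeMs).toString();
    }

    /** True if the two batch times differ by a whole number of batch intervals. */
    static boolean isAligned(long earlierMs, long laterMs, long batchIntervalMs) {
        return (laterMs - earlierMs) % batchIntervalMs == 0;
    }

    public static void main(String[] args) {
        System.out.println(toUtc(1413198560000L)); // prints 2014-10-13T11:09:20Z
        System.out.println(isAligned(1413198560000L, 1413198600000L, 5000L)); // prints true
    }
}
```

The batch times in the output are 40 seconds apart, which is eight intervals of 5000 ms, matching the batch interval passed on the command line.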
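- Flume NG and Spark were integrated successfully; you can write your own classes as needed to process data delivered by Flume NG in real time.
- Spark Streaming integrates with many data sources to provide real-time computation.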