yield-bytes

沉淀、分享与无限进步

基于PySpark整合Spark Streaming与Kafka

  本文内容主要给出基于PySpark程序,整合Spark Streaming和Kafka,实现实时消费和处理topic消息,为PySpark开发大数据实时计算项目提供基本参考。

1 程序环境准备:

  这里不再使用Spark的集群环境,因涉及的计算资源测试环境受限,目前两台虚拟机:1个vcore+2G内存,其中一台虚拟机启动Spark Streaming服务进程,另外一台虚拟机启动kafka进程。

  • 虚拟机A:启动单实例kafka服务
  • 虚拟机B:运行PySpark程序

  在VM A,程序环境要求安装jdk1.8以上以及与kafka匹配版本的scala版本
版本兼容说明:

1
2
3
kafka:kafka_2.11-2.4.0
java:java version "1.8.0_11"
scala: Scala 2.12.0

  这里需要注意:如果使用kafka_2.12版本以上,需要使用jdk1.8.0_212以上;kafka_2.12与jdk1.8.0_11有不兼容地方,kafka启动报错提示java.lang.VerifyError: Uninitialized object exists on backward branch 209

1.1 基本配置

(1)配置单机zk这里无需依赖ZooKeeper集群,只需使用kafka自带的zk服务即可
vim /opt/kafka_2.11-2.4.0/config/zookeeper.properties

1
2
dataDir=/opt/zookeeper # zk的snapshot数据存储路径
clientPort=2181 # 按默认端口

(2)配置kafka的,路径/opt/kafka_2.11-2.4.0/config/ server.properties

1
2
log.dirs=/opt/kafka-logs # 存放kafka数据目录
zookeeper.connect=127.0.0.1:2181 # 按默认连接本机zk即可

1.2 启动zk和kafka
1
2
3
4
[root@nn kafka_2.11-2.4.0]# pwd
/opt/kafka_2.12-2.4.0

[root@nn kafka_2.11-2.4.0]# nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties 2>&1 &

kafka server后台启动:

1
[root@nn kafka_2.11-2.4.0]# nohup bin/kafka-server-start.sh config/server.properties 2>&1 &

1.3 测试单实例Kafka

  对于kafka单节点而言,这里只能使用1个分区且1个replication-factor,topic名称为sparkapp

1
2
[root@nn kafka_2.11-2.4.0]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sparkapp
Created topic sparkapp.

打开一个新的shell,用于启动producer

1
[root@nn kafka_2.11-2.4.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sparkapp

再打开一个新的shell,用于启动consumer

1
[root@nn kafka_2.11-2.4.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092  --topic sparkapp

  在producer shell输入字符串,consumer端可以看到相应输出,说明单机的kafka可以正常运行,下面将使用Spark Streaming实时读取kafka的输入流

2 整合streaming和kafka

2.1 配置依赖包

  具体说明参考官方文档spark streaming连接kafka需要依赖两个jar包(注意版本号):
spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar: 下载链接
spark-streaming-kafka-0-8_2.11-2.4.4.jar: 下载链接
  将这两个jar包放在spark 的jars目录下,需要注意的是:这两个jar包缺一不可,如果是在Spark集群上做测试,那么每个Spark节点都需要放置这两个jars包:

1
2
3
4
5
6
[root@nn jars]# pwd
/opt/spark-2.4.4-bin-hadoop2.7/jars

[root@nn jars]# ls spark-streaming-kafka-0-8
spark-streaming-kafka-0-8_2.11-2.4.4.jar
spark-streaming-kafka-0-8-assembly_2.11-2.4.3.jar

  (关于spark-streaming-kafka的jar包依赖说明:就像python连接kafka,需要使用pip 安装kafka这个库)

2.2 Spark Streaming实时消费Kafka消息

  使用spark自带的直连kafka,实现实时计算wordcount,可以看到写普通的PySpark逻辑相对简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":

sc = SparkContext(appName="streamingkafka")
sc.setLogLevel("WARN") # 减少shell打印日志
ssc = StreamingContext(sc, 5) # 5秒的计算窗口
brokers='127.0.0.1:9092'
topic = 'sparkapp'
# 使用streaming使用直连模式消费kafka
kafka_streaming_rdd = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines_rdd = kafka_streaming_rdd.map(lambda x: x[1])
counts = lines_rdd.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
# 将workcount结果打印到当前shell
counts.pprint()
ssc.start()
ssc.awaitTermination()

spark streaming流默认接收的是utf-8编码的字符串

KafkaUtils接口createDirectStream说明:

1
2
3
4
5
6
7
8
9
10
11
Parameters:	
ssc – StreamingContext object.
topics – list of topic_name to consume.
kafkaParams – Additional params for Kafka.
fromOffsets – Per-topic/partition Kafka offsets defining the (inclusive) starting point of the stream.
keyDecoder – A function used to decode key (default is utf8_decoder).
valueDecoder – A function used to decode value (default is utf8_decoder).
messageHandler – A function used to convert KafkaMessageAndMetadata. You can assess meta using messageHandler (default is None).

Returns:
A DStream object

spark streaming 从 kafka 接收数据,有两种方式
(1)使用Direct API,这是更底层的kafka API
(2)使用receivers方式,这是更为高层次的API

  在本博客后面讨论streaming的原理同时也给出Direct模式的相关详细的解析。当前测试使用为Direct模式,在虚拟机B的Spark目录下,启动application,启动命令需要带上指定的jars包。

1
bin/spark-submit --jars spark-streaming-kafka-0-8_2.11-2.4.4.jar direct_stream.py 

  在虚拟机A的producer shell端,输入字符串句子
1
2
3
[root@nn kafka_2.11-2.4.0]# bin/kafka-console-producer.sh --broker-list localhost:9 --topic sparkapp
>welcome to pyspark kafka
>从这里开始 将开发一个 由sparkstreaming 完成的 实时计算的 大数据项目

  在spark-submit窗口,可以看到spark streaming消费并处理kafka生成的实时流字符串结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
-------------------------------------------
Time: *** 09:34:28
-------------------------------------------
('welcome', 1)
('to', 1)
('pyspark', 1)
('kafka', 1)

-------------------------------------------
Time: *** 09:34:30
-------------------------------------------
-------------------------------------------
Time: *** 09:34:34
-------------------------------------------
('从这里开始', 1)
('', 1)
('将开发一个', 1)
('由sparkstreaming', 1)
('完成的', 1)
('实时计算的', 1)
('大数据项目', 1)

-------------------------------------------
Time: *** 09:34:36
-------------------------------------------

  以上完成基于PySpark整合Spark Streaming与Kafka的测试。

2.3 关于以上测试过程有关offset简单说明

  该测试并没有给出consumer自己管理消息的offset,在上面测试中,例如,producer连续生产5条消息,那么消息体可以看出以下简单构成:
| offset| msg |
|—|—|
| 0|123@qq.com|
| 1|124@qq.com |
| 2|125@qq.com |
| 3|126@qq.com |
| 5|127@qq.com |
  上面的测试中,streaming 以Direct模式连接kafka,每消费一条消息,streaming默认自动commit offset到kafka,以期实现当下一批streaming去kafka取消息时,是按顺延下一条来取,保证没有重复处理消息,也不会漏了消息,这是什么意思呢?
  例如当前streaming 消费offset=1的消息后,自动将消费位置offset=1告诉kafka:你记住我已经把第1个位置消息处理了,如果我下次找你kafka消费,请你找出offset=2的消息给我,但如果你将offset=0的消息给我,说明你让我重复消费消息,如果将offset=4消息给我,说明你让我漏了处理offset=3的消息。
  根据以上说明,例如producer已经生产了offset=9共10消息,即使将当前spark streaming进程再消费offset=1的消息后,被退出,之后重启,spark streaming从kafka消费的消息将是offset=2的消息,而不是offset=10的消息。虽然默认配置有一定合理性,但也有这种情况,导致无法实现“仅消费一次而且保证业务正常”,参考以下场景:
  spark streaming当前进程消费了offset=1的消息后,在业务处理过程中程序出错导致没有将办理业务详情发送到用户124@qq.com,因为spark streaming默认自动提交offset的位置给到kafka,因此spark streaming在一批处理中将消费offset=2的消息。若你想倒回去重新处理offset=1的消息,以保证邮件正确送到给用户,那么只能自己用外部数据库存放成功完成业务的offset,也即是自行管理offset,而不是被动的自动提交到kafka保存消费的offset。
  kafka的offset消费位置的管理详解将在之后的文章给出,只有将offset的消费位置交由客户端自行管理,才能灵活实现各种需求:重新消费、只消费一次等

3 Spark Streaming与Kafka整合的两种方式

  在上面的整合测试里,用的streaming直连kafka进行消费消息。目前Spark Streaming 与 Kafka 的结合主要有两种方式:Receiver Dstream和Direct Dstream,目前企业实际项目主要采用 Direct Dstream 的模式,为何我这边可以断言企业主要使用Direct Dstream模式呢?因为在企业中,他们主力用Java和Scala,考虑企业需求方面,肯定使用spark-streaming-kafka-0-10版本的整合包,而这一版本不再支持Receiver模式。除非某些企业用了Pyspark作为spark应用开发,否则基本没人用Receiver模式。Spark官网也给出整合Kafka的指引链接
在这里插入图片描述  因为基于PySpark开发实时流计算程序,这里只能选择spark-streaming-kafka-0-8开发包,从官方提示可知,spark-streaming-kafka-0-10是stable版本而且支持ssl安全传输,支持offset commit(支持手动提交,这个非常重要,自行控制消息位置从哪条开始处理,保证准确消费)和dynamic topic subscription,这就是为何要用Scala语言开发面向高级需求的Spark程序或者streaming程序,亲儿子!
  对于两种连接连接方式,有必要给出讨论和对比,以便加深streaming消费kafka topic更深理论知识。

3.1 基于Receiver消费消息方式

原理图(已启用WAL机制)
在这里插入图片描述 (原理图需要注意的地方:如果Receiver模式下,未开启WAL用于备份接收的消息,那么图中Save data to WAL是不存在的。)
  早期版本的Spark Streaming与Kafka的整合方式为Receiver从Kafka消费消息,在提交Spark Streaming任务后,Spark会划出指定的Receiver来持续不断、异步读取kafka数据,这个Receiver其实是Executor(jvm进程)的一个常驻线程,跟task类似,为何它是常驻的?因为它需要不断监听Kafka的Producer生产的消息,从这点也可以看出,Receiver收到的消息是存放在Executor的内存中,换句话说,占用了Executor的内存。至于Receiver线程内部使用哪种数据结构存放接收的消息?对于先进先消费,后进后消费场景,显然使用queue最适合(通过队列实现多线程的生产-消费编程逻辑)。当Driver这边提交job后,Executors从Receiver拿到消息去交给task处理。在执行完之后,Receiver向Kafka的Zookeeper提交offset,告诉Kafka记主它当前已消费的位置。
  早期的设计中,Spark Streaming为了零丢失地消费kafka消息,增加对接收到的消息进行预写日志处理(Write Ahead Log, WAL)这个WAL是放在hdfs的checkpoint 目录下,开启该功能后,Receiver除了将接收到消息存放到Executor内存中,还将其同步写入到hdfs上的WAL日志文件。因此,当一直运行的Spark Streaming任务突然挂了,后期启动时,Streaming也可以自动从hfds的checkpoint目录下的WAL日志找回丢失的消息。

Receiver连接方式的缺点

  从上面receiver工作原理可以总结其缺点出将出现在内存方面、wal日志影响吞吐量等方面存在设计上的缺点:
(1)占用cpu+内存:每个receiver需要单独占用一个vcore以及相应内存,如果Receiver并发数量多,占用Executor更多cpu和内存资源,这些资源本应用来跑tasks做计算用的,这就出现浪费资源的情况。

(2)WAL拖累整体处理效率:为了不丢数据需要开启WAL,也即Receiver将接收到的数据写一份备份到文件系统上(hdfs的checkpoint目录),既然有落到磁盘自然会有IO,这降低了kafka+streaming这个组合实时处理消息的效率,换句话说:增加job的执行时间。此外,开启WAL,还有造成重复消费的可能。

(3)接收数量大于处理速率: 若Receiver并发数量设置不合理,接受消息速率大于streaming处理消息的速率,就会出现数据积压在队列中,最终可能会导致程序异常退出。这里也是面试常见的问题:例如提高Receiver的并发数量,就可以提高streaming处理能力吗?首先,Receiver异步接收kafka消息,不参与计算,真正执行计算的是streaming,如果streaming并发性没有调高,整个计算能力也没有提高。一定要记着:kafka跟streaming是需要两边同时调优,才能打得计算能力的整体提升,不能只调优一边,这一个组合!!

(补充知识点:Receiver的并发数据量是怎么确定?
  在KafkaUtils.createStream()中,可以指定topic的partition数量,该数量就是Receiver消费此topic的并发数(其实就是Executor 启动消费此topic的线程数量)但需要指出的是:Kafka中topic的partition与Spark中RDD的partition是两个不同的概念,两者没有关联关系。)

3.2 基于Direct消费消息方式

原理图:
在这里插入图片描述  当Receiver的工作原理及其缺点理解后,Direct模式将更容易理解。Driect模式下,Streaming定时主动查询Kafka,以获得指定topic的所有partition的最新offset,结合上一批次已保存的offset位置,Streaming就可以确定出每个批次拉取消息offset的范围,例如第1批次的消息(offset范围0-100)正在处理过程中,streaming指定特定的线程定时去Kafka查询第2批次最新的offset,发现最新值为300,那么如果streaming没有限制每批次的最大消费速率,在第2批次取消息时,会一次性取回offset=101到300的消息记录,这个就是所谓的offset ranges。当让如果streaming没有限制每批次的最大消费速率就是每批次100,那么即使最新的offset位置为300,第2批次消费的offset 访问只能是101~200共计100条消费记录。
  当处理数据的job启动时,就会使用kafka的简单Consumer API来获取kafka中指定offset范围的数据。此外,Streaming已消费的offset不再交由Zookeeper来管理,而是手动采用外部存储数据库如mysql、redis等存放和管理已消费的offset。
以下为Scala代码演示从rdd拿到offset ranges属性的逻辑(rdd当然本身包含消息数据)
​```java
directKafkaStream.map {

}.foreachRDD { batchRdd =>
// 获取当前rdd数据对应的offset
val offsetRanges = batchRdd.asInstanceOf[HasOffsetRanges].offsetRanges
// 运行计算任务
doCompute(batchRdd)
// 使用外部数据库自行保存和管理offsetRanges
saveToRedis(offsetRanges)
}

1
2
3
4
5
6
7
8
  而Receiver方式下没有关于offset的处理逻辑,这是因为streaming在该模式下内部通过kafka consumer high level API 提交到zk保存。
​```java
receiverkafkaStream.map {
...
}.foreachRDD { streamRdd =>
// 运行计算任务
doCompute(rdd)
}

Direct连接方式的优点

(1)提高计算资源利率:不像Receiver那样还占用Executor的一部分内存和计算资源,Direct方式下的Executor的代码实现踢掉Receiver这块设计,因此可以实现计算和内存资源全部用在计算任务,因为streaming定时主动去kafka拉取batch 消息,拉过来直接计算,而不是像Receiver不断接收消息不断地存放在内存中。

(2)无需开启WAL:Receiver方式需要开启WAL机制以保证不丢失消息,这种方式加大了集群的计算延迟和效率,而Direct的方式,无需开启WAL机制,因为Kafka集群有partition做了高可用,只要streaming消费方自己存放和管理好已经消费过的offset,那么即使程序异常退出等,也可利用已存储的offset去Kafka消费丢失的消息。

(3)可保证exactly once的消费语义:基于Receiver的方式,使用kafka的高阶API来在Zookeeper中保存消费过的offset。这是消费kafka数据的传统方式。这种方式配合WAL机制,可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和Zookeeper之间可能是不同步的。基于Direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据时消费一次且仅消费一次。

(4)计算程序更稳定:Receiver模式是通过异步持续不断的读取数据,当集群出现网络、计算负载跟不上等因素,导致streaming计算任务侧出现延迟和堆积,而Receiver却还在持续接收kafka消息,此种情况容易导致Executor内存溢出或者其他异常抛出,从而引起计算程序退出,换句话说,Receiver模式的streaming实时计算可靠性和稳定性欠缺。对于Direct模式,Driver在触发batch计算任务时,才会去kafka拉消息回来并计算,而且给streaming加入最大消费速率控制后,整个实时计算集群鲁棒性更强。

(5)Dstream 的rdd分区数与kafka分区一致
  Direct模式下,Spark Streaming创建的rdd分区数跟Kafka的partition数量一致,也就是说Kafka partitions和streaming rdd partitions之间有一对一的映射关系,这样的好处是明显和直观的:只要增加kafka topic partition数量,就可以直接增大spark streaming的计算的并发数。
  当然,Direct模式不足的地方就是需要自行实现可靠的offset管理逻辑,但对于开发方向来说,这点很容易实现,我个人若对offset管理,将优先选用redis,而且是集群!
  以上有关Spark Streaming 整合Kafka的方式和原理分析必须要理解,否则在后面的实时计算平台的代码开发上,有些逻辑你不一定能处理好。