-
Notifications
You must be signed in to change notification settings - Fork 39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
消息队列实现概要——深度解读分区Topic的实现 #27
Comments
👍。找code,意外发现这里。 :) |
@jiazhai Pulsar的话写的比较简单, 改天我再完善一下,Exclusive/Failover模式应该是只能单个consumer消费所有partition的消息吧,kafka的话类似shared模式,但是多了个partition分配的过程,特定的分区同一时刻只会分配给一个consumer,但pulsar的share模式类似是RoundRobin,所以才不支持顺序性。Exclusive/Failover模式不知道理解的对不对,晚点我再确认下 |
hi,你好,偶尔查到你的wiki,写的很棒。不过上面这段话不敢苟同,我之前有个问题正好分析测试了下,记录到了这里。 也就是说某个分区被多个客户端同时消费基本上是不可能的,因为一旦rebalance,subVersion必然会更新,你有空可以做个测试验证一下。 |
@gaoyf 赞! 这篇博客写的比较久了,我晚点验证下 |
前言
说道message queue相信大家都不陌生,对于业务方来说很简单就是几个简单的API, 通常也不用关心其内部实现,但如果想要用好消息队列、出了问题能够cover的住,那就必须要能够了解其实现原理,知其然知其所以然。
首先Message queue有一个topic的概念,可以将其理解为日志,发消息的过程说白了其实就是打日志,而消息队列就是存储日志记录的持久层,类似java中的log4j、logback等日志系统,打了日志之后我们就会有一些分析需求,比如说用户过来一个请求,我们将参数、耗时、响应内容等打到日志中,然后会在本机部署一个agent用于抓取日志发送到jstorm集群中用于分析等。那这里我们的应用系统就对应消息队列中的生产者producer,jstorm集群就对应消费者consumer。
所以说白了其实消息队列和log4j这些组件都是日志系统,那么回到message queue的模型中去,如果我们的topic都是单分区的,那么也就是说所有的producer都是往这一个"文件"中打日志,那么这样的话性能必然会有很大的问题,类似日志系统,我们希望每台机器都能够有自己的日志文件,那么自然而然的我们就需要将单分区的topic扩展到多分区的topic。
生产者
生产者这边的实现相对来说比较简单,对于应用层来说我们提供一个统一的topic,比如说
hello-world
, 那么创建topic的时候,我们对内表示的时候会加一层映射hello-world-> [hello-world-1,hello-world-2...]
,也就是说producer实际上是往对应的子topic发送消息的,但具体往哪个发送就需要一个路由策略,一般不要求顺序的话就直接轮训发送,如果需要顺序的话就用hash即可,这个信息直接存储到zookeeper即可,说到zookeeper这里简单提一下,zk并不能给客户单提供全局一致的视图,就是说对于两个不同的客户端,zk并不能保证他们能够在任一时间都能够读到完全相同的数据,这可能是由于网络延迟等原因造成,但如果客户端需要的话可以主动调用sync()
先强制同步一把数据。启动流程:
消费者
消费者这边的话会比较麻烦一些,根据消息队列要提供的消费语义有不同的实现方案,如果要实现顺序性的话相对会复杂一些。
流程:
实现的区别主要是由于第三点造成的,下面我们看一下主流的几款mq是如何实现的
RocketMQ
RocketMQ的consumer使用一般是均衡消费的方式,比如有一个topic: hello-world,我们创建的时候分配了16个分区,而我们总共有4个consumer:A/B/C/D,那么就会每个consumer分配四个分区,比如consumer A就会分配: hello-world-1、hello-world-2、hello-world-3、hello-world-4这四个分区。
也就是说总共需要这么几个要素: topic的总分区数、consumer的总数。topic的总分区数比较简单,启动的时候直接从zk读取即可,关键是第二个consumer的总数,我们知道应用是会宕机的、扩容、缩容、网络问题等,都会影响consumer的总数,RocketMQ是采用的方式是从name server中获取,每个节点启动的时候会定时向name server发送心跳,如果一定时间内没收到心跳包就可以判定这个节点挂掉了。那么这个问题解决了,consumer启动的时候可以从name server 发送请求,获取consumer的列表。
RocketMQ的分配分区是在客户端执行的,也就是说consumer获取到全量的consumer列表后,根据一定的策略分配分区,然后开始消费。然后我们知道consumer会扩容、缩容等原因不停的在变化,那么这个时候就需要重新负载均衡,那么客户端如果知道consumer变化了呢,有两种方式:
那么这里就会有几个问题,由于网络延迟、每个consumer启动的时间不一样,那么consumer获取到变化的时机就会不一样,也就是说数据会不一致,导致在某段时间内 某个分区被多个客户端同时消费。
Kafka
kafka最初是基于zookeeper实现的,简单来说,每个客户端都会连接zookeeper,然后建立对应的watch事件,如果某一个consumer挂掉或者新增了consumer的话,zk下对应的路径就会有变化,然后产生watch事件并通知给客户端,客户端然后执行rebalance事件,看上去貌似方案还阔以,但是会有几个问题:
由于上述原因kafka后面进行了几次改版,核心思想就是将收集consumer全量列表的过程放到了server端,然后将rebalance的过程放到了客户端,之前有一版是两个过程都在server端,但是这样就会有个问题:由于rebalance的过程放到了服务器端,那么如果想要指定自定义的分配策略就不灵活了,比如说想要根据机架、机房等分配,因此后面将这个过程放到了客户端。
流程:
consumer启动后,会首先向broker发送请求查询当前consumer group的GroupCoordinator,然后就会进入
Joining the Group
阶段,向GroupCoordinator发送JoinGroupRequest,GroupCoordinator会从中选一个成为leader,然后向所有的节点发送响应,表明加入成功,但是只有leader的响应中才会包含所有的consumer元数据。然后会进入
Synchronizing Group State
阶段, 每个consumer会向GroupCoordinator发送SyncGroup请求,其中leader的SyncGroup请求包含了分配的结果,等leader收到了分配的结果后就会发送响应将结果同步给所有的consumer。当然这里会有很多的corner case,比如说如果GroupCoordinator挂掉了咋办?consumer leader挂掉了又咋办?这里就不详细叙述了。Pulsar
pulsar的话相对比较清晰,pulsar支持三种消费语义,详细可以参考之前的博客,其中一个shared模式,或者叫RoundRobin模式,所有的consumer可以连接到同一个topic,然后消息会以RoundRobin的形式,轮训发送给每一个consumer,一个消息只会下发给一个consumer。如果一个consumer挂掉了,所有发给它但是还没有ack的消息,都会重新调度发给其他consumer。
看起来实现还是比较清晰的,但是不支持顺序性,因此需要看一下自己的业务场景是否可以满足。
总结
这篇博客总结了一下如何实现分区消息队列,并分析了一下主流的几款mq的实现方式,没不存在一种最完美的方案,按照自己的业务场景,比如说是否要求顺序性、性能、是否需要自定义rebalance策略等,决定自己的消息队列的实现方式。
The text was updated successfully, but these errors were encountered: