广播模式,小笔记

广播模式,小笔记

Scroll Down

最近业务碰到个需求,要每个消费实例都要消费到消息。可能聪明的亚子一看就想到,用广播模式不就得了。

没错,小年一开始也是想到一块了。

于是就研究一下 Kafka 的广播模式是怎么用的。结果一搜才发现,原来 Kafka 的广播模式,就是为每个实例配置不同的消费组ID(GroupId),即每个实例都是单独的消费组。

但kafka消费者的配置信息都是走配置中心的,如果要每个实例都不一样,那代码得要怎么写?

当然,神通广大的网友们已经想到办法了:

@KafkaListener注解的GroupId属性支持SpEL表达式,可以通过这个实现每个应用实例都是单独的消费组,进而实现广播消费。

下面表达式指的是取值java.util.UUID这个类的randomUUID方法,当然也可以通过后缀当前机器ip的方式实现动态groupId

@KafkaListener(topics = {"topic"}, properties = {"auto.offset.reset=latest"},
            groupId = "consumerGroup-" + "#{T(java.util.UUID).randomUUID()}")

等等,有点不对劲,小年突然想起,RocketMQ 就能支持同一个消费组下,实现广播模式的呀。

看了一下使用的姿势,果然:

// 声明并初始化一个 consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("comsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
// 设置广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

那么问题来了,为什么 RocketMQ 能够支持同一个消费组的广播模式呢,它是如何实现的呢?

带着问题先思考一下,要实现广播模式,也就是说每个消费实例节点,都必须消费 Topic 的所有消息。

在 RocketMQ 中 Topic 会被分成多个 queue,而这么做的目的就是为了负载均衡。

假设是集群模式下,同一个消费组下的多个消费实例,会分摊 topic 上的消息。

如果是广播模式,那么就会变成这样:

(这里的queue,实际上是 consumeQueue,而 topic 划分的 queue 实际上都会写到 commitLog 中,只不过会保留 queue的信息)

那么这里其实就是客户端的负载均衡,如果没猜错的话,如果 MessageModel 如果是 BROADCASTING,那么消费实例就会分配所有的queue。

于是,小年本着务实求真,果断手撕源码,确实如此。

RebalanceImpl.java

private void rebalanceByTopic(final String topic, final boolean isOrder) {
    switch (messageModel) {
        case BROADCASTING: {
            Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
            if (mqSet != null) {
                boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                if (changed) {
                    this.messageQueueChanged(topic, mqSet, mqSet);
                    log.info("messageQueueChanged {} {} {} {}",
                        consumerGroup,
                        topic,
                        mqSet,
                        mqSet);
                }
            } else {
                log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
            }
            break;
        }
        case CLUSTERING: {
            .....
            break;
        }
        default:
            break;
    }
}

代码逻辑还比较简单,这里就不详细讲了。