为什么RocketMQ 4.9.6版本删除了filtersrv?RocketMQ把消息过滤功能放到了Broker里面

   抖音SEO    
```html

在 RocketMQ 4.9.6 版本中,filtersrv 被移除了,这一变化是出于以下几个目的和考虑:

rocketmq4.9.6版本把filtersrv删除是出于什么目的呢?(图片来源网络,侵删)

1、架构优化:RocketMQ 作为一个高性能、分布式的消息中间件,其架构设计和实现一直在不断演进和优化filtersrv 的移除是其中一部分,旨在简化架构,提高系统的可维护性和扩展性。

2、功能整合filtersrv 主要用于消息过滤,但在新版本中,这部分功能被整合到了其他组件中,Broker 或 NameServer,这样的整合有助于减少组件数量,降低系统复杂性。

3、性能提升:通过移除 filtersrv,可以减少消息传递的环节,从而降低延迟,提高整体性能。

4、资源节省filtersrv 作为额外的服务进程,会占用一定的系统资源,移除后,可以节省这部分资源,降低部署和维护成本。

5、易用性增强:对于用户来说,filtersrv 的存在可能增加了部署和维护的复杂性,移除后,用户可以更加便捷地进行部署和使用。

接下来,我们将详细探讨如何在移除 filtersrv 后,使用 RocketMQ 进行消息过滤。

消息过滤机制

在 RocketMQ 中,消息过滤主要通过两种方式实现:

1、消费端过滤:消费者在订阅主题时,可以通过设置过滤条件(如 SQL 表达式),来选择性地接收消息,这种方式适用于消费者对特定类型或属性的消息感兴趣,而不想处理全部消息的场景。

2、生产者端过滤:生产者在发送消息时,可以设置消息的属性(如键值对),这些属性可以被用于后续的消息筛选和处理。

示例代码

以下是一个简单的示例,展示了如何在移除 filtersrv 后,使用 RocketMQ 进行消息过滤。

生产者端

import org.apache.rocketmq.client.producer.DefaultMQProducer;import org.apache.rocketmq.common.message.Message;public class Producer {    public static void main(String[] args) throws Exception {        // 创建生产者实例        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");        // 设置 NameServer 地址        producer.setNamesrvAddr("127.0.0.1:9876");        // 启动生产者        producer.start();        // 创建消息实例        Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());        // 设置消息属性        msg.setKeys("KeyA");        // 发送消息        producer.send(msg);        // 关闭生产者        producer.shutdown();    }}

消费者端

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer {    public static void main(String[] args) throws Exception {        // 创建消费者实例        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");        // 设置 NameServer 地址        consumer.setNamesrvAddr("127.0.0.1:9876");        // 订阅主题        consumer.subscribe("TopicTest", "*");        // 注册消息监听器        consumer.registerMessageListener(new MessageListenerConcurrently() {            @Override            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {                for (MessageExt msg : msgs) {                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msg.getBody()));                }                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;            }        });        // 启动消费者        consumer.start();        System.out.printf("Consumer Started.%n");    }}

在上述示例中,生产者发送了一条带有属性 KeyA 的消息,消费者可以基于这些属性进行过滤,只消费满足特定条件的消息。

现在,您已经了解了如何在 RocketMQ 中进行消息过滤,尝试在您的项目中应用吧!

如果您对 RocketMQ 或者消息中间件有任何疑问,欢迎在下方留言,我们会尽快回复您的问题。同时,也欢迎您关注我们的社交媒体平台,获取更多有关消息中间件的最新资讯和技术分享。如果觉得这篇文章对您有帮

评论留言

我要留言

欢迎参与讨论,请在这里发表您的看法、交流您的观点。