Rocket MQ
例子
生产者
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class RocketMQProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// 创建消息生产者,并设置NameServer的地址
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动Producer实例
producer.start();
// 创建消息实例,指定主题Topic、Tag和消息体
Message msg = new Message("TopicTest", "TagA", "OrderID001", "Hello world".getBytes());
// 发送消息到一个Broker
SendResult sendResult = producer.send(msg);
// 打印发送结果
System.out.printf("%s%n", sendResult);
// 关闭Producer实例
producer.shutdown();
}
}
消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class RocketMQConsumer {
public static void main(String[] args) throws MQClientException {
// 创建消息消费者,并设置NameServer的地址
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");
consumer.setNamesrvAddr("127.0.0.1:9876");
// 订阅主题Topic和Tag
consumer.subscribe("TopicTest", "*");
// 注册回调实现类来处理从Broker拉取回来的消息
consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
实例
生产者
public void delayOrderProducer(String msgId, String tag, String body,long delayTime) {
// RocketMQ发布消息
Producer producer = producer(MQConstant.DELAY_GROUP);
// 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
producer.start();
Message msg = new Message(
MQConstant.DELAY_TOPIC,
tag,
body.getBytes()
);
msg.setKey(msgId);
try {
// 设置消息需要被投递的时间。
msg.setStartDeliverTime(delayTime);
SendResult sendResult = producer.send(msg);
// 同步发送消息,只要不抛异常就是成功。
if (sendResult != null) {
System.out.println(
new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId()
);
}
}catch (Exception e){
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
producer.shutdown();
}
消费者
@PostConstruct
public void init() {
Consumer consumer = rocketMQ.consumer(MQConstant.MONITOR_ORDER_GROUP);
consumer.subscribe(MQConstant.ORDER_TOPIC, MQConstant.CUSTOMER_ORDER_TAG, (message, consumeContext) -> {
logger.info("--------Receive: " + message);
try {
OrderEvent event = new OrderEvent("");
String tag = message.getTag();// 获取消息tag
event.setTag(tag);
String msgBody = new String(message.getBody(), StandardCharsets.UTF_8);// 获取消息体
logger.info("tag:"+tag+"----msgBody:"+msgBody);
event.setMsg(msgBody);
context.publishEvent(event);
} catch (Exception e) {
logger.debug("----Group:" + MQConstant.MONITOR_ORDER_GROUP + "----Topic:" + MQConstant.ORDER_TOPIC + "----Tag:"
+ MQConstant.CUSTOMER_ORDER_TAG + "----Error:" + e);
}
return Action.CommitMessage;// 消费消息
});
consumer.start();
logger.info("Mall Order Consumer started.");
}
一些需要注意的配置参数
顺序消息消费失败进行重试前的等待时间、消息消费失败时的最大重试次数
异步消息
消息的发送者无需等待消息接收者(Broker)的处理及返回,甚至无需关心消息是否发送成功。当消息发送者发送消息后,消息将由消息代理接管,消息代理保证消息传递到指定目的地。异步消息主要有两种形式的目的地:队列和主题。队列用于点对点式的消息通信,确保每条消息只有唯一的发送者和接收者;而主题用于发布/订阅式的消息通信,多个消息接收者可以监听同一个主题
同步消息
同步消息传递涉及到等待服务器响应消息的客户端。发送方在发送消息后,必须等待接收方(Broker)处理完该消息后才能继续执行后续操作,这意味着发送方和接收方需要同步协调,保持一定的顺序和时序。同步消息的优点是简单易用,但缺点是在消息响应期间发送方处于阻塞状态,无法进行其他操作,这可能会导致性能问题。
延时消息
当消息到达Broker后,Broker会根据设置的延时级别将消息放入特定的延迟队列中,等待指定的时间后再投递给消费者
使用场景:在订单支付时,设置30分钟的后再投递给消费者,消费者在30分钟后收到消息后去查看订单状态,如果订单未支付,则取消订单。
定时消息
生产者发送普通消息后,可以通过控制台或代码向Broker设置定时规则,Broker会在规则触发时将消息投递给消费者
问题
分布式环境中的消息消费
多个微服务实例的时候,意味着会有多个相同组名的消费者。
消息队列会根据服务治理中的服务信息进行负载均衡,不会将消息发送给所有的微服务实例
微服务中同组消费者的数量
在代码层次建议创建一次消费者的对象,否则会出现消息重复消费的问题,且要做好消费的幂等性
消息的持久化
RocketMQ通过文件系统来实现消息的持久化。当消息被发送到Broker时,它们会被刷盘(写入磁盘)以确保即使在Broker重启或故障的情况下,消息也不会丢失。RocketMQ支持异步刷盘和同步刷盘两种模式,可以根据具体需求进行选择。异步刷盘性能较高,但可能会有一定的数据丢失风险;同步刷盘则能够保证每条消息都被成功写入磁盘。
消息的可靠传输
RocketMQ提供了多种发送消息的方式,包括同步发送、异步发送和单向发送。其中,同步发送是最可靠的方式,因为它会阻塞等待Broker的确认结果。如果发送失败,客户端可以收到明确的错误响应,从而进行重试或其他错误处理。异步发送和单向发送在性能上可能更优,但可靠性相对较低。此外,RocketMQ还提供了消息确认机制,确保消息在消费端被成功处理。
消息的顺序消费
RocketMQ通过消息队列和消费者的负载均衡机制来保证消息的顺序消费。在发送消息时,可以将具有顺序依赖的消息发送到同一个队列中。然后,在消费端,可以确保同一个队列中的消息按照发送的顺序被消费。RocketMQ的负载均衡模块会根据消费者的数量和队列的数量进行智能分配,以实现高效且有序的消息消费。
事务消息
RocketMQ支持分布式事务消息,确保本地事务与发送消息的原子性。它采用了两阶段提交的思想,即首先执行本地事务,然后发送一个半事务消息到Broker。如果本地事务执行成功,则提交该半事务消息;如果失败,则回滚该消息。同时,RocketMQ还提供了事务结果回查机制,用于解决消息发送失败、客户端宕机等极端场景。在消费端,RocketMQ通过ack机制保证已持久化的消息至少被成功消费一次,从而确保消费端的事务一致性。
ack反馈
当消费者从消息队列中接收到消息并处理完成后,它会向消息队列发送一个ACK(Acknowledgment,确认)反馈。这个反馈告诉消息队列该消息已经被成功处理,消息队列在收到ACK反馈后,会将该消息从队列中删除;
否则就会引起重复消费,设置在集群环境中会引起其他实例的消费者进行消费
消费速度
- 限制消息消费速率 :
- 在消费者端,可以设置
consumeMessageBatchMaxSize参数,该参数用于限制每次消费的消息数量。通过合理设置这个参数,可以有效地控制消费者的消费速率,防止消费者消费过快。
- 在消费者端,可以设置
- 并发消费与线程数调整 :
- RocketMQ支持多线程并发消费消息,可以通过调整消费者端的线程数来控制消费速度。在启动消费者时,可以通过设置
ConsumerConfig类的setConsumeThreadMin和setConsumeThreadMax方法来指定最小和最大线程数。增加线程数可以提高并发度,但过多的线程数也会带来额外的开销,需要根据实际情况进行权衡。
- RocketMQ支持多线程并发消费消息,可以通过调整消费者端的线程数来控制消费速度。在启动消费者时,可以通过设置
- 消息预取机制 :
- RocketMQ引入了消息预取机制,通过提前拉取一定数量的消息到消费者端,可以减少网络通信的开销,提高消费速率。但预取过多消息也可能导致消费者处理不过来,因此可以通过调整
pullBatchSize参数来控制一次拉取的消息数量,以达到控制消费速率的目的。
- RocketMQ引入了消息预取机制,通过提前拉取一定数量的消息到消费者端,可以减少网络通信的开销,提高消费速率。但预取过多消息也可能导致消费者处理不过来,因此可以通过调整
- 使用Sentinel进行流量控制 :
- Sentinel是阿里巴巴开源的一个流量控制组件,它可以与RocketMQ集成,用于控制消息的生产和消费速率。如果消息生产速度很大,Sentinel可以限制大量的消息,以匀速的形式推送一小部分消息给消费者消费,消费完再推送下一部分,从而确保消费端不会因系统负载过高而出现问题。
- 合理设置BatchSize :
- 在生产者端,可以适当增大
BatchSize,将更多消息批量发送,减少网络round trip,提高发送性能。但请注意,过大的BatchSize可能会增加消息延迟,需要根据实际情况进行调整。
- 在生产者端,可以适当增大
- 增加消费者实例 :
- 在高并发环境下,如果单个消费者实例的处理能力不足,可以通过增加消费者实例来提高整体的处理效率。这样可以将消息分散到多个消费者实例上进行处理,从而降低单个实例的消费压力。
废话短说