Solo  当前访客:1 开始使用


Rocket MQ

例子

生产者
import org.apache.rocketmq.client.exception.MQClientException;  
import org.apache.rocketmq.client.producer.DefaultMQProducer;  
import org.apache.rocketmq.common.message.Message;  
  
public class RocketMQProducer {  
    public static void main(String[] args) throws MQClientException, InterruptedException {  
        // 创建消息生产者,并设置NameServer的地址  
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");  
        producer.setNamesrvAddr("127.0.0.1:9876");  
  
        // 启动Producer实例  
        producer.start();  
  
        // 创建消息实例,指定主题Topic、Tag和消息体  
        Message msg = new Message("TopicTest", "TagA", "OrderID001", "Hello world".getBytes());  
  
        // 发送消息到一个Broker  
        SendResult sendResult = producer.send(msg);  
  
        // 打印发送结果  
        System.out.printf("%s%n", sendResult);  
  
        // 关闭Producer实例  
        producer.shutdown();  
    }  
}
消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;  
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
import org.apache.rocketmq.client.exception.MQClientException;  
import org.apache.rocketmq.common.message.MessageExt;  
  
import java.util.List;  
  
public class RocketMQConsumer {  
    public static void main(String[] args) throws MQClientException {  
        // 创建消息消费者,并设置NameServer的地址  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");  
        consumer.setNamesrvAddr("127.0.0.1:9876");  
  
        // 订阅主题Topic和Tag  
        consumer.subscribe("TopicTest", "*");  
  
        // 注册回调实现类来处理从Broker拉取回来的消息  
        consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {  
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);  
            // 标记该消息已经被成功消费  
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
        });  
  
        // 启动消费者实例  
        consumer.start();  
  
        System.out.printf("Consumer Started.%n");  
    }  
}

实例

生产者
public void delayOrderProducer(String msgId, String tag, String body,long delayTime) {
		// RocketMQ发布消息
		Producer producer = producer(MQConstant.DELAY_GROUP);
		// 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
		producer.start();
		Message msg = new Message(
				MQConstant.DELAY_TOPIC,
				tag,
				body.getBytes()
		);
		msg.setKey(msgId);
		try {
			// 设置消息需要被投递的时间。
			msg.setStartDeliverTime(delayTime);

			SendResult sendResult = producer.send(msg);
			// 同步发送消息,只要不抛异常就是成功。
			if (sendResult != null) {
				System.out.println(
						new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId()
				);
			}
		}catch (Exception e){
			// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
			System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
			e.printStackTrace();
		}
		producer.shutdown();
	}
消费者
	@PostConstruct
	public void init() {
		Consumer consumer = rocketMQ.consumer(MQConstant.MONITOR_ORDER_GROUP);
		consumer.subscribe(MQConstant.ORDER_TOPIC, MQConstant.CUSTOMER_ORDER_TAG, (message, consumeContext) -> {
			logger.info("--------Receive: " + message);
			try {
				OrderEvent event = new OrderEvent("");
				String tag = message.getTag();// 获取消息tag
				event.setTag(tag);
				String msgBody = new String(message.getBody(), StandardCharsets.UTF_8);// 获取消息体
				logger.info("tag:"+tag+"----msgBody:"+msgBody);
				event.setMsg(msgBody);
				context.publishEvent(event);
			} catch (Exception e) {
				logger.debug("----Group:" + MQConstant.MONITOR_ORDER_GROUP + "----Topic:" + MQConstant.ORDER_TOPIC + "----Tag:"
						+ MQConstant.CUSTOMER_ORDER_TAG + "----Error:" + e);
			}
			return Action.CommitMessage;// 消费消息
		});
		consumer.start();
		logger.info("Mall Order Consumer started.");
	}

一些需要注意的配置参数

顺序消息消费失败进行重试前的等待时间、消息消费失败时的最大重试次数

异步消息

消息的发送者无需等待消息接收者(Broker)的处理及返回,甚至无需关心消息是否发送成功。当消息发送者发送消息后,消息将由消息代理接管,消息代理保证消息传递到指定目的地。异步消息主要有两种形式的目的地:队列和主题。队列用于点对点式的消息通信,确保每条消息只有唯一的发送者和接收者;而主题用于发布/订阅式的消息通信,多个消息接收者可以监听同一个主题

同步消息

同步消息传递涉及到等待服务器响应消息的客户端。发送方在发送消息后,必须等待接收方(Broker)处理完该消息后才能继续执行后续操作,这意味着发送方和接收方需要同步协调,保持一定的顺序和时序。同步消息的优点是简单易用,但缺点是在消息响应期间发送方处于阻塞状态,无法进行其他操作,这可能会导致性能问题。

延时消息

当消息到达Broker后,Broker会根据设置的延时级别将消息放入特定的延迟队列中,等待指定的时间后再投递给消费者

使用场景:在订单支付时,设置30分钟的后再投递给消费者,消费者在30分钟后收到消息后去查看订单状态,如果订单未支付,则取消订单。

定时消息

生产者发送普通消息后,可以通过控制台或代码向Broker设置定时规则,Broker会在规则触发时将消息投递给消费者

问题

分布式环境中的消息消费

多个微服务实例的时候,意味着会有多个相同组名的消费者。

消息队列会根据服务治理中的服务信息进行负载均衡,不会将消息发送给所有的微服务实例

微服务中同组消费者的数量

在代码层次建议创建一次消费者的对象,否则会出现消息重复消费的问题,且要做好消费的幂等性

消息的持久化

RocketMQ通过文件系统来实现消息的持久化。当消息被发送到Broker时,它们会被刷盘(写入磁盘)以确保即使在Broker重启或故障的情况下,消息也不会丢失。RocketMQ支持异步刷盘和同步刷盘两种模式,可以根据具体需求进行选择。异步刷盘性能较高,但可能会有一定的数据丢失风险;同步刷盘则能够保证每条消息都被成功写入磁盘。

消息的可靠传输

RocketMQ提供了多种发送消息的方式,包括同步发送、异步发送和单向发送。其中,同步发送是最可靠的方式,因为它会阻塞等待Broker的确认结果。如果发送失败,客户端可以收到明确的错误响应,从而进行重试或其他错误处理。异步发送和单向发送在性能上可能更优,但可靠性相对较低。此外,RocketMQ还提供了消息确认机制,确保消息在消费端被成功处理。

消息的顺序消费

RocketMQ通过消息队列和消费者的负载均衡机制来保证消息的顺序消费。在发送消息时,可以将具有顺序依赖的消息发送到同一个队列中。然后,在消费端,可以确保同一个队列中的消息按照发送的顺序被消费。RocketMQ的负载均衡模块会根据消费者的数量和队列的数量进行智能分配,以实现高效且有序的消息消费。

事务消息

RocketMQ支持分布式事务消息,确保本地事务与发送消息的原子性。它采用了两阶段提交的思想,即首先执行本地事务,然后发送一个半事务消息到Broker。如果本地事务执行成功,则提交该半事务消息;如果失败,则回滚该消息。同时,RocketMQ还提供了事务结果回查机制,用于解决消息发送失败、客户端宕机等极端场景。在消费端,RocketMQ通过ack机制保证已持久化的消息至少被成功消费一次,从而确保消费端的事务一致性。

ack反馈

当消费者从消息队列中接收到消息并处理完成后,它会向消息队列发送一个ACK(Acknowledgment,确认)反馈。这个反馈告诉消息队列该消息已经被成功处理,消息队列在收到ACK反馈后,会将该消息从队列中删除;

否则就会引起重复消费,设置在集群环境中会引起其他实例的消费者进行消费

消费速度
  1. 限制消息消费速率
    • 在消费者端,可以设置consumeMessageBatchMaxSize参数,该参数用于限制每次消费的消息数量。通过合理设置这个参数,可以有效地控制消费者的消费速率,防止消费者消费过快。
  2. 并发消费与线程数调整
    • RocketMQ支持多线程并发消费消息,可以通过调整消费者端的线程数来控制消费速度。在启动消费者时,可以通过设置ConsumerConfig类的setConsumeThreadMinsetConsumeThreadMax方法来指定最小和最大线程数。增加线程数可以提高并发度,但过多的线程数也会带来额外的开销,需要根据实际情况进行权衡。
  3. 消息预取机制
    • RocketMQ引入了消息预取机制,通过提前拉取一定数量的消息到消费者端,可以减少网络通信的开销,提高消费速率。但预取过多消息也可能导致消费者处理不过来,因此可以通过调整pullBatchSize参数来控制一次拉取的消息数量,以达到控制消费速率的目的。
  4. 使用Sentinel进行流量控制
    • Sentinel是阿里巴巴开源的一个流量控制组件,它可以与RocketMQ集成,用于控制消息的生产和消费速率。如果消息生产速度很大,Sentinel可以限制大量的消息,以匀速的形式推送一小部分消息给消费者消费,消费完再推送下一部分,从而确保消费端不会因系统负载过高而出现问题。
  5. 合理设置BatchSize
    • 在生产者端,可以适当增大BatchSize,将更多消息批量发送,减少网络round trip,提高发送性能。但请注意,过大的BatchSize可能会增加消息延迟,需要根据实际情况进行调整。
  6. 增加消费者实例
    • 在高并发环境下,如果单个消费者实例的处理能力不足,可以通过增加消费者实例来提高整体的处理效率。这样可以将消息分散到多个消费者实例上进行处理,从而降低单个实例的消费压力。
标签:
新一篇: ApplicationEvent、ApplicationListener和ApplicationContext 旧一篇: ApplicationRunner