最新版的RocketMq
与SpringBoot2.X
进行整合可以利用rocketmq-spring-boot-starter
来简化配置,本文采用了最新版的jar包来整合,并且略微做了封装,以便于其他模块引用,适合于多生产者多消费者的情况。
项目依赖
主要用到了rocketmq的包和lombok的包,具体的依赖如下所示:
1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> </dependency>
|
配置项
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| rocketmq.name-server=127.0.0.1:9876 rocketmq.producer.enable=true
rocketmq.producer.group=test-group-e rocketmq.producer.topic=test-topic-e
rocketmq.producer.group2=test-group-f rocketmq.producer.topic2=test-topic-f
rocketmq.consume.group=consumer-group-3 rocketmq.consume.topic=test-topic-e
rocketmq.consume.group2=consumer-group-4 rocketmq.consume.topic2=test-topic-f
|
最重要的是rocketmq.name-server
和rocketmq.producer.group
这两个属性一定要配置,否则项目无法启动,在封装成中间件的时候rocketmq.producer.group
可以随意指定一个,亲测即便指定了也不会在启动的时候就生成这个生产者的group.rocketmq.producer.enable
这个是我自己加的,为了让rocketmq能按需加载,因为封装成中间件的话其他模块引入,有的可能用不到,所以bean需要按条件加载。 剩下的group和topic我分别都指定了两个,为了模拟多消费者和多生产者的情况。没有用tag来区分,因为Tag使用不当会引来不必要的麻烦,不同的功能严格按照topic和group来区分。
封装的消息类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package com.rocket.mq.demo.controller;
import lombok.Data;
import java.io.Serializable;
@Data public class RocketMqMessage<T> implements Serializable {
private T content;
private String msgKey;
private String producerTopic;
private String producerGroup;
private String producerTag; }
|
发送消息的公共方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
| package com.rocket.mq.demo.controller;
import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component;
import java.util.UUID;
@Component @Slf4j @ConditionalOnProperty(name = "rocketmq.producer.enable", havingValue = "true") public class MqSendService {
@Autowired private RocketMQTemplate rocketMQTemplate;
private <T> SendResult send(T msg, String topic, String group, String tag) { if (StringUtils.isBlank(topic) || StringUtils.isBlank(group)) { new Throwable("发送方topic或者group不能为空"); } String uuid = UUID.randomUUID().toString().replaceAll("-", ""); RocketMqMessage message = new RocketMqMessage(); message.setProducerTopic(topic); message.setProducerGroup(group); message.setProducerTag(tag); message.setContent(msg); message.setMsgKey(uuid); Message messageFinal = MessageBuilder.withPayload(message).setHeader("KEYS", uuid).build(); String destination = topic; if (StringUtils.isNotBlank(tag)) { destination = topic + ":" + tag; } SendResult result = rocketMQTemplate.syncSend(destination, messageFinal); log.info("成功发送消息,消息内容为:{},返回值为:{}", message, result); return result; }
public <T> SendResult send(T msg, String topic, String group) { return this.send(msg, topic, group, null); }
}
|
多生产者发送消息示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| package com.rocket.mq.demo.controller;
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList; import java.util.List;
@Slf4j @Service @RestController public class Producer {
@Value(value = "${rocketmq.producer.topic}") private String topic;
@Value(value = "${rocketmq.producer.group}") private String group;
@Autowired private MqSendService mqSendService;
@GetMapping("/test-rocketmq/sendMsg") public String testSendMsg() { List<String> list=new ArrayList<>(); list.add("1"); list.add("2"); list.add("3"); mqSendService.send(list,topic,group); return "send message success"; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| package com.rocket.mq.demo.controller;
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList; import java.util.List;
@Slf4j @Service @RestController public class Producer2 {
@Value(value = "${rocketmq.producer.topic2}") private String topic;
@Value(value = "${rocketmq.producer.group2}") private String group;
@Autowired private MqSendService mqSendService;
@GetMapping("/test-rocketmq/sendMsg2") public String testSendMsg() { List<String> list=new ArrayList<>(); list.add("1"); list.add("2"); list.add("3"); mqSendService.send(list,topic,group); return "send message success"; } }
|
多消费者消费消息示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| package com.rocket.mq.demo.controller;
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component;
@Slf4j @Component
@RocketMQMessageListener(nameServer = "${rocketmq.name-server}", topic = "${rocketmq.consume.topic}", consumerGroup = "${rocketmq.consume.group}") public class Consumer implements RocketMQListener<RocketMqMessage> {
@Override public void onMessage(RocketMqMessage message) { log.info("======我收到了消息,消息内容为:{}",message); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| package com.rocket.mq.demo.controller;
import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component;
@Slf4j @Component
@RocketMQMessageListener(nameServer = "${rocketmq.name-server}", topic = "${rocketmq.consume.topic2}", consumerGroup = "${rocketmq.consume.group2}") public class Consumer2 implements RocketMQListener<RocketMqMessage> {
@Override public void onMessage(RocketMqMessage message) { log.info("======我收到了消息,消息内容为:{}",message); } }
|
镜像地址
https://www.cnblogs.com/coderzhw/p/13589737.html