最新RocketMq与SpringBoot整合
2019-04-17 00:12:34

最新版的RocketMq与SpringBoot2.X进行整合可以利用rocketmq-spring-boot-starter来简化配置,本文采用了最新版的jar包来整合,并且略微做了封装,以便于其他模块引用,适合于多生产者多消费者的情况。

项目依赖

主要用到了rocketmq的包和lombok的包,具体的依赖如下所示:

1
2
3
4
5
6
7
8
9
10
<!-- RocketMQ Spring Boot starter: supplies RocketMQTemplate and @RocketMQMessageListener used by the classes in this article. -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
<!-- Lombok: supplies the @Data and @Slf4j annotations used by the classes in this article. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>

配置项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# Address of the RocketMQ name server; also referenced by the consumers' @RocketMQMessageListener.
rocketmq.name-server=127.0.0.1:9876
# Custom switch added by the author (not a starter property): MqSendService is only
# registered as a bean when this value is "true".
rocketmq.producer.enable=true

# Producer 1: group and topic injected into Producer via @Value.
rocketmq.producer.group=test-group-e
rocketmq.producer.topic=test-topic-e

# Producer 2: group and topic injected into Producer2 via @Value.
rocketmq.producer.group2=test-group-f
rocketmq.producer.topic2=test-topic-f



# Consumer 1: listens on the same topic that producer 1 publishes to.
rocketmq.consume.group=consumer-group-3
rocketmq.consume.topic=test-topic-e

# Consumer 2: listens on the same topic that producer 2 publishes to.
rocketmq.consume.group2=consumer-group-4
rocketmq.consume.topic2=test-topic-f

最重要的是rocketmq.name-server和rocketmq.producer.group这两个属性一定要配置,否则项目无法启动。在封装成中间件的时候rocketmq.producer.group可以随意指定一个,亲测即便指定了也不会在启动的时候就生成这个生产者的group。rocketmq.producer.enable这个是我自己加的,为了让rocketmq能按需加载:因为封装成中间件的话会被其他模块引入,有的模块可能用不到,所以bean需要按条件加载。剩下的group和topic我分别都指定了两个,为了模拟多消费者和多生产者的情况。没有用tag来区分,因为Tag使用不当会引来不必要的麻烦,不同的功能严格按照topic和group来区分。

封装的消息类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.rocket.mq.demo.controller;

import lombok.Data;

import java.io.Serializable;

/**
 * Envelope wrapped around every payload sent through the shared send helper.
 *
 * <p>Getters, setters, {@code equals}, {@code hashCode} and {@code toString} are generated by
 * Lombok's {@code @Data}. The field order is kept as-is because those generated methods follow
 * declaration order.
 *
 * @param <T> type of the business payload carried in {@link #content}
 * @author : zhangwei
 * @date: 2020-08-20 11:03
 */
@Data
public class RocketMqMessage<T> implements Serializable {

    /**
     * Explicit serialization version, so that the serialized form does not depend on a
     * compiler-computed default that can change between builds.
     */
    private static final long serialVersionUID = 1L;

    /**
     * Business payload of the message.
     */
    private T content;

    /**
     * Key identifying this message.
     */
    private String msgKey;

    /**
     * Topic the producer published to.
     */
    private String producerTopic;

    /**
     * Producer group that published the message.
     */
    private String producerGroup;

    /**
     * Optional tag attached by the producer.
     */
    private String producerTag;
}

发送消息的公共方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package com.rocket.mq.demo.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * Shared helper for publishing messages to RocketMQ.
 *
 * <p>The bean is only registered when {@code rocketmq.producer.enable=true}, so modules that
 * include this code without needing a producer do not load it.
 *
 * @author : zhangwei
 * @date: 2020-08-21 09:54
 */
@Component
@Slf4j
@ConditionalOnProperty(name = "rocketmq.producer.enable", havingValue = "true")
public class MqSendService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Wraps {@code msg} in a {@link RocketMqMessage} and sends it synchronously, optionally with a tag.
     *
     * <p>A random UUID (dashes removed) is generated per call; it is stored as the envelope's
     * message key and also set as the {@code KEYS} header of the outgoing message.
     *
     * @param msg   business payload
     * @param topic destination topic; must not be blank
     * @param group producer group recorded in the envelope; must not be blank
     * @param tag   optional tag; when not blank the destination becomes {@code topic:tag}
     * @param <T>   payload type
     * @return the result returned by {@code RocketMQTemplate.syncSend}
     * @throws IllegalArgumentException if {@code topic} or {@code group} is blank
     * @author: zhangwei
     * @date: 2020/8/21 10:54
     */
    private <T> SendResult send(T msg, String topic, String group, String tag) {
        if (StringUtils.isBlank(topic) || StringUtils.isBlank(group)) {
            // The original built the exception but never threw it, so the check had no effect.
            throw new IllegalArgumentException("发送方topic或者group不能为空");
        }
        // Literal replacement; no regex needed to strip the dashes.
        String uuid = UUID.randomUUID().toString().replace("-", "");

        RocketMqMessage<T> message = new RocketMqMessage<>();
        message.setProducerTopic(topic);
        message.setProducerGroup(group);
        message.setProducerTag(tag);
        message.setContent(msg);
        message.setMsgKey(uuid);

        Message<RocketMqMessage<T>> messageFinal =
                MessageBuilder.withPayload(message).setHeader("KEYS", uuid).build();

        // A tag, when present, is appended to the topic as "topic:tag".
        String destination = StringUtils.isNotBlank(tag) ? topic + ":" + tag : topic;

        SendResult result = rocketMQTemplate.syncSend(destination, messageFinal);
        log.info("成功发送消息,消息内容为:{},返回值为:{}", message, result);
        return result;
    }

    /**
     * Sends {@code msg} to {@code topic} without a tag.
     *
     * @param msg   business payload
     * @param topic destination topic; must not be blank
     * @param group producer group recorded in the envelope; must not be blank
     * @param <T>   payload type
     * @return the result returned by {@code RocketMQTemplate.syncSend}
     * @throws IllegalArgumentException if {@code topic} or {@code group} is blank
     * @author: zhangwei
     * @date: 2020/8/21 10:54
     */
    public <T> SendResult send(T msg, String topic, String group) {
        return this.send(msg, topic, group, null);
    }

}

多生产者发送消息示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.rocket.mq.demo.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;

/**
 * Demo REST endpoint that publishes a sample message using the first producer
 * topic/group pair ({@code rocketmq.producer.topic} / {@code rocketmq.producer.group}).
 *
 * <p>NOTE(review): {@code MqSendService} is only registered when
 * {@code rocketmq.producer.enable=true}; this class injects it unconditionally.
 *
 * @author : zhangwei
 * @date: 2020-08-20 19:11
 */
@Slf4j
@Service
@RestController
public class Producer {

    /** Topic to publish to. */
    @Value("${rocketmq.producer.topic}")
    private String topic;

    /** Producer group passed along with each message. */
    @Value("${rocketmq.producer.group}")
    private String group;

    @Autowired
    private MqSendService mqSendService;

    /**
     * Sends the list {@code ["1", "2", "3"]} as the message content.
     *
     * @return a fixed confirmation string
     */
    @GetMapping("/test-rocketmq/sendMsg")
    public String testSendMsg() {
        List<String> payload = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            payload.add(String.valueOf(i));
        }
        mqSendService.send(payload, topic, group);
        return "send message success";
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.rocket.mq.demo.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;

/**
 * Demo REST endpoint that publishes a sample message using the second producer
 * topic/group pair ({@code rocketmq.producer.topic2} / {@code rocketmq.producer.group2}).
 *
 * <p>NOTE(review): {@code MqSendService} is only registered when
 * {@code rocketmq.producer.enable=true}; this class injects it unconditionally.
 *
 * @author : zhangwei
 * @date: 2020-08-20 19:11
 */
@Slf4j
@Service
@RestController
public class Producer2 {

    /** Topic to publish to. */
    @Value("${rocketmq.producer.topic2}")
    private String topic;

    /** Producer group passed along with each message. */
    @Value("${rocketmq.producer.group2}")
    private String group;

    @Autowired
    private MqSendService mqSendService;

    /**
     * Sends the list {@code ["1", "2", "3"]} as the message content.
     *
     * @return a fixed confirmation string
     */
    @GetMapping("/test-rocketmq/sendMsg2")
    public String testSendMsg() {
        List<String> payload = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            payload.add(String.valueOf(i));
        }
        mqSendService.send(payload, topic, group);
        return "send message success";
    }
}

多消费者消费消息示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.rocket.mq.demo.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;


/**
 * Listener for the first consumer topic/group pair
 * ({@code rocketmq.consume.topic} / {@code rocketmq.consume.group}); it logs each message received.
 *
 * @author : zhangwei
 * @date: 2020-08-20 16:29
 */
@Slf4j
@Component
// The topic must match the producer's topic; consumerGroup is mandatory but its value is free-form.
@RocketMQMessageListener(
        consumerGroup = "${rocketmq.consume.group}",
        topic = "${rocketmq.consume.topic}",
        nameServer = "${rocketmq.name-server}")
public class Consumer implements RocketMQListener<RocketMqMessage> {

    /**
     * Logs the received message envelope.
     *
     * @param received the message envelope delivered to this listener
     */
    @Override
    public void onMessage(RocketMqMessage received) {
        log.info("======我收到了消息,消息内容为:{}", received);
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.rocket.mq.demo.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;


/**
 * Listener for the second consumer topic/group pair
 * ({@code rocketmq.consume.topic2} / {@code rocketmq.consume.group2}); it logs each message received.
 *
 * @author : zhangwei
 * @date: 2020-08-20 16:29
 */
@Slf4j
@Component
// The topic must match the producer's topic; consumerGroup is mandatory but its value is free-form.
@RocketMQMessageListener(
        consumerGroup = "${rocketmq.consume.group2}",
        topic = "${rocketmq.consume.topic2}",
        nameServer = "${rocketmq.name-server}")
public class Consumer2 implements RocketMQListener<RocketMqMessage> {

    /**
     * Logs the received message envelope.
     *
     * @param received the message envelope delivered to this listener
     */
    @Override
    public void onMessage(RocketMqMessage received) {
        log.info("======我收到了消息,消息内容为:{}", received);
    }
}

镜像地址

https://www.cnblogs.com/coderzhw/p/13589737.html

pay