admin 1 tahun lalu
induk
melakukan
1305cfef71

+ 65 - 0
rocketMQSpringBoot/pom.xml

@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.example</groupId>
+    <artifactId>rocketMQSpringBoot</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>2.2.2.RELEASE</version>
+    </parent>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>2.0.7</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-spring-boot-starter</artifactId>
+            <version>2.2.3</version>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.8</source>
+                    <target>1.8</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>

+ 13 - 0
rocketMQSpringBoot/src/main/java/org/example/consumer/ConsumerApplication.java

@@ -0,0 +1,13 @@
+package org.example.consumer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+@Slf4j
+public class ConsumerApplication {
+    public static void main(String[] args) {
+        SpringApplication.run(ConsumerApplication.class, args);
+    }
+}

+ 32 - 0
rocketMQSpringBoot/src/main/java/org/example/consumer/HeroConsumer.java

@@ -0,0 +1,32 @@
+package org.example.consumer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.stereotype.Service;
+
+@Service
+@Slf4j
+@RocketMQMessageListener(topic = "kingsTopic", consumerGroup = "hero",selectorExpression = "shooter")
+public class HeroConsumer implements RocketMQListener<String> {
+
+    // 监听到消息就会执行此方法
+    @Override
+    public void onMessage(String message) {
+        log.info(String.valueOf(111));
+        log.info(message);
+    }
+
+    // MessageExt:是一个消息接收通配符,不管发送的是String还是对象,都可接收,当然也可以明确指定类型(建议还是指定类型较方便)
+    @Service
+    @RocketMQMessageListener(topic = "kingsTopic", consumerGroup = "hero")
+    public class Consumer implements RocketMQListener<MessageExt> {
+        @Override
+        public void onMessage(MessageExt messageExt) {
+            byte[] body = messageExt.getBody();
+            String msg = new String(body);
+            log.info("监听到消息:msg={}", msg);
+        }
+    }
+}

+ 8 - 0
rocketMQSpringBoot/src/main/java/org/example/producer/ExtRocketMQTemplate.java

@@ -0,0 +1,8 @@
+package org.example.producer;
+
+import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+
+@ExtRocketMQTemplateConfiguration(nameServer = "${rocketmq.name-server}")
+public class ExtRocketMQTemplate extends RocketMQTemplate {
+}

+ 12 - 0
rocketMQSpringBoot/src/main/java/org/example/producer/ProducerApplication.java

@@ -0,0 +1,12 @@
+package org.example.producer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+@Slf4j
+public class ProducerApplication {
+    public static void main(String[] args) {
+        SpringApplication.run(ProducerApplication.class, args);}
+}

+ 215 - 0
rocketMQSpringBoot/src/main/java/org/example/producer/ProducerController.java

@@ -0,0 +1,215 @@
+package org.example.producer;
+
+import com.alibaba.fastjson.JSON;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.example.producer.dto.HeroDTO;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+
+@RestController
+@Slf4j
+public class ProducerController {
+
+    @Resource(name = "extRocketMQTemplate")
+    private RocketMQTemplate extRocketMQTemplate;
+
+    /**
+     *发送单向消息
+     */
+    @GetMapping("/send1")
+    public void sendOneWay(){
+        HeroDTO heroDTO = new HeroDTO();
+        heroDTO.setId("jialuo");
+        heroDTO.setName("伽罗");
+        // 设置消息KEYS,一般是数据的唯一ID,主要用于在仪表盘中方便搜索
+        Message<String> msgs = MessageBuilder.withPayload(JSON.toJSONString(heroDTO)).setHeader("KEYS", heroDTO.getId()).build();
+        // 给消息打上射手的标签。主题+tag,中间用“:”分隔,主要是用于消息的过滤,比如说在消费的时候,只消费ESS标签下的消息
+        extRocketMQTemplate.sendOneWay("kingsTopic".concat(":shooter"), msgs);
+
+        //log.info(transactionSendResult.toString());
+    }
+
+
+    /**
+     *同步发送消息
+     * syncSend方法会阻塞当前线程,直到消息发送完成并收到了消息服务器的响应。如果消息发送成功,
+     * syncSend方法会返回一个SendResult对象,包含了消息的发送状态、消息ID等信息。如果消息发送失败,
+     * syncSend方法会抛出一个MessagingException异常。
+     */
+    @GetMapping("/send2")
+    public void syncSend(){
+        HeroDTO heroDTO = new HeroDTO();
+        heroDTO.setId("luban");
+        heroDTO.setName("小鲁班");
+
+        Message<String> msgs = MessageBuilder.withPayload(JSON.toJSONString(heroDTO)).setHeader("KEYS", heroDTO.getId()).build();
+
+        log.info("同步发送消息:");
+        SendResult sendResult = extRocketMQTemplate.syncSend("kingsTopic".concat(":shooter"), msgs);
+
+        log.info(sendResult.toString());
+    }
+
+    /**
+     *异步发送消息
+     * asyncSend方法不会阻塞当前线程,而是在另一个线程中异步发送消息。因此,
+     * asyncSend方法会立即返回,不会等待消息发送完成。如果需要等待消息发送完成并处理发送结果,
+     * 可以使用SendCallback回调接口。
+     */
+    @GetMapping("/send3")
+    public void asyncSend(){
+        HeroDTO heroDTO = new HeroDTO();
+        heroDTO.setId("luban");
+        heroDTO.setName("小鲁班");
+
+        Message<String> msgs = MessageBuilder.withPayload(JSON.toJSONString(heroDTO)).setHeader("KEYS", heroDTO.getId()).build();
+
+        log.info("异步发送消息:");
+        extRocketMQTemplate.asyncSend("kingsTopic".concat(":shooter"), msgs, new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                log.info(sendResult.toString());
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                log.info(e.getMessage());
+            }
+        });
+    }
+
+    /**
+     *发送单向顺序消息
+     */
+    @GetMapping("/send4")
+    public void sendOneWayOrderly(){
+        HeroDTO heroDTO = new HeroDTO();
+        heroDTO.setId("zhangfei");
+        heroDTO.setName("张飞");
+
+        Message<String> msgs = MessageBuilder.withPayload(JSON.toJSONString(heroDTO)).setHeader("KEYS", heroDTO.getId()).build();
+
+        log.info("发送单向顺序消息:");
+        extRocketMQTemplate.sendOneWayOrderly("kingsTopic".concat(":tank"), msgs,heroDTO.getId());
+    }
+
+    /**
+     *同步发送顺序消息
+     */
+    @GetMapping("/send5")
+    public void syncSendOrderly(){
+        HeroDTO heroDTO = new HeroDTO();
+        heroDTO.setId("niumo");
+        heroDTO.setName("牛魔");
+
+        Message<String> msgs = MessageBuilder.withPayload(JSON.toJSONString(heroDTO)).setHeader("KEYS", heroDTO.getId()).build();
+
+        log.info("同步发送顺序消息:");
+        SendResult sendResult = extRocketMQTemplate.syncSendOrderly("kingsTopic".concat(":tank"), msgs, heroDTO.getId());
+        log.info(sendResult.toString());
+    }
+
+    /**
+     *异步发送顺序消息
+     */
+    @GetMapping("/send6")
+    public void asyncSendOrderly(){
+        HeroDTO heroDTO = new HeroDTO();
+        heroDTO.setId("xiangyu");
+        heroDTO.setName("项羽");
+
+        Message<String> msgs = MessageBuilder.withPayload(JSON.toJSONString(heroDTO)).setHeader("KEYS", heroDTO.getId()).build();
+
+        log.info("异步发送顺序消息:");
+        extRocketMQTemplate.asyncSendOrderly("kingsTopic".concat(":tank"), msgs, heroDTO.getId(), new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                log.info(sendResult.toString());
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                log.info(e.getMessage());
+            }
+        });
+    }
+
+    /**
+     *发送延迟消息-任务超时处理
+     */
+    @GetMapping("/send7")
+    public void syncSendDelayTimeSeconds(){
+        HeroDTO heroDTO = new HeroDTO();
+        heroDTO.setId("baiqi");
+        heroDTO.setName("白起");
+
+        Message<String> msgs = MessageBuilder.withPayload(JSON.toJSONString(heroDTO)).setHeader("KEYS", heroDTO.getId()).build();
+
+        log.info("发送延迟消息-任务超时处理:");
+        //10秒后才能消费这条消息
+        SendResult sendResult = extRocketMQTemplate.syncSendDelayTimeSeconds("kingsTopic".concat(":tank"), msgs, 10L);
+        //10毫秒后才能消费这条消息
+        //extRocketMQTemplate.syncSendDelayTimeMills("kingsTopic".concat(":tank"), msgs, 10L);
+        log.info(sendResult.toString());
+    }
+
+    /**
+     *发送延迟消息-定时处理
+     */
+    @GetMapping("/send8")
+    public void syncSendDeliverTimeMills(){
+        HeroDTO heroDTO = new HeroDTO();
+        heroDTO.setId("liubang");
+        heroDTO.setName("刘邦");
+
+        Message<String> msgs = MessageBuilder.withPayload(JSON.toJSONString(heroDTO)).setHeader("KEYS", heroDTO.getId()).build();
+
+        //每天凌晨处理
+        long time = LocalDate.now().atStartOfDay().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
+        log.info("发送延迟消息-定时处理:每天凌晨处理");
+        SendResult sendResult = extRocketMQTemplate.syncSendDeliverTimeMills("kingsTopic".concat(":tank"), msgs, time);
+
+        log.info(sendResult.toString());
+    }
+
+    /**
+     *批量发送
+     */
+    @GetMapping("/send9")
+    public void syncSendBatchMessage() throws InterruptedException {
+        List<HeroDTO> heroList = new ArrayList<>();
+        HeroDTO heroDTO = new HeroDTO();
+        heroDTO.setId("geya");
+        heroDTO.setName("戈娅");
+        heroList.add(heroDTO);
+
+        HeroDTO heroDTO1 = new HeroDTO();
+        heroDTO.setId("direnjie");
+        heroDTO.setName("狄仁杰");
+        heroList.add(heroDTO1);
+
+        List<Message> msgs = new ArrayList<Message>();
+        for (HeroDTO hero : heroList){
+            Message<String> message = MessageBuilder.withPayload(JSON.toJSONString(heroDTO))
+                    .setHeader("KEYS", heroDTO.getId())
+                    .build();
+            msgs.add(message);
+        }
+        log.info("批量发送:");
+        SendResult sendResult = extRocketMQTemplate.syncSend("kingsTopic".concat(":shooter"), msgs);
+        log.info(sendResult.toString());
+    }
+
+
+}

+ 76 - 0
rocketMQSpringBoot/src/main/java/org/example/producer/TransactionListenerImpl.java

@@ -0,0 +1,76 @@
+package org.example.producer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
+import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
+import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
+import org.apache.rocketmq.spring.support.RocketMQHeaders;
+import org.springframework.messaging.Message;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@RocketMQTransactionListener(rocketMQTemplateBeanName = "extRocketMQTemplate")
+@Slf4j
+public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
+    private AtomicInteger transactionIndex = new AtomicInteger(0);
+
+    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<String, Integer>();
+
+    /**
+     *执行事务
+     */
+    @Override
+    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
+        //事务ID
+        String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
+
+        int value = transactionIndex.getAndIncrement();
+        int status = value % 3;
+        assert transId != null;
+        localTrans.put(transId, status);
+
+        if (status == 0) {
+            log.info("success");
+            //成功,提交事务
+            return RocketMQLocalTransactionState.COMMIT;
+        }
+
+        if (status == 1) {
+            log.info("failure");
+            //失败,回滚事务
+            return RocketMQLocalTransactionState.ROLLBACK;
+        }
+
+        log.info("unknown");
+        //中间状态
+        return RocketMQLocalTransactionState.UNKNOWN;
+    }
+
+    /**
+     *检查事务状态
+     */
+    @Override
+    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
+        String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
+        RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
+        Integer status = localTrans.get(transId);
+        if (null != status) {
+            switch (status) {
+                case 0:
+                    retState = RocketMQLocalTransactionState.COMMIT;
+                    break;
+                case 1:
+                    retState = RocketMQLocalTransactionState.ROLLBACK;
+                    break;
+                case 2:
+                    retState = RocketMQLocalTransactionState.UNKNOWN;
+                    break;
+
+                default: break;
+            }
+        }
+        log.info("msgTransactionId:{},TransactionState:{},status:{}",transId,retState,status);
+        return retState;
+    }
+}

+ 9 - 0
rocketMQSpringBoot/src/main/java/org/example/producer/dto/HeroDTO.java

@@ -0,0 +1,9 @@
+package org.example.producer.dto;
+
+import lombok.Data;
+
+@Data
+public class HeroDTO {
+    private String id;
+    private String name;
+}

+ 12 - 0
rocketMQSpringBoot/src/main/java/org/example/producer/dto/StudentDTO.java

@@ -0,0 +1,12 @@
+package org.example.producer.dto;
+
+import lombok.Data;
+
+@Data
+public class StudentDTO {
+    private String id;
+
+    private String name;
+
+    private Integer age;
+}

+ 52 - 0
rocketMQSpringBoot/src/main/resources/application.yml

@@ -0,0 +1,52 @@
+
+spring:
+  profiles: dev
+
+server:
+  port: 9001
+
+
+rocketmq:
+  # 服务地址,多个用逗号分开
+  name-server: 127.0.0.1:9876
+  producer:
+    # 发送消息超时时间,默认3000
+    send-message-timeout: 30000
+    # 生产者组
+    group: group1
+    # 发送消息失败重试次数,默认2
+    retryTimesWhenSendFailed: 2
+    # 异步消息重试此处,默认2
+    retryTimesWhenSendAsyncFailed: 2
+    # 消息最大长度,默认1024 * 1024 * 4(默认4M)
+    maxMessageSize: 4096
+    # 压缩消息阈值,默认4k(1024 * 4)
+    compressMessageBodyThreshold: 4096
+    # 是否在内部发送失败时重试另一个broker,默认false
+    retryNextServer: false
+---
+spring:
+  profiles: test
+
+server:
+  port: 9002
+
+
+rocketmq:
+  # 服务地址,多个用逗号分开
+  name-server: 127.0.0.1:9876
+  producer:
+    # 发送消息超时时间,默认3000
+    send-message-timeout: 30000
+    # 生产者组
+    group: group1
+    # 发送消息失败重试次数,默认2
+    retryTimesWhenSendFailed: 2
+    # 异步消息重试此处,默认2
+    retryTimesWhenSendAsyncFailed: 2
+    # 消息最大长度,默认1024 * 1024 * 4(默认4M)
+    maxMessageSize: 4096
+    # 压缩消息阈值,默认4k(1024 * 4)
+    compressMessageBodyThreshold: 4096
+    # 是否在内部发送失败时重试另一个broker,默认false
+    retryNextServer: false

+ 91 - 0
rocketMQSpringBoot/src/test/java/producer/ProducerNormalMessageTest.java

@@ -0,0 +1,91 @@
+package producer;
+
+import com.alibaba.fastjson.JSON;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.example.producer.ProducerApplication;
+import org.example.producer.dto.StudentDTO;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+
+import javax.annotation.Resource;
+
+@SpringBootTest(classes = ProducerApplication.class)
+@Slf4j
+public class ProducerNormalMessageTest {
+
+    @Resource(name = "extRocketMQTemplate")
+    private RocketMQTemplate extRocketMQTemplate;
+
+    /**
+     *发送单向消息
+     */
+    @Test
+    public void sendOneWay(){
+        StudentDTO studentDTO = new StudentDTO();
+        studentDTO.setId("T001");
+        studentDTO.setName("丽莎");
+        studentDTO.setAge(12);
+
+        Message<String> message = MessageBuilder.withPayload(JSON.toJSONString(studentDTO)).setHeader("KEYS", studentDTO.getId()).build();
+        extRocketMQTemplate.sendOneWay("studentTopic".concat(":ESS"),message);
+    }
+
+    @Test
+    public void sendOneWayOrderly(){
+        StudentDTO studentDTO = new StudentDTO();
+        studentDTO.setId("T005");
+        studentDTO.setName("里斯");
+        studentDTO.setAge(12);
+
+        Message<String> message = MessageBuilder.withPayload(JSON.toJSONString(studentDTO)).setHeader("KEYS", studentDTO.getId()).build();
+        extRocketMQTemplate.sendOneWayOrderly("studentTopic".concat(":ESS"), message,studentDTO.getId());
+    }
+
+    @Test
+    public void syncSendOrderly(){
+        for (int i=0; i<20; i++){
+            Message<String> message = MessageBuilder.withPayload("订单".concat("Orderder:"+i)).setHeader("KEYS", i).build();
+            SendResult sendResult = extRocketMQTemplate.syncSendOrderly("order".concat(":OR"), message, String.valueOf(i));
+            log.info(sendResult.toString());
+        }
+    }
+
+
+    @Test
+    public void syncSend(){
+        StudentDTO studentDTO = new StudentDTO();
+        studentDTO.setId("T002");
+        studentDTO.setName("张三丰");
+        studentDTO.setAge(11);
+
+        Message<String> message = MessageBuilder.withPayload(JSON.toJSONString(studentDTO)).setHeader("KEYS", studentDTO.getId()).build();
+        SendResult sendResult = extRocketMQTemplate.syncSend("studentTopic".concat(":ESS"), message);
+        log.info(sendResult.toString());
+    }
+
+    @Test
+    public void asyncSend(){
+        StudentDTO studentDTO = new StudentDTO();
+        studentDTO.setId("T003");
+        studentDTO.setName("戴莉");
+        studentDTO.setAge(11);
+
+        Message<String> message = MessageBuilder.withPayload(JSON.toJSONString(studentDTO)).setHeader("KEYS", studentDTO.getId()).build();
+        extRocketMQTemplate.asyncSend("studentTopic".concat(":ESS"), message, new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                log.info(sendResult.toString());
+            }
+
+            @Override
+            public void onException(Throwable throwable) {
+                log.info(throwable.getMessage());
+            }
+        });
+    }
+}