Integrate RocketMQ 5.0 client with Spring - 记录本次开发的思考与实践
前言 RocketMQ-Spring 目前已经支持了4.x Remoting SDK,我们还需要支持RocketMQ 5.0 gRPC SDK,即是对 rocketmq-clients-SDK 进行封装和整合,最终以 rocketmq-spring-v5-starter 的形式呈现给用户。
Spring 中的消息框架 Spring Messaging 是Spring Framework 4中添加的模块,是Spring与消息系统集成的一个扩展性的支持。它实现了从基于JmsTemplate的简单的使用JMS接口到异步接收消息的一整套完整的基础架构,Spring AMQP提供了该协议所要求的类似的功能集。 在与Spring Boot的集成后,它拥有了自动配置能力,能够在测试和运行时与相应的消息传递系统进行集成。 单纯对于客户端而言,Spring Messaging 提供了一套抽象的 API 或者说是约定的标准,对消息发送端和消息接收端的模式进行规定,不同的消息中间件提供商可以在这个模式下提供自己的Spring实现:在消息发送端需要实现的是一个 XXXTemplate 形式的 Java Bean ,结合 Spring Boot 的自动化配置选项提供多个不同的发送消息方法;在消息的消费端是一个 XXXMessageListener 接口 (实现方式通常会使用一个注解来声明一个消息驱动的 POJO),提供回调方法来监听和消费消息,这个接口同样可以使用Spring Boot的自动化选项和一些定制化的属性。参考 Spring Messaging 的既有实现,RocketMQ的 spring-boot-starter 中遵循了相关的设计模式并结合 RocketMQ 自身的功能特点提供了相应的API (如,顺序,异步和事务半消息等)。
模块与适配 按照 Spring Boot 的规范,原有的 4.X SDK 划分为四个子模块,我们 v5 SDK 也将延用分成四个子模块的方式完成开发,在这一点上,会参考 4.X 的模块设计。
rocketmq-v5-client-spring-boot-parent (父pom文件,定义相关的依赖管理和Plugin,供其它几个模块引用)
rocketmq-v5-client-spring-boot (定义 auto-configuration 实现,具体RocketMQ相关的自动配置和 Bean 创建代码都集中在这里)
rocketmq-v5-client-spring-starter (将 rocketmq-v5-client-spring-boot 和其它的依赖打包生成全量的依赖,用户引用它即可完成所有 rocketmq-spring 的客户端操作)
rocketmq-v5-client-spring-samples (使用示例,展示如何使用 spring-boot 方式发送和消费消息)
为了更方便理解模块之间的依赖关系,我做了一个模块依赖图,图示如下,其中rocketmq-grpc-spring-boot-parent 等含有 grpc 写法皆代表 v5 模块,就不再更改了。 可以看到,我们在rocketmq-spring-all项目顶级模块之下,又开发了rocketmq-grpc-spring-boot-parent 新模块,他是我们其他 maven 子模块的父模块。所有的子模块都继承于父模块,项目中所有要使用到的 jar 包的版本都集中由父工程管理。 rocketmq-grpc-spring-boot-starter 会发布到maven仓库中,用户需要使用时只需要引入这个 starter 即可使用。
设计思路
添加maven依赖 项目需在 pom.xml 文件中引入SDK5.0 依赖 rocketmq-client-java 依赖,并且设置 rocketmq-client-java-version 最新版本
1 2 3 4 5 <dependency > <groupId > org.apache.rocketmq</groupId > <artifactId > rocketmq-client-java</artifactId > <version > ${rocketmq-client-java-version}</version > </dependency >
适配spring-boot自动化配置
支持适配新客户端实体 目前 rocketmq-spring 中基于 4.x SDK 实现的自动配置类主要是两个bean:
1 2 public static final String PRODUCER_BEAN_NAME = "defaultMQProducer" ;public static final String CONSUMER_BEAN_NAME = "defaultLitePullConsumer" ;
引入 v5-sdk 之后,在 template 中需要加入 IOC 容器进行管理的有以下几个bean:
1 2 3 4 private ProducerBuilder producerBuilder;private SimpleConsumerBuilder simpleConsumerBuilder;private Producer producer;private SimpleConsumer simpleConsumer;
在确定好我们需要进行控制反转注入ioc容器的bean有哪些后,就要考虑如何将他们交给ioc容器管理。按照目前rocketmq-spring给出的@Configuration+@Bean注解的方式,我们同样可以利用这些注解来进行对producer、pushConsumer和simpleConsumer进行管理。 但是 PushConsumer 需要交给 DefaultListenerContainer 容器进行管理,而不是放到 RocketMQGRpcTemplate容器中。这里为什么要让 IOC 容器管理额外的 builder 对象(像ProducerBuilder、SimpleConsumerBuilder 以及DefaultListenerContainer容器中的 PushConsumerBuilder),⽽不是只管理调⽤build()⽅法之后构建的3个真正的⽣产消费者实体bean, 乃是基于以下的考量: 我们在RocketMQAutoConfiguration这个⾃动配置类中,需要构造producer和consumer,但是此时我 们的producer尚不是⼀个完整的producer,因为如果⽤户要发送事务消息,是需要设置⼀个 TransactionChecker.
1 ProducerBuilder setTransactionChecker (TransactionChecker checker) ;
也就是说,如果我们在 RocketMQAutoConfiguration ⾥⾯构建bean时,就调⽤build⽅法去构建⼀个完整的 producer 对象,那么⽤户在使⽤我们的核⼼类 RocketmqTemplate 使⽤⽣产者发送事务消息时就只 能传⼊⼀个 TransactionChecker 的实现类作为参数去设置Producer的transactionChecker 属性,⽽ Producer ⼀旦被 build 构建,我们是⽆法再去修改的,所以在 RocketMQAutoConfiguration ⾥⾯构建 bean 时只能构建⼀个ProducerBuilder,等待⽤户实现的 TransactionChecker 参数设置进去。 同理的,我们的pushConsumer也是需要⼀个回调函数去处理 MessageView 的逻辑,所以这⾥也是需要先构建好⼀个builder,最后等到所有参数注⼊完成后才可以再去调⽤build⽅法⽣成⼀个真正的实例。
装配Producer 我们需要实现对 5.0SDK 中的Producer进⾏管理: (1)引⼊Producer属性的配置类 我们通过在RocketMQProperties类中定义我们的Producer实例引⼊的配置⽂件的配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static class Producer { private String accessKey; private String secretKey; private String endpoints; private String topic; }
代码实现 首先我们需要对 5.0 SDK 中的 producer 进行管理,通过 RocketMQProperties 引入的参数直接对 producer 进行构造。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 @Bean(PRODUCER_BUILDER_BEAN_NAME) @ConditionalOnMissingBean(ProducerBuilderImpl.class) @ConditionalOnProperty(prefix = "rocketmq", value = {"producer.endpoints"}) public ProducerBuilder producerBuilder (RocketMQProperties rocketMQProperties) { RocketMQProperties.Producer rocketMQProducer = rocketMQProperties.getProducer(); log.info("Init Producer Args: " + rocketMQProducer); String topic = rocketMQProducer.getTopic(); String endPoints = rocketMQProducer.getEndpoints(); Assert.hasText(endPoints, "[rocketmq.producer.endpoints] must not be null" ); ClientConfiguration clientConfiguration = RocketMQUtil.createProducerClientConfiguration(rocketMQProducer); final ClientServiceProvider provider = ClientServiceProvider.loadService(); ProducerBuilder producerBuilder; producerBuilder = provider.newProducerBuilder() .setClientConfiguration(clientConfiguration) .setTopics(rocketMQProducer.getTopic()) .setMaxAttempts(rocketMQProducer.getMaxAttempts()); log.info(String.format("a producer init on proxy %s" , endPoints)); return producerBuilder; }
可以看到,由于需要⽀持 v5 版本的 producer,与原有 4.x 的内部类 Producer 不同,换新了以下字段:
Endpoints字段:我们知道endpoints是接⼊点地址,需要设置成Proxy的地址和端⼝列表。RocketMQ Proxy的出现,解决了4.X版本客户端多语⾔客户端实现Remoing协议难度⼤、复杂、功能不⼀致、维护⼯作⼤的问题。使⽤业界熟悉的gRPC协议, 各个语⾔代码统⼀、简单,使得多语⾔使⽤RocketMQ更⽅便容易。
Topic字段:在4.x SDK的Producer中,我们通常在messge中设置topic,⽽不在producer中直接设置topic。但是在5.0 SDK中,我们既可以在producer,也可以在message中指定topic。这样做的好处是,⽣产者能够在发送消息之前预先获取该主题的路由信息,并缓存到本地。这样,当⽣产者要发送消息时,它就可以直接从本地缓存中获取路由信息,避免了每次发送消息都需要进⾏路由查找的问题,从⽽提升了发送消息的效率和性能。
(2)加⼊配置属性注解在RocketMQProperties上加⼊注解@ConfigurationProperties(prefix = “rocketmq”),能够引⼊配置⽂件中的⽤户⾃定义属性。
1 2 3 4 @ConfigurationProperties(prefix = "rocketmq") public class RocketMQProperties { }
(3)注⼊bean 我们需要在 RocketMQAutoConfiguration 中配置交给 IOC 容器管理的 bean,这⾥的 bean 只能是⼀个 ProducerBuilder,⽽不是 Producer,具体原因在上⽂已经阐述。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Bean(PUSH_CONSUMER_BEAN_NAME) @ConditionalOnMissingBean(PushConsumerBuilder.class) @ConditionalOnProperty(name = "rocketmq.sdk-version", havingValue = "grpc") public PushConsumerBuilder pushConsumerBuilder (RocketMQProperties rocketMQProperties) { RocketMQProperties.PushConsumer rocketMQPushConsumer = rocketMQProperties.getConsumer(); final ClientServiceProvider provider = ClientServiceProvider.loadService(); SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider (rocketMQPushConsumer.getAccessKey(), rocketMQPushConsumer.getSecretKey()); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(rocketMQPushConsumer.getEndpoints()) .setCredentialProvider(sessionCredentialsProvider) .build(); FilterExpression filterExpression = new FilterExpression (rocketMQPushConsumer.getTag(), FilterExpressionType.TAG); final PushConsumerBuilder builderImpl; builderImpl = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) .setConsumerGroup(rocketMQPushConsumer.getGroup()) .setSubscriptionExpressions(Collections.singletonMap(rocketMQPushConsumer.getTopic(), filterExpression)); return builderImpl; }
(4)配置加载执⾏的先后顺序 我们可以看到在RocketMQAutoConfiguration这个类上有以下注解,表明RocketMQAutoConfiguration 需要在TransactionConfiguration之前被加载和执⾏
1 @AutoConfigureBefore({TransactionConfiguration.class})
(5)引⼊事务处理注解 我们也需要⼀个⾃定义的 TransactionConfiguration 配置类,⽤来注⼊我们刚刚所说的,⽣产者所需要的 TransactionChecker 来执⾏回调函数。⾸先我们实现SmartInitializingSingleton 接⼝,在bean初始化完成后执⾏⼀些初始化操作:获取被@RocketMQTransactionListener 标记的类,这个类是需要⽤户实现 TransactionChecker 接⼝的,handleTransactionChecker ⽅法会⾃动将这个实现类给注⼊到 rocketmqTemplate 管理的 ProducerBuilder 对象中。 这⾥的 template 是根据 @RocketMQTransactionListener注解中的rocketMQTemplateBeanName 值来从容器上下⽂中提取的,template 是可拓展的,也就是我们所说的 @ExtTemplateConfiguration
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Configuration public class TransactionConfiguration implements ApplicationContextAware , SmartInitializingSingleton { private ConfigurableApplicationContext applicationContext; @Override public void setApplicationContext (ApplicationContext applicationContex t) throws BeansException { this .applicationContext = (ConfigurableApplicationContext) applica tionContext; } @Override public void afterSingletonsInstantiated () { Map<String, Object> beans = this .applicationContext.getBeansWithAn notation (TransactionListener.class) .entrySet().stream().filter(entry -> !ScopedProxyUtils.isS copedTarget (entry.getKey() )) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::ge tValue)); beans.forEach(this ::handleTransactionChecker); } public void handleTransactionChecker (String beanName, Object bean) { Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean); if (!TransactionChecker.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException (clazz + " is not instance of " + TransactionChecker.class.getName()); } TransactionListener annotation = clazz.getAnnotation(TransactionLi stener.class); if (Objects.isNull(annotation)) { throw new IllegalStateException ("The transactionListener annot ation is missing" ); } RocketMQGRpcTemplate rocketMQTemplate = (RocketMQGRpcTemplate) app licationContext.getBean(annotation.rocketMQTemplateBeanName()); if ((rocketMQTemplate.getProducerBuilder()) != null ) { rocketMQTemplate.getProducerBuilder().setTransactionChecker((T ransactionChecker) bean); } } }
(6)⽣产者拓展的实现 我们可以⽤⼀个 @ExtTemplateConfiguration 注解来拓展我们的rocketmqTemplate,当⽤户需要多个不同的⽣产者时,可以使⽤这个注解来拓展装配不同的⽣产者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Component public @interface ExtTemplateConfiguration { String value () default "" ; String accessKey () default "${rocketmq.producer.accessKey:}" ; String secretKey () default "${rocketmq.producer.secretKey:}" ; String endpoints () default "${rocketmq.producer.endpoints:}" ; String topic () default "${rocketmq.producer.topic:}" ; }
(7)容器拓展类的注册 对于这个拓展 ExtRocketmqTemplate 的装配,我们可以在⾃定义配置类 ExtProducerResetConfiguration 中构造。与上⽂中 TransactionConfiguration 配置类似的,⾸先获取 @ExtTemplateConfiguration 注解下的所有 bean,然后获取@ExtTemplateConfiguration 注解的所有拓展属性值,装配出⼀个 ProducerBuilder并注⼊到⾃定义的拓展 ExtRocketmqTemplate 中。
新增字段 由于在 rocketmq-spring 的基于 4.x remoting 的源代码中,RocketMQProperties 类中的静态内部类 PushConsumer 是继承 PullConsumer 的,所以我在 PullConsumer 的字段中加入了以下两个属性:Tag属性: 设置消息 Tag,用于消费端根据指定Tag过滤消息EndPoints属性: 同上 Producer 的 Endpoints 介绍:我们知道 endpoints 是接入点地址,需要设置成 Proxy 的地址和端口列表。有了 RocketMQ Proxy 的出现,无疑解决了4.X版本客户端多语言客户端实现 Remoing 协议难度大、复杂、功能不一致、维护工作大的问题。使用业界熟悉的GRPC协议, 各个语言代码统一、简单,使得多语言使用RocketMQ更方便容易。FilterExpressionType属性: 消费者过滤表达式的类型。RocketMQ 支持两种过滤表达式类型:TAG 和 SQL92。如果将 FilterExpressionType 设置为 TAG,则消费者只会消费指定标签的消息TAG 表达式类型比较简单,性能相对较好,适用于只需要简单标签过滤的场景。如果将 FilterExpressionType 设置为 SQL92,则消费者可以使用 SQL92 的语法来过滤消息。SQL92 表达式类型更加灵活,可以支持更复杂的过滤规则,但是由于需要解析 SQL92 语法,所以性能相对较差。我们可以根据实际情况,选择合适的过滤表达式类型可以提高消费者的性能和效率。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private String tag;private String endpoints;private String filterExpressionType = "tag" ;
装配SimpleConsumer
代码实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Bean(SIMPLE_CONSUMER_BEAN_NAME) @ConditionalOnMissingBean(SimpleConsumer.class) @ConditionalOnExpression("${rocketmq.sdk-version} == 'grpc' && ${rocketmq.pull-consumer.endpoints}") public SimpleConsumer simpleConsumer (RocketMQProperties rocketMQProperties) { RocketMQProperties.PullConsumer rocketMQPullConsumer = rocketMQProperties.getPullConsumer(); final ClientServiceProvider provider = ClientServiceProvider.loadService(); SessionCredentialsProvider sessionCredentialsProvider = new StaticSessionCredentialsProvider (rocketMQPullConsumer.getAccessKey(), rocketMQPullConsumer.getSecretKey()); ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(rocketMQPullConsumer.getEndpoints()) .setCredentialProvider(sessionCredentialsProvider) .build(); Duration awaitDuration = Duration.ofSeconds(30 ); FilterExpressionType filterExpression = "tag" .equals(rocketMQPullConsumer.getTag()) ? FilterExpressionType.TAG : FilterExpressionType.SQL92; FilterExpression expression = new FilterExpression (rocketMQPullConsumer.getTag(), filterExpression); SimpleConsumer simpleConsumer; try { simpleConsumer = provider.newSimpleConsumerBuilder() .setClientConfiguration(clientConfiguration) .setConsumerGroup(rocketMQPullConsumer.getGroup()) .setAwaitDuration(awaitDuration) .setSubscriptionExpressions(Collections.singletonMap(rocketMQPullConsumer.getTopic(), expression)) .build(); } catch (ClientException e) { throw new RuntimeException (e); } return simpleConsumer; }
新增字段 同PushConsumer
注解控制实体装配 1 @ConditionalOnExpression("${rocketmq.sdk-version} == 'grpc' && ${rocketmq.pull-consumer.endpoints}")
@ConditionalOnExpression注解表示:只有当用户自定义配置属性rocketmq.sdk-version为grpc时 ,并且属性rocketmq.pull-consumer.endpoints不为空 ,才会将这个bean交给ioc容器管理,否则不会加载它。
封装消息发送方法 我们知道,RocketMQ-Spring 是遵循Spring Messaging API 规范的,RocketMQ-Spring 通过实现 RocketMQTemplate 完成消息的发送。 RocketMQTemplate 继承 AbstractMessageSendingTemplate 抽象类,来支持 Spring Messaging API 标准的消息转换和发送方法,这些方法最终会代理给 doSend 方法,doSend 方法会最终调用 syncSend,由 DefaultMQProducer 实现。
remoting SDK消息发送 首先我们先来分析一下基于rocketmq 4.x remoting SDK的消息发送封装类型:
同步消息的发送 1 2 3 4 5 6 private SendResult syncSend (String destination, Message<?> message, long timeout, long delayTime, DelayMode mode) public SendResult syncSend (String destination, Message<?> message, long timeout, int delayLevel) public <T extends Message > SendResult syncSend (String destination, Collection<T> messages, long timeout)
异步消息的发送 1 2 3 4 public void asyncSend (String destination, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) public <T extends Message > void asyncSend (String destination, Collection<T> messages, SendCallback sendCallback, long timeout)
顺序消息的发送 1 2 3 4 5 6 public SendResult syncSendOrderly (String destination, Message<?> message, String hashKey, long timeout, int delayLevel) public <T extends Message > SendResult syncSendOrderly (String destination, Collection<T> messages, String hashKey, long timeout) {public void asyncSendOrderly (String destination, Message<?> message, String hashKey, SendCallback sendCallback, long timeout, int delayLevel)
单向消息的发送 1 2 3 4 public void sendOneWay (String destination, Message<?> message) public void sendOneWayOrderly (String destination, Message<?> message, String hashKey)
事务消息的发送 1 2 public TransactionSendResult sendMessageInTransaction (final String destination, final Message<?> message, final Object arg)
Request-Reply 消息的发送 Request-Reply 消息指的是上游服务投递消息后进入等待被通知的状态,直到消费端返回结果并返回给发送端。
1 2 public <T> T sendAndReceive (String destination, Message<?> message, Type type, String hashKey,long timeout, int delayLevel)
v5 SDK消息发送 基于上述remoting SDK消息的发送,我们需要进一步整合进 v5 SDK到 rocketmq-spring中。 在rocketmq-client-java项目中,我们依据Producer接口下的方法,首先可以粗略地将生产者发送消息的方法归结为以下三类 : (1)Producer发送同步消息:
1 SendReceipt send (Message message) throws ClientException;
(2)Producer发送事务消息:
1 SendReceipt send (Message message, Transaction transaction) throws ClientException;
(3)Producer发送异步消息:
1 CompletableFuture <SendReceipt> sendAsync(Message message);
但是,实际上在rocketmq-client-java中,我们可以发送不同类型的消息,官方文档 有很清楚的记录:
基于rocketmq-client-java 5.0 SDK,我们需要整合的消息发送方法可以具体分为以下几类: (1)Producer发送普通消息(NormalMessage)
1 2 3 4 5 6 7 8 9 10 final Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("yourMessageKey-1c151062f96e" ) .setBody(body) .build(); final SendReceipt sendReceipt = producer.send(message);
(2)Producer发送顺序消息(FIFOMessage) 顺序消息和普通消息的不同之处就在于,需要设置消息组、保证单一生产者 并且串行发送。
1 2 3 4 5 6 7 8 9 10 11 12 final Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("yourMessageKey-1ff69ada8e0e" ) .setMessageGroup("yourMessageGroup0" ) .setBody(body) .build(); final SendReceipt sendReceipt = producer.send(message);
(3)Producer发送延迟消息(DelayMessage) 对message设置触发时间即可,例如示例代码中我们可以对message进行如下处理:
1 2 3 4 5 6 7 8 9 10 11 12 final Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("yourMessageKey-3ee439f945d7" ) .setDeliveryTimestamp(System.currentTimeMillis() + messageDelayTime.toMillis()) .setBody(body) .build(); final SendReceipt sendReceipt = producer.send(message);
(4)Producer发送事务消息(TransactionMessage)
1 2 3 4 5 6 7 8 9 10 11 12 final Transaction transaction = producer.beginTransaction(); final Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("yourMessageKey-565ef26f5727" ) .setBody(body) .build(); final SendReceipt sendReceipt = producer.send(message, transaction);
(5)Producer发送异步消息
1 2 3 4 5 6 7 8 9 10 11 final Message message = provider.newMessageBuilder() .setTopic(topic) .setTag(tag) .setKeys("yourMessageKey-0e094a5f9d85" ) .setBody(body) .build(); final CompletableFuture<SendReceipt> future = producer.sendAsync(message);
那么接下来,我们将围绕以上的五种消息,将消息发送整合到现有的 rocketmq-spring 框架中来。我们可以首先做一个简略的基于v5 SDK整合的思维导图:
同步消息的发送 在RocketmqTemplate中我们定义syncSendGrpcMessage方法 ,代码如下: 该方法主要是利用配置文件中的参数之前在ioc容器中管理的bean:grpcProducer,调用它的send方法达到发送同步消息的目的,其中message由spring的message转换为了client的message,作为grpcProducer.syncSendGrpcMessage方法的参数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public SendReceipt syncSendGrpcMessage (String destination, Message<?> message, Duration messageDelayTime, String messageGroup) { if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { log.error("send request message failed. destination:{}, message is null " , destination); throw new IllegalArgumentException ("`message` and `message.payload` cannot be null" ); } SendReceipt sendReceipt = null ; try { org.apache.rocketmq.client.apis.message.Message rocketMsg = this .createRocketMqgRPCMessage(destination, message, messageDelayTime, messageGroup); try { sendReceipt = grpcProducer.send(rocketMsg); log.info("Send message successfully, messageId={}" , sendReceipt.getMessageId()); } catch (Throwable t) { log.error("Failed to send message" , t); } grpcProducer.close(); } catch (Exception e) { log.error("send request message failed. destination:{}, message:{} " , destination, message); throw new MessagingException (e.getMessage(), e); } return sendReceipt; }
这个方法与remoting SDK类似,remoting SDK调用了消息转换的方法createRocketMqMessage,而我们这里并不需要一个<org.apache.rocketmq.common.message.Message>类型的消息实体对象,而是需要一个<org.apache.rocketmq.client.apis.message.Message>类型的消息实体对象,所以这里我又自定义了一个构建基于 v5-SDK 的消息构造方法:createRocketMqMessage 。 下面是 createRocketMqMessage 方法具体源码:
1 2 3 4 5 private org.apache.rocketmq.client.apis.message.Message createRocketMQMessage (String destination, Message<?> message, Duration messageDelayTime, String messageGroup) { Message<?> msg = this .doConvert(message.getPayload(), message.getHeaders(), null ); return RocketMQUtil.convertToClientMessage(getMessageConverter(), charset, destination, msg, messageDelayTime, messageGroup); }
这里面调用了 RocketMQUtil 中的静态方法 convertToClientMessage 用来构造一个真正的<org.apache.rocketmq.client.apis.message>消息实例。 下面是 convertToClientMessage 方法代码:
1 2 3 4 5 6 7 8 9 10 11 12 public static org.apache.rocketmq.client.apis.message.Message convertToClientMessage ( MessageConverter messageConverter, String charset, String destination, org.springframework.messaging.Message<?> message, Duration messageDelayTime, String messageGroup) { Object payloadObject = message.getPayload(); byte [] payloads; try { payloads = getPayloadBytes(payloadObject, messageConverter, charset, message); } catch (Exception e) { throw new RuntimeException ("convert to gRPC message failed." , e); } return getAndWrapMessage(destination, message.getHeaders(), payloads, messageDelayTime, messageGroup); }
在获取了消息负载的byte数组后,我们需要封装消息实体了。 下面是方法 getAndWrapMessage 源码: 在这个方法里面,我们首先进行了判空操作解析了 destination 中的 topic 和 tag,取出MessageHeaders 中的属性并放到 message 的 properties 属性中,最后建造者模式调用build 方法构造了我们所需要的 message 类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public static org.apache.rocketmq.client.apis.message.Message getAndWrapMessage ( String destination, MessageHeaders headers, byte [] payloads, Duration messageDelayTime, String messageGroup) { if (payloads == null || payloads.length < 1 ) { return null ; } if (destination == null || destination.length() < 1 ) { return null ; } String[] tempArr = destination.split(":" , 2 ); final ClientServiceProvider provider = ClientServiceProvider.loadService(); org.apache.rocketmq.client.apis.message.MessageBuilder messageBuilder = null ; if (Objects.nonNull(headers) && !headers.isEmpty()) { Object keys = headers.get(RocketMQHeaders.KEYS); if (ObjectUtils.isEmpty(keys)) { keys = headers.get(toRocketHeaderKey(RocketMQHeaders.KEYS)); } messageBuilder = provider.newMessageBuilder() .setTopic(tempArr[0 ]); if (tempArr.length > 1 ) { messageBuilder.setTag(tempArr[1 ]); } if (StringUtils.hasLength(messageGroup)) { messageBuilder.setMessageGroup(messageGroup); } if (!ObjectUtils.isEmpty(keys)) { messageBuilder.setKeys(keys.toString()); } if (Objects.nonNull(messageDelayTime)) { messageBuilder.setDeliveryTimestamp(System.currentTimeMillis() + messageDelayTime.toMillis()); } messageBuilder.setBody(payloads); org.apache.rocketmq.client.apis.message.MessageBuilder builder = messageBuilder; headers.forEach((key, value) -> builder.addProperty(key, String.valueOf(value))); } return messageBuilder.build(); }
到这里我们需要的 message 实体类就构建完成了,之后交给 rocketmq-client-java 原生的send 方法就好了。syncSend(String destination, Message<?> message, Duration messageDelayTime, String messageGroup) 这个方法的参数可以看出,它既支持 Normal消息的发送,同时也支持 Delay 消息(messageDelayTime参数可配置)和 FIFO 消息(messageGroup可配置)的发送。方法重载: 相比于原生客户端需要自己去构建 RocketMQ Message(比如将对象序列化成 byte 数组放入 Message 对象),我们需要让 RocketMQTemplate 可以直接将对象、字符串或者 byte 数组作为参数发送出去【对象序列化操作由 RocketMQ-Spring 内置完成】 我们可以定义如下重载方法: (1)Normal Message
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public SendReceipt syncSendNormalMessage (String destination, Object payload) { Message<?> message = MessageBuilder.withPayload(payload).build(); return syncSendGrpcMessage(destination, message, null , null ); } public SendReceipt syncSendNormalMessage (String destination, String payload) { Message<?> message = MessageBuilder.withPayload(payload).build(); return syncSendGrpcMessage(destination, message, null , null ); } public SendReceipt syncSendNormalMessage (String destination, Message<?> message) { return syncSendGrpcMessage(destination, message, null , null ); } public SendReceipt syncSendNormalMessage (String destination, byte [] payload) { Message<?> message = MessageBuilder.withPayload(payload).build(); return syncSendGrpcMessage(destination, message, null , null ); }
(2)FIFO Message
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public SendReceipt syncSendFifoMessage (String destination, Object payload, String messageGroup) { Message<?> message = MessageBuilder.withPayload(payload).build(); return syncSendGrpcMessage(destination, message, null , messageGroup); } public SendReceipt syncSendFifoMessage (String destination, String payload, String messageGroup) { Message<?> message = MessageBuilder.withPayload(payload).build(); return syncSendGrpcMessage(destination, message, null , messageGroup); } public SendReceipt syncSendFifoMessage (String destination, byte [] payload, String messageGroup) { Message<?> message = MessageBuilder.withPayload(payload).build(); return syncSendGrpcMessage(destination, message, null , messageGroup); } public SendReceipt syncSendFifoMessage (String destination, Message<?> message, String messageGroup) { return syncSendGrpcMessage(destination, message, null , messageGroup); }
(3)Delay Message
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public SendReceipt syncSendDelayMessage (String destination, Object payload, Duration messageDelayTime) { Message<?> message = MessageBuilder.withPayload(payload).build(); return syncSendGrpcMessage(destination, message, messageDelayTime, null ); } public SendReceipt syncSendDelayMessage (String destination, String payload, Duration messageDelayTime) { Message<?> message = MessageBuilder.withPayload(payload).build(); return syncSendGrpcMessage(destination, message, messageDelayTime, null ); } public SendReceipt syncSendDelayMessage (String destination, byte [] payload, Duration messageDelayTime) { Message<?> message = MessageBuilder.withPayload(payload).build(); return syncSendGrpcMessage(destination, message, messageDelayTime, null ); } public SendReceipt syncSendDelayMessage (String destination, Message<?> message, Duration messageDelayTime) { return syncSendGrpcMessage(destination, message, messageDelayTime, null ); }
他们底层都调用了syncSendGrpcMessage 方法。
异步消息的发送 我们定义了如下一个异步消息的发送方法 asyncSend。 类似的,他的核心方法调用了 rocketmq-client-java 原生的sendAsync异步发送消息方法:future = grpcProducer.sendAsync(rocketMsg);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public CompletableFuture<SendReceipt> asyncSend (String destination, Message<?> message, Duration messageDelayTime, String messageGroup, CompletableFuture<SendReceipt> future) { if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { log.error("send request message failed. destination:{}, message is null " , destination); throw new IllegalArgumentException ("`message` and `message.payload` cannot be null" ); } Producer grpcProducer = this .getProducer(); try { org.apache.rocketmq.client.apis.message.Message rocketMsg = this .createRocketMQMessage(destination, message, messageDelayTime, messageGroup); future = grpcProducer.sendAsync(rocketMsg); } catch (Exception e) { log.error("send request message failed. destination:{}, message:{} " , destination, message); throw new MessagingException (e.getMessage(), e); } return future; }
事务消息的发送 与普通消息发送不同的是,我们需要在事务消息中调用grpcProducer.beginTransaction()开启事务,并且还要提交事务transaction.commit()。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public Pair<SendReceipt, Transaction> sendMessageInTransaction (String destination, Object payload) throws ClientException { Message<?> message = MessageBuilder.withPayload(payload).build(); return sendTransactionMessage(destination, message); } public Pair<SendReceipt, Transaction> sendMessageInTransaction (String destination, String payload) throws ClientException { Message<?> message = MessageBuilder.withPayload(payload).build(); return sendTransactionMessage(destination, message); } public Pair<SendReceipt, Transaction> sendMessageInTransaction (String destination, byte [] payload) throws ClientException { Message<?> message = MessageBuilder.withPayload(payload).build(); return sendTransactionMessage(destination, message); } public Pair<SendReceipt, Transaction> sendTransactionMessage (String destination, Message<?> message) { if (Objects.isNull(message) || Objects.isNull(message.getPayload())) { log.error("send request message failed. destination:{}, message is null " , destination); throw new IllegalArgumentException ("`message` and `message.payload` cannot be null" ); } final SendReceipt sendReceipt; Producer grpcProducer = this .getProducer(); org.apache.rocketmq.client.apis.message.Message rocketMsg = this .createRocketMQMessage(destination, message, null , null ); final Transaction transaction; try { transaction = grpcProducer.beginTransaction(); sendReceipt = grpcProducer.send(rocketMsg, transaction); log.info("Send transaction message successfully, messageId={}" , sendReceipt.getMessageId()); } catch (ClientException e) { log.error("send request message failed. destination:{}, message:{} " , destination, message); throw new RuntimeException (e); } return new Pair <>(sendReceipt, transaction); }
封装消息接收方法
PushConsumer消息接收 因为之前在RocketmqProperties中已经将bean:pushConsumerBuilder 注入了ioc容器中,现在只需要让用户定义消息监听处理方式,然后在初始化时将 pushConsumerBuilder所需要的参数传入就可以了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private void initRocketMQPushConsumer () { if (rocketMQMessageListener == null ) { throw new IllegalArgumentException ("Property 'rocketMQMessageListener' is required" ); } Assert.notNull(consumerGroup, "Property 'consumerGroup' is required" ); Assert.notNull(topic, "Property 'topic' is required" ); Assert.notNull(tag, "Property 'tag' is required" ); FilterExpression filterExpression = null ; final ClientServiceProvider provider = ClientServiceProvider.loadService(); if (StringUtils.hasLength(this .getTag())) { filterExpression = RocketMQUtil.createFilterExpression(this .getTag(),this .getType()); } ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(this .getAccessKey(), this .getSecretKey(), this .getEndpoints(), this .getRequestTimeout()); PushConsumerBuilder pushConsumerBuilder = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration); if (StringUtils.hasLength(this .getConsumerGroup())) { pushConsumerBuilder.setConsumerGroup(this .getConsumerGroup()); } if (StringUtils.hasLength(this .getTopic()) && Objects.nonNull(filterExpression)) { pushConsumerBuilder.setSubscriptionExpressions(Collections.singletonMap(this .getTopic(), filterExpression)); } pushConsumerBuilder .setConsumptionThreadCount(this .getConsumptionThreadCount()) .setMaxCacheMessageSizeInBytes(this .getMaxCacheMessageSizeInBytes()) .setMaxCacheMessageCount(this .getMaxCachedMessageCount()) .setMessageListener(rocketMQListener); this .setPushConsumerBuilder(pushConsumerBuilder); }
SimpleConsumer消息接收
同步接收消息 我们定义了一个 receive 方法作为 SimpleConsumer 的消息接收方法,并且传入参数maxMessageNum指定了每次长轮询的最大消息数,invisibleDuration 指定了设置消息接收后的不可见持续时间。
1 2 3 4 public List<MessageView> receive (int maxMessageNum, Duration invisibleDuration) throws ClientException { SimpleConsumer simpleConsumer = this .getSimpleConsumer(); return simpleConsumer.receive(maxMessageNum, invisibleDuration); }
进行消息确认:
1 2 3 4 public void ack (MessageView message) throws ClientException { SimpleConsumer simpleConsumer = this .getSimpleConsumer(); simpleConsumer.ack(message); }
异步接收消息 receiveAsync方法返回一个CompletableFuture类型的异步返回结果,对于这个结果的处理,用户可以在调用rocketmqTemplate.receiveAsync之后自行处理。
1 2 3 4 5 6 public CompletableFuture<List<MessageView>> receiveAsync (int maxMessageNum, Duration invisibleDuration) throws ClientException, IOException { SimpleConsumer simpleConsumer = this .getSimpleConsumer(); CompletableFuture<List<MessageView>> listCompletableFuture = simpleConsumer.receiveAsync(maxMessageNum, invisibleDuration); simpleConsumer.close(); return listCompletableFuture; }
进行消息确认:
1 2 3 4 public CompletableFuture<Void> ackAsync (MessageView messageView) { SimpleConsumer simpleConsumer = this .getSimpleConsumer(); return simpleConsumer.ackAsync(messageView); }