irpas技术客

Spring Cloud RocketMQTemplate类详解_CERTAINTY_rocketmqtemplate方法详解

网络 1075

1.继承关系

生成类关系图的操作步骤如下所示

由idea生成的继承关系图可知顶层父类是MessageSendingOperations ,这个类中定义了一些发送消息的方法。

public interface MessageSendingOperations<D> { //向默认目的地发送消息,一个参数消息 void send(Message<?> message) throws MessagingException; //向指定目的地发送消息,第一个参数目的地,第二个参数消息 void send(D destination, Message<?> message) throws MessagingException; //序列化参数并发送默认目的地,一个参数 有效载荷 void convertAndSend(Object payload) throws MessagingException; //序列化参数 并发送指定目的地 第一个参数 目的地,第二个参数 有效载荷 void convertAndSend(D destination, Object payload) throws MessagingException; //目的地 有效载荷 标题 void convertAndSend(D destination, Object payload, Map<String, Object> headers) throws MessagingException; //参数 有效载荷 和 消息后处理器 void convertAndSend(Object payload, @Nullable MessagePostProcessor postProcessor) throws MessagingException; //目的地 有效载荷 后处理器 void convertAndSend(D destination, Object payload, MessagePostProcessor postProcessor) throws MessagingException; //目的地 有效载荷 标题 后处理器 void convertAndSend(D destination, Object payload, @Nullable Map<String, Object> headers, @Nullable MessagePostProcessor postProcessor) throws MessagingException; }

AbstractMessageSendingTemplate 源码如下,该类是RocketMQTemplate的父类,是 MessageSendingOperations的子类

public abstract class AbstractMessageSendingTemplate<D> implements MessageSendingOperations<D> { public static final String CONVERSION_HINT_HEADER = "conversionHint"; protected final Log logger = LogFactory.getLog(getClass()); @Nullable private D defaultDestination; private MessageConverter converter = new SimpleMessageConverter(); public void setDefaultDestination(@Nullable D defaultDestination) { this.defaultDestination = defaultDestination; } @Nullable public D getDefaultDestination() { return this.defaultDestination; } public void setMessageConverter(MessageConverter messageConverter) { Assert.notNull(messageConverter, "MessageConverter must not be null"); this.converter = messageConverter; } public MessageConverter getMessageConverter() { return this.converter; } @Override public void send(Message<?> message) { send(getRequiredDefaultDestination(), message); } protected final D getRequiredDefaultDestination() { Assert.state(this.defaultDestination != null, "No 'defaultDestination' configured"); return this.defaultDestination; } @Override public void send(D destination, Message<?> message) { doSend(destination, message); } protected abstract void doSend(D destination, Message<?> message); @Override public void convertAndSend(Object payload) throws MessagingException { convertAndSend(payload, null); } @Override public void convertAndSend(D destination, Object payload) throws MessagingException { convertAndSend(destination, payload, (Map<String, Object>) null); } @Override public void convertAndSend(D destination, Object payload, @Nullable Map<String, Object> headers) throws MessagingException { convertAndSend(destination, payload, headers, null); } @Override public void convertAndSend(Object payload, @Nullable MessagePostProcessor postProcessor) throws MessagingException { convertAndSend(getRequiredDefaultDestination(), payload, postProcessor); } @Override public void convertAndSend(D destination, Object payload, @Nullable MessagePostProcessor postProcessor) throws MessagingException { convertAndSend(destination, payload, null, postProcessor); } @Override public void convertAndSend(D destination, Object payload, @Nullable Map<String, Object> headers, @Nullable MessagePostProcessor postProcessor) throws MessagingException { Message<?> message = doConvert(payload, headers, postProcessor); send(destination, message); } protected Message<?> doConvert(Object payload, @Nullable Map<String, Object> headers, @Nullable MessagePostProcessor postProcessor) { MessageHeaders messageHeaders = null; Object conversionHint = (headers != null ? headers.get(CONVERSION_HINT_HEADER) : null); Map<String, Object> headersToUse = processHeadersToSend(headers); if (headersToUse != null) { if (headersToUse instanceof MessageHeaders) { messageHeaders = (MessageHeaders) headersToUse; } else { messageHeaders = new MessageHeaders(headersToUse); } } MessageConverter converter = getMessageConverter(); Message<?> message = (converter instanceof SmartMessageConverter ? ((SmartMessageConverter) converter).toMessage(payload, messageHeaders, conversionHint) : converter.toMessage(payload, messageHeaders)); if (message == null) { String payloadType = payload.getClass().getName(); Object contentType = (messageHeaders != null ? messageHeaders.get(MessageHeaders.CONTENT_TYPE) : null); throw new MessageConversionException("Unable to convert payload with type='" + payloadType + "', contentType='" + contentType + "', converter=[" + getMessageConverter() + "]"); } if (postProcessor != null) { message = postProcessor.postProcessMessage(message); } return message; } @Nullable protected Map<String, Object> processHeadersToSend(@Nullable Map<String, Object> headers) { return headers; } }

AbstractMessageSendingTemplate类有四个属性如下。

public static final String CONVERSION_HINT_HEADER = "conversionHint"; protected final Log logger = LogFactory.getLog(getClass()); @Nullable private D defaultDestination; private MessageConverter converter = new SimpleMessageConverter();

定义了一个抽象方法doSend

protected abstract void doSend(D destination, Message<?> message);

重写内容

@Override protected void doSend(String destination, Message<?> message) { SendResult sendResult = syncSend(destination, message); if (log.isDebugEnabled()) { log.debug("send message to `{}` finished. result:{}", destination, sendResult); } } InitializingBean接口

该接口的使用可以参考这篇文章Spring中InitializingBean的作用

InitializingBean 接口代码如下

public interface InitializingBean { void afterPropertiesSet() throws Exception; }

RocketMQTemplate类重写了 afterPropertiesSet方法

@Override public void afterPropertiesSet() throws Exception { if (producer != null) { producer.start(); } if (Objects.nonNull(consumer)) { try { consumer.start(); } catch (Exception e) { log.error("Failed to startup PullConsumer for RocketMQTemplate", e); } } } DisposableBean接口

该接口的使用可以参考这篇文章DisposableBean

public interface DisposableBean { void destroy() throws Exception; }

RocketMQTemplate类重写了 destroy方法

@Override public void destroy() { if (Objects.nonNull(producer)) { producer.shutdown(); } if (Objects.nonNull(consumer)) { consumer.shutdown(); } }


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Spring