此次主要介紹jmsTemplate 主動式聆聽多個MQ
目標:依據yml設定擋動態新增 主動式聆聽多個MQ
前情提要:
銜接之前
3.SpringBoot使用jms聆聽MQ
可依據yml自定義1組要聆聽的MQ和指定要聆聽哪1個頻道
此篇會介紹的資訊較為繁多
因此分篇介紹
ibm:
mq:
queueManager: qm管理者
channel: 自定義頻道
connName: host(port)
user:
這次主要介紹可以用多組設定參數聆聽多個MQ
設定檔長這樣
mqList:
- concurrency: "10-100"
receive-timeout: -1
queueManager: 自定義佇列管理者
channel: 自定義頻道
connName: host(ip)
user:
destination: 目標佇列名稱
- concurrency: "10-100"
receive-timeout: -1
queueManager: 自定義佇列管理者
channel: 自定義頻道
connName: host(ip)
user:
destination: 目標佇列名稱
先前沒介紹過的參數,這邊提出來介紹
concurrency 設定初始和最大consumer大小
recevice-timeout 接收超時設置 預設為 -1表 JMS聆聽將無限期地。如果有設定時間表示一定時間沒聆聽到就換別的事情做
destination 聆聽哪個住列 3這篇介紹的方式是用註釋@JmsListener方式
接下來我們來看一下原本比較單純的設定檔資訊
JmsTemplateConfiguration.class
@Slf4j
@EnableJms
@Component
public class JmsTemplateConfiguration {
@Bean
public JmsListenerContainerFactory<?>myFactory(ConnectionFactory connectionFactory,DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
// 現在我們先另外加上兩個參數
factory.setConcurrency("5-10");
factory.setReceiveTimeout(-1);
return factory;
}
}
spring boot 是1個能夠幫助需求比較單1、快速展示時,所適合的開發適合的1個框架,也開放我們可以依據需求彈性的調整。我們需要再增加4個步驟
- 登記註冊
- 連線資訊註冊
- 基本共同連線相關註冊
- jms聆聽註冊
登記註冊是最後步驟
主要材料為連項資訊和jms聆聽註冊
先從1開始接觸的JmsListenerContainerFactory來說明
JmsListenerContainerFactory<?> myFactory(
ConnectionFactory connectionFactory,DefaultJmsListenerContainerFactoryConfigurer configurer)
有兩個參數 connectionFactory,configurer
那我們方多增加兩個參數 這樣共4個(另外兩個concurrency,receiveTimeout)
connectionFactory 其實就是連線資訊
configurer 其實就是jms聆聽註冊
連線資訊ConnectionFactory
很多不同的公司有出不同的ConnectionFactory。像是,ibm mq, activeMQ...
這邊以ibm mq為例// 此為ibm mq 的connectionFactory MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory(); mqQueueConnectionFactory.setQueueManager("佇列管理者"); mqQueueConnectionFactory.setChannel("自定義頻道"); // 設定傳輸類型 mqQueueConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT); mqQueueConnectionFactory.setObjectProperty(WMQConstants.WMQ_CONNECTION_NAME_LIST, "ip(port)");
以上的區塊稱為A
基本共同連線相關註冊,不管使用ibm mq、 activeMQ 都會共同的設定就是這個
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); /* * setConcurrency("1-3") is the same as * setConcurrentConsumers(1); * setMaxConcurrentConsumers(3); * 設定初始和最大consumer大小 * */ factory.setConcurrency("1-3"); factory.setErrorHandler(jmsErrorHandler); // 同之前的錯誤處理 factory.setReceiveTimeout(-1);
以上區塊稱為B
jmsListener聆聽註冊
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint(); endpoint.setDestination("自定義佇列"); endpoint.setMessageListener(message -> { log.info(message.toString()) });
以上區塊稱為C
現在把他們拚再一起 最後一步
@EnableJms @Configuration public class JmsTemplateAdvConfiguration implements JmsListenerConfigurer { @Autowired JmsErrorHandler jmsErrorHandler; @Autowired MQInfoConfiguration mqInfoConfiguration; @Autowired DefaultJmsListenerContainerFactoryConfigurer defaultConfigurer; @Override public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) { // 此作法請參考 [透過yml設定檔注入參數至特定資料結構] if (CollectionUtils.isNotEmpty(mqInfo.getMqList())) { for (MQInfoConfiguration.mqInfo mqinfo: mqInfoConfiguration.getMqList()) { MQQueueConnectionFactory connectionFactory = A; DefaultJmsListenerContainerFactory factory = B; // 預設註冊器把連線資訊和基本共用連線相關註冊 設定在一起 defaultConfigurer.configure(factory,connectionFactory); SimpleJmsListenerEndpoint endpoint = C; // 最後把jms聆聽註冊和 共用通用連線相關註冊 設定在一起 registrar.registerEndpoint(endpoint, factory); } } }
A=
this.genMqQueueConnectionFactory(String queueManager,String channel,String connName);
B=
(DefaultJmsListenerContainerFactory) this.genJmsListenerContainerFactory(String concurrency,String receiveTimeout);
C=
// Function<R,T> func 這部分略過 主要是放入已經定義好等下要執行的方法 this.getEndPoint(String destination, Function<Boolean,Message> func);
完整合併設定檔如下:
@EnableJms
@Configuration
public class JmsTemplateAdvConfiguration implements JmsListenerConfigurer {
@Autowired
JmsErrorHandler jmsErrorHandler;
@Autowired
MQInfoConfiguration mqInfoConfiguration;
@Autowired
DefaultJmsListenerContainerFactoryConfigurer defaultConfigurer;
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
// 此作法請參考 [透過yml設定檔注入參數至特定資料結構]
if (CollectionUtils.isNotEmpty(mqInfo.getMqList())) {
for (MQInfoConfiguration.mqInfo mqinfo: mqInfoConfiguration.getMqList()) {
MQQueueConnectionFactory connectionFactory = this.genMqQueueConnectionFactory(
mqinfo.getQueueManager(),
mqinfo.getChannel(),
mqinfo.getConnName()
);
// MQ connectionFactory 和這預設是有關連的 因此強制轉型
DefaultJmsListenerContainerFactory factory =(DefaultJmsListenerContainerFactory)this.genJmsListenerContainerFactory(mqinfo.getConcurrency(),mqinfo.getReceiveTimeout());
// 預設註冊器把連線資訊和基本共用連線相關註冊 設定在一起
defaultConfigurer.configure(factory,connectionFactory);
SimpleJmsListenerEndpoint endpoint = this.getEndPoint( mqinfo.destination, func);
// 最後把jms聆聽註冊和 共用通用連線相關註冊 設定在一起
registrar.registerEndpoint(endpoint, factory);
}
}
}
因為我們上方是註冊
並不是啟動服務時進行聆聽 之前使用@JmsListener
現在動態方式需要人工啟動
@Bean
public ApplicationRunner runner(JmsListenerEndpointRegistry registry) {
return args -> {
log.info("Hit Enter to start container(s)");
registry.getListenerContainers().forEach(
messageListenerContainer ->{
messageListenerContainer.star(); // Lifecycle::start
});
};
return args -> {
};
}
以上設定方式即可依據yml動態新增聆聽MQ的服務
ref:
- https://dev.to/adzubla/using-multiple-jms-servers-with-spring-boot-3cbm
- https://stackoverflow.com/questions/55543440/spring-jmslistener-to-listen-to-multiple-queues
- https://stackoverflow.com/questions/60762328/spring-jmslistener-start-stop-while-application-is-running
主要是給自己的一個紀錄,也分享給有需要的夥伴
註解部分有提及一些參考的連結,有興趣可以點進去看看喔
這是一個心血來潮,產生的文章
若有喜歡或交流的部分都歡迎在下方留言,多多關照。