6. SpringBoot使用jms聆聽多個MQ(依據yml動態新增)


此次主要介紹jmsTemplate 主動式聆聽多個MQ
目標:依據yml設定擋動態新增 主動式聆聽多個MQ

前情提要:
銜接之前
3.SpringBoot使用jms聆聽MQ
可依據yml自定義1組要聆聽的MQ和指定要聆聽哪1個頻道

此篇會介紹的資訊較為繁多
因此分篇介紹

ibm:
  mq:
    queueManager: qm管理者
    channel: 自定義頻道
    connName: host(port)
    user:

這次主要介紹可以用多組設定參數聆聽多個MQ
設定檔長這樣

mqList:
  - concurrency: "10-100"
    receive-timeout: -1
    queueManager: 自定義佇列管理者
    channel: 自定義頻道
    connName: host(ip)
    user:
    destination: 目標佇列名稱
-   concurrency: "10-100"
    receive-timeout: -1
    queueManager: 自定義佇列管理者
    channel: 自定義頻道
    connName: host(ip)
    user:
    destination: 目標佇列名稱

先前沒介紹過的參數,這邊提出來介紹
concurrency 設定初始和最大consumer大小
recevice-timeout 接收超時設置 預設為 -1表 JMS聆聽將無限期地。如果有設定時間表示一定時間沒聆聽到就換別的事情做
destination 聆聽哪個住列 3這篇介紹的方式是用註釋@JmsListener方式

接下來我們來看一下原本比較單純的設定檔資訊
JmsTemplateConfiguration.class

@Slf4j
@EnableJms
@Component
public class JmsTemplateConfiguration {

    @Bean
    public JmsListenerContainerFactory<?>myFactory(ConnectionFactory connectionFactory,DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);

        // 現在我們先另外加上兩個參數
        factory.setConcurrency("5-10");
        factory.setReceiveTimeout(-1);
        return factory;
    }
}

spring boot 是1個能夠幫助需求比較單1、快速展示時,所適合的開發適合的1個框架,也開放我們可以依據需求彈性的調整。我們需要再增加4個步驟

  1. 登記註冊
  2. 連線資訊註冊
  3. 基本共同連線相關註冊
  4. jms聆聽註冊

登記註冊是最後步驟
主要材料為連項資訊和jms聆聽註冊

先從1開始接觸的JmsListenerContainerFactory來說明

JmsListenerContainerFactory<?> myFactory(
ConnectionFactory connectionFactory,DefaultJmsListenerContainerFactoryConfigurer configurer)

有兩個參數 connectionFactory,configurer
那我們方多增加兩個參數 這樣共4個(另外兩個concurrency,receiveTimeout)

connectionFactory 其實就是連線資訊
configurer 其實就是jms聆聽註冊
  • 連線資訊ConnectionFactory
    很多不同的公司有出不同的ConnectionFactory。像是,ibm mq, activeMQ...
    這邊以ibm mq為例

    // 此為ibm mq 的connectionFactory
    MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory();
    mqQueueConnectionFactory.setQueueManager("佇列管理者");
    mqQueueConnectionFactory.setChannel("自定義頻道");
    // 設定傳輸類型
    mqQueueConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT); mqQueueConnectionFactory.setObjectProperty(WMQConstants.WMQ_CONNECTION_NAME_LIST, "ip(port)");
    

    以上的區塊稱為A

  • 基本共同連線相關註冊,不管使用ibm mq、 activeMQ 都會共同的設定就是這個

    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
          /*
           * setConcurrency("1-3") is the same as
           * setConcurrentConsumers(1);
           * setMaxConcurrentConsumers(3);
           * 設定初始和最大consumer大小
           * */
          factory.setConcurrency("1-3");
          factory.setErrorHandler(jmsErrorHandler); //  同之前的錯誤處理
          factory.setReceiveTimeout(-1);
    

    以上區塊稱為B

  • jmsListener聆聽註冊

    SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
    endpoint.setDestination("自定義佇列");
    endpoint.setMessageListener(message -> {
      log.info(message.toString())
    });
    

    以上區塊稱為C

  • 現在把他們拚再一起 最後一步

    @EnableJms
    @Configuration
    public class JmsTemplateAdvConfiguration implements JmsListenerConfigurer {
    
      @Autowired
      JmsErrorHandler jmsErrorHandler;
    
      @Autowired
      MQInfoConfiguration mqInfoConfiguration;
    
      @Autowired
      DefaultJmsListenerContainerFactoryConfigurer defaultConfigurer;
    
      @Override
      public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
      //   此作法請參考 [透過yml設定檔注入參數至特定資料結構]
      if (CollectionUtils.isNotEmpty(mqInfo.getMqList())) {
          for (MQInfoConfiguration.mqInfo mqinfo: mqInfoConfiguration.getMqList()) {
              MQQueueConnectionFactory connectionFactory = A;
              DefaultJmsListenerContainerFactory factory = B;
              // 預設註冊器把連線資訊和基本共用連線相關註冊 設定在一起
              defaultConfigurer.configure(factory,connectionFactory);
              SimpleJmsListenerEndpoint endpoint = C;
              // 最後把jms聆聽註冊和 共用通用連線相關註冊 設定在一起
              registrar.registerEndpoint(endpoint, factory);
              }
          }
      }
    

    A=

    this.genMqQueueConnectionFactory(String queueManager,String channel,String connName);
    

    B=

    (DefaultJmsListenerContainerFactory) this.genJmsListenerContainerFactory(String concurrency,String receiveTimeout);
    

    C=

    // Function<R,T> func 這部分略過 主要是放入已經定義好等下要執行的方法
    this.getEndPoint(String destination, Function<Boolean,Message> func);
    

完整合併設定檔如下:

@EnableJms
@Configuration
public class JmsTemplateAdvConfiguration implements JmsListenerConfigurer {

    @Autowired
    JmsErrorHandler jmsErrorHandler;

    @Autowired
    MQInfoConfiguration mqInfoConfiguration;

    @Autowired
    DefaultJmsListenerContainerFactoryConfigurer defaultConfigurer;

    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
    //   此作法請參考 [透過yml設定檔注入參數至特定資料結構]
    if (CollectionUtils.isNotEmpty(mqInfo.getMqList())) {
            for (MQInfoConfiguration.mqInfo mqinfo: mqInfoConfiguration.getMqList()) {
                MQQueueConnectionFactory connectionFactory = this.genMqQueueConnectionFactory(
                        mqinfo.getQueueManager(),
                        mqinfo.getChannel(),
                        mqinfo.getConnName()
                );
            // MQ connectionFactory  和這預設是有關連的 因此強制轉型
            DefaultJmsListenerContainerFactory factory =(DefaultJmsListenerContainerFactory)this.genJmsListenerContainerFactory(mqinfo.getConcurrency(),mqinfo.getReceiveTimeout());

            // 預設註冊器把連線資訊和基本共用連線相關註冊 設定在一起
            defaultConfigurer.configure(factory,connectionFactory);
            SimpleJmsListenerEndpoint endpoint = this.getEndPoint( mqinfo.destination, func);

            // 最後把jms聆聽註冊和 共用通用連線相關註冊 設定在一起
            registrar.registerEndpoint(endpoint, factory);
            }
        }
    }

因為我們上方是註冊
並不是啟動服務時進行聆聽 之前使用@JmsListener
現在動態方式需要人工啟動

@Bean
public ApplicationRunner runner(JmsListenerEndpointRegistry registry) {
            return args -> {
                log.info("Hit Enter to start container(s)");
                registry.getListenerContainers().forEach(
                        messageListenerContainer ->{
                        messageListenerContainer.star(); // Lifecycle::start
                        });
            };
        return args -> {
        };
    }

以上設定方式即可依據yml動態新增聆聽MQ的服務
ref:

  1. https://dev.to/adzubla/using-multiple-jms-servers-with-spring-boot-3cbm
  2. https://stackoverflow.com/questions/55543440/spring-jmslistener-to-listen-to-multiple-queues
  3. https://stackoverflow.com/questions/60762328/spring-jmslistener-start-stop-while-application-is-running

主要是給自己的一個紀錄,也分享給有需要的夥伴
註解部分有提及一些參考的連結,有興趣可以點進去看看喔

這是一個心血來潮,產生的文章
若有喜歡或交流的部分都歡迎在下方留言,多多關照。

#mq #jmstemplate #ibm







你可能感興趣的文章

Day 39 & 40 - Flight Deal Finder [BIG project]

Day 39 & 40 - Flight Deal Finder [BIG project]

基礎 JavaScript 語法

基礎 JavaScript 語法

從零開始學資料科學:Numpy 基礎入門

從零開始學資料科學:Numpy 基礎入門






留言討論