問題描述
spring boot jms ‑ 在@JmsListner 中發送和接收消息 (spring boot jms ‑ send and receive message inside @JmsListner)
QUEUE_INI 使用 IBM MQ9、最新的 spring boot 和 IBM 的 spring‑boot‑starter。
我想用@JmsListner 接收一條消息,然後不提交它,將另一條消息發送到另一個隊列,接收響應,然後全部提交。
到目前為止,我有下一個:
@JmsListener(destination = QUEUE_IN, selector = "type='com.example.MainRequest")
public void receiveMessage(Message message) throws JMSException {
LOG.info("Received {} <{}>", QUEUE_IN, message);
jmsTemplate2.convertAndSend(QUEUE_OUT, "SUB Request", requestMsg ‑> {
requestMsg.setStringProperty("type", "com.example.SubRequest");
return requestMsg;
});
LOG.info("Message sent");
Message reply = jmsTemplate2.receiveSelected(QUEUE_IN, "type='com.example.SubResponse'");
LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
}
我卡在“已發送消息”上。看起來子請求還沒有真正發送。我在 MQ UI 中看到隊列深度為 1,但裡面沒有消息,我的子請求偵聽器也沒有看到任何消息。
我也嘗試過使用 sendAndReceive 方法:
Message reply = jmsTemplate.sendAndReceive(QUEUE_OUT, session ‑> {
Message msg = session.createTextMessage();
msg.setStringProperty("type", "com.example.SubRequest");
LOG.info("Sending msg: <{}> to {}", msg, QUEUE_OUT);
return msg;
});
但是我沒有訪問模型隊列的權限。
有什麼辦法可以做到嗎?
更新:我在大家的共同幫助下完成了這項工作。我最終得到了單獨的服務,僅使用 @Transactional(propagation = Propagation.REQUIRES_NEW)
發送子請求。所有其他邏輯都保留在主偵聽器中。
同時打開事務開始/結束日誌也很有幫助:
logging:
level:
org.springframework.transaction.interceptor: trace
參考解法
方法 1:
Your outbound message sender is enrolled in the same transaction as your message receiver. So the receiver of the outbound message won't see the message until the transaction commits. I think you need to start a new transaction to perform the inner procedure.
== Update ==
So it's been a while, and I don't have a dev environment set up for this, but I suggest something like this. Essentially, you're splitting up the Listener and Sender/Receiver into two separate classes. The Sender/Receiver should be injected into your listener so the @Transactional annotation is honored.
public class MyListener {
private final MySender sender;
public MyListener(MySender sender) {
this.sender = sender;
}
@JmsListener(destination = QUEUE_IN, selector = "type='com.example.MainRequest")
public void receiveMessage(Message message) throws JMSException {
LOG.info("Received {} <{}>", QUEUE_IN, message);
Message reply = sender.sendAndReceive()
LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
}
}
public class MySender {
private final JmsTemplate jmsTemplate2;
private final Destination QUEUE_OUT;
public MySender(JmsTemplate jmsTemplate2, Destination QUEUE_OUT) {
this.jmsTemplate2 = jmsTemplate2;
this.QUEUE_OUT = QUEUE_OUT;
}
@Transactional(propagation=Propagation.NESTED) // Or REQUIRES_NEW, edepending on usage
public Message sendAndReceive() throws JMSException {
jmsTemplate2.convertAndSend(QUEUE_OUT, "SUB Request", requestMsg ‑> {
requestMsg.setStringProperty("type", "com.example.SubRequest");
return requestMsg;
});
LOG.info("Message sent");
Message reply = jmsTemplate2.receiveSelected(QUEUE_IN, "type='com.example.SubResponse'");
LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
return reply;
}
}
The @Transactional(propagation=Propagation.NESTED) will start a new transaction if one is already running (and commit/rollback when the method exits), so you should be able to send/receive the message and return it to you listener.
方法 2:
Unfortunately I am unable to add a comment to the previous answer as I don’t have sufficient privilege.
As @Nicholas says, your message receive is in the same transaction as the message send. Until the receiveMessage
method completes the initial send won’t be committed to the queue for the receiver to consume. I am assuming you might have the receive configured with something like RECEIVE_TIMEOUT_INDEFINITE_WAIT
which would then block completion of the method.
With this configuration, you already have a Message Listener bound to QUEUE_IN
. Messages delivered to that queue would be delivered to just one consumer. I see you are using selectors to avoid this problem.
One option might be to introduce another Message Listener for type = com.example.SubResponse
and remove the blocking receive from the first.
(by Alexander Stepchkov、Nicholas、richc)
參考文件