spring amqp async responses and correlate it with a request

Multi tool use
Multi tool use


spring amqp async responses and correlate it with a request



I'm trying to implement the request-response pattern using spring amqp.
I looked at the docs and examples but i still can't understand how to use correlation Id and consume the right response for a request in asynchronous environment.



Assume i have a Transporter class which receives a request, sends the request
to some requestQueue, and then waits(blocking) to a response using a listener to some responseQueue, and then returns the response.



Now, I have an AsyncSocketService which listens to requestQueue, and then sends the request via a socket to another server which process the request and returns a response. this server works asynchronously, so if two requests come, the response does not have to be in the same order. That means that the response from AsyncSocketService is managed in a different thread that listens to the socket InputStream.



After the thread that listen to the socket's InputStream receives a response, it publish it to a responseQueue, and then my Transporter, who listens to the responseQueue, can return the response to the original caller.



The Transporter listens to the responseQueue like this:


byte response = (byte) rabbitTemplate.receiveAndConvert(queueName, timeout);



But in this way, i can't make sure that a given response is matching the right request.



I've seen some examples to use a reply queue, when you define SimpleMessageListenerContainer and RabbitTemplate with reply address.
but still i don't understand where the correlation id comes to play and how can i check it. and this solution does not fit for me, because i need my Transporter block and wait for a response that match the request.



Important thing to note, the requests and responses have a refId field in the byte to allow matching between request and response, but i don't want to catch and requeue responses that don't match.


refId



Can someone help find a solution for my use case?



Thanks!




1 Answer
1



Your architecture is a bit unclear and I'm afraid I can't answer to your question about manual correlation directly, but for the request-reply scenarios there are RabbitTempalte.sendAndReceive(). You really can configure it with the fixed queue for replies, if that is your requirement:


RabbitTempalte.sendAndReceive()


queue


/**
* The name of the default queue to receive messages from when none is specified explicitly.
*
* @param queue the default queue name to use for receive
*/
public void setQueue(String queue) {



There is an automatic correlation support which you can control via:


/**
* Set to true to use correlation id provided by the message instead of generating
* the correlation id for request/reply scenarios. The correlation id must be unique
* for all in-process requests to avoid cross talk.
* <p>
* <b>Users must therefore take create care to ensure uniqueness.</b>
* @param userCorrelationId true to use user correlation data.
*/
public void setUserCorrelationId(boolean userCorrelationId)



At the same time you need to configure this RabbitTemplate as a listener to some MessageListenerContainer to really consume those async replies.


RabbitTemplate


MessageListenerContainer



For true async request/replies there is also an AsyncRabbitTemplate variant, which returns RabbitMessageFuture for later consumption of the reply.


AsyncRabbitTemplate


RabbitMessageFuture



Anyway it is very important that a requestQueue consumer side supports a correlation transferring to the reply back. Otherwise on the producer side there won't be anything to match an async reply with the request has been sent.


requestQueue



All the information is in the Reference Manual.





I'll try to explain the main problem. I have multiple requests that use a socket to write to an old TCP server. For each client socket I have that is connected to this server, I have an handler that read anything coming from it and writing x bytes from it to a queue. The write operation and read operation happen in different context so I can't use sendAndReceive. I am trying to use AMQP to coordinate everything but I have to send to rabbit in a request thread, in a different thread read from the socket and write to a response queue, and wait in the request thread for the correct response
– Aharon Bar-El
Jul 2 at 13:32


sendAndReceive


send





Well, it's going to be to hard to implement with the fixed replyQueue. You would need to replicate a PendingReply logic from the RabbitTemplate. How about to consider to rely on the temporary anonymous queue per request? This is exactly what is done in the RabbitTemplate.doSendAndReceiveWithTemporary(). This way you will have a queue per request and no any correlation collision is going to happen. However your consumer has to send replies back using a replyTo header from the request, not that fixed replyQueue.
– Artem Bilan
Jul 2 at 13:41


replyQueue


PendingReply


RabbitTemplate


temporary


RabbitTemplate.doSendAndReceiveWithTemporary()


replyTo


replyQueue






By clicking "Post Your Answer", you acknowledge that you have read our updated terms of service, privacy policy and cookie policy, and that your continued use of the website is subject to these policies.

MNlWmcbr3mUPLPt1Vp9OE1 8d 6YKuA,BleW luEwI7XBd0iQGbwQRPMqEl b,d6CE
Vg MpU30vdZd aU Ihn1NS4SReu2JaHbTjqz4,Omg,NPRYQj1op,tk6hRl,U 4KtWZ1QMZKZozc9 w88,fgnjc2ZJDbC041exa

Popular posts from this blog

PHP contact form sending but not receiving emails

Do graphics cards have individual ID by which single devices can be distinguished?

Create weekly swift ios local notifications