Programming

RabbitMQ / AMQP : 단일 대기열, 동일한 메시지에 대한 여러 소비자?

procodes 2020. 7. 17. 21:21
반응형

RabbitMQ / AMQP : 단일 대기열, 동일한 메시지에 대한 여러 소비자?


나는 RabbitMQ와 AMQP를 일반적으로 사용하기 시작했습니다.

  • 메시지 대기열이 있습니다
  • 여러 소비자가 있는데 동일한 메시지로 다른 일을하고 싶습니다 .

RabbitMQ 문서의 대부분은 라운드 로빈, 즉 단일 메시지가 단일 소비자에 의해 소비되고 각 소비자간에로드가 분산되는 경우에 초점을 둔 것으로 보입니다. 이것은 실제로 내가 목격하는 행동입니다.

예를 들어, 생산자는 단일 대기열을 가지며 2 초마다 메시지를 보냅니다.

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  var sendMessage = function(connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);  
    connection.publish(queue_name, encoded_payload);
  }

  setInterval( function() {    
    var test_message = 'TEST '+count
    sendMessage(connection, "my_queue_name", test_message)  
    count += 1;
  }, 2000) 


})

그리고 여기 소비자가 있습니다 :

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
  connection.queue("my_queue_name", function(queue){
    queue.bind('#'); 
    queue.subscribe(function (message) {
      var encoded_payload = unescape(message.data)
      var payload = JSON.parse(encoded_payload)
      console.log('Recieved a message:')
      console.log(payload)
    })
  })
})

소비자를 두 번 시작하면 각 소비자가 라운드 로빈 동작으로 대체 메시지를 소비하고 있음을 알 수 있습니다. 예를 들어, 한 터미널에는 메시지 1, 3, 5, 다른 터미널에는 2, 4, 6 메시지가 표시됩니다 .

내 질문은 :

  • 각 소비자에게 동일한 메시지를 받도록 할 수 있습니까? 즉, 두 소비자 모두 메시지 1, 2, 3, 4, 5, 6을 수신합니까? 이것을 AMQP / RabbitMQ에서 무엇이라고하나요? 일반적으로 어떻게 구성됩니까?

  • 이것은 일반적으로 수행됩니까? 교환기가 단일 소비자 대신 두 개의 별도 대기열로 메시지를 라우팅해야합니까?


각 소비자에게 동일한 메시지를 받도록 할 수 있습니까? 즉, 두 소비자 모두 메시지 1, 2, 3, 4, 5, 6을 수신합니까? 이것을 AMQP / RabbitMQ에서 무엇이라고하나요? 일반적으로 어떻게 구성됩니까?

소비자가 동일한 대기열에있는 경우 아닙니다. RabbitMQ의 AMQP 개념 안내서에서 :

AMQP 0-9-1에서 메시지는 소비자간에로드 균형 조정됨을 이해해야합니다.

이것은 큐 내의 라운드 로빈 동작이 지정 되어 있으며 구성 할 수 없음을 의미합니다. 즉, 여러 소비자가 동일한 메시지 ID를 처리하려면 별도의 큐가 필요합니다.

이것은 일반적으로 수행됩니까? 교환기가 단일 소비자 대신 두 개의 별도 대기열로 메시지를 라우팅해야합니까?

아니요, 각 소비자가 동일한 메시지 ID를 처리하는 단일 대기열 / 여러 소비자는 불가능합니다. 교환기가 메시지를 두 개의 개별 대기열로 라우팅하는 것이 실제로 더 좋습니다.

너무 복잡한 라우팅이 필요하지 않기 때문에 팬 아웃 교환 이이를 잘 처리합니다. node-amqp에는 메시지를 직접 연결에 게시 할 수있는 '기본 교환'개념이 있기 때문에 Exchange에 너무 집중하지 않았지만 대부분의 AMQP 메시지는 특정 교환에 게시됩니다.

팬 아웃 교환은 다음과 같습니다. 송수신

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {   

    var sendMessage = function(exchange, payload) {
      console.log('about to publish')
      var encoded_payload = JSON.stringify(payload);
      exchange.publish('', encoded_payload, {})
    }

    // Recieve messages
    connection.queue("my_queue_name", function(queue){
      console.log('Created queue')
      queue.bind(exchange, ''); 
      queue.subscribe(function (message) {
        console.log('subscribed to queue')
        var encoded_payload = unescape(message.data)
        var payload = JSON.parse(encoded_payload)
        console.log('Recieved a message:')
        console.log(payload)
      })
    })

    setInterval( function() {    
      var test_message = 'TEST '+count
      sendMessage(exchange, test_message)  
      count += 1;
    }, 2000) 
 })
})

Just read the rabbitmq tutorial. You publish message to exchange, not to queue; it is then routed to appropriate queues. In your case, you should bind separate queue for each consumer. That way, they can consume messages completely independently.


The last couple of answers are almost correct - I have tons of apps that generate messages that need to end up with different consumers so the process is very simple.

If you want multiple consumers to the same message, do the following procedure.

Create multiple queues, one for each app that is to receive the message, in each queue properties, "bind" a routing tag with the amq.direct exchange. Change you publishing app to send to amq.direct and use the routing-tag (not a queue). AMQP will then copy the message into each queue with the same binding. Works like a charm :)

Example: Lets say I have a JSON string I generate, I publish it to the "amq.direct" exchange using the routing tag "new-sales-order", I have a queue for my order_printer app that prints order, I have a queue for my billing system that will send a copy of the order and invoice the client and I have a web archive system where I archive orders for historic/compliance reasons and I have a client web interface where orders are tracked as other info comes in about an order.

So my queues are: order_printer, order_billing, order_archive and order_tracking All have the binding tag "new-sales-order" bound to them, all 4 will get the JSON data.

This is an ideal way to send data without the publishing app knowing or caring about the receiving apps.


Yes each consumer can receive the same messages. have a look at http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html http://www.rabbitmq.com/tutorials/tutorial-five-python.html

for different ways to route messages. I know they are for python and java but its good to understand the principles, decide what you are doing and then find how to do it in JS. Its sounds like you want to do a simple fanout (tutorial 3), which sends the messages to all queues connected to the exchange.

The difference with what you are doing and what you want to do is basically that you are going to set up and exchange or type fanout. Fanout excahnges send all messages to all connected queues. Each queue will have a consumer that will have access to all the messages separately.

Yes this is commonly done, it is one of the features of AMPQ.


The send pattern is a one-to-one relationship. If you want to "send" to more than one receiver you should be using the pub/sub pattern. See http://www.rabbitmq.com/tutorials/tutorial-three-python.html for more details.


RabbitMQ / AMQP: single queue, multiple consumers for same message and page refresh.

rabbit.on('ready', function () {    });
    sockjs_chat.on('connection', function (conn) {

        conn.on('data', function (message) {
            try {
                var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, ''));

                if (obj.header == "register") {

                    // Connect to RabbitMQ
                    try {
                        conn.exchange = rabbit.exchange(exchange, { type: 'topic',
                            autoDelete: false,
                            durable: false,
                            exclusive: false,
                            confirm: true
                        });

                        conn.q = rabbit.queue('my-queue-'+obj.agentID, {
                            durable: false,
                            autoDelete: false,
                            exclusive: false
                        }, function () {
                            conn.channel = 'my-queue-'+obj.agentID;
                            conn.q.bind(conn.exchange, conn.channel);

                            conn.q.subscribe(function (message) {
                                console.log("[MSG] ---> " + JSON.stringify(message));
                                conn.write(JSON.stringify(message) + "\n");
                            }).addCallback(function(ok) {
                                ctag[conn.channel] = ok.consumerTag; });
                        });
                    } catch (err) {
                        console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack);
                    }

                } else if (obj.header == "typing") {

                    var reply = {
                        type: 'chatMsg',
                        msg: utils.escp(obj.msga),
                        visitorNick: obj.channel,
                        customField1: '',
                        time: utils.getDateTime(),
                        channel: obj.channel
                    };

                    conn.exchange.publish('my-queue-'+obj.agentID, reply);
                }

            } catch (err) {
                console.log("ERROR ----> " + err.stack);
            }
        });

        // When the visitor closes or reloads a page we need to unbind from RabbitMQ?
        conn.on('close', function () {
            try {

                // Close the socket
                conn.close();

                // Close RabbitMQ           
               conn.q.unsubscribe(ctag[conn.channel]);

            } catch (er) {
                console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack);
            }
        });
    });

To get the behavior you want, simply have each consumer consume from its own queue. You'll have to use a non-direct exchange type (topic, header, fanout) in order to get the message to all of the queues at once.


As I assess your case is:

  • I have a queue of messages (your source for receiving messages, lets name it q111)

  • I have multiple consumers, which I would like to do different things with the same message.

Your problem here is while 3 messages are received by this queue, message 1 is consumed by a consumer A, other consumers B and C consumes message 2 and 3. Where as you are in need of a setup where rabbitmq passes on the same copies of all these three messages(1,2,3) to all three connected consumers (A,B,C) simultaneously.

While many configurations can be made to achieve this, a simple way is to use the following two step concept:

  • Use a dynamic rabbitmq-shovel to pickup messages from the desired queue(q111) and publish to a fanout exchange (exchange exclusively created and dedicated for this purpose).
  • Now re-configure your consumers A,B & C (who were listening to queue(q111)) to listen from this Fanout exchange directly using a exclusive & anonymous queue for each consumer.

Note: While using this concept don't consume directly from the source queue(q111), as messages already consumed wont be shovelled to your Fanout exchange.

If you think this does not satisfies your exact requirement... feel free to post your suggestions :-)


There is one interesting option in this scenario I haven`t found in answers here.

You can Nack messages with "requeue" feature in one consumer to process them in another. Generally speaking it is not a right way, but maybe it will be good enough for someone.

https://www.rabbitmq.com/nack.html

And beware of loops (when all concumers nack+requeue message)!


If you happen to be using the amqplib library as I am, they have a handy example of an implementation of the Publish/Subscribe RabbitMQ tutorial which you might find handy.


I think you should check sending your messages using the fan-out exchanger. That way you willl receiving the same message for differents consumers, under the table RabbitMQ is creating differents queues for each one of this new consumers/subscribers.

This is the link for see the tutorial example in javascript https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html

참고URL : https://stackoverflow.com/questions/10620976/rabbitmq-amqp-single-queue-multiple-consumers-for-same-message

반응형