Maximum number of messages to process at once. Must be a positive integer. Writable.
Get the current consumer ID. If generated by the broker, then this will change each time the consumer is ready.
Get current queue name. If the queue name was left blank in ConsumerProps, then this will change whenever the channel is reset, as the name is randomly generated.
Stop consuming messages. Close the channel once all pending message handlers have settled. If called while the Connection is reconnecting, then this may be delayed by acquireTimeout.
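The "wait for pending handlers to settle" behavior can be pictured with a small self-contained sketch. This is not the library's implementation; PendingTracker and its methods are hypothetical names used only to illustrate the idea of draining in-flight work before closing.

```typescript
// Hypothetical illustration: track in-flight handler promises and wait for
// all of them to settle (resolve OR reject) before shutting down.
class PendingTracker {
  private pending = new Set<Promise<unknown>>()

  // Remember a handler promise until it settles, then forget it.
  track<T>(p: Promise<T>): Promise<T> {
    this.pending.add(p)
    const done = (): void => { this.pending.delete(p) }
    p.then(done, done)
    return p
  }

  // Number of handlers that have not settled yet.
  get size(): number {
    return this.pending.size
  }

  // Resolve once every tracked promise has settled; rejections are swallowed
  // here because a failed handler still counts as "settled".
  async drain(): Promise<void> {
    const waits = Array.from(this.pending).map((p) => p.then(() => undefined, () => undefined))
    await Promise.all(waits)
  }
}
```

In this sketch a close routine would call drain() first and only then release the channel, so a slow handler delays shutdown but is never cut off.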
The consumer is successfully (re)created
Errors are emitted if a message handler fails, or if channel setup fails, or if the consumer is cancelled by the server (like when the queue is deleted).
This will create a dedicated Channel, declare a queue, declare exchanges, declare bindings, establish QoS, and finally start consuming messages. If the connection is reset, then all of this setup will re-run on a new Channel. This uses the same retry-delay logic as the Connection.
The callback is called for each incoming message. If it throws an error or returns a rejected Promise then the message is NACK'd (rejected) and possibly requeued, or sent to a dead-letter exchange. Otherwise the message is automatically ACK'd and removed from the queue.
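The acknowledgement rule above can be sketched in a few lines. This is a simplified model, not the library's code: Message, Handler, Outcome and settle are hypothetical names, and requeue or dead-letter routing is left out.

```typescript
// Hypothetical stand-ins for illustration; these are not the library's types.
type Message = { body: string }
type Handler = (msg: Message) => void | Promise<void>
type Outcome = 'ack' | 'nack'

// Run one handler and map its result to the acknowledgement described above:
// a thrown error or a rejected Promise means NACK, anything else means ACK.
async function settle(handler: Handler, msg: Message): Promise<Outcome> {
  try {
    await handler(msg)
    return 'ack'
  } catch {
    return 'nack'
  }
}
```

Both a synchronous throw and an asynchronous rejection end up in the same catch branch, which is why a handler does not need to acknowledge messages explicitly in this model.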
About concurrency: For best performance, you'll likely want to start with concurrency=X and qos.prefetchCount=2X. In other words, up to 2X messages are loaded into memory, but only X ConsumerHandlers are running concurrently. The consumer won't need to wait for a new message if one has already been prefetched, minimizing idle time. With more worker processes, you will want a lower prefetchCount to avoid worker-starvation.
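As a small worked example of the X and 2X starting point, the helper below derives both values from a chosen concurrency. suggestTuning and ConsumerTuning are hypothetical names, not part of the library; only the concurrency and qos.prefetchCount field names come from the text above, and the 2X ratio is a starting point to tune rather than a fixed rule.

```typescript
// Hypothetical helper: compute the suggested starting values for a chosen
// concurrency X, i.e. concurrency = X and qos.prefetchCount = 2X.
interface ConsumerTuning {
  concurrency: number
  qos: { prefetchCount: number }
}

function suggestTuning(concurrency: number): ConsumerTuning {
  if (!Number.isInteger(concurrency) || concurrency < 1) {
    throw new RangeError('concurrency must be a positive integer')
  }
  // Up to 2X messages are held in memory while X handlers run at once.
  return { concurrency, qos: { prefetchCount: 2 * concurrency } }
}
```

For instance, suggestTuning(4) yields a concurrency of 4 and a prefetchCount of 8. When many worker processes share one queue, a lower prefetchCount than this helper suggests may be the better choice.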
The 2nd argument of handler(msg, reply) can be used to reply to RPC requests, e.g. await reply('my-response-body'). This acts like basicPublish() except the message body comes first. Some fields are also set automatically. See ConsumerHandler for more detail.
This is an EventEmitter that may emit errors. Also, since this wraps a Channel, this must be closed before closing the Connection.
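To make the shape of the reply callback more concrete, here is a minimal sketch with the body as the first argument. It assumes the common AMQP RPC convention in which a response is routed to the request's replyTo address and echoes its correlationId; the text above does not say which fields are set automatically, so treat those two as assumptions. Incoming, Envelope, Publish and makeReply are hypothetical names, not the library's API.

```typescript
// Hypothetical stand-ins; the replyTo/correlationId handling is an assumption
// based on common AMQP RPC practice, not the library's documented behavior.
interface Incoming { body: string; replyTo?: string; correlationId?: string }
interface Envelope { routingKey: string; correlationId?: string }
type Publish = (envelope: Envelope, body: string) => void

// Build a reply function for one incoming message. Unlike the publish
// function it wraps, the reply takes the message body as its first argument.
function makeReply(msg: Incoming, publish: Publish) {
  return async (body: string, overrides: Partial<Envelope> = {}): Promise<void> => {
    if (!msg.replyTo) throw new Error('message has no replyTo address')
    publish({ routingKey: msg.replyTo, correlationId: msg.correlationId, ...overrides }, body)
  }
}
```

A handler written against this sketch would only call await reply('my-response-body'), and the routing details would be filled in from the request.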
Example