Maximum number of messages to process at once. Non-zero positive integer. Writeable.
Readonly
statsSome statistics about this Consumer
Total acknowledged messages
Total messages rejected BasicNack(requeue=false)
Size of the queue when this consumer started
How many messages are in memory, waiting to be processed
Total messages rejected with BasicNack(requeue=true)
Get the current consumer ID. If generated by the broker, then this will change each time the consumer is ready.
Get current queue name. If the queue name was left blank in ConsumerProps, then this will change whenever the channel is reset, as the name is randomly generated.
Stop consuming messages. Close the channel once all pending message handlers have settled. If called while the Connection is reconnecting, then this may be delayed by ConnectionOptions.acquireTimeout
The consumer is successfully (re)created
Errors are emitted if a message handler fails, or if channel setup fails, or if the consumer is cancelled by the server (like when the queue is deleted).
See
This will create a dedicated Channel, declare a queue, declare exchanges, declare bindings, establish QoS, and finally start consuming messages. If the connection is reset, then all of this setup will re-run on a new Channel. This uses the same retry-delay logic as the Connection.
The callback is called for each incoming message. If it throws an error then the message is rejected (BasicNack) and possibly requeued, or sent to a dead-letter exchange. The error is then emitted as an event. The callback can also return a numeric status code to control the ACK/NACK behavior. The ConsumerStatus enum is provided for convenience.
ACK/NACK behavior when the callback:
About concurency: For best performance, you'll likely want to start with concurrency=X and qos.prefetchCount=2X. In other words, up to 2X messages are loaded into memory, but only X ConsumerHandlers are running concurrently. The consumer won't need to wait for a new message if one has alredy been prefetched, minimizing idle time. With more worker processes, you will want a lower prefetchCount to avoid worker-starvation.
The 2nd argument of
handler(msg, reply)
can be used to reply to RPC requests. e.g.await reply('my-response-body')
. This acts like basicPublish() except the message body comes first. Some fields are also set automaticaly. See ConsumerHandler for more detail.This is an EventEmitter that may emit errors. Also, since this wraps a Channel, this must be closed before closing the Connection.
Example