See

This will create a dedicated Channel, declare a queue, declare exchanges, declare bindings, establish QoS, and finally start consuming messages. If the connection is reset, then all of this setup will re-run on a new Channel. This uses the same retry-delay logic as the Connection.

The callback is called for each incoming message. If it throws an error then the message is rejected (BasicNack) and possibly requeued, or sent to a dead-letter exchange. The error is then emitted as an event. The callback can also return a numeric status code to control the ACK/NACK behavior. The ConsumerStatus enum is provided for convenience.

ACK/NACK behavior when the callback:

  • throws an error - BasicNack(requeue=ConsumerProps.requeue)
  • returns 0 or undefined - BasicAck
  • returns 1 - BasicNack(requeue=true)
  • returns 2 - BasicNack(requeue=false)
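
The mapping above can be sketched as a small pure function. This is an illustration only, not the library's implementation: the local ConsumerStatus enum is a stand-in whose numeric values follow the list (the member names ACK and REQUEUE are assumptions, since only DROP appears on this page), and settle(), Outcome, and Settlement are hypothetical names.

```typescript
// Local stand-in for the library's ConsumerStatus enum.
// Numeric values follow the list above; ACK and REQUEUE are assumed names.
enum ConsumerStatus { ACK = 0, REQUEUE = 1, DROP = 2 }

type Settlement =
  | { method: 'BasicAck' }
  | { method: 'BasicNack'; requeue: boolean }

// Hypothetical description of what the handler did.
type Outcome = { threw: true } | { threw: false; value: number | undefined }

// `requeueOnError` plays the role of ConsumerProps.requeue.
function settle(outcome: Outcome, requeueOnError: boolean): Settlement {
  if (outcome.threw) return { method: 'BasicNack', requeue: requeueOnError }
  switch (outcome.value) {
    case undefined:
    case ConsumerStatus.ACK:
      return { method: 'BasicAck' }
    case ConsumerStatus.REQUEUE:
      return { method: 'BasicNack', requeue: true }
    case ConsumerStatus.DROP:
      return { method: 'BasicNack', requeue: false }
    default:
      // The list above does not cover other values, so this sketch refuses to guess.
      throw new RangeError(`unrecognized status code: ${outcome.value}`)
  }
}
```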

About concurrency: For best performance, you'll likely want to start with concurrency=X and qos.prefetchCount=2X. In other words, up to 2X messages are loaded into memory, but only X ConsumerHandlers run concurrently. The consumer won't need to wait for a new message if one has already been prefetched, which minimizes idle time. With more worker processes, you will want a lower prefetchCount to avoid worker starvation.
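
As a rough sketch of the X / 2X starting point, the helper below derives both values from a single target concurrency. The helper name startingTuning and the ConsumerTuning interface are hypothetical; only the field names concurrency and qos.prefetchCount come from the text above.

```typescript
// Field names mirror `concurrency` and `qos.prefetchCount` from the text above.
interface ConsumerTuning {
  concurrency: number
  qos: { prefetchCount: number }
}

// Hypothetical helper: X handlers running at once, 2X messages held in memory.
function startingTuning(x: number): ConsumerTuning {
  if (!Number.isInteger(x) || x < 1) {
    throw new RangeError('concurrency must be a positive integer')
  }
  return { concurrency: x, qos: { prefetchCount: 2 * x } }
}
```

The result could then be spread into the consumer properties, for example rabbit.createConsumer({queue: 'my-queue', ...startingTuning(5)}, handler). Treat the 2X ratio as a first guess to measure against, and lower prefetchCount when several worker processes share the queue.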

The 2nd argument of handler(msg, reply) can be used to reply to RPC requests, e.g. await reply('my-response-body'). This acts like basicPublish() except the message body comes first. Some fields are also set automatically. See ConsumerHandler for more detail.
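
A minimal sketch of an RPC-style handler follows. The IncomingMessage and Reply types are simplified stand-ins, not the library's real types (see ConsumerHandler for those); the msg.body field and the echoUpper name are assumptions made for illustration.

```typescript
// Simplified stand-ins for the library's message and reply types.
interface IncomingMessage { body: unknown } // `body` is an assumed field name
type Reply = (body: unknown) => Promise<void>

// Hypothetical handler: answers each request with its body upper-cased.
async function echoUpper(msg: IncomingMessage, reply: Reply): Promise<void> {
  const text = String(msg.body)
  // The response body is the first argument, as described above.
  await reply(text.toUpperCase())
}
```

Because the handler only depends on the reply function it is given, it can be exercised in a unit test with a fake reply that records what was sent, without a broker.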

This is an EventEmitter that may emit errors. Also, since this wraps a Channel, this must be closed before closing the Connection.
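
The close-ordering requirement above can be sketched as a small shutdown helper. The Closable interface and the shutdown function are hypothetical, and the sketch assumes the Connection exposes an async close() like the Consumer does.

```typescript
// Minimal structural type: anything with an async close().
interface Closable { close(): Promise<void> }

// Hypothetical helper: close every consumer first, then the connection.
async function shutdown(consumers: Closable[], connection: Closable): Promise<void> {
  const errors: unknown[] = []
  for (const consumer of consumers) {
    try {
      // Waits for this consumer's pending handlers to settle.
      await consumer.close()
    } catch (err) {
      // Keep going so one failing consumer does not block the rest.
      errors.push(err)
    }
  }
  // Only now is it safe to close the underlying connection.
  await connection.close()
  if (errors.length > 0) throw errors[0]
}
```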

Example

const sub = rabbit.createConsumer({queue: 'my-queue'}, async (msg, reply) => {
  console.log(msg)
  // ... do some work ...

  // optionally reply to an RPC-type message
  await reply('my-response-data')

  // optionally return a status code
  if (somethingBad) {
    return ConsumerStatus.DROP
  }
})

sub.on('error', (err) => {
  console.log('consumer error (my-queue)', err)
})

// when closing the application
await sub.close()

Hierarchy

  • EventEmitter
    • Consumer

Properties

concurrency: number

Maximum number of messages to process at once. Non-zero positive integer. Writeable.

stats: {
    acknowledged: number;
    dropped: number;
    initialMessageCount: number;
    prefetched: number;
    requeued: number;
} = ...

Some statistics about this Consumer

Type declaration

  • acknowledged: number

    Total acknowledged messages

  • dropped: number

    Total messages rejected with BasicNack(requeue=false)

  • initialMessageCount: number

    Size of the queue when this consumer started

  • prefetched: number

    How many messages are in memory, waiting to be processed

  • requeued: number

    Total messages rejected with BasicNack(requeue=true)
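
As one way to use these counters, the sketch below derives a total and a rejection ratio from a stats snapshot. The ConsumerStats interface copies the shape shown above; the summarize helper and its return fields are hypothetical.

```typescript
// Shape copied from the `stats` property above.
interface ConsumerStats {
  acknowledged: number
  dropped: number
  initialMessageCount: number
  prefetched: number
  requeued: number
}

// Hypothetical helper: total settled messages and the share that were rejected.
function summarize(stats: ConsumerStats): { settled: number; rejectedRatio: number } {
  const rejected = stats.dropped + stats.requeued
  const settled = stats.acknowledged + rejected
  // Avoid dividing by zero before any message has been settled.
  return { settled, rejectedRatio: settled === 0 ? 0 : rejected / settled }
}
```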

Accessors

  • get consumerTag(): string
  • Get the current consumer ID. If generated by the broker, then this will change each time the consumer is ready.

    Returns string

  • get queue(): string
  • Get current queue name. If the queue name was left blank in ConsumerProps, then this will change whenever the channel is reset, as the name is randomly generated.

    Returns string

Methods

  • close(): Promise<void>
  • Stop consuming messages. Close the channel once all pending message handlers have settled. If called while the Connection is reconnecting, then this may be delayed by ConnectionOptions.acquireTimeout.

    Returns Promise<void>

  • on(name, cb): Consumer
  • Emitted when the consumer is successfully (re)created.

    Parameters

    • name: "ready"
    • cb: (() => void)
        • (): void
        • Returns void

    Returns Consumer

  • on(name, cb): Consumer
  • Errors are emitted if a message handler fails, if channel setup fails, or if the consumer is cancelled by the server (for example, when the queue is deleted).

    Parameters

    • name: "error"
    • cb: ((err) => void)
        • (err): void
        • Parameters

          • err: any

          Returns void

    Returns Consumer

  • Starts the consumer if it is currently stopped. If the consumer was created with lazy=true, call this to begin consuming.

    Returns void
