Class Consumer

See

This will create a dedicated Channel, declare a queue, declare exchanges, declare bindings, establish QoS, and finally start consuming messages. If the connection is reset, then all of this setup will re-run on a new Channel. This uses the same retry-delay logic as the Connection.

The callback is called for each incoming message. If it throws an error or returns a rejected Promise then the message is NACK'd (rejected) and possibly requeued, or sent to a dead-letter exchange. Otherwise the message is automatically ACK'd and removed from the queue.

About concurency: For best performance, you'll likely want to start with concurrency=X and qos.prefetchCount=2X. In other words, up to 2X messages are loaded into memory, but only X ConsumerHandlers are running concurrently. The consumer won't need to wait for a new message if one has alredy been prefetched, minimizing idle time. With more worker processes, you will want a lower prefetchCount to avoid worker-starvation.

The 2nd argument of handler(msg, reply) can be used to reply to RPC requests. e.g. await reply('my-response-body'). This acts like basicPublish() except the message body comes first. Some fields are also set automaticaly. See ConsumerHandler for more detail.

This is an EventEmitter that may emit errors. Also, since this wraps a Channel, this must be closed before closing the Connection.

Example

const sub = rabbit.createConsumer({queue: 'my-queue'}, async (msg, reply) => {
console.log(msg)
// ... do some work ...
// optionally reply to an RPC-type message
await reply('my-response-data')
})

sub.on('error', (err) => {
console.log('consumer error (my-queue)', err)
})

// when closing the application
await sub.close()

Hierarchy

  • EventEmitter
    • Consumer

Properties

Accessors

Methods

Properties

concurrency: number

Maximum number of messages to process at once. Non-zero positive integer. Writeable.

Accessors

  • get consumerTag(): string
  • Get the current consumer ID. If generated by the broker, then this will change each time the consumer is ready.

    Returns string

  • get queue(): string
  • Get current queue name. If the queue name was left blank in ConsumerProps, then this will change whenever the channel is reset, as the name is randomly generated.

    Returns string

Methods

  • Stop consuming messages. Close the channel once all pending message handlers have settled. If called while the Connection is reconnecting, then this may be delayed by acquireTimeout

    Returns Promise<void>

  • The consumer is successfully (re)created

    Parameters

    • name: "ready"
    • cb: (() => void)
        • (): void
        • Returns void

    Returns Consumer

  • Errors are emitted if a message handler fails, or if channel setup fails, or if the consumer is cancelled by the server (like when the queue is deleted).

    Parameters

    • name: "error"
    • cb: ((err) => void)
        • (err): void
        • Parameters

          • err: any

          Returns void

    Returns Consumer

Generated using TypeDoc