This represents a single connection to a RabbitMQ server (or cluster). Once created, it immediately attempts to connect, and it reconnects whenever the connection is lost, for whatever reason. The class implements the EventEmitter interface and may emit error events. Close it with Connection#close().

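// Assumes the Connection class is exported by the rabbitmq-client package.
import {Connection} from 'rabbitmq-client'

const rabbit = new Connection('amqp://guest:guest@localhost:5672')
rabbit.on('error', (err) => {
  console.log('RabbitMQ connection error', err)
})
rabbit.on('connection', () => {
  console.log('RabbitMQ (re)connected')
})
// On Ctrl+C, wait for open channels to close, then end the connection.
process.on('SIGINT', () => {
  rabbit.close()
})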

Hierarchy

  • EventEmitter
    • Connection

Constructors

Accessors

Methods

  • Allocate and return a new AMQP Channel. You MUST close the channel yourself. Will wait for connect/reconnect when necessary.

    Returns Promise<Channel>
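Because the caller is responsible for closing the channel, a small wrapper can make that hard to forget. The following is a minimal sketch, not part of the library: `withChannel`, `ChannelLike`, and `ChannelSource` are hypothetical names, and it assumes the method above is exposed as `acquire()` and that a Channel offers an async `close()`.

```typescript
// Structural types covering only what this sketch needs. With the real
// library, a Connection (and the Channel it returns) would be passed in.
interface ChannelLike {
  close(): Promise<void> // assumption: a Channel exposes an async close()
}
interface ChannelSource<C extends ChannelLike> {
  acquire(): Promise<C> // assumption: name of the "allocate a Channel" method
}

// Acquire a channel, run `work` with it, and always close the channel
// afterwards, whether `work` resolves or throws.
async function withChannel<C extends ChannelLike, T>(
  source: ChannelSource<C>,
  work: (channel: C) => Promise<T>,
): Promise<T> {
  const channel = await source.acquire()
  try {
    return await work(channel)
  } finally {
    await channel.close()
  }
}
```

With a connection like the `rabbit` instance from the introduction, this could be called as `withChannel(rabbit, async (ch) => { /* use ch */ })`.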

  • Request a single message from a queue. Useful for testing.

    Parameters

    • params: {
          noAck?: boolean;
          queue?: string;
      }
      • Optional noAck?: boolean

        If this field is set the server does not expect acknowledgements for messages. That is, when a message is delivered to the client the server assumes the delivery will succeed and immediately dequeues it. This functionality may increase performance but at the cost of reliability. Messages can get lost if a client dies before they are delivered to the application.

      • Optional queue?: string

        Specifies the name of the queue to consume from.

    Returns Promise<undefined | SyncMessage>

  • Parameters

    • Optional queue: string

    Returns Promise<undefined | SyncMessage>
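Since a single-message request is mostly useful in tests, one common pattern is to pull a handful of messages and inspect them. This is a rough sketch under assumptions: the method name `basicGet` and the `body` field are not shown in the signatures above, the promise is assumed to resolve with `undefined` when the queue is empty, and `takeMessages`, `MessageLike`, and `MessageGetter` are hypothetical names.

```typescript
// Minimal shapes for this sketch. A real Connection would satisfy
// MessageGetter if the method is indeed named basicGet (an assumption).
interface MessageLike {
  body: unknown // assumption: the payload is available as `body`
}
interface MessageGetter<M extends MessageLike> {
  basicGet(params: {queue?: string; noAck?: boolean}): Promise<M | undefined>
}

// Pull up to `limit` messages from `queue`, stopping early as soon as the
// getter resolves with undefined (assumed to mean "queue is empty").
async function takeMessages<M extends MessageLike>(
  getter: MessageGetter<M>,
  queue: string,
  limit: number,
): Promise<M[]> {
  const messages: M[] = []
  while (messages.length < limit) {
    // noAck: true asks the server to dequeue on delivery, so this sketch
    // never has to acknowledge anything.
    const msg = await getter.basicGet({queue, noAck: true})
    if (msg === undefined) break
    messages.push(msg)
  }
  return messages
}
```

Using `noAck: true` keeps the helper short, at the reliability cost described for that field: a message is gone from the queue even if the test crashes before looking at it.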

  • Wait for channels to close and then end the connection. Will not automatically close any channels, giving you the chance to ack/nack any outstanding messages while preventing new channels.

    Returns Promise<void>

  • Create a message publisher that can recover from dropped connections. This will create a dedicated Channel, declare queues, declare exchanges, and declare bindings. If the connection is reset, then all of this setup will rerun on a new Channel. This also supports retries.

    Parameters

    Returns Publisher

  • This will create a single "client" Channel on which you may publish messages and listen for direct responses. This can allow, for example, two micro-services to communicate with each other using RabbitMQ as the middleman instead of directly via HTTP.

    Parameters

    Returns RPCClient

  • Bind an exchange to another exchange.

    Parameters

    • params: {
          arguments?: Record<string, any>;
          destination: string;
          routingKey?: string;
          source: string;
      }
      • Optional arguments?: Record<string, any>

        A set of arguments for the binding. The syntax and semantics of these arguments depend on the exchange class.

      • destination: string

        Specifies the name of the destination exchange to bind.

      • Optional routingKey?: string

        Specifies the routing key for the binding. The routing key is used for routing messages depending on the exchange configuration. Not all exchanges use a routing key - refer to the specific exchange documentation.

      • source: string

        Specifies the name of the source exchange to bind.

    Returns Promise<void>

  • Verify that the exchange exists; create it if needed.

    Parameters

    • params: {
          arguments?: {
              alternate-exchange?: string;
              [k: string]: any;
          };
          autoDelete?: boolean;
          durable?: boolean;
          exchange: string;
          internal?: boolean;
          passive?: boolean;
          type?: string;
      }
      • Optional arguments?: {
            alternate-exchange?: string;
            [k: string]: any;
        }
      • Optional autoDelete?: boolean

        If set, the exchange is deleted when all queues have finished using it.

      • Optional durable?: boolean

        If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.

      • exchange: string

        Exchange names starting with "amq." are reserved for pre-declared and standardised exchanges. The exchange name consists of a non-empty sequence of these characters: letters, digits, hyphen, underscore, period, or colon.

      • Optional internal?: boolean

        If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications.

      • Optional passive?: boolean

        If set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not. The client can use this to check whether an exchange exists without modifying the server state. When set, all other method fields except name and no-wait are ignored. A declare with both passive and no-wait has no effect. Arguments are compared for semantic equivalence.

      • Optional type?: string

        direct, topic, fanout, or headers: Each exchange belongs to one of a set of exchange types implemented by the server. The exchange types define the functionality of the exchange - i.e. how messages are routed through it.

        Default: "direct"

    Returns Promise<void>
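The `alternate-exchange` argument in the signature above can be illustrated with a short sketch. In RabbitMQ, messages that an exchange cannot route are handed to its alternate exchange, if one is configured. The code assumes the method is named `exchangeDeclare`; `declareWithFallback` and `ExchangeDeclarer` are hypothetical names, and the choice of topic and fanout types is only an example.

```typescript
// Only the fields this sketch uses; the documented params object has more.
interface ExchangeDeclarer {
  // assumption: the declare method above is exposed as exchangeDeclare
  exchangeDeclare(params: {
    exchange: string
    type?: string
    durable?: boolean
    arguments?: {'alternate-exchange'?: string; [k: string]: any}
  }): Promise<void>
}

// Declare a durable topic exchange whose unroutable messages are passed on
// to a durable fanout exchange.
async function declareWithFallback(
  conn: ExchangeDeclarer,
  primary: string,
  fallback: string,
): Promise<void> {
  // Declared first so the fallback is in place before anything can be
  // published to the primary exchange.
  await conn.exchangeDeclare({exchange: fallback, type: 'fanout', durable: true})
  await conn.exchangeDeclare({
    exchange: primary,
    type: 'topic',
    durable: true,
    arguments: {'alternate-exchange': fallback},
  })
}
```

A queue still has to be bound to the fallback exchange for those messages to be kept anywhere.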

  • Delete an exchange.

    Parameters

    • params: {
          exchange: string;
          ifUnused?: boolean;
      }
      • exchange: string

        Name of the exchange

      • Optional ifUnused?: boolean

        If set, the server will only delete the exchange if it has no queue bindings. If the exchange has queue bindings the server does not delete it but raises a channel exception instead.

    Returns Promise<void>

  • Unbind an exchange from an exchange.

    Parameters

    • params: {
          arguments?: Record<string, any>;
          destination: string;
          routingKey?: string;
          source: string;
      }
      • Optional arguments?: Record<string, any>
      • destination: string

        Specifies the name of the destination exchange to unbind.

      • Optional routingKey?: string

        Specifies the routing key of the binding to unbind.

      • source: string

        Specifies the name of the source exchange to unbind.

    Returns Promise<void>

  • The connection is successfully (re)established.

    Parameters

    • name: "connection"
    • cb: (() => void)
        • (): void
        • Returns void

    Returns this

  • The RabbitMQ server is low on resources. Message publishers should pause. The outbound side of the TCP socket is blocked until "connection.unblocked" is received, meaning messages will be held in memory. See https://www.rabbitmq.com/connection-blocked.html

    Parameters

    • name: "connection.blocked"
    • cb: ((reason: string) => void)
        • (reason): void
        • Parameters

          • reason: string

          Returns void

    Returns this

  • The RabbitMQ server is accepting new messages.

    Parameters

    • name: "connection.unblocked"
    • cb: (() => void)
        • (): void
        • Returns void

    Returns this
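The two flow-control events can be combined into a flag that publishers check before sending. This is a minimal sketch: `trackBlocked` and `FlowEvents` are hypothetical names, and only the `on(name, cb)` overloads listed above are assumed.

```typescript
// The two event overloads documented above. The return type is loosened to
// unknown because this sketch never chains calls.
interface FlowEvents {
  on(name: 'connection.blocked', cb: (reason: string) => void): unknown
  on(name: 'connection.unblocked', cb: () => void): unknown
}

// Subscribe to both events and expose the current state through accessors.
function trackBlocked(conn: FlowEvents): {
  isBlocked(): boolean
  lastReason(): string | undefined
} {
  let blocked = false
  let reason: string | undefined
  conn.on('connection.blocked', (why: string) => {
    blocked = true
    reason = why
  })
  conn.on('connection.unblocked', () => {
    blocked = false
  })
  return {
    isBlocked: () => blocked,
    lastReason: () => reason,
  }
}
```

A publisher loop could check `isBlocked()` before each send and back off while it returns true, instead of letting messages pile up in memory.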

  • Parameters

    • name: "error"
    • cb: ((err: any) => void)
        • (err): void
        • Parameters

          • err: any

          Returns void

    Returns this

  • Returns a promise which is resolved when the connection is established. WARNING: if timeout=0 and you call close() before the client can connect, then this promise may never resolve!

    Parameters

    • timeout: number = 10_000

      Milliseconds to wait before giving up and rejecting the promise. Use 0 for no timeout.

    • disableAutoClose: boolean = false

      By default this will automatically call connection.close() when the timeout is reached. If disableAutoClose=true, then the connection will instead continue to retry after this promise is rejected. You can call close() manually.

    Returns Promise<void>
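The two parameters interact in a way that suits a readiness probe: with `disableAutoClose=true`, a timeout rejects the promise but leaves the connection retrying. The sketch below assumes the method is named `onConnect`; `isReachable` and `Connectable` are hypothetical names.

```typescript
interface Connectable {
  // assumption: the method above is exposed as onConnect
  onConnect(timeout?: number, disableAutoClose?: boolean): Promise<void>
}

// Resolve to true if the connection is established within `timeoutMs`,
// otherwise false. The connection itself is left untouched either way.
async function isReachable(conn: Connectable, timeoutMs = 5_000): Promise<boolean> {
  try {
    // disableAutoClose = true: a timeout only rejects this promise, it does
    // not close the connection, so reconnect attempts continue.
    await conn.onConnect(timeoutMs, true)
    return true
  } catch {
    return false
  }
}
```

Passing a non-zero timeout also sidesteps the case called out in the warning, where the promise might never settle.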

  • This method binds a queue to an exchange. Until a queue is bound it will not receive any messages. In a classic messaging model, store-and-forward queues are bound to a direct exchange and subscription queues are bound to a topic exchange.

    Parameters

    • params: {
          arguments?: Record<string, any>;
          exchange: string;
          queue?: string;
          routingKey?: string;
      }
      • Optional arguments?: Record<string, any>
      • exchange: string

        Name of the exchange to bind to.

      • Optional queue?: string

        Specifies the name of the queue to bind. If blank, then the last declared queue on the channel will be used.

      • Optional routingKey?: string

        Specifies the routing key for the binding. The routing key is used for routing messages depending on the exchange configuration. Not all exchanges use a routing key - refer to the specific exchange documentation. If the queue name is empty, the server uses the last queue declared on the channel. If the routing key is also empty, the server uses this queue name for the routing key as well. If the queue name is provided but the routing key is empty, the server does the binding with that empty routing key. The meaning of empty routing keys depends on the exchange implementation.

    Returns Promise<void>
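Since a queue receives nothing until it is bound, declaring and binding usually happen together. The following sketch shows one possible order of calls. It assumes the methods are named `exchangeDeclare`, `queueDeclare`, and `queueBind`; `declareAndBind` and `TopologyApi` are hypothetical names, and the durable topic exchange is just an example choice.

```typescript
// Only the fields this sketch uses from each documented params object.
interface TopologyApi {
  exchangeDeclare(params: {exchange: string; type?: string; durable?: boolean}): Promise<void>
  queueDeclare(params: {queue?: string}): Promise<{
    queue: string
    messageCount: number
    consumerCount: number
  }>
  queueBind(params: {exchange: string; queue?: string; routingKey?: string}): Promise<void>
}

// Declare a topic exchange and a queue, bind them with `routingKey`, and
// return the queue name. Passing '' as `queue` lets the server pick a name.
async function declareAndBind(
  conn: TopologyApi,
  exchange: string,
  queue: string,
  routingKey: string,
): Promise<string> {
  await conn.exchangeDeclare({exchange, type: 'topic', durable: true})
  const declared = await conn.queueDeclare({queue})
  // Bind using the name reported back, which matters when it was generated.
  await conn.queueBind({exchange, queue: declared.queue, routingKey})
  return declared.queue
}
```

Binding by the returned name, rather than relying on "the last declared queue on the channel", keeps the helper correct even if other declarations happen in between.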

  • Declare a queue, creating it if needed. If queue is empty or undefined then a random queue name is generated (see the return value).

    Parameters

    • params: {
          arguments?: {
              x-dead-letter-exchange?: string;
              x-dead-letter-routing-key?: string;
              x-expires?: number;
              x-max-length?: number;
              x-max-priority?: number;
              x-message-ttl?: number;
              x-overflow?: "drop-head" | "reject-publish" | "reject-publish-dlx";
              x-queue-type?: "quorum" | "classic" | "stream";
              [k: string]: any;
          };
          autoDelete?: boolean;
          durable?: boolean;
          exclusive?: boolean;
          passive?: boolean;
          queue?: string;
      }
      • Optional arguments?: {
            x-dead-letter-exchange?: string;
            x-dead-letter-routing-key?: string;
            x-expires?: number;
            x-max-length?: number;
            x-max-priority?: number;
            x-message-ttl?: number;
            x-overflow?: "drop-head" | "reject-publish" | "reject-publish-dlx";
            x-queue-type?: "quorum" | "classic" | "stream";
            [k: string]: any;
        }
      • Optional autoDelete?: boolean

        If set, the queue is deleted when all consumers have finished using it. The last consumer can be cancelled either explicitly or because its channel is closed. If there was no consumer ever on the queue, it won't be deleted. Applications can explicitly delete auto-delete queues using the Delete method as normal.

      • Optional durable?: boolean

        If set when creating a new queue, the queue will be marked as durable. Durable queues remain active when a server restarts. Non-durable queues (transient queues) are purged if/when a server restarts. Note that durable queues do not necessarily hold persistent messages, although it does not make sense to send persistent messages to a transient queue.

      • Optional exclusive?: boolean

        Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections is not allowed.

      • Optional passive?: boolean

        If set, the server will reply with Declare-Ok if the queue already exists with the same name, and raise an error if not. The client can use this to check whether a queue exists without modifying the server state. When set, all other method fields except name and no-wait are ignored. A declare with both passive and no-wait has no effect.

      • Optional queue?: string

        The queue name MAY be empty, in which case the server MUST create a new queue with a unique generated name and return this to the client in the Declare-Ok method. Queue names starting with "amq." are reserved for pre-declared and standardised queues. The queue name can be empty, or a sequence of these characters: letters, digits, hyphen, underscore, period, or colon.

    Returns Promise<{
        consumerCount: number;
        messageCount: number;
        queue: string;
    }>

  • Parameters

    • Optional queue: string

    Returns Promise<{
        consumerCount: number;
        messageCount: number;
        queue: string;
    }>
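A passive declare returns the same counters without changing server state, which makes it usable as a lightweight queue inspection. This sketch assumes the method is named `queueDeclare`; `queueStats`, `QueueInspector`, and the derived `unattended` flag are hypothetical.

```typescript
interface QueueInspector {
  // assumption: the declare method above is exposed as queueDeclare
  queueDeclare(params: {queue: string; passive?: boolean}): Promise<{
    queue: string
    messageCount: number
    consumerCount: number
  }>
}

// Read the counters of an existing queue. Because passive is set, the
// promise is expected to reject when the queue does not exist.
async function queueStats(
  conn: QueueInspector,
  queue: string,
): Promise<{messageCount: number; consumerCount: number; unattended: boolean}> {
  const {messageCount, consumerCount} = await conn.queueDeclare({queue, passive: true})
  return {
    messageCount,
    consumerCount,
    // Messages are waiting but no consumer is attached to take them.
    unattended: consumerCount === 0 && messageCount > 0,
  }
}
```

The counters are a snapshot taken at declare time, so they are best treated as an approximation on a busy queue.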

  • This method deletes a queue. When a queue is deleted any pending messages are sent to a dead-letter queue if this is defined in the server configuration, and all consumers on the queue are cancelled. If queue is empty or undefined then the last declared queue on the channel is used.

    Parameters

    • params: {
          ifEmpty?: boolean;
          ifUnused?: boolean;
          queue?: string;
      }
      • Optional ifEmpty?: boolean

        If set, the server will only delete the queue if it has no messages.

      • Optional ifUnused?: boolean

        If set, the server will only delete the queue if it has no consumers. If the queue has consumers the server does not delete it but raises a channel exception instead.

      • Optional queue?: string

        Specifies the name of the queue to delete.

    Returns Promise<{
        messageCount: number;
    }>

  • Parameters

    • Optional queue: string

    Returns Promise<{
        messageCount: number;
    }>

  • Remove all messages from a queue which are not awaiting acknowledgment. If queue is empty or undefined then the last declared queue on the channel is used.

    Parameters

    • Optional queue: string

    Returns Promise<{
        messageCount: number;
    }>

  • Parameters

    • params: {
          queue?: string;
      }
      • Optional queue?: string

    Returns Promise<{
        messageCount: number;
    }>

  • Unbind a queue from an exchange.

    Parameters

    • params: {
          arguments?: Record<string, any>;
          exchange: string;
          queue?: string;
          routingKey?: string;
      }
      • Optional arguments?: Record<string, any>
      • exchange: string

        The name of the exchange to unbind from.

      • Optional queue?: string

        Specifies the name of the queue to unbind.

      • Optional routingKey?: string

        Specifies the routing key of the binding to unbind.

    Returns Promise<void>

  • Immediately destroy the connection. All channels are closed. All pending actions are rejected.

    Returns void
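The graceful close and the immediate destroy can be combined into a shutdown routine with a deadline. This is a sketch under assumptions: the destroy method is taken to be named `unsafeDestroy`, and `shutdown` and `Closable` are hypothetical names.

```typescript
interface Closable {
  close(): Promise<void> // waits for channels to close, then ends the connection
  unsafeDestroy(): void // assumption: name of the "immediately destroy" method
}

// Try a graceful close first; if it has not finished within `graceMs`
// milliseconds, fall back to destroying the connection.
async function shutdown(
  conn: Closable,
  graceMs: number,
): Promise<'closed' | 'destroyed'> {
  let timer: ReturnType<typeof setTimeout> | undefined
  const deadline = new Promise<'destroyed'>((resolve) => {
    timer = setTimeout(() => resolve('destroyed'), graceMs)
  })
  try {
    const graceful = conn.close().then(() => 'closed' as const)
    // Whichever settles first decides the outcome.
    const outcome = await Promise.race([graceful, deadline])
    if (outcome === 'destroyed') conn.unsafeDestroy()
    return outcome
  } finally {
    // Clear the timer so it cannot keep the process alive after shutdown.
    clearTimeout(timer)
  }
}
```

The deadline is a trade-off: a channel that never closes would otherwise block shutdown indefinitely, while destroying early rejects pending actions, so any unacknowledged messages are left for the server to redeliver.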