A raw Channel can be acquired from your Connection, but please consider using a higher level abstraction like a Consumer or Publisher for most cases.

AMQP is a multi-channelled protocol. Channels provide a way to multiplex a heavyweight TCP/IP connection into several light weight connections. This makes the protocol more “firewall friendly” since port usage is predictable. It also means that traffic shaping and other network QoS features can be easily employed. Channels are independent of each other and can perform different functions simultaneously with other channels, the available bandwidth being shared between the concurrent activities.

const rabbit = new Connection()

// Will wait for the connection to establish and then create a Channel
const ch = await rabbit.acquire()

// Channels can emit some events too (see documentation)
ch.on('close', () => {
console.log('channel was closed')
})

// Create a queue for the duration of this connection
await ch.queueDeclare({queue: 'my-queue'})

// Enable publisher acknowledgements
await ch.confirmSelect()

const data = {title: 'just some object'}

// Resolves when the data has been flushed through the socket or if
// ch.confirmSelect() was called: will wait for an acknowledgement
await ch.basicPublish({routingKey: 'my-queue'}, data)

const msg = ch.basicGet('my-queue')
console.log(msg)

await ch.queueDelete('my-queue')

// It's your responsibility to close any acquired channels
await ch.close()

Hierarchy

  • EventEmitter
    • Channel

Properties

active: boolean

False if the channel is closed

id: number

Methods

  • Acknowledge one or more messages.

    Parameters

    • params: {
          deliveryTag?: number;
          multiple?: boolean;
      }
      • OptionaldeliveryTag?: number

        The server-assigned and channel-specific delivery tag

      • Optionalmultiple?: boolean

        If set to 1, the delivery tag is treated as "up to and including", so that multiple messages can be acknowledged with a single method. If set to zero, the delivery tag refers to a single message. If the multiple field is 1, and the delivery tag is zero, this indicates acknowledgement of all outstanding messages.

    Returns void

  • Stop a consumer.

    Parameters

    • consumerTag: string

    Returns Promise<{
        consumerTag: string;
    }>

  • Parameters

    • params: {
          consumerTag: string;
      }
      • consumerTag: string

    Returns Promise<{
        consumerTag: string;
    }>

  • This is a low-level method; consider using Connection#createConsumer() instead.

    Begin consuming messages from a queue. Consumers last as long as the channel they were declared on, or until the client cancels them. The callback cb(msg) is called for each incoming message. You must call Channel#basicAck to complete the delivery, usually after you've finished some task.

    Parameters

    • params: {
          arguments?: {
              x-cancel-on-ha-failover?: boolean;
              x-priority?: number;
              [k: string]: any;
          };
          consumerTag?: string;
          exclusive?: boolean;
          noAck?: boolean;
          noLocal?: boolean;
          queue?: string;
      }
      • Optionalarguments?: {
            x-cancel-on-ha-failover?: boolean;
            x-priority?: number;
            [k: string]: any;
        }
      • OptionalconsumerTag?: string

        Specifies the identifier for the consumer. The consumer tag is local to a channel, so two clients can use the same consumer tags. If this field is empty the server will generate a unique tag.

      • Optionalexclusive?: boolean

        Request exclusive consumer access, meaning only this consumer can access the queue.

      • OptionalnoAck?: boolean

        If this field is set the server does not expect acknowledgements for messages. That is, when a message is delivered to the client the server assumes the delivery will succeed and immediately dequeues it. This functionality may increase performance but at the cost of reliability. Messages can get lost if a client dies before they are delivered to the application.

      • OptionalnoLocal?: boolean

        If the no-local field is set the server will not send messages to the connection that published them.

      • Optionalqueue?: string

        Specifies the name of the queue to consume from. If blank then the last declared queue (on the channel) will be used.

    • cb: ((msg: AsyncMessage) => void)

    Returns Promise<{
        consumerTag: string;
    }>

  • Request a single message from a queue. Useful for testing.

    Parameters

    • params: {
          noAck?: boolean;
          queue?: string;
      }
      • OptionalnoAck?: boolean

        If this field is set the server does not expect acknowledgements for messages. That is, when a message is delivered to the client the server assumes the delivery will succeed and immediately dequeues it. This functionality may increase performance but at the cost of reliability. Messages can get lost if a client dies before they are delivered to the application.

      • Optionalqueue?: string

        Specifies the name of the queue to consume from.

    Returns Promise<undefined | SyncMessage>

  • Parameters

    • Optionalqueue: string

    Returns Promise<undefined | SyncMessage>

  • Reject one or more incoming messages.

    Parameters

    • params: {
          deliveryTag?: number;
          multiple?: boolean;
          requeue?: boolean;
      }
      • OptionaldeliveryTag?: number
      • Optionalmultiple?: boolean

        If set to 1, the delivery tag is treated as "up to and including", so that multiple messages can be rejected with a single method. If set to zero, the delivery tag refers to a single message. If the multiple field is 1, and the delivery tag is zero, this indicates rejection of all outstanding messages.

      • Optionalrequeue?: boolean

        If requeue is true, the server will attempt to requeue the message. If requeue is false or the requeue attempt fails the messages are discarded or dead-lettered. The default should be TRUE, according to the AMQP specification, however this can lead to an endless retry-loop if you're not careful. Messages consumed from a quorum queue will have the "x-delivery-count" header, allowing you to discard a message after too many attempted deliveries. For classic mirrored queues, or non-mirrored queues, you will need to construct your own mechanism for discarding poison messages.

        true
        

    Returns void

  • This method publishes a message to a specific exchange. The message will be routed to queues as defined by the exchange configuration.

    If the body is a string then it will be serialized with contentType='text/plain'. If body is an object then it will be serialized with contentType='application/json'. Buffer objects are unchanged.

    If publisher-confirms are enabled, then this will resolve when the acknowledgement is received. Otherwise this will resolve after writing to the TCP socket, which is usually immediate. Note that if you keep publishing while the connection is blocked (see :BLOCKED | Connection#on('connection.blocked')) then the TCP socket buffer will eventually fill and this method will no longer resolve immediately.

    Parameters

    Returns Promise<void>

  • Send directly to a queue. Same as basicPublish({routingKey: queue}, body)

    Parameters

    • queue: string
    • body: any

    Returns Promise<void>

  • Specify quality of service.

    Parameters

    • params: {
          global?: boolean;
          prefetchCount?: number;
          prefetchSize?: number;
      }
      • Optionalglobal?: boolean

        RabbitMQ has reinterpreted this field. The original specification said: "By default the QoS settings apply to the current channel only. If this field is set, they are applied to the entire connection." Instead, RabbitMQ takes global=false to mean that the QoS settings should apply per-consumer (for new consumers on the channel; existing ones being unaffected) and global=true to mean that the QoS settings should apply per-channel.

      • OptionalprefetchCount?: number

        Specifies a prefetch window in terms of whole messages. This field may be used in combination with the prefetch-size field; a message will only be sent in advance if both prefetch windows (and those at the channel and connection level) allow it. The prefetch-count is ignored if the no-ack option is set.

      • OptionalprefetchSize?: number

        The client can request that messages be sent in advance so that when the client finishes processing a message, the following message is already held locally, rather than needing to be sent down the channel. Prefetching gives a performance improvement. This field specifies the prefetch window size in octets. The server will send a message in advance if it is equal to or smaller in size than the available prefetch size (and also falls into other prefetch limits). May be set to zero, meaning "no specific limit", although other prefetch limits may still apply. The prefetch-size is ignored if the no-ack option is set.

    Returns Promise<void>

  • This method asks the server to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered.

    Parameters

    • params: {
          requeue?: boolean;
      }
      • Optionalrequeue?: boolean

        If this field is zero, the message will be redelivered to the original recipient. If this bit is 1, the server will attempt to requeue the message, potentially then delivering it to an alternative subscriber.

    Returns Promise<void>

  • Close the channel

    Returns Promise<void>

  • Bind exchange to an exchange.

    Parameters

    • params: {
          arguments?: Record<string, any>;
          destination: string;
          routingKey?: string;
          source: string;
      }
      • Optionalarguments?: Record<string, any>

        A set of arguments for the binding. The syntax and semantics of these arguments depends on the exchange class.

      • destination: string

        Specifies the name of the destination exchange to bind.

      • OptionalroutingKey?: string

        Specifies the routing key for the binding. The routing key is used for routing messages depending on the exchange configuration. Not all exchanges use a routing key - refer to the specific exchange documentation.

      • source: string

        Specifies the name of the source exchange to bind.

    Returns Promise<void>

  • Verify exchange exists, create if needed.

    Parameters

    • params: {
          arguments?: {
              alternate-exchange?: string;
              [k: string]: any;
          };
          autoDelete?: boolean;
          durable?: boolean;
          exchange: string;
          internal?: boolean;
          passive?: boolean;
          type?: string;
      }
      • Optionalarguments?: {
            alternate-exchange?: string;
            [k: string]: any;
        }
      • OptionalautoDelete?: boolean

        If set, the exchange is deleted when all queues have finished using it.

      • Optionaldurable?: boolean

        If set when creating a new exchange, the exchange will be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged if/when a server restarts.

      • exchange: string

        Exchange names starting with "amq." are reserved for pre-declared and standardised exchanges. The exchange name consists of a non-empty sequence of these characters: letters, digits, hyphen, underscore, period, or colon.

      • Optionalinternal?: boolean

        If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications.

      • Optionalpassive?: boolean

        If set, the server will reply with Declare-Ok if the exchange already exists with the same name, and raise an error if not. The client can use this to check whether an exchange exists without modifying the server state. When set, all other method fields except name and no-wait are ignored. A declare with both passive and no-wait has no effect. Arguments are compared for semantic equivalence.

      • Optionaltype?: string

        direct, topic, fanout, or headers: Each exchange belongs to one of a set of exchange types implemented by the server. The exchange types define the functionality of the exchange - i.e. how messages are routed through it.

        "direct"
        

    Returns Promise<void>

  • Delete an exchange.

    Parameters

    • params: {
          exchange: string;
          ifUnused?: boolean;
      }
      • exchange: string

        Name of the exchange

      • OptionalifUnused?: boolean

        If set, the server will only delete the exchange if it has no queue bindings. If the exchange has queue bindings the server does not delete it but raises a channel exception instead.

    Returns Promise<void>

  • Unbind an exchange from an exchange.

    Parameters

    • params: {
          arguments?: Record<string, any>;
          destination: string;
          routingKey?: string;
          source: string;
      }
      • Optionalarguments?: Record<string, any>
      • destination: string

        Specifies the name of the destination exchange to unbind.

      • OptionalroutingKey?: string

        Specifies the routing key of the binding to unbind.

      • source: string

        Specifies the name of the source exchange to unbind.

    Returns Promise<void>

  • The specified consumer was stopped by the server. The error param describes the reason for the cancellation.

    Parameters

    • name: "basic.cancel"
    • cb: ((consumerTag: string, err: any) => void)
        • (consumerTag, err): void
        • Parameters

          • consumerTag: string
          • err: any

          Returns void

    Returns this

  • An undeliverable message was published with the "immediate" flag set, or an unroutable message published with the "mandatory" flag set. The reply code and text provide information about the reason that the message was undeliverable.

    Parameters

    Returns this

  • The channel was closed, because you closed it, or due to some error

    Parameters

    • name: "close"
    • cb: (() => void)
        • (): void
        • Returns void

    Returns this

  • This method binds a queue to an exchange. Until a queue is bound it will not receive any messages. In a classic messaging model, store-and-forward queues are bound to a direct exchange and subscription queues are bound to a topic exchange.

    Parameters

    • params: {
          arguments?: Record<string, any>;
          exchange: string;
          queue?: string;
          routingKey?: string;
      }
      • Optionalarguments?: Record<string, any>
      • exchange: string

        Name of the exchange to bind to.

      • Optionalqueue?: string

        Specifies the name of the queue to bind. If blank, then the last declared queue on the channel will be used.

      • OptionalroutingKey?: string

        Specifies the routing key for the binding. The routing key is used for routing messages depending on the exchange configuration. Not all exchanges use a routing key - refer to the specific exchange documentation. If the queue name is empty, the server uses the last queue declared on the channel. If the routing key is also empty, the server uses this queue name for the routing key as well. If the queue name is provided but the routing key is empty, the server does the binding with that empty routing key. The meaning of empty routing keys depends on the exchange implementation.

    Returns Promise<void>

  • Declare queue, create if needed. If queue empty or undefined then a random queue name is generated (see the return value).

    Parameters

    • params: {
          arguments?: {
              x-dead-letter-exchange?: string;
              x-dead-letter-routing-key?: string;
              x-expires?: number;
              x-max-length?: number;
              x-max-priority?: number;
              x-message-ttl?: number;
              x-overflow?: "drop-head" | "reject-publish" | "reject-publish-dlx";
              x-queue-type?: "quorum" | "classic" | "stream";
              [k: string]: any;
          };
          autoDelete?: boolean;
          durable?: boolean;
          exclusive?: boolean;
          passive?: boolean;
          queue?: string;
      }
      • Optionalarguments?: {
            x-dead-letter-exchange?: string;
            x-dead-letter-routing-key?: string;
            x-expires?: number;
            x-max-length?: number;
            x-max-priority?: number;
            x-message-ttl?: number;
            x-overflow?: "drop-head" | "reject-publish" | "reject-publish-dlx";
            x-queue-type?: "quorum" | "classic" | "stream";
            [k: string]: any;
        }
      • OptionalautoDelete?: boolean

        If set, the queue is deleted when all consumers have finished using it. The last consumer can be cancelled either explicitly or because its channel is closed. If there was no consumer ever on the queue, it won't be deleted. Applications can explicitly delete auto-delete queues using the Delete method as normal.

      • Optionaldurable?: boolean

        If set when creating a new queue, the queue will be marked as durable. Durable queues remain active when a server restarts. Non-durable queues (transient queues) are purged if/when a server restarts. Note that durable queues do not necessarily hold persistent messages, although it does not make sense to send persistent messages to a transient queue.

      • Optionalexclusive?: boolean

        Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections are not allowed.

      • Optionalpassive?: boolean

        If set, the server will reply with Declare-Ok if the queue already exists with the same name, and raise an error if not. The client can use this to check whether a queue exists without modifying the server state. When set, all other method fields except name and no-wait are ignored. A declare with both passive and no-wait has no effect.

      • Optionalqueue?: string

        The queue name MAY be empty, in which case the server MUST create a new queue with a unique generated name and return this to the client in the Declare-Ok method. Queue names starting with "amq." are reserved for pre-declared and standardised queues. The queue name can be empty, or a sequence of these characters: letters, digits, hyphen, underscore, period, or colon.

    Returns Promise<{
        consumerCount: number;
        messageCount: number;
        queue: string;
    }>

  • Parameters

    • Optionalqueue: string

    Returns Promise<{
        consumerCount: number;
        messageCount: number;
        queue: string;
    }>

  • This method deletes a queue. When a queue is deleted any pending messages are sent to a dead-letter queue if this is defined in the server configuration, and all consumers on the queue are cancelled. If queue is empty or undefined then the last declared queue on the channel is used.

    Parameters

    • params: {
          ifEmpty?: boolean;
          ifUnused?: boolean;
          queue?: string;
      }
      • OptionalifEmpty?: boolean

        If set, the server will only delete the queue if it has no messages.

      • OptionalifUnused?: boolean

        If set, the server will only delete the queue if it has no consumers. If the queue has consumers the server does does not delete it but raises a channel exception instead.

      • Optionalqueue?: string

        Specifies the name of the queue to delete.

    Returns Promise<{
        messageCount: number;
    }>

  • Parameters

    • Optionalqueue: string

    Returns Promise<{
        messageCount: number;
    }>

  • Remove all messages from a queue which are not awaiting acknowledgment. If queue is empty or undefined then the last declared queue on the channel is used.

    Parameters

    • Optionalqueue: string

    Returns Promise<{
        messageCount: number;
    }>

  • Parameters

    • params: {
          queue?: string;
      }
      • Optionalqueue?: string

    Returns Promise<{
        messageCount: number;
    }>

  • Unbind a queue from an exchange.

    Parameters

    • params: {
          arguments?: Record<string, any>;
          exchange: string;
          queue?: string;
          routingKey?: string;
      }
      • Optionalarguments?: Record<string, any>
      • exchange: string

        The name of the exchange to unbind from.

      • Optionalqueue?: string

        Specifies the name of the queue to unbind.

      • OptionalroutingKey?: string

        Specifies the routing key of the binding to unbind.

    Returns Promise<void>

  • This method commits all message publications and acknowledgments performed in the current transaction. A new transaction starts immediately after a commit.

    Returns Promise<void>

  • This method abandons all message publications and acknowledgments performed in the current transaction. A new transaction starts immediately after a rollback. Note that unacked messages will not be automatically redelivered by rollback; if that is required an explicit recover call should be issued.

    Returns Promise<void>

  • Don't use this unless you know what you're doing. This method is provided for the sake of completeness, but you should use confirmSelect() instead.

    Sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods. Mutually exclusive with confirm mode.

    Returns Promise<void>