Node.js client library for RabbitMQ. Publish messages, declare rules for routing those messages into queues, consume messages from queues.
Why not amqplib?
Performance is comparable to amqplib (see ./benchmark.ts). Time to publish 1 million messages in batches of 50:
TOTAL (messages)=1_000_000
BATCH_SIZE=50

time per batch (milliseconds) (transient queue)
                 total_time   mean     std    min       max
rabbitmq-client       39128  2.465  11.713  1.000  1192.000
amqplib@0.10.3        42378  2.615   5.783  1.000   478.000

time per batch (milliseconds) (no queue)
                 total_time   mean     std    min       max
rabbitmq-client       23163  1.726   0.497  1.000    13.000
amqplib@0.10.3        24897  1.842   0.444  1.000    17.000
In addition to the lower-level RabbitMQ methods, this library exposes two main interfaces, a Consumer and a Publisher (which should cover 90% of use cases), as well as a third, RPCClient, for request-response communication.
import {Connection} from 'rabbitmq-client'
// Initialize:
const rabbit = new Connection('amqp://guest:guest@localhost:5672')
rabbit.on('error', (err) => {
  console.log('RabbitMQ connection error', err)
})
rabbit.on('connection', () => {
  console.log('Connection successfully (re)established')
})
// Consume messages from a queue:
// See API docs for all options
const sub = rabbit.createConsumer({
  queue: 'user-events',
  queueOptions: {durable: true},
  // handle 2 messages at a time
  qos: {prefetchCount: 2},
  // Optionally ensure an exchange exists
  exchanges: [{exchange: 'my-events', type: 'topic'}],
  // With a "topic" exchange, messages matching this pattern are routed to the queue
  queueBindings: [{exchange: 'my-events', routingKey: 'users.*'}],
}, async (msg) => {
  console.log('received message (user-events)', msg)
  // The message is automatically acknowledged when this function ends.
  // If this function throws an error, then msg is NACK'd (rejected) and
  // possibly requeued or sent to a dead-letter exchange
})
sub.on('error', (err) => {
  // Maybe the consumer was cancelled, or the connection was reset before a
  // message could be acknowledged.
  console.log('consumer error (user-events)', err)
})
// Declare a publisher
// See API docs for all options
const pub = rabbit.createPublisher({
  // Enable publish confirmations, similar to consumer acknowledgements
  confirm: true,
  // Enable retries
  maxAttempts: 2,
  // Optionally ensure the existence of an exchange before we use it
  exchanges: [{exchange: 'my-events', type: 'topic'}]
})
// Publish a message to a custom exchange
await pub.send(
  {exchange: 'my-events', routingKey: 'users.visit'}, // metadata
  {id: 1, name: 'Alan Turing'}) // message content
// Or publish directly to a queue
await pub.send('user-events', {id: 1, name: 'Alan Turing'})
// Clean up when you receive a shutdown signal
async function onShutdown() {
  // Waits for pending confirmations and closes the underlying Channel
  await pub.close()
  // Stop consuming. Wait for any pending message handlers to settle.
  await sub.close()
  await rabbit.close()
}
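The `users.*` binding in the example above relies on topic-exchange matching: routing keys are dot-separated words, `*` stands for exactly one word, and `#` for zero or more words. A minimal sketch of that rule follows. `topicMatches` is a hypothetical helper written for illustration, not a function exported by rabbitmq-client; the broker performs the real matching.

```javascript
// Hypothetical helper (not part of rabbitmq-client): checks whether a routing
// key matches a topic binding pattern, word by word.
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.')
  const k = routingKey.split('.')
  // match(i, j): does p[i..] match k[j..]?
  function match(i, j) {
    if (i === p.length) return j === k.length
    if (p[i] === '#') {
      // '#' may absorb zero or more words
      for (let n = j; n <= k.length; n++) {
        if (match(i + 1, n)) return true
      }
      return false
    }
    if (j === k.length) return false
    // '*' matches any single word; otherwise the words must be equal
    if (p[i] === '*' || p[i] === k[j]) return match(i + 1, j + 1)
    return false
  }
  return match(0, 0)
}

console.log(topicMatches('users.*', 'users.visit'))       // true
console.log(topicMatches('users.*', 'users.visit.today')) // false
console.log(topicMatches('users.#', 'users.visit.today')) // true
```

Under this rule, the message published with routingKey `users.visit` reaches the `user-events` queue, while a key such as `users.visit.today` would need a `users.#` binding.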
The above Consumer & Publisher interfaces are recommended for most cases. These combine a few of the lower-level RabbitMQ methods (exposed on the Channel interface) and are much safer to use since they can recover after connection loss, or after a number of other edge cases you may not have imagined. Consider the following list of scenarios (not exhaustive):
In all of these cases you would need to create a new channel and re-declare any queues, exchanges, and bindings before you can start publishing or consuming messages again. And because you're probably publishing many messages concurrently, you'd want to make sure this setup runs only once per connection. If a consumer is cancelled, you may be able to reuse the channel, but you still need to check the queue, and so on.
The Consumer & Publisher interfaces abstract all of that away by running the necessary setup as needed and handling all the edge cases for you.
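To illustrate the "setup runs only once per connection" requirement, one common approach is to cache the setup promise so that concurrent callers share a single in-flight run, and to discard it when the channel is lost. The sketch below is not this library's implementation; `createLazySetup`, `ensureReady`, and `reset` are hypothetical names used only for illustration.

```javascript
// Hypothetical helper (not part of rabbitmq-client): memoize an async setup
// step so that concurrent callers share one in-flight run.
function createLazySetup(setup) {
  let pending = null
  return {
    // Returns the shared setup promise, starting setup if none is cached
    ensureReady() {
      if (pending === null) {
        const attempt = Promise.resolve().then(setup).catch((err) => {
          // Forget a failed attempt so the next caller can retry
          if (pending === attempt) pending = null
          throw err
        })
        pending = attempt
      }
      return pending
    },
    // Call when the channel or connection is lost, forcing setup to rerun
    reset() {
      pending = null
    },
  }
}

// Usage: three concurrent "publishes" trigger a single setup run
let setupRuns = 0
const lazy = createLazySetup(async () => {
  setupRuns += 1 // stand-in for declaring queues, exchanges, and bindings
})
Promise.all([lazy.ensureReady(), lazy.ensureReady(), lazy.ensureReady()])
  .then(() => console.log('setup runs:', setupRuns))
```

After a simulated connection loss, calling `lazy.reset()` makes the next `ensureReady()` run the setup again.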
The basic RabbitMQ methods are available on the Channel interface for creating/deleting queues, exchanges, or bindings, etc.:
// Will wait for the connection to establish and then create a Channel
const ch = await rabbit.acquire()
// Channels can emit some events too (see documentation)
ch.on('close', () => {
  console.log('channel was closed')
})
// Create a queue for the duration of this connection
await ch.queueDeclare({queue: 'my-queue'})
// Enable publisher acknowledgements
await ch.confirmSelect()
const data = {title: 'just some object'}
// Resolves when the data has been flushed through the socket or, if
// ch.confirmSelect() was called, when the broker acknowledges the message
await ch.basicPublish({routingKey: 'my-queue'}, data)
const msg = await ch.basicGet('my-queue')
console.log(msg)
await ch.queueDelete('my-queue')
// It's your responsibility to close any acquired channels
await ch.close()
An RPCClient creates a single "client" Channel on which you may publish messages and listen for direct responses. This allows, for example, two microservices to communicate with each other using RabbitMQ as the middleman instead of directly via HTTP.
// rpc-server.js
import {Connection} from 'rabbitmq-client'
const rabbit = new Connection()
const rpcServer = rabbit.createConsumer({
  queue: 'my-rpc-queue'
}, async (req, reply) => {
  console.log('request:', req.body)
  await reply('pong')
})
process.on('SIGINT', async () => {
  await rpcServer.close()
  await rabbit.close()
})
// rpc-client.js
import {Connection} from 'rabbitmq-client'
const rabbit = new Connection()
const rpcClient = rabbit.createRPCClient({confirm: true})
const res = await rpcClient.send('my-rpc-queue', 'ping')
console.log('response:', res.body) // pong
await rpcClient.close()
await rabbit.close()
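Request-response over a message broker is commonly built by tagging each request with a correlation id and matching replies against a table of pending requests. The in-memory sketch below shows only that bookkeeping, with no broker involved. `TinyRpcClient`, `onReply`, and `sendToServer` are hypothetical names, and nothing here is taken from the RPCClient source, whose internals may differ.

```javascript
// Hypothetical in-memory sketch of reply correlation (not rabbitmq-client code)
class TinyRpcClient {
  constructor(sendToServer) {
    this.sendToServer = sendToServer
    this.nextId = 0
    this.pending = new Map() // correlationId -> {resolve, timer}
  }

  // Sends a request and resolves with the matching reply, or rejects on timeout
  send(body, timeoutMs = 1000) {
    const correlationId = String(++this.nextId)
    return new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(correlationId)
        reject(new Error(`request ${correlationId} timed out`))
      }, timeoutMs)
      this.pending.set(correlationId, {resolve, timer})
      this.sendToServer({correlationId, body})
    })
  }

  // Called for every message that arrives on the reply queue
  onReply({correlationId, body}) {
    const entry = this.pending.get(correlationId)
    if (!entry) return // late or unknown reply: ignore it
    clearTimeout(entry.timer)
    this.pending.delete(correlationId)
    entry.resolve({body})
  }
}

// Usage: a stand-in "server" that echoes the correlationId back with 'pong'
const demoClient = new TinyRpcClient((req) => {
  setTimeout(() => {
    demoClient.onReply({correlationId: req.correlationId, body: 'pong'})
  }, 0)
})
demoClient.send('ping').then((res) => console.log('response:', res.body)) // pong
```

Because replies are matched by id rather than by arrival order, several requests can be in flight on the same reply channel at once.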