amqp-ts (AMQP TypeScript)
This is a summary. See the amqp-ts Wiki for the full documentation of the library.
Overview
Amqp-ts is a Node.js library, written in TypeScript, that simplifies communication with AMQP message buses. It has been tested on RabbitMQ. It uses the amqplib library by Michael Bridgen (squaremo).
Important Changes
Starting in version 0.14, the return type of exchange.rpc and queue.rpc changed from 'Promise < any >' to 'Promise < Message >'.
Starting in version 0.12, the Message class has been added. It is the preferred, more elegant way to send and receive messages.
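The practical effect of the 0.14 change is that the caller of rpc receives a Message and has to read the payload with getContent(). The following is a minimal sketch of that pattern, not library code: MessageLike, RpcTarget and rpcContent are hypothetical names introduced here, and the two interfaces only mirror the rpc and getContent calls mentioned in this readme.

```typescript
// Structural stand-ins for the parts of amqp-ts used below. These are
// assumptions made for this sketch, not the library's own declarations.
interface MessageLike {
  getContent(): any;
}
interface RpcTarget {
  rpc(requestParameters: any): Promise<MessageLike>;
}

// Hypothetical helper: sends an RPC request and unwraps the reply.
// Since 0.14 the promise resolves to a Message, so the payload is read
// with getContent() rather than used directly.
function rpcContent(target: RpcTarget, request: any): Promise<any> {
  return target.rpc(request).then((reply) => reply.getContent());
}

// With a real queue or exchange this might be called as (not executed here):
//   rpcContent(queue, "ping").then((content) => console.log(content));
```

Typing the helper against a small interface instead of the concrete classes keeps the sketch usable with either a queue or an exchange, and lets it be exercised without a running broker.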
Defining Features
- High-level, non-opinionated library: no need to worry about channels etc.
- 'Lazy' initialization: async AMQP dependencies are resolved automatically
- Automatic reconnection: when the connection with the AMQP server fails, the whole connection and configuration is rebuilt automatically
- Written in TypeScript and compatible with the TypeScript 1.6 module type definition resolution for Node.js.
Current status
The library is considered production ready.
It depends on the following npm libraries:
The DefinitelyTyped tsd tool is used to manage the TypeScript type definitions.
Lazy Initialization
No need to nest functionality: just create a connection, declare your exchanges, queues and bindings, and send and receive messages. The library takes care of any direct dependencies.
If you define an exchange and a queue, bind the queue to the exchange, and want to make sure that the queue is connected to the exchange before you send a message to the exchange, you can call the connection.completeConfiguration() method and act on the promise it returns.
ES6/Typescript Example
import * as Amqp from "amqp-ts";

var connection = new Amqp.Connection("amqp://localhost");
var exchange = connection.declareExchange("ExchangeName");
var queue = connection.declareQueue("QueueName");
queue.bind(exchange);
queue.activateConsumer((message) => {
    console.log("Message received: " + message.getContent());
});

// it is possible that the following message is not received because
// it can be sent before the queue, binding or consumer exist
var msg = new Amqp.Message("Test");
exchange.send(msg);

connection.completeConfiguration().then(() => {
    // the following message will be received because
    // everything you defined earlier for this connection now exists
    var msg2 = new Amqp.Message("Test2");
    exchange.send(msg2);
});
Javascript Example
var amqp = require("amqp-ts");

var connection = new amqp.Connection("amqp://localhost");
var exchange = connection.declareExchange("ExchangeName");
var queue = connection.declareQueue("QueueName");
queue.bind(exchange);
queue.activateConsumer((message) => {
    console.log("Message received: " + message.getContent());
});

// it is possible that the following message is not received because
// it can be sent before the queue, binding or consumer exist
var msg = new amqp.Message("Test");
exchange.send(msg);

connection.completeConfiguration().then(() => {
    // the following message will be received because
    // everything you defined earlier for this connection now exists
    var msg2 = new amqp.Message("Test2");
    exchange.send(msg2);
});
More examples can be found in the tutorials directory.
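To make the idea behind lazy initialization more concrete, here is a toy model of how dependencies between declared objects can be resolved with promises. It is an illustration only and not the amqp-ts implementation: LazyResource and LazyConnection are hypothetical classes, and "declaring" a resource just appends its name to a log once everything it depends on is ready.

```typescript
// Toy model of lazy initialization. Illustrative only, not amqp-ts source code.
class LazyResource {
  name: string;
  initialized: Promise<void>;

  constructor(name: string, dependsOn: LazyResource[], log: string[]) {
    this.name = name;
    // Wait for every dependency first, then "declare" this resource.
    this.initialized = Promise.all(dependsOn.map((d) => d.initialized)).then(() => {
      log.push(name);
    });
  }
}

class LazyConnection {
  log: string[] = [];
  private resources: LazyResource[] = [];

  // Returns immediately; the actual work happens asynchronously.
  declare(name: string, dependsOn: LazyResource[] = []): LazyResource {
    const resource = new LazyResource(name, dependsOn, this.log);
    this.resources.push(resource);
    return resource;
  }

  // Resolves once everything declared so far has been initialized.
  completeConfiguration(): Promise<void> {
    return Promise.all(this.resources.map((r) => r.initialized)).then(() => undefined);
  }
}

const toyConnection = new LazyConnection();
const toyExchange = toyConnection.declare("exchange");
const toyQueue = toyConnection.declare("queue");
toyConnection.declare("binding", [toyExchange, toyQueue]);

toyConnection.completeConfiguration().then(() => {
  // The binding entry always comes after its two dependencies.
  console.log(toyConnection.log);
});
```

The calling code stays flat because every declare call returns at once, while the ordering constraints are carried by the promises.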
Connection Status
To know the status of the connection, use connection.isConnected. It returns true if the connection exists and false otherwise.
Events
#on('open_connection', function() {...})
It is emitted when a connection is established and can publish/subscribe in Rabbit Bus.
#on('close_connection', function() {...})
It is emitted when a connection is closed, after calling the close method.
#on('lost_connection', function() {...})
It is emitted when the connection is lost and before attempting to re-establish the connection.
#on('trying_connect', function() {...})
It is emitted while the library is trying to re-establish the connection.
#on('re_established_connection', function() {...})
It is emitted when the connection is re-established.
#on('error_connection', function(err) {...})
It is emitted when an error is registered during the connection.
Automatic Reconnection
When the library detects that the connection with the AMQP server is lost, it automatically tries to reconnect to the server.
What's new
version 1.4.0
- now you can return a Promise with queue.activateConsumer for RPCs. The result of the resolved Promise will be returned to the RPC caller.
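As a rough illustration of the 1.4.0 change, an RPC handler can do asynchronous work and return a Promise for its answer. The sketch below is an assumption-laden example rather than library code: MessageLike is a stand-in interface and slowSquare is a hypothetical handler.

```typescript
// Structural stand-in for the amqp-ts Message; an assumption for this sketch.
interface MessageLike {
  getContent(): any;
}

// Hypothetical RPC handler that answers asynchronously by returning a Promise.
function slowSquare(message: MessageLike): Promise<number> {
  return new Promise((resolve) => {
    const n = Number(message.getContent());
    // Simulate slow work before resolving with the result.
    setTimeout(() => resolve(n * n), 10);
  });
}

// With amqp-ts 1.4.0 or later the handler could be registered as
// (not executed here, it needs a running broker):
//   queue.activateConsumer(slowSquare);
```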
version 1.3.0
- added noCreate creation option property for Exchange and Queue (expects the exchange or queue to already exist in AMQP)
- improved unit tests
version 1.2.0
- added name property for Exchange and Queue and type property for Exchange
- improved consumer cleanup for Exchange and Queue methods close and delete
version 1.1.1
- added the prefetch option to DeclarationOptions in the amqp-ts.d.ts file
version 1.1.0
- fixed incorrect implementation of nack, syntax is now in line with amqplib nack
version 1.0.1
- fixed bug in automatic reconnect (exponential growth of retries hanging the application)
version 1.0.0
- updated typescript definition file management from tsd to typings
- added queue.prefetch and queue.recover methods
- updated to version 1.0 (finally)
version 0.14.4
- fixed error when using node.js version 0.10.x: path library does not have a method parse in 0.10.x
version 0.14.3
- improved readability of readme.md on npmjs
version 0.14.2
- multiple calls of exchange.close, exchange.delete, queue.close and queue.delete return the same promise (and are thereby executed only once)
version 0.14.1
- added extra promise rejection handling for exchange.close, exchange.delete, queue.close and queue.delete
version 0.14.0
- changed the return type of exchange.rpc and queue.rpc from 'Promise < any >' to 'Promise < Message >'
- added the option to return a Message in exchange.activateConsumer and queue.activateConsumer
- updated the amqp-ts Wiki API documentation
version 0.13.0
- skipped to avoid bad luck :)
version 0.12.0
- added Message class
- added exchange.send and queue.send.
- deprecated exchange.publish and queue.publish.
- added exchange.activateConsumer and queue.activateConsumer.
- deprecated exchange.startConsumer and queue.startConsumer.
- changed connection.declareExchange and [connection.declareQueue](https://github.com/abreits/amqp-ts/wiki/Connection%20class#declareQueue) to prevent duplicate declaration of the same exchange/queue
- added connection.declareTopology
- added support functions getMessageContent and setMessageContent
- fixed bug in integration test
version 0.11.0
- revised amqp-ts logging, see Logging in the wiki for more details
- fixed bug in tutorials library reference
version 0.10.4
- added amqp-ts examples for the RabbitMQ tutorials
- fixed a bug in the queue.rpc
- fixed documentation errors
version 0.10.3
- moved the documentation to the wiki, only the 'Overview', 'What's new' and 'Roadmap' stay in the readme.md for npmjs and GitHub.
- improved the documentation
version 0.10.2
- rearranged this readme
- added rpc support to the Queue and Exchange for RabbitMQ 'direct reply-to' RPC functionality
- updated dependencies
- updated the documentation
version 0.10.1
- added a 'low level' queue consumer that receives the raw message and can 'ack' or 'nack' these messages itself
- cleanup integration tests
- readme update and fixes
version 0.10
- added close methods to Exchange and Queue
- changed Promise type for Exchange.initialized and Queue.initialized
- minor readme fixes
- improved robustness for unit tests
version 0.9.4 & 0.9.5
- small code cleanup: defined optional parameter default values in typescript
- fixed a few bugs when publishing a message to an exchange after a disconnect/reconnect
version 0.9.3
- Added this section
- Added the roadmap section
- Improved the winston logging messages
Roadmap
The roadmap section describes things that I want to add or change in the (hopefully near) future.
- Better source code documentation, maybe even use jsdoc or tsdoc to generate the api documentation