@leisurelink/magicbus

A message bus framework using RabbitMQ.

Stats

StarsIssuesVersionUpdatedCreatedSize
@leisurelink/magicbus
032.3.35 years ago6 years agoMinified + gzip package size for @leisurelink/magicbus in KB

Readme

magicbus

Helps messages get on the bus that takes me to you.

A message bus framework implementing configurable pipelines to prepare messages to be published to and consumed from RabbitMQ. Internally, the primary library for interacting with RabbitMQ is amqplib.

Important Note for version 2.0!

There are breaking changes in the way you connect for version 2.0. If you are using a connection string, then you should be ok. Otherwise, the connectionInfo options are as below:

let broker = magicbus.createBroker('service.domain', 'app', {
  name: 'default',      // connection name
  server: 'docker.dev', // server hostname (can be a list, separated by ,)
  port: 5672,           // server port (can be a list, separated by , with the same # of entries as s erver)
  heartbeat: 30,        // heartbeat timer, 30 seconds if omitted
  protocol: 'amqp://',  // protocol, usually ok to default
  user: 'guest',        // user name
  pass: 'guest',        // password
  vhost: '%2f',         //vhost to connect to, defaults to '%2f' which is '/'
  timeout: 0,           // connection timeout, defaults to never timeout
  // used for TLS connections
  certPath: null,       // certificate file path
  keyPath: null,        // key file path
  caPath: null,         // certificate authority file path(s), separated by ','
  passphrase: null,     // certificate passphrase
  pfxPath: null         // pfx file path
});

What Does This Add to amqplib?

  • A simple interface for application code to use (you'll never have to work directly with a channel)
  • Setup local topology following the LeisureLink Event Bus Architecture (cross-app topology/bindings out of scope)
  • Pluggable "envelope" formats to be interoperable with other opinionated frameworks
  • Middleware pipelines for producers and consumers
  • Content serialization/deserialization
  • Dispatch of messages to handlers based on message type when consuming multiple message types from a single queue
  • Connection management (single connection, re-establishing a lost connection)

Installation

$ npm install @leisurelink/magicbus

Usage

Usage is a broad topic due to the number of potential scenarios. A bare-bones pub-sub scenario is described below. For more examples, including best practices for usage in LeisureLink's "Blue" system, see the messaging-examples repo.

Publishing App

var magicbus = require('@leisurelink/magicbus');
var broker = magicbus.createBroker('domain-one', 'my-api', 'amqp://guest:guest@docker.dev/');

var publisher = magicbus.createPublisher(broker);

publisher.publish('publisher-executed', {
  some: 'data'
});

Subscribing App

var magicbus = require('@leisurelink/magicbus');

var broker = magicbus.createBroker('domain-two', 'my-worker', 'amqp://guest:guest@docker.dev/');

var subscriber = magicbus.createSubscriber(broker);

subscriber.on('publisher-executed', function(eventName, data, rawMessage) {
  console.log('The publisher was executed!');
  console.log(data);
});

subscriber.startSubscription();

Events

The magicbus library will emit the following events (subscribed to via the on function).

log (compatible with @leisurelink/skinny-loggins consumeFrom function)

Example event data:

{
  kind: 'silly|debug|verbose|info|warn|error',
  message: 'message',
  namespace: 'magicbus[.specific namespace]',
  err: new Error('magicbus error')
}

unhandled-error

Fired when a subscriber event handler or consumer handler returns or throws an Error.

Example event data:

{
  data: /* deserialized message payload */,
  messageTypes: /* deserialized message type(s), array */,
  message: /* raw rabbitmq message */,
  error: /* the Error */
}

unhandled-middleware-error

Fired when middleware returns an Error.

Example event data:

{
  message: /* raw rabbitmq message */,
  error: /* the Error */
}

unhandled-event

Fired when a subscriber receives a message that does not have a corresponding handler.

Example event data:

{
  data: /* deserialized message payload */,
  messageTypes: /* deserialized message type(s), array */,
  message: /* raw rabbitmq message */
}

Cross-app Bindings

Since the publisher and subscriber above are in two two separate domains, and it's assumed that one app has no permission on any of the exchanges/queues in the other app's security domain, the framework does not setup any bindings. You'll need to use a Binder to bind the subscriber's queue to the publisher's exchange for messages to reach the subscriber. Typically this is done by a configuration app with elevated permissions.

Framework Components

Broker

  • Maintains a single connection to a single RabbitMQ server/vhost
  • Maintains channels for each producer/consumer in the app
  • Creates local (as opposed to cross-app) exchanges/queues/bindings for producers/consumers
  • Provides delayed retry support for consumers (planned, not implemented yet)

You should have a single Broker for your entire deployable. That way the same connection is used between all your publishing and consuming routes, which is thought to be a RabbitMQ best practice.

Publisher

Use a Publisher to publish events and send commands through RabbitMQ.

createPublisher(broker, configurator)

Creates a new instance of Publisher.

  • broker is an instance of the Broker class, configured for connection to your desired endpoint
  • configurator is a function that will be called and allow you to override default implementations of internal components

#publish(eventName, data, options)

Publish an event.

  • eventName is a required string
  • data is an optional parameter of any type
  • options is an optional collection of publishing options, overriding the options from the constructor

This method is asynchronous and returns a promise.

#send(message, messageType, options)

Send a command/message.

  • message is a require parameter of any type
  • messageType is an optional string
  • options is an optional collection of publishing options

This method is asynchronous and returns a promise.

Consumer

Use a Consumer to consume all messages from a RabbitMQ queue. The consumer does not handle dispatching messages by message type so there are only limited scenarios where you want to use a Consumer directly. You probably want to use a Subscriber.

createConsumer(broker, configurator)

Creates a new instance of Consumer.

  • broker is an instance of the Broker class, configured for connection to your desired endpoint
  • configurator is a function that will be called and allow you to override default implementations of internal components

#startConsuming(handler, options)

Register a handler for messages returned from a queue.

  • handler is a required function with the signature described below
  • options is an optional collection of consumption options, overriding the options from the constructor

Handler Signature

function handleMessage(message, messageTypes, rawMessage) {
  //Do work
}

Messages will be acked as long as the handler doesn't throw. If the handler does throw, the message will either be:

  • failed and discarded or placed on a failure queue depending on subscriber configuration
  • re-queued and retried later

The type of error determines how the message is treated. Programmer errors will be treated as "unrecoverable" and will not be retried. Operational errors will be retried. See this article for a description of the difference. Need to define our best guess at differentiating the two.

Asynchronous handlers should return a promise. They should reject the promise using the same guidelines that applies for throwing errors from synchronous handlers.

Subscriber

Use a Subscriber to consume events and commands from RabbitMQ and have those messages dispatched to handlers based on message type.

createSubscriber(broker, configurator)

Creates a new instance of Subscriber.

  • broker is an instance of the Broker class, configured for connection to your desired endpoint
  • configurator is a function that will be called and allow you to override default implementations of internal components

#on(eventNames, handler)

Register a handler for an event.

  • eventNames is a required single string or regex, or an array of strings or regex. To handle all messages, use /.*/.
  • handler is a required function with the signature described below

Handler Signature

NOTE The order of arguments on a subscriber handler is different from the order of arguments on a consumer handler.

function handleFooCreated(eventName, data, rawMessage) {
  //Do work
}

Message acknowledgement is the same as with a Consumer handler. Asynchronous handlers should return a Promise. If multiple handlers are matched for a given event, only the first handler (by order of registration) is executed.

#startSubscription(options)

Starts consuming from the queue.

  • options is an optional collection of consumption options, overriding the options from the constructor

Don't call this until you've finished registering your handlers or you may end up with unhandled messages that you would have handled if your handler registration were complete.

This method is asynchronous and returns a promise.

Binder

Link a publishing route to a consuming route by binding an exchange to a queue. Typically useful in configuration apps and integration tests.

createBinder(connectionInfo, configurator)

Creates a new instance of Binder with the specified connection info. Connection info can be any url that amqplib's connect method supports.

#bind(publishingRoute, consumingRoute, options)

Configures a binding from the publishing route to the consuming route.

Extension Points

Magic bus provides default implementations of all components so you can be up and running quickly. However all the messaging parties also allow you to inject custom implementations of any component used in the message pipeline.

Envelopes

An "envelope" is responsible for producing an initial message at the beginning of the producer messaging pipeline. In the consumer pipeline, it is responsible for reading values required by the messaging pipeline from wherever it put them when it produced the message.

The initial message must have a payload property which will be serialized into a Buffer to pass as the content parameter to the amqplib publish method. The properties property of the initial message will be used as the options parameter of the publish method so documentation of those options applies.

The default envelope creates messages with the following shape:

{
  properties: {
    contentType: 'application/prs.magicbus+json',
    type: '<the type>'
  },
  payload: <the data>
}

Choosing a Content Type

The content type was chosen based on information from this standards doc and this Wikipedia article. Specifically, our content type is in the "Personal or Vanity Tree" because at this time, MagicBus is a "product[] that [is] not distributed commercially". When building an envelope to interoperate with another system, you probably want your envelope to use the other system's content type.

Content Serializers

In the producer messaging pipeline, a content serializer is responsible for converting the payload property of the message into a Buffer. In the consumer messaging pipeline it is responsible for the reverse: converting the content property of the message received by the amqplib consume callback into an appropriate data structure.

The default content serializer uses JSON.

Middleware

Middleware can be added to any messaging party to manipulate the message produced or consumed. In the producer pipeline, middleware runs after the envelope creates the initial message, and before the payload is serialized. In the consumer pipeline, middleware runs after the content is deserialized and before the consumed message is sent to any handlers registered by the application. Middleware functions should have the following signature:

function MyCoolMiddleware(message, actions) {
  //do something cool
}
// ...
publisher.use(MyCoolMiddleware);
// ...
consumer.use(MyOtherCoolMiddleware);
// ...
subscriber.use(YetAnotherMiddleware);

The message parameter will always be a "complete" message, either created by an envelope, or provided to the amqplib consume callback. The actions available are different for producer and consumer middleware.

There is no default middleware. The messaging-examples repo demonstrates some of the middleware available from other repos.

Producer Middleware

The actions available to producer middleware (both publisher and sender) are:

  • actions.next() - proceed to the next step.
  • actions.error(err) - abort publishing with the associated error.

Consumer Middleware

The actions available to consumer middleware (both subscriber and receiver) ar:

  • actions.next() - proceed to the next step.
  • actions.error(err) - abort consumption of this message with the associated error.
  • actions.ack() - acknowledge the message (removing it from the queue) and stop processing.
  • actions.nack() - abort processing of this message, but leave it in the queue.
  • actions.reject() - abort processing of this message and remove it from the queue. Results in a NACK. Depending on route pattern and queue setup, may result in a requeue to a failure queue.

Customizing the Message Pipeline

The main methods of all messaging parties are implemented as template methods so you can override an individual piece of the message pipeline if needed.

Contributing

Running Tests

Run all tests with the usual command:

$ npm test

This will run all the tests, including integration tests. To run just the unit tests, run:

$ npm run-script test:unit

To run just the integration tests, run:

$ RABBITMQ_HOST=$YOUR_RABBIT_HOST_NAME_OR_IP npm run-script test:integration

Setting Up For Integration Tests

You'll need access to a running RabbitMQ server. The easiest way to get rabbit running is with this docker command:

$ docker create --name rabbitmq -p 5672:5672 -p 15672:15672 \
  rabbitmq:management

You can also download and install it locally for free.

The integration tests will automatically create the necessary exchanges, queues, and bindings.

Style Guidelines

Prevent git from messing up the line endings on windows: git config --global core.autocrlf false

License

MIT license

If you find any bugs or have a feature request, please open an issue on github!

The npm package download data comes from npm's download counts api and package details come from npms.io.