@ambassify/queue

Queue implementation for node

Queue

This library acts as a wrapper around different queue implementations that we might end up using.

Currently implemented backends: SQS

API

The public API that each queue exposes is defined in the Queue class. To support a new backend, create a class that extends Queue. The public API is:

  • Queue.create( QueueType : class, ...args ) args are passed to the constructor of the QueueType.
  • constructor( queueName : string, options : object )
      • Available options are: itemPoolSize : int
  • getName() : string
  • getItemPool() : ItemPool ( see item-pool.js )
  • receive( count : int ) : Promise Attempt to receive at most count items.
  • release( item : object, handled : boolean ) : Promise Release the item. If handled is false, the item is not deleted from the queue. handled defaults to false.
  • touch( item : object, options : object ) : Promise Touch / ping a message to keep it in use.
  • send( body : object ) : Promise Submit a new queue item.
  • connect() : Promise
  • start() : Promise Start watching the queue for new items
  • stop() : void Stop watching the queue for new items. A final batch might still arrive after calling stop().
  • lock( item : object, options : object ) : Promise Prevents a message from re-entering the queue.
  • unlock( item : object ) : Promise Release an earlier acquired lock.
  • on( event : string, callback : function ) : void Attach an event handler to the queue.
      • The message event is triggered for each queue item that arrives.
      • The error event is triggered for errors in the _eventLoop or _lock.
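As a rough illustration of how these methods fit together, the sketch below wires up a consumer using only on(), connect(), start() and release() as listed above. The names handleItem, consume and processItem are hypothetical and not part of the library, and whether connect() has to be called before start() is an assumption.

```javascript
// Process one item and release it. `queue` is any object exposing the
// public API listed above; `processItem` is a hypothetical app callback.
async function handleItem(queue, item, processItem) {
    try {
        await processItem(item);
        // handled = true: the item may be deleted from the queue.
        await queue.release(item, true);
        return true;
    } catch (err) {
        // handled = false: the item is not deleted from the queue.
        await queue.release(item, false);
        return false;
    }
}

// Attach handlers, then start watching the queue.
async function consume(queue, processItem) {
    queue.on('message', item => {
        handleItem(queue, item, processItem)
            .catch(err => console.error('release failed:', err));
    });
    queue.on('error', err => console.error('queue error:', err));

    // Assumption: connect() is called explicitly before start().
    await queue.connect();
    await queue.start();
}
```

With a real backend, the queue argument would be the instance returned by Queue.create; releasing with handled set to false leaves the item in the queue for a later attempt.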

The public API then calls into the implementation-specific methods through an internal API that each backend must implement. The required private methods are:

  • _fetch( itemsToFetch : int ) : Promise Request itemsToFetch items from the queue. Do not mutate the raw objects before resolving them.
  • _transform( item : object ) : object This method receives the items retrieved by _fetch one by one. Return an altered object from this method to change the queue item.
  • _delete( item : object ) : Promise Remove the item from the queue / mark it as finished. This method should always receive the instance returned by the _transform step, so that hidden fields added there can be used to identify the item.
  • _touch( item : object, options : object ) : Promise Touch the message to keep it from becoming visible again.
  • _send( item : object ) : Promise Add item to the queue.
  • _connect() : Promise Start to connect with the backend.
  • _lock() : Promise Prevents a message from re-entering the queue. Default implementation uses queue.touch.
  • _unlock() : Promise Releases the lock and allows the item to re-enter the queue.
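To make the split between public methods and private hooks more concrete, here is a minimal in-memory sketch. BaseQueue is a hypothetical stand-in for the library's Queue class, reduced to a few public methods, and MemoryQueue is an illustrative backend that does not ship with @ambassify/queue. The way BaseQueue calls the hooks is an assumption based on the descriptions above, not the library's actual code.

```javascript
// Hypothetical stand-in for the library's Queue base class, showing one
// plausible way the public methods could delegate to the private hooks.
class BaseQueue {
    constructor(queueName) { this.queueName = queueName; }
    getName() { return this.queueName; }
    connect() { return this._connect(); }
    send(body) { return this._send(body); }
    async receive(count) {
        const raw = await this._fetch(count);
        return raw.map(item => this._transform(item));
    }
    async release(item, handled = false) {
        // Only handled items are removed from the queue.
        if (handled) await this._delete(item);
    }
}

// Illustrative in-memory backend; not part of @ambassify/queue.
class MemoryQueue extends BaseQueue {
    constructor(queueName) {
        super(queueName);
        this.items = new Map(); // backend id -> raw item
        this.nextId = 1;
    }
    async _connect() { /* nothing to connect to in memory */ }
    async _send(body) {
        const id = this.nextId++;
        this.items.set(id, { id, payload: JSON.stringify(body) });
    }
    async _fetch(itemsToFetch) {
        // Resolve the raw objects untouched; shaping happens in _transform.
        return Array.from(this.items.values()).slice(0, itemsToFetch);
    }
    _transform(raw) {
        // Keep the backend id in a hidden field so _delete can find the item.
        return { _id: raw.id, body: JSON.parse(raw.payload) };
    }
    async _delete(item) {
        this.items.delete(item._id);
    }
}
```

Unlike a real backend such as SQS, this sketch does not hide in-flight items, so repeated receive() calls return the same items until they are released as handled.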

Libraries

  • BatchOperation Utility that collects items into batches of batchSize, or fewer if timeout expires first. The SQS implementation uses this to batch delete and send operations.
  • ItemPool Currently only a counter that ensures no more than the pool size number of items are in flight.
  • sleep Returns a promise that resolves after a timeout.
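The behaviour described for these helpers can be approximated as follows. This is a sketch written from the descriptions only: the class names Batcher and CountingPool, their methods, and their option names are hypothetical, and the library's own BatchOperation and ItemPool may look quite different.

```javascript
// Resolves after `ms` milliseconds.
function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

// Hypothetical counter in the spirit of ItemPool.
class CountingPool {
    constructor(size) { this.size = size; this.inFlight = 0; }
    available() { return this.size - this.inFlight; }
    // Reserve up to `wanted` slots and return how many were granted.
    acquire(wanted) {
        const granted = Math.max(0, Math.min(wanted, this.available()));
        this.inFlight += granted;
        return granted;
    }
    release(count = 1) {
        this.inFlight = Math.max(0, this.inFlight - count);
    }
}

// Hypothetical batcher in the spirit of BatchOperation: flushes when
// batchSize items are pending, or when timeout (ms) expires first.
class Batcher {
    constructor(flush, { batchSize = 10, timeout = 100 } = {}) {
        this.flush = flush;
        this.batchSize = batchSize;
        this.timeout = timeout;
        this.pending = [];
        this.timer = null;
    }
    add(item) {
        this.pending.push(item);
        if (this.pending.length >= this.batchSize) {
            this._flush();
        } else if (!this.timer) {
            this.timer = setTimeout(() => this._flush(), this.timeout);
        }
    }
    _flush() {
        clearTimeout(this.timer);
        this.timer = null;
        const batch = this.pending;
        this.pending = [];
        if (batch.length > 0) this.flush(batch);
    }
}
```

A counter like this lets a receive loop ask only for as many items as there are free slots, while the batcher turns many single delete or send calls into fewer backend requests.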

Runtime configuration options

Configuration is done through environment variables. The options are:

  • BATCH_SIZE defaults to 10
  • QUEUE_POOL_SIZE defaults to 20
  • SQS_AWS_REGION defaults to the AWS_REGION environment variable.
  • SQS_FETCH_WAIT defaults to 20 seconds
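The defaults above could be resolved along these lines. The function readConfig and the property names it returns are hypothetical; only the environment variable names and default values come from the list above, and how the library itself parses them is not documented here.

```javascript
// Sketch: resolve the documented options from an environment object,
// falling back to the documented defaults.
function readConfig(env = process.env) {
    const toInt = (value, fallback) => {
        const parsed = Number.parseInt(value, 10);
        return Number.isNaN(parsed) ? fallback : parsed;
    };
    return {
        batchSize: toInt(env.BATCH_SIZE, 10),
        queuePoolSize: toInt(env.QUEUE_POOL_SIZE, 20),
        // Falls back to AWS_REGION when SQS_AWS_REGION is not set.
        sqsRegion: env.SQS_AWS_REGION || env.AWS_REGION,
        sqsFetchWait: toInt(env.SQS_FETCH_WAIT, 20), // seconds
    };
}
```

For example, starting the process with BATCH_SIZE set to 5 in the environment would change only batchSize and leave the other values at their defaults.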
