# node-mongo

A lightweight reactive extension to the official Node.js MongoDB driver.

## Features

- ObjectId mapping. Automatically converts the `_id` field from `ObjectId` to `string`;
- Reactive. Fires events when a document is created, updated, or deleted in the database;
- CUD operations timestamps. Automatically sets the `createdOn`, `updatedOn`, and `deletedOn` timestamps for CUD operations;
- Schema validation. Validates your data before saving;
- Paging. Implements a high-level paging API;
- Soft delete. By default, documents are not removed from the collection but are marked with the `deletedOn` field;
- Extendable. The API is easily extendable: you can add new methods or override existing ones;
- Outbox support. node-mongo can create collections with the `_outbox` postfix that store all CUD events for implementing the transactional outbox pattern.

The following example shows some of these features:

```typescript
import { eventBus, InMemoryEvent } from '@paralect/node-mongo';

await userService.updateOne(
  { _id: '62670b6204f1aab85e5033dc' },
  (doc) => ({ firstName: 'Mark' }),
);

eventBus.onUpdated('users', ['firstName', 'lastName'], async (data: InMemoryEvent<User>) => {
  await userService.atomic.updateOne(
    { _id: data.doc._id },
    { $set: { fullName: `${data.doc.firstName} ${data.doc.lastName}` } },
  );
});
```

## Installation

```shell
npm i @paralect/node-mongo
```

## Connect to Database

Usually, you need to define a file called `db` that does two things:

- Creates a database instance and connects to the database;
- Exposes a factory method `createService` to create different services to work with MongoDB.

```typescript title=db.ts
import { Database, Service, ServiceOptions, IDocument } from '@paralect/node-mongo';

import config from 'config';

const database = new Database(config.mongo.connection, config.mongo.dbName);
database.connect();

class CustomService<T extends IDocument> extends Service<T> {
  // You can add new methods or override existing ones here
}

function createService<T extends IDocument>(collectionName: string, options: ServiceOptions = {}) {
  return new CustomService<T>(collectionName, database, options);
}

export default {
  database,
  createService,
};
```

## Services
Service is a collection wrapper that adds all node-mongo features. Under the hood it uses Node.js MongoDB native methods.
`createService` method returns the service instance. It accepts two parameters: collection name and [ServiceOptions](#serviceoptions).

```typescript title=user.service.ts
import { z } from 'zod';

import db from 'db';

const schema = z.object({
  _id: z.string(),
  createdOn: z.date().optional(),
  updatedOn: z.date().optional(),
  deletedOn: z.date().optional().nullable(),
  fullName: z.string(),
}).strict();

type User = z.infer<typeof schema>;

const service = db.createService<User>('users', {
  schemaValidator: (obj) => schema.parseAsync(obj),
});

export default service;
```

```typescript title=update-user.ts
import userService from 'user.service';

await userService.insertOne({ fullName: 'Max' });
```

## Schema validation
Node-mongo supports any schema library, but we recommend [Zod](https://zod.dev/) because of its ability to generate TypeScript types from schemas.
### Zod
```typescript
const schema = z.object({
  _id: z.string(),
  createdOn: z.date().optional(),
  updatedOn: z.date().optional(),
  deletedOn: z.date().optional().nullable(),
  fullName: z.string(),
});

type User = z.infer<typeof schema>;

const service = createService<User>('users', {
  schemaValidator: (obj) => schema.parseAsync(obj),
});
```

### Joi

```typescript
const schema = Joi.object({
  _id: Joi.string().required(),
  createdOn: Joi.date(),
  updatedOn: Joi.date(),
  deletedOn: Joi.date().allow(null),
  fullName: Joi.string().required(),
});

type User = {
  _id: string;
  createdOn?: Date;
  updatedOn?: Date;
  deletedOn?: Date | null;
  fullName: string;
};

const service = createService<User>('users', {
  schemaValidator: (obj) => schema.validateAsync(obj),
});
```

Node-mongo validates documents before saving them.
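
Because `schemaValidator` is typed as `(obj: any) => Promise<any>`, a plain async function should also fit when no schema library is used. The sketch below is only an illustration: `validateUser` and its rules are hypothetical and not part of node-mongo.

```typescript
type User = { _id?: string; fullName: string };

// Hypothetical hand-written validator (an assumption, not a node-mongo API).
// It resolves with the document when it is valid and rejects otherwise.
const validateUser = async (obj: any): Promise<User> => {
  if (typeof obj !== 'object' || obj === null) {
    throw new Error('document must be an object');
  }
  if (typeof obj.fullName !== 'string' || obj.fullName.trim() === '') {
    throw new Error('fullName must be a non-empty string');
  }
  return obj as User;
};

// Intended use, following the ServiceOptions type:
// createService<User>('users', { schemaValidator: validateUser });
```

Rejecting the promise is what signals a validation failure here, which mirrors how `parseAsync` and `validateAsync` behave in the Zod and Joi examples.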
## Reactivity

The key feature of `node-mongo` is that each create, update, or delete operation publishes a CUD event:

- `${collectionName}.created`
- `${collectionName}.updated`
- `${collectionName}.deleted`

Events make it easy to update denormalized data and to implement complex business logic without tightly coupling different entities.
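
The idea of keeping denormalized data in sync through CUD events can be sketched without the library. In the snippet below, `TinyBus`, `CudEvent`, and `userStore` are hypothetical stand-ins invented for illustration; they are not the real `eventBus` or a real collection.

```typescript
// Hypothetical stand-in for an event bus; not the node-mongo implementation.
type CudEvent<T> = { name: string; doc: T; prevDoc?: T; createdOn: Date };
type CudHandler<T> = (evt: CudEvent<T>) => void;

class TinyBus {
  private handlers = new Map<string, CudHandler<any>[]>();

  on<T>(eventName: string, handler: CudHandler<T>): void {
    const list = this.handlers.get(eventName) ?? [];
    list.push(handler);
    this.handlers.set(eventName, list);
  }

  publish<T>(eventName: string, doc: T, prevDoc?: T): void {
    const evt: CudEvent<T> = { name: eventName, doc, prevDoc, createdOn: new Date() };
    for (const handler of this.handlers.get(eventName) ?? []) handler(evt);
  }
}

type User = { _id: string; firstName: string; lastName: string; fullName?: string };

const bus = new TinyBus();
const userStore = new Map<string, User>(); // in-memory substitute for a collection

// Keep the denormalized fullName in sync whenever a user is updated.
bus.on<User>('users.updated', ({ doc }) => {
  userStore.set(doc._id, { ...doc, fullName: `${doc.firstName} ${doc.lastName}` });
});

const previousUser: User = { _id: '1', firstName: 'Mark', lastName: 'Lee' };
const changedUser: User = { ...previousUser, firstName: 'Max' };
userStore.set(changedUser._id, changedUser);
bus.publish('users.updated', changedUser, previousUser);
```

The code that changes `firstName` never touches `fullName`; the handler owns that rule, which is the decoupling the paragraph above describes.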
The SDK supports two types of events:

### In-memory events

- Enabled by default;
- Events can be lost on service failure;
- Events are stored in `eventBus` (a Node.js EventEmitter instance);
- To handle this type of event, use the Events API;
- Designed for transferring events inside a single Node.js process. Event handlers listen to the node-mongo `eventBus`.

### Transactional events

- Can be enabled by setting `{ outbox: true }` when creating a service;
- Guarantee that every database write produces an event;
- Events are stored in special collections with the `_outbox` postfix;
- To handle this type of event, use `watch` (the method for working with Change Streams) on the outbox collection;
- Designed for transferring events to a message broker such as Kafka. Event handlers should listen to message broker events (you need to implement this layer yourself).

At the start of a project, we recommend using `in-memory` events. As your application grows more complex, you should migrate to `transactional` events.

## Service API

### find

```typescript
find(
  filter: Filter<T>,
  readConfig: ReadConfig & { page?: number; perPage?: number } = {},
  findOptions: FindOptions = {},
): Promise<FindResult<T>>
```

```typescript
const { results: users, count: usersCount } = await userService.find(
  { status: 'active' },
);
```

Fetches documents that match the filter. Returns an object with the following fields (`FindResult`):

| Field | Description |
| ------------- | -------- |
| results | documents that match the filter |
| count | total number of documents that match the filter |
| pagesCount | total number of documents that match the filter divided by the number of documents per page |

Pass the `page` and `perPage` params to get a paginated result. Otherwise, all documents will be returned.

Parameters

- filter: `Filter<T>`;
- readConfig: `ReadConfig & { page?: number; perPage?: number }`;
- findOptions: `FindOptions`;

Returns `Promise<FindResult<T>>`.
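
The arithmetic behind `results`, `count`, and `pagesCount` can be sketched on an in-memory array. `paginate` is a hypothetical helper, and three details are assumptions rather than documented behavior: pages are 1-indexed, `pagesCount` is rounded up, and an unpaginated call reports a single page.

```typescript
type FindResult<T> = { results: T[]; count: number; pagesCount: number };

// Hypothetical in-memory helper that mirrors the FindResult shape.
function paginate<T>(docs: T[], page?: number, perPage?: number): FindResult<T> {
  const count = docs.length;

  // Without paging params, return everything (assumed to count as one page).
  if (page === undefined || perPage === undefined) {
    return { results: docs, count, pagesCount: 1 };
  }

  const skip = (page - 1) * perPage; // assumes 1-indexed pages
  return {
    results: docs.slice(skip, skip + perPage),
    count,
    pagesCount: Math.ceil(count / perPage), // assumes rounding up
  };
}

const numbers = Array.from({ length: 25 }, (_, i) => i + 1);
const secondPage = paginate(numbers, 2, 10);
const unpaged = paginate(numbers);
```

With 25 documents and `perPage` set to 10, the second page holds items 11 through 20 and `pagesCount` is 3.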
### findOne

```typescript
findOne(
  filter: Filter<T>,
  readConfig: ReadConfig = {},
  findOptions: FindOptions = {},
): Promise<T | null>
```

```typescript
const user = await userService.findOne({ _id: u._id });
```

Fetches the first document that matches the filter. Returns `null` if the document was not found.

Parameters

- filter: `Filter<T>`;
- readConfig: `ReadConfig`;
- findOptions: `FindOptions`;

Returns `Promise<T | null>`.
### updateOne

```typescript
updateOne: (
  filter: Filter<T>,
  updateFn: (doc: T) => Partial<T>,
  updateConfig: UpdateConfig = {},
  updateOptions: UpdateOptions = {},
): Promise<T | null>
```

```typescript
const updatedUserWithEvent = await userService.updateOne(
  { _id: u._id },
  (doc) => ({ fullName: 'Updated fullname' }),
);

const updatedUser = await userService.updateOne(
  { _id: u._id },
  (doc) => ({ fullName: 'Updated fullname' }),
  { publishEvents: false },
);
```

Updates a single document and returns it. Returns `null` if the document was not found.

Parameters

- filter: `Filter<T>`;
- updateFn: `(doc: T) => Partial<T>`;
- updateConfig: `UpdateConfig`;
- updateOptions: `UpdateOptions`;

Returns `Promise<T | null>`.
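
One way to picture how an `updateFn` result could be combined with the stored document is a shallow merge followed by a timestamp. This is a sketch under assumptions: `applyUpdate` and `BaseDoc` are hypothetical, and the shallow merge is a guess at the semantics, not the library's confirmed behavior.

```typescript
type BaseDoc = { _id: string; updatedOn?: Date };
type User = BaseDoc & { firstName: string; fullName: string };

// Hypothetical helper: merges the partial returned by updateFn into a copy of
// the document and stamps updatedOn. The original object is left untouched.
function applyUpdate<T extends BaseDoc>(
  doc: T,
  updateFn: (doc: T) => Partial<T>,
  now: Date = new Date(),
): T {
  return { ...doc, ...updateFn(doc), updatedOn: now };
}

const storedUser: User = { _id: '1', firstName: 'Mark', fullName: 'Old name' };
const updatedUser = applyUpdate(
  storedUser,
  (doc) => ({ fullName: `${doc.firstName} Smith` }),
  new Date(0),
);
```

Passing the current document to the callback is what lets the new value depend on existing fields, as `fullName` does here.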
### updateMany

```typescript
updateMany: (
  filter: Filter<T>,
  updateFn: (doc: T) => Partial<T>,
  updateConfig: UpdateConfig = {},
  updateOptions: UpdateOptions = {},
): Promise<T[]>
```

```typescript
const updatedUsers = await userService.updateMany(
  { status: 'active' },
  (doc) => ({ isEmailVerified: true }),
);
```

Updates multiple documents that match the query. Returns an array with the updated documents.

Parameters

- filter: `Filter<T>`;
- updateFn: `(doc: T) => Partial<T>`;
- updateConfig: `UpdateConfig`;
- updateOptions: `UpdateOptions`;

Returns `Promise<T[]>`.
### insertOne

```typescript
insertOne: (
  object: Partial<T>,
  createConfig: CreateConfig = {},
  insertOneOptions: InsertOneOptions = {},
): Promise<T>
```

```typescript
const user = await userService.insertOne({
  fullName: 'John',
});
```

Inserts a single document into a collection and returns it.

Parameters

- object: `Partial<T>`;
- createConfig: `CreateConfig`;
- insertOneOptions: `InsertOneOptions`;

Returns `Promise<T>`.
### insertMany

```typescript
insertMany: (
  objects: Partial<T>[],
  createConfig: CreateConfig = {},
  bulkWriteOptions: BulkWriteOptions = {},
): Promise<T[]>
```

```typescript
const users = await userService.insertMany([
  { fullName: 'John' },
  { fullName: 'Kobe' },
]);
```

Inserts multiple documents into a collection and returns them.

Parameters

- objects: `Partial<T>[]`;
- createConfig: `CreateConfig`;
- bulkWriteOptions: `BulkWriteOptions`;

Returns `Promise<T[]>`.
### deleteSoft

```typescript
deleteSoft: (
  filter: Filter<T>,
  deleteConfig: DeleteConfig = {},
  deleteOptions: DeleteOptions = {},
): Promise<T[]>
```

```typescript
const deletedUsers = await userService.deleteSoft(
  { status: 'deactivated' },
);
```

Adds the `deletedOn` field to the documents that match the query and returns them.

Parameters

- filter: `Filter<T>`;
- deleteConfig: `DeleteConfig`;
- deleteOptions: `DeleteOptions`;

Returns `Promise<T[]>`.
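
Soft deletion and the `skipDeletedOnDocs` read option fit together as sketched below on an in-memory array. `softDeleteDocs` and `findVisibleDocs` are hypothetical helpers written for this illustration; they only mimic the documented behavior and do not reflect how the library is implemented.

```typescript
type User = { _id: string; status: string; deletedOn?: Date | null };

// Marks matching documents with deletedOn instead of removing them.
function softDeleteDocs(docs: User[], match: (doc: User) => boolean, now: Date = new Date()): User[] {
  const deleted = docs.filter((doc) => match(doc) && !doc.deletedOn);
  for (const doc of deleted) doc.deletedOn = now;
  return deleted;
}

// Reads skip soft-deleted documents unless skipDeletedOnDocs is false.
function findVisibleDocs(docs: User[], skipDeletedOnDocs = true): User[] {
  return skipDeletedOnDocs ? docs.filter((doc) => !doc.deletedOn) : docs;
}

const userCollection: User[] = [
  { _id: '1', status: 'active' },
  { _id: '2', status: 'deactivated' },
];

const softDeleted = softDeleteDocs(userCollection, (doc) => doc.status === 'deactivated');
const visibleUsers = findVisibleDocs(userCollection);
const allUsers = findVisibleDocs(userCollection, false);
```

The deactivated user stays in the array, so it can still be read by opting out of `skipDeletedOnDocs`.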
### deleteOne

```typescript
deleteOne: (
  filter: Filter<T>,
  deleteConfig: DeleteConfig = {},
  deleteOptions: DeleteOptions = {},
): Promise<T | null>
```

```typescript
const deletedUser = await userService.deleteOne(
  { _id: u._id },
);
```

Deletes a single document and returns it. Returns `null` if the document was not found.

Parameters

- filter: `Filter<T>`;
- deleteConfig: `DeleteConfig`;
- deleteOptions: `DeleteOptions`;

Returns `Promise<T | null>`.
### deleteMany

```typescript
deleteMany: (
  filter: Filter<T>,
  deleteConfig: DeleteConfig = {},
  deleteOptions: DeleteOptions = {},
): Promise<T[]>
```

```typescript
const deletedUsers = await userService.deleteMany(
  { status: 'deactivated' },
);
```

Deletes multiple documents that match the query. Returns an array with the deleted documents.

Parameters

- filter: `Filter<T>`;
- deleteConfig: `DeleteConfig`;
- deleteOptions: `DeleteOptions`;

Returns `Promise<T[]>`.
### replaceOne

```typescript
replaceOne: (
  filter: Filter<T>,
  replacement: Partial<T>,
  readConfig: ReadConfig = {},
  replaceOptions: ReplaceOptions = {},
): Promise<UpdateResult | Document>
```

```typescript
await userService.replaceOne(
  { _id: u._id },
  { fullName: fullNameToUpdate },
);
```

Replaces a single document within the collection based on the filter. Doesn't validate the schema or publish events.

Parameters

- filter: `Filter<T>`;
- replacement: `Partial<T>`;
- readConfig: `ReadConfig`;
- replaceOptions: `ReplaceOptions`;

Returns `Promise<UpdateResult | Document>`.
### atomic.updateOne

```typescript
updateOne: (
  filter: Filter<T>,
  updateFilter: UpdateFilter<T>,
  readConfig: ReadConfig = {},
  updateOptions: UpdateOptions = {},
): Promise<UpdateResult>
```

```typescript
await userService.atomic.updateOne(
  { _id: u._id },
  { $set: { fullName: `${u.firstName} ${u.lastName}` } },
);
```

Updates a single document. Doesn't validate the schema or publish events.

Parameters

- filter: `Filter<T>`;
- updateFilter: `UpdateFilter<T>`;
- readConfig: `ReadConfig`;
- updateOptions: `UpdateOptions`;

Returns `Promise<UpdateResult>`.
### atomic.updateMany

```typescript
updateMany: (
  filter: Filter<T>,
  updateFilter: UpdateFilter<T>,
  readConfig: ReadConfig = {},
  updateOptions: UpdateOptions = {},
): Promise<Document | UpdateResult>
```

```typescript
await userService.atomic.updateMany(
  { firstName: { $exists: true }, lastName: { $exists: true } },
  { $set: { fullName: `${u.firstName} ${u.lastName}` } },
);
```

Updates all documents that match the specified filter. Doesn't validate the schema or publish events.

Parameters

- filter: `Filter<T>`;
- updateFilter: `UpdateFilter<T>`;
- readConfig: `ReadConfig`;
- updateOptions: `UpdateOptions`;

Returns `Promise<UpdateResult | Document>`.
### exists

```typescript
exists(
  filter: Filter<T>,
  readConfig: ReadConfig = {},
  findOptions: FindOptions = {},
): Promise<boolean>
```

```typescript
const isUserExists = await userService.exists(
  { email: 'example@gmail.com' },
);
```

Returns `true` if a matching document exists, otherwise `false`.

Parameters

- filter: `Filter<T>`;
- readConfig: `ReadConfig`;
- findOptions: `FindOptions`;

Returns `Promise<boolean>`.
### countDocuments

```typescript
countDocuments(
  filter: Filter<T>,
  readConfig: ReadConfig = {},
  countDocumentOptions: CountDocumentsOptions = {},
): Promise<number>
```

```typescript
const documentsCount = await userService.countDocuments(
  { status: 'active' },
);
```

Returns the number of documents that match the query.

Parameters

- filter: `Filter<T>`;
- readConfig: `ReadConfig`;
- countDocumentOptions: `CountDocumentsOptions`;

Returns `Promise<number>`.
### distinct

```typescript
distinct(
  key: string,
  filter: Filter<T>,
  readConfig: ReadConfig = {},
  distinctOptions: DistinctOptions = {},
): Promise<any[]>
```

```typescript
const statesList = await userService.distinct('states');
```

Finds the distinct values for a specified field across a single collection or view and returns them in an array.

Parameters

- key: `string`;
- filter: `Filter<T>`;
- readConfig: `ReadConfig`;
- distinctOptions: `DistinctOptions`;

Returns `Promise<any[]>`.
### aggregate

```typescript
aggregate: (
  pipeline: any[],
  options: AggregateOptions = {},
): Promise<any[]>
```

```typescript
const sortedActiveUsers = await userService.aggregate([
  { $match: { status: "active" } },
  { $sort: { firstName: -1, lastName: -1 } },
]);
```

Executes an aggregation framework pipeline and returns an array with the resulting documents.

Parameters

- pipeline: `any[]`;
- options: `AggregateOptions`;

Returns `Promise<any[]>`.
### watch

```typescript
watch: (
  pipeline: Document[] | undefined,
  options: ChangeStreamOptions = {},
): Promise<any>
```

```typescript
// The signature above returns a Promise, so the cursor is awaited here.
const watchCursor = await userService.watch();
```

Creates a new Change Stream that watches for new changes and returns a cursor.

Parameters

- pipeline: `Document[] | undefined`;
- options: `ChangeStreamOptions`;

Returns `Promise<any>`.
### drop

```typescript
drop: (
  recreate: boolean = false,
): Promise<void>
```

```typescript
await userService.drop();
```

Removes a collection from the database. The method also removes any indexes associated with the dropped collection.

Parameters

- recreate: `boolean`;

Returns `Promise<void>`.
### indexExists

```typescript
indexExists: (
  indexes: string | string[],
  indexInformationOptions: IndexInformationOptions = {},
): Promise<boolean>
```

```typescript
const isIndexExists = await userService.indexExists(index);
```

Checks whether one or more indexes exist on the collection; fails on the first non-existing index.

Parameters

- indexes: `string | string[]`;
- indexInformationOptions: `IndexInformationOptions`;

Returns `Promise<boolean>`.
### createIndex

```typescript
createIndex: (
  indexSpec: IndexSpecification,
  options: CreateIndexesOptions = {},
): Promise<string | void>
```

```typescript
await userService.createIndex({ fullName: 1 });
```

Creates an index on the collection.

Parameters

- indexSpec: `IndexSpecification`;
- options: `CreateIndexesOptions`;

Returns `Promise<string | void>`.
### createIndexes

```typescript
createIndexes: (
  indexSpecs: IndexDescription[],
  options: CreateIndexesOptions = {},
): Promise<string[] | void>
```

```typescript
await userService.createIndexes([
  { key: { fullName: 1 } },
  { key: { createdOn: 1 } },
]);
```

Creates one or more indexes on a collection.

Parameters

- indexSpecs: `IndexDescription[]`;
- options: `CreateIndexesOptions`;

Returns `Promise<string[] | void>`.
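### dropIndex

```typescript
dropIndex: (
  indexName: string,
  options: DropIndexesOptions = {},
): Promise<void | Document>
```

```typescript
// indexName is a string; this is the default name MongoDB gives
// to an index created with { firstName: 1, lastName: -1 }.
await userService.dropIndex('firstName_1_lastName_-1');
```

Removes the specified index from a collection.

Parameters

- indexName: `string`;
- options: `DropIndexesOptions`;

Returns `Promise<void | Document>`.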
### dropIndexes

```typescript
dropIndexes: (
  options: DropIndexesOptions = {},
): Promise<void | Document>
```

Removes all but the `_id` index from a collection.

```typescript
await userService.dropIndexes();
```

Parameters

- options: `DropIndexesOptions`;

Returns `Promise<void | Document>`.
## Events API

### eventBus.on

```typescript
on: (
  eventName: string,
  handler: InMemoryEventHandler,
): void
```

```typescript
import { eventBus, InMemoryEvent } from '@paralect/node-mongo';

const collectionName = 'users';

eventBus.on(`${collectionName}.created`, (data: InMemoryEvent<User>) => {
  try {
    const user = data.doc;

    console.log('user created', user);
  } catch (err) {
    console.error(`${collectionName}.created handler error: ${err}`);
  }
});

eventBus.on(`${collectionName}.updated`, (data: InMemoryEvent<User>) => {});

eventBus.on(`${collectionName}.deleted`, (data: InMemoryEvent<User>) => {});
```

Registers an in-memory event handler that listens for CUD events.

Parameters

- eventName: `string`;
  Valid format: `${collectionName}.created`, `${collectionName}.updated`, `${collectionName}.deleted`.
- handler: `InMemoryEventHandler`;

Returns `void`.
### eventBus.once

```typescript
once: (
  eventName: string,
  handler: InMemoryEventHandler,
): void
```

```typescript
eventBus.once('users.updated', (data: InMemoryEvent<User>) => {
  try {
    const user = data.doc;

    console.log('user updated', user);
  } catch (err) {
    console.error(`users.updated handler error: ${err}`);
  }
});
```

Registers an in-memory event handler that listens for CUD events. It will be called only once.

Parameters

- eventName: `string`;
  Valid format: `${collectionName}.created`, `${collectionName}.updated`, `${collectionName}.deleted`.
- handler: `InMemoryEventHandler`;

Returns `void`.
### eventBus.onUpdated

```typescript
onUpdated: (
  entity: string,
  properties: OnUpdatedProperties,
  handler: InMemoryEventHandler,
): void
```

```typescript
import { eventBus, InMemoryEvent } from '@paralect/node-mongo';

eventBus.onUpdated('users', ['firstName', 'lastName'], async (data: InMemoryEvent<User>) => {
  try {
    await userService.atomic.updateOne(
      { _id: data.doc._id },
      { $set: { fullName: `${data.doc.firstName} ${data.doc.lastName}` } },
    );
  } catch (err) {
    console.log(`users onUpdated ['firstName', 'lastName'] handler error: ${err}`);
  }
});

eventBus.onUpdated('users', [{ fullName: 'John Wake', firstName: 'John' }, 'lastName'], () => {});

eventBus.onUpdated('users', ['oauth.google'], () => {});
```

Registers an in-memory event handler that listens for updates to specific fields. It will be called when one of the provided `properties` is updated.

Parameters

- entity: `string`;
- properties: `OnUpdatedProperties`;
- handler: `InMemoryEventHandler`;

Returns `void`.
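
How a handler might decide that "one of the provided properties was updated" can be sketched by comparing `prevDoc` with `doc` along each property path. This is an assumption about the matching logic, not the library's code: `getPath` and `changedProperties` are hypothetical, only string properties (including dotted paths such as `oauth.google`) are covered, and values are compared through JSON serialization.

```typescript
// Reads a possibly nested value, e.g. getPath(user, 'oauth.google').
function getPath(obj: unknown, path: string): unknown {
  return path.split('.').reduce<unknown>(
    (value, key) =>
      value !== null && typeof value === 'object'
        ? (value as Record<string, unknown>)[key]
        : undefined,
    obj,
  );
}

// Returns the subset of watched paths whose value differs between the two docs.
function changedProperties(prevDoc: unknown, doc: unknown, properties: string[]): string[] {
  return properties.filter(
    (path) => JSON.stringify(getPath(prevDoc, path)) !== JSON.stringify(getPath(doc, path)),
  );
}

const beforeUpdate = { firstName: 'John', lastName: 'Wake', oauth: { google: false } };
const afterUpdate = { firstName: 'John', lastName: 'Doe', oauth: { google: true } };

const nameChanges = changedProperties(beforeUpdate, afterUpdate, ['firstName', 'lastName']);
const oauthChanges = changedProperties(beforeUpdate, afterUpdate, ['oauth.google']);
const noChanges = changedProperties(beforeUpdate, afterUpdate, ['firstName']);
```

A handler would then run only when the returned list is non-empty.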
## Transactions API

### withTransaction

```typescript
withTransaction: <TRes = any>(
  transactionFn: (session: ClientSession) => Promise<TRes>,
): Promise<TRes>
```

Runs the callback and automatically commits or rolls back the transaction.

```typescript
import db from 'db';

const { user, company } = await db.withTransaction(async (session) => {
  const createdUser = await userService.insertOne({ fullName: 'Bahrimchuk' }, {}, { session });
  const createdCompany = await companyService.insertOne(
    { users: [createdUser._id] }, {},
    { session },
  );

  return { user: createdUser, company: createdCompany };
});
```

Parameters

- transactionFn: `(session: ClientSession) => Promise<TRes>`;

Returns `Promise<TRes>`.
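
The commit-or-roll-back contract can be illustrated with a stand-in session object. `FakeSession` and `runInTransaction` are hypothetical and do not talk to MongoDB; the sketch only shows the control flow one would expect: commit when the callback resolves, abort and rethrow when it rejects.

```typescript
type FakeSession = { state: 'active' | 'committed' | 'aborted' };

// Hypothetical wrapper that mimics the commit/rollback control flow.
async function runInTransaction<TRes>(
  session: FakeSession,
  transactionFn: (session: FakeSession) => Promise<TRes>,
): Promise<TRes> {
  try {
    const result = await transactionFn(session);
    session.state = 'committed'; // callback resolved: commit
    return result;
  } catch (err) {
    session.state = 'aborted'; // callback rejected: roll back and rethrow
    throw err;
  }
}

const okSession: FakeSession = { state: 'active' };
const okResult = runInTransaction(okSession, async () => 'done');

const failedSession: FakeSession = { state: 'active' };
const failedResult = runInTransaction(failedSession, async () => {
  throw new Error('boom');
}).catch((err: Error) => err.message);
```

Rethrowing after the rollback keeps the failure visible to the caller, so the result of a failed transaction is never silently dropped.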
## Options and Types

### ServiceOptions

```typescript
interface ServiceOptions {
  skipDeletedOnDocs?: boolean;
  schemaValidator?: (obj: any) => Promise<any>;
  publishEvents?: boolean;
  addCreatedOnField?: boolean;
  addUpdatedOnField?: boolean;
  outbox?: boolean;
  collectionOptions?: CollectionOptions;
  collectionCreateOptions?: CreateCollectionOptions;
}
```

| Option | Description | Default value |
| ------------- | -------- | ---- |
| `skipDeletedOnDocs` | Skip documents with the `deletedOn` field | `true` |
| `schemaValidator` | Validation function that will be called on data save | - |
| `publishEvents` | Publish CUD events on save. | `true` |
| `addCreatedOnField` | Set the `createdOn` field to the current timestamp on document creation. | `true` |
| `addUpdatedOnField` | Set the `updatedOn` field to the current timestamp on document update. | `true` |
| `outbox` | Use transactional events instead of in-memory events | `false` |
| `collectionOptions` | MongoDB CollectionOptions | `{}` |
| `collectionCreateOptions` | MongoDB CreateCollectionOptions | `{}` |

### CreateConfig

Overrides `ServiceOptions` parameters for create operations.

```typescript
type CreateConfig = {
  validateSchema?: boolean,
  publishEvents?: boolean,
};
```

### ReadConfig

Overrides `ServiceOptions` parameters for read operations.

```typescript
type ReadConfig = {
  skipDeletedOnDocs?: boolean,
};
```

### UpdateConfig

Overrides `ServiceOptions` parameters for update operations.

```typescript
type UpdateConfig = {
  skipDeletedOnDocs?: boolean,
  validateSchema?: boolean,
  publishEvents?: boolean,
};
```

### DeleteConfig

Overrides `ServiceOptions` parameters for delete operations.

```typescript
type DeleteConfig = {
  skipDeletedOnDocs?: boolean,
  publishEvents?: boolean,
};
```
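
The "overrides" relationship between service-level options and per-call configs can be pictured as a simple merge in which the per-call value wins. `resolveConfig` and `ServiceDefaults` are hypothetical names used for this sketch; the merge strategy is an assumption, not the library's implementation.

```typescript
type ServiceDefaults = { skipDeletedOnDocs: boolean; publishEvents: boolean };
type PerCallConfig = Partial<ServiceDefaults>;

// Per-call settings take precedence over the service-level defaults.
function resolveConfig(defaults: ServiceDefaults, perCall: PerCallConfig = {}): ServiceDefaults {
  return { ...defaults, ...perCall };
}

const serviceDefaults: ServiceDefaults = { skipDeletedOnDocs: true, publishEvents: true };

// Equivalent in spirit to passing { publishEvents: false } to a single call.
const silentUpdate = resolveConfig(serviceDefaults, { publishEvents: false });
const plainUpdate = resolveConfig(serviceDefaults);
```

Only the call that passes `{ publishEvents: false }` is affected; later calls fall back to the service defaults.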
### InMemoryEvent

```typescript
type InMemoryEvent<T = any> = {
  doc: T,
  prevDoc?: T,
  name: string,
  createdOn: Date
};
```

### InMemoryEventHandler

```typescript
type InMemoryEventHandler = (evt: InMemoryEvent) => Promise<void> | void;
```

### OnUpdatedProperties

```typescript
type OnUpdatedProperties = Array<Record<string, unknown> | string>;
```

## Extending API

Extending API for a single service.

```typescript
// `_` (for example lodash) and updateLastRequest are assumed to be
// imported or defined elsewhere in this file (not shown).
const service = db.createService<User>('users', {
  schemaValidator: (obj) => schema.parseAsync(obj),
});

const privateFields = [
  'passwordHash',
  'signupToken',
  'resetPasswordToken',
];

const getPublic = (user: User | null) => _.omit(user, privateFields);

export default Object.assign(service, {
  updateLastRequest,
  getPublic,
});
```

Extending API for all services.

```typescript
const database = new Database(config.mongo.connection, config.mongo.dbName);

class CustomService<T extends IDocument> extends Service<T> {
  createOrUpdate = async (query: any, updateCallback: (item?: T) => Partial<T>) => {
    const docExists = await this.exists(query);

    if (!docExists) {
      const newDoc = updateCallback();
      return this.insertOne(newDoc);
    }

    return this.updateOne(query, (doc) => updateCallback(doc));
  };
}

function createService<T extends IDocument>(collectionName: string, options: ServiceOptions = {}) {
  return new CustomService<T>(collectionName, database, options);
}

const userService = createService<UserType>('users', {
  schemaValidator: (obj) => schema.parseAsync(obj),
});

await userService.createOrUpdate(
  { _id: 'some-id' },
  () => ({ fullName: 'Max' }),
);
```