
What is Moleculer?

Features

support versioned services

support Streams

service mixins

master-less architecture, all nodes are equal

parameter validation with fastest-validator

built-in metrics feature with reporters (Console, CSV, Datadog, Event, Prometheus, StatsD)

built-in tracing feature with exporters (Console, Datadog, Event, Jaeger, Zipkin)

Broker

Create a ServiceBroker

Create broker with default settings:
Create broker with custom settings:

const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker({

nodeID: "my-node"

Create broker with transporter to communicate with remote nodes:

requestTimeout: 5 * 1000

});

Metadata option

});

The metadata property can be obtained by calling the $node.list action. It is also transferred to other nodes.

Ping

Ping a node with 1 second timeout

Output

nodeID: 'node-123',
elapsedTime: 16,

timeDiff: -3

Ping multiple nodes

Output

"node-100": {

nodeID: 'node-100',
elapsedTime: 10,

timeDiff: 850

}

Ping all available nodes

Output

nodeID: 'node-100',
elapsedTime: 10,

timeDiff: -2

},

"node-102": {

Properties of ServiceBroker

Methods of ServiceBroker

Name Response Description

broker.stop() Promise Stop broker.

broker.repl() - Start REPL mode.

Promise Wait for services.

broker.call(actionName, params,

Promise

Multiple service calling.

broker.broadcast(eventName, payload, opts) - Broadcast an event.

broker.broadcastLocal(eventName, payload, opts)

event name.

broker.generateUid() String Generate a UUID/token.

broker.isMetricsEnabled() Boolean Check whether the metrics feature is enabled.

broker.isTracingEnabled() Boolean Check whether the tracing feature is enabled.

Global error handler

Catch, handle & log the error

});

Catch & rethrow the error

const broker = new ServiceBroker({
errorHandler(err, info) {

Configuration

Broker options

These options can be used in the ServiceBroker constructor or in the moleculer.config.js file.

List of all available broker options:

namespace: String - Namespace of nodes to segment your nodes on the same network (e.g.: “development”, “staging”, “production”). Default: ""

requestTimeout: Number - Number of milliseconds to wait before rejecting a request with a RequestTimeout error. Disabled: 0. Default: 0

heartbeatInterval: Number - Number of seconds to send heartbeat packet to other nodes. Default: 5

heartbeatTimeout: Number - Number of seconds to wait before setting remote nodes to unavailable status in Registry. Default: 15

transit.maxChunkSize: Number - Maximum chunk size while streaming. Default: 256KB

transit.disableReconnect: Boolean - Disables the reconnection logic while starting a broker. Default: false

transit.disableVersionCheck: Boolean - Disable protocol version checking logic in Transit. Default: false

transit.packetLogFilter: Array - Filters out the packets in debug log messages. It can be useful to filter out the HEARTBEAT packets while debugging. Default: []

uidGenerator: Function - Custom UID generator function for Context ID.

metrics: - Enable & configure metrics feature. Default:

tracing: Boolean | Object - Enable & configure tracing feature. Default: false

internalServices: Boolean | Object - Register internal services at start. Default: true

internalServices.$node: Object - Extend internal services with custom actions. Default: null

internalMiddlewares: Boolean - Register internal middlewares. Default: true

than maxSafeObjectSize value. Default: null

created: Function - Fired when the broker is created. Default: null

Full options object

{

namespace: "dev",
nodeID: "node-25",

delay: 100,

maxDelay: 1000,

heartbeatInterval: 5,

heartbeatTimeout: 15,

registry: {

strategy: "RoundRobin",
preferLocal: true

minRequestCount: 20,

halfOpenTime: 10 * 1000,

maxQueueSize: 100,

},

cacher: "MemoryLRU",
serializer: "JSON",

validator: true,
errorRegenerator: null,

},

tracing: {

internalServices: true,
internalMiddlewares: true,

hotReload: true,

},

skipProcessEventRegistration: false,
maxSafeObjectSize: null,

Services

Schema

The schema has some main parts: name, version, settings, actions, methods, events.

Simple service schema to define two actions

// math.service.js
module.exports = {

sub(ctx) {

return Number(ctx.params.a) - Number(ctx.params.b);

Base properties

// posts.v1.service.js
module.exports = {

name: "posts",
version: 1

// posts.v2.service.js
module.exports = {

name: "posts",
version: 2,
actions: {

broker.call("v2.posts.find");

REST call

Via API Gateway, make a request to GET /v2/posts/find.
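The naming rule behind the two calls above can be sketched in plain JavaScript. This is an illustration only, not Moleculer's own code: the helper names `fullActionName` and `restPath` are hypothetical, the handling of string versions is an assumption, and the real gateway path depends on how the API Gateway is configured.

```javascript
// Sketch only: derive the full action name and a REST-style path for a versioned service.
function fullActionName(schema, actionName) {
  const { name, version } = schema;
  if (version === undefined || version === null) {
    return `${name}.${actionName}`;
  }
  // Assumption: numeric versions get a "v" prefix; string versions are used as-is.
  const prefix = typeof version === "number" ? `v${version}` : String(version);
  return `${prefix}.${name}.${actionName}`;
}

// Assumption: the gateway path mirrors the action name with "/" instead of ".".
function restPath(fullName) {
  return "/" + fullName.split(".").join("/");
}

const postsV2 = { name: "posts", version: 2 };
const versionedName = fullActionName(postsV2, "find");
console.log(versionedName);           // v2.posts.find
console.log(restPath(versionedName)); // /v2/posts/find
```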

Settings

transport: "mailgun"

},

}

}

Internal Settings

Name Type Default Description

$noServiceNamePrefix Boolean false Disable service name prefixing in action names.

Secure service settings

property in service settings and set the protected property keys. The protected settings won’t be published to other nodes and they won’t appear in the Service Registry. These settings will only be available under this.settings inside the service functions.

service: 'gmail', auth: {

user: 'gmail.user@gmail.com', pass: 'yourpass'

};

Mixins

Mixins are a flexible way to distribute reusable functionalities for Moleculer services. The Service constructor merges these mixins with the current schema. When a service uses mixins, all properties present in the mixin will be “mixed” into the current service.

mixins: [ApiGwService],
settings: {

// Change port setting
port: 8080

}

}

Merge algorithm

Property Algorithm

settings Deep extend with defaultsDeep.

metadata Deep extend with defaultsDeep.

dependencies Merge & overwrite.

any custom Merge & overwrite.

Merge algorithm examples
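As a rough illustration of the "deep extend with defaultsDeep" rule for settings, the sketch below uses a small hand-written `defaultsDeep` rather than the lodash function, and it assumes that the service's own values take precedence while the mixin only fills in missing keys. The setting names are hypothetical.

```javascript
// Minimal defaultsDeep: copies from `source` only the keys that `target` leaves undefined.
function defaultsDeep(target, source) {
  const isPlainObject = (v) => v !== null && typeof v === "object" && !Array.isArray(v);
  const result = { ...target };
  for (const [key, value] of Object.entries(source)) {
    if (result[key] === undefined) {
      result[key] = value;
    } else if (isPlainObject(result[key]) && isPlainObject(value)) {
      result[key] = defaultsDeep(result[key], value);
    }
  }
  return result;
}

// Hypothetical mixin and service settings, for illustration only.
const mixinSettings = { port: 3000, cors: { origin: "*", credentials: false } };
const serviceSettings = { port: 8080, cors: { credentials: true } };

// The service settings win; the mixin supplies the missing `cors.origin`.
const mergedSettings = defaultsDeep(serviceSettings, mixinSettings);
console.log(mergedSettings);
// { port: 8080, cors: { credentials: true, origin: '*' } }
```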

Actions

// math.service.js
module.exports = {

name: "math",
actions: {

// the `handler` function is required!
mult: {

cache: false,
params: {

return Number(ctx.params.a) * Number(ctx.params.b);

}

const sum = await broker.call("math.add", { a: 5, b: 7 });
const product = await broker.call("math.mult", { a: 10, b: 31 });

Inside actions, you can call nested actions in other services with the ctx.call method. It is an alias for broker.call, but it sets the current context as the parent context (so that tracing chains stay correct).

let post = posts[ctx.params.id];

// Populate the post.author field through "users" service

return post;

}

Events

// report.service.js
module.exports = {

name: "report",

},

// Subscribe to all "user.*" events
"user.*"(ctx) {

this.logger.info(`Node '${ctx.params.id}' is connected!`);

}

Grouping

// payment.service.js
module.exports = {

name: "payment",
events: {

}

}

Methods

handlers).

Usage

// mailer.service.js
module.exports = {

}

},

});

}

name: "posts",

methods: {

}

}

Lifecycle Events

// www.service.js
module.exports = {

name: "www",
actions: {...},

`broker.createService`)

},

async started() {

// Fired when broker starts this service (in `broker.start()`)

};

Dependencies

If your service depends on other services, use the dependencies property in the schema. The service waits for the dependent services before calling the started lifecycle event handler.

dependencies: [

"likes", // shorthand w/o version
"v2.auth", // shorthand w version

this.logger.info("It will be called after all dependent services are available.");
const users = await this.broker.call("users.list");

}

Wait for services via ServiceBroker

Parameters

Parameter Type Default Description

services String or Array - List of services to wait for

Example

// Called after the `posts` & `v2.users` services are available

});

Set timeout & interval

});

Metadata

The Service schema has a metadata property. You can store any meta information about the service here. You can access it as this.metadata inside service functions.

},

actions: { ... }

Properties of Service Instances

this.Promise Promise Class of Promise (Bluebird)

this.logger Logger Logger instance

Service Creation

broker.createService()

add(ctx) {

return Number(ctx.params.a) + Number(ctx.params.b);

Load service from file

math.service.js

// Export the schema of service
module.exports = {

name: "math",
actions: {

return Number(ctx.params.a) - Number(ctx.params.b);

}

Load it with broker:

// Load service
broker.loadService("./math.service");

// Start broker
broker.start();

actions: {

add(ctx) {

}

}

let users = [. ];

return {

}

};

Load multiple services from a folder

Syntax
Example

// Load every *.service.js file from the current folder (including subfolders)
broker.loadServices("./");

// Load every user*.service.js file from the "./svc" folder
broker.loadServices("./svc", "user*.service.js");

Hot Reloading Services

Enable in broker options

broker.loadService("./services/test.service.js");

Enable it in Moleculer Runner

Turn it on with --hot or -H flags.

Local Variables

If you would like to use local properties/variables in your service, declare them in the created event handler.

Example for local variables

const http = require("http");
module.exports = {

created() {

// Create HTTP server

},

stopped() {

res.end("Hello Moleculer!");

}

Naming restriction

ES6 Classes

Native ES6 classes with schema parsing

Define action and event handlers as class methods, then call the parseServiceSchema method in the constructor with a schema definition whose handlers point to these class methods.

const Service = require("moleculer").Service;
class GreeterService extends Service {

},

dependencies: [ "auth", "users"

actions: {

hello: this.hello, welcome: {

name: "string"

},

"user.created": this.userCreated

},

)

};

return `Welcome, ${this.settings.upperCase ? name.toUpperCase() : name}`;

}

}

serviceStarted() {

}

}

Use decorators

Need a compiler

const { ServiceBroker } = require('moleculer');
const { Service, Action, Event, Method } = require('moleculer-decorators');
const web = require('moleculer-web');

const broker = new ServiceBroker();

]

}

// With options
@Action({

cache: false,
params: {

//...

}

@Method

authorize(ctx, route, req, res) {

}

started() { // Reserved for moleculer, fired when started

}

stopped() { // Reserved for moleculer, fired when stopped

Internal Services

The ServiceBroker contains some internal services to check the node health or get some registry information. You can disable them by setting internalServices: false in broker options.

List of nodes

It lists all known nodes (including local node).

Parameters

List of services

Parameters

Name Type Default Description

onlyLocal Boolean false List only local services.

skipInternal Boolean false Skip the internal services ($node).

withActions Boolean false List with actions.

onlyAvailable Boolean false List only available services.

List of local actions

It lists all registered actions (local & remote).

Options

List of local events

broker.call("$node.events").then(res => console.log(res));

It has some options which you can declare within params.

Options

List of metrics

It has some options which you can declare within params.

Options

Name Type Default Description

Get Broker options

Health of node

broker.call("$node.health").then(res => console.log(res));

Example health info:

"load15": 0,

"cores": 4,

"total": 17161699328,

"percent": 7.094400109979598

"platform": "win32", "user": {

"uid": -1,

"process": {

"pid": 13096,

},

"uptime": 25.447

"langVersion": "v8.9.4"

},

"192.168.130.1",

"192.168.56.1",

"now": 1487338958409,

"iso": "2018-02-17T13:42:38.409Z", "utc": "Fri, 17 Feb 2018 13:42:38 GMT"

Extending

´s internalServices option.

// moleculer.config.js
module.exports = {

return `Hello Moleculer!`;

}

Actions

The actions are the callable/public methods of the service. Calling an action is a remote procedure call (RPC): it takes request parameters and returns a response, like an HTTP request.

If you have multiple instances of services, the broker will load balance the request among instances.
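To make the load-balancing idea concrete, here is a minimal round-robin selector in plain JavaScript. It is a sketch of the general technique, not the broker's implementation; the node IDs and the `createRoundRobin` helper are hypothetical.

```javascript
// Round-robin selector: each call returns the next endpoint, wrapping around at the end.
function createRoundRobin(endpoints) {
  let counter = 0;
  return function next() {
    const endpoint = endpoints[counter % endpoints.length];
    counter += 1;
    return endpoint;
  };
}

// Hypothetical node IDs hosting instances of the same service.
const nextEndpoint = createRoundRobin(["node-1", "node-2", "node-3"]);
const picks = [nextEndpoint(), nextEndpoint(), nextEndpoint(), nextEndpoint()];
console.log(picks); // [ 'node-1', 'node-2', 'node-3', 'node-1' ]
```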

Call services

Syntax

The params is an object which is passed to the action as a part of the Context. The service can access it via ctx.params. It is optional. If you don’t define it, it will be {}.

The opts is an object to set/override some request parameters, e.g.: timeout, retryCount. It is optional.

Available calling options:

define fallbackResponse, broker will throw a RequestTimeout error. To disable set 0. If it’s not defined, the requestTimeout value from broker options will be used.

Number of request retries. If the request times out, the broker will try to call again. To disable set 0. If it’s not defined, the retryPolicy.retries value from broker options will be used.

parentCtx Context null Parent Context instance. Use it to chain the calls.

requestID String null Request ID or Correlation ID. Use it for tracing.

Usages

Call without params
Call with params
Call with calling options

fallbackResponse: defaultRecommendation

});

Call with promise error handling
Direct call: get health info from the “node-21” node

Metadata

Send meta information to services with the meta property. Access it via ctx.meta in action handlers. Note that in nested calls the meta is merged.

broker.createService({
name: "test",
actions: {

second(ctx) {

The meta is sent back to the caller service. Use it to send extra meta information back to the caller, e.g. to send response headers back to the API gateway or to store the resolved logged-in user in the metadata.

console.log(ctx.meta);

// Prints: { a: "John", b: 5 }

}

});

// Prints: { a: "John", b: 5 }

broker.call("test.first", null, { meta: { a: "John"

Internal calls

// Prints: { user: 'John' }
ctx.meta.age = 123

return this.actions.subHello(ctx.params, { parentCtx: ctx });

}

}

Timeout

Example

nodeID: "node-1",
requestTimeout: 3000

};

handler(ctx) { return "Normal";

}

}

}

Calling examples

await broker.call("greeter.slow", null, { timeout: 1000 });

Multiple calls

Calling multiple actions at the same time is also possible. To do so, use broker.mcall or ctx.mcall.

],

{

mcall with Object and options.meta

await broker.mcall(

{

// Common calling options for all calls.

meta: { token: '63f20c2d-8902-4d86-ad87-b58c9e2333c2' }

Example

const res = await broker.mcall([

The res will be something similar to

[

Streaming

Moleculer supports Node.js streams as request params and as response. Use it to transfer an incoming file from a gateway, encode/decode or compress/decompress streams.

Examples

Send a file to a service as a stream

const stream = fs.createReadStream(fileName);

Object Mode Streaming
Receiving a stream in a service

save(ctx) {

// Save the received stream to a file

Return a stream as response in a service

module.exports = {
name: "storage",
actions: {

get: {

return fs.createReadStream(`/tmp/${ctx.params.filename}`);

}

Process received stream on the caller side

.then(stream => {

const s = fs.createWriteStream(`./${filename}`);

AES encode/decode example service

module.exports = {
name: "aes",
actions: {

encrypt(ctx) {

}

}

Action visibility

Available values:

protected:

private: can be called only internally (via this.actions.xy() inside service)

Change visibility

handler(ctx) {}

}

}

}

Action hooks

Before hooks

If there is a problem, a hook can throw an Error. Please note that you can’t break or skip the further execution of hooks or of the action handler.

Main usages:

After hooks

Error hooks

Service level declaration

Hooks can be assigned to a specific action (by indicating action name), all actions (*) in service or by indicating a wildcard (e.g., create-*). The latter will be applied to all actions whose name starts with create-.
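The three ways of addressing actions (exact name, `*`, and a wildcard such as `create-*`) can be illustrated with a small matcher. This is a sketch of the matching idea only; the `hookMatches` helper is hypothetical and is not the matcher Moleculer uses internally.

```javascript
// Returns true when `pattern` ("*", "create-*", or an exact name) matches `actionName`.
function hookMatches(pattern, actionName) {
  // Escape regex metacharacters, then turn each "*" into ".*".
  const escaped = pattern
    .replace(/[.+?^${}()|[\]\\]/g, "\\$&")
    .replace(/\*/g, ".*");
  return new RegExp(`^${escaped}$`).test(actionName);
}

console.log(hookMatches("remove", "remove"));        // true
console.log(hookMatches("*", "find"));               // true
console.log(hookMatches("create-*", "create-user")); // true
console.log(hookMatches("create-*", "update"));      // false
```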

Before hooks

const DbService = require("moleculer-db");
module.exports = {

// Define multiple hooks for action `remove`
remove: [

function isAuthenticated(ctx) {
if (!ctx.user)

}

],

async function (ctx){}

],

ctx.user = await ctx.call("users.get", { id: ctx.meta.user.id });

}

After & Error hooks

after: {

// Define a global hook for all actions to remove sensitive data
"*": function(ctx, res) {

get: [

// Add a new virtual field to the entity
async function (ctx, res) {

if (res.referrer)

res.referrer = await ctx.call("users.get", { id: res._id });

async function (ctx, res){}

],

error: {

// Global error handler
"*": function(ctx, err) {

// Applies to all actions that start with "create-"
"create-*": [

async function (ctx, err){}

}

}

Action level declaration

Before & After hooks

hooks: {

before(ctx) {

}

},

});

Execution order

Hooks have a specific execution order, which matters when multiple hooks are registered at different (global, service and/or action) levels. Overall, the hooks have the following execution logic:

before hooks: global (*) -> service level -> action level.

after hooks: action level -> service level -> global (*).

Example of a global, service & action level hook execution chain

before: {

"*"(ctx) {

}

},

hello(ctx, res) {

broker.logger.info(chalk.magenta(" After hook"));
return res;

hello: {

hooks: {

broker.logger.info(chalk.yellow.bold(" After action hook"));
return res;

}

}

}

Output produced by global, service & action level hooks

INFO - After all hook
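The chain can be simulated in plain JavaScript. The sketch below assumes that before hooks run from the outermost level (global) to the innermost (action) and that after hooks run in the reverse order, which is consistent with "After all hook" being the last line of the output. The hooks are synchronous here for brevity, and all names are hypothetical.

```javascript
const executionLog = [];

// Hypothetical hooks for the three levels, listed from outermost to innermost.
const hookLevels = [
  {
    before: () => executionLog.push("Before all hook"),
    after: (ctx, res) => { executionLog.push("After all hook"); return res; },
  },
  {
    before: () => executionLog.push("Before hook"),
    after: (ctx, res) => { executionLog.push("After hook"); return res; },
  },
  {
    before: () => executionLog.push("Before action hook"),
    after: (ctx, res) => { executionLog.push("After action hook"); return res; },
  },
];

// Before hooks run outermost-first; after hooks run innermost-first.
function callWithHooks(ctx, handler) {
  for (const level of hookLevels) level.before(ctx);
  let res = handler(ctx);
  for (const level of [...hookLevels].reverse()) res = level.after(ctx, res);
  return res;
}

const greeting = callWithHooks({ params: { name: "Moleculer" } }, (ctx) => {
  executionLog.push("Action handler");
  return `Hello ${ctx.params.name}`;
});

console.log(executionLog.join(" > "));
console.log(greeting); // Hello Moleculer
```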

Reusability

The most efficient way of reusing hooks is to declare them as service methods in a separate file and import them with the mixin mechanism. This way a single hook can be easily shared across multiple actions.

},

checkUserRole(ctx) {

}

}

mixins: [MyAuthMixin],
hooks: {

before: {

actions: {

find: {

},

update: {

Local Storage

The locals property of the Context object is a simple storage that can be used to store some additional data and pass it to the action handler. The locals property and hooks are a powerful combination:

Setting ctx.locals in before hook

const entity = await this.findEntity(ctx.params.id);
ctx.locals.entity = entity;

}

params: {

id: "number"

}

}

Events

Balanced events

Example: you have 2 main services: users & payments. Both subscribe to the user.created event. You start 3 instances of users service and 2 instances of payments service. When you emit the user.created event, only one users and one payments service instance will receive the event.
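The example can be modelled in a few lines of plain JavaScript. This is a sketch under assumptions: the instance labels are made up, and picking the receiver round-robin within each group is only one possible balancing choice.

```javascript
// Hypothetical registry: group name -> instances subscribed to "user.created".
const subscriberGroups = {
  users: ["users@node-1", "users@node-2", "users@node-3"],
  payments: ["payments@node-4", "payments@node-5"],
};
const groupCounters = {};

// Balanced emit: exactly one instance per group receives the event.
function emitBalanced(groupNames = Object.keys(subscriberGroups)) {
  return groupNames.map((group) => {
    const instances = subscriberGroups[group];
    const index = (groupCounters[group] || 0) % instances.length;
    groupCounters[group] = index + 1;
    return instances[index];
  });
}

const firstEmit = emitBalanced();
const secondEmit = emitBalanced();
console.log(firstEmit);  // [ 'users@node-1', 'payments@node-4' ]
console.log(secondEmit); // [ 'users@node-2', 'payments@node-5' ]
```

Passing a list such as `["payments"]` to `emitBalanced` restricts delivery to the named groups, so only one `payments` instance would be returned.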

Example

// Register handler to the "other" group instead of "payment" group.
group: "other",

handler(ctx) {

}

}

Emit balanced events

// Only the `mail` & `payments` services receive it
broker.emit("user.created", user, ["mail", "payments"]);

Broadcast event

The broadcast event is sent to all available local & remote services. It is not balanced: all service instances will receive it.

// Send to all "mail" service instances
broker.broadcast("user.created", { user }, "mail");

// Send to all "user" & "purchase" service instances.
broker.broadcast("user.created", { user }, ["user", "purchase"]);

Local broadcast event

Subscribe to events

Legacy event handlers

signature "user.created"(payload) { ... }. It can detect different signatures of event handlers:

If it finds that the signature is "user.created"(ctx) { ... }, it will call it with Event Context. If not, it will call with old arguments & the 4th argument will be the Event Context,

Context-based event handler & emit a nested event

console.log("The called event name:", ctx.eventName);

ctx.emit("accounts.created", { user: ctx.params.user });

'${other.eventName}' received. Payload:`, other.params, other.meta);

}

module.exports = {
events: {

// Subscribe to `user.created` event
"user.created"(ctx) {

}

// Subscribe to every event

}

Event parameter validation

Event parameters can be validated in the same way as action parameters.

// Validation schema
params: {

from: "string|optional",
to: "email",

}

}

Internal events

$services.changed

The broker sends this event if the local node or a remote node loads or destroys services.

Payload

Name Type Description

$circuit-breaker.opened

Payload

$circuit-breaker.half-opened

Payload

Name Type Description

nodeID String Node ID

action String Action name

$circuit-breaker.closed

The broker sends this event when the circuit breaker module changes its state to closed.

Payload

$node.connected

Payload

$node.updated

Payload

Name Type Description

node Node Node info object

$node.disconnected

Payload

$broker.started

The broker sends this event once broker.start() is called and all local services are started.

$broker.stopped

The broker sends this event once broker.stop() is called and all local services are stopped.

$transporter.connected

$transporter.disconnected

$broker.error

Event payload

"error": "<the error object with all properties>"

"module": "broker" // Name of the module where the error happened
"type": "error-type" // Type of error. Full list of error types:

$transit.error

Event payload

"error": "<the error object with all properties>"

"module": "transit" // Name of the module where the error happened
"type": "error-type" // Type of error. Full list of error types:

$transporter.error

Event payload

"error": "<the error object with all properties>"

"module": "transit" // Name of the module where the error happened
"type": "error-type" // Type of error. Full list of error types:

$cacher.error

Event payload

"error": "<the error object with all properties>"

"module": "transit" // Name of the module where the error happened
"type": "error-type" // Type of error. Full list of error types:

$discoverer.error

Event payload

"error": "<the error object with all properties>"

"module": "transit" // Name of the module where the error happened
"type": "error-type" // Type of error. Full list of error types:

Context

Properties of Context

Name Type Description

ctx.broker ServiceBroker Instance of the broker.

ctx.nodeID

Instance of action definition.

ctx.event

The emitted event name.

ctx.eventType

String

Service full name of the caller. E.g.: v3.myService

String

Parent context ID (in nested-calls).

Any

Request metadata. It will be also transferred to nested-calls.

Number

Request level (in nested-calls). The first level is 1.

Methods of Context

Name Response Description

ctx.emit() void Emit an event, same as broker.emit

ctx.broadcast() void Broadcast an event, same as broker.broadcast

ctx.startSpan(name, opts) Span Creates a new child span.

ctx.finishSpan(span) void Finishes a span.

ctx.toJSON() Object Convert Context to a printable JSON.

Context tracking

Enable context tracking & change the timeout value

tracking: {

enabled: true,
shutdownTimeout: 10 * 1000

Disable tracking in calling option

Lifecycle

Broker lifecycle

This section describes what happens when the broker is starting & stopping.

Starting logic

When starting, the broker tries to establish a connection with the transporter. When it’s done, it doesn’t publish the local service list to remote nodes because it can’t accept requests yet. It starts all services (calls

Avoid deadlocks

Stopping logic

Service lifecycle

created event handler

This handler is triggered when the service instance is created (e.g.:

// Create HTTP server

this.server = http.createServer(this.httpHandler);

This handler is triggered when broker.start is called and the broker starts all local services. Use it to connect to a database, start listening servers, etc.

module.exports = {
name: "users",
async started() {

}

}

module.exports = {
name: "users",
async stopped() {

try {

}

};

name: "posts",

settings: {},

},

handler(ctx) {

merged(schema) {

// Modify the service settings
schema.settings.myProp = "myValue";

Middlewares

Example

// awesome.middleware.js
module.exports = {

name: "Awesome",

}

};

Wrapping handlers

Wrap local action handler

// Wrap the handler
return function(ctx) {

doSomethingBeforeHandler(ctx);

.catch(err => {
doSomethingAfterHandlerIfFailed(err);

// Throw further the error
throw err;

// So it won't cut down the performance when the feature is disabled.
return next;

}

Example validator middleware

this.validate(action.params, ctx.params);
return next(ctx);

};

The next is the original handler or the following wrapped handler. The middleware should return either the original handler or a new wrapped handler. As you can see above, the middleware checks whether the action has a params property. If yes, it will return a wrapped handler which calls the validator module before calling the original handler. If the params property is not defined, it simply returns the original handler (skip wrapping).

If you don’t call the original next in the middleware it will break the request. It can be used in cachers. For example, if it finds the requested data in the cache, it’ll return the cached data instead of calling the next.
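Both behaviours described above (skipping the wrapper when there is nothing to do, and short-circuiting without calling next) can be tried out without a broker. The sketch below is a simplified model: the hook name localAction follows the naming of the other hooks in this chapter, the handlers are synchronous (real action handlers return Promises), and the validator only checks that parameters are present. The `math.add` action and the helper names are hypothetical.

```javascript
// Validator-style middleware: wrap the handler only if the action defines `params`.
const validatorMiddleware = {
  localAction(next, action) {
    if (!action.params) return next; // nothing to validate: skip wrapping
    return function validatedHandler(ctx) {
      for (const key of Object.keys(action.params)) {
        if (ctx.params[key] === undefined) {
          throw new Error(`Missing parameter: ${key}`);
        }
      }
      return next(ctx);
    };
  },
};

// Cacher-style middleware: on a cache hit, return the stored value and never call `next`.
function createCacherMiddleware(cache = new Map()) {
  return {
    localAction(next, action) {
      return function cachedHandler(ctx) {
        const key = `${action.name}:${JSON.stringify(ctx.params)}`;
        if (cache.has(key)) return cache.get(key);
        const result = next(ctx);
        cache.set(key, result);
        return result;
      };
    },
  };
}

// Hypothetical action and handler used only for this demonstration.
let handlerCalls = 0;
const addAction = { name: "math.add", params: { a: "number", b: "number" } };
const addHandler = (ctx) => {
  handlerCalls += 1;
  return ctx.params.a + ctx.params.b;
};

// Each middleware wraps the handler produced by the previous one.
const wrappedAdd = [createCacherMiddleware(), validatorMiddleware].reduce(
  (next, middleware) => middleware.localAction(next, addAction),
  addHandler
);

const firstResult = wrappedAdd({ params: { a: 5, b: 7 } });
const secondResult = wrappedAdd({ params: { a: 5, b: 7 } }); // served from the cache
console.log(firstResult, secondResult, handlerCalls); // 12 12 1
```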

Example cacher middleware

// Found in the cache! Don't call next, return with the cached content
ctx.cachedResult = true;

return content;

}.bind(this);

}

Decorate core modules (extend functionality)

Decorate broker with a new allCall method

middlewares: [

{

.map(node => node.id);

// Make direct call to the given Node ID
return Promise.all(

}

]

Hooks

This hook wraps the local action handlers.

// my.middleware.js
module.exports = {

// Do something with the response
return res;

})

}

}

remoteAction(next, action) {
return function(ctx) {

// Change context properties or something
return next(ctx)

// Handle error or throw further
throw err;

});

This hook wraps the local event handlers.

// my.middleware.js
module.exports = {

}

}

localMethod(next, method) {
return (...args) => {

console.log(`The '${method.name}' method is called in '${method.service.fullName}' service.`, args);

createService(next)

This hook wraps the broker.createService method.

};

}

name: "MyMiddleware",

destroyService(next) {

}

call(next)

return function(actionName, params, opts) {
console.log("The 'call' is called.", actionName);
return next(actionName, params, opts).then(res => {

console.log("Response:", res);
return res;

mcall(next)

This hook wraps the broker.mcall method.

console.log("The 'mcall' is called.");
return next(...arguments).then(res => {

console.log("Response:", res);
return res;

emit(next)

This hook wraps the broker.emit method.

};

}

name: "MyMiddleware",

broadcast(next) {

broadcastLocal(next)

This hook wraps the broker.broadcastLocal method.

console.log("The 'broadcastLocal' is called.", eventName);
return next(eventName, payload, opts);

};

// my.middleware.js
module.exports = {

name: "MyMiddleware",

serviceStarting(service) (async)

This hook is called before service starting.

}

}

serviceStarted(service) {

console.log("Service started", service.fullName);

// my.middleware.js
module.exports = {

name: "MyMiddleware",

serviceStopped(service) (async)

This hook is called after service stopping.

}

}

registerLocalService(next) {
return (service) => {

console.log("Registering a local service", service.name);
return next(service);

This hook is called during local service creation (after mixins are applied, so service schema is merged completely).

// my.middleware.js
module.exports = {

}

transitPublish(next)

return next(packet);

};

// my.middleware.js
module.exports = {

name: "MyMiddleware",

}

transporterSend(next)

return (topic, data, meta) => {

// Do something with data. Data is a `Buffer`
return next(topic, data, meta);

This hook is called after transporter received a communication packet but before serialization.

// my.middleware.js
module.exports = {

}

}

newLogEntry(type, args, bindings) {

// Do something with the `args`.

// my.middleware.js
module.exports = {

name: "MyMiddleware",

This hook is called before broker starting.

// my.middleware.js
module.exports = {

}

started(broker) (async)

}

}

stopping(broker) {

console.log("Broker is stopping");

// my.middleware.js
module.exports = {

name: "MyMiddleware",

Internal middlewares

Class name Type Description

ActionHook Optional Action hooks handler.

Retry Always Retry feature.

Fallback Always Fallback feature.

Transmit.Encryption Optional Transmission encryption middleware.

Transmit.Compression Optional Transmission compression middleware.

Debugging.TransitLogger Optional Transit Logger.

Debugging.ActionLogger Optional Action logger.

Access to internal middlewares

Transmission Middleware

Encryption

const crypto = require("crypto");
const { Middlewares } = require("moleculer");
const initVector = crypto.randomBytes(16);

module.exports = {
middlewares: [

Compression

// moleculer.config.js

const { Middlewares } = require("moleculer");

};

Debug Middlewares

Transit Logger

Transit logger middleware makes it easy to track the messages that are exchanged between services.

logPacketData: false,
folder: null,

colors: {

]

};

Complete option list

folder Object null Folder where logs will be written

extension String .json File extension of log file

color.receive String grey Supports all Chalk colors

color.send String grey Supports all Chalk colors

packetFilter Array<String> HEARTBEAT Type of packets to skip

Action Logger

Action Logger middleware tracks “how” service actions were executed.

logParams: true,
logResponse: true,
folder: null,
colors: {

send: "magenta",
receive: "blue"

};

Complete option list

Class name Type Default Description

folder String null Path to the folder where logs will be written

extension String .json File extension of log file

color.request String yellow Supports all Chalk colors

color.response String cyan Supports all Chalk colors

colors.error String red Supports all Chalk colors

Event Execution Rate

Throttle

// my.service.js
module.exports = {

name: "my",
events: {

};

Debounce

Unlike throttling, debouncing keeps the trigger rate at exactly 0 until a period of calm, and then triggers the listener exactly once. It provides the same functionality as lodash’s _.debounce.
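The technique itself fits in a few lines. The following is a generic debounce sketch in plain JavaScript, not the middleware's implementation; the `onConfigChanged` listener and the 50 ms wait are hypothetical.

```javascript
// Debounce: postpone `fn` until `wait` ms have passed without a new call.
function debounce(fn, wait) {
  let timer = null;
  return function debounced(...args) {
    clearTimeout(timer); // a new call cancels the pending run
    timer = setTimeout(() => {
      timer = null;
      fn.apply(this, args);
    }, wait);
  };
}

let handled = 0;
const onConfigChanged = debounce(() => { handled += 1; }, 50);

// Three rapid triggers: nothing runs yet, and only one run stays scheduled.
onConfigChanged();
onConfigChanged();
onConfigChanged();
console.log(handled); // 0
```

Once 50 ms pass without another call, the listener runs a single time with the arguments of the last call.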

}

}

Loading & Extending

Load middleware by name

// Extend with custom middleware
Middlewares.MyCustom = {

created(broker) {

// Load middleware by name
"MyCustom"

]

Networking

Transporters

There are several built-in transporters in Moleculer framework.

TCP transporter

This is a no-dependency, zero-configuration TCP transporter. It uses Gossip protocol to disseminate node statuses, service list and heartbeats. It contains an integrated UDP discovery feature to detect new and disconnected nodes on the network.

Use TCP transporter with default options
All TCP transporter options with default values

// moleculer.config.js
module.exports = {

logger: true,
transporter: {

// UDP bind address (if null, bind on all interfaces)
udpBindAddress: null,

// UDP sending period (seconds)
udpPeriod: 30,

// Static remote nodes address list (when UDP discovery is not available)
urls: null,

// Use hostname as preferred connection address
useHostname: true,

}

};

TCP transporter with static endpoint list

"172.17.0.1:6000/node-1",

"172.17.0.2:6000/node-2", "172.17.0.3:6000/node-3"

You don’t need to set the port, because the transporter finds and parses its own TCP port from the URL list.

TCP transporter with shorthand static endpoint list

It needs to start with tcp://.

TCP transporter with static endpoint list file

// moleculer.config.js
module.exports = {

nodeID: "node-1",

"127.0.0.1:7001/server-1", "127.0.0.1:7002/server-2"

]

Serviceless node

NATS Transporter

// moleculer.config.js
module.exports = {

nodeID: "server-1",

Dependencies
Examples

// moleculer.config.js
module.exports = {

transporter: "NATS"

Connect to a remote NATS server
Connect to a remote NATS server with auth

// moleculer.config.js
module.exports = {

transporter: "nats://user:pass@nats-server:4222"

Connect with options

pass: "1234"

}

Connect with TLS

type: "NATS",
options: {

servers: ["nats://localhost:4222"]

}

};

Redis Transporter

};

Dependencies

To use this transporter, install the ioredis module with the npm install ioredis --save command.

Examples
Connect with connection string

// moleculer.config.js
module.exports = {

transporter: "redis://localhost:6379"

Connect to a secure Redis server
Connect with options

// moleculer.config.js
module.exports = {
    transporter: {
        type: "Redis",
        options: {

Connect to Redis cluster

// moleculer.config.js
module.exports = {
    transporter: {
        type: "Redis",
        options: {

]

}

MQTT Transporter

// moleculer.config.js
module.exports = {

nodeID: "server-1",

Dependencies
Examples

// moleculer.config.js
module.exports = {

transporter: "MQTT"

Connect with connection string
Connect to secure MQTT server

// moleculer.config.js
module.exports = {

transporter: "mqtts://mqtt-server:1883"

Connect with options

qos: 0,
topicSeparator: "."

}

AMQP (0.9) Transporter

nodeID: "server-1",

transporter: "amqp://rabbitmq-server:5672"

Dependencies
Transporter options
Connect to ‘amqp://guest:guest@localhost:5672’

transporter: "AMQP"

});

Connect to a remote server
Connect to a secure server

transporter: "amqps://rabbitmq-server:5672"

});

Connect to a remote server with options & credentials

servername: process.env.RABBIT_SERVER_NAME

}

};

AMQP 1.0 Transporter

Built-in transporter for the AMQP 1.0 protocol (e.g. ActiveMQ or RabbitMQ with the rabbitmq-amqp1.0 plugin).

Dependencies

To use this transporter install the rhea-promise module with npm install rhea-promise --save command.

Transporter options

Options can be passed to the rhea.connection.open() method, to the topics, to the queues, and to the messages themselves.

Connect to ‘amqp10://guest:guest@localhost:5672’
Connect to a remote server

transporter: "amqp10://activemq-server:5672"

};

Connect to a remote server with options & credentials

connectionOptions: { // rhea connection options https://github.com/amqp/rhea#connectoptions, example:

ca: "", // (if using tls)
servername: "", // (if using tls)

topicPrefix: "topic://", // RabbitMq uses '/topic/' instead, 'topic://' is more common

Kafka Transporter

It is a simple implementation. It transfers Moleculer packets to consumers via pub/sub. Offset, replay and similar features are not implemented.

Dependencies

To use this transporter install the kafka-node module with npm install kafka-node --save command.

Connect to Zookeeper
Connect to Zookeeper with custom options

transporter: {
    type: "kafka",
    options: {
        host: "192.168.51.29:2181",

        // KafkaProducer options. More info: https://github.com/SOHU-Co/kafka-node#producerclient-options-custompartitioner
        producer: {},
        customPartitioner: undefined,

publish: {

partition: 0,

};

NATS Streaming (STAN) Transporter

Built-in transporter for NATS Streaming.

};

Dependencies

To use this transporter install the node-nats-streaming module with npm install node-nats-streaming

Examples

};

Connect with connection string

// moleculer.config.js
module.exports = {

Connect with options

url: "stan://127.0.0.1:4222",
clusterID: "my-cluster"

}

Custom transporter

Create custom transporter

connect() { /*...*/ }

disconnect() { /*...*/ }

Use custom transporter

const MyTransporter = require("./my-transporter");

module.exports = {

Disabled balancer

Example

disableBalancer: true,

transporter: "nats://some-server:4222"

Please note

Serialization

Example

// moleculer.config.js
module.exports = {
    nodeID: "server-1",
    transporter: "NATS",
    serializer: "ProtoBuf"

JSON serializer

};

Avro serializer

Built-in Avro serializer.

Dependencies

MsgPack serializer

Built-in MsgPack serializer.

// moleculer.config.js
module.exports = {

Dependencies

Notepack serializer

// moleculer.config.js
module.exports = {

serializer: "Notepack"

Dependencies

ProtoBuf serializer

serializer: "ProtoBuf"

};

Dependencies

Thrift serializer

};

Dependencies

To use this serializer install the thrift module with npm install thrift --save command.

CBOR serializer

Custom serializer

A custom serializer module can be created. We recommend copying the source of JSONSerializer and implementing the serialize and deserialize methods.

Create custom serializer

const BaseSerializer = require("moleculer").Serializers.Base;

class MySerializer extends BaseSerializer {

Use custom serializer

module.exports = {

serializer: new MySerializer()

Registry & Discovery

Dynamic service discovery

Local

Local Discovery with default options

// moleculer.config.js
module.exports = {

registry: {

Local Discovery with custom options

registry: {

discoverer: {

// Disable removing offline nodes from registry, if true
disableOfflineNodeRemoving: false,

// Remove offline nodes after 10 minutes
cleanOfflineNodesTimeout: 10 * 60

Redis

Redis-based discovery uses a dedicated connection with the Redis server to exchange discovery and heartbeat packets. This approach reduces the load on the transporter module, which is then used exclusively to exchange request, response and event packets.

When Redis-based discovery method is enabled, Moleculer nodes periodically publish and fetch the info from Redis and update their internal service registry. Redis key expiration mechanism removes nodes that don’t publish heartbeat packets for a certain period of time. This allows Moleculer nodes to detect that a specific node has disconnected.

Example of connection to a local Redis server

discoverer: "Redis"

}

Example of connection to a remote Redis server

}

}

Example with options

redis: {

// Redis connection options.

}

// Serializer
serializer: "JSON",

// Monitoring Redis commands
monitor: true,

// --- COMMON DISCOVERER OPTIONS ---

// Remove offline nodes after 10 minutes
cleanOfflineNodesTimeout: 10 * 60

}

etcd3

The etcd3-based discovery method is very similar to Redis-based discovery. It stores heartbeat and discovery packets on the etcd3 server. etcd3’s lease option removes the heartbeat info of nodes that have crashed or disconnected from the network.

This method has the same strengths and weaknesses as Redis-based discovery. It doesn’t use the transporter module for discovery, but it is also slower to detect new or disconnected nodes.

Example of connecting to a local etcd3 server

}

}

Example of connecting to a remote etcd3 server

}

Example with options

// moleculer.config.js
module.exports = {

// etcd3 connection options.

// More info: https://mixer.github.io/etcd3/interfaces/options_.ioptions.html

// 10 means every 10th cycle.

fullCheck: 10,

// Disable removing offline nodes from registry, if true
disableOfflineNodeRemoving: false,

// Remove offline nodes after 10 minutes
cleanOfflineNodesTimeout: 10 * 60

Tip: To further reduce network traffic use MsgPack/Notepack serializers instead of JSON.

Customization

You can create your own custom discovery mechanism. We recommend copying the source of the Redis Discoverer and implementing the necessary methods.

Built-in Service Registry

Load balancing

Built-in strategies

RoundRobin strategy

Usage

// moleculer.config.js
module.exports = {

registry: {

Random strategy

Usage

// moleculer.config.js
module.exports = {

registry: {

CPU usage-based strategy

Usage

// moleculer.config.js
module.exports = {

registry: {

Strategy options

sampleCount Number 3 The number of samples. To turn off sampling, set to 0.

The low CPU usage percent (%). The node which has lower CPU usage than this value

Usage with custom options

sampleCount: 3,

lowCpuUsage: 10

Latency-based strategy

Usage

// moleculer.config.js
module.exports = {

registry: {

Strategy options

sampleCount Number 5 The number of samples. If you have a lot of hosts/nodes, it’s recommended to increase the value. To turn off sampling, set to 0.

lowLatency Number 10 The low latency (ms). The node which has lower latency than this value is selected immediately.
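The interplay of the two options can be sketched as follows. This is a simplified illustration and not Moleculer's actual strategy class: the function name `selectByLatency`, the shape of the node objects and the injectable `random` parameter are assumptions made for the example.

```javascript
// Simplified sketch of latency-based selection; not Moleculer's actual strategy class.
// `nodes` is an array of { id, latency } objects with latency in milliseconds.
function selectByLatency(nodes, { sampleCount = 5, lowLatency = 10 } = {}, random = Math.random) {
    // sampleCount <= 0 turns sampling off: every node is inspected.
    const inspectAll = sampleCount <= 0 || sampleCount >= nodes.length;
    const count = inspectAll ? nodes.length : sampleCount;

    let best = null;
    for (let i = 0; i < count; i++) {
        const node = inspectAll ? nodes[i] : nodes[Math.floor(random() * nodes.length)];
        // A node below the lowLatency threshold is selected immediately.
        if (node.latency < lowLatency) return node;
        if (best === null || node.latency < best.latency) best = node;
    }
    // Otherwise fall back to the lowest latency seen among the inspected nodes.
    return best;
}

const nodes = [
    { id: "node-1", latency: 42 },
    { id: "node-2", latency: 17 },
    { id: "node-3", latency: 8 }
];

console.log(selectByLatency(nodes, { sampleCount: 0 }).id); // node-3
console.log(selectByLatency(nodes, { sampleCount: 0, lowLatency: 20 }).id); // node-2
```

A larger sampleCount inspects more candidates per call, which is why increasing it is suggested for clusters with many nodes.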

Usage with custom options

strategy: "Latency",
strategyOptions: {

sampleCount: 15,

}

};

Sharding strategy

registry: {

strategy: "Shard",
strategyOptions: {

Example of a shard key user.id in context meta

// moleculer.config.js
module.exports = {

}

};

Strategy options
All available options of Shard strategy

// moleculer.config.js
module.exports = {

registry: {

}

}

Overwrite global options

Using ‘Shard’ strategy for ‘hello’ action instead of global ‘RoundRobin’

strategy: "RoundRobin"

}

params: {

name: "string"

handler(ctx) {

return `Hello ${ctx.params.name}`;

Custom strategy

A custom strategy can be created. We recommend copying the source of RandomStrategy and implementing the select method.

Create custom strategy

const BaseStrategy = require("moleculer").Strategies.Base;

class MyStrategy extends BaseStrategy {

Use custom strategy

const Strategies = require("moleculer").Strategies;

// Add custom strategy to the registry
Strategies.register("myCustomStrategy", MyStrategy);

};

Preferring local services

The ServiceBroker first tries to call local instances of the service (if any exist) to reduce network latency. This means that if the given service is available on the local broker, the configured strategy is skipped and the broker always calls the local service.

}

};

Fault tolerance

Circuit Breaker

What is the circuit breaker?
Enable it in the broker options

const broker = new ServiceBroker({
    circuitBreaker: {
        enabled: true,
        threshold: 0.5,

Settings

Name Type Default Description

Enable feature

Threshold value. 0.5 means that 50% of the requests must fail to trip the breaker.
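As a small worked example of the threshold arithmetic, the check can be pictured as a failure ratio compared against the configured value. The function `shouldTrip` is hypothetical, and treating the comparison as inclusive is an assumption of this sketch rather than a statement about Moleculer's internals.

```javascript
// Illustrative only: a failure-ratio check, not Moleculer's circuit breaker.
function shouldTrip(failures, total, threshold) {
    if (total === 0) return false; // no requests observed yet
    return failures / total >= threshold;
}

console.log(shouldTrip(4, 10, 0.5)); // false (40% failed)
console.log(shouldTrip(5, 10, 0.5)); // true (50% failed)
console.log(shouldTrip(3, 10, 0.3)); // true (30% failed, lower threshold)
```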

These global options can be overridden in action definition, as well.

// users.service.js
module.exports = {

// All CB options can be overwritten from broker options.
threshold: 0.3,

windowTime: 30

Retry

handler(ctx) {}

Moleculer provides an exponential backoff retry solution that can re-call failed requests.

Enable it in the broker options

factor: 2,

check: err => err && !!err.retryable

Settings

Name Type Default Description

delay Number 100 First delay in milliseconds.

maxDelay Number 2000 Maximum delay in milliseconds.
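To make the backoff concrete, the sketch below computes the waiting time before each retry from the delay, maxDelay and factor settings shown above. The function `retryDelay` is a hypothetical helper, and the exact formula used by the broker is assumed here to be the usual capped exponential.

```javascript
// Sketch of capped exponential backoff; option names mirror the settings above.
function retryDelay(attempt, { delay = 100, maxDelay = 2000, factor = 2 } = {}) {
    // attempt is 1-based: the first retry waits `delay` milliseconds.
    return Math.min(delay * Math.pow(factor, attempt - 1), maxDelay);
}

const delays = [1, 2, 3, 4, 5, 6].map(n => retryDelay(n));
console.log(delays.join(", ")); // 100, 200, 400, 800, 1600, 2000
```

The sixth value would be 3200 ms without the cap, so maxDelay limits it to 2000 ms.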

Overwrite the retries value in calling option
Overwrite the retry policy values in action definitions

name: "users",
actions: {

find: {

handler(ctx) {}

},

handler(ctx) {}

}

Timeout

Enable it in the broker options

});

Overwrite the timeout value in calling option

broker.call("posts.find", {}, { timeout: 3000 });

Distributed timeouts

Bulkhead

Enable it in the broker options

maxQueueSize: 10,

}

Global Settings

Name Type Default Description

The concurrency value restricts the number of concurrent request executions. If maxQueueSize is bigger than 0, the broker stores additional requests in a queue when all slots are taken. If the queue size reaches the maxQueueSize limit, the broker throws a QueueIsFull exception for every additional request.
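The slot and queue behaviour described above can be sketched with a small standalone class. The `Bulkhead` class and its error message are illustrative assumptions, not Moleculer's internal implementation.

```javascript
// Minimal bulkhead sketch; the class and the error message are illustrative,
// not Moleculer's internal implementation.
class Bulkhead {
    constructor({ concurrency, maxQueueSize }) {
        this.concurrency = concurrency;
        this.maxQueueSize = maxQueueSize;
        this.inFlight = 0;
        this.queue = [];
    }

    run(task) {
        // A free slot exists: execute right away.
        if (this.inFlight < this.concurrency) return this.execute(task);
        // All slots are taken and the queue is full: reject the request.
        if (this.queue.length >= this.maxQueueSize) {
            return Promise.reject(new Error("Queue is full"));
        }
        // Otherwise park the request until a slot is released.
        return new Promise((resolve, reject) => {
            this.queue.push({ task, resolve, reject });
        });
    }

    execute(task) {
        this.inFlight++;
        return Promise.resolve()
            .then(task)
            .finally(() => {
                // Release the slot and start the next queued request, if any.
                this.inFlight--;
                const next = this.queue.shift();
                if (next) this.execute(next.task).then(next.resolve, next.reject);
            });
    }
}

const bulkhead = new Bulkhead({ concurrency: 1, maxQueueSize: 1 });
const slowTask = () => new Promise(resolve => setTimeout(() => resolve("done"), 10));
const outcomes = [];

bulkhead.run(slowTask).then(res => outcomes.push(`first: ${res}`));
bulkhead.run(slowTask).then(res => outcomes.push(`second: ${res}`));
bulkhead.run(slowTask).catch(err => outcomes.push(`third: ${err.message}`));
```

With one slot and a queue of one, the first call runs, the second waits in the queue, and the third is rejected.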

Action Settings

Global settings can be overridden in action definition.

Overwrite the bulkhead values in action definitions

// Disable bulkhead for this action
enabled: false

},

// Increase the concurrency value for this action
concurrency: 10

},

Events Settings

Event handlers also support bulkhead feature.

Example

// my.service.js
module.exports = {

async handler(ctx) {

// Do something.

Fallback

The fallback feature is useful when you don’t want to return errors to users. Instead, call another action or return some common content. The fallback response can be set in calling options or in the action definition. It should be a Function which returns a Promise with any content. The broker passes the current Context & Error objects to this function as arguments.

Fallback response setting in calling options

}

});

Fallback in action definition

Fallback as a function

add: {

fallback: (ctx, err) => "Some cached result",
handler(ctx) {

};

Fallback as method name string

module.exports = {
    name: "recommends",
    actions: {

}

}

}

};

Caching

Cached action example

});

// Create a service
broker.createService({

{ id: 1, name: "John" },

{ id: 2, name: "Jane" }

});

broker.start()

.then(() => {

// Return from cache, handler won't be called

Console messages:

As you can see, the Handler called message appears only once because the response to the second request is returned from the cache.


Cache keys

Example hashed cache key for “posts.find” action

The params object can contain properties that are not relevant for the cache key. Also, it can cause performance issues if the key is too long. Therefore it is recommended to set an object for cache property which contains a list of essential parameter names under the keys property.

To use meta keys in cache keys use the # prefix.
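To show how a key list that mixes params and meta entries could be turned into a cache key, here is a simplified generator. The helper names `getPath` and `buildCacheKey`, and the exact key format (action name, a colon, then values joined by a pipe), are assumptions made for this sketch.

```javascript
// Simplified key generator for illustration; the exact key format is an assumption.
// Resolve a dotted path such as "user.id" inside an object.
function getPath(obj, path) {
    return path.split(".").reduce((value, part) => (value == null ? undefined : value[part]), obj);
}

function buildCacheKey(actionName, params, meta, keys) {
    const values = keys.map(key =>
        // A "#" prefix means the value is read from meta instead of params.
        key.startsWith("#") ? getPath(meta, key.slice(1)) : getPath(params, key)
    );
    return `${actionName}:${values.join("|")}`;
}

const key = buildCacheKey(
    "posts.list",
    { limit: 10, offset: 30, requestId: "ignored" },
    { user: { id: 123 } },
    ["limit", "offset", "#user.id"]
);
console.log(key); // posts.list:10|30|123
```

The requestId parameter does not influence the key because it is not listed, so requests that differ only in that field share one cache entry.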

Restrict the list of params & meta properties for key generation

// generate cache key from "limit", "offset" params and "user.id" meta
keys: ["limit", "offset", "#user.id"]

},

}

}

Performance tip

Limiting cache key length

Occasionally, the key can be very long, which can cause performance issues. To avoid this, limit the length of the concatenated params in the key with the maxParamsLength cacher option. When the key is longer than the configured limit, the cacher calculates a hash (SHA256) from the full key and adds it to the end of the key.

The minimum of maxParamsLength is 44 (SHA 256 hash length in Base64). To disable this feature, set it to 0 or null.
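The shortening step can be sketched with Node's built-in crypto module. The function `limitKeyLength` is a hypothetical helper; how the real cacher splits the key between prefix and hash is assumed here, based on the description above.

```javascript
const crypto = require("crypto");

// Sketch of key shortening: keep a readable prefix and append a SHA-256 hash.
function limitKeyLength(key, maxParamsLength) {
    const HASH_LENGTH = 44; // SHA-256 digest encoded as Base64
    // 0 or null disables the feature; values below 44 cannot hold the hash (assumed to be ignored).
    if (!maxParamsLength || maxParamsLength < HASH_LENGTH || key.length <= maxParamsLength) {
        return key;
    }
    const hash = crypto.createHash("sha256").update(key).digest("base64");
    const prefixLength = maxParamsLength - HASH_LENGTH;
    return key.substring(0, prefixLength) + hash;
}

const longKey = "limit|10|offset|30|filter|" + "x".repeat(200);
console.log(limitKeyLength(longKey, 60).length); // 60
console.log(limitKeyLength("limit|10", 60)); // limit|10
```

Because the hash is computed from the full key, two long keys that share the same prefix still map to different shortened keys.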

Generate a full key from the whole params without limit
Generate a limited-length key

maxParamsLength: 60

}

Conditional caching

Conditional caching allows you to bypass the cached response and execute an action in order to obtain “fresh” data. To bypass the cache, set ctx.meta.$cache to false before calling an action.

Example of turning off the caching for the greeter.hello action

Example of a custom conditional caching function

hello: {

cache: {

handler(ctx) {

this.logger.debug(chalk.yellow("Execute handler"));
return `Hello ${ctx.params.name}`;

// Use custom `enabled` function to turn off caching for this request
broker.call("greeter.hello", { name: "Moleculer", noCache: true });

TTL

The default TTL setting can be overridden in the action definition.

}

});

},

handler(ctx) {

});

Custom key-generator

To overwrite the built-in cacher key generator, set your own function as keygen in cacher options.

// name - action name

// params - ctx.params

}

});

Manual caching

// Remove entry from cache

await broker.cacher.del("mykey.a");

Additionally, the complete ioredis client API is available at broker.cacher.client when using the built-in Redis cacher:

// create an ioredis pipeline

Clear cache

When you create a new model in your service, you have to clear the old cached model entries.

Example to clean the cache inside actions

{

// Clear all cache entries
this.broker.cacher.clean();

// Clear all cache entries whose keys start with `users.`
this.broker.cacher.clean("users.**");

}

}

Clear cache among multiple service instances

Example

// Create new user entity

const user = new User(ctx.params);

methods: {

cleanCache() {

this.broker.broadcast("cache.clean.users");

events: {

"cache.clean.users"() {

}

Clear cache among different services

Service dependency is a common situation. E.g. posts service stores information from users service in cached entries (in case of populating).

Example cache entry in posts service

fullName: "John Doe", avatar: "https://..."

},

cache.cleaner.mixin.js

module.exports = function(serviceNames) {
    const events = {};

    serviceNames.forEach(name => {
        events[`cache.clean.${name}`] = function() {

});

return {

posts.service.js

name: "posts",

mixins: [CacheCleaner([
    "users",

}

};

Cache locking

Enable Lock

lock: true, // Set to true to enable cache locks. Default is disabled.

}

Enable with TTL

seconds

staleTime: 10, // If the TTL is less than this number, the resource is considered stale

Disable Lock

ttl: 60,
lock: {

enable: false, // Set to false to disable.

}

}

Example for Redis cacher with redlock library

// set Time-to-live to 30sec.
ttl: 30,

// Turns Redis client monitoring on.
monitor: false,

},

lock: {

// Redis clients. Support node-redis or ioredis. By default will use the local client.

// the max number of times Redlock will attempt
// to lock a resource before erroring
retryCount: 10,

}

}

Built-in cachers

Memory cacher

Enable memory cacher

Or

const broker = new ServiceBroker({
    cacher: true

Enable with options

}

}

Options

keygen Function null Custom cache key generator function.
maxParamsLength Number null Maximum length of params in generated keys.
lock Boolean or Object null Enable lock feature.

Cloning

The cacher uses the lodash _.cloneDeep method for cloning. To change it, set a Function to the clone option instead of a Boolean.

Custom clone function with JSON parse & stringify

}

});

LRU memory cacher

Enable LRU cacher
With options

cacher: {

type: "MemoryLRU",
options: {

});

Options

Name Type Default Description
max Number null
lock Boolean or Object null Enable lock feature.

Redis cacher

Enable Redis cacher

const broker = new ServiceBroker({
    cacher: "Redis"

});

With connection string
With options

// Prefix for keys
prefix: "MOL",

// set Time-to-live to 30sec.
ttl: 30,

});

port: 6379,

With MessagePack serializer

You can define a serializer for Redis Cacher. By default, it uses the JSON serializer.

const broker = new ServiceBroker({
    nodeID: "node-123",

redis: {

host: "my-redis"

With Redis Cluster Client

const broker = new ServiceBroker({
    cacher: {
        type: "Redis",
        options: {

{ port: 6381, host: "127.0.0.1" },

{ port: 6382, host: "127.0.0.1" }

});

Options

],

keygen Function null Custom cache key generator function.
maxParamsLength Number null Maximum length of params in generated keys.
serializer String "JSON" Name of a built-in serializer.

cluster Object null Redis Cluster client configuration. More information

Dependencies

Custom cacher

A custom cache module can be created. We recommend copying the source of MemoryCacher or RedisCacher and implementing the get, set, del and clean methods.

Create custom cacher

const BaseCacher = require("moleculer").Cachers.Base;

class MyCacher extends BaseCacher {

Use custom cacher

const { ServiceBroker } = require("moleculer");
const MyCacher = require("./my-cacher");

const broker = new ServiceBroker({
    cacher: new MyCacher()

Parameter Validation

Fastest Validator

Default usage

nodeID: "node-100",

validator: true // Using the default Fastest Validator

Setting validator by name

}

Example with options

// moleculer.config.js
module.exports = {

aliases: { /*...*/ }

}

Actions Validation

Example

validator: true // Default is true

});

},

handler(ctx) {

});

broker.call("say.hello").then(console.log)

// -> throw ValidationError: "The 'name' field must be a string!"

broker.call("say.hello", { name: "Walter" }).then(console.log)

Example validation schema

id: { type: "number", positive: true, integer: true },
name: { type: "string", min: 3, max: 255 },

status: "boolean" // short-hand def

Documentation
Async custom validator
Enabling custom async validation

validator: {

type: "FastestValidator",
options: {

}

}

Using custom async validation

owner: { type: "string", custom: async (value, errors, schema, name, parent, context) => {

});

},

return value;

Events Validation

Please note that the validation errors are not sent back to the caller, as happens with action errors. Event validation errors are logged but you can also catch them with the global error handler.

// mailer.service.js
module.exports = {

definitions

params: {

this.logger.info("Event received, parameters OK!", ctx.params);

}

Custom validator

Creating custom validator

//moleculer.config.js

const BaseValidator = require("moleculer").Validators.Base;

class MyValidator extends BaseValidator {}

Build Joi validator

const BaseValidator = require("moleculer").Validators.Base;
const { ValidationError } = require("moleculer").Errors;
const Joi = require("joi");

// --- JOI VALIDATOR CLASS ---

compile(schema) {

return (params) => this.validate(params, schema);

res.error.details);

return true;

});

// --- TEST BROKER ---

name: { type: "string", min: 4 }

},*/

return `Hello ${ctx.params.name}`;

}

.then(() => broker.call("greeter.hello").then(res => broker.logger.info(res)))

.catch(err => broker.logger.error(err.message, err.data))

.then(() => broker.call("greeter.hello", { name: "John" }).then(res => broker.logger.info(res)))

.catch(err => broker.logger.error(err.message, err.data));

Metrics

Enable metrics & define console reporter

metrics: {

enabled: true,
reporter: [

Options

Name Type Default Description

enabled Boolean false Enable metrics feature.

reporter Object or Array<Object> null Metric reporter configuration. More info

5

Collect time period in seconds.

10]

defaultQuantiles Array<Number> Default quantiles for histograms. Default: [0.5, 0.9,

Metrics Reporters

Name Type Default Description

labelNameFormatter Function null Label name formatter

Example of metrics options

// moleculer.config.js
module.exports = {

includes: ["moleculer.**.total"],

excludes: ["moleculer.broker.**","moleculer.request.**"],

}

]

Console

This is a debugging reporter which periodically prints the metrics to the console.

// moleculer.config.js
module.exports = {

// Printing interval in seconds
interval: 5,

// Custom logger.
logger: null,

]

}

CSV

enabled: true,
reporter: [

{

// Saving mode.

// - "metric" - save metrics to individual files

}

};

Event

Event reporter sends Moleculer events with metric values.

// moleculer.config.js
module.exports = {

// Event name

eventName: "$metrics.snapshot",

}

}

Datadog

// moleculer.config.js
module.exports = {

metrics: {

// Base URL

baseUrl: "https://api.datadoghq.eu/api/", // Default is https://api.datadoghq.com/api/

// Default labels which are appended to all metrics labels
defaultLabels: (registry) => ({
    namespace: registry.broker.namespace,
    nodeID: registry.broker.nodeID

]

}

Prometheus

enabled: true,
reporter: [

{

namespace: registry.broker.namespace,
nodeID: registry.broker.nodeID

})

};

StatsD

The StatsD reporter sends metric values to StatsD server via UDP.

type: "StatsD",
options: {
    // Server host
    host: "localhost",

]

}

Custom Reporter

Create custom metrics

stop() { /*...*/ }
metricChanged() { /*...*/ }

}

Use custom metrics

new MyMetricsReporter(),

]

Supported Metric Types

Counter

increment(labels?: GenericObject, value?: number, timestamp?: number)
set(value: number, labels?: GenericObject, timestamp?: number)

Gauge

A gauge is a metric that represents a single numerical value that can arbitrarily go up and down. Gauges are typically used for measured values like current memory usage, but also for “counts” that can go up and down, like the number of concurrent requests. It can also provide a 1-minute rate.

Histogram

observe(value: number, labels?: GenericObject, timestamp?: number)

Info

An info is a single string or number value like process arguments, hostname or version numbers. Info provides the following methods:

Built-in Internal Metrics

Process metrics

process.internal.active.handles (gauge)
process.internal.active.requests (gauge)
process.versions.node (info)
process.gc.time (gauge)
process.gc.total.time (gauge)
process.gc.executed.total (gauge)

OS metrics

os.memory.free (gauge)
os.memory.total (gauge)
os.memory.used (gauge)
os.uptime (gauge)
os.type (info)
os.release (info)
os.hostname (info)
os.arch (info)
os.platform (info)
os.user.uid (info)
os.user.gid (info)
os.user.username (info)
os.user.homedir (info)

Moleculer metrics

Customizing

New metric registration

Create a counter

// posts.service.js
module.exports = {

name: "posts",

}

},

description: "Number of requests of posts",
unit: "request",

rate: true // calculate 1-minute rate

Create a gauge with labels

name: "posts",

actions: {

return posts;

},

return posts;

},

name: "posts.total",
labelNames: ["userID"],
description: "Number of posts by user",
unit: "post"

Create a histogram with buckets & quantiles

name: "posts",

actions: {

this.logger.debug("Post created. Elapsed time: ", duration, "ms");
return post;

}

name: "posts.creation.time",
description: "Post creation time",
unit: "millisecond",
linearBuckets: {

start: 0,

};

},

Errors

Base error classes

MoleculerError

The base error class.

Parameters

Name Type Default Description
message String Error message
code Number 500 Error code
type String Error type

Example

Error for retryable errors. It is used in broker.call. The broker retries requests that are rejected with a MoleculerRetryableError.

Parameters

Name Type Default Description
message String Error message
code Number 500 Error code
type String Error type

Example

MoleculerServerError

Error for retryable server errors.

Parameters are the same as MoleculerRetryableError.

Internal error classes

Throw it if you call an unregistered service action.

Error code: 404

Retryable: true

Retryable: true

Type: SERVICE_NOT_AVAILABLE

RequestSkippedError

Throw it if your nested call is skipped because the execution is timed out due to distributed timeout.

Error code: 514

Retryable: true

Type: REQUEST_REJECTED

ValidationError

Validator throws it if the calling parameters are not valid.

Error code: 422

Retryable: false

Type: MAX_CALL_LEVEL

BrokerOptionsError

Throw it if your broker options are not valid.

Error code: 500

Retryable: false

Type: GRACEFUL_STOP_TIMEOUT

InvalidPacketDataError

Throw it if transporter receives unknown data.

Error code: 500

Create custom errors

super(msg || `This is my business error.`, 500, "MY_BUSINESS_ERROR", data);

}

Preserve custom error classes while transferring between remote nodes

Public interface of Regenerator

Create custom regenerator

super(message, code, type, data);
this.timestamp = timestamp;

}

return new TimestampedError(message, code, type, data, timestamp);

}

}

}

Use custom regenerator

errorRegenerator: new CustomRegenerator()

}

Moleculer Runner

Production-ready

Syntax

./node_modules/moleculer/bin/moleculer-runner.js --repl format.

Options

Option Type Default Description

-r, --repl Boolean false If true, it switches to REPL mode after broker started.

-E, --envfile <file>

Example NPM scripts

"scripts": {

"dev": "moleculer-runner --repl --hot --config moleculer.dev.config.js

The dev script loads development configurations from the moleculer.dev.config.js file, starts all services from the services folder, enables hot-reloading and switches to REPL mode. Run it with the npm run dev command.

The start script loads the default moleculer.config.js file if it exists; otherwise it only loads options from environment variables. It starts 4 instances of the broker, which then start all services from the services folder. Run it with the npm start command.

Configuration loading logic

  1. Once a config file has been loaded, it merges options with the default options of the ServiceBroker.

  2. The runner observes the options step by step and tries to overwrite them from environment variables.

Configuration file

Example config file

nodeID: "node-test",
logger: true,
logLevel: "debug",

transporter: "nats://localhost:4222",
requestTimeout: 5 * 1000,

Asynchronous Configuration file

Moleculer Runner also supports asynchronous configuration files. In this case moleculer.config.js must export a Function that returns a Promise (or you can use async/await).

// moleculer.config.js

This function runs with the MoleculerRunner instance as the this context. This is useful if you need to access the flags passed to the runner. Check the MoleculerRunner source for more details.

Environment variables

The runner transforms the property names to uppercase. If nested, the runner concatenates names with _.
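The naming rule can be illustrated by flattening an options object into the variable names the rule would produce. The function `toEnvNames` is a hypothetical helper written for this example; the runner itself reads environment variables rather than generating them.

```javascript
// Illustrative flattening of broker options into environment variable names.
function toEnvNames(options, prefix = "") {
    const result = {};
    for (const [name, value] of Object.entries(options)) {
        // Uppercase the property name; nested names are joined with "_".
        const envName = (prefix ? `${prefix}_` : "") + name.toUpperCase();
        if (value !== null && typeof value === "object" && !Array.isArray(value)) {
            Object.assign(result, toEnvNames(value, envName));
        } else {
            result[envName] = String(value);
        }
    }
    return result;
}

const names = toEnvNames({
    nodeID: "node-test",
    requestTimeout: 5000,
    circuitBreaker: { enabled: true }
});

console.log(Object.keys(names).join(", "));
// NODEID, REQUESTTIMEOUT, CIRCUITBREAKER_ENABLED
```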

Example environment variables

Services loading logic

The runner loads service files or folders defined in CLI arguments. If you define folder(s), the runner loads all services **/*.service.js from specified one(s) (including sub-folders too). Services & service folder can be loaded with SERVICES and SERVICEDIR environment variables.

Loading steps:
  1. If SERVICEDIR env found, but no SERVICES env, it loads all services from the SERVICEDIR directory.

Example

SERVICEDIR=services SERVICES=math,post,user

It loads the math.service.js, post.service.js and user.service.js files from the services folder.
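The mapping in this example can be sketched as follows. The function `resolveServiceFiles` is hypothetical and covers only the simple case of plain service names; the runner's actual resolution logic handles more cases.

```javascript
const path = require("path");

// Sketch: map SERVICEDIR and SERVICES values to the files that would be loaded.
function resolveServiceFiles(serviceDir, services) {
    return services
        .split(",")
        .map(name => name.trim())
        .filter(Boolean)
        // posix.join keeps forward slashes regardless of the host platform.
        .map(name => path.posix.join(serviceDir, `${name}.service.js`));
}

const files = resolveServiceFiles("services", "math,post,user");
console.log(files.join("\n"));
// services/math.service.js
// services/post.service.js
// services/user.service.js
```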

Glob patterns

Explanations:

services - legacy mode. Load all services from the services folder with **/*.service.js file mask.

!services/others/**/*.service.js - skip all services in the services/others folder and sub-folders.

Built-in clustering

Clustered Node ID

.env files

Moleculer Runner can load a .env file at startup. There are two CLI options to load an env file:

-e, --env - Load environment variables from the ‘.env’ file from the current folder.

Example

$ moleculer-runner --envfile .my-env

Dependencies

To use this feature, install the dotenv module with npm install dotenv --save command.

API Gateway

moleculer-web

Features

multiple body parsers (json, urlencoded)
CORS headers

Rate limiter

Install

Usage

Run with default settings

This example uses API Gateway service with default settings.

You can access all services (including internal $node.) via http://localhost:3000/

Example URLs:

Call test.hello action: http://localhost:3000/test/hello

Call math.add action with params: http://localhost:3000/math/add?a=25&b=13

Whitelist

settings: {

routes: [{

// Access any actions in 'math' service

/^math\.\w+$/

Aliases

You can use alias names instead of action names. You can also specify the method. Otherwise it will handle every method type.

Using named parameters in aliases is possible. Named parameters are defined by prefixing a colon to the parameter name (:name).

// Call `auth.login` action with `GET /login` or `POST /login`
"login": "auth.login",

// Restrict the request method
"POST users": "users.create",

}

});
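To illustrate how colon-prefixed named parameters in an alias could be resolved against a request path, here is a small standalone matcher. The function `matchAlias` is hypothetical; moleculer-web uses its own routing internally, so this only demonstrates the idea.

```javascript
// Hypothetical matcher for illustration; moleculer-web uses its own routing internally.
function matchAlias(pattern, urlPath) {
    const patternParts = pattern.split("/").filter(Boolean);
    const pathParts = urlPath.split("/").filter(Boolean);
    if (patternParts.length !== pathParts.length) return null;

    const params = {};
    for (let i = 0; i < patternParts.length; i++) {
        if (patternParts[i].startsWith(":")) {
            // A colon-prefixed segment captures the value under that name.
            params[patternParts[i].slice(1)] = decodeURIComponent(pathParts[i]);
        } else if (patternParts[i] !== pathParts[i]) {
            return null;
        }
    }
    return params;
}

console.log(matchAlias("users/:id", "/users/42")); // { id: '42' }
console.log(matchAlias("users/:id", "/posts/42")); // null
```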

Aliases Action

settings: {

routes: [{

}]

}

routes: [{

aliases: {

});

To use this shorthand alias, create a service which has list, get, create, update and remove actions.

routes: [{

aliases: {

}

}

req.$ctx points to the request context.

req.$service & res.$service point to this service instance.

Mapping policy

The route has a mappingPolicy property to handle routes without aliases.

Available options:

all - allows requests to all routes, with or without aliases (default)

mappingPolicy: "restrict",
aliases: {

"POST add": "math.add"

You can’t request the /math.add or /math/add URLs, only POST /add.

File upload aliases

API Gateway has implemented file uploads. You can upload files as a multipart form data (thanks to busboy library) or as a raw request body. In both cases, the file is transferred to an action as a Stream. In multipart form data mode you can upload multiple files, as well.

Example

{

path: "",

type: "multipart",

// Action level busboy config
busboyConfig: {

},

// Route level busboy config.

},

}

Multipart parameters

In order to access the files passed by multipart-form these specific fields can be used inside the action:

ctx.params is the Readable stream containing the file passed to the endpoint

Auto-alias

Use the whitelist parameter to specify the services that the Gateway should track and build routes for.

Example

// api.service.js
module.exports = {

path: "/api",

whitelist: [

},

autoAliases: true

// posts.service.js
module.exports = {
    name: "posts",
    version: 2,

// of /api/v2/posts, you can uncomment this line.

},

},

get: {

rest: "POST /",
handler(ctx) {}

},

rest: "DELETE /:id",
handler(ctx) {}

}

The generated aliases
=>

test.hello

GET

=>
=>

v2.posts.get

POST

=>
=>

v2.posts.update

DELETE

=>

name: "posts",
version: 2,

settings: {

// Expose as "/tags" instead of "/api/v2/posts/tags"
rest: {
    method: "GET",
    fullPath: "/tags"

};

Parameters

The API Gateway collects parameters from the URL querystring, request params & request body and merges them. The result is placed in req.$params.

Disable merging

Example

}

});

// Querystring params
query: {

category: "general",

// Request params
params: {

id: 5

Query string parameters

Array parameters
Nested objects & arrays

URL: GET /api/opt-test?foo[bar]=a&foo[bar]=b&foo[baz]=c

foo: {

Middlewares

Examples

broker.createService({
    mixins: [ApiService],
    settings: {
        // Global middlewares. Applied to all routes.
        use: [

path: "/",

// Route-level middlewares. use: [

aliases: {

"GET /secret": [

}

]

const swMiddleware = swStats.getMiddleware();
broker.createService({

mixins: [ApiGatewayService],
name: "gw-main",

},

routes: [

async started(this: Service): Promise<void> {
this.addRoute({

path: "/",

Error-handler middleware

The API Gateway supports error-handler middlewares. If you pass an Error to the next(err) function, the gateway calls the error-handler middlewares, which have the signature (err, req, res, next).

routes: [

{

function(err, req, res, next) {

this.logger.error("Error occurred in middlewares!");
this.sendError(req, res, err);
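
To make the flow concrete, here is a minimal stand-alone sketch. The runMiddlewares dispatcher and the mock service object are hypothetical stand-ins written for this illustration (they are not the gateway's internals); only the (err, req, res, next) signature and the logger.error / sendError calls come from the text above.

```javascript
// Hypothetical dispatcher: error handlers are recognised by their four declared arguments.
function runMiddlewares(service, middlewares, req, res) {
    let index = 0;
    function next(err) {
        while (index < middlewares.length) {
            const mw = middlewares[index++];
            const isErrorHandler = mw.length === 4;
            if (err && isErrorHandler) return mw.call(service, err, req, res, next);
            if (!err && !isErrorHandler) return mw.call(service, req, res, next);
        }
    }
    next();
}

// Mock of the gateway service, providing just what the handler uses.
const logged = [];
const mockService = {
    logger: { error: (...args) => logged.push(args) },
    sendError(req, res, err) {
        res.statusCode = 500;
        res.body = err.message;
    }
};

const use = [
    // Regular middleware that fails and forwards the error.
    function failing(req, res, next) {
        next(new Error("Something went wrong"));
    },
    // Error-handler middleware.
    function onError(err, req, res, next) {
        this.logger.error("Error occurred in middlewares!");
        this.sendError(req, res, err);
    }
];

const res = {};
runMiddlewares(mockService, use, {}, res);
console.log(res);
```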

Serve static files

settings: {

assets: {

});

Calling options

The route has a callOptions property which is passed to broker.call. So you can set calling options such as timeout and retries for a route.

callOptions: {
timeout: 500,

retries: 3,

});

Multiple routes

You can create multiple routes with different prefixes, whitelists, aliases, calling options & authorization settings.

{

path: "/admin",

bodyParsers: {
json: true

}

"posts.*",

"math.*",

]

}

Response type & status code

Available meta fields:

ctx.meta.$statusCode - set res.statusCode.
ctx.meta.$statusMessage - set res.statusMessage.
ctx.meta.$responseType - set Content-Type in header.
ctx.meta.$responseHeaders - set all keys in header.
ctx.meta.$location - set Location key in header for redirects.

Example

module.exports = {
name: "export",
actions: {

return csvFileStream;

}

}

}
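
As a rough illustration of these meta fields, the following sketch defines two actions as plain handler functions and calls them with a mock context. The action names, the CSV payload and the redirect target are hypothetical; only the ctx.meta.$... field names come from the list above.

```javascript
// Sketch of a service schema whose actions control the HTTP response via ctx.meta.
const exportService = {
    name: "export",
    actions: {
        // Returns CSV text with an explicit status code, content type and header.
        downloadCsv(ctx) {
            ctx.meta.$statusCode = 200;
            ctx.meta.$responseType = "text/csv";
            ctx.meta.$responseHeaders = {
                "Content-Disposition": 'attachment; filename="data.csv"'
            };
            return "id,name\n1,John\n";
        },
        // Redirects the client: a 3xx status code combined with $location.
        moved(ctx) {
            ctx.meta.$statusCode = 302;
            ctx.meta.$location = "/export/downloadCsv"; // hypothetical target
            return null;
        }
    }
};

// Mock context, only for trying the handlers outside a broker.
const csvCtx = { meta: {} };
const csvBody = exportService.actions.downloadCsv(csvCtx);
const redirectCtx = { meta: {} };
exportService.actions.moved(redirectCtx);

console.log(csvCtx.meta, redirectCtx.meta);
```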

Authorization

Example authorization

mixins: [ApiService],

settings: {

methods: {

// Second thing

// Check the token

if (token == "123456") {

}

} else {

}

}
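
The fragments above can be read as the following self-contained sketch of an authorize method. It assumes a Bearer token in the Authorization header and a route with authorization enabled; the UnauthorizedError class is a local stand-in for the gateway's own error classes, and the user object is hypothetical.

```javascript
// Local stand-in for the gateway's error classes (an assumption of this sketch).
class UnauthorizedError extends Error {
    constructor(type) {
        super("Unauthorized");
        this.code = 401;
        this.type = type;
    }
}

const gatewaySchema = {
    name: "api",
    settings: {
        routes: [{ path: "/", authorization: true }]
    },
    methods: {
        // Resolves when the request may proceed, rejects otherwise.
        async authorize(ctx, route, req) {
            // First thing: read the token from the header.
            const auth = req.headers["authorization"];
            if (!auth || !auth.startsWith("Bearer ")) {
                throw new UnauthorizedError("NO_TOKEN");
            }
            // Second thing: check the token.
            const token = auth.slice("Bearer ".length);
            if (token !== "123456") {
                throw new UnauthorizedError("INVALID_TOKEN");
            }
            // Third thing: expose the user to the called action.
            ctx.meta.user = { id: 1, name: "John Doe" };
            return ctx;
        }
    }
};

// Trying it with mock objects.
const okCtx = { meta: {} };
const okCall = gatewaySchema.methods.authorize(okCtx, {}, {
    headers: { authorization: "Bearer 123456" }
});
const failedCall = gatewaySchema.methods.authorize({ meta: {} }, {}, { headers: {} });
failedCall.catch(err => console.log(err.code, err.type));
```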

Authentication

The returned value will be set to the ctx.meta.user property. You can use it in your actions to get the logged in user entity.

Example authentication

broker.createService({

}]

},

// valid credentials. It will be set to `ctx.meta.user`

return Promise.resolve({ id: 1, username: "john.doe", name: "John

} else {

// anonymous user

});

Route hooks

The route has before & after call hooks. You can use them to set ctx.meta, access req.headers or modify the response data.

path: "/",

onBeforeCall(ctx, route, req, res) {

}

}
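
A minimal sketch of both hooks is shown below. The userAgent meta key and the mock req/res objects are hypothetical and exist only to exercise the hooks outside a gateway; the onAfterCall hook is assumed to receive the response data as its last argument and to return it.

```javascript
const route = {
    path: "/",

    // Runs before the action call: copy request information into ctx.meta.
    onBeforeCall(ctx, route, req, res) {
        ctx.meta.userAgent = req.headers["user-agent"];
    },

    // Runs after the action call: set a header and return the (possibly modified) data.
    onAfterCall(ctx, route, req, res, data) {
        res.setHeader("X-Custom-Header", "123456");
        return data;
    }
};

// Mock objects for a dry run.
const ctx = { meta: {} };
const req = { headers: { "user-agent": "curl/8.0" } };
const headers = {};
const res = { setHeader: (key, value) => { headers[key] = value; } };

route.onBeforeCall(ctx, route, req, res);
const result = route.onAfterCall(ctx, route, req, res, { ok: true });
console.log(ctx.meta, headers, result);
```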

Error handlers

You can add route-level & global-level custom error handlers.

In the handlers you must call res.end. Otherwise, the request is left unhandled.

res.setHeader("Content-Type", "application/json; charset=utf-8");
res.writeHead(500);

}

res.end("Global error: " + err.message);

}

Error formatter

reformatError(err) {

// Filter out the data from the error before sending it to the client
return _.pick(err, ["name", "message", "code", "type", "data"]);

CORS headers

Usage

const svc = broker.createService({
mixins: [ApiService],

settings: {

// Configures the Access-Control-Expose-Headers CORS header.
exposedHeaders: [],

// Configures the Access-Control-Allow-Credentials CORS header.
credentials: false,

// Route CORS settings (overwrite global settings)
cors: {

origin: ["http://localhost:3000", "https://localhost:4000"],
methods: ["GET", "OPTIONS", "POST"],

});

Rate limiter

Moleculer-Web has a built-in rate limiter with an in-memory store.

Usage

// Defaults to 60000 (1 min)

window: 60 * 1000,

}

}

Custom Store example

this.hits = new Map();

this.resetTime = Date.now() + clearPeriod;

/**
 * Increment the counter by key
 */

let counter = this.hits.get(key) || 0;
counter++;

this.hits.set(key, counter);
return counter;

this.hits.clear();

}
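
Assembled into one piece, the fragments above suggest a store class along the following lines. This is a sketch under assumptions: the constructor signature (clearPeriod, opts), the periodic reset via setInterval and the unref() call are reconstructed for illustration and may differ from the store shipped with Moleculer-Web.

```javascript
class CustomStore {
    /**
     * @param {Number} clearPeriod - length of the rate-limit window in milliseconds
     * @param {Object} [opts] - unused in this sketch
     */
    constructor(clearPeriod, opts) {
        this.hits = new Map();
        this.resetTime = Date.now() + clearPeriod;

        // Clear all counters at the end of every window.
        this.timer = setInterval(() => {
            this.resetTime = Date.now() + clearPeriod;
            this.reset();
        }, clearPeriod);

        // Do not keep the Node.js process alive only because of this timer.
        this.timer.unref();
    }

    /**
     * Increment the counter by key
     * @param {String} key
     * @returns {Number} the new counter value
     */
    inc(key) {
        let counter = this.hits.get(key) || 0;
        counter++;
        this.hits.set(key, counter);
        return counter;
    }

    /**
     * Reset all counters
     */
    reset() {
        this.hits.clear();
    }
}

const store = new CustomStore(60 * 1000);
const first = store.inc("127.0.0.1");
const second = store.inc("127.0.0.1");
store.reset();
const afterReset = store.inc("127.0.0.1");
console.log(first, second, afterReset);
```

Such a class would typically be handed to the gateway through the rateLimit settings (the StoreFactory option name is given here from memory and should be checked against the Moleculer-Web documentation).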

ETag

// Service-level option
etag: false,

routes: [

]

}

etag: (body) => generateHash(body)

}
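
The generateHash function referenced in the custom etag option is not defined in the text. One possible implementation, given purely as an assumption, hashes the response body with Node's built-in crypto module and wraps the digest in double quotes, as is usual for ETag values.

```javascript
const crypto = require("crypto");

// Hypothetical helper for the `etag: (body) => generateHash(body)` option.
// `body` is expected to be a string or a Buffer.
function generateHash(body) {
    const digest = crypto.createHash("sha1").update(body).digest("base64");
    return `"${digest}"`;
}

const tagA = generateHash("hello world");
const tagB = generateHash("hello world");
const tagC = generateHash("something else");
console.log(tagA === tagB, tagA === tagC);
```

Identical bodies always produce the same tag, which is what allows a client to revalidate a cached response.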

Custom etag for streaming

ctx.meta.$responseType = "text/csv";
ctx.meta.$responseHeaders = {

"Content-Disposition": `attachment; filename="data-${ctx.params.id}.csv"`,

}

}

HTTP2 Server

Example

// HTTPS server with certificate
https: {

key: fs.readFileSync("key.pem"),
cert: fs.readFileSync("cert.pem")

ExpressJS middleware usage

You can use Moleculer-Web as a middleware in an ExpressJS application.

Usage

const svc = broker.createService({
mixins: [ApiService],

// Create Express application
const app = express();

// Use ApiGateway as middleware
app.use("/api", svc.express());

Full service settings

// Exposed port
port: 3000,

// Exposed IP
ip: "0.0.0.0",

// If false, it will start without server in middleware mode
server: true,

// Exposed global path prefix
path: "/api",

// Logging response data with 'debug' level
logResponseData: "debug",

// Use HTTP2 server (experimental)
http2: false,

// Path prefix to this route (full path: /api/admin )
path: "/admin",

// Whitelist of actions (array of string mask or regex)
whitelist: [

// Merge parameters from querystring, request params & body
mergeParams: true,

// Route-level middlewares
use: [

},

mappingPolicy: "all",

},

{

/^math\.\w+$/

],

"GET greeter/:name": "test.greeter",
"GET /": "test.hello",

"POST upload"(req, res) {
this.parseUploadedFile(req, res);

json: false,

urlencoded: { extended: true }

fallbackResponse: "Static fallback response"

},

res.setHeader("X-Custom-Header", "123456");
return data;

},

],

// Folder to serve assets (static files)
assets: {

res.setHeader("Content-Type", "text/plain");
res.writeHead(err.code || 500);
res.end("Global error: " + err.message);

}

Service Methods

removeRoute

This service method removes a route by its path, e.g. this.removeRoute("/admin").

Examples

WWW with assets

serve static files from the assets folder
whitelist

set the authorized user to Context.meta

simple server with RESTful aliases
example posts service with CRUD actions

call action and send back the response via websocket
send Moleculer events to the browser via websocket

Full

metrics, statistics & validation from Moleculer
custom error handlers

Webpack

Hot-replacement

Babel, SASS, SCSS, Vue SFC

REPL console

moleculer repl
Install
Usage

// Switch to REPL mode
broker.repl();

});

REPL Commands

cache Manage cache

call [options] <actionName> [jsonParams] [meta] Call an action

env List of environment variables

events [options] List of event listeners

metrics [options] List metrics

nodes [options] List of nodes

List nodes

Options

-a, --all list all (offline) nodes

Output

Detailed output

List services

-d, --details print endpoints

-f, --filter <match> filter services (e.g.: 'user*')

Detailed output

List actions

mol $ actions

-i, --skipinternal skip internal actions

-l, --local only local actions

List events

Options

-a, --all list all (offline) event listeners

Output

Show common information

List environment variables

mol $ env

Call an action

--help output usage information

--load [filename] Load params from file

Params will be { a: 5, b: 'Bob', c: true, d: false, e: { f: 'hello' } }

Call an action with params, meta & options

Params will be { a: 5, b: 'Bob', c: true, d: false, e: { f: 'hello' } }

Call with parameters from file

Call with file stream

mol $ call "math.add" --stream my-picture.jpg

It saved the response to the my-response.json file.

Direct call

Get health info from node-12 node

Emit an event

mol $ emit "user.created" --a 5 --b Bob --c --no-d --e.f "hello"

Params will be { a: 5, b: 'Bob', c: true, d: false, e: { f: 'hello' } }

Benchmark services

# Call service until 5 seconds (default)
mol $ bench math.add

# Call service 5000 times

--time <seconds> Time of bench

--nodeID <nodeID> NodeID (direct call)

mol $ bench math.add '{ "a": 50, "b": 32 }'

Load a service from file

mol $ load "./math.service.js"

Load all services from a folder

List metrics

Output

Cache Keys

Cache Clear

You can clear the cache with:

mol $ cache clear

Event listener

mol $ listener add user.created

Subscribe with group option

mol $ listener list

Custom commands

Custom REPL commands can be defined in broker options to extend Moleculer REPL commands.

description: "Call the greeter.hello service with name",
alias: "hi",

options: [

boolean: ["u", "uppercase"]

},

}

}
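
For orientation, a complete custom command definition might look like the sketch below. The replCommands array would be passed in the broker options; it is shown here as a plain array so that it can be read and tried in isolation. The command string, the option text and the action body are assumptions reconstructed around the fragments above (description, alias, options and the boolean type list), and the mock broker exists only for the dry run.

```javascript
// Sketch of the value of the `replCommands` broker option.
const replCommands = [
    {
        command: "hello <name>",
        description: "Call the greeter.hello service with name",
        alias: "hi",
        options: [
            { option: "-u, --uppercase", description: "Uppercase the name" }
        ],
        types: {
            string: ["name"],
            boolean: ["u", "uppercase"]
        },
        // Called by the REPL with the broker and the parsed arguments.
        action(broker, args) {
            const name = args.options.uppercase ? args.name.toUpperCase() : args.name;
            return broker.call("greeter.hello", { name }).then(console.log);
        }
    }
];

// Dry run with a mock broker that records the call.
const calls = [];
const mockBroker = {
    call(actionName, params) {
        calls.push([actionName, params]);
        return Promise.resolve(`Hello ${params.name}`);
    }
};

replCommands[0].action(mockBroker, { name: "john", options: { uppercase: true } });
```

In the REPL such a command would then be invoked as mol $ hello -u John, or via its alias as mol $ hi -u John.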

Command Line Tool

moleculer-cli

Install

$ npm i -g moleculer-cli

Commands

Init

The init command is used to scaffold a new Moleculer project.

Answers from file

Disable installing dependencies

You can disable the automatic NPM dependency installation with the --no-install argument. This can be useful when generating a project programmatically.

$ moleculer init project my-project --answers ./answers.json --no-install

nano - Minimal project template for one microservice. Use it if you want to create a microservice which connects to others via a transporter.

sample service (greeter)

tests & coverage with Jest
lint with ESLint

Custom templates

$ moleculer init username/repo my-project

Local Templates

Template aliases

To simplify the usage of custom templates (local and remote), you can register an alias and use it afterwards instead of the whole repository URL.

$ moleculer alias-template myAlias somegithubuser/reponame

Creating Custom Templates

meta.js

The meta.js file exports a function that returns an object defining the Moleculer CLI init interface. The function takes a parameter values that gives access to external values passed in from the CLI. The object has several keys which are explained below.

The filters object takes a set of keys matching a path and a value matching the name of a question variable. If the question variable’s value is false, the specified path will be ignored during the transformation and those files will not be added to the project being initialized.

The completeMessage property takes a multiline string that will be displayed after the initialization is completed.

Start

$ moleculer start

Options

--version Show version number [boolean]

--id NodeID [string] [default: null]

--hot, -h Enable hot-reload [boolean] [default: false]

Connect

$ moleculer connect

# Connect to NATS

$ moleculer connect mqtt://localhost

# Connect to AMQP

Call

--version Show version number [boolean]

--help Show help [boolean]

--hot, -h Enable hot-reload [boolean] [default: false]

--serializer Serializer [string] [default: null]

Options

--version Show version number [boolean]

--help Show help [boolean]

--level Logging level [string] [default: "silent"]

--id NodeID [string] [default: null]

Example with params
Example with params & meta
Example with post processing the result with jq

The transporter can be defined via TRANSPORTER environment variable, as well.

Example with transporter env var

TRANSPORTER=nats://localhost:42222 moleculer call math.add --@a 5 --@b 3

Emit

Options

--transporter, -t Transporter connection string (NATS, nats://127.0.0.1:4222, ...etc) [string] [default: null]

Example with params
Example with params & meta
Example with broadcast & groups

moleculer emit math.add --transporter NATS --broadcast --@id 3 --@name John --group accounts

Example with multi groups

moleculer emit math.add --transporter NATS --broadcast --@id 3 --@name John --group accounts --group mail

Example with transporter env var

Database Adapters

Database per service

Features

default CRUD actions
cached actions
pagination support

pluggable adapter (NeDB is the default memory adapter for testing & prototyping)
official adapters for MongoDB, PostgreSQL, SQLite, MySQL, MSSQL.

Base Adapter

Only use this adapter for prototyping and testing. When you are ready to go into production simply swap to Mongo, Mongoose or Sequelize adapters as they all implement common Settings, Actions and Methods.

Install

$ npm install moleculer-db --save

Usage

name: "users",

// Mixin DB service into (current) 'users' service
mixins: [DbService],

},

afterConnected() {

// Create a new user

.then(() => broker.call("users.create", { username: "john",

// List users with pagination

.then(() => broker.call("users.list", { page: 2, pageSize: 10 }).then(console.log));

// Delete a user

.then(() => broker.call("users.remove", { id: 2 }).then(console.log));

Settings

Property Type Default Description
idField String required Name of ID field.
fields Array.<String> null Field filtering list. It must be an Array. If the value is null or undefined, the fields of entities are not filtered.
populates Array null Schema for population. Read more.
pageSize Number required Default page size in list action.
maxPageSize Number required Maximum page size in list action.
maxLimit Number required Maximum value of limit in find action. Default: -1 (no limit)
entityValidator Object, function null Validator schema or a function to validate the incoming entity in create & insert actions.
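
How pageSize and maxPageSize interact with the page and pageSize parameters of the list action can be sketched with a little arithmetic. The toPage helper below is hypothetical and not part of moleculer-db; the default values (10 and 100) are assumptions chosen for the example.

```javascript
// Hypothetical helper illustrating pagination arithmetic for a `list`-style action.
function toPage({ page = 1, pageSize = 10 }, total, maxPageSize = 100) {
    // The requested page size is capped by the configured maximum.
    const size = Math.min(pageSize, maxPageSize);
    return {
        limit: size,                  // rows to fetch
        offset: (page - 1) * size,    // rows to skip
        page,
        pageSize: size,
        total,
        totalPages: Math.ceil(total / size)
    };
}

const secondPage = toPage({ page: 2, pageSize: 10 }, 35);
const capped = toPage({ page: 1, pageSize: 500 }, 35);
console.log(secondPage, capped);
```

For page 2 with a page size of 10, the first 10 rows are skipped and the next 10 are returned; 35 entities yield 4 pages.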

Actions

Find entities by query.

Parameters

Property Type Default Description

searchFields String required Fields for searching.

query Object required Query object. Passes to adapter.

Results
Parameters

query Object required Query object. Passes to adapter.

Results

Type: Number - Count of found entities.

Parameters

fields Array.<String> - Fields filter.
page Number required Page number.
pageSize Number required Size of a page.
sort String required Sorted fields.
search String required Search text.

searchFields String required Fields for searching.

Results
Parameters

Property Type Default Description

- - - -

Results
Parameters

Property Type Default Description
entity Object - Entity to save.
entities Array.<Object> - Entities to save.

Results

Type: Object, Array.<Object> - Saved entity(ies).

Property Type Default Description

populate Array.<String> - Field list for populate.

fields Array.<String> - Fields filter.

Results

After update, clear the cache & call lifecycle events.

Parameters

Property Type Default Description

Results

Remove an entity by ID.

Parameters

Property Type Default Description

Results

Methods

Get entity(ies) by ID(s).

Parameters

Property Type Default Description

Results

Clear cached entities

Parameters

Property Type Default Description

Results

Encode ID of entity.

Parameters

Property Type Default Description

Results

Type: any
Parameters

id any required -

Results

Type: any

_find

Parameters

limit Number required Max count of rows.
offset Number required Count of skipped rows.
sort String required Sorted fields.

search String required Search text.

Results

_count

Get count of entities by query.

Parameters

_list

List entities by filters and pagination results.

Parameters

pageSize Number required Size of a page.

sort String required Sorted fields.
search String required Search text.
searchFields String required Fields for searching.

Create a new entity.

Parameters

Property Type Default Description

Create many new entities.

Parameters

Property Type Default Description
entity Object - Entity to save.
entities Array.<Object> - Entities to save.

Results
Parameters

Property Type Default Description

id any, Array.<any> required ID(s) of entity.
populate Array.<String> - Field list for populate.
fields Array.<String> - Fields filter.

Results

Update an entity by ID.

After update, clear the cache & call lifecycle events.

Parameters

_remove

Remove an entity by ID.

Parameters

Data Manipulation

You can use action hooks to modify the data (e.g. add timestamps, hash users’ passwords or remove sensitive info) before or after saving it in the DB.

Example of hooks adding a timestamp and removing sensitive data

Populating

The service allows you to easily populate fields from other services. For example: If you have an author field

const broker = new ServiceBroker();

broker.createService({

hooks: {

before: {

return ctx;

}

// Arrow function as a Hook

(ctx, res) => {

}

]

name: "John Doe",

mail: "john.doe@example.com",

// Call "create" action. Before hook will be triggered

.then(() => broker.call("db-with-hooks.create", user))

.catch(err => console.error(err));
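
The hook fragments above can be pieced together into a compact sketch. The schema is shown as a plain object and exercised with mock data so that it runs without a broker; in a real service it would also mix in DbService. Which actions the hooks are attached to (create and get) and the createdAt field name are assumptions made for this illustration.

```javascript
const schema = {
    name: "db-with-hooks",
    // mixins: [DbService],  // required in a real service, omitted in this stand-alone sketch

    hooks: {
        before: {
            create: [
                // Add a timestamp before the entity is saved.
                function addTimestamp(ctx) {
                    ctx.params.createdAt = new Date();
                    return ctx;
                }
            ]
        },
        after: {
            get: [
                // Arrow function as a hook: strip sensitive data from the response.
                (ctx, res) => {
                    delete res.mail;
                    return res;
                }
            ]
        }
    }
};

// Dry run with mock objects.
const user = { name: "John Doe", mail: "john.doe@example.com" };
const createCtx = { params: { ...user } };
schema.hooks.before.create[0](createCtx);

const publicUser = schema.hooks.after.get[0]({}, { ...createCtx.params });
console.log(publicUser);
```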

Example of populate schema

broker.createService({

// Shorthand populate rule. Resolve the `voters` values with `users.get` action.

"voters": "users.get",

fields: "username fullName"

}

"reviewer": {

field: "reviewerId",

},

// Custom populator handler function

// https://github.com/moleculerjs/moleculer-db/blob/master/packages/moleculer-db/src/index.js#L636

return Promise.resolve(...);

// List posts with populated authors

broker.call("posts.find", { populate: ["author"]}).then(console.log);

Lifecycle entity events

},

entityCreated(json, ctx) {

this.logger.info(`Entity updated by '${ctx.meta.user.name}' user!`);

},

Extend with custom actions

Naturally you can extend this service with your custom actions.

const DbService = require("moleculer-db");
module.exports = {

actions: {

// Increment `votes` field by post ID
vote(ctx) {

author: ctx.params.authorID

},

}

Mongo Adapter

This adapter is based on MongoDB.

Install

Dependencies

Usage

const MongoDBAdapter = require("moleculer-db-adapter-mongo");
const broker = new ServiceBroker();

// Create a Mongoose service for `post` entities
broker.createService({

// Create a new post

.then(() => broker.call("posts.create", { title: "My first post",

Options

Example with connection URI

new MongoDBAdapter( "mongodb://localhost/moleculer-db" )

Example with connection URI & options

new MongoDBAdapter( "mongodb://db-server-hostname/my-db" , { keepAlive: 1

Mongoose Adapter

Install

Dependencies

To use this adapter you need to install MongoDB on your system.

Usage

"use strict";

name: "posts", mixins: [DbService],

adapter: new MongooseAdapter("mongodb://localhost/moleculer-demo"),
model: mongoose.model("Post", mongoose.Schema({

broker.start()

// Create a new post

.then(() => broker.call("posts.find").then(console.log));

Options

Example with connection URI

new MongooseAdapter( "mongodb://localhost/moleculer-db" )

Example with URI and options

Connect to multiple DBs

Sequelize Adapter

Install

You have to install additional packages for your database server:

# For SQLite

# For PostgreSQL
$ npm install pg pg-hstore --save

# For MSSQL

Usage

const broker = new ServiceBroker();

// Create a Sequelize service for `post` entities
broker.createService({

},

options: {

broker.start()

// Create a new post

.then(() => broker.call("posts.find").then(console.log));

Options

All constructor arguments are passed to the Sequelize constructor. Read more about Sequelize connection.

Example with connection URI
Example with connection options