Clearperiod the counter key string key number inc let counter this
What is Moleculer?
Features
support versioned services support Streams
service mixins
master-less architecture, all nodes are equal parameter validation with fastest-validator
built-in metrics feature with reporters (Console, CSV, Datadog, Event, Prometheus, StatsD) built-in tracing feature with exporters (Console, Datadog, Event, Jaeger, Zipkin)
Broker
Create a ServiceBroker
Create broker with default settings:
Create broker with custom settings:
const { ServiceBroker } = require("moleculer"); const broker = new ServiceBroker({
nodeID: "my-node"
Create broker with transporter to communicate with remote nodes:
requestTimeout: 5 * 1000
});
Metadata option
});
The metadata property can be obtained by running $node.list action. The metadata property is transferred to other nodes.
Ping
Ping a node with 1 second timeout
Output
nodeID: 'node-123', elapsedTime: 16,
timeDiff: -3
Ping multiple nodes
Output
"node-100": {
nodeID: 'node-100', elapsedTime: 10,
timeDiff: 850
}
Ping all available nodes
Output
nodeID: 'node-100', elapsedTime: 10,
timeDiff: -2
},
"node-102": {
Properties of ServiceBroker
Methods of ServiceBroker
Name Response Description
broker.stop() Promise Stop broker.
broker.repl() - Start REPL mode.
Promise Wait for services.
|
|||
---|---|---|---|
|
|
||
broker.broadcast(eventName, payload, opts) - Broadcast an event.
broker.broadcastLocal(eventName, payload, opts)
event name.
broker.generateUid() String Generate an UUID/token.
broker.isMetricsEnabled() Boolean Check the metrics feature is enabled.
broker.isTracingEnabled() Boolean Check the tracing feature is enabled.
Global error handler
Catch, handle & log the error
});
Catch & throw further the error
const broker = new ServiceBroker({ errorHandler(err, info) {
Configuration
Broker options
These options can be used in ServiceBroker constructor or in moleculer.config.js file.
List of all available broker options:
namespace: String - Namespace of nodes to segment your nodes on the same network (e.g.: “development”, “staging”, “production”). Default: ""
requestTimeout: Number - Number of milliseconds to wait before reject a request with a RequestTimeout error.
Disabled: 0 Default: 0
heartbeatInterval: Number - Number of seconds to send heartbeat packet to other nodes. Default: 5
heartbeatTimeout: Number - Number of seconds to wait before setting remote nodes to unavailable status in Registry. Default: 15
transit.maxChunkSize Number - Maximum chunk size while streaming. Default: 256KB transit.disableReconnect: Boolean - Disables the reconnection logic while starting a broker. Default: false transit.disableVersionCheck: Boolean - Disable protocol version checking logic in Transit. Default: false transit.packetLogFilter: Array - Filters out the packets in debug log messages. It can be useful to filter out the HEARTBEAT packets while debugging. Default: []
uidGenerator: Function - Custom UID generator function for Context ID.
metrics: - Enable & configure metrics feature. Default:
tracing: Boolean | Object - Enable & configure tracing feature. Default: false internalServices: Boolean | Object - Register internal services at start. Default: true internalServices.$node - Object - Extend internal services with custom actions. Default: null internalMiddlewares: Boolean - Register internal middlewares. Default: true
than maxSafeObjectSize value. Default: null
created: Function - Fired when the broker created. Default: null
Full options object
{
namespace: "dev", nodeID: "node-25",
delay: 100,
maxDelay: 1000,
heartbeatInterval: 5,
heartbeatTimeout: 15,
registry: {
strategy: "RoundRobin", preferLocal: true
minRequestCount: 20,
halfOpenTime: 10 * 1000,
maxQueueSize: 100,
},
cacher: "MemoryLRU", serializer: "JSON",
validator: true, errorRegenerator: null,
},
tracing: {
internalServices: true, internalMiddlewares: true,
hotReload: true,
},
skipProcessEventRegistration: false, maxSafeObjectSize: null,
Services
Schema
The schema has some main parts: name, version, settings, actions, methods, events.
Simple service schema to define two actions
// math.service.js module.exports = {
sub(ctx) {
return Number(ctx.params.a) - Number(ctx.params.b);
Base properties
// posts.v1.service.js module.exports = {
name: "posts", version: 1
// posts.v2.service.js module.exports = {
name: "posts", version: 2, actions: {
broker.call("v2.posts.find");
REST call
Via API Gateway, make a request to GET /v2/posts/find.
Settings
transport: "mailgun"
},
}
}
Internal Settings
Name Type Default Description
$noServiceNamePrefix Boolean false Disable service name prefixing in action names.
Secure service settings
property in service settings and set the protected property keys. The protected settings won’t be published to other nodes and it won’t appear in Service Registry. These settings will only available under this.settings inside the service functions.
service: 'gmail', auth: {
user: 'gmail.user@gmail.com', pass: 'yourpass'
};
Mixins
Mixins are a flexible way to distribute reusable functionalities for Moleculer services. The Service constructor merges these mixins with the current schema. When a service uses mixins, all properties present in the mixin will be “mixed” into the current service.
mixins: [ApiGwService] settings: {
// Change port setting port: 8080
}
}
Merge algorithm
Property Algorithm
settings Deep extend with defaultsDeep.
metadata Deep extend with defaultsDeep.
dependencies Merge & overwrite.
any custom Merge & overwrite.
Merge algorithm examples
Actions
// math.service.js module.exports = {
name: "math", actions: {
// the `handler` function is required! mult: {
cache: false, params: {
return Number(ctx.params.a) * Number(ctx.params.b);
}
const res = await broker.call("math.add", { a: 5, b: 7 }); const res = await broker.call("math.mult", { a: 10, b: 31 });
Inside actions, you can call other nested actions in other services with ctx.call method. It is an alias to broker.call, but it sets itself as parent context (due to correct tracing chains).
let post = posts[ctx.params.id];
// Populate the post.author field through "users" service
return post;
}
Events
// report.service.js module.exports = {
name: "report",
},
// Subscribe to all "user.*" events "user.*"(ctx) {
this.logger.info(`Node '${ctx.params.id}' is connected!`);
}
Grouping
// payment.service.js module.exports = {
name: "payment", events: {
}
}
Methods
handlers).
Usage
// mailer.service.js module.exports = {
}
},
});
}
name: "posts",
methods: {
}
}
Lifecycle Events
// www.service.js module.exports = {
name: "www", actions: {...},
`broker.createService`)
},
async started() {
// Fired when broker starts this service (in `broker.start()`)
};
Dependencies
If your service depends on other services, use the dependencies property in the schema. The service waits for dependent services before calls the started lifecycle event handler.
dependencies: [
"likes", // shorthand w/o version "v2.auth", // shorthand w version
this.logger.info("It will be called after all dependent services are available."); const users = await this.broker.call("users.list");
}
Wait for services via ServiceBroker
Parameters
Parameter Type Default Description
services String or Array - Service list to waiting
Example
// Called after the `posts` & `v2.users` services are available
});
Set timeout & interval
});
Metadata
The Service schema has a metadata property. You can store here any meta information about service. You can access it as this.metadata inside service functions.
},
actions: { ... }
Properties of Service Instances
this.Promise Promise Class of Promise (Bluebird)
this.logger Logger Logger instance
Service Creation
broker.createService()
add(ctx) {
return Number(ctx.params.a) + Number(ctx.params.b);
Load service from file
math.service.js
// Export the schema of service module.exports = {
name: "math", actions: {
return Number(ctx.params.a) - Number(ctx.params.b);
}
Load it with broker:
// Load service broker.loadService("./math.service");
// Start broker broker.start();
actions: {
add(ctx) {
}
}
let users = [. ];
return {
}
};
Load multiple services from a folder
Syntax
Example
// Load every *.service.js file from the current folder (including subfolders) broker.loadServices("./");
// Load every user*.service.js file from the "./svc" folder broker.loadServices("./svc", "user*.service.js");
Load with Moleculer Runner (recommended)
Hot Reloading Services
Enable in broker options
broker.loadService("./services/test.service.js");
Enable it in Moleculer Runner
Turn it on with --hot or -H flags.
Local Variables
If you would like to use local properties/variables in your service, declare them in the created event handler.
Example for local variables
const http = require("http"); module.exports = {
created() {
// Create HTTP server
},
stopped() {
res.end("Hello Moleculer!");
}
Naming restriction
ES6 Classes
Native ES6 classes with schema parsing
Define actions and events handlers as class methods and call the parseServiceSchema method in constructor with schema definition where the handlers pointed to these class methods.
const Service = require("moleculer").Service; class GreeterService extends Service {
},
dependencies: [ "auth", "users"
actions: {
hello: this.hello, welcome: {
name: "string"
},
"user.created": this.userCreated
},
)
};
return `Welcome, ${this.settings.upperCase ? name.toUpperCase() : name}`;
}
}
serviceStarted() {
}
}
Use decorators
Need a compiler
const { Service, Action, Event, Method } = require('moleculer-decorators'); const web = require('moleculer-web');
const broker = new ServiceBroker();
]
}
// With options @Action({
cache: false, params: {
//...
}
@Method
authorize(ctx, route, req, res) {
}
started() { // Reserved for moleculer, fired when started
}
stopped() { // Reserved for moleculer, fired when stopped
Internal Services
The ServiceBroker contains some internal services to check the node health or get some registry information. You can disable them by setting internalServices: false in broker options.
List of nodes
It lists all known nodes (including local node).
Parameters
List of services
Parameters
Name Type Default Description onlyLocal Boolean false List only local services. skipInternal Boolean false Skip the internal services ($node). withActions Boolean false List with actions. onlyAvailable Boolean false List only available services.
List of local actions
It lists all registered actions (local & remote).
Options
List of local events
broker.call("$node.events").then(res => console.log(res));
It has some options which you can declare within params.
Options
List of metrics
It has some options which you can declare within params.
Options
Name Type Default Description
Get Broker options
Health of node
broker.call("$node.health").then(res => console.log(res));
Example health info:
"load15": 0,
"cores": 4,
"total": 17161699328,
"percent": 7.094400109979598
"platform": "win32", "user": {
"uid": -1,
"process": {
"pid": 13096,
},
"uptime": 25.447
"langVersion": "v8.9.4"
},
"192.168.130.1",
"192.168.56.1",
"now": 1487338958409,
"iso": "2018-02-17T13:42:38.409Z", "utc": "Fri, 17 Feb 2018 13:42:38 GMT"
Extending
´s internalServices option.
// moleculer.config.js module.exports = {
return `Hello Moleculer!`;
}
Actions
The actions are the callable/public methods of the service. The action calling represents a remote-procedure-call (RPC). It has request parameters & returns response, like a HTTP request.
If you have multiple instances of services, the broker will load balance the request among instances.
Call services
Syntax
The params is an object which is passed to the action as a part of the Context. The service can access it via ctx.params. It is optional. If you don’t define, it will be {}.
The opts is an object to set/override some request parameters, e.g.: timeout, retryCount. It is optional.
Available calling options:
define fallbackResponse, broker will throw a RequestTimeout error. To disable set 0. If it’s not defined, the requestTimeout value from broker options will be used. Read more.
Count of retry of request. If the request is timed out, broker will try to call again. To disable set 0. If it’s not defined, the retryPolicy.retries value from broker options will be used. Read more.
parentCtx Context null Parent Context instance. Use it to chain the calls.
requestID String null Request ID or Correlation ID. Use it for tracing.
Usages
Call without params
Call with params
Call with calling options
fallbackResponse: defaultRecommendation
});
Call with promise error handling
Direct call: get health info from the “node-21” node
Metadata
Send meta information to services with meta property. Access it via ctx.meta in action handlers. Please note that in nested calls the meta is merged.
broker.createService({ name: "test", actions: {
second(ctx) {
The meta is sent back to the caller service. Use it to send extra meta information back to the caller. E.g.: send response headers back to API gateway or set resolved logged in user to metadata.
console.log(ctx.meta);
// Prints: { a: "John", b: 5 }
}
});
// Prints: { a: "John", b: 5 }
broker.call("test.first", null, { meta: { a: "John"
Internal calls
// Prints: { user: 'John' } ctx.meta.age = 123
return this.actions.subHello(ctx.params, { parentCtx: ctx });
}
}
Timeout
Example
nodeID: "node-1", requestTimeout: 3000
};
handler(ctx) { return "Normal";
}
}
}
Calling examples
await broker.call("greeter.slow", null, { timeout: 1000 });
Multiple calls
Calling multiple actions at the same time is also possible. To do it use broker.mcall or ctx.mcall.
],
{
mcall with Object and options.meta
await broker.mcall(
{
// Common calling options for all calls.
meta: { token: '63f20c2d-8902-4d86-ad87-b58c9e2333c2' }
Example
const res = await broker.mcall([
The res will be something similar to
[
Streaming
Moleculer supports Node.js streams as request params and as response. Use it to transfer an incoming file from a gateway, encode/decode or compress/decompress streams.
Examples
Send a file to a service as a stream
const stream = fs.createReadStream(fileName);
Object Mode Streaming
Receiving a stream in a service
save(ctx) {
// Save the received stream to a file
Return a stream as response in a service
module.exports = { name: "storage", actions: {
get: {
return fs.createReadStream(`/tmp/${ctx.params.filename}`);
}
Process received stream on the caller side
.then(stream => {
const s = fs.createWriteStream(`./${filename}`);
AES encode/decode example service
module.exports = { name: "aes", actions: {
encrypt(ctx) {
}
}
Action visibility
Available values:
protected:
private: can be called only internally (via this.actions.xy() inside service)
Change visibility
handler(ctx) {}
}
}
}
Action hooks
Before hooks
If there are any problem, it can throw an Error. Please note, you can’t break/skip the further executions of hooks or action handler.
Main usages:
After hooks
Error hooks
Service level declaration
Hooks can be assigned to a specific action (by indicating action name), all actions (*) in service or by indicating a wildcard (e.g., create-*). The latter will be applied to all actions whose name starts with create-.
Before hooks
const DbService = require("moleculer-db"); module.exports = {
// Define multiple hooks for action `remove` remove: [
function isAuthenticated(ctx) { if (!ctx.user)
}
],
async function (ctx){}
],
ctx.user = await ctx.call("users.get", { id: ctx.meta.user.id });
}
After & Error hooks
after: {
// Define a global hook for all actions to remove sensitive data "*": function(ctx, res) {
get: [
// Add a new virtual field to the entity async function (ctx, res) {
if (res.referrer)
res.referrer = await ctx.call("users.get", { id: res._id });
async function (ctx, res){}
],
error: {
// Global error handler "*": function(ctx, err) {
// Applies to all actions that start with "create-" "create-*": [
async function (ctx, err){}
}
}
Action level declaration
Before & After hooks
hooks: {
before(ctx) {
}
},
});
Execution order
It is important to keep in mind that hooks have a specific execution order. This is especially important to remember when multiple hooks are registered at different (service and/or action) levels. Overall, the hooks have the following execution logic:
Example of a global, service & action level hook execution chain
before: {
"*"(ctx) {
}
},
hello(ctx, res) {
broker.logger.info(chalk.magenta(" After hook")); return res;
hello: {
hooks: {
broker.logger.info(chalk.yellow.bold(" After action hook")); return res;
}
}
}
Output produced by global, service & action level hooks
INFO - After all hook
Reusability
The most efficient way of reusing hooks is by declaring them as service methods in a separate file and import them with the mixin mechanism. This way a single hook can be easily shared across multiple actions.
},
checkUserRole(ctx) {
}
}
mixins: [MyAuthMixin] hooks: {
before: {
actions: {
find: {
},
update: {
Local Storage
The locals property of Context object is a simple storage that can be used to store some additional data and pass it to the action handler. locals property and hooks are a powerful combo:
Setting ctx.locals in before hook
const entity = await this.findEntity(ctx.params.id); ctx.locals.entity = entity;
}
params: {
id: "number"
}
}
Events
Balanced events
Example: you have 2 main services: users & payments. Both subscribe to the user.created event. You start 3 instances of users service and 2 instances of payments service. When you emit
the user.created event, only one users and one payments service instance will receive the event.
Example
// Register handler to the "other" group instead of "payment" group. group: "other",
handler(ctx) {
}
}
Emit balanced events
// Only the `mail` & `payments` services receives it broker.emit("user.created", user, ["mail", "payments"]);
Broadcast event
The broadcast event is sent to all available local & remote services. It is not balanced, all service instances will receive it.
// Send to all "mail" service instances broker.broadcast("user.created", { user }, "mail");
// Send to all "user" & "purchase" service instances. broker.broadcast("user.created", { user }, ["user", "purchase"]);
Local broadcast event
Subscribe to events
Legacy event handlers
signature "user.created"(payload) { ... }. It is capable to detect different signatures of event handlers:
If it finds that the signature is "user.created"(ctx) { ... }, it will call it with Event Context. If not, it will call with old arguments & the 4th argument will be the Event Context,
Context-based event handler & emit a nested event
console.log("The called event name:", ctx.eventName);
ctx.emit("accounts.created", { user: ctx.params.user });
'${other.eventName}' received. Payload:`, other.params, other.meta);
}
module.exports = { events: {
// Subscribe to `user.created` event "user.created"(ctx) {
}
// Subscribe to every events
}
Event parameter validation
Similar to action parameter validation, the event parameter validation is supported.
// Validation schema params: {
from: "string|optional", to: "email",
}
}
Internal events
$services.changed
The broker sends this event if the local node or a remote node loads or destroys services.
Payload
Name Type Description
$circuit-breaker.opened
Payload
$circuit-breaker.half-opened
Payload
Name Type Description nodeID String Node ID action String Action name
$circuit-breaker.closed
The broker sends this event when the circuit breaker module change its state to closed.
Payload
$node.connected
Payload
$node.updated
Payload
Name Type Description
node Node Node info object
$node.disconnected
Payload
$broker.started
The broker sends this event once broker.start() is called and all local services are started.
$broker.stopped
The broker sends this event once broker.stop() is called and all local services are stopped.
$transporter.connected
$transporter.disconnected
$broker.error
Event payload
"error": "<the error object with all properties>"
"module": "broker" // Name of the module where the error happened "type": "error-type" // Type of error. Full of error types:
$transit.error
Event payload
"error": "<the error object with all properties>"
"module": "transit" // Name of the module where the error happened "type": "error-type" // Type of error. Full of error types:
$transporter.error
Event payload
"error": "<the error object with all properties>"
"module": "transit" // Name of the module where the error happened "type": "error-type" // Type of error. Full of error types:
$cacher.error
Event payload
"error": "<the error object with all properties>"
"module": "transit" // Name of the module where the error happened "type": "error-type" // Type of error. Full of error types:
$discoverer.error
Event payload
"error": "<the error object with all properties>"
"module": "transit" // Name of the module where the error happened "type": "error-type" // Type of error. Full of error types:
Context
Properties of Context
Name Type Description
ctx.broker ServiceBroker Instance of the broker.
|
||
---|---|---|
|
||
|
||
|
||
|
||
|
|
|
|
|
|
|
|
|
|
|
|
Methods of Context
Name Response Description
ctx.emit() void Emit an event, same as broker.emit ctx.broadcast() void Broadcast an event, same as broker.broadcast ctx.startSpan(name, opts) Span Creates a new child span. ctx.finishSpan(span) void Finishes a span.
ctx.toJSON() Object Convert Context to a printable JSON.
Context tracking
Enable context tracking & change the timeout value
tracking: {
enabled: true, shutdownTimeout: 10 * 1000
Disable tracking in calling option
Lifecycle
Broker lifecycle
This section describes what happens when the broker is starting & stopping.
Starting logic
When starting, the broker tries to establish a connection with the transporter. When it’s done, it doesn’t publish the local service list to remote nodes because it can’t accept request yet. It starts all services (calls
Avoid deadlocks
Stopping logic
Service lifecycle
created event handler
This handler is triggered when the service instance is created (e.g.:
// Create HTTP server
this.server = http.createServer(this.httpHandler);
This handler is triggered when the broker.start is called and the broker starts all local services. Use it to connect to database, listen servers…etc.
module.exports = { name: "users", async started() {
}
}
module.exports = { name: "users", async stopped() {
try {
}
};
name: "posts",
settings: {},
},
handler(ctx) {
merged(schema) {
// Modify the service settings schema.settings.myProp = "myValue";
Middlewares
Example
// awesome.middleware.js module.exports = {
name: "Awesome",
}
};
Wrapping handlers
Wrap local action handler
// Wrap the handler return function(ctx) {
doSomethingBeforeHandler(ctx);
.catch(err => { doSomethingAfterHandlerIfFailed(err);
// Throw further the error throw err;
// So it won't cut down the performance when the feature is disabled. return next;
}
Example validator middleware
this.validate(action.params, ctx.params); return next(ctx);
};
The next is the original handler or the following wrapped handler. The middleware should return either the original handler or a new wrapped handler. As you can see above, the middleware checks whether the action has a params property. If yes, it will return a wrapped handler which calls the validator module before calling the original handler. If the params property is not defined, it simply returns the original handler (skip wrapping).
If you don’t call the original next in the middleware it will break the request. It can be used in cachers. For example, if it finds the requested data in the cache, it’ll return the cached data instead of calling the next.
Example cacher middleware
// Found in the cache! Don't call next, return with the cached content ctx.cachedResult = true;
return content;
}.bind(this);
}
Decorate core modules (extend functionality)
Decorate broker with a new allCall method
middlewares: [
{
.map(node => node.id);
// Make direct call to the given Node ID return Promise.all(
}
]
Hooks
This hook wraps the local action handlers.
// my.middleware.js module.export = {
// Do something with the response return res;
})
}
}
remoteAction(next, action) { return function(ctx) {
// Change context properties or something return next(ctx)
// Handle error or throw further throw err;
});
This hook wraps the local event handlers.
// my.middleware.js module.export = {
}
}
localMethod(next, method) { return (...args) => {
console.log(`The '${method.name}' method is called in '${method.service.fullName}' service.`, args);
This hook wraps the broker.createService method.
};
}
name: "MyMiddleware",
destroyService(next) {
}
call(next)
return function(actionName, params, opts) { console.log("The 'call' is called.", eventName); return next(actionName, params, opts).then(res => {
console.log("Response:", res); return res;
This hook wraps the broker.mcall method.
console.log("The 'call' is called.", eventName); return next(...arguments).then(res => {
console.log("Response:", res); return res;
emit(next)
This hook wraps the broker.emit method.
};
}
name: "MyMiddleware",
broadcast(next) {
This hook wraps the broker.broadcastLocal method.
console.log("The 'broadcastLocal' is called.", eventName); return next(eventName, payload, opts);
};
// my.middleware.js module.export = {
name: "MyMiddleware",
serviceStarting(service) (async)
This hook is called before service starting.
}
}
serviceStarted(service) {
console.log("Service started", service.fullName);
// my.middleware.js module.export = {
name: "MyMiddleware",
serviceStopped(service) (async)
This hook is called after service stopping.
}
}
registerLocalService(next) { return (service) => {
console.log("Registering a local service", service.name); return next(service);
This hook is called during local service creation (after mixins are applied, so service schema is merged completely).
// my.middleware.js module.export = {
}
return next(packet);
};
// my.middleware.js module.export = {
name: "MyMiddleware",
}
transporterSend(next)
return (topic, data, meta) => {
// Do something with data. Data is a `Buffer` return next(topic, data, meta);
This hook is called after transporter received a communication packet but before serialization.
// my.middleware.js module.export = {
}
}
newLogEntry(type, args, bindings) {
// Do something with the `args`.
// my.middleware.js module.export = {
name: "MyMiddleware",
This hook is called before broker starting.
// my.middleware.js module.export = {
}
started(broker) (async)
}
}
stopping(broker) {
console.log("Broker is stopping");
// my.middleware.js module.export = {
name: "MyMiddleware",
Internal middlewares
Internal middlewares
Class name Type Description
ActionHook Optional Action hooks handler.
Retry Always Retry feature.
Fallback Always Fallback feature.
Transmit.Encryption Optional Transmission encryption middleware. Transmit.Compression Optional Transmission compression middleware. Debugging.TransitLogger Optional Transit Logger.
Debugging.ActionLogger Optional Action logger.
Access to internal middlewares
Transmission Middleware
Encryption
const { Middlewares } = require("moleculer"); const initVector = crypto.randomBytes(16);
module.exports = { middlewares: [
Compression
// moleculer.config.js
const { Middlewares } = require("moleculer");
};
Debug Middlewares
Transit Logger
Transit logger middleware allows to easily track the messages that are exchanged between services.
logPacketData: false, folder: null,
colors: {
]
};
Complete option list
folder Object null Folder where logs will be written extension String .json File extension of log file color.receive String grey Supports all Chalk colors color.send String grey Supports all Chalk colors packetFilter Array<String> HEARTBEAT Type of packets to skip
Action Logger
Action Logger middleware tracks “how” service actions were executed.
logParams: true, logResponse: true, folder: null, colors: {
send: "magenta", receive: "blue"
};
Complete option list
Class name Type Default Description
folder String null Path do folder where logs will be written extension String .json File extension of log file color.request String yellow Supports all Chalk colors color.response String cyan Supports all Chalk colors
colors.error String red Supports all Chalk colors
Event Execution Rate
Throttle
//my.service.js module.exports = {
name: "my", events: {
};
Debounce
Unlike throttling, debouncing is a technique of keeping the trigger rate at exactly 0 until a period of calm, and then triggering the listener exactly once. Same functionality as lodash’s _.debounce. For more info about debouncing check this article.
}
}
Loading & Extending
Load middleware by name
// Extend with custom middleware Middlewares.MyCustom = {
created(broker) {
// Load middleware by name "MyCustom"
]
Networking
Transporters
There are several built-in transporters in Moleculer framework.
TCP transporter
This is a no-dependency, zero-configuration TCP transporter. It uses Gossip protocol to disseminate node statuses, service list and heartbeats. It contains an integrated UDP discovery feature to detect new and disconnected nodes on the network.
Use TCP transporter with default options
All TCP transporter options with default values
// moleculer.config.js module.exports = {
logger: true, transporter: {
// UDP bind address (if null, bind on all interfaces) udpBindAddress: null,
// UDP sending period (seconds) udpPeriod: 30,
// Static remote nodes address list (when UDP discovery is not available) urls: null,
// Use hostname as preffered connection address useHostname: true,
}
};
TCP transporter with static endpoint list
"172.17.0.1:6000/node-1",
"172.17.0.2:6000/node-2", "172.17.0.3:6000/node-3"
You don’t need to set port because it find & parse the self TCP port from URL list.
TCP transporter with shorthand static endpoint list
It needs to start with tcp://.
TCP transporter with static endpoint list file
// moleculer.config.js module.exports = {
nodeID: "node-1",
"127.0.0.1:7001/server-1", "127.0.0.1:7002/server-2"
]
Serviceless node
NATS Transporter
// moleculer.config.js module.exports = {
nodeID: "server-1",
Dependencies
Examples
// moleculer.config.js module.exports = {
transporter: "NATS"
Connect to a remote NATS server
Connect to a remote NATS server with auth
// moleculer.config.js module.exports = {
transporter: "nats://user:pass@nats-server:4222"
Connect with options
pass: "1234"
}
Connect with TLS
type: "NATS", options: {
servers: ["nats://localhost:4222"]
}
};
Redis Transporter
};
Dependencies
To use this transporter install the ioredis module with npm install ioredis --save command.
Examples
Connect with connection string
// moleculer.config.js module.exports = {
transporter: "redis://localhost:6379"
Connect to a secure Redis server
Connect with options
// moleculer.config.js module.exports = {
transporter: { type: "Redis", options: {
Connect to Redis cluster
// moleculer.config.js module.exports = {
transporter: { type: "Redis", options: {
]
}
MQTT Transporter
// moleculer.config.js module.exports = {
nodeID: "server-1",
Dependencies
Examples
// moleculer.config.js module.exports = {
transporter: "MQTT"
Connect with connection string
Connect to secure MQTT server
// moleculer.config.js module.exports = {
transporter: "mqtts://mqtt-server:1883"
Connect with options
qos: 0, topicSeparator: "."
}
AMQP (0.9) Transporter
nodeID: "server-1",
transporter: "amqp://rabbitmq-server:5672"
Dependencies
Transporter options
Connect to ‘amqp://guest:guest@localhost:5672’
transporter: "AMQP"
});
Connect to a remote server
Connect to a secure server
transporter: "amqps://rabbitmq-server:5672"
});
Connect to a remote server with options & credentials
servername: process.env.RABBIT_SERVER_NAME
}
};
AMQP 1.0 Transporter
Built-in transporter for AMQP 1.0 protocol (e.g.: ActiveMq or RabbitMQ + rabbitmq-amqp1.0 plugin).
Dependencies
To use this transporter install the rhea-promise module with npm install rhea-promise -- save command.
Transporter options
Options can be passed to rhea.connection.open() method, the topics, the queues, and the messages themselves.
Connect to ‘amqp10://guest:guest@localhost:5672’
Connect to a remote server
transporter: "amqp10://activemq-server:5672"
};
Connect to a remote server with options & credentials
connectionOptions: { // rhea connection options https://github.com/amqp/rhea#connectoptions, example:
ca: "", // (if using tls) servername: "", // (if using tls)
topicPrefix: "topic://", // RabbitMq uses '/topic/' instead, 'topic://' is more
common
Kafka Transporter
It is a simple implementation. It transfers Moleculer packets to consumers via pub/sub. There are not implemented offset, replay…etc features.
Dependencies
To use this transporter install the kafka-node module with npm install kafka-node --save command.
Connect to Zookeeper
Connect to Zookeeper with custom options
transporter: { type: "kafka", options: {
host: "192.168.51.29:2181",
// KafkaProducer options. More info: https://github.com/SOHU-Co/kafka- node#producerclient-options-custompartitioner
producer: {}, customPartitioner: undefined,
publish: {
partition: 0,
};
NATS Streaming (STAN) Transporter
Built-in transporter for NATS Streaming.
};
Dependencies
To use this transporter install the node-nats-streaming module with npm install node-nats-streaming
Examples
};
Connect with connection string
// moleculer.config.js module.exports = {
Connect with options
url: "stan://127.0.0.1:4222", clusterID: "my-cluster"
}
Custom transporter
Create custom transporter
connect() { /*...*/ }
disconnect() { /*...*/ }
Use custom transporter
const MyTransporter = require("./my-transporter");
module.exports = {
Disabled balancer
Example
disableBalancer: true,
transporter: "nats://some-server:4222"
Please note
Serialization
Example
// moleculer.config.js module.exports = {
nodeID: "server-1", transporter: "NATS", serializer: "ProtoBuf"
JSON serializer
};
Avro serializer
Built-in Avro serializer.
Dependencies
MsgPack serializer
Built-in MsgPack serializer.
// moleculer.config.js module.exports = {
Dependencies
Notepack serializer
// moleculer.config.js module.exports = {
serializer: "Notepack"
Dependencies
ProtoBuf serializer
serializer: "ProtoBuf"
};
Dependencies
Thrift serializer
};
Dependencies
To use this serializer install the thrift module with npm install thrift --save command.
CBOR serializer
Custom serializer
Custom serializer module can be created. We recommend to copy the source of JSONSerializer and implement the serialize and deserialize methods.
Create custom serializer
const BaseSerializer = require("moleculer").Serializers.Base; class MySerializer extends BaseSerializer {
Use custom serializer
module.exports = {
serializer: new MySerializer()
Registry & Discovery
Dynamic service discovery
Local
Local Discovery with default options
// moleculer.config.js module.exports = {
registry: {
Local Discovery with custom options
registry: {
discoverer: {
// Disable removing offline nodes from registry, if true disableOfflineNodeRemoving: false,
// Remove offline nodes after 10 minutes cleanOfflineNodesTimeout: 10 * 60
Redis
Redis-based discovery uses a dedicated connection with the Redis server to exchange discovery and heartbeat packets. This approach reduces the load over the transporter module, it’s used exclusively for the exchange of the request, response, event packets.
When Redis-based discovery method is enabled, Moleculer nodes periodically publish and fetch the info from Redis and update their internal service registry. Redis key expiration mechanism removes nodes that don’t publish heartbeat packets for a certain period of time. This allows Moleculer nodes to detect that a specific node has disconnected.
Example of connection to a local Redis server
discoverer: "Redis"
}
Example of connection to a remote Redis server
}
}
Example with options
redis: {
// Redis connection options.
}
// Serializer serializer: "JSON",
// Monitoring Redis commands monitor: true,
// --- COMMON DISCOVERER OPTIONS ---
// Remove offline nodes after 10 minutes cleanOfflineNodesTimeout: 10 * 60
}
etcd3
Etcd3-based discovery method is very similar to Redis-based discovery. It stores heartbeat and discovery packets at etcd3 server. etcd3’s lease option will remove heartbeat info of nodes that have crashed or disconnected from the network.
This method has the same strengths and weaknesses of Redis-based discovery. It doesn’t use the transporter module for the discovery but it’s also slower to detect new or disconnected nodes.
Example to connect local etcd3 server
}
}
Example to connect remote etcd3 server
}
Example with options
// moleculer.config.js module.exports = {
// etcd3 connection options.
// More info: https://mixer.github.io/etcd3/interfaces/options_.ioptions.html
// 10 means every 10 cycle.
fullCheck: 10,
// Disable removing offline nodes from registry, if true disableOfflineNodeRemoving: false,
// Remove offline nodes after 10 minutes cleanOfflineNodesTimeout: 10 * 60
Tip: To further reduce network traffic use MsgPack/Notepack serializers instead of JSON.
Customization
You can create your custom discovery mechanism. We recommend to copy the source of Redis Discoverer and implement the necessary methods.
Built-in Service Registry
Load balancing
Built-in strategies
RoundRobin strategy
Usage
// moleculer.config.js module.exports = {
registry: {
Random strategy
Usage
// moleculer.config.js module.exports = {
registry: {
CPU usage-based strategy
Usage
// moleculer.config.js module.exports = {
registry: {
Strategy options
sampleCount Number 3 The number of samples. To turn of sampling, set to 0.
The low CPU usage percent (%). The node which has lower CPU usage than this value
Usage with custom options
sampleCount: 3,
lowCpuUsage: 10
Latency-based strategy
Usage
// moleculer.config.js module.exports = {
registry: {
Strategy options
sampleCount Number 5 The number of samples. If you have a lot of hosts/nodes, it’s recommended to increase the value. To turn of sampling, set to 0.
lowLatency Number 10 The low latency (ms). The node which has lower latency than this value is selected immediately.
Usage with custom options
strategy: "Latency", strategyOptions: {
sampleCount: 15,
}
};
Sharding strategy
registry: {
strategy: "Shard", strategyOptions: {
Example of a shard key user.id in context meta
// moleculer.config.js module.exports = {
}
};
Strategy options
All available options of Shard strategy
// moleculer.config.js module.exports = {
registry: {
}
}
Overwrite global options
Using ‘Shard’ strategy for ‘hello’ action instead of global ‘RoundRobin’
strategy: "RoundRobin"
}
params: {
name: "string"
handler(ctx) {
return `Hello ${ctx.params.name}`;
Custom strategy
Custom strategy can be created. We recommend to copy the source of RandomStrategy and implement the select method.
Create custom strategy
const BaseStrategy = require("moleculer").Strategies.Base; class MyStrategy extends BaseStrategy {
Use custom strategy
const Strategies = require("moleculer").Strategies
// Add custom strategy to the registry Strategies.register("myCustomStrategy", MyStrategy)
};
Preferring local services
The ServiceBroker first tries to call the local instances of service (if exists) to reduce network latencies. It means, if the given service is available on the local broker, the configured strategy will be skipped and the broker will call the local service always.
}
};
Fault tolerance
Circuit Breaker
What is the circuit breaker?
Enable it in the broker options
const broker = new ServiceBroker({ circuitBreaker: {
enabled: true, threshold: 0.5,
Settings
Name Type Default Description
Enable feature
Threshold value. 0.5 means that 50% should be failed for tripping.
These global options can be overridden in action definition, as well.
// users.service.js module.export = {
// All CB options can be overwritten from broker options. threshold: 0.3,
windowTime: 30
Retry
handler(ctx) {}
There is an exponential backoff retry solution. It can recall failed requests.
Enable it in the broker options
factor: 2,
check: err => err && !!err.retryable
Settings
Name Type Default Description
delay Number 100 First delay in milliseconds.
maxDelay Number 2000 Maximum delay in milliseconds.
Overwrite the retries value in calling option
Overwrite the retry policy values in action definitions
name: "users", actions: {
find: {
handler(ctx) {}
},
handler(ctx) {}
}
Timeout
Enable it in the broker options
});
Overwrite the timeout value in calling option
broker.call("posts.find", {}, { timeout: 3000 });
Distributed timeouts
Bulkhead
Enable it in the broker options
maxQueueSize: 10,
}
Global Settings
Name Type Default Description
The concurrency value restricts the concurrent request executions. If the maxQueueSize is bigger than 0, broker stores the additional requests in a queue if all slots are taken. If the queue size reaches the maxQueueSize limit, broker will throw QueueIsFull exception for every addition requests.
Action Settings
Global settings can be overridden in action definition.
Overwrite the retry policy values in action definitions
// Disable bulkhead for this action enabled: false
},
// Increment the concurrency value for this action concurrency: 10
},
Events Settings
Event handlers also support bulkhead feature.
Example
// my.service.js module.exports = {
async handler(ctx) {
// Do something.
Fallback
Fallback feature is useful, when you don’t want to give back errors to the users. Instead, call an other action or return some common content. Fallback response can be set in calling options or in action definition. It should be
a Function which returns a Promise with any content. The broker passes the current Context & Error objects to this function as arguments.
Fallback response setting in calling options
}
});
Fallback in action definition
Fallback as a function
add: {
fallback: (ctx, err) => "Some cached result", handler(ctx) {
};
Fallback as method name string
module.exports = { name: "recommends", actions: {
}
}
}
};
Caching
Cached action example
});
// Create a service broker.createService({
{ id: 1, name: "John" },
{ id: 2, name: "Jane" }
});
broker.start()
.then(() => {
// Return from cache, handler won't be called
Console messages:
As you can see, the Handler called message appears only once because the response of second request is returned from the cache.
Cache keys
Example hashed cache key for “posts.find” action
The params object can contain properties that are not relevant for the cache key. Also, it can cause performance issues if the key is too long. Therefore it is recommended to set an object for cache property which contains a list of essential parameter names under the keys property.
To use meta keys in cache keys use the # prefix.
Strict the list of params & meta properties for key generation
// generate cache key from "limit", "offset" params and "user.id" meta keys: ["limit", "offset","#user.id"]
},
}
}
Performance tip
Limiting cache key length
Occasionally, the key can be very long, which can cause performance issues. To avoid it, maximize the length of concatenated params in the key with maxParamsLength cacher option. When the key is longer than the configured limit value, the cacher calculates a hash (SHA256) from the full key and adds it to the end of the key.
The minimum of maxParamsLength is 44 (SHA 256 hash length in Base64). To disable this feature, set it to 0 or null.
Generate a full key from the whole params without limit
Generate a limited-length key
maxParamsLength: 60
}
Conditional caching
Conditional caching allows to bypass the cached response and execute an action in order to obtain “fresh” data. To bypass the cache set ctx.meta.$cache to false before calling an action.
Example of turning off the caching for the greeter.hello action
Example of a custom conditional caching function
hello: {
cache: {
handler(ctx) {
this.logger.debug(chalk.yellow("Execute handler")); return `Hello ${ctx.params.name}`;
// Use custom `enabled` function to turn off caching for this request broker.call("greeter.hello", { name: "Moleculer", noCache: true }))
TTL
Default TTL setting can be overriden in action definition.
}
});
},
handler(ctx) {
});
Custom key-generator
To overwrite the built-in cacher key generator, set your own function as keygen in cacher options.
// name - action name
// params - ctx.params
}
});
Manual caching
// Remove entry from cache
await broker.cacher.del("mykey.a");
Additionally, the complete ioredis client API is available at broker.cacher.client when using the built-in Redis cacher:
// create an ioredis pipeline
Clear cache
When you create a new model in your service, you have to clear the old cached model entries.
Example to clean the cache inside actions
{
// Clear all cache entries this.broker.cacher.clean();
// Clear all cache entries which keys start with `users.` this.broker.cacher.clean("users.**");
}
}
Clear cache among multiple service instances
Example
// Create new user entity
const user = new User(ctx.params);
methods: {
cleanCache() {
this.broker.broadcast("cache.clean.users"); events: {
"cache.clean.users"() {
}
Clear cache among different services
Service dependency is a common situation. E.g. posts service stores information from users service in cached entries (in case of populating).
Example cache entry in posts service
fullName: "John Doe", avatar: "https://..."
},
cache.cleaner.mixin.js
module.exports = function(serviceNames) { const events = {};
serviceNames.forEach(name => { events[`cache.clean.${name}`] = function() {
});
return {
posts.service.js
name: "posts",
mixins: [CacheCleaner([ "users",
}
};
Cache locking
Enable Lock
lock: true, // Set to true to enable cache locks. Default is disabled.
}
Enable with TTL
seconds
staleTime: 10, // If the TTL is less than this number, means that the resources are staled
Disable Lock
ttl: 60, lock: {
enable: false, // Set to false to disable.
}
}
Example for Redis cacher with redlock library
// set Time-to-live to 30sec. ttl: 30,
// Turns Redis client monitoring on. monitor: false,
},
lock: {
// Redis clients. Support node-redis or ioredis. By default will use the local
client.
// the max number of times Redlock will attempt
// to lock a resource before erroring retryCount: 10,
}
}
Built-in cachers
Memory cacher
Enable memory cacher
Or
const broker = new ServiceBroker({ cacher: true
Enable with options
}
}
Options
keygen Function null Custom cache key generator function. maxParamsLength Number null Maximum length of params in generated keys. lock Boolean or Object null Enable lock feature.
Cloning
The cacher uses the lodash _.cloneDeep method for cloning. To change it, set a Function to the clone option instead of a Boolean.
Custom clone function with JSON parse & stringify
}
});
LRU memory cacher
Enable LRU cacher
With options
cacher: {
type: "MemoryLRU", options: {
});
Options
|
|||
---|---|---|---|
|
|
null | ||
---|---|---|
|
null |
|
Redis cacher
Enable Redis cacher
const broker = new ServiceBroker({ cacher: "Redis"
});
With connection string
With options
// Prefix for keys prefix: "MOL",
// set Time-to-live to 30sec. ttl: 30,
});
port: 6379,
With MessagePack serializer
You can define a serializer for Redis Cacher. By default, it uses the JSON serializer.
const broker = new ServiceBroker({ nodeID: "node-123",
redis: {
host: "my-redis"
With Redis Cluster Client
const broker = new ServiceBroker({ cacher: {
type: "Redis", options: {
{ port: 6381, host: "127.0.0.1" },
{ port: 6382, host: "127.0.0.1" }
});
Options
],
keygen Function null Custom cache key generator function. maxParamsLength Number null Maximum length of params in generated keys. serializer String "JSON" Name of a built-in serializer.
cluster Object null Redis Cluster client configuration. More information
Dependencies
Custom cacher
Custom cache module can be created. We recommend to copy the source of MemoryCacher or RedisCacher and implement the get, set, del and clean methods.
Create custom cacher
const BaseCacher = require("moleculer").Cachers.Base; class MyCacher extends BaseCacher {
Use custom cacher
const { ServiceBroker } = require("moleculer"); const MyCacher = require("./my-cacher");
const broker = new ServiceBroker({ cacher: new MyCacher()
Parameter Validation
Fastest Validator
Default usage
nodeID: "node-100",
validator: true // Using the default Fastest Validator
Setting validator by name
}
Example with options
//moleculer.config.js module.exports = {
aliases: { /*...*/ }
}
Actions Validation
Example
validator: true // Default is true
});
},
handler(ctx) {
});
broker.call("say.hello").then(console.log)
// -> throw ValidationError: "The 'name' field must be a string!"
broker.call("say.hello", { name: "Walter" }).then(console.log)
Example validation schema
id: { type: "number", positive: true, integer: true }, name: { type: "string", min: 3, max: 255 },
status: "boolean" // short-hand def
Documentation
Async custom validator
Enabling custom async validation
validator: {
type: "FastestValidator", options: {
}
}
Using custom async validation
owner: { type: "string", custom: async (value, errors, schema, name, parent, context) => {
});
},
return value;
Events Validation
Please note that the validation errors are not sent back to the caller, as happens with action errors. Event validation errors are logged but you can also catch them with the global error handler.
// mailer.service.js module.exports = {
definitions
params: {
this.logger.info("Event received, parameters OK!", ctx.params);
}
Custom validator
Creating custom validator
//moleculer.config.js
const BaseValidator = require("moleculer").Validators.Base; class MyValidator extends BaseValidator {}
Build Joi validator
const BaseValidator = require("moleculer").Validators.Base; const { ValidationError } = require("moleculer").Errors; const Joi = require("joi");
// --- JOI VALIDATOR CLASS ---
compile(schema) {
return (params) => this.validate(params, schema);
res.error.details);
return true;
});
// --- TEST BROKER ---
name: { type: "string", min: 4 }
},*/
return `Hello ${ctx.params.name}`;
}
.then(() => broker.call("greeter.hello").then(res => broker.logger.info(res)))
.catch(err => broker.logger.error(err.message, err.data))
.then(() => broker.call("greeter.hello", { name: "John" }).then(res => broker.logger.info(res)))
.catch(err => broker.logger.error(err.message, err.data));
Metrics
Enable metrics & define console reporter
metrics: {
enabled: true, reporter: [
Options
Name Type Default Description
enabled Boolean false Enable tracing feature.
reporter Object or Array<Object> null Metric reporter configuration. More info
|
|
||
---|---|---|---|
|
defaultQuantiles Array<Number> Default quantiles for histograms. Default: [0.5, 0.9,
Metrics Reporters
Name Type Default Description
labelNameFormatter Function null Label name formatter
Example of metrics options
// moleculer.config.js module.exports = {
includes: ["moleculer.**.total"],
excludes: ["moleculer.broker.**","moleculer.request.**"],
}
]
Console
This is a debugging reporter which periodically prints the metrics to the console.
// moleculer.config.js module.exports = {
// Printing interval in seconds interval: 5,
// Custom logger. logger: null,
]
}
CSV
enabled: true, reporter: [
{
// Saving mode.
// - "metric" - save metrics to individual files
}
};
Event
Event reporter sends Moleculer events with metric values.
// moleculer.config.js module.exports = {
// Event name
eventName: "$metrics.snapshot",
}
}
Datadog
// moleculer.config.js module.exports = {
metrics: {
// Base URL
baseUrl: "https://api.datadoghq.eu/api/", // Default is https://api.datadoghq.com/api/
// Default labels which are appended to all metrics labels defaultLabels: (registry) => ({
namespace: registry.broker.namespace, nodeID: registry.broker.nodeID
]
}
Prometheus
enabled: true, reporter: [
{
namespace: registry.broker.namespace, nodeID: registry.broker.nodeID
})
};
StatsD
The StatsD reporter sends metric values to StatsD server via UDP.
type: "StatsD", options: {
// Server host host: "localhost",
]
}
Customer Reporter
Create custom metrics
stop() { /*...*/ } metricChanged() { /*...*/ }
}
Use custom metrics
new MyMetricsReporter(),
]
Supported Metric Types
Counter
increment(labels?: GenericObject, value?: number, timestamp?: number) set(value: number, labels?: GenericObject, timestamp?: number)
Gauge
A gauge is a metric that represents a single numerical value that can arbitrarily go up and down. Gauges are typically used for measured values like current memory usage, but also “counts” that can go up and down, like the number of concurrent requests. It can also provide 1-minute rate.
Histogram
Info
observe(value: number, labels?: GenericObject, timestamp?: number)
An info is a single string or number value like process arguments, hostname or version numbers. Info provides the following methods:
Built-in Internal Metrics
Process metrics
process.internal.active.handles (gauge) process.internal.active.requests (gauge) process.versions.node (info) process.gc.time (gauge) process.gc.total.time (gauge) process.gc.executed.total (gauge)
OS metrics
os.memory.free (gauge) os.memory.total (gauge) os.memory.used (gauge) os.uptime (gauge) os.type (info) os.release (info) os.hostname (info) os.arch (info) os.platform (info) os.user.uid (info) os.user.gid (info) os.user.username (info) os.user.homedir (info)
Moleculer metrics
Customizing
New metric registration
Create a counter
// posts.service.js module.exports = {
name: "posts",
}
},
description: "Number of requests of posts", unit: "request",
rate: true // calculate 1-minute rate
Create a gauge with labels
name: "posts",
actions: {
return posts;
},
return posts;
},
name: "posts.total", labelNames: ["userID"]
description: "Number of posts by user", unit: "post"
Create a histogram with buckets & quantiles
name: "posts",
actions: {
this.logger.debug("Post created. Elapsed time: ", duration, "ms"); return post;
}
name: "posts.creation.time", description: "Post creation time", unit: "millisecond", linearBuckets: {
start: 0,
};
},
Errors
Base error classes
MoleculerError The base error class. Parameters
Name Type Default Description message String Error message code Number 500 Error code type String Error type
Example
Error for retryable errors. It uses in broker.call. The broker retries requests if they rejected a MoleculerRetryableError.
Parameters
Name Type Default Description message String Error message code Number 500 Error code type String Error type
Example
Error for retryable server errors. Parameters are same as MoleculerRetryableError.
Internal error classes
Throw it if you call a not registered service action. Error code: 404
Retryable: true
Retryable: true
Type: SERVICE_NOT_AVAILABLE
Throw it if your nested call is skipped because the execution is timed out due to distributed timeout. Error code: 514
Retryable: true
Type: REQUEST_REJECTED
Validator throws it if the calling parameters are not valid. Error code: 422
Retryable: false
Type: MAX_CALL_LEVEL
BrokerOptionsError
Throw it if your broker options are not valid. Error code: 500
Retryable: false
Type: GRACEFUL_STOP_TIMEOUT
InvalidPacketDataError
Throw it if transporter receives unknown data. Error code: 500
Create custom errors
super(msg || `This is my business error.`, 500, "MY_BUSINESS_ERROR", data);
}
Preserve custom error classes while transferring between remote nodes
Public interface of Regenerator
Create custom regenerator
super(message, code, type, data); this.timestamp = timestamp;
}
return new TimestampedError(message, code, type, data, timestamp);
}
}
}
Use custom regenerator
errorRegenerator: new CustomRegenerator()
}
Moleculer Runner
Production-ready
Syntax
./node_modules/moleculer/bin/moleculer-runner.js --repl format.
Options
Option Type Default Description
-r, --repl Boolean false If true, it switches to REPL mode after broker started.
-E, --envfile
<file>
Example NPM scripts
"scripts": {
"dev": "moleculer-runner --repl --hot --config moleculer.dev.config.js
The dev script loads development configurations from the moleculer.dev.config.js file, start all services from the services folder, enable hot-reloading and switches to REPL mode. Run it with the npm run dev command.
The start script is to load the default moleculer.config.js file if it exists, otherwise only loads options from environment variables. Starts 4 instances of broker, then they start all services from the services folder. Run it with npm start command.
Configuration loading logic
Once a config file has been loaded, it merges options with the default options of the ServiceBroker.
The runner observes the options step by step and tries to overwrite them from environment variables.
Configuration file
Example config file
nodeID: "node-test", logger: true, logLevel: "debug",
transporter: "nats://localhost:4222", requestTimeout: 5 * 1000,
Asynchronous Configuration file
Moleculer Runner also supports asynchronous configuration files. In this case moleculer.config.js must export a Function that returns a Promise (or you can use async/await).
// moleculer.config.js
This function runs with the MoleculerRunner instance as the this context. Useful if you need to access the flags passed to the runner. Check the MoleculerRunner source more details.
Environment variables
The runner transforms the property names to uppercase. If nested, the runner concatenates names with _.
Example environment variables
Services loading logic
The runner loads service files or folders defined in CLI arguments. If you define folder(s), the runner loads all services **/*.service.js from specified one(s) (including sub-folders too). Services & service folder can be loaded with SERVICES and SERVICEDIR environment variables.
Loading steps:
If SERVICEDIR env found, but no SERVICES env, it loads all services from the SERVICEDIR directory.
Example
SERVICEDIR=services SERVICES=math,post,user
It loads the math.service.js, post.service.js and user.service.js files from the services folder.
Glob patterns
Explanations:
services - legacy mode. Load all services from the services folder with **/*.service.js file mask.
!services/others/**/*.service.js - skip all services in the services/others folder and sub-folders.
Built-in clustering
Clustered Node ID
.env files
Moleculer runner can load .env file at starting. There are two new cli options to load env file:
-e, --env - Load environment variables from the ‘.env’ file from the current folder.
Example
$ moleculer-runner --envfile .my-env
Dependencies
To use this feature, install the dotenv module with npm install dotenv --save command.
API Gateway
moleculer-web
Features
multiple body parsers (json, urlencoded) CORS headers
Rate limiter
Install
Usage
Run with default settings
This example uses API Gateway service with default settings.
You can access all services (including internal $node.) via http://localhost:3000/
Example URLs:
Call test.hello action: http://localhost:3000/test/hello
Call math.add action with params: http://localhost:3000/math/add?a=25&b=13
Whitelist
settings: {
routes: [{
// Access any actions in 'math' service
/^math\.\w+$/
Aliases
You can use alias names instead of action names. You can also specify the method. Otherwise it will handle every method types.
Using named parameters in aliases is possible. Named parameters are defined by prefixing a colon to the parameter name (:name).
// Call `auth.login` action with `GET /login` or `POST /login` "login": "auth.login",
// Restrict the request method "POST users": "users.create",
}
});
Aliases Action
settings: {
routes: [{
}]
}
routes: [{
aliases: {
});
To use this shorthand alias, create a service which has list, get, create, update and remove actions.
routes: [{
aliases: {
}
}
req.$ctx are pointed to request context.
req.$service & res.$service are pointed to this service instance.
Mapping policy
The route has a mappingPolicy property to handle routes without aliases.
Available options:
all - enable to request all routes with or without aliases (default)
mappingPolicy: "restrict", aliases: {
"POST add": "math.add"
You can’t request the /math.add or /math/add URLs, only POST /add.
File upload aliases
API Gateway has implemented file uploads. You can upload files as a multipart form data (thanks to busboy library) or as a raw request body. In both cases, the file is transferred to an action as a Stream. In multipart form data mode you can upload multiple files, as well.
Example
{
path: "",
type: "multipart",
// Action level busboy config busboyConfig: {
},
// Route level busboy config.
},
}
Multipart parameters
In order to access the files passed by multipart-form these specific fields can be used inside the action:
ctx.params is the Readable stream containing the file passed to the endpoint
Auto-alias
Use whitelist parameter to specify services that the Gateway should track and build the routes.
Example
// api.service.js module.exports = {
path: "/api",
whitelist: [
},
autoAliases: true
// posts.service.js module.exports = {
name: "posts", version: 2,
// of /api/v2/posts, you can uncomment this line.
},
},
get: {
rest: "POST /", handler(ctx) {}
},
rest: "DELETE /:id", handler(ctx) {}
}
The generated aliases
=> |
|
||
---|---|---|---|
|
=> | ||
=> |
|
||
|
=> | ||
=> |
|
||
|
=> |
name: "posts", version: 2,
settings: {
// Expose as "/tags" instead of "/api/v2/posts/tags" rest: {
method: "GET", fullPath: "/tags"
};
Parameters
API gateway collects parameters from URL querystring, request params & request body and merges them. The results is placed to the req.$params.
Disable merging
Example
}
});
// Querystring params query: {
category: "general",
// Request params params: {
id: 5
Query string parameters
Array parameters
Nested objects & arrays
URL: GET /api/opt-test?foo[bar]=a&foo[bar]=b&foo[baz]=c
foo: {
Middlewares
Examples
broker.createService({ mixins: [ApiService], settings: {
// Global middlewares. Applied to all routes. use: [
path: "/",
// Route-level middlewares. use: [
aliases: {
"GET /secret": [
}
]
const swMiddleware = swStats.getMiddleware(); broker.createService({
mixins: [ApiGatewayService], name: "gw-main",
},
routes: [
async started(this: Service): Promise<void> { this.addRoute({
path: "/",
Error-handler middleware
There is support to use error-handler middlewares in the API Gateway. So if you pass an Error to
the next(err) function, it will call error handler middlewares which have signature as (err, req, res, next).
routes: [
{
function(err, req, res, next) {
this.logger.error("Error is occured in middlewares!"); this.sendError(req, res, err);
Serve static files
settings: {
assets: {
});
Calling options
The route has a callOptions property which is passed to broker.call. So you can
callOptions: { timeout: 500,
retries: 3,
});
Multiple routes
You can create multiple routes with different prefix, whitelist, alias, calling options & authorization.
{
path: "/admin",
bodyParsers: { json: true
}
"posts.*",
"math.*",
]
}
Response type & status code
Available meta fields:
ctx.meta.$statusCode - set res.statusCode. ctx.meta.$statusMessage - set res.statusMessage. ctx.meta.$responseType - set Content-Type in header. ctx.meta.$responseHeaders - set all keys in header. ctx.meta.$location - set Location key in header for redirects.
Example
module.exports = { name: "export", actions: {
return csvFileStream;
}
}
}
Authorization
Example authorization
mixins: [ApiService],
settings: {
methods: {
// Second thing
// Check the token
if (token == "123456") {
}
} else {
}
}
Authentication
The returned value will be set to the ctx.meta.user property. You can use it in your actions to get the logged in user entity.
Example authentication
broker.createService({
}]
},
// valid credentials. It will be set to `ctx.meta.user`
return Promise.resolve({ id: 1, username: "john.doe", name: "John
} else {
// anonymous user
});
Route hooks
The route has before & after call hooks. You can use it to set ctx.meta, access req.headers or modify the response data.
path: "/",
onBeforeCall(ctx, route, req, res) {
}
}
Error handlers
You can add route-level & global-level custom error handlers.
In handlers, you must call the res.end. Otherwise, the request is unhandled.
res.setHeader("Content-Type", "application/json; charset=utf-8"); res.writeHead(500);
}
res.end("Global error: " + err.message);
}
Error formatter
reformatError(err) {
// Filter out the data from the error before sending it to the client return _.pick(err, ["name", "message", "code", "type", "data"]);
CORS headers
Usage
const svc = broker.createService({ mixins: [ApiService],
settings: {
// Configures the Access-Control-Expose-Headers CORS header. exposedHeaders: [],
// Configures the Access-Control-Allow-Credentials CORS header. credentials: false,
// Route CORS settings (overwrite global settings) cors: {
origin: ["http://localhost:3000", "https://localhost:4000"], methods: ["GET", "OPTIONS", "POST"],
});
Rate limiter
The Moleculer-Web has a built-in rate limiter with a memory store.
Usage
// Defaults to 60000 (1 min)
window: 60 * 1000,
}
}
Custom Store example
this.hits = new Map();
this.resetTime = Date.now() + clearPeriod;
/**
Increment the counter by key
let counter = this.hits.get(key) || 0; counter++;
this.hits.set(key, counter); return counter;
this.hits.clear();
}
ETag
// Service-level option etag: false,
routes: [
]
}
etag: (body) => generateHash(body)
}
Custom etag for streaming
ctx.meta.$responseType = "text/csv"; ctx.meta.$responseHeaders = {
"Content-Disposition": `attachment; filename="data-${ctx.params.id}.csv"`,
}
}
HTTP2 Server
Example
// HTTPS server with certificate https: {
key: fs.readFileSync("key.pem"), cert: fs.readFileSync("cert.pem")
ExpressJS middleware usage
You can use Moleculer-Web as a middleware in an ExpressJS application.
Usage
const svc = broker.createService({ mixins: [ApiService],
// Create Express application const app = express();
// Use ApiGateway as middleware app.use("/api", svc.express());
Full service settings
// Exposed port port: 3000,
// Exposed IP ip: "0.0.0.0",
// If false, it will start without server in middleware mode server: true,
// Exposed global path prefix path: "/api",
// Logging response data with 'debug' level logResponseData: "debug",
// Use HTTP2 server (experimental) http2: false,
// Path prefix to this route (full path: /api/admin ) path: "/admin",
// Whitelist of actions (array of string mask or regex) whitelist: [
// Merge parameters from querystring, request params & body mergeParams: true,
// Route-level middlewares use: [
},
mappingPolicy: "all",
},
{
/^math\.\w+$/
],
"GET greeter/:name": "test.greeter", "GET /": "test.hello",
"POST upload"(req, res) { this.parseUploadedFile(req, res);
json: false,
urlencoded: { extended: true }
fallbackResponse: "Static fallback response"
},
res.setHeader("X-Custom-Header", "123456"); return data;
},
],
// Folder to server assets (static files) assets: {
res.setHeader("Content-Type", "text/plain"); res.writeHead(err.code || 500); res.end("Global error: " + err.message);
}
Service Methods
Service method removes the route by path (this.removeRoute("/admin")).
Examples
serve static files from the assets folder whitelist
set the authorized user to Context.meta
simple server with RESTful aliases example posts service with CRUD actions
call action and send back the response via websocket send Moleculer events to the browser via websocket
metrics, statistics & validation from Moleculer custom error handlers
Hot-replacement
Babel, SASS, SCSS, Vue SFC
REPL console
moleculer repl
Install
Usage
// Switch to REPL mode broker.repl();
});
REPL Commands
cache Manage cache
call [options] <actionName> [jsonParams] [meta] Call an action
env List of environment variables
events [options] List of event listeners
metrics [options] List metrics
nodes [options] List of nodes
List nodes
Options
-a, --all list all (offline) nodes
Output
Detailed output
List services
-d, --details print endpoints
-f, --filter <match> filter services (e.g.: 'user*')
Detailed output
List actions
mol $ actions
-i, --skipinternal skip internal actions
-l, --local only local actions
List events
Options
-a, --all list all (offline) event listeners
Output
Show common information
List environment variables
mol $ env
Call an action
--help output usage information
--load [filename] Load params from file
Params will be { a: 5, b: 'Bob', c: true, d: false, e: { f: 'hello' } }
Call an action with params, meta & options
Params will be { a: 5, b: 'Bob', c: true, d: false, e: { f: 'hello' } }
Call with parameters from file
Call with file stream
mol $ call "math.add" --stream my-picture.jpg
It saved the response to the my-response.json file.
Direct call
Get health info from node-12 node
Emit an event
mol $ emit "user.created" --a 5 --b Bob --c --no-d --e.f "hello"
Params will be { a: 5, b: 'Bob', c: true, d: false, e: { f: 'hello' } }
Benchmark services
# Call service until 5 seconds (default) mol $ bench math.add
# Call service 5000 times
--time <seconds> Time of bench
--nodeID <nodeID> NodeID (direct call)
mol $ bench math.add '{ "a": 50, "b": 32 }'
Load a service from file
mol $ load "./math.service.js"
Load all services from a folder
List metrics
Output
Cache Keys
Cache Clear
You clear the cache with:
mol $ cache clear
Event listener
mol $ listener add user.created
Subscribe with group option
mol $ listener list
Custom commands
Custom REPL commands can be defined in broker options to extend Moleculer REPL commands.
description: "Call the greeter.hello service with name", alias: "hi",
options: [
boolean: ["u", "uppercase"]
},
}
}
Command Line Tool
moleculer-cli
Install
$ npm i -g moleculer-cli
Commands Init
The init command is used to scaffold a new Moleculer project.
Answers from file
Disable installing dependencies
You can disable the automatic NPM dependency installation with --no-install argument. It can be useful to generate project programmatically.
$ moleculer init project my-project --answers ./answers.json --no-install
nano - Minimal project template for one microservice. Use it if you want to create a microservice which connect to others via transporter
sample service (greeter)
Custom templates
$ moleculer init username/repo my-project
Local Templates
Template aliases
To simplify usage of custom templates (local and remote), it is possible to register an alias and use that afterwards instead of the whole repository url.
$ moleculer alias-template myAlias somegithubuser/reponame
Creating Custom Templates
meta.js
The meta.js file exports a function that returns an object defining the Moleculer CLI init interface. The function takes a parameter values that gives access to external values passed in from the CLI. The object has several keys which are explained below.
The filters object takes a set of keys matching a path and a value matching the name of a question variable. If the question variable’s value is false, the specified path will be ignored during the transformation and those files will not be added to the project being intialized.
The completeMessage property takes a multiline string that will be displayed after the initialization is completed.
Start
$ moleculer start
Options
--version Show version number [boolean]
--id NodeID [string] [default: null]
--hot, -h Enable hot-reload [boolean] [default: false]
Connect
$ moleculer connect
# Connect to NATS
$ moleculer connect mqtt://localhost
# Connect to AMQP
Call
--version Show version number [boolean]
--help Show help [boolean]
--hot, -h Enable hot-reload [boolean] [default: false]
--serializer Serializer [string] [default: null]
Options
--version Show version number [boolean]
--help Show help [boolean]
--level Logging level [string] [default: "silent"]
--id NodeID [string] [default: null]
Example with params
Example with params & meta
Example with post processing the result with jq
The transporter can be defined via TRANSPORTER environment variable, as well.
Example with transporter env var
TRANSPORTER=nats://localhost:42222 moleculer call math.add --@a 5 --@b 3
Emit
Options
--transporter, -t Transporter connection string (NATS, nats://127.0.0.1:4222,
...etc) [string] [default: null]
Example with params
Example with params & meta
Example with broadcast & groups
moleculer emit math.add --transporter NATS --broadcast --@id 3 --@name John --group accounts
Example with multi groups
moleculer emit math.add --transporter NATS --broadcast --@id 3 --@name John --group accounts --group mail
Example with transporter env var
Database Adapters
Database per service
Features
default CRUD actions cached actions pagination support
pluggable adapter (NeDB is the default memory adapter for testing & prototyping) official adapters for MongoDB, PostgreSQL, SQLite, MySQL, MSSQL.
Base Adapter
Only use this adapter for prototyping and testing. When you are ready to go into production simply swap to Mongo, Mongoose or Sequelize adapters as they all implement common Settings, Actions and Methods.
Install
$ npm install moleculer-db --save
Usage
name: "users",
// Mixin DB service into (current) 'users' service mixins: [DbService],
},
afterConnected() {
// Create a new user
.then(() => broker.call("users.create", { username: "john",
// List users with pagination
.then(() => broker.call("users.list", { page: 2, pageSize: 10 }).then(console.log));
// Delete a user
.then(() => broker.call("users.remove", { id: 2 }).then(console.log));
Settings
Property | Type | Default | Description |
---|---|---|---|
idField | String | required | Name of ID field. |
fields | Array.<String> | null | Field filtering list. It must be an Array. If the value is null or undefined doesn’t filter the fields of entities. |
populates | Array | null | Schema for population. Read more. |
pageSize | Number | required | Default page size in list action. |
maxPageSize | Number | required | Maximum page size in list action. |
maxLimit | Number | required | Maximum value of limit in find action. Default: -1 (no limit) |
entityValidator | Object, function | null | Validator schema or a function to validate the incoming entity in create & ‘insert’ actions. |
Actions
Find entities by query.
Parameters
Property Type Default Description
searchFields String required Fields for searching.
query Object required Query object. Passes to adapter.
Results
Parameters
query Object required Query object. Passes to adapter.
Results
Type: Number - Count of found entities.
Parameters
fields Array.<String> - Fields filter. page Number required Page number. pageSize Number required Size of a page. sort String required Sorted fields. search String required Search text.
searchFields String required Fields for searching.
Results
Parameters
Property Type Default Description
- - - -
Results
Parameters
Property Type Default Description entity Object - Entity to save. entities Array.<Object> - Entities to save.
Results
Type: Object, Array.<Object> - Saved entity(ies).
Property Type Default Description
populate Array.<String> - Field list for populate.
fields Array.<String> - Fields filter.
Results
After update, clear the cache & call lifecycle events.
Parameters
Property Type Default Description
Results
Remove an entity by ID.
Parameters
Property Type Default Description
Results
Methods
Get entity(ies) by ID(s).
Parameters
Property Type Default Description
Results
Clear cached entities
Parameters
Property Type Default Description
Results
Encode ID of entity.
Parameters
Property Type Default Description
Results Type: any
Parameters
id any required -
Results Type: any
_find
Parameters
limit Number required Max count of rows. offset Number required Count of skipped rows. sort String required Sorted fields.
search String required Search text.
Results
_count
Get count of entities by query.
Parameters
_list
List entities by filters and pagination results.
Parameters
pageSize Number required Size of a page.
sort String required Sorted fields. search String required Search text. searchFields String required Fields for searching.
Create a new entity.
Parameters
Property Type Default Description
Create many new entities.
Parameters
Property Type Default Description entity Object - Entity to save. entities Array.<Object> - Entities to save.
Results
Parameters
Property Type Default Description
id any, Array.<any> required ID(s) of entity. populate Array.<String> - Field list for populate. fields Array.<String> - Fields filter.
Results
Update an entity by ID.
After update, clear the cache & call lifecycle events.
Parameters
Remove an entity by ID.
Parameters
Data Manipulation
You can easily use Action hooks to modify (e.g. add timestamps, hash user’s passwords or remove sensitive info) before or after saving the data in DB.
Example of hooks adding a timestamp and removing sensitive data
Populating
The service allows you to easily populate fields from other services. For example: If you have an author field
const broker = new ServiceBroker();
broker.createService({
hooks: {
before: {
return ctx;
}
// Arrow function as a Hook
(ctx, res) => {
}
]
name: "John Doe",
mail: "john.doe@example.com",
// Call "create" action. Before hook will be triggered
.then(() => broker.call("db-with-hooks.create", user))
.catch(err => console.error(err));
Example of populate schema
broker.createService({
// Shorthand populate rule. Resolve the `voters` values with `users.get` action.
"voters": "users.get",
fields: "username fullName"
}
"reviewer": {
field: "reviewerId",
},
// Custom populator handler function
// https://github.com/moleculerjs/moleculer-db/blob/master/packages/moleculer-db/src/index.js#L636
return Promise.resolve(...);
// List posts with populated authors
broker.call("posts.find", { populate: ["author"]}).then(console.log);
Lifecycle entity events
},
entityCreated(json, ctx) {
this.logger.info(`Entity updated by '${ctx.meta.user.name}' user!`);
},
Extend with custom actions
Naturally you can extend this service with your custom actions.
const DbService = require("moleculer-db"); module.exports = {
actions: {
// Increment `votes` field by post ID vote(ctx) {
author: ctx.params.authorID
},
}
Mongo Adapter
This adapter is based on MongoDB.
Install
Dependencies
Usage
const MongoDBAdapter = require("moleculer-db-adapter-mongo"); const broker = new ServiceBroker();
// Create a Mongoose service for `post` entities broker.createService({
// Create a new post
.then(() => broker.call("posts.create", { title: "My first post",
Options
Example with connection URI
new MongoDBAdapter( "mongodb://localhost/moleculer-db" )
Example with connection URI & options
new MongoDBAdapter( "mongodb://db-server-hostname/my-db" , { keepAlive: 1
Mongoose Adapter
Install
Dependencies
To use this adapter you need to install MongoDB on you system.
Usage
"use strict";
name: "posts", mixins: [DbService],
adapter: new MongooseAdapter("mongodb://localhost/moleculer-demo"), model: mongoose.model("Post", mongoose.Schema({
broker.start()
// Create a new post
.then(() => broker.call("posts.find").then(console.log));
Options
Example with connection URI
new MongooseAdapter( "mongodb://localhost/moleculer-db" )
Example with URI and options
Connect to multiple DBs
Sequelize Adapter
Install
You have to install additional packages for your database server:
# For SQLite
$ npm install pg pg-hstore --save
# For MSSQL
Usage
const broker = new ServiceBroker();
// Create a Sequelize service for `post` entities broker.createService({
},
options: {
broker.start()
// Create a new post
.then(() => broker.call("posts.find").then(console.log));
Options
All constructor arguments are passed to the Sequelize constructor. Read more about Sequelize connection.