Node——微服务架构(一)

Stella981
• 阅读 718

new ServiceBroker

Node——微服务架构(一)

default settings

const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker();

custom settings

const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker({
    logLevel: "info"
});

communicate with remote nodes

const { ServiceBroker } = require("moleculer");
const broker = new ServiceBroker({
    nodeID: "node-1",
    transporter: "nats://localhost:4222",
    logLevel: "debug",
    requestTimeout: 5 * 1000,
    requestRetry: 3
});

broker options

  • logLevel
    • type:string
    • default:info
    • des:可选项目还有 trace、debug、 info、 warn、 error、 fatal
  • middlewares
    • type:Array
    • default:null
    • des:中间件
  • created
    • type:Function
    • default:null
    • des:broker 实例被创建的时候将会触发此函数
  • started
    • type:Function
    • default:null
    • des:broker 实例开始执行时触发此函数
  • stopped
    • type:Function
    • default:null
    • des:broker 实例停止执行时触发此函数
  • hotReload
    • type:Boolean
    • default:false
    • des:是否启动热加载
  • cacher
    • type:String、Object、Cacher
    • default:null
    • des:若是启动缓存,两个相同模型的 broker.call,只有第一个 call 会让 action 中对应 handler 完整的执行一遍,第二个 call 就不会了,它会直接从缓存中取数据,不常用
    • https://moleculer.services/docs/0.13/caching.html
  • transporter
  • serializer
  • nodeID
    • type:string
    • default:hostname + PID
    • des:这是节点的id,挂载在 某一个 namespace 中是不能够同名的
  • namespace
    • type:string
    • defalut:”“
    • des:分割一个网咯中的不同区域,基本上用不到,除非项目特别复杂,子服务特别多
  • requestTimeout
    • type:Number
    • default:0
    • des:请求超时设置,单位毫秒

createService

该服务表示Moleculer框架中的一个微服务。您可以定义操作并订阅事件。若要创建服务,必须定义架构。服务模式类似于VueJS的一个组件

// 定义了两个actions
broker.createService({
    name: "math",
    actions: {
        add(ctx) {
            return Number(ctx.params.a) + Number(ctx.params.b);
        },

        sub(ctx) {
            return Number(ctx.params.a) - Number(ctx.params.b);
        }
    }
});

name

  • 强制属性,最后去 call 某一个微服务的时候必须带上 name

version

settings

  • 此属性相当于仓库
    • 赋值可以是对象,对象中设置任意键值对,action 中通过 this.settings.xxxx 能够访问到设置项
    • 远程节点上可以获得这些设置项
    • 有一些内部设置是由核心模块使用的。这些设置名称以$(美元符号)开头
      • $noVersionPrefix
        • type:Boolean
        • default:false
        • des:禁用 action 版本前缀
      • $noServiceNamePrefix
        • type:Boolean
        • default:false
        • des:禁用 action 中的服务名称前缀。
      • $dependencyTimeout
        • type:Number
        • default:0
        • des:依赖等待超时
      • $shutdownTimeout
        • type:Number
        • default:0
        • des:关闭时等待活动请求的超时

mixins

Mixins是一种为Moleculer服务分发可重用功能的灵活方法。服务构造函数将这些混合与当前架构合并。它是在您的服务中扩展其他服务。当服务使用混音时,混音中的所有属性都将“混合”到当前服务中。

const ApiGwService = require("moleculer-web");

module.exports = {
    name: "api",
    mixins: [ApiGwService]
    settings: {
        // Change port setting
        port: 8080
    },
    actions: {
        myAction() {
            // Add a new action to apiGwService service
        }
    }
}

上面的示例创建了一个API服务,该服务继承了ApiGwService的所有内容,但是覆盖了端口设置,并使用新的myAction操作对其进行了扩展

actions

  • action 是服务中可调用的公共方法,broker.call 或 ctx.call,具体 action 必须在 action 中,可以是一个函数,可以是一个对象

events

  • 事件订阅

lifecycle events

  • 有一些生命周期服务事件,这些事件将由代理触发。它们被放置在模式的根中
    • created:broker.loadService 或者 broker.createService 触发
    • started:broker.start() 触发
    • stopped:broker.stop() 触发

methods

  • 创建私有方法,以供 action、event、lifecycle event 使用

dependencies

  • 如果您的服务依赖于其他服务,请使用架构中的依赖项属性。服务在调用已启动的生命周期事件处理程序之前等待依赖服务
  • 除了配做中添加 dependencies 属性,也可以用 broker 实例进行外部设置
    • broker.waitForServices(["posts", "users"]),返回以恶 promise 对象
    • broker.waitForServices("accounts", 10 * 1000, 500),设置超时事件和

metadata

  • 元数据属性,您可以在这里存储有关服务的任何元信息。在服务函数中可以访问到元数据
  • 元数据时可以被远程节点获取的

this

broker.createService

// 创建微服务实例方式之一
broker.createService({
    name: "math",
    actions: {
        add(ctx) {
            return Number(ctx.params.a) + Number(ctx.params.b);
        }
    }
});

load service from file

math.service.js

// Export the schema of service
module.exports = {
    name: "math",
    actions: {
        add(ctx) {
            return Number(ctx.params.a) + Number(ctx.params.b);
        },
        sub(ctx) {
            return Number(ctx.params.a) - Number(ctx.params.b);
        }
    }
}


// Create broker
const broker = new ServiceBroker();

// Load service
broker.loadService("./math.service");

// Start broker
broker.start();

推荐使用这样的方式,一目了然,不会在一个文件写过多的代码

Load multiple services from a folder

如果您有很多服务,建议将它们放到一个服务文件夹中,并使用 Serge.loadService s方法加载所有这些服务

broker.loadServices(folder = "./services", fileMask = "**/*.service.js");


// 从 ./services 文件夹(包括子文件夹)加载每个 *.service.js 文件
broker.loadServices();
// 从当前文件夹(包括子文件夹)加载每个 *.service.js 文件
broker.loadServices("./");
// 从“./svc”文件夹加载每个用户*.service.js文件
broker.loadServices("./svc", "user*.service.js");

hot reloading services

Moleculer具有内置的热重加载功能.在开发期间,注意只针对 service.js 文件的修改被启动热重启,其他位置可以使用 nodemon

const broker = new ServiceBroker({
    hotReload: true
});

broker.loadService("./services/test.service.js");

Internal services

// 列出所有已知节点(包括本地节点)
broker.call("$node.list").then(res => console.log(res))

// 列出所有注册的服务(本地和远程)
broker.call("$node.services").then(res => console.log(res))

// 列出所有已注册 action(本地和远程)。
broker.call("$node.actions").then(res => console.log(res))

// 列出所有订阅的事件
broker.call("$node.events").then(res => console.log(res))

// 列出本地节点的健康信息(包括进程和OS信息)
broker.call("$node.health").then(res => console.log(res));

action

action 是服务的可调用的公共方法。action 调用表示远程过程调用(RPC)。它有请求参数&返回响应,就像HTTP请求一样。如果您有多个服务实例,代理将在实例之间负载平衡请求

Node——微服务架构(一)

call services

若要调用服务,请使用 broke.Call 方法。代理查找具有给定 action service (可能在某一个节点上)并调用它。调用之后将会返回一个承诺

const res = await broker.call(actionName, params, opts)
  • params:参数是作为上下文的一部分传递给 action,action service 可以通过 ctx.params 访问传递参数,这是可选的

  • ops:一个对象,用于设置或者覆盖某些请求参数,例如:timeout、retry Count,这是可选的

    • tiemout:请求超时,以毫秒为单位。如果请求超时,而您没有定义应急响应,将会报错。若要禁用设置0,请执行以下操作。如果未定义,将会启用 new ServiceBroker 中的 requestTimeout 设置
    • retries :请求重试次数,如果请求超时,代理将再次尝试调用。若要禁用设置0。如果没有定义,将启用 new ServiceBroker 中的配置
    • fallbackResponse :若请求失败就返回,这是一个 Function
    • nodeID:目标节点,如果设置,它将直接调用给定的节点
    • meta:请求元数据,通过操作处理程序中的 ctx.meta 访问它,它也将在嵌套调用中被传输和合并
    • parentCtx:父亲的上下文实例
    • requestID:请求ID或相关ID。它出现在标准事件中

    broker.call("user.recommendation", { limit: 5 }, { timeout: 500, retries: 3, fallbackResponse: defaultRecommendation }).then(res => console.log("Result: ", res));

meta

  • 元信息发送到具有元属性的服务,通过 action 处理程序中的ctx.meta访问它。请注意,在嵌套调用时,元被合并。

Streaming

  • Moleculer支持Node.js流作为请求参数和响应。使用它来传输从网关上传的文件,或者编码/解码或压缩/解压缩流

    const stream = fs.createReadStream(fileName);

    broker.call("storage.save", stream, { meta: { filename: "avatar-123.jpg" }});

  • 请注意,参数应该是一个流,您不能向参数中添加更多的变量。使用元属性传输其他数据。

  • 服务中接受流

    module.exports = { name: "storage", actions: { save(ctx) { const s = fs.createWriteStream(/tmp/${ctx.meta.filename}); ctx.params.pipe(s); } } };

  • 将流作为服务中的响应返回

    module.exports = { name: "storage", actions: { get: { params: { filename: "string" }, handler(ctx) { return fs.createReadStream(/tmp/${ctx.params.filename}); } } } };

  • 调用方接受流

    const filename = "avatar-123.jpg"; broker.call("storage.get", { filename }) .then(stream => { const s = fs.createWriteStream(./${filename}); stream.pipe(s); s.on("close", () => broker.logger.info("File has been received")); })

  • AES编解码示例服务

    const crypto = require("crypto"); const password = "moleculer";

    module.exports = { name: "aes", actions: { encrypt(ctx) { const encrypt = crypto.createCipher("aes-256-ctr", password); return ctx.params.pipe(encrypt); }, decrypt(ctx) { const decrypt = crypto.createDecipher("aes-256-ctr", password); return ctx.params.pipe(decrypt); } } };

action visibility

  • visibility:该属性控制 action service 是否可见、可调用

    • published:公共的 action,它可以在本地调用,也可以远程调用,并且可以通过API网关发布
    • pulic:公共的 action ,可以在本地或者远程调用,但不能通过APIGW发布
    • protected:只能在本地 action service 调用(从本地服务调用)
    • private:只能在内部调用(通过 this.actions.xy() 内部服务)
    • 不设置,默认是 null,也就是 published,公共的

    module.exports = { name: "posts", actions: { // It's published by default find(ctx) {}, clean: { // Callable only via this.actions.clean visibility: "private", handler(ctx) {} } }, methods: { cleanEntities() { // Call the action directly return this.actions.clean(); } } }

action hooks

  • 定义 action 钩子来包装来自混合器的某些 action

  • 有 before、after、error 钩子,将其分配给指定的 action 或者所有 action service (*)

  • 钩子可以是函数,也可以是字符串。字符串必须是本地服务方法名。

    const DbService = require("moleculer-db"); // before hook module.exports = { name: "posts", mixins: [DbService] hooks: { before: { // Define a global hook for all actions // The hook will call the resolveLoggedUser method. "*": "resolveLoggedUser", // Define multiple hooks remove: [ function isAuthenticated(ctx) { if (!ctx.user) throw new Error("Forbidden"); }, function isOwner(ctx) { if (!this.checkOwner(ctx.params.id, ctx.user.id)) throw new Error("Only owner can remove it."); } ] } }, methods: { async resolveLoggedUser(ctx) { if (ctx.meta.user) ctx.user = await ctx.call("users.get", { id: ctx.meta.user.id }); } } }

    const DbService = require("moleculer-db"); // after hook // error hook module.exports = { name: "users", mixins: [DbService] hooks: { after: { // Define a global hook for all actions to remove sensitive data "": function(ctx, res) { // Remove password delete res.password; // Please note, must return result (either the original or a new) return res; }, get: [ // Add a new virtual field to the entity async function (ctx, res) { res.friends = await ctx.call("friends.count", { query: { follower: res._id }}); return res; }, // Populate the referrer field async function (ctx, res) { if (res.referrer) res.referrer = await ctx.call("users.get", { id: res._id }); return res; } ] }, error: { // Global error handler "": function(ctx, err) { this.logger.error(Error occurred when '${ctx.action.name}' action was called, err); // Throw further the error throw err; } } } };

  • 推荐的用例是创建混合元素,用方法填充服务,并在钩子中设置方法名

    module.exports = { methods: { checkIsAuthenticated(ctx) { if (!ctx.meta.user) throw new Error("Unauthenticated"); }, checkUserRole(ctx) { if (ctx.action.role && ctx.meta.user.role != ctx.action.role) throw new Error("Forbidden"); }, checkOwner(ctx) { // Check the owner of entity } } }

    // Use mixin methods in hooks const MyAuthMixin = require("./my.mixin");

    module.exports = { name: "posts", mixins: [MyAuthMixin] hooks: { before: { "*": ["checkIsAuthenticated"], create: ["checkUserRole"], update: ["checkUserRole", "checkOwner"], remove: ["checkUserRole", "checkOwner"] } }, actions: { find: { // No required role handler(ctx) {} }, create: { role: "admin", handler(ctx) {} }, update: { role: "user", handler(ctx) {} } } };

context

  • 当你去 call 一个 action service,broker 就会创建一个上下文 context 实例,这个实例包含着所有的请求信息,最后这些信息都会被当做 action service 中 handler 的一个参数 ctx 进行传递使用

  • 在 handler 中可以点出的上下文信息(属性或者方法)

    • ctx.id:context id
    • ctx.broker:broker 对象实例
    • ctx.action:action 定义实例
    • ctx.nodeID:caller 或者 目标节点 id
    • ctx.requestID:请求ID,如果在 nested-calls 中使用,它将是相同的ID。
    • ctx.parentID:父亲上下文实例 id(在 nested-calls 中使用)
    • ctx.params:请求参数,也就是 broker.call 中第二个参数具体设置
    • ctx.meta:请求元数据,它将会传递到 nested-calls 中
    • ctx.level:请求等级(在 nested-calls 内部使用),第一层等级为1
    • ctx.call():在 nested-calls 中触发 action service,参数形式与 broker.call 一样
    • ctx.emit():emit an event,same as broker.emit
    • ctx.broadcast():Broadcast an event, same as broker.broadcast
  • 优雅地关闭服务,请在代理选项中启用上下文跟踪功能。如果启用它,所有服务都将在关闭之前等待所有正在运行的上下文

    • 一个超时值可以通过关闭Timeout Broker选项来定义。默认值为5秒
    • 在 action services 中,关闭超时设置可以通过 $Shupdown Timeout 属性重写

    const broker = new ServiceBroker({ nodeID: "node-1", tracking: { enabled: true, shutdownTimeout: 10 * 1000 } });

    broker.call("posts.find", {}, { tracking: false }) // 关闭追踪

event

  • Broker 有一个内置的事件总线来支持事件驱动体系结构,并将事件发送到本地和远程服务
  • 事件侦听器被排列成逻辑组,这意味着每个组中只触发一个侦听器
  • 例如你有两个主要服务 users、payments,这两个服务都订阅了 user.created 事件。此时,从 users 服务上注册 3 个具体实例,同时从 paymengs 服务上注册 2 个具体实例,当 emit 触发 user.created 事件,只有一个 user 和一个 payments 服务会被触发,效果如下

Node——微服务架构(一)

  • 组名来自服务名称,但可以在服务中的事件定义中覆盖它。

    module.exports = { name: "payment", events: { "order.created": { // Register handler to the "other" group instead of "payment" group. group: "other", handler(payload) { // ... } } } }

Emit balanced events

  • broker.emit 函数发送平衡的事件,第一个参数是事件的名称,第二个参数是传递的载荷,如果是复杂数据,可以传递一个对象

    // The user will be serialized to transportation. broker.emit("user.created", user);

  • 指定哪些组/服务接收事件

    // Only the mail & payments services receives it broker.emit("user.created", user, ["mail", "payments"]);

Broadcast event

  • 广播事件被发送到所有可用的本地和远程服务,它是不平衡的,所有服务实例都会收到它

Node——微服务架构(一)

  • 利用 broker.broadcast 发送广播

    broker.broadcast("config.changed", config);

  • 指定哪些组/服务接收事件

    // Send to all "mail" service instances broker.broadcast("user.created", { user }, "mail");

    // Send to all "user" & "purchase" service instances. broker.broadcast("user.created", { user }, ["user", "purchase"]);

Local broadcast event

  • Send broadcast events to only all local services with broker.broadcastLocal method

    broker.broadcastLocal("config.changed", config);

Subscribe to events

  • 通过 service 中的属性 event 可以订阅具体事件,在事件名称中可以使用通配符

    module.exports = { events: { // Subscribe to user.created event "user.created"(user) { console.log("User created:", user); }, // Subscribe to all user events "user.*"(user) { console.log("User event:", user); } // Subscribe to all internal events "$**"(payload, sender, event) { console.log(Event '${event}' received from ${sender} node:, payload); } } }

Internal events

  • broker broadcasts 广播内部事件,这些事件总是以$前缀开头
  • $services.changed
    • 如果本地节点或远程节点加载或破坏服务,代理将发送此事件
  • $circuit-breaker.opened
    • The broker sends this event when the circuit breaker module change its state to open
  • $circuit-breaker.half-opened
    • The broker sends this event when the circuit breaker module change its state to half-open.
  • $circuit-breaker.closed
    • The broker sends this event when the circuit breaker module change its state to closed.
  • $node.connected
    • The broker sends this event when a node connected or reconnected.
  • $node.updated
    • The broker sends this event when it has received an INFO message from a node, (i.e. a service is loaded or destroyed).
  • $node.disconnected
    • The broker sends this event when a node disconnected (gracefully or unexpectedly).
  • $broker.started
    • The broker sends this event once broker.start() is called and all local services are started.
  • $broker.stopped
    • The broker sends this event once broker.stop() is called and all local services are stopped.
  • $transporter.connected
    • The transporter sends this event once the transporter is connected.
  • $transporter.disconnected
    • The transporter sends this event once the transporter is disconnected.

lifecycle

Broker lifecycle

  • starting logic
    • broker 启动传输连接,但是不会将本地服务列表发送到远程节点
    • 完成后,broker 将启动所有服务(call service started handler)
    • 一旦所有服务启动成功,broker 就会将本地服务列表发布到远程节点上
    • 因此,远程节点只有在所有本地服务正确启动之后才能发送请求

Node——微服务架构(一)

  • avoid deadlocks
    • broker start...
    • user service has dependencies: ["post"]
    • posts service has dependencies: ["users"]
    • 这就死锁了,按照顺序加载,user 永元无法加载到依赖项 post
  • stopping logic
    • call broker.stop 或者停止进程
    • 首先,broker 会向远程节点发送一个空的服务列表,所以他们可以将请求路由到其他实例而不是停止服务
    • 之后,broker 开始停止所有本地服务,之后 transporter 断开连接

Node——微服务架构(一)

Service lifecycle

  • created event handler

    • broker.createService or broker.loadService 会触发此事件
    • 函数内部拿到 broker 实例(this),还可以创建其他模块实例,例如 http 服务器、数据库模块

    const http = require("http");

    module.exports = { name: "www", created() { // Create HTTP server this.server = http.createServer(this.httpHandler); } }; // created function is sync event handler,can not use async/await

  • started event handler

    • 它被触发的时候,代理会启动所有的本地服务,而 broker 会启动所有的本地服务。使用它连接到数据库,侦听服务器…等

    module.exports = { name: "users", async started() { try { await this.db.connect(); } catch(e) { throw new MoleculerServerError("Unable to connect to database.", e.message); } } }; // started function is async handler. you can use async/await

  • stopped event handler

    • 它被触发的时候,broker.stop 被调用和 broker 开始停止所有的本地服务。使用它关闭数据库连接,关闭套接字…等

    module.exports = { name: "users", async stopped() { try { await this.db.disconnect(); } catch(e) { this.logger.warn("Unable to stop database connection gracefully.", e); } } }; // stopped function is async handler. you can use async/await

logging

  • 在Moleculer框架中,所有核心模块都有一个自定义记录器实例。它们是从Broker记录器实例继承的,该实例可以在Broker选项中进行配置。

Built-in logger

  • Moleculer有一个内置控制台记录器。这是默认的记录器

    const { ServiceBroker } = require("moleculer"); const broker = new ServiceBroker({ nodeID: "node-100", // logger: true, logLevel: "info" });

    broker.createService({ name: "posts", actions: { get(ctx) { this.logger.info("Log message via Service logger"); } } });

    broker.start() .then(() => broker.call("posts.get")) .then(() => broker.logger.info("Log message via Broker logger")); [2018-06-26T11:38:06.728Z] INFO node-100/POSTS: Log message via Service logger [2018-06-26T11:38:06.728Z] INFO node-100/BROKER: Log message via Broker logger [2018-06-26T11:38:06.730Z] INFO node-100/BROKER: ServiceBroker is stopped. Good bye.

  • 可以使用Broker选项中的logLevel选项更改日志级别。只与内置控制台记录器一起使用

    const broker = new ServiceBroker({ logger: true, // the true is same as console logLevel: "warn" // only logs the 'warn' & 'error' entries to the console });

  • Available log levels: fatalerrorwarninfodebugtrace

  • 可以为每个Moleculer模块设置日志级别。允许通配符使用

    const broker = new ServiceBroker({ logLevel: { "MY.": false, // Disable log "TRANS": "warn", // Only 'warn ' and 'error' log entries "*.GREETER": "debug", // All log entries "": "info", // All other modules use this level } }); // 此设置是从上到下计算的,因此*级别必须是最后一项。

  • 有一些内置的日志格式化程序

    • default:[2018-06-26T13:36:05.761Z] INFO node-100/BROKER: Message
    • simple:INFO - Message
    • short:[13:36:30.968Z] INFO BROKER: Message
  • 可以为内置控制台记录器设置自定义日志格式化程序函数

    const broker = new ServiceBroker({ logFormatter(level, args, bindings) { return level.toUpperCase() + " " + bindings.nodeID + ": " + args.join(" "); } }); broker.logger.warn("Warn message"); broker.logger.error("Error message");

    WARN dev-pc: Warn message ERROR dev-pc: Error message

  • 自定义对象&数组打印格式化程序

    • 设置一个自定义格式化程序函数来打印对象和数组。默认函数将对象和数组打印到一行,以便便于使用外部日志工具进行处理。但是,当您正在开发时,将对象打印成人类可读的多行格式将是有用的。为此,在代理选项中覆盖logObjectPrint函数。

    const util = require("util");

    const broker = new ServiceBroker({
    logObjectPrinter: o => util.inspect(o, { depth: 4, breakLength: 100 }) }); broker.logger.warn(process.release);

    [2017-08-18T12:37:25.720Z] INFO dev-pc/BROKER: { name: 'node', lts: 'Carbon', sourceUrl: 'https://nodejs.org/download/release/v8.10.0/node-v8.10.0.tar.gz', headersUrl: 'https://nodejs.org/download/release/v8.10.0/node-v8.10.0-headers.tar.gz' }

External loggers

  • 外部记录器可以与Moleculer一起使用。在这种情况下,将创建者函数设置为LOGER。当一个新模块继承一个新的记录器实例时,ServiceBroker将调用它

    // pino const pino = require("pino")({ level: "info" }); const broker = new ServiceBroker({ logger: bindings => pino.child(bindings) });

    // bunyan const bunyan = require("bunyan"); const logger = bunyan.createLogger({ name: "moleculer", level: "info" }); const broker = new ServiceBroker({ logger: bindings => logger.child(bindings) });

middlewares

networking

要通信其他节点(ServiceBrokers),您需要配置一个传输程序。大多数传输者连接到中心消息代理服务器,该服务器负责节点之间的消息传输。这些消息代理主要支持发布/订阅消息传递模式

Node——微服务架构(一)

Transporters

如果要在多个节点上运行服务,传输程序是一个重要的模块。传送器与其他节点通信。它传输事件、调用请求和处理响应… 如果一个服务在不同节点上的多个实例上运行,则请求将在活动节点之间实现负载平衡

整个通信逻辑是在传输类之外的。这意味着在不改变代码行的情况下,在传送器之间切换是很容易的。

Moleculer框架中有几个内置的运输机。

NATS

NATS服务器是一个简单、高性能的开源消息传递系统,用于云本机应用程序、物联网消息传递和微服务体系结构。

let { ServiceBroker } = require("moleculer");

const broker = new ServiceBroker({
    nodeID: "server-1",
    transporter: "nats://nats.server:4222"
});

使用 nats 传输需要安装 nats 模块 npm install nats

// Connect to 'nats://localhost:4222'
const broker = new ServiceBroker({
    transporter: "NATS"
});

// Connect to a remote NATS server
const broker = new ServiceBroker({
    transporter: "nats://nats-server:4222"
});

// Connect with options
const broker = new ServiceBroker({
    transporter: {
        type: "NATS",
        options: {
            url: "nats://localhost:4222"
            user: "admin",
            pass: "1234"
        }
    }
});

// Connect with TLS
const broker = new ServiceBroker({
    transporter: {
        type: "NATS",
        options: {
            url: "nats://localhost:4222"
            // More info: https://github.com/nats-io/node-nats#tls
            tls: {
                key: fs.readFileSync('./client-key.pem'),
                cert: fs.readFileSync('./client-cert.pem'),
                ca: [ fs.readFileSync('./ca.pem') ]
            }
        }
    }
});

Serialization

传输程序需要一个序列化模块来序列化和反序列化传输的数据包。默认的串行化程序是JSONS序列化程序,但是有几个内置的串行化程序。

const { ServiceBroker } = require("moleculer");

const broker = new ServiceBroker({
    nodeID: "server-1",
    transporter: "NATS",
    serializer: "ProtoBuf"
});
  • JSON serializer:这是内置的默认序列化程序。它将数据包序列化为JSON字符串,并将接收到的数据反序列化为数据包。

    const broker = new ServiceBroker({ // serializer: "JSON" // don't need to set, because it is the default });

Load balancing

Built-in strategies

若要配置策略,请在注册表属性下设置策略代理选项。它可以是一个名称(在内置策略的情况下),也可以是一个策略类(在自定义策略的情况下)。

Random strategy

const broker = new ServiceBroker({
    registry: {
        strategy: "Random"
    }
});

RoundRobin strategy

const broker = new ServiceBroker({
    registry: {
        strategy: "RoundRobin"
    }
});

CPU usage-based strategy

const broker = new ServiceBroker({
    registry: {
        strategy: "CpuUsage"
    }
});

Fault tolerance

Circuit Breaker

  • Moleculer有一个内置的断路器解决方案.这是一个基于阈值的实现。它使用一个时间窗口来检查失败的请求率。一旦达到阈值,它就会触发断路器。

  • 电路断路器可以防止应用程序重复尝试执行可能失败的操作。允许它继续,而不等待故障被修复或浪费CPU周期,而它确定故障是长期的。断路器模式还允许应用程序检测故障是否已经解决。如果问题似乎已经解决,应用程序可以尝试调用操作。

  • 如果启用它,所有服务调用都将受到此内置断路器的保护。

  • 在代理选项中启用它

    const broker = new ServiceBroker({ circuitBreaker: { enabled: true, threshold: 0.5, minRequestCount: 20, windowTime: 60, // in seconds halfOpenTime: 5 * 1000, // in milliseconds check: err => err && err.code >= 500 } });

  • settings

    • enabled:是否启动此功能,默认是 false
    • threshold:阈值,默认0.5,意味着50%的跳闸失败
    • minRequestCount:最小请求数,默认20,在它下面,回调函数不会触发
    • windowTime:时间窗口的秒数,默认60秒
    • halfOpenTime:从打开状态切换到半打开状态的毫秒数,默认10000毫秒
    • check:检查失败请求的函数,默认 err && err.code >= 500
  • 如果断路器状态发生更改,ServiceBroker将发送内部事件

  • 这些全局选项也可以在操作定义中重写。

    // users.service.js module.export = { name: "users", actions: { create: { circuitBreaker: { // All CB options can be overwritten from broker options. threshold: 0.3, windowTime: 30 }, handler(ctx) {} } } };

Retry

  • 重试解决方案

    const broker = new ServiceBroker({ retryPolicy: { enabled: true, retries: 5, delay: 100, maxDelay: 2000, factor: 2, check: err => err && !!err.retryable } });

  • settings

    • enabled:是否启用,默认 false
    • retries:重试的次数,默认5次
    • delay:第一次延迟以毫秒为单位,默认 100
    • maxDelay:最大延迟(以毫秒为单位),默认 2000
    • factor:延迟退避系数,默认是 2,表示指数退避
    • check:检查失败请求的函数,err && !!err.retryable
  • 在调用选项中覆盖retry值

    broker.call("posts.find", {}, { retries: 3 });

  • 在操作定义中覆盖重试策略值

    // users.service.js module.export = { name: "users", actions: { find: { retryPolicy: { // All Retry policy options can be overwritten from broker options. retries: 3, delay: 500 }, handler(ctx) {} }, create: { retryPolicy: { // Disable retries for this action enabled: false }, handler(ctx) {} } } };

Timeout

  • 可以为服务调用设置超时。它可以在代理选项或调用选项中全局设置。如果定义了超时并且请求超时,代理将抛出RequestTimeoutError错误。

    const broker = new ServiceBroker({ requestTimeout: 5 * 1000 // in seconds });

  • 覆盖调用选项中的超时值

    broker.call("posts.find", {}, { timeout: 3000 });

  • 分布式超时:Moleculer使用分布式超时。在嵌套调用的情况下,超时值会随着时间的推移而递减。如果超时值小于或等于0,则跳过下一个嵌套调用(RequestSkippedError),因为第一个调用已被RequestTimeoutError错误拒绝。

Bulkhead

  • 在Moleculer框架中实现了舱壁特性,以控制动作的并发请求处理。

    const broker = new ServiceBroker({ bulkhead: { enabled: true, concurrency: 3, maxQueueSize: 10, } });

  • settings

    • enabled:是否启动,默认 false

    • concurreny:最大限度的并行数量,默认3

    • maxQueueSize:最大队列大小,默认10

    • concurreny 值限制并发请求执行

    • 如果 maxQueueSize大于0,则如果所有插槽都被占用,则 broker 将额外的请求存储在队列中

    • 如果队列大小达到maxQueueSize限制或为0,则 Broker 将对每个添加请求抛出QueueIsFull异常

  • 这些全局选项也可以在操作定义中重写

    // users.service.js // 在操作定义中覆盖重试策略值 module.export = { name: "users", actions: { find: { bulkhead: { enabled: false }, handler(ctx) {} }, create: { bulkhead: { // Increment the concurrency value // for this action concurrency: 10 }, handler(ctx) {} } } };

Fallback

  • 当您不想将错误返回给用户时,回退功能是非常有用的。相反,调用其他操作或返回一些常见的内容。可以在调用选项或操作定义中设置回退响应。

  • 它应该是一个返回包含任何内容的承诺的函数。borker 将当前 context&Error对象作为参数传递给此函数。

    // fallback settings in calling options const result = await broker.call("users.recommendation", { userID: 5 }, { timeout: 500, fallbackResponse(ctx, err) { // Return a common response from cache return broker.cacher.get("users.fallbackRecommendation:" + ctx.params.userID); } });

  • 回退响应也可以在接收端,在 action 中定义

  • 请注意,只有在action 处理程序中发生错误时,才会使用此回退响应。如果从远程节点调用请求,并且请求在远程节点上超时,则不使用回退响应。在这种情况下,在调用选项中使用回退响应。

    // fallback as a function module.exports = { name: "recommends", actions: { add: { fallback: (ctx, err) => "Some cached result", //fallback: "fakeResult", handler(ctx) { // Do something } } } };

    // fallback as method name module.exports = { name: "recommends", actions: { add: { // Call the 'getCachedResult' method when error occurred fallback: "getCachedResult", handler(ctx) { // Do something } } }, methods: { getCachedResult(ctx, err) { return "Some cached result"; } } };

NATS

帮助文档

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
2年前
java将前端的json数组字符串转换为列表
记录下在前端通过ajax提交了一个json数组的字符串,在后端如何转换为列表。前端数据转化与请求varcontracts{id:'1',name:'yanggb合同1'},{id:'2',name:'yanggb合同2'},{id:'3',name:'yang
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
2年前
Java获得今日零时零分零秒的时间(Date型)
publicDatezeroTime()throwsParseException{    DatetimenewDate();    SimpleDateFormatsimpnewSimpleDateFormat("yyyyMMdd00:00:00");    SimpleDateFormatsimp2newS
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这