PlayScala 开发技巧

Stella981
• 阅读 433

1 如何实时同步MongoDB?

MongoDB 从 3.6 开始为开发者提供了 Change Streams 功能,利用 Change Streams 功能可以非常方便地监听指定 Collection 上的数据变化。例如在 mongo shell 中,我们可以通过如下方式监听 shopping 数据库 order 表上的变化:

watchCursor = db.getSiblingDB("shopping").order.watch() while (!watchCursor.isExhausted()){ if (watchCursor.hasNext()){ printjson(watchCursor.next()); } }

2 在Play中如何操作?

利用 Play Mongo 可以方便地实现数据监听功能,并且我们可以将 Change Stream 转换成 Akka Stream,然后以流的方式处理指定 Collection 上的数据变化,

mongo .collection[Order] .watch() .fullDocument .toSource .groupedWithin(10, 1000.millis) .throttle(elements = 1, per = 1.second, maximumBurst = 1, ThrottleMode.shaping) .runForeach{ seq => // ... }

上面的代码实现了以下几个功能:

  • 将从 Change Stream 接收到的元素进行缓冲,以方便批处理,当满足下面任意一个条件时便结束缓冲向后传递:
    • 缓冲满10个元素
    • 缓冲时间超过了1000毫秒
  • 对缓冲后的元素进行流控,每秒只允许通过1个元素

3 如何实现高可用?

上面的代码并没有考虑可用性,如果在监听过程中发生了网络错误,如何从错误中恢复呢? 上面的实现代码底层是基于官方的 mongo-java-driver 实现的,关于可用性官方文档有如下描述:

Change streams provide a way to watch changes to documents in a collection. To improve the usability of this new stage, the MongoCollection API includes a new watch method. The ChangeStreamIterable sets up the change stream and automatically attempts to resume if it encounters a potentially recoverable error.

文档中提及程序可以自动从可恢复的错误中恢复。经测试验证,如果网络中断在 30 秒以内均属于可恢复错误;但是如果大于 30 秒,则会报连接超时错误并且无法从错误中自动恢复:

com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches ReadPreferenceServerSelector{readPreference=primary}. Client view of cluster state is {type=UNKNOWN, servers=[{address=127.0.0.1:27117, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused}}]     at com.mongodb.internal.connection.BaseCluster.createTimeoutException(BaseCluster.java:401)     at com.mongodb.internal.connection.BaseCluster.handleServerSelectionRequest(BaseCluster.java:309)     at com.mongodb.internal.connection.BaseCluster.access$800(BaseCluster.java:65)     at com.mongodb.internal.connection.BaseCluster$WaitQueueHandler.run(BaseCluster.java:482)     at java.lang.Thread.run(Thread.java:748)

幸运的是,Akka Stream 的 RestartSource 可以帮我们解决这种不可恢复错误,解决方式就是通过指数规避(exponential back-off)方式不断重试。下面是一个通用的创建 RestartSource 的方法实现:

def restartSource(colName: String): Source[ChangeStreamDocument[JsObject], _] = { RestartSource.withBackoff( minBackoff = 3.seconds, maxBackoff = 10.seconds, randomFactor = 0.2, maxRestarts = 1000000 ) { () ⇒ Logger.warn(s"Creating source for watching ${colName}.") mongo.collection(colName).watch().fullDocument.toSource } }

通过 Backoff 参数可以指定重试策略:

  • minBackoff 最小重试时间间隔
  • maxBackoff 最大重试时间间隔
  • randomFactor 设置一个随机的浮动因子,使得每次计算的间隔有些许差异
  • maxRestarts 最大重试次数

当发生错误时,RestartSource 会尝试重新创建一个 Source:

Logger.warn(s"Creating source for watching ${colName}.") mongo.collection(colName).watch().fullDocument.toSource

完整代码如下:

val colName = "common-user" restartSource(colName) .groupedWithin(10, 1000.millis) .throttle(elements = 1, per = 1.second, maximumBurst = 1, ThrottleMode.shaping) .runForeach{ seq => try { Logger.info(seq.toString())
} catch { case t: Throwable => Logger.error(s"Watch change stream of ${colName} error: ${t.getMessage}", t) } }

需要注意的是 runForeach 中需要显式捕获异常并处理,否则会导致 Source 结束并退出。

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
2年前
java将前端的json数组字符串转换为列表
记录下在前端通过ajax提交了一个json数组的字符串,在后端如何转换为列表。前端数据转化与请求varcontracts{id:'1',name:'yanggb合同1'},{id:'2',name:'yanggb合同2'},{id:'3',name:'yang
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Wesley13 Wesley13
2年前
PPDB:今晚老齐直播
【今晚老齐直播】今晚(本周三晚)20:0021:00小白开始“用”飞桨(https://www.oschina.net/action/visit/ad?id1185)由PPDE(飞桨(https://www.oschina.net/action/visit/ad?id1185)开发者专家计划)成员老齐,为深度学习小白指点迷津。
Stella981 Stella981
2年前
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解2016年09月02日00:00:36 \牧野(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fme.csdn.net%2Fdcrmg) 阅读数:59593
Stella981 Stella981
2年前
MongoDB Sharding 分片集群配置理论
分片集群  MongoDB的分片集群由以下部分组成:_shard_(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fdocs.mongodb.com%2Fmanual%2Fcore%2Fshardedclustershards%2F): 用来存储数据,为这个分片
Stella981 Stella981
2年前
Google地球出现“无法连接到登录服务器(错误代码:c00a0194)”解决方法
Google地球出现“无法连接到登录服务器(错误代码:c00a0194)”解决方法参考文章:(1)Google地球出现“无法连接到登录服务器(错误代码:c00a0194)”解决方法(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fwww.codeprj.com%2Fblo
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这