[译]介绍 `core.async` 核心的一些概念

袭人
• 阅读 4833

源文档是 core.async 仓库的一个代码文件, 包含大量的教程性质的注释
https://github.com/clojure/core.async/blob/master/examples/walkthrough.clj
中间不确定的两句留了原文, 有读懂的同学请回复帮我纠正


这份攻略介绍 core.async 核心的一些概念

clojure.core.async namespace 包含了公开的 API.

(require '[clojure.core.async :as async :refer :all])

Channel

数据通过类似队列的 Channel 来传输, Channel 默认不进行 buffer(长度为 0)
需要生产者和消费者进行约定从而在 Channel 当中传送数据

chan 可以创建一个不进行 buffer 的 Channel:

(chan)

传一个数字以创建限定了 buffer 大小的 Channel:

(chan 10)

close! 用来关闭 Channel 终结接受消息传入, 已存在的数据依然可以取出
取尽的 Channel 在取值时返回 nil, nil 是不能直接通过 Channel 发送的!

(let [c (chan)]
  (close! c))

一般的 Thread

对在一般的 Thread 中, 使用 >!!(阻塞的 put) 和 <!!(阻塞的 take)
与 Channel 进行通信

(let [c (chan 10)]
  (>!! c "hello")
  (assert (= "hello" (<!! c)))
  (close! c))

由于是这些调用是阻塞的, 如果尝试把数据放进没有 buffer 的 Channel, 那么整个 Thread 都会被卡住.
所以需要 thread(好比 future) 在线程池当中执行代码主体, 并且通过 Channel 传回数据
例子中启动了一个后台任务把 "hello" 放进 Channel, 然后在主线程读取数据

(let [c (chan)]
  (thread (>!! c "hello"))
  (assert (= "hello" (<!! c)))
  (close! c))

go 代码块和反转控制(IoC) thread

go 是一个宏, 能把它的 body 在特殊的线程池里异步执行
不同的是本来会阻塞的 Channel 操作会暂停, 不会有线程被阻塞
这套机制封装了事件/回调系统当中需要外部代码的反转控制
go block 内部, 我们使用 >!(put) 和 <!(take)

这里把前面 Channel 的例子转化成 go block:

(let [c (chan)]
  (go (>! c "hello"))
  (assert (= "hello" (<!! (go (<! c)))))
  (close! c))

这里使用了 go block 来模拟生产者, 而不是直接用 Thread 和阻塞调用
消费者用 go block 进行获取, 返回 Channel 作为结果, 对这个 Channel 做阻塞的读取
(原文: The consumer uses a go block to take, then returns a result channel, from which we do a blocking take.)

选择性(alts)

Channel 对比队列一个啥手机应用是能够同时等待多个 Channel(像是 socket select)
通过 alts!!(一般 thread)或者 alts!(用于 go block)

可以通过 alts 创建后台线程讲两个任意的 Channel 结合到一起
alts!! 获取集合中某个操作的来执行
或者是可以 take 的 Channel, 或者是可以 put [channel value] 的 Channel
并返回包含具体的值(对于 put 返回 nil)以及获取成功的 Channel:
(原文: alts!! takes either a set of operations to perform either a channel to take from a [channel value] to put and returns the value (nil for put) and channel that succeeded:)

(let [c1 (chan)
      c2 (chan)]
  (thread (while true
            (let [[v ch] (alts!! [c1 c2])]
              (println "Read" v "from" ch))))
  (>!! c1 "hi")
  (>!! c2 "there"))

打印内容(在 stdout, 可能你的 REPL 当中看不到):
#<ManyToManyChannel ...> 读取 hi
#<ManyToManyChannel ...> 读取 there

使用 alts! 来做和 go block 一样的事情:

(let [c1 (chan)
      c2 (chan)]
  (go (while true
        (let [[v ch] (alts! [c1 c2])]
          (println "Read" v "from" ch))))
  (go (>! c1 "hi"))
  (go (>! c2 "there")))

因为 go block 是轻量级的进程而而不是限于 thread, 可以同时有大量的实例
这里创建 1000 个 go block 在 1000 个 Channel 里同时发送 hi
它们妥当时用 alts!! 来读取

(let [n 1000
      cs (repeatedly n chan)
      begin (System/currentTimeMillis)]
  (doseq [c cs] (go (>! c "hi")))
  (dotimes [i n]
    (let [[v c] (alts!! cs)]
      (assert (= "hi" v))))
  (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))

timeout 创建 Channel 并等待设定的毫秒时间, 然后关闭:

(let [t (timeout 100)
      begin (System/currentTimeMillis)]
  (<!! t)
  (println "Waited" (- (System/currentTimeMillis) begin)))

可以结合 timeoutalts 来做有时限的 Channel 等待
这里是花 100ms 等待数据到达 Channel, 没有的话放弃:

(let [c (chan)
      begin (System/currentTimeMillis)]
  (alts!! [c (timeout 100)])
  (println "Gave up after" (- (System/currentTimeMillis) begin)))

ALT

todo

其他 Buffer

Channel 可以定制不同的策略来处理 Buffer 填满的情况
这套 API 中提供了两个实用的例子.

使用 dropping-buffer 控制当 buffer 填满时丢弃最新鲜的值:

(chan (dropping-buffer 10))

使用 sliding-buffer 控制当 buffer 填满时丢弃最久远的值:

(chan (sliding-buffer 10))
点赞
收藏
评论区
推荐文章
blmius blmius
4年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Wesley13 Wesley13
4年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Jacquelyn38 Jacquelyn38
4年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Peter20 Peter20
4年前
mysql中like用法
like的通配符有两种%(百分号):代表零个、一个或者多个字符。\(下划线):代表一个数字或者字符。1\.name以"李"开头wherenamelike'李%'2\.name中包含"云",“云”可以在任何位置wherenamelike'%云%'3\.第二个和第三个字符是0的值wheresalarylike'\00%'4\
Wesley13 Wesley13
4年前
FLV文件格式
1.        FLV文件对齐方式FLV文件以大端对齐方式存放多字节整型。如存放数字无符号16位的数字300(0x012C),那么在FLV文件中存放的顺序是:|0x01|0x2C|。如果是无符号32位数字300(0x0000012C),那么在FLV文件中的存放顺序是:|0x00|0x00|0x00|0x01|0x2C。2.  
Stella981 Stella981
4年前
Python之time模块的时间戳、时间字符串格式化与转换
Python处理时间和时间戳的内置模块就有time,和datetime两个,本文先说time模块。关于时间戳的几个概念时间戳,根据1970年1月1日00:00:00开始按秒计算的偏移量。时间元组(struct_time),包含9个元素。 time.struct_time(tm_y
Wesley13 Wesley13
4年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
4年前
PHP创建多级树型结构
<!lang:php<?php$areaarray(array('id'1,'pid'0,'name''中国'),array('id'5,'pid'0,'name''美国'),array('id'2,'pid'1,'name''吉林'),array('id'4,'pid'2,'n
Python进阶者 Python进阶者
2年前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这