9.3 CSP并发机制
peter 166 3

CSP控制机制

Communicating sequential processes

依赖通道来实现进程间的通信

Actor Model

image-20210125201231737

CSP vs. Actor

  • 和 Actor 的直接通讯不同,CSP 模式则是通过 Channel 进行通讯的,更松耦合一些。
  • Go 中 channel 是有容量限制并且独立于处理 Groutine,而如Erlang,Actor 模式中的 mailbox 容量是无限的,接收进程也总是被动地处理消息
image-20210125201518384

Note: 这个不就是操作系统里的命名管道嘛

Channel 机制

image-20210125201821800

左边是普通的 channel,右边的是 buffer channel。

异步返回

首先我们看一下 Java 里的异步返回机制

image-20210125203703024

package csp

import (
    "fmt"
    "testing"
    "time"
)

func service() string {
    time.Sleep(time.Millisecond * 50)
    return "Done"
}

func otherTask() {
    fmt.Println("working on something else.")
    time.Sleep(time.Millisecond * 100)
    fmt.Println("Task is done.")
}

// 使用这种方法是完全串行的,最终耗时 0.16s
func TestService(t *testing.T)  {
    fmt.Println(service())
    otherTask()
}

// 对上面的 service 进行了包装
func AsyncService() chan string {
    // 声明一个 channel,指定 channel 的类型,此处指定 string,那么该 channel 只能放 string 类型的数据
    retCh := make(chan string)
    // 被调用时,启动另外一个协程去运行,而不阻塞当前的协程
    go func() {
        ret := service()
        fmt.Println("return result.")
        // service() 运行完毕之后,可以把结果放在 channel 里
        // retCh <- ret 将 ret 数据放到 channel 里
        retCh <- ret
        fmt.Println("service exited.")
    }()
    // 最终返回 channel,这样在外部调用时会阻塞在这里,直到 channel 返回结果
    return retCh
}

func TestAsyncService(t *testing.T)  {
    retCh := AsyncService()
    otherTask()
    // <-retCh 从 channel 里取数据
    fmt.Println(<-retCh)
}

最后一个函数,输出结果为:

image-20210126110243221

上述代码存在一个问题:当 service() 已经计算完成了,并将结果返回给 channel,应当可以退出做下一个请求的处理,而不是必须等客户端一定要把 channel 里的数据取出,才可以做下一个处理。否则这个协程会被阻塞。解决的办法是

// 将 channel 变成容量为 1 的 buffer channel
retCh := make(chan string, 1)

函数运行结果为:

image-20210126113517997
预览图
评论区

索引目录