基于内存通信的gRPC调用

算法算
• 阅读 2323

Apache Dubbo 有injvm方式的通信,能够避免网络带来的延迟,同时也不占用本地端口,对测试、本地验证而言,是一种比较方便的RPC通信方式。

最近看到 containerd 的代码,发现它也有类似的需求。
但使用ip端口通信,有可能会有端口冲突;使用unix socket,可能会有路径冲突。
考察了下gRPC有没有和injvm类似的,基于内存的通信方式。后来发现pipe非常好用,所以记录了下。

Golang/gRPC对网络的抽象

首先,我们先看一下gRPC一次调用的架构图。当然,这个架构图目前只关注了网络抽象分布。

基于内存通信的gRPC调用

我们重点关注网络部分。

操作系统系统抽象

首先,在网络包之上,系统抽象出来了socket,代表一条虚拟连接,对于UDP,这个虚拟连接是不可靠的,对于TCP,这个链接是尽力可靠的。

对于网络编程而言,仅仅有连接是不够的,还需要告诉开发者如何创建、关闭连接。
对于服务端,系统提供了accept方法,用来接收连接。
对于客户端,系统提供了connect方法,用于和服务端建立连接。

Golang抽象

在Golang中,socket对等的概念叫net.Conn,代表了一条虚拟连接。

接下来,对于服务端,accept这个行为被包装成了net.Listener接口;对于客户端,Golang则基于connect提供了net.Dial方法

type Listener interface {
  // 接收来自客户端的网络连接
  Accept() (Conn, error)
  Close() error
  Addr() Addr
}

gRPC使用

那么gRPC是怎么使用Listener和Dial的呢?

对于gRPC服务端,Serve方法接收一个Listener,表示在这个Listener上提供服务。

对于gRPC客户端,网络本质上就是一个能够连接到某个地方的东西就可以,所以只需要一个dialer func(context.Context, string) (net.Conn, error)函数就行了。

什么是pipe

在操作系统层面,pipe表示一个数据管道,而这个管道两端都在本程序中,可以很好的满足我们的要求:基于内存的网络通信。

Golang也基于pipe提供了net.Pipe()函数创建了一个双向的、基于内存通信的管道,在能力上,能够很好的满足gRPC对底层通信的要求。

但是net.Pipe仅仅产生了两个net.Conn,即只产生两个网络连接,没有之前提到的Listner,也没有Dial方法。

于是结合Golang的channel,把net.Pipe包装成了Listner,也提供了Dial方法:

  1. Listener.Accept(),只需要监听一个channel,客户端连接过来的时候,把连接通过channel传递过来即可
  2. Dial方法,调用Pipe,将一端通过channel给服务端(作为服务端连接),另一端作为客户端连接

代码如下:

package main

import (
  "context"
  "errors"
  "net"
  "sync"
  "sync/atomic"
)

var ErrPipeListenerClosed = errors.New(`pipe listener already closed`)

type PipeListener struct {
  ch    chan net.Conn
  close chan struct{}
  done  uint32
  m     sync.Mutex
}

func ListenPipe() *PipeListener {
  return &PipeListener{
    ch:    make(chan net.Conn),
    close: make(chan struct{}),
  }
}

// Accept 等待客户端连接
func (l *PipeListener) Accept() (c net.Conn, e error) {
  select {
  case c = <-l.ch:
  case <-l.close:
    e = ErrPipeListenerClosed
  }
  return
}

// Close 关闭 listener.
func (l *PipeListener) Close() (e error) {
  if atomic.LoadUint32(&l.done) == 0 {
    l.m.Lock()
    defer l.m.Unlock()
    if l.done == 0 {
      defer atomic.StoreUint32(&l.done, 1)
      close(l.close)
      return
    }
  }
  e = ErrPipeListenerClosed
  return
}

// Addr 返回 listener 的地址
func (l *PipeListener) Addr() net.Addr {
  return pipeAddr(0)
}
func (l *PipeListener) Dial(network, addr string) (net.Conn, error) {
  return l.DialContext(context.Background(), network, addr)
}
func (l *PipeListener) DialContext(ctx context.Context, network, addr string) (conn net.Conn, e error) {
  // PipeListener是否已经关闭
  if atomic.LoadUint32(&l.done) != 0 {
    e = ErrPipeListenerClosed
    return
  }

  // 创建pipe
  c0, c1 := net.Pipe()
  // 等待连接传递到服务端接收
  select {
  case <-ctx.Done():
    e = ctx.Err()
  case l.ch <- c0:
    conn = c1
  case <-l.close:
    c0.Close()
    c1.Close()
    e = ErrPipeListenerClosed
  }
  return
}

type pipeAddr int

func (pipeAddr) Network() string {
  return `pipe`
}
func (pipeAddr) String() string {
  return `pipe`
}

如何用pipe作为gRPC的connection

有了上面的包装,我们就可以基于此创建一个gRPC的服务器端和客户端,来进行基于内存的RPC通信了。

首先,我们简单的创建一个服务,包含了四种调用方式:

syntax = "proto3";

option go_package = "google.golang.org/grpc/examples/helloworld/helloworld";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";

package helloworld;

// The greeting service definition.
service Greeter {
  // unary调用
  rpc SayHello(HelloRequest) returns (HelloReply) {}

  // 服务端流式调用
  rpc SayHelloReplyStream(HelloRequest) returns (stream HelloReply);

  // 客户端流式调用
  rpc SayHelloRequestStream(stream HelloRequest) returns (HelloReply);

  // 双向流式调用
  rpc SayHelloBiStream(stream HelloRequest) returns (stream HelloReply);
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

然后生成相关的stub代码:

protoc --go_out=. --go_opt=paths=source_relative \
  --go-grpc_out=. --go-grpc_opt=paths=source_relative \
  helloworld/helloworld.proto

然后开始写服务端代码,基本逻辑就是实现一个demo版本的服务端就好:

package main

import (
  "context"
  "log"

  "github.com/robberphex/grpc-in-memory/helloworld"
  pb "github.com/robberphex/grpc-in-memory/helloworld"
)

// helloworld.GreeterServer 的实现
type server struct {
  // 为了后面代码兼容,必须聚合UnimplementedGreeterServer
  // 这样以后在proto文件中新增加一个方法的时候,这段代码至少不会报错
  pb.UnimplementedGreeterServer
}

// unary调用的服务端代码
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
  log.Printf("Received: %v", in.GetName())
  return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}

// 客户端流式调用的服务端代码
// 接收两个req,然后返回一个resp
func (s *server) SayHelloRequestStream(streamServer pb.Greeter_SayHelloRequestStreamServer) error {
  req, err := streamServer.Recv()
  if err != nil {
    log.Printf("error receiving: %v", err)
    return err
  }
  log.Printf("Received: %v", req.GetName())
  req, err = streamServer.Recv()
  if err != nil {
    log.Printf("error receiving: %v", err)
    return err
  }
  log.Printf("Received: %v", req.GetName())
  streamServer.SendAndClose(&pb.HelloReply{Message: "Hello " + req.GetName()})
  return nil
}

// 服务端流式调用的服务端代码
// 接收一个req,然后发送两个resp
func (s *server) SayHelloReplyStream(req *pb.HelloRequest, streamServer pb.Greeter_SayHelloReplyStreamServer) error {
  log.Printf("Received: %v", req.GetName())
  err := streamServer.Send(&pb.HelloReply{Message: "Hello " + req.GetName()})
  if err != nil {
    log.Printf("error Send: %+v", err)
    return err
  }
  err = streamServer.Send(&pb.HelloReply{Message: "Hello " + req.GetName() + "_dup"})
  if err != nil {
    log.Printf("error Send: %+v", err)
    return err
  }
  return nil
}

// 双向流式调用的服务端代码
func (s *server) SayHelloBiStream(streamServer helloworld.Greeter_SayHelloBiStreamServer) error {
  req, err := streamServer.Recv()
  if err != nil {
    log.Printf("error receiving: %+v", err)
    // 及时将错误返回给客户端,下同
    return err
  }
  log.Printf("Received: %v", req.GetName())
  err = streamServer.Send(&pb.HelloReply{Message: "Hello " + req.GetName()})
  if err != nil {
    log.Printf("error Send: %+v", err)
    return err
  }
  // 离开这个函数后,streamServer会关闭,所以不推荐在单独的goroute发送消息
  return nil
}

// 新建一个服务端实现
func NewServerImpl() *server {
  return &server{}
}

然后我们创建一个基于pipe连接的客户端来调用服务端。

包含如下几个步骤:

  1. 创建服务端实现
  2. 基于pipe创建listener,然后基于它创建gRPC server
  3. 基于pipe创建客户端连接,然后创建gRPC client,调用服务

代码如下:

package main

import (
  "context"
  "fmt"
  "log"
  "net"

  pb "github.com/robberphex/grpc-in-memory/helloworld"
  "google.golang.org/grpc"
)

// 将一个服务实现转化为一个客户端
func serverToClient(svc *server) pb.GreeterClient {
  // 创建一个基于pipe的Listener
  pipe := ListenPipe()

  s := grpc.NewServer()
  // 注册Greeter服务到gRPC
  pb.RegisterGreeterServer(s, svc)
  if err := s.Serve(pipe); err != nil {
    log.Fatalf("failed to serve: %v", err)
  }
  // 客户端指定使用pipe作为网络连接
  clientConn, err := grpc.Dial(`pipe`,
    grpc.WithInsecure(),
    grpc.WithContextDialer(func(c context.Context, s string) (net.Conn, error) {
      return pipe.DialContext(c, `pipe`, s)
    }),
  )
  if err != nil {
    log.Fatalf("did not connect: %v", err)
  }
  // 基于pipe连接,创建gRPC客户端
  c := pb.NewGreeterClient(clientConn)
  return c
}

func main() {
  svc := NewServerImpl()
  c := serverToClient(svc)

  ctx := context.Background()

  // unary调用
  for i := 0; i < 5; i++ {
    r, err := c.SayHello(ctx, &pb.HelloRequest{Name: fmt.Sprintf("world_unary_%d", i)})
    if err != nil {
      log.Fatalf("could not greet: %v", err)
    }
    log.Printf("Greeting: %s", r.GetMessage())
  }

  // 客户端流式调用
  for i := 0; i < 5; i++ {
    streamClient, err := c.SayHelloRequestStream(ctx)
    if err != nil {
      log.Fatalf("could not SayHelloRequestStream: %v", err)
    }
    err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("SayHelloRequestStream_%d", i)})
    if err != nil {
      log.Fatalf("could not Send: %v", err)
    }
    err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("SayHelloRequestStream_%d_dup", i)})
    if err != nil {
      log.Fatalf("could not Send: %v", err)
    }
    reply, err := streamClient.CloseAndRecv()
    if err != nil {
      log.Fatalf("could not Recv: %v", err)
    }
    log.Println(reply.GetMessage())
  }

  // 服务端流式调用
  for i := 0; i < 5; i++ {
    streamClient, err := c.SayHelloReplyStream(ctx, &pb.HelloRequest{Name: fmt.Sprintf("SayHelloReplyStream_%d", i)})
    if err != nil {
      log.Fatalf("could not SayHelloReplyStream: %v", err)
    }
    reply, err := streamClient.Recv()
    if err != nil {
      log.Fatalf("could not Recv: %v", err)
    }
    log.Println(reply.GetMessage())
    reply, err = streamClient.Recv()
    if err != nil {
      log.Fatalf("could not Recv: %v", err)
    }
    log.Println(reply.GetMessage())
  }

  // 双向流式调用
  for i := 0; i < 5; i++ {
    streamClient, err := c.SayHelloBiStream(ctx)
    if err != nil {
      log.Fatalf("could not SayHelloStream: %v", err)
    }
    err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("world_stream_%d", i)})
    if err != nil {
      log.Fatalf("could not Send: %v", err)
    }
    reply, err := streamClient.Recv()
    if err != nil {
      log.Fatalf("could not Recv: %v", err)
    }
    log.Println(reply.GetMessage())
  }
}

总结

当然,作为基于内存的RPC调用,还可以有更好的方式,比如直接将对象传递到服务端,直接通过本地调用方式来通信。
但这种方式破坏了很多约定,比如对象地址、比如gRPC连接参数不生效等等。

本文介绍的,基于Pipe的通信方式,除了网络层走了内存传递之外,其他都和正常RPC通信行为一致,比如同样经历了序列化、经历了HTTP/2的流控制等。当然,性能上比原生调用也会差一点,但是好在对于测试、验证场景,行为上的一致比较重要些。

本文代码已经托管到了GitHub https://github.com/robberphex...


本文首发于 https://robberphex.com/grpc-i...

点赞
收藏
评论区
推荐文章
美凌格栋栋酱 美凌格栋栋酱
7个月前
Oracle 分组与拼接字符串同时使用
SELECTT.,ROWNUMIDFROM(SELECTT.EMPLID,T.NAME,T.BU,T.REALDEPART,T.FORMATDATE,SUM(T.S0)S0,MAX(UPDATETIME)CREATETIME,LISTAGG(TOCHAR(
Python进阶者 Python进阶者
3年前
一文带你了解Python Socket 编程
大家好,我是皮皮。前言Socket又称为套接字,它是所有网络通信的基础。网络通信其实就是进程间的通信,Socket主要是使用IP地址,协议,端口号来标识一个进程。端口号的范围为065535(用户端口号一般大于1024),协议有很多种,一般我们经常用到的就是TCP,IP,UDP。下面我们来详细了解下Socket吧。一、导入Socket模块因为要操作套接字,
Wesley13 Wesley13
3年前
TARS的服务鉴权功能|避免数据泄露
!(https://oscimg.oschina.net/oscnet/084208376c754453a44175dc09e16402.gif)在我们使用微服务架构时,经常会选择通过RPC通信框架方便地实现服务间的调用。但方便的同时也带来了一些安全隐患,任何用户都能够访问对外公开的接口,可能造成部分敏感数据的泄露,这是我们不希望
Wesley13 Wesley13
3年前
HTTPS加密原理
http(超文本传输协议)一种属于应用层的协议缺点:1.通信使用明文(不加密),内容可能会被窃听2.不验证通信方的身份,因此有可能遭遇伪装3.无法证明报文的完整性,所以有可能已遭篡改优点:1.传输速度快httpsHTTPS并非是应用层的一种新协议。只是HTTP
可莉 可莉
3年前
055. SkyWalking 集群环境搭建
1\.环境准备1.1.用于搭建SkyWalking的三台服务器1.1.1.服务器10.1.62.7810.1.62.7910.1.62.801.1.2.需要端口11800(gRPC数据收集和内网节点间通信)12800(SkyWalki
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
GO语言网络编程
socket编程Socket是BSDUNIX的进程通信机制,通常也称作”套接字”,用于描述IP地址和端口,是一个通信链的句柄。Socket可以理解为TCP/IP网络的API,它定义了许多函数或例程,程序员可以用它们来开发TCP/IP网络上的应用程序。电脑上运行的应用程序通常通过”套接字”向网络发出请求或者应答网络请求。Socke
Wesley13 Wesley13
3年前
HTTPS是如何保证安全的
HTTP存在的问题1.窃听风险:通信使用明文(不加密),内容可能会被窃听(第三方可能获知通信内容)2.冒充风险:不验证通信方的身份,因此有可能遭遇伪装3.篡改风险:无法证明报文的完整性,所以有可能已遭篡改HTTPS!(https://osci
Stella981 Stella981
3年前
055. SkyWalking 集群环境搭建
1\.环境准备1.1.用于搭建SkyWalking的三台服务器1.1.1.服务器10.1.62.7810.1.62.7910.1.62.801.1.2.需要端口11800(gRPC数据收集和内网节点间通信)12800(SkyWalki
Stella981 Stella981
3年前
Centos8 如何配置DHCP服务器
DHCP(动态主机配置协议)用于自动为PC和其他网络设备分配IP地址,以便它们进行通信。它使用UDP协议的67端口,对客户端使用UDP端口68。DHCP操作分为四个阶段:服务器发现,IP租约报价,IP租约请求和IP租约确认。这些阶段通常缩写为DORA,用于发现,提供,请求和确认。系统环境Centos(https://www.osc
京东云开发者 京东云开发者
10个月前
https 的本质、证书验证过程以及数据加密
1.什么是HTTPSHTTP加上加密处理和认证以及完整性保护后即是HTTPS。它是为了解决HTTP存在的安全性问题,而衍生的协议,那使用HTTP的缺点有:1.通信使用明文可能会被窃听2.不验证通信方的身份可能遭遇伪装3.无法验证报文完整性,可能已遭篡改HT
算法算
算法算
Lv1
想要忘记那么多过往偏偏清醒到荒唐
文章
2
粉丝
0
获赞
0