RPC 指的是远程过程调用(Remote Procedure Call),它的调用包含传输协议和编码(对象序列)协议等,允许运行于一台计算机上的程序调用另一台计算机上的子程序,而开发人员无须额外为这个交互作用编程,就像对本地函数进行调用一样方便。

Go 语言在标准库中提供了 net/rpc,可以实现简单的 RPC。不过,Go 的 RPC 协议是使用特殊编码的 Gob,不适合作为通用的协议。

简单的 RPC

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package main

import (
"log"
"net"
"net/rpc"
)

type HelloService struct{}

func (p *HelloService) Hello(request string, reply *string) error {
*reply = "hello:" + request
return nil
}

func main() {
rpc.RegisterName("HelloService", new(HelloService))

listener, err := net.Listen("tcp", ":1234")
if err != nil {
log.Fatal("ListenTCP error:", err)
}

conn, err := listener.Accept()
if err != nil {
log.Fatal("Accept error:", err)
}

rpc.ServeConn(conn)
}


Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"fmt"
"log"
"net/rpc"
)

func main() {
client, err := rpc.Dial("tcp", ":1234")
if err != nil {
log.Fatal("dialing:", err)
}

var reply string
err = client.Call("HelloService.Hello", "hello", &reply)
if err != nil {
log.Fatal(err)
}

fmt.Println(reply)
}

grpc

gRPC 是一个高性能、开源、通用的 RPC 框架,它的接口描述语言(Interface description language,IDL)使用的是 Protobuf,是由 Google 开源的。

优点

  • 性能良好
  • 代码生成方法
  • 流传输
  • 超时和取消

缺点

  • 可读性差
  • 不支持浏览器调用
  • 外部组件支持较差

Protobuf

Protobuf(Protocol Buffers)是一种与语言、平台无关,且可扩展的序列化结构化数据的数据描述语言,通常称其为 IDL,常用于通信协议、数据存储等,与 JSON、XML 相比,它更小、更快,因此也更受开发人员的青睐。

Protobuf 安装

在 Mac 上,可以直接使用 Homebrew 安装:

1
brew install protobuf

检查是否安装成功,命令如下:

1
protoc --version

目前本文用的版本是 3.19.4

仅仅安装 Protoc 的编译器并不够,对于不同的编程语言,需要安装运行时的 protoc 插件,Go 语言对应的是 protoc-gen-go 插件。(多人开发时推荐安装时锁定版本,避免出现意外的兼容问题。)

1
2
go install google.golang.org/protobuf/cmd/protoc-gen-go@latest

grpc 的 HelloWorld

使用 go mod init 建立一个项目,并创建对应的文件和目录:

1
2
3
4
5
6
7
├── client
│ └── client.go
├── go.mod
├── proto
│ └── helloworld.proto
└── server
└── server.go

helloworld.proto 中写入以下声明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
syntax = "proto3";

package helloworld;

option go_package = "./";

service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
string name = 1;
}

message HelloReply {
string message = 1;
}

在项目根目录下,执行 protoc 命令,生成对应的 pb.go 文件:

1
protoc -I proto/ --go-grpc_out=proto/ --go_out=proto/ proto/helloworld.proto

运行命令后,会生成 helloworld.pb.gohelloworld_grpc.pb.go

Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"context"
"flag"
"fmt"
"log"
"net"

"google.golang.org/grpc"

pb "gorpc/proto"
)

var (
port = flag.Int("port", 50051, "The server port")
)

// server is used to implement helloworld.GreeterServer.
type server struct {
pb.UnimplementedGreeterServer
}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("Received: %v", in.GetName())
return &pb.HelloReply{Message: "Hello " + in.GetName()}, nil
}

func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
  • pb.UnimplementedGreeterServer 必须嵌入到 server 中,这样才能获得对应的方法实现。
  • 主要就是使用 RegisterGreeterServer 注册方法到内部的注册中心,这样在接收请求的时候,可以通过内部的“服务发现”发现该服务端端口,并进行处理。

Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package main

import (
"context"
"flag"
"log"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

pb "gorpc/proto"
)

const (
defaultName = "world"
)

var (
addr = flag.String("addr", "localhost:50051", "the address to connect to")
name = flag.String("name", defaultName, "Name to greet")
)

func main() {
flag.Parse()
// Set up a connection to the server.
conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGreeterClient(conn)

// Contact the server and print out its response.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("Greeting: %s", r.GetMessage())
}

gRPC 拦截器

的拦截器(Interceptor)可以实现对每一个 RPC 方法的前面或者后面做操作,例如记录日志、鉴权等等,且不直接侵入业务代码。

  • 预处理
  • 调用 RPC 方法
  • 后处理

流和 Unary 拦截器是分开的。

记录日志示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package middleware

import (
"context"
"log"
"time"

"google.golang.org/grpc"
)

func AccessLog(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (interface{}, error) {
requestLog := "access request log: method: %s, begin_time: %d, request: %v"
beginTime := time.Now().Local().Unix()
log.Printf(requestLog, info.FullMethod, beginTime, req)

resp, err := handler(ctx, req)

responseLog := "access response log: method: %s, begin_time: %d, end_time: %d, response: %v"
endTime := time.Now().Local().Unix()
log.Printf(responseLog, info.FullMethod, beginTime, endTime, resp)
return resp, err
}

func StreamAccessLog(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
requestLog := "stream access request log: method: %s, begin_time: %d"
beginTime := time.Now().Local().Unix()
log.Printf(requestLog, info.FullMethod, beginTime)

err := handler(srv, ss)

responseLog := "stream access response log: method: %s, begin_time: %d, end_time: %d"
endTime := time.Now().Local().Unix()
log.Printf(responseLog, info.FullMethod, beginTime, endTime)

return err
}

在启动服务时注册拦截器:

1
2
3
4
5
6
7
8
9
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(middleware.AccessLog),
grpc.StreamInterceptor(middleware.StreamAccessLog),
grpc.StatsHandler(&interal.ServerStats{}),
}

s := grpc.NewServer(opts...)
pb.RegisterGameServer(s, &interal.Server{})
log.Printf("grpc server is listening at %v", lis.Addr())

grpc-go 只允许使用一个拦截器,但可以考虑使用 gRPC 应用生态中的 go-grpc-middleware 提供的链式调用。

1
go get -u github.com/grpc-ecosystem/go-grpc-middleware
1
2
3
4
5
6
7
8
9
10
11
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
middleware.Recovery,
middleware.AccessLog,
)),
grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
middleware.StreamRecovery,
middleware.StreamAccessLog,
)),
grpc.StatsHandler(&interal.ServerStats{}),
}

可以考虑加入异常捕获,避免服务因为异常抛出而中断:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func Recovery(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (interface{}, error) {
defer func() {
if e := recover(); e != nil {
recoveryLog := "recovery log: method: %s, message: %v, stack: %s"
log.Printf(recoveryLog, info.FullMethod, e, string(debug.Stack()[:]))
}
}()

return handler(ctx, req)
}

func StreamRecovery(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
defer func() {
if e := recover(); e != nil {
recoveryLog := "stream recovery log: method: %s, message: %v, stack: %s"
log.Printf(recoveryLog, info.FullMethod, e, string(debug.Stack()))
err = status.Errorf(status.Code(err), "panic occurred: %v", e)
}
}()

return handler(srv, ss)
}

小结

这仅仅是 grpc 的简单入门,在 grpc 中,支持 4 种调用,本文仅仅介绍了最简单的 Unary RPC。