HTTP调用与RPC(Remote Procedure Call)
RPC是远端过程调用,其调用协议通常包含传输协议和序列化协议,如:传统的http1.1和json(传统的HTTP也属于RPC)、gRPC的http2和protobuf。
调用远程计算机上提供的函数。服务端实现具体的函数功能,客户端本地留有远程服务接口定义存根,表现形式是函数调用,但实际上与HTTP调用一样是向远程计算器发起的网络请求,函数的入参是请求,函数的返回值是响应。RPC函数调用的形式封装了跨语言的服务访问,大大简化了应用层编程(HTTP调用需要显式的序列化和反序列化)。
RPC与HTTP调用的不同点是:传输协议、序列化方式、代码的调用方式。
为什么要用gRPC?
通信协议基于标准的 HTTP/2 设计,支持双向流、消息头压缩、单 TCP 的多路复用、服务端推送等特性。
序列化支持 PB(Protocol Buffer)和 JSON,PB 是一种语言无关的高性能序列化协议。使用 HTTP/2 + PB, 保障了 RPC 调用的高性能。如果是一个大型服务,内部子系统较多,微服务架构,接口非常多的情况下,gRPC将体现出性能优势。
gRPC还可以很简单的插入身份认证、负载均衡、日志和监控等功能。
环境安装
Copy $ brew install protobuf protoc-gen-go protoc-gen-go-grpc
$ export GO111MODULE=off # 有的包必须要装到$GOPATH/src下
$ cd $GOPATH/src
$ git clone https://github.com/golang/text.git ./golang.org/x/text
$ git clone https://github.com/golang/net.git ./golang.org/x/net
$ git clone https://github.com/grpc/grpc-go ./google.golang.org/grpc
$ git clone https://github.com/google/go-genproto.git ./google.golang.org/genproto
$ git clone https://github.com/protocolbuffers/protobuf-go.git ./google.golang.org/protobuf
$ git clone https://github.com/golang/protobuf.git ./github.com/golang/protobuf
$ git clone https://github.com/googleapis/googleapis ./googleapis # 这个包直接放src目录下
$ export GO111MODULE=on # 开启 go mod 才能安装指定版本
# $ go install github.com/golang/protobuf/protoc-gen-go@v1.3.5 # 这个代码生成的包不装太新的
golang工程如果配好了GOPROXY=https://goproxy.io,direct,仍然有包下不下来,go.mod可用replace来替换代码库地址,类似如下这样:
Copy module project-name
go 1.16
replace (
cloud.google.com/go => github.com/googleapis/google-cloud-go v0.37.4
golang.org/x/crypto => github.com/golang/crypto v0.0.0-20181203042331-505ab145d0a9
golang.org/x/exp => github.com/golang/exp v0.0.0-20190402192236-7fd597ecf556
golang.org/x/image => github.com/golang/image v0.0.0-20190321063152-3fc05d484e9f
golang.org/x/lint => github.com/golang/lint v0.0.0-20190313153728-d0100b6bd8b3
golang.org/x/mobile => github.com/golang/mobile v0.0.0-20190327163128-167ebed0ec6d
golang.org/x/net => github.com/golang/net v0.0.0-20190311031020-56fb01167e7d
golang.org/x/oauth2 => github.com/golang/oauth2 v0.0.0-20190523182746-aaccbc9213b0
golang.org/x/sync => github.com/golang/sync v0.0.0-20190227155943-e225da77a7e6
golang.org/x/sys => github.com/golang/sys v0.0.0-20190225065934-cc5685c2db12
golang.org/x/text => github.com/golang/text v0.3.0
golang.org/x/time => github.com/golang/time v0.0.0-20190308202827-9d24e82272b4
golang.org/x/tools => github.com/golang/tools v0.0.0-20190402200628-202502a5a924
google.golang.org/api => github.com/googleapis/google-api-go-client v0.5.0
google.golang.org/appengine => github.com/golang/appengine v1.6.0
google.golang.org/genproto => github.com/google/go-genproto v0.0.0-20180831171423-11092d34479b
google.golang.org/grpc => github.com/grpc/grpc-go v1.21.0
)
require (
github.com/fsnotify/fsnotify v1.4.9
github.com/go-redis/redis v6.15.2+incompatible
github.com/go-sql-driver/mysql v1.4.1
github.com/golang/protobuf v1.4.3
github.com/grpc-ecosystem/go-grpc-middleware v1.2.0
github.com/jinzhu/gorm v1.9.10
github.com/kr/pretty v0.2.1 // indirect
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/onsi/ginkgo v1.15.1 // indirect
github.com/onsi/gomega v1.11.0 // indirect
github.com/pkg/errors v0.8.1
github.com/satori/go.uuid v1.2.0
github.com/smartystreets/goconvey v1.6.4
github.com/spf13/viper v1.7.1
go.uber.org/zap v1.16.0
google.golang.org/grpc v1.21.1
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
)
Protocol Buffers
可以理解为与 json、xml 作用相类似。
为什么使用 Protocol Buffer?
更小:它可以在序列化数据的同时对数据进行压缩,所以它生成的字节流,通常都要比相同数据的其他格式(例如 XML 和 JSON)占用的空间明显小很多,小3-10倍。
更快:序列化速度更快,比xml和JSON快20-100倍,体积缩小后,传输时,带宽也会优化
更简单:proto编译器,自动进行序列化和反序列化
维护成本低:跨平台、跨语言,多平台仅需要维护一套对象协议(.proto)
“向后”兼容性好:允许在保证向后兼容的前提下更新字段
Protocol Buffer 的缺点是调试不便,以二进制数据流做传输,速度是快了,但可读性变差了。
gRPC默认使用 Protocol Buffer 作为接口设计语言(IDL,interface design language),这个 .proto 文件包括两部分:
Copy syntax = "proto3";
package xxx.project; // 一个项目用到多个proto时,可能会命名重复,用这个可以隔离开命名空间
message Date {
int32 year = 1;
int32 month = 2;
int32 day = 3;
}
message Person {
int32 id = 1;
string name = 2;
float height = 3;
float weight = 4; // 这个数字是tag,tag的数值是非常重要的,Protocol Buffer会根据tag的数值来序列化,而不是根据字段名。举个例子,服务端对字段做了增删,把tag也调整了一下,而此时客户端的proto文件还在用老的,则此时客户端仍然会按老的tag绑定关系来解析。
bytes avatar = 5;
string email = 6;
bool email_verified = 7;
repeated string phone_numbers = 8; // repeated 表示这是个数组字段
Gender gender = 11;
Date birthday = 12; // 引用了自定义的消息类型,在golang中会被解析成结构体
enum Gender {
NOT_SPECIFIED = 0;
FEMALE = 1;
MALE = 2;
}
// 针对以上proto文件更新tag绑定引起的问题,引入了保留字段的功能,标记为不再使用的tag和字段名,客户端如果在使用这些tag或字段名,就会抛出异常
reserved 9, 10, 20 to 100, 200 to max; // 保留的tag
reserved "foo", "bar"; // 保留的字段名
}
service Employee {
rpc GetByName (Request) returns (Reply) {}
}
定义好服务之后,执行:protoc --go_out=plugins=grpc:. helloworld/helloworld.proto 可以自动生成gRPC接口代码xxx.pb.go文件。
向前/向后兼容
所谓的“向后兼容”(backward compatible),就是说,当模块 B 升级了之后,它能够正确识别模块 A 发出的老版本的协议。 所谓的“向前兼容”(forward compatible),就是说,当模块A升级了之后,模块 B 能够正常识别模块 A 发出的新版本的协议。
这个特性依赖于字段编号始终表示相同的数据项。如果从服务的新版本的消息中删除字段,则永远不应重复使用该字段编号。 可以通过使用reserved
关键字强制执行此行为。
gRPC四种通信方式
1. 简单RPC(Simple RPC):就是一般的rpc调用,一个请求对象对应一个返回对象。
2. 服务端流式RPC(Server-side streaming RPC):一个请求对象,服务端返回数据流(数组,每次传一个元素)。
3. 客户端流式RPC(Client-side streaming RPC):客户端传入连续的请求对象(数组),服务端返回一个响应结果。
4. 双向流式RPC(Bidirectional streaming RPC):结合客户端流式RPC和服务端流式RPC,可以传入多个对象,返回多个响应对象。
流式接口的使用场景:一个接口要发送大量数据时,一次只传输一部分数据,分批传输数据,比如文件的传输,用流式接口可以降低服务器的瞬时压力,对客户端的响应也更快。
gRPC拦截器
与HTTP服务的拦截器功能类似,可以在RPC方法前、后执行一些操作。
拦截器分类
典型应用场景:统一接口身份认证。
gRPC支持拦截器链。
进阶使用之import其他proto
import 同一级或子级的 proto
Copy syntax = "proto3";
package employee;
import "messages.proto"; // 若是子级的情况,把路径加上即可
message AddEmployee {
string employeeName = 1;
HelloRequest hello = 2;
}
import 其他自己项目的proto
Copy import "other-project/path/xxx.proto";
各个项目都放到同级目录,按如上这样import
,调整 protoc 编译命令即可,protoc可以有多个--proto_path
参数指定,编译时就会到指定的多个路径下找package。
import google 提供的 proto
Copy import "google/api/annotations.proto"; // 来自 googleapis
import "google/rpc/status.proto"; // 来自 googleapis
import "google/protobuf/timestamp.proto"; // 来自 protocolbuffers
不管引用了哪个包,直接 import 它下面的代码即可。
与 import 其他自己项目的proto不同的是,import 自己的项目要写上项目名
而 import google 提供的 proto 不写,这样写是不对的:import "googleapis/google/api/annotations.proto"
Copy $ cd 项目目录
$ protoc --go_out=plugins=grpc:. --proto_path=$GOPATH/src/google.golang.org/protobuf --proto_path=$GOPATH/src/googleapis --proto_path=/Users/zhhnzw/workspace/project-name --proto_path=. path/xxx.proto
注意这个protoc
编译指令指定的多个proto_path
,除了指定的当前项目目录的.
,其他添加的路径都要是全路径,不可以写成../
,也不能有~/
这种路径
需要导入 google.golang.org/grpc/metadata 这个包
Copy headerData := metadata.Pairs("timestamp", strconv.Itoa(int(time.Now().Unix())), "token", "123")
ctxH := metadata.NewOutgoingContext(ctx, headerData)
// 后续也可以往后面添加数据
ctxH = metadata.AppendToOutgoingContext(ctxH, "kay1", "val1", "key2", "val2")
发起请求时需要传递 context,使用创建的这个ctxH 即可。
一元模式 数据的读取
Copy md, ok := metadata.FromIncomingContext(ctx)
流模式 数据的读取
Copy md, ok := metadata.FromIncomingContext(stream.Context())
server 端会把 metadata 分为 header 和 trailer 发送给 client
一元模式 数据的写入
Copy header := metadata.Pairs("token", "123")
grpc.SendHeader(ctx, header)
trailer := metadata.Pairs("is-end", "end")
grpc.SetTrailer(ctx, trailer)
流模式 数据的写入
Copy header := metadata.Pairs("token", "123")
stream.SendHeader(ctx, header)
trailer := metadata.Pairs("is-end", "end")
stream.SetTrailer(ctx, trailer)
一元模式读取 header 信息
Copy var header, trailer metadata.MD
// 调用服务端方法的时候可以在后面传参数
serverRetMsg, err := client.RPCMethod(ctx, &Struct, grpc.Header(&header), grpc.Trailer(&trailer))
流模式读取 header 信息
Copy stream, err := client.RequestServerStreamMethod(ctx)
header, err := stream.Header()
trailer := stream.Trailer()
超时处理
gRPC 服务的客户端可以通过 context 来控制超时时间。
请求失败重试
gRPC 的重试策略有两种,分别是:重试(retryPolicy)和对冲(hedging),在客户端创建 gRPC 连接时,下面以通过 service config 来配置 retryPolicy 为例:
Copy retryPolicy = `{
"methodConfig": [{
"name": [{"service": "grpc.examples.echo.Echo"}],
"waitForReady": true,
"retryPolicy": {
"MaxAttempts": 4,
"InitialBackoff": ".01s",
"MaxBackoff": ".01s",
"BackoffMultiplier": 1.0,
"RetryableStatusCodes": ["UNAVAILABLE"]
}
}],
"retryThrottling": {
"maxTokens": 10,
"tokenRatio": 0.1
}}`
_, err := grpc.Dial(*addr, grpc.WithInsecure(), grpc.WithDefaultServiceConfig(retryPolicy))
MaxAttempts
:最大重试次数。
RetryableStatusCodes
:配置哪些错误是允许重试的。
InitialBackoff
:第一次重试等待的间隔。
BackoffMultiplier
:每次间隔的指数因子。
gPRC 采用指数避退+随机间隔 组合起来的方式进行重试。
指数避退:重试间隔时间按照指数增长,如等 3s 9s 27s后重试。指数避退能有效防止对对端造成不必要的冲击。
MaxBackoff
:等待的最大时长,随着重试次数的增加,不希望第N次重试等待的时间变成30分钟这样不切实际的值。
retryThrottling
限流配置是针对整个服务器的,当客户端的失败和成功比超过某个阈值时,gRPC 会通过禁用这些重试策略来防止由于重试导致服务器过载。
错误处理与状态码
以下是客户端的错误处理示例:
Copy import (
"context"
"time"
grpccodes "google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
)
func call() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
req := &pb.Req{
Arg: 1,
}
resp, err := client.CallSomething(ctx, req)
if err != nil {
if rpcErr, ok := grpcstatus.FromError(err); ok {
if rpcErr.Code() == grpccodes.Canceled ||
rpcErr.Code() == grpccodes.DeadlineExceeded {
log.Warnf("RPC call timeout: %s", err)
return nil
}
}
return err
}
return nil
}
gRPC 服务端要像这样返回 err:
Copy import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
func (s *Service) GetSomething(req *pb.Req) error {
if ... {
return status.New(codes.InvalidArgument, "param err").Err()
}
return nil
}
单元测试这样断言:
Copy func TestGetSomething(t *testing.T) {
req := &pb.Req{
Arg: 1,
}
res, err := Service.GetAppraisalsWithTypeAndTime(ctx, req)
t.Log(res)
assert.Equal(t, status.Error(codes.InvalidArgument, "param err"), err)
}
调试工具
https://github.com/fullstorydev/grpcui
使用Wireshark抓包
默认不会识别HTTP2
,要设置一下,在第一个报文上右击,选择解码为(Decode As),会出现一个Field
为TCP
的行,把port
后面的Value
字段设置为50051
(启动的gRPC服务的端口号),把当前
(Current)字段设置为HTTP2
,再点OK
即可。接下来即可看到gRPC
和HTTP2
的报文,下面展开的是一个请求的报文,发送的是string
类型的字段,值为world
:
浅谈gRPC服务端理解
Copy // server is used to implement helloworld.SimpleServer.
type server struct{}
// SayHello implements helloworld.SimpleServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("Received: %v", in.Name)
return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}
func main() {
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer(grpc.UnaryInterceptor(UnaryServerInterceptor))
pb.RegisterSimpleServiceServer(s, &server{})
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
初始化
NewServer
注册
用 Protobuf 生成出来的 .pb.go 文件中,会定义出 Service APIs interface 作为 server 的具体实现约束,必须要实现所定义接口包含的所有方法。
Copy func (s *Server) register(sd *ServiceDesc, ss interface{}) {
...
srv := &service{
server: ss,
md: make(map[string]*MethodDesc),
sd: make(map[string]*StreamDesc),
mdata: sd.Metadata,
}
for i := range sd.Methods {
d := &sd.Methods[i]
srv.md[d.MethodName] = d
}
for i := range sd.Streams {
d := &sd.Streams[i]
srv.sd[d.StreamName] = d
}
s.m[sd.ServiceName] = srv
}
服务端注册代码:pb.RegisterSimpleServiceServer(s, &server{})
,接口的具体实现是server{}
实例的,如上 gRPC 源码中register
方法把接口的具体实现注册到内部service
实例,使接口方法名与其具体实现一一对应,以便于后续实际调用的使用。
监听
监听/处理阶段,核心代码如下:
Copy func (s *Server) Serve(lis net.Listener) error {
...
var tempDelay time.Duration // how long to sleep on accept failure
for {
rawConn, err := lis.Accept()
if err != nil {
if ne, ok := err.(interface {
Temporary() bool
}); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
s.mu.Lock()
s.printf("Accept error: %v; retrying in %v", err, tempDelay)
s.mu.Unlock()
timer := time.NewTimer(tempDelay)
select {
case <-timer.C:
case <-s.quit:
timer.Stop()
return nil
}
continue
}
...
return err
}
tempDelay = 0
// Start a new goroutine to deal with rawConn so we don't stall this Accept
// loop goroutine.
//
// Make sure we account for the goroutine so GracefulStop doesn't nil out
// s.conns before this conn can be added.
s.serveWG.Add(1)
go func() {
s.handleRawConn(rawConn)
s.serveWG.Done()
}()
}
}
循环处理客户端请求,通过 lis.Accept 取出新的客户端请求,如果队列中没有需处理的连接时,会形成阻塞等待。
若 lis.Accept 失败,则触发休眠机制,第一次休眠 5ms,不断翻倍,最大 1s。
若 lis.Accept 成功,代表监听到请求,重置休眠的时间计数和启动一个新的 goroutine 调用 handleRawConn 方法去执行/处理新的请求,也就是说每一个请求都是不同的 goroutine 在处理 。
加入 waitGroup 用来处理优雅重启或退出,等待所有 goroutine 执行结束之后才会退出。
注1:listen()
函数可以让套接字进入被动监听状态(当没有客户端请求时,套接字处于“睡眠”状态;当接收到客户端请求时,套接字才会被“唤醒”来响应请求)。
注2:当套接字正在处理客户端请求时,如果有新的请求进来,套接字是没法处理的,只能把它放进缓冲区,待当前请求处理完毕后,再从缓冲区中读取出来处理。如果不断有新的请求进来,它们就按照先后顺序在缓冲区中排队,直到缓冲区满。这个缓冲区,就称为请求队列(Request Queue)。
注3:listen()
只是让套接字进入监听状态,并没有真正接收客户端请求,listen()
不会阻塞。通过accept()
函数来接收客户端请求,accept()
会阻塞程序执行,直到有新的请求到来,accept()
每次接受一个请求。
浅谈gRPC客户端理解
Copy func main() {
// Set up a connection to the server.
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewSimpleServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: "world"})
if err != nil {
log.Fatalf("could not greet: %v", err)
}
log.Printf("call response message: %s", r.Message)
}
创建连接
grpc.Dial
方法是对grpc.DialContext
的封装,DialContext
是异步建立连接的,也就是并不是马上生效,处于Connecting
状态,而要到达Ready
状态才可用。
如果想通过Dial
方法就立刻打通与服务端的连接,需要在grpc.Dial
方法多传一个opt参数:grpc.WithBlock()
。
创建客户端实例
调用
Copy func (c *simpleServiceClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
out := new(HelloReply)
err := c.cc.Invoke(ctx, "/grpc_demo.SimpleService/SayHello", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
proto 生成的 RPC 方法更像是一个包装盒,把需要的东西放进去,而实际上调用的还是 grpc.invoke 方法。如下:
Copy func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
if err := cs.SendMsg(req); err != nil {
return err
}
return cs.RecvMsg(reply)
}
通过概览,可以关注到三块调用。如下:
newClientStream:获取传输层 Trasport 并组合封装到 ClientStream 中返回,在这块会涉及负载均衡、超时控制、 Encoding、 Stream 的动作,与服务端基本一致的行为。
cs.SendMsg:发送 RPC 请求出去,但其并不承担等待响应的功能。
cs.RecvMsg:阻塞等待接受到的 RPC 方法响应结果。
关闭连接
conn.Close
方法会取消ClientConn
上下文,同时关闭所有底层传输。涉及如下:
gRPC-Gateway
安装
Copy $ cd $GOPATH/src/github.com/zhhnzw # 需要在$GOPATH/src下操作,安装到$GOPATH/bin
$ mkdir tool
$ cd tool
$ go mod init tool
$ go mod tidy
$ go install \
github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway \
github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-openapiv2 \
google.golang.org/protobuf/cmd/protoc-gen-go \
google.golang.org/grpc/cmd/protoc-gen-go-grpc
使用示例
protobuf定义:
Copy syntax = "proto3";
option go_package = "./;helloworld";
package helloworld;
import "google/api/annotations.proto"; // 需要 import googleapis 的包googleapis的包
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
service SimpleService {
rpc SayHello (HelloRequest) returns (HelloReply) {
option (google.api.http) = {
post: "/helloworld"
body: "*"
};
}
}
Copy # 生成gRPC-Gateway源码
$ protoc -I=./helloworld -I=$GOPATH/src/googleapis --grpc-gateway_out=logtostderr=true:./helloworld/ helloworld/helloworld.proto
Copy package main
import (
"flag"
"fmt"
gw "github.com/zhhnzw/grpc_demo/helloworld"
"net/http"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
var (
echoEndpoint = flag.String("echo_endpoint", "localhost:50051", "endpoint of YourService")
)
func run() error {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
mux := runtime.NewServeMux()
opts := []grpc.DialOption{grpc.WithInsecure()}
err := gw.RegisterSimpleServiceHandlerFromEndpoint(ctx, mux, *echoEndpoint, opts)
if err != nil {
return err
}
return http.ListenAndServe(":9090", mux)
}
func main() {
if err := run(); err != nil {
fmt.Print(err.Error())
}
}
测试:curl -X POST -d '{"name": "will"}' 127.0.0.1:9090/helloworld