0%

grpc入门

protobuf

Protocol Buffer:轻量高效结构化存储数据

使用protobuf之前必须按格式写一个文件,根据这个文件生成各种脚本

以python为例:

1
2
3
4
5
6
7
8
9
// file:proto.hello
//必须标明版本
syntax = "proto3";

//只能发送message类型
message HelloRequest {
string name = 1; //name表示名称, name编号为1
string date = 2;
}

命令行cd进文件夹后执行:

1
python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. hello.proto

生成:

1
2
3
4
├─protobuf3_demo
│ │ hello.proto
│ │ hello_pb2.py
│ │ hello_pb2_grpc.py

记得改hello_pb2_grpc.py中import路径,否则会报错

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 使用示例
from protobuf3_demo import hello_pb2

request = hello_pb2.HelloRequest()

request.name = "hhhh"
request.date = "2021.10.2"
req_str = request.SerializeToString()
print(req_str)

request2 = hello_pb2.HelloRequest()

request2.ParseFromString(req_str)
print(request2)

python grpc helloworld

首先写.proto文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
syntax = "proto3";

package proto;//不能不一样

// 服务名称
service Greeter {
// 依次为:rpc关键字,函数名,函数参数,return关键字,返回值,函数内容体
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}

执行命令并改import路径

完成操作后,开始写server和client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import grpc
from concurrent import futures
from proto import hello_pb2,hello_pb2_grpc

class Greeter(hello_pb2_grpc.GreeterServicer):
def SayHello(self, request, context):
return hello_pb2.HelloReply(message=f"hello,{request.name}")

if __name__ == "__main__":
# 1,实例化server
server = grpc.server(futures.ThreadPoolExecutor(5))
# 2,注册逻辑到server中
hello_pb2_grpc.add_GreeterServicer_to_server(servicer=Greeter(),server=server)
# 3,启动server
server.add_insecure_port('127.0.0.1:50000')
server.start()
# 4,让server保持服务
server.wait_for_termination()
1
2
3
4
5
6
7
8
import grpc
from python_grpc_helloworld.proto import hello_pb2_grpc,hello_pb2

with grpc.insecure_channel('127.0.0.1:50000') as channel:
stub = hello_pb2_grpc.GreeterStub(channel)
rsp: hello_pb2.HelloReply = stub.SayHello(hello_pb2.HelloRequest(name="lily"))

print(rsp.message)

python concurrent.futures

  • 基本组成:

    concurrent.futures.Executor类:虚拟基类,ThreadPoolExecutor是其子类

    submit(function, argument):调度函数执行,argument为参数

    map(function,argument):同上,但是以异步的方式调度函数

    shutdown(Wait=True):发出让执行者释放所有资源的信号,wait为True时等待任务完成,False时立即返回

    concurrent.futures.Future:封装了异步执行,在executor.submit()时创建

  • concurrent.futures.ThreadPoolExecutor类

1
concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

max_workers:最大线程数;若为None或没有指定,默认为机器处理器数量

这是一个线程池对象

go grpc helloworld

在go中写.proto文件,基本和python一样(除了option那一行,一定要完全一样)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
syntax = "proto3";

package proto;//不能不一样
option go_package="./;proto";

// 服务名称
service Greeter {
// 依次为:rpc关键字,函数名,函数参数,return关键字,返回值,函数内容体
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}

cd到目标文件下:

执行

1
protoc --go_out=plugins=grpc:./ ./hello.proto

生成一个脚本文件

之后编写代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//服务端
package main

import (
"context"
"net"

"google.golang.org/grpc"

"awesomeProject/grpc_test/proto"
)

type Server struct{}

//简单的通信需要一个上下文对象
func (s *Server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloReply,error){
return &proto.HelloReply{
Message: "Hello" + request.Name,
},nil
}

func main(){
server := grpc.NewServer()
proto.RegisterGreeterServer(server,&Server{})
//指定端口
listen,err := net.Listen("tcp","127.0.0.1:50000")
if err != nil{
panic("fail to listen:" + err.Error())
}
//开始监听
err = server.Serve(listen)
if err != nil{
panic("fail to start:" + err.Error())
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//客户端
package main

import (
"awesomeProject/grpc_test/proto"
"context"
"fmt"
"google.golang.org/grpc"
)

func main(){
conn, err := grpc.Dial("127.0.0.1:50000", grpc.WithInsecure())
if err != nil{
panic(err)
}
defer conn.Close()

c := proto.NewGreeterClient(conn)
resp, err := c.SayHello(context.Background(), &proto.HelloRequest{Name:"Lily"})
if err != nil{
panic(err)
}
fmt.Println(resp.Message)
}

grpc stream

  • 简单模式(simple RPC)

    客户端发起一个请求,服务端返回一个请求

  • 服务端数据流模式(Server-side streaming RPC)

    客户端发起一个请求,服务端返回一段连续的数据流(典型如客户端向服务端发送一个请求,服务端源源不断的返回数据)

  • 客户端数据流模式(CLient-side streaming RPC)

    客户端源源不断的发送数据,发送结束后,客户端返回一个响应(如物联网终端向服务器报送数据)

  • 双向数据流模式(Bidirectional streaming RPC)

    实现实时交互(如聊天)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
syntax = "proto3";

option go_package = "./;proto";

service Greeter{
//stream关键字表明是数据流
rpc GetStream(StreamReqData) returns (stream StreamResData); //服务端流模式
rpc PutStream(stream StreamReqData) returns (StreamResData); //客户端流模式
rpc AllStream(stream StreamReqData) returns (stream StreamResData); //双向流模式
}

message StreamReqData {
string data = 1;
}

message StreamResData {
string data = 1;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// server
package main

import (
"awesomeProject/stream_grpc_test/proto"
"errors"
"fmt"
"google.golang.org/grpc"
"log"
"net"
"sync"
"time"
)

const PORT = ":50000"
type Server struct {}

// 服务端 单向流
func (s *Server)GetStream(req *proto.StreamReqData,serveStream proto.Greeter_GetStreamServer) error{
for i:=0;i<10;i++ {
err := serveStream.Send(&proto.StreamResData{
Data: time.Now().String(),
})
if err != nil{
panic(err.Error())
}
time.Sleep(time.Second)
}
return nil
}

// 客户端 单向流
func (s *Server)PutStream(clientStream proto.Greeter_PutStreamServer) error{
for {
a,err := clientStream.Recv()
if err != nil{
return errors.New("fail to receive data:"+err.Error())
}
log.Println(a.Data)
}
}

// 双向流
func (s *Server)AllStream(allStream proto.Greeter_AllStreamServer) error{
wg := sync.WaitGroup{}
wg.Add(2)
go func(){
defer wg.Done()
for i:=0;i<10;i++{
err := allStream.Send(&proto.StreamResData{Data: time.Now().String()})
if err != nil{
fmt.Println("allStream sending goes wrong:"+err.Error())
}
time.Sleep(time.Second)
}
}()

go func(){
defer wg.Done()
for i:=0;i<10;i++{
data,err := allStream.Recv()
if err != nil{
fmt.Println("allStream sending goes wrong:"+err.Error())
break
}
fmt.Println(data)
}
}()
wg.Wait()
return nil
}

func main(){
server := grpc.NewServer()
proto.RegisterGreeterServer(server,&Server{})
listen,err := net.Listen("tcp",PORT)
if err != nil{
panic(err.Error())
}

err = server.Serve(listen)
if err != nil{
panic(err.Error())
}


}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
//client
package main

import (
"awesomeProject/stream_grpc_test/proto"
"context"
"fmt"
"google.golang.org/grpc"
"log"
"sync"
"time"
)

const ADDR = "127.0.0.1:50000"

func main(){
conn, err := grpc.Dial(ADDR,grpc.WithInsecure())
if err != nil{
panic(err.Error())
}
defer conn.Close()

c := proto.NewGreeterClient(conn)
// 服务端 流模式
res,err := c.GetStream(context.Background(),&proto.StreamReqData{Data: "Hello,Lily!"})
if err != nil{
log.Fatalln("fail to send message:"+err.Error())
}

for {
a,err := res.Recv()
if err != nil{
log.Println("fail to receive data:"+err.Error())
break
}
log.Println(a)
}

// 客户端 流模式
putStream,err := c.PutStream(context.Background())
for i:=0;i<10;i++{
err = putStream.Send(&proto.StreamReqData{Data: time.Now().String()})
if err != nil{
log.Println("fail to send stream:"+err.Error())
}
time.Sleep(time.Second)
}

// 双向流模式
allStream,err := c.AllStream(context.Background())
wg := sync.WaitGroup{}
wg.Add(2)
go func(){
defer wg.Done()
for i:=0;i<10;i++{
err := allStream.Send(&proto.StreamReqData{Data: time.Now().String()})
if err != nil{
fmt.Println("allStream sending goes wrong:"+err.Error())
}
time.Sleep(time.Second)
}
}()

go func(){
defer wg.Done()
for i:=0;i<10;i++{
data,err := allStream.Recv()
if err != nil{
fmt.Println("allStream sending goes wrong:"+err.Error())
break
}
fmt.Println(data)
}
}()
wg.Wait()

}