protobuf
Protocol Buffer:轻量高效结构化存储数据
使用protobuf之前必须按格式写一个文件,根据这个文件生成各种脚本
以python为例:
1 2 3 4 5 6 7 8 9
|
syntax = "proto3";
message HelloRequest { string name = 1; string date = 2; }
|
命令行cd进文件夹后执行:
1
| python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I. hello.proto
|
生成:
1 2 3 4
| ├─protobuf3_demo │ │ hello.proto │ │ hello_pb2.py │ │ hello_pb2_grpc.py
|
记得改hello_pb2_grpc.py中import路径,否则会报错
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| from protobuf3_demo import hello_pb2
request = hello_pb2.HelloRequest()
request.name = "hhhh" request.date = "2021.10.2" req_str = request.SerializeToString() print(req_str)
request2 = hello_pb2.HelloRequest()
request2.ParseFromString(req_str) print(request2)
|
python grpc helloworld
首先写.proto文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| syntax = "proto3";
package proto;
service Greeter { rpc SayHello (HelloRequest) returns (HelloReply) {} }
message HelloRequest { string name = 1; }
message HelloReply { string message = 1; }
|
执行命令并改import路径
完成操作后,开始写server和client
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| import grpc from concurrent import futures from proto import hello_pb2,hello_pb2_grpc
class Greeter(hello_pb2_grpc.GreeterServicer): def SayHello(self, request, context): return hello_pb2.HelloReply(message=f"hello,{request.name}")
if __name__ == "__main__": server = grpc.server(futures.ThreadPoolExecutor(5)) hello_pb2_grpc.add_GreeterServicer_to_server(servicer=Greeter(),server=server) server.add_insecure_port('127.0.0.1:50000') server.start() server.wait_for_termination()
|
1 2 3 4 5 6 7 8
| import grpc from python_grpc_helloworld.proto import hello_pb2_grpc,hello_pb2
with grpc.insecure_channel('127.0.0.1:50000') as channel: stub = hello_pb2_grpc.GreeterStub(channel) rsp: hello_pb2.HelloReply = stub.SayHello(hello_pb2.HelloRequest(name="lily"))
print(rsp.message)
|
python concurrent.futures
基本组成:
concurrent.futures.Executor类:虚拟基类,ThreadPoolExecutor是其子类
submit(function, argument):调度函数执行,argument为参数
map(function,argument):同上,但是以异步的方式调度函数
shutdown(Wait=True):发出让执行者释放所有资源的信号,wait为True时等待任务完成,False时立即返回
concurrent.futures.Future:封装了异步执行,在executor.submit()时创建
concurrent.futures.ThreadPoolExecutor类
1
| concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
|
max_workers:最大线程数;若为None或没有指定,默认为机器处理器数量
这是一个线程池对象
go grpc helloworld
在go中写.proto文件,基本和python一样(除了option那一行,一定要完全一样)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| syntax = "proto3";
package proto; option go_package="./;proto";
service Greeter { rpc SayHello (HelloRequest) returns (HelloReply) {} }
message HelloRequest { string name = 1; }
message HelloReply { string message = 1; }
|
cd到目标文件下:
执行
1
| protoc --go_out=plugins=grpc:./ ./hello.proto
|
生成一个脚本文件
之后编写代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| package main
import ( "context" "net"
"google.golang.org/grpc"
"awesomeProject/grpc_test/proto" )
type Server struct{}
func (s *Server) SayHello(ctx context.Context, request *proto.HelloRequest) (*proto.HelloReply,error){ return &proto.HelloReply{ Message: "Hello" + request.Name, },nil }
func main(){ server := grpc.NewServer() proto.RegisterGreeterServer(server,&Server{}) listen,err := net.Listen("tcp","127.0.0.1:50000") if err != nil{ panic("fail to listen:" + err.Error()) } err = server.Serve(listen) if err != nil{ panic("fail to start:" + err.Error()) } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| package main
import ( "awesomeProject/grpc_test/proto" "context" "fmt" "google.golang.org/grpc" )
func main(){ conn, err := grpc.Dial("127.0.0.1:50000", grpc.WithInsecure()) if err != nil{ panic(err) } defer conn.Close()
c := proto.NewGreeterClient(conn) resp, err := c.SayHello(context.Background(), &proto.HelloRequest{Name:"Lily"}) if err != nil{ panic(err) } fmt.Println(resp.Message) }
|
grpc stream
简单模式(simple RPC)
客户端发起一个请求,服务端返回一个请求
服务端数据流模式(Server-side streaming RPC)
客户端发起一个请求,服务端返回一段连续的数据流(典型如客户端向服务端发送一个请求,服务端源源不断的返回数据)
客户端数据流模式(CLient-side streaming RPC)
客户端源源不断的发送数据,发送结束后,客户端返回一个响应(如物联网终端向服务器报送数据)
双向数据流模式(Bidirectional streaming RPC)
实现实时交互(如聊天)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| syntax = "proto3";
option go_package = "./;proto";
service Greeter{ rpc GetStream(StreamReqData) returns (stream StreamResData); rpc PutStream(stream StreamReqData) returns (StreamResData); rpc AllStream(stream StreamReqData) returns (stream StreamResData); }
message StreamReqData { string data = 1; }
message StreamResData { string data = 1; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| package main
import ( "awesomeProject/stream_grpc_test/proto" "errors" "fmt" "google.golang.org/grpc" "log" "net" "sync" "time" )
const PORT = ":50000" type Server struct {}
func (s *Server)GetStream(req *proto.StreamReqData,serveStream proto.Greeter_GetStreamServer) error{ for i:=0;i<10;i++ { err := serveStream.Send(&proto.StreamResData{ Data: time.Now().String(), }) if err != nil{ panic(err.Error()) } time.Sleep(time.Second) } return nil }
func (s *Server)PutStream(clientStream proto.Greeter_PutStreamServer) error{ for { a,err := clientStream.Recv() if err != nil{ return errors.New("fail to receive data:"+err.Error()) } log.Println(a.Data) } }
func (s *Server)AllStream(allStream proto.Greeter_AllStreamServer) error{ wg := sync.WaitGroup{} wg.Add(2) go func(){ defer wg.Done() for i:=0;i<10;i++{ err := allStream.Send(&proto.StreamResData{Data: time.Now().String()}) if err != nil{ fmt.Println("allStream sending goes wrong:"+err.Error()) } time.Sleep(time.Second) } }()
go func(){ defer wg.Done() for i:=0;i<10;i++{ data,err := allStream.Recv() if err != nil{ fmt.Println("allStream sending goes wrong:"+err.Error()) break } fmt.Println(data) } }() wg.Wait() return nil }
func main(){ server := grpc.NewServer() proto.RegisterGreeterServer(server,&Server{}) listen,err := net.Listen("tcp",PORT) if err != nil{ panic(err.Error()) }
err = server.Serve(listen) if err != nil{ panic(err.Error()) }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
| package main
import ( "awesomeProject/stream_grpc_test/proto" "context" "fmt" "google.golang.org/grpc" "log" "sync" "time" )
const ADDR = "127.0.0.1:50000"
func main(){ conn, err := grpc.Dial(ADDR,grpc.WithInsecure()) if err != nil{ panic(err.Error()) } defer conn.Close()
c := proto.NewGreeterClient(conn) res,err := c.GetStream(context.Background(),&proto.StreamReqData{Data: "Hello,Lily!"}) if err != nil{ log.Fatalln("fail to send message:"+err.Error()) }
for { a,err := res.Recv() if err != nil{ log.Println("fail to receive data:"+err.Error()) break } log.Println(a) }
putStream,err := c.PutStream(context.Background()) for i:=0;i<10;i++{ err = putStream.Send(&proto.StreamReqData{Data: time.Now().String()}) if err != nil{ log.Println("fail to send stream:"+err.Error()) } time.Sleep(time.Second) }
allStream,err := c.AllStream(context.Background()) wg := sync.WaitGroup{} wg.Add(2) go func(){ defer wg.Done() for i:=0;i<10;i++{ err := allStream.Send(&proto.StreamReqData{Data: time.Now().String()}) if err != nil{ fmt.Println("allStream sending goes wrong:"+err.Error()) } time.Sleep(time.Second) } }()
go func(){ defer wg.Done() for i:=0;i<10;i++{ data,err := allStream.Recv() if err != nil{ fmt.Println("allStream sending goes wrong:"+err.Error()) break } fmt.Println(data) } }() wg.Wait()
}
|