gRPC 流:最佳实践和性能见解
大家好,我们又见面了啊~本文《gRPC 流:最佳实践和性能见解》的内容中将会涉及到等等。如果你正在学习Golang相关知识,欢迎关注我,以后会给大家带来更多Golang相关文章,希望我们能一起进步!下面就开始本文的正式内容~

介绍
grpc 流允许 protobuf 消息从客户端流式传输到服务器、从服务器流式传输到客户端,或者双向流式传输。
这一强大的功能可用于构建实时应用程序,例如聊天应用程序、实时监控仪表板等。
在本文中,我们将探讨如何正确使用 grpc 流。
先决条件
- grpc基础知识
- go 编程语言的基础知识(示例代码是用 go 编写的,但这个概念也可以应用于其他语言)
- 代码示例可在 github 上获取
良好实践
让我们检查一下使用 grpc 流的良好实践:
使用一元请求进行一元请求
一个常见的错误是对一元请求使用流式传输。
例如,考虑以下 grpc 服务定义:
service myservice {
rpc getsomething (somethingrequest) returns (stream somethingresponse) {}
}
如果客户端只需要发送一个请求并接收一个响应,
您不需要使用流式传输。相反,我们可以按如下方式定义服务:
service myservice {
rpc getsomething (somethingrequest) returns (somethingresponse) {}
}
通过对一元请求使用流式传输,我们增加了不必要的复杂性
到代码,这可能会使其更难理解和维护,而不是
从使用流媒体中获得任何好处。
比较一元请求和流请求的 golang 代码示例:
一元请求:
type somethingunary struct {
pb.unimplementedsomethingunaryserver
}
func (s *somethingunary) getsomething(ctx context.context, req *pb.somethingrequest) (*pb.somethingresponse, error) {
return &pb.somethingresponse{
message: "hello " + req.name,
}, nil
}
func testsomethingunary(t *testing.t) {
conn := newserver(t, func(s grpc.serviceregistrar) {
pb.registersomethingunaryserver(s, &somethingunary{})
})
client := pb.newsomethingunaryclient(conn)
response, err := client.getsomething(
context.background(),
&pb.somethingrequest{
name: "test",
},
)
if err != nil {
t.fatalf("failed to get something: %v", err)
}
if response.message != "hello test" {
t.errorf("unexpected response: %v", response.message)
}
}
流式一元请求:
type somethingstream struct {
pb.unimplementedsomethingstreamserver
}
func (s *somethingstream) getsomething(req *pb.somethingrequest, stream pb.somethingstream_getsomethingserver) error {
if err := stream.send(&pb.somethingresponse{
message: "hello " + req.name,
}); err != nil {
return err
}
return nil
}
func testsomethingstream(t *testing.t) {
conn := newserver(t, func(s grpc.serviceregistrar) {
pb.registersomethingstreamserver(s, &somethingstream{})
})
client := pb.newsomethingstreamclient(conn)
stream, err := client.getsomething(
context.background(),
&pb.somethingrequest{
name: "test",
},
)
if err != nil {
t.fatalf("failed to get something stream: %v", err)
}
response, err := stream.recv()
if err != nil {
t.fatalf("failed to receive response: %v", err)
}
if response.message != "hello test" {
t.errorf("unexpected response: %v", response.message)
}
}
我们可以看到,一元请求的代码更简单,更容易理解
比流请求的代码。
如果可以的话,一次发送多个文档
让我们比较一下这两个服务定义:
service bookstore {
rpc listbooks(listbooksrequest) returns (stream book) {}
}
service bookstorebatch {
rpc listbooks(listbooksrequest) returns (stream listbooksresponse) {}
}
message listbooksresponse {
repeated book books = 1;
}
bookstore 一次流式传输一本书,而 bookstorebatch 同时流式传输多本书。
如果客户端需要列出所有书籍,使用bookstorebatch 效率更高
因为它减少了客户端和服务器之间的往返次数。
让我们看看 bookstore 和 bookstorebatch 的 golang 代码示例:
书店:
type bookstore struct {
pb.unimplementedbookstoreserver
}
func (s *bookstore) listbooks(req *pb.listbooksrequest, stream pb.bookstore_listbooksserver) error {
for _, b := range bookstoredata {
if b.author == req.author {
if err := stream.send(&pb.book{
title: b.title,
author: b.author,
publicationyear: int32(b.publicationyear),
genre: b.genre,
}); err != nil {
return err
}
}
}
return nil
}
func testbookstore_listbooks(t *testing.t) {
conn := newserver(t, func(s grpc.serviceregistrar) {
pb.registerbookstoreserver(s, &bookstore{})
})
client := pb.newbookstoreclient(conn)
stream, err := client.listbooks(
context.background(),
&pb.listbooksrequest{
author: charlesdickens,
},
)
if err != nil {
t.fatalf("failed to list books: %v", err)
}
books := []*pb.book{}
for {
book, err := stream.recv()
if err != nil {
break
}
books = append(books, book)
}
if len(books) != charlesdickensbooks {
t.errorf("unexpected number of books: %d", len(books))
}
}
书店批次:
type bookstorebatch struct {
pb.unimplementedbookstorebatchserver
}
func (s *bookstorebatch) listbooks(req *pb.listbooksrequest, stream pb.bookstorebatch_listbooksserver) error {
const batchsize = 10
books := make([]*pb.book, 0, batchsize)
for _, b := range bookstoredata {
if b.author == req.author {
books = append(books, &pb.book{
title: b.title,
author: b.author,
publicationyear: int32(b.publicationyear),
genre: b.genre,
})
if len(books) == batchsize {
if err := stream.send(&pb.listbooksresponse{
books: books,
}); err != nil {
return err
}
books = books[:0]
}
}
}
if len(books) > 0 {
if err := stream.send(&pb.listbooksresponse{
books: books,
}); err != nil {
return nil
}
}
return nil
}
func testbookstorebatch_listbooks(t *testing.t) {
conn := newserver(t, func(s grpc.serviceregistrar) {
pb.registerbookstorebatchserver(s, &bookstorebatch{})
})
client := pb.newbookstorebatchclient(conn)
stream, err := client.listbooks(
context.background(),
&pb.listbooksrequest{
author: charlesdickens,
},
)
if err != nil {
t.fatalf("failed to list books: %v", err)
}
books := []*pb.book{}
for {
response, err := stream.recv()
if err != nil {
break
}
books = append(books, response.books...)
}
if len(books) != charlesdickensbooks {
t.errorf("unexpected number of books: %d", len(books))
}
}
从上面的代码中,需要明确哪一个更好。
让我们运行一个基准测试来看看差异:
书店基准:
func benchmarkbookstore_listbooks(b *testing.b) {
conn := newserver(b, func(s grpc.serviceregistrar) {
pb.registerbookstoreserver(s, &bookstore{})
})
client := pb.newbookstoreclient(conn)
var benchinnerbooks []*pb.book
b.resettimer()
for i := 0; i < b.n; i++ {
stream, err := client.listbooks(
context.background(),
&pb.listbooksrequest{
author: charlesdickens,
},
)
if err != nil {
b.fatalf("failed to list books: %v", err)
}
books := []*pb.book{}
for {
book, err := stream.recv()
if err != nil {
break
}
books = append(books, book)
}
benchinnerbooks = books
}
benchbooks = benchinnerbooks
}
bookstorebatch 基准:
func benchmarkbookstorebatch_listbooks(b *testing.b) {
conn := newserver(b, func(s grpc.serviceregistrar) {
pb.registerbookstorebatchserver(s, &bookstorebatch{})
})
client := pb.newbookstorebatchclient(conn)
var benchinnerbooks []*pb.book
b.resettimer()
for i := 0; i < b.n; i++ {
stream, err := client.listbooks(
context.background(),
&pb.listbooksrequest{
author: charlesdickens,
},
)
if err != nil {
b.fatalf("failed to list books: %v", err)
}
books := []*pb.book{}
for {
response, err := stream.recv()
if err != nil {
break
}
books = append(books, response.books...)
}
benchinnerbooks = books
}
benchbooks = benchinnerbooks
}
基准测试结果:
benchmarkbookstore_listbooks benchmarkbookstore_listbooks-12 732 1647454 ns/op 85974 b/op 1989 allocs/op benchmarkbookstorebatch_listbooks benchmarkbookstorebatch_listbooks-12 1202 937491 ns/op 61098 b/op 853 allocs/op
多么大的进步啊! bookstorebatch 比 bookstore 快 1.75 倍。
但是为什么 bookstorebatch 比 bookstore 快?
服务器每次向客户端发送消息流.send(),都需要
对消息进行编码并通过网络发送。通过发送多个文件
我们立即减少了服务器需要编码和发送的次数
消息,不仅提高了服务器的性能,还提高了
对于需要解码消息的客户端。
在上面的例子中,批量大小设置为10,但客户端可以根据网络情况和文档大小进行调整。
使用双向流来控制流量
书店示例返回所有书籍并完成流,但如果客户端
需要实时观察事件(例如传感器),使用双向
直播是正确的选择。
双向流有点棘手,因为客户端和服务器都
可以同时发送和接收消息。希望 golang 能让这一切变得简单
像这样处理并发。
如前所述,传感器是双向流的一个很好的例子。
监视功能允许客户端决定监视和请求哪些传感器
如果需要的话,当前值。
让我们看一下下面的protobuf定义:
service sensor {
rpc watch(stream watchrequest) returns (stream watchresponse) {}
}
message watchrequest {
oneof request {
watchcreaterequest create_request = 1;
watchcancelrequest cancel_request = 2;
watchnowrequest now_request = 3;
}
}
message watchcreaterequest {
// sensor_id contains the sensor id to watch.
string sensor_id = 1;
}
message watchcancelrequest {
// sensor_id contains the sensor id to cancel.
string sensor_id = 1;
}
message watchnowrequest {
// sensor_id contains the sensor id to get the current value.
string sensor_id = 1;
}
message watchresponse {
// sensor_id contains the sensor id for the current response.
string sensor_id = 1;
// created is true if the watch was created successfully.
bool created = 2;
// canceleted is true if the watch was canceled successfully or if the creation failed.
bool canceleted = 3;
// error contains the error message if something went wrong.
string error = 4;
// timestamp contains the timestamp of the value.
google.protobuf.timestamp timestamp = 5;
// value contains the value of the sensor.
int32 value = 6;
}
请求消息不仅仅是消息流,更是一条可以
包含不同类型的请求。 oneof 指令允许我们定义一个
只能包含指定类型之一的字段。
传感器的 golang 代码将被忽略,但您可以在这里找到它
serverstream 包装流和传感器数据,使其更易于使用。
type serverstream struct {
s *sensorservice // service
stream pb.sensor_watchserver // stream
sendch chan *pb.watchresponse // control channel
sensorch chan sensordata // data channel
sensorwatch map[string]int // map of sensor id to watch id
}
如前所述,服务器可以同时发送和接收消息,一个
函数将处理传入的消息,另一个函数将处理
传出消息。
接收消息:
func (ss *serverstream) recvloop() error {
defer ss.close()
for {
req, err := ss.stream.recv()
if errors.is(err, io.eof) {
return nil
}
if err != nil {
return err
}
switch req := req.request.(type) {
case *pb.watchrequest_createrequest:
// ignore validation (check the full code)
// create a channel to send data to the client
id := sensor.watch(ss.sensorch)
ss.sensorwatch[sensorid] = id
// send created message
ss.sendch <- &pb.watchresponse{
sensorid: sensorid,
created: true,
}
case *pb.watchrequest_cancelrequest:
// ignore validation (check the full code)
// cancel the watch
ss.s.sensors[sensorid].cancel(id)
delete(ss.sensorwatch, sensorid)
ss.sendch <- &pb.watchresponse{
sensorid: sensorid,
canceleted: true,
}
case *pb.watchrequest_nowrequest:
// ignore validation (check the full code)
// send current value
ss.sendch <- &pb.watchresponse{
sensorid: sensorid,
timestamp: timestamppb.now(),
value: int32(sensor.read()),
}
}
}
}
switch语句用于处理不同类型的请求并做出决定
如何处理每个请求。只保留recvloop 函数很重要
读取消息但不向客户端发送消息,因此我们有 sendloop
将从控制通道读取消息并将其发送到客户端。
发送消息:
func (ss *serverstream) sendloop() {
for {
select {
case m, ok := <-ss.sendch:
if !ok {
return
}
// send message
if err := ss.stream.send(m); err != nil {
return
}
case data, ok := <-ss.sensorch:
if !ok {
return
}
// send data
if err := ss.stream.send(&pb.watchresponse{
sensorid: data.id,
timestamp: timestamppb.new(data.time),
value: int32(data.val),
}); err != nil {
return
}
case <-ss.stream.context().done():
return
}
}
}
sendloop函数读取控制通道和数据通道并发送
发送给客户端的消息。如果流关闭,该函数将返回。
最后,传感器服务的快乐路径测试:
func TestSensor(t *testing.T) {
conn := newServer(t, func(s grpc.ServiceRegistrar) {
pb.RegisterSensorServer(s, &sensorService{
sensors: newSensors(),
})
})
client := pb.NewSensorClient(conn)
stream, err := client.Watch(context.Background())
if err != nil {
t.Fatalf("failed to watch: %v", err)
}
response := make(chan *pb.WatchResponse)
// Go routine to read from the stream
go func() {
defer close(response)
for {
resp, err := stream.Recv()
if errors.Is(err, io.EOF) {
return
}
if err != nil {
return
}
response <- resp
}
}()
createRequest(t, stream, "temp")
waitUntilCreated(t, response, "temp")
waitForSensorData(t, response, "temp")
createRequest(t, stream, "pres")
waitUntilCreated(t, response, "pres")
waitForSensorData(t, response, "pres")
waitForSensorData(t, response, "temp")
waitForSensorData(t, response, "pres")
// invalid sensor
createRequest(t, stream, "invalid")
waitUntilCanceled(t, response, "invalid")
nowRequest(t, stream, "light")
waitForSensorData(t, response, "light")
// Wait for 2 seconds to make sure we don't receive any data for light
waitForNoSensorData(t, response, "light", 2*time.Second)
cancelRequest(t, stream, "temp")
waitUntilCanceled(t, response, "temp")
waitForSensorData(t, response, "pres")
// Wait for 2 seconds to make sure we don't receive any data for temp
waitForNoSensorData(t, response, "temp", 2*time.Second)
err = stream.CloseSend()
if err != nil {
t.Fatalf("failed to close send: %v", err)
}
}
从上面的测试中我们可以看到客户端可以创建、取消、获取当前
传感器的值。客户端还可以同时观看多个传感器。
挑战自己
- 使用 grpc 流实现聊天应用程序。
- 修改传感器服务以一次发送多个值以节省往返次数。
- 嗅探网络流量以查看一元请求和流式请求之间的区别。
结论
grpc 流是一种用于构建实时应用程序的多功能且强大的工具。
通过遵循最佳实践,例如仅在必要时使用流式传输、有效地批处理数据以及明智地利用双向流式传输,开发人员可以最大限度地提高性能
并保持代码简单性。
虽然 grpc 流式传输带来了复杂性,但其好处远远超过了挑战
当深思熟虑地应用时。
保持联系
如果您有任何问题或反馈,请随时在 linkedin 上与我联系。
到这里,我们也就讲完了《gRPC 流:最佳实践和性能见解》的内容了。个人认为,基础知识的学习和巩固,是为了更好的将其运用到项目中,欢迎关注golang学习网公众号,带你了解更多关于的知识点!
如何避免使用rem计算造成页面变形?
- 上一篇
- 如何避免使用rem计算造成页面变形?
- 下一篇
- DataTable数据显示数量设置失效,问题出在哪?
-
- Golang · Go教程 | 52分钟前 |
- Golangreflect动态赋值方法详解
- 299浏览 收藏
-
- Golang · Go教程 | 53分钟前 |
- Golang标准库与依赖安装详解
- 350浏览 收藏
-
- Golang · Go教程 | 56分钟前 |
- Golang微服务熔断降级实现详解
- 190浏览 收藏
-
- Golang · Go教程 | 59分钟前 |
- Go语言指针操作:*的多义与隐式&
- 325浏览 收藏
-
- Golang · Go教程 | 1小时前 |
- Golang自动扩容策略怎么实现
- 145浏览 收藏
-
- Golang · Go教程 | 1小时前 |
- Golang指针与闭包关系详解
- 272浏览 收藏
-
- Golang · Go教程 | 1小时前 |
- Golang自定义错误详解与教程
- 110浏览 收藏
-
- Golang · Go教程 | 1小时前 |
- GolangJSON读写实战教程详解
- 289浏览 收藏
-
- Golang · Go教程 | 1小时前 |
- gorun支持从标准输入执行代码吗?
- 408浏览 收藏
-
- Golang · Go教程 | 1小时前 |
- Golang环境搭建与依赖安装指南
- 368浏览 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 516次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 500次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 485次学习
-
- ChatExcel酷表
- ChatExcel酷表是由北京大学团队打造的Excel聊天机器人,用自然语言操控表格,简化数据处理,告别繁琐操作,提升工作效率!适用于学生、上班族及政府人员。
- 3188次使用
-
- Any绘本
- 探索Any绘本(anypicturebook.com/zh),一款开源免费的AI绘本创作工具,基于Google Gemini与Flux AI模型,让您轻松创作个性化绘本。适用于家庭、教育、创作等多种场景,零门槛,高自由度,技术透明,本地可控。
- 3400次使用
-
- 可赞AI
- 可赞AI,AI驱动的办公可视化智能工具,助您轻松实现文本与可视化元素高效转化。无论是智能文档生成、多格式文本解析,还是一键生成专业图表、脑图、知识卡片,可赞AI都能让信息处理更清晰高效。覆盖数据汇报、会议纪要、内容营销等全场景,大幅提升办公效率,降低专业门槛,是您提升工作效率的得力助手。
- 3431次使用
-
- 星月写作
- 星月写作是国内首款聚焦中文网络小说创作的AI辅助工具,解决网文作者从构思到变现的全流程痛点。AI扫榜、专属模板、全链路适配,助力新人快速上手,资深作者效率倍增。
- 4537次使用
-
- MagicLight
- MagicLight.ai是全球首款叙事驱动型AI动画视频创作平台,专注于解决从故事想法到完整动画的全流程痛点。它通过自研AI模型,保障角色、风格、场景高度一致性,让零动画经验者也能高效产出专业级叙事内容。广泛适用于独立创作者、动画工作室、教育机构及企业营销,助您轻松实现创意落地与商业化。
- 3809次使用
-
- Golangmap实践及实现原理解析
- 2022-12-28 505浏览
-
- go和golang的区别解析:帮你选择合适的编程语言
- 2023-12-29 503浏览
-
- 试了下Golang实现try catch的方法
- 2022-12-27 502浏览
-
- 如何在go语言中实现高并发的服务器架构
- 2023-08-27 502浏览
-
- 提升工作效率的Go语言项目开发经验分享
- 2023-11-03 502浏览

