Goroutine 具有高效的任务调度
从现在开始,努力学习吧!本文《Goroutine 具有高效的任务调度》主要讲解了等等相关知识点,我会在golang学习网中持续更新相关的系列文章,欢迎大家关注并积极留言建议。下面就先一起来看一下本篇正文内容吧,希望能帮到你!
我正在使用 golang 使用 goroutine 同时运行两个 websocket 客户端(一个用于私有数据,一个用于公共数据)。从表面上看,一切似乎都很顺利。两个客户端都接收从 websocket 服务器传输的数据。然而,我相信我可能设置错误,因为当我检查活动监视器时,我的程序始终有 500 - 1500 次空闲唤醒,并且使用了 >200% 的 cpu。对于像两个 websocket 客户端这样简单的事情来说,这似乎并不正常。
我已将代码放在片段中,因此需要阅读的内容较少(希望这使其更易于理解),但如果您需要完整的代码,我也可以发布。这是我的 main func 中运行 ws 客户端的代码
comms := make(chan os.signal, 1) signal.notify(comms, os.interrupt, syscall.sigterm) ctx := context.background() ctx, cancel := context.withcancel(ctx) var wg sync.waitgroup wg.add(1) go pubsocket.publisten(ctx, &wg, &activesubs, testing) wg.add(1) go privsocket.privlisten(ctx, &wg, &activesubs, testing) <- comms cancel() wg.wait()
这是客户端如何运行 go 例程的代码
func (socket *Socket) PubListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) { defer wg.Done() for { select { case <- ctx.Done(): log.Println("closing public socket") socket.Close() return default: socket.OnTextMessage = func(message string, socket Socket) { log.Println(message) pubJsonDecoder(message, testing) //tradesParser(message); } } } } func (socket *Socket) PrivListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) { defer wg.Done() for { select { case <- ctx.Done(): log.Println("closing private socket") socket.Close() return default: socket.OnTextMessage = func(message string, socket Socket) { log.Println(message) } } } }
关于为什么空闲唤醒如此高有什么想法吗?我应该使用多线程而不是并发吗?预先感谢您的帮助!
正确答案
你在这里浪费了cpu(多余的循环):
for { // ... default: // high cpu usage here. } }
尝试这样的事情:
func (socket *socket) publisten(ctx context.context, wg *sync.waitgroup, submanager *connstatus, testing bool) { defer wg.done() defer socket.close() socket.ontextmessage = func(message string, socket socket) { log.println(message) pubjsondecoder(message, testing) //tradesparser(message); } <-ctx.done() log.println("closing public socket") } func (socket *socket) privlisten(ctx context.context, wg *sync.waitgroup, submanager *connstatus, testing bool) { defer wg.done() defer socket.close() socket.ontextmessage = func(message string, socket socket) { log.println(message) } <-ctx.done() log.println("closing private socket") }
这也可能有帮助:
https://github.com/gorilla/websocket/blob/master/examples/chat/client.go
tl/dr:websocket 很难:)
看起来您可能有几个旋转器。您将在 for - select 语句的默认情况下为 ontextmessage() 分配处理程序函数。如果没有其他情况准备就绪,则始终执行默认情况。因为在默认情况下没有任何东西会阻塞,所以 for 循环就会失去控制。像这样旋转的两个 goroutine 可能会固定 2 个核心。 websocket 是网络 io,这些 goroutine 可能会并行运行。这就是您看到 200% 利用率的原因。
看看 gorilla/websocket 库。我不会说它比任何其他 websocket 库更好或更差,我有很多使用它的经验。
https://github.com/gorilla/websocket
下面是我多次使用的实现。 它的设置方式是注册接收到特定消息时触发的处理函数。假设消息中的值之一是“type”:“start-job”,websocket 服务器将调用您分配给“start-job”websocket 消息的处理程序。感觉就像为 http 路由器编写端点。
打包服务器
上下文.go
package serverws import ( "errors" "fmt" "strings" "sync" ) // conncontext is the connection context to track a connected websocket user type conncontext struct { specialkey string supportgzip string userid string mu sync.mutex // websockets are not thread safe, we'll use a mutex to lock writes. } // hashkeyasctx returns a conncontext based on the hash provided func hashkeyasctx(hashkey string) (*conncontext, error) { values := strings.split(hashkey, ":") if len(values) != 3 { return nil, errors.new("invalid key received: " + hashkey) } return &conncontext{values[0], values[1], values[2], sync.mutex{}}, nil } // ashashkey returns the hash key for a given connection context conncontext func (ctx *conncontext) ashashkey() string { return strings.join([]string{ctx.specialkey, ctx.supportgzip, ctx.userid}, ":") } // string returns a string of the hash of a given connection context conncontext func (ctx *conncontext) string() string { return fmt.sprint("specialkey: ", ctx.specialkey, " gzip ", ctx.supportgzip, " auth ", ctx.userid) }
wshandler.go
package serverws import ( "encoding/json" "errors" "fmt" "net/http" "strings" "sync" "time" "github.com/gorilla/websocket" "github.com/rs/zerolog/log" ) var ( receivefunctionmap = make(map[string]receiveobjectfunc) ctxhashmap sync.map ) // receiveobjectfunc is a function signature for a websocket request handler type receiveobjectfunc func(conn *websocket.conn, ctx *conncontext, t map[string]interface{}) // websockethandler does what it says, handles websockets (makes them easier for us to deal with) type websockethandler struct { wsupgrader websocket.upgrader } // websocketmessage that is sent over a websocket. messages must have a conversation type so the server and the client js know // what is being discussed and what signals to raise on the server and the client. // the "notification" message instructs the client to display an alert popup. type websocketmessage struct { messagetype string `json:"type"` message interface{} `json:"message"` } // newwebsockethandler sets up a new websocket. func newwebsockethandler() *websockethandler { wsh := new(websockethandler) wsh.wsupgrader = websocket.upgrader{ readbuffersize: 4096, writebuffersize: 4096, } return wsh } // registermessagetype sets up an event bus for a message type. when messages arrive from the client that match messagetypename, // the function you wrote to handle that message is then called. func (wsh *websockethandler) registermessagetype(messagetypename string, f receiveobjectfunc) { receivefunctionmap[messagetypename] = f } // onmessage triggers when the underlying websocket has received a message. func (wsh *websockethandler) onmessage(conn *websocket.conn, ctx *conncontext, msg []byte, msgtype int) { // handling text messages or binary messages. binary is usually some gzip text. if msgtype == websocket.textmessage { wsh.processincomingtextmsg(conn, ctx, msg) } if msgtype == websocket.binarymessage { } } // onopen triggers when the underlying websocket has established a connection. func (wsh *websockethandler) onopen(conn *websocket.conn, r *http.request) (ctx *conncontext, err error) { //user, err := gothic.getfromsession("id", r) user := "testuser" if err := r.parseform(); err != nil { return nil, errors.new("parameter check error") } specialkey := r.formvalue("specialkey") supportgzip := r.formvalue("support_gzip") if user != "" && err == nil { ctx = &conncontext{specialkey, supportgzip, user, sync.mutex{}} } else { ctx = &conncontext{specialkey, supportgzip, "", sync.mutex{}} } keystring := ctx.ashashkey() if oldconn, ok := ctxhashmap.load(keystring); ok { wsh.onclose(oldconn.(*websocket.conn), ctx) oldconn.(*websocket.conn).close() } ctxhashmap.store(keystring, conn) return ctx, nil } // onclose triggers when the underlying websocket has been closed down func (wsh *websockethandler) onclose(conn *websocket.conn, ctx *conncontext) { //log.info().msg(("client close itself as " + ctx.string())) wsh.closeconnwithctx(ctx) } // onerror triggers when a websocket connection breaks func (wsh *websockethandler) onerror(errmsg string) { //log.error().msg(errmsg) } // handleconn happens when a user connects to us at the listening point. we ask // the user to authenticate and then send the required http upgrade return code. func (wsh *websockethandler) handleconn(w http.responsewriter, r *http.request) { user := "" if r.url.path == "/websocket" { user = "testuser" // authenticate however you want if user == "" { fmt.println("unauthenticated user tried to connect to websocket from ", r.header.get("x-forwarded-for")) return } } // don't do this. you need to check the origin, but this is here as a place holder wsh.wsupgrader.checkorigin = func(r *http.request) bool { return true } conn, err := wsh.wsupgrader.upgrade(w, r, nil) if err != nil { log.error().msg("failed to set websocket upgrade: " + err.error()) return } defer conn.close() ctx, err := wsh.onopen(conn, r) if err != nil { log.error().msg("open connection failed " + err.error() + r.url.rawquery) if user != "" { ctx.userid = user } return } if user != "" { ctx.userid = user } conn.setpinghandler(func(message string) error { conn.writecontrol(websocket.pongmessage, []byte(message), time.now().add(time.second)) return nil }) // message pump for the underlying websocket connection for { t, msg, err := conn.readmessage() if err != nil { // read errors are when the user closes the tab. ignore. wsh.onclose(conn, ctx) return } switch t { case websocket.textmessage, websocket.binarymessage: wsh.onmessage(conn, ctx, msg, t) case websocket.closemessage: wsh.onclose(conn, ctx) return case websocket.pingmessage: case websocket.pongmessage: } } } func (wsh *websockethandler) closeconnwithctx(ctx *conncontext) { keystring := ctx.ashashkey() ctxhashmap.delete(keystring) } func (wsh *websockethandler) processincomingtextmsg(conn *websocket.conn, ctx *conncontext, msg []byte) { //log.debug().msg("client said " + string(msg)) data := websocketmessage{} // try to turn this into data err := json.unmarshal(msg, &data) // and try to get at the data underneath var raw = make(map[string]interface{}) terr := json.unmarshal(msg, &raw) if err == nil { // what kind of message is this? if receivefunctionmap[data.messagetype] != nil { // we'll try to cast this message and call the handler for it if terr == nil { if v, ok := raw["message"].(map[string]interface{}); ok { receivefunctionmap[data.messagetype](conn, ctx, v) } else { log.debug().msg("nonsense sent over the websocket.") } } else { log.debug().msg("nonsense sent over the websocket.") } } } else { // received garbage from the transmitter. } } // sendjsontosocket sends a specific message to a specific websocket func (wsh *websockethandler) sendjsontosocket(socketid string, msg interface{}) { fields := strings.split(socketid, ":") message, _ := json.marshal(msg) ctxhashmap.range(func(key interface{}, value interface{}) bool { if ctx, err := hashkeyasctx(key.(string)); err != nil { wsh.onerror(err.error()) } else { if ctx.specialkey == fields[0] { ctx.mu.lock() if value != nil { err = value.(*websocket.conn).writemessage(websocket.textmessage, message) } ctx.mu.unlock() } if err != nil { ctx.mu.lock() // we'll lock here even though we're going to destroy this wsh.onclose(value.(*websocket.conn), ctx) value.(*websocket.conn).close() ctxhashmap.delete(key) // remove the websocket immediately //wsh.onerror("write err to user " + key.(string) + " err: " + err.error()) } } return true }) }
封装wsocket
类型.go
package wsocket // acknowledgement is for acking simple messages and sending errors type acknowledgement struct { responseid string `json:"responseid"` status string `json:"status"` ipaddress string `json:"ipaddress"` errortext string `json:"errortext"` }
wsocket.go
package wsocket import ( "fmt" server "project/serverws" "project/utils" "sync" "time" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" // "github.com/mitchellh/mapstructure" "github.com/inconshreveable/log15" ) var ( websocket *server.websockethandler // so other packages can send out websocket messages websocketlocation string log log15.logger = log15.new("package", "wsocket" ) func setupwebsockets(r *gin.engine, socket *server.websockethandler, debug_mode bool) { websocket = socket websocketlocation = "example.mydomain.com" //websocketlocation = "example.mydomain.com" r.get("/websocket", func(c *gin.context) { socket.handleconn(c.writer, c.request) }) socket.registermessagetype("hello", func(conn *websocket.conn, ctx *server.conncontext, data map[string]interface{}) { response := acknowledgement{ responseid: "hello", status: fmt.sprintf("ok/%v", ctx.authid), ipaddress: conn.remoteaddr().string(), } // mapstructure.decode(data, &request) -- used if we wanted to read what was fed in socket.sendjsontosocket(ctx.ashashkey(), &response) }) socket.registermessagetype("start-job", func(conn *websocket.conn, ctx *server.conncontext, data map[string]interface{}) { response := acknowledgement{ responseid: "starting_job", status: fmt.sprintf("%s is being dialed.", data["did"]), ipaddress: conn.remoteaddr().string(), } // mapstructure.decode(data, &request) -- used if we wanted to read what was fed in to a struct. socket.sendjsontosocket(ctx.ashashkey(), &response) })
此实现是针对 web 应用程序的。这是 javascript 客户端的简化版本。您可以使用此实现处理许多并发连接,并且您所要做的通信就是定义包含与下面 switch 中的情况匹配的responseid 的对象/结构,它基本上是一个长 switch 语句,将其序列化并将其发送到另一端,对方就会确认。我有一些版本在多个生产环境中运行。
websocket.js
$(() => { function wsMessage(object) { switch (object.responseId) { case "Hello": // HELLO! :-) console.log("Heartbeat received, we're connected."); break; case "Notification": if (object.errortext != "") { $.notify({ // options message: '' + object.errortext + ' ', }, { // settings type: 'danger', offset: 50, placement: { align: 'center', } }); } else { $.notify({ // options message: '' + object.status + ' ', }, { // settings type: 'success', offset: 50, placement: { align: 'center', } }); } break; } } $(document).ready(function () { function heartbeat() { if (!websocket) return; if (websocket.readyState !== 1) return; websocket.send("{\"type\": \"Hello\", \"message\": { \"RequestID\": \"Hello\", \"User\":\"" + /*getCookie("_loginuser")*/"TestUser" + "\"} }"); setTimeout(heartbeat, 24000); } //TODO: CHANGE TO WSS once tls is enabled. function wireUpWebsocket() { websocket = new WebSocket('wss://' + WEBSOCKET_LOCATION + '/websocket?specialKey=' + WEBSOCKET_KEY + '&support_gzip=0'); websocket.onopen = function (event) { console.log("Websocket connected."); heartbeat(); //if it exists if (typeof (wsReady) !== 'undefined') { //execute it wsReady(); } }; websocket.onerror = function (event) { console.log("WEBSOCKET ERROR " + event.data); }; websocket.onmessage = function (event) { wsMessage(JSON.parse(event.data)); }; websocket.onclose = function () { // Don't close! // Replace key console.log("WEBSOCKET CLOSED"); WEBSOCKET_KEY = Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15); websocketreconnects++; if (websocketreconnects > 30) { // Too much, time to bounce // location.reload(); Don't reload the page anymore, just re-connect. } setTimeout(function () { wireUpWebsocket(); }, 3000); }; } wireUpWebsocket(); }); }); function getCookie(name) { var value = "; " + document.cookie; var parts = value.split("; " + name + "="); if (parts.length == 2) return parts.pop().split(";").shift(); } function setCookie(cname, cvalue, exdays) { var d = new Date(); d.setTime(d.getTime() + (exdays * 24 * 60 * 60 * 1000)); var expires = "expires=" + d.toUTCString(); document.cookie = cname + "=" + cvalue + ";" + expires + ";path=/"; }
在无限循环中一遍又一遍地分配处理函数肯定是行不通的。
https://github.com/gorilla/websocket
今天关于《Goroutine 具有高效的任务调度》的内容就介绍到这里了,是不是学起来一目了然!想要了解更多关于的内容请关注golang学习网公众号!

- 上一篇
- Go 的 http.Client 的实例化是否可重复使用的原因是什么?

- 下一篇
- github.com/aws/aws-sdk-go/aws未提供必要模块
-
- Golang · Go问答 | 1年前 |
- 在读取缓冲通道中的内容之前退出
- 139浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 戈兰岛的全球 GOPRIVATE 设置
- 204浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何将结构作为参数传递给 xml-rpc
- 325浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何用golang获得小数点以下两位长度?
- 477浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何通过 client-go 和 golang 检索 Kubernetes 指标
- 486浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 将多个“参数”映射到单个可变参数的习惯用法
- 439浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 将 HTTP 响应正文写入文件后出现 EOF 错误
- 357浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 结构中映射的匿名列表的“复合文字中缺少类型”
- 352浏览 收藏
-
- Golang · Go问答 | 1年前 |
- NATS Jetstream 的性能
- 101浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何将复杂的字符串输入转换为mapstring?
- 440浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 相当于GoLang中Java将Object作为方法参数传递
- 212浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何确保所有 goroutine 在没有 time.Sleep 的情况下终止?
- 143浏览 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 508次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 497次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 484次学习
-
- 毕业宝AIGC检测
- 毕业宝AIGC检测是“毕业宝”平台的AI生成内容检测工具,专为学术场景设计,帮助用户初步判断文本的原创性和AI参与度。通过与知网、维普数据库联动,提供全面检测结果,适用于学生、研究者、教育工作者及内容创作者。
- 18次使用
-
- AI Make Song
- AI Make Song是一款革命性的AI音乐生成平台,提供文本和歌词转音乐的双模式输入,支持多语言及商业友好版权体系。无论你是音乐爱好者、内容创作者还是广告从业者,都能在这里实现“用文字创造音乐”的梦想。平台已生成超百万首原创音乐,覆盖全球20个国家,用户满意度高达95%。
- 29次使用
-
- SongGenerator
- 探索SongGenerator.io,零门槛、全免费的AI音乐生成器。无需注册,通过简单文本输入即可生成多风格音乐,适用于内容创作者、音乐爱好者和教育工作者。日均生成量超10万次,全球50国家用户信赖。
- 27次使用
-
- BeArt AI换脸
- 探索BeArt AI换脸工具,免费在线使用,无需下载软件,即可对照片、视频和GIF进行高质量换脸。体验快速、流畅、无水印的换脸效果,适用于娱乐创作、影视制作、广告营销等多种场景。
- 30次使用
-
- 协启动
- SEO摘要协启动(XieQiDong Chatbot)是由深圳协启动传媒有限公司运营的AI智能服务平台,提供多模型支持的对话服务、文档处理和图像生成工具,旨在提升用户内容创作与信息处理效率。平台支持订阅制付费,适合个人及企业用户,满足日常聊天、文案生成、学习辅助等需求。
- 32次使用
-
- GoLand调式动态执行代码
- 2023-01-13 502浏览
-
- 用Nginx反向代理部署go写的网站。
- 2023-01-17 502浏览
-
- Golang取得代码运行时间的问题
- 2023-02-24 501浏览
-
- 请问 go 代码如何实现在代码改动后不需要Ctrl+c,然后重新 go run *.go 文件?
- 2023-01-08 501浏览
-
- 如何从同一个 io.Reader 读取多次
- 2023-04-11 501浏览