Goroutine 具有高效的任务调度
从现在开始,努力学习吧!本文《Goroutine 具有高效的任务调度》主要讲解了等等相关知识点,我会在golang学习网中持续更新相关的系列文章,欢迎大家关注并积极留言建议。下面就先一起来看一下本篇正文内容吧,希望能帮到你!
我正在使用 golang 使用 goroutine 同时运行两个 websocket 客户端(一个用于私有数据,一个用于公共数据)。从表面上看,一切似乎都很顺利。两个客户端都接收从 websocket 服务器传输的数据。然而,我相信我可能设置错误,因为当我检查活动监视器时,我的程序始终有 500 - 1500 次空闲唤醒,并且使用了 >200% 的 cpu。对于像两个 websocket 客户端这样简单的事情来说,这似乎并不正常。
我已将代码放在片段中,因此需要阅读的内容较少(希望这使其更易于理解),但如果您需要完整的代码,我也可以发布。这是我的 main func 中运行 ws 客户端的代码
comms := make(chan os.signal, 1) signal.notify(comms, os.interrupt, syscall.sigterm) ctx := context.background() ctx, cancel := context.withcancel(ctx) var wg sync.waitgroup wg.add(1) go pubsocket.publisten(ctx, &wg, &activesubs, testing) wg.add(1) go privsocket.privlisten(ctx, &wg, &activesubs, testing) <- comms cancel() wg.wait()
这是客户端如何运行 go 例程的代码
func (socket *Socket) PubListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
defer wg.Done()
for {
select {
case <- ctx.Done():
log.Println("closing public socket")
socket.Close()
return
default:
socket.OnTextMessage = func(message string, socket Socket) {
log.Println(message)
pubJsonDecoder(message, testing)
//tradesParser(message);
}
}
}
}
func (socket *Socket) PrivListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
defer wg.Done()
for {
select {
case <- ctx.Done():
log.Println("closing private socket")
socket.Close()
return
default:
socket.OnTextMessage = func(message string, socket Socket) {
log.Println(message)
}
}
}
}
关于为什么空闲唤醒如此高有什么想法吗?我应该使用多线程而不是并发吗?预先感谢您的帮助!
正确答案
你在这里浪费了cpu(多余的循环):
for {
// ...
default:
// high cpu usage here.
}
}
尝试这样的事情:
func (socket *socket) publisten(ctx context.context, wg *sync.waitgroup, submanager *connstatus, testing bool) {
defer wg.done()
defer socket.close()
socket.ontextmessage = func(message string, socket socket) {
log.println(message)
pubjsondecoder(message, testing)
//tradesparser(message);
}
<-ctx.done()
log.println("closing public socket")
}
func (socket *socket) privlisten(ctx context.context, wg *sync.waitgroup, submanager *connstatus, testing bool) {
defer wg.done()
defer socket.close()
socket.ontextmessage = func(message string, socket socket) {
log.println(message)
}
<-ctx.done()
log.println("closing private socket")
}
这也可能有帮助:
https://github.com/gorilla/websocket/blob/master/examples/chat/client.go
tl/dr:websocket 很难:)
看起来您可能有几个旋转器。您将在 for - select 语句的默认情况下为 ontextmessage() 分配处理程序函数。如果没有其他情况准备就绪,则始终执行默认情况。因为在默认情况下没有任何东西会阻塞,所以 for 循环就会失去控制。像这样旋转的两个 goroutine 可能会固定 2 个核心。 websocket 是网络 io,这些 goroutine 可能会并行运行。这就是您看到 200% 利用率的原因。
看看 gorilla/websocket 库。我不会说它比任何其他 websocket 库更好或更差,我有很多使用它的经验。
https://github.com/gorilla/websocket
下面是我多次使用的实现。 它的设置方式是注册接收到特定消息时触发的处理函数。假设消息中的值之一是“type”:“start-job”,websocket 服务器将调用您分配给“start-job”websocket 消息的处理程序。感觉就像为 http 路由器编写端点。
打包服务器
上下文.go
package serverws
import (
"errors"
"fmt"
"strings"
"sync"
)
// conncontext is the connection context to track a connected websocket user
type conncontext struct {
specialkey string
supportgzip string
userid string
mu sync.mutex // websockets are not thread safe, we'll use a mutex to lock writes.
}
// hashkeyasctx returns a conncontext based on the hash provided
func hashkeyasctx(hashkey string) (*conncontext, error) {
values := strings.split(hashkey, ":")
if len(values) != 3 {
return nil, errors.new("invalid key received: " + hashkey)
}
return &conncontext{values[0], values[1], values[2], sync.mutex{}}, nil
}
// ashashkey returns the hash key for a given connection context conncontext
func (ctx *conncontext) ashashkey() string {
return strings.join([]string{ctx.specialkey, ctx.supportgzip, ctx.userid}, ":")
}
// string returns a string of the hash of a given connection context conncontext
func (ctx *conncontext) string() string {
return fmt.sprint("specialkey: ", ctx.specialkey, " gzip ", ctx.supportgzip, " auth ", ctx.userid)
}
wshandler.go
package serverws
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/rs/zerolog/log"
)
var (
receivefunctionmap = make(map[string]receiveobjectfunc)
ctxhashmap sync.map
)
// receiveobjectfunc is a function signature for a websocket request handler
type receiveobjectfunc func(conn *websocket.conn, ctx *conncontext, t map[string]interface{})
// websockethandler does what it says, handles websockets (makes them easier for us to deal with)
type websockethandler struct {
wsupgrader websocket.upgrader
}
// websocketmessage that is sent over a websocket. messages must have a conversation type so the server and the client js know
// what is being discussed and what signals to raise on the server and the client.
// the "notification" message instructs the client to display an alert popup.
type websocketmessage struct {
messagetype string `json:"type"`
message interface{} `json:"message"`
}
// newwebsockethandler sets up a new websocket.
func newwebsockethandler() *websockethandler {
wsh := new(websockethandler)
wsh.wsupgrader = websocket.upgrader{
readbuffersize: 4096,
writebuffersize: 4096,
}
return wsh
}
// registermessagetype sets up an event bus for a message type. when messages arrive from the client that match messagetypename,
// the function you wrote to handle that message is then called.
func (wsh *websockethandler) registermessagetype(messagetypename string, f receiveobjectfunc) {
receivefunctionmap[messagetypename] = f
}
// onmessage triggers when the underlying websocket has received a message.
func (wsh *websockethandler) onmessage(conn *websocket.conn, ctx *conncontext, msg []byte, msgtype int) {
// handling text messages or binary messages. binary is usually some gzip text.
if msgtype == websocket.textmessage {
wsh.processincomingtextmsg(conn, ctx, msg)
}
if msgtype == websocket.binarymessage {
}
}
// onopen triggers when the underlying websocket has established a connection.
func (wsh *websockethandler) onopen(conn *websocket.conn, r *http.request) (ctx *conncontext, err error) {
//user, err := gothic.getfromsession("id", r)
user := "testuser"
if err := r.parseform(); err != nil {
return nil, errors.new("parameter check error")
}
specialkey := r.formvalue("specialkey")
supportgzip := r.formvalue("support_gzip")
if user != "" && err == nil {
ctx = &conncontext{specialkey, supportgzip, user, sync.mutex{}}
} else {
ctx = &conncontext{specialkey, supportgzip, "", sync.mutex{}}
}
keystring := ctx.ashashkey()
if oldconn, ok := ctxhashmap.load(keystring); ok {
wsh.onclose(oldconn.(*websocket.conn), ctx)
oldconn.(*websocket.conn).close()
}
ctxhashmap.store(keystring, conn)
return ctx, nil
}
// onclose triggers when the underlying websocket has been closed down
func (wsh *websockethandler) onclose(conn *websocket.conn, ctx *conncontext) {
//log.info().msg(("client close itself as " + ctx.string()))
wsh.closeconnwithctx(ctx)
}
// onerror triggers when a websocket connection breaks
func (wsh *websockethandler) onerror(errmsg string) {
//log.error().msg(errmsg)
}
// handleconn happens when a user connects to us at the listening point. we ask
// the user to authenticate and then send the required http upgrade return code.
func (wsh *websockethandler) handleconn(w http.responsewriter, r *http.request) {
user := ""
if r.url.path == "/websocket" {
user = "testuser" // authenticate however you want
if user == "" {
fmt.println("unauthenticated user tried to connect to websocket from ", r.header.get("x-forwarded-for"))
return
}
}
// don't do this. you need to check the origin, but this is here as a place holder
wsh.wsupgrader.checkorigin = func(r *http.request) bool {
return true
}
conn, err := wsh.wsupgrader.upgrade(w, r, nil)
if err != nil {
log.error().msg("failed to set websocket upgrade: " + err.error())
return
}
defer conn.close()
ctx, err := wsh.onopen(conn, r)
if err != nil {
log.error().msg("open connection failed " + err.error() + r.url.rawquery)
if user != "" {
ctx.userid = user
}
return
}
if user != "" {
ctx.userid = user
}
conn.setpinghandler(func(message string) error {
conn.writecontrol(websocket.pongmessage, []byte(message), time.now().add(time.second))
return nil
})
// message pump for the underlying websocket connection
for {
t, msg, err := conn.readmessage()
if err != nil {
// read errors are when the user closes the tab. ignore.
wsh.onclose(conn, ctx)
return
}
switch t {
case websocket.textmessage, websocket.binarymessage:
wsh.onmessage(conn, ctx, msg, t)
case websocket.closemessage:
wsh.onclose(conn, ctx)
return
case websocket.pingmessage:
case websocket.pongmessage:
}
}
}
func (wsh *websockethandler) closeconnwithctx(ctx *conncontext) {
keystring := ctx.ashashkey()
ctxhashmap.delete(keystring)
}
func (wsh *websockethandler) processincomingtextmsg(conn *websocket.conn, ctx *conncontext, msg []byte) {
//log.debug().msg("client said " + string(msg))
data := websocketmessage{}
// try to turn this into data
err := json.unmarshal(msg, &data)
// and try to get at the data underneath
var raw = make(map[string]interface{})
terr := json.unmarshal(msg, &raw)
if err == nil {
// what kind of message is this?
if receivefunctionmap[data.messagetype] != nil {
// we'll try to cast this message and call the handler for it
if terr == nil {
if v, ok := raw["message"].(map[string]interface{}); ok {
receivefunctionmap[data.messagetype](conn, ctx, v)
} else {
log.debug().msg("nonsense sent over the websocket.")
}
} else {
log.debug().msg("nonsense sent over the websocket.")
}
}
} else {
// received garbage from the transmitter.
}
}
// sendjsontosocket sends a specific message to a specific websocket
func (wsh *websockethandler) sendjsontosocket(socketid string, msg interface{}) {
fields := strings.split(socketid, ":")
message, _ := json.marshal(msg)
ctxhashmap.range(func(key interface{}, value interface{}) bool {
if ctx, err := hashkeyasctx(key.(string)); err != nil {
wsh.onerror(err.error())
} else {
if ctx.specialkey == fields[0] {
ctx.mu.lock()
if value != nil {
err = value.(*websocket.conn).writemessage(websocket.textmessage, message)
}
ctx.mu.unlock()
}
if err != nil {
ctx.mu.lock() // we'll lock here even though we're going to destroy this
wsh.onclose(value.(*websocket.conn), ctx)
value.(*websocket.conn).close()
ctxhashmap.delete(key) // remove the websocket immediately
//wsh.onerror("write err to user " + key.(string) + " err: " + err.error())
}
}
return true
})
}
封装wsocket
类型.go
package wsocket
// acknowledgement is for acking simple messages and sending errors
type acknowledgement struct {
responseid string `json:"responseid"`
status string `json:"status"`
ipaddress string `json:"ipaddress"`
errortext string `json:"errortext"`
}
wsocket.go
package wsocket
import (
"fmt"
server "project/serverws"
"project/utils"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
// "github.com/mitchellh/mapstructure"
"github.com/inconshreveable/log15"
)
var (
websocket *server.websockethandler // so other packages can send out websocket messages
websocketlocation string
log log15.logger = log15.new("package", "wsocket"
)
func setupwebsockets(r *gin.engine, socket *server.websockethandler, debug_mode bool) {
websocket = socket
websocketlocation = "example.mydomain.com"
//websocketlocation = "example.mydomain.com"
r.get("/websocket", func(c *gin.context) {
socket.handleconn(c.writer, c.request)
})
socket.registermessagetype("hello", func(conn *websocket.conn, ctx *server.conncontext, data map[string]interface{}) {
response := acknowledgement{
responseid: "hello",
status: fmt.sprintf("ok/%v", ctx.authid),
ipaddress: conn.remoteaddr().string(),
}
// mapstructure.decode(data, &request) -- used if we wanted to read what was fed in
socket.sendjsontosocket(ctx.ashashkey(), &response)
})
socket.registermessagetype("start-job", func(conn *websocket.conn, ctx *server.conncontext, data map[string]interface{}) {
response := acknowledgement{
responseid: "starting_job",
status: fmt.sprintf("%s is being dialed.", data["did"]),
ipaddress: conn.remoteaddr().string(),
}
// mapstructure.decode(data, &request) -- used if we wanted to read what was fed in to a struct.
socket.sendjsontosocket(ctx.ashashkey(), &response)
})
此实现是针对 web 应用程序的。这是 javascript 客户端的简化版本。您可以使用此实现处理许多并发连接,并且您所要做的通信就是定义包含与下面 switch 中的情况匹配的responseid 的对象/结构,它基本上是一个长 switch 语句,将其序列化并将其发送到另一端,对方就会确认。我有一些版本在多个生产环境中运行。
websocket.js
$(() => {
function wsMessage(object) {
switch (object.responseId) {
case "Hello": // HELLO! :-)
console.log("Heartbeat received, we're connected.");
break;
case "Notification":
if (object.errortext != "") {
$.notify({
// options
message: '<center><B><i class="fas fa-exclamation-triangle"></i> ' + object.errortext + '</B></center>',
}, {
// settings
type: 'danger',
offset: 50,
placement: {
align: 'center',
}
});
} else {
$.notify({
// options
message: '<center><B>' + object.status + '</B></center>',
}, {
// settings
type: 'success',
offset: 50,
placement: {
align: 'center',
}
});
}
break;
}
}
$(document).ready(function () {
function heartbeat() {
if (!websocket) return;
if (websocket.readyState !== 1) return;
websocket.send("{\"type\": \"Hello\", \"message\": { \"RequestID\": \"Hello\", \"User\":\"" + /*getCookie("_loginuser")*/"TestUser" + "\"} }");
setTimeout(heartbeat, 24000);
}
//TODO: CHANGE TO WSS once tls is enabled.
function wireUpWebsocket() {
websocket = new WebSocket('wss://' + WEBSOCKET_LOCATION + '/websocket?specialKey=' + WEBSOCKET_KEY + '&support_gzip=0');
websocket.onopen = function (event) {
console.log("Websocket connected.");
heartbeat();
//if it exists
if (typeof (wsReady) !== 'undefined') {
//execute it
wsReady();
}
};
websocket.onerror = function (event) {
console.log("WEBSOCKET ERROR " + event.data);
};
websocket.onmessage = function (event) {
wsMessage(JSON.parse(event.data));
};
websocket.onclose = function () {
// Don't close!
// Replace key
console.log("WEBSOCKET CLOSED");
WEBSOCKET_KEY = Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15);
websocketreconnects++;
if (websocketreconnects > 30) { // Too much, time to bounce
// location.reload(); Don't reload the page anymore, just re-connect.
}
setTimeout(function () { wireUpWebsocket(); }, 3000);
};
}
wireUpWebsocket();
});
});
function getCookie(name) {
var value = "; " + document.cookie;
var parts = value.split("; " + name + "=");
if (parts.length == 2) return parts.pop().split(";").shift();
}
function setCookie(cname, cvalue, exdays) {
var d = new Date();
d.setTime(d.getTime() + (exdays * 24 * 60 * 60 * 1000));
var expires = "expires=" + d.toUTCString();
document.cookie = cname + "=" + cvalue + ";" + expires + ";path=/";
}
在无限循环中一遍又一遍地分配处理函数肯定是行不通的。
https://github.com/gorilla/websocket
今天关于《Goroutine 具有高效的任务调度》的内容就介绍到这里了,是不是学起来一目了然!想要了解更多关于的内容请关注golang学习网公众号!
Go 的 http.Client 的实例化是否可重复使用的原因是什么?
- 上一篇
- Go 的 http.Client 的实例化是否可重复使用的原因是什么?
- 下一篇
- github.com/aws/aws-sdk-go/aws未提供必要模块
-
- Golang · Go问答 | 1年前 |
- 在读取缓冲通道中的内容之前退出
- 139浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 戈兰岛的全球 GOPRIVATE 设置
- 204浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何将结构作为参数传递给 xml-rpc
- 325浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何用golang获得小数点以下两位长度?
- 478浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何通过 client-go 和 golang 检索 Kubernetes 指标
- 486浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 将多个“参数”映射到单个可变参数的习惯用法
- 439浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 将 HTTP 响应正文写入文件后出现 EOF 错误
- 357浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 结构中映射的匿名列表的“复合文字中缺少类型”
- 352浏览 收藏
-
- Golang · Go问答 | 1年前 |
- NATS Jetstream 的性能
- 101浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何将复杂的字符串输入转换为mapstring?
- 440浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 相当于GoLang中Java将Object作为方法参数传递
- 212浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何确保所有 goroutine 在没有 time.Sleep 的情况下终止?
- 143浏览 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 516次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 500次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 485次学习
-
- ChatExcel酷表
- ChatExcel酷表是由北京大学团队打造的Excel聊天机器人,用自然语言操控表格,简化数据处理,告别繁琐操作,提升工作效率!适用于学生、上班族及政府人员。
- 3193次使用
-
- Any绘本
- 探索Any绘本(anypicturebook.com/zh),一款开源免费的AI绘本创作工具,基于Google Gemini与Flux AI模型,让您轻松创作个性化绘本。适用于家庭、教育、创作等多种场景,零门槛,高自由度,技术透明,本地可控。
- 3405次使用
-
- 可赞AI
- 可赞AI,AI驱动的办公可视化智能工具,助您轻松实现文本与可视化元素高效转化。无论是智能文档生成、多格式文本解析,还是一键生成专业图表、脑图、知识卡片,可赞AI都能让信息处理更清晰高效。覆盖数据汇报、会议纪要、内容营销等全场景,大幅提升办公效率,降低专业门槛,是您提升工作效率的得力助手。
- 3436次使用
-
- 星月写作
- 星月写作是国内首款聚焦中文网络小说创作的AI辅助工具,解决网文作者从构思到变现的全流程痛点。AI扫榜、专属模板、全链路适配,助力新人快速上手,资深作者效率倍增。
- 4543次使用
-
- MagicLight
- MagicLight.ai是全球首款叙事驱动型AI动画视频创作平台,专注于解决从故事想法到完整动画的全流程痛点。它通过自研AI模型,保障角色、风格、场景高度一致性,让零动画经验者也能高效产出专业级叙事内容。广泛适用于独立创作者、动画工作室、教育机构及企业营销,助您轻松实现创意落地与商业化。
- 3814次使用
-
- GoLand调式动态执行代码
- 2023-01-13 502浏览
-
- 用Nginx反向代理部署go写的网站。
- 2023-01-17 502浏览
-
- Golang取得代码运行时间的问题
- 2023-02-24 501浏览
-
- 请问 go 代码如何实现在代码改动后不需要Ctrl+c,然后重新 go run *.go 文件?
- 2023-01-08 501浏览
-
- 如何从同一个 io.Reader 读取多次
- 2023-04-11 501浏览

