当前位置:首页 > 文章列表 > Golang > Go问答 > Goroutine 具有高效的任务调度

Goroutine 具有高效的任务调度

来源:stackoverflow 2024-02-14 15:48:24 0浏览 收藏

从现在开始,努力学习吧!本文《Goroutine 具有高效的任务调度》主要讲解了等等相关知识点,我会在golang学习网中持续更新相关的系列文章,欢迎大家关注并积极留言建议。下面就先一起来看一下本篇正文内容吧,希望能帮到你!

问题内容

我正在使用 golang 使用 goroutine 同时运行两个 websocket 客户端(一个用于私有数据,一个用于公共数据)。从表面上看,一切似乎都很顺利。两个客户端都接收从 websocket 服务器传输的数据。然而,我相信我可能设置错误,因为当我检查活动监视器时,我的程序始终有 500 - 1500 次空闲唤醒,并且使用了 >200% 的 cpu。对于像两个 websocket 客户端这样简单的事情来说,这似乎并不正常。

我已将代码放在片段中,因此需要阅读的内容较少(希望这使其更易于理解),但如果您需要完整的代码,我也可以发布。这是我的 main func 中运行 ws 客户端的代码

comms := make(chan os.signal, 1)
signal.notify(comms, os.interrupt, syscall.sigterm)
ctx := context.background()
ctx, cancel := context.withcancel(ctx)
var wg sync.waitgroup

wg.add(1)
go pubsocket.publisten(ctx, &wg, &activesubs, testing)
wg.add(1)
go privsocket.privlisten(ctx, &wg, &activesubs, testing)

<- comms
cancel()
wg.wait()

这是客户端如何运行 go 例程的代码

func (socket *Socket) PubListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
    defer wg.Done()
    for {
        select {
        case <- ctx.Done():
            log.Println("closing public socket")
            socket.Close()
            return
        default:
            socket.OnTextMessage = func(message string, socket Socket) {
                log.Println(message)
                pubJsonDecoder(message, testing)
                //tradesParser(message);
            }
        }
    }
}

func (socket *Socket) PrivListen(ctx context.Context, wg *sync.WaitGroup, subManager *ConnStatus, testing bool) {
    defer wg.Done()
    for {
        select {
        case <- ctx.Done():
            log.Println("closing private socket")
            socket.Close()
            return
        default:
            socket.OnTextMessage = func(message string, socket Socket) {
                log.Println(message)
            }
        }
    }
}

关于为什么空闲唤醒如此高有什么想法吗?我应该使用多线程而不是并发吗?预先感谢您的帮助!


正确答案


你在这里浪费了cpu(多余的循环):

for {
       // ...
        default:
        // high cpu usage here.
        }
    }

尝试这样的事情:

func (socket *socket) publisten(ctx context.context, wg *sync.waitgroup, submanager *connstatus, testing bool) {
    defer wg.done()
    defer socket.close()

    socket.ontextmessage = func(message string, socket socket) {
        log.println(message)
        pubjsondecoder(message, testing)
        //tradesparser(message);
    }

    <-ctx.done()
    log.println("closing public socket")
}

func (socket *socket) privlisten(ctx context.context, wg *sync.waitgroup, submanager *connstatus, testing bool) {
    defer wg.done()
    defer socket.close()

    socket.ontextmessage = func(message string, socket socket) {
        log.println(message)
    }

    <-ctx.done()
    log.println("closing private socket")
}

这也可能有帮助:
https://github.com/gorilla/websocket/blob/master/examples/chat/client.go

tl/dr:websocket 很难:)

看起来您可能有几个旋转器。您将在 for - select 语句的默认情况下为 ontextmessage() 分配处理程序函数。如果没有其他情况准备就绪,则始终执行默认情况。因为在默认情况下没有任何东西会阻塞,所以 for 循环就会失去控制。像这样旋转的两个 goroutine 可能会固定 2 个核心。 websocket 是网络 io,这些 goroutine 可能会并行运行。这就是您看到 200% 利用率的原因。

看看 gorilla/websocket 库。我不会说它比任何其他 websocket 库更好或更差,我有很多使用它的经验。

https://github.com/gorilla/websocket

下面是我多次使用的实现。 它的设置方式是注册接收到特定消息时触发的处理函数。假设消息中的值之一是“type”:“start-job”,websocket 服务器将调用您分配给“start-job”websocket 消息的处理程序。感觉就像为 http 路由器编写端点。

打包服务器

上下文.go

package serverws

import (
    "errors"
    "fmt"
    "strings"
    "sync"
)

// conncontext is the connection context to track a connected websocket user
type conncontext struct {
    specialkey  string
    supportgzip string
    userid      string
    mu         sync.mutex // websockets are not thread safe, we'll use a mutex to lock writes.
}

// hashkeyasctx returns a conncontext based on the hash provided
func hashkeyasctx(hashkey string) (*conncontext, error) {
    values := strings.split(hashkey, ":")
    if len(values) != 3 {
        return nil, errors.new("invalid key received: " + hashkey)
    }
    return &conncontext{values[0], values[1], values[2], sync.mutex{}}, nil
}

// ashashkey returns the hash key for a given connection context conncontext
func (ctx *conncontext) ashashkey() string {
    return strings.join([]string{ctx.specialkey, ctx.supportgzip, ctx.userid}, ":")
}

// string returns a string of the hash of a given connection context conncontext
func (ctx *conncontext) string() string {
    return fmt.sprint("specialkey: ", ctx.specialkey, " gzip ", ctx.supportgzip, " auth ", ctx.userid)
}

wshandler.go

package serverws

import (
    "encoding/json"
    "errors"
    "fmt"
    "net/http"
    "strings"
    "sync"
    "time"

    "github.com/gorilla/websocket"
    "github.com/rs/zerolog/log"
)

var (
    receivefunctionmap = make(map[string]receiveobjectfunc)
    ctxhashmap         sync.map
)

// receiveobjectfunc is a function signature for a websocket request handler
type receiveobjectfunc func(conn *websocket.conn, ctx *conncontext, t map[string]interface{})

// websockethandler does what it says, handles websockets (makes them easier for us to deal with)
type websockethandler struct {
    wsupgrader websocket.upgrader
}

// websocketmessage that is sent over a websocket.   messages must have a conversation type so the server and the client js know
// what is being discussed and what signals to raise on the server and the client.
// the "notification" message instructs the client to display an alert popup.
type websocketmessage struct {
    messagetype string      `json:"type"`
    message     interface{} `json:"message"`
}

// newwebsockethandler sets up a new websocket.
func newwebsockethandler() *websockethandler {
    wsh := new(websockethandler)
    wsh.wsupgrader = websocket.upgrader{
        readbuffersize:  4096,
        writebuffersize: 4096,
    }
    return wsh

}

// registermessagetype sets up an event bus for a message type.   when messages arrive from the client that match messagetypename,
// the function you wrote to handle that message is then called.
func (wsh *websockethandler) registermessagetype(messagetypename string, f receiveobjectfunc) {
    receivefunctionmap[messagetypename] = f
}

// onmessage triggers when the underlying websocket has received a message.
func (wsh *websockethandler) onmessage(conn *websocket.conn, ctx *conncontext, msg []byte, msgtype int) {
    //  handling text messages or binary messages. binary is usually some gzip text.
    if msgtype == websocket.textmessage {
        wsh.processincomingtextmsg(conn, ctx, msg)
    }
    if msgtype == websocket.binarymessage {

    }
}

// onopen triggers when the underlying websocket has established a connection.
func (wsh *websockethandler) onopen(conn *websocket.conn, r *http.request) (ctx *conncontext, err error) {
    //user, err := gothic.getfromsession("id", r)
    user := "testuser"
    if err := r.parseform(); err != nil {
        return nil, errors.new("parameter check error")
    }

    specialkey := r.formvalue("specialkey")
    supportgzip := r.formvalue("support_gzip")

    if user != "" && err == nil {
        ctx = &conncontext{specialkey, supportgzip, user, sync.mutex{}}
    } else {
        ctx = &conncontext{specialkey, supportgzip, "", sync.mutex{}}
    }

    keystring := ctx.ashashkey()

    if oldconn, ok := ctxhashmap.load(keystring); ok {
        wsh.onclose(oldconn.(*websocket.conn), ctx)
        oldconn.(*websocket.conn).close()
    }
    ctxhashmap.store(keystring, conn)
    return ctx, nil
}

// onclose triggers when the underlying websocket has been closed down
func (wsh *websockethandler) onclose(conn *websocket.conn, ctx *conncontext) {
    //log.info().msg(("client close itself as " + ctx.string()))
    wsh.closeconnwithctx(ctx)
}

// onerror triggers when a websocket connection breaks
func (wsh *websockethandler) onerror(errmsg string) {
    //log.error().msg(errmsg)
}

// handleconn happens when a user connects to us at the listening point.  we ask
// the user to authenticate and then send the required http upgrade return code.
func (wsh *websockethandler) handleconn(w http.responsewriter, r *http.request) {

    user := ""
    if r.url.path == "/websocket" {
        user = "testuser" // authenticate however you want
        if user == "" {
            fmt.println("unauthenticated user tried to connect to websocket from ", r.header.get("x-forwarded-for"))
            return
        }
    }
    // don't do this.  you need to check the origin, but this is here as a place holder
    wsh.wsupgrader.checkorigin = func(r *http.request) bool {
        return true
    }

    conn, err := wsh.wsupgrader.upgrade(w, r, nil)
    if err != nil {
        log.error().msg("failed to set websocket upgrade: " + err.error())
        return
    }
    defer conn.close()

    ctx, err := wsh.onopen(conn, r)
    if err != nil {
        log.error().msg("open connection failed " + err.error() + r.url.rawquery)
        if user != "" {
            ctx.userid = user
        }
        return
    }

    if user != "" {
        ctx.userid = user
    }
    conn.setpinghandler(func(message string) error {
        conn.writecontrol(websocket.pongmessage, []byte(message), time.now().add(time.second))
        return nil
    })

    // message pump for the underlying websocket connection
    for {
        t, msg, err := conn.readmessage()
        if err != nil {
            // read errors are when the user closes the tab. ignore.
            wsh.onclose(conn, ctx)
            return
        }

        switch t {
        case websocket.textmessage, websocket.binarymessage:
            wsh.onmessage(conn, ctx, msg, t)
        case websocket.closemessage:
            wsh.onclose(conn, ctx)
            return
        case websocket.pingmessage:
        case websocket.pongmessage:
        }

    }

}

func (wsh *websockethandler) closeconnwithctx(ctx *conncontext) {
    keystring := ctx.ashashkey()
    ctxhashmap.delete(keystring)
}

func (wsh *websockethandler) processincomingtextmsg(conn *websocket.conn, ctx *conncontext, msg []byte) {
    //log.debug().msg("client said " + string(msg))
    data := websocketmessage{}

    // try to turn this into data
    err := json.unmarshal(msg, &data)

    // and try to get at the data underneath
    var raw = make(map[string]interface{})
    terr := json.unmarshal(msg, &raw)

    if err == nil {
        // what kind of message is this?
        if receivefunctionmap[data.messagetype] != nil {
            // we'll try to cast this message and call the handler for it
            if terr == nil {
                if v, ok := raw["message"].(map[string]interface{}); ok {
                    receivefunctionmap[data.messagetype](conn, ctx, v)
                } else {
                    log.debug().msg("nonsense sent over the websocket.")
                }
            } else {
                log.debug().msg("nonsense sent over the websocket.")
            }
        }
    } else {
        // received garbage from the transmitter.
    }
}

// sendjsontosocket sends a specific message to a specific websocket
func (wsh *websockethandler) sendjsontosocket(socketid string, msg interface{}) {
    fields := strings.split(socketid, ":")
    message, _ := json.marshal(msg)

    ctxhashmap.range(func(key interface{}, value interface{}) bool {
        if ctx, err := hashkeyasctx(key.(string)); err != nil {
            wsh.onerror(err.error())
        } else {
            if ctx.specialkey == fields[0] {
                ctx.mu.lock()
                if value != nil {
                    err = value.(*websocket.conn).writemessage(websocket.textmessage, message)
                }
                ctx.mu.unlock()
            }
            if err != nil {
                ctx.mu.lock() // we'll lock here even though we're going to destroy this
                wsh.onclose(value.(*websocket.conn), ctx)
                value.(*websocket.conn).close()
                ctxhashmap.delete(key) // remove the websocket immediately
                //wsh.onerror("write err to user " + key.(string) + " err: " + err.error())
            }
        }
        return true
    })
}

封装wsocket

类型.go

package wsocket



// acknowledgement is for acking simple messages and sending errors
type acknowledgement struct {
    responseid string `json:"responseid"`
    status     string `json:"status"`
    ipaddress  string `json:"ipaddress"`
    errortext  string `json:"errortext"`
}

wsocket.go

package wsocket

import (
    "fmt"
    server "project/serverws"
    "project/utils"
    "sync"
    "time"

    "github.com/gin-gonic/gin"
    "github.com/gorilla/websocket"
    // "github.com/mitchellh/mapstructure"
    "github.com/inconshreveable/log15"
)
var (
    websocket         *server.websockethandler // so other packages can send out websocket messages
    websocketlocation string
    log               log15.logger = log15.new("package", "wsocket"
)

func setupwebsockets(r *gin.engine, socket *server.websockethandler, debug_mode bool) {

    websocket = socket
    websocketlocation = "example.mydomain.com"
    //websocketlocation = "example.mydomain.com"
    r.get("/websocket", func(c *gin.context) {
        socket.handleconn(c.writer, c.request)

    })

socket.registermessagetype("hello", func(conn *websocket.conn, ctx *server.conncontext, data map[string]interface{}) {

        response := acknowledgement{
            responseid: "hello",
            status:     fmt.sprintf("ok/%v", ctx.authid),
            ipaddress:  conn.remoteaddr().string(),
        }
        // mapstructure.decode(data, &request) -- used if we wanted to read what was fed in
        socket.sendjsontosocket(ctx.ashashkey(), &response)
    })

socket.registermessagetype("start-job", func(conn *websocket.conn, ctx *server.conncontext, data map[string]interface{}) {

        response := acknowledgement{
            responseid: "starting_job",
            status:     fmt.sprintf("%s is being dialed.", data["did"]),
            ipaddress:  conn.remoteaddr().string(),
        }
        // mapstructure.decode(data, &request) -- used if we wanted to read what was fed in to a struct.
        socket.sendjsontosocket(ctx.ashashkey(), &response)

    })

此实现是针对 web 应用程序的。这是 javascript 客户端的简化版本。您可以使用此实现处理许多并发连接,并且您所要做的通信就是定义包含与下面 switch 中的情况匹配的responseid 的对象/结构,它基本上是一个长 switch 语句,将其序列化并将其发送到另一端,对方就会确认。我有一些版本在多个生产环境中运行。

websocket.js

$(() => {

    function wsMessage(object) {
        switch (object.responseId) {
            case "Hello": // HELLO! :-)
                console.log("Heartbeat received, we're connected.");
                break;

            case "Notification":
                if (object.errortext != "") {
                    $.notify({
                        // options
                        message: '
  ' + object.errortext + '
', }, { // settings type: 'danger', offset: 50, placement: { align: 'center', } }); } else { $.notify({ // options message: '
' + object.status + '
', }, { // settings type: 'success', offset: 50, placement: { align: 'center', } }); } break; } } $(document).ready(function () { function heartbeat() { if (!websocket) return; if (websocket.readyState !== 1) return; websocket.send("{\"type\": \"Hello\", \"message\": { \"RequestID\": \"Hello\", \"User\":\"" + /*getCookie("_loginuser")*/"TestUser" + "\"} }"); setTimeout(heartbeat, 24000); } //TODO: CHANGE TO WSS once tls is enabled. function wireUpWebsocket() { websocket = new WebSocket('wss://' + WEBSOCKET_LOCATION + '/websocket?specialKey=' + WEBSOCKET_KEY + '&support_gzip=0'); websocket.onopen = function (event) { console.log("Websocket connected."); heartbeat(); //if it exists if (typeof (wsReady) !== 'undefined') { //execute it wsReady(); } }; websocket.onerror = function (event) { console.log("WEBSOCKET ERROR " + event.data); }; websocket.onmessage = function (event) { wsMessage(JSON.parse(event.data)); }; websocket.onclose = function () { // Don't close! // Replace key console.log("WEBSOCKET CLOSED"); WEBSOCKET_KEY = Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15) + Math.random().toString(36).substring(2, 15); websocketreconnects++; if (websocketreconnects > 30) { // Too much, time to bounce // location.reload(); Don't reload the page anymore, just re-connect. } setTimeout(function () { wireUpWebsocket(); }, 3000); }; } wireUpWebsocket(); }); }); function getCookie(name) { var value = "; " + document.cookie; var parts = value.split("; " + name + "="); if (parts.length == 2) return parts.pop().split(";").shift(); } function setCookie(cname, cvalue, exdays) { var d = new Date(); d.setTime(d.getTime() + (exdays * 24 * 60 * 60 * 1000)); var expires = "expires=" + d.toUTCString(); document.cookie = cname + "=" + cvalue + ";" + expires + ";path=/"; }

在无限循环中一遍又一遍地分配处理函数肯定是行不通的。

https://github.com/gorilla/websocket

今天关于《Goroutine 具有高效的任务调度》的内容就介绍到这里了,是不是学起来一目了然!想要了解更多关于的内容请关注golang学习网公众号!

版本声明
本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
Go 的 http.Client 的实例化是否可重复使用的原因是什么?Go 的 http.Client 的实例化是否可重复使用的原因是什么?
上一篇
Go 的 http.Client 的实例化是否可重复使用的原因是什么?
github.com/aws/aws-sdk-go/aws未提供必要模块
下一篇
github.com/aws/aws-sdk-go/aws未提供必要模块
查看更多
最新文章
查看更多
课程推荐
  • 前端进阶之JavaScript设计模式
    前端进阶之JavaScript设计模式
    设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
    542次学习
  • GO语言核心编程课程
    GO语言核心编程课程
    本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
    508次学习
  • 简单聊聊mysql8与网络通信
    简单聊聊mysql8与网络通信
    如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
    497次学习
  • JavaScript正则表达式基础与实战
    JavaScript正则表达式基础与实战
    在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
    487次学习
  • 从零制作响应式网站—Grid布局
    从零制作响应式网站—Grid布局
    本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
    484次学习
查看更多
AI推荐
  • 毕业宝AIGC检测:AI生成内容检测工具,助力学术诚信
    毕业宝AIGC检测
    毕业宝AIGC检测是“毕业宝”平台的AI生成内容检测工具,专为学术场景设计,帮助用户初步判断文本的原创性和AI参与度。通过与知网、维普数据库联动,提供全面检测结果,适用于学生、研究者、教育工作者及内容创作者。
    18次使用
  • AI Make Song:零门槛AI音乐创作平台,助你轻松制作个性化音乐
    AI Make Song
    AI Make Song是一款革命性的AI音乐生成平台,提供文本和歌词转音乐的双模式输入,支持多语言及商业友好版权体系。无论你是音乐爱好者、内容创作者还是广告从业者,都能在这里实现“用文字创造音乐”的梦想。平台已生成超百万首原创音乐,覆盖全球20个国家,用户满意度高达95%。
    29次使用
  • SongGenerator.io:零门槛AI音乐生成器,快速创作高质量音乐
    SongGenerator
    探索SongGenerator.io,零门槛、全免费的AI音乐生成器。无需注册,通过简单文本输入即可生成多风格音乐,适用于内容创作者、音乐爱好者和教育工作者。日均生成量超10万次,全球50国家用户信赖。
    27次使用
  •  BeArt AI换脸:免费在线工具,轻松实现照片、视频、GIF换脸
    BeArt AI换脸
    探索BeArt AI换脸工具,免费在线使用,无需下载软件,即可对照片、视频和GIF进行高质量换脸。体验快速、流畅、无水印的换脸效果,适用于娱乐创作、影视制作、广告营销等多种场景。
    30次使用
  • SEO标题协启动:AI驱动的智能对话与内容生成平台 - 提升创作效率
    协启动
    SEO摘要协启动(XieQiDong Chatbot)是由深圳协启动传媒有限公司运营的AI智能服务平台,提供多模型支持的对话服务、文档处理和图像生成工具,旨在提升用户内容创作与信息处理效率。平台支持订阅制付费,适合个人及企业用户,满足日常聊天、文案生成、学习辅助等需求。
    32次使用
微信登录更方便
  • 密码登录
  • 注册账号
登录即同意 用户协议隐私政策
返回登录
  • 重置密码