当前位置:首页 > 文章列表 > Golang > Go问答 > 为何数据被写入通道却未被接收者 goroutine 读取?

为何数据被写入通道却未被接收者 goroutine 读取?

来源:stackoverflow 2024-03-26 10:42:40 0浏览 收藏

在使用 Go 语言构建数据缓冲系统时,服务 A 无法将数据推送到服务 B 的通道中。服务 B 中的缓冲子协程未能进入接收数据的情况。分析代码后发现,问题出在服务 A 中,因为当将新通道提供给服务 B 时,接收器不是指针,导致服务 A 的副本中更改了通道,而这些副本在函数退出时被丢弃。通过将接收器更改为指针并使用正确的赋值,数据能够成功从服务 A 发送到服务 B 的通道。

问题内容

我正在构建一个守护进程,并且有两个服务将相互发送数据。服务 a 产生数据,服务 b a 是数据缓冲区服务或类似队列。因此,从 main.go 文件中,服务 b 被实例化并启动。 start() 方法将执行 buffer() 函数作为 goroutine,因为该函数等待数据传递到通道,并且我不希望主进程停止等待 buffer 完成。然后服务a被实例化并启动。然后它也向服务 b“注册”。

我为服务 a 创建了一个名为 registerwithbufferservice 的方法,该方法创建两个新通道。它将把这些通道存储为它自己的属性,并将它们提供给服务 b。

func (s *servicea) registerwithbufferservice(bufservice *data.databuffer) error {
    newincomingchan := make(chan *data.dataframe, 1)
    newoutgoingchan := make(chan []byte, 1)
    s.incomingbuffchan = newincomingchan
    s.outgoingdatachannels = append(s.outgoingdatachannels, newoutgoingchan)
    bufservice.dataproviders[s.servicename()] = data.dataproviderinfo{
        incomingchan: newoutgoingchan, //our outgoing channel is their incoming
        outgoingchan: newincomingchan, // our incoming channel is their outgoing
    }
    s.databufferservice = bufservice
    bufservice.newprovider <- s.servicename() //the databuffer service listens for new services and creates a new goroutine for buffering
    s.logger.info().msg("registeration completed.")
    return nil
}

buffer 本质上监听来自服务 a 的传入数据,使用 decode() 对其进行解码,然后将其添加到名为 buf 的切片中。如果切片的长度大于 bufferperiod,则它将在传出通道中将切片中的第一项发送回服务 a。

func (b* databuffer) buffer(bufferperiod int) {
    for {
        select {
        case newprovider := <- b.newprovider:
            b.wg.add(1)
            /*
            newprovider is a string
            dataproviders is a map the value it returns is a struct containing the incoming and 
            outgoing channels for this service
            */
            p := b.dataproviders[newprovider]
            go func(prov string, in chan []byte, out chan *dataframe) {
                defer b.wg.done()
                var buf []*dataframe
                for {
                    select {
                    case rawdata := <-in:
                        tmp := decode(rawdata) //custom decoding function. returns a *dataframe
                        buf = append(buf, tmp)
                        if len(buf) < bufferperiod {
                            b.logger.info().msg("sending decoded data out.")
                            out <- buf[0]
                            buf = buf[1:] //pop
                        }
                    case <- b.quit:
                        return
                    }
                }
            }(newprovider, p.incomingchan, p.outgoingchan)
        }
    case <- b.quit:
        return
    }
}

现在服务 a 有一个名为 record 的方法,该方法会定期将数据推送到其 outgoingdatachannels 属性中的所有通道。

func (s *servicea) record() error {
    ...
    if atomic.loadint32(&s.listeners) != 0 {
        s.logger.info().msg("sending raw data to data buffer")
        for _, outchan := range s.outgoingdatachannels {
            outchan <- databytes // the receiver (service b) is already listening and this doesn't hang
        }
        s.logger.info().msg("raw data sent and received") // the logger will output this so i know it's not hanging 
    }
}

问题是,服务 a 似乎使用 record 成功推送数据,但服务 b 从未进入 case rawdata := <-in: 情况(在 buffer 子协程中)。这是因为我有嵌套的 goroutine 吗?如果不清楚的话,当服务 b 启动时,它会调用 buffer 但因为否则它会挂起,所以我将对 buffer 的调用设为 goroutine。因此,当服务 a 调用 registerwithbufferservice 时,buffer goroutine 创建一个 goroutine 来监听来自服务 b 的新数据,并在缓冲区填满后将其推送回服务 a。我希望我解释清楚了。

编辑 1 我制作了一个最小的、可重现的示例。

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

var (
    defaultbufferingperiod int = 3
    defaultpollinginterval int64 = 10
)

type dataobject struct{
    data    string
}

type dataprovider interface {
    registerwithbufferservice(*databuffer) error
    servicename() string
}

type dataproviderinfo struct{
    incomingchan    chan *dataobject
    outgoingchan    chan *dataobject
}

type databuffer struct{
    running         int32 //used atomically
    dataproviders   map[string]dataproviderinfo
    quit            chan struct{}
    newprovider     chan string
    wg              sync.waitgroup
}

func newdatabuffer() *databuffer{
    var (
        wg sync.waitgroup
    )
    return &databuffer{
        dataproviders: make(map[string]dataproviderinfo),
        quit: make(chan struct{}),
        newprovider: make(chan string),
        wg: wg,
    }
}

func (b *databuffer) start() error {
    if ok := atomic.compareandswapint32(&b.running, 0, 1); !ok {
        return fmt.errorf("could not start data buffer service.")
    }
    go b.buffer(defaultbufferingperiod)
    return nil
}

func (b *databuffer) stop() error {
    if ok := atomic.compareandswapint32(&b.running, 1, 0); !ok {
        return fmt.errorf("could not stop data buffer service.")
    }
    for _, p := range b.dataproviders {
        close(p.incomingchan)
        close(p.outgoingchan)
    }
    close(b.quit)
    b.wg.wait()
    return nil
}

// buffer creates goroutines for each incoming, outgoing data pair and decodes the incoming bytes into outgoing dataframes
func (b *databuffer) buffer(bufferperiod int) {
    for {
        select {
        case newprovider := <- b.newprovider:
            fmt.println("received new data provider.")
            if _, ok := b.dataproviders[newprovider]; ok { 
                b.wg.add(1)
                p := b.dataproviders[newprovider]
                go func(prov string, in chan *dataobject, out chan *dataobject) {
                    defer b.wg.done()
                    var (
                        buf []*dataobject
                    )
                    fmt.printf("waiting for data from: %s\n", prov)
                    for {
                        select {
                        case indata := <-in:
                            fmt.printf("received data from: %s\n", prov)
                            buf = append(buf, indata)
                            if len(buf) > bufferperiod {
                                fmt.printf("queue is filled, sending data back to %s\n", prov)
                                out <- buf[0]
                                fmt.println("data sent")
                                buf = buf[1:] //pop
                            }
                        case <- b.quit:
                            return
                        }
                    }
                }(newprovider, p.incomingchan, p.outgoingchan)
            }
        case <- b.quit:
            return
        }
    }
}

type servicea struct{
    active                  int32 // atomic
    stopping                int32 // atomic
    recording               int32 // atomic
    listeners               int32 // atomic
    name                    string
    quitchan                chan struct{}
    incomingbuffchan        chan *dataobject
    outgoingbuffchans       []chan *dataobject
    databufferservice       *databuffer
}

// a compile time check to ensure servicea fully implements the dataprovider interface
var _ dataprovider = (*servicea)(nil)

func newservicea() (*servicea, error) {
    var newsliceoutchans []chan *dataobject
    return &servicea{
        quitchan:  make(chan struct{}),
        outgoingbuffchans: newsliceoutchans,
        name:   "servicea",
    }, nil
}

// start starts the service. returns an error if any issues occur
func (s *servicea) start() error {
    atomic.storeint32(&s.active, 1)
    return nil
}

// stop stops the service. returns an error if any issues occur
func (s *servicea) stop() error {
    atomic.storeint32(&s.stopping, 1)
    close(s.quitchan)
    return nil
}

func (s *servicea) startrecording(pol_int int64) error {
    if ok := atomic.compareandswapint32(&s.recording, 0, 1); !ok {
        return fmt.errorf("could not start recording. data recording already started")
    }
    ticker := time.newticker(time.duration(pol_int) * time.second)
    go func() {
        for {
            select {
            case <-ticker.c:
                fmt.println("time to record...")
                err := s.record()
                if err != nil {
                    return
                }
            case <-s.quitchan:
                ticker.stop()
                return
            }
        }
    }()
    return nil
}

func (s *servicea) record() error {
    current_time := time.now()
    ct := fmt.sprintf("%02d-%02d-%d", current_time.day(), current_time.month(), current_time.year())
    dataobject := &dataobject{
        data: ct,
    }
    if atomic.loadint32(&s.listeners) != 0 {
        fmt.println("sending data to data buffer...")
        for _, outchan := range s.outgoingbuffchans {
            outchan <- dataobject // the receivers should already be listening
        }
        fmt.println("data sent.")
    }
    return nil
}

// registerwithbufferservice satisfies the dataprovider interface. it provides the bufservice with new incoming and outgoing channels along with a polling interval
func (s servicea) registerwithbufferservice(bufservice *databuffer) error {
    if _, ok := bufservice.dataproviders[s.servicename()]; ok {
        return fmt.errorf("%v data provider already registered with data buffer.", s.servicename())
    }
    newincomingchan := make(chan *dataobject, 1)
    newoutgoingchan := make(chan *dataobject, 1)
    s.incomingbuffchan = newincomingchan
    s.outgoingbuffchans = append(s.outgoingbuffchans, newoutgoingchan)
    bufservice.dataproviders[s.servicename()] = dataproviderinfo{
        incomingchan: newoutgoingchan, //our outgoing channel is their incoming
        outgoingchan: newincomingchan, // our incoming channel is their outgoing
    }
    s.databufferservice = bufservice
    bufservice.newprovider <- s.servicename() //the databuffer service listens for new services and creates a new goroutine for buffering
    return nil
}

// servicename satisfies the dataprovider interface. it returns the name of the service.
func (s servicea) servicename() string {
    return s.name
}

func main() {
    var bufferedservices []dataprovider
    fmt.println("instantiating and starting data buffer service...")
    bufservice := newdatabuffer()
    err := bufservice.start()
    if err != nil {
        panic(fmt.sprintf("%v", err))
    }
    defer bufservice.stop()
    fmt.println("data buffer service successfully started.")

    fmt.println("instantiating and starting service a...")
    servicea, err := newservicea()
    if err != nil {
        panic(fmt.sprintf("%v", err))
    }
    bufferedservices = append(bufferedservices, *servicea)
    err = servicea.start()
    if err != nil {
        panic(fmt.sprintf("%v", err))
    }
    defer servicea.stop()
    fmt.println("service a successfully started.")

    fmt.println("registering services with data buffer...")
    for _, s := range bufferedservices {
        _ = s.registerwithbufferservice(bufservice) // ignoring error msgs for base case
    }
    fmt.println("registration complete.")

    fmt.println("beginning recording...")
    _ = atomic.addint32(&servicea.listeners, 1)
    err = servicea.startrecording(defaultpollinginterval)
    if err != nil {
        panic(fmt.sprintf("%v", err))
    }
    for {
        select {
        case rtd := <-servicea.incomingbuffchan:
            fmt.println(rtd)
        case <-servicea.quitchan:
            atomic.storeint32(&servicea.listeners, 0)
            bufservice.quit<-struct{}{}
        }
    }
}

在 go 1.17 上运行。运行示例时,它应该每 10 秒打印以下内容:

Time to record...
Sending data to Data buffer...
Data sent.

但是数据缓冲区永远不会进入 indata := <-in 情况。


正确答案


为了诊断这个问题,我将 fmt.println("sending data to data buffer...") 更改为 fmt.println("sending data to data buffer...", s.outgoingbuffchans) ,输出为:

time to record...
sending data to data buffer... []

所以您实际上并没有将数据发送到任何通道。原因是:

func (s servicea) registerwithbufferservice(bufservice *databuffer) error {

由于当您执行 s.outgoingbuffchans = append(s.outgoingbuffchans, newoutgoingchan) 时,接收器不是指针,因此您将在 servicea 的副本中更改 s.outgoingbuffchans ,该副本在函数退出时将被丢弃。要修复此更改:

func (s servicea) registerwithbufferservice(bufservice *databuffer) error {

func (s *servicea) registerwithbufferservice(bufservice *databuffer) error {

bufferedservices = append(bufferedservices, *servicea)

bufferedservices = append(bufferedservices, servicea)

修改后的版本输出:

Time to record...
Sending data to Data buffer... [0xc0000d8060]
Data sent.
Received data from: SERVICEA
Time to record...
Sending data to Data buffer... [0xc0000d8060]
Data sent.
Received data from: SERVICEA

因此,这解决了报告的问题(如果存在其他问题,我不会感到惊讶,但希望这能为您指明正确的方向)。我确实注意到您最初发布的代码确实使用了指针接收器,因此可能会遇到另一个问题(但在这种情况下很难对代码片段进行评论)。

终于介绍完啦!小伙伴们,这篇关于《为何数据被写入通道却未被接收者 goroutine 读取?》的介绍应该让你收获多多了吧!欢迎大家收藏或分享给更多需要学习的朋友吧~golang学习网公众号也会发布Golang相关知识,快来关注吧!

版本声明
本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
使用 PHP 实现将数字转换为中文大写的技巧使用 PHP 实现将数字转换为中文大写的技巧
上一篇
使用 PHP 实现将数字转换为中文大写的技巧
如何获取Windows 10的官方ISO镜像文件?
下一篇
如何获取Windows 10的官方ISO镜像文件?
查看更多
最新文章
查看更多
课程推荐
  • 前端进阶之JavaScript设计模式
    前端进阶之JavaScript设计模式
    设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
    542次学习
  • GO语言核心编程课程
    GO语言核心编程课程
    本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
    508次学习
  • 简单聊聊mysql8与网络通信
    简单聊聊mysql8与网络通信
    如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
    497次学习
  • JavaScript正则表达式基础与实战
    JavaScript正则表达式基础与实战
    在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
    487次学习
  • 从零制作响应式网站—Grid布局
    从零制作响应式网站—Grid布局
    本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
    484次学习
查看更多
AI推荐
  • PPTFake答辩PPT生成器:一键生成高效专业的答辩PPT
    PPTFake答辩PPT生成器
    PPTFake答辩PPT生成器,专为答辩准备设计,极致高效生成PPT与自述稿。智能解析内容,提供多样模板,数据可视化,贴心配套服务,灵活自主编辑,降低制作门槛,适用于各类答辩场景。
    3次使用
  • SEO标题Lovart AI:全球首个设计领域AI智能体,实现全链路设计自动化
    Lovart
    SEO摘要探索Lovart AI,这款专注于设计领域的AI智能体,通过多模态模型集成和智能任务拆解,实现全链路设计自动化。无论是品牌全案设计、广告与视频制作,还是文创内容创作,Lovart AI都能满足您的需求,提升设计效率,降低成本。
    3次使用
  • 美图AI抠图:行业领先的智能图像处理技术,3秒出图,精准无误
    美图AI抠图
    美图AI抠图,依托CVPR 2024竞赛亚军技术,提供顶尖的图像处理解决方案。适用于证件照、商品、毛发等多场景,支持批量处理,3秒出图,零PS基础也能轻松操作,满足个人与商业需求。
    26次使用
  • SEO标题PetGPT:智能桌面宠物程序,结合AI对话的个性化陪伴工具
    PetGPT
    SEO摘要PetGPT 是一款基于 Python 和 PyQt 开发的智能桌面宠物程序,集成了 OpenAI 的 GPT 模型,提供上下文感知对话和主动聊天功能。用户可高度自定义宠物的外观和行为,支持插件热更新和二次开发。适用于需要陪伴和效率辅助的办公族、学生及 AI 技术爱好者。
    24次使用
  • 可图AI图片生成:快手可灵AI2.0引领图像创作新时代
    可图AI图片生成
    探索快手旗下可灵AI2.0发布的可图AI2.0图像生成大模型,体验从文本生成图像、图像编辑到风格转绘的全链路创作。了解其技术突破、功能创新及在广告、影视、非遗等领域的应用,领先于Midjourney、DALL-E等竞品。
    51次使用
微信登录更方便
  • 密码登录
  • 注册账号
登录即同意 用户协议隐私政策
返回登录
  • 重置密码