Golang程序中的并发问题
积累知识,胜过积蓄金银!毕竟在Golang开发的过程中,会遇到各种各样的问题,往往都是一些细节知识点还没有掌握好而导致的,因此基础知识点的积累是很重要的。下面本文《Golang程序中的并发问题》,就带大家讲解一下知识点,若是你对本文感兴趣,或者是想搞懂其中某个知识点,就请你继续往下看吧~
我正在尝试创建一个充当代理服务器并可以动态切换到新端点的程序。但我遇到一个问题,在调用 switchovertonewendpoint()
后,仍然有一些代理对象连接到原始端点 8.8.8.8
,应该将其关闭。
package main import ( "net" "sync" "sync/atomic" "time" ) type proxy struct { id int32 from, to *net.tcpconn } var switchover int32 = 0 func setswitchover() { atomic.storeint32((*int32)(&switchover), 1) } func switchoverenabled() bool { return atomic.loadint32((*int32)(&switchover)) == 1 } var proxies map[int32]*proxy = make(map[int32]*proxy, 0) var proxyseq int32 = 0 var mu sync.rwmutex func addproxy(from *net.tcpconn) { mu.lock() proxyseq += 1 proxy := &proxy{id: proxyseq, from: from} proxies[proxyseq] = proxy mu.unlock() var toaddr string if switchoverenabled() { toaddr = "1.1.1.1" } else { toaddr = "8.8.8.8" } tcpaddr, _ := net.resolvetcpaddr("tcp4", toaddr) toconn, err := net.dialtcp("tcp", nil, tcpaddr) if err != nil { panic(err) } proxy.to = toconn } func switchovertonewendpoint() { mu.rlock() closedproxies := proxies mu.runlock() setswitchover() for _, proxy := range closedproxies { proxy.from.close() proxy.to.close() mu.lock() delete(proxies, proxy.id) mu.unlock() } } func main() { tcpaddr, _ := net.resolvetcpaddr("tcp4", "0.0.0.0:5432") ln, _ := net.listentcp("tcp", tcpaddr) go func() { time.sleep(time.second * 30) switchovertonewendpoint() }() for { clientconn, err := ln.accepttcp() if err != nil { panic(err) } go addproxy(clientconn) } }
想了一会儿,我猜问题出在
mu.rlock() closedproxies := proxies mu.runlock()
但我不确定这是否是根本原因,以及是否可以通过将其替换为以下内容来修复它:
closedProxies := make([]*Proxy, 0) mu.RLock() for _, proxy := range proxies { closedProxies = append(closedProxies, proxy) } mu.RUnlock()
由于该案例很难重现,所以有专业人士可以提供想法或提示吗?欢迎任何评论。提前致谢。
正确答案
问题
改变是必要的。在最初的实现中, latedproxies
持有相同的映射。请参阅此演示:
package main import "fmt" func main() { proxies := make(map[int]int, 0) for i := 0; i < 10; i++ { proxies[i] = i } closeproxies := proxies proxies[10] = 10 proxies[11] = 11 for k := range closeproxies { delete(proxies, k) } fmt.printf("items left: %d\n", len(proxies)) // output: // items left: 0 }
但这不是根本原因。可以在复制 closeproxies
之后但在调用 setswitchover
之前添加新代理。在这种情况下,新代理连接到旧地址,但不在 closeproxies
中。我认为这是根本原因。
还有一个问题。在设置 to
字段之前,将向 proxies
添加新代理。程序可能希望在设置 to
字段之前关闭此代理,从而导致恐慌。
可靠的设计
这个想法是将所有端点放入一个切片中,并让每个端点管理自己的代理列表。所以我们只需要跟踪当前端点的索引。当我们想要切换到另一个端点时,我们只需要更改索引,并告诉过时的端点清除其代理。剩下的唯一复杂的事情是确保过时的端点可以清除其所有代理。请参阅下面的实现:
manager.go
这就是这个想法的实现。
package main import ( "sync" ) // conn is abstraction of a connection to make manager easy to test. type conn interface { close() error } // dialer is abstraction of a dialer to make manager easy to test. type dialer interface { dial(addr string) (conn, error) } type manager struct { // mucurrent protects the "current" member. mucurrent sync.rwmutex current int // when current is -1, the manager is shuted down. endpoints []*endpoint // mu protects the whole switch action. mu sync.mutex } func newmanager(dialer dialer, addresses ...string) *manager { if len(addresses) < 2 { panic("a manger should handle at least 2 addresses") } endpoints := make([]*endpoint, len(addresses)) for i, addr := range addresses { endpoints[i] = &endpoint{ address: addr, dialer: dialer, } } return &manager{ endpoints: endpoints, } } func (m *manager) addproxy(from conn) { // 1. addproxy will wait when the write lock of m.mucurrent is taken. // once the write lock is released, addproxy will connect to the new endpoint. // switch only holds the write lock for a short time, and switch is called // not so frequently, so addproxy won't wait too much. // 2. switch will wait if there is any addproxy holding the read lock of // m.mucurrent. that means switch waits longer. the advantage is that when // e.clear is called in switch, all addproxy requests to the old endpoint // are done. so it's safe to call e.clear then. m.mucurrent.rlock() defer m.mucurrent.runlock() current := m.current // do not accept any new connection when m has been shutdown. if current == -1 { from.close() return } m.endpoints[current].addproxy(from) } func (m *manager) switch() { // in a real world, switch is called not so frequently. // so it's ok to add a lock here. // and it's necessary to make sure the old endpoint is cleared and ready // for use in the future. m.mu.lock() defer m.mu.unlock() // take the write lock of m.mucurrent. // it waits for all the addproxy requests holding the read lock to finish. m.mucurrent.lock() old := m.current // do nothing when m has been shutdown. if old == -1 { m.mucurrent.unlock() return } next := old + 1 if next >= len(m.endpoints) { next = 0 } m.current = next m.mucurrent.unlock() // when it reaches here, all addproxy requests to the old endpoint are done. // and it's safe to call e.clear now. m.endpoints[old].clear() } func (m *manager) shutdown() { m.mu.lock() defer m.mu.unlock() m.mucurrent.lock() current := m.current m.current = -1 m.mucurrent.unlock() m.endpoints[current].clear() } type proxy struct { from, to conn } type endpoint struct { address string dialer dialer mu sync.mutex proxies []*proxy } func (e *endpoint) clear() { for _, p := range e.proxies { p.from.close() p.to.close() } // assign a new slice to e.proxies, and the gc will collect the old one. e.proxies = []*proxy{} } func (e *endpoint) addproxy(from conn) { toconn, err := e.dialer.dial(e.address) if err != nil { // close the from connection so that the client will reconnect? from.close() return } e.mu.lock() defer e.mu.unlock() e.proxies = append(e.proxies, &proxy{from: from, to: toconn}) }
main.go
这个演示展示了如何使用之前实现的manager类型:
package main import ( "net" "time" ) type realdialer struct{} func (d realdialer) dial(addr string) (conn, error) { tcpaddr, err := net.resolvetcpaddr("tcp4", addr) if err != nil { return nil, err } return net.dialtcp("tcp", nil, tcpaddr) } func main() { manager := newmanager(realdialer{}, "1.1.1.1", "8.8.8.8") tcpaddr, _ := net.resolvetcpaddr("tcp4", "0.0.0.0:5432") ln, _ := net.listentcp("tcp", tcpaddr) go func() { for range time.tick(30 * time.second) { manager.switch() } }() for { clientconn, err := ln.accepttcp() if err != nil { panic(err) } go manager.addproxy(clientconn) } }
manager_test.go
使用以下命令运行测试:go test ./... -race -count 10
package main import ( "errors" "math/rand" "sync" "sync/atomic" "testing" "time" "github.com/google/uuid" ) func TestManager(t *testing.T) { addresses := []string{"1.1.1.1", "8.8.8.8"} dialer := newDialer(addresses...) manager := NewManager(dialer, addresses...) ch := make(chan int, 1) var wg sync.WaitGroup wg.Add(1) go func() { for range ch { manager.Switch() } wg.Done() }() count := 1000 total := count * 10 wg.Add(total) fromConn := &fakeFromConn{} for i := 0; i < total; i++ { if i%count == count-1 { ch <- 0 } go func() { manager.AddProxy(fromConn) wg.Done() }() } close(ch) wg.Wait() manager.Shutdown() for _, s := range dialer.servers { left := len(s.conns) if left != 0 { t.Errorf("server %s, unexpected connections left: %d", s.addr, left) } } closedCount := fromConn.closedCount.Load() if closedCount != int32(total) { t.Errorf("want closed count: %d, got: %d", total, closedCount) } } type fakeFromConn struct { closedCount atomic.Int32 } func (c *fakeFromConn) Close() error { c.closedCount.Add(1) return nil } type fakeToConn struct { id uuid.UUID server *fakeServer } func (c *fakeToConn) Close() error { if c.id == uuid.Nil { return nil } c.server.removeConn(c.id) return nil } type fakeServer struct { addr string mu sync.Mutex conns map[uuid.UUID]bool } func (s *fakeServer) addConn() (uuid.UUID, error) { s.mu.Lock() defer s.mu.Unlock() id, err := uuid.NewRandom() if err == nil { s.conns[id] = true } return id, err } func (s *fakeServer) removeConn(id uuid.UUID) { s.mu.Lock() defer s.mu.Unlock() delete(s.conns, id) } type fakeDialer struct { servers map[string]*fakeServer } func newDialer(addresses ...string) *fakeDialer { servers := make(map[string]*fakeServer) for _, addr := range addresses { servers[addr] = &fakeServer{ addr: addr, conns: make(map[uuid.UUID]bool), } } return &fakeDialer{ servers: servers, } } func (d *fakeDialer) Dial(addr string) (Conn, error) { n := rand.Intn(100) if n == 0 { return nil, errors.New("fake network error") } // Simulate network latency. time.Sleep(time.Duration(n) * time.Millisecond) s := d.servers[addr] id, err := s.addConn() if err != nil { return nil, err } conn := &fakeToConn{ id: id, server: s, } return conn, nil }
理论要掌握,实操不能落!以上关于《Golang程序中的并发问题》的详细介绍,大家都掌握了吧!如果想要继续提升自己的能力,那么就来关注golang学习网公众号吧!

- 上一篇
- 测量 gRPC 响应的数据量

- 下一篇
- 解决Win11中IT管理员限制应用程序访问某些区域的问题
-
- Golang · Go问答 | 1年前 |
- 在读取缓冲通道中的内容之前退出
- 139浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 戈兰岛的全球 GOPRIVATE 设置
- 204浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何将结构作为参数传递给 xml-rpc
- 325浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何用golang获得小数点以下两位长度?
- 477浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何通过 client-go 和 golang 检索 Kubernetes 指标
- 486浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 将多个“参数”映射到单个可变参数的习惯用法
- 439浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 将 HTTP 响应正文写入文件后出现 EOF 错误
- 357浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 结构中映射的匿名列表的“复合文字中缺少类型”
- 352浏览 收藏
-
- Golang · Go问答 | 1年前 |
- NATS Jetstream 的性能
- 101浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何将复杂的字符串输入转换为mapstring?
- 440浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 相当于GoLang中Java将Object作为方法参数传递
- 212浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何确保所有 goroutine 在没有 time.Sleep 的情况下终止?
- 143浏览 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 508次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 497次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 484次学习
-
- 毕业宝AIGC检测
- 毕业宝AIGC检测是“毕业宝”平台的AI生成内容检测工具,专为学术场景设计,帮助用户初步判断文本的原创性和AI参与度。通过与知网、维普数据库联动,提供全面检测结果,适用于学生、研究者、教育工作者及内容创作者。
- 12次使用
-
- AI Make Song
- AI Make Song是一款革命性的AI音乐生成平台,提供文本和歌词转音乐的双模式输入,支持多语言及商业友好版权体系。无论你是音乐爱好者、内容创作者还是广告从业者,都能在这里实现“用文字创造音乐”的梦想。平台已生成超百万首原创音乐,覆盖全球20个国家,用户满意度高达95%。
- 26次使用
-
- SongGenerator
- 探索SongGenerator.io,零门槛、全免费的AI音乐生成器。无需注册,通过简单文本输入即可生成多风格音乐,适用于内容创作者、音乐爱好者和教育工作者。日均生成量超10万次,全球50国家用户信赖。
- 23次使用
-
- BeArt AI换脸
- 探索BeArt AI换脸工具,免费在线使用,无需下载软件,即可对照片、视频和GIF进行高质量换脸。体验快速、流畅、无水印的换脸效果,适用于娱乐创作、影视制作、广告营销等多种场景。
- 26次使用
-
- 协启动
- SEO摘要协启动(XieQiDong Chatbot)是由深圳协启动传媒有限公司运营的AI智能服务平台,提供多模型支持的对话服务、文档处理和图像生成工具,旨在提升用户内容创作与信息处理效率。平台支持订阅制付费,适合个人及企业用户,满足日常聊天、文案生成、学习辅助等需求。
- 27次使用
-
- GoLand调式动态执行代码
- 2023-01-13 502浏览
-
- 用Nginx反向代理部署go写的网站。
- 2023-01-17 502浏览
-
- Golang取得代码运行时间的问题
- 2023-02-24 501浏览
-
- 请问 go 代码如何实现在代码改动后不需要Ctrl+c,然后重新 go run *.go 文件?
- 2023-01-08 501浏览
-
- 如何从同一个 io.Reader 读取多次
- 2023-04-11 501浏览