Golang程序中的并发问题
积累知识,胜过积蓄金银!毕竟在Golang开发的过程中,会遇到各种各样的问题,往往都是一些细节知识点还没有掌握好而导致的,因此基础知识点的积累是很重要的。下面本文《Golang程序中的并发问题》,就带大家讲解一下知识点,若是你对本文感兴趣,或者是想搞懂其中某个知识点,就请你继续往下看吧~
我正在尝试创建一个充当代理服务器并可以动态切换到新端点的程序。但我遇到一个问题,在调用 switchovertonewendpoint() 后,仍然有一些代理对象连接到原始端点 8.8.8.8 ,应该将其关闭。
package main
import (
"net"
"sync"
"sync/atomic"
"time"
)
type proxy struct {
id int32
from, to *net.tcpconn
}
var switchover int32 = 0
func setswitchover() {
atomic.storeint32((*int32)(&switchover), 1)
}
func switchoverenabled() bool {
return atomic.loadint32((*int32)(&switchover)) == 1
}
var proxies map[int32]*proxy = make(map[int32]*proxy, 0)
var proxyseq int32 = 0
var mu sync.rwmutex
func addproxy(from *net.tcpconn) {
mu.lock()
proxyseq += 1
proxy := &proxy{id: proxyseq, from: from}
proxies[proxyseq] = proxy
mu.unlock()
var toaddr string
if switchoverenabled() {
toaddr = "1.1.1.1"
} else {
toaddr = "8.8.8.8"
}
tcpaddr, _ := net.resolvetcpaddr("tcp4", toaddr)
toconn, err := net.dialtcp("tcp", nil, tcpaddr)
if err != nil {
panic(err)
}
proxy.to = toconn
}
func switchovertonewendpoint() {
mu.rlock()
closedproxies := proxies
mu.runlock()
setswitchover()
for _, proxy := range closedproxies {
proxy.from.close()
proxy.to.close()
mu.lock()
delete(proxies, proxy.id)
mu.unlock()
}
}
func main() {
tcpaddr, _ := net.resolvetcpaddr("tcp4", "0.0.0.0:5432")
ln, _ := net.listentcp("tcp", tcpaddr)
go func() {
time.sleep(time.second * 30)
switchovertonewendpoint()
}()
for {
clientconn, err := ln.accepttcp()
if err != nil {
panic(err)
}
go addproxy(clientconn)
}
}
想了一会儿,我猜问题出在
mu.rlock()
closedproxies := proxies
mu.runlock()
但我不确定这是否是根本原因,以及是否可以通过将其替换为以下内容来修复它:
closedProxies := make([]*Proxy, 0)
mu.RLock()
for _, proxy := range proxies {
closedProxies = append(closedProxies, proxy)
}
mu.RUnlock()
由于该案例很难重现,所以有专业人士可以提供想法或提示吗?欢迎任何评论。提前致谢。
正确答案
问题
改变是必要的。在最初的实现中, latedproxies 持有相同的映射。请参阅此演示:
package main
import "fmt"
func main() {
proxies := make(map[int]int, 0)
for i := 0; i < 10; i++ {
proxies[i] = i
}
closeproxies := proxies
proxies[10] = 10
proxies[11] = 11
for k := range closeproxies {
delete(proxies, k)
}
fmt.printf("items left: %d\n", len(proxies))
// output:
// items left: 0
}
但这不是根本原因。可以在复制 closeproxies 之后但在调用 setswitchover 之前添加新代理。在这种情况下,新代理连接到旧地址,但不在 closeproxies 中。我认为这是根本原因。
还有一个问题。在设置 to 字段之前,将向 proxies 添加新代理。程序可能希望在设置 to 字段之前关闭此代理,从而导致恐慌。
可靠的设计
这个想法是将所有端点放入一个切片中,并让每个端点管理自己的代理列表。所以我们只需要跟踪当前端点的索引。当我们想要切换到另一个端点时,我们只需要更改索引,并告诉过时的端点清除其代理。剩下的唯一复杂的事情是确保过时的端点可以清除其所有代理。请参阅下面的实现:
manager.go
这就是这个想法的实现。
package main
import (
"sync"
)
// conn is abstraction of a connection to make manager easy to test.
type conn interface {
close() error
}
// dialer is abstraction of a dialer to make manager easy to test.
type dialer interface {
dial(addr string) (conn, error)
}
type manager struct {
// mucurrent protects the "current" member.
mucurrent sync.rwmutex
current int // when current is -1, the manager is shuted down.
endpoints []*endpoint
// mu protects the whole switch action.
mu sync.mutex
}
func newmanager(dialer dialer, addresses ...string) *manager {
if len(addresses) < 2 {
panic("a manger should handle at least 2 addresses")
}
endpoints := make([]*endpoint, len(addresses))
for i, addr := range addresses {
endpoints[i] = &endpoint{
address: addr,
dialer: dialer,
}
}
return &manager{
endpoints: endpoints,
}
}
func (m *manager) addproxy(from conn) {
// 1. addproxy will wait when the write lock of m.mucurrent is taken.
// once the write lock is released, addproxy will connect to the new endpoint.
// switch only holds the write lock for a short time, and switch is called
// not so frequently, so addproxy won't wait too much.
// 2. switch will wait if there is any addproxy holding the read lock of
// m.mucurrent. that means switch waits longer. the advantage is that when
// e.clear is called in switch, all addproxy requests to the old endpoint
// are done. so it's safe to call e.clear then.
m.mucurrent.rlock()
defer m.mucurrent.runlock()
current := m.current
// do not accept any new connection when m has been shutdown.
if current == -1 {
from.close()
return
}
m.endpoints[current].addproxy(from)
}
func (m *manager) switch() {
// in a real world, switch is called not so frequently.
// so it's ok to add a lock here.
// and it's necessary to make sure the old endpoint is cleared and ready
// for use in the future.
m.mu.lock()
defer m.mu.unlock()
// take the write lock of m.mucurrent.
// it waits for all the addproxy requests holding the read lock to finish.
m.mucurrent.lock()
old := m.current
// do nothing when m has been shutdown.
if old == -1 {
m.mucurrent.unlock()
return
}
next := old + 1
if next >= len(m.endpoints) {
next = 0
}
m.current = next
m.mucurrent.unlock()
// when it reaches here, all addproxy requests to the old endpoint are done.
// and it's safe to call e.clear now.
m.endpoints[old].clear()
}
func (m *manager) shutdown() {
m.mu.lock()
defer m.mu.unlock()
m.mucurrent.lock()
current := m.current
m.current = -1
m.mucurrent.unlock()
m.endpoints[current].clear()
}
type proxy struct {
from, to conn
}
type endpoint struct {
address string
dialer dialer
mu sync.mutex
proxies []*proxy
}
func (e *endpoint) clear() {
for _, p := range e.proxies {
p.from.close()
p.to.close()
}
// assign a new slice to e.proxies, and the gc will collect the old one.
e.proxies = []*proxy{}
}
func (e *endpoint) addproxy(from conn) {
toconn, err := e.dialer.dial(e.address)
if err != nil {
// close the from connection so that the client will reconnect?
from.close()
return
}
e.mu.lock()
defer e.mu.unlock()
e.proxies = append(e.proxies, &proxy{from: from, to: toconn})
}
main.go
这个演示展示了如何使用之前实现的manager类型:
package main
import (
"net"
"time"
)
type realdialer struct{}
func (d realdialer) dial(addr string) (conn, error) {
tcpaddr, err := net.resolvetcpaddr("tcp4", addr)
if err != nil {
return nil, err
}
return net.dialtcp("tcp", nil, tcpaddr)
}
func main() {
manager := newmanager(realdialer{}, "1.1.1.1", "8.8.8.8")
tcpaddr, _ := net.resolvetcpaddr("tcp4", "0.0.0.0:5432")
ln, _ := net.listentcp("tcp", tcpaddr)
go func() {
for range time.tick(30 * time.second) {
manager.switch()
}
}()
for {
clientconn, err := ln.accepttcp()
if err != nil {
panic(err)
}
go manager.addproxy(clientconn)
}
}
manager_test.go
使用以下命令运行测试:go test ./... -race -count 10
package main
import (
"errors"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/google/uuid"
)
func TestManager(t *testing.T) {
addresses := []string{"1.1.1.1", "8.8.8.8"}
dialer := newDialer(addresses...)
manager := NewManager(dialer, addresses...)
ch := make(chan int, 1)
var wg sync.WaitGroup
wg.Add(1)
go func() {
for range ch {
manager.Switch()
}
wg.Done()
}()
count := 1000
total := count * 10
wg.Add(total)
fromConn := &fakeFromConn{}
for i := 0; i < total; i++ {
if i%count == count-1 {
ch <- 0
}
go func() {
manager.AddProxy(fromConn)
wg.Done()
}()
}
close(ch)
wg.Wait()
manager.Shutdown()
for _, s := range dialer.servers {
left := len(s.conns)
if left != 0 {
t.Errorf("server %s, unexpected connections left: %d", s.addr, left)
}
}
closedCount := fromConn.closedCount.Load()
if closedCount != int32(total) {
t.Errorf("want closed count: %d, got: %d", total, closedCount)
}
}
type fakeFromConn struct {
closedCount atomic.Int32
}
func (c *fakeFromConn) Close() error {
c.closedCount.Add(1)
return nil
}
type fakeToConn struct {
id uuid.UUID
server *fakeServer
}
func (c *fakeToConn) Close() error {
if c.id == uuid.Nil {
return nil
}
c.server.removeConn(c.id)
return nil
}
type fakeServer struct {
addr string
mu sync.Mutex
conns map[uuid.UUID]bool
}
func (s *fakeServer) addConn() (uuid.UUID, error) {
s.mu.Lock()
defer s.mu.Unlock()
id, err := uuid.NewRandom()
if err == nil {
s.conns[id] = true
}
return id, err
}
func (s *fakeServer) removeConn(id uuid.UUID) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.conns, id)
}
type fakeDialer struct {
servers map[string]*fakeServer
}
func newDialer(addresses ...string) *fakeDialer {
servers := make(map[string]*fakeServer)
for _, addr := range addresses {
servers[addr] = &fakeServer{
addr: addr,
conns: make(map[uuid.UUID]bool),
}
}
return &fakeDialer{
servers: servers,
}
}
func (d *fakeDialer) Dial(addr string) (Conn, error) {
n := rand.Intn(100)
if n == 0 {
return nil, errors.New("fake network error")
}
// Simulate network latency.
time.Sleep(time.Duration(n) * time.Millisecond)
s := d.servers[addr]
id, err := s.addConn()
if err != nil {
return nil, err
}
conn := &fakeToConn{
id: id,
server: s,
}
return conn, nil
}
理论要掌握,实操不能落!以上关于《Golang程序中的并发问题》的详细介绍,大家都掌握了吧!如果想要继续提升自己的能力,那么就来关注golang学习网公众号吧!
测量 gRPC 响应的数据量
- 上一篇
- 测量 gRPC 响应的数据量
- 下一篇
- 解决Win11中IT管理员限制应用程序访问某些区域的问题
-
- Golang · Go问答 | 1年前 |
- 在读取缓冲通道中的内容之前退出
- 139浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 戈兰岛的全球 GOPRIVATE 设置
- 204浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何将结构作为参数传递给 xml-rpc
- 325浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何用golang获得小数点以下两位长度?
- 478浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何通过 client-go 和 golang 检索 Kubernetes 指标
- 486浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 将多个“参数”映射到单个可变参数的习惯用法
- 439浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 将 HTTP 响应正文写入文件后出现 EOF 错误
- 357浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 结构中映射的匿名列表的“复合文字中缺少类型”
- 352浏览 收藏
-
- Golang · Go问答 | 1年前 |
- NATS Jetstream 的性能
- 101浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何将复杂的字符串输入转换为mapstring?
- 440浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 相当于GoLang中Java将Object作为方法参数传递
- 212浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何确保所有 goroutine 在没有 time.Sleep 的情况下终止?
- 143浏览 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 516次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 500次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 485次学习
-
- ChatExcel酷表
- ChatExcel酷表是由北京大学团队打造的Excel聊天机器人,用自然语言操控表格,简化数据处理,告别繁琐操作,提升工作效率!适用于学生、上班族及政府人员。
- 3183次使用
-
- Any绘本
- 探索Any绘本(anypicturebook.com/zh),一款开源免费的AI绘本创作工具,基于Google Gemini与Flux AI模型,让您轻松创作个性化绘本。适用于家庭、教育、创作等多种场景,零门槛,高自由度,技术透明,本地可控。
- 3394次使用
-
- 可赞AI
- 可赞AI,AI驱动的办公可视化智能工具,助您轻松实现文本与可视化元素高效转化。无论是智能文档生成、多格式文本解析,还是一键生成专业图表、脑图、知识卡片,可赞AI都能让信息处理更清晰高效。覆盖数据汇报、会议纪要、内容营销等全场景,大幅提升办公效率,降低专业门槛,是您提升工作效率的得力助手。
- 3426次使用
-
- 星月写作
- 星月写作是国内首款聚焦中文网络小说创作的AI辅助工具,解决网文作者从构思到变现的全流程痛点。AI扫榜、专属模板、全链路适配,助力新人快速上手,资深作者效率倍增。
- 4531次使用
-
- MagicLight
- MagicLight.ai是全球首款叙事驱动型AI动画视频创作平台,专注于解决从故事想法到完整动画的全流程痛点。它通过自研AI模型,保障角色、风格、场景高度一致性,让零动画经验者也能高效产出专业级叙事内容。广泛适用于独立创作者、动画工作室、教育机构及企业营销,助您轻松实现创意落地与商业化。
- 3803次使用
-
- GoLand调式动态执行代码
- 2023-01-13 502浏览
-
- 用Nginx反向代理部署go写的网站。
- 2023-01-17 502浏览
-
- Golang取得代码运行时间的问题
- 2023-02-24 501浏览
-
- 请问 go 代码如何实现在代码改动后不需要Ctrl+c,然后重新 go run *.go 文件?
- 2023-01-08 501浏览
-
- 如何从同一个 io.Reader 读取多次
- 2023-04-11 501浏览

