当前位置:首页 > 文章列表 > Golang > Go教程 > Go语言实现的可读性更高的并发神库详解

Go语言实现的可读性更高的并发神库详解

来源:脚本之家 2023-02-25 09:25:31 0浏览 收藏

从现在开始,我们要努力学习啦!今天我给大家带来《Go语言实现的可读性更高的并发神库详解》,感兴趣的朋友请继续看下去吧!下文中的内容我们主要会涉及到并发、库、可读性等等知识点,如果在阅读本文过程中有遇到不清楚的地方,欢迎留言呀!我们一起讨论,一起学习!

  • 更难出现goroutine泄漏
  • 处理panic更友好
  • 并发代码可读性高

从简介上看主要封装功能如下:

  • waitGroup进行封装,避免了产生大量重复代码,并且也封装recover,安全性更高
  • 提供panics.Catcher封装recover逻辑,统一捕获panic,打印调用栈一些信息
  • 提供一个并发执行任务的worker池,可以控制并发度、goroutine可以进行复用,支持函数签名,同时提供了stream方法来保证结果有序
  • 提供ForEachmap方法优雅的处理切片

接下来就区分模块来介绍一下这个库;

仓库地址:github.com/sourcegraph…

WaitGroup的封装

Go语言标准库有提供sync.waitGroup控制等待goroutine,我们一般会写出如下代码:

func main(){
    var wg sync.WaitGroup
    for i:=0; i 

上述代码我们需要些一堆重复代码,并且需要单独在每一个func中处理recover逻辑,所以conc库对其进行了封装,代码简化如下:

func main() {
	wg := conc.NewWaitGroup()
	for i := 0; i < 10; i++ {
		wg.Go(doSomething)
	}
	wg.Wait()
}
func doSomething() {
	fmt.Println("test")
}

conc库封装也比较简单,结构如下:

type WaitGroup struct {
	wg sync.WaitGroup
	pc panics.Catcher
}

其自己实现了Catcher类型对recover逻辑进行了封装,封装思路如下:

type Catcher struct {
	recovered atomic.Pointer[RecoveredPanic]
}

recovered是原子指针类型,RecoveredPanic是捕获的recover封装,封装了堆栈等信息:

type RecoveredPanic struct {
	// The original value of the panic.
	Value any
	// The caller list as returned by runtime.Callers when the panic was
	// recovered. Can be used to produce a more detailed stack information with
	// runtime.CallersFrames.
	Callers []uintptr
	// The formatted stacktrace from the goroutine where the panic was recovered.
	// Easier to use than Callers.
	Stack []byte
}

提供了Try方法执行方法,只会记录第一个panic的goroutine信息:

func (p *Catcher) Try(f func()) {
	defer p.tryRecover()
	f()
}
func (p *Catcher) tryRecover() {
	if val := recover(); val != nil {
		rp := NewRecoveredPanic(1, val)
        // 只会记录第一个panic的goroutine信息
		p.recovered.CompareAndSwap(nil, &rp)
	}
}

提供了Repanic()方法用来重放捕获的panic:

func (p *Catcher) Repanic() {
	if val := p.Recovered(); val != nil {
		panic(val)
	}
}
func (p *Catcher) Recovered() *RecoveredPanic {
	return p.recovered.Load()
}

waitGroup对此也分别提供了Wait()WaitAndRecover()方法:

func (h *WaitGroup) Wait() {
	h.wg.Wait()
	// Propagate a panic if we caught one from a child goroutine.
	h.pc.Repanic()
}
func (h *WaitGroup) WaitAndRecover() *panics.RecoveredPanic {
	h.wg.Wait()
	// Return a recovered panic if we caught one from a child goroutine.
	return h.pc.Recovered()
}

wait方法只要有一个goroutine发生panic就会向上抛出panic,比较简单粗暴;

waitAndRecover方法只有有一个goroutine发生panic就会返回第一个recover的goroutine信息;

总结:conc库对waitGrouop的封装总体是比较不错的,可以减少重复的代码;

worker池

conc提供了几种类型的worker池:

  • ContextPool:可以传递context的pool,若有goroutine发生错误可以cancel其他goroutine
  • ErrorPool:通过参数可以控制只收集第一个error还是所有error
  • ResultContextPool:若有goroutine发生错误会cancel其他goroutine并且收集错误
  • RestultPool:收集work池中每个任务的执行结果,并不能保证顺序,保证顺序需要使用stream或者iter.map;

我们来看一个简单的例子:

import "github.com/sourcegraph/conc/pool"
func ExampleContextPool_WithCancelOnError() {
	p := pool.New().
		WithMaxGoroutines(4).
		WithContext(context.Background()).
		WithCancelOnError()
	for i := 0; i 

在创建pool时有如下方法可以调用:

  • p.WithMaxGoroutines()配置pool中goroutine的最大数量
  • p.WithErrors:配置pool中的task是否返回error
  • p.WithContext(ctx):配置pool中运行的task当遇到第一个error要取消
  • p.WithFirstError:配置pool中的task只返回第一个error
  • p.WithCollectErrored:配置pool的task收集所有error

pool的基础结构如下:

type Pool struct {
	handle   conc.WaitGroup
	limiter  limiter
	tasks    chan func()
	initOnce sync.Once
}

limiter是控制器,用chan来控制goroutine的数量:

type limiter chan struct{}
func (l limiter) limit() int {
	return cap(l)
}
func (l limiter) release() {
	if l != nil {
		<-l
	}
}

pool的核心逻辑也比较简单,如果没有设置limiter,那么就看有没有空闲的worker,否则就创建一个新的worker,然后投递任务进去;

如果设置了limiter,达到了limiter worker数量上限,就把任务投递给空闲的worker,没有空闲就阻塞等着;

func (p *Pool) Go(f func()) {
	p.init()
	if p.limiter == nil {
		// 没有限制
		select {
		case p.tasks 

这里work使用的是一个无缓冲的channel,这种复用方式很巧妙,如果goroutine执行很快避免创建过多的goroutine;

使用pool处理任务不能保证有序性,conc库又提供了Stream方法,返回结果可以保持顺序;

Stream

Steam的实现也是依赖于pool,在此基础上做了封装保证结果的顺序性,先看一个例子:

func ExampleStream() {
	times := []int{20, 52, 16, 45, 4, 80}
	stream := stream2.New()
	for _, millis := range times {
		dur := time.Duration(millis) * time.Millisecond
		stream.Go(func() stream2.Callback {
			time.Sleep(dur)
			// This will print in the order the tasks were submitted
			return func() { fmt.Println(dur) }
		})
	}
	stream.Wait()
	// Output:
	// 20ms
	// 52ms
	// 16ms
	// 45ms
	// 4ms
	// 80ms
}

stream的结构如下:

type Stream struct {
	pool             pool.Pool
	callbackerHandle conc.WaitGroup
	queue            chan callbackCh
	initOnce sync.Once
}

queue是一个channel类型,callbackCh也是channel类型 - chan func():

type callbackCh chan func()

在提交goroutine时按照顺序生成callbackCh传递结果:

func (s *Stream) Go(f Task) {
	s.init()
	// Get a channel from the cache.
	ch := getCh()
	// Queue the channel for the callbacker.
	s.queue 

ForEach和map

ForEach

conc库提供了ForEach方法可以优雅的并发处理切片,看一下官方的例子:

conc库使用泛型进行了封装,我们只需要关注handle代码即可,避免冗余代码,我们自己动手写一个例子:

func main() {
	input := []int{1, 2, 3, 4}
	iterator := iter.Iterator[int]{
		MaxGoroutines: len(input) / 2,
	}
	iterator.ForEach(input, func(v *int) {
		if *v%2 != 0 {
			*v = -1
		}
	})
	fmt.Println(input)
}

ForEach内部实现为Iterator结构及核心逻辑如下:

type Iterator[T any] struct {
	MaxGoroutines int
}
func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) {
	if iter.MaxGoroutines == 0 {
		// iter is a value receiver and is hence safe to mutate
		iter.MaxGoroutines = defaultMaxGoroutines()
	}
	numInput := len(input)
	if iter.MaxGoroutines > numInput {
		// No more concurrent tasks than the number of input items.
		iter.MaxGoroutines = numInput
	}
	var idx atomic.Int64
	// 通过atomic控制仅创建一个闭包
	task := func() {
		i := int(idx.Add(1) - 1)
		for ; i 

可以设置并发的goroutine数量,默认取的是GOMAXPROCS ,也可以自定义传参;

并发执行这块设计的很巧妙,仅创建了一个闭包,通过atomic控制idx,避免频繁触发GC;

map

conc库提供的map方法可以得到对切片中元素结果,官方例子:

使用map可以提高代码的可读性,并且减少了冗余代码,自己写个例子:

func main() {
	input := []int{1, 2, 3, 4}
	mapper := iter.Mapper[int, bool]{
		MaxGoroutines: len(input) / 2,
	}
	results := mapper.Map(input, func(v *int) bool { return *v%2 == 0 })
	fmt.Println(results)
	// Output:
	// [false true false true]
}

map的实现也依赖于Iterator,也是调用的ForEachIdx方法,区别于ForEach是记录处理结果;

总结

花了小半天时间看了一下这个库,很多设计点值得我们学习,总结一下我学习到的知识点:

  • conc.WatiGroup对Sync.WaitGroup进行了封装,对Add、Done、Recover进行了封装,提高了可读性,避免了冗余代码
  • ForEach、Map方法可以更优雅的并发处理切片,代码简洁易读,在实现上Iterator中的并发处理使用atomic来控制只创建一个闭包,避免了GC性能问题
  • pool是一个并发的协程队列,可以控制协程的数量,实现上也很巧妙,使用一个无缓冲的channel作为worker,如果goroutine执行速度快,避免了创建多个goroutine
  • stream是一个保证顺序的并发协程队列,实现上也很巧妙,使用sync.Pool在提交goroutine时控制顺序,值得我们学习;

到这里,我们也就讲完了《Go语言实现的可读性更高的并发神库详解》的内容了。个人认为,基础知识的学习和巩固,是为了更好的将其运用到项目中,欢迎关注golang学习网公众号,带你了解更多关于golang的知识点!

版本声明
本文转载于:脚本之家 如有侵犯,请联系study_golang@163.com删除
Go实现set类型的示例代码Go实现set类型的示例代码
上一篇
Go实现set类型的示例代码
Go语言通过WaitGroup实现控制并发的示例详解
下一篇
Go语言通过WaitGroup实现控制并发的示例详解
查看更多
最新文章
查看更多
课程推荐
  • 前端进阶之JavaScript设计模式
    前端进阶之JavaScript设计模式
    设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
    542次学习
  • GO语言核心编程课程
    GO语言核心编程课程
    本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
    508次学习
  • 简单聊聊mysql8与网络通信
    简单聊聊mysql8与网络通信
    如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
    497次学习
  • JavaScript正则表达式基础与实战
    JavaScript正则表达式基础与实战
    在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
    487次学习
  • 从零制作响应式网站—Grid布局
    从零制作响应式网站—Grid布局
    本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
    484次学习
查看更多
AI推荐
  • 笔灵AI生成答辩PPT:高效制作学术与职场PPT的利器
    笔灵AI生成答辩PPT
    探索笔灵AI生成答辩PPT的强大功能,快速制作高质量答辩PPT。精准内容提取、多样模板匹配、数据可视化、配套自述稿生成,让您的学术和职场展示更加专业与高效。
    16次使用
  • 知网AIGC检测服务系统:精准识别学术文本中的AI生成内容
    知网AIGC检测服务系统
    知网AIGC检测服务系统,专注于检测学术文本中的疑似AI生成内容。依托知网海量高质量文献资源,结合先进的“知识增强AIGC检测技术”,系统能够从语言模式和语义逻辑两方面精准识别AI生成内容,适用于学术研究、教育和企业领域,确保文本的真实性和原创性。
    25次使用
  • AIGC检测服务:AIbiye助力确保论文原创性
    AIGC检测-Aibiye
    AIbiye官网推出的AIGC检测服务,专注于检测ChatGPT、Gemini、Claude等AIGC工具生成的文本,帮助用户确保论文的原创性和学术规范。支持txt和doc(x)格式,检测范围为论文正文,提供高准确性和便捷的用户体验。
    30次使用
  • 易笔AI论文平台:快速生成高质量学术论文的利器
    易笔AI论文
    易笔AI论文平台提供自动写作、格式校对、查重检测等功能,支持多种学术领域的论文生成。价格优惠,界面友好,操作简便,适用于学术研究者、学生及论文辅导机构。
    42次使用
  • 笔启AI论文写作平台:多类型论文生成与多语言支持
    笔启AI论文写作平台
    笔启AI论文写作平台提供多类型论文生成服务,支持多语言写作,满足学术研究者、学生和职场人士的需求。平台采用AI 4.0版本,确保论文质量和原创性,并提供查重保障和隐私保护。
    35次使用
微信登录更方便
  • 密码登录
  • 注册账号
登录即同意 用户协议隐私政策
返回登录
  • 重置密码