在系统启动时加载文件数据,处理新文件并更新映射状态
编程并不是一个机械性的工作,而是需要有思考,有创新的工作,语法是固定的,但解决问题的思路则是依靠人的思维,这就需要我们坚持学习和更新自己的知识。今天golang学习网就整理分享《在系统启动时加载文件数据,处理新文件并更新映射状态》,文章讲解的知识点主要包括,如果你对Golang方面的知识点感兴趣,就不要错过golang学习网,在这可以对大家的知识积累有所帮助,助力开发能力的提升。
我正在开发一个项目,在启动过程中,我需要读取某些文件并将其存储在地图的内存中,然后定期查找新文件(如果有),然后替换之前在地图中内存中的所有文件使用这个新数据启动。基本上每次如果有一个 full state
的新文件,那么我想将内存中的映射对象刷新到这个新文件,而不是附加到它。
下面的方法 loadatstartupandprocessnewchanges
在服务器启动期间被调用,该方法读取文件并将数据存储在内存中。它还启动一个 go 例程 detectnewfiles
,定期检查是否有任何新文件并将其存储在 deltachan
通道上,该通道稍后由另一个 go 例程 processnewfiles
访问以再次读取该新文件并将数据存储在同一个映射中。如果有任何错误,我们会将其存储在 err
通道上。 loadfiles
是读取内存中的文件并将其存储在 map
中的函数。
type customerconfig struct { deltachan chan string err chan error wg sync.waitgroup data *cmap.concurrentmap } // this is called during server startup. func (r *customerconfig) loadatstartupandprocessnewchanges() error { path, err := r.getpath("...", "....") if err != nil { return err } r.wg.add(1) go r.detectnewfiles(path) err = r.loadfiles(4, path) if err != nil { return err } r.wg.add(1) go r.processnewfiles() return nil }
此方法基本上确定是否有任何需要消耗的新文件,如果有,则会将其放入 deltachan
通道上,该通道稍后将由 processnewfiles
go-routine 消耗并读取内存中的文件。如果有任何错误,则会将错误添加到错误通道。
func (r *customerconfig) detectnewfiles(rootpath string) { }
这将读取所有 s3
文件并将其存储在内存中并返回错误。在这种方法中,我清除了地图的先前状态,以便它可以从新文件中获得新的状态。该方法在服务器启动期间调用,并且每当我们需要处理 processnewfiles
go-routine 中的新文件时也会调用该方法。
func (r *customerconfig) loadfiles(workers int, path string) error { var err error ... var files []string files = ..... // reset the map so that it can have fresh state from new files. r.data.clear() g, ctx := errgroup.withcontext(context.background()) sem := make(chan struct{}, workers) for _, file := range files { select { case <-ctx.done(): break case sem <- struct{}{}: } file := file g.go(func() error { defer func() { <-sem }() return r.read(spn, file, bucket) }) } if err := g.wait(); err != nil { return err } return nil }
此方法读取文件并添加到 data
并发映射中。
func (r *customerconfig) read(file string, bucket string) error { // read file and store it in "data" concurrent map // and if there is any error then return the error var err error fr, err := pars3.news3filereader(context.background(), bucket, file, r.s3client.getsession().config) if err != nil { return errs.wrap(err) } defer xio.closeignoringerrors(fr) pr, err := reader.newparquetreader(fr, nil, 8) if err != nil { return errs.wrap(err) } if pr.getnumrows() == 0 { spn.infof("skipping %s due to 0 rows", file) return nil } for { rows, err := pr.readbynumber(r.cfg.rowstoread) if err != nil { return errs.wrap(err) } if len(rows) <= 0 { break } byteslice, err := json.marshal(rows) if err != nil { return errs.wrap(err) } var invmods []compmodel err = json.unmarshal(byteslice, &invmods) if err != nil { return errs.wrap(err) } for i := range invmods { key := strconv.formatint(invmods[i].productid, 10) + ":" + strconv.itoa(int(invmods[i].iaz)) hasinventory := false if invmods[i].available > 0 { hasinventory = true } r.data.set(key, hasinventory) } } return nil }
此方法将选择 delta 通道
上的内容,如果有任何新文件,则它将通过调用 loadfiles
方法开始读取该新文件。如果有任何错误,则会将错误添加到错误通道。
// processnewfiles - load new files found by detectnewfiles func (r *customerconfig) processnewfiles() { // find new files on delta channel // and call "loadfiles" method to read it // if there is any error, then it will add it to the error channel. }
如果 error 通道
上有任何错误,那么它将通过以下方法记录这些错误 -
func (r *customerConfig) handleError() { // read error from error channel if there is any // then log it }
问题陈述
上述逻辑对我来说没有任何问题,但我的代码中有一个小错误,我无法弄清楚如何解决它。正如您所看到的,我有一个并发映射,我在 read
方法中填充该映射,并在 loadfiles
方法中清除整个映射。因为每当 delta 通道上有新文件时,我都不想在地图中保留以前的状态,所以这就是为什么我要从地图中删除所有内容,然后从新文件中添加新状态。
现在,如果 read
方法中有任何错误,那么错误就会发生,因为我已经清除了 data
地图中的所有数据,该地图将具有空地图,这不是我想要的。基本上,如果有任何错误,那么我想保留 data
地图中的先前状态。我如何在上述当前设计中解决这个问题。
注意:我使用的是golang 并发map
正确答案
为 collecteddata
添加了 rwmutex
通过工作协程进行并发写保护
type customerconfig struct { ... m sync.rwmutex }
不要在 read
方法中更新 read
方法,而是让 read
方法返回数据和错误
func (r *customerconfig) read(file string, bucket string) ([]compmodel, error) { // read file data and return with error if any var err error fr, err := pars3.news3filereader(context.background(), bucket, file, r.s3client.getsession().config) if err != nil { return (nil, errs.wrap(err)) } defer xio.closeignoringerrors(fr) pr, err := reader.newparquetreader(fr, nil, 8) if err != nil { return (nil, errs.wrap(err)) } if pr.getnumrows() == 0 { spn.infof("skipping %s due to 0 rows", file) return (nil, errors.new("no data")) } var invmods = []compmodel{} for { rows, err := pr.readbynumber(r.cfg.rowstoread) if err != nil { return (nil, errs.wrap(err)) } if len(rows) <= 0 { break } byteslice, err := json.marshal(rows) if err != nil { return (nil, errs.wrap(err)) } var jsondata []compmodel err = json.unmarshal(byteslice, &jsondata) if err != nil { return (nil, errs.wrap(err)) } invmods = append(invmods, jsondata...) } return invmods, nil }
然后loadfiles
就可以收集read
返回的数据
方法,如果没有错误,则清除并更新地图,否则
保留旧数据原样
func (r *customerconfig) loadfiles(workers int, path string) error { var err error ... var files []string files = ..... // reset the map so that it can have fresh state from new files. // r.data.clear() <- remove the clear from here g, ctx := errgroup.withcontext(context.background()) sem := make(chan struct{}, workers) collecteddata := []compmodel{} for _, file := range files { select { case <-ctx.done(): break case sem <- struct{}{}: } file := file g.go(func() error { defer func() { <-sem }() data, err:= r.read(spn, file, bucket) if err != nil { return err } r.m.lock() append(collecteddata, data...) r.m.unlock() return nil }) } if err := g.wait(); err != nil { return err } r.data.clear() for i := range collecteddata { key := strconv.formatint(collecteddata[i].productid, 10) + ":" + strconv.itoa(int(collecteddata[i].iaz)) hasinventory := false if collecteddata[i].available > 0 { hasinventory = true } r.data.set(key, hasinventory) } return nil }
注意:由于代码不可运行,仅更新了参考方法,并且我没有包含用于更新您可能需要处理这种情况的切片的互斥锁。
仅用 3 个函数即可实现相同的功能 - 检测、读取、加载,检测将按时间间隔检查新文件,如果发现有则推送到增量通道,加载将从增量通道获取文件路径并调用读取方法获取数据和错误,然后检查是否没有错误,然后清除地图并使用新内容更新,否则记录错误,因此您将有 2 个 go 例程和 1 个由加载例程调用的函数
package main import ( "fmt" "time" "os" "os/signal" "math/rand" ) func main() { fmt.println(">>>", center("started", 30), "<<<") c := &config{ initialpath: "old path", detectinterval: 3000, } c.start() fmt.println(">>>", center("ended", 30), "<<<") } // https://stackoverflow.com/questions/41133006/how-to-fmt-printprint-this-on-the-center func center(s string, w int) string { return fmt.sprintf("%[1]*s", -w, fmt.sprintf("%[1]*s", (w + len(s))/2, s)) } type config struct { deltach chan string ticker *time.ticker stopsignal chan os.signal initialpath string detectinterval time.duration } func (c *config) start() { c.stopsignal = make(chan os.signal, 1) signal.notify(c.stopsignal, os.interrupt) c.ticker = time.newticker(c.detectinterval * time.millisecond) c.deltach = make(chan string, 1) go c.detect() go c.load() if c.initialpath != "" { c.deltach <- c.initialpath } <- c.stopsignal c.ticker.stop() } // detect new files func (c *config) detect() { for { select { case <- c.stopsignal: return case <- c.ticker.c: fmt.println(">>>", center("detect", 30), "<<<") c.deltach <- fmt.sprintf("path %f", rand.float64() * 1.5) } } } // read files func read(path string) (map[string]int, error) { data := make(map[string]int) data[path] = 0 fmt.println(">>>", center("read", 30), "<<<") fmt.println(path) return data, nil } // load files func (c *config) load() { for { select { case <- c.stopsignal: return case path := <- c.deltach: fmt.println(">>>", center("load", 30), "<<<") data, err := read(path) if err != nil { fmt.println("log error") } else { fmt.println("success", data) } fmt.println() } } }
注意:示例代码中未包含地图,它可以轻松更新以包含地图
我认为您的设计过于复杂。它可以更简单地解决,从而提供您想要的所有好处:
- 并发访问安全
- 重新加载检测到的更改
- 访问配置将为您提供最新的、成功加载的配置
- 即使由于检测到的更改而加载新配置需要很长时间,最新的配置也始终可以立即访问
- 如果加载新配置失败,则保留之前的“快照”并保持当前状态
- 作为奖励,它更简单,甚至不使用第 3 方库
让我们看看如何实现这一目标:
有一个 customerconfig
结构来保存您想要缓存的所有内容(这是“快照”):
type customerconfig struct { data map[string]bool // add other props if you need: loadedat time.time }
提供一个函数来加载您想要缓存的配置。注意:该函数是无状态的,它不会访问/操作包级变量:
func loadconfig() (*customerconfig, error) { cfg := &customerconfig{ data: map[string]bool{}, loadedat: time.now(), } // logic to load files, and populate cfg.data // if an error occurs, return it // if loading succeeds, return the config return cfg, nil }
现在让我们创建我们的“缓存管理器”。缓存管理器存储实际/当前配置(快照),并提供对其的访问。为了安全的并发访问(和更新),我们使用sync.RWMutex
。还有停止管理器的方法(停止并发刷新):
type configcache struct { configmu sync.rwmutex config *customerconfig closech chan struct{} }
创建缓存会加载初始配置。还启动一个 goroutine 负责定期检查更改。
func newconfigcache() (*configcache, error) { cfg, err := loadconfig() if err != nil { return nil, fmt.errorf("loading initial config failed: %w", err) } cc := &configcache{ config: cfg, closech: make(chan struct{}), } // launch goroutine to periodically check for changes, and load new configs go cc.refresher() return cc, nil }
refresher()
定期检查更改,如果检测到更改,则调用 loadconfig()
加载要缓存的新数据,并将其存储为当前/实际配置(同时锁定 configmu
)。它还监视 closech
以在收到请求时停止:
func (cc *configcache) refresher() { ticker := time.newticker(1 * time.minute) // every minute defer ticker.stop() for { select { case <-ticker.c: // check if there are changes changes := false // logic to detect changes if !changes { continue // no changes, continue } // changes! load new config: cfg, err := loadconfig() if err != nil { log.printf("failed to load config: %v", err) continue // keep the previous config } // apply / store new config cc.configmu.lock() cc.config = cfg cc.configmu.unlock() case <-cc.closech: return } } }
关闭缓存管理器(刷新 goroutine)非常简单:
func (cc *configcache) stop() { close(cc.closech) }
最后一个缺失的部分是如何访问当前配置。这是一个简单的 getconfig()
方法(也使用 configmu
,但处于只读模式):
func (cc *configcache) getconfig() *customerconfig { cc.configmu.rlock() defer cc.configmu.runlock() return cc.config }
这是您如何使用它:
cc, err := NewConfigCache() if err != nil { // Decide what to do: retry, terminate etc. } // Where ever, whenever you need the actual (most recent) config in your app: cfg := cc.GetConfig() // Use cfg
在关闭应用程序(或者想要停止刷新)之前,您可以调用 cc.stop()
。
本篇关于《在系统启动时加载文件数据,处理新文件并更新映射状态》的介绍就到此结束啦,但是学无止境,想要了解学习更多关于Golang的相关知识,请关注golang学习网公众号!

- 上一篇
- 优化PyCharm环境:优化Python开发效率

- 下一篇
- Linux管道命令的简介和基础用法
-
- Golang · Go问答 | 1年前 |
- 在读取缓冲通道中的内容之前退出
- 139浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 戈兰岛的全球 GOPRIVATE 设置
- 204浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何将结构作为参数传递给 xml-rpc
- 325浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何用golang获得小数点以下两位长度?
- 477浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何通过 client-go 和 golang 检索 Kubernetes 指标
- 486浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 将多个“参数”映射到单个可变参数的习惯用法
- 439浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 将 HTTP 响应正文写入文件后出现 EOF 错误
- 357浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 结构中映射的匿名列表的“复合文字中缺少类型”
- 352浏览 收藏
-
- Golang · Go问答 | 1年前 |
- NATS Jetstream 的性能
- 101浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何将复杂的字符串输入转换为mapstring?
- 440浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 相当于GoLang中Java将Object作为方法参数传递
- 212浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何确保所有 goroutine 在没有 time.Sleep 的情况下终止?
- 143浏览 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 508次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 497次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 484次学习
-
- Vozo AI
- 探索Vozo AI,一款功能强大的在线AI视频换脸工具,支持跨性别、年龄和肤色换脸,适用于广告本地化、电影制作和创意内容创作,提升您的视频制作效率和效果。
- 2次使用
-
- AIGAZOU-AI图像生成
- AIGAZOU是一款先进的免费AI图像生成工具,无需登录即可使用,支持中文提示词,生成高清图像。适用于设计、内容创作、商业和艺术领域,提供自动提示词、专家模式等多种功能。
- 2次使用
-
- Raphael AI
- 探索Raphael AI,一款由Flux.1 Dev支持的免费AI图像生成器,无需登录即可无限生成高质量图像。支持多种风格,快速生成,保护隐私,适用于艺术创作、商业设计等多种场景。
- 1次使用
-
- 笔灵AI生成答辩PPT
- 探索笔灵AI生成答辩PPT的强大功能,快速制作高质量答辩PPT。精准内容提取、多样模板匹配、数据可视化、配套自述稿生成,让您的学术和职场展示更加专业与高效。
- 30次使用
-
- 知网AIGC检测服务系统
- 知网AIGC检测服务系统,专注于检测学术文本中的疑似AI生成内容。依托知网海量高质量文献资源,结合先进的“知识增强AIGC检测技术”,系统能够从语言模式和语义逻辑两方面精准识别AI生成内容,适用于学术研究、教育和企业领域,确保文本的真实性和原创性。
- 45次使用
-
- GoLand调式动态执行代码
- 2023-01-13 502浏览
-
- 用Nginx反向代理部署go写的网站。
- 2023-01-17 502浏览
-
- Golang取得代码运行时间的问题
- 2023-02-24 501浏览
-
- 请问 go 代码如何实现在代码改动后不需要Ctrl+c,然后重新 go run *.go 文件?
- 2023-01-08 501浏览
-
- 如何从同一个 io.Reader 读取多次
- 2023-04-11 501浏览