当前位置:首页 > 文章列表 > Golang > Go问答 > 在系统启动时加载文件数据,处理新文件并更新映射状态

在系统启动时加载文件数据,处理新文件并更新映射状态

来源:stackoverflow 2024-02-20 13:12:25 0浏览 收藏

编程并不是一个机械性的工作,而是需要有思考,有创新的工作,语法是固定的,但解决问题的思路则是依靠人的思维,这就需要我们坚持学习和更新自己的知识。今天golang学习网就整理分享《在系统启动时加载文件数据,处理新文件并更新映射状态》,文章讲解的知识点主要包括,如果你对Golang方面的知识点感兴趣,就不要错过golang学习网,在这可以对大家的知识积累有所帮助,助力开发能力的提升。

问题内容

我正在开发一个项目,在启动过程中,我需要读取某些文件并将其存储在地图的内存中,然后定期查找新文件(如果有),然后替换之前在地图中内存中的所有文件使用这个新数据启动。基本上每次如果有一个 full state 的新文件,那么我想将内存中的映射对象刷新到这个新文件,而不是附加到它。

下面的方法 loadatstartupandprocessnewchanges 在服务器启动期间被调用,该方法读取文件并将数据存储在内存中。它还启动一个 go 例程 detectnewfiles ,定期检查是否有任何新文件并将其存储在 deltachan 通道上,该通道稍后由另一个 go 例程 processnewfiles 访问以再次读取该新文件并将数据存储在同一个映射中。如果有任何错误,我们会将其存储在 err 通道上。 loadfiles 是读取内存中的文件并将其存储在 map 中的函数。

type customerconfig struct {
  deltachan   chan string
  err         chan error
  wg          sync.waitgroup
  data        *cmap.concurrentmap
}

// this is called during server startup.
func (r *customerconfig) loadatstartupandprocessnewchanges() error {
  path, err := r.getpath("...", "....")
  if err != nil {
    return err
  }

  r.wg.add(1)
  go r.detectnewfiles(path)
  err = r.loadfiles(4, path)
  if err != nil {
    return err
  }
  r.wg.add(1)
  go r.processnewfiles()
  return nil
}

此方法基本上确定是否有任何需要消耗的新文件,如果有,则会将其放入 deltachan 通道上,该通道稍后将由 processnewfiles go-routine 消耗并读取内存中的文件。如果有任何错误,则会将错误添加到错误通道。

func (r *customerconfig) detectnewfiles(rootpath string) {

}

这将读取所有 s3 文件并将其存储在内存中并返回错误。在这种方法中,我清除了地图的先前状态,以便它可以从新文件中获得新的状态。该方法在服务器启动期间调用,并且每当我们需要处理 processnewfiles go-routine 中的新文件时也会调用该方法。

func (r *customerconfig) loadfiles(workers int, path string) error {
  var err error
  ...
  var files []string
  files = .....

  // reset the map so that it can have fresh state from new files.
  r.data.clear()
  g, ctx := errgroup.withcontext(context.background())
  sem := make(chan struct{}, workers)
  for _, file := range files {
    select {
    case <-ctx.done():
      break
    case sem <- struct{}{}:
    }
    file := file
    g.go(func() error {
      defer func() { <-sem }()
      return r.read(spn, file, bucket)
    })
  }

  if err := g.wait(); err != nil {
    return err
  }
  return nil
}

此方法读取文件并添加到 data 并发映射中。

func (r *customerconfig) read(file string, bucket string) error {
  // read file and store it in "data" concurrent map 
  // and if there is any error then return the error
  var err error
  fr, err := pars3.news3filereader(context.background(), bucket, file, r.s3client.getsession().config)
  if err != nil {
    return errs.wrap(err)
  }
  defer xio.closeignoringerrors(fr)

  pr, err := reader.newparquetreader(fr, nil, 8)
  if err != nil {
    return errs.wrap(err)
  }

  if pr.getnumrows() == 0 {
    spn.infof("skipping %s due to 0 rows", file)
    return nil
  }

  for {
    rows, err := pr.readbynumber(r.cfg.rowstoread)
    if err != nil {
      return errs.wrap(err)
    }
    if len(rows) <= 0 {
      break
    }

    byteslice, err := json.marshal(rows)
    if err != nil {
      return errs.wrap(err)
    }
    var invmods []compmodel
    err = json.unmarshal(byteslice, &invmods)
    if err != nil {
      return errs.wrap(err)
    }

    for i := range invmods {
      key := strconv.formatint(invmods[i].productid, 10) + ":" + strconv.itoa(int(invmods[i].iaz))
      hasinventory := false
      if invmods[i].available > 0 {
        hasinventory = true
      }
      r.data.set(key, hasinventory)
    }
  }
  return nil
}

此方法将选择 delta 通道 上的内容,如果有任何新文件,则它将通过调用 loadfiles 方法开始读取该新文件。如果有任何错误,则会将错误添加到错误通道。

// processnewfiles - load new files found by detectnewfiles
func (r *customerconfig) processnewfiles() {
  // find new files on delta channel
  // and call "loadfiles" method to read it
  // if there is any error, then it will add it to the error channel.
}

如果 error 通道 上有任何错误,那么它将通过以下方法记录这些错误 -

func (r *customerConfig) handleError() {
  // read error from error channel if there is any
  // then log it
}

问题陈述

上述逻辑对我来说没有任何问题,但我的代码中有一个小错误,我无法弄清楚如何解决它。正如您所看到的,我有一个并发映射,我在 read 方法中填充该映射,并在 loadfiles 方法中清除整个映射。因为每当 delta 通道上有新文件时,我都不想在地图中保留以前的状态,所以这就是为什么我要从地图中删除所有内容,然后从新文件中添加新状态。

现在,如果 read 方法中有任何错误,那么错误就会发生,因为我已经清除了 data 地图中的所有数据,该地图将具有空地图,这不是我想要的。基本上,如果有任何错误,那么我想保留 data 地图中的先前状态。我如何在上述当前设计中解决这个问题。

注意:我使用的是golang 并发map


正确答案


collecteddata 添加了 rwmutex 通过工作协程进行并发写保护

type customerconfig struct {
   ...
   m sync.rwmutex
}

不要在 read 方法中更新 read 方法,而是让 read 方法返回数据和错误

func (r *customerconfig) read(file string, bucket string) ([]compmodel, error) {
  // read file data and return with error if any
  var err error
  fr, err := pars3.news3filereader(context.background(), bucket, file, r.s3client.getsession().config)
  if err != nil {
    return (nil, errs.wrap(err))
  }
  defer xio.closeignoringerrors(fr)

  pr, err := reader.newparquetreader(fr, nil, 8)
  if err != nil {
    return (nil, errs.wrap(err))
  }

  if pr.getnumrows() == 0 {
    spn.infof("skipping %s due to 0 rows", file)
    return (nil, errors.new("no data"))
  }

  var invmods = []compmodel{}
  for {
    rows, err := pr.readbynumber(r.cfg.rowstoread)
    if err != nil {
      return (nil, errs.wrap(err))
    }
    if len(rows) <= 0 {
      break
    }

    byteslice, err := json.marshal(rows)
    if err != nil {
      return (nil, errs.wrap(err))
    }
    var jsondata []compmodel
    err = json.unmarshal(byteslice, &jsondata)
    if err != nil {
      return (nil, errs.wrap(err))
    }
    invmods = append(invmods, jsondata...)
  }
  return invmods, nil
}

然后loadfiles就可以收集read返回的数据 方法,如果没有错误,则清除并更新地图,否则 保留旧数据原样

func (r *customerconfig) loadfiles(workers int, path string) error {
  var err error
  ...
  var files []string
  files = .....

  // reset the map so that it can have fresh state from new files.
  // r.data.clear() <- remove the clear from here
  g, ctx := errgroup.withcontext(context.background())
  sem := make(chan struct{}, workers)
  collecteddata := []compmodel{}

  for _, file := range files {
    select {
    case <-ctx.done():
      break
    case sem <- struct{}{}:
    }
    file := file
    g.go(func() error {
      defer func() { <-sem }()

      data, err:= r.read(spn, file, bucket)
      if err != nil {
        return err
      }

      r.m.lock()
      append(collecteddata, data...)
      r.m.unlock()
      return nil
    })
  }

  if err := g.wait(); err != nil {
    return err
  }

  r.data.clear()
  for i := range collecteddata {
    key := strconv.formatint(collecteddata[i].productid, 10) + ":" + strconv.itoa(int(collecteddata[i].iaz))
    hasinventory := false
    if collecteddata[i].available > 0 {
      hasinventory = true
    }
    r.data.set(key, hasinventory)
  }

  return nil
}

注意:由于代码不可运行,仅更新了参考方法,并且我没有包含用于更新您可能需要处理这种情况的切片的互斥锁。

仅用 3 个函数即可实现相同的功能 - 检测、读取、加载,检测将按时间间隔检查新文件,如果发现有则推送到增量通道,加载将从增量通道获取文件路径并调用读取方法获取数据和错误,然后检查是否没有错误,然后清除地图并使用新内容更新,否则记录错误,因此您将有 2 个 go 例程和 1 个由加载例程调用的函数

package main

import (
  "fmt"

  "time"
  "os"
  "os/signal"
  "math/rand"
)

func main() {
  fmt.println(">>>", center("started", 30), "<<<")

  c := &config{
    initialpath: "old path",
    detectinterval: 3000,
  }
  c.start()
  fmt.println(">>>", center("ended", 30), "<<<")
}

// https://stackoverflow.com/questions/41133006/how-to-fmt-printprint-this-on-the-center
func center(s string, w int) string {
    return fmt.sprintf("%[1]*s", -w, fmt.sprintf("%[1]*s", (w + len(s))/2, s))
}

type config struct {
  deltach chan string
  ticker *time.ticker
  stopsignal chan os.signal
  initialpath string
  detectinterval time.duration
}

func (c *config) start() {
  c.stopsignal = make(chan os.signal, 1)
  signal.notify(c.stopsignal, os.interrupt)

  c.ticker = time.newticker(c.detectinterval * time.millisecond)
  c.deltach = make(chan string, 1)
  go c.detect()
  go c.load()
  if c.initialpath != "" {
    c.deltach <- c.initialpath
  }
  <- c.stopsignal
  c.ticker.stop()
}

// detect new files
func (c *config) detect() {
  for {
    select {
      case <- c.stopsignal:
        return
      case <- c.ticker.c:
        fmt.println(">>>", center("detect", 30), "<<<")
        c.deltach <- fmt.sprintf("path %f", rand.float64() * 1.5)
    }
  }
}
// read files
func read(path string) (map[string]int, error) {
  data := make(map[string]int)
  data[path] = 0
  fmt.println(">>>", center("read", 30), "<<<")
  fmt.println(path)
  return data, nil
}
// load files
func (c *config) load() {
  for {
    select {
      case <- c.stopsignal:
        return
      case path := <- c.deltach:
        fmt.println(">>>", center("load", 30), "<<<")
        data, err := read(path)
        if err != nil {
          fmt.println("log error")
        } else {
          fmt.println("success", data)
        }
        fmt.println()
    }
  }
}

注意:示例代码中未包含地图,它可以轻松更新以包含地图

我认为您的设计过于复杂。它可以更简单地解决,从而提供您想要的所有好处:

  • 并发访问安全
  • 重新加载检测到的更改
  • 访问配置将为您提供最新的、成功加载的配置
  • 即使由于检测到的更改而加载新配置需要很长时间,最新的配置也始终可以立即访问
  • 如果加载新配置失败,则保留之前的“快照”并保持当前状态
  • 作为奖励,它更简单,甚至不使用第 3 方库

让我们看看如何实现这一目标:

有一个 customerconfig 结构来保存您想要缓存的所有内容(这是“快照”):

type customerconfig struct {
    data map[string]bool

    // add other props if you need:
    loadedat time.time
}

提供一个函数来加载您想要缓存的配置。注意:该函数是无状态的,它不会访问/操作包级变量:

func loadconfig() (*customerconfig, error) {
    cfg := &customerconfig{
        data:     map[string]bool{},
        loadedat: time.now(),
    }

    // logic to load files, and populate cfg.data
    // if an error occurs, return it

    // if loading succeeds, return the config
    return cfg, nil
}

现在让我们创建我们的“缓存管理器”。缓存管理器存储实际/当前配置(快照),并提供对其的访问。为了安全的并发访问(和更新),我们使用sync.RWMutex。还有停止管理器的方法(停止并发刷新):

type configcache struct {
    configmu sync.rwmutex
    config   *customerconfig
    closech  chan struct{}
}

创建缓存会加载初始配置。还启动一个 goroutine 负责定期检查更改。

func newconfigcache() (*configcache, error) {
    cfg, err := loadconfig()
    if err != nil {
        return nil, fmt.errorf("loading initial config failed: %w", err)
    }

    cc := &configcache{
        config:  cfg,
        closech: make(chan struct{}),
    }

    // launch goroutine to periodically check for changes, and load new configs
    go cc.refresher()

    return cc, nil
}

refresher() 定期检查更改,如果检测到更改,则调用 loadconfig() 加载要缓存的新数据,并将其存储为当前/实际配置(同时锁定 configmu)。它还监视 closech 以在收到请求时停止:

func (cc *configcache) refresher() {
    ticker := time.newticker(1 * time.minute) // every minute
    defer ticker.stop()

    for {
        select {
        case <-ticker.c:
            // check if there are changes
            changes := false // logic to detect changes
            if !changes {
                continue // no changes, continue
            }

            // changes! load new config:
            cfg, err := loadconfig()
            if err != nil {
                log.printf("failed to load config: %v", err)
                continue // keep the previous config
            }

            // apply / store new config
            cc.configmu.lock()
            cc.config = cfg
            cc.configmu.unlock()

        case <-cc.closech:
            return
        }
    }
}

关闭缓存管理器(刷新 goroutine)非常简单:

func (cc *configcache) stop() {
    close(cc.closech)
}

最后一个缺失的部分是如何访问当前配置。这是一个简单的 getconfig() 方法(也使用 configmu,但处于只读模式):

func (cc *configcache) getconfig() *customerconfig {
    cc.configmu.rlock()
    defer cc.configmu.runlock()
    return cc.config
}

这是您如何使用它:

cc, err := NewConfigCache()
if err != nil {
    // Decide what to do: retry, terminate etc.
}

// Where ever, whenever you need the actual (most recent) config in your app:

cfg := cc.GetConfig()
// Use cfg

在关闭应用程序(或者想要停止刷新)之前,您可以调用 cc.stop()

本篇关于《在系统启动时加载文件数据,处理新文件并更新映射状态》的介绍就到此结束啦,但是学无止境,想要了解学习更多关于Golang的相关知识,请关注golang学习网公众号!

版本声明
本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
优化PyCharm环境:优化Python开发效率优化PyCharm环境:优化Python开发效率
上一篇
优化PyCharm环境:优化Python开发效率
Linux管道命令的简介和基础用法
下一篇
Linux管道命令的简介和基础用法
查看更多
最新文章
查看更多
课程推荐
  • 前端进阶之JavaScript设计模式
    前端进阶之JavaScript设计模式
    设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
    542次学习
  • GO语言核心编程课程
    GO语言核心编程课程
    本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
    508次学习
  • 简单聊聊mysql8与网络通信
    简单聊聊mysql8与网络通信
    如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
    497次学习
  • JavaScript正则表达式基础与实战
    JavaScript正则表达式基础与实战
    在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
    487次学习
  • 从零制作响应式网站—Grid布局
    从零制作响应式网站—Grid布局
    本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
    484次学习
查看更多
AI推荐
  • Vozo AI:超真实AI视频换脸工具,提升创意内容制作
    Vozo AI
    探索Vozo AI,一款功能强大的在线AI视频换脸工具,支持跨性别、年龄和肤色换脸,适用于广告本地化、电影制作和创意内容创作,提升您的视频制作效率和效果。
    2次使用
  • AIGAZOU:免费AI图像生成工具,简洁高效,支持中文
    AIGAZOU-AI图像生成
    AIGAZOU是一款先进的免费AI图像生成工具,无需登录即可使用,支持中文提示词,生成高清图像。适用于设计、内容创作、商业和艺术领域,提供自动提示词、专家模式等多种功能。
    2次使用
  • Raphael AI:Flux.1 Dev支持的免费AI图像生成器
    Raphael AI
    探索Raphael AI,一款由Flux.1 Dev支持的免费AI图像生成器,无需登录即可无限生成高质量图像。支持多种风格,快速生成,保护隐私,适用于艺术创作、商业设计等多种场景。
    1次使用
  • 笔灵AI生成答辩PPT:高效制作学术与职场PPT的利器
    笔灵AI生成答辩PPT
    探索笔灵AI生成答辩PPT的强大功能,快速制作高质量答辩PPT。精准内容提取、多样模板匹配、数据可视化、配套自述稿生成,让您的学术和职场展示更加专业与高效。
    30次使用
  • 知网AIGC检测服务系统:精准识别学术文本中的AI生成内容
    知网AIGC检测服务系统
    知网AIGC检测服务系统,专注于检测学术文本中的疑似AI生成内容。依托知网海量高质量文献资源,结合先进的“知识增强AIGC检测技术”,系统能够从语言模式和语义逻辑两方面精准识别AI生成内容,适用于学术研究、教育和企业领域,确保文本的真实性和原创性。
    45次使用
微信登录更方便
  • 密码登录
  • 注册账号
登录即同意 用户协议隐私政策
返回登录
  • 重置密码