当前位置:首页 > 文章列表 > Golang > Go问答 > 竞争条件下的 Uber-go/zap 和 kafka-go

竞争条件下的 Uber-go/zap 和 kafka-go

来源:stackoverflow 2024-02-18 11:18:22 0浏览 收藏

IT行业相对于一般传统行业,发展更新速度更快,一旦停止了学习,很快就会被行业所淘汰。所以我们需要踏踏实实的不断学习,精进自己的技术,尤其是初学者。今天golang学习网给大家整理了《竞争条件下的 Uber-go/zap 和 kafka-go》,聊聊,我们一起来看看吧!

问题内容

我正在创建一个自定义记录器,我们可以在其中登录到 std out 和 std err,而且还添加了登录到 kafka 的可能性(代码示例位于:https://github.com/roppa/kafka-go )。我们有多个主题,因此需要多个记录器,但是当我们使用多个记录器时,就会发生一些奇怪的事情。当两个 kafka-go 设置都是异步时,我不会收到任何消费者消息,当一个是异步而另一个是同步时,我们会得到如下内容:

//consumer topica
{"level":"\u001b[34minfo\u001b[0m","timestamp":"2020-12-09t15:31:04.023z","msg":"topic-a log 1","uid":"abc123","ns":"test-service"}

{"level":"\u001b[34minfo\u001b[0m","timestamp":"2020-12-09t15:31:05.078z","msg":"topic-a log 2","uid":"abc123","ns":"test-service"}

{"level":"\u001b[34minfo\u001b[0m","timestamp":"2020-12-09t15:31:06.085z","msg":"topic-a log 3","uid":"abc123","ns":"test-service"}

//consumer topicb
2020-12-09t15:31:06.085z    info    topic-a log 3   {"uid": "abc123", "ns": "test-service"}
2","uid":"abc123","ns":"test-service"}

更改同步会产生完全不同的效果。我对 go 还很陌生。

这是 main.go:

package main

import (
    "context"
    "kafka-log/logger"
)

func main() {
    loggera := logger.init("test-service", "localhost:9092", "topica", false, false)
    loggerb := logger.init("test-service", "localhost:9092", "topicb", false, true)

    ctx := context.background()
    ctx2 := context.withvalue(ctx, logger.uid, "abc123")

    loggera.cinfo(ctx2, "topic-a log 1")
    loggerb.cinfo(ctx2, "topic-b log 1")

    loggera.cinfo(ctx2, "topic-a log 2")
    loggerb.cinfo(ctx2, "topic-b log 2")

    loggera.cinfo(ctx2, "topic-a log 3")
    loggerb.cinfo(ctx2, "topic-b log 3")
}

这是 logger/logger.go:

package logger

import (
    "context"
    "os"

    "github.com/segmentio/kafka-go"
    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
)

type (
    key string

    // logger type embeds zap and also contains the current system name (namespace, ns)
    logger struct {
        *zap.logger
        ns string
    }

    // kconfig type for creating a new kafka logger. takes a namespace,
    // broker (eg 'localhost:9092'), topic (eg 'topic-a')
    kconfig struct {
        namespace string
        broker    string
        topic     string
        async     bool
    }

    producerinterface interface {
        writemessages(ctx context.context, msgs ...kafka.message) error
    }

    // kafkaproducer contains a kafka.producer and kafka topic
    kafkaproducer struct {
        producer producerinterface
        topic    string
    }
)

const (
    // uid - uniquely request identifier
    uid key = "request_id"
)

var customconfig = zapcore.encoderconfig{
    timekey:        "timestamp",
    levelkey:       "level",
    namekey:        "logger",
    callerkey:      "caller",
    functionkey:    zapcore.omitkey,
    messagekey:     "msg",
    stacktracekey:  "stacktrace",
    lineending:     zapcore.defaultlineending,
    encodelevel:    zapcore.capitalcolorlevelencoder,
    encodetime:     zapcore.iso8601timeencoder,
    encodeduration: zapcore.secondsdurationencoder,
}

// cinfo this function takes a context as first parameter, extracts specific fields as well as namespace, and calls zap info
func (l *logger) cinfo(ctx context.context, msg string, fields ...zap.field) {
    l.info(msg, consolidate(ctx, l.ns, fields...)...)
}

func consolidate(ctx context.context, namespace string, fields ...zap.field) []zap.field {
    return append(append(ctxtozapfields(ctx), fields...), zap.string("ns", namespace))
}

// see advanced config example: https://github.com/uber-go/zap/blob/master/example_test.go#l105
var lowpriority = zap.levelenablerfunc(func(lvl zapcore.level) bool {
    return lvl < zapcore.errorlevel && lvl > zapcore.debuglevel
})
var debugpriority = zap.levelenablerfunc(func(lvl zapcore.level) bool {
    return lvl < zapcore.errorlevel
})
var kafkapriority = zap.levelenablerfunc(func(lvl zapcore.level) bool {
    return lvl > zapcore.debuglevel
})

// init creates a new instance of a logger. namespace is the name of the module using the logger. broker and topic are kafa specific,
// if either of these is not set a default console logger is created.
func init(namespace, broker, topic string, debug, async bool) *logger {
    var kp *kafkaproducer = nil
    if broker != "" && topic != "" {
        kp = newkafkaproducer(&kconfig{
        broker: broker,
        topic:  topic,
        async:  async,
    })
    }
    logger := getlogger(debug, kp)
    // logger.info("initiated logger", zap.string("ns", namespace), zap.bool("kafka", kp != nil), zap.bool("debug", debug))
    return &logger{logger, namespace}
}

func getlogger(debug bool, kp *kafkaproducer) *zap.logger {
    // cores are logger interfaces
    var cores []zapcore.core

    // optimise message for console output (human readable)
    consoleencoder := zapcore.newconsoleencoder(customconfig)
    // lock wraps a writesyncer in a mutex to make it safe for concurrent use.
    // see https://godoc.org/go.uber.org/zap/zapcore
    cores = append(cores,
        zapcore.newcore(consoleencoder, zapcore.lock(os.stdout), getpriority(debug)),
        zapcore.newcore(consoleencoder, zapcore.lock(os.stderr), zap.errorlevel),
    )

    if kp != nil {
        cores = append(cores, zapcore.newcore(zapcore.newjsonencoder(customconfig), zapcore.lock(zapcore.addsync(kp)), kafkapriority))
    }

    // join inputs, encoders, level-handling functions into cores, then "tee" together
    logger := zap.new(zapcore.newtee(cores...))
    defer logger.sync()
    return logger
}

func getpriority(debug bool) zap.levelenablerfunc {
    if debug {
        return debugpriority
    }
    return lowpriority
}

func ctxtozapfields(ctx context.context) []zap.field {
    reqid, _ := ctx.value(uid).(string)
    return []zap.field{
        zap.string("uid", reqid),
    }
}

// newkafkaproducer instantiates a kafka.producer, saves topic, and returns a kafkaproducer
func newkafkaproducer(c *kconfig) *kafkaproducer {
    return &kafkaproducer{
        producer: kafka.newwriter(kafka.writerconfig{
            brokers:      []string{c.broker},
            topic:        c.topic,
            balancer:     &kafka.hash{},
            async:        c.async,
            requiredacks: -1, // -1 = all
        }),
        topic: c.topic,
    }
}

// write takes a message as a byte slice, wraps in a kafka.message and calls kafka produce
func (kp *kafkaproducer) write(msg []byte) (int, error) {
    return len(msg), kp.producer.writemessages(context.background(), kafka.message{
        key:   []byte(""),
        value: msg,
    })
}

我正在为消费者使用这些:

docker exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic topica

docker exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic topicb

这是我的 kafka docker-compose:

version: '3.8'

services:
  
  zookeeper:
    image: confluentinc/cp-zookeeper
    networks:
      - kafka-net
    container_name: zookeeper
    environment:
        ZOOKEEPER_CLIENT_PORT: 2181
    ports:
        - 2181:2181

  kafka:
    image: confluentinc/cp-kafka
    networks:
      - kafka-net
    container_name: kafka
    environment:
        KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
        KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        ALLOW_PLAINTEXT_LISTENER: "yes"
        KAFKA_LISTENERS-INTERNAL: //kafka:29092,EXTERNAL://localhost:9092
        KAFKA_ADVERTISED: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
        KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
        KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
        KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    ports:
        - 9092:9092
        - 29092:29092
    depends_on:
        - zookeeper
    restart: on-failure

networks:
  kafka-net:
    driver: bridge


解决方案


我想象你的程序在异步消息有时间发送之前就退出了(尽管如果我正确地阅读你的示例,我很奇怪“topic-a log 3”是唯一的日志消息)。与 javascript 不同,go 不会等待所有线程/goroutines 终止才退出。

还将突出显示 kafka-go 异步配置的文档字符串:

// Setting this flag to true causes the WriteMessages method to never block.
// It also means that errors are ignored since the caller will not receive
// the returned value. Use this only if you don't care about guarantees of
// whether the messages were written to kafka.

在解决方案方面:我认为您可以通过在编写器上调用 clos​​e 来解决此问题:

https://pkg.go.dev/github.com/segmentio/kafka-go#Writer.Close

您需要在退出之前显示底层 kafkaproducer.producer 并调用 kafkaproducer.producer.close

可能有更聪明的方法来构造清理,但我似乎找不到比仅在编写器上调用 close 更简单的方法来刷新待处理消息。

今天带大家了解了的相关知识,希望对你有所帮助;关于Golang的技术知识我们会一点点深入介绍,欢迎大家关注golang学习网公众号,一起学习编程~

版本声明
本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
解决win10应用程序无法正常启动0xc0000142问题解决win10应用程序无法正常启动0xc0000142问题
上一篇
解决win10应用程序无法正常启动0xc0000142问题
揭秘Java变量类型:深入探讨各种变量类型的特性
下一篇
揭秘Java变量类型:深入探讨各种变量类型的特性
查看更多
最新文章
查看更多
课程推荐
  • 前端进阶之JavaScript设计模式
    前端进阶之JavaScript设计模式
    设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
    542次学习
  • GO语言核心编程课程
    GO语言核心编程课程
    本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
    508次学习
  • 简单聊聊mysql8与网络通信
    简单聊聊mysql8与网络通信
    如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
    497次学习
  • JavaScript正则表达式基础与实战
    JavaScript正则表达式基础与实战
    在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
    487次学习
  • 从零制作响应式网站—Grid布局
    从零制作响应式网站—Grid布局
    本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
    484次学习
查看更多
AI推荐
  • SEO标题魔匠AI:高质量学术写作平台,毕业论文生成与优化专家
    魔匠AI
    SEO摘要魔匠AI专注于高质量AI学术写作,已稳定运行6年。提供无限改稿、选题优化、大纲生成、多语言支持、真实参考文献、数据图表生成、查重降重等全流程服务,确保论文质量与隐私安全。适用于专科、本科、硕士学生及研究者,满足多语言学术需求。
    11次使用
  • PPTFake答辩PPT生成器:一键生成高效专业的答辩PPT
    PPTFake答辩PPT生成器
    PPTFake答辩PPT生成器,专为答辩准备设计,极致高效生成PPT与自述稿。智能解析内容,提供多样模板,数据可视化,贴心配套服务,灵活自主编辑,降低制作门槛,适用于各类答辩场景。
    26次使用
  • SEO标题Lovart AI:全球首个设计领域AI智能体,实现全链路设计自动化
    Lovart
    SEO摘要探索Lovart AI,这款专注于设计领域的AI智能体,通过多模态模型集成和智能任务拆解,实现全链路设计自动化。无论是品牌全案设计、广告与视频制作,还是文创内容创作,Lovart AI都能满足您的需求,提升设计效率,降低成本。
    25次使用
  • 美图AI抠图:行业领先的智能图像处理技术,3秒出图,精准无误
    美图AI抠图
    美图AI抠图,依托CVPR 2024竞赛亚军技术,提供顶尖的图像处理解决方案。适用于证件照、商品、毛发等多场景,支持批量处理,3秒出图,零PS基础也能轻松操作,满足个人与商业需求。
    35次使用
  • SEO标题PetGPT:智能桌面宠物程序,结合AI对话的个性化陪伴工具
    PetGPT
    SEO摘要PetGPT 是一款基于 Python 和 PyQt 开发的智能桌面宠物程序,集成了 OpenAI 的 GPT 模型,提供上下文感知对话和主动聊天功能。用户可高度自定义宠物的外观和行为,支持插件热更新和二次开发。适用于需要陪伴和效率辅助的办公族、学生及 AI 技术爱好者。
    36次使用
微信登录更方便
  • 密码登录
  • 注册账号
登录即同意 用户协议隐私政策
返回登录
  • 重置密码