使用 Kafka-Go,为什么我会看到似乎正在批量读取/写入的内容?有我缺少的配置吗?
各位小伙伴们,大家好呀!看看今天我又给各位带来了什么文章?本文标题是《使用 Kafka-Go,为什么我会看到似乎正在批量读取/写入的内容?有我缺少的配置吗?》,很明显是关于Golang的文章哈哈哈,其中内容主要会涉及到等等,如果能帮到你,觉得很不错的话,欢迎各位多多点评和分享!
我将从 rabbitmq 切换到 kafka。这只是一个简单的秒杀,看看 kafka 是如何运作的。我不确定是否缺少某些设置,是否是我的代码,是否是 kafka-go,或者这是否是预期的 kafka 行为。
我尝试调整 batchsize 以及 batchtimeout 但都没有产生影响。
下面的代码创建一个具有 6 个分区且复制因子为 3 的主题。然后,它每隔 100ms 生成一条递增消息。它启动 6 个消费者,每个分区一个。读取和写入都是在 go 例程中执行的。
在下面的日志中,它在 7 秒内没有收到消息,然后收到突发消息。我正在使用 confluence 的平台,因此我认识到会有一些网络延迟,但没有达到我所看到的程度。
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"log"
"net"
"strconv"
"time"
kafka "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
)
func newdialer(clientid, username, password string) *kafka.dialer {
mechanism := plain.mechanism{
username: username,
password: password,
}
rootcas, _ := x509.systemcertpool()
if rootcas == nil {
rootcas = x509.newcertpool()
}
return &kafka.dialer{
timeout: 10 * time.second,
dualstack: true,
clientid: clientid,
saslmechanism: mechanism,
tls: &tls.config{
insecureskipverify: false,
rootcas: rootcas,
},
}
}
func createtopic(url string, topic string, dialer *kafka.dialer) {
conn, err := dialer.dial("tcp", url)
if err != nil {
panic(err.error())
}
defer conn.close()
controller, err := conn.controller()
if err != nil {
panic(err.error())
}
var controllerconn *kafka.conn
controllerconn, err = dialer.dial("tcp", net.joinhostport(controller.host, strconv.itoa(controller.port)))
if err != nil {
panic(err.error())
}
defer controllerconn.close()
topicconfigs := []kafka.topicconfig{
{
topic: topic,
numpartitions: 6,
replicationfactor: 3,
},
}
err = controllerconn.createtopics(topicconfigs...)
if err != nil {
panic(err.error())
}
}
func newwriter(url string, topic string, dialer *kafka.dialer) *kafka.writer {
return kafka.newwriter(kafka.writerconfig{
brokers: []string{url},
topic: topic,
balancer: &kafka.crc32balancer{},
dialer: dialer,
batchsize: 10,
batchtimeout: 1 * time.millisecond,
})
}
func newreader(url string, topic string, partition int, dialer *kafka.dialer) *kafka.reader {
return kafka.newreader(kafka.readerconfig{
brokers: []string{url},
topic: topic,
dialer: dialer,
partition: partition,
})
}
func read(url string, topic string, dialer *kafka.dialer, partition int) {
reader := newreader(url, topic, partition, dialer)
defer reader.close()
for {
msg, err := reader.readmessage(context.background())
if err != nil {
panic(err)
}
log.printf("rec%d:\t%s\n", partition, msg.value)
}
}
func write(url string, topic string, dialer *kafka.dialer) {
writer := newwriter(url, topic, dialer)
defer writer.close()
for i := 0; ; i++ {
v := []byte("v" + strconv.itoa(i))
log.printf("send:\t%s\n", v)
msg := kafka.message{ key: v, value: v }
err := writer.writemessages(context.background(), msg)
if err != nil {
fmt.println(err)
}
time.sleep(100 * time.millisecond)
}
}
func main() {
url := "_______.______.___.confluent.cloud:9092"
topic := "test"
username := "________________"
password := "________________"
clientid := "________________"
dialer := newdialer(clientid, username, password)
ctx := context.background()
createtopic(url, topic, dialer)
for i := 0; i < 6; i++ {
go read(url, topic, dialer, i)
}
go write(url, topic, dialer)
<-ctx.done()
}
正在记录以下内容。
2020/11/02 23:19:22 send: v0 2020/11/02 23:19:23 send: v1 2020/11/02 23:19:23 send: v2 2020/11/02 23:19:23 send: v3 2020/11/02 23:19:24 send: v4 2020/11/02 23:19:24 send: v5 2020/11/02 23:19:24 send: v6 2020/11/02 23:19:25 send: v7 2020/11/02 23:19:25 send: v8 2020/11/02 23:19:25 send: v9 2020/11/02 23:19:25 send: v10 2020/11/02 23:19:26 send: v11 2020/11/02 23:19:26 send: v12 2020/11/02 23:19:26 send: v13 2020/11/02 23:19:26 send: v14 2020/11/02 23:19:26 send: v15 2020/11/02 23:19:27 send: v16 2020/11/02 23:19:27 send: v17 2020/11/02 23:19:27 send: v18 2020/11/02 23:19:27 send: v19 2020/11/02 23:19:28 send: v20 2020/11/02 23:19:29 send: v21 2020/11/02 23:19:29 send: v22 2020/11/02 23:19:29 send: v23 2020/11/02 23:19:29 send: v24 2020/11/02 23:19:29 send: v25 2020/11/02 23:19:30 send: v26 2020/11/02 23:19:30 send: v27 2020/11/02 23:19:30 send: v28 2020/11/02 23:19:30 send: v29 2020/11/02 23:19:31 send: v30 2020/11/02 23:19:31 send: v31 2020/11/02 23:19:31 send: v32 2020/11/02 23:19:32 send: v33 2020/11/02 23:19:32 send: v34 2020/11/02 23:19:32 rec3: v8 2020/11/02 23:19:32 rec3: v14 2020/11/02 23:19:32 rec3: v15 2020/11/02 23:19:32 rec3: v16 2020/11/02 23:19:32 rec3: v17 2020/11/02 23:19:32 rec3: v20 2020/11/02 23:19:32 rec3: v21 2020/11/02 23:19:32 rec3: v23 2020/11/02 23:19:32 rec3: v29 2020/11/02 23:19:32 rec1: v0 2020/11/02 23:19:32 rec1: v9 2020/11/02 23:19:32 rec1: v22 2020/11/02 23:19:32 rec1: v28 2020/11/02 23:19:32 rec4: v4 2020/11/02 23:19:32 rec4: v5 2020/11/02 23:19:32 rec4: v7 2020/11/02 23:19:32 rec4: v10 2020/11/02 23:19:32 rec4: v11 2020/11/02 23:19:32 rec4: v12 2020/11/02 23:19:32 rec4: v18 2020/11/02 23:19:32 rec4: v24 2020/11/02 23:19:32 rec4: v25 2020/11/02 23:19:32 rec4: v30 2020/11/02 23:19:32 rec4: v31 2020/11/02 23:19:32 send: v35 2020/11/02 23:19:32 rec5: v1 2020/11/02 23:19:32 rec5: v2 2020/11/02 23:19:32 rec5: v3 2020/11/02 23:19:32 rec5: v34 2020/11/02 23:19:32 rec2: v6 2020/11/02 23:19:32 rec2: v13 2020/11/02 23:19:32 rec2: v26 2020/11/02 23:19:32 rec2: v33 2020/11/02 23:19:32 send: v36 2020/11/02 23:19:33 send: v37 2020/11/02 23:19:33 send: v38 2020/11/02 23:19:33 send: v39 2020/11/02 23:19:33 send: v40 2020/11/02 23:19:33 send: v41 2020/11/02 23:19:33 rec0: v19 2020/11/02 23:19:33 rec0: v27 2020/11/02 23:19:33 rec0: v32 2020/11/02 23:19:34 send: v42 2020/11/02 23:19:34 send: v43 2020/11/02 23:19:34 send: v44 2020/11/02 23:19:34 send: v45 2020/11/02 23:19:34 send: v46 2020/11/02 23:19:35 send: v47 2020/11/02 23:19:35 send: v48 2020/11/02 23:19:35 send: v49 2020/11/02 23:19:35 send: v50 2020/11/02 23:19:35 send: v51 2020/11/02 23:19:35 send: v52 2020/11/02 23:19:36 send: v53 2020/11/02 23:19:36 send: v54 2020/11/02 23:19:36 send: v55 2020/11/02 23:19:36 send: v56 2020/11/02 23:19:36 send: v57 2020/11/02 23:19:37 send: v58 2020/11/02 23:19:37 send: v59 2020/11/02 23:19:37 send: v60 2020/11/02 23:19:38 send: v61 2020/11/02 23:19:38 send: v62 2020/11/02 23:19:38 send: v63 2020/11/02 23:19:38 send: v64 2020/11/02 23:19:38 send: v65 2020/11/02 23:19:39 send: v66 2020/11/02 23:19:39 send: v67 2020/11/02 23:19:39 send: v68 2020/11/02 23:19:40 send: v69 2020/11/02 23:19:40 send: v70 2020/11/02 23:19:40 send: v71 2020/11/02 23:19:40 send: v72 2020/11/02 23:19:40 send: v73 2020/11/02 23:19:40 send: v74 2020/11/02 23:19:41 send: v75 2020/11/02 23:19:41 send: v76 2020/11/02 23:19:41 rec1: v41 2020/11/02 23:19:41 rec1: v56 2020/11/02 23:19:41 rec1: v68 2020/11/02 23:19:41 rec1: v74 2020/11/02 23:19:41 rec1: v75 2020/11/02 23:19:41 rec1: v76 2020/11/02 23:19:41 rec3: v37 2020/11/02 23:19:41 rec3: v40 2020/11/02 23:19:41 rec3: v42 2020/11/02 23:19:41 rec3: v48 2020/11/02 23:19:41 rec3: v55 2020/11/02 23:19:41 rec3: v57 2020/11/02 23:19:41 rec3: v60 2020/11/02 23:19:41 rec3: v61 2020/11/02 23:19:41 rec3: v62 2020/11/02 23:19:41 send: v77 2020/11/02 23:19:41 rec4: v38 2020/11/02 23:19:41 rec4: v39 2020/11/02 23:19:41 rec4: v45 2020/11/02 23:19:41 rec4: v46 2020/11/02 23:19:41 rec4: v47 2020/11/02 23:19:41 rec4: v53 2020/11/02 23:19:41 rec4: v59 2020/11/02 23:19:41 rec4: v70 2020/11/02 23:19:41 rec4: v71 2020/11/02 23:19:41 rec4: v73 2020/11/02 23:19:41 rec5: v35 2020/11/02 23:19:41 rec5: v36 2020/11/02 23:19:41 rec5: v43 2020/11/02 23:19:41 rec5: v49 2020/11/02 23:19:41 rec5: v54 2020/11/02 23:19:41 rec5: v63 2020/11/02 23:19:41 rec5: v69 2020/11/02 23:19:41 rec5: v77 2020/11/02 23:19:41 send: v78 2020/11/02 23:19:41 rec2: v44 2020/11/02 23:19:41 rec2: v50 2020/11/02 23:19:41 rec2: v51 2020/11/02 23:19:41 rec2: v64 2020/11/02 23:19:41 rec2: v65 2020/11/02 23:19:41 rec2: v66 2020/11/02 23:19:41 rec2: v72 2020/11/02 23:19:41 send: v79 2020/11/02 23:19:42 send: v80 2020/11/02 23:19:42 send: v81 2020/11/02 23:19:42 send: v82 2020/11/02 23:19:42 send: v83 2020/11/02 23:19:42 send: v84 2020/11/02 23:19:43 send: v85 2020/11/02 23:19:43 rec0: v52 2020/11/02 23:19:43 rec0: v58 2020/11/02 23:19:43 rec0: v67 2020/11/02 23:19:43 send: v86
如有任何建议,我们将不胜感激。谢谢!
编辑:
缓冲肯定是在 kafka-go 中发生的。 sarama 没有遇到相同的行为:
package main
import (
"context"
"fmt"
"github.com/shopify/sarama"
"crypto/tls"
"crypto/x509"
"log"
"strings"
"time"
)
var (
broker = "___-_____.us-east1.gcp.confluent.cloud:9092"
brokers = []string{broker}
clientid = "___________"
username = "___________"
password = "___________"
topic = "sarama"
)
func main() {
log.printf("kafka brokers: %s", strings.join(brokers, ", "))
ctx := context.background()
sync := newsyncproducer()
// accesslog := newasyncproducer()
createtopic(topic)
go func() {
for i := 0; ; i++ {
v := sarama.stringencoder(fmt.sprintf("v%d", i))
p, o, err := sync.sendmessage(&sarama.producermessage{
topic: topic,
value: v,
})
if err != nil {
panic(err)
}
fmt.printf("sent\t\t%v\tp: %d\toffset: %d\t\n", v, p, o)
time.sleep(100 * time.millisecond)
}
}()
ps := []sarama.partitionconsumer{}
offset := int64(0)
loop := func(msgs <-chan *sarama.consumermessage) {
for msg := range msgs {
fmt.printf("recv:\t\t%s\tp: %d\toffset: %d\n", msg.value, msg.partition, msg.offset)
}
}
for i := 0; i < 6; i++ {
ps = append(ps, createpartitionconsumer(topic, int32(i), offset))
}
for _, p := range ps {
go loop(p.messages())
}
<-ctx.done()
}
func createpartitionconsumer(topic string, partition int32, offset int64) sarama.partitionconsumer {
config := baseconfig()
c, err := sarama.newconsumer(brokers, config)
if err != nil {
panic(err)
}
p, err := c.consumepartition(topic, partition, offset)
if err != nil {
panic(err)
}
return p
}
func createtopic(topic string) {
config := baseconfig()
admin, err := sarama.newclusteradmin(brokers, config)
if err != nil {
log.fatal("error while creating cluster admin: ", err.error())
}
defer func() { _ = admin.close() }()
err = admin.createtopic(topic, &sarama.topicdetail{
numpartitions: 6,
replicationfactor: 3,
}, false)
if err != nil {
log.println("error while creating topic: ", err.error())
}
}
func baseconfig() *sarama.config {
rootcas, _ := x509.systemcertpool()
if rootcas == nil {
rootcas = x509.newcertpool()
}
config := sarama.newconfig()
config.version = sarama.maxversion
config.net.tls.enable = true
config.net.tls.config = &tls.config{
rootcas: rootcas,
insecureskipverify: false,
}
config.clientid = clientid
config.net.sasl.enable = true
config.net.sasl.password = password
config.net.sasl.user = username
return config
}
func newsyncproducer() sarama.syncproducer {
config := baseconfig()
config.producer.requiredacks = sarama.waitforall
config.producer.retry.max = 10
config.producer.return.successes = true
producer, err := sarama.newsyncproducer(brokers, config)
if err != nil {
log.fatalln("failed to start sarama producer:", err)
}
return producer
}
事实上,在某些情况下,它实际上在发送被确认之前就收到了,这让我想知道是否发生了内部消息传递,如果是这样,我是否应该关心......
recv: V1176 p: 1 offset: 355 recv: V1177 p: 2 offset: 363 send: V1177 p: 2 offset: 363 send: V1178 p: 5 offset: 377 recv: V1178 p: 5 offset: 377 recv: V1179 p: 1 offset: 356 send: V1179 p: 1 offset: 356 send: V1180 p: 1 offset: 357 recv: V1180 p: 1 offset: 357 recv: V1181 p: 1 offset: 358 send: V1181 p: 1 offset: 358 send: V1182 p: 4 offset: 393 recv: V1182 p: 4 offset: 393 send: V1183 p: 4 offset: 394 recv: V1183 p: 4 offset: 394 send: V1184 p: 3 offset: 358 recv: V1184 p: 3 offset: 358 send: V1185 p: 2 offset: 364 recv: V1185 p: 2 offset: 364 send: V1186 p: 3 offset: 359 recv: V1186 p: 3 offset: 359 recv: V1187 p: 3 offset: 360 send: V1187 p: 3 offset: 360 send: V1188 p: 5 offset: 378 recv: V1188 p: 5 offset: 378 send: V1189 p: 2 offset: 365 recv: V1189 p: 2 offset: 365 recv: V1190 p: 4 offset: 395 send: V1190 p: 4 offset: 395 send: V1191 p: 1 offset: 359 recv: V1191 p: 1 offset: 359 send: V1192 p: 4 offset: 396 recv: V1192 p: 4 offset: 396 send: V1193 p: 0 offset: 431 recv: V1193 p: 0 offset: 431 send: V1194 p: 4 offset: 397 recv: V1194 p: 4 offset: 397 recv: V1195 p: 2 offset: 366 send: V1195 p: 2 offset: 366 send: V1196 p: 3 offset: 361 recv: V1196 p: 3 offset: 361
解决方案
您需要更改 readerconfig.minbytes,否则 segmentio/kafka-go 会将其设置为 1e6 = 1 mb,在这种情况下,kafka 将等待那么多数据积累后再响应请求。
func newReader(url string, topic string, partition int, dialer *kafka.Dialer) *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{url},
Topic: topic,
Dialer: dialer,
Partition: partition,
MinBytes: 1, // same value of Shopify/sarama
MaxBytes: 57671680,
})
}
另一方面,shopify/sarama 的默认值是 1 个字节。
参考文献:
到这里,我们也就讲完了《使用 Kafka-Go,为什么我会看到似乎正在批量读取/写入的内容?有我缺少的配置吗?》的内容了。个人认为,基础知识的学习和巩固,是为了更好的将其运用到项目中,欢迎关注golang学习网公众号,带你了解更多关于的知识点!
win8关闭uac用户账号控制操作方法
- 上一篇
- win8关闭uac用户账号控制操作方法
- 下一篇
- 如何在 Go 中选择 os.Stdin 和 http?
-
- Golang · Go问答 | 1年前 |
- 在读取缓冲通道中的内容之前退出
- 139浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 戈兰岛的全球 GOPRIVATE 设置
- 204浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何将结构作为参数传递给 xml-rpc
- 325浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何用golang获得小数点以下两位长度?
- 478浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何通过 client-go 和 golang 检索 Kubernetes 指标
- 486浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 将多个“参数”映射到单个可变参数的习惯用法
- 439浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 将 HTTP 响应正文写入文件后出现 EOF 错误
- 357浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 结构中映射的匿名列表的“复合文字中缺少类型”
- 352浏览 收藏
-
- Golang · Go问答 | 1年前 |
- NATS Jetstream 的性能
- 101浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何将复杂的字符串输入转换为mapstring?
- 440浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 相当于GoLang中Java将Object作为方法参数传递
- 212浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何确保所有 goroutine 在没有 time.Sleep 的情况下终止?
- 143浏览 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 516次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 500次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 485次学习
-
- ChatExcel酷表
- ChatExcel酷表是由北京大学团队打造的Excel聊天机器人,用自然语言操控表格,简化数据处理,告别繁琐操作,提升工作效率!适用于学生、上班族及政府人员。
- 3186次使用
-
- Any绘本
- 探索Any绘本(anypicturebook.com/zh),一款开源免费的AI绘本创作工具,基于Google Gemini与Flux AI模型,让您轻松创作个性化绘本。适用于家庭、教育、创作等多种场景,零门槛,高自由度,技术透明,本地可控。
- 3398次使用
-
- 可赞AI
- 可赞AI,AI驱动的办公可视化智能工具,助您轻松实现文本与可视化元素高效转化。无论是智能文档生成、多格式文本解析,还是一键生成专业图表、脑图、知识卡片,可赞AI都能让信息处理更清晰高效。覆盖数据汇报、会议纪要、内容营销等全场景,大幅提升办公效率,降低专业门槛,是您提升工作效率的得力助手。
- 3429次使用
-
- 星月写作
- 星月写作是国内首款聚焦中文网络小说创作的AI辅助工具,解决网文作者从构思到变现的全流程痛点。AI扫榜、专属模板、全链路适配,助力新人快速上手,资深作者效率倍增。
- 4535次使用
-
- MagicLight
- MagicLight.ai是全球首款叙事驱动型AI动画视频创作平台,专注于解决从故事想法到完整动画的全流程痛点。它通过自研AI模型,保障角色、风格、场景高度一致性,让零动画经验者也能高效产出专业级叙事内容。广泛适用于独立创作者、动画工作室、教育机构及企业营销,助您轻松实现创意落地与商业化。
- 3807次使用
-
- GoLand调式动态执行代码
- 2023-01-13 502浏览
-
- 用Nginx反向代理部署go写的网站。
- 2023-01-17 502浏览
-
- Golang取得代码运行时间的问题
- 2023-02-24 501浏览
-
- 请问 go 代码如何实现在代码改动后不需要Ctrl+c,然后重新 go run *.go 文件?
- 2023-01-08 501浏览
-
- 如何从同一个 io.Reader 读取多次
- 2023-04-11 501浏览

