当前位置:首页 > 文章列表 > Golang > Go教程 > golang如何使用sarama访问kafka

golang如何使用sarama访问kafka

来源:脚本之家 2023-02-16 15:12:01 0浏览 收藏

亲爱的编程学习爱好者,如果你点开了这篇文章,说明你对《golang如何使用sarama访问kafka》很感兴趣。本篇文章就来给大家详细解析一下,主要介绍一下Kafka、sarama,希望所有认真读完的童鞋们,都有实质性的提高。

下面一个客户端代码例子访问kafka服务器,来发送和接受消息。

使用方式

1、命令行参数

$ ./kafkaclient -h
Usage of ./client:
 -ca string
  CA Certificate (default "ca.pem")
 -cert string
  Client Certificate (default "cert.pem")
 -command string
  consumer|producer (default "consumer")
 -host string
  Common separated kafka hosts (default "localhost:9093")
 -key string
  Client Key (default "key.pem")
 -partition int
  Kafka topic partition
 -tls
  TLS enable
 -topic string
  Kafka topic (default "test--topic")

2、作为producer启动

$ ./kafkaclient -command producer \
 -host kafka1:9092,kafka2:9092

## TLS-enabled
$ ./kafkaclient -command producer \
 -tls -cert client.pem -key client.key -ca ca.pem \
 -host kafka1:9093,kafka2:9093

producer发送消息给kafka:

> aaa
2018/12/15 07:11:21 Produced message: [aaa]
> bbb
2018/12/15 07:11:30 Produced message: [bbb]
> quit

3、作为consumer启动

$ ./kafkaclient -command consumer \
 -host kafka1:9092,kafka2:9092

## TLS-enabled
$ ./kafkaclient -command consumer \
 -tls -cert client.pem -key client.key -ca ca.pem \
 -host kafka1:9093,kafka2:9093

consumer从kafka接受消息:

2018/12/15 07:11:21 Consumed message: [aaa], offset: [4]
2018/12/15 07:11:30 Consumed message: [bbb], offset: [5]

完整源代码如下

这个代码使用到了Shopify/sarama库,请自行下载使用。

$ cat kafkaclient.go
package main

import (
 "flag"
 "fmt"
 "log"
 "os"
 "io/ioutil"
 "bufio"
 "strings"

 "crypto/tls"
 "crypto/x509"

 "github.com/Shopify/sarama"
)

var (
 command  string
 tlsEnable bool
 hosts  string
 topic  string
 partition int
 clientcert string
 clientkey string
 cacert  string
)

func main() {
 flag.StringVar(&command, "command",  "consumer",   "consumer|producer")
 flag.BoolVar(&tlsEnable, "tls",   false,    "TLS enable")
 flag.StringVar(&hosts,  "host",   "localhost:9093", "Common separated kafka hosts")
 flag.StringVar(&topic,  "topic",  "test--topic",  "Kafka topic")
 flag.IntVar(&partition,  "partition", 0,     "Kafka topic partition")
 flag.StringVar(&clientcert, "cert",   "cert.pem",   "Client Certificate")
 flag.StringVar(&clientkey, "key",   "key.pem",   "Client Key")
 flag.StringVar(&cacert,  "ca",   "ca.pem",   "CA Certificate")
 flag.Parse()

 config := sarama.NewConfig()
 if tlsEnable {
  //sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
  tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)
  if err != nil {
   log.Fatal(err)
  }

  config.Net.TLS.Enable = true
  config.Net.TLS.Config = tlsConfig
 }
 client, err := sarama.NewClient(strings.Split(hosts, ","), config)
 if err != nil {
  log.Fatalf("unable to create kafka client: %q", err)
 }

 if command == "consumer" {
  consumer, err := sarama.NewConsumerFromClient(client)
  if err != nil {
   log.Fatal(err)
  }
  defer consumer.Close()
  loopConsumer(consumer, topic, partition)
 } else {
  producer, err := sarama.NewAsyncProducerFromClient(client)
  if err != nil {
   log.Fatal(err)
  }
  defer producer.Close()
  loopProducer(producer, topic, partition)
 }
}

func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) {
 // load client cert
 clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile)
 if err != nil {
  return nil, err
 }

 // load ca cert pool
 cacert, err := ioutil.ReadFile(cacertfile)
 if err != nil {
  return nil, err
 }
 cacertpool := x509.NewCertPool()
 cacertpool.AppendCertsFromPEM(cacert)

 // generate tlcconfig
 tlsConfig := tls.Config{}
 tlsConfig.RootCAs = cacertpool
 tlsConfig.Certificates = []tls.Certificate{clientcert}
 tlsConfig.BuildNameToCertificate()
 // tlsConfig.InsecureSkipVerify = true // This can be used on test server if domain does not match cert:
 return &tlsConfig, err
}

func loopProducer(producer sarama.AsyncProducer, topic string, partition int) {
 scanner := bufio.NewScanner(os.Stdin)
 fmt.Print("> ")
 for scanner.Scan() {
  text := scanner.Text()
  if text == "" {
  } else if text == "exit" || text == "quit" {
   break
  } else {
   producer.Input()  ")
 }
}

func loopConsumer(consumer sarama.Consumer, topic string, partition int) {
 partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
 if err != nil {
  log.Println(err)
  return
 }
 defer partitionConsumer.Close()

 for {
  msg := 

<p>编译:</p>

<pre class="brush:plain;">
$ go build kafkaclient.go

到这里,我们也就讲完了《golang如何使用sarama访问kafka》的内容了。个人认为,基础知识的学习和巩固,是为了更好的将其运用到项目中,欢迎关注golang学习网公众号,带你了解更多关于golang的知识点!

版本声明
本文转载于:脚本之家 如有侵犯,请联系study_golang@163.com删除
Go语言下载网络图片或文件的方法示例Go语言下载网络图片或文件的方法示例
上一篇
Go语言下载网络图片或文件的方法示例
golang解析域名的步骤全纪录
下一篇
golang解析域名的步骤全纪录
查看更多
最新文章
查看更多
课程推荐
  • 前端进阶之JavaScript设计模式
    前端进阶之JavaScript设计模式
    设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
    542次学习
  • GO语言核心编程课程
    GO语言核心编程课程
    本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
    508次学习
  • 简单聊聊mysql8与网络通信
    简单聊聊mysql8与网络通信
    如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
    497次学习
  • JavaScript正则表达式基础与实战
    JavaScript正则表达式基础与实战
    在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
    487次学习
  • 从零制作响应式网站—Grid布局
    从零制作响应式网站—Grid布局
    本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
    484次学习
查看更多
AI推荐
  • 茅茅虫AIGC检测:精准识别AI生成内容,保障学术诚信
    茅茅虫AIGC检测
    茅茅虫AIGC检测,湖南茅茅虫科技有限公司倾力打造,运用NLP技术精准识别AI生成文本,提供论文、专著等学术文本的AIGC检测服务。支持多种格式,生成可视化报告,保障您的学术诚信和内容质量。
    18次使用
  • 赛林匹克平台:科技赛事聚合,赋能AI、算力、量子计算创新
    赛林匹克平台(Challympics)
    探索赛林匹克平台Challympics,一个聚焦人工智能、算力算法、量子计算等前沿技术的赛事聚合平台。连接产学研用,助力科技创新与产业升级。
    50次使用
  • SEO  笔格AIPPT:AI智能PPT制作,免费生成,高效演示
    笔格AIPPT
    SEO 笔格AIPPT是135编辑器推出的AI智能PPT制作平台,依托DeepSeek大模型,实现智能大纲生成、一键PPT生成、AI文字优化、图像生成等功能。免费试用,提升PPT制作效率,适用于商务演示、教育培训等多种场景。
    57次使用
  • 稿定PPT:在线AI演示设计,高效PPT制作工具
    稿定PPT
    告别PPT制作难题!稿定PPT提供海量模板、AI智能生成、在线协作,助您轻松制作专业演示文稿。职场办公、教育学习、企业服务全覆盖,降本增效,释放创意!
    53次使用
  • Suno苏诺中文版:AI音乐创作平台,人人都是音乐家
    Suno苏诺中文版
    探索Suno苏诺中文版,一款颠覆传统音乐创作的AI平台。无需专业技能,轻松创作个性化音乐。智能词曲生成、风格迁移、海量音效,释放您的音乐灵感!
    57次使用
微信登录更方便
  • 密码登录
  • 注册账号
登录即同意 用户协议隐私政策
返回登录
  • 重置密码