go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

2024-05-13 1914阅读

环境的搭建

Kafka以及相关组件的下载

我们要实现今天的内容,不可避免的要进行对开发环境的配置,Kafka环境的配置比较繁琐,需要配置JDK,Scala,ZoopKeeper和Kafka,这里我们不做赘述,如果大家不知道如何配置环境,这里我们个大家找了一篇博文供大家参考:

Windows下安装Kafka(图文记录详细步骤)

sarama包的安装

今天我们所时机的内容需要用到go语言的第三方包sarama,由于1.19版本后添加了ztcd压缩算法,需要用到cgo,这里我们为了方便考虑选择下载sarama v1.19.0,所以这里我们不能直接使用go get'命令来安装第三方包,我们要使用/go mod文件来实现,下面是主要步骤:

  • 在项目中创建文件夹(博主的是Kafkademo)
  • 打开终端,输入go mod init,进行go.mod文件的初始化:

    go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

    go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

  • 我们在.mod文件内指定第三方包及其版本:
    module Kafkademo
    require (
    	github.com/Shopify/sarama v1.19
    )
    go 1.21.6
    

    其实这是已经可以使用命令go mod tidy了,但是博主在做的时候发现,这样会直接清除掉.mod文件里面的内容,所以建议先创建一个producer文件,在文件里面写:

    package main
    import (
    	"fmt"
    	"github.com/Shopify/sarama"
    )
    func main() {
    	config := sarama.NewConfig()
    	config.Producer.RequiredAcks = sarama.WaitForAll                                
    }
    

    这时候再打开终端输入go mod tidy

    go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

    等待命令运行完毕,打开.mod文件,看到如下内容就OK了:

    go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

    利用sarama向Kafka发送消息(消息的生产)

    代码

    package main
    import (
    	"fmt"
    	"github.com/Shopify/sarama"
    )
    func main() {
    	config := sarama.NewConfig()                              //创建config实例
    	config.Producer.RequiredAcks = sarama.WaitForAll          //发送完数据需要leader和follow都确认
    	config.Producer.Partitioner = sarama.NewRandomPartitioner //创建随机分区
    	config.Producer.Return.Successes = true                   //成功交付的消息将在success channel返回
    	//创建信息
    	msg := &sarama.ProducerMessage{}
    	msg.Topic = "web.log"
    	msg.Value = sarama.StringEncoder("this is a test log")
    	//连接KafKa
    	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
    	if err != nil {
    		fmt.Println("producer closed, err:", err)
    		return
    	}
    	defer client.Close()
    	//发送消息
    	pid, offset, err := client.SendMessage(msg)
    	if err != nil {
    		fmt.Println("send msg failed,err:", err)
    		return
    	}
    	fmt.Printf("pid:%v offset:%v\n", pid, offset)
    }
    

    运行过程

    • 首先我们打开终端开起ZooKepper服务
      zkServer
      
      go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费
    • 然后再Kafka所在文件夹下输入命令运行Kafka:
      .\bin\windows\kafka-server-start.bat .\config\server.properties
      

      go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

      最后运行程序即可,输出结果为:

      go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

      补充:消息的消费

      代码

      package main
      import (
      	"fmt"
      	"github.com/Shopify/sarama"
      	"time"
      )
      func main() {
      	customer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
      	if err != nil {
      		fmt.Println("failed init customer,err:", err)
      		return
      	}
      	partitionlist, err := customer.Partitions("web.log-0") //获取topic的所有分区
      	if err != nil {
      		fmt.Println("failed get partition list,err:", err)
      		return
      	}
      	fmt.Println("partitions:", partitionlist)
      	for partition := range partitionlist { // 遍历所有分区
      		//根据消费者对象创建一个分区对象
      		pc, err := customer.ConsumePartition("web.log", int32(partition), sarama.OffsetNewest)
      		if err != nil {
      			fmt.Println("failed get partition consumer,err:", err)
      			return
      		}
      		defer pc.Close() // 移动到这里
      		go func(consumer sarama.PartitionConsumer) {
      			defer pc.AsyncClose() // 移除这行,因为已经在循环结束时关闭了
      			for msg := range pc.Messages() {
      				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
      			}
      		}(pc)
      		time.Sleep(time.Second * 10)
      	}
      }
      

      不过这个不能实现直接的消费,后续我们会对这个进行补充,这里仅作介绍。

VPS购买请点击我

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

目录[+]