KafkaQ - 好用的 Kafka Linux 命令行可视化工具
软件效果前瞻 ~
鉴于并没有在网上找到比较好的linux平台的kafka可视化工具,今天为大家介绍一下自己开发的在 Linux 平台上使用的可视化工具KafkaQ
虽然简陋,主要可以实现下面的这些功能:
1)查看当前topic的分片数量和副本数量
2)查看当前topic下面每个分片的最大offset
3)查看当前topic某个分片下面指定offset范围的数据
4)搜索当前topic指定关键词的message
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
KafkaQ分为普通版本和搜索版本:
* 普通版本支持上述3种查询
* 搜索版本支持上述3种查询之外,增加关键词搜索,即在分片中搜索指定关键词的message
一、普通版 KafkaQ.sh
使用方法:
Usage: KafkaQ.sh --topic [--partition] [--offset] [--limit] --topic 话题名称 --partition 分片索引(可选) --offset 从第k个offset开始检索(可选) --limit 从第k个offset开始检索X条结果(可选)
显示的效果如下,十分简洁,分片数据里面左边一列是消息入库的时间,右边是message内容:
KafkaQ 源码如下:
#!/bin/bash
# 默认值
PARTITION=${2:-0}
OFFSET=${3:-0}
LIMIT=${4:-0}
# 检查参数
if [ -z "$1" ]; then
echo "Usage: $0 --topic [--partition] [--offset] [--limit]"
exit 1
fi
TOPIC="$1"
# 检查Kafka命令是否存在
if ! command -v /usr/local/kafka/bin/kafka-topics.sh >/dev/null 2>&1; then
echo "Kafka not found at /usr/local/kafka/bin/"
exit 1
fi
# 获取Topic信息
echo -e "\033[0;31m* 话题: $TOPIC\033[0m"
# 获取分区数和副本数
PARTITION_INFO=$(/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic "$TOPIC")
PARTITION_COUNT=$(echo "$PARTITION_INFO" | awk '/Partition:/ {print $2}' | wc -l)
REPLICA_COUNT=$(echo "$PARTITION_INFO" | grep -oP 'ReplicationFactor: \K\d+')
echo "* 分片: $PARTITION_COUNT, 副本: $REPLICA_COUNT"
# 获取分片a和分片b的最大偏移量
MAX_OFFSET=$(/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic "$TOPIC" | awk -F: '{ printf " 分片: %s,MaxOffset: %s\n", $2, $3 }')
echo "$MAX_OFFSET"
# 获取分片数据
if [ "$LIMIT" -gt 0 ]; then
echo -e "\033[0;33mFetching messages from partition $PARTITION with offset $OFFSET and limit $LIMIT ...\033[0m"
MESSAGES=$(/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "$TOPIC" --partition "$PARTITION" --offset "$OFFSET" --max-messages "$LIMIT" --property print.key=true --property print.value=true --property print.timestamp=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer)
# 格式化输出消息
echo "$MESSAGES" | awk -F'\t' 'BEGIN {
print "* 分片数据:"
}
{
if ($3 != "null") {
timestamp = substr($1, 12) / 1000 # 从第10个字符开始提取时间戳,并除以1000以转换为秒级时间戳
value = $3
printf "\033[0;33m%s\033[0m %s\n", strftime("%Y-%m-%d %H:%M:%S", timestamp), value
}
}'
fi
二、搜索版 KafkaQ-Search.sh
使用方法:
Usage: KafkaQ-Search.sh --topic [--partition] [--offset] [--limit] [--search] --topic 话题名称 --partition 分片索引(可选) --offset 从第k个offset开始检索(可选) --limit 从第k个offset开始检索X条结果(可选) --search 搜索字符串
示例(所有参数是必选的哦):
sh KafkaQ-Search.sh --topic log --partition 0 --offset 0 --limit 18480 --search '9fea9c52-c0fe-4429-81e1-d045f35f9be9'
显示效果如下:
KafkaQ-Search.sh 源码如下:
#!/bin/bash
# 默认值
PARTITION=${2:-0}
OFFSET=${3:-0}
LIMIT=${4:-0}
SEARCH=${5:-""}
# 检查参数
if [ -z "$1" ]; then
echo "Usage: $0 --topic [--partition] [--offset] [--limit] [--search]"
exit 1
fi
while [[ $# -gt 0 ]]; do
case "$1" in
--topic)
TOPIC="$2"
shift 2
;;
--partition)
PARTITION="$2"
shift 2
;;
--offset)
OFFSET="$2"
shift 2
;;
--limit)
LIMIT="$2"
shift 2
;;
--search)
SEARCH="$2"
shift 2
;;
*)
echo "Unknown parameter: $1"
exit 1
;;
esac
done
# 检查Kafka命令是否存在
if ! command -v /usr/local/kafka/bin/kafka-topics.sh >/dev/null 2>&1; then
echo "Kafka not found at /usr/local/kafka/bin/"
exit 1
fi
# 获取Topic信息
echo -e "\033[0;31m* 话题: $TOPIC\033[0m"
# 获取分区数和副本数
PARTITION_INFO=$(/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic "$TOPIC")
PARTITION_COUNT=$(echo "$PARTITION_INFO" | awk '/Partition:/ {print $2}' | wc -l)
REPLICA_COUNT=$(echo "$PARTITION_INFO" | grep -oP 'ReplicationFactor: \K\d+')
echo "* 分片: $PARTITION_COUNT, 副本: $REPLICA_COUNT"
# 获取分片a和分片b的最大偏移量
MAX_OFFSET=$(/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic "$TOPIC" | awk -F: '{ printf " 分片: %s,MaxOffset: %s\n", $2, $3 }')
echo "$MAX_OFFSET"
# 获取分片数据
if [ "$LIMIT" -gt 0 ]; then
echo -e "\033[0;33mFetching messages from partition $PARTITION with offset $OFFSET and limit $LIMIT ...\033[0m"
MESSAGES=$(/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic "$TOPIC" --partition "$PARTITION" --offset "$OFFSET" --max-messages "$LIMIT" --property print.key=true --property print.value=true --property print.timestamp=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer)
# 搜索关键词并输出结果
if [[ ! -z $SEARCH ]]; then
echo -e "\033[0;32m* 搜索条件:$SEARCH\033[0m"
echo " 搜索结果:"
echo "$MESSAGES" | grep --color=never "$SEARCH" | awk -F'\t' '{
timestamp = substr($1, 12) / 1000 # 从第12个字符开始提取时间戳,并除以1000以转换为秒级时间戳
value = $3
printf "\033[0;33m%s\033[0m %s\n", strftime("%Y-%m-%d %H:%M:%S", timestamp), value
}'
fi
fi
* (附注)参考的shell如下
1、获取kafka的topic 分区数量
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic
2、获取kafka每个分片最大的offset
/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic
3、获取kafka分片指定offset范围的具体信息
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic --partition --offset --max-messages --property print.key=true --property print.value=true --property print.timestamp=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!





