Zabbix 监控 Kafka Topic 积压数据是一个经典且关键的监控场景。积压量直接反映了消费者的处理能力与生产速度是否匹配,是衡量系统健康度的重要指标。以下是完整的解决方案,涵盖原理、步骤和最佳实践。
核心原理
通过 Kafka 自带的 kafka-consumer-groups.sh 脚本(或 Kafka API)获取指定 Consumer Group 对特定 Topic 的 Lag(滞后量),即未消费的消息数。Zabbix 通过定期抓取此数值进行监控、告警和可视化。
方案一:使用 Zabbix Agent + 自定义脚本(推荐)
这是最灵活、可控性最高的方案。
步骤 1:在 Kafka 节点或监控节点上创建监控脚本
创建一个 Shell 或 Python 脚本,用于获取指定 Consumer Group 和 Topic 的 Lag。
示例 Shell 脚本 (/usr/local/bin/kafka_lag_check.sh):
#!/bin/bash
# 参数:Consumer Group, Topic Name
CONSUMER_GROUP=$1
TOPIC_NAME=$2
KAFKA_HOME=/opt/kafka
KAFKA_BROKERS="kafka-broker1:9092,kafka-broker2:9092"
# 使用 kafka-consumer-groups 命令获取 lag,并过滤出指定 Topic 的行
LAG=$(${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group ${CONSUMER_GROUP} 2>/dev/null | grep "^${TOPIC_NAME}" | awk '{sum += $6} END {print sum}')
# 如果命令执行失败或没有匹配项,输出 -1 表示异常
if [ -z "$LAG" ]; then
echo -1
else
echo $LAG
fi
给脚本执行权限:
chmod +x /usr/local/bin/kafka_lag_check.sh
测试脚本:
./kafka_lag_check.sh your-consumer-group your-topic-name
# 输出应为数字,如 1200
更健壮的 Python 脚本示例(使用 kafka-python 库):
#!/usr/bin/env python3
import sys
from kafka import KafkaAdminClient
from kafka.admin import ConsumerGroupDescription
from kafka.errors import KafkaError
def get_topic_lag(brokers, group_id, topic):
try:
client = KafkaAdminClient(bootstrap_servers=brokers)
# 获取消费者组详细信息(此方法可能需要根据Kafka版本调整)
# 这里是一个简化的示例,实际生产环境建议使用 confluent_kafka 库的 list_consumer_group_offsets 和 list_offsets
# 或者直接调用 kafka-consumer-groups.sh 并解析输出
cmd = f"/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server {brokers} --describe --group {group_id} --topic {topic}"
import subprocess
result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
for line in result.stdout.strip().split('\n'):
if line.startswith(topic):
parts = line.split()
if len(parts) >= 6:
return int(parts[5])
return -1
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
return -1
if __name__ == "__main__":
if len(sys.argv) != 4:
print("Usage: {} <bootstrap_servers> <consumer_group> <topic>".format(sys.argv[0]))
sys.exit(1)
brokers, group, topic = sys.argv[1], sys.argv[2], sys.argv[3]
lag = get_topic_lag(brokers, group, topic)
print(lag)
步骤 2:配置 Zabbix Agent 自定义监控项
在运行脚本的机器(可以是 Kafka Broker 或一个集中监控主机)上修改 Zabbix Agent 配置。
创建 UserParameter 配置文件(如 /etc/zabbix/zabbix_agentd.d/userparameter_kafka.conf):
# 语法:UserParameter=<key>[*],<command>
# $1: Consumer Group, $2: Topic Name
UserParameter=kafka.topic.lag[*],/usr/local/bin/kafka_lag_check.sh "$1" "$2"
[*] 表示接受参数,参数会传递给脚本。
重启 Zabbix Agent:
systemctl restart zabbix-agent
在 Zabbix Server 上测试是否能获取数据:
zabbix_get -s <agent_host_ip> -k "kafka.topic.lag[your-consumer-group,your-topic-name]"
# 应返回 lag 数值
步骤 3:在 Zabbix Web 界面配置
创建主机:如果还没为 Kafka 集群创建主机,建议创建一个,例如 “Kafka-Monitor”。
创建监控项(Item):
- 名称:
Kafka Lag for [your-topic]:[your-consumer-group]
- 类型:Zabbix agent
- 键值:
kafka.topic.lag[your-consumer-group,your-topic-name]
- 信息类型:数字(无正负)
- 更新间隔:
1m 或 5m(根据业务敏感度)
- 历史数据存储:根据需求设置(如30天)
创建触发器(Trigger):
- 表达式:
{Kafka-Monitor:kafka.topic.lag[your-consumer-group,your-topic-name].last()}>1000
- 严重性:警告(Warning)或严重(High)
- 描述:
Topic {#TOPIC} 在 Consumer Group {#GROUP} 上的积压超过 1000 条消息。
- 建议设置多级告警:
> 1000:警告
> 10000:严重
> 50000:灾难
创建图形(Graph):
- 将多个 Topic 或 Consumer Group 的监控项放在同一个图形中,便于对比。
- 使用 Zabbix 的 “聚合图形” 功能展示整个 Kafka 集群的积压概览。
方案二:使用 Zabbix JMX 监控(原生但复杂)
Kafka 本身通过 JMX 暴露了大量指标,包括 kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*,topic=* 中的 records-lag-max 等。但直接使用 Zabbix 的 JMX 监控存在以下挑战:
性能开销:每个 Consumer Group/Topic 组合都会产生大量 JMX 对象,对 Zabbix Java Gateway 压力大。
配置繁琐:需要在 Zabbix 中为每个 MBean 创建监控项,不易于管理动态的 Consumer Group。
Lag 计算不直接:JMX 提供的 Lag 通常是每个分区的,需要聚合,不如命令行工具直接。
适用场景:如果 Kafka 集群规模小,且已启用 JMX,可以作为补充方案监控 Broker 和 Producer 端指标。
简化步骤:
在 Kafka 启动脚本中开启 JMX 端口(如
JMX_PORT=9999)。
在 Zabbix Server 上配置 Zabbix Java Gateway。
在 Zabbix 中创建一个 JMX 接口的主机,连接到 Kafka Broker。
通过 Zabbix 的
“JMX 发现” 功能自动发现 MBean 并创建监控项(可能需要自定义 MBean 模式)。
方案三:使用第三方模板或集成
Zabbix 官方模板:Zabbix 官方的
“Apache Kafka by JMX” 模板主要监控 Broker 状态,对 Consumer Lag 支持有限。
社区模板:GitHub 上有很多社区维护的模板(如使用
kafka-python 或
py-zabbix),通常基于方案一,但提供了开箱即用的模板和发现规则。
使用 Telegraf + Zabbix:
- 使用 Telegraf 的
exec 或 kafka_consumer 插件收集 Lag 数据。
- 通过 Telegraf 的 Zabbix 输出插件 或将数据写入数据库再由 Zabbix 读取。
- 此方案更适用于已在使用 Telegraf 作为统一指标收集器的环境。
最佳实践与高级配置
低层次自动发现(LLD):
- 目标:自动监控集群中所有 Consumer Group 和 Topic 的组合,无需手动为每个组合创建监控项。
- 实现:
- 编写一个 发现脚本,输出所有
{GROUP, TOPIC} 对的 JSON(使用 kafka-consumer-groups.sh --list 和 --describe 解析)。
- 在 Zabbix Agent 中配置
UserParameter 调用此脚本作为 发现规则。
- 在 Zabbix Web 中创建 发现规则,并为其配置 监控项原型 和 触发器原型。
- 优点:大幅减少维护工作,自动适配新的消费者组。
标签(Tags):
- 为监控项添加标签,如
service:kafka, metric:lag, topic:xxx, consumer_group:yyy。便于在监控仪表盘中筛选和聚合。
依赖项(Dependencies):
- 如果 Lag 过高可能是因为下游数据库或服务故障,可以设置触发器依赖关系,避免告警风暴。
趋势预测与基线告警:
- 使用 Zabbix 的 “基线监控” 或 “预测函数”(如
forecast)来识别 Lag 的非正常增长趋势,即使绝对值未达到阈值也发出预警。
- 示例触发器:
{host:kafka.topic.lag[group,topic].forecast(1h)} > {host:kafka.topic.lag[group,topic].avg(1w)} * 3
告警消息优化:
- 在触发器动作中,使用宏
{ITEM.LASTVALUE} 和 {ITEM.HOST} 等,使告警信息包含具体的 Lag 数值、Topic 和 Consumer Group 名称。
总结
| 方案 |
优点 |
缺点 |
适用场景 |
|---|
| 方案一:自定义脚本 |
灵活、准确、性能好、主流方案 |
需要自行编写和维护脚本 |
绝大多数生产环境,尤其是需要监控特定业务 Topic 的场景 |
| 方案二:JMX |
无需额外脚本,可同时获取Broker指标 |
配置复杂、性能开销大、Lag监控不便 |
小型集群,且已启用JMX进行Broker监控 |
| 方案三:社区模板 |
开箱即用,节省开发时间 |
可能不符合特定需求,需调试 |
想快速上手,且模板功能匹配 |
生产环境推荐组合:
- 核心方案:采用 方案一(自定义脚本 + LLD),实现全自动、低成本的 Consumer Lag 监控。
- 补充方案:可同时使用方案二或 Zabbix 的 JMX 模板监控 Kafka Broker 本身的健康度(如活跃控制器、网络处理器比率、请求队列大小等)。
按照上述步骤,您可以构建一个稳定、可扩展的 Kafka Topic 积压监控体系,为系统的稳定运行提供有力保障。