文章目录 一、概述二、EFAK架构三、EFAK数据采集原理四、安装Kafka1)Kafka下载2)配置环境变量3)创建logs目录4)修改kafka配置5)修改zookeeper配置6)配置Zookeeper myid7)开启Kafka JMX监控8)将kafka目录推送到其它节点9)启动服务 五、安装EFAK1)下载EFAK2)创建数据库2)设置环境变量3)配置4)调整启动参数6)修改 works配置7)将EFAK推送其它节点8)启动 六、简单使用1)Kafka CLI简单使用2)EFAK常用命令 一、概述
EFAK(Eagle For Apache Kafka,以前称为 Kafka Eagle)是一款由国内公司开源的Kafka集群监控系统,可以用来监视kafka集群的broker状态、Topic信息、IO、内存、consumer线程、偏移量等信息,并进行可视化图表展示。独特的KQL还可以通过SQL在线查询kafka中的数据。
源码: https://github.com/smartloli/kafka-eagle/ 下载: http://download.kafka-eagle.org/ 官方文档:https://·mon.InconsistentClusterIdException: The Cluster ID ELIH-KKbRP-NnPnHt4z-lA doesn't match stored clusterId Some(2HC_x7bTR_u2bCxrqw0Otw) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong. at kafka.server.KafkaServer.startup(KafkaServer.scala:228) at kafka.Kafka$.main(Kafka.scala:109) at kafka.Kafka.main(Kafka.scala)
【解决】在server.properties找到log.dirs配置的路径。将该路径下的meta.properties文件删除,或者编辑meta.properties文件修改里面的cluster.id即可。
8)将kafka目录推送到其它节点 $ scp -r $KAFKA_HOME hadoop-node2:/opt/bigdata/hadoop/server/ $ scp -r $KAFKA_HOME hadoop-node3:/opt/bigdata/hadoop/server/ # 在hadoop-node2和hadoop-node3节点上设置环境变量 $ vi /etc/profile export KAFKA_HOME=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1 export PATH=$PATH:$KAFKA_HOME/bin $ source /etc/profile # 修改advertised.listeners,值改成对应的hostname或者ip【温馨提示】修改hadoop-node2上server.properties文件的broker.id,设置为1和2,只要不重复就行,advertised.listeners地址改成对应机器IP。
9)启动服务启动zookeeper集群之后再启动kafka集群
$ cd $KAFKA_HOME # -daemon后台启动 $ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties # 默认端口2181,可以在配置自定义,这里修改为12181端口 $ netstat -tnlp|grep 12181启动Kafka
$ cd $KAFKA_HOME # 默认端口9092,这里修改成了19092,可以修改listeners和advertised.listeners $ ./bin/kafka-server-start.sh -daemon ./config/server.properties $ netstat -tnlp|grep 9092 $ jps 五、安装EFAK 1)下载EFAK $ cd /opt/bigdata/hadoop/software $ wget https://github.com/smartloli/kafka-eagle-bin/archive/v2.1.0.tar.gz $ tar -xf kafka-eagle-bin-2.1.0.tar.gz -C /opt/bigdata/hadoop/server/ $ cd /opt/bigdata/hadoop/server/ $ tar -xf 2)创建数据库 $ mysql -uroot -p 123456 create database ke; 2)设置环境变量 $ vi /etc/profile export KE_HOME=/opt/bigdata/hadoop/server/kafka-eagle-bin-2.1.0/efak-web-2.1.0 export PATH=$PATH:$KE_HOME/bin $ source /etc/profile 3)配置这里设置hadoop-node1为master节点,其它两个节点为slave节点,修改参数
$ vi $KE_HOME/conf/system-config.properties # Multi zookeeper&kafka cluster list -- The client connection address of the Zookeeper cluster is set here efak.zk.cluster.alias=cluster1 cluster1.zk.list=hadoop-node1:12181,hadoop-node2:12181,hadoop-node3:12181 ###################################### # kafka jmx 地址,默认Apache发布的Kafka基本是这个默认值, # 对于一些公有云Kafka厂商,它们会修改这个值, # 比如会将jmxrmi修改为kafka或者是其它的值, # 若是选择的公有云厂商的Kafka,可以根据实际的值来设置该属性 ###################################### cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi # Zkcli limit -- Zookeeper cluster allows the number of clients to connect to # If you enable distributed mode, you can set value to 4 or 8 kafka.zk.limit.size=16 # EFAK webui port -- WebConsole port access address efak.webui.port=8048 ###################################### # EFAK enable distributed,启用分布式部署 ###################################### efak.distributed.enable=true # 设置节点类型slave or master # master worknode set status to master, other node set status to slave efak.cluster.mode.status=master # deploy efak server address efak.worknode.master.host=hadoop-node1 efak.worknode.port=8085 # Kafka offset storage -- Offset stored in a Kafka cluster, if stored in the zookeeper, you can not use this option cluster1.efak.offset.storage=kafka # Whether the Kafka performance monitoring diagram is enabled efak.metrics.charts=true # EFAK keeps data for 30 days by default efak.metrics.retain=15 # If offset is out of range occurs, enable this property -- Only suitable for kafka sql efak.sql.fix.error=false efak.sql.topic.records.max=5000 # Delete kafka topic token -- Set to delete the topic token, so that administrators can have the right to delete efak.topic.token=keadmin # 关闭自带的sqlite数据库,使用外部的mysql数据库 # Default use sqlite to store data # efak.driver=org.sqlite.JDBC # It is important to note that the '/hadoop/kafka-eagle/db' path must be exist. # efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db # efak.username=root # efak.password=smartloli # 配置外部数据库 # (Optional) set mysql address efak.driver=com.mysql.jdbc.Driver efak.url=jdbc:mysql://hadoop-node1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull efak.username=root efak.password=123456 4)调整启动参数EFAK默认启动内存大小为2G,考虑到服务器情况可以将其调小
## 在 efak 安装目录执行 $ vi $KE_HOME/bin/ke.sh ## 将 KE_JAVA_OPTS 最大最小容量调小,例如: export KE_JAVA_OPTS="-server -Xmx512m -Xms512m -XX:MaxGCPauseMillis=20 -XX:+UseG1GC -XX:MetaspaceSize=128m -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80" 6)修改 works配置默认是localhost
$ cat >$KE_HOME/conf/works<<EOF hadoop-node2 hadoop-node3 EOF 7)将EFAK推送其它节点 # hadoop-node1推送 $ scp -r /opt/bigdata/hadoop/server/kafka-eagle-bin-2.1.0 hadoop-node2:/opt/bigdata/hadoop/server/ $ scp -r /opt/bigdata/hadoop/server/kafka-eagle-bin-2.1.0 hadoop-node3:/opt/bigdata/hadoop/server/ # 在hadoop-node2和hadoop-node3配置环境变量修改节点类型为slave 8)启动上面配置文件配置的是分布式部署,当然也可以单机跑,但是不建议单机跑
# 在master节点上执行 $ cd $KE_HOME/bin $ chmod +x ke.sh # 单机版启动 $ ke.sh start # 集群方式启动 $ ke.sh cluster start $ ke.sh cluster restartweb UI:http://192.168.0.113:8048/ 账号密码:admin/123456
六、简单使用 1)Kafka CLI简单使用【增】添加topic
# 创建topic,1副本,1分区,设置数据过期时间72小时(-1表示不过期),单位ms,72*3600*1000=259200000 $ kafka-topics.sh --create --topic test002 --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --partitions 1 --replication-factor 1 --config retention.ms=259200000【查】
# 查看topic列表 $ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --list # 查看topic列表详情 $ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --describe # 指定topic $ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --describe --topic test002 # 查看消费者组 $ kafka-consumer-groups.sh --bootstrap-server hadoop-node1:9092 --list $ kafka-consumer-groups.sh --bootstrap-server hadoop-node1:9092 --describe --group test002【改】这里主要是修改最常用的三个参数:分区、副本,过期时间
# 修改分区,扩分区,不能减少分区 $ kafka-topics.sh --alter --bootstrap-server hadoop-node1:9092 --topic test002 --partitions 2 # 修改过期时间,下面两行都可以 $ kafka-configs.sh --bootstrap-server hadoop-node1:9092 --alter --topic test002 --add-config retention.ms=86400000 $ kafka-configs.sh --bootstrap-server hadoop-node1:9092 --alter --entity-name test002 --entity-type topics --add-config retention.ms=86400000 # 修改副本数,将副本数修改成3 $ cat >1.json<<EOF {"version":1, "partitions":[ {"topic":"test002","partition":0,"replicas":[0,1,2]}, {"topic":"test002","partition":1,"replicas":[1,2,0]}, {"topic":"test002","partition":2,"replicas":[2,0,1]} ]} EOF $ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --describe --topic test002【删】
$ kafka-topics.sh --delete --topic test002 --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092【生成者】
$ kafka-console-producer.sh --broker-list hadoop-node1:9092 --topic test002 {"id":"1","name":"n1","age":"20"} {"id":"2","name":"n2","age":"21"} {"id":"3","name":"n3","age":"22"}【消费者】
# 从头开始消费 $ kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic test002 --from-beginning # 指定从分区的某个位置开始消费,这里只指定了一个分区,可以多写几行或者遍历对应的所有分区 $ kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic test002 --partition 0 --offset 100【消费组】
$ kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic test002 --group test002【查看数据积压】
$ kafka-consumer-groups.sh --bootstrap-server hadoop-node1:9092 --describe --group test002 2)EFAK常用命令$KE_HOME/bin/ke.sh启动脚本中包含以下命令:
EFAK环境部署,和kafka的一些简单操作就到这里了,后续会分享KSQL和其它更详细的页面化操作,请小伙伴耐心等待~
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。 |
标签: #kafka图形化工具 #for #apache #Kafka以前称为 #Kafka