Kafka安装步骤

Stella981
• 阅读 530

基本概念

1.Producer:消息生产者,就是向 kafka broker 发消息的客户端
2.Consumer:消息消费者,向 kafka broker 取消息的客户端
3.Consumer Group(CG ):消费者组,由多个 consumer 组成。 消费者组内每个消费者负责消费不同分区的数据, 一个分区只能由一个 组内 消费者消费; 消费者组之间互不影响。 所有的消费者都属于某个消费者组,即 消费者组是逻辑上的一个订阅者。
4.Broker:一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。
5.Topic:可以理解为一个队列, 生产者和消费者面向的都是一个 topic
6.Partition:为了实现扩展性, 一个非常大的 topic 可以分布到多个 broker (即服务器) 上,一个 topic  可以分为多个 partition,每个 partition 是一个有序的队列;
7.Replica: 副本, 为保证集群中的某个节点发生故障时, 该节点上的 partition 数据不丢失,且 kafka 仍然能够继续工作, kafka 提供了副本机制, 一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower。
8.leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。
9.follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 follower。

采用伪集群的方式部署Kafka集群

# 下载软件
cd /usr/local/src
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.3.0/kafka_2.12-2.3.0.tgz
tar -xvf kafka_2.12-2.3.0.tgz
cd kafka_2.12-2.3.0

# 解压缩后进入目录中
# 方法1:使用同一个程序,但是采用不同的配置文件
# 方法2:复制两份目录出来,使用不同的目录区分
# 此处采用的是方法1,在后续步骤采用的是方法2

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties 

# log.dir表示的是队列数据存储的路径,根据实际情况修改

config/server.properties: 
    broker.id=0 
    listeners=PLAINTEXT://:9092
    log.dir=/data/kafka-logs-0
    delete.topic.enable=true #删除 topic 功能
    
config/server-1.properties: 
    broker.id=1 
    listeners=PLAINTEXT://:9093 
    log.dir=/data/kafka-logs-1
    delete.topic.enable=true #删除 topic 功能

config/server-2.properties: 
    broker.id=2 
    listeners=PLAINTEXT://:9094 
    log.dir=/data/kafka-logs-2
    delete.topic.enable=true

后台启动内置的zookeeper

# 测试可以使用Kafka自带的zookeeper。生产上建议使用单独的zookeeper集群
# 若是搭建集群的话,方法同Kafka
# 此处先启动一个zookeeper,后续采用同Kafka的方法2搭建集群使用
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

关闭zookeeper

bin/zookeeper-server-stop.sh stop

开启kafk集群(伪集群),shell脚本

bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-start.sh -daemon config/server-1.properties
bin/kafka-server-start.sh -daemon config/server-2.properties

关闭kafk集群(伪集群),shell脚本

bin/zookeeper-server-stop.sh stop

查看当前服务器中的所有 topic

bin/kafka-topics.sh --list --zookeeper localhost:2181 

创建 topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic first

删除 topic(需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除)

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic first

发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first

消费消息(默认当前,不是从头开始)

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first

消费消息,从头开始

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic second --from-beginning

查看某个 Topic 的详情

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic first

修改分区数

bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic first --partitions 6

自带压测命令

bin/kafka-producer-perf-test.sh --topic first --num-records 1000 --record-size 100 --throughput 1000  --producer-props bootstrap.servers=localhost:9092

安装Kafka Eagle监控Kafka集群

1.修改 kafka-server-start.sh 命令 因为使用的是伪集群,所以,需要把目录复制三份,每个目录下该文件的JMX_PORT值都不一样才行

正好利用这种情况,每个目录下都可以启动自带的zookeeper,修改端口号,从而构建一个zookeeper伪集群

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9999"
#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi

2.下载Kafka Eagle软件

cd /usr/local/src/
wget https://github.com/smartloli/kafka-eagle-bin/archive/v1.3.9.tar.gz
tar -zxvf v1.3.9.tar.gz
cd kafka-eagle-bin-1.3.9/
cp kafka-eagle-web-1.3.9-bin.tar.gz ../
tar -zxvf kafka-eagle-web-1.3.9-bin.tar.gz 
cd kafka-eagle-web-1.3.9/conf
vim system-config.properties

3.设置 若是使用zookeeper伪集群,做相应的增加

vim system-config.properties

kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181

cluster1.kafka.eagle.offset.storage=kafka

kafka.eagle.metrics.charts=true
kafka.eagle.sql.fix.error=false

# 使用sqlite数据库,注意保存路径,没有的话需要新建,可以换成MySQL
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
kafka.eagle.username=root
kafka.eagle.password=www.kafka-eagle.org

4.安装maven

yum install maven

5.配置环境变量

vim /etc/profile.d/ke.sh
export KE_HOME=/usr/local/src/kafka-eagle-web-1.3.9
export PATH=$PATH:$KE_HOME/bin

vim /etc/profile.d/java.sh
export JAVA_HOME=//usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.el7_6.x86_64
export JRE_HOME=$JAVA_HOME/jre
export CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin

source /etc/profile.d/ke.sh
soucer /etc/profile.d/ke.sh

6.启动

cd bin
bash ke.sh start.sh

*******************************************************************
* Kafka Eagle system monitor port successful... 
*******************************************************************
[2019-09-10 16:41:46] INFO: Status Code[0]
[2019-09-10 16:41:46] INFO: [Job done!]
Welcome to
    __ __    ___     ____    __ __    ___            ______    ___    ______    __     ______
   / //_/   /   |   / __/   / //_/   /   |          / ____/   /   |  / ____/   / /    / ____/
  / ,<     / /| |  / /_    / ,<     / /| |         / __/     / /| | / / __    / /    / __/   
 / /| |   / ___ | / __/   / /| |   / ___ |        / /___    / ___ |/ /_/ /   / /___ / /___   
/_/ |_|  /_/  |_|/_/     /_/ |_|  /_/  |_|       /_____/   /_/  |_|\____/   /_____//_____/   
                                                                                             

Version 1.3.9
*******************************************************************
* Kafka Eagle Service has started success.
* Welcome, Now you can visit 'http://127.0.0.1:8048/ke'
* Account:admin ,Password:123456
*******************************************************************
* <Usage> ke.sh [start|status|stop|restart|stats] </Usage>
* <Usage> https://www.kafka-eagle.org/ </Usage>
*******************************************************************

提示的访问地址是http://127.0.0.1:8048/ke,但是可以用http://192.168.0.145/ke进行访问

点赞
收藏
评论区
推荐文章
blmius blmius
2年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Easter79 Easter79
2年前
swap空间的增减方法
(1)增大swap空间去激活swap交换区:swapoff v /dev/vg00/lvswap扩展交换lv:lvextend L 10G /dev/vg00/lvswap重新生成swap交换区:mkswap /dev/vg00/lvswap激活新生成的交换区:swapon v /dev/vg00/lvswap
Jacquelyn38 Jacquelyn38
2年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
Stella981 Stella981
2年前
KVM调整cpu和内存
一.修改kvm虚拟机的配置1、virsheditcentos7找到“memory”和“vcpu”标签,将<namecentos7</name<uuid2220a6d1a36a4fbb8523e078b3dfe795</uuid
Wesley13 Wesley13
2年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
2年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Stella981 Stella981
2年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
2年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
3个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这