2.部署kafka:9092

news/2025/2/26 18:14:44

官方文档:http://kafka.apache.org/documentation.html

(虽然kafka中集成了zookeeper,但还是建议使用独立的zk集群)

Kafka3台集群搭建环境:

操作系统: centos7

防火墙:全关

3台zookeeper集群内的机器,1台logstash

软件版本: zookeeper-3.4.12.tar.gz

软件版本kafka_2.12-2.1.0.tgz

安装软件

(3台zookeeper集群的机器)

# tar xf kafka_2.12-2.1.0.tgz -C /usr/local/

# ln -s /usr/local/kafka_2.12-2.1.0/ /usr/local/kafka

创建数据目录(3台)

# mkdir /data/kafka-logs

修改第一台配置文件

(注意不同颜色标记的部分)

# egrep -v "^$|^#" /usr/local/kafka/config/server.properties

broker.id=1 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样

listeners=PLAINTEXT://192.168.148.141:9092 #监听套接字

num.network.threads=3 #这个是borker进行网络处理的线程数

num.io.threads=8 #这个是borker进行I/O处理的线程数

socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能

socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘

socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小

log.dirs=/data/kafka-logs #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数

#如果配置多个目录,新创建的topic把消息持久化在分区数最少那一个目录中

num.partitions=1 #默认的分区数,一个topic默认1个分区数

num.recovery.threads.per.data.dir=1 #在启动时恢复日志和关闭时刷新日志时每个数据目录的线程的数量,默认1

offsets.topic.replication.factor=2

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天

message.max.byte=5242880 #消息保存的最大值5M

default.replication.factor=2 #kafka保存消息的副本数

replica.fetch.max.bytes=5242880 #取消息的最大字节数

log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件

log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间,到目录查看是否有过期的消息如果有,删除

zookeeper.connect=192.168.148.141:2181,192.168.148.142:2181,192.168.148.143:2181

zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=0

修改另外两台配置文件

#scp /usr/local/kafka/config/server.properties kafka-2:/usr/local/kafka/config/

broker.id=2

listeners=PLAINTEXT://192.168.148.142:9092

# scp /usr/local/kafka/config/server.properties kafka-3:/usr/local/kafka/config/

broker.id=3

listeners=PLAINTEXT://192.168.148.143:9092

启动kafka(3台)

[root@host1 ~]# /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties &

查看启动情况(3台)

[root@host1 ~]# jps

10754 QuorumPeerMain

11911 Kafka

12287 Jps

创建topic来验证

[root@host1 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.148.143:2181 --replication-factor 2 --partitions 1 --topic cien

出现Created topic "cien"验证成功运行

在一台服务器上创建一个发布者

[root@host2 ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.148.141:9092 --topic cien

> hello kafka

> ni hao ya

>

在另一台服务器上创建一个订阅者

[root@host3 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.148.142:9092 --topic cien --from-beginning

...

hello kafka

ni hao ya

如果都能接收到,说明kafka部署成功!

[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.10.23:2181 --list #查看所有topic

[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.10.23:2181 --topic qianfeng #查看指定topic的详细信息

Topic:qianfeng PartitionCount:1 ReplicationFactor:2 Configs:

Topic: qianfeng Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3

[root@host3 ~]# /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.10.23:2181 --topic qianfeng #删除topic

Topic qianfeng is marked for deletion.

Note: This will have no impact if delete.topic.enable is not set to true.

配置elfk集群订阅和zookeeperkafka

配置第一台logstash生产消息输出到kafka

yum -y install wget

wget https://d6.injdk.cn/oraclejdk/8/jdk-8u341-linux-x64.rpm

yum localinstall jdk-8u341-linux-x64.rpm -y

java -version

1.安装logstash

tar xf logstash-6.4.1.tar.gz -C /usr/local

ln -s /usr/local/logstash-6.4.1 /usr/local/logstash

2.修改配置文件

cd /usr/local/logstash/config/

vim logstash.yml

http.host: "0.0.0.0"

3.编写配置文件

不要过滤, logstash会将message内容写入到队列中

# cd /usr/local/logstash/config/

# vim logstash-kafka.conf

input {
    file {
      type => "sys-log"
      path => "/var/log/messages"
      start_position => beginning
    }
}
output {
    kafka {
      bootstrap_servers => "192.168.148.141:9092,192.168.148.142:9092,192.168.148.143:9092"     #输出到kafka集群
      topic_id => "sys-log-messages"         #主题名称
      compression_type => "snappy"         #压缩类型
      codec =>  "json"
    }
}

启动logstash

# /usr/local/logstash/bin/logstash -f logstash-kafka.conf

kafka上查看主题,发现已经有了sys-log-messages,说明写入成功了

[root@host2 ~]# /usr/local/kafka/bin/kafka-topics.sh --zookeeper 192.168.148.141:2181 --list

__consumer_offsets

qianfeng

sys-log-messages

[root@host2 ~]# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.148.141:2181 --topic sys-log-messages

Topic:sys-log-messages PartitionCount:1 ReplicationFactor:2 Configs:

Topic: sys-log-messages Partition: 0 Leader: 3 Replicas: 3,2 Isr: 3,2

配置第二台logstash,订阅kafka日志,输出到es集群

# cat kafka-es.conf

input {
	kafka {
		bootstrap_servers => "192.168.148.141:9092,192.168.148.142:9092,192.168.148.143:9092" 
    	topics => "sys-log-messages"          #kafka主题名称
      codec => "json"
    	auto_offset_reset => "earliest"
	}
}

output {
    elasticsearch {
       hosts => ["192.168.148.131:9200","192.168.148.132:9200"]
       index => "kafka-%{type}-%{+YYYY.MM.dd}"
    }
}

http://www.niftyadmin.cn/n/5869045.html

相关文章

java后端开发day18--学生管理系统

(以下内容全部来自上述课程) 1.业务分析并搭建主菜单 1.需求 采取控制台的方式去书写学生管理系统 2.分析 1.初始菜单 2.学生类 属性:id,姓名,年龄,家庭住址 3.添加功能 键盘录入每一个学生信息并…

嵌入式开发:傅里叶变换(4):在 STM32上面实现FFT(基于STM32L071KZT6 HAL库+DSP库)

目录 步骤 1:准备工作 步骤 2:创建 Keil 项目,并配置工程 步骤 3:在MDK工程上添加 CMSIS-DSP 库 步骤 5:编写代码 步骤 6:配置时钟和优化 步骤 7:调试与验证 步骤 8:优化和调…

分享httprunner 结合django实现平台接口自动化方案

说明,可以直接在某个视图集定义自定义接口来验证。 调试1:前端界面直接编写yaml文件. 新增要实现存数据到mysql,同时存文件到testcase下, 如test.yaml 更新yaml数据,同时做到更新 testcase下的文件,如test.yaml acti…

React 源码揭秘 | 更新队列

前面几篇遇到updateQueue的时候,我们把它先简单的当成了一个队列处理,这篇我们来详细讨论一下这个更新队列。 有关updateQueue中的部分,可以见源码 UpdateQueue实现 Update对象 我们先来看一下UpdateQueue中的内容,Update对象&…

docker部署go简单web项目(无mysql等附加功能)

首先准备好go语言代码 代码表示当访问主机上8080端口下的/hello路径时,会返回hello,world。 package mainimport ("fmt""github.com/gin-gonic/gin" )type hh struct {S string }func main() {router : gin.Default()router.GET(&…

简单理解Oracle中的latch

可以用一个小卖部抢购的例子来理解 Oracle 数据库中的 Latch: 1、 什么是 Latch? 打个比方,假设数据库的某个内存区域(比如缓存的数据块)是小卖部货架上的最后一包辣条,Latch 就像是货架前的一个狭窄通道&a…

【docker】namespace底层机制

Linux 的 Namespace 机制是实现容器化(如 Docker、LXC 等)的核心技术之一,它通过隔离系统资源(如进程、网络、文件系统等)为进程提供独立的运行环境。其底层机制涉及内核数据结构、系统调用和进程管理。以下是其核心实…

angular新闻列表分页

说明:使用angular技术,material控件,ngfor循环,img网络图片展示,分页组件 效果图: step1: C:\Users\Administrator\WebstormProjects\untitled4\src\app\home\home.component.ts import { Component, V…