Kafka 入门 and kafka+logstash 实战使用51CTO博客 - 乐橙lc8

Kafka 入门 and kafka+logstash 实战使用51CTO博客

2019年03月09日09时00分42秒 | 作者: 谷冬 | 标签: 音讯,顾客,一个 | 浏览: 2532

一、基础理论


这块是整个kafka的中心不管你是先操作在来看仍是先看在操作都需求多看几遍。

首要来了解一下Kafka所运用的根本术语

Topic
Kafka将音讯种子(Feed)分门别类 每一类的音讯称之为论题(Topic).
Producer
发布音讯的目标称之为论题生产者(Kafka topic producer)
Consumer
订阅音讯并处理发布的音讯的种子的目标称之为论题顾客(consumers)
Broker
已发布的音讯保存在一组效劳器中称之为Kafka集群。集群中的每一个效劳器都是一个署理(Broker). 顾客能够订阅一个或多个论题并从Broker拉数据然后消费这些已发布的音讯。

让咱们站的高一点从高的视点来看Kafka集群的事务处理就像这姿态

Client和Server之间的通讯是经过一条简略、高功能而且和开发言语无关的TCP协议。除了Java Client外还有十分多的其它编程言语的Client。


论题和日志  (Topic和Log)

让咱们更深化的了解Kafka中的Topic。

Topic是发布的音讯的类别或许种子Feed名。关于每一个TopicKafka集群保护这一个分区的log就像下图中的示例

每一个分区都是一个次序的、不可变的音讯行列 而且能够继续的增加。分区中的音讯都被分配了一个序列号称之为偏移量(offset)在每个分区中此偏移量都是仅有的。 Kafka集群坚持一切的音讯直到它们过期 不管音讯是否被消费了。 实践上顾客所持有的仅有的元数据就是这个偏移量也就是顾客在这个log中的方位。 这个偏移量由顾客操控正常状况当顾客消费音讯的时分偏移量也线性的的增加。可是实践偏移量由顾客操控顾客能够将偏移量重置为更老的一个偏移量从头读取音讯。 能够看到这种规划对顾客来说操作自若 一个顾客的操作不会影响其它顾客对此log的处理。 再说说分区。Kafka中选用分区的规划有几个意图。一是能够处理更多的音讯不受单台效劳器的约束。Topic具有多个分区意味着它能够不受限的处理更多的数据。第二分区能够作为并行处理的单元。

散布式(Distribution)

Log的分区被散布到集群中的多个效劳器上。每个效劳器处理它分到的分区。 依据装备每个分区还能够仿制到其它效劳器作为备份容错。 每个分区有一个leader零或多个follower。Leader处理此分区的一切的读写恳求而follower被迫的仿制数据。假定leader宕机其它的一个follower会被推举为新的leader。 一台效劳器或许一起是一个分区的leader另一个分区的follower。 这样能够平衡负载防止一切的恳求都只让一台或许某几台效劳器处理。

生产者(Producers)

生产者往某个Topic上发布音讯。生产者也担任挑选发布到Topic上的哪一个分区。最简略的方法从分区列表中轮番挑选。也能够依据某种算法依照权重挑选分区。开发者担任怎么挑选分区的算法。

顾客(Consumers)

一般来讲音讯模型能够分为两种 行列和发布-订阅式。 行列的处理方法是 一组顾客从效劳器读取音讯一条音讯只要其间的一个顾客来处理。在发布-订阅模型中音讯被播送给一切的顾客接收到音讯的顾客都能够处理此音讯。Kafka为这两种模型供给了单一的顾客笼统模型 顾客组 consumer group。 顾客用一个顾客组名符号自己。 一个发布在Topic上音讯被分发给此顾客组中的一个顾客。 假定一切的顾客都在一个组中那么这就变成了queue模型。 假定一切的顾客都在不同的组中那么就彻底变成了发布-订阅模型。 更通用的 咱们能够创立一些顾客组作为逻辑上的订阅者。每个组包括数目不等的顾客 一个组内多个顾客能够用来扩展功能和容错。正如下图所示

  2个kafka集群保管4个分区P0-P32个顾客组消费组A有2个顾客实例消费组B有4个。


正像传统的音讯体系相同Kafka确保音讯的次序不变。 再具体扯几句。传统的行列模型坚持音讯而且确保它们的先后次序不变。可是 虽然效劳器确保了音讯的次序音讯仍是异步的发送给各个顾客顾客收到音讯的先后次序不能确保了。这也意味着并行消费将不能确保音讯的先后次序。用过传统的音讯体系的同学必定清楚音讯的次序处理很让人头痛。假定只让一个顾客处理音讯又违反了并行处理的初衷。 在这一点上Kafka做的更好虽然并没有彻底处理上述问题。 Kafka选用了一种分而治之的战略分区。 由于Topic分区中音讯只能由顾客组中的仅有一个顾客处理所以音讯必定是依照先后次序进行处理的。可是它也仅仅是确保Topic的一个分区次序处理不能确保跨分区的音讯先后处理次序。 所以假定你想要次序的处理Topic的一切音讯那就只供给一个分区。

Kafka的确保(Guarantees)

生产者发送到一个特定的Topic的分区上的音讯将会依照它们发送的次序顺次参加


顾客收到的音讯也是此次序


假定一个Topic装备了仿制因子( replication facto)为N 那么能够答应N-1效劳器宕机而不丢掉任何现已增加的音讯



Kafka官网

http://kafka.apache.org/


作者半兽人
链接http://orchome.com/5
来历OrcHome
著作权归作者一切。商业转载请联络作者取得授权非商业转载请注明出处。


二、装置和发动


1、下载二进制装置包直接解压

tar xf kafka_2.11-0.10.0.1.tgz
cd kafka_2.11-0.10.0.1


2、发动效劳

Kafka需求用到ZooKeepr所以需求先发动一个ZooKeepr效劳端假定没有独自的ZooKeeper效劳端能够运用Kafka自带的脚本快速发动一个单节点ZooKeepr实例

bin/zookeeper-server-start.sh config/zookeeper.properties  # 发动zookeeper效劳端实例

bin/kafka-server-start.sh config/server.properties  # 发动kafka效劳端实例


三、根本操作指令


1、新建一个主题topic

创立一个名为“test”的Topic只要一个分区和一个备份

bin/kafka-topics.sh create zookeeper localhost:2181 replication-factor 1 partitions 1 topic test


2、创立好之后能够经过运转以下指令检查已创立的topic信息

bin/kafka-topics.sh list  zookeeper localhost:2181


3、发送音讯

Kafka供给了一个指令行的东西能够从输入文件或许指令行中读取音讯并发送给Kafka集群。每一行是一条音讯。

运转producer生产者,然后在操控台输入几条音讯到效劳器。

bin/kafka-console-producer.sh broker-list localhost:9092 topic test 
This is a message
This is another message


4、消费音讯

Kafka也供给了一个消费音讯的指令行东西,将存储的信息输出出来。

bin/kafka-console-consumer.sh zookeeper localhost:2181 topic test from-beginning
This is a message
This is another message


5、检查topic具体状况

bin/kafka-topics.sh describe zookeeper localhost:2181  topic peiyinlog

Topic: 主题称号

Partition: 分片编号

Leader: 该分区的leader节点

Replicas: 该副本存在于哪个broker节点

Isr: 活泼状况的broker


6、给Topic增加分区

bin/kafka-topics.sh zookeeper 192.168.90.201:2181 alter topic test2 partitions 20


7、删去Topic

bin/kafka-topics.sh zookeeper zk_host:port/chroot delete topic my_topic_name


主题(Topic)删去选项默许是封闭的,需求效劳器装备敞开它。

delete.topic.enable=true


注:假定需求在其他节点作为客户端运用指令衔接kafka broker,则需求留意以下两点(二选一即可)

另 : ( 运用logstash input 衔接kafka也需求留意 )


1、设置kafka broker 装备文件中 host.name 参数为监听的IP地址


2、给broker设置一个仅有的主机名,然后在本机/etc/hosts文件装备解析到自己的IP(当然假定有内网的DNS效劳器也行),一起还需求在zk server 和 客户端的 /etc/hosts 增加broker主机名的解析。 


原因详解:


场景假定

broker_server ip主机名zookeeper ip客户端 ip
192.168.1.2 默许 localhost192.168.1.4192.168.1.5
# 此刻客户端向broker主张一些消费:

bin/kafka-console-consumer.sh zookeeper 192.168.1.4:2181 topic test2 from-beginning


这时客户端衔接到zookeeper要求消费数据,zk则回来broker的ip地址和端口给客户端,可是假定broker没有设置host.name 和 advertised.host.name  broker默许回来的是自己的主机名,默许就是localhost和端口9092,这时客户端拿到这个主机名解析到自己,操作失利。


所以,需求装备broker 的host.name参数为监听的IP,这时broker就会回来IP。 客户端就能正常衔接了。


或许也能够设置好broker的主机名,然后分别给两边装备好解析。


四、broker根本装备

#  server.properties

broker.id=0  # broker节点的仅有标识 ID 不能重复。
host.name=10.10.4.1  # 监听的地址,假定不设置默许回来主机名给zk_server
log.dirs=/u01/kafka/kafka_2.11-0.10.0.1/data  # 音讯数据寄存途径
num.partitions=6  # 默许主题(Topic)分片数
log.retention.hours=24  # 音讯数据的最大保存时长
zookeeper.connect=10.160.4.225:2181  # zookeeper server 衔接地址和端口



五、Logstash + Kafka 实战运用


Logstash-1.51才开端内置Kafka插件,也就是说用之前的logstash版别是需求手动编译Kafka插件的,信任也很少人用了。主张运用2.3以上的logstash版别。


1、运用logstash向kafka写入一些数据


软件版别:

logstash 2.3.2 

kafka_2.11-0.10.0.1


logstash output 部分装备

output {
  kafka {
    workers => 2
    bootstrap_servers => "10.160.4.25:9092,10.160.4.26:9092,10.160.4.27:9092"
    topic_id => "xuexilog"

}

}


参数解说 : 

workers:用于写入时的作业线程

bootstrap_servers:指定可用的kafka broker实例列表

topic_id:指定topic称号,能够在写入前手动在broker创立界说好分片数和副本数,也能够不提早创立,那么在logstash写入时会主动创立topic,分片数和副本数则默许为broker装备文件中设置的。



2、运用logstash消费一些数据,并写入到elasticsearch


软件版别:

logstash 2.3.2 

elasticsearch-2.3.4


logstash 装备文件

input{
    kafka {
        zk_connect => "112.100.6.1:2181,112.100.6.2:2181,112.100.6.3:2181"
        group_id => "logstash"
        topic_id => "xuexilog"
        reset_beginning => false
        consumer_threads => 5
        decorate_events => true

}

}

# 这儿group_id 需求解说一下,在Kafka中,相同group的Consumer能够一起消费一个topic,不同group的Consumer作业则互不搅扰。
# 弥补: 在同一个topic中的同一个partition一起只能由一个Consumer消费,当同一个topic一起需求有多个Consumer消费时,则能够创立更多的partition。

output {
    if [type]  "nginxacclog" {
        elasticsearch {
            hosts => ["10.10.1.90:9200"]
            index => "logstash-nginxacclog-%{+YYYY.MM.dd}"
            manage_template => true
            flush_size => 50000
            idle_flush_time => 10
            workers => 2
}

}

}


3、经过group_id 检查当时具体的消费状况

bin/kafka-consumer-groups.sh group logstash describe zookeeper 127.0.0.1:2181


输出解说:

GROUPTOPICPARTITIONCURRENT-OFFSETLOG-END-OFFSETLAG
顾客组论题id分区id当时已消费的条数总条数未消费的条数


版权声明
本文来源于网络,版权归原作者所有,其内容与观点不代表乐橙lc8立场。转载文章仅为传播更有价值的信息,如采编人员采编有误或者版权原因,请与我们联系,我们核实后立即修改或删除。

猜您喜欢的文章