Kafka笔记_0
消费模式
点对点模式(P2P
)
- 队列中只有一个消费者可以消费数据,用后即销毁,因此数据有且只有一次消费。
- 适合用于短信业务,发送一次,消费一次。
特点:
- 每个消息只有一个接收者
- 发送和接收之间没有依赖,发送者成功发送消息后,不管接收有没有运行,都可以再次发送消息
- 接收者在成功接收后需要向队列应答成功,以便消息队列删除当前消息
发布订阅模式(PS
)
- 数据会在队列中存储7天,每个订阅都可以消费到相应的数据,可以重复进行消费数据。
- 大部分都是发布订阅模式。
特点:
- 每个消息有多个订阅者
- 发布者和订阅者之间有时间上的依赖性。针对某个Topic
的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息
- 为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行
基础架构
Kafka
不是主从集群,因此每一个节点都可以是主节点,如果一个节点宕机了,那么其他的节点可以被选举为主节点Kafka
中每一个节点都称为Broker
,每个节点都存在一个Kafka_controller
组件- 但是只有一台节点的
controller
组件是活跃状态,其他都是standby
状态 - 主节点宕机时,从节点通过
ZooKeeper
进行集群选举,得到新的主节点
Feed
流实现方案
拉模式 | 推模式 | 推拉结合 | |
---|---|---|---|
写比例 | 低 | 高 | 中 |
读比例 | 高 | 低 | 中 |
用户读取延迟 | 高 | 低 | 低 |
实现难度 | 复杂 | 简单 | 很复杂 |
使用场景 | 较少使用 | 用户量少,没有大V | 过千万的用户量,有大V |
消息队列应用
线程与线程之间的数据交互
- 栈内存是线程独享的,堆内存是共享的
进程与进程之间的数据交互
- 进程有自己独立的内存,无法通过堆内存共享数据
传统数据交互的问题
- 如果线程1发送数据的速度是
50 条/s
,但是线程2处理数据的速度是30 条/s
,就会导致每秒有20条数据积压在内存中,最后导致内存溢出,服务终止 - 如果放在磁盘文件中,磁盘文件也是有上限的,并且速度较慢
消息中间件
- 进程之间直接进行交互,可以在进程之间添加一个缓冲区,用于解耦合
- 发送数据的进程只需要考虑往缓冲区中存放数据,也就是消息中间件,从而实现降低系统之间的耦合性
- 消息一般包含消息头,消息属性和消息主体内容
Kafka
基本概念
基本命令参数
- 连接
Kafka
必须要的参数:--bootstrap-server ip:port(9092)
9092
是Kafka
默认的端口号 Kafka
创建新topic
需要在后面跟上:--topic 主题名 --create
- 查看
topic
:--list
- 查看某个具体的
topic
:--topic 主题名 --describe
- 修改
topic
:--topic 主题名 --alter --修改的内容
- 删除
topic
:--topic 主题名 --delete
单节点架构
- 消息称为
record
App
通过调用Kafka
的接口实现生产和消费,会将数据保存到.log
文件中,增加标记offset
,固定顺序
集群架构
- 中间单个节点容易成为性能瓶颈,如果宕机,则数据无法访问
- 两种方式解决: 1. 横向扩展 2. 纵向扩展
横向扩展
- 增加服务节点,搭建集群,降低单点故障带来的问题
- 在一定程度上缓解,不是真正完全解决,因为主题在指定的
Broker
上,所以依然必须要访问某一个特定的Broker
- 因此真正解决需要将一个
topic
分成多个部分,所有生产数据可以指向不同的Broker
,但是都是同一个topic
- 在一定程度上缓解,不是真正完全解决,因为主题在指定的
纵向扩展
- 增加系统的资源配置,更快的网络,更大的硬盘
topic
是一个整体,但是划分了不同的数据块- 通过添加不同的编号,即为不同的分区(
partition
) - 生产者就可以将数据发向不同的
partition
,消费者可以消费同一个topic
不同的partition
- 通过添加不同的编号,即为不同的分区(
Kafka
设置了消费者组Consumer Group
,专门用来消费不同partition
的消息,这样避免发送多次消费请求
- 为了数据的可靠性,可以将数据进行备份,但是
Kafka
没有备份的概念,只有副本的概念。 - 多个副本同时只有一个能进行读写操作,称为
Leader
副本,其他副本称为Follower
副本 - 副本个数不应该超过broker数量 - 1
Kafka
有一个管理的Broker
,称为控制器(Controller
)
如果Controller
宕机了,如何解决
- 给
Controller
添加备份 - 每一个
Broker
都可以做备份,如果Controller
宕机了,则会使用ZooKeeper
选取一个新的Controller
ZooKeeper
选举流程
ZooKeeper
首先没有任何的节点,启动Broker1
,则会产生临时节点Controller
,id=1
- 然后启动
Broker2
和Broker3
后,不能再产生临时节点,就会创建一个监听器,监听节点的变化 Controller
失去连接了以后,监听器就会发生作用,选举出新的Controller
Broker
启动流程
Broker1
启动流程
- 在
ZooKeeper
注册Broker
节点,创建临时节点ids
- 监听
/controller
节点 - 注册
/controller
节点 - 注册成功,选举称为
Controller
,监听临时节点/brokers/ids
,从而可以对后面加入的节点进行响应
Broker2
启动流程
- 在
ZooKeeper
注册Broker
节点,在ids
里面创建临时节点2 - 监听
/controller
临时节点 - 注册
/controller
临时节点,因为ZooKeeper
中已经存在Controller
,所以不会注册成功 - 通知集群变化
- 连接
Broker
,发送集群的相关数据
Broker3
启动流程:
- 注册
Broker
节点,在ids
里面创建临时节点3 - 监听
/controller
临时节点 - 注册
/controller
临时节点,注册不成功 - 通知集群变化
- 连接所有的
Broker
,发送集群的相关信息
删除Controller
节点(宕机):
- 由监听器通知节点删除
- 注册
Controller
临时节点,只有一个节点能注册成功 - 增加
ids
监听器,删除曾经的Controller
监听器 - 连接所有的
Broker
,发送集群的相关数据
底层实现
Broker
和ZooKeeper
之间的通信: 节点带有ZooKeeperClient
实现通信Controller
和Broker
之间的通信:NIO
的Channel, Buffer, Selector, SelectionKey
生产
创建主题
- 生产消息的时候如果不存在对应的主题,
Kafka
会自动创建主题(可以通过配置关闭) - 要关注每个
Broker
中的Leader
的数量,平衡分配会更合理一些 Kafka
默认的副本分配未必是平均分配,所以只能尽可能的合理分配,也可以自己指定副本的分配方案
分区计算
- 如果发送消息中指定了分区编号,则会直接向分区中发送消息,并且不会检查分区编号是否有问题。如果写了一个不存在的编号,则消息一直阻塞
- 自定义分区器实现
Partitioner
,会检查编号是否小于0,但是过大依然不会检查 - 把
key
做了一个非哈希散列算法murmur2
,取余分区数量,得到的结果就是分区编号。所以指定的key
主要用来做分区计算的
- 为了节省网络资源消耗,设计了
RecordAccumulator
数据收集器,将多个消息一起发送,而不是简单的一条一条发送 - 数据收集器内部都是一些批次对象,如果批次对象的容量足够,则会自动往里面
append
数据 - 如果批次对象的容量不够了,则会将批次对象锁定关闭,不再接收新的数据,开启一个新的批次对象,用于接收数据
- 批次对象中有一个
16K
的阈值,不是不能超过16K
,而是超过16K
再关闭
应答处理(三个级别)
ACKS = 0
: 优先考虑数据传送效率,数据是否真的发送过去是不关心的。假设网络是没有任何问题的,发送完一条数据立马发送下一条ACKS = 1
: 可靠性和发送效率折中,Leader
保存数据到磁盘后,就响应发送成功。但是如果Leader
宕机了,Follower
没有来得及备份,则数据无法访问ACKS = -1 or ALL
: 优先考虑数据的安全性,Leader
和Follower
把数据都保存了,然后才响应发送成功,效率很低
- 这里的
Follower
是ISR
中的所有副本,而不是本身的所有副本
重试机制Retry
ACKS = 1
: 如果一个数据,从主线程出发到Buffer
缓冲区后到Sender
线程中,发送到NetworkClient
网络客户端,再发送到SocketServer
服务器。如果这个时候数据没有到ReplicaManager
,Leader
宕机,则无法进行副本保存。此时NetworkClient
客户端有一份数据,SocketServer
有一份数据,但是由于宕机,导致SocketServer
的数据丢失,没有保存则不会有响应,此时会有超时时间。如果超时了会进行Retry
。- 将
NetworkClient
的数据重新传送到Buffer
中,再经过Sender
线程,再回到NetworkClient
进行发送 - 默认是不断重试,重试次数是整型的最大值
- 将
重试机制的问题(重复和乱序)
- 数据重试机制可能会导致数据重复(超时时间过少导致的)
Buffer
有三条数据,被网络客户端发送到Broker
中,生产者可同时处理五个请求,所以三个数据可以一起处理。- 但是如果三条数据中有一条处理失败了,则会重新发送那一条数据,但是此时会导致乱序问题
如何解决重复和乱序(幂等性)
- 开启幂等性: 配置
ENABLE_IDEMPOTENCE_CONFIG
(默认不启用)
开启幂等性要求
ACKS = -1
- 开启重试机制
- 在途请求缓冲区不能超过5(默认就是5)
- 给数据开启唯一的标识,包括两个部分,分别是生产者ID,数据的顺序号
-
不同的
partition
数据可能有相同的标号,但是同一个partition
内序号一定是唯一的 -
所以幂等性只能保证对同一个
partition
起作用 -
Broker
可以保存生产者状态,里面只有五条数据。如果是同一个Producer
,同一个partition
,就可以对比唯一标识判断数据是否重复 -
乱序是判断当前的序号是否连续
- 如果6失效了,会尝试将7插入进去,发现前面是5后面是7,不连续,则6和7都会重发
- 如果5后面的6正常,则会将1弹出后,再将6传入,保证数据按序输出
- 这个队列的大小为五,所以要求在途请求缓冲区不能超过5
幂等性跨会话问题
- 生产者ID是随机生成的,如果生产者重启了,则生产者ID也会发生改变。此时比对序号的操作时会发现比对不成功
- 可以通过事务解决
- 数据唯一标识中添加事务ID,事务是基于幂等性操作,保证一次提交。因此事务可以解决跨会话的幂等性问题,但是不能解决跨分区的问题
-
数据传输语义
数据传输语义 | 含义 | 场景 |
---|---|---|
at most once | 最多一次,不管是否能够接收到,数据最多传输一次,可能丢失 | Socket ACK = 0 |
at least once | 最少一次,数据不会丢失,如果接收不到,则继续发送直到接收,可能会出现数据重复 | ACK = 1 |
exactly once | 精准一次,消息只会传送一次,不会丢也不会重复 | 幂等 + 事务 + ACK = 1 |
事务流程
文件存储类型
- 数据写入
Leader
节点时,不会立即将数据写入磁盘文件中,因为这样效率太低了。 Kafka
将一个文件切分成一个一个文件段。当大于1G
时,就会产生一个新的文件段。- 如果一份数据超过7天,也会生成一个新的文件段
.log
就是数据的日志文件,当前文件中起始偏移量.index
偏移量索引文件,将偏移量与文件中的数据的具体位置进行关联,称之为稀疏索引文件.timeindex
保存时间戳和数据偏移量之间的关系
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Sangs Blog!