Kafka笔记_0
消费模式
点对点模式(P2P)
- 队列中只有一个消费者可以消费数据,用后即销毁,因此数据有且只有一次消费。
- 适合用于短信业务,发送一次,消费一次。
特点:
- 每个消息只有一个接收者
- 发送和接收之间没有依赖,发送者成功发送消息后,不管接收有没有运行,都可以再次发送消息
- 接收者在成功接收后需要向队列应答成功,以便消息队列删除当前消息
发布订阅模式(PS)
- 数据会在队列中存储7天,每个订阅都可以消费到相应的数据,可以重复进行消费数据。
- 大部分都是发布订阅模式。
特点:
- 每个消息有多个订阅者
- 发布者和订阅者之间有时间上的依赖性。针对某个Topic的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息
- 为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行
基础架构
Kafka不是主从集群,因此每一个节点都可以是主节点,如果一个节点宕机了,那么其他的节点可以被选举为主节点Kafka中每一个节点都称为Broker,每个节点都存在一个Kafka_controller组件- 但是只有一台节点的
controller组件是活跃状态,其他都是standby状态 - 主节点宕机时,从节点通过
ZooKeeper进行集群选举,得到新的主节点
Feed流实现方案
| 拉模式 | 推模式 | 推拉结合 | |
|---|---|---|---|
| 写比例 | 低 | 高 | 中 |
| 读比例 | 高 | 低 | 中 |
| 用户读取延迟 | 高 | 低 | 低 |
| 实现难度 | 复杂 | 简单 | 很复杂 |
| 使用场景 | 较少使用 | 用户量少,没有大V | 过千万的用户量,有大V |
消息队列应用
线程与线程之间的数据交互
- 栈内存是线程独享的,堆内存是共享的
进程与进程之间的数据交互
- 进程有自己独立的内存,无法通过堆内存共享数据
传统数据交互的问题
- 如果线程1发送数据的速度是
50 条/s,但是线程2处理数据的速度是30 条/s,就会导致每秒有20条数据积压在内存中,最后导致内存溢出,服务终止 - 如果放在磁盘文件中,磁盘文件也是有上限的,并且速度较慢
消息中间件
- 进程之间直接进行交互,可以在进程之间添加一个缓冲区,用于解耦合
- 发送数据的进程只需要考虑往缓冲区中存放数据,也就是消息中间件,从而实现降低系统之间的耦合性
- 消息一般包含消息头,消息属性和消息主体内容
Kafka基本概念
基本命令参数
- 连接
Kafka必须要的参数:--bootstrap-server ip:port(9092)9092是Kafka默认的端口号 Kafka创建新topic需要在后面跟上:--topic 主题名 --create- 查看
topic:--list - 查看某个具体的
topic:--topic 主题名 --describe - 修改
topic:--topic 主题名 --alter --修改的内容 - 删除
topic:--topic 主题名 --delete
单节点架构
- 消息称为
record App通过调用Kafka的接口实现生产和消费,会将数据保存到.log文件中,增加标记offset,固定顺序
集群架构
- 中间单个节点容易成为性能瓶颈,如果宕机,则数据无法访问
- 两种方式解决: 1. 横向扩展 2. 纵向扩展
横向扩展
- 增加服务节点,搭建集群,降低单点故障带来的问题
- 在一定程度上缓解,不是真正完全解决,因为主题在指定的
Broker上,所以依然必须要访问某一个特定的Broker - 因此真正解决需要将一个
topic分成多个部分,所有生产数据可以指向不同的Broker,但是都是同一个topic
- 在一定程度上缓解,不是真正完全解决,因为主题在指定的
纵向扩展
- 增加系统的资源配置,更快的网络,更大的硬盘
topic是一个整体,但是划分了不同的数据块- 通过添加不同的编号,即为不同的分区(
partition) - 生产者就可以将数据发向不同的
partition,消费者可以消费同一个topic不同的partition
- 通过添加不同的编号,即为不同的分区(
Kafka设置了消费者组Consumer Group,专门用来消费不同partition的消息,这样避免发送多次消费请求
- 为了数据的可靠性,可以将数据进行备份,但是
Kafka没有备份的概念,只有副本的概念。 - 多个副本同时只有一个能进行读写操作,称为
Leader副本,其他副本称为Follower副本 - 副本个数不应该超过broker数量 - 1
Kafka有一个管理的Broker,称为控制器(Controller)
如果Controller宕机了,如何解决
- 给
Controller添加备份 - 每一个
Broker都可以做备份,如果Controller宕机了,则会使用ZooKeeper选取一个新的Controller
ZooKeeper选举流程
ZooKeeper首先没有任何的节点,启动Broker1,则会产生临时节点Controller,id=1- 然后启动
Broker2和Broker3后,不能再产生临时节点,就会创建一个监听器,监听节点的变化 Controller失去连接了以后,监听器就会发生作用,选举出新的Controller
Broker启动流程
Broker1启动流程
- 在
ZooKeeper注册Broker节点,创建临时节点ids - 监听
/controller节点 - 注册
/controller节点 - 注册成功,选举称为
Controller,监听临时节点/brokers/ids,从而可以对后面加入的节点进行响应
Broker2启动流程
- 在
ZooKeeper注册Broker节点,在ids里面创建临时节点2 - 监听
/controller临时节点 - 注册
/controller临时节点,因为ZooKeeper中已经存在Controller,所以不会注册成功 - 通知集群变化
- 连接
Broker,发送集群的相关数据
Broker3启动流程:
- 注册
Broker节点,在ids里面创建临时节点3 - 监听
/controller临时节点 - 注册
/controller临时节点,注册不成功 - 通知集群变化
- 连接所有的
Broker,发送集群的相关信息
删除Controller节点(宕机):
- 由监听器通知节点删除
- 注册
Controller临时节点,只有一个节点能注册成功 - 增加
ids监听器,删除曾经的Controller监听器 - 连接所有的
Broker,发送集群的相关数据
底层实现
Broker和ZooKeeper之间的通信: 节点带有ZooKeeperClient实现通信Controller和Broker之间的通信:NIO的Channel, Buffer, Selector, SelectionKey
生产
创建主题
- 生产消息的时候如果不存在对应的主题,
Kafka会自动创建主题(可以通过配置关闭) - 要关注每个
Broker中的Leader的数量,平衡分配会更合理一些 Kafka默认的副本分配未必是平均分配,所以只能尽可能的合理分配,也可以自己指定副本的分配方案
分区计算
- 如果发送消息中指定了分区编号,则会直接向分区中发送消息,并且不会检查分区编号是否有问题。如果写了一个不存在的编号,则消息一直阻塞
- 自定义分区器实现
Partitioner,会检查编号是否小于0,但是过大依然不会检查 - 把
key做了一个非哈希散列算法murmur2,取余分区数量,得到的结果就是分区编号。所以指定的key主要用来做分区计算的
- 为了节省网络资源消耗,设计了
RecordAccumulator数据收集器,将多个消息一起发送,而不是简单的一条一条发送 - 数据收集器内部都是一些批次对象,如果批次对象的容量足够,则会自动往里面
append数据 - 如果批次对象的容量不够了,则会将批次对象锁定关闭,不再接收新的数据,开启一个新的批次对象,用于接收数据
- 批次对象中有一个
16K的阈值,不是不能超过16K,而是超过16K再关闭
应答处理(三个级别)
ACKS = 0: 优先考虑数据传送效率,数据是否真的发送过去是不关心的。假设网络是没有任何问题的,发送完一条数据立马发送下一条ACKS = 1: 可靠性和发送效率折中,Leader保存数据到磁盘后,就响应发送成功。但是如果Leader宕机了,Follower没有来得及备份,则数据无法访问ACKS = -1 or ALL: 优先考虑数据的安全性,Leader和Follower把数据都保存了,然后才响应发送成功,效率很低
- 这里的
Follower是ISR中的所有副本,而不是本身的所有副本
重试机制Retry
ACKS = 1: 如果一个数据,从主线程出发到Buffer缓冲区后到Sender线程中,发送到NetworkClient网络客户端,再发送到SocketServer服务器。如果这个时候数据没有到ReplicaManager,Leader宕机,则无法进行副本保存。此时NetworkClient客户端有一份数据,SocketServer有一份数据,但是由于宕机,导致SocketServer的数据丢失,没有保存则不会有响应,此时会有超时时间。如果超时了会进行Retry。- 将
NetworkClient的数据重新传送到Buffer中,再经过Sender线程,再回到NetworkClient进行发送 - 默认是不断重试,重试次数是整型的最大值
- 将
重试机制的问题(重复和乱序)
- 数据重试机制可能会导致数据重复(超时时间过少导致的)
Buffer有三条数据,被网络客户端发送到Broker中,生产者可同时处理五个请求,所以三个数据可以一起处理。- 但是如果三条数据中有一条处理失败了,则会重新发送那一条数据,但是此时会导致乱序问题
如何解决重复和乱序(幂等性)
- 开启幂等性: 配置
ENABLE_IDEMPOTENCE_CONFIG(默认不启用)
开启幂等性要求
ACKS = -1- 开启重试机制
- 在途请求缓冲区不能超过5(默认就是5)
- 给数据开启唯一的标识,包括两个部分,分别是生产者ID,数据的顺序号
-
不同的
partition数据可能有相同的标号,但是同一个partition内序号一定是唯一的 -
所以幂等性只能保证对同一个
partition起作用 -
Broker可以保存生产者状态,里面只有五条数据。如果是同一个Producer,同一个partition,就可以对比唯一标识判断数据是否重复 -
乱序是判断当前的序号是否连续
- 如果6失效了,会尝试将7插入进去,发现前面是5后面是7,不连续,则6和7都会重发
- 如果5后面的6正常,则会将1弹出后,再将6传入,保证数据按序输出
- 这个队列的大小为五,所以要求在途请求缓冲区不能超过5
幂等性跨会话问题
- 生产者ID是随机生成的,如果生产者重启了,则生产者ID也会发生改变。此时比对序号的操作时会发现比对不成功
- 可以通过事务解决
- 数据唯一标识中添加事务ID,事务是基于幂等性操作,保证一次提交。因此事务可以解决跨会话的幂等性问题,但是不能解决跨分区的问题
-
数据传输语义
| 数据传输语义 | 含义 | 场景 |
|---|---|---|
| at most once | 最多一次,不管是否能够接收到,数据最多传输一次,可能丢失 | Socket ACK = 0 |
| at least once | 最少一次,数据不会丢失,如果接收不到,则继续发送直到接收,可能会出现数据重复 | ACK = 1 |
| exactly once | 精准一次,消息只会传送一次,不会丢也不会重复 | 幂等 + 事务 + ACK = 1 |
事务流程
文件存储类型
- 数据写入
Leader节点时,不会立即将数据写入磁盘文件中,因为这样效率太低了。 Kafka将一个文件切分成一个一个文件段。当大于1G时,就会产生一个新的文件段。- 如果一份数据超过7天,也会生成一个新的文件段
.log就是数据的日志文件,当前文件中起始偏移量.index偏移量索引文件,将偏移量与文件中的数据的具体位置进行关联,称之为稀疏索引文件.timeindex保存时间戳和数据偏移量之间的关系
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Sangs Blog!









