消费模式

点对点模式(P2P)

  • 队列中只有一个消费者可以消费数据,用后即销毁,因此数据有且只有一次消费。
  • 适合用于短信业务,发送一次,消费一次。

    特点:
    - 每个消息只有一个接收者
    - 发送和接收之间没有依赖,发送者成功发送消息后,不管接收有没有运行,都可以再次发送消息
    - 接收者在成功接收后需要向队列应答成功,以便消息队列删除当前消息

    p2p

发布订阅模式(PS)

  • 数据会在队列中存储7天,每个订阅都可以消费到相应的数据,可以重复进行消费数据。
  • 大部分都是发布订阅模式。

    特点:
    - 每个消息有多个订阅者
    - 发布者和订阅者之间有时间上的依赖性。针对某个Topic的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息
    - 为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行

    ps

基础架构

  • Kafka不是主从集群,因此每一个节点都可以是主节点,如果一个节点宕机了,那么其他的节点可以被选举为主节点
  • Kafka中每一个节点都称为Broker,每个节点都存在一个Kafka_controller组件
  • 但是只有一台节点的controller组件是活跃状态,其他都是standby状态
  • 主节点宕机时,从节点通过ZooKeeper进行集群选举,得到新的主节点

Feed流实现方案

拉模式 推模式 推拉结合
写比例
读比例
用户读取延迟
实现难度 复杂 简单 很复杂
使用场景 较少使用 用户量少,没有大V 过千万的用户量,有大V

消息队列应用

线程与线程之间的数据交互

  • 栈内存是线程独享的,堆内存是共享的

threaddata

进程与进程之间的数据交互

  • 进程有自己独立的内存,无法通过堆内存共享数据

processdata

传统数据交互的问题

  • 如果线程1发送数据的速度是50 条/s,但是线程2处理数据的速度是30 条/s,就会导致每秒有20条数据积压在内存中,最后导致内存溢出,服务终止
  • 如果放在磁盘文件中,磁盘文件也是有上限的,并且速度较慢

消息中间件

  • 进程之间直接进行交互,可以在进程之间添加一个缓冲区,用于解耦合
  • 发送数据的进程只需要考虑往缓冲区中存放数据,也就是消息中间件,从而实现降低系统之间的耦合性
  • 消息一般包含消息头,消息属性和消息主体内容
    JMS

Kafka基本概念

基本命令参数

  • 连接Kafka必须要的参数: --bootstrap-server ip:port(9092) 9092Kafka默认的端口号
  • Kafka创建新topic需要在后面跟上: --topic 主题名 --create
  • 查看topic: --list
  • 查看某个具体的topic: --topic 主题名 --describe
  • 修改topic: --topic 主题名 --alter --修改的内容
  • 删除topic: --topic 主题名 --delete

单节点架构

kafka

  • 消息称为record
  • App通过调用Kafka的接口实现生产和消费,会将数据保存到.log文件中,增加标记offset,固定顺序

集群架构

arch

  • 中间单个节点容易成为性能瓶颈,如果宕机,则数据无法访问
  • 两种方式解决: 1. 横向扩展 2. 纵向扩展

横向扩展

  • 增加服务节点,搭建集群,降低单点故障带来的问题
    horizontal
    • 一定程度上缓解,不是真正完全解决,因为主题在指定的Broker上,所以依然必须要访问某一个特定的Broker
    • 因此真正解决需要将一个topic分成多个部分,所有生产数据可以指向不同的Broker,但是都是同一个topic

纵向扩展

  • 增加系统的资源配置,更快的网络,更大的硬盘
    • topic是一个整体,但是划分了不同的数据块
      • 通过添加不同的编号,即为不同的分区(partition)
      • 生产者就可以将数据发向不同的partition,消费者可以消费同一个topic不同的partition
    • Kafka设置了消费者组Consumer Group,专门用来消费不同partition的消息,这样避免发送多次消费请求
  • 为了数据的可靠性,可以将数据进行备份,但是Kafka没有备份的概念,只有副本的概念。
  • 多个副本同时只有一个能进行读写操作,称为Leader副本,其他副本称为Follower副本
  • 副本个数不应该超过broker数量 - 1

Kafka有一个管理的Broker,称为控制器(Controller)

如果Controller宕机了,如何解决

  1. Controller添加备份
  2. 每一个Broker都可以做备份,如果Controller宕机了,则会使用ZooKeeper选取一个新的Controller

jiqun

ZooKeeper选举流程

  1. ZooKeeper首先没有任何的节点,启动Broker1,则会产生临时节点Controllerid=1
  2. 然后启动Broker2Broker3后,不能再产生临时节点,就会创建一个监听器,监听节点的变化
  3. Controller失去连接了以后,监听器就会发生作用,选举出新的Controller

Broker启动流程

Broker1启动流程

  1. ZooKeeper注册Broker节点,创建临时节点ids
  2. 监听/controller节点
  3. 注册/controller节点
  4. 注册成功,选举称为Controller,监听临时节点/brokers/ids,从而可以对后面加入的节点进行响应

Broker2启动流程

  1. ZooKeeper注册Broker节点,在ids里面创建临时节点2
  2. 监听/controller临时节点
  3. 注册/controller临时节点,因为ZooKeeper中已经存在Controller,所以不会注册成功
  4. 通知集群变化
  5. 连接Broker,发送集群的相关数据

Broker3启动流程:

  1. 注册Broker节点,在ids里面创建临时节点3
  2. 监听/controller临时节点
  3. 注册/controller临时节点,注册不成功
  4. 通知集群变化
  5. 连接所有的Broker,发送集群的相关信息

删除Controller节点(宕机):

  1. 由监听器通知节点删除
  2. 注册Controller临时节点,只有一个节点能注册成功
  3. 增加ids监听器,删除曾经的Controller监听器
  4. 连接所有的Broker,发送集群的相关数据

底层实现

  1. BrokerZooKeeper之间的通信: 节点带有ZooKeeperClient实现通信
  2. ControllerBroker之间的通信: NIOChannel, Buffer, Selector, SelectionKey

生产

创建主题

  • 生产消息的时候如果不存在对应的主题,Kafka会自动创建主题(可以通过配置关闭)
  • 要关注每个Broker中的Leader的数量,平衡分配会更合理一些
  • Kafka默认的副本分配未必是平均分配,所以只能尽可能的合理分配,也可以自己指定副本的分配方案

分区计算

  1. 如果发送消息中指定了分区编号,则会直接向分区中发送消息,并且不会检查分区编号是否有问题。如果写了一个不存在的编号,则消息一直阻塞
  2. 自定义分区器实现Partitioner,会检查编号是否小于0,但是过大依然不会检查
  3. key做了一个非哈希散列算法murmur2,取余分区数量,得到的结果就是分区编号。所以指定的key主要用来做分区计算的
  • 为了节省网络资源消耗,设计了RecordAccumulator数据收集器,将多个消息一起发送,而不是简单的一条一条发送
  • 数据收集器内部都是一些批次对象,如果批次对象的容量足够,则会自动往里面append数据
  • 如果批次对象的容量不够了,则会将批次对象锁定关闭,不再接收新的数据,开启一个新的批次对象,用于接收数据
  • 批次对象中有一个16K的阈值,不是不能超过16K,而是超过16K再关闭

produce

应答处理(三个级别)

  1. ACKS = 0: 优先考虑数据传送效率,数据是否真的发送过去是不关心的。假设网络是没有任何问题的,发送完一条数据立马发送下一条
  2. ACKS = 1: 可靠性和发送效率折中,Leader保存数据到磁盘后,就响应发送成功。但是如果Leader宕机了,Follower没有来得及备份,则数据无法访问
  3. ACKS = -1 or ALL: 优先考虑数据的安全性,LeaderFollower把数据都保存了,然后才响应发送成功,效率很低
  • 这里的FollowerISR中的所有副本,而不是本身的所有副本

重试机制Retry

  • ACKS = 1: 如果一个数据,从主线程出发到Buffer缓冲区后到Sender线程中,发送到NetworkClient网络客户端,再发送到SocketServer服务器。如果这个时候数据没有到ReplicaManagerLeader宕机,则无法进行副本保存。此时NetworkClient客户端有一份数据,SocketServer有一份数据,但是由于宕机,导致SocketServer的数据丢失,没有保存则不会有响应,此时会有超时时间。如果超时了会进行Retry
    1. NetworkClient的数据重新传送到Buffer中,再经过Sender线程,再回到NetworkClient进行发送
    2. 默认是不断重试,重试次数是整型的最大值

重试机制的问题(重复和乱序)

  1. 数据重试机制可能会导致数据重复(超时时间过少导致的)
  2. Buffer有三条数据,被网络客户端发送到Broker中,生产者可同时处理五个请求,所以三个数据可以一起处理。
    • 但是如果三条数据中有一条处理失败了,则会重新发送那一条数据,但是此时会导致乱序问题

如何解决重复和乱序(幂等性)

  • 开启幂等性: 配置ENABLE_IDEMPOTENCE_CONFIG(默认不启用)

开启幂等性要求

  • ACKS = -1
  • 开启重试机制
  • 在途请求缓冲区不能超过5(默认就是5)
  • 给数据开启唯一的标识,包括两个部分,分别是生产者ID,数据的顺序号
    • 不同的partition数据可能有相同的标号,但是同一个partition内序号一定是唯一的

    • 所以幂等性只能保证对同一个partition起作用

    • Broker可以保存生产者状态,里面只有五条数据。如果是同一个Producer,同一个partition,就可以对比唯一标识判断数据是否重复

    • 乱序是判断当前的序号是否连续

      • 如果6失效了,会尝试将7插入进去,发现前面是5后面是7,不连续,则6和7都会重发
      • 如果5后面的6正常,则会将1弹出后,再将6传入,保证数据按序输出
      • 这个队列的大小为五,所以要求在途请求缓冲区不能超过5

      幂等性跨会话问题

      • 生产者ID是随机生成的,如果生产者重启了,则生产者ID也会发生改变。此时比对序号的操作时会发现比对不成功
      • 可以通过事务解决
      • 数据唯一标识中添加事务ID,事务是基于幂等性操作,保证一次提交。因此事务可以解决跨会话的幂等性问题,但是不能解决跨分区的问题

数据传输语义

数据传输语义 含义 场景
at most once 最多一次,不管是否能够接收到,数据最多传输一次,可能丢失 Socket ACK = 0
at least once 最少一次,数据不会丢失,如果接收不到,则继续发送直到接收,可能会出现数据重复 ACK = 1
exactly once 精准一次,消息只会传送一次,不会丢也不会重复 幂等 + 事务 + ACK = 1

事务流程

tx

文件存储类型

  • 数据写入Leader节点时,不会立即将数据写入磁盘文件中,因为这样效率太低了。
  • Kafka将一个文件切分成一个一个文件段。当大于1G时,就会产生一个新的文件段。
  • 如果一份数据超过7天,也会生成一个新的文件段
  • .log就是数据的日志文件,当前文件中起始偏移量
  • .index偏移量索引文件,将偏移量与文件中的数据的具体位置进行关联,称之为稀疏索引文件
  • .timeindex保存时间戳和数据偏移量之间的关系