网站首页 > 技术文章 正文
相关术语
Message: 一个key/value 对,在value部分包含有用的数据/记录。
Topic: 对于多租户,可以创建多个主题,这些主题只是发布和订阅消息的提要名称。
Offset: 消息以日志提交的顺序存储,并且从0开始为每个消息提供顺序id。
Broker: Kafka集群由代理组成,这些代理只是集群中的节点,承载由zookeeper维护的无状态服务器。由于这里没有主从概念,所以所有代理都是对等的。在继续之前,让我们先了解一下动物管理员。
什么是zookeeper,kafka集群为什么使用zookeeper
Zookeeper是一个分布式集群管理系统。它是一个分布式键值存储集群。它对读做了高度优化,但是写入速度比较慢。它一般由奇数个znode组成,称为集合。在kafka中,zookeeper有如下作用:
1. 控制选举: 对于特定topic,来自分区的所有读和写都是通过副本的leader进行的。如果leader出现crash,zookeeper就会选出一个新的leader。
2. 配置topics: 与topic相关的元数据,无论特定topic是否位于broker中,有多少分区等等,都存储在zookeeper端,并且无论何时生成消息,这些元数据都是实时同步的。
3. topic的访问控制列表(ACL)由zookeeper维护。
为什么选择kafka
kafka的一些主要的特性,相对于传统的消息系统,更加受到欢迎:
1. 高吞吐: 吞吐量表示每秒可处理的消息数量(消息速率)。由于可以将topic划分到不同的broker上,因此每秒可以实现数千次读写。
2. 分布式: 分布式系统是一个被分割成多个正在运行的机器的系统上,所有这些机器在一个集群中一起工作,在最终用户面前显示为一个节点。Kafka是分布式的,它在称为broker的几个节点上存储、读取和写入数据,broker和Zookeeper一起创建了一个称为Kafka集群的分布式系统。
3. 持久化: 消息队列完全保存在磁盘上,而不是保存在内存中,相同数据的多个副本/副本称为ISR (in-sync replica)可以跨不同节点存储。因此,不存在由于故障转移场景而导致数据丢失的机会,并使其持久存储。
4. 可伸缩性: 任何系统都可以水平或垂直伸缩。垂直可伸缩性意味着向相同的节点添加更多的资源,如CPU、内存,并不会带来很高的操作成本。只需在集群中添加几个节点就可以实现水平可伸缩性,这将增加容量需求。Kafka水平伸缩意味着我们可以在集群中添加一个新的节点/代理,当我们快要耗尽容量/空间。
5. 容错能力: 如果我们有n个主题,每个主题都有m个分区,那么如果我们将复制因子设置为q,那么所有的n*m个分区都将被复制到q代理上。因此,使其容忍度为q-1,即我们可以承受q-1代理节点的失败。复制因子应该始终小于或等于代理的数量,因为违反此条件将在单个代理上拥有相同副本的两个副本,这是没有意义的。
note: Kafka默认情况下保证至少一次交付,并允许用户通过禁用对生成器的重试和在处理一批消息之前提交其偏移量来实现最多一次交付。
Hands-on
在此下载http://kafka.apache.org/downloads kafaka scala库。这个库里面也有zookeeper,所以不需要单独下载。
下载完毕后解压,然后设置HOME_DIR = kafka_2.12–2.3.0/
首先要启动zookeeper,请在HOME_DIR/config/zookeeper.properties中设置好dataDir和port。
./bin/zookeeper-server-start.sh ../config/zookeeper.properties
启动后zookeeper的默认端口是2181。然后在启动kafka server。Kafka的配置文件在HOME_DIR/config/server.properties。做如下修改:
broker.id=101 listeners=PLAINTEXT:localhost:9091 logldirs=some_log_dir/kafka-logs-1
然后启动:
./bin/kafka-servert-start.sh ../config/server.properties
kafaka的默认端口是9091。进入client,然后做一下小测试。
Creating a topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 partitions 1 --topic topic1 Describing a particular topic: bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic1 Deleting a topic: bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topic1
注意:分区一旦设置为一个特定的值就不能减少,只能增加。
生产者如何写消息?
Kafka为了知道那个message需要更新,首先要获取这个topic的元数据信息。元数据也存储在broker中,并且与zookeeper保持实时同步,因为zookeeper的node数要比broker少。因此,许多生产者希望连接到zookeeper来访问元数据,结果性能下降。现在,一旦生产者获得关于topic和partition的元数据,它就会将消息写入leader的 broker节点的日志中,然后follwers(ISR)将其复制。
这个写操作可以是同步的。关于消息确认的状态只有在follwers也将消息复制到他们的日志中时才会返回给生产者。保留期:磁盘上的消息可以保存一段特定的时间,称为保留期,在此期间之后,将自动清除旧消息,不再供使用。默认设置为7天。
信息可以用三种策略写在一个主题上:
a. send(key, value, topic, partition):专门提供需要进行写操作的分区。不鼓励使用这种方法,因为它可能造成分区大小的不平衡。
b. send(key, value, topic):在这里,使用默认的HashPartitioner来确定消息将在哪个分区中编写,方法是找到key的哈希值,然后使用mod对其进行取模来进行这个主题的划分。我们也可以自定义分区方式。
c. ull, value, topic):在本例中,消息以循环方式存储在所有分区中。
Hands-on:
Creating a console producer: bin/kafka-console-producer.sh --broker-list localhost:9091 --topic topic1 Changing retention period for a topic to 10sec: bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name topic1 --add-config retention.ms=10000
生产者可以以批处理的形式发送消息,以提高效率。一旦批处理达到特定的大小限制,就将其转储到队列一次。但是,偏移量只对所有单独的消息进行顺序调整,并在传递给消费者API之前在消费者端进行调整。
Producer API
Kafka producer是一个应用程序,它可以作为Kafka集群中的数据源。生产者可以使用Kafka jar文件/依赖项提供的API将消息发布到一个或多个Kafka主题。在发送消息之前,需要设置一个属性对象,其中包含存储消息的配置。主要类ProducerRecord, KafkaProducer 。
简单样例
Producer<String, String> producer = new KafkaProducer <>(createProperties()); ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "SyncKey", "SyncMessage"); try { RecordMetadata recordMetadata = producer.send(record).get(); System.out.println("Message is sent to Partition no " + recordMetadata.partition() + " and offset " + recordMetadata.offset()); } catch (Exception e) { System.out.println("Exception--> "+ e); } finally{ producer.close(); } } private static Properties createProperties() { Properties properties = new Properties(); properties.put("bootstrap.servers", "localhost:9091,localhost:9092"); properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); return properties; }
消费者如何检索到消息?
消费者检索消息和生产者写入消息方式相同,生产者通过查找元数据并从leader分区读取消息。由于Kafka速度非常快,可以获得实时消息,因此单个消费者在从一个名为"消费者滞后"的topic中读取一个很大的chunk时肯定会有延迟。
为了克服这个问题,可以创建一个消费者组,其中包含多个具有相同组id的消费者。将分区分配给特定的使用者是组协调器的职责——集群中的代理之一被指定担任此角色。为了管理活动使用者列表,组的所有使用者都将心跳发送给组协调器。组中的使用者数量应该小于或等于该特定主题中的分区数量,违反此条件将导致使用者处于空闲状态。
Hands-on
Creating a simple Consumer: bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic topic1 Creating consumer to read from particular offset and partition: bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic topic1 --offset 0 --partition 1 Creating a consumer group: bin/kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic topic1 --group groupName Multiple process/consumer of the above groupName can be created which is called Consumer Group.
不止一个消费者可以同时读取一个topic。现在,为了记住到哪一个偏移量是特定读的,提供了一个名为consumer offset的存储变量,它是一个隐藏的topic-_consumer_offsets,用于存储特定组的使用者读取的分区的最后偏移量。
使用者偏移量的键值为 ->[组Id、主题、分区]和- >[偏移量,…]
Consumer PAI
与producer API类似,Kafka提供了连接到引导服务器并获取消息的类。当传递非标准数据类型的消息时,需要编写反序列化器。
Kafka面向生产者和消费者的Java API可以在这里找到:
kafka有多快?
Kafka遵循一定的策略,这是其设计的一部分,使它执行得更好更快。
1. 无随机磁盘访问:它使用一种称为不可变队列的顺序数据结构,其中读写操作总是常数时间O(1)。它将消息追加到末尾,并从开头或特定偏移量读取。
2. 顺序I/O:现代操作系统将大部分空闲内存分配给磁盘缓存,并且存储和检索顺序数据的速度更快。
3. 零拷贝:磁盘中的数据没有必要加载到应用程序内存中,因为它根本没有被修改。因此,它不是将数据加载到应用程序,而是通过套接字、NIC缓冲区将相同的数据从内核上下文缓冲区发送到网络。
4. 消息批处理:将多个消息分组在一起,以避免多个网络调用。
5. 消息压缩:在通过网络传输消息之前,使用gzip、snappy等压缩算法对消息进行压缩,并在消费者API层进行解压缩。
数据如何存储在broker实例/物理磁盘上?
在启动Kafka服务器之前,broker中的所有消息都存储在配置文件中配置的日志目录(log-dir-1)中。在该目录中,可以找到包含特定topic分区的文件夹,其格式为topic_name-partition_number,例如topic1-0。_consumer_offsets主题也存储在相同的日志目录中。
在特定topic的分区目录中,Kafka段文件0000-00.log,索引文件0000-00.iondex和时间索引00:00 – 00. Timeindex可以被找到 。所有属于该分区的数据都被写入一个活动段中,当旧段大小或时间限制达到时,将创建一个新的段文件。索引将每个偏移量映射到其消息在日志中的位置。由于偏移量是顺序的二进制搜索,所以使用搜索来查找日志文件中特定偏移量的数据索引。
Log Compacted Topics
重复的键被标记为要从段文件中删除。这里的值可以通过将空值传递给特定键来更新和删除。
感谢阅读,喜欢的可以关注加收藏。
猜你喜欢
- 2024-10-25 使用自带zookeeper安装单机kafka(适合刚接触kafka的同学)
- 2024-10-25 不用 zookeeper 照样部署kafka集群
- 2024-10-25 Kafka最全详解(图文全面总结) kafka基础知识
- 2024-10-25 kafka和zookeeper在k8s集群踩的一些坑
- 2024-10-25 Kafka 2.8独立运行,不再需要ZooKeeper
- 2024-10-25 告别 ZooKeeper:Kafka 将提供自管理的元数据仲裁机制
- 2024-10-25 Kafka架构最全详解(图文全面总结)
- 2024-10-25 Zookeeper集群搭建及原理 zookeeper集群主要有哪三种角色
- 2024-10-25 Kafka架构和原理机制(图文全面详解)
- 2024-10-25 玩了分布式这么久,你不会连Kafka都不清楚吧
你 发表评论:
欢迎- 最近发表
-
- 吴谨言专访大反转!痛批耍大牌后竟翻红,六公主七连发力显真诚
- 港股2月28日物业股涨幅榜:CHINAOVSPPT涨1.72%位居首位
- 港股2月28日物业股午盘:CHINAOVSPPT涨1.72%位居首位
- 港股3月2日物业股涨幅榜:CHINAOVSPPT涨1.03%位居首位
- 港股3月2日物业股午盘:CHINAOVSPPT涨1.03%
- 天赋与心痛的背后:邓鸣贺成长悲剧引发的深刻反思
- 冯小刚女儿徐朵追星范丞丞 同框合照曝光惹人羡,回应网友尽显亲民
- “资本大佬”王冉:51岁娶小17岁童瑶,并承诺余生为娇妻保驾护航
- 港股3月2日物业股午盘:CHINAOVSPPT涨1.03%位居首位
- 「IT之家开箱」vivo S15 图赏:双镜云窗,盛夏风光
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)