`
maosheng
  • 浏览: 550671 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Apache kafka 介绍

阅读更多
kafka是一种高吞吐量的分布式发布订阅消息系统,它有如下特性:

    通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。

    高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。

    支持通过kafka服务器和消费机集群来分区消息。

    支持Hadoop并行数据加载。

kafka的目的是提供一个发布订阅解决方案,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。

Kafka的架构:





Kafka的整体架构非常简单,是显式分布式架构,producer、broker(kafka)和consumer都可以有多个。Producer,consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用。broker分发注册到系统中的consumer。broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。

一个典型的 Kafka 体系架构包括若干 Producer(消息生产者),若干broker(作为 Kafka 节点的服务器),若干Consumer(Group),以及一个 ZooKeeper 集群。Kafka通过 ZooKeeper 管理集群配置、选举 Leader 以及在 consumer group 发生变化时进行 Rebalance(即消费者负载均衡)。Producer 使用 push(推)模式将消息发布到 broker,Consumer 使用 pull(拉)模式从 broker 订阅并消费消息。

在 Kafka 架构中,有以下术语:

•Producer:生产者,即消息发送者,push 消息到 Kafka 集群中的 broker(就是 server)中;

•Broker:Kafka节点,一个Kafka节点就是一个broker,Kafka 集群由多个 Kafka 实例(server) 组成,每个实例构成一个 broker,说白了就是服务器;

•Topic:producer 向 kafka 集群 push 的消息会被归于某一类别,即Topic,这本质上只是一个逻辑概念,面向的对象是 producer 和 consumer,producer 只需要关注将消息 push 到哪一个 Topic 中,而 consumer 只需要关心自己订阅了哪个 Topic;

•Partition:每一个 Topic 又被分为多个 Partitions,即物理分区;出于负载均衡的考虑,同一个 Topic 的 Partitions 分别存储于 Kafka 集群的多个 broker 上;而为了提高可靠性,这些 Partitions 可以由 Kafka 机制中的 replicas 来设置备份的数量;如上面的框架图所示,每个 partition 都存在两个备份;

•Consumer:消费者,从 Kafka 集群的 broker 中 pull 消息、消费消息;

•Consumer group:high-level consumer API 中,每个 consumer 都属于一个 consumer-group,每条消息只能被 consumer-group 中的一个 Consumer 消费,但可以被多个 consumer-group 消费;

•replicas:partition 的副本,保障 partition 的高可用;

•leader:replicas 中的一个角色, producer 和 consumer 只跟 leader 交互;

•follower:replicas 中的一个角色,从 leader 中复制数据,作为副本,一旦 leader 挂掉,会从它的 followers 中选举出一个新的 leader 继续提供服务;

•controller:Kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover;

•ZooKeeper:Kafka 通过 ZooKeeper 来存储集群的 meta 信息等


Topic 只是逻辑概念,面向的是 producer 和 consumer;而 Partition 则是物理概念。可以想象,如果 Topic 不进行分区,而将 Topic 内的消息存储于一个 broker,那么关于该 Topic 的所有读写请求都将由这一个 broker 处理,吞吐量很容易陷入瓶颈,这显然是不符合高吞吐量应用场景的。有了 Partition 概念以后,假设一个 Topic 被分为 10 个 Partitions,Kafka 会根据一定的算法将 10 个 Partition 尽可能均匀的分布到不同的 broker(服务器)上,当 producer 发布消息时,producer 客户端可以采用 random、key-hash 及 轮询 等算法选定目标 partition,若不指定,Kafka 也将根据一定算法将其置于某一分区上。Partiton 机制可以极大的提高吞吐量,并且使得系统具备良好的水平扩展能力。


消息发送的流程:





1.Producer根据指定的partition方法(round-robin、hash等),将消息发布到指定topic的partition里面
2.kafka集群接收到Producer发过来的消息后,将其持久化到硬盘,并保留消息指定时长,而不关注消息是否被消费。
3.Consumer从kafka集群pull数据,并控制获取消息的offset。


Kafka的设计:

吞吐量:

高吞吐是kafka需要实现的核心目标之一,为此kafka做了以下一些设计:
    1.数据磁盘持久化:消息不在内存中cache,直接写入到磁盘,充分利用磁盘的顺序读写性能
    2.zero-copy:减少IO操作步骤
    3.数据批量发送
    4.数据压缩
    5.Topic划分为多个partition,提高parallelism

负载均衡:

    1.producer根据用户指定的算法,将消息发送到指定的partition
    2.存在多个partiiton,每个partition有自己的replica,每个replica分布在不同的Broker节点上
    3.多个partition需要选取出lead partition,lead partition负责读写,并由zookeeper负责fail over
    4.通过zookeeper管理broker与consumer的动态加入与离开

拉取系统:

由于kafka broker会持久化数据,broker没有内存压力,因此,consumer非常适合采取pull的方式消费数据,具有以下几点好处:
    1.简化kafka设计
    2.consumer根据消费能力自主控制消息拉取速度
    3.consumer根据自身情况自主选择消费模式,例如批量,重复消费,从尾端开始消费等

可扩展性:

当需要增加broker结点时,新增的broker会向zookeeper注册,而producer及consumer会根据注册在zookeeper上的watcher感知这些变化,并及时作出调整。

Kafka的应用场景:

1.消息队列

比起大多数的消息系统来说,Kafka有更好的吞吐量,内置的分区,冗余及容错性,这让Kafka成为了一个很好的大规模消息处理应用的解决方案。消息系统一般吞吐量相对较低,但是需要更小的端到端延时,并尝尝依赖于Kafka提供的强大的持久性保障。在这个领域,Kafka足以媲美传统消息系统,如ActiveMR或RabbitMQ。

2.行为跟踪

Kafka的另一个应用场景是跟踪用户浏览页面、搜索及其他行为,以发布-订阅的模式实时记录到对应的topic里。那么这些结果被订阅者拿到后,就可以做进一步的实时处理,或实时监控,或放到hadoop/离线数据仓库里处理。

3.元信息监控

作为操作记录的监控模块来使用,即汇集记录一些操作信息,可以理解为运维性质的数据监控吧。

4.日志收集

日志收集方面,其实开源产品有很多,包括Scribe、Apache Flume。很多人使用Kafka代替日志聚合(log aggregation)。日志聚合一般来说是从服务器上收集日志文件,然后放到一个集中的位置(文件服务器或HDFS)进行处理。然而Kafka忽略掉文件的细节,将其更清晰地抽象成一个个日志或事件的消息流。这就让Kafka处理过程延迟更低,更容易支持多数据源和分布式数据处理。比起以日志为中心的系统比如Scribe或者Flume来说,Kafka提供同样高效的性能和因为复制导致的更高的耐用性保证,以及更低的端到端延迟。

5.流处理

这个场景可能比较多,也很好理解。保存收集流数据,以提供之后对接的Storm或其他流式计算框架进行处理。很多用户会将那些从原始topic来的数据进行阶段性处理,汇总,扩充或者以其他的方式转换到新的topic下再继续后面的处理。例如一个文章推荐的处理流程,可能是先从RSS数据源中抓取文章的内容,然后将其丢入一个叫做“文章”的topic中;后续操作可能是需要对这个内容进行清理,比如回复正常数据或者删除重复数据,最后再将内容匹配的结果返还给用户。这就在一个独立的topic之外,产生了一系列的实时数据处理的流程。Strom和Samza是非常著名的实现这种类型数据转换的框架。

6.事件源

事件源是一种应用程序设计的方式,该方式的状态转移被记录为按时间顺序排序的记录序列。Kafka可以存储大量的日志数据,这使得它成为一个对这种方式的应用来说绝佳的后台。比如动态汇总(News feed)。

7.持久性日志(commit log)

Kafka可以为一种外部的持久性日志的分布式系统提供服务。这种日志可以在节点间备份数据,并为故障节点数据回复提供一种重新同步的机制。Kafka中日志压缩功能为这种用法提供了条件。在这种用法中,Kafka类似于Apache BookKeeper项目。


发送消息的主要步骤:

首先创建ProducerRecord对象,此对象除了包括需要发送的数据value之外还必须指定topic,另外也可以指定key和分区。当发送ProducerRecord的时候,生产者做的第一件事就是把key和value序列化为ByteArrays,以便它们可以通过网络发送。

接下来,数据会被发送到分区器。如果在ProducerRecord中指定了一个分区,那么分区器会直接返回指定的分区;否则,分区器通常会基于ProducerRecord的key值计算出一个分区。一旦分区被确定,生产者就知道数据会被发送到哪个topic和分区。然后数据会被添加到同一批发送到相同topic和分区的数据里面,一个单独的线程会负责把那些批数据发送到对应的brokers。

当broker接收到数据的时候,如果数据已被成功写入到Kafka,会返回一个包含topic、分区和偏移量offset的RecordMetadata对象;如果broker写入数据失败,会返回一个异常信息给生产者。当生产者接收到异常信息时会尝试重新发送数据,如果尝试失败则抛出异常。


生产者配置属性

发送数据到Kafka的第一步是创建一个生产者,必须指定以下三个属性:

bootstrap.servers:生产者用于与Kafka集群建立初始连接的主机和端口的列表。该列表不需要包括所有的brokers信息,因为生产者在建立连接后能够获取所有brokers的信息。但建议至少包含两个,防止一个broker宕机,生产者仍然能够通过另外一个broker连接到群集。

key.serializer:用于序列化keys的类名。Kafka brokers期待key和value的类型为byte数组,但是也允许使用参数化的Java对象作为key和value。这使得代码非常易读,但也意味着生产者必须知道如何把这些对象转换为byte数组。key.serializer应设为实现了org.apache.kafka.common.serialization.Serializer接口的类名,生产者将会使用这个类来把key对象序列化为byte数组。Kafka内置实现了ByteArraySerializer、StringSerializer和IntegerSerializer。注意,即使生产者发送的数据没有指定key,也必须设置key.serializer这个属性。

value.serializer:用于序列化value的类名。类似于key.serializer,生产者将会使用指定的类来把value对象序列化为byte数组。


生产者有很多配置属性,除了上述的三个之外,下面是一些比较重要的属性:

1 acks

此配置设置在生产者可以认为发送请求完成之前,有多少分区副本必须接收到数据。此选项对消息可能丢失的可能性有重大影响,此配置有三个允许的值,默认为1:

acks=0,生产者不会等待broker的任何确认,消息会被立即添加到缓冲区并被认为已经发送。在这种情况下,不能保证服务器已经收到消息,并且重试配置不会生效(因为客户端通常不会知道任何异常),每条消息返回的偏移量始终设置为-1。由于生产者不等待broker的任何确认,因此它可以以网络支持的最快速度发送消息,所以这个配置适用于实现非常高的吞吐量。
acks=1,在leader服务器的副本收到消息的同一时间,生产者会接收到broker的确认。如果消息不能写入leader的副本(例如,如果leader宕机并且还没有选出新的leader),生产者将接收到异常响应,然后可以重新发送消息,避免丢失数据。如果leader宕机并且消息没有被写入到新的leader(通过不确定的leader选举),该消息仍然会丢失。在这种情况下,吞吐量取决于消息是同步还是异步发送。如果我们的客户端等待服务器的回复(通过上述的调用发送消息时返回的Future对象的get()方法),它明显会显著地增加延迟(至少通过网络往返)。如果客户端使用callback,则延迟不会那么明显,但吞吐量将受到正在发送消息数量的限制(例如,在接收到响应之前生产者将会发送多少消息)。
acks=all(或-1),一旦所有的同步副本接收到消息,生产者才会接收到broker的确认。这是最安全的模式,因为可以确保多于一个的broker接收到该消息,即使在宕机的情况下,该消息也能被保存。然而,延迟性会比acks=1的时候更高,因为需要等待所有broker接收到消息。

2 buffer.memory

此配置设置生产者可用于缓冲等待发送给brokers消息的总内存字节数,默认为33554432=32MB。如果消息发送到缓存区的速度比发送到broker的速度快,那么生产者会被阻塞(根据max.block.ms配置的时间,默认为60000ms=1分钟,在0.9.0.0版本之前使用block.on.buffer.full配置),之后会抛出异常。

3 compression.type

生产者对生成的所有数据使用的压缩类型,默认值是none(即不压缩),有效值为none,gzip,snappy或lz4。Snappy压缩技术是Google开发的,它可以在提供较好的压缩比的同时,减少对CPU的使用率并保证好的性能,所以建议在同时考虑性能和带宽的情况下使用。Gzip压缩技术通常会使用更多的CPU和时间,但会产生更好的压缩比,所以建议在网络带宽更受限制的情况下使用。通过启用压缩功能,可以减少网络利用率和存储空间,这往往是向Kafka发送消息的瓶颈。

4 retries

默认值为0,当设置为大于零的值,客户端会重新发送任何发送失败的消息。注意,此重试与客户端收到错误时重新发送消息是没有区别的。在配置max.in.flight.requests.per.connection不等于1的情况下,允许重试可能会改变消息的顺序,因为如果两个批次的消息被发送到同一个分区,第一批消息发送失败但第二批成功,而第一批消息会被重新发送,则第二批消息会先被写入。

5 batch.size

当多个消息被发送到同一个分区时,生产者会把它们一起处理。此配置设置用于每批处理使用的内存字节数,默认为16384=16KB。当使用的内存满的时候,生产者会发送当前批次的所有消息。但是,这并不意味着生产者会一直等待使用的内存变满,根据下面linger.ms配置的时间也会触发消息发送。设置较小的值会增加发送的频率,从而可能会减少吞吐量;设置较大的值会使用较多的内存,设置为0会关闭批处理的功能。

6 linger.ms

此配置设置在发送当前批次消息之前等待新消息的时间量,默认值为0。KafkaProducer会在当前批次使用的内存已满或等待时间到达linger.ms配置时间的时候发送消息。当linger.ms>0时,延时性会增加,但会提高吞吐量,因为会减少消息发送频率。

7 client.id

用于标识发送消息的客户端,通常用于日志和性能指标以及配额。

8 max.in.flight.requests.per.connection

此配置设置客户端在单个连接上能够发送的未确认请求的最大数量,默认为5,超过此数量会造成阻塞。设置大的值可以提高吞吐量但会增加内存使用,但是需要注意的是,当设置值大于1而且发送失败时,如果启用了重试配置,有可能会改变消息的顺序。设置为1时,即使重新发送消息,也可以保证发送的顺序和写入的顺序一致。

9 request.timeout.ms

此配置设置客户端等待请求响应的最长时间,默认为30000ms=30秒,如果在这个时间内没有收到响应,客户端将重发请求,如果超过重试次数将抛异常。此配置应该比replica.lag.time.max.ms(broker配置,默认10秒)大,以减少由于生产者不必要的重试造成消息重复的可能性。

10 max.block.ms

当发送缓冲区已满或者元数据不可用时,生产者调用send()和partitionsFor()方法会被阻塞,默认阻塞时间为60000ms=1分钟。由于使用用户自定义的序列化器和分区器造成的阻塞将不会计入此时间。

11 max.request.size

此配置设置生产者在单个请求中能够发送的最大字节数,默认为1048576字节=1MB。例如,你可以发送单个大小为1MB的消息或者1000个大小为1KB的消息。注意,broker也有接收消息的大小限制,使用的配置是message.max.bytes=1000012字节(好奇怪的数字,约等于1MB)。

12 receive.buffer.bytes和send.buffer.bytes

receive.buffer.bytes:读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小,默认值为32768字节=32KB。如果设置为-1,则将使用操作系统的默认值。
send.buffer.bytes:发送数据时使用的TCP发送缓冲区(SO_SNDBUF)的大小,默认值为131072字节=128KB。如果设置为-1,则将使用操作系统的默认值。







  • 大小: 86.3 KB
  • 大小: 77 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics