摘要
与生产者对应的是消费者,应用程序可以通过KafkaConsumer 来订阅主题,并从订阅的主题中拉取消息。不过在使用KafkaConsumer消费消息之前需要先了解消费者和消费组的概念,否则无法理解如何使用KafkaConsumer。本章首先讲解消费者与消费组之间的关系,进而再细致地讲解如何使用KafkaConsumer。
消费者与消费组
消费者(Consumer)负责订阅Kafka中的主题(Topic),并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组(Consumer Group)的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。
对于消息中间件而言,一般有两种消息投递模式:点对点(P2P,Point-to-Point〉模式和发布/订阅(Pub/Sub)模式。点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息。发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题〈Topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者从主题中订阅消息。主题使得消息的订阅者和发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。Kafka同时支持两种消息投递模式,而这正是得益于消费者与消费组模型的契合:
- 如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。(群组协调器)
- 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。
消费组是一个逻辑上的概念,它将旗下的消费者归为一类,每一个消费者只隶属于一个消费组。每一个消费组都会有一个固定的名称,消费者在进行消费前需要指定其所属消费组的名称,这个可以通过消费者客户端参数group.id来配置,默认值为空字符串。
消费者并非逻辑上的概念,它是实际的应用实例,它可以是一个线程,也可以是一个进程。同一个消费组内的消费者既可以部署在同一台机器上,也可以部署在不同的机器上。
客户端开发
一个正常的消费逻辑需要具备以下几个步骤:
- (1)配置消费者客户端参数及创建相应的消费者实例。
- (2)订阅主题。
- (3)拉取消息并消费。
- (4)提交消费位移。
- (5)关闭消费者实例。
必要的参数配置
- bootstrap.servers:该参数的释义和生产者客户端KafkaProducer 中的相同,用来指定连接Kafka集群所需的 broker地址清单,具体内容形式为host1 :port1 , host2 : post,可以设置一个或多个地址,中间用逗号隔开,此参数的默认值为“”。注意这里并非需要设置集群中全部的 broker地址,消费者会从现有的配置中查找到全部的Kafka集群成员。这里设置两个以上的 broker地址信息,当其中任意一个宕机时,消费者仍然可以连接到Kafka集群上。有关此参数的更多释义可以参考6.5.2节。
- group.id:消费者隶属的消费组的名称,默认值为“”。如果设置为空,则会报出异常: Exception in thread “main”org.apache.kafka.common.errors.InvalidGroupldException;The configured groupld is invalid。一般而言,这个参数需要设置成具有一定的业务意义的名称。
- key.deserializer 和 value.deserializer:与生产者客户端KafkaProducer中的key.serializer和 value.serializer参数对应。消费者从broker端获取的消息格式都是字节数组(byte[])类型,所以需要执行相应的反序列化操作才能还原成原有的对象格式。这两个参数分别用来指定消息中key和 value所需反序列化操作的反序列化器,这两个参数无默认值。注意这里必须填写反序列化器类的全限定名,比如示例中的org.apache.kafka.common.serialization.StringDeserializer,单单指定StringDeserializer是错误的。有关更多的反序列化内容可以参考3.2.3节。
订阅主题与分区
在创建好消费者之后,我们就需要为该消费者订阅相关的主题了。一个消费者可以订阅一个或多个主题,代码清单3-1中我们使用subscribe()方法订阅了一个主题,对于这个方法而言,既可以以集合的形式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题。subscribe的几个重载方法如下:
通过subscribe()方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。而通过 assign()方法订阅分区时,是不具备消费者自动均衡的功能的,其实这一点从assign()方法的参数中就可以看出端倪,两种类型的subscribe()都有ConsumerRebalanceListener类型参数的方法,而 assign()方法却没有。
反序列化
在2.1.3节中我们讲述了KafkaProducer对应的序列化器,那么与此对应的KafkaConsumer就会有反序列化器。Kafka 所提供的反序列化器有ByteBufferDeserializer、ByteArrayDeserializer、BytesDeserializer、DoubleDeserializer、FloatDeserializer、IntegerDeserializer、LongDeserializer、ShortDeserializer、StringDeserializer,它们分别用于ByteBuffer、 ByteArray、Bytes、Double、Float、Integer、Long、Short 及 String类型的反序列化,这些序列化器也都实现了Deserializer 接口,与KafkaProducer 中提及的Serializer 接口一样,Deserializer接口也有三个方法。
位移提交
对于Kafka 中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。对于消费者而言,它也有一个offset 的概念,消费者使用 offset 来表示消费到分区中某个消息所在的位置。单词“offset”可以翻译为“偏移量”,也可以翻译为“位移”,读者可能并没有过多地在意这一点:在很多中文资料中都会交叉使用“偏移量”和“位移”这两个词,并没有很严谨地进行区分。笔者对 offset做了一些区分:对于消息在分区中的位置,我们将offset称为“偏移量”﹔对于消费者消费到的位置,将 offset称为“位移”,有时候也会更明确地称之为“消费位移”。做这一区分的目的是让读者在遇到 offset 的时候可以很容易甄别出是在讲分区存储层面的内容,还是在讲消费层面的内容,如此也可以使“偏移量”和“位移”这两个中文词汇具备更加丰富的意义。当然,对于一条消息而言,它的偏移量和消费者消费它时的消费位移是相等的,在某些不需要具体划分的场景下也可以用“消息位置”或直接用“offset”这个单词来进行表述。
在每次调用poll()方法时,它返回的是还没有被消费过的消息集((当然这个前提是消息已经存储在Kafka 中了,并且暂不考虑异常情况的发生),要做到这一点,就需要记录上一次消费时的消费位移。并且这个消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移。再考虑一种情况,当有新的消费者加入时,那么必然会有再均衡的动作,对于同一分区而言,它可能在再均衡动作之后分配给新的消费者,如果不持久化保存消费位移,那么这个新的消费者也无法知晓之前的消费位移。
在旧消费者客户端中,消费位移是存储在ZooKeeper 中的。而在新消费者客户端中,消费位移存储在Kafka内部的主题_consumer_offsets中。这里把将消费位移存储起来(持久化)的动作称为“提交”,消费者在消费完消息之后需要执行消费位移的提交。
参考图3-6的消费位移,x表示某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了x位置的消息,那么我们就可以说消费者的消费位移为x,图中也用了lastConsumedOffset这个单词来标识它。
不过需要非常明确的是,当前消费者需要提交的消费位移并不是x,而是 x+1,对应于图3-6中的position,它表示下一条需要拉取的消息的位置。读者可能看过一些相关资料,里面所讲述的内容可能是提交的消费位移就是当前所消费到的消费位移,即提交的是x,这明显是错误的。类似的错误还体现在对LEO (Log End Offset)的解读上,与此相关的细节可以参阅第5章的内容。在消费者中还有一个committed offset的概念,它表示已经提交过的消费位移。
对于位移提交的具体时机的把握也很有讲究,有可能会造成重复消费和消息丢失的现象。
消息重复消费:(PID+seqnumber)
位移提交的动作是在消费完所有拉取到的消息之后才执行的,那么当消费x+5的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从x+2开始的。也就是说,x+2至x+4之间的消息又重新消费了一遍,故而又发生了重复消费的现象。
消费丢失问题:
参考图3-7,当前一次 poll()操作所拉取的消息集为[x+2,x+7],x+2代表上一次提交的消费位移,说明已经完成了x+1之前(包括x+1在内)的所有消息的消费,x+5表示当前正在处理的位置。如果拉取到消息之后就进行了位移提交,即提交了x+8,那么当前消费x+5的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从x+8开始的。也就是说,x+5至x+7之间的消息并未能被消费,如此便发生了消息丢失的现象。
消费者拦截器
讲述了生产者拦截器的使用,对应的消费者也有相应的拦截器的概念。消费者拦截器主要在消费到消息或在提交消费位移时进行一些定制化的操作。与生产者拦截器对应的,消费者拦截器需要自定义实现 org.apache.kafka.clients.consumer.ConsumerInterceptor接口。ConsumerInterceptor接口包含3个方法:Org.apache.kafka.clients.consumer.ConsumerInterceptor与生产者拦截器对应的,消费者拦截器需要自定义实现接口.ConsumerInterceptor接口包含3个方法:
KafkaConsumer会在poll()方法返回之前调用拦截器的onConsume()方法来对消息进行相应的定制化操作,比如修改返回的消息内容、按照某种规则过滤消息(可能会减少poll()方法返回的消息的个数)。如果 onConsume()方法中抛出异常,那么会被捕获并记录到日志中,但是异常不会再向上传递。
KafkaConsumer会在提交完消费位移之后调用拦截器的onCommit()方法,可以使用这个方法来记录跟踪所提交的位移信息,当消费者使用commitSync的无参方法时,我们不知道提交的消费位移的具体细节,而使用拦截器的onCommit()方法却可以做到这一点。
close()方法和ConsumerInterceptor 的父接口中的configure()方法与生产者的 ProducerIntercepto接口中的用途一样,这里就不赘述了。
也可以自定义的消费者拦截器
消息消费
Kafka中的消费是基于拉模式的。消息的消费一般有两种模式:推模式和拉模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。
从代码清单3-1中可以看出,Kafka中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用poll()方法,息而 poll()方法返回的是所订阅的主题(分区)上的一组消。
对于poll()方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空;如果订阅的所有分区中都没有可供消费的消息,那么poll()方法返回为空的消息集合。
控制或关闭消费
KafkaConsumer 提供了对消费速度进行控制的方法,在有些应用场景下我们可能需要暂停某些分区的消费而先消费其他分区,当达到一定条件时再恢复这些分区的消费。KafkaConsumer中使用pause()和resume()方法来分别实现暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据的操作。这两个方法的具体定义如下:
第二种方法是通过 timeout参数来设定关闭方法的最长执行时间,有些内部的关闭逻辑会耗费一定的时间,比如设置了自动提交消费位移,这里还会做一次位移提交的动作;而第一种方法没有 timeout参数,这并不意味着会无限制地等待,它内部设定了最长等待时间(30秒);
第三种方法已被标记为@Deprecated,可以不考虑。
指定位移消费(需要详细补充)
在3.2.5节中我们讲述了如何进行消费位移的提交,正是有了消费位移的持久化,才使消费者在关闭、崩溃或者在遇到再均衡的时候,可以让接替的消费者能够根据存储的消费位移继续进行消费。
当一个新的消费组建立的时候,它根本没有可以查找的消费位移。或者消费组内的一个新消费者订阅了一个新的主题,它也没有可以查找的消费位移。当_consumer_offsets主题中有关这个消费组的位移信息过期而被删除后,它也没有可以查找的消费位移。
在Kafka中每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始进行消费,这个参数的默认值为“latest”,表示从分区末尾开始消费消息。参考图3-9,按照默认的配置,消费者会从9开始进行消费(9是下一条要写入消息的位置),更加确切地说是从9开始拉取消息。如果将auto.offset.reset参数配置为“earliest”,那么消费者会从起始处,也就是0开始消费。
再均衡
再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。不过在再均衡发生期间,消费组内的消费者是无法读取消息的。也就是说,在再均衡发生期间的这一小段时间内,消费组会变得不可用。另外,当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。比如消费者消费完某个分区中的一部分消息时还没有来得及提交消费位移就发生了再均衡操作,之后这个分区又被分配给了消费组内的另一个消费者,原来被消费完的那部分消息又被重新消费一遍,也就是发生了重复消费。一般情况下,应尽量避免不必要的再均衡的发生。
多线程实现
KafkaProducer是线程安全的,然而KafkaConsumer 却是非线程安全的。K afkaConsumer 中定义了一个 acquire()方法,用来检测当前是否只有一个线程在操作,若有其他线程正在操作则会抛出ConcurrentMod ifc at ionExcept ion异常:
KafkaConsumer中的每个公用方法在执行所要执行的动作之前都会调用这个acquire()方法,只有 wakeup()方法是个例外。acquire()方法和我们通常所说的锁(synchronized、Lock等〉不同,它不会造成阻塞等待.我们可以将其看作一个轻量级锁,它仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作。acquire()方法和release()方法成对出现,表示相应的加锁和解锁操作。
KafkaConsumer非线程安全并不意味着我们在消费消息的时候只能以单线程的方式执行。如果生产者发送消息的速度大于消费者处理消息的速度,那么就会有越来越多的消息得不到及时的消费,造成了一定的延迟。除此之外,由于Kafka中消息保留机制的作用,有些消息有可能在被消费之前就被清理了,从而造成消息的丢失。我们可以通过多线程的方式来实现消息消费,多线程的目的就是为了提高整体的消费能力。多线程的实现方式有多种,第一种也是最常见的方式:线程封闭,即为每个线程实例化一个KafkaConsumer对象,如图3-10所示。
一个线程对应一个KafkaConsumer实例,我们可以称之为消费线程。一个消费线程可以消费一个或多个分区中的消息,所有的消费线程都隶属于同一个消费组。这种实现方式的并发度受限于分区的实际个数,根据3.1节中介绍的消费者与分区数的关系,当消费线程的个数大于分区数时,就有部分消费线程一直处于空闲的状态。
与此对应的第二种方式是多个消费线程同时消费同一个分区,这个通过 assign()、seek()等方法实现,这样可以打破原有的消费线程的个数不能超过分区数的限制,进一步提高了消费的能力。不过这种实现方式对于位移提交和顺序控制的处理就会变得非常复杂,实际应用中使用得极少,笔者也并不推荐。一般而言,分区是消费线程的最小划分单位。下面我们通过实际编码来演示第一种多线程消费实现的方式,详细示例参考如代码清单3-11所示。
重要的消费者参数
1. fetch.min.bytes
该参数用来配置Consumer在一次拉取请求(调用poll()方法)中能从Kafka 中拉取的最小数据量,默认值为1(B)。Kafka在收到Consumer的拉取请求时,如果返回给Consumer的数据量小于这个参数所配置的值,那么它就需要进行等待,直到数据量满足这个参数的配置大小。可以适当调大这个参数的值以提高一定的吞吐量,不过也会造成额外的延迟(latency),对于延迟敏感的应用可能就不可取了。
2. fetch.max.bytes
该参数与fetch.max.bytes参数对应,它用来配置Consumer在一次拉取请求中从Kafka中拉取的最大数据量,默认值为52428800 (B),也就是50MB。如果这个参数设置的值比任何一条写入Kafka 中的消息要小,那么会不会造成无法消费呢?很多资料对此参数的解读认为是无法消费的,比如一条消息的大小为10B,而这个参数的值是1(B),既然此参数设定的值是一次拉取请求中所能拉取的最大数据量,那么显然1B
3. fetch.max.wait.ms
这个参数也和 fetch.min.bytes参数有关,如果Kafka仅仅参考fetch.min.bytes参数的要求,那么有可能会一直阻塞等待而无法发送响应给Consumer,显然这是不合理的。fetch.max.wait.ms参数用于指定Kafka的等待时间,默认值为500 (ms)。如果Kafka中没有足够多的消息而满足不了fetch.min.bytes参数的要求,那么最终会等待500ms。这个参数的设定和Consumer 与Kafka之间的延迟也有关系,如果业务应用对延迟敏感,那么可以适当调小这个参数。
4. max.partition.fetch.bytes
这个参数用来配置从每个分区里返回给Consumer的最大数据量,默认值为1048576(B),即1MB。这个参数与fetch.max.bytes参数相似,只不过前者用来限制一次拉取中每个分区的消息大小,而后者用来限制一次拉取中整体消息的大小。同样,如果这个参数设定的值比消息的大小要小,那么也不会造成无法消费,Kafka为了保持消费逻辑的正常运转不会对此做强硬的限制。
5. max.poll.records
这个参数用来配置Consumer 在一次拉取请求中拉取的最大消息数,默认值为500(条)。如果消息的大小都比较小,则可以适当调大这个参数值来提升一定的消费速度。
6.connections.max.idle.ms
这个参数用来指定在多久之后关闭限制的连接,默认值是540000 (ms),即9分钟。
7.exclude.internal.topics
Kafka中有两个内部的主题:_consumer_offsets和_transaction state。 exclude.internal.topics用来指定Kafka中的内部主题是否可以向消费者公开,默认值为true。如果设置为true,那么只能使用subscribe(Collection)的方式而不能使用subscribe(Pattern)的方式来订阅内部主题,设置为false则没有这个限制。
8.receive.buffer.bytes
这个参数用来设置Socket 接收消息缓冲区(SO_RECBUF)的大小,默认值为65536(B)即64KB。如果设置为-1,则使用操作系统的默认值。如果Consumer与Kafka处于不同的机房,则可以适当调大这个参数值。
9.send.buffer.bytes
这个参数用来设置Socket 发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B)即128KB。与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值
10.request.timeout.ms
这个参数用来配置Consumer等待请求响应的最长时间,默认值为30000 (ms)。
11.metadata.max.age.ms
这个参数用来配置元数据的过期时间,默认值为300000 (ms),即5分钟。如果元数据在此参数所限定的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或有新的broker加入。
12.reconnect.backoff.ms
这个参数用来配置尝试重新连接指定主机之前的等待时间(也称为退避时间),避免频繁地连接主机,默认值为50 (ms)。这种机制适用于消费者向broker 发送的所有请求。
13.retry.backoff.ms
这个参数用来配置尝试重新发送失败的请求到指定的主题分区之前的等待(退避)时间,避免在某些故障情况下频繁地重复发送,默认值为100 (ms)。
14. isolation.level
这个参数用来配置消费者的事务隔离级别。字符串类型,有效值为“read_uncommitted”和“read committed”,表示消费者所消费到的位置,如果设置为“read_committed”,那么消费者就会忽略事务未提交的消息,即只能消费到LSO (LastStableOffset〉的位置,默认情况下为“read_uncommitted”,即可以消费到HW(High Watermark)处的位置。有关事务和LSO的内容可以分别参考7.4节和10.2节。
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.e1idc.net