Flink状态原理及异常容错机制
在1.2.3节,笔者强调了Flink是一个高可用的有状态计算引擎,在第5章的案例中,也频繁出现了状态这个概念。那么状态是什么呢?有状态计算又是什么呢?Flink基于状态提供了什么样的能力呢?相信读者的疑问还有很多,本章我们逐一解答这些疑问。
6.1 Flink有状态计算
Flink的有状态计算和异常容错机制可以说是日常应用Flink过程中最核心也最复杂的概念,因此要想理解为何Flink着重突出有状态计算能力,就要从状态和有状态计算这两个基础概念入手。
6.1.1 状态及有状态计算的定义
相信读者在看到有状态计算这个词之后,也会和笔者第一次看到这个词的反应一样,脑海中出现一个问题:既然有“有状态(Stateful)计算”,那么是不是也有“无状态(Stateless)计算”呢?没错。不但有无状态计算,而且我们学习有状态计算时,必须要用无状态计算作为比较,我们先来看看两者的具体定义。
在一个计算作业中,如果当前的计算过程不依赖之前的数据就可以直接计算出结果,那么就称为无状态计算。以图6-1的数据处理场景为例,有一个数据格式为{id:long, value:long, time:long}的数据源,需求是解析其中的value值并输出结果到数据汇存储引擎中,那么该数据处理作业在处理数据时直接解析当前这条数据就可以得到结果,不会依赖到之前的数据,这就是一个典型的无状态计算作业。

图6-1 无状态计算作业的执行流程
有状态计算则恰恰相反。在一个计算作业中,如果当前数据的计算过程需要依赖之前数据的历史计算结果,使用历史计算结果和当前的数据同时进行计算才能得到新的结果,那么我们就称为有状态计算,其中依赖到的历史计算结果就称为状态(State)。举例来说,如图6-2所示,有一个数据格式为{id:long, value:long, time:long}的数据源,我们要解析出其中的value值,要求只有当前的value值比前一个value值大时,才输出结果。那么在处理数据时,我们就需要拿上一个value进行对比,这就是一个典型的有状态计算的案例。

图6-2 有状态计算作业的执行流程
如图6-3所示,在上述有状态计算案例执行的过程中,我们要将前一个value值存储下来才能在下一次计算时进行比较,其中存储的value值就是状态。在每一次计算过程中,都会访问前一次的value值,并在计算完成后将当前的value值更新到状态中以便下一次计算。

图6-3 有状态计算作业的执行流程
注意:有状态计算是指数据处理作业进行处理数据的过程,而状态强调的是状态数据。
从上述关于有状态计算的定义中不难发现,状态和有状态计算在各个领域都是普遍存在的,我们的生活和状态以及有状态计算也有着千丝万缕的关系,比如下面的场景。
6.1.2 Flink有状态计算的4类应用
在对状态和有状态计算的概念有了初步了解之后,我们回到Flink中,看看Flink中经常涉及状态和有状态计算的4类应用。
在第4章介绍的数据分组聚合操作中,Max、Min、Sum、Reduce等操作的计算过程都是有状态计算,这4类操作的执行流程都是将当前的输入数据和历史中间结果进行聚合计算,然后得到最新的结果并输出,其中保存的历史中间结果就是状态,聚合计算的过程就是有状态计算。
举例来说,使用Sum操作来计算每种商品累计销售额,KeyBySum算子每输入一条商品销售记录的数据,都会将历史累计的商品销售额和当前这一个商品的销售额相加,得到商品最新的累计销售额并输出。
在第5章介绍的时间窗口应用中,时间窗口的计算过程也是有状态计算。当我们使用的窗口数据处理函数是全量窗口处理函数时,在窗口触发计算之前,窗口算子会将数据收集在窗口内,并在窗口触发时统一计算整个窗口内的数据。在这个计算过程中,每一条数据的计算都不是独立的,都依赖窗口内其他的数据,其中保存在窗口内数据就是状态,窗口算子触发窗口计算的过程就是有状态计算。当我们使用的窗口数据处理函数是增量窗口处理函数时,窗口算子会将输入数据累加到当前窗口的累加器中得到新的结果值,其中窗口的累加器中保存的数据就是状态,窗口算子进行增量计算的过程就是有状态计算。
在前两种有状态计算的应用中,我们使用的都是Flink预置的分组聚合和时间窗口的DataStream API。这两种API屏蔽了有状态计算的过程,让用户感受不到有状态计算的存在。我们还可以在Flink中显式地自定义状态并进行有状态计算,比如在5.2.6节介绍增量窗口处理函数和全量窗口处理函数时,我们通过自定义状态累计计算了每种商品的历史累计销售额、销量,这种使用用户自定义状态的应用也是Flink中常见的一种有状态计算。
当在数据流上训练机器学习模型时,通过状态保存的当前版本的模型参数。这个应用场景不是本书的主要内容,此处不多赘述。Flink常见的应用场景或多或少都和有状态计算相关,因此熟练掌握状态、有状态计算的概念以及状态相关DataStream API的使用对于Flink开发人员来说是必备技能。
6.1.3 传统有状态计算方案应用于大数据场景时存在的3个问题
在学习了Flink中常见的4种有状态计算应用后,读者可能会产生疑问:在Flink诞生之前,有状态计算的应用其实已经出现了,并且在生活中也非常常见,为什么Flink能把有状态计算作为自己的王牌武器呢?Flink的有状态计算到底厉害在哪里?
任何新技术的诞生都源于旧技术无法解决当前应用场景下的新问题,而Flink有状态计算诞生的过程也不例外,要想回答上述问题,就要从下面这个问题聊起:为什么传统的有状态计算方案不能应用到大数据流处理场景中?
首先我们来看看传统的有状态计算方案是什么样的。我们以Flink诞生之前就已经很常见的传统事务型应用(常见的Web应用都是传统事务型应用)为例,来看看传统事务型应用中有状态计算的实现方案。如图6-4所示,在传统事务型应用中,通常会将状态保存在关系型数据库(比如MySQL)中,后端应用在执行有状态计算时,会通过网络通信访问和更新数据库中的状态数据,计算并输出结果。

图6-4 传统事务型应用有状态计算执行流程
但是如果在大数据流处理场景中采用传统事务型应用的有状态计算方案就会出现以下3个问题。
接下来我们针对这3个问题逐一分析。
相比传统事务型应用的场景,在大数据流处理领域的场景中,每一个Flink作业需要处理的数据量都是非常庞大的,数据处理的峰值吞吐量可以达到每秒百万条甚至千万条。如果Flink选择将状态数据存储在数据库中,并通过网络去访问,就会出现大量的网络I/O请求。执行一次网络I/O请求至少是毫秒级别的响应延迟,那么在大数据量处理场景下,就会导致数据处理的高延迟和低吞吐,状态访问性能不佳,如图6-5所示。

图6-5 Flink采用传统有状态计算方案将导致数据处理的高延迟、低吞吐
无论在哪种数据处理场景中,我们都需要重点关注网络连接问题或者机器硬件故障导致的作业宕机问题,这些问题往往会导致数据不一致。为了解决数据不一致的问题,我们通常会选择编写复杂的容错性代码实现数据处理的精确一次。这里我们先对异常容错时数据处理的精确一次做一个简单的解释:数据处理的精确一次要求作业在发生故障时要做到数据处理的不重、不丢,从而保证状态值既不会算多也不会算少,最终计算得到的结果和没有发生过故障得到的结果一致。
在传统有状态计算方案中要实现数据处理的精确一次,最常用的方法是通过事务访问和更新数据库。事务这种异常容错方式的开发成本不但高,而且由于要考虑的异常场景很多,因此可能无法保证数据处理的精确一次,所以如果Flink使用传统事务型应用的有状态计算方案,将会使异常容错的成本变得很高,如图6-6所示。

图6-6 Flink采用传统有状态计算方案将导致异常容错成本高
在传统事务型应用的有状态计算实现方案中,不同的业务场景下我们可能会使用多种不同的数据库来存储状态数据,比如MySQL、Redis、HBase等。那么作为用户就要同时学习并使用每种数据库各自提供的接口,这对用户来说也是一个不低的成本。除此之外,当存储状态的数据库选型发生变化时,用户不但要去修改作业代码,将代码中的访问、更新状态的接口替换为新的数据库提供的接口,还需要考虑如何将状态数据从一个数据库迁移到另一个数据库中。因此如果Flink使用传统事务型应用的有状态计算方案,状态接口会很不易用,如图6-7所示。

图6-7 Flink采用传统有状态计算方案会凸显出状态接口不易用的问题
6.1.4 Flink实现有状态计算的思路
在6.1.3节我们探讨了在大数据流处理场景中应用传统有状态计算方案存在的3个问题,每一个问题都是Flink成为优秀的有状态计算引擎路上的一座大山,但是Flink不惧困难,针对这3个问题分别提出了以下3个优雅的解决思路,最终形成了具有Flink特色的有状态计算方案。
接下来我们分析上述3个解决思路的诞生过程。需要提前说明的是,由于每个解决思路包含的内容都比较多,因此本节只简单分析每种解决思路的核心思想,在6.2节及后续章节再分析每种思路的具体实现以及使用方法。
在6.1.3中提到,如果Flink使用传统有状态计算的方案通过网络请求去访问或者更新状态值,将会导致Flink作业在处理数据时存在高延迟和低吞吐的问题。解决这个问题的方法其实很简单,就是将状态数据存储在Flink SubTask的本地内存或磁盘中,避免访问和更新状态时发起网络请求,这个解决思路称为状态本地化。
为了实现状态本地化,Flink中的状态数据需要存储在SubTask所在机器的内存或者磁盘中,SubTask只需要访问内存或者本地磁盘就可以获取状态值,这样对于状态的访问和更新的耗时就可以从毫秒级别降低到微秒甚至纳秒级别,即使在处理超大数据量的情况下,也能做到极致的低延迟和高吞吐,如图6-8所示。

图6-8 Flink通过状态本地化实现极致的状态访问速度
状态本地化的优化思路看起来很完美,但是一旦状态本地化,就意味着存储在SubTask本地的状态数据全都需要Flink自己进行维护和管理了,而这又引入了一大堆新的问题。举例来说,我们需要考虑状态数据应该以什么样的数据结构存储在SubTask本地,以及将状态数据存储在内存还是磁盘中?一旦作业发生故障,存储在SubTask本地的状态数据丢失了又该怎么办?这些新问题都是Flink要去考虑和解决的,此处我们先讨论其中一个最核心的问题,即作业发生故障时,存储在SubTask本地的状态数据丢失了怎么办。这个问题也被称作状态本地化后状态数据的异常容错问题。
在生产环境中,有状态的流处理作业通常是7×24小时不间断运行的,因为网络连接或者机器硬件故障导致的作业宕机问题无法避免,所以遇到这种问题,我们希望流处理作业能够自动恢复,继续处理数据。在传统事务型应用的有状态计算方案中,状态数据是存储在远程数据库中的,因此即使作业故障宕机,存储在远程数据库中的状态数据也不会丢失,这就可以做到在作业恢复后继续处理,我们依然能够得到正确的计算结果。
但是如果采用状态本地化的方案,将状态数据保存在SubTask内存或者本地磁盘中,在故障宕机时,状态数据就极易丢失,这时即使作业能够自动恢复继续处理数据,大概率也会计算得到错误的结果。
以6.1.2节介绍的Flink有状态计算4种场景中的分组聚合应用场景为例,有一个计算每种商品累计销售额的Flink作业,使用状态来存储并计算每种商品的累计销售额。假设这时商品1的累计销售额已经达到300元,如果状态数据保存在内存中,那么在作业宕机后内存中的数据都会直接丢失。接下来作业恢复后继续计算,商品1的累计销售额就只能从重新从0元开始累计,这就会导致商品累计销售额算少。
为了解决这个问题,有读者会想到在批处理作业中,作业发生故障常见的解决方案就是重跑一遍,那么流处理作业是不是也可以参考批处理作业的方式,在发生故障后从头开始回溯数据呢?
没错,只要能做到从头开始回溯数据,计算结果的准确性也是可以保证的,但是问题就出现在从头回溯数据这件事的可行性上,从头回溯数据是一个需要对数据源存储引擎中所有的历史数据进行全量处理的过程,而一旦涉及全量处理,下面这3个问题就随之而来了。
因此在流处理领域,异常容错方案一旦涉及从头进行全量数据的回溯,往往不是一个好选择,既然全量回溯这条路走不通,那我们就往增量回溯的思路上探索。
我们先想想如果在生活中遇到宕机问题是怎么处理的呢,以我们常用的编码软件IDE(Integrated Development Environment,集成开发环境)来说,市面上无论哪款IDE,为了防止IDE卡死或者电脑宕机导致用户编写的代码丢失,都会将用户编写的代码定时、自动保存一份快照到本地磁盘中,甚至有些IDE为了避免本地磁盘的损坏,提供了将代码快照自动保存到云端存储(远程持久化存储)的能力。有了保存代码快照的功能,即使发生故障,IDE依然可以从上一次保存成功的代码快照恢复,用户就可以继续开发,而无需从头重写代码。
从上面这个案例出发,解决思路自然而然就有了。我们可以发现Flink有状态计算的过程和用户使用IDE编码的过程是非常相似的。Flink引擎就等同于IDE开发环境,Flink中的状态就等同于用户使用IDE编写的代码,Flink有状态计算的过程就等同于用户使用IDE编写代码的过程,Flink作业遇到故障宕机的过程就等同于IDE卡死或者电脑宕机的过程,那么Flink中状态数据的异常容错的过程也可以参考IDE通过持久化代码快照实现异常容错的机制。
那么在Flink中具体要如何实现呢?其实方案并不复杂,如图6-9所示,我们可以让Flink作业在运行时定时且自动地将作业中的状态数据持久化到远程的分布式文件系统(比如HDFS)中。当Flink作业遇到故障时,重启之后可以从远程分布式文件系统中获取到上一次保存的快照,从该快照进行恢复,恢复后继续处理数据即可。在恢复后,作业就可以正常运行了,在接下来的运行过程中,依然定时且自动地将作业中的最新的状态数据持久化到远程分布式文件系统中,这样就可以避免因为网络连接或者机器硬件故障导致作业宕机后状态数据丢失的问题了。上述解决方案就被称作状态的持久化机制,也称作快照机制。

图6-9 通过将状态持久化(快照机制)从而实现状态数据的异常容错
在6.1.3节中提到,如果Flink使用传统有状态计算的方案,那么为了实现异常容错时数据处理和状态结果满足精确一次的一致性要求,用户就不得不使用事务这样的机制,这将导致用户耗费很大的开发成本在异常容错上。
下面来分析Flink针对这个问题的解决思路。我们知道异常容错应对的主要场景就是网络连接或者机器硬件故障等问题导致的宕机问题,而这类问题最大的特点就是并非用户编码Bug导致的,而是外在的环境因素导致。既然错误不是由于用户的编码导致的,理想情况下就应该把实现状态一致性异常容错的逻辑从用户的代码里给完全剥离出来,由Flink计算框架本身完成,用户无需参与。
Flink最终给出的解决方案就是,在状态持久化的基础之上,以Chandy-Lamport分布式系统快照算法作为理论基础,实现了名为Checkpoint(检查点)的分布式轻量级异步快照,保证了精确一次的数据处理和一致性状态,数据既不会算多,也不会算丢。
Flink Checkpoint最有价值的地方在于真正实现了处理数据的逻辑和异常容错逻辑之间的解耦,用户无需像传统事务型应用的有状态计算的方案一样通过编写事务相关的代码来保障精确一次的数据处理和状态数据的一致性,只需要使用Flink状态接口,并开启Checkpoint机制,Flink作业就可以自动实现精确一次的数据处理,如图6-10所示。关于Flink状态接口会在6.2节介绍,Flink Checkpoint机制的原理以及执行流程会在6.3节介绍。

图6-10 通过精确一次的一致性快照实现低成本的异常容错
在6.1.3中提到,如果Flink使用传统有状态计算的方案,那么由于不同场景下存储状态数据的可选数据库类型是非常多的,因此用于访问、更新状态的接口也是五花八门的,这将导致状态接口易用性很差。
针对这个问题,Flink在状态本地化、状态持久化以及Checkpoint机制的基础上,基于DataStream API提供了一套标准且易用的状态接口。用户使用这套状态接口,不但能够享受到状态本地化带来的极致的状态访问速度,还能够享受到状态持久化和一致性快照带来的异常容错场景下精确一次的数据处理保证。
其实对于状态接口我们并不陌生,在5.2.6节介绍增量和全量窗口处理函数时,我们使用了名为ValueState的状态接口计算得到了每种商品的累计销售额、累计销量以及累计平均销售额。在ValueState中包含T get()和void update(T value)两个方法,分别用于访问和更新状态值,我们只需要在自定义函数中调用这两个方法就能轻松访问和更新状态。如图6-11所示,相比传统事务型应用的有状态计算方案来说,使用Flink提供的统一状态接口来访问状态,编码逻辑被极大的简化了。

图6-11 通过统一的状态接口提升状态的易用性
关于Flink DataStream API中状态接口的使用方法将在6.2节进行介绍。
本文作者:羊艺超
本文摘编于《Flink SQL 与 DataStream 入门、进阶与实战》(70万字,500页)。
京东机械工业出版社旗舰店限时京东机械工业出版社旗舰店限时5折5折,点击链接即可购买。,点击链接即可购买。









暂无评论内容