- 大数据实时流处理技术实战:基于Flink+Kafka技术
- 王璐烽 刘均主编
- 5185字
- 2023-10-17 18:55:44
任务 搭建Flink开发环境
【任务描述】
本任务主要讲解Flink流式数据实时计算的原理以及基于IDEA的集成开发环境的搭建。通过本任务的学习和实践,读者可以了解Flink API的基本使用方法,掌握在IDEA环境中创建项目的方法以及安装Scala插件的方法。
【知识链接】
1.流式数据
在现实生活中,任何类型的数据都可以形成事件流,例如,信用卡交易、传感器测量、服务器日志、网站或移动应用程序上的用户交互记录,所有这些数据都会源源不断地产生,形成数据流。数据可以被作为无界流或者有界流来处理。
● 无界流:定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被获取后需要立刻处理。不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。在处理无界流数据时通常要求以特定顺序获取事件,例如事件发生的顺序,以便能够推断结果的完整性。
● 有界流:既定义流的开始,又定义流的结束。针对有界流,可以在获取所有数据后再进行计算。有界流的所有数据可以被排序,所以并不需要有序获取。有界流处理通常被称为批量处理。
在对无界流的数据流进行处理和分析时,需要将无界数据流转换为有界数据流。处理的方式是使用“窗口”来划分数据,比如,根据事件时间划分为不同的窗口,就可以将无界数据流转换为有界数据流,对同一窗口内的数据进行分析和处理,如图 1-1所示。
图1-1 流式数据示意
2.Flink介绍
Flink起源于由柏林工业大学的教授主导的Stratosphere项目,这个项目由3所地处柏林的大学和欧洲其他一些大学共同进行研发。2014年,Stratosphere项目代码被捐赠给Apache软件基金会,Flink是在这个项目的基础上设计开发的。Flink在德语中表示快速、灵巧,它的LOGO是一只可爱的小松鼠,如图1-2所示。德国柏林有很多这样的小松鼠,它们的特点恰好是快速和灵巧,这也和Flink产品设计的初衷是一致的。在工业界,Flink被认为是最好的数据流处理引擎之一,它是一个开源的分布式大数据处理引擎和计算框架,能够对无界数据流和有界数据流进行统一处理,能进行有状态和无状态的计算。
图1-2 Flink的LOGO
3.Flink的发展历史
Flink的发展是非常快速的。它有一个非常活跃的社区,而且一直在快速成长。下面简单了解一下Flink的发展历史。
● 2010年到2014年,柏林工业大学、柏林洪堡大学和哈索 普拉特纳研究院共同发起名为Stratosphere的研究项目。
● 2014年4月,Stratosphere项目代码被捐赠给Apache软件基金会,并成为后者的孵化项目。此后该项目团队的大部分成员一起创建了另一家公司——Data Artisans,该公司的主要目标是实现Stratosphere项目的商业化。
● 2014年8月,Apache软件基金会将Stratosphere 0.6版本更名为Flink,并发布了第一个正式版本Flink 0.6。该版本具有更好的流式引擎支持。
● 2014年12月,Flink项目完成孵化,成为Apache软件基金会的顶级项目。
● 2019年1月,长期对Flink投入研发的阿里巴巴公司收购了Data Artisans公司,之后又开源了自己的内部版本Blink,在人工智能方面部署了机器学习基础设施。
4.Flink的优势
相对于其他流式处理系统,Flink 在流式数据处理方面具有非常明显的优势,因此,在工业界得到广泛应用。Flink的优势主要表现在以下几方面。
● 同时支持高吞吐、低延迟、高性能:Flink是开源社区中支持高吞吐、低延迟、高性能的分布式流式数据处理框架。满足高吞吐、低延迟、高性能这 3 个目标对分布式流式计算框架来说是非常重要的。
● 支持事件时间:在流式计算领域中,目前大多数框架窗口计算采用的都是系统时间,也是事件传输到计算框架处理时系统主机的当前时间。Flink能够支持基于事件时间语义进行窗口计算,也就是使用事件产生的时间,基于事件驱动的机制使得事件即使乱序到达,流系统也能够计算出精确的结果,保持了事件原本产生时的时序性,尽可能避免受到网络传输或硬件系统的影响。
● 支持有状态计算:所谓状态就是在流式计算过程中将算子的中间结果数据保存在内存或者文件系统中,等下一个事件进入算子后,可以从之前的状态中获取中间结果以计算当前的结果,从而无须每次都基于全部的原始数据来统计结果。这种方式极大地提升了系统的性能,并降低了数据计算过程的资源消耗。
● 支持窗口操作:在流处理应用中,数据是连续不断的,需要通过窗口的方式对流数据进行一定范围的聚合计算,窗口可以用灵活的触发条件定制化来达到对复杂的流传输模式的支持,用户可以定义不同的窗口触发机制来满足不同的需求。
● 容错机制:Flink能够将一个大型计算任务的流程拆解成小的计算过程,然后将任务分布到并行节点上进行处理。在任务执行过程中,能够自动发现因事件处理过程中的错误而导致数据不一致的问题。通过基于分布式快照技术的检查点,可以将执行过程中的状态信息进行持久化存储,一旦任务出现异常停止,Flink就能够从检查点中自动恢复任务。
5.Flink系统架构
Flink是一个分布式系统,需要有效分配和管理计算资源才能执行流应用程序。它集成了所有常见的集群资源管理器,例如YARN。Flink也可以作为独立集群运行。Flink运行时由两种类型的进程组成:一个作业管理器(JobManager)和多个任务管理器(TaskManager)。从Master/Slave架构的角度来分析,作业管理器就是Flink集群中的Master节点,任务管理器就是Flink集群中的Slave节点,作业管理器在集群中只有一个,而Master节点可以有多个。如图1-3所示,Flink系统架构主要由作业管理器、任务管理器和客户端(Client)组成。
图1-3 Flink系统架构
● 客户端:客户端不是运行时和程序执行的一部分,而是用于准备数据流并将其发送给作业管理器。之后,客户端可以通过断开连接或保持连接来接收进程报告。客户端可以作为触发执行程序的一部分运行,也可以在命令行进程中运行。
● 作业管理器:具有许多与协调Flink应用程序的分布式执行有关的职责,它决定何时调度下一个任务、对完成或执行失败的任务做出反应并协调从失败中恢复等。
● 资源管理器(ResourceManager):资源管理器负责Flink集群中的资源提供、回收、分配,它管理任务插槽(Slot),这是Flink集群中资源调度的单位。Flink为不同的环境和资源提供者实现了对应的资源管理器。
● 分发器(Dispatcher):分发器提供了一个REST接口,用来提交Flink应用程序执行,并为每个提交的作业启动一个新的进程,它还运行Web UI来提供作业执行信息。
● 任务管理器:执行作业流的任务,并且缓存和交换数据流。系统必须始终至少有一个任务管理器。在任务管理器中资源调度的最小单位是任务插槽。任务管理器中任务插槽的数量表示并发处理任务的数量。在一个任务插槽中可以执行多个算子。
6.Flink API介绍
为方便开发者开发流式应用程序,Flink提供了非常丰富的API,如图1-4所示。最底层的API可以对流式数据的状态、事件时间进行直接的处理。最高层的API使用SQL的方式对数据流进行处理。底层的API可以对事件流进行更直接、更细粒度的控制,使用起来比较灵活。高层的API具有更高级的抽象,提供更方便、快捷的开发方法。开发者可以选择使用不同层级的API,也可以在应用程序中混合使用。
图1-4 Flink API架构
1)Stateful Stream Processing
Flink提供的最底层的接口,可以处理输入数据流中的单个事件或者归入一个特定窗口内的多个事件。它提供了对于时间和状态的细粒度控制。
2)DataStream/DataSet API
DataStream API应用于有界数据流和无界数据流场景。DataSet API主要应用于有界数据流场景,能够实现对数据流操作,包括窗口操作、连接操作、聚合操作和转换操作等。API提供了大量算子,如map、reduce等。开发者还可以通过扩展API预定义接口来实现自定义函数。
下面的代码示例展示了如何基于DataStream API计算每个传感器的最高温度。传感器数据使用三元组表示,如("sensor_1", 1547718199, 1.0),表示ID为sensor_1的传感器在时间戳为1547718199的时间检测到的温度是1.0,Flink处理的是由不同的传感器源源不断产生的数据流,通过map算子将三元组转换为二元组("sensor_1", 1.0),然后使用keyBy算子按照传感器的ID分组,相同ID的传感器分为一组,统计每个传感器的最大值。
//三元组转换为(传感器ID,温度)形式 val dataStream = inputDataStream .map(data => { (data._1, data._3) }) //根据传感器ID分组 val dataStream2 = dataStream.keyBy(_._1) //按照温度汇总 .max(1)
3)Table API
以Table为中心的API。例如在流式数据场景下,它可以表示一张正在动态改变的表。Table API遵循关系模型,即表拥有Schema,类似于关系数据库中的Schema,并且Table API也提供了类似于关系模型中的操作,如select、join、group by等。
下面的代码示例实现了将由单击流对象PageView构成的数据流转换为Table,使用select、where方法实现数据流过滤的功能,查询访问记录只包含主页index.html的数据流。
/** * 页面访问记录 * * @param id ID * @param timestamp 访问时间戳(秒) * @param userId 用户ID * @param visitUrl 访问的链接 * @param visitTime 访问停留时间(秒) */ case class PageView(id: Int, timestamp: Long, userId: Int, visitUrl:String, visitTime:Int) //创建表环境 val tableEnv = StreamTableEnvironment.create(env) //将DataStream转换为表 val pvTable = tableEnv.fromDataStream(dataStream) //只查询访问过index.html的记录 val resultTable1 = pvTable.select($("userId"), $("visitUrl"), $("visitTime")) .where($("visitUrl").isEqual("/index.html")) //转换为流进行输出 tableEnv.toDataStream(resultTable1) .print("resultTable1")
4)SQL
大部分开发人员比较熟悉的使用方式。使用SQL语句对Table进行查询的编程方式,可以大大降低学习和开发成本。一般的处理流程是,将Table对象注册成表名称,Table本身包含Schema,这样就可以通过SQL语句进行查询。
下面的代码示例首先将Table注册成一个表名page_view,然后使用SQL对该表进行查询。
//创建临时表 tableEnv.createTemporaryView("page_view", pvTable) //只查询访问过index.html的记录 val resultTable2 = tableEnv.sqlQuery("select userId,visitUrl,visitTime " + "from page_view " + "where visitUrl = '/index.html' ") //转换为流进行输出 tableEnv.toDataStream(resultTable2) .print("resultTable2")
7.Flink CEP介绍
复杂事件处理(Complex Event Processing,CEP)是事件流处理中的一个常见场景。Flink的CEP库提供了相应的API,使用户能够通过定义模式的方式检测关注的事件,在监控到指定事件后进行后续的处理。CEP库的应用包括网络入侵检测、业务流程监控和欺诈检测等。
下面的代码示例使用Flink CEP库实现对用户登录的日志进行监控,用于检测连续登录失败3次的用户日志。连续登录失败的行为有可能是试图通过猜测密码的攻击行为,将连续登录失败的事件进行报警处理是系统异常检测常用的方法。
/** * 登录事件 * * @param userId 用户ID * @param ipAddr IP地址 * @param eventType 事件类型,success表示登录成功,fail表示登录失败 * @param timestamp 登录时间戳 */ case class LoginEvent(userId: String, ipAddr: String, eventType: String, timestamp: Long) //定义pattern,检测连续3次登录失败事件 val pattern = Pattern.begin[LoginEvent]("firstLoginFail").where(_.eventType == "fail") //第1次登录失败事件 .next("secondLoginFail").where(_.eventType == "fail") //第2次登录失败事件 .next("thirdLoginFail").where(_.eventType == "fail") //第3次登录失败事件 //将pattern应用到事件流上,检测匹配的复杂事件 val patternStream: PatternStream[LoginEvent] = CEP.pattern(loginEventStream.keyBy(_.userId), pattern) //将检测到的匹配事件报警输出 val resultStream: DataStream[String] = patternStream.select(new PatternSelectFunction[LoginEvent, String] { override def select(map: util.Map[String, util.List[LoginEvent]]): String = { //返回报警信息 } })
【任务实施】
1.创建项目
在集成开发环境IDEA启动以后,首先创建一个项目,选择File→New→Project菜单创建一个新的项目,如图1-5所示。
图1-5 创建新项目
由于要创建的是基于Maven的项目,因此在New Project对话框中选择Maven,然后单击Next按钮进行下一步的配置,如图1-6所示。
图1-6 创建Maven项目
继续配置项目的名称(Name)、存储位置(Location)和Maven相关的配置,如图1-7所示。
● Name:项目的名称,这里输入flink_project,也可以根据实际情况输入其他名称。
● Location:项目存储的路径。
● Maven相关的配置包括GroupId、ArtifactId和Version。
❏ GroupId:组织的域名,如果没有特殊的需求,保留默认内容即可。
❏ ArtifactId:项目的名称,输入flink_project。
❏ Version:项目的版本号,如果没有特殊的需求,保留默认内容即可。
图1-7 配置新项目
确认无误后单击Finish按钮完成配置。
2.安装Scala插件
在集成开发环境IDEA中,选择File→Settings菜单,如图1-8所示,打开设置对话框。
图1-8 打开设置菜单
进入图1-9所示的Settings对话框,在左侧导航栏中单击Plugins,然后在右侧的搜索框中输入Scala进行搜索并查找插件,按照相应的提示安装插件即可。
图1-9 安装Scala插件
Scala插件安装以后会显示在插件列表中,如图1-10所示。
图1-10 插件列表
3.在全局类库中设置Scala库
在集成开发环境IDEA中选择File→Project Structure菜单,如图1-11所示,打开项目结构对话框。
图1-11 选择Project Structure菜单
在Project Structure对话框中选择Global Libraries,设置全局类库,如图1-12所示。
图1-12 设置全局类库
本项目使用的Scala版本号是2.12.11。双击Scala SDK,在弹出的Select JAR’s for the new Scala SDK对话框中选择相应的版本,然后单击OK按钮,如图1-13所示。
图1-13 选择Scala SDK版本
确认Scala 2.12.11版添加到flink_project项目中,如图1-14所示。
图1-14 确认Scala 2.12.11版添加到flink_project项目中
Scala类库添加完成以后,在项目框架的Global Libraries菜单中,可以看到新加入的Scala类库,如图1-15所示。
图1-15 查看新加入的Scala类库
4.测试Scala环境
Scala插件安装和全局类库设置完成以后,还需要在项目中添加Scala框架的支持。在左侧项目浏览器的项目flink_project上右击,在弹出的快捷菜单中选择Add Framework Support,添加框架支持,如图1-16所示。
图1-16 添加框架支持
在打开的Add Frameworks Support对话框中选择Scala,确认Scala的版本号正确无误后,单击OK按钮完成设置,如图1-17所示。
图1-17 添加Scala框架支持
5.创建scala文件夹
Maven项目默认创建的main文件夹下面只有java文件夹,这个文件夹一般存储Java源文件。为了使用Scala语言编写程序,可以在main文件夹下创建一个scala文件夹。右击main文件夹,在弹出的快捷菜单中选择New→Directory菜单,如图1-18所示,并将新建的文件夹命名为scala。
图1-18 创建新的文件夹
scala文件夹创建成功后,会显示在项目的框架结构中,如图1-19所示。
图1-19 创建的新文件夹scala
为了标记scala文件夹下面存储的是Scala源文件,需要进一步设置。右击scala文件夹,在弹出的快捷菜单中选择Make Directory as→Sources Root菜单,如图1-20所示,标记该文件夹为源代码的根目录。
图1-20 设置Sources Root
右击scala文件夹,在弹出的快捷菜单中选择New→Scala Class菜单,如图1-21所示,创建一个Scala类,用于对环境进行测试。
图1-21 新建Scala类
将新建的Scala类命名为HelloScala,类型为Object,如图1-22所示。
图1-22 创建HelloScala类
编写main函数,在控制台输出“hello scala”,对环境进行测试。
object HelloScala { def main(args: Array[String]): Unit = { print("hello scala") } }
右击HelloScala类,在弹出的快捷菜单中选择Run 'HelloScala'以运行程序,如图1-23所示。
图1-23 运行HelloScala程序
在控制台可以看到输出结果“hello scala”,说明Scala环境已经安装完成,如图1-24所示。
图1-24 HelloScala程序输出结果