Flink应用详细说明
Flink开发骨架
我们知道,一个Java或Scala的程序入口一般是一个静态(static)的main函数。在main函数中,还需要定义下面几个核心步骤:
-
初始化运行环境。
-
读取一到多个数据源Source。
-
根据业务逻辑对数据流进行Transformation转换。
-
将结果输出到Sink。
- 调用作业执行函数。
接下来我们对这五个步骤拆解分析。
设置执行环境
一个Flink作业必须依赖一个执行环境:
// 创建Flink执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
这行代码可以获取一个Flink流处理执行环境。Flink一般运行在一个集群上,执行环境是Flink程序运行的上下文,它提供了一系列作业与集群交互的方法,比如作业如何与外部世界交互。当调用getExecutionEnvironment()
方法时,假如我们是在一个集群上提交作业,则返回集群的上下文,假如我们是在本地执行,则返回本地的上下文。本例中我们是进行流处理,在批处理场景则要获取DataSet API中批处理执行环境。
💁信息 本例中我们是进行流处理,在批处理场景则要获取DataSet API中批处理执行环境。流处理和批处理的执行环境不同,流处理的执行环境名为
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
,批处理的执行环境名为org.apache.flink.api.java.ExecutionEnvironment
。
Scala和Java所需要引用的包也不相同,Scala需要调用org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
和org.apache.flink.api.scala.ExecutionEnvironment
。
下图是批处理和流处理两种场景下,Java和Scala两种编程语言所需要引用的包。刚刚接触Flink的朋友很可能因为错误地引用导致出现莫名其妙的错误,一定要注意是否引用正确的包。
另外,使用Scala API时,应该按照下面的方式引用,否则会出现一些问题。
import org.apache.flink.streaming.api.scala._
Scala中的_
就像Java中的*
,是一种通配符。在这里使用_
会引用org.apache.flink.streaming.api.scala
下面的所有内容。
回到执行环境上,我们可以通过执行环境做很多设置。比如,env.setParallelism(2)
告知执行环境整个作业的并行度为2;env.disableOperatorChaining()
关闭算子链功能。
使用下面的设置可以创建一个基于本地的执行环境,这样我们使用IntelliJ Idea运行程序时,可以直接打开浏览器进入Flink Web UI查看运行的任务,本地的调试。
Configuration conf = new Configuration();
// 访问http://localhost:8082 可以看到Flink Web UI
conf.setInteger(RestOptions.PORT, 8082);
// 创建本地执行环境,并行度为2
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, conf);
此外,我们还可以在执行环境中设置一些时间属性等,配置Checkpoint等,我们将在后续章节中介绍这些功能。总之,执行环境是开发者和Flink交互的一个重要入口。
读取数据源
接着我们需要使用执行环境提供的方法读取数据源,读取数据源的部分统称为Source。数据源一般是消息队列或文件,我们也可以根据业务需求重写数据源,比如定时爬取网络中某处的数据。在本例中,我们使用DataStream<String> stream = env.addSource(consumer);
来读取数据源,其中consumer
是一个Kafka消费者,我们消费Kafka中的数据作为Flink的输入数据。绝大多数流处理实战场景可能都是消费其他消息队列作为Source。
进行转换操作
此时,我们已经获取了一个文本数据流,接下来我们就可以在数据流上进行有状态的计算了。我们一般调用Flink提供的各类算子,使用链式调用的方式,对一个数据流进行操作。经过各算子的处理,DataStream
可能被转换为KeyedStream
、WindowedStream
、JoinedStream
等不同的数据流结构。相比Spark RDD的数据结构,Flink的数据流结构确实更加复杂。
本例中,我们先对一行文本进行分词,形成(word, 1)
这样的二元组,然后以单词为Key进行分组,并开启一个时间窗口,统计该窗口中某个单词出现的次数。在这个过程中,涉及到对数据流的分组、窗口和聚合操作。其中,窗口相关操作涉及到如何将数据流中的元素划分到不同的窗口,聚合操作涉及到使用一个状态来记录单词出现的次数,不断维护更新状态来对数据进行实时处理。本章我们重点介绍一些DataStream API,第五章将介绍时间上的操作,第六章将介绍如何使用状态以及如何做失败恢复。
结果输出
然后我们需要将前面的计算结果输出到外部系统。目的地可能是一个消息队列、文件系统或数据库,或其他自定义的输出方式。输出结果的部分统称为Sink。
本例中,我们的结果是窗口内的词频统计,它是一个DataStream<Tuple2<String, Integer>>
的数据结构。我们调用print
函数将这个数据流打印到标准输出(Standard Output)上。print
主要是为调试使用的,在实战场景中,计算结果会输出到一个外部的数据库或下一个流处理作业。
执行
当定义好程序的Source、Transformation和Sink的业务逻辑后,程序并不会立即执行这些计算,我们还需要调用执行环境execute()
方法来明确通知Flink去执行。Flink是延迟执行(Lazy Evaluation)的,即当程序明确调用execute()
方法时,Flink才会将数据流图转化为一个JobGraph
,提交给JobManager,JobManager根据当前的执行环境来执行这个作业。如果没有execute()
方法,我们无法得到输出结果。
综上,一个Flink程序的核心业务逻辑主要包括:初始化执行环境、进行Source、Transformation和Sink操作,最后要调用执行环境的execute()
方法。
Flink数据源
1、直接从文件中读取数据
在已有 执行环境的前提下,可以通过readTextFile 读取数据文件
DataStreamSource<String> streamDS = env.readTextFile("input/clicks.txt"); //文件读取数据
streamDS.print("从文件读取数据--");
env.execute();
说明:
- 参数可以是目录,也可以是文件;
- 路径可以是相对路径,也可以是绝对路径;
- 相对路径是从系统属性user.dir获取路径: idea下是project的根目录, standalone模式下是集群节点根目录;
- 也可以从hdfs目录下读取, 使用路径hdfs://..., 由于Flink没有提供hadoop相关依赖, 需要pom中添加相关依赖
如果需要读取 hdfs 文件,需要引入以下依赖
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.2</version>
<scope\>provided</scope>
</dependency>
2、从集合中会读取数据
最简单的读取数据的方式,就是在代码中直接创建一个Java集合,然后调用执行环境的fromCollection方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试。
ArrayList<Integer> nums = new ArrayList(); //nums 添加2个数字
nums.add(2);
nums.add(5);
DataStreamSource<Integer> numsStream = env.fromCollection(nums);//读取 nums 集合数据
ArrayList<Event> events = new ArrayList();
events.add(new Event("令狐冲", "./cart", 1000L));
events.add(new Event("依琳", "./home", 2000L));
DataStreamSource<Event> eventDataStreamSource = env.fromCollection(events);//读取 events 集合数据\*/
numsStream.print("集合1 数字列表");
eventDataStreamSource.print("集合2 event");
env.execute();
3、从元素中会读取数据
从元素中读取数据和从集合中读取数据一样简单,直接写好元素列表通过上下文环境的 fromElements 方法读取元素列表即可。
DataStreamSource<Event> elementStream =
//直接构建对象进行读取数据
env.fromElements(new Event("令狐冲", "./cart", 1000L), new Event("依琳", "./home", 2000L));
elementStream.print("从元素中读取数据")
4、从socket 读取数据
不论从集合还是文件,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无界的。这时又从哪里读取呢?一个简单的方式,就是我们之前用到的读取socket文本流。这种方式由于吞吐量小、稳定性较差,一般也是用于测试。
DataStreamSource<String> socketTextStream = env.socketTextStream("hadoop103", 7777);
socketTextStream.print("socketTextStream---");
env.execute();
运行代码之前向在终端上起一下 nc 进程
[hui@hadoop103 ~]$ nc -lk 7777
5、从Kafka中读取数据
Kafka作为分布式消息传输队列,是一个高吞吐、易于扩展的消息系统。而消息队列的传输方式,恰恰和流处理是完全一致的。所以可以说Kafka和Flink天生一对,是当前处理流式数据的双子星。在如今的实时流处理应用中,由Kafka进行数据的收集和传输,Flink 进行分析计算,这样的架构已经成为众多企业的首选,
- 引入依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>${flink.version}</version> </dependency>
- 代码
Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "hadoop201:9092"); properties.setProperty("group.id", "consumer-group"); properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty("auto.offset.reset", "latest");//重置 offset 方法 最近一次 DataStreamSource<String> kafkaStream = env.addSource(new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), properties)); kafkaStream.print(”kafka---“); env.execute();
服务器启动zk 和 kafka 启动kafka 生产者
[hui@hadoop201 kafka]$ bin/kafka-console-producer.sh --topic test --broker-list hadoop201:9092
启动程序,在生产着窗口发送消息,观察IDEA 控制台输出
6、自定义Source
大多数情况下,前面的数据源已经能够满足需要。但是凡事总有例外,如果遇到特殊情况,我们想要读取的数据源来自某个外部系统,而flink既没有预实现的方法、也没有提供连接器,又该怎么办呢?那就只好自定义实现SourceFunction了。接下来我们创建一个自定义的数据源,实现SourceFunction接口。主要重写两个关键方法:run()和cancel()。
run()方法:使用运行时上下文对象(SourceContext)向下游发送数据;
cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果。
代码如下:
- 我们先来自定义一下数据源:
import java.util.Calendar;
import java.util.Random;
public class ClinkSource implements SourceFunction<Event> { //声明一个标志位
private boolean runFlag = true;
@Override public void run(SourceContext<Event> ctx) throws Exception { //顶一个随机数
Random random = new Random(); //选举范围数据集
String users\[\] = {"令狐冲", "依琳", "宁中则", "任盈盈", "岳灵珊"};
String urls\[\] \= {"./home", "./cret", "./pro?id=21", "./fav"}; //循环不停的生成数据
while (runFlag) {
String user \= users\[random.nextInt(users.length)\];
String url \= urls\[random.nextInt(urls.length)\];
Long timeStamp \= Calendar.getInstance().getTimeInMillis();
ctx.collect(new Event(user, url, timeStamp));
Thread.sleep(1000L);
}
}
@Override public void cancel() {
runFlag \= false;
}
}
- 应用:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<Event> enentStream = env.addSource(new ClinkSource());//.setParallelism(12); enentStream.print(); env.execute();
执行结果
Flink数据转换
Flink的Transformation转换主要包括四种:单数据流基本转换、基于Key的分组转换、多数据流转换和数据重分布转换。Transformation各算子可以对Flink数据流进行处理和转化,多个Transformation算子共同组成一个数据流图,DataStream Transformation是Flink流处理非常核心的API。下图展示了数据流上的几类操作,本章主要介绍四种Transformation:单数据流转换、基于Key的分组转换、多数据流转换和数据重分布转换。
Flink的Transformation是对数据流进行操作,其中数据流涉及到的最常用数据结构是DataStream
,DataStream
由多个相同的元素组成,每个元素是一个单独的事件。在Java中,我们使用泛型DataStream<T>
来定义这种组成关系,在Scala中,这种泛型对应的数据结构为DataStream[T]
,T
是数据流中每个元素的数据类型。在WordCount的例子中,数据流中每个元素的类型是字符串String
,整个数据流的数据类型为DataStream<String>
。
在使用这些算子时,需要在算子上进行用户自定义操作,一般使用Lambda表达式或者继承类并重写函数两种方式完成这个用户自定义的过程。接下来,我们将对Flink Transformation中各算子进行详细介绍,并使用大量例子展示具体使用方法。
单数据流转换
单数据流基本转换主要对单个数据流上的各元素进行处理。
map
map
算子对一个DataStream
中的每个元素使用用户自定义的Mapper函数进行处理,每个输入元素对应一个输出元素,最终整个数据流被转换成一个新的DataStream
。输出的数据流DataStream<OUT>
类型可能和输入的数据流DataStream<IN>
不同。
我们可以重写MapFunction
或RichMapFunction
来自定义map函数,MapFunction
在源码的定义为:MapFunction<T, O>
,其内部有一个map
虚函数,我们需要对这个虚函数重写。下面的代码重写了MapFunction
中的map
函数,将输入结果乘以2,转化为字符串后输出。
// 函数式接口类// T为输入类型,O为输出类型
@FunctionalInterfacepublic interface MapFunction<T, O> extends Function, Serializable {
// 调用这个API就是继承并实现这个虚函数
O map(T value) throws Exception;
}
第二章中我们曾介绍过,对于这样一个虚函数,可以继承接口类并实现虚函数:
// 继承并实现MapFunction
// 第一个泛型是输入类型,第二个泛型是输出类型
public static class DoubleMapFunction implements MapFunction<Integer, String> {
@Override
public String map(Integer input) {
return "function input : " + input + ", output : " + (input * 2);
}
}
然后在主逻辑中调用这个类:
DataStream<String> functionDataStream = dataStream.map(new DoubleMapFunction());
这段的代码清单重写了MapFunction
中的map
函数,将输入结果乘以2,转化为字符串后输出。我们也可以不用显式定义DoubleMapFunction
这个类,而是像下面的代码一样使用匿名类:
// 匿名类
DataStream<String> anonymousDataStream = dataStream.map(new MapFunction<Integer, String>() {
@Override
public String map(Integer input) throws Exception { return "anonymous function input : " + input + ", output : " + (input * 2);
}
});
自定义map函数最简便的操作是使用Lambda表达式。
// 使用Lambda表达式
DataStream<String> lambdaStream = dataStream.map(input -> "lambda input : " + input + ", output : " + (input * 2));
Scala的API相对更加灵活,可以使用下划线来构造Lambda表达式:
// 使用 _ 构造Lambda表达式
val lambda2 = dataStream.map { _.toDouble * 2 }
💁信息 使用Scala时,Lambda表达式可以可以放在圆括号()中,也可以使用花括号{}中。使用Java时,只能使用圆括号。
对上面的几种方式比较可见,Lambda表达式更为简洁。重写函数的方式代码更为臃肿,但定义更清晰。
此外,RichMapFunction
是一种RichFunction,它除了MapFunction
的基础功能外,还提供了一系列其他方法,包括open
、close
、getRuntimeContext
和setRuntimeContext
等虚函数方法,重写这些方法可以创建状态数据、对数据进行广播,获取累加器和计数器等,这部分内容将在后面介绍。
filter
filter
算子对每个元素进行过滤,过滤的过程使用一个Filter函数进行逻辑判断。对于输入的每个元素,如果filter函数返回True,则保留,如果返回False,则丢弃,如下图所示。
我们可以使用Lambda表达式过滤掉小于等于0的元素:
DataStream<Integer> dataStream = senv.fromElements(1, 2, -3, 0, 5, -9, 8);
// 使用 -> 构造Lambda表达式
DataStream<Integer> lambda = dataStream.filter ( input -> input > 0 );
也可以继承FilterFunction
或RichFilterFunction
,然后重写filter
方法,我们还可以将参数传递给继承后的类。如下面的代码所示,MyFilterFunction
增加一个构造函数参数limit
,并在filter
方法中使用这个参数。
public static class MyFilterFunction extends RichFilterFunction<Integer> {
// limit参数可以从外部传入
private Integer limit;
public MyFilterFunction(Integer limit) {
this.limit = limit;
}
@Override
public boolean filter(Integer input) {
return input > this.limit;
}
}
// 继承RichFilterFunction
DataStream<Integer> richFunctionDataStream = dataStream.filter(new MyFilterFunction(2));
flatMap
flatMap
算子和map
有些相似,输入都是数据流中的每个元素,与之不同的是,flatMap
的输出可以是零个、一个或多个元素,当输出元素是一个列表时,flatMap
会将列表展平。如下图所示,输入是包含圆形或正方形的列表,flatMap
过滤掉圆形,正方形列表被展平,以单个元素的形式输出。
我们可以用切水果的例子来理解map和flatMap的区别。map会对每个输入元素生成一个对应的输出元素:
{苹果,梨,香蕉}.map(去皮) => {去皮苹果, 去皮梨,去皮香蕉}
flatMap
先对每个元素进行相应的操作,生成一个相应的集合,再将集合展平:
{苹果,梨,香蕉}.flMap(切碎)
=> {[苹果碎片1, 苹果碎片2], [梨碎片1,梨碎片2, 梨碎片3],[香蕉碎片1]}
=>{苹果碎片1, 苹果碎片2, 梨碎片1,梨碎片2, 梨碎片3,香蕉碎片1}
下面的代码对字符串进行切词处理:
DataStream<String> dataStream = senv.fromElements("Hello World", "Hello this is Flink");
// split函数的输入为 "Hello World" 输出为 "Hello" 和 "World" 组成的列表 ["Hello", "World"]
// flatMap将列表中每个元素提取出来
// 最后输出为 ["Hello", "World", "Hello", "this", "is", "Flink"]
DataStream<String> words = dataStream.flatMap ((String input, Collector<String> collector) -> {
for (String word : input.split(" ")) {
collector.collect(word);
}
}).returns(Types.STRING);
因为flatMap
可以输出零到多个元素,我们可以将其看做是map
和filter
更一般的形式。如果我们只想对长度大于15的句子进行处理,可以先在程序判断处理,再输出,如下所示。
<<<<<<< HEAD
=======
8bdbbd835ea1e4a778da30e70ef7bc8fc2af8844 // 只对字符串数量大于15的句子进行处理 // 使用匿名函数 DataStream
longSentenceWords = dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String input, Collectorcollector) throws Exception {
if (input.length() > 15) { for (String word: input.split(" ")) collector.collect(word);
}
} });💁提示 虽然
flatMap
可以完全替代map
和filter
,但Flink仍然保留了这三个API,主要因为map
和filter
的语义更明确:map
可以表示一对一的转换,代码阅读者能够确认对于一个输入,肯定能得到一个输出;filter
则明确表示发生了过滤操作。更明确的语义有助于提高代码的可读性。
Scala的API相对更简单一些:
val dataStream: DataStream[String] = senv.fromElements("Hello World", "Hello this is Flink")
val words = dataStream.flatMap ( input => input.split(" ") )
val words2 = dataStream.map { _.split(" ") }
基于Key的分组转换
对数据分组主要是为了进行后续的聚合操作,即对同组数据进行聚合分析。如下图所示,keyBy
会将一个DataStream
转化为一个KeyedStream
,聚合操作会将KeyedStream
转化为DataStream
。如果聚合前每个元素数据类型是T,聚合后的数据类型仍为T。
keyBy
绝大多数情况,我们要根据事件的某种属性或数据的某个字段进行分组,然后对一个分组内的数据进行处理。如下图所示,keyBy
算子根据元素的形状对数据进行分组,相同形状的元素被分到了一起,可被后续算子统一处理。比如,对股票数据流处理时,可以根据股票代号进行分组,然后对同一支股票统计其价格变动。又如,电商用户行为日志把所有用户的行为都记录了下来,如果要分析某一个用户行为,需要先按用户ID进行分组。
keyBy
算子将DataStream
转换成一个KeyedStream
。KeyedStream
是一种特殊的DataStream
,事实上,KeyedStream
继承了DataStream
,DataStream
的各元素随机分布在各算子实例中,KeyedStream
的各元素按照Key分组,相同Key的数据会被分配到同一算子实例中。我们需要向keyBy
算子传递一个参数,以告知Flink以什么作为Key进行分组。
我们可以使用数字位置来指定Key:
DataStream<Tuple2<Integer, Double>> dataStream = senv.fromElements(Tuple2.of(1, 1.0), Tuple2.of(2, 3.2), Tuple2.of(1, 5.5), Tuple2.of(3, 10.0), Tuple2.of(3, 12.5));
// 使用数字位置定义Key 按照第一个字段进行分组
DataStream<Tuple2<Integer, Double>> keyedStream = dataStream.keyBy(0).sum(1);
也可以使用字段名来指定Key。比如,我们有一个Word
类:
public class Word {
public String word;
public int count;
public Word() {}
public Word(String word, int count) {
this.word = word;
this.count = count;
}
public static Word of(String word, int count) {
return new Word(word, count);
}
@Override
public String toString() {
return this.word + ": " + this.count;
}
}
我们可以直接用Word
中的字段名word
来选择Key。
// Flink历史版本 <=1.10
DataStream<Word> fieldNameStream = wordStream.keyBy("word").sum("count");
// 新版本>= 1.20
DataStream<Word> fieldNameStream = wordStream.keyBy(data->data.word).sum("count");
信息 这种方法只适用于Scala case class或Java POJO类型的数据。
指定Key本质上是实现一个KeySelector
,在Flink源码中,它是这么定义的:
// IN为数据流元素,KEY为所选择的Key
@FunctionalInterfacepublic interface KeySelector<IN, KEY> extends Function, Serializable {
// 选择一个字段作为Key
KEY getKey(IN value) throws Exception;
}
我们可以重写getKey()
方法,如下所示:
DataStream<Word> wordStream = senv.fromElements( Word.of("Hello", 1), Word.of("Flink", 1),Word.of("Hello", 2), Word.of("Flink", 2));
// 使用KeySelector
DataStream<Word> keySelectorStream = wordStream.keyBy(new KeySelector<Word, String> () {
@Override
public String getKey(Word in) {
return in.word;
}
}).sum("count");
一旦按照Key分组后,我们后续可以对每组数据进行时间窗口的处理以及状态的创建和更新。数据流里相同Key的数据都可以访问和修改相同的状态,如何使用时间和状态将在后续章节中分别介绍。
Aggregation
常见的聚合操作有sum
、max
、min
等,这些聚合操作统称为聚合(Aggregation)。与批处理不同,这些聚合函数是对流数据进行统计,流数据是依次进入Flink的,聚合操作是对流入的数据进行实时统计,并不断输出到下游。
使用聚合函数时,我们需要一个参数来指定按照哪个字段进行聚合。跟keyBy
相似,我们可以使用数字位置来指定对哪个字段进行聚合,也可以实现一个KeySelector
。
sum
算子对该字段进行加和,并将结果保存在该字段上,它无法确定其他字段的数值,或者说无法保证其他字段的计算结果。下面的例子中,sum
对第二个字段求和,他只保证了第二个字段的求和结果的正确性,第三个字段是不确定的。
DataStream<Tuple3<Integer, Integer, Integer>> tupleStream =senv.fromElements(Tuple3.of(0, 0, 0),Tuple3.of(0, 1, 1),Tuple3.of(0, 2, 2),Tuple3.of(1, 0, 6),Tuple3.of(1, 1, 7),Tuple3.of(1, 0, 8));
// 按第一个字段分组,对第二个字段求和,打印出来的结果如下:
// (0,0,0)
// (0,1,0)
// (0,3,0)
// (1,0,6)
// (1,1,6)
// (1,1,6)
DataStream<Tuple3<Integer, Integer, Integer>> sumStream = tupleStream.keyBy(0).sum(1);
max
算子对该字段求最大值,并将结果保存在该字段上。对于其他字段,该操作并不能保证其数值的计算结果。下面的例子对第三个字段求最大值,第二个字段是不确定的。
// 按第一个字段分组,对第三个字段求最大值max,打印出来的结果如下:
// (0,0,0)
// (0,0,1)
// (0,0,2)
// (1,0,6)
// (1,0,7)
// (1,0,8)
DataStream<Tuple3<Integer, Integer, Integer>> maxStream = tupleStream.keyBy(0).max(2);
maxBy
算子对该字段求最大值,maxBy
与max
的区别在于,maxBy
同时保留其他字段的数值,即maxBy
返回数据流中最大的整个元素,包括其他字段。以下面的输入中Key为1的数据为例,我们要求第三个字段的最大值,Flink首先接收到(1,0,6)
,当接收到(1,1,7)
时,最大值发生变化,Flink将(1,1,7)
这整个元组返回,当(1,0,8)
到达时,最大值再次发生变化,Flink将(1,0,8)
这整个元组返回。反观max
,它只负责所求的字段,其他字段概不负责,无法保证其他字段的结果。因此,maxBy
保证的是最大值的整个元素,max
只保证最大值的字段。
// 按第一个字段分组,对第三个字段求最大值maxBy,打印出来的结果如下:
// (0,0,0)
// (0,1,1)
// (0,2,2)
// (1,0,6)
// (1,1,7)
// (1,0,8)
DataStream<Tuple3<Integer, Integer, Integer>> maxByStream = tupleStream.keyBy(0).maxBy(2);
同样,min
和minBy
的区别在于,min
算子对某字段求最小值,minBy
返回具有最小值的整个元素。
其实,这些聚合操作里已经使用了状态数据,比如,sum
算子内部记录了当前的和,max
算子内部记录了当前的最大值。算子的计算过程其实就是不断更新状态数据的过程。由于内部使用了状态数据,而且状态数据并不会被清理,因此一定要慎重地在一个无限数据流上使用这些聚合操作。
💁信息 对于一个
KeyedStream
,一次只能使用一个Aggregation聚合操作,无法链式使用多个。
reduce
前面几个Aggregation是几个较为常用的操作,对分组数据进行处理更为通用的方法是使用reduce
算子。
上图展示了reduce
算子的原理:reduce
在分组的数据流上生效,它接受两个输入,生成一个输出,即两两合一地进行汇总操作,生成一个同类型的新元素。
例如,我们定义一个学生分数类:
public static class Score {
public String name;
public String course;
public int score;
public Score(){}
public Score(String name, String course, int score) { this.name = name;
this.course = course;
this.score = score;
}
public static Score of(String name, String course, int score) {
return new Score(name, course, score);
}
@Override
public String toString() {
return "(" + this.name + ", " + this.course + ", " + Integer.toString(this.score) + ")";
}
}
在这个类上进行reduce
:
DataStream<Score> dataStream = senv.fromElements(Score.of("Li", "English", 90),Score.of("Wang", "English", 88),Score.of("Li", "Math", 85),Score.of("Wang", "Math", 92),Score.of("Liu", "Math", 91), Score.of("Liu", "English", 87));
// 实现
ReduceFunctionDataStream<Score> sumReduceFunctionStream = dataStream.keyBy("name").reduce(new MyReduceFunction());
其中MyReduceFunction
继承并实现了ReduceFunction
:
public static class MyReduceFunction implements ReduceFunction<Score> {
@Override
public Score reduce(Score s1, Score s2) {
return Score.of(s1.name, "Sum", s1.score + s2.score);
}
}
使用Lambda表达式更简洁一些:
// 使用 Lambda 表达式
DataStream<Score> sumLambdaStream = dataStream.keyBy("name").reduce((s1, s2) -> Score.of(s1.name, "Sum", s1.score + s2.score));
多数据流转换
很多情况下,我们需要对多个数据流进行整合处理。
union
在DataStream
上使用union
算子可以合并多个同类型的数据流,或者说,可以将多个DataStream<T>
合并为一个新的DataStream<T>
。数据将按照先进先出(First In First Out)的模式合并,且不去重。下图中,union
对白色和深色两个数据流进行合并,生成一个数据流。
假设股票价格数据流来自不同的交易所,我们将其合并成一个数据流:
DataStream<StockPrice> shenzhenStockStream = ...
DataStream<StockPrice> hongkongStockStream = ...
DataStream<StockPrice> shanghaiStockStream = ...
DataStream<StockPrice> unionStockStream = shenzhenStockStream.union(hongkongStockStream, shanghaiStockStream);
connect
union
虽然可以合并多个数据流,但有一个限制:多个数据流的数据类型必须相同。connect
提供了和union
类似的功能,用来连接两个数据流,它与union
的区别在于:
connect
只能连接两个数据流,union
可以连接多个数据流。connect
所连接的两个数据流的数据类型可以不一致,union
所连接的两个数据流的数据类型必须一致。- 两个
DataStream
经过connect
之后被转化为ConnectedStreams
,ConnectedStreams
会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。
如下图所示,connect
经常被应用在使用控制流对数据流进行控制处理的场景上。控制流可以是阈值、规则、机器学习模型或其他参数。
两个DataStream
经过connect
之后被转化为ConnectedStreams
。对于ConnectedStreams
,我们需要重写CoMapFunction
或CoFlatMapFunction
。这两个接口都提供了三个泛型,这三个泛型分别对应第一个输入流的数据类型、第二个输入流的数据类型和输出流的数据类型。在重写函数时,对于CoMapFunction
,map1
处理第一个流的数据,map2
处理第二个流的数据;对于CoFlatMapFunction
,flatMap1
处理第一个流的数据,flatMap2
处理第二个流的数据。下面是CoFlatMapFunction
在源码中的签名。
// IN1为第一个输入流的数据类型
// IN2为第二个输入流的数据类型
// OUT为输出类型
public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {
// 处理第一个流的数据
void flatMap1(IN1 value, Collector<OUT> out) throws Exception;
// 处理第二个流的数据
void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
}
Flink并不能保证两个函数调用顺序,两个函数的调用依赖于两个数据流中数据的流入先后顺序,即第一个数据流有数据到达时,map1
或flatMap1
会被调用,第二个数据流有数据到达时,map2
或flatMap2
会被调用。下面的代码对一个整数流和一个字符串流进行了connect
操作。
DataStream<Integer> intStream = senv.fromElements(1, 0, 9, 2, 3, 6);
DataStream<String> stringStream = senv.fromElements("LOW", "HIGH", "LOW", "LOW");
ConnectedStreams<Integer, String> connectedStream = intStream.connect(stringStream);
DataStream<String> mapResult = connectedStream.map(new MyCoMapFunction());
// CoMapFunction三个泛型分别对应第一个流的输入、第二个流的输入,map之后的输出
public static class MyCoMapFunction implements CoMapFunction<Integer, String, String> {
@Override
public String map1(Integer input1) {
return input1.toString();
}
@Override
public String map2(String input2) {
return input2;
}
}
两个数据流connect
之后,可以使用FlatMapFunction
也可以使用ProcessFunction
继续处理,可以做到类似SQL中的连接(Join)的效果。Flink也提供了join
算子,join
主要作用在时间窗口上,connect
相比而言更广义一些,关于join
的介绍将在第五章时间相关章节中介绍。
并行度与数据重分布
并行度
第二章中我们曾经提到,Flink使用并行度来定义某个算子被切分为多少个算子子任务,或者说多少个算子实例、分区。我们编写的大部分Transformation转换操作能够形成一个逻辑视图,当实际运行时,逻辑视图中的算子会被并行切分为一到多个算子子任务,每个算子子任务处理一部分数据,各个算子并行地在多个子任务上执行。假如算子的并行度为2,那么它有两个子任务。
并行度可以在一个Flink作业的执行环境层面统一设置,这样将影响该作业所有算子并行度,也可以对某个算子单独设置其并行度。如果不进行任何设置,默认情况下,一个作业所有算子的并行度会依赖于这个作业的执行环境。如果一个作业在本地执行,那么并行度默认是本机CPU核心数。当我们将作业提交到Flink集群时,需要使用提交作业的Client,并指定一系列参数,其中一个参数就是并行度。
下面的代码展示了如何获取执行环境的默认并行度,如何更改执行环境的并行度。
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
// 获取当前执行环境的默认并行度
int defaultParalleism = senv.getParallelism();
// 设置所有算子的并行度为4,表示所有算子的并行执行的实例数为4
senv.setParallelism(4);
也可以对某个算子设置并行度:
dataStream.map(new MyMapper()).setParallelism(defaultParallelism * 2);
数据重分布
默认情况下,数据是自动分配到多个实例(或者称之为分区)上的。有的时候,我们需要手动在多个实例上进行数据分配,例如,我们知道某个实例上的数据过多,其他实例上的数据稀疏,产生了数据倾斜,这时我们需要将数据均匀分布到各个实例上,以避免部分分区负载过重。数据倾斜问题会导致整个作业的计算时间过长或者内存不足等问题。
本节涉及到的各个数据重分布算子的输入是DataStream
,输出也是DataStream
。keyBy
也有对数据进行分组和数据重分布的功能,但keyBy
输出的是KeyedStream
。
shuffle
shuffle
基于正态分布,将数据随机分配到下游各算子实例上。
dataStream.shuffle();
rebalance与rescale
rebalance
使用Round-ribon思想将数据均匀分配到各实例上。Round-ribon是负载均衡领域经常使用的均匀分配的方法,上游的数据会轮询式地分配到下游的所有的实例上。如下图所示,上游的算子会将数据依次发送给下游所有算子实例。
dataStream.rebalance();
rescale
与rebalance
很像,也是将数据均匀分布到各下游各实例上,但它的传输开销更小,因为rescale
并不是将每个数据轮询地发送给下游每个实例,而是就近发送给下游实例。
dataStream.rescale();
如上图所示,当上游有两个实例时,上游第一个实例将数据发送给下游第一个和第二个实例,上游第二个实例将数据发送给下游第三个和第四个实例,相比rebalance
将数据发送给下游每个实例,rescale
的传输开销更小。下图则展示了当上游有四个实例,下游有两个实例,上游前两个实例将数据发送给下游第一个实例,上游后两个实例将数据发送给下游第二个实例。
broadcast
英文单词"broadcast"翻译过来为广播,在Flink里,数据会被复制并广播发送给下游的所有实例上。
dataStream.broadcast();
global
global
会将所有数据发送给下游算子的第一个实例上,使用这个算子时要小心,以免造成严重的性能问题。
dataStream.global();
partitionCustom
我们也可以在DataStream
上使用partitionCustom
来自定义数据重分布逻辑。下面是partitionCustom
的源码,它有两个参数:第一个参数是自定义的Partitioner
,我们需要重写里面的partition
函数;第二个参数是对数据流哪个字段使用partiton
逻辑。
public class DataStream<T> {
public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, int field) { ... }
public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, String field) { ... }
public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, KeySelector<T, K> keySelector) { ... }
}
下面为Partitioner
的源码,partition
函数的返回一个整数,表示该元素将被路由到下游第几个实例。
@FunctionalInterface
public interface Partitioner<K> extends java.io.Serializable, Function {
// 根据key决定该数据分配到下游第几个分区(实例)
int partition(K key, int numPartitions);
}
Partitioner[K]
中泛型K为根据哪个字段进行分区,比如我们要对一个Score
数据流重分布,希望按照id均匀分配到下游各实例,那么泛型K就为id的数据类型Long
。同时,泛型K也是int partition(K key, int numPartitions)
函数的第一个参数的数据类型。
public class Score {
public Long id;
public String name;
public Double score;
}
在调用partitionCustom(partitioner, field)
时,第一个参数是我们重写的Partitioner
,第二个参数表示按照id字段进行处理。
partitionCustom
涉及的类型和函数有点多,使用例子解释更为直观。下面的代码按照数据流中的第二个字段进行数据重分布,当该字段中包含数字时,将被路由到下游算子的前半部分,否则被路由到后半部分。如果设置并行度为4,表示所有算子的实例总数为4,或者说共有4个分区,那么如果字符串包含数字时,该元素将被分配到第0个和第1个实例上,否则被分配到第2个和第3个实例上。
public class PartitionCustomExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
// 获取当前执行环境的默认并行度
int defaultParalleism = senv.getParallelism();
// 设置所有算子的并行度为4,表示所有算子的并行执行的实例数为4
senv.setParallelism(4);
DataStream<Tuple2<Integer, String>> dataStream = senv.fromElements(Tuple2.of(1, "123"), Tuple2.of(2, "abc"),Tuple2.of(3, "256"), Tuple2.of(4, "zyx"),Tuple2.of(5, "bcd"), Tuple2.of(6, "666"));
// 对(Int, String)中的第二个字段使用 MyPartitioner 中的重分布逻辑
DataStream<Tuple2<Integer, String>> partitioned = dataStream.partitionCustom(new MyPartitioner(), 1);
partitioned.print();
senv.execute("partition custom transformation");
}
/**
* Partitioner<T> 其中泛型T为指定的字段类型
* 重写partiton函数,并根据T字段对数据流中的所有元素进行数据重分配
* */
public static class MyPartitioner implements Partitioner<String> {
private Random rand = new Random();
private Pattern pattern = Pattern.compile(".*\\d+.*");
/**
* key 泛型T 即根据哪个字段进行数据重分配,本例中是Tuple2(Int, String)中的String
* numPartitons 为当前有多少个并行实例
* 函数返回值是一个Int 为该元素将被发送给下游第几个实例
* */
@Override
public int partition(String key, int numPartitions) {
int randomNum = rand.nextInt(numPartitions / 2);
Matcher m = pattern.matcher(key);
if (m.matches()) {
return randomNum;
} else {
return randomNum + numPartitions / 2;
}
}
}
}
DataSink结果输出
DataSink简介
Flink
基础操作与一个处理数据 Http
接口的生命周期很像,接受数据 -> 处理数据 -> 存储数据,而 Sink
在翻译有表示【下沉】的意思,也就是我们经常对处理数据后做的一件事:存储。
下面来看下 RickSinkFunction
类的继承体系图:
个人觉得跟数据源 RichSourceFunction
很像,都继承了 AbstractRichFunction
抽象类,实现了 RichFunction
中的 open
和 close
等基础方法。两者的区别在于,数据源 DataSource
另外实现的是 SourceFunction
接口,而我们本篇文章的主角 DataSink
实现的就是 SinkFunction
接口。
官方支持的连接器 Connector
在流式计算框架 Flink
中,可以通过 Sink
进行存储操作。官方给出更推荐的说法是连接器 Connector
, 第三方中间件作为连接器,既可以当成数据源,也能当成目的地,取决于上面提到实现的接口(SourceFunction
/SinkFunction
)
以下是官方支持的连接器,感兴趣的可以点击参考资料三去详细了解
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
- Google PubSub (source/sink)
其中结尾的 source
表示数据源,sink
表示数据的发送地,例如常见的消息中间件 Apache Kafka
,它既可以作为数据源,也能成为数据的发送目的地。
Kafka
Kafka版本为0.11
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>${flink.version}</version>
</dependency>`
Kafka版本为2.0以上
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
主函数中添加sink:
DataStream<String> union = high
.union(low)
.map(r -> r.temperature.toString);
union.addSink(
new FlinkKafkaProducer011<String>(
"localhost:9092",
"test",
new SimpleStringSchema()
)
);
Elasticsearch
在主函数中调用:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<<<<<<< HEAD
=======
>>>>>>> 8bdbbd835ea1e4a778da30e70ef7bc8fc2af8844
可选依赖:
```xml
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.9.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.9.1</version>
</dependency>`
示例代码: scala version
object SinkToES {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"))
val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](
httpHosts,
new ElasticsearchSinkFunction[SensorReading] {
override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
val hashMap = new util.HashMap[String, String]()
hashMap.put("data", t.toString)
val indexRequest = Requests
.indexRequest()
.index("sensor") // 索引是sensor,相当于数据库
.source(hashMap)
requestIndexer.add(indexRequest)
}
}
)
// 设置每一批写入es多少数据
esSinkBuilder.setBulkFlushMaxActions(1)
val stream = env.addSource(new SensorSource)
stream.addSink(esSinkBuilder.build())
env.execute()
}
}`
java version
public class SinkToES {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
ArrayList<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
ElasticsearchSink.Builder<SensorReading> sensorReadingBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction<SensorReading>() {
@Override
public void process(SensorReading sensorReading, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
HashMap<String, String> map = new HashMap<>();
map.put("data", sensorReading.toString());
IndexRequest indexRequest = Requests
.indexRequest()
.index("sensor") // 索引是sensor,相当于数据库
.source(map);
requestIndexer.add(indexRequest);
}
}
);
sensorReadingBuilder.setBulkFlushMaxActions(1);
DataStream<SensorReading> stream = env.addSource(new SensorSource());
stream.addSink(sensorReadingBuilder.build());
env.execute();
}
}
Redis
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
定义一个redis的mapper类,用于定义保存到redis时调用的命令:
scala version
object SinkToRedisExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val stream = env.addSource(new SensorSource)
val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").build()
stream.addSink(new RedisSink[SensorReading](conf, new MyRedisSink))
env.execute()
}
class MyRedisSink extends RedisMapper[SensorReading] {
override def getKeyFromData(t: SensorReading): String = t.id
override def getValueFromData(t: SensorReading): String = t.temperature.toString
override def getCommandDescription: RedisCommandDescription = new RedisCommandDescription(RedisCommand.HSET, "sensor")
}
}
java version
public class WriteToRedisExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<SensorReading> stream = env.addSource(new SensorSource());
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").build();
stream.addSink(new RedisSink<SensorReading>(conf, new MyRedisSink()));
env.execute();
}
public static class MyRedisSink implements RedisMapper<SensorReading> {
@Override
public String getKeyFromData(SensorReading sensorReading) {
return sensorReading.id;
}
@Override
public String getValueFromData(SensorReading sensorReading) {
return sensorReading.temperature + "";
}
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "sensor");
}
}
}
PrintSinkFunction
在我们平时编码中,常用的验证结果的方式是将结果输出到控制台,例如 IDEA
的快捷键 SOUT
,可以很快的将结果输出到底部控制台中。
在 Flink
的世界中,流式计算因为要一直接收数据进行处理,常用的操作对象是 DataStream
,它是一个流对象,有特定的打印 Print
方法,就是我接下来要介绍的 PrintSinkFunction
。
从结构图看出,PrintSinkFunction
继承自 RichSinkFunction
,重写了其中两个关键方法 open
和 invoke
,在这两个方法中,实现了输出功能。
PrintSinkFunction.java
public class PrintSinkFunction<IN> extends RichSinkFunction<IN> {
private final PrintSinkOutputWriter<IN> writer;
...
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
writer.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
}
@Override
public void invoke(IN record) {
writer.write(record);
}
...
}
上面是它的核心代码,省略了 `writer` 变量的初始化。
**在 `open` 方法中,获取了运行上下文对象,从中取出当前运行的任务下标以及并发任务数量,传递到了 `writer` 变量中**(所以 `demo` 中经常能看到 【1> xxxx】,前面的数字是一个前缀,实际值是当前任务下标 + 1)。
**在 `invoke` 方法中,做的工作就比较简单了,就是将流处理传入记录 `record` 进行输出打印(详细输出过程可跟踪进 `PrintSinkOutputWriter` 查看)**
官方库中的 `PrintSinkFunction` 在日常开发中常使用,通过控制台输出结果进行验证数据是否跟自己预期的一致。所以先以常用的类进行介绍,可以更快的对 `SinkFunction` 的整体结构有个更清晰的了解。
### 自定义SinkFunction(存储到 MySQL)
除了官方支持的 `Connector` 外,还提供了途径,让我们扩展存储方式,通过 `addSink()` 方法,添加自定义的 `SinkFunction`。
**上面是程序的输入和输出示意图**,在 `Input` 中,以秒为单位,`TimeWindow` 以 10s 为间隔,将输入的数据放在一个窗口中(在一个窗口中的数据,可以进行聚合 `reduce` 操作,然后进行输出),**最后 `Sink` 到常用的存储地,这里以 `MySQL` 进行数据的落库作为示例。**
**最后每次传入 `Sink` 时,是一个数据列表 `List` 型的入参。从上面的示意图来联想我们 `kafka` 消息,搜集 10s 内的消息,然后放入同一个时间窗口中,接着一次性存入到数据库中。**
- 数据源和转换如下:

- SinkToMySQL
```java
public class SinkToMySQL extends RichSinkFunction<List<Student>> {
<<<<<<< HEAD
private PreparedStatement ps;
private Connection connection;
=======
private PreparedStatement ps;
private Connection connection;
>>>>>>> 8bdbbd835ea1e4a778da30e70ef7bc8fc2af8844
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
connection = MyDruidUtils.getConnection();
String sql = "insert into student(name, age, address) values (?, ?, ?);";
ps = connection.prepareStatement(sql);
}
<<<<<<< HEAD
=======
>>>>>>> 8bdbbd835ea1e4a778da30e70ef7bc8fc2af8844
@Override
public void close() throws Exception {
super.close();
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
}
<<<<<<< HEAD
=======
>>>>>>> 8bdbbd835ea1e4a778da30e70ef7bc8fc2af8844
@Override
public void invoke(List<Student> value, Context context) throws Exception {
for (Student student : value) {
ps.setString(1, student.getName());
ps.setInt(2, student.getAge());
ps.setString(3, student.getAddress());
ps.addBatch();
}
int[] count = ps.executeBatch();
}
}
**上面的类就是自定义的 `Sink` 具体实现, `open` 获取数据库链接和初始化 `SQL`, `close` 时释放链接,每次落库具体操作在 `invoke` 方法中。**