Flink Usage Examples
POM overview (Flink version 1.14.6)
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>demo_flink</groupId>
    <artifactId>demo_flink</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>
    <name>Flink Quickstart Job</name>
    <url>https://www.myorganization.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <flink.version>1.14.6</flink.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Apache Flink core dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Apache Flink stream-processing (DataStream) dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Apache Flink Kafka connector dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>lz4-java</artifactId>
                    <groupId>org.lz4</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>snappy-java</artifactId>
                    <groupId>org.xerial.snappy</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Apache Flink web UI dependency (for the local dashboard) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Hutool utility library -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.16</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>demo.stream.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <!-- This profile helps to make things run out of the box in IntelliJ -->
    <!-- It adds Flink's core classes to the runtime class path. -->
    <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>
            <activation>
                <property>
                    <name>idea.version</name>
                </property>
            </activation>
            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_2.12</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>
</project>
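With this POM, mvn clean package produces a fat jar whose manifest main class is demo.stream.StreamingJob; the jar can then be submitted to a cluster with flink run.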
Example Code
Word count from Kafka
import cn.hutool.json.JSONUtil;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Count words from a Kafka topic and print the running totals
public class StreamingJob {

    // Counter: JVM-local demo state. It is neither fault-tolerant nor
    // safe under parallelism > 1; use Flink's keyed state in real jobs.
    public static Map<String, Integer> count = new HashMap<>();

    public static void main(String[] args) throws Exception {
        // The Flink Web UI is served at http://localhost:8082
        Configuration conf = new Configuration();
        conf.setInteger(RestOptions.PORT, 8082);
        // Create a local execution environment with parallelism 2
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, conf);
        // Alternative: default environment without the web UI
        // final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Alternative: submit to a remote cluster
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("gcw1", 8081);
        // StreamTableEnvironment stEnv = StreamTableEnvironment.create(env);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.19.28.136:9094");
        properties.setProperty("group.id", "flink");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Start from the latest offset when no committed offset exists
        properties.setProperty("auto.offset.reset", "latest");
        DataStreamSource<String> kafkaStream = env.addSource(
                new FlinkKafkaConsumer<>("flink-string", new SimpleStringSchema(), properties));
        DataStream<Map<String, Integer>> words = kafkaStream.flatMap((String input, Collector<Map<String, Integer>> collector) -> {
            for (String word : input.split("[,\\s]+")) {
                count.merge(word, 1, Integer::sum);
            }
            collector.collect(count);
            System.out.println(JSONUtil.toJsonStr(count));
        }).returns(Types.MAP(Types.STRING, Types.INT));
        words.map(JSONUtil::toJsonStr).print();
        // Execute the program
        env.execute("Flink Streaming Java API Skeleton");
    }
}
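Note that FlinkKafkaConsumer is deprecated as of Flink 1.14 in favor of the unified KafkaSource API. A minimal sketch of an equivalent source (same broker, topic, and group id as above), which would replace the env.addSource(...) line:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Unified KafkaSource (available since Flink 1.12); replaces FlinkKafkaConsumer
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("10.19.28.136:9094")
        .setTopics("flink-string")
        .setGroupId("flink")
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
DataStreamSource<String> kafkaStream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");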
Kafka-to-Kafka word count
import cn.hutool.json.JSONUtil;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Count words from one Kafka topic and write the running totals to another
public class StreamingJob {

    // Counter: JVM-local demo state (see the note in the previous example)
    public static Map<String, Integer> count = new HashMap<>();

    public static void main(String[] args) throws Exception {
        // The Flink Web UI is served at http://localhost:8082
        Configuration conf = new Configuration();
        conf.setInteger(RestOptions.PORT, 8082);
        // Create a local execution environment with parallelism 2
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, conf);
        // Alternative: default environment without the web UI
        // final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Alternative: submit to a remote cluster
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("gcw1", 8081);
        // StreamTableEnvironment stEnv = StreamTableEnvironment.create(env);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.19.28.136:9094");
        properties.setProperty("group.id", "flink");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Start from the latest offset when no committed offset exists
        properties.setProperty("auto.offset.reset", "latest");
        DataStreamSource<String> kafkaStream = env.addSource(
                new FlinkKafkaConsumer<>("flink-string", new SimpleStringSchema(), properties));
        DataStream<String> words = kafkaStream.flatMap((String input, Collector<String> collector) -> {
            for (String word : input.split("[,\\s]+")) {
                count.merge(word, 1, Integer::sum);
            }
            collector.collect(JSONUtil.toJsonStr(count));
            System.out.println(JSONUtil.toJsonStr(count));
        }).returns(Types.STRING);
        // Create the sink
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("10.19.28.136:9094")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("flink-out")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
        // Attach the sink
        words.sinkTo(sink).name("send to kafka");
        // Execute the program
        env.execute("Flink Streaming Java API Skeleton");
    }
}
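The static count map above is JVM-local: each parallel subtask mutates the same shared copy and the totals are lost on failure. For reference, a sketch of the idiomatic alternative that keeps the counts in Flink's managed keyed state, where keyBy partitions by word and sum maintains one running total per key (recoverable from checkpoints when checkpointing is enabled); kafkaStream is the source stream from the example above:

import org.apache.flink.api.java.tuple.Tuple2;

// Idiomatic word count: one managed-state counter per distinct word
DataStream<Tuple2<String, Integer>> counts = kafkaStream
        .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            for (String word : line.split("[,\\s]+")) {
                out.collect(Tuple2.of(word, 1));
            }
        })
        .returns(Types.TUPLE(Types.STRING, Types.INT))
        .keyBy(t -> t.f0)
        .sum(1);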
Sliding-window word count from a socket
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {
        // The port to connect to
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }
        // The Flink Web UI is served at http://localhost:8082
        Configuration conf = new Configuration();
        conf.setInteger(RestOptions.PORT, 8082);
        // Create a local execution environment with parallelism 2
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, conf);
        // Alternative: default environment without the web UI
        // final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Alternative: submit to a remote cluster
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("gcw1", 8081);
        // StreamTableEnvironment stEnv = StreamTableEnvironment.create(env);

        // Get the input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");
        // Parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap((String value, Collector<WordWithCount> collector) -> {
                    for (String word : value.split("\\s")) {
                        collector.collect(new WordWithCount(word, 1L));
                    }
                }).returns(Types.POJO(WordWithCount.class))
                .keyBy(data -> data.word)
                // Sliding window (window size, slide interval) over processing time
                .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
                // Tumbling window over event time (requires timestamps and watermarks):
                // .window(TumblingEventTimeWindows.of(Time.seconds(8)))
                // Legacy API (Flink <= 1.10):
                // .keyBy("word")
                // .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce((ReduceFunction<WordWithCount>) (a, b) -> new WordWithCount(a.word, a.count + b.count));
        // Print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);
        env.execute("Socket Window WordCount");
    }

    /**
     * Data type for words with count.
     */
    public static class WordWithCount {
        public String word;
        public long count;

        /**
         * No-arg constructor so Flink treats this class as a POJO,
         * enabling its efficient POJO serializer.
         */
        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
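The commented-out TumblingEventTimeWindows variant only fires if the stream carries event timestamps and watermarks. A minimal sketch of how they could be assigned before the window, assuming the flatMap output has been bound to a variable (here hypothetically named parsed) and, as a stand-in, using wall-clock arrival time as the event time:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Sketch only: "parsed" stands for the flatMap output above, and arrival
// time substitutes for a real event timestamp carried in the record.
// forBoundedOutOfOrderness tolerates up to 2 seconds of out-of-orderness.
DataStream<WordWithCount> withTimestamps = parsed
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<WordWithCount>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((event, previousTimestamp) -> System.currentTimeMillis()));
// .window(TumblingEventTimeWindows.of(Time.seconds(8))) can then fire on event time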