Flink Usage Examples

POM configuration (Flink version 1.14.6)


<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>demo_flink</groupId>
    <artifactId>demo_flink</artifactId>
    <version>0.1</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>
    <url>https://www.myorganization.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <flink.version>1.14.6</flink.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Apache Flink core dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Apache Flink stream processing dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Apache Flink connector dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>lz4-java</artifactId>
                    <groupId>org.lz4</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>snappy-java</artifactId>
                    <groupId>org.xerial.snappy</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- Apache Flink Web UI dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Hutool utility library -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.16</version>
        </dependency>

    </dependencies>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>demo.stream.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <!-- This profile helps to make things run out of the box in IntelliJ -->
    <!-- It adds Flink's core classes to the runtime class path. -->
    <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>

            <activation>
                <property>
                    <name>idea.version</name>
                </property>
            </activation>

            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_2.12</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>

</project>
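
Because the shade goal is bound to the package phase, running mvn clean package produces a self-contained fat jar under target/ with demo.stream.StreamingJob as its manifest main class; that jar can then be submitted to a cluster, for example with the flink run CLI.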

Example Code

Word count from a Kafka topic

import cn.hutool.json.JSONUtil;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Count word occurrences read from a Kafka topic
public class StreamingJob {

    public static void main(String[] args) throws Exception {

        // The Flink Web UI is served at http://localhost:8082
        Configuration conf = new Configuration();
        conf.setInteger(RestOptions.PORT, 8082);
        // Create a local execution environment with parallelism 2
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, conf);
        // Alternative: default environment without the Web UI
//      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Alternative: submit the job to a remote cluster
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("gcw1", 8081);
//        StreamTableEnvironment stEnv = StreamTableEnvironment.create(env);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.19.28.136:9094");
        properties.setProperty("group.id", "flink");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest"); // offset reset policy: start from the latest offset
        DataStreamSource<String> kafkaStream = env.addSource(new FlinkKafkaConsumer<>("flink-string", new SimpleStringSchema(), properties));
        DataStream<Map<String, Integer>> words = kafkaStream.flatMap((String input, Collector<Map<String, Integer>> collector) -> {
            for (String word : input.split("[ ,]")) {
                count.merge(word, 1, Integer::sum);
            }
            collector.collect(count);
            System.out.println(JSONUtil.toJsonStr(count));
        }).returns(Types.MAP(Types.STRING, Types.INT));
        words.map(JSONUtil::toJsonStr).print();
        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }

    // Word counter. Note: this static map lives per JVM; it is neither shared
    // across parallel subtasks nor covered by checkpoints, so it only works
    // as a local demo.
    public static Map<String, Integer> count = new HashMap<>();
}
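
The static count map above is per-JVM state: with parallelism 2 each subtask keeps its own copy, and nothing survives a failure because the map is not checkpointed. For comparison, here is a minimal sketch of the same word count expressed with Flink's keyed aggregation, which keeps the counts as partitioned, checkpointable keyed state (the class name KeyedWordCountJob is made up for illustration; broker and topic are reused from the example above):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class KeyedWordCountJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.19.28.136:9094");
        properties.setProperty("group.id", "flink");

        DataStream<String> kafkaStream = env.addSource(
                new FlinkKafkaConsumer<>("flink-string", new SimpleStringSchema(), properties));

        // Emit (word, 1) pairs, partition by word, and let Flink keep the
        // running sum as keyed state, which is checkpointed and scales out.
        DataStream<Tuple2<String, Integer>> counts = kafkaStream
                .flatMap((String input, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : input.split("[ ,]")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(t -> t.f0)
                .sum(1);
        counts.print();

        env.execute("Keyed Word Count");
    }
}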

Word count from Kafka to Kafka

import cn.hutool.json.JSONUtil;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

    public static void main(String[] args) throws Exception {

        // The Flink Web UI is served at http://localhost:8082
        Configuration conf = new Configuration();
        conf.setInteger(RestOptions.PORT, 8082);
        // Create a local execution environment with parallelism 2
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, conf);
        // Alternative: default environment without the Web UI
//      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Alternative: submit the job to a remote cluster
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("gcw1", 8081);
//        StreamTableEnvironment stEnv = StreamTableEnvironment.create(env);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.19.28.136:9094");
        properties.setProperty("group.id", "flink");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest"); // offset reset policy: start from the latest offset
        DataStreamSource<String> kafkaStream = env.addSource(new FlinkKafkaConsumer<>("flink-string", new SimpleStringSchema(), properties));
        DataStream<String> words = kafkaStream.flatMap((String input, Collector<String> collector) -> {
            for (String word : input.split("[ ,]")) {
                count.merge(word, 1, Integer::sum);
            }
            collector.collect(JSONUtil.toJsonStr(count));
            System.out.println(JSONUtil.toJsonStr(count));
        }).returns(Types.STRING);
        // Create the Kafka sink
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("10.19.28.136:9094")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("flink-out")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
        // Attach the sink to the stream
        words.sinkTo(sink).name("send to kafka");
        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }

    // Word counter: per-JVM state, not checkpointed; demo only
    public static Map<String, Integer> count = new HashMap<>();
}
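
As an aside, FlinkKafkaConsumer is deprecated in Flink 1.14 in favor of the newer KafkaSource API, mirroring the KafkaSink used above. A sketch of a drop-in replacement for the addSource(...) line (the operator name "kafka-source" is arbitrary):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("10.19.28.136:9094")
                .setTopics("flink-string")
                .setGroupId("flink")
                // same semantics as auto.offset.reset=latest above
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        DataStream<String> kafkaStream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");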

Sliding-window word count from a socket

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {
    public static void main(String[] args) throws Exception {
        // the port to connect to
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }

        // The Flink Web UI is served at http://localhost:8082
        Configuration conf = new Configuration();
        conf.setInteger(RestOptions.PORT, 8082);
        // Create a local execution environment with parallelism 2
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2, conf);

        // Alternative: default environment without the Web UI
//        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Alternative: submit the job to a remote cluster
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("gcw1", 8081);
//        StreamTableEnvironment stEnv = StreamTableEnvironment.create(env);

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");
        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
                .flatMap((String value, Collector<WordWithCount> collector) -> {
                    for (String word : value.split("\\s")) {
                        collector.collect(new WordWithCount(word, 1L));
                    }
                }).returns(Types.POJO(WordWithCount.class))
                .keyBy(data->data.word)
                // Sliding window (window size, slide interval) over processing time
                .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
                // Tumbling window over event time
//                .window(TumblingEventTimeWindows.of(Time.seconds(8)))
                // Legacy API (Flink <= 1.10)
//                .keyBy("word")
//                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce((ReduceFunction<WordWithCount>) (a, b) -> new WordWithCount(a.word, a.count + b.count));
        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);
        env.execute("Socket Window WordCount");
    }

    /**
     * Data type for words with count.
     */
    public static class WordWithCount {
        public String word;
        public long count;

        /**
         * Public no-arg constructor keeps this class a Flink POJO for efficient serialization.
         */
        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
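
To try this example locally, first start a plain text server, e.g. nc -lk 9000 on Linux/macOS, then launch the job with --port 9000. Each word typed into the netcat session is counted over a 5-second window that slides every second, and the running counts are printed to stdout.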