irpas技术客

Flink入门之Flink程序开发步骤(java语言)_保护我方胖虎_flink java

网络投稿 7363

文章目录 (0)开发程序所需依赖(1)获取执行环境(2)加载/创建数据源(3)数据转换处理(4)处理后数据放置/输出(5)执行计算程序(6)完整示例 注:本篇章的flink学习均是基于java开发语言

我们如果要使用flink进行计算开发,一个完整的开发步骤是怎样的呢?

前情回顾:什么叫有界数据流,什么叫无界数据流(何为流处理,何为批处理)?

- Batch Analytics,右边是 Streaming Analytics。批量计算: 统一收集数据->存储到DB->对数据进行批量处理,对数据实时性邀请不高,比如生成离线报表、月汇总,支付宝年度账单(一年结束批处理计算)

- Streaming Analytics 流式计算,顾名思义,就是对数据流进行处理,如使用流式分析引擎如 Storm,Flink 实时处理分析数据,应用较多的场景如 实时报表、车辆实时报警计算等等。

(0)开发程序所需依赖 <properties> <encoding>UTF-8</encoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <java.version>1.8</java.version> <scala.version>2.12</scala.version> <flink.version>1.12.2</flink.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.12</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.12</artifactId> <version>${flink.version}</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/java</sourceDirectory> <plugins> <!-- 编译插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> <!--<encoding>${project.build.sourceEncoding}</encoding>--> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.18.1</version> <configuration> <useFile>false</useFile> <disableXmlReport>true</disableXmlReport> <includes> <include>**/*Test.*</include> <include>**/*Suite.*</include> </includes> </configuration> </plugin> <!-- 打包插件(会包含所有依赖) --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <!-- zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF --> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <!-- 设置jar包的入口类(可选) --> <mainClass></mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> (1)获取执行环境

flink程序开发,首要的便是需要获取其执行环境!

ex:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

或者:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

如果使用StreamExecutionEnvironment 默认便是流式处理环境

但是flink1.12 开始,流批一体,我们可以自己指定当前计算程序的环境模式

指定为自动模式:AUTOMATIC

此设置后,flink将会自动识别数据源类型

有界数据流,则会采用批方式进行数据处理

无界束流,则会采用流方式进行数据处理

env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

强制指定为批数据处理模式:BATCH

env.setRuntimeMode(RuntimeExecutionMode.BATCH);

强制指定为流数据处理模式:STREAMING

env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

注意点:

在flink中,有界与无界数据流都可以强指定为流式运行环境,但是,如果明知一个数据来源为流式数据,就必须设置环境为AUTOMATIC 或STREAMING,不可以指定为BATCH否则程序会报错!

(2)加载/创建数据源

flink,是一个计算框架,在计算的前提,肯定是要有数据来源啊!

flink可以从多种场景读取加载数据,例如 各类DB 如Mysql、SQL SERVER、MongoDB、各类MQ 如Kafka、RabbitMQ、以及很多常用数据存储场景 如redis、文件(本地文件/HDFS)、scoket…

我们在加载数据源的时候,便知道,该数据是有界还是无界了!

ex:

flink读取rabbitMQ消息,是有界还是无界呢?当然是无界!因为flink程序启动时,能通过连接知道什么时候MQ中有数据,什么时候没有数据吗?不知道,因为本身MQ中是否有消息或者消息有多少就是一个不能肯定确定的因素,因此其不得不保持一个类似于长连接的形式,一直等待MQ中有数据到来,然后处理。


flink读取指定某个文件中的数据,那么此数据源是有界还是无界呢?当然是有界!因为文件中数据,flink读取会做记录,当文件内容读完了,数据源就相当于没有新的数据来到了嘛!

ex:

从集合中读取数据:

DataStream<String> elementsSource = env.fromElements("java,scala,php,c++","java,scala,php", "java,scala", "java");

那么,这是无界数据还是有界数据呢?很明显,有界数据!因为数据就这么多,当前数据源在读取时不会再凭空产生数据了。

从scoket中读取数据:

DataStreamSource<String> elementsSource= env.socketTextStream("10.50.40.131", 9999);

这是无界数据还是有界数据呢?很明显,无界数据!因为scoket一旦连接,flink不会知道其数据源什么时候会数据结束,其不得不保持一个类似于长连接的状态,一直等待Scoket中有数据到来,然后处理。

(3)数据转换处理

数据转换处理,就是flink使用算子,对从数据源中获取的数据进行数据加工处理(例如 数据转换,计算等等)

例如:开窗口、低阶处理函数ProcessFuction、各种算子:map(映射,与java8流中Map效果类似),flatmap(元素摊平,与java8流中Map效果类似)等等。

demo示例:

DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++", "java,scala,php", "java,scala", "java"); // 数据处理 DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String element, Collector<String> out) throws Exception { String[] wordArr = element.split(","); for (String word : wordArr) { out.collect(word); } } }); flatMap.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { return value.toUpperCase(); } }); (4)处理后数据放置/输出

将计算后的数据,进行放置(输出/存储),可以很地方,从什么地方读取数据,自然也可以将计算结果输出到该地点。

例如:输出到文件,输出到控制台,输出到MQ,输出到DB,输出到scoket…

ex:输出到控制台

source.print(); (5)执行计算程序

flink程序需要启动才能执行任务,正如,spring-boot启动程序需要nohup java -jar xxxx.jar & 或者编译器中点击图标按钮启动

启动示例:

// 1.准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置模式 (流、批、自动) // 2.加载数据源 // 3.数据转换 // 4.数据输出 // 5.执行程序 env.execute(); //或者 env.execute("指定当前计算程序名"); (6)完整示例 public class FlinkDemo { public static void main(String[] args) throws Exception { // 1.准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置运行模式 env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // 2.加载数据源 DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++", "java,scala,php", "java,scala", "java"); // 3.数据转换 DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String element, Collector<String> out) throws Exception { String[] wordArr = element.split(","); for (String word : wordArr) { out.collect(word); } } }); //DataStream 下边为DataStream子类 SingleOutputStreamOperator<String> source = flatMap.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { return value.toUpperCase(); } }); // 4.数据输出 source.print(); // 5.执行程序 env.execute("flink-hello-world"); } }

IDEA执行后,输出结果:

前边序号可以理解为多线程执行时的线程名字!


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Flink #JAVA #Batch #Analytics右边是 #Streaming #Analytics #批量计算