irpas技术客

【Flink从入门到精通 00】流式处理概念Mac上搭建Flink 1.14.0环境并编写Demo_是苏辞啊!

未知 7778

1. 准备环境

这里主要介绍Mac系统下的环境搭建,其他操作系统可直接查看官网或者我个人的翻译博客

1.1 安装并查看Java版本号

Flink要求Java版本为Java8或Java11及以上。

java -version

1.2 安装Flink

查看Flink信息

brew info apache-flink

安装Flink

brew install apache-flink

1.3 检查安装 flink --version

1.4 启动Flink # 切换到Flink安装目录下的bin目录 cd /opt/homebrew/Cellar/apache-flink/1.14.0/libexec/bin # 启动Flink集群 ./start-cluster.sh

1.5 查看Web页面

FlinkWeb页面

1.6 删除 brew remove apache-flink 2. Demo项目 2.1 新建一个maven项目

2.2 编写代码 2.2.1 引入依赖 <!-- flink-core --> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>${FLINK_VERSION}</version> <scope>provided</scope> </dependency> <!-- flink-streaming --> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${SCALA_VERSION}</artifactId> <version>${FLINK_VERSION}</version> <scope>provided</scope> </dependency> 2.2.2 创建SocketStreamWordCount package com.suci.knowledge.firststep; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; /** * flink入门案例 * Socket发送文本,flink对文本进行单词统计 * @Author: rugu * @Date: 2021/12/27 10:50 */ public class SocketStreamWordCount { public static void main(String[] args) throws Exception { final int ARGS_LENGTH = 2; // Socket参数校验 if (args.length < ARGS_LENGTH){ System.err.println("ERROR:参数校验失败,请输入正确的参数 <hostname> <port>"); return; } String hostname = args[0]; String port = args[1]; // 创建Flink运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 添加数据源 DataStream<String> socketTextStream = env.socketTextStream(hostname, Integer.parseInt(port)); // 对数据分组统计 DataStream<Tuple2<String, Integer>> sum = socketTextStream.flatMap(new SocketStreamFlatMapFunction()) .keyBy(0) .sum(1); sum.print(); // 运行程序 env.execute("SocketStream"); } /** * 自定义FlatMapFunction * 对文本进行切割,并组装成tuple */ private static class SocketStreamFlatMapFunction implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception { String[] words = s.split(" "); for (String word : words) { collector.collect(new Tuple2<String, Integer>(word, 1)); } } } } 2.2 打包运行 将项目打成jar包 mvn clean package -Dmaven.test.skip=true

监听7777端口 nc -l 7777

运行程序 进入flink的bin目录下,执行以下命令 flink run -c com.suci.knowledge.firststep.SocketStreamWordCount /Users/apple/Desktop/flink-learning/flink-core/target/flink-core-1.0-SNAPSHOT.jar 127.0.0.1 7777

Jar包路径根据个人代码位置进行替换

进入Web UI查看运行的程序

Web UI:http://localhost:8081/

查看统计结果

使用tail命令监控程序的实时输出,注意将文件路径切换为你的路径。

cd /opt/homebrew/Cellar/apache-flink/1.14.0/libexec/log tail -f flink-apple-taskexecutor-0-apples-MacBook-Pro.local.out

Web UI上也可以看到总共接收了12条记录。

总结

本文主要介绍了在Mac环境下如何使用HomeBrew安装Flink,并运行了一个Demo程序,希望能对你有所帮助。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Flink从入门到精通 #1140环境并编写Demo #Mac上搭建Flink #1140环境并编写Demo1 #JAVA