irpas技术客

4.1.28 Flink-流处理框架-Flink使用Lambda表达式引发了泛型擦除问题_敲代码的乔帮主_lambda泛型擦除

网络 1511

目录

1.写在前面

2.什么是JAVA泛型?

3.JAVA泛型擦除

4.Flink中使用Lambda表达式导致泛型擦除

5.如何解决Flink中使用Lambda表达式导致泛型擦除问题

5.1 Flink类型暗示机制,returns方法

5.2 使用匿名内部类,提供更多信息

5.2 实现ResultTypeQueryable接口


1.写在前面

? ? ? ? 最近在重温Flink相关知识点的时候,发现了一个以前没有注意的点,当我们利用Flink的lambda表达式的时候,返回值要类型要给定,不给定的话,就会报错。如下:

could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

由于类型擦除,无法自动确定。您可以使用返回(…)给出类型信息提示方法,或者让函数实现“ResultTypeQueryable”接口。

? ? ? ? 查找资料,发现这个问题是由于lambda表达式导致的类型擦除导致的。

2.什么是JAVA泛型?

????????泛型在java中有很重要的地位,在面向对象编程及各种设计模式中有非常广泛的应用。什么是泛型?为什么要使用泛型?泛型,即“参数化类型”。一提到参数,最熟悉的就是定义方法时有形参,然后调用此方法时传递实参。那么参数化类型怎么理解呢? 顾名思义,就是将类型由原来的具体的类型参数化,类似于方法中的变量参数,此时类型也定义成参数形式(可以称之为类型形参), 然后在使用/调用时传入具体的类型(类型实参)。 泛型的本质是为了参数化类型(在不创建新的类型的情况下,通过泛型指定的不同类型来控制形参具体限制的类型)。也就是说在泛型使用过程中, 操作的数据类型被指定为一个参数,这种参数类型可以用在类、接口和方法中,分别被称为泛型类、泛型接口、泛型方法。

????????泛型的使用方式是利用<>,在<>内添加类型。比如

List<String> arrayList = new ArrayList<String>();

????????我们思考一下,如果没有泛型这个工具,在写代码的过程中我们可能会遇到什么问题?举一个非常常见的例子。

public class Test { public static void main(String[] args) { List arrayList = new ArrayList(); arrayList.add("hello"); arrayList.add(100); // 遍历打印输出到控制台 arrayList.forEach(item -> { Object item1 = item; System.out.println((String) item1); }); } }

????????报错内容如下:

hello Exception in thread "main" java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String ?? ?at com.chapter02.Test.lambda$main$0(Test.java:19) ?? ?at java.util.ArrayList.forEach(ArrayList.java:1249) ?? ?at com.atguigu.chapter02.Test.main(Test.java:17)

????????因为我们没有指定泛型的类型,所以在List中可以存放任意类型的数据。上述代码先在List中添加了一个String类型的数据,后添加了一个Integer类型的数据,编译器不会提示任何错误,但运行时却报错了。这是因为List以第一次添加的数据类型为准,即以String的方式使用,后面再添加Integer类型的数据,程序就崩溃了。为了在编译阶段解决类似的问题,我们可以在代码中执行泛型的类型:

List<String> arrayList = new ArrayList<String>(); //arrayList.add(100); 在编译阶段,编译器提示错误 3.JAVA泛型擦除

? ? ? ? 我们继续看第二个例子:

List<String> stringArrayList = new ArrayList<String>(); List<Integer> integerArrayList = new ArrayList<Integer>(); Class classStringArrayList = stringArrayList.getClass(); Class classIntegerArrayList = integerArrayList.getClass(); System.out.println(classStringArrayList==classIntegerArrayList); // 输出结果:true

????????通过上面的例子可以证明,在编译之后程序会采取去泛型化的措施。也就是说Java中的泛型,只在编译阶段有效。在编译过程中,正确检验泛型结果后,在运行时会将泛型的相关信息擦出,编译器只会在对象进入JVM和离开JVM的边界处添加类型检查和转换的方法,泛型的信息不会进入到运行时阶段,这就是所谓的Java类型擦除。

????????泛型擦除有两种方式,Java使用的是第一种方式,C++和C#使用的是第二种方式

方式一:Code sharing。对同一个原始类型下的泛型类型只生成同一份目标代码方式二:Code specialization。对每一个泛型类型都生成不同的目标代码。

????????它们也分别俗称“假”泛型和“真”泛型。导致程序在运行时对泛型类型没有感知,所以上述例子一的代码反编译后只剩下了List,实际上都是Class<? extends ArrayList>的比较,导致例2输出的true。为什么Java要采用Code sharing机制进行类型擦除呢?有两点原因:一是Java泛型是到1.5版本才出现的特性,在此之前JVM已经在无泛型的条件下经历了较长时间的发展,如果采用Code specialization,就得对JVM的类型系统做伤筋动骨的改动,并且无法保证向前兼容性。二是Code specialization对每个泛型类型都生成不同的目标代码,如果有10个不同泛型的List,就要生成10份字节码,造成代码膨胀。

????????Java的泛型被很多人诟病称为“伪泛型”,也是因为类型擦除这个原因,泛型在Java中就是属于语法糖;在Java中JVM虚拟机层面并不存在泛型的概念,Java在编译阶段把泛型的类型参数给擦除掉了,在运行阶段并没有泛型的概念;

public class Data<T> { private T obj; public T getObj() { return obj; } public void setObj(T obj) { this.obj = obj; } }

????????如上类,在经过Java编译成为class文件后其中的类型参数T将被擦除,字段obj变成了Object类型,两个get、set方法中的T也都换成了Object类型;

4.Flink中使用Lambda表达式导致泛型擦除

? ? ? ? 在Flink中也会经常使用Lambda表达式来简化我们的代码,我们看下面一段代码:

public static void main(String[] args) throws Exception { // 1. 创建执行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 2. 从文件读取数据 按行读取(存储的元素就是每行的文本) DataSource<String> lineDS = env.readTextFile("input/words.txt"); // 3. 转换数据格式 FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS .flatMap((String line, Collector<Tuple2<String, Long>> out) -> { String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } }); // 4. 按照 word 进行分组 UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0); // 5. 分组内聚合统计 AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1); // 6. 打印结果 sum.print(); }

? ? ? ? 上面是非常简单的wordCount代码样例,如果直接执行的话,会爆出错误,即我们开头指出的问题。

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(BatchWordCount.java:28)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

????????意思是说Tuple2中的参数类型缺失,这很可能是因为lambda表达式不能提供足够的信息,使得无法自动检测出Tuple2中的参数类型。建议使用returns方法或实现ResultTypeQueryable接口。

????????上面介绍了在Java中会对泛型信息进行类型参数擦除,但在这里为啥使用匿名内部类实现FlatMapFunction时却还是可以获取得到泛型参数?其实Java中编译时的泛型类型擦除并不是把所以泛型相关的信息全部擦干干净净,Javac编译时擦除的只是结构化之外(程序执行流)的信息这部分信息存储在字节码的Code属性中,类、字段、方法的泛型类型参数元数据都会被保留下来,这些存储在Signature属性中;可通过反射得到相关的泛型参数信息;

s.flatMap(new FlatMapFunction<String, Integer>() { @Override public void flatMap(String value, List<Integer> out) { System.out.println("stu"); } });

????????而lambda表达式实现FlatMapFunction却获取不到泛型参数,是的。匿名内部类会编译成相关的类字节码存储在class文件中,而lambda表达式却也只是Java的语法糖并不会存在相关的类字节码,只会在lambda表达式运行时调用invokedynamic指令执行逻辑。lambda表达式丢失了更多的类型信息,也就导致了使用lambda表达式获取不到泛型类型参数;

s.flatMap((FlatMapFunction<String, Integer>) (value, out) -> System.out.println("stu"));

5.如何解决Flink中使用Lambda表达式导致泛型擦除问题

? ? ? ?那么如何解决由于使用Lambda表达式导致的泛型擦除问题呢?其实上面异常信息已经说得非常清楚了,调用returns方法或实现ResultTypeQueryable接口,这里就简单说这两种用法;

5.1 Flink类型暗示机制,returns方法

????????调用该方法的用法也比较简单,就是返回的Collector需要哪个泛型类型参数你就调用returns方法注册哪种类型,调用returns方法一定是要在某个算子之后紧接着第一个调用,简单理解就是未某个算子注册返回类型;

// 3. 转换数据格式 FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS .flatMap((String line, Collector<Tuple2<String, Long>> out) -> { String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } }) .returns(Types.TUPLE(Types.STRING, Types.LONG)); //当Lambda表达式使用 Java 泛型的时候, 由于泛型擦除的存在, 需要显示的声明类型信息 5.2 使用匿名内部类,提供更多信息 // 3. 转换数据格式 FlatMapOperator<String, Tuple2<String, Long>> word_1 = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() { @Override public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception { String[] words = line.split(" "); for (String word : words) { out.collect(Tuple2.of(word, 1L)); } } }); 5.2 实现ResultTypeQueryable接口

????????实现此接口就可以告诉系统此算子的返回值类型,实现了此接口的优先级最高,不会再通过反射去获取返回值类型。还可以根据类型参数的不同使用不同的返回值类型;实现此接口可定制化程度很高、灵活。Flink kafka相关的连接器中就是用了这种模式。

public class FlatFun implements ResultTypeQueryable<String>, FlatMapFunction<Integer, String> { @Override public TypeInformation getProducedType() { return TypeInformation.of(String.class); } @Override public void flatMap(Integer value, Collector<String> out) { out.collect(String.valueOf(value)); System.out.println("flatFun"); } } stream.flatMap(new FlatFun()) .print();

????????另外,对于确定的数据类型(即没有泛型的数据类型),可以随意在flink中使用lambda表达式。例如:?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> dataStream = env.fromCollection(Arrays.asList("hello", "world", "flink", "hello", "flink")); DataStream<String> mapDataStream = dataStream.map(word -> word+"_1"); mapDataStream.print(); env.execute();

????????上述代码就正常执行。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #lambda泛型擦除 #1写在前面 #如下could #not