1、前言 1.1、JavaCC
????????JavaCC(Java Compiler Compiler)是一个开源的语法分析器生成器和词法分析器生成器。JavaCC通过词法和语法描述文件来生成分析器。
????????flink通过java CC生成分析器用于sql解析和校验。
如下图:在flink-table下的flink-sql-parser项目中,org.apache.flink.sql.parser.impl下的类,就是使用javacc生成的。
1.2、Calcite????????Apache Calcite是一个动态数据管理框架 ,它具备很多典型数据库管理系统的功能,如SQL解析、SQL校验、SQL查询优化等,又省略了一些功能,如不存储相关数据,也不完全包含相关处理数据等。
????????flink中的sql解析、sql校验和sql优化便是依赖calcite来完成的。
????????梳理一下Calcite SQL执行的几个阶段:
通过Parser解析器将传入的sql解析成一颗词法树,SqlNode作为树的节点做词法的校验Validate,类型校验,元数据校验等等将校验好的SqlNode树转换成对应的关系代数表达式,也是一颗树,RelNode作为节点将RelNode关系代数表达式树,通过内置的两种优化器Volcano , Hep 优化关系代数表达式得到最优逻辑代数的一颗树,也是RelNode最优的逻辑代数表达式(RelNode),会被转换成对应的可执行的物理执行计划(转换逻辑根据框架有所不同),像Flink就转成他的Operator去运行 2、Flink SQL转换流程????????SQL语句经过Calcite解析生成抽象语法树SQLNode,基于生成的SQLNode并结合flink Catalog完成校验生成一颗Operation树,接下来blink planner将Operation树,接下来blink planner将Opearation树转为RelNode然后进行优化,最后生成Transformation变成流计算任务。
2.1、Sql语句解析成语法树阶段(SQL - > SqlNode)????????TableEnvironmentImpl是sql执行的入口类,TableEnvironmentImpl中提供了executeSql,sqlQuery等方法用来执行DDL和DML等sql,sql执行时会对sql进行解析,ParserImpl是flink调用sql解析的实现类,ParserImpl#parse()方法中通过调用包装器对象CalciteParser#parse()方法并创建并调用使用javacc生成的sql解析器(FlinkSqlParserImpl)中的parseSqlStmtEof方法完成sql解析,并返回SqlNode对象
? ? ? ? 核心代码如下:
public List<Operation> parse(String statement) { CalciteParser parser = calciteParserSupplier.get(); FlinkPlannerImpl planner = validatorSupplier.get(); //TODO 在这里调用使用javacc生成的分析器,将sql语句解析成sqlNode SqlNode parsed = parser.parse(statement); //TODO 将sqlNode转换为Operation Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed) .orElseThrow(() -> new TableException("Unsupported query: " + statement)); return Collections.singletonList(operation); }? ? ? ? 其中parser.parse(...)方法,将sql语句解析成sqlNode。对应的表名、列名、with属性参数、主键、唯一键、分区键、水印、表注释、表操作(create table、alter table、drop table。。。)都放到SqlNode对象的对应属性中,SqlNode是一个树形结构也就是AST。如下:
2.2、Sql校验(SqlNode - > Operation)????????sql解析完成后执行sql校验,flink sql中增加了SqlNode转换为Operation的过程,sql校验是在这个过程中完成。在SqlToOperationConverter#convert()方法中完成这个过程的转换,之间会通过FlinkPlannerInpm#validate()方法对表、函数、字段等完成校验并基于生成的validated SqlNode生成对应的Opeation。
?????????不同的sql经过convert处理后返回不同的Operation,最后会根据不同的Operation有不同的处理行为。
? ? ? ? ?其中
2.3、Flink SQL优化(Operation - > RelNode->Transformation?)????????Blink中并没有直接使用Calcite的优化器,而是通过规则组合和Calcite优化组合分别为batch和stream实现了自定义的优化器。 ????????优化执行前会先将SqlNode转为RelNode,基于RelNode调用PlannerBase#optimize()并执行StreamCommonSubGraphBasedOptimizer#doOptimize()方法完成优化
? ? ? ? 在完成Sql到RelNode的转换后,会执行executeOperation(...)操作,在这里先将sqlNode转换成RelNode。然后进行优化操作。传入的参数为:
? ? ? ? 然后根据传入的sql语句类型,选择不同的操作。包含有Modify、CreateTable、DropTable等。如下:
? ? ? ? 其实都是调用的TableEnvironmentImpl.executeInternal(...)。
? ? ? ? ?在这里,有进行转换和优化操作,重点是在translate方法中,最终调用的是PlannerBase里的translate(...)方法
override def translate( modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = { if (modifyOperations.isEmpty) { return List.empty[Transformation[_]] } // prepare the execEnv before translating getExecEnv.configure( getTableConfig.getConfiguration, Thread.currentThread().getContextClassLoader) overrideEnvParallelism() // TODO 在这里完成转换 SqlNode转换为RelNode val relNodes = modifyOperations.map(translateToRel) // TODO 在这里完成优化 val optimizedRelNodes = optimize(relNodes) val execNodes = translateToExecNodePlan(optimizedRelNodes) translateToPlan(execNodes) }? ? ? ? 在上述的优化代码行,根据是流处理或者批处理老选择不同的类中的方法进行优化。
????????最终由translateToPlan方法将ExecNode转换成Transfomation列表
? ? ? ? 整体流程大致为:sqlNode --> Operation --> RelNode --> 优化 --> execNode --> Transformation
????????基于生成的Transformation对象调用StreamExecutor#createPipeline()方法生成StreamGraph便可以执行任务了。
????????至此flink sql转换流程便结束了。
1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。 |
标签: #Flink #SQL #源码解析 #Compiler #flink通过java