irpas技术客

PDI(kettle) Java代码组件应用案例_果果的小莴笋_kettle调用java代码组件

未知 3336

1 概述

Java代码步骤,位于Kettle转换的核心对象/脚本类别中,属于典型的需要编程基础才能掌控的步骤类型。而Java代码步骤,适用于熟悉Java语言的开发人员,用好这个步骤,需要对类、接口、多线程等语言相关知识有所掌握,并且需要对Kettle的基础框架有所理解。

2 主要方法说明 2.1 初始化

PDI转换在执行前,会有一个各步骤的初始化动作,为步骤执行前的准备工作创造机会。为提高初始化的性能,Kettle为每个步骤启用一个初始化线程,从而并行完成所有步骤的初始化。初始化的主要内容就是调用一次步骤的以下方法:

public boolean init( StepMetaInterface meta, StepDataInterfacedata)

此方法包含两个参数。其中,meta为元数据,data为数据。如果返回true,那么代表初始化成功,否则代表初始化失败。任何一个步骤初始化失败,都会导致整个转换停止执行。

2.2 执行

执行阶段是每一个步骤实现特定价值的时候。为提高效率,Kettle为每一个步骤单独启动一个线程来执行任务。线程会一直紧密循环调用步骤的processRow()方法,该方法是一步的心脏,将持续到返回 false 为止。 方法申明如下:

public boolean processRow( StepMetaInterface meta,StepDataInterfacedata ) throws KettleException;

从输入行集中取数据可以调用getRow方法。如果getRow方法返回值不为null,则步骤应将该行数据进行处理,并调用putRow方法将处理结果存入输出行集,然后返回true,以继续为下一行输入数据处理提供机会。如果getRow方法返回null,代表输入行集已经处理完毕,这时可以调用setOutputDone,标识本步骤执行完毕,并返回false,以结束本工作线程的执行。

2.3 释放资源

不管工作线程是正常执行完毕还是异常执行完毕,最终会调用dispose方法。该方法声明如下:

public void dispose( StepMetaInterface meta, StepDataInterfacedata);

一般情况下重写processRow方法即可满足需求,如果用到了一些重量级的资源,最好在init方法中初始化,并在dispose方法中释放。

3 案例分享 3.1 使用Java获取数据库中表的数据

第一个组件:增加常量,我的这里一个表面,其他的都写在了Java代码里面 第二个组件:Java代码

import java.sql.*; long c = 0l; String a = ""; String b = ""; public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { Object[] r = getRow(); if (r == null) { setOutputDone(); return false; } if (first) { first = false; } r = createOutputRow(r, data.outputRowMeta.size()); //数据库信息,简单做个例子直接写死了 String urlString = "jdbc:oracle:thin:@ip:port:sid"; String driverName = "oracle.jdbc.driver.OracleDriver"; String usernameString = "username"; String passwString = "pwd"; PreparedStatement pst = null; ResultSet rs = null; String table = get(Fields.In, "table").getString(r); try { Class.forName(driverName); Connection dbconConnection = DriverManager.getConnection(urlString,usernameString,passwString); String sqlString = "SELECT * FROM " + table; pst = dbconConnection.prepareStatement(sqlString); rs = pst.executeQuery(); while (rs.next()) { String aa = rs.getString("a"); String bb = rs.getString("b"); get(Fields.Out, "a").setValue(r, aa); get(Fields.Out, "b").setValue(r, bb); get(Fields.Out, "c").setValue(r, ++ c); putRow(data.outputRowMeta, r); } } catch (Exception e) { e.printStackTrace(); } finally { if(rs != null) { try { rs.close(); } catch (SQLException e) { e.printStackTrace(); } } } if(pst != null) { try { pst.close(); } catch (SQLException e) { e.printStackTrace(); } } if(myConnection != null ) { try { myConnection.close(); } catch (SQLException e) { e.printStackTrace(); } } return true; }

第三个组件:表输出, 输出到一个数据库表,表结构可以通过获取字段直接拿到 大致内容就是这样。

3.2 Java代码组件内使用外部jar 有时候代码比较多, 逻辑比较复杂,直接在PDI里面写的话很不方便,而且调试也麻烦。这时候我们可以直接在Java开发工具里面写好,导出成一个jar,然后放到PDI的安装目录的lib下面,就可以直接引用了。

Java组件内代码

import com.controller.CheckIdcard; import com.dao.ErrorResult; public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { if (first) { first = false; } Object[] r = getRow(); if (r == null) { setOutputDone(); return false; } r = createOutputRow(r, data.outputRowMeta.size()); String idCard = get(Fields.In, "a").getString(r); ErrorResult checkInfo = CheckIdcard.isNormal(idCard, "a"); if("身份证校验合格!".equals(checkInfo.getErrorDescribe())) { putRow(data.outputRowMeta, r); } else { //错误处理的代码,Java步骤连接下一步骤的时候需要选择错误处理时需要用到 putError(data.outputRowMeta, r, checkInfo.getErrorNum(), checkInfo.getErrorDescribe(), checkInfo.getErrorColumn(), checkInfo.getErrorCode()); } return true; }

Java外部代码: ErrorResult.java

package com.dao; public class ErrorResult { //错误列 private String errorColumn; //错误代码 private String errorCode; //错误行数 private int errorNum; //错误描述 private String errorDescribe; public ErrorResult() {}; public ErrorResult(String errorColumn, String errorCode, int errorNum, String errorDescribe) { this.errorColumn = errorColumn; this.errorCode = errorCode; this.errorNum = errorNum; this.errorDescribe = errorDescribe; } public String toString () { return "errorColumn:" + this.errorColumn + ",errorCode:" + this.errorCode + ",errorNum:" + this.errorNum + ",errorDescribe:" + this.errorDescribe; } public String getErrorColumn() { return errorColumn; } public void setErrorColumn(String errorColumn) { this.errorColumn = errorColumn; } public String getErrorCode() { return errorCode; } public void setErrorCode(String errorCode) { this.errorCode = errorCode; } public int getErrorNum() { return errorNum; } public void setErrorNum(int errorNum) { this.errorNum = errorNum; } public String getErrorDescribe() { return errorDescribe; } public void setErrorDescribe(String errorDescribe) { this.errorDescribe = errorDescribe; } }

CheckIdcard.java

package com.controller; import com.dao.ErrorResult; /** * 校验公民身份证号码是否有误 * @author 果果的小莴笋 * */ public class CheckIdcard { public static ErrorResult isNormal(String idCard, String column) { int errorNum = 0; ErrorResult er = new ErrorResult(); if (idCard == null || "".equals(idCard)) { er.setErrorCode("NOT_ALLOW_IDCARD"); er.setErrorColumn(column); er.setErrorDescribe("身份证号码为空!"); er.setErrorNum(++ errorNum); return er; } // 定义判别用户身份证号的正则表达式(15位或者18位,最后一位可以为字母) String reg = "(^[1-9]\\d{5}(19|([23]\\d))\\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\\d{3}[0-9Xx]$)|(^[1-9]\\d{5}\\d{2}((0[1-9])|(10|11|12))(([0-2][1-9])|10|20|30|31)\\d{2}$)"; //判断是否符合正则校验 boolean isOK = idCard.matches(reg); if(isOK) { if (idCard.length() == 18) { try { char[] charArray = idCard.toCharArray(); // 前十七位加权因子 int[] idCardWi = {7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2}; // 这是除以11后,可能产生的11位余数对应的验证码 String[] idCardY = {"1", "0", "X", "9", "8", "7", "6", "5", "4", "3", "2"}; int sum = 0; for (int i = 0; i < idCardWi.length; i++) { int current = Integer.parseInt(String.valueOf(charArray[i])); int count = current * idCardWi[i]; sum += count; } char idCardLast = charArray[17]; int idCardMod = sum % 11; if (idCardY[idCardMod].toUpperCase().equals(String.valueOf(idCardLast).toUpperCase())) { er.setErrorCode(""); er.setErrorColumn(column); er.setErrorDescribe("身份证校验合格!"); er.setErrorNum(errorNum); return er; } else { // System.out.println("身份证最后一位:" + String.valueOf(idCardLast).toUpperCase() // + "错误,正确的应该是:" + idCardY[idCardMod].toUpperCase()); er.setErrorCode("NOT_ALLOW_IDCARD"); er.setErrorColumn(column); er.setErrorDescribe("身份证号码校验失败,请检查!"); er.setErrorNum(++ errorNum); return er; } } catch (Exception e) { e.printStackTrace(); er.setErrorCode("NOT_ALLOW_IDCARD"); er.setErrorColumn(column); er.setErrorDescribe("身份证号码校验失败,请检查!"); er.setErrorNum(++ errorNum); return er; } } else if(idCard.length() == 15) { er.setErrorCode(""); er.setErrorColumn(column); er.setErrorDescribe("身份证校验合格!"); er.setErrorNum(errorNum); return er; } else { er.setErrorCode("NOT_ALLOW_IDCARD"); er.setErrorColumn(column); er.setErrorDescribe("身份证号码校验失败,请检查!"); er.setErrorNum(++ errorNum); return er; } } else { er.setErrorCode("NOT_ALLOW_IDCARD"); er.setErrorColumn(column); er.setErrorDescribe("身份证号码校验失败,请检查!"); er.setErrorNum(++ errorNum); return er; } } public static void main(String[] args) { ErrorResult er = isNormal("36243088052422", "c"); System.out.println(er.toString()); } } 3.3 Java组件调用本转换已有的数据库连接

Java代码组件内容大致如下:

import java.sql.*; import org.pentaho.di.core.database.*; public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException { Object[] r = getRow(); if (r == null) { setOutputDone(); return false; } //获取数据库名和表名 String table = get(Fields.In, "table").getString(r); String dbName = get(Fields.In, "dbName").getString(r); //数据库连接 Database database=null; DatabaseMeta databaseMeta=null; try { databaseMeta = getTransMeta().findDatabase(dbName); if (databaseMeta == null) { logError("A connection with name "+dbName+" could not be found!"); setErrors(1); return false; } database = new Database(getTrans(), databaseMeta); database.connect(); } catch(Exception e) { logError("Connecting to database "+dbName+" failed.", e); setErrors(1); return false; } //查询表数据 String sql="select a, b from " + table; ResultSet resultSet; try { resultSet = database.openQuery(sql); Object[] idxRow = database.getRow(resultSet); RowMetaInterface idxRowMeta =null; if(idxRow!=null){ idxRowMeta=database.getReturnRowMeta(); } int i=0; while(idxRow!=null){ r = createOutputRow(r, data.outputRowMeta.size()); int index = getInputRowMeta().size(); r[index++] = idxRowMeta.getString(idxRow, "a", null); r[index++] = idxRowMeta.getString(idxRow, "b", null); putRow(data.outputRowMeta, r); idxRow = database.getRow(resultSet); i++; } } catch(Exception e) { throw new KettleException(e); } //释放连接, 这个应该可以不写, 每个组件最后都会自动执行dispose释放资源 if (database!=null) { database.disconnect(); database.closeQuery(resultSet); } return true; }


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #1 #2 #主要方法说明21