Developing a Custom Hive UDTF

[Author]: kwu

A UDTF (User-Defined Table-Generating Function) handles the case where one input row must produce multiple output rows (a one-to-many mapping). The steps for developing a custom Hive UDTF are as follows:

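1. Extend org.apache.hadoop.hive.ql.udf.generic.GenericUDTF and implement three methods: initialize, process, and close.
2. Hive calls initialize first. This method returns the description of the rows the UDTF produces (number of columns and their types).
3. Once initialization is done, Hive calls process, which is where the actual work happens. Inside process, each call to forward() emits one row. To emit several columns, put the column values into an array and pass that array to forward().
4. Finally, Hive calls close(), where anything that needs cleaning up should be released.
5. Sample code. The function is fairly simple: it first splits the input on "\001" and then processes the resulting strings, which includes parsing JSON.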

package com.hexun.udtf;

import java.util.ArrayList;

import net.sf.json.JSON;
import net.sf.json.JSONSerializer;

import org.apache.commons.beanutils.PropertyUtils;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class UDTFDratio extends GenericUDTF {

    // JSON properties copied, in this order, into output columns col3..col8.
    private static final String[] JSON_KEYS = {"sex", "age", "ppt", "degree", "favor", "commercial"};

    @Override
    public void close() throws HiveException {
        // Nothing to clean up.
    }

    // Describes the rows this UDTF returns (number of columns and their types).
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentLengthException("ExplodeMap takes only one argument");
        }
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("ExplodeMap takes string as a parameter");
        }

        // Eight string columns named col1..col8.
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        for (int i = 1; i <= 8; i++) {
            fieldNames.add("col" + i);
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        }
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    // Processes one input row; results are returned through forward().
    @Override
    public void process(Object[] args) throws HiveException {
        String input = args[0].toString();
        String[] splited = input.split("\001");
        String[] result = new String[8];

        for (int i = 0; i < splited.length; i++) {
            if (i == 0) {
                // First field has the form userId_cookieId (an underscore is expected).
                String head = splited[i];
                String userId = head.substring(0, head.indexOf("_"));
                String cookieId = head.substring(head.indexOf("_") + 1);
                result[0] = userId;
                result[1] = cookieId;
            } else {
                // Every following field is a JSON object holding the profile attributes.
                String json = splited[i];
                JSON jo = JSONSerializer.toJSON(json);
                Object o = JSONSerializer.toJava(jo);
                try {
                    for (int k = 0; k < JSON_KEYS.length; k++) {
                        result[2 + k] = PropertyUtils.getProperty(o, JSON_KEYS[k]).toString();
                    }
                } catch (Exception e) {
                    // A missing property stops extraction for this field; later columns stay null.
                    e.printStackTrace();
                }
            }
        }
        // One forward() call emits one output row with eight columns.
        forward(result);
    }
}

JARs required by the sample code: json-lib, ezmorph, commons-beanutils, commons-collections, commons-lang, and commons-logging.
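The header-parsing step in process assumes that the first field always contains an underscore. As a minimal sketch of how that step could be isolated and guarded, the following plain-Java helper needs no Hive or json-lib classes. The class name HeadSplitSketch, the method parseHead, and the fallback of returning the whole head as the user id are assumptions made for illustration and are not part of the original code. It also passes a limit of -1 to split so that an input consisting only of the delimiter still yields a first field.

```java
import java.util.Arrays;

// Hypothetical helper, not part of the original UDTF: it isolates the header-parsing step.
final class HeadSplitSketch {

    /** Splits one record on \u0001 and returns {userId, cookieId} taken from the first field. */
    static String[] parseHead(String input) {
        // Limit -1 keeps trailing empty fields, so fields[0] always exists.
        String[] fields = input.split("\001", -1);
        String head = fields[0];
        int sep = head.indexOf('_');
        if (sep < 0) {
            // Assumed fallback: no underscore, so the whole head is treated as the user id.
            return new String[] {head, null};
        }
        return new String[] {head.substring(0, sep), head.substring(sep + 1)};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(parseHead("u42_c7\001{\"sex\":\"1\"}")));
        System.out.println(Arrays.toString(parseHead("nounderscore")));
    }
}
```

Whether a record without an underscore should be kept, skipped, or reported is a design decision for the real table; the sketch only shows where such a check would go.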

6. Hive command-line steps. Before registering the UDTF, add the JSON dependency JARs.

add jar /opt/softwares/lib/commons-beanutils-1.7.0.jar;
add jar /opt/softwares/lib/commons-collections-3.2.jar;
add jar /opt/softwares/lib/commons-lang-2.4.jar;
add jar /opt/softwares/lib/commons-logging-1.1.3.jar;
add jar /opt/softwares/lib/ezmorph-1.0.3.jar;
add jar /opt/softwares/lib/json-lib-2.2.3-jdk15.jar;
add jar /opt/softwares/UDF.jar;

create temporary function explode_map3 as 'com.hexun.udtf.UDTFDratio';

insert into table stage.dratio PARTITION (day='${yesterday}')
select explode_map3(datadratio) as (col1,col2,col3,col4,col5,col6,col7,col8)
from stage.dratio_tmp;
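To make the call sequence behind such a query more concrete, here is a simplified plain-Java model of the lifecycle described in steps 2 to 4: initialize once, process once per input row, forward once per output row, then close. It is only a sketch. MiniUdtf, SplitTokens, and run are hypothetical names and are not the real GenericUDTF API, and the comma-splitting function is an invented example chosen because it emits several rows per input, whereas UDTFDratio emits exactly one row per input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for the Hive runtime; every name in this class is hypothetical.
final class UdtfLifecycleSketch {

    /** Mimics the three callbacks plus forward(); this is not the real GenericUDTF class. */
    abstract static class MiniUdtf {
        final List<String[]> emitted = new ArrayList<String[]>();

        abstract String[] initialize();       // returns the output column names
        abstract void process(String input);  // may call forward() any number of times
        void close() { }                      // cleanup hook, empty by default

        final void forward(String[] row) {
            emitted.add(row);                 // each call adds exactly one output row
        }
    }

    /** Emits one (pos, token) row for every comma-separated token of the input. */
    static final class SplitTokens extends MiniUdtf {
        @Override
        String[] initialize() {
            return new String[] {"pos", "token"};
        }

        @Override
        void process(String input) {
            String[] tokens = input.split(",");
            for (int i = 0; i < tokens.length; i++) {
                // Several columns travel together as one array.
                forward(new String[] {String.valueOf(i), tokens[i]});
            }
        }
    }

    /** Drives the function in the order the steps describe. */
    static List<String[]> run(MiniUdtf f, List<String> inputRows) {
        f.initialize();
        for (String row : inputRows) {
            f.process(row);
        }
        f.close();
        return f.emitted;
    }

    public static void main(String[] args) {
        List<String[]> out = run(new SplitTokens(), Arrays.asList("a,b", "c"));
        for (String[] r : out) {
            System.out.println(String.join("\t", r));
        }
    }
}
```

In this model two input rows yield three output rows, which is the one-to-many behaviour a UDTF exists to provide.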
