Hive Generic User-Defined Aggregation Functions (UDAF)

Methods the evaluator must implement:

1. getNewAggregationBuffer(): returns the AggregationBuffer object that holds intermediate aggregation state.

2. reset(AggregationBuffer agg): resets the aggregation buffer so it can be reused across mappers and reducers.

3. iterate(AggregationBuffer agg, Object[] parameters): processes one row of raw input from parameters and accumulates it into agg.

4. terminatePartial(AggregationBuffer agg): returns the partial aggregation result held in agg in a persistable form; "persistable" here means the return value may only consist of Java primitive types, arrays, primitive wrappers, Hadoop Writables, Lists, and Maps.

5. merge(AggregationBuffer agg, Object partial): merges the partial aggregation result represented by partial into agg.

6. terminate(AggregationBuffer agg): returns the final result.
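To see how these six methods cooperate across the map and reduce sides, here is a plain-Java simulation of the lifecycle using a trivial sum aggregator. This is an illustrative sketch with no Hive dependency; SumLifecycleDemo, Buffer, and run are invented names, not Hive API.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of the evaluator lifecycle with a trivial sum
// aggregator. Method names mirror GenericUDAFEvaluator, but this class
// is standalone and does not use any Hive types.
public class SumLifecycleDemo {
    static class Buffer { long sum; }                            // stands in for AggregationBuffer

    Buffer getNewAggregationBuffer() { return new Buffer(); }
    void reset(Buffer agg) { agg.sum = 0; }
    void iterate(Buffer agg, long value) { agg.sum += value; }   // consume raw rows (map side)
    long terminatePartial(Buffer agg) { return agg.sum; }        // emit partial result
    void merge(Buffer agg, long partial) { agg.sum += partial; } // combine partials (reduce side)
    long terminate(Buffer agg) { return agg.sum; }               // emit final result

    // Simulate several mappers feeding one reducer.
    static long run(List<long[]> splits) {
        SumLifecycleDemo eval = new SumLifecycleDemo();
        Buffer reduceBuf = eval.getNewAggregationBuffer();
        for (long[] split : splits) {
            Buffer mapBuf = eval.getNewAggregationBuffer();
            for (long v : split) eval.iterate(mapBuf, v);
            eval.merge(reduceBuf, eval.terminatePartial(mapBuf));
        }
        return eval.terminate(reduceBuf);
    }

    public static void main(String[] args) {
        // Two "map splits" {1,2,3} and {4,5} reduce to a total of 15.
        System.out.println(run(Arrays.asList(new long[]{1, 2, 3}, new long[]{4, 5})));
    }
}
```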

You usually also need to override the initialization method ObjectInspector init(Mode m, ObjectInspector[] parameters). Note that the meaning of parameters differs by mode: when m is PARTIAL1 or COMPLETE, parameters holds the raw input data; when m is PARTIAL2 or FINAL, parameters holds only partial aggregation data (a single element). In PARTIAL1 and PARTIAL2 modes the returned ObjectInspector describes the return value of terminatePartial; in FINAL and COMPLETE modes it describes the return value of terminate.
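The mode-to-method mapping described above can be restated as a small standalone sketch. The enum below mirrors GenericUDAFEvaluator.Mode but carries no Hive dependency; ModeDemo and describe are illustrative names.

```java
// Standalone model (no Hive dependency) of the four evaluator modes:
// for each mode, the kind of input it receives and the methods Hive
// calls while running in it.
public class ModeDemo {
    enum Mode { PARTIAL1, PARTIAL2, FINAL, COMPLETE }

    static String describe(Mode m) {
        switch (m) {
            case PARTIAL1: return "raw rows -> iterate() + terminatePartial()";
            case PARTIAL2: return "partial aggregations -> merge() + terminatePartial()";
            case FINAL:    return "partial aggregations -> merge() + terminate()";
            case COMPLETE: return "raw rows -> iterate() + terminate()";
            default: throw new IllegalArgumentException();
        }
    }

    public static void main(String[] args) {
        for (Mode m : Mode.values()) {
            System.out.println(m + ": " + describe(m));
        }
    }
}
```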

Below is an evaluator that sorts the elements of a group by their occurrence count in descending order and returns each element together with its count, in the format:

[data1, num1, data2, num2, …]

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
import org.apache.hadoop.io.Text;

// Nested inside the UDAF resolver class (omitted here).
public static class CollectListUDAFEvaluator extends GenericUDAFEvaluator {

    protected PrimitiveObjectInspector inputKeyOI;
    protected StandardListObjectInspector loi;
    protected StandardListObjectInspector internalMergeOI;

    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
        super.init(m, parameters);
        if (m == Mode.PARTIAL1) {
            // Raw input: parameters[0] is the original column.
            inputKeyOI = (PrimitiveObjectInspector) parameters[0];
            return ObjectInspectorFactory.getStandardListObjectInspector(
                    ObjectInspectorUtils.getStandardObjectInspector(inputKeyOI));
        } else {
            if (parameters[0] instanceof StandardListObjectInspector) {
                // Partial-aggregation input: parameters[0] is a list of keys.
                internalMergeOI = (StandardListObjectInspector) parameters[0];
                inputKeyOI = (PrimitiveObjectInspector) internalMergeOI.getListElementObjectInspector();
                loi = (StandardListObjectInspector)
                        ObjectInspectorUtils.getStandardObjectInspector(internalMergeOI);
                return loi;
            } else {
                inputKeyOI = (PrimitiveObjectInspector) parameters[0];
                return ObjectInspectorFactory.getStandardListObjectInspector(
                        ObjectInspectorUtils.getStandardObjectInspector(inputKeyOI));
            }
        }
    }

    // Aggregation buffer: accumulates every key seen in the group.
    static class MkListAggregationBuffer implements AggregationBuffer {
        List<Object> container = Lists.newArrayList();
    }

    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
        ((MkListAggregationBuffer) agg).container.clear();
    }

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
        return new MkListAggregationBuffer();
    }

    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
        if (parameters == null || parameters.length != 1) {
            return;
        }
        Object key = parameters[0];
        if (key != null) {
            MkListAggregationBuffer myagg = (MkListAggregationBuffer) agg;
            putIntoList(key, myagg.container);
        }
    }

    private void putIntoList(Object key, List<Object> container) {
        // Deep-copy the key: Hive may reuse the underlying object between rows.
        Object pCopy = ObjectInspectorUtils.copyToStandardObject(key, this.inputKeyOI);
        container.add(pCopy);
    }

    @Override
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
        MkListAggregationBuffer myagg = (MkListAggregationBuffer) agg;
        return Lists.newArrayList(myagg.container);
    }

    @Override
    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
        if (partial == null) {
            return;
        }
        MkListAggregationBuffer myagg = (MkListAggregationBuffer) agg;
        @SuppressWarnings("unchecked")
        List<Object> partialResult = (List<Object>) internalMergeOI.getList(partial);
        for (Object ob : partialResult) {
            putIntoList(ob, myagg.container);
        }
    }

    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
        MkListAggregationBuffer myagg = (MkListAggregationBuffer) agg;
        // Count occurrences of each key.
        Map<Text, Integer> map = Maps.newHashMap();
        for (int i = 0; i < myagg.container.size(); i++) {
            Text key = (Text) myagg.container.get(i);
            if (map.containsKey(key)) {
                map.put(key, map.get(key) + 1);
            } else {
                map.put(key, 1);
            }
        }
        // Sort entries by count, descending.
        List<Map.Entry<Text, Integer>> listData = Lists.newArrayList(map.entrySet());
        Collections.sort(listData, new Comparator<Map.Entry<Text, Integer>>() {
            public int compare(Map.Entry<Text, Integer> o1, Map.Entry<Text, Integer> o2) {
                // Compare boxed Integers by value, not by reference: the
                // original o1.getValue() == o2.getValue() only happens to work
                // for small cached Integers.
                return o2.getValue().compareTo(o1.getValue());
            }
        });
        // Flatten to [data1, num1, data2, num2, ...].
        List<Object> ret = Lists.newArrayList();
        for (Map.Entry<Text, Integer> entry : listData) {
            ret.add(entry.getKey());
            ret.add(new Text(entry.getValue().toString()));
        }
        return ret;
    }
}
```
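The heart of the terminate step, counting occurrences and sorting them in descending order, can be exercised in isolation. The following standalone sketch substitutes String for Hadoop's Text and uses only the JDK; CollectListSketch and collectSorted are illustrative names, not part of the evaluator above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch of the counting-and-sorting logic inside terminate():
// count occurrences of each key, sort by count descending, then flatten
// the result into [data1, num1, data2, num2, ...].
public class CollectListSketch {
    static List<String> collectSorted(List<String> container) {
        // Count occurrences (LinkedHashMap keeps first-seen order for ties).
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String key : container) {
            counts.merge(key, 1, Integer::sum);
        }
        // Sort entries by count, descending.
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> Integer.compare(b.getValue(), a.getValue()));
        // Flatten to the [data, num, data, num, ...] output format.
        List<String> ret = new ArrayList<>();
        for (Map.Entry<String, Integer> e : entries) {
            ret.add(e.getKey());
            ret.add(e.getValue().toString());
        }
        return ret;
    }

    public static void main(String[] args) {
        System.out.println(collectSorted(Arrays.asList("a", "b", "a", "a", "b", "c")));
        // prints [a, 3, b, 2, c, 1]
    }
}
```

In a real deployment the evaluator would be wrapped in a UDAF resolver class, the jar loaded with ADD JAR, and the function registered via CREATE TEMPORARY FUNCTION before being used in a GROUP BY query.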
