Hive:创建UDF

Hive的内置函数有可能无法满足你的查询需求,这时候就需要自定义函数来处理数据.本文介绍UDF类别,并创建一个简单的UDF示例.

三种UDF类型

UDF (user-defined function) : 作用于单个数据行,且产生一个数据行作为输出.大多数函数(例如数学函数和字符串函数)都属于这一类.
UDAF(user-defined aggregate function): 接受多个输入数据行,并产生一个输出数据行.像COUNT和MAX这昂的函数都是聚集函数
UDTF(user-defined table-generating function):操作作用于单个数据行,且产生多个数据行(一个表)作为输出.

一个UDF必须满足下面两个条件:

  • 一个UDF必须是 org.apache.hadoop.hive.ql.exec.UDF的子类
  • 一个UDF必须至少实现了evaluate()方法

创建简单UDF

代码

package com.bihell.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public final class Lower extends UDF {
    public Text evaluate(final Text s) {
        if (s == null) {
            return null;
        }
        return new Text(s.toString().toLowerCase());
    }

}

创建Schema

CREATE TABLE IF NOT EXISTS authors( name STRING, book STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

导入数据

LOAD DATA LOCAL INPATH ‘authors.txt’ INTO TABLE authors;

加载Jar包

add jar my_lower.jar

创建函数

CREATE TEMPORARY FUNCTION my_lower AS ‘com.bihell.hive.udf.Lower’;

运行函数

select my_lower(name) from authors;

UDAF示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* Created by master on 2016/4/27.
*/
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;

public class Maximum extends UDAF {

public static class MaximumIntUDAFEvaluator implements UDAFEvaluator {

private IntWritable result;

public void init() {
System.err.printf("%s %s\n", hashCode(), "init");
result = null;
}

public boolean iterate(IntWritable value) {
System.err.printf("%s %s %s\n", hashCode(), "iterate", value);
if (value == null) {
return true;
}
if (result == null) {
result = new IntWritable(value.get());
} else {
result.set(Math.max(result.get(), value.get()));
}
return true;
}

public IntWritable terminatePartial() {
System.err.printf("%s %s\n", hashCode(), "terminatePartial");
return result;
}

public boolean merge(IntWritable other) {
System.err.printf("%s %s %s\n", hashCode(), "merge", other);
return iterate(other);
}

public IntWritable terminate() {
System.err.printf("%s %s\n", hashCode(), "terminate");
return result;
}
}
}

UDAF 必须是org.apache.hadoop.hive.ql.exec.UDAF的子类,且包含一个或多个嵌套的,实现了org.apache.hadoop.hive.ql.exec.UDAFEvaluator的静态类.在这个示例中只有一个嵌套类.MaximumIntUDAFEvaluator.我们也可以添加更多的计算函数(如MaximunLongUDAFEvaluator和MaximumFloatUDAFEvaluator)来提供长整型、浮点型等类型数的最大值的UDAF的重载。

必须实现下面5个方法:

  • init()方法 负责初始化计算函数并重设它的内部状态. 在MaximumIntUDAFevaluator中我们把最终结果的IntWritable对象设为null. 我们使用null来表示目前还没有对很合值进行聚集计算,这和对空集NULL计算最大值有的结果是一致的.
  • iterate() 方法 每次对一个新值进行聚集计算时都会调用iterate() 方法. 计算函数要根据聚集计算的结果更新其内部状态.iterate()接受的参数和Hive中被调用的函数的参数是对应的.在这个示例中,只有一个参数.方法首先检测参数值是否为null,如果是,则将其忽略.否则,resul变量实例就被设为value的整数值(如果这是方法第一次接受输入),或设为当前值和value值中的较大值(如果已经接受一些值).如果输入值合法我们就放方法返回ture.
  • terminatePartial() 方法 Hive需要部分聚集结果时会调用terminatePartial()方法.这个方法必须返回一个封装了聚集计算当前状态的对象.在这里,因为只需要对已知的最大值或在没有值时的控制null进行封装,所以使用一个IntWritable即可.
  • merge() 方法 在Hive决定要合并一个部分聚集值和另一个部分聚集值会调用merge()方法.该方法接受一个对象作为输入.这个对象类型必须和terminatePartial()方法的返回类型一致.在示例里merge()方法可以直接使用iterate()方法,因为部分结果的聚集和原始值的聚集的表达方法是相同的.但一般情况下不能这样做,这个方法实现的逻辑会合并计算函数和部分聚集的状态.
  • terminate() 方法 Hive需要最终聚集结果时会调用terminate()方法,计算函数需要把状态作为一个值返回.在这里,我们返回实例变量result

执行我们的函数

create temporary function maximum as ‘com.bihell.hive.Maximum’;
select maxinum(temperature) from records;

创建永久函数

创建临时函数每次导入声明比较麻烦,现在让我们创建永久函数

1.把JAR文件拷贝到HDFS并且确保hive用户可以访问这个JAR文件。
2.把JAR文件拷贝到HiveServer2运行的主机上,放到一个hive用户可以读取,写入和执行的目录里面(比如/opt/local/hive/lib/)。
3.进入Cloudera Manager管理控制台,选择Hive服务。
4.点击Configuration标签。
5.展开Service-Wide > Advanced目录。
6.配置Hive Auxiliary JARs Directory属性,值为步骤2的/opt/local/hive/lib/路径。这个这个属性会覆盖hive.aux.jars.path,即使它在HiveServer2 advanced 配置里面也一样会被覆盖。
7.点击Save ChangesJAR文件就被添加到HIVE_AUX_JARS_PATH变量里面了。
8.推送配置文件
 a.在Cloudera Manager管理控制台,进入Hive服务。
 b.在Actions菜单选择Deploy Client Configuration
 c.点击Deploy Client Configuration
9.重启Hive服务,如果Hive Auxiliary JARs Directory属性设置了,但是实际目录不存在,HiveServer2会自动失败。
10.如果启用了Sentry,需要给role赋予特权。以hive用户使用Beeline并且用过Hive SQL GRANT语句做以下操作:

1
GRANT ALL ON URI 'file:///opt/local/hive/lib/my.jar' TO ROLE EXAMPLE_ROLE

HDFS上面的JAR文件也需要授权

1
GRANT ALL ON URI 'hdfs:///path/to/jar' TO ROLE EXAMPLE_ROLE

11.运行CREATE FUNCTION来创建函数

1
CREATE FUNCTION addfunc AS 'com.example.hiveserver2.udf.add' USING JAR 'hdfs:///path/to/jar'

参考

http://mvnrepository.com/
Using the CDH 5 Maven Repository
Hadoop- The Definitive Guide, 4th Edition
User-Defined Functions (UDFs) with HiveServer2 Using Cloudera Manager

打赏支持:如果你觉得我的文章对你有所帮助,可以打赏我哟。