在Apache Spark中使用UDF[译]

用户定义函数(UDF)是大多数SQL环境的一个关键特性,用于扩展系统的内置功能。 UDF允许开发人员通过抽象其低级语言实现来在更高级语言(如SQL)中启用新功能。 Apache Spark也不例外,并且提供了用于将UDF与Spark SQL工作流集成的各种选项。

在这篇博文中,我们将回顾Python,Java和Scala中的Apache Spark UDF和UDAF(用户定义的聚合函数)实现的简单示例。我们还将讨论重要的UDF API功能和集成点,包括它们的可用性。然后,我们将介绍一些在选择在应用程序中利用UDF时应注意的重要性能注意事项。

Spark SQL UDFs

UDF从表中的单个行转换值,以便为每行生成单个对应的输出值。例如,大多数SQL环境提供UPPER函数返回作为输入提供的字符串的大写版本。

自定义函数可以在Spark SQL中定义和注册为UDF,并具有可用于SQL查询的关联别名。作为一个简单的例子,我们将定义一个UDF来将以下JSON数据中的温度从摄氏度转换为华氏度:

{"city":"St. John's","avgHigh":8.7,"avgLow":0.6}
{"city":"Charlottetown","avgHigh":9.7,"avgLow":0.9}
{"city":"Halifax","avgHigh":11.0,"avgLow":1.6}
{"city":"Fredericton","avgHigh":11.2,"avgLow":-0.5}
{"city":"Quebec","avgHigh":9.0,"avgLow":-1.0}
{"city":"Montreal","avgHigh":11.1,"avgLow":1.4}
...

https://github.com/curtishoward/sparkudfexamples/blob/master/data/temperatures.json

下面的示例代码是我们转换UDF,注册名字为CTOF,然后使用SQL查询来转换每个城市的温度。为简洁起见,省略了SQLContext对象和其他代码的创建,不过在每段代码下面都提供了完整的代码链接。

Python

df = sqlContext.read.json("temperatures.json")

df.registerTempTable("citytemps")
# Register the UDF with our SQLContext

sqlContext.registerFunction("CTOF", lambda degreesCelsius: ((degreesCelsius * 9.0 / 5.0) + 32.0))
sqlContext.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show()

https://github.com/curtishoward/sparkudfexamples/tree/master/python-udf

Scala

val df = sqlContext.read.json("temperatures.json")

df.registerTempTable("citytemps")
// Register the UDF with our SQLContext

sqlContext.udf.register("CTOF", (degreesCelcius: Double) => ((degreesCelcius * 9.0 / 5.0) + 32.0))
sqlContext.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show()

https://github.com/curtishoward/sparkudfexamples/tree/master/scala-udf

Java

DataFrame df = sqlContext.read().json("temperatures.json");
df.registerTempTable("citytemps");

// Register the UDF with our SQLContext
sqlContext.udf().register("CTOF", new UDF1<Double, Double>() {
  @Override
  public Double call(Double degreesCelcius) {
    return ((degreesCelcius * 9.0 / 5.0) + 32.0);
  }
}, DataTypes.DoubleType);
sqlContext.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show();

https://github.com/curtishoward/sparkudfexamples/tree/master/java-udf

请注意,Spark SQL定义了UDF1~UDF22,22个类,支持最多22个输入参数的UDF。上面的例子使用UDF1来处理我们的单个温度值作为输入。没有对Apache Spark源代码的更新,使用数组或结构作为参数对于需要超过22个输入的应用程序可能很有帮助,如果你发现自己用了UDF6或者更高UDF类你也可以尝试这样操作。

Spark SQL UDAF functions

用户定义的聚合函数(UDAF)同时处理多行,返回单个值作为结果,通常与GROUP BY语句(例如COUNT或SUM)一起工作。为了保持这个例子的简单性,我们将实现一个叫SUMPRODUCT的UDAF来计算以库存分组的所有车辆的零售价值,给定一个价格和一个整数数量的库存在以下数据:

{"Make":"Honda","Model":"Pilot","RetailValue":32145.0,"Stock":4}
{"Make":"Honda","Model":"Civic","RetailValue":19575.0,"Stock":11}
{"Make":"Honda","Model":"Ridgeline","RetailValue":42870.0,"Stock":2}
{"Make":"Jeep","Model":"Cherokee","RetailValue":23595.0,"Stock":13}
{"Make":"Jeep","Model":"Wrangler","RetailValue":27895.0,"Stock":4}
{"Make":"Volkswagen","Model":"Passat","RetailValue":22440.0,"Stock":2}

https://github.com/curtishoward/sparkudfexamples/blob/master/data/inventory.json

Apache Spark UDAF definitions are currently supported in Scala and Java by the extending UserDefinedAggregateFunction class. Once defined, we can instantiate and register our SumProductAggregateFunction UDAF object under the alias SUMPRODUCT and make use of it from a SQL query, much in the same way that we did for our CTOF UDF in the previous example.

Apache Spark UDAF的定义在Scala和Java中受支持(扩展UserDefinedAggregateFunction类)。定义好以后我们可以进行实例化并注册SumProductAggregateFunction,然后使用同样的方法调用。

Scala

object ScalaUDAFExample {

  // Define the SparkSQL UDAF logic
  private class SumProductAggregateFunction extends UserDefinedAggregateFunction {
    // Define the UDAF input and result schema's
    def inputSchema: StructType =     // Input  = (Double price, Long quantity)
      new StructType().add("price", DoubleType).add("quantity", LongType)
    def bufferSchema: StructType =    // Output = (Double total)
      new StructType().add("total", DoubleType)
    def dataType: DataType = DoubleType
    def deterministic: Boolean = true // true: our UDAF's output given an input is deterministic

    def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer.update(0, 0.0)           // Initialize the result to 0.0
    }

    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      val sum   = buffer.getDouble(0) // Intermediate result to be updated
      val price = input.getDouble(0)  // First input parameter
      val qty   = input.getLong(1)    // Second input parameter
      buffer.update(0, sum + (price * qty))   // Update the intermediate result
    }
    // Merge intermediate result sums by adding them
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1.update(0, buffer1.getDouble(0) + buffer2.getDouble(0))
    }
    // THe final result will be contained in 'buffer'
    def evaluate(buffer: Row): Any = {
      buffer.getDouble(0)
    }
  }

  def main (args: Array[String]) {
    val conf       = new SparkConf().setAppName("Scala UDAF Example")
    val sc         = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val testDF = sqlContext.read.json("inventory.json")
    testDF.registerTempTable("inventory") 
    // Register the UDAF with our SQLContext
    sqlContext.udf.register("SUMPRODUCT", new SumProductAggregateFunction)

    sqlContext.sql("SELECT Make, SUMPRODUCT(RetailValue,Stock) AS InventoryValuePerMake FROM inventory GROUP BY Make").show()
  }
}

https://github.com/curtishoward/sparkudfexamples/tree/master/scala-udaf

Hive UDF整合

Spark SQL支持集成现有Hive中的UDF,UDAF和UDTF的(Java或Scala)实现。 注意UDTFs(用户定义的表函数)可以返回多个列和行 - 它们超出了本文的范围,我们以后可能进行说明。集成Hive现有UDF的两一个有意义的方法是重写并注册他们,这对性能的提升也是有帮助的特别是PySpark写的UDF,这将在下一节中讨论。另外一种方式则可以通过使用spark-submit的-jars选项包含包含Hive UDF实现的JAR文件,然后使用 CREATE TEMPORARY FUNCTION声明函数来从HiveContext访问Hive函数,如下所示:

Hive UDF definition in Java

package com.cloudera.fce.curtis.sparkudfexamples.hiveudf;

import org.apache.hadoop.hive.ql.exec.UDF;

public class CTOF extends UDF {
  public Double evaluate(Double degreesCelsius) {
    return ((degreesCelsius * 9.0 / 5.0) + 32.0);
  }
}

https://github.com/curtishoward/sparkudfexamples/tree/master/hive-udf

Hive UDF access from Python

df= sqlContext.read.json("temperatures.json")
df.registerTempTable("citytemps")

# Register our Hive UDF
sqlContext.sql("CREATE TEMPORARY FUNCTION CTOF AS 'com.cloudera.fce.curtis.sparkudfexamples.hiveudf.CTOF'")

sqlContext.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show()

https://github.com/curtishoward/sparkudfexamples/tree/master/hive-udf

请注意,Hive UDF只能使用Apache Spark的SQL查询语言来调用 - 换句话说,它们不能与Dataframe API的领域特定语言(DSL)一起使用,就像我们在上面示例中实现的UDF和UDAF函数。

另外通过包含实现jar文件(使用-jars选项与spark-submit)的方式PySpark也可以调用Scala或Java写的UDF(通过JVM底层的调用), Holden Karau对这种方式进行了讨论.请注意,在这种技术中使用的一些Apache Spark私有变量不是正式面向最终用户的。此外这种方式还允许PySpark调用UDAF(目前必须在Java和Scala中定义),下面的示例演示了使用先前我们在Scala中定义的SUMPRODUCT UDAF:

object ScalaUDAFFromPythonExample {
  // … UDAF as defined in our example earlier ...
}

  // This function is called from PySpark to register our UDAF
  def registerUdf(sqlCtx: SQLContext) {
    sqlCtx.udf.register("SUMPRODUCT", new SumProductAggregateFunction)
  }
}

https://github.com/curtishoward/sparkudfexamples/tree/master/scala-udaf-from-python

Scala UDAF from PySpark

df = sqlContext.read.json("inventory.json")
df.registerTempTable("inventory")

scala_sql_context  =  sqlContext._ssql_ctx
scala_spark_context = sqlContext._sc
scala_spark_context._jvm.com.cloudera.fce.curtis.sparkudfexamples.scalaudaffrompython.ScalaUDAFFromPythonExample.registerUdf(scala_sql_context)

sqlContext.sql("SELECT Make, SUMPRODUCT(RetailValue,Stock) AS InventoryValuePerMake FROM inventory GROUP BY Make").show()

https://github.com/curtishoward/sparkudfexamples/tree/master/scala-udaf-from-python

每个版本的Apache Spark不断地添加与UDF相关的功能。在2.0中R中增加了对UDF的支持。作为参考,下面的表格总结了本博客中讨论特性版本:

table summarizing versions in which the key features discussed so far in this blog were introduced

性能相关

了解Apache Spark的UDF功能的性能影响很重要。例如,Python UDF(例如我们的CTOF函数)执行器JVM和运行UDF逻辑的Python解释器之间的数据序列化,与Java或Scala中的UDF实现相比,这大大降低了性能。缓解这种序列化瓶颈的解决方案如下:

1.从PySpark访问Hive UDF,如上一节所述。 Java UDF实现可以由执行器JVM直接访问。再次注意,这种方法只提供从Apache Spark的SQL查询语言访问UDF。
2.利用这种方法也可以访问在Java或Scala中实现的UDF,如我们之前定义的Scala UDAF示例所示。

一般来说,UDF逻辑应尽可能精简,因为它将为每一行调用。例如,在缩放到10亿行时,一个UDF需要100毫秒才能处理完一行数据,这会导致性能问题。

Spark SQL的另一个重要组成部分是Catalyst查询优化器。它的功能随着每个版本而扩展,通常可以提高Spark SQL查询的性能;然而,任意UDF实现代码可能不能被Catalyst很好的理解(虽然分析字节码的未来功能被认为可以解决这个问题)。因此,使用Apache Spark的内置SQL查询函数通常会带来最佳性能,并且应该是在避免引入UDF时考虑的第一种方法。高级用户寻求更紧密地结合他们的代码与Catalyst可以参考Chris Fregly使用Expression.genCode优化UDF代码的谈话,以及新的Apache Spark 2.0的实验功能,它提供了一个可插拔的API用于定制Catalyst优化程序规则。

结论

当Spark SQL的内置功能需要扩展时,UDF是一个有用的工具。本文提供了UDF和UDAF实现的演示,并讨论了集成步骤,以利用Spark SQL中现有的Java Hive UDF。 UDF可以在Python,Scala,Java和(在Spark 2.0中)R和Scala和Java中的UDAF实现。当使用UDF与PySpark时,必须考虑数据序列化成本,并且应该考虑上面讨论的两个策略来解决这个问题。最后,我们谈到了Spark SQL的Catalyst优化器以及若非必要的情况下应该先使用内置SQL函数的原因。

引用

原文:Working with UDFs in Apache Spark
Hive UDF 读取文件进行HashMap查找
Hive:创建UDF

代码

https://github.com/curtishoward/sparkudfexamples
CDH Version: 5.8.0 (Apache Spark 1.6.0)

打赏支持:支付宝/微信。如果你觉得我的文章对你有所帮助,可以打赏我哟。