How to Extract Nested JSON Data in Spark [Translated]

JSON is a commonly used format for storing data, but parsing it can be fiddly. Here are some examples of handling nested JSON with Spark DataFrames (Spark version 1.6.0).

Original article: HOW TO EXTRACT NESTED JSON DATA IN SPARK

Sample file: sample.json

{
    "user": "gT35Hhhre9m",
    "dates": ["2016-01-29", "2016-01-28"],
    "status": "OK",
    "reason": "some reason",
    "content": [{
        "foo": 123,
        "bar": "val1"
    }, {
        "foo": 456,
        "bar": "val2"
    }, {
        "foo": 789,
        "bar": "val3"
    }, {
        "foo": 124,
        "bar": "val4"
    }, {
        "foo": 126,
        "bar": "val5"
    }]
}

Assuming you already have a SQLContext object, the following examples show how to parse this nested JSON file.
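In spark-shell a SQLContext is already in scope as sqlContext. For a standalone application you would create one yourself; a minimal Spark 1.6 setup might look like this sketch (the app name is just a placeholder):

//standalone setup; in spark-shell, sc and sqlContext already exist
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName("nested-json-example")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)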

Loading the JSON file into a Spark DataFrame

scala> val df = sqlContext.read.json("sample.json")
df: org.apache.spark.sql.DataFrame = [content: array<struct<bar:string,foo:bigint>>, dates: array<string>, reason: string, status: string, user: string]

//output
scala> df.show
+--------------------+--------------------+-----------+------+-----------+
|             content|               dates|     reason|status|       user|
+--------------------+--------------------+-----------+------+-----------+
|[[val1,123], [val...|[2016-01-29, 2016...|some reason|    OK|gT35Hhhre9m|
+--------------------+--------------------+-----------+------+-----------+
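To see the nesting laid out more explicitly than the one-line schema the shell echoes, printSchema is handy (output below reconstructed from the schema shown above):

scala> df.printSchema
root
 |-- content: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- bar: string (nullable = true)
 |    |    |-- foo: long (nullable = true)
 |-- dates: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- reason: string (nullable = true)
 |-- status: string (nullable = true)
 |-- user: string (nullable = true)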

In the output above we can see that the "content" field contains an array of structs and the "dates" field contains an array of strings. Our first step is to extract this data into new DataFrames using the explode function.

Extracting "dates" into a new DataFrame

//explode dates field
scala> val dfDates = df.select(explode(df("dates")))

//output
scala> dfDates.show
+----------+
|       col|
+----------+
|2016-01-29|
|2016-01-28|
+----------+

//rename "col" to "dates"
scala> val dfDates = df.select(explode(df("dates"))).toDF("dates")

//output
scala> dfDates.show
+----------+
|     dates|
+----------+
|2016-01-29|
|2016-01-28|
+----------+
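As an aside, calling .as on the exploded column achieves the same renaming without the extra toDF call. This equivalent one-liner is my addition, not part of the original article:

//alias the exploded column instead of renaming via toDF
scala> val dfDates = df.select(explode(df("dates")).as("dates"))
dfDates: org.apache.spark.sql.DataFrame = [dates: string]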

The "content" field contains an array of structs. To access the data inside each struct, we explode the array in the same way and then use the dot operator.

Extracting the struct data

//explode content field
scala> val dfContent = df.select(explode(df("content")))
dfContent: org.apache.spark.sql.DataFrame = [col: struct<bar:string,foo:bigint>]

//output
scala> dfContent.show
+----------+
|       col|
+----------+
|[val1,123]|
|[val2,456]|
|[val3,789]|
|[val4,124]|
|[val5,126]|
+----------+

//rename "col" to "content"
scala> val dfContent = df.select(explode(df("content"))).toDF("content")
dfContent: org.apache.spark.sql.DataFrame = [content: struct<bar:string,foo:bigint>]

//output
scala> dfContent.show
+----------+
|   content|
+----------+
|[val1,123]|
|[val2,456]|
|[val3,789]|
|[val4,124]|
|[val5,126]|
+----------+

//extracting fields in struct
scala> val dfFooBar = dfContent.select("content.foo", "content.bar")
dfFooBar: org.apache.spark.sql.DataFrame = [foo: bigint, bar: string]

//output
scala> dfFooBar.show
+---+----+
|foo| bar|
+---+----+
|123|val1|
|456|val2|
|789|val3|
|124|val4|
|126|val5|
+---+----+
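The explode-then-select steps can also be chained into a single expression: alias the exploded column, then read the struct fields with the dot operator directly. This combined form is a sketch of my own, equivalent to the two-step code above:

//explode "content" and extract the struct fields in one pass
scala> val dfFooBar = df.select(explode(df("content")).as("content")).select("content.foo", "content.bar")
dfFooBar: org.apache.spark.sql.DataFrame = [foo: bigint, bar: string]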

References

An introduction to JSON support in Spark SQL
