pyspark - How to convert nested JSON arriving on Kafka via Spark Structured Streaming to a flat dataframe?


On my first attempt I need to parse JSON arriving on Kafka with Spark Structured Streaming.

I am struggling to convert the incoming JSON into a flat DataFrame for further processing.

My input JSON:

[
    {
        "siteid": "30:47:47:be:16:8f",
        "sitedata": [
            {
                "dataseries": "trend-255",
                "values": [
                    {"ts": 1502715600, "value": 35.74},
                    {"ts": 1502715660, "value": 35.65},
                    {"ts": 1502715720, "value": 35.58},
                    {"ts": 1502715780, "value": 35.55}
                ]
            },
            {
                "dataseries": "trend-256",
                "values": [
                    {"ts": 1502715840, "value": 18.45},
                    {"ts": 1502715900, "value": 18.35},
                    {"ts": 1502715960, "value": 18.32}
                ]
            }
        ]
    },
    {
        "siteid": "30:47:47:be:16:ff",
        "sitedata": [
            {
                "dataseries": "trend-255",
                "values": [
                    {"ts": 1502715600, "value": 35.74},
                    {"ts": 1502715660, "value": 35.65},
                    {"ts": 1502715720, "value": 35.58},
                    {"ts": 1502715780, "value": 35.55}
                ]
            },
            {
                "dataseries": "trend-256",
                "values": [
                    {"ts": 1502715840, "value": 18.45},
                    {"ts": 1502715900, "value": 18.35},
                    {"ts": 1502715960, "value": 18.32}
                ]
            }
        ]
    }
]
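As a quick sanity check (plain Python, not part of the streaming job), the payload nests three levels deep: sites, then dataseries, then timestamped values. The sketch below uses a sample trimmed to one site for brevity and counts how many flat rows it should expand to, i.e. one row per ts/value pair:

```python
import json

# Trimmed sample mirroring the Kafka payload shape (one site, one series);
# the real message carries two sites with two series each.
payload = """
[
  {"siteid": "30:47:47:be:16:8f",
   "sitedata": [
     {"dataseries": "trend-255",
      "values": [{"ts": 1502715600, "value": 35.74},
                 {"ts": 1502715660, "value": 35.65}]}
   ]}
]
"""

sites = json.loads(payload)

# One flat row per leaf ts/value pair, across all sites and series.
n_rows = sum(len(series["values"])
             for site in sites
             for series in site["sitedata"])
print(n_rows)  # 2 for this trimmed sample
```

For the full two-site payload above, the same count gives 2 sites × 2 series × (4 + 3) values per site = 14 flat rows.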

My Spark schema is:

data1_spark_schema = ArrayType(
    StructType([
        StructField("siteid", StringType(), False),
        StructField("sitedata", ArrayType(StructType([
            StructField("dataseries", StringType(), False),
            StructField("values", ArrayType(StructType([
                StructField("ts", IntegerType(), False),
                StructField("value", StringType(), False)
            ]), False), False)
        ]), False), False)
    ]), False
)

My simple code is:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

from config.general import kafka_instance
from config.general import topic
from schemas.schema import data1_spark_schema

spark = SparkSession \
            .builder \
            .appName("structured_bms_feed") \
            .getOrCreate()

stream = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_instance) \
        .option("subscribe", topic) \
        .option("startingOffsets", "latest") \
        .option("max.poll.records", 100) \
        .option("failOnDataLoss", False) \
        .load()

stream_records = stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) AS bms_data1") \
                       .select(from_json("bms_data1", data1_spark_schema).alias("bms_data1"))

sites = stream_records.select(explode("bms_data1").alias("site")) \
                      .select("site.*")

sites.printSchema()

stream_debug = sites.writeStream \
                    .outputMode("append") \
                    .format("console") \
                    .option("numRows", 20) \
                    .option("truncate", False) \
                    .start()

stream_debug.awaitTermination()

When I run the code, the schema prints like this:

root
 |-- siteid: string (nullable = false)
 |-- sitedata: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- dataseries: string (nullable = false)
 |    |    |-- values: array (nullable = false)
 |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |-- ts: integer (nullable = false)
 |    |    |    |    |-- value: string (nullable = false)

Is it possible to have the schema in such a way that all fields end up in a flat DataFrame instead of nested JSON? Every ts and value pair should give me one row together with its parent dataseries and siteid.

Answering my own question. I managed to flatten it using the following lines:

sites_flat = stream_records.select(explode("bms_data1").alias("site")) \
                           .select("site.siteid", explode("site.sitedata").alias("sitedata")) \
                           .select("siteid", "sitedata.dataseries", explode("sitedata.values").alias("values")) \
                           .select("siteid", "dataseries", "values.*")
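To see what the chained explodes produce, here is a pure-Python equivalent (a sketch for illustration, not part of the streaming job) that flattens the same structure into one (siteid, dataseries, ts, value) row per measurement. Each nested loop below plays the role of one explode call:

```python
import json

# Minimal sample mirroring the Kafka payload shape (one site, two series).
payload = """
[
  {"siteid": "30:47:47:be:16:8f",
   "sitedata": [
     {"dataseries": "trend-255",
      "values": [{"ts": 1502715600, "value": 35.74},
                 {"ts": 1502715660, "value": 35.65}]},
     {"dataseries": "trend-256",
      "values": [{"ts": 1502715840, "value": 18.45}]}
   ]}
]
"""

rows = [
    (site["siteid"], series["dataseries"], v["ts"], v["value"])
    for site in json.loads(payload)   # explode("bms_data1")       -> one row per site
    for series in site["sitedata"]    # explode("site.sitedata")   -> one row per series
    for v in series["values"]         # explode("sitedata.values") -> one row per measurement
]

for r in rows:
    print(r)
```

Each explode duplicates the parent columns for every array element, which is why siteid and dataseries repeat on every flat row.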
