PySpark - How to convert Spark Streaming nested JSON coming on Kafka to a flat DataFrame?
This is my first attempt at parsing JSON arriving on Kafka with Spark Structured Streaming. I am struggling to convert the incoming nested JSON into a flat DataFrame for further processing.
My input JSON is:
[
  {
    "siteid": "30:47:47:be:16:8f",
    "sitedata": [
      {
        "dataseries": "trend-255",
        "values": [
          {"ts": 1502715600, "value": 35.74},
          {"ts": 1502715660, "value": 35.65},
          {"ts": 1502715720, "value": 35.58},
          {"ts": 1502715780, "value": 35.55}
        ]
      },
      {
        "dataseries": "trend-256",
        "values": [
          {"ts": 1502715840, "value": 18.45},
          {"ts": 1502715900, "value": 18.35},
          {"ts": 1502715960, "value": 18.32}
        ]
      }
    ]
  },
  {
    "siteid": "30:47:47:be:16:ff",
    "sitedata": [
      {
        "dataseries": "trend-255",
        "values": [
          {"ts": 1502715600, "value": 35.74},
          {"ts": 1502715660, "value": 35.65},
          {"ts": 1502715720, "value": 35.58},
          {"ts": 1502715780, "value": 35.55}
        ]
      },
      {
        "dataseries": "trend-256",
        "values": [
          {"ts": 1502715840, "value": 18.45},
          {"ts": 1502715900, "value": 18.35},
          {"ts": 1502715960, "value": 18.32}
        ]
      }
    ]
  }
]
My Spark schema is:
data1_spark_schema = ArrayType(
    StructType([
        StructField("siteid", StringType(), False),
        StructField("sitedata", ArrayType(StructType([
            StructField("dataseries", StringType(), False),
            StructField("values", ArrayType(StructType([
                StructField("ts", IntegerType(), False),
                StructField("value", StringType(), False)
            ]), False), False)
        ]), False), False)
    ]), False
)
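Before wiring the schema into a streaming job, it can be handy to sanity-check that the sample payload actually matches the nesting the schema expects. This is a minimal, Spark-free sketch using only the standard library; the payload is trimmed to one site for brevity:

```python
import json

# Hypothetical trimmed sample mirroring the question's input JSON.
payload = json.loads(
    '[{"siteid": "30:47:47:be:16:8f", "sitedata": '
    '[{"dataseries": "trend-255", "values": [{"ts": 1502715600, "value": 35.74}]}]}]'
)

# Walk the three levels the schema declares: array of sites,
# each with a "sitedata" array, each element holding a "values" array.
assert isinstance(payload, list)
for site in payload:
    assert {"siteid", "sitedata"} <= site.keys()
    for series in site["sitedata"]:
        assert {"dataseries", "values"} <= series.keys()
        for v in series["values"]:
            assert {"ts", "value"} <= v.keys()
```

If any level is missing or renamed, the assertions fail immediately, which is much easier to debug than silent nulls coming out of `from_json`.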
My code is:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

from config.general import kafka_instance
from config.general import topic
from schemas.schema import data1_spark_schema

spark = SparkSession \
    .builder \
    .appName("Structured_BMS_Feed") \
    .getOrCreate()

stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_instance) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .option("max.poll.records", 100) \
    .option("failOnDataLoss", False) \
    .load()

stream_records = stream.selectExpr("CAST(key AS STRING)",
                                   "CAST(value AS STRING) as bms_data1") \
    .select(from_json("bms_data1", data1_spark_schema).alias("bms_data1"))

sites = stream_records.select(explode("bms_data1").alias("site")) \
    .select("site.*")

sites.printSchema()

stream_debug = sites.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("numRows", 20) \
    .option("truncate", False) \
    .start()

stream_debug.awaitTermination()
When I run this code, the schema prints as:
root
 |-- siteid: string (nullable = false)
 |-- sitedata: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- dataseries: string (nullable = false)
 |    |    |-- values: array (nullable = false)
 |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |-- ts: integer (nullable = false)
 |    |    |    |    |-- value: string (nullable = false)
Is it possible to get all of these fields in a flat DataFrame instead of nested JSON? Every ts and value pair should give me one row together with its parent dataseries and siteid.
Answering my own question. I managed to flatten it using the following lines:
sites_flat = stream_records \
    .select(explode("bms_data1").alias("site")) \
    .select("site.siteid", explode("site.sitedata").alias("sitedata")) \
    .select("siteid", "sitedata.dataseries", explode("sitedata.values").alias("values")) \
    .select("siteid", "dataseries", "values.*")
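The chain of three `explode` calls above can be mimicked in plain Python to see exactly what rows come out: each `explode` turns one array level into one row per element, carrying the parent columns along. This sketch uses a hypothetical one-site sample shaped like the question's input:

```python
import json

# Hypothetical sample, trimmed to one site with one series of two readings.
sample = json.loads("""
[
  {"siteid": "30:47:47:be:16:8f",
   "sitedata": [
     {"dataseries": "trend-255",
      "values": [{"ts": 1502715600, "value": 35.74},
                 {"ts": 1502715660, "value": 35.65}]}
   ]}
]
""")

# Three nested loops == three chained explodes: one flat row per
# innermost (ts, value) pair, with siteid and dataseries repeated.
rows = [
    (site["siteid"], series["dataseries"], v["ts"], v["value"])
    for site in sample
    for series in site["sitedata"]
    for v in series["values"]
]
```

Here `rows` holds one tuple of (siteid, dataseries, ts, value) per reading, which is the same shape `sites_flat` produces, where the final `select("...", "values.*")` unpacks the `ts` and `value` struct fields into top-level columns.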