Spark Structured Streaming: aggregation without a timestamp on the data (aggregation based on the trigger)
I need to perform an aggregation on incoming data based on the Spark driver's timestamp, without a watermark; the data does not have a timestamp field.
The requirement is to compute the average of the data received every second (it does not matter when it was sent).
In other words, I need an aggregation over the data received in each trigger, like with the previous RDD-based streaming (DStream) API; a rough sketch of that old approach is shown below.
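A minimal sketch of what this looked like with the DStream API, for reference (the socket source and the one-numeric-value-per-line format are only illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Old DStream approach: each 1-second batch is aggregated on its own,
// regardless of any timestamp carried by the data itself.
val conf = new SparkConf().setAppName("BatchAverage").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

// Illustrative source: one numeric value per line from a socket.
val values = ssc.socketTextStream("localhost", 9999).map(_.toDouble)

// foreachRDD yields one RDD per batch interval, so the average is computed
// over exactly the records received in that second.
values.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val avg = rdd.sum() / rdd.count()
    println(s"average of this batch = $avg")
  }
}

ssc.start()
ssc.awaitTermination()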
Is there a way to do this with Structured Streaming?
You can create your own Sink and perform the operation on each addBatch() call:
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode

// Sink that aggregates whatever arrived in the current micro-batch.
class CustomSink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    data.groupBy().agg(sum("age") as "sumAge").foreach(v => println(s"result = $v"))
  }
}

class CustomSinkProvider extends StreamSinkProvider with DataSourceRegister {
  def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = {
    new CustomSink()
  }

  def shortName(): String = "person"
}
with OutputMode set to Update and a trigger every X seconds:
import org.apache.spark.sql.streaming.Trigger

val query = ds.writeStream
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .outputMode(OutputMode.Update())
  .format("exactlyonce.CustomSinkProvider")
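The writer still has to be started, and since the question asks for an average rather than a sum, avg() can be used inside addBatch() instead of sum() (a small sketch, assuming the same "age" column):

import org.apache.spark.sql.functions.avg

// Variant of the aggregation in addBatch() matching the stated requirement:
// data.groupBy().agg(avg("age") as "avgAge").foreach(v => println(s"result = $v"))

// Start the streaming query defined above and block until it terminates.
query.start().awaitTermination()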