Spark Structured Streaming aggregation without timestamp on data (aggregation based on trigger)


I need to perform an aggregation on incoming data based on the Spark driver's timestamp, without a watermark. The data doesn't have a timestamp field.

The requirement is: compute the average of the data received every second (it doesn't matter when it was sent).

In other words, I need an aggregation over the data received in each trigger interval, like in the previous RDD-based streaming API (DStreams).

Is there a way to do this?

You can create your own Sink and perform the operation on each addBatch() call:

    class CustomSink extends Sink {
      override def addBatch(batchId: Long, data: DataFrame): Unit = {
        // Aggregate over exactly the rows received in this micro-batch
        data.groupBy().agg(sum("age") as "sumAge").foreach(v => println(s"result=$v"))
      }
    }

    class CustomSinkProvider extends StreamSinkProvider with DataSourceRegister {
      def createSink(
                      sqlContext: SQLContext,
                      parameters: Map[String, String],
                      partitionColumns: Seq[String],
                      outputMode: OutputMode): Sink = {
        new CustomSink()
      }

      def shortName(): String = "person"
    }

with outputMode set to Update and a trigger every X seconds:

    val query = ds.writeStream
      .trigger(Trigger.ProcessingTime("1 second"))
      .outputMode(OutputMode.Update())
      .format("exactlyonce.CustomSinkProvider")
      .start()
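As a simpler alternative, a sketch assuming Spark 2.4+ (Spark 3.x for the Scala lambda form): `foreachBatch` hands you each micro-batch as a DataFrame, so the per-trigger average can be computed without writing a custom Sink. The built-in `rate` source and the `avg("value")` column are illustrative stand-ins for the real input stream:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.avg
import org.apache.spark.sql.streaming.Trigger

object PerTriggerAverage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("per-trigger-average")
      .master("local[*]")
      .getOrCreate()

    // Illustrative source: generates rows with a "value" column at a fixed rate.
    val ds = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .load()

    val query = ds.writeStream
      .trigger(Trigger.ProcessingTime("1 second"))
      .foreachBatch { (batch: DataFrame, batchId: Long) =>
        // Average over exactly the rows received in this trigger interval
        batch.agg(avg("value") as "avgValue").show()
      }
      .start()

    query.awaitTermination()
  }
}
```

Like the custom Sink above, each invocation sees only the rows that arrived since the previous trigger, which matches the DStream-style per-batch semantics the question asks for.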
