apache spark - How to apply a windowing function in PySpark over grouped data which needs an aggregation within an aggregation?


I have a complicated windowing operation I need to do in PySpark.

I have data grouped by src and dest, and I need to do the following operations for each group:

- select only the rows whose socket2 value does not appear in socket1 (for any row in the group)
- after applying this filtering criterion, sum the values in the amounts field

    amounts  src  dest  socket1  socket2
    10       1    2     a        b
    11       1    2     b        c
    12       1    2     c        d
    510      1    2     c        d
    550      1    2     b        c
    500      1    2     a        b
    80       1    3     a        b

and I want to aggregate it in the following way:
12 + 510 = 522 for the group src=1, dest=2, and 80 for the single record with src=1, dest=3

    amounts  src  dest
    522      1    2
    80       1    3
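The filtering rule can be sanity-checked in plain Python before reaching for Spark. This sketch just re-implements the two steps above (per-group socket1 lookup, then filter-and-sum) over the sample rows:

```python
from collections import defaultdict

# The sample rows: (amounts, src, dest, socket1, socket2)
rows = [
    (10, 1, 2, "a", "b"),
    (11, 1, 2, "b", "c"),
    (12, 1, 2, "c", "d"),
    (510, 1, 2, "c", "d"),
    (550, 1, 2, "b", "c"),
    (500, 1, 2, "a", "b"),
    (80, 1, 3, "a", "b"),
]

# Collect the socket1 values seen in each (src, dest) group
socket1_by_group = defaultdict(set)
for amounts, src, dest, s1, s2 in rows:
    socket1_by_group[(src, dest)].add(s1)

# Keep only rows whose socket2 never appears as a socket1 in the
# same group, then sum amounts per group
totals = defaultdict(int)
for amounts, src, dest, s1, s2 in rows:
    if s2 not in socket1_by_group[(src, dest)]:
        totals[(src, dest)] += amounts

print(dict(totals))  # {(1, 2): 522, (1, 3): 80}
```

For group (1, 2) the socket1 values are {a, b, c}, so only the two socket2="d" rows survive (12 + 510 = 522); for group (1, 3) the lone row survives because "b" is not among its socket1 values.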

I borrowed the sample data from here: How to write a PySpark UDAF on multiple columns?

You can split the dataframe into 2 dataframes, one for socket1 and the other for socket2, and use a leftanti join instead of filtering (this works for Spark >= 2.0).

First, let's create the dataframe:

    df = spark.createDataFrame(
        sc.parallelize([
            [10, 1, 2, "a", "b"],
            [11, 1, 2, "b", "c"],
            [12, 1, 2, "c", "d"],
            [510, 1, 2, "c", "d"],
            [550, 1, 2, "b", "c"],
            [500, 1, 2, "a", "b"],
            [80, 1, 3, "a", "b"]
        ]),
        ["amounts", "src", "dest", "socket1", "socket2"]
    )

Now let's split the dataframe:

spark >= 2.0

    df1 = df.withColumnRenamed("socket1", "socket").drop("socket2")
    df2 = df.withColumnRenamed("socket2", "socket").drop("socket1")
    res = df2.join(df1, ["src", "dest", "socket"], "leftanti")

spark 1.6

    df1 = df.withColumnRenamed("socket1", "socket").drop("socket2").withColumnRenamed("amounts", "amounts1")
    df2 = df.withColumnRenamed("socket2", "socket").drop("socket1")
    res = df2.join(df1.alias("df1"), ["src", "dest", "socket"], "left").filter("amounts1 IS NULL").drop("amounts1")
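Either way, the join keeps only the df2 rows whose (src, dest, socket) key has no match in df1; the Spark 1.6 variant gets there by left-joining and discarding matched rows via the IS NULL filter. A plain-Python stand-in for that anti-join, using the same sample data (df2 carries socket2 renamed to socket; df1 contributes only its join keys):

```python
# df2 rows after the rename: (src, dest, socket, amounts)
df2_rows = [
    (1, 2, "b", 10), (1, 2, "c", 11), (1, 2, "d", 12),
    (1, 2, "d", 510), (1, 2, "c", 550), (1, 2, "b", 500),
    (1, 3, "b", 80),
]
# Distinct (src, dest, socket) keys present in df1 (socket1 side)
df1_keys = {(1, 2, "a"), (1, 2, "b"), (1, 2, "c"), (1, 3, "a")}

# "leftanti" semantics: keep df2 rows with no matching key in df1
res = [r for r in df2_rows if (r[0], r[1], r[2]) not in df1_keys]
print(res)  # [(1, 2, 'd', 12), (1, 2, 'd', 510), (1, 3, 'b', 80)]
```

Summing amounts over the surviving rows per (src, dest) group gives exactly the expected 522 and 80.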

And finally the aggregation:

    import pyspark.sql.functions as psf
    res.groupBy("src", "dest").agg(
        psf.sum("amounts").alias("amounts")
    ).show()

    +---+----+-------+
    |src|dest|amounts|
    +---+----+-------+
    |  1|   3|     80|
    |  1|   2|    522|
    +---+----+-------+
