How to apply a windowing function in PySpark over grouped data which needs an aggregation within an aggregation?
I have a complicated windowing operation I need in PySpark.
I have data grouped by src and dest, and I need to do the following operations for each group:

- select the rows whose amounts in socket2 do not appear in socket1 (for any row in the group)
- after applying this filtering criterion, sum the amounts field
amounts  src  dest  socket1  socket2
10       1    2     a        b
11       1    2     b        c
12       1    2     c        d
510      1    2     c        d
550      1    2     b        c
500      1    2     a        b
80       1    3     a        b

and I want to aggregate it in the following way:
12 + 510 = 522 for the group with src=1 and dest=2, and 80 for the record with src=1 and dest=3
amounts  src  dest
522      1    2
80       1    3

I borrowed the sample data from here: How to write Pyspark UDAF on multiple columns?
You can split the dataframe into two dataframes, one for socket1 and the other for socket2, and use a leftanti join instead of filtering (works for Spark >= 2.0): a leftanti join keeps only the rows of the left dataframe that have no match in the right one.
First, let's create the dataframe:
    df = spark.createDataFrame(
        sc.parallelize([
            [10, 1, 2, "a", "b"],
            [11, 1, 2, "b", "c"],
            [12, 1, 2, "c", "d"],
            [510, 1, 2, "c", "d"],
            [550, 1, 2, "b", "c"],
            [500, 1, 2, "a", "b"],
            [80, 1, 3, "a", "b"]
        ]),
        ["amounts", "src", "dest", "socket1", "socket2"]
    )

Now let's split it into two dataframes:
Spark >= 2.0
df1 = df.withcolumnrenamed("socket1", "socket").drop("socket2") df2 = df.withcolumnrenamed("socket2", "socket").drop("socket1") res = df2.join(df1, ["src", "dest", "socket"], "leftanti") spark 1.6
df1 = df.withcolumnrenamed("socket1", "socket").drop("socket2").withcolumnrenamed("amounts", "amounts1") df2 = df.withcolumnrenamed("socket2", "socket").drop("socket1") res = df2.join(df1.alias("df1"), ["src", "dest", "socket"], "left").filter("amounts1 null").drop("amounts1") and aggregation:
And finally, the aggregation:

    import pyspark.sql.functions as psf
    res.groupBy("src", "dest").agg(
        psf.sum("amounts").alias("amounts")
    ).show()

    +---+----+-------+
    |src|dest|amounts|
    +---+----+-------+
    |  1|   3|     80|
    |  1|   2|    522|
    +---+----+-------+
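Since the title asks about window functions: as an alternative to the self-join, the socket1 values of each group can be collected with collect_set over a window and the filter expressed with array_contains. A minimal sketch of this variant (assuming Spark >= 2.0, where collect_set works as a window function; res2 and socket1_set are names introduced here for illustration):

    from pyspark.sql import Window
    from pyspark.sql import functions as F

    # collect every socket1 value seen within the (src, dest) group
    w = Window.partitionBy("src", "dest")

    res2 = (df
        .withColumn("socket1_set", F.collect_set("socket1").over(w))
        # keep rows whose socket2 never occurs as a socket1 in the group
        .filter(~F.expr("array_contains(socket1_set, socket2)"))
        .groupBy("src", "dest")
        .agg(F.sum("amounts").alias("amounts")))

    res2.show()

This avoids materializing two dataframes and a join, at the cost of building a set of socket1 values per group.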