How to apply a windowing function in PySpark over grouped data which needs an aggregation within an aggregation?
I have a complicated windowing operation I need in PySpark. My data is grouped by src and dest, and I need to do the following operations for each group:

- select the rows whose socket2 value does not appear as a socket1 value (across all rows of the group)
- after applying that filtering criterion, sum the amounts field
    amounts  src  dest  socket1  socket2
    10       1    2     a        b
    11       1    2     b        c
    12       1    2     c        d
    510      1    2     c        d
    550      1    2     b        c
    500      1    2     a        b
    80       1    3     a        b
I want to aggregate it in the following way: 12 + 510 = 522 for the group with src=1 and dest=2, and 80 for the single record with src=1 and dest=3:

    amounts  src  dest
    522      1    2
    80       1    3
I borrowed the sample data from here: How to write a PySpark UDAF on multiple columns?
You can split your dataframe into two dataframes, one with socket1 and the other with socket2, and use a leftanti join instead of filtering (works for Spark >= 2.0).
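For context: a leftanti join keeps exactly the rows of the left dataframe that have no match in the right dataframe on the join keys. Once both socket columns are renamed to a common socket column, "this row's socket2 value never appears as a socket1 within the same (src, dest) group" becomes precisely that anti-join condition.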
First let's create the dataframe:

    df = spark.createDataFrame(
        sc.parallelize([
            [10, 1, 2, "a", "b"],
            [11, 1, 2, "b", "c"],
            [12, 1, 2, "c", "d"],
            [510, 1, 2, "c", "d"],
            [550, 1, 2, "b", "c"],
            [500, 1, 2, "a", "b"],
            [80, 1, 3, "a", "b"]
        ]),
        ["amounts", "src", "dest", "socket1", "socket2"]
    )
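As a side note, the sc.parallelize call is optional here: spark.createDataFrame also accepts a plain Python list of rows together with the column names, so this sketch should build the same dataframe:

    rows = [[10, 1, 2, "a", "b"], [11, 1, 2, "b", "c"], [12, 1, 2, "c", "d"],
            [510, 1, 2, "c", "d"], [550, 1, 2, "b", "c"], [500, 1, 2, "a", "b"],
            [80, 1, 3, "a", "b"]]
    df = spark.createDataFrame(rows, ["amounts", "src", "dest", "socket1", "socket2"])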
and split the dataframe:

Spark >= 2.0

    df1 = df.withColumnRenamed("socket1", "socket").drop("socket2")
    df2 = df.withColumnRenamed("socket2", "socket").drop("socket1")
    # keep only the rows of df2 whose (src, dest, socket) has no match in df1
    res = df2.join(df1, ["src", "dest", "socket"], "leftanti")
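If I'm reading the sample data right, res should now hold only the three surviving rows, something like this (row order may vary):

    res.show()
    +---+----+------+-------+
    |src|dest|socket|amounts|
    +---+----+------+-------+
    |  1|   2|     d|     12|
    |  1|   2|     d|    510|
    |  1|   3|     b|     80|
    +---+----+------+-------+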
Spark 1.6

    df1 = df.withColumnRenamed("socket1", "socket").drop("socket2").withColumnRenamed("amounts", "amounts1")
    df2 = df.withColumnRenamed("socket2", "socket").drop("socket1")
    # rows of df2 with no match in df1 end up with a null amounts1 after the left join
    res = df2.join(df1.alias("df1"), ["src", "dest", "socket"], "left").filter("amounts1 IS NULL").drop("amounts1")
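Since Spark 1.6 has no leftanti join type, the left join plus IS NULL filter emulates it: amounts is renamed to amounts1 on the df1 side both to avoid a name clash with df2's amounts column and to give an unambiguous column to test for null. One subtlety worth noting: when a df2 row matches several df1 rows, the left join duplicates it, but those duplicates all carry a non-null amounts1 and are discarded by the filter, while the unmatched rows we keep appear exactly once, so the final sums are unaffected.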
and the aggregation:

    import pyspark.sql.functions as psf
    res.groupBy("src", "dest").agg(
        psf.sum("amounts").alias("amounts")
    ).show()

    +---+----+-------+
    |src|dest|amounts|
    +---+----+-------+
    |  1|   3|     80|
    |  1|   2|    522|
    +---+----+-------+
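Since the title asks about window functions, here is a minimal sketch of an alternative that avoids the self-join, assuming Spark >= 2.0 where collect_set can be used over a window. The idea is to collect the set of socket1 values per (src, dest) group, drop the rows whose socket2 is in that set, and then aggregate; w, res2 and socket1_set are names I made up for the sketch:

    import pyspark.sql.functions as psf
    from pyspark.sql import Window

    w = Window.partitionBy("src", "dest")
    res2 = (
        df
        # the set of all socket1 values seen in this (src, dest) group
        .withColumn("socket1_set", psf.collect_set("socket1").over(w))
        # keep rows whose socket2 never occurs as a socket1 in the group
        .filter(psf.expr("NOT array_contains(socket1_set, socket2)"))
        .groupBy("src", "dest")
        .agg(psf.sum("amounts").alias("amounts"))
    )
    res2.show()

This should produce the same two rows as the join-based version, though I haven't compared the two approaches for performance.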