python - Add a column to a dataframe whose value is based on another field, but needs to increment
I have a problem I've been wrestling with, and I imagine there is a more elegant solution than looping through the dataframe row by row. I have a dataframe like the following:
eventtime           | conditionmet
------------------- | ------------
2017-09-11 00:00:01 | 0
2017-09-11 00:00:02 | 0
2017-09-11 00:00:03 | 0
2017-09-11 00:00:04 | 1
2017-09-11 00:00:05 | 1
2017-09-11 00:00:06 | 1
2017-09-11 00:00:07 | 0
2017-09-11 00:00:08 | 0
2017-09-11 00:00:09 | 1
2017-09-11 00:00:10 | 1
2017-09-11 00:00:11 | 1
2017-09-11 00:00:12 | 1
2017-09-11 00:00:13 | 0
Each time the condition is met (conditionmet = 1), I want to label those records with an event name (e.g. event1, event2, ...). I can't find an elegant way to do this with .withColumn() using a when condition or windowing. The ideal result would be:
eventtime           | conditionmet | eventname
------------------- | ------------ | ---------
2017-09-11 00:00:01 | 0            |
2017-09-11 00:00:02 | 0            |
2017-09-11 00:00:03 | 0            |
2017-09-11 00:00:04 | 1            | event1
2017-09-11 00:00:05 | 1            | event1
2017-09-11 00:00:06 | 1            | event1
2017-09-11 00:00:07 | 0            |
2017-09-11 00:00:08 | 0            |
2017-09-11 00:00:09 | 1            | event2
2017-09-11 00:00:10 | 1            | event2
2017-09-11 00:00:11 | 1            | event2
2017-09-11 00:00:12 | 1            | event2
2017-09-11 00:00:13 | 0            |
I'd be interested in any clever approaches here.
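For reference, a minimal sketch that builds the sample dataframe above; it assumes an active SparkSession named spark (e.g. a pyspark shell) and keeps eventtime as a string, which sorts correctly in this zero-padded format:

# hypothetical sample data matching the table above, assuming an active SparkSession `spark`
rows = [
    ("2017-09-11 00:00:01", 0), ("2017-09-11 00:00:02", 0),
    ("2017-09-11 00:00:03", 0), ("2017-09-11 00:00:04", 1),
    ("2017-09-11 00:00:05", 1), ("2017-09-11 00:00:06", 1),
    ("2017-09-11 00:00:07", 0), ("2017-09-11 00:00:08", 0),
    ("2017-09-11 00:00:09", 1), ("2017-09-11 00:00:10", 1),
    ("2017-09-11 00:00:11", 1), ("2017-09-11 00:00:12", 1),
    ("2017-09-11 00:00:13", 0),
]
df = spark.createDataFrame(rows, ["eventtime", "conditionmet"])
df.show(truncate=False)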
I think the idea is to tag each sequential cluster with a unique tag, which you can do by calculating a cumulative sum:

- sort the df, on eventtime for example
- invert conditionmet
- calculate the cumulative sum of the inverted column
- ignore the cumulative sum where conditionmet = 0, and take the cumulative sum as the tag of the cluster where conditionmet = 1

+-------------------+------------+-------+-------------+---------+
|          eventtime|conditionmet|invcond|cumulativesum|eventname|
+-------------------+------------+-------+-------------+---------+
|2017-09-11 00:00:01|           0|      1|            1|         |
|2017-09-11 00:00:02|           0|      1|            2|         |
|2017-09-11 00:00:03|           0|      1|            3|         |
|2017-09-11 00:00:04|           1|      0|            3|   event3|
|2017-09-11 00:00:05|           1|      0|            3|   event3|
|2017-09-11 00:00:06|           1|      0|            3|   event3|
|2017-09-11 00:00:07|           0|      1|            4|         |
|2017-09-11 00:00:08|           0|      1|            5|         |
|2017-09-11 00:00:09|           1|      0|            5|   event5|
|2017-09-11 00:00:10|           1|      0|            5|   event5|
|2017-09-11 00:00:11|           1|      0|            5|   event5|
|2017-09-11 00:00:12|           1|      0|            5|   event5|
|2017-09-11 00:00:13|           0|      1|            6|         |
+-------------------+------------+-------+-------------+---------+
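Before the full RDD-based implementation below, here is a minimal sketch of the same cumulative-sum idea using only the DataFrame API with an unpartitioned window. This is not the original code of the answer, just an illustration; it assumes the question's dataframe df (conditionmet stored as string or integer), and the single window funnels every row through one partition, which is fine for small data but does not scale:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# single ordered window: running totals over the whole dataframe
w = Window.orderBy("eventtime")

tagged = (
    df
    # invert the condition: 1 -> 0, 0 -> 1
    .withColumn("invcond", 1 - F.col("conditionmet").cast("int"))
    # cumulative sum of the inverted column yields one id per cluster
    .withColumn("cumulativesum", F.sum("invcond").over(w))
    # keep the tag only where the condition is met
    .withColumn(
        "eventname",
        F.when(F.col("conditionmet").cast("int") == 1,
               F.concat(F.lit("event"), F.col("cumulativesum").cast("string")))
         .otherwise(F.lit("")),
    )
)
tagged.show(truncate=False)

This reproduces the cumulativesum and eventname columns in the table above (event3, event5): the tags are unique per cluster even though they are not consecutive.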
Code:
from pyspark.sql.functions import udf, col
from pyspark.sql import Row
from pyspark.sql.types import StringType, IntegerType

def tagSequentialClusters(df, condColumn, tagColumnName):
    ## invert the condition: 0 -> 1 and 1 -> 0
    def invCond(value):
        if value == '1':
            return 0
        else:
            return 1

    ## build the event name for valid clusters, empty string otherwise
    def mapEventNumber(cond, number):
        if cond == "1":
            return "event" + str(number)
        else:
            return ""

    ## return a copy of a row with extra columns added
    def addRowColumn(row, **kwargs):
        rowData = row.asDict()
        for column in kwargs:
            rowData[column] = kwargs[column]
        return Row(**rowData)

    ## calculate the partial cumulative sum within a partition
    def calcPartialCumulativeSum(iter):
        counter = 0
        final_iterator = []
        for row in iter:
            counter = counter + row["invcond"]
            newRow = addRowColumn(row, partialsum=counter)
            final_iterator.append(newRow)
        return final_iterator

    ## emit (partition index, last partial sum of the partition)
    def getTailWithIndex(index, iter):
        tailSum = 0
        for row in iter:
            tailSum = row["partialsum"]
        return [(index, tailSum)]

    ## map each partition index to the sum of the tails of all earlier partitions
    def calcSumMap(collectedMap):
        offsets = {}
        runningTotal = 0
        for index, tailSum in sorted(collectedMap):
            offsets[index] = runningTotal
            runningTotal += tailSum
        return offsets

    ## turn the partial sums into a global cumulative sum using the broadcast offsets
    def calcCumulativeSum(index, iter):
        final_iterator = []
        for row in iter:
            newVal = row["partialsum"] + sumMap.value[index]
            final_iterator.append(addRowColumn(row, eventnumber=newVal))
        return final_iterator

    ## register udf functions
    invCondUdf = udf(invCond, IntegerType())
    mapEventNumberUdf = udf(mapEventNumber, StringType())

    ## invert the condition column
    rdd = df.withColumn("invcond", invCondUdf(col(condColumn))).rdd

    ## calculate the partial cumulative sum on each partition
    rdd = rdd.mapPartitions(lambda iter: calcPartialCumulativeSum(iter)).cache()

    ## collect the last partial sum of each partition and broadcast the offsets
    collectedMap = rdd.mapPartitionsWithIndex(getTailWithIndex).collect()
    sumMap = spark.sparkContext.broadcast(calcSumMap(collectedMap))

    ## calculate the global cumulative sum
    df = rdd.mapPartitionsWithIndex(calcCumulativeSum).toDF()

    ## prepend "event" to each cluster number and blank out the rest
    df = df.withColumn(tagColumnName, mapEventNumberUdf(col(condColumn), col("eventnumber")))
    return df.drop("eventnumber").drop("invcond").drop("partialsum")

## read the data
df = spark.read.csv("/home/eslam-elbanna/data.csv", header=True)

## tag sequential clusters
df = tagSequentialClusters(df, "conditionmet", "eventname")
df.show()
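If strictly sequential names (event1, event2, ...) are needed, as in the question's ideal result, the unique cluster ids can be re-ranked afterwards. A sketch, not part of the answer above, assuming the tagged dataframe (with its cumulativesum and conditionmet columns) produced by the window-based variant earlier:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# one row per cluster id, numbered 1..n in order of appearance
clusters = (
    tagged.filter(F.col("conditionmet").cast("int") == 1)
          .select("cumulativesum").distinct()
          .withColumn("eventnumber",
                      F.row_number().over(Window.orderBy("cumulativesum")))
)

# join the sequential number back and rebuild the label
renamed = (
    tagged.join(clusters, on="cumulativesum", how="left")
          .withColumn(
              "eventname",
              F.when(F.col("conditionmet").cast("int") == 1,
                     F.concat(F.lit("event"), F.col("eventnumber").cast("string")))
               .otherwise(F.lit("")))
          .drop("eventnumber")
          .orderBy("eventtime")
)
renamed.show(truncate=False)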