python - Add a column to a dataframe whose value is based on another field, but needs to increment


I have a problem I've been wrestling with, and I have to imagine there is a more elegant solution than looping through the dataframe row by row. I have a dataframe like the following:

    eventTime           | conditionMet
    ------------------- | ------------
    2017-09-11 00:00:01 | 0
    2017-09-11 00:00:02 | 0
    2017-09-11 00:00:03 | 0
    2017-09-11 00:00:04 | 1
    2017-09-11 00:00:05 | 1
    2017-09-11 00:00:06 | 1
    2017-09-11 00:00:07 | 0
    2017-09-11 00:00:08 | 0
    2017-09-11 00:00:09 | 1
    2017-09-11 00:00:10 | 1
    2017-09-11 00:00:11 | 1
    2017-09-11 00:00:12 | 1
    2017-09-11 00:00:13 | 0

Each time the condition is met (conditionMet=1), I want to label those records with an event name (e.g. event1). I can't find an elegant way to do this with .withColumn() using a when condition or with windowing. The ideal result would be:

    eventTime           | conditionMet | eventName
    ------------------- | ------------ | ---------
    2017-09-11 00:00:01 | 0            |
    2017-09-11 00:00:02 | 0            |
    2017-09-11 00:00:03 | 0            |
    2017-09-11 00:00:04 | 1            | event1
    2017-09-11 00:00:05 | 1            | event1
    2017-09-11 00:00:06 | 1            | event1
    2017-09-11 00:00:07 | 0            |
    2017-09-11 00:00:08 | 0            |
    2017-09-11 00:00:09 | 1            | event2
    2017-09-11 00:00:10 | 1            | event2
    2017-09-11 00:00:11 | 1            | event2
    2017-09-11 00:00:12 | 1            | event2
    2017-09-11 00:00:13 | 0            |

Interested in any clever approaches here.
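For reference, a minimal sketch that builds this sample dataframe (assuming a running SparkSession named spark; eventTime is kept as a plain string here):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Sample rows mirroring the table above
rows = [
    ("2017-09-11 00:00:01", 0), ("2017-09-11 00:00:02", 0), ("2017-09-11 00:00:03", 0),
    ("2017-09-11 00:00:04", 1), ("2017-09-11 00:00:05", 1), ("2017-09-11 00:00:06", 1),
    ("2017-09-11 00:00:07", 0), ("2017-09-11 00:00:08", 0), ("2017-09-11 00:00:09", 1),
    ("2017-09-11 00:00:10", 1), ("2017-09-11 00:00:11", 1), ("2017-09-11 00:00:12", 1),
    ("2017-09-11 00:00:13", 0),
]
df = spark.createDataFrame(rows, ["eventTime", "conditionMet"])
df.show(truncate=False)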

I think the idea is to tag each sequential cluster with a unique tag, which can be done by calculating a cumulative sum:

  1. Sort the dataframe, for example on eventTime
  2. Invert conditionMet (0 becomes 1 and 1 becomes 0)
  3. Calculate the cumulative sum of the inverted column
  4. Ignore the cumulative sum where conditionMet = 0, and take the cumulative sum as the tag of the cluster where conditionMet = 1

    +-------------------+------------+-------+-------------+---------+
    |          eventTime|conditionMet|invCond|cumulativeSum|eventName|
    +-------------------+------------+-------+-------------+---------+
    |2017-09-11 00:00:01|           0|      1|            1|         |
    |2017-09-11 00:00:02|           0|      1|            2|         |
    |2017-09-11 00:00:03|           0|      1|            3|         |
    |2017-09-11 00:00:04|           1|      0|            3|   event3|
    |2017-09-11 00:00:05|           1|      0|            3|   event3|
    |2017-09-11 00:00:06|           1|      0|            3|   event3|
    |2017-09-11 00:00:07|           0|      1|            4|         |
    |2017-09-11 00:00:08|           0|      1|            5|         |
    |2017-09-11 00:00:09|           1|      0|            5|   event5|
    |2017-09-11 00:00:10|           1|      0|            5|   event5|
    |2017-09-11 00:00:11|           1|      0|            5|   event5|
    |2017-09-11 00:00:12|           1|      0|            5|   event5|
    |2017-09-11 00:00:13|           0|      1|            6|         |
    +-------------------+------------+-------+-------------+---------+

Code

from pyspark.sql.functions import udf, col
from pyspark.sql import Row
from pyspark.sql.types import StringType, IntegerType


def tagSequentialClusters(df, condColumn, tagColumnName):
    ## Invert the condition: 0 -> 1 and 1 -> 0
    def invCond(value):
        if str(value) == '1':
            return 0
        else:
            return 1

    ## Build the event name for rows inside a valid cluster
    def mapEventNumber(cond, number):
        if str(cond) == '1':
            return "event" + str(number)
        else:
            return ""

    ## Return a copy of the row with the given columns added
    def addRowColumn(row, **kwargs):
        rowData = row.asDict()
        for column in kwargs:
            rowData[column] = kwargs[column]
        return Row(**rowData)

    ## Calculate the partial cumulative sum within one partition
    def calcPartialCumulativeSum(iterator):
        counter = 0
        finalIterator = []
        for row in iterator:
            counter = counter + row["invCond"]
            newRow = addRowColumn(row, partialSum=counter)
            finalIterator.append(newRow)
        return finalIterator

    ## Emit (partition index, last partial sum of that partition)
    def getTailWithIndex(index, iterator):
        tailRow = None
        for row in iterator:
            tailRow = row
        ## An empty partition contributes 0 to the running total
        tailSum = tailRow["partialSum"] if tailRow is not None else 0
        return [(index, tailSum)]

    ## Map each partition index to the sum of the tails of all previous partitions
    def calcSumMap(collectedMap):
        sumPerPartition = {}
        for index, tailSum in collectedMap:
            sumPerPartition[index] = sum(t for i, t in collectedMap if i < index)
        return sumPerPartition

    ## Turn the partial sums into a global cumulative sum using the broadcast offsets
    def calcCumulativeSum(index, iterator):
        finalIterator = []
        for row in iterator:
            newVal = row["partialSum"] + sumMap.value[index]
            finalIterator.append(addRowColumn(row, eventNumber=newVal))
        return finalIterator

    ## Register the UDFs
    invCondUdf = udf(invCond, IntegerType())
    mapEventNumberUdf = udf(mapEventNumber, StringType())

    ## Invert the condition column
    rdd = df.withColumn("invCond", invCondUdf(col(condColumn))).rdd

    ## Calculate the partial cumulative sum on each partition
    rdd = rdd.mapPartitions(calcPartialCumulativeSum).cache()

    ## Collect the tail sum of each partition and broadcast the per-partition offsets
    collectedMap = rdd.mapPartitionsWithIndex(getTailWithIndex).collect()
    sumMap = spark.sparkContext.broadcast(calcSumMap(collectedMap))

    ## Calculate the global cumulative sum
    df = rdd.mapPartitionsWithIndex(calcCumulativeSum).toDF()

    ## Prepend "event" to each cluster number and leave the rest empty
    df = df.withColumn(tagColumnName, mapEventNumberUdf(col(condColumn), col("eventNumber")))

    return df.drop("eventNumber").drop("invCond").drop("partialSum")


## Read the data
df = spark.read.csv("/home/eslam-elbanna/data.csv", header=True)

## Tag sequential clusters
df = tagSequentialClusters(df, "conditionMet", "eventName")
df.show()
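For comparison, the same inverted-condition cumulative sum can probably also be expressed with Spark SQL window functions instead of RDD mapPartitions. A minimal sketch, assuming Spark 2.x and that df is the original dataframe (before tagging), ordered by eventTime:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Running sum of the inverted condition, ordered by eventTime
w = Window.orderBy("eventTime").rowsBetween(Window.unboundedPreceding, Window.currentRow)

tagged = (
    df.withColumn("invCond", 1 - F.col("conditionMet").cast("int"))
      .withColumn("cumulativeSum", F.sum("invCond").over(w))
      .withColumn(
          "eventName",
          F.when(
              F.col("conditionMet").cast("int") == 1,
              F.concat(F.lit("event"), F.col("cumulativeSum").cast("string"))
          ).otherwise(F.lit(""))
      )
      .drop("invCond", "cumulativeSum")
)
tagged.show(truncate=False)

This produces the same non-consecutive but unique tags (event3, event5, ...) as the table above. Note that an unpartitioned window pulls all rows into a single partition, which is presumably why the code above computes the cumulative sum partition by partition instead.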
