Creating a custom counter in Spark based on dataframe conditions
Posted By: Anonymous
Current Dataset
+---+-----+-----+-----+----+
| ID|Event|Index|start| end|
+---+-----+-----+-----+----+
| 1| run| 0|start|null|
| 1| run| 1| null|null|
| 1| run| 2| null|null|
| 1| swim| 3| null| end|
| 1| run| 4|start|null|
| 1| swim| 5| null|null|
| 1| swim| 6| null| end|
| 1| run| 7|start|null|
| 1| run| 8| null|null|
| 1| run| 9| null|null|
| 1| swim| 10| null| end|
| 1| run| 11|start|null|
| 1| run| 12| null|null|
| 1| run| 13| null| end|
| 2| run| 14|start|null|
| 2| run| 15| null|null|
| 2| run| 16| null|null|
| 2| swim| 17| null| end|
| 2| run| 18|start|null|
| 2| swim| 19| null|null|
| 2| swim| 20| null|null|
| 2| swim| 21| null|null|
| 2| swim| 22| null| end|
| 2| run| 23|start|null|
| 2| run| 24| null|null|
| 2| run| 25| null| end|
| 3| run| 26|start|null|
| 3| run| 27| null|null|
| 3| swim| 28| null|null|
+---+-----+-----+-----+----+
Dataset I’m After
+---+-----+-----+-----+----+-------+
| ID|Event|Index|start| end|EventID|
+---+-----+-----+-----+----+-------+
| 1| run| 0|start|null| 1|
| 1| run| 1| null|null| 1|
| 1| run| 2| null|null| 1|
| 1| swim| 3| null| end| 1|
| 1| run| 4|start|null| 2|
| 1| swim| 5| null|null| 2|
| 1| swim| 6| null| end| 2|
| 1| run| 7|start|null| 3|
| 1| run| 8| null|null| 3|
| 1| run| 9| null|null| 3|
| 1| swim| 10| null| end| 3|
| 1| run| 11|start|null| 4|
| 1| run| 12| null|null| 4|
| 1| run| 13| null| end| 4|
| 2| run| 14|start|null| 1|
| 2| run| 15| null|null| 1|
| 2| run| 16| null|null| 1|
| 2| swim| 17| null| end| 1|
| 2| run| 18|start|null| 2|
| 2| swim| 19| null|null| 2|
| 2| swim| 20| null|null| 2|
| 2| swim| 21| null|null| 2|
| 2| swim| 22| null| end| 2|
| 2| run| 23|start|null| 3|
| 2| run| 24| null|null| 3|
| 2| run| 25| null| end| 3|
| 3| run| 26|start|null| 1|
| 3| run| 27| null|null| 1|
| 3| swim| 28| null|null| 1|
+---+-----+-----+-----+----+-------+
I am trying to create the above EventID Column. Is there a way to create a counter inside of a udf that updates based on column conditions? Note, I’m not sure if a UDF is the best approach here.
Here is my current thinking-logic:
- When a "start" value is seen, start counting.
- When an "end" value is seen, end counting
- Every time a new ID is seen, reset the counter to 1
Thank you all for any assistance.
Here is the raw code to produce the current dataframe:
# Current Dataset
data = [
(1, "run", 0, 'start', None),
(1, "run", 1, None, None),
(1, "run", 2, None, None),
(1, "swim", 3, None, 'end'),
(1, "run", 4, 'start',None),
(1, "swim", 5, None, None),
(1, "swim", 6, None, 'end'),
(1, "run",7, 'start', None),
(1, "run",8, None, None),
(1, "run",9, None, None),
(1, "swim",10, None, 'end'),
(1, "run",11, 'start', None),
(1, "run",12, None, None),
(1, "run",13, None, 'end'),
(2, "run",14, 'start', None),
(2, "run",15, None, None),
(2, "run",16, None, None),
(2, "swim",17, None, 'end'),
(2, "run",18, 'start', None),
(2, "swim",19, None, None),
(2, "swim",20, None, None),
(2, "swim",21, None, None),
(2, "swim",22, None, 'end'),
(2, "run",23, 'start', None),
(2, "run",24, None, None),
(2, "run",25, None, 'end'),
(3, "run",26, 'start', None),
(3, "run",27, None, None),
(3, "swim",28, None, None)
]
schema = StructType([
StructField('ID', IntegerType(),True),
StructField('Event', StringType(),True),
StructField('Index', IntegerType(),True),
StructField('start', StringType(),True),
StructField('end', StringType(),True)
])
df = spark.createDataFrame(data=data, schema=schema)
df.show(30)
Solution
You can use a window function:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
w = Window.partitionBy('ID').rowsBetween(Window.unboundedPreceding,0).orderBy('index')
df.withColumn('EventId', F.sum(F.when(F.col('start') == 'start', 1).otherwise(0))
.over(w)).orderBy('ID', 'Index').show(100)
results in
+---+-----+-----+-----+----+-------+
| ID|Event|Index|start| end|EventId|
+---+-----+-----+-----+----+-------+
| 1| run| 0|start|null| 1|
| 1| run| 1| null|null| 1|
| 1| run| 2| null|null| 1|
| 1| swim| 3| null| end| 1|
| 1| run| 4|start|null| 2|
| 1| swim| 5| null|null| 2|
| 1| swim| 6| null| end| 2|
| 1| run| 7|start|null| 3|
| 1| run| 8| null|null| 3|
| 1| run| 9| null|null| 3|
| 1| swim| 10| null| end| 3|
| 1| run| 11|start|null| 4|
| 1| run| 12| null|null| 4|
| 1| run| 13| null| end| 4|
| 2| run| 14|start|null| 1|
| 2| run| 15| null|null| 1|
| 2| run| 16| null|null| 1|
| 2| swim| 17| null| end| 1|
| 2| run| 18|start|null| 2|
| 2| swim| 19| null|null| 2|
| 2| swim| 20| null|null| 2|
| 2| swim| 21| null|null| 2|
| 2| swim| 22| null| end| 2|
| 2| run| 23|start|null| 3|
| 2| run| 24| null|null| 3|
| 2| run| 25| null| end| 3|
| 3| run| 26|start|null| 1|
| 3| run| 27| null|null| 1|
| 3| swim| 28| null|null| 1|
+---+-----+-----+-----+----+-------+
Answered By: Anonymous
Disclaimer: This content is shared under creative common license cc-by-sa 3.0. It is generated from StackExchange Website Network.