
Creating a custom counter in Spark based on dataframe conditions

Posted By: Anonymous

Current Dataset

+---+-----+-----+-----+----+
| ID|Event|Index|start| end|
+---+-----+-----+-----+----+
|  1|  run|    0|start|null|
|  1|  run|    1| null|null|
|  1|  run|    2| null|null|
|  1| swim|    3| null| end|
|  1|  run|    4|start|null|
|  1| swim|    5| null|null|
|  1| swim|    6| null| end|
|  1|  run|    7|start|null|
|  1|  run|    8| null|null|
|  1|  run|    9| null|null|
|  1| swim|   10| null| end|
|  1|  run|   11|start|null|
|  1|  run|   12| null|null|
|  1|  run|   13| null| end|
|  2|  run|   14|start|null|
|  2|  run|   15| null|null|
|  2|  run|   16| null|null|
|  2| swim|   17| null| end|
|  2|  run|   18|start|null|
|  2| swim|   19| null|null|
|  2| swim|   20| null|null|
|  2| swim|   21| null|null|
|  2| swim|   22| null| end|
|  2|  run|   23|start|null|
|  2|  run|   24| null|null|
|  2|  run|   25| null| end|
|  3|  run|   26|start|null|
|  3|  run|   27| null|null|
|  3| swim|   28| null|null|
+---+-----+-----+-----+----+

Dataset I’m After

+---+-----+-----+-----+----+-------+
| ID|Event|Index|start| end|EventID|
+---+-----+-----+-----+----+-------+
|  1|  run|    0|start|null|      1|
|  1|  run|    1| null|null|      1|
|  1|  run|    2| null|null|      1|
|  1| swim|    3| null| end|      1|
|  1|  run|    4|start|null|      2|
|  1| swim|    5| null|null|      2|
|  1| swim|    6| null| end|      2|
|  1|  run|    7|start|null|      3|
|  1|  run|    8| null|null|      3|
|  1|  run|    9| null|null|      3|
|  1| swim|   10| null| end|      3|
|  1|  run|   11|start|null|      4|
|  1|  run|   12| null|null|      4|
|  1|  run|   13| null| end|      4|
|  2|  run|   14|start|null|      1|
|  2|  run|   15| null|null|      1|
|  2|  run|   16| null|null|      1|
|  2| swim|   17| null| end|      1|
|  2|  run|   18|start|null|      2|
|  2| swim|   19| null|null|      2|
|  2| swim|   20| null|null|      2|
|  2| swim|   21| null|null|      2|
|  2| swim|   22| null| end|      2|
|  2|  run|   23|start|null|      3|
|  2|  run|   24| null|null|      3|
|  2|  run|   25| null| end|      3|
|  3|  run|   26|start|null|      1|
|  3|  run|   27| null|null|      1|
|  3| swim|   28| null|null|      1|
+---+-----+-----+-----+----+-------+

I am trying to create the EventID column shown above. Is there a way to create a counter inside a UDF that updates based on column conditions? (Note: I'm not sure a UDF is the best approach here.)

Here is my current thinking on the logic, with a plain-Python sketch below:

  • When a "start" value is seen, start counting.
  • When an "end" value is seen, stop counting.
  • Every time a new ID is seen, reset the counter to 1.
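
In plain Python, the behavior I'm after would look roughly like this (a sketch only, not Spark code; `rows` is assumed to be a list of (ID, start) pairs already sorted by Index):

def assign_event_ids(rows):
    # rows: iterable of (ID, start) tuples, sorted by Index
    event_ids = []
    prev_id, counter = None, 0
    for row_id, start in rows:
        if row_id != prev_id:        # new ID: reset the counter
            prev_id, counter = row_id, 0
        if start == 'start':         # each 'start' marker opens a new event
            counter += 1
        event_ids.append(counter)
    return event_ids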

Thank you all for any assistance.

Here is the raw code to produce the current dataframe:

# Current Dataset

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

data = [
    (1, "run",   0, 'start', None),
    (1, "run",   1, None,    None),
    (1, "run",   2, None,    None),
    (1, "swim",  3, None,    'end'),
    (1, "run",   4, 'start', None),
    (1, "swim",  5, None,    None),
    (1, "swim",  6, None,    'end'),
    (1, "run",   7, 'start', None),
    (1, "run",   8, None,    None),
    (1, "run",   9, None,    None),
    (1, "swim", 10, None,    'end'),
    (1, "run",  11, 'start', None),
    (1, "run",  12, None,    None),
    (1, "run",  13, None,    'end'),
    (2, "run",  14, 'start', None),
    (2, "run",  15, None,    None),
    (2, "run",  16, None,    None),
    (2, "swim", 17, None,    'end'),
    (2, "run",  18, 'start', None),
    (2, "swim", 19, None,    None),
    (2, "swim", 20, None,    None),
    (2, "swim", 21, None,    None),
    (2, "swim", 22, None,    'end'),
    (2, "run",  23, 'start', None),
    (2, "run",  24, None,    None),
    (2, "run",  25, None,    'end'),
    (3, "run",  26, 'start', None),
    (3, "run",  27, None,    None),
    (3, "swim", 28, None,    None)
]

schema = StructType([
    StructField('ID', IntegerType(), True),
    StructField('Event', StringType(), True),
    StructField('Index', IntegerType(), True),
    StructField('start', StringType(), True),
    StructField('end', StringType(), True)
])

# assumes an active SparkSession named `spark` (as in the pyspark shell or a notebook)
df = spark.createDataFrame(data=data, schema=schema)
df.show(30)

Solution

You can use a window function: a conditional cumulative sum over each ID partition. Summing 1 for every "start" marker seen so far numbers the events, and because the window is partitioned by ID, the counter automatically resets at each new ID:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# running count of 'start' markers per ID, ordered by Index
w = Window.partitionBy('ID').orderBy('Index').rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn('EventID', F.sum(F.when(F.col('start') == 'start', 1).otherwise(0))
    .over(w)).orderBy('ID', 'Index').show(100)
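
Note that with an orderBy and no explicit frame, Spark defaults to a RANGE frame from unbounded preceding to the current row, which would give the same result here since Index is unique within each ID; spelling out rowsBetween just makes the cumulative sum explicit.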

This results in:

+---+-----+-----+-----+----+-------+
| ID|Event|Index|start| end|EventID|
+---+-----+-----+-----+----+-------+
|  1|  run|    0|start|null|      1|
|  1|  run|    1| null|null|      1|
|  1|  run|    2| null|null|      1|
|  1| swim|    3| null| end|      1|
|  1|  run|    4|start|null|      2|
|  1| swim|    5| null|null|      2|
|  1| swim|    6| null| end|      2|
|  1|  run|    7|start|null|      3|
|  1|  run|    8| null|null|      3|
|  1|  run|    9| null|null|      3|
|  1| swim|   10| null| end|      3|
|  1|  run|   11|start|null|      4|
|  1|  run|   12| null|null|      4|
|  1|  run|   13| null| end|      4|
|  2|  run|   14|start|null|      1|
|  2|  run|   15| null|null|      1|
|  2|  run|   16| null|null|      1|
|  2| swim|   17| null| end|      1|
|  2|  run|   18|start|null|      2|
|  2| swim|   19| null|null|      2|
|  2| swim|   20| null|null|      2|
|  2| swim|   21| null|null|      2|
|  2| swim|   22| null| end|      2|
|  2|  run|   23|start|null|      3|
|  2|  run|   24| null|null|      3|
|  2|  run|   25| null| end|      3|
|  3|  run|   26|start|null|      1|
|  3|  run|   27| null|null|      1|
|  3| swim|   28| null|null|      1|
+---+-----+-----+-----+----+-------+
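
Since F.count ignores nulls, a slightly more compact variant (equivalent for this data, where the start column is either 'start' or null) is to count the non-null start values directly:

# variant: F.count() skips nulls, so counting the 'start' column
# tallies exactly the rows that carry a 'start' marker
df.withColumn('EventID', F.count('start').over(w)).orderBy('ID', 'Index').show(100)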
Answered By: Anonymous
