Spark Scala: Set New Column Based on Each Row Values
Hi, dear reader. Hope everything is fine with you! Today I bring you a brief article about a useful pattern when working with Apache Spark dataframes in Scala: populating a new column dynamically based on each row's values.
How to Use Existing Row Values to Condition New Column Values?
Let’s assume that we have a table (dataframe) coming from a dependency to which we would like to append some extra information based on existing values. In other words, let’s create a new column in which each row’s value depends on the values of that row’s other columns.
In this scenario, let’s assume we own a garage and, each day, an ETL job consumes a dataset containing information about the incoming service repair requests. As part of this job, we would like to validate that the incoming dataset (E: extract) is valid, meaning that we append a value to each row (T: transform) indicating whether that row is in a valid state (that is, it contains no null values). Given that this is a sample scenario, there will be no Load (L) responsibility: we will just output the raw transformation result, with the test data extracted directly from an internal CSV file.
repair_id,brand,engine,tires
1,BMW,s4t,255
2,Fiat,3er,245
3,Audi,,
Above is the input CSV, which contains the current repair requests with information about the characteristics of each vehicle (all of which can be nullable). The code snippet below presents the extraction logic: reading the contents of the CSV file into a dataframe to be manipulated in the transformation step.
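Since the extraction snippet is not reproduced here, below is a minimal sketch of how it might look. The object name, session configuration, and file path are illustrative assumptions, not the article's original code.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object Extractor {
  // Local session for this sample scenario; app name and master are illustrative.
  val spark: SparkSession = SparkSession.builder()
    .appName("garage-repair-validation")
    .master("local[*]")
    .getOrCreate()

  // Reads the CSV with a header row; empty fields are parsed as nulls by default.
  def extract(path: String): DataFrame =
    spark.read
      .option("header", "true")
      .csv(path)
}
```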
In order to achieve our goal (appending a new column with row-specific validation details), we need to manipulate the previously created dataframe by defining a new column name and its respective value. Note that this value can be a constant across all rows (achievable through the use of the lit function) or a dynamic value defined through a conditional expression (the when function).
Spark’s when function works with a Column object, which evaluates a condition per row (true/false) and assigns the proper value to the target column depending on whether the defined condition is met. Below is presented how this was achieved for the current scenario.
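A sketch of how this transformation might look follows. The object name and the rowHasNulls UDF name are assumptions; note that in this sketch the UDF call on a struct column already yields a Column, so it can be passed to when directly.

```scala
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{lit, struct, udf, when}

object Transformer {
  val VALID_DATA   = "YES"
  val INVALID_DATA = "NO"

  // UDF: returns true when any field of the given row is null.
  val rowHasNulls: UserDefinedFunction = udf { row: Row =>
    (0 until row.length).exists(row.isNullAt)
  }

  // Appends the valid_record column; struct("*") passes all of the
  // row's columns into the UDF as a single Row value.
  def populateContent(df: DataFrame): DataFrame =
    df.withColumn(
      "valid_record",
      when(rowHasNulls(struct("*")), lit(INVALID_DATA))
        .otherwise(lit(VALID_DATA)))
}
```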
The populateContent function is triggered for each row and has the responsibility of returning the new column value based on a condition (expressed through the when function), which resolves to one of two possible string values: VALID_DATA (= YES) or INVALID_DATA (= NO).
The predicate for the condition is based upon an externally defined UDF (User-Defined Function), which receives a Row object and verifies whether there is any null value within the input row’s values (the struct("*") instruction indicates that all available columns should be selected). More specifically, this function has the responsibility of looping through the row values and checking whether any column value (of that row) is null by invoking the isNullAt method of the Row object.
Note that the predicate of the condition is expressed in the form of a column to obey the when function's interface, hence the lit wrapper for the boolean value returned by the UDF.
The code snippet below presents the full unaltered Scala objects that compose this test application.
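The original full snippet is not reproduced here; below is a self-contained sketch of how the complete application could be assembled under the same approach. The object name, session configuration, and CSV path are illustrative assumptions.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{lit, struct, udf, when}

object RepairValidationApp {
  val VALID_DATA   = "YES"
  val INVALID_DATA = "NO"

  // UDF: true when any field of the row is null.
  val rowHasNulls: UserDefinedFunction = udf { row: Row =>
    (0 until row.length).exists(row.isNullAt)
  }

  // Appends the valid_record column based on the per-row null check.
  def populateContent(df: DataFrame): DataFrame =
    df.withColumn(
      "valid_record",
      when(rowHasNulls(struct("*")), lit(INVALID_DATA))
        .otherwise(lit(VALID_DATA)))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("garage-repair-validation")
      .master("local[*]")
      .getOrCreate()

    // Extract: read the repair requests CSV (path is illustrative).
    val repairs = spark.read
      .option("header", "true")
      .csv("src/main/resources/repairs.csv")

    // Transform and print the result; no Load step in this sample.
    populateContent(repairs).show()
    spark.stop()
  }
}
```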
After running the above script, we can validate that the defined transformation executed as expected and correctly appended the new column values based on each row's existing values.
+---------+-----+------+-----+------------+
|repair_id|brand|engine|tires|valid_record|
+---------+-----+------+-----+------------+
| 1| BMW| s4t| 255| YES|
| 2| Fiat| 3er| 245| YES|
| 3| Audi| null| null| NO|
+---------+-----+------+-----+------------+
As initially required, we were able to append a new column that, depending on each row's values, indicates whether the current record/row is complete, via a YES/NO value.
Conclusion
This brief article presented a walkthrough on how to dynamically populate a new Spark dataframe column based on each row's existing values; in other words, a row-based dynamic column population solution. Such a transformation can be helpful for validation, completeness checks, or any other type of action that relies on appending new values derived from existing row values.
Feel free to go through the GitHub repo and pull the code to test it, or even to re-use it in your own projects.
Hope this article was informative. Feel free to comment with any questions or suggestions. Thank you, and stay tuned for further articles!