The question is regarding working with dataframes, I want to delete completely duplicate records excluding some fields (dates). I tried to use a windowFunction (WindowSpec) as:
val wFromDupl: WindowSpec = Window .partitionBy(comparateFields: _*) .orderBy(asc(orderField))
At the variable comparateFields I store all the fields that I have to check (in the example it would be DESC1 and DESC2) to eliminate duplicates following the logic that, if there is a duplicate record, we discard those with higher date.
In the orderField variable, I simply store the effective_date field.
Therefore, by applying the window function, what I do is calculate a temporary column, assigning the smallest date to all the records that are duplicates, and then filter the dataFrame as:
val dfFinal: DataFrame = dfInicial .withColumn("w_eff_date", min(col("effective_date")).over(wFromDupl)) .filter(col("effective_date") === col("w_eff_date")) .drop("w_eff_date") .distinct()
For the following case it works correctly:
KEY EFFECTIVE_DATE DESC 1 DESC 2 W_EFF_DATE (tmp) E2 2000 A B 2000 E2 2001 A B 2000 E2 2002 AA B 2002
The code will drop the second record:
E2 2001 A B 2000
But the logic must be applied for CONSECUTIVE records (in date), for example, for the following case, as the code is implemented, we are deleting the third record (DESC1 and DESC2 are the same, and the min eff date is 2000), but we dont want this because we have (by eff_date) a record in the middle (2001 AA B)so we want to keep the 3 records
KEY EFFECTIVE_DATE DESC1 DESC2 W_EFF_DATE (tmp) E1 2000 A B 2000 E1 2001 AA B 2001 E1 2002 A B 2000
Any advice on this? Thank you all!