SCD Type 2 on Delta tables using PySpark

By Niraj Zade | 2025-07-09 | Tags: scd


Import relevant modules

from datetime import datetime
import pyspark.sql.types as T
import pyspark.sql.functions as F

Create Spark session

Note: Not required if you're using Databricks. Running locally, this assumes the delta-spark package is installed (pip install delta-spark), which provides configure_spark_with_delta_pip.

import pyspark
from delta import *

builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

Create a Delta table with some existing data

schema = T.StructType([
    T.StructField("id", T.IntegerType(), False),
    T.StructField("name", T.StringType(), False),
    T.StructField("address", T.StringType(), False),
    T.StructField("current", T.BooleanType(), False),
    T.StructField("effectiveDate", T.TimestampType(), False),
    T.StructField("endDate", T.TimestampType(), False),
])

effective_date = datetime(1990, 1, 1)
end_date = datetime.today()

table_data = [
    (1, "Alice", "Aliceland", True, effective_date, end_date),
    (2, "Bob", "Bobland", True, effective_date, end_date),
]
table_data_df = spark.createDataFrame(data=table_data, schema=schema)
table_data_df.show(truncate=False)

# Write data into delta table
(
    table_data_df
    .write
    .format("delta")
    .mode("overwrite")
    .save("data/scd2-data")
)

Create the updates DataFrame, representing changed dimension rows

schema_updates = T.StructType([
    T.StructField("id", T.IntegerType(), False),
    T.StructField("name", T.StringType(), False),
    T.StructField("address", T.StringType(), False),
])
# Incoming updates. SCD2 will apply these changes to the delta table created above.
data_updates = [
    (1, "Alice", "Otherland"),  # changed row
    (2, "Bob", "Bobland"),      # unchanged row
    (3, "Chad", "Chadland"),    # newly inserted row
]

updates_df = spark.createDataFrame(data=data_updates, schema=schema_updates)
updates_df.show(truncate=False)

Perform SCD2

customers_table = DeltaTable.forPath(spark, 'data/scd2-data')  # DeltaTable with schema (id, name, address, current, effectiveDate, endDate)

# add date columns to updates df
updates_df = (
    updates_df
    .withColumn("effectiveDate", F.current_timestamp())
    .withColumn("endDate", F.lit(None).cast("timestamp"))
)
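# Note: the effectiveDate stamped here also becomes the endDate of the rows it expires (see the whenMatchedUpdate clause below)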

# Rows containing new addresses or names of existing customers
new_records_to_insert = (
    updates_df
    .alias("updates")
    .join(customers_table.toDF().alias("customers"), "id")
    .where("customers.current = true AND ((updates.name <> customers.name) OR (updates.address <> customers.address))")
)
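# For this example data, new_records_to_insert contains only Alice's changed row (id 1, address "Otherland").
# Bob's row is unchanged and Chad has no existing customer record yet, so neither shows up here.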

# Stage the update by unioning two sets of rows (this one is tricky to understand)
# 1. Rows with a NULL mergeKey (the changed existing customers found above).
#    A NULL mergeKey never matches an existing id, so these rows fall through to whenNotMatchedInsert
#    and get inserted as the new current version of the customer.
# 2. Rows with mergeKey = id (every row of updates_df).
#    Rows that match a current customer whose name or address changed hit whenMatchedUpdate, which
#    expires the old version. Rows with no match at all (brand-new customers) are inserted by
#    whenNotMatchedInsert. Unchanged rows match but fail the update condition, so nothing happens to them.

staged_updates_df = (
  new_records_to_insert
  .selectExpr("NULL as mergeKey", "updates.*")   # Rows for 1
  .union(updates_df.alias("updates").selectExpr("id as mergeKey", "*"))  # Rows for 2
)

print("staged updates")
staged_updates_df.show()
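# For this example, staged_updates_df should contain four rows (timestamps will differ):
#   mergeKey=NULL, id=1, Alice, Otherland  -> no match, inserted as Alice's new current version
#   mergeKey=1,    id=1, Alice, Otherland  -> matches and expires Alice's old version
#   mergeKey=2,    id=2, Bob,   Bobland    -> matches, but the update condition fails, so no action
#   mergeKey=3,    id=3, Chad,  Chadland   -> no match, inserted as a brand-new customer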

# Perform scd2 write on the delta table, by writing data of staged_updates_df through merge()
(
    customers_table
    .alias("customers")
    .merge(staged_updates_df.alias("staged_updates"), "customers.id = mergeKey")
    .whenMatchedUpdate( # existing changed records that have to be expired
      condition = "customers.current = true AND ((customers.address <> staged_updates.address) OR (customers.name <> staged_updates.name))",
      set = {
        "current": "false",
        "endDate": "staged_updates.effectiveDate",
      }
    ).whenNotMatchedInsert(
      values = {
        "current": "true",
        "endDate": "null",
        "id": "staged_updates.id",
        "name": "staged_updates.name",
        "address": "staged_updates.address",
        "effectiveDate": "staged_updates.effectiveDate",
      }
    ).execute()
)

updated_delta_table_data_df = (  
    spark  
    .read
    .format("delta")
    .load("data/scd2-data/")  
)
updated_delta_table_data_df.show(truncate=False)

Output

Delta table before:

+---+-----+---------+-------+-------------------+-------------------------+
|id |name |address  |current|effectiveDate      |endDate                  |
+---+-----+---------+-------+-------------------+-------------------------+
|1  |Alice|Aliceland|true   |1990-01-01 00:00:00|2025-07-09 18:21:20.44975|
|2  |Bob  |Bobland  |true   |1990-01-01 00:00:00|2025-07-09 18:21:20.44975|
+---+-----+---------+-------+-------------------+-------------------------+

Delta table after:

+---+-----+---------+-------+--------------------------+--------------------------+
|id |name |address  |current|effectiveDate             |endDate                   |
+---+-----+---------+-------+--------------------------+--------------------------+
|1  |Alice|Otherland|true   |2025-07-09 18:24:25.023453|NULL                      |
|1  |Alice|Aliceland|false  |1990-01-01 00:00:00       |2025-07-09 18:24:25.023453|
|2  |Bob  |Bobland  |true   |1990-01-01 00:00:00       |2025-07-09 18:21:20.44975 |
|3  |Chad |Chadland |true   |2025-07-09 18:24:25.023453|NULL                      |
+---+-----+---------+-------+--------------------------+--------------------------+
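
To consume the dimension downstream, you usually only want the currently active version of each customer. A minimal sketch, assuming the same data/scd2-data path used above:

# Read only the active (current) rows of the dimension
current_customers_df = (
    spark
    .read
    .format("delta")
    .load("data/scd2-data")
    .where("current = true")
)
current_customers_df.show(truncate=False)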