Import relevant modules
from datetime import datetime
import pyspark.sql.types as T
import pyspark.sql.functions as F
Create a Spark session
Note: this step is not required if you're using Databricks, where a Delta-enabled Spark session is already available.
import pyspark
from delta import DeltaTable, configure_spark_with_delta_pip
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
Create a delta table with some existing data
schema = T.StructType([
T.StructField("id", T.IntegerType(), False),
T.StructField("name", T.StringType(), False),
T.StructField("address", T.StringType(), False),
T.StructField("current", T.BooleanType(), False),
T.StructField("effectiveDate", T.TimestampType(), False),
T.StructField("endDate", T.TimestampType(), False),
])
effective_date = datetime(1990, 1, 1)
end_date = datetime.today()
table_data = [
(1, "Alice", "Aliceland", True, effective_date, end_date),
(2, "Bob", "Bobland", True, effective_date, end_date),
]
table_data_df = spark.createDataFrame(data=table_data, schema=schema)
table_data_df.show(truncate=False)
# Write data into delta table
(
table_data_df
.write
.format("delta")
.mode("overwrite")
.save("data/scd2-data")
)
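As an optional check, the table's transaction log can be inspected right after this initial write; for a fresh path it should show a single WRITE operation at version 0 (a minimal sketch using the DeltaTable API imported above):
# Optional check: inspect the transaction log of the freshly written table
(
    DeltaTable.forPath(spark, "data/scd2-data")
    .history()
    .select("version", "timestamp", "operation")
    .show(truncate=False)
)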
Create an updates DataFrame, representing the changed dimension rows
schema_updates = T.StructType([
T.StructField("id", T.IntegerType(), False),
T.StructField("name", T.StringType(), False),
T.StructField("address", T.StringType(), False),
])
# These rows are the incoming changes; they will be merged into the existing delta table using SCD2
data_updates = [
(1, "Alice", "Otherland"), # changed row
(2, "Bob", "Bobland"), # unchanged row
(3, "Chad", "Chadland"), # newly inserted row
]
updates_df = spark.createDataFrame(data=data_updates, schema=schema_updates)
updates_df.show(truncate=False)
Perform SCD2
customers_table = DeltaTable.forPath(spark, 'data/scd2-data') # DeltaTable with schema (id, name, address, current, effectiveDate, endDate)
# add date columns to updates df
updates_df = (
updates_df
.withColumn("effectiveDate", F.current_timestamp())
.withColumn("endDate", F.lit(None).cast("timestamp"))
)
# Rows containing new addresses or names of existing customers
new_records_to_insert = (
updates_df
.alias("updates")
.join(customers_table.toDF().alias("customers"), "id")
.where("customers.current = true AND ((updates.name <> customers.name) OR (updates.address <> customers.address))")
)
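For this sample data, new_records_to_insert should contain only Alice's row, since she is the only existing customer whose name or address changed; a quick, optional check:
# Optional check: only Alice's updated row is expected here
new_records_to_insert.selectExpr("updates.*").show(truncate=False)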
# Stage the update by unioning two sets of rows (this is the tricky part)
# 1. Rows with mergeKey set to NULL (new_records_to_insert)
#    These are existing customers whose name or address changed. Because their mergeKey is NULL,
#    they never match in the merge, so whenNotMatchedInsert adds them as the new current version.
#    For this data, that is only Alice's updated row.
# 2. Rows with mergeKey set to id (every row of updates_df)
#    If the id matches a current record whose data changed, whenMatchedUpdate expires that record.
#    If the id doesn't exist yet (a brand-new customer like Chad), whenNotMatchedInsert adds it.
#    Unchanged rows (like Bob's) match but fail the update condition, so nothing happens to them.
staged_updates_df = (
new_records_to_insert
.selectExpr("NULL as mergeKey", "updates.*") # Rows for 1
.union(updates_df.alias("updates").selectExpr("id as mergeKey", "*")) # Rows for 2
)
print("staged updates")
staged_updates_df.show()
# Perform scd2 write on the delta table, by writing data of staged_updates_df through merge()
(
customers_table
.alias("customers")
.merge(staged_updates_df.alias("staged_updates"), "customers.id = staged_updates.mergeKey")
.whenMatchedUpdate( # existing changed records that have to be expired
condition = "customers.current = true AND ((customers.address <> staged_updates.address) OR (customers.name <> staged_updates.name))",
set = {
"current": "false",
"endDate": "staged_updates.effectiveDate",
}
).whenNotMatchedInsert(
values = {
"current": "true",
"endDate": "null",
"id": "staged_updates.id",
"name": "staged_updates.name",
"address": "staged_updates.address",
"effectiveDate": "staged_updates.effectiveDate",
}
).execute()
)
updated_delta_table_data_df = (
spark
.read
.format("delta")
.load("data/scd2-data/")
)
updated_delta_table_data_df.show(truncate=False)
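Downstream consumers usually only need the active version of each customer; a minimal sketch of that query against the same path, filtering on the current flag:
# Only the currently active row per customer
(
    spark
    .read
    .format("delta")
    .load("data/scd2-data/")
    .where("current = true")
    .show(truncate=False)
)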
Output
Delta table before:
+---+-----+---------+-------+-------------------+-------------------------+
|id |name |address |current|effectiveDate |endDate |
+---+-----+---------+-------+-------------------+-------------------------+
|1 |Alice|Aliceland|true |1990-01-01 00:00:00|2025-07-09 18:21:20.44975|
|2 |Bob |Bobland |true |1990-01-01 00:00:00|2025-07-09 18:21:20.44975|
+---+-----+---------+-------+-------------------+-------------------------+
Delta table after:
+---+-----+---------+-------+--------------------------+--------------------------+
|id |name |address |current|effectiveDate |endDate |
+---+-----+---------+-------+--------------------------+--------------------------+
|1 |Alice|Otherland|true |2025-07-09 18:24:25.023453|NULL |
|1 |Alice|Aliceland|false |1990-01-01 00:00:00 |2025-07-09 18:24:25.023453|
|2 |Bob |Bobland |true |1990-01-01 00:00:00 |2025-07-09 18:21:20.44975 |
|3 |Chad |Chadland |true |2025-07-09 18:24:25.023453|NULL |
+---+-----+---------+-------+--------------------------+--------------------------+
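Because Delta keeps earlier table versions, the pre-merge state shown above can still be queried through time travel; a minimal sketch, assuming the initial overwrite produced version 0 of this table:
# Read the table as it was before the merge (version 0 = the initial overwrite)
(
    spark
    .read
    .format("delta")
    .option("versionAsOf", 0)
    .load("data/scd2-data/")
    .show(truncate=False)
)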