Cuando un pipeline de ingestión en Apache Spark arroja la excepción multipleSourceRowMatchingTargetRowInMergeException
al ejecutar un MERGE sobre una tabla Delta, la confusión inicial suele ser grande: el mismo código funciona en otras tablas y las diferencias no saltan a la vista. En esta guía exhaustiva, desglosamos las verdaderas causas de este error, mostramos cómo reproducirlo de forma controlada y proponemos una ruta de corrección totalmente verificable en entornos de producción.
¿Qué está pasando realmente?
El mensaje oficial de Delta Lake indica que varios registros del origen coinciden con un único registro del destino según la condición de emparejamiento del MERGE. Es decir, Delta espera una correspondencia 1‑a‑1 pero encuentra 1‑a‑N. No obstante, detrás de esa descripción conviven múltiples factores técnicos: claves duplicadas, type mismatch, rutas inconsistentes, configuraciones de sesión e incluso desalineación de versiones de librerías.
Diagramando el escenario problemático
from delta import DeltaTable
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.getOrCreate()
DataFrame origen con datos nuevos y supuestamente únicos
dfNewData = (
spark.read.parquet("abfss\://landing/data/entities/2025-07-29/")
)
MERGE simplificado (pero defectuoso)
(
DeltaTable.forPath(spark, "abfss\://silver/entities/")
.alias("entities")
.merge(
dfNewData.alias("New"),
"entities.uprn = New\.uprn"
)
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
En apariencia todo luce correcto, sin embargo la llamada puede abortar con el stack trace siguiente:
org.apache.spark.sql.delta.DeltaErrors$MultipleSourceRowMatchingTargetRowInMergeException:
Duplicate source rows matched multiple target rows in MERGE
Casi siempre la falla se manifiesta en la última fase de shuffle del plan físico, es decir, después de haber consumido potencia de cómputo y cuotas de IO.
Checklist de diagnóstico
Para disminuir el tiempo hasta la causa raíz (mean time to root cause, MTTR) resulta útil un checklist ordenado de verificaciones. Las agrupamos en seis bloques:
Paso | Qué revisar | Acción propuesta |
---|---|---|
1. Unicidad de la clave | El error surge si más de un registro en dfNewData comparte el mismo uprn , generando colisiones contra la tabla objetivo. | • Ejecuta dfNewData.groupBy("uprn").count().filter("count > 1") .• De forma preventiva: dfNewData = dfNewData.dropDuplicates(["uprn"]) . |
2. Tipos de datos | Que las columnas se llamen igual no garantiza que sean del mismo tipo primitivo. | Alinea los esquemas:spark.read.table("C365.entities").printSchema() y dfNewData.printSchema() . Asegúrate de aplicar cast explícito si difieren. |
3. Ruta y tabla | Unos equipos usan referencia por ruta y otros por nombre de tabla; la ligera diferencia puede apuntar a escritorios Delta distintos. | • Igualar ambas partes:DeltaTable.forName(spark, "C365.entities") .• Confirma que la ubicación en metadatos coincide con la ruta física. |
4. Permisos / base de datos | Si el usuario o la app de servicio no puede leer versiones antiguas de la tabla, Spark recurre a vistas parciales que inducen ambigüedad. | Revisa SHOW GRANT ON TABLE C365.entities; y las ACLs del contenedor ADLS/OSS. |
5. Buenas prácticas operativas | En notebooks la mezcla de celdas %sql y PySpark reinicia contexto tras inactividad. | • Ejecutar las celdas en una sola pasada. • Encerrar el MERGE en try / except y registrar con log4j . |
6. Versiones de Delta Lake | La JVM de tu clúster puede tener delta-core_2.12-2.3.0.jar mientras que tu job trae 2.4.0 . | Determinar versiones con spark.conf.get("spark.databricks.delta.lastCommitVersion") y coincidirlas. |
Profundizando en la clave primaria (uprn)
uprn (Unique Property Reference Number) debería ser un identificador único por diseño. Pero en la práctica, llegan registros de varias fuentes con errores de captura, conversiones de tipo, padding de ceros o recortes de longitud. Un ejemplo típico es el siguiente:
SELECT uprn, COUNT(*) AS c
FROM parquet.`abfss://landing/data/entities/2025-07-29/`
GROUP BY uprn
HAVING c > 1
ORDER BY c DESC;
Un solo día de ingesta puede incluir docenas de duplicados, suficientes para romper la expectativa 1‑a‑1. Lo aconsejable es tratar el conjunto cerca del origen:
from pyspark.sql.window import Window
w = Window.partitionBy("uprn").orderBy(F.col("lastUpdated").desc())
dfNewData_clean = (
dfNewData
.withColumn("rn", F.row_number().over(w))
.filter(F.col("rn") == 1)
.drop("rn")
)
El patrón conserva la fila más reciente por uprn y descarta el resto, eliminando la ambigüedad.
Validación de tipos de datos
A veces el desglose de claves muestra cero duplicados y aun así el MERGE sigue fallando. La siguiente causa habitual es la desalineación de tipos entre origen y destino:
- En la tabla Delta,
uprn
es LONG. - En el DataFrame,
uprn
llega como STRING porque el productor cambió la definición.
Spark aplica un cast implícito; por ejemplo, "00123"
y "123"
se convierten ambos en 123L
, originando coincidencias múltiples.
Solución directa:
dfNewData_casted = dfNewData.withColumn(
"uprn",
F.col("uprn").cast("long")
)
O, si quieres mantener la columna textual, cambia el destino:
ALTER TABLE C365.entities
ALTER COLUMN uprn SET DATA TYPE STRING;
Consistencia de rutas y nombres de tabla
Este es un detalle sutil pero repetido en proyectos multi‑equipo: unos scripts se refieren a la tabla por ruta absoluta y otros por su nombre registrado en el metastore. El resultado es que se actualizan ubicaciones distintas —o peor aún, que se mezcla managed con external. La regla de oro:
En código productivo, usa siempre
DeltaTable.forName(spark, "esquema.tabla")
a menos que necesites direccionar un dataset experimental no registrado.
Este enfoque delega en el metastore la responsabilidad de resolver la ubicación física y elimina la divergencia de rutas con o sin barra final.
Tratamiento programado de transacciones fallidas
Las operaciones MERGE son atómicas, pero si el job se cancela a mitad de camino o se lanza la excepción analizada, queda un commit fallido en el transaction log. Dicha marca bloquea escrituras posteriores hasta que Spark la reconcilia. Para evitar parálisis continuas:
VACUUM `abfss://silver/entities/` RETAIN 0 HOURS;
La retención 0
conviene solamente tras un error de integración y fuera de ventanas en línea de negocio. Posteriormente, reestablece el umbral habitual (7/30 días).
Control de versiones de Delta Lake
Delta Lake ha mejorado la semántica de errores a partir de la rama 2.x; una combinación incoherente de artefactos (delta-core
, spark‑runtime
, io.delta
) puede reintroducir excepciones ya corregidas.
- Comprueba la versión en tiempo de ejecución:
spark.sql("SET -v").filter("key like 'spark.databricks.delta.%'").show(truncate=False)
- En Databricks, usa un runtime con “Photon enabled” ≥ 14.x.
En Azure Synapse o EMR instala el mismo JAR (delta‑core_2.12‑2.4.0.jar
) en todos los nodos.
Receta de corrección mínima reproducible
from delta import DeltaTable
from pyspark.sql import functions as F, Window
1. Deduplicación sólida
w = Window\.partitionBy("uprn").orderBy(F.lit(1))
dfNewData\_dedup = (
dfNewData
.withColumn("rn", F.row\_number().over(w))
.filter(F.col("rn") == 1)
.drop("rn")
)
2. Merge seguro por nombre de tabla
delta\_table = DeltaTable.forName(spark, "C365.entities")
(delta\_table.alias("t")
.merge(dfNewData\_dedup.alias("s"), "t.uprn = s.uprn")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute())
Ventajas inmediatas
- Idempotencia: si el lote se repite, la clave primaria sigue limpia.
- Observabilidad: cada transacción incorpora métricas transparents (operationMetrics) visibles en el
DESCRIBE HISTORY
. - Performance: el plan físico se simplifica; al haber menos duplicados, Delta reduce shuffle y spill‑to‑disk.
Pruebas unitarias en laboratorio
La mejor forma de certificar que la solución evita regresiones es montar un test automático:
import pytest
from delta import DeltaTable
@pytest.fixture(scope="module")
def setup_tables(spark):
# Crea tabla temporal
path = "/tmp/delta/entities_test"
dfBase = spark.createDataFrame(
[(1, "foo"), (2, "bar")] ["uprn", "name"]
)
dfBase.write.format("delta").mode("overwrite").save(path)
return DeltaTable.forPath(spark, path), path
def testmergenoduplicates(setuptables, spark):
deltatbl, = setup_tables
dfSrc = spark.createDataFrame(
[(1, "foonew"), (1, "foodup")] ["uprn", "name"]
)
# Deduplicar antes del merge
dfSrc_dedup = dfSrc.dropDuplicates(["uprn"])
delta_tbl.alias("t").merge(
dfSrc_dedup.alias("s"), "t.uprn = s.uprn"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
out = delta_tbl.toDF().filter("uprn = 1").count()
assert out == 1
Al ejecutar pytest
el desarrollo confirma que la tabla final nunca almacena más de un registro por uprn.
Buenas prácticas de gobierno de datos
Más allá del bug‑fix, conviene institucionalizar controles para impedir que las filas problemáticas lleguen a la capa Silver:
- Restricciones en Delta:
ALTER TABLE C365.entities ADD CONSTRAINT pk_uprn UNIQUE (uprn);
- Pruebas de aceptación en la orquestación (Airflow, Data Factory, Oozie).
Marcar el lote como failed si existen duplicados antes de escribir. - Monitoreo de calidad con servicios como Great Expectations o Unity Catalog.
- Catálogo de datos: documenta la cardinalidad de uprn para que equipos aguas arriba conozcan la restricción.
Conclusión
El error multipleSourceRowMatchingTargetRowInMergeException
no es, en esencia, un problema del comando MERGE. Es una alerta sobre la violación de unicidad en la clave definida como condición de emparejamiento. La deduplicación proactiva, la verificación estricta de tipos y el alineamiento de versiones son la combinación que garantiza integridad en tablas Delta y evita reincidencias.