Skip to content

delta_merge_writer

DeltaMergeConfig

Bases: BaseModel

Configuration for Merge options.

Attributes:

Name Type Description
dataframe_columns list[str]

The columns of the DataFrame.

key_columns list[str]

List of column names that form the key for the merge operation.

when_matched_update bool

Flag to specify whether to perform an update operation when matching records are found in the target Delta table.

when_matched_delete bool

Flag to specify whether to perform a delete operation when matching records are found in the target Delta table.

when_not_matched_insert bool

Flag to specify whether to perform an insert operation when no matching records are found in the target Delta table.

cols_to_exclude_from_update list[str]

List of column names to be excluded from the update in the target Delta table.

use_partition_pruning bool

Flag to specify whether to use partition pruning to optimize the performance of the merge operation.

partition_by list[str]

List of column names to partition by.

column_mapping dict[str, str]

Mapping from target column names to source column names. If a column is not in the mapping, it's assumed to have the same name in both source and target.

Source code in src/cloe_nessy/integration/writer/delta_writer/delta_merge_writer.py
class DeltaMergeConfig(BaseModel):
    """Configuration for Merge options.

    Attributes:
        dataframe_columns: The columns of the DataFrame.
        key_columns: List of column names that form the key for the merge
            operation.
        when_matched_update: Flag to specify whether to perform an update
            operation when matching records are found in the target Delta table.
        when_matched_delete: Flag to specify whether to perform a delete
            operation when matching records are found in the target Delta table.
        when_not_matched_insert: Flag to specify whether to perform an insert
            operation when no matching records are found in the target Delta
            table.
        cols_to_exclude_from_update: List of column names to be excluded from
            the update in the target Delta table.
        use_partition_pruning: Flag to specify whether to use partition
            pruning to optimize the performance of the merge operation.
        partition_by: List of column names to partition by.
        column_mapping: Mapping from target column names to source column names.
            If a column is not in the mapping, it's assumed to have the same name
            in both source and target.
    """

    dataframe_columns: list[str]
    key_columns: list[str]
    cols_to_exclude_from_update: list[str] = Field(default_factory=list)
    when_matched_update: bool = True
    when_matched_delete: bool = False
    when_not_matched_insert: bool = True
    use_partition_pruning: bool = True
    partition_by: list[str] = Field(default_factory=list)
    column_mapping: dict[str, str] = Field(default_factory=dict)
    # Derived fields, populated by the "before" validators; aliased with a
    # leading underscore so callers cannot set them via the field name.
    cols_to_merge: list[str] = Field(default_factory=list, alias="_cols_to_merge")
    cols_to_update: set[str] = Field(default_factory=set, alias="_cols_to_update")
    cols_to_insert: set[str] = Field(default_factory=set, alias="_cols_to_insert")
    final_cols_to_update: dict[str, str] = Field(default_factory=dict)
    final_cols_to_insert: dict[str, str] = Field(default_factory=dict)

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @model_validator(mode="before")
    @classmethod
    def _validate_update_delete(cls, config: Any):
        """Update and delete operations must be mutually exclusive.

        Field defaults are not yet applied in a ``mode="before"`` validator, so
        fall back to the declared defaults (update=True, delete=False). This
        also rejects ``when_matched_delete=True`` passed without an explicit
        ``when_matched_update=False``, which previously slipped through and was
        silently resolved in favor of the update.
        """
        if config.get("when_matched_update", True) and config.get("when_matched_delete", False):
            raise ValueError("Update and delete operations cannot be used together.")
        return config

    @model_validator(mode="before")
    @classmethod
    def _validate_key_columns(cls, config: Any):
        """Key columns must exist in the data frame (considering column mapping)."""
        # `or []` / `or {}` guards: if a required field is missing we want a
        # clear pydantic "field required" error later, not a TypeError here.
        key_columns = config.get("key_columns") or []
        dataframe_columns = config.get("dataframe_columns") or []
        column_mapping = config.get("column_mapping") or {}

        # For each key column (target name), find the corresponding source column
        missing_columns = []
        for key_col in key_columns:
            source_col = column_mapping.get(key_col, key_col)
            if source_col not in dataframe_columns:
                missing_columns.append(f"{key_col} (maps to {source_col})" if key_col != source_col else key_col)

        if missing_columns:
            raise ValueError(f"Key columns must exist in the DataFrame. Missing columns: {', '.join(missing_columns)}")
        return config

    @model_validator(mode="before")
    @classmethod
    def _derive_merge_columns(cls, config: Any):
        """Derive update and insert columns from the DataFrame columns."""
        dataframe_columns = config.get("dataframe_columns") or []
        column_mapping = config.get("column_mapping") or {}

        # Build reverse mapping: source_col -> target_col
        reverse_mapping = {v: k for k, v in column_mapping.items()}

        # For each dataframe column, find its corresponding target column.
        target_columns = [reverse_mapping.get(df_col, df_col) for df_col in dataframe_columns]

        # De-duplicate while preserving DataFrame column order so the derived
        # merge columns are deterministic (list(set(...)) ordering is not).
        config["_cols_to_merge"] = list(dict.fromkeys(target_columns))

        if config.get("cols_to_exclude_from_update"):
            config["_cols_to_update"] = set(config["_cols_to_merge"]) - set(config["cols_to_exclude_from_update"])
        else:
            config["_cols_to_update"] = set(config["_cols_to_merge"])

        # Use a fresh set so the insert columns do not alias the merge-column list.
        config["_cols_to_insert"] = set(config["_cols_to_merge"])

        # Build final mappings (target column -> back-tick qualified source column)
        # using column_mapping (target -> source).
        config["final_cols_to_update"] = {
            target_col: f"source.`{column_mapping.get(target_col, target_col)}`"
            for target_col in config["_cols_to_update"]
        }
        config["final_cols_to_insert"] = {
            target_col: f"source.`{column_mapping.get(target_col, target_col)}`"
            for target_col in config["_cols_to_insert"]
        }
        return config

    @model_validator(mode="after")
    def _validate_partition_pruning(self) -> Self:
        """If partition_pruning is set, the partition by columns must be known."""
        if self.use_partition_pruning is True and not self.partition_by:
            raise ValueError("Partition columns must be specified when using partition pruning.")
        return self

    @model_validator(mode="after")
    def _validate_cols_exist(self) -> Self:
        """All UPDATE and INSERT columns must be part of the merged columns."""
        if any(col not in self.cols_to_merge for col in self.cols_to_update) or any(
            col not in self.cols_to_merge for col in self.cols_to_insert
        ):
            raise ValueError(
                "You specified column names for UPDATE or INSERT that either don't exist in the dataframe "
                "or are explicitly excluded from the MERGE.",
            )
        return self

DeltaMergeWriter

Bases: BaseDeltaWriter

A class for merging DataFrames to Delta tables.

Source code in src/cloe_nessy/integration/writer/delta_writer/delta_merge_writer.py
class DeltaMergeWriter(BaseDeltaWriter):
    """A class for merging DataFrames to Delta tables."""

    def __init__(self):
        super().__init__()
        self._spark = SessionManager.get_spark_session()
        self._dbutils = SessionManager.get_utils()

    def _validate_table_inputs(
        self, table: Table | None, table_identifier: str | None, storage_path: str | None
    ) -> tuple[str, str]:
        """Validates and retrieves table identifier and storage path.

        Raises:
            ValueError: If neither a Table object nor both table_identifier
                and storage_path are provided, or if either value cannot be
                resolved from the Table object.
        """
        if table is None and (table_identifier is None or storage_path is None):
            raise ValueError("Either a Table object or table_identifier and storage_path must be provided.")
        if table is not None:
            table_identifier = table.identifier
            storage_path = str(table.storage_path)
        if not storage_path:
            raise ValueError("Storage path must be provided or extracted from the Table object.")
        # Raise explicitly instead of using `assert`, which is stripped under
        # `python -O` and would let a None identifier propagate downstream.
        if table_identifier is None:
            raise ValueError("Table identifier must be provided.")
        return table_identifier, storage_path

    def _build_match_conditions(self, data_frame: DataFrame, config: DeltaMergeConfig) -> str:
        """Builds match conditions for the Delta table merge."""
        match_conditions = self._merge_match_conditions_with_mapping(config.key_columns, config.column_mapping)
        if config.use_partition_pruning:
            # Narrow the scan to the partitions present in the source frame.
            match_conditions_list = [match_conditions] + [
                self._partition_pruning_conditions(data_frame, config.partition_by),
            ]
            match_conditions = " AND ".join(match_conditions_list)
        return match_conditions

    def _build_merge_operations(
        self, delta_table, data_frame: DataFrame, config: DeltaMergeConfig, match_conditions: str
    ):
        """Builds the Delta table merge operations."""
        delta_table_merge = delta_table.alias("target").merge(
            source=data_frame.alias("source"),
            condition=match_conditions,
        )
        # Update and delete are mutually exclusive (enforced by DeltaMergeConfig).
        if config.when_matched_update:
            delta_table_merge = delta_table_merge.whenMatchedUpdate(set=config.final_cols_to_update)
        elif config.when_matched_delete:
            delta_table_merge = delta_table_merge.whenMatchedDelete()
        if config.when_not_matched_insert:
            delta_table_merge = delta_table_merge.whenNotMatchedInsert(values=config.final_cols_to_insert)
        return delta_table_merge

    @table_log_decorator(operation="merge")
    def write(
        self,
        data_frame: DataFrame,
        table: Table | None = None,
        table_identifier: str | None = None,
        storage_path: str | None = None,
        ignore_empty_df: bool = False,
        **kwargs: Any,
    ):
        """Merges the data in a spark DataFrame into a Delta table.

        This function performs a merge operation between a DataFrame and a Delta
        table. The function supports update, delete, and insert operations on
        the target Delta table based on conditions specified by the user. The
        function also supports partition pruning to optimize the performance of
        the merge operation.

        When source and target tables have different column names, use the
        `column_mapping` parameter to map target column names to source column names.
        For any columns not in the mapping, the same name is assumed for both source
        and target.

        Args:
            table: The Table object representing the Delta table.
            table_identifier: The identifier of the Delta table in the format
                'catalog.schema.table'.
            storage_path: The location of the Delta table.
            data_frame: The DataFrame to be merged into the Delta table.
            ignore_empty_df: A flag indicating whether to ignore an empty source
                dataframe.
            kwargs: Passed to the
                [`DeltaMergeConfig`][cloe_nessy.integration.writer.delta_writer.delta_merge_writer.DeltaMergeConfig].
                Common kwargs include:

                - key_columns: List of target column names to use as merge keys.
                - column_mapping: Dict mapping target column names to source column names.
                - when_matched_update: Whether to update matching records.
                - when_matched_delete: Whether to delete matching records.
                - when_not_matched_insert: Whether to insert non-matching records.
                - cols_to_exclude_from_update: Target columns to exclude from updates.
                - use_partition_pruning: Whether to use partition pruning.
                - partition_by: List of partition columns.

        Raises:
            ValueError: If neither a Table object nor both table_identifier
                and storage_path are provided.
            EmptyDataframeException: If the source dataframe is empty and
                ignore_empty_df is False.
            ValueError: If the specified columns for update or insert do not
                exist in the DataFrame or are explicitly excluded from the
                merge operation.
            ValueError: If partition columns are not specified when using
                partition pruning.

        Example:
            ```python
            # Merge with different column names
            writer.write(
                data_frame=source_df,
                table=target_table,
                key_columns=["customer_id"],
                column_mapping={
                    "customer_id": "cust_id",
                    "full_name": "name",
                    "email_address": "email"
                }
            )
            ```
        """
        if self._empty_dataframe_check(data_frame, ignore_empty_df):
            return
        table_identifier, storage_path = self._validate_table_inputs(table, table_identifier, storage_path)

        config = DeltaMergeConfig(dataframe_columns=data_frame.columns, **kwargs)

        delta_table = self.table_manager.get_delta_table(
            table=table,
            location=storage_path if not table else None,
            spark=data_frame.sparkSession,
        )

        match_conditions = self._build_match_conditions(data_frame, config)

        delta_table_merge = self._build_merge_operations(delta_table, data_frame, config, match_conditions)
        delta_table_merge.execute()
        self._report_delta_table_operation_metrics(
            table_identifier,
            operation_type=DeltaTableOperationType.MERGE,
        )

    @table_log_decorator(operation="stream_merge")
    def write_stream(self):
        """Not implemented yet. See docs for more details."""
        raise NotImplementedError(
            "Streaming merge is not implemented yet. Please use the `write` method for batch merges."
        )

write(data_frame, table=None, table_identifier=None, storage_path=None, ignore_empty_df=False, **kwargs)

Merges the data in a spark DataFrame into a Delta table.

This function performs a merge operation between a DataFrame and a Delta table. The function supports update, delete, and insert operations on the target Delta table based on conditions specified by the user. The function also supports partition pruning to optimize the performance of the merge operation.

When source and target tables have different column names, use the column_mapping parameter to map target column names to source column names. For any columns not in the mapping, the same name is assumed for both source and target.

Parameters:

Name Type Description Default
table Table | None

The Table object representing the Delta table.

None
table_identifier str | None

The identifier of the Delta table in the format 'catalog.schema.table'.

None
storage_path str | None

The location of the Delta table.

None
data_frame DataFrame

The DataFrame to be merged into the Delta table.

required
ignore_empty_df bool

A flag indicating whether to ignore an empty source dataframe.

False
kwargs Any

Passed to the DeltaMergeConfig. Common kwargs include:

  • key_columns: List of target column names to use as merge keys.
  • column_mapping: Dict mapping target column names to source column names.
  • when_matched_update: Whether to update matching records.
  • when_matched_delete: Whether to delete matching records.
  • when_not_matched_insert: Whether to insert non-matching records.
  • cols_to_exclude_from_update: Target columns to exclude from updates.
  • use_partition_pruning: Whether to use partition pruning.
  • partition_by: List of partition columns.
{}

Raises:

Type Description
ValueError

If neither a Table object nor both table_identifier and storage_path are provided.

EmptyDataframeException

If the source dataframe is empty and ignore_empty_df is False.

ValueError

If the specified columns for update or insert do not exist in the DataFrame or are explicitly excluded from the merge operation.

ValueError

If partition columns are not specified when using partition pruning.

Example
# Merge with different column names
writer.write(
    data_frame=source_df,
    table=target_table,
    key_columns=["customer_id"],
    column_mapping={
        "customer_id": "cust_id",
        "full_name": "name",
        "email_address": "email"
    }
)
Source code in src/cloe_nessy/integration/writer/delta_writer/delta_merge_writer.py
@table_log_decorator(operation="merge")
def write(
    self,
    data_frame: DataFrame,
    table: Table | None = None,
    table_identifier: str | None = None,
    storage_path: str | None = None,
    ignore_empty_df: bool = False,
    **kwargs: Any,
):
    """Merges the data in a spark DataFrame into a Delta table.

    This function performs a merge operation between a DataFrame and a Delta
    table. The function supports update, delete, and insert operations on
    the target Delta table based on conditions specified by the user. The
    function also supports partition pruning to optimize the performance of
    the merge operation.

    When source and target tables have different column names, use the
    `column_mapping` parameter to map target column names to source column names.
    For any columns not in the mapping, the same name is assumed for both source
    and target.

    Args:
        table: The Table object representing the Delta table.
        table_identifier: The identifier of the Delta table in the format
            'catalog.schema.table'.
        storage_path: The location of the Delta table.
        data_frame: The DataFrame to be merged into the Delta table.
        ignore_empty_df: A flag indicating whether to ignore an empty source
            dataframe.
        kwargs: Passed to the
            [`DeltaMergeConfig`][cloe_nessy.integration.writer.delta_writer.delta_merge_writer.DeltaMergeConfig].
            Common kwargs include:

            - key_columns: List of target column names to use as merge keys.
            - column_mapping: Dict mapping target column names to source column names.
            - when_matched_update: Whether to update matching records.
            - when_matched_delete: Whether to delete matching records.
            - when_not_matched_insert: Whether to insert non-matching records.
            - cols_to_exclude_from_update: Target columns to exclude from updates.
            - use_partition_pruning: Whether to use partition pruning.
            - partition_by: List of partition columns.

    Raises:
        ValueError: If neither a Table object nor both table_identifier
            and storage_path are provided.
        EmptyDataframeException: If the source dataframe is empty and
            ignore_empty_df is False.
        ValueError: If the specified columns for update or insert do not
            exist in the DataFrame or are explicitly excluded from the
            merge operation.
        ValueError: If partition columns are not specified when using
            partition pruning.

    Example:
        ```python
        # Merge with different column names
        writer.write(
            data_frame=source_df,
            table=target_table,
            key_columns=["customer_id"],
            column_mapping={
                "customer_id": "cust_id",
                "full_name": "name",
                "email_address": "email"
            }
        )
        ```
    """
    if self._empty_dataframe_check(data_frame, ignore_empty_df):
        return
    table_identifier, storage_path = self._validate_table_inputs(table, table_identifier, storage_path)

    config = DeltaMergeConfig(dataframe_columns=data_frame.columns, **kwargs)

    delta_table = self.table_manager.get_delta_table(
        table=table,
        location=storage_path if not table else None,
        spark=data_frame.sparkSession,
    )

    match_conditions = self._build_match_conditions(data_frame, config)

    delta_table_merge = self._build_merge_operations(delta_table, data_frame, config, match_conditions)
    delta_table_merge.execute()
    self._report_delta_table_operation_metrics(
        table_identifier,
        operation_type=DeltaTableOperationType.MERGE,
    )

write_stream()

Not implemented yet. See docs for more details.

Source code in src/cloe_nessy/integration/writer/delta_writer/delta_merge_writer.py
@table_log_decorator(operation="stream_merge")
def write_stream(self):
    """Streaming merges are not supported yet; this method always raises."""
    message = "Streaming merge is not implemented yet. Please use the `write` method for batch merges."
    raise NotImplementedError(message)