Bases: PipelineAction
This class implements an Append action for an ETL pipeline.
The WriteDeltaAppendAction appends a Dataframe to Delta Table.
Example
Write Delta Append:
action: WRITE_DELTA_APPEND
options:
table_identifier: my_catalog.my_schema.my_table
ignore_empty_df: false
Source code in src/cloe_nessy/pipeline/actions/write_delta_append.py
| class WriteDeltaAppendAction(PipelineAction):
"""This class implements an Append action for an ETL pipeline.
The WriteDeltaAppendAction appends a Dataframe to Delta Table.
Example:
```yaml
Write Delta Append:
action: WRITE_DELTA_APPEND
options:
table_identifier: my_catalog.my_schema.my_table
ignore_empty_df: false
```
"""
name: str = "WRITE_DELTA_APPEND"
def run(
self,
context: PipelineContext,
*,
table_identifier: str | None = None,
ignore_empty_df: bool = False,
options: dict[str, Any] | None = None,
**_: Any,
) -> PipelineContext:
"""Merge the dataframe into the delta table.
Args:
context: Context in which this Action is executed.
table_identifier: The identifier of the table. If passed, the
UC Adapter will be used to create a table object. Otherwise the Table
object will be created from the table metadata in the context.
ignore_empty_df: A flag indicating whether to ignore an empty source dataframe.
options: Additional options for the append writer.
Raises:
ValueError: If the table does not exist.
ValueError: If the data is not set in the pipeline context.
ValueError: If the table metadata is empty.
Returns:
Pipeline Context
"""
delta_append_writer = DeltaAppendWriter()
if context.data is None:
raise ValueError("Data is required for the append operation.")
if context.table_metadata is None and table_identifier is None:
raise ValueError("Table metadata or a table identifier are required for the append operation.")
if table_identifier is not None:
context.table_metadata = UnityCatalogAdapter().get_table_by_name(table_identifier)
else:
if context.table_metadata is None:
raise ValueError("Table metadata is required.")
if context.table_metadata is None:
raise ValueError("Table metadata is required.")
delta_append_writer.write(
table_identifier=context.table_metadata.identifier,
table_location=context.table_metadata.storage_path,
data_frame=context.data,
ignore_empty_df=ignore_empty_df,
options=options,
)
runtime_info = getattr(context, "runtime_info", None)
if runtime_info and runtime_info.get("is_delta_load"):
consume_delta_load(runtime_info)
return context.from_existing()
|
run(context, *, table_identifier=None, ignore_empty_df=False, options=None, **_)
Merge the dataframe into the delta table.
Parameters:
| Name |
Type |
Description |
Default |
context
|
PipelineContext
|
Context in which this Action is executed.
|
required
|
table_identifier
|
str | None
|
The identifier of the table. If passed, the
UC Adapter will be used to create a table object. Otherwise the Table
object will be created from the table metadata in the context.
|
None
|
ignore_empty_df
|
bool
|
A flag indicating whether to ignore an empty source dataframe.
|
False
|
options
|
dict[str, Any] | None
|
Additional options for the append writer.
|
None
|
Raises:
| Type |
Description |
ValueError
|
If the table does not exist.
|
ValueError
|
If the data is not set in the pipeline context.
|
ValueError
|
If the table metadata is empty.
|
Returns:
Source code in src/cloe_nessy/pipeline/actions/write_delta_append.py
| def run(
self,
context: PipelineContext,
*,
table_identifier: str | None = None,
ignore_empty_df: bool = False,
options: dict[str, Any] | None = None,
**_: Any,
) -> PipelineContext:
"""Merge the dataframe into the delta table.
Args:
context: Context in which this Action is executed.
table_identifier: The identifier of the table. If passed, the
UC Adapter will be used to create a table object. Otherwise the Table
object will be created from the table metadata in the context.
ignore_empty_df: A flag indicating whether to ignore an empty source dataframe.
options: Additional options for the append writer.
Raises:
ValueError: If the table does not exist.
ValueError: If the data is not set in the pipeline context.
ValueError: If the table metadata is empty.
Returns:
Pipeline Context
"""
delta_append_writer = DeltaAppendWriter()
if context.data is None:
raise ValueError("Data is required for the append operation.")
if context.table_metadata is None and table_identifier is None:
raise ValueError("Table metadata or a table identifier are required for the append operation.")
if table_identifier is not None:
context.table_metadata = UnityCatalogAdapter().get_table_by_name(table_identifier)
else:
if context.table_metadata is None:
raise ValueError("Table metadata is required.")
if context.table_metadata is None:
raise ValueError("Table metadata is required.")
delta_append_writer.write(
table_identifier=context.table_metadata.identifier,
table_location=context.table_metadata.storage_path,
data_frame=context.data,
ignore_empty_df=ignore_empty_df,
options=options,
)
runtime_info = getattr(context, "runtime_info", None)
if runtime_info and runtime_info.get("is_delta_load"):
consume_delta_load(runtime_info)
return context.from_existing()
|