# catalog_writer

## CatalogWriter

A writer for Catalog tables.

Source code in `src/cloe_nessy/integration/writer/catalog_writer.py`
### `write(df, table_identifier, partition_by=None, options=None, mode='append')` *(staticmethod)*
Write a DataFrame to a Unity Catalog table.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame \| None` | The DataFrame to write. | *required* |
| `table_identifier` | `str \| None` | The table identifier in the Unity Catalog in the format `'catalog.schema.table'`. | *required* |
| `mode` | `str` | The write mode. One of `append`, `overwrite`, `error`, `errorifexists`, `ignore`. | `'append'` |
| `partition_by` | `str \| list[str] \| None` | Names of the partitioning columns. | `None` |
| `options` | `dict[str, str] \| None` | PySpark options for the `DataFrame.saveAsTable` operation (e.g. `mergeSchema:true`). | `None` |
Notes:

- `append`: Append contents of this DataFrame to existing data.
- `overwrite`: Overwrite existing data.
- `error` or `errorifexists`: Throw an exception if data already exists.
- `ignore`: Silently ignore this operation if data already exists.
Raises:

| Type | Description |
|---|---|
| `ValueError` | If the mode is not one of `append`, `overwrite`, `error`, `errorifexists`, `ignore`. |
| `ValueError` | If the `table_identifier` is not a string or not in the format `'catalog.schema.table'`. |
| `ValueError` | If the DataFrame is `None`. |
Source code in `src/cloe_nessy/integration/writer/catalog_writer.py`
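The argument checks listed under Raises can be sketched in plain Python. This is a minimal, hypothetical sketch for illustration only (`validate_write_args` and `VALID_MODES` are not part of the library's API; the actual validation in `catalog_writer.py` may differ):

```python
# Hypothetical sketch of the validation described in the Raises table.
# Not the library's code; names and logic are assumptions.
VALID_MODES = {"append", "overwrite", "error", "errorifexists", "ignore"}


def validate_write_args(df, table_identifier, mode="append"):
    """Raise ValueError for each failure condition documented above."""
    if mode not in VALID_MODES:
        raise ValueError(f"mode must be one of {sorted(VALID_MODES)}, got {mode!r}")
    if (
        not isinstance(table_identifier, str)
        or table_identifier.count(".") != 2
        or not all(table_identifier.split("."))
    ):
        raise ValueError(
            "table_identifier must be a string in the format 'catalog.schema.table'"
        )
    if df is None:
        raise ValueError("df must not be None")


# A well-formed call passes silently.
validate_write_args(df=object(), table_identifier="main.sales.orders", mode="overwrite")
```

A typical call against the real API (assuming an existing SparkSession and DataFrame `df`) would then look like `CatalogWriter.write(df, "catalog.schema.table", partition_by="date", options={"mergeSchema": "true"})`.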
### `write_stream(df, table_identifier, checkpoint_location=None, trigger_dict=None, options=None, mode='append', await_termination=False)` *(staticmethod)*
Write a streaming DataFrame to a Unity Catalog table.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame \| None` | The streaming DataFrame to write. | *required* |
| `table_identifier` | `str \| None` | The table identifier in the Unity Catalog in the format `'catalog.schema.table'`. | *required* |
| `checkpoint_location` | `str \| None` | Location for checkpointing. Required for stream recovery. | `None` |
| `trigger_dict` | `dict \| None` | A dictionary specifying the trigger configuration for the streaming query. Supported keys: `"processingTime"` (a time interval, e.g. `"10 seconds"`, for micro-batch processing), `"once"` (process all available data once, then stop), `"continuous"` (a time interval, e.g. `"1 second"`, for continuous processing), and `"availableNow"` (process all available data immediately, then stop). If nothing is provided, the default is `{"availableNow": True}`. | `None` |
| `options` | `dict[str, str] \| None` | PySpark options for the DataFrame streaming write operation. | `None` |
| `mode` | `str` | The write mode. For streaming, typically `"append"`. | `'append'` |
| `await_termination` | `bool` | If `True`, the function will wait for the streaming query to finish before returning. | `False` |
Raises:

| Type | Description |
|---|---|
| `ValueError` | If the mode is not supported for streaming operations. |
| `ValueError` | If the `table_identifier` is not a string or not in the format `'catalog.schema.table'`. |
| `ValueError` | If the DataFrame is `None`. |
| `ValueError` | If `checkpoint_location` is not provided. |
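The `trigger_dict` defaulting and key handling described above can be sketched in plain Python. This is a hypothetical illustration (`resolve_trigger` and `SUPPORTED_TRIGGER_KEYS` are not part of the library; the assumption is that the writer forwards these keys to PySpark's `DataStreamWriter.trigger`):

```python
# Hypothetical sketch of trigger_dict handling; not the library's code.
SUPPORTED_TRIGGER_KEYS = {"processingTime", "once", "continuous", "availableNow"}


def resolve_trigger(trigger_dict=None):
    """Apply the documented default and reject unsupported trigger keys."""
    if trigger_dict is None:
        # Documented default: process available data immediately, then stop.
        return {"availableNow": True}
    unknown = set(trigger_dict) - SUPPORTED_TRIGGER_KEYS
    if unknown:
        raise ValueError(f"unsupported trigger keys: {sorted(unknown)}")
    return dict(trigger_dict)
```

Under that assumption, the resolved dictionary would drive a streaming write along the lines of `df.writeStream.trigger(**resolve_trigger(trigger_dict)).option("checkpointLocation", checkpoint_location).toTable(table_identifier)`.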