Index
ReadAPIAction
¶
Bases: PipelineAction
Reads data from an API and loads it into a Spark DataFrame.
This action executes HTTP requests (optionally paginated) in parallel using the
APIReader and returns a DataFrame
containing the response payloads plus request/response metadata. No intermediate
files are written.
Example
```yaml
Read API:
  action: READ_API
  options:
    base_url: https://some_url.com/api/
    endpoint: my/endpoint/
    auth:
      - type: basic
        username: my_username
        password: my_password
      - type: env
        header_template:
          "X-API-Key": "<ENV_VAR_NAME>"
      - type: secret_scope
        secret_scope: my_secret_scope
        header_template:
          "X-ORG-Token": "<SECRET_NAME>"
      - type: azure_oauth
        client_id: my_client_id
        client_secret: my_client_secret
        tenant_id: my_tenant_id
        scope: <entra-id-client-id>
```
When a list of auth entries is provided, they are combined (ChainedAuth) so that headers from env/secret_scope
are merged and auth flows like Basic / Azure OAuth are applied to each request.
If the API returns a large JSON object but you only want a nested list (e.g. data.items):
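A minimal sketch using the key option from the run method below (URL and field names are illustrative):

```yaml
Read API:
  action: READ_API
  options:
    base_url: https://some_url.com/api/
    endpoint: my/endpoint/
    key: data.items  # keep only this nested field from each response payload
```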
Only page_based and limit_offset strategies are currently supported. You may also
supply the shared/advanced options check_field, next_page_field, max_page,
pages_per_array_limit, and preliminary_probe.
1) Page-Based Pagination
```yaml
Read API:
  action: READ_API
  options:
    base_url: https://some_url.com/api/
    endpoint: items/
    params:
      page: 1        # starting page (optional; defaults to 1)
      per_page: 100
    pagination:
      strategy: page_based
      page_field: page                 # required
      # Shared/advanced (optional):
      check_field: results             # e.g. list to check for emptiness
      next_page_field: info.has_next   # boolean flag; if present it is trusted
      max_page: -1                     # -1 = all pages
      pages_per_array_limit: 2         # chunk output rows every 2 pages
      preliminary_probe: false         # set true to pre-scan/build all page params
```
2) Limit/Offset Pagination
```yaml
Read API:
  action: READ_API
  options:
    base_url: https://some_url.com/api/
    endpoint: products/
    params:
      limit: 50
      offset: 0
    pagination:
      strategy: limit_offset
      limit_field: limit     # required
      offset_field: offset   # required
      # Shared/advanced (optional):
      check_field: data.items
      next_page_field: page_info.has_next
      max_page: -1
      pages_per_array_limit: -1
      preliminary_probe: false
```

This issues requests like:

```
GET .../products/?limit=50&offset=0
GET .../products/?limit=50&offset=50
GET .../products/?limit=50&offset=100
...
```
Using preliminary_probe to pre-compute all pages
If preliminary_probe: true is set, the reader will first probe the API to determine
the final page (using check_field and/or next_page_field) and then fan out one request
per page/offset—useful when driving fully parallel execution:
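An illustrative sketch, reusing the page_based options from above (values are placeholders):

```yaml
Read API:
  action: READ_API
  options:
    base_url: https://some_url.com/api/
    endpoint: items/
    params:
      per_page: 100
    pagination:
      strategy: page_based
      page_field: page
      check_field: results
      preliminary_probe: true  # probe first, then issue one request per page in parallel
```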
When requests_from_context: true, distinct rows from the upstream context.data
are converted into individual requests (enabling heterogeneous endpoints/params).
The DataFrame must have columns: endpoint, params, headers, data, json_body.
```yaml
# Upstream step produces rows like:
# | endpoint        | params               | headers | data | json_body |
# | "u/123/profile" | {"verbose": "true"}  | null    | null | null      |
# | "u/456/profile" | {"verbose": "false"} | null    | null | null      |
Read API:
  action: READ_API
  options:
    base_url: https://some_url.com/api/
    requests_from_context: true
    method: GET
    timeout: 45
```
Output
The action returns a Spark DataFrame with one column json_response (ArrayType).
Each element contains:
```json
{
  "response": "<json string of the API payload (optionally reduced by 'key')>",
  "__metadata": {
    "timestamp": "YYYY-MM-DD HH:MM:SS.ssssss",
    "base_url": "https://some_url.com/api/",
    "url": "https://some_url.com/api/my/endpoint/?q=...",
    "status_code": 200,
    "reason": "OK",
    "elapsed": 0.123,
    "endpoint": "my/endpoint/",
    "query_parameters": { "q": "..." }
  }
}
```
If pages_per_array_limit > 0, responses are chunked into arrays of that many pages;
otherwise all pages for a request are grouped together.
Validation & Errors:
- base_url must be provided.
- Either endpoint must be provided or requests_from_context must be true.
- If requests_from_context is true, context.data must be present and non-empty.
- Pagination config:
- strategy must be page_based or limit_offset (other strategies are not yet supported).
- For page_based, page_field is required.
- For limit_offset, both limit_field and offset_field are required.
Secret information
Don't write sensitive information like passwords or tokens directly in the pipeline configuration. Use secret scopes or environment variables instead.
Source code in src/cloe_nessy/pipeline/actions/read_api.py
run(context, *, base_url=None, auth=None, endpoint=None, default_headers=None, method='GET', key=None, timeout=30, params=None, headers=None, data=None, json_body=None, pagination=None, max_retries=0, backoff_factor=0, max_concurrent_requests=8, requests_from_context=False, **_)
¶
Executes API requests in parallel by using mapInPandas.
We do NOT write intermediate files; instead we directly return the responses as rows in a Spark DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | The pipeline context used to carry data between actions. | *required* |
| `base_url` | str \| None | The base URL for all API requests. | None |
| `auth` | Mapping[str, str \| Mapping[str, str] \| list[Mapping[str, str]]] \| None | Authentication configuration, which may be a simple header map, a nested map for different auth scopes, or a list thereof. | None |
| `endpoint` | str \| None | The specific path to append to the base URL for this call. | None |
| `default_headers` | dict[str, Any] \| None | Headers to include on every request. | None |
| `method` | str | HTTP method to use. | 'GET' |
| `key` | str \| None | JSON field name to extract from each response. | None |
| `timeout` | int | Request timeout in seconds. | 30 |
| `params` | dict[str, Any] \| None | Query parameters to append to the URL. | None |
| `headers` | dict[str, Any] \| None | Additional request-specific headers. | None |
| `data` | dict[str, Any] \| None | Form-encoded body to send. | None |
| `json_body` | dict[str, Any] \| None | JSON-encoded body to send. | None |
| `pagination` | PaginationConfigData \| None | Configuration for paginated endpoints. | None |
| `max_retries` | int | Number of times to retry on failure. | 0 |
| `backoff_factor` | int | Multiplier for retry backoff delays. | 0 |
| `max_concurrent_requests` | int | Maximum number of parallel API calls. | 8 |
| `requests_from_context` | bool | Whether to derive request parameters from context data. | False |

Returns:

| Type | Description |
|---|---|
| PipelineContext | The updated context, with the read data as a DataFrame. |

Raises:

| Type | Description |
|---|---|
| ValueError | If no base URL is provided. |
| ValueError | If neither an endpoint nor context-derived requests are specified. |
| ValueError | If context-derived requests are enabled but no data is present in context. |
Source code in src/cloe_nessy/pipeline/actions/read_api.py
ReadCatalogTableAction
¶
Bases: PipelineAction
Reads a table from Unity Catalog using a specified table identifier and optional reader configurations.
This function retrieves data from a catalog table using the
CatalogReader identified
by either the table_identifier parameter or the table_metadata from the
provided PipelineContext of a previous step. The retrieved data is loaded
into a DataFrame and returned as part of an updated PipelineContext.
Example
```yaml
Read Sales Table:
  action: READ_CATALOG_TABLE
  options:
    table_identifier: my_catalog.business_schema.sales_table
    options: <options for the CatalogReader read method>
    delta_load_options:
      strategy: CDF
      delta_load_identifier: my_delta_load_id
      strategy_options:
        deduplication_columns: ["id"]
      enable_full_load: true
```
Source code in src/cloe_nessy/pipeline/actions/read_catalog_table.py
run(context, *, table_identifier=None, options=None, delta_load_options=None, stream=False, **_)
¶
Reads a table from Unity Catalog using a specified table identifier and optional reader configurations.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | The pipeline's context, which contains metadata and configuration for the action. | *required* |
| `table_identifier` | str \| None | The identifier of the catalog table to read. If not provided, the function will attempt to use the table identifier from the table metadata in the context. | None |
| `options` | dict[str, str] \| None | A dictionary of options for customizing the CatalogReader read method. | None |
| `delta_load_options` | dict[Any, Any] \| DeltaLoadOptions \| None | Options for delta loading, if applicable. Configures the delta load strategy and its options. | None |
| `stream` | bool | If True, the action will read the table as a stream. | False |

Raises:

| Type | Description |
|---|---|
| ValueError | If neither a table identifier nor table metadata from the context is provided. |
Returns: An updated pipeline context containing the data read from the catalog table as a DataFrame.
Source code in src/cloe_nessy/pipeline/actions/read_catalog_table.py
ReadExcelAction
¶
Bases: PipelineAction
Reads data from an Excel file or directory of Excel files and returns a DataFrame.
The function reads Excel files using the
ExcelDataFrameReader either
from a single file or a directory path. It can read specific sheets, handle
file extensions, and offers various options to customize how the data is
read, such as specifying headers, index columns, and handling missing
values. The resulting data is returned as a DataFrame, and metadata about
the read files can be included in the context.
Example
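An illustrative sketch using options documented in the run method below (paths and sheet name are placeholders):

```yaml
Read Excel:
  action: READ_EXCEL
  options:
    path: excel_file_folder/
    extension: xlsx
    recursive: true
    sheet_name: Sales
    sheet_name_as_column: true
```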
More Options
The READ_EXCEL action supports additional options that can be passed to the
run method. For more information, refer to the method documentation.
Source code in src/cloe_nessy/pipeline/actions/read_excel.py
run(context, *, file=None, path=None, extension='xlsx', recursive=False, sheet_name=0, sheet_name_as_column=False, header=0, index_col=None, usecols=None, dtype=None, fillna=None, true_values=None, false_values=None, nrows=None, na_values=None, keep_default_na=True, parse_dates=False, date_parser=None, thousands=None, include_index=False, options=None, add_metadata_column=True, load_as_strings=False, **_)
¶
Reads data from an Excel file or directory of Excel files and returns a DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | The context in which the action is executed. | *required* |
| `file` | str \| None | The path to a single Excel file. Either `file` or `path` must be provided. | None |
| `path` | str \| None | The directory path containing multiple Excel files. Either `file` or `path` must be provided. | None |
| `extension` | str | The file extension to look for when reading from a directory. | 'xlsx' |
| `recursive` | bool | Whether to include subdirectories when reading from a directory path. | False |
| `sheet_name` | str \| int \| list | The sheet name(s) or index(es) to read from the Excel file. | 0 |
| `sheet_name_as_column` | bool | Whether to add a column with the sheet name to the DataFrame. | False |
| `header` | int \| list[int] | Row number(s) to use as the column labels. | 0 |
| `index_col` | int \| list[int] \| None | Column(s) to use as the index of the DataFrame. | None |
| `usecols` | int \| str \| list \| Callable \| None | Subset of columns to parse. Can be an integer, string, list, or function. | None |
| `dtype` | str \| None | Data type for the columns. | None |
| `fillna` | str \| dict[str, list[str]] \| dict[str, str] \| None | Method or value to use to fill NaN values. | None |
| `true_values` | list \| None | Values to consider as True. | None |
| `false_values` | list \| None | Values to consider as False. | None |
| `nrows` | int \| None | Number of rows to parse. | None |
| `na_values` | list[str] \| dict[str, list[str]] \| None | Additional strings to recognize as NaN/NA. | None |
| `keep_default_na` | bool | Whether to append the default NaN values when custom `na_values` are specified. | True |
| `parse_dates` | bool \| list \| dict | Options for parsing date columns. | False |
| `date_parser` | Callable \| None | Function to use for converting strings to datetime objects. | None |
| `thousands` | str \| None | Thousands separator to use when parsing numeric columns. | None |
| `include_index` | bool | Whether to include an index column in the output DataFrame. | False |
| `options` | dict \| None | Additional options to pass to the DataFrame reader. | None |
| `add_metadata_column` | bool | Whether to add a metadata column with file information to the DataFrame. | True |
| `load_as_strings` | bool | Whether to load all columns as strings. | False |

Raises:

| Type | Description |
|---|---|
| ValueError | Raised if both `file` and `path` are provided, or if neither is provided. |

Returns:

| Type | Description |
|---|---|
| PipelineContext | The updated context, with the read data as a DataFrame. |
Source code in src/cloe_nessy/pipeline/actions/read_excel.py
ReadFilesAction
¶
Bases: PipelineAction
Reads files from a specified location.
If an extension is provided, all files with the given extension will be read
using the FileReader. If no
extension is provided, the spark_format must be set, and all files in the
location will be read using a DataFrameReader with the specified format.
Example
```yaml
Read Files:
  action: READ_FILES
  options:
    location: json_file_folder/
    search_subdirs: True
    spark_format: JSON
```

Define Spark Format

Use the spark_format option to specify the format with which
to read the files. Supported formats are e.g., CSV, JSON,
PARQUET, TEXT, and XML.

```yaml
Read Files:
  action: READ_FILES
  options:
    location: csv_file_folder/
    search_subdirs: True
    extension: csv
```

Define Extension

Use the extension option to specify the extension of the files
to read. If spark_format is not specified, it will be derived
from the extension.

```yaml
Read Files:
  action: READ_FILES
  options:
    location: file_folder/
    extension: abc_custom_extension  # specifies the files to read
    spark_format: CSV                # specifies the format to read the files with
```

Define both Extension & Spark Format

Use the extension option to specify the extension of the files
to read. Additionally, use the spark_format option to specify
the format with which to read the files.

```yaml
Read Delta Files:
  action: READ_FILES
  options:
    location: /path/to/delta/table
    spark_format: delta
    delta_load_options:
      strategy: CDF
      delta_load_identifier: my_delta_files_load
      strategy_options:
        deduplication_columns: ["id"]
      enable_full_load: false
```
Delta Loading for Files
Use delta_load_options when reading Delta Lake tables to enable
incremental loading. This works with both CDF and timestamp strategies.
Source code in src/cloe_nessy/pipeline/actions/read_files.py
run(context, *, location=None, search_subdirs=False, extension=None, spark_format=None, schema=None, add_metadata_column=True, options=None, delta_load_options=None, **_)
¶
Reads files from a specified location.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | The context in which this Action is executed. | *required* |
| `location` | str \| None | The location from which to read files. | None |
| `search_subdirs` | bool | Recursively search subdirectories for files if an extension is provided. | False |
| `extension` | str \| None | The file extension to filter files by. | None |
| `spark_format` | str \| None | The format to use for reading the files. If not provided, it will be derived from the file extension. | None |
| `schema` | str \| None | The schema of the data. If None, schema is obtained from the context metadata. | None |
| `add_metadata_column` | bool | Whether to include the metadata column with file information. | True |
| `options` | dict[str, str] \| None | Additional options passed to the reader. | None |
| `delta_load_options` | dict[Any, Any] \| DeltaLoadOptions \| None | Options for delta loading, if applicable. When provided for Delta format files, enables incremental loading using delta loader strategies. | None |

Raises:

| Type | Description |
|---|---|
| ValueError | If neither an extension nor a Spark format is provided. |

Returns:

| Type | Description |
|---|---|
| PipelineContext | The context after the Action has been executed, containing the read data as a DataFrame. |
Source code in src/cloe_nessy/pipeline/actions/read_files.py
ReadMetadataYAMLAction
¶
Bases: PipelineAction
Reads table metadata from a yaml file using the Table model.
Example
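An illustrative sketch; the action name is assumed from the naming pattern of the other actions, and the option names come from the run method below (paths and names are placeholders):

```yaml
Read Table Metadata:
  action: READ_METADATA_YAML
  options:
    file_path: metadata/sales_table.yaml
    catalog_name: my_catalog
    schema_name: business_schema
```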
Source code in src/cloe_nessy/pipeline/actions/read_metadata_yaml.py
run(context, *, file_path=None, catalog_name=None, schema_name=None, storage_path=None, **_)
¶
Reads table metadata from a yaml file using the Table model.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | The context in which this Action is executed. | *required* |
| `file_path` | str \| None | The path to the file that defines the table. | None |
| `catalog_name` | str \| None | The name of the catalog for the table. | None |
| `schema_name` | str \| None | The name of the schema for the table. | None |
| `storage_path` | str \| None | The storage path for the table, if applicable. If not provided, the table will be considered a managed table. | None |

Raises:

| Type | Description |
|---|---|
| ValueError | If any issues occur while reading the table metadata, such as an invalid table, missing file, missing path, or missing catalog/schema names. |

Returns:

| Type | Description |
|---|---|
| PipelineContext | The context after the execution of this Action, containing the table metadata. |
Source code in src/cloe_nessy/pipeline/actions/read_metadata_yaml.py
TransformChangeDatatypeAction
¶
Bases: PipelineAction
Changes the datatypes of specified columns in the given DataFrame.
Data Types
We make use of the PySpark cast function to change the data types of
the columns. Valid data types can be found in the PySpark
documentation.
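An illustrative sketch (the action name is assumed from the naming pattern of the other actions; the columns mapping comes from the run method below):

```yaml
Change Datatypes:
  action: TRANSFORM_CHANGE_DATATYPE
  options:
    columns:
      revenue: double
      order_date: date
```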
Source code in src/cloe_nessy/pipeline/actions/transform_change_datatype.py
run(context, *, columns=None, **_)
¶
Changes the datatypes of specified columns in the given DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | The context in which this Action is executed. | *required* |
| `columns` | dict[str, str] \| None | A dictionary where the key is the column name and the value is the desired datatype. | None |

Raises:

| Type | Description |
|---|---|
| ValueError | If no columns are provided. |
| ValueError | If the data from context is None. |

Returns:

| Type | Description |
|---|---|
| PipelineContext | The context after the execution of this Action, containing the DataFrame with updated column datatypes. |
Source code in src/cloe_nessy/pipeline/actions/transform_change_datatype.py
TransformCleanColumnNamesAction
¶
Bases: PipelineAction
Fixes column names in the DataFrame to be valid.
Removes invalid characters from the column names (including the fields of structs) and replaces a single leading underscore with a double underscore.
Invalid characters include
- Any non-word character (anything other than letters, digits, and underscores).
- A single leading underscore.
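A minimal sketch (the action name is assumed from the naming pattern of the other actions; the run method takes no options besides the context):

```yaml
Clean Column Names:
  action: TRANSFORM_CLEAN_COLUMN_NAMES
```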
Source code in src/cloe_nessy/pipeline/actions/transform_clean_column_names.py
run(context, **_)
¶
Fixes column names in the DataFrame to be valid.
Removes invalid characters from the column names (including the fields of structs) and replaces a single leading underscore with a double underscore.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | The context in which this Action is executed. | *required* |

Raises:

| Type | Description |
|---|---|
| ValueError | If the data from the context is None. |

Returns:

| Type | Description |
|---|---|
| PipelineContext | The context after the execution of this Action, containing the DataFrame with cleaned column names. |
Source code in src/cloe_nessy/pipeline/actions/transform_clean_column_names.py
TransformConcatColumnsAction
¶
Bases: PipelineAction
Concatenates the specified columns in the given DataFrame.
Example
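An illustrative sketch (the action name is assumed from the naming pattern of the other actions; options come from the run method below):

```yaml
Concat Name Columns:
  action: TRANSFORM_CONCAT_COLUMNS
  options:
    name: full_name
    columns:
      - first_name
      - last_name
    separator: " "
```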
Source code in src/cloe_nessy/pipeline/actions/transform_concat_columns.py
run(context, *, name='', columns=None, separator=None, **_)
¶
Concatenates the specified columns in the given DataFrame.
Warning

Null Handling Behavior

The behavior of null handling differs based on whether a separator is provided:

- When `separator` is specified: the function uses Spark's `concat_ws`, which ignores `NULL` values. In this case, `NULL` values are treated as empty strings ("") and are excluded from the final concatenated result.
- When `separator` is not specified: the function defaults to using Spark's `concat`, which returns `NULL` if any of the concatenated values is `NULL`. This means the presence of a `NULL` in any input will make the entire output `NULL`.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | The context in which this Action is executed. | *required* |
| `name` | str | The name of the new concatenated column. | '' |
| `columns` | list[str] \| None | A list of columns to be concatenated. | None |
| `separator` | str \| None | The separator used between concatenated column values. | None |

Raises:

| Type | Description |
|---|---|
| ValueError | If no name is provided. |
| ValueError | If no columns are provided. |
| ValueError | If the data from context is None. |
| ValueError | If 'columns' is not a list. |

Returns:

| Type | Description |
|---|---|
| PipelineContext | The context after the execution of this Action, containing the DataFrame with the concatenated column. |
Source code in src/cloe_nessy/pipeline/actions/transform_concat_columns.py
TransformConvertTimestampAction
¶
Bases: PipelineAction
This action performs timestamp based conversions.
Example
Source code in src/cloe_nessy/pipeline/actions/transform_convert_timestamp.py
run(context, *, columns=None, source_format='', target_format='', **_)
¶
Converts column(s) from a given source format to a new format.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | Context in which this Action is executed. | *required* |
| `columns` | list[str] \| str \| None | A column name or a list of column names that should be converted. | None |
| `source_format` | str | Initial format type of the column. | '' |
| `target_format` | str | Desired format type of the column. This also supports passing a date/time format string. | '' |

Raises:

| Type | Description |
|---|---|
| ValueError | If no column, source_format or target_format are provided. |
| ValueError | If source_format or target_format are not supported. |

Returns:

| Name | Type | Description |
|---|---|---|
| PipelineContext | PipelineContext | Context after the execution of this Action. |
Source code in src/cloe_nessy/pipeline/actions/transform_convert_timestamp.py
TransformDecodeAction
¶
Bases: PipelineAction
Decodes values of a specified column in the DataFrame based on the given format.
Example
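An illustrative sketch (the action name is assumed from the naming pattern of the other actions; options come from the run method below):

```yaml
Decode Payload:
  action: TRANSFORM_DECODE
  options:
    column: payload
    input_format: json
    # schema is optional; if omitted it is inferred from the first row
```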
Source code in src/cloe_nessy/pipeline/actions/transform_decode.py
run(context, *, column=None, input_format=None, schema=None, **_)
¶
Decodes values of a specified column in the DataFrame based on the given format.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | The context in which this Action is executed. | *required* |
| `column` | str \| None | The name of the column that should be decoded. | None |
| `input_format` | str \| None | The format from which the column should be decoded. Currently supported formats are 'base64' and 'json'. | None |
| `schema` | str \| None | For JSON input, the schema of the JSON object. If empty, the schema is inferred from the first row of the DataFrame. For base64 input, the data type to which the column is cast. | None |

Raises:

| Type | Description |
|---|---|
| ValueError | If no column is specified. |
| ValueError | If no input_format is specified. |
| ValueError | If the data from context is None. |
| ValueError | If an invalid input_format is provided. |

Returns:

| Type | Description |
|---|---|
| PipelineContext | The context after the execution of this Action, containing the DataFrame with the decoded column(s). |
Source code in src/cloe_nessy/pipeline/actions/transform_decode.py
TransformDeduplication
¶
Bases: PipelineAction
Deduplicates the data from the given DataFrame.
This method deduplicates the data where the key columns are the same and keeps the entry with the highest values in the order_by_columns (can be changed to lowest by setting the parameter descending to false).
Example
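An illustrative sketch (the action name is assumed from the naming pattern of the other actions; options come from the run method below):

```yaml
Deduplicate Orders:
  action: TRANSFORM_DEDUPLICATION
  options:
    key_columns:
      - order_id
    order_by_columns:
      - updated_at
    descending: true  # keep the entry with the highest updated_at per order_id
```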
Source code in src/cloe_nessy/pipeline/actions/transform_deduplication.py
run(context, *, key_columns=None, order_by_columns=None, descending=True, **_)
¶
Deduplicates the data based on key columns and order by columns.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | The context in which this Action is executed. | *required* |
| `key_columns` | list[str] \| None | A list of the key column names. The returned data only keeps one line of data with the same key columns. | None |
| `order_by_columns` | list[str] \| None | A list of order by column names. The returned data keeps the first line of data with the same key columns ordered by these columns. | None |
| `descending` | bool | Whether to sort descending or ascending. | True |

Raises:

| Type | Description |
|---|---|
| ValueError | If no key_columns are specified. |
| ValueError | If no order_by_columns are specified. |
| ValueError | If the data from context is None. |
| ValueError | If key_columns and order_by_columns overlap. |
| ValueError | If key_columns or order_by_columns contain Nulls. |

Returns:

| Type | Description |
|---|---|
| PipelineContext | The context after the execution of this Action, containing the DataFrame with the deduplicated data. |
Source code in src/cloe_nessy/pipeline/actions/transform_deduplication.py
TransformDistinctAction
¶
Bases: PipelineAction
Selects distinct rows from the DataFrame in the given context.
If a subset is given these columns are used for duplicate comparison. If no subset is given all columns are used.
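An illustrative sketch (the action name is assumed from the naming pattern of the other actions; the subset option comes from the run method below):

```yaml
Distinct Customers:
  action: TRANSFORM_DISTINCT
  options:
    subset:
      - customer_id
      - order_date
```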
Source code in src/cloe_nessy/pipeline/actions/transform_distinct.py
run(context, *, subset=None, **_)
¶
Selects distinct rows from the DataFrame in the given context.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | The context in which this Action is executed. | *required* |
| `subset` | list[str] \| None | List of column names to use for duplicate comparison (default: all columns). | None |

Raises:

| Type | Description |
|---|---|
| ValueError | If the data from the context is None. |

Returns:

| Type | Description |
|---|---|
| PipelineContext | The context after the execution of this Action, containing the DataFrame with distinct rows. |
Source code in src/cloe_nessy/pipeline/actions/transform_distinct.py
TransformFilterAction
¶
Bases: PipelineAction
Filters the DataFrame in the given context based on a specified condition.
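An illustrative sketch (the action name is assumed from the naming pattern of the other actions; the condition option comes from the run method below):

```yaml
Filter Revenue:
  action: TRANSFORM_FILTER
  options:
    condition: revenue > 1000 AND country = 'DE'
```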
Source code in src/cloe_nessy/pipeline/actions/transform_filter.py
run(context, *, condition='', **_)
¶
Filters the DataFrame in the given context based on a specified condition.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | Context in which this Action is executed. | *required* |
| `condition` | str | A SQL-like expression used to filter the DataFrame. | '' |

Raises:

| Type | Description |
|---|---|
| ValueError | If no condition is provided. |
| ValueError | If the data from the context is None. |

Returns:

| Type | Description |
|---|---|
| PipelineContext | Context after the execution of this Action, containing the filtered DataFrame. |
Source code in src/cloe_nessy/pipeline/actions/transform_filter.py
TransformGroupAggregate
¶
Bases: PipelineAction
Performs aggregation operations on grouped data within a DataFrame.
This class allows you to group data by specified columns and apply various aggregation functions to other columns. The aggregation functions can be specified as a dictionary where keys are column names and values are either a single aggregation function or a list of functions.
The output DataFrame will contain the grouped columns and the aggregated columns with the aggregation function as a prefix to the column name.
Example
```yaml
Transform Group Aggregate:
  action: TRANSFORM_GROUP_AGGREGATE
  options:
    grouping_columns:
      - column1
      - column2
    aggregations:
      column3:
        - sum
        - avg
      column4: max
```
This example groups the DataFrame by column1 and column2 and aggregates column3 by sum and average
and column4 by max. The resulting DataFrame will contain the grouped columns column1 and column2
and the aggregated columns sum_column3, avg_column3, and max_column4.
Source code in src/cloe_nessy/pipeline/actions/transform_group_aggregate.py
run(context, *, grouping_columns=None, aggregations=None, **_)
¶
Executes the aggregation on the grouped data.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | The context in which this action is executed. | *required* |
| `grouping_columns` | list[str] \| None | A list of columns to group by. | None |
| `aggregations` | dict[str, str \| list] \| None | A dictionary where keys are column names and values are either a single aggregation function or a list of functions. | None |

Raises:

| Type | Description |
|---|---|
| ValueError | If the context data is None. |
| ValueError | If no aggregations are provided. |
| ValueError | If invalid aggregation operations are provided. |
| ValueError | If columns with unsupported data types are included in the aggregations. |

Returns:

| Name | Type | Description |
|---|---|---|
| PipelineContext | PipelineContext | The context after the execution of this action. |
Source code in src/cloe_nessy/pipeline/actions/transform_group_aggregate.py
TransformHashColumnsAction
¶
Bases: PipelineAction
Hashes specified columns in a DataFrame using a chosen algorithm.
Given the following hash_config:
Example
Given a DataFrame df with the following structure:
| column1 | column2 | column3 |
|---|---|---|
| foo | bar | baz |
After running the action, the resulting DataFrame will look like:
| column1 | column2 | column3 | hashed_column1 | hashed_column2 |
|---|---|---|---|---|
| foo | bar | baz | 17725b837e9c896e7123b142eb980131dcc0baa6160db45d4adfdb21 | 1670361220 |
Hash values might vary
The actual hash values will depend on the hashing algorithm used and the input data.
Source code in src/cloe_nessy/pipeline/actions/transform_hash_columns.py
run(context, *, hash_config=None, **_)
¶
Hashes the specified columns in the DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | Context in which this Action is executed. | *required* |
| `hash_config` | HashConfig \| None | Dictionary that contains the configuration for executing the hashing. | None |

Returns:

| Type | Description |
|---|---|
| PipelineContext | Updated PipelineContext with hashed columns. |

Raises:

| Type | Description |
|---|---|
| ValueError | If columns are missing, data is None, or algorithm/bits are invalid. |
| ValueError | If the hash configuration is invalid. |
Source code in src/cloe_nessy/pipeline/actions/transform_hash_columns.py
TransformJoinAction
¶
Bases: PipelineAction
Joins the current DataFrame with another DataFrame defined in joined_data.
The join operation is performed based on specified columns and the type of
join indicated by the how parameter. Supported join types are listed in the PySpark documentation.
Examples:
```yaml
Join Tables:
  action: TRANSFORM_JOIN
  options:
    joined_data: ((step:Load Conditions Table))
    join_condition: |
      left.material = right.material
      AND right.sales_org = '10'
      AND right.distr_chan = '10'
      AND right.knart = 'ZUVP'
      AND right.lovmkond <> 'X'
      AND right.sales_unit = 'ST'
      AND left.calday BETWEEN
        to_date(right.date_from, 'yyyyMMdd') AND
        to_date(right.date_to, 'yyyyMMdd')
    how: left
```
Referencing a DataFrame from another step
The joined_data parameter is a reference to the DataFrame from another step.
The DataFrame is accessed using the result attribute of the PipelineStep. The syntax
for referencing the DataFrame is ((step:Step Name)), mind the double parentheses.
Dictionary Join Syntax
When using a dictionary for join_on, the keys represent columns
from the DataFrame in context and the values represent columns from
the DataFrame in joined_data. This is useful when joining tables
with different column names for the same logical entity.
Complex Join Conditions
Use join_condition instead of join_on for complex joins with literals,
expressions, and multiple conditions. Reference columns using left.column_name
for the main DataFrame and right.column_name for the joined DataFrame.
Supports all PySpark functions and operators.
Source code in src/cloe_nessy/pipeline/actions/transform_join.py
run(context, *, joined_data=None, join_on=None, join_condition=None, how='inner', **_)
¶
Joins the current DataFrame with another DataFrame defined in joined_data.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | Context in which this Action is executed. | *required* |
| `joined_data` | PipelineStep \| None | The PipelineStep context defining the DataFrame to join with as the right side of the join. | None |
| `join_on` | list[str] \| str \| dict[str, str] \| None | A string for the join column name, a list of column names, or a dictionary mapping columns from the left DataFrame to the right DataFrame. This defines the condition for the join operation. Mutually exclusive with join_condition. | None |
| `join_condition` | str \| None | A string containing a complex join expression with literals, functions, and multiple conditions. Use 'left.' and 'right.' prefixes to reference columns from respective DataFrames. Mutually exclusive with join_on. | None |
| `how` | str | The type of join to perform. Must be one of: inner, cross, outer, full, fullouter, left, leftouter, right, rightouter, semi, anti, etc. | 'inner' |

Raises:

| Type | Description |
|---|---|
| ValueError | If no joined_data is provided. |
| ValueError | If neither join_on nor join_condition is provided. |
| ValueError | If both join_on and join_condition are provided. |
| ValueError | If the data from context is None. |
| ValueError | If the data from the joined_data is None. |

Returns:

| Type | Description |
|---|---|
| PipelineContext | Context after the execution of this Action, containing the result of the join operation. |
Source code in src/cloe_nessy/pipeline/actions/transform_join.py
TransformJsonNormalize
¶
Bases: PipelineAction
Normalizes and flattens the DataFrame by exploding array columns and flattening struct columns.
The method performs recursive normalization on the DataFrame present in the context, ensuring that the order of columns is retained and new columns created by flattening structs are appended after existing columns.
Example
Example Input Data:

| id | name | coordinates | attributes |
|---|---|---|---|
| 1 | Alice | [10.0, 20.0] | {"age": 30, "city": "NY"} |
| 2 | Bob | [30.0, 40.0] | {"age": 25, "city": "LA"} |
Example Output Data:
| id | name | coordinates | attributes_age | attributes_city |
|---|---|---|---|---|
| 1 | Alice | [10.0, 20.0] | 30 | NY |
| 2 | Bob | [30.0, 40.0] | 25 | LA |
Source code in src/cloe_nessy/pipeline/actions/transform_json_normalize.py
run(context, *, exclude_columns=None, **_)
¶
Executes the normalization process on the DataFrame present in the context.
Please note that columns retain their relative order during the normalization process, and new columns created by flattening structs are appended after the existing columns.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | The pipeline context that contains the DataFrame to be normalized. | *required* |
| `exclude_columns` | list[str] \| None | A list of column names to exclude from the normalization process. These columns will not be exploded or flattened. | None |
| `**_` | Any | Additional keyword arguments (not used). | {} |

Returns:

| Type | Description |
|---|---|
| PipelineContext | A new pipeline context with the normalized DataFrame. |

Raises:

| Type | Description |
|---|---|
| ValueError | If the DataFrame in the context is None. |
Source code in src/cloe_nessy/pipeline/actions/transform_json_normalize.py
TransformRegexExtract
¶
Bases: PipelineAction
Extract values from a specified column in a DataFrame using regex patterns.
This action extracts values from a column based on a regex pattern and stores the result in a new column. Optionally, you can replace the matched pattern in the original column with a different string, remove the original column, or add a boolean column indicating which rows matched the pattern.
Example
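An illustrative sketch of the single-column case (the action name is assumed from the naming pattern of the other actions; options come from the run method below):

```yaml
Extract Order Id:
  action: TRANSFORM_REGEX_EXTRACT
  options:
    source_column_name: raw_text
    pattern: 'order-(\d+)'
    extract_column_name: order_id
    keep_original_column: true
```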
This action also supports processing multiple columns simultaneously. To use this functionality, structure the configuration as a dictionary mapping each source column name to its extraction parameters.
Example
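An illustrative sketch of the multi-column case; the exact shape of the per-column parameters is assumed to mirror the single-column options:

```yaml
Extract Multiple Columns:
  action: TRANSFORM_REGEX_EXTRACT
  options:
    extract_columns:
      raw_text:
        pattern: 'order-(\d+)'
        extract_column_name: order_id
      raw_email:
        pattern: '@(.+)$'
        extract_column_name: email_domain
```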
Source code in src/cloe_nessy/pipeline/actions/transform_regex_extract.py
run(context, source_column_name='', extract_column_name='', pattern='', keep_original_column=True, replace_by='', match_info_column_name='', extract_columns=None, **_)
¶
Performs a regex extract (and replace) on a specified column in a DataFrame.
This function performs a regex extract (and optionally a replace) on one or more columns.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | The context in which this action is executed. | *required* |
| `source_column_name` | str | Column name to perform the regex extract on. | '' |
| `pattern` | str | Regex pattern to match. | '' |
| `replace_by` | str | String that should replace the extracted pattern in the source column. | '' |
| `extract_column_name` | str | Column name to store the extract. | '' |
| `keep_original_column` | bool | Whether to keep the original column. | True |
| `match_info_column_name` | str | Column name to store a boolean column indicating whether a match was found. | '' |
| `extract_columns` | dict \| None | Dictionary mapping source column names to their extraction parameters (the same options as the single-column case). | None |

Raises:

| Type | Description |
|---|---|
| ValueError | If any of the required arguments are not provided. |
| ValueError | If the regex pattern is invalid. |

Returns:

| Name | Type | Description |
|---|---|---|
| PipelineContext | PipelineContext | Transformed context with the modified DataFrame. |
Source code in src/cloe_nessy/pipeline/actions/transform_regex_extract.py
TransformRenameColumnsAction
¶
Bases: PipelineAction
Renames the specified columns in the DataFrame.
This method updates the DataFrame in the provided context by renaming columns according
to the mapping defined in the columns dictionary, where each key represents an old column
name and its corresponding value represents the new column name.
Example
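An illustrative sketch (the action name is assumed from the naming pattern of the other actions; the columns mapping comes from the run method below):

```yaml
Rename Columns:
  action: TRANSFORM_RENAME_COLUMNS
  options:
    columns:
      cust_id: customer_id
      rev: revenue
```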
Source code in src/cloe_nessy/pipeline/actions/transform_rename_columns.py
run(context, *, columns=None, **_)
¶
Renames the specified columns in the DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | Context in which this Action is executed. | *required* |
| `columns` | dict[str, str] \| None | A dictionary where the key is the old column name and the value is the new column name. | None |

Raises:

| Type | Description |
|---|---|
| ValueError | If no columns are provided. |
| ValueError | If the data from context is None. |

Returns:

| Type | Description |
|---|---|
| PipelineContext | Context after the execution of this Action. |
Source code in src/cloe_nessy/pipeline/actions/transform_rename_columns.py
TransformReplaceValuesAction
¶
Bases: PipelineAction
Replaces specified values in the given DataFrame.
This method iterates over the specified replace dictionary, where each key is a column name
and each value is another dictionary containing old values as keys and new values as the corresponding
values. The method updates the DataFrame by replacing occurrences of the old values with the new ones
in the specified columns.
Example
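An illustrative sketch (the action name is assumed from the naming pattern of the other actions; the replace mapping comes from the run method below):

```yaml
Replace Country Codes:
  action: TRANSFORM_REPLACE_VALUES
  options:
    replace:
      country:
        DE: Germany
        US: United States
```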
Source code in src/cloe_nessy/pipeline/actions/transform_replace_values.py
run(context, *, replace=None, **_)
¶
Replaces specified values in the given DataFrame.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
context
|
PipelineContext
|
Context in which this Action is executed. |
required |
replace
|
dict[str, dict[str, str]] | None
|
A dictionary where each key is the column name and the corresponding value is another dictionary mapping old values to new values. |
None
|
Raises:
| Type | Description |
|---|---|
ValueError
|
If no replace values are provided. |
ValueError
|
If the data from context is None. |
Returns:
| Type | Description |
|---|---|
PipelineContext
|
Context after the execution of this Action. |
Source code in src/cloe_nessy/pipeline/actions/transform_replace_values.py
TransformSelectColumnsAction
¶
Bases: PipelineAction
Selects specified columns from the given DataFrame.
This method allows you to include or exclude specific columns from the
DataFrame. If include_columns is provided, only those columns will be
selected. If exclude_columns is provided, all columns except those will be
selected. The method ensures that the specified columns exist in the
DataFrame before performing the selection.
Example
Example Input Data:
| id | name | coordinates | attributes |
|---|---|---|---|
| 1 | Alice | [10.0, 20.0] | {"age": 30, "city": "NY"} |
| 2 | Bob | [30.0, 40.0] | {"age": 25, "city": "LA"} |
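An illustrative sketch that keeps only id and name from the input above (the action name is assumed from the naming pattern of the other actions; options come from the run method below):

```yaml
Select Columns:
  action: TRANSFORM_SELECT_COLUMNS
  options:
    include_columns:
      - id
      - name
```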
Source code in src/cloe_nessy/pipeline/actions/transform_select_columns.py
run(context, *, include_columns=None, exclude_columns=None, raise_on_non_existing_columns=True, **_)
¶
Selects specified columns from the given DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | Context in which this Action is executed. | *required* |
| `include_columns` | list[str] \| None | A list of column names that should be included. If provided, only these columns will be selected. | None |
| `exclude_columns` | list[str] \| None | A list of column names that should be excluded. If provided, all columns except these will be selected. | None |
| `raise_on_non_existing_columns` | bool | If True, raise an error if a specified column is not found in the DataFrame. If False, ignore the column and continue with the selection. | True |

Raises:

| Type | Description |
|---|---|
| ValueError | If a specified column is not found in the DataFrame. |
| ValueError | If neither include_columns nor exclude_columns are provided, or if both are provided. |

Returns:

| Type | Description |
|---|---|
| PipelineContext | Context after the execution of this Action. |
Source code in src/cloe_nessy/pipeline/actions/transform_select_columns.py
TransformSqlAction
¶
Bases: PipelineAction
Executes a SQL statement on a DataFrame within the provided context.
A temporary view is created from the current DataFrame, and the SQL statement is executed on that view. The resulting DataFrame is returned.
Example
```yaml
SQL Transform:
  action: TRANSFORM_SQL
  options:
    sql_statement: select city, revenue, firm from {DATA_FRAME} where product="Databricks"
```
Note
The SQL statement should reference the DataFrame as "{DATA_FRAME}". This nessy specific placeholder will be replaced with your input DataFrame from the context. If your pipeline is defined as an f-string, you can escape the curly braces by doubling them, e.g., "{{DATA_FRAME}}".
Source code in src/cloe_nessy/pipeline/actions/transform_generic_sql.py
run(context, *, sql_statement='', **kwargs)
¶
Executes a SQL statement on a DataFrame within the provided context.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | Context in which this Action is executed. | *required* |
| `sql_statement` | str | A string containing the SQL statement to be executed. The source table should be referred to as "{DATA_FRAME}". | '' |
| `**kwargs` | Any | Additional keyword arguments are passed as placeholders to the SQL statement. | {} |

Raises:

| Type | Description |
|---|---|
| ValueError | If "{DATA_FRAME}" is not included in the SQL statement. |
| ValueError | If no SQL statement is provided. |
| ValueError | If the data from the context is None. |

Returns:

| Type | Description |
|---|---|
| PipelineContext | Context after the execution of this Action, containing the DataFrame resulting from the SQL statement. |
Source code in src/cloe_nessy/pipeline/actions/transform_generic_sql.py
TransformUnionAction
¶
Bases: PipelineAction
Unions multiple DataFrames together.
This method takes the current DataFrame from the context and unites it with
additional DataFrames specified in the union_data argument. All DataFrames
must have the same schema. If any DataFrame in union_data is None or
empty, a ValueError will be raised.
Example
```yaml
Union Tables:
  action: TRANSFORM_UNION
  options:
    union_data:
      - ((step:Filter First Table))
      - ((step:SQL Transform Second Table))
```
Referencing a DataFrame from another step
The union_data parameter is a reference to the DataFrame from another step.
The DataFrame is accessed using the result attribute of the PipelineStep. The syntax
for referencing the DataFrame is ((step:Step Name)), mind the double parentheses.
Source code in src/cloe_nessy/pipeline/actions/transform_union.py
run(context, *, union_data=None, **_)
¶
Unions multiple DataFrames together.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | Context in which this Action is executed. | *required* |
| `union_data` | list[PipelineStep] \| None | A list of PipelineSteps that define the DataFrames to union with the current context. | None |

Raises:

| Type | Description |
|---|---|
| ValueError | If no union_data is provided. |
| ValueError | If the data from context is None. |
| ValueError | If the data from any of the union_data is None. |

Returns:

| Type | Description |
|---|---|
| PipelineContext | Context after the execution of this Action. |
Source code in src/cloe_nessy/pipeline/actions/transform_union.py
TransformWithColumnAction
¶
Bases: PipelineAction
Add or update a column in the DataFrame using a SQL expression.
This action uses PySpark's expr() function to evaluate SQL expressions and create or update columns in the DataFrame.
Examples:
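An illustrative sketch (the action name is assumed from the naming pattern of the other actions; options come from the run method below):

```yaml
Add Revenue Column:
  action: TRANSFORM_WITH_COLUMN
  options:
    column_name: revenue
    expression: quantity * price
```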
Source code in src/cloe_nessy/pipeline/actions/transform_with_column.py
run(context, *, column_name='', expression='', **_)
¶
Add or update a column using a SQL expression.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | The pipeline context containing the DataFrame. | *required* |
| `column_name` | str | Name of the column to create or update. | '' |
| `expression` | str | SQL expression to evaluate for the column value. | '' |
| `**_` | Any | Additional unused keyword arguments. | {} |

Returns:

| Name | Type | Description |
|---|---|---|
| PipelineContext | PipelineContext | Updated context with the modified DataFrame. |

Raises:

| Type | Description |
|---|---|
| ValueError | If column_name is not provided. |
| ValueError | If expression is not provided. |
| ValueError | If context.data is None. |
| Exception | If the SQL expression is invalid. |
Source code in src/cloe_nessy/pipeline/actions/transform_with_column.py
WriteCatalogTableAction
¶
Bases: PipelineAction
Writes a DataFrame to a specified catalog table using CatalogWriter.
Examples:
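An illustrative sketch (the action name is assumed from the naming pattern of the other actions; options come from the run method below):

```yaml
Write Sales Table:
  action: WRITE_CATALOG_TABLE
  options:
    table_identifier: my_catalog.business_schema.sales_table
    mode: append
    partition_by:
      - sales_date
```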
Source code in src/cloe_nessy/pipeline/actions/write_catalog_table.py
run(context, *, table_identifier=None, mode='append', partition_by=None, options=None, checkpoint_location=None, trigger_dict=None, await_termination=False, **_)
¶
Writes a DataFrame to a specified catalog table.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | Context in which this Action is executed. | *required* |
| `table_identifier` | str \| None | The table identifier in the unity catalog in the format 'catalog.schema.table'. If not provided, attempts to use the context's table metadata. | None |
| `mode` | str | The write mode. One of 'append', 'overwrite', 'error', 'errorifexists', or 'ignore'. | 'append' |
| `partition_by` | str \| list[str] \| None | Names of the partitioning columns. | None |
| `checkpoint_location` | str \| None | Location for checkpointing. | None |
| `trigger_dict` | dict \| None | A dictionary specifying the trigger configuration for the streaming query. | None |
| `await_termination` | bool | If True, the function will wait for the streaming query to finish before returning. | False |
| `options` | dict[str, str] \| None | Additional options for the DataFrame write operation. | None |

Raises:

| Type | Description |
|---|---|
| ValueError | If the table name is not specified or cannot be inferred from the context. |

Returns:

| Type | Description |
|---|---|
| PipelineContext | Context after the execution of this Action. |
Source code in src/cloe_nessy/pipeline/actions/write_catalog_table.py
WriteDeltaAppendAction
¶
Bases: PipelineAction
This class implements an Append action for an ETL pipeline.
The WriteDeltaAppendAction appends a DataFrame to a Delta table.
Example
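An illustrative sketch (the action name is assumed from the naming pattern of the other actions; options come from the run method below):

```yaml
Write Delta Append:
  action: WRITE_DELTA_APPEND
  options:
    table_identifier: my_catalog.business_schema.sales_table
    ignore_empty_df: true
```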
Source code in src/cloe_nessy/pipeline/actions/write_delta_append.py
run(context, *, table_identifier=None, ignore_empty_df=False, options=None, **_)
¶
Merge the dataframe into the delta table.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | Context in which this Action is executed. | *required* |
| `table_identifier` | str \| None | The identifier of the table. If passed, the UC Adapter will be used to create a table object. Otherwise the Table object will be created from the table metadata in the context. | None |
| `ignore_empty_df` | bool | A flag indicating whether to ignore an empty source dataframe. | False |
| `options` | dict[str, Any] \| None | Additional options for the append writer. | None |

Raises:

| Type | Description |
|---|---|
| ValueError | If the table does not exist. |
| ValueError | If the data is not set in the pipeline context. |
| ValueError | If the table metadata is empty. |

Returns:

| Type | Description |
|---|---|
| PipelineContext | Pipeline Context |
Source code in src/cloe_nessy/pipeline/actions/write_delta_append.py
WriteDeltaMergeAction
¶
Bases: PipelineAction
This class implements a Merge action for an ETL pipeline.
The WriteDeltaMergeAction merges a DataFrame into a Delta table.
Example
```yaml
# Basic merge with same column names
Write Delta Merge:
  action: WRITE_DELTA_MERGE
  options:
    table_identifier: my_catalog.my_schema.my_table
    key_columns:
      - id
      - customer_id
    cols_to_exclude_from_update:
      - created_at
    when_matched_update: true
    when_not_matched_insert: true
    use_partition_pruning: true

# Merge with different source and target column names
Write Delta Merge with Mapping:
  action: WRITE_DELTA_MERGE
  options:
    table_identifier: my_catalog.my_schema.my_table
    key_columns:
      - customer_id
    column_mapping:
      customer_id: cust_id
      full_name: name
      email_address: email
    when_matched_update: true
    when_not_matched_insert: true
```
Source code in src/cloe_nessy/pipeline/actions/write_delta_merge.py
run(context, *, table_identifier=None, key_columns=None, cols_to_exclude_from_update=None, column_mapping=None, when_matched_update=True, when_matched_delete=False, when_not_matched_insert=True, use_partition_pruning=True, ignore_empty_df=False, create_if_not_exists=True, refresh_table=True, **_)
¶
Merge the dataframe into the delta table.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | Context in which this Action is executed. | *required* |
| `table_identifier` | str \| None | The identifier of the table. If passed, the UC Adapter will be used to create a table object. Otherwise the Table object will be created from the table metadata in the context. | None |
| `key_columns` | list[str] \| None | List of target column names that form the key for the merge operation. | None |
| `cols_to_exclude_from_update` | list[str] \| None | List of target column names to be excluded from the update operation in the target Delta table. | None |
| `column_mapping` | dict[str, str] \| None | Mapping from target column names to source column names. Use this when source and target tables have different column names. If a column is not in the mapping, it's assumed to have the same name in both source and target. | None |
| `when_matched_update` | bool | Flag to specify whether to perform an update operation when matching records are found in the target Delta table. | True |
| `when_matched_delete` | bool | Flag to specify whether to perform a delete operation when matching records are found in the target Delta table. | False |
| `when_not_matched_insert` | bool | Flag to specify whether to perform an insert operation when matching records are not found in the target Delta table. | True |
| `use_partition_pruning` | bool | Flag to specify whether to use partition pruning to optimize the performance of the merge operation. | True |
| `ignore_empty_df` | bool | A flag indicating whether to ignore an empty source dataframe. | False |
| `create_if_not_exists` | bool | Create the table if it does not exist. | True |
| `refresh_table` | bool | Refresh the table after the transaction. | True |

Raises:

| Type | Description |
|---|---|
| ValueError | If the table does not exist. |
| ValueError | If the data is not set in the pipeline context. |
| ValueError | If the table metadata is empty. |

Returns:

| Type | Description |
|---|---|
| PipelineContext | Pipeline Context |
Source code in src/cloe_nessy/pipeline/actions/write_delta_merge.py
WriteFileAction
¶
Bases: PipelineAction
This class implements a Write action for an ETL pipeline.
The WriteFileAction writes a DataFrame to a storage location defined in the
options using the FileWriter class.
Example
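An illustrative sketch (the action name is assumed from the naming pattern of the other actions; options come from the run method below):

```yaml
Write File:
  action: WRITE_FILE
  options:
    path: /path/to/output/
    format: delta
    mode: append
```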
Source code in src/cloe_nessy/pipeline/actions/write_file.py
run(context, *, path='', format='delta', partition_cols=None, mode='append', is_stream=False, options=None, **_)
¶
Writes a file to a location.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `context` | PipelineContext | Context in which this Action is executed. | *required* |
| `path` | str | Location to write data to. | '' |
| `format` | str | Format of files to write. | 'delta' |
| `partition_cols` | list[str] \| None | Columns to partition on. If None, the writer will try to get the partition columns from the metadata. | None |
| `mode` | str | Specifies the behavior when data or table already exists. | 'append' |
| `is_stream` | bool | If True, write the data as a stream. | False |
| `options` | dict[str, str] \| None | Additional options passed to the writer. | None |

Raises:

| Type | Description |
|---|---|
| ValueError | If no path is provided. |
| ValueError | If the table metadata is empty. |

Returns:

| Type | Description |
|---|---|
| PipelineContext | Pipeline Context |