ReadAPIAction

Bases: PipelineAction

Reads data from an API and loads it into a Spark DataFrame.

This action executes HTTP requests (optionally paginated) in parallel using the APIReader and returns a DataFrame containing the response payloads plus request/response metadata. No intermediate files are written.

Example

Basic Usage:

Read API:
    action: READ_API
    options:
        base_url: https://some_url.com/api/
        endpoint: my/endpoint/

Usage with Parameters and Headers:

Read API:
    action: READ_API
    options:
        base_url: https://some_url.com/api/
        endpoint: my/endpoint/
        method: GET
        timeout: 90
        headers:
            Accept: application/json
            X-Request: foo
        params:
            q: widget
            include: details

Usage with Authentication (can be chained):

Read API:
    action: READ_API
    options:
        base_url: https://some_url.com/api/
        endpoint: my/endpoint/
        auth:
            - type: basic
              username: my_username
              password: my_password
            - type: env
              header_template:
                "X-API-Key": "<ENV_VAR_NAME>"
            - type: secret_scope
              secret_scope: my_secret_scope
              header_template:
                "X-ORG-Token": "<SECRET_NAME>"
            - type: azure_oauth
              client_id: my_client_id
              client_secret: my_client_secret
              tenant_id: my_tenant_id
              scope: <entra-id-client-id>
The above will combine credentials (via ChainedAuth) so that headers from env/secret_scope are merged and auth flows like Basic / Azure OAuth are applied to each request.
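
To make the chaining concrete, here is a purely illustrative sketch (HeaderAuth and ChainedAuthSketch are hypothetical names, not cloe_nessy classes; only the merge-then-apply behaviour described above is assumed):

```python
import requests


class HeaderAuth(requests.auth.AuthBase):
    """Hypothetical credential contributing static headers (like env/secret_scope)."""

    def __init__(self, headers: dict[str, str]):
        self.headers = headers

    def __call__(self, request: requests.PreparedRequest) -> requests.PreparedRequest:
        request.headers.update(self.headers)
        return request


class ChainedAuthSketch(requests.auth.AuthBase):
    """Applies each credential in order, so their headers accumulate on the request."""

    def __init__(self, *auths: requests.auth.AuthBase):
        self.auths = auths

    def __call__(self, request: requests.PreparedRequest) -> requests.PreparedRequest:
        for auth in self.auths:
            request = auth(request)
        return request


chained = ChainedAuthSketch(
    requests.auth.HTTPBasicAuth("my_username", "my_password"),
    HeaderAuth({"X-API-Key": "value-from-env"}),
    HeaderAuth({"X-ORG-Token": "value-from-secret-scope"}),
)
# requests.get(url, auth=chained) would then carry all three credentials.
```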

Extracting a Nested Field (key):

If the API returns a large JSON object but you only want a nested list (e.g. data.items):

Read API:
    action: READ_API
    options:
        base_url: https://some_url.com/api/
        endpoint: reports/
        key: data.items

Pagination (supported: page_based, limit_offset):

Only page_based and limit_offset strategies are currently supported. You may also supply the shared/advanced options check_field, next_page_field, max_page, pages_per_array_limit, and preliminary_probe.

1) Page-Based Pagination

Read API:
    action: READ_API
    options:
        base_url: https://some_url.com/api/
        endpoint: items/
        params:
            page: 1              # starting page (optional; defaults to 1)
            per_page: 100
        pagination:
            strategy: page_based
            page_field: page     # required
            # Shared/advanced (optional):
            check_field: results           # e.g. list to check for emptiness
            next_page_field: info.has_next # boolean flag; if present it is trusted
            max_page: -1                   # -1 = all pages
            pages_per_array_limit: 2       # chunk output rows every 2 pages
            preliminary_probe: false       # set true to pre-scan/build all page params
This issues requests like:
GET .../items/?page=1&per_page=100
GET .../items/?page=2&per_page=100
...

2) Limit/Offset Pagination

Read API:
    action: READ_API
    options:
        base_url: https://some_url.com/api/
        endpoint: products/
        params:
            limit: 50
            offset: 0
        pagination:
            strategy: limit_offset
            limit_field: limit       # required
            offset_field: offset     # required
            # Shared/advanced (optional):
            check_field: data.items
            next_page_field: page_info.has_next
            max_page: -1
            pages_per_array_limit: -1
            preliminary_probe: false
This issues requests like:
GET .../products/?limit=50&offset=0
GET .../products/?limit=50&offset=50
GET .../products/?limit=50&offset=100
...

Using preliminary_probe to pre-compute all pages

If preliminary_probe: true is set, the reader will first probe the API to determine the final page (using check_field and/or next_page_field) and then fan out one request per page/offset, which is useful when driving fully parallel execution:

Read API:
    action: READ_API
    options:
        base_url: https://api.example.com/
        endpoint: orders/
        params:
            limit: 100
            offset: 0
        pagination:
            strategy: limit_offset
            limit_field: limit
            offset_field: offset
            check_field: data
            preliminary_probe: true
        max_concurrent_requests: 16

Retries and Concurrency:

Read API:
    action: READ_API
    options:
        base_url: https://some_url.com/api/
        endpoint: heavy/endpoint/
        max_retries: 3           # network/5xx retry count
        backoff_factor: 2        # exponential backoff multiplier
        max_concurrent_requests: 16
        timeout: 60

Default Headers on All Requests:

Read API:
    action: READ_API
    options:
        base_url: https://some_url.com/api/
        endpoint: v1/resources
        default_headers:
            X-Client: my-pipeline
            Accept: application/json
        headers:
            X-Request: custom

Deriving Requests from Context (multiple dynamic requests):

When requests_from_context: true, distinct rows from the upstream context.data are converted into individual requests (enabling heterogeneous endpoints/params). The DataFrame must have columns: endpoint, params, headers, data, json_body.

# Upstream step produces rows like:
# | endpoint        | params                  | headers | data | json_body |
# | "u/123/profile" | {"verbose": "true"}     |  null   | null |   null    |
# | "u/456/profile" | {"verbose": "false"}    |  null   | null |   null    |

Read API:
    action: READ_API
    options:
        base_url: https://some_url.com/api/
        requests_from_context: true
        method: GET
        timeout: 45
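
One way an upstream step could produce the required DataFrame, as a minimal PySpark sketch (the column names follow the contract above; everything else, including the endpoint values, is illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Build the five-column request DataFrame expected by requests_from_context.
requests_df = spark.createDataFrame(
    [
        ("u/123/profile", {"verbose": "true"}, None, None, None),
        ("u/456/profile", {"verbose": "false"}, None, None, None),
    ],
    schema="endpoint string, params map<string,string>, headers map<string,string>, "
    "data map<string,string>, json_body map<string,string>",
)
# A preceding pipeline step would place this DataFrame on context.data.
```
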
Output

The action returns a Spark DataFrame with one column json_response (ArrayType). Each element contains:

{
  "response": "<json string of the API payload (optionally reduced by 'key')>",
  "__metadata": {
    "timestamp": "YYYY-MM-DD HH:MM:SS.ssssss",
    "base_url": "https://some_url.com/api/",
    "url": "https://some_url.com/api/my/endpoint/?q=...",
    "status_code": 200,
    "reason": "OK",
    "elapsed": 0.123,
    "endpoint": "my/endpoint/",
    "query_parameters": { "q": "..." }
  }
}
When pagination is enabled and pages_per_array_limit > 0, responses are chunked into arrays of that many pages; otherwise all pages for a request are grouped together.
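
To consume the result, a post-processing sketch like the following can flatten the array, assuming each element is a struct with the response and __metadata fields shown above (the payload schema passed to from_json is a placeholder to adapt to your API):

```python
from pyspark.sql import functions as F

# df is the DataFrame returned by READ_API.
flat = df.select(F.explode("json_response").alias("r")).select(
    F.col("r.response").alias("payload_json"),
    F.col("r.__metadata.status_code").alias("status_code"),
    F.col("r.__metadata.url").alias("url"),
)

# Parse the JSON payload string; replace the schema with your API's shape.
parsed = flat.withColumn(
    "payload", F.from_json("payload_json", "array<struct<id:long,name:string>>")
)
```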

Validation & Errors:

- base_url must be provided.
- Either endpoint must be provided or requests_from_context must be true.
- If requests_from_context is true, context.data must be present and non-empty.
- Pagination config:
    - strategy must be page_based or limit_offset (other strategies are not yet supported).
    - For page_based, page_field is required.
    - For limit_offset, both limit_field and offset_field are required.

Secret information

Don't write sensitive information like passwords or tokens directly in the pipeline configuration. Use secret scopes or environment variables instead.

Source code in src/cloe_nessy/pipeline/actions/read_api.py
class ReadAPIAction(PipelineAction):
    """Reads data from an API and loads it into a Spark DataFrame.

    This action executes HTTP requests (optionally paginated) in parallel using the
    [`APIReader`][cloe_nessy.integration.reader.api_reader] and returns a DataFrame
    containing the response payloads plus request/response metadata. No intermediate
    files are written.

    Example:
        === "Basic Usage"
            ```yaml
            Read API:
                action: READ_API
                options:
                    base_url: https://some_url.com/api/
                    endpoint: my/endpoint/
            ```

        === "Usage with Parameters and Headers"
            ```yaml
            Read API:
                action: READ_API
                options:
                    base_url: https://some_url.com/api/
                    endpoint: my/endpoint/
                    method: GET
                    timeout: 90
                    headers:
                        Accept: application/json
                        X-Request: foo
                    params:
                        q: widget
                        include: details
            ```

        === "Usage with Authentication (can be chained)"
            ```yaml
            Read API:
                action: READ_API
                options:
                    base_url: https://some_url.com/api/
                    endpoint: my/endpoint/
                    auth:
                        - type: basic
                          username: my_username
                          password: my_password
                        - type: env
                          header_template:
                            "X-API-Key": "<ENV_VAR_NAME>"
                        - type: secret_scope
                          secret_scope: my_secret_scope
                          header_template:
                            "X-ORG-Token": "<SECRET_NAME>"
                        - type: azure_oauth
                          client_id: my_client_id
                          client_secret: my_client_secret
                          tenant_id: my_tenant_id
                          scope: <entra-id-client-id>
            ```
            The above will combine credentials (via `ChainedAuth`) so that headers from `env`/`secret_scope`
            are merged and auth flows like Basic / Azure OAuth are applied to each request.

        === "Extracting a Nested Field (key)"
            If the API returns a large JSON object but you only want a nested list (e.g. `data.items`):
            ```yaml
            Read API:
                action: READ_API
                options:
                    base_url: https://some_url.com/api/
                    endpoint: reports/
                    key: data.items
            ```

        === "Pagination (Supported: page_based, limit_offset)"
            Only `page_based` and `limit_offset` strategies are currently supported. You may also
            supply the shared/advanced options `check_field`, `next_page_field`, `max_page`,
            `pages_per_array_limit`, and `preliminary_probe`.

            **1) Page-Based Pagination**
            ```yaml
            Read API:
                action: READ_API
                options:
                    base_url: https://some_url.com/api/
                    endpoint: items/
                    params:
                        page: 1              # starting page (optional; defaults to 1)
                        per_page: 100
                    pagination:
                        strategy: page_based
                        page_field: page     # required
                        # Shared/advanced (optional):
                        check_field: results           # e.g. list to check for emptiness
                        next_page_field: info.has_next # boolean flag; if present it is trusted
                        max_page: -1                   # -1 = all pages
                        pages_per_array_limit: 2       # chunk output rows every 2 pages
                        preliminary_probe: false       # set true to pre-scan/build all page params
            ```
            This issues requests like:
            ```
            GET .../items/?page=1&per_page=100
            GET .../items/?page=2&per_page=100
            ...
            ```

            **2) Limit/Offset Pagination**
            ```yaml
            Read API:
                action: READ_API
                options:
                    base_url: https://some_url.com/api/
                    endpoint: products/
                    params:
                        limit: 50
                        offset: 0
                    pagination:
                        strategy: limit_offset
                        limit_field: limit       # required
                        offset_field: offset     # required
                        # Shared/advanced (optional):
                        check_field: data.items
                        next_page_field: page_info.has_next
                        max_page: -1
                        pages_per_array_limit: -1
                        preliminary_probe: false
            ```
            This issues requests like:
            ```
            GET .../products/?limit=50&offset=0
            GET .../products/?limit=50&offset=50
            GET .../products/?limit=50&offset=100
            ...
            ```

            **Using `preliminary_probe` to pre-compute all pages**
            If `preliminary_probe: true` is set, the reader will first probe the API to determine
            the final page (using `check_field` and/or `next_page_field`) and then fan out one request
            per page/offset—useful when driving fully parallel execution:
            ```yaml
            Read API:
                action: READ_API
                options:
                    base_url: https://api.example.com/
                    endpoint: orders/
                    params:
                        limit: 100
                        offset: 0
                    pagination:
                        strategy: limit_offset
                        limit_field: limit
                        offset_field: offset
                        check_field: data
                        preliminary_probe: true
                    max_concurrent_requests: 16
            ```

        === "Retries and Concurrency"
            ```yaml
            Read API:
                action: READ_API
                options:
                    base_url: https://some_url.com/api/
                    endpoint: heavy/endpoint/
                    max_retries: 3           # network/5xx retry count
                    backoff_factor: 2        # exponential backoff multiplier
                    max_concurrent_requests: 16
                    timeout: 60
            ```

        === "Default Headers on All Requests"
            ```yaml
            Read API:
                action: READ_API
                options:
                    base_url: https://some_url.com/api/
                    endpoint: v1/resources
                    default_headers:
                        X-Client: my-pipeline
                        Accept: application/json
                    headers:
                        X-Request: custom
            ```

        === "Deriving Requests from Context (multiple dynamic requests)"
            When `requests_from_context: true`, distinct rows from the upstream `context.data`
            are converted into individual requests (enabling heterogeneous endpoints/params).
            The DataFrame must have columns: `endpoint`, `params`, `headers`, `data`, `json_body`.

            ```yaml
            # Upstream step produces rows like:
            # | endpoint        | params                  | headers | data | json_body |
            # | "u/123/profile" | {"verbose": "true"}     |  null   | null |   null    |
            # | "u/456/profile" | {"verbose": "false"}    |  null   | null |   null    |

            Read API:
                action: READ_API
                options:
                    base_url: https://some_url.com/api/
                    requests_from_context: true
                    method: GET
                    timeout: 45
            ```

    Output:
        The action returns a Spark DataFrame with one column `json_response` (ArrayType).
        Each element contains:
        ```json
        {
          "response": "<json string of the API payload (optionally reduced by 'key')>",
          "__metadata": {
            "timestamp": "YYYY-MM-DD HH:MM:SS.ssssss",
            "base_url": "https://some_url.com/api/",
            "url": "https://some_url.com/api/my/endpoint/?q=...",
            "status_code": 200,
            "reason": "OK",
            "elapsed": 0.123,
            "endpoint": "my/endpoint/",
            "query_parameters": { "q": "..." }
          }
        }
        ```
        When pagination is enabled and `pages_per_array_limit` > 0, responses are chunked
        into arrays of that many pages; otherwise all pages for a request are grouped together.

    Validation & Errors:
        - `base_url` must be provided.
        - Either `endpoint` must be provided **or** `requests_from_context` must be `true`.
        - If `requests_from_context` is `true`, `context.data` must be present and non-empty.
        - Pagination config:
            - `strategy` must be `page_based` or `limit_offset` (other strategies are not yet supported).
            - For `page_based`, `page_field` is required.
            - For `limit_offset`, both `limit_field` and `offset_field` are required.

    !!! warning "Secret information"
        Don't write sensitive information like passwords or tokens directly in the pipeline configuration.
        Use secret scopes or environment variables instead.
    """

    name: str = "READ_API"

    @validate_call(config=ConfigDict(arbitrary_types_allowed=True))
    def run(
        self,
        context: PipelineContext,
        *,
        base_url: str | None = None,
        auth: Mapping[str, str | Mapping[str, str] | list[Mapping[str, str]]] | None = None,
        endpoint: str | None = None,
        default_headers: dict[str, Any] | None = None,
        method: str = "GET",
        key: str | None = None,
        timeout: int = 30,
        params: dict[str, Any] | None = None,
        headers: dict[str, Any] | None = None,
        data: dict[str, Any] | None = None,
        json_body: dict[str, Any] | None = None,
        pagination: PaginationConfigData | None = None,
        max_retries: int = 0,
        backoff_factor: int = 0,
        max_concurrent_requests: int = 8,
        requests_from_context: bool = False,
        **_: Any,
    ) -> PipelineContext:
        """Executes API requests in parallel by using mapInPandas.

        We do NOT write intermediate files; instead we directly return the responses
        as rows in a Spark DataFrame.


        Args:
            context: The pipeline context used to carry data between actions.
            base_url: The base URL for all API requests.
            auth: Authentication configuration, which may be a simple header map,
                a nested map for different auth scopes, or a list thereof.
            endpoint: The specific path to append to the base URL for this call.
            default_headers: Headers to include on every request.
            method: HTTP method to use.
            key: JSON field name to extract from each response.
            timeout: Request timeout in seconds.
            params: Query parameters to append to the URL.
            headers: Additional request-specific headers.
            data: Form-encoded body to send.
            json_body: JSON-encoded body to send.
            pagination: Configuration for paginated endpoints.
            max_retries: Number of times to retry on failure.
            backoff_factor: Multiplier for retry backoff delays.
            max_concurrent_requests: Maximum number of parallel API calls.
            requests_from_context: Whether to derive request parameters from context data.

        Returns:
            The updated context, with the read data as a DataFrame.

        Raises:
            ValueError: If no base URL is provided.
            ValueError: If neither an endpoint nor context-derived requests are specified.
            ValueError: If context-derived requests are enabled but no data is present in context.
        """
        deserialized_auth = process_auth(auth)
        pagination_config = PaginationConfig(**pagination) if pagination is not None else None

        if base_url is None:
            raise ValueError("A value for base_url must be supplied.")

        if endpoint is None and not requests_from_context:
            raise ValueError("A value for endpoint must be supplied when requests_from_context is false.")

        api_reader = APIReader(
            base_url=base_url,
            auth=deserialized_auth,
            default_headers=default_headers,
            max_concurrent_requests=max_concurrent_requests,
        )

        dynamic_requests: list[RequestSet] | None = None

        if requests_from_context:
            if not context.data:
                raise ValueError("Cannot generate requests from the context without a DataFrame in the context.")

            dynamic_requests = [
                cast(RequestSet, row.asDict())
                for row in context.data.select(
                    "endpoint",
                    "params",
                    "headers",
                    "data",
                    "json_body",
                )
                .distinct()
                .collect()
            ]

        df = api_reader.read(
            endpoint=endpoint,
            method=method,
            key=key,
            timeout=timeout,
            params=params,
            headers=headers,
            data=data,
            json_body=json_body,
            pagination_config=pagination_config,
            max_retries=max_retries,
            backoff_factor=backoff_factor,
            dynamic_requests=dynamic_requests,
        )

        row_count = df.count()
        self._console_logger.info(f"API requests completed. Final row count = {row_count}.")

        return context.from_existing(data=df)

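For orientation, a hedged sketch of invoking the action directly from Python rather than YAML; the import path follows the "Source code" note above, context is an existing PipelineContext from the surrounding pipeline, and the option names mirror run() below:

```python
from cloe_nessy.pipeline.actions.read_api import ReadAPIAction

action = ReadAPIAction()
new_context = action.run(
    context,  # an existing PipelineContext from the surrounding pipeline
    base_url="https://some_url.com/api/",
    endpoint="my/endpoint/",
    params={"q": "widget"},
    timeout=90,
)
responses_df = new_context.data  # one json_response column, as described above
```
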
run(context, *, base_url=None, auth=None, endpoint=None, default_headers=None, method='GET', key=None, timeout=30, params=None, headers=None, data=None, json_body=None, pagination=None, max_retries=0, backoff_factor=0, max_concurrent_requests=8, requests_from_context=False, **_)

Executes API requests in parallel by using mapInPandas.

We do NOT write intermediate files; instead we directly return the responses as rows in a Spark DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | PipelineContext | The pipeline context used to carry data between actions. | required |
| base_url | str \| None | The base URL for all API requests. | None |
| auth | Mapping[str, str \| Mapping[str, str] \| list[Mapping[str, str]]] \| None | Authentication configuration, which may be a simple header map, a nested map for different auth scopes, or a list thereof. | None |
| endpoint | str \| None | The specific path to append to the base URL for this call. | None |
| default_headers | dict[str, Any] \| None | Headers to include on every request. | None |
| method | str | HTTP method to use. | 'GET' |
| key | str \| None | JSON field name to extract from each response. | None |
| timeout | int | Request timeout in seconds. | 30 |
| params | dict[str, Any] \| None | Query parameters to append to the URL. | None |
| headers | dict[str, Any] \| None | Additional request-specific headers. | None |
| data | dict[str, Any] \| None | Form-encoded body to send. | None |
| json_body | dict[str, Any] \| None | JSON-encoded body to send. | None |
| pagination | PaginationConfigData \| None | Configuration for paginated endpoints. | None |
| max_retries | int | Number of times to retry on failure. | 0 |
| backoff_factor | int | Multiplier for retry backoff delays. | 0 |
| max_concurrent_requests | int | Maximum number of parallel API calls. | 8 |
| requests_from_context | bool | Whether to derive request parameters from context data. | False |

Returns:

| Type | Description |
| --- | --- |
| PipelineContext | The updated context, with the read data as a DataFrame. |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If no base URL is provided. |
| ValueError | If neither an endpoint nor context-derived requests are specified. |
| ValueError | If context-derived requests are enabled but no data is present in context. |

Source code in src/cloe_nessy/pipeline/actions/read_api.py
@validate_call(config=ConfigDict(arbitrary_types_allowed=True))
def run(
    self,
    context: PipelineContext,
    *,
    base_url: str | None = None,
    auth: Mapping[str, str | Mapping[str, str] | list[Mapping[str, str]]] | None = None,
    endpoint: str | None = None,
    default_headers: dict[str, Any] | None = None,
    method: str = "GET",
    key: str | None = None,
    timeout: int = 30,
    params: dict[str, Any] | None = None,
    headers: dict[str, Any] | None = None,
    data: dict[str, Any] | None = None,
    json_body: dict[str, Any] | None = None,
    pagination: PaginationConfigData | None = None,
    max_retries: int = 0,
    backoff_factor: int = 0,
    max_concurrent_requests: int = 8,
    requests_from_context: bool = False,
    **_: Any,
) -> PipelineContext:
    """Executes API requests in parallel by using mapInPandas.

    We do NOT write intermediate files; instead we directly return the responses
    as rows in a Spark DataFrame.


    Args:
        context: The pipeline context used to carry data between actions.
        base_url: The base URL for all API requests.
        auth: Authentication configuration, which may be a simple header map,
            a nested map for different auth scopes, or a list thereof.
        endpoint: The specific path to append to the base URL for this call.
        default_headers: Headers to include on every request.
        method: HTTP method to use.
        key: JSON field name to extract from each response.
        timeout: Request timeout in seconds.
        params: Query parameters to append to the URL.
        headers: Additional request-specific headers.
        data: Form-encoded body to send.
        json_body: JSON-encoded body to send.
        pagination: Configuration for paginated endpoints.
        max_retries: Number of times to retry on failure.
        backoff_factor: Multiplier for retry backoff delays.
        max_concurrent_requests: Maximum number of parallel API calls.
        requests_from_context: Whether to derive request parameters from context data.

    Returns:
        The updated context, with the read data as a DataFrame.

    Raises:
        ValueError: If no base URL is provided.
        ValueError: If neither an endpoint nor context-derived requests are specified.
        ValueError: If context-derived requests are enabled but no data is present in context.
    """
    deserialized_auth = process_auth(auth)
    pagination_config = PaginationConfig(**pagination) if pagination is not None else None

    if base_url is None:
        raise ValueError("A value for base_url must be supplied.")

    if endpoint is None and not requests_from_context:
        raise ValueError("A value for endpoint must be supplied when requests_from_context is false.")

    api_reader = APIReader(
        base_url=base_url,
        auth=deserialized_auth,
        default_headers=default_headers,
        max_concurrent_requests=max_concurrent_requests,
    )

    dynamic_requests: list[RequestSet] | None = None

    if requests_from_context:
        if not context.data:
            raise ValueError("Cannot generate requests from the context without a DataFrame in the context.")

        dynamic_requests = [
            cast(RequestSet, row.asDict())
            for row in context.data.select(
                "endpoint",
                "params",
                "headers",
                "data",
                "json_body",
            )
            .distinct()
            .collect()
        ]

    df = api_reader.read(
        endpoint=endpoint,
        method=method,
        key=key,
        timeout=timeout,
        params=params,
        headers=headers,
        data=data,
        json_body=json_body,
        pagination_config=pagination_config,
        max_retries=max_retries,
        backoff_factor=backoff_factor,
        dynamic_requests=dynamic_requests,
    )

    row_count = df.count()
    self._console_logger.info(f"API requests completed. Final row count = {row_count}.")

    return context.from_existing(data=df)

ReadCatalogTableAction

Bases: PipelineAction

Reads a table from Unity Catalog using a specified table identifier and optional reader configurations.

This function retrieves data from a catalog table using the CatalogReader identified by either the table_identifier parameter or the table_metadata from the provided PipelineContext of a previous step. The retrieved data is loaded into a DataFrame and returned as part of an updated PipelineContext.

Example
Read Sales Table:
    action: READ_CATALOG_TABLE
    options:
        table_identifier: my_catalog.business_schema.sales_table
        options: <options for the CatalogReader read method>
        delta_load_options:
            strategy: CDF
            delta_load_identifier: my_delta_load_id
            strategy_options:
                deduplication_columns: ["id"]
            enable_full_load: true

Batch Read:

Read Sales Table:
    action: READ_CATALOG_TABLE
    options:
        table_identifier: my_catalog.business_schema.sales_table
        options: <options for the CatalogReader read method>

Streaming Read:

Read Sales Table Stream:
    action: READ_CATALOG_TABLE
    options:
        table_identifier: my_catalog.business_schema.sales_table
        stream: true
        options: <options for the CatalogReader read_stream method>

Delta Load Read:

Read Sales Table:
    action: READ_CATALOG_TABLE
    options:
        table_identifier: my_catalog.business_schema.sales_table
        options: <options for the CatalogReader read method>
    delta_load_options:
        strategy: CDF
        delta_load_identifier: my_delta_load_id
        strategy_options:
            deduplication_columns: ["id"]
            enable_full_load: true
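
Internally the action validates a plain dict from the YAML into a DeltaLoadOptions model (see the source below). A minimal sketch of that normalization, assuming only what the source shows; the import path is a guess based on the DeltaLoader reference and may differ:

```python
# Hypothetical import path; adjust to where DeltaLoadOptions actually lives.
from cloe_nessy.integration.delta_loader import DeltaLoadOptions

options_dict = {
    "strategy": "CDF",
    "delta_load_identifier": "my_delta_load_id",
    "strategy_options": {
        "deduplication_columns": ["id"],
        "enable_full_load": True,
    },
}

# Mirrors DeltaLoadOptions(**delta_load_options) in run() below.
delta_load_options = DeltaLoadOptions(**options_dict)
print(delta_load_options.model_dump())
```
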
Source code in src/cloe_nessy/pipeline/actions/read_catalog_table.py
class ReadCatalogTableAction(PipelineAction):
    """Reads a table from Unity Catalog using a specified table identifier and optional reader configurations.

    This function retrieves data from a catalog table using the
    [`CatalogReader`][cloe_nessy.integration.reader.catalog_reader] identified
    by either the `table_identifier` parameter or the `table_metadata` from the
    provided `PipelineContext` of a previous step. The retrieved data is loaded
    into a DataFrame and returned as part of an updated `PipelineContext`.

    Example:
        ```yaml
        Read Sales Table:
            action: READ_CATALOG_TABLE
            options:
                table_identifier: my_catalog.business_schema.sales_table
                options: <options for the CatalogReader read method>
                delta_load_options:
                    strategy: CDF
                    delta_load_identifier: my_delta_load_id
                    strategy_options:
                        deduplication_columns: ["id"]
                        enable_full_load: true
        ```
        === "Batch Read"
            ```yaml
            Read Sales Table:
                action: READ_CATALOG_TABLE
                options:
                    table_identifier: my_catalog.business_schema.sales_table
                    options: <options for the CatalogReader read method>
            ```
        === "Streaming Read"
            ```yaml
            Read Sales Table Stream:
                action: READ_CATALOG_TABLE
                options:
                    table_identifier: my_catalog.business_schema.sales_table
                    stream: true
                    options: <options for the CatalogReader read_stream method>
            ```
        === "Delta Load Read"
            ```yaml
            Read Sales Table:
                action: READ_CATALOG_TABLE
                options:
                    table_identifier: my_catalog.business_schema.sales_table
                    options: <options for the CatalogReader read method>
                delta_load_options:
                    strategy: CDF
                    delta_load_identifier: my_delta_load_id
                    strategy_options:
                        deduplication_columns: ["id"]
                        enable_full_load: true
            ```
    """

    name: str = "READ_CATALOG_TABLE"

    def run(
        self,
        context: PipelineContext,
        *,
        table_identifier: str | None = None,
        options: dict[str, str] | None = None,
        delta_load_options: dict[Any, Any] | DeltaLoadOptions | None = None,
        stream: bool = False,
        **_: Any,  # define kwargs to match the base class signature
    ) -> PipelineContext:
        """Reads a table from Unity Catalog using a specified table identifier and optional reader configurations.

        Args:
            context: The pipeline's context, which contains
                metadata and configuration for the action.
            table_identifier: The identifier of the catalog table to
                read. If not provided, the function will attempt to use the table
                identifier from the `table_metadata` in the `context`.
            options: A dictionary of options for customizing
                the [`CatalogReader`][cloe_nessy.integration.reader.catalog_reader]
                behavior, such as filters or reading modes. Defaults to None.
            delta_load_options: Options for delta loading, if applicable.
                Configures the [`DeltaLoader`][cloe_nessy.integration.delta_loader]
                behavior, such as filters or reading modes.
            stream: If True, the action will read the table as a stream.

        Raises:
            ValueError: If neither `table_identifier` nor `table_metadata.identifier` in the `context` is provided.

        Returns:
            An updated pipeline context containing the data read from the catalog table as a DataFrame.
        """
        if not options:
            options = {}

        if not delta_load_options:
            delta_load_options = {}

        if (table_metadata := context.table_metadata) and table_identifier is None:
            table_identifier = table_metadata.identifier
        if table_identifier is None:
            raise ValueError("Table name must be specified or a valid Table object with identifier must be set.")

        if isinstance(delta_load_options, dict):
            delta_options_dict = delta_load_options
            if delta_load_options:
                delta_load_options = DeltaLoadOptions(**delta_load_options)
            else:
                delta_load_options = None
        else:
            delta_options_dict = delta_load_options.model_dump() if delta_load_options else {}

        runtime_info = set_delta_load_info(
            table_identifier=table_identifier,
            delta_load_options=delta_options_dict,
            runtime_info=context.runtime_info or {},
        )

        table_reader = CatalogReader()

        if stream:
            context.runtime_info = (context.runtime_info or {}) | {"streaming": True}
            df = table_reader.read_stream(table_identifier=table_identifier, options=options)
        else:
            df = table_reader.read(
                table_identifier=table_identifier,
                options=options,
                delta_load_options=delta_load_options,
            )

        return context.from_existing(data=df, runtime_info=runtime_info)

run(context, *, table_identifier=None, options=None, delta_load_options=None, stream=False, **_)

Reads a table from Unity Catalog using a specified table identifier and optional reader configurations.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | PipelineContext | The pipeline's context, which contains metadata and configuration for the action. | required |
| table_identifier | str \| None | The identifier of the catalog table to read. If not provided, the function will attempt to use the table identifier from the table_metadata in the context. | None |
| options | dict[str, str] \| None | A dictionary of options for customizing the CatalogReader behavior, such as filters or reading modes. | None |
| delta_load_options | dict[Any, Any] \| DeltaLoadOptions \| None | Options for delta loading, if applicable. Configures the DeltaLoader behavior. | None |
| stream | bool | If True, the action will read the table as a stream. | False |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If neither table_identifier nor table_metadata.identifier in the context is provided. |

Returns:

| Type | Description |
| --- | --- |
| PipelineContext | An updated pipeline context containing the data read from the catalog table as a DataFrame. |

Source code in src/cloe_nessy/pipeline/actions/read_catalog_table.py
def run(
    self,
    context: PipelineContext,
    *,
    table_identifier: str | None = None,
    options: dict[str, str] | None = None,
    delta_load_options: dict[Any, Any] | DeltaLoadOptions | None = None,
    stream: bool = False,
    **_: Any,  # define kwargs to match the base class signature
) -> PipelineContext:
    """Reads a table from Unity Catalog using a specified table identifier and optional reader configurations.

    Args:
        context: The pipeline's context, which contains
            metadata and configuration for the action.
        table_identifier: The identifier of the catalog table to
            read. If not provided, the function will attempt to use the table
            identifier from the `table_metadata` in the `context`.
        options: A dictionary of options for customizing
            the [`CatalogReader`][cloe_nessy.integration.reader.catalog_reader]
            behavior, such as filters or reading modes. Defaults to None.
        delta_load_options: Options for delta loading, if applicable.
            Configures the [`DeltaLoader`][cloe_nessy.integration.delta_loader]
            behavior, such as filters or reading modes.
        stream: If True, the action will read the table as a stream.

    Raises:
        ValueError: If neither `table_identifier` nor `table_metadata.identifier` in the `context` is provided.

    Returns:
        An updated pipeline context containing the data read from the catalog table as a DataFrame.
    """
    if not options:
        options = {}

    if not delta_load_options:
        delta_load_options = {}

    if (table_metadata := context.table_metadata) and table_identifier is None:
        table_identifier = table_metadata.identifier
    if table_identifier is None:
        raise ValueError("Table name must be specified or a valid Table object with identifier must be set.")

    if isinstance(delta_load_options, dict):
        delta_options_dict = delta_load_options
        if delta_load_options:
            delta_load_options = DeltaLoadOptions(**delta_load_options)
        else:
            delta_load_options = None
    else:
        delta_options_dict = delta_load_options.model_dump() if delta_load_options else {}

    runtime_info = set_delta_load_info(
        table_identifier=table_identifier,
        delta_load_options=delta_options_dict,
        runtime_info=context.runtime_info or {},
    )

    table_reader = CatalogReader()

    if stream:
        context.runtime_info = (context.runtime_info or {}) | {"streaming": True}
        df = table_reader.read_stream(table_identifier=table_identifier, options=options)
    else:
        df = table_reader.read(
            table_identifier=table_identifier,
            options=options,
            delta_load_options=delta_load_options,
        )

    return context.from_existing(data=df, runtime_info=runtime_info)

ReadExcelAction

Bases: PipelineAction

Reads data from an Excel file or directory of Excel files and returns a DataFrame.

The function reads Excel files using the ExcelDataFrameReader either from a single file or a directory path. It can read specific sheets, handle file extensions, and offers various options to customize how the data is read, such as specifying headers, index columns, and handling missing values. The resulting data is returned as a DataFrame, and metadata about the read files can be included in the context.

Example
Read Excel Table:
    action: READ_EXCEL
    options:
        file: excel_file_folder/excel_files_june/interesting_excel_file.xlsx
        usecols:
            - key_column
            - interesting_column
        options: <options for the ExcelDataFrameReader read method>

More Options

The READ_EXCEL action supports additional options that can be passed to the run method. For more information, refer to the method documentation.
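
As a sketch, the same read can be driven directly from Python; the import path follows the "Source code" note below, and context is an existing PipelineContext supplied by the surrounding pipeline:

```python
from cloe_nessy.pipeline.actions.read_excel import ReadExcelAction

action = ReadExcelAction()
new_context = action.run(
    context,  # an existing PipelineContext
    path="excel_file_folder/excel_files_june/",
    extension="xlsx",
    recursive=True,
    usecols=["key_column", "interesting_column"],
)
df = new_context.data
```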

Source code in src/cloe_nessy/pipeline/actions/read_excel.py
class ReadExcelAction(PipelineAction):
    """Reads data from an Excel file or directory of Excel files and returns a DataFrame.

    The function reads Excel files using the
    [`ExcelDataFrameReader`][cloe_nessy.integration.reader.excel_reader] either
    from a single file or a directory path. It can read specific sheets, handle
    file extensions, and offers various options to customize how the data is
    read, such as specifying headers, index columns, and handling missing
    values. The resulting data is returned as a DataFrame, and metadata about
    the read files can be included in the context.

    Example:
        ```yaml
        Read Excel Table:
            action: READ_EXCEL
            options:
                file: excel_file_folder/excel_files_june/interesting_excel_file.xlsx
                usecols:
                    - key_column
                    - interesting_column
                options: <options for the ExcelDataFrameReader read method>
        ```

    !!! note "More Options"
        The `READ_EXCEL` action supports additional options that can be passed to the
        run method. For more information, refer to the method documentation.
    """

    name: str = "READ_EXCEL"

    def run(
        self,
        context: PipelineContext,
        *,
        file: str | None = None,
        path: str | None = None,
        extension: str = "xlsx",
        recursive: bool = False,
        sheet_name: str | int | list = 0,
        sheet_name_as_column: bool = False,
        header: int | list[int] = 0,
        index_col: int | list[int] | None = None,
        usecols: int | str | list | Callable | None = None,
        dtype: str | None = None,
        fillna: str | dict[str, list[str]] | dict[str, str] | None = None,
        true_values: list | None = None,
        false_values: list | None = None,
        nrows: int | None = None,
        na_values: list[str] | dict[str, list[str]] | None = None,
        keep_default_na: bool = True,
        parse_dates: bool | list | dict = False,
        date_parser: Callable | None = None,
        thousands: str | None = None,
        include_index: bool = False,
        options: dict | None = None,
        add_metadata_column: bool = True,
        load_as_strings: bool = False,
        **_,
    ) -> PipelineContext:
        """Reads data from an Excel file or directory of Excel files and returns a DataFrame.

        Args:
            context: The context in which the action is executed.
            file: The path to a single Excel file. Either `file` or `path` must be specified.
            path: The directory path containing multiple Excel files. Either `file` or `path` must be specified.
            extension: The file extension to look for when reading from a directory.
            recursive: Whether to include subdirectories when reading from a directory path.
            sheet_name: The sheet name(s) or index(es) to read from the Excel file.
            sheet_name_as_column: Whether to add a column with the sheet name to the DataFrame.
            header: Row number(s) to use as the column labels.
            index_col: Column(s) to use as the index of the DataFrame.
            usecols: Subset of columns to parse. Can be an integer, string, list,
                or function.
            dtype: Data type for the columns.
            fillna: Method or value to use to fill NaN values.
            true_values: Values to consider as True.
            false_values: Values to consider as False.
            nrows: Number of rows to parse.
            na_values: Additional strings to recognize as NaN/NA.
            keep_default_na: Whether to append default NaN values when custom `na_values` are specified.
            parse_dates: Options for parsing date columns.
            date_parser: Function to use for converting strings to datetime objects.
            thousands: Thousands separator to use when parsing numeric columns.
            include_index: Whether to include an index column in the output DataFrame.
            options: Additional options to pass to the DataFrame reader.
            add_metadata_column: Whether to add a metadata column with file information to the DataFrame.
            load_as_strings: Whether to load all columns as strings.

        Raises:
            ValueError: Raised if both `file` and `path` are specified, or if neither is provided.

        Returns:
            The updated context, with the read data as a DataFrame.
        """
        if not options:
            options = dict()

        if file is not None and path is not None:
            self._tabular_logger.error("message: Only one of file or path may be specified.")
            raise ValueError("Only one of file or path may be specified.")

        excel_reader = ExcelDataFrameReader()
        if file is not None:
            df = excel_reader.read(
                location=file,
                sheet_name=sheet_name,
                sheet_name_as_column=sheet_name_as_column,
                header=header,
                index_col=index_col,
                usecols=usecols,
                true_values=true_values,
                false_values=false_values,
                nrows=nrows,
                dtype=dtype,
                fillna=fillna,
                na_values=na_values,
                keep_default_na=keep_default_na,
                parse_dates=parse_dates,
                date_parser=date_parser,
                thousands=thousands,
                include_index=include_index,
                options=options,
                add_metadata_column=add_metadata_column,
                load_as_strings=load_as_strings,
            )
        elif path is not None:
            file_list = get_file_paths(path, extension, recursive)
            df_dict: dict = {}
            for file_path in file_list:
                df_dict[file_path] = excel_reader.read(
                    location=file_path,
                    sheet_name=sheet_name,
                    sheet_name_as_column=sheet_name_as_column,
                    header=header,
                    index_col=index_col,
                    usecols=usecols,
                    dtype=dtype,
                    fillna=fillna,
                    true_values=true_values,
                    false_values=false_values,
                    nrows=nrows,
                    na_values=na_values,
                    keep_default_na=keep_default_na,
                    parse_dates=parse_dates,
                    date_parser=date_parser,
                    thousands=thousands,
                    include_index=include_index,
                    options=options,
                    add_metadata_column=add_metadata_column,
                    load_as_strings=load_as_strings,
                )
            df = reduce(DataFrame.unionAll, list(df_dict.values()))

        else:
            self._tabular_logger.error("action_name: READ_EXCEL | message: Either file or path must be specified.")
            raise ValueError("Either file or path must be specified.")

        runtime_info = context.runtime_info

        if add_metadata_column:
            read_files_list = list(set([x.file_path for x in df.select("__metadata.file_path").collect()]))
            if runtime_info is None:
                runtime_info = {"read_files": read_files_list}
            else:
                try:
                    runtime_info["read_files"] = list(set(runtime_info["read_files"] + read_files_list))
                except KeyError:
                    runtime_info["read_files"] = read_files_list

        return context.from_existing(data=df, runtime_info=runtime_info)

run(context, *, file=None, path=None, extension='xlsx', recursive=False, sheet_name=0, sheet_name_as_column=False, header=0, index_col=None, usecols=None, dtype=None, fillna=None, true_values=None, false_values=None, nrows=None, na_values=None, keep_default_na=True, parse_dates=False, date_parser=None, thousands=None, include_index=False, options=None, add_metadata_column=True, load_as_strings=False, **_)

Reads data from an Excel file or directory of Excel files and returns a DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| context | PipelineContext | The context in which the action is executed. | required |
| file | str \| None | The path to a single Excel file. Either file or path must be specified. | None |
| path | str \| None | The directory path containing multiple Excel files. Either file or path must be specified. | None |
| extension | str | The file extension to look for when reading from a directory. | 'xlsx' |
| recursive | bool | Whether to include subdirectories when reading from a directory path. | False |
| sheet_name | str \| int \| list | The sheet name(s) or index(es) to read from the Excel file. | 0 |
| sheet_name_as_column | bool | Whether to add a column with the sheet name to the DataFrame. | False |
| header | int \| list[int] | Row number(s) to use as the column labels. | 0 |
| index_col | int \| list[int] \| None | Column(s) to use as the index of the DataFrame. | None |
| usecols | int \| str \| list \| Callable \| None | Subset of columns to parse. Can be an integer, string, list, or function. | None |
| dtype | str \| None | Data type for the columns. | None |
| fillna | str \| dict[str, list[str]] \| dict[str, str] \| None | Method or value to use to fill NaN values. | None |
| true_values | list \| None | Values to consider as True. | None |
| false_values | list \| None | Values to consider as False. | None |
| nrows | int \| None | Number of rows to parse. | None |
| na_values | list[str] \| dict[str, list[str]] \| None | Additional strings to recognize as NaN/NA. | None |
| keep_default_na | bool | Whether to append default NaN values when custom na_values are specified. | True |
| parse_dates | bool \| list \| dict | Options for parsing date columns. | False |
| date_parser | Callable \| None | Function to use for converting strings to datetime objects. | None |
| thousands | str \| None | Thousands separator to use when parsing numeric columns. | None |
| include_index | bool | Whether to include an index column in the output DataFrame. | False |
| options | dict \| None | Additional options to pass to the DataFrame reader. | None |
| add_metadata_column | bool | Whether to add a metadata column with file information to the DataFrame. | True |
| load_as_strings | bool | Whether to load all columns as strings. | False |

Raises:

| Type | Description |
| --- | --- |
| ValueError | Raised if both file and path are specified, or if neither is provided. |

Returns:

| Type | Description |
| --- | --- |
| PipelineContext | The updated context, with the read data as a DataFrame. |

Source code in src/cloe_nessy/pipeline/actions/read_excel.py
def run(
    self,
    context: PipelineContext,
    *,
    file: str | None = None,
    path: str | None = None,
    extension: str = "xlsx",
    recursive: bool = False,
    sheet_name: str | int | list = 0,
    sheet_name_as_column: bool = False,
    header: int | list[int] = 0,
    index_col: int | list[int] | None = None,
    usecols: int | str | list | Callable | None = None,
    dtype: str | None = None,
    fillna: str | dict[str, list[str]] | dict[str, str] | None = None,
    true_values: list | None = None,
    false_values: list | None = None,
    nrows: int | None = None,
    na_values: list[str] | dict[str, list[str]] | None = None,
    keep_default_na: bool = True,
    parse_dates: bool | list | dict = False,
    date_parser: Callable | None = None,
    thousands: str | None = None,
    include_index: bool = False,
    options: dict | None = None,
    add_metadata_column: bool = True,
    load_as_strings: bool = False,
    **_,
) -> PipelineContext:
    """Reads data from an Excel file or directory of Excel files and returns a DataFrame.

    Args:
        context: The context in which the action is executed.
        file: The path to a single Excel file. Either `file` or `path` must be specified.
        path: The directory path containing multiple Excel files. Either `file` or `path` must be specified.
        extension: The file extension to look for when reading from a directory.
        recursive: Whether to include subdirectories when reading from a directory path.
        sheet_name: The sheet name(s) or index(es) to read from the Excel file.
        sheet_name_as_column: Whether to add a column with the sheet name to the DataFrame.
        header: Row number(s) to use as the column labels.
        index_col: Column(s) to use as the index of the DataFrame.
        usecols: Subset of columns to parse. Can be an integer, string, list,
            or function.
        dtype: Data type for the columns.
        fillna: Method or value to use to fill NaN values.
        true_values: Values to consider as True.
        false_values: Values to consider as False.
        nrows: Number of rows to parse.
        na_values: Additional strings to recognize as NaN/NA.
        keep_default_na: Whether to append default NaN values when custom `na_values` are specified.
        parse_dates: Options for parsing date columns.
        date_parser: Function to use for converting strings to datetime objects.
        thousands: Thousands separator to use when parsing numeric columns.
        include_index: Whether to include an index column in the output DataFrame.
        options: Additional options to pass to the DataFrame reader.
        add_metadata_column: Whether to add a metadata column with file information to the DataFrame.
        load_as_strings: Whether to load all columns as strings.

    Raises:
        ValueError: Raised if both `file` and `path` are specified, or if neither is provided.

    Returns:
        The updated context, with the read data as a DataFrame.
    """
    if not options:
        options = dict()

    if file is not None and path is not None:
        self._tabular_logger.error("message: Only one of file or path may be specified.")
        raise ValueError("Only one of file or path may be specified.")

    excel_reader = ExcelDataFrameReader()
    if file is not None:
        df = excel_reader.read(
            location=file,
            sheet_name=sheet_name,
            sheet_name_as_column=sheet_name_as_column,
            header=header,
            index_col=index_col,
            usecols=usecols,
            true_values=true_values,
            false_values=false_values,
            nrows=nrows,
            dtype=dtype,
            fillna=fillna,
            na_values=na_values,
            keep_default_na=keep_default_na,
            parse_dates=parse_dates,
            date_parser=date_parser,
            thousands=thousands,
            include_index=include_index,
            options=options,
            add_metadata_column=add_metadata_column,
            load_as_strings=load_as_strings,
        )
    elif path is not None:
        file_list = get_file_paths(path, extension, recursive)
        df_dict: dict = {}
        for file_path in file_list:
            df_dict[file_path] = excel_reader.read(
                location=file_path,
                sheet_name=sheet_name,
                sheet_name_as_column=sheet_name_as_column,
                header=header,
                index_col=index_col,
                usecols=usecols,
                dtype=dtype,
                fillna=fillna,
                true_values=true_values,
                false_values=false_values,
                nrows=nrows,
                na_values=na_values,
                keep_default_na=keep_default_na,
                parse_dates=parse_dates,
                date_parser=date_parser,
                thousands=thousands,
                include_index=include_index,
                options=options,
                add_metadata_column=add_metadata_column,
                load_as_strings=load_as_strings,
            )
        df = reduce(DataFrame.unionAll, list(df_dict.values()))

    else:
        self._tabular_logger.error("action_name: READ_EXCEL | message: Either file or path must be specified.")
        raise ValueError("Either file or path must be specified.")

    runtime_info = context.runtime_info

    if add_metadata_column:
        read_files_list = list(set([x.file_path for x in df.select("__metadata.file_path").collect()]))
        if runtime_info is None:
            runtime_info = {"read_files": read_files_list}
        else:
            try:
                runtime_info["read_files"] = list(set(runtime_info["read_files"] + read_files_list))
            except KeyError:
                runtime_info["read_files"] = read_files_list

    return context.from_existing(data=df, runtime_info=runtime_info)

ReadFilesAction

Bases: PipelineAction

Reads files from a specified location.

If an extension is provided, all files with the given extension will be read using the FileReader. If no extension is provided, the spark_format must be set, and all files in the location will be read using a DataFrameReader with the specified format.

Example
Read Files:
    action: READ_FILES
    options:
        location: json_file_folder/
        search_subdirs: True
        spark_format: JSON

Define Spark Format

Use the spark_format option to specify the format with which to read the files. Supported formats include CSV, JSON, PARQUET, TEXT, and XML.

Read Files:
    action: READ_FILES
    options:
        location: csv_file_folder/
        search_subdirs: True
        extension: csv

Define Extension

Use the extension option to specify the extension of the files to read. If spark_format is not specified, it will be derived from the extension.

Read Files:
    action: READ_FILES
    options:
        location: file_folder/
        extension: abc_custom_extension  # specifies the files to read
        spark_format: CSV  # specifies the format to read the files with

Define both Extension & Spark Format

Use the extension option to specify the extension of the files to read. Additionally, use the spark_format option to specify the format with which to read the files.

Read Delta Files:
    action: READ_FILES
    options:
        location: /path/to/delta/table
        spark_format: delta
    delta_load_options:
        strategy: CDF
        delta_load_identifier: my_delta_files_load
        strategy_options:
            deduplication_columns: ["id"]
            enable_full_load: false

Delta Loading for Files

Use delta_load_options when reading Delta Lake tables to enable incremental loading. This works with both CDF and timestamp strategies.

Source code in src/cloe_nessy/pipeline/actions/read_files.py
class ReadFilesAction(PipelineAction):
    """Reads files from a specified location.

    If an extension is provided, all files with the given extension will be read
    using the [`FileReader`][cloe_nessy.integration.reader.file_reader]. If no
    extension is provided, the `spark_format` must be set, and all files in the
    location will be read using a DataFrameReader with the specified format.

    Example:
        === "Read files specified by spark_format"
            ```yaml
            Read Files:
                action: READ_FILES
                options:
                    location: json_file_folder/
                    search_subdirs: True
                    spark_format: JSON
            ```
            !!! note "Define Spark Format"
                Use the `spark_format` option to specify the format with which
                to read the files. Supported formats include `CSV`, `JSON`,
                `PARQUET`, `TEXT`, and `XML`.

        === "Read files specified by extension"
            ```yaml
            Read Files:
                action: READ_FILES
                options:
                    location: csv_file_folder/
                    search_subdirs: True
                    extension: csv
            ```
            !!! note "Define Extension"
                Use the `extension` option to specify the extension of the files
                to read. If `spark_format` is not specified, it will be derived
                from the extension.

        === "Read files with a specified spark_format AND extension"
            ```yaml
            Read Files:
                action: READ_FILES
                options:
                    location: file_folder/
                    extension: abc_custom_extension  # specifies the files to read
                    spark_format: CSV  # specifies the format to read the files with
            ```
            !!! note "Define both Extension & Spark Format"
                Use the `extension` option to specify the extension of the files
                to read. Additionally, use the `spark_format` option to specify
                the format with which to read the files.

        === "Read Delta Lake table with delta loading"
            ```yaml
            Read Delta Files:
                action: READ_FILES
                options:
                    location: /path/to/delta/table
                    spark_format: delta
                delta_load_options:
                    strategy: CDF
                    delta_load_identifier: my_delta_files_load
                    strategy_options:
                        deduplication_columns: ["id"]
                        enable_full_load: false
            ```
            !!! note "Delta Loading for Files"
                Use `delta_load_options` when reading Delta Lake tables to enable
                incremental loading. This works with both CDF and timestamp strategies.
    """

    name: str = "READ_FILES"

    def run(
        self,
        context: PipelineContext,
        *,
        location: str | None = None,
        search_subdirs: bool = False,
        extension: str | None = None,
        spark_format: str | None = None,
        schema: str | None = None,
        add_metadata_column: bool = True,
        options: dict[str, str] | None = None,
        delta_load_options: dict[Any, Any] | DeltaLoadOptions | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Reads files from a specified location.

        Args:
            context: The context in which this Action is executed.
            location: The location from which to read files.
            search_subdirs: Recursively search subdirectories for files
                if an extension is provided.
            extension: The file extension to filter files by.
            spark_format: The format to use for reading the files. If not provided,
                it will be inferred from the file extension.
            schema: The schema of the data. If None, schema is obtained from
                the context metadata.
            add_metadata_column: Whether to include the `__metadata` column with
                file metadata in the DataFrame.
            options: Additional options passed to the reader.
            delta_load_options: Options for delta loading, if applicable. When provided
                for Delta format files, enables incremental loading using delta loader strategies.

        Raises:
            ValueError: If neither `extension` nor `spark_format` are provided, or if
                no location is specified.

        Returns:
            The context after the Action has been executed, containing the read data as a DataFrame.
        """
        if not location:
            raise ValueError("No location provided. Please specify location to read files from.")
        if not options:
            options = dict()
        if not spark_format and not extension:
            raise ValueError("Either spark_format or extension must be provided.")

        if (metadata := context.table_metadata) and schema is None:
            schema = metadata.schema

        # Convert dict to DeltaLoadOptions if needed
        if isinstance(delta_load_options, dict):
            delta_load_options = DeltaLoadOptions(**delta_load_options)

        # Set up runtime info for delta loading
        runtime_info = context.runtime_info or {}
        if delta_load_options:
            # Convert DeltaLoadOptions to dict for runtime info storage
            delta_options_dict = (
                delta_load_options.model_dump()
                if isinstance(delta_load_options, DeltaLoadOptions)
                else delta_load_options
            )
            runtime_info = set_delta_load_info(
                table_identifier=location,  # Use location as identifier for file-based delta loading
                delta_load_options=delta_options_dict,
                runtime_info=runtime_info,
            )

        file_reader = FileReader()
        df = file_reader.read(
            location=location,
            schema=schema,
            extension=extension,
            spark_format=spark_format,
            search_subdirs=search_subdirs,
            options=options,
            add_metadata_column=add_metadata_column,
            delta_load_options=delta_load_options,
        )

        # Only process the metadata column if it exists (it may be absent when delta loading is used)
        if add_metadata_column and "__metadata" in df.columns:
            read_files_list = [x.file_path for x in df.select("__metadata.file_path").drop_duplicates().collect()]
            if runtime_info is None:
                runtime_info = {"read_files": read_files_list}
            else:
                try:
                    runtime_info["read_files"] = list(set(runtime_info["read_files"] + read_files_list))
                except KeyError:
                    runtime_info["read_files"] = read_files_list

        return context.from_existing(data=df, runtime_info=runtime_info)

run(context, *, location=None, search_subdirs=False, extension=None, spark_format=None, schema=None, add_metadata_column=True, options=None, delta_load_options=None, **_)

Reads files from a specified location.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `PipelineContext` | The context in which this Action is executed. | *required* |
| `location` | `str \| None` | The location from which to read files. | `None` |
| `search_subdirs` | `bool` | Recursively search subdirectories for files if an extension is provided. | `False` |
| `extension` | `str \| None` | The file extension to filter files by. | `None` |
| `spark_format` | `str \| None` | The format to use for reading the files. If not provided, it will be inferred from the file extension. | `None` |
| `schema` | `str \| None` | The schema of the data. If None, schema is obtained from the context metadata. | `None` |
| `add_metadata_column` | `bool` | Whether to include the `__metadata` column with file metadata in the DataFrame. | `True` |
| `options` | `dict[str, str] \| None` | Additional options passed to the reader. | `None` |
| `delta_load_options` | `dict[Any, Any] \| DeltaLoadOptions \| None` | Options for delta loading, if applicable. When provided for Delta format files, enables incremental loading using delta loader strategies. | `None` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If neither `extension` nor `spark_format` are provided, or if no location is specified. |

Returns:

| Type | Description |
| --- | --- |
| `PipelineContext` | The context after the Action has been executed, containing the read data as a DataFrame. |

Source code in src/cloe_nessy/pipeline/actions/read_files.py
def run(
    self,
    context: PipelineContext,
    *,
    location: str | None = None,
    search_subdirs: bool = False,
    extension: str | None = None,
    spark_format: str | None = None,
    schema: str | None = None,
    add_metadata_column: bool = True,
    options: dict[str, str] | None = None,
    delta_load_options: dict[Any, Any] | DeltaLoadOptions | None = None,
    **_: Any,
) -> PipelineContext:
    """Reads files from a specified location.

    Args:
        context: The context in which this Action is executed.
        location: The location from which to read files.
        search_subdirs: Recursively search subdirectories for files
            if an extension is provided.
        extension: The file extension to filter files by.
        spark_format: The format to use for reading the files. If not provided,
            it will be inferred from the file extension.
        schema: The schema of the data. If None, schema is obtained from
            the context metadata.
        add_metadata_column: Whether to include the `__metadata` column with
            file metadata in the DataFrame.
        options: Additional options passed to the reader.
        delta_load_options: Options for delta loading, if applicable. When provided
            for Delta format files, enables incremental loading using delta loader strategies.

    Raises:
        ValueError: If neither `extension` nor `spark_format` are provided, or if
            no location is specified.

    Returns:
        The context after the Action has been executed, containing the read data as a DataFrame.
    """
    if not location:
        raise ValueError("No location provided. Please specify location to read files from.")
    if not options:
        options = dict()
    if not spark_format and not extension:
        raise ValueError("Either spark_format or extension must be provided.")

    if (metadata := context.table_metadata) and schema is None:
        schema = metadata.schema

    # Convert dict to DeltaLoadOptions if needed
    if isinstance(delta_load_options, dict):
        delta_load_options = DeltaLoadOptions(**delta_load_options)

    # Set up runtime info for delta loading
    runtime_info = context.runtime_info or {}
    if delta_load_options:
        # Convert DeltaLoadOptions to dict for runtime info storage
        delta_options_dict = (
            delta_load_options.model_dump()
            if isinstance(delta_load_options, DeltaLoadOptions)
            else delta_load_options
        )
        runtime_info = set_delta_load_info(
            table_identifier=location,  # Use location as identifier for file-based delta loading
            delta_load_options=delta_options_dict,
            runtime_info=runtime_info,
        )

    file_reader = FileReader()
    df = file_reader.read(
        location=location,
        schema=schema,
        extension=extension,
        spark_format=spark_format,
        search_subdirs=search_subdirs,
        options=options,
        add_metadata_column=add_metadata_column,
        delta_load_options=delta_load_options,
    )

    # Only process the metadata column if it exists (it may be absent when delta loading is used)
    if add_metadata_column and "__metadata" in df.columns:
        read_files_list = [x.file_path for x in df.select("__metadata.file_path").drop_duplicates().collect()]
        if runtime_info is None:
            runtime_info = {"read_files": read_files_list}
        else:
            try:
                runtime_info["read_files"] = list(set(runtime_info["read_files"] + read_files_list))
            except KeyError:
                runtime_info["read_files"] = read_files_list

    return context.from_existing(data=df, runtime_info=runtime_info)

ReadMetadataYAMLAction

Bases: PipelineAction

Reads table metadata from a yaml file using the Table model.

Example
Read Table Metadata:
    action: READ_METADATA_YAML_ACTION
    options:
        file_path: metadata/schemas/bronze/sales_table.yml
        catalog_name: production
        schema_name: sales_data
Read Table Metadata:
    action: READ_METADATA_YAML_ACTION
    options:
        file_path: metadata/schemas/bronze/sales_table.yml
        catalog_name: production
        schema_name: sales_data
        storage_path: abfs://external_storage/sales_data/sales_table
Source code in src/cloe_nessy/pipeline/actions/read_metadata_yaml.py
class ReadMetadataYAMLAction(PipelineAction):
    """Reads table metadata from a yaml file using the [`Table`][cloe_nessy.models.table] model.

    Example:
        === "Managed Table"
            ```yaml
            Read Table Metadata:
                action: READ_METADATA_YAML_ACTION
                options:
                    file_path: metadata/schemas/bronze/sales_table.yml
                    catalog_name: production
                    schema_name: sales_data
            ```
        === "External Table"
            ```yaml
            Read Table Metadata:
                action: READ_METADATA_YAML_ACTION
                options:
                    file_path: metadata/schemas/bronze/sales_table.yml
                    catalog_name: production
                    schema_name: sales_data
                    storage_path: abfs://external_storage/sales_data/sales_table
            ```
    """

    name: str = "READ_METADATA_YAML_ACTION"

    def run(
        self,
        context: PipelineContext,
        *,
        file_path: str | None = None,
        catalog_name: str | None = None,
        schema_name: str | None = None,
        storage_path: str | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Reads table metadata from a yaml file using the [`Table`][cloe_nessy.models.table] model.

        Args:
            context: The context in which this Action is executed.
            file_path: The path to the file that defines the table.
            catalog_name: The name of the catalog for the table.
            schema_name: The name of the schema for the table.
            storage_path: The storage path for the table, if applicable. If not
                provided, the table will be considered a managed table.

        Raises:
            ValueError: If any issues occur while reading the table metadata, such as an invalid table,
                missing file, missing path, or missing catalog/schema names.

        Returns:
            The context after the execution of this Action, containing the table metadata.
        """
        missing_params = []
        if not file_path:
            missing_params.append("file_path")
        if not catalog_name:
            missing_params.append("catalog_name")
        if not schema_name:
            missing_params.append("schema_name")

        if missing_params:
            raise ValueError(
                f"Missing required parameters: {', '.join(missing_params)}. Please specify all required parameters."
            )

        final_file_path = Path(file_path) if file_path else Path()

        table, errors = Table.read_instance_from_file(
            final_file_path,
            catalog_name=catalog_name,
            schema_name=schema_name,
        )
        if errors:
            raise ValueError(f"Errors while reading table metadata: {errors}")
        if not table:
            raise ValueError("No table found in metadata.")

        if not table.storage_path and storage_path:
            self._console_logger.info(f"Setting storage path for table [ '{table.name}' ] to [ '{storage_path}' ]")
            table.storage_path = storage_path
            table.is_external = True

        self._console_logger.info(f"Table [ '{table.name}' ] metadata read successfully from [ '{file_path}' ]")
        return context.from_existing(table_metadata=table)

run(context, *, file_path=None, catalog_name=None, schema_name=None, storage_path=None, **_)

Reads table metadata from a yaml file using the Table model.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `PipelineContext` | The context in which this Action is executed. | *required* |
| `file_path` | `str \| None` | The path to the file that defines the table. | `None` |
| `catalog_name` | `str \| None` | The name of the catalog for the table. | `None` |
| `schema_name` | `str \| None` | The name of the schema for the table. | `None` |
| `storage_path` | `str \| None` | The storage path for the table, if applicable. If not provided, the table will be considered a managed table. | `None` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If any issues occur while reading the table metadata, such as an invalid table, missing file, missing path, or missing catalog/schema names. |

Returns:

| Type | Description |
| --- | --- |
| `PipelineContext` | The context after the execution of this Action, containing the table metadata. |

Source code in src/cloe_nessy/pipeline/actions/read_metadata_yaml.py
def run(
    self,
    context: PipelineContext,
    *,
    file_path: str | None = None,
    catalog_name: str | None = None,
    schema_name: str | None = None,
    storage_path: str | None = None,
    **_: Any,
) -> PipelineContext:
    """Reads table metadata from a yaml file using the [`Table`][cloe_nessy.models.table] model.

    Args:
        context: The context in which this Action is executed.
        file_path: The path to the file that defines the table.
        catalog_name: The name of the catalog for the table.
        schema_name: The name of the schema for the table.
        storage_path: The storage path for the table, if applicable. If not
            provided, the table will be considered a managed table.

    Raises:
        ValueError: If any issues occur while reading the table metadata, such as an invalid table,
            missing file, missing path, or missing catalog/schema names.

    Returns:
        The context after the execution of this Action, containing the table metadata.
    """
    missing_params = []
    if not file_path:
        missing_params.append("file_path")
    if not catalog_name:
        missing_params.append("catalog_name")
    if not schema_name:
        missing_params.append("schema_name")

    if missing_params:
        raise ValueError(
            f"Missing required parameters: {', '.join(missing_params)}. Please specify all required parameters."
        )

    final_file_path = Path(file_path) if file_path else Path()

    table, errors = Table.read_instance_from_file(
        final_file_path,
        catalog_name=catalog_name,
        schema_name=schema_name,
    )
    if errors:
        raise ValueError(f"Errors while reading table metadata: {errors}")
    if not table:
        raise ValueError("No table found in metadata.")

    if not table.storage_path and storage_path:
        self._console_logger.info(f"Setting storage path for table [ '{table.name}' ] to [ '{storage_path}' ]")
        table.storage_path = storage_path
        table.is_external = True

    self._console_logger.info(f"Table [ '{table.name}' ] metadata read successfully from [ '{file_path}' ]")
    return context.from_existing(table_metadata=table)

TransformChangeDatatypeAction

Bases: PipelineAction

Changes the datatypes of specified columns in the given DataFrame.

Data Types

We make use of the PySpark cast function to change the data types of the columns. Valid data types can be found in the PySpark documentation.

Example
Cast Columns:
    action: TRANSFORM_CHANGE_DATATYPE
    options:
        columns:
            id: string
            revenue: long
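
For orientation, a minimal PySpark sketch of the pattern the action applies (mirroring the source below; the sample data and session setup are illustrative only):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "100")], ["id", "revenue"])

# Each mapping entry becomes a cast expression, applied in a single withColumns call.
columns = {"id": "string", "revenue": "long"}
df = df.withColumns({name: F.col(name).cast(dtype) for name, dtype in columns.items()})
df.printSchema()  # id: string, revenue: long
```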
Source code in src/cloe_nessy/pipeline/actions/transform_change_datatype.py
class TransformChangeDatatypeAction(PipelineAction):
    """Changes the datatypes of specified columns in the given DataFrame.

    !!! note "Data Types"
        We make use of the PySpark `cast` function to change the data types of
        the columns. Valid data types can be found in the [PySpark
        documentation](https://spark.apache.org/docs/3.5.3/sql-ref-datatypes.html).

    Example:
        ```yaml
        Cast Columns:
            action: TRANSFORM_CHANGE_DATATYPE
            options:
                columns:
                    id: string
                    revenue: long
        ```
    """

    name: str = "TRANSFORM_CHANGE_DATATYPE"

    def run(
        self,
        context: PipelineContext,
        *,
        columns: dict[str, str] | None = None,
        **_: Any,  # define kwargs to match the base class signature
    ) -> PipelineContext:
        """Changes the datatypes of specified columns in the given DataFrame.

        Args:
            context: The context in which this Action is executed.
            columns: A dictionary where the key is the column
                name and the value is the desired datatype.

        Raises:
            ValueError: If no columns are provided.
            ValueError: If the data from context is None.

        Returns:
            The context after the execution of this Action, containing the DataFrame with updated column datatypes.
        """
        if not columns:
            raise ValueError("No columns provided.")

        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        df = context.data
        change_columns = {col: F.col(col).cast(dtype) for col, dtype in columns.items()}
        df = df.withColumns(change_columns)  # type: ignore

        return context.from_existing(data=df)  # type: ignore

run(context, *, columns=None, **_)

Changes the datatypes of specified columns in the given DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `PipelineContext` | The context in which this Action is executed. | *required* |
| `columns` | `dict[str, str] \| None` | A dictionary where the key is the column name and the value is the desired datatype. | `None` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If no columns are provided. |
| `ValueError` | If the data from context is None. |

Returns:

| Type | Description |
| --- | --- |
| `PipelineContext` | The context after the execution of this Action, containing the DataFrame with updated column datatypes. |

Source code in src/cloe_nessy/pipeline/actions/transform_change_datatype.py
def run(
    self,
    context: PipelineContext,
    *,
    columns: dict[str, str] | None = None,
    **_: Any,  # define kwargs to match the base class signature
) -> PipelineContext:
    """Changes the datatypes of specified columns in the given DataFrame.

    Args:
        context: The context in which this Action is executed.
        columns: A dictionary where the key is the column
            name and the value is the desired datatype.

    Raises:
        ValueError: If no columns are provided.
        ValueError: If the data from context is None.

    Returns:
        The context after the execution of this Action, containing the DataFrame with updated column datatypes.
    """
    if not columns:
        raise ValueError("No columns provided.")

    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    df = context.data
    change_columns = {col: F.col(col).cast(dtype) for col, dtype in columns.items()}
    df = df.withColumns(change_columns)  # type: ignore

    return context.from_existing(data=df)  # type: ignore

TransformCleanColumnNamesAction

Bases: PipelineAction

Fixes column names in the DataFrame to be valid.

Removes invalid characters from the column names, including the fields of a struct, and replaces a single leading underscore with a double underscore.

Invalid characters include:
  • Any non-word character (anything other than letters, digits, and underscores).
  • A single leading underscore.
Example
Clean Column Names:
    action: TRANSFORM_CLEAN_COLUMN_NAMES
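
A standalone sketch of the renaming rule, using the same regular expressions as the source below (the sample column names are made up for illustration):

```python
import re

# Non-word characters become underscores; a single leading underscore is doubled.
single_underscore_at_beginning = r"^_(?=[^_])"

for old_name in ["order id", "net-amount(%)", "_ingested_at"]:
    new_name = re.sub(single_underscore_at_beginning, "__", re.sub(r"\W", "_", old_name))
    print(f"{old_name!r} -> {new_name!r}")
# 'order id'      -> 'order_id'
# 'net-amount(%)' -> 'net_amount___'
# '_ingested_at'  -> '__ingested_at'
```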
Source code in src/cloe_nessy/pipeline/actions/transform_clean_column_names.py
class TransformCleanColumnNamesAction(PipelineAction):
    """Fixes column names in the DataFrame to be valid.

    Removes invalid characters from the column names, including the fields of a struct, and
    replaces a single leading underscore with a double underscore.

    Invalid characters include:
        - Any non-word character (anything other than letters, digits, and underscores).
        - A single leading underscore.

    Example:
        ```yaml
        Clean Column Names:
            action: TRANSFORM_CLEAN_COLUMN_NAMES
        ```
    """

    name: str = "TRANSFORM_CLEAN_COLUMN_NAMES"

    def run(
        self,
        context: PipelineContext,
        **_: Any,
    ) -> PipelineContext:
        """Fixes column names in the DataFrame to be valid.

        Removes invalid characters from the column names, including the fields of a struct and
        replaces a single leading underscore by a double underscore.

        Args:
            context: The context in which this Action is executed.

        Raises:
            ValueError: If the data from the context is None.

        Returns:
            The context after the execution of this Action, containing the DataFrame with cleaned column names.
        """
        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        with_columns_renamed = {}
        with_columns_casted: dict[str, T.StructType | T.ArrayType | T.MapType] = {}

        single_underscore_at_beginning = r"^_(?=[^_])"

        for c in context.data.schema:
            old_name = c.name
            new_name = re.sub(single_underscore_at_beginning, "__", re.sub(r"\W", "_", old_name))
            with_columns_renamed[old_name] = new_name

            if isinstance(c.dataType, (T.StructType | T.ArrayType | T.MapType)):
                old_column_schema = c.dataType.json()
                new_column_schema = re.sub(
                    r'(?<="name":")[^"]+',
                    lambda m: re.sub(r"\W", "_", str(m.group())),
                    old_column_schema,
                )
                if isinstance(c.dataType, T.StructType):
                    with_columns_casted[new_name] = T.StructType.fromJson(json.loads(new_column_schema))
                elif isinstance(c.dataType, T.ArrayType):
                    with_columns_casted[new_name] = T.ArrayType.fromJson(json.loads(new_column_schema))
                elif isinstance(c.dataType, T.MapType):
                    with_columns_casted[new_name] = T.MapType.fromJson(json.loads(new_column_schema))

        df = context.data.withColumnsRenamed(with_columns_renamed)
        for c_name, c_type in with_columns_casted.items():
            df = df.withColumn(c_name, F.col(c_name).cast(c_type))

        return context.from_existing(data=df)  # type: ignore

run(context, **_)

Fixes column names in the DataFrame to be valid.

Removes invalid characters from the column names, including the fields of a struct, and replaces a single leading underscore with a double underscore.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `PipelineContext` | The context in which this Action is executed. | *required* |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the data from the context is None. |

Returns:

| Type | Description |
| --- | --- |
| `PipelineContext` | The context after the execution of this Action, containing the DataFrame with cleaned column names. |

Source code in src/cloe_nessy/pipeline/actions/transform_clean_column_names.py
def run(
    self,
    context: PipelineContext,
    **_: Any,
) -> PipelineContext:
    """Fixes column names in the DataFrame to be valid.

    Removes invalid characters from the column names, including the fields of a struct, and
    replaces a single leading underscore with a double underscore.

    Args:
        context: The context in which this Action is executed.

    Raises:
        ValueError: If the data from the context is None.

    Returns:
        The context after the execution of this Action, containing the DataFrame with cleaned column names.
    """
    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    with_columns_renamed = {}
    with_columns_casted: dict[str, T.StructType | T.ArrayType | T.MapType] = {}

    single_underscore_at_beginning = r"^_(?=[^_])"

    for c in context.data.schema:
        old_name = c.name
        new_name = re.sub(single_underscore_at_beginning, "__", re.sub(r"\W", "_", old_name))
        with_columns_renamed[old_name] = new_name

        if isinstance(c.dataType, (T.StructType | T.ArrayType | T.MapType)):
            old_column_schema = c.dataType.json()
            new_column_schema = re.sub(
                r'(?<="name":")[^"]+',
                lambda m: re.sub(r"\W", "_", str(m.group())),
                old_column_schema,
            )
            if isinstance(c.dataType, T.StructType):
                with_columns_casted[new_name] = T.StructType.fromJson(json.loads(new_column_schema))
            elif isinstance(c.dataType, T.ArrayType):
                with_columns_casted[new_name] = T.ArrayType.fromJson(json.loads(new_column_schema))
            elif isinstance(c.dataType, T.MapType):
                with_columns_casted[new_name] = T.MapType.fromJson(json.loads(new_column_schema))

    df = context.data.withColumnsRenamed(with_columns_renamed)
    for c_name, c_type in with_columns_casted.items():
        df = df.withColumn(c_name, F.col(c_name).cast(c_type))

    return context.from_existing(data=df)  # type: ignore

TransformConcatColumnsAction

Bases: PipelineAction

Concatenates the specified columns in the given DataFrame.

Example
Concat Columns:
    action: TRANSFORM_CONCAT_COLUMNS
    options:
        name: address
        columns:
            - street
            - postcode
            - country
        separator: ', '
Concat Column:
    action: TRANSFORM_CONCAT_COLUMNS
    options:
        name: address
        columns:
            - street
            - postcode
            - country

beware of null handling

The separator option is not provided, so the default behavior is to use concat which returns NULL if any of the concatenated values is NULL.
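
The difference is easy to reproduce in plain PySpark (the sample data is illustrative):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("Main St", None, "DE")],
    "street STRING, postcode STRING, country STRING",
)

# concat propagates NULL: any NULL input makes the whole result NULL.
df.select(F.concat("street", "postcode", "country").alias("addr")).show()
# addr -> NULL

# concat_ws skips NULL inputs and joins only the non-NULL values.
df.select(F.concat_ws(", ", "street", "postcode", "country").alias("addr")).show()
# addr -> "Main St, DE"
```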

Source code in src/cloe_nessy/pipeline/actions/transform_concat_columns.py
class TransformConcatColumnsAction(PipelineAction):
    """Concatenates the specified columns in the given DataFrame.

    Example:
        === "concat with separator"
            ```yaml
            Concat Columns:
                action: TRANSFORM_CONCAT_COLUMNS
                options:
                    name: address
                    columns:
                        - street
                        - postcode
                        - country
                    separator: ', '
            ```
        === "concat without separator"
            ```yaml
            Concat Column:
                action: TRANSFORM_CONCAT_COLUMNS
                options:
                    name: address
                    columns:
                        - street
                        - postcode
                        - country
            ```
            !!! warning "beware of null handling"
                The `separator` option is not provided, so the default behavior is to use `concat` which returns `NULL` if any of the concatenated values is `NULL`.
    """

    name: str = "TRANSFORM_CONCAT_COLUMNS"

    def run(
        self,
        context: PipelineContext,
        *,
        name: str = "",
        columns: list[str] | None = None,
        separator: str | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Concatenates the specified columns in the given DataFrame.

        !!! warning "Null Handling Behavior"

            The behavior of null handling differs based on whether a `separator` is provided:

            - **When `separator` is specified**: The function uses Spark's
                `concat_ws`, which **ignores `NULL` values**. In this case, `NULL`
                values are treated as empty strings (`""`) and are excluded from the
                final concatenated result.
            - **When `separator` is not specified**: The function defaults to
                using Spark's `concat`, which **returns `NULL` if any of the
                concatenated values is `NULL`**. This means the presence of a `NULL`
                in any input will make the entire output `NULL`.

        Args:
            context: The context in which this Action is executed.
            name: The name of the new concatenated column.
            columns: A list of columns to be concatenated.
            separator: The separator used between concatenated column values.

        Raises:
            ValueError: If no name is provided.
            ValueError: If no columns are provided.
            ValueError: If the data from context is None.
            ValueError: If 'columns' is not a list.

        Returns:
            The context after the execution of this Action, containing the
                DataFrame with the concatenated column.
        """
        if not name:
            raise ValueError("No name provided.")
        if not columns:
            raise ValueError("No columns provided.")

        if context.data is None:
            raise ValueError("The data from context is required for the operation.")

        df = context.data

        if isinstance(columns, list):
            if separator:
                df = df.withColumn(name, F.concat_ws(separator, *columns))  # type: ignore
            else:
                df = df.withColumn(name, F.concat(*columns))  # type: ignore
        else:
            raise ValueError("'columns' should be a list, like ['col1', 'col2',]")

        return context.from_existing(data=df)  # type: ignore

run(context, *, name='', columns=None, separator=None, **_)

Concatenates the specified columns in the given DataFrame.

Warning

Null Handling Behavior

The behavior of null handling differs based on whether a separator is provided:

  • When separator is specified: The function uses Spark's concat_ws, which ignores NULL values. In this case, NULL values are treated as empty strings ("") and are excluded from the final concatenated result.
  • When separator is not specified: The function defaults to using Spark's concat, which returns NULL if any of the concatenated values is NULL. This means the presence of a NULL in any input will make the entire output NULL.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `PipelineContext` | The context in which this Action is executed. | *required* |
| `name` | `str` | The name of the new concatenated column. | `''` |
| `columns` | `list[str] \| None` | A list of columns to be concatenated. | `None` |
| `separator` | `str \| None` | The separator used between concatenated column values. | `None` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If no name is provided. |
| `ValueError` | If no columns are provided. |
| `ValueError` | If the data from context is None. |
| `ValueError` | If 'columns' is not a list. |

Returns:

| Type | Description |
| --- | --- |
| `PipelineContext` | The context after the execution of this Action, containing the DataFrame with the concatenated column. |

Source code in src/cloe_nessy/pipeline/actions/transform_concat_columns.py
def run(
    self,
    context: PipelineContext,
    *,
    name: str = "",
    columns: list[str] | None = None,
    separator: str | None = None,
    **_: Any,
) -> PipelineContext:
    """Concatenates the specified columns in the given DataFrame.

    !!! warning "Null Handling Behavior"

        The behavior of null handling differs based on whether a `separator` is provided:

        - **When `separator` is specified**: The function uses Spark's
            `concat_ws`, which **ignores `NULL` values**. In this case, `NULL`
            values are treated as empty strings (`""`) and are excluded from the
            final concatenated result.
        - **When `separator` is not specified**: The function defaults to
            using Spark's `concat`, which **returns `NULL` if any of the
            concatenated values is `NULL`**. This means the presence of a `NULL`
            in any input will make the entire output `NULL`.

    Args:
        context: The context in which this Action is executed.
        name: The name of the new concatenated column.
        columns: A list of columns to be concatenated.
        separator: The separator used between concatenated column values.

    Raises:
        ValueError: If no name is provided.
        ValueError: If no columns are provided.
        ValueError: If the data from context is None.
        ValueError: If 'columns' is not a list.

    Returns:
        The context after the execution of this Action, containing the
            DataFrame with the concatenated column.
    """
    if not name:
        raise ValueError("No name provided.")
    if not columns:
        raise ValueError("No columns provided.")

    if context.data is None:
        raise ValueError("The data from context is required for the operation.")

    df = context.data

    if isinstance(columns, list):
        if separator:
            df = df.withColumn(name, F.concat_ws(separator, *columns))  # type: ignore
        else:
            df = df.withColumn(name, F.concat(*columns))  # type: ignore
    else:
        raise ValueError("'columns' should be a list, like ['col1', 'col2',]")

    return context.from_existing(data=df)  # type: ignore

TransformConvertTimestampAction

Bases: PipelineAction

This action performs timestamp-based conversions.

Example
Convert Timestamp:
    action: TRANSFORM_CONVERT_TIMESTAMP
    options:
        columns:
            - date
            - creation_timestamp
            - current_ts
        source_format: unixtime_ms
        target_format: timestamp
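
As a point of reference, a minimal PySpark sketch of the `unixtime_ms` → `timestamp` conversion configured above (the sample value is illustrative):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1700000000000,)], ["current_ts"])

# unixtime_ms is normalised to a timestamp by dividing by 1000 first,
# matching the source_format handling in the source below.
df = df.withColumn("current_ts", F.to_timestamp(F.col("current_ts") / 1000))
df.show(truncate=False)  # 2023-11-14 22:13:20 (in UTC)
```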
Source code in src/cloe_nessy/pipeline/actions/transform_convert_timestamp.py
class TransformConvertTimestampAction(PipelineAction):
    """This action performs timestamp based conversions.

    Example:
        ```yaml
        Convert Timestamp:
            action: TRANSFORM_CONVERT_TIMESTAMP
            options:
                columns:
                    - date
                    - creation_timestamp
                    - current_ts
                source_format: unixtime_ms
                target_format: timestamp
        ```
    """

    name: str = "TRANSFORM_CONVERT_TIMESTAMP"

    def run(
        self,
        context: PipelineContext,
        *,
        columns: list[str] | str | None = None,
        source_format: str = "",
        target_format: str = "",
        **_: Any,
    ) -> PipelineContext:
        """Converts column(s) from a given source format to a new format.

        Args:
            context: Context in which this Action is executed.
            columns: A column name or a list of column names that should be converted.
            source_format: Initial format type of the column.
            target_format: Desired format type of the column.
                This also supports passing a format string like `yyyy-MM-dd HH:mm:ss`.

        Raises:
            ValueError: If no column, source_format or target_format are provided.
            ValueError: If source_format or target_format are not supported.

        Returns:
            PipelineContext: Context after the execution of this Action.
        """
        if not columns:
            raise ValueError("No column names provided.")
        if not source_format:
            raise ValueError("No source_format provided.")
        if not target_format:
            raise ValueError("No target_format provided.")
        if context.data is None:
            raise ValueError("Context DataFrame is required.")
        df = context.data

        columns = [columns] if isinstance(columns, str) else columns

        match source_format:
            # convert always to timestamp first
            case "string" | "date" | "unixtime":
                for column in columns:
                    df = df.withColumn(column, F.to_timestamp(F.col(column)))
            case "unixtime_ms":
                for column in columns:
                    df = df.withColumn(column, F.to_timestamp(F.col(column) / 1000))
            case "timestamp":
                pass
            case _:
                raise ValueError(f"Unknown source_format {source_format}")

        match target_format:
            # convert from timestamp to desired output type and format
            case "timestamp":
                pass
            case "unixtime":
                for column in columns:
                    df = df.withColumn(column, F.to_unix_timestamp(F.col(column)))
            case "date":
                for column in columns:
                    df = df.withColumn(column, F.to_date(F.col(column)))
            case _:
                try:
                    for column in columns:
                        df = df.withColumn(column, F.date_format(F.col(column), target_format))
                except (IllegalArgumentException, AnalysisException) as e:
                    raise ValueError(f"Invalid target_format {target_format}") from e

        return context.from_existing(data=df)

run(context, *, columns=None, source_format='', target_format='', **_)

Converts column(s) from a given source format to a new format.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `PipelineContext` | Context in which this Action is executed. | *required* |
| `columns` | `list[str] \| str \| None` | A column name or a list of column names that should be converted. | `None` |
| `source_format` | `str` | Initial format type of the column. | `''` |
| `target_format` | `str` | Desired format type of the column. This also supports passing a format string like `yyyy-MM-dd HH:mm:ss`. | `''` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If no column, source_format or target_format are provided. |
| `ValueError` | If source_format or target_format are not supported. |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `PipelineContext` | `PipelineContext` | Context after the execution of this Action. |

Source code in src/cloe_nessy/pipeline/actions/transform_convert_timestamp.py
def run(
    self,
    context: PipelineContext,
    *,
    columns: list[str] | str | None = None,
    source_format: str = "",
    target_format: str = "",
    **_: Any,
) -> PipelineContext:
    """Converts column(s) from a given source format to a new format.

    Args:
        context: Context in which this Action is executed.
        columns: A column name or a list of column names that should be converted.
        source_format: Initial format type of the column.
        target_format: Desired format type of the column.
            This also supports passing a format string like `yyyy-MM-dd HH:mm:ss`.

    Raises:
        ValueError: If no column, source_format or target_format are provided.
        ValueError: If source_format or target_format are not supported.

    Returns:
        PipelineContext: Context after the execution of this Action.
    """
    if not columns:
        raise ValueError("No column names provided.")
    if not source_format:
        raise ValueError("No source_format provided.")
    if not target_format:
        raise ValueError("No target_format provided.")
    if context.data is None:
        raise ValueError("Context DataFrame is required.")
    df = context.data

    columns = [columns] if isinstance(columns, str) else columns

    match source_format:
        # convert always to timestamp first
        case "string" | "date" | "unixtime":
            for column in columns:
                df = df.withColumn(column, F.to_timestamp(F.col(column)))
        case "unixtime_ms":
            for column in columns:
                df = df.withColumn(column, F.to_timestamp(F.col(column) / 1000))
        case "timestamp":
            pass
        case _:
            raise ValueError(f"Unknown source_format {source_format}")

    match target_format:
        # convert from timestamp to desired output type and format
        case "timestamp":
            pass
        case "unixtime":
            for column in columns:
                df = df.withColumn(column, F.to_unix_timestamp(F.col(column)))
        case "date":
            for column in columns:
                df = df.withColumn(column, F.to_date(F.col(column)))
        case _:
            try:
                for column in columns:
                    df = df.withColumn(column, F.date_format(F.col(column), target_format))
            except (IllegalArgumentException, AnalysisException) as e:
                raise ValueError(f"Invalid target_format {target_format}") from e

    return context.from_existing(data=df)

TransformDecodeAction

Bases: PipelineAction

Decodes values of a specified column in the DataFrame based on the given format.

Example
Expand JSON:
    action: "TRANSFORM_DECODE"
    options:
        column: "data"
        input_format: "json"
        schema: "quality INT, timestamp TIMESTAMP, value DOUBLE"
Decode base64:
    action: TRANSFORM_DECODE
    options:
        column: encoded_data
        input_format: base64
        schema: string
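
For the JSON case, a minimal sketch of the decode-and-flatten behavior (mirroring the source below; the sample payload is illustrative):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('{"quality": 1, "value": 2.5}',)], ["data"])

# The string column is replaced by a struct, which is then flattened so the
# struct fields become top-level columns alongside the original ones.
decoded = df.withColumn("data", from_json(col("data"), "quality INT, value DOUBLE"))
decoded.select(*df.columns, "data.*").show()
# data (struct), quality, value
```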
Source code in src/cloe_nessy/pipeline/actions/transform_decode.py
class TransformDecodeAction(PipelineAction):
    """Decodes values of a specified column in the DataFrame based on the given format.

    Example:
        === "Decode JSON column"
            ```yaml
            Expand JSON:
                action: "TRANSFORM_DECODE"
                options:
                    column: "data"
                    input_format: "json"
                    schema: "quality INT, timestamp TIMESTAMP, value DOUBLE"
            ```
        === "Decode base64 column"
            ```yaml
            Decode base64:
                action: TRANSFORM_DECODE
                options:
                    column: encoded_data
                    input_format: base64
                    schema: string
            ```
    """

    name: str = "TRANSFORM_DECODE"

    def run(
        self,
        context: PipelineContext,
        *,
        column: str | None = None,
        input_format: str | None = None,
        schema: str | None = None,
        **_: Any,  # define kwargs to match the base class signature
    ) -> PipelineContext:
        """Decodes values of a specified column in the DataFrame based on the given format.

        Args:
            context: The context in which this Action is executed.
            column: The name of the column that should be decoded.
            input_format: The format from which the column should be decoded.
                Currently supported formats are 'base64' and 'json'.
            schema: For JSON input, the schema of the JSON object. If empty,
                the schema is inferred from the first row of the DataFrame. For base64 input,
                the data type to which the column is cast.

        Raises:
            ValueError: If no column is specified.
            ValueError: If no input_format is specified.
            ValueError: If the data from context is None.
            ValueError: If an invalid input_format is provided.

        Returns:
            The context after the execution of this Action, containing the DataFrame with the decoded column(s).
        """
        if not column:
            raise ValueError("No column specified.")
        if not input_format:
            raise ValueError("No input_format specified")
        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        df = context.data
        match input_format.lower():
            case "base64":
                df = self._decode_base64(df, column, schema)  # type: ignore
            case "json":
                df = self._decode_json(df, column, schema)  # type: ignore
            case _:
                raise ValueError(
                    f"Invalid input_format: [ '{input_format}' ]. Please specify a valid format to decode.",
                )

        return context.from_existing(data=df)  # type: ignore

    def _decode_base64(self, df: DataFrame, column: str, base64_schema: str | None):
        """Decode base64 column."""
        df_decoded = df.withColumn(column, unbase64(col(column)))
        if base64_schema:
            df_decoded = df_decoded.withColumn(column, col(column).cast(base64_schema))
        return df_decoded

    def _decode_json(self, df: DataFrame, column: str, json_schema: str | None):
        """Decode json column."""
        distinct_schemas = (
            df.select(column)
            .withColumn("json_schema", schema_of_json(col(column)))
            .select("json_schema")
            .dropDuplicates()
        )
        if not (json_schema or distinct_schemas.count() > 0):
            raise RuntimeError("Cannot infer schema from empty DataFrame.")

        elif distinct_schemas.count() > 1:
            raise RuntimeError(f"There is more than one JSON schema in column {column}.")

        if json_schema is None:
            final_json_schema = distinct_schemas.collect()[0].json_schema
        else:
            final_json_schema = json_schema  # type: ignore

        df_decoded = df.withColumn(column, from_json(col(column), final_json_schema)).select(*df.columns, f"{column}.*")

        return df_decoded

run(context, *, column=None, input_format=None, schema=None, **_)

Decodes values of a specified column in the DataFrame based on the given format.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `PipelineContext` | The context in which this Action is executed. | *required* |
| `column` | `str \| None` | The name of the column that should be decoded. | `None` |
| `input_format` | `str \| None` | The format from which the column should be decoded. Currently supported formats are 'base64' and 'json'. | `None` |
| `schema` | `str \| None` | For JSON input, the schema of the JSON object. If empty, the schema is inferred from the first row of the DataFrame. For base64 input, the data type to which the column is cast. | `None` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If no column is specified. |
| `ValueError` | If no input_format is specified. |
| `ValueError` | If the data from context is None. |
| `ValueError` | If an invalid input_format is provided. |

Returns:

| Type | Description |
| --- | --- |
| `PipelineContext` | The context after the execution of this Action, containing the DataFrame with the decoded column(s). |

Source code in src/cloe_nessy/pipeline/actions/transform_decode.py
def run(
    self,
    context: PipelineContext,
    *,
    column: str | None = None,
    input_format: str | None = None,
    schema: str | None = None,
    **_: Any,  # define kwargs to match the base class signature
) -> PipelineContext:
    """Decodes values of a specified column in the DataFrame based on the given format.

    Args:
        context: The context in which this Action is executed.
        column: The name of the column that should be decoded.
        input_format: The format from which the column should be decoded.
            Currently supported formats are 'base64' and 'json'.
        schema: For JSON input, the schema of the JSON object. If empty,
            the schema is inferred from the first row of the DataFrame. For base64 input,
            the data type to which the column is cast.

    Raises:
        ValueError: If no column is specified.
        ValueError: If no input_format is specified.
        ValueError: If the data from context is None.
        ValueError: If an invalid input_format is provided.

    Returns:
        The context after the execution of this Action, containing the DataFrame with the decoded column(s).
    """
    if not column:
        raise ValueError("No column specified.")
    if not input_format:
        raise ValueError("No input_format specified")
    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    df = context.data
    match input_format.lower():
        case "base64":
            df = self._decode_base64(df, column, schema)  # type: ignore
        case "json":
            df = self._decode_json(df, column, schema)  # type: ignore
        case _:
            raise ValueError(
                f"Invalid input_format: [ '{input_format}' ]. Please specify a valid format to decode.",
            )

    return context.from_existing(data=df)  # type: ignore

TransformDeduplication

Bases: PipelineAction

Deduplicates the data from the given DataFrame.

For rows that share the same key columns, this action keeps only the entry with the highest values in the order_by_columns (set the parameter descending to false to keep the lowest instead).

Example
Deduplicate Columns:
    action: TRANSFORM_DEDUPLICATION
    options:
        key_columns:
            - id
        order_by_columns:
            - source_file_modification_time
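
The underlying window pattern, sketched in plain PySpark (the sample data is illustrative; note the action itself recommends timestamp or numeric order-by columns, while ISO-8601 strings happen to sort correctly here):

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, "2024-01-01"), (1, "2024-02-01"), (2, "2024-01-15")],
    ["id", "source_file_modification_time"],
)

# Rank rows within each key, newest first, and keep only the top-ranked row.
w = Window.partitionBy("id").orderBy(F.col("source_file_modification_time").desc())
df.withColumn("rn", F.row_number().over(w)).filter(F.col("rn") == 1).drop("rn").show()
# id=1 keeps the 2024-02-01 row; id=2 keeps its only row
```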
Source code in src/cloe_nessy/pipeline/actions/transform_deduplication.py
class TransformDeduplication(PipelineAction):
    """Deduplicates the data from the given DataFrame.

    For rows that share the same key columns, this action keeps only the entry
    with the highest values in the order_by_columns (set the parameter
    descending to false to keep the lowest instead).

    Example:
        ```yaml
        Deduplicate Columns:
            action: TRANSFORM_DEDUPLICATION
            options:
                key_columns:
                    - id
                order_by_columns:
                    - source_file_modification_time
        ```
    """

    name: str = "TRANSFORM_DEDUPLICATION"

    def run(
        self,
        context: PipelineContext,
        *,
        key_columns: list[str] | None = None,
        order_by_columns: list[str] | None = None,
        descending: bool = True,
        **_: Any,
    ) -> PipelineContext:
        """Deduplicates the data based on key columns and order by columns.

        Args:
            context: The context in which this Action is executed.
            key_columns: A list of the key column names. The returned data only keeps one
                line of data with the same key columns.
            order_by_columns: A list of order by column names. The returned data keeps the
                first line of data with the same key columns ordered by these columns.
            descending: Whether to sort descending or ascending.

        Raises:
            ValueError: If no key_columns are specified.
            ValueError: If no order_by_columns are specified.
            ValueError: If the data from context is None.
            ValueError: If key_columns and order_by_columns overlap.
            ValueError: If key_columns or order_by_columns contain Nulls.

        Returns:
            The context after the execution of this Action, containing the DataFrame with the deduplicated data.
        """
        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")
        if key_columns is None:
            raise ValueError("Please provide at least one key column.")
        if order_by_columns is None:
            raise ValueError("Please provide at least one order by column.")

        # check if the key_columns and order_by_columns are the same
        if len(set(key_columns) & set(order_by_columns)) != 0:
            raise ValueError("The key_columns and order_by_columns cannot contain the same column")

        # check if the key_columns and order_by_columns are not null
        df_nulls = context.data.filter(F.greatest(*[F.col(c).isNull() for c in key_columns + order_by_columns]) == 1)  # type: ignore[misc]
        if df_nulls.head(1):  # if the filtered DataFrame is not empty
            raise ValueError(
                "The key_columns and order_by_columns cannot be null. Please check the quality of the provided columns (null handling)"
            )

        # check if the order_by columns have the preferred data types
        recommended_order_by_data_types = [
            T.TimestampType(),
            T.TimestampNTZType(),
            T.DateType(),
            T.IntegerType(),
            T.LongType(),
            T.DoubleType(),
            T.FloatType(),
            T.DecimalType(),
        ]

        for c in context.data.schema:
            if c.name in order_by_columns and c.dataType not in recommended_order_by_data_types:
                log_message = (
                    f"action_name : {self.name} | message : order_by_column `{c.name}` is of type {c.dataType}; "
                    "recommended data types are {recommended_order_by_data_types}"
                )
                self._console_logger.warning(log_message)
                self._tabular_logger.warning(log_message)

        # sort the order_by columns in the preferred order
        if descending:
            order_by_list = [F.col(col_name).desc() for col_name in order_by_columns]  # type: ignore[misc]
        else:
            order_by_list = [F.col(col_name).asc() for col_name in order_by_columns]  # type: ignore[misc]

        window_specification = (
            Window.partitionBy(key_columns)
            .orderBy(order_by_list)
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        )

        row_number_col_name = generate_unique_column_name(existing_columns=set(context.data.columns), prefix="row_num")

        df = (
            context.data.withColumn(row_number_col_name, F.row_number().over(window_specification))
            .filter(F.col(row_number_col_name) == 1)
            .drop(row_number_col_name)
        )
        return context.from_existing(data=df)

run(context, *, key_columns=None, order_by_columns=None, descending=True, **_)

Deduplicates the data based on key columns and order by columns.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | The context in which this Action is executed. | required |
| key_columns | list[str] \| None | A list of the key column names. The returned data only keeps one line of data with the same key columns. | None |
| order_by_columns | list[str] \| None | A list of order by column names. The returned data keeps the first line of data with the same key columns ordered by these columns. | None |
| descending | bool | Whether to sort descending or ascending. | True |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If no key_columns are specified. |
| ValueError | If no order_by_columns are specified. |
| ValueError | If the data from context is None. |
| ValueError | If key_columns and order_by_columns overlap. |
| ValueError | If key_columns or order_by_columns contain Nulls. |

Returns:

| Type | Description |
|------|-------------|
| PipelineContext | The context after the execution of this Action, containing the DataFrame with the deduplicated data. |

Source code in src/cloe_nessy/pipeline/actions/transform_deduplication.py
def run(
    self,
    context: PipelineContext,
    *,
    key_columns: list[str] | None = None,
    order_by_columns: list[str] | None = None,
    descending: bool = True,
    **_: Any,
) -> PipelineContext:
    """Deduplicates the data based on key columns and order by columns.

    Args:
        context: The context in which this Action is executed.
        key_columns: A list of the key column names. The returned data only keeps one
            line of data with the same key columns.
        order_by_columns: A list of order by column names. The returned data keeps the
            first line of data with the same key columns ordered by these columns.
        descending: Whether to sort descending or ascending.

    Raises:
        ValueError: If no key_columns are specified.
        ValueError: If no order_by_columns are specified.
        ValueError: If the data from context is None.
        ValueError: If key_columns and order_by_columns overlap.
        ValueError: If key_columns or order_by_columns contain Nulls.

    Returns:
        The context after the execution of this Action, containing the DataFrame with the deduplicated data.
    """
    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")
    if key_columns is None:
        raise ValueError("Please provide at least one key column.")
    if order_by_columns is None:
        raise ValueError("Please provide at least one order by column.")

    # check if the key_columns and order_by_columns are the same
    if len(set(key_columns) & set(order_by_columns)) != 0:
        raise ValueError("The key_columns and order_by_columns cannot contain the same column")

    # check if the key_columns and order_by_columns are not null
    df_nulls = context.data.filter(F.greatest(*[F.col(c).isNull() for c in key_columns + order_by_columns]) == 1)  # type: ignore[misc]
    if df_nulls.head(1):  # if the filtered DataFrame is not empty
        raise ValueError(
            "The key_columns and order_by_columns cannot be null. Please check the quality of the provided columns (null handling)"
        )

    # check if the order_by columns have the preferred data types
    recommended_order_by_data_types = [
        T.TimestampType(),
        T.TimestampNTZType(),
        T.DateType(),
        T.IntegerType(),
        T.LongType(),
        T.DoubleType(),
        T.FloatType(),
        T.DecimalType(),
    ]

    for c in context.data.schema:
        if c.name in order_by_columns and c.dataType not in recommended_order_by_data_types:
            log_message = (
                f"action_name : {self.name} | message : order_by_column `{c.name}` is of type {c.dataType}; "
                "recommended data types are {recommended_order_by_data_types}"
            )
            self._console_logger.warning(log_message)
            self._tabular_logger.warning(log_message)

    # sort the order_by columns in the preferred order
    if descending:
        order_by_list = [F.col(col_name).desc() for col_name in order_by_columns]  # type: ignore[misc]
    else:
        order_by_list = [F.col(col_name).asc() for col_name in order_by_columns]  # type: ignore[misc]

    window_specification = (
        Window.partitionBy(key_columns)
        .orderBy(order_by_list)
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )

    row_number_col_name = generate_unique_column_name(existing_columns=set(context.data.columns), prefix="row_num")

    df = (
        context.data.withColumn(row_number_col_name, F.row_number().over(window_specification))
        .filter(F.col(row_number_col_name) == 1)
        .drop(row_number_col_name)
    )
    return context.from_existing(data=df)

TransformDistinctAction

Bases: PipelineAction

Selects distinct rows from the DataFrame in the given context.

If a subset is given, these columns are used for duplicate comparison; if no subset is given, all columns are used.

Example
Distinct Columns:
    action: TRANSFORM_DISTINCT
    options:
        subset:
            - first_name
            - last_name
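This maps directly onto DataFrame.dropDuplicates. A minimal sketch of both variants, assuming a local SparkSession and illustrative sample rows:

```python
# Minimal sketch of TRANSFORM_DISTINCT semantics, using the example
# columns `first_name` and `last_name` from the YAML above.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("Ada", "Lovelace", 1), ("Ada", "Lovelace", 2), ("Grace", "Hopper", 3)],
    ["first_name", "last_name", "id"],
)

# With a subset, only those columns decide what counts as a duplicate;
# which of the duplicate rows survives is not deterministic.
df.dropDuplicates(subset=["first_name", "last_name"]).show()  # 2 rows
df.dropDuplicates().show()  # all 3 rows differ in `id`, so all are kept
```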
Source code in src/cloe_nessy/pipeline/actions/transform_distinct.py
class TransformDistinctAction(PipelineAction):
    """Selects distinct rows from the DataFrame in the given context.

    If a subset is given, these columns are used for duplicate comparison; if no subset is given, all columns are used.

    Example:
        ```yaml
        Distinct Columns:
            action: TRANSFORM_DISTINCT
            options:
                subset:
                    - first_name
                    - last_name
        ```
    """

    name: str = "TRANSFORM_DISTINCT"

    def run(
        self,
        context: PipelineContext,
        *,
        subset: list[str] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Selects distinct rows from the DataFrame in the given context.

        Args:
            context: The context in which this Action is executed.
            subset: List of column names to use for duplicate comparison (default All columns).

        Raises:
            ValueError: If the data from the context is None.

        Returns:
            The context after the execution of this Action, containing the DataFrame with distinct rows.
        """
        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        # check if all columns that are part of the subset are actually part of the dataframe.
        if subset is not None:
            subset_columns_not_in_dataframe = set(subset) - set(context.data.columns)
            if len(subset_columns_not_in_dataframe) != 0:
                raise ValueError(
                    f"The following subset columns are not part of the dataframe: {subset_columns_not_in_dataframe}"
                )

        df = context.data.dropDuplicates(subset=subset)

        return context.from_existing(data=df)  # type: ignore

run(context, *, subset=None, **_)

Selects distinct rows from the DataFrame in the given context.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | The context in which this Action is executed. | required |
| subset | list[str] \| None | List of column names to use for duplicate comparison (default All columns). | None |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If the data from the context is None. |

Returns:

| Type | Description |
|------|-------------|
| PipelineContext | The context after the execution of this Action, containing the DataFrame with distinct rows. |

Source code in src/cloe_nessy/pipeline/actions/transform_distinct.py
def run(
    self,
    context: PipelineContext,
    *,
    subset: list[str] | None = None,
    **_: Any,
) -> PipelineContext:
    """Selects distinct rows from the DataFrame in the given context.

    Args:
        context: The context in which this Action is executed.
        subset: List of column names to use for duplicate comparison (default All columns).

    Raises:
        ValueError: If the data from the context is None.

    Returns:
        The context after the execution of this Action, containing the DataFrame with distinct rows.
    """
    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    # check if all columns that are part of the subset are actually part of the dataframe.
    if subset is not None:
        subset_columns_not_in_dataframe = set(subset) - set(context.data.columns)
        if len(subset_columns_not_in_dataframe) != 0:
            raise ValueError(
                f"The following subset columns are not part of the dataframe: {subset_columns_not_in_dataframe}"
            )

    df = context.data.dropDuplicates(subset=subset)

    return context.from_existing(data=df)  # type: ignore

TransformFilterAction

Bases: PipelineAction

Filters the DataFrame in the given context based on a specified condition.

Example
Filter Columns:
    action: TRANSFORM_FILTER
    options:
        condition: city="Hamburg"
Source code in src/cloe_nessy/pipeline/actions/transform_filter.py
class TransformFilterAction(PipelineAction):
    """Filters the DataFrame in the given context based on a specified condition.

    Example:
        ```yaml
        Filter Columns:
            action: TRANSFORM_FILTER
            options:
                condition: city="Hamburg"
        ```
    """

    name: str = "TRANSFORM_FILTER"

    def run(
        self,
        context: PipelineContext,
        *,
        condition: str = "",
        **_: Any,
    ) -> PipelineContext:
        """Filters the DataFrame in the given context based on a specified condition.

        Args:
            context: Context in which this Action is executed.
            condition: A SQL-like expression used to filter the DataFrame.

        Raises:
            ValueError: If no condition is provided.
            ValueError: If the data from the context is None.

        Returns:
            Context after the execution of this Action, containing the filtered DataFrame.
        """
        if not condition:
            raise ValueError("No condition provided.")

        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        df = context.data

        df_filtered = df.filter(condition=condition)

        return context.from_existing(data=df_filtered)  # type: ignore

run(context, *, condition='', **_)

Filters the DataFrame in the given context based on a specified condition.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | Context in which this Action is executed. | required |
| condition | str | A SQL-like expression used to filter the DataFrame. | '' |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If no condition is provided. |
| ValueError | If the data from the context is None. |

Returns:

| Type | Description |
|------|-------------|
| PipelineContext | Context after the execution of this Action, containing the filtered DataFrame. |

Source code in src/cloe_nessy/pipeline/actions/transform_filter.py
def run(
    self,
    context: PipelineContext,
    *,
    condition: str = "",
    **_: Any,
) -> PipelineContext:
    """Filters the DataFrame in the given context based on a specified condition.

    Args:
        context: Context in which this Action is executed.
        condition: A SQL-like expression used to filter the DataFrame.

    Raises:
        ValueError: If no condition is provided.
        ValueError: If the data from the context is None.

    Returns:
        Context after the execution of this Action, containing the filtered DataFrame.
    """
    if not condition:
        raise ValueError("No condition provided.")

    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    df = context.data

    df_filtered = df.filter(condition=condition)

    return context.from_existing(data=df_filtered)  # type: ignore

TransformGroupAggregate

Bases: PipelineAction

Performs aggregation operations on grouped data within a DataFrame.

This class allows you to group data by specified columns and apply various aggregation functions to other columns. The aggregation functions can be specified as a dictionary where keys are column names and values are either a single aggregation function or a list of functions.

The output DataFrame will contain the grouped columns and the aggregated columns with the aggregation function as a prefix to the column name.

Example
Transform Group Aggregate:
    action: TRANSFORM_GROUP_AGGREGATE
    options:
        grouping_columns:
            - column1
            - column2
        aggregations:
            column3:
                - sum
                - avg
            column4: max

This example groups the DataFrame by column1 and column2 and aggregates column3 by sum and average and column4 by max. The resulting DataFrame will contain the grouped columns column1 and column2 and the aggregated columns sum_column3, avg_column3, and max_column4.
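The mapping from the aggregations dictionary to Spark aggregate expressions can be reproduced in a few lines. A minimal sketch, assuming a local SparkSession and the example columns above:

```python
# Minimal sketch of how the aggregation list is assembled and applied.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("a", "x", 1, 10), ("a", "x", 3, 20)],
    ["column1", "column2", "column3", "column4"],
)

aggregations = {"column3": ["sum", "avg"], "column4": "max"}
agg_exprs = []
for col, funcs in aggregations.items():
    for fn in ([funcs] if isinstance(funcs, str) else funcs):
        # getattr(F, "sum") etc. resolves the pyspark aggregate function;
        # the alias prefixes the function name, e.g. sum_column3.
        agg_exprs.append(getattr(F, fn)(col).alias(f"{fn}_{col}"))

df.groupBy("column1", "column2").agg(*agg_exprs).show()
```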

Source code in src/cloe_nessy/pipeline/actions/transform_group_aggregate.py
class TransformGroupAggregate(PipelineAction):
    """Performs aggregation operations on grouped data within a DataFrame.

    This class allows you to group data by specified columns and apply various aggregation functions
    to other columns. The aggregation functions can be specified as a dictionary where keys are column names
    and values are either a single aggregation function or a list of functions.

    The output DataFrame will contain the grouped columns and the aggregated columns with the aggregation
    function as a prefix to the column name.

    Example:
        ```yaml
        Transform Group Aggregate:
            action: TRANSFORM_GROUP_AGGREGATE
            options:
                grouping_columns:
                    - column1
                    - column2
                aggregations:
                    column3:
                        - sum
                        - avg
                    column4: max
        ```

        This example groups the DataFrame by `column1` and `column2` and aggregates `column3` by sum and average
        and `column4` by max. The resulting DataFrame will contain the grouped columns `column1` and `column2`
        and the aggregated columns `sum_column3`, `avg_column3`, and `max_column4`.
    """

    name: str = "TRANSFORM_GROUP_AGGREGATE"

    def run(
        self,
        context: PipelineContext,
        *,
        grouping_columns: list[str] | None = None,
        aggregations: dict[str, str | list] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Executes the aggregation on the grouped data.

        Args:
            context: The context in which this action is executed.
            grouping_columns: A list of columns to group by.
            aggregations: A dictionary where keys are column names and values are either a single
                aggregation function or a list of functions.

        Raises:
            ValueError: If the context data is None.
            ValueError: If no aggregations are provided.
            ValueError: If invalid aggregation operations are provided.
            ValueError: If columns with unsupported data types are included in the aggregations.

        Returns:
            PipelineContext: The context after the execution of this action.
        """
        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        if grouping_columns is None:
            raise ValueError("Please provide at least one grouping column")
        if aggregations is None:
            raise ValueError("Please provide aggregations.")

        valid_operations = ["avg", "max", "min", "mean", "sum", "count"]

        for operation in aggregations.values():
            if isinstance(operation, list):
                if not set(operation).issubset(valid_operations):
                    raise ValueError(f"Please provide valid operations. Valid operations are {valid_operations}")
            elif isinstance(operation, str):
                if operation not in valid_operations:
                    raise ValueError(f"Please provide valid operations. Valid operations are {valid_operations}")
            else:
                raise ValueError("OPERATION DATATYPE INVALID")

        aggregation_list = []
        for column_name, aggregation in aggregations.items():
            if isinstance(aggregation, list):
                for subaggregation in aggregation:
                    aggregation_list.append(
                        getattr(F, subaggregation)(column_name).alias(f"{subaggregation}_{column_name}")
                    )
            else:
                aggregation_list.append(getattr(F, aggregation)(column_name).alias(f"{aggregation}_{column_name}"))

        df = context.data.groupBy(grouping_columns).agg(*aggregation_list)

        return context.from_existing(data=df)

run(context, *, grouping_columns=None, aggregations=None, **_)

Executes the aggregation on the grouped data.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | The context in which this action is executed. | required |
| grouping_columns | list[str] \| None | A list of columns to group by. | None |
| aggregations | dict[str, str \| list] \| None | A dictionary where keys are column names and values are either a single aggregation function or a list of functions. | None |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If the context data is None. |
| ValueError | If no aggregations are provided. |
| ValueError | If invalid aggregation operations are provided. |
| ValueError | If columns with unsupported data types are included in the aggregations. |

Returns:

| Name | Type | Description |
|------|------|-------------|
| PipelineContext | PipelineContext | The context after the execution of this action. |

Source code in src/cloe_nessy/pipeline/actions/transform_group_aggregate.py
def run(
    self,
    context: PipelineContext,
    *,
    grouping_columns: list[str] | None = None,
    aggregations: dict[str, str | list] | None = None,
    **_: Any,
) -> PipelineContext:
    """Executes the aggregation on the grouped data.

    Args:
        context: The context in which this action is executed.
        grouping_columns: A list of columns to group by.
        aggregations: A dictionary where keys are column names and values are either a single
            aggregation function or a list of functions.

    Raises:
        ValueError: If the context data is None.
        ValueError: If no aggregations are provided.
        ValueError: If invalid aggregation operations are provided.
        ValueError: If columns with unsupported data types are included in the aggregations.

    Returns:
        PipelineContext: The context after the execution of this action.
    """
    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    if grouping_columns is None:
        raise ValueError("Please provide at least one grouping column")
    if aggregations is None:
        raise ValueError("Please provide aggregations.")

    valid_operations = ["avg", "max", "min", "mean", "sum", "count"]

    for operation in aggregations.values():
        if isinstance(operation, list):
            if not set(operation).issubset(valid_operations):
                raise ValueError(f"Please provide valid operations. Valid operations are {valid_operations}")
        elif isinstance(operation, str):
            if operation not in valid_operations:
                raise ValueError(f"Please provide valid operations. Valid operations are {valid_operations}")
        else:
            raise ValueError("OPERATION DATATYPE INVALID")

    aggregation_list = []
    for column_name, aggregation in aggregations.items():
        if isinstance(aggregation, list):
            for subaggregation in aggregation:
                aggregation_list.append(
                    getattr(F, subaggregation)(column_name).alias(f"{subaggregation}_{column_name}")
                )
        else:
            aggregation_list.append(getattr(F, aggregation)(column_name).alias(f"{aggregation}_{column_name}"))

    df = context.data.groupBy(grouping_columns).agg(*aggregation_list)

    return context.from_existing(data=df)

TransformHashColumnsAction

Bases: PipelineAction

Hashes specified columns in a DataFrame using a chosen algorithm.

Given the following hash_config:

Example
Hash Columns:
    action: TRANSFORM_HASH_COLUMNS
    options:
        hash_config:
            hashed_column1:
                columns: ["column1", "column2"]
                algorithm: "sha2"
                bits: 224
            hashed_column2:
                columns: ["column1"]
                algorithm: "crc32"

Given a DataFrame df with the following structure:

| column1 | column2 | column3 |
|---------|---------|---------|
| foo     | bar     | baz     |

After running the action, the resulting DataFrame will look like:

| column1 | column2 | column3 | hashed_column1 | hashed_column2 |
|---------|---------|---------|----------------|----------------|
| foo     | bar     | baz     | 17725b837e9c896e7123b142eb980131dcc0baa6160db45d4adfdb21 | 1670361220 |

Hash values might vary

The actual hash values will depend on the hashing algorithm used and the input data.
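The hashing itself boils down to concat_ws over the input columns plus the chosen hash function. A minimal sketch of the sha2/crc32 configuration above, assuming a local SparkSession:

```python
# Minimal sketch of the hashing shown in the table above. Multi-column
# inputs are joined with "||" before hashing, mirroring the action.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("foo", "bar", "baz")], ["column1", "column2", "column3"])

df = df.withColumn(
    "hashed_column1", F.sha2(F.concat_ws("||", "column1", "column2"), 224)
).withColumn("hashed_column2", F.crc32(F.concat_ws("||", "column1")).cast("string"))
df.show(truncate=False)
```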

Source code in src/cloe_nessy/pipeline/actions/transform_hash_columns.py
class TransformHashColumnsAction(PipelineAction):
    """Hashes specified columns in a DataFrame using a chosen algorithm.

    Given the following `hash_config`:

    Example:
        ```yaml
        Hash Columns:
            action: TRANSFORM_HASH_COLUMNS
            options:
                hash_config:
                    hashed_column1:
                        columns: ["column1", "column2"]
                        algorithm: "sha2"
                        bits: 224
                    hashed_column2:
                        columns: ["column1"]
                        algorithm: "crc32"
        ```

    Given a DataFrame `df` with the following structure:

    | column1 | column2 | column3 |
    |---------|---------|---------|
    |   foo   |   bar   |   baz   |

    After running the action, the resulting DataFrame will look like:

    | column1 | column2 | column3 |                 hashed_column1                            | hashed_column2 |
    |---------|---------|---------|-----------------------------------------------------------|----------------|
    |   foo   |   bar   |   baz   |  17725b837e9c896e7123b142eb980131dcc0baa6160db45d4adfdb21 |  1670361220    |


    !!! note "Hash values might vary"
        The actual hash values will depend on the hashing algorithm used and the input data.
    """

    name: str = "TRANSFORM_HASH_COLUMNS"

    def run(
        self,
        context: PipelineContext,
        *,
        hash_config: HashConfig | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Hashes the specified columns in the DataFrame.

        Args:
            context: Context in which this Action is executed.
            hash_config: Dictionary that contains the configuration for executing the hashing.

        Returns:
            Updated PipelineContext with hashed columns.

        Raises:
            ValueError: If columns are missing, data is None, or algorithm/bits are invalid.
            ValueError: If the hash configuration is invalid.
        """
        if context.data is None:
            raise ValueError("Context data is required for hashing.")

        if not hash_config:
            raise ValueError("Hash configuration is required.")

        df = context.data

        hash_functions = {
            "hash": lambda cols: F.hash(*[F.col(c) for c in cols]).cast("string"),
            "xxhash64": lambda cols: F.xxhash64(F.concat_ws("||", *[F.col(c) for c in cols])).cast("string"),
            "md5": lambda cols: F.md5(F.concat_ws("||", *[F.col(c) for c in cols])).cast("string"),
            "sha1": lambda cols: F.sha1(F.concat_ws("||", *[F.col(c) for c in cols])).cast("string"),
            "sha2": lambda cols, bits: F.sha2(F.concat_ws("||", *[F.col(c) for c in cols]), bits).cast("string"),
            "crc32": lambda cols: F.crc32(F.concat_ws("||", *[F.col(c) for c in cols])).cast("string"),
        }
        default_sha2_bits = 256

        config_obj = HashConfig.model_validate({"hash_config": hash_config})
        for new_col, config in config_obj.hash_config.items():
            hash_func = hash_functions[config.algorithm]
            if config.algorithm == "sha2":
                df = df.withColumn(new_col, hash_func(config.columns, config.bits or default_sha2_bits))  # type: ignore
            else:
                df = df.withColumn(new_col, hash_func(config.columns))  # type: ignore

        return context.from_existing(data=df)

run(context, *, hash_config=None, **_)

Hashes the specified columns in the DataFrame.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | Context in which this Action is executed. | required |
| hash_config | HashConfig \| None | Dictionary that contains the configuration for executing the hashing. | None |

Returns:

| Type | Description |
|------|-------------|
| PipelineContext | Updated PipelineContext with hashed columns. |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If columns are missing, data is None, or algorithm/bits are invalid. |
| ValueError | If the hash configuration is invalid. |

Source code in src/cloe_nessy/pipeline/actions/transform_hash_columns.py
def run(
    self,
    context: PipelineContext,
    *,
    hash_config: HashConfig | None = None,
    **_: Any,
) -> PipelineContext:
    """Hashes the specified columns in the DataFrame.

    Args:
        context: Context in which this Action is executed.
        hash_config: Dictionary that contains the configuration for executing the hashing.

    Returns:
        Updated PipelineContext with hashed columns.

    Raises:
        ValueError: If columns are missing, data is None, or algorithm/bits are invalid.
        ValueError: If the hash configuration is invalid.
    """
    if context.data is None:
        raise ValueError("Context data is required for hashing.")

    if not hash_config:
        raise ValueError("Hash configuration is required.")

    df = context.data

    hash_functions = {
        "hash": lambda cols: F.hash(*[F.col(c) for c in cols]).cast("string"),
        "xxhash64": lambda cols: F.xxhash64(F.concat_ws("||", *[F.col(c) for c in cols])).cast("string"),
        "md5": lambda cols: F.md5(F.concat_ws("||", *[F.col(c) for c in cols])).cast("string"),
        "sha1": lambda cols: F.sha1(F.concat_ws("||", *[F.col(c) for c in cols])).cast("string"),
        "sha2": lambda cols, bits: F.sha2(F.concat_ws("||", *[F.col(c) for c in cols]), bits).cast("string"),
        "crc32": lambda cols: F.crc32(F.concat_ws("||", *[F.col(c) for c in cols])).cast("string"),
    }
    default_sha2_bits = 256

    config_obj = HashConfig.model_validate({"hash_config": hash_config})
    for new_col, config in config_obj.hash_config.items():
        hash_func = hash_functions[config.algorithm]
        if config.algorithm == "sha2":
            df = df.withColumn(new_col, hash_func(config.columns, config.bits or default_sha2_bits))  # type: ignore
        else:
            df = df.withColumn(new_col, hash_func(config.columns))  # type: ignore

    return context.from_existing(data=df)

TransformJoinAction

Bases: PipelineAction

Joins the current DataFrame with another DataFrame defined in joined_data.

The join operation is performed based on specified columns and the type of join indicated by the how parameter. Supported join types are listed in the PySpark documentation.

Examples:

Join Tables:
    action: TRANSFORM_JOIN
    options:
        joined_data: ((step:Transform First Table))
        join_on: id
        how: inner
Join Tables:
    action: TRANSFORM_JOIN
    options:
        joined_data: ((step:Transform First Table))
        join_on: [customer_id, order_date]
        how: left
Join Tables:
    action: TRANSFORM_JOIN
    options:
        joined_data: ((step:Transform First Table))
        join_on:
            customer_id: cust_id
            order_date: date
        how: inner
Join Tables:
    action: TRANSFORM_JOIN
    options:
        joined_data: ((step:Load Conditions Table))
        join_condition: |
            left.material = right.material
            AND right.sales_org = '10'
            AND right.distr_chan = '10'
            AND right.knart = 'ZUVP'
            AND right.lovmkond <> 'X'
            AND right.sales_unit = 'ST'
            AND left.calday BETWEEN
                to_date(right.date_from, 'yyyyMMdd') AND
                to_date(right.date_to, 'yyyyMMdd')
        how: left

Referencing a DataFrame from another step

The joined_data parameter is a reference to the DataFrame from another step. The DataFrame is accessed using the result attribute of the PipelineStep. The syntax for referencing the DataFrame is ((step:Step Name)), mind the double parentheses.

Dictionary Join Syntax

When using a dictionary for join_on, the keys represent columns from the DataFrame in context and the values represent columns from the DataFrame in joined_data. This is useful when joining tables with different column names for the same logical entity.

Complex Join Conditions

Use join_condition instead of join_on for complex joins with literals, expressions, and multiple conditions. Reference columns using left.column_name for the main DataFrame and right.column_name for the joined DataFrame. Supports all PySpark functions and operators.
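The complex-condition path aliases the two DataFrames as left and right and parses the condition string with F.expr. A minimal sketch of that mechanism; the local SparkSession and column names are illustrative assumptions:

```python
# Minimal sketch of the `join_condition` path: alias both sides,
# then parse the SQL condition string with F.expr.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
orders = spark.createDataFrame([("m1", "2024-01-10")], ["material", "calday"])
conditions = spark.createDataFrame(
    [("m1", "20240101", "20240131")], ["material", "date_from", "date_to"]
)

df = orders.alias("left").join(
    conditions.alias("right"),
    on=F.expr(
        "left.material = right.material "
        "AND left.calday BETWEEN to_date(right.date_from, 'yyyyMMdd') "
        "AND to_date(right.date_to, 'yyyyMMdd')"
    ),
    how="left",
)
df.show()
```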

Source code in src/cloe_nessy/pipeline/actions/transform_join.py
class TransformJoinAction(PipelineAction):
    """Joins the current DataFrame with another DataFrame defined in joined_data.

    The join operation is performed based on specified columns and the type of
    join indicated by the `how` parameter. Supported join types can be taken
    from [PySpark
    documentation](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.join.html)

    Examples:
        === "Simple Column Join"
            ```yaml
            Join Tables:
                action: TRANSFORM_JOIN
                options:
                    joined_data: ((step:Transform First Table))
                    join_on: id
                    how: inner
            ```

        === "Multiple Columns Join"
            ```yaml
            Join Tables:
                action: TRANSFORM_JOIN
                options:
                    joined_data: ((step:Transform First Table))
                    join_on: [customer_id, order_date]
                    how: left
            ```

        === "Dictionary Join (Different Column Names)"
            ```yaml
            Join Tables:
                action: TRANSFORM_JOIN
                options:
                    joined_data: ((step:Transform First Table))
                    join_on:
                        customer_id: cust_id
                        order_date: date
                    how: inner
            ```

        === "Complex Join with Literals and Expressions"
            ```yaml
            Join Tables:
                action: TRANSFORM_JOIN
                options:
                    joined_data: ((step:Load Conditions Table))
                    join_condition: |
                        left.material = right.material
                        AND right.sales_org = '10'
                        AND right.distr_chan = '10'
                        AND right.knart = 'ZUVP'
                        AND right.lovmkond <> 'X'
                        AND right.sales_unit = 'ST'
                        AND left.calday BETWEEN
                            to_date(right.date_from, 'yyyyMMdd') AND
                            to_date(right.date_to, 'yyyyMMdd')
                    how: left
            ```

        !!! note "Referencing a DataFrame from another step"
            The `joined_data` parameter is a reference to the DataFrame from another step.
            The DataFrame is accessed using the `result` attribute of the PipelineStep. The syntax
            for referencing the DataFrame is `((step:Step Name))`, mind the double parentheses.

        !!! tip "Dictionary Join Syntax"
            When using a dictionary for `join_on`, the keys represent columns
            from the DataFrame in context and the values represent columns from
            the DataFrame in `joined_data`. This is useful when joining tables
            with different column names for the same logical entity.

        !!! tip "Complex Join Conditions"
            Use `join_condition` instead of `join_on` for complex joins with literals,
            expressions, and multiple conditions. Reference columns using `left.column_name`
            for the main DataFrame and `right.column_name` for the joined DataFrame.
            Supports all PySpark functions and operators.
    """

    name: str = "TRANSFORM_JOIN"

    def run(
        self,
        context: PipelineContext,
        *,
        joined_data: PipelineStep | None = None,
        join_on: list[str] | str | dict[str, str] | None = None,
        join_condition: str | None = None,
        how: str = "inner",
        **_: Any,
    ) -> PipelineContext:
        """Joins the current DataFrame with another DataFrame defined in joined_data.

        Args:
            context: Context in which this Action is executed.
            joined_data: The PipelineStep context defining the DataFrame
                to join with as the right side of the join.
            join_on: A string for the join column
                name, a list of column names, or a dictionary mapping columns from the
                left DataFrame to the right DataFrame. This defines the condition for the
                join operation. Mutually exclusive with join_condition.
            join_condition: A string containing a complex join expression with literals,
                functions, and multiple conditions. Use 'left.' and 'right.' prefixes
                to reference columns from respective DataFrames. Mutually exclusive with join_on.
            how: The type of join to perform. Must be one of: inner, cross, outer,
                full, fullouter, left, leftouter, right, rightouter, semi, anti, etc.

        Raises:
            ValueError: If no joined_data is provided.
            ValueError: If neither join_on nor join_condition is provided.
            ValueError: If both join_on and join_condition are provided.
            ValueError: If the data from context is None.
            ValueError: If the data from the joined_data is None.

        Returns:
            Context after the execution of this Action, containing the result of the join operation.
        """
        if joined_data is None or joined_data.result is None or joined_data.result.data is None:
            raise ValueError("No joined_data provided.")

        if not join_on and not join_condition:
            raise ValueError("Either join_on or join_condition must be provided.")

        if join_on and join_condition:
            raise ValueError("Cannot specify both join_on and join_condition. Use one or the other.")

        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        df_right = joined_data.result.data.alias("right")  # type: ignore
        df_left = context.data.alias("left")  # type: ignore

        if join_condition:
            try:
                condition = F.expr(join_condition)
            except Exception as e:
                # this will not raise an error in most cases, because the evaluation of the expression is lazy
                raise ValueError(f"Failed to parse join condition '{join_condition}': {str(e)}") from e
            df = df_left.join(df_right, on=condition, how=how)  # type: ignore

        if join_on:
            if isinstance(join_on, str):
                join_condition_list = [join_on]
            elif isinstance(join_on, list):
                join_condition_list = join_on
            else:
                join_condition_list = [
                    df_left[left_column] == df_right[right_column]  # type: ignore
                    for left_column, right_column in join_on.items()
                ]

            df = df_left.join(df_right, on=join_condition_list, how=how)  # type: ignore

        return context.from_existing(data=df)  # type: ignore

run(context, *, joined_data=None, join_on=None, join_condition=None, how='inner', **_)

Joins the current DataFrame with another DataFrame defined in joined_data.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | Context in which this Action is executed. | required |
| joined_data | PipelineStep \| None | The PipelineStep context defining the DataFrame to join with as the right side of the join. | None |
| join_on | list[str] \| str \| dict[str, str] \| None | A string for the join column name, a list of column names, or a dictionary mapping columns from the left DataFrame to the right DataFrame. This defines the condition for the join operation. Mutually exclusive with join_condition. | None |
| join_condition | str \| None | A string containing a complex join expression with literals, functions, and multiple conditions. Use 'left.' and 'right.' prefixes to reference columns from respective DataFrames. Mutually exclusive with join_on. | None |
| how | str | The type of join to perform. Must be one of: inner, cross, outer, full, fullouter, left, leftouter, right, rightouter, semi, anti, etc. | 'inner' |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If no joined_data is provided. |
| ValueError | If neither join_on nor join_condition is provided. |
| ValueError | If both join_on and join_condition are provided. |
| ValueError | If the data from context is None. |
| ValueError | If the data from the joined_data is None. |

Returns:

| Type | Description |
|------|-------------|
| PipelineContext | Context after the execution of this Action, containing the result of the join operation. |

Source code in src/cloe_nessy/pipeline/actions/transform_join.py
def run(
    self,
    context: PipelineContext,
    *,
    joined_data: PipelineStep | None = None,
    join_on: list[str] | str | dict[str, str] | None = None,
    join_condition: str | None = None,
    how: str = "inner",
    **_: Any,
) -> PipelineContext:
    """Joins the current DataFrame with another DataFrame defined in joined_data.

    Args:
        context: Context in which this Action is executed.
        joined_data: The PipelineStep context defining the DataFrame
            to join with as the right side of the join.
        join_on: A string for the join column
            name, a list of column names, or a dictionary mapping columns from the
            left DataFrame to the right DataFrame. This defines the condition for the
            join operation. Mutually exclusive with join_condition.
        join_condition: A string containing a complex join expression with literals,
            functions, and multiple conditions. Use 'left.' and 'right.' prefixes
            to reference columns from respective DataFrames. Mutually exclusive with join_on.
        how: The type of join to perform. Must be one of: inner, cross, outer,
            full, fullouter, left, leftouter, right, rightouter, semi, anti, etc.

    Raises:
        ValueError: If no joined_data is provided.
        ValueError: If neither join_on nor join_condition is provided.
        ValueError: If both join_on and join_condition are provided.
        ValueError: If the data from context is None.
        ValueError: If the data from the joined_data is None.

    Returns:
        Context after the execution of this Action, containing the result of the join operation.
    """
    if joined_data is None or joined_data.result is None or joined_data.result.data is None:
        raise ValueError("No joined_data provided.")

    if not join_on and not join_condition:
        raise ValueError("Either join_on or join_condition must be provided.")

    if join_on and join_condition:
        raise ValueError("Cannot specify both join_on and join_condition. Use one or the other.")

    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    df_right = joined_data.result.data.alias("right")  # type: ignore
    df_left = context.data.alias("left")  # type: ignore

    if join_condition:
        try:
            condition = F.expr(join_condition)
        except Exception as e:
            # this will not raise an error in most cases, because the evaluation of the expression is lazy
            raise ValueError(f"Failed to parse join condition '{join_condition}': {str(e)}") from e
        df = df_left.join(df_right, on=condition, how=how)  # type: ignore

    if join_on:
        if isinstance(join_on, str):
            join_condition_list = [join_on]
        elif isinstance(join_on, list):
            join_condition_list = join_on
        else:
            join_condition_list = [
                df_left[left_column] == df_right[right_column]  # type: ignore
                for left_column, right_column in join_on.items()
            ]

        df = df_left.join(df_right, on=join_condition_list, how=how)  # type: ignore

    return context.from_existing(data=df)  # type: ignore

TransformJsonNormalize

Bases: PipelineAction

Normalizes and flattens the DataFrame by exploding array columns and flattening struct columns.

The method performs recursive normalization on the DataFrame present in the context, ensuring that the order of columns is retained and new columns created by flattening structs are appended after existing columns.

Example

Normalize Tables:
    action: TRANSFORM_JSON_NORMALIZE
    options:
        exclude_columns: coordinates
Example Input Data:

| id | name  | coordinates  | attributes                |
|----|-------|--------------|---------------------------|
| 1  | Alice | [10.0, 20.0] | {"age": 30, "city": "NY"} |
| 2  | Bob   | [30.0, 40.0] | {"age": 25, "city": "LA"} |

Example Output Data:

| id | name  | coordinates  | attributes_age | attributes_city |
|----|-------|--------------|----------------|-----------------|
| 1  | Alice | [10.0, 20.0] | 30             | NY              |
| 2  | Bob   | [30.0, 40.0] | 25             | LA              |
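The struct-flattening step can be seen in isolation with plain PySpark. A minimal sketch, assuming a local SparkSession and the example schema above; coordinates stays untouched because it is excluded (otherwise array columns would be exploded with F.explode):

```python
# Minimal sketch of the struct-flattening step: nested fields are
# pulled to the top level, prefixed with the parent column name.
from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [
        (1, "Alice", [10.0, 20.0], Row(age=30, city="NY")),
        (2, "Bob", [30.0, 40.0], Row(age=25, city="LA")),
    ],
    "id INT, name STRING, coordinates ARRAY<DOUBLE>, attributes STRUCT<age: INT, city: STRING>",
)

flat = df.select(
    "id",
    "name",
    "coordinates",
    F.col("attributes.age").alias("attributes_age"),
    F.col("attributes.city").alias("attributes_city"),
)
flat.show()
```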
Source code in src/cloe_nessy/pipeline/actions/transform_json_normalize.py
class TransformJsonNormalize(PipelineAction):
    """Normalizes and flattens the DataFrame by exploding array columns and flattening struct columns.

    The method performs recursive normalization on the DataFrame present in the context,
    ensuring that the order of columns is retained and new columns created by flattening
    structs are appended after existing columns.

    Example:
        ```yaml
        Normalize Tables:
            action: TRANSFORM_JSON_NORMALIZE
            options:
                exclude_columns: coordinates
        ```
        Example Input Data:

        | id | name   | coordinates          | attributes                |
        |----|--------|----------------------|---------------------------|
        | 1  | Alice  | [10.0, 20.0]         | {"age": 30, "city": "NY"} |
        | 2  | Bob    | [30.0, 40.0]         | {"age": 25, "city": "LA"} |

        Example Output Data:

        | id | name   | coordinates | attributes_age | attributes_city |
        |----|--------|-------------|----------------|-----------------|
        | 1  | Alice  | [10.0, 20.0]| 30             | NY              |
        | 2  | Bob    | [30.0, 40.0]| 25             | LA              |
    """

    name: str = "TRANSFORM_JSON_NORMALIZE"

    def run(
        self,
        context: PipelineContext,
        *,
        exclude_columns: list[str] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Executes the normalization process on the DataFrame present in the context.

        Please note that columns retain their relative order during the
        normalization process, and new columns created by flattening structs are
        appended after the existing columns.

        Args:
            context: The pipeline context that contains the DataFrame to be normalized.
            exclude_columns: A list of column names to exclude from the normalization process.
                    These columns will not be exploded or flattened.
            **_: Additional keyword arguments (not used).

        Returns:
            A new pipeline context with the normalized DataFrame.

        Raises:
            ValueError: If the DataFrame in the context is `None`.
        """
        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        if not exclude_columns:
            exclude_columns = []
        df = TransformJsonNormalize._normalize(context.data, exclude_columns=cast(list, exclude_columns))
        return context.from_existing(data=df)

    @staticmethod
    def _normalize(df, exclude_columns):
        """Recursively normalizes the given DataFrame by exploding arrays and flattening structs.

        This method performs two primary operations:
        1. Explodes any array columns, unless they are in the list of excluded columns.
        2. Flattens any struct columns, renaming nested fields and appending them to the top-level DataFrame.

        The method continues these operations in a loop until there are no array or struct columns left.

        Args:
            df: The input DataFrame to normalize.
            exclude_columns: A list of column names to exclude from the normalization process. These columns
                                         will not be exploded or flattened.

        Returns:
            pyspark.sql.DataFrame: The normalized DataFrame with no array or struct columns.
        """

        def explode_arrays(df, exclude_columns):
            array_present = False
            for col in df.columns:
                if df.schema[col].dataType.typeName() == "array" and col not in exclude_columns:
                    df = df.withColumn(col, F.explode(col))
                    array_present = True
            return df, array_present

        def flatten_structs(df):
            struct_present = False
            struct_columns = [col for col in df.columns if df.schema[col].dataType.typeName() == "struct"]
            for col in struct_columns:
                df = df.select(F.col("*"), F.col(col + ".*"))
                nested_columns = df.select(F.col(col + ".*")).schema.names
                for nested_col in nested_columns:
                    df = df.withColumnRenamed(nested_col, f"{col}_{nested_col}")
                df = df.drop(col)
                struct_present = True
            return df, struct_present

        array_present = True
        struct_present = True

        while array_present or struct_present:
            df, array_present = explode_arrays(df, exclude_columns)
            df, struct_present = flatten_structs(df)

        return df

run(context, *, exclude_columns=None, **_)

Executes the normalization process on the DataFrame present in the context.

Please note that columns retain their relative order during the normalization process, and new columns created by flattening structs are appended after the existing columns.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | The pipeline context that contains the DataFrame to be normalized. | required |
| exclude_columns | list[str] \| None | A list of column names to exclude from the normalization process. These columns will not be exploded or flattened. | None |
| **_ | Any | Additional keyword arguments (not used). | {} |

Returns:

| Type | Description |
|------|-------------|
| PipelineContext | A new pipeline context with the normalized DataFrame. |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If the DataFrame in the context is None. |

Source code in src/cloe_nessy/pipeline/actions/transform_json_normalize.py
def run(
    self,
    context: PipelineContext,
    *,
    exclude_columns: list[str] | None = None,
    **_: Any,
) -> PipelineContext:
    """Executes the normalization process on the DataFrame present in the context.

    Please note that columns retain their relative order during the
    normalization process, and new columns created by flattening structs are
    appended after the existing columns.

    Args:
        context: The pipeline context that contains the DataFrame to be normalized.
        exclude_columns: A list of column names to exclude from the normalization process.
                These columns will not be exploded or flattened.
        **_: Additional keyword arguments (not used).

    Returns:
        A new pipeline context with the normalized DataFrame.

    Raises:
        ValueError: If the DataFrame in the context is `None`.
    """
    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    if not exclude_columns:
        exclude_columns = []
    df = TransformJsonNormalize._normalize(context.data, exclude_columns=cast(list, exclude_columns))
    return context.from_existing(data=df)

TransformRegexExtract

Bases: PipelineAction

Extract values from a specified column in a DataFrame using regex patterns.

This action extracts values from a column based on a regex pattern and stores the result in a new column. Optionally, you can replace the matched pattern in the original column with a different string, remove the original column, or add a boolean column indicating which rows matched the pattern.

Example
Extract Action:
    action: TRANSFORM_REGEX_EXTRACT
    options:
        source_column_name: Email
        extract_column_name: org_domain
        pattern: (?<=@)([A-Za-z0-9-]+)
        replace_by: exampledomain.org

This action also supports processing multiple columns simultaneously. To use this functionality, structure the configuration as a dictionary mapping each source column name to its extraction parameters.

Example
Extract Action:
    action: TRANSFORM_REGEX_EXTRACT
    options:
        extract_columns:
            Name:
                pattern: (?<=\w+) (\w+)
                replace_by: ''
                extract_column_name: last_name
                match_info_column_name: has_last_name
            Email:
                pattern: @\w+\.\w+
                extract_column_name: domain
                keep_original_column: False
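Both the extract and the optional replace map onto PySpark's regexp functions, with group 0 (the entire match) extracted. A minimal sketch following the Email example above, assuming a local SparkSession; note the lookbehind pattern only matches the segment before the first dot:

```python
# Minimal sketch of the extract-and-replace pair from the first example.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("jane@acme-corp.org",)], ["Email"])

pattern = r"(?<=@)([A-Za-z0-9-]+)"  # matches "acme-corp"
df = df.withColumn("org_domain", F.regexp_extract("Email", pattern, 0)).withColumn(
    "Email", F.regexp_replace("Email", pattern, "exampledomain.org")
)
# org_domain = "acme-corp"; Email = "jane@exampledomain.org.org",
# since only the matched segment before the dot is replaced.
df.show(truncate=False)
```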
Source code in src/cloe_nessy/pipeline/actions/transform_regex_extract.py
class TransformRegexExtract(PipelineAction):
    r"""Extract values from a specified column in a DataFrame using regex patterns.

    This action extracts values from a column based on a regex pattern and stores
    the result in a new column. Optionally, you can replace the matched pattern in
    the original column with a different string, remove the original column, or add
    a boolean column indicating which rows matched the pattern.

    Example:
        ```yaml
        Extract Action:
            action: TRANSFORM_REGEX_EXTRACT
            options:
                source_column_name: Email
                extract_column_name: org_domain
                pattern: (?<=@)([A-Za-z0-9-]+)
                replace_by: exampledomain.org
        ```

    This action also supports processing multiple columns simultaneously. To use this
    functionality, structure the configuration as a dictionary mapping each source
    column name to its extraction parameters.

    Example:
        ```yaml
        Extract Action:
            action: TRANSFORM_REGEX_EXTRACT
            options:
                extract_columns:
                    Name:
                        pattern: (?<=\w+) (\w+)
                        replace_by: ''
                        extract_column_name: last_name
                        match_info_column_name: has_last_name
                    Email:
                        pattern: @\w+\.\w+
                        extract_column_name: domain
                        keep_original_column: False
        ```

    """

    name: str = "TRANSFORM_REGEX_EXTRACT"

    def run(
        self,
        context: PipelineContext,
        source_column_name: str = "",
        extract_column_name: str = "",
        pattern: str = "",
        keep_original_column: bool = True,
        replace_by: str = "",
        match_info_column_name: str = "",
        extract_columns: dict | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Performs a regex extract (and replace) on a specified column in a DataFrame.

        This function performs a regex extract (and optionally a replace) on one or more columns.

        Args:
            context: The context in which this action is executed.
            source_column_name: Column name to perform the regex replace on.
            pattern: Regex pattern to match.
            replace_by: String that should replace the extracted pattern in the source column.
            extract_column_name: Column name to store the extract, default: <source_column_name>_extracted
            keep_original_column: Whether to keep the original column, default: True
            match_info_column_name: Column name to store a boolean column whether a match was found, default: None
            extract_columns: Dictionary of column names and their corresponding 1-column-case.

        Raises:
            ValueError: If any of the required arguments are not provided.
            ValueError: If the regex pattern is invalid.

        Returns:
            PipelineContext: Transformed context with the modified DataFrame.
        """
        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")
        if not extract_columns and not source_column_name:
            raise ValueError("Either extract_columns or source_column_name must be provided.")

        df = context.data

        if source_column_name:
            self._console_logger.info(f"Extracting from column '{source_column_name}' using pattern: {pattern}")
            df = self._process_one_column(
                df,
                source_column_name,
                pattern,
                extract_column_name,
                replace_by,
                keep_original_column,
                match_info_column_name,
            )

        elif isinstance(extract_columns, dict):
            self._console_logger.info(f"Extracting from {len(extract_columns)} columns")
            for one_source_column_name in extract_columns:
                parameter_dict = self._get_default_dict() | extract_columns[one_source_column_name]
                df = self._process_one_column(df, one_source_column_name, **parameter_dict)

        else:
            raise ValueError("extract_columns must be a dictionary. See documentation for proper format.")

        return context.from_existing(data=df)

    def _process_one_column(
        self,
        df,
        source_column_name,
        pattern,
        extract_column_name,
        replace_by,
        keep_original_column,
        match_info_column_name,
    ):
        # Group 0 extracts the entire regex match, not a capture group
        matched_group_id = 0

        if not extract_column_name:
            extract_column_name = f"{source_column_name}_extracted"

        if not pattern:
            raise ValueError(f"The regex pattern (pattern) for column {source_column_name} must be provided.")

        # Validate regex pattern
        try:
            re.compile(pattern)
        except re.error as e:
            raise ValueError(f"Invalid regex pattern '{pattern}' for column {source_column_name}: {e}") from e

        df = df.withColumn(extract_column_name, F.regexp_extract(source_column_name, pattern, matched_group_id))

        if replace_by:
            df = df.withColumn(source_column_name, F.regexp_replace(source_column_name, pattern, replace_by))

        if match_info_column_name:
            # Check if extraction is null or empty string
            df = df.withColumn(
                match_info_column_name,
                F.when((F.col(extract_column_name).isNull()) | (F.col(extract_column_name) == ""), False).otherwise(  # type: ignore[misc]
                    True
                ),
            )

        if not keep_original_column:
            df = df.drop(source_column_name)

        return df

    def _get_default_dict(self) -> dict[str, Any]:
        """Return default parameters for single column extraction."""
        return {
            "pattern": "",
            "extract_column_name": "",
            "replace_by": "",
            "keep_original_column": True,
            "match_info_column_name": "",
        }

run(context, source_column_name='', extract_column_name='', pattern='', keep_original_column=True, replace_by='', match_info_column_name='', extract_columns=None, **_)

Performs a regex extract (and replace) on a specified column in a DataFrame.

This function performs a regex extract (and optionally a replace) on one or more columns.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | The context in which this action is executed. | required |
| source_column_name | str | Column name to perform the regex extract on. | '' |
| pattern | str | Regex pattern to match. | '' |
| replace_by | str | String that should replace the extracted pattern in the source column. | '' |
| extract_column_name | str | Column name to store the extract, default: <source_column_name>_extracted | '' |
| keep_original_column | bool | Whether to keep the original column, default: True | True |
| match_info_column_name | str | Column name to store a boolean column indicating whether a match was found, default: None | '' |
| extract_columns | dict \| None | Dictionary mapping each source column name to its extraction parameters (same options as the single-column case). | None |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If any of the required arguments are not provided. |
| ValueError | If the regex pattern is invalid. |

Returns:

| Name | Type | Description |
|------|------|-------------|
| PipelineContext | PipelineContext | Transformed context with the modified DataFrame. |

Source code in src/cloe_nessy/pipeline/actions/transform_regex_extract.py
def run(
    self,
    context: PipelineContext,
    source_column_name: str = "",
    extract_column_name: str = "",
    pattern: str = "",
    keep_original_column: bool = True,
    replace_by: str = "",
    match_info_column_name: str = "",
    extract_columns: dict | None = None,
    **_: Any,
) -> PipelineContext:
    """Performs a regex extract (and replace) on a specified column in a DataFrame.

    This function performs a regex extract (and optionally a replace) on one or more columns.

    Args:
        context: The context in which this action is executed.
        source_column_name: Column name to perform the regex extract on.
        pattern: Regex pattern to match.
        replace_by: String that should replace the extracted pattern in the source column.
        extract_column_name: Column name to store the extract, default: <source_column_name>_extracted
        keep_original_column: Whether to keep the original column, default: True
        match_info_column_name: Column name to store a boolean column indicating whether a match was found, default: None
        extract_columns: Dictionary mapping each source column name to its extraction parameters (same options as the single-column case).

    Raises:
        ValueError: If any of the required arguments are not provided.
        ValueError: If the regex pattern is invalid.

    Returns:
        PipelineContext: Transformed context with the modified DataFrame.
    """
    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")
    if not extract_columns and not source_column_name:
        raise ValueError("Either extract_columns or source_column_name must be provided.")

    df = context.data

    if source_column_name:
        self._console_logger.info(f"Extracting from column '{source_column_name}' using pattern: {pattern}")
        df = self._process_one_column(
            df,
            source_column_name,
            pattern,
            extract_column_name,
            replace_by,
            keep_original_column,
            match_info_column_name,
        )

    elif isinstance(extract_columns, dict):
        self._console_logger.info(f"Extracting from {len(extract_columns)} columns")
        for one_source_column_name in extract_columns:
            parameter_dict = self._get_default_dict() | extract_columns[one_source_column_name]
            df = self._process_one_column(df, one_source_column_name, **parameter_dict)

    else:
        raise ValueError("extract_columns must be a dictionary. See documentation for proper format.")

    return context.from_existing(data=df)
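
For orientation, here is a minimal standalone PySpark sketch of the core calls this action wraps; the session setup, data, and column names are illustrative, and group 0 mirrors the `matched_group_id = 0` used in `_process_one_column`.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("alice@example.com",), ("bob@foo.org",)], ["Email"])

pattern = r"@\w+\.\w+"
# Group 0 extracts the entire match, as in _process_one_column.
df = df.withColumn("domain", F.regexp_extract("Email", pattern, 0))
# Optional replace step, equivalent to setting replace_by.
df = df.withColumn("Email", F.regexp_replace("Email", pattern, "@redacted"))
df.show(truncate=False)
```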

TransformRenameColumnsAction

Bases: PipelineAction

Renames the specified columns in the DataFrame.

This method updates the DataFrame in the provided context by renaming columns according to the mapping defined in the columns dictionary, where each key represents an old column name and its corresponding value represents the new column name.

Example
Rename Column:
    action: TRANSFORM_RENAME_COLUMNS
    options:
        columns:
            a_very_long_column_name: shortname
Source code in src/cloe_nessy/pipeline/actions/transform_rename_columns.py
class TransformRenameColumnsAction(PipelineAction):
    """Renames the specified columns in the DataFrame.

    This method updates the DataFrame in the provided context by renaming columns according
    to the mapping defined in the `columns` dictionary, where each key represents an old column
    name and its corresponding value represents the new column name.

    Example:
        ```yaml
        Rename Column:
            action: TRANSFORM_RENAME_COLUMNS
            options:
                columns:
                    a_very_long_column_name: shortname
        ```
    """

    name: str = "TRANSFORM_RENAME_COLUMNS"

    def run(
        self,
        context: PipelineContext,
        *,
        columns: dict[str, str] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Renames the specified columns in the DataFrame.

        Args:
            context: Context in which this Action is executed.
            columns: A dictionary where the key is the old column name
                and the value is the new column name.

        Raises:
            ValueError: If no columns are provided.
            ValueError: If the data from context is None.

        Returns:
            Context after the execution of this Action.
        """
        if not columns:
            raise ValueError("No columns provided.")

        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        df = context.data

        if isinstance(columns, dict):
            df = df.withColumnsRenamed(columns)
        else:
            raise ValueError("'columns' should be a dict, like {'old_name_1':'new_name_1', 'old_name_2':'new_name_2'}")

        return context.from_existing(data=df)  # type: ignore

run(context, *, columns=None, **_)

Renames the specified columns in the DataFrame.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | Context in which this Action is executed. | required |
| columns | dict[str, str] \| None | A dictionary where the key is the old column name and the value is the new column name. | None |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If no columns are provided. |
| ValueError | If the data from context is None. |

Returns:

| Type | Description |
|------|-------------|
| PipelineContext | Context after the execution of this Action. |

Source code in src/cloe_nessy/pipeline/actions/transform_rename_columns.py
def run(
    self,
    context: PipelineContext,
    *,
    columns: dict[str, str] | None = None,
    **_: Any,
) -> PipelineContext:
    """Renames the specified columns in the DataFrame.

    Args:
        context: Context in which this Action is executed.
        columns: A dictionary where the key is the old column name
            and the value is the new column name.

    Raises:
        ValueError: If no columns are provided.
        ValueError: If the data from context is None.

    Returns:
        Context after the execution of this Action.
    """
    if not columns:
        raise ValueError("No columns provided.")

    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    df = context.data

    if isinstance(columns, dict):
        df = df.withColumnsRenamed(columns)
    else:
        raise ValueError("'columns' should be a dict, like {'old_name_1':'new_name_1', 'old_name_2':'new_name_2'}")

    return context.from_existing(data=df)  # type: ignore
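
A minimal sketch of the underlying rename, assuming a local SparkSession and PySpark >= 3.4 (where `DataFrame.withColumnsRenamed` is available); the data and names are illustrative.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "x")], ["a_very_long_column_name", "value"])

# One call renames all mapped columns; unmapped columns are untouched.
df = df.withColumnsRenamed({"a_very_long_column_name": "shortname"})
print(df.columns)  # ['shortname', 'value']
```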

TransformReplaceValuesAction

Bases: PipelineAction

Replaces specified values in the given DataFrame.

This method iterates over the specified replace dictionary, where each key is a column name and each value is another dictionary containing old values as keys and new values as the corresponding values. The method updates the DataFrame by replacing occurrences of the old values with the new ones in the specified columns.

Example
Replace Values:
    action: TRANSFORM_REPLACE_VALUES
    options:
        replace:
            empl_function:
                sales_employee: seller
Source code in src/cloe_nessy/pipeline/actions/transform_replace_values.py
class TransformReplaceValuesAction(PipelineAction):
    """Replaces specified values in the given DataFrame.

    This method iterates over the specified `replace` dictionary, where each key is a column name
    and each value is another dictionary containing old values as keys and new values as the corresponding
    values. The method updates the DataFrame by replacing occurrences of the old values with the new ones
    in the specified columns.

    Example:
        ```yaml
        Replace Values:
            action: TRANSFORM_REPLACE_VALUES
            options:
                replace:
                    empl_function:
                        sales_employee: seller
        ```
    """

    name: str = "TRANSFORM_REPLACE_VALUES"

    def run(
        self,
        context: PipelineContext,
        *,
        replace: dict[str, dict[str, str]] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Replaces specified values in the given DataFrame.

        Args:
            context: Context in which this Action is executed.
            replace: A dictionary where each key is the column name
                and the corresponding value is another dictionary mapping old values to new values.

        Raises:
            ValueError: If no replace values are provided.
            ValueError: If the data from context is None.

        Returns:
            Context after the execution of this Action.
        """
        if not replace:
            raise ValueError("No replace values provided.")

        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        df = context.data
        for column, to_replace in replace.items():
            df = df.replace(to_replace=to_replace, subset=[column])  # type: ignore

        return context.from_existing(data=df)  # type: ignore

run(context, *, replace=None, **_)

Replaces specified values in the given DataFrame.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | Context in which this Action is executed. | required |
| replace | dict[str, dict[str, str]] \| None | A dictionary where each key is the column name and the corresponding value is another dictionary mapping old values to new values. | None |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If no replace values are provided. |
| ValueError | If the data from context is None. |

Returns:

| Type | Description |
|------|-------------|
| PipelineContext | Context after the execution of this Action. |

Source code in src/cloe_nessy/pipeline/actions/transform_replace_values.py
def run(
    self,
    context: PipelineContext,
    *,
    replace: dict[str, dict[str, str]] | None = None,
    **_: Any,
) -> PipelineContext:
    """Replaces specified values in the given DataFrame.

    Args:
        context: Context in which this Action is executed.
        replace: A dictionary where each key is the column name
            and the corresponding value is another dictionary mapping old values to new values.

    Raises:
        ValueError: If no replace values are provided.
        ValueError: If the data from context is None.

    Returns:
        Context after the execution of this Action.
    """
    if not replace:
        raise ValueError("No replace values provided.")

    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    df = context.data
    for column, to_replace in replace.items():
        df = df.replace(to_replace=to_replace, subset=[column])  # type: ignore

    return context.from_existing(data=df)  # type: ignore
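
A minimal sketch of the per-column replacement loop, assuming a local SparkSession; the data and mapping are illustrative.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("sales_employee",), ("manager",)], ["empl_function"])

# The action issues one df.replace per configured column, scoped via subset.
replace = {"empl_function": {"sales_employee": "seller"}}
for column, to_replace in replace.items():
    df = df.replace(to_replace=to_replace, subset=[column])
```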

TransformSelectColumnsAction

Bases: PipelineAction

Selects specified columns from the given DataFrame.

This method allows you to include or exclude specific columns from the DataFrame. If include_columns is provided, only those columns will be selected. If exclude_columns is provided, all columns except those will be selected. The method ensures that the specified columns exist in the DataFrame before performing the selection.

Example

Example Input Data:

| id | name  | coordinates  | attributes                |
|----|-------|--------------|---------------------------|
| 1  | Alice | [10.0, 20.0] | {"age": 30, "city": "NY"} |
| 2  | Bob   | [30.0, 40.0] | {"age": 25, "city": "LA"} |

Include Columns:

Select Columns:
    action: TRANSFORM_SELECT_COLUMNS
    options:
        include_columns:
            - id
            - name
            - coordinates

Example Output Data:

| id | name  | coordinates  |
|----|-------|--------------|
| 1  | Alice | [10.0, 20.0] |
| 2  | Bob   | [30.0, 40.0] |

Exclude Columns:

Select Columns:
    action: TRANSFORM_SELECT_COLUMNS
    options:
        exclude_columns:
            - coordinates

Example Output Data:

| id | name  | attributes                |
|----|-------|---------------------------|
| 1  | Alice | {"age": 30, "city": "NY"} |
| 2  | Bob   | {"age": 25, "city": "LA"} |
Source code in src/cloe_nessy/pipeline/actions/transform_select_columns.py
class TransformSelectColumnsAction(PipelineAction):
    """Selects specified columns from the given DataFrame.

    This method allows you to include or exclude specific columns from the
    DataFrame. If `include_columns` is provided, only those columns will be
    selected. If `exclude_columns` is provided, all columns except those will be
    selected. The method ensures that the specified columns exist in the
    DataFrame before performing the selection.

    Example:
        Example Input Data:

        | id | name   | coordinates          | attributes                |
        |----|--------|----------------------|---------------------------|
        | 1  | Alice  | [10.0, 20.0]         | {"age": 30, "city": "NY"} |
        | 2  | Bob    | [30.0, 40.0]         | {"age": 25, "city": "LA"} |
        === "Include Columns"
            ```yaml
            Select Columns:
                action: TRANSFORM_SELECT_COLUMNS
                options:
                    include_columns:
                        - id
                        - name
                        - coordinates
            ```
            Example Output Data:

            | id | name   | coordinates          |
            |----|--------|----------------------|
            | 1  | Alice  | [10.0, 20.0]         |
            | 2  | Bob    | [30.0, 40.0]         |

        === "Exclude Columns"
            ```yaml
            Select Columns:
                action: TRANSFORM_SELECT_COLUMNS
                options:
                    exclude_columns:
                        - coordinates
            ```
            Example Output Data:

            | id | name   | attributes                |
            |----|--------|---------------------------|
            | 1  | Alice  | {"age": 30, "city": "NY"} |
            | 2  | Bob    | {"age": 25, "city": "LA"} |

    """

    name: str = "TRANSFORM_SELECT_COLUMNS"

    def run(
        self,
        context: PipelineContext,
        *,
        include_columns: list[str] | None = None,
        exclude_columns: list[str] | None = None,
        raise_on_non_existing_columns: bool = True,
        **_: Any,
    ) -> PipelineContext:
        """Selects specified columns from the given DataFrame.

        Args:
            context: Context in which this Action is executed.
            include_columns: A list of column names that should be included.
                If provided, only these columns will be selected.
            exclude_columns: A list of column names that should be excluded.
                If provided, all columns except these will be selected.
            raise_on_non_existing_columns: If True, raise an error if a specified
                column is not found in the DataFrame. If False, ignore the column
                and continue with the selection.

        Raises:
            ValueError: If a specified column is not found in the DataFrame.
            ValueError: If neither include_columns nor exclude_columns are provided,
                or if both are provided.

        Returns:
            Context after the execution of this Action.
        """
        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        df = context.data

        if (not include_columns and not exclude_columns) or (include_columns and exclude_columns):
            raise ValueError("Please define either 'include_columns' or 'exclude_columns'.")

        def check_missing_columns(df, columns, raise_on_non_existing_columns):
            if raise_on_non_existing_columns:
                missing_columns = [col for col in columns if col not in df.columns]
                if missing_columns:
                    raise ValueError(f"Columns not found in DataFrame: {missing_columns}")

        try:
            if include_columns:
                check_missing_columns(df, include_columns, raise_on_non_existing_columns)
                df_selected = df.select(*include_columns)
            elif exclude_columns:
                check_missing_columns(df, exclude_columns, raise_on_non_existing_columns)
                df_selected = df.drop(*exclude_columns)
        except Exception as e:
            raise ValueError(f"Column selection error: {e}") from e

        return context.from_existing(data=df_selected)  # type: ignore

run(context, *, include_columns=None, exclude_columns=None, raise_on_non_existing_columns=True, **_)

Selects specified columns from the given DataFrame.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | Context in which this Action is executed. | required |
| include_columns | list[str] \| None | A list of column names that should be included. If provided, only these columns will be selected. | None |
| exclude_columns | list[str] \| None | A list of column names that should be excluded. If provided, all columns except these will be selected. | None |
| raise_on_non_existing_columns | bool | If True, raise an error if a specified column is not found in the DataFrame. If False, ignore the column and continue with the selection. | True |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If a specified column is not found in the DataFrame. |
| ValueError | If neither include_columns nor exclude_columns are provided, or if both are provided. |

Returns:

| Type | Description |
|------|-------------|
| PipelineContext | Context after the execution of this Action. |

Source code in src/cloe_nessy/pipeline/actions/transform_select_columns.py
def run(
    self,
    context: PipelineContext,
    *,
    include_columns: list[str] | None = None,
    exclude_columns: list[str] | None = None,
    raise_on_non_existing_columns: bool = True,
    **_: Any,
) -> PipelineContext:
    """Selects specified columns from the given DataFrame.

    Args:
        context: Context in which this Action is executed.
        include_columns: A list of column names that should be included.
            If provided, only these columns will be selected.
        exclude_columns: A list of column names that should be excluded.
            If provided, all columns except these will be selected.
        raise_on_non_existing_columns: If True, raise an error if a specified
            column is not found in the DataFrame. If False, ignore the column
            and continue with the selection.

    Raises:
        ValueError: If a specified column is not found in the DataFrame.
        ValueError: If neither include_columns nor exclude_columns are provided,
            or if both are provided.

    Returns:
        Context after the execution of this Action.
    """
    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    df = context.data

    if (not include_columns and not exclude_columns) or (include_columns and exclude_columns):
        raise ValueError("Please define either 'include_columns' or 'exclude_columns'.")

    def check_missing_columns(df, columns, raise_on_non_existing_columns):
        if raise_on_non_existing_columns:
            missing_columns = [col for col in columns if col not in df.columns]
            if missing_columns:
                raise ValueError(f"Columns not found in DataFrame: {missing_columns}")

    try:
        if include_columns:
            check_missing_columns(df, include_columns, raise_on_non_existing_columns)
            df_selected = df.select(*include_columns)
        elif exclude_columns:
            check_missing_columns(df, exclude_columns, raise_on_non_existing_columns)
            df_selected = df.drop(*exclude_columns)
    except Exception as e:
        raise ValueError(f"Column selection error: {e}") from e

    return context.from_existing(data=df_selected)  # type: ignore
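
A minimal sketch of the include path with the missing-column check, assuming a local SparkSession; the data and column lists are illustrative.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "Alice", 30)], ["id", "name", "age"])

include_columns = ["id", "name"]
# Mirrors raise_on_non_existing_columns=True.
missing = [c for c in include_columns if c not in df.columns]
if missing:
    raise ValueError(f"Columns not found in DataFrame: {missing}")
df_selected = df.select(*include_columns)  # exclude path: df.drop(*exclude_columns)
```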

TransformSqlAction

Bases: PipelineAction

Executes a SQL statement on a DataFrame within the provided context.

A temporary view is created from the current DataFrame, and the SQL statement is executed on that view. The resulting DataFrame is returned.

Example
SQL Transform:
    action: TRANSFORM_SQL
    options:
        sql_statement: select city, revenue, firm from {DATA_FRAME} where product="Databricks"

Note

The SQL statement should reference the DataFrame as "{DATA_FRAME}". This nessy-specific placeholder will be replaced with your input DataFrame from the context. If your pipeline is defined as an f-string, you can escape the curly braces by doubling them, e.g., "{{DATA_FRAME}}".

Source code in src/cloe_nessy/pipeline/actions/transform_generic_sql.py
class TransformSqlAction(PipelineAction):
    """Executes a SQL statement on a DataFrame within the provided context.

    A temporary view is created from the current DataFrame, and the SQL
    statement is executed on that view. The resulting DataFrame is returned.

    Example:
        ```yaml
        SQL Transform:
            action: TRANSFORM_SQL
            options:
                sql_statement: select city, revenue, firm from {DATA_FRAME} where product="Databricks"
        ```
        !!! note
            The SQL statement should reference the DataFrame as "{DATA_FRAME}".
            This nessy-specific placeholder will be replaced with your input
            DataFrame from the context. If your pipeline is defined as an
            f-string, you can escape the curly braces by doubling them, e.g.,
            "{{DATA_FRAME}}".
    """

    name: str = "TRANSFORM_SQL"

    def run(
        self,
        context: PipelineContext,
        *,
        sql_statement: str = "",
        **kwargs: Any,
    ) -> PipelineContext:
        """Executes a SQL statement on a DataFrame within the provided context.

        Args:
            context: Context in which this Action is executed.
            sql_statement: A string containing the SQL statement to be
                executed. The source table should be referred to as "{DATA_FRAME}".
            **kwargs: Additional keyword arguments are passed as placeholders to the
                SQL statement.

        Raises:
            ValueError: If "{DATA_FRAME}" is not included in the SQL statement.
            ValueError: If no SQL statement is provided.
            ValueError: If the data from the context is None.

        Returns:
            Context after the execution of this Action, containing the DataFrame resulting from the SQL statement.
        """
        if not sql_statement:
            raise ValueError("No SQL statement provided.")

        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        _spark = SessionManager.get_spark_session()

        temp_view_name = str(uuid.uuid1()).replace("-", "_")
        context.data.createTempView(temp_view_name)

        if "FROM {DATA_FRAME}".casefold() not in sql_statement.casefold():
            raise ValueError("Please use 'FROM {DATA_FRAME}' in your SQL statement.")

        df = _spark.sql(sql_statement.format(DATA_FRAME=temp_view_name, **kwargs))

        return context.from_existing(data=df)

run(context, *, sql_statement='', **kwargs)

Executes a SQL statement on a DataFrame within the provided context.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | Context in which this Action is executed. | required |
| sql_statement | str | A string containing the SQL statement to be executed. The source table should be referred to as "{DATA_FRAME}". | '' |
| **kwargs | Any | Additional keyword arguments are passed as placeholders to the SQL statement. | {} |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If "{DATA_FRAME}" is not included in the SQL statement. |
| ValueError | If no SQL statement is provided. |
| ValueError | If the data from the context is None. |

Returns:

| Type | Description |
|------|-------------|
| PipelineContext | Context after the execution of this Action, containing the DataFrame resulting from the SQL statement. |

Source code in src/cloe_nessy/pipeline/actions/transform_generic_sql.py
def run(
    self,
    context: PipelineContext,
    *,
    sql_statement: str = "",
    **kwargs: Any,
) -> PipelineContext:
    """Executes a SQL statement on a DataFrame within the provided context.

    Args:
        context: Context in which this Action is executed.
        sql_statement: A string containing the SQL statement to be
            executed. The source table should be referred to as "{DATA_FRAME}".
        **kwargs: Additional keyword arguments are passed as placeholders to the
            SQL statement.

    Raises:
        ValueError: If "{DATA_FRAME}" is not included in the SQL statement.
        ValueError: If no SQL statement is provided.
        ValueError: If the data from the context is None.

    Returns:
        Context after the execution of this Action, containing the DataFrame resulting from the SQL statement.
    """
    if not sql_statement:
        raise ValueError("No SQL statement provided.")

    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    _spark = SessionManager.get_spark_session()

    temp_view_name = str(uuid.uuid1()).replace("-", "_")
    context.data.createTempView(temp_view_name)

    if "FROM {DATA_FRAME}".casefold() not in sql_statement.casefold():
        raise ValueError("Please use 'FROM {DATA_FRAME}' in your SQL statement.")

    df = _spark.sql(sql_statement.format(DATA_FRAME=temp_view_name, **kwargs))

    return context.from_existing(data=df)
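
A minimal sketch of the temp-view mechanism behind the `{DATA_FRAME}` placeholder, assuming a local SparkSession; the data and statement are illustrative.

```python
import uuid

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("NY", 100, "Acme", "Databricks")], ["city", "revenue", "firm", "product"]
)

# A unique temp view name is substituted for {DATA_FRAME}, as in the action.
view = str(uuid.uuid1()).replace("-", "_")
df.createTempView(view)
sql_statement = 'select city, revenue, firm from {DATA_FRAME} where product="Databricks"'
result = spark.sql(sql_statement.format(DATA_FRAME=view))
```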

TransformUnionAction

Bases: PipelineAction

Unions multiple DataFrames together.

This method takes the current DataFrame from the context and unions it with additional DataFrames specified in the union_data argument. All DataFrames must have the same schema. If any DataFrame in union_data is None or empty, a ValueError will be raised.

Example
Union Tables:
    action: TRANSFORM_UNION
    options:
        union_data:
            - ((step:Filter First Table))
            - ((step:SQL Transform Second Table))

Referencing a DataFrame from another step

The union_data parameter is a reference to the DataFrame from another step. The DataFrame is accessed using the result attribute of the PipelineStep. The syntax for referencing the DataFrame is ((step:Step Name)), mind the double parentheses.

Source code in src/cloe_nessy/pipeline/actions/transform_union.py
class TransformUnionAction(PipelineAction):
    """Unions multiple DataFrames together.

    This method takes the current DataFrame from the context and unions it with
    additional DataFrames specified in the `union_data` argument. All DataFrames
    must have the same schema. If any DataFrame in `union_data` is None or
    empty, a ValueError will be raised.

    Example:
        ```yaml
        Union Tables:
            action: TRANSFORM_UNION
            options:
                union_data:
                    - ((step:Filter First Table))
                    - ((step:SQL Transform Second Table))
        ```
        !!! note "Referencing a DataFrame from another step"
            The `union_data` parameter is a reference to the DataFrame from another step.
            The DataFrame is accessed using the `result` attribute of the PipelineStep. The syntax
            for referencing the DataFrame is `((step:Step Name))`, mind the double parentheses.
    """

    name: str = "TRANSFORM_UNION"

    def run(
        self,
        context: PipelineContext,
        *,
        union_data: list[PipelineStep] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Unions multiple DataFrames together.

        Args:
            context: Context in which this Action is executed.
            union_data: A list of PipelineSteps that define the DataFrames
                to union with the current context.

        Raises:
            ValueError: If no union_data is provided.
            ValueError: If the data from context is None.
            ValueError: If the data from any of the union_data is None.

        Returns:
            Context after the execution of this Action.
        """
        if not union_data:
            raise ValueError("No union_data provided.")

        # Check that all union_data contexts have valid data
        result_contexts = []
        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")

        for ctx in union_data:
            if ctx.result is None or ctx.result.data is None:
                raise ValueError(f"Data from the context of step '{ctx.name}' is required for the operation.")
            result_contexts.append(ctx.result.data)

        # Union all DataFrames
        union_dfs = [context.data] + result_contexts
        df = reduce(DataFrame.unionAll, union_dfs)  # type: ignore

        return context.from_existing(data=df)  # type: ignore

run(context, *, union_data=None, **_)

Unions multiple DataFrames together.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | Context in which this Action is executed. | required |
| union_data | list[PipelineStep] \| None | A list of PipelineSteps that define the DataFrames to union with the current context. | None |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If no union_data is provided. |
| ValueError | If the data from context is None. |
| ValueError | If the data from any of the union_data is None. |

Returns:

| Type | Description |
|------|-------------|
| PipelineContext | Context after the execution of this Action. |

Source code in src/cloe_nessy/pipeline/actions/transform_union.py
def run(
    self,
    context: PipelineContext,
    *,
    union_data: list[PipelineStep] | None = None,
    **_: Any,
) -> PipelineContext:
    """Unions multiple DataFrames together.

    Args:
        context: Context in which this Action is executed.
        union_data: A list of PipelineSteps that define the DataFrames
            to union with the current context.

    Raises:
        ValueError: If no union_data is provided.
        ValueError: If the data from context is None.
        ValueError: If the data from any of the union_data is None.

    Returns:
        Context after the execution of this Action.
    """
    if not union_data:
        raise ValueError("No union_data provided.")

    # Check that all union_data contexts have valid data
    result_contexts = []
    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")

    for ctx in union_data:
        if ctx.result is None or ctx.result.data is None:
            raise ValueError(f"Data from the context of step '{ctx.name}' is required for the operation.")
        result_contexts.append(ctx.result.data)

    # Union all DataFrames
    union_dfs = [context.data] + result_contexts
    df = reduce(DataFrame.unionAll, union_dfs)  # type: ignore

    return context.from_existing(data=df)  # type: ignore
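
A minimal sketch of the union itself, assuming a local SparkSession; since `DataFrame.unionAll` matches columns by position, all inputs must share the same schema and column order, as documented above.

```python
from functools import reduce

from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([(1, "a")], ["id", "val"])
df2 = spark.createDataFrame([(2, "b")], ["id", "val"])

# unionAll is positional: identical schemas are required.
df = reduce(DataFrame.unionAll, [df1, df2])
```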

TransformWithColumnAction

Bases: PipelineAction

Add or update a column in the DataFrame using a SQL expression.

This action uses PySpark's expr() function to evaluate SQL expressions and create or update columns in the DataFrame.

Examples:

Create new column:

Create Full Name:
    action: TRANSFORM_WITH_COLUMN
    options:
        column_name: full_name
        expression: concat(first_name, ' ', last_name)

Update existing column:

Lowercase Email:
    action: TRANSFORM_WITH_COLUMN
    options:
        column_name: email
        expression: lower(email)

Calculated column:

Calculate Total:
    action: TRANSFORM_WITH_COLUMN
    options:
        column_name: total_price
        expression: price * quantity * (1 + tax_rate)

Extract date parts:

Extract Year:
    action: TRANSFORM_WITH_COLUMN
    options:
        column_name: year
        expression: year(order_date)
Source code in src/cloe_nessy/pipeline/actions/transform_with_column.py
class TransformWithColumnAction(PipelineAction):
    """Add or update a column in the DataFrame using a SQL expression.

    This action uses PySpark's expr() function to evaluate SQL expressions and
    create or update columns in the DataFrame.

    Examples:
        === "Create new column"
            ```yaml
            Create Full Name:
                action: TRANSFORM_WITH_COLUMN
                options:
                    column_name: full_name
                    expression: concat(first_name, ' ', last_name)
            ```

        === "Update existing column"
            ```yaml
            Lowercase Email:
                action: TRANSFORM_WITH_COLUMN
                options:
                    column_name: email
                    expression: lower(email)
            ```

        === "Calculated column"
            ```yaml
            Calculate Total:
                action: TRANSFORM_WITH_COLUMN
                options:
                    column_name: total_price
                    expression: price * quantity * (1 + tax_rate)
            ```

        === "Extract date parts"
            ```yaml
            Extract Year:
                action: TRANSFORM_WITH_COLUMN
                options:
                    column_name: year
                    expression: year(order_date)
            ```
    """

    name: str = "TRANSFORM_WITH_COLUMN"

    def run(
        self,
        context: PipelineContext,
        *,
        column_name: str = "",
        expression: str = "",
        **_: Any,
    ) -> PipelineContext:
        """Add or update a column using a SQL expression.

        Args:
            context: The pipeline context containing the DataFrame
            column_name: Name of the column to create or update
            expression: SQL expression to evaluate for the column value
            **_: Additional unused keyword arguments

        Returns:
            PipelineContext: Updated context with the modified DataFrame

        Raises:
            ValueError: If column_name is not provided
            ValueError: If expression is not provided
            ValueError: If context.data is None
            Exception: If the SQL expression is invalid
        """
        if not column_name:
            raise ValueError("No column_name provided.")

        if not expression:
            raise ValueError("No expression provided.")

        if context.data is None:
            raise ValueError("Data from context is required for transform_with_column")

        self._console_logger.info(f"Adding/updating column '{column_name}' with expression: {expression}")

        df = context.data

        try:
            # Use F.expr() to evaluate the SQL expression
            df = df.withColumn(column_name, F.expr(expression))
        except Exception as e:
            self._console_logger.error(f"Failed to evaluate expression '{expression}' for column '{column_name}': {e}")
            raise

        self._console_logger.info(f"Successfully added/updated column '{column_name}'")

        return context.from_existing(data=df)

run(context, *, column_name='', expression='', **_)

Add or update a column using a SQL expression.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | The pipeline context containing the DataFrame | required |
| column_name | str | Name of the column to create or update | '' |
| expression | str | SQL expression to evaluate for the column value | '' |
| **_ | Any | Additional unused keyword arguments | {} |

Returns:

| Name | Type | Description |
|------|------|-------------|
| PipelineContext | PipelineContext | Updated context with the modified DataFrame |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If column_name is not provided |
| ValueError | If expression is not provided |
| ValueError | If context.data is None |
| Exception | If the SQL expression is invalid |

Source code in src/cloe_nessy/pipeline/actions/transform_with_column.py
def run(
    self,
    context: PipelineContext,
    *,
    column_name: str = "",
    expression: str = "",
    **_: Any,
) -> PipelineContext:
    """Add or update a column using a SQL expression.

    Args:
        context: The pipeline context containing the DataFrame
        column_name: Name of the column to create or update
        expression: SQL expression to evaluate for the column value
        **_: Additional unused keyword arguments

    Returns:
        PipelineContext: Updated context with the modified DataFrame

    Raises:
        ValueError: If column_name is not provided
        ValueError: If expression is not provided
        ValueError: If context.data is None
        Exception: If the SQL expression is invalid
    """
    if not column_name:
        raise ValueError("No column_name provided.")

    if not expression:
        raise ValueError("No expression provided.")

    if context.data is None:
        raise ValueError("Data from context is required for transform_with_column")

    self._console_logger.info(f"Adding/updating column '{column_name}' with expression: {expression}")

    df = context.data

    try:
        # Use F.expr() to evaluate the SQL expression
        df = df.withColumn(column_name, F.expr(expression))
    except Exception as e:
        self._console_logger.error(f"Failed to evaluate expression '{expression}' for column '{column_name}': {e}")
        raise

    self._console_logger.info(f"Successfully added/updated column '{column_name}'")

    return context.from_existing(data=df)
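
A minimal sketch of the `F.expr` call at the heart of this action, assuming a local SparkSession; the data and expression are illustrative.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("Ada", "Lovelace")], ["first_name", "last_name"])

# F.expr evaluates the SQL expression against each row's columns.
df = df.withColumn("full_name", F.expr("concat(first_name, ' ', last_name)"))
```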

WriteCatalogTableAction

Bases: PipelineAction

Writes a DataFrame to a specified catalog table using CatalogWriter.

Examples:

Batch Write:

Write Table to Catalog:
    action: WRITE_CATALOG_TABLE
    options:
        table_identifier: my_catalog.business_schema.sales_table
        mode: append
        partition_by: day
        options:
            mergeSchema: true

Streaming Write:

Write Table to Catalog Stream:
    action: WRITE_CATALOG_TABLE
    options:
        table_identifier: my_catalog.business_schema.sales_table
        mode: append
        checkpoint_location: /path/to/checkpoint
        trigger_dict:
            processingTime: 10 seconds
        options:
            mergeSchema: true
Source code in src/cloe_nessy/pipeline/actions/write_catalog_table.py
class WriteCatalogTableAction(PipelineAction):
    """Writes a DataFrame to a specified catalog table using [CatalogWriter][cloe_nessy.integration.writer.CatalogWriter].

    Examples:
        === "Batch Write"
            ```yaml
            Write Table to Catalog:
                action: WRITE_CATALOG_TABLE
                options:
                    table_identifier: my_catalog.business_schema.sales_table
                    mode: append
                    partition_by: day
                    options:
                        mergeSchema: true
            ```
        === "Streaming Write"
            ```yaml
            Write Table to Catalog Stream:
                action: WRITE_CATALOG_TABLE
                options:
                    table_identifier: my_catalog.business_schema.sales_table
                    mode: append
                    checkpoint_location: /path/to/checkpoint
                    trigger_dict:
                        processingTime: 10 seconds
                    options:
                        mergeSchema: true
            ```
    """

    name: str = "WRITE_CATALOG_TABLE"

    def run(
        self,
        context: PipelineContext,
        *,
        table_identifier: str | None = None,
        mode: str = "append",
        partition_by: str | list[str] | None = None,
        options: dict[str, str] | None = None,
        checkpoint_location: str | None = None,
        trigger_dict: dict | None = None,
        await_termination: bool = False,
        **_: Any,
    ) -> PipelineContext:
        """Writes a DataFrame to a specified catalog table.

        Args:
            context: Context in which this Action is executed.
            table_identifier: The table identifier in the Unity Catalog in the
                format 'catalog.schema.table'. If not provided, attempts to use the
                context's table metadata.
            mode: The write mode. One of 'append', 'overwrite', 'error',
                'errorifexists', or 'ignore'.
            partition_by: Names of the partitioning columns.
            checkpoint_location: Location for checkpointing.
            trigger_dict: A dictionary specifying the trigger configuration for the streaming query.
            await_termination: If True, the function will wait for the streaming
                query to finish before returning.
            options: Additional options for the DataFrame write operation.

        Raises:
            ValueError: If the table name is not specified or cannot be inferred from
                the context.

        Returns:
            Context after the execution of this Action.
        """
        if not options:
            options = dict()
        streaming = context.runtime_info and context.runtime_info.get("streaming")
        if streaming and not checkpoint_location:
            raise ValueError("Checkpoint location must be specified for streaming writes.")
        if (
            partition_by is None
            and context.table_metadata is not None
            and hasattr(context.table_metadata, "partition_by")
            and not context.table_metadata.liquid_clustering
        ):
            partition_by = context.table_metadata.partition_by  # type: ignore

        if (table_metadata := context.table_metadata) and table_identifier is None:
            table_identifier = table_metadata.identifier
        if table_identifier is None:
            raise ValueError("Table name must be specified or a valid Table object with identifier must be set.")

        if table_metadata:
            manager = TableManager()
            manager.create_table(table=table_metadata, ignore_if_exists=True, replace=False)

        runtime_info = getattr(context, "runtime_info", None)
        if runtime_info and runtime_info.get("is_delta_load"):
            consume_delta_load(runtime_info)

        writer = CatalogWriter()

        if streaming:
            writer.write_stream(
                df=context.data,  # type: ignore
                table_identifier=table_identifier,
                checkpoint_location=checkpoint_location,
                trigger_dict=trigger_dict,
                options=options,
                mode=mode,
                await_termination=await_termination,
            )
        else:
            writer.write(
                df=context.data,  # type: ignore
                table_identifier=table_identifier,
                mode=mode,
                partition_by=partition_by,
                options=options,
            )
        return context.from_existing()

run(context, *, table_identifier=None, mode='append', partition_by=None, options=None, checkpoint_location=None, trigger_dict=None, await_termination=False, **_)

Writes a DataFrame to a specified catalog table.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | Context in which this Action is executed. | required |
| table_identifier | str \| None | The table identifier in the Unity Catalog in the format 'catalog.schema.table'. If not provided, attempts to use the context's table metadata. | None |
| mode | str | The write mode. One of 'append', 'overwrite', 'error', 'errorifexists', or 'ignore'. | 'append' |
| partition_by | str \| list[str] \| None | Names of the partitioning columns. | None |
| checkpoint_location | str \| None | Location for checkpointing. | None |
| trigger_dict | dict \| None | A dictionary specifying the trigger configuration for the streaming query. | None |
| await_termination | bool | If True, the function will wait for the streaming query to finish before returning. | False |
| options | dict[str, str] \| None | Additional options for the DataFrame write operation. | None |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If the table name is not specified or cannot be inferred from the context. |

Returns:

| Type | Description |
|------|-------------|
| PipelineContext | Context after the execution of this Action. |

Source code in src/cloe_nessy/pipeline/actions/write_catalog_table.py
def run(
    self,
    context: PipelineContext,
    *,
    table_identifier: str | None = None,
    mode: str = "append",
    partition_by: str | list[str] | None = None,
    options: dict[str, str] | None = None,
    checkpoint_location: str | None = None,
    trigger_dict: dict | None = None,
    await_termination: bool = False,
    **_: Any,
) -> PipelineContext:
    """Writes a DataFrame to a specified catalog table.

    Args:
        context: Context in which this Action is executed.
        table_identifier: The table identifier in the Unity Catalog in the
            format 'catalog.schema.table'. If not provided, attempts to use the
            context's table metadata.
        mode: The write mode. One of 'append', 'overwrite', 'error',
            'errorifexists', or 'ignore'.
        partition_by: Names of the partitioning columns.
        checkpoint_location: Location for checkpointing.
        trigger_dict: A dictionary specifying the trigger configuration for the streaming query.
        await_termination: If True, the function will wait for the streaming
            query to finish before returning.
        options: Additional options for the DataFrame write operation.

    Raises:
        ValueError: If the table name is not specified or cannot be inferred from
            the context.

    Returns:
        Context after the execution of this Action.
    """
    if not options:
        options = dict()
    streaming = context.runtime_info and context.runtime_info.get("streaming")
    if streaming and not checkpoint_location:
        raise ValueError("Checkpoint location must be specified for streaming writes.")
    if (
        partition_by is None
        and context.table_metadata is not None
        and hasattr(context.table_metadata, "partition_by")
        and not context.table_metadata.liquid_clustering
    ):
        partition_by = context.table_metadata.partition_by  # type: ignore

    if (table_metadata := context.table_metadata) and table_identifier is None:
        table_identifier = table_metadata.identifier
    if table_identifier is None:
        raise ValueError("Table name must be specified or a valid Table object with identifier must be set.")

    if table_metadata:
        manager = TableManager()
        manager.create_table(table=table_metadata, ignore_if_exists=True, replace=False)

    runtime_info = getattr(context, "runtime_info", None)
    if runtime_info and runtime_info.get("is_delta_load"):
        consume_delta_load(runtime_info)

    writer = CatalogWriter()

    if streaming:
        writer.write_stream(
            df=context.data,  # type: ignore
            table_identifier=table_identifier,
            checkpoint_location=checkpoint_location,
            trigger_dict=trigger_dict,
            options=options,
            mode=mode,
            await_termination=await_termination,
        )
    else:
        writer.write(
            df=context.data,  # type: ignore
            table_identifier=table_identifier,
            mode=mode,
            partition_by=partition_by,
            options=options,
        )
    return context.from_existing()
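
For orientation, this is a plain-PySpark sketch of the shape of the streaming write that `CatalogWriter.write_stream` handles here; it is not the CatalogWriter API itself, and the rate source, table name, and checkpoint path are illustrative placeholders.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
stream_df = spark.readStream.format("rate").load()  # illustrative streaming source

query = (
    stream_df.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/path/to/checkpoint")
    .trigger(processingTime="10 seconds")  # corresponds to trigger_dict
    .toTable("my_catalog.business_schema.sales_table")
)
# await_termination=True corresponds to query.awaitTermination()
```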

WriteDeltaAppendAction

Bases: PipelineAction

This class implements an Append action for an ETL pipeline.

The WriteDeltaAppendAction appends a DataFrame to a Delta table.

Example
Write Delta Append:
    action: WRITE_DELTA_APPEND
    options:
        table_identifier: my_catalog.my_schema.my_table
        ignore_empty_df: false
Source code in src/cloe_nessy/pipeline/actions/write_delta_append.py
class WriteDeltaAppendAction(PipelineAction):
    """This class implements an Append action for an ETL pipeline.

    The WriteDeltaAppendAction appends a DataFrame to a Delta table.

    Example:
        ```yaml
        Write Delta Append:
            action: WRITE_DELTA_APPEND
            options:
                table_identifier: my_catalog.my_schema.my_table
                ignore_empty_df: false
        ```
    """

    name: str = "WRITE_DELTA_APPEND"

    def run(
        self,
        context: PipelineContext,
        *,
        table_identifier: str | None = None,
        ignore_empty_df: bool = False,
        options: dict[str, Any] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Merge the dataframe into the delta table.

        Args:
            context: Context in which this Action is executed.
            table_identifier: The identifier of the table. If passed, the
                UC Adapter will be used to create a table object. Otherwise the Table
                object will be created from the table metadata in the context.
            ignore_empty_df: A flag indicating whether to ignore an empty source dataframe.
            options: Additional options for the append writer.

        Raises:
            ValueError: If the table does not exist.
            ValueError: If the data is not set in the pipeline context.
            ValueError: If the table metadata is empty.

        Returns:
            Pipeline Context
        """
        delta_append_writer = DeltaAppendWriter()

        if context.data is None:
            raise ValueError("Data is required for the append operation.")
        if context.table_metadata is None and table_identifier is None:
            raise ValueError("Table metadata or a table identifier are required for the append operation.")

        if table_identifier is not None:
            context.table_metadata = UnityCatalogAdapter().get_table_by_name(table_identifier)
        else:
            if context.table_metadata is None:
                raise ValueError("Table metadata is required.")

        if context.table_metadata is None:
            raise ValueError("Table metadata is required.")

        delta_append_writer.write(
            table_identifier=context.table_metadata.identifier,
            table_location=context.table_metadata.storage_path,
            data_frame=context.data,
            ignore_empty_df=ignore_empty_df,
            options=options,
        )

        runtime_info = getattr(context, "runtime_info", None)
        if runtime_info and runtime_info.get("is_delta_load"):
            consume_delta_load(runtime_info)

        return context.from_existing()

run(context, *, table_identifier=None, ignore_empty_df=False, options=None, **_)

Append the DataFrame to the Delta table.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| context | PipelineContext | Context in which this Action is executed. | required |
| table_identifier | str \| None | The identifier of the table. If passed, the UC Adapter will be used to create a table object. Otherwise the Table object will be created from the table metadata in the context. | None |
| ignore_empty_df | bool | A flag indicating whether to ignore an empty source dataframe. | False |
| options | dict[str, Any] \| None | Additional options for the append writer. | None |

Raises:

| Type | Description |
|------|-------------|
| ValueError | If the table does not exist. |
| ValueError | If the data is not set in the pipeline context. |
| ValueError | If the table metadata is empty. |

Returns:

| Type | Description |
|------|-------------|
| PipelineContext | Pipeline Context |

Source code in src/cloe_nessy/pipeline/actions/write_delta_append.py
def run(
    self,
    context: PipelineContext,
    *,
    table_identifier: str | None = None,
    ignore_empty_df: bool = False,
    options: dict[str, Any] | None = None,
    **_: Any,
) -> PipelineContext:
    """Merge the dataframe into the delta table.

    Args:
        context: Context in which this Action is executed.
        table_identifier: The identifier of the table. If passed, the
            UC Adapter will be used to create a table object. Otherwise the Table
            object will be created from the table metadata in the context.
        ignore_empty_df: A flag indicating whether to ignore an empty source dataframe.
        options: Additional options for the append writer.

    Raises:
        ValueError: If the table does not exist.
        ValueError: If the data is not set in the pipeline context.
        ValueError: If the table metadata is empty.

    Returns:
        Pipeline Context
    """
    delta_append_writer = DeltaAppendWriter()

    if context.data is None:
        raise ValueError("Data is required for the append operation.")
    if context.table_metadata is None and table_identifier is None:
        raise ValueError("Table metadata or a table identifier are required for the append operation.")

    if table_identifier is not None:
        context.table_metadata = UnityCatalogAdapter().get_table_by_name(table_identifier)
    else:
        if context.table_metadata is None:
            raise ValueError("Table metadata is required.")

    if context.table_metadata is None:
        raise ValueError("Table metadata is required.")

    delta_append_writer.write(
        table_identifier=context.table_metadata.identifier,
        table_location=context.table_metadata.storage_path,
        data_frame=context.data,
        ignore_empty_df=ignore_empty_df,
        options=options,
    )

    runtime_info = getattr(context, "runtime_info", None)
    if runtime_info and runtime_info.get("is_delta_load"):
        consume_delta_load(runtime_info)

    return context.from_existing()

WriteDeltaMergeAction

Bases: PipelineAction

This class implements a Merge action for an ETL pipeline.

The WriteDeltaMergeAction merges a DataFrame into a Delta table.

Example
# Basic merge with same column names
Write Delta Merge:
    action: WRITE_DELTA_MERGE
    options:
        table_identifier: my_catalog.my_schema.my_table
        key_columns:
            - id
            - customer_id
        cols_to_exclude_from_update:
            - created_at
        when_matched_update: true
        when_not_matched_insert: true
        use_partition_pruning: true

# Merge with different source and target column names
Write Delta Merge with Mapping:
    action: WRITE_DELTA_MERGE
    options:
        table_identifier: my_catalog.my_schema.my_table
        key_columns:
            - customer_id
        column_mapping:
            customer_id: cust_id
            full_name: name
            email_address: email
        when_matched_update: true
        when_not_matched_insert: true
Source code in src/cloe_nessy/pipeline/actions/write_delta_merge.py
class WriteDeltaMergeAction(PipelineAction):
    """This class implements a Merge action for an ETL pipeline.

    The WriteDeltaMergeAction merges a DataFrame into a Delta table.

    Example:
        ```yaml
        # Basic merge with same column names
        Write Delta Merge:
            action: WRITE_DELTA_MERGE
            options:
                table_identifier: my_catalog.my_schema.my_table
                key_columns:
                    - id
                    - customer_id
                cols_to_exclude_from_update:
                    - created_at
                when_matched_update: true
                when_not_matched_insert: true
                use_partition_pruning: true

        # Merge with different source and target column names
        Write Delta Merge with Mapping:
            action: WRITE_DELTA_MERGE
            options:
                table_identifier: my_catalog.my_schema.my_table
                key_columns:
                    - customer_id
                column_mapping:
                    customer_id: cust_id
                    full_name: name
                    email_address: email
                when_matched_update: true
                when_not_matched_insert: true
        ```
    """

    name: str = "WRITE_DELTA_MERGE"

    def run(
        self,
        context: PipelineContext,
        *,
        table_identifier: str | None = None,
        key_columns: list[str] | None = None,
        cols_to_exclude_from_update: list[str] | None = None,
        column_mapping: dict[str, str] | None = None,
        when_matched_update: bool = True,
        when_matched_delete: bool = False,
        when_not_matched_insert: bool = True,
        use_partition_pruning: bool = True,
        ignore_empty_df: bool = False,
        create_if_not_exists: bool = True,
        refresh_table: bool = True,
        **_: Any,
    ) -> PipelineContext:
        """Merge the dataframe into the delta table.

        Args:
            context: Context in which this Action is executed.
            table_identifier: The identifier of the table. If passed, the
                UC Adapter will be used to create a table object. Otherwise the Table
                object will be created from the table metadata in the context.
            key_columns: List of target column names that form the
                key for the merge operation.
            cols_to_exclude_from_update: List of target column names to be
                excluded from the update operation in the target Delta table.
            column_mapping: Mapping from target column names to source column names.
                Use this when source and target tables have different column names.
                If a column is not in the mapping, it's assumed to have the same name
                in both source and target.
            when_matched_update: Flag to specify whether to
                perform an update operation when matching records are found in
                the target Delta table.
            when_matched_delete: Flag to specify whether to
                perform a delete operation when matching records are found in
                the target Delta table.
            when_not_matched_insert: Flag to specify whether to perform an
                insert operation when matching records are not found in the target
                Delta table.
            use_partition_pruning: Flag to specify whether to use partition
                pruning to optimize the performance of the merge operation.
            ignore_empty_df: A flag indicating whether to ignore an empty source dataframe.
            create_if_not_exists: Create the table if it does not exist.
            refresh_table: Refresh the table after the transaction.

        Raises:
            ValueError: If the table does not exist.
            ValueError: If the data is not set in the pipeline context.
            ValueError: If the table metadata is empty.

        Returns:
            Pipeline Context
        """
        delta_merge_writer = DeltaMergeWriter()

        if context.data is None:
            raise ValueError("Data is required for the merge operation.")
        if context.table_metadata is None and table_identifier is None:
            raise ValueError("Table metadata or a table identifier are required for the merge operation.")

        if table_identifier is not None:
            context.table_metadata = UnityCatalogAdapter().get_table_by_name(table_identifier)
        else:
            if context.table_metadata is None:
                raise ValueError("Table metadata is required.")

        if context.table_metadata is None:
            raise ValueError("Table metadata is required.")

        if create_if_not_exists:
            delta_merge_writer.table_manager.create_table(table=context.table_metadata, ignore_if_exists=True)

        if not delta_merge_writer.table_manager.table_exists(context.table_metadata):
            raise ValueError(f"Table {context.table_metadata.name} does not exist.")

        assert key_columns is not None, "Key columns must be provided."

        delta_merge_writer.write(
            data_frame=context.data,
            table=context.table_metadata,
            table_identifier=context.table_metadata.identifier,
            storage_path=str(context.table_metadata.storage_path),
            key_columns=key_columns,
            cols_to_exclude_from_update=cols_to_exclude_from_update or [],
            column_mapping=column_mapping or {},
            when_matched_update=when_matched_update,
            when_matched_delete=when_matched_delete,
            when_not_matched_insert=when_not_matched_insert,
            use_partition_pruning=use_partition_pruning,
            partition_by=context.table_metadata.partition_by,
            ignore_empty_df=ignore_empty_df,
        )

        runtime_info = getattr(context, "runtime_info", None)
        if runtime_info and runtime_info.get("is_delta_load"):
            consume_delta_load(runtime_info)

        if refresh_table:
            delta_merge_writer.table_manager.refresh_table(table_identifier=context.table_metadata.identifier)

        return context.from_existing()

run(context, *, table_identifier=None, key_columns=None, cols_to_exclude_from_update=None, column_mapping=None, when_matched_update=True, when_matched_delete=False, when_not_matched_insert=True, use_partition_pruning=True, ignore_empty_df=False, create_if_not_exists=True, refresh_table=True, **_)

Merge the dataframe into the delta table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `PipelineContext` | Context in which this Action is executed. | *required* |
| `table_identifier` | `str \| None` | The identifier of the table. If passed, the UC Adapter will be used to create a table object. Otherwise the Table object will be created from the table metadata in the context. | `None` |
| `key_columns` | `list[str] \| None` | List of target column names that form the key for the merge operation. | `None` |
| `cols_to_exclude_from_update` | `list[str] \| None` | List of target column names to be excluded from the update operation in the target Delta table. | `None` |
| `column_mapping` | `dict[str, str] \| None` | Mapping from target column names to source column names. Use this when source and target tables have different column names. If a column is not in the mapping, it's assumed to have the same name in both source and target. | `None` |
| `when_matched_update` | `bool` | Flag to specify whether to perform an update operation when matching records are found in the target Delta table. | `True` |
| `when_matched_delete` | `bool` | Flag to specify whether to perform a delete operation when matching records are found in the target Delta table. | `False` |
| `when_not_matched_insert` | `bool` | Flag to specify whether to perform an insert operation when matching records are not found in the target Delta table. | `True` |
| `use_partition_pruning` | `bool` | Flag to specify whether to use partition pruning to optimize the performance of the merge operation. | `True` |
| `ignore_empty_df` | `bool` | A flag indicating whether to ignore an empty source dataframe. | `False` |
| `create_if_not_exists` | `bool` | Create the table if it does not exist. | `True` |
| `refresh_table` | `bool` | Refresh the table after the transaction. | `True` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the table does not exist. |
| `ValueError` | If the data is not set in the pipeline context. |
| `ValueError` | If the table metadata is empty. |

Returns:

| Type | Description |
| --- | --- |
| `PipelineContext` | Pipeline Context |
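
As an illustrative sketch of how these flags combine (all keys below are documented parameters of this action; the table and column values are placeholders), the following configuration deletes matched rows instead of updating them and expects the target table to exist already:

Write Delta Merge (delete matched):
    action: WRITE_DELTA_MERGE
    options:
        table_identifier: my_catalog.my_schema.my_table
        key_columns:
            - id
        when_matched_update: false
        when_matched_delete: true
        when_not_matched_insert: false
        create_if_not_exists: false
        refresh_table: true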

Source code in src/cloe_nessy/pipeline/actions/write_delta_merge.py
def run(
    self,
    context: PipelineContext,
    *,
    table_identifier: str | None = None,
    key_columns: list[str] | None = None,
    cols_to_exclude_from_update: list[str] | None = None,
    column_mapping: dict[str, str] | None = None,
    when_matched_update: bool = True,
    when_matched_delete: bool = False,
    when_not_matched_insert: bool = True,
    use_partition_pruning: bool = True,
    ignore_empty_df: bool = False,
    create_if_not_exists: bool = True,
    refresh_table: bool = True,
    **_: Any,
) -> PipelineContext:
    """Merge the dataframe into the delta table.

    Args:
        context: Context in which this Action is executed.
        table_identifier: The identifier of the table. If passed, the
            UC Adapter will be used to create a table object. Otherwise the Table
            object will be created from the table metadata in the context.
        key_columns: List of target column names that form the
            key for the merge operation.
        cols_to_exclude_from_update: List of target column names to be
            excluded from the update operation in the target Delta table.
        column_mapping: Mapping from target column names to source column names.
            Use this when source and target tables have different column names.
            If a column is not in the mapping, it's assumed to have the same name
            in both source and target.
        when_matched_update: Flag to specify whether to
            perform an update operation when matching records are found in
            the target Delta table.
        when_matched_delete: Flag to specify whether to
            perform a delete operation when matching records are found in
            the target Delta table.
        when_not_matched_insert: Flag to specify whether to perform an
            insert operation when matching records are not found in the target
            Delta table.
        use_partition_pruning: Flag to specify whether to use partition
            pruning to optimize the performance of the merge operation.
        ignore_empty_df: A flag indicating whether to ignore an empty source dataframe.
        create_if_not_exists: Create the table if it does not exist.
        refresh_table: Refresh the table after the transaction.

    Raises:
        ValueError: If the table does not exist.
        ValueError: If the data is not set in the pipeline context.
        ValueError: If the table metadata is empty.

    Returns:
        Pipeline Context
    """
    delta_merge_writer = DeltaMergeWriter()

    if context.data is None:
        raise ValueError("Data is required for the merge operation.")
    if context.table_metadata is None and table_identifier is None:
        raise ValueError("Table metadata or a table identifier are required for the merge operation.")

    if table_identifier is not None:
        context.table_metadata = UnityCatalogAdapter().get_table_by_name(table_identifier)
    else:
        if context.table_metadata is None:
            raise ValueError("Table metadata is required.")

    if context.table_metadata is None:
        raise ValueError("Table metadata is required.")

    if create_if_not_exists:
        delta_merge_writer.table_manager.create_table(table=context.table_metadata, ignore_if_exists=True)

    if not delta_merge_writer.table_manager.table_exists(context.table_metadata):
        raise ValueError(f"Table {context.table_metadata.name} does not exist.")

    assert key_columns is not None, "Key columns must be provided."

    delta_merge_writer.write(
        data_frame=context.data,
        table=context.table_metadata,
        table_identifier=context.table_metadata.identifier,
        storage_path=str(context.table_metadata.storage_path),
        key_columns=key_columns,
        cols_to_exclude_from_update=cols_to_exclude_from_update or [],
        column_mapping=column_mapping or {},
        when_matched_update=when_matched_update,
        when_matched_delete=when_matched_delete,
        when_not_matched_insert=when_not_matched_insert,
        use_partition_pruning=use_partition_pruning,
        partition_by=context.table_metadata.partition_by,
        ignore_empty_df=ignore_empty_df,
    )

    runtime_info = getattr(context, "runtime_info", None)
    if runtime_info and runtime_info.get("is_delta_load"):
        consume_delta_load(runtime_info)

    if refresh_table:
        delta_merge_writer.table_manager.refresh_table(table_identifier=context.table_metadata.identifier)

    return context.from_existing()

WriteFileAction

Bases: PipelineAction

This class implements a Write action for an ETL pipeline.

The WriteFileAction writes a DataFrame to a storage location defined in the options using the FileWriter class.

Example
Write to File:
    action: WRITE_FILE
    options:
        path: "path/to/location"
        format: "parquet"
        partition_cols: ["date"]
        mode: "append"
        is_stream: false
        options:
            mergeSchema: true
Source code in src/cloe_nessy/pipeline/actions/write_file.py
class WriteFileAction(PipelineAction):
    """This class implements a Write action for an ETL pipeline.

    The WriteFileAction writes a DataFrame to a storage location defined in the
    options using the [`FileWriter`][cloe_nessy.integration.writer.FileWriter] class.

    Example:
        ```yaml
        Write to File:
            action: WRITE_FILE
            options:
                path: "path/to/location"
                format: "parquet"
                partition_cols: ["date"]
                mode: "append"
            is_stream: false
                options:
                    mergeSchema: true
        ```
    """

    name: str = "WRITE_FILE"

    def run(
        self,
        context: PipelineContext,
        *,
        path: str = "",
        format: str = "delta",
        partition_cols: list[str] | None = None,
        mode: str = "append",
        is_stream: bool = False,
        options: dict[str, str] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Writes a file to a location.

        Args:
            context: Context in which this Action is executed.
            path: Location to write data to.
            format: Format of files to write.
            partition_cols: Columns to partition on. If None, the writer will try to get the partition
                columns from the metadata. Default None.
            mode: Specifies the behavior when data or table already exists.
            is_stream: If True, use the `write_stream` method of the writer.
            options: Additional options passed to the writer.

        Raises:
            ValueError: If no path is provided.
            ValueError: If the data is not set in the pipeline context.

        Returns:
            Pipeline Context
        """
        if not path:
            raise ValueError("No path provided. Please specify path to write data to.")
        if not options:
            options = {}

        if context.data is None:
            raise ValueError("Data context is required for the operation.")

        if partition_cols is None:
            if context.table_metadata is None:
                partition_cols = []
            else:
                partition_cols = context.table_metadata.partition_by
        writer = FileWriter()
        if not is_stream:
            writer.write(
                data_frame=context.data,
                location=path,
                format=format,
                partition_cols=partition_cols,
                mode=mode,
                options=options,
            )
        else:
            writer.write_stream(
                data_frame=context.data,
                location=path,
                format=format,
                mode=mode,
                partition_cols=partition_cols,
                options=options,
            )

        runtime_info = getattr(context, "runtime_info", None)
        if runtime_info and runtime_info.get("is_delta_load"):
            consume_delta_load(runtime_info)

        return context.from_existing()

run(context, *, path='', format='delta', partition_cols=None, mode='append', is_stream=False, options=None, **_)

Writes a file to a location.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `context` | `PipelineContext` | Context in which this Action is executed. | *required* |
| `path` | `str` | Location to write data to. | `''` |
| `format` | `str` | Format of files to write. | `'delta'` |
| `partition_cols` | `list[str] \| None` | Columns to partition on. If None, the writer will try to get the partition columns from the metadata. | `None` |
| `mode` | `str` | Specifies the behavior when data or table already exists. | `'append'` |
| `is_stream` | `bool` | If True, use the `write_stream` method of the writer. | `False` |
| `options` | `dict[str, str] \| None` | Additional options passed to the writer. | `None` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If no path is provided. |
| `ValueError` | If the data is not set in the pipeline context. |

Returns:

| Type | Description |
| --- | --- |
| `PipelineContext` | Pipeline Context |
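
When is_stream is true, the action uses the writer's write_stream method instead of write. A minimal streaming sketch (checkpointLocation is a standard Spark Structured Streaming option and is assumed here to be passed through via options):

Write Stream to File:
    action: WRITE_FILE
    options:
        path: "path/to/location"
        format: "delta"
        mode: "append"
        is_stream: true
        options:
            checkpointLocation: "path/to/checkpoints"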

Source code in src/cloe_nessy/pipeline/actions/write_file.py
def run(
    self,
    context: PipelineContext,
    *,
    path: str = "",
    format: str = "delta",
    partition_cols: list[str] | None = None,
    mode: str = "append",
    is_stream: bool = False,
    options: dict[str, str] | None = None,
    **_: Any,
) -> PipelineContext:
    """Writes a file to a location.

    Args:
        context: Context in which this Action is executed.
        path: Location to write data to.
        format: Format of files to write.
        partition_cols: Columns to partition on. If None, the writer will try to get the partition
            columns from the metadata. Default None.
        mode: Specifies the behavior when data or table already exists.
        is_stream: If True, use the `write_stream` method of the writer.
        options: Additional options passed to the writer.

    Raises:
        ValueError: If no path is provided.
        ValueError: If the data is not set in the pipeline context.

    Returns:
        Pipeline Context
    """
    if not path:
        raise ValueError("No path provided. Please specify path to write data to.")
    if not options:
        options = {}

    if context.data is None:
        raise ValueError("Data context is required for the operation.")

    if partition_cols is None:
        if context.table_metadata is None:
            partition_cols = []
        else:
            partition_cols = context.table_metadata.partition_by
    writer = FileWriter()
    if not is_stream:
        writer.write(
            data_frame=context.data,
            location=path,
            format=format,
            partition_cols=partition_cols,
            mode=mode,
            options=options,
        )
    else:
        writer.write_stream(
            data_frame=context.data,
            location=path,
            format=format,
            mode=mode,
            partition_cols=partition_cols,
            options=options,
        )

    runtime_info = getattr(context, "runtime_info", None)
    if runtime_info and runtime_info.get("is_delta_load"):
        consume_delta_load(runtime_info)

    return context.from_existing()