Bases: PipelineAction
Performs aggregation operations on grouped data within a DataFrame.
This class allows you to group data by specified columns and apply various aggregation functions
to other columns. The aggregation functions can be specified as a dictionary where keys are column names
and values are either a single aggregation function or a list of functions.
The output DataFrame will contain the grouped columns and the aggregated columns with the aggregation
function as a prefix to the column name.
Example
Transform Group Aggregate:
action: TRANSFORM_GROUP_AGGREGATE
options:
grouping_columns:
- column1
- column2
aggregations:
column3:
- sum
- avg
column4: max
This example groups the DataFrame by column1 and column2 and aggregates column3 by sum and average
and column4 by max. The resulting DataFrame will contain the grouped columns column1 and column2
and the aggregated columns sum_column3, avg_column3, and max_column4.
Source code in src/cloe_nessy/pipeline/actions/transform_group_aggregate.py
class TransformGroupAggregate(PipelineAction):
    """Performs aggregation operations on grouped data within a DataFrame.

    This class allows you to group data by specified columns and apply various aggregation functions
    to other columns. The aggregation functions can be specified as a dictionary where keys are column names
    and values are either a single aggregation function or a list of functions.

    The output DataFrame will contain the grouped columns and the aggregated columns with the aggregation
    function as a prefix to the column name.

    Example:
        ```yaml
        Transform Group Aggregate:
            action: TRANSFORM_GROUP_AGGREGATE
            options:
                grouping_columns:
                    - column1
                    - column2
                aggregations:
                    column3:
                        - sum
                        - avg
                    column4: max
        ```

        This example groups the DataFrame by `column1` and `column2` and aggregates `column3` by sum and average
        and `column4` by max. The resulting DataFrame will contain the grouped columns `column1` and `column2`
        and the aggregated columns `sum_column3`, `avg_column3`, and `max_column4`.
    """

    name: str = "TRANSFORM_GROUP_AGGREGATE"

    def run(
        self,
        context: PipelineContext,
        *,
        grouping_columns: list[str] | None = None,
        aggregations: dict[str, str | list] | None = None,
        **_: Any,
    ) -> PipelineContext:
        """Executes the aggregation on the grouped data.

        Args:
            context: The context in which this action is executed.
            grouping_columns: A list of columns to group by.
            aggregations: A dictionary where keys are column names and values are either a single
                aggregation function or a list of functions. Supported functions are
                `avg`, `max`, `min`, `mean`, `sum` and `count`.

        Raises:
            ValueError: If the context data is None.
            ValueError: If `grouping_columns` is None.
            ValueError: If no aggregations are provided (None, empty, or only empty lists).
            ValueError: If invalid aggregation operations are provided.
            ValueError: If an aggregation value is neither a string nor a list.

        Returns:
            PipelineContext: The context after the execution of this action.
        """
        if context.data is None:
            raise ValueError("Data from the context is required for the operation.")
        if grouping_columns is None:
            raise ValueError("Please provide at least one grouping column")
        # An empty mapping is as unusable as None: `.agg()` needs at least one expression.
        if not aggregations:
            raise ValueError("Please provide aggregations.")

        valid_operations = ["avg", "max", "min", "mean", "sum", "count"]

        # Validate everything up front so no Spark expression is built for bad input.
        # Single operations are normalised to one-element lists to share one code path.
        operations_by_column: dict[str, list[str]] = {}
        for column_name, operation in aggregations.items():
            if isinstance(operation, str):
                requested = [operation]
            elif isinstance(operation, list):
                requested = operation
            else:
                raise ValueError("OPERATION DATATYPE INVALID")
            # Element-wise check (instead of building a set) so that unhashable
            # entries are reported as a ValueError rather than a TypeError.
            if not all(isinstance(item, str) and item in valid_operations for item in requested):
                raise ValueError(f"Please provide valid operations. Valid operations are {valid_operations}")
            operations_by_column[column_name] = requested

        aggregation_list = []
        for column_name, requested in operations_by_column.items():
            aggregation_list.extend(
                getattr(F, item)(column_name).alias(f"{item}_{column_name}") for item in requested
            )

        # Every column mapped to an empty list: nothing to aggregate.
        if not aggregation_list:
            raise ValueError("Please provide aggregations.")

        df = context.data.groupBy(grouping_columns).agg(*aggregation_list)
        return context.from_existing(data=df)
|
Executes the aggregation on the grouped data.
Parameters:
| Name |
Type |
Description |
Default |
context
|
PipelineContext
|
The context in which this action is executed.
|
required
|
grouping_columns
|
list[str] | None
|
A list of columns to group by.
|
None
|
aggregations
|
dict[str, str | list] | None
|
A dictionary where keys are column names and values are either a single
aggregation function or a list of functions.
|
None
|
Raises:
| Type |
Description |
ValueError
|
If the context data is None.
|
ValueError
|
If no aggregations are provided.
|
ValueError
|
If invalid aggregation operations are provided.
|
ValueError
|
If an aggregation value is neither a string nor a list of aggregation function names.
|
Returns:
| Name | Type |
Description |
PipelineContext |
PipelineContext
|
The context after the execution of this action.
|
Source code in src/cloe_nessy/pipeline/actions/transform_group_aggregate.py
def run(
    self,
    context: PipelineContext,
    *,
    grouping_columns: list[str] | None = None,
    aggregations: dict[str, str | list] | None = None,
    **_: Any,
) -> PipelineContext:
    """Executes the aggregation on the grouped data.

    Args:
        context: The context in which this action is executed.
        grouping_columns: A list of columns to group by.
        aggregations: A dictionary where keys are column names and values are either a single
            aggregation function or a list of functions. Supported functions are
            `avg`, `max`, `min`, `mean`, `sum` and `count`.

    Raises:
        ValueError: If the context data is None.
        ValueError: If `grouping_columns` is None.
        ValueError: If no aggregations are provided (None, empty, or only empty lists).
        ValueError: If invalid aggregation operations are provided.
        ValueError: If an aggregation value is neither a string nor a list.

    Returns:
        PipelineContext: The context after the execution of this action.
    """
    if context.data is None:
        raise ValueError("Data from the context is required for the operation.")
    if grouping_columns is None:
        raise ValueError("Please provide at least one grouping column")
    # An empty mapping is as unusable as None: `.agg()` needs at least one expression.
    if not aggregations:
        raise ValueError("Please provide aggregations.")

    valid_operations = ["avg", "max", "min", "mean", "sum", "count"]

    # Validate everything up front so no Spark expression is built for bad input.
    # Single operations are normalised to one-element lists to share one code path.
    operations_by_column: dict[str, list[str]] = {}
    for column_name, operation in aggregations.items():
        if isinstance(operation, str):
            requested = [operation]
        elif isinstance(operation, list):
            requested = operation
        else:
            raise ValueError("OPERATION DATATYPE INVALID")
        # Element-wise check (instead of building a set) so that unhashable
        # entries are reported as a ValueError rather than a TypeError.
        if not all(isinstance(item, str) and item in valid_operations for item in requested):
            raise ValueError(f"Please provide valid operations. Valid operations are {valid_operations}")
        operations_by_column[column_name] = requested

    aggregation_list = []
    for column_name, requested in operations_by_column.items():
        aggregation_list.extend(
            getattr(F, item)(column_name).alias(f"{item}_{column_name}") for item in requested
        )

    # Every column mapped to an empty list: nothing to aggregate.
    if not aggregation_list:
        raise ValueError("Please provide aggregations.")

    df = context.data.groupBy(grouping_columns).agg(*aggregation_list)
    return context.from_existing(data=df)
|