diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index bf910d2..4fe4c02 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -15,6 +15,13 @@ +* Added client methods for the new `platformassets.v1alpha1` RPCs: + `list_gridpools()`, `list_gridpool_energy_schedules()`, + `list_market_topology_relations()`, `list_microgrids()`, and + `list_microgrid_sensors()`. +* Added `component_ids` and `categories` filters to + `list_microgrid_electrical_components()`. + ## Bug Fixes diff --git a/pyproject.toml b/pyproject.toml index 391d5ad..a5c8b91 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,8 +40,8 @@ dependencies = [ "frequenz-api-assets >= 0.3.0, < 0.4.0", "frequenz-api-common >= 0.8.9, < 1", "frequenz-client-base >= 0.11.0, < 0.12.0", - "frequenz-client-common >= 0.3.6, < 0.4.0", - "grpcio >= 1.80.0, < 2", + "frequenz-client-common >= 0.3.8, < 0.4.0", + "grpcio >= 1.81.0, < 2", ] dynamic = ["version"] diff --git a/src/frequenz/client/assets/__init__.py b/src/frequenz/client/assets/__init__.py index 2f3c56a..b94604d 100644 --- a/src/frequenz/client/assets/__init__.py +++ b/src/frequenz/client/assets/__init__.py @@ -3,18 +3,48 @@ """Assets API client.""" +from ._balancing_group import BalancingGroup from ._client import AssetsApiClient from ._delivery_area import DeliveryArea, EnergyMarketCodeType +from ._gridpool import Gridpool +from ._gridpool_energy_schedule import ( + DeliveryDuration, + GridpoolEnergySchedule, + GridpoolEnergyScheduleDirection, + GridpoolEnergyScheduleTimeSeriesEntry, +) +from ._interval import Interval from ._lifetime import Lifetime from ._location import Location +from ._market_location import MarketLocation, MarketLocationId, MarketLocationIdType +from ._market_topology import ( + MarketParticipation, + MarketParticipationType, + MarketTopologyRelation, +) from ._microgrid import Microgrid, MicrogridStatus +from ._sensor import Sensor __all__ = [ "AssetsApiClient", + "BalancingGroup", "DeliveryArea", + "DeliveryDuration", "EnergyMarketCodeType", + "Gridpool", + "GridpoolEnergySchedule", + "GridpoolEnergyScheduleDirection", + "GridpoolEnergyScheduleTimeSeriesEntry", + "Interval", "Microgrid", "MicrogridStatus", "Location", "Lifetime", + "MarketLocation", + "MarketLocationId", + "MarketLocationIdType", + "MarketParticipation", + "MarketParticipationType", + "MarketTopologyRelation", + "Sensor", ] diff --git a/src/frequenz/client/assets/_balancing_group.py b/src/frequenz/client/assets/_balancing_group.py new file mode 100644 index 0000000..21cc621 --- /dev/null +++ b/src/frequenz/client/assets/_balancing_group.py @@ -0,0 +1,19 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Balancing group definitions.""" + +from dataclasses import dataclass + +from ._delivery_area import EnergyMarketCodeType + + +@dataclass(frozen=True, kw_only=True) +class BalancingGroup: + """A market balancing group identified by its market code.""" + + code: str | None + """The balancing group code.""" + + code_type: EnergyMarketCodeType | int + """The type of market code used to identify the balancing group.""" diff --git a/src/frequenz/client/assets/_balancing_group_proto.py b/src/frequenz/client/assets/_balancing_group_proto.py new file mode 100644 index 0000000..85d2f44 --- /dev/null +++ b/src/frequenz/client/assets/_balancing_group_proto.py @@ -0,0 +1,20 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Conversion of BalancingGroup objects from protobuf messages.""" + +from frequenz.api.common.v1alpha8.grid import balancing_group_pb2 +from frequenz.client.common.proto import enum_from_proto + +from ._balancing_group import BalancingGroup +from ._delivery_area import EnergyMarketCodeType + + +def balancing_group_from_proto( + message: balancing_group_pb2.BalancingGroup, +) -> BalancingGroup: + """Convert a protobuf balancing group message to a balancing group object.""" + return BalancingGroup( + code=message.code or None, + code_type=enum_from_proto(message.code_type, EnergyMarketCodeType), + ) diff --git a/src/frequenz/client/assets/_client.py b/src/frequenz/client/assets/_client.py index 62da79b..1755065 100644 --- a/src/frequenz/client/assets/_client.py +++ b/src/frequenz/client/assets/_client.py @@ -9,8 +9,13 @@ from __future__ import annotations -from collections.abc import Iterable +from collections.abc import Callable, Iterable +from enum import Enum +from typing import TypeVar +from frequenz.api.common.v1alpha8.microgrid.electrical_components import ( + electrical_components_pb2, +) from frequenz.api.platformassets.v1alpha1 import ( platformassets_pb2, platformassets_pb2_grpc, @@ -19,9 +24,27 @@ from frequenz.client.base.client import BaseApiClient, call_stub_method from frequenz.client.common.microgrid import MicrogridId from frequenz.client.common.microgrid.electrical_components import ElectricalComponentId - +from frequenz.client.common.microgrid.sensors import SensorId + +from ._delivery_area import DeliveryArea +from ._delivery_area_proto import delivery_area_to_proto +from ._gridpool import Gridpool +from ._gridpool_energy_schedule import ( + GridpoolEnergySchedule, + GridpoolEnergyScheduleDirection, +) +from ._gridpool_energy_schedule_proto import gridpool_energy_schedule_from_proto +from ._gridpool_proto import gridpool_from_proto +from ._interval import Interval +from ._interval_proto import interval_to_proto +from ._market_location_proto import market_location_id_value_to_proto +from ._market_topology import MarketParticipationType, MarketTopologyRelation +from ._market_topology_proto import market_topology_relation_from_proto from ._microgrid import Microgrid from ._microgrid_proto import microgrid_from_proto, microgrid_from_proto_with_issues +from ._sensor import Sensor +from ._sensor_proto import sensor_from_proto +from .electrical_component._category import ElectricalComponentCategory from .electrical_component._connection import ComponentConnection from .electrical_component._connection_proto import ( component_connection_from_proto, @@ -42,6 +65,21 @@ DEFAULT_GRPC_CALL_TIMEOUT = 60.0 """The default timeout for gRPC calls made by this client (in seconds).""" +_ProtoEnumValue = TypeVar("_ProtoEnumValue", bound=int) + + +def _enum_value(value: Enum | int) -> int: + """Return the integer value for an enum or integer filter argument.""" + return value.value if isinstance(value, Enum) else value + + +def _proto_enum_values( + values: Iterable[Enum | int], + value_type: Callable[[int], _ProtoEnumValue], +) -> list[_ProtoEnumValue]: + """Return protobuf enum values for enum or integer filter arguments.""" + return [value_type(_enum_value(value)) for value in values] + class AssetsApiClient( BaseApiClient[platformassets_pb2_grpc.PlatformAssetsServiceStub] @@ -102,6 +140,163 @@ def stub(self) -> platformassets_pb2_grpc.PlatformAssetsServiceAsyncStub: # use the async stub, so we cast the sync stub to the async stub. return self._stub # type: ignore + async def list_gridpools( # noqa: DOC502 (raises indirectly) + self, + gridpool_ids: Iterable[int] = (), + ) -> list[Gridpool]: + """ + List gridpools within the current enterprise scope. + + Args: + gridpool_ids: Only return gridpools whose IDs are included in this list. + If empty, no filtering is applied. + + Returns: + The matching gridpools. + + Raises: + ApiClientError: If there are any errors communicating with the Assets API, + most likely a subclass of [GrpcError][frequenz.client.base.exception.GrpcError]. + """ + request = platformassets_pb2.ListGridpoolsRequest() + if ids := [int(gridpool_id) for gridpool_id in gridpool_ids]: + request.filter.gridpool_ids.extend(ids) + + response = await call_stub_method( + self, + lambda: self.stub.ListGridpools( + request, + timeout=DEFAULT_GRPC_CALL_TIMEOUT, + ), + method_name="ListGridpools", + ) + + return [gridpool_from_proto(gridpool) for gridpool in response.gridpools] + + async def list_gridpool_energy_schedules( # noqa: DOC502 (raises indirectly) + self, + gridpool_id: int, + schedule_ids: Iterable[int] = (), + directions: Iterable[GridpoolEnergyScheduleDirection | int] = (), + *, + time_series_interval: Interval | None = None, + effective_validity_period: Interval | None = None, + ) -> list[GridpoolEnergySchedule]: + """ + List energy schedules for a gridpool. + + Args: + gridpool_id: The ID of the gridpool whose schedules should be listed. + schedule_ids: Only return schedules whose IDs are included in this list. + If empty, no schedule-ID filtering is applied. + directions: Only return schedules with one of these directions. If empty, + no direction filtering is applied. + time_series_interval: Restrict returned time-series entries to delivery + periods that overlap this interval. + effective_validity_period: Only return schedules whose effective validity + period overlaps this interval. + + Returns: + The matching gridpool energy schedules. + + Raises: + ApiClientError: If there are any errors communicating with the Assets API, + most likely a subclass of [GrpcError][frequenz.client.base.exception.GrpcError]. + """ + request = platformassets_pb2.ListGridpoolEnergySchedulesRequest( + gridpool_id=int(gridpool_id), + ) + if ids := [int(schedule_id) for schedule_id in schedule_ids]: + request.filter.schedule_ids.extend(ids) + if direction_values := _proto_enum_values( + directions, + platformassets_pb2.GridpoolEnergyScheduleDirection.ValueType, + ): + request.filter.directions.extend(direction_values) + if time_series_interval is not None: + request.filter.time_series_interval.CopyFrom( + interval_to_proto(time_series_interval) + ) + if effective_validity_period is not None: + request.filter.effective_validity_period.CopyFrom( + interval_to_proto(effective_validity_period) + ) + + response = await call_stub_method( + self, + lambda: self.stub.ListGridpoolEnergySchedules( + request, + timeout=DEFAULT_GRPC_CALL_TIMEOUT, + ), + method_name="ListGridpoolEnergySchedules", + ) + + return [ + gridpool_energy_schedule_from_proto(schedule) + for schedule in response.schedules + ] + + async def list_market_topology_relations( # noqa: DOC502 (raises indirectly) + self, + *, + gridpool_ids: Iterable[int] = (), + microgrid_ids: Iterable[MicrogridId] = (), + market_location_id_values: Iterable[str] = (), + delivery_areas: Iterable[DeliveryArea] = (), + participation_types: Iterable[MarketParticipationType | int] = (), + ) -> list[MarketTopologyRelation]: + """ + List market-topology relations within the current enterprise scope. + + Args: + gridpool_ids: Only return relations involving any of these gridpools. + microgrid_ids: Only return relations involving any of these microgrids. + market_location_id_values: Only return relations involving market + locations whose ID values match any of these values. + delivery_areas: Only return relations applying to any of these delivery + areas. + participation_types: Only return relations that include at least one + participation with one of these types. + + Returns: + The matching market-topology relations. + + Raises: + ApiClientError: If there are any errors communicating with the Assets API, + most likely a subclass of [GrpcError][frequenz.client.base.exception.GrpcError]. + """ + request = platformassets_pb2.ListMarketTopologyRelationsRequest() + if ids := [int(gridpool_id) for gridpool_id in gridpool_ids]: + request.filter.gridpool_ids.extend(ids) + if ids := [int(microgrid_id) for microgrid_id in microgrid_ids]: + request.filter.microgrid_ids.extend(ids) + if values := [ + market_location_id_value_to_proto(value) + for value in market_location_id_values + ]: + request.filter.market_location_id_values.extend(values) + if areas := [delivery_area_to_proto(area) for area in delivery_areas]: + request.filter.delivery_areas.extend(areas) + if types := _proto_enum_values( + participation_types, + platformassets_pb2.MarketParticipationType.ValueType, + ): + request.filter.participation_types.extend(types) + + response = await call_stub_method( + self, + lambda: self.stub.ListMarketTopologyRelations( + request, + timeout=DEFAULT_GRPC_CALL_TIMEOUT, + ), + method_name="ListMarketTopologyRelations", + ) + + return [ + market_topology_relation_from_proto(relation) + for relation in response.relations + ] + async def get_microgrid( # noqa: DOC502,DOC503 (raises indirectly) self, microgrid_id: MicrogridId, @@ -155,9 +350,86 @@ async def get_microgrid( # noqa: DOC502,DOC503 (raises indirectly) return microgrid_from_proto(response.microgrid) + async def list_microgrids( # noqa: DOC502,DOC503 (raises indirectly) + self, + microgrid_ids: Iterable[MicrogridId] = (), + gridpool_ids: Iterable[int] = (), + *, + raise_on_errors: bool = False, + ) -> list[Microgrid]: + """ + List microgrids within the current enterprise scope. + + Args: + microgrid_ids: Only return microgrids whose IDs are included in this list. + If empty, no microgrid-ID filtering is applied. + gridpool_ids: Only return microgrids that are part of a market-topology + relation involving any of these gridpools. + raise_on_errors: If True, raise an `ExceptionGroup[InvalidMicrogridError]` + when major validation issues are found in any microgrid instead of + just logging them. + + Returns: + The matching microgrids. + + Raises: + ApiClientError: If there are any errors communicating with the Assets API, + most likely a subclass of [GrpcError][frequenz.client.base.exception.GrpcError]. + ExceptionGroup: If `raise_on_errors` is True and major validation + issues are found. All exceptions in the group are + [InvalidMicrogridError][frequenz.client.assets.exceptions.InvalidMicrogridError]. + """ + request = platformassets_pb2.ListMicrogridsRequest() + if ids := [int(microgrid_id) for microgrid_id in microgrid_ids]: + request.filter.microgrid_ids.extend(ids) + if ids := [int(gridpool_id) for gridpool_id in gridpool_ids]: + request.filter.gridpool_ids.extend(ids) + + response = await call_stub_method( + self, + lambda: self.stub.ListMicrogrids( + request, + timeout=DEFAULT_GRPC_CALL_TIMEOUT, + ), + method_name="ListMicrogrids", + ) + + if raise_on_errors: + microgrids: list[Microgrid] = [] + exceptions: list[InvalidMicrogridError] = [] + for microgrid_pb in response.microgrids: + major_issues: list[str] = [] + minor_issues: list[str] = [] + microgrid = microgrid_from_proto_with_issues( + microgrid_pb, + major_issues=major_issues, + minor_issues=minor_issues, + ) + if major_issues: + exceptions.append( + InvalidMicrogridError( + microgrid=microgrid, + major_issues=major_issues, + minor_issues=minor_issues, + raw_message=microgrid_pb, + ) + ) + else: + microgrids.append(microgrid) + if exceptions: + raise ExceptionGroup( + f"{len(exceptions)} microgrid(s) failed validation", + exceptions, + ) + return microgrids + + return [microgrid_from_proto(microgrid) for microgrid in response.microgrids] + async def list_microgrid_electrical_components( self, microgrid_id: MicrogridId, + component_ids: Iterable[ElectricalComponentId] = (), + categories: Iterable[ElectricalComponentCategory | int] = (), *, raise_on_errors: bool = False, ) -> list[ElectricalComponent]: @@ -166,6 +438,10 @@ async def list_microgrid_electrical_components( Args: microgrid_id: The ID of the microgrid to get the electrical components of. + component_ids: Only return components whose IDs are included in this list. + If empty, no component-ID filtering is applied. + categories: Only return components whose categories are included in this + list. If empty, no category filtering is applied. raise_on_errors: If True, raise an `ExceptionGroup[InvalidElectricalComponentError]` when major validation issues are found in any component instead @@ -179,12 +455,21 @@ async def list_microgrid_electrical_components( issues are found. All exceptions in the group are [InvalidElectricalComponentError][frequenz.client.assets.exceptions.InvalidElectricalComponentError]. """ + request = platformassets_pb2.ListMicrogridElectricalComponentsRequest( + microgrid_id=int(microgrid_id), + ) + if ids := [int(component_id) for component_id in component_ids]: + request.filter.component_ids.extend(ids) + if category_values := _proto_enum_values( + categories, + electrical_components_pb2.ElectricalComponentCategory.ValueType, + ): + request.filter.categories.extend(category_values) + response = await call_stub_method( self, lambda: self.stub.ListMicrogridElectricalComponents( - platformassets_pb2.ListMicrogridElectricalComponentsRequest( - microgrid_id=int(microgrid_id), - ), + request, timeout=DEFAULT_GRPC_CALL_TIMEOUT, ), method_name="ListMicrogridElectricalComponents", @@ -303,3 +588,40 @@ async def list_microgrid_electrical_component_connections( for c in map(component_connection_from_proto, response.connections) if c is not None ] + + async def list_microgrid_sensors( # noqa: DOC502 (raises indirectly) + self, + microgrid_id: MicrogridId, + sensor_ids: Iterable[SensorId] = (), + ) -> list[Sensor]: + """ + List sensors in a microgrid. + + Args: + microgrid_id: The ID of the microgrid whose sensors should be listed. + sensor_ids: Only return sensors whose IDs are included in this list. If + empty, no filtering is applied. + + Returns: + The matching sensors. + + Raises: + ApiClientError: If there are any errors communicating with the Assets API, + most likely a subclass of [GrpcError][frequenz.client.base.exception.GrpcError]. + """ + request = platformassets_pb2.ListMicrogridSensorsRequest( + microgrid_id=int(microgrid_id), + ) + if ids := [int(sensor_id) for sensor_id in sensor_ids]: + request.filter.sensor_ids.extend(ids) + + response = await call_stub_method( + self, + lambda: self.stub.ListMicrogridSensors( + request, + timeout=DEFAULT_GRPC_CALL_TIMEOUT, + ), + method_name="ListMicrogridSensors", + ) + + return [sensor_from_proto(sensor) for sensor in response.sensors] diff --git a/src/frequenz/client/assets/_delivery_area_proto.py b/src/frequenz/client/assets/_delivery_area_proto.py index 08ea77a..bf4aa40 100644 --- a/src/frequenz/client/assets/_delivery_area_proto.py +++ b/src/frequenz/client/assets/_delivery_area_proto.py @@ -6,7 +6,7 @@ import logging from frequenz.api.common.v1alpha8.grid import delivery_area_pb2 -from frequenz.client.common import enum_proto +from frequenz.client.common.proto import enum_from_proto from ._delivery_area import DeliveryArea, EnergyMarketCodeType @@ -28,7 +28,9 @@ def delivery_area_from_proto(message: delivery_area_pb2.DeliveryArea) -> Deliver if code is None: issues.append("code is empty") - code_type = enum_proto.enum_from_proto(message.code_type, EnergyMarketCodeType) + code_type: EnergyMarketCodeType | int = enum_from_proto( + message.code_type, EnergyMarketCodeType + ) if code_type is EnergyMarketCodeType.UNSPECIFIED: issues.append("code_type is unspecified") elif isinstance(code_type, int): @@ -42,3 +44,17 @@ def delivery_area_from_proto(message: delivery_area_pb2.DeliveryArea) -> Deliver ) return DeliveryArea(code=code, code_type=code_type) + + +def delivery_area_to_proto( + delivery_area: DeliveryArea, +) -> delivery_area_pb2.DeliveryArea: + """Convert a delivery area object to a protobuf message.""" + return delivery_area_pb2.DeliveryArea( + code=delivery_area.code or "", + code_type=delivery_area_pb2.EnergyMarketCodeType.ValueType( + delivery_area.code_type.value + if isinstance(delivery_area.code_type, EnergyMarketCodeType) + else delivery_area.code_type + ), + ) diff --git a/src/frequenz/client/assets/_gridpool.py b/src/frequenz/client/assets/_gridpool.py new file mode 100644 index 0000000..b48fb18 --- /dev/null +++ b/src/frequenz/client/assets/_gridpool.py @@ -0,0 +1,17 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Gridpool definitions.""" + +from dataclasses import dataclass + + +@dataclass(frozen=True, kw_only=True) +class Gridpool: + """A virtual balancing-group structure used for market interactions.""" + + id: int + """The unique identifier of the gridpool.""" + + name: str | None + """The human-readable gridpool name.""" diff --git a/src/frequenz/client/assets/_gridpool_energy_schedule.py b/src/frequenz/client/assets/_gridpool_energy_schedule.py new file mode 100644 index 0000000..d7abf5d --- /dev/null +++ b/src/frequenz/client/assets/_gridpool_energy_schedule.py @@ -0,0 +1,98 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Gridpool energy schedule definitions.""" + +import enum +from dataclasses import dataclass +from datetime import datetime + +from frequenz.api.common.v1alpha8.grid import delivery_duration_pb2 +from frequenz.api.platformassets.v1alpha1 import platformassets_pb2 + +from ._balancing_group import BalancingGroup +from ._delivery_area import DeliveryArea +from ._interval import Interval + + +@enum.unique +class DeliveryDuration(enum.Enum): + """Delivery duration used by scheduled energy values.""" + + UNSPECIFIED = delivery_duration_pb2.DELIVERY_DURATION_UNSPECIFIED + """The delivery duration is unspecified.""" + + MINUTES_5 = delivery_duration_pb2.DELIVERY_DURATION_5 + """A 5-minute delivery duration.""" + + MINUTES_15 = delivery_duration_pb2.DELIVERY_DURATION_15 + """A 15-minute delivery duration.""" + + MINUTES_30 = delivery_duration_pb2.DELIVERY_DURATION_30 + """A 30-minute delivery duration.""" + + MINUTES_60 = delivery_duration_pb2.DELIVERY_DURATION_60 + """A 60-minute delivery duration.""" + + +@enum.unique +class GridpoolEnergyScheduleDirection(enum.Enum): + """Direction of a scheduled energy exchange.""" + + UNSPECIFIED = platformassets_pb2.GRIDPOOL_ENERGY_SCHEDULE_DIRECTION_UNSPECIFIED + """The direction is unspecified.""" + + IMPORT = platformassets_pb2.GRIDPOOL_ENERGY_SCHEDULE_DIRECTION_IMPORT + """Energy is imported into the Frequenz balancing group.""" + + EXPORT = platformassets_pb2.GRIDPOOL_ENERGY_SCHEDULE_DIRECTION_EXPORT + """Energy is exported from the Frequenz balancing group.""" + + +@dataclass(frozen=True, kw_only=True) +class GridpoolEnergyScheduleTimeSeriesEntry: + """A scheduled active-power value for one delivery period.""" + + start_time: datetime | None + """The inclusive start timestamp of the scheduled delivery value.""" + + active_power_w: float + """Scheduled active power in watts.""" + + +@dataclass(frozen=True, kw_only=True) +class GridpoolEnergySchedule: # pylint: disable=too-many-instance-attributes + """A static energy schedule associated with a gridpool.""" + + gridpool_id: int + """The unique identifier of the gridpool this schedule belongs to.""" + + schedule_id: int + """The unique identifier of the energy schedule.""" + + name: str | None + """The human-readable schedule name.""" + + counterparty_balancing_group: BalancingGroup | None + """The third-party balancing group involved in the scheduled exchange.""" + + counterparty_delivery_area: DeliveryArea | None + """Delivery area of the counterparty side of the scheduled exchange.""" + + frequenz_delivery_area: DeliveryArea | None + """Delivery area of the Frequenz side of the scheduled exchange.""" + + direction: GridpoolEnergyScheduleDirection | int + """Direction of the scheduled exchange from the Frequenz perspective.""" + + validity_period: Interval | None + """Validity interval of the schedule configuration.""" + + cancel_time: datetime | None + """Timestamp at which the schedule was cancelled.""" + + delivery_duration: DeliveryDuration | int + """Delivery duration used by all time-series entries in this schedule.""" + + time_series: list[GridpoolEnergyScheduleTimeSeriesEntry] + """Scheduled active-power values.""" diff --git a/src/frequenz/client/assets/_gridpool_energy_schedule_proto.py b/src/frequenz/client/assets/_gridpool_energy_schedule_proto.py new file mode 100644 index 0000000..349823c --- /dev/null +++ b/src/frequenz/client/assets/_gridpool_energy_schedule_proto.py @@ -0,0 +1,74 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Conversion of GridpoolEnergySchedule objects from protobuf messages.""" + +from frequenz.api.platformassets.v1alpha1 import platformassets_pb2 +from frequenz.client.base import conversion +from frequenz.client.common.proto import enum_from_proto + +from ._balancing_group_proto import balancing_group_from_proto +from ._delivery_area_proto import delivery_area_from_proto +from ._gridpool_energy_schedule import ( + DeliveryDuration, + GridpoolEnergySchedule, + GridpoolEnergyScheduleDirection, + GridpoolEnergyScheduleTimeSeriesEntry, +) +from ._interval_proto import interval_from_proto + + +def gridpool_energy_schedule_time_series_entry_from_proto( + message: platformassets_pb2.GridpoolEnergyScheduleTimeSeriesEntry, +) -> GridpoolEnergyScheduleTimeSeriesEntry: + """Convert a protobuf schedule time-series entry to a domain object.""" + return GridpoolEnergyScheduleTimeSeriesEntry( + start_time=( + conversion.to_datetime(message.start_time) + if message.HasField("start_time") + else None + ), + active_power_w=message.active_power_w, + ) + + +def gridpool_energy_schedule_from_proto( + message: platformassets_pb2.GridpoolEnergySchedule, +) -> GridpoolEnergySchedule: + """Convert a protobuf gridpool energy schedule to a domain object.""" + return GridpoolEnergySchedule( + gridpool_id=message.gridpool_id, + schedule_id=message.schedule_id, + name=message.name or None, + counterparty_balancing_group=( + balancing_group_from_proto(message.counterparty_balancing_group) + if message.HasField("counterparty_balancing_group") + else None + ), + counterparty_delivery_area=( + delivery_area_from_proto(message.counterparty_delivery_area) + if message.HasField("counterparty_delivery_area") + else None + ), + frequenz_delivery_area=( + delivery_area_from_proto(message.frequenz_delivery_area) + if message.HasField("frequenz_delivery_area") + else None + ), + direction=enum_from_proto(message.direction, GridpoolEnergyScheduleDirection), + validity_period=( + interval_from_proto(message.validity_period) + if message.HasField("validity_period") + else None + ), + cancel_time=( + conversion.to_datetime(message.cancel_time) + if message.HasField("cancel_time") + else None + ), + delivery_duration=enum_from_proto(message.delivery_duration, DeliveryDuration), + time_series=[ + gridpool_energy_schedule_time_series_entry_from_proto(entry) + for entry in message.time_series + ], + ) diff --git a/src/frequenz/client/assets/_gridpool_proto.py b/src/frequenz/client/assets/_gridpool_proto.py new file mode 100644 index 0000000..734aa76 --- /dev/null +++ b/src/frequenz/client/assets/_gridpool_proto.py @@ -0,0 +1,13 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Conversion of Gridpool objects from protobuf messages.""" + +from frequenz.api.common.v1alpha8.gridpool import gridpool_pb2 + +from ._gridpool import Gridpool + + +def gridpool_from_proto(message: gridpool_pb2.Gridpool) -> Gridpool: + """Convert a protobuf gridpool message to a gridpool object.""" + return Gridpool(id=message.id, name=message.name or None) diff --git a/src/frequenz/client/assets/_interval.py b/src/frequenz/client/assets/_interval.py new file mode 100644 index 0000000..5d86d19 --- /dev/null +++ b/src/frequenz/client/assets/_interval.py @@ -0,0 +1,25 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Time interval filters.""" + +from dataclasses import dataclass +from datetime import datetime + + +@dataclass(frozen=True, kw_only=True) +class Interval: + """A half-open time interval: `[start, end)`.""" + + start: datetime | None = None + """The inclusive start of the interval.""" + + end: datetime | None = None + """The exclusive end of the interval.""" + + def __post_init__(self) -> None: + """Validate this interval.""" + if self.start is not None and self.end is not None and self.start > self.end: + raise ValueError( + f"Start ({self.start}) must be before or equal to end ({self.end})" + ) diff --git a/src/frequenz/client/assets/_interval_proto.py b/src/frequenz/client/assets/_interval_proto.py new file mode 100644 index 0000000..ca4066f --- /dev/null +++ b/src/frequenz/client/assets/_interval_proto.py @@ -0,0 +1,35 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Conversion of Interval objects to/from protobuf messages.""" + +from frequenz.api.common.v1alpha8.types import interval_pb2 +from frequenz.client.base import conversion + +from ._interval import Interval + + +def interval_from_proto(message: interval_pb2.Interval) -> Interval: + """Convert a protobuf interval message to an interval object.""" + return Interval( + start=( + conversion.to_datetime(message.start_time) + if message.HasField("start_time") + else None + ), + end=( + conversion.to_datetime(message.end_time) + if message.HasField("end_time") + else None + ), + ) + + +def interval_to_proto(interval: Interval) -> interval_pb2.Interval: + """Convert an interval object to a protobuf interval message.""" + message = interval_pb2.Interval() + if interval.start is not None: + message.start_time.CopyFrom(conversion.to_timestamp(interval.start)) + if interval.end is not None: + message.end_time.CopyFrom(conversion.to_timestamp(interval.end)) + return message diff --git a/src/frequenz/client/assets/_market_location.py b/src/frequenz/client/assets/_market_location.py new file mode 100644 index 0000000..f9a5bab --- /dev/null +++ b/src/frequenz/client/assets/_market_location.py @@ -0,0 +1,78 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Market location definitions.""" + +import enum +from dataclasses import dataclass + +from frequenz.api.common.v1alpha8.grid import market_location_pb2 + + +@enum.unique +class MarketLocationIdType(enum.Enum): + """External market identifier types used for market locations.""" + + UNSPECIFIED = market_location_pb2.MARKET_LOCATION_ID_TYPE_UNSPECIFIED + """The market location ID type is unspecified.""" + + MALO_ID = market_location_pb2.MARKET_LOCATION_ID_TYPE_MALO_ID + """Germany Marktlokations-ID.""" + + ZAEHLPUNKT = market_location_pb2.MARKET_LOCATION_ID_TYPE_ZAEHLPUNKT + """Austria Zaehlpunktbezeichnung.""" + + MPAN = market_location_pb2.MARKET_LOCATION_ID_TYPE_MPAN + """United Kingdom Meter Point Administration Number.""" + + POD = market_location_pb2.MARKET_LOCATION_ID_TYPE_POD + """Italy Point of Delivery.""" + + CUPS = market_location_pb2.MARKET_LOCATION_ID_TYPE_CUPS + """Spain Codigo Universal de Punto de Suministro.""" + + PRM = market_location_pb2.MARKET_LOCATION_ID_TYPE_PRM + """France Point de Reference et Mesure.""" + + EAN = market_location_pb2.MARKET_LOCATION_ID_TYPE_EAN + """European Article Number.""" + + GSRN = market_location_pb2.MARKET_LOCATION_ID_TYPE_GSRN + """GS1 Global Service Relation Number.""" + + ESI_ID = market_location_pb2.MARKET_LOCATION_ID_TYPE_ESI_ID + """United States Electric Service Identifier.""" + + NMI = market_location_pb2.MARKET_LOCATION_ID_TYPE_NMI + """Australia National Metering Identifier.""" + + ICP = market_location_pb2.MARKET_LOCATION_ID_TYPE_ICP + """New Zealand Installation Control Point.""" + + SPN = market_location_pb2.MARKET_LOCATION_ID_TYPE_SPN + """Japan Supply Point Number.""" + + OTHER = market_location_pb2.MARKET_LOCATION_ID_TYPE_OTHER + """Generic identifier for markets not modeled explicitly.""" + + +@dataclass(frozen=True, kw_only=True) +class MarketLocationId: + """A market-standard identifier for a market location.""" + + value: str | None + """The official market location identifier value.""" + + type: MarketLocationIdType | int + """The type of official market identifier.""" + + +@dataclass(frozen=True, kw_only=True) +class MarketLocation: + """A market-facing metering point in a specific market area.""" + + market_area: int + """The market area in which this market location is registered.""" + + market_location_id: MarketLocationId | None + """The official market location identifier.""" diff --git a/src/frequenz/client/assets/_market_location_proto.py b/src/frequenz/client/assets/_market_location_proto.py new file mode 100644 index 0000000..68a17af --- /dev/null +++ b/src/frequenz/client/assets/_market_location_proto.py @@ -0,0 +1,40 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Conversion of MarketLocation objects to/from protobuf messages.""" + +from frequenz.api.common.v1alpha8.grid import market_location_pb2 +from frequenz.client.common.proto import enum_from_proto + +from ._market_location import MarketLocation, MarketLocationId, MarketLocationIdType + + +def market_location_id_from_proto( + message: market_location_pb2.MarketLocationId, +) -> MarketLocationId: + """Convert a protobuf market location ID message to a domain object.""" + return MarketLocationId( + value=message.id.value if message.HasField("id") else None, + type=enum_from_proto(message.type, MarketLocationIdType), + ) + + +def market_location_from_proto( + message: market_location_pb2.MarketLocation, +) -> MarketLocation: + """Convert a protobuf market location message to a domain object.""" + return MarketLocation( + market_area=message.market_area, + market_location_id=( + market_location_id_from_proto(message.market_location_id) + if message.HasField("market_location_id") + else None + ), + ) + + +def market_location_id_value_to_proto( + value: str, +) -> market_location_pb2.MarketLocationIdValue: + """Convert a market location ID value string to a protobuf message.""" + return market_location_pb2.MarketLocationIdValue(value=value) diff --git a/src/frequenz/client/assets/_market_topology.py b/src/frequenz/client/assets/_market_topology.py new file mode 100644 index 0000000..66fdc37 --- /dev/null +++ b/src/frequenz/client/assets/_market_topology.py @@ -0,0 +1,59 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Market topology relation definitions.""" + +import enum +from dataclasses import dataclass + +from frequenz.api.platformassets.v1alpha1 import platformassets_pb2 +from frequenz.client.common.microgrid import MicrogridId + +from ._delivery_area import DeliveryArea +from ._interval import Interval +from ._market_location import MarketLocation + + +@enum.unique +class MarketParticipationType(enum.Enum): + """Market-related use cases for topology relations.""" + + UNSPECIFIED = platformassets_pb2.MARKET_PARTICIPATION_TYPE_UNSPECIFIED + """The market participation type is unspecified.""" + + ENERGY_TRADING = platformassets_pb2.MARKET_PARTICIPATION_TYPE_ENERGY_TRADING + """Energy trading, supply, balancing, or settlement participation.""" + + FLEX_MARKETS = platformassets_pb2.MARKET_PARTICIPATION_TYPE_FLEX_MARKETS + """Flex-market or ancillary-service participation.""" + + +@dataclass(frozen=True, kw_only=True) +class MarketParticipation: + """A relation's participation in a specific market use case.""" + + type: MarketParticipationType | int + """The use case for which this relation participates.""" + + validity_period: Interval | None + """Configured validity interval for this participation.""" + + +@dataclass(frozen=True, kw_only=True) +class MarketTopologyRelation: + """A relation between a gridpool, microgrid, and market location.""" + + microgrid_id: MicrogridId | None + """The microgrid associated with this relation.""" + + market_location: MarketLocation | None + """The market location associated with this relation.""" + + gridpool_id: int | None + """The gridpool associated with this relation.""" + + delivery_area: DeliveryArea | None + """Delivery area in which this relation applies.""" + + participations: list[MarketParticipation] + """Use-case-specific participations for this relation.""" diff --git a/src/frequenz/client/assets/_market_topology_proto.py b/src/frequenz/client/assets/_market_topology_proto.py new file mode 100644 index 0000000..28ab32e --- /dev/null +++ b/src/frequenz/client/assets/_market_topology_proto.py @@ -0,0 +1,59 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Conversion of MarketTopologyRelation objects from protobuf messages.""" + +from frequenz.api.platformassets.v1alpha1 import platformassets_pb2 +from frequenz.client.common.microgrid import MicrogridId +from frequenz.client.common.proto import enum_from_proto + +from ._delivery_area_proto import delivery_area_from_proto +from ._interval_proto import interval_from_proto +from ._market_location_proto import market_location_from_proto +from ._market_topology import ( + MarketParticipation, + MarketParticipationType, + MarketTopologyRelation, +) + + +def market_participation_from_proto( + message: platformassets_pb2.MarketParticipation, +) -> MarketParticipation: + """Convert a protobuf market participation message to a domain object.""" + return MarketParticipation( + type=enum_from_proto(message.type, MarketParticipationType), + validity_period=( + interval_from_proto(message.validity_period) + if message.HasField("validity_period") + else None + ), + ) + + +def market_topology_relation_from_proto( + message: platformassets_pb2.MarketTopologyRelation, +) -> MarketTopologyRelation: + """Convert a protobuf market topology relation message to a domain object.""" + return MarketTopologyRelation( + microgrid_id=( + MicrogridId(message.microgrid_id) + if message.HasField("microgrid_id") + else None + ), + market_location=( + market_location_from_proto(message.market_location) + if message.HasField("market_location") + else None + ), + gridpool_id=message.gridpool_id if message.HasField("gridpool_id") else None, + delivery_area=( + delivery_area_from_proto(message.delivery_area) + if message.HasField("delivery_area") + else None + ), + participations=[ + market_participation_from_proto(participation) + for participation in message.participations + ], + ) diff --git a/src/frequenz/client/assets/_microgrid_proto.py b/src/frequenz/client/assets/_microgrid_proto.py index cf88a4f..c791106 100644 --- a/src/frequenz/client/assets/_microgrid_proto.py +++ b/src/frequenz/client/assets/_microgrid_proto.py @@ -7,8 +7,8 @@ from frequenz.api.common.v1alpha8.microgrid import microgrid_pb2 from frequenz.client.base import conversion -from frequenz.client.common import enum_proto from frequenz.client.common.microgrid import EnterpriseId, MicrogridId +from frequenz.client.common.proto import enum_from_proto from ._delivery_area import DeliveryArea from ._delivery_area_proto import delivery_area_from_proto @@ -87,7 +87,7 @@ def microgrid_from_proto_with_issues( if name is None: minor_issues.append("name is empty") - status = enum_proto.enum_from_proto(message.status, MicrogridStatus) + status = enum_from_proto(message.status, MicrogridStatus) if status is MicrogridStatus.UNSPECIFIED: major_issues.append("status is unspecified") elif isinstance(status, int): diff --git a/src/frequenz/client/assets/_sensor.py b/src/frequenz/client/assets/_sensor.py new file mode 100644 index 0000000..c2436ec --- /dev/null +++ b/src/frequenz/client/assets/_sensor.py @@ -0,0 +1,31 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Microgrid sensor definitions.""" + +from dataclasses import dataclass + +from frequenz.client.common.microgrid import MicrogridId +from frequenz.client.common.microgrid.sensors import SensorId + +from ._lifetime import Lifetime + + +@dataclass(frozen=True, kw_only=True) +class Sensor: + """A sensor that measures a physical metric in a microgrid environment.""" + + id: SensorId + """The unique identifier of the sensor.""" + + microgrid_id: MicrogridId + """The unique identifier of the parent microgrid.""" + + name: str | None + """The human-readable sensor name.""" + + model: str | None + """The sensor model name.""" + + operational_lifetime: Lifetime | None + """The operational lifetime of the sensor.""" diff --git a/src/frequenz/client/assets/_sensor_proto.py b/src/frequenz/client/assets/_sensor_proto.py new file mode 100644 index 0000000..61d740f --- /dev/null +++ b/src/frequenz/client/assets/_sensor_proto.py @@ -0,0 +1,26 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Conversion of Sensor objects from protobuf messages.""" + +from frequenz.api.common.v1alpha8.microgrid.sensors import sensors_pb2 +from frequenz.client.common.microgrid import MicrogridId +from frequenz.client.common.microgrid.sensors import SensorId + +from ._lifetime_proto import lifetime_from_proto +from ._sensor import Sensor + + +def sensor_from_proto(message: sensors_pb2.Sensor) -> Sensor: + """Convert a protobuf sensor message to a sensor object.""" + return Sensor( + id=SensorId(message.id), + microgrid_id=MicrogridId(message.microgrid_id), + name=message.name or None, + model=message.model or None, + operational_lifetime=( + lifetime_from_proto(message.operational_lifetime) + if message.HasField("operational_lifetime") + else None + ), + ) diff --git a/src/frequenz/client/assets/electrical_component/_electrical_component_proto.py b/src/frequenz/client/assets/electrical_component/_electrical_component_proto.py index b3a07e5..7d9d4a3 100644 --- a/src/frequenz/client/assets/electrical_component/_electrical_component_proto.py +++ b/src/frequenz/client/assets/electrical_component/_electrical_component_proto.py @@ -10,9 +10,9 @@ from frequenz.api.common.v1alpha8.microgrid.electrical_components import ( electrical_components_pb2, ) -from frequenz.client.common import enum_proto from frequenz.client.common.microgrid import MicrogridId from frequenz.client.common.microgrid.electrical_components import ElectricalComponentId +from frequenz.client.common.proto import enum_from_proto from .._lifetime import Lifetime from .._lifetime_proto import lifetime_from_proto @@ -155,7 +155,7 @@ def component_base_from_proto_with_issues( minor_issues=minor_issues, ) - category = enum_proto.enum_from_proto(message.category, ElectricalComponentCategory) + category = enum_from_proto(message.category, ElectricalComponentCategory) if category is ElectricalComponentCategory.UNSPECIFIED: major_issues.append("category is unspecified") elif isinstance(category, int): @@ -243,7 +243,7 @@ def electrical_component_from_proto_with_issues( BatteryType.LI_ION: LiIonBattery, BatteryType.NA_ION: NaIonBattery, } - battery_type = enum_proto.enum_from_proto( + battery_type = enum_from_proto( message.category_specific_info.battery.type, BatteryType ) match battery_type: @@ -285,7 +285,7 @@ def electrical_component_from_proto_with_issues( EvChargerType.DC: DcEvCharger, EvChargerType.HYBRID: HybridEvCharger, } - ev_charger_type = enum_proto.enum_from_proto( + ev_charger_type = enum_from_proto( message.category_specific_info.ev_charger.type, EvChargerType ) match ev_charger_type: @@ -337,7 +337,7 @@ def electrical_component_from_proto_with_issues( InverterType.SOLAR: SolarInverter, InverterType.HYBRID: HybridInverter, } - inverter_type = enum_proto.enum_from_proto( + inverter_type = enum_from_proto( message.category_specific_info.inverter.type, InverterType ) match inverter_type: @@ -462,7 +462,7 @@ def _metric_config_bounds_from_proto( """ bounds: dict[Metric | int, Bounds] = {} for metric_bound in message: - metric = enum_proto.enum_from_proto(metric_bound.metric, Metric) + metric = enum_from_proto(metric_bound.metric, Metric) match metric: case Metric.UNSPECIFIED: major_issues.append("metric_config_bounds has an UNSPECIFIED metric") diff --git a/tests/client_test_cases/list_gridpool_energy_schedules/empty_case.py b/tests/client_test_cases/list_gridpool_energy_schedules/empty_case.py new file mode 100644 index 0000000..68f329c --- /dev/null +++ b/tests/client_test_cases/list_gridpool_energy_schedules/empty_case.py @@ -0,0 +1,26 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Empty case for gridpool energy schedule listing.""" + +from typing import Any + +from frequenz.api.platformassets.v1alpha1 import platformassets_pb2 as assets_pb2 + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + assets_pb2.ListGridpoolEnergySchedulesRequest(gridpool_id=7), + timeout=60.0, + ) + + +client_args = (7,) +grpc_response = assets_pb2.ListGridpoolEnergySchedulesResponse( + gridpool_id=7, schedules=[] +) + + +def assert_client_result(result: Any) -> None: # noqa: D103 + assert not result diff --git a/tests/client_test_cases/list_gridpool_energy_schedules/error_case.py b/tests/client_test_cases/list_gridpool_energy_schedules/error_case.py new file mode 100644 index 0000000..764daea --- /dev/null +++ b/tests/client_test_cases/list_gridpool_energy_schedules/error_case.py @@ -0,0 +1,30 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Error case for gridpool energy schedule listing.""" + +from typing import Any + +from frequenz.api.platformassets.v1alpha1 import platformassets_pb2 as assets_pb2 +from grpc import StatusCode + +from frequenz.client.assets.exceptions import PermissionDenied +from tests.util import make_grpc_error + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + assets_pb2.ListGridpoolEnergySchedulesRequest(gridpool_id=7), + timeout=60.0, + ) + + +client_args = (7,) +grpc_response = make_grpc_error(StatusCode.PERMISSION_DENIED) + + +def assert_client_exception(exception: Exception) -> None: + """Assert that the client exception matches the expected error.""" + assert isinstance(exception, PermissionDenied) + assert exception.grpc_error == grpc_response diff --git a/tests/client_test_cases/list_gridpool_energy_schedules/success_case.py b/tests/client_test_cases/list_gridpool_energy_schedules/success_case.py new file mode 100644 index 0000000..c00795e --- /dev/null +++ b/tests/client_test_cases/list_gridpool_energy_schedules/success_case.py @@ -0,0 +1,132 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Test data for successful gridpool energy schedule listing.""" + +from datetime import datetime, timezone +from typing import Any + +from frequenz.api.common.v1alpha8.grid import ( + balancing_group_pb2, + delivery_area_pb2, + delivery_duration_pb2, +) +from frequenz.api.common.v1alpha8.types import interval_pb2 +from frequenz.api.platformassets.v1alpha1 import platformassets_pb2 as assets_pb2 +from frequenz.client.base.conversion import to_timestamp + +from frequenz.client.assets import ( + BalancingGroup, + DeliveryArea, + DeliveryDuration, + EnergyMarketCodeType, + GridpoolEnergySchedule, + GridpoolEnergyScheduleDirection, + GridpoolEnergyScheduleTimeSeriesEntry, + Interval, +) + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + assets_pb2.ListGridpoolEnergySchedulesRequest( + gridpool_id=7, + filter=assets_pb2.ListGridpoolEnergySchedulesRequest.GridpoolEnergySchedulesFilter( + schedule_ids=[42], + directions=[ + assets_pb2.GRIDPOOL_ENERGY_SCHEDULE_DIRECTION_IMPORT, + ], + time_series_interval=interval_pb2.Interval( + start_time=to_timestamp(filter_start), + end_time=to_timestamp(filter_end), + ), + effective_validity_period=interval_pb2.Interval( + start_time=to_timestamp(validity_start), + end_time=to_timestamp(validity_end), + ), + ), + ), + timeout=60.0, + ) + + +filter_start = datetime(2026, 1, 1, tzinfo=timezone.utc) +filter_end = datetime(2026, 1, 2, tzinfo=timezone.utc) +validity_start = datetime(2025, 12, 1, tzinfo=timezone.utc) +validity_end = datetime(2026, 2, 1, tzinfo=timezone.utc) +cancel_time = datetime(2026, 1, 15, tzinfo=timezone.utc) +entry_start = datetime(2026, 1, 1, 0, 15, tzinfo=timezone.utc) + +client_args = (7,) +client_kwargs = { + "schedule_ids": [42], + "directions": [GridpoolEnergyScheduleDirection.IMPORT], + "time_series_interval": Interval(start=filter_start, end=filter_end), + "effective_validity_period": Interval(start=validity_start, end=validity_end), +} +grpc_response = assets_pb2.ListGridpoolEnergySchedulesResponse( + gridpool_id=7, + schedules=[ + assets_pb2.GridpoolEnergySchedule( + gridpool_id=7, + schedule_id=42, + name="OTC baseline", + counterparty_balancing_group=balancing_group_pb2.BalancingGroup( + code="11XCOUNTER", + code_type=delivery_area_pb2.ENERGY_MARKET_CODE_TYPE_EUROPE_EIC, + ), + counterparty_delivery_area=delivery_area_pb2.DeliveryArea( + code="10YDE-1", + code_type=delivery_area_pb2.ENERGY_MARKET_CODE_TYPE_EUROPE_EIC, + ), + frequenz_delivery_area=delivery_area_pb2.DeliveryArea( + code="10YDE-2", + code_type=delivery_area_pb2.ENERGY_MARKET_CODE_TYPE_EUROPE_EIC, + ), + direction=assets_pb2.GRIDPOOL_ENERGY_SCHEDULE_DIRECTION_IMPORT, + validity_period=interval_pb2.Interval( + start_time=to_timestamp(validity_start), + end_time=to_timestamp(validity_end), + ), + cancel_time=to_timestamp(cancel_time), + delivery_duration=delivery_duration_pb2.DELIVERY_DURATION_15, + time_series=[ + assets_pb2.GridpoolEnergyScheduleTimeSeriesEntry( + start_time=to_timestamp(entry_start), + active_power_w=1234.5, + ) + ], + ) + ], +) + + +def assert_client_result(result: Any) -> None: + """Assert that the client result matches the expected schedule list.""" + assert result == [ + GridpoolEnergySchedule( + gridpool_id=7, + schedule_id=42, + name="OTC baseline", + counterparty_balancing_group=BalancingGroup( + code="11XCOUNTER", code_type=EnergyMarketCodeType.EUROPE_EIC + ), + counterparty_delivery_area=DeliveryArea( + code="10YDE-1", code_type=EnergyMarketCodeType.EUROPE_EIC + ), + frequenz_delivery_area=DeliveryArea( + code="10YDE-2", code_type=EnergyMarketCodeType.EUROPE_EIC + ), + direction=GridpoolEnergyScheduleDirection.IMPORT, + validity_period=Interval(start=validity_start, end=validity_end), + cancel_time=cancel_time, + delivery_duration=DeliveryDuration.MINUTES_15, + time_series=[ + GridpoolEnergyScheduleTimeSeriesEntry( + start_time=entry_start, + active_power_w=1234.5, + ) + ], + ) + ] diff --git a/tests/client_test_cases/list_gridpools/empty_case.py b/tests/client_test_cases/list_gridpools/empty_case.py new file mode 100644 index 0000000..6d3cf6f --- /dev/null +++ b/tests/client_test_cases/list_gridpools/empty_case.py @@ -0,0 +1,23 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Empty case for gridpool listing.""" + +from typing import Any + +from frequenz.api.platformassets.v1alpha1 import platformassets_pb2 as assets_pb2 + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + assets_pb2.ListGridpoolsRequest(), + timeout=60.0, + ) + + +grpc_response = assets_pb2.ListGridpoolsResponse(gridpools=[]) + + +def assert_client_result(result: Any) -> None: # noqa: D103 + assert not result diff --git a/tests/client_test_cases/list_gridpools/error_case.py b/tests/client_test_cases/list_gridpools/error_case.py new file mode 100644 index 0000000..52ff029 --- /dev/null +++ b/tests/client_test_cases/list_gridpools/error_case.py @@ -0,0 +1,29 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Error case for gridpool listing.""" + +from typing import Any + +from frequenz.api.platformassets.v1alpha1 import platformassets_pb2 as assets_pb2 +from grpc import StatusCode + +from frequenz.client.assets.exceptions import PermissionDenied +from tests.util import make_grpc_error + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + assets_pb2.ListGridpoolsRequest(), + timeout=60.0, + ) + + +grpc_response = make_grpc_error(StatusCode.PERMISSION_DENIED) + + +def assert_client_exception(exception: Exception) -> None: + """Assert that the client exception matches the expected error.""" + assert isinstance(exception, PermissionDenied) + assert exception.grpc_error == grpc_response diff --git a/tests/client_test_cases/list_gridpools/success_case.py b/tests/client_test_cases/list_gridpools/success_case.py new file mode 100644 index 0000000..637f136 --- /dev/null +++ b/tests/client_test_cases/list_gridpools/success_case.py @@ -0,0 +1,38 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Test data for successful gridpool listing.""" + +from typing import Any + +from frequenz.api.common.v1alpha8.gridpool import gridpool_pb2 +from frequenz.api.platformassets.v1alpha1 import platformassets_pb2 as assets_pb2 + +from frequenz.client.assets import Gridpool + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + assets_pb2.ListGridpoolsRequest( + filter=assets_pb2.ListGridpoolsRequest.GridpoolsFilter(gridpool_ids=[7, 8]) + ), + timeout=60.0, + ) + + +client_kwargs = {"gridpool_ids": [7, 8]} +grpc_response = assets_pb2.ListGridpoolsResponse( + gridpools=[ + gridpool_pb2.Gridpool(id=7, name="North"), + gridpool_pb2.Gridpool(id=8), + ] +) + + +def assert_client_result(result: Any) -> None: + """Assert that the client result matches the expected gridpool list.""" + assert result == [ + Gridpool(id=7, name="North"), + Gridpool(id=8, name=None), + ] diff --git a/tests/client_test_cases/list_market_topology_relations/empty_case.py b/tests/client_test_cases/list_market_topology_relations/empty_case.py new file mode 100644 index 0000000..32af1d7 --- /dev/null +++ b/tests/client_test_cases/list_market_topology_relations/empty_case.py @@ -0,0 +1,23 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Empty case for market topology relation listing.""" + +from typing import Any + +from frequenz.api.platformassets.v1alpha1 import platformassets_pb2 as assets_pb2 + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + assets_pb2.ListMarketTopologyRelationsRequest(), + timeout=60.0, + ) + + +grpc_response = assets_pb2.ListMarketTopologyRelationsResponse(relations=[]) + + +def assert_client_result(result: Any) -> None: # noqa: D103 + assert not result diff --git a/tests/client_test_cases/list_market_topology_relations/error_case.py b/tests/client_test_cases/list_market_topology_relations/error_case.py new file mode 100644 index 0000000..4052a98 --- /dev/null +++ b/tests/client_test_cases/list_market_topology_relations/error_case.py @@ -0,0 +1,29 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Error case for market topology relation listing.""" + +from typing import Any + +from frequenz.api.platformassets.v1alpha1 import platformassets_pb2 as assets_pb2 +from grpc import StatusCode + +from frequenz.client.assets.exceptions import PermissionDenied +from tests.util import make_grpc_error + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + assets_pb2.ListMarketTopologyRelationsRequest(), + timeout=60.0, + ) + + +grpc_response = make_grpc_error(StatusCode.PERMISSION_DENIED) + + +def assert_client_exception(exception: Exception) -> None: + """Assert that the client exception matches the expected error.""" + assert isinstance(exception, PermissionDenied) + assert exception.grpc_error == grpc_response diff --git a/tests/client_test_cases/list_market_topology_relations/success_case.py b/tests/client_test_cases/list_market_topology_relations/success_case.py new file mode 100644 index 0000000..86b78c2 --- /dev/null +++ b/tests/client_test_cases/list_market_topology_relations/success_case.py @@ -0,0 +1,112 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Test data for successful market topology relation listing.""" + +from datetime import datetime, timezone +from typing import Any + +from frequenz.api.common.v1alpha8.grid import delivery_area_pb2, market_location_pb2 +from frequenz.api.common.v1alpha8.market import market_area_pb2 +from frequenz.api.common.v1alpha8.types import interval_pb2 +from frequenz.api.platformassets.v1alpha1 import platformassets_pb2 as assets_pb2 +from frequenz.client.base.conversion import to_timestamp +from frequenz.client.common.microgrid import MicrogridId + +from frequenz.client.assets import ( + DeliveryArea, + EnergyMarketCodeType, + Interval, + MarketLocation, + MarketLocationId, + MarketLocationIdType, + MarketParticipation, + MarketParticipationType, + MarketTopologyRelation, +) + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + assets_pb2.ListMarketTopologyRelationsRequest( + filter=assets_pb2.ListMarketTopologyRelationsRequest.MarketTopologyRelationsFilter( + gridpool_ids=[7], + microgrid_ids=[1234], + market_location_id_values=[ + market_location_pb2.MarketLocationIdValue(value="DE001") + ], + delivery_areas=[ + delivery_area_pb2.DeliveryArea( + code="10YDE-1", + code_type=delivery_area_pb2.ENERGY_MARKET_CODE_TYPE_EUROPE_EIC, + ) + ], + participation_types=[ + assets_pb2.MARKET_PARTICIPATION_TYPE_ENERGY_TRADING + ], + ) + ), + timeout=60.0, + ) + + +validity_start = datetime(2026, 1, 1, tzinfo=timezone.utc) +delivery_area = DeliveryArea(code="10YDE-1", code_type=EnergyMarketCodeType.EUROPE_EIC) +client_kwargs = { + "gridpool_ids": [7], + "microgrid_ids": [MicrogridId(1234)], + "market_location_id_values": ["DE001"], + "delivery_areas": [delivery_area], + "participation_types": [MarketParticipationType.ENERGY_TRADING], +} +grpc_response = assets_pb2.ListMarketTopologyRelationsResponse( + relations=[ + assets_pb2.MarketTopologyRelation( + microgrid_id=1234, + market_location=market_location_pb2.MarketLocation( + market_area=market_area_pb2.MarketArea.ValueType(1), + market_location_id=market_location_pb2.MarketLocationId( + id=market_location_pb2.MarketLocationIdValue(value="DE001"), + type=market_location_pb2.MARKET_LOCATION_ID_TYPE_MALO_ID, + ), + ), + gridpool_id=7, + delivery_area=delivery_area_pb2.DeliveryArea( + code="10YDE-1", + code_type=delivery_area_pb2.ENERGY_MARKET_CODE_TYPE_EUROPE_EIC, + ), + participations=[ + assets_pb2.MarketParticipation( + type=assets_pb2.MARKET_PARTICIPATION_TYPE_ENERGY_TRADING, + validity_period=interval_pb2.Interval( + start_time=to_timestamp(validity_start) + ), + ) + ], + ) + ], +) + + +def assert_client_result(result: Any) -> None: + """Assert that the client result matches the expected relation list.""" + assert result == [ + MarketTopologyRelation( + microgrid_id=MicrogridId(1234), + market_location=MarketLocation( + market_area=1, + market_location_id=MarketLocationId( + value="DE001", type=MarketLocationIdType.MALO_ID + ), + ), + gridpool_id=7, + delivery_area=delivery_area, + participations=[ + MarketParticipation( + type=MarketParticipationType.ENERGY_TRADING, + validity_period=Interval(start=validity_start), + ) + ], + ) + ] diff --git a/tests/client_test_cases/list_microgrid_electrical_component_connections/filtered_case.py b/tests/client_test_cases/list_microgrid_electrical_component_connections/filtered_case.py new file mode 100644 index 0000000..a8f91bd --- /dev/null +++ b/tests/client_test_cases/list_microgrid_electrical_component_connections/filtered_case.py @@ -0,0 +1,55 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Test data for filtered connection listing.""" + +from typing import Any + +from frequenz.api.common.v1alpha8.microgrid.electrical_components import ( + electrical_components_pb2, +) +from frequenz.api.platformassets.v1alpha1 import platformassets_pb2 as assets_pb2 +from frequenz.client.common.microgrid.electrical_components import ElectricalComponentId + +from frequenz.client.assets.electrical_component import ComponentConnection + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + request_type = assets_pb2.ListMicrogridElectricalComponentConnectionsRequest + filter_type = request_type.MicrogridElectricalComponentConnectionsFilter + stub_method.assert_called_once_with( + request_type( + microgrid_id=1234, + filter=filter_type( + source_component_ids=[1], + destination_component_ids=[2], + ), + ), + timeout=60.0, + ) + + +client_args = (1234,) +client_kwargs = { + "source_component_ids": [ElectricalComponentId(1)], + "destination_component_ids": [ElectricalComponentId(2)], +} +grpc_response = assets_pb2.ListMicrogridElectricalComponentConnectionsResponse( + connections=[ + electrical_components_pb2.ElectricalComponentConnection( + source_electrical_component_id=1, + destination_electrical_component_id=2, + ) + ] +) + + +def assert_client_result(result: Any) -> None: + """Assert that the client result matches the expected connections list.""" + assert list(result) == [ + ComponentConnection( + source=ElectricalComponentId(1), + destination=ElectricalComponentId(2), + ) + ] diff --git a/tests/client_test_cases/list_microgrid_electrical_components/filtered_case.py b/tests/client_test_cases/list_microgrid_electrical_components/filtered_case.py new file mode 100644 index 0000000..7ecd0c1 --- /dev/null +++ b/tests/client_test_cases/list_microgrid_electrical_components/filtered_case.py @@ -0,0 +1,66 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Test data for filtered electrical component listing.""" + +from typing import Any + +from frequenz.api.common.v1alpha8.microgrid.electrical_components import ( + electrical_components_pb2, +) +from frequenz.api.platformassets.v1alpha1 import platformassets_pb2 as assets_pb2 +from frequenz.client.common.microgrid import MicrogridId +from frequenz.client.common.microgrid.electrical_components import ElectricalComponentId + +from frequenz.client.assets.electrical_component import ( + ElectricalComponentCategory, + SteamBoiler, +) + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + request_type = assets_pb2.ListMicrogridElectricalComponentsRequest + filter_type = request_type.MicrogridElectricalComponentsFilter + stub_method.assert_called_once_with( + request_type( + microgrid_id=1234, + filter=filter_type( + component_ids=[1], + categories=[ + electrical_components_pb2.ELECTRICAL_COMPONENT_CATEGORY_STEAM_BOILER + ], + ), + ), + timeout=60.0, + ) + + +client_args = (1234,) +client_kwargs = { + "component_ids": [ElectricalComponentId(1)], + "categories": [ElectricalComponentCategory.STEAM_BOILER], +} +grpc_response = assets_pb2.ListMicrogridElectricalComponentsResponse( + components=[ + electrical_components_pb2.ElectricalComponent( + id=1, + microgrid_id=1234, + category=electrical_components_pb2.ELECTRICAL_COMPONENT_CATEGORY_STEAM_BOILER, + name="Steam Boiler", + ) + ] +) + + +def assert_client_result(result: Any) -> None: + """Assert that the client result matches the expected component list.""" + assert list(result) == [ + SteamBoiler( + id=ElectricalComponentId(1), + microgrid_id=MicrogridId(1234), + name="Steam Boiler", + manufacturer=None, + model_name=None, + ) + ] diff --git a/tests/client_test_cases/list_microgrid_sensors/empty_case.py b/tests/client_test_cases/list_microgrid_sensors/empty_case.py new file mode 100644 index 0000000..55d771b --- /dev/null +++ b/tests/client_test_cases/list_microgrid_sensors/empty_case.py @@ -0,0 +1,24 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Empty case for sensor listing.""" + +from typing import Any + +from frequenz.api.platformassets.v1alpha1 import platformassets_pb2 as assets_pb2 + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + assets_pb2.ListMicrogridSensorsRequest(microgrid_id=1234), + timeout=60.0, + ) + + +client_args = (1234,) +grpc_response = assets_pb2.ListMicrogridSensorsResponse(microgrid_id=1234, sensors=[]) + + +def assert_client_result(result: Any) -> None: # noqa: D103 + assert not result diff --git a/tests/client_test_cases/list_microgrid_sensors/error_case.py b/tests/client_test_cases/list_microgrid_sensors/error_case.py new file mode 100644 index 0000000..bff557a --- /dev/null +++ b/tests/client_test_cases/list_microgrid_sensors/error_case.py @@ -0,0 +1,30 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Error case for sensor listing.""" + +from typing import Any + +from frequenz.api.platformassets.v1alpha1 import platformassets_pb2 as assets_pb2 +from grpc import StatusCode + +from frequenz.client.assets.exceptions import PermissionDenied +from tests.util import make_grpc_error + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + assets_pb2.ListMicrogridSensorsRequest(microgrid_id=1234), + timeout=60.0, + ) + + +client_args = (1234,) +grpc_response = make_grpc_error(StatusCode.PERMISSION_DENIED) + + +def assert_client_exception(exception: Exception) -> None: + """Assert that the client exception matches the expected error.""" + assert isinstance(exception, PermissionDenied) + assert exception.grpc_error == grpc_response diff --git a/tests/client_test_cases/list_microgrid_sensors/success_case.py b/tests/client_test_cases/list_microgrid_sensors/success_case.py new file mode 100644 index 0000000..7efb9ae --- /dev/null +++ b/tests/client_test_cases/list_microgrid_sensors/success_case.py @@ -0,0 +1,61 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Test data for successful sensor listing.""" + +from datetime import datetime, timezone +from typing import Any + +from frequenz.api.common.v1alpha8.microgrid import lifetime_pb2 +from frequenz.api.common.v1alpha8.microgrid.sensors import sensors_pb2 +from frequenz.api.platformassets.v1alpha1 import platformassets_pb2 as assets_pb2 +from frequenz.client.base.conversion import to_timestamp +from frequenz.client.common.microgrid import MicrogridId +from frequenz.client.common.microgrid.sensors import SensorId + +from frequenz.client.assets import Lifetime, Sensor + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + assets_pb2.ListMicrogridSensorsRequest( + microgrid_id=1234, + filter=assets_pb2.ListMicrogridSensorsRequest.MicrogridSensorsFilter( + sensor_ids=[5] + ), + ), + timeout=60.0, + ) + + +lifetime_start = datetime(2024, 1, 1, tzinfo=timezone.utc) +client_args = (MicrogridId(1234),) +client_kwargs = {"sensor_ids": [SensorId(5)]} +grpc_response = assets_pb2.ListMicrogridSensorsResponse( + microgrid_id=1234, + sensors=[ + sensors_pb2.Sensor( + id=5, + microgrid_id=1234, + name="Weather station", + model="Vaisala WXT", + operational_lifetime=lifetime_pb2.Lifetime( + start_timestamp=to_timestamp(lifetime_start) + ), + ) + ], +) + + +def assert_client_result(result: Any) -> None: + """Assert that the client result matches the expected sensor list.""" + assert result == [ + Sensor( + id=SensorId(5), + microgrid_id=MicrogridId(1234), + name="Weather station", + model="Vaisala WXT", + operational_lifetime=Lifetime(start=lifetime_start), + ) + ] diff --git a/tests/client_test_cases/list_microgrids/empty_case.py b/tests/client_test_cases/list_microgrids/empty_case.py new file mode 100644 index 0000000..ff7fa8a --- /dev/null +++ b/tests/client_test_cases/list_microgrids/empty_case.py @@ -0,0 +1,23 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Empty case for microgrid listing.""" + +from typing import Any + +from frequenz.api.platformassets.v1alpha1 import platformassets_pb2 as assets_pb2 + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + assets_pb2.ListMicrogridsRequest(), + timeout=60.0, + ) + + +grpc_response = assets_pb2.ListMicrogridsResponse(microgrids=[]) + + +def assert_client_result(result: Any) -> None: # noqa: D103 + assert not result diff --git a/tests/client_test_cases/list_microgrids/error_case.py b/tests/client_test_cases/list_microgrids/error_case.py new file mode 100644 index 0000000..52f6af0 --- /dev/null +++ b/tests/client_test_cases/list_microgrids/error_case.py @@ -0,0 +1,29 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Error case for microgrid listing.""" + +from typing import Any + +from frequenz.api.platformassets.v1alpha1 import platformassets_pb2 as assets_pb2 +from grpc import StatusCode + +from frequenz.client.assets.exceptions import PermissionDenied +from tests.util import make_grpc_error + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + assets_pb2.ListMicrogridsRequest(), + timeout=60.0, + ) + + +grpc_response = make_grpc_error(StatusCode.PERMISSION_DENIED) + + +def assert_client_exception(exception: Exception) -> None: + """Assert that the client exception matches the expected error.""" + assert isinstance(exception, PermissionDenied) + assert exception.grpc_error == grpc_response diff --git a/tests/client_test_cases/list_microgrids/success_case.py b/tests/client_test_cases/list_microgrids/success_case.py new file mode 100644 index 0000000..fbd5b6c --- /dev/null +++ b/tests/client_test_cases/list_microgrids/success_case.py @@ -0,0 +1,78 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Test data for successful microgrid listing.""" + +from datetime import datetime, timezone +from typing import Any + +import pytest +from frequenz.api.common.v1alpha8.grid import delivery_area_pb2 +from frequenz.api.common.v1alpha8.microgrid import microgrid_pb2 +from frequenz.api.common.v1alpha8.types import location_pb2 +from frequenz.api.platformassets.v1alpha1 import platformassets_pb2 as assets_pb2 +from frequenz.client.base.conversion import to_timestamp +from frequenz.client.common.microgrid import EnterpriseId, MicrogridId + +from frequenz.client.assets import ( + DeliveryArea, + EnergyMarketCodeType, + Location, + Microgrid, + MicrogridStatus, +) + + +def assert_stub_method_call(stub_method: Any) -> None: + """Assert that the gRPC request matches the expected request.""" + stub_method.assert_called_once_with( + assets_pb2.ListMicrogridsRequest( + filter=assets_pb2.ListMicrogridsRequest.MicrogridsFilter( + microgrid_ids=[1234], gridpool_ids=[7] + ) + ), + timeout=60.0, + ) + + +create_timestamp = datetime(2023, 1, 1, tzinfo=timezone.utc) +client_kwargs = {"microgrid_ids": [MicrogridId(1234)], "gridpool_ids": [7]} +grpc_response = assets_pb2.ListMicrogridsResponse( + microgrids=[ + microgrid_pb2.Microgrid( + id=1234, + enterprise_id=5678, + name="Test Microgrid", + delivery_area=delivery_area_pb2.DeliveryArea( + code="Test Delivery Area", + code_type=delivery_area_pb2.ENERGY_MARKET_CODE_TYPE_EUROPE_EIC, + ), + location=location_pb2.Location( + latitude=37.7749, longitude=-122.4194, country_code="DE" + ), + status=microgrid_pb2.MICROGRID_STATUS_ACTIVE, + create_timestamp=to_timestamp(create_timestamp), + ) + ] +) + + +def assert_client_result(result: Any) -> None: + """Assert that the client result matches the expected Microgrid list.""" + assert result == [ + Microgrid( + id=MicrogridId(1234), + enterprise_id=EnterpriseId(5678), + name="Test Microgrid", + delivery_area=DeliveryArea( + code="Test Delivery Area", code_type=EnergyMarketCodeType.EUROPE_EIC + ), + location=Location( + latitude=pytest.approx(37.7749), # type: ignore[arg-type] + longitude=pytest.approx(-122.4194), # type: ignore[arg-type] + country_code="DE", + ), + status=MicrogridStatus.ACTIVE, + create_timestamp=create_timestamp, + ) + ] diff --git a/tests/test_client.py b/tests/test_client.py index 48fa293..5ee7e98 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -41,6 +41,58 @@ async def test_get_microgrid( await spec.test_unary_unary_call(client, "GetMicrogrid") +@pytest.mark.asyncio +@pytest.mark.parametrize( + "spec", + get_test_specs("list_gridpools", tests_dir=TESTS_DIR), + ids=str, +) +async def test_list_gridpools( + client: AssetsApiClient, spec: ApiClientTestCaseSpec +) -> None: + """Test list_gridpools method.""" + await spec.test_unary_unary_call(client, "ListGridpools") + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "spec", + get_test_specs("list_gridpool_energy_schedules", tests_dir=TESTS_DIR), + ids=str, +) +async def test_list_gridpool_energy_schedules( + client: AssetsApiClient, spec: ApiClientTestCaseSpec +) -> None: + """Test list_gridpool_energy_schedules method.""" + await spec.test_unary_unary_call(client, "ListGridpoolEnergySchedules") + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "spec", + get_test_specs("list_market_topology_relations", tests_dir=TESTS_DIR), + ids=str, +) +async def test_list_market_topology_relations( + client: AssetsApiClient, spec: ApiClientTestCaseSpec +) -> None: + """Test list_market_topology_relations method.""" + await spec.test_unary_unary_call(client, "ListMarketTopologyRelations") + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "spec", + get_test_specs("list_microgrids", tests_dir=TESTS_DIR), + ids=str, +) +async def test_list_microgrids( + client: AssetsApiClient, spec: ApiClientTestCaseSpec +) -> None: + """Test list_microgrids method.""" + await spec.test_unary_unary_call(client, "ListMicrogrids") + + @pytest.mark.asyncio @pytest.mark.parametrize( "spec", @@ -69,3 +121,16 @@ async def test_list_connections( await spec.test_unary_unary_call( client, "ListMicrogridElectricalComponentConnections" ) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "spec", + get_test_specs("list_microgrid_sensors", tests_dir=TESTS_DIR), + ids=str, +) +async def test_list_microgrid_sensors( + client: AssetsApiClient, spec: ApiClientTestCaseSpec +) -> None: + """Test list_microgrid_sensors method.""" + await spec.test_unary_unary_call(client, "ListMicrogridSensors")