From d981a65274e1cbe581684bb5e70275eaec308ce8 Mon Sep 17 00:00:00 2001 From: Marc Hofmann Date: Tue, 7 Apr 2026 19:00:57 +0200 Subject: [PATCH 1/7] refactored device classes --- src/science_mode_4/device.py | 60 ++------------------------------ src/science_mode_4/device_i24.py | 28 +++++++++++++-- src/science_mode_4/device_p24.py | 31 +++++++++++++++-- 3 files changed, 58 insertions(+), 61 deletions(-) diff --git a/src/science_mode_4/device.py b/src/science_mode_4/device.py index 1ad9cd2..2b5fbe7 100644 --- a/src/science_mode_4/device.py +++ b/src/science_mode_4/device.py @@ -1,17 +1,11 @@ """Provides device class representing a science mode device""" from enum import IntEnum -from typing import Type from .layer import Layer -from .protocol.types import StimStatus from .protocol.packet_factory import PacketFactory from .protocol.packet_number_generator import PacketNumberGenerator from .general.general_layer import LayerGeneral -from .low_level.low_level_layer import LayerLowLevel -from .mid_level.mid_level_layer import LayerMidLevel -from .dyscom.dyscom_layer import LayerDyscom -from .dyscom.dyscom_types import DyscomGetOperationModeType from .utils.connection import Connection from .utils.packet_buffer import PacketBuffer @@ -33,13 +27,10 @@ def __init__(self, conn: Connection, capabilities: set[DeviceCapability]): self._packet_factory = PacketFactory() self._packet_buffer = PacketBuffer(self._connection, self._packet_factory) self._packet_number_generator = PacketNumberGenerator() - self._capabilities = capabilities + self._capabilities = capabilities + [DeviceCapability.GENERAL] self._layer: dict[DeviceCapability, Layer] = {} - self._add_layer(DeviceCapability.GENERAL, capabilities, LayerGeneral) - self._add_layer(DeviceCapability.LOW_LEVEL, capabilities, LayerLowLevel) - self._add_layer(DeviceCapability.MID_LEVEL, capabilities, LayerMidLevel) - self._add_layer(DeviceCapability.DYSCOM, capabilities, LayerDyscom) + self._layer_general = LayerGeneral(self._packet_buffer, self._packet_factory, self._packet_number_generator) @property @@ -74,54 +65,9 @@ def capabilities(self) -> set[DeviceCapability]: async def initialize(self): """Initialize device to get basic information (serial, versions) and stop any active stimulation/measurement""" - if {DeviceCapability.LOW_LEVEL, DeviceCapability.MID_LEVEL}.issubset(self._capabilities): - # get stim status to see if low/mid level is initialized or running - stim_status = await self.get_layer_general().get_stim_status() - if stim_status.stim_status == StimStatus.LOW_LEVEL_INITIALIZED: - await self.get_layer_low_level().stop() - elif stim_status.stim_status in [StimStatus.MID_LEVEL_INITIALIZED, StimStatus.MID_LEVEL_RUNNING]: - await self.get_layer_mid_level().stop() - if DeviceCapability.DYSCOM in self._capabilities: - # get operation mode to see if dyscom measurement is running - operation_mode = await self.get_layer_dyscom().get_operation_mode() - if operation_mode in [DyscomGetOperationModeType.UNDEFINED, - DyscomGetOperationModeType.LIVE_MEASURING_PRE, - DyscomGetOperationModeType.LIVE_MEASURING, - DyscomGetOperationModeType.RECORD_PRE, - DyscomGetOperationModeType.RECORD, - DyscomGetOperationModeType.DATATRANSFER_PRE, - DyscomGetOperationModeType.DATATRANSFER]: - await self.get_layer_dyscom().stop() - await self.get_layer_general().initialize() def get_layer_general(self) -> LayerGeneral: """Helper function to access general layer""" - return self._layer[DeviceCapability.GENERAL] - - - def get_layer_mid_level(self) -> LayerMidLevel: - """Helper function to access mid level layer""" - return self._layer[DeviceCapability.MID_LEVEL] - - - def get_layer_low_level(self) -> LayerLowLevel: - """Helper function to access low level layer""" - return self._layer[DeviceCapability.LOW_LEVEL] - - - def get_layer_dyscom(self) -> LayerDyscom: - """Helper function to access dyscom layer""" - return self._layer[DeviceCapability.DYSCOM] - - - def add_layer(self, capability: DeviceCapability, layer: Layer): - """Add layer""" - self._layer[capability] = layer - - - def _add_layer(self, capability: DeviceCapability, used_capabilities: set[DeviceCapability], layer_class: Type[Layer]): - """Helper method that checks if capability is in used_capabilities and if yes add a layer_class instance""" - if capability in used_capabilities: - self.add_layer(capability, layer_class(self._packet_buffer, self._packet_factory, self._packet_number_generator)) + return self._layer_general diff --git a/src/science_mode_4/device_i24.py b/src/science_mode_4/device_i24.py index 27f818e..a5a068b 100644 --- a/src/science_mode_4/device_i24.py +++ b/src/science_mode_4/device_i24.py @@ -2,11 +2,35 @@ from .device import Device, DeviceCapability from .utils.connection import Connection +from .dyscom.dyscom_types import DyscomGetOperationModeType +from .dyscom.dyscom_layer import LayerDyscom class DeviceI24(Device): """Device class for a I24 device""" def __init__(self, conn: Connection): - super().__init__(conn, [DeviceCapability.GENERAL, - DeviceCapability.DYSCOM]) + super().__init__(conn, [DeviceCapability.DYSCOM]) + + self._layer_dyscom = LayerDyscom(self._packet_buffer, self._packet_factory, self._packet_number_generator) + + + async def initialize(self): + """Initialize device to get basic information (serial, versions) and stop any active stimulation/measurement""" + # get operation mode to see if dyscom measurement is running + operation_mode = await self.get_layer_dyscom().get_operation_mode() + if operation_mode in [DyscomGetOperationModeType.UNDEFINED, + DyscomGetOperationModeType.LIVE_MEASURING_PRE, + DyscomGetOperationModeType.LIVE_MEASURING, + DyscomGetOperationModeType.RECORD_PRE, + DyscomGetOperationModeType.RECORD, + DyscomGetOperationModeType.DATATRANSFER_PRE, + DyscomGetOperationModeType.DATATRANSFER]: + await self.get_layer_dyscom().stop() + + await super().initialize() + + + def get_layer_dyscom(self) -> LayerDyscom: + """Helper function to access dyscom layer""" + return self._layer_dyscom diff --git a/src/science_mode_4/device_p24.py b/src/science_mode_4/device_p24.py index d62c60e..5e39766 100644 --- a/src/science_mode_4/device_p24.py +++ b/src/science_mode_4/device_p24.py @@ -2,12 +2,39 @@ from .device import Device, DeviceCapability from .utils.connection import Connection +from .protocol.types import StimStatus +from .low_level.low_level_layer import LayerLowLevel +from .mid_level.mid_level_layer import LayerMidLevel class DeviceP24(Device): """Device class for a P24 device""" def __init__(self, conn: Connection): - super().__init__(conn, [DeviceCapability.GENERAL, - DeviceCapability.LOW_LEVEL, + super().__init__(conn, [DeviceCapability.LOW_LEVEL, DeviceCapability.MID_LEVEL]) + + self._layer_mid_level = LayerMidLevel(self._packet_buffer, self._packet_factory, self._packet_number_generator) + self._layer_low_level = LayerLowLevel(self._packet_buffer, self._packet_factory, self._packet_number_generator) + + + async def initialize(self): + """Initialize device to get basic information (serial, versions) and stop any active stimulation/measurement""" + # get stim status to see if low/mid level is initialized or running + stim_status = await self.get_layer_general().get_stim_status() + if stim_status.stim_status == StimStatus.LOW_LEVEL_INITIALIZED: + await self.get_layer_low_level().stop() + elif stim_status.stim_status in [StimStatus.MID_LEVEL_INITIALIZED, StimStatus.MID_LEVEL_RUNNING]: + await self.get_layer_mid_level().stop() + + await super().initialize() + + + def get_layer_mid_level(self) -> LayerMidLevel: + """Helper function to access mid level layer""" + return self._layer_mid_level + + + def get_layer_low_level(self) -> LayerLowLevel: + """Helper function to access low level layer""" + return self._layer_low_level From c8a51b7c0e22295ddfa009609326dcbc7db02250 Mon Sep 17 00:00:00 2001 From: Marc Hofmann Date: Tue, 7 Apr 2026 19:01:40 +0200 Subject: [PATCH 2/7] updated readme --- HINTS.md | 5 +++-- README.md | 8 ++++++-- pyproject.toml | 2 +- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/HINTS.md b/HINTS.md index b234ded..01c6cf4 100644 --- a/HINTS.md +++ b/HINTS.md @@ -10,7 +10,7 @@ This page describes implementation details. - To create a _Device_ object, a _Connection_ object is required, use _SerialConnection_ to connect to a serial port - _Connection_ must be opened and closed - Call _device.initialize()_ to get a defined state of the device (it stops any active stimulation/measurement) -- _Device_ object has layers to access commands +- _Device\_xxx_ object has layers to access commands - _Layer_ object has functions to send commands to the device and process acknowledges - To access layer, use helper functions _get\_layer\_xxx_ - _DeviceP24_ has layer general, low level and mid level @@ -30,7 +30,7 @@ This page describes implementation details. - Process incoming data until the expected acknowledge arrives - More data remains in connection buffer - Do not call async functions in parallel (e.g. from different event loops), because each function expects specific commands from the device and clears incoming data buffer -- Additionally functions with naming schema _send_xxx_ are normal functions not waiting for acknowledge +- Additionally functions with naming schema _send\_xxx_ are normal functions not waiting for acknowledge - The acknowledge needs to handled manually by using _PacketBuffer_ object from device ## Logging @@ -51,6 +51,7 @@ This page describes implementation details. - Call _init()_ to set device in mid level mode - Call _update()_ with stimulation pattern - Call _get_current_data()_ every 1.5s to keep stimulation ongoing + - Call _update()_ only when channel configuration must be changed - Call _stop()_ to end stimulation and leave mid level mode ## Low level layer (P24) diff --git a/README.md b/README.md index 8c77d0a..1b4d11d 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ Python 3.11 or higher - Build project - `python -m build` - Install local library - - `pip install .\dist\science_mode_4-0.0.7-py3-none-any.whl` (adjust filename accordingly) + - `pip install --force .\dist\science_mode_4-0.0.7-py3-none-any.whl` (adjust filename accordingly) # Examples @@ -54,6 +54,7 @@ Python 3.11 or higher - All examples try to find the serial port that a science mode device is connected to automatically - If that fails, provide serial port name as parameter, e.g. `python -m examples.. COM3` - Good starting point for an simple stimulation is example `example_mid_level` +- See also HINTS.md file for more information - Examples have own dependencies, see [Dependencies for examples](#dependencies-for-examples) - General layer - `python -m examples.general.example_general` @@ -142,4 +143,7 @@ Python 3.11 or higher ## 0.0.21 - Added more error handling in SerialPortConnection._read_intern() to prevent ClearComErrors -- Added getter for underlying serial object in SerialPortConnection \ No newline at end of file +- Added getter for underlying serial object in SerialPortConnection + +## 0.0.22 +- Library specific exception classes \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 8ad8912..025823d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "science_mode_4" -version = "0.0.21" +version = "0.0.22" authors = [ { name="Marc Hofmann", email="marc-hofmann@gmx.de" }, ] From df1760e7fec30138a10ad8d26e030aba46176dc2 Mon Sep 17 00:00:00 2001 From: Marc Hofmann Date: Tue, 7 Apr 2026 19:02:51 +0200 Subject: [PATCH 3/7] enhanced mid level example --- .../mid_level/example_mid_level_update.py | 194 ++++++++++-------- src/science_mode_4/__init__.py | 2 +- .../mid_level/mid_level_current_data.py | 2 +- src/science_mode_4/protocol/channel_point.py | 12 ++ 4 files changed, 121 insertions(+), 89 deletions(-) diff --git a/examples/mid_level/example_mid_level_update.py b/examples/mid_level/example_mid_level_update.py index 5a27b42..3dbbcac 100644 --- a/examples/mid_level/example_mid_level_update.py +++ b/examples/mid_level/example_mid_level_update.py @@ -12,101 +12,121 @@ from examples.utils.example_utils import ExampleUtils, KeyboardInputThread -async def main() -> int: - """Main function""" - - # lock is used to avoid interference of async functions of mid level layer - # mid_level.update() and mid_level.get_current_data() may run at the same - # time if a key is pressed at the moment when mid_level.get_current_data() is running - # (because they run on different event loops) - lock = threading.Lock() - - # some points - p1: ChannelPoint = ChannelPoint(200, 20) - p2: ChannelPoint = ChannelPoint(100, 0) - p3: ChannelPoint = ChannelPoint(200, -20) - # channel configuration - # we want to ignore first and last two channels (just for demonstration purpose how to handle unused channels) - # we need to pad list with None to achieve correct indices - # [None, None, ChannelConfig, ChannelConfig, ChannelConfig, ChannelConfig] - channel_config = [MidLevelChannelConfiguration(False, 3, 20, [p1, p2, p3]) for x in range(4)] - channel_config.insert(0, None) - channel_config.insert(0, None) - - # keyboard is our trigger to end program - def input_callback(input_value: str) -> bool: - """Callback call from keyboard input thread""" - # print(f"Input value {input_value}") - - if input_value == "q": - # end keyboard input thread - return True - if "1" <= input_value <= "8": - index = int(input_value) - 1 - # check if index is in range of channel_config - if 0 <= index < len(channel_config): - cc = channel_config[index] - # check if index contains a ChannelConfiguration object - if cc is not None: - # toggle active - cc.is_active = not cc.is_active - with lock: - asyncio.run(mid_level.update(channel_config)) +class ExampleMidLevelUpdate(): + """Example class""" + + def __init__(self): + # current in mA + # can be changed via keyboard input + self._current = 100 + + # lock is used to avoid interference of async functions of mid level layer + # mid_level.update() and mid_level.get_current_data() may run at the same + # time if a key is pressed at the moment when mid_level.get_current_data() is running + # (because they run on different event loops) + self._lock = threading.Lock() + + # some points + p1: ChannelPoint = ChannelPoint(250, self._current) + p2: ChannelPoint = ChannelPoint(250, -self._current) + p3: ChannelPoint = ChannelPoint(250, 0) + # channel configuration + self._channel_config = [MidLevelChannelConfiguration(True, 0, 20, [p1, p2, p3]) for x in range(8)] + + + async def run(self) -> int: + """Main function""" + + # keyboard is our trigger to end program + def input_callback(input_value: str) -> bool: + """Callback call from keyboard input thread""" + # print(f"Input value {input_value}") + + if input_value == "q": + # end keyboard input thread + return True + if "1" <= input_value <= "8": + index = int(input_value) - 1 + # check if index is in range of channel_config + if 0 <= index < len(self._channel_config): + cc = self._channel_config[index] + # check if index contains a ChannelConfiguration object + if cc is not None: + # toggle active + cc.is_active = not cc.is_active + with self._lock: + asyncio.run(mid_level.update(self._channel_config)) + else: + print("Channel config is None") else: - print("Channel config is None") - else: - print("Invalid channel config index") + print("Invalid channel config index") - return False - - - print("Invalid command") - return False - - print("Usage: press 1-8 to toggle channel, press q to quit") - print("Only channels 3-6 have a channel configuration (see comments where configuration is created)") - # create keyboard input thread for non blocking console input - keyboard_input_thread = KeyboardInputThread(input_callback) + return False - # get comport from command line argument - com_port = ExampleUtils.get_comport_from_commandline_argument() - # create serial port connection - connection = SerialPortConnection(com_port) - # open connection, now we can read and write data - connection.open() + if input_value in ["+", "-"]: + self._current += 1 if input_value == "+" else -1 + self._current = max(0, min(130, self._current)) + print(f"Change current to {self._current}") - # create science mode device - device = DeviceP24(connection) - # call initialize to get basic information (serial, versions) and stop any active stimulation/measurement - # to have a defined state - await device.initialize() + for x in self._channel_config: + x.points[0].current_in_milli_ampere = self._current + x.points[1].current_in_milli_ampere = -self._current - # get mid level layer to call mid level commands - mid_level = device.get_layer_mid_level() - # call init mid level, we do not want to stop on all stimulation errors to be able to - # see errors during get_current_data - await mid_level.init(False) - # set stimulation pattern, P24 device will now stimulate according this pattern - await mid_level.update(channel_config) + with self._lock: + asyncio.run(mid_level.update(self._channel_config)) + return False - possible_errors = [ResultAndError.ELECTRODE_ERROR, ResultAndError.PULSE_TIMEOUT_ERROR, ResultAndError.PULSE_LOW_CURRENT_ERROR] - while keyboard_input_thread.is_alive(): - # we have to call get_current_data() every 1.5s to keep stimulation ongoing - with lock: - _, _, channel_error = await mid_level.get_current_data() - if any(v in possible_errors for v in channel_error): - print(f"Channel with error: {[(i, v.name) for i, v in enumerate(channel_error) if v != ResultAndError.NO_ERROR]}") - - await asyncio.sleep(1) - - # call stop mid level - await mid_level.stop() + print("Invalid command") + return False - # close serial port connection - connection.close() - return 0 + print("Usage:") + print("Press 1-8 to toggle channel") + print("Press + or - to increase or decrease current") + print("Press q to quit") + + # create keyboard input thread for non blocking console input + keyboard_input_thread = KeyboardInputThread(input_callback) + + # get comport from command line argument + com_port = ExampleUtils.get_comport_from_commandline_argument() + # create serial port connection + connection = SerialPortConnection(com_port) + # open connection, now we can read and write data + connection.open() + + # create science mode device + device = DeviceP24(connection) + # call initialize to get basic information (serial, versions) and stop any active stimulation/measurement + # to have a defined state + await device.initialize() + + # get mid level layer to call mid level commands + mid_level = device.get_layer_mid_level() + # call init mid level, we do not want to stop on all stimulation errors to be able to + # see errors during get_current_data + await mid_level.init(False) + # set stimulation pattern, P24 device will now stimulate according this pattern + await mid_level.update(self._channel_config) + + possible_errors = [ResultAndError.ELECTRODE_ERROR, ResultAndError.PULSE_TIMEOUT_ERROR, ResultAndError.PULSE_LOW_CURRENT_ERROR] + while keyboard_input_thread.is_alive(): + # we have to call get_current_data() every 1.5s to keep stimulation ongoing + with self._lock: + _, _, channel_error = await mid_level.get_current_data() + if any(v in possible_errors for v in channel_error): + print(f"Channel with error: {[(i, v.name) for i, v in enumerate(channel_error) if v != ResultAndError.NO_ERROR]}") + + await asyncio.sleep(1) + + # call stop mid level + await mid_level.stop() + + # close serial port connection + connection.close() + return 0 if __name__ == "__main__": - res = asyncio.run(main()) + example = ExampleMidLevelUpdate() + res = asyncio.run(example.run()) sys.exit(res) diff --git a/src/science_mode_4/__init__.py b/src/science_mode_4/__init__.py index f0caf5c..d34b25f 100644 --- a/src/science_mode_4/__init__.py +++ b/src/science_mode_4/__init__.py @@ -16,6 +16,6 @@ try: __version__ = version("science_mode_4") - logger().info("Library version %s", __version__) + logger().info("science_mode_4 library version %s", __version__) except PackageNotFoundError: pass diff --git a/src/science_mode_4/mid_level/mid_level_current_data.py b/src/science_mode_4/mid_level/mid_level_current_data.py index 5a3da8c..8c444d8 100644 --- a/src/science_mode_4/mid_level/mid_level_current_data.py +++ b/src/science_mode_4/mid_level/mid_level_current_data.py @@ -63,7 +63,7 @@ def __init__(self, data: bytes): @property def result_error(self) -> ResultAndError: - """Getter for ResultError""" + """Getter for ResultError, possible values: NO_ERROR, ELECTRODE_ERROR, PULSE_TIMEOUT_ERROR, PULSE_LOW_CURRENT_ERROR""" return self._result_error diff --git a/src/science_mode_4/protocol/channel_point.py b/src/science_mode_4/protocol/channel_point.py index 9445f81..95ef742 100644 --- a/src/science_mode_4/protocol/channel_point.py +++ b/src/science_mode_4/protocol/channel_point.py @@ -18,12 +18,24 @@ def current_in_milli_ampere(self) -> float: return self._current_in_milli_ampere + @current_in_milli_ampere.setter + def current_in_milli_ampere(self, value: float): + """Setter for current""" + self._current_in_milli_ampere = value + + @property def duration_in_micro_seconds(self) -> int: """Getter for duration""" return self._duration_in_micro_seconds + @duration_in_micro_seconds.setter + def duration_in_micro_seconds(self, value: int): + """Setter for duration""" + self._duration_in_micro_seconds = value + + def get_data(self) -> bytes: """Convert information to bytes""" if (self._current_in_milli_ampere < -150.0) or (self._current_in_milli_ampere > 150.0): From 3ba224eeeb440e5dd7d21d230c4d23d50a1380f7 Mon Sep 17 00:00:00 2001 From: Marc Hofmann Date: Thu, 9 Apr 2026 18:26:27 +0200 Subject: [PATCH 4/7] intruduced custom errors --- src/science_mode_4/dyscom/dyscom_layer.py | 7 ++++--- src/science_mode_4/general/general_layer.py | 5 +++-- src/science_mode_4/layer.py | 3 ++- src/science_mode_4/protocol/__init__.py | 1 + src/science_mode_4/protocol/exceptions.py | 9 +++++++++ src/science_mode_4/protocol/protocol_helper.py | 7 ++++--- src/science_mode_4/utils/packet_buffer.py | 3 ++- 7 files changed, 25 insertions(+), 10 deletions(-) create mode 100644 src/science_mode_4/protocol/exceptions.py diff --git a/src/science_mode_4/dyscom/dyscom_layer.py b/src/science_mode_4/dyscom/dyscom_layer.py index 9783e67..eb3e14b 100644 --- a/src/science_mode_4/dyscom/dyscom_layer.py +++ b/src/science_mode_4/dyscom/dyscom_layer.py @@ -5,6 +5,7 @@ from science_mode_4.layer import Layer from science_mode_4.protocol.commands import Commands +from science_mode_4.protocol.exceptions import ProtocolError from science_mode_4.utils.logger import logger from .dyscom_types import DyscomFrequencyOut, DyscomGetOperationModeType, DyscomInitState, DyscomPowerModuleType,\ DyscomPowerModulePowerType, DyscomSignalType, DyscomSysState, DyscomSysType @@ -37,7 +38,7 @@ async def init(self, params: DyscomInitParams) -> DyscomInitResult: ack: PacketDyscomInitAck = await self.send_packet_and_wait(p) self._check_result_error(ack.result_error, "DyscomInit") if ack.init_state not in [DyscomInitState.UNUSED, DyscomInitState.SUCCESS]: - raise ValueError(f"Dyscom error init {ack.init_state.name}") + raise ProtocolError(f"Dyscom error init {ack.init_state.name}") logger().info("Dyscom init, measurement_file_id: %s, state: %s, frequency: %s",\ ack.measurement_file_id, ack.init_state.name, ack.frequency_out.name) @@ -143,7 +144,7 @@ async def sys(self, sys_type: DyscomSysType, filename: str = "") -> DyscomSysRes ack: PacketDyscomSysAck = await self.send_packet_and_wait(p) self._check_result_error(ack.result_error, "DyscomSys") if ack.state not in [DyscomSysState.SUCCESSFUL]: - raise ValueError(f"Dyscom error sys {ack.state.name}") + raise ProtocolError(f"Dyscom error sys {ack.state.name}") logger().info("Dyscom sys, type: %s, state: %s, filename: %s", ack.sys_type.name, ack.state.name, ack.filename) return DyscomSysResult(ack.sys_type, ack.state, ack.filename) @@ -190,7 +191,7 @@ async def get_file_content(self, filename: str) -> bytes: """Gets content of a file. Device must be in Idle operating mode""" om = await self.get_operation_mode() if om != DyscomGetOperationModeType.IDLE: - raise ValueError(f"Error wrong operation mode {om.name}") + raise ProtocolError(f"Error wrong operation mode {om.name}") # get meta information and sets device in mode DATATRANSFER_PRE # we need number of blocks to know how many SendFile commands we expect diff --git a/src/science_mode_4/general/general_layer.py b/src/science_mode_4/general/general_layer.py index 9def71a..8142411 100644 --- a/src/science_mode_4/general/general_layer.py +++ b/src/science_mode_4/general/general_layer.py @@ -1,5 +1,6 @@ """Provides general layer""" +from science_mode_4.protocol.exceptions import ProtocolError from science_mode_4.protocol.packet_number_generator import PacketNumberGenerator from science_mode_4.protocol.packet_factory import PacketFactory from science_mode_4.layer import Layer @@ -70,7 +71,7 @@ async def get_stim_status(self) -> GetStimStatusResult: p = PacketGeneralGetStimStatus() ack: PacketGeneralGetStimStatusAck = await self.send_packet_and_wait(p) if not ack.successful: - raise ValueError("Error get stim status") + raise ProtocolError("Error get stim status") logger().info("Get stim status: %s, active: %r", ack.stim_status.name, ack.high_voltage_on) return GetStimStatusResult(ack.stim_status, ack.high_voltage_on) @@ -80,7 +81,7 @@ async def get_version(self) -> GetExtendedVersionResult: p = PacketGeneralGetExtendedVersion() ack: PacketGeneralGetExtendedVersionAck = await self.send_packet_and_wait(p) if not ack.successful: - raise ValueError("Error get extended version") + raise ProtocolError("Error get extended version") self._firmware_version = ack.firmware_version self._science_mode_version = ack.science_mode_version logger().info("Get version, firmware version: %s science mode version: %s, firmware hash: %s, hash type: %s, is valid hash: %r",\ diff --git a/src/science_mode_4/layer.py b/src/science_mode_4/layer.py index 23d35c0..fb9beee 100644 --- a/src/science_mode_4/layer.py +++ b/src/science_mode_4/layer.py @@ -1,5 +1,6 @@ """Provides base class for all ScienceMode layers""" +from .protocol.exceptions import ProtocolError from .protocol.protocol_helper import ProtocolHelper from .protocol.types import ResultAndError from .protocol.packet import Packet, PacketAck @@ -41,4 +42,4 @@ async def send_packet_and_wait(self, packet: Packet) -> PacketAck: def _check_result_error(self, result_error: ResultAndError, packet_name: str): """Check if result_error contains an error and if yes prints packet_name""" if result_error != ResultAndError.NO_ERROR: - raise ValueError(f"Error {packet_name} {result_error.name}") + raise ProtocolError(f"Error {packet_name} {result_error.name}") diff --git a/src/science_mode_4/protocol/__init__.py b/src/science_mode_4/protocol/__init__.py index b61df44..b0fc010 100644 --- a/src/science_mode_4/protocol/__init__.py +++ b/src/science_mode_4/protocol/__init__.py @@ -2,6 +2,7 @@ from .channel_point import * from .commands import * +from .exceptions import * from .packet_factory import * from .packet_number_generator import * from .packet import * diff --git a/src/science_mode_4/protocol/exceptions.py b/src/science_mode_4/protocol/exceptions.py new file mode 100644 index 0000000..6d2f35d --- /dev/null +++ b/src/science_mode_4/protocol/exceptions.py @@ -0,0 +1,9 @@ +"""File with all library specific exception classes""" + + +class ScienceMode4Error(Exception): + """Base class for all library specific exceptions""" + + +class ProtocolError(ScienceMode4Error): + """Exception for protocol errors""" diff --git a/src/science_mode_4/protocol/protocol_helper.py b/src/science_mode_4/protocol/protocol_helper.py index 2d11658..5a1f871 100644 --- a/src/science_mode_4/protocol/protocol_helper.py +++ b/src/science_mode_4/protocol/protocol_helper.py @@ -5,6 +5,7 @@ from science_mode_4.general.general_error import PacketGeneralError from science_mode_4.general.general_unknown_command import PacketGeneralUnknownCommand from science_mode_4.utils.packet_buffer import PacketBuffer +from .exceptions import ProtocolError from .protocol import Protocol from .commands import Commands from .packet import Packet, PacketAck @@ -47,10 +48,10 @@ async def send_packet_and_wait(packet: Packet, packet_number: int, packet_buffer # check if we got an error if ack.command == Commands.GENERAL_ERROR: ge: PacketGeneralError = ack - raise ValueError(f"General error packet {ge.result_error.name}") + raise ProtocolError(f"General error packet {ge.result_error.name}") if ack.command == Commands.UNKNOWN_COMMAND: uc: PacketGeneralUnknownCommand = ack - raise ValueError(f"Unknown command packet {uc.result_error.name}") + raise ProtocolError(f"Unknown command packet {uc.result_error.name}") # discard acknowledge and continue @@ -62,4 +63,4 @@ async def send_packet_and_wait(packet: Packet, packet_number: int, packet_buffer # we got no response in time, so remove open acknowledges packet_buffer.remove_open_acknowledge(packet) - raise ValueError(f"No valid answer for packet {packet.command}") + raise ProtocolError(f"No valid answer for packet {packet.command}") diff --git a/src/science_mode_4/utils/packet_buffer.py b/src/science_mode_4/utils/packet_buffer.py index 74b5cf0..13f246e 100644 --- a/src/science_mode_4/utils/packet_buffer.py +++ b/src/science_mode_4/utils/packet_buffer.py @@ -1,5 +1,6 @@ """Provides a packet buffer functionality for more async handling of packets and acknowledges""" +from science_mode_4.protocol.exceptions import ProtocolError from science_mode_4.protocol.packet import Packet from science_mode_4.protocol.packet_factory import PacketFactory from science_mode_4.protocol.protocol import Protocol @@ -51,7 +52,7 @@ def remove_open_acknowledge(self, packet: Packet): if key in self._open_acknowledges: self._open_acknowledges[key] -= 1 else: - raise ValueError(f"Remove non existing acknowledge from packet buffer, command {packet.command}, number {packet.number}") + raise ProtocolError(f"Remove non existing acknowledge from packet buffer, command {packet.command}, number {packet.number}") def print_open_acknowledge(self): From f210395833a17f8a2652e54292cb847141fac7b5 Mon Sep 17 00:00:00 2001 From: Marc Hofmann Date: Sun, 12 Apr 2026 09:09:39 +0200 Subject: [PATCH 5/7] added more description --- HINTS.md | 9 +- .../mid_level/example_mid_level_update.py | 161 ++++++++++-------- 2 files changed, 97 insertions(+), 73 deletions(-) diff --git a/HINTS.md b/HINTS.md index 01c6cf4..ed39c15 100644 --- a/HINTS.md +++ b/HINTS.md @@ -28,7 +28,7 @@ This page describes implementation details. - Clear buffer - Send command - Process incoming data until the expected acknowledge arrives - - More data remains in connection buffer + - More data may remain in connection buffer - Do not call async functions in parallel (e.g. from different event loops), because each function expects specific commands from the device and clears incoming data buffer - Additionally functions with naming schema _send\_xxx_ are normal functions not waiting for acknowledge - The acknowledge needs to handled manually by using _PacketBuffer_ object from device @@ -41,6 +41,13 @@ This page describes implementation details. - For better performance, disable logger - `logger().disabled = True` +## Error handling +- Functions raise a ValueError if an input parameter is invalid +- Functions communicating with the device raise a ProtocolError if device does not return a matching acknowledge or returns an error + - Communication error may occur when setting high current for P24 depending on USB-PC port and USB cable +- _SerialPortConnection_ raise _serial.SerialException_ if unable to open/read/write serial port +- _UsbConnection_ raise a _usb.core.USBError_ if unable to open/read/write usb device + ## General layer (all devices) - Contains functions to get common information like device serial or firmware version diff --git a/examples/mid_level/example_mid_level_update.py b/examples/mid_level/example_mid_level_update.py index 3dbbcac..4378ab9 100644 --- a/examples/mid_level/example_mid_level_update.py +++ b/examples/mid_level/example_mid_level_update.py @@ -18,7 +18,7 @@ class ExampleMidLevelUpdate(): def __init__(self): # current in mA # can be changed via keyboard input - self._current = 100 + self._current = 50 # lock is used to avoid interference of async functions of mid level layer # mid_level.update() and mid_level.get_current_data() may run at the same @@ -42,87 +42,104 @@ def input_callback(input_value: str) -> bool: """Callback call from keyboard input thread""" # print(f"Input value {input_value}") - if input_value == "q": - # end keyboard input thread - return True - if "1" <= input_value <= "8": - index = int(input_value) - 1 - # check if index is in range of channel_config - if 0 <= index < len(self._channel_config): - cc = self._channel_config[index] - # check if index contains a ChannelConfiguration object - if cc is not None: - # toggle active - cc.is_active = not cc.is_active - with self._lock: - asyncio.run(mid_level.update(self._channel_config)) + try: + if input_value == "q": + # end keyboard input thread + return True + if "1" <= input_value <= "8": + index = int(input_value) - 1 + # check if index is in range of channel_config + if 0 <= index < len(self._channel_config): + cc = self._channel_config[index] + # check if index contains a ChannelConfiguration object + if cc is not None: + # toggle active + cc.is_active = not cc.is_active + with self._lock: + asyncio.run(mid_level.update(self._channel_config)) + else: + print("Channel config is None") else: - print("Channel config is None") - else: - print("Invalid channel config index") - - return False - - if input_value in ["+", "-"]: - self._current += 1 if input_value == "+" else -1 - self._current = max(0, min(130, self._current)) - print(f"Change current to {self._current}") - - for x in self._channel_config: - x.points[0].current_in_milli_ampere = self._current - x.points[1].current_in_milli_ampere = -self._current - - with self._lock: - asyncio.run(mid_level.update(self._channel_config)) + print("Invalid channel config index") + + return False + + if input_value in ["+", "-", "*", "/"]: + match input_value: + case "+": + self._current += 1 + case "-": + self._current -= 1 + case "*": + self._current += 10 + case "/": + self._current -= 10 + self._current = max(0, min(130, self._current)) + print(f"Change current to {self._current}") + + for x in self._channel_config: + x.points[0].current_in_milli_ampere = self._current + x.points[1].current_in_milli_ampere = -self._current + + with self._lock: + asyncio.run(mid_level.update(self._channel_config)) + return False + + print("Invalid command") return False - - print("Invalid command") - return False + except Exception as e: # pylint:disable=broad-exception-caught + print(e) + return True print("Usage:") print("Press 1-8 to toggle channel") - print("Press + or - to increase or decrease current") + print("Press + or - to increase or decrease current by 1mA") + print("Press * or / to increase or decrease current by 10mA") print("Press q to quit") # create keyboard input thread for non blocking console input keyboard_input_thread = KeyboardInputThread(input_callback) - # get comport from command line argument - com_port = ExampleUtils.get_comport_from_commandline_argument() - # create serial port connection - connection = SerialPortConnection(com_port) - # open connection, now we can read and write data - connection.open() - - # create science mode device - device = DeviceP24(connection) - # call initialize to get basic information (serial, versions) and stop any active stimulation/measurement - # to have a defined state - await device.initialize() - - # get mid level layer to call mid level commands - mid_level = device.get_layer_mid_level() - # call init mid level, we do not want to stop on all stimulation errors to be able to - # see errors during get_current_data - await mid_level.init(False) - # set stimulation pattern, P24 device will now stimulate according this pattern - await mid_level.update(self._channel_config) - - possible_errors = [ResultAndError.ELECTRODE_ERROR, ResultAndError.PULSE_TIMEOUT_ERROR, ResultAndError.PULSE_LOW_CURRENT_ERROR] - while keyboard_input_thread.is_alive(): - # we have to call get_current_data() every 1.5s to keep stimulation ongoing - with self._lock: - _, _, channel_error = await mid_level.get_current_data() - if any(v in possible_errors for v in channel_error): - print(f"Channel with error: {[(i, v.name) for i, v in enumerate(channel_error) if v != ResultAndError.NO_ERROR]}") - - await asyncio.sleep(1) - - # call stop mid level - await mid_level.stop() - - # close serial port connection - connection.close() + try: + # get comport from command line argument + com_port = ExampleUtils.get_comport_from_commandline_argument() + # create serial port connection + connection = SerialPortConnection(com_port) + # open connection, now we can read and write data + connection.open() + + # create science mode device + device = DeviceP24(connection) + # call initialize to get basic information (serial, versions) and stop any active stimulation/measurement + # to have a defined state + await device.initialize() + + # get mid level layer to call mid level commands + mid_level = device.get_layer_mid_level() + # call init mid level, we do not want to stop on all stimulation errors to be able to + # see errors during get_current_data + await mid_level.init(False) + # set stimulation pattern, P24 device will now stimulate according this pattern + await mid_level.update(self._channel_config) + + possible_errors = [ResultAndError.ELECTRODE_ERROR, ResultAndError.PULSE_TIMEOUT_ERROR, ResultAndError.PULSE_LOW_CURRENT_ERROR] + while keyboard_input_thread.is_alive(): + # we have to call get_current_data() every 1.5s to keep stimulation ongoing + with self._lock: + _, _, channel_error = await mid_level.get_current_data() + if any(v in possible_errors for v in channel_error): + print(f"Channel with error: {[(i, v.name) for i, v in enumerate(channel_error) if v != ResultAndError.NO_ERROR]}") + + await asyncio.sleep(1) + + # call stop mid level + await mid_level.stop() + + # close serial port connection + connection.close() + except Exception as e: # pylint:disable=broad-exception-caught + print(e) + return 0 From ed2066034a82009748b84aa9ecf3bbe777c2c4d4 Mon Sep 17 00:00:00 2001 From: Marc Hofmann Date: Sun, 12 Apr 2026 09:50:09 +0200 Subject: [PATCH 6/7] linter --- .pylintrc | 1 + examples/mid_level/example_mid_level_update.py | 1 + 2 files changed, 2 insertions(+) diff --git a/.pylintrc b/.pylintrc index 3d1c8ad..d713266 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,6 +1,7 @@ [MAIN] max-line-length=150 max-attributes=15 +max-branches=15 [DESIGN] max-statements=100 diff --git a/examples/mid_level/example_mid_level_update.py b/examples/mid_level/example_mid_level_update.py index 4378ab9..9274823 100644 --- a/examples/mid_level/example_mid_level_update.py +++ b/examples/mid_level/example_mid_level_update.py @@ -46,6 +46,7 @@ def input_callback(input_value: str) -> bool: if input_value == "q": # end keyboard input thread return True + if "1" <= input_value <= "8": index = int(input_value) - 1 # check if index is in range of channel_config From e69507037aea5a663fc9b5664c59ed43733494f3 Mon Sep 17 00:00:00 2001 From: Marc Hofmann Date: Sun, 12 Apr 2026 10:15:33 +0200 Subject: [PATCH 7/7] documentation --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 1b4d11d..8fbc7b9 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,9 @@ Pure Python implementation of HasomedScience ScienceMode 4 protocol for P24 (https://github.com/ScienceMode/ScienceMode4_P24) and I24 (https://github.com/ScienceMode/ScienceMode4_I24) devices. To use this library see section [Installation](#installation). Library and examples are tested under Windows, Linux and MacOS. +See also [Hints](HINTS.md) file for more information. + + ## Requirements Python 3.11 or higher @@ -54,7 +57,6 @@ Python 3.11 or higher - All examples try to find the serial port that a science mode device is connected to automatically - If that fails, provide serial port name as parameter, e.g. `python -m examples.. COM3` - Good starting point for an simple stimulation is example `example_mid_level` -- See also HINTS.md file for more information - Examples have own dependencies, see [Dependencies for examples](#dependencies-for-examples) - General layer - `python -m examples.general.example_general`