Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[MAIN]
max-line-length=150
max-attributes=15
max-branches=15

[DESIGN]
max-statements=100
Expand Down
14 changes: 11 additions & 3 deletions HINTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ This page describes implementation details.
- To create a _Device_ object, a _Connection_ object is required, use _SerialConnection_ to connect to a serial port
- _Connection_ must be opened and closed
- Call _device.initialize()_ to get a defined state of the device (it stops any active stimulation/measurement)
- _Device_ object has layers to access commands
- _Device\_xxx_ object has layers to access commands
- _Layer_ object has functions to send commands to the device and process acknowledges
- To access layer, use helper functions _get\_layer\_xxx_
- _DeviceP24_ has layer general, low level and mid level
Expand All @@ -28,9 +28,9 @@ This page describes implementation details.
- Clear buffer
- Send command
- Process incoming data until the expected acknowledge arrives
- More data remains in connection buffer
- More data may remain in connection buffer
- Do not call async functions in parallel (e.g. from different event loops), because each function expects specific commands from the device and clears incoming data buffer
- Additionally functions with naming schema _send_xxx_ are normal functions not waiting for acknowledge
- Additionally functions with naming schema _send\_xxx_ are normal functions not waiting for acknowledge
- The acknowledge needs to handled manually by using _PacketBuffer_ object from device

## Logging
Expand All @@ -41,6 +41,13 @@ This page describes implementation details.
- For better performance, disable logger
- `logger().disabled = True`

## Error handling
- Functions raise a ValueError if an input parameter is invalid
- Functions communicating with the device raise a ProtocolError if device does not return a matching acknowledge or returns an error
- Communication error may occur when setting high current for P24 depending on USB-PC port and USB cable
- _SerialPortConnection_ raise _serial.SerialException_ if unable to open/read/write serial port
- _UsbConnection_ raise a _usb.core.USBError_ if unable to open/read/write usb device

## General layer (all devices)
- Contains functions to get common information like device serial or firmware version

Expand All @@ -51,6 +58,7 @@ This page describes implementation details.
- Call _init()_ to set device in mid level mode
- Call _update()_ with stimulation pattern
- Call _get_current_data()_ every 1.5s to keep stimulation ongoing
- Call _update()_ only when channel configuration must be changed
- Call _stop()_ to end stimulation and leave mid level mode

## Low level layer (P24)
Expand Down
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

Pure Python implementation of HasomedScience ScienceMode 4 protocol for P24 (https://github.com/ScienceMode/ScienceMode4_P24) and I24 (https://github.com/ScienceMode/ScienceMode4_I24) devices. To use this library see section [Installation](#installation). Library and examples are tested under Windows, Linux and MacOS.

See also [Hints](HINTS.md) file for more information.


## Requirements

Python 3.11 or higher
Expand Down Expand Up @@ -42,7 +45,7 @@ Python 3.11 or higher
- Build project
- `python -m build`
- Install local library
- `pip install .\dist\science_mode_4-0.0.7-py3-none-any.whl` (adjust filename accordingly)
- `pip install --force .\dist\science_mode_4-0.0.7-py3-none-any.whl` (adjust filename accordingly)

# Examples

Expand Down Expand Up @@ -142,4 +145,7 @@ Python 3.11 or higher

## 0.0.21
- Added more error handling in SerialPortConnection._read_intern() to prevent ClearComErrors
- Added getter for underlying serial object in SerialPortConnection
- Added getter for underlying serial object in SerialPortConnection

## 0.0.22
- Library specific exception classes
226 changes: 132 additions & 94 deletions examples/mid_level/example_mid_level_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,101 +12,139 @@
from examples.utils.example_utils import ExampleUtils, KeyboardInputThread


async def main() -> int:
"""Main function"""

# lock is used to avoid interference of async functions of mid level layer
# mid_level.update() and mid_level.get_current_data() may run at the same
# time if a key is pressed at the moment when mid_level.get_current_data() is running
# (because they run on different event loops)
lock = threading.Lock()

# some points
p1: ChannelPoint = ChannelPoint(200, 20)
p2: ChannelPoint = ChannelPoint(100, 0)
p3: ChannelPoint = ChannelPoint(200, -20)
# channel configuration
# we want to ignore first and last two channels (just for demonstration purpose how to handle unused channels)
# we need to pad list with None to achieve correct indices
# [None, None, ChannelConfig, ChannelConfig, ChannelConfig, ChannelConfig]
channel_config = [MidLevelChannelConfiguration(False, 3, 20, [p1, p2, p3]) for x in range(4)]
channel_config.insert(0, None)
channel_config.insert(0, None)

# keyboard is our trigger to end program
def input_callback(input_value: str) -> bool:
"""Callback call from keyboard input thread"""
# print(f"Input value {input_value}")

if input_value == "q":
# end keyboard input thread
return True
if "1" <= input_value <= "8":
index = int(input_value) - 1
# check if index is in range of channel_config
if 0 <= index < len(channel_config):
cc = channel_config[index]
# check if index contains a ChannelConfiguration object
if cc is not None:
# toggle active
cc.is_active = not cc.is_active
with lock:
asyncio.run(mid_level.update(channel_config))
else:
print("Channel config is None")
else:
print("Invalid channel config index")

return False


print("Invalid command")
return False

print("Usage: press 1-8 to toggle channel, press q to quit")
print("Only channels 3-6 have a channel configuration (see comments where configuration is created)")
# create keyboard input thread for non blocking console input
keyboard_input_thread = KeyboardInputThread(input_callback)

# get comport from command line argument
com_port = ExampleUtils.get_comport_from_commandline_argument()
# create serial port connection
connection = SerialPortConnection(com_port)
# open connection, now we can read and write data
connection.open()

# create science mode device
device = DeviceP24(connection)
# call initialize to get basic information (serial, versions) and stop any active stimulation/measurement
# to have a defined state
await device.initialize()

# get mid level layer to call mid level commands
mid_level = device.get_layer_mid_level()
# call init mid level, we do not want to stop on all stimulation errors to be able to
# see errors during get_current_data
await mid_level.init(False)
# set stimulation pattern, P24 device will now stimulate according this pattern
await mid_level.update(channel_config)

possible_errors = [ResultAndError.ELECTRODE_ERROR, ResultAndError.PULSE_TIMEOUT_ERROR, ResultAndError.PULSE_LOW_CURRENT_ERROR]
while keyboard_input_thread.is_alive():
# we have to call get_current_data() every 1.5s to keep stimulation ongoing
with lock:
_, _, channel_error = await mid_level.get_current_data()
if any(v in possible_errors for v in channel_error):
print(f"Channel with error: {[(i, v.name) for i, v in enumerate(channel_error) if v != ResultAndError.NO_ERROR]}")

await asyncio.sleep(1)

# call stop mid level
await mid_level.stop()

# close serial port connection
connection.close()
return 0
class ExampleMidLevelUpdate():
"""Example class"""

def __init__(self):
# current in mA
# can be changed via keyboard input
self._current = 50

# lock is used to avoid interference of async functions of mid level layer
# mid_level.update() and mid_level.get_current_data() may run at the same
# time if a key is pressed at the moment when mid_level.get_current_data() is running
# (because they run on different event loops)
self._lock = threading.Lock()

# some points
p1: ChannelPoint = ChannelPoint(250, self._current)
p2: ChannelPoint = ChannelPoint(250, -self._current)
p3: ChannelPoint = ChannelPoint(250, 0)
# channel configuration
self._channel_config = [MidLevelChannelConfiguration(True, 0, 20, [p1, p2, p3]) for x in range(8)]


async def run(self) -> int:
"""Main function"""

# keyboard is our trigger to end program
def input_callback(input_value: str) -> bool:
"""Callback call from keyboard input thread"""
# print(f"Input value {input_value}")

try:
if input_value == "q":
# end keyboard input thread
return True

if "1" <= input_value <= "8":
index = int(input_value) - 1
# check if index is in range of channel_config
if 0 <= index < len(self._channel_config):
cc = self._channel_config[index]
# check if index contains a ChannelConfiguration object
if cc is not None:
# toggle active
cc.is_active = not cc.is_active
with self._lock:
asyncio.run(mid_level.update(self._channel_config))
else:
print("Channel config is None")
else:
print("Invalid channel config index")

return False

if input_value in ["+", "-", "*", "/"]:
match input_value:
case "+":
self._current += 1
case "-":
self._current -= 1
case "*":
self._current += 10
case "/":
self._current -= 10
self._current = max(0, min(130, self._current))
print(f"Change current to {self._current}")

for x in self._channel_config:
x.points[0].current_in_milli_ampere = self._current
x.points[1].current_in_milli_ampere = -self._current

with self._lock:
asyncio.run(mid_level.update(self._channel_config))
return False

print("Invalid command")
return False
except Exception as e: # pylint:disable=broad-exception-caught
print(e)
return True

print("Usage:")
print("Press 1-8 to toggle channel")
print("Press + or - to increase or decrease current by 1mA")
print("Press * or / to increase or decrease current by 10mA")
print("Press q to quit")

# create keyboard input thread for non blocking console input
keyboard_input_thread = KeyboardInputThread(input_callback)

try:
# get comport from command line argument
com_port = ExampleUtils.get_comport_from_commandline_argument()
# create serial port connection
connection = SerialPortConnection(com_port)
# open connection, now we can read and write data
connection.open()

# create science mode device
device = DeviceP24(connection)
# call initialize to get basic information (serial, versions) and stop any active stimulation/measurement
# to have a defined state
await device.initialize()

# get mid level layer to call mid level commands
mid_level = device.get_layer_mid_level()
# call init mid level, we do not want to stop on all stimulation errors to be able to
# see errors during get_current_data
await mid_level.init(False)
# set stimulation pattern, P24 device will now stimulate according this pattern
await mid_level.update(self._channel_config)

possible_errors = [ResultAndError.ELECTRODE_ERROR, ResultAndError.PULSE_TIMEOUT_ERROR, ResultAndError.PULSE_LOW_CURRENT_ERROR]
while keyboard_input_thread.is_alive():
# we have to call get_current_data() every 1.5s to keep stimulation ongoing
with self._lock:
_, _, channel_error = await mid_level.get_current_data()
if any(v in possible_errors for v in channel_error):
print(f"Channel with error: {[(i, v.name) for i, v in enumerate(channel_error) if v != ResultAndError.NO_ERROR]}")

await asyncio.sleep(1)

# call stop mid level
await mid_level.stop()

# close serial port connection
connection.close()
except Exception as e: # pylint:disable=broad-exception-caught
print(e)

return 0


if __name__ == "__main__":
res = asyncio.run(main())
example = ExampleMidLevelUpdate()
res = asyncio.run(example.run())
sys.exit(res)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "science_mode_4"
version = "0.0.21"
version = "0.0.22"
authors = [
{ name="Marc Hofmann", email="marc-hofmann@gmx.de" },
]
Expand Down
2 changes: 1 addition & 1 deletion src/science_mode_4/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@

try:
__version__ = version("science_mode_4")
logger().info("Library version %s", __version__)
logger().info("science_mode_4 library version %s", __version__)
except PackageNotFoundError:
pass
Loading
Loading