diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 43d94a0..ca07c00 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,12 +13,12 @@ jobs: strategy: max-parallel: 4 matrix: - python-version: ["3.10", "3.11", "3.12", "3.13"] + python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"] steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v6 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v5 + uses: actions/setup-python@v6 with: python-version: ${{ matrix.python-version }} - name: Install dependencies diff --git a/.gitignore b/.gitignore index 627d0bf..0adbb21 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,7 @@ dist/ # Temporary sample files sample*.py + + +# vscode +.vscode/ \ No newline at end of file diff --git a/thingsdb/client/client.py b/thingsdb/client/client.py index 666c6eb..95a21ae 100644 --- a/thingsdb/client/client.py +++ b/thingsdb/client/client.py @@ -58,7 +58,7 @@ def __init__( self._scope = '@t' # default to thingsdb scope self._pool_idx = 0 self._reconnecting = False - self._rooms: dict[int, RoomBase] = dict() + self._rooms: dict[str, dict[int, RoomBase]] = defaultdict(dict) self._rooms_lock = asyncio.Lock() if ssl is True: @@ -74,7 +74,10 @@ def get_rooms(self) -> tuple[RoomBase, ...]: Returns: a tuple with unique Room instances. """ - return tuple(self._rooms.values()) + total: list[RoomBase] = [] + for rooms in self._rooms.values(): + total.extend(rooms.values()) + return tuple(total) def get_event_loop(self) -> asyncio.AbstractEventLoop: """Can be used to get the event loop. @@ -632,10 +635,19 @@ async def _connect(self, timeout: int | None = 5): self._pool_idx += 1 self._pool_idx %= len(self._pool) - async def _on_room(self, room_id: int, pkg: Package): + async def _on_room(self, scope: str | None, room_id: int, pkg: Package): async with self._rooms_lock: try: - room = self._rooms[room_id] + if scope is None: + # Fallback for ThingsDB < 1.8.6 + for rooms in self._rooms.values(): + room = rooms.get(room_id) + if room is not None: + break + else: + raise KeyError + else: + room = self._rooms[scope][room_id] except KeyError: logging.warning( f'Got an event (tp:{pkg.tp}) for room Id {room_id} but ' @@ -658,7 +670,8 @@ def _on_event(self, pkg: Package): return try: - room_id = pkg.data['id'] + scope = pkg.data.get('scope') # ThingsDB < 1.8.6 + room_id: int = pkg.data['id'] except KeyError: if pkg.tp == Proto.ON_WARN: warn = pkg.data @@ -669,7 +682,7 @@ def _on_event(self, pkg: Package): logging.warning( f'Unexpected event: tp:{pkg.tp} data:{pkg.data}') else: - asyncio.ensure_future(self._on_room(room_id, pkg), + asyncio.ensure_future(self._on_room(scope, room_id, pkg), loop=self.get_event_loop()) def _on_connection_lost(self, protocol: asyncio.Protocol, exc: Exception): @@ -728,9 +741,10 @@ async def _rejoin(self): # re-arrange the rooms per scope to combine joins in a less requests scopes: dict[str, list[int]] = defaultdict(list) - for room in self._rooms.values(): - if room.id and room.scope: - scopes[room.scope].append(room.id) + for rooms in self._rooms.values(): + for room in rooms.values(): + if room.id and room.scope: + scopes[room.scope].append(room.id) # join request per scope, each for one or more rooms await asyncio.gather(*[ diff --git a/thingsdb/room/roombase.py b/thingsdb/room/roombase.py index 8aca047..d9e2d72 100644 --- a/thingsdb/room/roombase.py +++ b/thingsdb/room/roombase.py @@ -6,6 +6,8 @@ from ..client import Client from ..client.protocol import Proto from ..util.is_name import is_name +from ..util.cnscope import cnscope +from ..util.fcscope import fcscope if TYPE_CHECKING: from ..client.package import Package @@ -47,11 +49,13 @@ def __init__( """ self._client: Client | None = None self._id = room - self._scope = scope + self._key = None + self._scope = \ + None if scope is None else f'@collection:{cnscope(scope)}' self._wait_join: bool | asyncio.Future[None] | None = False @property - def id(self): + def id(self) -> int | None: return self._id if isinstance(self._id, int) else None @property @@ -72,7 +76,8 @@ async def no_join(self, client: Client): """ async with client._rooms_lock: if self._scope is None: - self._scope = client.get_default_scope() + scope = client.get_default_scope() + self._scope = f'@collection:{cnscope(scope)}' self._client = client if isinstance(self._id, str): @@ -96,6 +101,7 @@ async def no_join(self, client: Client): '!is_err(try(room(id)));', id=id, scope=self._scope) if not is_room: raise TypeError(f'Id `{id}` is not a room') + assert isinstance(id, int) self._id = id async def join(self, client: Client, wait: float | None = 60.0): @@ -116,7 +122,9 @@ async def join(self, client: Client, wait: float | None = 60.0): # is created inside the dict *before* the on_join is called. async with client._rooms_lock: if self._scope is None: - self._scope = client.get_default_scope() + scope = client.get_default_scope() + self._scope = f'@collection:{cnscope(scope)}' + self._client = client if isinstance(self._id, str): @@ -141,19 +149,23 @@ async def join(self, client: Client, wait: float | None = 60.0): f'the room Id has been returned using the ThingsDB ' f'code `{code}` using scope `{self._scope}`') self._id = id + assert isinstance(self._id, int) else: assert isinstance(self._id, int) res = await client._join(self._id, scope=self._scope) if res[0] is None: raise LookupError(f'room with Id {self._id} not found') - if self._id in client._rooms: - prev = client._rooms[self._id] + self._scope = fcscope(self._scope) + try: + prev = client._rooms[self._scope][self._id] logging.warning( - f'Room Id {self._id} is previously registered by {prev} ' + f'Room Id {self._key} is previously registered by {prev} ' f'and will be overwritten with {self}') + except KeyError: + pass - client._rooms[self._id] = self + client._rooms[self._scope][self._id] = self self.on_init() if wait: self._wait_join = asyncio.Future() @@ -255,12 +267,16 @@ def _on_join(self, _data: Any) -> asyncio.Task[None] | None: asyncio.ensure_future(self.on_join(), loop=loop) def _on_stop(self, func: Callable[[], None]): - try: - assert self._client - if isinstance(self._id, int): - del self._client._rooms[self._id] - except KeyError: - pass + assert self._client + if isinstance(self._id, int) and isinstance(self._scope, str): + rooms = self._client._rooms.get(self._scope) + if rooms is not None: + try: + del rooms[self._id] + except KeyError: + pass + if not rooms: + del self._client._rooms[self._scope] func() def _emit_handler(self, data: _TEvent): diff --git a/thingsdb/util/fcscope.py b/thingsdb/util/fcscope.py new file mode 100644 index 0000000..dc4cb39 --- /dev/null +++ b/thingsdb/util/fcscope.py @@ -0,0 +1,9 @@ +from .cnscope import cnscope + + +def fcscope(scope: str) -> str: + if scope.startswith("@collection:"): + return scope + + cn = cnscope(scope) + return f'@collection:{cn}' diff --git a/thingsdb/version.py b/thingsdb/version.py index e398332..96e3ce8 100644 --- a/thingsdb/version.py +++ b/thingsdb/version.py @@ -1 +1 @@ -__version__ = '1.3.2' +__version__ = '1.4.0'