Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ jobs:
strategy:
max-parallel: 4
matrix:
python-version: ["3.10", "3.11", "3.12", "3.13"]
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v6

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
uses: actions/setup-python@v6
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
Expand Down
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ dist/

# Temporary sample files
sample*.py


# vscode
.vscode/
32 changes: 23 additions & 9 deletions thingsdb/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def __init__(
self._scope = '@t' # default to thingsdb scope
self._pool_idx = 0
self._reconnecting = False
self._rooms: dict[int, RoomBase] = dict()
self._rooms: dict[str, dict[int, RoomBase]] = defaultdict(dict)
self._rooms_lock = asyncio.Lock()

if ssl is True:
Expand All @@ -74,7 +74,10 @@ def get_rooms(self) -> tuple[RoomBase, ...]:
Returns:
a tuple with unique Room instances.
"""
return tuple(self._rooms.values())
total: list[RoomBase] = []
for rooms in self._rooms.values():
total.extend(rooms.values())
return tuple(total)

def get_event_loop(self) -> asyncio.AbstractEventLoop:
"""Can be used to get the event loop.
Expand Down Expand Up @@ -632,10 +635,19 @@ async def _connect(self, timeout: int | None = 5):
self._pool_idx += 1
self._pool_idx %= len(self._pool)

async def _on_room(self, room_id: int, pkg: Package):
async def _on_room(self, scope: str | None, room_id: int, pkg: Package):
async with self._rooms_lock:
try:
room = self._rooms[room_id]
if scope is None:
# Fallback for ThingsDB < 1.8.6
for rooms in self._rooms.values():
room = rooms.get(room_id)
if room is not None:
break
else:
raise KeyError
else:
room = self._rooms[scope][room_id]
except KeyError:
logging.warning(
f'Got an event (tp:{pkg.tp}) for room Id {room_id} but '
Expand All @@ -658,7 +670,8 @@ def _on_event(self, pkg: Package):
return

try:
room_id = pkg.data['id']
scope = pkg.data.get('scope') # ThingsDB < 1.8.6
room_id: int = pkg.data['id']
except KeyError:
if pkg.tp == Proto.ON_WARN:
warn = pkg.data
Expand All @@ -669,7 +682,7 @@ def _on_event(self, pkg: Package):
logging.warning(
f'Unexpected event: tp:{pkg.tp} data:{pkg.data}')
else:
asyncio.ensure_future(self._on_room(room_id, pkg),
asyncio.ensure_future(self._on_room(scope, room_id, pkg),
loop=self.get_event_loop())

def _on_connection_lost(self, protocol: asyncio.Protocol, exc: Exception):
Expand Down Expand Up @@ -728,9 +741,10 @@ async def _rejoin(self):

# re-arrange the rooms per scope to combine joins in a less requests
scopes: dict[str, list[int]] = defaultdict(list)
for room in self._rooms.values():
if room.id and room.scope:
scopes[room.scope].append(room.id)
for rooms in self._rooms.values():
for room in rooms.values():
if room.id and room.scope:
scopes[room.scope].append(room.id)

# join request per scope, each for one or more rooms
await asyncio.gather(*[
Expand Down
44 changes: 30 additions & 14 deletions thingsdb/room/roombase.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from ..client import Client
from ..client.protocol import Proto
from ..util.is_name import is_name
from ..util.cnscope import cnscope
from ..util.fcscope import fcscope
if TYPE_CHECKING:
from ..client.package import Package

Expand Down Expand Up @@ -47,11 +49,13 @@ def __init__(
"""
self._client: Client | None = None
self._id = room
self._scope = scope
self._key = None
self._scope = \
None if scope is None else f'@collection:{cnscope(scope)}'
self._wait_join: bool | asyncio.Future[None] | None = False

@property
def id(self):
def id(self) -> int | None:
return self._id if isinstance(self._id, int) else None

@property
Expand All @@ -72,7 +76,8 @@ async def no_join(self, client: Client):
"""
async with client._rooms_lock:
if self._scope is None:
self._scope = client.get_default_scope()
scope = client.get_default_scope()
self._scope = f'@collection:{cnscope(scope)}'
self._client = client

if isinstance(self._id, str):
Expand All @@ -96,6 +101,7 @@ async def no_join(self, client: Client):
'!is_err(try(room(id)));', id=id, scope=self._scope)
if not is_room:
raise TypeError(f'Id `{id}` is not a room')
assert isinstance(id, int)
self._id = id

async def join(self, client: Client, wait: float | None = 60.0):
Expand All @@ -116,7 +122,9 @@ async def join(self, client: Client, wait: float | None = 60.0):
# is created inside the dict *before* the on_join is called.
async with client._rooms_lock:
if self._scope is None:
self._scope = client.get_default_scope()
scope = client.get_default_scope()
self._scope = f'@collection:{cnscope(scope)}'

self._client = client

if isinstance(self._id, str):
Expand All @@ -141,19 +149,23 @@ async def join(self, client: Client, wait: float | None = 60.0):
f'the room Id has been returned using the ThingsDB '
f'code `{code}` using scope `{self._scope}`')
self._id = id
assert isinstance(self._id, int)
else:
assert isinstance(self._id, int)
res = await client._join(self._id, scope=self._scope)
if res[0] is None:
raise LookupError(f'room with Id {self._id} not found')

if self._id in client._rooms:
prev = client._rooms[self._id]
self._scope = fcscope(self._scope)
try:
prev = client._rooms[self._scope][self._id]
logging.warning(
f'Room Id {self._id} is previously registered by {prev} '
f'Room Id {self._key} is previously registered by {prev} '
f'and will be overwritten with {self}')
except KeyError:
pass

client._rooms[self._id] = self
client._rooms[self._scope][self._id] = self
self.on_init()
if wait:
self._wait_join = asyncio.Future()
Expand Down Expand Up @@ -255,12 +267,16 @@ def _on_join(self, _data: Any) -> asyncio.Task[None] | None:
asyncio.ensure_future(self.on_join(), loop=loop)

def _on_stop(self, func: Callable[[], None]):
try:
assert self._client
if isinstance(self._id, int):
del self._client._rooms[self._id]
except KeyError:
pass
assert self._client
if isinstance(self._id, int) and isinstance(self._scope, str):
rooms = self._client._rooms.get(self._scope)
if rooms is not None:
try:
del rooms[self._id]
except KeyError:
pass
if not rooms:
del self._client._rooms[self._scope]
func()

def _emit_handler(self, data: _TEvent):
Expand Down
9 changes: 9 additions & 0 deletions thingsdb/util/fcscope.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from .cnscope import cnscope


def fcscope(scope: str) -> str:
if scope.startswith("@collection:"):
return scope

cn = cnscope(scope)
return f'@collection:{cn}'
2 changes: 1 addition & 1 deletion thingsdb/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.3.2'
__version__ = '1.4.0'