Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d5b488f
docs(types): add UUID Arrow type mapping design spec (PLT-1162)
kurodo3[bot] Jun 11, 2026
1cca130
docs(plans): add UUID Arrow type implementation plan (PLT-1162)
kurodo3[bot] Jun 11, 2026
045803c
feat(types): add UUID_ARROW_TYPE and UUID_STRUCT_ARROW_TYPE constants…
kurodo3[bot] Jun 11, 2026
eaa27c2
fix(types): make UUID_ARROW_TYPE and UUID_STRUCT_ARROW_TYPE truly laz…
kurodo3[bot] Jun 11, 2026
400c000
feat(semantic-types): add UUIDStructConverter for uuid.UUID Arrow str…
kurodo3[bot] Jun 11, 2026
52b9695
fix(semantic-types): add missing protocol methods to UUIDStructConver…
kurodo3[bot] Jun 11, 2026
cad5ccf
feat(semantic-types): register UUIDStructConverter in default semanti…
kurodo3[bot] Jun 11, 2026
503b730
fix(hashing): UUIDHandler returns bytes instead of str, consistent wi…
kurodo3[bot] Jun 11, 2026
7b3101f
docs(hashing): fix stale UUIDHandler docstring; add missing test docs…
kurodo3[bot] Jun 11, 2026
e353f21
fix(arrow): update system UUID column Arrow types to pa.binary(16) (P…
kurodo3[bot] Jun 11, 2026
e391011
fix(core): correct stale comment and rid_val bytes fallback after UUI…
kurodo3[bot] Jun 11, 2026
19a977a
fix(core): UUID generation sites produce bytes instead of str (PLT-1162)
kurodo3[bot] Jun 11, 2026
7301a8e
fix(databases): map PostgreSQL uuid columns to UUID_ARROW_TYPE (pa.bi…
kurodo3[bot] Jun 11, 2026
0c922b7
docs(types): add PLT-1615 cross-reference in UUID Arrow type spec (PL…
kurodo3[bot] Jun 11, 2026
13a1d5a
refactor(types): address PR review — drop UUID constants, uuid.UUID d…
kurodo3[bot] Jun 12, 2026
74f36d8
fix(uuid): migrate record IDs to pa.large_binary(); add ContentHash.t…
kurodo3[bot] Jun 12, 2026
abea56e
fix(ci): resolve two CI failures — bytes record IDs and PostgreSQL UU…
kurodo3[bot] Jun 12, 2026
6155f75
refactor(databases): coerce str record IDs to bytes at method boundaries
kurodo3[bot] Jun 12, 2026
282afb9
refactor(databases): revert str coercion from protocol — concrete imp…
kurodo3[bot] Jun 12, 2026
246e5d5
refactor(databases): rename _utils.py → utils.py
kurodo3[bot] Jun 12, 2026
e0bbc37
refactor(databases): use pa.binary(16) for UUID/record-id columns, dr…
kurodo3[bot] Jun 12, 2026
87ae2e9
refactor(datagrams): rename datagram_id → datagram_uuid for clarity
kurodo3[bot] Jun 12, 2026
d0cbe5a
fix(sources,databases): resolve Copilot review issues — ambiguous UUI…
kurodo3[bot] Jun 12, 2026
b9a6934
revert(sources): restore original UUIDv5 name format in _make_record_id
kurodo3[bot] Jun 12, 2026
62c8f88
refactor(datagrams,semantic-types): address eywalker review — naming,…
kurodo3[bot] Jun 12, 2026
2fab1b8
refactor(semantic-types): remove add_prefix from hash_struct_dict
kurodo3[bot] Jun 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/orcapod/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,5 @@
"streams",
"types",
]


7 changes: 4 additions & 3 deletions src/orcapod/core/data_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
import re
import sys
import uuid
from abc import abstractmethod
from collections.abc import Callable, Iterable, Sequence
import typing
Expand Down Expand Up @@ -549,13 +550,13 @@ def combine(*components: tuple[str, ...]) -> str:
inner_parsed = [":".join(component) for component in components]
return "::".join(inner_parsed)

record_id = str(uuid7())
source_info = {k: combine(self.uri, (record_id,), (k,)) for k in output_data}
new_record_uuid = uuid.UUID(bytes=uuid7().bytes)
source_info = {k: combine(self.uri, (new_record_uuid.hex,), (k,)) for k in output_data}

return Data(
output_data,
source_info=source_info,
record_id=record_id,
record_uuid=new_record_uuid,
python_schema=self.output_data_schema,
data_context=self.data_context,
)
Expand Down
35 changes: 22 additions & 13 deletions src/orcapod/core/datagrams/datagram.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from __future__ import annotations

import logging
import uuid
from collections.abc import Collection, Iterator, Mapping
from typing import TYPE_CHECKING, Any, Self, cast

Expand Down Expand Up @@ -66,18 +67,18 @@ def __init__(
data: Mapping[str, DataValue] | pa.Table | pa.RecordBatch,
python_schema: SchemaLike | None = None,
meta_info: Mapping[str, DataValue] | None = None,
record_id: str | None = None,
record_uuid: uuid.UUID | None = None,
data_context: str | contexts.DataContext | None = None,
config: OrcapodConfig | None = None,
) -> None:
if isinstance(data, pa.RecordBatch):
data = pa.Table.from_batches([data])

if isinstance(data, pa.Table):
self._init_from_table(data, meta_info, data_context, record_id)
self._init_from_table(data, meta_info, data_context, record_uuid)
else:
self._init_from_dict(
data, python_schema, meta_info, data_context, record_id
data, python_schema, meta_info, data_context, record_uuid
)

def _init_from_dict(
Expand All @@ -86,7 +87,7 @@ def _init_from_dict(
python_schema: SchemaLike | None,
meta_info: Mapping[str, DataValue] | None,
data_context: str | contexts.DataContext | None,
record_id: str | None,
record_uuid: uuid.UUID | None,
) -> None:
data_columns: dict[str, DataValue] = {}
meta_columns: dict[str, DataValue] = {}
Expand All @@ -102,7 +103,7 @@ def _init_from_dict(
data_columns[k] = v

super().__init__(data_context=data_context or extracted_context)
self._datagram_id = record_id
self._datagram_uuid = record_uuid

self._data_dict: dict[str, DataValue] | None = data_columns
self._data_table: pa.Table | None = None
Expand Down Expand Up @@ -134,7 +135,7 @@ def _init_from_table(
table: pa.Table,
meta_info: Mapping[str, DataValue] | None,
data_context: str | contexts.DataContext | None,
record_id: str | None,
record_uuid: uuid.UUID | None,
) -> None:
if len(table) != 1:
raise ValueError(
Expand All @@ -150,7 +151,7 @@ def _init_from_table(
context_cols = [c for c in table.column_names if c == constants.CONTEXT_KEY]

super().__init__(data_context=data_context)
self._datagram_id = record_id
self._datagram_uuid = record_uuid

meta_col_names = [
c for c in table.column_names if c.startswith(constants.META_PREFIX)
Expand Down Expand Up @@ -426,11 +427,19 @@ def identity_structure(self) -> Any:
return self._ensure_data_table()

@property
def datagram_id(self) -> str:
"""Return (or lazily generate) the datagram's unique ID."""
if self._datagram_id is None:
self._datagram_id = str(uuid7())
return self._datagram_id
def datagram_uuid(self) -> uuid.UUID:
"""Return (or lazily generate) the datagram's unique UUID."""
if self._datagram_uuid is None:
# uuid_utils is used here because it provides UUIDv7 (time-ordered,
# monotonic) generation, which is not available in the stdlib before
# Python 3.14. However, uuid_utils.UUID and stdlib uuid.UUID are
# distinct types that do not compare equal even for identical bit
# patterns. We therefore normalise to stdlib uuid.UUID immediately
# so that the public API always returns a consistent type and
# equality / hashing work correctly regardless of how a UUID was
# originally produced.
self._datagram_uuid = uuid.UUID(bytes=uuid7().bytes)
return self._datagram_uuid

@property
def converter(self) -> TypeConverterProtocol:
Expand Down Expand Up @@ -763,7 +772,7 @@ def copy(self, include_cache: bool = True, preserve_id: bool = True) -> Self:
new_d._cached_int_hash = None

# Datagram identity
new_d._datagram_id = self._datagram_id if preserve_id else None
new_d._datagram_uuid = self._datagram_uuid if preserve_id else None

# Data representations — Arrow table is immutable so a ref copy is fine
new_d._data_table = self._data_table
Expand Down
13 changes: 7 additions & 6 deletions src/orcapod/core/datagrams/tag_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from __future__ import annotations

import logging
import uuid
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any, Self

Expand Down Expand Up @@ -61,7 +62,7 @@ def __init__(
meta_info: "Mapping[str, DataValue] | None" = None,
python_schema: "SchemaLike | None" = None,
data_context: "str | contexts.DataContext | None" = None,
record_id: "str | None" = None,
record_uuid: "uuid.UUID | None" = None,
**kwargs,
) -> None:
import pyarrow as _pa
Expand All @@ -78,7 +79,7 @@ def __init__(
data,
meta_info=meta_info,
data_context=data_context,
record_id=record_id,
record_uuid=record_uuid,
**kwargs,
)
sys_tag_cols = [
Expand Down Expand Up @@ -114,7 +115,7 @@ def __init__(
python_schema=python_schema,
meta_info=meta_info,
data_context=data_context,
record_id=record_id,
record_uuid=record_uuid,
**kwargs,
)

Expand Down Expand Up @@ -256,7 +257,7 @@ def __init__(
source_info: "Mapping[str, str | None] | None" = None,
python_schema: "SchemaLike | None" = None,
data_context: "str | contexts.DataContext | None" = None,
record_id: "str | None" = None,
record_uuid: "uuid.UUID | None" = None,
**kwargs,
) -> None:
import pyarrow as _pa
Expand Down Expand Up @@ -287,7 +288,7 @@ def __init__(
data_table,
meta_info=meta_info,
data_context=data_context,
record_id=record_id,
record_uuid=record_uuid,
**kwargs,
)
si_table = prefixed_tables[constants.SOURCE_PREFIX]
Expand Down Expand Up @@ -315,7 +316,7 @@ def __init__(
python_schema=python_schema,
meta_info=meta_info,
data_context=data_context,
record_id=record_id,
record_uuid=record_uuid,
**kwargs,
)
self._source_info = {**contained_source_info, **(source_info or {})}
Expand Down
36 changes: 19 additions & 17 deletions src/orcapod/core/nodes/function_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import asyncio
import logging
import uuid
from collections.abc import Iterator
from typing import TYPE_CHECKING, Any, Literal, NamedTuple, cast

Expand Down Expand Up @@ -696,7 +697,7 @@ def __init__(

# stream-level caching state
self._cached_output_datas: dict[
str, tuple[TagProtocol, DataProtocol | None]
bytes, tuple[TagProtocol, DataProtocol | None]
] = {}
self._cached_output_table: "pa.Table | None" = None
self._cached_content_hash_column: "pa.Array | None" = None
Expand Down Expand Up @@ -1042,7 +1043,7 @@ def execute(
ctx_obs.on_node_start(node_label, node_hash, tag_schema=tag_schema)

# Collect upstream entries and resolve entry_ids
upstream_entries: list[tuple[TagProtocol, DataProtocol, str]] = [
upstream_entries: list[tuple[TagProtocol, DataProtocol, bytes]] = [
(tag, data, self.compute_pipeline_entry_id(tag, data))
for tag, data in input_stream.iter_data()
]
Expand Down Expand Up @@ -1125,7 +1126,7 @@ def _process_data_internal(
self.add_pipeline_record(
tag,
data,
data_record_id=output_data.datagram_id,
data_record_id=output_data.datagram_uuid,
computed=result_computed,
)
else:
Expand All @@ -1142,8 +1143,8 @@ def _process_data_internal(
return tag_out, output_data

def get_cached_results(
self, entry_ids: list[str]
) -> dict[str, tuple[TagProtocol, DataProtocol]]:
self, entry_ids: list[bytes]
) -> dict[bytes, tuple[TagProtocol, DataProtocol]]:
"""Public cache façade: return already-computed results for the given entry IDs.

Serves hits directly from the in-memory cache (``_cached_output_datas``).
Expand Down Expand Up @@ -1213,7 +1214,7 @@ async def _async_process_data_internal(
self.add_pipeline_record(
tag,
data,
data_record_id=output_data.datagram_id,
data_record_id=output_data.datagram_uuid,
computed=result_computed,
)
else:
Expand All @@ -1233,7 +1234,7 @@ async def _async_process_data_internal(

def compute_pipeline_entry_id(
self, tag: TagProtocol, input_data: DataProtocol
) -> str:
) -> bytes:
"""Compute a unique pipeline entry ID from tag + system tags + input data hash.

``NODE_CONTENT_HASH_COL`` is always included so that two runs processing
Expand All @@ -1246,8 +1247,9 @@ def compute_pipeline_entry_id(
input_data: The input data.

Returns:
A hash string uniquely identifying this (tag, input_data, node run)
combination.
Method-prefixed raw bytes (``b"{method}:{digest}"``) uniquely
identifying this (tag, input_data, node run) combination. Suitable
for storage in a ``pa.large_binary()`` column.
"""
tag_with_hash = (
tag.as_table(columns={"system_tags": True})
Expand All @@ -1260,13 +1262,13 @@ def compute_pipeline_entry_id(
pa.array([self.content_hash().to_string()], type=pa.large_string()),
)
)
return self.data_context.arrow_hasher.hash_table(tag_with_hash).to_string()
return self.data_context.arrow_hasher.hash_table(tag_with_hash).to_prefixed_digest()

def add_pipeline_record(
self,
tag: TagProtocol,
input_data: DataProtocol,
data_record_id: str,
data_record_id: uuid.UUID,
computed: bool,
skip_cache_lookup: bool = False,
) -> None:
Expand Down Expand Up @@ -1309,7 +1311,7 @@ def add_pipeline_record(
meta_table = pa.table(
{
constants.DATA_RECORD_ID: pa.array(
[data_record_id], type=pa.large_string()
[data_record_id.bytes], type=pa.large_binary()
),
constants.NODE_CONTENT_HASH_COL: pa.array(
[self.content_hash().to_string()], type=pa.large_string()
Expand Down Expand Up @@ -1435,7 +1437,7 @@ def as_source(self):

def _fetch_joined_records(
self,
entry_ids: list[str] | None = None,
entry_ids: list[bytes] | None = None,
) -> _JoinedRecords | None:
"""Internal primitive: fetch both DBs, content-hash-filter, and inner-join.

Expand Down Expand Up @@ -1503,8 +1505,8 @@ def _fetch_joined_records(

def _load_cached_entries(
self,
entry_ids: list[str] | None = None,
) -> "dict[str, tuple[TagProtocol, DataProtocol]]":
entry_ids: list[bytes] | None = None,
) -> "dict[bytes, tuple[TagProtocol, DataProtocol]]":
"""DB loader: fetch ``(tag, data)`` pairs from the pipeline and result databases.

Calls ``_fetch_joined_records`` to obtain the raw joined table, then
Expand Down Expand Up @@ -1565,7 +1567,7 @@ def _load_cached_entries(
data_table = joined.drop([c for c in drop_cols if c in joined.column_names])
stream = ArrowTableStream(data_table, tag_columns=tag_keys)

loaded: dict[str, tuple[TagProtocol, DataProtocol]] = {}
loaded: dict[bytes, tuple[TagProtocol, DataProtocol]] = {}
for eid, (tag, data) in zip(entry_ids_col, stream.iter_data()):
loaded[eid] = (tag, data)
return loaded
Expand Down Expand Up @@ -1758,7 +1760,7 @@ async def async_execute(
if loaded:
self._cached_output_table = None
self._cached_content_hash_column = None
cached_by_entry_id: dict[str, tuple[TagProtocol, DataProtocol]] = dict(loaded)
cached_by_entry_id: dict[bytes, tuple[TagProtocol, DataProtocol]] = dict(loaded)

# Phase 2: drive output from input channel — cached or compute
async def _process_one_db(
Expand Down
6 changes: 3 additions & 3 deletions src/orcapod/core/operators/join.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ def _predict_system_tag_schema(self, *streams: StreamProtocol) -> Schema:
system_tag_fields: dict[str, type] = {}
for idx, stream in enumerate(ordered_streams):
stream_tag_schema, _ = stream.output_schema(columns={"system_tags": True})
for col_name in stream_tag_schema:
for col_name, col_type in stream_tag_schema.items():
if col_name.startswith(constants.SYSTEM_TAG_PREFIX):
new_name = (
f"{col_name}{constants.BLOCK_SEPARATOR}"
f"{stream.pipeline_hash().to_hex(n_char)}:{idx}"
)
system_tag_fields[new_name] = str
system_tag_fields[new_name] = col_type

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we include tests to ensure that removing the forced conversion to str does not introduce a new error?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — added a test. It iterates over all system-tag columns in the predicted schema after a two-way join and asserts that the record_id and source_id columns each map to the type declared by their upstream stream, verifying that `col_type` is carried through unchanged.

return Schema(system_tag_fields)

def static_process(self, *streams: StreamProtocol) -> StreamProtocol:
Expand Down Expand Up @@ -699,7 +699,7 @@ def _sort_merged_system_tags(merged_sys: dict) -> dict:
sid_val = merged_sys.get(fmap.get(sid_field, ""))
rid_val = merged_sys.get(fmap.get(rid_field, ""))
vals = {ft: merged_sys[k] for ft, k in fmap.items()}
entries.append(((sid_val or "", rid_val or ""), vals))
entries.append(((sid_val or "", rid_val or b""), vals))

entries.sort(key=lambda e: e[0])

Expand Down
4 changes: 2 additions & 2 deletions src/orcapod/core/operators/merge_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,13 @@ def _predict_system_tag_schema(
for stream, orig_idx in canonical:
canon_pos = canonical.index((stream, orig_idx))
stream_tag_schema, _ = stream.output_schema(columns={"system_tags": True})
for col_name in stream_tag_schema:
for col_name, col_type in stream_tag_schema.items():
if col_name.startswith(constants.SYSTEM_TAG_PREFIX):
new_name = (
f"{col_name}{constants.BLOCK_SEPARATOR}"
f"{stream.pipeline_hash().to_hex(n_char)}:{canon_pos}"
)
system_tag_fields[new_name] = str
system_tag_fields[new_name] = col_type
return Schema(system_tag_fields)

def binary_static_process(
Expand Down
Loading
Loading