Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion src/cachekit/backends/memcached/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import time
from typing import Any, Optional

from cachekit.backends.errors import BackendError, BackendErrorType
from cachekit.backends.memcached.config import MAX_MEMCACHED_TTL, MemcachedBackendConfig
from cachekit.backends.memcached.error_handler import classify_memcached_error

Expand Down Expand Up @@ -108,12 +109,32 @@ def set(self, key: str, value: bytes, ttl: Optional[int] = None) -> None:
Raises:
BackendError: If Memcached operation fails.
"""
# Guard client-side against oversized items. Memcached rejects items over its
# item-size limit (default 1 MiB), but with noreply that rejection is never read —
# the call appears to succeed and the entry is silently never cached. Fail loudly
# instead, so the caller can compress, shard, or switch backends.
max_size = self._config.max_item_size_bytes
if max_size and len(value) > max_size:
raise BackendError(
message=(
f"Value for key {key!r} is {len(value)} bytes, which exceeds the Memcached "
f"max item size of {max_size} bytes. Memcached cannot store it. Enable "
f"compression, use a larger-payload backend (Redis/SaaS/File), or raise both "
f"the server's -I limit and CACHEKIT_MEMCACHED_MAX_ITEM_SIZE_BYTES."
),
error_type=BackendErrorType.PERMANENT,
operation="set",
key=key,
)

expire = 0
if ttl is not None and ttl > 0:
expire = min(ttl, MAX_MEMCACHED_TTL)

try:
self._client.set(self._prefixed_key(key), value, expire=expire)
# noreply=False so an oversized/error reply from the server is read and surfaced
# rather than silently swallowed (HashClient defaults to noreply=True).
self._client.set(self._prefixed_key(key), value, expire=expire, noreply=False)
except Exception as exc:
raise classify_memcached_error(exc, operation="set", key=key) from exc

Expand Down
10 changes: 10 additions & 0 deletions src/cachekit/backends/memcached/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ class MemcachedBackendConfig(BaseBackendConfig):
default="",
description="Optional prefix for all cache keys",
)
max_item_size_bytes: int = Field(
default=1024 * 1024,
ge=0,
description=(
"Reject values larger than this BEFORE sending to Memcached (0 disables the check). "
"Memcached's default item-size limit is 1 MiB (server -I flag); oversized items are "
"rejected by the server, and with noreply that rejection is silent — so cachekit "
"guards client-side and fails loudly. Raise this only if the server's -I is raised too."
),
)

@field_validator("servers", mode="after")
@classmethod
Expand Down
11 changes: 10 additions & 1 deletion src/cachekit/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

from __future__ import annotations

from typing import Any, Optional
from typing import Any, Literal, Optional

from pydantic import (
Field,
Expand Down Expand Up @@ -116,6 +116,15 @@ class CachekitConfig(BaseSettings):
le=9,
description="Zlib compression level (1-9, where 9 is highest compression)",
)
arrow_compression: Literal["zstd", "lz4", "none"] = Field(
default="zstd",
description=(
"Arrow IPC compression codec for DataFrame caching (ArrowSerializer, compression='auto'). "
"'zstd'/'lz4' shrink the stored payload but must be decompressed into the heap on read. "
"'none' stores uncompressed Arrow IPC, which enables zero-copy memory-mapped reads "
"(lowest read memory) at the cost of a larger payload. Env: CACHEKIT_ARROW_COMPRESSION."
),
)
retry_on_timeout: bool = Field(
default=True,
description="Whether to retry operations on timeout",
Expand Down
17 changes: 17 additions & 0 deletions src/cachekit/l1_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,23 @@ def put(
# Estimate size
size = self._estimate_size(value)

# Reject entries that cannot fit even in an empty cache. Storing one would push L1
# permanently over its budget, and a multi-GB serialized DataFrame envelope is a
# direct OOM vector (it would also evict every other useful entry on the way in).
# The value is still available from L2; we only decline to mirror it in L1. If a
# smaller entry for this key was cached, drop it so L1 stops serving the stale value.
if size > self.max_memory_bytes:
with self._lock:
if key in self._cache:
self._remove_entry(key)
logger.debug(
"Skipping L1 cache for key %s - value %d bytes exceeds L1 budget %d bytes (served from L2 only)",
key,
size,
self.max_memory_bytes,
)
return

with self._lock:
# Check if key already exists
if key in self._cache:
Expand Down
Loading
Loading