diff --git a/cloudsmith_cli/cli/commands/__init__.py b/cloudsmith_cli/cli/commands/__init__.py index ed80eac5..634add8f 100644 --- a/cloudsmith_cli/cli/commands/__init__.py +++ b/cloudsmith_cli/cli/commands/__init__.py @@ -4,6 +4,7 @@ auth, check, copy, + credential_helper, delete, dependencies, docs, diff --git a/cloudsmith_cli/cli/commands/credential_helper/__init__.py b/cloudsmith_cli/cli/commands/credential_helper/__init__.py new file mode 100644 index 00000000..10727bf4 --- /dev/null +++ b/cloudsmith_cli/cli/commands/credential_helper/__init__.py @@ -0,0 +1,31 @@ +""" +Credential helper commands for Cloudsmith. + +This module provides credential helper commands for package managers +that follow their respective credential helper protocols. +""" + +import click + +from ..main import main +from .docker import docker as docker_cmd + + +@click.group() +def credential_helper(): + """ + Credential helpers for package managers. + + These commands provide credentials for package managers like Docker. + They are typically called by wrapper binaries + (e.g., docker-credential-cloudsmith) or used directly for debugging. + + Examples: + # Test Docker credential helper + $ echo "docker.cloudsmith.io" | cloudsmith credential-helper docker + """ + + +credential_helper.add_command(docker_cmd, name="docker") + +main.add_command(credential_helper, name="credential-helper") diff --git a/cloudsmith_cli/cli/commands/credential_helper/docker.py b/cloudsmith_cli/cli/commands/credential_helper/docker.py new file mode 100644 index 00000000..3026e34b --- /dev/null +++ b/cloudsmith_cli/cli/commands/credential_helper/docker.py @@ -0,0 +1,83 @@ +""" +Docker credential helper command. + +Implements the Docker credential helper protocol for Cloudsmith registries. + +See: https://github.com/docker/docker-credential-helpers +""" + +import json +import sys + +import click + +from ....credential_helpers.docker import get_credentials +from ...decorators import common_api_auth_options, resolve_credentials + + +@click.command() +@common_api_auth_options +@resolve_credentials +def docker(opts): + """ + Docker credential helper for Cloudsmith registries. + + Reads a Docker registry server URL from stdin and returns credentials in JSON format. + This command implements the 'get' operation of the Docker credential helper protocol. + + Only provides credentials for Cloudsmith Docker registries: `*.cloudsmith.io` + and any custom domains configured for the organization (requires CLOUDSMITH_ORG + and a valid API key/token). + + Input (stdin): + Server URL as plain text (e.g., "docker.cloudsmith.io") + + Output (stdout): + JSON: {"Username": "token", "Secret": ""} + + Exit codes: + 0: Success + 1: Error (no credentials available, not a Cloudsmith registry, etc.) + + Examples: + # Manual testing + $ echo "docker.cloudsmith.io" | cloudsmith credential-helper docker + {"Username":"token","Secret":"eyJ0eXAiOiJKV1Qi..."} + + # Called by Docker via wrapper + $ echo "docker.cloudsmith.io" | docker-credential-cloudsmith get + {"Username":"token","Secret":"eyJ0eXAiOiJKV1Qi..."} + + Environment variables: + CLOUDSMITH_API_KEY: API key for authentication (optional) + CLOUDSMITH_ORG: Organization slug (required for custom domain support) + """ + try: + server_url = sys.stdin.read().strip() + + if not server_url: + click.echo("Error: No server URL provided on stdin", err=True) + sys.exit(1) + + credentials = get_credentials( + server_url, + credential=opts.credential, + session=opts.session, + api_host=opts.api_host or "https://api.cloudsmith.io", + ) + + if not credentials: + click.echo( + "Error: Unable to retrieve credentials. " + "Provide credentials via the CLOUDSMITH_API_KEY environment variable, " + "credentials.ini, the system keyring, or an OIDC service. " + "Verify current authentication with `cloudsmith whoami --verbose`.", + err=True, + ) + sys.exit(1) + + click.echo(json.dumps(credentials)) + + except OSError as e: + click.echo(f"Error: {e}", err=True) + sys.exit(1) diff --git a/cloudsmith_cli/cli/tests/commands/test_credential_helper.py b/cloudsmith_cli/cli/tests/commands/test_credential_helper.py new file mode 100644 index 00000000..9c91465e --- /dev/null +++ b/cloudsmith_cli/cli/tests/commands/test_credential_helper.py @@ -0,0 +1,131 @@ +"""Tests for the `cloudsmith credential-helper docker` command.""" + +import io +import json +from unittest.mock import patch + +import pytest + +from ....cli.commands.credential_helper.docker import docker +from ....core.credentials.models import CredentialResult +from ....credential_helpers.custom_domains import get_cache_path, write_cache +from ....credential_helpers.docker.credentials import ( + get_credentials as helper_get_credentials, +) +from ....credential_helpers.docker.wrapper import main as docker_wrapper_main + + +class TestDockerCredentialHelper: + """Test suite for the Docker credential helper CLI command.""" + + def test_get_credentials_for_cloudsmith_io(self, runner): + """`*.cloudsmith.io` URLs should return credentials JSON on stdout.""" + fake_creds = {"Username": "token", "Secret": "k_abc"} + + with patch( + "cloudsmith_cli.cli.commands.credential_helper.docker.get_credentials" + ) as mock_get: + mock_get.return_value = fake_creds + result = runner.invoke( + docker, input="docker.cloudsmith.io", catch_exceptions=False + ) + + assert result.exit_code == 0 + # stdout should contain the serialized JSON exactly as produced by the command. + assert json.dumps(fake_creds) in result.stdout + mock_get.assert_called_once() + # The first positional argument to get_credentials is the server URL. + called_args, _called_kwargs = mock_get.call_args + assert called_args[0] == "docker.cloudsmith.io" + + def test_refuses_non_cloudsmith_domain(self, runner): + """Non-Cloudsmith URLs should exit 1 with an error message on stderr.""" + with patch( + "cloudsmith_cli.cli.commands.credential_helper.docker.get_credentials" + ) as mock_get: + mock_get.return_value = None + result = runner.invoke( + docker, input="evil.example.com", catch_exceptions=False + ) + + assert result.exit_code == 1 + assert "Unable to retrieve credentials" in result.output + mock_get.assert_called_once() + + def test_empty_stdin_exits_1(self, runner): + """Empty stdin should exit 1 with a descriptive error on stderr.""" + with patch( + "cloudsmith_cli.cli.commands.credential_helper.docker.get_credentials" + ) as mock_get: + result = runner.invoke(docker, input="", catch_exceptions=False) + + assert result.exit_code == 1 + assert "No server URL provided" in result.output + # get_credentials should never be called when there is no URL. + mock_get.assert_not_called() + + def test_custom_domain_with_cached_response(self, tmp_path, monkeypatch): + """A cached custom-domain entry should authorise credential issuance. + + This exercises the helper-level `get_credentials` (not the click command) + so the on-disk custom-domain cache lookup runs end to end. The click + command's wiring is covered by the other tests in this class. + """ + # Point the cache base at a per-test temp directory. + monkeypatch.setattr( + "cloudsmith_cli.credential_helpers.custom_domains.get_default_config_path", + lambda: str(tmp_path), + ) + # is_cloudsmith_domain reads CLOUDSMITH_ORG from the environment. + monkeypatch.setenv("CLOUDSMITH_ORG", "acme") + + # Seed the cache file at the path the helper will read from. + cache_path = get_cache_path("acme") + assert cache_path.parent.exists(), "get_cache_path should create the dir" + write_cache(cache_path, ["docker.acme.com"]) + + credential = CredentialResult(api_key="k_xyz", source_name="test") + + # Sentinel session; the cache hit means no HTTP call should be made. + class _BoomSession: + def get(self, *_args, **_kwargs): + raise AssertionError( + "Network call attempted despite a valid custom-domain cache" + ) + + result = helper_get_credentials( + "docker.acme.com", + credential=credential, + session=_BoomSession(), + api_host="https://api.cloudsmith.io", + ) + + assert result == {"Username": "token", "Secret": "k_xyz"} + + @pytest.mark.parametrize("operation", ["store", "erase"]) + def test_wrapper_read_only_operations_are_noops( + self, operation, monkeypatch, capsys + ): + """Docker's write operations should succeed without storing anything.""" + monkeypatch.setattr("sys.argv", ["docker-credential-cloudsmith", operation]) + monkeypatch.setattr("sys.stdin", io.StringIO('{"ServerURL":"example.com"}')) + + with pytest.raises(SystemExit) as exc: + docker_wrapper_main() + + assert exc.value.code == 0 + output = capsys.readouterr() + assert output.out == "" + assert output.err == "" + + def test_wrapper_list_returns_empty_json(self, monkeypatch, capsys): + """Docker's list operation should return an empty credential object.""" + monkeypatch.setattr("sys.argv", ["docker-credential-cloudsmith", "list"]) + + with pytest.raises(SystemExit) as exc: + docker_wrapper_main() + + assert exc.value.code == 0 + output = capsys.readouterr() + assert output.out == "{}\n" + assert output.err == "" diff --git a/cloudsmith_cli/core/cache_utils.py b/cloudsmith_cli/core/cache_utils.py new file mode 100644 index 00000000..1e65bdef --- /dev/null +++ b/cloudsmith_cli/core/cache_utils.py @@ -0,0 +1,35 @@ +# Copyright 2026 Cloudsmith Ltd +"""Shared utilities for on-disk credential and cache storage.""" + +from __future__ import annotations + +import json +import os +import tempfile +from typing import Any + + +def atomic_write_json(path: str | os.PathLike, data: Any, *, mode: int = 0o600) -> None: + """Atomically write JSON to a file with restrictive permissions. + + Writes to a sibling temp file, fsyncs, sets mode, then renames over the + destination. Concurrent readers never see a partial file. Temp file is + removed on error. Caller is responsible for ensuring the parent directory + exists. + """ + dest = os.fspath(path) + parent = os.path.dirname(dest) or "." + tmp_fd, tmp_path = tempfile.mkstemp(dir=parent, prefix=".tmp_", suffix=".json") + try: + with os.fdopen(tmp_fd, "w", encoding="utf-8") as f: + json.dump(data, f) + f.flush() + os.fsync(f.fileno()) + os.chmod(tmp_path, mode) + os.replace(tmp_path, dest) + except (OSError, TypeError, ValueError): + try: + os.unlink(tmp_path) + except OSError: + pass + raise diff --git a/cloudsmith_cli/core/credentials/oidc/cache.py b/cloudsmith_cli/core/credentials/oidc/cache.py index 183a9615..4136543f 100644 --- a/cloudsmith_cli/core/credentials/oidc/cache.py +++ b/cloudsmith_cli/core/credentials/oidc/cache.py @@ -183,13 +183,13 @@ def _store_in_keyring(api_host: str, org: str, service_slug: str, data: dict) -> def _store_on_disk(api_host: str, org: str, service_slug: str, data: dict) -> None: """Store token on disk.""" + from ...cache_utils import atomic_write_json + cache_dir = _get_cache_dir() cache_file = os.path.join(cache_dir, _cache_key(api_host, org, service_slug)) try: - fd = os.open(cache_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) - with os.fdopen(fd, "w") as f: - json.dump(data, f) + atomic_write_json(cache_file, data) logger.debug( "Stored OIDC token on disk (expires_at=%s)", data.get("expires_at") ) diff --git a/cloudsmith_cli/credential_helpers/__init__.py b/cloudsmith_cli/credential_helpers/__init__.py new file mode 100644 index 00000000..6e6715ce --- /dev/null +++ b/cloudsmith_cli/credential_helpers/__init__.py @@ -0,0 +1,6 @@ +""" +Credential helpers for various package managers. + +This package provides credential helper implementations for Docker, pip, npm, etc. +Each helper follows its respective package manager's credential helper protocol. +""" diff --git a/cloudsmith_cli/credential_helpers/common.py b/cloudsmith_cli/credential_helpers/common.py new file mode 100644 index 00000000..a9e0f787 --- /dev/null +++ b/cloudsmith_cli/credential_helpers/common.py @@ -0,0 +1,93 @@ +""" +Shared utilities for credential helpers. + +Provides domain checking used by all credential helpers. +""" + +import logging +import os + +logger = logging.getLogger(__name__) + + +def extract_hostname(url): + """ + Extract bare hostname from any URL format. + + Handles protocols, sparse+ prefix, ports, paths, and trailing slashes. + + Args: + url: URL in any format (e.g., "sparse+https://cargo.cloudsmith.io/org/repo/") + + Returns: + str: Lowercase hostname (e.g., "cargo.cloudsmith.io") + """ + if not url: + return "" + + normalized = url.lower().strip() + + # Remove sparse+ prefix (Cargo) + if normalized.startswith("sparse+"): + normalized = normalized[7:] + + # Remove protocol + if "://" in normalized: + normalized = normalized.split("://", 1)[1] + + # Remove userinfo (user@host) + if "@" in normalized.split("/")[0]: + normalized = normalized.split("@", 1)[1] + + # Extract hostname (before first / or :) + hostname = normalized.split("/")[0].split(":")[0] + + return hostname + + +def is_cloudsmith_domain( + url, session=None, api_key=None, auth_type="api_key", api_host=None +): + """ + Check if a URL points to a Cloudsmith service. + + Checks standard *.cloudsmith.io domains first (no auth needed). + If not a standard domain, queries the Cloudsmith API for custom domains. + + Args: + url: URL or hostname to check + session: Pre-configured requests.Session with proxy/SSL settings + api_key: API key/token for authenticating custom domain lookups + auth_type: "api_key" (X-Api-Key header) or "bearer" (Authorization: Bearer) + api_host: Cloudsmith API host URL + + Returns: + bool: True if this is a Cloudsmith domain + """ + hostname = extract_hostname(url) + if not hostname: + return False + + # Standard Cloudsmith domains — no auth needed + if hostname == "cloudsmith.io" or hostname.endswith(".cloudsmith.io"): + return True + + # Custom domains require org + auth + org = os.environ.get("CLOUDSMITH_ORG", "").strip() + if not org: + return False + + if not api_key: + return False + + from .custom_domains import get_custom_domains_for_org + + custom_domains = get_custom_domains_for_org( + org, + session=session, + api_key=api_key, + auth_type=auth_type, + api_host=api_host, + ) + + return hostname in {d.lower() for d in custom_domains if isinstance(d, str)} diff --git a/cloudsmith_cli/credential_helpers/custom_domains.py b/cloudsmith_cli/credential_helpers/custom_domains.py new file mode 100644 index 00000000..027cb510 --- /dev/null +++ b/cloudsmith_cli/credential_helpers/custom_domains.py @@ -0,0 +1,198 @@ +""" +Helper for discovering Cloudsmith custom domains. + +This module provides functions to fetch custom domains from the Cloudsmith API +for use in credential helpers. Results are cached on the filesystem. +""" + +import json +import logging +import time +from pathlib import Path + +import requests + +from ..cli.config import get_default_config_path +from ..core.cache_utils import atomic_write_json + +logger = logging.getLogger(__name__) + +# Cache custom domains for 1 hour +CACHE_TTL_SECONDS = 3600 + + +def get_cache_dir() -> Path: + """ + Get the cache directory for custom domains. + """ + cache_dir = Path(get_default_config_path()) / "custom_domains_cache" + cache_dir.mkdir(mode=0o700, parents=True, exist_ok=True) + return cache_dir + + +def get_cache_path(org: str) -> Path: + """ + Get the cache file path for an organization's custom domains. + + Args: + org: Organization slug + + Returns: + Path to cache file + """ + cache_dir = get_cache_dir() + safe_org = "".join(c if c.isalnum() or c in "-_" else "_" for c in org) + return cache_dir / f"{safe_org}.json" + + +def is_cache_valid(cache_path: Path) -> bool: + """ + Check if a cache file exists and is still valid. + + Args: + cache_path: Path to cache file + + Returns: + bool: True if cache exists and hasn't expired + """ + if not cache_path.exists(): + return False + + try: + mtime = cache_path.stat().st_mtime + age = time.time() - mtime + return age < CACHE_TTL_SECONDS + except OSError: + return False + + +def read_cache(cache_path: Path) -> list[str] | None: + """ + Read custom domains from cache file. + + Args: + cache_path: Path to cache file + + Returns: + List of domain strings or None if cache invalid/missing + """ + if not is_cache_valid(cache_path): + return None + + try: + with open(cache_path, encoding="utf-8") as f: + data = json.load(f) + if isinstance(data, dict) and "domains" in data: + domains = data["domains"] + if isinstance(domains, list): + logger.debug( + "Read %d domains from cache: %s", len(domains), cache_path + ) + return domains + except (OSError, json.JSONDecodeError) as exc: + logger.debug("Failed to read cache %s: %s", cache_path, exc) + + return None + + +def write_cache(cache_path: Path, domains: list[str]) -> None: + """Write custom domains to cache file.""" + data = { + "domains": domains, + "cached_at": time.time(), + } + try: + atomic_write_json(cache_path, data) + logger.debug("Wrote %d domains to cache: %s", len(domains), cache_path) + except OSError as exc: + logger.debug("Failed to write cache %s: %s", cache_path, exc) + + +def get_custom_domains_for_org( # pylint: disable=too-many-return-statements + org: str, + session=None, + api_key: str | None = None, + auth_type: str = "api_key", + api_host: str | None = None, +) -> list[str]: + """ + Fetch custom domains for a Cloudsmith organization. + + Results are cached on the filesystem for 1 hour to avoid excessive API calls. + + Args: + org: Organization slug + session: Pre-configured requests.Session with proxy/SSL settings. + If None, a plain requests session is used. + api_key: Optional API key/token for authentication + auth_type: "api_key" (uses X-Api-Key header) or "bearer" (uses Authorization: Bearer) + api_host: Cloudsmith API host URL. Defaults to https://api.cloudsmith.io. + + Returns: + List of custom domain strings (e.g., ['docker.customer.com', 'dl.customer.com']) + Empty list if API call fails or org has no custom domains + """ + cache_path = get_cache_path(org) + cached_domains = read_cache(cache_path) + if cached_domains is not None: + logger.debug("Using cached custom domains for %s", org) + return cached_domains + + logger.debug("Fetching custom domains from API for %s", org) + + try: + if session is None: + session = requests.Session() + + headers = {} + if api_key: + if auth_type == "bearer": + headers["Authorization"] = f"Bearer {api_key}" + else: + headers["X-Api-Key"] = api_key + + host = api_host or "https://api.cloudsmith.io" + url = f"{host}/orgs/{org}/custom-domains/" + + response = session.get(url, headers=headers, timeout=10) + + if response.status_code in (401, 403): + logger.debug( + "Custom domains API requires auth - assuming no custom domains for %s", + org, + ) + return [] # Don't cache 401/403 - might work later with auth + + if response.status_code == 404: + logger.debug("Organization %s not found or has no custom domains", org) + write_cache(cache_path, []) # Cache empty result to avoid repeated 404s + return [] + + if response.status_code != 200: + logger.debug( + "Failed to fetch custom domains for %s: HTTP %d", + org, + response.status_code, + ) + return [] + + data = response.json() + + # Expected format: [{"host": "docker.customer.com", ...}, ...] + domains = [] + if isinstance(data, list): + for item in data: + if isinstance(item, dict): + host = item.get("host") + if isinstance(host, str) and host: + domains.append(host) + + logger.debug("Fetched %d custom domains for %s", len(domains), org) + + write_cache(cache_path, domains) + + return domains + + except (requests.RequestException, ValueError) as exc: + logger.debug("Error fetching custom domains: %s", exc) + return [] diff --git a/cloudsmith_cli/credential_helpers/docker/__init__.py b/cloudsmith_cli/credential_helpers/docker/__init__.py new file mode 100644 index 00000000..a96d42d1 --- /dev/null +++ b/cloudsmith_cli/credential_helpers/docker/__init__.py @@ -0,0 +1,3 @@ +from .credentials import get_credentials + +__all__ = ["get_credentials"] diff --git a/cloudsmith_cli/credential_helpers/docker/credentials.py b/cloudsmith_cli/credential_helpers/docker/credentials.py new file mode 100644 index 00000000..0180a6ed --- /dev/null +++ b/cloudsmith_cli/credential_helpers/docker/credentials.py @@ -0,0 +1,39 @@ +""" +Docker credential helper logic for Cloudsmith. + +This module provides functions for retrieving credentials for Docker registries +using the existing Cloudsmith credential provider chain (OIDC, API keys, config, keyring). +""" + +from ..common import is_cloudsmith_domain + + +def get_credentials(server_url, credential=None, session=None, api_host=None): + """ + Get credentials for a Cloudsmith Docker registry. + + Verifies the URL is a Cloudsmith registry (including custom domains) + and returns credentials if available. + + Args: + server_url: The Docker registry server URL + credential: Pre-resolved CredentialResult from the provider chain + session: Pre-configured requests.Session with proxy/SSL settings + api_host: Cloudsmith API host URL + + Returns: + dict: Credentials with 'Username' and 'Secret' keys, or None + """ + if not credential or not credential.api_key: + return None + + if not is_cloudsmith_domain( + server_url, + session=session, + api_key=credential.api_key, + auth_type=getattr(credential, "auth_type", "api_key"), + api_host=api_host, + ): + return None + + return {"Username": "token", "Secret": credential.api_key} diff --git a/cloudsmith_cli/credential_helpers/docker/wrapper.py b/cloudsmith_cli/credential_helpers/docker/wrapper.py new file mode 100644 index 00000000..cc115455 --- /dev/null +++ b/cloudsmith_cli/credential_helpers/docker/wrapper.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python +""" +Wrapper for docker-credential-cloudsmith. + +This is the entry point binary that Docker calls. It delegates to the main +cloudsmith credential-helper docker command for credential lookups and handles +read-only protocol operations locally. + +See: https://github.com/docker/docker-credential-helpers + +Configure in ~/.docker/config.json: + { + "credHelpers": { + "docker.cloudsmith.io": "cloudsmith" + } + } +""" +import subprocess +import sys + + +def main(): + """ + Docker credential helper wrapper. + + Docker calls this with the operation as argv[1]: + - get: Retrieve credentials + - store: Store credentials (not supported) + - erase: Erase credentials (not supported) + - list: List credentials (not supported) + + The helper is read-only, so only 'get' returns Cloudsmith credentials. + """ + if len(sys.argv) < 2: + print( + "Error: Missing operation argument. " + "Usage: docker-credential-cloudsmith ", + file=sys.stderr, + ) + sys.exit(1) + + operation = sys.argv[1] + + if operation == "get": + try: + result = subprocess.run( + ["cloudsmith", "credential-helper", "docker"], + stdin=sys.stdin, + capture_output=False, + check=False, + ) + sys.exit(result.returncode) + except FileNotFoundError: + print( + "Error: 'cloudsmith' command not found. " + "Make sure cloudsmith-cli is installed.", + file=sys.stderr, + ) + sys.exit(1) + elif operation in ("store", "erase"): + try: + if not sys.stdin.isatty(): + sys.stdin.read() + except (OSError, ValueError): + pass + sys.exit(0) + elif operation == "list": + print("{}") + sys.exit(0) + else: + print( + f"Error: Unknown operation '{operation}'. " + "Valid operations: get, store, erase, list", + file=sys.stderr, + ) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/setup.py b/setup.py index 83dd3034..f98cbf8b 100644 --- a/setup.py +++ b/setup.py @@ -74,7 +74,10 @@ def get_long_description(): ], }, entry_points={ - "console_scripts": ["cloudsmith=cloudsmith_cli.cli.commands.main:main"] + "console_scripts": [ + "cloudsmith=cloudsmith_cli.cli.commands.main:main", + "docker-credential-cloudsmith=cloudsmith_cli.credential_helpers.docker.wrapper:main", + ] }, keywords=["cloudsmith", "cli", "devops"], classifiers=[