Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cloudsmith_cli/cli/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
auth,
check,
copy,
credential_helper,
delete,
dependencies,
docs,
Expand Down
31 changes: 31 additions & 0 deletions cloudsmith_cli/cli/commands/credential_helper/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""
Comment thread
BartoszBlizniak marked this conversation as resolved.
Credential helper commands for Cloudsmith.

This module provides credential helper commands for package managers
that follow their respective credential helper protocols.
"""

import click

from ..main import main
from .docker import docker as docker_cmd


@click.group()
def credential_helper():
"""
Credential helpers for package managers.

These commands provide credentials for package managers like Docker.
They are typically called by wrapper binaries
(e.g., docker-credential-cloudsmith) or used directly for debugging.

Examples:
# Test Docker credential helper
$ echo "docker.cloudsmith.io" | cloudsmith credential-helper docker
"""


credential_helper.add_command(docker_cmd, name="docker")

main.add_command(credential_helper, name="credential-helper")
83 changes: 83 additions & 0 deletions cloudsmith_cli/cli/commands/credential_helper/docker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""
Comment thread
BartoszBlizniak marked this conversation as resolved.
Docker credential helper command.

Implements the Docker credential helper protocol for Cloudsmith registries.

See: https://github.com/docker/docker-credential-helpers
"""

import json
import sys

import click

from ....credential_helpers.docker import get_credentials
from ...decorators import common_api_auth_options, resolve_credentials


@click.command()
@common_api_auth_options
@resolve_credentials
def docker(opts):
Comment thread
BartoszBlizniak marked this conversation as resolved.
Comment thread
BartoszBlizniak marked this conversation as resolved.
"""
Docker credential helper for Cloudsmith registries.

Reads a Docker registry server URL from stdin and returns credentials in JSON format.
This command implements the 'get' operation of the Docker credential helper protocol.

Only provides credentials for Cloudsmith Docker registries: `*.cloudsmith.io`
and any custom domains configured for the organization (requires CLOUDSMITH_ORG
and a valid API key/token).

Input (stdin):
Server URL as plain text (e.g., "docker.cloudsmith.io")

Output (stdout):
JSON: {"Username": "token", "Secret": "<cloudsmith-token>"}

Exit codes:
0: Success
1: Error (no credentials available, not a Cloudsmith registry, etc.)

Examples:
# Manual testing
$ echo "docker.cloudsmith.io" | cloudsmith credential-helper docker
{"Username":"token","Secret":"eyJ0eXAiOiJKV1Qi..."}

# Called by Docker via wrapper
$ echo "docker.cloudsmith.io" | docker-credential-cloudsmith get
{"Username":"token","Secret":"eyJ0eXAiOiJKV1Qi..."}

Environment variables:
CLOUDSMITH_API_KEY: API key for authentication (optional)
CLOUDSMITH_ORG: Organization slug (required for custom domain support)
"""
try:
server_url = sys.stdin.read().strip()

if not server_url:
click.echo("Error: No server URL provided on stdin", err=True)
sys.exit(1)

credentials = get_credentials(
server_url,
credential=opts.credential,
session=opts.session,
api_host=opts.api_host or "https://api.cloudsmith.io",
)

if not credentials:
click.echo(
"Error: Unable to retrieve credentials. "
"Provide credentials via the CLOUDSMITH_API_KEY environment variable, "
"credentials.ini, the system keyring, or an OIDC service. "
"Verify current authentication with `cloudsmith whoami --verbose`.",
err=True,
)
Comment thread
BartoszBlizniak marked this conversation as resolved.
sys.exit(1)

click.echo(json.dumps(credentials))

except OSError as e:
click.echo(f"Error: {e}", err=True)
sys.exit(1)
131 changes: 131 additions & 0 deletions cloudsmith_cli/cli/tests/commands/test_credential_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""Tests for the `cloudsmith credential-helper docker` command."""

import io
import json
from unittest.mock import patch

import pytest

from ....cli.commands.credential_helper.docker import docker
from ....core.credentials.models import CredentialResult
from ....credential_helpers.custom_domains import get_cache_path, write_cache
from ....credential_helpers.docker.credentials import (
get_credentials as helper_get_credentials,
)
from ....credential_helpers.docker.wrapper import main as docker_wrapper_main


class TestDockerCredentialHelper:
"""Test suite for the Docker credential helper CLI command."""

def test_get_credentials_for_cloudsmith_io(self, runner):
"""`*.cloudsmith.io` URLs should return credentials JSON on stdout."""
fake_creds = {"Username": "token", "Secret": "k_abc"}

with patch(
"cloudsmith_cli.cli.commands.credential_helper.docker.get_credentials"
) as mock_get:
mock_get.return_value = fake_creds
result = runner.invoke(
docker, input="docker.cloudsmith.io", catch_exceptions=False
)

assert result.exit_code == 0
# stdout should contain the serialized JSON exactly as produced by the command.
assert json.dumps(fake_creds) in result.stdout
mock_get.assert_called_once()
# The first positional argument to get_credentials is the server URL.
called_args, _called_kwargs = mock_get.call_args
assert called_args[0] == "docker.cloudsmith.io"

def test_refuses_non_cloudsmith_domain(self, runner):
"""Non-Cloudsmith URLs should exit 1 with an error message on stderr."""
with patch(
"cloudsmith_cli.cli.commands.credential_helper.docker.get_credentials"
) as mock_get:
mock_get.return_value = None
result = runner.invoke(
docker, input="evil.example.com", catch_exceptions=False
)

assert result.exit_code == 1
assert "Unable to retrieve credentials" in result.output
mock_get.assert_called_once()

def test_empty_stdin_exits_1(self, runner):
"""Empty stdin should exit 1 with a descriptive error on stderr."""
with patch(
"cloudsmith_cli.cli.commands.credential_helper.docker.get_credentials"
) as mock_get:
result = runner.invoke(docker, input="", catch_exceptions=False)

assert result.exit_code == 1
assert "No server URL provided" in result.output
# get_credentials should never be called when there is no URL.
mock_get.assert_not_called()

def test_custom_domain_with_cached_response(self, tmp_path, monkeypatch):
"""A cached custom-domain entry should authorise credential issuance.

This exercises the helper-level `get_credentials` (not the click command)
so the on-disk custom-domain cache lookup runs end to end. The click
command's wiring is covered by the other tests in this class.
"""
# Point the cache base at a per-test temp directory.
monkeypatch.setattr(
"cloudsmith_cli.credential_helpers.custom_domains.get_default_config_path",
lambda: str(tmp_path),
)
# is_cloudsmith_domain reads CLOUDSMITH_ORG from the environment.
monkeypatch.setenv("CLOUDSMITH_ORG", "acme")

# Seed the cache file at the path the helper will read from.
cache_path = get_cache_path("acme")
assert cache_path.parent.exists(), "get_cache_path should create the dir"
write_cache(cache_path, ["docker.acme.com"])

credential = CredentialResult(api_key="k_xyz", source_name="test")

# Sentinel session; the cache hit means no HTTP call should be made.
class _BoomSession:
def get(self, *_args, **_kwargs):
raise AssertionError(
"Network call attempted despite a valid custom-domain cache"
)

result = helper_get_credentials(
"docker.acme.com",
credential=credential,
session=_BoomSession(),
api_host="https://api.cloudsmith.io",
)

assert result == {"Username": "token", "Secret": "k_xyz"}

@pytest.mark.parametrize("operation", ["store", "erase"])
def test_wrapper_read_only_operations_are_noops(
self, operation, monkeypatch, capsys
):
"""Docker's write operations should succeed without storing anything."""
monkeypatch.setattr("sys.argv", ["docker-credential-cloudsmith", operation])
monkeypatch.setattr("sys.stdin", io.StringIO('{"ServerURL":"example.com"}'))

with pytest.raises(SystemExit) as exc:
docker_wrapper_main()

assert exc.value.code == 0
output = capsys.readouterr()
assert output.out == ""
assert output.err == ""

def test_wrapper_list_returns_empty_json(self, monkeypatch, capsys):
"""Docker's list operation should return an empty credential object."""
monkeypatch.setattr("sys.argv", ["docker-credential-cloudsmith", "list"])

with pytest.raises(SystemExit) as exc:
docker_wrapper_main()

assert exc.value.code == 0
output = capsys.readouterr()
assert output.out == "{}\n"
assert output.err == ""
35 changes: 35 additions & 0 deletions cloudsmith_cli/core/cache_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright 2026 Cloudsmith Ltd
"""Shared utilities for on-disk credential and cache storage."""

from __future__ import annotations

import json
import os
import tempfile
from typing import Any


def atomic_write_json(path: str | os.PathLike, data: Any, *, mode: int = 0o600) -> None:
"""Atomically write JSON to a file with restrictive permissions.

Writes to a sibling temp file, fsyncs, sets mode, then renames over the
destination. Concurrent readers never see a partial file. Temp file is
removed on error. Caller is responsible for ensuring the parent directory
exists.
"""
dest = os.fspath(path)
parent = os.path.dirname(dest) or "."
tmp_fd, tmp_path = tempfile.mkstemp(dir=parent, prefix=".tmp_", suffix=".json")
try:
with os.fdopen(tmp_fd, "w", encoding="utf-8") as f:
json.dump(data, f)
f.flush()
os.fsync(f.fileno())
os.chmod(tmp_path, mode)
os.replace(tmp_path, dest)
except (OSError, TypeError, ValueError):
try:
os.unlink(tmp_path)
except OSError:
pass
raise
6 changes: 3 additions & 3 deletions cloudsmith_cli/core/credentials/oidc/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,13 @@ def _store_in_keyring(api_host: str, org: str, service_slug: str, data: dict) ->

def _store_on_disk(api_host: str, org: str, service_slug: str, data: dict) -> None:
"""Store token on disk."""
from ...cache_utils import atomic_write_json

cache_dir = _get_cache_dir()
cache_file = os.path.join(cache_dir, _cache_key(api_host, org, service_slug))

try:
fd = os.open(cache_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
with os.fdopen(fd, "w") as f:
json.dump(data, f)
atomic_write_json(cache_file, data)
logger.debug(
"Stored OIDC token on disk (expires_at=%s)", data.get("expires_at")
)
Expand Down
6 changes: 6 additions & 0 deletions cloudsmith_cli/credential_helpers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""
Comment thread
BartoszBlizniak marked this conversation as resolved.
Credential helpers for various package managers.

This package provides credential helper implementations for Docker, pip, npm, etc.
Each helper follows its respective package manager's credential helper protocol.
"""
Loading
Loading