diff --git a/.gitignore b/.gitignore index 43877db..a3c10b3 100644 --- a/.gitignore +++ b/.gitignore @@ -162,5 +162,13 @@ secrets.json venv venv3 venv27 +.venv vcert/example_cert.py -credentials \ No newline at end of file +credentials + +# Generated certificate material from example scripts / live tests (never commit) +/cert.pem +/cert.key +/key.pem +/chain.pem +/renewed_cert.pem \ No newline at end of file diff --git a/examples/ngts/get_cert_ngts.py b/examples/ngts/get_cert_ngts.py new file mode 100644 index 0000000..699b75d --- /dev/null +++ b/examples/ngts/get_cert_ngts.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python3 +# +# Copyright Venafi, Inc. and CyberArk Software Ltd. ("CyberArk") +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from vcert import (CertificateRequest, venafi_connection, VenafiPlatform) +import string +import random +import logging +from os import environ + +logging.basicConfig(level=logging.INFO) +logging.getLogger("urllib3").setLevel(logging.ERROR) + + +def main(): + # Get credentials from environment variables. + # NGTS (Palo Alto Networks Next-Gen Trust Security) authenticates with Strata Cloud Manager + # OAuth2 client credentials issued by a service account. Both the API base URL and the token + # URL differ per environment (dev/prod), so both must be supplied. + url = environ.get('NGTS_URL') # NGTS API base URL (e.g. https://api.sase.paloaltonetworks.com/ngts) + token_url = environ.get('NGTS_TOKEN_URL') # OAuth2 token endpoint (different FQDN, env-specific) + client_id = environ.get('NGTS_CLIENT_ID') # Service-account client id + client_secret = environ.get('NGTS_CLIENT_SECRET') # Service-account client secret + tsg_id = environ.get('NGTS_TSG_ID') # Tenant service group id (used to build the scope) + scope = environ.get('NGTS_SCOPE') # Optional: a ready "tsg_id:" scope + zone = environ.get('NGTS_ZONE') # Certificate Issuing Template alias (CIT-only) + + # The connection is chosen automatically: when token_url + client_id + client_secret are + # present, an NGTS connection is built. The platform can also be set explicitly: + # conn = venafi_connection(platform=VenafiPlatform.NGTS, ...) + conn = venafi_connection(url=url, token_url=token_url, client_id=client_id, client_secret=client_secret, + tsg_id=tsg_id, scope=scope) + + # Build a Certificate request + request = CertificateRequest(common_name=f"{random_word(10)}.venafi.example.com") + request.san_dns = ["www.dns.venafi.example.com", "ww1.dns.venafi.example.com"] + + # Request the certificate. + conn.request_cert(request, zone) + # Wait for the certificate to be retrieved (until ISSUED or timeout, 180s by default). + cert = conn.retrieve_cert(request) + + # Print the certificate + print(cert.full_chain) + # Save it into a file + with open("./cert.pem", "w") as f: + f.write(cert.full_chain) + + +def random_word(length): + letters = string.ascii_lowercase + return ''.join(random.choice(letters) for i in range(length)) + + +if __name__ == '__main__': + main() diff --git a/tests/test_env.py b/tests/test_env.py index e9f5aab..393b014 100644 --- a/tests/test_env.py +++ b/tests/test_env.py @@ -40,5 +40,14 @@ TPP_SSH_CADN = environ.get('TPP_SSH_CADN') +# NGTS (Palo Alto Networks Next-Gen Trust Security) +NGTS_URL = environ.get('NGTS_URL') +NGTS_TOKEN_URL = environ.get('NGTS_TOKEN_URL') +NGTS_CLIENT_ID = environ.get('NGTS_CLIENT_ID') +NGTS_CLIENT_SECRET = environ.get('NGTS_CLIENT_SECRET') +NGTS_TSG_ID = environ.get('NGTS_TSG_ID') +NGTS_SCOPE = environ.get('NGTS_SCOPE') +NGTS_ZONE = environ.get('NGTS_ZONE') + if RANDOM_DOMAIN and not isinstance(RANDOM_DOMAIN, text_type): RANDOM_DOMAIN = RANDOM_DOMAIN.decode() diff --git a/tests/test_local_methods.py b/tests/test_local_methods.py index 2bc8cd1..1490a3b 100644 --- a/tests/test_local_methods.py +++ b/tests/test_local_methods.py @@ -16,13 +16,16 @@ # import json import unittest +from unittest import mock from cryptography import x509 from cryptography.hazmat.backends import default_backend from assets import POLICY_CLOUD1, POLICY_TPP1, EXAMPLE_CSR, EXAMPLE_CHAIN from vcert import (CloudConnection, KeyType, TPPConnection, CertificateRequest, ZoneConfig, CertField, FakeConnection, - logger) + NGTSConnection, logger) +from vcert.connection_ngts import _parse_ngts_zone +from vcert.errors import ClientBadData, ServerUnexptedBehavior from vcert.pem import parse_pem, Certificate pkcs12_enc_cert = """-----BEGIN CERTIFICATE----- @@ -346,3 +349,96 @@ def test_pkcs12_plain_pk(self): cert = Certificate(cert=pkcs12_plain_cert, chain=chain, key=pkcs12_plain_pk) output = cert.as_pkcs12() log.info(f"PKCS12 created successfully:\n{output}") + + # -- NGTS (offline) ----------------------------------------------------------------------- + + @staticmethod + def _ngts_conn(**kwargs): + defaults = dict( + client_id="cid", + client_secret="csecret", + token_url="https://auth.example.com/oauth2/token", + tsg_id="123", + url="https://api.sase.paloaltonetworks.com/ngts", + ) + defaults.update(kwargs) + return NGTSConnection(**defaults) + + def test_parse_ngts_zone(self): + # CIT-only: the whole (trimmed) string is the template alias, no backslash split. + self.assertEqual(_parse_ngts_zone("MyTemplate"), "MyTemplate") + self.assertEqual(_parse_ngts_zone(" MyTemplate "), "MyTemplate") + # A backslash is NOT a separator for NGTS - it is part of the alias. + self.assertEqual(_parse_ngts_zone("App\\CIT"), "App\\CIT") + with self.assertRaises(ClientBadData): + _parse_ngts_zone("") + with self.assertRaises(ClientBadData): + _parse_ngts_zone(None) + + def test_ngts_scope_from_tsg_id(self): + conn = self._ngts_conn() + self.assertEqual(conn._scope, "tsg_id:123") + + def test_ngts_explicit_scope_wins(self): + conn = self._ngts_conn(scope="tsg_id:999", tsg_id=None) + self.assertEqual(conn._scope, "tsg_id:999") + + def test_ngts_requires_url(self): + with self.assertRaises(ClientBadData): + self._ngts_conn(url=None) + + def test_ngts_requires_token_url_without_access_token(self): + with self.assertRaises(ClientBadData): + self._ngts_conn(token_url=None) + + def test_ngts_requires_scope_or_tsg_id(self): + with self.assertRaises(ClientBadData): + self._ngts_conn(tsg_id=None, scope=None) + + def test_ngts_get_access_token(self): + conn = self._ngts_conn() + fake_resp = mock.MagicMock() + fake_resp.status_code = 200 + fake_resp.json.return_value = { + 'access_token': 'abc.def.ghi', + 'token_type': 'Bearer', + 'expires_in': 900, + 'scope': 'tsg_id:123', + } + with mock.patch('vcert.connection_ngts.requests.post', return_value=fake_resp) as post: + token = conn._get_access_token() + + self.assertEqual(token, 'abc.def.ghi') + self.assertEqual(conn._access_token, 'abc.def.ghi') + self.assertIsNotNone(conn._token_expires) + # client_id/client_secret go through HTTP Basic auth; the body carries grant_type + scope. + _, kwargs = post.call_args + self.assertEqual(kwargs['auth'], ('cid', 'csecret')) + self.assertEqual(kwargs['data']['grant_type'], 'client_credentials') + self.assertEqual(kwargs['data']['scope'], 'tsg_id:123') + + def test_ngts_access_token_rejects_non_bearer(self): + conn = self._ngts_conn() + fake_resp = mock.MagicMock() + fake_resp.status_code = 200 + fake_resp.json.return_value = {'access_token': 'x', 'token_type': 'mac', 'expires_in': 900} + with mock.patch('vcert.connection_ngts.requests.post', return_value=fake_resp): + with self.assertRaises(ServerUnexptedBehavior): + conn._get_access_token() + + def test_ngts_auth_header_is_bearer(self): + conn = self._ngts_conn(access_token='pre.issued.token', token_url=None) + headers = conn._auth_headers('application/json') + self.assertEqual(headers['Authorization'], 'Bearer pre.issued.token') + self.assertNotIn('tppl-api-key', headers) + + def test_ngts_get_sends_bearer_header(self): + conn = self._ngts_conn(access_token='pre.issued.token', token_url=None) + fake_resp = mock.MagicMock() + fake_resp.status_code = 200 + fake_resp.headers = {'content-type': 'application/json'} + fake_resp.json.return_value = {} + with mock.patch('vcert.connection_ngts.requests.get', return_value=fake_resp) as get: + conn._get("v1/certificateissuingtemplates") + _, kwargs = get.call_args + self.assertEqual(kwargs['headers']['Authorization'], 'Bearer pre.issued.token') diff --git a/tests/test_ngts.py b/tests/test_ngts.py new file mode 100644 index 0000000..b215e5c --- /dev/null +++ b/tests/test_ngts.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3 +# +# Copyright Venafi, Inc. and CyberArk Software Ltd. ("CyberArk") +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Live tests for the NGTS (Palo Alto Networks Next-Gen Trust Security) connector. +# These hit a live backend and are skipped unless the NGTS_* credentials are present in the +# environment (see tests/test_env.py). +# +import binascii +import time +import unittest + +from cryptography.hazmat.primitives import hashes + +from test_env import (NGTS_URL, NGTS_TOKEN_URL, NGTS_CLIENT_ID, NGTS_CLIENT_SECRET, NGTS_TSG_ID, NGTS_SCOPE, + NGTS_ZONE) +from test_utils import random_word, enroll, renew, renew_by_thumbprint +from vcert import NGTSConnection, KeyType, logger +from vcert.common import RetireRequest + +log = logger.get_child("test-ngts") + +_HAS_CREDS = all([NGTS_URL, NGTS_TOKEN_URL, NGTS_CLIENT_ID, NGTS_CLIENT_SECRET, NGTS_ZONE]) \ + and (NGTS_TSG_ID or NGTS_SCOPE) + + +@unittest.skipUnless(_HAS_CREDS, "NGTS_* credentials are not set; skipping live NGTS tests") +class TestNGTSMethods(unittest.TestCase): + def setUp(self): + # Built in setUp (not __init__) so collecting this module without NGTS_* creds does not + # try to construct a connection - the class is skipped before setUp runs. + self.ngts_zone = NGTS_ZONE + self.ngts_conn = NGTSConnection(client_id=NGTS_CLIENT_ID, client_secret=NGTS_CLIENT_SECRET, + token_url=NGTS_TOKEN_URL, scope=NGTS_SCOPE, tsg_id=NGTS_TSG_ID, url=NGTS_URL) + + def test_ngts_auth(self): + token = self.ngts_conn.auth() + self.assertTrue(token) + self.assertIsNotNone(self.ngts_conn._token_expires) + + def test_ngts_enroll(self): + cn = f"{random_word(10)}.venafi.example.com" + enroll(self.ngts_conn, self.ngts_zone, cn) + + def test_ngts_renew(self): + cn = f"{random_word(10)}.venafi.example.com" + cert_id, pkey, cert, _, _ = enroll(self.ngts_conn, self.ngts_zone, cn) + time.sleep(5) + renew(self.ngts_conn, cert_id, pkey, cert.serial_number, cn) + + def test_ngts_renew_by_thumbprint(self): + cn = f"{random_word(10)}.venafi.example.com" + cert_id, pkey, cert, _, _ = enroll(self.ngts_conn, self.ngts_zone, cn) + time.sleep(5) + renew_by_thumbprint(self.ngts_conn, cert) + + def test_ngts_retire_by_thumbprint(self): + cn = f"{random_word(10)}.venafi.example.com" + cert_id, pkey, cert, _, _ = enroll(self.ngts_conn, self.ngts_zone, cn) + fingerprint = binascii.hexlify(cert.fingerprint(hashes.SHA1())).decode() + ret_request = RetireRequest(thumbprint=fingerprint) + self.assertTrue(self.ngts_conn.retire_cert(ret_request)) + + def test_ngts_read_zone_config(self): + zone = self.ngts_conn.read_zone_conf(self.ngts_zone) + self.assertIsNotNone(zone.policy) + self.assertTrue(len(zone.policy.key_types) > 0) + + def test_ngts_read_zone_invalid_zone(self): + with self.assertRaises(Exception): + self.ngts_conn.read_zone_conf(f"non-existent-cit-{random_word(8)}") + + +if __name__ == '__main__': + unittest.main() diff --git a/vcert/__init__.py b/vcert/__init__.py index 2ef1acc..02e5bfc 100644 --- a/vcert/__init__.py +++ b/vcert/__init__.py @@ -17,6 +17,7 @@ CustomField, Authentication, SCOPE_CM, SCOPE_PM, SCOPE_SSH, CSR_ORIGIN_LOCAL, CSR_ORIGIN_PROVIDED, CSR_ORIGIN_SERVICE, CHAIN_OPTION_FIRST, CHAIN_OPTION_IGNORE, CHAIN_OPTION_LAST, VenafiPlatform) from .connection_cloud import CloudConnection +from .connection_ngts import NGTSConnection from .connection_tpp import TPPConnection from .connection_tpp_token import TPPTokenConnection from .connection_fake import FakeConnection @@ -54,21 +55,28 @@ def Connection(url=None, token=None, user=None, password=None, fake=False, http_ def venafi_connection(url=None, api_key=None, user=None, password=None, access_token=None, refresh_token=None, - fake=False, http_request_kwargs=None, platform=None): + fake=False, http_request_kwargs=None, platform=None, client_id=None, client_secret=None, + token_url=None, scope=None, tsg_id=None): """ Return connection based on credentials list. CyberArk Platform (CyberArk Certificate Manager, Self-Hosted) requires URL and access_token (or user and password for getting a new access_token) Cloud requires api_key and optional URL + NGTS (Palo Alto Networks Next-Gen Trust Security) requires URL, token_url and OAuth2 service-account credentials (client_id, client_secret, tsg_id/scope) Fake requires no parameters - :param str url: CyberArk Certificate Manager, Self-Hosted or CyberArk Certificate Manager, SaaS URL (for Cloud is optional) + :param str url: CyberArk Certificate Manager, Self-Hosted / SaaS / NGTS URL (for Cloud is optional, required for NGTS) :param str api_key: CyberArk Certificate Manager, SaaS API Key :param str user: CyberArk Certificate Manager, Self-Hosted username for getting new tokens :param str password: CyberArk Certificate Manager, Self-Hosted password for getting new tokens - :param str access_token: CyberArk Certificate Manager, Self-Hosted access token + :param str access_token: CyberArk Certificate Manager, Self-Hosted access token (or a pre-issued NGTS access token) :param str refresh_token: CyberArk Certificate Manager, Self-Hosted refresh token (optional) :param bool fake: Use fake connection :param dict[str, Any] http_request_kwargs: Option for specifying trust bundle or to operate insecurely. :param VenafiPlatform platform: The platform to be used with the Connector + :param str client_id: NGTS OAuth2 service-account client id + :param str client_secret: NGTS OAuth2 service-account client secret + :param str token_url: NGTS OAuth2 token endpoint (differs per environment) + :param str scope: NGTS OAuth2 scope (``tsg_id:``); derived from tsg_id when omitted + :param str tsg_id: NGTS tenant service group id :rtype CommonConnection: """ if platform: @@ -79,11 +87,21 @@ def venafi_connection(url=None, api_key=None, user=None, password=None, access_t refresh_token=refresh_token, http_request_kwargs=http_request_kwargs) elif platform == VenafiPlatform.VAAS: return CloudConnection(token=api_key, url=url, http_request_kwargs=http_request_kwargs) + elif platform == VenafiPlatform.NGTS: + return NGTSConnection(client_id=client_id, client_secret=client_secret, token_url=token_url, scope=scope, + tsg_id=tsg_id, access_token=access_token, url=url, + http_request_kwargs=http_request_kwargs) else: raise VenafiError(f"Invalid Platform: {platform}. Cannot instantiate a Connector.") else: if fake: return FakeConnection() + # NGTS is detected before the TPP/Cloud branches so its OAuth service-account credentials + # are not shadowed by them. + if token_url and client_id and client_secret: + return NGTSConnection(client_id=client_id, client_secret=client_secret, token_url=token_url, scope=scope, + tsg_id=tsg_id, access_token=access_token, url=url, + http_request_kwargs=http_request_kwargs) if url and (access_token or refresh_token or (user and password)): return TPPTokenConnection(url=url, user=user, password=password, access_token=access_token, refresh_token=refresh_token, http_request_kwargs=http_request_kwargs) diff --git a/vcert/common.py b/vcert/common.py index 3b55d4c..eb8742c 100644 --- a/vcert/common.py +++ b/vcert/common.py @@ -609,7 +609,8 @@ def __init__(self, req_id=None, thumbprint=None, guid=None, description=None): class Authentication: def __init__(self, user=None, password=None, access_token=None, refresh_token=None, api_key=None, state=None, - token_expires=None, client_id=CLIENT_ID, scope=SCOPE_CM): + token_expires=None, client_id=CLIENT_ID, scope=SCOPE_CM, client_secret=None, token_url=None, + tsg_id=None): self.user = user self.password = password self.access_token = access_token @@ -617,6 +618,13 @@ def __init__(self, user=None, password=None, access_token=None, refresh_token=No self.token_expires = token_expires self.api_key = api_key self.client_id = client_id + self.client_secret = client_secret + self.token_url = token_url + self.tsg_id = tsg_id + # NGTS OAuth scope is structured as "tsg_id:" (tenant service group id). + # Derive it from tsg_id when an explicit scope was not supplied. + if tsg_id and scope == SCOPE_CM: + scope = f"tsg_id:{tsg_id}" self.scope = scope self.state = state @@ -772,4 +780,5 @@ def __new__(cls, value, description): FAKE = 100, "Connector for testing purposes" TPP = 200, "Trust Protection Platform" + NGTS = 300, "Next-Gen Trust Security" VAAS = 400, "Venafi as a Service" diff --git a/vcert/connection_ngts.py b/vcert/connection_ngts.py new file mode 100644 index 0000000..2800ffa --- /dev/null +++ b/vcert/connection_ngts.py @@ -0,0 +1,383 @@ +# +# Copyright Venafi, Inc. and CyberArk Software Ltd. ("CyberArk") +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import re +from datetime import datetime, timedelta + +import requests + +from .common import (ZoneConfig, CertField, CertificateRequest, KeyType, get_ip_address, MIME_JSON, MIME_ANY, + CSR_ORIGIN_SERVICE) +from .connection_cloud import CloudConnection, URLS, APPLICATION_SERVER_TYPE_ID +from .errors import (VenafiConnectionError, ServerUnexptedBehavior, ClientBadData, CertificateRequestError, + CertificateRenewError, VenafiError) +from .http_status import HTTPStatus +from .logger import get_child + +# OAuth2 access tokens issued by Strata Cloud Manager live ~15 minutes. Refresh a little +# ahead of expiry so in-flight calls never race the boundary (mirrors Go's +# tokenBufferToExpiryWindow). +TOKEN_EXPIRY_BUFFER_SECONDS = 120 +OAUTH_TOKEN_TYPE = "Bearer" # nosec B105 +DEFAULT_TOKEN_LIFESPAN_SECONDS = 900 + +log = get_child("connection-ngts") + + +def _parse_ngts_zone(zone): + """ + NGTS zones are a Certificate Issuing Template alias only - the entire zone string is the + template name. Unlike Cloud/VaaS there is no ``Application\\CIT`` split and no Applications API. + + :param str zone: + :rtype: str + """ + if not zone: + log.error("Invalid Zone. It is empty") + raise ClientBadData("You need to specify a zone") + return zone.strip() + + +class NGTSConnection(CloudConnection): + """ + Connector for Palo Alto Networks Next-Gen Trust Security (NGTS). + + NGTS is VaaS-derived: it reuses the same ``outagedetection/v1/*`` REST endpoints as + :class:`CloudConnection`. Only authentication and zone format differ: + + - Auth is Strata Cloud Manager OAuth2 client-credentials via a service account + (Client ID, Client Secret, TSG ID). Resource calls carry ``Authorization: Bearer `` + instead of the ``tppl-api-key`` header. + - Zones are a Certificate Issuing Template alias only (no ``Application\\CIT`` split), and + request payloads omit ``applicationId``. + """ + + def __init__(self, client_id, client_secret, token_url, scope=None, tsg_id=None, access_token=None, url=None, + http_request_kwargs=None): + # The NGTS API base URL and token URL both differ per environment (dev/prod), including the + # path, so neither can be hardcoded - both must be supplied by the caller. + if not url: + raise ClientBadData("NGTS requires the API base URL (it differs per environment)") + if not access_token and not token_url: + raise ClientBadData("NGTS requires the token URL (it differs per environment) " + "when no access_token is supplied") + + if not scope: + if not tsg_id: + raise ClientBadData("NGTS requires either a scope or a tsg_id") + scope = f"tsg_id:{tsg_id}" + + # CloudConnection.__init__ normalizes/verifies the base URL and sets up + # self._http_request_kwargs. The Bearer token replaces the api-key token entirely. + super().__init__(token=None, url=url, http_request_kwargs=http_request_kwargs) + + self._client_id = client_id + self._client_secret = client_secret + self._token_url = token_url + self._scope = scope + self._tsg_id = tsg_id + self._access_token = access_token + self._token_expires = None + + def __str__(self): + return f"[NGTS] {self._base_url}" + + def _normalize_and_verify_base_url(self): + # Unlike Cloud (host-only), NGTS base URLs carry an environment-specific path + # (e.g. https://api.sase.paloaltonetworks.com/ngts), so path segments must be allowed. + u = self._base_url + if u.startswith('http://'): + u = f"https://{u[7:]}" + elif not u.startswith('https://'): + u = f"https://{u}" + if not u.endswith("/"): + u += "/" + if not re.match(r"^https://[a-z\d]+[-a-z\d.]+[a-z\d][:\d]*(/[-a-zA-Z\d._~]+)*/$", u): + raise ClientBadData + self._base_url = u + + # -- Authentication -------------------------------------------------------------------------- + + def _get_access_token(self): + """ + Fetch an OAuth2 access token via the client-credentials grant. ``client_id``/ + ``client_secret`` are sent through HTTP Basic auth; the body carries only ``grant_type`` + and the structured ``scope`` (``tsg_id:``). + + :rtype: str + """ + if not self._client_id or not self._client_secret: + raise ClientBadData("client_id and client_secret are required to fetch an access token") + + headers = {'Content-Type': 'application/x-www-form-urlencoded'} + data = { + 'grant_type': 'client_credentials', + 'scope': self._scope, + } + r = requests.post(self._token_url, data=data, auth=(self._client_id, self._client_secret), + headers=headers, **self._http_request_kwargs) # nosec B113 + if r.status_code != HTTPStatus.OK: + log.error(f"Failed to obtain access token. Server status: {r.status_code}") + raise VenafiConnectionError(f"Failed to obtain access token. Server status: {r.status_code}") + + response = r.json() + token_type = response.get('token_type') + if token_type != OAUTH_TOKEN_TYPE: + log.error(f"Unexpected token type: {token_type}") + raise ServerUnexptedBehavior(f"Unexpected token type: {token_type}") + + self._access_token = response.get('access_token') + if not self._access_token: + raise ServerUnexptedBehavior("Access token missing from token response") + + expires_in = response.get('expires_in', DEFAULT_TOKEN_LIFESPAN_SECONDS) + self._token_expires = datetime.now() + timedelta(seconds=expires_in - TOKEN_EXPIRY_BUFFER_SECONDS) + return self._access_token + + def auth(self): + """ + Use a valid supplied access token, otherwise fetch a new one. + """ + if not (self._access_token and self._token_is_valid()): + self._get_access_token() + return self._access_token + + def _token_is_valid(self): + """ + :rtype: bool + """ + if not self._access_token: + return False + # A supplied token without a known expiry is taken at face value. + if self._token_expires is None: + return True + return datetime.now() < self._token_expires + + def _ensure_token(self): + """ + Lazily (re-)fetch the access token when it is missing or near expiry and client + credentials are available. Simpler than Go's background renewal goroutine and adequate + for an SDK that authenticates at call time. + """ + if self._token_is_valid(): + return + if self._client_id and self._client_secret: + self._get_access_token() + elif not self._access_token: + raise ClientBadData("No valid access token and no client credentials to obtain one") + + def _auth_headers(self, accept): + """ + :param str accept: + :rtype: dict[str, str] + """ + return { + 'Authorization': f"Bearer {self._access_token}", + 'accept': accept, + 'cache-control': "no-cache", + } + + # -- HTTP verbs (Bearer auth instead of tppl-api-key) -------------------------------------- + + def _get(self, url, params=None): + self._ensure_token() + headers = self._auth_headers(MIME_ANY) + r = requests.get(self._base_url + url, params=params, headers=headers, + **self._http_request_kwargs) # nosec B113 + return self.process_server_response(r) + + def _post(self, url, data=None): + self._ensure_token() + headers = self._auth_headers(MIME_JSON) + if isinstance(data, dict): + r = requests.post(self._base_url + url, json=data, headers=headers, + **self._http_request_kwargs) # nosec B113 + else: + log.error(f"Unexpected client data type: {type(data)} for {url}") + raise ClientBadData + return self.process_server_response(r) + + def _put(self, url, data=None): + self._ensure_token() + headers = self._auth_headers(MIME_JSON) + if isinstance(data, dict): + r = requests.put(self._base_url + url, json=data, headers=headers, + **self._http_request_kwargs) # nosec B113 + else: + log.error(f"Unexpected client data type: {type(data)} for {url}") + raise ClientBadData + return self.process_server_response(r) + + # -- Certificate lifecycle (deltas vs Cloud) ---------------------------------------------- + + def _get_cit_or_fail(self, zone): + """ + Resolve the Certificate Issuing Template for an NGTS (CIT-only) zone via the global + template list. + + :param str zone: + :rtype: dict + """ + cit = self._get_cit(_parse_ngts_zone(zone)) + if not cit: + log.error(f"Certificate issuing template not found for zone [{zone}]") + raise VenafiError(f"Certificate issuing template not found for zone [{zone}]") + return cit + + def request_cert(self, request, zone): + cit = self._get_cit_or_fail(zone) + cit_id = cit['id'] + + ip_address = get_ip_address() + request_data = { + 'certificateIssuingTemplateId': cit_id, + 'apiClientInformation': { + 'type': request.origin, + 'identifier': ip_address + } + } + zone_config = self.read_zone_conf(zone) + request.update_from_zone_config(zone_config) + + if request.csr_origin != CSR_ORIGIN_SERVICE: + if not request.csr: + request.build_csr() + request_data['certificateSigningRequest'] = request.csr + else: + request_data['isVaaSGenerated'] = True + request_data['applicationServerTypeId'] = APPLICATION_SERVER_TYPE_ID + request_data['csrAttributes'] = self._get_service_generated_csr_attr(request, zone) + + if request.validity_hours is not None: + request_data['validityPeriod'] = f"PT{request.validity_hours}H" + + status, data = self._post(URLS.CERTIFICATE_REQUESTS, data=request_data) + if status == HTTPStatus.CREATED: + request.id = data['certificateRequests'][0]['id'] + if 'certificateIds' in data['certificateRequests'][0] \ + and len(data['certificateRequests'][0]['certificateIds']) > 0: + request.cert_guid = data['certificateRequests'][0]['certificateIds'][0] + return True + else: + log.error(f"unexpected server response {status}: {data}") + raise CertificateRequestError + + def renew_cert(self, request, reuse_key=False): + cert_request_id = None + if not request.id and not request.thumbprint: + log.error("prev_cert_id or thumbprint or manage_id must be specified for renewing certificate") + raise ClientBadData + + if request.thumbprint: + response = self.search_by_thumbprint(request.thumbprint) + cert_request_id = response.csrId + + if request.id: + cert_request_id = request.id + + prev_request = self._get_cert_status(CertificateRequest(cert_id=cert_request_id)) + certificate_id = prev_request.certificateIds[0] + cit_id = prev_request.citId + + if not certificate_id or not cit_id: + log.error("Can't find certificate_id") + raise ClientBadData + + status, data = self._get(URLS.CERTIFICATE_BY_ID.format(certificate_id)) + if status == HTTPStatus.OK: + request.id = data['certificateRequestId'] + else: + raise ServerUnexptedBehavior + + ip_address = get_ip_address() + d = {'existingCertificateId': certificate_id, + 'certificateIssuingTemplateId': cit_id, + 'apiClientInformation': { + 'type': request.origin, + 'identifier': ip_address + }} + + if reuse_key: + if request.csr: + d['certificateSigningRequest'] = request.csr + d['reuseCSR'] = False + else: + log.error("Certificate renew by reusing the CSR is not supported right now. " + "Set [reuse_key] to False or just remove it") + raise VenafiError + else: + c = data + if c.get('subjectCN'): + request.common_name = c['subjectCN'][0] + if c.get('subjectC'): + request.country = c['subjectC'] + if c.get('subjectO'): + request.organization = c['subjectO'] + if c.get('subjectOU'): + request.organizational_unit = c['subjectOU'] + if c.get('subjectL'): + request.locality = c['subjectL'] + if c.get('subjectAlternativeNameDns'): + request.san_dns = c['subjectAlternativeNameDns'] + request.key_type = KeyType(KeyType.RSA, c['keyStrength']) + request.build_csr() + d['certificateSigningRequest'] = request.csr + d['reuseCSR'] = False + + status, data = self._post(URLS.CERTIFICATE_REQUESTS, data=d) + if status == HTTPStatus.CREATED: + request.id = data['certificateRequests'][0]['id'] + return True + else: + log.error(f"server unexpected status {status}") + raise CertificateRenewError + + def read_zone_conf(self, zone): + cit = self._get_cit_or_fail(zone) + policy = self._parse_policy_response_to_object(cit) + rs = policy.recommended_settings + org = CertField("") + org_unit = CertField("") + locality = CertField("") + state = CertField("") + country = CertField("") + if rs: + org = CertField(rs.subjectOValue) + org_unit = CertField(rs.subjectOUValue) + locality = CertField(rs.subjectLValue) + state = CertField(rs.subjectSTValue) + country = CertField(rs.subjectCValue) + + z = ZoneConfig( + organization=org, + organizational_unit=org_unit, + country=country, + province=state, + locality=locality, + policy=policy, + key_type=policy.key_types[0] if policy.key_types else None, + ) + return z + + # -- Out of scope for NGTS ---------------------------------------------------------------- + + def get_policy(self, zone): + raise NotImplementedError + + def set_policy(self, zone, policy_spec): + raise NotImplementedError + + def get_version(self): + raise NotImplementedError