Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -162,5 +162,13 @@ secrets.json
venv
venv3
venv27
.venv
vcert/example_cert.py
credentials
credentials

# Generated certificate material from example scripts / live tests (never commit)
/cert.pem
/cert.key
/key.pem
/chain.pem
/renewed_cert.pem
68 changes: 68 additions & 0 deletions examples/ngts/get_cert_ngts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#!/usr/bin/env python3
#
# Copyright Venafi, Inc. and CyberArk Software Ltd. ("CyberArk")
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from vcert import (CertificateRequest, venafi_connection, VenafiPlatform)
import string
import random
import logging
from os import environ

logging.basicConfig(level=logging.INFO)
logging.getLogger("urllib3").setLevel(logging.ERROR)


def main():
# Get credentials from environment variables.
# NGTS (Palo Alto Networks Next-Gen Trust Security) authenticates with Strata Cloud Manager
# OAuth2 client credentials issued by a service account. Both the API base URL and the token
# URL differ per environment (dev/prod), so both must be supplied.
url = environ.get('NGTS_URL') # NGTS API base URL (e.g. https://api.sase.paloaltonetworks.com/ngts)
token_url = environ.get('NGTS_TOKEN_URL') # OAuth2 token endpoint (different FQDN, env-specific)
client_id = environ.get('NGTS_CLIENT_ID') # Service-account client id
client_secret = environ.get('NGTS_CLIENT_SECRET') # Service-account client secret
tsg_id = environ.get('NGTS_TSG_ID') # Tenant service group id (used to build the scope)
scope = environ.get('NGTS_SCOPE') # Optional: a ready "tsg_id:<TSG_ID>" scope
zone = environ.get('NGTS_ZONE') # Certificate Issuing Template alias (CIT-only)

# The connection is chosen automatically: when token_url + client_id + client_secret are
# present, an NGTS connection is built. The platform can also be set explicitly:
# conn = venafi_connection(platform=VenafiPlatform.NGTS, ...)
conn = venafi_connection(url=url, token_url=token_url, client_id=client_id, client_secret=client_secret,
tsg_id=tsg_id, scope=scope)

# Build a Certificate request
request = CertificateRequest(common_name=f"{random_word(10)}.venafi.example.com")
request.san_dns = ["www.dns.venafi.example.com", "ww1.dns.venafi.example.com"]

# Request the certificate.
conn.request_cert(request, zone)
# Wait for the certificate to be retrieved (until ISSUED or timeout, 180s by default).
cert = conn.retrieve_cert(request)

# Print the certificate
print(cert.full_chain)
# Save it into a file
with open("./cert.pem", "w") as f:
f.write(cert.full_chain)


def random_word(length):
letters = string.ascii_lowercase
return ''.join(random.choice(letters) for i in range(length))


if __name__ == '__main__':
main()
9 changes: 9 additions & 0 deletions tests/test_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,14 @@

TPP_SSH_CADN = environ.get('TPP_SSH_CADN')

# NGTS (Palo Alto Networks Next-Gen Trust Security)
NGTS_URL = environ.get('NGTS_URL')
NGTS_TOKEN_URL = environ.get('NGTS_TOKEN_URL')
NGTS_CLIENT_ID = environ.get('NGTS_CLIENT_ID')
NGTS_CLIENT_SECRET = environ.get('NGTS_CLIENT_SECRET')
NGTS_TSG_ID = environ.get('NGTS_TSG_ID')
NGTS_SCOPE = environ.get('NGTS_SCOPE')
NGTS_ZONE = environ.get('NGTS_ZONE')

if RANDOM_DOMAIN and not isinstance(RANDOM_DOMAIN, text_type):
RANDOM_DOMAIN = RANDOM_DOMAIN.decode()
98 changes: 97 additions & 1 deletion tests/test_local_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@
#
import json
import unittest
from unittest import mock

from cryptography import x509
from cryptography.hazmat.backends import default_backend

from assets import POLICY_CLOUD1, POLICY_TPP1, EXAMPLE_CSR, EXAMPLE_CHAIN
from vcert import (CloudConnection, KeyType, TPPConnection, CertificateRequest, ZoneConfig, CertField, FakeConnection,
logger)
NGTSConnection, logger)
from vcert.connection_ngts import _parse_ngts_zone
from vcert.errors import ClientBadData, ServerUnexptedBehavior
from vcert.pem import parse_pem, Certificate

pkcs12_enc_cert = """-----BEGIN CERTIFICATE-----
Expand Down Expand Up @@ -346,3 +349,96 @@ def test_pkcs12_plain_pk(self):
cert = Certificate(cert=pkcs12_plain_cert, chain=chain, key=pkcs12_plain_pk)
output = cert.as_pkcs12()
log.info(f"PKCS12 created successfully:\n{output}")

# -- NGTS (offline) -----------------------------------------------------------------------

@staticmethod
def _ngts_conn(**kwargs):
defaults = dict(
client_id="cid",
client_secret="csecret",
token_url="https://auth.example.com/oauth2/token",
tsg_id="123",
url="https://api.sase.paloaltonetworks.com/ngts",
)
defaults.update(kwargs)
return NGTSConnection(**defaults)

def test_parse_ngts_zone(self):
# CIT-only: the whole (trimmed) string is the template alias, no backslash split.
self.assertEqual(_parse_ngts_zone("MyTemplate"), "MyTemplate")
self.assertEqual(_parse_ngts_zone(" MyTemplate "), "MyTemplate")
# A backslash is NOT a separator for NGTS - it is part of the alias.
self.assertEqual(_parse_ngts_zone("App\\CIT"), "App\\CIT")
with self.assertRaises(ClientBadData):
_parse_ngts_zone("")
with self.assertRaises(ClientBadData):
_parse_ngts_zone(None)

def test_ngts_scope_from_tsg_id(self):
conn = self._ngts_conn()
self.assertEqual(conn._scope, "tsg_id:123")

def test_ngts_explicit_scope_wins(self):
conn = self._ngts_conn(scope="tsg_id:999", tsg_id=None)
self.assertEqual(conn._scope, "tsg_id:999")

def test_ngts_requires_url(self):
with self.assertRaises(ClientBadData):
self._ngts_conn(url=None)

def test_ngts_requires_token_url_without_access_token(self):
with self.assertRaises(ClientBadData):
self._ngts_conn(token_url=None)

def test_ngts_requires_scope_or_tsg_id(self):
with self.assertRaises(ClientBadData):
self._ngts_conn(tsg_id=None, scope=None)

def test_ngts_get_access_token(self):
conn = self._ngts_conn()
fake_resp = mock.MagicMock()
fake_resp.status_code = 200
fake_resp.json.return_value = {
'access_token': 'abc.def.ghi',
'token_type': 'Bearer',
'expires_in': 900,
'scope': 'tsg_id:123',
}
with mock.patch('vcert.connection_ngts.requests.post', return_value=fake_resp) as post:
token = conn._get_access_token()

self.assertEqual(token, 'abc.def.ghi')
self.assertEqual(conn._access_token, 'abc.def.ghi')
self.assertIsNotNone(conn._token_expires)
# client_id/client_secret go through HTTP Basic auth; the body carries grant_type + scope.
_, kwargs = post.call_args
self.assertEqual(kwargs['auth'], ('cid', 'csecret'))
self.assertEqual(kwargs['data']['grant_type'], 'client_credentials')
self.assertEqual(kwargs['data']['scope'], 'tsg_id:123')

def test_ngts_access_token_rejects_non_bearer(self):
conn = self._ngts_conn()
fake_resp = mock.MagicMock()
fake_resp.status_code = 200
fake_resp.json.return_value = {'access_token': 'x', 'token_type': 'mac', 'expires_in': 900}
with mock.patch('vcert.connection_ngts.requests.post', return_value=fake_resp):
with self.assertRaises(ServerUnexptedBehavior):
conn._get_access_token()

def test_ngts_auth_header_is_bearer(self):
conn = self._ngts_conn(access_token='pre.issued.token', token_url=None)
headers = conn._auth_headers('application/json')
self.assertEqual(headers['Authorization'], 'Bearer pre.issued.token')
self.assertNotIn('tppl-api-key', headers)

def test_ngts_get_sends_bearer_header(self):
conn = self._ngts_conn(access_token='pre.issued.token', token_url=None)
fake_resp = mock.MagicMock()
fake_resp.status_code = 200
fake_resp.headers = {'content-type': 'application/json'}
fake_resp.json.return_value = {}
with mock.patch('vcert.connection_ngts.requests.get', return_value=fake_resp) as get:
conn._get("v1/certificateissuingtemplates")
_, kwargs = get.call_args
self.assertEqual(kwargs['headers']['Authorization'], 'Bearer pre.issued.token')
87 changes: 87 additions & 0 deletions tests/test_ngts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
#!/usr/bin/env python3
#
# Copyright Venafi, Inc. and CyberArk Software Ltd. ("CyberArk")
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Live tests for the NGTS (Palo Alto Networks Next-Gen Trust Security) connector.
# These hit a live backend and are skipped unless the NGTS_* credentials are present in the
# environment (see tests/test_env.py).
#
import binascii
import time
import unittest

from cryptography.hazmat.primitives import hashes

from test_env import (NGTS_URL, NGTS_TOKEN_URL, NGTS_CLIENT_ID, NGTS_CLIENT_SECRET, NGTS_TSG_ID, NGTS_SCOPE,
NGTS_ZONE)
from test_utils import random_word, enroll, renew, renew_by_thumbprint
from vcert import NGTSConnection, KeyType, logger
from vcert.common import RetireRequest

log = logger.get_child("test-ngts")

_HAS_CREDS = all([NGTS_URL, NGTS_TOKEN_URL, NGTS_CLIENT_ID, NGTS_CLIENT_SECRET, NGTS_ZONE]) \
and (NGTS_TSG_ID or NGTS_SCOPE)


@unittest.skipUnless(_HAS_CREDS, "NGTS_* credentials are not set; skipping live NGTS tests")
class TestNGTSMethods(unittest.TestCase):
def setUp(self):
# Built in setUp (not __init__) so collecting this module without NGTS_* creds does not
# try to construct a connection - the class is skipped before setUp runs.
self.ngts_zone = NGTS_ZONE
self.ngts_conn = NGTSConnection(client_id=NGTS_CLIENT_ID, client_secret=NGTS_CLIENT_SECRET,
token_url=NGTS_TOKEN_URL, scope=NGTS_SCOPE, tsg_id=NGTS_TSG_ID, url=NGTS_URL)

def test_ngts_auth(self):
token = self.ngts_conn.auth()
self.assertTrue(token)
self.assertIsNotNone(self.ngts_conn._token_expires)

def test_ngts_enroll(self):
cn = f"{random_word(10)}.venafi.example.com"
enroll(self.ngts_conn, self.ngts_zone, cn)

def test_ngts_renew(self):
cn = f"{random_word(10)}.venafi.example.com"
cert_id, pkey, cert, _, _ = enroll(self.ngts_conn, self.ngts_zone, cn)
time.sleep(5)
renew(self.ngts_conn, cert_id, pkey, cert.serial_number, cn)

def test_ngts_renew_by_thumbprint(self):
cn = f"{random_word(10)}.venafi.example.com"
cert_id, pkey, cert, _, _ = enroll(self.ngts_conn, self.ngts_zone, cn)
time.sleep(5)
renew_by_thumbprint(self.ngts_conn, cert)

def test_ngts_retire_by_thumbprint(self):
cn = f"{random_word(10)}.venafi.example.com"
cert_id, pkey, cert, _, _ = enroll(self.ngts_conn, self.ngts_zone, cn)
fingerprint = binascii.hexlify(cert.fingerprint(hashes.SHA1())).decode()
ret_request = RetireRequest(thumbprint=fingerprint)
self.assertTrue(self.ngts_conn.retire_cert(ret_request))

def test_ngts_read_zone_config(self):
zone = self.ngts_conn.read_zone_conf(self.ngts_zone)
self.assertIsNotNone(zone.policy)
self.assertTrue(len(zone.policy.key_types) > 0)

def test_ngts_read_zone_invalid_zone(self):
with self.assertRaises(Exception):
self.ngts_conn.read_zone_conf(f"non-existent-cit-{random_word(8)}")


if __name__ == '__main__':
unittest.main()
24 changes: 21 additions & 3 deletions vcert/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
CustomField, Authentication, SCOPE_CM, SCOPE_PM, SCOPE_SSH, CSR_ORIGIN_LOCAL, CSR_ORIGIN_PROVIDED,
CSR_ORIGIN_SERVICE, CHAIN_OPTION_FIRST, CHAIN_OPTION_IGNORE, CHAIN_OPTION_LAST, VenafiPlatform)
from .connection_cloud import CloudConnection
from .connection_ngts import NGTSConnection
from .connection_tpp import TPPConnection
from .connection_tpp_token import TPPTokenConnection
from .connection_fake import FakeConnection
Expand Down Expand Up @@ -54,21 +55,28 @@ def Connection(url=None, token=None, user=None, password=None, fake=False, http_


def venafi_connection(url=None, api_key=None, user=None, password=None, access_token=None, refresh_token=None,
fake=False, http_request_kwargs=None, platform=None):
fake=False, http_request_kwargs=None, platform=None, client_id=None, client_secret=None,
token_url=None, scope=None, tsg_id=None):
"""
Return connection based on credentials list.
CyberArk Platform (CyberArk Certificate Manager, Self-Hosted) requires URL and access_token (or user and password for getting a new access_token)
Cloud requires api_key and optional URL
NGTS (Palo Alto Networks Next-Gen Trust Security) requires URL, token_url and OAuth2 service-account credentials (client_id, client_secret, tsg_id/scope)
Fake requires no parameters
:param str url: CyberArk Certificate Manager, Self-Hosted or CyberArk Certificate Manager, SaaS URL (for Cloud is optional)
:param str url: CyberArk Certificate Manager, Self-Hosted / SaaS / NGTS URL (for Cloud is optional, required for NGTS)
:param str api_key: CyberArk Certificate Manager, SaaS API Key
:param str user: CyberArk Certificate Manager, Self-Hosted username for getting new tokens
:param str password: CyberArk Certificate Manager, Self-Hosted password for getting new tokens
:param str access_token: CyberArk Certificate Manager, Self-Hosted access token
:param str access_token: CyberArk Certificate Manager, Self-Hosted access token (or a pre-issued NGTS access token)
:param str refresh_token: CyberArk Certificate Manager, Self-Hosted refresh token (optional)
:param bool fake: Use fake connection
:param dict[str, Any] http_request_kwargs: Option for specifying trust bundle or to operate insecurely.
:param VenafiPlatform platform: The platform to be used with the Connector
:param str client_id: NGTS OAuth2 service-account client id
:param str client_secret: NGTS OAuth2 service-account client secret
:param str token_url: NGTS OAuth2 token endpoint (differs per environment)
:param str scope: NGTS OAuth2 scope (``tsg_id:<TSG_ID>``); derived from tsg_id when omitted
:param str tsg_id: NGTS tenant service group id
:rtype CommonConnection:
"""
if platform:
Expand All @@ -79,11 +87,21 @@ def venafi_connection(url=None, api_key=None, user=None, password=None, access_t
refresh_token=refresh_token, http_request_kwargs=http_request_kwargs)
elif platform == VenafiPlatform.VAAS:
return CloudConnection(token=api_key, url=url, http_request_kwargs=http_request_kwargs)
elif platform == VenafiPlatform.NGTS:
return NGTSConnection(client_id=client_id, client_secret=client_secret, token_url=token_url, scope=scope,
tsg_id=tsg_id, access_token=access_token, url=url,
http_request_kwargs=http_request_kwargs)
else:
raise VenafiError(f"Invalid Platform: {platform}. Cannot instantiate a Connector.")
else:
if fake:
return FakeConnection()
# NGTS is detected before the TPP/Cloud branches so its OAuth service-account credentials
# are not shadowed by them.
if token_url and client_id and client_secret:
return NGTSConnection(client_id=client_id, client_secret=client_secret, token_url=token_url, scope=scope,
tsg_id=tsg_id, access_token=access_token, url=url,
http_request_kwargs=http_request_kwargs)
if url and (access_token or refresh_token or (user and password)):
return TPPTokenConnection(url=url, user=user, password=password, access_token=access_token,
refresh_token=refresh_token, http_request_kwargs=http_request_kwargs)
Expand Down
11 changes: 10 additions & 1 deletion vcert/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,14 +609,22 @@ def __init__(self, req_id=None, thumbprint=None, guid=None, description=None):

class Authentication:
def __init__(self, user=None, password=None, access_token=None, refresh_token=None, api_key=None, state=None,
token_expires=None, client_id=CLIENT_ID, scope=SCOPE_CM):
token_expires=None, client_id=CLIENT_ID, scope=SCOPE_CM, client_secret=None, token_url=None,
tsg_id=None):
self.user = user
self.password = password
self.access_token = access_token
self.refresh_token = refresh_token
self.token_expires = token_expires
self.api_key = api_key
self.client_id = client_id
self.client_secret = client_secret
self.token_url = token_url
self.tsg_id = tsg_id
# NGTS OAuth scope is structured as "tsg_id:<TSG_ID>" (tenant service group id).
# Derive it from tsg_id when an explicit scope was not supplied.
if tsg_id and scope == SCOPE_CM:
scope = f"tsg_id:{tsg_id}"
self.scope = scope
self.state = state

Expand Down Expand Up @@ -772,4 +780,5 @@ def __new__(cls, value, description):

FAKE = 100, "Connector for testing purposes"
TPP = 200, "Trust Protection Platform"
NGTS = 300, "Next-Gen Trust Security"
VAAS = 400, "Venafi as a Service"
Loading