diff --git a/src/core/src/bootstrap/Constants.py b/src/core/src/bootstrap/Constants.py index b4b7e663..d014419d 100644 --- a/src/core/src/bootstrap/Constants.py +++ b/src/core/src/bootstrap/Constants.py @@ -101,6 +101,9 @@ class Certificates(EnumBackport): KEK = "KEK" DB = "DB" + # Confidential VM detection shim script name + DETECT_CVM_SHIM_FILE_NAME = "DetectConfidentialVmShim.sh" + # File to save default settings for auto OS updates IMAGE_DEFAULT_PATCH_CONFIGURATION_BACKUP_PATH = "ImageDefaultPatchConfiguration.bak" diff --git a/src/core/src/bootstrap/DetectConfidentialVmShim.sh b/src/core/src/bootstrap/DetectConfidentialVmShim.sh new file mode 100644 index 00000000..fa8cd2ea --- /dev/null +++ b/src/core/src/bootstrap/DetectConfidentialVmShim.sh @@ -0,0 +1,92 @@ +#!/usr/bin/env bash +# Copyright 2020 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -euo pipefail + +HOSTNAME=$(hostname) + +ROOT_SRC=$(findmnt -n -o SOURCE /) +ROOT_DEV=$(readlink -f "$ROOT_SRC" || echo "$ROOT_SRC") + +FDE="false" +DETAILS="" + +check_device() { + local dev="$1" + + if blkid "$dev" 2>/dev/null | grep -qi 'crypto_LUKS'; then + FDE="true" + DETAILS="LUKS:$dev" + return + fi + + local type + type=$(lsblk -dn -o TYPE "$dev" 2>/dev/null || true) + + if [[ "$type" == "crypt" ]]; then + FDE="true" + DETAILS="CRYPT:$dev" + return + fi +} + +walk_parents() { + local dev="$1" + + while [[ -n "$dev" ]]; do + check_device "$dev" + + if [[ "$FDE" == "true" ]]; then + return + fi + + local parent + parent=$(lsblk -ndo PKNAME "$dev" 2>/dev/null | head -1 || true) + + if [[ -z "$parent" ]]; then + break + fi + + dev="/dev/$parent" + done +} + +walk_parents "$ROOT_DEV" + +if [[ "$FDE" != "true" ]]; then + while read -r name type; do + if [[ "$type" == "crypt" ]]; then + mapper="/dev/mapper/$name" + + if mount | grep -q "^$mapper on / "; then + FDE="true" + DETAILS="DMCRYPT_ROOT:$mapper" + break + fi + fi + done < <(dmsetup ls --target crypt 2>/dev/null || true) +fi + +if [[ "$FDE" != "true" ]]; then + if systemctl list-units 2>/dev/null | grep -qi azure; then + if ls /var/lib/waagent/*Encryption* >/dev/null 2>&1; then + FDE="true" + DETAILS="AZURE_ADE_ARTIFACTS" + fi + fi +fi + +echo "$HOSTNAME,$ROOT_DEV,FDE=$FDE,$DETAILS" + diff --git a/src/core/src/bootstrap/EnvLayer.py b/src/core/src/bootstrap/EnvLayer.py index 4c01ea12..6b1d690d 100644 --- a/src/core/src/bootstrap/EnvLayer.py +++ b/src/core/src/bootstrap/EnvLayer.py @@ -192,6 +192,63 @@ def get_env_var(self, var_name, raise_if_not_success=False): raise return None + def detect_confidential_vm(self): + # type: () -> tuple + """Returns whether the current VM is a Confidential VM and the detection details.""" + if self.platform.os_type() == 'Windows': + return False, str() + + is_confidential_vm, detection_details = self.detect_confidential_vm_by_imds() + if is_confidential_vm: + return is_confidential_vm, detection_details + + is_confidential_vm, detection_details = self.detect_confidential_vm_by_fde() + if is_confidential_vm: + return is_confidential_vm, detection_details + + return False, str() + + def detect_confidential_vm_by_fde(self): + # type: () -> tuple + """Runs the FDE-based CVM detection script and returns whether it detected a Confidential VM.""" + command_output = str() + + try: + detection_shim_path = self.__get_fde_detection_shim_path() + detection_script = self.file_system.read_with_retry(detection_shim_path) + if detection_script is None or str(detection_script).strip() == str(): + raise Exception("FDE_DETECTION_SHIM_EMPTY:{0}".format(str(detection_shim_path))) + + code, out = self.run_command_output('bash "{0}"'.format(detection_shim_path), False, False) + command_output = str(out).strip() if out is not None else str() + return code == 0 and re.search(r'\bFDE\s*=\s*true\b', command_output, re.IGNORECASE) is not None, command_output + except Exception as e: + raise Exception("FDE_DETECTION_ERROR:{0}; OUTPUT:{1}".format(str(e), command_output)) + + @staticmethod + def __get_fde_detection_shim_path(): + # type: () -> str + """Resolves the packaged FDE detection shim path.""" + current_dir = os.path.dirname(os.path.realpath(__file__)) + shim_path = os.path.join(current_dir, Constants.DETECT_CVM_SHIM_FILE_NAME) + + if os.path.isfile(shim_path): + return shim_path + + raise Exception("FDE_DETECTION_SHIM_NOT_FOUND:{0}".format(str(shim_path))) + + def detect_confidential_vm_by_imds(self): + # type: () -> tuple + """Queries Azure IMDS and returns whether the VM reports ConfidentialVM security type.""" + command = 'curl -s --connect-timeout 2 --max-time 2 -H Metadata:true --noproxy "*" "http://169.254.169.254/metadata/instance?api-version=2025-04-07"' + + code, out = self.run_command_output(command, False, False) + command_output = str(out).strip() if out is not None else str() + if code == 0 and re.search(r'"securityType"\s*:\s*"ConfidentialVM"', command_output, re.IGNORECASE) is not None: + return True, 'IMDS:ConfidentialVM' + + return False, str() + def run_command_output(self, cmd, no_output=False, chk_err=True): # type: (str, bool, bool) -> (int, any) """ Wrapper for subprocess.check_output. Execute 'cmd'. Returns return code and STDOUT, trapping expected exceptions. Reports exceptions to Error if chk_err parameter is True """ diff --git a/src/core/src/core_logic/PatchInstaller.py b/src/core/src/core_logic/PatchInstaller.py index aee414f4..81bb0976 100644 --- a/src/core/src/core_logic/PatchInstaller.py +++ b/src/core/src/core_logic/PatchInstaller.py @@ -808,6 +808,16 @@ def try_update_certificates_for_default_patching(self): self.composite_logger.log_debug("Not updating certificates since this is not a default patching operation.") return + try: + is_confidential_vm, detection_details = self.env_layer.detect_confidential_vm() + except Exception as e: + self.composite_logger.log_warning("Unable to determine whether the VM is a Confidential VM before attempting the UEFI certificate update. Continuing with patch installation... [Error: {0}]".format(str(e))) + return + + if is_confidential_vm: + self.composite_logger.log("Skipping UEFI certificate update because this VM was detected as a Confidential VM. [Detection={0}]".format(detection_details)) + return + try: self.package_manager.update_certs() except Exception as e: diff --git a/src/core/src/package_managers/AptitudePackageManager.py b/src/core/src/package_managers/AptitudePackageManager.py index 9439d496..860ef9a7 100644 --- a/src/core/src/package_managers/AptitudePackageManager.py +++ b/src/core/src/package_managers/AptitudePackageManager.py @@ -975,12 +975,12 @@ def try_update_certs(self): success = False try: - self.__run_cert_apt_command(self.apt_update_cmd, "AptUpdate", raise_on_error=True) + self.__run_cert_apt_command(self.apt_update_cmd, step_name="AptUpdate", raise_on_error=True) self.__ensure_fwupd_installation() # shell fwupd commands to update certificates - self.__run_cert_shell_command(self.fwupd_refresh_cmd, "FwupdRefresh", raise_on_error=True) - self.__run_cert_shell_command(self.fwupd_update_cmd, "FwupdUpdate", raise_on_error=True) + self.__run_cert_shell_command(self.fwupd_refresh_cmd, step_name="FwupdRefresh", raise_on_error=True) + self.__run_cert_shell_command(self.fwupd_update_cmd, step_name="FwupdUpdate", raise_on_error=True) """ NOTE: They tooling used to update here is fwupd (firmware update manager). In this method of updating certs, the exact version of current certs is never pinned or set/referred while installing. fwupd fetches and installs latest available certs. This is beneficial because our code doesn't become dated in the future @@ -1015,11 +1015,11 @@ def __ensure_fwupd_installation(self): if installed_version != str(): self.composite_logger.log("[APM][Certs] Existing fwupd version is below minimum. Reinstalling latest. [InstalledVersion={0}][MinimumVersion={1}]" .format(installed_version, self.min_fwupd_version)) - self.__run_cert_apt_command(self.remove_fwupd_cmd, "RemoveOldFwupd", raise_on_error=True) + self.__run_cert_apt_command(self.remove_fwupd_cmd, step_name="RemoveOldFwupd", raise_on_error=True) else: self.composite_logger.log_debug("[APM][Certs] fwupd is not installed. Installing latest version.") - self.__run_cert_apt_command(self.install_fwupd_cmd, "InstallFwupd", raise_on_error=True) + self.__run_cert_apt_command(self.install_fwupd_cmd, step_name="InstallFwupd", raise_on_error=True) # Validate that the installed fwupd meets the minimum requirement. installed_version = self.__get_installed_fwupd_version() diff --git a/src/core/tests/Test_EnvLayer.py b/src/core/tests/Test_EnvLayer.py index cecc0ec8..c9041752 100644 --- a/src/core/tests/Test_EnvLayer.py +++ b/src/core/tests/Test_EnvLayer.py @@ -13,7 +13,7 @@ # limitations under the License. # # Requires Python 2.7+ -import io +import os import platform import sys import unittest @@ -85,6 +85,36 @@ def mock_linux_distribution_to_return_rhel_10(self): def mock_distro_os_release_attr_return_rhel_10(self, attribute): return '10.0' + + def mock_run_command_output_fde_true(self, cmd, no_output=False, chk_err=False): + return 0, 'test-vm,/dev/sda1,FDE=true,LUKS:/dev/sda1' + + def mock_run_command_output_fde_false(self, cmd, no_output=False, chk_err=False): + return 0, 'test-vm,/dev/sda1,FDE=false,LUKS:/dev/sda1' + + def mock_run_command_output_imds_true(self, cmd, no_output=False, chk_err=False): + return 0, '"securityProfile": { "encryptionAtHost": "false", "secureBootEnabled": "false", "securityType": "ConfidentialVM", "virtualTpmEnabled": "false"}' + + def mock_run_command_output_imds_false(self, cmd, no_output=False, chk_err=False): + return 0, '{"compute":{"securityProfile":{"securityType":""}}}' + + def mock_detect_confidential_vm_by_fde_returns_true(self): + return True, 'test-vm,/dev/sda1,FDE=true,LUKS:/dev/sda1' + + def mock_detect_confidential_vm_by_fde_returns_false(self): + return False, str() + + def mock_detect_confidential_vm_by_imds_returns_true(self): + return True, 'IMDS:ConfidentialVM' + + def mock_detect_confidential_vm_by_imds_returns_false(self): + return False, str() + + def mock_file_system_read_with_retry_returns_empty(self, file_path_or_handle, raise_if_not_found=True): + return str() + + def mock_os_path_isfile_returns_false(self, path): + return False # endregion def test_get_package_manager(self): @@ -138,6 +168,79 @@ def test_is_distro_azure_linux_3(self): # restore original methods distro.os_release_attr = self.backup_envlayer_distro_os_release_attr + def test_detect_confidential_vm_by_fde(self): + backup_run_command_output = self.envlayer.run_command_output + backup_read_with_retry = self.envlayer.file_system.read_with_retry + backup_os_path_isfile = os.path.isfile + + test_input_output_table = [ + [self.mock_run_command_output_fde_true, backup_os_path_isfile, backup_read_with_retry, False, True, 'FDE=true'], + [self.mock_run_command_output_fde_false, backup_os_path_isfile, backup_read_with_retry, False, False, str()], + [self.mock_run_command_output_fde_true, backup_os_path_isfile, self.mock_file_system_read_with_retry_returns_empty, True, False, str()], + [self.mock_run_command_output_fde_true, self.mock_os_path_isfile_returns_false, backup_read_with_retry, True, False, str()], + ] + + for row in test_input_output_table: + self.envlayer.run_command_output = row[0] + os.path.isfile = row[1] + self.envlayer.file_system.read_with_retry = row[2] + expected_raises_exception = row[3] + expected_is_confidential_vm = row[4] + expected_detection_details = row[5] + + if expected_raises_exception: + self.assertRaises(Exception, self.envlayer.detect_confidential_vm_by_fde) + else: + is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm_by_fde() + self.assertEqual(is_confidential_vm, expected_is_confidential_vm) + self.assertIn(expected_detection_details, detection_details) + + self.envlayer.run_command_output = backup_run_command_output + self.envlayer.file_system.read_with_retry = backup_read_with_retry + os.path.isfile = backup_os_path_isfile + + def test_detect_confidential_vm_by_imds(self): + backup_run_command_output = self.envlayer.run_command_output + + test_input_output_table = [ + [self.mock_run_command_output_imds_true, True, 'IMDS:ConfidentialVM'], + [self.mock_run_command_output_imds_false, False, str()], + ] + + for row in test_input_output_table: + self.envlayer.run_command_output = row[0] + is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm_by_imds() + self.assertEqual(is_confidential_vm, row[1]) + self.assertIn(row[2], detection_details) + + self.envlayer.run_command_output = backup_run_command_output + + def test_detect_confidential_vm(self): + self.backup_platform_system = platform.system + + backup_detect_confidential_vm_by_fde = self.envlayer.detect_confidential_vm_by_fde + backup_detect_confidential_vm_by_imds = self.envlayer.detect_confidential_vm_by_imds + + test_input_output_table = [ + ["Linux", self.mock_detect_confidential_vm_by_fde_returns_true, self.mock_detect_confidential_vm_by_imds_returns_true, True, 'IMDS:ConfidentialVM'], + ["Linux", self.mock_detect_confidential_vm_by_fde_returns_true, self.mock_detect_confidential_vm_by_imds_returns_false, True, 'FDE=true'], + ["Windows", self.mock_run_command_output_fde_true, self.mock_run_command_output_imds_true, False, str()], + ["Linux", self.mock_detect_confidential_vm_by_fde_returns_false, self.mock_detect_confidential_vm_by_imds_returns_false, False, str()], + ] + + for row in test_input_output_table: + platform.system = self.mock_platform_system if row[0] == 'Linux' else self.mock_platform_system_windows + self.envlayer.detect_confidential_vm_by_fde = row[1] + self.envlayer.detect_confidential_vm_by_imds = row[2] + is_confidential_vm, detection_details = self.envlayer.detect_confidential_vm() + self.assertEqual(is_confidential_vm, row[3]) + self.assertIn(row[4], detection_details) + + # restore original methods + platform.system = self.backup_platform_system + self.envlayer.detect_confidential_vm_by_fde = backup_detect_confidential_vm_by_fde + self.envlayer.detect_confidential_vm_by_imds = backup_detect_confidential_vm_by_imds + def test_filesystem(self): # only validates if these invocable without exceptions backup_retry_count = Constants.MAX_FILE_OPERATION_RETRY_COUNT diff --git a/src/core/tests/Test_PatchInstaller.py b/src/core/tests/Test_PatchInstaller.py index 45900263..affbe97d 100644 --- a/src/core/tests/Test_PatchInstaller.py +++ b/src/core/tests/Test_PatchInstaller.py @@ -35,6 +35,12 @@ def tearDown(self): # region Mocks def mock_update_certs_raise_exception(self): raise Exception("Simulated cert update failure") + + def mock_detect_confidential_vm_raises_exception(self): + raise Exception("Simulated VM detection failure") + + def mock_detect_confidential_vm_by_imds_returns_true(self): + return True, 'IMDS:ConfidentialVM' # endregion # region Utility functions (update cert tests) @@ -816,6 +822,30 @@ def test_try_update_certs_swallows_exception_from_update_certs(self): runtime.patch_installer.package_manager.update_certs = backup_up_update_certs runtime.stop() + + def test_try_update_certificates_skips_confidential_vm(self): + runtime = self._create_update_certs_runtime(enable_uefi_cert_update=True, health_store_id="pub_off_sku_2025.01.01") + backup_detect_confidential_vm_by_imds = runtime.env_layer.detect_confidential_vm_by_imds + + runtime.env_layer.detect_confidential_vm_by_imds = self.mock_detect_confidential_vm_by_imds_returns_true + method_called = self._track_method_call(runtime.patch_installer.package_manager, 'update_certs') + runtime.patch_installer.start_installation(simulate=True) + self.assertEqual(len(method_called), 0) + + runtime.env_layer.detect_confidential_vm_by_imds = backup_detect_confidential_vm_by_imds + runtime.stop() + + def test_try_update_certificates_skips_when_detect_confidential_vm_raises_exception(self): + runtime = self._create_update_certs_runtime(enable_uefi_cert_update=True, health_store_id="pub_off_sku_2025.01.01") + backup_detect_confidential_vm = runtime.env_layer.detect_confidential_vm + + runtime.env_layer.detect_confidential_vm = self.mock_detect_confidential_vm_raises_exception + method_called = self._track_method_call(runtime.patch_installer.package_manager, 'update_certs') + runtime.patch_installer.start_installation(simulate=True) + self.assertEqual(len(method_called), 0) + + runtime.env_layer.detect_confidential_vm = backup_detect_confidential_vm + runtime.stop() # endregion diff --git a/src/tools/Package-Core.py b/src/tools/Package-Core.py index afbcc01f..1404e6d0 100644 --- a/src/tools/Package-Core.py +++ b/src/tools/Package-Core.py @@ -238,6 +238,15 @@ def main(argv): external_dependencies_source_code_path = os.path.join(source_code_path, 'external_dependencies') add_external_dependencies(external_dependencies_destination, external_dependencies_source_code_path) + # Copy core shim files + enforce UNIX style line endings + print('\n========== Copying core shim files + enforcing UNIX style line endings.\n') + core_shim_files = ['DetectConfidentialVmShim.sh'] + for core_shim_file in core_shim_files: + core_shim_src = os.path.join(working_directory, 'core', 'src', 'bootstrap', core_shim_file) + core_shim_destination = os.path.join(working_directory, 'out', core_shim_file) + shutil.copyfile(core_shim_src, core_shim_destination) + replace_text_in_file(core_shim_destination, '\r\n', '\n') + except Exception as error: print('Exception during packaging all python modules in core: ' + repr(error)) raise