diff --git a/tools/README.md b/tools/README.md
index c2a6e56ff..98522920b 100644
--- a/tools/README.md
+++ b/tools/README.md
@@ -3,33 +3,121 @@
The `tools/` folder is intended for helpful tools and scripts that aren't
part of the main oracle-toolkit codebase.
-## gen_patch_metadata
+## gen_patch_metadata.py
-`gen_patch_metadata` retrieves patches from My Oracle Support, parses our
-version and hash information, and prepares `rdbms_patches` and `gi_patches`
-structures for `roles/common/defaults/main.yml`.
+`gen_patch_metadata.py` is a maintainer script used to add metadata for new Oracle patch bundles.
-### Sample usage
+It has two primary functions:
+1. **Command-Line Tool:** When run directly, it downloads a *new* patch from My Oracle Support (MOS), parses its version and hash information, and generates the YAML snippets required for the toolkit.
+2. **Importable Module:** It provides a `parse_patch` function that can be imported by other scripts (like `test_patch_parser.py`) to validate patch-parsing logic.
-```
-$ python3 gen_patch_metadata.py --patch 33567274 --mosuser user@example.com
+### Sample Usage (Adding a New Patch)
+
+This workflow is for **adding a new patch** to the toolkit.
+
+```bash
+$ python3 gen_patch_metadata.py --patch 35742441 --mosuser user@example.com
MOS Password:
-INFO:root:Downloading https://updates.oracle.com/Orion/Download/process_form/p33567274_190000_Linux-x86-64.zip?file_id=113789887&aru=24594397&userid=O-mfielding@google.com&email=user@example.com&patch_password=&patch_file=p33567274_190000_Linux-x86-64.zip
-INFO:root:Abstract: COMBO OF OJVM RU COMPONENT 19.14.0.0.220118 + GI RU 19.14.0.0.220118
-INFO:root:Found release = 19.14.0.0.220118 base = 19.3.0.0.0 GI subdir = 33509923 OJVM subdir = 33561310
-INFO:root:Downloading OPatch
-INFO:root:Downloading https://updates.oracle.com/Orion/Download/process_form/p6880880_190000_Linux-x86-64.zip?aru=24740828&file_id=112014090&patch_file=p6880880_190000_Linux-x86-64.zip&
-Please copy the following files to your GCS bucket: p33567274_190000_Linux-x86-64.zip p6880880_190000_Linux-x86-64.zip
-Add the following to the appropriate sections of roles/common/defaults/main.yml:
-
- gi_patches:
- - { category: "RU", base: "19.3.0.0.0", release: "19.14.0.0.220118", patchnum: "33567274", patchfile: "p33567274_190000_Linux-x86-64.zip", patch_subdir: "/33509923", prereq_check: FALSE, method: "opatchauto apply", ocm: FALSE, upgrade: FALSE, md5sum: "JgJsqbGaGcxEPEP6j79BPQ==" }
-
- rdbms_patches:
- - { category: "RU_Combo", base: "19.3.0.0.0", release: "19.14.0.0.220118", patchnum: "33567274", patchfile: "p33567274_190000_Linux-x86-64.zip", patch_subdir: "/33561310", prereq_check: TRUE, method: "opatch apply", ocm: FALSE, upgrade: TRUE, md5sum: "JgJsqbGaGcxEPEP6j79BPQ==" }
+INFO: Authenticating with MOS...
+INFO: Downloading main patch 35742441...
+INFO: Downloading p35742441_190000_Linux-x86-64.zip from updates.oracle.com
+INFO: Successfully downloaded p35742441_190000_Linux-x86-64.zip
+INFO: Calculating MD5 for p35742441_190000_Linux-x86-64.zip...
+INFO: Calculated MD5 digest: 83s+HwWwloTKy0+i2s3fLg==
+INFO: Abstract: COMBO OF OJVM RU COMPONENT 19.21.0.0.231017 + GI RU 19.21.0.0.231017
+INFO: Found numeric subdirectories: {'35648110', '35642822'}
+INFO: Assigned 'Other' subdir based on clear keywords: /35642822
+INFO: --- Patch Analysis Results ---
+INFO: Base Release: 19.3.0.0.0
+INFO: Patch Release: 19.21.0.0.231017
+INFO: "Other" Subdir: /35642822 (This is likely the GI or DB_RU component)
+INFO: "OJVM" Subdir: /35648110
+INFO: --------------------------------
+INFO: Downloading OPatch (Patch 6880880) for release 19.3.0.0.0
+INFO: Found specific OPatch URL: ...p6880880_190000_Linux-x86-64.zip...
+INFO: Using local copy of OPatch file p6880880_190000_Linux-x86-64.zip
+
+# === SCRIPT OUTPUT: Copy files and update YAML ===
+
+# 1. Copy the following files to your GCS bucket:
+# p35742441_190000_Linux-x86-64.zip p6880880_190000_Linux-x86-64.zip
+
+# 2. Add the following to roles/common/defaults/main/ files:
+# (Review the abstract to make the correct selections!)
+#
+# Abstract: COMBO OF OJVM RU COMPONENT 19.21.0.0.231017 + GI RU 19.21.0.0.231017
+
+# --- SELECTION 1: Choose the NON-OJVM component (GI or DB) ---
+# --- This component is in subdir: /35642822 ---
+
+# 1A: If this is a GI Patch (RU), add to 'gi_patches.yml':
+# gi_patches:
+# - { category: "RU", base: "19.3.0.0.0", release: "19.21.0.0.231017", patchnum: "35742441", patchfile: "p35742441_190000_Linux-x86-64.zip", patch_subdir: "/35642822", prereq_check: FALSE, method: "opatchauto apply", ocm: FALSE, upgrade: FALSE, md5sum: "83s+HwWwloTKy0+i2s3fLg==" }
+
+# 1B: If this is an RDBMS Patch (DB_RU), add to 'rdbms_patches.yml':
+# rdbms_patches:
+# - { category: "DB_RU", base: "19.3.0.0.0", release: "19.21.0.0.231017", patchnum: "35742441", patchfile: "p35742441_190000_Linux-x86-64.zip", patch_subdir: "/35642822", prereq_check: TRUE, method: "opatch apply", ocm: FALSE, upgrade: TRUE, md5sum: "83s+HwWwloTKy0+i2s3fLg==" }
+
+# --- SELECTION 2: Choose the OJVM component ---
+# --- This component is in subdir: /35648110 ---
+
+# 2A: If OJVM is from a GI Combo (RU_Combo), add to 'rdbms_patches.yml':
+# rdbms_patches:
+# - { category: "RU_Combo", base: "19.3.0.0.0", release: "19.21.0.0.231017", patchnum: "35742441", patchfile: "p35742441_190000_Linux-x86-64.zip", patch_subdir: "/35648110", prereq_check: TRUE, method: "opatch apply", ocm: FALSE, upgrade: TRUE, md5sum: "83s+HwWwloTKy0+i2s3fLg==" }
+
+# 2B: If this is an OJVM + DB RU (DB_OJVM_RU), add to 'rdbms_patches.yml':
+# rdbms_patches:
+# - { category: "DB_OJVM_RU", base: "19.3.0.0.0", release: "19.21.0.0.231017", patchnum: "35742441", patchfile: "p35742441_190000_Linux-x86-64.zip", patch_subdir: "/35648110", prereq_check: TRUE, method: "opatch apply", ocm: FALSE, upgrade: TRUE, md5sum: "83s+HwWwloTKy0+i2s3fLg==" }
+
+# === END SCRIPT OUTPUT ===
```
-### Known issues
+-----
+
+## test_patch_parser.py (Unit Tests)
+
+`test_patch_parser.py` is a unit test script that validates the parsing logic in `gen_patch_metadata.py`.
+
+### How It Works
+It reads *all* patch definitions from the toolkit's `gi_patches.yml` and `rdbms_patches.yml` files. For every 2-component combo patch, it:
+1. Downloads the corresponding `.zip` file from the **`gcp-oracle-software` GCS bucket** (it does **not** use MOS).
+2. Runs the `parse_patch` function on the downloaded file.
+3. Compares the parsed metadata (base release, patch release, and set of subdirectories) against the "ground truth" values from the YAML files.
+
+### How to Run the Unit Tests
+
+1. Navigate to the `tools/` directory:
+ ```bash
+ cd oracle-toolkit/tools
+ ```
+
+2. Install all required Python dependencies:
+ ```bash
+ pip install PyYAML google-cloud-storage beautifulsoup4 requests lxml
+ ```
+
+3. Authenticate with GCS. This is **required** to download the test patches.
+ ```bash
+ gcloud auth application-default login
+ ```
+
+4. Run the unit test script:
+ ```bash
+ python3 test_patch_parser.py
+ ```
+
+### Understanding the Test Output
+
+* **`OK`**: If the test finishes with `OK`, it means all patch validations passed successfully.
+* **`INFO: Skipping ...: Not a 2-component combo patch.`**: This is **normal**. The test is designed to *only* validate 2-component combo patches (common for 19c and earlier). It correctly identifies and skips single-component patches (like 21c+ RUs).
+* **`WARNING: Skipping test for obsolete/unavailable patch: ...`**: This is also **normal**. It confirms the test is correctly skipping specific old 12.1.0.2 patches that are no longer available for download.
+* **`ERROR: ... ambiguous` / `WARNING: GUESSING...`**: These messages are **expected**. They come from the `gen_patch_metadata.py` parser when its README analysis isn't 100% certain which subdir is OJVM.
+ * The unit test is designed to handle this. It uses an `assertSetEqual` check to confirm that the *set* of subdirs found by the parser (e.g., `{"/12345", "/67890"}`) is correct, even if the "guess" for OJVM was wrong.
+ * As long as you see `INFO: SUCCESS: ...` after these warnings, the test has passed.
+
+---
+
+## Known Issues
-- Only tested against 12.2, 18c, and 19c patches.
-- No support for multi-file patches.
+* The MOS download logic in `gen_patch_metadata.py` does not support multi-file patches (it will only download the first file).
+* The parser (`parse_patch`) is designed for 2-component combo patches (e.g., 11.2-19c) and is not intended for single-component RUs (e.g., 21c+). The unit test correctly skips these.
diff --git a/tools/gen_patch_metadata.py b/tools/gen_patch_metadata.py
index 19dd765b5..fd72a6500 100644
--- a/tools/gen_patch_metadata.py
+++ b/tools/gen_patch_metadata.py
@@ -1,6 +1,12 @@
#!/usr/bin/python3
-"""gen_patch_metadata.py is a helper script for toolkit maintainers to add metadata for upstream patches.
"""
+gen_patch_metadata.py is a helper script for toolkit maintainers to add
+metadata for new Oracle patch bundles.
+
+This script can be run directly to generate new patch metadata, or
+imported as a module (e.g., by unit tests) to use its parsing functions.
+"""
+
import argparse
import base64
import getpass
@@ -10,146 +16,494 @@
import re
import shutil
import typing
-import urllib
+import urllib.parse
import zipfile
-import bs4
-import requests
+# Import third-party libraries
+try:
+ import bs4
+ import requests
+except ImportError:
+ print("Error: Missing required libraries. Please run:")
+ print("pip install beautifulsoup4 requests lxml")
+ exit(1)
+# --- Constants ---
+
+# Use a standard browser User-Agent to appear as a regular user to MOS.
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36'
-SEARCH_FORM = 'https://updates.oracle.com/Orion/SimpleSearch/process_form?search_type=patch&patch_number=%d&plat_lang=226P'
-DOWNLOAD_URL = r'https://updates.oracle.com/Orion/Download/process_form[^\"]*'
+
+# MOS login and search URLs.
LOGIN_FORM = 'https://updates.oracle.com/Orion/SavedSearches/switch_to_simple'
+SEARCH_FORM = 'https://updates.oracle.com/Orion/SimpleSearch/process_form?search_type=patch&patch_number=%d&plat_lang=226P'
+
+# Regex to find the download link on the patch search results page.
+DOWNLOAD_URL_RE = r'https://updates\.oracle\.com/Orion/Download/process_form[^\"]*'
+
+# Patch number for the generic OPatch utility.
+OPATCH_PATCHNUM = 6880880
+
+# --- MOS Interaction Functions ---
+
+def get_patch_auth(s: requests.Session) -> None:
+ """
+ Authenticates the requests.Session against the MOS login form.
+ This is a "pre-flight" check to establish an authenticated session.
+ """
+ r = s.get(LOGIN_FORM, allow_redirects=False)
+ if 'location' in r.headers:
+ # Perform the two-step login redirect to get the auth cookies.
+ r = s.get(r.headers['Location'])
+ assert r.status_code == 200, f'Got HTTP {r.status_code} on auth attempt'
+
+def get_patch_url(s: requests.Session, patchnum: int) -> typing.List[str]:
+ """
+ Finds all available download URLs for a specific patch number.
+ """
+ search_url = SEARCH_FORM % patchnum
+ r = s.get(search_url, allow_redirects=False)
+ if 'location' in r.headers:
+ # Handle redirects, which can happen post-login
+ r = s.get(r.headers['Location'])
+
+ assert r.status_code == 200, f'Got HTTP {r.status_code} retrieving {search_url}'
+
+ urls = re.findall(DOWNLOAD_URL_RE, str(r.content))
+ assert urls, f'Could not find any download URLs for patch {patchnum}. Is it correct?'
+ return urls
+
+def download_patch(s: requests.Session, url: str, patch_file: str) -> None:
+ """
+ Downloads a given URL to a local file, streaming the response.
+ """
+ logging.info(f'Downloading {patch_file} from {url}')
+ # Use a retry adapter for network resilience
+ s.mount(url, requests.adapters.HTTPAdapter(max_retries=3))
+
+ try:
+ with s.get(url, stream=True) as r:
+ r.raise_for_status() # Raise an exception for bad HTTP status
+ with open(patch_file, 'wb') as f:
+ shutil.copyfileobj(r.raw, f)
+ logging.info(f'Successfully downloaded {patch_file}')
+ except requests.exceptions.RequestException as e:
+ logging.error(f"Failed to download {url}: {e}")
+ # Clean up partial file on failure
+ if os.path.exists(patch_file):
+ os.remove(patch_file)
+ raise
+
+# --- Patch Parsing Helper Functions ---
+
+def _parse_patch_xml(z: zipfile.ZipFile) -> typing.Tuple[str, str, str]:
+ """
+ Parses PatchSearch.xml to get base release, patch release, and abstract.
+ """
+ try:
+ with z.open('PatchSearch.xml') as f:
+ content = f.read()
+
+ # Use html.parser as a fallback for potentially malformed XML
+ try:
+ c = bs4.BeautifulSoup(content, 'xml')
+ except Exception:
+ c = bs4.BeautifulSoup(content, 'html.parser')
+
+ abstract_tag = c.find('abstract')
+ if not abstract_tag:
+ raise ValueError("Tag 'abstract' not found in PatchSearch.xml.")
+ abstract = abstract_tag.get_text()
+
+ # Extract full patch release (e.g., 19.17.0.0.221018) from abstract
+ patch_release_match = re.search(r' (\d+\.\d+\.\d+\.\d+\.\d+) ', abstract)
+ if not patch_release_match:
+ # Fallback for 21c+ patches that might not have the 5-part version
+ patch_release_match = re.search(r' (\d+\.\d+\.\d+\.\d+) ', abstract)
+ if not patch_release_match:
+ raise ValueError("Could not extract patch release version from abstract.")
+ patch_release = patch_release_match.group(1)
+
+ # Extract base release (e.g., 19.0.0.0.0)
+ release_tag = c.find('release')
+ if not release_tag or 'name' not in release_tag.attrs:
+ raise ValueError("Tag 'release' or 'name' attribute not found.")
+ release = release_tag['name']
+
+ return release, patch_release, abstract
+
+ except KeyError:
+ raise FileNotFoundError("'PatchSearch.xml' not found in zip file.")
+ except Exception as e:
+ raise ValueError(f"Error parsing PatchSearch.xml: {e}")
+
+def _find_patch_subdirs(z: zipfile.ZipFile, patchnum: int) -> typing.Set[str]:
+ """
+ Finds the set of numeric subdirectories inside the main patch directory.
+ (e.g., "34449117/34411846/" -> "34411846")
+ """
+ found_subdirs = set()
+ # Match patchnum/12345/
+ subdir_pattern = re.compile(fr'^{patchnum}/(\d+)/')
+
+ for item in z.namelist():
+ match = subdir_pattern.match(item)
+ if match:
+ found_subdirs.add(match.group(1))
+
+ logging.info(f"Found numeric subdirectories: {found_subdirs}")
+ return found_subdirs
-def get_patch_auth(s: requests.models.Request) -> typing.List[str]:
- """Obtains auth for login in order to download patches."""
- r = s.get(LOGIN_FORM, allow_redirects=False)
- if 'location' in r.headers:
- # Do two separate requests to force auth on second request
- r = s.get(r.headers['Location'])
- assert r.status_code == 200, f'Got HTTP code {r.status_code} retrieving {LOGIN_FORM}'
- url = re.findall(LOGIN_FORM, str(r.content))
- return url
-
-def get_patch_url(s: requests.models.Request, patchnum: int) -> typing.List[str]:
- """Retrieves a download URL for a given patch number."""
- r = s.get(SEARCH_FORM % patchnum, allow_redirects=False)
- if 'location' in r.headers:
- # Do two separate requests to force auth on second request
- r = s.get(r.headers['Location'])
-
- assert r.status_code == 200, f'Got HTTP code {r.status_code} retrieving {SEARCH_FORM}'
-
- url = re.findall(DOWNLOAD_URL, str(r.content))
- assert url, f'Could not get a download URL from the patch form {SEARCH_FORM}; is the patch number correct?'
- return url
-
-
-def download_patch(s: requests.models.Request, url: str, patch_file: str) -> None:
- """Downloads a given URL to a local file."""
- logging.info('Downloading %s', url)
- s.mount(url, requests.adapters.HTTPAdapter(max_retries=3))
- with s.get(url, stream=True) as r:
- with open(patch_file, 'wb') as f:
- shutil.copyfileobj(r.raw, f)
-
-
-def parse_patch(patch_file: str, patchnum: int) -> (str, str, str, str):
- """Parses out the release, base release, and GI/OJVM subdirectories from a patch zip file."""
- with zipfile.ZipFile(patch_file, 'r') as z:
- with z.open('PatchSearch.xml') as f:
- c = bs4.BeautifulSoup(f.read(), 'xml')
- abstract = c.find('abstract').get_text()
- assert 'COMBO OF OJVM' in abstract, f'Patch {patchnum} abstract {abstract} does not look like an OJVM combo'
- logging.info('Abstract: %s', abstract)
- patch_release = re.findall(r' (\d+\.\d+\.\d+\.\d+\.\d+) ', abstract)[0]
- release = c.find('release')['name']
- for fname in z.namelist():
- m = re.search(fr'^{patchnum}/(\d+)/README.html', fname)
- if m:
- logging.debug('Found readme file: %s', fname)
- with z.open(fname) as f:
- c = bs4.BeautifulSoup(f.read(), 'lxml')
- logging.debug('Found title: %s', c.find('title').get_text())
- if 'JavaVM' in c.find('title').get_text():
- ojvm_subdir = m.group(1)
- elif 'GI ' in c.find('title').get_text() or 'Grid Infrastructure' in c.find('title').get_text() or 'GI ' in c.find(string = re.compile("GI Release Update")).get_text():
- gi_subdir = m.group(1)
- assert 'ojvm_subdir' in locals(), f'Could not find an OJVM patch molecule in {patch_file}'
- assert 'gi_subdir' in locals(), f'Could not find a GI patch molecule in {patch_file}'
- return(release, patch_release, ojvm_subdir, gi_subdir)
+def _read_and_decode_readme(z: zipfile.ZipFile, readme_path: str) -> str:
+ """
+ Reads a file from a zip and attempts to decode it using common encodings.
+ """
+ try:
+ with z.open(readme_path) as f:
+ content = f.read()
+ for encoding in ['utf-8', 'latin-1', 'cp1252']:
+ try:
+ return content.decode(encoding)
+ except UnicodeDecodeError:
+ continue
+ logging.warning(f"Could not decode {readme_path} with any known encoding.")
+ except Exception as e:
+ logging.warning(f"Error reading {readme_path} from zip: {e}")
+ return ""
+def _extract_text_from_readme(decoded_content: str, is_html: bool) -> str:
+ """
+ Extracts searchable, lower-case text from README content.
+ If HTML, it combines text from both the
and .
+ """
+ search_text = decoded_content.lower()
+ if is_html:
+ try:
+ # *** Use 'lxml' for parsing HTML ***
+ soup = bs4.BeautifulSoup(decoded_content, 'lxml')
+ title_text = soup.find('title').get_text().lower().strip() if soup.find('title') else ""
+ body_text = soup.get_text().lower()
+ search_text = title_text + " " + body_text # Combine for better matching
+ except Exception as e:
+ logging.warning(f"Error parsing HTML README: {e}")
+ pass # Fallback to using the raw decoded content
+ return search_text.strip()
+
+def parse_patch(patch_file: str, patchnum: int) -> typing.Tuple[str, str, str, str, str]:
+ """
+ Parses patch metadata: release info, abstract, and component subdirectories.
+
+ This function is robust:
+ - It reads `PatchSearch.xml` for definitive release info.
+ - It finds *all* component subdirs (e.g., GI, DB, OJVM).
+ - It analyzes `README.html` and `README.txt` files to identify
+ which subdir is for OJVM and which is for the "Other" component (GI or DB).
+
+ Returns:
+ (release, patch_release, ojvm_subdir, other_subdir, abstract)
+ """
+ if not zipfile.is_zipfile(patch_file):
+ raise ValueError(f"File '{patch_file}' is not a valid zip file.")
+
+ with zipfile.ZipFile(patch_file, 'r') as z:
+
+ # --- 1. Get Base Info from PatchSearch.xml ---
+ release, patch_release, abstract = _parse_patch_xml(z)
+ logging.info(f'Abstract: {abstract}')
+
+ # --- 2. Find all numeric subdirectories ---
+ found_subdirs = _find_patch_subdirs(z, patchnum)
+
+ # Handle 21c+ single-component RUs which don't have numbered subdirs
+ if not found_subdirs and release.startswith('21'):
+ logging.info("Found 0 subdirs, assuming 21c-style patch with root subdir '/'")
+ # Return root for both, test logic will validate against YAML
+ return release, patch_release, "/", "/", abstract
+
+ if len(found_subdirs) != 2:
+ raise ValueError(
+ f"Expected exactly 2 numeric subdirectories under '{patchnum}/', "
+ f"but found {len(found_subdirs)}: {found_subdirs}. Cannot proceed."
+ )
+
+ # --- 3. Identify OJVM vs. Other component using README analysis ---
+ readme_analysis = {} # Stores analysis results for each subdir
+ subdir_list = list(found_subdirs)
+
+ for subdir_num in subdir_list:
+ analysis = {'is_likely_ojvm': False, 'is_likely_other': False}
+
+ # Find README.html or README.txt
+ readme_path = next((f'{patchnum}/{subdir_num}/README.{ext}' for ext in ['html', 'txt']
+ if f'{patchnum}/{subdir_num}/README.{ext}' in z.namelist()), None)
+
+ if not readme_path:
+ logging.warning(f"No README found for subdir {subdir_num}")
+ readme_analysis[subdir_num] = analysis
+ continue
+
+ # Read, decode, and extract text from the README
+ decoded_content = _read_and_decode_readme(z, readme_path)
+ if not decoded_content:
+ readme_analysis[subdir_num] = analysis
+ continue
+
+ search_text = _extract_text_from_readme(
+ decoded_content,
+ is_html=readme_path.lower().endswith('.html')
+ )
+ if not search_text:
+ readme_analysis[subdir_num] = analysis
+ continue
+
+ # Check for identifying keywords
+ has_ojvm_kw = 'javavm' in search_text or 'ojvm' in search_text
+ has_other_kw = any(kw in search_text for kw in
+ ['database', 'rdbms', 'db ru', 'gi ', 'grid infrastructure', 'gi release update'])
+
+ # Only flag as "likely" if keywords are NOT ambiguous
+ if has_ojvm_kw and not has_other_kw:
+ analysis['is_likely_ojvm'] = True
+ if has_other_kw and not has_ojvm_kw:
+ analysis['is_likely_other'] = True
+
+ readme_analysis[subdir_num] = analysis
+ logging.debug(f"Analysis for {subdir_num}: {analysis}")
+
+ # --- 4. Assign subdirs based on analysis ---
+ ojvm_subdir, other_subdir = None, None
+ clear_ojvm = [sd for sd, data in readme_analysis.items() if data['is_likely_ojvm']]
+ clear_other = [sd for sd, data in readme_analysis.items() if data['is_likely_other']]
+
+ if len(clear_ojvm) == 1:
+ # Clearly identified OJVM
+ ojvm_subdir = clear_ojvm[0]
+ other_subdir = next(s for s in subdir_list if s != ojvm_subdir)
+ logging.info(f"Assigned OJVM subdir based on clear keywords: /{ojvm_subdir}")
+ elif len(clear_other) == 1:
+ # Clearly identified Other (GI/DB)
+ other_subdir = clear_other[0]
+ ojvm_subdir = next(s for s in subdir_list if s != other_subdir)
+ logging.info(f"Assigned 'Other' subdir based on clear keywords: /{other_subdir}")
+ else:
+ # Ambiguous! Log an error and guess. The user MUST verify.
+ ojvm_subdir = subdir_list[0] # GUESS: Assign first as OJVM
+ other_subdir = subdir_list[1] # GUESS: Assign second as Other
+ logging.error("README analysis was ambiguous for both subdirectories.")
+ logging.warning(
+ f"GUESSING: Assigning /{ojvm_subdir} as OJVM and /{other_subdir} as Other. "
+ "PLEASE VERIFY MANUALLY!"
+ )
+
+ # *** FIX: Return subdir with leading slash, as expected in YAML ***
+ return release, patch_release, f"/{ojvm_subdir}", f"/{other_subdir}", abstract
+
+# --- OPatch Download Function ---
+
+def download_opatch(s: requests.Session, base_release: str) -> str:
+ """
+ Downloads the latest OPatch utility for a given base release.
+
+ Returns:
+ The filename of the downloaded OPatch zip.
+ """
+ logging.info(f'Downloading OPatch (Patch {OPATCH_PATCHNUM}) for release {base_release}')
+ op_urls = get_patch_url(s, OPATCH_PATCHNUM)
+
+ release_major = base_release.split('.')[0] # e.g., "19" from "19.3.0.0.0"
+ op_patch_url = None
+ platform_str = "Linux-x86-64"
+
+ # Define patterns to find the *correct* OPatch for our DB release
+ patterns = [
+ # Most specific: p6880880_190000_Linux-x86-64.zip
+ re.compile(fr'p{OPATCH_PATCHNUM}_{release_major}0000_{platform_str}\.zip', re.IGNORECASE),
+ # Generic release + platform: ...release=19...Linux-x86-64...
+ re.compile(fr'release={release_major}.*{platform_str}', re.IGNORECASE),
+ re.compile(fr'{platform_str}.*release={release_major}', re.IGNORECASE)
+ ]
+
+ # Try to find a specific match first
+ specific_matches = [k for k in op_urls for pattern in patterns if pattern.search(k)]
+ if specific_matches:
+ op_patch_url = specific_matches[0]
+ logging.info(f"Found specific OPatch URL: {op_patch_url}")
+ else:
+ # Fallback: Find *any* Linux-x86-64 OPatch URL if specific one fails
+ logging.warning(f"Specific OPatch for release {release_major} not found. "
+ f"Trying generic {platform_str} fallback.")
+ generic_matches = [k for k in op_urls if platform_str.lower() in k.lower()]
+ if generic_matches:
+ op_patch_url = generic_matches[0]
+ logging.info(f"Found generic OPatch URL: {op_patch_url}")
+
+ assert op_patch_url, f'Could not find any suitable OPatch URL ({platform_str}) in {op_urls}'
+
+ # Extract the filename from the download URL's query parameters
+ op_patch_file_match = re.search(r'patch_file=([^&]+)', op_patch_url)
+ if not op_patch_file_match:
+ raise ValueError(f"Could not extract OPatch filename from URL: {op_patch_url}")
+ op_patch_file = op_patch_file_match.group(1)
+
+ # Download OPatch, skipping if a reasonably-sized file already exists
+ min_opatch_size_mb = 50
+ min_opatch_size_bytes = min_opatch_size_mb * 1024 * 1024
+
+ if os.path.exists(op_patch_file) and os.path.getsize(op_patch_file) > min_opatch_size_bytes:
+ logging.info(f"Using local copy of OPatch file {op_patch_file}")
+ else:
+ download_patch(s, op_patch_url, op_patch_file)
+
+ # Final size check
+ opatch_size = os.path.getsize(op_patch_file)
+ assert opatch_size > min_opatch_size_bytes, (
+ f'OPatch file {op_patch_file} is only {opatch_size} bytes; looks too small'
+ )
+
+ return op_patch_file
+
+# --- Main Execution Block ---
def main():
- ap = argparse.ArgumentParser()
- ap.add_argument('--patch', type=int, help='GI Combo OJVM patch number', required=True)
- ap.add_argument('--mosuser', type=str, help='MOS username', required=True)
- ap.add_argument('--debug', help='Debug logging', action=argparse.BooleanOptionalAction)
- args = ap.parse_args()
- logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO)
+ """
+ Main function to run the script from the command line.
+ """
+ # 1. --- Argument Parsing ---
+ ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
+ ap.add_argument('--patch', type=int, help='The main combo patch number to download and parse.', required=True)
+ ap.add_argument('--mosuser', type=str, help='My Oracle Support (MOS) username.', required=True)
+ ap.add_argument('--debug', help='Enable debug logging.', action='store_true')
+ args = ap.parse_args()
+
+ logging.basicConfig(
+ level=logging.DEBUG if args.debug else logging.INFO,
+ format='%(levelname)s: %(message)s'
+ )
- patchnum = args.patch
- mosuser = args.mosuser
- mospwd = getpass.getpass(prompt='MOS Password: ')
+ patchnum = args.patch
+ mosuser = args.mosuser
+ try:
+ mospwd = getpass.getpass(prompt='MOS Password: ')
+ except Exception as e:
+ logging.error(f"Could not get password: {e}")
+ return
- s = requests.Session()
- s.headers.update({'User-Agent': USER_AGENT})
- s.auth = (mosuser, mospwd)
+ # 2. --- Setup MOS Session ---
+ try:
+ s = requests.Session()
+ s.headers.update({'User-Agent': USER_AGENT})
+ s.auth = (mosuser, mospwd)
+
+ get_patch_auth(s) # Authenticate the session
+ except Exception as e:
+ logging.error(f"Failed to authenticate with MOS: {e}")
+ return
- url = get_patch_auth(s)
- url = get_patch_url(s, patchnum)
- # Yes we ignore multipart patche:ws here.
- logging.debug('Found download URL: %s', url[0])
- patch_file = urllib.parse.parse_qs(urllib.parse.urlparse(url[0]).query)['patch_file'][0]
- logging.debug('url=%s patch_file=%s', url[0], patch_file)
- if os.path.exists(patch_file) and os.path.getsize(patch_file) > 2*1024*1024*1024:
- logging.info('Using local copy of patch file %s', patch_file)
- else:
- download_patch(s, url[0], patch_file)
+ # 3. --- Download Main Patch ---
+ try:
+ urls = get_patch_url(s, patchnum)
+ logging.debug(f'Found download URL(s): {urls}')
+
+ patch_file = urllib.parse.parse_qs(urllib.parse.urlparse(urls[0]).query)['patch_file'][0]
- size = os.path.getsize(patch_file)
- assert size > 2*1024*1024*1024, f'Output file {patch_file} is only {size} bytes in size; looks too small'
+ min_patch_size_gb = 2
+ min_patch_size_bytes = min_patch_size_gb * 1024 * 1024 * 1024
+
+ if os.path.exists(patch_file) and os.path.getsize(patch_file) > min_patch_size_bytes:
+ logging.info(f'Using local copy of patch file {patch_file}')
+ else:
+ download_patch(s, urls[0], patch_file)
- md5 = hashlib.md5()
- with open(patch_file, 'rb') as f:
- while chunk := f.read(1024*1024):
- md5.update(chunk)
+ size = os.path.getsize(patch_file)
+ assert size > min_patch_size_bytes, (
+ f'Output file {patch_file} is only {size} bytes; looks too small'
+ )
+ except Exception as e:
+ logging.error(f"Failed to download main patch {patchnum}: {e}")
+ return
- md5_digest = base64.b64encode(md5.digest()).decode('ascii')
- logging.debug('Calculated MD5 digest %s', md5_digest)
+ # 4. --- Calculate MD5 Checksum ---
+ logging.info(f"Calculating MD5 for {patch_file}...")
+ md5 = hashlib.md5()
+ with open(patch_file, 'rb') as f:
+ while chunk := f.read(1024*1024):
+ md5.update(chunk)
- (release, patch_release, ojvm_subdir, gi_subdir) = parse_patch(patch_file, patchnum)
+ md5_digest = base64.b64encode(md5.digest()).decode('ascii')
+ logging.info(f'Calculated MD5 digest: {md5_digest}')
- base_release = '19.3.0.0.0' if release == '19.0.0.0.0' else release
- logging.info('Found release = %s base = %s GI subdir = %s OJVM subdir = %s', patch_release, base_release, gi_subdir, ojvm_subdir)
+ # 5. --- Parse Patch Contents ---
+ try:
+ (release, patch_release, ojvm_subdir, other_subdir, abstract) = parse_patch(patch_file, patchnum)
+ except Exception as e:
+ logging.error(f"Failed to parse patch file {patch_file}: {e}")
+ logging.error("This patch may be a single-component patch or have an unexpected structure.")
+ return
- logging.info('Downloading OPatch')
- op_url = get_patch_url(s, 6880880)
+ base_release = '19.3.0.0.0' if release == '19.0.0.0.0' else release
+
+ logging.info(f'--- Patch Analysis Results ---')
+ logging.info(f' Base Release: {base_release}')
+ logging.info(f' Patch Release: {patch_release}')
+ logging.info(f' "Other" Subdir: {other_subdir} (This is likely the GI or DB_RU component)')
+ logging.info(f' "OJVM" Subdir: {ojvm_subdir}')
+ logging.info(f'--------------------------------')
- release = patch_file.split('_')[1]
- if release == '121020':
- release = '121010'
- matches = [k for k in op_url if release in k]
- assert len(matches) == 1, f'Could not find OPatch for release {release}; only got {op_url}'
+ # 6. --- Download OPatch ---
+ try:
+ op_patch_file = download_opatch(s, base_release)
+ except Exception as e:
+ logging.error(f"Failed to download OPatch: {e}")
+ op_patch_file = "OPATCH_DOWNLOAD_FAILED" # Set placeholder to continue
- op_patch_file = urllib.parse.parse_qs(urllib.parse.urlparse(matches[0]).query)['patch_file'][0]
- download_patch(s, matches[0], op_patch_file)
+ # 7. --- Generate Final YAML Output ---
+ yaml_output = []
+ yaml_output.append(f'\n# === SCRIPT OUTPUT: Copy files and update YAML ===')
+ yaml_output.append(f'\n# 1. Copy the following files to your GCS bucket:')
+ yaml_output.append(f'# {patch_file} {op_patch_file}')
+
+ yaml_output.append(f'\n# 2. Add the following to roles/common/defaults/main/ files:')
+ yaml_output.append(f'# (Review the abstract to make the correct selections!)')
+ yaml_output.append(f'#')
+ yaml_output.append(f'# Abstract: {abstract}')
+
+ yaml_output.append(f'\n# --- SELECTION 1: Choose the NON-OJVM component (GI or DB) ---')
+ yaml_output.append(f'# --- This component is in subdir: {other_subdir} ---')
+
+ # 1A: GI Patch Option
+ yaml_output.append(f'''
+# 1A: If this is a GI Patch (RU), add to 'gi_patches.yml':
+# gi_patches:
+# - {{ category: "RU", base: "{base_release}", release: "{patch_release}", patchnum: "{patchnum}", patchfile: "{patch_file}", patch_subdir: "{other_subdir}", prereq_check: false, method: "opatchauto apply", ocm: false, upgrade: false, md5sum: "{md5_digest}" }}''')
- size = os.path.getsize(patch_file)
- assert size > 100*1024*1024, f'OPatch output file {patch_file} is only {size} bytes in size; looks too small'
+ # 1B: DB_RU Patch Option
+ yaml_output.append(f'''
+# 1B: If this is an RDBMS Patch (DB_RU), add to 'rdbms_patches.yml':
+# rdbms_patches:
+# - {{ category: "DB_RU", base: "{base_release}", release: "{patch_release}", patchnum: "{patchnum}", patchfile: "{patch_file}", patch_subdir: "{other_subdir}", prereq_check: true, method: "opatch apply", ocm: false, upgrade: true, md5sum: "{md5_digest}" }}''')
- if not (base_release.startswith('19') or base_release.startswith('18') or base_release.startswith('12.2')):
- logging.warning('Base release %s has not been tested; the results may be incorrect.', base_release)
+ yaml_output.append(f'\n# --- SELECTION 2: Choose the OJVM component ---')
+ yaml_output.append(f'# --- This component is in subdir: {ojvm_subdir} ---')
+
+ # 2A: RU_Combo OJVM Option
+ yaml_output.append(f'''
+# 2A: If OJVM is from a GI Combo (RU_Combo), add to 'rdbms_patches.yml':
+# rdbms_patches:
+# - {{ category: "RU_Combo", base: "{base_release}", release: "{patch_release}", patchnum: "{patchnum}", patchfile: "{patch_file}", patch_subdir: "{ojvm_subdir}", prereq_check: true, method: "opatch apply", ocm: false, upgrade: true, md5sum: "{md5_digest}" }}''')
- print(f'Please copy the following files to your GCS bucket: {patch_file} {op_patch_file}')
- print(f'''Add the following to the appropriate sections of roles/common/defaults/main.yml:
+ # 2B: DB_OJVM_RU Patch Option
+ yaml_output.append(f'''
+# 2B: If this is an OJVM + DB RU (DB_OJVM_RU), add to 'rdbms_patches.yml':
+# rdbms_patches:
+# - {{ category: "DB_OJVM_RU", base: "{base_release}", release: "{patch_release}", patchnum: "{patchnum}", patchfile: "{patch_file}", patch_subdir: "{ojvm_subdir}", prereq_check: true, method: "opatch apply", ocm: false, upgrade: true, md5sum: "{md5_digest}" }}
+''')
+
+ yaml_output.append(f'# === END SCRIPT OUTPUT ===')
- gi_patches:
- - {{ category: "RU", base: "{base_release}", release: "{patch_release}", patchnum: "{patchnum}", patchfile: "{patch_file}", patch_subdir: "/{gi_subdir}", prereq_check: FALSE, method: "opatchauto apply", ocm: FALSE, upgrade: FALSE, md5sum: "{md5_digest}" }}
+ print("\n".join(yaml_output))
- rdbms_patches:
- - {{ category: "RU_Combo", base: "{base_release}", release: "{patch_release}", patchnum: "{patchnum}", patchfile: "{patch_file}", patch_subdir: "/{ojvm_subdir}", prereq_check: TRUE, method: "opatch apply", ocm: FALSE, upgrade: TRUE, md5sum: "{md5_digest}" }}
- ''')
+# This guard makes the script safely importable
if __name__ == '__main__':
- main()
+ main()
+
+
diff --git a/tools/test_patch_parser.py b/tools/test_patch_parser.py
new file mode 100644
index 000000000..eb77c2996
--- /dev/null
+++ b/tools/test_patch_parser.py
@@ -0,0 +1,310 @@
+#!/usr/bin/python3
+"""
+test_patch_parser.py: Unit and regression test for gen_patch_metadata.py.
+
+This test validates that the patch parsing logic in gen_patch_metadata.py
+correctly extracts metadata that matches the "ground truth" data stored in
+the toolkit's YAML files.
+
+It works by:
+1. Loading all patch definitions from gi_patches.yml and rdbms_patches.yml.
+2. Grouping patches by their shared .zip file.
+3. For each *unique* combo patch file, it:
+ a. Downloads the .zip from a specified GCS bucket (to avoid MOS).
+ b. Runs the `parse_patch` function on it.
+ c. Asserts that the parsed `base_release`, `patch_release`, and the *set*
+ of subdirectories (`ojvm_subdir`, `other_subdir`) match the values
+ from the YAML files. (Handles ambiguous README parsing).
+4. Cleans up all downloaded files.
+"""
+
+import os
+import unittest
+import yaml
+import logging
+import shutil
+from collections import defaultdict
+
+# Import third-party libraries
+try:
+ from google.cloud import storage
+except ImportError:
+ print("Error: Missing required libraries. Please run:")
+ print("pip install PyYAML google-cloud-storage")
+ exit(1)
+
+
+# Import the script we want to test
+import gen_patch_metadata
+
+# --- Configuration ---
+
+# Hardcoded GCS bucket for downloading patch zips
+GCS_BUCKET_NAME = "gcp-oracle-software"
+
+# Paths relative to the script's location (assuming it's in 'tools/')
+GI_PATCHES_YML = "../roles/common/defaults/main/gi_patches.yml"
+RDBMS_PATCHES_YML = "../roles/common/defaults/main/rdbms_patches.yml"
+DOWNLOAD_DIR = "./patch_test_temp"
+
+# Categories that represent the "OJVM" component of a combo patch
+OJVM_CATEGORIES = {"RU_Combo", "DB_OJVM_RU", "PSU_Combo"}
+
+# Categories that represent the "Other" (GI/DB) component of a combo patch
+OTHER_CATEGORIES = {"RU", "DB_RU", "PSU"}
+
+# --- Helper Function ---
+
+def load_patches_from_yaml(filepath: str, key: str) -> list:
+ """Loads a list of patch dictionaries from a YAML file."""
+ try:
+ with open(filepath, 'r') as f:
+ data = yaml.safe_load(f)
+ # Ensure the key exists and its value is a list
+ if data and key in data and isinstance(data[key], list):
+ return data[key]
+ elif data and key in data:
+ logging.warning(f"Expected a list under key '{key}' in {filepath}, but found {type(data[key])}.")
+ return [] # Return empty list if not a list
+ else:
+ logging.warning(f"Key '{key}' not found in {filepath}.")
+ return []
+ except FileNotFoundError:
+ logging.warning(f"Could not find YAML file: {filepath}")
+ except yaml.YAMLError as e:
+ logging.error(f"Error parsing YAML file {filepath}: {e}")
+ except Exception as e:
+ logging.error(f"Unexpected error loading {filepath}: {e}")
+ return []
+
+
+def group_combo_patches_for_testing() -> list:
+ """
+ Loads both YAML files and groups components by their shared patchfile.
+ Returns a list of patches to test.
+ """
+ patches_by_file = defaultdict(list)
+
+ # 1. Load all patches from both files
+ gi_patches = load_patches_from_yaml(GI_PATCHES_YML, 'gi_patches')
+ rdbms_patches = load_patches_from_yaml(RDBMS_PATCHES_YML, 'rdbms_patches')
+
+ # Filter out any non-dictionary items just in case YAML is malformed
+ all_patches = [p for p in (gi_patches + rdbms_patches) if isinstance(p, dict)]
+
+ # 2. Group by patchfile
+ for patch in all_patches:
+ patchfile = patch.get('patchfile')
+ if patchfile:
+ patches_by_file[patchfile].append(patch)
+ else:
+ logging.warning(f"Patch definition missing 'patchfile' key: {patch}")
+
+ # Skip known obsolete/unavailable 12.1.0.2 patches
+ OBSOLETE_PATCH_FILES = {
+ 'p32126899_121020_Linux-x86-64.zip',
+ 'p32579077_121020_Linux-x86-64.zip'
+ }
+
+ # 3. Create the final list of test cases
+ combo_patches_to_test = []
+ for patchfile, components in patches_by_file.items():
+
+ if patchfile in OBSOLETE_PATCH_FILES:
+ logging.warning(f"Skipping test for obsolete/unavailable patch: {patchfile}")
+ continue
+
+ # Ensure components list is not empty and first item is a dict
+ if not components or not isinstance(components[0], dict):
+ logging.warning(f"Skipping {patchfile}: Invalid component data found.")
+ continue
+
+ patchnum_str = str(components[0].get('patchnum', '0'))
+
+ # Test combo patches (pre-21c)
+ if len(components) == 2:
+ comp_a, comp_b = components
+
+ # Ensure both components are dictionaries before proceeding
+ if not isinstance(comp_a, dict) or not isinstance(comp_b, dict):
+ logging.warning(f"Skipping {patchfile}: Invalid component data for pair.")
+ continue
+
+ # Check essential keys exist
+ if not all(k in comp_a for k in ['base', 'release', 'category', 'patch_subdir']) or \
+ not all(k in comp_b for k in ['category', 'patch_subdir']):
+ logging.warning(f"Skipping {patchfile}: Missing required keys in component definitions.")
+ continue
+
+
+ # Normalize 19c base release
+ base_release = comp_a['base']
+ if base_release == '19.0.0.0.0':
+ base_release = '19.3.0.0.0'
+
+ test_case = {
+ 'patchfile': patchfile,
+ 'patchnum': int(patchnum_str),
+ 'base_release': base_release,
+ 'patch_release': comp_a['release'],
+ 'expected_ojvm_subdir': None,
+ 'expected_other_subdir': None
+ }
+
+ # Assign expected subdirs based on category
+ cat_a = comp_a.get('category')
+ cat_b = comp_b.get('category')
+
+ if cat_a in OJVM_CATEGORIES and cat_b in OTHER_CATEGORIES:
+ test_case['expected_ojvm_subdir'] = comp_a['patch_subdir']
+ test_case['expected_other_subdir'] = comp_b['patch_subdir']
+ elif cat_b in OJVM_CATEGORIES and cat_a in OTHER_CATEGORIES:
+ test_case['expected_ojvm_subdir'] = comp_b['patch_subdir']
+ test_case['expected_other_subdir'] = comp_a['patch_subdir']
+ else:
+ # Log only if categories are present but don't fit expected combo pattern
+ if cat_a and cat_b:
+ logging.warning(f"Skipping {patchfile}: Categories '{cat_a}' and '{cat_b}' do not form a recognized combo pattern.")
+ elif not cat_a or not cat_b:
+ logging.warning(f"Skipping {patchfile}: One or both components missing 'category' key.")
+ continue
+
+ # Ensure subdirs were successfully assigned
+ if test_case['expected_ojvm_subdir'] is None or test_case['expected_other_subdir'] is None:
+ logging.warning(f"Skipping {patchfile}: Could not reliably determine OJVM/Other subdirs from categories.")
+ continue
+
+ combo_patches_to_test.append(test_case)
+
+ elif len(components) > 2:
+ logging.warning(f"Skipping {patchfile}: Found {len(components)} entries for this file, expected 2 for a combo patch.")
+ else: # len(components) == 1
+ # This is a single-component patch (e.g., PSU_Combo 11.2 or 21c RU)
+ # The parse_patch() function is designed for combos, so we skip these.
+ logging.info(f"Skipping {patchfile}: Not a 2-component combo patch.")
+
+ return combo_patches_to_test
+
+# --- Test Case Class ---
+
+class TestPatchParser(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ """Called once before all tests."""
+ logging.info("Loading and grouping patch metadata for testing...")
+ cls.patches_to_test = group_combo_patches_for_testing()
+ if not cls.patches_to_test:
+ # Changed to warning + skip instead of raising error, allows tests to run partially
+ logging.warning("No combo patch files found to test. Check YAML paths and contents. Skipping tests.")
+ cls.patches_to_test = [] # Ensure it's an empty list
+ # raise RuntimeError("No patch files found to test. Check YAML paths and contents.")
+
+ logging.info(f"Found {len(cls.patches_to_test)} unique combo patches to test.")
+
+ # Initialize bucket to None, attempt connection only if needed
+ cls.bucket = None
+ if cls.patches_to_test: # Only connect if there are tests to run
+ try:
+ storage_client = storage.Client()
+ cls.bucket = storage_client.bucket(GCS_BUCKET_NAME)
+ if not cls.bucket.exists():
+ raise RuntimeError(f"GCS Bucket '{GCS_BUCKET_NAME}' does not exist or you lack permissions.")
+ except Exception as e:
+ logging.error(f"Failed to connect to GCS: {e}")
+ # Don't raise here, allow tests to potentially fail individually
+ cls.bucket = None # Ensure bucket is None if connection failed
+
+ # Create a temp dir for downloads
+ os.makedirs(DOWNLOAD_DIR, exist_ok=True)
+ logging.info(f"Using temp download directory: {DOWNLOAD_DIR}")
+
+ @classmethod
+ def tearDownClass(cls):
+ """Called once after all tests."""
+ logging.info(f"Cleaning up temp directory: {DOWNLOAD_DIR}")
+ try:
+ shutil.rmtree(DOWNLOAD_DIR)
+ except Exception as e:
+ logging.error(f"Could not clean up {DOWNLOAD_DIR}: {e}")
+
+ def test_patch_parsing_against_yaml(self):
+ """
+D Iterates all combo patches, downloads from GCS, and validates parsing.
+ """
+ if not self.patches_to_test:
+ self.skipTest("No combo patches were loaded for testing.")
+
+ if self.bucket is None:
+ self.fail("Could not connect to GCS bucket. See previous errors.")
+
+
+ failures = []
+ for patch_data in self.patches_to_test:
+ patchfile = patch_data['patchfile']
+ local_path = os.path.join(DOWNLOAD_DIR, patchfile)
+
+ # Use subTest to run each patch as an independent test
+ with self.subTest(patchfile=patchfile):
+ logging.info(f"--- Testing Patch: {patchfile} ---")
+ try:
+ # 1. Download from GCS
+ logging.info(f"Downloading {patchfile} from GCS...")
+ blob = self.bucket.blob(patchfile)
+ if not blob.exists():
+ raise FileNotFoundError(f"{patchfile} not found in bucket {GCS_BUCKET_NAME}")
+ blob.download_to_filename(local_path)
+
+ self.assertTrue(os.path.exists(local_path))
+
+ # 2. Run the parser
+ logging.info(f"Parsing {patchfile}...")
+ (release, patch_release, ojvm_subdir, other_subdir, _) = \
+ gen_patch_metadata.parse_patch(local_path, patch_data['patchnum'])
+
+ # Normalize base release (e.g., 19.0.0.0.0 -> 19.3.0.0.0)
+ base_release = '19.3.0.0.0' if release == '19.0.0.0.0' else release
+
+ # 3. Compare results
+ logging.info(f"Validating parsed data against YAML...")
+ self.assertEqual(base_release, patch_data['base_release'], "Base release mismatch")
+ self.assertEqual(patch_release, patch_data['patch_release'], "Patch release mismatch")
+
+ # Compare sets of subdirs
+ # This handles cases where the parser guessed the OJVM/Other assignment incorrectly
+ parsed_subdirs = {ojvm_subdir, other_subdir}
+ expected_subdirs = {patch_data['expected_ojvm_subdir'], patch_data['expected_other_subdir']}
+ self.assertSetEqual(parsed_subdirs, expected_subdirs,
+ f"Subdirectory mismatch. Parsed: {parsed_subdirs}, Expected: {expected_subdirs}")
+
+ logging.info(f"SUCCESS: {patchfile}")
+
+ except Exception as e:
+ logging.error(f"FAILED: {patchfile}\n{e}")
+ # Include assertion details if available
+ error_msg = str(e)
+ if isinstance(e, AssertionError):
+ # unittest adds extra context, use that
+ failures.append(f"{patchfile}: {error_msg}")
+ else:
+ failures.append(f"{patchfile}: {type(e).__name__}: {error_msg}")
+
+ finally:
+ # 4. Clean up the zip file
+ if os.path.exists(local_path):
+ try:
+ os.remove(local_path)
+ except OSError as e:
+ logging.warning(f"Could not remove temporary file {local_path}: {e}")
+
+ # Final report of all failures
+ if failures:
+ # Use assertMultiLineEqual for better diff output on assertion errors
+ failure_details = f"Test failed for {len(failures)} patches:\n" + "\n".join(failures)
+ # This will print the full list if it fails
+ self.assertEqual([], failures, failure_details)
+
+
+if __name__ == '__main__':
+ logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
+ unittest.main()