From 7f4e330ec2c161116d28531131d256a152408cd1 Mon Sep 17 00:00:00 2001 From: Aaron Gibson Date: Wed, 15 Apr 2026 14:33:05 +0200 Subject: [PATCH 1/3] add a script to feed fastpath from failed measurements s3 bucket --- .../templates/s3_post_fastpath_feeder.py | 155 ++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100755 ansible/roles/fastpath/templates/s3_post_fastpath_feeder.py diff --git a/ansible/roles/fastpath/templates/s3_post_fastpath_feeder.py b/ansible/roles/fastpath/templates/s3_post_fastpath_feeder.py new file mode 100755 index 00000000..703baa62 --- /dev/null +++ b/ansible/roles/fastpath/templates/s3_post_fastpath_feeder.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python3 +""" +List objects in an S3 bucket using boto3. +Configuration is read from environment variables (see defaults below). +""" + +import os +import boto3 +import json +import requests +from botocore.exceptions import ClientError, NoCredentialsError, EndpointConnectionError +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path + +# Configuration from environment (set these in your shell) +AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID") # required if not using IAM role/profile +AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY") # required if not using IAM role/profile +ROLE_ARN = os.getenv("ROLE_ARN") +ROLE_SESSION_NAME = os.getenv("ROLE_SESSION_NAME", "assume-role-session") +ROLE_DURATION_SECONDS = int(os.getenv("ROLE_DURATION_SECONDS", "3600")) # optional +AWS_REGION = os.getenv("AWS_REGION", "eu-central-1") +BUCKET_NAME = os.getenv("S3_BUCKET_NAME") # required +PREFIX = os.getenv("S3_PREFIX", "") +MAX_KEYS = int(os.getenv("S3_MAX_KEYS", "1000")) +DEST_ROOT = os.getenv("DOWNLOAD_ROOT", "./s3-downloads") +FASTPATH_API = os.getenv("FASTPATH_API", "") + +def assume_role_and_get_credentials(role_arn, session_name, duration_seconds=3600): + """ + Assume the given role and return temporary credentials dict: + { aws_access_key_id, aws_secret_access_key, aws_session_token } + """ + # Use provided long-term creds or default chain to call STS + sts_kwargs = {"region_name": AWS_REGION, + "aws_access_key_id": AWS_ACCESS_KEY_ID, + "aws_secret_access_key": AWS_SECRET_ACCESS_KEY, + } + sts_client = boto3.client("sts", **sts_kwargs) + resp = sts_client.assume_role( + RoleArn=role_arn, + RoleSessionName=session_name, + DurationSeconds=duration_seconds + ) + creds = resp["Credentials"] + return { + "aws_access_key_id": creds["AccessKeyId"], + "aws_secret_access_key": creds["SecretAccessKey"], + "aws_session_token": creds["SessionToken"], + } + +def get_s3_client(): + """ + Returns an S3 client. If ROLE_ARN is set, assumes that role first and uses + the temporary credentials. Otherwise uses provided credentials or default chain. + """ + client_kwargs = {"region_name": AWS_REGION} + if ROLE_ARN: + try: + temp = assume_role_and_get_credentials(ROLE_ARN, ROLE_SESSION_NAME, ROLE_DURATION_SECONDS) + client_kwargs.update(temp) + except ClientError as e: + print(f"Error assuming role: {e.response.get('Error', {}).get('Message')}") + raise + else: + if AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY: + client_kwargs.update({ + "aws_access_key_id": AWS_ACCESS_KEY_ID, + "aws_secret_access_key": AWS_SECRET_ACCESS_KEY, + }) + if AWS_SESSION_TOKEN: + client_kwargs["aws_session_token"] = AWS_SESSION_TOKEN + return boto3.client("s3", **client_kwargs) + +def walk(s3, bucket_name, start_prefix=''): + """ + Generator like os.walk: + yields (prefix, subprefixes, objects) + - prefix: current prefix ('' or ending with '/') + - subprefixes: list of child prefixes (each ends with '/') + - objects: list of object keys directly under this prefix (no trailing '/') + """ + paginator = s3.get_paginator("list_objects_v2") + page_iter = paginator.paginate(Bucket=bucket_name, Prefix=start_prefix, Delimiter='/') + subprefixes = [] + objects = [] + for page in page_iter: + subprefixes.extend([cp["Prefix"] for cp in page.get("CommonPrefixes", [])]) + for obj in page.get("Contents", []): + key = obj["Key"] + if key == start_prefix: + continue + objects.append(key) + yield start_prefix, subprefixes, objects + for sub in subprefixes: + yield from walk(s3, bucket_name, sub) + +def safe_local_path(prefix, key): + # turn S3 key into a local path under DEST_ROOT preserving prefix structure + rel = key[len(prefix):] if prefix and key.startswith(prefix) else key + return os.path.join(DEST_ROOT, prefix.replace('/', os.sep), rel.replace('/', os.sep)) + +def ensure_parent(path): + os.makedirs(os.path.dirname(path), exist_ok=True) + +def process_postcan(s3, bucket, key, local_path): + try: + print("Downloading", key) + s3.download_file(bucket, key, local_path) + p = Path(local_path) + msmt_id = p.stem + with p.open("r", encoding="utf-8") as f: + data = json.load(f) + assert data['format'] == 'json' + content = data.get('content') + endpoint = f"{FASTPATH_API}/{msmt_id}" + try: + resp = requests.post(endpoint, json=content, timeout=30) + resp.raise_for_status() + except requests.RequestException: + raise + assert resp.status_code == 200 + assert resp.content == b"" + # XXX: remove file from s3 if everything went OK + return key, None + except Exception as e: + try: + if os.path.exists(local_path): + os.remove(local_path) + except Exception as remove_err: + return key, f"remove-failed: {remove_err}; download-failed: {e}" + return key, str(e) + +def main(): + if not BUCKET_NAME: + print("S3_BUCKET_NAME environment variable is required.") + return + s3 = get_s3_client() + for prefix, subs, objs in walk(s3, BUCKET_NAME, ""): + print(f"PREFIX: {prefix} subdirs={len(subs)} objects={len(objs)}") + with ThreadPoolExecutor(max_workers=50) as _exe: + futures = [] + for key in objs: + local_path = safe_local_path(prefix, key) + ensure_parent(local_path) + futures.append(_exe.submit(process_postcan, s3, BUCKET_NAME, key, local_path)) + + for fut in as_completed(futures): + key, err = fut.result() + if err: + print(f"Failed to process {key}: {err}") + else: + print(f"Submitted {key} to fastpath") + +if __name__ == "__main__": + main() From 252cacbf8ebfff5838a9cf0e3128e49b07d01d50 Mon Sep 17 00:00:00 2001 From: Aaron Gibson Date: Fri, 17 Apr 2026 10:12:38 +0200 Subject: [PATCH 2/3] stream file from s3; add --dry-run use requests.Session, limit parallelism --- .../templates/s3_post_fastpath_feeder.py | 91 ++++++++----------- 1 file changed, 40 insertions(+), 51 deletions(-) diff --git a/ansible/roles/fastpath/templates/s3_post_fastpath_feeder.py b/ansible/roles/fastpath/templates/s3_post_fastpath_feeder.py index 703baa62..957aa812 100755 --- a/ansible/roles/fastpath/templates/s3_post_fastpath_feeder.py +++ b/ansible/roles/fastpath/templates/s3_post_fastpath_feeder.py @@ -4,13 +4,14 @@ Configuration is read from environment variables (see defaults below). """ -import os +import argparse import boto3 -import json +import os import requests + +from pathlib import Path from botocore.exceptions import ClientError, NoCredentialsError, EndpointConnectionError from concurrent.futures import ThreadPoolExecutor, as_completed -from pathlib import Path # Configuration from environment (set these in your shell) AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID") # required if not using IAM role/profile @@ -21,9 +22,12 @@ AWS_REGION = os.getenv("AWS_REGION", "eu-central-1") BUCKET_NAME = os.getenv("S3_BUCKET_NAME") # required PREFIX = os.getenv("S3_PREFIX", "") -MAX_KEYS = int(os.getenv("S3_MAX_KEYS", "1000")) -DEST_ROOT = os.getenv("DOWNLOAD_ROOT", "./s3-downloads") FASTPATH_API = os.getenv("FASTPATH_API", "") +NUM_WORKERS = int(os.getenv("NUM_WORKERS", "4")) + +parser = argparse.ArgumentParser(description="List/process S3 objects") +parser.add_argument("--dry-run", action="store_true", help="List objects and print POSTs without downloading or sending them") +args = parser.parse_args() def assume_role_and_get_credentials(role_arn, session_name, duration_seconds=3600): """ @@ -71,7 +75,7 @@ def get_s3_client(): client_kwargs["aws_session_token"] = AWS_SESSION_TOKEN return boto3.client("s3", **client_kwargs) -def walk(s3, bucket_name, start_prefix=''): +def walk(s3, client, bucket_name, start_prefix=''): """ Generator like os.walk: yields (prefix, subprefixes, objects) @@ -92,42 +96,29 @@ def walk(s3, bucket_name, start_prefix=''): objects.append(key) yield start_prefix, subprefixes, objects for sub in subprefixes: - yield from walk(s3, bucket_name, sub) + yield from walk(s3, client, bucket_name, sub) -def safe_local_path(prefix, key): - # turn S3 key into a local path under DEST_ROOT preserving prefix structure - rel = key[len(prefix):] if prefix and key.startswith(prefix) else key - return os.path.join(DEST_ROOT, prefix.replace('/', os.sep), rel.replace('/', os.sep)) - -def ensure_parent(path): - os.makedirs(os.path.dirname(path), exist_ok=True) - -def process_postcan(s3, bucket, key, local_path): +def process_postcan(s3, client, bucket, key): try: - print("Downloading", key) - s3.download_file(bucket, key, local_path) - p = Path(local_path) + p = Path(key) msmt_id = p.stem - with p.open("r", encoding="utf-8") as f: - data = json.load(f) - assert data['format'] == 'json' - content = data.get('content') - endpoint = f"{FASTPATH_API}/{msmt_id}" - try: - resp = requests.post(endpoint, json=content, timeout=30) - resp.raise_for_status() - except requests.RequestException: - raise - assert resp.status_code == 200 - assert resp.content == b"" + print(f"msmt_id: {p.stem}") + endpoint = f"{FASTPATH_API}/{msmt_id}" + + if args.dry_run: + print(f"DRY RUN: s3://{bucket}/{key} -> {endpoint}") + return key, None + + print(f"SEND: s3://{bucket}/{key} -> {endpoint}") + resp_obj = s3.get_object(Bucket=bucket, Key=key) + body = resp_obj["Body"] + headers = {"Content-Type": "application/octet-stream"} + r = client.post(endpoint, data=body.iter_chunks(chunk_size=16 * 1024), headers=headers, timeout=60) + with r: + r.raise_for_status() # XXX: remove file from s3 if everything went OK return key, None except Exception as e: - try: - if os.path.exists(local_path): - os.remove(local_path) - except Exception as remove_err: - return key, f"remove-failed: {remove_err}; download-failed: {e}" return key, str(e) def main(): @@ -135,21 +126,19 @@ def main(): print("S3_BUCKET_NAME environment variable is required.") return s3 = get_s3_client() - for prefix, subs, objs in walk(s3, BUCKET_NAME, ""): - print(f"PREFIX: {prefix} subdirs={len(subs)} objects={len(objs)}") - with ThreadPoolExecutor(max_workers=50) as _exe: - futures = [] - for key in objs: - local_path = safe_local_path(prefix, key) - ensure_parent(local_path) - futures.append(_exe.submit(process_postcan, s3, BUCKET_NAME, key, local_path)) - - for fut in as_completed(futures): - key, err = fut.result() - if err: - print(f"Failed to process {key}: {err}") - else: - print(f"Submitted {key} to fastpath") + with requests.Session() as client: + for prefix, subs, objs in walk(s3, client, BUCKET_NAME, ""): + print(f"PREFIX: {prefix} subdirs={len(subs)} objects={len(objs)}") + with ThreadPoolExecutor(max_workers=NUM_WORKERS) as _exe: + futures = [] + for key in objs: + futures.append(_exe.submit(process_postcan, s3, client, BUCKET_NAME, key)) + for fut in as_completed(futures): + key, err = fut.result() + if err: + print(f"Failed to process {key}: {err}") + else: + print(f"Submitted {key} to fastpath") if __name__ == "__main__": main() From 2224cc7a87e4aeda6ce0af5e5c138db81d593db5 Mon Sep 17 00:00:00 2001 From: Aaron Gibson Date: Fri, 17 Apr 2026 12:44:27 +0200 Subject: [PATCH 3/3] omit ThreadPoolExecutor --- .../templates/s3_post_fastpath_feeder.py | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/ansible/roles/fastpath/templates/s3_post_fastpath_feeder.py b/ansible/roles/fastpath/templates/s3_post_fastpath_feeder.py index 957aa812..af9833d0 100755 --- a/ansible/roles/fastpath/templates/s3_post_fastpath_feeder.py +++ b/ansible/roles/fastpath/templates/s3_post_fastpath_feeder.py @@ -11,7 +11,6 @@ from pathlib import Path from botocore.exceptions import ClientError, NoCredentialsError, EndpointConnectionError -from concurrent.futures import ThreadPoolExecutor, as_completed # Configuration from environment (set these in your shell) AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID") # required if not using IAM role/profile @@ -23,7 +22,6 @@ BUCKET_NAME = os.getenv("S3_BUCKET_NAME") # required PREFIX = os.getenv("S3_PREFIX", "") FASTPATH_API = os.getenv("FASTPATH_API", "") -NUM_WORKERS = int(os.getenv("NUM_WORKERS", "4")) parser = argparse.ArgumentParser(description="List/process S3 objects") parser.add_argument("--dry-run", action="store_true", help="List objects and print POSTs without downloading or sending them") @@ -71,8 +69,6 @@ def get_s3_client(): "aws_access_key_id": AWS_ACCESS_KEY_ID, "aws_secret_access_key": AWS_SECRET_ACCESS_KEY, }) - if AWS_SESSION_TOKEN: - client_kwargs["aws_session_token"] = AWS_SESSION_TOKEN return boto3.client("s3", **client_kwargs) def walk(s3, client, bucket_name, start_prefix=''): @@ -129,16 +125,12 @@ def main(): with requests.Session() as client: for prefix, subs, objs in walk(s3, client, BUCKET_NAME, ""): print(f"PREFIX: {prefix} subdirs={len(subs)} objects={len(objs)}") - with ThreadPoolExecutor(max_workers=NUM_WORKERS) as _exe: - futures = [] - for key in objs: - futures.append(_exe.submit(process_postcan, s3, client, BUCKET_NAME, key)) - for fut in as_completed(futures): - key, err = fut.result() - if err: - print(f"Failed to process {key}: {err}") - else: - print(f"Submitted {key} to fastpath") + for key in objs: + key, err = process_postcan(s3, client, BUCKET_NAME, key) + if err: + print(f"Failed to process {key}: {err}") + else: + print(f"Submitted {key} to fastpath") if __name__ == "__main__": main()