From b062fb2be241a1b764e6b1ef660469da6564f339 Mon Sep 17 00:00:00 2001 From: Alexander McKim Date: Tue, 21 Apr 2026 12:59:00 -0600 Subject: [PATCH 1/3] Replace future/furrr parallelism with BiocParallel --- R/data_curation.R | 45 +++++++++++------------------- R/data_processing.R | 68 ++++++++++++++------------------------------- 2 files changed, 37 insertions(+), 76 deletions(-) diff --git a/R/data_curation.R b/R/data_curation.R index b1f1af5..3ca874d 100644 --- a/R/data_curation.R +++ b/R/data_curation.R @@ -406,6 +406,7 @@ } # Count + #TODO: New docker containers are spun up twice once for count and data. Thats a lot of overheard, can be fixed count_cmd <- paste0( "docker run --rm ", image, " p3-all-genomes --in ", @@ -795,23 +796,10 @@ retrieveMetadata <- function(user_bacs, genome_batches <- split(genome_ids, ceiling(seq_along(genome_ids) / batch_size)) n_cores <- max(1L, parallel::detectCores(logical = TRUE) - 1L) - cluster <- parallel::makeCluster(n_cores) - on.exit(parallel::stopCluster(cluster), add = TRUE) - - parallel::clusterExport( - cluster, - varlist = c( - ".extractAMRtable", ".extractGenomeData", - "abx_filter", "drug_fields", - "filter_type", "amr_fields", "microtrait_fields", - "base_dir", "image" - ), - envir = environment() - ) - parallel::clusterEvalQ(cluster, { library(tibble); library(dplyr) }) + param <- BiocParallel::SnowParam(workers = n_cores) if (isTRUE(verbose)) message("Retrieving AMR phenotype data in batches.") - batch_drug_data <- parallel::parLapply(cluster, genome_batches, function(batch) { + batch_drug_data <- BiocParallel::bplapply(genome_batches, function(batch) { .extractAMRtable( base_dir = base_dir, batch_genome_IDs = batch, @@ -820,7 +808,7 @@ retrieveMetadata <- function(user_bacs, image = image, verbose = FALSE ) - }) + }, BPPARAM = param) combined_drug_data <- unlist(batch_drug_data, use.names = FALSE) if (length(combined_drug_data) == 0) { message("No drug data returned."); return(NULL) } @@ -835,7 +823,7 @@ 
retrieveMetadata <- function(user_bacs, dplyr::mutate(`genome_drug.genome_id` = as.character(`genome_drug.genome_id`)) if (isTRUE(verbose)) message("Retrieving genome metadata in batches.") - batch_genome_data <- parallel::parLapply(cluster, genome_batches, function(batch) { + batch_genome_data <- BiocParallel::bplapply(genome_batches, function(batch) { .extractGenomeData( base_dir = base_dir, batch_genome_IDs = batch, @@ -845,7 +833,7 @@ retrieveMetadata <- function(user_bacs, image = image, verbose = FALSE ) - }) + }, BPPARAM = param) combined_genome_data <- unlist(batch_genome_data, use.names = FALSE) if (length(combined_genome_data) == 0) { message("No genome data returned."); return(NULL) } @@ -1332,9 +1320,9 @@ retrieveGenomes <- function(base_dir = ".", # FTP method -- preferred if server is available if (identical(method, "ftp")) { if (isTRUE(verbose)) message("Trying FTPS download. Workers=", ftp_workers) - future::plan(future::multisession, workers = max(1, ftp_workers)) - ft_ok <- future.apply::future_lapply(ids, function(gid) .ftp_download_one(gid, genome_path), - future.seed = TRUE) + ftp_param <- BiocParallel::SnowParam(workers = max(1L, ftp_workers)) + ft_ok <- BiocParallel::bplapply(ids, function(gid) .ftp_download_one(gid, genome_path), + BPPARAM = ftp_param) ok_ids <- ids[unlist(ft_ok)] if (isTRUE(verbose)) message("Complete file sets for ", length(ok_ids), " genomes (FTP).") return(c(ok_ids, .list_complete(genome_path, setdiff(ids, ok_ids)))) @@ -1346,22 +1334,22 @@ retrieveGenomes <- function(base_dir = ".", # Parallel chunk containers if (isTRUE(verbose)) message("CLI being run in parallel for ", length(chunks), " data chunks.") - future::plan(future::multisession, workers = max(1, cli_fasta_workers)) - fa_res <- future.apply::future_mapply( + fasta_param <- BiocParallel::SnowParam(workers = max(1L, cli_fasta_workers)) + fa_res <- BiocParallel::bpmapply( FUN = function(vec, tag) .cli_dump_fastas_gto_chunk(image, genome_path, vec, tag), vec = 
chunks, tag = paste0("fa", seq_along(chunks)), - SIMPLIFY = TRUE, future.seed = TRUE + SIMPLIFY = TRUE, BPPARAM = fasta_param ) if (!all(fa_res) && isTRUE(verbose)) warning(sum(!fa_res), " data chunks failed.") # GFF extraction in parallel containers if (isTRUE(verbose)) message("GFF extraction being run in parallel for ", length(chunks), " data chunks.") - future::plan(future::multisession, workers = max(1, cli_gff_workers)) - g_res <- future.apply::future_mapply( + gff_param <- BiocParallel::SnowParam(workers = max(1L, cli_gff_workers)) + g_res <- BiocParallel::bpmapply( FUN = function(vec, tag) .cli_export_gff_chunk(image, genome_path, vec, tag), vec = chunks, tag = paste0("gff", seq_along(chunks)), - SIMPLIFY = TRUE, future.seed = TRUE + SIMPLIFY = TRUE, BPPARAM = gff_param ) if (!all(g_res) && isTRUE(verbose)) warning(sum(!g_res), " GFF chunks had failures.") @@ -1394,7 +1382,7 @@ genomeList <- function(base_dir = ".", bug_dir <- dirname(db_path) genome_path <- file.path(bug_dir, "genomes") - files_all <- list.files(genome_path, full.names = TRUE) + files_all <- sort(list.files(genome_path, full.names = TRUE)) files_all <- files_all[file.info(files_all)$size > 100] # Separate by type @@ -1488,7 +1476,6 @@ prepareGenomes <- function(user_bacs, if (isTRUE(verbose)) message("Step 1: Downloading genomes from BV-BRC") - # Filter + download ids <- retrieveGenomes( base_dir = base_dir, user_bacs = user_bacs, diff --git a/R/data_processing.R b/R/data_processing.R index 78ab404..7e22780 100644 --- a/R/data_processing.R +++ b/R/data_processing.R @@ -121,30 +121,6 @@ } -#' Temporarily set a future plan for parallel execution -#' -#' Sets a `future` plan (sequential or multisession) for the duration of a block -#' and automatically restores the previous plan on exit. -#' -#' @param workers Integer. Number of workers to use; if <= 1, uses sequential mode. -#' @param plan Character. Either `"multisession"` or `"sequential"`. 
-#' -#' @return Invisibly returns `TRUE` after setting the plan. -#' -#' @keywords internal -.with_future_plan <- function(workers, plan = c("multisession", "sequential")) { - plan <- match.arg(plan) - old_plan <- future::plan() - on.exit(future::plan(old_plan), add = TRUE) - - if (is.null(workers) || workers <= 1L || identical(plan, "sequential")) { - future::plan(future::sequential) - } else { - future::plan(future::multisession, workers = workers) - } - invisible(TRUE) -} - #' Run Panaroo for Pangenome Analysis in Parallel Batches #' @@ -189,7 +165,7 @@ } output_path <- normalizePath(output_path) - genome_query_output <- DBI::dbReadTable(con, "files") + genome_query_output <- DBI::dbGetQuery(con, "SELECT * FROM files ORDER BY genome_id") panaroo_input_files <- genome_query_output |> dplyr::pull(panaroo_input) @@ -199,16 +175,15 @@ split_files <- strsplit(panaroo_input_files, " ") - # Plan for filtering - .with_future_plan(workers = threads) - valid_entries <- furrr::future_map(split_files, function(paths) { + param <- BiocParallel::SnowParam(workers = max(1L, threads)) + valid_entries <- BiocParallel::bplapply(split_files, function(paths) { gff_file <- paths[1] if (file.exists(gff_file)) { length(readLines(gff_file, n = 5, warn = FALSE)) >= 5 } else { FALSE } - }) + }, BPPARAM = param) filtered_panaroo_input <- sapply(split_files[unlist(valid_entries)], paste, collapse = " ") @@ -225,20 +200,21 @@ # Ensure sum of per-job CPUs does not exceed `threads` panaroo_threads_per_job <- max(1L, floor(threads / n_jobs)) - # One worker per batch - .with_future_plan(workers = n_jobs) - batch_panaroo_run <- furrr::future_map( + param <- BiocParallel::SnowParam(workers = max(1L, n_jobs)) + batch_panaroo_run <- BiocParallel::bplapply( panaroo_batches, - ~ .processPanaroo( - batch_input = .x, - output_path = output_path, - core_threshold = core_threshold, - len_dif_percent = len_dif_percent, - cluster_threshold = cluster_threshold, - family_seq_identity = family_seq_identity, - 
panaroo_threads_per_job = panaroo_threads_per_job - ), - .options = furrr::furrr_options(seed = TRUE) + function(batch) { + .processPanaroo( + batch_input = batch, + output_path = output_path, + core_threshold = core_threshold, + len_dif_percent = len_dif_percent, + cluster_threshold = cluster_threshold, + family_seq_identity = family_seq_identity, + panaroo_threads_per_job = panaroo_threads_per_job + ) + }, + BPPARAM = param ) invisible(batch_panaroo_run) @@ -500,7 +476,7 @@ con <- DBI::dbConnect(duckdb::duckdb(), duckdb_path) on.exit(try(DBI::dbDisconnect(con, shutdown = FALSE), silent = TRUE), add = TRUE) - genome_query_output <- DBI::dbReadTable(con, "files") + genome_query_output <- DBI::dbGetQuery(con, "SELECT * FROM files ORDER BY genome_id") cdhit_input_files <- genome_query_output |> dplyr::filter(dplyr::if_all(dplyr::everything(), ~ . != "NA")) |> @@ -1153,9 +1129,7 @@ domainFromIPR <- function(duckdb_path, cpu_per_container )) - .with_future_plan(workers = 1) - - results <- future.apply::future_lapply(seq_along(chunks), function(i) { + results <- BiocParallel::bplapply(seq_along(chunks), function(i) { res <- try( .process_chunk( chunk = chunks[[i]], @@ -1175,7 +1149,7 @@ domainFromIPR <- function(duckdb_path, return(NULL) } res - }) + }, BPPARAM = BiocParallel::SerialParam()) # Combine results tsvs <- Filter(function(x) !is.null(x) && file.exists(x), results) From 997c58aa10892c706172947cf1d9b179ac3990a9 Mon Sep 17 00:00:00 2001 From: amcim Date: Tue, 21 Apr 2026 19:04:00 +0000 Subject: [PATCH 2/3] Style code (GHA) --- R/data_curation.R | 425 ++++++++++++++++++++++++++------------------ R/data_processing.R | 373 +++++++++++++++++++++----------------- vignettes/intro.Rmd | 123 +++++++------ 3 files changed, 522 insertions(+), 399 deletions(-) diff --git a/R/data_curation.R b/R/data_curation.R index 3ca874d..cf41093 100644 --- a/R/data_curation.R +++ b/R/data_curation.R @@ -45,8 +45,10 @@ } # Parse - df <- utils::read.table(text = raw_data, sep = "\t", 
header = TRUE, fill = TRUE, - quote = "", check.names = FALSE, comment.char = "") + df <- utils::read.table( + text = raw_data, sep = "\t", header = TRUE, fill = TRUE, + quote = "", check.names = FALSE, comment.char = "" + ) df <- tibble::as_tibble(df) |> dplyr::mutate(dplyr::across(dplyr::everything(), ~ iconv(.x, from = "", to = "UTF-8", sub = ""))) @@ -61,7 +63,7 @@ # Make sure the BV-BRC metadata live where they're supposed to .ensure_bvbrc_cache <- function(base_dir = ".", verbose = TRUE, - cache_rel = file.path("data", "bvbrc", "bvbrcData.duckdb"), + cache_rel = file.path("data", "bvbrc", "bvbrcData.duckdb"), cache_table = "bvbrc_bac_data") { base_dir <- normalizePath(base_dir, mustWork = FALSE) cache_db <- file.path(base_dir, cache_rel) @@ -115,12 +117,12 @@ base_dir <- normalizePath(base_dir, mustWork = FALSE) data_dir <- file.path(base_dir, "data") bvbrc_dir <- file.path(data_dir, "bvbrc") - logs_dir <- file.path(data_dir, "logs") + logs_dir <- file.path(data_dir, "logs") dir.create(bvbrc_dir, recursive = TRUE, showWarnings = FALSE) - dir.create(logs_dir, recursive = TRUE, showWarnings = FALSE) + dir.create(logs_dir, recursive = TRUE, showWarnings = FALSE) - db_path <- file.path(bvbrc_dir, "bvbrcData.duckdb") + db_path <- file.path(bvbrc_dir, "bvbrcData.duckdb") table_name <- "bvbrc_bac_data" meta_table <- "__meta" @@ -128,10 +130,10 @@ DBI::dbExecute( con, - glue::glue('CREATE TABLE IF NOT EXISTS {meta_table} ( + glue::glue("CREATE TABLE IF NOT EXISTS {meta_table} ( table_name TEXT PRIMARY KEY, last_updated TIMESTAMP - )') + )") ) # Tiny update check helper @@ -139,12 +141,14 @@ res <- tryCatch( DBI::dbGetQuery( con, - glue::glue('SELECT last_updated FROM {meta_table} - WHERE table_name = {DBI::dbQuoteString(con, table_name)}') + glue::glue("SELECT last_updated FROM {meta_table} + WHERE table_name = {DBI::dbQuoteString(con, table_name)}") ), error = function(e) NULL ) - if (is.null(res) || nrow(res) == 0L) return(NA) + if (is.null(res) || nrow(res) == 0L) 
{ + return(NA) + } as.POSIXct(res$last_updated[[1]], origin = "1970-01-01", tz = "UTC") } @@ -268,7 +272,6 @@ } - #' Resolve `query value` from `user_bacs` for [.getGenomeIDs()] #' #' If query_value is NULL, derive it from user_bacs based on query_type. @@ -277,7 +280,9 @@ #' If nothing suitable is found, throw a fit and an error. #' @keywords internal .resolveQueryValue <- function(query_type, query_value, user_bacs) { - if (!is.null(query_value) && nzchar(query_value)) return(query_value) + if (!is.null(query_value) && nzchar(query_value)) { + return(query_value) + } if (missing(user_bacs) || length(user_bacs) == 0) { stop("Provide query_value or user_bacs for the selected query_type.") } @@ -359,7 +364,7 @@ stringr::str_replace_all("\\s+", "_") |> stringr::str_replace_all("[^A-Za-z0-9._-]", "") - db_dir <- file.path(data_dir, bug_dirname) + db_dir <- file.path(data_dir, bug_dirname) dir.create(db_dir, recursive = TRUE, showWarnings = FALSE) db_file <- paste0(.generateDBname(user_bacs), ".duckdb") @@ -397,16 +402,15 @@ overwrite = FALSE, image = "danylmb/bvbrc:5.3", verbose = TRUE) { - - query_type <- match.arg(query_type) + query_type <- match.arg(query_type) query_value <- .resolveQueryValue(query_type, query_value, user_bacs) - + if (isTRUE(verbose)) { message("Querying BV-BRC: ", query_type, " == ", query_value) } - + # Count - #TODO: New docker containers are spun up twice once for count and data. Thats a lot of overheard, can be fixed + # TODO: New docker containers are spun up twice once for count and data. 
That's a lot of overhead, can be fixed count_cmd <- paste0( "docker run --rm ", image, " p3-all-genomes --in ", @@ -415,10 +419,10 @@ " --in genome_status,WGS,Complete", " --count" ) - count_lines <- tryCatch(system(count_cmd, intern = TRUE), error = function(e) character()) + count_lines <- tryCatch(system(count_cmd, intern = TRUE), error = function(e) character()) count_result <- suppressWarnings(as.integer(if (length(count_lines) >= 2) count_lines[2] else NA_integer_)) if (isTRUE(verbose) && !is.na(count_result)) message("Count returned: ", count_result) - + # Details data_cmd <- paste0( "docker run --rm ", image, @@ -430,7 +434,7 @@ ) data_raw <- tryCatch(system(data_cmd, intern = TRUE), error = function(e) character()) if (length(data_raw) == 0L) stop("BV-BRC returned no data for: ", query_type, " = ", query_value) - + data_result <- tibble::as_tibble( utils::read.table( text = data_raw, sep = "\t", header = TRUE, fill = TRUE, @@ -444,16 +448,16 @@ `genome.species` = as.character(`genome.species`), `genome.strain` = as.character(`genome.strain`) ) - + # Per-bug DB path - paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs, overwrite = overwrite) + paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs, overwrite = overwrite) db_path <- paths$db_path - + con <- DBI::dbConnect(duckdb::duckdb(), dbdir = db_path) DBI::dbWriteTable(con, "bac_data", data_result, overwrite = TRUE) - + if (isTRUE(verbose)) message("Wrote table 'bac_data' to: ", db_path) - + list(count_result = count_result, duckdbConnection = con, table_name = "bac_data") } @@ -475,28 +479,28 @@ overwrite = FALSE, verbose = TRUE) { base_dir <- normalizePath(base_dir, mustWork = FALSE) - + if (isTRUE(verbose)) message("Resolving input taxa.") bac_input_data <- .retrieveCustomQuery(base_dir = base_dir, user_bacs = user_bacs) - + if (is.null(bac_input_data) || nrow(bac_input_data) == 0) { message("No valid input provided or no matches found.") return(NULL) } - + # Resolve per-bug 
DB path - paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs, overwrite = overwrite) + paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs, overwrite = overwrite) db_path <- paths$db_path - - bac_data <- tibble::tibble() + + bac_data <- tibble::tibble() taxon_ids <- unique(bac_input_data$genome.taxon_id) - + if (isTRUE(verbose)) message("Querying BV-BRC for ", length(taxon_ids), " taxon IDs.") - + for (i in seq_along(taxon_ids)) { tax <- taxon_ids[i] if (isTRUE(verbose)) message("Taxon ", i, "/", length(taxon_ids), ": ", tax) - + res <- .getGenomeIDs( base_dir = base_dir, query_type = "taxon_id", @@ -505,22 +509,22 @@ overwrite = TRUE, verbose = verbose ) - + con <- res$duckdbConnection tbl <- res$table_name each_bac_data <- tibble::as_tibble(DBI::dbReadTable(con, tbl)) bac_data <- dplyr::bind_rows(bac_data, each_bac_data) - + # Close per-iteration connection to avoid open handles piling up try(DBI::dbDisconnect(con, shutdown = TRUE), silent = TRUE) } - + if (nrow(bac_data) > 0) { genome_ids <- bac_data |> dplyr::distinct(`genome.genome_id`) |> dplyr::pull(`genome.genome_id`) genome_ids <- genome_ids[!is.na(genome_ids)] - + if (length(genome_ids) > 0) { if (isTRUE(verbose)) message("Collected ", length(genome_ids), " distinct genome IDs.") return(genome_ids) @@ -579,18 +583,18 @@ verbose = TRUE) { base_dir <- normalizePath(base_dir, mustWork = FALSE) data_dir <- file.path(base_dir, "data") - tmp_dir <- file.path(data_dir, "tmp") + tmp_dir <- file.path(data_dir, "tmp") dir.create(tmp_dir, recursive = TRUE, showWarnings = FALSE) - + if (isTRUE(verbose)) { message("Preparing AMR query input for ", length(batch_genome_IDs), " genomes.") } - + docker_path <- Sys.which("docker") if (!nzchar(docker_path)) { stop("Docker is not available on your PATH but is required.") } - + # Generate genome list with p3-echo echo_args <- c( "run", "--rm", @@ -600,12 +604,12 @@ genome_ids_output <- suppressWarnings( system2("docker", args = echo_args, stdout = 
TRUE, stderr = TRUE) ) - + # Write a temporary file in data/tmp/ tmp_in <- tempfile(tmpdir = tmp_dir, pattern = "genome_drug_ids_", fileext = ".tsv") writeLines(genome_ids_output, con = tmp_in) tmp_in_mounted <- file.path("/data", "tmp", basename(tmp_in)) - + # Allow abx_filter to be a single string with spaces OR a vector of args abx_args <- if (length(abx_filter) == 1L) { @@ -613,7 +617,7 @@ } else { abx_filter } - + # Query drug data drug_args <- c( "run", "--rm", @@ -623,15 +627,15 @@ abx_args, "--attr", drug_fields ) - + if (isTRUE(verbose)) { message("Running AMR query.") } drug_data <- suppressWarnings(system2("docker", args = drug_args, stdout = TRUE, stderr = TRUE)) - + # Clean up after yourself try(unlink(tmp_in), silent = TRUE) - + return(drug_data) } @@ -656,18 +660,18 @@ verbose = TRUE) { base_dir <- normalizePath(base_dir, mustWork = FALSE) data_dir <- file.path(base_dir, "data") - tmp_dir <- file.path(data_dir, "tmp") + tmp_dir <- file.path(data_dir, "tmp") dir.create(tmp_dir, recursive = TRUE, showWarnings = FALSE) - + if (isTRUE(verbose)) { message("Preparing genome metadata input for ", length(batch_genome_IDs), " genomes.") } - + docker_path <- Sys.which("docker") if (!nzchar(docker_path)) { stop("Docker is not available on your PATH but is required.") } - + # Generate genome list with p3-echo echo_args <- c( "run", "--rm", @@ -677,14 +681,14 @@ genome_ids_output <- suppressWarnings( system2("docker", args = echo_args, stdout = TRUE, stderr = TRUE) ) - + tmp_in <- tempfile(tmpdir = tmp_dir, pattern = "genome_ids_", fileext = ".tsv") writeLines(genome_ids_output, con = tmp_in) tmp_in_mounted <- file.path("/data", "tmp", basename(tmp_in)) - + # Choose attributes (AMR for this pipeline) chosen_fields <- if (identical(filter_type, "AMR")) amr_fields else microtrait_fields - + get_args <- c( "run", "--rm", "-v", paste0(data_dir, ":/data"), @@ -692,15 +696,15 @@ "--input", tmp_in_mounted, "--attr", chosen_fields ) - + if (isTRUE(verbose)) { 
message("Running genome metadata query.") } genome_data <- suppressWarnings(system2("docker", args = get_args, stdout = TRUE, stderr = TRUE)) - + # Cleaning up try(unlink(tmp_in), silent = TRUE) - + return(genome_data) } @@ -736,8 +740,10 @@ retrieveMetadata <- function(user_bacs, base_dir <- normalizePath(base_dir, mustWork = FALSE) if (isTRUE(verbose)) message("Resolving genome IDs for user inputs.") - genome_ids <- .retrieveQueryIDs(base_dir = base_dir, user_bacs = user_bacs, - overwrite = overwrite, verbose = verbose) + genome_ids <- .retrieveQueryIDs( + base_dir = base_dir, user_bacs = user_bacs, + overwrite = overwrite, verbose = verbose + ) if (length(genome_ids) == 0) { message("No genome IDs available for the specified inputs.") return(NULL) @@ -754,8 +760,11 @@ retrieveMetadata <- function(user_bacs, "pmid,resistant_phenotype,", "source,taxon_id,testing_standard" ) - abx_filter <- if (identical(abx, "All")) "--required antibiotic" - else paste0("--in antibiotic,", paste(abx, collapse = ",")) + abx_filter <- if (identical(abx, "All")) { + "--required antibiotic" + } else { + paste0("--in antibiotic,", paste(abx, collapse = ",")) + } amr_fields <- paste0( "assembly_accession,assembly_method,", "bioproject_accession,biosample_accession,", @@ -810,7 +819,10 @@ retrieveMetadata <- function(user_bacs, ) }, BPPARAM = param) combined_drug_data <- unlist(batch_drug_data, use.names = FALSE) - if (length(combined_drug_data) == 0) { message("No drug data returned."); return(NULL) } + if (length(combined_drug_data) == 0) { + message("No drug data returned.") + return(NULL) + } combined_drug_data_tbl <- tibble::as_tibble( utils::read.table( @@ -835,7 +847,10 @@ retrieveMetadata <- function(user_bacs, ) }, BPPARAM = param) combined_genome_data <- unlist(batch_genome_data, use.names = FALSE) - if (length(combined_genome_data) == 0) { message("No genome data returned."); return(NULL) } + if (length(combined_genome_data) == 0) { + message("No genome data returned.") + 
return(NULL) + } combined_genome_data_tbl <- tibble::as_tibble( utils::read.table( @@ -848,13 +863,14 @@ retrieveMetadata <- function(user_bacs, dplyr::mutate(`genome.genome_id` = as.character(`genome.genome_id`)) # Per-bug DB path - paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs, overwrite = overwrite) + paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs, overwrite = overwrite) db_path <- paths$db_path logs_dir <- file.path(base_dir, "data", "logs") dir.create(logs_dir, recursive = TRUE, showWarnings = FALSE) cat(sprintf("[%s] Writing metadata DuckDB: %s\n", Sys.time(), db_path), - file = file.path(logs_dir, "bvbrc.log"), append = TRUE) + file = file.path(logs_dir, "bvbrc.log"), append = TRUE + ) con <- DBI::dbConnect(duckdb::duckdb(), dbdir = db_path) DBI::dbWriteTable(con, "amr_phenotype", combined_drug_data_tbl, overwrite = TRUE) @@ -880,10 +896,14 @@ retrieveMetadata <- function(user_bacs, # FASTA sanitizer to ensure Panaroo compatibility with BV-BRC CLI downloads .strip_fasta_preamble <- function(fna_path) { - if (!file.exists(fna_path)) return(invisible(FALSE)) + if (!file.exists(fna_path)) { + return(invisible(FALSE)) + } txt <- readLines(fna_path, warn = FALSE) first <- which(grepl("^\\s*>", txt))[1] - if (is.na(first)) return(invisible(FALSE)) + if (is.na(first)) { + return(invisible(FALSE)) + } if (first > 1L) { txt <- txt[first:length(txt)] txt[1] <- sub("^\\ufeff", "", txt[1]) @@ -895,14 +915,20 @@ retrieveMetadata <- function(user_bacs, # GFF sanitizer to ensure Panaroo compatibility with BV-BRC CLI downloads .sanitize_gff <- function(gff_path) { - if (!file.exists(gff_path)) return(invisible(FALSE)) + if (!file.exists(gff_path)) { + return(invisible(FALSE)) + } lines <- readLines(gff_path, warn = FALSE) - if (length(lines) == 0L) return(invisible(FALSE)) + if (length(lines) == 0L) { + return(invisible(FALSE)) + } if (!grepl("^##gff-version\\s*3", lines[1])) { lines <- c("##gff-version 3", lines) } out <- vapply(lines, 
function(line) { - if (grepl("^#", line)) return(line) + if (grepl("^#", line)) { + return(line) + } parts <- strsplit(line, "[\t ]", perl = TRUE)[[1]] if (length(parts) >= 9) { paste(c(parts[1:8], paste(parts[9:length(parts)], collapse = " ")), collapse = "\t") @@ -931,16 +957,19 @@ retrieveMetadata <- function(user_bacs, verbose = TRUE, fallback_to_bvbrc_cache = TRUE) { base_dir <- normalizePath(base_dir, mustWork = FALSE) - paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs) - db_path <- paths$db_path - + paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs) + db_path <- paths$db_path + con <- DBI::dbConnect(duckdb::duckdb(), dbdir = db_path) - - on.exit({ - # Keep connection open for caller - NULL - }, add = TRUE) - + + on.exit( + { + # Keep connection open for caller + NULL + }, + add = TRUE + ) + # Good metadata present? Apply AMR filters if ("metadata" %in% DBI::dbListTables(con)) { if (isTRUE(verbose)) message("Loading metadata for filtering.") @@ -950,7 +979,7 @@ retrieveMetadata <- function(user_bacs, message("No data available in 'metadata'.") return(NULL) } - + # Normalize evidence labels initial_metadata <- tibble::as_tibble(initial_metadata) |> dplyr::mutate( @@ -961,13 +990,13 @@ retrieveMetadata <- function(user_bacs, TRUE ~ `genome_drug.evidence` ) ) - + # AMR and quality filtering filtered_metadata <- initial_metadata |> dplyr::filter(`genome_drug.evidence` == "Laboratory Method") |> dplyr::filter(`genome.genome_quality` == "Good") |> dplyr::filter(`genome_drug.resistant_phenotype` %in% c("Resistant", "Susceptible", "Intermediate")) - + DBI::dbWriteTable(con, "filtered", filtered_metadata, overwrite = TRUE) if (isTRUE(verbose)) { message("Post-filter rows: ", nrow(filtered_metadata)) @@ -975,42 +1004,42 @@ retrieveMetadata <- function(user_bacs, } return(list(duckdbConnection = con, table_name = "filtered")) } - + # Attempt BV-BRC cache location if (!isTRUE(fallback_to_bvbrc_cache)) { DBI::dbDisconnect(con, shutdown = 
TRUE) stop("No 'metadata' table found in ", db_path, ". Run retrieveMetadata() first.") } - + if (isTRUE(verbose)) { message("No 'metadata' in per-selection DB. Falling back to BV-BRC cache at data/bvbrc/.") } - + cache_db <- file.path(base_dir, "data", "bvbrc", "bvbrcData.duckdb") if (!file.exists(cache_db)) { DBI::dbDisconnect(con, shutdown = TRUE) stop("BV-BRC cache not found at: ", cache_db, ". Run .updateBVBRCdata() first.") } - + con_cache <- DBI::dbConnect(duckdb::duckdb(), dbdir = cache_db) on.exit(try(DBI::dbDisconnect(con_cache, shutdown = TRUE), silent = TRUE), add = TRUE) - + if (!"bvbrc_bac_data" %in% DBI::dbListTables(con_cache)) { DBI::dbDisconnect(con, shutdown = TRUE) stop("Table 'bvbrc_bac_data' not found in BV-BRC cache: ", cache_db) } - + bv <- tibble::as_tibble(DBI::dbReadTable(con_cache, "bvbrc_bac_data")) - - + + # Derive matches from user_bacs (taxon IDs or species) sel <- tibble::tibble(`genome.genome_id` = character()) - + for (v in user_bacs) { if (suppressWarnings(!is.na(as.numeric(v)))) { # numeric taxon_id match matches <- bv[bv$`genome.taxon_id` == v | - bv$`genome.taxon_id` == as.numeric(v), , drop = FALSE] + bv$`genome.taxon_id` == as.numeric(v), , drop = FALSE] } else { # species substring (case-insensitive) matches <- bv[stringr::str_detect( @@ -1018,7 +1047,7 @@ retrieveMetadata <- function(user_bacs, stringr::fixed(v, ignore_case = TRUE) ), , drop = FALSE] } - + if (nrow(matches)) { sel <- dplyr::bind_rows( sel, @@ -1029,22 +1058,22 @@ retrieveMetadata <- function(user_bacs, } } sel <- dplyr::distinct(sel) - + if (nrow(sel) == 0L) { DBI::dbDisconnect(con, shutdown = TRUE) stop("No genomes matched user_bacs in BV-BRC cache.") } - + # Minimal 'filtered' for downstream steps (downloaders & genomeList) minimal_filtered <- sel # Ensure expected column name used downstream: # retrieveGenomes() reads "filtered" and expects `genome.genome_id` to exist. 
DBI::dbWriteTable(con, "filtered", minimal_filtered, overwrite = TRUE) - + if (isTRUE(verbose)) { message("Wrote table 'filtered' to: ", db_path) } - + list(duckdbConnection = con, table_name = "filtered") } @@ -1056,9 +1085,12 @@ retrieveMetadata <- function(user_bacs, # Prefer bash .pick_shell <- function(image) { chk <- suppressWarnings(system2("docker", - c("run", "--rm", image, "sh", "-lc", - "command -v bash >/dev/null || echo NOBASH"), - stdout = TRUE, stderr = TRUE)) + c( + "run", "--rm", image, "sh", "-lc", + "command -v bash >/dev/null || echo NOBASH" + ), + stdout = TRUE, stderr = TRUE + )) if (length(chk) && any(grepl("NOBASH", chk))) "sh" else "bash" } @@ -1105,21 +1137,28 @@ retrieveMetadata <- function(user_bacs, .ftp_download_one <- function(genomeID, out_dir, tries = 3L, min_bytes = 100) { files <- c(".fna", ".PATRIC.faa", ".PATRIC.gff") dests <- file.path(out_dir, paste0(genomeID, files)) - if (.is_complete_set(out_dir, genomeID, min_bytes)) return(TRUE) + if (.is_complete_set(out_dir, genomeID, min_bytes)) { + return(TRUE) + } get_one <- function(url, dest) { if (nzchar(Sys.which("wget"))) { res <- suppressWarnings(system2("wget", - c("-q", "-O", shQuote(dest), shQuote(url)), - stdout = TRUE, stderr = TRUE)) - st <- attr(res, "status"); if (is.null(st)) st <- 0L + c("-q", "-O", shQuote(dest), shQuote(url)), + stdout = TRUE, stderr = TRUE + )) + st <- attr(res, "status") + if (is.null(st)) st <- 0L return(st == 0L && file.exists(dest) && file.info(dest)$size > min_bytes) } else if (nzchar(Sys.which("curl"))) { - curl_args <- if (startsWith(url, "ftps://")) - c("--ssl-reqd", "-L", "-o", shQuote(dest), shQuote(url)) else - c("-L", "-o", shQuote(dest), shQuote(url)) + curl_args <- if (startsWith(url, "ftps://")) { + c("--ssl-reqd", "-L", "-o", shQuote(dest), shQuote(url)) + } else { + c("-L", "-o", shQuote(dest), shQuote(url)) + } res <- suppressWarnings(system2("curl", curl_args, stdout = TRUE, stderr = TRUE)) - st <- attr(res, "status"); if 
(is.null(st)) st <- 0L + st <- attr(res, "status") + if (is.null(st)) st <- 0L return(st == 0L && file.exists(dest) && file.info(dest)$size > min_bytes) } FALSE @@ -1128,13 +1167,27 @@ retrieveMetadata <- function(user_bacs, for (ext_i in seq_along(files)) { dest <- dests[ext_i] if (file.exists(dest) && file.info(dest)$size > min_bytes) next - ftps <- sprintf("ftps://ftp.bv-brc.org/genomes/%s/%s%s", genomeID, genomeID, files[ext_i]) + ftps <- sprintf("ftps://ftp.bv-brc.org/genomes/%s/%s%s", genomeID, genomeID, files[ext_i]) https <- sprintf("https://ftp.bv-brc.org/genomes/%s/%s%s", genomeID, genomeID, files[ext_i]) ok <- FALSE - for (k in 1:tries) { if (get_one(ftps, dest)) { ok <- TRUE; break } } - if (!ok) for (k in 1:2) { if (get_one(https, dest)) { ok <- TRUE; break } } - if (!ok) return(FALSE) + for (k in 1:tries) { + if (get_one(ftps, dest)) { + ok <- TRUE + break + } + } + if (!ok) { + for (k in 1:2) { + if (get_one(https, dest)) { + ok <- TRUE + break + } + } + } + if (!ok) { + return(FALSE) + } } .is_complete_set(out_dir, genomeID, min_bytes) } @@ -1143,7 +1196,7 @@ retrieveMetadata <- function(user_bacs, # p3-dump-genomes to fetch FASTA and .gto files .cli_dump_fastas_gto_chunk <- function(image, out_dir, genome_ids, tag, tries = 3L) { - out_dir <- normalizePath(out_dir, mustWork = FALSE) + out_dir <- normalizePath(out_dir, mustWork = FALSE) dir.create(out_dir, recursive = TRUE, showWarnings = FALSE) ids_file <- file.path(out_dir, paste0("ids_", tag, ".txt")) writeLines(genome_ids, ids_file) @@ -1152,12 +1205,15 @@ retrieveMetadata <- function(user_bacs, mount <- .docker_path(out_dir) # Safety against Windows-specific CRLF lines before `p3-dump-genomes` - sh_cmd <- sprintf('tr -d "\\r" < /out/%s | p3-dump-genomes --outDir /out --fasta --prot --gto -', - basename(ids_file)) + sh_cmd <- sprintf( + 'tr -d "\\r" < /out/%s | p3-dump-genomes --outDir /out --fasta --prot --gto -', + basename(ids_file) + ) - args <- c("run", "--rm", "-v", paste0(mount, ":/out"), 
image, shell, "-lc", shQuote(sh_cmd)) + args <- c("run", "--rm", "-v", paste0(mount, ":/out"), image, shell, "-lc", shQuote(sh_cmd)) res <- suppressWarnings(system2("docker", args = args, stdout = TRUE, stderr = TRUE)) - st <- attr(res, "status"); if (is.null(st)) st <- 0L + st <- attr(res, "status") + if (is.null(st)) st <- 0L if (st != 0L && tries > 1L) { Sys.sleep(1) @@ -1185,57 +1241,57 @@ retrieveMetadata <- function(user_bacs, # Export GFF from GTO per genomes in each chunk .cli_export_gff_chunk <- function(image, out_dir, genome_ids, tag, tries = 3L) { - out_dir <- normalizePath(out_dir, mustWork = FALSE) + out_dir <- normalizePath(out_dir, mustWork = FALSE) ids_file <- file.path(out_dir, paste0("ids_", tag, ".txt")) writeLines(genome_ids, ids_file) - exporter <- "/usr/bin/rast-export-genome" - shell <- .pick_shell(image) - mount <- .docker_path(out_dir) + exporter <- "/usr/bin/rast-export-genome" + shell <- .pick_shell(image) + mount <- .docker_path(out_dir) stderr_file <- file.path(out_dir, paste0("gff_chunk_", tag, ".stderr.txt")) # GFFs from BV-BRC .gto export are not directly compatible in Panaroo # This block reformats the contig IDs and ensures .gff/.fna pairs work together sh_cmd <- paste0( - 'set -euo pipefail; ', - 'fail_n=0; : > /out/', basename(stderr_file), '; ', + "set -euo pipefail; ", + "fail_n=0; : > /out/", basename(stderr_file), "; ", 'while IFS= read -r b || [ -n "$b" ]; do ', ' b=${b%$\'\\r\'}; [ -n "$b" ] || continue; ', ' gto="/out/${b}.gto"; gff="/out/${b}.PATRIC.gff"; map="/out/${b}.orig2id.tsv"; ', - ' if [ ! -s "$gto" ]; then echo "MISSING_GTO $b" >>/out/', basename(stderr_file), '; continue; fi; ', + ' if [ ! 
-s "$gto" ]; then echo "MISSING_GTO $b" >>/out/', basename(stderr_file), "; continue; fi; ", # Export GFF ' [ -s "$gff" ] || ', exporter, ' -i "$gto" -o "$gff" gff ', - ' || { echo "EXPORT_FAIL_GFF $b" >>/out/', basename(stderr_file), '; fail_n=$((fail_n+1)); continue; }; ', + ' || { echo "EXPORT_FAIL_GFF $b" >>/out/', basename(stderr_file), "; fail_n=$((fail_n+1)); continue; }; ", # Build mapping original_id -> id, and default to id if original_id missing - ' if command -v jq >/dev/null 2>&1; then ', + " if command -v jq >/dev/null 2>&1; then ", ' jq -r \'.contigs[] | [(.original_id // .id), .id] | @tsv\' "$gto" > "$map"; ', - ' else ', + " else ", ' python3 - "$gto" > "$map" <<\'PY\'\n', - 'import sys, json\n', - 'g = json.load(open(sys.argv[1]))\n', + "import sys, json\n", + "g = json.load(open(sys.argv[1]))\n", 'for c in g.get("contigs", []):\n', ' o = c.get("original_id") or c.get("id")\n', ' i = c.get("id")\n', - ' if o and i:\n', + " if o and i:\n", ' print(f"{o}\\t{i}")\n', - 'PY\n', - ' fi; ', + "PY\n", + " fi; ", # Relabel GFF sequence IDs: original_id -> id - ' awk \'FNR==NR{m[$1]=$2; next} ', - ' /^##sequence-region/ { if ($2 in m) {$2=m[$2]} print; next } ', - ' /^#/ { print; next } ', + " awk 'FNR==NR{m[$1]=$2; next} ", + " /^##sequence-region/ { if ($2 in m) {$2=m[$2]} print; next } ", + " /^#/ { print; next } ", ' { if ($1 in m) $1=m[$1]; print }\' "$map" "$gff" > "${gff}.tmp" && mv "${gff}.tmp" "$gff"; ', - - 'done < /out/', basename(ids_file), '; ', - 'exit 0' + "done < /out/", basename(ids_file), "; ", + "exit 0" ) args <- c("run", "--rm", "-v", paste0(mount, ":/out"), image, shell, "-lc", shQuote(sh_cmd)) - res <- suppressWarnings(system2("docker", args = args, stdout = TRUE, stderr = TRUE)) - st <- attr(res, "status"); if (is.null(st)) st <- 0L + res <- suppressWarnings(system2("docker", args = args, stdout = TRUE, stderr = TRUE)) + st <- attr(res, "status") + if (is.null(st)) st <- 0L if (st != 0L && tries > 1L) { Sys.sleep(1) @@ -1278,14 
+1334,15 @@ retrieveGenomes <- function(base_dir = ".", cli_gff_workers = 4L, chunk_size = 50L, verbose = TRUE) { - - method <- match.arg(method) + method <- match.arg(method) base_dir <- normalizePath(base_dir, mustWork = FALSE) # IDs from .filterGenomes() if (isTRUE(verbose)) message("Filtering genomes before download.") f_out <- .filterGenomes(base_dir = base_dir, user_bacs = user_bacs, verbose = verbose) - if (is.null(f_out)) return(character()) + if (is.null(f_out)) { + return(character()) + } con <- f_out$duckdbConnection tbl <- f_out$table_name @@ -1294,12 +1351,12 @@ retrieveGenomes <- function(base_dir = ".", dplyr::pull(`genome.genome_id`) # Set up the paths and such - paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs) - bug_dir <- dirname(paths$db_path) + paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs) + bug_dir <- dirname(paths$db_path) genome_path <- file.path(bug_dir, "genomes") - logs_dir <- file.path(base_dir, "data", "logs") + logs_dir <- file.path(base_dir, "data", "logs") dir.create(genome_path, recursive = TRUE, showWarnings = FALSE) - dir.create(logs_dir, recursive = TRUE, showWarnings = FALSE) + dir.create(logs_dir, recursive = TRUE, showWarnings = FALSE) # Adding support for resuming a download if (isTRUE(skip_existing)) { @@ -1310,10 +1367,12 @@ retrieveGenomes <- function(base_dir = ".", if (length(ids) == 0L) { if (isTRUE(verbose)) message("All genomes already complete.") - all_complete <- .list_complete(genome_path, - tibble::as_tibble(DBI::dbReadTable(con, tbl)) |> - dplyr::distinct(`genome.genome_id`) |> - dplyr::pull(`genome.genome_id`)) + all_complete <- .list_complete( + genome_path, + tibble::as_tibble(DBI::dbReadTable(con, tbl)) |> + dplyr::distinct(`genome.genome_id`) |> + dplyr::pull(`genome.genome_id`) + ) return(all_complete) } @@ -1322,18 +1381,23 @@ retrieveGenomes <- function(base_dir = ".", if (isTRUE(verbose)) message("Trying FTPS download. 
Workers=", ftp_workers) ftp_param <- BiocParallel::SnowParam(workers = max(1L, ftp_workers)) ft_ok <- BiocParallel::bplapply(ids, function(gid) .ftp_download_one(gid, genome_path), - BPPARAM = ftp_param) + BPPARAM = ftp_param + ) ok_ids <- ids[unlist(ft_ok)] if (isTRUE(verbose)) message("Complete file sets for ", length(ok_ids), " genomes (FTP).") return(c(ok_ids, .list_complete(genome_path, setdiff(ids, ok_ids)))) } # CLI for FASTA, FAA, and GTO, then GFF from GTO - chunks <- split(ids, ceiling(seq_along(ids)/chunk_size)) + chunks <- split(ids, ceiling(seq_along(ids) / chunk_size)) # Parallel chunk containers - if (isTRUE(verbose)) message("CLI being run in parallel for ", length(chunks), - " data chunks.") + if (isTRUE(verbose)) { + message( + "CLI being run in parallel for ", length(chunks), + " data chunks." + ) + } fasta_param <- BiocParallel::SnowParam(workers = max(1L, cli_fasta_workers)) fa_res <- BiocParallel::bpmapply( FUN = function(vec, tag) .cli_dump_fastas_gto_chunk(image, genome_path, vec, tag), @@ -1343,8 +1407,12 @@ retrieveGenomes <- function(base_dir = ".", if (!all(fa_res) && isTRUE(verbose)) warning(sum(!fa_res), " data chunks failed.") # GFF extraction in parallel containers - if (isTRUE(verbose)) message("GFF extraction being run in parallel for ", - length(chunks), " data chunks.") + if (isTRUE(verbose)) { + message( + "GFF extraction being run in parallel for ", + length(chunks), " data chunks." 
+ ) + } gff_param <- BiocParallel::SnowParam(workers = max(1L, cli_gff_workers)) g_res <- BiocParallel::bpmapply( FUN = function(vec, tag) .cli_export_gff_chunk(image, genome_path, vec, tag), @@ -1355,8 +1423,12 @@ retrieveGenomes <- function(base_dir = ".", # Success set: .fna + .PATRIC.faa + .PATRIC.gff all present per isolate ok_ids <- ids[vapply(ids, .is_complete_set, logical(1), dir = genome_path)] - if (isTRUE(verbose)) message("Complete file sets downloaded for ", - length(ok_ids), " genomes.") + if (isTRUE(verbose)) { + message( + "Complete file sets downloaded for ", + length(ok_ids), " genomes." + ) + } ok_ids } @@ -1375,15 +1447,14 @@ retrieveGenomes <- function(base_dir = ".", genomeList <- function(base_dir = ".", user_bacs, verbose = TRUE) { - base_dir <- normalizePath(base_dir, mustWork = FALSE) - paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs) - db_path <- paths$db_path - bug_dir <- dirname(db_path) + paths <- .buildDBpath(base_dir = base_dir, user_bacs = user_bacs) + db_path <- paths$db_path + bug_dir <- dirname(db_path) genome_path <- file.path(bug_dir, "genomes") - files_all <- sort(list.files(genome_path, full.names = TRUE)) - files_all <- files_all[file.info(files_all)$size > 100] + files_all <- sort(list.files(genome_path, full.names = TRUE)) + files_all <- files_all[file.info(files_all)$size > 100] # Separate by type gff_files <- files_all[grepl("\\.PATRIC\\.gff$", files_all)] @@ -1402,14 +1473,18 @@ genomeList <- function(base_dir = ".", faa_path <- file.path(genome_path, paste0(genomeID, ".PATRIC.faa")) data.frame( - genome_id = genomeID, - gff_path = if (file.exists(gff_path) && file.info(gff_path)$size > 100) gff_path else NA, - fna_path = if (file.exists(fna_path) && file.info(fna_path)$size > 100) fna_path else NA, - faa_path = if (file.exists(faa_path) && file.info(faa_path)$size > 100) faa_path else NA, + genome_id = genomeID, + gff_path = if (file.exists(gff_path) && file.info(gff_path)$size > 100) gff_path else NA, 
+ fna_path = if (file.exists(fna_path) && file.info(fna_path)$size > 100) fna_path else NA, + faa_path = if (file.exists(faa_path) && file.info(faa_path)$size > 100) faa_path else NA, panaroo_input = if ( file.exists(gff_path) && file.exists(fna_path) && file.exists(faa_path) && - file.info(gff_path)$size > 100 && file.info(fna_path)$size > 100 && file.info(faa_path)$size > 100 - ) paste(gff_path, fna_path) else NA, + file.info(gff_path)$size > 100 && file.info(fna_path)$size > 100 && file.info(faa_path)$size > 100 + ) { + paste(gff_path, fna_path) + } else { + NA + }, stringsAsFactors = FALSE ) }) @@ -1468,7 +1543,7 @@ prepareGenomes <- function(user_bacs, method = c("ftp", "cli"), overwrite = FALSE, verbose = TRUE) { - method <- match.arg(method) + method <- match.arg(method) base_dir <- normalizePath(base_dir, mustWork = FALSE) # Ensure the BV-BRC metadata cache exists, fetch if missing diff --git a/R/data_processing.R b/R/data_processing.R index 7e22780..ab636d9 100644 --- a/R/data_processing.R +++ b/R/data_processing.R @@ -81,7 +81,7 @@ # Convert to container-visible paths genome_filepath_cont <- .to_container(genome_filepath_host, host_root = mount_host, container_root = mount_cont) - output_dir_cont <- .to_container(output_dir_host, host_root = mount_host, container_root = mount_cont) + output_dir_cont <- .to_container(output_dir_host, host_root = mount_host, container_root = mount_cont) # Run Panaroo in Docker cmd_args <- c( @@ -99,14 +99,17 @@ "--remove-invalid-genes", "--core_threshold", as.character(core_threshold), "--len_dif_percent", as.character(len_dif_percent), - "--threshold", as.character(cluster_threshold), - "-f", as.character(family_seq_identity), - "-t", as.character(panaroo_threads_per_job) + "--threshold", as.character(cluster_threshold), + "-f", as.character(family_seq_identity), + "-t", as.character(panaroo_threads_per_job) ) - res <- tryCatch({ - system2("docker", args = cmd_args, stdout = TRUE, stderr = TRUE) - }, error = function(e) e) 
+ res <- tryCatch( + { + system2("docker", args = cmd_args, stdout = TRUE, stderr = TRUE) + }, + error = function(e) e + ) if (inherits(res, "error")) { stop(sprintf("Docker/Panaroo failed to launch: %s", res$message)) @@ -121,7 +124,6 @@ } - #' Run Panaroo for Pangenome Analysis in Parallel Batches #' #' Executes Panaroo inside a Docker container on genome annotation @@ -155,7 +157,6 @@ family_seq_identity = 0.5, threads = 8, split_jobs = FALSE) { - duckdb_path <- normalizePath(duckdb_path) con <- DBI::dbConnect(duckdb::duckdb(), duckdb_path) on.exit(try(DBI::dbDisconnect(con, shutdown = FALSE), silent = TRUE), add = TRUE) @@ -188,7 +189,7 @@ filtered_panaroo_input <- sapply(split_files[unlist(valid_entries)], paste, collapse = " ") total_lines <- length(filtered_panaroo_input) - batch_size <- if (isTRUE(split_jobs)) ceiling(total_lines / 5) else total_lines + batch_size <- if (isTRUE(split_jobs)) ceiling(total_lines / 5) else total_lines panaroo_batches <- split(filtered_panaroo_input, ceiling(seq_along(filtered_panaroo_input) / batch_size)) n_jobs <- length(panaroo_batches) @@ -241,8 +242,7 @@ len_dif_percent = 0.95, cluster_threshold = 0.95, family_seq_identity = 0.5, - threads = 8){ - + threads = 8) { input_path <- .docker_path(input_path) # Fail fast if Docker is missing @@ -278,9 +278,9 @@ "--merge_paralogs", "--core_threshold", as.character(core_threshold), "--len_dif_percent", as.character(len_dif_percent), - "--threshold", as.character(cluster_threshold), - "-f", as.character(family_seq_identity), - "-t", as.character(threads) + "--threshold", as.character(cluster_threshold), + "-f", as.character(family_seq_identity), + "-t", as.character(threads) ) system2("docker", args = cmd_args, stdout = TRUE, stderr = TRUE) @@ -290,7 +290,6 @@ } - #' Load Panaroo gene presence/absence table into DuckDB #' #' Reads `gene_presence_absence.csv` and constructs a genome-by-gene count @@ -302,13 +301,13 @@ #' @return A tibble containing the gene count matrix. 
#' #' @keywords internal -.panaroo2geneTable <- function(panaroo_output_path, duckdb_path){ +.panaroo2geneTable <- function(panaroo_output_path, duckdb_path) { filepath <- file.path(normalizePath(panaroo_output_path), "gene_presence_absence.csv") duckdb_path <- normalizePath(duckdb_path) con <- DBI::dbConnect(duckdb::duckdb(), duckdb_path) on.exit(try(DBI::dbDisconnect(con, shutdown = FALSE), silent = TRUE), add = TRUE) - gene_count <- read.table(filepath, sep=",", header=TRUE, fill=TRUE, quote="") |> + gene_count <- read.table(filepath, sep = ",", header = TRUE, fill = TRUE, quote = "") |> tibble::as_tibble() |> dplyr::select(-c(Non.unique.Gene.name, Annotation)) |> tidyr::pivot_longer(cols = -1) |> @@ -332,13 +331,13 @@ #' @return A tibble with `Gene` and `Annotation` columns. #' #' @keywords internal -.panaroo2geneNames <- function(panaroo_output_path, duckdb_path){ +.panaroo2geneNames <- function(panaroo_output_path, duckdb_path) { filepath <- file.path(normalizePath(panaroo_output_path), "gene_presence_absence.csv") duckdb_path <- normalizePath(duckdb_path) con <- DBI::dbConnect(duckdb::duckdb(), duckdb_path) on.exit(try(DBI::dbDisconnect(con, shutdown = FALSE), silent = TRUE), add = TRUE) - gene_names <- read.table(filepath, sep=",", header=TRUE, fill=TRUE, quote="") |> + gene_names <- read.table(filepath, sep = ",", header = TRUE, fill = TRUE, quote = "") |> tibble::as_tibble() |> dplyr::select(c(Gene, Annotation)) @@ -357,15 +356,15 @@ #' @return A tibble containing the struct matrix. 
#' #' @keywords internal -.panaroo2StructTable <- function(panaroo_output_path, duckdb_path){ +.panaroo2StructTable <- function(panaroo_output_path, duckdb_path) { struct_filepath <- file.path(normalizePath(panaroo_output_path), "struct_presence_absence.Rtab") duckdb_path <- normalizePath(duckdb_path) con <- DBI::dbConnect(duckdb::duckdb(), duckdb_path) on.exit(try(DBI::dbDisconnect(con, shutdown = FALSE), silent = TRUE), add = TRUE) - gene_struct <- read.table(struct_filepath, sep="\t", header=TRUE, fill=TRUE, quote="") |> + gene_struct <- read.table(struct_filepath, sep = "\t", header = TRUE, fill = TRUE, quote = "") |> tibble::as_tibble() |> - tidyr::pivot_longer(cols= -1) |> + tidyr::pivot_longer(cols = -1) |> tidyr::pivot_wider(names_from = Gene, values_from = value) |> dplyr::rename("genome_id" = "name") |> dplyr::mutate(genome_id = stringr::str_replace_all(genome_id, c("^X" = "", "\\.PATRIC$" = ""))) @@ -385,7 +384,7 @@ #' @return Invisibly returns TRUE. #' #' @keywords internal -.panaroo2OtherTables <- function(panaroo_output_path, duckdb_path){ +.panaroo2OtherTables <- function(panaroo_output_path, duckdb_path) { panaroo_output_path <- normalizePath(panaroo_output_path) duckdb_path <- normalizePath(duckdb_path) fasta_filepath <- file.path(panaroo_output_path, "pan_genome_reference.fa") @@ -394,15 +393,19 @@ gene_fasta <- Biostrings::readDNAStringSet(filepath = fasta_filepath) DBI::dbWriteTable(con, "gene_ref_seq", - tibble::tibble(name = names(gene_fasta), - sequence = as.character(gene_fasta)), - overwrite = TRUE) + tibble::tibble( + name = names(gene_fasta), + sequence = as.character(gene_fasta) + ), + overwrite = TRUE + ) readr::read_csv(file.path(panaroo_output_path, "gene_presence_absence.csv")) |> dplyr::select(-`Non-unique Gene name`) |> tidyr::pivot_longer(-c("Gene", "Annotation"), - names_to = "genome_ids", - values_to = "protein_ids") |> + names_to = "genome_ids", + values_to = "protein_ids" + ) |> dplyr::mutate(genome_ids = gsub(".PATRIC", "", 
genome_ids)) |> dplyr::select(genome_ids, Gene, protein_ids) |> dplyr::distinct() |> @@ -423,9 +426,9 @@ #' @return Invisibly returns TRUE. #' #' @keywords internal -.panaroo2duckdb <- function(panaroo_output_path, duckdb_path){ +.panaroo2duckdb <- function(panaroo_output_path, duckdb_path) { panaroo_output_path <- normalizePath(panaroo_output_path) - duckdb_path <- normalizePath(duckdb_path) + duckdb_path <- normalizePath(duckdb_path) .panaroo2geneTable(panaroo_output_path, duckdb_path) .panaroo2geneNames(panaroo_output_path, duckdb_path) @@ -460,7 +463,6 @@ threads = 0, memory = 0, extra_args = c("-g", "1")) { - # Fail fast if Docker is missing if (!nzchar(Sys.which("docker"))) { stop("Docker is not available on your PATH but is required to run CD-HIT.") @@ -506,7 +508,7 @@ "weizhongli1987/cdhit:4.8.1", "cd-hit", "-i", .to_container(cdhit_input_faa, mount_host, mount_cont), - "-o", .to_container(clustered_faa, mount_host, mount_cont), + "-o", .to_container(clustered_faa, mount_host, mount_cont), "-c", as.character(identity), "-n", as.character(word_length), "-T", as.character(threads), @@ -516,19 +518,24 @@ ) message("Running cd-hit via Docker...") - output <- tryCatch({ - system2("docker", args = cmd_args, stdout = TRUE, stderr = TRUE) - }, error = function(e) { - stop("cd-hit execution failed: ", e$message) - }) + output <- tryCatch( + { + system2("docker", args = cmd_args, stdout = TRUE, stderr = TRUE) + }, + error = function(e) { + stop("cd-hit execution failed: ", e$message) + } + ) if (!file.exists(clustered_faa)) { stop("cd-hit failed: output file not found. 
Check stderr:\n", paste(output, collapse = "\n")) } # Ensure .clstr exists (used downstream) if (!file.exists(paste0(clustered_faa, ".clstr"))) { - stop("cd-hit did not produce the expected .clstr file at: ", paste0(clustered_faa, ".clstr"), - "\nFull output:\n", paste(output, collapse = "\n")) + stop( + "cd-hit did not produce the expected .clstr file at: ", paste0(clustered_faa, ".clstr"), + "\nFull output:\n", paste(output, collapse = "\n") + ) } message("cd-hit completed successfully.") @@ -648,14 +655,14 @@ runPanaroo2Duckdb <- function(duckdb_path, if (isTRUE(verbose)) message("Launching Panaroo.") .runPanaroo( - duckdb_path = duckdb_path, - output_path = out_dir, - core_threshold = core_threshold, - len_dif_percent = len_dif_percent, + duckdb_path = duckdb_path, + output_path = out_dir, + core_threshold = core_threshold, + len_dif_percent = len_dif_percent, cluster_threshold = cluster_threshold, family_seq_identity = family_seq_identity, - threads = threads, - split_jobs = split_jobs + threads = threads, + split_jobs = split_jobs ) # Identify Panaroo outputs that contain a final_graph.gml file @@ -706,8 +713,10 @@ runPanaroo2Duckdb <- function(duckdb_path, .parseProteinClusters <- function(clustered_faa) { clstr <- paste0(clustered_faa, ".clstr") if (!file.exists(clstr)) { - stop("CD-HIT cluster file not found: ", clstr, - "\nEnsure .runCDHIT() completed successfully and produced the .clstr file.") + stop( + "CD-HIT cluster file not found: ", clstr, + "\nEnsure .runCDHIT() completed successfully and produced the .clstr file." 
+ ) } lines <- data.table::fread(clstr, sep = "\n", header = FALSE)$V1 @@ -716,7 +725,7 @@ runPanaroo2Duckdb <- function(duckdb_path, for (i in seq_along(cluster_ids)) { start <- cluster_ids[i] + 1 - end <- if (i < length(cluster_ids)) cluster_ids[i + 1] - 1 else length(lines) + end <- if (i < length(cluster_ids)) cluster_ids[i + 1] - 1 else length(lines) cluster_lines <- lines[start:end] # This finds the reference cluster ID and names the cluster with it @@ -728,8 +737,10 @@ runPanaroo2Duckdb <- function(duckdb_path, } # Pull genome IDs - genome_matches <- stringr::str_match(cluster_lines, - "fig\\|([0-9]+\\.[0-9]+)\\.peg\\.[0-9]+")[, 2] + genome_matches <- stringr::str_match( + cluster_lines, + "fig\\|([0-9]+\\.[0-9]+)\\.peg\\.[0-9]+" + )[, 2] genome_matches <- genome_matches[!is.na(genome_matches)] if (length(genome_matches) > 0) { @@ -786,9 +797,9 @@ buildMatrices <- function(cluster_map) .buildProtMatrices(cluster_map) names_faa <- names(cdhit_output_faa) |> tibble::as_tibble() |> dplyr::mutate( - proteinID = stringr::str_extract(value, "^fig\\|[0-9]+\\.[0-9]+\\.peg\\.[0-9]+"), - locus_tag = stringr::str_match(value, "peg\\.[0-9]+\\|([^\\s]+)")[, 2], - proteinName= stringr::str_trim(stringr::str_match(value, "\\|[^\\s]+\\s+(.*?)\\s+\\[")[, 2]) + proteinID = stringr::str_extract(value, "^fig\\|[0-9]+\\.[0-9]+\\.peg\\.[0-9]+"), + locus_tag = stringr::str_match(value, "peg\\.[0-9]+\\|([^\\s]+)")[, 2], + proteinName = stringr::str_trim(stringr::str_match(value, "\\|[^\\s]+\\s+(.*?)\\s+\\[")[, 2]) ) |> dplyr::select(-value) @@ -804,47 +815,47 @@ CDHIT2duckdb <- function(duckdb_path, word_length = 5, threads = 0, memory = 0, - extra_args = c("-g", "1")){ - + extra_args = c("-g", "1")) { duckdb_path <- normalizePath(duckdb_path) con <- DBI::dbConnect(duckdb::duckdb(), duckdb_path) on.exit(try(DBI::dbDisconnect(con, shutdown = FALSE), silent = TRUE), add = TRUE) if (missing(output_path) || output_path %in% c(".", "results", "results/")) { - output_path <- 
dirname(duckdb_path) # e.g., ./results/ + output_path <- dirname(duckdb_path) # e.g., ./results/ } output_path <- normalizePath(output_path) cdhit_outputs <- .runCDHIT(duckdb_path, - output_path, - output_prefix = output_prefix, - identity = identity, - word_length = word_length, - threads = threads, - memory = memory, - extra_args = extra_args) - - cluster_map <- .parseProteinClusters(cdhit_outputs$clustered_faa) + output_path, + output_prefix = output_prefix, + identity = identity, + word_length = word_length, + threads = threads, + memory = memory, + extra_args = extra_args + ) + + cluster_map <- .parseProteinClusters(cdhit_outputs$clustered_faa) cluster_count <- .buildProtMatrices(cluster_map) DBI::dbWriteTable(con, "protein_count", cluster_count, overwrite = TRUE) cluster_fasta <- cdhit_outputs$cdhit_input_faa - cluster_name <- .clusterNames(cluster_map, cluster_fasta) + cluster_name <- .clusterNames(cluster_map, cluster_fasta) DBI::dbWriteTable(con, "protein_names", cluster_name, overwrite = TRUE) clustered_faa <- Biostrings::readAAStringSet(cdhit_outputs$clustered_faa) DBI::dbWriteTable(con, "protein_cluster_seq", - tibble::tibble( - name = names(clustered_faa) |> stringr::str_extract("fig\\|[0-9]+\\.[0-9]+\\.peg\\.[0-9]+"), - sequence = as.character(clustered_faa) - ), - overwrite = TRUE) + tibble::tibble( + name = names(clustered_faa) |> stringr::str_extract("fig\\|[0-9]+\\.[0-9]+\\.peg\\.[0-9]+"), + sequence = as.character(clustered_faa) + ), + overwrite = TRUE + ) invisible(TRUE) } - #' Check or install InterProScan data bundle #' #' Ensures that the InterProScan data directory exists locally, downloading @@ -861,12 +872,12 @@ CDHIT2duckdb <- function(duckdb_path, #' #' @keywords internal .checkInterProData <- function( - version = "5.76-107.0", - dest_dir = "inst/extdata/interpro", - docker_image = sprintf("interpro/interproscan:%s", version), - platform = "linux/amd64", - curl_bin = "curl", - verbose = TRUE + version = "5.76-107.0", + dest_dir = 
"inst/extdata/interpro", + docker_image = sprintf("interpro/interproscan:%s", version), + platform = "linux/amd64", + curl_bin = "curl", + verbose = TRUE ) { msg <- function(...) if (verbose) message(sprintf(...)) @@ -883,25 +894,29 @@ CDHIT2duckdb <- function(duckdb_path, } # Download bundle if needed - tar_url <- sprintf("http://ftp.ebi.ac.uk/pub/software/unix/iprscan/5/%s/alt/interproscan-data-%s.tar.gz", - version, version) - md5_url <- paste0(tar_url, ".md5") + tar_url <- sprintf( + "http://ftp.ebi.ac.uk/pub/software/unix/iprscan/5/%s/alt/interproscan-data-%s.tar.gz", + version, version + ) + md5_url <- paste0(tar_url, ".md5") tar_path <- file.path(dest_dir, basename(tar_url)) md5_path <- paste0(tar_path, ".md5") if (!file.exists(tar_path)) { msg("Downloading InterProScan data bundle.") - status_tar <- system2(curl_bin, c("-L", "-o", tar_path, tar_url)) - status_md5 <- system2(curl_bin, c("-L", "-o", md5_path, md5_url)) - if (status_tar != 0 || status_md5 != 0) + status_tar <- system2(curl_bin, c("-L", "-o", tar_path, tar_url)) + status_md5 <- system2(curl_bin, c("-L", "-o", md5_path, md5_url)) + if (status_tar != 0 || status_md5 != 0) { stop("Failed to download InterProScan data bundle.") + } } msg("Verifying MD5 checksum.") md5_expected <- sub("\\s+.*$", "", readLines(md5_path)[1]) - md5_actual <- tools::md5sum(tar_path)[[1]] - if (!identical(tolower(md5_expected), tolower(md5_actual))) + md5_actual <- tools::md5sum(tar_path)[[1]] + if (!identical(tolower(md5_expected), tolower(md5_actual))) { stop("MD5 checksum mismatch for InterProScan data bundle.") + } msg("Extracting InterProScan data bundle.") utils::untar(tar_path, exdir = dest_dir, tar = "internal") @@ -922,9 +937,11 @@ CDHIT2duckdb <- function(duckdb_path, #' #' @keywords internal .getDfIPRColNames <- function() { - c("AccNum", "SeqMD5Digest", "SLength", "Analysis", + c( + "AccNum", "SeqMD5Digest", "SLength", "Analysis", "DB.ID", "SignDesc", "StartLoc", "StopLoc", "Score", - "Status", "RunDate", 
"IPRAcc", "IPRDesc", "placeholder") + "Status", "RunDate", "IPRAcc", "IPRDesc", "placeholder" + ) } #' Internal helpers for reading InterProScan TSV outputs @@ -969,8 +986,9 @@ CDHIT2duckdb <- function(duckdb_path, #' @keywords internal .readIPRscanTsv <- function(filepath) { readr::read_tsv(filepath, - col_types = .getDfIPRColTypes(), - col_names = .getDfIPRColNames()) + col_types = .getDfIPRColTypes(), + col_names = .getDfIPRColNames() + ) } @@ -1002,7 +1020,7 @@ CDHIT2duckdb <- function(duckdb_path, file_format, docker_image = sprintf("interpro/interproscan:%s", "5.76-107.0")) { # Normalize and mount paths - path <- .docker_path(path) + path <- .docker_path(path) bind_data <- .docker_path(ipr_data_path) dir.create(file.path(path, "tmp", "iprscan"), recursive = TRUE, showWarnings = FALSE) @@ -1026,7 +1044,7 @@ CDHIT2duckdb <- function(duckdb_path, "-v", paste0(bind_data, ":/opt/interproscan/data"), "-w", "/work", docker_image, - "--input", .to_container(temp_fasta_file, path, "/work"), + "--input", .to_container(temp_fasta_file, path, "/work"), "--cpu", as.character(threads), "-f", file_format, "--appl", appl_str, @@ -1034,31 +1052,34 @@ CDHIT2duckdb <- function(duckdb_path, ) - status <- tryCatch({ - system2( - "docker", - args = c( - "run", - "--rm", - "--platform", "linux/amd64", # force amd64 for ARM hosts - "-v", paste0(path, ":", "/work"), - "-v", paste0(bind_data, ":/opt/interproscan/data"), - "-w", "/work", - docker_image, - "--input", .to_container(temp_fasta_file, path, "/work"), - "--cpu", as.character(threads), - "-f", file_format, - "--appl", appl_str, - "-b", chunk_out_file_base_cont - ), - stdout = TRUE, - stderr = TRUE - ) - }, error = function(e) { - stop(sprintf("InterProScan execution failed for chunk %d: %s", chunk_id, e$message)) - }) + status <- tryCatch( + { + system2( + "docker", + args = c( + "run", + "--rm", + "--platform", "linux/amd64", # force amd64 for ARM hosts + "-v", paste0(path, ":", "/work"), + "-v", paste0(bind_data, 
":/opt/interproscan/data"), + "-w", "/work", + docker_image, + "--input", .to_container(temp_fasta_file, path, "/work"), + "--cpu", as.character(threads), + "-f", file_format, + "--appl", appl_str, + "-b", chunk_out_file_base_cont + ), + stdout = TRUE, + stderr = TRUE + ) + }, + error = function(e) { + stop(sprintf("InterProScan execution failed for chunk %d: %s", chunk_id, e$message)) + } + ) - out_tsv <- paste0(chunk_out_file_base_host, ".tsv") + out_tsv <- paste0(chunk_out_file_base_host, ".tsv") out_tsvgz <- paste0(chunk_out_file_base_host, ".tsv.gz") if (file.exists(out_tsv)) { @@ -1066,8 +1087,10 @@ CDHIT2duckdb <- function(duckdb_path, } else if (file.exists(out_tsvgz)) { return(out_tsvgz) } else { - stop(sprintf("InterProScan produced no output for chunk %d. Checked: %s and %s.\nLast message:\n%s", - chunk_id, out_tsv, out_tsvgz, paste(status, collapse = "\n"))) + stop(sprintf( + "InterProScan produced no output for chunk %d. Checked: %s and %s.\nLast message:\n%s", + chunk_id, out_tsv, out_tsvgz, paste(status, collapse = "\n") + )) } } @@ -1076,14 +1099,13 @@ domainFromIPR <- function(duckdb_path, path, out_file_base = "iprscan", appl = c("Pfam"), - ipr_version = "5.76-107.0", + ipr_version = "5.76-107.0", ipr_dest_dir = "inst/extdata/interpro", ipr_platform = "linux/amd64", auto_prepare_data = TRUE, threads = 8, file_format = "TSV", docker_repo = "interpro/interproscan") { - duckdb_path <- normalizePath(duckdb_path) if (missing(path) || path %in% c(".", "results", "results/")) { path <- dirname(duckdb_path) @@ -1102,8 +1124,10 @@ domainFromIPR <- function(duckdb_path, verbose = TRUE ) } else { - list(data_dir = file.path(ipr_dest_dir, sprintf("interproscan-%s", ipr_version), "data"), - ready = NA) + list( + data_dir = file.path(ipr_dest_dir, sprintf("interproscan-%s", ipr_version), "data"), + ready = NA + ) } ipr_data_path <- ipr_info$data_dir @@ -1114,11 +1138,12 @@ domainFromIPR <- function(duckdb_path, on.exit(try(DBI::dbDisconnect(con, shutdown = 
FALSE), silent = TRUE), add = TRUE) sequences_df <- dplyr::tbl(con, "protein_cluster_seq") |> tibble::as_tibble() - if (nrow(sequences_df) == 0L) + if (nrow(sequences_df) == 0L) { stop("No sequences found in 'protein_cluster_seq'. Please run CDHIT2duckdb() first.") + } # Chunking for parallel (not currently implemented due to memory limits) - chunks <- list(sequences_df) # Force 1 chunk for RAM limits + chunks <- list(sequences_df) # Force 1 chunk for RAM limits # Forcing 1 container operation for RAM limits workers <- 1 @@ -1153,24 +1178,28 @@ domainFromIPR <- function(duckdb_path, # Combine results tsvs <- Filter(function(x) !is.null(x) && file.exists(x), results) - if (length(tsvs) == 0L) + if (length(tsvs) == 0L) { stop("InterProScan produced no usable outputs. Check Docker logs above.") + } df_iprscan <- do.call(rbind, lapply(tsvs, .readIPRscanTsv)) # Load processed tables (unchanged) DBI::dbWriteTable(con, "domain_names", - df_iprscan |> - dplyr::select(AccNum, DB.ID, SignDesc, IPRAcc, IPRDesc, StartLoc, StopLoc), - overwrite = TRUE) + df_iprscan |> + dplyr::select(AccNum, DB.ID, SignDesc, IPRAcc, IPRDesc, StartLoc, StopLoc), + overwrite = TRUE + ) df_protein_domain_pa <- df_iprscan |> dplyr::select(AccNum, DB.ID, IPRAcc, placeholder) |> dplyr::mutate(domain_ID = stringr::str_glue("{DB.ID}_{IPRAcc}")) |> dplyr::distinct() |> dplyr::mutate(placeholder = stringr::str_replace_all(placeholder, "-", "1")) |> - tidyr::pivot_wider(id_cols = AccNum, names_from = domain_ID, values_from = placeholder, - values_fill = "0") |> + tidyr::pivot_wider( + id_cols = AccNum, names_from = domain_ID, values_from = placeholder, + values_fill = "0" + ) |> dplyr::group_by(AccNum) |> dplyr::summarize(across(everything(), ~ ifelse(any(. 
== "1"), "1", "0")), .groups = "drop") |> dplyr::mutate(across(-AccNum, as.numeric)) @@ -1178,8 +1207,9 @@ domainFromIPR <- function(duckdb_path, protein_filter <- dplyr::tbl(con, "protein_count") |> tibble::as_tibble() accs <- unique(df_protein_domain_pa$AccNum) accs_in_matrix <- intersect(accs, colnames(protein_filter)) - if (length(accs_in_matrix) == 0L) + if (length(accs_in_matrix) == 0L) { stop("No InterPro accessions match protein_count columns.") + } protein_filter <- protein_filter |> dplyr::select(genome_id, dplyr::all_of(accs_in_matrix)) df_protein_domain_pa <- df_protein_domain_pa |> @@ -1197,8 +1227,8 @@ domainFromIPR <- function(duckdb_path, } # Clean BV-BRC metadata, then save as Parquet files -cleanData <- function(duckdb_path, path, ref_file_path = "data_raw/"){ - duckdb_path <- normalizePath(duckdb_path) +cleanData <- function(duckdb_path, path, ref_file_path = "data_raw/") { + duckdb_path <- normalizePath(duckdb_path) # If no explicit path is provided (or a generic one), choose results// when # the DuckDB lives under data//, or else fall back to the DuckDB directory. 
if (missing(path) || path %in% c(".", "results", "results/")) { @@ -1221,20 +1251,22 @@ cleanData <- function(duckdb_path, path, ref_file_path = "data_raw/"){ clean_drug <- readr::read_tsv(file.path(ref_file_path, "clean_drug.tsv")) drug_class <- readr::read_tsv(file.path(ref_file_path, "drug_class.tsv")) - drug_abbr <- readr::read_tsv(file.path(ref_file_path, "drug_abbr.tsv")) + drug_abbr <- readr::read_tsv(file.path(ref_file_path, "drug_abbr.tsv")) class_abbr <- readr::read_tsv(file.path(ref_file_path, "class_abbr.tsv")) clean_countries <- readr::read_tsv(file.path(ref_file_path, "cleaned_bvbrc_countries.tsv")) |> - dplyr::select("raw_entry", "clean_name", "short_name")|> + dplyr::select("raw_entry", "clean_name", "short_name") |> dplyr::distinct() dplyr::tbl(con, "filtered") |> tibble::as_tibble() |> - dplyr::select("genome_drug.genome_id", "genome_drug.antibiotic", - "genome_drug.genome_name", "genome_drug.laboratory_typing_method", - "genome_drug.resistant_phenotype", "genome_drug.taxon_id", - "genome_drug.pmid", "genome.collection_year", - "genome.isolation_country", "genome.host_common_name", - "genome.isolation_source", "genome.species") |> + dplyr::select( + "genome_drug.genome_id", "genome_drug.antibiotic", + "genome_drug.genome_name", "genome_drug.laboratory_typing_method", + "genome_drug.resistant_phenotype", "genome_drug.taxon_id", + "genome_drug.pmid", "genome.collection_year", + "genome.isolation_country", "genome.host_common_name", + "genome.isolation_source", "genome.species" + ) |> dplyr::left_join(clean_drug, by = c("genome_drug.antibiotic" = "original_drug")) |> dplyr::filter(!is.na(cleaned_drug)) |> dplyr::left_join(drug_class, by = c("cleaned_drug" = "drug")) |> @@ -1243,7 +1275,7 @@ cleanData <- function(duckdb_path, path, ref_file_path = "data_raw/"){ DBI::dbWriteTable(conn = con, name = "filtered", overwrite = TRUE) resistance_summary <- dplyr::tbl(con, "filtered") |> - tibble::as_tibble() |> + tibble::as_tibble() |> 
dplyr::filter(genome_drug.resistant_phenotype == "Resistant") |> dplyr::group_by(genome_drug.genome_id) |> dplyr::summarise( @@ -1257,7 +1289,7 @@ cleanData <- function(duckdb_path, path, ref_file_path = "data_raw/"){ dplyr::mutate(genome_drug.antibiotic = cleaned_drug) |> dplyr::select(-cleaned_drug) |> dplyr::left_join(clean_countries, by = c("genome.isolation_country" = "raw_entry")) |> - dplyr::rename("cleaned_country"="clean_name", "country_abbr"="short_name") |> + dplyr::rename("cleaned_country" = "clean_name", "country_abbr" = "short_name") |> dplyr::mutate(genome.isolation_country = cleaned_country) |> dplyr::select(-cleaned_country) |> dplyr::left_join(resistance_summary, by = "genome_drug.genome_id") |> @@ -1265,38 +1297,42 @@ cleanData <- function(duckdb_path, path, ref_file_path = "data_raw/"){ is.na(resistant_classes) ~ genome_drug.resistant_phenotype, TRUE ~ resistant_classes )) |> - dplyr::mutate(num_resistant_classes= dplyr::case_when( + dplyr::mutate(num_resistant_classes = dplyr::case_when( is.na(num_resistant_classes) ~ 0, TRUE ~ num_resistant_classes )) |> dplyr::mutate(genome.collection_year = as.numeric(genome.collection_year)) |> - dplyr::mutate(year_bin = cut(genome.collection_year, breaks = year_breaks, - right = FALSE, include.lowest = TRUE, - labels = paste(year_breaks[-length(year_breaks)], - year_breaks[-1] - 1, sep = "-"))) |> + dplyr::mutate(year_bin = cut(genome.collection_year, + breaks = year_breaks, + right = FALSE, include.lowest = TRUE, + labels = paste(year_breaks[-length(year_breaks)], + year_breaks[-1] - 1, + sep = "-" + ) + )) |> DBI::dbWriteTable(conn = con, name = "cleaned_metadata", overwrite = TRUE) # Parquet output paths - genes_parquet <- file.path(path, "gene_count.parquet") - gene_names_parquet <- file.path(path, "gene_names.parquet") - gene_ref_seq_parquet <- file.path(path, "gene_seqs.parquet") - genome_gene_protein_parquet <- file.path(path, "genome_gene_protein.parquet") - struct_parquet <- file.path(path, 
"struct.parquet") + genes_parquet <- file.path(path, "gene_count.parquet") + gene_names_parquet <- file.path(path, "gene_names.parquet") + gene_ref_seq_parquet <- file.path(path, "gene_seqs.parquet") + genome_gene_protein_parquet <- file.path(path, "genome_gene_protein.parquet") + struct_parquet <- file.path(path, "struct.parquet") - proteins_parquet <- file.path(path, "protein_count.parquet") - domains_parquet <- file.path(path, "domain_count.parquet") + proteins_parquet <- file.path(path, "protein_count.parquet") + domains_parquet <- file.path(path, "domain_count.parquet") - metadata_parquet <- file.path(path, "metadata.parquet") # cleaned_metadata exported as 'metadata' + metadata_parquet <- file.path(path, "metadata.parquet") # cleaned_metadata exported as 'metadata' - domain_names_parquet <- file.path(path, "domain_names.parquet") - protein_names_parquet <- file.path(path, "protein_names.parquet") + domain_names_parquet <- file.path(path, "domain_names.parquet") + protein_names_parquet <- file.path(path, "protein_names.parquet") - protein_cluster_seq_parquet <- file.path(path, "protein_seqs.parquet") + protein_cluster_seq_parquet <- file.path(path, "protein_seqs.parquet") # Also export AMR/genome/original metadata - amr_phenotype_parquet <- file.path(path, "amr_phenotype.parquet") - genome_data_parquet <- file.path(path, "genome_data.parquet") - original_metadata_parquet <- file.path(path, "original_metadata.parquet") + amr_phenotype_parquet <- file.path(path, "amr_phenotype.parquet") + genome_data_parquet <- file.path(path, "genome_data.parquet") + original_metadata_parquet <- file.path(path, "original_metadata.parquet") writeCompressedParquet <- function(df, path) { arrow::write_parquet( @@ -1308,7 +1344,9 @@ cleanData <- function(duckdb_path, path, ref_file_path = "data_raw/"){ ) } - db_name <- duckdb_path |> stringr::str_split_i(".duckdb", i = 1) |> paste0("_parquet.duckdb") + db_name <- duckdb_path |> + stringr::str_split_i(".duckdb", i = 1) |> + 
paste0("_parquet.duckdb") con_new <- DBI::dbConnect(duckdb::duckdb(), db_name) on.exit(try(DBI::dbDisconnect(con_new, shutdown = FALSE), silent = TRUE), add = TRUE) @@ -1373,8 +1411,8 @@ cleanData <- function(duckdb_path, path, ref_file_path = "data_raw/"){ # debug/complete views: amr_phenotype, genome_data, original_metadata DBI::dbReadTable(con, "amr_phenotype") |> writeCompressedParquet(amr_phenotype_parquet) - DBI::dbReadTable(con, "genome_data") |> writeCompressedParquet(genome_data_parquet) - DBI::dbReadTable(con, "metadata") |> writeCompressedParquet(original_metadata_parquet) + DBI::dbReadTable(con, "genome_data") |> writeCompressedParquet(genome_data_parquet) + DBI::dbReadTable(con, "metadata") |> writeCompressedParquet(original_metadata_parquet) DBI::dbExecute(con_new, sprintf("CREATE OR REPLACE VIEW amr_phenotype AS SELECT * FROM read_parquet('%s')", amr_phenotype_parquet)) DBI::dbExecute(con_new, sprintf("CREATE OR REPLACE VIEW genome_data AS SELECT * FROM read_parquet('%s')", genome_data_parquet)) @@ -1539,7 +1577,7 @@ runDataProcessing <- function(duckdb_path, cdhit_identity = 0.9, cdhit_word_length = 5, cdhit_memory = 0, - cdhit_extra_args = c("-g","1"), + cdhit_extra_args = c("-g", "1"), cdhit_output_prefix = "cdhit_out", # InterPro ipr_appl = c("Pfam"), @@ -1625,4 +1663,3 @@ runDataProcessing <- function(duckdb_path, parquet_duckdb_path = normalizePath(parquet_duckdb_path) )) } - diff --git a/vignettes/intro.Rmd b/vignettes/intro.Rmd index 59ccbc5..9d9a36f 100644 --- a/vignettes/intro.Rmd +++ b/vignettes/intro.Rmd @@ -31,13 +31,15 @@ For metadata curation with `amRdata`, use `retrieveMetadata()` to create a DuckD ```{r} # Download all AMR data for a species from BV-BRC -retrieveMetadata(user_bacs = "Shigella flexneri", - filter_type = "AMR", - base_dir = "../data/", - abx = "All", - overwrite = FALSE, - image = "danylmb/bvbrc:5.3", - verbose = FALSE) +retrieveMetadata( + user_bacs = "Shigella flexneri", + filter_type = "AMR", + base_dir = 
"../data/", + abx = "All", + overwrite = FALSE, + image = "danylmb/bvbrc:5.3", + verbose = FALSE +) ``` This wrote tables 'amr_phenotype', 'genome_data', and 'metadata' to a DuckDB @@ -45,16 +47,18 @@ This wrote tables 'amr_phenotype', 'genome_data', and 'metadata' to a DuckDB For downloading genomes with paired AMR phenotype data, use `retrieveGenomes()` ```{r } -retrieveGenomes(base_dir = "../data/", - user_bacs = "Shigella flexneri", - method = c("cli"), - image = "danylmb/bvbrc:5.3", - skip_existing = TRUE, - ftp_workers = 8L, - cli_fasta_workers = 4L, - cli_gff_workers = 4L, - chunk_size = 50L, - verbose = TRUE) +retrieveGenomes( + base_dir = "../data/", + user_bacs = "Shigella flexneri", + method = c("cli"), + image = "danylmb/bvbrc:5.3", + skip_existing = TRUE, + ftp_workers = 8L, + cli_fasta_workers = 4L, + cli_gff_workers = 4L, + chunk_size = 50L, + verbose = TRUE +) ``` This returns character vector of genome IDs and wrote complete file sets on disk. @@ -62,10 +66,11 @@ This returns character vector of genome IDs and wrote complete file sets on disk To write a .txt file listing downloaded genome filepaths (.fna, .faa, .gff) ```{r } -genomeList(base_dir = "../data/", - user_bacs = "Shigella flexneri", - verbose = TRUE) - +genomeList( + base_dir = "../data/", + user_bacs = "Shigella flexneri", + verbose = TRUE +) ``` #### A wrapper for downloading genomes and listing the paths @@ -77,11 +82,13 @@ Internally runs: Allows users to input a species or taxon ID and automate all data downloading and curation steps. ```{r } -prepareGenomes(user_bacs = "Shigella flexneri", - base_dir = "../data/", - method = c("cli"), - overwrite = FALSE, - verbose = TRUE) +prepareGenomes( + user_bacs = "Shigella flexneri", + base_dir = "../data/", + method = c("cli"), + overwrite = FALSE, + verbose = TRUE +) ``` ## Data Processing @@ -95,32 +102,34 @@ Internally runs: 4. 
`cleanData()` -\> Clean metadata and export the feature tables to Parquet + Parquet-backed DuckDB ```{r} -runDataProcessing(duckdb_path = "../data/Shigella_flexneri/Sfl.duckdb", - output_path = NULL, - # unified threads for all tools - threads = 16, - # Panaroo - panaroo_split_jobs = FALSE, - panaroo_core_threshold = 0.90, - panaroo_len_dif_percent = 0.95, - panaroo_cluster_threshold = 0.95, - panaroo_family_seq_identity = 0.5, - # CD-HIT - cdhit_identity = 0.9, - cdhit_word_length = 5, - cdhit_memory = 0, - cdhit_extra_args = c("-g","1"), - cdhit_output_prefix = "cdhit_out", - # InterPro - ipr_appl = c("Pfam"), - ipr_threads_unused = NULL, - ipr_version = "5.76-107.0", - ipr_dest_dir = "inst/extdata/interpro", - ipr_platform = "linux/amd64", - auto_prepare_data = TRUE, - # Metadata cleaning - ref_file_path = "../data_raw/", - verbose = TRUE) +runDataProcessing( + duckdb_path = "../data/Shigella_flexneri/Sfl.duckdb", + output_path = NULL, + # unified threads for all tools + threads = 16, + # Panaroo + panaroo_split_jobs = FALSE, + panaroo_core_threshold = 0.90, + panaroo_len_dif_percent = 0.95, + panaroo_cluster_threshold = 0.95, + panaroo_family_seq_identity = 0.5, + # CD-HIT + cdhit_identity = 0.9, + cdhit_word_length = 5, + cdhit_memory = 0, + cdhit_extra_args = c("-g", "1"), + cdhit_output_prefix = "cdhit_out", + # InterPro + ipr_appl = c("Pfam"), + ipr_threads_unused = NULL, + ipr_version = "5.76-107.0", + ipr_dest_dir = "inst/extdata/interpro", + ipr_platform = "linux/amd64", + auto_prepare_data = TRUE, + # Metadata cleaning + ref_file_path = "../data_raw/", + verbose = TRUE +) ``` ## Plots @@ -128,8 +137,10 @@ runDataProcessing(duckdb_path = "../data/Shigella_flexneri/Sfl.duckdb", Simple stats and plots to explore metadata ```{r } -generateSummary("data/metadata_parquet", - out_path = "data/") -generatePlots("data/metadata_parquet", - out_path = "data/") +generateSummary("data/metadata_parquet", + out_path = "data/" +) +generatePlots("data/metadata_parquet", 
+ out_path = "data/" +) ``` From 0291082c2dcc38eaf13f757837b56711b294578d Mon Sep 17 00:00:00 2001 From: Janani Ravi Date: Fri, 24 Apr 2026 20:47:46 -0600 Subject: [PATCH 3/3] Replace apply variants with purrr; update deps - Replaced apply functions with purrr helpers - BiocParallel SnowParam usage, simplifying list binding and TSV processing. - Update DESCRIPTION: add BiocParallel and Biostrings to Imports and remove furrr, future, and future.apply (also remove Biostrings from Suggests). - Removed the .with_future_plan man page, wrap examples in \dontrun{...} - fixed .gitignore to match *.Rcheck entries --- .gitignore | 2 +- DESCRIPTION | 6 ++---- R/data_curation.R | 19 ++++++++++--------- R/data_processing.R | 13 ++++++------- man/dot-generateDBname.Rd | 2 ++ man/dot-with_future_plan.Rd | 21 --------------------- 6 files changed, 21 insertions(+), 42 deletions(-) delete mode 100644 man/dot-with_future_plan.Rd diff --git a/.gitignore b/.gitignore index b768489..4cd89f4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ .Rproj.user .Rhistory -..Rcheck/ +*.Rcheck/ .Rdata .httr-oauth .DS_Store diff --git a/DESCRIPTION b/DESCRIPTION index 1fcdb5c..8c41962 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -29,13 +29,12 @@ URL: https://github.com/JRaviLab/amR_data BugReports: https://github.com/JRaviLab/amR_data/issues Imports: arrow, + BiocParallel, + Biostrings, DBI, data.table, dplyr, duckdb, - furrr, - future, - future.apply, glue, purrr, readr, @@ -46,7 +45,6 @@ Imports: biocViews: GenomeAssembly, Annotation, Sequencing VignetteBuilder: knitr Suggests: - Biostrings, knitr, rmarkdown, testthat (>= 3.0.0) diff --git a/R/data_curation.R b/R/data_curation.R index cf41093..c12ed6f 100644 --- a/R/data_curation.R +++ b/R/data_curation.R @@ -316,8 +316,10 @@ #' @return A single character string representing the combined shortened name. 
#' #' @examples +#' \dontrun{ #' .generateDBname(c("90371", "Bacillus subtilis")) #' .generateDBname(c("12345", "Escherichia coli", "Lactobacillus")) +#' } #' .generateDBname <- function(user_bacs) { db_parts <- c() @@ -925,7 +927,7 @@ retrieveMetadata <- function(user_bacs, if (!grepl("^##gff-version\\s*3", lines[1])) { lines <- c("##gff-version 3", lines) } - out <- vapply(lines, function(line) { + out <- purrr::map_chr(lines, function(line) { if (grepl("^#", line)) { return(line) } @@ -935,7 +937,7 @@ retrieveMetadata <- function(user_bacs, } else { line } - }, character(1)) + }) writeLines(out, gff_path, sep = "\n", useBytes = TRUE) invisible(TRUE) } @@ -1101,17 +1103,17 @@ retrieveMetadata <- function(user_bacs, gff <- file.path(dir, paste0(genomeID, ".PATRIC.gff")) paths <- c(fna, faa, gff) all(file.exists(paths)) && - all(vapply(paths, function(x) file.info(x)$size, numeric(1)) > min_bytes) + all(purrr::map_dbl(paths, function(x) file.info(x)$size) > min_bytes) } # List genomes already completed .list_complete <- function(dir, genome_ids, min_bytes = 100) { - genome_ids[vapply(genome_ids, .is_complete_set, logical(1), dir = dir, min_bytes = min_bytes)] + genome_ids[purrr::map_lgl(genome_ids, .is_complete_set, dir = dir, min_bytes = min_bytes)] } # Any genomes missing bits or pieces? 
Find em .missing_any <- function(dir, genome_ids, min_bytes = 100) { - genome_ids[!vapply(genome_ids, .is_complete_set, logical(1), dir = dir, min_bytes = min_bytes)] + genome_ids[!purrr::map_lgl(genome_ids, .is_complete_set, dir = dir, min_bytes = min_bytes)] } # Audit function @@ -1422,7 +1424,7 @@ retrieveGenomes <- function(base_dir = ".", if (!all(g_res) && isTRUE(verbose)) warning(sum(!g_res), " GFF chunks had failures.") # Success set: .fna + .PATRIC.faa + .PATRIC.gff all present per isolate - ok_ids <- ids[vapply(ids, .is_complete_set, logical(1), dir = genome_path)] + ok_ids <- ids[purrr::map_lgl(ids, .is_complete_set, dir = genome_path)] if (isTRUE(verbose)) { message( "Complete file sets downloaded for ", @@ -1467,7 +1469,7 @@ genomeList <- function(base_dir = ".", genome_ids <- unique(c(gff_ids, fna_ids, faa_ids)) - list_of_files <- lapply(genome_ids, function(genomeID) { + list_of_files <- purrr::map(genome_ids, function(genomeID) { gff_path <- file.path(genome_path, paste0(genomeID, ".PATRIC.gff")) fna_path <- file.path(genome_path, paste0(genomeID, ".fna")) faa_path <- file.path(genome_path, paste0(genomeID, ".PATRIC.faa")) @@ -1487,8 +1489,7 @@ genomeList <- function(base_dir = ".", }, stringsAsFactors = FALSE ) - }) - list_of_files <- do.call(rbind, list_of_files) + }) |> purrr::list_rbind() list_of_files <- tibble::as_tibble(list_of_files) |> dplyr::filter(!is.na(panaroo_input)) diff --git a/R/data_processing.R b/R/data_processing.R index ab636d9..aeb35a0 100644 --- a/R/data_processing.R +++ b/R/data_processing.R @@ -64,11 +64,11 @@ # Write the genome list file (convert each "gff fna" to container-visible paths) genome_filepath_host <- tempfile(pattern = "genomeFilepath_", fileext = ".txt", tmpdir = output_path) - batch_input_cont <- vapply(unlist(batch_input), function(line) { + batch_input_cont <- purrr::map_chr(unlist(batch_input), function(line) { parts <- strsplit(line, " +")[[1]] parts_cont <- .to_container(parts, host_root = mount_host, 
container_root = mount_cont) paste(parts_cont, collapse = " ") - }, character(1), USE.NAMES = FALSE) + }) # Write with Unix line endings to avoid issues inside Linux container con <- file(genome_filepath_host, open = "wb") @@ -176,17 +176,16 @@ split_files <- strsplit(panaroo_input_files, " ") - param <- BiocParallel::SnowParam(workers = max(1L, threads)) - valid_entries <- BiocParallel::bplapply(split_files, function(paths) { + valid_entries <- purrr::map_lgl(split_files, function(paths) { gff_file <- paths[1] if (file.exists(gff_file)) { length(readLines(gff_file, n = 5, warn = FALSE)) >= 5 } else { FALSE } - }, BPPARAM = param) + }) - filtered_panaroo_input <- sapply(split_files[unlist(valid_entries)], paste, collapse = " ") + filtered_panaroo_input <- purrr::map_chr(split_files[valid_entries], paste, collapse = " ") total_lines <- length(filtered_panaroo_input) batch_size <- if (isTRUE(split_jobs)) ceiling(total_lines / 5) else total_lines @@ -1182,7 +1181,7 @@ domainFromIPR <- function(duckdb_path, stop("InterProScan produced no usable outputs. Check Docker logs above.") } - df_iprscan <- do.call(rbind, lapply(tsvs, .readIPRscanTsv)) + df_iprscan <- purrr::map(tsvs, .readIPRscanTsv) |> purrr::list_rbind() # Load processed tables (unchanged) DBI::dbWriteTable(con, "domain_names", diff --git a/man/dot-generateDBname.Rd b/man/dot-generateDBname.Rd index 48b80f8..bebcf43 100644 --- a/man/dot-generateDBname.Rd +++ b/man/dot-generateDBname.Rd @@ -22,7 +22,9 @@ the first two letters of the species. For single-word names, it appends "sp". For numeric taxon IDs, it prefixes them with "tid_". 
} \examples{ +\dontrun{ .generateDBname(c("90371", "Bacillus subtilis")) .generateDBname(c("12345", "Escherichia coli", "Lactobacillus")) +} } diff --git a/man/dot-with_future_plan.Rd b/man/dot-with_future_plan.Rd deleted file mode 100644 index aeece96..0000000 --- a/man/dot-with_future_plan.Rd +++ /dev/null @@ -1,21 +0,0 @@ -% Generated by roxygen2: do not edit by hand -% Please edit documentation in R/data_processing.R -\name{.with_future_plan} -\alias{.with_future_plan} -\title{Temporarily set a future plan for parallel execution} -\usage{ -.with_future_plan(workers, plan = c("multisession", "sequential")) -} -\arguments{ -\item{workers}{Integer. Number of workers to use; if <= 1, uses sequential mode.} - -\item{plan}{Character. Either \code{"multisession"} or \code{"sequential"}.} -} -\value{ -Invisibly returns \code{TRUE} after setting the plan. -} -\description{ -Sets a \code{future} plan (sequential or multisession) for the duration of a block -and automatically restores the previous plan on exit. -} -\keyword{internal}