#' Initialize an incremental writer for /results datasets
#'
#' Creates (or replaces) the group `results/<write_results_name>` in
#' `write_results_file` and pre-allocates a chunked, gzip-compressed
#' `results_matrix` dataset of `n_rows` x `length(column_names)`.
#' Returns a small state list consumed by `.results_stream_write_block()`
#' and `.finalize_results_stream_writer()`, or NULL when streaming is
#' disabled (`write_results_name` is NULL).
#'
#' @param write_results_name Analysis name under `/results`, or NULL to disable.
#' @param write_results_file Path to the HDF5 file to write into.
#' @param n_rows Total number of rows (elements) that will be written.
#' @param column_names Character vector of output column names.
#' @param flush_every Rows per write block; also used as the HDF5 chunk height.
#' @param storage_mode HDF5 storage mode for the matrix (e.g. `"double"`).
#' @param compression_level Gzip compression level, 0-9.
#' @return A writer state list (`file`, `dataset_path`, `names_path`,
#'   `column_names`, `n_cols`, `n_rows`, `write_row_cursor`), or NULL.
#' @noRd
.init_results_stream_writer <- function(write_results_name,
                                        write_results_file,
                                        n_rows,
                                        column_names,
                                        flush_every = 1000L,
                                        storage_mode = "double",
                                        compression_level = 4L) {
  if (is.null(write_results_name)) {
    # Streaming disabled: callers treat a NULL writer as a no-op.
    return(NULL)
  }
  if (!is.character(write_results_name) || length(write_results_name) != 1L ||
      is.na(write_results_name) || write_results_name == "") {
    stop("write_results_name must be a non-empty character string when provided")
  }
  if (is.null(write_results_file) || !is.character(write_results_file) ||
      length(write_results_file) != 1L) {
    stop("write_results_file must be a single character path when write_results_name is provided")
  }
  if (!is.numeric(flush_every) || length(flush_every) != 1L ||
      is.na(flush_every) || flush_every <= 0) {
    stop("write_results_flush_every must be a positive integer")
  }
  flush_every <- as.integer(flush_every)
  if (!is.numeric(n_rows) || length(n_rows) != 1L || is.na(n_rows) || n_rows < 1) {
    stop("n_rows must be a positive integer")
  }
  n_rows <- as.integer(n_rows)
  if (!is.numeric(compression_level) || length(compression_level) != 1L ||
      is.na(compression_level) || compression_level < 0 || compression_level > 9) {
    stop("write_results_compression_level must be an integer between 0 and 9")
  }

  if (!file.exists(write_results_file)) {
    rhdf5::h5createFile(write_results_file)
  }

  # Group bookkeeping via hdf5r (consistent with the rest of the package);
  # close the handle before rhdf5 touches the file so the two libraries
  # never hold it open at the same time.
  h5_write <- hdf5r::H5File$new(write_results_file, mode = "a")
  if (!h5_write$exists("results")) {
    h5_write$create_group("results")
  }
  results_grp <- h5_write$open("results")
  if (results_grp$exists(write_results_name)) {
    # Replace any previous run that used the same analysis name.
    results_grp$link_delete(write_results_name)
  }
  results_grp$create_group(write_results_name)
  h5_write$close_all()

  dataset_path <- paste0("results/", write_results_name, "/results_matrix")
  # Chunk no taller than the dataset itself, and never 0 rows
  # (h5createDataset rejects a zero-sized chunk dimension).
  chunk_rows <- max(1L, min(flush_every, n_rows))

  rhdf5::h5createDataset(
    file = write_results_file,
    dataset = dataset_path,
    dims = c(n_rows, length(column_names)),
    storage.mode = storage_mode,
    chunk = c(chunk_rows, length(column_names)),
    level = as.integer(compression_level)
  )

  list(
    file = write_results_file,
    dataset_path = dataset_path,
    names_path = paste0("results/", write_results_name, "/column_names"),
    column_names = as.character(column_names),
    n_cols = length(column_names),
    n_rows = n_rows,
    write_row_cursor = 1L
  )
}


#' Append one block of rows to an incremental /results writer
#'
#' Writes `block_df` (rows for consecutive elements) into the pre-allocated
#' `results_matrix` at the writer's current row cursor, then advances the
#' cursor. A NULL writer is a no-op, so callers can invoke this
#' unconditionally.
#'
#' @param writer Writer state list from `.init_results_stream_writer()`, or NULL.
#' @param block_df Data frame / matrix-like block with `writer$n_cols` columns.
#' @return The (possibly updated) writer, to be reassigned by the caller.
#' @noRd
.results_stream_write_block <- function(writer, block_df) {
  if (is.null(writer)) {
    return(writer)
  }
  block <- as.matrix(block_df)
  if (nrow(block) == 0L) {
    # Empty block: nothing to write, cursor unchanged.
    return(writer)
  }
  if (ncol(block) != writer$n_cols) {
    stop(
      "results block has ", ncol(block),
      " columns but the writer expects ", writer$n_cols
    )
  }
  end_row <- writer$write_row_cursor + nrow(block) - 1L
  # Guard against overrunning the pre-allocated dataset; an unchecked
  # overrun would surface as an opaque low-level HDF5 error instead.
  if (!is.null(writer$n_rows) && end_row > writer$n_rows) {
    stop(
      "attempt to write past the pre-allocated results_matrix (",
      writer$n_rows, " rows)"
    )
  }
  row_idx <- writer$write_row_cursor:end_row
  rhdf5::h5write(
    obj = block,
    file = writer$file,
    name = writer$dataset_path,
    index = list(row_idx, seq_len(writer$n_cols))
  )
  writer$write_row_cursor <- end_row + 1L
  writer
}


#' Finalize an incremental /results writer
#'
#' Writes the `column_names` dataset alongside `results_matrix` and closes
#' any HDF5 handles rhdf5 still holds. A NULL writer is a no-op.
#'
#' @param writer Writer state list from `.init_results_stream_writer()`, or NULL.
#' @return `invisible(NULL)`.
#' @noRd
.finalize_results_stream_writer <- function(writer) {
  if (is.null(writer)) {
    return(invisible(NULL))
  }
  rhdf5::h5write(
    obj = writer$column_names,
    file = writer$file,
    name = writer$names_path
  )
  # Release every handle rhdf5 may still have open on this file.
  rhdf5::h5closeAll()
  invisible(NULL)
}
Additional arguments for `stats::lm()` #' @return Tibble with the summarized model statistics for all elements requested #' @importFrom dplyr %>% @@ -88,7 +96,14 @@ ModelArray.lm <- function(formula, data, phenotypes, scalar, element.subset = NU correct.p.value.terms = c("fdr"), correct.p.value.model = c("fdr"), num.subj.lthr.abs = 10, num.subj.lthr.rel = 0.2, verbose = TRUE, pbar = TRUE, n_cores = 1, - on_error = "stop", ...) { + on_error = "stop", + write_results_name = NULL, + write_results_file = NULL, + write_results_flush_every = 1000L, + write_results_storage_mode = "double", + write_results_compression_level = 4L, + return_output = TRUE, + ...) { .validate_modelarray_input(data) element.subset <- .validate_element_subset(element.subset, data, scalar) phenotypes <- .align_phenotypes(data, phenotypes, scalar) @@ -222,25 +237,74 @@ ModelArray.lm <- function(formula, data, phenotypes, scalar, element.subset = NU message(glue::glue("looping across elements....")) } - fits <- .parallel_dispatch(element.subset, analyseOneElement.lm, n_cores, pbar, - formula = formula, modelarray = data, phenotypes = phenotypes, scalar = scalar, - var.terms = var.terms, var.model = var.model, - num.subj.lthr = num.subj.lthr, num.stat.output = num.stat.output, - flag_initiate = FALSE, on_error = on_error, - ... 
+ need_term_correction <- (!all(correct.p.value.terms == "none")) && ("p.value" %in% var.terms) + need_model_correction <- (!all(correct.p.value.model == "none")) && ("p.value" %in% var.model) + need_full_df <- return_output || need_term_correction || need_model_correction + if (!return_output && (need_term_correction || need_model_correction)) { + stop("return_output=FALSE is not supported when p-value correction is requested") + } + + writer <- .init_results_stream_writer( + write_results_name = write_results_name, + write_results_file = write_results_file, + n_rows = length(element.subset), + column_names = column_names, + flush_every = write_results_flush_every, + storage_mode = write_results_storage_mode, + compression_level = write_results_compression_level ) + fits_all <- if (need_full_df) vector("list", length(element.subset)) else NULL + chunk_size <- if (is.null(writer)) length(element.subset) else as.integer(write_results_flush_every) + chunk_starts <- seq(1L, length(element.subset), by = chunk_size) + for (chunk_start in chunk_starts) { + chunk_end <- min(chunk_start + chunk_size - 1L, length(element.subset)) + chunk_elements <- element.subset[chunk_start:chunk_end] + + chunk_fits <- .parallel_dispatch(chunk_elements, analyseOneElement.lm, n_cores, pbar, + formula = formula, modelarray = data, phenotypes = phenotypes, scalar = scalar, + var.terms = var.terms, var.model = var.model, + num.subj.lthr = num.subj.lthr, num.stat.output = num.stat.output, + flag_initiate = FALSE, on_error = on_error, + ... 
+ ) + + if (need_full_df) { + fits_all[chunk_start:chunk_end] <- chunk_fits + } + if (!is.null(writer)) { + chunk_df <- as.data.frame(do.call(rbind, chunk_fits)) + colnames(chunk_df) <- column_names + writer <- .results_stream_write_block(writer, chunk_df) + } + } + .finalize_results_stream_writer(writer) - df_out <- do.call(rbind, fits) - df_out <- as.data.frame(df_out) # turn into data.frame - colnames(df_out) <- column_names # add column names + if (!need_full_df) { + return(invisible(NULL)) + } + df_out <- do.call(rbind, fits_all) + df_out <- as.data.frame(df_out) + colnames(df_out) <- column_names - # Add corrections of p.values df_out <- .correct_pvalues(df_out, list.terms, correct.p.value.terms, var.terms) df_out <- .correct_pvalues(df_out, "model", correct.p.value.model, var.model) - df_out # return + # Streamed blocks were uncorrected; rewrite final corrected results if needed. + if (!is.null(writer) && (need_term_correction || need_model_correction)) { + writeResults( + fn.output = write_results_file, + df.output = df_out, + analysis_name = write_results_name, + overwrite = TRUE + ) + } + + if (return_output) { + return(df_out) + } + invisible(NULL) } #' Run GAM for element-wise data @@ -383,6 +447,14 @@ ModelArray.lm <- function(formula, data, phenotypes, scalar, element.subset = NU #' @param on_error Character: one of "stop", "skip", or "debug". When an error occurs #' while fitting an element, choose whether to stop, skip returning all-NaN values for #' that element, or drop into `browser()` (if interactive) then skip. Default: "stop". +#' @param write_results_name Optional analysis name for incremental writes to +#' `results//results_matrix`. +#' @param write_results_file Optional HDF5 file path used when `write_results_name` is provided. +#' @param write_results_flush_every Positive integer number of elements per write block. +#' @param write_results_storage_mode Storage mode for results writes (e.g., `"double"`). 
+#' @param write_results_compression_level Gzip compression level (0-9) for results writes. +#' @param return_output If TRUE (default), return the combined data.frame. If FALSE, +#' returns `invisible(NULL)`; useful for streaming large runs to HDF5. #' @param ... Additional arguments for `mgcv::gam()` #' @return Tibble with the summarized model statistics for all elements requested #' @importFrom dplyr %>% mutate @@ -403,7 +475,14 @@ ModelArray.gam <- function(formula, data, phenotypes, scalar, element.subset = N correct.p.value.parametricTerms = c("fdr"), num.subj.lthr.abs = 10, num.subj.lthr.rel = 0.2, verbose = TRUE, pbar = TRUE, n_cores = 1, - on_error = "stop", ...) { + on_error = "stop", + write_results_name = NULL, + write_results_file = NULL, + write_results_flush_every = 1000L, + write_results_storage_mode = "double", + write_results_compression_level = 4L, + return_output = TRUE, + ...) { .validate_modelarray_input(data) element.subset <- .validate_element_subset(element.subset, data, scalar) @@ -570,19 +649,60 @@ ModelArray.gam <- function(formula, data, phenotypes, scalar, element.subset = N message(glue::glue("looping across elements....")) } - fits <- .parallel_dispatch(element.subset, analyseOneElement.gam, n_cores, pbar, - formula = formula, modelarray = data, phenotypes = phenotypes, scalar = scalar, - var.smoothTerms = var.smoothTerms, var.parametricTerms = var.parametricTerms, - var.model = var.model, - num.subj.lthr = num.subj.lthr, num.stat.output = num.stat.output, - flag_initiate = FALSE, flag_sse = flag_sse, on_error = on_error, - ... 
+ need_smooth_correction <- (!all(correct.p.value.smoothTerms == "none")) && + ("p.value" %in% var.smoothTerms) + need_param_correction <- (!all(correct.p.value.parametricTerms == "none")) && + ("p.value" %in% var.parametricTerms) + need_changed_rsq <- !is.null(changed.rsq.term.index) + need_full_df <- return_output || need_smooth_correction || need_param_correction || need_changed_rsq + if (!return_output && (need_smooth_correction || need_param_correction || need_changed_rsq)) { + stop("return_output=FALSE is not supported when p-value correction or changed.rsq is requested") + } + + writer <- .init_results_stream_writer( + write_results_name = write_results_name, + write_results_file = write_results_file, + n_rows = length(element.subset), + column_names = column_names, + flush_every = write_results_flush_every, + storage_mode = write_results_storage_mode, + compression_level = write_results_compression_level ) + fits_all <- if (need_full_df) vector("list", length(element.subset)) else NULL + chunk_size <- if (is.null(writer)) length(element.subset) else as.integer(write_results_flush_every) + chunk_starts <- seq(1L, length(element.subset), by = chunk_size) + for (chunk_start in chunk_starts) { + chunk_end <- min(chunk_start + chunk_size - 1L, length(element.subset)) + chunk_elements <- element.subset[chunk_start:chunk_end] + + chunk_fits <- .parallel_dispatch(chunk_elements, analyseOneElement.gam, n_cores, pbar, + formula = formula, modelarray = data, phenotypes = phenotypes, scalar = scalar, + var.smoothTerms = var.smoothTerms, var.parametricTerms = var.parametricTerms, + var.model = var.model, + num.subj.lthr = num.subj.lthr, num.stat.output = num.stat.output, + flag_initiate = FALSE, flag_sse = flag_sse, on_error = on_error, + ... 
+ ) + + if (need_full_df) { + fits_all[chunk_start:chunk_end] <- chunk_fits + } + if (!is.null(writer)) { + chunk_df <- as.data.frame(do.call(rbind, chunk_fits)) + colnames(chunk_df) <- column_names + writer <- .results_stream_write_block(writer, chunk_df) + } + } + .finalize_results_stream_writer(writer) + + if (!need_full_df) { + return(invisible(NULL)) + } - df_out <- do.call(rbind, fits) - df_out <- as.data.frame(df_out) # turn into data.frame - colnames(df_out) <- column_names # add column names + df_out <- do.call(rbind, fits_all) + df_out <- as.data.frame(df_out) + colnames(df_out) <- column_names ### get the changed.rsq for smooth terms: @@ -719,8 +839,20 @@ ModelArray.gam <- function(formula, data, phenotypes, scalar, element.subset = N df_out, list.parametricTerms, correct.p.value.parametricTerms, var.parametricTerms ) + if (!is.null(writer) && (need_smooth_correction || need_param_correction || need_changed_rsq)) { + writeResults( + fn.output = write_results_file, + df.output = df_out, + analysis_name = write_results_name, + overwrite = TRUE + ) + } + ### return - df_out + if (return_output) { + return(df_out) + } + invisible(NULL) } @@ -756,6 +888,26 @@ ModelArray.gam <- function(formula, data, phenotypes, scalar, element.subset = N #' @param on_error Character: one of "stop", "skip", or "debug". When an error occurs in #' the user function for an element, choose whether to stop, skip returning all-NaN values #' for that element, or drop into `browser()` (if interactive) then skip. Default: "stop". +#' @param write_scalar_name Optional character scalar name. If provided, `ModelArray.wrap` +#' writes selected output columns into `scalars//values` while processing. +#' @param write_scalar_file Optional character HDF5 output filename used when +#' `write_scalar_name` is provided. +#' @param write_scalar_columns Optional character/integer selector for output columns to save +#' as scalar values. If NULL, uses all wrap output columns except `element_id`. 
+#' @param write_scalar_column_names Optional character vector of source names saved as +#' scalar `column_names`. If NULL, uses `phenotypes$source_file`. +#' @param write_scalar_flush_every Positive integer number of elements per write block. +#' @param write_scalar_storage_mode Storage mode for scalar writes (e.g., `"double"`). +#' @param write_scalar_compression_level Gzip compression level (0-9) for scalar writes. +#' @param write_results_name Optional analysis name for incremental writes to +#' `results//results_matrix`. +#' @param write_results_file Optional HDF5 file path used when `write_results_name` is provided. +#' @param write_results_flush_every Positive integer number of elements per write block for +#' results writes. +#' @param write_results_storage_mode Storage mode for results writes (e.g., `"double"`). +#' @param write_results_compression_level Gzip compression level (0-9) for results writes. +#' @param return_output If TRUE (default), return the combined data.frame. If FALSE, +#' returns `invisible(NULL)`; useful when writing large outputs directly to HDF5. #' @param ... Additional arguments forwarded to `FUN` #' @return Tibble/data.frame with one row per element and first column `element_id` #' @importFrom dplyr %>% @@ -766,7 +918,21 @@ ModelArray.gam <- function(formula, data, phenotypes, scalar, element.subset = N ModelArray.wrap <- function(FUN, data, phenotypes, scalar, element.subset = NULL, num.subj.lthr.abs = 10, num.subj.lthr.rel = 0.2, verbose = TRUE, pbar = TRUE, n_cores = 1, - on_error = "stop", ...) 
{ + on_error = "stop", + write_scalar_name = NULL, + write_scalar_file = NULL, + write_scalar_columns = NULL, + write_scalar_column_names = NULL, + write_scalar_flush_every = 1000L, + write_scalar_storage_mode = "double", + write_scalar_compression_level = 4L, + write_results_name = NULL, + write_results_file = NULL, + write_results_flush_every = 1000L, + write_results_storage_mode = "double", + write_results_compression_level = 4L, + return_output = TRUE, + ...) { .validate_modelarray_input(data) element.subset <- .validate_element_subset(element.subset, data, scalar) phenotypes <- .align_phenotypes(data, phenotypes, scalar) @@ -791,16 +957,163 @@ ModelArray.wrap <- function(FUN, data, phenotypes, scalar, element.subset = NULL if (verbose) message(glue::glue("looping across elements....")) - fits <- .parallel_dispatch(element.subset, analyseOneElement.wrap, n_cores, pbar, - user_fun = FUN, modelarray = data, phenotypes = phenotypes, scalar = scalar, - num.subj.lthr = num.subj.lthr, num.stat.output = num.stat.output, - flag_initiate = FALSE, on_error = on_error, - ... 
+ writer_results <- .init_results_stream_writer( + write_results_name = write_results_name, + write_results_file = write_results_file, + n_rows = length(element.subset), + column_names = column_names, + flush_every = write_results_flush_every, + storage_mode = write_results_storage_mode, + compression_level = write_results_compression_level ) - df_out <- do.call(rbind, fits) + writing_scalar <- !is.null(write_scalar_name) + if (writing_scalar) { + if (!is.character(write_scalar_name) || length(write_scalar_name) != 1L || write_scalar_name == "") { + stop("write_scalar_name must be a non-empty character string when provided") + } + if (is.null(write_scalar_file) || !is.character(write_scalar_file) || length(write_scalar_file) != 1L) { + stop("write_scalar_file must be a single character path when write_scalar_name is provided") + } + if (!is.numeric(write_scalar_flush_every) || length(write_scalar_flush_every) != 1L || write_scalar_flush_every <= 0) { + stop("write_scalar_flush_every must be a positive integer") + } + write_scalar_flush_every <- as.integer(write_scalar_flush_every) + + if (is.null(write_scalar_columns)) { + write_scalar_columns <- setdiff(column_names, "element_id") + } + + if (is.character(write_scalar_columns)) { + missing_cols <- setdiff(write_scalar_columns, column_names) + if (length(missing_cols) > 0L) { + stop( + "write_scalar_columns not found in wrap output: ", + paste(missing_cols, collapse = ", ") + ) + } + scalar_col_idx <- match(write_scalar_columns, column_names) + } else if (is.numeric(write_scalar_columns)) { + scalar_col_idx <- as.integer(write_scalar_columns) + if (any(scalar_col_idx < 1L) || any(scalar_col_idx > length(column_names))) { + stop("write_scalar_columns numeric indices are out of bounds") + } + } else { + stop("write_scalar_columns must be NULL, character, or integer") + } + + scalar_col_names <- column_names[scalar_col_idx] + if (length(scalar_col_names) == 0L) { + stop("write_scalar_columns selected zero columns") + 
} + + if (is.null(write_scalar_column_names)) { + write_scalar_column_names <- as.character(phenotypes[["source_file"]]) + } else { + write_scalar_column_names <- as.character(write_scalar_column_names) + } + if (length(write_scalar_column_names) != length(scalar_col_idx)) { + stop("length(write_scalar_column_names) must equal number of selected write_scalar_columns") + } + + # Initialize scalar output dataset. + if (!file.exists(write_scalar_file)) { + rhdf5::h5createFile(write_scalar_file) + } + scalar_grp <- paste0("scalars/", write_scalar_name) + h5_write <- hdf5r::H5File$new(write_scalar_file, mode = "a") + if (!h5_write$exists("scalars")) { + h5_write$create_group("scalars") + } + scalars_grp_obj <- h5_write$open("scalars") + if (scalars_grp_obj$exists(write_scalar_name)) { + scalars_grp_obj$link_delete(write_scalar_name) + } + scalars_grp_obj$create_group(write_scalar_name) + h5_write$close_all() + + dataset_path <- paste0(scalar_grp, "/values") + chunk_rows <- min(write_scalar_flush_every, length(element.subset)) + rhdf5::h5createDataset( + file = write_scalar_file, + dataset = dataset_path, + dims = c(length(element.subset), length(scalar_col_idx)), + storage.mode = write_scalar_storage_mode, + chunk = c(chunk_rows, length(scalar_col_idx)), + level = as.integer(write_scalar_compression_level) + ) + } + + fits_all <- if (return_output) vector("list", length(element.subset)) else NULL + write_row_cursor <- 1L + + chunk_size <- length(element.subset) + if (writing_scalar) { + chunk_size <- min(chunk_size, write_scalar_flush_every) + } + if (!is.null(writer_results)) { + chunk_size <- min(chunk_size, as.integer(write_results_flush_every)) + } + chunk_starts <- seq(1L, length(element.subset), by = chunk_size) + for (chunk_start in chunk_starts) { + chunk_end <- min(chunk_start + chunk_size - 1L, length(element.subset)) + chunk_elements <- element.subset[chunk_start:chunk_end] + + chunk_fits <- .parallel_dispatch(chunk_elements, analyseOneElement.wrap, n_cores, 
pbar, + user_fun = FUN, modelarray = data, phenotypes = phenotypes, scalar = scalar, + num.subj.lthr = num.subj.lthr, num.stat.output = num.stat.output, + flag_initiate = FALSE, on_error = on_error, + ... + ) + + if (return_output) { + fits_all[chunk_start:chunk_end] <- chunk_fits + } + + chunk_df <- NULL + if (!is.null(writer_results) || writing_scalar) { + chunk_df <- as.data.frame(do.call(rbind, chunk_fits)) + colnames(chunk_df) <- column_names + } + if (!is.null(writer_results)) { + writer_results <- .results_stream_write_block(writer_results, chunk_df) + } + + if (writing_scalar) { + block <- as.matrix(chunk_df[, scalar_col_idx, drop = FALSE]) + row_idx <- write_row_cursor:(write_row_cursor + nrow(block) - 1L) + rhdf5::h5write( + obj = block, + file = write_scalar_file, + name = dataset_path, + index = list(row_idx, seq_len(ncol(block))) + ) + write_row_cursor <- write_row_cursor + nrow(block) + } + } + + if (writing_scalar) { + rhdf5::h5writeAttribute( + attr = write_scalar_column_names, + h5obj = write_scalar_file, + name = "column_names", + h5loc = dataset_path + ) + rhdf5::h5write( + obj = write_scalar_column_names, + file = write_scalar_file, + name = paste0("scalars/", write_scalar_name, "/column_names") + ) + rhdf5::h5closeAll() + } + .finalize_results_stream_writer(writer_results) + + if (!return_output) { + return(invisible(NULL)) + } + + df_out <- do.call(rbind, fits_all) df_out <- as.data.frame(df_out) colnames(df_out) <- column_names - df_out } diff --git a/man/ModelArray.gam.Rd b/man/ModelArray.gam.Rd index a12481d..ffa2990 100644 --- a/man/ModelArray.gam.Rd +++ b/man/ModelArray.gam.Rd @@ -23,6 +23,12 @@ ModelArray.gam( pbar = TRUE, n_cores = 1, on_error = "stop", + write_results_name = NULL, + write_results_file = NULL, + write_results_flush_every = 1000L, + write_results_storage_mode = "double", + write_results_compression_level = 4L, + return_output = TRUE, ... 
) } @@ -97,10 +103,25 @@ Default is 0.2.} while fitting an element, choose whether to stop, skip returning all-NaN values for that element, or drop into `browser()` (if interactive) then skip. Default: "stop".} +\item{write_results_name}{Optional analysis name for incremental writes to +`results//results_matrix`.} + +\item{write_results_file}{Optional HDF5 file path used when `write_results_name` is provided.} + +\item{write_results_flush_every}{Positive integer number of elements per write block.} + +\item{write_results_storage_mode}{Storage mode for results writes (e.g., `"double"`).} + +\item{write_results_compression_level}{Gzip compression level (0-9) for results writes.} + +\item{return_output}{If TRUE (default), return the combined data.frame. If FALSE, +returns `invisible(NULL)`; useful for streaming large runs to HDF5.} + \item{...}{Additional arguments for `mgcv::gam()`} } \value{ -Tibble with the summarized model statistics for all elements requested +Tibble with the summarized model statistics for all elements requested when +`return_output = TRUE`; otherwise `invisible(NULL)`. } \description{ `ModelArray.gam` fits gam model for each of elements requested, diff --git a/man/ModelArray.lm.Rd b/man/ModelArray.lm.Rd index 0ff64e6..9188bbf 100644 --- a/man/ModelArray.lm.Rd +++ b/man/ModelArray.lm.Rd @@ -21,6 +21,12 @@ ModelArray.lm( pbar = TRUE, n_cores = 1, on_error = "stop", + write_results_name = NULL, + write_results_file = NULL, + write_results_flush_every = 1000L, + write_results_storage_mode = "double", + write_results_compression_level = 4L, + return_output = TRUE, ... ) } @@ -83,10 +89,25 @@ Default is 0.2.} while fitting an element, choose whether to stop, skip returning all-NaN values for that element, or drop into `browser()` (if interactive) then skip. 
Default: "stop".} +\item{write_results_name}{Optional analysis name for incremental writes to +`results//results_matrix`.} + +\item{write_results_file}{Optional HDF5 file path used when `write_results_name` is provided.} + +\item{write_results_flush_every}{Positive integer number of elements per write block.} + +\item{write_results_storage_mode}{Storage mode for results writes (e.g., `"double"`).} + +\item{write_results_compression_level}{Gzip compression level (0-9) for results writes.} + +\item{return_output}{If TRUE (default), return the combined data.frame. If FALSE, +returns `invisible(NULL)`; useful for streaming large runs to HDF5.} + \item{...}{Additional arguments for `stats::lm()`} } \value{ -Tibble with the summarized model statistics for all elements requested +Tibble with the summarized model statistics for all elements requested when +`return_output = TRUE`; otherwise `invisible(NULL)`. } \description{ `ModelArray.lm` fits linear model (`stats::lm()`) for each of elements requested, and returns a tibble diff --git a/man/ModelArray.wrap.Rd b/man/ModelArray.wrap.Rd index 044c352..19b5835 100644 --- a/man/ModelArray.wrap.Rd +++ b/man/ModelArray.wrap.Rd @@ -16,6 +16,19 @@ ModelArray.wrap( pbar = TRUE, n_cores = 1, on_error = "stop", + write_scalar_name = NULL, + write_scalar_file = NULL, + write_scalar_columns = NULL, + write_scalar_column_names = NULL, + write_scalar_flush_every = 1000L, + write_scalar_storage_mode = "double", + write_scalar_compression_level = 4L, + write_results_name = NULL, + write_results_file = NULL, + write_results_flush_every = 1000L, + write_results_storage_mode = "double", + write_results_compression_level = 4L, + return_output = TRUE, ... ) } @@ -46,10 +59,44 @@ It must match `sources(data)[[scalar]]` order and contents (reordered if needed) the user function for an element, choose whether to stop, skip returning all-NaN values for that element, or drop into `browser()` (if interactive) then skip. 
Default: "stop".} +\item{write_scalar_name}{Optional character scalar name. If provided, `ModelArray.wrap` +writes selected output columns into `scalars//values` while processing.} + +\item{write_scalar_file}{Optional character HDF5 output filename used when +`write_scalar_name` is provided.} + +\item{write_scalar_columns}{Optional character/integer selector for output columns to save +as scalar values. If NULL, uses all wrap output columns except `element_id`.} + +\item{write_scalar_column_names}{Optional character vector of source names saved as +scalar `column_names`. If NULL, uses `phenotypes$source_file`.} + +\item{write_scalar_flush_every}{Positive integer number of elements per write block.} + +\item{write_scalar_storage_mode}{Storage mode for scalar writes (e.g., `"double"`).} + +\item{write_scalar_compression_level}{Gzip compression level (0-9) for scalar writes.} + +\item{write_results_name}{Optional analysis name for incremental writes to +`results//results_matrix`.} + +\item{write_results_file}{Optional HDF5 file path used when `write_results_name` is provided.} + +\item{write_results_flush_every}{Positive integer number of elements per write block for +results writes.} + +\item{write_results_storage_mode}{Storage mode for results writes (e.g., `"double"`).} + +\item{write_results_compression_level}{Gzip compression level (0-9) for results writes.} + +\item{return_output}{If TRUE (default), return the combined data.frame. If FALSE, +returns `invisible(NULL)`; useful when writing large outputs directly to HDF5.} + \item{...}{Additional arguments forwarded to `FUN`} } \value{ Tibble/data.frame with one row per element and first column `element_id` +when `return_output = TRUE`; otherwise `invisible(NULL)`. 
} \description{ `ModelArray.wrap` runs a user-supplied function `FUN` for each requested element and diff --git a/tests/testthat/test-stream_results.R b/tests/testthat/test-stream_results.R new file mode 100644 index 0000000..92e38f0 --- /dev/null +++ b/tests/testthat/test-stream_results.R @@ -0,0 +1,124 @@ +test_that("ModelArray.lm can stream results to HDF5", { + src <- paste0("s", 1:8) + mat <- matrix(c( + 1, 3, 2, 7, 5, 9, 8, 6, + 2, 1, 4, 3, 6, 5, 7, 8, + 5, 2, 6, 1, 7, 3, 8, 4 + ), nrow = 3, byrow = TRUE) + modelarray <- methods::new( + "ModelArray", + sources = list(FD = src), + scalars = list(FD = mat), + results = list(), + path = tempfile(fileext = ".h5") + ) + phen <- data.frame(source_file = src, age = seq(10, 80, by = 10)) + h5_out <- tempfile(fileext = ".h5") + on.exit(unlink(h5_out), add = TRUE) + + out <- ModelArray.lm( + formula = FD ~ age, + data = modelarray, + phenotypes = phen, + scalar = "FD", + element.subset = as.integer(c(1, 2, 3)), + var.terms = c("estimate"), + var.model = c("adj.r.squared"), + correct.p.value.terms = c("none"), + correct.p.value.model = c("none"), + num.subj.lthr.abs = 0, + n_cores = 1, + pbar = FALSE, + verbose = FALSE, + write_results_name = "lm_stream", + write_results_file = h5_out, + write_results_flush_every = 2L, + return_output = TRUE + ) + + mat_h5 <- rhdf5::h5read(h5_out, "results/lm_stream/results_matrix") + expect_equal(dim(mat_h5), dim(out)) + expect_equal(unname(mat_h5), unname(as.matrix(out))) +}) + +test_that("ModelArray.gam can stream results to HDF5", { + src <- paste0("s", 1:8) + mat <- matrix(c( + 1, 3, 2, 7, 5, 9, 8, 6, + 2, 1, 4, 3, 6, 5, 7, 8, + 5, 2, 6, 1, 7, 3, 8, 4 + ), nrow = 3, byrow = TRUE) + modelarray <- methods::new( + "ModelArray", + sources = list(FD = src), + scalars = list(FD = mat), + results = list(), + path = tempfile(fileext = ".h5") + ) + phen <- data.frame(source_file = src, age = seq(10, 80, by = 10)) + h5_out <- tempfile(fileext = ".h5") + on.exit(unlink(h5_out), add = TRUE) + 
+ out <- ModelArray.gam( + formula = FD ~ age, + data = modelarray, + phenotypes = phen, + scalar = "FD", + element.subset = as.integer(c(1, 2, 3)), + var.smoothTerms = c(), + var.parametricTerms = c("estimate"), + var.model = c("dev.expl"), + correct.p.value.smoothTerms = c("none"), + correct.p.value.parametricTerms = c("none"), + num.subj.lthr.abs = 0, + n_cores = 1, + pbar = FALSE, + verbose = FALSE, + write_results_name = "gam_stream", + write_results_file = h5_out, + write_results_flush_every = 2L, + return_output = TRUE + ) + + mat_h5 <- rhdf5::h5read(h5_out, "results/gam_stream/results_matrix") + expect_equal(dim(mat_h5), dim(out)) + expect_equal(unname(mat_h5), unname(as.matrix(out))) +}) + +test_that("ModelArray.wrap can stream results to HDF5 without returning output", { + src <- c("s1", "s2", "s3", "s4") + modelarray <- methods::new( + "ModelArray", + sources = list(FD = src), + scalars = list(FD = matrix(c(1, 2, 3, 4, 2, 3, 4, 5), nrow = 2, byrow = TRUE)), + results = list(), + path = tempfile(fileext = ".h5") + ) + phen <- data.frame(source_file = src, age = c(20, 30, 40, 50)) + h5_out <- tempfile(fileext = ".h5") + on.exit(unlink(h5_out), add = TRUE) + + fun_wrap <- function(data) { + data.frame(m = mean(data$FD), a = data$age[1]) + } + + out <- ModelArray.wrap( + FUN = fun_wrap, + data = modelarray, + phenotypes = phen, + scalar = "FD", + element.subset = as.integer(c(1, 2)), + num.subj.lthr.abs = 0, + n_cores = 1, + pbar = FALSE, + verbose = FALSE, + write_results_name = "wrap_stream", + write_results_file = h5_out, + write_results_flush_every = 1L, + return_output = FALSE + ) + + expect_null(out) + mat_h5 <- rhdf5::h5read(h5_out, "results/wrap_stream/results_matrix") + expect_equal(dim(mat_h5), c(2, 3)) +}) diff --git a/tests/testthat/test-wrap_write_scalars.R b/tests/testthat/test-wrap_write_scalars.R new file mode 100644 index 0000000..4b39fbb --- /dev/null +++ b/tests/testthat/test-wrap_write_scalars.R @@ -0,0 +1,87 @@ 
+# Tests for streaming ModelArray.wrap output columns into /scalars via the
+# write_scalar_* arguments: values land in scalars/<name>/values and must be
+# loadable afterwards as a regular ModelArray scalar.
+test_that("ModelArray.wrap can stream selected columns to scalars", {
+  src <- c("s1", "s2", "s3", "s4")
+  modelarray <- methods::new(
+    "ModelArray",
+    sources = list(FD = src),
+    scalars = list(FD = matrix(c(1, 2, 3, 4, 2, 3, 4, 5), nrow = 2, byrow = TRUE)),
+    results = list(),
+    path = tempfile(fileext = ".h5")
+  )
+  phen <- data.frame(source_file = src, age = c(20, 30, 40, 50))
+  h5_out <- tempfile(fileext = ".h5")
+  on.exit(unlink(h5_out), add = TRUE)
+
+  # Element-wise transform: shifts each subject's value by +10 and returns a
+  # named per-subject vector (names = source_file).
+  fun_harmonize <- function(data) {
+    vals <- data$FD + 10
+    names(vals) <- data$source_file
+    vals
+  }
+
+  out <- ModelArray.wrap(
+    FUN = fun_harmonize,
+    data = modelarray,
+    phenotypes = phen,
+    scalar = "FD",
+    element.subset = as.integer(c(1, 2)),
+    n_cores = 1,
+    pbar = FALSE,
+    verbose = FALSE,
+    num.subj.lthr.abs = 0,
+    write_scalar_name = "FD_harmonized",
+    write_scalar_file = h5_out,
+    write_scalar_flush_every = 1L
+  )
+
+  # Returned table: 2 elements x 5 columns (element_id plus one column per
+  # subject, per the colnames check below).
+  expect_equal(dim(out), c(2, 5))
+  expect_true(all(c("element_id", src) %in% colnames(out)))
+
+  # Streamed values are exactly input + 10.
+  h5_vals <- rhdf5::h5read(h5_out, "scalars/FD_harmonized/values")
+  expect_equal(
+    h5_vals,
+    matrix(c(11, 12, 13, 14, 12, 13, 14, 15), nrow = 2, byrow = TRUE)
+  )
+
+  # Round-trip: the new scalar must load as a regular ModelArray scalar with
+  # matching values and source list.
+  ma_out <- ModelArray(h5_out, scalar_types = c("FD_harmonized"))
+  loaded <- as.matrix(scalars(ma_out)[["FD_harmonized"]])
+  dimnames(loaded) <- NULL
+  expect_equal(loaded, h5_vals)
+  expect_equal(sources(ma_out)[["FD_harmonized"]], src)
+})
+
+test_that("ModelArray.wrap write-scalar mode validates column metadata", {
+  src <- c("s1", "s2", "s3", "s4")
+  modelarray <- methods::new(
+    "ModelArray",
+    sources = list(FD = src),
+    scalars = list(FD = matrix(c(1, 2, 3, 4, 2, 3, 4, 5), nrow = 2, byrow = TRUE)),
+    results = list(),
+    path = tempfile(fileext = ".h5")
+  )
+  phen <- data.frame(source_file = src, age = c(20, 30, 40, 50))
+  h5_out <- tempfile(fileext = ".h5")
+  on.exit(unlink(h5_out), add = TRUE)
+
+  simple_fun <- function(data) {
+    vals <- data$FD
+    names(vals) <- data$source_file
+    vals
+  }
+
+  # A write_scalar_column_names length that does not match the number of
+  # selected columns must be rejected with this exact error message.
+  expect_error(
+    ModelArray.wrap(
+      FUN = simple_fun,
+      data = modelarray,
+      phenotypes = phen,
+      scalar = "FD",
+      element.subset = as.integer(c(1, 2)),
+      n_cores = 1,
+      pbar = FALSE,
+      verbose = FALSE,
+      num.subj.lthr.abs = 0,
+      write_scalar_name = "FD_harmonized",
+      write_scalar_file = h5_out,
+      write_scalar_column_names = c("only_one_name")
+    ),
+    "length\\(write_scalar_column_names\\) must equal number of selected write_scalar_columns"
+  )
+})
diff --git a/vignettes/modelling.Rmd b/vignettes/modelling.Rmd
index 0f832a0..3a37662 100644
--- a/vignettes/modelling.Rmd
+++ b/vignettes/modelling.Rmd
@@ -275,6 +275,92 @@
 head(result)
 writeResults(h5_path, df.output = result, analysis_name = "site_analysis")
 ```
+### Case study: harmonize with `covfam` and stream to a new scalar
+
+You can also use `ModelArray.wrap()` as a transformation engine rather than a test runner.
+The example below applies a harmonization step per element, streams the harmonized values
+directly into `/scalars`, and then loads that harmonized scalar in a new `ModelArray`.
+
+```{r wrap_covfam_case_study, eval=FALSE}
+library(covfam)
+
+# Assume phenotypes has harmonization variables, e.g.:
+# source_file, site, age, sex
+# and that "thickness" is already present in the h5 as an input scalar.
+
+h5_in <- "thickness_raw.h5"
+h5_harmonized <- "thickness_harmonized.h5"
+
+ma_raw <- ModelArray(h5_in, scalar_types = "thickness")
+phenotypes <- read.csv("phenotypes.csv")
+
+# Copy metadata/layout into a new file so we can append new scalars there.
+file.copy(h5_in, h5_harmonized, overwrite = TRUE)
+
+covfam_harmonize_element <- function(data) {
+  # Harmonize one element across subjects.
+  # Adjust arguments here to match your covfam configuration.
+  out <- covfam::covfam(
+    y = data$thickness,
+    batch = data$site,
+    mod = data.frame(age = data$age, sex = data$sex)
+  )
+
+  # Return named subject-level vector so wrap columns map to source_file order.
+ vals <- out$y_harmonized + names(vals) <- data$source_file + vals +} + +# Stream harmonized subject-level values into scalars/thickness_covfam/values. +# Set return_output = FALSE to avoid keeping the full output table in memory. +ModelArray.wrap( + FUN = covfam_harmonize_element, + data = ma_raw, + phenotypes = phenotypes, + scalar = "thickness", + n_cores = 8, + write_scalar_name = "thickness_covfam", + write_scalar_file = h5_harmonized, + write_scalar_flush_every = 2000L, + return_output = FALSE +) + +# Load the harmonized scalar as a new modelling input +ma_harmonized <- ModelArray(h5_harmonized, scalar_types = c("thickness_covfam")) + +# Use harmonized data in downstream models +fit_harmonized <- ModelArray.lm( + thickness_covfam ~ age + sex, + data = ma_harmonized, + phenotypes = phenotypes, + scalar = "thickness_covfam", + n_cores = 8 +) +``` + +If you want to stream model statistics too (not only transformed scalars), use +`write_results_name` and `write_results_file` in `ModelArray.lm()`, `ModelArray.gam()`, +or `ModelArray.wrap()`. + +#### Quick QA: export one harmonized image to inspect + +After writing `thickness_covfam` into `/scalars`, you can export one row back to an image +and open it in your usual viewer as a spot-check. Use `--column-index` to select which row +from `scalars/thickness_covfam/values` to export. + +```{bash eval=FALSE} +modelarrayio h5-export-nifti-file \ + --input-hdf5 thickness_harmonized.h5 \ + --scalar-name thickness_covfam \ + --column-index 0 \ + --group-mask-file group_mask_thickness.nii.gz \ + --output-file qa_thickness_covfam_col000.nii.gz +``` + +If your workflow uses CIFTI or MIF instead of NIfTI, use the matching export command: +`h5-export-cifti-file` or `h5-export-mif-file`. + ## Modelling across multiple h5 files with `mergeModelArrays()` When scalars live in separate h5 files — for example, cortical thickness in one file