Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions R/analyse-helpers.R
Original file line number Diff line number Diff line change
Expand Up @@ -264,3 +264,99 @@

df_out
}


#' Initialize an incremental writer for /results datasets
#'
#' Ensures `write_results_file` exists and contains a `results` group,
#' replaces any existing `results/<write_results_name>` group with an empty
#' one, and creates a chunked `results_matrix` dataset inside it with
#' dimensions `n_rows` x `length(column_names)`.
#'
#' @param write_results_name Name of the group to create under `results/`.
#'   `NULL` disables streaming: the function returns `NULL` and touches
#'   nothing on disk.
#' @param write_results_file Path to the HDF5 file; created when missing.
#' @param n_rows Number of rows of the dataset to create.
#' @param column_names Column names; their count fixes the number of dataset
#'   columns. They are stored in the returned writer, not written here.
#' @param flush_every Positive number (truncated to integer, must be >= 1
#'   after truncation). Upper bound for the number of rows per HDF5 chunk.
#' @param storage_mode Forwarded to `rhdf5::h5createDataset(storage.mode = )`.
#' @param compression_level Forwarded (as integer) to
#'   `rhdf5::h5createDataset(level = )`.
#' @return `NULL` when `write_results_name` is `NULL`; otherwise a list with
#'   elements `file`, `dataset_path`, `names_path`, `column_names`, `n_cols`
#'   and `write_row_cursor` (the 1-based row at which the next block goes,
#'   initially `1L`).
#' @noRd
.init_results_stream_writer <- function(write_results_name,
                                        write_results_file,
                                        n_rows,
                                        column_names,
                                        flush_every = 1000L,
                                        storage_mode = "double",
                                        compression_level = 4L) {
  if (is.null(write_results_name)) {
    return(NULL)
  }
  # is.na() is checked explicitly: comparing NA with "" yields NA, which would
  # otherwise abort the `if` with an unrelated "missing value" error.
  if (!is.character(write_results_name) || length(write_results_name) != 1L ||
        is.na(write_results_name) || !nzchar(write_results_name)) {
    stop("write_results_name must be a non-empty character string when provided")
  }
  # is.character(NULL) is FALSE, so a NULL path is rejected here as well.
  if (!is.character(write_results_file) || length(write_results_file) != 1L ||
        is.na(write_results_file)) {
    stop("write_results_file must be a single character path when write_results_name is provided")
  }
  # Require >= 1 (not merely > 0): as.integer() truncates, so a value such as
  # 0.5 would become 0 and produce a zero-row chunk size below.
  if (!is.numeric(flush_every) || length(flush_every) != 1L ||
        !is.finite(flush_every) || flush_every < 1 ||
        flush_every > .Machine$integer.max) {
    stop("write_results_flush_every must be a positive integer")
  }
  flush_every <- as.integer(flush_every)

  if (!file.exists(write_results_file)) {
    rhdf5::h5createFile(write_results_file)
  }

  # Group management goes through hdf5r. The handle is closed in `finally` so
  # it is released even when one of the group operations fails, and before
  # rhdf5 accesses the same file below.
  h5_write <- hdf5r::H5File$new(write_results_file, mode = "a")
  tryCatch(
    {
      if (!h5_write$exists("results")) {
        h5_write$create_group("results")
      }
      results_grp <- h5_write$open("results")
      # Drop any previous results stored under the same name.
      if (results_grp$exists(write_results_name)) {
        results_grp$link_delete(write_results_name)
      }
      results_grp$create_group(write_results_name)
    },
    finally = h5_write$close_all()
  )

  dataset_path <- paste0("results/", write_results_name, "/results_matrix")
  # A chunk never spans more rows than the dataset itself.
  chunk_rows <- min(flush_every, n_rows)

  rhdf5::h5createDataset(
    file = write_results_file,
    dataset = dataset_path,
    dims = c(n_rows, length(column_names)),
    storage.mode = storage_mode,
    chunk = c(chunk_rows, length(column_names)),
    level = as.integer(compression_level)
  )

  list(
    file = write_results_file,
    dataset_path = dataset_path,
    names_path = paste0("results/", write_results_name, "/column_names"),
    column_names = as.character(column_names),
    n_cols = length(column_names),
    write_row_cursor = 1L
  )
}


#' Append one block to an incremental /results writer
#'
#' Converts `block_df` with `as.matrix()` and writes it into the writer's
#' dataset, starting at row `writer$write_row_cursor` and covering all
#' `writer$n_cols` columns, then advances the cursor by the number of rows
#' written.
#'
#' @param writer Writer list carrying `file`, `dataset_path`, `n_cols` and
#'   `write_row_cursor`, or `NULL` (in which case nothing happens).
#' @param block_df Rows to append; must have `writer$n_cols` columns.
#' @return The writer with its `write_row_cursor` advanced; `NULL` when
#'   `writer` is `NULL`. A zero-row block returns the writer unchanged.
#' @noRd
.results_stream_write_block <- function(writer, block_df) {
  if (is.null(writer)) {
    return(writer)
  }
  block <- as.matrix(block_df)
  n_block <- nrow(block)
  # Nothing to write. Without this guard `cursor:(cursor - 1)` would produce a
  # descending two-element index instead of an empty one.
  if (n_block == 0L) {
    return(writer)
  }
  if (ncol(block) != writer$n_cols) {
    stop(
      "block_df must have ", writer$n_cols, " columns, not ", ncol(block)
    )
  }
  row_idx <- seq.int(from = writer$write_row_cursor, length.out = n_block)
  rhdf5::h5write(
    obj = block,
    file = writer$file,
    name = writer$dataset_path,
    index = list(row_idx, seq_len(writer$n_cols))
  )
  writer$write_row_cursor <- writer$write_row_cursor + n_block
  writer
}


#' Finalize an incremental /results writer
#'
#' Stores the writer's column names at `writer$names_path` in the writer's
#' file and then calls `rhdf5::h5closeAll()`. A `NULL` writer is a no-op.
#'
#' @param writer Writer list carrying `column_names`, `file` and
#'   `names_path`, or `NULL`.
#' @return `NULL`, invisibly.
#' @noRd
.finalize_results_stream_writer <- function(writer) {
  if (!is.null(writer)) {
    target_file <- writer$file
    names_dataset <- writer$names_path
    rhdf5::h5write(
      obj = writer$column_names,
      file = target_file,
      name = names_dataset
    )
    rhdf5::h5closeAll()
  }
  invisible(NULL)
}
Loading
Loading