From e6f09d6b3780ceca0d3da5abe4491760c7705145 Mon Sep 17 00:00:00 2001 From: Chris Lutsch <265904602+cl-earthscope@users.noreply.github.com> Date: Tue, 10 Mar 2026 14:21:05 -0600 Subject: [PATCH 1/2] Fix consolidator crash on vacuumed delete commits. The previous implementation of the commits consolidator assumed that `.del` files always exist on disk as standalone files. If a `.del` file was consolidated and subsequently vacuumed, the consolidator threw a fatal, unrecoverable error when trying to read its file size and payload during subsequent consolidation runs. Additionally, the `ArrayDirectory` string-based verification for vacuuming superseded `.con` files failed when it encountered embedded binary `.del` payloads, preventing those older `.con` files from ever being vacuumed. This PR resolves these issues by making the engine fully aware of physical payload locations: * `ArrayDirectory` now tracks the exact physical URI and byte offset for `.del` payloads (whether raw on disk or embedded in older `.con` files) inside `delete_and_update_tiles_location_`. * Added a `skip_delete_payload` helper in `ArrayDirectory` to properly advance the stream past binary data during `.con` file string verification. * `Consolidator::write_consolidated_commits_file` now uses the `location_map` to extract `.del` payloads from their actual physical locations instead of only querying the VFS using the logical URI. * Fixes the engine crash when consolidating an array with previously vacuumed delete commits. * Allows superseded `.con` files containing delete payloads to be successfully verified and vacuumed. The most significant changes are in `tiledb/sm/array/array_directory.cc` (parsing and offset tracking) and `tiledb/sm/consolidator/consolidator.cc` (location-aware reading). Explicitly added a check to ensure `.wrt` files are not added to the payload location map, as they are zero-byte markers. 
Added two unit tests (`[array-directory][commits-mode-del]` and `[array-directory][vacuum-binary-skip]`) to verify payload offset mapping and `.con` string verification. Added a C API integration test (`[capi][consolidation][commits][deletes]`) that executes the exact Consolidate -> Vacuum -> Consolidate sequence to guarantee the engine no longer crashes on missing `.del` files. --- TYPE: BUG DESC: Fix consolidator crash when processing vacuumed delete commits and allow vacuuming of .con files containing delete payloads. --- BUILDING_FROM_SOURCE.md | 2 +- test/src/unit-capi-consolidation.cc | 57 ++++++++- tiledb/sm/array/array_directory.cc | 37 ++++-- tiledb/sm/array/test/unit_array_directory.cc | 116 +++++++++++++++++++ tiledb/sm/consolidator/consolidator.cc | 64 +++++++--- 5 files changed, 251 insertions(+), 25 deletions(-) diff --git a/BUILDING_FROM_SOURCE.md b/BUILDING_FROM_SOURCE.md index 97184271677..7c502a5101b 100644 --- a/BUILDING_FROM_SOURCE.md +++ b/BUILDING_FROM_SOURCE.md @@ -130,7 +130,7 @@ To **build** after configuration, run the following: To **install**, run the following: ```bash -> cmake --build . --target install-tiledb --config Release +> cmake --build . 
--target install --config Release ``` Other helpful build targets are as follows: diff --git a/test/src/unit-capi-consolidation.cc b/test/src/unit-capi-consolidation.cc index e1907946285..5a0646074c0 100644 --- a/test/src/unit-capi-consolidation.cc +++ b/test/src/unit-capi-consolidation.cc @@ -7604,8 +7604,9 @@ TEST_CASE_METHOD( const char* msg; tiledb_error_message(err, &msg); CHECK( - std::string("FragmentConsolidator: Consolidation read 0 cells, no " - "progress can be made") == msg); + std::string( + "FragmentConsolidator: Consolidation read 0 cells, no " + "progress can be made") == msg); remove_sparse_string_array(); } @@ -8032,3 +8033,55 @@ TEST_CASE_METHOD( CHECK(rc == TILEDB_OK); } } + +TEST_CASE_METHOD( + ConsolidationFx, + "C API: Test consolidation of already consolidated delete commits", + "[capi][consolidation][commits][deletes]") { + // Create a sparse array (Query Conditions require sparse arrays) + remove_sparse_array(); + create_sparse_array(); + + tiledb_array_t* array; + REQUIRE( + tiledb_array_alloc(ctx_, sparse_array_uri_.c_str(), &array) == TILEDB_OK); + + // Write initial data (Creates a .wrt file) + write_sparse_full(); + + // Submit a Delete Query Condition (Creates a .del file) + REQUIRE(tiledb_array_open(ctx_, array, TILEDB_DELETE) == TILEDB_OK); + tiledb_query_t* query; + REQUIRE(tiledb_query_alloc(ctx_, array, TILEDB_DELETE, &query) == TILEDB_OK); + + tiledb_query_condition_t* qc; + REQUIRE(tiledb_query_condition_alloc(ctx_, &qc) == TILEDB_OK); + int32_t val = 3; + // "a1" is the int32 attribute created by create_sparse_array() + REQUIRE( + tiledb_query_condition_init( + ctx_, qc, "a1", &val, sizeof(int32_t), TILEDB_EQ) == TILEDB_OK); + REQUIRE(tiledb_query_set_condition(ctx_, query, qc) == TILEDB_OK); + REQUIRE(tiledb_query_submit(ctx_, query) == TILEDB_OK); + + REQUIRE(tiledb_array_close(ctx_, array) == TILEDB_OK); + tiledb_query_condition_free(&qc); + tiledb_query_free(&query); + + // First Consolidation: Packs the .wrt and .del 
into a .con file + consolidate_sparse("commits"); + + // Vacuum: Destroys the physical .del file from disk + vacuum_sparse("commits"); + + // Write more data: Creates a new .wrt file so the consolidator has work to + // do + write_sparse_unordered(); + + // Second Consolidation + consolidate_sparse("commits"); + + // Clean up + tiledb_array_free(&array); + remove_sparse_array(); +} diff --git a/tiledb/sm/array/array_directory.cc b/tiledb/sm/array/array_directory.cc index 6a8581b9ee8..80edfa6fb0b 100644 --- a/tiledb/sm/array/array_directory.cc +++ b/tiledb/sm/array/array_directory.cc @@ -721,6 +721,21 @@ ArrayDirectory::load_consolidated_commit_uris( // Load the commit URIs to ignore. This is done in serial for now as it can be // optimized by vacuuming. std::unordered_set ignore_set; + + // Helper lambda to advance the stream past binary delete payloads. + // Assumes the caller has already verified this is a delete commit. + auto skip_delete_payload = [](std::stringstream& stream, + uint64_t* out_payload_offset = nullptr) { + storage_size_t size = 0; + stream.read(reinterpret_cast(&size), sizeof(storage_size_t)); + + if (out_payload_offset) { + *out_payload_offset = static_cast(stream.tellg()); + } + + stream.seekg(size, std::ios_base::cur); + }; + for (auto& uri : commits_dir_uris) { if (stdx::string::ends_with( uri.to_string(), constants::ignore_file_suffix)) { @@ -764,11 +779,8 @@ ArrayDirectory::load_consolidated_commit_uris( // If we have a delete, process the condition tile if (stdx::string::ends_with( condition_marker, constants::delete_file_suffix)) { - storage_size_t size = 0; - ss.read( - static_cast(static_cast(&size)), - sizeof(storage_size_t)); - auto pos = ss.tellg(); + uint64_t payload_pos = 0; + skip_delete_payload(ss, &payload_pos); // Get the start and end timestamp for this delete FragmentID fragment_id{URI(condition_marker)}; @@ -778,10 +790,8 @@ ArrayDirectory::load_consolidated_commit_uris( // times if 
(timestamps_overlap(delete_timestamp_range, false)) { delete_and_update_tiles_location_.emplace_back( - uri, condition_marker, pos); + uri, condition_marker, payload_pos); } - pos += size; - ss.seekg(pos); } } } @@ -808,6 +818,10 @@ ArrayDirectory::load_consolidated_commit_uris( all_in_set = false; break; } + // Skip the binary payload if it's a delete file + if (stdx::string::ends_with(uri_str, constants::delete_file_suffix)) { + skip_delete_payload(ss); + } } if (all_in_set && count == uris_set.size()) { @@ -922,6 +936,13 @@ void ArrayDirectory::load_commits_uris_to_consolidate( uri.to_string(), constants::delete_file_suffix)) { if (consolidated_uris_set.count(uri) == 0) { commit_uris_to_consolidate_.emplace_back(uri); + if (stdx::string::ends_with( + uri.to_string(), constants::delete_file_suffix)) { + const auto base_uri_size = uri_.to_string().size(); + // 0 offset because it is a raw file + delete_and_update_tiles_location_.emplace_back( + uri, uri.to_string().substr(base_uri_size), 0); + } } } } diff --git a/tiledb/sm/array/test/unit_array_directory.cc b/tiledb/sm/array/test/unit_array_directory.cc index 940ace2fad5..3ef26fcd18c 100644 --- a/tiledb/sm/array/test/unit_array_directory.cc +++ b/tiledb/sm/array/test/unit_array_directory.cc @@ -137,3 +137,119 @@ TEST_CASE("Array directory: Vac file fix", "[array-directory][vac-file-fix]") { ArrayDirectory::get_full_vac_uri( "base/", "file://not/related/test.vac") == "base/test.vac"); } + +TEST_CASE( + "Array directory: Commits mode populates raw delete locations", + "[array-directory][commits-mode-del]") { + Config cfg; + auto logger = make_shared(HERE(), "foo"); + ContextResources resources{cfg, logger, 1, 1, ""}; + auto& vfs = resources.vfs(); + + // Setup a test array with a commits directory + URI array_uri(arrays_dir + "/test_array_dir_commits_mode"); + if (vfs.is_dir(array_uri)) { + vfs.remove_dir(array_uri); + } + vfs.create_dir(array_uri); + URI commits_dir = array_uri.join_path("__commits"); + 
vfs.create_dir(commits_dir); + + // Create a test raw .del file (write version 12 format) + URI del_uri = + commits_dir.join_path("__0_0_6e1138cbb833e380c227d402c1d2fc38_22.del"); + vfs.touch(del_uri); + + // Load ArrayDirectory explicitly in COMMITS mode + ArrayDirectory array_dir( + resources, array_uri, 0, 1, ArrayDirectoryMode::COMMITS); + + // Verify the location map was populated + auto locs = array_dir.delete_and_update_tiles_location(); + + REQUIRE(locs.size() == 1); + CHECK( + locs[0].condition_marker() == + "__commits/" + "__0_0_6e1138cbb833e380c227d402c1d2fc38_22.del"); + CHECK(locs[0].offset() == 0); + + vfs.remove_dir(array_uri); +} + +TEST_CASE( + "Array directory: Vacuum verification skips binary delete payloads", + "[array-directory][vacuum-binary-skip]") { + Config cfg; + auto logger = make_shared(HERE(), "foo"); + ContextResources resources{cfg, logger, 1, 1, ""}; + auto& vfs = resources.vfs(); + + // Setup a test array + URI array_uri(arrays_dir + "/test_array_dir_vacuum_skip"); + if (vfs.is_dir(array_uri)) { + vfs.remove_dir(array_uri); + } + vfs.create_dir(array_uri); + URI commits_dir = array_uri.join_path("__commits"); + vfs.create_dir(commits_dir); + + // Mock physical files so they exist in the directory listing + vfs.touch( + commits_dir.join_path("__0_0_5c336fca5fe8057791d69cad6c15e981_22.wrt")); + vfs.touch( + commits_dir.join_path("__1_1_6e1138cbb833e380c227d402c1d2fc38_22.del")); + + // Create a .con file containing a binary payload + URI con_uri = + commits_dir.join_path("__0_1_51cfc7c8956f93998a1ba2a42e7ec0d5_22.con"); + std::string wrt_uri = + "__commits/" + "__0_0_5c336fca5fe8057791d69cad6c15e981_22.wrt\n"; + std::string del_uri = + "__commits/" + "__1_1_6e1138cbb833e380c227d402c1d2fc38_22.del\n"; + // 4 bytes of binary data + storage_size_t payload_size = 4; + std::string binary_payload = "BEEF"; + + // Pack the bytes exactly like the consolidator does + std::vector data; + data.insert(data.end(), wrt_uri.begin(), wrt_uri.end()); 
+ data.insert(data.end(), del_uri.begin(), del_uri.end()); + + auto* size_ptr = reinterpret_cast(&payload_size); + data.insert(data.end(), size_ptr, size_ptr + sizeof(storage_size_t)); + data.insert(data.end(), binary_payload.begin(), binary_payload.end()); + + vfs.write(con_uri, data.data(), data.size()); + auto status = vfs.close_file(con_uri); + CHECK(status.ok()); + + // Create a newer, encompassing .con file. + URI con_uri2 = + commits_dir.join_path("__0_2_017ebf26d62191760cc867d23ccd1458_22.con"); + std::vector data2 = data; + std::string wrt_uri2 = + "__commits/" + "__2_2_06e30b6f9ed34137f9ac342cf6211c84_22.wrt\n"; + data2.insert(data2.end(), wrt_uri2.begin(), wrt_uri2.end()); + + vfs.write(con_uri2, data2.data(), data2.size()); + status = vfs.close_file(con_uri2); + CHECK(status.ok()); + + // Load the ArrayDirectory in COMMITS mode + ArrayDirectory array_dir( + resources, array_uri, 0, 2, ArrayDirectoryMode::COMMITS); + + // Verify the .con file passed verification and was marked for vacuuming + auto vacuum_uris = array_dir.consolidated_commits_uris_to_vacuum(); + + REQUIRE(vacuum_uris.size() == 1); + std::cout << vacuum_uris[0].to_string() << std::endl; + std::cout << con_uri.to_string() << std::endl; + CHECK(vacuum_uris[0] == con_uri); + + vfs.remove_dir(array_uri); +} diff --git a/tiledb/sm/consolidator/consolidator.cc b/tiledb/sm/consolidator/consolidator.cc index ea98549b071..ca35636fe10 100644 --- a/tiledb/sm/consolidator/consolidator.cc +++ b/tiledb/sm/consolidator/consolidator.cc @@ -255,6 +255,16 @@ void Consolidator::write_consolidated_commits_file( const ArrayDirectory& array_dir, const std::vector& commit_uris, ContextResources& resources) { + // Map relative URIs to their actual physical locations on disk (either raw + // .del files or embedded in .con files) + std::unordered_map< + std::string, + const ArrayDirectory::DeleteAndUpdateTileLocation*> + location_map; + for (const auto& loc : array_dir.delete_and_update_tiles_location()) { + 
location_map[loc.condition_marker()] = &loc; + } + // Compute the file name. auto name = storage_format::generate_consolidated_fragment_name( commit_uris.front(), commit_uris.back(), write_version); @@ -266,15 +276,36 @@ void Consolidator::write_consolidated_commits_file( std::vector file_sizes(commit_uris.size()); for (uint64_t i = 0; i < commit_uris.size(); i++) { const auto& uri = commit_uris[i]; - total_size += uri.to_string().size() - base_uri_size + 1; - - // If the file is a delete, add the file size to the count and the size of - // the size variable. - if (stdx::string::ends_with( - uri.to_string(), constants::delete_file_suffix)) { - file_sizes[i] = resources.vfs().file_size(uri); - total_size += file_sizes[i]; - total_size += sizeof(storage_size_t); + std::string relative_uri = uri.to_string().substr(base_uri_size); + // +1 for the newline character + total_size += relative_uri.size() + 1; + + // If the file is a delete, find its physical payload size. + if (stdx::string::ends_with(relative_uri, constants::delete_file_suffix)) { + auto it = location_map.find(relative_uri); + if (it == location_map.end()) { + throw ConsolidatorException( + "Delete commit physical location not found in ArrayDirectory " + "ledger."); + } + + const auto* loc = it->second; + + if (loc->offset() == 0) { + // It's a raw .del file still sitting on disk + file_sizes[i] = resources.vfs().file_size(loc->uri()); + } else { + // It's already embedded in an older .con file. + // Read the size bytes stored immediately before the payload offset. + storage_size_t payload_size = 0; + throw_if_not_ok(resources.vfs().read_exactly( + loc->uri(), + loc->offset() - sizeof(storage_size_t), + &payload_size, + sizeof(storage_size_t))); + file_sizes[i] = payload_size; + } + total_size += file_sizes[i] + sizeof(storage_size_t); } } @@ -284,17 +315,22 @@ void Consolidator::write_consolidated_commits_file( for (uint64_t i = 0; i < commit_uris.size(); i++) { // Add the uri. 
const auto& uri = commit_uris[i]; - std::string relative_uri = uri.to_string().substr(base_uri_size) + "\n"; + std::string relative_uri = uri.to_string().substr(base_uri_size); memcpy(&data[file_index], relative_uri.data(), relative_uri.size()); file_index += relative_uri.size(); - // For deletes, read the delete condition to the output file. - if (stdx::string::ends_with( - uri.to_string(), constants::delete_file_suffix)) { + // Directly append the newline character to the buffer + data[file_index++] = '\n'; + + // For deletes, read the delete condition payload from its mapped physical + // location. + if (stdx::string::ends_with(relative_uri, constants::delete_file_suffix)) { memcpy(&data[file_index], &file_sizes[i], sizeof(storage_size_t)); file_index += sizeof(storage_size_t); + + const auto* loc = location_map[relative_uri]; throw_if_not_ok(resources.vfs().read_exactly( - uri, 0, &data[file_index], file_sizes[i])); + loc->uri(), loc->offset(), &data[file_index], file_sizes[i])); file_index += file_sizes[i]; } } From 338d064e60422d69b64434adf7017907cafe0435 Mon Sep 17 00:00:00 2001 From: Chris Lutsch <265904602+cl-earthscope@users.noreply.github.com> Date: Thu, 12 Mar 2026 17:19:16 -0600 Subject: [PATCH 2/2] Fix linter --- test/src/unit-capi-consolidation.cc | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/src/unit-capi-consolidation.cc b/test/src/unit-capi-consolidation.cc index 593b3f6f1ee..dccb870d09d 100644 --- a/test/src/unit-capi-consolidation.cc +++ b/test/src/unit-capi-consolidation.cc @@ -7602,9 +7602,8 @@ TEST_CASE_METHOD( const char* msg; tiledb_error_message(err, &msg); CHECK( - std::string( - "FragmentConsolidator: Consolidation read 0 cells, no " - "progress can be made") == msg); + std::string("FragmentConsolidator: Consolidation read 0 cells, no " + "progress can be made") == msg); remove_sparse_string_array(); }