diff --git a/BUILDING_FROM_SOURCE.md b/BUILDING_FROM_SOURCE.md index 97184271677..7c502a5101b 100644 --- a/BUILDING_FROM_SOURCE.md +++ b/BUILDING_FROM_SOURCE.md @@ -130,7 +130,7 @@ To **build** after configuration, run the following: To **install**, run the following: ```bash -> cmake --build . --target install-tiledb --config Release +> cmake --build . --target install --config Release ``` Other helpful build targets are as follows: diff --git a/test/src/unit-capi-consolidation.cc b/test/src/unit-capi-consolidation.cc index 9d7e466a969..dccb870d09d 100644 --- a/test/src/unit-capi-consolidation.cc +++ b/test/src/unit-capi-consolidation.cc @@ -8030,3 +8030,55 @@ TEST_CASE_METHOD( CHECK(rc == TILEDB_OK); } } + +TEST_CASE_METHOD( + ConsolidationFx, + "C API: Test consolidation of already consolidated delete commits", + "[capi][consolidation][commits][deletes]") { + // Create a sparse array (Query Conditions require sparse arrays) + remove_sparse_array(); + create_sparse_array(); + + tiledb_array_t* array; + REQUIRE( + tiledb_array_alloc(ctx_, sparse_array_uri_.c_str(), &array) == TILEDB_OK); + + // Write initial data (Creates a .wrt file) + write_sparse_full(); + + // Submit a Delete Query Condition (Creates a .del file) + REQUIRE(tiledb_array_open(ctx_, array, TILEDB_DELETE) == TILEDB_OK); + tiledb_query_t* query; + REQUIRE(tiledb_query_alloc(ctx_, array, TILEDB_DELETE, &query) == TILEDB_OK); + + tiledb_query_condition_t* qc; + REQUIRE(tiledb_query_condition_alloc(ctx_, &qc) == TILEDB_OK); + int32_t val = 3; + // "a1" is the int32 attribute created by create_sparse_array() + REQUIRE( + tiledb_query_condition_init( + ctx_, qc, "a1", &val, sizeof(int32_t), TILEDB_EQ) == TILEDB_OK); + REQUIRE(tiledb_query_set_condition(ctx_, query, qc) == TILEDB_OK); + REQUIRE(tiledb_query_submit(ctx_, query) == TILEDB_OK); + + REQUIRE(tiledb_array_close(ctx_, array) == TILEDB_OK); + tiledb_query_condition_free(&qc); + tiledb_query_free(&query); + + // First Consolidation: Packs the 
.wrt and .del into a .con file + consolidate_sparse("commits"); + + // Vacuum: Destroys the physical .del file from disk + vacuum_sparse("commits"); + + // Write more data: Creates a new .wrt file so the consolidator has work to + // do + write_sparse_unordered(); + + // Second Consolidation + consolidate_sparse("commits"); + + // Clean up + tiledb_array_free(&array); + remove_sparse_array(); +} diff --git a/tiledb/sm/array/array_directory.cc b/tiledb/sm/array/array_directory.cc index 8869f0928d2..84ba34380f4 100644 --- a/tiledb/sm/array/array_directory.cc +++ b/tiledb/sm/array/array_directory.cc @@ -725,6 +725,21 @@ ArrayDirectory::load_consolidated_commit_uris( // Load the commit URIs to ignore. This is done in serial for now as it can be // optimized by vacuuming. std::unordered_set<std::string> ignore_set; + + // Helper lambda to advance the stream past binary delete payloads. + // Assumes the caller has already verified this is a delete commit. + auto skip_delete_payload = [](std::stringstream& stream, + uint64_t* out_payload_offset = nullptr) { + storage_size_t size = 0; + stream.read(reinterpret_cast<char*>(&size), sizeof(storage_size_t)); + + if (out_payload_offset) { + *out_payload_offset = static_cast<uint64_t>(stream.tellg()); + } + + stream.seekg(size, std::ios_base::cur); + }; + for (auto& uri : commits_dir_uris) { if (uri.to_string().ends_with(constants::ignore_file_suffix)) { uint64_t size = resources_.get().vfs().file_size(uri); @@ -765,11 +780,8 @@ ArrayDirectory::load_consolidated_commit_uris( // If we have a delete, process the condition tile if (condition_marker.ends_with(constants::delete_file_suffix)) { - storage_size_t size = 0; - ss.read( - static_cast<char*>(static_cast<void*>(&size)), - sizeof(storage_size_t)); - auto pos = ss.tellg(); + uint64_t payload_pos = 0; + skip_delete_payload(ss, &payload_pos); // Get the start and end timestamp for this delete FragmentID fragment_id{URI(condition_marker)}; @@ -779,10 +791,8 @@ ArrayDirectory::load_consolidated_commit_uris( // times if 
(timestamps_overlap(delete_timestamp_range, false)) { delete_and_update_tiles_location_.emplace_back( - uri, condition_marker, pos); + uri, condition_marker, payload_pos); } - pos += size; - ss.seekg(pos); } } } @@ -809,6 +819,10 @@ all_in_set = false; break; } + // Skip the binary payload if it's a delete file + if (uri_str.ends_with(constants::delete_file_suffix)) { + skip_delete_payload(ss); + } } if (all_in_set && count == uris_set.size()) { @@ -920,6 +934,13 @@ void ArrayDirectory::load_commits_uris_to_consolidate( uri.to_string().ends_with(constants::delete_file_suffix)) { if (consolidated_uris_set.count(uri) == 0) { commit_uris_to_consolidate_.emplace_back(uri); + if (static_cast<std::string>(uri).ends_with( + constants::delete_file_suffix)) { + const auto base_uri_size = uri_.to_string().size(); + // 0 offset because it is a raw file + delete_and_update_tiles_location_.emplace_back( + uri, uri.to_string().substr(base_uri_size), 0); + } } } } diff --git a/tiledb/sm/array/test/unit_array_directory.cc b/tiledb/sm/array/test/unit_array_directory.cc index 940ace2fad5..3ef26fcd18c 100644 --- a/tiledb/sm/array/test/unit_array_directory.cc +++ b/tiledb/sm/array/test/unit_array_directory.cc @@ -137,3 +137,119 @@ TEST_CASE("Array directory: Vac file fix", "[array-directory][vac-file-fix]") { ArrayDirectory::get_full_vac_uri( "base/", "file://not/related/test.vac") == "base/test.vac"); } + +TEST_CASE( + "Array directory: Commits mode populates raw delete locations", + "[array-directory][commits-mode-del]") { + Config cfg; + auto logger = make_shared<Logger>(HERE(), "foo"); + ContextResources resources{cfg, logger, 1, 1, ""}; + auto& vfs = resources.vfs(); + + // Setup a test array with a commits directory + URI array_uri(arrays_dir + "/test_array_dir_commits_mode"); + if (vfs.is_dir(array_uri)) { + vfs.remove_dir(array_uri); + } + vfs.create_dir(array_uri); + URI commits_dir = array_uri.join_path("__commits"); + vfs.create_dir(commits_dir); + 
+ // Create a test raw .del file (write version 12 format) + URI del_uri = + commits_dir.join_path("__0_0_6e1138cbb833e380c227d402c1d2fc38_22.del"); + vfs.touch(del_uri); + + // Load ArrayDirectory explicitly in COMMITS mode + ArrayDirectory array_dir( + resources, array_uri, 0, 1, ArrayDirectoryMode::COMMITS); + + // Verify the location map was populated + auto locs = array_dir.delete_and_update_tiles_location(); + + REQUIRE(locs.size() == 1); + CHECK( + locs[0].condition_marker() == + "__commits/" + "__0_0_6e1138cbb833e380c227d402c1d2fc38_22.del"); + CHECK(locs[0].offset() == 0); + + vfs.remove_dir(array_uri); +} + +TEST_CASE( + "Array directory: Vacuum verification skips binary delete payloads", + "[array-directory][vacuum-binary-skip]") { + Config cfg; + auto logger = make_shared<Logger>(HERE(), "foo"); + ContextResources resources{cfg, logger, 1, 1, ""}; + auto& vfs = resources.vfs(); + + // Setup a test array + URI array_uri(arrays_dir + "/test_array_dir_vacuum_skip"); + if (vfs.is_dir(array_uri)) { + vfs.remove_dir(array_uri); + } + vfs.create_dir(array_uri); + URI commits_dir = array_uri.join_path("__commits"); + vfs.create_dir(commits_dir); + + // Mock physical files so they exist in the directory listing + vfs.touch( + commits_dir.join_path("__0_0_5c336fca5fe8057791d69cad6c15e981_22.wrt")); + vfs.touch( + commits_dir.join_path("__1_1_6e1138cbb833e380c227d402c1d2fc38_22.del")); + + // Create a .con file containing a binary payload + URI con_uri = + commits_dir.join_path("__0_1_51cfc7c8956f93998a1ba2a42e7ec0d5_22.con"); + std::string wrt_uri = + "__commits/" + "__0_0_5c336fca5fe8057791d69cad6c15e981_22.wrt\n"; + std::string del_uri = + "__commits/" + "__1_1_6e1138cbb833e380c227d402c1d2fc38_22.del\n"; + // 4 bytes of binary data + storage_size_t payload_size = 4; + std::string binary_payload = "BEEF"; + + // Pack the bytes exactly like the consolidator does + std::vector<uint8_t> data; + data.insert(data.end(), wrt_uri.begin(), wrt_uri.end()); + data.insert(data.end(), 
del_uri.begin(), del_uri.end()); + + auto* size_ptr = reinterpret_cast<uint8_t*>(&payload_size); + data.insert(data.end(), size_ptr, size_ptr + sizeof(storage_size_t)); + data.insert(data.end(), binary_payload.begin(), binary_payload.end()); + + vfs.write(con_uri, data.data(), data.size()); + auto status = vfs.close_file(con_uri); + CHECK(status.ok()); + + // Create a newer, encompassing .con file. + URI con_uri2 = + commits_dir.join_path("__0_2_017ebf26d62191760cc867d23ccd1458_22.con"); + std::vector<uint8_t> data2 = data; + std::string wrt_uri2 = + "__commits/" + "__2_2_06e30b6f9ed34137f9ac342cf6211c84_22.wrt\n"; + data2.insert(data2.end(), wrt_uri2.begin(), wrt_uri2.end()); + + vfs.write(con_uri2, data2.data(), data2.size()); + status = vfs.close_file(con_uri2); + CHECK(status.ok()); + + // Load the ArrayDirectory in COMMITS mode + ArrayDirectory array_dir( + resources, array_uri, 0, 2, ArrayDirectoryMode::COMMITS); + + // Verify the .con file passed verification and was marked for vacuuming + auto vacuum_uris = array_dir.consolidated_commits_uris_to_vacuum(); + + REQUIRE(vacuum_uris.size() == 1); + std::cout << vacuum_uris[0].to_string() << std::endl; + std::cout << con_uri.to_string() << std::endl; + CHECK(vacuum_uris[0] == con_uri); + + vfs.remove_dir(array_uri); +} diff --git a/tiledb/sm/consolidator/consolidator.cc b/tiledb/sm/consolidator/consolidator.cc index fcef32b4cab..22baf75ae28 100644 --- a/tiledb/sm/consolidator/consolidator.cc +++ b/tiledb/sm/consolidator/consolidator.cc @@ -255,6 +255,16 @@ void Consolidator::write_consolidated_commits_file( const ArrayDirectory& array_dir, const std::vector<URI>& commit_uris, ContextResources& resources) { + // Map relative URIs to their actual physical locations on disk (either raw + // .del files or embedded in .con files) + std::unordered_map< + std::string, + const ArrayDirectory::DeleteAndUpdateTileLocation*> + location_map; + for (const auto& loc : array_dir.delete_and_update_tiles_location()) { + 
location_map[loc.condition_marker()] = &loc; + } + // Compute the file name. auto name = storage_format::generate_consolidated_fragment_name( commit_uris.front(), commit_uris.back(), write_version); @@ -266,14 +276,36 @@ std::vector<storage_size_t> file_sizes(commit_uris.size()); for (uint64_t i = 0; i < commit_uris.size(); i++) { const auto& uri = commit_uris[i]; - total_size += uri.to_string().size() - base_uri_size + 1; - - // If the file is a delete, add the file size to the count and the size of - // the size variable. - if (uri.to_string().ends_with(constants::delete_file_suffix)) { - file_sizes[i] = resources.vfs().file_size(uri); - total_size += file_sizes[i]; - total_size += sizeof(storage_size_t); + std::string relative_uri = uri.to_string().substr(base_uri_size); + // +1 for the newline character + total_size += relative_uri.size() + 1; + + // If the file is a delete, find its physical payload size. + if (relative_uri.ends_with(constants::delete_file_suffix)) { + auto it = location_map.find(relative_uri); + if (it == location_map.end()) { + throw ConsolidatorException( + "Delete commit physical location not found in ArrayDirectory " + "ledger."); + } + + const auto* loc = it->second; + + if (loc->offset() == 0) { + // It's a raw .del file still sitting on disk + file_sizes[i] = resources.vfs().file_size(loc->uri()); + } else { + // It's already embedded in an older .con file. + // Read the size bytes stored immediately before the payload offset. + storage_size_t payload_size = 0; + throw_if_not_ok(resources.vfs().read_exactly( + loc->uri(), + loc->offset() - sizeof(storage_size_t), + &payload_size, + sizeof(storage_size_t))); + file_sizes[i] = payload_size; + } + total_size += file_sizes[i] + sizeof(storage_size_t); } } @@ -283,16 +315,22 @@ for (uint64_t i = 0; i < commit_uris.size(); i++) { // Add the uri. 
const auto& uri = commit_uris[i]; - std::string relative_uri = uri.to_string().substr(base_uri_size) + "\n"; + std::string relative_uri = uri.to_string().substr(base_uri_size); memcpy(&data[file_index], relative_uri.data(), relative_uri.size()); file_index += relative_uri.size(); - // For deletes, read the delete condition to the output file. - if (uri.to_string().ends_with(constants::delete_file_suffix)) { + // Directly append the newline character to the buffer + data[file_index++] = '\n'; + + // For deletes, read the delete condition payload from its mapped physical + // location. + if (relative_uri.ends_with(constants::delete_file_suffix)) { memcpy(&data[file_index], &file_sizes[i], sizeof(storage_size_t)); file_index += sizeof(storage_size_t); + + const auto* loc = location_map[relative_uri]; throw_if_not_ok(resources.vfs().read_exactly( - uri, 0, &data[file_index], file_sizes[i])); + loc->uri(), loc->offset(), &data[file_index], file_sizes[i])); file_index += file_sizes[i]; } }