Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion BUILDING_FROM_SOURCE.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ To **build** after configuration, run the following:
To **install**, run the following:

```bash
> cmake --build . --target install-tiledb --config Release
> cmake --build . --target install --config Release
```

Other helpful build targets are as follows:
Expand Down
52 changes: 52 additions & 0 deletions test/src/unit-capi-consolidation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8030,3 +8030,55 @@ TEST_CASE_METHOD(
CHECK(rc == TILEDB_OK);
}
}

TEST_CASE_METHOD(
    ConsolidationFx,
    "C API: Test consolidation of already consolidated delete commits",
    "[capi][consolidation][commits][deletes]") {
  // Start from a clean sparse array (delete query conditions require sparse
  // arrays).
  remove_sparse_array();
  create_sparse_array();

  tiledb_array_t* array = nullptr;
  REQUIRE(
      tiledb_array_alloc(ctx_, sparse_array_uri_.c_str(), &array) == TILEDB_OK);

  // The first write produces a .wrt commit file.
  write_sparse_full();

  // Issue a delete query condition, which produces a .del commit file.
  REQUIRE(tiledb_array_open(ctx_, array, TILEDB_DELETE) == TILEDB_OK);
  tiledb_query_t* delete_query = nullptr;
  REQUIRE(
      tiledb_query_alloc(ctx_, array, TILEDB_DELETE, &delete_query) ==
      TILEDB_OK);

  tiledb_query_condition_t* condition = nullptr;
  REQUIRE(tiledb_query_condition_alloc(ctx_, &condition) == TILEDB_OK);
  int32_t delete_value = 3;
  // Attribute "a1" is the int32 attribute defined by create_sparse_array().
  REQUIRE(
      tiledb_query_condition_init(
          ctx_, condition, "a1", &delete_value, sizeof(int32_t), TILEDB_EQ) ==
      TILEDB_OK);
  REQUIRE(
      tiledb_query_set_condition(ctx_, delete_query, condition) == TILEDB_OK);
  REQUIRE(tiledb_query_submit(ctx_, delete_query) == TILEDB_OK);

  REQUIRE(tiledb_array_close(ctx_, array) == TILEDB_OK);
  tiledb_query_condition_free(&condition);
  tiledb_query_free(&delete_query);

  // First commits consolidation packs the .wrt and .del commits into a .con
  // file.
  consolidate_sparse("commits");

  // Vacuuming removes the physical .del file from disk.
  vacuum_sparse("commits");

  // Another write produces a fresh .wrt so the consolidator has new work to
  // do.
  write_sparse_unordered();

  // The second commits consolidation must succeed even though the delete
  // commit now only exists inside the previous .con file.
  consolidate_sparse("commits");

  // Clean up.
  tiledb_array_free(&array);
  remove_sparse_array();
}
37 changes: 29 additions & 8 deletions tiledb/sm/array/array_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,21 @@ ArrayDirectory::load_consolidated_commit_uris(
// Load the commit URIs to ignore. This is done in serial for now as it can be
// optimized by vacuuming.
std::unordered_set<std::string> ignore_set;

// Helper lambda to advance the stream past binary delete payloads.
// Assumes the caller has already verified this is a delete commit.
auto skip_delete_payload = [](std::stringstream& stream,
uint64_t* out_payload_offset = nullptr) {
storage_size_t size = 0;
stream.read(reinterpret_cast<char*>(&size), sizeof(storage_size_t));

if (out_payload_offset) {
*out_payload_offset = static_cast<uint64_t>(stream.tellg());
}

stream.seekg(size, std::ios_base::cur);
};

for (auto& uri : commits_dir_uris) {
if (uri.to_string().ends_with(constants::ignore_file_suffix)) {
uint64_t size = resources_.get().vfs().file_size(uri);
Expand Down Expand Up @@ -765,11 +780,8 @@ ArrayDirectory::load_consolidated_commit_uris(

// If we have a delete, process the condition tile
if (condition_marker.ends_with(constants::delete_file_suffix)) {
storage_size_t size = 0;
ss.read(
static_cast<char*>(static_cast<void*>(&size)),
sizeof(storage_size_t));
auto pos = ss.tellg();
uint64_t payload_pos = 0;
skip_delete_payload(ss, &payload_pos);

// Get the start and end timestamp for this delete
FragmentID fragment_id{URI(condition_marker)};
Expand All @@ -779,10 +791,8 @@ ArrayDirectory::load_consolidated_commit_uris(
// times
if (timestamps_overlap(delete_timestamp_range, false)) {
delete_and_update_tiles_location_.emplace_back(
uri, condition_marker, pos);
uri, condition_marker, payload_pos);
}
pos += size;
ss.seekg(pos);
}
}
}
Expand All @@ -809,6 +819,10 @@ ArrayDirectory::load_consolidated_commit_uris(
all_in_set = false;
break;
}
// Skip the binary payload if it's a delete file
if (uri_str.ends_with(constants::delete_file_suffix)) {
skip_delete_payload(ss);
}
}

if (all_in_set && count == uris_set.size()) {
Expand Down Expand Up @@ -920,6 +934,13 @@ void ArrayDirectory::load_commits_uris_to_consolidate(
uri.to_string().ends_with(constants::delete_file_suffix)) {
if (consolidated_uris_set.count(uri) == 0) {
commit_uris_to_consolidate_.emplace_back(uri);
if (static_cast<std::string_view>(uri).ends_with(
constants::delete_file_suffix)) {
const auto base_uri_size = uri_.to_string().size();
// 0 offset because it is a raw file
delete_and_update_tiles_location_.emplace_back(
uri, uri.to_string().substr(base_uri_size), 0);
}
}
}
}
Expand Down
116 changes: 116 additions & 0 deletions tiledb/sm/array/test/unit_array_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,119 @@ TEST_CASE("Array directory: Vac file fix", "[array-directory][vac-file-fix]") {
ArrayDirectory::get_full_vac_uri(
"base/", "file://not/related/test.vac") == "base/test.vac");
}

TEST_CASE(
    "Array directory: Commits mode populates raw delete locations",
    "[array-directory][commits-mode-del]") {
  Config config;
  auto logger = make_shared<Logger>(HERE(), "foo");
  ContextResources resources{config, logger, 1, 1, ""};
  auto& vfs = resources.vfs();

  // Start from a fresh test array containing only a commits directory.
  URI array_uri(arrays_dir + "/test_array_dir_commits_mode");
  if (vfs.is_dir(array_uri)) {
    vfs.remove_dir(array_uri);
  }
  vfs.create_dir(array_uri);
  URI commits_dir = array_uri.join_path("__commits");
  vfs.create_dir(commits_dir);

  // Drop an empty raw .del commit file into the commits directory; only its
  // presence in the listing matters for this test.
  URI del_uri =
      commits_dir.join_path("__0_0_6e1138cbb833e380c227d402c1d2fc38_22.del");
  vfs.touch(del_uri);

  // Load the ArrayDirectory explicitly in COMMITS mode.
  ArrayDirectory array_dir(
      resources, array_uri, 0, 1, ArrayDirectoryMode::COMMITS);

  // The raw .del file must be registered in the location map with a zero
  // payload offset (it is a stand-alone file, not embedded in a .con file).
  auto locations = array_dir.delete_and_update_tiles_location();

  REQUIRE(locations.size() == 1);
  CHECK(
      locations[0].condition_marker() ==
      "__commits/"
      "__0_0_6e1138cbb833e380c227d402c1d2fc38_22.del");
  CHECK(locations[0].offset() == 0);

  vfs.remove_dir(array_uri);
}

TEST_CASE(
    "Array directory: Vacuum verification skips binary delete payloads",
    "[array-directory][vacuum-binary-skip]") {
  Config cfg;
  auto logger = make_shared<Logger>(HERE(), "foo");
  ContextResources resources{cfg, logger, 1, 1, ""};
  auto& vfs = resources.vfs();

  // Setup a fresh test array with a commits directory.
  URI array_uri(arrays_dir + "/test_array_dir_vacuum_skip");
  if (vfs.is_dir(array_uri)) {
    vfs.remove_dir(array_uri);
  }
  vfs.create_dir(array_uri);
  URI commits_dir = array_uri.join_path("__commits");
  vfs.create_dir(commits_dir);

  // Mock physical commit files so they exist in the directory listing.
  vfs.touch(
      commits_dir.join_path("__0_0_5c336fca5fe8057791d69cad6c15e981_22.wrt"));
  vfs.touch(
      commits_dir.join_path("__1_1_6e1138cbb833e380c227d402c1d2fc38_22.del"));

  // Create a .con file containing a binary delete payload after the listed
  // URIs, matching the layout the consolidator writes.
  URI con_uri =
      commits_dir.join_path("__0_1_51cfc7c8956f93998a1ba2a42e7ec0d5_22.con");
  std::string wrt_uri =
      "__commits/"
      "__0_0_5c336fca5fe8057791d69cad6c15e981_22.wrt\n";
  std::string del_uri =
      "__commits/"
      "__1_1_6e1138cbb833e380c227d402c1d2fc38_22.del\n";
  // 4 bytes of binary data.
  storage_size_t payload_size = 4;
  std::string binary_payload = "BEEF";

  // Pack the bytes exactly like the consolidator does: URIs, then the
  // payload size, then the raw payload bytes.
  std::vector<uint8_t> data;
  data.insert(data.end(), wrt_uri.begin(), wrt_uri.end());
  data.insert(data.end(), del_uri.begin(), del_uri.end());

  auto* size_ptr = reinterpret_cast<uint8_t*>(&payload_size);
  data.insert(data.end(), size_ptr, size_ptr + sizeof(storage_size_t));
  data.insert(data.end(), binary_payload.begin(), binary_payload.end());

  vfs.write(con_uri, data.data(), data.size());
  auto status = vfs.close_file(con_uri);
  CHECK(status.ok());

  // Create a newer .con file that encompasses the first one's contents plus
  // one more .wrt, so the first .con becomes eligible for vacuuming.
  URI con_uri2 =
      commits_dir.join_path("__0_2_017ebf26d62191760cc867d23ccd1458_22.con");
  std::vector<uint8_t> data2 = data;
  std::string wrt_uri2 =
      "__commits/"
      "__2_2_06e30b6f9ed34137f9ac342cf6211c84_22.wrt\n";
  data2.insert(data2.end(), wrt_uri2.begin(), wrt_uri2.end());

  vfs.write(con_uri2, data2.data(), data2.size());
  status = vfs.close_file(con_uri2);
  CHECK(status.ok());

  // Load the ArrayDirectory in COMMITS mode. Verification must skip the
  // binary payload instead of misreading it as URI text.
  ArrayDirectory array_dir(
      resources, array_uri, 0, 2, ArrayDirectoryMode::COMMITS);

  // Verify the older .con file passed verification and was marked for
  // vacuuming.
  auto vacuum_uris = array_dir.consolidated_commits_uris_to_vacuum();

  REQUIRE(vacuum_uris.size() == 1);
  CHECK(vacuum_uris[0] == con_uri);

  vfs.remove_dir(array_uri);
}
62 changes: 50 additions & 12 deletions tiledb/sm/consolidator/consolidator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,16 @@ void Consolidator::write_consolidated_commits_file(
const ArrayDirectory& array_dir,
const std::vector<URI>& commit_uris,
ContextResources& resources) {
// Map relative URIs to their actual physical locations on disk (either raw
// .del files or embedded in .con files)
std::unordered_map<
std::string,
const ArrayDirectory::DeleteAndUpdateTileLocation*>
location_map;
for (const auto& loc : array_dir.delete_and_update_tiles_location()) {
location_map[loc.condition_marker()] = &loc;
}

// Compute the file name.
auto name = storage_format::generate_consolidated_fragment_name(
commit_uris.front(), commit_uris.back(), write_version);
Expand All @@ -266,14 +276,36 @@ void Consolidator::write_consolidated_commits_file(
std::vector<storage_size_t> file_sizes(commit_uris.size());
for (uint64_t i = 0; i < commit_uris.size(); i++) {
const auto& uri = commit_uris[i];
total_size += uri.to_string().size() - base_uri_size + 1;

// If the file is a delete, add the file size to the count and the size of
// the size variable.
if (uri.to_string().ends_with(constants::delete_file_suffix)) {
file_sizes[i] = resources.vfs().file_size(uri);
total_size += file_sizes[i];
total_size += sizeof(storage_size_t);
std::string relative_uri = uri.to_string().substr(base_uri_size);
// +1 for the newline character
total_size += relative_uri.size() + 1;

// If the file is a delete, find its physical payload size.
if (relative_uri.ends_with(constants::delete_file_suffix)) {
auto it = location_map.find(relative_uri);
if (it == location_map.end()) {
throw ConsolidatorException(
"Delete commit physical location not found in ArrayDirectory "
"ledger.");
}

const auto* loc = it->second;

if (loc->offset() == 0) {
// It's a raw .del file still sitting on disk
file_sizes[i] = resources.vfs().file_size(loc->uri());
} else {
// It's already embedded in an older .con file.
// Read the size bytes stored immediately before the payload offset.
storage_size_t payload_size = 0;
throw_if_not_ok(resources.vfs().read_exactly(
loc->uri(),
loc->offset() - sizeof(storage_size_t),
&payload_size,
sizeof(storage_size_t)));
file_sizes[i] = payload_size;
}
total_size += file_sizes[i] + sizeof(storage_size_t);
}
}

Expand All @@ -283,16 +315,22 @@ void Consolidator::write_consolidated_commits_file(
for (uint64_t i = 0; i < commit_uris.size(); i++) {
// Add the uri.
const auto& uri = commit_uris[i];
std::string relative_uri = uri.to_string().substr(base_uri_size) + "\n";
std::string relative_uri = uri.to_string().substr(base_uri_size);
memcpy(&data[file_index], relative_uri.data(), relative_uri.size());
file_index += relative_uri.size();

// For deletes, read the delete condition to the output file.
if (uri.to_string().ends_with(constants::delete_file_suffix)) {
// Directly append the newline character to the buffer
data[file_index++] = '\n';

// For deletes, read the delete condition payload from its mapped physical
// location.
if (relative_uri.ends_with(constants::delete_file_suffix)) {
memcpy(&data[file_index], &file_sizes[i], sizeof(storage_size_t));
file_index += sizeof(storage_size_t);

const auto* loc = location_map[relative_uri];
throw_if_not_ok(resources.vfs().read_exactly(
uri, 0, &data[file_index], file_sizes[i]));
loc->uri(), loc->offset(), &data[file_index], file_sizes[i]));
file_index += file_sizes[i];
}
}
Expand Down