Commit abe9740

Fix consolidator crash on vacuumed delete commits.

The previous implementation of the commits consolidator assumed that `.del` files always exist on disk as standalone files. If a `.del` file was consolidated and subsequently vacuumed, the consolidator threw a fatal non-retrievable error when trying to read its file size and payload during subsequent consolidation runs.

Additionally, the `ArrayDirectory` string-based verification for vacuuming superseded `.con` files failed when it encountered embedded binary `.del` payloads, preventing those older `.con` files from ever being vacuumed.

This PR resolves these issues by making the engine fully aware of physical payload locations:

* `ArrayDirectory` now tracks the exact physical URI and byte offset for `.del` payloads (whether raw on disk or embedded in older `.con` files) inside `delete_and_update_tiles_location_`.
* Added a `skip_delete_payload` helper in `ArrayDirectory` to properly advance the stream past binary data during `.con` file string verification.
* `Consolidator::write_consolidated_commits_file` now uses the `location_map` to extract `.del` payloads from their actual physical locations instead of only querying the VFS using the logical URI.
* Fixes the engine crash when consolidating an array with previously vacuumed delete commits.
* Allows superseded `.con` files containing delete payloads to be successfully verified and vacuumed.

The most significant changes are in `tiledb/sm/array/array_directory.cc` (parsing and offset tracking) and `tiledb/sm/consolidator/consolidator.cc` (location-aware reading). Explicitly added a check to ensure `.wrt` files are not added to the payload location map, as they are zero-byte markers.

Added two unit tests (`[array-directory][commits-mode-del]` and `[array-directory][vacuum-binary-skip]`) to verify payload offset mapping and `.con` string verification. Added a C API integration test (`[capi][consolidation][commits][deletes]`) that executes the exact Consolidate -> Vacuum -> Consolidate sequence to guarantee the engine no longer crashes on missing `.del` files.

---
TYPE: BUG
DESC: Fix consolidator crash when processing vacuumed delete commits and allow vacuuming of .con files containing delete payloads.

1 parent 7c7be31 commit abe9740

5 files changed

Lines changed: 251 additions & 25 deletions


BUILDING_FROM_SOURCE.md

Lines changed: 1 addition & 1 deletion
@@ -130,7 +130,7 @@ To **build** after configuration, run the following:
 To **install**, run the following:

 ```bash
-> cmake --build . --target install-tiledb --config Release
+> cmake --build . --target install --config Release
 ```

 Other helpful build targets are as follows:

test/src/unit-capi-consolidation.cc

Lines changed: 55 additions & 2 deletions
@@ -7604,8 +7604,9 @@ TEST_CASE_METHOD(
   const char* msg;
   tiledb_error_message(err, &msg);
   CHECK(
-      std::string("FragmentConsolidator: Consolidation read 0 cells, no "
-                  "progress can be made") == msg);
+      std::string(
+          "FragmentConsolidator: Consolidation read 0 cells, no "
+          "progress can be made") == msg);

   remove_sparse_string_array();
 }
@@ -8032,3 +8033,55 @@ TEST_CASE_METHOD(
     CHECK(rc == TILEDB_OK);
   }
 }
+
+TEST_CASE_METHOD(
+    ConsolidationFx,
+    "C API: Test consolidation of already consolidated delete commits",
+    "[capi][consolidation][commits][deletes]") {
+  // Create a sparse array (Query Conditions require sparse arrays)
+  remove_sparse_array();
+  create_sparse_array();
+
+  tiledb_array_t* array;
+  REQUIRE(
+      tiledb_array_alloc(ctx_, sparse_array_uri_.c_str(), &array) == TILEDB_OK);
+
+  // Write initial data (Creates a .wrt file)
+  write_sparse_full();
+
+  // Submit a Delete Query Condition (Creates a .del file)
+  REQUIRE(tiledb_array_open(ctx_, array, TILEDB_DELETE) == TILEDB_OK);
+  tiledb_query_t* query;
+  REQUIRE(tiledb_query_alloc(ctx_, array, TILEDB_DELETE, &query) == TILEDB_OK);
+
+  tiledb_query_condition_t* qc;
+  REQUIRE(tiledb_query_condition_alloc(ctx_, &qc) == TILEDB_OK);
+  int32_t val = 3;
+  // "a1" is the int32 attribute created by create_sparse_array()
+  REQUIRE(
+      tiledb_query_condition_init(
+          ctx_, qc, "a1", &val, sizeof(int32_t), TILEDB_EQ) == TILEDB_OK);
+  REQUIRE(tiledb_query_set_condition(ctx_, query, qc) == TILEDB_OK);
+  REQUIRE(tiledb_query_submit(ctx_, query) == TILEDB_OK);
+
+  REQUIRE(tiledb_array_close(ctx_, array) == TILEDB_OK);
+  tiledb_query_condition_free(&qc);
+  tiledb_query_free(&query);
+
+  // First Consolidation: Packs the .wrt and .del into a .con file
+  consolidate_sparse("commits");
+
+  // Vacuum: Destroys the physical .del file from disk
+  vacuum_sparse("commits");
+
+  // Write more data: Creates a new .wrt file so the consolidator has work to
+  // do
+  write_sparse_unordered();
+
+  // Second Consolidation
+  consolidate_sparse("commits");
+
+  // Clean up
+  tiledb_array_free(&array);
+  remove_sparse_array();
+}

tiledb/sm/array/array_directory.cc

Lines changed: 29 additions & 8 deletions
@@ -721,6 +721,21 @@ ArrayDirectory::load_consolidated_commit_uris(
   // Load the commit URIs to ignore. This is done in serial for now as it can be
   // optimized by vacuuming.
   std::unordered_set<std::string> ignore_set;
+
+  // Helper lambda to advance the stream past binary delete payloads.
+  // Assumes the caller has already verified this is a delete commit.
+  auto skip_delete_payload = [](std::stringstream& stream,
+                                uint64_t* out_payload_offset = nullptr) {
+    storage_size_t size = 0;
+    stream.read(reinterpret_cast<char*>(&size), sizeof(storage_size_t));
+
+    if (out_payload_offset) {
+      *out_payload_offset = static_cast<uint64_t>(stream.tellg());
+    }
+
+    stream.seekg(size, std::ios_base::cur);
+  };
+
   for (auto& uri : commits_dir_uris) {
     if (stdx::string::ends_with(
             uri.to_string(), constants::ignore_file_suffix)) {
@@ -764,11 +779,8 @@ ArrayDirectory::load_consolidated_commit_uris(
         // If we have a delete, process the condition tile
         if (stdx::string::ends_with(
                 condition_marker, constants::delete_file_suffix)) {
-          storage_size_t size = 0;
-          ss.read(
-              static_cast<char*>(static_cast<void*>(&size)),
-              sizeof(storage_size_t));
-          auto pos = ss.tellg();
+          uint64_t payload_pos = 0;
+          skip_delete_payload(ss, &payload_pos);

           // Get the start and end timestamp for this delete
           FragmentID fragment_id{URI(condition_marker)};
@@ -778,10 +790,8 @@ ArrayDirectory::load_consolidated_commit_uris(
           // times
           if (timestamps_overlap(delete_timestamp_range, false)) {
             delete_and_update_tiles_location_.emplace_back(
-                uri, condition_marker, pos);
+                uri, condition_marker, payload_pos);
           }
-          pos += size;
-          ss.seekg(pos);
         }
       }
     }
@@ -808,6 +818,10 @@ ArrayDirectory::load_consolidated_commit_uris(
           all_in_set = false;
           break;
         }
+        // Skip the binary payload if it's a delete file
+        if (stdx::string::ends_with(uri_str, constants::delete_file_suffix)) {
+          skip_delete_payload(ss);
+        }
       }

       if (all_in_set && count == uris_set.size()) {
@@ -922,6 +936,13 @@ void ArrayDirectory::load_commits_uris_to_consolidate(
             uri.to_string(), constants::delete_file_suffix)) {
       if (consolidated_uris_set.count(uri) == 0) {
         commit_uris_to_consolidate_.emplace_back(uri);
+        if (stdx::string::ends_with(
+                uri.to_string(), constants::delete_file_suffix)) {
+          const auto base_uri_size = uri_.to_string().size();
+          // 0 offset because it is a raw file
+          delete_and_update_tiles_location_.emplace_back(
+              uri, uri.to_string().substr(base_uri_size), 0);
+        }
       }
     }
   }
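
The stream-skipping idea behind `skip_delete_payload` can be sketched outside the engine. The block below is a minimal illustration, not TileDB code: `CommitEntry`, `ends_with`, and `parse_consolidated_commits` are hypothetical names, `storage_size_t` is assumed to be a 64-bit unsigned integer written in host byte order, and the `.con` layout (newline-terminated relative URIs, with each `.del` entry followed by a size field and that many payload bytes) is inferred from the hunks above.

```cpp
#include <cassert>
#include <cstdint>
#include <sstream>
#include <string>
#include <vector>

// Assumption: stand-in for TileDB's storage_size_t.
using storage_size_t = std::uint64_t;

// Hypothetical record describing one line of a consolidated commits file.
struct CommitEntry {
  std::string marker;            // relative URI without the trailing newline
  std::uint64_t payload_offset;  // byte offset of the payload, 0 if none
  storage_size_t payload_size;   // payload length in bytes, 0 if none
};

inline bool ends_with(const std::string& s, const std::string& suffix) {
  return s.size() >= suffix.size() &&
         s.compare(s.size() - suffix.size(), suffix.size(), suffix) == 0;
}

// Walks the buffer line by line. After a ".del" marker it reads the size
// field, records where the payload starts, and seeks past the payload so
// that binary bytes are never interpreted as URI text.
std::vector<CommitEntry> parse_consolidated_commits(const std::string& bytes) {
  std::vector<CommitEntry> entries;
  std::istringstream ss(bytes);
  std::string marker;
  while (std::getline(ss, marker)) {
    CommitEntry entry{marker, 0, 0};
    if (ends_with(marker, ".del")) {
      ss.read(
          reinterpret_cast<char*>(&entry.payload_size),
          sizeof(storage_size_t));
      if (!ss) {
        break;  // truncated size field: stop instead of recording garbage
      }
      entry.payload_offset = static_cast<std::uint64_t>(ss.tellg());
      ss.seekg(
          static_cast<std::streamoff>(entry.payload_size),
          std::ios_base::cur);
    }
    entries.push_back(entry);
  }
  return entries;
}
```

Because the payload is skipped with a relative seek, a payload that happens to contain a newline byte does not split into a bogus marker line, which is the failure mode the string-based verification had before this change.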

tiledb/sm/array/test/unit_array_directory.cc

Lines changed: 116 additions & 0 deletions
@@ -137,3 +137,119 @@ TEST_CASE("Array directory: Vac file fix", "[array-directory][vac-file-fix]") {
       ArrayDirectory::get_full_vac_uri(
           "base/", "file://not/related/test.vac") == "base/test.vac");
 }
+
+TEST_CASE(
+    "Array directory: Commits mode populates raw delete locations",
+    "[array-directory][commits-mode-del]") {
+  Config cfg;
+  auto logger = make_shared<Logger>(HERE(), "foo");
+  ContextResources resources{cfg, logger, 1, 1, ""};
+  auto& vfs = resources.vfs();
+
+  // Setup a test array with a commits directory
+  URI array_uri(arrays_dir + "/test_array_dir_commits_mode");
+  if (vfs.is_dir(array_uri)) {
+    vfs.remove_dir(array_uri);
+  }
+  vfs.create_dir(array_uri);
+  URI commits_dir = array_uri.join_path("__commits");
+  vfs.create_dir(commits_dir);
+
+  // Create a test raw .del file (write version 12 format)
+  URI del_uri =
+      commits_dir.join_path("__0_0_6e1138cbb833e380c227d402c1d2fc38_22.del");
+  vfs.touch(del_uri);
+
+  // Load ArrayDirectory explicitly in COMMITS mode
+  ArrayDirectory array_dir(
+      resources, array_uri, 0, 1, ArrayDirectoryMode::COMMITS);
+
+  // Verify the location map was populated
+  auto locs = array_dir.delete_and_update_tiles_location();
+
+  REQUIRE(locs.size() == 1);
+  CHECK(
+      locs[0].condition_marker() ==
+      "__commits/"
+      "__0_0_6e1138cbb833e380c227d402c1d2fc38_22.del");
+  CHECK(locs[0].offset() == 0);
+
+  vfs.remove_dir(array_uri);
+}
+
+TEST_CASE(
+    "Array directory: Vacuum verification skips binary delete payloads",
+    "[array-directory][vacuum-binary-skip]") {
+  Config cfg;
+  auto logger = make_shared<Logger>(HERE(), "foo");
+  ContextResources resources{cfg, logger, 1, 1, ""};
+  auto& vfs = resources.vfs();
+
+  // Setup a test array
+  URI array_uri(arrays_dir + "/test_array_dir_vacuum_skip");
+  if (vfs.is_dir(array_uri)) {
+    vfs.remove_dir(array_uri);
+  }
+  vfs.create_dir(array_uri);
+  URI commits_dir = array_uri.join_path("__commits");
+  vfs.create_dir(commits_dir);
+
+  // Mock physical files so they exist in the directory listing
+  vfs.touch(
+      commits_dir.join_path("__0_0_5c336fca5fe8057791d69cad6c15e981_22.wrt"));
+  vfs.touch(
+      commits_dir.join_path("__1_1_6e1138cbb833e380c227d402c1d2fc38_22.del"));
+
+  // Create a .con file containing a binary payload
+  URI con_uri =
+      commits_dir.join_path("__0_1_51cfc7c8956f93998a1ba2a42e7ec0d5_22.con");
+  std::string wrt_uri =
+      "__commits/"
+      "__0_0_5c336fca5fe8057791d69cad6c15e981_22.wrt\n";
+  std::string del_uri =
+      "__commits/"
+      "__1_1_6e1138cbb833e380c227d402c1d2fc38_22.del\n";
+  // 4 bytes of binary data
+  storage_size_t payload_size = 4;
+  std::string binary_payload = "BEEF";
+
+  // Pack the bytes exactly like the consolidator does
+  std::vector<uint8_t> data;
+  data.insert(data.end(), wrt_uri.begin(), wrt_uri.end());
+  data.insert(data.end(), del_uri.begin(), del_uri.end());
+
+  auto* size_ptr = reinterpret_cast<uint8_t*>(&payload_size);
+  data.insert(data.end(), size_ptr, size_ptr + sizeof(storage_size_t));
+  data.insert(data.end(), binary_payload.begin(), binary_payload.end());
+
+  vfs.write(con_uri, data.data(), data.size());
+  auto status = vfs.close_file(con_uri);
+  CHECK(status.ok());
+
+  // Create a newer, encompassing .con file.
+  URI con_uri2 =
+      commits_dir.join_path("__0_2_017ebf26d62191760cc867d23ccd1458_22.con");
+  std::vector<uint8_t> data2 = data;
+  std::string wrt_uri2 =
+      "__commits/"
+      "__2_2_06e30b6f9ed34137f9ac342cf6211c84_22.wrt\n";
+  data2.insert(data2.end(), wrt_uri2.begin(), wrt_uri2.end());
+
+  vfs.write(con_uri2, data2.data(), data2.size());
+  status = vfs.close_file(con_uri2);
+  CHECK(status.ok());
+
+  // Load the ArrayDirectory in COMMITS mode
+  ArrayDirectory array_dir(
+      resources, array_uri, 0, 2, ArrayDirectoryMode::COMMITS);
+
+  // Verify the .con file passed verification and was marked for vacuuming
+  auto vacuum_uris = array_dir.consolidated_commits_uris_to_vacuum();
+
+  REQUIRE(vacuum_uris.size() == 1);
+  std::cout << vacuum_uris[0].to_string() << std::endl;
+  std::cout << con_uri.to_string() << std::endl;
+  CHECK(vacuum_uris[0] == con_uri);
+
+  vfs.remove_dir(array_uri);
+}
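
The byte packing that the vacuum test performs by hand could be factored into small helpers. The following is only a sketch under stated assumptions: `append_marker` and `append_delete` are hypothetical names that do not exist in TileDB, and `storage_size_t` is assumed to be a 64-bit unsigned integer stored in host byte order, mirroring what the test writes.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Assumption: stand-in for TileDB's storage_size_t.
using storage_size_t = std::uint64_t;

// Appends a relative commit URI followed by the newline terminator.
void append_marker(std::vector<std::uint8_t>& out, const std::string& marker) {
  out.insert(out.end(), marker.begin(), marker.end());
  out.push_back('\n');
}

// Appends a delete entry: marker line, size field, then the raw payload.
// Returns the offset at which the payload starts inside `out`, which is the
// value a location ledger would record for an embedded payload.
std::uint64_t append_delete(
    std::vector<std::uint8_t>& out,
    const std::string& marker,
    const std::string& payload) {
  append_marker(out, marker);

  // Write the size field immediately before the payload.
  const storage_size_t size = payload.size();
  const std::size_t size_at = out.size();
  out.resize(size_at + sizeof(storage_size_t));
  std::memcpy(out.data() + size_at, &size, sizeof(storage_size_t));

  const std::uint64_t payload_offset = out.size();
  out.insert(out.end(), payload.begin(), payload.end());
  return payload_offset;
}
```

Returning the payload offset from the packing helper makes it straightforward for a test to assert that the reader reports the same offset the writer produced.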

tiledb/sm/consolidator/consolidator.cc

Lines changed: 50 additions & 14 deletions
@@ -255,6 +255,16 @@ void Consolidator::write_consolidated_commits_file(
     const ArrayDirectory& array_dir,
     const std::vector<URI>& commit_uris,
     ContextResources& resources) {
+  // Map relative URIs to their actual physical locations on disk (either raw
+  // .del files or embedded in .con files)
+  std::unordered_map<
+      std::string,
+      const ArrayDirectory::DeleteAndUpdateTileLocation*>
+      location_map;
+  for (const auto& loc : array_dir.delete_and_update_tiles_location()) {
+    location_map[loc.condition_marker()] = &loc;
+  }
+
   // Compute the file name.
   auto name = storage_format::generate_consolidated_fragment_name(
       commit_uris.front(), commit_uris.back(), write_version);
@@ -266,15 +276,36 @@ void Consolidator::write_consolidated_commits_file(
   std::vector<storage_size_t> file_sizes(commit_uris.size());
   for (uint64_t i = 0; i < commit_uris.size(); i++) {
     const auto& uri = commit_uris[i];
-    total_size += uri.to_string().size() - base_uri_size + 1;
-
-    // If the file is a delete, add the file size to the count and the size of
-    // the size variable.
-    if (stdx::string::ends_with(
-            uri.to_string(), constants::delete_file_suffix)) {
-      file_sizes[i] = resources.vfs().file_size(uri);
-      total_size += file_sizes[i];
-      total_size += sizeof(storage_size_t);
+    std::string relative_uri = uri.to_string().substr(base_uri_size);
+    // +1 for the newline character
+    total_size += relative_uri.size() + 1;
+
+    // If the file is a delete, find its physical payload size.
+    if (stdx::string::ends_with(relative_uri, constants::delete_file_suffix)) {
+      auto it = location_map.find(relative_uri);
+      if (it == location_map.end()) {
+        throw ConsolidatorException(
+            "Delete commit physical location not found in ArrayDirectory "
+            "ledger.");
+      }
+
+      const auto* loc = it->second;
+
+      if (loc->offset() == 0) {
+        // It's a raw .del file still sitting on disk
+        file_sizes[i] = resources.vfs().file_size(loc->uri());
+      } else {
+        // It's already embedded in an older .con file.
+        // Read the size bytes stored immediately before the payload offset.
+        storage_size_t payload_size = 0;
+        throw_if_not_ok(resources.vfs().read_exactly(
+            loc->uri(),
+            loc->offset() - sizeof(storage_size_t),
+            &payload_size,
+            sizeof(storage_size_t)));
+        file_sizes[i] = payload_size;
+      }
+      total_size += file_sizes[i] + sizeof(storage_size_t);
     }
   }


@@ -284,17 +315,22 @@ void Consolidator::write_consolidated_commits_file(
   for (uint64_t i = 0; i < commit_uris.size(); i++) {
     // Add the uri.
     const auto& uri = commit_uris[i];
-    std::string relative_uri = uri.to_string().substr(base_uri_size) + "\n";
+    std::string relative_uri = uri.to_string().substr(base_uri_size);
     memcpy(&data[file_index], relative_uri.data(), relative_uri.size());
     file_index += relative_uri.size();

-    // For deletes, read the delete condition to the output file.
-    if (stdx::string::ends_with(
-            uri.to_string(), constants::delete_file_suffix)) {
+    // Directly append the newline character to the buffer
+    data[file_index++] = '\n';
+
+    // For deletes, read the delete condition payload from its mapped physical
+    // location.
+    if (stdx::string::ends_with(relative_uri, constants::delete_file_suffix)) {
       memcpy(&data[file_index], &file_sizes[i], sizeof(storage_size_t));
       file_index += sizeof(storage_size_t);
+
+      const auto* loc = location_map[relative_uri];
       throw_if_not_ok(resources.vfs().read_exactly(
-          uri, 0, &data[file_index], file_sizes[i]));
+          loc->uri(), loc->offset(), &data[file_index], file_sizes[i]));
       file_index += file_sizes[i];
     }
   }
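
The location-aware read in these hunks treats offset 0 as "raw `.del` file" and any other offset as "payload embedded in a `.con` file, with its size stored just before it". A self-contained sketch of that dispatch follows. Everything in it is illustrative: `FakeFs` is a hypothetical in-memory stand-in for the VFS, `PayloadLocation` and `read_delete_payload` are invented names, and `storage_size_t` is assumed to be a 64-bit unsigned integer in host byte order.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Assumption: stand-in for TileDB's storage_size_t.
using storage_size_t = std::uint64_t;

// Hypothetical in-memory file system: path -> file bytes.
using FakeFs = std::map<std::string, std::vector<std::uint8_t>>;

// Hypothetical ledger entry: where a delete payload physically lives.
struct PayloadLocation {
  std::string file;      // raw .del file or the .con file embedding it
  std::uint64_t offset;  // 0 for a raw file, payload start otherwise
};

std::vector<std::uint8_t> read_delete_payload(
    const FakeFs& fs, const PayloadLocation& loc) {
  const auto it = fs.find(loc.file);
  if (it == fs.end()) {
    throw std::runtime_error("physical file not found: " + loc.file);
  }
  const std::vector<std::uint8_t>& bytes = it->second;

  std::uint64_t size = 0;
  if (loc.offset == 0) {
    // Raw file: the whole file is the payload.
    size = bytes.size();
  } else {
    // Embedded: the size field sits immediately before the payload.
    if (loc.offset < sizeof(storage_size_t) || loc.offset > bytes.size()) {
      throw std::out_of_range("payload offset outside of file");
    }
    storage_size_t stored = 0;
    std::memcpy(
        &stored,
        bytes.data() + loc.offset - sizeof(storage_size_t),
        sizeof(storage_size_t));
    size = stored;
  }

  if (size > bytes.size() - loc.offset) {
    throw std::out_of_range("payload runs past end of file");
  }
  const auto first = bytes.begin() + static_cast<std::ptrdiff_t>(loc.offset);
  return std::vector<std::uint8_t>(
      first, first + static_cast<std::ptrdiff_t>(size));
}
```

Using 0 as the raw-file sentinel is unambiguous in this layout because an embedded payload offset is recorded after the size field has been read, so it is always at least `sizeof(storage_size_t)`.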
