From 467861dc0a41c9ecf5bc100946492ea9ed81ef4c Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Thu, 15 May 2025 11:47:04 -0400 Subject: [PATCH 01/45] Add storage functions to get job with failed tasks and partially reset job --- src/spider/storage/MetadataStorage.hpp | 22 ++++ src/spider/storage/mysql/MySqlStorage.cpp | 133 ++++++++++++++++++++++ src/spider/storage/mysql/MySqlStorage.hpp | 7 ++ 3 files changed, 162 insertions(+) diff --git a/src/spider/storage/MetadataStorage.hpp b/src/spider/storage/MetadataStorage.hpp index 91f733b7..fd85fd93 100644 --- a/src/spider/storage/MetadataStorage.hpp +++ b/src/spider/storage/MetadataStorage.hpp @@ -74,6 +74,28 @@ class MetadataStorage { = 0; virtual auto remove_job(StorageConnection& conn, boost::uuids::uuid id) -> StorageErr = 0; virtual auto reset_job(StorageConnection& conn, boost::uuids::uuid id) -> StorageErr = 0; + /** + * Gets all the jobs that contains a failed task. + * @param conn The storage connection. + * @param job_ids Returns the job ids of the jobs that contains a failed task. + * @return The storage error code. + */ + virtual auto get_failed_jobs(StorageConnection& conn, std::vector* job_ids) + -> StorageErr + = 0; + /** + * Resets tasks in a job to a previous runnable states. + * @param conn The storage connection. + * @param ready_tasks The tasks to be set to ready. + * @param pending_tasks The tasks to be set to pending. + * @return The storage error code. + */ + virtual auto reset_tasks( + StorageConnection& conn, + std::vector const& ready_tasks, + std::vector const& pending_tasks + ) -> StorageErr + = 0; virtual auto add_child(StorageConnection& conn, boost::uuids::uuid parent_id, Task const& child) -> StorageErr = 0; diff --git a/src/spider/storage/mysql/MySqlStorage.cpp b/src/spider/storage/mysql/MySqlStorage.cpp index 7678f69b..51e5d6ab 100644 --- a/src/spider/storage/mysql/MySqlStorage.cpp +++ b/src/spider/storage/mysql/MySqlStorage.cpp @@ -1143,6 +1143,139 @@ auto MySqlMetadataStorage::reset_job(StorageConnection& conn, boost::uuids::uuid return StorageErr{}; } +auto MySqlMetadataStorage::get_failed_jobs( + StorageConnection& conn, + std::vector* job_ids +) -> StorageErr { + try { + std::unique_ptr statement{ + static_cast(conn)->createStatement() + }; + std::unique_ptr result{statement->executeQuery( + "SELECT `job_id` FROM `tasks` WHERE `state` = 'failed' AND (`max_retry` = 0 OR " + "`retry` < `max_retry`)" + )}; + job_ids->reserve(result->rowsCount()); + while (result->next()) { + job_ids->emplace_back(read_id(result->getBinaryStream("job_id"))); + } + } catch (sql::SQLException& e) { + static_cast(conn)->rollback(); + return StorageErr{StorageErrType::OtherErr, e.what()}; + } + static_cast(conn)->commit(); + return StorageErr{}; +} + +auto MySqlMetadataStorage::reset_tasks( + StorageConnection& conn, + std::vector const& ready_tasks, + std::vector const& pending_tasks +) -> StorageErr { + try { + // Reset ready tasks + std::unique_ptr ready_statement( + static_cast(conn)->prepareStatement( + "UPDATE `tasks` SET `state` = 'ready' WHERE `id` = ?" + ) + ); + for (boost::uuids::uuid const& id : ready_tasks) { + sql::bytes id_bytes = uuid_get_bytes(id); + ready_statement->setBytes(1, &id_bytes); + ready_statement->addBatch(); + } + ready_statement->executeBatch(); + // Reset pending tasks + std::unique_ptr pending_statement( + static_cast(conn)->prepareStatement( + "UPDATE `tasks` SET `state` = 'pending' WHERE `id` = ?" + ) + ); + for (boost::uuids::uuid const& id : pending_tasks) { + sql::bytes id_bytes = uuid_get_bytes(id); + pending_statement->setBytes(1, &id_bytes); + pending_statement->addBatch(); + } + pending_statement->executeBatch(); + // Clear all the task outputs + std::unique_ptr output_statement( + static_cast(conn)->prepareStatement( + "UPDATE `task_outputs` SET `value` = NULL, `data_id` = NULL WHERE " + "`task_id` = ?" + ) + ); + for (boost::uuids::uuid const& id : ready_tasks) { + sql::bytes id_bytes = uuid_get_bytes(id); + output_statement->setBytes(1, &id_bytes); + output_statement->addBatch(); + } + for (boost::uuids::uuid const& id : pending_tasks) { + sql::bytes id_bytes = uuid_get_bytes(id); + output_statement->setBytes(1, &id_bytes); + output_statement->addBatch(); + } + output_statement->executeBatch(); + // Clear the task inputs value or data for pending tasks + std::unique_ptr input_statement( + static_cast(conn)->prepareStatement( + "UPDATE `task_inputs` SET `value` = NULL, `data_id` = NULL WHERE `task_id` " + "= ?" + ) + ); + for (boost::uuids::uuid const& id : pending_tasks) { + sql::bytes id_bytes = uuid_get_bytes(id); + input_statement->setBytes(1, &id_bytes); + input_statement->addBatch(); + } + input_statement->executeBatch(); + // Set the data to be not persisted if it is only owned by ready and pending tasks. + // 1. Get the list of data that are persisted and referenced by a task. + // 2. Filter out the data that is reference by driver of other tasks. + // 3. Set the data to be not persisted. + std::unique_ptr get_data_statement( + static_cast(conn)->prepareStatement( + "SELECT `data`.`id`, `data_ref_task`.`task_id` FROM `data` JOIN " + "`data_ref_task` ON `data`.`id` = `data_ref_task`.`id` WHERE " + "`data`.`persisted` = 1" + ) + ); + std::unique_ptr const data_res(get_data_statement->executeQuery()); + absl::flat_hash_set data_ids; + absl::flat_hash_set remove_data_ids; + while (data_res->next()) { + boost::uuids::uuid const data_id = read_id(data_res->getBinaryStream("id")); + boost::uuids::uuid const task_id = read_id(data_res->getBinaryStream("task_id")); + data_ids.insert(data_id); + if (std::ranges::find(ready_tasks, task_id) == ready_tasks.end() + && std::ranges::find(pending_tasks, task_id) == pending_tasks.end()) + { + remove_data_ids.insert(task_id); + } + } + for (boost::uuids::uuid const& id : remove_data_ids) { + data_ids.erase(id); + } + if (!data_ids.empty()) { + std::unique_ptr set_data_statement( + static_cast(conn)->prepareStatement( + "UPDATE `data` SET `persisted` = 0 WHERE `id` = ?" + ) + ); + for (boost::uuids::uuid const& id : data_ids) { + sql::bytes id_bytes = uuid_get_bytes(id); + set_data_statement->setBytes(1, &id_bytes); + set_data_statement->addBatch(); + } + set_data_statement->executeBatch(); + } + } catch (sql::SQLException& e) { + static_cast(conn)->rollback(); + return StorageErr{StorageErrType::OtherErr, e.what()}; + } + static_cast(conn)->commit(); + return StorageErr{}; +} + auto MySqlMetadataStorage::add_child( StorageConnection& conn, boost::uuids::uuid parent_id, diff --git a/src/spider/storage/mysql/MySqlStorage.hpp b/src/spider/storage/mysql/MySqlStorage.hpp index 6fb2c708..36dbbac1 100644 --- a/src/spider/storage/mysql/MySqlStorage.hpp +++ b/src/spider/storage/mysql/MySqlStorage.hpp @@ -73,6 +73,13 @@ class MySqlMetadataStorage : public MetadataStorage { ) -> StorageErr override; auto remove_job(StorageConnection& conn, boost::uuids::uuid id) -> StorageErr override; auto reset_job(StorageConnection& conn, boost::uuids::uuid id) -> StorageErr override; + auto get_failed_jobs(StorageConnection& conn, std::vector* job_ids) + -> StorageErr override; + auto reset_tasks( + StorageConnection& conn, + std::vector const& ready_tasks, + std::vector const& pending_tasks + ) -> StorageErr override; auto add_child(StorageConnection& conn, boost::uuids::uuid parent_id, Task const& child) -> StorageErr override; auto get_task(StorageConnection& conn, boost::uuids::uuid id, Task* task) From 409970b6657fc6f668d369834eb47acba3c77ba4 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Thu, 15 May 2025 11:54:22 -0400 Subject: [PATCH 02/45] Update retry count when resetting tasks --- src/spider/storage/mysql/MySqlStorage.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/spider/storage/mysql/MySqlStorage.cpp b/src/spider/storage/mysql/MySqlStorage.cpp index 51e5d6ab..28cb5e3b 100644 --- a/src/spider/storage/mysql/MySqlStorage.cpp +++ b/src/spider/storage/mysql/MySqlStorage.cpp @@ -1173,10 +1173,10 @@ auto MySqlMetadataStorage::reset_tasks( std::vector const& pending_tasks ) -> StorageErr { try { - // Reset ready tasks + // Reset ready tasks and update retry count std::unique_ptr ready_statement( static_cast(conn)->prepareStatement( - "UPDATE `tasks` SET `state` = 'ready' WHERE `id` = ?" + "UPDATE `tasks` SET `state` = 'ready', `retry` = `retry` + ` WHERE `id` = ?" ) ); for (boost::uuids::uuid const& id : ready_tasks) { @@ -1188,7 +1188,8 @@ auto MySqlMetadataStorage::reset_tasks( // Reset pending tasks std::unique_ptr pending_statement( static_cast(conn)->prepareStatement( - "UPDATE `tasks` SET `state` = 'pending' WHERE `id` = ?" + "UPDATE `tasks` SET `state` = 'pending', `retry` = `retry` + 1 WHERE `id` " + "= ?" ) ); for (boost::uuids::uuid const& id : pending_tasks) { From 319c21f14de0c13955d53945678c9b56aa58ab18 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Thu, 15 May 2025 12:02:24 -0400 Subject: [PATCH 03/45] Fix typo --- src/spider/storage/mysql/MySqlStorage.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/spider/storage/mysql/MySqlStorage.cpp b/src/spider/storage/mysql/MySqlStorage.cpp index 28cb5e3b..2f0241f9 100644 --- a/src/spider/storage/mysql/MySqlStorage.cpp +++ b/src/spider/storage/mysql/MySqlStorage.cpp @@ -1151,8 +1151,9 @@ auto MySqlMetadataStorage::get_failed_jobs( std::unique_ptr statement{ static_cast(conn)->createStatement() }; + // Every task should have at least one retry std::unique_ptr result{statement->executeQuery( - "SELECT `job_id` FROM `tasks` WHERE `state` = 'failed' AND (`max_retry` = 0 OR " + "SELECT `job_id` FROM `tasks` WHERE `state` = 'failed' AND (`retry` = 0 OR " "`retry` < `max_retry`)" )}; job_ids->reserve(result->rowsCount()); From 39c2f9747b6d2617081934091d307f08ddf51674 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Thu, 15 May 2025 14:26:04 -0400 Subject: [PATCH 04/45] Set the job fail only if we run out of retries --- src/spider/storage/mysql/MySqlStorage.cpp | 29 ++++++++++++++++++----- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/src/spider/storage/mysql/MySqlStorage.cpp b/src/spider/storage/mysql/MySqlStorage.cpp index 2f0241f9..b9becf70 100644 --- a/src/spider/storage/mysql/MySqlStorage.cpp +++ b/src/spider/storage/mysql/MySqlStorage.cpp @@ -1822,15 +1822,32 @@ auto MySqlMetadataStorage::task_fail( ); task_statement->setBytes(1, &task_id_bytes); task_statement->executeUpdate(); - // Set the job fails - std::unique_ptr const job_statement( + // Check if we run out of retry + std::unique_ptr const retry_statement( static_cast(conn)->prepareStatement( - "UPDATE `jobs` SET `state` = 'fail' WHERE `id` = (SELECT `job_id` FROM " - "`tasks` WHERE `id` = ?)" + "SELECT `retry`, `max_retry` FROM `tasks` WHERE `id` = ?" ) ); - job_statement->setBytes(1, &task_id_bytes); - job_statement->executeUpdate(); + retry_statement->setBytes(1, &task_id_bytes); + std::unique_ptr const retry_res{retry_statement->executeQuery()}; + if (retry_res->rowsCount() == 0) { + static_cast(conn)->rollback(); + return StorageErr{StorageErrType::KeyNotFoundErr, "Task not found"}; + } + retry_res->next(); + int32_t const retry = retry_res->getInt("retry"); + int32_t const max_retry = retry_res->getInt("max_retry"); + if (retry == 0 || retry >= max_retry) { + // Set the job fails + std::unique_ptr const job_statement( + static_cast(conn)->prepareStatement( + "UPDATE `jobs` SET `state` = 'fail' WHERE `id` = (SELECT `job_id` FROM " + "`tasks` WHERE `id` = ?)" + ) + ); + job_statement->setBytes(1, &task_id_bytes); + job_statement->executeUpdate(); + } } } catch (sql::SQLException& e) { spdlog::error("Task fail error: {}", e.what()); From cfb49d239b10e75cc82534953ad47992ddb97d3b Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Thu, 15 May 2025 14:49:43 -0400 Subject: [PATCH 05/45] Bug fix for sql statments --- src/spider/storage/mysql/MySqlStorage.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/spider/storage/mysql/MySqlStorage.cpp b/src/spider/storage/mysql/MySqlStorage.cpp index b9becf70..fae15b15 100644 --- a/src/spider/storage/mysql/MySqlStorage.cpp +++ b/src/spider/storage/mysql/MySqlStorage.cpp @@ -1177,7 +1177,8 @@ auto MySqlMetadataStorage::reset_tasks( // Reset ready tasks and update retry count std::unique_ptr ready_statement( static_cast(conn)->prepareStatement( - "UPDATE `tasks` SET `state` = 'ready', `retry` = `retry` + ` WHERE `id` = ?" + "UPDATE `tasks` SET `state` = 'ready', `retry` = `retry` + 1 WHERE `id` " + "= ?" ) ); for (boost::uuids::uuid const& id : ready_tasks) { @@ -1841,7 +1842,8 @@ auto MySqlMetadataStorage::task_fail( // Set the job fails std::unique_ptr const job_statement( static_cast(conn)->prepareStatement( - "UPDATE `jobs` SET `state` = 'fail' WHERE `id` = (SELECT `job_id` FROM " + "UPDATE `jobs` SET `state` = 'fail' WHERE `id` = (SELECT `job_id` " + "FROM " "`tasks` WHERE `id` = ?)" ) ); From 7a8a0a24d711f27c99c22888d928951ab6f8153b Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Thu, 15 May 2025 14:49:59 -0400 Subject: [PATCH 06/45] Add unit test for partial job reset --- tests/storage/test-MetadataStorage.cpp | 86 ++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/tests/storage/test-MetadataStorage.cpp b/tests/storage/test-MetadataStorage.cpp index a685b16d..1768488a 100644 --- a/tests/storage/test-MetadataStorage.cpp +++ b/tests/storage/test-MetadataStorage.cpp @@ -540,6 +540,92 @@ TEMPLATE_LIST_TEST_CASE("Job reset", "[storage]", spider::test::StorageFactoryTy REQUIRE(storage->remove_job(*conn, job_id).success()); } +TEMPLATE_LIST_TEST_CASE("Job partial reset", "[storage]", spider::test::StorageFactoryTypeList) { + std::unique_ptr storage_factory + = spider::test::create_storage_factory(); + std::unique_ptr storage + = storage_factory->provide_metadata_storage(); + + std::variant, spider::core::StorageErr> + conn_result = storage_factory->provide_storage_connection(); + REQUIRE(std::holds_alternative>(conn_result)); + auto conn = std::move(std::get>(conn_result)); + + boost::uuids::random_generator gen; + boost::uuids::uuid const job_id = gen(); + + // Create a complicated task graph + spider::core::Task child_task{"child"}; + spider::core::Task parent_1{"p1"}; + spider::core::Task parent_2{"p2"}; + parent_1.add_input(spider::core::TaskInput{"1", "float"}); + parent_1.add_input(spider::core::TaskInput{"2", "float"}); + parent_2.add_input(spider::core::TaskInput{"3", "int"}); + parent_2.add_input(spider::core::TaskInput{"4", "int"}); + parent_1.add_output(spider::core::TaskOutput{"float"}); + parent_2.add_output(spider::core::TaskOutput{"int"}); + child_task.add_input(spider::core::TaskInput{parent_1.get_id(), 0, "float"}); + child_task.add_input(spider::core::TaskInput{parent_2.get_id(), 0, "int"}); + child_task.add_output(spider::core::TaskOutput{"float"}); + parent_1.set_max_retries(1); + parent_2.set_max_retries(1); + child_task.set_max_retries(1); + spider::core::TaskGraph graph; + // Add task and dependencies to task graph in wrong order + graph.add_task(child_task); + graph.add_task(parent_1); + graph.add_task(parent_2); + graph.add_dependency(parent_2.get_id(), child_task.get_id()); + graph.add_dependency(parent_1.get_id(), child_task.get_id()); + graph.add_input_task(parent_1.get_id()); + graph.add_input_task(parent_2.get_id()); + graph.add_output_task(child_task.get_id()); + // Submit job should success + REQUIRE(storage->add_job(*conn, job_id, gen(), graph).success()); + + // Task finish for parent 1 should succeed + spider::core::TaskInstance const parent_1_instance{gen(), parent_1.get_id()}; + REQUIRE(storage->set_task_state(*conn, parent_1.get_id(), spider::core::TaskState::Running) + .success()); + REQUIRE(storage->task_finish( + *conn, + parent_1_instance, + {spider::core::TaskOutput{"1.1", "float"}} + ) + .success()); + + // Job partial reset + REQUIRE(storage->reset_tasks(*conn, {parent_2.get_id()}, {child_task.get_id()}).success()); + // Parent tasks states should be ready and child task state should be waiting + // Parent tasks inputs should be available and child task inputs should be empty + // All tasks output should be empty + spider::core::Task res_task{""}; + REQUIRE(storage->get_task(*conn, parent_1.get_id(), &res_task).success()); + REQUIRE(res_task.get_state() == spider::core::TaskState::Succeed); + REQUIRE(res_task.get_num_inputs() == 2); + REQUIRE(res_task.get_input(0).get_value() == "1"); + REQUIRE(res_task.get_input(1).get_value() == "2"); + REQUIRE(res_task.get_num_outputs() == 1); + REQUIRE(res_task.get_output(0).get_value().has_value()); + REQUIRE(storage->get_task(*conn, parent_2.get_id(), &res_task).success()); + REQUIRE(res_task.get_state() == spider::core::TaskState::Ready); + REQUIRE(res_task.get_num_inputs() == 2); + REQUIRE(res_task.get_input(0).get_value() == "3"); + REQUIRE(res_task.get_input(1).get_value() == "4"); + REQUIRE(res_task.get_num_outputs() == 1); + REQUIRE(!res_task.get_output(0).get_value().has_value()); + REQUIRE(storage->get_task(*conn, child_task.get_id(), &res_task).success()); + REQUIRE(res_task.get_state() == spider::core::TaskState::Pending); + REQUIRE(res_task.get_num_inputs() == 2); + REQUIRE(!res_task.get_input(0).get_value().has_value()); + REQUIRE(!res_task.get_input(1).get_value().has_value()); + REQUIRE(res_task.get_num_outputs() == 1); + REQUIRE(!res_task.get_output(0).get_value().has_value()); + + // Clean up + REQUIRE(storage->remove_job(*conn, job_id).success()); +} + TEMPLATE_LIST_TEST_CASE( "Scheduler lease timeout", "[storage]", From 4a3060242178d3ae2ecea29370839e7ab84f9225 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Tue, 20 May 2025 09:35:53 -0400 Subject: [PATCH 07/45] WIP for job recovery --- src/spider/CMakeLists.txt | 2 + src/spider/core/JobRecovery.cpp | 79 +++++++++++++++++++++++++++++++++ src/spider/core/JobRecovery.hpp | 47 ++++++++++++++++++++ 3 files changed, 128 insertions(+) create mode 100644 src/spider/core/JobRecovery.cpp create mode 100644 src/spider/core/JobRecovery.hpp diff --git a/src/spider/CMakeLists.txt b/src/spider/CMakeLists.txt index b3f92f4a..10bdb988 100644 --- a/src/spider/CMakeLists.txt +++ b/src/spider/CMakeLists.txt @@ -1,6 +1,7 @@ # set variable as CACHE INTERNAL to access it from other scope set(SPIDER_CORE_SOURCES core/Task.cpp + core/JobRecovery.cpp storage/mysql/MySqlConnection.cpp storage/mysql/MySqlStorageFactory.cpp storage/mysql/MySqlJobSubmissionBatch.cpp @@ -20,6 +21,7 @@ set(SPIDER_CORE_HEADERS core/Task.hpp core/TaskGraph.hpp core/JobMetadata.hpp + core/JobRecovery.hpp io/BoostAsio.hpp io/MsgPack.hpp io/msgpack_message.hpp diff --git a/src/spider/core/JobRecovery.cpp b/src/spider/core/JobRecovery.cpp new file mode 100644 index 00000000..c5851cf1 --- /dev/null +++ b/src/spider/core/JobRecovery.cpp @@ -0,0 +1,79 @@ +#include "JobRecovery.hpp" + +#include +#include +#include + +#include +#include +#include +#include + +#include "../storage/DataStorage.hpp" +#include "../storage/MetadataStorage.hpp" +#include "../storage/StorageConnection.hpp" +#include "absl/container/flat_hash_set.h" + +namespace spider::core { +JobRecovery::JobRecovery( + boost::uuids::uuid const job_id, + std::shared_ptr storage_connection, + std::shared_ptr data_store, + std::shared_ptr metadata_store +) + : m_job_id{job_id}, + m_conn{std::move(storage_connection)}, + m_data_store{std::move(data_store)}, + m_metadata_store{std::move(metadata_store)} {} + +auto JobRecovery::compute_graph()-> StorageErr { + StorageErr const err = m_metadata_store->get_task_graph(*m_conn, m_job_id, &m_task_graph); + if (false == err.success()) { + return err; + } + + // Get all the failed tasks + absl::flat_hash_set task_set; + for (auto const& pair: m_task_graph.get_tasks()) { + Task const& task = pair.second; + if (TaskState::Failed == task.get_state()) { + task_set.insert(pair.first); + } + } + + absl::flat_hash_set ready_task_set; + absl::flat_hash_set pending_task_set; + // For each task pop from the set, check if its inputs contains non-persisted Data. + // If so, add it to the pending task set and add parent in the task_set. Otherwise, add it to + // the ready task set. + std::deque working_set; + for (auto const& task_id: task_set) { + working_set.push_back(task_id); + } + while (!working_set.empty()) { + auto const task_id = working_set.front(); + working_set.pop_front(); + std::optional optional_task = m_task_graph.get_task(task_id); + if (false == optional_task.has_value()) { + return StorageErr{StorageErrType::KeyNotFoundErr, fmt::format("No task with id {}", to_string(task_id))}; + } + Task& task = *optional_task.value(); + + } +} + +auto JobRecovery::check_task_input(bool& persisted)-> StorageErr{ + +} + + +auto JobRecovery::get_pending_tasks()-> std::vector{ + return {}; +} + +auto JobRecovery::get_ready_tasks()-> std::vector{ + return {}; +} + + +} // namespace spider::core diff --git a/src/spider/core/JobRecovery.hpp b/src/spider/core/JobRecovery.hpp new file mode 100644 index 00000000..e3121f2d --- /dev/null +++ b/src/spider/core/JobRecovery.hpp @@ -0,0 +1,47 @@ +#ifndef SPIDER_CORE_JOBRECOVERY_HPP +#define SPIDER_CORE_JOBRECOVERY_HPP + +#include + +#include + +#include "../storage/DataStorage.hpp" +#include "../storage/MetadataStorage.hpp" +#include "../storage/StorageConnection.hpp" + +namespace spider::core { +class JobRecovery { +public: + JobRecovery( + boost::uuids::uuid job_id, + std::shared_ptr storage_connection, + std::shared_ptr data_store, + std::shared_ptr metadata_store + ); + + auto compute_graph() -> StorageErr; + + auto get_ready_tasks() -> std::vector; + + auto get_pending_tasks() -> std::vector; + +private: + + /** + * Check if all the task input data are persisted. + * @param persisted True if all the task input data are persisted, false otherwise. + * @return The storage error code from accessing the storage. + */ + auto check_task_input(bool& persisted) -> StorageErr; + + boost::uuids::uuid m_job_id; + + std::shared_ptr m_conn; + std::shared_ptr m_data_store; + std::shared_ptr m_metadata_store; + + TaskGraph m_task_graph; +}; +} // namespace spider::core + +#endif From 66eb5d24193be43090118209af7b5e75f83f7ffc Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Wed, 21 May 2025 12:25:25 -0400 Subject: [PATCH 08/45] Add persisted in data --- src/spider/core/Data.hpp | 5 +++++ src/spider/storage/mysql/MySqlStorage.cpp | 11 ++++++++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/spider/core/Data.hpp b/src/spider/core/Data.hpp index b5a1d556..1903aa95 100644 --- a/src/spider/core/Data.hpp +++ b/src/spider/core/Data.hpp @@ -31,11 +31,16 @@ class Data { void set_hard_locality(bool const hard) { m_hard_locality = hard; } + void set_persisted(bool const persisted) { this->m_persisted = persisted; } + + [[nodiscard]] auto is_persisted() const -> bool { return m_persisted; } + private: boost::uuids::uuid m_id; std::string m_value; std::vector m_locality; bool m_hard_locality = false; + bool m_persisted = false; void init_id() { boost::uuids::random_generator gen; diff --git a/src/spider/storage/mysql/MySqlStorage.cpp b/src/spider/storage/mysql/MySqlStorage.cpp index fae15b15..e2c6733e 100644 --- a/src/spider/storage/mysql/MySqlStorage.cpp +++ b/src/spider/storage/mysql/MySqlStorage.cpp @@ -2141,13 +2141,15 @@ auto MySqlDataStorage::add_driver_data( try { std::unique_ptr statement( static_cast(conn)->prepareStatement( - "INSERT INTO `data` (`id`, `value`, `hard_locality`) VALUES(?, ?, ?)" + "INSERT INTO `data` (`id`, `value`, `hard_locality`, `persisted`) " + "VALUES(?, ?, ?, ?)" ) ); sql::bytes id_bytes = uuid_get_bytes(data.get_id()); statement->setBytes(1, &id_bytes); statement->setString(2, data.get_value()); statement->setBoolean(3, data.is_hard_locality()); + statement->setBoolean(4, data.is_persisted()); statement->executeUpdate(); for (std::string const& addr : data.get_locality()) { @@ -2189,13 +2191,15 @@ auto MySqlDataStorage::add_task_data( try { std::unique_ptr statement( static_cast(conn)->prepareStatement( - "INSERT INTO `data` (`id`, `value`, `hard_locality`) VALUES(?, ?, ?)" + "INSERT INTO `data` (`id`, `value`, `hard_locality`, `persisted`) " + "VALUES(?, ?, ?, ?)" ) ); sql::bytes id_bytes = uuid_get_bytes(data.get_id()); statement->setBytes(1, &id_bytes); statement->setString(2, data.get_value()); statement->setBoolean(3, data.is_hard_locality()); + statement->setBoolean(4, data.is_persisted()); statement->executeUpdate(); for (std::string const& addr : data.get_locality()) { @@ -2235,7 +2239,7 @@ auto MySqlDataStorage::get_data_with_locality( ) -> StorageErr { std::unique_ptr statement( static_cast(conn)->prepareStatement( - "SELECT `id`, `value`, `hard_locality` FROM `data` WHERE `id` = ?" + "SELECT `id`, `value`, `hard_locality`, `persisted` FROM `data` WHERE `id` = ?" ) ); sql::bytes id_bytes = uuid_get_bytes(id); @@ -2251,6 +2255,7 @@ auto MySqlDataStorage::get_data_with_locality( res->next(); *data = Data{id, get_sql_string(res->getString(2))}; data->set_hard_locality(res->getBoolean(3)); + data->set_persisted(res->getBoolean(4)); std::unique_ptr locality_statement( static_cast(conn)->prepareStatement( From 22fa39969a8778c6203839da7487ecc75bbabe56 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Wed, 21 May 2025 12:41:11 -0400 Subject: [PATCH 09/45] Add data persistence in storage and client --- src/spider/client/Data.hpp | 32 +++++++++++++++++++++++ src/spider/storage/DataStorage.hpp | 1 + src/spider/storage/mysql/MySqlStorage.cpp | 19 ++++++++++++++ src/spider/storage/mysql/MySqlStorage.hpp | 1 + 4 files changed, 53 insertions(+) diff --git a/src/spider/client/Data.hpp b/src/spider/client/Data.hpp index 9ef8116e..24ca5a81 100644 --- a/src/spider/client/Data.hpp +++ b/src/spider/client/Data.hpp @@ -79,6 +79,26 @@ class Data { m_data_store->set_data_locality(*conn, *m_impl); } + /** + * Sets the data as checkpointed, indicating the data should not be cleaned up. + * + * @throw spider::ConnectionException + */ + void set_checkpointed() { + m_impl->set_persisted(true); + if (nullptr != m_connection) { + m_data_store->set_data_persisted(*m_connection, *m_impl); + return; + } + std::variant, core::StorageErr> conn_result + = m_storage_factory->provide_storage_connection(); + if (std::holds_alternative(conn_result)) { + throw ConnectionException(std::get(conn_result).description); + } + auto conn = std::move(std::get>(conn_result)); + m_data_store->set_data_persisted(*conn, *m_impl); + } + class Builder { public: /** @@ -107,6 +127,16 @@ class Data { return *this; } + /** + * Sets the data as checkpointed, indicating the data should not be cleaned up. + * + * @return self + */ + auto set_checkpointed() -> Builder& { + m_persisted = true; + return *this; + } + /** * Builds the data object. * @@ -120,6 +150,7 @@ class Data { auto data = std::make_unique(std::string{buffer.data(), buffer.size()}); data->set_locality(m_nodes); data->set_hard_locality(m_hard_locality); + data->set_persisted(m_persisted); std::shared_ptr conn = m_connection; if (nullptr == conn) { std::variant, core::StorageErr> conn_result @@ -176,6 +207,7 @@ class Data { std::vector m_nodes; bool m_hard_locality = false; std::function m_cleanup_func; + bool m_persisted = false; std::shared_ptr m_data_store; std::shared_ptr m_storage_factory; diff --git a/src/spider/storage/DataStorage.hpp b/src/spider/storage/DataStorage.hpp index 79713111..2ff072e7 100644 --- a/src/spider/storage/DataStorage.hpp +++ b/src/spider/storage/DataStorage.hpp @@ -65,6 +65,7 @@ class DataStorage { ) -> StorageErr = 0; virtual auto set_data_locality(StorageConnection& conn, Data const& data) -> StorageErr = 0; + virtual auto set_data_persisted(StorageConnection& conn, Data const& data) -> StorageErr = 0; virtual auto remove_data(StorageConnection& conn, boost::uuids::uuid id) -> StorageErr = 0; virtual auto add_task_reference(StorageConnection& conn, boost::uuids::uuid id, boost::uuids::uuid task_id) diff --git a/src/spider/storage/mysql/MySqlStorage.cpp b/src/spider/storage/mysql/MySqlStorage.cpp index e2c6733e..adec6c8a 100644 --- a/src/spider/storage/mysql/MySqlStorage.cpp +++ b/src/spider/storage/mysql/MySqlStorage.cpp @@ -2385,6 +2385,25 @@ auto MySqlDataStorage::set_data_locality(StorageConnection& conn, Data const& da return StorageErr{}; } +auto MySqlDataStorage::set_data_persisted(StorageConnection& conn, Data const& data) -> StorageErr { + try { + sql::bytes id_bytes = uuid_get_bytes(data.get_id()); + std::unique_ptr statement( + static_cast(conn)->prepareStatement( + "UPDATE `data` SET `persisted` = ? WHERE `id` = ?" + ) + ); + statement->setBoolean(1, data.is_persisted()); + statement->setBytes(2, &id_bytes); + statement->executeUpdate(); + } catch (sql::SQLException& e) { + static_cast(conn)->rollback(); + return StorageErr{StorageErrType::OtherErr, e.what()}; + } + static_cast(conn)->commit(); + return StorageErr{}; +} + auto MySqlDataStorage::remove_data(StorageConnection& conn, boost::uuids::uuid id) -> StorageErr { try { std::unique_ptr statement( diff --git a/src/spider/storage/mysql/MySqlStorage.hpp b/src/spider/storage/mysql/MySqlStorage.hpp index 36dbbac1..2eab099d 100644 --- a/src/spider/storage/mysql/MySqlStorage.hpp +++ b/src/spider/storage/mysql/MySqlStorage.hpp @@ -168,6 +168,7 @@ class MySqlDataStorage : public DataStorage { Data* data ) -> StorageErr override; auto set_data_locality(StorageConnection& conn, Data const& data) -> StorageErr override; + auto set_data_persisted(StorageConnection& conn, Data const& data) -> StorageErr override; auto remove_data(StorageConnection& conn, boost::uuids::uuid id) -> StorageErr override; auto add_task_reference(StorageConnection& conn, boost::uuids::uuid id, boost::uuids::uuid task_id) From cd3b5304fb2974f8520d6fda60064ec911fa432d Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Wed, 21 May 2025 12:48:05 -0400 Subject: [PATCH 10/45] Add data persistence test --- tests/storage/test-DataStorage.cpp | 9 ++++++++- tests/utils/CoreDataUtils.hpp | 4 ++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/tests/storage/test-DataStorage.cpp b/tests/storage/test-DataStorage.cpp index f47e4ee3..7574a4fb 100644 --- a/tests/storage/test-DataStorage.cpp +++ b/tests/storage/test-DataStorage.cpp @@ -40,7 +40,7 @@ TEMPLATE_LIST_TEST_CASE( auto conn = std::move(std::get>(conn_result)); // Add driver and data - spider::core::Data const data{"value"}; + spider::core::Data data{"value"}; boost::uuids::random_generator gen; boost::uuids::uuid const driver_id = gen(); REQUIRE(metadata_storage->add_driver(*conn, spider::core::Driver{driver_id}).success()); @@ -56,6 +56,13 @@ TEMPLATE_LIST_TEST_CASE( REQUIRE(data_storage->get_data(*conn, data.get_id(), &result).success()); REQUIRE(spider::test::data_equal(data, result)); + // Set data persisted should succeed + data.set_persisted(false); + REQUIRE(data_storage->set_data_persisted(*conn, data).success()); + // Get data should match + REQUIRE(data_storage->get_data(*conn, data.get_id(), &result).success()); + REQUIRE(spider::test::data_equal(data, result)); + // Remove data should succeed REQUIRE(data_storage->remove_data(*conn, data.get_id()).success()); diff --git a/tests/utils/CoreDataUtils.hpp b/tests/utils/CoreDataUtils.hpp index 0de99828..d5820227 100644 --- a/tests/utils/CoreDataUtils.hpp +++ b/tests/utils/CoreDataUtils.hpp @@ -20,6 +20,10 @@ inline auto data_equal(core::Data const& d1, core::Data const& d2) -> bool { return false; } + if (d1.is_persisted() != d2.is_persisted()) { + return false; + } + return true; } } // namespace spider::test From d8a99ca49a7697009c4f7d613dcf71f9b0c44205 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Wed, 21 May 2025 12:48:22 -0400 Subject: [PATCH 11/45] WIP --- src/spider/core/JobRecovery.cpp | 42 +++++++++++++++++++++++---------- src/spider/core/JobRecovery.hpp | 8 +------ 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/src/spider/core/JobRecovery.cpp b/src/spider/core/JobRecovery.cpp index c5851cf1..528c13bb 100644 --- a/src/spider/core/JobRecovery.cpp +++ b/src/spider/core/JobRecovery.cpp @@ -26,7 +26,7 @@ JobRecovery::JobRecovery( m_data_store{std::move(data_store)}, m_metadata_store{std::move(metadata_store)} {} -auto JobRecovery::compute_graph()-> StorageErr { +auto JobRecovery::compute_graph() -> StorageErr { StorageErr const err = m_metadata_store->get_task_graph(*m_conn, m_job_id, &m_task_graph); if (false == err.success()) { return err; @@ -34,7 +34,7 @@ auto JobRecovery::compute_graph()-> StorageErr { // Get all the failed tasks absl::flat_hash_set task_set; - for (auto const& pair: m_task_graph.get_tasks()) { + for (auto const& pair : m_task_graph.get_tasks()) { Task const& task = pair.second; if (TaskState::Failed == task.get_state()) { task_set.insert(pair.first); @@ -47,7 +47,7 @@ auto JobRecovery::compute_graph()-> StorageErr { // If so, add it to the pending task set and add parent in the task_set. Otherwise, add it to // the ready task set. std::deque working_set; - for (auto const& task_id: task_set) { + for (auto const& task_id : task_set) { working_set.push_back(task_id); } while (!working_set.empty()) { @@ -55,25 +55,41 @@ auto JobRecovery::compute_graph()-> StorageErr { working_set.pop_front(); std::optional optional_task = m_task_graph.get_task(task_id); if (false == optional_task.has_value()) { - return StorageErr{StorageErrType::KeyNotFoundErr, fmt::format("No task with id {}", to_string(task_id))}; + return StorageErr{ + StorageErrType::KeyNotFoundErr, + fmt::format("No task with id {}", to_string(task_id)) + }; } - Task& task = *optional_task.value(); - } } -auto JobRecovery::check_task_input(bool& persisted)-> StorageErr{ - +auto JobRecovery::check_task_input(Task const& task, bool& not_persisted) -> StorageErr { + for (auto const& task_input : task.get_inputs()) { + std::optional optional_date_id = task_input.get_data_id(); + if (false == optional_date_id.has_value()) { + continue; + } + boost::uuids::uuid const data_id = optional_date_id.value(); + Data data; + StorageErr const err = m_data_store->get_data(*m_conn, data_id, &data); + if (false == err.success()) { + return err; + } + if (data.is_persisted()) { + continue; + } + not_persisted = true; + return StorageErr{}; + } + not_persisted = false; + return StorageErr{}; } - -auto JobRecovery::get_pending_tasks()-> std::vector{ +auto JobRecovery::get_pending_tasks() -> std::vector { return {}; } -auto JobRecovery::get_ready_tasks()-> std::vector{ +auto JobRecovery::get_ready_tasks() -> std::vector { return {}; } - - } // namespace spider::core diff --git a/src/spider/core/JobRecovery.hpp b/src/spider/core/JobRecovery.hpp index e3121f2d..37f5d2d2 100644 --- a/src/spider/core/JobRecovery.hpp +++ b/src/spider/core/JobRecovery.hpp @@ -26,13 +26,7 @@ class JobRecovery { auto get_pending_tasks() -> std::vector; private: - - /** - * Check if all the task input data are persisted. - * @param persisted True if all the task input data are persisted, false otherwise. - * @return The storage error code from accessing the storage. - */ - auto check_task_input(bool& persisted) -> StorageErr; + auto check_task_input(Task const& task, bool& not_persisted) -> StorageErr; boost::uuids::uuid m_job_id; From cb1a1e52ab96c28287e4cee4a8ac5f56a6b627c2 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Wed, 21 May 2025 12:25:25 -0400 Subject: [PATCH 12/45] Add persisted in data --- src/spider/core/Data.hpp | 5 +++++ src/spider/storage/mysql/MySqlStorage.cpp | 11 ++++++++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/spider/core/Data.hpp b/src/spider/core/Data.hpp index b5a1d556..1903aa95 100644 --- a/src/spider/core/Data.hpp +++ b/src/spider/core/Data.hpp @@ -31,11 +31,16 @@ class Data { void set_hard_locality(bool const hard) { m_hard_locality = hard; } + void set_persisted(bool const persisted) { this->m_persisted = persisted; } + + [[nodiscard]] auto is_persisted() const -> bool { return m_persisted; } + private: boost::uuids::uuid m_id; std::string m_value; std::vector m_locality; bool m_hard_locality = false; + bool m_persisted = false; void init_id() { boost::uuids::random_generator gen; diff --git a/src/spider/storage/mysql/MySqlStorage.cpp b/src/spider/storage/mysql/MySqlStorage.cpp index 7678f69b..60aa0877 100644 --- a/src/spider/storage/mysql/MySqlStorage.cpp +++ b/src/spider/storage/mysql/MySqlStorage.cpp @@ -1987,13 +1987,15 @@ auto MySqlDataStorage::add_driver_data( try { std::unique_ptr statement( static_cast(conn)->prepareStatement( - "INSERT INTO `data` (`id`, `value`, `hard_locality`) VALUES(?, ?, ?)" + "INSERT INTO `data` (`id`, `value`, `hard_locality`, `persisted`) " + "VALUES(?, ?, ?, ?)" ) ); sql::bytes id_bytes = uuid_get_bytes(data.get_id()); statement->setBytes(1, &id_bytes); statement->setString(2, data.get_value()); statement->setBoolean(3, data.is_hard_locality()); + statement->setBoolean(4, data.is_persisted()); statement->executeUpdate(); for (std::string const& addr : data.get_locality()) { @@ -2035,13 +2037,15 @@ auto MySqlDataStorage::add_task_data( try { std::unique_ptr statement( static_cast(conn)->prepareStatement( - "INSERT INTO `data` (`id`, `value`, `hard_locality`) VALUES(?, ?, ?)" + "INSERT INTO `data` (`id`, `value`, `hard_locality`, `persisted`) " + "VALUES(?, ?, ?, ?)" ) ); sql::bytes id_bytes = uuid_get_bytes(data.get_id()); statement->setBytes(1, &id_bytes); statement->setString(2, data.get_value()); statement->setBoolean(3, data.is_hard_locality()); + statement->setBoolean(4, data.is_persisted()); statement->executeUpdate(); for (std::string const& addr : data.get_locality()) { @@ -2081,7 +2085,7 @@ auto MySqlDataStorage::get_data_with_locality( ) -> StorageErr { std::unique_ptr statement( static_cast(conn)->prepareStatement( - "SELECT `id`, `value`, `hard_locality` FROM `data` WHERE `id` = ?" + "SELECT `id`, `value`, `hard_locality`, `persisted` FROM `data` WHERE `id` = ?" ) ); sql::bytes id_bytes = uuid_get_bytes(id); @@ -2097,6 +2101,7 @@ auto MySqlDataStorage::get_data_with_locality( res->next(); *data = Data{id, get_sql_string(res->getString(2))}; data->set_hard_locality(res->getBoolean(3)); + data->set_persisted(res->getBoolean(4)); std::unique_ptr locality_statement( static_cast(conn)->prepareStatement( From 42411dfca41b5f3e2976a72e986308192bdf032b Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Wed, 21 May 2025 12:41:11 -0400 Subject: [PATCH 13/45] Add data persistence in storage and client --- src/spider/client/Data.hpp | 32 +++++++++++++++++++++++ src/spider/storage/DataStorage.hpp | 1 + src/spider/storage/mysql/MySqlStorage.cpp | 19 ++++++++++++++ src/spider/storage/mysql/MySqlStorage.hpp | 1 + 4 files changed, 53 insertions(+) diff --git a/src/spider/client/Data.hpp b/src/spider/client/Data.hpp index 9ef8116e..24ca5a81 100644 --- a/src/spider/client/Data.hpp +++ b/src/spider/client/Data.hpp @@ -79,6 +79,26 @@ class Data { m_data_store->set_data_locality(*conn, *m_impl); } + /** + * Sets the data as checkpointed, indicating the data should not be cleaned up. + * + * @throw spider::ConnectionException + */ + void set_checkpointed() { + m_impl->set_persisted(true); + if (nullptr != m_connection) { + m_data_store->set_data_persisted(*m_connection, *m_impl); + return; + } + std::variant, core::StorageErr> conn_result + = m_storage_factory->provide_storage_connection(); + if (std::holds_alternative(conn_result)) { + throw ConnectionException(std::get(conn_result).description); + } + auto conn = std::move(std::get>(conn_result)); + m_data_store->set_data_persisted(*conn, *m_impl); + } + class Builder { public: /** @@ -107,6 +127,16 @@ class Data { return *this; } + /** + * Sets the data as checkpointed, indicating the data should not be cleaned up. + * + * @return self + */ + auto set_checkpointed() -> Builder& { + m_persisted = true; + return *this; + } + /** * Builds the data object. * @@ -120,6 +150,7 @@ class Data { auto data = std::make_unique(std::string{buffer.data(), buffer.size()}); data->set_locality(m_nodes); data->set_hard_locality(m_hard_locality); + data->set_persisted(m_persisted); std::shared_ptr conn = m_connection; if (nullptr == conn) { std::variant, core::StorageErr> conn_result @@ -176,6 +207,7 @@ class Data { std::vector m_nodes; bool m_hard_locality = false; std::function m_cleanup_func; + bool m_persisted = false; std::shared_ptr m_data_store; std::shared_ptr m_storage_factory; diff --git a/src/spider/storage/DataStorage.hpp b/src/spider/storage/DataStorage.hpp index 79713111..2ff072e7 100644 --- a/src/spider/storage/DataStorage.hpp +++ b/src/spider/storage/DataStorage.hpp @@ -65,6 +65,7 @@ class DataStorage { ) -> StorageErr = 0; virtual auto set_data_locality(StorageConnection& conn, Data const& data) -> StorageErr = 0; + virtual auto set_data_persisted(StorageConnection& conn, Data const& data) -> StorageErr = 0; virtual auto remove_data(StorageConnection& conn, boost::uuids::uuid id) -> StorageErr = 0; virtual auto add_task_reference(StorageConnection& conn, boost::uuids::uuid id, boost::uuids::uuid task_id) diff --git a/src/spider/storage/mysql/MySqlStorage.cpp b/src/spider/storage/mysql/MySqlStorage.cpp index 60aa0877..8621cf9d 100644 --- a/src/spider/storage/mysql/MySqlStorage.cpp +++ b/src/spider/storage/mysql/MySqlStorage.cpp @@ -2231,6 +2231,25 @@ auto MySqlDataStorage::set_data_locality(StorageConnection& conn, Data const& da return StorageErr{}; } +auto MySqlDataStorage::set_data_persisted(StorageConnection& conn, Data const& data) -> StorageErr { + try { + sql::bytes id_bytes = uuid_get_bytes(data.get_id()); + std::unique_ptr statement( + static_cast(conn)->prepareStatement( + "UPDATE `data` SET `persisted` = ? WHERE `id` = ?" + ) + ); + statement->setBoolean(1, data.is_persisted()); + statement->setBytes(2, &id_bytes); + statement->executeUpdate(); + } catch (sql::SQLException& e) { + static_cast(conn)->rollback(); + return StorageErr{StorageErrType::OtherErr, e.what()}; + } + static_cast(conn)->commit(); + return StorageErr{}; +} + auto MySqlDataStorage::remove_data(StorageConnection& conn, boost::uuids::uuid id) -> StorageErr { try { std::unique_ptr statement( diff --git a/src/spider/storage/mysql/MySqlStorage.hpp b/src/spider/storage/mysql/MySqlStorage.hpp index 6fb2c708..befd431e 100644 --- a/src/spider/storage/mysql/MySqlStorage.hpp +++ b/src/spider/storage/mysql/MySqlStorage.hpp @@ -161,6 +161,7 @@ class MySqlDataStorage : public DataStorage { Data* data ) -> StorageErr override; auto set_data_locality(StorageConnection& conn, Data const& data) -> StorageErr override; + auto set_data_persisted(StorageConnection& conn, Data const& data) -> StorageErr override; auto remove_data(StorageConnection& conn, boost::uuids::uuid id) -> StorageErr override; auto add_task_reference(StorageConnection& conn, boost::uuids::uuid id, boost::uuids::uuid task_id) From 041679f7cf06d534dff25604e97cc396a7f36060 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Wed, 21 May 2025 12:48:05 -0400 Subject: [PATCH 14/45] Add data persistence test --- tests/storage/test-DataStorage.cpp | 9 ++++++++- tests/utils/CoreDataUtils.hpp | 4 ++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/tests/storage/test-DataStorage.cpp b/tests/storage/test-DataStorage.cpp index f47e4ee3..7574a4fb 100644 --- a/tests/storage/test-DataStorage.cpp +++ b/tests/storage/test-DataStorage.cpp @@ -40,7 +40,7 @@ TEMPLATE_LIST_TEST_CASE( auto conn = std::move(std::get>(conn_result)); // Add driver and data - spider::core::Data const data{"value"}; + spider::core::Data data{"value"}; boost::uuids::random_generator gen; boost::uuids::uuid const driver_id = gen(); REQUIRE(metadata_storage->add_driver(*conn, spider::core::Driver{driver_id}).success()); @@ -56,6 +56,13 @@ TEMPLATE_LIST_TEST_CASE( REQUIRE(data_storage->get_data(*conn, data.get_id(), &result).success()); REQUIRE(spider::test::data_equal(data, result)); + // Set data persisted should succeed + data.set_persisted(false); + REQUIRE(data_storage->set_data_persisted(*conn, data).success()); + // Get data should match + REQUIRE(data_storage->get_data(*conn, data.get_id(), &result).success()); + REQUIRE(spider::test::data_equal(data, result)); + // Remove data should succeed REQUIRE(data_storage->remove_data(*conn, data.get_id()).success()); diff --git a/tests/utils/CoreDataUtils.hpp b/tests/utils/CoreDataUtils.hpp index 0de99828..d5820227 100644 --- a/tests/utils/CoreDataUtils.hpp +++ b/tests/utils/CoreDataUtils.hpp @@ -20,6 +20,10 @@ inline auto data_equal(core::Data const& d1, core::Data const& d2) -> bool { return false; } + if (d1.is_persisted() != d2.is_persisted()) { + return false; + } + return true; } } // namespace spider::test From f9bcb0e404987c996108b7d10bece00df54ba30f Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Wed, 21 May 2025 13:03:52 -0400 Subject: [PATCH 15/45] Fix the tests --- tests/storage/test-DataStorage.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/storage/test-DataStorage.cpp b/tests/storage/test-DataStorage.cpp index 7574a4fb..bb409504 100644 --- a/tests/storage/test-DataStorage.cpp +++ b/tests/storage/test-DataStorage.cpp @@ -57,7 +57,7 @@ TEMPLATE_LIST_TEST_CASE( REQUIRE(spider::test::data_equal(data, result)); // Set data persisted should succeed - data.set_persisted(false); + data.set_persisted(true); REQUIRE(data_storage->set_data_persisted(*conn, data).success()); // Get data should match REQUIRE(data_storage->get_data(*conn, data.get_id(), &result).success()); From cc031dc9ccdc1848636731e581fc6943cbb247a4 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Mon, 26 May 2025 10:38:53 -0400 Subject: [PATCH 16/45] Fix relative header --- src/spider/core/JobRecovery.cpp | 7 +++---- src/spider/core/JobRecovery.hpp | 6 +++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/spider/core/JobRecovery.cpp b/src/spider/core/JobRecovery.cpp index 528c13bb..4ec20df2 100644 --- a/src/spider/core/JobRecovery.cpp +++ b/src/spider/core/JobRecovery.cpp @@ -9,10 +9,9 @@ #include #include -#include "../storage/DataStorage.hpp" -#include "../storage/MetadataStorage.hpp" -#include "../storage/StorageConnection.hpp" -#include "absl/container/flat_hash_set.h" +#include +#include +#include namespace spider::core { JobRecovery::JobRecovery( diff --git a/src/spider/core/JobRecovery.hpp b/src/spider/core/JobRecovery.hpp index 37f5d2d2..197d6922 100644 --- a/src/spider/core/JobRecovery.hpp +++ b/src/spider/core/JobRecovery.hpp @@ -5,9 +5,9 @@ #include -#include "../storage/DataStorage.hpp" -#include "../storage/MetadataStorage.hpp" -#include "../storage/StorageConnection.hpp" +#include +#include +#include namespace spider::core { class JobRecovery { From 46ae85aec8a661f88c5a4ede6c78e1051a63d7ac Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Mon, 26 May 2025 12:08:12 -0400 Subject: [PATCH 17/45] Add job recovery --- src/spider/core/JobRecovery.cpp | 65 ++++++++++++++++++++++++++++----- src/spider/core/JobRecovery.hpp | 17 +++++++++ 2 files changed, 72 insertions(+), 10 deletions(-) diff --git a/src/spider/core/JobRecovery.cpp b/src/spider/core/JobRecovery.cpp index 4ec20df2..a3c0ee15 100644 --- a/src/spider/core/JobRecovery.cpp +++ b/src/spider/core/JobRecovery.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -26,17 +27,16 @@ JobRecovery::JobRecovery( m_metadata_store{std::move(metadata_store)} {} auto JobRecovery::compute_graph() -> StorageErr { - StorageErr const err = m_metadata_store->get_task_graph(*m_conn, m_job_id, &m_task_graph); + StorageErr err = m_metadata_store->get_task_graph(*m_conn, m_job_id, &m_task_graph); if (false == err.success()) { return err; } // Get all the failed tasks absl::flat_hash_set task_set; - for (auto const& pair : m_task_graph.get_tasks()) { - Task const& task = pair.second; + for (auto const& [task_id, task] : m_task_graph.get_tasks()) { if (TaskState::Failed == task.get_state()) { - task_set.insert(pair.first); + task_set.insert(task_id); } } @@ -45,13 +45,13 @@ auto JobRecovery::compute_graph() -> StorageErr { // For each task pop from the set, check if its inputs contains non-persisted Data. // If so, add it to the pending task set and add parent in the task_set. Otherwise, add it to // the ready task set. - std::deque working_set; + std::deque working_queue; for (auto const& task_id : task_set) { - working_set.push_back(task_id); + working_queue.push_back(task_id); } - while (!working_set.empty()) { - auto const task_id = working_set.front(); - working_set.pop_front(); + while (!working_queue.empty()) { + auto const task_id = working_queue.front(); + working_queue.pop_front(); std::optional optional_task = m_task_graph.get_task(task_id); if (false == optional_task.has_value()) { return StorageErr{ @@ -59,7 +59,52 @@ auto JobRecovery::compute_graph() -> StorageErr { fmt::format("No task with id {}", to_string(task_id)) }; } + Task const& task = *optional_task.value(); + bool not_persisted = false; + err = check_task_input(task, not_persisted); + if (false == err.success()) { + return err; + } + if (not_persisted) { + pending_task_set.insert(task_id); + std::vector const parents = m_task_graph.get_parent_tasks(task_id); + for (auto const& parent_id : parents) { + if (false == task_set.contains(parent_id)) { + working_queue.push_back(parent_id); + task_set.insert(parent_id); + } + } + } else { + ready_task_set.insert(task_id); + } + } + + // Set the pending and ready tasks + m_pending_tasks.clear(); + m_pending_tasks.reserve(pending_task_set.size()); + for (auto const& task_id : pending_task_set) { + m_pending_tasks.push_back(task_id); + } + m_ready_tasks.clear(); + m_ready_tasks.reserve(ready_task_set.size()); + for (auto const& task_id : ready_task_set) { + m_ready_tasks.push_back(task_id); + } + + return StorageErr{}; +} + +auto JobRecovery::get_data(boost::uuids::uuid data_id, Data& data) -> StorageErr { + auto it = m_data_map.find(data_id); + if (it != m_data_map.end()) { + data = it->second; + return StorageErr{}; + } + StorageErr const err = m_data_store->get_data(*m_conn, data_id, &data); + if (err.success()) { + m_data_map[data_id] = data; } + return err; } auto JobRecovery::check_task_input(Task const& task, bool& not_persisted) -> StorageErr { @@ -70,7 +115,7 @@ auto JobRecovery::check_task_input(Task const& task, bool& not_persisted) -> Sto } boost::uuids::uuid const data_id = optional_date_id.value(); Data data; - StorageErr const err = m_data_store->get_data(*m_conn, data_id, &data); + StorageErr const err = get_data(data_id, data); if (false == err.success()) { return err; } diff --git a/src/spider/core/JobRecovery.hpp b/src/spider/core/JobRecovery.hpp index 197d6922..ec9249f9 100644 --- a/src/spider/core/JobRecovery.hpp +++ b/src/spider/core/JobRecovery.hpp @@ -2,7 +2,9 @@ #define SPIDER_CORE_JOBRECOVERY_HPP #include +#include +#include #include #include @@ -28,13 +30,28 @@ class JobRecovery { private: auto check_task_input(Task const& task, bool& not_persisted) -> StorageErr; + /** + * Get the data associated with the given data_id. If the data is cached in + * m_data_map, return it. Otherwise, fetch it from the data store and cache + * it. + * @param data_id + * @param data + * @return + */ + auto get_data(boost::uuids::uuid data_id, Data& data) -> StorageErr; + boost::uuids::uuid m_job_id; std::shared_ptr m_conn; std::shared_ptr m_data_store; std::shared_ptr m_metadata_store; + absl::flat_hash_map m_data_map; + TaskGraph m_task_graph; + + std::vector m_ready_tasks; + std::vector m_pending_tasks; }; } // namespace spider::core From 74aade0a9c08aafda3f5fb4a5ca4836571fb9916 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Mon, 26 May 2025 12:09:28 -0400 Subject: [PATCH 18/45] Add getters for job recovery result --- src/spider/core/JobRecovery.cpp | 8 ++++---- src/spider/core/JobRecovery.hpp | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/spider/core/JobRecovery.cpp b/src/spider/core/JobRecovery.cpp index a3c0ee15..3880a062 100644 --- a/src/spider/core/JobRecovery.cpp +++ b/src/spider/core/JobRecovery.cpp @@ -129,11 +129,11 @@ auto JobRecovery::check_task_input(Task const& task, bool& not_persisted) -> Sto return StorageErr{}; } -auto JobRecovery::get_pending_tasks() -> std::vector { - return {}; +auto JobRecovery::get_pending_tasks() -> std::vector const& { + return m_pending_tasks; } -auto JobRecovery::get_ready_tasks() -> std::vector { - return {}; +auto JobRecovery::get_ready_tasks() -> std::vector const& { + return m_ready_tasks; } } // namespace spider::core diff --git a/src/spider/core/JobRecovery.hpp b/src/spider/core/JobRecovery.hpp index ec9249f9..2c7cc5d5 100644 --- a/src/spider/core/JobRecovery.hpp +++ b/src/spider/core/JobRecovery.hpp @@ -23,9 +23,9 @@ class JobRecovery { auto compute_graph() -> StorageErr; - auto get_ready_tasks() -> std::vector; + auto get_ready_tasks() -> std::vector const&; - auto get_pending_tasks() -> std::vector; + auto get_pending_tasks() -> std::vector const&; private: auto check_task_input(Task const& task, bool& not_persisted) -> StorageErr; From f93723d809e44139115817cd00d55f76e95a23a1 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Mon, 26 May 2025 12:16:26 -0400 Subject: [PATCH 19/45] Add docstring --- src/spider/core/JobRecovery.hpp | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/src/spider/core/JobRecovery.hpp b/src/spider/core/JobRecovery.hpp index 2c7cc5d5..9c0ec3b2 100644 --- a/src/spider/core/JobRecovery.hpp +++ b/src/spider/core/JobRecovery.hpp @@ -21,6 +21,15 @@ class JobRecovery { std::shared_ptr metadata_store ); + /** + * Recover the job by loading the task graph and data from the storage, + * compute the minimal subgraph that contains all the failed tasks and the + * data across edge are all persisted. + * The result is stored in m_ready_tasks and m_pending_tasks, where + * m_ready_tasks contains the tasks on the boundary of the subgraph, and + * m_pending_tasks contains the tasks that are not ready to run yet. + * @return StorageErr + */ auto compute_graph() -> StorageErr; auto get_ready_tasks() -> std::vector const&; @@ -28,6 +37,13 @@ class JobRecovery { auto get_pending_tasks() -> std::vector const&; private: + /** + * Check if any of the task input is not persisted. + * @param task + * @param not_persisted Returns true if any of the task input is not + * persisted, false otherwise. + * @return + */ auto check_task_input(Task const& task, bool& not_persisted) -> StorageErr; /** From 11c2cd780d735c52fbd92a898ab601ddbd31a4cb Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Mon, 26 May 2025 12:35:54 -0400 Subject: [PATCH 20/45] Fix clang-tidy --- src/spider/core/JobRecovery.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/spider/core/JobRecovery.cpp b/src/spider/core/JobRecovery.cpp index 3880a062..55a99ebf 100644 --- a/src/spider/core/JobRecovery.cpp +++ b/src/spider/core/JobRecovery.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -10,6 +11,9 @@ #include #include +#include +#include +#include #include #include #include From 584ad73672c94d73b1a39ec28b2d74fa704979b4 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Mon, 26 May 2025 12:48:35 -0400 Subject: [PATCH 21/45] Fix clang tidy --- src/spider/core/JobRecovery.hpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/spider/core/JobRecovery.hpp b/src/spider/core/JobRecovery.hpp index 9c0ec3b2..739a542b 100644 --- a/src/spider/core/JobRecovery.hpp +++ b/src/spider/core/JobRecovery.hpp @@ -7,6 +7,9 @@ #include #include +#include +#include +#include #include #include #include From e9ccff6824a33c8e28fdaef007fdd0430a535e93 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Mon, 26 May 2025 12:52:57 -0400 Subject: [PATCH 22/45] Fix clang tidy --- src/spider/core/JobRecovery.hpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/spider/core/JobRecovery.hpp b/src/spider/core/JobRecovery.hpp index 739a542b..a314df3c 100644 --- a/src/spider/core/JobRecovery.hpp +++ b/src/spider/core/JobRecovery.hpp @@ -7,6 +7,7 @@ #include #include +#include #include #include #include From b7e0911aed69761d50550227e0233beba545ac34 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Mon, 26 May 2025 13:44:04 -0400 Subject: [PATCH 23/45] Add recovery loop in scheduler --- src/spider/scheduler/scheduler.cpp | 75 ++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/src/spider/scheduler/scheduler.cpp b/src/spider/scheduler/scheduler.cpp index 8679eab4..caf09644 100644 --- a/src/spider/scheduler/scheduler.cpp +++ b/src/spider/scheduler/scheduler.cpp @@ -17,11 +17,13 @@ #include #include #include +#include #include // IWYU pragma: keep #include #include #include +#include #include // IWYU pragma: keep #include #include @@ -33,6 +35,8 @@ #include #include +#include "spider/core/JobRecovery.hpp" + constexpr int cCmdArgParseErr = 1; constexpr int cSignalHandleErr = 2; constexpr int cStorageConnectionErr = 3; @@ -42,6 +46,8 @@ constexpr int cStorageErr = 5; constexpr int cCleanupInterval = 1000; constexpr int cRetryCount = 5; +constexpr int cRecoveryInterval = 1000; + namespace { /* * Signal handler for SIGTERM. Sets the stop flag to request a stop. @@ -145,6 +151,67 @@ auto cleanup_loop( } } +auto recovery_loop( + std::shared_ptr const& storage_factory, + std::shared_ptr const& metadata_store, + std::shared_ptr const& data_store +) -> void { + while (!spider::core::StopFlag::is_stop_requested()) { + std::this_thread::sleep_for(std::chrono::seconds(cRecoveryInterval)); + spdlog::debug("Starting recovery"); + std::variant, spider::core::StorageErr> + conn_result = storage_factory->provide_storage_connection(); + if (std::holds_alternative(conn_result)) { + spdlog::error( + "Failed to connect to storage: {}", + std::get(conn_result).description + ); + continue; + } + + std::shared_ptr conn = std::move( + std::get>(conn_result) + ); + + std::vector job_ids; + spider::core::StorageErr err = metadata_store->get_failed_jobs(*conn, &job_ids); + if (false == err.success()) { + spdlog::error("Failed to get failed jobs: {}", err.description); + continue; + } + if (job_ids.empty()) { + spdlog::debug("No failed jobs found"); + continue; + } + for (boost::uuids::uuid const& job_id : job_ids) { + spdlog::debug("Recovering job: {}", to_string(job_id)); + spider::core::JobRecovery recovery{job_id, conn, data_store, metadata_store}; + err = recovery.compute_graph(); + if (false == err.success()) { + spdlog::error( + "Failed to compute graph for job {}: {}", + to_string(job_id), + err.description + ); + continue; + } + err = metadata_store->reset_tasks( + *conn, + recovery.get_ready_tasks(), + recovery.get_ready_tasks() + ); + if (false == err.success()) { + spdlog::error( + "Failed to reset tasks for job {}: {}", + to_string(job_id), + err.description + ); + continue; + } + } + } +} + constexpr int cSignalExitBase = 128; } // namespace @@ -261,8 +328,16 @@ auto main(int argc, char** argv) -> int { // Start a thread that periodically starts cleanup std::thread cleanup_thread{cleanup_loop, std::cref(storage_factory), std::cref(data_store)}; + std::thread recovery_thread{ + recovery_loop, + std::cref(storage_factory), + std::cref(metadata_store), + std::cref(data_store) + }; + heartbeat_thread.join(); cleanup_thread.join(); + recovery_thread.join(); server.stop(); } catch (std::system_error& e) { spdlog::error("Failed to join thread: {}", e.what()); From 24b858a313f804fec75603ff398211bf62475aa3 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Mon, 26 May 2025 13:49:11 -0400 Subject: [PATCH 24/45] Fix clang tidy --- src/spider/core/JobRecovery.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/spider/core/JobRecovery.cpp b/src/spider/core/JobRecovery.cpp index 55a99ebf..4f5bc252 100644 --- a/src/spider/core/JobRecovery.cpp +++ b/src/spider/core/JobRecovery.cpp @@ -119,7 +119,7 @@ auto JobRecovery::check_task_input(Task const& task, bool& not_persisted) -> Sto } boost::uuids::uuid const data_id = optional_date_id.value(); Data data; - StorageErr const err = get_data(data_id, data); + StorageErr err = get_data(data_id, data); if (false == err.success()) { return err; } From 668902b500fb13c95b2982611fbc53a86b2604e6 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Mon, 26 May 2025 14:13:15 -0400 Subject: [PATCH 25/45] Fix clang tidy --- src/spider/scheduler/scheduler.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/spider/scheduler/scheduler.cpp b/src/spider/scheduler/scheduler.cpp index caf09644..0295c7c1 100644 --- a/src/spider/scheduler/scheduler.cpp +++ b/src/spider/scheduler/scheduler.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -35,8 +36,6 @@ #include #include -#include "spider/core/JobRecovery.hpp" - constexpr int cCmdArgParseErr = 1; constexpr int cSignalHandleErr = 2; constexpr int cStorageConnectionErr = 3; @@ -169,7 +168,7 @@ auto recovery_loop( continue; } - std::shared_ptr conn = std::move( + std::shared_ptr const conn = std::move( std::get>(conn_result) ); From dc38efc424a05985946c1eb7d36ef11ee2531ca1 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Mon, 26 May 2025 14:28:39 -0400 Subject: [PATCH 26/45] Bug fix --- src/spider/core/JobRecovery.cpp | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/spider/core/JobRecovery.cpp b/src/spider/core/JobRecovery.cpp index 4f5bc252..1fc0afda 100644 --- a/src/spider/core/JobRecovery.cpp +++ b/src/spider/core/JobRecovery.cpp @@ -47,8 +47,9 @@ auto JobRecovery::compute_graph() -> StorageErr { absl::flat_hash_set ready_task_set; absl::flat_hash_set pending_task_set; // For each task pop from the set, check if its inputs contains non-persisted Data. - // If so, add it to the pending task set and add parent in the task_set. Otherwise, add it to - // the ready task set. + // If the task has non-persisted Data input and has parents, add it to pending tasks and add + // its parents to the working queue. If the task has non-persisted Data input and has no + // parents, or the task has all its inputs persisted, add it to ready tasks. std::deque working_queue; for (auto const& task_id : task_set) { working_queue.push_back(task_id); @@ -70,12 +71,16 @@ auto JobRecovery::compute_graph() -> StorageErr { return err; } if (not_persisted) { - pending_task_set.insert(task_id); std::vector const parents = m_task_graph.get_parent_tasks(task_id); - for (auto const& parent_id : parents) { - if (false == task_set.contains(parent_id)) { - working_queue.push_back(parent_id); - task_set.insert(parent_id); + if (parents.empty()) { + ready_task_set.insert(task_id); + } else { + pending_task_set.insert(task_id); + for (auto const& parent_id : parents) { + if (false == task_set.contains(parent_id)) { + working_queue.push_back(parent_id); + task_set.insert(parent_id); + } } } } else { From dc6918e5ee23d576bb56140cb782f4f45b65c525 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Mon, 26 May 2025 14:29:01 -0400 Subject: [PATCH 27/45] Add simple job recovery unit test --- tests/CMakeLists.txt | 1 + tests/scheduler/test-JobRecovery.cpp | 179 +++++++++++++++++++++++++++ 2 files changed, 180 insertions(+) create mode 100644 tests/scheduler/test-JobRecovery.cpp diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index a04b1c58..d273c154 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -10,6 +10,7 @@ set(SPIDER_TEST_SOURCES worker/test-TaskExecutor.cpp worker/test-Process.cpp io/test-MsgpackMessage.cpp + scheduler/test-JobRecovery.cpp scheduler/test-SchedulerPolicy.cpp scheduler/test-SchedulerServer.cpp client/test-Driver.cpp diff --git a/tests/scheduler/test-JobRecovery.cpp b/tests/scheduler/test-JobRecovery.cpp new file mode 100644 index 00000000..4205f365 --- /dev/null +++ b/tests/scheduler/test-JobRecovery.cpp @@ -0,0 +1,179 @@ +// NOLINTBEGIN(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays,clang-analyzer-optin.core.EnumCastOutOfRange) + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace { +TEMPLATE_LIST_TEST_CASE("Recovery single task", "[storage]", spider::test::StorageFactoryTypeList) { + std::shared_ptr const storage_factory + = spider::test::create_storage_factory(); + std::shared_ptr const metadata_store + = storage_factory->provide_metadata_storage(); + std::shared_ptr const data_store + = storage_factory->provide_data_storage(); + + std::variant, spider::core::StorageErr> + conn_result = storage_factory->provide_storage_connection(); + REQUIRE(std::holds_alternative>(conn_result)); + std::shared_ptr const conn + = std::move(std::get>(conn_result)); + + boost::uuids::random_generator gen; + + boost::uuids::uuid const job_id = gen(); + boost::uuids::uuid const client_id = gen(); + // Submit task without data + spider::core::Task task{"task"}; + REQUIRE(metadata_store->add_driver(*conn, spider::core::Driver{client_id}).success()); + task.add_input(spider::core::TaskInput{"10", "int"}); + task.add_output(spider::core::TaskOutput{"int"}); + spider::core::TaskGraph graph; + graph.add_task(task); + graph.add_input_task(task.get_id()); + graph.add_output_task(task.get_id()); + REQUIRE(metadata_store->add_job(*conn, job_id, client_id, graph).success()); + + // Set task as failed + REQUIRE(metadata_store->set_task_state(*conn, task.get_id(), spider::core::TaskState::Failed) + .success()); + + // Recover the job + spider::core::JobRecovery recovery{job_id, conn, data_store, metadata_store}; + REQUIRE(recovery.compute_graph().success()); + auto const& ready_tasks = recovery.get_ready_tasks(); + auto const& pending_tasks = recovery.get_pending_tasks(); + REQUIRE(ready_tasks.size() == 1); + REQUIRE(pending_tasks.empty()); + REQUIRE(ready_tasks[0] == task.get_id()); + + REQUIRE(metadata_store->remove_job(*conn, job_id).success()); +} + +TEMPLATE_LIST_TEST_CASE( + "Recovery single task with data", + "[storage]", + spider::test::StorageFactoryTypeList +) { + std::shared_ptr const storage_factory + = spider::test::create_storage_factory(); + std::shared_ptr const metadata_store + = storage_factory->provide_metadata_storage(); + std::shared_ptr const data_store + = storage_factory->provide_data_storage(); + + std::variant, spider::core::StorageErr> + conn_result = storage_factory->provide_storage_connection(); + REQUIRE(std::holds_alternative>(conn_result)); + std::shared_ptr const conn + = std::move(std::get>(conn_result)); + + boost::uuids::random_generator gen; + + boost::uuids::uuid const job_id = gen(); + boost::uuids::uuid const client_id = gen(); + // Submit task without data + spider::core::Task task{"task"}; + spider::core::Data data{"data"}; + REQUIRE(metadata_store->add_driver(*conn, spider::core::Driver{client_id}).success()); + REQUIRE(data_store->add_driver_data(*conn, client_id, data).success()); + task.add_input(spider::core::TaskInput{data.get_id()}); + task.add_output(spider::core::TaskOutput{"int"}); + spider::core::TaskGraph graph; + graph.add_task(task); + graph.add_input_task(task.get_id()); + graph.add_output_task(task.get_id()); + REQUIRE(metadata_store->add_job(*conn, job_id, client_id, graph).success()); + + // Set task as failed + REQUIRE(metadata_store->set_task_state(*conn, task.get_id(), spider::core::TaskState::Failed) + .success()); + + // Recover the job + spider::core::JobRecovery recovery{job_id, conn, data_store, metadata_store}; + REQUIRE(recovery.compute_graph().success()); + auto const& ready_tasks = recovery.get_ready_tasks(); + auto const& pending_tasks = recovery.get_pending_tasks(); + REQUIRE(ready_tasks.size() == 1); + REQUIRE(pending_tasks.empty()); + REQUIRE(ready_tasks[0] == task.get_id()); + + REQUIRE(metadata_store->remove_job(*conn, job_id).success()); + REQUIRE(data_store->remove_data(*conn, data.get_id()).success()); +} + +TEMPLATE_LIST_TEST_CASE( + "Recovery single task with persisted data", + "[storage]", + spider::test::StorageFactoryTypeList +) { + std::shared_ptr const storage_factory + = spider::test::create_storage_factory(); + std::shared_ptr const metadata_store + = storage_factory->provide_metadata_storage(); + std::shared_ptr const data_store + = storage_factory->provide_data_storage(); + + std::variant, spider::core::StorageErr> + conn_result = storage_factory->provide_storage_connection(); + REQUIRE(std::holds_alternative>(conn_result)); + std::shared_ptr const conn + = std::move(std::get>(conn_result)); + + boost::uuids::random_generator gen; + + boost::uuids::uuid const job_id = gen(); + boost::uuids::uuid const client_id = gen(); + // Submit task without data + spider::core::Task task{"task"}; + spider::core::Data data{"data"}; + data.set_persisted(true); + REQUIRE(metadata_store->add_driver(*conn, spider::core::Driver{client_id}).success()); + REQUIRE(data_store->add_driver_data(*conn, client_id, data).success()); + task.add_input(spider::core::TaskInput{data.get_id()}); + task.add_output(spider::core::TaskOutput{"int"}); + spider::core::TaskGraph graph; + graph.add_task(task); + graph.add_input_task(task.get_id()); + graph.add_output_task(task.get_id()); + REQUIRE(metadata_store->add_job(*conn, job_id, client_id, graph).success()); + + // Set task as failed + REQUIRE(metadata_store->set_task_state(*conn, task.get_id(), spider::core::TaskState::Failed) + .success()); + + // Recover the job + spider::core::JobRecovery recovery{job_id, conn, data_store, metadata_store}; + REQUIRE(recovery.compute_graph().success()); + auto const& ready_tasks = recovery.get_ready_tasks(); + auto const& pending_tasks = recovery.get_pending_tasks(); + REQUIRE(ready_tasks.size() == 1); + REQUIRE(pending_tasks.empty()); + REQUIRE(ready_tasks[0] == task.get_id()); + + REQUIRE(metadata_store->remove_job(*conn, job_id).success()); + REQUIRE(data_store->remove_data(*conn, data.get_id()).success()); +} +} // namespace + +// NOLINTEND(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays,clang-analyzer-optin.core.EnumCastOutOfRange) From 14f2b94ce3b8b0ba297b9ded3088b9cb2062c57a Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Wed, 28 May 2025 14:52:03 -0400 Subject: [PATCH 28/45] Adding children of rollbacked task into queue --- src/spider/core/JobRecovery.cpp | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/spider/core/JobRecovery.cpp b/src/spider/core/JobRecovery.cpp index 1fc0afda..889bde21 100644 --- a/src/spider/core/JobRecovery.cpp +++ b/src/spider/core/JobRecovery.cpp @@ -36,7 +36,6 @@ auto JobRecovery::compute_graph() -> StorageErr { return err; } - // Get all the failed tasks absl::flat_hash_set task_set; for (auto const& [task_id, task] : m_task_graph.get_tasks()) { if (TaskState::Failed == task.get_state()) { @@ -47,6 +46,7 @@ auto JobRecovery::compute_graph() -> StorageErr { absl::flat_hash_set ready_task_set; absl::flat_hash_set pending_task_set; // For each task pop from the set, check if its inputs contains non-persisted Data. + // Add the non-pending children of the task to the working queue. // If the task has non-persisted Data input and has parents, add it to pending tasks and add // its parents to the working queue. If the task has non-persisted Data input and has no // parents, or the task has all its inputs persisted, add it to ready tasks. @@ -70,6 +70,23 @@ auto JobRecovery::compute_graph() -> StorageErr { if (false == err.success()) { return err; } + for (boost::uuids::uuid const& child_id : m_task_graph.get_child_tasks(task_id)) { + if (task_set.contains(child_id)) { + continue; + } + std::optional optional_child_task = m_task_graph.get_task(child_id); + if (false == optional_child_task.has_value()) { + return StorageErr{ + StorageErrType::KeyNotFoundErr, + fmt::format("No task with id {}", to_string(child_id)) + }; + } + Task const& child_task = *optional_child_task.value(); + if (TaskState::Pending != child_task.get_state()) { + working_queue.push_back(child_id); + task_set.insert(child_id); + } + } if (not_persisted) { std::vector const parents = m_task_graph.get_parent_tasks(task_id); if (parents.empty()) { From 0307ddabd5e4eea5b70d70aae46580be64bce30f Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Wed, 28 May 2025 16:11:16 -0400 Subject: [PATCH 29/45] Break up job recovery step into separate function --- src/spider/core/JobRecovery.cpp | 149 +++++++++++++-------------- src/spider/core/JobRecovery.hpp | 26 ++++- tests/scheduler/test-JobRecovery.cpp | 8 +- 3 files changed, 100 insertions(+), 83 deletions(-) diff --git a/src/spider/core/JobRecovery.cpp b/src/spider/core/JobRecovery.cpp index 889bde21..8a423ec0 100644 --- a/src/spider/core/JobRecovery.cpp +++ b/src/spider/core/JobRecovery.cpp @@ -36,85 +36,20 @@ auto JobRecovery::compute_graph() -> StorageErr { return err; } - absl::flat_hash_set task_set; for (auto const& [task_id, task] : m_task_graph.get_tasks()) { if (TaskState::Failed == task.get_state()) { - task_set.insert(task_id); + m_task_set.insert(task_id); + m_task_queue.push_front(task_id); } } - absl::flat_hash_set ready_task_set; - absl::flat_hash_set pending_task_set; - // For each task pop from the set, check if its inputs contains non-persisted Data. - // Add the non-pending children of the task to the working queue. - // If the task has non-persisted Data input and has parents, add it to pending tasks and add - // its parents to the working queue. If the task has non-persisted Data input and has no - // parents, or the task has all its inputs persisted, add it to ready tasks. - std::deque working_queue; - for (auto const& task_id : task_set) { - working_queue.push_back(task_id); - } - while (!working_queue.empty()) { - auto const task_id = working_queue.front(); - working_queue.pop_front(); - std::optional optional_task = m_task_graph.get_task(task_id); - if (false == optional_task.has_value()) { - return StorageErr{ - StorageErrType::KeyNotFoundErr, - fmt::format("No task with id {}", to_string(task_id)) - }; - } - Task const& task = *optional_task.value(); - bool not_persisted = false; - err = check_task_input(task, not_persisted); + while (!m_task_queue.empty()) { + auto const task_id = m_task_queue.front(); + m_task_queue.pop_front(); + err = process_task(task_id); if (false == err.success()) { return err; } - for (boost::uuids::uuid const& child_id : m_task_graph.get_child_tasks(task_id)) { - if (task_set.contains(child_id)) { - continue; - } - std::optional optional_child_task = m_task_graph.get_task(child_id); - if (false == optional_child_task.has_value()) { - return StorageErr{ - StorageErrType::KeyNotFoundErr, - fmt::format("No task with id {}", to_string(child_id)) - }; - } - Task const& child_task = *optional_child_task.value(); - if (TaskState::Pending != child_task.get_state()) { - working_queue.push_back(child_id); - task_set.insert(child_id); - } - } - if (not_persisted) { - std::vector const parents = m_task_graph.get_parent_tasks(task_id); - if (parents.empty()) { - ready_task_set.insert(task_id); - } else { - pending_task_set.insert(task_id); - for (auto const& parent_id : parents) { - if (false == task_set.contains(parent_id)) { - working_queue.push_back(parent_id); - task_set.insert(parent_id); - } - } - } - } else { - ready_task_set.insert(task_id); - } - } - - // Set the pending and ready tasks - m_pending_tasks.clear(); - m_pending_tasks.reserve(pending_task_set.size()); - for (auto const& task_id : pending_task_set) { - m_pending_tasks.push_back(task_id); - } - m_ready_tasks.clear(); - m_ready_tasks.reserve(ready_task_set.size()); - for (auto const& task_id : ready_task_set) { - m_ready_tasks.push_back(task_id); } return StorageErr{}; @@ -155,11 +90,75 @@ auto JobRecovery::check_task_input(Task const& task, bool& not_persisted) -> Sto return StorageErr{}; } -auto JobRecovery::get_pending_tasks() -> std::vector const& { - return m_pending_tasks; +auto JobRecovery::process_task(boost::uuids::uuid task_id) -> StorageErr { + std::optional const optional_task = m_task_graph.get_task(task_id); + if (false == optional_task.has_value()) { + return StorageErr{ + StorageErrType::KeyNotFoundErr, + fmt::format("No task with id {}", to_string(task_id)) + }; + } + + for (boost::uuids::uuid const& child_id : m_task_graph.get_child_tasks(task_id)) { + if (m_task_set.contains(child_id)) { + continue; + } + std::optional optional_child_task = m_task_graph.get_task(child_id); + if (false == optional_child_task.has_value()) { + return StorageErr{ + StorageErrType::KeyNotFoundErr, + fmt::format("No task with id {}", to_string(child_id)) + }; + } + Task const& child_task = *optional_child_task.value(); + if (TaskState::Pending != child_task.get_state()) { + m_task_queue.push_back(child_id); + m_task_set.insert(child_id); + } + } + + Task const& task = *optional_task.value(); + bool not_persisted = false; + StorageErr err = check_task_input(task, not_persisted); + if (false == err.success()) { + return err; + } + + if (not_persisted) { + std::vector const parents = m_task_graph.get_parent_tasks(task_id); + if (parents.empty()) { + m_ready_tasks.insert(task_id); + } else { + m_pending_tasks.insert(task_id); + for (auto const& parent_id : parents) { + if (false == m_task_set.contains(parent_id)) { + m_task_queue.push_back(parent_id); + m_task_set.insert(parent_id); + } + } + } + } else { + m_ready_tasks.insert(task_id); + } + + return StorageErr{}; +} + +auto JobRecovery::get_pending_tasks() -> std::vector { + std::vector pending_tasks; + pending_tasks.reserve(m_pending_tasks.size()); + for (auto const& task_id : m_pending_tasks) { + pending_tasks.push_back(task_id); + } + return pending_tasks; } -auto JobRecovery::get_ready_tasks() -> std::vector const& { - return m_ready_tasks; +auto JobRecovery::get_ready_tasks() -> std::vector { + std::vector ready_tasks; + ready_tasks.reserve(m_ready_tasks.size()); + for (auto const& task_id : m_ready_tasks) { + ready_tasks.push_back(task_id); + } + return ready_tasks; } } // namespace spider::core diff --git a/src/spider/core/JobRecovery.hpp b/src/spider/core/JobRecovery.hpp index a314df3c..2cbd4cf1 100644 --- a/src/spider/core/JobRecovery.hpp +++ b/src/spider/core/JobRecovery.hpp @@ -1,10 +1,12 @@ #ifndef SPIDER_CORE_JOBRECOVERY_HPP #define SPIDER_CORE_JOBRECOVERY_HPP +#include #include #include #include +#include #include #include @@ -36,9 +38,9 @@ class JobRecovery { */ auto compute_graph() -> StorageErr; - auto get_ready_tasks() -> std::vector const&; + auto get_ready_tasks() -> std::vector; - auto get_pending_tasks() -> std::vector const&; + auto get_pending_tasks() -> std::vector; private: /** @@ -60,6 +62,20 @@ class JobRecovery { */ auto get_data(boost::uuids::uuid data_id, Data& data) -> StorageErr; + /* + * Process the task from the task queue with the given task_id. + * 1. Add the non-pending children of the task to the working queue. + * 2. Check if its inputs contains non-persisted Data. + * 3. If the task has non-persisted Data input and has parents, add it to pending tasks and add + * its parents to the working queue. + * 4. If the task has non-persisted Data input and has no parents, or the task has all its + * inputs persisted, add it to ready tasks. + * + * @param task_id + * @return StorageErr + */ + auto process_task(boost::uuids::uuid task_id) -> StorageErr; + boost::uuids::uuid m_job_id; std::shared_ptr m_conn; @@ -70,8 +86,10 @@ class JobRecovery { TaskGraph m_task_graph; - std::vector m_ready_tasks; - std::vector m_pending_tasks; + absl::flat_hash_set m_task_set; + std::deque m_task_queue; + absl::flat_hash_set m_ready_tasks; + absl::flat_hash_set m_pending_tasks; }; } // namespace spider::core diff --git a/tests/scheduler/test-JobRecovery.cpp b/tests/scheduler/test-JobRecovery.cpp index 4205f365..8c1554a7 100644 --- a/tests/scheduler/test-JobRecovery.cpp +++ b/tests/scheduler/test-JobRecovery.cpp @@ -112,8 +112,8 @@ TEMPLATE_LIST_TEST_CASE( // Recover the job spider::core::JobRecovery recovery{job_id, conn, data_store, metadata_store}; REQUIRE(recovery.compute_graph().success()); - auto const& ready_tasks = recovery.get_ready_tasks(); - auto const& pending_tasks = recovery.get_pending_tasks(); + auto ready_tasks = recovery.get_ready_tasks(); + auto pending_tasks = recovery.get_pending_tasks(); REQUIRE(ready_tasks.size() == 1); REQUIRE(pending_tasks.empty()); REQUIRE(ready_tasks[0] == task.get_id()); @@ -165,8 +165,8 @@ TEMPLATE_LIST_TEST_CASE( // Recover the job spider::core::JobRecovery recovery{job_id, conn, data_store, metadata_store}; REQUIRE(recovery.compute_graph().success()); - auto const& ready_tasks = recovery.get_ready_tasks(); - auto const& pending_tasks = recovery.get_pending_tasks(); + auto ready_tasks = recovery.get_ready_tasks(); + auto pending_tasks = recovery.get_pending_tasks(); REQUIRE(ready_tasks.size() == 1); REQUIRE(pending_tasks.empty()); REQUIRE(ready_tasks[0] == task.get_id()); From 0ef1aa22227651e6f403f599720b2e5635d46a8b Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Wed, 28 May 2025 16:23:45 -0400 Subject: [PATCH 30/45] Fix clang-tidy --- tests/scheduler/test-JobRecovery.cpp | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/scheduler/test-JobRecovery.cpp b/tests/scheduler/test-JobRecovery.cpp index 8c1554a7..5264b8e6 100644 --- a/tests/scheduler/test-JobRecovery.cpp +++ b/tests/scheduler/test-JobRecovery.cpp @@ -1,9 +1,6 @@ // NOLINTBEGIN(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays,clang-analyzer-optin.core.EnumCastOutOfRange) -#include #include -#include -#include #include #include @@ -94,7 +91,7 @@ TEMPLATE_LIST_TEST_CASE( boost::uuids::uuid const client_id = gen(); // Submit task without data spider::core::Task task{"task"}; - spider::core::Data data{"data"}; + spider::core::Data const data{"data"}; REQUIRE(metadata_store->add_driver(*conn, spider::core::Driver{client_id}).success()); REQUIRE(data_store->add_driver_data(*conn, client_id, data).success()); task.add_input(spider::core::TaskInput{data.get_id()}); From 8838cda44b7f0dcec22d0c06da5dd256ec7dcd52 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Thu, 29 May 2025 11:29:16 -0400 Subject: [PATCH 31/45] Fix typo --- src/spider/core/JobRecovery.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/spider/core/JobRecovery.cpp b/src/spider/core/JobRecovery.cpp index 8a423ec0..c5d3b1c2 100644 --- a/src/spider/core/JobRecovery.cpp +++ b/src/spider/core/JobRecovery.cpp @@ -70,11 +70,11 @@ auto JobRecovery::get_data(boost::uuids::uuid data_id, Data& data) -> StorageErr auto JobRecovery::check_task_input(Task const& task, bool& not_persisted) -> StorageErr { for (auto const& task_input : task.get_inputs()) { - std::optional optional_date_id = task_input.get_data_id(); - if (false == optional_date_id.has_value()) { + std::optional optional_data_id = task_input.get_data_id(); + if (false == optional_data_id.has_value()) { continue; } - boost::uuids::uuid const data_id = optional_date_id.value(); + boost::uuids::uuid const data_id = optional_data_id.value(); Data data; StorageErr err = get_data(data_id, data); if (false == err.success()) { From 7172eee5db5d43870bba904d89d20f7b54d8cb3c Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Thu, 29 May 2025 11:29:36 -0400 Subject: [PATCH 32/45] Add multiple task recovery test --- tests/scheduler/test-JobRecovery.cpp | 127 +++++++++++++++++++++++++++ 1 file changed, 127 insertions(+) diff --git a/tests/scheduler/test-JobRecovery.cpp b/tests/scheduler/test-JobRecovery.cpp index 5264b8e6..f1e8379f 100644 --- a/tests/scheduler/test-JobRecovery.cpp +++ b/tests/scheduler/test-JobRecovery.cpp @@ -171,6 +171,133 @@ TEMPLATE_LIST_TEST_CASE( REQUIRE(metadata_store->remove_job(*conn, job_id).success()); REQUIRE(data_store->remove_data(*conn, data.get_id()).success()); } + +/** + * Recovers a job with multiple tasks. The task graph is: + * T1 T2 + * / \ / \ + * / \/ \ + * T3 T4 T5 + * . . . + * | | | + * T6 T7 T8 + */ +TEMPLATE_LIST_TEST_CASE( + "Recovery multiple tasks", + "[storage]", + spider::test::StorageFactoryTypeList +) { + std::shared_ptr const storage_factory + = spider::test::create_storage_factory(); + std::shared_ptr const metadata_store + = storage_factory->provide_metadata_storage(); + std::shared_ptr const data_store + = storage_factory->provide_data_storage(); + + std::variant, spider::core::StorageErr> + conn_result = storage_factory->provide_storage_connection(); + REQUIRE(std::holds_alternative>(conn_result)); + std::shared_ptr const conn + = std::move(std::get>(conn_result)); + + boost::uuids::random_generator gen; + + boost::uuids::uuid const job_id = gen(); + boost::uuids::uuid const client_id = gen(); + // Build task graph with multiple tasks + spider::core::Task task1{"task1"}; + task1.add_input(spider::core::TaskInput{"10", "int"}); + spider::core::Data data1{"data1"}; + data1.set_persisted(true); + task1.add_output(spider::core::TaskOutput{data1.get_id()}); + spider::core::Task task2{"task2"}; + task2.add_input(spider::core::TaskInput{"10", "int"}); + spider::core::Data data2{"data2"}; + data2.set_persisted(true); + task2.add_output(spider::core::TaskOutput{data2.get_id()}); + spider::core::Task task3{"task3"}; + task3.add_input(spider::core::TaskInput{data1.get_id()}); + spider::core::Data const data3{"data3"}; + task3.add_output(spider::core::TaskOutput{data3.get_id()}); + spider::core::Task task4{"task4"}; + task4.add_input(spider::core::TaskInput{data1.get_id()}); + task4.add_input(spider::core::TaskInput{data2.get_id()}); + spider::core::Data const data4{"data4"}; + task4.add_output(spider::core::TaskOutput{data4.get_id()}); + spider::core::Task task5{"task5"}; + task5.add_input(spider::core::TaskInput{data2.get_id()}); + spider::core::Data const data5{"data5"}; + task5.add_output(spider::core::TaskOutput{data5.get_id()}); + spider::core::Task task6{"task6"}; + task6.add_input(spider::core::TaskInput{data3.get_id()}); + task6.add_output(spider::core::TaskOutput{"int"}); + spider::core::Task task7{"task7"}; + task7.add_input(spider::core::TaskInput{data4.get_id()}); + task7.add_output(spider::core::TaskOutput{"int"}); + spider::core::Task task8{"task8"}; + task8.add_input(spider::core::TaskInput{data5.get_id()}); + task8.add_output(spider::core::TaskOutput{"int"}); + spider::core::TaskGraph graph; + graph.add_task(task1); + graph.add_task(task2); + graph.add_task(task3); + graph.add_task(task4); + graph.add_task(task5); + graph.add_task(task6); + graph.add_task(task7); + graph.add_task(task8); + graph.add_input_task(task1.get_id()); + graph.add_input_task(task2.get_id()); + graph.add_output_task(task6.get_id()); + graph.add_output_task(task7.get_id()); + graph.add_output_task(task8.get_id()); + graph.add_dependency(task1.get_id(), task3.get_id()); + graph.add_dependency(task1.get_id(), task4.get_id()); + graph.add_dependency(task2.get_id(), task4.get_id()); + graph.add_dependency(task2.get_id(), task5.get_id()); + graph.add_dependency(task3.get_id(), task6.get_id()); + graph.add_dependency(task4.get_id(), task7.get_id()); + graph.add_dependency(task5.get_id(), task8.get_id()); + REQUIRE(metadata_store->add_driver(*conn, spider::core::Driver{client_id}).success()); + REQUIRE(data_store->add_driver_data(*conn, client_id, data1).success()); + REQUIRE(data_store->add_driver_data(*conn, client_id, data2).success()); + REQUIRE(data_store->add_driver_data(*conn, client_id, data3).success()); + REQUIRE(data_store->add_driver_data(*conn, client_id, data4).success()); + REQUIRE(data_store->add_driver_data(*conn, client_id, data5).success()); + REQUIRE(metadata_store->add_job(*conn, job_id, client_id, graph).success()); + REQUIRE(metadata_store->set_task_state(*conn, task1.get_id(), spider::core::TaskState::Succeed) + .success()); + REQUIRE(metadata_store->set_task_state(*conn, task2.get_id(), spider::core::TaskState::Succeed) + .success()); + REQUIRE(metadata_store->set_task_state(*conn, task3.get_id(), spider::core::TaskState::Ready) + .success()); + REQUIRE(metadata_store->set_task_state(*conn, task4.get_id(), spider::core::TaskState::Succeed) + .success()); + REQUIRE(metadata_store->set_task_state(*conn, task5.get_id(), spider::core::TaskState::Succeed) + .success()); + REQUIRE(metadata_store->set_task_state(*conn, task6.get_id(), spider::core::TaskState::Pending) + .success()); + REQUIRE(metadata_store->set_task_state(*conn, task7.get_id(), spider::core::TaskState::Ready) + .success()); + REQUIRE(metadata_store->set_task_state(*conn, task8.get_id(), spider::core::TaskState::Failed) + .success()); + + spider::core::JobRecovery recovery{job_id, conn, data_store, metadata_store}; + REQUIRE(recovery.compute_graph().success()); + auto ready_tasks = recovery.get_ready_tasks(); + auto pending_tasks = recovery.get_pending_tasks(); + REQUIRE(ready_tasks.size() == 1); + REQUIRE(ready_tasks[0] == task5.get_id()); + REQUIRE(pending_tasks.size() == 1); + REQUIRE(pending_tasks[0] == task8.get_id()); + + REQUIRE(metadata_store->remove_job(*conn, job_id).success()); + REQUIRE(data_store->remove_data(*conn, data1.get_id()).success()); + REQUIRE(data_store->remove_data(*conn, data2.get_id()).success()); + REQUIRE(data_store->remove_data(*conn, data3.get_id()).success()); + REQUIRE(data_store->remove_data(*conn, data4.get_id()).success()); + REQUIRE(data_store->remove_data(*conn, data5.get_id()).success()); +} } // namespace // NOLINTEND(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays,clang-analyzer-optin.core.EnumCastOutOfRange) From b09b56aabebba2b64dc0adfe5c24805af6e41d07 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Thu, 29 May 2025 11:30:26 -0400 Subject: [PATCH 33/45] Add cleanup for tests --- tests/scheduler/test-JobRecovery.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/scheduler/test-JobRecovery.cpp b/tests/scheduler/test-JobRecovery.cpp index f1e8379f..c3318fe4 100644 --- a/tests/scheduler/test-JobRecovery.cpp +++ b/tests/scheduler/test-JobRecovery.cpp @@ -297,6 +297,7 @@ TEMPLATE_LIST_TEST_CASE( REQUIRE(data_store->remove_data(*conn, data3.get_id()).success()); REQUIRE(data_store->remove_data(*conn, data4.get_id()).success()); REQUIRE(data_store->remove_data(*conn, data5.get_id()).success()); + REQUIRE(metadata_store->remove_driver(*conn, client_id).success()); } } // namespace From cfb0159b61dba28d43256f13be5634abff496019 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Thu, 29 May 2025 11:35:30 -0400 Subject: [PATCH 34/45] Add driver removel in tests --- tests/scheduler/test-JobRecovery.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/scheduler/test-JobRecovery.cpp b/tests/scheduler/test-JobRecovery.cpp index c3318fe4..2fa179cf 100644 --- a/tests/scheduler/test-JobRecovery.cpp +++ b/tests/scheduler/test-JobRecovery.cpp @@ -65,6 +65,7 @@ TEMPLATE_LIST_TEST_CASE("Recovery single task", "[storage]", spider::test::Stora REQUIRE(ready_tasks[0] == task.get_id()); REQUIRE(metadata_store->remove_job(*conn, job_id).success()); + REQUIRE(metadata_store->remove_driver(*conn, client_id).success()); } TEMPLATE_LIST_TEST_CASE( @@ -117,6 +118,7 @@ TEMPLATE_LIST_TEST_CASE( REQUIRE(metadata_store->remove_job(*conn, job_id).success()); REQUIRE(data_store->remove_data(*conn, data.get_id()).success()); + REQUIRE(metadata_store->remove_driver(*conn, client_id).success()); } TEMPLATE_LIST_TEST_CASE( @@ -170,6 +172,7 @@ TEMPLATE_LIST_TEST_CASE( REQUIRE(metadata_store->remove_job(*conn, job_id).success()); REQUIRE(data_store->remove_data(*conn, data.get_id()).success()); + REQUIRE(metadata_store->remove_driver(*conn, client_id).success()); } /** From 5a2d396788d545aa1df61b8816cdcfc8ea472d4b Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Thu, 29 May 2025 11:51:55 -0400 Subject: [PATCH 35/45] Add more tests for job recovery --- tests/scheduler/test-JobRecovery.cpp | 131 +++++++++++++++++++++++++++ 1 file changed, 131 insertions(+) diff --git a/tests/scheduler/test-JobRecovery.cpp b/tests/scheduler/test-JobRecovery.cpp index 2fa179cf..703f88c7 100644 --- a/tests/scheduler/test-JobRecovery.cpp +++ b/tests/scheduler/test-JobRecovery.cpp @@ -1,6 +1,7 @@ // NOLINTBEGIN(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays,clang-analyzer-optin.core.EnumCastOutOfRange) #include +#include #include #include @@ -302,6 +303,136 @@ TEMPLATE_LIST_TEST_CASE( REQUIRE(data_store->remove_data(*conn, data5.get_id()).success()); REQUIRE(metadata_store->remove_driver(*conn, client_id).success()); } + +/** + * Recovers a job with multiple tasks. The task graph is: + * T1 T2 + * / \ . . + * / \/ \ + * T3 T4 T5 + * . . . + * | | | + * T6 T7 T8 + */ +TEMPLATE_LIST_TEST_CASE( + "Recovery multiple tasks with children", + "[storage]", + spider::test::StorageFactoryTypeList +) { + std::shared_ptr const storage_factory + = spider::test::create_storage_factory(); + std::shared_ptr const metadata_store + = storage_factory->provide_metadata_storage(); + std::shared_ptr const data_store + = storage_factory->provide_data_storage(); + + std::variant, spider::core::StorageErr> + conn_result = storage_factory->provide_storage_connection(); + REQUIRE(std::holds_alternative>(conn_result)); + std::shared_ptr const conn + = std::move(std::get>(conn_result)); + + boost::uuids::random_generator gen; + + boost::uuids::uuid const job_id = gen(); + boost::uuids::uuid const client_id = gen(); + // Build task graph with multiple tasks + spider::core::Task task1{"task1"}; + task1.add_input(spider::core::TaskInput{"10", "int"}); + spider::core::Data data1{"data1"}; + data1.set_persisted(true); + task1.add_output(spider::core::TaskOutput{data1.get_id()}); + spider::core::Task task2{"task2"}; + task2.add_input(spider::core::TaskInput{"10", "int"}); + spider::core::Data data2{"data2"}; + task2.add_output(spider::core::TaskOutput{data2.get_id()}); + spider::core::Task task3{"task3"}; + task3.add_input(spider::core::TaskInput{data1.get_id()}); + spider::core::Data const data3{"data3"}; + task3.add_output(spider::core::TaskOutput{data3.get_id()}); + spider::core::Task task4{"task4"}; + task4.add_input(spider::core::TaskInput{data1.get_id()}); + task4.add_input(spider::core::TaskInput{data2.get_id()}); + spider::core::Data const data4{"data4"}; + task4.add_output(spider::core::TaskOutput{data4.get_id()}); + spider::core::Task task5{"task5"}; + task5.add_input(spider::core::TaskInput{data2.get_id()}); + spider::core::Data const data5{"data5"}; + task5.add_output(spider::core::TaskOutput{data5.get_id()}); + spider::core::Task task6{"task6"}; + task6.add_input(spider::core::TaskInput{data3.get_id()}); + task6.add_output(spider::core::TaskOutput{"int"}); + spider::core::Task task7{"task7"}; + task7.add_input(spider::core::TaskInput{data4.get_id()}); + task7.add_output(spider::core::TaskOutput{"int"}); + spider::core::Task task8{"task8"}; + task8.add_input(spider::core::TaskInput{data5.get_id()}); + task8.add_output(spider::core::TaskOutput{"int"}); + spider::core::TaskGraph graph; + graph.add_task(task1); + graph.add_task(task2); + graph.add_task(task3); + graph.add_task(task4); + graph.add_task(task5); + graph.add_task(task6); + graph.add_task(task7); + graph.add_task(task8); + graph.add_input_task(task1.get_id()); + graph.add_input_task(task2.get_id()); + graph.add_output_task(task6.get_id()); + graph.add_output_task(task7.get_id()); + graph.add_output_task(task8.get_id()); + graph.add_dependency(task1.get_id(), task3.get_id()); + graph.add_dependency(task1.get_id(), task4.get_id()); + graph.add_dependency(task2.get_id(), task4.get_id()); + graph.add_dependency(task2.get_id(), task5.get_id()); + graph.add_dependency(task3.get_id(), task6.get_id()); + graph.add_dependency(task4.get_id(), task7.get_id()); + graph.add_dependency(task5.get_id(), task8.get_id()); + REQUIRE(metadata_store->add_driver(*conn, spider::core::Driver{client_id}).success()); + REQUIRE(data_store->add_driver_data(*conn, client_id, data1).success()); + REQUIRE(data_store->add_driver_data(*conn, client_id, data2).success()); + REQUIRE(data_store->add_driver_data(*conn, client_id, data3).success()); + REQUIRE(data_store->add_driver_data(*conn, client_id, data4).success()); + REQUIRE(data_store->add_driver_data(*conn, client_id, data5).success()); + REQUIRE(metadata_store->add_job(*conn, job_id, client_id, graph).success()); + REQUIRE(metadata_store->set_task_state(*conn, task1.get_id(), spider::core::TaskState::Succeed) + .success()); + REQUIRE(metadata_store->set_task_state(*conn, task2.get_id(), spider::core::TaskState::Succeed) + .success()); + REQUIRE(metadata_store->set_task_state(*conn, task3.get_id(), spider::core::TaskState::Ready) + .success()); + REQUIRE(metadata_store->set_task_state(*conn, task4.get_id(), spider::core::TaskState::Succeed) + .success()); + REQUIRE(metadata_store->set_task_state(*conn, task5.get_id(), spider::core::TaskState::Succeed) + .success()); + REQUIRE(metadata_store->set_task_state(*conn, task6.get_id(), spider::core::TaskState::Pending) + .success()); + REQUIRE(metadata_store->set_task_state(*conn, task7.get_id(), spider::core::TaskState::Ready) + .success()); + REQUIRE(metadata_store->set_task_state(*conn, task8.get_id(), spider::core::TaskState::Failed) + .success()); + + spider::core::JobRecovery recovery{job_id, conn, data_store, metadata_store}; + REQUIRE(recovery.compute_graph().success()); + auto ready_tasks = recovery.get_ready_tasks(); + auto pending_tasks = recovery.get_pending_tasks(); + REQUIRE(ready_tasks.size() == 1); + REQUIRE(ready_tasks[0] == task2.get_id()); + REQUIRE(pending_tasks.size() == 4); + REQUIRE(pending_tasks.end() != std::ranges::find(pending_tasks, task4.get_id())); + REQUIRE(pending_tasks.end() != std::ranges::find(pending_tasks, task5.get_id())); + REQUIRE(pending_tasks.end() != std::ranges::find(pending_tasks, task7.get_id())); + REQUIRE(pending_tasks.end() != std::ranges::find(pending_tasks, task8.get_id())); + + REQUIRE(metadata_store->remove_job(*conn, job_id).success()); + REQUIRE(data_store->remove_data(*conn, data1.get_id()).success()); + REQUIRE(data_store->remove_data(*conn, data2.get_id()).success()); + REQUIRE(data_store->remove_data(*conn, data3.get_id()).success()); + REQUIRE(data_store->remove_data(*conn, data4.get_id()).success()); + REQUIRE(data_store->remove_data(*conn, data5.get_id()).success()); + REQUIRE(metadata_store->remove_driver(*conn, client_id).success()); +} } // namespace // NOLINTEND(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays,clang-analyzer-optin.core.EnumCastOutOfRange) From 084adc87acc8a1edaf21228e725608c9189950f2 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Thu, 29 May 2025 14:13:13 -0400 Subject: [PATCH 36/45] Fix not updating task state to ready when multiple children exists --- src/spider/storage/mysql/MySqlStorage.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/spider/storage/mysql/MySqlStorage.cpp b/src/spider/storage/mysql/MySqlStorage.cpp index c2768974..21154ec6 100644 --- a/src/spider/storage/mysql/MySqlStorage.cpp +++ b/src/spider/storage/mysql/MySqlStorage.cpp @@ -1775,10 +1775,10 @@ auto MySqlMetadataStorage::task_finish( std::unique_ptr ready_statement( static_cast(conn)->prepareStatement( "UPDATE `tasks` SET `state` = 'ready' WHERE `id` IN (SELECT `task_id` FROM " - "`task_inputs` WHERE `output_task_id` = ?) AND `state` = 'pending' AND NOT " - "EXISTS (SELECT `task_id` FROM `task_inputs` WHERE `task_id` IN (SELECT " - "`task_id` FROM `task_inputs` WHERE `output_task_id` = ?) AND `value` IS " - "NULL AND `data_id` IS NULL)" + "`task_inputs` WHERE `output_task_id` = ?) AND `state` = 'pending' AND " + "`id` NOT IN (SELECT `task_id` FROM `task_inputs` WHERE `task_id` IN " + "(SELECT `task_id` FROM `task_inputs` WHERE `output_task_id` = ?) AND " + "`value` IS NULL AND `data_id` IS NULL)" ) ); ready_statement->setBytes(1, &task_id_bytes); From 0e96dc7a281d88f53a862f7acf1bec325e92210a Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Thu, 29 May 2025 14:13:47 -0400 Subject: [PATCH 37/45] Fix multiple parents --- src/spider/core/JobRecovery.cpp | 43 ++++++++++++++++++--------------- src/spider/core/JobRecovery.hpp | 13 +++++----- 2 files changed, 29 insertions(+), 27 deletions(-) diff --git a/src/spider/core/JobRecovery.cpp b/src/spider/core/JobRecovery.cpp index c5d3b1c2..4eceb1f0 100644 --- a/src/spider/core/JobRecovery.cpp +++ b/src/spider/core/JobRecovery.cpp @@ -1,8 +1,10 @@ #include "JobRecovery.hpp" +#include #include #include #include +#include #include #include @@ -68,7 +70,10 @@ auto JobRecovery::get_data(boost::uuids::uuid data_id, Data& data) -> StorageErr return err; } -auto JobRecovery::check_task_input(Task const& task, bool& not_persisted) -> StorageErr { +auto JobRecovery::check_task_input( + Task const& task, + absl::flat_hash_set& not_persisted +) -> StorageErr { for (auto const& task_input : task.get_inputs()) { std::optional optional_data_id = task_input.get_data_id(); if (false == optional_data_id.has_value()) { @@ -80,13 +85,16 @@ auto JobRecovery::check_task_input(Task const& task, bool& not_persisted) -> Sto if (false == err.success()) { return err; } - if (data.is_persisted()) { - continue; + if (false == data.is_persisted()) { + std::optional> optional_parent + = task_input.get_task_output(); + if (false == optional_parent.has_value()) { + continue; + } + boost::uuids::uuid const parent_task_id = std::get<0>(optional_parent.value()); + not_persisted.insert(parent_task_id); } - not_persisted = true; - return StorageErr{}; } - not_persisted = false; return StorageErr{}; } @@ -118,27 +126,22 @@ auto JobRecovery::process_task(boost::uuids::uuid task_id) -> StorageErr { } Task const& task = *optional_task.value(); - bool not_persisted = false; + absl::flat_hash_set not_persisted; StorageErr err = check_task_input(task, not_persisted); if (false == err.success()) { return err; } - if (not_persisted) { - std::vector const parents = m_task_graph.get_parent_tasks(task_id); - if (parents.empty()) { - m_ready_tasks.insert(task_id); - } else { - m_pending_tasks.insert(task_id); - for (auto const& parent_id : parents) { - if (false == m_task_set.contains(parent_id)) { - m_task_queue.push_back(parent_id); - m_task_set.insert(parent_id); - } + if (not_persisted.empty()) { + m_ready_tasks.insert(task_id); + } else { + m_pending_tasks.insert(task_id); + for (auto const& parent_id : not_persisted) { + if (false == m_task_set.contains(parent_id)) { + m_task_queue.push_back(parent_id); + m_task_set.insert(parent_id); } } - } else { - m_ready_tasks.insert(task_id); } return StorageErr{}; diff --git a/src/spider/core/JobRecovery.hpp b/src/spider/core/JobRecovery.hpp index 2cbd4cf1..c56c3be9 100644 --- a/src/spider/core/JobRecovery.hpp +++ b/src/spider/core/JobRecovery.hpp @@ -44,13 +44,13 @@ class JobRecovery { private: /** - * Check if any of the task input is not persisted. + * Check if task has any parents with non-persisted Data that feed into the task. * @param task - * @param not_persisted Returns true if any of the task input is not - * persisted, false otherwise. + * @param not_persisted Returns parents with non-persisted Data that feed into the task. * @return */ - auto check_task_input(Task const& task, bool& not_persisted) -> StorageErr; + auto check_task_input(Task const& task, absl::flat_hash_set& not_persisted) + -> StorageErr; /** * Get the data associated with the given data_id. If the data is cached in @@ -67,9 +67,8 @@ class JobRecovery { * 1. Add the non-pending children of the task to the working queue. * 2. Check if its inputs contains non-persisted Data. * 3. If the task has non-persisted Data input and has parents, add it to pending tasks and add - * its parents to the working queue. - * 4. If the task has non-persisted Data input and has no parents, or the task has all its - * inputs persisted, add it to ready tasks. + * its parents with non-persistent Data to the working queue. + * 4. Otherwise, add it to ready tasks. * * @param task_id * @return StorageErr From cdc841f1b6e2a683683de2e2ac7ca9a6c7fd948d Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Thu, 29 May 2025 14:13:58 -0400 Subject: [PATCH 38/45] Add more tests --- tests/scheduler/test-JobRecovery.cpp | 117 ++++++++++++++++++--------- 1 file changed, 81 insertions(+), 36 deletions(-) diff --git a/tests/scheduler/test-JobRecovery.cpp b/tests/scheduler/test-JobRecovery.cpp index 703f88c7..1568c5da 100644 --- a/tests/scheduler/test-JobRecovery.cpp +++ b/tests/scheduler/test-JobRecovery.cpp @@ -220,26 +220,26 @@ TEMPLATE_LIST_TEST_CASE( data2.set_persisted(true); task2.add_output(spider::core::TaskOutput{data2.get_id()}); spider::core::Task task3{"task3"}; - task3.add_input(spider::core::TaskInput{data1.get_id()}); + task3.add_input(spider::core::TaskInput{task1.get_id(), 0, ""}); spider::core::Data const data3{"data3"}; task3.add_output(spider::core::TaskOutput{data3.get_id()}); spider::core::Task task4{"task4"}; - task4.add_input(spider::core::TaskInput{data1.get_id()}); - task4.add_input(spider::core::TaskInput{data2.get_id()}); + task4.add_input(spider::core::TaskInput{task1.get_id(), 0, ""}); + task4.add_input(spider::core::TaskInput{task2.get_id(), 0, ""}); spider::core::Data const data4{"data4"}; task4.add_output(spider::core::TaskOutput{data4.get_id()}); spider::core::Task task5{"task5"}; - task5.add_input(spider::core::TaskInput{data2.get_id()}); + task5.add_input(spider::core::TaskInput{task2.get_id(), 0, ""}); spider::core::Data const data5{"data5"}; task5.add_output(spider::core::TaskOutput{data5.get_id()}); spider::core::Task task6{"task6"}; - task6.add_input(spider::core::TaskInput{data3.get_id()}); + task6.add_input(spider::core::TaskInput{task3.get_id(), 0, ""}); task6.add_output(spider::core::TaskOutput{"int"}); spider::core::Task task7{"task7"}; - task7.add_input(spider::core::TaskInput{data4.get_id()}); + task7.add_input(spider::core::TaskInput{task4.get_id(), 0, ""}); task7.add_output(spider::core::TaskOutput{"int"}); spider::core::Task task8{"task8"}; - task8.add_input(spider::core::TaskInput{data5.get_id()}); + task8.add_input(spider::core::TaskInput{task5.get_id(), 0, ""}); task8.add_output(spider::core::TaskOutput{"int"}); spider::core::TaskGraph graph; graph.add_task(task1); @@ -269,20 +269,47 @@ TEMPLATE_LIST_TEST_CASE( REQUIRE(data_store->add_driver_data(*conn, client_id, data4).success()); REQUIRE(data_store->add_driver_data(*conn, client_id, data5).success()); REQUIRE(metadata_store->add_job(*conn, job_id, client_id, graph).success()); - REQUIRE(metadata_store->set_task_state(*conn, task1.get_id(), spider::core::TaskState::Succeed) + REQUIRE(metadata_store->set_task_running(*conn, task1.get_id()).success()); + REQUIRE(metadata_store + ->task_finish( + *conn, + spider::core::TaskInstance{task1.get_id()}, + {spider::core::TaskOutput{data1.get_id()}} + ) .success()); - REQUIRE(metadata_store->set_task_state(*conn, task2.get_id(), spider::core::TaskState::Succeed) + REQUIRE(metadata_store->set_task_running(*conn, task2.get_id()).success()); + REQUIRE(metadata_store + ->task_finish( + *conn, + spider::core::TaskInstance{task2.get_id()}, + {spider::core::TaskOutput{data2.get_id()}} + ) .success()); - REQUIRE(metadata_store->set_task_state(*conn, task3.get_id(), spider::core::TaskState::Ready) + REQUIRE(metadata_store->set_task_running(*conn, task3.get_id()).success()); + REQUIRE(metadata_store + ->task_finish( + *conn, + spider::core::TaskInstance{task3.get_id()}, + {spider::core::TaskOutput{data3.get_id()}} + ) .success()); - REQUIRE(metadata_store->set_task_state(*conn, task4.get_id(), spider::core::TaskState::Succeed) + REQUIRE(metadata_store->set_task_running(*conn, task4.get_id()).success()); + REQUIRE(metadata_store + ->task_finish( + *conn, + spider::core::TaskInstance{task4.get_id()}, + {spider::core::TaskOutput{data4.get_id()}} + ) .success()); - REQUIRE(metadata_store->set_task_state(*conn, task5.get_id(), spider::core::TaskState::Succeed) - .success()); - REQUIRE(metadata_store->set_task_state(*conn, task6.get_id(), spider::core::TaskState::Pending) - .success()); - REQUIRE(metadata_store->set_task_state(*conn, task7.get_id(), spider::core::TaskState::Ready) + REQUIRE(metadata_store->set_task_running(*conn, task5.get_id()).success()); + REQUIRE(metadata_store + ->task_finish( + *conn, + spider::core::TaskInstance{task5.get_id()}, + {spider::core::TaskOutput{data5.get_id()}} + ) .success()); + REQUIRE(metadata_store->set_task_state(*conn, task8.get_id(), spider::core::TaskState::Failed) .success()); @@ -344,29 +371,29 @@ TEMPLATE_LIST_TEST_CASE( task1.add_output(spider::core::TaskOutput{data1.get_id()}); spider::core::Task task2{"task2"}; task2.add_input(spider::core::TaskInput{"10", "int"}); - spider::core::Data data2{"data2"}; + spider::core::Data const data2{"data2"}; task2.add_output(spider::core::TaskOutput{data2.get_id()}); spider::core::Task task3{"task3"}; - task3.add_input(spider::core::TaskInput{data1.get_id()}); + task3.add_input(spider::core::TaskInput{task1.get_id(), 0, ""}); spider::core::Data const data3{"data3"}; task3.add_output(spider::core::TaskOutput{data3.get_id()}); spider::core::Task task4{"task4"}; - task4.add_input(spider::core::TaskInput{data1.get_id()}); - task4.add_input(spider::core::TaskInput{data2.get_id()}); + task4.add_input(spider::core::TaskInput{task1.get_id(), 0, ""}); + task4.add_input(spider::core::TaskInput{task2.get_id(), 0, ""}); spider::core::Data const data4{"data4"}; task4.add_output(spider::core::TaskOutput{data4.get_id()}); spider::core::Task task5{"task5"}; - task5.add_input(spider::core::TaskInput{data2.get_id()}); + task5.add_input(spider::core::TaskInput{task2.get_id(), 0, ""}); spider::core::Data const data5{"data5"}; task5.add_output(spider::core::TaskOutput{data5.get_id()}); spider::core::Task task6{"task6"}; - task6.add_input(spider::core::TaskInput{data3.get_id()}); + task6.add_input(spider::core::TaskInput{task3.get_id(), 0, ""}); task6.add_output(spider::core::TaskOutput{"int"}); spider::core::Task task7{"task7"}; - task7.add_input(spider::core::TaskInput{data4.get_id()}); + task7.add_input(spider::core::TaskInput{task4.get_id(), 0, ""}); task7.add_output(spider::core::TaskOutput{"int"}); spider::core::Task task8{"task8"}; - task8.add_input(spider::core::TaskInput{data5.get_id()}); + task8.add_input(spider::core::TaskInput{task5.get_id(), 0, ""}); task8.add_output(spider::core::TaskOutput{"int"}); spider::core::TaskGraph graph; graph.add_task(task1); @@ -396,20 +423,39 @@ TEMPLATE_LIST_TEST_CASE( REQUIRE(data_store->add_driver_data(*conn, client_id, data4).success()); REQUIRE(data_store->add_driver_data(*conn, client_id, data5).success()); REQUIRE(metadata_store->add_job(*conn, job_id, client_id, graph).success()); - REQUIRE(metadata_store->set_task_state(*conn, task1.get_id(), spider::core::TaskState::Succeed) + REQUIRE(metadata_store->set_task_running(*conn, task1.get_id()).success()); + REQUIRE(metadata_store + ->task_finish( + *conn, + spider::core::TaskInstance{task1.get_id()}, + {spider::core::TaskOutput{data1.get_id()}} + ) .success()); - REQUIRE(metadata_store->set_task_state(*conn, task2.get_id(), spider::core::TaskState::Succeed) + REQUIRE(metadata_store->set_task_running(*conn, task2.get_id()).success()); + REQUIRE(metadata_store + ->task_finish( + *conn, + spider::core::TaskInstance{task2.get_id()}, + {spider::core::TaskOutput{data2.get_id()}} + ) .success()); - REQUIRE(metadata_store->set_task_state(*conn, task3.get_id(), spider::core::TaskState::Ready) + REQUIRE(metadata_store->set_task_running(*conn, task3.get_id()).success()); + REQUIRE(metadata_store + ->task_finish( + *conn, + spider::core::TaskInstance{task3.get_id()}, + {spider::core::TaskOutput{data3.get_id()}} + ) .success()); - REQUIRE(metadata_store->set_task_state(*conn, task4.get_id(), spider::core::TaskState::Succeed) - .success()); - REQUIRE(metadata_store->set_task_state(*conn, task5.get_id(), spider::core::TaskState::Succeed) - .success()); - REQUIRE(metadata_store->set_task_state(*conn, task6.get_id(), spider::core::TaskState::Pending) - .success()); - REQUIRE(metadata_store->set_task_state(*conn, task7.get_id(), spider::core::TaskState::Ready) + REQUIRE(metadata_store->set_task_running(*conn, task5.get_id()).success()); + REQUIRE(metadata_store + ->task_finish( + *conn, + spider::core::TaskInstance{task5.get_id()}, + {spider::core::TaskOutput{data5.get_id()}} + ) .success()); + REQUIRE(metadata_store->set_task_state(*conn, task8.get_id(), spider::core::TaskState::Failed) .success()); @@ -419,10 +465,9 @@ TEMPLATE_LIST_TEST_CASE( auto pending_tasks = recovery.get_pending_tasks(); REQUIRE(ready_tasks.size() == 1); REQUIRE(ready_tasks[0] == task2.get_id()); - REQUIRE(pending_tasks.size() == 4); + REQUIRE(pending_tasks.size() == 3); REQUIRE(pending_tasks.end() != std::ranges::find(pending_tasks, task4.get_id())); REQUIRE(pending_tasks.end() != std::ranges::find(pending_tasks, task5.get_id())); - REQUIRE(pending_tasks.end() != std::ranges::find(pending_tasks, task7.get_id())); REQUIRE(pending_tasks.end() != std::ranges::find(pending_tasks, task8.get_id())); REQUIRE(metadata_store->remove_job(*conn, job_id).success()); From 75713042bb97a1700da9107ccbe1ddf707dade3e Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Thu, 29 May 2025 14:28:22 -0400 Subject: [PATCH 39/45] Fix clang-tidy --- tests/scheduler/test-JobRecovery.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/scheduler/test-JobRecovery.cpp b/tests/scheduler/test-JobRecovery.cpp index 1568c5da..70cb9bd7 100644 --- a/tests/scheduler/test-JobRecovery.cpp +++ b/tests/scheduler/test-JobRecovery.cpp @@ -1,7 +1,7 @@ // NOLINTBEGIN(cert-err58-cpp,cppcoreguidelines-avoid-do-while,readability-function-cognitive-complexity,cppcoreguidelines-avoid-non-const-global-variables,cppcoreguidelines-avoid-c-arrays,modernize-avoid-c-arrays,clang-analyzer-optin.core.EnumCastOutOfRange) +#include #include -#include #include #include From accbdfde2598cd4fce27ae96aebb05cc209b51e7 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Thu, 29 May 2025 15:04:24 -0400 Subject: [PATCH 40/45] Use dot graph for docstring --- tests/scheduler/test-JobRecovery.cpp | 62 +++++++++++++++++++++------- 1 file changed, 48 insertions(+), 14 deletions(-) diff --git a/tests/scheduler/test-JobRecovery.cpp b/tests/scheduler/test-JobRecovery.cpp index 70cb9bd7..79dddd38 100644 --- a/tests/scheduler/test-JobRecovery.cpp +++ b/tests/scheduler/test-JobRecovery.cpp @@ -178,13 +178,30 @@ TEMPLATE_LIST_TEST_CASE( /** * Recovers a job with multiple tasks. The task graph is: - * T1 T2 - * / \ / \ - * / \/ \ - * T3 T4 T5 - * . . . - * | | | - * T6 T7 T8 + * \dot + * digraph task_graph { + * node [shape="rect"]; + * 1 [color="green"]; + * 2 [color="green"]; + * 3 [color="green"]; + * 4 [color="green"]; + * 6 [color="blue"]; + * 7 [color="blue"]; + * 1 -> 3; + * 1 -> 4; + * 2 -> 4; + * 2 -> 5; + * 3 -> 6 [style="dashed"]; + * 4 -> 7 [style="dashed"]; + * subgraph cluster_recovery { + * style=filled; + * color=yellow; + * 5 [color="green"]; + * 8 [color="red"]; + * 5 -> 8 [style="dashed"]; + * } + * } + * \enddot */ TEMPLATE_LIST_TEST_CASE( "Recovery multiple tasks", @@ -333,13 +350,30 @@ TEMPLATE_LIST_TEST_CASE( /** * Recovers a job with multiple tasks. The task graph is: - * T1 T2 - * / \ . . - * / \/ \ - * T3 T4 T5 - * . . . - * | | | - * T6 T7 T8 + * \dot + * digraph task_graph { + * node [shape="rect"]; + * 1 [color="green"]; + * 3 [color="green"]; + * 6 [color="blue"]; + * 7 [color="blue"]; + * 1 -> 3; + * 1 -> 4; + * 3 -> 6 [style="dashed"]; + * 4 -> 7 [style="dashed"]; + * subgraph cluster_recovery { + * style=filled; + * color=yellow; + * 2 [color="green"]; + * 4 [color="blue"] + * 5 [color="green"]; + * 8 [color="red"]; + * 2 -> 4 [style="dashed"]; + * 2 -> 5 [style="dashed"]; + * 5 -> 8 [style="dashed"]; + * } + * } + * \enddot */ TEMPLATE_LIST_TEST_CASE( "Recovery multiple tasks with children", From bd577498bfd6d7797d434bfea9a5117973d281b2 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Thu, 29 May 2025 16:41:38 -0400 Subject: [PATCH 41/45] Make JobRecovery getters const --- src/spider/core/JobRecovery.cpp | 4 ++-- src/spider/core/JobRecovery.hpp | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/spider/core/JobRecovery.cpp b/src/spider/core/JobRecovery.cpp index 4eceb1f0..62da1de8 100644 --- a/src/spider/core/JobRecovery.cpp +++ b/src/spider/core/JobRecovery.cpp @@ -147,7 +147,7 @@ auto JobRecovery::process_task(boost::uuids::uuid task_id) -> StorageErr { return StorageErr{}; } -auto JobRecovery::get_pending_tasks() -> std::vector { +auto JobRecovery::get_pending_tasks() const -> std::vector { std::vector pending_tasks; pending_tasks.reserve(m_pending_tasks.size()); for (auto const& task_id : m_pending_tasks) { @@ -156,7 +156,7 @@ auto JobRecovery::get_pending_tasks() -> std::vector { return pending_tasks; } -auto JobRecovery::get_ready_tasks() -> std::vector { +auto JobRecovery::get_ready_tasks() const -> std::vector { std::vector ready_tasks; ready_tasks.reserve(m_ready_tasks.size()); for (auto const& task_id : m_ready_tasks) { diff --git a/src/spider/core/JobRecovery.hpp b/src/spider/core/JobRecovery.hpp index c56c3be9..bfe51b41 100644 --- a/src/spider/core/JobRecovery.hpp +++ b/src/spider/core/JobRecovery.hpp @@ -38,9 +38,9 @@ class JobRecovery { */ auto compute_graph() -> StorageErr; - auto get_ready_tasks() -> std::vector; + [[nodiscard]] auto get_ready_tasks() const -> std::vector; - auto get_pending_tasks() -> std::vector; + [[nodiscard]] auto get_pending_tasks() const -> std::vector; private: /** From d32dc7f81b10e60f3ea3f262d60782221f278335 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Thu, 29 May 2025 16:43:54 -0400 Subject: [PATCH 42/45] Use const for ready and pending tasks in test --- tests/scheduler/test-JobRecovery.cpp | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/scheduler/test-JobRecovery.cpp b/tests/scheduler/test-JobRecovery.cpp index 79dddd38..9d27bdab 100644 --- a/tests/scheduler/test-JobRecovery.cpp +++ b/tests/scheduler/test-JobRecovery.cpp @@ -59,8 +59,8 @@ TEMPLATE_LIST_TEST_CASE("Recovery single task", "[storage]", spider::test::Stora // Recover the job spider::core::JobRecovery recovery{job_id, conn, data_store, metadata_store}; REQUIRE(recovery.compute_graph().success()); - auto const& ready_tasks = recovery.get_ready_tasks(); - auto const& pending_tasks = recovery.get_pending_tasks(); + auto const ready_tasks = recovery.get_ready_tasks(); + auto const pending_tasks = recovery.get_pending_tasks(); REQUIRE(ready_tasks.size() == 1); REQUIRE(pending_tasks.empty()); REQUIRE(ready_tasks[0] == task.get_id()); @@ -111,8 +111,8 @@ TEMPLATE_LIST_TEST_CASE( // Recover the job spider::core::JobRecovery recovery{job_id, conn, data_store, metadata_store}; REQUIRE(recovery.compute_graph().success()); - auto ready_tasks = recovery.get_ready_tasks(); - auto pending_tasks = recovery.get_pending_tasks(); + auto const ready_tasks = recovery.get_ready_tasks(); + auto const pending_tasks = recovery.get_pending_tasks(); REQUIRE(ready_tasks.size() == 1); REQUIRE(pending_tasks.empty()); REQUIRE(ready_tasks[0] == task.get_id()); @@ -165,8 +165,8 @@ TEMPLATE_LIST_TEST_CASE( // Recover the job spider::core::JobRecovery recovery{job_id, conn, data_store, metadata_store}; REQUIRE(recovery.compute_graph().success()); - auto ready_tasks = recovery.get_ready_tasks(); - auto pending_tasks = recovery.get_pending_tasks(); + auto const ready_tasks = recovery.get_ready_tasks(); + auto const pending_tasks = recovery.get_pending_tasks(); REQUIRE(ready_tasks.size() == 1); REQUIRE(pending_tasks.empty()); REQUIRE(ready_tasks[0] == task.get_id()); @@ -332,8 +332,8 @@ TEMPLATE_LIST_TEST_CASE( spider::core::JobRecovery recovery{job_id, conn, data_store, metadata_store}; REQUIRE(recovery.compute_graph().success()); - auto ready_tasks = recovery.get_ready_tasks(); - auto pending_tasks = recovery.get_pending_tasks(); + auto const ready_tasks = recovery.get_ready_tasks(); + auto const pending_tasks = recovery.get_pending_tasks(); REQUIRE(ready_tasks.size() == 1); REQUIRE(ready_tasks[0] == task5.get_id()); REQUIRE(pending_tasks.size() == 1); @@ -495,8 +495,8 @@ TEMPLATE_LIST_TEST_CASE( spider::core::JobRecovery recovery{job_id, conn, data_store, metadata_store}; REQUIRE(recovery.compute_graph().success()); - auto ready_tasks = recovery.get_ready_tasks(); - auto pending_tasks = recovery.get_pending_tasks(); + auto const ready_tasks = recovery.get_ready_tasks(); + auto const pending_tasks = recovery.get_pending_tasks(); REQUIRE(ready_tasks.size() == 1); REQUIRE(ready_tasks[0] == task2.get_id()); REQUIRE(pending_tasks.size() == 3); From f412bc385ef1a705ee78ef979eabbe69757853c2 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Thu, 29 May 2025 16:45:08 -0400 Subject: [PATCH 43/45] Bug fix --- src/spider/scheduler/scheduler.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/spider/scheduler/scheduler.cpp b/src/spider/scheduler/scheduler.cpp index 6d235b82..05359a55 100644 --- a/src/spider/scheduler/scheduler.cpp +++ b/src/spider/scheduler/scheduler.cpp @@ -197,7 +197,7 @@ auto recovery_loop( err = metadata_store->reset_tasks( *conn, recovery.get_ready_tasks(), - recovery.get_ready_tasks() + recovery.get_pending_tasks() ); if (false == err.success()) { spdlog::error( From c3f438c33f9b646814b7292c97a21e09a6dec337 Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Thu, 29 May 2025 16:47:07 -0400 Subject: [PATCH 44/45] Bug fix --- src/spider/storage/mysql/MySqlStorage.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/spider/storage/mysql/MySqlStorage.cpp b/src/spider/storage/mysql/MySqlStorage.cpp index 21154ec6..7fe7049d 100644 --- a/src/spider/storage/mysql/MySqlStorage.cpp +++ b/src/spider/storage/mysql/MySqlStorage.cpp @@ -1275,7 +1275,7 @@ auto MySqlMetadataStorage::reset_tasks( if (std::ranges::find(ready_tasks, task_id) == ready_tasks.end() && std::ranges::find(pending_tasks, task_id) == pending_tasks.end()) { - remove_data_ids.insert(task_id); + remove_data_ids.insert(data_id); } } for (boost::uuids::uuid const& id : remove_data_ids) { From 55bfdaf8bb6d3fb7e3cf34e2f8cd278ea896042b Mon Sep 17 00:00:00 2001 From: sitaowang1998 Date: Thu, 29 May 2025 16:50:53 -0400 Subject: [PATCH 45/45] Fix interval --- src/spider/scheduler/scheduler.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/spider/scheduler/scheduler.cpp b/src/spider/scheduler/scheduler.cpp index 05359a55..8c8f4011 100644 --- a/src/spider/scheduler/scheduler.cpp +++ b/src/spider/scheduler/scheduler.cpp @@ -42,10 +42,10 @@ constexpr int cStorageConnectionErr = 3; constexpr int cSchedulerAddrErr = 4; constexpr int cStorageErr = 5; -constexpr int cCleanupInterval = 1000; +constexpr int cCleanupInterval = 10; constexpr int cRetryCount = 5; -constexpr int cRecoveryInterval = 1000; +constexpr int cRecoveryInterval = 1; namespace { /*