diff --git a/src/runtime/database.cpp b/src/runtime/database.cpp
index 4b3305bd7..b4839bcea 100644
--- a/src/runtime/database.cpp
+++ b/src/runtime/database.cpp
@@ -2268,6 +2268,104 @@ void Database::start_job(long job, int64_t starttime) {
   end_txn();
 }
 
+// Collects the output files (filetree rows with access = 2) recorded for the jobs selected
+// by `filters`, grouped to one row per path and ordered by path.
+//
+// With `leaves` set, an output row is kept only when no job lacking the
+// 'inspect.visibility' = 'hidden' tag recorded the same path with access 0 or 1 in the
+// same run; the comparison is made per (job, run) pairing from run_jobs.
+//
+// If the statement cannot be prepared, the sqlite error is printed to stderr and an empty
+// vector is returned.
+std::vector Database::get_job_outputs(MatchingQueryFilters filters,
+                                      bool leaves) const {
+  // Build the job ID subquery using the same filter mechanism as matching()
+  auto id_query = build_matching_id_query(std::move(filters));
+
+  std::string query;
+  if (leaves) {
+    // Per-run leaves: outputs from selected jobs that aren't consumed within their respective runs.
+    // Always excludes hidden jobs from input consideration.
+    query = R"(
+      WITH filtered_jobs AS (
+)" + id_query +
+            R"(
+      ),
+      hidden_jobs AS (
+        SELECT job_id FROM tags
+        WHERE uri = 'inspect.visibility' AND content = 'hidden'
+      ),
+      job_runs AS (
+        SELECT DISTINCT rj.run_id
+        FROM run_jobs rj
+        WHERE rj.job_id IN (SELECT job_id FROM filtered_jobs)
+      ),
+      run_inputs AS (
+        SELECT DISTINCT f.path, rj.run_id
+        FROM filetree t
+        JOIN files f ON f.file_id = t.file_id
+        JOIN run_jobs rj ON rj.job_id = t.job_id
+        LEFT JOIN hidden_jobs hj ON hj.job_id = t.job_id
+        WHERE rj.run_id IN (SELECT run_id FROM job_runs)
+          AND t.access IN (0, 1)
+          AND hj.job_id IS NULL
+      )
+      SELECT f.path, f.hash, f.type, f.mode, MAX(t.modified) as modified
+      FROM filetree t
+      JOIN files f ON f.file_id = t.file_id
+      JOIN run_jobs rj ON rj.job_id = t.job_id
+      LEFT JOIN run_inputs ri ON ri.path = f.path AND ri.run_id = rj.run_id
+      WHERE t.job_id IN (SELECT job_id FROM filtered_jobs)
+        AND t.access = 2
+        AND ri.path IS NULL
+      GROUP BY f.path
+      ORDER BY f.path
+    )";
+  } else {
+    // All outputs from selected jobs
+    query = R"(
+      WITH filtered_jobs AS (
+)" + id_query +
+            R"(
+      )
+      SELECT f.path, f.hash, f.type, f.mode, MAX(t.modified) as modified
+      FROM filetree t
+      JOIN files f ON f.file_id = t.file_id
+      WHERE t.job_id IN (SELECT job_id FROM filtered_jobs)
+        AND t.access = 2
+      GROUP BY f.path
+      ORDER BY f.path
+    )";
+  }
+
+  sqlite3_stmt *stmt;
+  if (sqlite3_prepare_v2(imp->db, query.c_str(), -1, &stmt, 0) != SQLITE_OK) {
+    std::string err =
+        std::string("sqlite3_prepare_v2 (get_job_outputs): ") + sqlite3_errmsg(imp->db);
+    std::cerr << err << std::endl;
+    return {};
+  }
+
+  std::vector out;
+  begin_ro_txn();
+  // Columns follow the SELECT list: 0 = path, 1 = hash, 2 = type, 3 = mode, 4 = modified.
+  while (sqlite3_step(stmt) == SQLITE_ROW) {
+    std::string path = rip_column(stmt, 0);
+    std::string hash = rip_column(stmt, 1);
+    std::string type = rip_column(stmt, 2);
+    long mode = sqlite3_column_int64(stmt, 3);
+    long modified = sqlite3_column_int64(stmt, 4);
+    // NOTE(review): passed as (path, type, hash, ...), i.e. not in column order -- confirm
+    // this matches the element type's constructor, which is not visible here.
+    out.emplace_back(std::move(path), std::move(type), std::move(hash), mode, modified);
+  }
+  finish_stmt("Could not query job outputs", stmt, imp->debugdb);
+  sqlite3_finalize(stmt);
+  end_txn();
+
+  return out;
+}
+
 std::vector> Database::get_interleaved_output(long job_id) const {
   begin_ro_txn();
   auto out = get_interleaved_output_impl(this, job_id);
diff --git a/src/runtime/database.h b/src/runtime/database.h
index 7f0adf5a8..6bc231e02 100644
--- a/src/runtime/database.h
+++ b/src/runtime/database.h
@@ -237,6 +237,13 @@ struct Database {
   // Additional filtering needed to determine if runs are "actually" live.
   std::vector matching_open_runs(MatchingQueryFilters filters);
 
+  // Returns all output files (access=2) for jobs matching the provided filters.
+  // If leaves=true, only returns outputs that are not consumed as inputs within each job's run.
+  // Leaves computation always excludes hidden jobs from input consideration.
+  // Returns outputs ordered by path, with duplicates removed (last modified wins).
+ std::vector get_job_outputs(MatchingQueryFilters filters, + bool leaves = false) const; + std::vector get_file_dependencies() const; std::vector> get_interleaved_output(long job_id) const; diff --git a/tests/inspection/checkout-to-leaves/.wakeroot b/tests/inspection/checkout-to-leaves/.wakeroot new file mode 100644 index 000000000..0967ef424 --- /dev/null +++ b/tests/inspection/checkout-to-leaves/.wakeroot @@ -0,0 +1 @@ +{} diff --git a/tests/inspection/checkout-to-leaves/pass.sh b/tests/inspection/checkout-to-leaves/pass.sh new file mode 100755 index 000000000..bb3f303a3 --- /dev/null +++ b/tests/inspection/checkout-to-leaves/pass.sh @@ -0,0 +1,27 @@ +#!/bin/sh +# Test --checkout-to with --leaves: with --leaves, only outputs not consumed by +# another job in the same run are materialized. + +set -eu + +WAKE="${1:+$1/wake}" +WAKE="${WAKE:-wake}" + +cleanup() { + rm -rf wake.db* wake.log .wake .build all leaves intermediate.txt final.txt +} +trap cleanup EXIT +cleanup + +# 2-job pipeline: job-a -> intermediate.txt -> job-b -> final.txt. +"${WAKE}" -x 'build Unit' >/dev/null + +# Without --leaves: both intermediate.txt and final.txt are checked out. +echo "== --checkout-to (no --leaves) ==" +"${WAKE}" --checkout-to all +(cd all && find . -mindepth 1 | sort) + +# With --leaves: intermediate.txt is consumed by job-b so it's filtered out. +echo "== --checkout-to --leaves ==" +"${WAKE}" --checkout-to leaves --leaves +(cd leaves && find . -mindepth 1 | sort) diff --git a/tests/inspection/checkout-to-leaves/test.wake b/tests/inspection/checkout-to-leaves/test.wake new file mode 100644 index 000000000..7c3224696 --- /dev/null +++ b/tests/inspection/checkout-to-leaves/test.wake @@ -0,0 +1,21 @@ +package test_wake + +from wake import _ + +# Two-job pipeline: jobA produces intermediate.txt, jobB consumes it and writes final.txt. +# With --leaves, only final.txt is a leaf output; intermediate.txt is consumed within the run. 
+def jobA _: Result Path Error = + makeExecPlan ("sh", "-c", "printf intermediate-data > intermediate.txt", Nil) Nil + | setPlanLabel "job-a" + | runJobWith defaultRunner + | getJobOutput + +export def build _: Result Path Error = + require Pass intermediate = jobA Unit + + makeExecPlan + ("sh", "-c", "cat intermediate.txt > final.txt && printf X-final >> final.txt", Nil) + (intermediate, Nil) + | setPlanLabel "job-b" + | runJobWith defaultRunner + | getJobOutput diff --git a/tests/inspection/checkout-to/.wakeroot b/tests/inspection/checkout-to/.wakeroot new file mode 100644 index 000000000..0967ef424 --- /dev/null +++ b/tests/inspection/checkout-to/.wakeroot @@ -0,0 +1 @@ +{} diff --git a/tests/inspection/checkout-to/pass.sh b/tests/inspection/checkout-to/pass.sh new file mode 100755 index 000000000..869c99825 --- /dev/null +++ b/tests/inspection/checkout-to/pass.sh @@ -0,0 +1,39 @@ +#!/bin/sh +# Test --checkout-to: materializes job outputs into a destination directory. + +set -eu + +WAKE="${1:+$1/wake}" +WAKE="${WAKE:-wake}" + +cleanup() { + rm -rf wake.db* wake.log .wake .build out out2 nonempty notadir hello.txt +} +trap cleanup EXIT +cleanup + +"${WAKE}" -x 'build Unit' >/dev/null + +# 1. Destination doesn't exist -- created and populated. +echo "== fresh ==" +"${WAKE}" --checkout-to out +(cd out && find . -mindepth 1 | sort) +printf 'content: %s\n' "$(cat out/hello.txt)" + +# 2. Empty pre-existing dir -- also accepted. +echo "== empty existing ==" +mkdir out2 +"${WAKE}" --checkout-to out2 +(cd out2 && find . -mindepth 1 | sort) + +# 3. Non-empty pre-existing dir -- refused, contents untouched. +echo "== non-empty existing ==" +mkdir nonempty +touch nonempty/preexisting +"${WAKE}" --checkout-to nonempty || echo "exit=$?" +(cd nonempty && find . -mindepth 1 | sort) + +# 4. Existing path that isn't a directory -- refused. +echo "== not-a-directory ==" +touch notadir +"${WAKE}" --checkout-to notadir || echo "exit=$?" 
diff --git a/tests/inspection/checkout-to/test.wake b/tests/inspection/checkout-to/test.wake new file mode 100644 index 000000000..418193f68 --- /dev/null +++ b/tests/inspection/checkout-to/test.wake @@ -0,0 +1,10 @@ +package test_wake + +from wake import _ + +# Single-job build that produces "hello.txt" with known content. +export def build _: Result Path Error = + makeExecPlan ("sh", "-c", "printf hello-world > hello.txt", Nil) Nil + | setPlanLabel "build-hello" + | runJobWith defaultRunner + | getJobOutput diff --git a/tools/wake/cli_options.h b/tools/wake/cli_options.h index 2f87d9ecf..42e5635c4 100644 --- a/tools/wake/cli_options.h +++ b/tools/wake/cli_options.h @@ -69,6 +69,8 @@ struct CommandLineOptions { bool clean; bool list_outputs; bool include_hidden; + bool leaves; + const char *checkout_to; std::optional log_header_align; std::optional cache_miss_on_failure; @@ -101,6 +103,7 @@ struct CommandLineOptions { std::vector> output_files = {}; std::vector> labels = {}; std::vector> tags = {}; + std::vector> run_ids = {}; int argc; char **argv; @@ -112,6 +115,7 @@ struct CommandLineOptions { std::vector output_files_buffer(argc_in, nullptr); std::vector labels_buffer(argc_in, nullptr); std::vector tags_buffer(argc_in, nullptr); + std::vector run_ids_buffer(argc_in, nullptr); // clang-format off struct option options[] { @@ -138,6 +142,7 @@ struct CommandLineOptions { {'o', "output", GOPT_ARGUMENT_REQUIRED | GOPT_REPEATABLE_VALUE, output_files_buffer.data(), (unsigned int)argc_in}, {0, "label", GOPT_ARGUMENT_REQUIRED | GOPT_REPEATABLE_VALUE, labels_buffer.data(), (unsigned int)argc_in}, {0, "tag", GOPT_ARGUMENT_REQUIRED | GOPT_REPEATABLE_VALUE, tags_buffer.data(), (unsigned int)argc_in}, + {0, "run", GOPT_ARGUMENT_REQUIRED | GOPT_REPEATABLE_VALUE, run_ids_buffer.data(), (unsigned int)argc_in}, {'l', "last", GOPT_ARGUMENT_FORBIDDEN}, {0, "last-used", GOPT_ARGUMENT_FORBIDDEN}, {0, "last-executed", GOPT_ARGUMENT_FORBIDDEN}, @@ -188,6 +193,8 @@ struct 
CommandLineOptions {
       {0, "user-config", GOPT_ARGUMENT_REQUIRED},
       {':', "shebang", GOPT_ARGUMENT_REQUIRED},
       {0, "include-hidden", GOPT_ARGUMENT_FORBIDDEN},
+      {0, "leaves", GOPT_ARGUMENT_FORBIDDEN},
+      {0, "checkout-to", GOPT_ARGUMENT_REQUIRED},
       {0, 0, GOPT_LAST}
     };
     // clang-format on
@@ -236,6 +243,9 @@ struct CommandLineOptions {
     clean = arg(options, "clean")->count;
     list_outputs = arg(options, "list-outputs")->count;
     include_hidden = arg(options, "include-hidden")->count;
+    leaves = arg(options, "leaves")->count;
+
+    checkout_to = arg(options, "checkout-to")->argument;
 
     percent_str = arg(options, "percent")->argument;
     jobs_str = arg(options, "jobs")->argument;
@@ -313,6 +323,13 @@ struct CommandLineOptions {
       tags.emplace_back(std::move(parts));
     }
 
+    for (unsigned int i = 0; i < arg(options, "run")->count; i++) {
+      std::string line(run_ids_buffer[i]);
+      std::vector parts = wcl::split_by_fn(
+          ',', line.begin(), line.end(), [](auto a, auto b) { return std::string(a, b); });
+      run_ids.emplace_back(std::move(parts));
+    }
+
     if (!percent_str) {
       percent_str = getenv("WAKE_PERCENT");
     }
diff --git a/tools/wake/main.cpp b/tools/wake/main.cpp
index 577241ed5..d76fb240c 100644
--- a/tools/wake/main.cpp
+++ b/tools/wake/main.cpp
@@ -41,6 +41,8 @@
 #include
 #include
 
+#include "cas/cas.h"
+#include "cas/materialize.h"
 #include "cli_options.h"
 #include "describe.h"
 #include "dst/bind.h"
@@ -257,6 +259,9 @@ MatchingQueryFilters build_query_filters(const CommandLineOptions &clo, Database
     filters.needs_tag_concat = true;  // Mark that we need the expensive GROUP BY
   }
 
+  // --run
+  make_and_group(clo.run_ids, "cast(run_id as TEXT)", "", filters.core_filters);
+
   // --last-exe
   if (clo.last_exe) {
     filters.core_filters.push_back(
@@ -365,10 +370,96 @@ void query_jobs(const CommandLineOptions &clo, Database &db) {
   describe(matching_jobs, get_describe_policy(clo), db);
 }
 
-void inspect_database(const CommandLineOptions &clo, Database &db) {
+// Materializes the outputs of the jobs selected by the command-line filters into the
+// directory named by --checkout-to.
+//
+// The destination must either not exist (it is then created, parents included) or be an
+// empty directory. Failures are reported on stderr and end the process with exit status 1.
+// Finding no outputs is not a failure: a message is printed and nothing is materialized.
+void checkout_filtered(const CommandLineOptions &clo, Database &db, CASContext &cas_ctx) {
+  // Guard at runtime instead of with assert(): asserts disappear under NDEBUG, and a null
+  // checkout_to would otherwise be passed to the std::string constructor below.
+  if (!clo.checkout_to) {
+    std::cerr << "error: --checkout-to requires a destination directory" << std::endl;
+    exit(1);
+  }
+
+  // Build filters to select jobs
+  auto filters = build_query_filters(clo, db);
+
+  // Verify the store before touching the filesystem so a failure leaves no directory behind.
+  if (!cas_ctx.has_store()) {
+    std::cerr << "error: CAS store is not available; cannot check out outputs" << std::endl;
+    exit(1);
+  }
+  auto &store = *cas_ctx.get_store();
+
+  const std::string dest = clo.checkout_to;
+
+  // Destination must not exist or must be an empty directory.
+  struct stat st;
+  std::error_code ec;
+  if (stat(dest.c_str(), &st) == 0) {
+    if (!S_ISDIR(st.st_mode)) {
+      std::cerr << "error: destination '" << dest << "' exists and is not a directory" << std::endl;
+      exit(1);
+    }
+    const bool dest_is_empty = std::filesystem::is_empty(dest, ec);
+    if (ec) {
+      // A directory that cannot be listed cannot be shown to be empty, so refuse it.
+      std::cerr << "error: cannot read '" << dest << "': " << ec.message() << std::endl;
+      exit(1);
+    }
+    if (!dest_is_empty) {
+      std::cerr << "error: destination '" << dest << "' is not empty" << std::endl;
+      exit(1);
+    }
+  } else {
+    std::filesystem::create_directories(dest, ec);
+    if (ec) {
+      std::cerr << "error: cannot create '" << dest << "': " << ec.message() << std::endl;
+      exit(1);
+    }
+  }
+
+  // Get outputs from selected jobs using filters directly
+  auto outputs = db.get_job_outputs(std::move(filters), clo.leaves);
+  if (outputs.empty()) {
+    std::cout << "No outputs found matching filters" << std::endl;
+    return;
+  }
+
+  // f.modified is split below into whole seconds and a nanosecond remainder.
+  constexpr long long kNanosPerSecond = 1000000000LL;
+
+  int files_written = 0;
+  std::string dest_path = dest + "/";
+  const auto dest_prefix_len = dest_path.length();
+  for (auto &f : outputs) {
+    // Reuse one buffer: cut back to "<dest>/" and append this entry's path.
+    dest_path.resize(dest_prefix_len);
+    dest_path += f.path;
+    const auto mtime_sec = static_cast<time_t>(f.modified / kNanosPerSecond);
+    const auto mtime_nsec = static_cast<long>(f.modified % kNanosPerSecond);
+
+    if (auto msg = cas::materialize_item(store, dest_path, f.type, f.hash,
+                                         static_cast<mode_t>(f.mode), mtime_sec, mtime_nsec)) {
+      std::cerr << "error: " << *msg << std::endl;
+      exit(1);
+    }
+    // Entries of type "directory" are materialized but not counted as files.
+    if (f.type != "directory") ++files_written;
+  }
+
+  std::cout << "Checked out " << files_written << " file(s) -> " << dest << std::endl;
+}
+
+void inspect_database(const CommandLineOptions &clo, Database &db, CASContext &cas_ctx) {
   // tagdag
and history are db inspection queries, but are very different from the // rest of the queries which operate on the jobs table. - if (clo.tagdag) { + if (clo.checkout_to) { + checkout_filtered(clo, db, cas_ctx); + } else if (clo.tagdag) { output_tagdag(db, clo.tagdag); } else if (clo.ps) { query_ps(clo, db); @@ -422,6 +490,9 @@ void print_help(const char *argv0) { << " --last-executed Capture all jobs executed by the last build. Skips cache" << std::endl << " --history Report the cmndline history of all wake commands recorded" << std::endl << " --ps Show jobs currently running in active wake builds" << std::endl + << " --checkout-to DIR Materialize outputs of filtered jobs to destination" << std::endl + << " --leaves With --checkout-to: only terminal outputs (not consumed by other jobs)" << std::endl + << " --run RUN_ID Filter jobs by run ID (can be combined with other filters)" << std::endl << " --failed -f Capture jobs which failed last build" << std::endl << " --tag KEY=VAL Capture jobs which are tagged, matching KEY and VAL globs" << std::endl << " --canceled Capture jobs which were canceled (run ended before job finished)" << std::endl @@ -647,11 +718,11 @@ int main(int argc, char **argv) { clo.argv[1] = clo.shebang; } - bool is_db_inspect_capture = !clo.job_ids.empty() || !clo.output_files.empty() || - !clo.input_files.empty() || !clo.labels.empty() || - !clo.tags.empty() || clo.last_use || clo.last_exe || clo.failed || - clo.tagdag || clo.canceled || clo.active || clo.queued || - clo.in_flight || clo.history || clo.ps; + bool is_db_inspect_capture = + !clo.job_ids.empty() || !clo.output_files.empty() || !clo.input_files.empty() || + !clo.labels.empty() || !clo.tags.empty() || !clo.run_ids.empty() || clo.last_use || + clo.last_exe || clo.failed || clo.tagdag || clo.canceled || clo.active || clo.queued || + clo.in_flight || clo.history || clo.ps || clo.checkout_to; // DescribePolicy::human() is the default and doesn't have a flag. 
// DescribePolicy::debug() is overloaded and can't be marked as a db flag @@ -896,8 +967,11 @@ int main(int argc, char **argv) { db.entropy(&sip_key[0], 2); } + const std::string cas_dir = ".build/cas"; + CASContext cas_ctx(cas_dir); + if (is_db_inspection) { - inspect_database(clo, db); + inspect_database(clo, db, cas_ctx); return 0; } @@ -1137,9 +1211,6 @@ int main(int argc, char **argv) { StringInfo info(clo.verbose, clo.debug, clo.quiet, VERSION_STR, wcl::make_canonical(wake_cwd), cmdline); - const std::string cas_dir = ".build/cas"; - CASContext cas_ctx(cas_dir); - PrimMap pmap = prim_register_all(&info, &jobtable, &cas_ctx); bool isTreeBuilt = true;