From 9804de7fa10bd56c082736b73a0d04726c5362c1 Mon Sep 17 00:00:00 2001 From: Andriy Zavada Date: Fri, 10 Oct 2025 00:36:45 +0300 Subject: [PATCH 01/11] stub for leveled_bookie:status/1 --- src/leveled_bookie.erl | 40 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 65611021..8ee9bed8 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -75,7 +75,8 @@ book_loglevel/2, book_addlogs/2, book_removelogs/2, - book_headstatus/1 + book_headstatus/1, + book_status/1 ]). %% folding API @@ -1316,6 +1317,24 @@ book_removelogs(Pid, ForcedLogs) -> book_headstatus(Pid) -> gen_server:call(Pid, head_status, infinity). +-spec book_status(pid()) -> proplists:proplist(). +%% @doc +%% Return a proplist containing the following items: +%% * current size of the ledger cache; +%% * number of active journal files; +%% * average compaction score for the journal; +%% * current distribution of files across the ledger (e.g. count of files by level); +%% * current size of the penciller in-memory cache; +%% * penciller work backlog status; +%% * last merge time (penciller); +%% * last compaction time (journal); +%% * last compaction result (journal) e.g. files compacted and compaction score; +%% * ratio of metadata to object size (recent PUTs); +%% * PUT/GET/HEAD recent time/count metrics; +%% * mean level for recent fetches. +book_status(Pid) -> + gen_server:call(Pid, status, infinity). 
+ %%%============================================================================ %%% gen_server callbacks %%%============================================================================ @@ -1740,6 +1759,8 @@ handle_call(return_actors, _From, State) -> {reply, {ok, State#state.inker, State#state.penciller}, State}; handle_call(head_status, _From, State) -> {reply, {State#state.head_only, State#state.head_lookup}, State}; +handle_call(status, _From, State) -> + {reply, status(State), State}; handle_call(Msg, _From, State) -> {reply, {unsupported_message, element(1, Msg)}, State}. @@ -3048,6 +3069,23 @@ maybelog_snap_timing({Pid, _StatsFreq}, BookieTime, PCLTime) when maybelog_snap_timing(_Monitor, _, _) -> ok. + +status(#state{penciller = _Penciller, + ledger_cache = _LedgerCache}) -> + [{ledger_cache_size, -1}, + {n_active_journal_files, -1}, + {avg_compaction_score, -1.0}, + {level_files_count, []}, + {penciller_inmem_cache_size, -1}, + {penciller_work_backlog_status, void}, + {penciller_last_merge_time, os:system_time(millisecond)}, + {journal_last_compaction_time, os:system_time(millisecond)}, + {journal_last_compaction_result, {-1, -1}}, + {metadata_objsize_ratio, -0.1}, + {recent_putgethead_counts, []}, + {recent_fetch_mean_level, -1}]. 
+ + %%%============================================================================ %%% Test %%%============================================================================ From 779bd05bf834a77b83a3943e9cc47f8439f6f799 Mon Sep 17 00:00:00 2001 From: Andriy Zavada Date: Sun, 12 Oct 2025 00:44:32 +0300 Subject: [PATCH 02/11] bookie_status: ledger_cache_size --- src/leveled_bookie.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 8ee9bed8..704c45f5 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -3071,8 +3071,10 @@ maybelog_snap_timing(_Monitor, _, _) -> status(#state{penciller = _Penciller, - ledger_cache = _LedgerCache}) -> - [{ledger_cache_size, -1}, + ledger_cache = #ledger_cache{mem = Mem}}) -> + PP = ets:info(Mem), + [{ledger_cache_size, #{size => proplists:get_value(size, PP), + memory => proplists:get_value(memory, PP)}}, {n_active_journal_files, -1}, {avg_compaction_score, -1.0}, {level_files_count, []}, From ddb2587dcb65944526c72843775a97186f726887 Mon Sep 17 00:00:00 2001 From: Andriy Zavada Date: Tue, 21 Oct 2025 00:15:04 +0300 Subject: [PATCH 03/11] bookie_status: redoing using leveled_monitor WIP --- src/leveled_bookie.erl | 34 +++++++++++++-------------------- src/leveled_monitor.erl | 42 ++++++++++++++++++++++++++++++++++++++--- 2 files changed, 52 insertions(+), 24 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 704c45f5..ecea2124 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -1494,7 +1494,8 @@ handle_call( State#state.cache_size, State#state.cache_multiple, Cache0, - State#state.penciller + State#state.penciller, + State#state.monitor ) of {ok, Cache} -> @@ -1528,7 +1529,8 @@ handle_call({mput, ObjectSpecs, TTL}, From, State) when State#state.cache_size, State#state.cache_multiple, Cache0, - State#state.penciller + State#state.penciller, + State#state.monitor ) of {ok, Cache} -> @@ -1705,7 +1707,8 
@@ handle_call({compact_journal, Timeout}, From, State) when State#state.cache_size, State#state.cache_multiple, State#state.ledger_cache, - State#state.penciller + State#state.penciller, + State#state.monitor ) of {_, NewCache} -> @@ -2898,7 +2901,7 @@ check_in_ledgercache(PK, Hash, Cache, loader) -> end. -spec maybepush_ledgercache( - pos_integer(), pos_integer(), ledger_cache(), pid() + pos_integer(), pos_integer(), ledger_cache(), pid(), pid() ) -> {ok | returned, ledger_cache()}. %% @doc @@ -2911,9 +2914,10 @@ check_in_ledgercache(PK, Hash, Cache, loader) -> %% in the reply. Try again later when it isn't busy (and also potentially %% implement a slow_offer state to slow down the pace at which PUTs are being %% received) -maybepush_ledgercache(MaxCacheSize, MaxCacheMult, Cache, Penciller) -> +maybepush_ledgercache(MaxCacheSize, MaxCacheMult, Cache, Penciller, Monitor) -> Tab = Cache#ledger_cache.mem, CacheSize = ets:info(Tab, size), + leveled_monitor:add_stat(Monitor, {ledger_cache_size_update, CacheSize}), TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize, MaxCacheMult), if TimeToPush -> @@ -3070,22 +3074,10 @@ maybelog_snap_timing(_Monitor, _, _) -> ok. -status(#state{penciller = _Penciller, - ledger_cache = #ledger_cache{mem = Mem}}) -> - PP = ets:info(Mem), - [{ledger_cache_size, #{size => proplists:get_value(size, PP), - memory => proplists:get_value(memory, PP)}}, - {n_active_journal_files, -1}, - {avg_compaction_score, -1.0}, - {level_files_count, []}, - {penciller_inmem_cache_size, -1}, - {penciller_work_backlog_status, void}, - {penciller_last_merge_time, os:system_time(millisecond)}, - {journal_last_compaction_time, os:system_time(millisecond)}, - {journal_last_compaction_result, {-1, -1}}, - {metadata_objsize_ratio, -0.1}, - {recent_putgethead_counts, []}, - {recent_fetch_mean_level, -1}]. +status(#state{monitor = {no_monitor, 0}}) -> + #{}; +status(#state{monitor = {Monitor, _}}) -> + leveled_monitor:get_bookie_status(Monitor). 
%%%============================================================================ diff --git a/src/leveled_monitor.erl b/src/leveled_monitor.erl index ae38e4f3..b9f500d1 100644 --- a/src/leveled_monitor.erl +++ b/src/leveled_monitor.erl @@ -38,7 +38,8 @@ log_level/2, log_add/2, log_remove/2, - get_defaults/0 + get_defaults/0, + get_bookie_status/1 ]). -define(LOG_LIST, [ @@ -128,6 +129,21 @@ sample_start_time = os:timestamp() :: erlang:timestamp() }). +-type bookie_status() :: #{ + ledger_cache_size := undefined | pos_integer(), + n_active_journal_files_update := undefined | pos_integer(), + avg_compaction_score_update := undefined | pos_integer(), + level_files_count_update := undefined | pos_integer(), + penciller_inmem_cache_size_update := undefined | pos_integer(), + penciller_work_backlog_status_update := undefined | {[non_neg_integer()], boolean(), boolean()}, + penciller_last_merge_time_update := undefined | pos_integer(), + journal_last_compaction_time_update := undefined | pos_integer(), + journal_last_compaction_result_update := undefined | {float(), non_neg_integer()}, + metadata_objsize_ratio_update := not_implemented, + recent_putgethead_counts := undefined | {non_neg_integer(), non_neg_integer(), non_neg_integer()}, + recent_fetch_mean_level := undefined | [{pos_integer(), non_neg_integer()}] +}. + -record(state, { bookie_get_timings = #bookie_get_timings{} :: bookie_get_timings(), bookie_head_timings = #bookie_head_timings{} :: bookie_head_timings(), @@ -137,7 +153,8 @@ sst_fetch_timings = [] :: list(sst_fetch_timings()), cdb_get_timings = #cdb_get_timings{} :: cdb_get_timings(), log_frequency = ?LOG_FREQUENCY_SECONDS :: pos_integer(), - log_order = [] :: list(log_type()) + log_order = [] :: list(log_type()), + bookie_status = #{} :: bookie_status() }). -type bookie_get_timings() :: #bookie_get_timings{}. @@ -179,6 +196,17 @@ microsecs()}. -type cdb_get_update() :: {cdb_get_update, pos_integer(), microsecs(), microsecs()}. 
+-type bookie_status_update() :: + {ledger_cache_size_update, pos_integer()} + | {n_active_journal_files_update, pos_integer()} + | {avg_compaction_score_update, pos_integer()} + | {level_files_count_update, pos_integer()} + | {penciller_inmem_cache_size_update, pos_integer()} + | {penciller_work_backlog_status_update, {non_neg_integer(), boolean(), boolean()}} + | {penciller_last_merge_time_update, pos_integer()} + | {journal_last_compaction_time_update, pos_integer()} + | {journal_last_compaction_result_update, {float(), non_neg_integer()}} + | {metadata_objsize_ratio_update, not_implemented}. -type statistic() :: bookie_get_update() | bookie_head_update() @@ -186,7 +214,8 @@ | bookie_snap_update() | pcl_fetch_update() | sst_fetch_update() - | cdb_get_update(). + | cdb_get_update() + | bookie_status_update(). -export_type([monitor/0, timing/0, sst_fetch_type/0, log_type/0]). @@ -228,6 +257,10 @@ log_add(Pid, ForcedLogs) -> log_remove(Pid, ForcedLogs) -> gen_server:cast(Pid, {log_remove, ForcedLogs}). +-spec get_bookie_status(pid()) -> bookie_status(). +get_bookie_status(Pid) -> + gen_server:call(Pid, get_bookie_status). + -spec maybe_time(monitor()) -> erlang:timestamp() | no_timing. maybe_time({_Pid, TimingProbability}) -> case rand:uniform(100) of @@ -272,6 +305,9 @@ init([LogOpts, LogFrequency, LogOrder]) -> erlang:send_after(InitialJitter, self(), report_next_stats), {ok, #state{log_frequency = LogFrequency, log_order = RandomLogOrder}}. +handle_call(get_bookie_status, _From, State = #state{bookie_status = A}) -> + {reply, A, State}; + handle_call(close, _From, State) -> {stop, normal, ok, State}. 
From 93a854667b71a12b39f3f63d38cf32ca0edffb6f Mon Sep 17 00:00:00 2001 From: Andriy Zavada Date: Thu, 23 Oct 2025 20:15:39 +0300 Subject: [PATCH 04/11] bookie_status: redoing using leveled_monitor WIP --- src/leveled_bookie.erl | 2 +- src/leveled_monitor.erl | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index ecea2124..9ee47265 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -2914,7 +2914,7 @@ check_in_ledgercache(PK, Hash, Cache, loader) -> %% in the reply. Try again later when it isn't busy (and also potentially %% implement a slow_offer state to slow down the pace at which PUTs are being %% received) -maybepush_ledgercache(MaxCacheSize, MaxCacheMult, Cache, Penciller, Monitor) -> +maybepush_ledgercache(MaxCacheSize, MaxCacheMult, Cache, Penciller, {Monitor, _}) -> Tab = Cache#ledger_cache.mem, CacheSize = ets:info(Tab, size), leveled_monitor:add_stat(Monitor, {ledger_cache_size_update, CacheSize}), diff --git a/src/leveled_monitor.erl b/src/leveled_monitor.erl index b9f500d1..53813454 100644 --- a/src/leveled_monitor.erl +++ b/src/leveled_monitor.erl @@ -231,7 +231,9 @@ monitor_start(LogFreq, LogOrder) -> ), {ok, Monitor}. --spec add_stat(pid(), statistic()) -> ok. +-spec add_stat(no_monitor | pid(), statistic()) -> ok. +add_stat(no_monitor, _Statistic) -> + ok; add_stat(Watcher, Statistic) -> gen_server:cast(Watcher, Statistic). @@ -667,7 +669,10 @@ handle_cast({log_add, ForcedLogs}, State) -> {noreply, State}; handle_cast({log_remove, ForcedLogs}, State) -> ok = leveled_log:remove_forcedlogs(ForcedLogs), - {noreply, State}. + {noreply, State}; +handle_cast({ledger_cache_size_update, A}, State = #state{bookie_status = BS}) -> + {noreply, State#state{bookie_status = BS#{ledger_cache_size => A}}}. 
+ handle_info(report_next_stats, State) -> erlang:send_after( From fc9d1fb127e0dd4e315888e1754bba1d199d91cd Mon Sep 17 00:00:00 2001 From: Andriy Zavada Date: Fri, 24 Oct 2025 00:33:26 +0300 Subject: [PATCH 05/11] bookie_status: redoing using leveled_monitor WIP * avg/best_compaction_score * level_files_count * penciller_inmem_cache_size --- src/leveled_cdb.erl | 4 +++ src/leveled_iclerk.erl | 27 ++++++++++------ src/leveled_monitor.erl | 67 +++++++++++++++++++++++++++++++-------- src/leveled_pclerk.erl | 10 ++++-- src/leveled_penciller.erl | 5 ++- src/leveled_sst.erl | 7 ++++ 6 files changed, 93 insertions(+), 27 deletions(-) diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 57db6bee..9317a66f 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -493,6 +493,7 @@ starting({call, From}, {open_writer, Filename}, State) -> {next_state, writer, State0, [{reply, From, ok}, hibernate]}; starting({call, From}, {open_reader, Filename}, State) -> leveled_log:save(State#state.log_options), + leveled_monitor:add_stat(State#state.monitor, {n_active_journal_files_update, +1}), leveled_log:log(cdb02, [Filename]), {Handle, Index, LastKey} = open_for_readonly(Filename, false), State0 = State#state{ @@ -504,6 +505,7 @@ starting({call, From}, {open_reader, Filename}, State) -> {next_state, reader, State0, [{reply, From, ok}, hibernate]}; starting({call, From}, {open_reader, Filename, LastKey}, State) -> leveled_log:save(State#state.log_options), + leveled_monitor:add_stat(State#state.monitor, {n_active_journal_files_update, +1}), leveled_log:log(cdb02, [Filename]), {Handle, Index, LastKey} = open_for_readonly(Filename, LastKey), State0 = State#state{ @@ -880,6 +882,7 @@ delete_pending( ) when ?IS_DEF(FN), ?IS_DEF(IO) -> + leveled_monitor:add_stat(State#state.monitor, {n_active_journal_files_update, -1}), leveled_log:log(cdb04, [FN, State#state.delete_point]), close_pendingdelete(IO, FN, State#state.waste_path), {stop, normal}; @@ -906,6 +909,7 @@ delete_pending( 
), {keep_state_and_data, [?DELETE_TIMEOUT]}; false -> + leveled_monitor:add_stat(State#state.monitor, {n_active_journal_files_update, -1}), leveled_log:log(cdb04, [FN, ManSQN]), close_pendingdelete(IO, FN, State#state.waste_path), {stop, normal} diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index f891531d..6611da37 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -354,7 +354,7 @@ handle_cast( {noreply, State#state{scored_files = [], scoring_state = ScoringState}}; handle_cast( {score_filelist, [Entry | Tail]}, - State = #state{scoring_state = ScoringState} + State = #state{scoring_state = ScoringState, cdb_options = CDBOpts} ) when ?IS_DEF(ScoringState) -> @@ -379,7 +379,8 @@ handle_cast( ScoringState#scoring_state.max_sqn, ?SAMPLE_SIZE, ?BATCH_SIZE, - State#state.reload_strategy + State#state.reload_strategy, + CDBOpts#cdb_options.monitor ); {CachedScore, true, _ScoreOneIn} -> % If caches are used roll the score towards the current score @@ -394,7 +395,8 @@ handle_cast( ScoringState#scoring_state.max_sqn, ?SAMPLE_SIZE, ?BATCH_SIZE, - State#state.reload_strategy + State#state.reload_strategy, + CDBOpts#cdb_options.monitor ), (NewScore + CachedScore) / 2; {CachedScore, false, _ScoreOneIn} -> @@ -427,6 +429,8 @@ handle_cast( {MaxRunLength, State#state.maxrunlength_compactionperc, State#state.singlefile_compactionperc}, {BestRun0, Score} = assess_candidates(Candidates, ScoreParams), + {Monitor, _} = CDBopts#cdb_options.monitor, + leveled_monitor:add_stat(Monitor, {best_compaction_score_update, Score}), leveled_log:log_timer(ic003, [Score, length(BestRun0)], SW), case Score > 0.0 of true -> @@ -594,7 +598,8 @@ schedule_compaction(CompactionHours, RunsPerDay, CurrentTS) -> leveled_codec:sqn(), non_neg_integer(), non_neg_integer(), - leveled_codec:compaction_strategy() + leveled_codec:compaction_strategy(), + leveled_monitor:monitor() ) -> float(). 
%% @doc @@ -615,7 +620,8 @@ check_single_file( MaxSQN, SampleSize, BatchSize, - ReloadStrategy + ReloadStrategy, + {Monitor, _} ) -> FN = leveled_cdb:cdb_filename(CDB), SW = os:timestamp(), @@ -629,6 +635,7 @@ check_single_file( MaxSQN, ReloadStrategy ), + leveled_monitor:add_stat(Monitor, {avg_compaction_score_update, Score}), safely_log_filescore(PositionList, FN, Score, SW), Score. @@ -1265,14 +1272,14 @@ check_single_file_test() -> replaced end end, - Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4, RS), + Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4, RS, {no_monitor, 0}), ?assertMatch(37.5, Score1), LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> current end, - Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4, RS), + Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4, RS, {no_monitor, 0}), ?assertMatch(100.0, Score2), - Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3, RS), + Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3, RS, {no_monitor, 0}), ?assertMatch(37.5, Score3), - Score4 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 4, 8, 4, RS), + Score4 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 4, 8, 4, RS, {no_monitor, 0}), ?assertMatch(75.0, Score4), ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_destroy(CDB). @@ -1417,7 +1424,7 @@ compact_empty_file_test() -> {3, {o, "Bucket", "Key3", null}} ], LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> replaced end, - Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4, RS), + Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4, RS, {no_monitor, 0}), ?assert((+0.0 =:= Score1) orelse (-0.0 =:= Score1)), ok = leveled_cdb:cdb_deletepending(CDB2), ok = leveled_cdb:cdb_destroy(CDB2). 
diff --git a/src/leveled_monitor.erl b/src/leveled_monitor.erl index 53813454..a1f4fde0 100644 --- a/src/leveled_monitor.erl +++ b/src/leveled_monitor.erl @@ -131,19 +131,22 @@ -type bookie_status() :: #{ ledger_cache_size := undefined | pos_integer(), - n_active_journal_files_update := undefined | pos_integer(), - avg_compaction_score_update := undefined | pos_integer(), - level_files_count_update := undefined | pos_integer(), - penciller_inmem_cache_size_update := undefined | pos_integer(), - penciller_work_backlog_status_update := undefined | {[non_neg_integer()], boolean(), boolean()}, - penciller_last_merge_time_update := undefined | pos_integer(), - journal_last_compaction_time_update := undefined | pos_integer(), - journal_last_compaction_result_update := undefined | {float(), non_neg_integer()}, - metadata_objsize_ratio_update := not_implemented, + n_active_journal_files := undefined | pos_integer(), + avg_compaction_score_sample := undefined | [pos_integer()], + best_compaction_score := undefined | pos_integer(), + level_files_count := undefined | #{non_neg_integer() := non_neg_integer()}, + penciller_inmem_cache_size := undefined | pos_integer(), + penciller_work_backlog_status := undefined | {non_neg_integer(), boolean(), boolean()}, + penciller_last_merge_time := undefined | pos_integer(), + journal_last_compaction_time := undefined | pos_integer(), + journal_last_compaction_result := undefined | {float(), non_neg_integer()}, + metadata_objsize_ratio := not_implemented, recent_putgethead_counts := undefined | {non_neg_integer(), non_neg_integer(), non_neg_integer()}, recent_fetch_mean_level := undefined | [{pos_integer(), non_neg_integer()}] }. +-define(AVG_COMPACTION_SCORE_OVER_MAX, 50). + -record(state, { bookie_get_timings = #bookie_get_timings{} :: bookie_get_timings(), bookie_head_timings = #bookie_head_timings{} :: bookie_head_timings(), @@ -198,9 +201,10 @@ {cdb_get_update, pos_integer(), microsecs(), microsecs()}. 
-type bookie_status_update() :: {ledger_cache_size_update, pos_integer()} - | {n_active_journal_files_update, pos_integer()} - | {avg_compaction_score_update, pos_integer()} - | {level_files_count_update, pos_integer()} + | {n_active_journal_files_update, integer()} + | {avg_compaction_score_update, [pos_integer()]} + | {best_compaction_score_update, pos_integer()} + | {level_files_count_update, #{non_neg_integer() => pos_integer()}} | {penciller_inmem_cache_size_update, pos_integer()} | {penciller_work_backlog_status_update, {non_neg_integer(), boolean(), boolean()}} | {penciller_last_merge_time_update, pos_integer()} @@ -233,6 +237,7 @@ monitor_start(LogFreq, LogOrder) -> -spec add_stat(no_monitor | pid(), statistic()) -> ok. add_stat(no_monitor, _Statistic) -> + logger:notice("not updating statistic ~p because no_monitor", [_Statistic]), ok; add_stat(Watcher, Statistic) -> gen_server:cast(Watcher, Statistic). @@ -670,8 +675,44 @@ handle_cast({log_add, ForcedLogs}, State) -> handle_cast({log_remove, ForcedLogs}, State) -> ok = leveled_log:remove_forcedlogs(ForcedLogs), {noreply, State}; + handle_cast({ledger_cache_size_update, A}, State = #state{bookie_status = BS}) -> - {noreply, State#state{bookie_status = BS#{ledger_cache_size => A}}}. 
+ {noreply, State#state{bookie_status = BS#{ledger_cache_size => A}}}; + +handle_cast({n_active_journal_files_update, Delta}, State = #state{bookie_status = BS0}) -> + A = maps:get(n_active_journal_files, BS0, 0), + BS = maps:put(n_active_journal_files, A + Delta, BS0), + {noreply, State#state{bookie_status = BS}}; + +handle_cast({avg_compaction_score_update, A}, State = #state{bookie_status = BS}) -> + NewSample = + case [A | maps:get(avg_compaction_score_sample, BS, [])] of + L when length(L) > ?AVG_COMPACTION_SCORE_OVER_MAX -> + lists:sublist(L, ?AVG_COMPACTION_SCORE_OVER_MAX); + L -> + L + end, + {noreply, State#state{bookie_status = BS#{avg_compaction_score_sample => NewSample}}}; + +handle_cast({best_compaction_score_update, A}, State = #state{bookie_status = BS}) -> + {noreply, State#state{bookie_status = BS#{best_compaction_score => A}}}; + +handle_cast({level_files_count_update, U}, State = #state{bookie_status = BS0}) -> + A = maps:get(level_files_count, BS0, #{}), + BS = maps:put(level_files_count, maps:merge(A, U), BS0), + {noreply, State#state{bookie_status = BS}}; + +handle_cast({penciller_inmem_cache_size_update, A}, State = #state{bookie_status = BS}) -> + {noreply, State#state{bookie_status = BS#{penciller_inmem_cache_size => A}}}; +handle_cast({penciller_work_backlog_status_update, A}, State = #state{bookie_status = BS}) -> + {noreply, State#state{bookie_status = BS#{penciller_work_backlog_status => A}}}; +handle_cast({penciller_last_merge_time_update, A}, State = #state{bookie_status = BS}) -> + {noreply, State#state{bookie_status = BS#{penciller_last_merge_time => A}}}; +handle_cast({journal_last_compaction_result_update, A}, State = #state{bookie_status = BS}) -> + {noreply, State#state{bookie_status = BS#{journal_last_compaction_result => A}}}; +handle_cast({metadata_objsize_ratio_update, A}, State = #state{bookie_status = BS}) -> + {noreply, State#state{bookie_status = BS#{metadata_objsize_ratio => A}}}. 
+ handle_info(report_next_stats, State) -> diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 5e0af8df..16d14b08 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -388,15 +388,19 @@ merge_limit(SrcLevel, SinkListLength, MMB) when is_integer(MMB) -> list(leveled_sst:sst_pointer()) }. do_merge( - [], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _Opts, Additions, _Max + [], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, SSTOpts, Additions, _Max ) -> + {Monitor, _} = SSTOpts#sst_options.monitor, + leveled_monitor:add_stat(Monitor, {level_files_count_update, #{SinkLevel => length(Additions)}}), leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions), full]), {lists:reverse(Additions), [], []}; do_merge( - KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions, Max + KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, SSTOpts, Additions, Max ) when length(Additions) >= Max -> + {Monitor, _} = SSTOpts#sst_options.monitor, + leveled_monitor:add_stat(Monitor, {level_files_count_update, #{SinkLevel => length(Additions)}}), leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions), partial]), FNSrc = leveled_penciller:sst_filename( @@ -409,7 +413,7 @@ do_merge( {ExpandedKL1, []} = split_unexpanded_files(KL1), {ExpandedKL2, L2FilePointersRem} = split_unexpanded_files(KL2), TS1 = os:timestamp(), - InfOpts = OptsSST#sst_options{max_sstslots = infinity}, + InfOpts = SSTOpts#sst_options{max_sstslots = infinity}, % Need to be careful to make sure all the remainder goes in one file, % could be situations whereby the max_sstslots has been changed between % restarts - and so there is too much data for one file in the diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index b2ee31a5..a03a71e7 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -793,6 +793,8 @@ handle_call( State#state.levelzero_index, length(State#state.levelzero_cache) + 1 ), + {Monitor, _} = State#state.monitor, + 
leveled_monitor:add_stat(Monitor, {penciller_inmem_cache_size_update, NewL0Size}), leveled_log:log_randomtimer( p0031, [NewL0Size, true, true, MinSQN, MaxSQN], @@ -1260,7 +1262,7 @@ handle_cast( }}; handle_cast( work_for_clerk, - State = #state{manifest = Man, levelzero_cache = L0Cache, clerk = Clerk} + State = #state{manifest = Man, levelzero_cache = L0Cache, clerk = Clerk, monitor = Monitor} ) when ?IS_DEF(Man), ?IS_DEF(L0Cache), ?IS_DEF(Clerk) -> @@ -1320,6 +1322,7 @@ handle_cast( % L0 work to do, or because the backlog has grown beyond % tolerance Backlog = WC >= ?WORKQUEUE_BACKLOG_TOLERANCE, + leveled_monitor:add_stat(Monitor, {penciller_work_backlog_status_update, {WC, Backlog, L0Full}}), leveled_log:log(p0024, [WC, Backlog, L0Full]), [TL | _Tail] = WL, ok = leveled_pclerk:clerk_push(Clerk, {TL, Man}), diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 93663572..3218d2b9 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -645,6 +645,12 @@ starting( Level ), Summary = UpdState#state.summary, + + if Level == 0 -> + leveled_monitor:add_stat(element(1, Monitor), {penciller_inmem_cache_size_update, 0}); + el/=se -> + noop + end, leveled_log:log_timer( sst08, [ActualFilename, Level, Summary#summary.max_sqn], SW ), @@ -745,6 +751,7 @@ starting(cast, complete_l0startup, State) -> Summary = UpdState#state.summary, Time4 = timer:now_diff(os:timestamp(), SW4), + leveled_monitor:add_stat(element(1, Monitor), {penciller_inmem_cache_size_update, 0}), leveled_log:log_timer( sst08, [ActualFilename, 0, Summary#summary.max_sqn], SW0 ), From b7fd8870e66e0bc42f658dcabe7f20994df7b2e2 Mon Sep 17 00:00:00 2001 From: Andriy Zavada Date: Tue, 28 Oct 2025 01:39:56 +0200 Subject: [PATCH 06/11] fixups --- src/leveled_cdb.erl | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 9317a66f..6ffbe1cf 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -493,7 +493,8 @@ starting({call, 
From}, {open_writer, Filename}, State) -> {next_state, writer, State0, [{reply, From, ok}, hibernate]}; starting({call, From}, {open_reader, Filename}, State) -> leveled_log:save(State#state.log_options), - leveled_monitor:add_stat(State#state.monitor, {n_active_journal_files_update, +1}), + {Monitor, _} = State#state.monitor, + leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, +1}), leveled_log:log(cdb02, [Filename]), {Handle, Index, LastKey} = open_for_readonly(Filename, false), State0 = State#state{ @@ -505,7 +506,8 @@ starting({call, From}, {open_reader, Filename}, State) -> {next_state, reader, State0, [{reply, From, ok}, hibernate]}; starting({call, From}, {open_reader, Filename, LastKey}, State) -> leveled_log:save(State#state.log_options), - leveled_monitor:add_stat(State#state.monitor, {n_active_journal_files_update, +1}), + {Monitor, _} = State#state.monitor, + leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, +1}), leveled_log:log(cdb02, [Filename]), {Handle, Index, LastKey} = open_for_readonly(Filename, LastKey), State0 = State#state{ @@ -882,7 +884,8 @@ delete_pending( ) when ?IS_DEF(FN), ?IS_DEF(IO) -> - leveled_monitor:add_stat(State#state.monitor, {n_active_journal_files_update, -1}), + {Monitor, _} = State#state.monitor, + leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, -1}), leveled_log:log(cdb04, [FN, State#state.delete_point]), close_pendingdelete(IO, FN, State#state.waste_path), {stop, normal}; @@ -909,7 +912,8 @@ delete_pending( ), {keep_state_and_data, [?DELETE_TIMEOUT]}; false -> - leveled_monitor:add_stat(State#state.monitor, {n_active_journal_files_update, -1}), + {Monitor, _} = State#state.monitor, + leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, -1}), leveled_log:log(cdb04, [FN, ManSQN]), close_pendingdelete(IO, FN, State#state.waste_path), {stop, normal} From da36264ed2c5bc5ab2e631a8d7b406f9ae25048c Mon Sep 17 00:00:00 2001 From: Andriy Zavada Date: Tue, 28 Oct 2025 
18:12:44 +0200 Subject: [PATCH 07/11] bookie_status: redoing using leveled_monitor WIP penciller_last_merge_time --- src/leveled_monitor.erl | 15 ++++++++------- src/leveled_pclerk.erl | 6 ++++-- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/leveled_monitor.erl b/src/leveled_monitor.erl index a1f4fde0..f319dd6e 100644 --- a/src/leveled_monitor.erl +++ b/src/leveled_monitor.erl @@ -204,10 +204,10 @@ | {n_active_journal_files_update, integer()} | {avg_compaction_score_update, [pos_integer()]} | {best_compaction_score_update, pos_integer()} - | {level_files_count_update, #{non_neg_integer() => pos_integer()}} + | {level_files_count_update, #{non_neg_integer() => pos_integer()}, TS::non_neg_integer()} | {penciller_inmem_cache_size_update, pos_integer()} | {penciller_work_backlog_status_update, {non_neg_integer(), boolean(), boolean()}} - | {penciller_last_merge_time_update, pos_integer()} + %% | {penciller_last_merge_time_update, pos_integer()} via level_files_count_update | {journal_last_compaction_time_update, pos_integer()} | {journal_last_compaction_result_update, {float(), non_neg_integer()}} | {metadata_objsize_ratio_update, not_implemented}. 
@@ -697,17 +697,18 @@ handle_cast({avg_compaction_score_update, A}, State = #state{bookie_status = BS} handle_cast({best_compaction_score_update, A}, State = #state{bookie_status = BS}) -> {noreply, State#state{bookie_status = BS#{best_compaction_score => A}}}; -handle_cast({level_files_count_update, U}, State = #state{bookie_status = BS0}) -> +handle_cast({level_files_count_update, U, TS}, State = #state{bookie_status = BS0}) -> A = maps:get(level_files_count, BS0, #{}), - BS = maps:put(level_files_count, maps:merge(A, U), BS0), - {noreply, State#state{bookie_status = BS}}; + BS1 = maps:put(level_files_count, maps:merge(A, U), BS0), + BS2 = maps:put(penciller_last_merge_time, TS, BS1), + {noreply, State#state{bookie_status = BS2}}; handle_cast({penciller_inmem_cache_size_update, A}, State = #state{bookie_status = BS}) -> {noreply, State#state{bookie_status = BS#{penciller_inmem_cache_size => A}}}; + handle_cast({penciller_work_backlog_status_update, A}, State = #state{bookie_status = BS}) -> {noreply, State#state{bookie_status = BS#{penciller_work_backlog_status => A}}}; -handle_cast({penciller_last_merge_time_update, A}, State = #state{bookie_status = BS}) -> - {noreply, State#state{bookie_status = BS#{penciller_last_merge_time => A}}}; + handle_cast({journal_last_compaction_result_update, A}, State = #state{bookie_status = BS}) -> {noreply, State#state{bookie_status = BS#{journal_last_compaction_result => A}}}; handle_cast({metadata_objsize_ratio_update, A}, State = #state{bookie_status = BS}) -> diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 16d14b08..7a89e96c 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -391,7 +391,8 @@ do_merge( [], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, SSTOpts, Additions, _Max ) -> {Monitor, _} = SSTOpts#sst_options.monitor, - leveled_monitor:add_stat(Monitor, {level_files_count_update, #{SinkLevel => length(Additions)}}), + leveled_monitor:add_stat( + Monitor, {level_files_count_update, 
#{SinkLevel => length(Additions)}, os:system_time(millisecond)}), leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions), full]), {lists:reverse(Additions), [], []}; do_merge( @@ -400,7 +401,8 @@ do_merge( length(Additions) >= Max -> {Monitor, _} = SSTOpts#sst_options.monitor, - leveled_monitor:add_stat(Monitor, {level_files_count_update, #{SinkLevel => length(Additions)}}), + leveled_monitor:add_stat( + Monitor, {level_files_count_update, #{SinkLevel => length(Additions)}, os:system_time(millisecond)}), leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions), partial]), FNSrc = leveled_penciller:sst_filename( From b93116ec0f99933478813c9a58598f44c85bbdae Mon Sep 17 00:00:00 2001 From: Andriy Zavada Date: Tue, 28 Oct 2025 18:16:21 +0200 Subject: [PATCH 08/11] bookie_status: redoing using leveled_monitor WIP journal_last_compaction_time --- src/leveled_iclerk.erl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 6611da37..63582c84 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -476,6 +476,9 @@ handle_cast( -> FilesToDelete = leveled_imanifest:find_persistedentries(PersistedSQN, ManifestAsList), + CDBopts = State#state.cdb_options, + {Monitor, _} = CDBopts#cdb_options.monitor, + leveled_monitor:add_stat(Monitor, {journal_last_compaction_time_update, os:system_time(millisecond)}), leveled_log:log(ic007, []), ok = leveled_inker:ink_clerkcomplete(Ink, [], FilesToDelete), {noreply, State}; From 4600df2baac7aa54404d173bbcf0328c1120c6eb Mon Sep 17 00:00:00 2001 From: Andriy Zavada Date: Wed, 29 Oct 2025 01:15:44 +0200 Subject: [PATCH 09/11] bookie_status: redoing using leveled_monitor mostly done --- src/leveled_bookie.erl | 2 +- src/leveled_iclerk.erl | 5 +++- src/leveled_monitor.erl | 63 ++++++++++++++++++++++----------------- src/leveled_penciller.erl | 5 +++- 4 files changed, 45 insertions(+), 30 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 
9ee47265..0fb3bfd0 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -2901,7 +2901,7 @@ check_in_ledgercache(PK, Hash, Cache, loader) -> end. -spec maybepush_ledgercache( - pos_integer(), pos_integer(), ledger_cache(), pid(), pid() + pos_integer(), pos_integer(), ledger_cache(), pid(), leveled_monitor:monitor() ) -> {ok | returned, ledger_cache()}. %% @doc diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 63582c84..e9d41107 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -430,7 +430,10 @@ handle_cast( State#state.singlefile_compactionperc}, {BestRun0, Score} = assess_candidates(Candidates, ScoreParams), {Monitor, _} = CDBopts#cdb_options.monitor, - leveled_monitor:add_stat(Monitor, {best_compaction_score_update, Score}), + leveled_monitor:add_stat( + Monitor, {journal_last_compaction_result_update, + {length(BestRun0), + Score}}), leveled_log:log_timer(ic003, [Score, length(BestRun0)], SW), case Score > 0.0 of true -> diff --git a/src/leveled_monitor.erl b/src/leveled_monitor.erl index f319dd6e..46e3805b 100644 --- a/src/leveled_monitor.erl +++ b/src/leveled_monitor.erl @@ -130,19 +130,17 @@ }). 
-type bookie_status() :: #{ - ledger_cache_size := undefined | pos_integer(), - n_active_journal_files := undefined | pos_integer(), - avg_compaction_score_sample := undefined | [pos_integer()], - best_compaction_score := undefined | pos_integer(), - level_files_count := undefined | #{non_neg_integer() := non_neg_integer()}, - penciller_inmem_cache_size := undefined | pos_integer(), - penciller_work_backlog_status := undefined | {non_neg_integer(), boolean(), boolean()}, - penciller_last_merge_time := undefined | pos_integer(), - journal_last_compaction_time := undefined | pos_integer(), - journal_last_compaction_result := undefined | {float(), non_neg_integer()}, - metadata_objsize_ratio := not_implemented, - recent_putgethead_counts := undefined | {non_neg_integer(), non_neg_integer(), non_neg_integer()}, - recent_fetch_mean_level := undefined | [{pos_integer(), non_neg_integer()}] + ledger_cache_size => non_neg_integer(), + n_active_journal_files => pos_integer(), + avg_compaction_score_sample => [float()], + level_files_count => #{non_neg_integer() => non_neg_integer()}, + penciller_inmem_cache_size => pos_integer(), + penciller_work_backlog_status => {non_neg_integer(), boolean(), boolean()}, + penciller_last_merge_time => integer(), + journal_last_compaction_time => integer(), + journal_last_compaction_result => {non_neg_integer(), float()}, + recent_putgethead_counts => {non_neg_integer(), non_neg_integer(), non_neg_integer()}, + recent_fetch_mean_level => [{pos_integer(), non_neg_integer()}] }. -define(AVG_COMPACTION_SCORE_OVER_MAX, 50). 
@@ -202,14 +200,13 @@ -type bookie_status_update() :: {ledger_cache_size_update, pos_integer()} | {n_active_journal_files_update, integer()} - | {avg_compaction_score_update, [pos_integer()]} - | {best_compaction_score_update, pos_integer()} + | {avg_compaction_score_update, float()} | {level_files_count_update, #{non_neg_integer() => pos_integer()}, TS::non_neg_integer()} | {penciller_inmem_cache_size_update, pos_integer()} | {penciller_work_backlog_status_update, {non_neg_integer(), boolean(), boolean()}} %% | {penciller_last_merge_time_update, pos_integer()} via level_files_count_update - | {journal_last_compaction_time_update, pos_integer()} - | {journal_last_compaction_result_update, {float(), non_neg_integer()}} + | {journal_last_compaction_time_update, integer()} + | {journal_last_compaction_result_update, {non_neg_integer(), float()}} | {metadata_objsize_ratio_update, not_implemented}. -type statistic() :: bookie_get_update() @@ -237,7 +234,6 @@ monitor_start(LogFreq, LogOrder) -> -spec add_stat(no_monitor | pid(), statistic()) -> ok. add_stat(no_monitor, _Statistic) -> - logger:notice("not updating statistic ~p because no_monitor", [_Statistic]), ok; add_stat(Watcher, Statistic) -> gen_server:cast(Watcher, Statistic). @@ -310,10 +306,10 @@ init([LogOpts, LogFrequency, LogOrder]) -> ), InitialJitter = rand:uniform(2 * 1000 * LogFrequency), erlang:send_after(InitialJitter, self(), report_next_stats), - {ok, #state{log_frequency = LogFrequency, log_order = RandomLogOrder}}. + {ok, #state{log_frequency = LogFrequency, log_order = RandomLogOrder, bookie_status = #{}}}. -handle_call(get_bookie_status, _From, State = #state{bookie_status = A}) -> - {reply, A, State}; +handle_call(get_bookie_status, _From, State) -> + {reply, enriched_bookie_status(State), State}; handle_call(close, _From, State) -> {stop, normal, ok, State}. 
@@ -694,9 +690,6 @@ handle_cast({avg_compaction_score_update, A}, State = #state{bookie_status = BS} end, {noreply, State#state{bookie_status = BS#{avg_compaction_score_sample => NewSample}}}; -handle_cast({best_compaction_score_update, A}, State = #state{bookie_status = BS}) -> - {noreply, State#state{bookie_status = BS#{best_compaction_score => A}}}; - handle_cast({level_files_count_update, U, TS}, State = #state{bookie_status = BS0}) -> A = maps:get(level_files_count, BS0, #{}), BS1 = maps:put(level_files_count, maps:merge(A, U), BS0), @@ -709,10 +702,11 @@ handle_cast({penciller_inmem_cache_size_update, A}, State = #state{bookie_status handle_cast({penciller_work_backlog_status_update, A}, State = #state{bookie_status = BS}) -> {noreply, State#state{bookie_status = BS#{penciller_work_backlog_status => A}}}; +handle_cast({journal_last_compaction_time_update, A}, State = #state{bookie_status = BS}) -> + {noreply, State#state{bookie_status = BS#{journal_last_compaction_time => A}}}; + handle_cast({journal_last_compaction_result_update, A}, State = #state{bookie_status = BS}) -> - {noreply, State#state{bookie_status = BS#{journal_last_compaction_result => A}}}; -handle_cast({metadata_objsize_ratio_update, A}, State = #state{bookie_status = BS}) -> - {noreply, State#state{bookie_status = BS#{metadata_objsize_ratio => A}}}. + {noreply, State#state{bookie_status = BS#{journal_last_compaction_result => A}}}. @@ -734,6 +728,21 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. 
+enriched_bookie_status(#state{bookie_status = BS, + bookie_get_timings = GT, + bookie_put_timings = PT, + bookie_head_timings = HT}) -> + BS#{get_sample_count => GT#bookie_get_timings.sample_count, + get_body_time => GT#bookie_get_timings.body_time, + head_sample_count => HT#bookie_head_timings.sample_count, + head_rsp_time => HT#bookie_head_timings.rsp_time, + put_sample_count => PT#bookie_put_timings.sample_count, + put_prep_time => PT#bookie_put_timings.prep_time, + put_ink_time => PT#bookie_put_timings.ink_time, + put_mem_time => PT#bookie_put_timings.mem_time + }. + + %%%============================================================================ %%% Test %%%============================================================================ diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index a03a71e7..f66fc959 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -1262,7 +1262,10 @@ handle_cast( }}; handle_cast( work_for_clerk, - State = #state{manifest = Man, levelzero_cache = L0Cache, clerk = Clerk, monitor = Monitor} + State = #state{manifest = Man, + levelzero_cache = L0Cache, + clerk = Clerk, + monitor = {Monitor, _}} ) when ?IS_DEF(Man), ?IS_DEF(L0Cache), ?IS_DEF(Clerk) -> From d576670dc2a29d66a981d02810905daaf72e0c4a Mon Sep 17 00:00:00 2001 From: Andriy Zavada Date: Wed, 29 Oct 2025 02:17:04 +0200 Subject: [PATCH 10/11] oblige erlfmt --- src/leveled_bookie.erl | 12 ++++-- src/leveled_cdb.erl | 4 +- src/leveled_iclerk.erl | 31 ++++++++++---- src/leveled_monitor.erl | 90 +++++++++++++++++++++++++-------------- src/leveled_pclerk.erl | 10 ++++- src/leveled_penciller.erl | 20 ++++++--- src/leveled_sst.erl | 13 ++++-- 7 files changed, 121 insertions(+), 59 deletions(-) diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 0fb3bfd0..4d2d2c48 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -2901,7 +2901,11 @@ check_in_ledgercache(PK, Hash, Cache, loader) -> end. 
-spec maybepush_ledgercache( - pos_integer(), pos_integer(), ledger_cache(), pid(), leveled_monitor:monitor() + pos_integer(), + pos_integer(), + ledger_cache(), + pid(), + leveled_monitor:monitor() ) -> {ok | returned, ledger_cache()}. %% @doc @@ -2914,7 +2918,9 @@ check_in_ledgercache(PK, Hash, Cache, loader) -> %% in the reply. Try again later when it isn't busy (and also potentially %% implement a slow_offer state to slow down the pace at which PUTs are being %% received) -maybepush_ledgercache(MaxCacheSize, MaxCacheMult, Cache, Penciller, {Monitor, _}) -> +maybepush_ledgercache( + MaxCacheSize, MaxCacheMult, Cache, Penciller, {Monitor, _} +) -> Tab = Cache#ledger_cache.mem, CacheSize = ets:info(Tab, size), leveled_monitor:add_stat(Monitor, {ledger_cache_size_update, CacheSize}), @@ -3073,13 +3079,11 @@ maybelog_snap_timing({Pid, _StatsFreq}, BookieTime, PCLTime) when maybelog_snap_timing(_Monitor, _, _) -> ok. - status(#state{monitor = {no_monitor, 0}}) -> #{}; status(#state{monitor = {Monitor, _}}) -> leveled_monitor:get_bookie_status(Monitor). 
- %%%============================================================================ %%% Test %%%============================================================================ diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index 6ffbe1cf..36feb661 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -913,7 +913,9 @@ delete_pending( {keep_state_and_data, [?DELETE_TIMEOUT]}; false -> {Monitor, _} = State#state.monitor, - leveled_monitor:add_stat(Monitor, {n_active_journal_files_update, -1}), + leveled_monitor:add_stat( + Monitor, {n_active_journal_files_update, -1} + ), leveled_log:log(cdb04, [FN, ManSQN]), close_pendingdelete(IO, FN, State#state.waste_path), {stop, normal} diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index e9d41107..7ebdf627 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -431,9 +431,9 @@ handle_cast( {BestRun0, Score} = assess_candidates(Candidates, ScoreParams), {Monitor, _} = CDBopts#cdb_options.monitor, leveled_monitor:add_stat( - Monitor, {journal_last_compaction_result_update, - {length(BestRun0), - Score}}), + Monitor, + {journal_last_compaction_result_update, {length(BestRun0), Score}} + ), leveled_log:log_timer(ic003, [Score, length(BestRun0)], SW), case Score > 0.0 of true -> @@ -481,7 +481,10 @@ handle_cast( leveled_imanifest:find_persistedentries(PersistedSQN, ManifestAsList), CDBopts = State#state.cdb_options, {Monitor, _} = CDBopts#cdb_options.monitor, - leveled_monitor:add_stat(Monitor, {journal_last_compaction_time_update, os:system_time(millisecond)}), + leveled_monitor:add_stat( + Monitor, + {journal_last_compaction_time_update, os:system_time(millisecond)} + ), leveled_log:log(ic007, []), ok = leveled_inker:ink_clerkcomplete(Ink, [], FilesToDelete), {noreply, State}; @@ -1278,14 +1281,22 @@ check_single_file_test() -> replaced end end, - Score1 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 4, RS, {no_monitor, 0}), + Score1 = check_single_file( + CDB, LedgerFun1, LedgerSrv1, 
9, 8, 4, RS, {no_monitor, 0} + ), ?assertMatch(37.5, Score1), LedgerFun2 = fun(_Srv, _Key, _ObjSQN) -> current end, - Score2 = check_single_file(CDB, LedgerFun2, LedgerSrv1, 9, 8, 4, RS, {no_monitor, 0}), + Score2 = check_single_file( + CDB, LedgerFun2, LedgerSrv1, 9, 8, 4, RS, {no_monitor, 0} + ), ?assertMatch(100.0, Score2), - Score3 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 9, 8, 3, RS, {no_monitor, 0}), + Score3 = check_single_file( + CDB, LedgerFun1, LedgerSrv1, 9, 8, 3, RS, {no_monitor, 0} + ), ?assertMatch(37.5, Score3), - Score4 = check_single_file(CDB, LedgerFun1, LedgerSrv1, 4, 8, 4, RS, {no_monitor, 0}), + Score4 = check_single_file( + CDB, LedgerFun1, LedgerSrv1, 4, 8, 4, RS, {no_monitor, 0} + ), ?assertMatch(75.0, Score4), ok = leveled_cdb:cdb_deletepending(CDB), ok = leveled_cdb:cdb_destroy(CDB). @@ -1430,7 +1441,9 @@ compact_empty_file_test() -> {3, {o, "Bucket", "Key3", null}} ], LedgerFun1 = fun(_Srv, _Key, _ObjSQN) -> replaced end, - Score1 = check_single_file(CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4, RS, {no_monitor, 0}), + Score1 = check_single_file( + CDB2, LedgerFun1, LedgerSrv1, 9, 8, 4, RS, {no_monitor, 0} + ), ?assert((+0.0 =:= Score1) orelse (-0.0 =:= Score1)), ok = leveled_cdb:cdb_deletepending(CDB2), ok = leveled_cdb:cdb_destroy(CDB2). diff --git a/src/leveled_monitor.erl b/src/leveled_monitor.erl index 46e3805b..1158646e 100644 --- a/src/leveled_monitor.erl +++ b/src/leveled_monitor.erl @@ -139,7 +139,9 @@ penciller_last_merge_time => integer(), journal_last_compaction_time => integer(), journal_last_compaction_result => {non_neg_integer(), float()}, - recent_putgethead_counts => {non_neg_integer(), non_neg_integer(), non_neg_integer()}, + recent_putgethead_counts => { + non_neg_integer(), non_neg_integer(), non_neg_integer() + }, recent_fetch_mean_level => [{pos_integer(), non_neg_integer()}] }. 
@@ -201,9 +203,12 @@ {ledger_cache_size_update, pos_integer()} | {n_active_journal_files_update, integer()} | {avg_compaction_score_update, float()} - | {level_files_count_update, #{non_neg_integer() => pos_integer()}, TS::non_neg_integer()} + | {level_files_count_update, #{non_neg_integer() => pos_integer()}, + TS :: non_neg_integer()} | {penciller_inmem_cache_size_update, pos_integer()} - | {penciller_work_backlog_status_update, {non_neg_integer(), boolean(), boolean()}} + | {penciller_work_backlog_status_update, { + non_neg_integer(), boolean(), boolean() + }} %% | {penciller_last_merge_time_update, pos_integer()} via level_files_count_update | {journal_last_compaction_time_update, integer()} | {journal_last_compaction_result_update, {non_neg_integer(), float()}} @@ -306,11 +311,14 @@ init([LogOpts, LogFrequency, LogOrder]) -> ), InitialJitter = rand:uniform(2 * 1000 * LogFrequency), erlang:send_after(InitialJitter, self(), report_next_stats), - {ok, #state{log_frequency = LogFrequency, log_order = RandomLogOrder, bookie_status = #{}}}. + {ok, #state{ + log_frequency = LogFrequency, + log_order = RandomLogOrder, + bookie_status = #{} + }}. handle_call(get_bookie_status, _From, State) -> {reply, enriched_bookie_status(State), State}; - handle_call(close, _From, State) -> {stop, normal, ok, State}. 
@@ -671,16 +679,17 @@ handle_cast({log_add, ForcedLogs}, State) -> handle_cast({log_remove, ForcedLogs}, State) -> ok = leveled_log:remove_forcedlogs(ForcedLogs), {noreply, State}; - handle_cast({ledger_cache_size_update, A}, State = #state{bookie_status = BS}) -> {noreply, State#state{bookie_status = BS#{ledger_cache_size => A}}}; - -handle_cast({n_active_journal_files_update, Delta}, State = #state{bookie_status = BS0}) -> +handle_cast( + {n_active_journal_files_update, Delta}, State = #state{bookie_status = BS0} +) -> A = maps:get(n_active_journal_files, BS0, 0), BS = maps:put(n_active_journal_files, A + Delta, BS0), {noreply, State#state{bookie_status = BS}}; - -handle_cast({avg_compaction_score_update, A}, State = #state{bookie_status = BS}) -> +handle_cast( + {avg_compaction_score_update, A}, State = #state{bookie_status = BS} +) -> NewSample = case [A | maps:get(avg_compaction_score_sample, BS, [])] of L when length(L) > ?AVG_COMPACTION_SCORE_OVER_MAX -> @@ -688,27 +697,40 @@ handle_cast({avg_compaction_score_update, A}, State = #state{bookie_status = BS} L -> L end, - {noreply, State#state{bookie_status = BS#{avg_compaction_score_sample => NewSample}}}; - -handle_cast({level_files_count_update, U, TS}, State = #state{bookie_status = BS0}) -> + {noreply, State#state{ + bookie_status = BS#{avg_compaction_score_sample => NewSample} + }}; +handle_cast( + {level_files_count_update, U, TS}, State = #state{bookie_status = BS0} +) -> A = maps:get(level_files_count, BS0, #{}), BS1 = maps:put(level_files_count, maps:merge(A, U), BS0), BS2 = maps:put(penciller_last_merge_time, TS, BS1), {noreply, State#state{bookie_status = BS2}}; - -handle_cast({penciller_inmem_cache_size_update, A}, State = #state{bookie_status = BS}) -> +handle_cast( + {penciller_inmem_cache_size_update, A}, State = #state{bookie_status = BS} +) -> {noreply, State#state{bookie_status = BS#{penciller_inmem_cache_size => A}}}; - -handle_cast({penciller_work_backlog_status_update, A}, State = 
#state{bookie_status = BS}) -> - {noreply, State#state{bookie_status = BS#{penciller_work_backlog_status => A}}}; - -handle_cast({journal_last_compaction_time_update, A}, State = #state{bookie_status = BS}) -> - {noreply, State#state{bookie_status = BS#{journal_last_compaction_time => A}}}; - -handle_cast({journal_last_compaction_result_update, A}, State = #state{bookie_status = BS}) -> - {noreply, State#state{bookie_status = BS#{journal_last_compaction_result => A}}}. - - +handle_cast( + {penciller_work_backlog_status_update, A}, + State = #state{bookie_status = BS} +) -> + {noreply, State#state{ + bookie_status = BS#{penciller_work_backlog_status => A} + }}; +handle_cast( + {journal_last_compaction_time_update, A}, State = #state{bookie_status = BS} +) -> + {noreply, State#state{ + bookie_status = BS#{journal_last_compaction_time => A} + }}; +handle_cast( + {journal_last_compaction_result_update, A}, + State = #state{bookie_status = BS} +) -> + {noreply, State#state{ + bookie_status = BS#{journal_last_compaction_result => A} + }}. handle_info(report_next_stats, State) -> erlang:send_after( @@ -728,11 +750,14 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -enriched_bookie_status(#state{bookie_status = BS, - bookie_get_timings = GT, - bookie_put_timings = PT, - bookie_head_timings = HT}) -> - BS#{get_sample_count => GT#bookie_get_timings.sample_count, +enriched_bookie_status(#state{ + bookie_status = BS, + bookie_get_timings = GT, + bookie_put_timings = PT, + bookie_head_timings = HT +}) -> + BS#{ + get_sample_count => GT#bookie_get_timings.sample_count, get_body_time => GT#bookie_get_timings.body_time, head_sample_count => HT#bookie_head_timings.sample_count, head_rsp_time => HT#bookie_head_timings.rsp_time, @@ -740,8 +765,7 @@ enriched_bookie_status(#state{bookie_status = BS, put_prep_time => PT#bookie_put_timings.prep_time, put_ink_time => PT#bookie_put_timings.ink_time, put_mem_time => PT#bookie_put_timings.mem_time - }. 
- + }. %%%============================================================================ %%% Test diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 7a89e96c..e67fd2ea 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -392,7 +392,10 @@ do_merge( ) -> {Monitor, _} = SSTOpts#sst_options.monitor, leveled_monitor:add_stat( - Monitor, {level_files_count_update, #{SinkLevel => length(Additions)}, os:system_time(millisecond)}), + Monitor, + {level_files_count_update, #{SinkLevel => length(Additions)}, + os:system_time(millisecond)} + ), leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions), full]), {lists:reverse(Additions), [], []}; do_merge( @@ -402,7 +405,10 @@ do_merge( -> {Monitor, _} = SSTOpts#sst_options.monitor, leveled_monitor:add_stat( - Monitor, {level_files_count_update, #{SinkLevel => length(Additions)}, os:system_time(millisecond)}), + Monitor, + {level_files_count_update, #{SinkLevel => length(Additions)}, + os:system_time(millisecond)} + ), leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions), partial]), FNSrc = leveled_penciller:sst_filename( diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index f66fc959..10b8404d 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -794,7 +794,9 @@ handle_call( length(State#state.levelzero_cache) + 1 ), {Monitor, _} = State#state.monitor, - leveled_monitor:add_stat(Monitor, {penciller_inmem_cache_size_update, NewL0Size}), + leveled_monitor:add_stat( + Monitor, {penciller_inmem_cache_size_update, NewL0Size} + ), leveled_log:log_randomtimer( p0031, [NewL0Size, true, true, MinSQN, MaxSQN], @@ -1262,10 +1264,12 @@ handle_cast( }}; handle_cast( work_for_clerk, - State = #state{manifest = Man, - levelzero_cache = L0Cache, - clerk = Clerk, - monitor = {Monitor, _}} + State = #state{ + manifest = Man, + levelzero_cache = L0Cache, + clerk = Clerk, + monitor = {Monitor, _} + } ) when ?IS_DEF(Man), ?IS_DEF(L0Cache), ?IS_DEF(Clerk) -> @@ -1325,7 
+1329,11 @@ handle_cast( % L0 work to do, or because the backlog has grown beyond % tolerance Backlog = WC >= ?WORKQUEUE_BACKLOG_TOLERANCE, - leveled_monitor:add_stat(Monitor, {penciller_work_backlog_status_update, {WC, Backlog, L0Full}}), + leveled_monitor:add_stat( + Monitor, + {penciller_work_backlog_status_update, + {WC, Backlog, L0Full}} + ), leveled_log:log(p0024, [WC, Backlog, L0Full]), [TL | _Tail] = WL, ok = leveled_pclerk:clerk_push(Clerk, {TL, Man}), diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index 3218d2b9..a7f1feec 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -646,9 +646,12 @@ starting( ), Summary = UpdState#state.summary, - if Level == 0 -> - leveled_monitor:add_stat(element(1, Monitor), {penciller_inmem_cache_size_update, 0}); - el/=se -> + if + Level == 0 -> + leveled_monitor:add_stat( + element(1, Monitor), {penciller_inmem_cache_size_update, 0} + ); + el /= se -> noop end, leveled_log:log_timer( @@ -751,7 +754,9 @@ starting(cast, complete_l0startup, State) -> Summary = UpdState#state.summary, Time4 = timer:now_diff(os:timestamp(), SW4), - leveled_monitor:add_stat(element(1, Monitor), {penciller_inmem_cache_size_update, 0}), + leveled_monitor:add_stat( + element(1, Monitor), {penciller_inmem_cache_size_update, 0} + ), leveled_log:log_timer( sst08, [ActualFilename, 0, Summary#summary.max_sqn], SW0 ), From 12dab0b73e0b8084df97db1839be81f76ca3fc7d Mon Sep 17 00:00:00 2001 From: Andriy Zavada Date: Thu, 30 Oct 2025 00:30:36 +0200 Subject: [PATCH 11/11] bookie_status: fetch_count_by_level --- src/leveled_monitor.erl | 45 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/src/leveled_monitor.erl b/src/leveled_monitor.erl index 1158646e..f71f2dfc 100644 --- a/src/leveled_monitor.erl +++ b/src/leveled_monitor.erl @@ -142,8 +142,15 @@ recent_putgethead_counts => { non_neg_integer(), non_neg_integer(), non_neg_integer() }, - recent_fetch_mean_level => [{pos_integer(), 
non_neg_integer()}] + fetch_count_by_level => #{ + reporting_fetch_level() => #{ + count => non_neg_integer(), + time => non_neg_integer() + } + } }. +-type reporting_fetch_level() :: + not_found | mem | '0' | '1' | '2' | '3' | lower. -define(AVG_COMPACTION_SCORE_OVER_MAX, 50). @@ -754,8 +761,39 @@ enriched_bookie_status(#state{ bookie_status = BS, bookie_get_timings = GT, bookie_put_timings = PT, - bookie_head_timings = HT + bookie_head_timings = HT, + pcl_fetch_timings = PFT }) -> + FCL = #{ + not_found => #{ + count => PFT#pcl_fetch_timings.notfound_count, + time => PFT#pcl_fetch_timings.notfound_time + }, + mem => #{ + count => PFT#pcl_fetch_timings.foundmem_count, + time => PFT#pcl_fetch_timings.foundmem_time + }, + '0' => #{ + count => PFT#pcl_fetch_timings.found0_count, + time => PFT#pcl_fetch_timings.found0_time + }, + '1' => #{ + count => PFT#pcl_fetch_timings.found1_count, + time => PFT#pcl_fetch_timings.found1_time + }, + '2' => #{ + count => PFT#pcl_fetch_timings.found2_count, + time => PFT#pcl_fetch_timings.found2_time + }, + '3' => #{ + count => PFT#pcl_fetch_timings.found3_count, + time => PFT#pcl_fetch_timings.found3_time + }, + lower => #{ + count => PFT#pcl_fetch_timings.foundlower_count, + time => PFT#pcl_fetch_timings.foundlower_time + } + }, BS#{ get_sample_count => GT#bookie_get_timings.sample_count, get_body_time => GT#bookie_get_timings.body_time, @@ -764,7 +802,8 @@ enriched_bookie_status(#state{ put_sample_count => PT#bookie_put_timings.sample_count, put_prep_time => PT#bookie_put_timings.prep_time, put_ink_time => PT#bookie_put_timings.ink_time, - put_mem_time => PT#bookie_put_timings.mem_time + put_mem_time => PT#bookie_put_timings.mem_time, + fetch_count_by_level => FCL }. %%%============================================================================