From 841ae01ba2957451b0110dbeb26ef6ba21d253af Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 21 Apr 2026 13:41:35 +0000 Subject: [PATCH] Add digikeeper-log event-journal client and wire /add to it MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces pkg/journal, a synchronous HTTP client for the digikeeper-log event-journal service. The client accepts a config, propagates context, fetches the schema catalog from /v1/registry at startup via a SchemaRepository with an in-memory cache, and exposes Append for journaling user-meaningful events. Wires the client into cmd/bot/main.go (gated by DIGIKEEPER_LOG_ENABLED) and rewires the /add flow so notes are persisted exclusively to digikeeper-log as note.added events — the command now prompts the user for the note text and a new text-follow-up handler catches the reply, validates, and calls journal.Append. https://claude.ai/code/session_01THYHpGkqcpPjVfbkUtFLft --- cmd/bot/configure.go | 40 +++- cmd/bot/main.go | 38 ++- deployment/.env.template | 8 +- deployment/docker-compose.yml | 6 + internal/cmd_handler/add.go | 96 +++++++- pkg/journal/client.go | 138 +++++++++++ pkg/journal/client_test.go | 399 +++++++++++++++++++++++++++++++ pkg/journal/doc.go | 22 ++ pkg/journal/errors.go | 33 +++ pkg/journal/event.go | 54 +++++ pkg/journal/options.go | 42 ++++ pkg/journal/schema_repository.go | 118 +++++++++ pkg/journal/transport.go | 174 ++++++++++++++ 13 files changed, 1150 insertions(+), 18 deletions(-) create mode 100644 pkg/journal/client.go create mode 100644 pkg/journal/client_test.go create mode 100644 pkg/journal/doc.go create mode 100644 pkg/journal/errors.go create mode 100644 pkg/journal/event.go create mode 100644 pkg/journal/options.go create mode 100644 pkg/journal/schema_repository.go create mode 100644 pkg/journal/transport.go diff --git a/cmd/bot/configure.go b/cmd/bot/configure.go index 2c33688..2398253 100644 --- a/cmd/bot/configure.go +++ b/cmd/bot/configure.go @@ -64,10 +64,21 @@ type PostgresConfig struct { PasswordFile string `env:"PASSWORD_FILE" env-default:""` } +type DigikeeperLogConfig struct { + Enabled bool `env:"ENABLED"` + BaseURL string `env:"BASE_URL" env-default:"http://localhost:8080"` + Timeout time.Duration `env:"TIMEOUT" env-default:"3s"` + MaxRetries int `env:"MAX_RETRIES" env-default:"3"` + ClientID string `env:"CLIENT_ID" env-default:"digikeeper-bot"` + Token SecretValue `env:"TOKEN" env-default:""` + TokenFile string `env:"TOKEN_FILE" env-default:""` +} + type Config struct { - Common CommonConfig `yaml:"common"` - Telegram TelegramConfig `yaml:"telegram" env-prefix:"TELEGRAM_"` - Postgres PostgresConfig `yaml:"postgres" env-prefix:"POSTGRES_"` + Common CommonConfig `yaml:"common"` + Telegram TelegramConfig `yaml:"telegram" env-prefix:"TELEGRAM_"` + Postgres PostgresConfig `yaml:"postgres" env-prefix:"POSTGRES_"` + DigikeeperLog DigikeeperLogConfig `yaml:"digikeeperlog" env-prefix:"DIGIKEEPER_LOG_"` } func (c *Config) IsDevEnv() bool { @@ -99,18 +110,23 @@ func configure() Config { log.Fatalf("Failed to read bot token: %v", err) } - if !cfg.Postgres.Enabled { - return cfg - } + if cfg.Postgres.Enabled { + err = cfg.Postgres.User.LoadFromFile(cfg.Postgres.UserFile) + if err != nil { + log.Fatalf("Failed to read postgres user: %v", err) + } - err = cfg.Postgres.User.LoadFromFile(cfg.Postgres.UserFile) - if err != nil { - log.Fatalf("Failed to read postgres user: %v", err) + err = cfg.Postgres.Password.LoadFromFile(cfg.Postgres.PasswordFile) + if err != nil { + log.Fatalf("Failed to read postgres password: %v", err) + } } - err = cfg.Postgres.Password.LoadFromFile(cfg.Postgres.PasswordFile) - if err != nil { - log.Fatalf("Failed to read postgres password: %v", err) + if cfg.DigikeeperLog.Enabled && cfg.DigikeeperLog.TokenFile != "" { + err = cfg.DigikeeperLog.Token.LoadFromFile(cfg.DigikeeperLog.TokenFile) + if err != nil { + log.Fatalf("Failed to read digikeeper-log token: %v", err) + } } return cfg diff --git a/cmd/bot/main.go b/cmd/bot/main.go index dfd7b4b..73edac3 100644 --- a/cmd/bot/main.go +++ b/cmd/bot/main.go @@ -7,9 +7,10 @@ import ( th "github.com/mymmrac/telego/telegohandler" cmdh "github.com/gitrus/digikeeper-bot/internal/cmd_handler" + "github.com/gitrus/digikeeper-bot/pkg/journal" + session "github.com/gitrus/digikeeper-bot/pkg/sessionmanager" cmdrouter "github.com/gitrus/digikeeper-bot/pkg/telego_commandrouter" tm "github.com/gitrus/digikeeper-bot/pkg/telego_middleware" - session "github.com/gitrus/digikeeper-bot/pkg/sessionmanager" ) func main() { @@ -18,6 +19,19 @@ func main() { ctx := context.Background() + var ( + events *journal.Client + err error + ) + if config.DigikeeperLog.Enabled { + events, err = initJournal(ctx, config, logger) + if err != nil { + logger.ErrorContext(ctx, "Failed to init journal client", "error", err) + return + } + defer func() { _ = events.Close() }() + } + bot, updates, err := initBot(ctx, config) if err != nil { logger.ErrorContext(ctx, "Failed to init bot: %v", "error", err) @@ -48,6 +62,15 @@ func main() { cmdHandlerGroup.BindCommandsToHandler(bh) + // Plain text messages (non-command) go to the note-submission handler so + // that users in the "add" state can finish the /add flow by typing the + // note contents. Registered after BindCommandsToHandler so command + // predicates match first. + bh.Handle(cmdh.HandleAddNoteText(usm, events), + th.AnyMessageWithText(), + th.Not(th.AnyCommand()), + ) + logger.Info("CmdHandlerGroup", "group", cmdHandlerGroup) logger.Info("Starting bot ...") @@ -57,3 +80,16 @@ func main() { return } } + +// initJournal constructs the journal client from the bot config. Callers +// should only invoke it when DigikeeperLog.Enabled is true. +func initJournal(ctx context.Context, cfg Config, logger *slog.Logger) (*journal.Client, error) { + jcfg := journal.Config{ + BaseURL: cfg.DigikeeperLog.BaseURL, + Timeout: cfg.DigikeeperLog.Timeout, + MaxRetries: cfg.DigikeeperLog.MaxRetries, + ClientID: cfg.DigikeeperLog.ClientID, + Token: cfg.DigikeeperLog.Token.String(), + } + return journal.New(ctx, jcfg, journal.WithLogger(logger)) +} diff --git a/deployment/.env.template b/deployment/.env.template index 51c502e..62a6a1d 100644 --- a/deployment/.env.template +++ b/deployment/.env.template @@ -1,7 +1,13 @@ # Bot configuration TELEGRAM_BOT_PUBLIC_URL=your_public_url_or_localhost +# digikeeper-log event-journal client (disabled by default) +# DIGIKEEPER_LOG_ENABLED=true +# DIGIKEEPER_LOG_BASE_URL=http://digikeeper-log:8080 +# DIGIKEEPER_LOG_TOKEN_FILE=/run/secrets/digikeeper_log_token + # Note: Sensitive credentials are stored in the ./secrets/ directory # ./secrets/telegram_token.txt - Your Telegram bot token # ./secrets/db_user.txt - Database username -# ./secrets/db_password.txt - Database password \ No newline at end of file +# ./secrets/db_password.txt - Database password +# ./secrets/digikeeper_log_token - Bearer token for digikeeper-log (optional) \ No newline at end of file diff --git a/deployment/docker-compose.yml b/deployment/docker-compose.yml index 793ebce..923bfa2 100644 --- a/deployment/docker-compose.yml +++ b/deployment/docker-compose.yml @@ -39,6 +39,12 @@ services: - POSTGRES_USER_FILE=/run/secrets/db_user - POSTGRES_PASSWORD_FILE=/run/secrets/db_password - POSTGRES_DB=digikeeper + - DIGIKEEPER_LOG_ENABLED=false + - DIGIKEEPER_LOG_BASE_URL=http://digikeeper-log:8080 + - DIGIKEEPER_LOG_TIMEOUT=3s + - DIGIKEEPER_LOG_MAX_RETRIES=3 + - DIGIKEEPER_LOG_CLIENT_ID=digikeeper-bot + # DIGIKEEPER_LOG_TOKEN_FILE=/run/secrets/digikeeper_log_token ports: - target: 8081 published: 8081 diff --git a/internal/cmd_handler/add.go b/internal/cmd_handler/add.go index 62dbf74..8e6b3b1 100644 --- a/internal/cmd_handler/add.go +++ b/internal/cmd_handler/add.go @@ -7,9 +7,17 @@ import ( th "github.com/mymmrac/telego/telegohandler" tu "github.com/mymmrac/telego/telegoutil" + "github.com/gitrus/digikeeper-bot/pkg/journal" session "github.com/gitrus/digikeeper-bot/pkg/sessionmanager" ) +// stateAwaitingNote marks a user session that entered /add and is now +// expected to send the note contents as the next plain-text message. +const stateAwaitingNote = "add:awaiting_note" + +// HandleAdd starts the /add flow: it transitions the user's session to +// "awaiting note" and prompts them for the note contents. The actual note +// persistence happens in HandleAddNoteText when the user replies. func HandleAdd(usm session.UserSessionManager[*session.SimpleUserSession]) th.Handler { return func(ctx *th.Context, update telego.Update) error { slog.InfoContext(ctx.Context(), "Receive /add") @@ -23,11 +31,11 @@ func HandleAdd(usm session.UserSessionManager[*session.SimpleUserSession]) th.Ha _, err = usm.Set( ctx, userID, - &session.SimpleUserSession{UserID: userID, State: "add", Version: state.Version + 1}, + &session.SimpleUserSession{UserID: userID, State: stateAwaitingNote, Version: state.Version + 1}, state.Version, ) if err != nil { - slog.ErrorContext(ctx.Context(), "Failed to set state") + slog.ErrorContext(ctx.Context(), "Failed to set state", "error", err) chatId := tu.ID(update.Message.Chat.ID) _, err = ctx.Bot().SendMessage(ctx, tu.Message( @@ -37,7 +45,87 @@ func HandleAdd(usm session.UserSessionManager[*session.SimpleUserSession]) th.Ha return err } - slog.InfoContext(ctx.Context(), "Set state", slog.String("state", state.State)) - return nil + chatID := tu.ID(update.Message.Chat.ID) + _, err = ctx.Bot().SendMessage(ctx, tu.Message( + chatID, + "What should I note down? Send the text of the note, or /cancel to abort.", + )) + return err + } +} + +// HandleAddNoteText catches the next plain-text message from a user whose +// session is in stateAwaitingNote, persists it as a note.added journal event +// on digikeeper-log, and clears the state. Users not in the awaiting-note +// state are passed through via ctx.Next so other handlers can match. +// +// journal may be nil when the journal client is disabled in config; in that +// case the text is ignored with a user-visible notice instead of being lost. +func HandleAddNoteText( + usm session.UserSessionManager[*session.SimpleUserSession], + events *journal.Client, +) th.Handler { + return func(ctx *th.Context, update telego.Update) error { + if update.Message == nil || update.Message.From == nil { + return ctx.Next(update) + } + userID := update.Message.From.ID + + state, err := usm.Fetch(ctx, userID) + if err != nil || state.State != stateAwaitingNote { + return ctx.Next(update) + } + + chatID := tu.ID(update.Message.Chat.ID) + + if events == nil { + slog.WarnContext(ctx.Context(), "journal client disabled, note dropped") + _, sendErr := ctx.Bot().SendMessage(ctx, tu.Message( + chatID, + "Note storage isn't configured right now — please try again later.", + )) + if dropErr := usm.DropActive(ctx, userID); dropErr != nil { + slog.WarnContext(ctx.Context(), "Failed to drop session", "error", dropErr) + } + return sendErr + } + + note := update.Message.Text + if note == "" { + _, err = ctx.Bot().SendMessage(ctx, tu.Message( + chatID, + "Empty note — please send the note text, or /cancel to abort.", + )) + return err + } + + res, err := events.Append(ctx.Context(), journal.NewNoteAddedEvent(userID, note)) + if err != nil { + slog.ErrorContext(ctx.Context(), "Failed to append note event", "error", err) + _, sendErr := ctx.Bot().SendMessage(ctx, tu.Message( + chatID, + "I couldn't save the note right now. Please try again.", + )) + if sendErr != nil { + return sendErr + } + return err + } + + slog.InfoContext(ctx.Context(), "Note added", + slog.String("entry_id", res.ID), + slog.String("request_id", res.RequestID), + slog.Bool("indexed", res.Indexed), + ) + + if dropErr := usm.DropActive(ctx, userID); dropErr != nil { + slog.WarnContext(ctx.Context(), "Failed to drop session after note add", "error", dropErr) + } + + _, err = ctx.Bot().SendMessage(ctx, tu.Message( + chatID, + "Noted. ✔", + )) + return err } } diff --git a/pkg/journal/client.go b/pkg/journal/client.go new file mode 100644 index 0000000..40178c2 --- /dev/null +++ b/pkg/journal/client.go @@ -0,0 +1,138 @@ +package journal + +import ( + "context" + "fmt" + "log/slog" + "net/http" + "time" +) + +// Config is the subset of configuration this package needs. The bot's +// top-level config exposes it as DigikeeperLogConfig in cmd/bot/configure.go, +// populated via the DIGIKEEPER_LOG_* env vars. +type Config struct { + BaseURL string + Timeout time.Duration + MaxRetries int + ClientID string + Token string +} + +// Client is a synchronous client for the digikeeper-log event-journal +// service. It holds a cached SchemaRepository populated at construction time. +type Client struct { + cfg Config + httpClient *http.Client + logger *slog.Logger + nowFunc func() time.Time + overrideID string + + schemas *inMemSchemaRepository +} + +// New constructs a Client. It performs two startup probes against the service: +// - GET /healthz for liveness +// - GET /v1/registry to populate the local schema cache +// +// Both probes are advisory: on failure we record a warning via the client's +// logger and continue, so the bot can start before the journal service is +// reachable. The schema cache will be empty in that case, and callers can +// call Client.Schemas().Refresh(ctx) later. +func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error) { + if cfg.BaseURL == "" { + return nil, ErrJournal{Reason: "BaseURL is required", Cause: ErrValidation} + } + if cfg.Timeout <= 0 { + cfg.Timeout = 3 * time.Second + } + if cfg.MaxRetries < 0 { + cfg.MaxRetries = 0 + } + + o := options{ + httpClient: &http.Client{Timeout: cfg.Timeout}, + nowFunc: func() time.Time { return time.Now().UTC() }, + logger: slog.Default(), + } + for _, opt := range opts { + opt(&o) + } + if o.httpClient.Timeout == 0 { + o.httpClient.Timeout = cfg.Timeout + } + + c := &Client{ + cfg: cfg, + httpClient: o.httpClient, + logger: o.logger, + nowFunc: o.nowFunc, + overrideID: o.clientID, + } + c.schemas = newInMemSchemaRepository(c) + + c.preflight(ctx) + return c, nil +} + +// preflight runs best-effort startup probes. Failures are logged but do not +// prevent the client from being returned — the journal service may start +// after the bot. +func (c *Client) preflight(ctx context.Context) { + if err := c.Healthz(ctx); err != nil { + c.logger.WarnContext(ctx, "journal healthz probe failed", "error", err) + } + if err := c.schemas.Refresh(ctx); err != nil { + c.logger.WarnContext(ctx, "journal schema preflight failed", "error", err) + } +} + +// Append persists one journal event synchronously. It validates the event +// client-side, retries transient failures with jittered exponential backoff +// up to Config.MaxRetries, and honors the caller's context for cancellation +// throughout. +func (c *Client) Append(ctx context.Context, evt Event) (*AppendResult, error) { + if err := evt.Validate(); err != nil { + return nil, err + } + return c.doAppend(ctx, evt) +} + +// Healthz probes GET /healthz. Returns nil if the service responded 2xx. +func (c *Client) Healthz(ctx context.Context) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.cfg.BaseURL+"/healthz", http.NoBody) + if err != nil { + return ErrJournal{Reason: "build healthz request", Cause: err} + } + c.setRequestHeaders(req, "") + + resp, err := c.httpClient.Do(req) + if err != nil { + return ErrJournal{Reason: "healthz request failed", Cause: fmt.Errorf("%w: %w", ErrTransport, err)} + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return ErrJournal{Reason: fmt.Sprintf("healthz status %d", resp.StatusCode), Cause: ErrServer} + } + return nil +} + +// Schemas returns the cached SchemaRepository. It is never nil; a client +// whose startup fetch failed will return an empty repository until +// Schemas().Refresh succeeds. +func (c *Client) Schemas() SchemaRepository { return c.schemas } + +// Close releases any resources held by the client. Currently a no-op since +// the client holds no background goroutines, but present so callers can +// `defer client.Close()` and not rework if that changes. +func (c *Client) Close() error { return nil } + +// clientID returns the effective X-Client-Id — option override takes +// precedence over Config.ClientID. +func (c *Client) clientID() string { + if c.overrideID != "" { + return c.overrideID + } + return c.cfg.ClientID +} diff --git a/pkg/journal/client_test.go b/pkg/journal/client_test.go new file mode 100644 index 0000000..c0cfbff --- /dev/null +++ b/pkg/journal/client_test.go @@ -0,0 +1,399 @@ +package journal_test + +import ( + "context" + "encoding/json" + "errors" + "io" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/gitrus/digikeeper-bot/pkg/journal" +) + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +func writeString(t *testing.T, w http.ResponseWriter, s string) { + t.Helper() + if _, err := io.WriteString(w, s); err != nil { + t.Fatalf("write response: %v", err) + } +} + +func writeJSON(t *testing.T, w http.ResponseWriter, v any) { + t.Helper() + if err := json.NewEncoder(w).Encode(v); err != nil { + t.Fatalf("encode response: %v", err) + } +} + +type serverHandlers struct { + append http.HandlerFunc + registry http.HandlerFunc + healthz http.HandlerFunc +} + +func newServer(t *testing.T, h serverHandlers) *httptest.Server { + t.Helper() + mux := http.NewServeMux() + if h.append != nil { + mux.HandleFunc("POST /v1/logs", h.append) + } + if h.registry != nil { + mux.HandleFunc("GET /v1/registry", h.registry) + } + if h.healthz != nil { + mux.HandleFunc("GET /healthz", h.healthz) + } else { + mux.HandleFunc("GET /healthz", func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + }) + } + srv := httptest.NewServer(mux) + t.Cleanup(srv.Close) + return srv +} + +func defaultRegistry(t *testing.T) http.HandlerFunc { + t.Helper() + return func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + writeString(t, w, `{"schemas":[{"type":"entry","schema":{"type":"object"}}]}`) + } +} + +func newClient(t *testing.T, srv *httptest.Server) *journal.Client { + t.Helper() + cfg := journal.Config{ + BaseURL: srv.URL, + Timeout: 2 * time.Second, + MaxRetries: 2, + ClientID: "digikeeper-bot-test", + } + cli, err := journal.New(t.Context(), cfg) + assert.NoError(t, err) + return cli +} + +func appendOK(t *testing.T, status int, id, reqID string) http.HandlerFunc { + t.Helper() + return func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/vnd.api+json") + w.WriteHeader(status) + writeJSON(t, w, map[string]any{ + "meta": map[string]any{"type": "logs"}, + "data": map[string]any{ + "id": id, + "attributes": map[string]any{ + "request_id": reqID, + }, + }, + }) + } +} + +// --------------------------------------------------------------------------- +// Validation +// --------------------------------------------------------------------------- + +func TestEvent_Validate(t *testing.T) { + cases := []struct { + name string + evt journal.Event + expect error + }{ + { + name: "zero timestamp rejected", + evt: journal.Event{Tags: []string{"bot"}}, + expect: journal.ErrValidation, + }, + { + name: "empty tags and data rejected", + evt: journal.Event{Timestamp: time.Now()}, + expect: journal.ErrValidation, + }, + { + name: "ok with tags only", + evt: journal.Event{Timestamp: time.Now(), Tags: []string{"bot"}}, + }, + { + name: "ok with data only", + evt: journal.Event{Timestamp: time.Now(), Data: map[string]any{"k": "v"}}, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + err := tc.evt.Validate() + if tc.expect == nil { + assert.NoError(t, err) + return + } + assert.Error(t, err) + assert.ErrorIs(t, err, tc.expect) + }) + } +} + +func TestNewNoteAddedEvent(t *testing.T) { + evt := journal.NewNoteAddedEvent(42, "buy milk") + assert.Equal(t, journal.TypeNoteAdded, evt.Type) + assert.Equal(t, []string{"bot", "note"}, evt.Tags) + assert.Equal(t, int64(42), evt.Data["user_id"]) + assert.Equal(t, "buy milk", evt.Data["note"]) + assert.NoError(t, evt.Validate()) +} + +// --------------------------------------------------------------------------- +// Append: happy paths +// --------------------------------------------------------------------------- + +func TestClient_Append_201(t *testing.T) { + var captured struct { + clientID string + reqID string + body journal.Event + } + srv := newServer(t, serverHandlers{ + append: func(w http.ResponseWriter, r *http.Request) { + captured.clientID = r.Header.Get("X-Client-Id") + captured.reqID = r.Header.Get("X-Request-ID") + if err := json.NewDecoder(r.Body).Decode(&captured.body); err != nil { + t.Fatalf("decode body: %v", err) + } + appendOK(t, http.StatusCreated, "entry-1", captured.reqID)(w, r) + }, + registry: defaultRegistry(t), + }) + + cli := newClient(t, srv) + defer func() { _ = cli.Close() }() + + res, err := cli.Append(t.Context(), journal.NewNoteAddedEvent(1, "note")) + assert.NoError(t, err) + assert.Equal(t, "entry-1", res.ID) + assert.True(t, res.Indexed) + assert.NotEmpty(t, res.RequestID) + assert.Equal(t, "digikeeper-bot-test", captured.clientID) + assert.NotEmpty(t, captured.reqID) + assert.Equal(t, journal.TypeNoteAdded, captured.body.Type) +} + +func TestClient_Append_202_IndexedFalse(t *testing.T) { + srv := newServer(t, serverHandlers{ + append: appendOK(t, http.StatusAccepted, "e-2", "rid"), + registry: defaultRegistry(t), + }) + cli := newClient(t, srv) + + res, err := cli.Append(t.Context(), journal.NewNoteAddedEvent(1, "n")) + assert.NoError(t, err) + assert.Equal(t, "e-2", res.ID) + assert.False(t, res.Indexed) +} + +// --------------------------------------------------------------------------- +// Append: retries, cancellation, auth +// --------------------------------------------------------------------------- + +func TestClient_Append_RetriesOn5xxThenSucceeds(t *testing.T) { + var calls int32 + srv := newServer(t, serverHandlers{ + append: func(w http.ResponseWriter, r *http.Request) { + n := atomic.AddInt32(&calls, 1) + if n < 3 { + w.WriteHeader(http.StatusInternalServerError) + return + } + appendOK(t, http.StatusCreated, "ok-"+r.Header.Get("X-Request-ID"), "")(w, r) + }, + registry: defaultRegistry(t), + }) + cli := newClient(t, srv) + + res, err := cli.Append(t.Context(), journal.NewNoteAddedEvent(1, "n")) + assert.NoError(t, err) + assert.NotEmpty(t, res.ID) + assert.Equal(t, int32(3), atomic.LoadInt32(&calls)) +} + +func TestClient_Append_5xxExhaustsRetries(t *testing.T) { + var calls int32 + srv := newServer(t, serverHandlers{ + append: func(w http.ResponseWriter, _ *http.Request) { + atomic.AddInt32(&calls, 1) + w.WriteHeader(http.StatusServiceUnavailable) + }, + registry: defaultRegistry(t), + }) + cli := newClient(t, srv) + + _, err := cli.Append(t.Context(), journal.NewNoteAddedEvent(1, "n")) + assert.Error(t, err) + assert.ErrorIs(t, err, journal.ErrServer) + assert.Equal(t, int32(3), atomic.LoadInt32(&calls)) // MaxRetries=2 → 3 attempts +} + +func TestClient_Append_4xxNotRetried(t *testing.T) { + var calls int32 + srv := newServer(t, serverHandlers{ + append: func(w http.ResponseWriter, _ *http.Request) { + atomic.AddInt32(&calls, 1) + w.WriteHeader(http.StatusBadRequest) + }, + registry: defaultRegistry(t), + }) + cli := newClient(t, srv) + + _, err := cli.Append(t.Context(), journal.NewNoteAddedEvent(1, "n")) + assert.Error(t, err) + assert.ErrorIs(t, err, journal.ErrServer) + assert.Equal(t, int32(1), atomic.LoadInt32(&calls)) +} + +func TestClient_Append_ContextCancelAbortsRetryLoop(t *testing.T) { + srv := newServer(t, serverHandlers{ + append: func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + }, + registry: defaultRegistry(t), + }) + cli := newClient(t, srv) + + ctx, cancel := context.WithCancel(t.Context()) + cancel() + _, err := cli.Append(ctx, journal.NewNoteAddedEvent(1, "n")) + assert.Error(t, err) + assert.True(t, errors.Is(err, context.Canceled), "expected ctx.Canceled, got %v", err) +} + +func TestClient_Append_InvalidEventNotSent(t *testing.T) { + var calls int32 + srv := newServer(t, serverHandlers{ + append: func(w http.ResponseWriter, _ *http.Request) { + atomic.AddInt32(&calls, 1) + w.WriteHeader(http.StatusCreated) + }, + registry: defaultRegistry(t), + }) + cli := newClient(t, srv) + + _, err := cli.Append(t.Context(), journal.Event{}) + assert.Error(t, err) + assert.ErrorIs(t, err, journal.ErrValidation) + assert.Equal(t, int32(0), atomic.LoadInt32(&calls)) +} + +func TestClient_Append_AuthHeaderPropagation(t *testing.T) { + var got string + srv := newServer(t, serverHandlers{ + append: func(w http.ResponseWriter, r *http.Request) { + got = r.Header.Get("Authorization") + appendOK(t, http.StatusCreated, "id", r.Header.Get("X-Request-ID"))(w, r) + }, + registry: defaultRegistry(t), + }) + + cfg := journal.Config{ + BaseURL: srv.URL, + Timeout: 2 * time.Second, + MaxRetries: 0, + ClientID: "bot", + Token: "s3cret", + } + cli, err := journal.New(t.Context(), cfg) + assert.NoError(t, err) + + _, err = cli.Append(t.Context(), journal.NewNoteAddedEvent(1, "n")) + assert.NoError(t, err) + assert.Equal(t, "Bearer s3cret", got) +} + +func TestClient_Append_NoAuthHeaderWhenTokenEmpty(t *testing.T) { + var got string + srv := newServer(t, serverHandlers{ + append: func(w http.ResponseWriter, r *http.Request) { + got = r.Header.Get("Authorization") + appendOK(t, http.StatusCreated, "id", r.Header.Get("X-Request-ID"))(w, r) + }, + registry: defaultRegistry(t), + }) + cli := newClient(t, srv) // no Token set + + _, err := cli.Append(t.Context(), journal.NewNoteAddedEvent(1, "n")) + assert.NoError(t, err) + assert.Empty(t, got) +} + +// --------------------------------------------------------------------------- +// Schema repository +// --------------------------------------------------------------------------- + +func TestClient_SchemaPreflight_CachesCatalog(t *testing.T) { + srv := newServer(t, serverHandlers{ + registry: func(w http.ResponseWriter, _ *http.Request) { + writeString(t, w, `{"schemas":[ + {"type":"entry","schema":{"k":"v"}}, + {"type":"other","schema":{"x":1}} + ]}`) + }, + }) + cli := newClient(t, srv) + + repo := cli.Schemas() + assert.NotNil(t, repo) + + entry, ok := repo.Get("entry") + assert.True(t, ok) + assert.Equal(t, "entry", entry.Type) + assert.JSONEq(t, `{"k":"v"}`, string(entry.Schema)) + + _, ok = repo.Get("missing") + assert.False(t, ok) + + all := repo.All() + assert.Len(t, all, 2) +} + +func TestClient_SchemaPreflight_FailsSoft(t *testing.T) { + srv := newServer(t, serverHandlers{ + registry: func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + }, + }) + // No error: preflight is advisory. + cli := newClient(t, srv) + assert.Empty(t, cli.Schemas().All()) + + // A later Refresh succeeds once the server comes back. + srv.Config.Handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/v1/registry" { + writeString(t, w, `{"schemas":[{"type":"entry","schema":{"z":true}}]}`) + return + } + w.WriteHeader(http.StatusNotFound) + }) + + err := cli.Schemas().Refresh(t.Context()) + assert.NoError(t, err) + entry, ok := cli.Schemas().Get("entry") + assert.True(t, ok) + assert.JSONEq(t, `{"z":true}`, string(entry.Schema)) +} + +// --------------------------------------------------------------------------- +// New / config validation +// --------------------------------------------------------------------------- + +func TestNew_BaseURLRequired(t *testing.T) { + _, err := journal.New(t.Context(), journal.Config{}) + assert.Error(t, err) + assert.ErrorIs(t, err, journal.ErrValidation) +} diff --git a/pkg/journal/doc.go b/pkg/journal/doc.go new file mode 100644 index 0000000..7130cdc --- /dev/null +++ b/pkg/journal/doc.go @@ -0,0 +1,22 @@ +// Package journal is a client for the Digikeeper event-journal service +// (github.com/digikeeper/digikeeper-log). It appends user-meaningful domain +// events — structured records the user deliberately wants to keep (what +// happened, when, tagged, with arbitrary data) — to the journal over HTTP. +// +// This package is for journaling user events, not for shipping application +// diagnostics. +// +// Typical use: +// +// cli, err := journal.New(ctx, cfg) +// if err != nil { +// return err +// } +// defer cli.Close() +// +// res, err := cli.Append(ctx, journal.NewNoteAddedEvent(userID, "buy milk")) +// if err != nil { +// return err +// } +// _ = res.ID +package journal diff --git a/pkg/journal/errors.go b/pkg/journal/errors.go new file mode 100644 index 0000000..75caf69 --- /dev/null +++ b/pkg/journal/errors.go @@ -0,0 +1,33 @@ +package journal + +import ( + "errors" + "fmt" +) + +// Sentinel errors that callers can branch on with errors.Is. +var ( + // ErrValidation indicates the supplied Event failed client-side validation. + ErrValidation = errors.New("journal: validation failed") + // ErrTransport indicates a network or HTTP-level failure reaching the service. + ErrTransport = errors.New("journal: transport failure") + // ErrServer indicates the service responded with a non-2xx status. + ErrServer = errors.New("journal: server error") +) + +// ErrJournal is a wrapper that carries a human-readable reason alongside a +// sentinel cause. It mirrors the style of sessionmanager.ErrSessionManagement +// so callers across the bot see consistent error types. +type ErrJournal struct { + Reason string + Cause error +} + +func (e ErrJournal) Error() string { + if e.Cause == nil { + return fmt.Sprintf("journal: %s", e.Reason) + } + return fmt.Sprintf("journal: %s: %v", e.Reason, e.Cause) +} + +func (e ErrJournal) Unwrap() error { return e.Cause } diff --git a/pkg/journal/event.go b/pkg/journal/event.go new file mode 100644 index 0000000..70fb631 --- /dev/null +++ b/pkg/journal/event.go @@ -0,0 +1,54 @@ +package journal + +import "time" + +// Event is the wire shape accepted by POST /v1/logs on the digikeeper-log +// service. Field names and types match +// internal/httpapi/command/handler.go:AppendInput.Body on the server. +type Event struct { + Type string `json:"type,omitempty"` + Timestamp time.Time `json:"timestamp"` + Tags []string `json:"tags"` + Data map[string]any `json:"data"` +} + +// Validate mirrors the server-side Resolve() check so bad events fail without +// a round-trip. The server requires a non-zero timestamp AND at least one of +// tags or data to be present. +func (e Event) Validate() error { + if e.Timestamp.IsZero() { + return ErrJournal{Reason: "timestamp is required", Cause: ErrValidation} + } + if len(e.Tags) == 0 && len(e.Data) == 0 { + return ErrJournal{Reason: "at least one of tags or data must be provided", Cause: ErrValidation} + } + return nil +} + +// AppendResult describes a successfully persisted journal entry. +type AppendResult struct { + ID string + Indexed bool + RequestID string +} + +// TypeNoteAdded is the provisional type string for "a note was added via the +// bot." The digikeeper-log service does not publish a canonical type +// taxonomy; when it does, this constant is the single point of change. +const TypeNoteAdded = "note.added" + +// NewNoteAddedEvent builds a journal Event for a note that the user added +// through the bot. It stamps the current time via time.Now; callers that +// want a deterministic clock should construct the Event directly or use +// WithNowFunc on the client. +func NewNoteAddedEvent(userID int64, note string) Event { + return Event{ + Type: TypeNoteAdded, + Timestamp: time.Now().UTC(), + Tags: []string{"bot", "note"}, + Data: map[string]any{ + "user_id": userID, + "note": note, + }, + } +} diff --git a/pkg/journal/options.go b/pkg/journal/options.go new file mode 100644 index 0000000..e534e6c --- /dev/null +++ b/pkg/journal/options.go @@ -0,0 +1,42 @@ +package journal + +import ( + "log/slog" + "net/http" + "time" +) + +// Option customizes a Client at construction time. +type Option func(*options) + +type options struct { + httpClient *http.Client + nowFunc func() time.Time + logger *slog.Logger + clientID string +} + +// WithHTTPClient overrides the underlying *http.Client. Useful in tests +// (httptest.NewServer().Client()) and when a caller needs custom transport +// settings (proxies, timeouts beyond the request level, etc.). +func WithHTTPClient(hc *http.Client) Option { + return func(o *options) { o.httpClient = hc } +} + +// WithNowFunc overrides the clock used by typed event constructors that call +// into the client. Primarily for deterministic tests. +func WithNowFunc(nf func() time.Time) Option { + return func(o *options) { o.nowFunc = nf } +} + +// WithLogger overrides the slog.Logger the client uses for its own +// diagnostics. Defaults to slog.Default(). +func WithLogger(l *slog.Logger) Option { + return func(o *options) { o.logger = l } +} + +// WithClientID overrides the X-Client-Id header value. Defaults to the +// ClientID in the Config; WithClientID wins if both are set. +func WithClientID(id string) Option { + return func(o *options) { o.clientID = id } +} diff --git a/pkg/journal/schema_repository.go b/pkg/journal/schema_repository.go new file mode 100644 index 0000000..cfa3c1a --- /dev/null +++ b/pkg/journal/schema_repository.go @@ -0,0 +1,118 @@ +package journal + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "sync" +) + +// SchemaEntry is one schema published by the service under GET /v1/registry. +// The Schema field is the raw JSON-Schema document. +type SchemaEntry struct { + Type string `json:"type"` + Schema json.RawMessage `json:"schema"` +} + +// SchemaRepository is the read-only view callers use to look up the entry +// schemas the service publishes. The repository is populated from the server +// at startup and cached in memory for the process lifetime; Refresh re-pulls +// from the server on demand. +type SchemaRepository interface { + Get(entryType string) (SchemaEntry, bool) + All() []SchemaEntry + Refresh(ctx context.Context) error +} + +// inMemSchemaRepository is the default SchemaRepository: a goroutine-safe +// map backed by a single upstream fetch. +type inMemSchemaRepository struct { + fetcher schemaFetcher + + mu sync.RWMutex + byType map[string]SchemaEntry + ordered []string +} + +type schemaFetcher interface { + fetchSchemas(ctx context.Context) ([]SchemaEntry, error) +} + +func newInMemSchemaRepository(f schemaFetcher) *inMemSchemaRepository { + return &inMemSchemaRepository{ + fetcher: f, + byType: make(map[string]SchemaEntry), + } +} + +func (r *inMemSchemaRepository) Get(entryType string) (SchemaEntry, bool) { + r.mu.RLock() + defer r.mu.RUnlock() + s, ok := r.byType[entryType] + return s, ok +} + +func (r *inMemSchemaRepository) All() []SchemaEntry { + r.mu.RLock() + defer r.mu.RUnlock() + out := make([]SchemaEntry, 0, len(r.ordered)) + for _, t := range r.ordered { + out = append(out, r.byType[t]) + } + return out +} + +func (r *inMemSchemaRepository) Refresh(ctx context.Context) error { + entries, err := r.fetcher.fetchSchemas(ctx) + if err != nil { + return err + } + + byType := make(map[string]SchemaEntry, len(entries)) + ordered := make([]string, 0, len(entries)) + for _, e := range entries { + byType[e.Type] = e + ordered = append(ordered, e.Type) + } + + r.mu.Lock() + r.byType = byType + r.ordered = ordered + r.mu.Unlock() + return nil +} + +// schemaListResponse matches the body returned by GET /v1/registry: +// +// {"schemas": [{"type": "...", "schema": {...}}, ...]} +type schemaListResponse struct { + Schemas []SchemaEntry `json:"schemas"` +} + +// fetchSchemas is the concrete HTTP fetch used by Client to populate the +// repository. It lives on the Client rather than the repository so the +// repository stays free of HTTP plumbing. +func (c *Client) fetchSchemas(ctx context.Context) ([]SchemaEntry, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.cfg.BaseURL+"/v1/registry", http.NoBody) + if err != nil { + return nil, ErrJournal{Reason: "build registry request", Cause: err} + } + c.setRequestHeaders(req, "") + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, ErrJournal{Reason: "registry request failed", Cause: fmt.Errorf("%w: %w", ErrTransport, err)} + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusOK { + return nil, ErrJournal{Reason: fmt.Sprintf("registry status %d", resp.StatusCode), Cause: ErrServer} + } + + var body schemaListResponse + if err := json.NewDecoder(resp.Body).Decode(&body); err != nil { + return nil, ErrJournal{Reason: "decode registry response", Cause: err} + } + return body.Schemas, nil +} diff --git a/pkg/journal/transport.go b/pkg/journal/transport.go new file mode 100644 index 0000000..bbde902 --- /dev/null +++ b/pkg/journal/transport.go @@ -0,0 +1,174 @@ +package journal + +import ( + "bytes" + "context" + "crypto/rand" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "net/http" + "time" +) + +const ( + contentType = "application/json" + headerClientID = "X-Client-Id" + headerReqID = "X-Request-ID" + headerAuth = "Authorization" +) + +// appendResponse mirrors the server's envelope from +// internal/httpapi/resource.go: {meta: {...}, data: {id, attributes: {...}}}. +// Only the fields the client actually returns to callers are deserialized. +type appendResponse struct { + Data struct { + ID string `json:"id"` + Attributes struct { + RequestID string `json:"request_id"` + } `json:"attributes"` + } `json:"data"` +} + +// doAppend sends one POST /v1/logs attempt with retries. +func (c *Client) doAppend(ctx context.Context, evt Event) (*AppendResult, error) { + body, err := json.Marshal(evt) + if err != nil { + return nil, ErrJournal{Reason: "marshal event", Cause: err} + } + + requestID, err := newRequestID() + if err != nil { + return nil, ErrJournal{Reason: "generate request id", Cause: err} + } + + var lastErr error + for attempt := 0; attempt <= c.cfg.MaxRetries; attempt++ { + if err := ctx.Err(); err != nil { + return nil, ErrJournal{Reason: "context cancelled", Cause: err} + } + + result, retryable, err := c.appendOnce(ctx, body, requestID) + if err == nil { + return result, nil + } + lastErr = err + + if !retryable || attempt == c.cfg.MaxRetries { + return nil, lastErr + } + if err := sleepWithCtx(ctx, backoff(attempt)); err != nil { + return nil, ErrJournal{Reason: "context cancelled during backoff", Cause: err} + } + } + return nil, lastErr +} + +// appendOnce performs a single HTTP round-trip. The returned bool reports +// whether the error (if any) is retryable. +func (c *Client) appendOnce(ctx context.Context, body []byte, requestID string) (*AppendResult, bool, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.cfg.BaseURL+"/v1/logs", bytes.NewReader(body)) + if err != nil { + return nil, false, ErrJournal{Reason: "build append request", Cause: err} + } + req.Header.Set("Content-Type", contentType) + c.setRequestHeaders(req, requestID) + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, true, ErrJournal{Reason: "append request failed", Cause: fmt.Errorf("%w: %w", ErrTransport, err)} + } + defer func() { _ = resp.Body.Close() }() + + switch resp.StatusCode { + case http.StatusCreated, http.StatusAccepted: + result, err := parseAppendResponse(resp, requestID) + return result, false, err + case http.StatusBadRequest, http.StatusUnauthorized, http.StatusForbidden, http.StatusNotFound, http.StatusUnprocessableEntity: + return nil, false, readErrorResponse(resp) + default: + if resp.StatusCode >= 500 { + return nil, true, readErrorResponse(resp) + } + return nil, false, readErrorResponse(resp) + } +} + +func parseAppendResponse(resp *http.Response, requestID string) (*AppendResult, error) { + var body appendResponse + if err := json.NewDecoder(resp.Body).Decode(&body); err != nil { + return nil, ErrJournal{Reason: "decode append response", Cause: err} + } + returnedReqID := body.Data.Attributes.RequestID + if returnedReqID == "" { + returnedReqID = requestID + } + return &AppendResult{ + ID: body.Data.ID, + Indexed: resp.StatusCode == http.StatusCreated, + RequestID: returnedReqID, + }, nil +} + +func readErrorResponse(resp *http.Response) error { + snippet, err := io.ReadAll(io.LimitReader(resp.Body, 1024)) + reason := fmt.Sprintf("status %d", resp.StatusCode) + if err == nil && len(snippet) > 0 { + reason = fmt.Sprintf("%s: %s", reason, bytes.TrimSpace(snippet)) + } + return ErrJournal{Reason: reason, Cause: ErrServer} +} + +func (c *Client) setRequestHeaders(req *http.Request, requestID string) { + clientID := c.clientID() + if clientID != "" { + req.Header.Set(headerClientID, clientID) + } + if requestID != "" { + req.Header.Set(headerReqID, requestID) + } + if c.cfg.Token != "" { + req.Header.Set(headerAuth, "Bearer "+c.cfg.Token) + } + req.Header.Set("Accept", "application/vnd.api+json, application/json") +} + +func newRequestID() (string, error) { + buf := make([]byte, 16) + if _, err := rand.Read(buf); err != nil { + return "", err + } + return hex.EncodeToString(buf), nil +} + +func backoff(attempt int) time.Duration { + // 100ms, 200ms, 400ms, 800ms ... capped at 2s. + base := 100 * time.Millisecond + d := base << attempt + if d > 2*time.Second { + d = 2 * time.Second + } + // Cheap jitter: ± 25%. If rand fails we fall back to the un-jittered value. + jitter := int64(d / 4) + var j [1]byte + if _, err := rand.Read(j[:]); err != nil { + return d + } + off := (int64(j[0]) - 128) * jitter / 128 + return d + time.Duration(off) +} + +func sleepWithCtx(ctx context.Context, d time.Duration) error { + if d <= 0 { + return nil + } + t := time.NewTimer(d) + defer t.Stop() + select { + case <-ctx.Done(): + return ctx.Err() + case <-t.C: + return nil + } +}