Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 139 additions & 19 deletions internal/services/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package services

import (
"context"
"errors"
"fmt"
"log"
"strings"
Expand Down Expand Up @@ -97,9 +98,12 @@ type AuditService struct {
logChan chan *models.AuditLog

// Batch buffer
batchBuffer []*models.AuditLog
batchMutex sync.Mutex
batchTicker *time.Ticker
batchBuffer []*models.AuditLog
batchMutex sync.Mutex
batchTicker *time.Ticker
maxRetryBuffer int // cap for pending-retry records (default: 500)
flushFailCount int // > 0 while in retry state (consecutive flush failures)
lastFlushDropped bool // set true when records are dropped without being written

// Graceful shutdown
wg sync.WaitGroup
Expand All @@ -117,11 +121,12 @@ func NewAuditService(s core.Store, bufferSize int) *AuditService {
}

service := &AuditService{
store: s,
bufferSize: bufferSize,
logChan: make(chan *models.AuditLog, bufferSize),
batchBuffer: make([]*models.AuditLog, 0, 100),
eventsDropped: getAuditEventsDroppedCounter(),
store: s,
bufferSize: bufferSize,
logChan: make(chan *models.AuditLog, bufferSize),
batchBuffer: make([]*models.AuditLog, 0, 100),
maxRetryBuffer: 500, // 5x the normal batch size
eventsDropped: getAuditEventsDroppedCounter(),
}

service.batchTicker = time.NewTicker(1 * time.Second)
Expand Down Expand Up @@ -162,9 +167,33 @@ func (s *AuditService) addToBatch(log *models.AuditLog) {

s.batchBuffer = append(s.batchBuffer, log)

// Flush if batch is full (100 entries)
if len(s.batchBuffer) >= 100 {
s.flushBatchUnsafe()
if s.flushFailCount == 0 {
// Normal path: flush when the batch reaches 100 entries.
if len(s.batchBuffer) >= 100 {
s.flushBatchUnsafe()
}
return
}

// Retry state: the last flush failed, so size-triggered flushes are
// suppressed to avoid advancing flushFailCount once-per-event. The ticker
// is the sole retry driver. However, the maxRetryBuffer cap must still be
// enforced here — otherwise sustained traffic can drain logChan into
// batchBuffer far faster than the ticker fires, growing memory without bound.
maxBuf := s.maxRetryBuffer
if maxBuf <= 0 {
maxBuf = 500
}
if len(s.batchBuffer) > maxBuf {
drop := len(s.batchBuffer) - maxBuf
s.eventsDropped.Add(float64(drop))
s.batchBuffer = s.batchBuffer[drop:]
s.lastFlushDropped = true
// Do NOT reset flushFailCount — the DB may still be down.
// Resetting would re-enter the normal flush path on the next event:
// since the buffer stays above 100 entries, addToBatch would call
// flushBatchUnsafe immediately, causing a write attempt roughly every
// other event under sustained traffic and hammering an already failing DB.
}
}

Expand All @@ -175,22 +204,46 @@ func (s *AuditService) flushBatch() {
s.flushBatchUnsafe()
}

// flushBatchUnsafe flushes the batch buffer without locking (caller must hold lock)
// flushBatchUnsafe flushes the batch buffer without locking (caller must hold lock).
// On success the buffer is cleared. On error the records are retained for the next
// flush tick. If the buffer exceeds maxRetryBuffer the oldest records are dropped
// to prevent unbounded memory growth during a sustained DB outage.
func (s *AuditService) flushBatchUnsafe() {
if len(s.batchBuffer) == 0 {
return
}

// Copy buffer for writing
// Guard: zero value is safe — treat as default.
if s.maxRetryBuffer <= 0 {
s.maxRetryBuffer = 500
}

toWrite := make([]*models.AuditLog, len(s.batchBuffer))
copy(toWrite, s.batchBuffer)

// Clear buffer
s.batchBuffer = s.batchBuffer[:0]

if err := s.store.CreateAuditLogBatch(toWrite); err != nil {
Comment thread
appleboy marked this conversation as resolved.
Comment thread
appleboy marked this conversation as resolved.
Comment thread
appleboy marked this conversation as resolved.
Comment thread
appleboy marked this conversation as resolved.
Comment thread
appleboy marked this conversation as resolved.
log.Printf("Failed to write audit log batch: %v", err)
s.flushFailCount++
Comment thread
appleboy marked this conversation as resolved.
Comment thread
appleboy marked this conversation as resolved.
log.Printf(
"WARNING: failed to write audit log batch (%d records, attempt %d), will retry: %v",
len(toWrite), s.flushFailCount, err,
)

// Buffer still holds the records — enforce the cap so memory stays bounded.
if len(s.batchBuffer) > s.maxRetryBuffer {
drop := len(s.batchBuffer) - s.maxRetryBuffer
log.Printf("WARNING: audit retry buffer full, dropping %d oldest records", drop)
s.eventsDropped.Add(float64(drop))
s.batchBuffer = s.batchBuffer[drop:]
Comment thread
appleboy marked this conversation as resolved.
s.lastFlushDropped = true
}

return
Comment thread
appleboy marked this conversation as resolved.
Comment thread
appleboy marked this conversation as resolved.
}

// Success — clear only after confirmed write.
s.flushFailCount = 0
s.lastFlushDropped = false
s.batchBuffer = s.batchBuffer[:0]
}

// clampToColumn returns s unchanged when it already fits within limit runes,
Expand Down Expand Up @@ -259,6 +312,20 @@ func (s *AuditService) buildAuditLog(
}
entry.Details = maskSensitiveDetails(entry.Details)

// Validate that Details can be serialized to JSON before the record enters
// the batch buffer. An unserializable value would cause CreateAuditLogBatch
// to fail with a permanent error on every retry, blocking all subsequent
// audit events indefinitely. Replace bad details with an error indicator so
// the row can always be written.
if _, err := entry.Details.Value(); err != nil {
log.Printf(
"WARNING: audit details for event %q are not JSON-serializable, replacing with error indicator: %v",
entry.EventType,
err,
)
entry.Details = models.AuditDetails{"_serialization_error": err.Error()}
}

// Clamp string fields to their varchar column widths. Values already within
// the limit are preserved verbatim; only over-long values are truncated
// (with an ellipsis). ActorUsername/ActorFullName mirror their unbounded
Expand All @@ -268,6 +335,21 @@ func (s *AuditService) buildAuditLog(
entry.RequestPath = clampToColumn(entry.RequestPath, 500)
entry.ActorUsername = clampToColumn(entry.ActorUsername, 100)
entry.ActorFullName = clampToColumn(entry.ActorFullName, 100)
entry.ResourceName = clampToColumn(entry.ResourceName, 255)
entry.Action = clampToColumn(entry.Action, 255)

// ActorUserID is varchar(36). Machine identity strings such as
// "client:<uuid>" (43 chars) exceed that limit on Postgres strict mode.
// Log the full value so operators retain visibility, then hard-truncate
// to prevent a permanent INSERT error from poisoning the retry buffer.
// TODO: widen the column in a follow-up migration.
if len(entry.ActorUserID) > 36 {
log.Printf(
"WARNING: audit ActorUserID %q exceeds varchar(36), truncating for event %q",
entry.ActorUserID, entry.EventType,
)
entry.ActorUserID = entry.ActorUserID[:36]
}

// RequestMethod is stored in a varchar(10) column. Preserve values up to
// the full column width and hard-truncate anything longer without adding
Expand Down Expand Up @@ -370,11 +452,49 @@ func (s *AuditService) Shutdown(ctx context.Context) error {

select {
case <-done:
log.Println("Audit service shut down gracefully")
return nil
case <-ctx.Done():
return fmt.Errorf("audit service shutdown timeout: %w", ctx.Err())
}

// The worker performs a single flushBatch on channel close. If that flush
// failed (transient DB error), records remain in batchBuffer with no ticker
// left to retry them. Attempt one final flush here, honouring the caller's
// deadline so a stalled DB cannot block Shutdown past the timeout.
flushDone := make(chan error, 1)
go func() {
s.flushBatch()
s.batchMutex.Lock()
pending := len(s.batchBuffer)
dropped := s.lastFlushDropped
s.batchMutex.Unlock()
switch {
case pending > 0:
flushDone <- fmt.Errorf(
"audit service shutdown: %d records could not be flushed to the database",
pending,
)
case dropped:
// Buffer is empty but only because records were dropped after exhausting
// retries — not because they were successfully written.
flushDone <- errors.New(
"audit service shutdown: pending audit records were dropped after exhausting retry attempts",
)
default:
flushDone <- nil
}
}()

select {
case err := <-flushDone:
if err != nil {
return err
}
case <-ctx.Done():
return fmt.Errorf("audit service shutdown timeout during final flush: %w", ctx.Err())
}

log.Println("Audit service shut down gracefully")
return nil
}

// maskSensitiveDetails masks sensitive information in audit log details
Expand Down
Loading
Loading