Documentation ¶
Overview ¶
Package audit — BufferedAsyncLogger.
BufferedAsyncLogger wraps any audit.Logger with a bounded channel and a pool of N worker goroutines that drain the channel into the underlying Log call. The wrapper exists to close three audit findings at once:
- R2 S5 — synchronous flush on the request hot path (now: workers absorb I/O latency; Log() returns after a channel send / overflow append).
- R2 S9 — silent drop on saturation. The channel is bounded; on full, entries are persisted to a JSON-Lines overflow file (mode 0600) and a recovery goroutine drains the overflow back into the channel when capacity returns. Entries are durable across the saturation boundary; they are NOT ordered across that boundary.
- R4 E3 — JSON encode + file write happen under FileLogger.mu. With the wrapper, encode runs on workers; the underlying mutex hold shrinks to the actual encode+write, and the request goroutine no longer waits on the audit mutex at all (best case: enqueue + return).
Concurrency contract:
- Log MUST NOT block. Path A: non-blocking channel send. Path B: append a JSON line to the overflow file. Either path returns nil to the caller. The only error Log can return is "queue full AND overflow append failed" — both legs of durability gone.
- Close is idempotent (sync.Once). It closes the recovery goroutine's stop channel, signals workers to shut down (they drain any remaining queued entries before exiting; the queue channel itself is never closed, see Close), and then runs a one-shot best-effort overflow drain bounded by a short timeout.
- The recovery goroutine runs at RecoveryInterval (default 5s). On each tick it atomically renames the overflow file to a `.draining.<ts>` sibling so concurrent Log() callers do not see partially drained state, then reads each line, attempts a non-blocking enqueue, and re-spills any line that would have blocked into a fresh overflow file.
- Counters are exposed via getters that return atomic loads. Callers see a value at *some* point in time, not a snapshot across all three.
Notifier-side spool-to-disk is the symmetrical R7 E5 finding for the dispatcher. The audit-side path is closed by this wrapper; the notifier equivalent is tracked separately under v0.6 (see TODO at end of file).
Index ¶
- Constants
- func ReplayFrom(auditPath string, cp *Checkpoint, fn func(Entry)) (int64, error)
- func WriteCheckpoint(auditPath string, cp Checkpoint) error
- type BufferedAsyncLogger
- func (b *BufferedAsyncLogger) Close() error
- func (b *BufferedAsyncLogger) DrainedFromOverflow() uint64
- func (b *BufferedAsyncLogger) DroppedToOverflow() uint64
- func (b *BufferedAsyncLogger) Log(e Entry) error
- func (b *BufferedAsyncLogger) Query(filter QueryFilter) ([]Entry, error)
- func (b *BufferedAsyncLogger) QueueDepth() int
- type BufferedAsyncOpts
- type Checkpoint
- type Entry
- type FileLogger
- type Logger
- type MetaRecord
- type QueryFilter
- type RotationConfig
- type SQLiteLogger
Constants ¶
const ArchiveTimestampFormat = "20060102T150405Z"
ArchiveTimestampFormat is the format used to suffix archived audit files. Lexicographic order matches chronological order, which lets the prune step walk os.ReadDir results without parsing each name back into a time.Time.
const CheckpointSuffix = ".replay-checkpoint"
CheckpointSuffix is appended to an audit log path to produce the companion checkpoint file written by the startup replay seeder.
const CurrentSchemaVersion = 2
CurrentSchemaVersion is the schema version produced by this binary. Readers accept N-1 (headerless v1 files written by v0.4.0) transparently and refuse N+1 with a clear error.
const DefaultFilePermissions = 0600
DefaultFilePermissions is the Unix file mode for newly created audit log files. Restricted to owner-only since audit logs may contain sensitive operational data.
const TransportLLMAPIProxy = "llm_api_proxy"
TransportLLMAPIProxy is the canonical transport string reserved for Phase 4C's LLM API Proxy. Stamped by future code; included here so readers/dashboards can pre-recognise the value.
const TransportMCPGateway = "mcp_gateway"
TransportMCPGateway is the canonical transport string for entries produced via the MCP Gateway (cmd/agentguard-mcp-gateway).
const TransportSDK = "sdk"
TransportSDK is the canonical transport string for SDK callers (Python / TypeScript) that do not stamp meta["transport"]. Pre-v0.5 audit entries also default to this value via EffectiveTransport.
Variables ¶
This section is empty.
Functions ¶
func ReplayFrom ¶ added in v0.5.0
func ReplayFrom(auditPath string, cp *Checkpoint, fn func(Entry)) (int64, error)
ReplayFrom scans auditPath starting at cp.Offset (or 0 if cp is nil or stale) and invokes fn for every valid entry. Returns the end-of-file offset reached during the scan — the caller should persist this back as the next checkpoint so subsequent boots skip work already done.
Staleness detection: if cp.AuditSize exceeds the current file size, the file has been truncated or rotated; we discard the checkpoint and rescan from the beginning so counters stay accurate.
Missing audit file: returns (0, nil) — a fresh install has nothing to replay, and that is not an error.
func WriteCheckpoint ¶ added in v0.5.0
func WriteCheckpoint(auditPath string, cp Checkpoint) error
WriteCheckpoint persists cp atomically via write-then-rename. A partial write crash therefore leaves either the old checkpoint or no change — never a half-written file.
Types ¶
type BufferedAsyncLogger ¶ added in v0.5.0
type BufferedAsyncLogger struct {
// contains filtered or unexported fields
}
BufferedAsyncLogger wraps an underlying Logger with a bounded queue, worker pool, and disk-overflow spill path. See package doc for the concurrency contract.
func NewBufferedAsyncLogger ¶ added in v0.5.0
func NewBufferedAsyncLogger(underlying Logger, opts BufferedAsyncOpts) (*BufferedAsyncLogger, error)
NewBufferedAsyncLogger constructs a buffered async wrapper around underlying. The parent directory of OverflowPath is created if missing (mode 0700) so the operator does not have to pre-provision it.
func (*BufferedAsyncLogger) Close ¶ added in v0.5.0
func (b *BufferedAsyncLogger) Close() error
Close drains the queue and stops all goroutines. Safe to call multiple times (sync.Once). Close also runs a one-shot best-effort overflow drain bounded by closeFlushTimeout so a small backlog at shutdown still reaches the underlying logger.
Close does NOT close the underlying logger — the caller owns that lifecycle (typically `defer logger.Close()` in cmd/agentguard/main.go, after the buffered wrapper's defer fires).
We DO NOT close b.queue: with concurrent Log() callers in flight, a `close(channel)` would race a `chansend`. Instead we signal shutdown via b.closed and let workers exit on a `select` that picks up either a queued entry or the close signal. Log() also reads b.closed first and routes to the overflow file once shutdown has begun.
func (*BufferedAsyncLogger) DrainedFromOverflow ¶ added in v0.5.0
func (b *BufferedAsyncLogger) DrainedFromOverflow() uint64
DrainedFromOverflow returns the lifetime count of entries the recovery goroutine successfully pushed back into the queue from the overflow file.
func (*BufferedAsyncLogger) DroppedToOverflow ¶ added in v0.5.0
func (b *BufferedAsyncLogger) DroppedToOverflow() uint64
DroppedToOverflow returns the lifetime count of entries that were spilled to the overflow file because the queue was full or the underlying Log call returned an error.
func (*BufferedAsyncLogger) Log ¶ added in v0.5.0
func (b *BufferedAsyncLogger) Log(e Entry) error
Log enqueues an entry for async write. Never blocks: on saturation the entry is appended to the overflow file and dropToOverflow is incremented. Returns an error only when both legs of durability fail (queue full AND overflow append failed), which means the entry is lost.
After Close has been called, Log spills directly to the overflow file rather than enqueueing to workers that are shutting down. This is the documented behavior for the racy "Log called concurrently with Close" path: the entry is durable on disk, and the next process startup's recovery loop will pick it up (the operator remediation is simply to restart the binary).
func (*BufferedAsyncLogger) Query ¶ added in v0.5.0
func (b *BufferedAsyncLogger) Query(filter QueryFilter) ([]Entry, error)
Query passes through to the underlying logger. v0.6 may merge overflow file results so a query during a saturation event returns a complete view; for v0.5 we accept the gap (entries in the spill file are not yet in the underlying logger's index) — the recovery goroutine closes the gap within RecoveryInterval.
func (*BufferedAsyncLogger) QueueDepth ¶ added in v0.5.0
func (b *BufferedAsyncLogger) QueueDepth() int
QueueDepth returns the approximate number of entries sitting in the in-memory queue. The returned value is an atomic load of an int64 counter maintained by Log() / workerLoop(); it can briefly drift from the channel's actual length at a few hundred RPS but converges quickly.
type BufferedAsyncOpts ¶ added in v0.5.0
type BufferedAsyncOpts struct {
QueueSize int // default 1024 if 0
Workers int // default 4 if 0
OverflowPath string // required (caller derives, e.g. "<auditPath>.overflow.jsonl")
RecoveryInterval time.Duration // default 5s if 0; mainly exposed for tests
}
BufferedAsyncOpts configures NewBufferedAsyncLogger.
Zero values for QueueSize, Workers, and RecoveryInterval are replaced with documented defaults. OverflowPath is required — the caller picks the location (typically `<auditPath>.overflow.jsonl`) so the operator's retention/backup tooling sees the spill file alongside the live log.
type Checkpoint ¶ added in v0.5.0
Checkpoint records how far the startup seeder scanned into the audit log, so the next boot can resume from the stored offset instead of re-reading gigabytes of history.
AuditSize is the file size when the checkpoint was written. If the current audit file is smaller at the next boot, the log was truncated or rotated and the offset must be discarded — we fall back to scanning from zero.
func ReadCheckpoint ¶ added in v0.5.0
func ReadCheckpoint(auditPath string) (*Checkpoint, error)
ReadCheckpoint loads the checkpoint for auditPath. Returns (nil, nil) when the checkpoint file is missing — this is the expected first-boot state and must not be treated as an error. A corrupt checkpoint is also downgraded to (nil, nil) so an unreadable marker simply triggers a full rescan rather than aborting startup.
type Entry ¶
type Entry struct {
Timestamp time.Time `json:"timestamp"`
SessionID string `json:"session_id"`
AgentID string `json:"agent_id"`
Request policy.ActionRequest `json:"request"`
Result policy.CheckResult `json:"result"`
DurationMs int64 `json:"duration_ms"`
Transport string `json:"transport,omitempty"`
}
Entry represents a single audit log record.
Transport identifies the integration path that produced this entry. One of "sdk" (default; SDK call), "mcp_gateway" (MCP Gateway tool call), "llm_api_proxy" (Phase 4C), or another gateway-defined value. Empty on unmarshal of pre-v0.5 entries; readers MUST default to EffectiveTransport() when consuming the field. The field is additive and does NOT bump the audit schema_version (still 2). See docs/PROXY_ARCHITECTURE.md § "Audit transport tag".
func (Entry) EffectiveTransport ¶ added in v0.5.0
EffectiveTransport returns e.Transport if set, otherwise the SDK default. Use when reading audit entries that may have been written by a pre-v0.5 binary (no Transport field on disk).
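The documented fallback behaviour amounts to the following; entry and effectiveTransport are lowercase stand-ins for the exported names so the sketch stays self-contained:

```go
package main

// Transport constants as documented by this package.
const (
	TransportSDK        = "sdk"
	TransportMCPGateway = "mcp_gateway"
)

// entry is a stand-in for audit.Entry carrying only the field this
// sketch needs.
type entry struct {
	Transport string
}

// effectiveTransport mirrors the documented behaviour of
// Entry.EffectiveTransport: pre-v0.5 entries unmarshal with an empty
// Transport and default to the SDK value.
func (e entry) effectiveTransport() string {
	if e.Transport == "" {
		return TransportSDK
	}
	return e.Transport
}
```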
type FileLogger ¶
type FileLogger struct {
// contains filtered or unexported fields
}
FileLogger writes audit entries as JSON lines to a file.
Rotation: when rotCfg.MaxSize > 0, every Log() call stats the underlying file after the write and hands off to rotateLocked() once the live file meets or exceeds the threshold. Rotation is opt-in at v0.4.1 (wired via NewFileLoggerWithRotation); callers using the zero-rotation NewFileLogger keep v0.4.0's unbounded-growth behaviour.
func NewFileLogger ¶
func NewFileLogger(path string) (*FileLogger, error)
NewFileLogger creates a new file-based audit logger.
Schema v2: if the target file does not exist or is empty, the logger writes a single {"_meta":{"schema_version":2,...}} header line before any entries. Existing non-empty files are left alone — the v0.4.0_to_v0.4.1 migration is responsible for rewriting legacy (headerless, v1) files. Query() tolerates both cases transparently.
func NewFileLoggerWithRotation ¶ added in v0.5.0
func NewFileLoggerWithRotation(path string, cfg RotationConfig) (*FileLogger, error)
NewFileLoggerWithRotation is NewFileLogger + rotation policy. The configuration is captured at construction; mutating it after the fact has no effect.
func (*FileLogger) Close ¶
func (l *FileLogger) Close() error
Close flushes and closes the log file.
func (*FileLogger) Log ¶
func (l *FileLogger) Log(entry Entry) error
Log writes an audit entry to the log file.
When RotationConfig.MaxSize is non-zero, the underlying file is stat'd after a successful encode and rotateLocked() fires if the live file has reached the size threshold. Rotation errors are returned to the caller because the entry that triggered rotation has already been persisted — a rotation failure here is a signal the operator needs to act on, not a write that silently dropped data.
func (*FileLogger) Path ¶ added in v0.5.0
func (l *FileLogger) Path() string
Path returns the filesystem path of the underlying audit log, or "" if the FileLogger has no open file. Used by callers that need to co-locate auxiliary files (checkpoint, rotation markers) next to the log.
func (*FileLogger) Query ¶
func (l *FileLogger) Query(filter QueryFilter) ([]Entry, error)
Query reads the log file and filters entries.
Lock scope: we hold l.mu only long enough to capture the current file path (effectively immutable, but we still snapshot it under the lock for safety) and open a read handle. The actual scan runs WITHOUT the lock so concurrent Log() writes are not blocked by long queries.
This is safe because:
- FileLogger.file is opened in O_APPEND mode, so on POSIX each write is atomically positioned at end-of-file and concurrent appends cannot overwrite one another. (The PIPE_BUF atomicity limit applies to pipes and FIFOs, not regular files.)
- Our read handle captures a consistent size at open; extra bytes written after we open are simply not seen by this query.
- A torn or partial trailing line (no terminating \n, or truncated JSON) simply fails to unmarshal and is skipped rather than corrupting results.
TODO(v0.6, #audit-perf-mutex): Query scans the full file linearly. For production workloads with large audit logs, replace with a database-backed implementation (SQLite or PostgreSQL). Pre-existing carry-over from v0.4.x; the v0.5 audit review explicitly noted this should NOT block the v0.5 release — see docs/SLO.md § "Deferred performance work".
type Logger ¶
type Logger interface {
Log(entry Entry) error
Query(filter QueryFilter) ([]Entry, error)
Close() error
}
Logger is the interface for audit logging.
type MetaRecord ¶ added in v0.5.0
type MetaRecord struct {
SchemaVersion int `json:"schema_version"`
CreatedAt time.Time `json:"created_at"`
RotatedFrom string `json:"rotated_from,omitempty"`
}
MetaRecord is the shape of the first line of a schema-v2 audit file. Optional RotatedFrom is set when the file is the successor in a rotation chain — the replay walker follows it to stitch history back together.
func ReadMeta ¶ added in v0.5.0
func ReadMeta(path string) (*MetaRecord, error)
ReadMeta returns the schema-v2 meta record from the first line of path. Returns (nil, nil) if the file is headerless (v0.4.0 legacy format) or empty, so callers can distinguish "no header" from "read error".
Refuses files whose schema_version is newer than CurrentSchemaVersion — running an old binary against a newer file is an obvious operator error.
type QueryFilter ¶
type QueryFilter struct {
AgentID string `json:"agent_id,omitempty"`
SessionID string `json:"session_id,omitempty"`
Decision string `json:"decision,omitempty"`
Scope string `json:"scope,omitempty"`
Since *time.Time `json:"since,omitempty"`
Limit int `json:"limit,omitempty"`
Offset int `json:"offset,omitempty"`
// Transport filters by the integration path that produced the
// entry ("sdk", "mcp_gateway", "llm_api_proxy"). Compared against
// Entry.EffectiveTransport so pre-v0.5 entries (no Transport on
// disk) match the "sdk" value. Empty disables the filter.
Transport string `json:"transport,omitempty"`
}
QueryFilter specifies criteria for querying audit logs.
Offset is applied after filtering but before the Limit is reached: the first Offset matching records are discarded, then up to Limit records are collected. A Limit of 0 means "no cap" (compat with v0.4.0).
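The Offset/Limit semantics reduce to the following; paginate is a hypothetical helper operating on already-filtered matches:

```go
package main

// paginate reproduces the documented semantics: the first offset
// matching records are discarded, then up to limit are collected;
// limit 0 means no cap (v0.4.0 compatibility).
func paginate(matches []string, offset, limit int) []string {
	if offset >= len(matches) {
		return nil
	}
	matches = matches[offset:]
	if limit > 0 && limit < len(matches) {
		matches = matches[:limit]
	}
	return matches
}
```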
type RotationConfig ¶ added in v0.5.0
type RotationConfig struct {
// MaxSize is the live-file size threshold in bytes. 0 disables rotation.
MaxSize int64
// MaxFiles caps the number of archived files kept after pruning. Older
// archives (by timestamp suffix) are deleted first. 0 means keep all.
MaxFiles int
// MaxAge bounds the age of archived files. Archives older than this
// are deleted at rotation time. Zero disables age-based pruning. Age
// is computed against the timestamp suffix on the archive name, not
// against filesystem mtime, so an out-of-band rsync of archives does
// not silently invalidate retention.
MaxAge time.Duration
// Compress, when true, gzips the archive file after rename and removes
// the uncompressed copy. A gzip failure leaves the uncompressed archive
// in place — pruning still counts it.
Compress bool
}
RotationConfig configures size-triggered rotation for FileLogger.
The zero value disables rotation entirely — v0.4.0 behaviour.
When MaxSize > 0, after every successful Log() the current file size is checked and rotateLocked() fires if the file meets or exceeds MaxSize. Rotation is synchronous; the caller's Log() blocks until the rename + new-file + header write is done. Expected cost: one stat + one rename + one file open + ~80 bytes encode. At default 100 MB thresholds this is milliseconds every few hundred thousand entries.
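A production-leaning configuration and the size trigger might look like this. The struct here mirrors the exported RotationConfig for a standalone sketch; the specific values and the shouldRotate helper are illustrative, not package defaults:

```go
package main

import "time"

// RotationConfig mirrors the exported struct for this sketch.
type RotationConfig struct {
	MaxSize  int64
	MaxFiles int
	MaxAge   time.Duration
	Compress bool
}

// Example: rotate at 100 MiB, keep at most 10 archives no older than
// 30 days, gzip archives after rename. Values are illustrative.
var example = RotationConfig{
	MaxSize:  100 << 20,
	MaxFiles: 10,
	MaxAge:   30 * 24 * time.Hour,
	Compress: true,
}

// shouldRotate reproduces the documented trigger: fire when the live
// file meets or exceeds MaxSize; a zero MaxSize disables rotation.
func shouldRotate(cfg RotationConfig, liveSize int64) bool {
	return cfg.MaxSize > 0 && liveSize >= cfg.MaxSize
}
```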
type SQLiteLogger ¶
type SQLiteLogger struct {
// contains filtered or unexported fields
}
SQLiteLogger stores audit entries in a SQLite database for efficient querying. Requires a "database/sql" driver for SQLite to be registered (e.g., modernc.org/sqlite).
func NewSQLiteLogger ¶
func NewSQLiteLogger(dbPath string) (*SQLiteLogger, error)
NewSQLiteLogger opens (or creates) a SQLite database at the given path and initializes the audit_entries table and indexes.
Before calling this, register a SQLite driver. For example:
import _ "modernc.org/sqlite"
Then call:
logger, err := audit.NewSQLiteLogger("audit.db")
func (*SQLiteLogger) Close ¶
func (l *SQLiteLogger) Close() error
Close closes the underlying database connection.
func (*SQLiteLogger) Log ¶
func (l *SQLiteLogger) Log(entry Entry) error
Log writes an audit entry to the database.
func (*SQLiteLogger) Query ¶
func (l *SQLiteLogger) Query(filter QueryFilter) ([]Entry, error)
Query returns audit entries matching the given filter. Unlike FileLogger.Query, this issues indexed SQL queries instead of a full file scan, so query cost scales with the matched rows rather than the size of the whole log.