instrumentation

package
v0.1.49 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 14, 2026 License: AGPL-3.0, AGPL-3.0-or-later Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AccessStreamName

func AccessStreamName(sessionID string) string

AccessStreamName returns the S2 stream name for an access session.

func AccessStreamPrefix added in v0.1.48

func AccessStreamPrefix() string

AccessStreamPrefix returns the prefix used for all access log streams.

func SessionIDFromStreamName added in v0.1.48

func SessionIDFromStreamName(name string) string

SessionIDFromStreamName extracts the session ID from a stream name of the form "access.{session_id}.events".

Types

type AccessEvent

type AccessEvent struct {
	EventID          string `json:"event_id,omitempty"`
	Timestamp        int64  `json:"ts"`
	WorkspaceID      string `json:"workspace_id"`
	SessionID        string `json:"session_id"`
	Path             string `json:"path"`
	CacheSource      string `json:"cache_source,omitempty"` // backend_rpc, open_content, content_cache, prefetch, dirty_buffer, etc.
	Offset           int64  `json:"offset,omitempty"`
	RequestedBytes   int    `json:"requested_bytes,omitempty"`
	ReadBytes        int    `json:"read_bytes,omitempty"`
	LatencyMs        int64  `json:"latency_ms,omitempty"`
	MountID          string `json:"mount_id,omitempty"`
	AccessOrigin     string `json:"access_origin,omitempty"` // "fuse" when emitted by mount collector
	Integration      string `json:"integration"`
	SourceURI        string `json:"source_uri"` // canonical upstream ref, e.g. "github://abc123" or "gmail://msg-id"
	QueryPath        string `json:"query_path"`
	ResultID         string `json:"result_id"`
	OriginalBytes    int    `json:"original_bytes"`
	CompressedBytes  int    `json:"compressed_bytes"`
	OriginalTokens   int    `json:"original_tokens"`
	CompressedTokens int    `json:"compressed_tokens"`
	Strategy         string `json:"strategy"` // requested strategy
	Outcome          string `json:"outcome"`  // "compressed", "cache_hit", "passthrough", "timeout", "error", "skipped"
	CompressionMs    int64  `json:"compression_ms"`
	FetchMs          int64  `json:"fetch_ms,omitempty"`  // e2e content fetch duration (e.g. time to fetch from source during Open)
	ErrorMsg         string `json:"error_msg,omitempty"` // populated on outcome=error or timeout
}

AccessEvent records a single file access through the filesystem. Every read produces exactly one event regardless of compression outcome.

type AccessLogInterceptor added in v0.1.47

type AccessLogInterceptor struct {
	// contains filtered or unexported fields
}

AccessLogInterceptor is a gRPC server-side unary interceptor that records access events for every successful Read RPC. It is intentionally generic: any service whose Read method accepts a request with a `path` field will be recorded. The interceptor only fires when the caller supplies the x-airstore-session gRPC metadata header.

This is the single instrumentation point for all file reads regardless of which service handles them (SourceService, ContextService, etc.).

func NewAccessLogInterceptor added in v0.1.47

func NewAccessLogInterceptor(recorder AccessRecorder) *AccessLogInterceptor

func (*AccessLogInterceptor) Unary added in v0.1.47

Unary returns a grpc.UnaryServerInterceptor suitable for use with grpc.ChainUnaryInterceptor. It should be placed AFTER the auth interceptor so that auth context (workspace, member) is available.

type AccessRecorder

type AccessRecorder interface {
	Record(ctx context.Context, event AccessEvent) error // Record enqueues an access event for async delivery.
	Flush() error                                        // Flush drains any buffered events. Called on graceful shutdown.
}

AccessRecorder records file access events. Implementations must be safe for concurrent use and should be non-blocking (buffer internally).

type EventFlusher

type EventFlusher struct {
	// contains filtered or unexported fields
}

EventFlusher writes access events to an S2 stream asynchronously. Record() is non-blocking; events are buffered and flushed by a background goroutine.

func NewEventFlusher

func NewEventFlusher(s2 *common.S2Client) *EventFlusher

NewEventFlusher creates and starts an S2-backed access recorder. Call Flush() on shutdown to drain buffered events.

func (*EventFlusher) Flush

func (f *EventFlusher) Flush() error

Flush signals the background goroutine to stop and waits for it to drain remaining events. Safe to call multiple times.

func (*EventFlusher) Record

func (f *EventFlusher) Record(_ context.Context, event AccessEvent) error

Record enqueues an event for async delivery. Non-blocking: if the buffer is full the event is dropped (logged as a warning).

type NoopRecorder

type NoopRecorder struct{}

NoopRecorder discards all events. Used when instrumentation is disabled or in tests.

func NewNoopRecorder

func NewNoopRecorder() *NoopRecorder

func (*NoopRecorder) Flush

func (n *NoopRecorder) Flush() error

func (*NoopRecorder) Record

func (n *NoopRecorder) Record(_ context.Context, _ AccessEvent) error

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL