Documentation
Index
- func AccessStreamName(sessionID string) string
- func AccessStreamPrefix() string
- func AccessWorkspaceStreamName(workspaceID, sessionID string) string
- func AccessWorkspaceStreamPrefix(workspaceID string) string
- func SessionIDFromStreamName(name string) string
- func SessionIDFromWorkspaceStreamName(name, workspaceID string) string
- type AccessEvent
- type AccessLogInterceptor
- type AccessRecorder
- type EventFlusher
- type NoopRecorder
Constants
This section is empty.
Variables
This section is empty.
Functions
func AccessStreamName
func AccessStreamName(sessionID string) string
AccessStreamName returns the S2 stream name for an access session, using the legacy format access.{session_id}.events.
func AccessStreamPrefix (added in v0.1.48)
func AccessStreamPrefix() string
AccessStreamPrefix returns the prefix used for all access log streams. This is the global/legacy prefix ("access.").
func AccessWorkspaceStreamName (added in v0.1.57)
func AccessWorkspaceStreamName(workspaceID, sessionID string) string
AccessWorkspaceStreamName returns a workspace-scoped access stream name. Format: access.{workspace_id}.{session_id}.events
If workspaceID is empty, it falls back to the legacy AccessStreamName format.
func AccessWorkspaceStreamPrefix (added in v0.1.57)
func AccessWorkspaceStreamPrefix(workspaceID string) string
AccessWorkspaceStreamPrefix returns the stream-name prefix for a workspace. Format: access.{workspace_id}.
If workspaceID is empty, it falls back to AccessStreamPrefix().
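The naming scheme described above can be sketched in a few lines. This is a minimal illustration, not the package's implementation: the lowercase function names are hypothetical stand-ins, and the only behavior assumed is what the descriptions state (the "access." prefix, the ".events" suffix, and the fallback to the legacy form when workspaceID is empty).

```go
package main

import "fmt"

// streamName is a hypothetical stand-in for AccessStreamName.
// It builds the documented legacy format access.{session_id}.events.
func streamName(sessionID string) string {
	return "access." + sessionID + ".events"
}

// workspaceStreamPrefix is a hypothetical stand-in for
// AccessWorkspaceStreamPrefix. An empty workspaceID falls back to the
// global prefix "access.", as the documentation describes.
func workspaceStreamPrefix(workspaceID string) string {
	if workspaceID == "" {
		return "access."
	}
	return "access." + workspaceID + "."
}

// workspaceStreamName is a hypothetical stand-in for
// AccessWorkspaceStreamName. An empty workspaceID falls back to the
// legacy stream name.
func workspaceStreamName(workspaceID, sessionID string) string {
	if workspaceID == "" {
		return streamName(sessionID)
	}
	return workspaceStreamPrefix(workspaceID) + sessionID + ".events"
}

func main() {
	fmt.Println(streamName("s1"))                // access.s1.events
	fmt.Println(workspaceStreamName("w1", "s1")) // access.w1.s1.events
	fmt.Println(workspaceStreamName("", "s1"))   // access.s1.events
	fmt.Println(workspaceStreamPrefix("w1"))     // access.w1.
}
```

A prefix of this shape is convenient for listing every stream in one workspace with a single prefix query, which is presumably why the prefix helpers exist alongside the name helpers.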
func SessionIDFromStreamName (added in v0.1.48)
func SessionIDFromStreamName(name string) string
SessionIDFromStreamName extracts the session ID from a stream name of the form "access.{session_id}.events".
func SessionIDFromWorkspaceStreamName (added in v0.1.57)
func SessionIDFromWorkspaceStreamName(name, workspaceID string) string
SessionIDFromWorkspaceStreamName extracts the session ID for a specific workspace from a stream name of the form "access.{workspace_id}.{session_id}.events". It returns an empty string when the stream does not belong to that workspace.
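Parsing is the inverse of the naming scheme: strip the expected prefix and the ".events" suffix and keep what lies between. The sketch below uses hypothetical lowercase names and makes two assumptions the documentation does not confirm: the legacy parser returns an empty string for names that do not match, and the workspace parser returns an empty string when workspaceID is empty.

```go
package main

import (
	"fmt"
	"strings"
)

// between returns the part of name that lies between prefix and suffix,
// or "" when name does not have that shape.
func between(name, prefix, suffix string) string {
	if !strings.HasPrefix(name, prefix) {
		return ""
	}
	rest := name[len(prefix):]
	if !strings.HasSuffix(rest, suffix) {
		return ""
	}
	return rest[:len(rest)-len(suffix)]
}

// sessionIDFromStreamName is a hypothetical stand-in for
// SessionIDFromStreamName (legacy form access.{session_id}.events).
// Returning "" on a mismatch is an assumption.
func sessionIDFromStreamName(name string) string {
	return between(name, "access.", ".events")
}

// sessionIDFromWorkspaceStreamName is a hypothetical stand-in for
// SessionIDFromWorkspaceStreamName. It returns "" when the stream does
// not belong to workspaceID, as documented. Returning "" for an empty
// workspaceID is an assumption.
func sessionIDFromWorkspaceStreamName(name, workspaceID string) string {
	if workspaceID == "" {
		return ""
	}
	return between(name, "access."+workspaceID+".", ".events")
}

func main() {
	fmt.Println(sessionIDFromStreamName("access.s1.events"))                     // s1
	fmt.Println(sessionIDFromWorkspaceStreamName("access.w1.s1.events", "w1"))   // s1
	fmt.Println(sessionIDFromWorkspaceStreamName("access.w1.s1.events", "w2") == "") // true
}
```

Note that in this sketch the legacy parser applied to a workspace-scoped name would return "w1.s1", so callers that mix both formats would need to pick the parser that matches the stream's origin.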
Types
type AccessEvent
type AccessEvent struct {
	EventID string `json:"event_id,omitempty"`
	Timestamp int64 `json:"ts"`
	WorkspaceID string `json:"workspace_id"`
	SessionID string `json:"session_id"`
	Path string `json:"path"`
	CacheSource string `json:"cache_source,omitempty"` // backend_rpc, open_content, content_cache, prefetch, dirty_buffer, etc.
	Offset int64 `json:"offset,omitempty"`
	RequestedBytes int `json:"requested_bytes,omitempty"`
	ReadBytes int `json:"read_bytes,omitempty"`
	LatencyMs int64 `json:"latency_ms,omitempty"`
	MountID string `json:"mount_id,omitempty"`
	AccessOrigin string `json:"access_origin,omitempty"` // "fuse" when emitted by mount collector
	Integration string `json:"integration"`
	SourceURI string `json:"source_uri"` // canonical upstream ref, e.g. "github://abc123" or "gmail://msg-id"
	QueryPath string `json:"query_path"`
	ResultID string `json:"result_id"`
	OriginalBytes int `json:"original_bytes"`
	CompressedBytes int `json:"compressed_bytes"`
	OriginalTokens int `json:"original_tokens"`
	CompressedTokens int `json:"compressed_tokens"`
	Strategy string `json:"strategy"` // requested strategy
	Outcome string `json:"outcome"` // "compressed", "cache_hit", "passthrough", "timeout", "error", "skipped"
	CompressionMs int64 `json:"compression_ms"`
	FetchMs int64 `json:"fetch_ms,omitempty"` // e2e content fetch duration (e.g. time to fetch from source during Open)
	ErrorMsg string `json:"error_msg,omitempty"` // populated on outcome=error or timeout
}
AccessEvent records a single file access through the filesystem. Every read produces exactly one event regardless of compression outcome.
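The struct tags determine the wire format: fields tagged omitempty vanish from the JSON when they hold a zero value, while the remaining fields are always emitted. The sketch below demonstrates this with a trimmed, hypothetical copy of the struct (accessEvent, with a subset of the documented fields and their tags). The sample values are invented, and the unit of ts is not stated in the documentation, so the millisecond-style value is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// accessEvent is a trimmed, hypothetical copy of AccessEvent that keeps
// a subset of the documented fields with their original JSON tags.
type accessEvent struct {
	EventID     string `json:"event_id,omitempty"`
	Timestamp   int64  `json:"ts"`
	WorkspaceID string `json:"workspace_id"`
	SessionID   string `json:"session_id"`
	Path        string `json:"path"`
	CacheSource string `json:"cache_source,omitempty"`
	ReadBytes   int    `json:"read_bytes,omitempty"`
	Outcome     string `json:"outcome"`
	ErrorMsg    string `json:"error_msg,omitempty"`
}

// encode marshals one event to its JSON wire form.
func encode(ev accessEvent) (string, error) {
	b, err := json.Marshal(ev)
	return string(b), err
}

func main() {
	ev := accessEvent{
		Timestamp:   1700000000000, // assumed unit; the docs do not specify it
		WorkspaceID: "w1",
		SessionID:   "s1",
		Path:        "/docs/readme.md",
		ReadBytes:   4096,
		Outcome:     "passthrough",
	}
	out, err := encode(ev)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	// EventID, CacheSource, and ErrorMsg are empty and tagged omitempty,
	// so they do not appear in the output.
	fmt.Println(out)
}
```

One consequence for consumers: a missing omitempty field such as read_bytes cannot be distinguished from an explicit zero, whereas fields like outcome are always present, even when empty.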
type AccessLogInterceptor (added in v0.1.47)
type AccessLogInterceptor struct {
	// contains filtered or unexported fields
}
AccessLogInterceptor is a gRPC server-side unary interceptor that records an access event for every successful Read RPC. It is intentionally generic: any service whose Read method accepts a request with a `path` field is covered. The interceptor fires only when the caller supplies the x-airstore-session gRPC metadata header.
This makes it the single instrumentation point for all file reads, regardless of which service handles them (SourceService, ContextService, etc.).
func NewAccessLogInterceptor (added in v0.1.47)
func NewAccessLogInterceptor(recorder AccessRecorder) *AccessLogInterceptor
func (*AccessLogInterceptor) Unary (added in v0.1.47)
func (a *AccessLogInterceptor) Unary() grpc.UnaryServerInterceptor
Unary returns a grpc.UnaryServerInterceptor suitable for use with grpc.ChainUnaryInterceptor. It should be placed AFTER the auth interceptor so that auth context (workspace, member) is available.
type AccessRecorder
type AccessRecorder interface {
	// Record enqueues an access event for async delivery.
	Record(ctx context.Context, event AccessEvent) error
	// Flush drains any buffered events. Called on graceful shutdown.
	Flush() error
}
AccessRecorder records file access events. Implementations must be safe for concurrent use and should be non-blocking (buffer internally).
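Because AccessRecorder is a two-method interface, a custom implementation is small. The sketch below shows an in-memory recorder of the kind that might be useful in tests. Everything in it is hypothetical: memoryRecorder and recordConcurrently are illustrative names, and AccessEvent and AccessRecorder are redeclared locally (the event trimmed to two fields) so the example compiles on its own.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// AccessEvent is a trimmed local stand-in for the package's type.
type AccessEvent struct {
	SessionID string
	Path      string
}

// AccessRecorder mirrors the documented interface.
type AccessRecorder interface {
	Record(ctx context.Context, event AccessEvent) error
	Flush() error
}

// memoryRecorder is a hypothetical recorder that keeps events in memory.
// A mutex makes it safe for concurrent use, as the interface requires.
type memoryRecorder struct {
	mu     sync.Mutex
	events []AccessEvent
}

// Compile-time check that memoryRecorder satisfies the interface.
var _ AccessRecorder = (*memoryRecorder)(nil)

func (m *memoryRecorder) Record(_ context.Context, ev AccessEvent) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.events = append(m.events, ev)
	return nil
}

// Flush has nothing to drain because Record stores events synchronously.
func (m *memoryRecorder) Flush() error { return nil }

// Snapshot returns a copy of the recorded events.
func (m *memoryRecorder) Snapshot() []AccessEvent {
	m.mu.Lock()
	defer m.mu.Unlock()
	return append([]AccessEvent(nil), m.events...)
}

// recordConcurrently records n events from n goroutines and waits for them.
func recordConcurrently(rec AccessRecorder, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = rec.Record(context.Background(), AccessEvent{
				SessionID: "s1",
				Path:      fmt.Sprintf("/file-%d", i),
			})
		}(i)
	}
	wg.Wait()
}

func main() {
	rec := &memoryRecorder{}
	recordConcurrently(rec, 8)
	fmt.Println(len(rec.Snapshot())) // 8
}
```

The append under a mutex is short enough that callers are not held up in practice, but it is not a buffered, asynchronous design like the one the interface comment recommends for production recorders.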
type EventFlusher
type EventFlusher struct {
	// contains filtered or unexported fields
}
EventFlusher writes access events to an S2 stream asynchronously. Record() is non-blocking; events are buffered and flushed by a background goroutine.
func NewEventFlusher
func NewEventFlusher(s2 *common.S2Client) *EventFlusher
NewEventFlusher creates and starts an S2-backed access recorder. Call Flush() on shutdown to drain buffered events.
func (*EventFlusher) Flush
func (f *EventFlusher) Flush() error
Flush signals the background goroutine to stop and waits for it to drain remaining events. Safe to call multiple times.
func (*EventFlusher) Record
func (f *EventFlusher) Record(_ context.Context, event AccessEvent) error
Record enqueues an event for async delivery. Non-blocking: if the buffer is full the event is dropped (logged as a warning).
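The behavior described for EventFlusher (a non-blocking Record that drops when the buffer is full, a background goroutine that delivers events, and a Flush that is safe to call repeatedly) matches a common Go pattern built from a buffered channel, a select with a default case, and sync.Once. The sketch below illustrates that pattern only; it is not the package's implementation. The flusher type, its sink callback (standing in for the S2 write), the Dropped counter, and the choice to return nil when an event is dropped are all assumptions.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// event is a minimal stand-in for AccessEvent.
type event struct{ Path string }

// flusher is a hypothetical buffered, asynchronous recorder.
type flusher struct {
	dropped int64 // first field so 64-bit atomic access is aligned
	ch      chan event
	stop    chan struct{}
	done    chan struct{}
	once    sync.Once
	sink    func(event) // stand-in for the S2 stream write
}

func newFlusher(size int, sink func(event)) *flusher {
	f := &flusher{
		ch:   make(chan event, size),
		stop: make(chan struct{}),
		done: make(chan struct{}),
		sink: sink,
	}
	go f.run()
	return f
}

// run delivers events until stop is closed, then drains the buffer.
func (f *flusher) run() {
	defer close(f.done)
	for {
		select {
		case ev := <-f.ch:
			f.sink(ev)
		case <-f.stop:
			for {
				select {
				case ev := <-f.ch:
					f.sink(ev)
				default:
					return
				}
			}
		}
	}
}

// Record never blocks: when the buffer is full the event is counted as
// dropped. Returning nil in that case is an assumption.
func (f *flusher) Record(ev event) error {
	select {
	case f.ch <- ev:
	default:
		atomic.AddInt64(&f.dropped, 1)
	}
	return nil
}

// Flush stops the goroutine and waits for the drain. sync.Once makes
// repeated calls safe.
func (f *flusher) Flush() error {
	f.once.Do(func() { close(f.stop) })
	<-f.done
	return nil
}

// Dropped reports how many events were discarded.
func (f *flusher) Dropped() int64 { return atomic.LoadInt64(&f.dropped) }

func main() {
	var delivered []event
	f := newFlusher(4, func(ev event) { delivered = append(delivered, ev) })
	for i := 0; i < 100; i++ {
		_ = f.Record(event{Path: fmt.Sprintf("/file-%d", i)})
	}
	_ = f.Flush()
	_ = f.Flush() // second call returns immediately
	// Every event is either delivered or counted as dropped.
	fmt.Println(int64(len(delivered))+f.Dropped() == 100) // true
}
```

The trade-off in this design is deliberate: under load, access logging loses events rather than adding latency to the read path. In the sketch, events recorded after Flush are buffered but never delivered, so Flush belongs at the very end of shutdown.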
type NoopRecorder
type NoopRecorder struct{}
NoopRecorder discards all events. Used when instrumentation is disabled or in tests.
func NewNoopRecorder
func NewNoopRecorder() *NoopRecorder
func (*NoopRecorder) Flush
func (n *NoopRecorder) Flush() error
func (*NoopRecorder) Record
func (n *NoopRecorder) Record(_ context.Context, _ AccessEvent) error