common

package
v0.1.147 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 7, 2026 License: AGPL-3.0, AGPL-3.0-or-later Imports: 44 Imported by: 0

Documentation

Constants

const (
	EventBusChannel = "airstore:events"
)
const (
	// FileExtension is the extension used for CLIP archive files
	FileExtension = "clip"
)

Variables

var (
	ErrNilMessage       = errors.New("redis: nil message")
	ErrChannelClosed    = errors.New("redis: channel closed")
	ErrConnectionIssue  = errors.New("redis: connection issue")
	ErrUnknownRedisMode = errors.New("redis: unknown mode")
)
var Keys = &redisKeys{}

Keys is the singleton accessor for all Redis key patterns.

var Streams = StreamNames{}

Streams provides access to stream names

Functions

func CopyStruct

func CopyStruct(src, dst any) error

func FormatLogs added in v0.1.18

func FormatLogs(logs []TaskLogEntry) string

FormatLogs converts log entries to plain text (one line per entry). Used by filesystem and CLI for consistent output formatting.

func GenerateID

func GenerateID(prefix string) string

GenerateID generates a unique ID with the given prefix. Format: prefix-timestamp-random
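
The prefix-timestamp-random layout can be illustrated with a minimal sketch. The name generateIDSketch, the use of Unix seconds, and the 8 hex characters of randomness are all assumptions made for this example; the package's actual timestamp unit and random length are not documented here.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"time"
)

// generateIDSketch builds an ID shaped like "prefix-timestamp-random".
// Assumptions: Unix seconds for the timestamp, 4 random bytes (8 hex chars).
func generateIDSketch(prefix string) string {
	buf := make([]byte, 4)
	if _, err := rand.Read(buf); err != nil {
		panic(err) // a failing system random source is not recoverable here
	}
	return fmt.Sprintf("%s-%d-%s", prefix, time.Now().Unix(), hex.EncodeToString(buf))
}

func main() {
	fmt.Println(generateIDSketch("task"))
}
```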

func GenerateRandomID

func GenerateRandomID(length int) string

GenerateRandomID generates a random ID of the specified length.

func GenerateSessionID

func GenerateSessionID() string

GenerateSessionID generates a unique session ID.

func GenerateWorkerID

func GenerateWorkerID() string

GenerateWorkerID generates a unique worker ID.

func GetConfigParser

func GetConfigParser(format ConfigFormat) (koanf.Parser, error)

func InsecureCredentials added in v0.1.10

func InsecureCredentials() credentials.TransportCredentials

InsecureCredentials returns plaintext credentials (no TLS). Use this when the --insecure flag is explicitly set.

func NeedsTLS added in v0.1.10

func NeedsTLS(addr string) bool

NeedsTLS determines if TLS should be used for the given address.

The logic follows grpcurl's convention:

  • Default to TLS for external/production addresses
  • Use plaintext only for explicitly local addresses

TLS is disabled (plaintext) for:

  • localhost, 127.0.0.1, ::1, *.local, *.localhost
  • Kubernetes internal addresses (*.svc, *.svc.cluster.local)

TLS is enabled for everything else (production default).
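
The rules above could be expressed roughly as follows. This is a sketch written from the documented behavior, not the package's source; needsTLSSketch is a hypothetical name, and details such as port stripping and case handling are assumptions.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// needsTLSSketch applies the documented rules: plaintext for explicitly
// local or cluster-internal hosts, TLS for everything else.
func needsTLSSketch(addr string) bool {
	host := addr
	// Strip the port when one is present ("host:port" or "[::1]:port").
	if h, _, err := net.SplitHostPort(addr); err == nil {
		host = h
	}
	host = strings.ToLower(strings.Trim(host, "[]"))

	switch host {
	case "localhost", "127.0.0.1", "::1":
		return false
	}
	for _, suffix := range []string{".local", ".localhost", ".svc", ".svc.cluster.local"} {
		if strings.HasSuffix(host, suffix) {
			return false
		}
	}
	return true
}

func main() {
	addrs := []string{"localhost:1993", "gateway.default.svc.cluster.local:1993", "api.example.com:443"}
	for _, addr := range addrs {
		fmt.Printf("%-42s tls=%v\n", addr, needsTLSSketch(addr))
	}
}
```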

func RedactSensitiveMap added in v0.1.71

func RedactSensitiveMap(payload map[string]any) map[string]any

RedactSensitiveMap clones and redacts a JSON-like map.

func RedactSensitiveMaps added in v0.1.71

func RedactSensitiveMaps(items []map[string]any) []map[string]any

RedactSensitiveMaps clones and redacts a list of JSON-like maps.

func RedactSensitiveString added in v0.1.71

func RedactSensitiveString(raw string) string

RedactSensitiveString masks likely secrets in plain text payloads.

func RedactSensitiveValue added in v0.1.71

func RedactSensitiveValue(value any) any

RedactSensitiveValue walks a nested value and masks secret-like content.
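
A clone-and-redact walk over a JSON-like value might look like the sketch below. The list of sensitive key fragments, the "[REDACTED]" placeholder, and the names redactValueSketch and looksSensitive are assumptions; the package's real matching rules are not described on this page.

```go
package main

import (
	"fmt"
	"strings"
)

// sensitiveHints is an assumed list of key fragments that mark a value as secret.
var sensitiveHints = []string{"password", "secret", "token", "api_key"}

func looksSensitive(key string) bool {
	k := strings.ToLower(key)
	for _, h := range sensitiveHints {
		if strings.Contains(k, h) {
			return true
		}
	}
	return false
}

// redactValueSketch returns a redacted deep copy of v and leaves the input untouched.
func redactValueSketch(v any) any {
	switch t := v.(type) {
	case map[string]any:
		out := make(map[string]any, len(t))
		for k, val := range t {
			if looksSensitive(k) {
				out[k] = "[REDACTED]"
			} else {
				out[k] = redactValueSketch(val)
			}
		}
		return out
	case []any:
		out := make([]any, len(t))
		for i, val := range t {
			out[i] = redactValueSketch(val)
		}
		return out
	default:
		return v // scalars are returned as-is
	}
}

func main() {
	payload := map[string]any{
		"user": "ada",
		"auth": map[string]any{"access_token": "abc123"},
	}
	fmt.Println(redactValueSketch(payload))
	fmt.Println(payload) // the original map is unchanged
}
```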

func TLSCredentials added in v0.1.10

func TLSCredentials() credentials.TransportCredentials

TLSCredentials returns TLS credentials with system CAs.

func ToSlice

func ToSlice(v interface{}) []interface{}

ToSlice flattens a struct using its field tags so it can be used by HSet. Struct fields without a redis tag are ignored.
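
Tag-driven flattening of this kind is typically done with reflection. The sketch below is an assumption about the approach, not the package's code; toSliceSketch and the worker type are hypothetical, and the output is the flat field/value list that HSet accepts.

```go
package main

import (
	"fmt"
	"reflect"
)

// toSliceSketch flattens a struct (or pointer to struct) into
// [name1, value1, name2, value2, ...] using the `redis` tag.
// Fields without the tag are skipped.
func toSliceSketch(v any) []any {
	val := reflect.Indirect(reflect.ValueOf(v))
	if val.Kind() != reflect.Struct {
		return nil
	}
	typ := val.Type()
	var out []any
	for i := 0; i < typ.NumField(); i++ {
		field := typ.Field(i)
		tag, ok := field.Tag.Lookup("redis")
		if !ok || tag == "-" || !field.IsExported() {
			continue
		}
		out = append(out, tag, val.Field(i).Interface())
	}
	return out
}

// worker is a hypothetical record used only for this example.
type worker struct {
	ID     string `redis:"id"`
	Status string `redis:"status"`
	Note   string // no redis tag, so it is ignored
}

func main() {
	fmt.Println(toSliceSketch(worker{ID: "w1", Status: "ready", Note: "skipped"}))
}
```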

func ToStruct

func ToStruct(m map[string]string, out interface{}) error

ToStruct copies the result of HGetAll into the provided struct. A field that cannot be parsed keeps Go's zero value. Struct fields without a redis tag are ignored.

func TransportCredentials added in v0.1.10

func TransportCredentials(addr string) credentials.TransportCredentials

TransportCredentials returns appropriate gRPC credentials for the address. Uses TLS by default, plaintext only for local/internal addresses.

func WithClientName

func WithClientName(name string) func(*redis.UniversalOptions)

Types

type Config added in v0.1.18

type Config struct {
	Bucket         string
	Region         string
	Endpoint       string
	AccessKey      string
	SecretKey      string
	ForcePathStyle bool
}

Config holds the configuration for the image registry.

type ConfigFormat

type ConfigFormat string
var (
	JSONConfigFormat ConfigFormat = ".json"
	YAMLConfigFormat ConfigFormat = ".yaml"
	YMLConfigFormat  ConfigFormat = ".yml"
)

type ConfigLoaderFunc

type ConfigLoaderFunc func(k *koanf.Koanf) error

ConfigLoaderFunc is a function type used to load configuration into a Koanf instance. It takes a Koanf pointer 'k' as a parameter and returns an error if the loading process encounters any issues.

type ConfigManager

type ConfigManager[T any] struct {
	// contains filtered or unexported fields
}

ConfigManager is a generic configuration manager that allows handling and manipulation of configuration data for various types. It includes a Koanf instance ('kf') for managing configuration settings.

func NewConfigManager

func NewConfigManager[T any]() (*ConfigManager[T], error)

NewConfigManager creates a new instance of the ConfigManager[T]. It initializes a Koanf instance and loads the default configuration. It then loads the configuration from the /etc/airstore.d/ directory and the user-specified configuration file from CONFIG_PATH. If the CONFIG_JSON environment variable is set, it loads the configuration from the JSON string. If debug mode is enabled, it prints the current configuration.
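
The load order described above implies that later sources override earlier ones. The following stdlib-only sketch illustrates that layering idea without using koanf; mergeInto, layerConfigs, and the configuration keys are hypothetical and chosen only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mergeInto copies src into dst, recursing into nested maps so that a later
// layer overrides individual keys rather than whole sections.
func mergeInto(dst, src map[string]any) {
	for k, v := range src {
		if srcMap, ok := v.(map[string]any); ok {
			dstMap, ok := dst[k].(map[string]any)
			if !ok {
				dstMap = map[string]any{}
				dst[k] = dstMap
			}
			mergeInto(dstMap, srcMap)
			continue
		}
		dst[k] = v
	}
}

// layerConfigs applies layers in order; the last layer has the highest precedence.
func layerConfigs(layers ...map[string]any) map[string]any {
	out := map[string]any{}
	for _, l := range layers {
		mergeInto(out, l)
	}
	return out
}

func main() {
	defaults := map[string]any{
		"debug": false,
		"redis": map[string]any{"addr": "localhost:6379", "mode": "single"},
	}
	// Stand-in for the content of the CONFIG_JSON environment variable.
	var fromEnv map[string]any
	if err := json.Unmarshal([]byte(`{"redis":{"addr":"redis.internal:6379"}}`), &fromEnv); err != nil {
		panic(err)
	}
	fmt.Println(layerConfigs(defaults, fromEnv))
}
```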

func (*ConfigManager[T]) GetConfig

func (cm *ConfigManager[T]) GetConfig() T

GetConfig retrieves the current configuration of type 'T' from the ConfigManager. It unmarshals the configuration data and returns it. If any errors occur during unmarshaling, it logs a fatal error and exits the application.

func (*ConfigManager[T]) LoadConfig

func (cm *ConfigManager[T]) LoadConfig(format ConfigFormat, provider koanf.Provider) error

LoadConfig loads configuration data from a given provider in the specified format into the ConfigManager. It obtains a parser for the format, and then loads the configuration data. If any errors occur during the loading process, they are returned as an error.

func (*ConfigManager[T]) Print

func (cm *ConfigManager[T]) Print() string

Print returns a string representation of the current configuration state.

type ContainerOverlay

type ContainerOverlay struct {
	// contains filtered or unexported fields
}

ContainerOverlay manages overlay filesystem layers for containers. It creates a writable layer on top of a read-only base (e.g., CLIP FUSE mount).

func NewContainerOverlay

func NewContainerOverlay(sandboxID, rootPath, overlayPath string) *ContainerOverlay

NewContainerOverlay creates a new ContainerOverlay.

  • rootPath: the base rootfs (e.g., CLIP FUSE mount)
  • overlayPath: where to store overlay layer data

func (*ContainerOverlay) AddEmptyLayer

func (co *ContainerOverlay) AddEmptyLayer() error

AddEmptyLayer adds an empty writable layer on top of the current stack

func (*ContainerOverlay) AddLayer

func (co *ContainerOverlay) AddLayer(upperDir string) error

AddLayer adds a pre-populated upper layer on top of the current stack

func (*ContainerOverlay) Cleanup

func (co *ContainerOverlay) Cleanup() error

Cleanup unmounts all layers and removes overlay directories

func (*ContainerOverlay) Setup

func (co *ContainerOverlay) Setup() error

Setup creates the initial overlay layer on top of the base rootfs
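
For orientation, a Linux overlay mount combines a read-only lowerdir with a writable upperdir and a scratch workdir. The sketch below only builds the option string for such a mount; the layer-N/upper, work, and merged directory layout and the names layerDirs and newLayerDirs are assumptions, not the package's actual layout.

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// layerDirs holds the directories of one overlay layer.
// The layer-N/{upper,work,merged} layout is an assumption made for this sketch.
type layerDirs struct {
	lower  []string // read-only directories, topmost first
	upper  string   // writable directory that receives changes
	work   string   // scratch directory required by overlayfs
	merged string   // mount point that shows the combined view
}

func newLayerDirs(overlayPath string, index int, lower ...string) layerDirs {
	base := path.Join(overlayPath, fmt.Sprintf("layer-%d", index))
	return layerDirs{
		lower:  lower,
		upper:  path.Join(base, "upper"),
		work:   path.Join(base, "work"),
		merged: path.Join(base, "merged"),
	}
}

// mountOptions renders the option string for an overlay mount, as in
// mount -t overlay overlay -o <options> <merged>.
func (l layerDirs) mountOptions() string {
	return fmt.Sprintf("lowerdir=%s,upperdir=%s,workdir=%s",
		strings.Join(l.lower, ":"), l.upper, l.work)
}

func main() {
	l := newLayerDirs("/var/lib/overlay/sandbox-1", 0, "/mnt/clip/rootfs")
	fmt.Println("options:", l.mountOptions())
	fmt.Println("rootfs: ", l.merged)
}
```

In this sketch the merged directory plays the role that TopLayerPath describes, and the upper directory the role of UpperLayerPath.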

func (*ContainerOverlay) TopLayerPath

func (co *ContainerOverlay) TopLayerPath() string

TopLayerPath returns the merged path of the top overlay layer. This is the path that should be used as the container rootfs.

func (*ContainerOverlay) UpperLayerPath

func (co *ContainerOverlay) UpperLayerPath() string

UpperLayerPath returns the upper (writable) path of the top layer

type Event added in v0.1.14

type Event struct {
	Type EventType      `json:"type"`
	Data map[string]any `json:"data,omitempty"`
}

type EventBus added in v0.1.14

type EventBus struct {
	// contains filtered or unexported fields
}

func NewEventBus added in v0.1.14

func NewEventBus(ctx context.Context, rdb *RedisClient) *EventBus

func (*EventBus) Emit added in v0.1.14

func (eb *EventBus) Emit(e Event)

func (*EventBus) On added in v0.1.14

func (eb *EventBus) On(t EventType, fn func(Event))

func (*EventBus) Start added in v0.1.14

func (eb *EventBus) Start()

type EventEmitter added in v0.1.28

type EventEmitter interface {
	Emit(ctx context.Context, data map[string]any) error
}

EventEmitter is the interface for emitting hook events. Implemented by EventStream (Redis) and LocalEventEmitter (in-process).

type EventStream added in v0.1.28

type EventStream struct {
	// contains filtered or unexported fields
}

EventStream provides reliable, exactly-once event delivery using Redis Streams. Unlike EventBus (pub/sub, fire-and-forget to all replicas), EventStream uses consumer groups so each event is processed by exactly one consumer.

Includes a reclaim loop that rescues events stuck in pending state (e.g., from a crashed consumer) using XPENDING + XCLAIM.

func NewEventStream added in v0.1.28

func NewEventStream(rdb *RedisClient, stream, group, consumer string) *EventStream

NewEventStream creates a stream producer/consumer.

  • stream: the Redis Stream key (e.g., common.Keys.HookStream())
  • group: consumer group name (same across all replicas)
  • consumer: unique per replica (e.g., hostname)

func (*EventStream) Consume added in v0.1.28

func (s *EventStream) Consume(ctx context.Context, handler func(id string, data map[string]any))

Consume reads events in a loop. Each event is delivered to exactly one consumer in the group. Blocks when idle. Acknowledges after handler returns without error. Also runs a periodic reclaim loop to rescue events stuck in pending state. Run this in a goroutine per gateway replica.

func (*EventStream) Emit added in v0.1.28

func (s *EventStream) Emit(ctx context.Context, data map[string]any) error

Emit appends an event to the stream. O(1). Non-blocking. Called by StorageService / SourceService on the hot path.

type EventType added in v0.1.14

type EventType string
const (
	EventCacheInvalidate EventType = "cache.invalidate"
)

type ImageRegistry added in v0.1.18

type ImageRegistry struct {
	// contains filtered or unexported fields
}

ImageRegistry manages CLIP archive storage in S3.

func NewImageRegistry added in v0.1.18

func NewImageRegistry(cfg Config) (*ImageRegistry, error)

NewImageRegistry creates a new image registry with S3 storage.

func (*ImageRegistry) Exists added in v0.1.18

func (r *ImageRegistry) Exists(ctx context.Context, imageID string) (bool, error)

Exists checks if a CLIP archive exists in the registry.

func (*ImageRegistry) Pull added in v0.1.18

func (r *ImageRegistry) Pull(ctx context.Context, localPath, imageID string) error

Pull downloads a CLIP archive from the registry.

func (*ImageRegistry) Push added in v0.1.18

func (r *ImageRegistry) Push(ctx context.Context, localPath, imageID string) error

Push uploads a CLIP archive to the registry.

type LocalEventEmitter added in v0.1.28

type LocalEventEmitter struct {
	// contains filtered or unexported fields
}

LocalEventEmitter calls the handler directly in-process. No Redis required. Used in local mode where there's only one gateway instance.

func NewLocalEventEmitter added in v0.1.28

func NewLocalEventEmitter() *LocalEventEmitter

func (*LocalEventEmitter) Emit added in v0.1.28

func (e *LocalEventEmitter) Emit(_ context.Context, data map[string]any) error

func (*LocalEventEmitter) SetHandler added in v0.1.28

func (e *LocalEventEmitter) SetHandler(handler func(id string, data map[string]any))

SetHandler sets the function called on each Emit. Must be called before Emit.
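
An in-process emitter that satisfies EventEmitter can be sketched as below. The type localEmitterSketch, the "local-N" ID format, and returning an error when no handler is set are assumptions for illustration; the real LocalEventEmitter may behave differently in each of these respects.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// EventEmitter mirrors the interface documented above.
type EventEmitter interface {
	Emit(ctx context.Context, data map[string]any) error
}

// localEmitterSketch calls its handler synchronously inside Emit.
type localEmitterSketch struct {
	mu      sync.Mutex
	handler func(id string, data map[string]any)
	nextID  int
}

var _ EventEmitter = (*localEmitterSketch)(nil) // compile-time interface check

// SetHandler registers the function invoked on each Emit.
func (e *localEmitterSketch) SetHandler(h func(id string, data map[string]any)) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.handler = h
}

// Emit delivers data to the handler. Assumption: a missing handler is an error.
func (e *localEmitterSketch) Emit(_ context.Context, data map[string]any) error {
	e.mu.Lock()
	h := e.handler
	e.nextID++
	id := fmt.Sprintf("local-%d", e.nextID)
	e.mu.Unlock()

	if h == nil {
		return errors.New("local emitter: no handler set")
	}
	h(id, data) // runs outside the lock so the handler may call Emit again
	return nil
}

func main() {
	e := &localEmitterSketch{}
	e.SetHandler(func(id string, data map[string]any) {
		fmt.Println(id, data["path"])
	})
	if err := e.Emit(context.Background(), map[string]any{"path": "/skills/demo"}); err != nil {
		fmt.Println("emit failed:", err)
	}
}
```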

type MongoClient added in v0.1.103

type MongoClient struct {
	// contains filtered or unexported fields
}

func NewMongoClient added in v0.1.103

func NewMongoClient(cfg types.MongoConfig) (*MongoClient, error)

func (*MongoClient) Close added in v0.1.103

func (m *MongoClient) Close(ctx context.Context) error

func (*MongoClient) Collection added in v0.1.103

func (m *MongoClient) Collection(name string) *mongo.Collection

type ObjectStore added in v0.1.18

type ObjectStore interface {
	Put(ctx context.Context, localPath, key string) error
	Get(ctx context.Context, key, localPath string) error
	Exists(ctx context.Context, key string) (bool, error)
}

ObjectStore defines the interface for object storage backends.
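
To show what a backend has to provide, here is a sketch of a directory-backed implementation that could stand in for S3 in local tests. The dirStore type and the demoRoundTrip helper are hypothetical and not part of the package; keys are assumed to be trusted, slash-separated relative paths.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"os"
	"path/filepath"
)

// ObjectStore mirrors the interface documented above.
type ObjectStore interface {
	Put(ctx context.Context, localPath, key string) error
	Get(ctx context.Context, key, localPath string) error
	Exists(ctx context.Context, key string) (bool, error)
}

// dirStore stores each object as a file under root.
type dirStore struct{ root string }

var _ ObjectStore = dirStore{} // compile-time interface check

func (s dirStore) path(key string) string {
	return filepath.Join(s.root, filepath.FromSlash(key))
}

// copyFile copies src to dst, creating dst's parent directories as needed.
func copyFile(src, dst string) error {
	in, err := os.Open(src)
	if err != nil {
		return err
	}
	defer in.Close()
	if err := os.MkdirAll(filepath.Dir(dst), 0o755); err != nil {
		return err
	}
	out, err := os.Create(dst)
	if err != nil {
		return err
	}
	if _, err := io.Copy(out, in); err != nil {
		out.Close()
		return err
	}
	return out.Close()
}

func (s dirStore) Put(_ context.Context, localPath, key string) error {
	return copyFile(localPath, s.path(key))
}

func (s dirStore) Get(_ context.Context, key, localPath string) error {
	return copyFile(s.path(key), localPath)
}

func (s dirStore) Exists(_ context.Context, key string) (bool, error) {
	_, err := os.Stat(s.path(key))
	if errors.Is(err, fs.ErrNotExist) {
		return false, nil
	}
	return err == nil, err
}

// demoRoundTrip uploads a temp file, checks that it exists, downloads it
// again, and returns the downloaded content.
func demoRoundTrip(payload string) (string, error) {
	dir, err := os.MkdirTemp("", "objectstore-sketch")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	ctx := context.Background()
	store := dirStore{root: filepath.Join(dir, "bucket")}
	src, dst := filepath.Join(dir, "src.clip"), filepath.Join(dir, "dst.clip")
	if err := os.WriteFile(src, []byte(payload), 0o644); err != nil {
		return "", err
	}
	if err := store.Put(ctx, src, "images/demo.clip"); err != nil {
		return "", err
	}
	if ok, err := store.Exists(ctx, "images/demo.clip"); err != nil || !ok {
		return "", fmt.Errorf("object missing after Put: %v", err)
	}
	if err := store.Get(ctx, "images/demo.clip", dst); err != nil {
		return "", err
	}
	data, err := os.ReadFile(dst)
	return string(data), err
}

func main() {
	fmt.Println(demoRoundTrip("hello"))
}
```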

type OverlayLayer

type OverlayLayer struct {
	// contains filtered or unexported fields
}

OverlayLayer represents a single overlay layer

type ParserFunc

type ParserFunc func() (koanf.Parser, error)

type ReadRecord added in v0.1.18

type ReadRecord struct {
	SeqNum    int64  `json:"seq_num"`
	Timestamp int64  `json:"timestamp"`
	Body      string `json:"body"` // S2 returns body as a JSON-encoded string
}

ReadRecord represents a record read from S2

type RedisClient

type RedisClient struct {
	redis.UniversalClient
}

func NewRedisClient

func NewRedisClient(config types.RedisConfig, options ...func(*redis.UniversalOptions)) (*RedisClient, error)

func (*RedisClient) Keys

func (r *RedisClient) Keys(ctx context.Context, pattern string) ([]string, error)

Keys returns all keys matching a pattern. It runs SCAN rather than KEYS, since the KEYS command blocks the database.

func (*RedisClient) Publish

func (r *RedisClient) Publish(ctx context.Context, channel string, message interface{}) *redis.IntCmd

func (*RedisClient) Scan

func (r *RedisClient) Scan(ctx context.Context, pattern string) ([]string, error)

func (*RedisClient) Subscribe

func (r *RedisClient) Subscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error)

func (*RedisClient) ToSlice

func (r *RedisClient) ToSlice(v interface{}) []interface{}

func (*RedisClient) ToStruct

func (r *RedisClient) ToStruct(m map[string]string, out interface{}) error

type RedisLock

type RedisLock struct {
	// contains filtered or unexported fields
}

func NewRedisLock

func NewRedisLock(client *RedisClient, opts ...RedisLockOption) *RedisLock

func (*RedisLock) Acquire

func (l *RedisLock) Acquire(ctx context.Context, key string, opts RedisLockOptions) error

func (*RedisLock) Release

func (l *RedisLock) Release(key string) error

type RedisLockOption

type RedisLockOption func(*RedisLock)

type RedisLockOptions

type RedisLockOptions struct {
	TtlS    int
	Retries int
}

type RunEventEntry added in v0.1.60

type RunEventEntry struct {
	RunID     string         `json:"run_id"`
	EventType string         `json:"event_type"`
	Timestamp int64          `json:"timestamp"`
	Payload   map[string]any `json:"payload,omitempty"`
}

RunEventEntry represents a lifecycle event emitted by the orchestration engine.

type S2Client added in v0.1.18

type S2Client struct {
	// contains filtered or unexported fields
}

S2Client provides access to S2 streams for append-only log storage

func NewS2Client added in v0.1.18

func NewS2Client(config S2Config) *S2Client

NewS2Client creates a new S2 stream client

func (*S2Client) Append added in v0.1.18

func (c *S2Client) Append(ctx context.Context, stream string, data interface{}) error

Append appends a record to a stream

func (*S2Client) AppendLog added in v0.1.18

func (c *S2Client) AppendLog(ctx context.Context, taskID, stream, data string) error

AppendLog is a convenience method for appending a log entry

func (*S2Client) AppendRunEvent added in v0.1.60

func (c *S2Client) AppendRunEvent(ctx context.Context, runID, eventType string, payload map[string]any) error

AppendRunEvent appends a run event entry to the run event stream.

func (*S2Client) AppendStatus added in v0.1.18

func (c *S2Client) AppendStatus(ctx context.Context, taskID, status string, exitCode *int, errorMsg string) error

AppendStatus is a convenience method for appending a status entry

func (*S2Client) Enabled added in v0.1.18

func (c *S2Client) Enabled() bool

Enabled returns true if the S2 client is configured

func (*S2Client) ListStreams added in v0.1.48

func (c *S2Client) ListStreams(ctx context.Context, prefix string) ([]StreamInfo, error)

ListStreams lists streams whose names begin with the given prefix.

func (*S2Client) Read added in v0.1.18

func (c *S2Client) Read(ctx context.Context, stream string, seqNum int64, count int) ([]ReadRecord, error)

Read reads records from a stream

func (*S2Client) ReadLogs added in v0.1.18

func (c *S2Client) ReadLogs(ctx context.Context, taskID string, seqNum int64) ([]TaskLogEntry, int64, error)

ReadLogs reads log entries for a task. Returns the logs, the next sequence number for pagination, and any error. Pass the returned sequence number as seqNum on subsequent calls to fetch logs beyond the first page.
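
The pagination contract can be exercised with a loop like the one sketched below. It uses a fake reader with the same return shape as ReadLogs rather than a real S2Client; logEntry, readPage, readAll, and fakeReader are hypothetical names, and treating an empty page as "caught up" is an assumption.

```go
package main

import "fmt"

// logEntry stands in for TaskLogEntry; only the field used here is included.
type logEntry struct{ Data string }

// readPage has the same return shape as ReadLogs: entries, next sequence number, error.
type readPage func(seqNum int64) ([]logEntry, int64, error)

// readAll keeps calling read with the sequence number returned by the previous call.
// Assumption: an empty page means the reader has caught up with the stream.
func readAll(read readPage) ([]logEntry, error) {
	var all []logEntry
	var seq int64
	for {
		page, next, err := read(seq)
		if err != nil {
			return all, err
		}
		if len(page) == 0 {
			return all, nil
		}
		all = append(all, page...)
		if next <= seq {
			return all, nil // guard against a reader that does not advance
		}
		seq = next
	}
}

// fakeReader serves a fixed slice in pages of pageSize, for demonstration only.
func fakeReader(entries []logEntry, pageSize int) readPage {
	return func(seqNum int64) ([]logEntry, int64, error) {
		start := int(seqNum)
		if start >= len(entries) {
			return nil, seqNum, nil
		}
		end := start + pageSize
		if end > len(entries) {
			end = len(entries)
		}
		return entries[start:end], int64(end), nil
	}
}

func main() {
	entries := []logEntry{{"a"}, {"b"}, {"c"}, {"d"}, {"e"}}
	all, err := readAll(fakeReader(entries, 2))
	fmt.Println(len(all), err)
}
```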

type S2Config added in v0.1.18

type S2Config struct {
	// Token is the S2 API token
	Token string

	// Basin is the S2 basin name (e.g., "airstore")
	Basin string

	// Timeout for HTTP requests
	Timeout time.Duration
}

S2Config configures the S2 stream client

type S3Store added in v0.1.18

type S3Store struct {
	// contains filtered or unexported fields
}

S3Store implements ObjectStore using AWS S3.

func NewS3Store added in v0.1.18

func NewS3Store(cfg Config) (*S3Store, error)

NewS3Store creates a new S3-backed object store.

func (*S3Store) Exists added in v0.1.18

func (s *S3Store) Exists(ctx context.Context, key string) (bool, error)

Exists checks if an object exists in S3.

func (*S3Store) Get added in v0.1.18

func (s *S3Store) Get(ctx context.Context, key, localPath string) error

Get downloads a file from S3.

func (*S3Store) Put added in v0.1.18

func (s *S3Store) Put(ctx context.Context, localPath, key string) error

Put uploads a file to S3.

type StreamInfo added in v0.1.48

type StreamInfo struct {
	Name string `json:"name"`
}

StreamInfo represents a stream returned by the S2 list streams API.

type StreamNames added in v0.1.18

type StreamNames struct{}

StreamNames provides consistent stream naming

func (StreamNames) ChannelConversation added in v0.1.96

func (StreamNames) ChannelConversation(agentID, senderHash string) string

ChannelConversation returns the stream name for a channel conversation between an agent and a sender.

func (StreamNames) RunEvents added in v0.1.60

func (StreamNames) RunEvents(runID string) string

RunEvents returns the stream name for orchestration run events.

func (StreamNames) SkillDraft added in v0.1.97

func (StreamNames) SkillDraft(draftID string) string

SkillDraft returns the stream name for a skill draft conversation.

func (StreamNames) SkillDraftIndex added in v0.1.97

func (StreamNames) SkillDraftIndex(workspaceID string) string

SkillDraftIndex returns the stream name for a workspace's skill draft index.

func (StreamNames) TaskLogs added in v0.1.18

func (StreamNames) TaskLogs(taskID string) string

TaskLogs returns the stream name for a task's logs

func (StreamNames) TaskStatus added in v0.1.18

func (StreamNames) TaskStatus(taskID string) string

TaskStatus returns the stream name for a task's status events

func (StreamNames) ViewContext added in v0.1.131

func (StreamNames) ViewContext(viewID string) string

ViewContext returns the stream name for a view's persistent context.

func (StreamNames) ViewDraft added in v0.1.103

func (StreamNames) ViewDraft(draftID string) string

ViewDraft returns the stream name for a view draft conversation.

func (StreamNames) ViewDraftIndex added in v0.1.103

func (StreamNames) ViewDraftIndex(workspaceID string) string

ViewDraftIndex returns the stream name for a workspace's view draft index.

type TaskLogEntry added in v0.1.18

type TaskLogEntry struct {
	TaskID    string         `json:"task_id"`
	Timestamp int64          `json:"timestamp"`
	SeqNum    int64          `json:"seq_num,omitempty"`
	EventID   string         `json:"event_id,omitempty"`
	Stream    string         `json:"stream"` // "stdout" or "stderr"
	Data      string         `json:"data"`
	ChunkType string         `json:"chunk_type,omitempty"`
	Metadata  map[string]any `json:"metadata,omitempty"`
}

TaskLogEntry represents a log entry for a task

func RedactTaskLogEntries added in v0.1.71

func RedactTaskLogEntries(entries []TaskLogEntry) []TaskLogEntry

RedactTaskLogEntries clones and redacts a task log slice.

func RedactTaskLogEntry added in v0.1.71

func RedactTaskLogEntry(entry TaskLogEntry) TaskLogEntry

RedactTaskLogEntry clones a log entry with secret-like content redacted.

type TaskStatusEntry added in v0.1.18

type TaskStatusEntry struct {
	TaskID    string `json:"task_id"`
	Timestamp int64  `json:"timestamp"`
	Status    string `json:"status"`
	ExitCode  *int   `json:"exit_code,omitempty"`
	Error     string `json:"error,omitempty"`
}

TaskStatusEntry represents a status change for a task
