Documentation
¶
Index ¶
- Constants
- Variables
- func CopyStruct(src, dst any) error
- func FormatLogs(logs []TaskLogEntry) string
- func GenerateID(prefix string) string
- func GenerateRandomID(length int) string
- func GenerateSessionID() string
- func GenerateWorkerID() string
- func GetConfigParser(format ConfigFormat) (koanf.Parser, error)
- func InsecureCredentials() credentials.TransportCredentials
- func NeedsTLS(addr string) bool
- func RedactSensitiveMap(payload map[string]any) map[string]any
- func RedactSensitiveMaps(items []map[string]any) []map[string]any
- func RedactSensitiveString(raw string) string
- func RedactSensitiveValue(value any) any
- func TLSCredentials() credentials.TransportCredentials
- func ToSlice(v interface{}) []interface{}
- func ToStruct(m map[string]string, out interface{}) error
- func TransportCredentials(addr string) credentials.TransportCredentials
- func WithClientName(name string) func(*redis.UniversalOptions)
- type Config
- type ConfigFormat
- type ConfigLoaderFunc
- type ConfigManager
- type ContainerOverlay
- type Event
- type EventBus
- type EventEmitter
- type EventStream
- type EventType
- type ImageRegistry
- type LocalEventEmitter
- type MongoClient
- type ObjectStore
- type OverlayLayer
- type ParserFunc
- type ReadRecord
- type RedisClient
- func (r *RedisClient) Keys(ctx context.Context, pattern string) ([]string, error)
- func (r *RedisClient) Publish(ctx context.Context, channel string, message interface{}) *redis.IntCmd
- func (r *RedisClient) Scan(ctx context.Context, pattern string) ([]string, error)
- func (r *RedisClient) Subscribe(ctx context.Context, channels ...string) (<-chan *redis.Message, <-chan error)
- func (r *RedisClient) ToSlice(v interface{}) []interface{}
- func (r *RedisClient) ToStruct(m map[string]string, out interface{}) error
- type RedisLock
- type RedisLockOption
- type RedisLockOptions
- type RunEventEntry
- type S2Client
- func (c *S2Client) Append(ctx context.Context, stream string, data interface{}) error
- func (c *S2Client) AppendLog(ctx context.Context, taskID, stream, data string) error
- func (c *S2Client) AppendRunEvent(ctx context.Context, runID, eventType string, payload map[string]any) error
- func (c *S2Client) AppendStatus(ctx context.Context, taskID, status string, exitCode *int, errorMsg string) error
- func (c *S2Client) Enabled() bool
- func (c *S2Client) ListStreams(ctx context.Context, prefix string) ([]StreamInfo, error)
- func (c *S2Client) Read(ctx context.Context, stream string, seqNum int64, count int) ([]ReadRecord, error)
- func (c *S2Client) ReadLogs(ctx context.Context, taskID string, seqNum int64) ([]TaskLogEntry, int64, error)
- type S2Config
- type S3Store
- type StreamInfo
- type StreamNames
- func (StreamNames) ChannelConversation(agentID, senderHash string) string
- func (StreamNames) RunEvents(runID string) string
- func (StreamNames) SkillDraft(draftID string) string
- func (StreamNames) SkillDraftIndex(workspaceID string) string
- func (StreamNames) TaskLogs(taskID string) string
- func (StreamNames) TaskStatus(taskID string) string
- func (StreamNames) ViewContext(viewID string) string
- func (StreamNames) ViewDraft(draftID string) string
- func (StreamNames) ViewDraftIndex(workspaceID string) string
- type TaskLogEntry
- type TaskStatusEntry
Constants ¶
const (
EventBusChannel = "airstore:events"
)
const (
// FileExtension is the extension used for CLIP archive files
FileExtension = "clip"
)
Variables ¶
var (
ErrNilMessage = errors.New("redis: nil message")
ErrChannelClosed = errors.New("redis: channel closed")
ErrConnectionIssue = errors.New("redis: connection issue")
ErrUnknownRedisMode = errors.New("redis: unknown mode")
)
var Keys = &redisKeys{}
Keys is the singleton accessor for all Redis key patterns.
var Streams = StreamNames{}
Streams provides access to stream names
Functions ¶
func CopyStruct ¶
func FormatLogs ¶ added in v0.1.18
func FormatLogs(logs []TaskLogEntry) string
FormatLogs converts log entries to plain text (one line per entry). Used by filesystem and CLI for consistent output formatting.
func GenerateID ¶
GenerateID generates a unique ID with the given prefix. Format: prefix-timestamp-random
func GenerateRandomID ¶
GenerateRandomID generates a random ID of the specified length.
func GenerateSessionID ¶
func GenerateSessionID() string
GenerateSessionID generates a unique session ID.
func GenerateWorkerID ¶
func GenerateWorkerID() string
GenerateWorkerID generates a unique worker ID.
func GetConfigParser ¶
func GetConfigParser(format ConfigFormat) (koanf.Parser, error)
func InsecureCredentials ¶ added in v0.1.10
func InsecureCredentials() credentials.TransportCredentials
InsecureCredentials returns plaintext credentials (no TLS). Use this when the --insecure flag is explicitly set.
func NeedsTLS ¶ added in v0.1.10
NeedsTLS determines if TLS should be used for the given address.
The logic follows grpcurl's convention:
- Default to TLS for external/production addresses
- Use plaintext only for explicitly local addresses
TLS is disabled (plaintext) for:
- localhost, 127.0.0.1, ::1, *.local, *.localhost
- Kubernetes internal addresses (*.svc, *.svc.cluster.local)
TLS is enabled for everything else (production default).
func RedactSensitiveMap ¶ added in v0.1.71
RedactSensitiveMap clones and redacts a JSON-like map.
func RedactSensitiveMaps ¶ added in v0.1.71
RedactSensitiveMaps clones and redacts a list of JSON-like maps.
func RedactSensitiveString ¶ added in v0.1.71
RedactSensitiveString masks likely secrets in plain text payloads.
func RedactSensitiveValue ¶ added in v0.1.71
RedactSensitiveValue walks a nested value and masks secret-like content.
func TLSCredentials ¶ added in v0.1.10
func TLSCredentials() credentials.TransportCredentials
TLSCredentials returns TLS credentials with system CAs.
func ToSlice ¶
func ToSlice(v interface{}) []interface{}
ToSlice flattens a struct using its field tags so it can be used by HSet. Struct fields must have the redis tag on them; otherwise, they will be ignored.
func ToStruct ¶
ToStruct copies the result of HGetAll to a provided struct. If a field cannot be parsed, we use Go's default value. Struct fields must have the redis tag on them; otherwise, they will be ignored.
func TransportCredentials ¶ added in v0.1.10
func TransportCredentials(addr string) credentials.TransportCredentials
TransportCredentials returns appropriate gRPC credentials for the address. Uses TLS by default, plaintext only for local/internal addresses.
func WithClientName ¶
func WithClientName(name string) func(*redis.UniversalOptions)
Types ¶
type Config ¶ added in v0.1.18
type Config struct {
Bucket string
Region string
Endpoint string
AccessKey string
SecretKey string
ForcePathStyle bool
}
Config holds the configuration for the image registry.
type ConfigFormat ¶
type ConfigFormat string
var (
JSONConfigFormat ConfigFormat = ".json"
YAMLConfigFormat ConfigFormat = ".yaml"
YMLConfigFormat ConfigFormat = ".yml"
)
type ConfigLoaderFunc ¶
ConfigLoaderFunc is a function type used to load configuration into a Koanf instance. It takes a Koanf pointer 'k' as a parameter and returns an error if the loading process encounters any issues.
type ConfigManager ¶
type ConfigManager[T any] struct {
// contains filtered or unexported fields
}
ConfigManager is a generic configuration manager that allows handling and manipulation of configuration data for various types. It includes a Koanf instance ('kf') for managing configuration settings.
func NewConfigManager ¶
func NewConfigManager[T any]() (*ConfigManager[T], error)
NewConfigManager creates a new instance of the ConfigManager[T]. It initializes a Koanf instance and loads the default configuration. It then loads the configuration from the /etc/airstore.d/ directory and the user-specified configuration file from CONFIG_PATH. If the CONFIG_JSON environment variable is set, it loads the configuration from the JSON string. If debug mode is enabled, it prints the current configuration.
func (*ConfigManager[T]) GetConfig ¶
func (cm *ConfigManager[T]) GetConfig() T
GetConfig retrieves the current configuration of type 'T' from the ConfigManager. It unmarshals the configuration data and returns it. If any errors occur during unmarshaling, it logs a fatal error and exits the application.
func (*ConfigManager[T]) LoadConfig ¶
func (cm *ConfigManager[T]) LoadConfig(format ConfigFormat, provider koanf.Provider) error
LoadConfig loads configuration data from a given provider in the specified format into the ConfigManager. It obtains a parser for the format, and then loads the configuration data. If any errors occur during the loading process, they are returned as an error.
func (*ConfigManager[T]) Print ¶
func (cm *ConfigManager[T]) Print() string
Print returns a string representation of the current configuration state.
type ContainerOverlay ¶
type ContainerOverlay struct {
// contains filtered or unexported fields
}
ContainerOverlay manages overlay filesystem layers for containers. This creates a writable layer on top of a read-only base (e.g., CLIP FUSE mount).
func NewContainerOverlay ¶
func NewContainerOverlay(sandboxID, rootPath, overlayPath string) *ContainerOverlay
NewContainerOverlay creates a new ContainerOverlay. rootPath: the base rootfs (e.g., CLIP FUSE mount); overlayPath: where to store overlay layer data.
func (*ContainerOverlay) AddEmptyLayer ¶
func (co *ContainerOverlay) AddEmptyLayer() error
AddEmptyLayer adds an empty writable layer on top of the current stack
func (*ContainerOverlay) AddLayer ¶
func (co *ContainerOverlay) AddLayer(upperDir string) error
AddLayer adds a pre-populated upper layer on top of the current stack
func (*ContainerOverlay) Cleanup ¶
func (co *ContainerOverlay) Cleanup() error
Cleanup unmounts all layers and removes overlay directories
func (*ContainerOverlay) Setup ¶
func (co *ContainerOverlay) Setup() error
Setup creates the initial overlay layer on top of the base rootfs
func (*ContainerOverlay) TopLayerPath ¶
func (co *ContainerOverlay) TopLayerPath() string
TopLayerPath returns the merged path of the top overlay layer. This is the path that should be used as the container rootfs.
func (*ContainerOverlay) UpperLayerPath ¶
func (co *ContainerOverlay) UpperLayerPath() string
UpperLayerPath returns the upper (writable) path of the top layer
type EventBus ¶ added in v0.1.14
type EventBus struct {
// contains filtered or unexported fields
}
func NewEventBus ¶ added in v0.1.14
func NewEventBus(ctx context.Context, rdb *RedisClient) *EventBus
type EventEmitter ¶ added in v0.1.28
EventEmitter is the interface for emitting hook events. Implemented by EventStream (Redis) and LocalEventEmitter (in-process).
type EventStream ¶ added in v0.1.28
type EventStream struct {
// contains filtered or unexported fields
}
EventStream provides reliable, exactly-once event delivery using Redis Streams. Unlike EventBus (pub/sub, fire-and-forget to all replicas), EventStream uses consumer groups so each event is processed by exactly one consumer.
Includes a reclaim loop that rescues events stuck in pending state (e.g., from a crashed consumer) using XPENDING + XCLAIM.
func NewEventStream ¶ added in v0.1.28
func NewEventStream(rdb *RedisClient, stream, group, consumer string) *EventStream
NewEventStream creates a stream producer/consumer. stream: the Redis Stream key (e.g., common.Keys.HookStream()); group: consumer group name (same across all replicas); consumer: unique per replica (e.g., hostname).
func (*EventStream) Consume ¶ added in v0.1.28
Consume reads events in a loop. Each event is delivered to exactly one consumer in the group. Blocks when idle. Acknowledges after handler returns without error. Also runs a periodic reclaim loop to rescue events stuck in pending state. Run this in a goroutine per gateway replica.
type EventType ¶ added in v0.1.14
type EventType string
const (
EventCacheInvalidate EventType = "cache.invalidate"
)
type ImageRegistry ¶ added in v0.1.18
type ImageRegistry struct {
// contains filtered or unexported fields
}
ImageRegistry manages CLIP archive storage in S3.
func NewImageRegistry ¶ added in v0.1.18
func NewImageRegistry(cfg Config) (*ImageRegistry, error)
NewImageRegistry creates a new image registry with S3 storage.
func (*ImageRegistry) Exists ¶ added in v0.1.18
Exists checks if a CLIP archive exists in the registry.
type LocalEventEmitter ¶ added in v0.1.28
type LocalEventEmitter struct {
// contains filtered or unexported fields
}
LocalEventEmitter calls the handler directly in-process. No Redis required. Used in local mode where there's only one gateway instance.
func NewLocalEventEmitter ¶ added in v0.1.28
func NewLocalEventEmitter() *LocalEventEmitter
func (*LocalEventEmitter) SetHandler ¶ added in v0.1.28
func (e *LocalEventEmitter) SetHandler(handler func(id string, data map[string]any))
SetHandler sets the function called on each Emit. Must be called before Emit.
type MongoClient ¶ added in v0.1.103
type MongoClient struct {
// contains filtered or unexported fields
}
func NewMongoClient ¶ added in v0.1.103
func NewMongoClient(cfg types.MongoConfig) (*MongoClient, error)
func (*MongoClient) Close ¶ added in v0.1.103
func (m *MongoClient) Close(ctx context.Context) error
func (*MongoClient) Collection ¶ added in v0.1.103
func (m *MongoClient) Collection(name string) *mongo.Collection
type ObjectStore ¶ added in v0.1.18
type ObjectStore interface {
Put(ctx context.Context, localPath, key string) error
Get(ctx context.Context, key, localPath string) error
Exists(ctx context.Context, key string) (bool, error)
}
ObjectStore defines the interface for object storage backends.
type OverlayLayer ¶
type OverlayLayer struct {
// contains filtered or unexported fields
}
OverlayLayer represents a single overlay layer
type ParserFunc ¶
type ReadRecord ¶ added in v0.1.18
type ReadRecord struct {
SeqNum int64 `json:"seq_num"`
Timestamp int64 `json:"timestamp"`
Body string `json:"body"` // S2 returns body as a JSON-encoded string
}
ReadRecord represents a record read from S2
type RedisClient ¶
type RedisClient struct {
redis.UniversalClient
}
func NewRedisClient ¶
func NewRedisClient(config types.RedisConfig, options ...func(*redis.UniversalOptions)) (*RedisClient, error)
func (*RedisClient) Keys ¶
Keys returns all keys matching the given pattern. It runs a SCAN rather than the Redis KEYS command, because KEYS locks up the database.
func (*RedisClient) ToSlice ¶
func (r *RedisClient) ToSlice(v interface{}) []interface{}
type RedisLock ¶
type RedisLock struct {
// contains filtered or unexported fields
}
func NewRedisLock ¶
func NewRedisLock(client *RedisClient, opts ...RedisLockOption) *RedisLock
type RedisLockOption ¶
type RedisLockOption func(*RedisLock)
type RedisLockOptions ¶
type RunEventEntry ¶ added in v0.1.60
type RunEventEntry struct {
RunID string `json:"run_id"`
EventType string `json:"event_type"`
Timestamp int64 `json:"timestamp"`
Payload map[string]any `json:"payload,omitempty"`
}
RunEventEntry represents a lifecycle event emitted by the orchestration engine.
type S2Client ¶ added in v0.1.18
type S2Client struct {
// contains filtered or unexported fields
}
S2Client provides access to S2 streams for append-only log storage
func NewS2Client ¶ added in v0.1.18
NewS2Client creates a new S2 stream client
func (*S2Client) AppendLog ¶ added in v0.1.18
AppendLog is a convenience method for appending a log entry
func (*S2Client) AppendRunEvent ¶ added in v0.1.60
func (c *S2Client) AppendRunEvent(ctx context.Context, runID, eventType string, payload map[string]any) error
AppendRunEvent appends a run event entry to the run event stream.
func (*S2Client) AppendStatus ¶ added in v0.1.18
func (c *S2Client) AppendStatus(ctx context.Context, taskID, status string, exitCode *int, errorMsg string) error
AppendStatus is a convenience method for appending a status entry
func (*S2Client) ListStreams ¶ added in v0.1.48
ListStreams lists streams whose names begin with the given prefix.
func (*S2Client) Read ¶ added in v0.1.18
func (c *S2Client) Read(ctx context.Context, stream string, seqNum int64, count int) ([]ReadRecord, error)
Read reads records from a stream
func (*S2Client) ReadLogs ¶ added in v0.1.18
func (c *S2Client) ReadLogs(ctx context.Context, taskID string, seqNum int64) ([]TaskLogEntry, int64, error)
ReadLogs reads log entries for a task. It returns the logs, the next sequence number for pagination, and any error. Pass the returned next sequence number as seqNum on subsequent calls to fetch logs beyond the first page.
type S2Config ¶ added in v0.1.18
type S2Config struct {
// Token is the S2 API token
Token string
// Basin is the S2 basin name (e.g., "airstore")
Basin string
// Timeout for HTTP requests
Timeout time.Duration
}
S2Config configures the S2 stream client
type S3Store ¶ added in v0.1.18
type S3Store struct {
// contains filtered or unexported fields
}
S3Store implements ObjectStore using AWS S3.
func NewS3Store ¶ added in v0.1.18
NewS3Store creates a new S3-backed object store.
type StreamInfo ¶ added in v0.1.48
type StreamInfo struct {
Name string `json:"name"`
}
StreamInfo represents a stream returned by the S2 list streams API.
type StreamNames ¶ added in v0.1.18
type StreamNames struct{}
StreamNames provides consistent stream naming
func (StreamNames) ChannelConversation ¶ added in v0.1.96
func (StreamNames) ChannelConversation(agentID, senderHash string) string
ChannelConversation returns the stream name for a channel conversation between an agent and a sender.
func (StreamNames) RunEvents ¶ added in v0.1.60
func (StreamNames) RunEvents(runID string) string
RunEvents returns the stream name for orchestration run events.
func (StreamNames) SkillDraft ¶ added in v0.1.97
func (StreamNames) SkillDraft(draftID string) string
SkillDraft returns the stream name for a skill draft conversation.
func (StreamNames) SkillDraftIndex ¶ added in v0.1.97
func (StreamNames) SkillDraftIndex(workspaceID string) string
SkillDraftIndex returns the stream name for a workspace's skill draft index.
func (StreamNames) TaskLogs ¶ added in v0.1.18
func (StreamNames) TaskLogs(taskID string) string
TaskLogs returns the stream name for a task's logs
func (StreamNames) TaskStatus ¶ added in v0.1.18
func (StreamNames) TaskStatus(taskID string) string
TaskStatus returns the stream name for a task's status events
func (StreamNames) ViewContext ¶ added in v0.1.131
func (StreamNames) ViewContext(viewID string) string
ViewContext returns the stream name for a view's persistent context.
func (StreamNames) ViewDraft ¶ added in v0.1.103
func (StreamNames) ViewDraft(draftID string) string
ViewDraft returns the stream name for a view draft conversation.
func (StreamNames) ViewDraftIndex ¶ added in v0.1.103
func (StreamNames) ViewDraftIndex(workspaceID string) string
ViewDraftIndex returns the stream name for a workspace's view draft index.
type TaskLogEntry ¶ added in v0.1.18
type TaskLogEntry struct {
TaskID string `json:"task_id"`
Timestamp int64 `json:"timestamp"`
SeqNum int64 `json:"seq_num,omitempty"`
EventID string `json:"event_id,omitempty"`
Stream string `json:"stream"` // "stdout" or "stderr"
Data string `json:"data"`
ChunkType string `json:"chunk_type,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
}
TaskLogEntry represents a log entry for a task
func RedactTaskLogEntries ¶ added in v0.1.71
func RedactTaskLogEntries(entries []TaskLogEntry) []TaskLogEntry
RedactTaskLogEntries clones and redacts a task log slice.
func RedactTaskLogEntry ¶ added in v0.1.71
func RedactTaskLogEntry(entry TaskLogEntry) TaskLogEntry
RedactTaskLogEntry clones a log entry with secret-like content redacted.
type TaskStatusEntry ¶ added in v0.1.18
type TaskStatusEntry struct {
TaskID string `json:"task_id"`
Timestamp int64 `json:"timestamp"`
Status string `json:"status"`
ExitCode *int `json:"exit_code,omitempty"`
Error string `json:"error,omitempty"`
}
TaskStatusEntry represents a status change for a task