Documentation
¶
Overview ¶
Package infra provides unified infrastructure clients for Commerce.
This package integrates various backend services:
- Vector: Embeddings and semantic search (Qdrant)
- KV: Key-value cache and sessions (KV_URL, Redis-compatible)
- Storage: Object storage for assets (S3_URL, S3-compatible)
- Search: Full-text search (Meilisearch)
- PubSub: Event streaming (NATS)
- Tasks: Workflow orchestration (Temporal)
Architecture:
+------------------------------------------------------------------+
|                      Infrastructure Manager                      |
+------------------------------------------------------------------+
|  Vector  |    KV    |  Storage |  Search  |  PubSub  |   Tasks   |
+------------------------------------------------------------------+
Package infra provides infrastructure clients.
This file implements the KV client (KV_URL, Redis-compatible) for caching and session storage.
Package infra provides infrastructure clients.
This file implements distributed locking backed by Redis/Valkey. Locks use atomic SET NX with TTL for acquisition and Lua scripts for safe compare-and-delete release and compare-and-extend renewal.
Package infra provides infrastructure clients.
This file implements the NATS client for pub/sub messaging and order events.
Package infra provides infrastructure clients.
This file implements the Meilisearch client for full-text product search.
Package infra provides infrastructure clients.
This file implements the S3-compatible storage client (S3_URL) for product images and digital assets.
Package infra provides infrastructure clients.
This file implements the Temporal client for workflow orchestration and background task execution.
Package infra provides infrastructure clients.
This file implements the Qdrant vector database client using the REST API. No gRPC dependency -- plain HTTP + JSON over port 6333.
Package infra provides infrastructure clients.
This file implements the ZAP transport for inter-service vector operations. Other Hanzo services call commerce via ZAP (luxfi/zap) for vector ops. The Qdrant REST client remains the DB access layer -- ZAP wraps it for service-to-service communication.
Index ¶
- Constants
- Variables
- func ActivityOptions(taskQueue string, timeout time.Duration, retries int32) workflow.ActivityOptions
- func BuildZAPRequest(opcode uint16, payload []byte) *zap.Message
- func LocalActivityOptions(timeout time.Duration, retries int32) workflow.LocalActivityOptions
- type AckPolicy
- type Config
- type ConsumerConfig
- type DeliverPolicy
- type DocumentsResult
- type GetDocumentsOptions
- type HealthStatus
- type IndexSettings
- type IndexStats
- type KVClient
- func (c *KVClient) Client() *kv.Client
- func (c *KVClient) Close() error
- func (c *KVClient) Delete(ctx context.Context, keys ...string) error
- func (c *KVClient) Exists(ctx context.Context, keys ...string) (int64, error)
- func (c *KVClient) Expire(ctx context.Context, key string, ttl time.Duration) error
- func (c *KVClient) Get(ctx context.Context, key string) (string, error)
- func (c *KVClient) GetJSON(ctx context.Context, key string, dst interface{}) error
- func (c *KVClient) HDel(ctx context.Context, key string, fields ...string) error
- func (c *KVClient) HGet(ctx context.Context, key, field string) (string, error)
- func (c *KVClient) HGetAll(ctx context.Context, key string) (map[string]string, error)
- func (c *KVClient) HSet(ctx context.Context, key string, values ...interface{}) error
- func (c *KVClient) Health(ctx context.Context) HealthStatus
- func (c *KVClient) Incr(ctx context.Context, key string) (int64, error)
- func (c *KVClient) IncrBy(ctx context.Context, key string, value int64) (int64, error)
- func (c *KVClient) LLen(ctx context.Context, key string) (int64, error)
- func (c *KVClient) LPush(ctx context.Context, key string, values ...interface{}) error
- func (c *KVClient) LRange(ctx context.Context, key string, start, stop int64) ([]string, error)
- func (c *KVClient) Pipeline() kv.Pipeliner
- func (c *KVClient) Publish(ctx context.Context, channel string, message interface{}) error
- func (c *KVClient) RPush(ctx context.Context, key string, values ...interface{}) error
- func (c *KVClient) SAdd(ctx context.Context, key string, members ...interface{}) error
- func (c *KVClient) SIsMember(ctx context.Context, key string, member interface{}) (bool, error)
- func (c *KVClient) SMembers(ctx context.Context, key string) ([]string, error)
- func (c *KVClient) SRem(ctx context.Context, key string, members ...interface{}) error
- func (c *KVClient) Set(ctx context.Context, key string, value string, ttl time.Duration) error
- func (c *KVClient) SetJSON(ctx context.Context, key string, value interface{}, ttl time.Duration) error
- func (c *KVClient) Subscribe(ctx context.Context, channels ...string) *kv.PubSub
- func (c *KVClient) TTL(ctx context.Context, key string) (time.Duration, error)
- func (c *KVClient) Watch(ctx context.Context, fn func(*kv.Tx) error, keys ...string) error
- func (c *KVClient) ZAdd(ctx context.Context, key string, members ...kv.Z) error
- func (c *KVClient) ZRange(ctx context.Context, key string, start, stop int64) ([]string, error)
- func (c *KVClient) ZRangeByScore(ctx context.Context, key string, opt *kv.ZRangeBy) ([]string, error)
- type KVConfig
- type Lock
- type Manager
- func (m *Manager) Acquire(ctx context.Context, key string, ttl time.Duration) (*Lock, error)
- func (m *Manager) Close() error
- func (m *Manager) Connect(ctx context.Context) error
- func (m *Manager) Health(ctx context.Context) map[string]HealthStatus
- func (m *Manager) KV() (*KVClient, error)
- func (m *Manager) PubSub() (*PubSubClient, error)
- func (m *Manager) Search() (*SearchClient, error)
- func (m *Manager) Storage() (*StorageClient, error)
- func (m *Manager) Tasks() (*TasksClient, error)
- func (m *Manager) Vector() (*VectorClient, error)
- type Message
- type MessageMetadata
- type ObjectInfo
- type PubAck
- type PubSubClient
- func (c *PubSubClient) Close() error
- func (c *PubSubClient) Conn() *nats.Conn
- func (c *PubSubClient) ConsumeMessages(ctx context.Context, stream, consumer string, ...) error
- func (c *PubSubClient) CreateConsumer(ctx context.Context, stream string, cfg *ConsumerConfig) (jetstream.Consumer, error)
- func (c *PubSubClient) EnsureStream(ctx context.Context, cfg *StreamConfig) error
- func (c *PubSubClient) Health(ctx context.Context) HealthStatus
- func (c *PubSubClient) JetStream() jetstream.JetStream
- func (c *PubSubClient) Publish(ctx context.Context, subject string, data []byte) error
- func (c *PubSubClient) PublishJSON(ctx context.Context, subject string, v interface{}) error
- func (c *PubSubClient) PublishJSONToStream(ctx context.Context, subject string, v interface{}) (*PubAck, error)
- func (c *PubSubClient) PublishToStream(ctx context.Context, subject string, data []byte) (*PubAck, error)
- func (c *PubSubClient) QueueSubscribe(subject, queue string, handler func(*Message)) (*Subscription, error)
- func (c *PubSubClient) Request(ctx context.Context, subject string, data []byte, timeout time.Duration) ([]byte, error)
- func (c *PubSubClient) RequestJSON(ctx context.Context, subject string, req interface{}, resp interface{}, ...) error
- func (c *PubSubClient) Subscribe(subject string, handler func(*Message)) (*Subscription, error)
- type PubSubConfig
- type RetentionPolicy
- type RetryPolicy
- type RetryPolicyInternal
- type SearchClient
- func (c *SearchClient) Client() meilisearch.ServiceManager
- func (c *SearchClient) ConfigureIndex(ctx context.Context, uid string, settings *IndexSettings) error
- func (c *SearchClient) Delete(ctx context.Context, indexUID string, documentIDs []string) error
- func (c *SearchClient) DeleteAll(ctx context.Context, indexUID string) error
- func (c *SearchClient) EnsureIndex(ctx context.Context, uid string, primaryKey string) error
- func (c *SearchClient) GetDocument(ctx context.Context, indexUID, documentID string, dst interface{}) error
- func (c *SearchClient) GetDocuments(ctx context.Context, indexUID string, opts *GetDocumentsOptions) (*DocumentsResult, error)
- func (c *SearchClient) Health(ctx context.Context) HealthStatus
- func (c *SearchClient) Index(ctx context.Context, indexUID string, documents interface{}, ...) error
- func (c *SearchClient) Search(ctx context.Context, opts *SearchOptions) (*SearchResult, error)
- func (c *SearchClient) Stats(ctx context.Context, indexUID string) (*IndexStats, error)
- type SearchConfig
- type SearchOptions
- type SearchResult
- type StartWorkflowOptions
- type StorageClient
- func (c *StorageClient) Client() *minio.Client
- func (c *StorageClient) Copy(ctx context.Context, srcBucket, srcKey, dstBucket, dstKey string) error
- func (c *StorageClient) Delete(ctx context.Context, bucket, key string) error
- func (c *StorageClient) DeleteMany(ctx context.Context, bucket string, keys []string) error
- func (c *StorageClient) Download(ctx context.Context, bucket, key string) (io.ReadCloser, error)
- func (c *StorageClient) DownloadBytes(ctx context.Context, bucket, key string) ([]byte, error)
- func (c *StorageClient) EnsureBucket(ctx context.Context, bucket string) error
- func (c *StorageClient) Exists(ctx context.Context, bucket, key string) (bool, error)
- func (c *StorageClient) Health(ctx context.Context) HealthStatus
- func (c *StorageClient) List(ctx context.Context, bucket, prefix string, maxKeys int) ([]*ObjectInfo, error)
- func (c *StorageClient) PresignedGetURL(ctx context.Context, bucket, key string, expiry time.Duration) (string, error)
- func (c *StorageClient) PresignedPutURL(ctx context.Context, bucket, key string, expiry time.Duration) (string, error)
- func (c *StorageClient) PublicURL(bucket, key string) string
- func (c *StorageClient) Stat(ctx context.Context, bucket, key string) (*ObjectInfo, error)
- func (c *StorageClient) Upload(ctx context.Context, opts *UploadOptions) (*UploadResult, error)
- func (c *StorageClient) UploadBytes(ctx context.Context, bucket, key string, data []byte, contentType string) (*UploadResult, error)
- type StorageConfig
- type StorageType
- type StreamConfig
- type StreamMessage
- type Subscription
- type TasksClient
- func (c *TasksClient) CancelWorkflow(ctx context.Context, workflowID, runID string) error
- func (c *TasksClient) Client() client.Client
- func (c *TasksClient) Close()
- func (c *TasksClient) GetWorkflow(ctx context.Context, workflowID, runID string) *WorkflowRun
- func (c *TasksClient) Health(ctx context.Context) HealthStatus
- func (c *TasksClient) NewWorker(taskQueue string, opts *WorkerOptions) (worker.Worker, error)
- func (c *TasksClient) QueryWorkflow(ctx context.Context, workflowID, runID, queryType string, args ...interface{}) (interface{}, error)
- func (c *TasksClient) SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, ...) (*WorkflowRun, error)
- func (c *TasksClient) SignalWorkflow(ctx context.Context, workflowID, runID, signalName string, arg interface{}) error
- func (c *TasksClient) StartAllWorkers() error
- func (c *TasksClient) StartWorker(taskQueue string) error
- func (c *TasksClient) StartWorkflow(ctx context.Context, opts *StartWorkflowOptions, workflowFunc interface{}, ...) (*WorkflowRun, error)
- func (c *TasksClient) TerminateWorkflow(ctx context.Context, workflowID, runID, reason string) error
- type TasksConfig
- type TasksTLSConfig
- type UploadOptions
- type UploadResult
- type VectorClient
- func (c *VectorClient) Close() error
- func (c *VectorClient) Delete(ctx context.Context, collection string, ids []string) error
- func (c *VectorClient) EnsureCollection(ctx context.Context, name string, dimensions uint64) error
- func (c *VectorClient) Health(ctx context.Context) HealthStatus
- func (c *VectorClient) Search(ctx context.Context, opts *VectorSearchOpts) ([]VectorSearchResult, error)
- func (c *VectorClient) Upsert(ctx context.Context, collection string, points []*VectorPoint) error
- type VectorConfig
- type VectorPoint
- type VectorSearchOpts
- type VectorSearchResult
- type WorkerOptions
- type WorkflowRun
- type ZAPConfig
- type ZAPNode
Constants ¶
const (
	OpVectorUpsert uint16 = 0x10
	OpVectorSearch uint16 = 0x11
	OpVectorDelete uint16 = 0x12
)
ZAP opcodes for vector operations.
Variables ¶
var (
	// ErrNotConfigured is returned when a service is not configured
	ErrNotConfigured = errors.New("infra: service not configured")
	// ErrNotConnected is returned when a service is not connected
	ErrNotConnected = errors.New("infra: service not connected")
	// ErrConnectionFailed is returned when a connection attempt fails
	ErrConnectionFailed = errors.New("infra: connection failed")
	// ErrTimeout is returned when an operation times out
	ErrTimeout = errors.New("infra: operation timeout")
	// ErrClosed is returned when operating on closed infrastructure
	ErrClosed = errors.New("infra: infrastructure closed")
)
var (
	// ErrLockNotAcquired is returned when the lock is already held.
	ErrLockNotAcquired = errors.New("lock: not acquired")
	// ErrLockNotHeld is returned when releasing or extending a lock
	// that is no longer held by this instance.
	ErrLockNotHeld = errors.New("lock: not held")
)
Functions ¶
func ActivityOptions ¶
func ActivityOptions(taskQueue string, timeout time.Duration, retries int32) workflow.ActivityOptions
ActivityOptions returns activity options for use in workflows
func BuildZAPRequest ¶ added in v1.37.0
func BuildZAPRequest(opcode uint16, payload []byte) *zap.Message
BuildZAPRequest builds a ZAP request message with the given payload. The caller sets flags to encode the opcode: flags = opcode << 8.
func LocalActivityOptions ¶
func LocalActivityOptions(timeout time.Duration, retries int32) workflow.LocalActivityOptions
LocalActivityOptions returns local activity options
Types ¶
type Config ¶
type Config struct {
// Vector (Qdrant) configuration
Vector VectorConfig
// KV (Valkey/Redis) configuration
KV KVConfig
// Storage (MinIO) configuration
Storage StorageConfig
// Search (Meilisearch) configuration
Search SearchConfig
// PubSub (NATS) configuration
PubSub PubSubConfig
// Tasks (Temporal) configuration
Tasks TasksConfig
// Global settings
ConnectTimeout time.Duration
RetryAttempts int
RetryDelay time.Duration
}
Config holds configuration for all infrastructure services
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns a default configuration for local development
type ConsumerConfig ¶
type ConsumerConfig struct {
Name string
Durable string
Description string
FilterSubject string
DeliverPolicy DeliverPolicy
AckPolicy AckPolicy
AckWait time.Duration
MaxDeliver int
MaxAckPending int
}
ConsumerConfig configures a JetStream consumer
type DeliverPolicy ¶
type DeliverPolicy int
DeliverPolicy defines message delivery policy
const (
	DeliverAll DeliverPolicy = iota
	DeliverNew
)
type DocumentsResult ¶
DocumentsResult contains document retrieval results
type GetDocumentsOptions ¶
GetDocumentsOptions configures document retrieval
type HealthStatus ¶
type HealthStatus struct {
Healthy bool `json:"healthy"`
Latency time.Duration `json:"latency"`
Error string `json:"error,omitempty"`
}
HealthStatus represents the health of a service
type IndexSettings ¶
type IndexSettings struct {
SearchableAttributes []string
FilterableAttributes []string
SortableAttributes []string
RankingRules []string
StopWords []string
Synonyms map[string][]string
DistinctAttribute string
}
IndexSettings configures a search index
type IndexStats ¶
IndexStats contains index statistics
type KVClient ¶
type KVClient struct {
// contains filtered or unexported fields
}
KVClient wraps the Redis client for Valkey
func NewKVClient ¶
NewKVClient creates a new Valkey KV client
func (*KVClient) Health ¶
func (c *KVClient) Health(ctx context.Context) HealthStatus
Health checks the Valkey connection
func (*KVClient) SetJSON ¶
func (c *KVClient) SetJSON(ctx context.Context, key string, value interface{}, ttl time.Duration) error
SetJSON marshals and stores a value
type KVConfig ¶
type KVConfig struct {
// Enabled enables the KV service
Enabled bool
// Addr is the Valkey server address (host:port)
Addr string
// Password for authentication (optional)
Password string
// DB is the database number to use
DB int
// TLS enables TLS connection
TLS bool
// PoolSize is the connection pool size
PoolSize int
// MinIdleConns minimum idle connections
MinIdleConns int
// ConnMaxIdleTime maximum idle time for connections
ConnMaxIdleTime time.Duration
// ReadTimeout for read operations
ReadTimeout time.Duration
// WriteTimeout for write operations
WriteTimeout time.Duration
// KeyPrefix is prepended to all keys
KeyPrefix string
}
KVConfig holds Valkey/Redis configuration
type Lock ¶
type Lock struct {
// contains filtered or unexported fields
}
Lock represents a distributed lock backed by Redis/Valkey.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager manages all infrastructure connections
func (*Manager) Acquire ¶
func (m *Manager) Acquire(ctx context.Context, key string, ttl time.Duration) (*Lock, error)
Acquire attempts to acquire a distributed lock with the given key. The key is prefixed via the KV client's key prefix. Returns a Lock that must be released when done.
func (*Manager) Health ¶
func (m *Manager) Health(ctx context.Context) map[string]HealthStatus
Health checks the health of all connected services
func (*Manager) PubSub ¶
func (m *Manager) PubSub() (*PubSubClient, error)
PubSub returns the NATS pubsub client
func (*Manager) Search ¶
func (m *Manager) Search() (*SearchClient, error)
Search returns the Meilisearch client
func (*Manager) Storage ¶
func (m *Manager) Storage() (*StorageClient, error)
Storage returns the MinIO storage client
func (*Manager) Tasks ¶
func (m *Manager) Tasks() (*TasksClient, error)
Tasks returns the Temporal tasks client
func (*Manager) Vector ¶
func (m *Manager) Vector() (*VectorClient, error)
Vector returns the Qdrant vector client
type Message ¶
type Message struct {
Subject string
Reply string
Data []byte
// contains filtered or unexported fields
}
Message represents a received message
func (*Message) RespondJSON ¶
RespondJSON sends a JSON response
type MessageMetadata ¶
MessageMetadata contains message metadata
type ObjectInfo ¶
type ObjectInfo struct {
Key string
Size int64
ContentType string
ETag string
LastModified time.Time
Metadata map[string]string
}
ObjectInfo contains information about an object
type PubSubClient ¶
type PubSubClient struct {
// contains filtered or unexported fields
}
PubSubClient wraps the NATS client
func NewPubSubClient ¶
func NewPubSubClient(ctx context.Context, cfg *PubSubConfig) (*PubSubClient, error)
NewPubSubClient creates a new NATS pubsub client
func (*PubSubClient) Conn ¶
func (c *PubSubClient) Conn() *nats.Conn
Conn returns the underlying NATS connection for advanced operations
func (*PubSubClient) ConsumeMessages ¶
func (c *PubSubClient) ConsumeMessages(ctx context.Context, stream, consumer string, handler func(*StreamMessage) error) error
ConsumeMessages starts consuming messages from a consumer
func (*PubSubClient) CreateConsumer ¶
func (c *PubSubClient) CreateConsumer(ctx context.Context, stream string, cfg *ConsumerConfig) (jetstream.Consumer, error)
CreateConsumer creates a durable consumer
func (*PubSubClient) EnsureStream ¶
func (c *PubSubClient) EnsureStream(ctx context.Context, cfg *StreamConfig) error
EnsureStream creates a stream if it doesn't exist
func (*PubSubClient) Health ¶
func (c *PubSubClient) Health(ctx context.Context) HealthStatus
Health checks the NATS connection
func (*PubSubClient) JetStream ¶
func (c *PubSubClient) JetStream() jetstream.JetStream
JetStream returns the JetStream context for advanced operations
func (*PubSubClient) PublishJSON ¶
func (c *PubSubClient) PublishJSON(ctx context.Context, subject string, v interface{}) error
PublishJSON publishes a JSON message
func (*PubSubClient) PublishJSONToStream ¶
func (c *PubSubClient) PublishJSONToStream(ctx context.Context, subject string, v interface{}) (*PubAck, error)
PublishJSONToStream publishes a JSON message to a stream
func (*PubSubClient) PublishToStream ¶
func (c *PubSubClient) PublishToStream(ctx context.Context, subject string, data []byte) (*PubAck, error)
PublishToStream publishes a message to a JetStream stream
func (*PubSubClient) QueueSubscribe ¶
func (c *PubSubClient) QueueSubscribe(subject, queue string, handler func(*Message)) (*Subscription, error)
QueueSubscribe subscribes to a subject with a queue group
func (*PubSubClient) Request ¶
func (c *PubSubClient) Request(ctx context.Context, subject string, data []byte, timeout time.Duration) ([]byte, error)
Request publishes a request and waits for a response
func (*PubSubClient) RequestJSON ¶
func (c *PubSubClient) RequestJSON(ctx context.Context, subject string, req interface{}, resp interface{}, timeout time.Duration) error
RequestJSON publishes a JSON request and unmarshals the response
func (*PubSubClient) Subscribe ¶
func (c *PubSubClient) Subscribe(subject string, handler func(*Message)) (*Subscription, error)
Subscribe subscribes to a subject
type PubSubConfig ¶
type PubSubConfig struct {
// Enabled enables the pubsub service
Enabled bool
// URL is the NATS server URL
URL string
// Name is the client name
Name string
// Token for authentication (optional)
Token string
// User for authentication (optional)
User string
// Password for authentication (optional)
Password string
// TLS enables TLS connection
TLS bool
// ReconnectWait is the wait time between reconnects
ReconnectWait time.Duration
// MaxReconnects is the maximum reconnection attempts
MaxReconnects int
// EnableJetStream enables JetStream for persistence
EnableJetStream bool
}
PubSubConfig holds NATS configuration
type RetentionPolicy ¶
type RetentionPolicy int
RetentionPolicy defines message retention
const (
	RetentionLimits    RetentionPolicy = 0
	RetentionInterest  RetentionPolicy = 1
	RetentionWorkQueue RetentionPolicy = 2
)
type RetryPolicy ¶
type RetryPolicy struct {
InitialInterval time.Duration
BackoffCoefficient float64
MaximumInterval time.Duration
MaximumAttempts int32
}
RetryPolicy configures activity/workflow retry behavior
type RetryPolicyInternal ¶
type RetryPolicyInternal struct {
InitialInterval time.Duration
BackoffCoefficient float64
MaximumInterval time.Duration
MaximumAttempts int32
}
RetryPolicyInternal is internal retry policy that converts to temporal
func (*RetryPolicyInternal) ToTemporal ¶
func (r *RetryPolicyInternal) ToTemporal() *temporal.RetryPolicy
ToTemporal converts to temporal retry policy
type SearchClient ¶
type SearchClient struct {
// contains filtered or unexported fields
}
SearchClient wraps the Meilisearch client
func NewSearchClient ¶
func NewSearchClient(ctx context.Context, cfg *SearchConfig) (*SearchClient, error)
NewSearchClient creates a new Meilisearch client
func (*SearchClient) Client ¶
func (c *SearchClient) Client() meilisearch.ServiceManager
Client returns the underlying Meilisearch client for advanced operations
func (*SearchClient) ConfigureIndex ¶
func (c *SearchClient) ConfigureIndex(ctx context.Context, uid string, settings *IndexSettings) error
ConfigureIndex updates index settings
func (*SearchClient) DeleteAll ¶
func (c *SearchClient) DeleteAll(ctx context.Context, indexUID string) error
DeleteAll removes all documents from an index
func (*SearchClient) EnsureIndex ¶
func (c *SearchClient) EnsureIndex(ctx context.Context, uid string, primaryKey string) error
EnsureIndex creates an index if it doesn't exist
func (*SearchClient) GetDocument ¶
func (c *SearchClient) GetDocument(ctx context.Context, indexUID, documentID string, dst interface{}) error
GetDocument retrieves a document by ID
func (*SearchClient) GetDocuments ¶
func (c *SearchClient) GetDocuments(ctx context.Context, indexUID string, opts *GetDocumentsOptions) (*DocumentsResult, error)
GetDocuments retrieves multiple documents
func (*SearchClient) Health ¶
func (c *SearchClient) Health(ctx context.Context) HealthStatus
Health checks the Meilisearch connection
func (*SearchClient) Index ¶
func (c *SearchClient) Index(ctx context.Context, indexUID string, documents interface{}, primaryKey ...string) error
Index adds or updates documents in an index
func (*SearchClient) Search ¶
func (c *SearchClient) Search(ctx context.Context, opts *SearchOptions) (*SearchResult, error)
Search performs a search query
func (*SearchClient) Stats ¶
func (c *SearchClient) Stats(ctx context.Context, indexUID string) (*IndexStats, error)
Stats returns index statistics
type SearchConfig ¶
type SearchConfig struct {
// Enabled enables the search service
Enabled bool
// Host is the Meilisearch server URL
Host string
// APIKey for authentication
APIKey string
// DefaultIndex is the default search index
DefaultIndex string
// Timeout for requests
Timeout time.Duration
}
SearchConfig holds Meilisearch configuration
type SearchOptions ¶
type SearchOptions struct {
Index string
Query string
Offset int
Limit int
Filter interface{}
Sort []string
Facets []string
AttributesToRetrieve []string
AttributesToCrop []string
CropLength int
AttributesToHighlight []string
HighlightPreTag string
HighlightPostTag string
ShowMatchesPosition bool
}
SearchOptions configures a search query
type SearchResult ¶
type SearchResult struct {
Hits []interface{}
NbHits int64
Offset int
Limit int
ProcessingTimeMs int64
Query string
FacetDistribution map[string]interface{}
}
SearchResult contains search results
type StartWorkflowOptions ¶
type StartWorkflowOptions struct {
ID string
TaskQueue string
ExecutionTimeout time.Duration
RunTimeout time.Duration
TaskTimeout time.Duration
CronSchedule string
SearchAttributes map[string]interface{}
Memo map[string]interface{}
RetryPolicy *RetryPolicy
}
StartWorkflowOptions configures workflow execution
type StorageClient ¶
type StorageClient struct {
// contains filtered or unexported fields
}
StorageClient wraps the MinIO client
func NewStorageClient ¶
func NewStorageClient(ctx context.Context, cfg *StorageConfig) (*StorageClient, error)
NewStorageClient creates a new MinIO storage client
func (*StorageClient) Client ¶
func (c *StorageClient) Client() *minio.Client
Client returns the underlying MinIO client for advanced operations
func (*StorageClient) Copy ¶
func (c *StorageClient) Copy(ctx context.Context, srcBucket, srcKey, dstBucket, dstKey string) error
Copy copies an object within storage
func (*StorageClient) Delete ¶
func (c *StorageClient) Delete(ctx context.Context, bucket, key string) error
Delete removes a file from storage
func (*StorageClient) DeleteMany ¶
func (c *StorageClient) DeleteMany(ctx context.Context, bucket string, keys []string) error
DeleteMany removes multiple files from storage
func (*StorageClient) Download ¶
func (c *StorageClient) Download(ctx context.Context, bucket, key string) (io.ReadCloser, error)
Download downloads a file from storage
func (*StorageClient) DownloadBytes ¶
func (c *StorageClient) DownloadBytes(ctx context.Context, bucket, key string) ([]byte, error)
DownloadBytes downloads a file as bytes
func (*StorageClient) EnsureBucket ¶
func (c *StorageClient) EnsureBucket(ctx context.Context, bucket string) error
EnsureBucket creates a bucket if it doesn't exist
func (*StorageClient) Health ¶
func (c *StorageClient) Health(ctx context.Context) HealthStatus
Health checks the MinIO connection
func (*StorageClient) List ¶
func (c *StorageClient) List(ctx context.Context, bucket, prefix string, maxKeys int) ([]*ObjectInfo, error)
List lists objects in a bucket
func (*StorageClient) PresignedGetURL ¶
func (c *StorageClient) PresignedGetURL(ctx context.Context, bucket, key string, expiry time.Duration) (string, error)
PresignedGetURL generates a presigned URL for downloading
func (*StorageClient) PresignedPutURL ¶
func (c *StorageClient) PresignedPutURL(ctx context.Context, bucket, key string, expiry time.Duration) (string, error)
PresignedPutURL generates a presigned URL for uploading
func (*StorageClient) PublicURL ¶
func (c *StorageClient) PublicURL(bucket, key string) string
PublicURL returns the public URL for an object
func (*StorageClient) Stat ¶
func (c *StorageClient) Stat(ctx context.Context, bucket, key string) (*ObjectInfo, error)
Stat returns object info
func (*StorageClient) Upload ¶
func (c *StorageClient) Upload(ctx context.Context, opts *UploadOptions) (*UploadResult, error)
Upload uploads a file to storage
func (*StorageClient) UploadBytes ¶
func (c *StorageClient) UploadBytes(ctx context.Context, bucket, key string, data []byte, contentType string) (*UploadResult, error)
UploadBytes uploads bytes to storage
type StorageConfig ¶
type StorageConfig struct {
// Enabled enables the storage service
Enabled bool
// Endpoint is the MinIO server endpoint (host:port)
Endpoint string
// AccessKey is the access key ID
AccessKey string
// SecretKey is the secret access key
SecretKey string
// UseSSL enables SSL connection
UseSSL bool
// Bucket is the default bucket name
Bucket string
// Region is the bucket region
Region string
// PublicBaseURL is the public URL base for assets
PublicBaseURL string
}
StorageConfig holds MinIO configuration
type StorageType ¶
type StorageType int
StorageType defines storage backend
const (
	StorageFile   StorageType = 0
	StorageMemory StorageType = 1
)
type StreamConfig ¶
type StreamConfig struct {
Name string
Description string
Subjects []string
Retention RetentionPolicy
MaxAge time.Duration
MaxBytes int64
MaxMsgs int64
Storage StorageType
Replicas int
}
StreamConfig configures a JetStream stream
type StreamMessage ¶
type StreamMessage struct {
Subject string
Data []byte
Headers map[string][]string
Metadata *MessageMetadata
// contains filtered or unexported fields
}
StreamMessage represents a message from a stream
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription wraps a NATS subscription
func (*Subscription) Drain ¶
func (s *Subscription) Drain() error
Drain unsubscribes and drains pending messages
func (*Subscription) Unsubscribe ¶
func (s *Subscription) Unsubscribe() error
Unsubscribe removes the subscription
type TasksClient ¶
type TasksClient struct {
// contains filtered or unexported fields
}
TasksClient wraps the Temporal client
func NewTasksClient ¶
func NewTasksClient(ctx context.Context, cfg *TasksConfig) (*TasksClient, error)
NewTasksClient creates a new Temporal tasks client
func (*TasksClient) CancelWorkflow ¶
func (c *TasksClient) CancelWorkflow(ctx context.Context, workflowID, runID string) error
CancelWorkflow cancels a workflow execution
func (*TasksClient) Client ¶
func (c *TasksClient) Client() client.Client
Client returns the underlying Temporal client for advanced operations
func (*TasksClient) Close ¶
func (c *TasksClient) Close()
Close closes the Temporal client and workers
func (*TasksClient) GetWorkflow ¶
func (c *TasksClient) GetWorkflow(ctx context.Context, workflowID, runID string) *WorkflowRun
GetWorkflow retrieves a workflow run
func (*TasksClient) Health ¶
func (c *TasksClient) Health(ctx context.Context) HealthStatus
Health checks the Temporal connection
func (*TasksClient) NewWorker ¶
func (c *TasksClient) NewWorker(taskQueue string, opts *WorkerOptions) (worker.Worker, error)
NewWorker creates a new worker for a task queue
func (*TasksClient) QueryWorkflow ¶
func (c *TasksClient) QueryWorkflow(ctx context.Context, workflowID, runID, queryType string, args ...interface{}) (interface{}, error)
QueryWorkflow queries a running workflow
func (*TasksClient) SignalWithStartWorkflow ¶
func (c *TasksClient) SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{}, opts *StartWorkflowOptions, workflowFunc interface{}, workflowArgs ...interface{}) (*WorkflowRun, error)
SignalWithStartWorkflow signals or starts a workflow
func (*TasksClient) SignalWorkflow ¶
func (c *TasksClient) SignalWorkflow(ctx context.Context, workflowID, runID, signalName string, arg interface{}) error
SignalWorkflow sends a signal to a workflow
func (*TasksClient) StartAllWorkers ¶
func (c *TasksClient) StartAllWorkers() error
StartAllWorkers starts all workers in background
func (*TasksClient) StartWorker ¶
func (c *TasksClient) StartWorker(taskQueue string) error
StartWorker starts a worker (blocking)
func (*TasksClient) StartWorkflow ¶
func (c *TasksClient) StartWorkflow(ctx context.Context, opts *StartWorkflowOptions, workflowFunc interface{}, args ...interface{}) (*WorkflowRun, error)
StartWorkflow starts a new workflow execution
func (*TasksClient) TerminateWorkflow ¶
func (c *TasksClient) TerminateWorkflow(ctx context.Context, workflowID, runID, reason string) error
TerminateWorkflow terminates a workflow execution
type TasksConfig ¶
type TasksConfig struct {
// Enabled enables the tasks service
Enabled bool
// HostPort is the Temporal server address
HostPort string
// Namespace is the Temporal namespace
Namespace string
// Identity is the worker identity
Identity string
// TaskQueue is the default task queue
TaskQueue string
// TLS configuration
TLS *TasksTLSConfig
}
TasksConfig holds Temporal configuration
type TasksTLSConfig ¶
type TasksTLSConfig struct {
// CertPath is the path to the client certificate
CertPath string
// KeyPath is the path to the client key
KeyPath string
// CACertPath is the path to the CA certificate
CACertPath string
// ServerName for TLS verification
ServerName string
}
TasksTLSConfig holds TLS configuration for Temporal
type UploadOptions ¶
type UploadOptions struct {
Bucket string
Key string
Reader io.Reader
Size int64
ContentType string
Metadata map[string]string
}
UploadOptions configures an upload operation
type UploadResult ¶
UploadResult contains the result of an upload
type VectorClient ¶
type VectorClient struct {
// contains filtered or unexported fields
}
VectorClient wraps the Qdrant REST client
func NewVectorClient ¶
func NewVectorClient(_ context.Context, cfg *VectorConfig) (*VectorClient, error)
NewVectorClient creates a new Qdrant vector client over HTTP
func (*VectorClient) Close ¶
func (c *VectorClient) Close() error
Close is a no-op for the HTTP client (satisfies the interface).
func (*VectorClient) EnsureCollection ¶
EnsureCollection creates a collection if it doesn't exist
func (*VectorClient) Health ¶
func (c *VectorClient) Health(ctx context.Context) HealthStatus
Health checks the Qdrant connection
func (*VectorClient) Search ¶
func (c *VectorClient) Search(ctx context.Context, opts *VectorSearchOpts) ([]VectorSearchResult, error)
Search performs vector similarity search
func (*VectorClient) Upsert ¶
func (c *VectorClient) Upsert(ctx context.Context, collection string, points []*VectorPoint) error
Upsert inserts or updates vectors
type VectorConfig ¶
type VectorConfig struct {
// Enabled enables the vector service
Enabled bool
// Host is the Qdrant server host
Host string
// Port is the Qdrant HTTP port (default: 6333)
Port int
// APIKey for authentication (optional)
APIKey string
// UseTLS enables TLS connection
UseTLS bool
// DefaultCollection is the default collection name
DefaultCollection string
// DefaultDimensions is the default vector dimensions (1536 for OpenAI)
DefaultDimensions uint64
}
VectorConfig holds Qdrant configuration
type VectorPoint ¶
VectorPoint represents a point to insert
type VectorSearchOpts ¶
type VectorSearchOpts struct {
Collection string
Vector []float32
Limit int
MinScore float32
Filter map[string]interface{}
}
VectorSearchOpts configures vector search
type VectorSearchResult ¶
VectorSearchResult represents a search result
type WorkerOptions ¶
type WorkerOptions struct {
MaxConcurrentActivities int
MaxConcurrentWorkflows int
MaxConcurrentLocalActivities int
ActivitiesPerSecond float64
LocalActivitiesPerSecond float64
TaskQueueActivitiesPerSecond float64
EnableSessionWorker bool
}
WorkerOptions configures a worker
type WorkflowRun ¶
WorkflowRun represents a workflow execution
func (*WorkflowRun) Get ¶
func (r *WorkflowRun) Get(ctx context.Context, result interface{}) error
Get waits for the workflow to complete and returns the result
func (*WorkflowRun) GetWithOptions ¶
func (r *WorkflowRun) GetWithOptions(ctx context.Context, result interface{}, opts client.WorkflowRunGetOptions) error
GetWithOptions waits for the workflow to complete and returns the result, applying the given options
type ZAPNode ¶ added in v1.37.0
type ZAPNode struct {
// contains filtered or unexported fields
}
ZAPNode wraps a zap.Node and delegates to VectorClient.
func NewZAPNode ¶ added in v1.37.0
NewZAPNode creates and starts a ZAP node with vector operation handlers. The node uses NoDiscovery (K8s services connect directly).