infra

package
v1.37.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2026 License: MIT Imports: 24 Imported by: 0

Documentation

Overview

Package infra provides unified infrastructure clients for Commerce.

This package integrates various backend services:

  • Vector: Embeddings and semantic search (Qdrant)
  • KV: Key-value cache and sessions (KV_URL, Redis-compatible)
  • Storage: Object storage for assets (S3_URL, S3-compatible)
  • Search: Full-text search (Meilisearch)
  • PubSub: Event streaming (NATS)
  • Tasks: Workflow orchestration (Temporal)

Architecture:

+------------------------------------------------------------------+
|                     Infrastructure Manager                        |
+------------------------------------------------------------------+
|  Vector  |   KV     |  Storage  |  Search  |  PubSub  |  Tasks   |
+------------------------------------------------------------------+

Package infra provides infrastructure clients.

This file implements the KV client (KV_URL, Redis-compatible) for caching and session storage.

Package infra provides infrastructure clients.

This file implements distributed locking backed by Redis/Valkey. Locks use atomic SET NX with TTL for acquisition and Lua scripts for safe compare-and-delete release and compare-and-extend renewal.

Package infra provides infrastructure clients.

This file implements the NATS client for pub/sub messaging and order events.

Package infra provides infrastructure clients.

This file implements the Meilisearch client for full-text product search.

Package infra provides infrastructure clients.

This file implements the S3-compatible storage client (S3_URL) for product images and digital assets.

Package infra provides infrastructure clients.

This file implements the Temporal client for workflow orchestration and background task execution.

Package infra provides infrastructure clients.

This file implements the Qdrant vector database client using the REST API. No gRPC dependency -- plain HTTP + JSON over port 6333.

Package infra provides infrastructure clients.

This file implements the ZAP transport for inter-service vector operations. Other Hanzo services call commerce via ZAP (luxfi/zap) for vector ops. The Qdrant REST client remains the DB access layer -- ZAP wraps it for service-to-service communication.

Index

Constants

View Source
const (
	OpVectorUpsert uint16 = 0x10
	OpVectorSearch uint16 = 0x11
	OpVectorDelete uint16 = 0x12
)

ZAP opcodes for vector operations.

Variables

View Source
var (
	// ErrNotConfigured is returned when a service is not configured
	ErrNotConfigured = errors.New("infra: service not configured")

	// ErrNotConnected is returned when a service is not connected
	ErrNotConnected = errors.New("infra: service not connected")

	// ErrConnectionFailed is returned when a connection attempt fails
	ErrConnectionFailed = errors.New("infra: connection failed")

	// ErrTimeout is returned when an operation times out
	ErrTimeout = errors.New("infra: operation timeout")

	// ErrClosed is returned when operating on closed infrastructure
	ErrClosed = errors.New("infra: infrastructure closed")
)
View Source
var (
	// ErrLockNotAcquired is returned when the lock is already held.
	ErrLockNotAcquired = errors.New("lock: not acquired")

	// ErrLockNotHeld is returned when releasing or extending a lock
	// that is no longer held by this instance.
	ErrLockNotHeld = errors.New("lock: not held")
)

Functions

func ActivityOptions

func ActivityOptions(taskQueue string, timeout time.Duration, retries int32) workflow.ActivityOptions

ActivityOptions returns activity options for use in workflows

func BuildZAPRequest added in v1.37.0

func BuildZAPRequest(opcode uint16, payload []byte) *zap.Message

BuildZAPRequest builds a ZAP request message with the given payload. The caller sets flags to encode the opcode: flags = opcode << 8.

func LocalActivityOptions

func LocalActivityOptions(timeout time.Duration, retries int32) workflow.LocalActivityOptions

LocalActivityOptions returns local activity options

Types

type AckPolicy

type AckPolicy int

AckPolicy defines acknowledgment policy

const (
	AckExplicit AckPolicy = 0
	AckNone     AckPolicy = 1
	AckAll      AckPolicy = 2
)

type Config

type Config struct {
	// Vector (Qdrant) configuration
	Vector VectorConfig

	// KV (Valkey/Redis) configuration
	KV KVConfig

	// Storage (MinIO) configuration
	Storage StorageConfig

	// Search (Meilisearch) configuration
	Search SearchConfig

	// PubSub (NATS) configuration
	PubSub PubSubConfig

	// Tasks (Temporal) configuration
	Tasks TasksConfig

	// Global settings
	ConnectTimeout time.Duration
	RetryAttempts  int
	RetryDelay     time.Duration
}

Config holds configuration for all infrastructure services

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a default configuration for local development

type ConsumerConfig

type ConsumerConfig struct {
	Name          string
	Durable       string
	Description   string
	FilterSubject string
	DeliverPolicy DeliverPolicy
	AckPolicy     AckPolicy
	AckWait       time.Duration
	MaxDeliver    int
	MaxAckPending int
}

ConsumerConfig configures a JetStream consumer

type DeliverPolicy

type DeliverPolicy int

DeliverPolicy defines message delivery policy

const (
	DeliverAll DeliverPolicy = iota
	DeliverNew
)

type DocumentsResult

type DocumentsResult struct {
	Results []map[string]interface{}
	Offset  int
	Limit   int
	Total   int
}

DocumentsResult contains document retrieval results

type GetDocumentsOptions

type GetDocumentsOptions struct {
	Offset int
	Limit  int
	Fields []string
}

GetDocumentsOptions configures document retrieval

type HealthStatus

type HealthStatus struct {
	Healthy bool          `json:"healthy"`
	Latency time.Duration `json:"latency"`
	Error   string        `json:"error,omitempty"`
}

HealthStatus represents the health of a service

type IndexSettings

type IndexSettings struct {
	SearchableAttributes []string
	FilterableAttributes []string
	SortableAttributes   []string
	RankingRules         []string
	StopWords            []string
	Synonyms             map[string][]string
	DistinctAttribute    string
}

IndexSettings configures a search index

type IndexStats

type IndexStats struct {
	NumberOfDocuments int
	IsIndexing        bool
	FieldDistribution map[string]int64
}

IndexStats contains index statistics

type KVClient

type KVClient struct {
	// contains filtered or unexported fields
}

KVClient wraps the Redis client for Valkey

func NewKVClient

func NewKVClient(ctx context.Context, cfg *KVConfig) (*KVClient, error)

NewKVClient creates a new Valkey KV client

func (*KVClient) Client

func (c *KVClient) Client() *kv.Client

Client returns the underlying Redis client for advanced operations

func (*KVClient) Close

func (c *KVClient) Close() error

Close closes the Valkey connection

func (*KVClient) Delete

func (c *KVClient) Delete(ctx context.Context, keys ...string) error

Delete removes a key

func (*KVClient) Exists

func (c *KVClient) Exists(ctx context.Context, keys ...string) (int64, error)

Exists checks if keys exist

func (*KVClient) Expire

func (c *KVClient) Expire(ctx context.Context, key string, ttl time.Duration) error

Expire sets expiration on a key

func (*KVClient) Get

func (c *KVClient) Get(ctx context.Context, key string) (string, error)

Get retrieves a value by key

func (*KVClient) GetJSON

func (c *KVClient) GetJSON(ctx context.Context, key string, dst interface{}) error

GetJSON retrieves and unmarshals a JSON value

func (*KVClient) HDel

func (c *KVClient) HDel(ctx context.Context, key string, fields ...string) error

HDel removes hash fields

func (*KVClient) HGet

func (c *KVClient) HGet(ctx context.Context, key, field string) (string, error)

HGet retrieves a hash field

func (*KVClient) HGetAll

func (c *KVClient) HGetAll(ctx context.Context, key string) (map[string]string, error)

HGetAll retrieves all hash fields

func (*KVClient) HSet

func (c *KVClient) HSet(ctx context.Context, key string, values ...interface{}) error

HSet sets hash fields

func (*KVClient) Health

func (c *KVClient) Health(ctx context.Context) HealthStatus

Health checks the Valkey connection

func (*KVClient) Incr

func (c *KVClient) Incr(ctx context.Context, key string) (int64, error)

Incr increments a counter

func (*KVClient) IncrBy

func (c *KVClient) IncrBy(ctx context.Context, key string, value int64) (int64, error)

IncrBy increments a counter by a value

func (*KVClient) LLen

func (c *KVClient) LLen(ctx context.Context, key string) (int64, error)

LLen returns the length of a list

func (*KVClient) LPush

func (c *KVClient) LPush(ctx context.Context, key string, values ...interface{}) error

LPush prepends values to a list

func (*KVClient) LRange

func (c *KVClient) LRange(ctx context.Context, key string, start, stop int64) ([]string, error)

LRange retrieves list elements

func (*KVClient) Pipeline

func (c *KVClient) Pipeline() kv.Pipeliner

Pipeline returns a pipeline for batched operations

func (*KVClient) Publish

func (c *KVClient) Publish(ctx context.Context, channel string, message interface{}) error

Publish publishes a message to a channel

func (*KVClient) RPush

func (c *KVClient) RPush(ctx context.Context, key string, values ...interface{}) error

RPush appends values to a list

func (*KVClient) SAdd

func (c *KVClient) SAdd(ctx context.Context, key string, members ...interface{}) error

SAdd adds members to a set

func (*KVClient) SIsMember

func (c *KVClient) SIsMember(ctx context.Context, key string, member interface{}) (bool, error)

SIsMember checks if a member is in a set

func (*KVClient) SMembers

func (c *KVClient) SMembers(ctx context.Context, key string) ([]string, error)

SMembers retrieves all set members

func (*KVClient) SRem

func (c *KVClient) SRem(ctx context.Context, key string, members ...interface{}) error

SRem removes members from a set

func (*KVClient) Set

func (c *KVClient) Set(ctx context.Context, key string, value string, ttl time.Duration) error

Set stores a value with optional expiration

func (*KVClient) SetJSON

func (c *KVClient) SetJSON(ctx context.Context, key string, value interface{}, ttl time.Duration) error

SetJSON marshals and stores a value

func (*KVClient) Subscribe

func (c *KVClient) Subscribe(ctx context.Context, channels ...string) *kv.PubSub

Subscribe subscribes to channels

func (*KVClient) TTL

func (c *KVClient) TTL(ctx context.Context, key string) (time.Duration, error)

TTL returns the remaining TTL of a key

func (*KVClient) Watch

func (c *KVClient) Watch(ctx context.Context, fn func(*kv.Tx) error, keys ...string) error

Watch executes a transaction with WATCH

func (*KVClient) ZAdd

func (c *KVClient) ZAdd(ctx context.Context, key string, members ...kv.Z) error

ZAdd adds members to a sorted set

func (*KVClient) ZRange

func (c *KVClient) ZRange(ctx context.Context, key string, start, stop int64) ([]string, error)

ZRange retrieves sorted set members by rank

func (*KVClient) ZRangeByScore

func (c *KVClient) ZRangeByScore(ctx context.Context, key string, opt *kv.ZRangeBy) ([]string, error)

ZRangeByScore retrieves sorted set members by score

type KVConfig

type KVConfig struct {
	// Enabled enables the KV service
	Enabled bool

	// Addr is the Valkey server address (host:port)
	Addr string

	// Password for authentication (optional)
	Password string

	// DB is the database number to use
	DB int

	// TLS enables TLS connection
	TLS bool

	// PoolSize is the connection pool size
	PoolSize int

	// MinIdleConns minimum idle connections
	MinIdleConns int

	// ConnMaxIdleTime maximum idle time for connections
	ConnMaxIdleTime time.Duration

	// ReadTimeout for read operations
	ReadTimeout time.Duration

	// WriteTimeout for write operations
	WriteTimeout time.Duration

	// KeyPrefix is prepended to all keys
	KeyPrefix string
}

KVConfig holds Valkey/Redis configuration

type Lock

type Lock struct {
	// contains filtered or unexported fields
}

Lock represents a distributed lock backed by Redis/Valkey.

func (*Lock) Extend

func (l *Lock) Extend(ctx context.Context, ttl time.Duration) error

Extend extends the TTL of the lock. Only extends if the lock is still held by this instance.

func (*Lock) Release

func (l *Lock) Release(ctx context.Context) error

Release releases the distributed lock. Only releases if the lock is still held by this instance (compare-and-delete).

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages all infrastructure connections

func New

func New(cfg *Config) *Manager

New creates a new infrastructure manager

func (*Manager) Acquire

func (m *Manager) Acquire(ctx context.Context, key string, ttl time.Duration) (*Lock, error)

Acquire attempts to acquire a distributed lock with the given key. The key is prefixed via the KV client's key prefix. Returns a Lock that must be released when done.

func (*Manager) Close

func (m *Manager) Close() error

Close closes all infrastructure connections

func (*Manager) Connect

func (m *Manager) Connect(ctx context.Context) error

Connect establishes connections to all enabled services

func (*Manager) Health

func (m *Manager) Health(ctx context.Context) map[string]HealthStatus

Health checks the health of all connected services

func (*Manager) KV

func (m *Manager) KV() (*KVClient, error)

KV returns the Valkey KV client

func (*Manager) PubSub

func (m *Manager) PubSub() (*PubSubClient, error)

PubSub returns the NATS pubsub client

func (*Manager) Search

func (m *Manager) Search() (*SearchClient, error)

Search returns the Meilisearch client

func (*Manager) Storage

func (m *Manager) Storage() (*StorageClient, error)

Storage returns the MinIO storage client

func (*Manager) Tasks

func (m *Manager) Tasks() (*TasksClient, error)

Tasks returns the Temporal tasks client

func (*Manager) Vector

func (m *Manager) Vector() (*VectorClient, error)

Vector returns the Qdrant vector client

type Message

type Message struct {
	Subject string
	Reply   string
	Data    []byte
	// contains filtered or unexported fields
}

Message represents a received message

func (*Message) Respond

func (m *Message) Respond(data []byte) error

Respond sends a response to a request

func (*Message) RespondJSON

func (m *Message) RespondJSON(v interface{}) error

RespondJSON sends a JSON response

type MessageMetadata

type MessageMetadata struct {
	Sequence   uint64
	Timestamp  time.Time
	NumPending uint64
}

MessageMetadata contains message metadata

type ObjectInfo

type ObjectInfo struct {
	Key          string
	Size         int64
	ContentType  string
	ETag         string
	LastModified time.Time
	Metadata     map[string]string
}

ObjectInfo contains information about an object

type PubAck

type PubAck struct {
	Stream   string
	Sequence uint64
	Domain   string
}

PubAck represents a publish acknowledgment

type PubSubClient

type PubSubClient struct {
	// contains filtered or unexported fields
}

PubSubClient wraps the NATS client

func NewPubSubClient

func NewPubSubClient(ctx context.Context, cfg *PubSubConfig) (*PubSubClient, error)

NewPubSubClient creates a new NATS pubsub client

func (*PubSubClient) Close

func (c *PubSubClient) Close() error

Close closes the NATS connection

func (*PubSubClient) Conn

func (c *PubSubClient) Conn() *nats.Conn

Conn returns the underlying NATS connection for advanced operations

func (*PubSubClient) ConsumeMessages

func (c *PubSubClient) ConsumeMessages(ctx context.Context, stream, consumer string, handler func(*StreamMessage) error) error

ConsumeMessages starts consuming messages from a consumer

func (*PubSubClient) CreateConsumer

func (c *PubSubClient) CreateConsumer(ctx context.Context, stream string, cfg *ConsumerConfig) (jetstream.Consumer, error)

CreateConsumer creates a durable consumer

func (*PubSubClient) EnsureStream

func (c *PubSubClient) EnsureStream(ctx context.Context, cfg *StreamConfig) error

EnsureStream creates a stream if it doesn't exist

func (*PubSubClient) Health

func (c *PubSubClient) Health(ctx context.Context) HealthStatus

Health checks the NATS connection

func (*PubSubClient) JetStream

func (c *PubSubClient) JetStream() jetstream.JetStream

JetStream returns the JetStream context for advanced operations

func (*PubSubClient) Publish

func (c *PubSubClient) Publish(ctx context.Context, subject string, data []byte) error

Publish publishes a message to a subject

func (*PubSubClient) PublishJSON

func (c *PubSubClient) PublishJSON(ctx context.Context, subject string, v interface{}) error

PublishJSON publishes a JSON message

func (*PubSubClient) PublishJSONToStream

func (c *PubSubClient) PublishJSONToStream(ctx context.Context, subject string, v interface{}) (*PubAck, error)

PublishJSONToStream publishes a JSON message to a stream

func (*PubSubClient) PublishToStream

func (c *PubSubClient) PublishToStream(ctx context.Context, subject string, data []byte) (*PubAck, error)

PublishToStream publishes a message to a JetStream stream

func (*PubSubClient) QueueSubscribe

func (c *PubSubClient) QueueSubscribe(subject, queue string, handler func(*Message)) (*Subscription, error)

QueueSubscribe subscribes to a subject with a queue group

func (*PubSubClient) Request

func (c *PubSubClient) Request(ctx context.Context, subject string, data []byte, timeout time.Duration) ([]byte, error)

Request publishes a request and waits for a response

func (*PubSubClient) RequestJSON

func (c *PubSubClient) RequestJSON(ctx context.Context, subject string, req interface{}, resp interface{}, timeout time.Duration) error

RequestJSON publishes a JSON request and unmarshals the response

func (*PubSubClient) Subscribe

func (c *PubSubClient) Subscribe(subject string, handler func(*Message)) (*Subscription, error)

Subscribe subscribes to a subject

type PubSubConfig

type PubSubConfig struct {
	// Enabled enables the pubsub service
	Enabled bool

	// URL is the NATS server URL
	URL string

	// Name is the client name
	Name string

	// Token for authentication (optional)
	Token string

	// User for authentication (optional)
	User string

	// Password for authentication (optional)
	Password string

	// TLS enables TLS connection
	TLS bool

	// ReconnectWait is the wait time between reconnects
	ReconnectWait time.Duration

	// MaxReconnects is the maximum reconnection attempts
	MaxReconnects int

	// EnableJetStream enables JetStream for persistence
	EnableJetStream bool
}

PubSubConfig holds NATS configuration

type RetentionPolicy

type RetentionPolicy int

RetentionPolicy defines message retention

const (
	RetentionLimits    RetentionPolicy = 0
	RetentionInterest  RetentionPolicy = 1
	RetentionWorkQueue RetentionPolicy = 2
)

type RetryPolicy

type RetryPolicy struct {
	InitialInterval    time.Duration
	BackoffCoefficient float64
	MaximumInterval    time.Duration
	MaximumAttempts    int32
}

RetryPolicy configures activity/workflow retry behavior

type RetryPolicyInternal

type RetryPolicyInternal struct {
	InitialInterval    time.Duration
	BackoffCoefficient float64
	MaximumInterval    time.Duration
	MaximumAttempts    int32
}

RetryPolicyInternal is internal retry policy that converts to temporal

func (*RetryPolicyInternal) ToTemporal

func (r *RetryPolicyInternal) ToTemporal() *temporal.RetryPolicy

ToTemporal converts to temporal retry policy

type SearchClient

type SearchClient struct {
	// contains filtered or unexported fields
}

SearchClient wraps the Meilisearch client

func NewSearchClient

func NewSearchClient(ctx context.Context, cfg *SearchConfig) (*SearchClient, error)

NewSearchClient creates a new Meilisearch client

func (*SearchClient) Client

func (c *SearchClient) Client() meilisearch.ServiceManager

Client returns the underlying Meilisearch client for advanced operations

func (*SearchClient) ConfigureIndex

func (c *SearchClient) ConfigureIndex(ctx context.Context, uid string, settings *IndexSettings) error

ConfigureIndex updates index settings

func (*SearchClient) Delete

func (c *SearchClient) Delete(ctx context.Context, indexUID string, documentIDs []string) error

Delete removes documents from an index

func (*SearchClient) DeleteAll

func (c *SearchClient) DeleteAll(ctx context.Context, indexUID string) error

DeleteAll removes all documents from an index

func (*SearchClient) EnsureIndex

func (c *SearchClient) EnsureIndex(ctx context.Context, uid string, primaryKey string) error

EnsureIndex creates an index if it doesn't exist

func (*SearchClient) GetDocument

func (c *SearchClient) GetDocument(ctx context.Context, indexUID, documentID string, dst interface{}) error

GetDocument retrieves a document by ID

func (*SearchClient) GetDocuments

func (c *SearchClient) GetDocuments(ctx context.Context, indexUID string, opts *GetDocumentsOptions) (*DocumentsResult, error)

GetDocuments retrieves multiple documents

func (*SearchClient) Health

func (c *SearchClient) Health(ctx context.Context) HealthStatus

Health checks the Meilisearch connection

func (*SearchClient) Index

func (c *SearchClient) Index(ctx context.Context, indexUID string, documents interface{}, primaryKey ...string) error

Index adds or updates documents in an index

func (*SearchClient) Search

func (c *SearchClient) Search(ctx context.Context, opts *SearchOptions) (*SearchResult, error)

Search performs a search query

func (*SearchClient) Stats

func (c *SearchClient) Stats(ctx context.Context, indexUID string) (*IndexStats, error)

Stats returns index statistics

type SearchConfig

type SearchConfig struct {
	// Enabled enables the search service
	Enabled bool

	// Host is the Meilisearch server URL
	Host string

	// APIKey for authentication
	APIKey string

	// DefaultIndex is the default search index
	DefaultIndex string

	// Timeout for requests
	Timeout time.Duration
}

SearchConfig holds Meilisearch configuration

type SearchOptions

type SearchOptions struct {
	Index                 string
	Query                 string
	Offset                int
	Limit                 int
	Filter                interface{}
	Sort                  []string
	Facets                []string
	AttributesToRetrieve  []string
	AttributesToCrop      []string
	CropLength            int
	AttributesToHighlight []string
	HighlightPreTag       string
	HighlightPostTag      string
	ShowMatchesPosition   bool
}

SearchOptions configures a search query

type SearchResult

type SearchResult struct {
	Hits              []interface{}
	NbHits            int64
	Offset            int
	Limit             int
	ProcessingTimeMs  int64
	Query             string
	FacetDistribution map[string]interface{}
}

SearchResult contains search results

type StartWorkflowOptions

type StartWorkflowOptions struct {
	ID               string
	TaskQueue        string
	ExecutionTimeout time.Duration
	RunTimeout       time.Duration
	TaskTimeout      time.Duration
	CronSchedule     string
	SearchAttributes map[string]interface{}
	Memo             map[string]interface{}
	RetryPolicy      *RetryPolicy
}

StartWorkflowOptions configures workflow execution

type StorageClient

type StorageClient struct {
	// contains filtered or unexported fields
}

StorageClient wraps the MinIO client

func NewStorageClient

func NewStorageClient(ctx context.Context, cfg *StorageConfig) (*StorageClient, error)

NewStorageClient creates a new MinIO storage client

func (*StorageClient) Client

func (c *StorageClient) Client() *minio.Client

Client returns the underlying MinIO client for advanced operations

func (*StorageClient) Copy

func (c *StorageClient) Copy(ctx context.Context, srcBucket, srcKey, dstBucket, dstKey string) error

Copy copies an object within storage

func (*StorageClient) Delete

func (c *StorageClient) Delete(ctx context.Context, bucket, key string) error

Delete removes a file from storage

func (*StorageClient) DeleteMany

func (c *StorageClient) DeleteMany(ctx context.Context, bucket string, keys []string) error

DeleteMany removes multiple files from storage

func (*StorageClient) Download

func (c *StorageClient) Download(ctx context.Context, bucket, key string) (io.ReadCloser, error)

Download downloads a file from storage

func (*StorageClient) DownloadBytes

func (c *StorageClient) DownloadBytes(ctx context.Context, bucket, key string) ([]byte, error)

DownloadBytes downloads a file as bytes

func (*StorageClient) EnsureBucket

func (c *StorageClient) EnsureBucket(ctx context.Context, bucket string) error

EnsureBucket creates a bucket if it doesn't exist

func (*StorageClient) Exists

func (c *StorageClient) Exists(ctx context.Context, bucket, key string) (bool, error)

Exists checks if a file exists

func (*StorageClient) Health

func (c *StorageClient) Health(ctx context.Context) HealthStatus

Health checks the MinIO connection

func (*StorageClient) List

func (c *StorageClient) List(ctx context.Context, bucket, prefix string, maxKeys int) ([]*ObjectInfo, error)

List lists objects in a bucket

func (*StorageClient) PresignedGetURL

func (c *StorageClient) PresignedGetURL(ctx context.Context, bucket, key string, expiry time.Duration) (string, error)

PresignedGetURL generates a presigned URL for downloading

func (*StorageClient) PresignedPutURL

func (c *StorageClient) PresignedPutURL(ctx context.Context, bucket, key string, expiry time.Duration) (string, error)

PresignedPutURL generates a presigned URL for uploading

func (*StorageClient) PublicURL

func (c *StorageClient) PublicURL(bucket, key string) string

PublicURL returns the public URL for an object

func (*StorageClient) Stat

func (c *StorageClient) Stat(ctx context.Context, bucket, key string) (*ObjectInfo, error)

Stat returns object info

func (*StorageClient) Upload

func (c *StorageClient) Upload(ctx context.Context, opts *UploadOptions) (*UploadResult, error)

Upload uploads a file to storage

func (*StorageClient) UploadBytes

func (c *StorageClient) UploadBytes(ctx context.Context, bucket, key string, data []byte, contentType string) (*UploadResult, error)

UploadBytes uploads bytes to storage

type StorageConfig

type StorageConfig struct {
	// Enabled enables the storage service
	Enabled bool

	// Endpoint is the MinIO server endpoint (host:port)
	Endpoint string

	// AccessKey is the access key ID
	AccessKey string

	// SecretKey is the secret access key
	SecretKey string

	// UseSSL enables SSL connection
	UseSSL bool

	// Bucket is the default bucket name
	Bucket string

	// Region is the bucket region
	Region string

	// PublicBaseURL is the public URL base for assets
	PublicBaseURL string
}

StorageConfig holds MinIO configuration

type StorageType

type StorageType int

StorageType defines storage backend

const (
	StorageFile   StorageType = 0
	StorageMemory StorageType = 1
)

type StreamConfig

type StreamConfig struct {
	Name        string
	Description string
	Subjects    []string
	Retention   RetentionPolicy
	MaxAge      time.Duration
	MaxBytes    int64
	MaxMsgs     int64
	Storage     StorageType
	Replicas    int
}

StreamConfig configures a JetStream stream

type StreamMessage

type StreamMessage struct {
	Subject  string
	Data     []byte
	Headers  map[string][]string
	Metadata *MessageMetadata
	// contains filtered or unexported fields
}

StreamMessage represents a message from a stream

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription wraps a NATS subscription

func (*Subscription) Drain

func (s *Subscription) Drain() error

Drain unsubscribes and drains pending messages

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe() error

Unsubscribe removes the subscription

type TasksClient

type TasksClient struct {
	// contains filtered or unexported fields
}

TasksClient wraps the Temporal client

func NewTasksClient

func NewTasksClient(ctx context.Context, cfg *TasksConfig) (*TasksClient, error)

NewTasksClient creates a new Temporal tasks client

func (*TasksClient) CancelWorkflow

func (c *TasksClient) CancelWorkflow(ctx context.Context, workflowID, runID string) error

CancelWorkflow cancels a workflow execution

func (*TasksClient) Client

func (c *TasksClient) Client() client.Client

Client returns the underlying Temporal client for advanced operations

func (*TasksClient) Close

func (c *TasksClient) Close()

Close closes the Temporal client and workers

func (*TasksClient) GetWorkflow

func (c *TasksClient) GetWorkflow(ctx context.Context, workflowID, runID string) *WorkflowRun

GetWorkflow retrieves a workflow run

func (*TasksClient) Health

func (c *TasksClient) Health(ctx context.Context) HealthStatus

Health checks the Temporal connection

func (*TasksClient) NewWorker

func (c *TasksClient) NewWorker(taskQueue string, opts *WorkerOptions) (worker.Worker, error)

NewWorker creates a new worker for a task queue

func (*TasksClient) QueryWorkflow

func (c *TasksClient) QueryWorkflow(ctx context.Context, workflowID, runID, queryType string, args ...interface{}) (interface{}, error)

QueryWorkflow queries a running workflow

func (*TasksClient) SignalWithStartWorkflow

func (c *TasksClient) SignalWithStartWorkflow(ctx context.Context, workflowID string, signalName string, signalArg interface{}, opts *StartWorkflowOptions, workflowFunc interface{}, workflowArgs ...interface{}) (*WorkflowRun, error)

SignalWithStartWorkflow signals or starts a workflow

func (*TasksClient) SignalWorkflow

func (c *TasksClient) SignalWorkflow(ctx context.Context, workflowID, runID, signalName string, arg interface{}) error

SignalWorkflow sends a signal to a workflow

func (*TasksClient) StartAllWorkers

func (c *TasksClient) StartAllWorkers() error

StartAllWorkers starts all workers in background

func (*TasksClient) StartWorker

func (c *TasksClient) StartWorker(taskQueue string) error

StartWorker starts a worker (blocking)

func (*TasksClient) StartWorkflow

func (c *TasksClient) StartWorkflow(ctx context.Context, opts *StartWorkflowOptions, workflowFunc interface{}, args ...interface{}) (*WorkflowRun, error)

StartWorkflow starts a new workflow execution

func (*TasksClient) TerminateWorkflow

func (c *TasksClient) TerminateWorkflow(ctx context.Context, workflowID, runID, reason string) error

TerminateWorkflow terminates a workflow execution

type TasksConfig

type TasksConfig struct {
	// Enabled enables the tasks service
	Enabled bool

	// HostPort is the Temporal server address
	HostPort string

	// Namespace is the Temporal namespace
	Namespace string

	// Identity is the worker identity
	Identity string

	// TaskQueue is the default task queue
	TaskQueue string

	// TLS configuration
	TLS *TasksTLSConfig
}

TasksConfig holds Temporal configuration

type TasksTLSConfig

type TasksTLSConfig struct {
	// CertPath is the path to the client certificate
	CertPath string

	// KeyPath is the path to the client key
	KeyPath string

	// CACertPath is the path to the CA certificate
	CACertPath string

	// ServerName for TLS verification
	ServerName string
}

TasksTLSConfig holds TLS configuration for Temporal

type UploadOptions

type UploadOptions struct {
	Bucket      string
	Key         string
	Reader      io.Reader
	Size        int64
	ContentType string
	Metadata    map[string]string
}

UploadOptions configures an upload operation

type UploadResult

type UploadResult struct {
	Bucket   string
	Key      string
	ETag     string
	Size     int64
	Location string
}

UploadResult contains the result of an upload

type VectorClient

type VectorClient struct {
	// contains filtered or unexported fields
}

VectorClient wraps the Qdrant REST client

func NewVectorClient

func NewVectorClient(_ context.Context, cfg *VectorConfig) (*VectorClient, error)

NewVectorClient creates a new Qdrant vector client over HTTP

func (*VectorClient) Close

func (c *VectorClient) Close() error

Close is a no-op for the HTTP client (satisfies the interface).

func (*VectorClient) Delete

func (c *VectorClient) Delete(ctx context.Context, collection string, ids []string) error

Delete removes vectors by ID

func (*VectorClient) EnsureCollection

func (c *VectorClient) EnsureCollection(ctx context.Context, name string, dimensions uint64) error

EnsureCollection creates a collection if it doesn't exist

func (*VectorClient) Health

func (c *VectorClient) Health(ctx context.Context) HealthStatus

Health checks the Qdrant connection

func (*VectorClient) Search

Search performs vector similarity search

func (*VectorClient) Upsert

func (c *VectorClient) Upsert(ctx context.Context, collection string, points []*VectorPoint) error

Upsert inserts or updates vectors

type VectorConfig

type VectorConfig struct {
	// Enabled enables the vector service
	Enabled bool

	// Host is the Qdrant server host
	Host string

	// Port is the Qdrant HTTP port (default: 6333)
	Port int

	// APIKey for authentication (optional)
	APIKey string

	// UseTLS enables TLS connection
	UseTLS bool

	// DefaultCollection is the default collection name
	DefaultCollection string

	// DefaultDimensions is the default vector dimensions (1536 for OpenAI)
	DefaultDimensions uint64
}

VectorConfig holds Qdrant configuration

type VectorPoint

type VectorPoint struct {
	ID      string
	Vector  []float32
	Payload map[string]interface{}
}

VectorPoint represents a point to insert

type VectorSearchOpts

type VectorSearchOpts struct {
	Collection string
	Vector     []float32
	Limit      int
	MinScore   float32
	Filter     map[string]interface{}
}

VectorSearchOpts configures vector search

type VectorSearchResult

type VectorSearchResult struct {
	ID      string
	Score   float32
	Payload map[string]interface{}
}

VectorSearchResult represents a search result

type WorkerOptions

type WorkerOptions struct {
	MaxConcurrentActivities      int
	MaxConcurrentWorkflows       int
	MaxConcurrentLocalActivities int
	ActivitiesPerSecond          float64
	LocalActivitiesPerSecond     float64
	TaskQueueActivitiesPerSecond float64
	EnableSessionWorker          bool
}

WorkerOptions configures a worker

type WorkflowRun

type WorkflowRun struct {
	ID    string
	RunID string
	// contains filtered or unexported fields
}

WorkflowRun represents a workflow execution

func (*WorkflowRun) Get

func (r *WorkflowRun) Get(ctx context.Context, result interface{}) error

Get waits for the workflow to complete and returns the result

func (*WorkflowRun) GetWithOptions

func (r *WorkflowRun) GetWithOptions(ctx context.Context, result interface{}, opts client.WorkflowRunGetOptions) error

GetWithOptions waits with options

type ZAPConfig added in v1.37.0

type ZAPConfig struct {
	Enabled bool
	NodeID  string
	Port    int
}

ZAPConfig configures the ZAP transport node.

type ZAPNode added in v1.37.0

type ZAPNode struct {
	// contains filtered or unexported fields
}

ZAPNode wraps a zap.Node and delegates to VectorClient.

func NewZAPNode added in v1.37.0

func NewZAPNode(cfg *ZAPConfig, vector *VectorClient, logger *slog.Logger) (*ZAPNode, error)

NewZAPNode creates and starts a ZAP node with vector operation handlers. The node uses NoDiscovery (K8s services connect directly).

func (*ZAPNode) Node added in v1.37.0

func (z *ZAPNode) Node() *zap.Node

Node returns the underlying zap.Node (for health checks, peer info, etc.)

func (*ZAPNode) Stop added in v1.37.0

func (z *ZAPNode) Stop()

Stop stops the ZAP node.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL