worker

package
v0.11.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2025 | License: Apache-2.0 | Imports: 4 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Heartbeat

// Heartbeat defines the interface for worker heartbeat management.
//
// Every method except Logger takes a context.Context as its first argument,
// and every per-worker method identifies the worker by its xid.ID.
type Heartbeat interface {
	// Start begins sending regular heartbeats for workerID, configured by
	// options (see HeartbeatOptions).
	Start(ctx context.Context, workerID xid.ID, options HeartbeatOptions) error

	// Stop terminates heartbeat sending for workerID.
	Stop(ctx context.Context, workerID xid.ID) error

	// SendHeartbeat sends a single heartbeat carrying status immediately.
	// NOTE(review): whether this is independent of a running Start loop is
	// not specified here — confirm with the implementation.
	SendHeartbeat(ctx context.Context, workerID xid.ID, status HeartbeatStatus) error

	// GetLastHeartbeat retrieves the last heartbeat information for a worker:
	// the reported status and a timestamp (presumably when that heartbeat was
	// sent or received — TODO confirm which).
	GetLastHeartbeat(ctx context.Context, workerID xid.ID) (*HeartbeatStatus, time.Time, error)

	// IsAlive checks if a worker is considered alive based on heartbeats
	// (presumably judged against HeartbeatOptions.Timeout — TODO confirm).
	IsAlive(ctx context.Context, workerID xid.ID) (bool, error)

	// GetDeadWorkers retrieves a list of workers that have failed to send heartbeats.
	// It is not scoped to a single worker; it returns the IDs of all such workers.
	GetDeadWorkers(ctx context.Context) ([]xid.ID, error)

	// Logger returns a structured logger for heartbeat operations.
	Logger() core.Logger
}

Heartbeat defines the interface for worker heartbeat management.

type HeartbeatOptions

// HeartbeatOptions configures heartbeat behavior.
//
// The time.Duration fields carry no custom JSON handling here, so with
// encoding/json they are encoded as integer nanosecond counts.
type HeartbeatOptions struct {
	// Interval is how often to send heartbeats.
	Interval time.Duration `json:"interval"`

	// Timeout is how long before a worker is considered dead if no heartbeat is received.
	// NOTE(review): presumably this should exceed Interval; nothing in the
	// type enforces that.
	Timeout time.Duration `json:"timeout"`

	// IncludeDetailedMetrics indicates whether to include detailed system metrics.
	IncludeDetailedMetrics bool `json:"includeDetailedMetrics"`

	// RetryAttempts is the number of attempts to make when a heartbeat fails.
	// Whether the initial send counts as an attempt is not specified here —
	// TODO confirm with the implementation.
	RetryAttempts int `json:"retryAttempts"`

	// RetryBackoff is the delay between retry attempts.
	RetryBackoff time.Duration `json:"retryBackoff"`
}

HeartbeatOptions configures heartbeat behavior.

type HeartbeatStatus

// HeartbeatStatus represents detailed status information for a worker heartbeat.
//
// LastError, TaskIDs and AdditionalMetrics are tagged omitempty and are
// left out of the JSON encoding when empty; every other field is always
// emitted.
type HeartbeatStatus struct {
	// Status is the current operational status of the worker.
	Status WorkerStatus `json:"status"`

	// CurrentTaskCount is the number of tasks currently being processed.
	// NOTE(review): presumably equal to len(TaskIDs) when TaskIDs is
	// populated; the type does not enforce this.
	CurrentTaskCount int `json:"currentTaskCount"`

	// CPUUsage is the current CPU usage percentage (0-100).
	CPUUsage float64 `json:"cpuUsage"`

	// MemoryUsage is the current memory usage percentage (0-100).
	MemoryUsage float64 `json:"memoryUsage"`

	// DiskUsage is the current disk usage percentage (0-100).
	DiskUsage float64 `json:"diskUsage"`

	// NetworkLatency is the current network latency in milliseconds.
	NetworkLatency int64 `json:"networkLatency"`

	// LastError contains details of the last error encountered, if any.
	LastError string `json:"lastError,omitempty"`

	// TaskIDs contains the IDs of tasks currently being processed.
	TaskIDs []string `json:"taskIds,omitempty"`

	// AdditionalMetrics contains any other metrics to report.
	AdditionalMetrics map[string]interface{} `json:"additionalMetrics,omitempty"`
}

HeartbeatStatus represents detailed status information for a worker heartbeat.

type Registration

// Registration defines the interface for worker registration.
//
// Register takes the full RegistrationInfo; the remaining per-worker methods
// address an already-known worker by its xid.ID.
type Registration interface {
	// Register registers the worker described by info with the workflow system.
	Register(ctx context.Context, info RegistrationInfo) error

	// Unregister removes the worker from the workflow system.
	Unregister(ctx context.Context, workerID xid.ID) error

	// UpdateStatus updates the worker's operational status.
	UpdateStatus(ctx context.Context, workerID xid.ID, status WorkerStatus) error

	// UpdateCapabilities updates the worker's capabilities.
	// NOTE(review): presumably replaces the existing list rather than
	// merging into it — confirm with the implementation.
	UpdateCapabilities(ctx context.Context, workerID xid.ID, capabilities []string) error

	// UpdateMetadata updates the worker's metadata.
	UpdateMetadata(ctx context.Context, workerID xid.ID, metadata WorkerMetadata) error

	// GetWorkerInfo retrieves the current worker information.
	GetWorkerInfo(ctx context.Context, workerID xid.ID) (*RegistrationInfo, error)

	// Logger returns a structured logger for registration operations.
	Logger() core.Logger
}

Registration defines the interface for worker registration.

type RegistrationInfo

// RegistrationInfo represents the information required to register a worker.
//
// Only Specializations is tagged omitempty. Capabilities is not, so a nil
// Capabilities slice is encoded as JSON null by encoding/json.
type RegistrationInfo struct {
	// ID is the unique identifier for this worker.
	ID xid.ID `json:"id"`

	// DisplayName is a human-readable name for this worker.
	DisplayName string `json:"displayName"`

	// Status is the current operational status of the worker.
	Status WorkerStatus `json:"status"`

	// Capabilities defines what this worker can do.
	Capabilities []string `json:"capabilities"`

	// Specializations defines what this worker specializes in.
	Specializations []string `json:"specializations,omitempty"`

	// MaxConcurrentTasks is the maximum number of tasks this worker can process simultaneously.
	MaxConcurrentTasks int `json:"maxConcurrentTasks"`

	// CurrentTasks is the number of tasks currently being processed.
	CurrentTasks int `json:"currentTasks"`

	// Metadata contains additional worker information.
	Metadata WorkerMetadata `json:"metadata"`
}

RegistrationInfo represents the information required to register a worker.

type Specialization

// Specialization defines the interface for worker specialization.
type Specialization interface {
	// RegisterCapabilities registers a worker's specialization capabilities.
	RegisterCapabilities(ctx context.Context, workerID xid.ID, capabilities []SpecializationCapability) error

	// UpdateCapabilities updates a worker's specialization capabilities.
	// It has the same signature as RegisterCapabilities; how the two differ
	// for an already-registered worker is not specified here — TODO confirm.
	UpdateCapabilities(ctx context.Context, workerID xid.ID, capabilities []SpecializationCapability) error

	// GetCapabilities retrieves a worker's specialization capabilities.
	GetCapabilities(ctx context.Context, workerID xid.ID) ([]SpecializationCapability, error)

	// FindMatchingWorkers finds workers that match specific requirements,
	// returning one SpecializationMatch per matching worker.
	FindMatchingWorkers(ctx context.Context, requirements []SpecializationRequirement) ([]SpecializationMatch, error)

	// CalculateMatchScore determines how well a worker matches requirements.
	// Unlike the other operations it takes no context and no worker ID, only
	// the capability and requirement lists.
	// NOTE(review): whether the returned match's WorkerID is populated is
	// therefore up to the implementation — confirm.
	CalculateMatchScore(capabilities []SpecializationCapability, requirements []SpecializationRequirement) (*SpecializationMatch, error)

	// Logger returns a structured logger for specialization operations.
	Logger() core.Logger
}

Specialization defines the interface for worker specialization.

type SpecializationCapability

// SpecializationCapability represents a worker's capability to handle specific tasks.
//
// Type and Value have the same types and JSON keys as the fields of the
// same name on SpecializationRequirement.
type SpecializationCapability struct {
	// Type is the category of specialization offered.
	Type SpecializationType `json:"type"`

	// Value is the specific capability offered (e.g., integration name).
	Value string `json:"value"`

	// Confidence indicates the worker's proficiency in this capability (0.0-1.0).
	// The range is documented only; the type does not enforce it.
	Confidence float64 `json:"confidence"`

	// MaxConcurrent is the maximum number of concurrent tasks of this type.
	MaxConcurrent int `json:"maxConcurrent"`

	// Description provides details about this capability.
	// It is omitted from JSON when empty.
	Description string `json:"description,omitempty"`
}

SpecializationCapability represents a worker's capability to handle specific tasks.

type SpecializationMatch

// SpecializationMatch represents how well a worker matches task requirements.
type SpecializationMatch struct {
	// WorkerID is the worker that matches the requirements.
	WorkerID xid.ID `json:"workerId"`

	// MatchScore indicates how well the worker matches (higher is better).
	// No scale or upper bound is specified here.
	MatchScore float64 `json:"matchScore"`

	// MissingRequirements contains any required specializations the worker lacks.
	// It is omitted from JSON when empty.
	MissingRequirements []SpecializationRequirement `json:"missingRequirements,omitempty"`

	// MatchDetails provides details about specific requirement matches.
	// NOTE(review): the key format (what string identifies a requirement) is
	// not specified here — confirm with the implementation.
	MatchDetails map[string]float64 `json:"matchDetails,omitempty"`
}

SpecializationMatch represents how well a worker matches task requirements.

type SpecializationRequirement

// SpecializationRequirement defines requirements for a task to be processed.
//
// Type and Value have the same types and JSON keys as the fields of the
// same name on SpecializationCapability.
type SpecializationRequirement struct {
	// Type is the category of specialization required.
	Type SpecializationType `json:"type"`

	// Value is the specific value required (e.g., integration name).
	Value string `json:"value"`

	// Priority indicates the importance of this requirement (higher = more important).
	Priority int `json:"priority"`

	// IsRequired indicates if this specialization is mandatory.
	IsRequired bool `json:"isRequired"`

	// Description provides additional context about this requirement.
	// It is omitted from JSON when empty.
	Description string `json:"description,omitempty"`
}

SpecializationRequirement defines requirements for a task to be processed.

type SpecializationType

// SpecializationType represents a category of worker specialization.
// Its underlying type is string.
type SpecializationType string

SpecializationType represents a category of worker specialization.

// Declared SpecializationType values. Each constant's value is a lowercase
// string literal.
const (
	// SpecializationTypeIntegration indicates specialization in specific integrations.
	SpecializationTypeIntegration SpecializationType = "integration"

	// SpecializationTypeProject indicates specialization in specific projects.
	SpecializationTypeProject SpecializationType = "project"

	// SpecializationTypeWorkflow indicates specialization in specific workflows.
	SpecializationTypeWorkflow SpecializationType = "workflow"

	// SpecializationTypeResource indicates specialization in resource-intensive operations.
	SpecializationTypeResource SpecializationType = "resource"

	// SpecializationTypeRegion indicates specialization in a geographic region.
	SpecializationTypeRegion SpecializationType = "region"
)

func (SpecializationType) String added in v0.11.3

func (s SpecializationType) String() string

func (SpecializationType) Values added in v0.11.3

func (s SpecializationType) Values() []string

type WorkerMetadata

// WorkerMetadata contains additional metadata about a worker.
//
// Hostname, Version and StartTime are always emitted in JSON; the remaining
// fields are tagged omitempty and are left out when empty.
type WorkerMetadata struct {
	// Hostname is the host where the worker is running.
	Hostname string `json:"hostname"`

	// IPAddress is the IP address of the worker.
	// It is a plain string; no address format is enforced by the type.
	IPAddress string `json:"ipAddress,omitempty"`

	// Version is the worker software version.
	Version string `json:"version"`

	// StartTime is when the worker started.
	StartTime time.Time `json:"startTime"`

	// OperatingSystem contains information about the OS.
	OperatingSystem string `json:"operatingSystem,omitempty"`

	// Tags contains arbitrary tags for worker classification.
	Tags []string `json:"tags,omitempty"`

	// AdditionalInfo contains any other worker information.
	AdditionalInfo map[string]interface{} `json:"additionalInfo,omitempty"`
}

WorkerMetadata contains additional metadata about a worker.

type WorkerStatus

// WorkerStatus represents the current operational status of a worker.
// Its underlying type is string.
type WorkerStatus string

WorkerStatus represents the current operational status of a worker.

// Declared WorkerStatus values. Each constant's value is a lowercase string
// literal.
//
// NOTE(review): Idle and Online both describe an available worker, and
// Stopping and Draining both describe a worker winding down. The precise
// distinctions and allowed transitions are not specified here — confirm with
// the implementation.
const (
	// WorkerStatusIdle indicates the worker is available but not processing any tasks.
	WorkerStatusIdle WorkerStatus = "idle"

	// WorkerStatusBusy indicates the worker is currently processing tasks.
	WorkerStatusBusy WorkerStatus = "busy"

	// WorkerStatusStarting indicates the worker is in the process of starting up.
	WorkerStatusStarting WorkerStatus = "starting"

	// WorkerStatusStopping indicates the worker is in the process of shutting down.
	WorkerStatusStopping WorkerStatus = "stopping"

	// WorkerStatusError indicates the worker has encountered an error.
	WorkerStatusError WorkerStatus = "error"

	// WorkerStatusMaintenance indicates the worker is in maintenance mode.
	WorkerStatusMaintenance WorkerStatus = "maintenance"

	// WorkerStatusOffline indicates the worker is currently not operational or unreachable.
	WorkerStatusOffline WorkerStatus = "offline"

	// WorkerStatusOnline indicates the worker is available and actively connected.
	WorkerStatusOnline WorkerStatus = "online"

	// WorkerStatusDraining indicates the worker is completing current tasks and not accepting new ones.
	WorkerStatusDraining WorkerStatus = "draining"
)

func (WorkerStatus) SQLTypeName added in v0.11.3

func (WorkerStatus) SQLTypeName() string

func (WorkerStatus) String added in v0.11.3

func (s WorkerStatus) String() string

func (WorkerStatus) Values added in v0.11.3

func (s WorkerStatus) Values() []string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL