coordinator

package
v0.0.71 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 26, 2025 License: Apache-2.0 Imports: 9 Imported by: 0

Documentation

Overview

Package coordinator provides WebSocket-based coordination between services and when-v3. It handles phase management, health tracking, and real-time communication for distributed workflow execution.

Index

Constants

This section is empty.

Variables

ValidTransitions defines which phase transitions are allowed.

Functions

func PayloadToStruct

func PayloadToStruct(payload map[string]interface{}, target interface{}) error

PayloadToStruct converts a message payload to a typed struct.

Types

type CancelPayload

type CancelPayload struct {
	WorkflowID string `json:"workflow_id"`
	Reason     string `json:"reason,omitempty"`
	Force      bool   `json:"force,omitempty"`
}

CancelPayload is the payload for cancel command.

type CheckpointPayload

type CheckpointPayload struct {
	WorkflowID   string                 `json:"workflow_id"`
	CheckpointID string                 `json:"checkpoint_id"`
	Reason       string                 `json:"reason"`
	State        map[string]interface{} `json:"state,omitempty"`
}

CheckpointPayload is the payload for checkpoint message.

type Config

type Config struct {
	// WhenURL is the WebSocket URL to connect to (e.g., "ws://localhost:8080/v1/coordination")
	WhenURL string

	// ServiceName is the name of this service (e.g., "containerservice")
	ServiceName string

	// ServiceID is a unique identifier for this service instance
	ServiceID string

	// InstanceID is a unique identifier for this specific instance (for multi-instance support)
	InstanceID string

	// Capabilities lists what this service can do
	Capabilities []string

	// Version is the service software version
	Version string

	// ProtocolVersion is the coordination protocol version (e.g., "1.0")
	// If empty, defaults to "1.0"
	ProtocolVersion string

	// SchemaVersion is the database schema version this service expects
	SchemaVersion int

	// Reconnect settings
	ReconnectInitialDelay  time.Duration
	ReconnectMaxDelay      time.Duration
	ReconnectBackoffFactor float64
	ReconnectMaxAttempts   int // 0 = infinite

	// PingInterval is how often to send pings
	PingInterval time.Duration

	// Logger for coordinator messages
	Logger *logrus.Entry
}

Config holds configuration for the Coordinator.

func DefaultConfig

func DefaultConfig() Config

DefaultConfig returns a Config with sensible defaults.

type Coordinator

type Coordinator struct {
	// contains filtered or unexported fields
}

Coordinator manages WebSocket communication with when-v3.

func New

func New(config Config) *Coordinator

New creates a new Coordinator.

func (*Coordinator) Close

func (c *Coordinator) Close() error

Close shuts down the coordinator.

func (*Coordinator) Connect

func (c *Coordinator) Connect() error

Connect establishes the WebSocket connection and starts processing.

func (*Coordinator) IsConnected

func (c *Coordinator) IsConnected() bool

IsConnected returns whether the WebSocket is connected.

func (*Coordinator) OnConnected

func (c *Coordinator) OnConnected(fn func())

OnConnected sets a callback for when connection is established.

func (*Coordinator) OnDisconnected

func (c *Coordinator) OnDisconnected(fn func(error))

OnDisconnected sets a callback for when connection is lost.

func (*Coordinator) OnMessage

func (c *Coordinator) OnMessage(msgType MessageType, handler MessageHandler)

OnMessage registers a custom handler for a message type.

func (*Coordinator) OnRegistered

func (c *Coordinator) OnRegistered(fn func(serviceID string))

OnRegistered sets a callback for when registration completes.

func (*Coordinator) Phases

func (c *Coordinator) Phases() *PhaseManager

Phases returns the phase manager for direct access.

func (*Coordinator) Send

func (c *Coordinator) Send(msg *WSMessage)

Send queues a message for sending.

func (*Coordinator) SendCheckpoint

func (c *Coordinator) SendCheckpoint(workflowID, checkpointID, reason string)

SendCheckpoint notifies when-v3 of a checkpoint.

func (*Coordinator) SendError

func (c *Coordinator) SendError(workflowID, actionID, errorMsg string, recoverable bool)

SendError notifies when-v3 of an error.

func (*Coordinator) SendProgress

func (c *Coordinator) SendProgress(workflowID, actionID string, percent float64, stage, message string)

SendProgress notifies when-v3 of progress.

func (*Coordinator) SendWorkflowCreated

func (c *Coordinator) SendWorkflowCreated(workflowID, parentWorkflowID, rootWorkflowID, actionID, actionType string)

SendWorkflowCreated notifies when-v3 of a new child workflow.

type ErrorPayload

type ErrorPayload struct {
	WorkflowID  string `json:"workflow_id"`
	Error       string `json:"error"`
	Recoverable bool   `json:"recoverable"`
	ActionID    string `json:"action_id,omitempty"`
}

ErrorPayload is the payload for error message.

type MessageHandler

type MessageHandler func(msg *WSMessage) error

MessageHandler is a function that handles incoming messages.

type MessageType

type MessageType string

MessageType defines the types of WebSocket messages exchanged between services and when-v3.

const (
	// Service → when-v3 messages
	MessageTypeRegister        MessageType = "register"
	MessageTypeWorkflowCreated MessageType = "workflow_created"
	MessageTypePhaseChanged    MessageType = "phase_changed"
	MessageTypeCheckpoint      MessageType = "checkpoint"
	MessageTypeError           MessageType = "error"
	MessageTypeStatusResponse  MessageType = "status_response"
	MessageTypePong            MessageType = "pong"
	MessageTypeProgress        MessageType = "progress"

	// when-v3 → Service messages
	MessageTypeRegistered MessageType = "registered"
	MessageTypePause      MessageType = "pause"
	MessageTypeResume     MessageType = "resume"
	MessageTypeCancel     MessageType = "cancel"
	MessageTypeStatus     MessageType = "status"
	MessageTypePing       MessageType = "ping"
)

type PausePayload

type PausePayload struct {
	WorkflowID string `json:"workflow_id"`
	Reason     string `json:"reason,omitempty"`
}

PausePayload is the payload for pause command.

type Phase

type Phase string

Phase represents the current phase of a workflow execution.

const (
	PhasePending    Phase = "pending"
	PhasePreFlight  Phase = "pre-flight"
	PhasePlanning   Phase = "planning"
	PhaseExecution  Phase = "execution"
	PhasePausing    Phase = "pausing"
	PhasePaused     Phase = "paused"
	PhaseResuming   Phase = "resuming"
	PhaseCancelling Phase = "cancelling"
	PhaseCancelled  Phase = "cancelled"
	PhaseCompleting Phase = "completing"
	PhaseCompleted  Phase = "completed"
	PhaseFailed     Phase = "failed"
)

func (Phase) CanTransitionTo

func (p Phase) CanTransitionTo(target Phase) bool

CanTransitionTo checks if a transition to the target phase is valid.

func (Phase) IsActive

func (p Phase) IsActive() bool

IsActive returns true if the phase indicates active processing.

func (Phase) IsPausable

func (p Phase) IsPausable() bool

IsPausable returns true if the workflow can be paused from this phase.

func (Phase) IsResumable

func (p Phase) IsResumable() bool

IsResumable returns true if the workflow can be resumed from this phase.

func (Phase) IsTerminal

func (p Phase) IsTerminal() bool

IsTerminal returns true if the phase is a terminal state.

type PhaseChangedPayload

type PhaseChangedPayload struct {
	WorkflowID   string `json:"workflow_id"`
	FromPhase    Phase  `json:"from"`
	ToPhase      Phase  `json:"to"`
	CheckpointID string `json:"checkpoint_id,omitempty"`
	Reason       string `json:"reason,omitempty"`
}

PhaseChangedPayload is the payload for phase_changed message.

type PhaseManager

type PhaseManager struct {
	// contains filtered or unexported fields
}

PhaseManager manages phase states for multiple workflows.

func NewPhaseManager

func NewPhaseManager() *PhaseManager

NewPhaseManager creates a new PhaseManager.

func (*PhaseManager) Cancel

func (pm *PhaseManager) Cancel(workflowID, reason string) error

Cancel initiates cancellation of a workflow.

func (*PhaseManager) Complete

func (pm *PhaseManager) Complete(workflowID string) error

Complete marks a workflow as completed.

func (*PhaseManager) CompleteCancellation

func (pm *PhaseManager) CompleteCancellation(workflowID string) error

CompleteCancellation finishes the cancellation.

func (*PhaseManager) CompletePause

func (pm *PhaseManager) CompletePause(workflowID, checkpointID string) error

CompletePause finishes the pause transition.

func (*PhaseManager) CompleteResume

func (pm *PhaseManager) CompleteResume(workflowID string) error

CompleteResume finishes the resume transition.

func (*PhaseManager) CreateCheckpoint

func (pm *PhaseManager) CreateCheckpoint(workflowID, checkpointID, reason string, state map[string]interface{}) error

CreateCheckpoint creates a checkpoint for a workflow.

func (*PhaseManager) Fail

func (pm *PhaseManager) Fail(workflowID, reason string) error

Fail marks a workflow as failed.

func (*PhaseManager) GetActiveWorkflows

func (pm *PhaseManager) GetActiveWorkflows() []*PhaseState

GetActiveWorkflows returns all workflows that are not in terminal states.

func (*PhaseManager) GetAllWorkflows

func (pm *PhaseManager) GetAllWorkflows() []*PhaseState

GetAllWorkflows returns all tracked workflows.

func (*PhaseManager) GetPhase

func (pm *PhaseManager) GetPhase(workflowID string) (Phase, bool)

GetPhase returns just the current phase of a workflow.

func (*PhaseManager) GetState

func (pm *PhaseManager) GetState(workflowID string) (*PhaseState, bool)

GetState returns the current state of a workflow.

func (*PhaseManager) OnCheckpoint

func (pm *PhaseManager) OnCheckpoint(fn func(workflowID, checkpointID string, state map[string]interface{}))

OnCheckpoint sets a callback for checkpoint creation.

func (*PhaseManager) OnPhaseChanged

func (pm *PhaseManager) OnPhaseChanged(fn func(state *PhaseState))

OnPhaseChanged sets a callback for phase changes.

func (*PhaseManager) Pause

func (pm *PhaseManager) Pause(workflowID, reason string) error

Pause initiates pausing of a workflow.

func (*PhaseManager) RegisterWorkflow

func (pm *PhaseManager) RegisterWorkflow(workflowID, parentWorkflowID, rootWorkflowID string) *PhaseState

RegisterWorkflow registers a new workflow with initial pending state.

func (*PhaseManager) RemoveWorkflow

func (pm *PhaseManager) RemoveWorkflow(workflowID string)

RemoveWorkflow removes a workflow from tracking.

func (*PhaseManager) Resume

func (pm *PhaseManager) Resume(workflowID, fromCheckpoint string) error

Resume initiates resuming of a workflow.

func (*PhaseManager) SetProgress

func (pm *PhaseManager) SetProgress(workflowID string, progress float64, currentAction string) error

SetProgress updates the progress of a workflow.

func (*PhaseManager) TransitionTo

func (pm *PhaseManager) TransitionTo(workflowID string, newPhase Phase, reason string) error

TransitionTo attempts to transition a workflow to a new phase.

type PhaseState

type PhaseState struct {
	WorkflowID       string
	Phase            Phase
	PreviousPhase    Phase
	ChangedAt        time.Time
	Reason           string
	CheckpointID     string
	Progress         float64
	CurrentAction    string
	ParentWorkflowID string
	RootWorkflowID   string
}

PhaseState represents the state of a single workflow's phase.

type ProgressPayload

type ProgressPayload struct {
	WorkflowID  string  `json:"workflow_id"`
	ActionID    string  `json:"action_id,omitempty"`
	Percent     float64 `json:"percent"`
	Stage       string  `json:"stage,omitempty"`
	Message     string  `json:"message,omitempty"`
	CurrentItem int     `json:"current_item,omitempty"`
	TotalItems  int     `json:"total_items,omitempty"`
}

ProgressPayload is the payload for progress message.

type RegisterPayload

type RegisterPayload struct {
	ServiceName     string   `json:"service_name"`
	ServiceID       string   `json:"service_id,omitempty"`
	InstanceID      string   `json:"instance_id,omitempty"`
	Capabilities    []string `json:"capabilities"`
	Version         string   `json:"version,omitempty"`          // Service software version
	ProtocolVersion string   `json:"protocol_version,omitempty"` // Coordination protocol version (e.g., "1.0")
	SchemaVersion   int      `json:"schema_version,omitempty"`   // Database schema version service expects
}

RegisterPayload is the payload for a register message.

type RegisteredPayload

type RegisteredPayload struct {
	ServiceID          string `json:"service_id"`
	InstanceID         string `json:"instance_id,omitempty"`
	Message            string `json:"message,omitempty"`
	ProtocolVersion    string `json:"protocol_version,omitempty"`     // Negotiated protocol version
	HubProtocolVersion string `json:"hub_protocol_version,omitempty"` // Hub's protocol version
}

RegisteredPayload is the payload for a registered response.

type ResumePayload

type ResumePayload struct {
	WorkflowID     string `json:"workflow_id"`
	FromCheckpoint string `json:"from_checkpoint,omitempty"`
}

ResumePayload is the payload for resume command.

type StatusPayload

type StatusPayload struct {
	WorkflowID string `json:"workflow_id"`
}

StatusPayload is the payload for status request.

type StatusResponsePayload

type StatusResponsePayload struct {
	WorkflowID    string  `json:"workflow_id"`
	Phase         Phase   `json:"phase"`
	Progress      float64 `json:"progress"`
	CurrentAction string  `json:"current_action,omitempty"`
	Message       string  `json:"message,omitempty"`
}

StatusResponsePayload is the payload for status_response message.

type WSMessage

type WSMessage struct {
	ID         string                 `json:"id"`                    // For request/response correlation
	Type       MessageType            `json:"type"`                  // Message type
	WorkflowID string                 `json:"workflow_id,omitempty"` // Associated workflow
	Timestamp  time.Time              `json:"timestamp"`             // Message timestamp
	Payload    map[string]interface{} `json:"payload,omitempty"`     // Message-specific data
}

WSMessage is the base message structure for all WebSocket communication.

func NewMessage

func NewMessage(msgType MessageType) *WSMessage

NewMessage creates a new WSMessage with the given type.

func NewMessageWithWorkflow

func NewMessageWithWorkflow(msgType MessageType, workflowID string) *WSMessage

NewMessageWithWorkflow creates a new WSMessage for a specific workflow.

func ParseMessage

func ParseMessage(data []byte) (*WSMessage, error)

ParseMessage deserializes a JSON message.

func (*WSMessage) GetCancelPayload

func (m *WSMessage) GetCancelPayload() (*CancelPayload, error)

GetCancelPayload extracts CancelPayload from message.

func (*WSMessage) GetPausePayload

func (m *WSMessage) GetPausePayload() (*PausePayload, error)

GetPausePayload extracts PausePayload from message.

func (*WSMessage) GetRegisterPayload

func (m *WSMessage) GetRegisterPayload() (*RegisterPayload, error)

GetRegisterPayload extracts RegisterPayload from message.

func (*WSMessage) GetResumePayload

func (m *WSMessage) GetResumePayload() (*ResumePayload, error)

GetResumePayload extracts ResumePayload from message.

func (*WSMessage) JSON

func (m *WSMessage) JSON() ([]byte, error)

JSON serializes the message to JSON bytes.

func (*WSMessage) SetPayload

func (m *WSMessage) SetPayload(payload interface{}) error

SetPayload sets the payload from a typed struct.

type WorkflowCreatedPayload

type WorkflowCreatedPayload struct {
	WorkflowID       string `json:"workflow_id"`
	ParentWorkflowID string `json:"parent_workflow_id,omitempty"`
	RootWorkflowID   string `json:"root_workflow_id,omitempty"`
	ActionID         string `json:"action_id,omitempty"`
	ActionType       string `json:"action_type,omitempty"`
}

WorkflowCreatedPayload is the payload for workflow_created message.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL