manager

package
v0.4.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 21, 2026 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// Execution mode values. NOTE(review): presumably the accepted values for the
	// executionMode argument of Service.CreateJob — confirm against the implementation.
	ExecutionModeParallel     = "parallel"
	ExecutionModeSequential   = "sequential"
	ExecutionModeConfigurable = "configurable"
	// EnvJobExecutionMode holds the string "JOB_EXECUTION_MODE". NOTE(review): going by
	// the Env prefix this is presumably an environment variable name — verify at the
	// point of use.
	EnvJobExecutionMode       = "JOB_EXECUTION_MODE"
)

Variables

This section is empty.

Functions

func ComputeJobState added in v0.4.0

func ComputeJobState(tasks []task.Task) task.State

func NewService

func NewService(
	repos *storage.Repositories,
	s scheduler.Scheduler, pubsub mqtt.PubSub,
	domainID, channelID string, logger *slog.Logger,
) (Service, CronScheduler)

Types

type CronScheduler added in v0.4.0

// CronScheduler defines the behavior for managing cron-based task scheduling.
//
// Values of this interface are returned by NewService and NewCronScheduler.
type CronScheduler interface {
	// Start takes a context and reports failure through its error result.
	Start(ctx context.Context) error
	// Stop takes no arguments and returns nothing, so it cannot report failure.
	Stop()
	// ScheduleTask and UnscheduleTask identify the task by its ID string.
	ScheduleTask(ctx context.Context, taskID string) error
	UnscheduleTask(ctx context.Context, taskID string) error
}

CronScheduler defines the behavior for managing cron-based task scheduling.

func NewCronScheduler added in v0.4.0

func NewCronScheduler(tasksDB storage.TaskRepository, service Service, logger *slog.Logger) CronScheduler

type ExperimentConfig added in v0.4.0

// ExperimentConfig is the payload accepted by Service.ConfigureExperiment.
// Each field is encoded to JSON under the key given in its struct tag.
type ExperimentConfig struct {
	ExperimentID  string         `json:"experiment_id"`
	RoundID       string         `json:"round_id"`
	ModelRef      string         `json:"model_ref"`
	Participants  []string       `json:"participants"`
	Hyperparams   map[string]any `json:"hyperparams"`
	// NOTE(review): presumably the "k" of a k-of-n participation threshold — confirm.
	KOfN          int            `json:"k_of_n"`
	// NOTE(review): the _s suffix suggests the unit is seconds — confirm.
	TimeoutS      int            `json:"timeout_s"`
	// TaskWasmImage is left out of the JSON output when it is the empty string (omitempty).
	TaskWasmImage string         `json:"task_wasm_image,omitempty"`
}

type FLTask added in v0.4.0

// FLTask is the value returned by Service.GetFLTask for a given round and proplet.
type FLTask struct {
	RoundID     string         `json:"round_id"`
	ModelRef    string         `json:"model_ref"`
	Config      map[string]any `json:"config"`
	// Hyperparams is left out of the JSON output when the map is empty (omitempty).
	Hyperparams map[string]any `json:"hyperparams,omitempty"`
}

type FLUpdate added in v0.4.0

// FLUpdate is an alias for fl.Update; it is the argument type of Service.PostFLUpdate.
type FLUpdate = fl.Update

type JobPage added in v0.4.0

// JobPage is the result type of Service.ListJobs.
//
// NOTE(review): Offset and Limit presumably echo the arguments passed to ListJobs and
// Total presumably counts all jobs rather than just len(Jobs) — confirm.
type JobPage struct {
	Offset uint64       `json:"offset"`
	Limit  uint64       `json:"limit"`
	Total  uint64       `json:"total"`
	Jobs   []JobSummary `json:"jobs"`
}

type JobSummary added in v0.4.0

// JobSummary is the element type of JobPage.Jobs. It carries a job's ID, optional
// name, state, tasks and timestamps.
type JobSummary struct {
	JobID      string      `json:"job_id"`
	// Name is left out of the JSON output when it is the empty string (omitempty).
	Name       string      `json:"name,omitempty"`
	// NOTE(review): State has the same type that ComputeJobState returns; whether it is
	// derived from Tasks that way is not visible here — confirm.
	State      task.State  `json:"state"`
	Tasks      []task.Task `json:"tasks"`
	// The three timestamps carry no omitempty option, so they are always present in the
	// JSON output, including when they hold the zero time.
	StartTime  time.Time   `json:"start_time"`
	FinishTime time.Time   `json:"finish_time"`
	CreatedAt  time.Time   `json:"created_at"`
}

type PropletMetrics added in v0.4.0

// PropletMetrics is an alias for storage.PropletMetrics.
type PropletMetrics = storage.PropletMetrics

type PropletMetricsPage added in v0.4.0

// PropletMetricsPage is the result type of Service.GetPropletMetrics.
//
// NOTE(review): Offset and Limit presumably echo the request arguments and Total
// presumably counts all matching entries rather than len(Metrics) — confirm.
type PropletMetricsPage struct {
	Offset  uint64           `json:"offset"`
	Limit   uint64           `json:"limit"`
	Total   uint64           `json:"total"`
	Metrics []PropletMetrics `json:"metrics"`
}

type RoundStatus added in v0.4.0

// RoundStatus is the result type of Service.GetRoundStatus.
type RoundStatus struct {
	RoundID      string `json:"round_id"`
	Completed    bool   `json:"completed"`
	NumUpdates   int    `json:"num_updates"`
	// NOTE(review): presumably the same threshold as ExperimentConfig.KOfN — confirm.
	KOfN         int    `json:"k_of_n"`
	// ModelVersion is left out of the JSON output when it is 0 (omitempty), so a consumer
	// cannot tell version 0 apart from an absent version.
	ModelVersion int    `json:"model_version,omitempty"`
}

type Service

// Service is the manager's main API surface; NewService returns a value of this
// interface together with a CronScheduler. Every method takes a context.Context as
// its first parameter and reports failure through a trailing error result.
type Service interface {
	// Proplet lookup, listing, selection for a task, and deletion.
	GetProplet(ctx context.Context, propletID string) (proplet.Proplet, error)
	ListProplets(ctx context.Context, offset, limit uint64) (proplet.PropletPage, error)
	SelectProplet(ctx context.Context, task task.Task) (proplet.Proplet, error)
	DeleteProplet(ctx context.Context, propletID string) error

	// Task, workflow and job operations. CreateJob returns a string followed by the
	// tasks; NOTE(review): the string is presumably the new job's ID, and executionMode
	// presumably takes one of the ExecutionMode* constants — confirm.
	CreateTask(ctx context.Context, task task.Task) (task.Task, error)
	CreateWorkflow(ctx context.Context, tasks []task.Task) ([]task.Task, error)
	CreateJob(ctx context.Context, name string, tasks []task.Task, executionMode string) (string, []task.Task, error)
	GetTask(ctx context.Context, taskID string) (task.Task, error)
	GetJob(ctx context.Context, jobID string) ([]task.Task, error)
	ListJobs(ctx context.Context, offset, limit uint64) (JobPage, error)
	StartJob(ctx context.Context, jobID string) error
	StopJob(ctx context.Context, jobID string) error
	ListTasks(ctx context.Context, offset, limit uint64) (task.TaskPage, error)
	UpdateTask(ctx context.Context, task task.Task) (task.Task, error)
	DeleteTask(ctx context.Context, taskID string) error
	StartTask(ctx context.Context, taskID string) error
	StopTask(ctx context.Context, taskID string) error

	// Result retrieval. GetTaskResults returns an untyped value; GetParentResults returns
	// a map with string keys (NOTE(review): presumably keyed by parent task ID — confirm).
	GetTaskResults(ctx context.Context, taskID string) (any, error)
	GetParentResults(ctx context.Context, taskID string) (map[string]any, error)

	// Metrics for a single task or proplet, requested with an offset and a limit.
	GetTaskMetrics(ctx context.Context, taskID string, offset, limit uint64) (TaskMetricsPage, error)
	GetPropletMetrics(ctx context.Context, propletID string, offset, limit uint64) (PropletMetricsPage, error)

	// Orchestrator/Experiment Config API (Manager acts as Orchestrator per diagram)
	// Step 1: Configure experiment with FL Coordinator
	ConfigureExperiment(ctx context.Context, config ExperimentConfig) error

	// Federated Learning Forwarding API (workload-agnostic)
	// Note: In the diagram, clients call FL Coordinator directly (Steps 3 & 7)
	// These endpoints exist for compatibility/MQTT forwarding
	GetFLTask(ctx context.Context, roundID, propletID string) (FLTask, error)
	PostFLUpdate(ctx context.Context, update FLUpdate) error
	// PostFLUpdateCBOR takes the update as raw bytes; NOTE(review): presumably a
	// CBOR-encoded FLUpdate, going by the name — confirm.
	PostFLUpdateCBOR(ctx context.Context, updateData []byte) error
	GetRoundStatus(ctx context.Context, roundID string) (RoundStatus, error)

	// NOTE(review): what Subscribe subscribes to is not visible in this listing;
	// NewService accepts an mqtt.PubSub, which is presumably involved — confirm.
	Subscribe(ctx context.Context) error

	Shutdown(ctx context.Context) error
	RecoverInterruptedTasks(ctx context.Context) error
}

type TaskMetrics added in v0.4.0

// TaskMetrics is an alias for storage.TaskMetrics.
type TaskMetrics = storage.TaskMetrics

type TaskMetricsPage added in v0.4.0

// TaskMetricsPage is the result type of Service.GetTaskMetrics.
//
// NOTE(review): Offset and Limit presumably echo the request arguments and Total
// presumably counts all matching entries rather than len(Metrics) — confirm.
type TaskMetricsPage struct {
	Offset  uint64        `json:"offset"`
	Limit   uint64        `json:"limit"`
	Total   uint64        `json:"total"`
	Metrics []TaskMetrics `json:"metrics"`
}

type WorkflowCoordinator added in v0.4.0

// WorkflowCoordinator is constructed with NewWorkflowCoordinator and exposes the
// methods CheckAndStartReadyTasks, EvaluateConditionalExecution and OnTaskCompletion.
// Its fields are not shown in this listing.
type WorkflowCoordinator struct {
	// contains filtered or unexported fields
}

func NewWorkflowCoordinator added in v0.4.0

func NewWorkflowCoordinator(taskRepo storage.TaskRepository, service Service, logger *slog.Logger) *WorkflowCoordinator

func (*WorkflowCoordinator) CheckAndStartReadyTasks added in v0.4.0

func (wc *WorkflowCoordinator) CheckAndStartReadyTasks(ctx context.Context, workflowID string) error

func (*WorkflowCoordinator) EvaluateConditionalExecution added in v0.4.0

func (wc *WorkflowCoordinator) EvaluateConditionalExecution(ctx context.Context, t task.Task, parentStates map[string]task.State) bool

func (*WorkflowCoordinator) OnTaskCompletion added in v0.4.0

func (wc *WorkflowCoordinator) OnTaskCompletion(ctx context.Context, taskID string) error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL