Documentation
¶
Index ¶
- Constants
- func ComputeJobState(tasks []task.Task) task.State
- func NewService(repos *storage.Repositories, s scheduler.Scheduler, pubsub mqtt.PubSub, ...) (Service, CronScheduler)
- type CronScheduler
- type ExperimentConfig
- type FLTask
- type FLUpdate
- type JobPage
- type JobSummary
- type PropletMetrics
- type PropletMetricsPage
- type RoundStatus
- type Service
- type TaskMetrics
- type TaskMetricsPage
- type WorkflowCoordinator
- func (wc *WorkflowCoordinator) CheckAndStartReadyTasks(ctx context.Context, workflowID string) error
- func (wc *WorkflowCoordinator) EvaluateConditionalExecution(ctx context.Context, t task.Task, parentStates map[string]task.State) bool
- func (wc *WorkflowCoordinator) OnTaskCompletion(ctx context.Context, taskID string) error
Constants ¶
View Source
const ( ExecutionModeParallel = "parallel" ExecutionModeSequential = "sequential" ExecutionModeConfigurable = "configurable" EnvJobExecutionMode = "JOB_EXECUTION_MODE" )
Variables ¶
This section is empty.
Functions ¶
func NewService ¶
Types ¶
type CronScheduler ¶ added in v0.4.0
type CronScheduler interface {
Start(ctx context.Context) error
Stop()
ScheduleTask(ctx context.Context, taskID string) error
UnscheduleTask(ctx context.Context, taskID string) error
}
CronScheduler defines the behavior for managing cron-based task scheduling.
func NewCronScheduler ¶ added in v0.4.0
func NewCronScheduler(tasksDB storage.TaskRepository, service Service, logger *slog.Logger) CronScheduler
type ExperimentConfig ¶ added in v0.4.0
type ExperimentConfig struct {
ExperimentID string `json:"experiment_id"`
RoundID string `json:"round_id"`
ModelRef string `json:"model_ref"`
Participants []string `json:"participants"`
Hyperparams map[string]any `json:"hyperparams"`
KOfN int `json:"k_of_n"`
TimeoutS int `json:"timeout_s"`
TaskWasmImage string `json:"task_wasm_image,omitempty"`
}
type JobPage ¶ added in v0.4.0
type JobPage struct {
Offset uint64 `json:"offset"`
Limit uint64 `json:"limit"`
Total uint64 `json:"total"`
Jobs []JobSummary `json:"jobs"`
}
type JobSummary ¶ added in v0.4.0
type PropletMetrics ¶ added in v0.4.0
type PropletMetrics = storage.PropletMetrics
type PropletMetricsPage ¶ added in v0.4.0
type PropletMetricsPage struct {
Offset uint64 `json:"offset"`
Limit uint64 `json:"limit"`
Total uint64 `json:"total"`
Metrics []PropletMetrics `json:"metrics"`
}
type RoundStatus ¶ added in v0.4.0
type Service ¶
type Service interface {
GetProplet(ctx context.Context, propletID string) (proplet.Proplet, error)
ListProplets(ctx context.Context, offset, limit uint64) (proplet.PropletPage, error)
SelectProplet(ctx context.Context, task task.Task) (proplet.Proplet, error)
DeleteProplet(ctx context.Context, propletID string) error
CreateTask(ctx context.Context, task task.Task) (task.Task, error)
CreateWorkflow(ctx context.Context, tasks []task.Task) ([]task.Task, error)
CreateJob(ctx context.Context, name string, tasks []task.Task, executionMode string) (string, []task.Task, error)
GetTask(ctx context.Context, taskID string) (task.Task, error)
GetJob(ctx context.Context, jobID string) ([]task.Task, error)
ListJobs(ctx context.Context, offset, limit uint64) (JobPage, error)
StartJob(ctx context.Context, jobID string) error
StopJob(ctx context.Context, jobID string) error
ListTasks(ctx context.Context, offset, limit uint64) (task.TaskPage, error)
UpdateTask(ctx context.Context, task task.Task) (task.Task, error)
DeleteTask(ctx context.Context, taskID string) error
StartTask(ctx context.Context, taskID string) error
StopTask(ctx context.Context, taskID string) error
GetTaskResults(ctx context.Context, taskID string) (any, error)
GetParentResults(ctx context.Context, taskID string) (map[string]any, error)
GetTaskMetrics(ctx context.Context, taskID string, offset, limit uint64) (TaskMetricsPage, error)
GetPropletMetrics(ctx context.Context, propletID string, offset, limit uint64) (PropletMetricsPage, error)
// Orchestrator/Experiment Config API (Manager acts as Orchestrator per diagram)
// Step 1: Configure experiment with FL Coordinator
ConfigureExperiment(ctx context.Context, config ExperimentConfig) error
// Federated Learning Forwarding API (workload-agnostic)
// Note: In the diagram, clients call FL Coordinator directly (Steps 3 & 7)
// These endpoints exist for compatibility/MQTT forwarding
GetFLTask(ctx context.Context, roundID, propletID string) (FLTask, error)
PostFLUpdate(ctx context.Context, update FLUpdate) error
PostFLUpdateCBOR(ctx context.Context, updateData []byte) error
GetRoundStatus(ctx context.Context, roundID string) (RoundStatus, error)
Subscribe(ctx context.Context) error
Shutdown(ctx context.Context) error
RecoverInterruptedTasks(ctx context.Context) error
}
type TaskMetrics ¶ added in v0.4.0
type TaskMetrics = storage.TaskMetrics
type TaskMetricsPage ¶ added in v0.4.0
type TaskMetricsPage struct {
Offset uint64 `json:"offset"`
Limit uint64 `json:"limit"`
Total uint64 `json:"total"`
Metrics []TaskMetrics `json:"metrics"`
}
type WorkflowCoordinator ¶ added in v0.4.0
type WorkflowCoordinator struct {
// contains filtered or unexported fields
}
func NewWorkflowCoordinator ¶ added in v0.4.0
func NewWorkflowCoordinator(taskRepo storage.TaskRepository, service Service, logger *slog.Logger) *WorkflowCoordinator
func (*WorkflowCoordinator) CheckAndStartReadyTasks ¶ added in v0.4.0
func (wc *WorkflowCoordinator) CheckAndStartReadyTasks(ctx context.Context, workflowID string) error
func (*WorkflowCoordinator) EvaluateConditionalExecution ¶ added in v0.4.0
func (*WorkflowCoordinator) OnTaskCompletion ¶ added in v0.4.0
func (wc *WorkflowCoordinator) OnTaskCompletion(ctx context.Context, taskID string) error
Source Files
¶
Click to show internal directories.
Click to hide internal directories.