monitor

package
v0.1.11 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 11, 2026 | License: Apache-2.0 | Imports: 21 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func HandleWebSocket added in v0.1.1

func HandleWebSocket(hub Hub, logger gochainedlog.Logger) http.HandlerFunc

HandleWebSocket returns an http.HandlerFunc that upgrades connections to WebSocket and streams real-time events from the Hub.

Types

type API

type API struct {
	// contains filtered or unexported fields
}

API serves the monitoring HTTP endpoints for the React UI.

func NewAPI

func NewAPI(s *store.RedisStore, disp Dispatcher, logger gochainedlog.Logger) *API

NewAPI creates a new monitoring API backed by Redis.

func (*API) Handler

func (a *API) Handler() http.Handler

Handler returns the HTTP handler for mounting in a server.

func (*API) Service added in v0.1.3

func (a *API) Service() *APIService

Service returns the underlying APIService (used by gRPC server).

func (*API) SetSyncRetryStatsFunc

func (a *API) SetSyncRetryStatsFunc(fn SyncRetryStatsFunc)

SetSyncRetryStatsFunc sets the function used to retrieve SyncRetrier stats.

func (*API) Shutdown

func (a *API) Shutdown()

Shutdown cancels the hub's Redis subscription.

type APIService added in v0.1.1

type APIService struct {
	// contains filtered or unexported fields
}

APIService holds all transport-agnostic business logic for the monitoring API.

func NewAPIService added in v0.1.1

func NewAPIService(s *store.RedisStore, disp Dispatcher, logger gochainedlog.Logger) *APIService

NewAPIService creates a new APIService, starting the hub's Redis subscription.

func (*APIService) BulkCancelBatches added in v0.1.1

func (a *APIService) BulkCancelBatches(ctx context.Context, req BulkRequest) (*BulkResult, error)

func (*APIService) BulkCancelJobs added in v0.1.1

func (a *APIService) BulkCancelJobs(ctx context.Context, req BulkRequest) (*BulkResult, error)

func (*APIService) BulkCancelWorkflows added in v0.1.1

func (a *APIService) BulkCancelWorkflows(ctx context.Context, req BulkRequest) (*BulkResult, error)

func (*APIService) BulkDeleteBatches added in v0.1.1

func (a *APIService) BulkDeleteBatches(ctx context.Context, req BulkRequest) (*BulkResult, error)

func (*APIService) BulkDeleteJobs added in v0.1.1

func (a *APIService) BulkDeleteJobs(ctx context.Context, req BulkRequest) (*BulkResult, error)

func (*APIService) BulkDeleteWorkflows added in v0.1.1

func (a *APIService) BulkDeleteWorkflows(ctx context.Context, req BulkRequest) (*BulkResult, error)

func (*APIService) BulkRetryBatches added in v0.1.1

func (a *APIService) BulkRetryBatches(ctx context.Context, req BulkRequest) (*BulkResult, error)

func (*APIService) BulkRetryJobs added in v0.1.1

func (a *APIService) BulkRetryJobs(ctx context.Context, req BulkRequest) (*BulkResult, error)

func (*APIService) BulkRetryWorkflows added in v0.1.1

func (a *APIService) BulkRetryWorkflows(ctx context.Context, req BulkRequest) (*BulkResult, error)

func (*APIService) CancelBatch added in v0.1.1

func (a *APIService) CancelBatch(ctx context.Context, batchID string) error

func (*APIService) CancelJob added in v0.1.1

func (a *APIService) CancelJob(ctx context.Context, jobID string) error

func (*APIService) CancelWorkflow added in v0.1.1

func (a *APIService) CancelWorkflow(ctx context.Context, wfID string) error

func (*APIService) CheckUniqueKey added in v0.1.2

func (a *APIService) CheckUniqueKey(ctx context.Context, key string) (*UniqueKeyResult, error)

func (*APIService) DeleteJob added in v0.1.1

func (a *APIService) DeleteJob(ctx context.Context, jobID string) error

func (*APIService) DeleteUniqueKey added in v0.1.2

func (a *APIService) DeleteUniqueKey(ctx context.Context, key string) error

func (*APIService) GetBatch added in v0.1.1

func (a *APIService) GetBatch(ctx context.Context, batchID string) (*types.BatchInstance, error)

func (*APIService) GetBatchItemResult added in v0.1.1

func (a *APIService) GetBatchItemResult(ctx context.Context, batchID, itemID string) (*types.BatchItemResult, error)

func (*APIService) GetBatchResults added in v0.1.1

func (a *APIService) GetBatchResults(ctx context.Context, batchID string) ([]*types.BatchItemResult, error)

func (*APIService) GetDailyStats added in v0.1.1

func (a *APIService) GetDailyStats(ctx context.Context, days int) ([]DailyStatsEntry, error)

func (*APIService) GetJob added in v0.1.1

func (a *APIService) GetJob(ctx context.Context, jobID string) (*types.Job, error)

func (*APIService) GetJobActiveRuns added in v0.1.1

func (a *APIService) GetJobActiveRuns(ctx context.Context, jobID string) ([]*types.JobRun, error)

func (*APIService) GetJobAuditCounts added in v0.1.3

func (a *APIService) GetJobAuditCounts(ctx context.Context, jobIDs []string) (map[string]int64, error)

GetJobAuditCounts returns audit trail entry counts for multiple jobs.

func (*APIService) GetJobAuditTrail added in v0.1.3

func (a *APIService) GetJobAuditTrail(ctx context.Context, jobID string) ([]store.AuditEntry, error)

func (*APIService) GetNode added in v0.1.1

func (a *APIService) GetNode(ctx context.Context, nodeID string) (*types.NodeInfo, error)

func (*APIService) GetRedisInfo added in v0.1.1

func (a *APIService) GetRedisInfo(ctx context.Context, section string) (*RedisInfoResponse, error)

func (*APIService) GetRun added in v0.1.1

func (a *APIService) GetRun(ctx context.Context, runID string) (*types.JobRun, error)

func (*APIService) GetSchedule added in v0.1.1

func (a *APIService) GetSchedule(ctx context.Context, jobID string) (*types.ScheduleEntry, error)

func (*APIService) GetStats added in v0.1.1

func (a *APIService) GetStats(ctx context.Context) (*StatsResponse, error)

func (*APIService) GetSyncRetries added in v0.1.1

func (a *APIService) GetSyncRetries() any

func (*APIService) GetWorkflow added in v0.1.1

func (a *APIService) GetWorkflow(ctx context.Context, wfID string) (*types.WorkflowInstance, error)

func (*APIService) GetWorkflowAuditTrail added in v0.1.3

func (a *APIService) GetWorkflowAuditTrail(ctx context.Context, wfID string) ([]WorkflowAuditEntry, error)

GetWorkflowAuditTrail aggregates audit entries from all task jobs in a workflow.

func (*APIService) GetWorkflowTaskResult added in v0.1.10

func (a *APIService) GetWorkflowTaskResult(ctx context.Context, wfID, taskName string) (json.RawMessage, error)

func (*APIService) Hub added in v0.1.1

func (a *APIService) Hub() Hub

func (*APIService) IsNodeDraining added in v0.1.3

func (a *APIService) IsNodeDraining(ctx context.Context, nodeID string) (bool, error)

func (*APIService) ListActiveRuns added in v0.1.1

func (a *APIService) ListActiveRuns(ctx context.Context) ([]*types.JobRun, error)

func (*APIService) ListBatches added in v0.1.1

func (a *APIService) ListBatches(ctx context.Context, filter store.BatchFilter) ([]types.BatchInstance, int, error)

func (*APIService) ListDLQ added in v0.1.1

func (a *APIService) ListDLQ(ctx context.Context, limit, offset int) ([]*types.WorkMessage, int, error)

func (*APIService) ListGroups added in v0.1.1

func (a *APIService) ListGroups(ctx context.Context) ([]GroupInfo, error)

func (*APIService) ListHistoryEvents added in v0.1.1

func (a *APIService) ListHistoryEvents(ctx context.Context, filter store.EventFilter) ([]types.JobEvent, int, error)

func (*APIService) ListHistoryRuns added in v0.1.1

func (a *APIService) ListHistoryRuns(ctx context.Context, filter store.RunFilter) ([]types.JobRun, int, error)

func (*APIService) ListJobEvents added in v0.1.1

func (a *APIService) ListJobEvents(ctx context.Context, filter store.EventFilter) ([]types.JobEvent, int, error)

func (*APIService) ListJobRuns added in v0.1.1

func (a *APIService) ListJobRuns(ctx context.Context, filter store.RunFilter) ([]types.JobRun, int, error)

func (*APIService) ListJobs added in v0.1.1

func (a *APIService) ListJobs(ctx context.Context, filter store.JobFilter) ([]types.Job, int, error)

func (*APIService) ListNodes added in v0.1.1

func (a *APIService) ListNodes(ctx context.Context) ([]*types.NodeInfo, error)

func (*APIService) ListQueues added in v0.1.1

func (a *APIService) ListQueues(ctx context.Context) ([]QueueInfo, error)

func (*APIService) ListSchedules added in v0.1.1

func (a *APIService) ListSchedules(ctx context.Context, filter store.ScheduleFilter) ([]types.ScheduleEntry, int, error)

func (*APIService) ListWorkflows added in v0.1.1

func (a *APIService) ListWorkflows(ctx context.Context, filter store.WorkflowFilter) ([]types.WorkflowInstance, int, error)

func (*APIService) Logger added in v0.1.1

func (a *APIService) Logger() gochainedlog.Logger

func (*APIService) PauseQueue added in v0.1.1

func (a *APIService) PauseQueue(ctx context.Context, tierName string) error

func (*APIService) ResumeQueue added in v0.1.1

func (a *APIService) ResumeQueue(ctx context.Context, tierName string) error

func (*APIService) ResumeWorkflow added in v0.1.10

func (a *APIService) ResumeWorkflow(ctx context.Context, wfID string) error

func (*APIService) RetryBatch added in v0.1.1

func (a *APIService) RetryBatch(ctx context.Context, batchID string, retryFailedOnly bool) error

func (*APIService) RetryJob added in v0.1.1

func (a *APIService) RetryJob(ctx context.Context, jobID string) error

func (*APIService) RetryWorkflow added in v0.1.1

func (a *APIService) RetryWorkflow(ctx context.Context, wfID string) error

func (*APIService) SearchBatchesByPayload added in v0.1.2

func (a *APIService) SearchBatchesByPayload(ctx context.Context, path, value string) (*types.BatchInstance, error)

func (*APIService) SearchJobsByPayload added in v0.1.2

func (a *APIService) SearchJobsByPayload(ctx context.Context, path, value, taskType string) (*types.Job, error)

func (*APIService) SearchWorkflowsByPayload added in v0.1.2

func (a *APIService) SearchWorkflowsByPayload(ctx context.Context, path, value string) (*types.WorkflowInstance, error)

func (*APIService) SetNodeDrain added in v0.1.3

func (a *APIService) SetNodeDrain(ctx context.Context, nodeID string, drain bool) error

func (*APIService) SetSyncRetryStatsFunc added in v0.1.1

func (a *APIService) SetSyncRetryStatsFunc(fn SyncRetryStatsFunc)

func (*APIService) Shutdown added in v0.1.1

func (a *APIService) Shutdown()

func (*APIService) Store added in v0.1.1

func (a *APIService) Store() *store.RedisStore

func (*APIService) SuspendWorkflow added in v0.1.10

func (a *APIService) SuspendWorkflow(ctx context.Context, wfID string) error

func (*APIService) UpdateJobPayload added in v0.1.1

func (a *APIService) UpdateJobPayload(ctx context.Context, jobID string, payload json.RawMessage) error

func (*APIService) ValidateWorkflowDefinition added in v0.1.10

func (a *APIService) ValidateWorkflowDefinition(_ context.Context, def *types.WorkflowDefinition) *ValidateResult

type ApiError added in v0.1.1

type ApiError struct {
	Msg        string
	StatusCode int
}

ApiError is a service-layer error that carries an HTTP status code.

func (*ApiError) Error added in v0.1.1

func (e *ApiError) Error() string

type BulkRequest added in v0.1.1

type BulkRequest struct {
	IDs    []string `json:"ids"`
	Status string   `json:"status"`
}

BulkRequest is the common shape for bulk operation requests.

type BulkResult added in v0.1.3

type BulkResult struct {
	Affected int `json:"affected"`
}

BulkResult holds the count of affected entities.

type DailyStatsEntry

type DailyStatsEntry struct {
	Date      string `json:"date"`
	Processed int64  `json:"processed"`
	Failed    int64  `json:"failed"`
}

DailyStatsEntry represents aggregated stats for one day.

type Dispatcher

type Dispatcher interface {
	Dispatch(ctx context.Context, job *types.Job, attempt int) error
	PublishEvent(event types.JobEvent)
}

Dispatcher is the subset of *dispatcher.Dispatcher used by the service layer.

type GRPCServer

type GRPCServer struct {
	pb.UnimplementedMonitorServiceServer
	// contains filtered or unexported fields
}

GRPCServer implements the MonitorService gRPC service.

func NewGRPCServer

func NewGRPCServer(svc *APIService) *GRPCServer

NewGRPCServer creates a new gRPC monitor server backed by an APIService.

func (*GRPCServer) BulkCancelBatches

BulkCancelBatches cancels multiple batches by ID or status filter.

func (*GRPCServer) BulkCancelJobs

BulkCancelJobs cancels multiple jobs by ID or status filter.

func (*GRPCServer) BulkCancelWorkflows

BulkCancelWorkflows cancels multiple workflows by ID or status filter.

func (*GRPCServer) BulkDeleteBatches

BulkDeleteBatches deletes multiple batches by ID or status filter.

func (*GRPCServer) BulkDeleteJobs

BulkDeleteJobs deletes multiple jobs by ID or status filter.

func (*GRPCServer) BulkDeleteWorkflows

BulkDeleteWorkflows deletes multiple workflows by ID or status filter.

func (*GRPCServer) BulkRetryBatches

BulkRetryBatches retries multiple batches by ID or status filter.

func (*GRPCServer) BulkRetryJobs

BulkRetryJobs retries multiple jobs by ID or status filter.

func (*GRPCServer) BulkRetryWorkflows

BulkRetryWorkflows retries multiple workflows by ID or status filter.

func (*GRPCServer) CancelBatch

CancelBatch cancels a batch and all its non-terminal items and child jobs.

func (*GRPCServer) CancelJob

func (g *GRPCServer) CancelJob(ctx context.Context, req *pb.CancelJobRequest) (*pb.CancelJobResponse, error)

CancelJob transitions a job to cancelled, removes its schedule, publishes cancel signals to in-flight workers, and emits a JobCancelled event.

func (*GRPCServer) CancelWorkflow

CancelWorkflow cancels a workflow and all its non-terminal tasks and child jobs.

func (*GRPCServer) DeleteJob

func (g *GRPCServer) DeleteJob(ctx context.Context, req *pb.DeleteJobRequest) (*pb.DeleteJobResponse, error)

DeleteJob removes a job and its schedule.

func (*GRPCServer) GetBatch

func (g *GRPCServer) GetBatch(ctx context.Context, req *pb.GetBatchRequest) (*pb.GetBatchResponse, error)

GetBatch retrieves a single batch instance by ID.

func (*GRPCServer) GetBatchItemResult

GetBatchItemResult retrieves a single batch item result.

func (*GRPCServer) GetBatchResults

GetBatchResults returns all item results for a batch.

func (*GRPCServer) GetDailyStats

GetDailyStats returns aggregated daily processing statistics.

func (*GRPCServer) GetJob

func (g *GRPCServer) GetJob(ctx context.Context, req *pb.GetJobRequest) (*pb.GetJobResponse, error)

GetJob returns a single job by ID.

func (*GRPCServer) GetNode

func (g *GRPCServer) GetNode(ctx context.Context, req *pb.GetNodeRequest) (*pb.GetNodeResponse, error)

GetNode returns a single node by ID.

func (*GRPCServer) GetRedisInfo

GetRedisInfo returns parsed Redis INFO output organized by section.

func (*GRPCServer) GetRun

func (g *GRPCServer) GetRun(ctx context.Context, req *pb.GetRunRequest) (*pb.GetRunResponse, error)

GetRun returns a single run by ID.

func (*GRPCServer) GetSchedule

GetSchedule returns a single schedule entry by job ID.

func (*GRPCServer) GetStats

GetStats returns overall system statistics including job counts, active schedules, runs, and nodes.

func (*GRPCServer) GetSyncRetries

GetSyncRetries returns the current SyncRetrier statistics as an opaque JSON blob.

func (*GRPCServer) GetWorkflow

GetWorkflow retrieves a single workflow instance by ID.

func (*GRPCServer) Health

Health returns a simple health check response.

func (*GRPCServer) ListBatches

ListBatches returns paginated batch instances with optional status filter.

func (*GRPCServer) ListDLQ

func (g *GRPCServer) ListDLQ(ctx context.Context, req *pb.ListDLQRequest) (*pb.ListDLQResponse, error)

ListDLQ returns paginated dead-letter queue messages.

func (*GRPCServer) ListGroups

ListGroups returns all active aggregation groups with their sizes.

func (*GRPCServer) ListHistoryEvents

ListHistoryEvents returns paginated historical events with optional job_id filter.

func (*GRPCServer) ListHistoryRuns

ListHistoryRuns returns paginated historical runs with optional status and job_id filters.

func (*GRPCServer) ListJobEvents

ListJobEvents returns recent events for a specific job.

func (*GRPCServer) ListJobRuns

ListJobRuns returns recent runs for a specific job.

func (*GRPCServer) ListJobs

func (g *GRPCServer) ListJobs(ctx context.Context, req *pb.ListJobsRequest) (*pb.ListJobsResponse, error)

ListJobs returns a paginated list of jobs, optionally filtered by status, task type, tag, or search term.

func (*GRPCServer) ListNodes

ListNodes returns all registered server nodes.

func (*GRPCServer) ListQueues

ListQueues returns queue/tier information including pause state and size.

func (*GRPCServer) ListRuns

ListRuns returns all active runs.

func (*GRPCServer) ListSchedules

ListSchedules returns a paginated list of schedule entries.

func (*GRPCServer) ListWorkflows

ListWorkflows returns paginated workflow instances with optional status filter.

func (*GRPCServer) PauseQueue

PauseQueue pauses a queue/tier by name.

func (*GRPCServer) RegisterServer

func (g *GRPCServer) RegisterServer(s *grpc.Server)

RegisterServer registers the MonitorService implementation with a gRPC server.

func (*GRPCServer) ResumeQueue

ResumeQueue resumes a paused queue/tier by name.

func (*GRPCServer) RetryBatch

RetryBatch resets a batch for a new execution attempt, optionally retrying only failed items.

func (*GRPCServer) RetryJob

func (g *GRPCServer) RetryJob(ctx context.Context, req *pb.RetryJobRequest) (*pb.RetryJobResponse, error)

RetryJob resets a job to pending, clears its attempt counter and error, dispatches it for immediate execution, and emits a JobRetrying event.

func (*GRPCServer) RetryWorkflow

RetryWorkflow resets a workflow and all its tasks for a new execution attempt.

func (*GRPCServer) UpdateJobPayload

UpdateJobPayload replaces the payload of a pending or scheduled job.

type GroupInfo added in v0.1.1

type GroupInfo struct {
	Name string `json:"name"`
	Size int64  `json:"size"`
}

GroupInfo describes a single aggregation group.

type Hub

type Hub interface {
	Register(c *WSClient)
	Unregister(c *WSClient)

	Broadcast(data []byte)
	ClientCount() int
	SubscribeRedis(ctx context.Context, rdb rueidis.Client, channel string)
}

Hub manages WebSocket client connections and broadcasts Redis Pub/Sub events. A single Redis subscription feeds all connected WebSocket clients (fan-out).

type QueueInfo

type QueueInfo struct {
	Name       string  `json:"name"`
	Weight     int     `json:"weight"`
	FetchBatch int     `json:"fetch_batch"`
	RateLimit  float64 `json:"rate_limit,omitempty"`
	RateBurst  int     `json:"rate_burst,omitempty"`
	Paused     bool    `json:"paused"`
	Size       int64   `json:"size"`
}

QueueInfo describes the state of a single queue/tier.

type RedisInfoResponse added in v0.1.10

type RedisInfoResponse struct {
	Mode  string          `json:"mode"` // "cluster" or "standalone"
	Nodes []RedisNodeInfo `json:"nodes"`
}

RedisInfoResponse holds Redis INFO for all nodes with mode indicator.

type RedisNodeInfo added in v0.1.10

type RedisNodeInfo struct {
	Addr     string                       `json:"addr"`
	Role     string                       `json:"role"`
	Sections map[string]map[string]string `json:"sections"`
}

RedisNodeInfo holds parsed INFO output for a single Redis node.

type StatsResponse added in v0.1.1

type StatsResponse struct {
	JobCounts       map[types.JobStatus]int      `json:"job_counts"`
	ActiveSchedules int                          `json:"active_schedules"`
	ActiveRuns      int                          `json:"active_runs"`
	ActiveNodes     int                          `json:"active_nodes"`
	WorkflowCounts  map[types.WorkflowStatus]int `json:"workflow_counts,omitempty"`
	BatchCounts     map[types.WorkflowStatus]int `json:"batch_counts,omitempty"`
	ActiveWorkflows int                          `json:"active_workflows"`
	ActiveBatches   int                          `json:"active_batches"`
}

StatsResponse holds overall system statistics.

type SyncRetryStatsFunc

type SyncRetryStatsFunc func() any

SyncRetryStatsFunc is a function that returns the current SyncRetrier stats as a JSON-serializable value. Set via SetSyncRetryStatsFunc.

type UniqueKeyResult added in v0.1.3

type UniqueKeyResult struct {
	Exists bool   `json:"exists"`
	JobID  string `json:"job_id"`
}

type ValidateResult added in v0.1.10

type ValidateResult struct {
	Valid  bool     `json:"valid"`
	Errors []string `json:"errors,omitempty"`
}

ValidateResult holds the result of a workflow definition validation.

type WSClient added in v0.1.1

type WSClient struct {
	Send chan []byte
}

WSClient represents a single WebSocket connection registered with the Hub.

func NewWSClient added in v0.1.1

func NewWSClient() *WSClient

NewWSClient creates a new WebSocket client with a buffered send channel.

type WorkflowAuditEntry added in v0.1.3

type WorkflowAuditEntry struct {
	TaskName  string    `json:"task_name"`
	JobID     string    `json:"job_id"`
	ID        string    `json:"id"`
	Status    string    `json:"status"`
	Error     string    `json:"error,omitempty"`
	Timestamp time.Time `json:"timestamp"`
}

WorkflowAuditEntry is an audit entry annotated with task name and job ID.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL