monitor

package
v0.1.1 Latest
Warning

This package is not in the latest version of its module.

Published: Feb 28, 2026 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func HandleWebSocket added in v0.1.1

func HandleWebSocket(hub Hub, logger gochainedlog.Logger) http.HandlerFunc

HandleWebSocket returns an http.HandlerFunc that upgrades connections to WebSocket and streams real-time events from the Hub.

Types

type API

type API struct {
	// contains filtered or unexported fields
}

API serves the monitoring HTTP endpoints for the React UI. It is a thin wrapper around APIService that handles HTTP parameter parsing and JSON response formatting.

func NewAPI

func NewAPI(svc *APIService) *API

NewAPI creates a new monitoring HTTP API backed by an APIService.

func (*API) Handler

func (a *API) Handler() http.Handler

Handler returns the HTTP handler for mounting in a server.

func (*API) Shutdown

func (a *API) Shutdown()

Shutdown stops the WebSocket hub's Redis subscription.

type APIService added in v0.1.1

type APIService struct {
	// contains filtered or unexported fields
}

APIService is a transport-agnostic service layer for the monitor API. It encapsulates all business logic without any HTTP dependency. Users can build custom handlers on top of this service.

func NewAPIService added in v0.1.1

func NewAPIService(s *store.RedisStore, disp Dispatcher, logger gochainedlog.Logger) *APIService

NewAPIService creates a new monitoring service backed by Redis.

func (*APIService) BulkCancelBatches added in v0.1.1

func (a *APIService) BulkCancelBatches(ctx context.Context, req BulkRequest) (int, error)

func (*APIService) BulkCancelJobs added in v0.1.1

func (a *APIService) BulkCancelJobs(ctx context.Context, req BulkRequest) (int, error)

func (*APIService) BulkCancelWorkflows added in v0.1.1

func (a *APIService) BulkCancelWorkflows(ctx context.Context, req BulkRequest) (int, error)

func (*APIService) BulkDeleteBatches added in v0.1.1

func (a *APIService) BulkDeleteBatches(ctx context.Context, req BulkRequest) (int, error)

func (*APIService) BulkDeleteJobs added in v0.1.1

func (a *APIService) BulkDeleteJobs(ctx context.Context, req BulkRequest) (int, error)

func (*APIService) BulkDeleteWorkflows added in v0.1.1

func (a *APIService) BulkDeleteWorkflows(ctx context.Context, req BulkRequest) (int, error)

func (*APIService) BulkRetryBatches added in v0.1.1

func (a *APIService) BulkRetryBatches(ctx context.Context, req BulkRequest) (int, error)

func (*APIService) BulkRetryJobs added in v0.1.1

func (a *APIService) BulkRetryJobs(ctx context.Context, req BulkRequest) (int, error)

func (*APIService) BulkRetryWorkflows added in v0.1.1

func (a *APIService) BulkRetryWorkflows(ctx context.Context, req BulkRequest) (int, error)

func (*APIService) CancelBatch added in v0.1.1

func (a *APIService) CancelBatch(ctx context.Context, batchID string) error

func (*APIService) CancelJob added in v0.1.1

func (a *APIService) CancelJob(ctx context.Context, jobID string) error

func (*APIService) CancelWorkflow added in v0.1.1

func (a *APIService) CancelWorkflow(ctx context.Context, wfID string) error

func (*APIService) DeleteJob added in v0.1.1

func (a *APIService) DeleteJob(ctx context.Context, jobID string) error

func (*APIService) GetBatch added in v0.1.1

func (a *APIService) GetBatch(ctx context.Context, batchID string) (*types.BatchInstance, error)

func (*APIService) GetBatchItemResult added in v0.1.1

func (a *APIService) GetBatchItemResult(ctx context.Context, batchID, itemID string) (*types.BatchItemResult, error)

func (*APIService) GetBatchResults added in v0.1.1

func (a *APIService) GetBatchResults(ctx context.Context, batchID string) ([]*types.BatchItemResult, error)

func (*APIService) GetDailyStats added in v0.1.1

func (a *APIService) GetDailyStats(ctx context.Context, days int) ([]DailyStatsEntry, error)

func (*APIService) GetJob added in v0.1.1

func (a *APIService) GetJob(ctx context.Context, jobID string) (*types.Job, uint64, error)

func (*APIService) GetJobActiveRuns added in v0.1.1

func (a *APIService) GetJobActiveRuns(ctx context.Context, jobID string) ([]*types.JobRun, error)

func (*APIService) GetNode added in v0.1.1

func (a *APIService) GetNode(ctx context.Context, nodeID string) (*types.NodeInfo, error)

func (*APIService) GetRedisInfo added in v0.1.1

func (a *APIService) GetRedisInfo(ctx context.Context, section string) (map[string]map[string]string, error)

func (*APIService) GetRun added in v0.1.1

func (a *APIService) GetRun(ctx context.Context, runID string) (*types.JobRun, error)

func (*APIService) GetSchedule added in v0.1.1

func (a *APIService) GetSchedule(ctx context.Context, jobID string) (*types.ScheduleEntry, error)

func (*APIService) GetStats added in v0.1.1

func (a *APIService) GetStats(ctx context.Context) (*StatsResponse, error)

func (*APIService) GetSyncRetries added in v0.1.1

func (a *APIService) GetSyncRetries() any

func (*APIService) GetWorkflow added in v0.1.1

func (a *APIService) GetWorkflow(ctx context.Context, wfID string) (*types.WorkflowInstance, error)

func (*APIService) Hub added in v0.1.1

func (a *APIService) Hub() Hub

Hub returns the WebSocket hub for real-time event broadcasting.

func (*APIService) ListActiveRuns added in v0.1.1

func (a *APIService) ListActiveRuns(ctx context.Context) ([]*types.JobRun, error)

func (*APIService) ListBatches added in v0.1.1

func (a *APIService) ListBatches(ctx context.Context, filter store.BatchFilter) ([]types.BatchInstance, int, error)

func (*APIService) ListDLQ added in v0.1.1

func (a *APIService) ListDLQ(ctx context.Context, limit, offset int) ([]*types.WorkMessage, int, error)

func (*APIService) ListGroups added in v0.1.1

func (a *APIService) ListGroups(ctx context.Context) ([]GroupInfo, error)

func (*APIService) ListHistoryEvents added in v0.1.1

func (a *APIService) ListHistoryEvents(ctx context.Context, filter store.EventFilter) ([]types.JobEvent, int, error)

func (*APIService) ListHistoryRuns added in v0.1.1

func (a *APIService) ListHistoryRuns(ctx context.Context, filter store.RunFilter) ([]types.JobRun, int, error)

func (*APIService) ListJobEvents added in v0.1.1

func (a *APIService) ListJobEvents(ctx context.Context, jobID string, limit int) ([]types.JobEvent, error)

func (*APIService) ListJobRuns added in v0.1.1

func (a *APIService) ListJobRuns(ctx context.Context, jobID string, limit int) ([]types.JobRun, error)

func (*APIService) ListJobs added in v0.1.1

func (a *APIService) ListJobs(ctx context.Context, filter store.JobFilter) ([]types.Job, int, error)

func (*APIService) ListNodes added in v0.1.1

func (a *APIService) ListNodes(ctx context.Context) ([]*types.NodeInfo, error)

func (*APIService) ListQueues added in v0.1.1

func (a *APIService) ListQueues(ctx context.Context) ([]QueueInfo, error)

func (*APIService) ListSchedules added in v0.1.1

func (a *APIService) ListSchedules(ctx context.Context, sort string, limit, offset int) ([]types.ScheduleEntry, int, error)

func (*APIService) ListWorkflows added in v0.1.1

func (a *APIService) ListWorkflows(ctx context.Context, filter store.WorkflowFilter) ([]types.WorkflowInstance, int, error)

func (*APIService) Logger added in v0.1.1

func (a *APIService) Logger() gochainedlog.Logger

Logger returns the service logger.

func (*APIService) PauseQueue added in v0.1.1

func (a *APIService) PauseQueue(ctx context.Context, tierName string) error

func (*APIService) ResumeQueue added in v0.1.1

func (a *APIService) ResumeQueue(ctx context.Context, tierName string) error

func (*APIService) RetryBatch added in v0.1.1

func (a *APIService) RetryBatch(ctx context.Context, batchID string, retryFailedOnly bool) error

func (*APIService) RetryJob added in v0.1.1

func (a *APIService) RetryJob(ctx context.Context, jobID string) error

func (*APIService) RetryWorkflow added in v0.1.1

func (a *APIService) RetryWorkflow(ctx context.Context, wfID string) error

func (*APIService) SetSyncRetryStatsFunc added in v0.1.1

func (a *APIService) SetSyncRetryStatsFunc(fn SyncRetryStatsFunc)

SetSyncRetryStatsFunc sets the function used to retrieve SyncRetrier stats.

func (*APIService) Shutdown added in v0.1.1

func (a *APIService) Shutdown()

Shutdown stops the WebSocket hub's Redis subscription.

func (*APIService) Store added in v0.1.1

func (a *APIService) Store() *store.RedisStore

Store returns the underlying Redis store.

func (*APIService) UpdateJobPayload added in v0.1.1

func (a *APIService) UpdateJobPayload(ctx context.Context, jobID string, payload json.RawMessage) error

type ApiError added in v0.1.1

type ApiError struct {
	Msg        string `json:"error"`
	StatusCode int    `json:"-"`
}

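ApiError is a service-layer error that carries an HTTP status code. Handlers can extract it with errors.As, or with a type assertion to *ApiError when the error is not wrapped, and then read StatusCode. StatusCode is excluded from JSON (`json:"-"`), so only Msg is serialized, under the "error" key.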

func (*ApiError) Error added in v0.1.1

func (e *ApiError) Error() string

type BulkRequest added in v0.1.1

type BulkRequest struct {
	// IDs specifies individual IDs to operate on.
	IDs []string `json:"ids"`
	// Status selects all entities in a given status (alternative to IDs).
	Status string `json:"status"`
}

BulkRequest is the common shape for bulk operation requests.

type DailyStatsEntry

type DailyStatsEntry struct {
	Date      string `json:"date"`
	Processed int64  `json:"processed"`
	Failed    int64  `json:"failed"`
}

DailyStatsEntry represents aggregated stats for one day.

type Dispatcher

type Dispatcher interface {
	Dispatch(ctx context.Context, job *types.Job, attempt int) error
	PublishEvent(event types.JobEvent)
}

Dispatcher abstracts job dispatch and event publishing for the monitor API. The v1 server (v1server) supplies *dispatcher.Dispatcher as the implementation; the v2 actor server supplies actorDispatcher.

type GRPCServer

type GRPCServer struct {
	pb.UnimplementedMonitorServiceServer
	// contains filtered or unexported fields
}

GRPCServer implements the MonitorService gRPC service.

func NewGRPCServer

func NewGRPCServer(s *store.RedisStore, disp Dispatcher, logger gochainedlog.Logger) *GRPCServer

NewGRPCServer creates a new gRPC monitor server backed by Redis.

func (*GRPCServer) BulkCancelBatches

BulkCancelBatches cancels multiple batches by ID or status filter.

func (*GRPCServer) BulkCancelJobs

BulkCancelJobs cancels multiple jobs by ID or status filter.

func (*GRPCServer) BulkCancelWorkflows

BulkCancelWorkflows cancels multiple workflows by ID or status filter.

func (*GRPCServer) BulkDeleteBatches

BulkDeleteBatches deletes multiple batches by ID or status filter.

func (*GRPCServer) BulkDeleteJobs

BulkDeleteJobs deletes multiple jobs by ID or status filter.

func (*GRPCServer) BulkDeleteWorkflows

BulkDeleteWorkflows deletes multiple workflows by ID or status filter.

func (*GRPCServer) BulkRetryBatches

BulkRetryBatches retries multiple batches by ID or status filter.

func (*GRPCServer) BulkRetryJobs

BulkRetryJobs retries multiple jobs by ID or status filter.

func (*GRPCServer) BulkRetryWorkflows

BulkRetryWorkflows retries multiple workflows by ID or status filter.

func (*GRPCServer) CancelBatch

CancelBatch cancels a batch and all its non-terminal items and child jobs.

func (*GRPCServer) CancelJob

func (g *GRPCServer) CancelJob(ctx context.Context, req *pb.CancelJobRequest) (*pb.CancelJobResponse, error)

CancelJob transitions a job to cancelled, removes its schedule, publishes cancel signals to in-flight workers, and emits a JobCancelled event.

func (*GRPCServer) CancelWorkflow

CancelWorkflow cancels a workflow and all its non-terminal tasks and child jobs.

func (*GRPCServer) DeleteJob

func (g *GRPCServer) DeleteJob(ctx context.Context, req *pb.DeleteJobRequest) (*pb.DeleteJobResponse, error)

DeleteJob removes a job and its schedule.

func (*GRPCServer) GetBatch

func (g *GRPCServer) GetBatch(ctx context.Context, req *pb.GetBatchRequest) (*pb.GetBatchResponse, error)

GetBatch retrieves a single batch instance by ID.

func (*GRPCServer) GetBatchItemResult

GetBatchItemResult retrieves a single batch item result.

func (*GRPCServer) GetBatchResults

GetBatchResults returns all item results for a batch.

func (*GRPCServer) GetDailyStats

GetDailyStats returns aggregated daily processing statistics.

func (*GRPCServer) GetJob

func (g *GRPCServer) GetJob(ctx context.Context, req *pb.GetJobRequest) (*pb.GetJobResponse, error)

GetJob returns a single job by ID.

func (*GRPCServer) GetNode

func (g *GRPCServer) GetNode(ctx context.Context, req *pb.GetNodeRequest) (*pb.GetNodeResponse, error)

GetNode returns a single node by ID.

func (*GRPCServer) GetRedisInfo

GetRedisInfo returns parsed Redis INFO output organized by section.

func (*GRPCServer) GetRun

func (g *GRPCServer) GetRun(ctx context.Context, req *pb.GetRunRequest) (*pb.GetRunResponse, error)

GetRun returns a single run by ID.

func (*GRPCServer) GetSchedule

GetSchedule returns a single schedule entry by job ID.

func (*GRPCServer) GetStats

GetStats returns overall system statistics: job counts by status and the numbers of active schedules, active runs, and active nodes.

func (*GRPCServer) GetSyncRetries

GetSyncRetries returns the current SyncRetrier statistics as an opaque JSON blob.

func (*GRPCServer) GetWorkflow

GetWorkflow retrieves a single workflow instance by ID.

func (*GRPCServer) Health

Health returns a simple health check response.

func (*GRPCServer) ListBatches

ListBatches returns paginated batch instances with an optional status filter.

func (*GRPCServer) ListDLQ

func (g *GRPCServer) ListDLQ(ctx context.Context, req *pb.ListDLQRequest) (*pb.ListDLQResponse, error)

ListDLQ returns paginated dead-letter queue messages.

func (*GRPCServer) ListGroups

ListGroups returns all active aggregation groups with their sizes.

func (*GRPCServer) ListHistoryEvents

ListHistoryEvents returns paginated historical events with an optional job_id filter.

func (*GRPCServer) ListHistoryRuns

ListHistoryRuns returns paginated historical runs with optional status and job_id filters.

func (*GRPCServer) ListJobEvents

ListJobEvents returns recent events for a specific job.

func (*GRPCServer) ListJobRuns

ListJobRuns returns recent runs for a specific job.

func (*GRPCServer) ListJobs

func (g *GRPCServer) ListJobs(ctx context.Context, req *pb.ListJobsRequest) (*pb.ListJobsResponse, error)

ListJobs returns a paginated list of jobs, optionally filtered by status, task type, tag, or search term.

func (*GRPCServer) ListNodes

ListNodes returns all registered server nodes.

func (*GRPCServer) ListQueues

ListQueues returns queue/tier information including pause state and size.

func (*GRPCServer) ListRuns

ListRuns returns all active runs.

func (*GRPCServer) ListSchedules

ListSchedules returns a paginated list of schedule entries.

func (*GRPCServer) ListWorkflows

ListWorkflows returns paginated workflow instances with an optional status filter.

func (*GRPCServer) PauseQueue

PauseQueue pauses a queue/tier by name.

func (*GRPCServer) RegisterServer

func (g *GRPCServer) RegisterServer(s *grpc.Server)

RegisterServer registers the MonitorService implementation with a gRPC server.

func (*GRPCServer) ResumeQueue

ResumeQueue resumes a paused queue/tier by name.

func (*GRPCServer) RetryBatch

RetryBatch resets a batch for a new execution attempt, optionally retrying only failed items.

func (*GRPCServer) RetryJob

func (g *GRPCServer) RetryJob(ctx context.Context, req *pb.RetryJobRequest) (*pb.RetryJobResponse, error)

RetryJob resets a job to pending, clears its attempt counter and error, dispatches it for immediate execution, and emits a JobRetrying event.

func (*GRPCServer) RetryWorkflow

RetryWorkflow resets a workflow and all its tasks for a new execution attempt.

func (*GRPCServer) SetSyncRetryStatsFunc

func (g *GRPCServer) SetSyncRetryStatsFunc(fn SyncRetryStatsFunc)

SetSyncRetryStatsFunc sets the function used to retrieve SyncRetrier stats.

func (*GRPCServer) UpdateJobPayload

UpdateJobPayload replaces the payload of a pending or scheduled job.

type GroupInfo added in v0.1.1

type GroupInfo struct {
	Name string `json:"name"`
	Size int64  `json:"size"`
}

GroupInfo describes an active aggregation group.

type Hub

type Hub interface {
	Register(c *WSClient)
	Unregister(c *WSClient)

	Broadcast(data []byte)
	ClientCount() int
	SubscribeRedis(ctx context.Context, rdb rueidis.Client, channel string)
}

Hub manages WebSocket client connections and broadcasts Redis Pub/Sub events. A single Redis subscription feeds all connected WebSocket clients (fan-out).
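The fan-out described above can be sketched as an in-memory hub. This is not the package's implementation.

- memHub is hypothetical and covers only Register, Unregister, Broadcast, and ClientCount.
- SubscribeRedis is omitted because it needs a rueidis client.
- Dropping messages for a client whose buffer is full is an assumption, as is the buffer size of 16.

```go
package main

import (
	"fmt"
	"sync"
)

// WSClient mirrors monitor.WSClient: each connection owns a buffered Send channel.
type WSClient struct {
	Send chan []byte
}

// newWSClient mirrors NewWSClient; the buffer size of 16 is an assumption.
func newWSClient() *WSClient { return &WSClient{Send: make(chan []byte, 16)} }

// memHub is a hypothetical in-memory hub: one Broadcast reaches every client.
type memHub struct {
	mu      sync.RWMutex
	clients map[*WSClient]struct{}
}

func newMemHub() *memHub { return &memHub{clients: make(map[*WSClient]struct{})} }

func (h *memHub) Register(c *WSClient) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.clients[c] = struct{}{}
}

func (h *memHub) Unregister(c *WSClient) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if _, ok := h.clients[c]; ok {
		delete(h.clients, c)
		close(c.Send) // signals the client's writer goroutine to stop
	}
}

// Broadcast never blocks: a client whose buffer is full misses this message.
// Holding the read lock prevents Unregister from closing a channel mid-send.
func (h *memHub) Broadcast(data []byte) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	for c := range h.clients {
		select {
		case c.Send <- data:
		default:
		}
	}
}

func (h *memHub) ClientCount() int {
	h.mu.RLock()
	defer h.mu.RUnlock()
	return len(h.clients)
}

func main() {
	h := newMemHub()
	a, b := newWSClient(), newWSClient()
	h.Register(a)
	h.Register(b)
	h.Broadcast([]byte(`{"event":"demo"}`))
	fmt.Println(h.ClientCount(), string(<-a.Send), string(<-b.Send))
	h.Unregister(a)
	fmt.Println(h.ClientCount()) // prints "1"
}
```

In this design, the single Redis subscription would call Broadcast once per Pub/Sub message. Each WebSocket connection's writer goroutine would drain its own client's Send channel.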

type QueueInfo

type QueueInfo struct {
	Name       string `json:"name"`
	Weight     int    `json:"weight"`
	FetchBatch int    `json:"fetch_batch"`
	Paused     bool   `json:"paused"`
	Size       int64  `json:"size"`
}

QueueInfo describes the state of a single queue/tier.

type StatsResponse added in v0.1.1

type StatsResponse struct {
	JobCounts       map[types.JobStatus]int `json:"job_counts"`
	ActiveSchedules int                     `json:"active_schedules"`
	ActiveRuns      int                     `json:"active_runs"`
	ActiveNodes     int                     `json:"active_nodes"`
}

StatsResponse contains cluster-wide statistics.

type SyncRetryStatsFunc

type SyncRetryStatsFunc func() any

SyncRetryStatsFunc is a function that returns the current SyncRetrier stats as a JSON-serializable value. Install one with SetSyncRetryStatsFunc on either APIService or GRPCServer.

type WSClient added in v0.1.1

type WSClient struct {
	Send chan []byte
}

WSClient represents a single WebSocket connection registered with the Hub.

func NewWSClient added in v0.1.1

func NewWSClient() *WSClient

NewWSClient creates a new WebSocket client with a buffered send channel.
