monitor

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 24, 2026 License: Apache-2.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type API

type API struct {
	// contains filtered or unexported fields
}

API serves the monitoring HTTP endpoints for the React UI.

func NewAPI

func NewAPI(s *store.RedisStore, disp Dispatcher, logger gochainedlog.Logger) *API

NewAPI creates a new monitoring API backed by Redis.

func (*API) Handler

func (a *API) Handler() http.Handler

Handler returns the HTTP handler for mounting in a server.

func (*API) SetSyncRetryStatsFunc

func (a *API) SetSyncRetryStatsFunc(fn SyncRetryStatsFunc)

SetSyncRetryStatsFunc sets the function used to retrieve SyncRetrier stats.

func (*API) Shutdown

func (a *API) Shutdown()

Shutdown stops the WebSocket hub's Redis subscription.

type DailyStatsEntry

type DailyStatsEntry struct {
	Date      string `json:"date"`
	Processed int64  `json:"processed"`
	Failed    int64  `json:"failed"`
}

DailyStatsEntry represents aggregated stats for one day.

type Dispatcher

type Dispatcher interface {
	Dispatch(ctx context.Context, job *types.Job, attempt int) error
	PublishEvent(event types.JobEvent)
}

Dispatcher abstracts job dispatch and event publishing for the monitor API. v1server provides *dispatcher.Dispatcher; v2 actor server provides actorDispatcher.

type GRPCServer

type GRPCServer struct {
	pb.UnimplementedMonitorServiceServer
	// contains filtered or unexported fields
}

GRPCServer implements the MonitorService gRPC service.

func NewGRPCServer

func NewGRPCServer(s *store.RedisStore, disp Dispatcher, logger gochainedlog.Logger) *GRPCServer

NewGRPCServer creates a new gRPC monitor server backed by Redis.

func (*GRPCServer) BulkCancelBatches

BulkCancelBatches cancels multiple batches by ID or status filter.

func (*GRPCServer) BulkCancelJobs

BulkCancelJobs cancels multiple jobs by ID or status filter.

func (*GRPCServer) BulkCancelWorkflows

BulkCancelWorkflows cancels multiple workflows by ID or status filter.

func (*GRPCServer) BulkDeleteBatches

BulkDeleteBatches deletes multiple batches by ID or status filter.

func (*GRPCServer) BulkDeleteJobs

BulkDeleteJobs deletes multiple jobs by ID or status filter.

func (*GRPCServer) BulkDeleteWorkflows

BulkDeleteWorkflows deletes multiple workflows by ID or status filter.

func (*GRPCServer) BulkRetryBatches

BulkRetryBatches retries multiple batches by ID or status filter.

func (*GRPCServer) BulkRetryJobs

BulkRetryJobs retries multiple jobs by ID or status filter.

func (*GRPCServer) BulkRetryWorkflows

BulkRetryWorkflows retries multiple workflows by ID or status filter.

func (*GRPCServer) CancelBatch

CancelBatch cancels a batch and all its non-terminal items and child jobs.

func (*GRPCServer) CancelJob

func (g *GRPCServer) CancelJob(ctx context.Context, req *pb.CancelJobRequest) (*pb.CancelJobResponse, error)

CancelJob transitions a job to cancelled, removes its schedule, publishes cancel signals to in-flight workers, and emits a JobCancelled event.

func (*GRPCServer) CancelWorkflow

CancelWorkflow cancels a workflow and all its non-terminal tasks and child jobs.

func (*GRPCServer) DeleteJob

func (g *GRPCServer) DeleteJob(ctx context.Context, req *pb.DeleteJobRequest) (*pb.DeleteJobResponse, error)

DeleteJob removes a job and its schedule.

func (*GRPCServer) GetBatch

func (g *GRPCServer) GetBatch(ctx context.Context, req *pb.GetBatchRequest) (*pb.GetBatchResponse, error)

GetBatch retrieves a single batch instance by ID.

func (*GRPCServer) GetBatchItemResult

GetBatchItemResult retrieves a single batch item result.

func (*GRPCServer) GetBatchResults

GetBatchResults returns all item results for a batch.

func (*GRPCServer) GetDailyStats

GetDailyStats returns aggregated daily processing statistics.

func (*GRPCServer) GetJob

func (g *GRPCServer) GetJob(ctx context.Context, req *pb.GetJobRequest) (*pb.GetJobResponse, error)

GetJob returns a single job by ID.

func (*GRPCServer) GetNode

func (g *GRPCServer) GetNode(ctx context.Context, req *pb.GetNodeRequest) (*pb.GetNodeResponse, error)

GetNode returns a single node by ID.

func (*GRPCServer) GetRedisInfo

GetRedisInfo returns parsed Redis INFO output organized by section.

func (*GRPCServer) GetRun

func (g *GRPCServer) GetRun(ctx context.Context, req *pb.GetRunRequest) (*pb.GetRunResponse, error)

GetRun returns a single run by ID.

func (*GRPCServer) GetSchedule

GetSchedule returns a single schedule entry by job ID.

func (*GRPCServer) GetStats

GetStats returns overall system statistics including job counts, active schedules, runs, and nodes.

func (*GRPCServer) GetSyncRetries

GetSyncRetries returns the current SyncRetrier statistics as an opaque JSON blob.

func (*GRPCServer) GetWorkflow

GetWorkflow retrieves a single workflow instance by ID.

func (*GRPCServer) Health

Health returns a simple health check response.

func (*GRPCServer) ListBatches

ListBatches returns paginated batch instances with optional status filter.

func (*GRPCServer) ListDLQ

func (g *GRPCServer) ListDLQ(ctx context.Context, req *pb.ListDLQRequest) (*pb.ListDLQResponse, error)

ListDLQ returns paginated dead-letter queue messages.

func (*GRPCServer) ListGroups

ListGroups returns all active aggregation groups with their sizes.

func (*GRPCServer) ListHistoryEvents

ListHistoryEvents returns paginated historical events with optional job_id filter.

func (*GRPCServer) ListHistoryRuns

ListHistoryRuns returns paginated historical runs with optional status and job_id filters.

func (*GRPCServer) ListJobEvents

ListJobEvents returns recent events for a specific job.

func (*GRPCServer) ListJobRuns

ListJobRuns returns recent runs for a specific job.

func (*GRPCServer) ListJobs

func (g *GRPCServer) ListJobs(ctx context.Context, req *pb.ListJobsRequest) (*pb.ListJobsResponse, error)

ListJobs returns a paginated list of jobs, optionally filtered by status, task type, tag, or search term.

func (*GRPCServer) ListNodes

ListNodes returns all registered server nodes.

func (*GRPCServer) ListQueues

ListQueues returns queue/tier information including pause state and size.

func (*GRPCServer) ListRuns

ListRuns returns all active runs.

func (*GRPCServer) ListSchedules

ListSchedules returns a paginated list of schedule entries.

func (*GRPCServer) ListWorkflows

ListWorkflows returns paginated workflow instances with optional status filter.

func (*GRPCServer) PauseQueue

PauseQueue pauses a queue/tier by name.

func (*GRPCServer) RegisterServer

func (g *GRPCServer) RegisterServer(s *grpc.Server)

RegisterServer registers the MonitorService implementation with a gRPC server.

func (*GRPCServer) ResumeQueue

ResumeQueue resumes a paused queue/tier by name.

func (*GRPCServer) RetryBatch

RetryBatch resets a batch for a new execution attempt, optionally retrying only failed items.

func (*GRPCServer) RetryJob

func (g *GRPCServer) RetryJob(ctx context.Context, req *pb.RetryJobRequest) (*pb.RetryJobResponse, error)

RetryJob resets a job to pending, clears its attempt counter and error, dispatches it for immediate execution, and emits a JobRetrying event.

func (*GRPCServer) RetryWorkflow

RetryWorkflow resets a workflow and all its tasks for a new execution attempt.

func (*GRPCServer) SetSyncRetryStatsFunc

func (g *GRPCServer) SetSyncRetryStatsFunc(fn SyncRetryStatsFunc)

SetSyncRetryStatsFunc sets the function used to retrieve SyncRetrier stats.

func (*GRPCServer) UpdateJobPayload

UpdateJobPayload replaces the payload of a pending or scheduled job.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub manages WebSocket client connections and broadcasts Redis Pub/Sub events. A single Redis subscription feeds all connected WebSocket clients (fan-out).

type QueueInfo

type QueueInfo struct {
	Name       string `json:"name"`
	Weight     int    `json:"weight"`
	FetchBatch int    `json:"fetch_batch"`
	Paused     bool   `json:"paused"`
	Size       int64  `json:"size"`
}

QueueInfo describes the state of a single queue/tier.

type SyncRetryStatsFunc

type SyncRetryStatsFunc func() any

SyncRetryStatsFunc is a function that returns the current SyncRetrier stats as a JSON-serializable value. Set via SetSyncRetryStatsFunc.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL