Documentation ¶
Index ¶
- type API
- type DailyStatsEntry
- type Dispatcher
- type GRPCServer
- func (g *GRPCServer) BulkCancelBatches(ctx context.Context, req *pb.BulkCancelBatchesRequest) (*pb.BulkCancelBatchesResponse, error)
- func (g *GRPCServer) BulkCancelJobs(ctx context.Context, req *pb.BulkCancelJobsRequest) (*pb.BulkCancelJobsResponse, error)
- func (g *GRPCServer) BulkCancelWorkflows(ctx context.Context, req *pb.BulkCancelWorkflowsRequest) (*pb.BulkCancelWorkflowsResponse, error)
- func (g *GRPCServer) BulkDeleteBatches(ctx context.Context, req *pb.BulkDeleteBatchesRequest) (*pb.BulkDeleteBatchesResponse, error)
- func (g *GRPCServer) BulkDeleteJobs(ctx context.Context, req *pb.BulkDeleteJobsRequest) (*pb.BulkDeleteJobsResponse, error)
- func (g *GRPCServer) BulkDeleteWorkflows(ctx context.Context, req *pb.BulkDeleteWorkflowsRequest) (*pb.BulkDeleteWorkflowsResponse, error)
- func (g *GRPCServer) BulkRetryBatches(ctx context.Context, req *pb.BulkRetryBatchesRequest) (*pb.BulkRetryBatchesResponse, error)
- func (g *GRPCServer) BulkRetryJobs(ctx context.Context, req *pb.BulkRetryJobsRequest) (*pb.BulkRetryJobsResponse, error)
- func (g *GRPCServer) BulkRetryWorkflows(ctx context.Context, req *pb.BulkRetryWorkflowsRequest) (*pb.BulkRetryWorkflowsResponse, error)
- func (g *GRPCServer) CancelBatch(ctx context.Context, req *pb.CancelBatchRequest) (*pb.CancelBatchResponse, error)
- func (g *GRPCServer) CancelJob(ctx context.Context, req *pb.CancelJobRequest) (*pb.CancelJobResponse, error)
- func (g *GRPCServer) CancelWorkflow(ctx context.Context, req *pb.CancelWorkflowRequest) (*pb.CancelWorkflowResponse, error)
- func (g *GRPCServer) DeleteJob(ctx context.Context, req *pb.DeleteJobRequest) (*pb.DeleteJobResponse, error)
- func (g *GRPCServer) GetBatch(ctx context.Context, req *pb.GetBatchRequest) (*pb.GetBatchResponse, error)
- func (g *GRPCServer) GetBatchItemResult(ctx context.Context, req *pb.GetBatchItemResultRequest) (*pb.GetBatchItemResultResponse, error)
- func (g *GRPCServer) GetBatchResults(ctx context.Context, req *pb.GetBatchResultsRequest) (*pb.GetBatchResultsResponse, error)
- func (g *GRPCServer) GetDailyStats(ctx context.Context, req *pb.GetDailyStatsRequest) (*pb.GetDailyStatsResponse, error)
- func (g *GRPCServer) GetJob(ctx context.Context, req *pb.GetJobRequest) (*pb.GetJobResponse, error)
- func (g *GRPCServer) GetNode(ctx context.Context, req *pb.GetNodeRequest) (*pb.GetNodeResponse, error)
- func (g *GRPCServer) GetRedisInfo(ctx context.Context, req *pb.GetRedisInfoRequest) (*pb.GetRedisInfoResponse, error)
- func (g *GRPCServer) GetRun(ctx context.Context, req *pb.GetRunRequest) (*pb.GetRunResponse, error)
- func (g *GRPCServer) GetSchedule(ctx context.Context, req *pb.GetScheduleRequest) (*pb.GetScheduleResponse, error)
- func (g *GRPCServer) GetStats(ctx context.Context, _ *pb.GetStatsRequest) (*pb.GetStatsResponse, error)
- func (g *GRPCServer) GetSyncRetries(_ context.Context, _ *pb.GetSyncRetriesRequest) (*pb.GetSyncRetriesResponse, error)
- func (g *GRPCServer) GetWorkflow(ctx context.Context, req *pb.GetWorkflowRequest) (*pb.GetWorkflowResponse, error)
- func (g *GRPCServer) Health(_ context.Context, _ *pb.HealthRequest) (*pb.HealthResponse, error)
- func (g *GRPCServer) ListBatches(ctx context.Context, req *pb.ListBatchesRequest) (*pb.ListBatchesResponse, error)
- func (g *GRPCServer) ListDLQ(ctx context.Context, req *pb.ListDLQRequest) (*pb.ListDLQResponse, error)
- func (g *GRPCServer) ListGroups(ctx context.Context, _ *pb.ListGroupsRequest) (*pb.ListGroupsResponse, error)
- func (g *GRPCServer) ListHistoryEvents(ctx context.Context, req *pb.ListHistoryEventsRequest) (*pb.ListHistoryEventsResponse, error)
- func (g *GRPCServer) ListHistoryRuns(ctx context.Context, req *pb.ListHistoryRunsRequest) (*pb.ListHistoryRunsResponse, error)
- func (g *GRPCServer) ListJobEvents(ctx context.Context, req *pb.ListJobEventsRequest) (*pb.ListJobEventsResponse, error)
- func (g *GRPCServer) ListJobRuns(ctx context.Context, req *pb.ListJobRunsRequest) (*pb.ListJobRunsResponse, error)
- func (g *GRPCServer) ListJobs(ctx context.Context, req *pb.ListJobsRequest) (*pb.ListJobsResponse, error)
- func (g *GRPCServer) ListNodes(ctx context.Context, _ *pb.ListNodesRequest) (*pb.ListNodesResponse, error)
- func (g *GRPCServer) ListQueues(ctx context.Context, _ *pb.ListQueuesRequest) (*pb.ListQueuesResponse, error)
- func (g *GRPCServer) ListRuns(ctx context.Context, _ *pb.ListRunsRequest) (*pb.ListRunsResponse, error)
- func (g *GRPCServer) ListSchedules(ctx context.Context, req *pb.ListSchedulesRequest) (*pb.ListSchedulesResponse, error)
- func (g *GRPCServer) ListWorkflows(ctx context.Context, req *pb.ListWorkflowsRequest) (*pb.ListWorkflowsResponse, error)
- func (g *GRPCServer) PauseQueue(ctx context.Context, req *pb.PauseQueueRequest) (*pb.PauseQueueResponse, error)
- func (g *GRPCServer) RegisterServer(s *grpc.Server)
- func (g *GRPCServer) ResumeQueue(ctx context.Context, req *pb.ResumeQueueRequest) (*pb.ResumeQueueResponse, error)
- func (g *GRPCServer) RetryBatch(ctx context.Context, req *pb.RetryBatchRequest) (*pb.RetryBatchResponse, error)
- func (g *GRPCServer) RetryJob(ctx context.Context, req *pb.RetryJobRequest) (*pb.RetryJobResponse, error)
- func (g *GRPCServer) RetryWorkflow(ctx context.Context, req *pb.RetryWorkflowRequest) (*pb.RetryWorkflowResponse, error)
- func (g *GRPCServer) SetSyncRetryStatsFunc(fn SyncRetryStatsFunc)
- func (g *GRPCServer) UpdateJobPayload(ctx context.Context, req *pb.UpdateJobPayloadRequest) (*pb.UpdateJobPayloadResponse, error)
- type Hub
- type QueueInfo
- type SyncRetryStatsFunc
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type API ¶
type API struct {
// contains filtered or unexported fields
}
API serves the monitoring HTTP endpoints for the React UI.
func NewAPI ¶
func NewAPI(s *store.RedisStore, disp Dispatcher, logger gochainedlog.Logger) *API
NewAPI creates a new monitoring API backed by Redis.
func (*API) SetSyncRetryStatsFunc ¶
func (a *API) SetSyncRetryStatsFunc(fn SyncRetryStatsFunc)
SetSyncRetryStatsFunc sets the function used to retrieve SyncRetrier stats.
type DailyStatsEntry ¶
type DailyStatsEntry struct {
Date string `json:"date"`
Processed int64 `json:"processed"`
Failed int64 `json:"failed"`
}
DailyStatsEntry represents aggregated stats for one day.
type Dispatcher ¶
type Dispatcher interface {
Dispatch(ctx context.Context, job *types.Job, attempt int) error
PublishEvent(event types.JobEvent)
}
Dispatcher abstracts job dispatch and event publishing for the monitor API. Implementations: v1server supplies *dispatcher.Dispatcher, and the v2 actor server supplies actorDispatcher.
type GRPCServer ¶
type GRPCServer struct {
pb.UnimplementedMonitorServiceServer
// contains filtered or unexported fields
}
GRPCServer implements the MonitorService gRPC service.
func NewGRPCServer ¶
func NewGRPCServer(s *store.RedisStore, disp Dispatcher, logger gochainedlog.Logger) *GRPCServer
NewGRPCServer creates a new gRPC monitor server backed by Redis.
func (*GRPCServer) BulkCancelBatches ¶
func (g *GRPCServer) BulkCancelBatches(ctx context.Context, req *pb.BulkCancelBatchesRequest) (*pb.BulkCancelBatchesResponse, error)
BulkCancelBatches cancels multiple batches by ID or status filter.
func (*GRPCServer) BulkCancelJobs ¶
func (g *GRPCServer) BulkCancelJobs(ctx context.Context, req *pb.BulkCancelJobsRequest) (*pb.BulkCancelJobsResponse, error)
BulkCancelJobs cancels multiple jobs by ID or status filter.
func (*GRPCServer) BulkCancelWorkflows ¶
func (g *GRPCServer) BulkCancelWorkflows(ctx context.Context, req *pb.BulkCancelWorkflowsRequest) (*pb.BulkCancelWorkflowsResponse, error)
BulkCancelWorkflows cancels multiple workflows by ID or status filter.
func (*GRPCServer) BulkDeleteBatches ¶
func (g *GRPCServer) BulkDeleteBatches(ctx context.Context, req *pb.BulkDeleteBatchesRequest) (*pb.BulkDeleteBatchesResponse, error)
BulkDeleteBatches deletes multiple batches by ID or status filter.
func (*GRPCServer) BulkDeleteJobs ¶
func (g *GRPCServer) BulkDeleteJobs(ctx context.Context, req *pb.BulkDeleteJobsRequest) (*pb.BulkDeleteJobsResponse, error)
BulkDeleteJobs deletes multiple jobs by ID or status filter.
func (*GRPCServer) BulkDeleteWorkflows ¶
func (g *GRPCServer) BulkDeleteWorkflows(ctx context.Context, req *pb.BulkDeleteWorkflowsRequest) (*pb.BulkDeleteWorkflowsResponse, error)
BulkDeleteWorkflows deletes multiple workflows by ID or status filter.
func (*GRPCServer) BulkRetryBatches ¶
func (g *GRPCServer) BulkRetryBatches(ctx context.Context, req *pb.BulkRetryBatchesRequest) (*pb.BulkRetryBatchesResponse, error)
BulkRetryBatches retries multiple batches by ID or status filter.
func (*GRPCServer) BulkRetryJobs ¶
func (g *GRPCServer) BulkRetryJobs(ctx context.Context, req *pb.BulkRetryJobsRequest) (*pb.BulkRetryJobsResponse, error)
BulkRetryJobs retries multiple jobs by ID or status filter.
func (*GRPCServer) BulkRetryWorkflows ¶
func (g *GRPCServer) BulkRetryWorkflows(ctx context.Context, req *pb.BulkRetryWorkflowsRequest) (*pb.BulkRetryWorkflowsResponse, error)
BulkRetryWorkflows retries multiple workflows by ID or status filter.
func (*GRPCServer) CancelBatch ¶
func (g *GRPCServer) CancelBatch(ctx context.Context, req *pb.CancelBatchRequest) (*pb.CancelBatchResponse, error)
CancelBatch cancels a batch and all its non-terminal items and child jobs.
func (*GRPCServer) CancelJob ¶
func (g *GRPCServer) CancelJob(ctx context.Context, req *pb.CancelJobRequest) (*pb.CancelJobResponse, error)
CancelJob transitions a job to cancelled, removes its schedule, publishes cancel signals to in-flight workers, and emits a JobCancelled event.
func (*GRPCServer) CancelWorkflow ¶
func (g *GRPCServer) CancelWorkflow(ctx context.Context, req *pb.CancelWorkflowRequest) (*pb.CancelWorkflowResponse, error)
CancelWorkflow cancels a workflow and all its non-terminal tasks and child jobs.
func (*GRPCServer) DeleteJob ¶
func (g *GRPCServer) DeleteJob(ctx context.Context, req *pb.DeleteJobRequest) (*pb.DeleteJobResponse, error)
DeleteJob removes a job and its schedule.
func (*GRPCServer) GetBatch ¶
func (g *GRPCServer) GetBatch(ctx context.Context, req *pb.GetBatchRequest) (*pb.GetBatchResponse, error)
GetBatch retrieves a single batch instance by ID.
func (*GRPCServer) GetBatchItemResult ¶
func (g *GRPCServer) GetBatchItemResult(ctx context.Context, req *pb.GetBatchItemResultRequest) (*pb.GetBatchItemResultResponse, error)
GetBatchItemResult retrieves a single batch item result.
func (*GRPCServer) GetBatchResults ¶
func (g *GRPCServer) GetBatchResults(ctx context.Context, req *pb.GetBatchResultsRequest) (*pb.GetBatchResultsResponse, error)
GetBatchResults returns all item results for a batch.
func (*GRPCServer) GetDailyStats ¶
func (g *GRPCServer) GetDailyStats(ctx context.Context, req *pb.GetDailyStatsRequest) (*pb.GetDailyStatsResponse, error)
GetDailyStats returns aggregated daily processing statistics.
func (*GRPCServer) GetJob ¶
func (g *GRPCServer) GetJob(ctx context.Context, req *pb.GetJobRequest) (*pb.GetJobResponse, error)
GetJob returns a single job by ID.
func (*GRPCServer) GetNode ¶
func (g *GRPCServer) GetNode(ctx context.Context, req *pb.GetNodeRequest) (*pb.GetNodeResponse, error)
GetNode returns a single node by ID.
func (*GRPCServer) GetRedisInfo ¶
func (g *GRPCServer) GetRedisInfo(ctx context.Context, req *pb.GetRedisInfoRequest) (*pb.GetRedisInfoResponse, error)
GetRedisInfo returns parsed Redis INFO output organized by section.
func (*GRPCServer) GetRun ¶
func (g *GRPCServer) GetRun(ctx context.Context, req *pb.GetRunRequest) (*pb.GetRunResponse, error)
GetRun returns a single run by ID.
func (*GRPCServer) GetSchedule ¶
func (g *GRPCServer) GetSchedule(ctx context.Context, req *pb.GetScheduleRequest) (*pb.GetScheduleResponse, error)
GetSchedule returns a single schedule entry by job ID.
func (*GRPCServer) GetStats ¶
func (g *GRPCServer) GetStats(ctx context.Context, _ *pb.GetStatsRequest) (*pb.GetStatsResponse, error)
GetStats returns overall system statistics including job counts, active schedules, runs, and nodes.
func (*GRPCServer) GetSyncRetries ¶
func (g *GRPCServer) GetSyncRetries(_ context.Context, _ *pb.GetSyncRetriesRequest) (*pb.GetSyncRetriesResponse, error)
GetSyncRetries returns the current SyncRetrier statistics as an opaque JSON blob.
func (*GRPCServer) GetWorkflow ¶
func (g *GRPCServer) GetWorkflow(ctx context.Context, req *pb.GetWorkflowRequest) (*pb.GetWorkflowResponse, error)
GetWorkflow retrieves a single workflow instance by ID.
func (*GRPCServer) Health ¶
func (g *GRPCServer) Health(_ context.Context, _ *pb.HealthRequest) (*pb.HealthResponse, error)
Health returns a simple health check response.
func (*GRPCServer) ListBatches ¶
func (g *GRPCServer) ListBatches(ctx context.Context, req *pb.ListBatchesRequest) (*pb.ListBatchesResponse, error)
ListBatches returns paginated batch instances with optional status filter.
func (*GRPCServer) ListDLQ ¶
func (g *GRPCServer) ListDLQ(ctx context.Context, req *pb.ListDLQRequest) (*pb.ListDLQResponse, error)
ListDLQ returns paginated dead-letter queue messages.
func (*GRPCServer) ListGroups ¶
func (g *GRPCServer) ListGroups(ctx context.Context, _ *pb.ListGroupsRequest) (*pb.ListGroupsResponse, error)
ListGroups returns all active aggregation groups with their sizes.
func (*GRPCServer) ListHistoryEvents ¶
func (g *GRPCServer) ListHistoryEvents(ctx context.Context, req *pb.ListHistoryEventsRequest) (*pb.ListHistoryEventsResponse, error)
ListHistoryEvents returns paginated historical events with optional job_id filter.
func (*GRPCServer) ListHistoryRuns ¶
func (g *GRPCServer) ListHistoryRuns(ctx context.Context, req *pb.ListHistoryRunsRequest) (*pb.ListHistoryRunsResponse, error)
ListHistoryRuns returns paginated historical runs with optional status and job_id filters.
func (*GRPCServer) ListJobEvents ¶
func (g *GRPCServer) ListJobEvents(ctx context.Context, req *pb.ListJobEventsRequest) (*pb.ListJobEventsResponse, error)
ListJobEvents returns recent events for a specific job.
func (*GRPCServer) ListJobRuns ¶
func (g *GRPCServer) ListJobRuns(ctx context.Context, req *pb.ListJobRunsRequest) (*pb.ListJobRunsResponse, error)
ListJobRuns returns recent runs for a specific job.
func (*GRPCServer) ListJobs ¶
func (g *GRPCServer) ListJobs(ctx context.Context, req *pb.ListJobsRequest) (*pb.ListJobsResponse, error)
ListJobs returns a paginated list of jobs, optionally filtered by status, task type, tag, or search term.
func (*GRPCServer) ListNodes ¶
func (g *GRPCServer) ListNodes(ctx context.Context, _ *pb.ListNodesRequest) (*pb.ListNodesResponse, error)
ListNodes returns all registered server nodes.
func (*GRPCServer) ListQueues ¶
func (g *GRPCServer) ListQueues(ctx context.Context, _ *pb.ListQueuesRequest) (*pb.ListQueuesResponse, error)
ListQueues returns queue/tier information including pause state and size.
func (*GRPCServer) ListRuns ¶
func (g *GRPCServer) ListRuns(ctx context.Context, _ *pb.ListRunsRequest) (*pb.ListRunsResponse, error)
ListRuns returns all active runs.
func (*GRPCServer) ListSchedules ¶
func (g *GRPCServer) ListSchedules(ctx context.Context, req *pb.ListSchedulesRequest) (*pb.ListSchedulesResponse, error)
ListSchedules returns a paginated list of schedule entries.
func (*GRPCServer) ListWorkflows ¶
func (g *GRPCServer) ListWorkflows(ctx context.Context, req *pb.ListWorkflowsRequest) (*pb.ListWorkflowsResponse, error)
ListWorkflows returns paginated workflow instances with optional status filter.
func (*GRPCServer) PauseQueue ¶
func (g *GRPCServer) PauseQueue(ctx context.Context, req *pb.PauseQueueRequest) (*pb.PauseQueueResponse, error)
PauseQueue pauses a queue/tier by name.
func (*GRPCServer) RegisterServer ¶
func (g *GRPCServer) RegisterServer(s *grpc.Server)
RegisterServer registers the MonitorService implementation with a gRPC server.
func (*GRPCServer) ResumeQueue ¶
func (g *GRPCServer) ResumeQueue(ctx context.Context, req *pb.ResumeQueueRequest) (*pb.ResumeQueueResponse, error)
ResumeQueue resumes a paused queue/tier by name.
func (*GRPCServer) RetryBatch ¶
func (g *GRPCServer) RetryBatch(ctx context.Context, req *pb.RetryBatchRequest) (*pb.RetryBatchResponse, error)
RetryBatch resets a batch for a new execution attempt, optionally retrying only failed items.
func (*GRPCServer) RetryJob ¶
func (g *GRPCServer) RetryJob(ctx context.Context, req *pb.RetryJobRequest) (*pb.RetryJobResponse, error)
RetryJob resets a job to pending, clears its attempt counter and error, dispatches it for immediate execution, and emits a JobRetrying event.
func (*GRPCServer) RetryWorkflow ¶
func (g *GRPCServer) RetryWorkflow(ctx context.Context, req *pb.RetryWorkflowRequest) (*pb.RetryWorkflowResponse, error)
RetryWorkflow resets a workflow and all its tasks for a new execution attempt.
func (*GRPCServer) SetSyncRetryStatsFunc ¶
func (g *GRPCServer) SetSyncRetryStatsFunc(fn SyncRetryStatsFunc)
SetSyncRetryStatsFunc sets the function used to retrieve SyncRetrier stats.
func (*GRPCServer) UpdateJobPayload ¶
func (g *GRPCServer) UpdateJobPayload(ctx context.Context, req *pb.UpdateJobPayloadRequest) (*pb.UpdateJobPayloadResponse, error)
UpdateJobPayload replaces the payload of a pending or scheduled job.
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub manages WebSocket client connections and broadcasts Redis Pub/Sub events. A single Redis subscription feeds all connected WebSocket clients (fan-out).
type QueueInfo ¶
type QueueInfo struct {
Name string `json:"name"`
Weight int `json:"weight"`
FetchBatch int `json:"fetch_batch"`
Paused bool `json:"paused"`
Size int64 `json:"size"`
}
QueueInfo describes the state of a single queue/tier.
type SyncRetryStatsFunc ¶
type SyncRetryStatsFunc func() any
SyncRetryStatsFunc is a function that returns the current SyncRetrier stats as a JSON-serializable value. It is installed by calling SetSyncRetryStatsFunc on an API or a GRPCServer.