Documentation
¶
Index ¶
- func HandleWebSocket(hub Hub, logger gochainedlog.Logger) http.HandlerFunc
- type API
- type APIService
- func (a *APIService) BulkCancelBatches(ctx context.Context, req BulkRequest) (*BulkResult, error)
- func (a *APIService) BulkCancelJobs(ctx context.Context, req BulkRequest) (*BulkResult, error)
- func (a *APIService) BulkCancelWorkflows(ctx context.Context, req BulkRequest) (*BulkResult, error)
- func (a *APIService) BulkDeleteBatches(ctx context.Context, req BulkRequest) (*BulkResult, error)
- func (a *APIService) BulkDeleteJobs(ctx context.Context, req BulkRequest) (*BulkResult, error)
- func (a *APIService) BulkDeleteWorkflows(ctx context.Context, req BulkRequest) (*BulkResult, error)
- func (a *APIService) BulkRetryBatches(ctx context.Context, req BulkRequest) (*BulkResult, error)
- func (a *APIService) BulkRetryJobs(ctx context.Context, req BulkRequest) (*BulkResult, error)
- func (a *APIService) BulkRetryWorkflows(ctx context.Context, req BulkRequest) (*BulkResult, error)
- func (a *APIService) CancelBatch(ctx context.Context, batchID string) error
- func (a *APIService) CancelJob(ctx context.Context, jobID string) error
- func (a *APIService) CancelWorkflow(ctx context.Context, wfID string) error
- func (a *APIService) CheckUniqueKey(ctx context.Context, key string) (*UniqueKeyResult, error)
- func (a *APIService) DeleteJob(ctx context.Context, jobID string) error
- func (a *APIService) DeleteUniqueKey(ctx context.Context, key string) error
- func (a *APIService) GetBatch(ctx context.Context, batchID string) (*types.BatchInstance, error)
- func (a *APIService) GetBatchItemResult(ctx context.Context, batchID, itemID string) (*types.BatchItemResult, error)
- func (a *APIService) GetBatchResults(ctx context.Context, batchID string) ([]*types.BatchItemResult, error)
- func (a *APIService) GetDailyStats(ctx context.Context, days int) ([]DailyStatsEntry, error)
- func (a *APIService) GetJob(ctx context.Context, jobID string) (*types.Job, error)
- func (a *APIService) GetJobActiveRuns(ctx context.Context, jobID string) ([]*types.JobRun, error)
- func (a *APIService) GetJobAuditCounts(ctx context.Context, jobIDs []string) (map[string]int64, error)
- func (a *APIService) GetJobAuditTrail(ctx context.Context, jobID string) ([]store.AuditEntry, error)
- func (a *APIService) GetNode(ctx context.Context, nodeID string) (*types.NodeInfo, error)
- func (a *APIService) GetRedisInfo(ctx context.Context, section string) (*RedisInfoResponse, error)
- func (a *APIService) GetRun(ctx context.Context, runID string) (*types.JobRun, error)
- func (a *APIService) GetSchedule(ctx context.Context, jobID string) (*types.ScheduleEntry, error)
- func (a *APIService) GetStats(ctx context.Context) (*StatsResponse, error)
- func (a *APIService) GetSyncRetries() any
- func (a *APIService) GetWorkflow(ctx context.Context, wfID string) (*types.WorkflowInstance, error)
- func (a *APIService) GetWorkflowAuditTrail(ctx context.Context, wfID string) ([]WorkflowAuditEntry, error)
- func (a *APIService) GetWorkflowTaskResult(ctx context.Context, wfID, taskName string) (json.RawMessage, error)
- func (a *APIService) Hub() Hub
- func (a *APIService) IsNodeDraining(ctx context.Context, nodeID string) (bool, error)
- func (a *APIService) ListActiveRuns(ctx context.Context) ([]*types.JobRun, error)
- func (a *APIService) ListBatches(ctx context.Context, filter store.BatchFilter) ([]types.BatchInstance, int, error)
- func (a *APIService) ListDLQ(ctx context.Context, limit, offset int) ([]*types.WorkMessage, int, error)
- func (a *APIService) ListGroups(ctx context.Context) ([]GroupInfo, error)
- func (a *APIService) ListHistoryEvents(ctx context.Context, filter store.EventFilter) ([]types.JobEvent, int, error)
- func (a *APIService) ListHistoryRuns(ctx context.Context, filter store.RunFilter) ([]types.JobRun, int, error)
- func (a *APIService) ListJobEvents(ctx context.Context, filter store.EventFilter) ([]types.JobEvent, int, error)
- func (a *APIService) ListJobRuns(ctx context.Context, filter store.RunFilter) ([]types.JobRun, int, error)
- func (a *APIService) ListJobs(ctx context.Context, filter store.JobFilter) ([]types.Job, int, error)
- func (a *APIService) ListNodes(ctx context.Context) ([]*types.NodeInfo, error)
- func (a *APIService) ListQueues(ctx context.Context) ([]QueueInfo, error)
- func (a *APIService) ListSchedules(ctx context.Context, filter store.ScheduleFilter) ([]types.ScheduleEntry, int, error)
- func (a *APIService) ListWorkflows(ctx context.Context, filter store.WorkflowFilter) ([]types.WorkflowInstance, int, error)
- func (a *APIService) Logger() gochainedlog.Logger
- func (a *APIService) PauseQueue(ctx context.Context, tierName string) error
- func (a *APIService) ResumeQueue(ctx context.Context, tierName string) error
- func (a *APIService) ResumeWorkflow(ctx context.Context, wfID string) error
- func (a *APIService) RetryBatch(ctx context.Context, batchID string, retryFailedOnly bool) error
- func (a *APIService) RetryJob(ctx context.Context, jobID string) error
- func (a *APIService) RetryWorkflow(ctx context.Context, wfID string) error
- func (a *APIService) SearchBatchesByPayload(ctx context.Context, path, value string) (*types.BatchInstance, error)
- func (a *APIService) SearchJobsByPayload(ctx context.Context, path, value, taskType string) (*types.Job, error)
- func (a *APIService) SearchWorkflowsByPayload(ctx context.Context, path, value string) (*types.WorkflowInstance, error)
- func (a *APIService) SetNodeDrain(ctx context.Context, nodeID string, drain bool) error
- func (a *APIService) SetSyncRetryStatsFunc(fn SyncRetryStatsFunc)
- func (a *APIService) Shutdown()
- func (a *APIService) Store() *store.RedisStore
- func (a *APIService) SuspendWorkflow(ctx context.Context, wfID string) error
- func (a *APIService) UpdateJobPayload(ctx context.Context, jobID string, payload json.RawMessage) error
- func (a *APIService) ValidateWorkflowDefinition(_ context.Context, def *types.WorkflowDefinition) *ValidateResult
- type ApiError
- type BulkRequest
- type BulkResult
- type DailyStatsEntry
- type Dispatcher
- type GRPCServer
- func (g *GRPCServer) BulkCancelBatches(ctx context.Context, req *pb.BulkCancelBatchesRequest) (*pb.BulkCancelBatchesResponse, error)
- func (g *GRPCServer) BulkCancelJobs(ctx context.Context, req *pb.BulkCancelJobsRequest) (*pb.BulkCancelJobsResponse, error)
- func (g *GRPCServer) BulkCancelWorkflows(ctx context.Context, req *pb.BulkCancelWorkflowsRequest) (*pb.BulkCancelWorkflowsResponse, error)
- func (g *GRPCServer) BulkDeleteBatches(ctx context.Context, req *pb.BulkDeleteBatchesRequest) (*pb.BulkDeleteBatchesResponse, error)
- func (g *GRPCServer) BulkDeleteJobs(ctx context.Context, req *pb.BulkDeleteJobsRequest) (*pb.BulkDeleteJobsResponse, error)
- func (g *GRPCServer) BulkDeleteWorkflows(ctx context.Context, req *pb.BulkDeleteWorkflowsRequest) (*pb.BulkDeleteWorkflowsResponse, error)
- func (g *GRPCServer) BulkRetryBatches(ctx context.Context, req *pb.BulkRetryBatchesRequest) (*pb.BulkRetryBatchesResponse, error)
- func (g *GRPCServer) BulkRetryJobs(ctx context.Context, req *pb.BulkRetryJobsRequest) (*pb.BulkRetryJobsResponse, error)
- func (g *GRPCServer) BulkRetryWorkflows(ctx context.Context, req *pb.BulkRetryWorkflowsRequest) (*pb.BulkRetryWorkflowsResponse, error)
- func (g *GRPCServer) CancelBatch(ctx context.Context, req *pb.CancelBatchRequest) (*pb.CancelBatchResponse, error)
- func (g *GRPCServer) CancelJob(ctx context.Context, req *pb.CancelJobRequest) (*pb.CancelJobResponse, error)
- func (g *GRPCServer) CancelWorkflow(ctx context.Context, req *pb.CancelWorkflowRequest) (*pb.CancelWorkflowResponse, error)
- func (g *GRPCServer) DeleteJob(ctx context.Context, req *pb.DeleteJobRequest) (*pb.DeleteJobResponse, error)
- func (g *GRPCServer) GetBatch(ctx context.Context, req *pb.GetBatchRequest) (*pb.GetBatchResponse, error)
- func (g *GRPCServer) GetBatchItemResult(ctx context.Context, req *pb.GetBatchItemResultRequest) (*pb.GetBatchItemResultResponse, error)
- func (g *GRPCServer) GetBatchResults(ctx context.Context, req *pb.GetBatchResultsRequest) (*pb.GetBatchResultsResponse, error)
- func (g *GRPCServer) GetDailyStats(ctx context.Context, req *pb.GetDailyStatsRequest) (*pb.GetDailyStatsResponse, error)
- func (g *GRPCServer) GetJob(ctx context.Context, req *pb.GetJobRequest) (*pb.GetJobResponse, error)
- func (g *GRPCServer) GetNode(ctx context.Context, req *pb.GetNodeRequest) (*pb.GetNodeResponse, error)
- func (g *GRPCServer) GetRedisInfo(ctx context.Context, req *pb.GetRedisInfoRequest) (*pb.GetRedisInfoResponse, error)
- func (g *GRPCServer) GetRun(ctx context.Context, req *pb.GetRunRequest) (*pb.GetRunResponse, error)
- func (g *GRPCServer) GetSchedule(ctx context.Context, req *pb.GetScheduleRequest) (*pb.GetScheduleResponse, error)
- func (g *GRPCServer) GetStats(ctx context.Context, _ *pb.GetStatsRequest) (*pb.GetStatsResponse, error)
- func (g *GRPCServer) GetSyncRetries(_ context.Context, _ *pb.GetSyncRetriesRequest) (*pb.GetSyncRetriesResponse, error)
- func (g *GRPCServer) GetWorkflow(ctx context.Context, req *pb.GetWorkflowRequest) (*pb.GetWorkflowResponse, error)
- func (g *GRPCServer) Health(_ context.Context, _ *pb.HealthRequest) (*pb.HealthResponse, error)
- func (g *GRPCServer) ListBatches(ctx context.Context, req *pb.ListBatchesRequest) (*pb.ListBatchesResponse, error)
- func (g *GRPCServer) ListDLQ(ctx context.Context, req *pb.ListDLQRequest) (*pb.ListDLQResponse, error)
- func (g *GRPCServer) ListGroups(ctx context.Context, _ *pb.ListGroupsRequest) (*pb.ListGroupsResponse, error)
- func (g *GRPCServer) ListHistoryEvents(ctx context.Context, req *pb.ListHistoryEventsRequest) (*pb.ListHistoryEventsResponse, error)
- func (g *GRPCServer) ListHistoryRuns(ctx context.Context, req *pb.ListHistoryRunsRequest) (*pb.ListHistoryRunsResponse, error)
- func (g *GRPCServer) ListJobEvents(ctx context.Context, req *pb.ListJobEventsRequest) (*pb.ListJobEventsResponse, error)
- func (g *GRPCServer) ListJobRuns(ctx context.Context, req *pb.ListJobRunsRequest) (*pb.ListJobRunsResponse, error)
- func (g *GRPCServer) ListJobs(ctx context.Context, req *pb.ListJobsRequest) (*pb.ListJobsResponse, error)
- func (g *GRPCServer) ListNodes(ctx context.Context, _ *pb.ListNodesRequest) (*pb.ListNodesResponse, error)
- func (g *GRPCServer) ListQueues(ctx context.Context, _ *pb.ListQueuesRequest) (*pb.ListQueuesResponse, error)
- func (g *GRPCServer) ListRuns(ctx context.Context, _ *pb.ListRunsRequest) (*pb.ListRunsResponse, error)
- func (g *GRPCServer) ListSchedules(ctx context.Context, req *pb.ListSchedulesRequest) (*pb.ListSchedulesResponse, error)
- func (g *GRPCServer) ListWorkflows(ctx context.Context, req *pb.ListWorkflowsRequest) (*pb.ListWorkflowsResponse, error)
- func (g *GRPCServer) PauseQueue(ctx context.Context, req *pb.PauseQueueRequest) (*pb.PauseQueueResponse, error)
- func (g *GRPCServer) RegisterServer(s *grpc.Server)
- func (g *GRPCServer) ResumeQueue(ctx context.Context, req *pb.ResumeQueueRequest) (*pb.ResumeQueueResponse, error)
- func (g *GRPCServer) RetryBatch(ctx context.Context, req *pb.RetryBatchRequest) (*pb.RetryBatchResponse, error)
- func (g *GRPCServer) RetryJob(ctx context.Context, req *pb.RetryJobRequest) (*pb.RetryJobResponse, error)
- func (g *GRPCServer) RetryWorkflow(ctx context.Context, req *pb.RetryWorkflowRequest) (*pb.RetryWorkflowResponse, error)
- func (g *GRPCServer) UpdateJobPayload(ctx context.Context, req *pb.UpdateJobPayloadRequest) (*pb.UpdateJobPayloadResponse, error)
- type GroupInfo
- type Hub
- type QueueInfo
- type RedisInfoResponse
- type RedisNodeInfo
- type StatsResponse
- type SyncRetryStatsFunc
- type UniqueKeyResult
- type ValidateResult
- type WSClient
- type WorkflowAuditEntry
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func HandleWebSocket ¶ added in v0.1.1
func HandleWebSocket(hub Hub, logger gochainedlog.Logger) http.HandlerFunc
HandleWebSocket returns an http.HandlerFunc that upgrades connections to WebSocket and streams real-time events from the Hub.
Types ¶
type API ¶
type API struct {
// contains filtered or unexported fields
}
API serves the monitoring HTTP endpoints for the React UI.
func NewAPI ¶
func NewAPI(s *store.RedisStore, disp Dispatcher, logger gochainedlog.Logger) *API
NewAPI creates a new monitoring API backed by Redis.
func (*API) Service ¶ added in v0.1.3
func (a *API) Service() *APIService
Service returns the underlying APIService (used by gRPC server).
func (*API) SetSyncRetryStatsFunc ¶
func (a *API) SetSyncRetryStatsFunc(fn SyncRetryStatsFunc)
SetSyncRetryStatsFunc sets the function used to retrieve SyncRetrier stats.
type APIService ¶ added in v0.1.1
type APIService struct {
// contains filtered or unexported fields
}
APIService holds all transport-agnostic business logic for the monitoring API.
func NewAPIService ¶ added in v0.1.1
func NewAPIService(s *store.RedisStore, disp Dispatcher, logger gochainedlog.Logger) *APIService
NewAPIService creates a new APIService, starting the hub's Redis subscription.
func (*APIService) BulkCancelBatches ¶ added in v0.1.1
func (a *APIService) BulkCancelBatches(ctx context.Context, req BulkRequest) (*BulkResult, error)
func (*APIService) BulkCancelJobs ¶ added in v0.1.1
func (a *APIService) BulkCancelJobs(ctx context.Context, req BulkRequest) (*BulkResult, error)
func (*APIService) BulkCancelWorkflows ¶ added in v0.1.1
func (a *APIService) BulkCancelWorkflows(ctx context.Context, req BulkRequest) (*BulkResult, error)
func (*APIService) BulkDeleteBatches ¶ added in v0.1.1
func (a *APIService) BulkDeleteBatches(ctx context.Context, req BulkRequest) (*BulkResult, error)
func (*APIService) BulkDeleteJobs ¶ added in v0.1.1
func (a *APIService) BulkDeleteJobs(ctx context.Context, req BulkRequest) (*BulkResult, error)
func (*APIService) BulkDeleteWorkflows ¶ added in v0.1.1
func (a *APIService) BulkDeleteWorkflows(ctx context.Context, req BulkRequest) (*BulkResult, error)
func (*APIService) BulkRetryBatches ¶ added in v0.1.1
func (a *APIService) BulkRetryBatches(ctx context.Context, req BulkRequest) (*BulkResult, error)
func (*APIService) BulkRetryJobs ¶ added in v0.1.1
func (a *APIService) BulkRetryJobs(ctx context.Context, req BulkRequest) (*BulkResult, error)
func (*APIService) BulkRetryWorkflows ¶ added in v0.1.1
func (a *APIService) BulkRetryWorkflows(ctx context.Context, req BulkRequest) (*BulkResult, error)
func (*APIService) CancelBatch ¶ added in v0.1.1
func (a *APIService) CancelBatch(ctx context.Context, batchID string) error
func (*APIService) CancelJob ¶ added in v0.1.1
func (a *APIService) CancelJob(ctx context.Context, jobID string) error
func (*APIService) CancelWorkflow ¶ added in v0.1.1
func (a *APIService) CancelWorkflow(ctx context.Context, wfID string) error
func (*APIService) CheckUniqueKey ¶ added in v0.1.2
func (a *APIService) CheckUniqueKey(ctx context.Context, key string) (*UniqueKeyResult, error)
func (*APIService) DeleteJob ¶ added in v0.1.1
func (a *APIService) DeleteJob(ctx context.Context, jobID string) error
func (*APIService) DeleteUniqueKey ¶ added in v0.1.2
func (a *APIService) DeleteUniqueKey(ctx context.Context, key string) error
func (*APIService) GetBatch ¶ added in v0.1.1
func (a *APIService) GetBatch(ctx context.Context, batchID string) (*types.BatchInstance, error)
func (*APIService) GetBatchItemResult ¶ added in v0.1.1
func (a *APIService) GetBatchItemResult(ctx context.Context, batchID, itemID string) (*types.BatchItemResult, error)
func (*APIService) GetBatchResults ¶ added in v0.1.1
func (a *APIService) GetBatchResults(ctx context.Context, batchID string) ([]*types.BatchItemResult, error)
func (*APIService) GetDailyStats ¶ added in v0.1.1
func (a *APIService) GetDailyStats(ctx context.Context, days int) ([]DailyStatsEntry, error)
func (*APIService) GetJobActiveRuns ¶ added in v0.1.1
func (a *APIService) GetJobActiveRuns(ctx context.Context, jobID string) ([]*types.JobRun, error)
func (*APIService) GetJobAuditCounts ¶ added in v0.1.3
func (a *APIService) GetJobAuditCounts(ctx context.Context, jobIDs []string) (map[string]int64, error)
GetJobAuditCounts returns audit trail entry counts for multiple jobs.
func (*APIService) GetJobAuditTrail ¶ added in v0.1.3
func (a *APIService) GetJobAuditTrail(ctx context.Context, jobID string) ([]store.AuditEntry, error)
func (*APIService) GetRedisInfo ¶ added in v0.1.1
func (a *APIService) GetRedisInfo(ctx context.Context, section string) (*RedisInfoResponse, error)
func (*APIService) GetSchedule ¶ added in v0.1.1
func (a *APIService) GetSchedule(ctx context.Context, jobID string) (*types.ScheduleEntry, error)
func (*APIService) GetStats ¶ added in v0.1.1
func (a *APIService) GetStats(ctx context.Context) (*StatsResponse, error)
func (*APIService) GetSyncRetries ¶ added in v0.1.1
func (a *APIService) GetSyncRetries() any
func (*APIService) GetWorkflow ¶ added in v0.1.1
func (a *APIService) GetWorkflow(ctx context.Context, wfID string) (*types.WorkflowInstance, error)
func (*APIService) GetWorkflowAuditTrail ¶ added in v0.1.3
func (a *APIService) GetWorkflowAuditTrail(ctx context.Context, wfID string) ([]WorkflowAuditEntry, error)
GetWorkflowAuditTrail aggregates audit entries from all task jobs in a workflow.
func (*APIService) GetWorkflowTaskResult ¶ added in v0.1.10
func (a *APIService) GetWorkflowTaskResult(ctx context.Context, wfID, taskName string) (json.RawMessage, error)
func (*APIService) Hub ¶ added in v0.1.1
func (a *APIService) Hub() Hub
func (*APIService) IsNodeDraining ¶ added in v0.1.3
func (a *APIService) IsNodeDraining(ctx context.Context, nodeID string) (bool, error)
func (*APIService) ListActiveRuns ¶ added in v0.1.1
func (a *APIService) ListActiveRuns(ctx context.Context) ([]*types.JobRun, error)
func (*APIService) ListBatches ¶ added in v0.1.1
func (a *APIService) ListBatches(ctx context.Context, filter store.BatchFilter) ([]types.BatchInstance, int, error)
func (*APIService) ListDLQ ¶ added in v0.1.1
func (a *APIService) ListDLQ(ctx context.Context, limit, offset int) ([]*types.WorkMessage, int, error)
func (*APIService) ListGroups ¶ added in v0.1.1
func (a *APIService) ListGroups(ctx context.Context) ([]GroupInfo, error)
func (*APIService) ListHistoryEvents ¶ added in v0.1.1
func (a *APIService) ListHistoryEvents(ctx context.Context, filter store.EventFilter) ([]types.JobEvent, int, error)
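func (*APIService) ListHistoryRuns ¶ added in v0.1.1
func (a *APIService) ListHistoryRuns(ctx context.Context, filter store.RunFilter) ([]types.JobRun, int, error)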
func (*APIService) ListJobEvents ¶ added in v0.1.1
func (a *APIService) ListJobEvents(ctx context.Context, filter store.EventFilter) ([]types.JobEvent, int, error)
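func (*APIService) ListJobRuns ¶ added in v0.1.1
func (a *APIService) ListJobRuns(ctx context.Context, filter store.RunFilter) ([]types.JobRun, int, error)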
func (*APIService) ListQueues ¶ added in v0.1.1
func (a *APIService) ListQueues(ctx context.Context) ([]QueueInfo, error)
func (*APIService) ListSchedules ¶ added in v0.1.1
func (a *APIService) ListSchedules(ctx context.Context, filter store.ScheduleFilter) ([]types.ScheduleEntry, int, error)
func (*APIService) ListWorkflows ¶ added in v0.1.1
func (a *APIService) ListWorkflows(ctx context.Context, filter store.WorkflowFilter) ([]types.WorkflowInstance, int, error)
func (*APIService) Logger ¶ added in v0.1.1
func (a *APIService) Logger() gochainedlog.Logger
func (*APIService) PauseQueue ¶ added in v0.1.1
func (a *APIService) PauseQueue(ctx context.Context, tierName string) error
func (*APIService) ResumeQueue ¶ added in v0.1.1
func (a *APIService) ResumeQueue(ctx context.Context, tierName string) error
func (*APIService) ResumeWorkflow ¶ added in v0.1.10
func (a *APIService) ResumeWorkflow(ctx context.Context, wfID string) error
func (*APIService) RetryBatch ¶ added in v0.1.1
func (a *APIService) RetryBatch(ctx context.Context, batchID string, retryFailedOnly bool) error
func (*APIService) RetryJob ¶ added in v0.1.1
func (a *APIService) RetryJob(ctx context.Context, jobID string) error
func (*APIService) RetryWorkflow ¶ added in v0.1.1
func (a *APIService) RetryWorkflow(ctx context.Context, wfID string) error
func (*APIService) SearchBatchesByPayload ¶ added in v0.1.2
func (a *APIService) SearchBatchesByPayload(ctx context.Context, path, value string) (*types.BatchInstance, error)
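func (*APIService) SearchJobsByPayload ¶ added in v0.1.2
func (a *APIService) SearchJobsByPayload(ctx context.Context, path, value, taskType string) (*types.Job, error)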
func (*APIService) SearchWorkflowsByPayload ¶ added in v0.1.2
func (a *APIService) SearchWorkflowsByPayload(ctx context.Context, path, value string) (*types.WorkflowInstance, error)
func (*APIService) SetNodeDrain ¶ added in v0.1.3
func (a *APIService) SetNodeDrain(ctx context.Context, nodeID string, drain bool) error
func (*APIService) SetSyncRetryStatsFunc ¶ added in v0.1.1
func (a *APIService) SetSyncRetryStatsFunc(fn SyncRetryStatsFunc)
func (*APIService) Shutdown ¶ added in v0.1.1
func (a *APIService) Shutdown()
func (*APIService) Store ¶ added in v0.1.1
func (a *APIService) Store() *store.RedisStore
func (*APIService) SuspendWorkflow ¶ added in v0.1.10
func (a *APIService) SuspendWorkflow(ctx context.Context, wfID string) error
func (*APIService) UpdateJobPayload ¶ added in v0.1.1
func (a *APIService) UpdateJobPayload(ctx context.Context, jobID string, payload json.RawMessage) error
func (*APIService) ValidateWorkflowDefinition ¶ added in v0.1.10
func (a *APIService) ValidateWorkflowDefinition(_ context.Context, def *types.WorkflowDefinition) *ValidateResult
type BulkRequest ¶ added in v0.1.1
BulkRequest is the common shape for bulk operation requests.
type BulkResult ¶ added in v0.1.3
type BulkResult struct {
Affected int `json:"affected"`
}
BulkResult holds the count of affected entities.
type DailyStatsEntry ¶
type DailyStatsEntry struct {
Date string `json:"date"`
Processed int64 `json:"processed"`
Failed int64 `json:"failed"`
}
DailyStatsEntry represents aggregated stats for one day.
type Dispatcher ¶
type Dispatcher interface {
Dispatch(ctx context.Context, job *types.Job, attempt int) error
PublishEvent(event types.JobEvent)
}
Dispatcher is the subset of *dispatcher.Dispatcher used by the service layer.
type GRPCServer ¶
type GRPCServer struct {
pb.UnimplementedMonitorServiceServer
// contains filtered or unexported fields
}
GRPCServer implements the MonitorService gRPC service.
func NewGRPCServer ¶
func NewGRPCServer(svc *APIService) *GRPCServer
NewGRPCServer creates a new gRPC monitor server backed by an APIService.
func (*GRPCServer) BulkCancelBatches ¶
func (g *GRPCServer) BulkCancelBatches(ctx context.Context, req *pb.BulkCancelBatchesRequest) (*pb.BulkCancelBatchesResponse, error)
BulkCancelBatches cancels multiple batches by ID or status filter.
func (*GRPCServer) BulkCancelJobs ¶
func (g *GRPCServer) BulkCancelJobs(ctx context.Context, req *pb.BulkCancelJobsRequest) (*pb.BulkCancelJobsResponse, error)
BulkCancelJobs cancels multiple jobs by ID or status filter.
func (*GRPCServer) BulkCancelWorkflows ¶
func (g *GRPCServer) BulkCancelWorkflows(ctx context.Context, req *pb.BulkCancelWorkflowsRequest) (*pb.BulkCancelWorkflowsResponse, error)
BulkCancelWorkflows cancels multiple workflows by ID or status filter.
func (*GRPCServer) BulkDeleteBatches ¶
func (g *GRPCServer) BulkDeleteBatches(ctx context.Context, req *pb.BulkDeleteBatchesRequest) (*pb.BulkDeleteBatchesResponse, error)
BulkDeleteBatches deletes multiple batches by ID or status filter.
func (*GRPCServer) BulkDeleteJobs ¶
func (g *GRPCServer) BulkDeleteJobs(ctx context.Context, req *pb.BulkDeleteJobsRequest) (*pb.BulkDeleteJobsResponse, error)
BulkDeleteJobs deletes multiple jobs by ID or status filter.
func (*GRPCServer) BulkDeleteWorkflows ¶
func (g *GRPCServer) BulkDeleteWorkflows(ctx context.Context, req *pb.BulkDeleteWorkflowsRequest) (*pb.BulkDeleteWorkflowsResponse, error)
BulkDeleteWorkflows deletes multiple workflows by ID or status filter.
func (*GRPCServer) BulkRetryBatches ¶
func (g *GRPCServer) BulkRetryBatches(ctx context.Context, req *pb.BulkRetryBatchesRequest) (*pb.BulkRetryBatchesResponse, error)
BulkRetryBatches retries multiple batches by ID or status filter.
func (*GRPCServer) BulkRetryJobs ¶
func (g *GRPCServer) BulkRetryJobs(ctx context.Context, req *pb.BulkRetryJobsRequest) (*pb.BulkRetryJobsResponse, error)
BulkRetryJobs retries multiple jobs by ID or status filter.
func (*GRPCServer) BulkRetryWorkflows ¶
func (g *GRPCServer) BulkRetryWorkflows(ctx context.Context, req *pb.BulkRetryWorkflowsRequest) (*pb.BulkRetryWorkflowsResponse, error)
BulkRetryWorkflows retries multiple workflows by ID or status filter.
func (*GRPCServer) CancelBatch ¶
func (g *GRPCServer) CancelBatch(ctx context.Context, req *pb.CancelBatchRequest) (*pb.CancelBatchResponse, error)
CancelBatch cancels a batch and all its non-terminal items and child jobs.
func (*GRPCServer) CancelJob ¶
func (g *GRPCServer) CancelJob(ctx context.Context, req *pb.CancelJobRequest) (*pb.CancelJobResponse, error)
CancelJob transitions a job to cancelled, removes its schedule, publishes cancel signals to in-flight workers, and emits a JobCancelled event.
func (*GRPCServer) CancelWorkflow ¶
func (g *GRPCServer) CancelWorkflow(ctx context.Context, req *pb.CancelWorkflowRequest) (*pb.CancelWorkflowResponse, error)
CancelWorkflow cancels a workflow and all its non-terminal tasks and child jobs.
func (*GRPCServer) DeleteJob ¶
func (g *GRPCServer) DeleteJob(ctx context.Context, req *pb.DeleteJobRequest) (*pb.DeleteJobResponse, error)
DeleteJob removes a job and its schedule.
func (*GRPCServer) GetBatch ¶
func (g *GRPCServer) GetBatch(ctx context.Context, req *pb.GetBatchRequest) (*pb.GetBatchResponse, error)
GetBatch retrieves a single batch instance by ID.
func (*GRPCServer) GetBatchItemResult ¶
func (g *GRPCServer) GetBatchItemResult(ctx context.Context, req *pb.GetBatchItemResultRequest) (*pb.GetBatchItemResultResponse, error)
GetBatchItemResult retrieves a single batch item result.
func (*GRPCServer) GetBatchResults ¶
func (g *GRPCServer) GetBatchResults(ctx context.Context, req *pb.GetBatchResultsRequest) (*pb.GetBatchResultsResponse, error)
GetBatchResults returns all item results for a batch.
func (*GRPCServer) GetDailyStats ¶
func (g *GRPCServer) GetDailyStats(ctx context.Context, req *pb.GetDailyStatsRequest) (*pb.GetDailyStatsResponse, error)
GetDailyStats returns aggregated daily processing statistics.
func (*GRPCServer) GetJob ¶
func (g *GRPCServer) GetJob(ctx context.Context, req *pb.GetJobRequest) (*pb.GetJobResponse, error)
GetJob returns a single job by ID.
func (*GRPCServer) GetNode ¶
func (g *GRPCServer) GetNode(ctx context.Context, req *pb.GetNodeRequest) (*pb.GetNodeResponse, error)
GetNode returns a single node by ID.
func (*GRPCServer) GetRedisInfo ¶
func (g *GRPCServer) GetRedisInfo(ctx context.Context, req *pb.GetRedisInfoRequest) (*pb.GetRedisInfoResponse, error)
GetRedisInfo returns parsed Redis INFO output organized by section.
func (*GRPCServer) GetRun ¶
func (g *GRPCServer) GetRun(ctx context.Context, req *pb.GetRunRequest) (*pb.GetRunResponse, error)
GetRun returns a single run by ID.
func (*GRPCServer) GetSchedule ¶
func (g *GRPCServer) GetSchedule(ctx context.Context, req *pb.GetScheduleRequest) (*pb.GetScheduleResponse, error)
GetSchedule returns a single schedule entry by job ID.
func (*GRPCServer) GetStats ¶
func (g *GRPCServer) GetStats(ctx context.Context, _ *pb.GetStatsRequest) (*pb.GetStatsResponse, error)
GetStats returns overall system statistics including job counts, active schedules, runs, and nodes.
func (*GRPCServer) GetSyncRetries ¶
func (g *GRPCServer) GetSyncRetries(_ context.Context, _ *pb.GetSyncRetriesRequest) (*pb.GetSyncRetriesResponse, error)
GetSyncRetries returns the current SyncRetrier statistics as an opaque JSON blob.
func (*GRPCServer) GetWorkflow ¶
func (g *GRPCServer) GetWorkflow(ctx context.Context, req *pb.GetWorkflowRequest) (*pb.GetWorkflowResponse, error)
GetWorkflow retrieves a single workflow instance by ID.
func (*GRPCServer) Health ¶
func (g *GRPCServer) Health(_ context.Context, _ *pb.HealthRequest) (*pb.HealthResponse, error)
Health returns a simple health check response.
func (*GRPCServer) ListBatches ¶
func (g *GRPCServer) ListBatches(ctx context.Context, req *pb.ListBatchesRequest) (*pb.ListBatchesResponse, error)
ListBatches returns paginated batch instances with optional status filter.
func (*GRPCServer) ListDLQ ¶
func (g *GRPCServer) ListDLQ(ctx context.Context, req *pb.ListDLQRequest) (*pb.ListDLQResponse, error)
ListDLQ returns paginated dead-letter queue messages.
func (*GRPCServer) ListGroups ¶
func (g *GRPCServer) ListGroups(ctx context.Context, _ *pb.ListGroupsRequest) (*pb.ListGroupsResponse, error)
ListGroups returns all active aggregation groups with their sizes.
func (*GRPCServer) ListHistoryEvents ¶
func (g *GRPCServer) ListHistoryEvents(ctx context.Context, req *pb.ListHistoryEventsRequest) (*pb.ListHistoryEventsResponse, error)
ListHistoryEvents returns paginated historical events with optional job_id filter.
func (*GRPCServer) ListHistoryRuns ¶
func (g *GRPCServer) ListHistoryRuns(ctx context.Context, req *pb.ListHistoryRunsRequest) (*pb.ListHistoryRunsResponse, error)
ListHistoryRuns returns paginated historical runs with optional status and job_id filters.
func (*GRPCServer) ListJobEvents ¶
func (g *GRPCServer) ListJobEvents(ctx context.Context, req *pb.ListJobEventsRequest) (*pb.ListJobEventsResponse, error)
ListJobEvents returns recent events for a specific job.
func (*GRPCServer) ListJobRuns ¶
func (g *GRPCServer) ListJobRuns(ctx context.Context, req *pb.ListJobRunsRequest) (*pb.ListJobRunsResponse, error)
ListJobRuns returns recent runs for a specific job.
func (*GRPCServer) ListJobs ¶
func (g *GRPCServer) ListJobs(ctx context.Context, req *pb.ListJobsRequest) (*pb.ListJobsResponse, error)
ListJobs returns a paginated list of jobs, optionally filtered by status, task type, tag, or search term.
func (*GRPCServer) ListNodes ¶
func (g *GRPCServer) ListNodes(ctx context.Context, _ *pb.ListNodesRequest) (*pb.ListNodesResponse, error)
ListNodes returns all registered server nodes.
func (*GRPCServer) ListQueues ¶
func (g *GRPCServer) ListQueues(ctx context.Context, _ *pb.ListQueuesRequest) (*pb.ListQueuesResponse, error)
ListQueues returns queue/tier information including pause state and size.
func (*GRPCServer) ListRuns ¶
func (g *GRPCServer) ListRuns(ctx context.Context, _ *pb.ListRunsRequest) (*pb.ListRunsResponse, error)
ListRuns returns all active runs.
func (*GRPCServer) ListSchedules ¶
func (g *GRPCServer) ListSchedules(ctx context.Context, req *pb.ListSchedulesRequest) (*pb.ListSchedulesResponse, error)
ListSchedules returns a paginated list of schedule entries.
func (*GRPCServer) ListWorkflows ¶
func (g *GRPCServer) ListWorkflows(ctx context.Context, req *pb.ListWorkflowsRequest) (*pb.ListWorkflowsResponse, error)
ListWorkflows returns paginated workflow instances with optional status filter.
func (*GRPCServer) PauseQueue ¶
func (g *GRPCServer) PauseQueue(ctx context.Context, req *pb.PauseQueueRequest) (*pb.PauseQueueResponse, error)
PauseQueue pauses a queue/tier by name.
func (*GRPCServer) RegisterServer ¶
func (g *GRPCServer) RegisterServer(s *grpc.Server)
RegisterServer registers the MonitorService implementation with a gRPC server.
func (*GRPCServer) ResumeQueue ¶
func (g *GRPCServer) ResumeQueue(ctx context.Context, req *pb.ResumeQueueRequest) (*pb.ResumeQueueResponse, error)
ResumeQueue resumes a paused queue/tier by name.
func (*GRPCServer) RetryBatch ¶
func (g *GRPCServer) RetryBatch(ctx context.Context, req *pb.RetryBatchRequest) (*pb.RetryBatchResponse, error)
RetryBatch resets a batch for a new execution attempt, optionally retrying only failed items.
func (*GRPCServer) RetryJob ¶
func (g *GRPCServer) RetryJob(ctx context.Context, req *pb.RetryJobRequest) (*pb.RetryJobResponse, error)
RetryJob resets a job to pending, clears its attempt counter and error, dispatches it for immediate execution, and emits a JobRetrying event.
func (*GRPCServer) RetryWorkflow ¶
func (g *GRPCServer) RetryWorkflow(ctx context.Context, req *pb.RetryWorkflowRequest) (*pb.RetryWorkflowResponse, error)
RetryWorkflow resets a workflow and all its tasks for a new execution attempt.
func (*GRPCServer) UpdateJobPayload ¶
func (g *GRPCServer) UpdateJobPayload(ctx context.Context, req *pb.UpdateJobPayloadRequest) (*pb.UpdateJobPayloadResponse, error)
UpdateJobPayload replaces the payload of a pending or scheduled job.
type Hub ¶
type Hub interface {
Register(c *WSClient)
Unregister(c *WSClient)
Broadcast(data []byte)
ClientCount() int
SubscribeRedis(ctx context.Context, rdb rueidis.Client, channel string)
}
Hub manages WebSocket client connections and broadcasts Redis Pub/Sub events. A single Redis subscription feeds all connected WebSocket clients (fan-out).
type QueueInfo ¶
type QueueInfo struct {
Name string `json:"name"`
Weight int `json:"weight"`
FetchBatch int `json:"fetch_batch"`
RateLimit float64 `json:"rate_limit,omitempty"`
RateBurst int `json:"rate_burst,omitempty"`
Paused bool `json:"paused"`
Size int64 `json:"size"`
}
QueueInfo describes the state of a single queue/tier.
type RedisInfoResponse ¶ added in v0.1.10
type RedisInfoResponse struct {
Mode string `json:"mode"` // "cluster" or "standalone"
Nodes []RedisNodeInfo `json:"nodes"`
}
RedisInfoResponse holds Redis INFO for all nodes with mode indicator.
type RedisNodeInfo ¶ added in v0.1.10
type RedisNodeInfo struct {
Addr string `json:"addr"`
Role string `json:"role"`
Sections map[string]map[string]string `json:"sections"`
}
RedisNodeInfo holds parsed INFO output for a single Redis node.
type StatsResponse ¶ added in v0.1.1
type StatsResponse struct {
JobCounts map[types.JobStatus]int `json:"job_counts"`
ActiveSchedules int `json:"active_schedules"`
ActiveRuns int `json:"active_runs"`
ActiveNodes int `json:"active_nodes"`
WorkflowCounts map[types.WorkflowStatus]int `json:"workflow_counts,omitempty"`
BatchCounts map[types.WorkflowStatus]int `json:"batch_counts,omitempty"`
ActiveWorkflows int `json:"active_workflows"`
ActiveBatches int `json:"active_batches"`
}
StatsResponse holds overall system statistics.
type SyncRetryStatsFunc ¶
type SyncRetryStatsFunc func() any
SyncRetryStatsFunc is a function that returns the current SyncRetrier stats as a JSON-serializable value. Set via SetSyncRetryStatsFunc.
type UniqueKeyResult ¶ added in v0.1.3
type ValidateResult ¶ added in v0.1.10
ValidateResult holds the result of a workflow definition validation.
type WSClient ¶ added in v0.1.1
type WSClient struct {
Send chan []byte
}
WSClient represents a single WebSocket connection registered with the Hub.
func NewWSClient ¶ added in v0.1.1
func NewWSClient() *WSClient
NewWSClient creates a new WebSocket client with a buffered send channel.
type WorkflowAuditEntry ¶ added in v0.1.3
type WorkflowAuditEntry struct {
TaskName string `json:"task_name"`
JobID string `json:"job_id"`
ID string `json:"id"`
Status string `json:"status"`
Error string `json:"error,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
WorkflowAuditEntry is an audit entry annotated with task name and job ID.