Documentation
¶
Index ¶
- type Store
- func (s *Store) AckExecution(ctx context.Context, id uuid.UUID, ns domain.Namespace) error
- func (s *Store) CreateJob(ctx context.Context, job domain.Job, schedule domain.Schedule) error
- func (s *Store) DeleteJob(ctx context.Context, jobID uuid.UUID, ns domain.Namespace) error
- func (s *Store) DeleteKey(ctx context.Context, id uuid.UUID, ns domain.Namespace) error
- func (s *Store) DeleteTags(ctx context.Context, jobID uuid.UUID) error
- func (s *Store) DequeueExecution(ctx context.Context) (*domain.Execution, error)
- func (s *Store) GetAttempts(ctx context.Context, executionID uuid.UUID) ([]domain.DeliveryAttempt, error)
- func (s *Store) GetEnabledJobs(ctx context.Context, limit int, afterID uuid.UUID) ([]domain.JobWithSchedule, error)
- func (s *Store) GetExecution(ctx context.Context, id uuid.UUID) (domain.Execution, error)
- func (s *Store) GetExecutionScoped(ctx context.Context, id uuid.UUID, ns domain.Namespace) (domain.Execution, error)
- func (s *Store) GetJob(ctx context.Context, id uuid.UUID) (domain.Job, error)
- func (s *Store) GetJobByID(ctx context.Context, jobID uuid.UUID) (domain.Job, error)
- func (s *Store) GetJobWithSchedule(ctx context.Context, id uuid.UUID) (domain.Job, domain.Schedule, error)
- func (s *Store) GetJobWithScheduleScoped(ctx context.Context, id uuid.UUID, ns domain.Namespace) (domain.Job, domain.Schedule, error)
- func (s *Store) GetKeyByTokenHash(ctx context.Context, tokenHash string) (domain.APIKey, error)
- func (s *Store) GetLastScheduledExecution(ctx context.Context, jobID uuid.UUID) (time.Time, bool, error)
- func (s *Store) GetOrphanedExecutions(ctx context.Context, olderThan time.Time, maxResults int) ([]domain.Execution, error)
- func (s *Store) GetRecentExecutions(ctx context.Context, jobID uuid.UUID, limit int) ([]domain.Execution, error)
- func (s *Store) GetSchedule(ctx context.Context, id uuid.UUID) (domain.Schedule, error)
- func (s *Store) GetTags(ctx context.Context, jobID uuid.UUID) ([]domain.Tag, error)
- func (s *Store) InsertAPIKey(ctx context.Context, key domain.APIKey) error
- func (s *Store) InsertDeliveryAttempt(ctx context.Context, attempt domain.DeliveryAttempt) error
- func (s *Store) InsertExecution(ctx context.Context, exec domain.Execution) error
- func (s *Store) InsertJob(ctx context.Context, job domain.Job, schedule domain.Schedule) error
- func (s *Store) InsertSchedule(ctx context.Context, schedule domain.Schedule) error
- func (s *Store) ListExecutions(ctx context.Context, filter domain.ExecutionFilter) ([]domain.Execution, error)
- func (s *Store) ListJobs(ctx context.Context, filter domain.JobFilter) ([]domain.Job, error)
- func (s *Store) ListKeys(ctx context.Context, ns domain.Namespace, params domain.ListParams) ([]domain.APIKey, error)
- func (s *Store) ListPendingAck(ctx context.Context, ns domain.Namespace, jobID *uuid.UUID, limit int) ([]domain.Execution, error)
- func (s *Store) RequeueStaleExecutions(ctx context.Context, olderThan time.Time, limit int) (int, error)
- func (s *Store) UpdateExecutionStatus(ctx context.Context, executionID uuid.UUID, status domain.ExecutionStatus) error
- func (s *Store) UpdateJob(ctx context.Context, job domain.Job) error
- func (s *Store) UpdateJobAggregate(ctx context.Context, job domain.Job, schedule domain.Schedule, tags *[]domain.Tag) error
- func (s *Store) UpdateLastUsedAt(ctx context.Context, ids []uuid.UUID) error
- func (s *Store) UpdateSchedule(ctx context.Context, schedule domain.Schedule) error
- func (s *Store) UpsertTags(ctx context.Context, jobID uuid.UUID, tags []domain.Tag) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Store ¶
type Store struct {
// contains filtered or unexported fields
}
Store implements scheduler.Store and dispatcher.Store using PostgreSQL.
func New ¶
New creates a new PostgreSQL store with the given database connection. opTimeout specifies the maximum duration for individual DB operations. If opTimeout is 0, no timeout is applied (not recommended for production).
func (*Store) AckExecution ¶
AckExecution marks a terminal unacknowledged execution as acknowledged.
func (*Store) DeleteTags ¶
DeleteTags deletes all tags for a job.
func (*Store) DequeueExecution ¶
DequeueExecution atomically claims one emitted execution by transitioning it to in_progress with a claimed_at timestamp. Returns nil, nil if no work is available. Uses SELECT FOR UPDATE SKIP LOCKED to prevent double-claiming under concurrency.
func (*Store) GetAttempts ¶
func (s *Store) GetAttempts(ctx context.Context, executionID uuid.UUID) ([]domain.DeliveryAttempt, error)
GetAttempts returns all delivery attempts for an execution.
func (*Store) GetEnabledJobs ¶
func (s *Store) GetEnabledJobs(ctx context.Context, limit int, afterID uuid.UUID) ([]domain.JobWithSchedule, error)
GetEnabledJobs returns enabled jobs, together with their schedules, whose job IDs come after afterID, ordered by job ID.
func (*Store) GetExecution ¶
GetExecution returns an execution by its ID.
func (*Store) GetExecutionScoped ¶
func (s *Store) GetExecutionScoped(ctx context.Context, id uuid.UUID, ns domain.Namespace) (domain.Execution, error)
GetExecutionScoped returns an execution by ID filtered by namespace at the SQL level. This provides defense-in-depth for API-facing operations.
func (*Store) GetJobByID ¶
GetJobByID returns a job by its ID.
func (*Store) GetJobWithSchedule ¶
func (s *Store) GetJobWithSchedule(ctx context.Context, id uuid.UUID) (domain.Job, domain.Schedule, error)
GetJobWithSchedule returns a job and its schedule by job ID.
func (*Store) GetJobWithScheduleScoped ¶
func (s *Store) GetJobWithScheduleScoped(ctx context.Context, id uuid.UUID, ns domain.Namespace) (domain.Job, domain.Schedule, error)
GetJobWithScheduleScoped returns a job and its schedule filtered by both ID and namespace. This provides defense-in-depth for API-facing operations.
func (*Store) GetKeyByTokenHash ¶
GetKeyByTokenHash returns an enabled API key by its token hash.
func (*Store) GetLastScheduledExecution ¶
func (s *Store) GetLastScheduledExecution(ctx context.Context, jobID uuid.UUID) (time.Time, bool, error)
GetLastScheduledExecution returns the most recent scheduled fire time for a job.
func (*Store) GetOrphanedExecutions ¶
func (s *Store) GetOrphanedExecutions(ctx context.Context, olderThan time.Time, maxResults int) ([]domain.Execution, error)
GetOrphanedExecutions returns executions that are stuck in 'emitted' status and were created before the given threshold time. Results are ordered by created_at ASC (oldest first) and limited to maxResults.
func (*Store) GetRecentExecutions ¶
func (s *Store) GetRecentExecutions(ctx context.Context, jobID uuid.UUID, limit int) ([]domain.Execution, error)
GetRecentExecutions returns the most recent executions for a job.
func (*Store) GetSchedule ¶
GetSchedule returns a schedule by its ID.
func (*Store) InsertAPIKey ¶
InsertAPIKey inserts a new API key.
func (*Store) InsertDeliveryAttempt ¶
InsertDeliveryAttempt inserts a new delivery attempt record.
func (*Store) InsertExecution ¶
InsertExecution inserts a new execution record. Returns scheduler.ErrDuplicateExecution if (job_id, scheduled_at) already exists.
func (*Store) InsertJob ¶
InsertJob creates a new job with its schedule in a transaction. It is an alias for CreateJob that exists to satisfy the domain.JobRepository interface.
func (*Store) InsertSchedule ¶
InsertSchedule inserts a new schedule.
func (*Store) ListExecutions ¶
func (s *Store) ListExecutions(ctx context.Context, filter domain.ExecutionFilter) ([]domain.Execution, error)
ListExecutions returns executions matching the filter, using a dynamically built WHERE clause. This method satisfies the domain.ExecutionRepository interface.
func (*Store) ListJobs ¶
ListJobs returns jobs matching the filter. It supports the full JobFilter including namespace, tags, enabled, and name substring filtering. This method satisfies the domain.JobRepository interface.
func (*Store) ListKeys ¶
func (s *Store) ListKeys(ctx context.Context, ns domain.Namespace, params domain.ListParams) ([]domain.APIKey, error)
ListKeys returns API keys for a namespace, paginated.
func (*Store) ListPendingAck ¶
func (s *Store) ListPendingAck(ctx context.Context, ns domain.Namespace, jobID *uuid.UUID, limit int) ([]domain.Execution, error)
ListPendingAck returns terminal executions that have not been acknowledged.
func (*Store) RequeueStaleExecutions ¶
func (s *Store) RequeueStaleExecutions(ctx context.Context, olderThan time.Time, limit int) (int, error)
RequeueStaleExecutions resets in_progress executions older than olderThan back to emitted status. Uses a CTE with FOR UPDATE SKIP LOCKED to avoid interfering with active DequeueExecution transactions.
func (*Store) UpdateExecutionStatus ¶
func (s *Store) UpdateExecutionStatus(ctx context.Context, executionID uuid.UUID, status domain.ExecutionStatus) error
UpdateExecutionStatus updates the status of an execution. Returns dispatcher.ErrStatusTransitionDenied if the execution is already in a terminal state. This uses an atomic UPDATE with a WHERE clause to prevent TOCTOU (time-of-check to time-of-use) race conditions.
func (*Store) UpdateJobAggregate ¶
func (s *Store) UpdateJobAggregate(ctx context.Context, job domain.Job, schedule domain.Schedule, tags *[]domain.Tag) error
UpdateJobAggregate updates a job, its schedule, and optionally replaces tags atomically.
func (*Store) UpdateLastUsedAt ¶
UpdateLastUsedAt updates last_used_at for a batch of API key IDs.
func (*Store) UpdateSchedule ¶
UpdateSchedule updates an existing schedule.