postgres

package
v1.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 4, 2026 License: AGPL-3.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Store

type Store struct {
	// contains filtered or unexported fields
}

Store implements scheduler.Store and dispatcher.Store using PostgreSQL.

func New

func New(db *sql.DB, opTimeout time.Duration) *Store

New creates a new PostgreSQL store with the given database connection. opTimeout specifies the maximum duration for individual DB operations. If opTimeout is 0, no timeout is applied (not recommended for production).

func (*Store) AckExecution

func (s *Store) AckExecution(ctx context.Context, id uuid.UUID, ns domain.Namespace) error

AckExecution marks a terminal unacknowledged execution as acknowledged.

func (*Store) CreateJob

func (s *Store) CreateJob(ctx context.Context, job domain.Job, schedule domain.Schedule) error

CreateJob creates a new job with its schedule in a transaction.

func (*Store) DeleteJob

func (s *Store) DeleteJob(ctx context.Context, jobID uuid.UUID, ns domain.Namespace) error

DeleteJob cascade-deletes a job by id and namespace.

func (*Store) DeleteKey

func (s *Store) DeleteKey(ctx context.Context, id uuid.UUID, ns domain.Namespace) error

DeleteKey deletes an API key by id and namespace.

func (*Store) DeleteTags

func (s *Store) DeleteTags(ctx context.Context, jobID uuid.UUID) error

DeleteTags deletes all tags for a job.

func (*Store) DequeueExecution

func (s *Store) DequeueExecution(ctx context.Context) (*domain.Execution, error)

DequeueExecution atomically claims one emitted execution by transitioning it to in_progress with a claimed_at timestamp. Returns nil, nil if no work available. Uses SELECT FOR UPDATE SKIP LOCKED to prevent double-claim under concurrency.

func (*Store) GetAttempts

func (s *Store) GetAttempts(ctx context.Context, executionID uuid.UUID) ([]domain.DeliveryAttempt, error)

GetAttempts returns all delivery attempts for an execution.

func (*Store) GetEnabledJobs

func (s *Store) GetEnabledJobs(ctx context.Context, limit int, afterID uuid.UUID) ([]domain.JobWithSchedule, error)

GetEnabledJobs returns enabled jobs with their schedules after afterID, ordered by job ID.

func (*Store) GetExecution

func (s *Store) GetExecution(ctx context.Context, id uuid.UUID) (domain.Execution, error)

GetExecution returns an execution by its ID.

func (*Store) GetExecutionScoped

func (s *Store) GetExecutionScoped(ctx context.Context, id uuid.UUID, ns domain.Namespace) (domain.Execution, error)

GetExecutionScoped returns an execution by ID filtered by namespace at the SQL level. This provides defense-in-depth for API-facing operations.

func (*Store) GetJob

func (s *Store) GetJob(ctx context.Context, id uuid.UUID) (domain.Job, error)

GetJob returns a job by its ID.

func (*Store) GetJobByID

func (s *Store) GetJobByID(ctx context.Context, jobID uuid.UUID) (domain.Job, error)

GetJobByID returns a job by its ID.

func (*Store) GetJobWithSchedule

func (s *Store) GetJobWithSchedule(ctx context.Context, id uuid.UUID) (domain.Job, domain.Schedule, error)

GetJobWithSchedule returns a job and its schedule by job ID.

func (*Store) GetJobWithScheduleScoped

func (s *Store) GetJobWithScheduleScoped(ctx context.Context, id uuid.UUID, ns domain.Namespace) (domain.Job, domain.Schedule, error)

GetJobWithScheduleScoped returns a job and its schedule filtered by both ID and namespace. This provides defense-in-depth for API-facing operations.

func (*Store) GetKeyByTokenHash

func (s *Store) GetKeyByTokenHash(ctx context.Context, tokenHash string) (domain.APIKey, error)

GetKeyByTokenHash returns an enabled API key by its token hash.

func (*Store) GetLastScheduledExecution

func (s *Store) GetLastScheduledExecution(ctx context.Context, jobID uuid.UUID) (time.Time, bool, error)

GetLastScheduledExecution returns the most recent scheduled fire time for a job.

func (*Store) GetOrphanedExecutions

func (s *Store) GetOrphanedExecutions(ctx context.Context, olderThan time.Time, maxResults int) ([]domain.Execution, error)

GetOrphanedExecutions returns executions that are stuck in 'emitted' status and were created before the given threshold time. Results are ordered by created_at ASC (oldest first) and limited to maxResults.

func (*Store) GetRecentExecutions

func (s *Store) GetRecentExecutions(ctx context.Context, jobID uuid.UUID, limit int) ([]domain.Execution, error)

GetRecentExecutions returns the most recent executions for a job.

func (*Store) GetSchedule

func (s *Store) GetSchedule(ctx context.Context, id uuid.UUID) (domain.Schedule, error)

GetSchedule returns a schedule by its ID.

func (*Store) GetTags

func (s *Store) GetTags(ctx context.Context, jobID uuid.UUID) ([]domain.Tag, error)

GetTags returns all tags for a job.

func (*Store) InsertAPIKey

func (s *Store) InsertAPIKey(ctx context.Context, key domain.APIKey) error

InsertAPIKey inserts a new API key.

func (*Store) InsertDeliveryAttempt

func (s *Store) InsertDeliveryAttempt(ctx context.Context, attempt domain.DeliveryAttempt) error

InsertDeliveryAttempt inserts a new delivery attempt record.

func (*Store) InsertExecution

func (s *Store) InsertExecution(ctx context.Context, exec domain.Execution) error

InsertExecution inserts a new execution record. Returns scheduler.ErrDuplicateExecution if (job_id, scheduled_at) already exists.

func (*Store) InsertJob

func (s *Store) InsertJob(ctx context.Context, job domain.Job, schedule domain.Schedule) error

InsertJob creates a new job with its schedule in a transaction. Alias for CreateJob to satisfy the domain.JobRepository interface.

func (*Store) InsertSchedule

func (s *Store) InsertSchedule(ctx context.Context, schedule domain.Schedule) error

InsertSchedule inserts a new schedule.

func (*Store) ListExecutions

func (s *Store) ListExecutions(ctx context.Context, filter domain.ExecutionFilter) ([]domain.Execution, error)

ListExecutions returns executions matching the filter with dynamic WHERE. This method satisfies the domain.ExecutionRepository interface.

func (*Store) ListJobs

func (s *Store) ListJobs(ctx context.Context, filter domain.JobFilter) ([]domain.Job, error)

ListJobs returns jobs matching the filter. It supports the full JobFilter including namespace, tags, enabled, and name substring filtering. This method satisfies the domain.JobRepository interface.

func (*Store) ListKeys

func (s *Store) ListKeys(ctx context.Context, ns domain.Namespace, params domain.ListParams) ([]domain.APIKey, error)

ListKeys returns API keys for a namespace, paginated.

func (*Store) ListPendingAck

func (s *Store) ListPendingAck(ctx context.Context, ns domain.Namespace, jobID *uuid.UUID, limit int) ([]domain.Execution, error)

ListPendingAck returns terminal executions that have not been acknowledged.

func (*Store) RequeueStaleExecutions

func (s *Store) RequeueStaleExecutions(ctx context.Context, olderThan time.Time, limit int) (int, error)

RequeueStaleExecutions resets in_progress executions older than olderThan back to emitted status. Uses a CTE with FOR UPDATE SKIP LOCKED to avoid interfering with active DequeueExecution transactions.

func (*Store) UpdateExecutionStatus

func (s *Store) UpdateExecutionStatus(ctx context.Context, executionID uuid.UUID, status domain.ExecutionStatus) error

UpdateExecutionStatus updates the status of an execution. Returns dispatcher.ErrStatusTransitionDenied if the execution is already in a terminal state. This uses an atomic UPDATE with WHERE clause to prevent TOCTOU race conditions.

func (*Store) UpdateJob

func (s *Store) UpdateJob(ctx context.Context, job domain.Job) error

UpdateJob updates a job by id and namespace.

func (*Store) UpdateJobAggregate

func (s *Store) UpdateJobAggregate(ctx context.Context, job domain.Job, schedule domain.Schedule, tags *[]domain.Tag) error

UpdateJobAggregate updates a job, its schedule, and optionally replaces tags atomically.

func (*Store) UpdateLastUsedAt

func (s *Store) UpdateLastUsedAt(ctx context.Context, ids []uuid.UUID) error

UpdateLastUsedAt updates last_used_at for a batch of API key IDs.

func (*Store) UpdateSchedule

func (s *Store) UpdateSchedule(ctx context.Context, schedule domain.Schedule) error

UpdateSchedule updates an existing schedule.

func (*Store) UpsertTags

func (s *Store) UpsertTags(ctx context.Context, jobID uuid.UUID, tags []domain.Tag) error

UpsertTags upserts tags for a job in a transaction.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL