postgres

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 25, 2026 License: MIT Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func OpenConnection

func OpenConnection(ctx context.Context, config *ConnectionConfig) (*sql.DB, error)

OpenConnection opens and configures a PostgreSQL database connection.

Types

type Config

type Config struct {
	// Conn carries connection details (DSN overrides discrete fields when set).
	Conn ConnectionConfig

	Logger     *log.Logger
	KeyManager *keymanager.EncryptionKeyManager
}

Config holds Postgres storage configuration

type ConnectionConfig

type ConnectionConfig struct {
	DSN      string
	Host     string
	Port     int
	User     string
	Password string
	Database string
	SSLMode  string
	// PostgreSQL Client Certificate Configuration (for mTLS with database)
	ClientCertFile  string // Path to PostgreSQL client certificate file
	ClientKeyFile   string // Path to PostgreSQL client key file
	RootCertFile    string // Path to PostgreSQL root CA certificate file
	MaxOpenConns    int
	MaxIdleConns    int
	ConnMaxLifetime time.Duration
}

ConnectionConfig holds PostgreSQL connection configuration.

func DefaultConnectionConfig

func DefaultConnectionConfig() *ConnectionConfig

DefaultConnectionConfig returns a baseline configuration for local development.

type Dialect

type Dialect struct{}

Dialect implements sql.SQLDialect for PostgreSQL databases.

func NewDialect

func NewDialect() *Dialect

NewDialect creates a new PostgreSQL dialect.

func (*Dialect) BigIntType

func (d *Dialect) BigIntType() string

BigIntType returns the PostgreSQL big integer type.

func (*Dialect) BlobType

func (d *Dialect) BlobType() string

BlobType returns the PostgreSQL binary type.

func (*Dialect) CurrentTimestamp

func (d *Dialect) CurrentTimestamp() string

CurrentTimestamp returns the PostgreSQL current timestamp expression.

func (*Dialect) JSONExtract

func (d *Dialect) JSONExtract() string

JSONExtract returns the PostgreSQL JSON extraction function.

func (*Dialect) JSONExtractPath

func (d *Dialect) JSONExtractPath(key string) string

JSONExtractPath returns a JSON path argument for jsonb_extract_path_text

func (*Dialect) JSONSet

func (d *Dialect) JSONSet() string

JSONSet returns the PostgreSQL JSON modification function.

func (*Dialect) JSONSetPath

func (d *Dialect) JSONSetPath(key string) string

JSONSetPath returns a JSON path for jsonb_set (text[] literal)

func (*Dialect) JSONType

func (d *Dialect) JSONType() string

JSONType returns the PostgreSQL JSON type.

func (*Dialect) Placeholder

func (d *Dialect) Placeholder(n int) string

Placeholder returns the PostgreSQL positional parameter syntax.

func (*Dialect) RequiresSerializableForClaim

func (d *Dialect) RequiresSerializableForClaim() bool

RequiresSerializableForClaim indicates if SERIALIZABLE isolation is required.

func (*Dialect) SetBusyTimeout

func (d *Dialect) SetBusyTimeout(ms int) string

SetBusyTimeout returns an empty string; not applicable to PostgreSQL.

func (*Dialect) SetSynchronous

func (d *Dialect) SetSynchronous(level string) string

SetSynchronous returns an empty string; not applicable to PostgreSQL.

func (*Dialect) SetWALMode

func (d *Dialect) SetWALMode() string

SetWALMode returns an empty string; not applicable to PostgreSQL.

func (*Dialect) SupportsAdvisoryLocks

func (d *Dialect) SupportsAdvisoryLocks() bool

SupportsAdvisoryLocks indicates advisory lock support.

func (*Dialect) SupportsReturning

func (d *Dialect) SupportsReturning() bool

SupportsReturning indicates RETURNING clause support.

func (*Dialect) SupportsSkipLocked

func (d *Dialect) SupportsSkipLocked() bool

SupportsSkipLocked indicates SKIP LOCKED support.

func (*Dialect) TimestampType

func (d *Dialect) TimestampType() string

TimestampType returns the PostgreSQL timestamp type.

func (*Dialect) ToJSON

func (d *Dialect) ToJSON(value string) string

ToJSON converts a value to JSONB

func (*Dialect) UnixMillis

func (d *Dialect) UnixMillis(column string) string

UnixMillis extracts unix milliseconds from a timestamp expression.

type SchemaManager

type SchemaManager struct {
	// contains filtered or unexported fields
}

SchemaManager handles PostgreSQL schema initialization and versioning.

func NewSchemaManager

func NewSchemaManager() *SchemaManager

NewSchemaManager creates a new PostgreSQL schema manager.

func (*SchemaManager) EnsureVersionTable

func (m *SchemaManager) EnsureVersionTable(ctx context.Context, db *sql.DB) error

func (*SchemaManager) GetVersion

func (m *SchemaManager) GetVersion(ctx context.Context, db *sql.DB) (uint, bool, error)

func (*SchemaManager) Initialize

func (m *SchemaManager) Initialize(ctx context.Context, db *sql.DB) error

Initialize creates the initial schema if no version is present.

func (*SchemaManager) Migrate

func (m *SchemaManager) Migrate(ctx context.Context, db *sql.DB, targetVersion uint) error

func (*SchemaManager) SetVersion

func (m *SchemaManager) SetVersion(ctx context.Context, db *sql.DB, version uint, description string) error

func (*SchemaManager) Version

func (m *SchemaManager) Version(ctx context.Context, db *sql.DB) (uint, bool, error)

type Storage

type Storage struct {
	*repositorysql.BaseSQL
	// contains filtered or unexported fields
}

Storage implements the persistence.Storage interface for Postgres

func NewStorage

func NewStorage(ctx context.Context, config *Config) (*Storage, error)

NewStorage creates a new Postgres storage instance

func (*Storage) AcknowledgeMessage

func (s *Storage) AcknowledgeMessage(ctx context.Context, queueName string, messageId string, attemptId string) error

AcknowledgeMessage marks a message as completed.

func (*Storage) ClaimMessage

func (s *Storage) ClaimMessage(ctx context.Context, queueName string, workerId string, attemptId string) (*messagepb.Message, error)

ClaimMessage claims the next available message from a queue.

func (*Storage) Close

func (s *Storage) Close() error

Close closes the storage

func (*Storage) CreateQueue

func (s *Storage) CreateQueue(ctx context.Context, queue *queuepb.Queue) error

func (*Storage) CreateSchedule

func (s *Storage) CreateSchedule(ctx context.Context, schedule *schedulepb.Schedule) error

func (*Storage) DeleteDLQMessage

func (s *Storage) DeleteDLQMessage(ctx context.Context, queueName string, messageId string) error

DeleteDLQMessage permanently deletes a message from DLQ.

func (*Storage) DeleteQueue

func (s *Storage) DeleteQueue(ctx context.Context, name string) error

DeleteQueue deletes a queue.

func (*Storage) DeleteSchedule

func (s *Storage) DeleteSchedule(ctx context.Context, scheduleId string) error

DeleteSchedule deletes a schedule.

func (*Storage) EnqueueMessage

func (s *Storage) EnqueueMessage(ctx context.Context, queueName string, message *messagepb.Message) error

func (*Storage) ExtendMessageLease

func (s *Storage) ExtendMessageLease(ctx context.Context, queueName string, messageId string, attemptId string, extensionMs int64) error

ExtendMessageLease extends the lease on a message.

func (*Storage) FindExpiredMessages

func (s *Storage) FindExpiredMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error)

func (*Storage) GetDLQMessages

func (s *Storage) GetDLQMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error)

func (*Storage) GetQueue

func (s *Storage) GetQueue(ctx context.Context, name string) (*queuepb.Queue, error)

GetQueue retrieves a queue by name.

func (*Storage) GetQueueMetadata

func (s *Storage) GetQueueMetadata(ctx context.Context, name string) (*queuepb.QueueMetadata, error)

GetQueueMetadata retrieves queue metadata by name.

func (*Storage) GetSchedule

func (s *Storage) GetSchedule(ctx context.Context, scheduleId string) (*schedulepb.Schedule, error)

GetSchedule retrieves a schedule by ID.

func (*Storage) GetScheduleHistory

func (s *Storage) GetScheduleHistory(ctx context.Context, scheduleId string, limit int64) (*schedulepb.ScheduleHistory, error)

GetScheduleHistory returns the execution history for a schedule

func (*Storage) HeartbeatMessage

func (s *Storage) HeartbeatMessage(ctx context.Context, queueName string, messageId string, attemptId string) error

HeartbeatMessage updates the heartbeat for a message.

func (*Storage) ListQueues

func (s *Storage) ListQueues(ctx context.Context) ([]*queuepb.Queue, error)

ListQueues returns all queues.

func (*Storage) ListSchedules

func (s *Storage) ListSchedules(ctx context.Context, queueName string) ([]*schedulepb.Schedule, error)

ListSchedules returns schedules for a queue.

func (*Storage) NackMessage

func (s *Storage) NackMessage(ctx context.Context, queueName string, messageId string, attemptId string) error

NackMessage marks a message as failed.

func (*Storage) PauseSchedule

func (s *Storage) PauseSchedule(ctx context.Context, scheduleId string) error

PauseSchedule pauses a schedule.

func (*Storage) PeekMessages

func (s *Storage) PeekMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error)

PeekMessages retrieves messages without claiming them.

func (*Storage) PurgeDLQ

func (s *Storage) PurgeDLQ(ctx context.Context, queueName string) (int64, error)

PurgeDLQ deletes all errored messages for a queue and returns the count deleted.

func (*Storage) ReclaimExpiredMessage

func (s *Storage) ReclaimExpiredMessage(ctx context.Context, queueName string, message *messagepb.Message) error

ReclaimExpiredMessage moves an expired message back to pending or DLQ.

func (*Storage) RecordScheduleExecution

func (s *Storage) RecordScheduleExecution(ctx context.Context, scheduleId string, messageId string, executionTime int64) error

RecordScheduleExecution records a schedule execution.

func (*Storage) ResumeSchedule

func (s *Storage) ResumeSchedule(ctx context.Context, scheduleId string) error

ResumeSchedule resumes a paused schedule.

func (*Storage) RetryDLQMessage

func (s *Storage) RetryDLQMessage(ctx context.Context, queueName string, messageId string) error

RetryDLQMessage moves a message from DLQ back to pending.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL