Documentation
¶
Overview ¶
Package store provides the DedupRepo interface for inbound message deduplication.
Package store provides the JobRepo interface and model for durable job scheduling.
Package store provides the JobRunner for executing durable jobs.
Package store provides the OutboxRepo interface and model for restart-safe outgoing sends.
Package store provides the OutboxSender for processing outgoing messages.
Package store provides storage backends for PromptPipe.
This file implements a PostgreSQL-backed store for receipts and responses.
Package store provides storage backends for PromptPipe.
This file implements an SQLite-backed store for receipts and responses.
Package store provides storage backends for PromptPipe.
This file defines the common interfaces and option types used by all store implementations.
Index ¶
- Constants
- func DetectDSNType(dsn string) string
- func ExtractDirFromSQLiteDSN(dsn string) (string, error)
- type DedupRecord
- type DedupRepo
- type InMemoryStore
- func (s *InMemoryStore) AddReceipt(r models.Receipt) error
- func (s *InMemoryStore) AddResponse(r models.Response) error
- func (s *InMemoryStore) ClearReceipts() error
- func (s *InMemoryStore) ClearResponses() error
- func (s *InMemoryStore) Close() error
- func (s *InMemoryStore) DeleteConversationParticipant(id string) error
- func (s *InMemoryStore) DeleteFlowState(participantID, flowType string) error
- func (s *InMemoryStore) DeleteRegisteredHook(phoneNumber string) error
- func (s *InMemoryStore) GetConversationParticipant(id string) (*models.ConversationParticipant, error)
- func (s *InMemoryStore) GetConversationParticipantByPhone(phoneNumber string) (*models.ConversationParticipant, error)
- func (s *InMemoryStore) GetFlowState(participantID, flowType string) (*models.FlowState, error)
- func (s *InMemoryStore) GetReceipts() ([]models.Receipt, error)
- func (s *InMemoryStore) GetRegisteredHook(phoneNumber string) (*models.RegisteredHook, error)
- func (s *InMemoryStore) GetResponses() ([]models.Response, error)
- func (s *InMemoryStore) ListConversationParticipants() ([]models.ConversationParticipant, error)
- func (s *InMemoryStore) ListRegisteredHooks() ([]models.RegisteredHook, error)
- func (s *InMemoryStore) SaveConversationParticipant(participant models.ConversationParticipant) error
- func (s *InMemoryStore) SaveFlowState(state models.FlowState) error
- func (s *InMemoryStore) SaveRegisteredHook(hook models.RegisteredHook) error
- type Job
- type JobHandler
- type JobRepo
- type JobRunner
- type JobStatus
- type Option
- type Opts
- type OutboxMessage
- type OutboxRepo
- type OutboxSendFunc
- type OutboxSender
- type OutboxStatus
- type PersistenceProvider
- type PostgresStore
- func (s *PostgresStore) AddReceipt(r models.Receipt) error
- func (s *PostgresStore) AddResponse(r models.Response) error
- func (s *PostgresStore) CancelJob(id string) error
- func (s *PostgresStore) ClaimDueJobs(now time.Time, limit int) ([]Job, error)
- func (s *PostgresStore) ClaimDueOutboxMessages(now time.Time, limit int) ([]OutboxMessage, error)
- func (s *PostgresStore) ClearReceipts() error
- func (s *PostgresStore) ClearResponses() error
- func (s *PostgresStore) Close() error
- func (s *PostgresStore) CompleteJob(id string) error
- func (s *PostgresStore) DedupRepo() DedupRepo
- func (s *PostgresStore) DeleteConversationParticipant(id string) error
- func (s *PostgresStore) DeleteFlowState(participantID, flowType string) error
- func (s *PostgresStore) DeleteRegisteredHook(phoneNumber string) error
- func (s *PostgresStore) EnqueueJob(kind string, runAt time.Time, payloadJSON string, dedupeKey string) (string, error)
- func (s *PostgresStore) EnqueueOutboxMessage(participantID, kind, payloadJSON, dedupeKey string) (string, error)
- func (s *PostgresStore) FailJob(id string, errMsg string, nextRunAt time.Time) error
- func (s *PostgresStore) FailOutboxMessage(id string, errMsg string, nextAttemptAt time.Time) error
- func (s *PostgresStore) GetConversationParticipant(id string) (*models.ConversationParticipant, error)
- func (s *PostgresStore) GetConversationParticipantByPhone(phoneNumber string) (*models.ConversationParticipant, error)
- func (s *PostgresStore) GetFlowState(participantID, flowType string) (*models.FlowState, error)
- func (s *PostgresStore) GetJob(id string) (*Job, error)
- func (s *PostgresStore) GetReceipts() ([]models.Receipt, error)
- func (s *PostgresStore) GetRegisteredHook(phoneNumber string) (*models.RegisteredHook, error)
- func (s *PostgresStore) GetResponses() ([]models.Response, error)
- func (s *PostgresStore) IsDuplicate(messageID string) (bool, error)
- func (s *PostgresStore) JobRepo() JobRepo
- func (s *PostgresStore) ListConversationParticipants() ([]models.ConversationParticipant, error)
- func (s *PostgresStore) ListRegisteredHooks() ([]models.RegisteredHook, error)
- func (s *PostgresStore) MarkOutboxMessageSent(id string) error
- func (s *PostgresStore) MarkProcessed(messageID string) error
- func (s *PostgresStore) OutboxRepo() OutboxRepo
- func (s *PostgresStore) RecordInbound(messageID, participantID string) (bool, error)
- func (s *PostgresStore) RequeueStaleRunningJobs(staleBefore time.Time) (int, error)
- func (s *PostgresStore) RequeueStaleSendingMessages(staleBefore time.Time) (int, error)
- func (s *PostgresStore) SaveConversationParticipant(participant models.ConversationParticipant) error
- func (s *PostgresStore) SaveFlowState(state models.FlowState) error
- func (s *PostgresStore) SaveRegisteredHook(hook models.RegisteredHook) error
- type SQLiteStore
- func (s *SQLiteStore) AddReceipt(r models.Receipt) error
- func (s *SQLiteStore) AddResponse(r models.Response) error
- func (s *SQLiteStore) CancelJob(id string) error
- func (s *SQLiteStore) ClaimDueJobs(now time.Time, limit int) ([]Job, error)
- func (s *SQLiteStore) ClaimDueOutboxMessages(now time.Time, limit int) ([]OutboxMessage, error)
- func (s *SQLiteStore) ClearReceipts() error
- func (s *SQLiteStore) ClearResponses() error
- func (s *SQLiteStore) Close() error
- func (s *SQLiteStore) CompleteJob(id string) error
- func (s *SQLiteStore) DedupRepo() DedupRepo
- func (s *SQLiteStore) DeleteConversationParticipant(id string) error
- func (s *SQLiteStore) DeleteFlowState(participantID, flowType string) error
- func (s *SQLiteStore) DeleteRegisteredHook(phoneNumber string) error
- func (s *SQLiteStore) EnqueueJob(kind string, runAt time.Time, payloadJSON string, dedupeKey string) (string, error)
- func (s *SQLiteStore) EnqueueOutboxMessage(participantID, kind, payloadJSON, dedupeKey string) (string, error)
- func (s *SQLiteStore) FailJob(id string, errMsg string, nextRunAt time.Time) error
- func (s *SQLiteStore) FailOutboxMessage(id string, errMsg string, nextAttemptAt time.Time) error
- func (s *SQLiteStore) GetConversationParticipant(id string) (*models.ConversationParticipant, error)
- func (s *SQLiteStore) GetConversationParticipantByPhone(phoneNumber string) (*models.ConversationParticipant, error)
- func (s *SQLiteStore) GetFlowState(participantID, flowType string) (*models.FlowState, error)
- func (s *SQLiteStore) GetJob(id string) (*Job, error)
- func (s *SQLiteStore) GetReceipts() ([]models.Receipt, error)
- func (s *SQLiteStore) GetRegisteredHook(phoneNumber string) (*models.RegisteredHook, error)
- func (s *SQLiteStore) GetResponses() ([]models.Response, error)
- func (s *SQLiteStore) IsDuplicate(messageID string) (bool, error)
- func (s *SQLiteStore) JobRepo() JobRepo
- func (s *SQLiteStore) ListConversationParticipants() ([]models.ConversationParticipant, error)
- func (s *SQLiteStore) ListRegisteredHooks() ([]models.RegisteredHook, error)
- func (s *SQLiteStore) MarkOutboxMessageSent(id string) error
- func (s *SQLiteStore) MarkProcessed(messageID string) error
- func (s *SQLiteStore) OutboxRepo() OutboxRepo
- func (s *SQLiteStore) RecordInbound(messageID, participantID string) (bool, error)
- func (s *SQLiteStore) RequeueStaleRunningJobs(staleBefore time.Time) (int, error)
- func (s *SQLiteStore) RequeueStaleSendingMessages(staleBefore time.Time) (int, error)
- func (s *SQLiteStore) SaveConversationParticipant(participant models.ConversationParticipant) error
- func (s *SQLiteStore) SaveFlowState(state models.FlowState) error
- func (s *SQLiteStore) SaveRegisteredHook(hook models.RegisteredHook) error
- type Store
Constants ¶
const (
	// DefaultMaxOpenConns is the default maximum number of open connections to the database
	DefaultMaxOpenConns = 25
	// DefaultMaxIdleConns is the default maximum number of idle connections in the pool
	DefaultMaxIdleConns = 25
	// DefaultConnMaxLifetime is the default maximum amount of time a connection may be reused
	DefaultConnMaxLifetime = 5 * time.Minute
)
Database connection pool configuration constants.
const (
	// DefaultDirPermissions defines the default permissions for database directories
	DefaultDirPermissions = 0755
	// SQLiteMaxOpenConns is the maximum number of open connections for SQLite (should be 1 for WAL mode safety)
	SQLiteMaxOpenConns = 1
	// SQLiteMaxIdleConns is the maximum number of idle connections for SQLite
	SQLiteMaxIdleConns = 1
	// SQLiteConnMaxLifetime is the maximum amount of time a SQLite connection may be reused
	SQLiteConnMaxLifetime = 30 * time.Minute
)
Constants for SQLite store configuration.
Variables ¶
This section is empty.
Functions ¶
func DetectDSNType ¶
DetectDSNType analyzes a DSN and returns the appropriate database driver. Returns "postgres" for PostgreSQL DSNs, "sqlite3" for SQLite file paths.
func ExtractDirFromSQLiteDSN ¶
ExtractDirFromSQLiteDSN extracts the directory path from a SQLite DSN string, handling both file URIs (e.g., "file:/path/to/file?_foreign_keys=on") and regular file paths. This function is specifically designed for SQLite DSNs and will return an error if called with non-SQLite DSNs (e.g., PostgreSQL DSNs).

Returns the directory containing the SQLite database file, or an error if:
- The DSN is not a SQLite DSN
- The file URI cannot be parsed
- The resulting path is invalid
Types ¶
type DedupRecord ¶
type DedupRecord struct {
MessageID string `json:"message_id"`
ParticipantID string `json:"participant_id"`
ReceivedAt time.Time `json:"received_at"`
ProcessedAt *time.Time `json:"processed_at"`
}
DedupRecord represents an inbound message deduplication record.
type DedupRepo ¶
type DedupRepo interface {
// IsDuplicate checks if a message ID has already been processed.
// Returns true if the message was already seen.
IsDuplicate(messageID string) (bool, error)
// RecordInbound inserts a new inbound message record. Returns false if the
// message was already recorded (duplicate).
RecordInbound(messageID, participantID string) (bool, error)
// MarkProcessed sets the processed_at timestamp for a message.
MarkProcessed(messageID string) error
}
DedupRepo defines the interface for inbound message deduplication.
type InMemoryStore ¶
type InMemoryStore struct {
// contains filtered or unexported fields
}
InMemoryStore is a simple in-memory implementation of the Store interface. Data is stored in memory and will be lost when the application restarts.
func NewInMemoryStore ¶
func NewInMemoryStore() *InMemoryStore
NewInMemoryStore creates a new in-memory store.
func (*InMemoryStore) AddReceipt ¶
func (s *InMemoryStore) AddReceipt(r models.Receipt) error
AddReceipt stores a receipt in memory.
func (*InMemoryStore) AddResponse ¶
func (s *InMemoryStore) AddResponse(r models.Response) error
AddResponse stores an incoming response in memory.
func (*InMemoryStore) ClearReceipts ¶
func (s *InMemoryStore) ClearReceipts() error
ClearReceipts clears all stored receipts (for tests).
func (*InMemoryStore) ClearResponses ¶
func (s *InMemoryStore) ClearResponses() error
ClearResponses clears all stored responses (for tests).
func (*InMemoryStore) Close ¶
func (s *InMemoryStore) Close() error
Close is a no-op for in-memory store as there are no resources to clean up.
func (*InMemoryStore) DeleteConversationParticipant ¶
func (s *InMemoryStore) DeleteConversationParticipant(id string) error
DeleteConversationParticipant removes a conversation participant.
func (*InMemoryStore) DeleteFlowState ¶
func (s *InMemoryStore) DeleteFlowState(participantID, flowType string) error
DeleteFlowState removes flow state for a participant.
func (*InMemoryStore) DeleteRegisteredHook ¶
func (s *InMemoryStore) DeleteRegisteredHook(phoneNumber string) error
DeleteRegisteredHook removes a registered hook by phone number.
func (*InMemoryStore) GetConversationParticipant ¶
func (s *InMemoryStore) GetConversationParticipant(id string) (*models.ConversationParticipant, error)
GetConversationParticipant retrieves a conversation participant by ID.
func (*InMemoryStore) GetConversationParticipantByPhone ¶
func (s *InMemoryStore) GetConversationParticipantByPhone(phoneNumber string) (*models.ConversationParticipant, error)
GetConversationParticipantByPhone retrieves a conversation participant by phone number.
func (*InMemoryStore) GetFlowState ¶
func (s *InMemoryStore) GetFlowState(participantID, flowType string) (*models.FlowState, error)
GetFlowState retrieves flow state for a participant.
func (*InMemoryStore) GetReceipts ¶
func (s *InMemoryStore) GetReceipts() ([]models.Receipt, error)
GetReceipts retrieves all stored receipts from memory.
func (*InMemoryStore) GetRegisteredHook ¶
func (s *InMemoryStore) GetRegisteredHook(phoneNumber string) (*models.RegisteredHook, error)
GetRegisteredHook retrieves a registered hook by phone number.
func (*InMemoryStore) GetResponses ¶
func (s *InMemoryStore) GetResponses() ([]models.Response, error)
GetResponses retrieves all stored responses from memory.
func (*InMemoryStore) ListConversationParticipants ¶
func (s *InMemoryStore) ListConversationParticipants() ([]models.ConversationParticipant, error)
ListConversationParticipants retrieves all conversation participants.
func (*InMemoryStore) ListRegisteredHooks ¶
func (s *InMemoryStore) ListRegisteredHooks() ([]models.RegisteredHook, error)
ListRegisteredHooks retrieves all registered hooks.
func (*InMemoryStore) SaveConversationParticipant ¶
func (s *InMemoryStore) SaveConversationParticipant(participant models.ConversationParticipant) error
SaveConversationParticipant stores or updates a conversation participant.
func (*InMemoryStore) SaveFlowState ¶
func (s *InMemoryStore) SaveFlowState(state models.FlowState) error
SaveFlowState stores or updates flow state for a participant.
func (*InMemoryStore) SaveRegisteredHook ¶
func (s *InMemoryStore) SaveRegisteredHook(hook models.RegisteredHook) error
SaveRegisteredHook stores a registered hook.
type Job ¶
type Job struct {
ID string `json:"id"`
Kind string `json:"kind"`
RunAt time.Time `json:"run_at"`
PayloadJSON string `json:"payload_json"`
Status JobStatus `json:"status"`
Attempt int `json:"attempt"`
MaxAttempts int `json:"max_attempts"`
LastError string `json:"last_error"`
LockedAt *time.Time `json:"locked_at"`
DedupeKey string `json:"dedupe_key"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
Job represents a durable job record that replaces in-memory timers.
type JobHandler ¶
JobHandler is a function that executes a job's work. It receives the job's payload JSON and returns an error if the execution failed.
type JobRepo ¶
type JobRepo interface {
// EnqueueJob inserts a new job. If dedupeKey is non-empty and a non-terminal
// job with that key already exists, the call returns the existing job ID
// without inserting a duplicate.
EnqueueJob(kind string, runAt time.Time, payloadJSON string, dedupeKey string) (string, error)
// ClaimDueJobs marks up to limit queued jobs whose run_at <= now as running
// and returns them.
ClaimDueJobs(now time.Time, limit int) ([]Job, error)
// CompleteJob marks a job as done.
CompleteJob(id string) error
// FailJob marks a job as failed, stores the error, and reschedules for retry
// at nextRunAt if attempt < max_attempts; otherwise marks as permanently failed.
FailJob(id string, errMsg string, nextRunAt time.Time) error
// CancelJob marks a job as canceled.
CancelJob(id string) error
// RequeueStaleRunningJobs resets jobs that have been running since before
// staleBefore back to queued status (crash recovery).
RequeueStaleRunningJobs(staleBefore time.Time) (int, error)
// GetJob retrieves a single job by ID.
GetJob(id string) (*Job, error)
}
JobRepo defines the interface for durable job persistence.
type JobRunner ¶
type JobRunner struct {
// contains filtered or unexported fields
}
JobRunner periodically claims due jobs from the database and dispatches them to registered handlers.
func NewJobRunner ¶
NewJobRunner creates a new JobRunner.
func (*JobRunner) RecoverStaleJobs ¶
RecoverStaleJobs requeues jobs that were running when the process crashed. Should be called once at startup.
func (*JobRunner) RegisterHandler ¶
func (r *JobRunner) RegisterHandler(kind string, handler JobHandler)
RegisterHandler registers a handler for a given job kind.
type Option ¶
type Option func(*Opts)
Option defines a configuration option for store implementations.
func WithPostgresDSN ¶
WithPostgresDSN sets the PostgreSQL database connection string.
func WithSQLiteDSN ¶
WithSQLiteDSN sets the SQLite database file path.
type Opts ¶
type Opts struct {
DSN string // Database connection string or file path for SQLite
}
Opts holds configuration options for store implementations.
type OutboxMessage ¶
type OutboxMessage struct {
ID string `json:"id"`
ParticipantID string `json:"participant_id"`
Kind string `json:"kind"`
PayloadJSON string `json:"payload_json"`
Status OutboxStatus `json:"status"`
Attempts int `json:"attempts"`
NextAttemptAt *time.Time `json:"next_attempt_at"`
DedupeKey string `json:"dedupe_key"`
LockedAt *time.Time `json:"locked_at"`
LastError string `json:"last_error"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
OutboxMessage represents a durable outgoing message record.
type OutboxRepo ¶
type OutboxRepo interface {
// EnqueueOutboxMessage inserts a new outbox message. If dedupeKey is non-empty
// and a non-terminal message with that key exists, returns the existing ID.
EnqueueOutboxMessage(participantID, kind, payloadJSON, dedupeKey string) (string, error)
// ClaimDueOutboxMessages marks up to limit queued messages whose
// next_attempt_at <= now (or is NULL) as sending and returns them.
ClaimDueOutboxMessages(now time.Time, limit int) ([]OutboxMessage, error)
// MarkOutboxMessageSent marks a message as successfully sent.
MarkOutboxMessageSent(id string) error
// FailOutboxMessage records a send failure and schedules a retry at nextAttemptAt.
FailOutboxMessage(id string, errMsg string, nextAttemptAt time.Time) error
// RequeueStaleSendingMessages resets messages stuck in sending since before
// staleBefore back to queued (crash recovery).
RequeueStaleSendingMessages(staleBefore time.Time) (int, error)
}
OutboxRepo defines the interface for durable outbox message persistence.
type OutboxSendFunc ¶
type OutboxSendFunc func(ctx context.Context, msg OutboxMessage) error
OutboxSendFunc is the callback that performs the actual message send. It receives the outbox message and should return an error if sending failed.
type OutboxSender ¶
type OutboxSender struct {
// contains filtered or unexported fields
}
OutboxSender periodically claims due outbox messages and attempts to send them.
func NewOutboxSender ¶
func NewOutboxSender(repo OutboxRepo, sendFunc OutboxSendFunc, pollInterval time.Duration) *OutboxSender
NewOutboxSender creates a new OutboxSender.
func (*OutboxSender) RecoverStaleMessages ¶
func (s *OutboxSender) RecoverStaleMessages() error
RecoverStaleMessages requeues messages stuck in sending state (crash recovery). Should be called once at startup.
func (*OutboxSender) Run ¶
func (s *OutboxSender) Run(ctx context.Context)
Run starts the polling loop. It blocks until the context is cancelled.
type OutboxStatus ¶
type OutboxStatus string
OutboxStatus represents the lifecycle state of an outbox message.
const (
	OutboxStatusQueued   OutboxStatus = "queued"
	OutboxStatusSending  OutboxStatus = "sending"
	OutboxStatusSent     OutboxStatus = "sent"
	OutboxStatusFailed   OutboxStatus = "failed"
	OutboxStatusCanceled OutboxStatus = "canceled"
)
type PersistenceProvider ¶
type PersistenceProvider interface {
JobRepo() JobRepo
OutboxRepo() OutboxRepo
DedupRepo() DedupRepo
}
PersistenceProvider is implemented by database-backed stores that support durable job scheduling, outbox messaging, and inbound deduplication.
type PostgresStore ¶
type PostgresStore struct {
// contains filtered or unexported fields
}
func NewPostgresStore ¶
func NewPostgresStore(opts ...Option) (*PostgresStore, error)
NewPostgresStore creates a new Postgres store based on provided options.
func (*PostgresStore) AddReceipt ¶
func (s *PostgresStore) AddReceipt(r models.Receipt) error
func (*PostgresStore) AddResponse ¶
func (s *PostgresStore) AddResponse(r models.Response) error
AddResponse stores an incoming response in Postgres.
func (*PostgresStore) CancelJob ¶
func (s *PostgresStore) CancelJob(id string) error
func (*PostgresStore) ClaimDueJobs ¶
func (*PostgresStore) ClaimDueOutboxMessages ¶
func (s *PostgresStore) ClaimDueOutboxMessages(now time.Time, limit int) ([]OutboxMessage, error)
func (*PostgresStore) ClearReceipts ¶
func (s *PostgresStore) ClearReceipts() error
ClearReceipts deletes all records in receipts table (for tests).
func (*PostgresStore) ClearResponses ¶
func (s *PostgresStore) ClearResponses() error
ClearResponses deletes all records in responses table (for tests).
func (*PostgresStore) Close ¶
func (s *PostgresStore) Close() error
Close closes the PostgreSQL database connection.
func (*PostgresStore) CompleteJob ¶
func (s *PostgresStore) CompleteJob(id string) error
func (*PostgresStore) DedupRepo ¶
func (s *PostgresStore) DedupRepo() DedupRepo
DedupRepo returns the PostgresStore as a DedupRepo.
func (*PostgresStore) DeleteConversationParticipant ¶
func (s *PostgresStore) DeleteConversationParticipant(id string) error
DeleteConversationParticipant removes a conversation participant.
func (*PostgresStore) DeleteFlowState ¶
func (s *PostgresStore) DeleteFlowState(participantID, flowType string) error
DeleteFlowState removes flow state for a participant.
func (*PostgresStore) DeleteRegisteredHook ¶
func (s *PostgresStore) DeleteRegisteredHook(phoneNumber string) error
DeleteRegisteredHook deletes a registered hook by phone number.
func (*PostgresStore) EnqueueJob ¶
func (*PostgresStore) EnqueueOutboxMessage ¶
func (s *PostgresStore) EnqueueOutboxMessage(participantID, kind, payloadJSON, dedupeKey string) (string, error)
func (*PostgresStore) FailOutboxMessage ¶
func (*PostgresStore) GetConversationParticipant ¶
func (s *PostgresStore) GetConversationParticipant(id string) (*models.ConversationParticipant, error)
GetConversationParticipant retrieves a conversation participant by ID.
func (*PostgresStore) GetConversationParticipantByPhone ¶
func (s *PostgresStore) GetConversationParticipantByPhone(phoneNumber string) (*models.ConversationParticipant, error)
GetConversationParticipantByPhone retrieves a conversation participant by phone number.
func (*PostgresStore) GetFlowState ¶
func (s *PostgresStore) GetFlowState(participantID, flowType string) (*models.FlowState, error)
GetFlowState retrieves flow state for a participant.
func (*PostgresStore) GetReceipts ¶
func (s *PostgresStore) GetReceipts() ([]models.Receipt, error)
func (*PostgresStore) GetRegisteredHook ¶
func (s *PostgresStore) GetRegisteredHook(phoneNumber string) (*models.RegisteredHook, error)
GetRegisteredHook retrieves a registered hook by phone number.
func (*PostgresStore) GetResponses ¶
func (s *PostgresStore) GetResponses() ([]models.Response, error)
GetResponses retrieves all stored responses from Postgres.
func (*PostgresStore) IsDuplicate ¶
func (s *PostgresStore) IsDuplicate(messageID string) (bool, error)
func (*PostgresStore) JobRepo ¶
func (s *PostgresStore) JobRepo() JobRepo
JobRepo returns the PostgresStore as a JobRepo.
func (*PostgresStore) ListConversationParticipants ¶
func (s *PostgresStore) ListConversationParticipants() ([]models.ConversationParticipant, error)
ListConversationParticipants retrieves all conversation participants.
func (*PostgresStore) ListRegisteredHooks ¶
func (s *PostgresStore) ListRegisteredHooks() ([]models.RegisteredHook, error)
ListRegisteredHooks retrieves all registered hooks.
func (*PostgresStore) MarkOutboxMessageSent ¶
func (s *PostgresStore) MarkOutboxMessageSent(id string) error
func (*PostgresStore) MarkProcessed ¶
func (s *PostgresStore) MarkProcessed(messageID string) error
func (*PostgresStore) OutboxRepo ¶
func (s *PostgresStore) OutboxRepo() OutboxRepo
OutboxRepo returns the PostgresStore as an OutboxRepo.
func (*PostgresStore) RecordInbound ¶
func (s *PostgresStore) RecordInbound(messageID, participantID string) (bool, error)
func (*PostgresStore) RequeueStaleRunningJobs ¶
func (s *PostgresStore) RequeueStaleRunningJobs(staleBefore time.Time) (int, error)
func (*PostgresStore) RequeueStaleSendingMessages ¶
func (s *PostgresStore) RequeueStaleSendingMessages(staleBefore time.Time) (int, error)
func (*PostgresStore) SaveConversationParticipant ¶
func (s *PostgresStore) SaveConversationParticipant(participant models.ConversationParticipant) error
SaveConversationParticipant stores or updates a conversation participant.
func (*PostgresStore) SaveFlowState ¶
func (s *PostgresStore) SaveFlowState(state models.FlowState) error
SaveFlowState stores or updates flow state for a participant.
func (*PostgresStore) SaveRegisteredHook ¶
func (s *PostgresStore) SaveRegisteredHook(hook models.RegisteredHook) error
SaveRegisteredHook stores or updates a registered hook.
type SQLiteStore ¶
type SQLiteStore struct {
// contains filtered or unexported fields
}
func NewSQLiteStore ¶
func NewSQLiteStore(opts ...Option) (*SQLiteStore, error)
NewSQLiteStore creates a new SQLite store based on the provided options. The DSN (set via WithSQLiteDSN) should be a file path to the SQLite database file. If the database file's directory doesn't exist, it will be created.
func (*SQLiteStore) AddReceipt ¶
func (s *SQLiteStore) AddReceipt(r models.Receipt) error
func (*SQLiteStore) AddResponse ¶
func (s *SQLiteStore) AddResponse(r models.Response) error
func (*SQLiteStore) CancelJob ¶
func (s *SQLiteStore) CancelJob(id string) error
func (*SQLiteStore) ClaimDueJobs ¶
func (*SQLiteStore) ClaimDueOutboxMessages ¶
func (s *SQLiteStore) ClaimDueOutboxMessages(now time.Time, limit int) ([]OutboxMessage, error)
func (*SQLiteStore) ClearReceipts ¶
func (s *SQLiteStore) ClearReceipts() error
ClearReceipts deletes all records in receipts table (for tests).
func (*SQLiteStore) ClearResponses ¶
func (s *SQLiteStore) ClearResponses() error
ClearResponses deletes all records in responses table (for tests).
func (*SQLiteStore) Close ¶
func (s *SQLiteStore) Close() error
Close closes the SQLite database connection.
func (*SQLiteStore) CompleteJob ¶
func (s *SQLiteStore) CompleteJob(id string) error
func (*SQLiteStore) DedupRepo ¶
func (s *SQLiteStore) DedupRepo() DedupRepo
DedupRepo returns the SQLiteStore as a DedupRepo.
func (*SQLiteStore) DeleteConversationParticipant ¶
func (s *SQLiteStore) DeleteConversationParticipant(id string) error
DeleteConversationParticipant removes a conversation participant.
func (*SQLiteStore) DeleteFlowState ¶
func (s *SQLiteStore) DeleteFlowState(participantID, flowType string) error
DeleteFlowState removes flow state for a participant.
func (*SQLiteStore) DeleteRegisteredHook ¶
func (s *SQLiteStore) DeleteRegisteredHook(phoneNumber string) error
DeleteRegisteredHook removes a registered hook by phone number.
func (*SQLiteStore) EnqueueJob ¶
func (*SQLiteStore) EnqueueOutboxMessage ¶
func (s *SQLiteStore) EnqueueOutboxMessage(participantID, kind, payloadJSON, dedupeKey string) (string, error)
func (*SQLiteStore) FailOutboxMessage ¶
func (*SQLiteStore) GetConversationParticipant ¶
func (s *SQLiteStore) GetConversationParticipant(id string) (*models.ConversationParticipant, error)
GetConversationParticipant retrieves a conversation participant by ID.
func (*SQLiteStore) GetConversationParticipantByPhone ¶
func (s *SQLiteStore) GetConversationParticipantByPhone(phoneNumber string) (*models.ConversationParticipant, error)
GetConversationParticipantByPhone retrieves a conversation participant by phone number.
func (*SQLiteStore) GetFlowState ¶
func (s *SQLiteStore) GetFlowState(participantID, flowType string) (*models.FlowState, error)
GetFlowState retrieves flow state for a participant.
func (*SQLiteStore) GetReceipts ¶
func (s *SQLiteStore) GetReceipts() ([]models.Receipt, error)
func (*SQLiteStore) GetRegisteredHook ¶
func (s *SQLiteStore) GetRegisteredHook(phoneNumber string) (*models.RegisteredHook, error)
GetRegisteredHook retrieves a registered hook by phone number.
func (*SQLiteStore) GetResponses ¶
func (s *SQLiteStore) GetResponses() ([]models.Response, error)
func (*SQLiteStore) IsDuplicate ¶
func (s *SQLiteStore) IsDuplicate(messageID string) (bool, error)
func (*SQLiteStore) JobRepo ¶
func (s *SQLiteStore) JobRepo() JobRepo
JobRepo returns the SQLiteStore as a JobRepo.
func (*SQLiteStore) ListConversationParticipants ¶
func (s *SQLiteStore) ListConversationParticipants() ([]models.ConversationParticipant, error)
ListConversationParticipants retrieves all conversation participants.
func (*SQLiteStore) ListRegisteredHooks ¶
func (s *SQLiteStore) ListRegisteredHooks() ([]models.RegisteredHook, error)
ListRegisteredHooks retrieves all registered hooks.
func (*SQLiteStore) MarkOutboxMessageSent ¶
func (s *SQLiteStore) MarkOutboxMessageSent(id string) error
func (*SQLiteStore) MarkProcessed ¶
func (s *SQLiteStore) MarkProcessed(messageID string) error
func (*SQLiteStore) OutboxRepo ¶
func (s *SQLiteStore) OutboxRepo() OutboxRepo
OutboxRepo returns the SQLiteStore as an OutboxRepo.
func (*SQLiteStore) RecordInbound ¶
func (s *SQLiteStore) RecordInbound(messageID, participantID string) (bool, error)
func (*SQLiteStore) RequeueStaleRunningJobs ¶
func (s *SQLiteStore) RequeueStaleRunningJobs(staleBefore time.Time) (int, error)
func (*SQLiteStore) RequeueStaleSendingMessages ¶
func (s *SQLiteStore) RequeueStaleSendingMessages(staleBefore time.Time) (int, error)
func (*SQLiteStore) SaveConversationParticipant ¶
func (s *SQLiteStore) SaveConversationParticipant(participant models.ConversationParticipant) error
SaveConversationParticipant stores or updates a conversation participant.
func (*SQLiteStore) SaveFlowState ¶
func (s *SQLiteStore) SaveFlowState(state models.FlowState) error
SaveFlowState stores or updates flow state for a participant.
func (*SQLiteStore) SaveRegisteredHook ¶
func (s *SQLiteStore) SaveRegisteredHook(hook models.RegisteredHook) error
SaveRegisteredHook stores or updates a registered hook.
type Store ¶
type Store interface {
AddReceipt(r models.Receipt) error
GetReceipts() ([]models.Receipt, error)
AddResponse(r models.Response) error
GetResponses() ([]models.Response, error)
ClearReceipts() error // for tests
ClearResponses() error // for tests
Close() error // for proper resource cleanup
// Flow state management
SaveFlowState(state models.FlowState) error
GetFlowState(participantID, flowType string) (*models.FlowState, error)
DeleteFlowState(participantID, flowType string) error
// Conversation participant management
SaveConversationParticipant(participant models.ConversationParticipant) error
GetConversationParticipant(id string) (*models.ConversationParticipant, error)
GetConversationParticipantByPhone(phoneNumber string) (*models.ConversationParticipant, error)
ListConversationParticipants() ([]models.ConversationParticipant, error)
DeleteConversationParticipant(id string) error
// Response hook persistence management
SaveRegisteredHook(hook models.RegisteredHook) error
GetRegisteredHook(phoneNumber string) (*models.RegisteredHook, error)
ListRegisteredHooks() ([]models.RegisteredHook, error)
DeleteRegisteredHook(phoneNumber string) error
}
Store defines the interface for storing receipts, responses, and flow state.