store

package
v0.0.0-...-c1e920f Latest
Warning: This package is not in the latest version of its module.

Published: Feb 28, 2026 License: MIT Imports: 16 Imported by: 0

Documentation

Overview

Package store provides storage backends for PromptPipe. It defines the common interfaces and option types used by all store implementations, and implements a PostgreSQL-backed store for receipts and an SQLite-backed store for receipts and responses.

The package also provides:

- The DedupRepo interface for inbound message deduplication
- The JobRepo interface and model for durable job scheduling
- The JobRunner for executing durable jobs
- The OutboxRepo interface and model for restart-safe outgoing sends
- The OutboxSender for processing outgoing messages

Constants

const (
	// DefaultMaxOpenConns is the default maximum number of open connections to the database
	DefaultMaxOpenConns = 25
	// DefaultMaxIdleConns is the default maximum number of idle connections in the pool
	DefaultMaxIdleConns = 25
	// DefaultConnMaxLifetime is the default maximum amount of time a connection may be reused
	DefaultConnMaxLifetime = 5 * time.Minute
)

Database connection pool configuration constants

const (
	// DefaultDirPermissions defines the default permissions for database directories
	DefaultDirPermissions = 0755
	// SQLiteMaxOpenConns is the maximum number of open connections for SQLite (should be 1 for WAL mode safety)
	SQLiteMaxOpenConns = 1
	// SQLiteMaxIdleConns is the maximum number of idle connections for SQLite
	SQLiteMaxIdleConns = 1
	// SQLiteConnMaxLifetime is the maximum amount of time a SQLite connection may be reused
	SQLiteConnMaxLifetime = 30 * time.Minute
)

Constants for SQLite store configuration

Variables

This section is empty.

Functions

func DetectDSNType

func DetectDSNType(dsn string) string

DetectDSNType analyzes a DSN and returns the appropriate database driver. Returns "postgres" for PostgreSQL DSNs, "sqlite3" for SQLite file paths.
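
The detection rules are not documented on this page. The sketch below shows one plausible heuristic, under the assumption that PostgreSQL DSNs are recognized by a URL scheme or by `key=value` fields; `detectDSNType` is a hypothetical stand-in and may differ from the package's actual logic.

```go
package main

import (
	"fmt"
	"strings"
)

// detectDSNType is a hypothetical re-implementation for illustration only.
// Assumption: a DSN is PostgreSQL if it starts with a postgres URL scheme or
// contains a "host=" or "dbname=" field; anything else is treated as SQLite.
func detectDSNType(dsn string) string {
	if strings.HasPrefix(dsn, "postgres://") || strings.HasPrefix(dsn, "postgresql://") {
		return "postgres"
	}
	for _, field := range strings.Fields(dsn) {
		if strings.HasPrefix(field, "host=") || strings.HasPrefix(field, "dbname=") {
			return "postgres"
		}
	}
	return "sqlite3"
}

func main() {
	dsns := []string{
		"postgres://user:pass@localhost:5432/promptpipe",
		"host=localhost dbname=promptpipe sslmode=disable",
		"file:/var/lib/promptpipe/data.db?_foreign_keys=on",
		"data/promptpipe.db",
	}
	for _, dsn := range dsns {
		fmt.Printf("%s -> %s\n", dsn, detectDSNType(dsn))
	}
}
```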

func ExtractDirFromSQLiteDSN

func ExtractDirFromSQLiteDSN(dsn string) (string, error)

ExtractDirFromSQLiteDSN extracts the directory path from a SQLite DSN string, handling both file URIs (e.g., "file:/path/to/file?_foreign_keys=on") and regular file paths. This function is specifically designed for SQLite DSNs and will return an error if called with non-SQLite DSNs (e.g., PostgreSQL DSNs).

Returns the directory containing the SQLite database file, or an error if:

- The DSN is not a SQLite DSN
- The file URI cannot be parsed
- The resulting path is invalid

Types

type DedupRecord

type DedupRecord struct {
	MessageID     string     `json:"message_id"`
	ParticipantID string     `json:"participant_id"`
	ReceivedAt    time.Time  `json:"received_at"`
	ProcessedAt   *time.Time `json:"processed_at"`
}

DedupRecord represents an inbound message deduplication record.

type DedupRepo

type DedupRepo interface {
	// IsDuplicate checks if a message ID has already been processed.
	// Returns true if the message was already seen.
	IsDuplicate(messageID string) (bool, error)

	// RecordInbound inserts a new inbound message record. Returns false if the
	// message was already recorded (duplicate).
	RecordInbound(messageID, participantID string) (bool, error)

	// MarkProcessed sets the processed_at timestamp for a message.
	MarkProcessed(messageID string) error
}

DedupRepo defines the interface for inbound message deduplication.
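
The contract is easiest to see in a small in-memory implementation. The sketch below is hypothetical (`memDedup` is not part of the package) and assumes that "already seen" means "already recorded", regardless of whether the message was processed.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// dedupRecord mirrors the fields of DedupRecord documented above.
type dedupRecord struct {
	MessageID     string
	ParticipantID string
	ReceivedAt    time.Time
	ProcessedAt   *time.Time
}

// memDedup is a hypothetical in-memory DedupRepo, for illustration only.
type memDedup struct {
	mu      sync.Mutex
	records map[string]*dedupRecord
}

func newMemDedup() *memDedup {
	return &memDedup{records: make(map[string]*dedupRecord)}
}

// IsDuplicate reports whether the message ID was recorded before.
func (m *memDedup) IsDuplicate(messageID string) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	_, ok := m.records[messageID]
	return ok, nil
}

// RecordInbound inserts the record and returns true, or returns false
// without changing anything if the message ID already exists.
func (m *memDedup) RecordInbound(messageID, participantID string) (bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if _, ok := m.records[messageID]; ok {
		return false, nil
	}
	m.records[messageID] = &dedupRecord{
		MessageID:     messageID,
		ParticipantID: participantID,
		ReceivedAt:    time.Now(),
	}
	return true, nil
}

// MarkProcessed sets ProcessedAt for a previously recorded message.
func (m *memDedup) MarkProcessed(messageID string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	rec, ok := m.records[messageID]
	if !ok {
		return errors.New("unknown message ID: " + messageID)
	}
	now := time.Now()
	rec.ProcessedAt = &now
	return nil
}

func main() {
	repo := newMemDedup()
	first, _ := repo.RecordInbound("msg-1", "participant-1")
	second, _ := repo.RecordInbound("msg-1", "participant-1")
	fmt.Println(first, second) // true false
}
```

Using the boolean from RecordInbound as the gate, rather than calling IsDuplicate and then inserting, avoids a check-then-act race when the same message is delivered twice concurrently.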

type InMemoryStore

type InMemoryStore struct {
	// contains filtered or unexported fields
}

InMemoryStore is a simple in-memory implementation of the Store interface. Data is stored in memory and will be lost when the application restarts.

func NewInMemoryStore

func NewInMemoryStore() *InMemoryStore

NewInMemoryStore creates a new in-memory store.

func (*InMemoryStore) AddReceipt

func (s *InMemoryStore) AddReceipt(r models.Receipt) error

AddReceipt stores a receipt in memory.

func (*InMemoryStore) AddResponse

func (s *InMemoryStore) AddResponse(r models.Response) error

AddResponse stores an incoming response in memory.

func (*InMemoryStore) ClearReceipts

func (s *InMemoryStore) ClearReceipts() error

ClearReceipts clears all stored receipts (for tests).

func (*InMemoryStore) ClearResponses

func (s *InMemoryStore) ClearResponses() error

ClearResponses clears all stored responses (for tests).

func (*InMemoryStore) Close

func (s *InMemoryStore) Close() error

Close is a no-op for in-memory store as there are no resources to clean up.

func (*InMemoryStore) DeleteConversationParticipant

func (s *InMemoryStore) DeleteConversationParticipant(id string) error

DeleteConversationParticipant removes a conversation participant.

func (*InMemoryStore) DeleteFlowState

func (s *InMemoryStore) DeleteFlowState(participantID, flowType string) error

DeleteFlowState removes flow state for a participant.

func (*InMemoryStore) DeleteRegisteredHook

func (s *InMemoryStore) DeleteRegisteredHook(phoneNumber string) error

DeleteRegisteredHook removes a registered hook by phone number.

func (*InMemoryStore) GetConversationParticipant

func (s *InMemoryStore) GetConversationParticipant(id string) (*models.ConversationParticipant, error)

GetConversationParticipant retrieves a conversation participant by ID.

func (*InMemoryStore) GetConversationParticipantByPhone

func (s *InMemoryStore) GetConversationParticipantByPhone(phoneNumber string) (*models.ConversationParticipant, error)

GetConversationParticipantByPhone retrieves a conversation participant by phone number.

func (*InMemoryStore) GetFlowState

func (s *InMemoryStore) GetFlowState(participantID, flowType string) (*models.FlowState, error)

GetFlowState retrieves flow state for a participant.

func (*InMemoryStore) GetReceipts

func (s *InMemoryStore) GetReceipts() ([]models.Receipt, error)

GetReceipts retrieves all stored receipts from memory.

func (*InMemoryStore) GetRegisteredHook

func (s *InMemoryStore) GetRegisteredHook(phoneNumber string) (*models.RegisteredHook, error)

GetRegisteredHook retrieves a registered hook by phone number.

func (*InMemoryStore) GetResponses

func (s *InMemoryStore) GetResponses() ([]models.Response, error)

GetResponses retrieves all stored responses from memory.

func (*InMemoryStore) ListConversationParticipants

func (s *InMemoryStore) ListConversationParticipants() ([]models.ConversationParticipant, error)

ListConversationParticipants retrieves all conversation participants.

func (*InMemoryStore) ListRegisteredHooks

func (s *InMemoryStore) ListRegisteredHooks() ([]models.RegisteredHook, error)

ListRegisteredHooks retrieves all registered hooks.

func (*InMemoryStore) SaveConversationParticipant

func (s *InMemoryStore) SaveConversationParticipant(participant models.ConversationParticipant) error

SaveConversationParticipant stores or updates a conversation participant.

func (*InMemoryStore) SaveFlowState

func (s *InMemoryStore) SaveFlowState(state models.FlowState) error

SaveFlowState stores or updates flow state for a participant.

func (*InMemoryStore) SaveRegisteredHook

func (s *InMemoryStore) SaveRegisteredHook(hook models.RegisteredHook) error

SaveRegisteredHook stores a registered hook.

type Job

type Job struct {
	ID          string     `json:"id"`
	Kind        string     `json:"kind"`
	RunAt       time.Time  `json:"run_at"`
	PayloadJSON string     `json:"payload_json"`
	Status      JobStatus  `json:"status"`
	Attempt     int        `json:"attempt"`
	MaxAttempts int        `json:"max_attempts"`
	LastError   string     `json:"last_error"`
	LockedAt    *time.Time `json:"locked_at"`
	DedupeKey   string     `json:"dedupe_key"`
	CreatedAt   time.Time  `json:"created_at"`
	UpdatedAt   time.Time  `json:"updated_at"`
}

Job represents a durable job record that replaces in-memory timers.

type JobHandler

type JobHandler func(ctx context.Context, payload string) error

JobHandler is a function that executes a job's work. It receives the job's payload JSON and returns an error if the execution failed.

type JobRepo

type JobRepo interface {
	// EnqueueJob inserts a new job. If dedupeKey is non-empty and a non-terminal
	// job with that key already exists, the call returns the existing job ID
	// without inserting a duplicate.
	EnqueueJob(kind string, runAt time.Time, payloadJSON string, dedupeKey string) (string, error)

	// ClaimDueJobs marks up to limit queued jobs whose run_at <= now as running
	// and returns them.
	ClaimDueJobs(now time.Time, limit int) ([]Job, error)

	// CompleteJob marks a job as done.
	CompleteJob(id string) error

	// FailJob marks a job as failed, stores the error, and reschedules for retry
	// at nextRunAt if attempt < max_attempts; otherwise marks as permanently failed.
	FailJob(id string, errMsg string, nextRunAt time.Time) error

	// CancelJob marks a job as canceled.
	CancelJob(id string) error

	// RequeueStaleRunningJobs resets jobs that have been running since before
	// staleBefore back to queued status (crash recovery).
	RequeueStaleRunningJobs(staleBefore time.Time) (int, error)

	// GetJob retrieves a single job by ID.
	GetJob(id string) (*Job, error)
}

JobRepo defines the interface for durable job persistence.

type JobRunner

type JobRunner struct {
	// contains filtered or unexported fields
}

JobRunner periodically claims due jobs from the database and dispatches them to registered handlers.

func NewJobRunner

func NewJobRunner(repo JobRepo, pollInterval time.Duration) *JobRunner

NewJobRunner creates a new JobRunner.

func (*JobRunner) RecoverStaleJobs

func (r *JobRunner) RecoverStaleJobs() error

RecoverStaleJobs requeues jobs that were running when the process crashed. Should be called once at startup.

func (*JobRunner) RegisterHandler

func (r *JobRunner) RegisterHandler(kind string, handler JobHandler)

RegisterHandler registers a handler for a given job kind.

func (*JobRunner) Run

func (r *JobRunner) Run(ctx context.Context)

Run starts the polling loop. It blocks until the context is cancelled.

type JobStatus

type JobStatus string

JobStatus represents the lifecycle state of a job.

const (
	JobStatusQueued   JobStatus = "queued"
	JobStatusRunning  JobStatus = "running"
	JobStatusDone     JobStatus = "done"
	JobStatusFailed   JobStatus = "failed"
	JobStatusCanceled JobStatus = "canceled"
)

type Option

type Option func(*Opts)

Option defines a configuration option for store implementations.

func WithPostgresDSN

func WithPostgresDSN(dsn string) Option

WithPostgresDSN sets the PostgreSQL database connection string.

func WithSQLiteDSN

func WithSQLiteDSN(dsn string) Option

WithSQLiteDSN sets the SQLite database file path.

type Opts

type Opts struct {
	DSN string // Database connection string or file path for SQLite
}

Opts holds configuration options for store implementations.
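
Option and Opts follow the functional-options pattern. The sketch below redeclares the documented types locally; the option bodies and the `resolve` helper are assumptions about how such options are typically written and applied, not code taken from the package.

```go
package main

import "fmt"

// Opts and Option are redeclared with the shapes documented above.
type Opts struct {
	DSN string
}

type Option func(*Opts)

// Assumed bodies: each option simply sets the DSN field.
func WithPostgresDSN(dsn string) Option { return func(o *Opts) { o.DSN = dsn } }
func WithSQLiteDSN(dsn string) Option   { return func(o *Opts) { o.DSN = dsn } }

// resolve is a hypothetical helper showing how a constructor such as
// NewSQLiteStore might fold its options into a single Opts value.
func resolve(opts ...Option) Opts {
	var o Opts
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := resolve(WithSQLiteDSN("data/promptpipe.db"))
	fmt.Println(o.DSN) // data/promptpipe.db
}
```

Because both options write the same field in this sketch, the last one passed wins.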

type OutboxMessage

type OutboxMessage struct {
	ID            string       `json:"id"`
	ParticipantID string       `json:"participant_id"`
	Kind          string       `json:"kind"`
	PayloadJSON   string       `json:"payload_json"`
	Status        OutboxStatus `json:"status"`
	Attempts      int          `json:"attempts"`
	NextAttemptAt *time.Time   `json:"next_attempt_at"`
	DedupeKey     string       `json:"dedupe_key"`
	LockedAt      *time.Time   `json:"locked_at"`
	LastError     string       `json:"last_error"`
	CreatedAt     time.Time    `json:"created_at"`
	UpdatedAt     time.Time    `json:"updated_at"`
}

OutboxMessage represents a durable outgoing message record.

type OutboxRepo

type OutboxRepo interface {
	// EnqueueOutboxMessage inserts a new outbox message. If dedupeKey is non-empty
	// and a non-terminal message with that key exists, returns the existing ID.
	EnqueueOutboxMessage(participantID, kind, payloadJSON, dedupeKey string) (string, error)

	// ClaimDueOutboxMessages marks up to limit queued messages whose
	// next_attempt_at <= now (or is NULL) as sending and returns them.
	ClaimDueOutboxMessages(now time.Time, limit int) ([]OutboxMessage, error)

	// MarkOutboxMessageSent marks a message as successfully sent.
	MarkOutboxMessageSent(id string) error

	// FailOutboxMessage records a send failure and schedules a retry at nextAttemptAt.
	FailOutboxMessage(id string, errMsg string, nextAttemptAt time.Time) error

	// RequeueStaleSendingMessages resets messages stuck in sending since before
	// staleBefore back to queued (crash recovery).
	RequeueStaleSendingMessages(staleBefore time.Time) (int, error)
}

OutboxRepo defines the interface for durable outbox message persistence.
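
The dedupe rule for EnqueueOutboxMessage can be sketched in memory as follows. The `memOutbox` type is hypothetical, and the set of terminal statuses (sent, failed, canceled) is an assumption; this page does not state which statuses count as terminal.

```go
package main

import (
	"fmt"
	"strconv"
)

// outboxMsg is a trimmed, hypothetical copy of OutboxMessage.
type outboxMsg struct {
	ID        string
	DedupeKey string
	Status    string
}

// memOutbox is a hypothetical in-memory outbox, for illustration only.
type memOutbox struct {
	nextID int
	msgs   []*outboxMsg
}

// isTerminal encodes an assumption about which statuses end a message's life.
func isTerminal(status string) bool {
	return status == "sent" || status == "failed" || status == "canceled"
}

// enqueue returns the ID of an existing non-terminal message with the same
// non-empty dedupe key, or inserts a new queued message and returns its ID.
func (o *memOutbox) enqueue(dedupeKey string) string {
	if dedupeKey != "" {
		for _, m := range o.msgs {
			if m.DedupeKey == dedupeKey && !isTerminal(m.Status) {
				return m.ID
			}
		}
	}
	o.nextID++
	m := &outboxMsg{ID: "msg-" + strconv.Itoa(o.nextID), DedupeKey: dedupeKey, Status: "queued"}
	o.msgs = append(o.msgs, m)
	return m.ID
}

func main() {
	o := &memOutbox{}
	a := o.enqueue("welcome:participant-1")
	b := o.enqueue("welcome:participant-1")
	fmt.Println(a == b) // true: the second call reuses the queued message

	o.msgs[0].Status = "sent"
	c := o.enqueue("welcome:participant-1")
	fmt.Println(a == c) // false: the earlier message is terminal
}
```

A database-backed implementation would need to make the lookup and insert atomic, for example with a transaction or a partial unique index, which this sketch does not attempt.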

type OutboxSendFunc

type OutboxSendFunc func(ctx context.Context, msg OutboxMessage) error

OutboxSendFunc is the callback that performs the actual message send. It receives the outbox message and should return an error if sending failed.

type OutboxSender

type OutboxSender struct {
	// contains filtered or unexported fields
}

OutboxSender periodically claims due outbox messages and attempts to send them.

func NewOutboxSender

func NewOutboxSender(repo OutboxRepo, sendFunc OutboxSendFunc, pollInterval time.Duration) *OutboxSender

NewOutboxSender creates a new OutboxSender.

func (*OutboxSender) RecoverStaleMessages

func (s *OutboxSender) RecoverStaleMessages() error

RecoverStaleMessages requeues messages stuck in sending state (crash recovery). Should be called once at startup.

func (*OutboxSender) Run

func (s *OutboxSender) Run(ctx context.Context)

Run starts the polling loop. It blocks until the context is cancelled.

type OutboxStatus

type OutboxStatus string

OutboxStatus represents the lifecycle state of an outbox message.

const (
	OutboxStatusQueued   OutboxStatus = "queued"
	OutboxStatusSending  OutboxStatus = "sending"
	OutboxStatusSent     OutboxStatus = "sent"
	OutboxStatusFailed   OutboxStatus = "failed"
	OutboxStatusCanceled OutboxStatus = "canceled"
)

type PersistenceProvider

type PersistenceProvider interface {
	JobRepo() JobRepo
	OutboxRepo() OutboxRepo
	DedupRepo() DedupRepo
}

PersistenceProvider is implemented by database-backed stores that support durable job scheduling, outbox messaging, and inbound deduplication.
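
Since only database-backed stores implement PersistenceProvider, a caller holding a plain Store can discover the capability with a type assertion. The sketch below uses trimmed, hypothetical versions of the interfaces (one method each) so that it stays self-contained.

```go
package main

import "fmt"

// Trimmed, hypothetical versions of the interfaces documented on this page.
type Store interface{ Close() error }
type JobRepo interface{ CompleteJob(id string) error }
type PersistenceProvider interface{ JobRepo() JobRepo }

// memoryStore models a store without durable persistence.
type memoryStore struct{}

func (memoryStore) Close() error { return nil }

// dbStore models a database-backed store that is its own JobRepo,
// in the way PostgresStore.JobRepo is described above.
type dbStore struct{}

func (dbStore) Close() error                { return nil }
func (dbStore) CompleteJob(id string) error { return nil }
func (s dbStore) JobRepo() JobRepo          { return s }

// jobRepoFor returns the store's JobRepo if it offers one.
func jobRepoFor(s Store) (JobRepo, bool) {
	p, ok := s.(PersistenceProvider)
	if !ok {
		return nil, false
	}
	return p.JobRepo(), true
}

func main() {
	_, ok := jobRepoFor(memoryStore{})
	fmt.Println("memory store durable:", ok) // false
	_, ok = jobRepoFor(dbStore{})
	fmt.Println("db store durable:", ok) // true
}
```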

type PostgresStore

type PostgresStore struct {
	// contains filtered or unexported fields
}

func NewPostgresStore

func NewPostgresStore(opts ...Option) (*PostgresStore, error)

NewPostgresStore creates a new Postgres store based on provided options.

func (*PostgresStore) AddReceipt

func (s *PostgresStore) AddReceipt(r models.Receipt) error

func (*PostgresStore) AddResponse

func (s *PostgresStore) AddResponse(r models.Response) error

AddResponse stores an incoming response in Postgres.

func (*PostgresStore) CancelJob

func (s *PostgresStore) CancelJob(id string) error

func (*PostgresStore) ClaimDueJobs

func (s *PostgresStore) ClaimDueJobs(now time.Time, limit int) ([]Job, error)

func (*PostgresStore) ClaimDueOutboxMessages

func (s *PostgresStore) ClaimDueOutboxMessages(now time.Time, limit int) ([]OutboxMessage, error)

func (*PostgresStore) ClearReceipts

func (s *PostgresStore) ClearReceipts() error

ClearReceipts deletes all records in receipts table (for tests).

func (*PostgresStore) ClearResponses

func (s *PostgresStore) ClearResponses() error

ClearResponses deletes all records in responses table (for tests).

func (*PostgresStore) Close

func (s *PostgresStore) Close() error

Close closes the PostgreSQL database connection.

func (*PostgresStore) CompleteJob

func (s *PostgresStore) CompleteJob(id string) error

func (*PostgresStore) DedupRepo

func (s *PostgresStore) DedupRepo() DedupRepo

DedupRepo returns the PostgresStore as a DedupRepo.

func (*PostgresStore) DeleteConversationParticipant

func (s *PostgresStore) DeleteConversationParticipant(id string) error

DeleteConversationParticipant removes a conversation participant.

func (*PostgresStore) DeleteFlowState

func (s *PostgresStore) DeleteFlowState(participantID, flowType string) error

DeleteFlowState removes flow state for a participant.

func (*PostgresStore) DeleteRegisteredHook

func (s *PostgresStore) DeleteRegisteredHook(phoneNumber string) error

DeleteRegisteredHook deletes a registered hook by phone number.

func (*PostgresStore) EnqueueJob

func (s *PostgresStore) EnqueueJob(kind string, runAt time.Time, payloadJSON string, dedupeKey string) (string, error)

func (*PostgresStore) EnqueueOutboxMessage

func (s *PostgresStore) EnqueueOutboxMessage(participantID, kind, payloadJSON, dedupeKey string) (string, error)

func (*PostgresStore) FailJob

func (s *PostgresStore) FailJob(id string, errMsg string, nextRunAt time.Time) error

func (*PostgresStore) FailOutboxMessage

func (s *PostgresStore) FailOutboxMessage(id string, errMsg string, nextAttemptAt time.Time) error

func (*PostgresStore) GetConversationParticipant

func (s *PostgresStore) GetConversationParticipant(id string) (*models.ConversationParticipant, error)

GetConversationParticipant retrieves a conversation participant by ID.

func (*PostgresStore) GetConversationParticipantByPhone

func (s *PostgresStore) GetConversationParticipantByPhone(phoneNumber string) (*models.ConversationParticipant, error)

GetConversationParticipantByPhone retrieves a conversation participant by phone number.

func (*PostgresStore) GetFlowState

func (s *PostgresStore) GetFlowState(participantID, flowType string) (*models.FlowState, error)

GetFlowState retrieves flow state for a participant.

func (*PostgresStore) GetJob

func (s *PostgresStore) GetJob(id string) (*Job, error)

func (*PostgresStore) GetReceipts

func (s *PostgresStore) GetReceipts() ([]models.Receipt, error)

func (*PostgresStore) GetRegisteredHook

func (s *PostgresStore) GetRegisteredHook(phoneNumber string) (*models.RegisteredHook, error)

GetRegisteredHook retrieves a registered hook by phone number.

func (*PostgresStore) GetResponses

func (s *PostgresStore) GetResponses() ([]models.Response, error)

GetResponses retrieves all stored responses from Postgres.

func (*PostgresStore) IsDuplicate

func (s *PostgresStore) IsDuplicate(messageID string) (bool, error)

func (*PostgresStore) JobRepo

func (s *PostgresStore) JobRepo() JobRepo

JobRepo returns the PostgresStore as a JobRepo.

func (*PostgresStore) ListConversationParticipants

func (s *PostgresStore) ListConversationParticipants() ([]models.ConversationParticipant, error)

ListConversationParticipants retrieves all conversation participants.

func (*PostgresStore) ListRegisteredHooks

func (s *PostgresStore) ListRegisteredHooks() ([]models.RegisteredHook, error)

ListRegisteredHooks retrieves all registered hooks.

func (*PostgresStore) MarkOutboxMessageSent

func (s *PostgresStore) MarkOutboxMessageSent(id string) error

func (*PostgresStore) MarkProcessed

func (s *PostgresStore) MarkProcessed(messageID string) error

func (*PostgresStore) OutboxRepo

func (s *PostgresStore) OutboxRepo() OutboxRepo

OutboxRepo returns the PostgresStore as an OutboxRepo.

func (*PostgresStore) RecordInbound

func (s *PostgresStore) RecordInbound(messageID, participantID string) (bool, error)

func (*PostgresStore) RequeueStaleRunningJobs

func (s *PostgresStore) RequeueStaleRunningJobs(staleBefore time.Time) (int, error)

func (*PostgresStore) RequeueStaleSendingMessages

func (s *PostgresStore) RequeueStaleSendingMessages(staleBefore time.Time) (int, error)

func (*PostgresStore) SaveConversationParticipant

func (s *PostgresStore) SaveConversationParticipant(participant models.ConversationParticipant) error

SaveConversationParticipant stores or updates a conversation participant.

func (*PostgresStore) SaveFlowState

func (s *PostgresStore) SaveFlowState(state models.FlowState) error

SaveFlowState stores or updates flow state for a participant.

func (*PostgresStore) SaveRegisteredHook

func (s *PostgresStore) SaveRegisteredHook(hook models.RegisteredHook) error

SaveRegisteredHook stores or updates a registered hook.

type SQLiteStore

type SQLiteStore struct {
	// contains filtered or unexported fields
}

func NewSQLiteStore

func NewSQLiteStore(opts ...Option) (*SQLiteStore, error)

NewSQLiteStore creates a new SQLite store with the given DSN. The DSN should be a file path to the SQLite database file. If the directory doesn't exist, it will be created.

func (*SQLiteStore) AddReceipt

func (s *SQLiteStore) AddReceipt(r models.Receipt) error

func (*SQLiteStore) AddResponse

func (s *SQLiteStore) AddResponse(r models.Response) error

func (*SQLiteStore) CancelJob

func (s *SQLiteStore) CancelJob(id string) error

func (*SQLiteStore) ClaimDueJobs

func (s *SQLiteStore) ClaimDueJobs(now time.Time, limit int) ([]Job, error)

func (*SQLiteStore) ClaimDueOutboxMessages

func (s *SQLiteStore) ClaimDueOutboxMessages(now time.Time, limit int) ([]OutboxMessage, error)

func (*SQLiteStore) ClearReceipts

func (s *SQLiteStore) ClearReceipts() error

ClearReceipts deletes all records in receipts table (for tests).

func (*SQLiteStore) ClearResponses

func (s *SQLiteStore) ClearResponses() error

ClearResponses deletes all records in responses table (for tests).

func (*SQLiteStore) Close

func (s *SQLiteStore) Close() error

Close closes the SQLite database connection.

func (*SQLiteStore) CompleteJob

func (s *SQLiteStore) CompleteJob(id string) error

func (*SQLiteStore) DedupRepo

func (s *SQLiteStore) DedupRepo() DedupRepo

DedupRepo returns the SQLiteStore as a DedupRepo.

func (*SQLiteStore) DeleteConversationParticipant

func (s *SQLiteStore) DeleteConversationParticipant(id string) error

DeleteConversationParticipant removes a conversation participant.

func (*SQLiteStore) DeleteFlowState

func (s *SQLiteStore) DeleteFlowState(participantID, flowType string) error

DeleteFlowState removes flow state for a participant.

func (*SQLiteStore) DeleteRegisteredHook

func (s *SQLiteStore) DeleteRegisteredHook(phoneNumber string) error

DeleteRegisteredHook removes a registered hook by phone number.

func (*SQLiteStore) EnqueueJob

func (s *SQLiteStore) EnqueueJob(kind string, runAt time.Time, payloadJSON string, dedupeKey string) (string, error)

func (*SQLiteStore) EnqueueOutboxMessage

func (s *SQLiteStore) EnqueueOutboxMessage(participantID, kind, payloadJSON, dedupeKey string) (string, error)

func (*SQLiteStore) FailJob

func (s *SQLiteStore) FailJob(id string, errMsg string, nextRunAt time.Time) error

func (*SQLiteStore) FailOutboxMessage

func (s *SQLiteStore) FailOutboxMessage(id string, errMsg string, nextAttemptAt time.Time) error

func (*SQLiteStore) GetConversationParticipant

func (s *SQLiteStore) GetConversationParticipant(id string) (*models.ConversationParticipant, error)

GetConversationParticipant retrieves a conversation participant by ID.

func (*SQLiteStore) GetConversationParticipantByPhone

func (s *SQLiteStore) GetConversationParticipantByPhone(phoneNumber string) (*models.ConversationParticipant, error)

GetConversationParticipantByPhone retrieves a conversation participant by phone number.

func (*SQLiteStore) GetFlowState

func (s *SQLiteStore) GetFlowState(participantID, flowType string) (*models.FlowState, error)

GetFlowState retrieves flow state for a participant.

func (*SQLiteStore) GetJob

func (s *SQLiteStore) GetJob(id string) (*Job, error)

func (*SQLiteStore) GetReceipts

func (s *SQLiteStore) GetReceipts() ([]models.Receipt, error)

func (*SQLiteStore) GetRegisteredHook

func (s *SQLiteStore) GetRegisteredHook(phoneNumber string) (*models.RegisteredHook, error)

GetRegisteredHook retrieves a registered hook by phone number.

func (*SQLiteStore) GetResponses

func (s *SQLiteStore) GetResponses() ([]models.Response, error)

func (*SQLiteStore) IsDuplicate

func (s *SQLiteStore) IsDuplicate(messageID string) (bool, error)

func (*SQLiteStore) JobRepo

func (s *SQLiteStore) JobRepo() JobRepo

JobRepo returns the SQLiteStore as a JobRepo.

func (*SQLiteStore) ListConversationParticipants

func (s *SQLiteStore) ListConversationParticipants() ([]models.ConversationParticipant, error)

ListConversationParticipants retrieves all conversation participants.

func (*SQLiteStore) ListRegisteredHooks

func (s *SQLiteStore) ListRegisteredHooks() ([]models.RegisteredHook, error)

ListRegisteredHooks retrieves all registered hooks.

func (*SQLiteStore) MarkOutboxMessageSent

func (s *SQLiteStore) MarkOutboxMessageSent(id string) error

func (*SQLiteStore) MarkProcessed

func (s *SQLiteStore) MarkProcessed(messageID string) error

func (*SQLiteStore) OutboxRepo

func (s *SQLiteStore) OutboxRepo() OutboxRepo

OutboxRepo returns the SQLiteStore as an OutboxRepo.

func (*SQLiteStore) RecordInbound

func (s *SQLiteStore) RecordInbound(messageID, participantID string) (bool, error)

func (*SQLiteStore) RequeueStaleRunningJobs

func (s *SQLiteStore) RequeueStaleRunningJobs(staleBefore time.Time) (int, error)

func (*SQLiteStore) RequeueStaleSendingMessages

func (s *SQLiteStore) RequeueStaleSendingMessages(staleBefore time.Time) (int, error)

func (*SQLiteStore) SaveConversationParticipant

func (s *SQLiteStore) SaveConversationParticipant(participant models.ConversationParticipant) error

SaveConversationParticipant stores or updates a conversation participant.

func (*SQLiteStore) SaveFlowState

func (s *SQLiteStore) SaveFlowState(state models.FlowState) error

SaveFlowState stores or updates flow state for a participant.

func (*SQLiteStore) SaveRegisteredHook

func (s *SQLiteStore) SaveRegisteredHook(hook models.RegisteredHook) error

SaveRegisteredHook stores or updates a registered hook.

type Store

type Store interface {
	AddReceipt(r models.Receipt) error
	GetReceipts() ([]models.Receipt, error)
	AddResponse(r models.Response) error
	GetResponses() ([]models.Response, error)
	ClearReceipts() error  // for tests
	ClearResponses() error // for tests
	Close() error          // for proper resource cleanup
	// Flow state management
	SaveFlowState(state models.FlowState) error
	GetFlowState(participantID, flowType string) (*models.FlowState, error)
	DeleteFlowState(participantID, flowType string) error
	// Conversation participant management
	SaveConversationParticipant(participant models.ConversationParticipant) error
	GetConversationParticipant(id string) (*models.ConversationParticipant, error)
	GetConversationParticipantByPhone(phoneNumber string) (*models.ConversationParticipant, error)
	ListConversationParticipants() ([]models.ConversationParticipant, error)
	DeleteConversationParticipant(id string) error
	// Response hook persistence management
	SaveRegisteredHook(hook models.RegisteredHook) error
	GetRegisteredHook(phoneNumber string) (*models.RegisteredHook, error)
	ListRegisteredHooks() ([]models.RegisteredHook, error)
	DeleteRegisteredHook(phoneNumber string) error
}

Store defines the interface for storing receipts, responses, and flow state.
