postgres

package
v0.0.0-...-9944d71 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 24, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func OpenConnection

func OpenConnection(dsn string) (*sql.DB, error)

OpenConnection opens a PostgreSQL connection

Types

type BusinessListingRepository

type BusinessListingRepository struct {
	// contains filtered or unexported fields
}

BusinessListingRepository provides access to business_listings

func NewBusinessListingRepository

func NewBusinessListingRepository(db *sql.DB) *BusinessListingRepository

NewBusinessListingRepository creates a new repository

func (*BusinessListingRepository) Count

Count returns count for filtered results (lightweight, no data fetch)

func (*BusinessListingRepository) CountByJobID

func (r *BusinessListingRepository) CountByJobID(ctx context.Context, jobID string) (int, error)

CountByJobID counts business listings for a job

func (*BusinessListingRepository) GetByID

GetByID retrieves a single business listing by ID

func (*BusinessListingRepository) GetCategories

func (r *BusinessListingRepository) GetCategories(ctx context.Context, limit int) ([]string, error)

GetCategories returns distinct categories

func (*BusinessListingRepository) GetCities

func (r *BusinessListingRepository) GetCities(ctx context.Context, limit int) ([]string, error)

GetCities returns distinct cities

func (*BusinessListingRepository) List

List retrieves business listings with filters

func (*BusinessListingRepository) ListByJobID

func (r *BusinessListingRepository) ListByJobID(ctx context.Context, jobID string, limit, offset int) ([]*domain.BusinessListing, int, error)

ListByJobID retrieves business listings for a specific job with pagination

func (*BusinessListingRepository) Stats

Stats returns aggregate statistics

func (*BusinessListingRepository) Stream

Stream streams business listings for export (memory efficient)

func (*BusinessListingRepository) StreamByJobID

func (r *BusinessListingRepository) StreamByJobID(ctx context.Context, jobID string, fn func(listing *domain.BusinessListing) error) error

StreamByJobID streams business listings for a specific job (memory efficient)

type CachedBusinessListingRepository

type CachedBusinessListingRepository struct {
	// contains filtered or unexported fields
}

CachedBusinessListingRepository wraps BusinessListingRepository with Redis caching for expensive COUNT and aggregation queries to prevent 504 timeouts

func NewCachedBusinessListingRepository

func NewCachedBusinessListingRepository(db *sql.DB, c cache.Cache) *CachedBusinessListingRepository

NewCachedBusinessListingRepository creates a new cached repository

func (*CachedBusinessListingRepository) Count

Count returns count for filtered results (delegates to underlying repo)

func (*CachedBusinessListingRepository) CountByJobID

func (r *CachedBusinessListingRepository) CountByJobID(ctx context.Context, jobID string) (int, error)

CountByJobID counts business listings for a job

func (*CachedBusinessListingRepository) GetByID

GetByID retrieves a single business listing by ID (no caching, fast)

func (*CachedBusinessListingRepository) GetCategories

func (r *CachedBusinessListingRepository) GetCategories(ctx context.Context, limit int) ([]string, error)

GetCategories returns distinct categories with caching

func (*CachedBusinessListingRepository) GetCities

func (r *CachedBusinessListingRepository) GetCities(ctx context.Context, limit int) ([]string, error)

GetCities returns distinct cities with caching

func (*CachedBusinessListingRepository) InvalidateAllCache

func (r *CachedBusinessListingRepository) InvalidateAllCache(ctx context.Context) error

InvalidateAllCache invalidates all business listing caches. Call this after bulk operations.

func (*CachedBusinessListingRepository) InvalidateJobCache

func (r *CachedBusinessListingRepository) InvalidateJobCache(ctx context.Context, jobID string) error

InvalidateJobCache invalidates the cache for a specific job. Call this when job results are updated.

func (*CachedBusinessListingRepository) List

List retrieves business listings with caching for the COUNT query

func (*CachedBusinessListingRepository) ListByJobID

func (r *CachedBusinessListingRepository) ListByJobID(ctx context.Context, jobID string, limit, offset int) ([]*domain.BusinessListing, int, error)

ListByJobID retrieves business listings for a specific job with caching

func (*CachedBusinessListingRepository) PreloadCache

PreloadCache preloads common cache entries on startup. This is the "preload" strategy the user asked for.

func (*CachedBusinessListingRepository) Stats

Stats retrieves aggregate statistics with caching

func (*CachedBusinessListingRepository) Stream

Stream streams business listings for export (no caching)

func (*CachedBusinessListingRepository) StreamByJobID

func (r *CachedBusinessListingRepository) StreamByJobID(ctx context.Context, jobID string, fn func(listing *domain.BusinessListing) error) error

StreamByJobID streams business listings for a specific job (no caching)

type IntervalDuration

type IntervalDuration time.Duration

IntervalDuration is a custom type that can scan PostgreSQL INTERVAL into time.Duration

func (*IntervalDuration) Scan

func (d *IntervalDuration) Scan(value interface{}) error

Scan implements the sql.Scanner interface

func (IntervalDuration) Value

func (d IntervalDuration) Value() (driver.Value, error)

Value implements the driver.Valuer interface

type JobRepository

type JobRepository struct {
	// contains filtered or unexported fields
}

JobRepository implements domain.JobRepository for PostgreSQL

func NewJobRepository

func NewJobRepository(db *sql.DB) *JobRepository

NewJobRepository creates a new JobRepository

func (*JobRepository) ClaimJob

func (r *JobRepository) ClaimJob(ctx context.Context, workerID string) (*domain.Job, error)

ClaimJob claims a pending job for a worker (atomic operation)

func (*JobRepository) Create

func (r *JobRepository) Create(ctx context.Context, job *domain.Job) error

Create creates a new job

func (*JobRepository) Delete

func (r *JobRepository) Delete(ctx context.Context, id uuid.UUID) error

Delete deletes a job by ID

func (*JobRepository) GetByID

func (r *JobRepository) GetByID(ctx context.Context, id uuid.UUID) (*domain.Job, error)

GetByID retrieves a job by ID

func (*JobRepository) GetStats

func (r *JobRepository) GetStats(ctx context.Context) (*domain.JobStats, error)

GetStats retrieves job statistics

func (*JobRepository) List

func (r *JobRepository) List(ctx context.Context, params domain.JobListParams) ([]*domain.Job, int, error)

List retrieves jobs with optional filtering

func (*JobRepository) ReleaseJob

func (r *JobRepository) ReleaseJob(ctx context.Context, id uuid.UUID) error

ReleaseJob releases a job back to pending status

func (*JobRepository) Update

func (r *JobRepository) Update(ctx context.Context, job *domain.Job) error

Update updates a job

func (*JobRepository) UpdateProgress

func (r *JobRepository) UpdateProgress(ctx context.Context, id uuid.UUID, progress domain.JobProgress) error

UpdateProgress updates the progress of a job

func (*JobRepository) UpdateStatus

func (r *JobRepository) UpdateStatus(ctx context.Context, id uuid.UUID, status domain.JobStatus) error

UpdateStatus updates only the status of a job

type ProxyListRepository

type ProxyListRepository struct {
	// contains filtered or unexported fields
}

ProxyListRepository implements domain.ProxyListRepository for PostgreSQL

func NewProxyListRepository

func NewProxyListRepository(db *sql.DB) *ProxyListRepository

NewProxyListRepository creates a new ProxyListRepository

func (*ProxyListRepository) DeleteDead

func (r *ProxyListRepository) DeleteDead(ctx context.Context) (int, error)

DeleteDead removes all dead proxies

func (*ProxyListRepository) GetByAddress

func (r *ProxyListRepository) GetByAddress(ctx context.Context, ip string, port int) (*domain.Proxy, error)

GetByAddress retrieves a proxy by IP:port

func (*ProxyListRepository) GetStats

GetStats retrieves proxy statistics

func (*ProxyListRepository) IncrementFailCount

func (r *ProxyListRepository) IncrementFailCount(ctx context.Context, id int64, maxFails int) error

IncrementFailCount increments fail count and optionally marks as dead

func (*ProxyListRepository) IncrementSuccessCount

func (r *ProxyListRepository) IncrementSuccessCount(ctx context.Context, id int64) error

IncrementSuccessCount increments success count

func (*ProxyListRepository) List

List retrieves proxies with optional filtering

func (*ProxyListRepository) ListHealthy

func (r *ProxyListRepository) ListHealthy(ctx context.Context) ([]*domain.Proxy, error)

ListHealthy retrieves all healthy proxies (for Pool)

func (*ProxyListRepository) MarkUsed

func (r *ProxyListRepository) MarkUsed(ctx context.Context, id int64) error

MarkUsed updates the last_used timestamp

func (*ProxyListRepository) UpdateStatus

func (r *ProxyListRepository) UpdateStatus(ctx context.Context, id int64, status domain.ProxyStatus) error

UpdateStatus updates the status of a proxy

func (*ProxyListRepository) Upsert

func (r *ProxyListRepository) Upsert(ctx context.Context, proxy *domain.Proxy) error

Upsert creates or updates a proxy (based on IP:port unique constraint)

func (*ProxyListRepository) UpsertBatch

func (r *ProxyListRepository) UpsertBatch(ctx context.Context, proxies []*domain.Proxy) error

UpsertBatch creates or updates multiple proxies

type ProxyRepository

type ProxyRepository struct {
	// contains filtered or unexported fields
}

func NewProxyRepository

func NewProxyRepository(db *sql.DB) *ProxyRepository

func (*ProxyRepository) Create

func (r *ProxyRepository) Create(ctx context.Context, url string) (*domain.ProxySource, error)

func (*ProxyRepository) Delete

func (r *ProxyRepository) Delete(ctx context.Context, id int64) error

func (*ProxyRepository) GetByID

func (r *ProxyRepository) GetByID(ctx context.Context, id int64) (*domain.ProxySource, error)

func (*ProxyRepository) List

type Repositories

type Repositories struct {
	Jobs    *JobRepository
	Workers *WorkerRepository
	Results *ResultRepository
	Proxies *ProxyRepository
}

Repositories holds all repository instances

func NewRepositories

func NewRepositories(db *sql.DB) *Repositories

NewRepositories creates all repositories

type ResultRepository

type ResultRepository struct {
	// contains filtered or unexported fields
}

ResultRepository implements domain.ResultRepository for PostgreSQL

func NewResultRepository

func NewResultRepository(db *sql.DB) *ResultRepository

NewResultRepository creates a new ResultRepository

func (*ResultRepository) CountByJobID

func (r *ResultRepository) CountByJobID(ctx context.Context, jobID uuid.UUID) (int, error)

CountByJobID counts results for a job

func (*ResultRepository) Create

func (r *ResultRepository) Create(ctx context.Context, jobID uuid.UUID, data []byte) error

Create creates a new result

func (*ResultRepository) CreateBatch

func (r *ResultRepository) CreateBatch(ctx context.Context, jobID uuid.UUID, data [][]byte) error

CreateBatch creates multiple results in a batch

func (*ResultRepository) DeleteByJobID

func (r *ResultRepository) DeleteByJobID(ctx context.Context, jobID uuid.UUID) error

DeleteByJobID deletes all results for a job

func (*ResultRepository) GetPlaceStats

func (r *ResultRepository) GetPlaceStats(ctx context.Context) (*domain.PlaceStats, error)

GetPlaceStats retrieves place statistics

func (*ResultRepository) ListAll

func (r *ResultRepository) ListAll(ctx context.Context, limit, offset int) ([][]byte, int, error)

ListAll retrieves all results with pagination (global view)

func (*ResultRepository) ListByJobID

func (r *ResultRepository) ListByJobID(ctx context.Context, jobID uuid.UUID, limit, offset int) ([][]byte, int, error)

ListByJobID retrieves results for a job with pagination

func (*ResultRepository) StreamByJobID

func (r *ResultRepository) StreamByJobID(ctx context.Context, jobID uuid.UUID, fn func(data []byte) error) error

StreamByJobID streams results for export (memory efficient). It uses a longer timeout (5 min) to handle large datasets.

type WorkerRepository

type WorkerRepository struct {
	// contains filtered or unexported fields
}

WorkerRepository implements domain.WorkerRepository for PostgreSQL

func NewWorkerRepository

func NewWorkerRepository(db *sql.DB) *WorkerRepository

NewWorkerRepository creates a new WorkerRepository

func (*WorkerRepository) CleanupStaleWorkers

func (r *WorkerRepository) CleanupStaleWorkers(ctx context.Context, maxAge time.Duration) (int, error)

CleanupStaleWorkers removes workers that haven't sent heartbeat in a long time

func (*WorkerRepository) Delete

func (r *WorkerRepository) Delete(ctx context.Context, id string) error

Delete deletes a worker by ID

func (*WorkerRepository) GetByID

func (r *WorkerRepository) GetByID(ctx context.Context, id string) (*domain.Worker, error)

GetByID retrieves a worker by ID

func (*WorkerRepository) GetStats

func (r *WorkerRepository) GetStats(ctx context.Context) (*domain.WorkerStats, error)

GetStats retrieves worker statistics

func (*WorkerRepository) IncrementStats

func (r *WorkerRepository) IncrementStats(ctx context.Context, id string, jobsCompleted, placesScraped int) error

IncrementStats increments worker statistics

func (*WorkerRepository) List

List retrieves all workers

func (*WorkerRepository) MarkOfflineWorkers

func (r *WorkerRepository) MarkOfflineWorkers(ctx context.Context, timeoutSeconds int) (int, error)

MarkOfflineWorkers marks workers as offline if heartbeat is stale

func (*WorkerRepository) UpdateStatus

func (r *WorkerRepository) UpdateStatus(ctx context.Context, id string, status domain.WorkerStatus) error

UpdateStatus updates only the status of a worker

func (*WorkerRepository) Upsert

func (r *WorkerRepository) Upsert(ctx context.Context, worker *domain.Worker) error

Upsert creates or updates a worker (for heartbeat)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL