Documentation
¶
Index ¶
- func OpenConnection(dsn string) (*sql.DB, error)
- type BusinessListingRepository
- func (r *BusinessListingRepository) Count(ctx context.Context, filter domain.BusinessListingFilter) (int, error)
- func (r *BusinessListingRepository) CountByJobID(ctx context.Context, jobID string) (int, error)
- func (r *BusinessListingRepository) GetByID(ctx context.Context, id int64) (*domain.BusinessListing, error)
- func (r *BusinessListingRepository) GetCategories(ctx context.Context, limit int) ([]string, error)
- func (r *BusinessListingRepository) GetCities(ctx context.Context, limit int) ([]string, error)
- func (r *BusinessListingRepository) List(ctx context.Context, filter domain.BusinessListingFilter) ([]*domain.BusinessListing, int, error)
- func (r *BusinessListingRepository) ListByJobID(ctx context.Context, jobID string, limit, offset int) ([]*domain.BusinessListing, int, error)
- func (r *BusinessListingRepository) Stats(ctx context.Context) (*domain.BusinessListingStats, error)
- func (r *BusinessListingRepository) Stream(ctx context.Context, filter domain.BusinessListingFilter, ...) error
- func (r *BusinessListingRepository) StreamByJobID(ctx context.Context, jobID string, ...) error
- type CachedBusinessListingRepository
- func (r *CachedBusinessListingRepository) Count(ctx context.Context, filter domain.BusinessListingFilter) (int, error)
- func (r *CachedBusinessListingRepository) CountByJobID(ctx context.Context, jobID string) (int, error)
- func (r *CachedBusinessListingRepository) GetByID(ctx context.Context, id int64) (*domain.BusinessListing, error)
- func (r *CachedBusinessListingRepository) GetCategories(ctx context.Context, limit int) ([]string, error)
- func (r *CachedBusinessListingRepository) GetCities(ctx context.Context, limit int) ([]string, error)
- func (r *CachedBusinessListingRepository) InvalidateAllCache(ctx context.Context) error
- func (r *CachedBusinessListingRepository) InvalidateJobCache(ctx context.Context, jobID string) error
- func (r *CachedBusinessListingRepository) List(ctx context.Context, filter domain.BusinessListingFilter) ([]*domain.BusinessListing, int, error)
- func (r *CachedBusinessListingRepository) ListByJobID(ctx context.Context, jobID string, limit, offset int) ([]*domain.BusinessListing, int, error)
- func (r *CachedBusinessListingRepository) PreloadCache(ctx context.Context) error
- func (r *CachedBusinessListingRepository) Stats(ctx context.Context) (*domain.BusinessListingStats, error)
- func (r *CachedBusinessListingRepository) Stream(ctx context.Context, filter domain.BusinessListingFilter, ...) error
- func (r *CachedBusinessListingRepository) StreamByJobID(ctx context.Context, jobID string, ...) error
- type IntervalDuration
- type JobRepository
- func (r *JobRepository) ClaimJob(ctx context.Context, workerID string) (*domain.Job, error)
- func (r *JobRepository) Create(ctx context.Context, job *domain.Job) error
- func (r *JobRepository) Delete(ctx context.Context, id uuid.UUID) error
- func (r *JobRepository) GetByID(ctx context.Context, id uuid.UUID) (*domain.Job, error)
- func (r *JobRepository) GetStats(ctx context.Context) (*domain.JobStats, error)
- func (r *JobRepository) List(ctx context.Context, params domain.JobListParams) ([]*domain.Job, int, error)
- func (r *JobRepository) ReleaseJob(ctx context.Context, id uuid.UUID) error
- func (r *JobRepository) Update(ctx context.Context, job *domain.Job) error
- func (r *JobRepository) UpdateProgress(ctx context.Context, id uuid.UUID, progress domain.JobProgress) error
- func (r *JobRepository) UpdateStatus(ctx context.Context, id uuid.UUID, status domain.JobStatus) error
- type ProxyListRepository
- func (r *ProxyListRepository) DeleteDead(ctx context.Context) (int, error)
- func (r *ProxyListRepository) GetByAddress(ctx context.Context, ip string, port int) (*domain.Proxy, error)
- func (r *ProxyListRepository) GetStats(ctx context.Context) (*domain.ProxyStats, error)
- func (r *ProxyListRepository) IncrementFailCount(ctx context.Context, id int64, maxFails int) error
- func (r *ProxyListRepository) IncrementSuccessCount(ctx context.Context, id int64) error
- func (r *ProxyListRepository) List(ctx context.Context, params domain.ProxyListParams) ([]*domain.Proxy, int, error)
- func (r *ProxyListRepository) ListHealthy(ctx context.Context) ([]*domain.Proxy, error)
- func (r *ProxyListRepository) MarkUsed(ctx context.Context, id int64) error
- func (r *ProxyListRepository) UpdateStatus(ctx context.Context, id int64, status domain.ProxyStatus) error
- func (r *ProxyListRepository) Upsert(ctx context.Context, proxy *domain.Proxy) error
- func (r *ProxyListRepository) UpsertBatch(ctx context.Context, proxies []*domain.Proxy) error
- type ProxyRepository
- func (r *ProxyRepository) Create(ctx context.Context, url string) (*domain.ProxySource, error)
- func (r *ProxyRepository) Delete(ctx context.Context, id int64) error
- func (r *ProxyRepository) GetByID(ctx context.Context, id int64) (*domain.ProxySource, error)
- func (r *ProxyRepository) List(ctx context.Context) ([]*domain.ProxySource, error)
- type Repositories
- type ResultRepository
- func (r *ResultRepository) CountByJobID(ctx context.Context, jobID uuid.UUID) (int, error)
- func (r *ResultRepository) Create(ctx context.Context, jobID uuid.UUID, data []byte) error
- func (r *ResultRepository) CreateBatch(ctx context.Context, jobID uuid.UUID, data [][]byte) error
- func (r *ResultRepository) DeleteByJobID(ctx context.Context, jobID uuid.UUID) error
- func (r *ResultRepository) GetPlaceStats(ctx context.Context) (*domain.PlaceStats, error)
- func (r *ResultRepository) ListAll(ctx context.Context, limit, offset int) ([][]byte, int, error)
- func (r *ResultRepository) ListByJobID(ctx context.Context, jobID uuid.UUID, limit, offset int) ([][]byte, int, error)
- func (r *ResultRepository) StreamByJobID(ctx context.Context, jobID uuid.UUID, fn func(data []byte) error) error
- type WorkerRepository
- func (r *WorkerRepository) CleanupStaleWorkers(ctx context.Context, maxAge time.Duration) (int, error)
- func (r *WorkerRepository) Delete(ctx context.Context, id string) error
- func (r *WorkerRepository) GetByID(ctx context.Context, id string) (*domain.Worker, error)
- func (r *WorkerRepository) GetStats(ctx context.Context) (*domain.WorkerStats, error)
- func (r *WorkerRepository) IncrementStats(ctx context.Context, id string, jobsCompleted, placesScraped int) error
- func (r *WorkerRepository) List(ctx context.Context, params domain.WorkerListParams) ([]*domain.Worker, error)
- func (r *WorkerRepository) MarkOfflineWorkers(ctx context.Context, timeoutSeconds int) (int, error)
- func (r *WorkerRepository) UpdateStatus(ctx context.Context, id string, status domain.WorkerStatus) error
- func (r *WorkerRepository) Upsert(ctx context.Context, worker *domain.Worker) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func OpenConnection ¶
func OpenConnection(dsn string) (*sql.DB, error)
Types ¶
type BusinessListingRepository ¶
type BusinessListingRepository struct {
// contains filtered or unexported fields
}
BusinessListingRepository provides access to business_listings
func NewBusinessListingRepository ¶
func NewBusinessListingRepository(db *sql.DB) *BusinessListingRepository
NewBusinessListingRepository creates a new repository
func (*BusinessListingRepository) Count ¶
func (r *BusinessListingRepository) Count(ctx context.Context, filter domain.BusinessListingFilter) (int, error)
Count returns the number of business listings matching the filter. It is lightweight: only the count is queried, and no listing data is fetched.
func (*BusinessListingRepository) CountByJobID ¶
func (r *BusinessListingRepository) CountByJobID(ctx context.Context, jobID string) (int, error)
CountByJobID counts business listings for a job.
func (*BusinessListingRepository) GetByID ¶
func (r *BusinessListingRepository) GetByID(ctx context.Context, id int64) (*domain.BusinessListing, error)
GetByID retrieves a single business listing by ID
func (*BusinessListingRepository) GetCategories ¶
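func (r *BusinessListingRepository) GetCategories(ctx context.Context, limit int) ([]string, error)
GetCategories returns distinct categories.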
func (*BusinessListingRepository) List ¶
func (r *BusinessListingRepository) List(ctx context.Context, filter domain.BusinessListingFilter) ([]*domain.BusinessListing, int, error)
List retrieves business listings with filters
func (*BusinessListingRepository) ListByJobID ¶
func (r *BusinessListingRepository) ListByJobID(ctx context.Context, jobID string, limit, offset int) ([]*domain.BusinessListing, int, error)
ListByJobID retrieves business listings for a specific job with pagination
func (*BusinessListingRepository) Stats ¶
func (r *BusinessListingRepository) Stats(ctx context.Context) (*domain.BusinessListingStats, error)
Stats returns aggregate statistics
func (*BusinessListingRepository) Stream ¶
func (r *BusinessListingRepository) Stream(ctx context.Context, filter domain.BusinessListingFilter, fn func(listing *domain.BusinessListing) error) error
Stream streams business listings for export (memory efficient)
func (*BusinessListingRepository) StreamByJobID ¶
func (r *BusinessListingRepository) StreamByJobID(ctx context.Context, jobID string, fn func(listing *domain.BusinessListing) error) error
StreamByJobID streams business listings for a specific job (memory efficient)
type CachedBusinessListingRepository ¶
type CachedBusinessListingRepository struct {
// contains filtered or unexported fields
}
CachedBusinessListingRepository wraps BusinessListingRepository, adding Redis caching for expensive COUNT and aggregation queries to prevent 504 (gateway timeout) errors.
func NewCachedBusinessListingRepository ¶
func NewCachedBusinessListingRepository(db *sql.DB, c cache.Cache) *CachedBusinessListingRepository
NewCachedBusinessListingRepository creates a new cached repository
func (*CachedBusinessListingRepository) Count ¶
func (r *CachedBusinessListingRepository) Count(ctx context.Context, filter domain.BusinessListingFilter) (int, error)
Count returns the number of business listings matching the filter. It delegates directly to the underlying repository.
func (*CachedBusinessListingRepository) CountByJobID ¶
func (r *CachedBusinessListingRepository) CountByJobID(ctx context.Context, jobID string) (int, error)
CountByJobID counts business listings for a job
func (*CachedBusinessListingRepository) GetByID ¶
func (r *CachedBusinessListingRepository) GetByID(ctx context.Context, id int64) (*domain.BusinessListing, error)
GetByID retrieves a single business listing by ID (no caching, fast)
func (*CachedBusinessListingRepository) GetCategories ¶
func (r *CachedBusinessListingRepository) GetCategories(ctx context.Context, limit int) ([]string, error)
GetCategories returns distinct categories with caching
func (*CachedBusinessListingRepository) GetCities ¶
func (r *CachedBusinessListingRepository) GetCities(ctx context.Context, limit int) ([]string, error)
GetCities returns distinct cities with caching
func (*CachedBusinessListingRepository) InvalidateAllCache ¶
func (r *CachedBusinessListingRepository) InvalidateAllCache(ctx context.Context) error
InvalidateAllCache invalidates all business listing caches. Call this after bulk operations.
func (*CachedBusinessListingRepository) InvalidateJobCache ¶
func (r *CachedBusinessListingRepository) InvalidateJobCache(ctx context.Context, jobID string) error
InvalidateJobCache invalidates the cache for a specific job. Call this when job results are updated.
func (*CachedBusinessListingRepository) List ¶
func (r *CachedBusinessListingRepository) List(ctx context.Context, filter domain.BusinessListingFilter) ([]*domain.BusinessListing, int, error)
List retrieves business listings with caching for the COUNT query
func (*CachedBusinessListingRepository) ListByJobID ¶
func (r *CachedBusinessListingRepository) ListByJobID(ctx context.Context, jobID string, limit, offset int) ([]*domain.BusinessListing, int, error)
ListByJobID retrieves business listings for a specific job with caching
func (*CachedBusinessListingRepository) PreloadCache ¶
func (r *CachedBusinessListingRepository) PreloadCache(ctx context.Context) error
PreloadCache warms the cache on startup by preloading common cache entries.
func (*CachedBusinessListingRepository) Stats ¶
func (r *CachedBusinessListingRepository) Stats(ctx context.Context) (*domain.BusinessListingStats, error)
Stats retrieves aggregate statistics with caching
func (*CachedBusinessListingRepository) Stream ¶
func (r *CachedBusinessListingRepository) Stream(ctx context.Context, filter domain.BusinessListingFilter, fn func(listing *domain.BusinessListing) error) error
Stream streams business listings for export (no caching)
func (*CachedBusinessListingRepository) StreamByJobID ¶
func (r *CachedBusinessListingRepository) StreamByJobID(ctx context.Context, jobID string, fn func(listing *domain.BusinessListing) error) error
StreamByJobID streams business listings for a specific job (no caching)
type IntervalDuration ¶
IntervalDuration is a custom type that can scan PostgreSQL INTERVAL into time.Duration
func (*IntervalDuration) Scan ¶
func (d *IntervalDuration) Scan(value interface{}) error
Scan implements the sql.Scanner interface
type JobRepository ¶
type JobRepository struct {
// contains filtered or unexported fields
}
JobRepository implements domain.JobRepository for PostgreSQL
func NewJobRepository ¶
func NewJobRepository(db *sql.DB) *JobRepository
NewJobRepository creates a new JobRepository
func (*JobRepository) List ¶
func (r *JobRepository) List(ctx context.Context, params domain.JobListParams) ([]*domain.Job, int, error)
List retrieves jobs with optional filtering
func (*JobRepository) ReleaseJob ¶
func (r *JobRepository) ReleaseJob(ctx context.Context, id uuid.UUID) error
ReleaseJob releases a job back to pending status.
func (*JobRepository) UpdateProgress ¶
func (r *JobRepository) UpdateProgress(ctx context.Context, id uuid.UUID, progress domain.JobProgress) error
UpdateProgress updates the progress of a job
func (*JobRepository) UpdateStatus ¶
func (r *JobRepository) UpdateStatus(ctx context.Context, id uuid.UUID, status domain.JobStatus) error
UpdateStatus updates only the status of a job
type ProxyListRepository ¶
type ProxyListRepository struct {
// contains filtered or unexported fields
}
ProxyListRepository implements domain.ProxyListRepository for PostgreSQL
func NewProxyListRepository ¶
func NewProxyListRepository(db *sql.DB) *ProxyListRepository
NewProxyListRepository creates a new ProxyListRepository
func (*ProxyListRepository) DeleteDead ¶
func (r *ProxyListRepository) DeleteDead(ctx context.Context) (int, error)
DeleteDead removes all dead proxies
func (*ProxyListRepository) GetByAddress ¶
func (r *ProxyListRepository) GetByAddress(ctx context.Context, ip string, port int) (*domain.Proxy, error)
GetByAddress retrieves a proxy by IP:port
func (*ProxyListRepository) GetStats ¶
func (r *ProxyListRepository) GetStats(ctx context.Context) (*domain.ProxyStats, error)
GetStats retrieves proxy statistics
func (*ProxyListRepository) IncrementFailCount ¶
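func (r *ProxyListRepository) IncrementFailCount(ctx context.Context, id int64, maxFails int) error
IncrementFailCount increments the proxy's fail count and optionally marks the proxy as dead.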
func (*ProxyListRepository) IncrementSuccessCount ¶
func (r *ProxyListRepository) IncrementSuccessCount(ctx context.Context, id int64) error
IncrementSuccessCount increments success count
func (*ProxyListRepository) List ¶
func (r *ProxyListRepository) List(ctx context.Context, params domain.ProxyListParams) ([]*domain.Proxy, int, error)
List retrieves proxies with optional filtering
func (*ProxyListRepository) ListHealthy ¶
func (r *ProxyListRepository) ListHealthy(ctx context.Context) ([]*domain.Proxy, error)
ListHealthy retrieves all healthy proxies (for Pool).
func (*ProxyListRepository) MarkUsed ¶
func (r *ProxyListRepository) MarkUsed(ctx context.Context, id int64) error
MarkUsed updates the last_used timestamp
func (*ProxyListRepository) UpdateStatus ¶
func (r *ProxyListRepository) UpdateStatus(ctx context.Context, id int64, status domain.ProxyStatus) error
UpdateStatus updates the status of a proxy
func (*ProxyListRepository) Upsert ¶
func (r *ProxyListRepository) Upsert(ctx context.Context, proxy *domain.Proxy) error
Upsert creates or updates a proxy (based on the IP:port unique constraint).
func (*ProxyListRepository) UpsertBatch ¶
func (r *ProxyListRepository) UpsertBatch(ctx context.Context, proxies []*domain.Proxy) error
UpsertBatch creates or updates multiple proxies.
type ProxyRepository ¶
type ProxyRepository struct {
// contains filtered or unexported fields
}
func NewProxyRepository ¶
func NewProxyRepository(db *sql.DB) *ProxyRepository
func (*ProxyRepository) Create ¶
func (r *ProxyRepository) Create(ctx context.Context, url string) (*domain.ProxySource, error)
func (*ProxyRepository) Delete ¶
func (r *ProxyRepository) Delete(ctx context.Context, id int64) error
func (*ProxyRepository) GetByID ¶
func (r *ProxyRepository) GetByID(ctx context.Context, id int64) (*domain.ProxySource, error)
func (*ProxyRepository) List ¶
func (r *ProxyRepository) List(ctx context.Context) ([]*domain.ProxySource, error)
type Repositories ¶
type Repositories struct {
Jobs *JobRepository
Workers *WorkerRepository
Results *ResultRepository
Proxies *ProxyRepository
}
Repositories holds all repository instances
func NewRepositories ¶
func NewRepositories(db *sql.DB) *Repositories
NewRepositories creates all repositories
type ResultRepository ¶
type ResultRepository struct {
// contains filtered or unexported fields
}
ResultRepository implements domain.ResultRepository for PostgreSQL
func NewResultRepository ¶
func NewResultRepository(db *sql.DB) *ResultRepository
NewResultRepository creates a new ResultRepository
func (*ResultRepository) CountByJobID ¶
func (r *ResultRepository) CountByJobID(ctx context.Context, jobID uuid.UUID) (int, error)
CountByJobID counts results for a job.
func (*ResultRepository) CreateBatch ¶
func (r *ResultRepository) CreateBatch(ctx context.Context, jobID uuid.UUID, data [][]byte) error
CreateBatch creates multiple results in a batch.
func (*ResultRepository) DeleteByJobID ¶
func (r *ResultRepository) DeleteByJobID(ctx context.Context, jobID uuid.UUID) error
DeleteByJobID deletes all results for a job.
func (*ResultRepository) GetPlaceStats ¶
func (r *ResultRepository) GetPlaceStats(ctx context.Context) (*domain.PlaceStats, error)
GetPlaceStats retrieves place statistics
func (*ResultRepository) ListByJobID ¶
func (r *ResultRepository) ListByJobID(ctx context.Context, jobID uuid.UUID, limit, offset int) ([][]byte, int, error)
ListByJobID retrieves results for a job with pagination
func (*ResultRepository) StreamByJobID ¶
func (r *ResultRepository) StreamByJobID(ctx context.Context, jobID uuid.UUID, fn func(data []byte) error) error
StreamByJobID streams results for export (memory efficient). It uses a longer timeout (5 min) to handle large datasets.
type WorkerRepository ¶
type WorkerRepository struct {
// contains filtered or unexported fields
}
WorkerRepository implements domain.WorkerRepository for PostgreSQL
func NewWorkerRepository ¶
func NewWorkerRepository(db *sql.DB) *WorkerRepository
NewWorkerRepository creates a new WorkerRepository
func (*WorkerRepository) CleanupStaleWorkers ¶
func (r *WorkerRepository) CleanupStaleWorkers(ctx context.Context, maxAge time.Duration) (int, error)
CleanupStaleWorkers removes workers that haven't sent heartbeat in a long time
func (*WorkerRepository) Delete ¶
func (r *WorkerRepository) Delete(ctx context.Context, id string) error
Delete deletes a worker by ID
func (*WorkerRepository) GetStats ¶
func (r *WorkerRepository) GetStats(ctx context.Context) (*domain.WorkerStats, error)
GetStats retrieves worker statistics
func (*WorkerRepository) IncrementStats ¶
func (r *WorkerRepository) IncrementStats(ctx context.Context, id string, jobsCompleted, placesScraped int) error
IncrementStats increments worker statistics
func (*WorkerRepository) List ¶
func (r *WorkerRepository) List(ctx context.Context, params domain.WorkerListParams) ([]*domain.Worker, error)
List retrieves all workers
func (*WorkerRepository) MarkOfflineWorkers ¶
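func (r *WorkerRepository) MarkOfflineWorkers(ctx context.Context, timeoutSeconds int) (int, error)
MarkOfflineWorkers marks workers as offline if their heartbeat is stale.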
func (*WorkerRepository) UpdateStatus ¶
func (r *WorkerRepository) UpdateStatus(ctx context.Context, id string, status domain.WorkerStatus) error
UpdateStatus updates only the status of a worker