service

package
v0.0.0-...-0f4e36f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 26, 2026 License: Apache-2.0 Imports: 39 Imported by: 0

Documentation

Overview

Package service implements business logic for feedback records.

Index

Constants

View Source
const (
	EmbeddingProviderOpenAI       = "openai"
	EmbeddingProviderGoogle       = "google"
	EmbeddingProviderGoogleGemini = "google-gemini"
)

Embedding provider names for NewEmbeddingClient.

View Source
const (
	TranslationProviderOpenAI       = "openai"
	TranslationProviderGoogle       = "google"
	TranslationProviderGoogleGemini = "google-gemini"
)

Translation provider names for NewTranslationClient (same identifiers as the embedding providers; translation reuses the OpenAI and Google SDK wrappers).

View Source
const (

	// EmbeddingsQueueName is the River queue used for feedback embedding jobs.
	EmbeddingsQueueName = "embeddings"
)
View Source
const SigningKeySize = 32

SigningKeySize is the number of random bytes for Standard Webhooks signing keys.

View Source
const (

	// TranslationBackfillsQueueName is the River queue for per-tenant translation
	// backfill fan-out jobs. It is kept separate from TranslationsQueueName so a large
	// fan-out does not starve live per-record translation throughput.
	TranslationBackfillsQueueName = "translation_backfills"
)
View Source
const (

	// TranslationsQueueName is the River queue used for feedback translation jobs.
	TranslationsQueueName = "translations"
)

Variables

View Source
var (
	// ErrEmbeddingConfigInvalid is returned when the embedding provider is unsupported.
	ErrEmbeddingConfigInvalid = errors.New("embedding config invalid")
	// ErrEmbeddingProviderAPIKey is returned when an API-key-based provider is configured without a key.
	ErrEmbeddingProviderAPIKey = errors.New("EMBEDDING_PROVIDER_API_KEY is required for this provider")
	// ErrEmbeddingBaseURLUnsupported is returned when a custom base URL is configured for a non-openai provider.
	ErrEmbeddingBaseURLUnsupported = errors.New("EMBEDDING_BASE_URL is only supported for openai")
	// ErrEmbeddingGoogleGeminiConfig is returned when google-gemini is configured without project or location.
	ErrEmbeddingGoogleGeminiConfig = errors.New(
		"google-gemini requires EMBEDDING_GOOGLE_CLOUD_PROJECT and EMBEDDING_GOOGLE_CLOUD_LOCATION")
)
View Source
var (
	ErrMissingTenantID   = errors.New("tenant_id is required")
	ErrEmptyQuery        = errors.New("query is required and must be non-empty")
	ErrEmbeddingNotFound = repository.ErrEmbeddingNotFound
)

Sentinel errors for search (used by handlers for status mapping).

View Source
var (
	// ErrTaxonomyServiceURLRequired is returned when TAXONOMY_SERVICE_URL is missing.
	ErrTaxonomyServiceURLRequired = errors.New("TAXONOMY_SERVICE_URL is required")

	// ErrTaxonomyServiceTokenRequired is returned when TAXONOMY_SERVICE_TOKEN is missing.
	ErrTaxonomyServiceTokenRequired = errors.New("TAXONOMY_SERVICE_TOKEN is required")

	// ErrTaxonomyServiceUnexpectedStatus is returned when the taxonomy service returns a non-2xx response.
	ErrTaxonomyServiceUnexpectedStatus = errors.New("taxonomy service returned non-success status")
)
View Source
var (
	// ErrTaxonomyEmbeddingsNotConfigured is returned when Hub embeddings are not configured.
	ErrTaxonomyEmbeddingsNotConfigured = errors.New("taxonomy requires EMBEDDING_MODEL to be configured")
	// ErrTaxonomyServiceNotConfigured is returned when the taxonomy compute service is unavailable.
	ErrTaxonomyServiceNotConfigured = errors.New("taxonomy service is not configured")
	// ErrTaxonomyServiceStartFailed is returned when the taxonomy compute service rejects a run.
	ErrTaxonomyServiceStartFailed = errors.New("taxonomy service failed to start run")
)
View Source
var (
	// ErrTranslationConfigInvalid is returned when the translation provider is unsupported.
	ErrTranslationConfigInvalid = errors.New("translation config invalid")
	// ErrTranslationProviderAPIKey is returned when an API-key-based provider is configured without a key.
	ErrTranslationProviderAPIKey = errors.New("TRANSLATION_PROVIDER_API_KEY is required for this provider")
	// ErrTranslationBaseURLUnsupported is returned when a custom base URL is configured for a non-openai provider.
	ErrTranslationBaseURLUnsupported = errors.New("TRANSLATION_BASE_URL is only supported for openai")
	// ErrTranslationGoogleGeminiConfig is returned when google-gemini is configured without project or location.
	ErrTranslationGoogleGeminiConfig = errors.New(
		"google-gemini requires TRANSLATION_GOOGLE_CLOUD_PROJECT and TRANSLATION_GOOGLE_CLOUD_LOCATION")
)
View Source
var (
	ErrWebhookGone   = errors.New("webhook returned 410 Gone (endpoint disabled)")
	ErrWebhookNon2xx = errors.New("webhook returned non-2xx status")
)

Sentinel errors for webhook delivery (err113).

View Source
var ErrEmbeddingBackfillNotConfigured = errors.New("embedding backfill not configured")

ErrEmbeddingBackfillNotConfigured is returned when BackfillEmbeddings is called without embedding inserter/queue.

View Source
var ErrInvalidCursor = errors.New("invalid cursor")

ErrInvalidCursor is returned when the cursor parameter is malformed or invalid.

View Source
var ErrPaginationInvariantViolated = errors.New("pagination invariant violated: hasMore with empty list")

ErrPaginationInvariantViolated indicates hasMore was true with an empty list (repository invariant violation).

View Source
var ErrTranslationLangKeyRequired = errors.New("translation lang key is required when translated text is set")

ErrTranslationLangKeyRequired is returned when a translation is set without a target locale key: a translation must record the locale it was produced in (clearing, where translated is nil, intentionally passes an empty key to null both columns).

View Source
var ErrUserIDRequired = huberrors.NewValidationError("user_id", "user_id is required")

ErrUserIDRequired is returned when deleting feedback records by user is called without user_id.

Functions

func BuildEmbeddingInput

func BuildEmbeddingInput(fieldLabel, valueText *string, prefix string) string

BuildEmbeddingInput prepares text for vector embedding. Uses a pre-allocated strings.Builder to reduce allocations on this hot path.

We feed the model raw, natural text: only TrimSpace and Unicode NFC normalization are applied. Case, diacritics, and punctuation are preserved so the model retains semantic clues (e.g. "US" vs "us", "résumé" vs "resume").

Arguments:

  • fieldLabel: The "question" or metadata key (e.g. "What is your reasoning?").
  • valueText: The "answer" or main content (e.g. "I chose option B because...").
  • prefix: Model-specific task instruction; OpenAI and Google use "".

Returns formatted string: "[prefix]Question: [label]\nAnswer: [value]" (or "[prefix][value]" when label is empty).

func DecodeSearchCursor

func DecodeSearchCursor(cursor string) (distance float64, feedbackRecordID uuid.UUID, err error)

DecodeSearchCursor parses an opaque cursor and returns (distance, feedbackRecordID). Returns ErrInvalidCursor if the cursor is malformed.

func EmbeddingPrefixForProvider

func EmbeddingPrefixForProvider(provider string) string

EmbeddingPrefixForProvider returns the document prefix for the given embedding provider (from registry). Returns "" for unknown providers.

func EncodeSearchCursor

func EncodeSearchCursor(distance float64, id uuid.UUID) (string, error)

EncodeSearchCursor returns an opaque cursor for the next page. distance is the cosine distance (e.embedding <=> query) of the last result row; id is that row's feedback_record_id.

func NormalizeEmbeddingProvider

func NormalizeEmbeddingProvider(provider string) string

NormalizeEmbeddingProvider returns the canonical provider name (lowercase, trimmed).

func NormalizeTranslationProvider

func NormalizeTranslationProvider(provider string) string

NormalizeTranslationProvider returns the canonical provider name (lowercase, trimmed).

func ProviderRequiresAPIKey

func ProviderRequiresAPIKey(provider string) bool

ProviderRequiresAPIKey returns true for providers that require EMBEDDING_PROVIDER_API_KEY (from registry).

func ProviderRequiresGoogleGeminiConfig

func ProviderRequiresGoogleGeminiConfig(provider string) bool

ProviderRequiresGoogleGeminiConfig returns true for providers that require Google Cloud project and location.

func SupportedEmbeddingProviders

func SupportedEmbeddingProviders() map[string]struct{}

SupportedEmbeddingProviders returns the set of supported provider names (from registry).

func TenantIDFromEventData

func TenantIDFromEventData(data any) (string, bool)

TenantIDFromEventData extracts tenant_id from known event payload shapes.

func TenantIDPointerFromEventData

func TenantIDPointerFromEventData(data any) *string

TenantIDPointerFromEventData returns a detached pointer so it can be safely stored in job args.

func ValidateEmbeddingConfig

func ValidateEmbeddingConfig(cfg EmbeddingClientConfig) error

ValidateEmbeddingConfig checks provider support and provider-specific requirements (API key, Google Cloud project/location). Use before creating a client or at startup to fail fast with a clear error.

func ValidateTranslationConfig

func ValidateTranslationConfig(cfg TranslationClientConfig) error

ValidateTranslationConfig checks provider support and provider-specific requirements. Use before creating a client or at startup to fail fast with a clear error.

func WebhookMatchesTenant

func WebhookMatchesTenant(webhook *models.Webhook, tenantID *string) bool

WebhookMatchesTenant reports whether a webhook may receive an event with tenantID.

Types

type CachedTenantSettings

type CachedTenantSettings struct {
	// contains filtered or unexported fields
}

CachedTenantSettings wraps a TenantSettingsReader with a per-process, size-bounded, TTL-expiring LRU cache. Feedback-record creation is high volume and the translation enqueue gate must resolve a tenant's target language per event, so caching avoids a tenant_settings read on every feedback event. Staleness is bounded by the TTL; because the worker persists the target it actually used, a changed target self-corrects on the next write. Safe for concurrent use (expirable.LRU is internally locked).

func NewCachedTenantSettings

func NewCachedTenantSettings(
	delegate TenantSettingsReader, size int, ttl time.Duration, metrics observability.CacheMetrics,
) *CachedTenantSettings

NewCachedTenantSettings wraps delegate with an LRU of at most size entries, each expiring after ttl. A non-positive size or ttl disables caching (every read hits the delegate), keeping small deployments and tests simple.

func (*CachedTenantSettings) GetSettings

func (c *CachedTenantSettings) GetSettings(
	ctx context.Context, tenantID string,
) (*models.TenantSettings, error)

GetSettings returns the tenant's settings, serving a fresh cached value when present and otherwise loading from the delegate and caching the result. Errors are never cached.

func (*CachedTenantSettings) Invalidate

func (c *CachedTenantSettings) Invalidate(tenantID string)

Invalidate evicts the tenant's cached settings so the next GetSettings reloads from the delegate. Called after a settings write so a change (e.g. a newly enabled target language) is visible immediately instead of only after TTL expiry — otherwise records created in the staleness window are skipped by the translation enqueue gate. Eviction is per-process (it refreshes the replica that handled the write); other replicas stay TTL-bounded. No-op when caching is off.

func (*CachedTenantSettings) OnSettingsChanged

func (c *CachedTenantSettings) OnSettingsChanged(_ context.Context, tenantID string, _ []string)

OnSettingsChanged implements SettingsChangeListener: any successful settings write for a tenant evicts that tenant's cached entry. Registered alongside the enrichment backfill listener so a settings change both triggers backfills and refreshes this read cache.

type EmbeddingClient

type EmbeddingClient interface {
	CreateEmbedding(ctx context.Context, input string) ([]float32, error)
	CreateEmbeddingForQuery(ctx context.Context, input string) ([]float32, error)
}

EmbeddingClient generates embedding vectors for text. CreateEmbedding is for embedding documents (e.g. feedback records) for storage. CreateEmbeddingForQuery is for embedding search queries; some providers (e.g. Google) use a different task type for asymmetric retrieval.

func NewEmbeddingClient

func NewEmbeddingClient(ctx context.Context, cfg EmbeddingClientConfig) (EmbeddingClient, error)

NewEmbeddingClient creates an EmbeddingClient for the given config. Validates provider-specific requirements via the registry, then calls the registry factory.

type EmbeddingClientConfig

type EmbeddingClientConfig struct {
	Provider            string
	ProviderAPIKey      string // API key for openai/google providers; not logged or serialized
	Model               string
	BaseURL             string
	Normalize           bool
	GoogleCloudProject  string
	GoogleCloudLocation string
}

EmbeddingClientConfig holds configuration for creating an embedding client.

type EmbeddingProvider

type EmbeddingProvider struct {
	// contains filtered or unexported fields
}

EmbeddingProvider implements eventPublisher by enqueueing one River job per feedback record event when the event is FeedbackRecordCreated (with non-empty value_text) or FeedbackRecordUpdated (with value_text in ChangedFields, including when value_text is now empty so the worker can clear).

func NewEmbeddingProvider

func NewEmbeddingProvider(
	inserter RiverJobInserter,
	apiKey string,
	model string,
	queueName string,
	maxAttempts int,
	docPrefix string,
	metrics observability.EmbeddingMetrics,
) *EmbeddingProvider

NewEmbeddingProvider creates a provider that enqueues feedback_embedding jobs. model is the embedding model name (e.g. text-embedding-3-small) from EMBEDDING_MODEL. docPrefix is the prefix for document text (from EmbeddingPrefixForProvider); use "" for OpenAI/Google. metrics may be nil when metrics are disabled.

func (*EmbeddingProvider) PublishEvent

func (p *EmbeddingProvider) PublishEvent(ctx context.Context, event Event)

PublishEvent enqueues a feedback_embedding job when the event is FeedbackRecordCreated (with non-empty value_text) or FeedbackRecordUpdated (with value_text in ChangedFields). On update, the job is enqueued even when value_text is now empty so the worker can clear the embedding for text fields. API key is required for openai and google (validated at startup).

type EmbeddingsRepository

type EmbeddingsRepository interface {
	Upsert(ctx context.Context, feedbackRecordID uuid.UUID, model string, embedding []float32) error
	DeleteByFeedbackRecordAndModel(ctx context.Context, feedbackRecordID uuid.UUID, model string) error
	ListFeedbackRecordIDsForBackfill(
		ctx context.Context, model string, afterID uuid.UUID, limit int,
	) ([]uuid.UUID, error)
}

EmbeddingsRepository defines the interface for embeddings table access.

type EmbeddingsRepositoryForSearch

type EmbeddingsRepositoryForSearch interface {
	GetEmbeddingAndTenantByFeedbackRecordAndModel(
		ctx context.Context, feedbackRecordID uuid.UUID, model string,
	) ([]float32, string, error)
	NearestFeedbackRecordsByEmbedding(
		ctx context.Context, model string, queryEmbedding []float32, tenantID string, limit int, excludeID *uuid.UUID, minScore float64,
	) ([]models.FeedbackRecordWithScore, bool, error)
	NearestFeedbackRecordsByEmbeddingAfterCursor(
		ctx context.Context, model string, queryEmbedding []float32, tenantID string, limit int,
		lastDistance float64, lastFeedbackRecordID uuid.UUID, excludeID *uuid.UUID, minScore float64,
	) ([]models.FeedbackRecordWithScore, bool, error)
}

EmbeddingsRepositoryForSearch provides the embedding read operations needed for semantic search. HasMore is true when there may be additional results (full page returned or full fetch limit consumed).

type EnrichmentSettingsListener

type EnrichmentSettingsListener struct {
	// contains filtered or unexported fields
}

EnrichmentSettingsListener implements SettingsChangeListener by dispatching each changed setting key to the enrichment backfill it triggers. The dispatch table is built from what the deployment has enabled (e.g. translation), so a disabled enrichment registers no handler and an unknown/irrelevant key is ignored.

It deliberately does not use the webhook MessagePublisher: this is an internal, best-effort side-effect, not a customer-facing event. Enqueue errors are logged and swallowed — the global backfill command remains the guaranteed recovery path.

Adding a future setting is one map entry (e.g. a "sentiment_enabled" → sentiment backfill handler); TenantSettingsService does not change.

func NewTranslationSettingsListener

func NewTranslationSettingsListener(
	inserter RiverJobInserter, queueName string, maxAttempts int,
) *EnrichmentSettingsListener

NewTranslationSettingsListener builds a listener that, on a target_language change, enqueues a per-tenant translation backfill job (TenantTranslationBackfillArgs) via inserter. queueName/maxAttempts configure that fan-out job.

func (*EnrichmentSettingsListener) OnSettingsChanged

func (l *EnrichmentSettingsListener) OnSettingsChanged(ctx context.Context, tenantID string, changedKeys []string)

OnSettingsChanged dispatches each changed key to its enrichment backfill handler, if one is registered. Errors are logged and swallowed (the settings write already succeeded).

type Event

type Event struct {
	ID            uuid.UUID           // Unique event id (UUID v7, time-ordered)
	Type          datatypes.EventType // Event type enum (e.g., FeedbackRecordCreated, WebhookCreated)
	Timestamp     time.Time           // Event creation time
	Data          any                 // Event data (FeedbackRecord, Webhook, etc.)
	ChangedFields []string            // Only for updates
}

Event represents an event that can be published to message providers (webhooks, email, etc.)

type FeedbackEmbeddingArgs

type FeedbackEmbeddingArgs struct {
	FeedbackRecordID uuid.UUID `json:"feedback_record_id" river:"unique"`
	// Model is the embedding model name; stored in embeddings.model.
	Model string `json:"model" river:"unique"`
	// ValueTextHash is a hash of the input (trimmed value_text, or "empty"/"backfill") for dedupe semantics.
	ValueTextHash string `json:"value_text_hash" river:"unique"`
}

FeedbackEmbeddingArgs is the job payload for generating and storing an embedding for one feedback record. Used by EmbeddingProvider and the backfill flow to enqueue, and by FeedbackEmbeddingWorker to run. Uniqueness is by (FeedbackRecordID, Model, ValueTextHash) so that edits within the uniqueness window get a new job when value_text changes; same content within 24h is deduped; one job per record+model.

func (FeedbackEmbeddingArgs) Kind

Kind returns the River job kind.

type FeedbackRecordsRepository

type FeedbackRecordsRepository interface {
	Create(ctx context.Context, req *models.CreateFeedbackRecordRequest) (*models.FeedbackRecord, error)
	GetByID(ctx context.Context, id uuid.UUID) (*models.FeedbackRecord, error)
	List(ctx context.Context, filters *models.ListFeedbackRecordsFilters) ([]models.FeedbackRecord, bool, error)
	ListAfterCursor(
		ctx context.Context, filters *models.ListFeedbackRecordsFilters,
		cursorCollectedAt time.Time, cursorID uuid.UUID,
	) ([]models.FeedbackRecord, bool, error)
	Update(ctx context.Context, id uuid.UUID, req *models.UpdateFeedbackRecordRequest) (*models.FeedbackRecord, error)
	SetTranslation(ctx context.Context, feedbackRecordID uuid.UUID, translated *string, langKey, defaultLang string) error
	ListTranslationBackfillTargets(
		ctx context.Context, afterID uuid.UUID, limit int, defaultLang string,
	) ([]models.TranslationBackfillTarget, error)
	ListTranslationBackfillTargetsForTenant(
		ctx context.Context, tenantID string, afterID uuid.UUID, limit int, defaultLang string,
	) ([]models.TranslationBackfillTarget, error)
	Delete(ctx context.Context, id uuid.UUID) error
	DeleteByUser(ctx context.Context, filters *models.DeleteFeedbackRecordsByUserFilters) ([]models.DeletedFeedbackRecordsByTenant, error)
}

FeedbackRecordsRepository defines the interface for feedback records data access.

type FeedbackRecordsService

type FeedbackRecordsService struct {
	// contains filtered or unexported fields
}

FeedbackRecordsService handles business logic for feedback records.

func NewFeedbackRecordsService

func NewFeedbackRecordsService(
	repo FeedbackRecordsRepository,
	embeddingsRepo EmbeddingsRepository,
	embeddingModel string,
	publisher MessagePublisher,
	embeddingInserter RiverJobInserter,
	embeddingQueueName string,
	embeddingMaxAttempts int,
	translationDefaultLang string,
) *FeedbackRecordsService

NewFeedbackRecordsService creates a new feedback records service. publisher may be nil when the service is used only for backfill (BackfillEmbeddings does not use the publisher). embeddingInserter and embeddingQueueName are optional (for backfill); when nil/empty, BackfillEmbeddings returns an error. Call SetEmbeddingInserter after the River client is created to enable backfill without building the service twice. embeddingsRepo and embeddingModel are required for SetEmbedding and BackfillEmbeddings (from EMBEDDING_PROVIDER and EMBEDDING_MODEL). translationDefaultLang (TRANSLATION_DEFAULT_LANGUAGE) is the fallback target for tenants with no target_language of their own; "" disables the fallback. It governs the SetTranslation write-guard and both translation backfills, so pass it wherever translation work runs.

func (*FeedbackRecordsService) BackfillEmbeddings

func (s *FeedbackRecordsService) BackfillEmbeddings(ctx context.Context, model string) (int, error)

BackfillEmbeddings enqueues embedding jobs for the given model for all feedback records that have non-empty value_text and no embedding row for that model (existing rows are replaced by upsert when the job runs). It streams the records in keyset pages. Returns the number of jobs enqueued. Requires embeddingInserter and embeddingQueueName to be set.

func (*FeedbackRecordsService) BackfillTranslations

func (s *FeedbackRecordsService) BackfillTranslations(
	ctx context.Context, inserter RiverJobInserter, queueName string, maxAttempts int,
) (int, error)

BackfillTranslations enqueues a translation job for every feedback record (across all tenants) that needs (re)translation, streaming the targets in keyset pages. Used by the one-off global backfill command. Returns the number of jobs enqueued.

func (*FeedbackRecordsService) BackfillTranslationsForTenant

func (s *FeedbackRecordsService) BackfillTranslationsForTenant(
	ctx context.Context, inserter RiverJobInserter, queueName string, maxAttempts int, tenantID string,
) (int, error)

BackfillTranslationsForTenant enqueues a translation job for every record of a single tenant that needs (re)translation, streaming in keyset pages so a large tenant is never fully materialized. It is the bulk work behind a settings-change re-translation (TenantTranslationBackfillArgs). Returns the number of jobs enqueued.

func (*FeedbackRecordsService) CreateFeedbackRecord

CreateFeedbackRecord creates a new feedback record.

func (*FeedbackRecordsService) DeleteFeedbackRecord

func (s *FeedbackRecordsService) DeleteFeedbackRecord(ctx context.Context, id uuid.UUID) error

DeleteFeedbackRecord deletes a feedback record by ID. Publishes FeedbackRecordDeleted with tenant-aware deleted IDs for webhook isolation.

func (*FeedbackRecordsService) DeleteFeedbackRecordsByUser

func (s *FeedbackRecordsService) DeleteFeedbackRecordsByUser(
	ctx context.Context, filters *models.DeleteFeedbackRecordsByUserFilters,
) (int, error)

DeleteFeedbackRecordsByUser deletes all feedback records matching user_id. When tenant_id is provided, deletion is restricted to that tenant; otherwise all user records are deleted. It publishes one tenant-aware FeedbackRecordDeleted event per tenant represented in the deleted rows.

func (*FeedbackRecordsService) GetFeedbackRecord

func (s *FeedbackRecordsService) GetFeedbackRecord(ctx context.Context, id uuid.UUID) (*models.FeedbackRecord, error)

GetFeedbackRecord retrieves a single feedback record by ID.

func (*FeedbackRecordsService) ListFeedbackRecords

ListFeedbackRecords retrieves a list of feedback records with optional filters. Uses cursor-based pagination: omit cursor for first page, use next_cursor for subsequent pages.

func (*FeedbackRecordsService) SetEmbedding

func (s *FeedbackRecordsService) SetEmbedding(
	ctx context.Context, feedbackRecordID uuid.UUID, model string, embedding []float32,
) error

SetEmbedding sets or clears the embedding for a feedback record and model (internal use by embeddings worker). If embedding is nil, the row for (feedbackRecordID, model) is deleted; otherwise upserted. It does not publish an event.

func (*FeedbackRecordsService) SetEmbeddingInserter

func (s *FeedbackRecordsService) SetEmbeddingInserter(inserter RiverJobInserter)

SetEmbeddingInserter sets the River inserter for embedding jobs (e.g. after River client is created). This allows a single service instance to be used by both handlers and the embedding worker.

func (*FeedbackRecordsService) SetTranslation

func (s *FeedbackRecordsService) SetTranslation(
	ctx context.Context, feedbackRecordID uuid.UUID, translated *string, langKey string,
) error

SetTranslation persists the translated value_text and the target locale key for a feedback record. It is the accessor the translation worker uses; the write is tenant-write-locked in the repository and publishes no event (no enrichment loop).

func (*FeedbackRecordsService) UpdateFeedbackRecord

UpdateFeedbackRecord updates an existing feedback record.

type FeedbackTranslationArgs

type FeedbackTranslationArgs struct {
	FeedbackRecordID uuid.UUID `json:"feedback_record_id" river:"unique"`
	// TargetLang is the tenant's configured target language (BCP-47) at enqueue time.
	TargetLang string `json:"target_lang" river:"unique"`
	// ValueTextHash is a hash of the inputs that determine the translation — the
	// normalized value_text and the source language — or "empty" when value_text is blank.
	ValueTextHash string `json:"value_text_hash" river:"unique"`
}

FeedbackTranslationArgs is the job payload for translating one feedback record's value_text into the tenant's target language. Uniqueness is by (FeedbackRecordID, TargetLang, ValueTextHash): a value_text edit or a target-language change yields a new job, while identical content for the same target within the window is deduped.

func (FeedbackTranslationArgs) Kind

Kind returns the River job kind.

type ListPaginationMeta

type ListPaginationMeta struct {
	Limit      int
	NextCursor string
}

ListPaginationMeta holds pagination metadata for list endpoints (feedback records, webhooks).

func BuildListPaginationMeta

func BuildListPaginationMeta(
	limit int, hasMore bool, encodeLast func() (string, error),
) (ListPaginationMeta, error)

BuildListPaginationMeta builds pagination metadata for cursor-based list responses. hasMore indicates a sentinel row was fetched (limit+1 returned, trimmed to limit). encodeLast is called only when hasMore is true to produce next_cursor. Callers must ensure that when hasMore is true, the underlying list is non-empty so encodeLast can safely access the last item.

type MessagePublisher

type MessagePublisher interface {
	// PublishEvent publishes a single event with data (no changed fields)
	PublishEvent(ctx context.Context, eventType datatypes.EventType, data any)
	// PublishEventWithChangedFields publishes a single event with data and optional changed fields (for updates)
	PublishEventWithChangedFields(ctx context.Context, eventType datatypes.EventType, data any, changedFields []string)
}

MessagePublisher defines the interface for publishing events.

type MessagePublisherManager

type MessagePublisherManager struct {
	// contains filtered or unexported fields
}

MessagePublisherManager coordinates multiple message providers.

func NewMessagePublisherManager

func NewMessagePublisherManager(
	bufferSize int, perEventTimeout time.Duration, metrics observability.EventMetrics,
) *MessagePublisherManager

NewMessagePublisherManager creates a new message publisher manager. bufferSize is the event channel capacity; perEventTimeout limits how long processing one event may take. metrics may be nil when metrics are disabled.

func (*MessagePublisherManager) PublishEvent

func (m *MessagePublisherManager) PublishEvent(ctx context.Context, eventType datatypes.EventType, data any)

PublishEvent publishes an event with data to all registered providers (convenience for no changed fields).

func (*MessagePublisherManager) PublishEventWithChangedFields

func (m *MessagePublisherManager) PublishEventWithChangedFields(
	ctx context.Context, eventType datatypes.EventType, data any, changedFields []string,
)

PublishEventWithChangedFields publishes an event with data to all registered providers.

func (*MessagePublisherManager) RegisterProvider

func (m *MessagePublisherManager) RegisterProvider(provider eventPublisher)

RegisterProvider registers a message provider (webhooks, email, SMS, etc.). Must only be called during startup, before any events are published.

func (*MessagePublisherManager) Shutdown

func (m *MessagePublisherManager) Shutdown()

Shutdown stops the background worker and waits for the buffer to drain.

type NewTaxonomyServiceParams

type NewTaxonomyServiceParams struct {
	Repo                  TaxonomyRepository
	Starter               TaxonomyRunStarter
	EmbeddingModel        string
	MinimumEmbeddingCount int
}

NewTaxonomyServiceParams configures a TaxonomyService.

type RiverJobInserter

type RiverJobInserter interface {
	Insert(ctx context.Context, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error)
}

RiverJobInserter inserts a single River job. It is the shared seam the enrichment providers (embedding, translation) and their backfill flows use to enqueue work; satisfied by the River client.

type SearchResult

type SearchResult struct {
	Results    []models.FeedbackRecordWithScore
	NextCursor string // non-empty if there may be a next page (len(Results) == requested limit)
}

SearchResult holds the results and optional next-page cursor from semantic search or similar feedback.

type SearchService

type SearchService struct {
	// contains filtered or unexported fields
}

SearchService performs semantic search and similar-feedback lookups using embeddings.

func NewSearchService

func NewSearchService(p SearchServiceParams) *SearchService

NewSearchService creates a SearchService.

func (*SearchService) SemanticSearch

func (s *SearchService) SemanticSearch(
	ctx context.Context, query, tenantID string, limit int, minScore float64, cursor string,
) (SearchResult, error)

SemanticSearch returns feedback record IDs and similarity scores for the given query, scoped to tenantID. Requires non-empty tenantID and non-empty (after trim) query. Uses cursor-based pagination. minScore is the minimum similarity score (0..1). NextCursor is set when there may be a next page.

func (*SearchService) SimilarFeedback

func (s *SearchService) SimilarFeedback(
	ctx context.Context, feedbackRecordID uuid.UUID, limit int, minScore float64, cursor string,
) (SearchResult, error)

SimilarFeedback returns feedback record IDs and similarity scores for records similar to the given one. The tenant boundary is derived from the source record before running nearest-neighbor search. Returns ErrEmbeddingNotFound when the record has no embedding for the current model. Uses cursor-based pagination.

type SearchServiceParams

type SearchServiceParams struct {
	EmbeddingClient EmbeddingClient
	EmbeddingsRepo  EmbeddingsRepositoryForSearch
	Model           string
	QueryCache      *lru.Cache[string, []float32]
	CacheMetrics    observability.CacheMetrics
	Logger          *slog.Logger
}

SearchServiceParams configures SearchService. QueryCache and CacheMetrics may be nil (no caching).

type SettingsChangeListener

type SettingsChangeListener interface {
	OnSettingsChanged(ctx context.Context, tenantID string, changedKeys []string)
}

SettingsChangeListener is notified after a tenant's settings are successfully written, with the setting keys the write touched. It lets enrichment side-effects (e.g. re-translation) react to a settings change without TenantSettingsService depending on any enrichment concern — the service depends only on this translation-free port.

Implementations do the reaction themselves (a fast, durable enqueue) and own their error handling: the method returns nothing because the settings write has already committed and the side-effect must never fail it.

func NewCompositeSettingsChangeListener

func NewCompositeSettingsChangeListener(listeners ...SettingsChangeListener) SettingsChangeListener

NewCompositeSettingsChangeListener combines listeners into one that notifies each in order.

type TaxonomyClient

type TaxonomyClient struct {
	// contains filtered or unexported fields
}

TaxonomyClient calls the standalone taxonomy service.

func NewTaxonomyClient

func NewTaxonomyClient(cfg TaxonomyClientConfig, httpClient *http.Client) (*TaxonomyClient, error)

NewTaxonomyClient creates a Hub-to-taxonomy-service client.

func (*TaxonomyClient) StartRun

func (c *TaxonomyClient) StartRun(ctx context.Context, runID string) error

StartRun asks the taxonomy service to start compute for a Hub-created run.

type TaxonomyClientConfig

type TaxonomyClientConfig struct {
	ServiceURL   string
	ServiceToken string
}

TaxonomyClientConfig configures the outbound client Hub uses to call the taxonomy service.

type TaxonomyRepository

type TaxonomyRepository interface {
	ListFieldOptions(ctx context.Context, tenantID, embeddingModel string) ([]models.TaxonomyFieldOption, error)
	CountScopeInput(ctx context.Context, scope models.TaxonomyScope, embeddingModel string) (int, int, *string, error)
	CreateRunIfAvailable(ctx context.Context, params repository.CreateTaxonomyRunParams) (*models.TaxonomyRun, bool, error)
	MarkRunRunning(ctx context.Context, runID uuid.UUID, tenantID string) (*models.TaxonomyRun, error)
	MarkRunFailed(
		ctx context.Context,
		runID uuid.UUID,
		tenantID string,
		message string,
		errorCode models.TaxonomyRunFailureCode,
	) (*models.TaxonomyRun, error)
	GetRunForInternalService(ctx context.Context, runID uuid.UUID) (*models.TaxonomyRun, error)
	GetRunForTenant(ctx context.Context, runID uuid.UUID, tenantID string) (*models.TaxonomyRun, error)
	GetActiveRun(ctx context.Context, scope models.TaxonomyScope) (*models.TaxonomyRun, error)
	ListRuns(ctx context.Context, filters models.ListTaxonomyRunsFilters) ([]models.TaxonomyRun, error)
	GetRunInput(
		ctx context.Context,
		runID uuid.UUID,
		tenantID string,
		embeddingModel string,
	) (*models.TaxonomyRunInputResponse, error)
	StoreResultAndActivate(
		ctx context.Context,
		runID uuid.UUID,
		tenantID string,
		req models.TaxonomyRunResultRequest,
	) (*models.TaxonomyRun, error)
	GetTree(ctx context.Context, runID uuid.UUID, tenantID string) (*models.TaxonomyTreeResponse, error)
	RenameNode(ctx context.Context, nodeID uuid.UUID, tenantID, actorID, label string) (*models.TaxonomyNode, error)
	RemoveNode(ctx context.Context, nodeID uuid.UUID, tenantID, actorID string) (*models.TaxonomyNode, error)
	ListNodeRecords(ctx context.Context, nodeID uuid.UUID, tenantID string, limit int) ([]models.FeedbackRecord, int, error)
}

TaxonomyRepository persists taxonomy run state and generated artifacts.

type TaxonomyRunStarter

type TaxonomyRunStarter interface {
	StartRun(ctx context.Context, runID string) error
}

TaxonomyRunStarter starts asynchronous taxonomy compute work.

type TaxonomyService

type TaxonomyService struct {
	// contains filtered or unexported fields
}

TaxonomyService coordinates taxonomy run lifecycle and edits.

func NewTaxonomyService

func NewTaxonomyService(params NewTaxonomyServiceParams) *TaxonomyService

NewTaxonomyService creates a taxonomy application service.

func (*TaxonomyService) CompleteRun

CompleteRun stores taxonomy output and activates the successful run.

func (*TaxonomyService) FailRun

func (s *TaxonomyService) FailRun(
	ctx context.Context,
	runID uuid.UUID,
	message string,
	errorCode models.TaxonomyRunFailureCode,
) (*models.TaxonomyRun, error)

FailRun records a taxonomy run failure.

func (*TaxonomyService) GetActiveTree

GetActiveTree returns the active taxonomy tree for a field scope.

func (*TaxonomyService) GetRun

func (s *TaxonomyService) GetRun(
	ctx context.Context,
	runID uuid.UUID,
	tenantID string,
) (*models.TaxonomyRun, error)

GetRun returns a taxonomy run by ID.

func (*TaxonomyService) GetRunInput

func (s *TaxonomyService) GetRunInput(
	ctx context.Context,
	runID uuid.UUID,
) (*models.TaxonomyRunInputResponse, error)

GetRunInput returns feedback text and embeddings for the taxonomy service.

func (*TaxonomyService) GetTree

func (s *TaxonomyService) GetTree(
	ctx context.Context,
	runID uuid.UUID,
	tenantID string,
) (*models.TaxonomyTreeResponse, error)

GetTree returns a taxonomy tree by run ID.

func (*TaxonomyService) ListFieldOptions

func (s *TaxonomyService) ListFieldOptions(
	ctx context.Context,
	tenantID string,
) (*models.TaxonomyFieldsResponse, error)

ListFieldOptions returns feedback fields that can run taxonomy generation.

func (*TaxonomyService) ListNodeRecords

ListNodeRecords returns feedback records assigned to a taxonomy node.

func (*TaxonomyService) ListRuns

ListRuns returns taxonomy run history for a scoped tenant.

func (*TaxonomyService) RemoveNode

func (s *TaxonomyService) RemoveNode(
	ctx context.Context,
	nodeID uuid.UUID,
	filters models.RemoveTaxonomyNodeFilters,
) (*models.TaxonomyNode, error)

RemoveNode soft-removes a taxonomy node.

func (*TaxonomyService) RenameNode

RenameNode renames a taxonomy node.

func (*TaxonomyService) StartManualRun

StartManualRun creates and starts a manual taxonomy generation run.

type TenantDataRepository

type TenantDataRepository interface {
	DeleteByTenant(ctx context.Context, tenantID string) (*models.TenantDataDeleteCounts, error)
}

TenantDataRepository defines tenant data purge access.

type TenantDataService

type TenantDataService struct {
	// contains filtered or unexported fields
}

TenantDataService handles tenant data purge business logic.

func NewTenantDataService

func NewTenantDataService(repo TenantDataRepository) *TenantDataService

NewTenantDataService creates a new tenant data service.

func (*TenantDataService) DeleteTenantData

func (s *TenantDataService) DeleteTenantData(ctx context.Context, tenantID string) (*models.TenantDataDeleteResult, error)

DeleteTenantData deletes all Hub-owned data for a tenant.

type TenantSettingsReader

type TenantSettingsReader interface {
	GetSettings(ctx context.Context, tenantID string) (*models.TenantSettings, error)
}

TenantSettingsReader is the read surface the cache wraps. *TenantSettingsService satisfies it, so the cache is a drop-in for any consumer that only needs reads (the translation enqueue gate and worker).

type TenantSettingsRepository

type TenantSettingsRepository interface {
	Get(ctx context.Context, tenantID string) (*models.TenantSettings, bool, error)
	Upsert(ctx context.Context, tenantID string, settings models.EnrichmentSettings) (*models.TenantSettings, error)
	// Patch merges set into the tenant's settings and removes removeKeys (an RFC
	// 7396 merge patch: set + delete); the two are disjoint.
	Patch(
		ctx context.Context, tenantID string, set models.EnrichmentSettings, removeKeys []string,
	) (*models.TenantSettings, error)
}

TenantSettingsRepository is the persistence surface the settings service needs.

type TenantSettingsService

type TenantSettingsService struct {
	// contains filtered or unexported fields
}

TenantSettingsService reads and writes tenant-scoped enrichment settings. It is the accessor enrichment workflows will use to resolve a tenant's configuration.

func NewTenantSettingsService

func NewTenantSettingsService(repo TenantSettingsRepository) *TenantSettingsService

NewTenantSettingsService creates a new tenant settings service.

func (*TenantSettingsService) GetSettings

func (s *TenantSettingsService) GetSettings(ctx context.Context, tenantID string) (*models.TenantSettings, error)

GetSettings returns the tenant's enrichment settings. When the tenant has no settings yet it returns a zero-value settings bag (target language unset) rather than a not-found error: an unconfigured tenant is a valid state, and consumers decide the fallback behavior. The lookup is always scoped to the normalized tenant_id.

func (*TenantSettingsService) PatchSettings

PatchSettings applies an RFC 7396 JSON Merge Patch: a member present with a value sets that setting (validated and normalized), a member present with JSON null removes it, and an omitted member is left unchanged. It translates the typed request into the keys to set and the keys to remove, which are disjoint. The tenant_id comes from the request path and scopes the write to that tenant alone.

func (*TenantSettingsService) SetSettingsChangeListener

func (s *TenantSettingsService) SetSettingsChangeListener(listener SettingsChangeListener)

SetSettingsChangeListener registers a listener notified after a successful settings write, used to trigger enrichment side-effects (e.g. a re-translation backfill on a target_language change). Optional; mirrors the post-construction injection of SetEmbeddingInserter. Nil means no side-effects fire.

func (*TenantSettingsService) UpdateSettings

UpdateSettings validates and normalizes the requested settings, then upserts them for the tenant (full replace). The tenant_id is supplied by the caller (from the request path) and scopes the write to that tenant alone.

type TenantTranslationBackfillArgs

type TenantTranslationBackfillArgs struct {
	TenantID string `json:"tenant_id" river:"unique"`
}

TenantTranslationBackfillArgs fans out a re-translation backfill for one tenant: the worker lists the tenant's stale text records and enqueues a FeedbackTranslationArgs job for each. It is enqueued when a tenant's translation-relevant settings change (today: target_language).

Uniqueness is by TenantID across the default (in-flight) states — the enqueue site sets ByArgs without a ByPeriod — so rapid repeated settings changes collapse to a single in-flight backfill, while a change after the previous backfill has completed re-triggers. The worker resolves the tenant's current target at run time, so a coalesced backfill always targets the latest configured language.

func (TenantTranslationBackfillArgs) Kind

Kind returns the River job kind.

type TranslateRequest

type TranslateRequest struct {
	Text       string
	SourceLang string
	TargetLang string
}

TranslateRequest is the input to a single translation. SourceLang and TargetLang are BCP-47 tags: TargetLang comes from the tenant's settings; SourceLang from the feedback record's language and may be empty when the source language is unknown.

type TranslationClient

type TranslationClient interface {
	Translate(ctx context.Context, req TranslateRequest) (string, error)
}

TranslationClient translates TranslateRequest.Text from SourceLang into TargetLang and returns the translated text. Implementations call an LLM provider (OpenAI or Google); the factory selects one from configuration. It mirrors the EmbeddingClient seam so the worker depends on the interface, not a provider.

func NewTranslationClient

func NewTranslationClient(ctx context.Context, cfg TranslationClientConfig) (TranslationClient, error)

NewTranslationClient creates a TranslationClient for the given config. It validates provider-specific requirements via the registry, then calls the registry factory.

type TranslationClientConfig

type TranslationClientConfig struct {
	Provider            string
	ProviderAPIKey      string // API key for openai/google providers; not logged or serialized
	Model               string
	BaseURL             string
	GoogleCloudProject  string
	GoogleCloudLocation string
}

TranslationClientConfig holds configuration for creating a translation client.

type TranslationProvider

type TranslationProvider struct {
	// contains filtered or unexported fields
}

TranslationProvider implements eventPublisher by enqueueing one translation job per eligible feedback record event: FeedbackRecordCreated with non-empty open text, or FeedbackRecordUpdated whose value_text changed. A job is only enqueued when the record is a text field with non-empty value_text and the tenant has a target language configured (read via the settings resolver). The worker resolves the remaining work; ingestion is never blocked.

func NewTranslationProvider

func NewTranslationProvider(
	inserter RiverJobInserter,
	resolver TenantSettingsReader,
	queueName string,
	maxAttempts int,
	defaultLang string,
	metrics observability.TranslationMetrics,
) *TranslationProvider

NewTranslationProvider creates a provider that enqueues feedback_translation jobs. metrics may be nil when metrics are disabled.

func (*TranslationProvider) PublishEvent

func (p *TranslationProvider) PublishEvent(ctx context.Context, event Event)

PublishEvent enqueues a feedback_translation job for an eligible create/update event. Failures are logged and swallowed so they never block ingestion.

type WebhookDispatchArgs

type WebhookDispatchArgs struct {
	EventID       uuid.UUID `json:"event_id"                 river:"unique"`
	EventType     string    `json:"event_type"`
	Timestamp     time.Time `json:"timestamp"`
	Data          any       `json:"data"`
	ChangedFields []string  `json:"changed_fields,omitempty"`
	TenantID      *string   `json:"tenant_id,omitempty"`
	WebhookID     uuid.UUID `json:"webhook_id"               river:"unique"`
}

WebhookDispatchArgs is the job payload for one (event, webhook) delivery. Used by WebhookProvider to enqueue and by WebhookDispatchWorker to run. Only event_id and webhook_id are used for River uniqueness (river:"unique") so the hash is fast and does not include the potentially large data payload.

func (WebhookDispatchArgs) Kind

func (WebhookDispatchArgs) Kind() string

Kind returns the River job kind.

type WebhookDispatchInserter

type WebhookDispatchInserter interface {
	InsertMany(ctx context.Context, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error)
}

WebhookDispatchInserter inserts webhook_dispatch jobs in batch (e.g. River client).

type WebhookPayload

type WebhookPayload struct {
	ID            uuid.UUID `json:"id"`                       // Unique event id (UUID v7)
	Type          string    `json:"type"`                     // Event type as string (e.g., "feedback_record.created", "webhook.created")
	Timestamp     time.Time `json:"timestamp"`                // Event creation timestamp
	TenantID      *string   `json:"tenant_id,omitempty"`      // Tenant boundary for the event
	Data          any       `json:"data"`                     // Event data (FeedbackRecord, Webhook, etc.)
	ChangedFields []string  `json:"changed_fields,omitempty"` // Only for update events (optional)
}

WebhookPayload represents a generic webhook payload structure for all event types. The Data field can contain FeedbackRecord, Webhook, or other event data types.

func NewWebhookPayload

func NewWebhookPayload(args WebhookDispatchArgs) *WebhookPayload

NewWebhookPayload builds the public webhook payload from internal dispatch args.

type WebhookProvider

type WebhookProvider struct {
	// contains filtered or unexported fields
}

WebhookProvider implements eventPublisher by enqueueing one River job per (event, webhook).

func NewWebhookProvider

func NewWebhookProvider(
	inserter WebhookDispatchInserter, repo WebhookProviderRepository,
	maxAttempts, maxFanOut int,
	enqueueMaxRetries int, enqueueInitialBackoff, enqueueMaxBackoff time.Duration,
	metrics observability.WebhookMetrics,
) *WebhookProvider

NewWebhookProvider creates a provider that lists enabled webhooks and enqueues jobs via InsertMany. maxFanOut is the batch size for InsertMany (all matching webhooks are enqueued in batches of maxFanOut). enqueueMaxRetries, enqueueInitialBackoff, enqueueMaxBackoff configure retries when InsertMany fails (transient River/DB errors). metrics may be nil when metrics are disabled.

func (*WebhookProvider) PublishEvent

func (p *WebhookProvider) PublishEvent(ctx context.Context, event Event)

PublishEvent lists enabled webhooks for the event type and tenant, then enqueues one job per webhook. Webhooks are only eligible when the event payload has the same tenant_id.

type WebhookProviderRepository

type WebhookProviderRepository interface {
	ListEnabledForEventTypeAndTenant(ctx context.Context, eventType string, tenantID *string) ([]models.Webhook, error)
}

WebhookProviderRepository lists tenant-scoped webhooks eligible for event fan-out.

type WebhookSender

type WebhookSender interface {
	Send(ctx context.Context, webhook *models.Webhook, payload *WebhookPayload) error
}

WebhookSender sends a single webhook payload to an endpoint (Standard Webhooks: signing, headers, 410 handling).

type WebhookSenderImpl

type WebhookSenderImpl struct {
	// contains filtered or unexported fields
}

WebhookSenderImpl implements WebhookSender with Standard Webhooks conformance.

func NewWebhookSenderImpl

func NewWebhookSenderImpl(
	repo WebhookSenderRepository, metrics observability.WebhookMetrics, urlHostBlacklist map[string]struct{},
	httpTimeout time.Duration, httpClient *http.Client,
) *WebhookSenderImpl

NewWebhookSenderImpl creates a sender that uses the given repo. urlHostBlacklist is the SSRF blacklist (hosts/IPs); may be nil (address checks still run). httpTimeout is the HTTP client timeout; job timeout should be httpTimeout + buffer (e.g. 5s). Client does not follow redirects and validates resolved IPs at dial time (DNS rebinding protection). metrics may be nil when metrics are disabled. If httpClient is non-nil, it is used as-is (e.g. for tests that hit loopback); otherwise a secured client is built.

func (*WebhookSenderImpl) Send

func (s *WebhookSenderImpl) Send(ctx context.Context, webhook *models.Webhook, payload *WebhookPayload) error

Send signs and POSTs the payload to the webhook URL. On 410 Gone, disables the webhook and returns an error.

type WebhookSenderRepository

type WebhookSenderRepository interface {
	Update(ctx context.Context, id uuid.UUID, req *models.UpdateWebhookRequest) (*models.Webhook, error)
}

WebhookSenderRepository persists webhook state changes caused by delivery.

type WebhooksRepository

type WebhooksRepository interface {
	Create(ctx context.Context, req *models.CreateWebhookRequest) (*models.Webhook, error)
	GetByID(ctx context.Context, id uuid.UUID) (*models.Webhook, error)
	List(ctx context.Context, filters *models.ListWebhooksFilters) ([]models.Webhook, bool, error)
	ListAfterCursor(
		ctx context.Context, filters *models.ListWebhooksFilters,
		cursorCreatedAt time.Time, cursorID uuid.UUID,
	) ([]models.Webhook, bool, error)
	Count(ctx context.Context, filters *models.ListWebhooksFilters) (int64, error)
	Update(ctx context.Context, id uuid.UUID, req *models.UpdateWebhookRequest) (*models.Webhook, error)
	Delete(ctx context.Context, id uuid.UUID) (*models.DeletedWebhook, error)
}

WebhooksRepository defines the interface for webhooks data access.

type WebhooksService

type WebhooksService struct {
	// contains filtered or unexported fields
}

WebhooksService handles business logic for webhooks.

func NewWebhooksService

func NewWebhooksService(
	repo WebhooksRepository, publisher MessagePublisher, maxWebhooks int, urlHostBlacklist map[string]struct{},
) *WebhooksService

NewWebhooksService creates a new webhooks service. urlHostBlacklist is a set of hostnames/IPs that cannot be used as webhook URLs (SSRF mitigation); may be nil for no restriction.

func (*WebhooksService) CreateWebhook

CreateWebhook creates a new webhook.

func (*WebhooksService) DeleteWebhook

func (s *WebhooksService) DeleteWebhook(ctx context.Context, id uuid.UUID) error

DeleteWebhook deletes a webhook by ID. Publishes WebhookDeleted with tenant-aware deleted IDs.

func (*WebhooksService) GetWebhook

func (s *WebhooksService) GetWebhook(ctx context.Context, id uuid.UUID) (*models.Webhook, error)

GetWebhook retrieves a single webhook by ID.

func (*WebhooksService) ListWebhooks

ListWebhooks retrieves a list of webhooks with optional filters. Uses cursor-based pagination: omit cursor for first page, use next_cursor for subsequent pages.

func (*WebhooksService) UpdateWebhook

func (s *WebhooksService) UpdateWebhook(ctx context.Context, id uuid.UUID, req *models.UpdateWebhookRequest) (*models.Webhook, error)

UpdateWebhook updates an existing webhook.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL