Documentation
¶
Overview ¶
Package service implements business logic for feedback records.
Index ¶
- Constants
- Variables
- func BuildEmbeddingInput(fieldLabel, valueText *string, prefix string) string
- func DecodeSearchCursor(cursor string) (distance float64, feedbackRecordID uuid.UUID, err error)
- func EmbeddingPrefixForProvider(provider string) string
- func EncodeSearchCursor(distance float64, id uuid.UUID) (string, error)
- func NormalizeEmbeddingProvider(provider string) string
- func NormalizeTranslationProvider(provider string) string
- func ProviderRequiresAPIKey(provider string) bool
- func ProviderRequiresGoogleGeminiConfig(provider string) bool
- func SupportedEmbeddingProviders() map[string]struct{}
- func TenantIDFromEventData(data any) (string, bool)
- func TenantIDPointerFromEventData(data any) *string
- func ValidateEmbeddingConfig(cfg EmbeddingClientConfig) error
- func ValidateTranslationConfig(cfg TranslationClientConfig) error
- func WebhookMatchesTenant(webhook *models.Webhook, tenantID *string) bool
- type CachedTenantSettings
- type EmbeddingClient
- type EmbeddingClientConfig
- type EmbeddingProvider
- type EmbeddingsRepository
- type EmbeddingsRepositoryForSearch
- type EnrichmentSettingsListener
- type Event
- type FeedbackEmbeddingArgs
- type FeedbackRecordsRepository
- type FeedbackRecordsService
- func (s *FeedbackRecordsService) BackfillEmbeddings(ctx context.Context, model string) (int, error)
- func (s *FeedbackRecordsService) BackfillTranslations(ctx context.Context, inserter RiverJobInserter, queueName string, ...) (int, error)
- func (s *FeedbackRecordsService) BackfillTranslationsForTenant(ctx context.Context, inserter RiverJobInserter, queueName string, ...) (int, error)
- func (s *FeedbackRecordsService) CreateFeedbackRecord(ctx context.Context, req *models.CreateFeedbackRecordRequest) (*models.FeedbackRecord, error)
- func (s *FeedbackRecordsService) DeleteFeedbackRecord(ctx context.Context, id uuid.UUID) error
- func (s *FeedbackRecordsService) DeleteFeedbackRecordsByUser(ctx context.Context, filters *models.DeleteFeedbackRecordsByUserFilters) (int, error)
- func (s *FeedbackRecordsService) GetFeedbackRecord(ctx context.Context, id uuid.UUID) (*models.FeedbackRecord, error)
- func (s *FeedbackRecordsService) ListFeedbackRecords(ctx context.Context, filters *models.ListFeedbackRecordsFilters) (*models.ListFeedbackRecordsResponse, error)
- func (s *FeedbackRecordsService) SetEmbedding(ctx context.Context, feedbackRecordID uuid.UUID, model string, ...) error
- func (s *FeedbackRecordsService) SetEmbeddingInserter(inserter RiverJobInserter)
- func (s *FeedbackRecordsService) SetTranslation(ctx context.Context, feedbackRecordID uuid.UUID, translated *string, ...) error
- func (s *FeedbackRecordsService) UpdateFeedbackRecord(ctx context.Context, id uuid.UUID, req *models.UpdateFeedbackRecordRequest) (*models.FeedbackRecord, error)
- type FeedbackTranslationArgs
- type ListPaginationMeta
- type MessagePublisher
- type MessagePublisherManager
- func (m *MessagePublisherManager) PublishEvent(ctx context.Context, eventType datatypes.EventType, data any)
- func (m *MessagePublisherManager) PublishEventWithChangedFields(ctx context.Context, eventType datatypes.EventType, data any, ...)
- func (m *MessagePublisherManager) RegisterProvider(provider eventPublisher)
- func (m *MessagePublisherManager) Shutdown()
- type NewTaxonomyServiceParams
- type RiverJobInserter
- type SearchResult
- type SearchService
- type SearchServiceParams
- type SettingsChangeListener
- type TaxonomyClient
- type TaxonomyClientConfig
- type TaxonomyRepository
- type TaxonomyRunStarter
- type TaxonomyService
- func (s *TaxonomyService) CompleteRun(ctx context.Context, runID uuid.UUID, req models.TaxonomyRunResultRequest) (*models.TaxonomyRun, error)
- func (s *TaxonomyService) FailRun(ctx context.Context, runID uuid.UUID, message string, ...) (*models.TaxonomyRun, error)
- func (s *TaxonomyService) GetActiveTree(ctx context.Context, scope models.TaxonomyScope) (*models.TaxonomyTreeResponse, error)
- func (s *TaxonomyService) GetRun(ctx context.Context, runID uuid.UUID, tenantID string) (*models.TaxonomyRun, error)
- func (s *TaxonomyService) GetRunInput(ctx context.Context, runID uuid.UUID) (*models.TaxonomyRunInputResponse, error)
- func (s *TaxonomyService) GetTree(ctx context.Context, runID uuid.UUID, tenantID string) (*models.TaxonomyTreeResponse, error)
- func (s *TaxonomyService) ListFieldOptions(ctx context.Context, tenantID string) (*models.TaxonomyFieldsResponse, error)
- func (s *TaxonomyService) ListNodeRecords(ctx context.Context, nodeID uuid.UUID, ...) (*models.TaxonomyNodeRecordsResponse, error)
- func (s *TaxonomyService) ListRuns(ctx context.Context, filters models.ListTaxonomyRunsFilters) (*models.ListTaxonomyRunsResponse, error)
- func (s *TaxonomyService) RemoveNode(ctx context.Context, nodeID uuid.UUID, ...) (*models.TaxonomyNode, error)
- func (s *TaxonomyService) RenameNode(ctx context.Context, nodeID uuid.UUID, req models.RenameTaxonomyNodeRequest) (*models.TaxonomyNode, error)
- func (s *TaxonomyService) StartManualRun(ctx context.Context, req models.CreateTaxonomyRunRequest) (*models.CreateTaxonomyRunResponse, error)
- type TenantDataRepository
- type TenantDataService
- type TenantSettingsReader
- type TenantSettingsRepository
- type TenantSettingsService
- func (s *TenantSettingsService) GetSettings(ctx context.Context, tenantID string) (*models.TenantSettings, error)
- func (s *TenantSettingsService) PatchSettings(ctx context.Context, tenantID string, req *models.PatchTenantSettingsRequest) (*models.TenantSettings, error)
- func (s *TenantSettingsService) SetSettingsChangeListener(listener SettingsChangeListener)
- func (s *TenantSettingsService) UpdateSettings(ctx context.Context, tenantID string, req *models.UpdateTenantSettingsRequest) (*models.TenantSettings, error)
- type TenantTranslationBackfillArgs
- type TranslateRequest
- type TranslationClient
- type TranslationClientConfig
- type TranslationProvider
- type WebhookDispatchArgs
- type WebhookDispatchInserter
- type WebhookPayload
- type WebhookProvider
- type WebhookProviderRepository
- type WebhookSender
- type WebhookSenderImpl
- type WebhookSenderRepository
- type WebhooksRepository
- type WebhooksService
- func (s *WebhooksService) CreateWebhook(ctx context.Context, req *models.CreateWebhookRequest) (*models.Webhook, error)
- func (s *WebhooksService) DeleteWebhook(ctx context.Context, id uuid.UUID) error
- func (s *WebhooksService) GetWebhook(ctx context.Context, id uuid.UUID) (*models.Webhook, error)
- func (s *WebhooksService) ListWebhooks(ctx context.Context, filters *models.ListWebhooksFilters) (*models.ListWebhooksResponse, error)
- func (s *WebhooksService) UpdateWebhook(ctx context.Context, id uuid.UUID, req *models.UpdateWebhookRequest) (*models.Webhook, error)
Constants ¶
const ( EmbeddingProviderOpenAI = "openai" EmbeddingProviderGoogle = "google" EmbeddingProviderGoogleGemini = "google-gemini" )
Embedding provider names for NewEmbeddingClient.
const ( TranslationProviderOpenAI = "openai" TranslationProviderGoogle = "google" TranslationProviderGoogleGemini = "google-gemini" )
Translation provider names for NewTranslationClient (same identifiers as the embedding providers; translation reuses the OpenAI and Google SDK wrappers).
const (
// EmbeddingsQueueName is the River queue used for feedback embedding jobs.
EmbeddingsQueueName = "embeddings"
)
const SigningKeySize = 32
SigningKeySize is the number of random bytes for Standard Webhooks signing keys.
const ( // TranslationBackfillsQueueName is the River queue for per-tenant translation // backfill fan-out jobs. It is kept separate from TranslationsQueueName so a large // fan-out does not starve live per-record translation throughput. TranslationBackfillsQueueName = "translation_backfills" )
const (
// TranslationsQueueName is the River queue used for feedback translation jobs.
TranslationsQueueName = "translations"
)
Variables ¶
var ( // ErrEmbeddingConfigInvalid is returned when the embedding provider is unsupported. ErrEmbeddingConfigInvalid = errors.New("embedding config invalid") // ErrEmbeddingProviderAPIKey is returned when an API-key-based provider is configured without a key. ErrEmbeddingProviderAPIKey = errors.New("EMBEDDING_PROVIDER_API_KEY is required for this provider") // ErrEmbeddingBaseURLUnsupported is returned when a custom base URL is configured for a non-openai provider. ErrEmbeddingBaseURLUnsupported = errors.New("EMBEDDING_BASE_URL is only supported for openai") // ErrEmbeddingGoogleGeminiConfig is returned when google-gemini is configured without project or location. ErrEmbeddingGoogleGeminiConfig = errors.New( "google-gemini requires EMBEDDING_GOOGLE_CLOUD_PROJECT and EMBEDDING_GOOGLE_CLOUD_LOCATION") )
var ( ErrMissingTenantID = errors.New("tenant_id is required") ErrEmptyQuery = errors.New("query is required and must be non-empty") ErrEmbeddingNotFound = repository.ErrEmbeddingNotFound )
Sentinel errors for search (used by handlers for status mapping).
var ( // ErrTaxonomyServiceURLRequired is returned when TAXONOMY_SERVICE_URL is missing. ErrTaxonomyServiceURLRequired = errors.New("TAXONOMY_SERVICE_URL is required") // ErrTaxonomyServiceTokenRequired is returned when TAXONOMY_SERVICE_TOKEN is missing. ErrTaxonomyServiceTokenRequired = errors.New("TAXONOMY_SERVICE_TOKEN is required") // ErrTaxonomyServiceUnexpectedStatus is returned when the taxonomy service returns a non-2xx response. ErrTaxonomyServiceUnexpectedStatus = errors.New("taxonomy service returned non-success status") )
var ( // ErrTaxonomyEmbeddingsNotConfigured is returned when Hub embeddings are not configured. ErrTaxonomyEmbeddingsNotConfigured = errors.New("taxonomy requires EMBEDDING_MODEL to be configured") // ErrTaxonomyServiceNotConfigured is returned when the taxonomy compute service is unavailable. ErrTaxonomyServiceNotConfigured = errors.New("taxonomy service is not configured") // ErrTaxonomyServiceStartFailed is returned when the taxonomy compute service rejects a run. ErrTaxonomyServiceStartFailed = errors.New("taxonomy service failed to start run") )
var ( // ErrTranslationConfigInvalid is returned when the translation provider is unsupported. ErrTranslationConfigInvalid = errors.New("translation config invalid") // ErrTranslationProviderAPIKey is returned when an API-key-based provider is configured without a key. ErrTranslationProviderAPIKey = errors.New("TRANSLATION_PROVIDER_API_KEY is required for this provider") // ErrTranslationBaseURLUnsupported is returned when a custom base URL is configured for a non-openai provider. ErrTranslationBaseURLUnsupported = errors.New("TRANSLATION_BASE_URL is only supported for openai") // ErrTranslationGoogleGeminiConfig is returned when google-gemini is configured without project or location. ErrTranslationGoogleGeminiConfig = errors.New( "google-gemini requires TRANSLATION_GOOGLE_CLOUD_PROJECT and TRANSLATION_GOOGLE_CLOUD_LOCATION") )
var ( ErrWebhookGone = errors.New("webhook returned 410 Gone (endpoint disabled)") ErrWebhookNon2xx = errors.New("webhook returned non-2xx status") )
Sentinel errors for webhook delivery (err113).
var ErrEmbeddingBackfillNotConfigured = errors.New("embedding backfill not configured")
ErrEmbeddingBackfillNotConfigured is returned when BackfillEmbeddings is called without embedding inserter/queue.
var ErrInvalidCursor = errors.New("invalid cursor")
ErrInvalidCursor is returned when the cursor parameter is malformed or invalid.
var ErrPaginationInvariantViolated = errors.New("pagination invariant violated: hasMore with empty list")
ErrPaginationInvariantViolated indicates hasMore was true with an empty list (repository invariant violation).
var ErrTranslationLangKeyRequired = errors.New("translation lang key is required when translated text is set")
ErrTranslationLangKeyRequired is returned when a translation is set without a target locale key: a translation must record the locale it was produced in (clearing, where translated is nil, intentionally passes an empty key to null both columns).
var ErrUserIDRequired = huberrors.NewValidationError("user_id", "user_id is required")
ErrUserIDRequired is returned when deleting feedback records by user is called without user_id.
Functions ¶
func BuildEmbeddingInput ¶
BuildEmbeddingInput prepares text for vector embedding. Uses a pre-allocated strings.Builder to reduce allocations on this hot path.
We feed the model raw, natural text: only TrimSpace and Unicode NFC normalization are applied. Case, diacritics, and punctuation are preserved so the model retains semantic clues (e.g. "US" vs "us", "résumé" vs "resume").
Arguments:
- fieldLabel: The "question" or metadata key (e.g. "What is your reasoning?").
- valueText: The "answer" or main content (e.g. "I chose option B because...").
- prefix: Model-specific task instruction; OpenAI and Google use "".
Returns formatted string: "[prefix]Question: [label]\nAnswer: [value]" (or "[prefix][value]" when label is empty).
func DecodeSearchCursor ¶
DecodeSearchCursor parses an opaque cursor and returns (distance, feedbackRecordID). Returns ErrInvalidCursor if the cursor is malformed.
func EmbeddingPrefixForProvider ¶
EmbeddingPrefixForProvider returns the document prefix for the given embedding provider (from registry). Returns "" for unknown providers.
func EncodeSearchCursor ¶
EncodeSearchCursor returns an opaque cursor for the next page. distance is the cosine distance (e.embedding <=> query) of the last result row; id is that row's feedback_record_id.
func NormalizeEmbeddingProvider ¶
NormalizeEmbeddingProvider returns the canonical provider name (lowercase, trimmed).
func NormalizeTranslationProvider ¶
NormalizeTranslationProvider returns the canonical provider name (lowercase, trimmed).
func ProviderRequiresAPIKey ¶
ProviderRequiresAPIKey returns true for providers that require EMBEDDING_PROVIDER_API_KEY (from registry).
func ProviderRequiresGoogleGeminiConfig ¶
ProviderRequiresGoogleGeminiConfig returns true for providers that require Google Cloud project and location.
func SupportedEmbeddingProviders ¶
func SupportedEmbeddingProviders() map[string]struct{}
SupportedEmbeddingProviders returns the set of supported provider names (from registry).
func TenantIDFromEventData ¶
TenantIDFromEventData extracts tenant_id from known event payload shapes.
func TenantIDPointerFromEventData ¶
TenantIDPointerFromEventData returns a detached pointer so it can be safely stored in job args.
func ValidateEmbeddingConfig ¶
func ValidateEmbeddingConfig(cfg EmbeddingClientConfig) error
ValidateEmbeddingConfig checks provider support and provider-specific requirements (API key, Google Cloud project/location). Use before creating a client or at startup to fail fast with a clear error.
func ValidateTranslationConfig ¶
func ValidateTranslationConfig(cfg TranslationClientConfig) error
ValidateTranslationConfig checks provider support and provider-specific requirements. Use before creating a client or at startup to fail fast with a clear error.
Types ¶
type CachedTenantSettings ¶
type CachedTenantSettings struct {
// contains filtered or unexported fields
}
CachedTenantSettings wraps a TenantSettingsReader with a per-process, size-bounded, TTL-expiring LRU cache. Feedback-record creation is high volume and the translation enqueue gate must resolve a tenant's target language per event, so caching avoids a tenant_settings read on every feedback event. Staleness is bounded by the TTL; because the worker persists the target it actually used, a changed target self-corrects on the next write. Safe for concurrent use (expirable.LRU is internally locked).
func NewCachedTenantSettings ¶
func NewCachedTenantSettings( delegate TenantSettingsReader, size int, ttl time.Duration, metrics observability.CacheMetrics, ) *CachedTenantSettings
NewCachedTenantSettings wraps delegate with an LRU of at most size entries, each expiring after ttl. A non-positive size or ttl disables caching (every read hits the delegate), keeping small deployments and tests simple.
func (*CachedTenantSettings) GetSettings ¶
func (c *CachedTenantSettings) GetSettings( ctx context.Context, tenantID string, ) (*models.TenantSettings, error)
GetSettings returns the tenant's settings, serving a fresh cached value when present and otherwise loading from the delegate and caching the result. Errors are never cached.
func (*CachedTenantSettings) Invalidate ¶
func (c *CachedTenantSettings) Invalidate(tenantID string)
Invalidate evicts the tenant's cached settings so the next GetSettings reloads from the delegate. Called after a settings write so a change (e.g. a newly enabled target language) is visible immediately instead of only after TTL expiry — otherwise records created in the staleness window are skipped by the translation enqueue gate. Eviction is per-process (it refreshes the replica that handled the write); other replicas stay TTL-bounded. No-op when caching is off.
func (*CachedTenantSettings) OnSettingsChanged ¶
func (c *CachedTenantSettings) OnSettingsChanged(_ context.Context, tenantID string, _ []string)
OnSettingsChanged implements SettingsChangeListener: any successful settings write for a tenant evicts that tenant's cached entry. Registered alongside the enrichment backfill listener so a settings change both triggers backfills and refreshes this read cache.
type EmbeddingClient ¶
type EmbeddingClient interface {
CreateEmbedding(ctx context.Context, input string) ([]float32, error)
CreateEmbeddingForQuery(ctx context.Context, input string) ([]float32, error)
}
EmbeddingClient generates embedding vectors for text. CreateEmbedding is for embedding documents (e.g. feedback records) for storage. CreateEmbeddingForQuery is for embedding search queries; some providers (e.g. Google) use a different task type for asymmetric retrieval.
func NewEmbeddingClient ¶
func NewEmbeddingClient(ctx context.Context, cfg EmbeddingClientConfig) (EmbeddingClient, error)
NewEmbeddingClient creates an EmbeddingClient for the given config. Validates provider-specific requirements via the registry, then calls the registry factory.
type EmbeddingClientConfig ¶
type EmbeddingClientConfig struct {
Provider string
ProviderAPIKey string // API key for openai/google providers; not logged or serialized
Model string
BaseURL string
Normalize bool
GoogleCloudProject string
GoogleCloudLocation string
}
EmbeddingClientConfig holds configuration for creating an embedding client.
type EmbeddingProvider ¶
type EmbeddingProvider struct {
// contains filtered or unexported fields
}
EmbeddingProvider implements eventPublisher by enqueueing one River job per feedback record event when the event is FeedbackRecordCreated (with non-empty value_text) or FeedbackRecordUpdated (with value_text in ChangedFields, including when value_text is now empty so the worker can clear).
func NewEmbeddingProvider ¶
func NewEmbeddingProvider( inserter RiverJobInserter, apiKey string, model string, queueName string, maxAttempts int, docPrefix string, metrics observability.EmbeddingMetrics, ) *EmbeddingProvider
NewEmbeddingProvider creates a provider that enqueues feedback_embedding jobs. model is the embedding model name (e.g. text-embedding-3-small) from EMBEDDING_MODEL. docPrefix is the prefix for document text (from EmbeddingPrefixForProvider); use "" for OpenAI/Google. metrics may be nil when metrics are disabled.
func (*EmbeddingProvider) PublishEvent ¶
func (p *EmbeddingProvider) PublishEvent(ctx context.Context, event Event)
PublishEvent enqueues a feedback_embedding job when the event is FeedbackRecordCreated (with non-empty value_text) or FeedbackRecordUpdated (with value_text in ChangedFields). On update, the job is enqueued even when value_text is now empty so the worker can clear the embedding for text fields. API key is required for openai and google (validated at startup).
type EmbeddingsRepository ¶
type EmbeddingsRepository interface {
Upsert(ctx context.Context, feedbackRecordID uuid.UUID, model string, embedding []float32) error
DeleteByFeedbackRecordAndModel(ctx context.Context, feedbackRecordID uuid.UUID, model string) error
ListFeedbackRecordIDsForBackfill(
ctx context.Context, model string, afterID uuid.UUID, limit int,
) ([]uuid.UUID, error)
}
EmbeddingsRepository defines the interface for embeddings table access.
type EmbeddingsRepositoryForSearch ¶
type EmbeddingsRepositoryForSearch interface {
GetEmbeddingAndTenantByFeedbackRecordAndModel(
ctx context.Context, feedbackRecordID uuid.UUID, model string,
) ([]float32, string, error)
NearestFeedbackRecordsByEmbedding(
ctx context.Context, model string, queryEmbedding []float32, tenantID string, limit int, excludeID *uuid.UUID, minScore float64,
) ([]models.FeedbackRecordWithScore, bool, error)
NearestFeedbackRecordsByEmbeddingAfterCursor(
ctx context.Context, model string, queryEmbedding []float32, tenantID string, limit int,
lastDistance float64, lastFeedbackRecordID uuid.UUID, excludeID *uuid.UUID, minScore float64,
) ([]models.FeedbackRecordWithScore, bool, error)
}
EmbeddingsRepositoryForSearch provides the embedding read operations needed for semantic search. HasMore is true when there may be additional results (full page returned or full fetch limit consumed).
type EnrichmentSettingsListener ¶
type EnrichmentSettingsListener struct {
// contains filtered or unexported fields
}
EnrichmentSettingsListener implements SettingsChangeListener by dispatching each changed setting key to the enrichment backfill it triggers. The dispatch table is built from what the deployment has enabled (e.g. translation), so a disabled enrichment registers no handler and an unknown/irrelevant key is ignored.
It deliberately does not use the webhook MessagePublisher: this is an internal, best-effort side-effect, not a customer-facing event. Enqueue errors are logged and swallowed — the global backfill command remains the guaranteed recovery path.
Adding a future setting is one map entry (e.g. a "sentiment_enabled" → sentiment backfill handler); TenantSettingsService does not change.
func NewTranslationSettingsListener ¶
func NewTranslationSettingsListener( inserter RiverJobInserter, queueName string, maxAttempts int, ) *EnrichmentSettingsListener
NewTranslationSettingsListener builds a listener that, on a target_language change, enqueues a per-tenant translation backfill job (TenantTranslationBackfillArgs) via inserter. queueName/maxAttempts configure that fan-out job.
func (*EnrichmentSettingsListener) OnSettingsChanged ¶
func (l *EnrichmentSettingsListener) OnSettingsChanged(ctx context.Context, tenantID string, changedKeys []string)
OnSettingsChanged dispatches each changed key to its enrichment backfill handler, if one is registered. Errors are logged and swallowed (the settings write already succeeded).
type Event ¶
type Event struct {
ID uuid.UUID // Unique event id (UUID v7, time-ordered)
Type datatypes.EventType // Event type enum (e.g., FeedbackRecordCreated, WebhookCreated)
Timestamp time.Time // Event creation time
Data any // Event data (FeedbackRecord, Webhook, etc.)
ChangedFields []string // Only for updates
}
Event represents an event that can be published to message providers (webhooks, email, etc.)
type FeedbackEmbeddingArgs ¶
type FeedbackEmbeddingArgs struct {
FeedbackRecordID uuid.UUID `json:"feedback_record_id" river:"unique"`
// Model is the embedding model name; stored in embeddings.model.
Model string `json:"model" river:"unique"`
// ValueTextHash is a hash of the input (trimmed value_text, or "empty"/"backfill") for dedupe semantics.
ValueTextHash string `json:"value_text_hash" river:"unique"`
}
FeedbackEmbeddingArgs is the job payload for generating and storing an embedding for one feedback record. Used by EmbeddingProvider and the backfill flow to enqueue, and by FeedbackEmbeddingWorker to run. Uniqueness is by (FeedbackRecordID, Model, ValueTextHash) so that edits within the uniqueness window get a new job when value_text changes; same content within 24h is deduped; one job per record+model.
func (FeedbackEmbeddingArgs) Kind ¶
func (FeedbackEmbeddingArgs) Kind() string
Kind returns the River job kind.
type FeedbackRecordsRepository ¶
type FeedbackRecordsRepository interface {
Create(ctx context.Context, req *models.CreateFeedbackRecordRequest) (*models.FeedbackRecord, error)
GetByID(ctx context.Context, id uuid.UUID) (*models.FeedbackRecord, error)
List(ctx context.Context, filters *models.ListFeedbackRecordsFilters) ([]models.FeedbackRecord, bool, error)
ListAfterCursor(
ctx context.Context, filters *models.ListFeedbackRecordsFilters,
cursorCollectedAt time.Time, cursorID uuid.UUID,
) ([]models.FeedbackRecord, bool, error)
Update(ctx context.Context, id uuid.UUID, req *models.UpdateFeedbackRecordRequest) (*models.FeedbackRecord, error)
SetTranslation(ctx context.Context, feedbackRecordID uuid.UUID, translated *string, langKey, defaultLang string) error
ListTranslationBackfillTargets(
ctx context.Context, afterID uuid.UUID, limit int, defaultLang string,
) ([]models.TranslationBackfillTarget, error)
ListTranslationBackfillTargetsForTenant(
ctx context.Context, tenantID string, afterID uuid.UUID, limit int, defaultLang string,
) ([]models.TranslationBackfillTarget, error)
Delete(ctx context.Context, id uuid.UUID) error
DeleteByUser(ctx context.Context, filters *models.DeleteFeedbackRecordsByUserFilters) ([]models.DeletedFeedbackRecordsByTenant, error)
}
FeedbackRecordsRepository defines the interface for feedback records data access.
type FeedbackRecordsService ¶
type FeedbackRecordsService struct {
// contains filtered or unexported fields
}
FeedbackRecordsService handles business logic for feedback records.
func NewFeedbackRecordsService ¶
func NewFeedbackRecordsService( repo FeedbackRecordsRepository, embeddingsRepo EmbeddingsRepository, embeddingModel string, publisher MessagePublisher, embeddingInserter RiverJobInserter, embeddingQueueName string, embeddingMaxAttempts int, translationDefaultLang string, ) *FeedbackRecordsService
NewFeedbackRecordsService creates a new feedback records service. publisher may be nil when the service is used only for backfill (BackfillEmbeddings does not use the publisher). embeddingInserter and embeddingQueueName are optional (for backfill); when nil/empty, BackfillEmbeddings returns an error. Call SetEmbeddingInserter after the River client is created to enable backfill without building the service twice. embeddingsRepo and embeddingModel are required for SetEmbedding and BackfillEmbeddings (from EMBEDDING_PROVIDER and EMBEDDING_MODEL). translationDefaultLang (TRANSLATION_DEFAULT_LANGUAGE) is the fallback target for tenants with no target_language of their own; "" disables the fallback. It governs the SetTranslation write-guard and both translation backfills, so pass it wherever translation work runs.
func (*FeedbackRecordsService) BackfillEmbeddings ¶
BackfillEmbeddings enqueues embedding jobs for the given model for all feedback records that have non-empty value_text and no embedding row for that model (existing rows are replaced by upsert when the job runs). It streams the records in keyset pages. Returns the number of jobs enqueued. Requires embeddingInserter and embeddingQueueName to be set.
func (*FeedbackRecordsService) BackfillTranslations ¶
func (s *FeedbackRecordsService) BackfillTranslations( ctx context.Context, inserter RiverJobInserter, queueName string, maxAttempts int, ) (int, error)
BackfillTranslations enqueues a translation job for every feedback record (across all tenants) that needs (re)translation, streaming the targets in keyset pages. Used by the one-off global backfill command. Returns the number of jobs enqueued.
func (*FeedbackRecordsService) BackfillTranslationsForTenant ¶
func (s *FeedbackRecordsService) BackfillTranslationsForTenant( ctx context.Context, inserter RiverJobInserter, queueName string, maxAttempts int, tenantID string, ) (int, error)
BackfillTranslationsForTenant enqueues a translation job for every record of a single tenant that needs (re)translation, streaming in keyset pages so a large tenant is never fully materialized. It is the bulk work behind a settings-change re-translation (TenantTranslationBackfillArgs). Returns the number of jobs enqueued.
func (*FeedbackRecordsService) CreateFeedbackRecord ¶
func (s *FeedbackRecordsService) CreateFeedbackRecord( ctx context.Context, req *models.CreateFeedbackRecordRequest, ) (*models.FeedbackRecord, error)
CreateFeedbackRecord creates a new feedback record.
func (*FeedbackRecordsService) DeleteFeedbackRecord ¶
DeleteFeedbackRecord deletes a feedback record by ID. Publishes FeedbackRecordDeleted with tenant-aware deleted IDs for webhook isolation.
func (*FeedbackRecordsService) DeleteFeedbackRecordsByUser ¶
func (s *FeedbackRecordsService) DeleteFeedbackRecordsByUser( ctx context.Context, filters *models.DeleteFeedbackRecordsByUserFilters, ) (int, error)
DeleteFeedbackRecordsByUser deletes all feedback records matching user_id. When tenant_id is provided, deletion is restricted to that tenant; otherwise all user records are deleted. It publishes one tenant-aware FeedbackRecordDeleted event per tenant represented in the deleted rows.
func (*FeedbackRecordsService) GetFeedbackRecord ¶
func (s *FeedbackRecordsService) GetFeedbackRecord(ctx context.Context, id uuid.UUID) (*models.FeedbackRecord, error)
GetFeedbackRecord retrieves a single feedback record by ID.
func (*FeedbackRecordsService) ListFeedbackRecords ¶
func (s *FeedbackRecordsService) ListFeedbackRecords( ctx context.Context, filters *models.ListFeedbackRecordsFilters, ) (*models.ListFeedbackRecordsResponse, error)
ListFeedbackRecords retrieves a list of feedback records with optional filters. Uses cursor-based pagination: omit cursor for first page, use next_cursor for subsequent pages.
func (*FeedbackRecordsService) SetEmbedding ¶
func (s *FeedbackRecordsService) SetEmbedding( ctx context.Context, feedbackRecordID uuid.UUID, model string, embedding []float32, ) error
SetEmbedding sets or clears the embedding for a feedback record and model (internal use by embeddings worker). If embedding is nil, the row for (feedbackRecordID, model) is deleted; otherwise upserted. It does not publish an event.
func (*FeedbackRecordsService) SetEmbeddingInserter ¶
func (s *FeedbackRecordsService) SetEmbeddingInserter(inserter RiverJobInserter)
SetEmbeddingInserter sets the River inserter for embedding jobs (e.g. after River client is created). This allows a single service instance to be used by both handlers and the embedding worker.
func (*FeedbackRecordsService) SetTranslation ¶
func (s *FeedbackRecordsService) SetTranslation( ctx context.Context, feedbackRecordID uuid.UUID, translated *string, langKey string, ) error
SetTranslation persists the translated value_text and the target locale key for a feedback record. It is the accessor the translation worker uses; the write is tenant-write-locked in the repository and publishes no event (no enrichment loop).
func (*FeedbackRecordsService) UpdateFeedbackRecord ¶
func (s *FeedbackRecordsService) UpdateFeedbackRecord( ctx context.Context, id uuid.UUID, req *models.UpdateFeedbackRecordRequest, ) (*models.FeedbackRecord, error)
UpdateFeedbackRecord updates an existing feedback record.
type FeedbackTranslationArgs ¶
type FeedbackTranslationArgs struct {
FeedbackRecordID uuid.UUID `json:"feedback_record_id" river:"unique"`
// TargetLang is the tenant's configured target language (BCP-47) at enqueue time.
TargetLang string `json:"target_lang" river:"unique"`
// ValueTextHash is a hash of the inputs that determine the translation — the
// normalized value_text and the source language — or "empty" when value_text is blank.
ValueTextHash string `json:"value_text_hash" river:"unique"`
}
FeedbackTranslationArgs is the job payload for translating one feedback record's value_text into the tenant's target language. Uniqueness is by (FeedbackRecordID, TargetLang, ValueTextHash): a value_text edit or a target-language change yields a new job, while identical content for the same target within the window is deduped.
func (FeedbackTranslationArgs) Kind ¶
func (FeedbackTranslationArgs) Kind() string
Kind returns the River job kind.
type ListPaginationMeta ¶
ListPaginationMeta holds pagination metadata for list endpoints (feedback records, webhooks).
func BuildListPaginationMeta ¶
func BuildListPaginationMeta( limit int, hasMore bool, encodeLast func() (string, error), ) (ListPaginationMeta, error)
BuildListPaginationMeta builds pagination metadata for cursor-based list responses. hasMore indicates a sentinel row was fetched (limit+1 returned, trimmed to limit). encodeLast is called only when hasMore is true to produce next_cursor. Callers must ensure that when hasMore is true, the underlying list is non-empty so encodeLast can safely access the last item.
type MessagePublisher ¶
type MessagePublisher interface {
// PublishEvent publishes a single event with data (no changed fields)
PublishEvent(ctx context.Context, eventType datatypes.EventType, data any)
// PublishEventWithChangedFields publishes a single event with data and optional changed fields (for updates)
PublishEventWithChangedFields(ctx context.Context, eventType datatypes.EventType, data any, changedFields []string)
}
MessagePublisher defines the interface for publishing events.
type MessagePublisherManager ¶
type MessagePublisherManager struct {
// contains filtered or unexported fields
}
MessagePublisherManager coordinates multiple message providers.
func NewMessagePublisherManager ¶
func NewMessagePublisherManager( bufferSize int, perEventTimeout time.Duration, metrics observability.EventMetrics, ) *MessagePublisherManager
NewMessagePublisherManager creates a new message publisher manager. bufferSize is the event channel capacity; perEventTimeout limits how long processing one event may take. metrics may be nil when metrics are disabled.
func (*MessagePublisherManager) PublishEvent ¶
func (m *MessagePublisherManager) PublishEvent(ctx context.Context, eventType datatypes.EventType, data any)
PublishEvent publishes an event with data to all registered providers (convenience for no changed fields).
func (*MessagePublisherManager) PublishEventWithChangedFields ¶
func (m *MessagePublisherManager) PublishEventWithChangedFields( ctx context.Context, eventType datatypes.EventType, data any, changedFields []string, )
PublishEventWithChangedFields publishes an event with data to all registered providers.
func (*MessagePublisherManager) RegisterProvider ¶
func (m *MessagePublisherManager) RegisterProvider(provider eventPublisher)
RegisterProvider registers a message provider (webhooks, email, SMS, etc.). Must only be called during startup, before any events are published.
func (*MessagePublisherManager) Shutdown ¶
func (m *MessagePublisherManager) Shutdown()
Shutdown stops the background worker and waits for the buffer to drain.
type NewTaxonomyServiceParams ¶
type NewTaxonomyServiceParams struct {
Repo TaxonomyRepository
Starter TaxonomyRunStarter
EmbeddingModel string
MinimumEmbeddingCount int
}
NewTaxonomyServiceParams configures a TaxonomyService.
type RiverJobInserter ¶
type RiverJobInserter interface {
Insert(ctx context.Context, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error)
}
RiverJobInserter inserts a single River job. It is the shared seam the enrichment providers (embedding, translation) and their backfill flows use to enqueue work; satisfied by the River client.
type SearchResult ¶
type SearchResult struct {
Results []models.FeedbackRecordWithScore
NextCursor string // non-empty if there may be a next page (len(Results) == requested limit)
}
SearchResult holds the results and optional next-page cursor from semantic search or similar feedback.
type SearchService ¶
type SearchService struct {
// contains filtered or unexported fields
}
SearchService performs semantic search and similar-feedback lookups using embeddings.
func NewSearchService ¶
func NewSearchService(p SearchServiceParams) *SearchService
NewSearchService creates a SearchService.
func (*SearchService) SemanticSearch ¶
func (s *SearchService) SemanticSearch( ctx context.Context, query, tenantID string, limit int, minScore float64, cursor string, ) (SearchResult, error)
SemanticSearch returns feedback record IDs and similarity scores for the given query, scoped to tenantID. Requires non-empty tenantID and non-empty (after trim) query. Uses cursor-based pagination. minScore is the minimum similarity score (0..1). NextCursor is set when there may be a next page.
func (*SearchService) SimilarFeedback ¶
func (s *SearchService) SimilarFeedback( ctx context.Context, feedbackRecordID uuid.UUID, limit int, minScore float64, cursor string, ) (SearchResult, error)
SimilarFeedback returns feedback record IDs and similarity scores for records similar to the given one. The tenant boundary is derived from the source record before running nearest-neighbor search. Returns ErrEmbeddingNotFound when the record has no embedding for the current model. Uses cursor-based pagination.
type SearchServiceParams ¶
type SearchServiceParams struct {
EmbeddingClient EmbeddingClient
EmbeddingsRepo EmbeddingsRepositoryForSearch
Model string
QueryCache *lru.Cache[string, []float32]
CacheMetrics observability.CacheMetrics
Logger *slog.Logger
}
SearchServiceParams configures SearchService. QueryCache and CacheMetrics may be nil (no caching).
type SettingsChangeListener ¶
type SettingsChangeListener interface {
OnSettingsChanged(ctx context.Context, tenantID string, changedKeys []string)
}
SettingsChangeListener is notified after a tenant's settings are successfully written, with the setting keys the write touched. It lets enrichment side-effects (e.g. re-translation) react to a settings change without TenantSettingsService depending on any enrichment concern — the service depends only on this translation-free port.
Implementations do the reaction themselves (a fast, durable enqueue) and own their error handling: the method returns nothing because the settings write has already committed and the side-effect must never fail it.
func NewCompositeSettingsChangeListener ¶
func NewCompositeSettingsChangeListener(listeners ...SettingsChangeListener) SettingsChangeListener
NewCompositeSettingsChangeListener combines listeners into one that notifies each in order.
type TaxonomyClient ¶
type TaxonomyClient struct {
// contains filtered or unexported fields
}
TaxonomyClient calls the standalone taxonomy service.
func NewTaxonomyClient ¶
func NewTaxonomyClient(cfg TaxonomyClientConfig, httpClient *http.Client) (*TaxonomyClient, error)
NewTaxonomyClient creates a Hub-to-taxonomy-service client.
type TaxonomyClientConfig ¶
TaxonomyClientConfig configures the outbound client Hub uses to call the taxonomy service.
type TaxonomyRepository ¶
type TaxonomyRepository interface {
ListFieldOptions(ctx context.Context, tenantID, embeddingModel string) ([]models.TaxonomyFieldOption, error)
CountScopeInput(ctx context.Context, scope models.TaxonomyScope, embeddingModel string) (int, int, *string, error)
CreateRunIfAvailable(ctx context.Context, params repository.CreateTaxonomyRunParams) (*models.TaxonomyRun, bool, error)
MarkRunRunning(ctx context.Context, runID uuid.UUID, tenantID string) (*models.TaxonomyRun, error)
MarkRunFailed(
ctx context.Context,
runID uuid.UUID,
tenantID string,
message string,
errorCode models.TaxonomyRunFailureCode,
) (*models.TaxonomyRun, error)
GetRunForInternalService(ctx context.Context, runID uuid.UUID) (*models.TaxonomyRun, error)
GetRunForTenant(ctx context.Context, runID uuid.UUID, tenantID string) (*models.TaxonomyRun, error)
GetActiveRun(ctx context.Context, scope models.TaxonomyScope) (*models.TaxonomyRun, error)
ListRuns(ctx context.Context, filters models.ListTaxonomyRunsFilters) ([]models.TaxonomyRun, error)
GetRunInput(
ctx context.Context,
runID uuid.UUID,
tenantID string,
embeddingModel string,
) (*models.TaxonomyRunInputResponse, error)
StoreResultAndActivate(
ctx context.Context,
runID uuid.UUID,
tenantID string,
req models.TaxonomyRunResultRequest,
) (*models.TaxonomyRun, error)
GetTree(ctx context.Context, runID uuid.UUID, tenantID string) (*models.TaxonomyTreeResponse, error)
RenameNode(ctx context.Context, nodeID uuid.UUID, tenantID, actorID, label string) (*models.TaxonomyNode, error)
RemoveNode(ctx context.Context, nodeID uuid.UUID, tenantID, actorID string) (*models.TaxonomyNode, error)
ListNodeRecords(ctx context.Context, nodeID uuid.UUID, tenantID string, limit int) ([]models.FeedbackRecord, int, error)
}
TaxonomyRepository persists taxonomy run state and generated artifacts.
type TaxonomyRunStarter ¶
TaxonomyRunStarter starts asynchronous taxonomy compute work.
type TaxonomyService ¶
type TaxonomyService struct {
// contains filtered or unexported fields
}
TaxonomyService coordinates taxonomy run lifecycle and edits.
func NewTaxonomyService ¶
func NewTaxonomyService(params NewTaxonomyServiceParams) *TaxonomyService
NewTaxonomyService creates a taxonomy application service.
func (*TaxonomyService) CompleteRun ¶
func (s *TaxonomyService) CompleteRun( ctx context.Context, runID uuid.UUID, req models.TaxonomyRunResultRequest, ) (*models.TaxonomyRun, error)
CompleteRun stores taxonomy output and activates the successful run.
func (*TaxonomyService) FailRun ¶
func (s *TaxonomyService) FailRun( ctx context.Context, runID uuid.UUID, message string, errorCode models.TaxonomyRunFailureCode, ) (*models.TaxonomyRun, error)
FailRun records a taxonomy run failure.
func (*TaxonomyService) GetActiveTree ¶
func (s *TaxonomyService) GetActiveTree( ctx context.Context, scope models.TaxonomyScope, ) (*models.TaxonomyTreeResponse, error)
GetActiveTree returns the active taxonomy tree for a field scope.
func (*TaxonomyService) GetRun ¶
func (s *TaxonomyService) GetRun( ctx context.Context, runID uuid.UUID, tenantID string, ) (*models.TaxonomyRun, error)
GetRun returns a taxonomy run by ID.
func (*TaxonomyService) GetRunInput ¶
func (s *TaxonomyService) GetRunInput( ctx context.Context, runID uuid.UUID, ) (*models.TaxonomyRunInputResponse, error)
GetRunInput returns feedback text and embeddings for the taxonomy service.
func (*TaxonomyService) GetTree ¶
func (s *TaxonomyService) GetTree( ctx context.Context, runID uuid.UUID, tenantID string, ) (*models.TaxonomyTreeResponse, error)
GetTree returns a taxonomy tree by run ID.
func (*TaxonomyService) ListFieldOptions ¶
func (s *TaxonomyService) ListFieldOptions( ctx context.Context, tenantID string, ) (*models.TaxonomyFieldsResponse, error)
ListFieldOptions returns feedback fields that can run taxonomy generation.
func (*TaxonomyService) ListNodeRecords ¶
func (s *TaxonomyService) ListNodeRecords( ctx context.Context, nodeID uuid.UUID, filters models.TaxonomyNodeRecordsFilters, ) (*models.TaxonomyNodeRecordsResponse, error)
ListNodeRecords returns feedback records assigned to a taxonomy node.
func (*TaxonomyService) ListRuns ¶
func (s *TaxonomyService) ListRuns( ctx context.Context, filters models.ListTaxonomyRunsFilters, ) (*models.ListTaxonomyRunsResponse, error)
ListRuns returns taxonomy run history for a scoped tenant.
func (*TaxonomyService) RemoveNode ¶
func (s *TaxonomyService) RemoveNode( ctx context.Context, nodeID uuid.UUID, filters models.RemoveTaxonomyNodeFilters, ) (*models.TaxonomyNode, error)
RemoveNode soft-removes a taxonomy node.
func (*TaxonomyService) RenameNode ¶
func (s *TaxonomyService) RenameNode( ctx context.Context, nodeID uuid.UUID, req models.RenameTaxonomyNodeRequest, ) (*models.TaxonomyNode, error)
RenameNode renames a taxonomy node.
func (*TaxonomyService) StartManualRun ¶
func (s *TaxonomyService) StartManualRun( ctx context.Context, req models.CreateTaxonomyRunRequest, ) (*models.CreateTaxonomyRunResponse, error)
StartManualRun creates and starts a manual taxonomy generation run.
type TenantDataRepository ¶
type TenantDataRepository interface {
DeleteByTenant(ctx context.Context, tenantID string) (*models.TenantDataDeleteCounts, error)
}
TenantDataRepository defines tenant data purge access.
type TenantDataService ¶
type TenantDataService struct {
// contains filtered or unexported fields
}
TenantDataService handles tenant data purge business logic.
func NewTenantDataService ¶
func NewTenantDataService(repo TenantDataRepository) *TenantDataService
NewTenantDataService creates a new tenant data service.
func (*TenantDataService) DeleteTenantData ¶
func (s *TenantDataService) DeleteTenantData(ctx context.Context, tenantID string) (*models.TenantDataDeleteResult, error)
DeleteTenantData deletes all Hub-owned data for a tenant.
type TenantSettingsReader ¶
type TenantSettingsReader interface {
GetSettings(ctx context.Context, tenantID string) (*models.TenantSettings, error)
}
TenantSettingsReader is the read surface the cache wraps. *TenantSettingsService satisfies it, so the cache is a drop-in for any consumer that only needs reads (the translation enqueue gate and worker).
type TenantSettingsRepository ¶
type TenantSettingsRepository interface {
Get(ctx context.Context, tenantID string) (*models.TenantSettings, bool, error)
Upsert(ctx context.Context, tenantID string, settings models.EnrichmentSettings) (*models.TenantSettings, error)
// Patch merges set into the tenant's settings and removes removeKeys (an RFC
// 7396 merge patch: set + delete); the two are disjoint.
Patch(
ctx context.Context, tenantID string, set models.EnrichmentSettings, removeKeys []string,
) (*models.TenantSettings, error)
}
TenantSettingsRepository is the persistence surface the settings service needs.
type TenantSettingsService ¶
type TenantSettingsService struct {
// contains filtered or unexported fields
}
TenantSettingsService reads and writes tenant-scoped enrichment settings. It is the accessor enrichment workflows will use to resolve a tenant's configuration.
func NewTenantSettingsService ¶
func NewTenantSettingsService(repo TenantSettingsRepository) *TenantSettingsService
NewTenantSettingsService creates a new tenant settings service.
func (*TenantSettingsService) GetSettings ¶
func (s *TenantSettingsService) GetSettings(ctx context.Context, tenantID string) (*models.TenantSettings, error)
GetSettings returns the tenant's enrichment settings. When the tenant has no settings yet it returns a zero-value settings bag (target language unset) rather than a not-found error: an unconfigured tenant is a valid state, and consumers decide the fallback behavior. The lookup is always scoped to the normalized tenant_id.
func (*TenantSettingsService) PatchSettings ¶
func (s *TenantSettingsService) PatchSettings( ctx context.Context, tenantID string, req *models.PatchTenantSettingsRequest, ) (*models.TenantSettings, error)
PatchSettings applies an RFC 7396 JSON Merge Patch: a member present with a value sets that setting (validated and normalized), a member present with JSON null removes it, and an omitted member is left unchanged. It translates the typed request into the keys to set and the keys to remove, which are disjoint. The tenant_id comes from the request path and scopes the write to that tenant alone.
func (*TenantSettingsService) SetSettingsChangeListener ¶
func (s *TenantSettingsService) SetSettingsChangeListener(listener SettingsChangeListener)
SetSettingsChangeListener registers a listener notified after a successful settings write, used to trigger enrichment side-effects (e.g. a re-translation backfill on a target_language change). Optional; mirrors the post-construction injection of SetEmbeddingInserter. Nil means no side-effects fire.
func (*TenantSettingsService) UpdateSettings ¶
func (s *TenantSettingsService) UpdateSettings( ctx context.Context, tenantID string, req *models.UpdateTenantSettingsRequest, ) (*models.TenantSettings, error)
UpdateSettings validates and normalizes the requested settings, then upserts them for the tenant (full replace). The tenant_id is supplied by the caller (from the request path) and scopes the write to that tenant alone.
type TenantTranslationBackfillArgs ¶
type TenantTranslationBackfillArgs struct {
TenantID string `json:"tenant_id" river:"unique"`
}
TenantTranslationBackfillArgs fans out a re-translation backfill for one tenant: the worker lists the tenant's stale text records and enqueues a FeedbackTranslationArgs job for each. It is enqueued when a tenant's translation-relevant settings change (today: target_language).
Uniqueness is by TenantID across the default (in-flight) states — the enqueue site sets ByArgs without a ByPeriod — so rapid repeated settings changes collapse to a single in-flight backfill, while a change after the previous backfill has completed re-triggers. The worker resolves the tenant's current target at run time, so a coalesced backfill always targets the latest configured language.
func (TenantTranslationBackfillArgs) Kind ¶
func (TenantTranslationBackfillArgs) Kind() string
Kind returns the River job kind.
type TranslateRequest ¶
TranslateRequest is the input to a single translation. SourceLang and TargetLang are BCP-47 tags: TargetLang comes from the tenant's settings; SourceLang from the feedback record's language and may be empty when the source language is unknown.
type TranslationClient ¶
type TranslationClient interface {
Translate(ctx context.Context, req TranslateRequest) (string, error)
}
TranslationClient translates TranslateRequest.Text from SourceLang into TargetLang and returns the translated text. Implementations call an LLM provider (OpenAI or Google); the factory selects one from configuration. It mirrors the EmbeddingClient seam so the worker depends on the interface, not a provider.
func NewTranslationClient ¶
func NewTranslationClient(ctx context.Context, cfg TranslationClientConfig) (TranslationClient, error)
NewTranslationClient creates a TranslationClient for the given config. It validates provider-specific requirements via the registry, then calls the registry factory.
type TranslationClientConfig ¶
type TranslationClientConfig struct {
Provider string
ProviderAPIKey string // API key for openai/google providers; not logged or serialized
Model string
BaseURL string
GoogleCloudProject string
GoogleCloudLocation string
}
TranslationClientConfig holds configuration for creating a translation client.
type TranslationProvider ¶
type TranslationProvider struct {
// contains filtered or unexported fields
}
TranslationProvider implements eventPublisher by enqueueing one translation job per eligible feedback record event: FeedbackRecordCreated with non-empty open text, or FeedbackRecordUpdated whose value_text changed. A job is only enqueued when the record is a text field with non-empty value_text and the tenant has a target language configured (read via the settings resolver). The worker resolves the remaining work; ingestion is never blocked.
func NewTranslationProvider ¶
func NewTranslationProvider( inserter RiverJobInserter, resolver TenantSettingsReader, queueName string, maxAttempts int, defaultLang string, metrics observability.TranslationMetrics, ) *TranslationProvider
NewTranslationProvider creates a provider that enqueues feedback_translation jobs. metrics may be nil when metrics are disabled.
func (*TranslationProvider) PublishEvent ¶
func (p *TranslationProvider) PublishEvent(ctx context.Context, event Event)
PublishEvent enqueues a feedback_translation job for an eligible create/update event. Failures are logged and swallowed so they never block ingestion.
type WebhookDispatchArgs ¶
type WebhookDispatchArgs struct {
EventID uuid.UUID `json:"event_id" river:"unique"`
EventType string `json:"event_type"`
Timestamp time.Time `json:"timestamp"`
Data any `json:"data"`
ChangedFields []string `json:"changed_fields,omitempty"`
TenantID *string `json:"tenant_id,omitempty"`
WebhookID uuid.UUID `json:"webhook_id" river:"unique"`
}
WebhookDispatchArgs is the job payload for one (event, webhook) delivery. Used by WebhookProvider to enqueue and by WebhookDispatchWorker to run. Only event_id and webhook_id are used for River uniqueness (river:"unique") so the hash is fast and does not include the potentially large data payload.
func (WebhookDispatchArgs) Kind ¶
func (WebhookDispatchArgs) Kind() string
Kind returns the River job kind.
type WebhookDispatchInserter ¶
type WebhookDispatchInserter interface {
InsertMany(ctx context.Context, params []river.InsertManyParams) ([]*rivertype.JobInsertResult, error)
}
WebhookDispatchInserter inserts webhook_dispatch jobs in batch (e.g. River client).
type WebhookPayload ¶
type WebhookPayload struct {
ID uuid.UUID `json:"id"` // Unique event id (UUID v7)
Type string `json:"type"` // Event type as string (e.g., "feedback_record.created", "webhook.created")
Timestamp time.Time `json:"timestamp"` // Event creation timestamp
TenantID *string `json:"tenant_id,omitempty"` // Tenant boundary for the event
Data any `json:"data"` // Event data (FeedbackRecord, Webhook, etc.)
ChangedFields []string `json:"changed_fields,omitempty"` // Only for update events (optional)
}
WebhookPayload represents a generic webhook payload structure for all event types. The Data field can contain FeedbackRecord, Webhook, or other event data types.
func NewWebhookPayload ¶
func NewWebhookPayload(args WebhookDispatchArgs) *WebhookPayload
NewWebhookPayload builds the public webhook payload from internal dispatch args.
type WebhookProvider ¶
type WebhookProvider struct {
// contains filtered or unexported fields
}
WebhookProvider implements eventPublisher by enqueueing one River job per (event, webhook).
func NewWebhookProvider ¶
func NewWebhookProvider( inserter WebhookDispatchInserter, repo WebhookProviderRepository, maxAttempts, maxFanOut int, enqueueMaxRetries int, enqueueInitialBackoff, enqueueMaxBackoff time.Duration, metrics observability.WebhookMetrics, ) *WebhookProvider
NewWebhookProvider creates a provider that lists enabled webhooks and enqueues jobs via InsertMany. maxFanOut is the batch size for InsertMany (all matching webhooks are enqueued in batches of maxFanOut). enqueueMaxRetries, enqueueInitialBackoff, enqueueMaxBackoff configure retries when InsertMany fails (transient River/DB errors). metrics may be nil when metrics are disabled.
func (*WebhookProvider) PublishEvent ¶
func (p *WebhookProvider) PublishEvent(ctx context.Context, event Event)
PublishEvent lists enabled webhooks for the event type and tenant, then enqueues one job per webhook. Webhooks are only eligible when the event payload has the same tenant_id.
type WebhookProviderRepository ¶
type WebhookProviderRepository interface {
ListEnabledForEventTypeAndTenant(ctx context.Context, eventType string, tenantID *string) ([]models.Webhook, error)
}
WebhookProviderRepository lists tenant-scoped webhooks eligible for event fan-out.
type WebhookSender ¶
type WebhookSender interface {
Send(ctx context.Context, webhook *models.Webhook, payload *WebhookPayload) error
}
WebhookSender sends a single webhook payload to an endpoint (Standard Webhooks: signing, headers, 410 handling).
type WebhookSenderImpl ¶
type WebhookSenderImpl struct {
// contains filtered or unexported fields
}
WebhookSenderImpl implements WebhookSender with Standard Webhooks conformance.
func NewWebhookSenderImpl ¶
func NewWebhookSenderImpl( repo WebhookSenderRepository, metrics observability.WebhookMetrics, urlHostBlacklist map[string]struct{}, httpTimeout time.Duration, httpClient *http.Client, ) *WebhookSenderImpl
NewWebhookSenderImpl creates a sender that uses the given repo. urlHostBlacklist is the SSRF blacklist (hosts/IPs); may be nil (address checks still run). httpTimeout is the HTTP client timeout; job timeout should be httpTimeout + buffer (e.g. 5s). Client does not follow redirects and validates resolved IPs at dial time (DNS rebinding protection). metrics may be nil when metrics are disabled. If httpClient is non-nil, it is used as-is (e.g. for tests that hit loopback); otherwise a secured client is built.
func (*WebhookSenderImpl) Send ¶
func (s *WebhookSenderImpl) Send(ctx context.Context, webhook *models.Webhook, payload *WebhookPayload) error
Send signs and POSTs the payload to the webhook URL. On 410 Gone, disables the webhook and returns an error.
type WebhookSenderRepository ¶
type WebhookSenderRepository interface {
Update(ctx context.Context, id uuid.UUID, req *models.UpdateWebhookRequest) (*models.Webhook, error)
}
WebhookSenderRepository persists webhook state changes caused by delivery.
type WebhooksRepository ¶
type WebhooksRepository interface {
Create(ctx context.Context, req *models.CreateWebhookRequest) (*models.Webhook, error)
GetByID(ctx context.Context, id uuid.UUID) (*models.Webhook, error)
List(ctx context.Context, filters *models.ListWebhooksFilters) ([]models.Webhook, bool, error)
ListAfterCursor(
ctx context.Context, filters *models.ListWebhooksFilters,
cursorCreatedAt time.Time, cursorID uuid.UUID,
) ([]models.Webhook, bool, error)
Count(ctx context.Context, filters *models.ListWebhooksFilters) (int64, error)
Update(ctx context.Context, id uuid.UUID, req *models.UpdateWebhookRequest) (*models.Webhook, error)
Delete(ctx context.Context, id uuid.UUID) (*models.DeletedWebhook, error)
}
WebhooksRepository defines the interface for webhooks data access.
type WebhooksService ¶
type WebhooksService struct {
// contains filtered or unexported fields
}
WebhooksService handles business logic for webhooks.
func NewWebhooksService ¶
func NewWebhooksService( repo WebhooksRepository, publisher MessagePublisher, maxWebhooks int, urlHostBlacklist map[string]struct{}, ) *WebhooksService
NewWebhooksService creates a new webhooks service. urlHostBlacklist is a set of hostnames/IPs that cannot be used as webhook URLs (SSRF mitigation); may be nil for no restriction.
func (*WebhooksService) CreateWebhook ¶
func (s *WebhooksService) CreateWebhook(ctx context.Context, req *models.CreateWebhookRequest) (*models.Webhook, error)
CreateWebhook creates a new webhook.
func (*WebhooksService) DeleteWebhook ¶
DeleteWebhook deletes a webhook by ID. Publishes WebhookDeleted with tenant-aware deleted IDs.
func (*WebhooksService) GetWebhook ¶
GetWebhook retrieves a single webhook by ID.
func (*WebhooksService) ListWebhooks ¶
func (s *WebhooksService) ListWebhooks(ctx context.Context, filters *models.ListWebhooksFilters) (*models.ListWebhooksResponse, error)
ListWebhooks retrieves a list of webhooks with optional filters. Uses cursor-based pagination: omit cursor for first page, use next_cursor for subsequent pages.
func (*WebhooksService) UpdateWebhook ¶
func (s *WebhooksService) UpdateWebhook(ctx context.Context, id uuid.UUID, req *models.UpdateWebhookRequest) (*models.Webhook, error)
UpdateWebhook updates an existing webhook.
Source Files
¶
- embedding_client.go
- embedding_client_factory.go
- embedding_job_args.go
- embedding_provider.go
- enrichment_settings_listener.go
- feedback_records_service.go
- id_validation.go
- job_inserter.go
- message_publisher.go
- pagination.go
- search_cursor.go
- search_service.go
- settings_change_listener.go
- taxonomy_client.go
- taxonomy_service.go
- tenant_data_service.go
- tenant_settings_cache.go
- tenant_settings_service.go
- tenant_translation_backfill_job_args.go
- translation_client.go
- translation_client_factory.go
- translation_job_args.go
- translation_provider.go
- webhook_dispatch_args.go
- webhook_payload.go
- webhook_provider.go
- webhook_sender.go
- webhook_tenant.go
- webhooks_service.go