Documentation
¶
Index ¶
- Constants
- func AddOutput(name string, factory OutputFactory)
- func CreateMessageAttributes(modelId mdl.ModelId, typ string, version int) map[string]string
- func FixtureSetFactory(transformerFactoryMap TransformerMapTypeVersionFactories) fixtures.FixtureSetsFactory
- func GetSubscriberConfigKey(name string) string
- func GetSubscriberFQN(name string, sourceModel SubscriberModel) string
- func GetSubscriberOutputConfigKey(name string) string
- func IsDelayOpError(err error) bool
- func IsUnknownModelError(err error) bool
- func IsUnknownModelVersionError(err error) bool
- func NewGenericTransformer[I any, M Model](transformer TypedTransformer[I, M]) func(context.Context, cfg.Config, log.Logger) (ModelTransformer, error)
- func NewSubscriberCallbackFactory(core SubscriberCore, sourceModel SubscriberModel, ...) stream.UntypedConsumerCallbackFactory
- func NewSubscriberFactory(transformerFactoryMap TransformerMapTypeVersionFactories) kernel.ModuleMultiFactory
- func PublisherConfigPostProcessor(config cfg.GosoConf) (bool, error)
- func SubscriberConfigPostProcessor(config cfg.GosoConf) (bool, error)
- func SubscriberFactory(ctx context.Context, config cfg.Config, logger log.Logger, ...) (map[string]kernel.ModuleFactory, error)
- type DelayOpError
- type FetchData
- type FixtureSet
- type FixtureSettings
- type FixtureSettingsDataset
- type Model
- type ModelDb
- type ModelSpecification
- type ModelTransformer
- type ModelTransformers
- type Output
- type OutputDb
- type OutputDdb
- type OutputFactory
- type OutputKvstore
- type Outputs
- type Publisher
- func NewPublisher(ctx context.Context, config cfg.Config, logger log.Logger, name string, ...) (Publisher, error)
- func NewPublisherWithInterfaces(logger log.Logger, producer stream.Producer, settings *PublisherSettings) Publisher
- func NewPublisherWithSettings(ctx context.Context, config cfg.Config, logger log.Logger, ...) (Publisher, error)
- type PublisherSettings
- type Settings
- type SubscriberCallback
- type SubscriberCore
- type SubscriberInputConfigPostProcessor
- type SubscriberModel
- type SubscriberOutputConfigPostProcessor
- type SubscriberSettings
- type TransformerFactory
- type TransformerMapTypeVersionFactories
- type TransformerMapVersionFactories
- type TypedTransformer
- type TypedTransformerFactory
- type UnknownModelError
- type UnknownModelVersionError
- type VersionedModelTransformers
Constants ¶
const ( AttributeModelId = "modelId" AttributeType = "type" AttributeVersion = "version" ConfigKeyMdlSubPublishers = "mdlsub.publishers" TypeCreate = "create" TypeUpdate = "update" TypeDelete = "delete" )
const ( ConfigKeyMdlSub = "mdlsub" ConfigKeyMdlSubSubscribers = "mdlsub.subscribers" )
const ( MetricNameSuccess = "ModelEventConsumeSuccess" MetricNameSkipped = "ModelEventConsumeSkipped" MetricNameFailure = "ModelEventConsumeFailure" )
const (
OutputTypeDb = "db"
)
const (
OutputTypeDdb = "ddb"
)
const (
OutputTypeKvstore = "kvstore"
)
Variables ¶
This section is empty.
Functions ¶
func AddOutput ¶ added in v0.26.0
func AddOutput(name string, factory OutputFactory)
func CreateMessageAttributes ¶
func FixtureSetFactory ¶ added in v0.26.0
func FixtureSetFactory(transformerFactoryMap TransformerMapTypeVersionFactories) fixtures.FixtureSetsFactory
func GetSubscriberConfigKey ¶
func GetSubscriberFQN ¶
func GetSubscriberFQN(name string, sourceModel SubscriberModel) string
func IsDelayOpError ¶
func IsUnknownModelError ¶ added in v0.54.6
func IsUnknownModelVersionError ¶ added in v0.54.6
func NewGenericTransformer ¶
func NewGenericTransformer[I any, M Model](transformer TypedTransformer[I, M]) func(context.Context, cfg.Config, log.Logger) (ModelTransformer, error)
NewGenericTransformer removes the types from a TypedTransformer and turns a transformer value into a TransformerFactory of that value.
func NewSubscriberCallbackFactory ¶
func NewSubscriberCallbackFactory( core SubscriberCore, sourceModel SubscriberModel, persistGraceTime time.Duration, ) stream.UntypedConsumerCallbackFactory
func NewSubscriberFactory ¶
func NewSubscriberFactory(transformerFactoryMap TransformerMapTypeVersionFactories) kernel.ModuleMultiFactory
Types ¶
type DelayOpError ¶
type DelayOpError struct {
Err error
}
func NewDelayOpError ¶
func NewDelayOpError(err error) DelayOpError
func (DelayOpError) As ¶
func (e DelayOpError) As(target any) bool
func (DelayOpError) Error ¶
func (e DelayOpError) Error() string
func (DelayOpError) Unwrap ¶
func (e DelayOpError) Unwrap() error
type FetchData ¶ added in v0.26.0
type FetchData struct {
Data json.RawMessage `json:"data"`
}
type FixtureSet ¶ added in v0.26.0
type FixtureSet struct {
// contains filtered or unexported fields
}
func NewFixtureSet ¶ added in v0.26.0
func NewFixtureSet(logger log.Logger, source SubscriberModel, core SubscriberCore, settings *FixtureSettings) *FixtureSet
func NewFixtureSetWithInterfaces ¶ added in v0.26.0
func NewFixtureSetWithInterfaces(logger log.Logger, source SubscriberModel, core SubscriberCore, settings *FixtureSettings, httpClient *resty.Client) *FixtureSet
type FixtureSettings ¶ added in v0.26.0
type FixtureSettings struct {
Dataset FixtureSettingsDataset `cfg:"dataset"`
Host string `cfg:"host"`
Path string `cfg:"path"`
}
type FixtureSettingsDataset ¶ added in v0.37.0
type FixtureSettingsDataset struct {
Id int `cfg:"id"`
}
type ModelSpecification ¶
func (ModelSpecification) String ¶ added in v0.26.0
func (m ModelSpecification) String() string
type ModelTransformer ¶
type ModelTransformer interface {
GetModel() (any, error)
// contains filtered or unexported methods
}
A ModelTransformer performs the actual transformation work, but should not be directly be implemented by a developer. Instead, implement a TypedTransformer and convert it to a ModelTransformer using NewGenericTransformer, EraseTransformerFactoryTypes, or EraseTransformerTypes.
func EraseTransformerTypes ¶ added in v0.41.0
func EraseTransformerTypes[I any, M Model](transformer TypedTransformer[I, M]) ModelTransformer
EraseTransformerTypes takes a TypedTransformer and turns it into an untyped ModelTransformer.
type ModelTransformers ¶
type ModelTransformers map[string]VersionedModelTransformers
type OutputDb ¶
type OutputDb struct {
// contains filtered or unexported fields
}
func NewOutputDb ¶
type OutputDdb ¶
type OutputDdb struct {
// contains filtered or unexported fields
}
func NewOutputDdb ¶
type OutputFactory ¶
type OutputKvstore ¶
type OutputKvstore struct {
// contains filtered or unexported fields
}
func NewOutputKvstore ¶
func NewOutputKvstore(ctx context.Context, config cfg.Config, logger log.Logger, settings *SubscriberSettings) (*OutputKvstore, error)
type Publisher ¶
type Publisher interface {
PublishBatch(ctx context.Context, typ string, version int, values []any, customAttributes ...map[string]string) error
Publish(ctx context.Context, typ string, version int, value any, customAttributes ...map[string]string) error
}
func NewPublisher ¶
type PublisherSettings ¶
type Settings ¶
type Settings struct {
Subscribers map[string]*SubscriberSettings `cfg:"subscribers"`
}
type SubscriberCallback ¶
type SubscriberCallback struct {
// contains filtered or unexported fields
}
func NewSubscriberCallbackWithInterfaces ¶ added in v0.54.6
func NewSubscriberCallbackWithInterfaces( logger log.Logger, core SubscriberCore, sourceModel SubscriberModel, ) *SubscriberCallback
NewSubscriberCallbackWithInterfaces creates a SubscriberCallback for testing purposes
func (*SubscriberCallback) GetModel ¶
func (s *SubscriberCallback) GetModel(attributes map[string]string) (any, error)
func (*SubscriberCallback) GetSchemaSettings ¶ added in v0.51.0
func (s *SubscriberCallback) GetSchemaSettings() (*stream.SchemaSettings, error)
type SubscriberCore ¶ added in v0.26.0
type SubscriberCore interface {
GetModelIds() []string
GetLatestModelIdVersion(modelId mdl.ModelId) (int, error)
GetTransformer(spec *ModelSpecification) (ModelTransformer, error)
GetTransformersForModel(modelId mdl.ModelId) (VersionedModelTransformers, error)
GetOutput(spec *ModelSpecification) (Output, error)
Persist(ctx context.Context, spec *ModelSpecification, model Model) error
Transform(ctx context.Context, spec *ModelSpecification, input any) (Model, error)
}
func NewSubscriberCore ¶ added in v0.26.0
func NewSubscriberCore(ctx context.Context, config cfg.Config, logger log.Logger, subscriberSettings map[string]*SubscriberSettings, transformerFactories TransformerMapTypeVersionFactories) (SubscriberCore, error)
func NewSubscriberCoreWithInterfaces ¶ added in v0.26.0
func NewSubscriberCoreWithInterfaces(transformers ModelTransformers, outputs Outputs) SubscriberCore
type SubscriberModel ¶
func UnmarshalSubscriberSourceModel ¶
func UnmarshalSubscriberSourceModel(config cfg.Config, name string) (SubscriberModel, error)
type SubscriberSettings ¶
type SubscriberSettings struct {
Input string `cfg:"input" default:"sns"`
Output string `cfg:"output"`
PersistGraceTime time.Duration `cfg:"persist_grace_time" default:"10s" validate:"min=0"`
RunnerCount int `cfg:"runner_count" default:"10" validate:"min=1"`
SourceModel SubscriberModel `cfg:"source"`
TargetModel SubscriberModel `cfg:"target"`
}
type TransformerFactory ¶
type TransformerFactory func(ctx context.Context, config cfg.Config, logger log.Logger) (ModelTransformer, error)
func EraseTransformerFactoryTypes ¶ added in v0.41.0
func EraseTransformerFactoryTypes[I any, M Model](transformerFactory TypedTransformerFactory[I, M]) TransformerFactory
EraseTransformerFactoryTypes takes a TypedTransformerFactory and turns it into an untyped transformer factory, allowing you to embed it into a list of transformers.
type TransformerMapTypeVersionFactories ¶
type TransformerMapTypeVersionFactories map[string]TransformerMapVersionFactories
type TransformerMapVersionFactories ¶
type TransformerMapVersionFactories map[int]TransformerFactory
type TypedTransformer ¶ added in v0.41.0
type TypedTransformer[I any, M Model] interface { // Transform converts the input into an output model. If Transform returns nil as the output, we don't persist the value. Transform(ctx context.Context, inp I) (out *M, err error) }
A TypedTransformer implements a subscriber. For every item it has to transform it into the corresponding persisted model. If it returns nil, the item will not be persisted. However, if the item has been persisted before and nil is returned, the item will not be updated or deleted.
type TypedTransformerFactory ¶ added in v0.41.0
type UnknownModelError ¶ added in v0.54.6
type UnknownModelError struct {
ModelId string
}
UnknownModelError is returned when a model id is not found in the transformer list.
func NewUnknownModelError ¶ added in v0.54.6
func NewUnknownModelError(modelId string) UnknownModelError
func (UnknownModelError) As ¶ added in v0.54.6
func (e UnknownModelError) As(target any) bool
func (UnknownModelError) Error ¶ added in v0.54.6
func (e UnknownModelError) Error() string
func (UnknownModelError) IsIgnorableWithSettings ¶ added in v0.54.6
func (e UnknownModelError) IsIgnorableWithSettings(settings stream.IgnoreOnGetModelErrorSettings) bool
IsIgnorableWithSettings implements stream.IgnorableGetModelError.
type UnknownModelVersionError ¶ added in v0.54.6
UnknownModelVersionError is returned when a version is not found for a model id in the transformer list.
func NewUnknownModelVersionError ¶ added in v0.54.6
func NewUnknownModelVersionError(modelId string, version int) UnknownModelVersionError
func (UnknownModelVersionError) As ¶ added in v0.54.6
func (e UnknownModelVersionError) As(target any) bool
func (UnknownModelVersionError) Error ¶ added in v0.54.6
func (e UnknownModelVersionError) Error() string
func (UnknownModelVersionError) IsIgnorableWithSettings ¶ added in v0.54.6
func (e UnknownModelVersionError) IsIgnorableWithSettings(settings stream.IgnoreOnGetModelErrorSettings) bool
IsIgnorableWithSettings implements stream.IgnorableGetModelError.
type VersionedModelTransformers ¶
type VersionedModelTransformers map[int]ModelTransformer