mdlsub

package
v0.55.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 18, 2026 License: MIT Imports: 26 Imported by: 0

Documentation

Index

Constants

View Source
const (
	AttributeModelId          = "modelId"
	AttributeType             = "type"
	AttributeVersion          = "version"
	ConfigKeyMdlSubPublishers = "mdlsub.publishers"
	TypeCreate                = "create"
	TypeUpdate                = "update"
	TypeDelete                = "delete"
)
View Source
const (
	ConfigKeyMdlSub            = "mdlsub"
	ConfigKeyMdlSubSubscribers = "mdlsub.subscribers"
)
View Source
const (
	MetricNameSuccess = "ModelEventConsumeSuccess"
	MetricNameSkipped = "ModelEventConsumeSkipped"
	MetricNameFailure = "ModelEventConsumeFailure"
)
View Source
const (
	OutputTypeDb = "db"
)
View Source
const (
	OutputTypeDdb = "ddb"
)
View Source
const (
	OutputTypeKvstore = "kvstore"
)

Variables

This section is empty.

Functions

func AddOutput added in v0.26.0

func AddOutput(name string, factory OutputFactory)

func CreateMessageAttributes

func CreateMessageAttributes(modelId mdl.ModelId, typ string, version int) map[string]string

func FixtureSetFactory added in v0.26.0

func FixtureSetFactory(transformerFactoryMap TransformerMapTypeVersionFactories) fixtures.FixtureSetsFactory

func GetSubscriberConfigKey

func GetSubscriberConfigKey(name string) string

func GetSubscriberFQN

func GetSubscriberFQN(name string, sourceModel SubscriberModel) string

func GetSubscriberOutputConfigKey

func GetSubscriberOutputConfigKey(name string) string

func IsDelayOpError

func IsDelayOpError(err error) bool

func IsUnknownModelError added in v0.54.6

func IsUnknownModelError(err error) bool

func IsUnknownModelVersionError added in v0.54.6

func IsUnknownModelVersionError(err error) bool

func NewGenericTransformer

func NewGenericTransformer[I any, M Model](transformer TypedTransformer[I, M]) func(context.Context, cfg.Config, log.Logger) (ModelTransformer, error)

NewGenericTransformer removes the types from a TypedTransformer and turns a transformer value into a TransformerFactory of that value.

func NewSubscriberCallbackFactory

func NewSubscriberCallbackFactory(
	core SubscriberCore,
	sourceModel SubscriberModel,
	persistGraceTime time.Duration,
) stream.UntypedConsumerCallbackFactory

func NewSubscriberFactory

func NewSubscriberFactory(transformerFactoryMap TransformerMapTypeVersionFactories) kernel.ModuleMultiFactory

func PublisherConfigPostProcessor

func PublisherConfigPostProcessor(config cfg.GosoConf) (bool, error)

func SubscriberConfigPostProcessor

func SubscriberConfigPostProcessor(config cfg.GosoConf) (bool, error)

func SubscriberFactory

func SubscriberFactory(
	ctx context.Context,
	config cfg.Config,
	logger log.Logger,
	transformerFactories TransformerMapTypeVersionFactories,
) (map[string]kernel.ModuleFactory, error)

Types

type DelayOpError

type DelayOpError struct {
	Err error
}

func NewDelayOpError

func NewDelayOpError(err error) DelayOpError

func (DelayOpError) As

func (e DelayOpError) As(target any) bool

func (DelayOpError) Error

func (e DelayOpError) Error() string

func (DelayOpError) Unwrap

func (e DelayOpError) Unwrap() error

type FetchData added in v0.26.0

type FetchData struct {
	Data json.RawMessage `json:"data"`
}

type FixtureSet added in v0.26.0

type FixtureSet struct {
	// contains filtered or unexported fields
}

func NewFixtureSet added in v0.26.0

func NewFixtureSet(logger log.Logger, source SubscriberModel, core SubscriberCore, settings *FixtureSettings) *FixtureSet

func NewFixtureSetWithInterfaces added in v0.26.0

func NewFixtureSetWithInterfaces(logger log.Logger, source SubscriberModel, core SubscriberCore, settings *FixtureSettings, httpClient *resty.Client) *FixtureSet

func (FixtureSet) Write added in v0.26.0

func (f FixtureSet) Write(ctx context.Context) error

type FixtureSettings added in v0.26.0

type FixtureSettings struct {
	Dataset FixtureSettingsDataset `cfg:"dataset"`
	Host    string                 `cfg:"host"`
	Path    string                 `cfg:"path"`
}

type FixtureSettingsDataset added in v0.37.0

type FixtureSettingsDataset struct {
	Id int `cfg:"id"`
}

type Model

type Model interface {
	GetId() any
}

type ModelDb

type ModelDb struct {
	Id *uint `gorm:"primary_key;"`
}

func (ModelDb) GetId

func (m ModelDb) GetId() any

type ModelSpecification

type ModelSpecification struct {
	CrudType string
	Version  int
	ModelId  string
}

func (ModelSpecification) String added in v0.26.0

func (m ModelSpecification) String() string

type ModelTransformer

type ModelTransformer interface {
	GetModel() (any, error)
	// contains filtered or unexported methods
}

A ModelTransformer performs the actual transformation work, but should not be implemented directly by a developer. Instead, implement a TypedTransformer and convert it to a ModelTransformer using NewGenericTransformer, EraseTransformerFactoryTypes, or EraseTransformerTypes.

func EraseTransformerTypes added in v0.41.0

func EraseTransformerTypes[I any, M Model](transformer TypedTransformer[I, M]) ModelTransformer

EraseTransformerTypes takes a TypedTransformer and turns it into an untyped ModelTransformer.

type ModelTransformers

type ModelTransformers map[string]VersionedModelTransformers

type Output

type Output interface {
	Persist(ctx context.Context, model Model, op string) error
}

type OutputDb

type OutputDb struct {
	// contains filtered or unexported fields
}

func NewOutputDb

func NewOutputDb(ctx context.Context, config cfg.Config, logger log.Logger) (*OutputDb, error)

func (*OutputDb) Persist

func (p *OutputDb) Persist(_ context.Context, model Model, op string) error

type OutputDdb

type OutputDdb struct {
	// contains filtered or unexported fields
}

func NewOutputDdb

func NewOutputDdb(ctx context.Context, config cfg.Config, logger log.Logger, model any, settings *SubscriberSettings) (*OutputDdb, error)

func (*OutputDdb) GetType

func (p *OutputDdb) GetType() string

func (*OutputDdb) Persist

func (p *OutputDdb) Persist(ctx context.Context, model Model, op string) error

type OutputFactory

type OutputFactory func(ctx context.Context, config cfg.Config, logger log.Logger, settings *SubscriberSettings, transformers VersionedModelTransformers) (map[int]Output, error)

type OutputKvstore

type OutputKvstore struct {
	// contains filtered or unexported fields
}

func NewOutputKvstore

func NewOutputKvstore(ctx context.Context, config cfg.Config, logger log.Logger, settings *SubscriberSettings) (*OutputKvstore, error)

func (*OutputKvstore) Persist

func (p *OutputKvstore) Persist(ctx context.Context, model Model, op string) error

type Outputs

type Outputs map[string]map[int]Output

type Publisher

type Publisher interface {
	PublishBatch(ctx context.Context, typ string, version int, values []any, customAttributes ...map[string]string) error
	Publish(ctx context.Context, typ string, version int, value any, customAttributes ...map[string]string) error
}

func NewPublisher

func NewPublisher(ctx context.Context, config cfg.Config, logger log.Logger, name string, options ...stream.ProducerOption) (Publisher, error)

func NewPublisherWithInterfaces

func NewPublisherWithInterfaces(logger log.Logger, producer stream.Producer, settings *PublisherSettings) Publisher

func NewPublisherWithSettings

func NewPublisherWithSettings(ctx context.Context, config cfg.Config, logger log.Logger, settings *PublisherSettings, options ...stream.ProducerOption) (Publisher, error)

type PublisherSettings

type PublisherSettings struct {
	mdl.ModelId
	Producer   string `cfg:"producer" validate:"required_without=OutputType"`
	OutputType string `cfg:"output_type" validate:"required_without=Producer"`
	Shared     bool   `cfg:"shared"`
}

type Settings

type Settings struct {
	Subscribers map[string]*SubscriberSettings `cfg:"subscribers"`
}

type SubscriberCallback

type SubscriberCallback struct {
	// contains filtered or unexported fields
}

func NewSubscriberCallbackWithInterfaces added in v0.54.6

func NewSubscriberCallbackWithInterfaces(
	logger log.Logger,
	core SubscriberCore,
	sourceModel SubscriberModel,
) *SubscriberCallback

NewSubscriberCallbackWithInterfaces creates a SubscriberCallback for testing purposes.

func (*SubscriberCallback) Consume

func (s *SubscriberCallback) Consume(ctx context.Context, input any, attributes map[string]string) (ack bool, err error)

func (*SubscriberCallback) GetModel

func (s *SubscriberCallback) GetModel(attributes map[string]string) (any, error)

func (*SubscriberCallback) GetSchemaSettings added in v0.51.0

func (s *SubscriberCallback) GetSchemaSettings() (*stream.SchemaSettings, error)

type SubscriberCore added in v0.26.0

type SubscriberCore interface {
	GetModelIds() []string
	GetLatestModelIdVersion(modelId mdl.ModelId) (int, error)
	GetTransformer(spec *ModelSpecification) (ModelTransformer, error)
	GetTransformersForModel(modelId mdl.ModelId) (VersionedModelTransformers, error)
	GetOutput(spec *ModelSpecification) (Output, error)
	Persist(ctx context.Context, spec *ModelSpecification, model Model) error
	Transform(ctx context.Context, spec *ModelSpecification, input any) (Model, error)
}

func NewSubscriberCore added in v0.26.0

func NewSubscriberCore(ctx context.Context, config cfg.Config, logger log.Logger, subscriberSettings map[string]*SubscriberSettings, transformerFactories TransformerMapTypeVersionFactories) (SubscriberCore, error)

func NewSubscriberCoreWithInterfaces added in v0.26.0

func NewSubscriberCoreWithInterfaces(transformers ModelTransformers, outputs Outputs) SubscriberCore

type SubscriberInputConfigPostProcessor

type SubscriberInputConfigPostProcessor func(config cfg.GosoConf, name string, subscriberSettings *SubscriberSettings) cfg.Option

type SubscriberModel

type SubscriberModel struct {
	mdl.ModelId
	Shared bool `cfg:"shared"`
}

func UnmarshalSubscriberSourceModel

func UnmarshalSubscriberSourceModel(config cfg.Config, name string) (SubscriberModel, error)

type SubscriberOutputConfigPostProcessor

type SubscriberOutputConfigPostProcessor func(config cfg.GosoConf, name string, subscriberSettings *SubscriberSettings) cfg.Option

type SubscriberSettings

type SubscriberSettings struct {
	Input            string          `cfg:"input" default:"sns"`
	Output           string          `cfg:"output"`
	PersistGraceTime time.Duration   `cfg:"persist_grace_time" default:"10s" validate:"min=0"`
	RunnerCount      int             `cfg:"runner_count" default:"10" validate:"min=1"`
	SourceModel      SubscriberModel `cfg:"source"`
	TargetModel      SubscriberModel `cfg:"target"`
}

type TransformerFactory

type TransformerFactory func(ctx context.Context, config cfg.Config, logger log.Logger) (ModelTransformer, error)

func EraseTransformerFactoryTypes added in v0.41.0

func EraseTransformerFactoryTypes[I any, M Model](transformerFactory TypedTransformerFactory[I, M]) TransformerFactory

EraseTransformerFactoryTypes takes a TypedTransformerFactory and turns it into an untyped transformer factory, allowing you to embed it into a list of transformers.

type TransformerMapTypeVersionFactories

type TransformerMapTypeVersionFactories map[string]TransformerMapVersionFactories

type TransformerMapVersionFactories

type TransformerMapVersionFactories map[int]TransformerFactory

type TypedTransformer added in v0.41.0

type TypedTransformer[I any, M Model] interface {
	// Transform converts the input into an output model. If Transform returns nil as the output, we don't persist the value.
	Transform(ctx context.Context, inp I) (out *M, err error)
}

A TypedTransformer implements a subscriber. For every item it receives, it has to transform that item into the corresponding persisted model. If it returns nil, the item will not be persisted. Note, however, that returning nil for an item that has already been persisted leaves the existing record untouched: it is neither updated nor deleted.

type TypedTransformerFactory added in v0.41.0

type TypedTransformerFactory[I any, M Model] func(ctx context.Context, config cfg.Config, logger log.Logger) (TypedTransformer[I, M], error)

type UnknownModelError added in v0.54.6

type UnknownModelError struct {
	ModelId string
}

UnknownModelError is returned when a model id is not found in the transformer list.

func NewUnknownModelError added in v0.54.6

func NewUnknownModelError(modelId string) UnknownModelError

func (UnknownModelError) As added in v0.54.6

func (e UnknownModelError) As(target any) bool

func (UnknownModelError) Error added in v0.54.6

func (e UnknownModelError) Error() string

func (UnknownModelError) IsIgnorableWithSettings added in v0.54.6

func (e UnknownModelError) IsIgnorableWithSettings(settings stream.IgnoreOnGetModelErrorSettings) bool

IsIgnorableWithSettings implements stream.IgnorableGetModelError.

type UnknownModelVersionError added in v0.54.6

type UnknownModelVersionError struct {
	ModelId string
	Version int
}

UnknownModelVersionError is returned when a version is not found for a model id in the transformer list.

func NewUnknownModelVersionError added in v0.54.6

func NewUnknownModelVersionError(modelId string, version int) UnknownModelVersionError

func (UnknownModelVersionError) As added in v0.54.6

func (e UnknownModelVersionError) As(target any) bool

func (UnknownModelVersionError) Error added in v0.54.6

func (e UnknownModelVersionError) Error() string

func (UnknownModelVersionError) IsIgnorableWithSettings added in v0.54.6

func (e UnknownModelVersionError) IsIgnorableWithSettings(settings stream.IgnoreOnGetModelErrorSettings) bool

IsIgnorableWithSettings implements stream.IgnorableGetModelError.

type VersionedModelTransformers

type VersionedModelTransformers map[int]ModelTransformer

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL