sinker

package
v0.11.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 29, 2025 License: Apache-2.0 Imports: 22 Imported by: 11

Documentation

Index

Constants

View Source
const (
	UDContainerFallbackSink = "fb-udsink"
)

Variables

This section is empty.

Functions

func NewServer

func NewServer(h Sinker, inputOptions ...Option) numaflow.Server

NewServer creates a new sinkServer object.

Types

type Datum

type Datum interface {
	// Keys returns the keys of the message.
	Keys() []string
	// Value returns the payload of the message.
	Value() []byte
	// EventTime returns the event time of the message.
	EventTime() time.Time
	// Watermark returns the watermark of the message.
	Watermark() time.Time
	// ID returns the ID of the message.
	ID() string
	// Headers returns the headers of the message.
	Headers() map[string]string
	// UserMetadata returns the user metadata of the message.
	UserMetadata() *UserMetadata
	// SystemMetadata returns the system metadata of the message.
	SystemMetadata() *SystemMetadata
}

Datum is the interface of incoming message payload for sink function.

type Option

type Option func(*options)

Option is the interface to apply options.

func WithMaxMessageSize

func WithMaxMessageSize(size int) Option

WithMaxMessageSize sets the sinkServer max receive message size and the sinkServer max send message size to the given size.

func WithServerInfoFilePath

func WithServerInfoFilePath(path string) Option

WithServerInfoFilePath sets the sinkServer info file path.

func WithSockAddr

func WithSockAddr(addr string) Option

WithSockAddr start the sinkServer with the given sock addr. This is mainly used for testing purpose.

type Response

type Response struct {
	// ID corresponds the ID in the message.
	ID string `json:"id"`
	// Successful or not. If it's false, "err" is expected to be present.
	Success bool `json:"success"`
	// Err represents the error message when "success" is false.
	Err string `json:"err,omitempty"`
	// Fallback is true if the message to be sent to the fallback sink.
	Fallback bool `json:"fallback,omitempty"`
	// Serve is true if the message to be sent to serving store.
	Serve bool `json:"serve,omitempty"`
	// ServeResponse is the response that will be sent to the serving store.
	ServeResponse []byte `json:"serve_reponse,omitempty"`
}

Response is the processing result of each message

func ResponseFailure

func ResponseFailure(id, errMsg string) Response

ResponseFailure creates a failed Response with the given id and error message. The Success field is set to false and the Err field is set to the provided error message.

func ResponseFallback added in v0.7.0

func ResponseFallback(id string) Response

ResponseFallback creates a Response with the Fallback field set to true. This indicates that the message should be sent to the fallback sink.

func ResponseOK

func ResponseOK(id string) Response

ResponseOK creates a successful Response with the given id. The Success field is set to true.

func ResponseServe added in v0.10.0

func ResponseServe(id string, result []byte) Response

ResponseServe creates a Response with the Serve field set to true. This indicates that the message should be sent to the serving store.

type Responses

type Responses []Response

func ResponsesBuilder

func ResponsesBuilder() Responses

ResponsesBuilder returns an empty instance of Responses

func (Responses) Append

func (r Responses) Append(rep Response) Responses

Append appends a response

func (Responses) Items

func (r Responses) Items() []Response

Items returns the response list

type Service

type Service struct {
	sinkpb.UnimplementedSinkServer

	Sinker Sinker
	// contains filtered or unexported fields
}

Service implements the proto gen server interface and contains the sinkfn operation handler.

func (*Service) IsReady

IsReady returns true to indicate the gRPC connection is ready.

func (*Service) SinkFn

func (fs *Service) SinkFn(stream sinkpb.Sink_SinkFnServer) error

SinkFn applies a sink function to a every element.

type Sinker

type Sinker interface {
	// Sink is the function to process a list of incoming messages
	Sink(ctx context.Context, datumStreamCh <-chan Datum) Responses
}

Sinker is the interface of sink function implementation.

type SinkerFunc

type SinkerFunc func(ctx context.Context, datumStreamCh <-chan Datum) Responses

SinkerFunc is utility type used to convert a Sink function to a Sinker.

func (SinkerFunc) Sink

func (sf SinkerFunc) Sink(ctx context.Context, datumStreamCh <-chan Datum) Responses

Sink implements the function of sink function.

type SystemMetadata added in v0.11.0

type SystemMetadata struct {
	// contains filtered or unexported fields
}

SystemMetadata wraps system-generated metadata groups per message.

func NewSystemMetadata added in v0.11.0

func NewSystemMetadata(d map[string]map[string][]byte) *SystemMetadata

NewSystemMetadata wraps an existing map into SystemMetadata This is for internal and testing purposes only.

func (*SystemMetadata) Groups added in v0.11.0

func (md *SystemMetadata) Groups() []string

Groups returns the groups of the system metadata. If there are no groups, it returns an empty slice.

Usage example:

```go
systemMetadata := datum.SystemMetadata()
groups := systemMetadata.Groups()
```

func (*SystemMetadata) Keys added in v0.11.0

func (md *SystemMetadata) Keys(group string) []string

Keys returns the keys of the system metadata for the given group. If the group is not present, it returns an empty slice.

Usage example:

```go
systemMetadata := datum.SystemMetadata()
keys := systemMetadata.Keys("group-name")
```

func (*SystemMetadata) Value added in v0.11.0

func (md *SystemMetadata) Value(group, key string) []byte

Value returns the value of the system metadata for the given group and key. If the group or key is not present, it returns an empty slice.

Usage example:

```go
systemMetadata := datum.SystemMetadata()
value := systemMetadata.Value("group-name", "key")
```

type UserMetadata added in v0.11.0

type UserMetadata struct {
	// contains filtered or unexported fields
}

UserMetadata wraps user-defined metadata groups per message.

func NewUserMetadata added in v0.11.0

func NewUserMetadata(d map[string]map[string][]byte) *UserMetadata

NewUserMetadata wraps an existing map into UserMetadata This is for internal and testing purposes only.

func (*UserMetadata) Groups added in v0.11.0

func (md *UserMetadata) Groups() []string

Groups returns the groups of the user metadata. If there are no groups, it returns an empty slice.

Usage example:

```go
userMetadata := datum.UserMetadata()
groups := userMetadata.Groups()
```

func (*UserMetadata) Keys added in v0.11.0

func (md *UserMetadata) Keys(group string) []string

Keys returns the keys of the user metadata for the given group. If the group is not present, it returns an empty slice.

Usage example:

```go
userMetadata := datum.UserMetadata()
keys := userMetadata.Keys("group-name")
```

func (*UserMetadata) Value added in v0.11.0

func (md *UserMetadata) Value(group, key string) []byte

Value returns the value of the user metadata for the given group and key. If the group or key is not present, it returns an empty slice.

Usage example:

```go
userMetadata := datum.UserMetadata()
value := userMetadata.Value("group-name", "key")
```

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL