mapper

package
v0.11.0 Latest
Warning

This package is not in the latest version of its module.

Published: Oct 29, 2025 License: Apache-2.0 Imports: 22 Imported by: 3

Documentation

Constants

This section is empty.

Variables

var (
	DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__
)
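
The value of DROP follows from how the `%U` verb formats a rune. A minimal standalone sketch, using only the standard library, reproduces the string noted in the comment above. The helper name `dropTag` is hypothetical and is not part of the package.

```go
package main

import "fmt"

// dropTag rebuilds the sentinel the same way the DROP variable does:
// %U formats the backslash rune as its Unicode code point, "U+005C".
// dropTag is a hypothetical helper, not part of the mapper package.
func dropTag() string {
	return fmt.Sprintf("%U__DROP__", '\\')
}

func main() {
	fmt.Println(dropTag()) // prints U+005C__DROP__
}
```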

Functions

func NewServer

func NewServer(m Mapper, inputOptions ...Option) numaflow.Server

NewServer creates a new map server.

Types

type Datum

type Datum interface {
	// Value returns the payload of the message.
	Value() []byte
	// EventTime returns the event time of the message.
	EventTime() time.Time
	// Watermark returns the watermark of the message.
	Watermark() time.Time
	// Headers returns the headers of the message.
	Headers() map[string]string
	// UserMetadata returns the user metadata of the message.
	UserMetadata() *UserMetadata
	// SystemMetadata returns the system metadata of the message.
	SystemMetadata() *SystemMetadata
}

Datum contains methods to get the payload information.

func NewHandlerDatum

func NewHandlerDatum(value []byte, eventTime time.Time, watermark time.Time, headers map[string]string, userMetadata *UserMetadata, systemMetadata *SystemMetadata) Datum

type Mapper

type Mapper interface {
	// Map is the function that processes each incoming message.
	Map(ctx context.Context, keys []string, datum Datum) Messages
}

Mapper is the interface of map function implementation.

type MapperFunc

type MapperFunc func(ctx context.Context, keys []string, datum Datum) Messages

MapperFunc is a utility type used to convert a map function to a Mapper.

func (MapperFunc) Map

func (mf MapperFunc) Map(ctx context.Context, keys []string, datum Datum) Messages

Map calls the underlying function, so that MapperFunc satisfies the Mapper interface.
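
The adapter idea behind MapperFunc is the same one `http.HandlerFunc` uses in the standard library. The following is a minimal sketch of that pattern with simplified stand-in types (`datum`, `messages`, `mapper`, `mapperFunc`, `upper`, and `run` are all hypothetical names for illustration). It does not import the real package, whose Datum and Messages types carry more than a byte slice.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Stand-in types for illustration only; the real Datum and Messages
// types in the mapper package are richer than these.
type datum struct{ value []byte }
type messages [][]byte

// mapper mirrors the shape of the Mapper interface.
type mapper interface {
	Map(ctx context.Context, keys []string, d datum) messages
}

// mapperFunc adapts a plain function to the mapper interface.
type mapperFunc func(ctx context.Context, keys []string, d datum) messages

// Map calls the underlying function, which lets a bare function be
// used wherever a mapper is expected.
func (f mapperFunc) Map(ctx context.Context, keys []string, d datum) messages {
	return f(ctx, keys, d)
}

// upper is an ordinary function with the matching signature.
func upper(_ context.Context, _ []string, d datum) messages {
	return messages{[]byte(strings.ToUpper(string(d.value)))}
}

// run accepts the interface, not the function type.
func run(m mapper, input string) string {
	out := m.Map(context.Background(), nil, datum{value: []byte(input)})
	return string(out[0])
}

func main() {
	// The conversion mapperFunc(upper) is all that is needed.
	fmt.Println(run(mapperFunc(upper), "hello"))
}
```

With the real package, the analogous step would be converting a function with the Map signature via `MapperFunc(fn)` and passing the result to NewServer, which accepts any Mapper.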

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message wraps the data returned by Map functions.

func MessageToDrop

func MessageToDrop() Message

MessageToDrop creates a Message to be dropped

func NewMessage

func NewMessage(value []byte) Message

NewMessage creates a Message with value

func (Message) Keys

func (m Message) Keys() []string

Keys returns message keys

func (Message) Tags

func (m Message) Tags() []string

Tags returns message tags

func (Message) UserMetadata added in v0.11.0

func (m Message) UserMetadata() *UserMetadata

UserMetadata returns message user metadata

func (Message) Value

func (m Message) Value() []byte

Value returns message value

func (Message) WithKeys

func (m Message) WithKeys(keys []string) Message

WithKeys is used to assign the keys to the message

func (Message) WithTags

func (m Message) WithTags(tags []string) Message

WithTags assigns the tags to the message. Tags are used for conditional forwarding.

func (Message) WithUserMetadata added in v0.11.0

func (m Message) WithUserMetadata(userMetadata *UserMetadata) Message

WithUserMetadata is used to assign the user metadata to the message

type Messages

type Messages []Message

func MessagesBuilder

func MessagesBuilder() Messages

MessagesBuilder returns an empty instance of Messages

func (Messages) Append

func (m Messages) Append(msg Message) Messages

Append appends a Message

func (Messages) Items

func (m Messages) Items() []Message

Items returns the message list
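
Because Append has a value receiver and returns Messages, the caller has to keep the returned value. A minimal sketch of that idiom follows, using stand-in types rather than the real package. The names `message`, `messages`, and `build` are hypothetical, and the body of `Append` is an assumption about how such a method is typically written.

```go
package main

import "fmt"

// message and messages are simplified stand-ins for Message and Messages.
type message struct{ value []byte }
type messages []message

// Append has a value receiver, so it cannot modify the caller's slice
// header; the grown slice is only visible through the return value.
// The body is an assumption, not the package's actual source.
func (m messages) Append(msg message) messages {
	return append(m, msg)
}

// build shows the reassignment idiom: out = out.Append(...).
func build(values ...string) messages {
	out := messages{}
	for _, v := range values {
		out = out.Append(message{value: []byte(v)})
	}
	return out
}

func main() {
	discarded := messages{}
	discarded.Append(message{value: []byte("lost")}) // result ignored, nothing kept

	fmt.Println(len(discarded), len(build("a", "b")))
}
```

With the real API the same shape would read `MessagesBuilder().Append(NewMessage(value))`, with the result returned from Map.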

type Option

type Option func(*options)

Option is a function that applies a setting to the server options.

func WithMaxMessageSize

func WithMaxMessageSize(size int) Option

WithMaxMessageSize sets the server max receive message size and the server max send message size to the given size.

func WithServerInfoFilePath

func WithServerInfoFilePath(f string) Option

WithServerInfoFilePath sets the server info file path to the given path.

func WithSockAddr

func WithSockAddr(addr string) Option

WithSockAddr starts the server with the given sock addr. This is mainly used for testing purposes.
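
Option follows the functional options pattern: each With... function returns a closure that mutates an options struct. Since the real `options` struct is unexported, the sketch below uses a stand-in with hypothetical field names and made-up defaults, only to show how a variadic list of options is typically resolved.

```go
package main

import "fmt"

// options is a stand-in; the field names are hypothetical because the
// real options struct is unexported.
type options struct {
	sockAddr       string
	maxMessageSize int
}

// option mirrors the shape of Option: a function that mutates *options.
type option func(*options)

func withSockAddr(addr string) option {
	return func(o *options) { o.sockAddr = addr }
}

func withMaxMessageSize(size int) option {
	return func(o *options) { o.maxMessageSize = size }
}

// resolve starts from defaults and applies each option in order, so a
// later option overrides an earlier one that sets the same field.
func resolve(opts ...option) options {
	// Made-up defaults for illustration only.
	o := options{sockAddr: "/tmp/example.sock", maxMessageSize: 1 << 20}
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	o := resolve(withMaxMessageSize(4 << 20))
	fmt.Println(o.sockAddr, o.maxMessageSize)
}
```

Options that are not passed leave the corresponding default untouched, which is why NewServer can take them as a variadic trailing argument.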

type Service

type Service struct {
	mappb.UnimplementedMapServer
	Mapper Mapper
	// contains filtered or unexported fields
}

Service implements the proto gen server interface and contains the map operation handler.

func (*Service) IsReady

IsReady returns true to indicate the gRPC connection is ready.

func (*Service) MapFn

func (fs *Service) MapFn(stream mappb.Map_MapFnServer) error

MapFn applies a user defined function to each request element and returns a list of results.

type SystemMetadata added in v0.11.0

type SystemMetadata struct {
	// contains filtered or unexported fields
}

SystemMetadata is a mapping of group name to key-value pairs. It wraps the system-generated metadata groups of a message and is read-only to UDFs.

func NewSystemMetadata added in v0.11.0

func NewSystemMetadata(d map[string]map[string][]byte) *SystemMetadata

NewSystemMetadata wraps an existing map into SystemMetadata. This is for internal and testing purposes only.

func (*SystemMetadata) Groups added in v0.11.0

func (md *SystemMetadata) Groups() []string

Groups returns the groups of the system metadata. If there are no groups, it returns an empty slice.

Usage example:

```go
systemMetadata := datum.SystemMetadata()
groups := systemMetadata.Groups()
```

func (*SystemMetadata) Keys added in v0.11.0

func (md *SystemMetadata) Keys(group string) []string

Keys returns the keys of the system metadata for the given group. If the group is not present, it returns an empty slice.

Usage example:

```go
systemMetadata := datum.SystemMetadata()
keys := systemMetadata.Keys("group-name")
```

func (*SystemMetadata) Value added in v0.11.0

func (md *SystemMetadata) Value(group, key string) []byte

Value returns the value of the system metadata for the given group and key. If the group or key is not present, it returns an empty slice.

Usage example:

```go
systemMetadata := datum.SystemMetadata()
value := systemMetadata.Value("group-name", "key")
```
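
Since a missing group or key yields an empty slice rather than an error, callers would typically test the result with `len`. The sketch below uses a stand-in type with the same group, key, value map shape that NewSystemMetadata accepts. The `metadata` type and its `value` method are hypothetical, and whether the real method returns nil or a non-nil empty slice is not stated in the documentation.

```go
package main

import "fmt"

// metadata is a stand-in with the same group -> key -> value shape
// that NewSystemMetadata and NewUserMetadata accept.
type metadata struct {
	data map[string]map[string][]byte
}

// value returns the stored bytes, or an empty (here nil) slice on a miss.
func (md *metadata) value(group, key string) []byte {
	if md == nil {
		return nil
	}
	// Indexing a missing group yields a nil inner map, and reading from
	// a nil map is safe in Go, so no explicit existence check is needed.
	return md.data[group][key]
}

func main() {
	md := &metadata{data: map[string]map[string][]byte{
		"tracing": {"trace-id": []byte("abc123")},
	}}

	// len works for both nil and non-nil empty slices.
	if v := md.value("tracing", "trace-id"); len(v) > 0 {
		fmt.Printf("found %q\n", v)
	}
	if v := md.value("missing", "key"); len(v) == 0 {
		fmt.Println("not present")
	}
}
```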

type UserMetadata added in v0.11.0

type UserMetadata struct {
	// contains filtered or unexported fields
}

UserMetadata wraps user-defined metadata groups per message.

func NewUserMetadata added in v0.11.0

func NewUserMetadata(d map[string]map[string][]byte) *UserMetadata

NewUserMetadata wraps an existing map into UserMetadata. If d is nil, an empty map is created.

func (*UserMetadata) AddKV added in v0.11.0

func (md *UserMetadata) AddKV(group, key string, value []byte)

AddKV adds a key-value pair to the user metadata.

Usage example:

```go
userMetadata := NewUserMetadata(nil) // a nil map yields empty metadata
userMetadata.AddKV("group-name", "key", []byte("value"))
```

func (*UserMetadata) AddKVInt added in v0.11.0

func (md *UserMetadata) AddKVInt(group, key string, value int)

AddKVInt adds a key-value pair with value of int type to the user metadata.

Usage example:

```go
userMetadata := NewUserMetadata(nil) // a nil map yields empty metadata
userMetadata.AddKVInt("group-name", "key", 123)
```

func (*UserMetadata) AddKVString added in v0.11.0

func (md *UserMetadata) AddKVString(group, key, value string)

AddKVString adds a key-value pair with value of string type to the user metadata.

Usage example:

```go
userMetadata := NewUserMetadata(nil) // a nil map yields empty metadata
userMetadata.AddKVString("group-name", "key", "value")
```

func (*UserMetadata) Groups added in v0.11.0

func (md *UserMetadata) Groups() []string

Groups returns the groups of the user metadata. If there are no groups, it returns an empty slice.

Usage example:

```go
userMetadata := datum.UserMetadata()
groups := userMetadata.Groups()
```

func (*UserMetadata) Keys added in v0.11.0

func (md *UserMetadata) Keys(group string) []string

Keys returns the keys of the user metadata for the given group. If the group is not present, it returns an empty slice.

Usage example:

```go
userMetadata := datum.UserMetadata()
keys := userMetadata.Keys("group-name")
```

func (*UserMetadata) RemoveGroup added in v0.11.0

func (md *UserMetadata) RemoveGroup(group string)

RemoveGroup removes a group from the user metadata. If the group is not present, it's a no-op.

Usage example:

```go
userMetadata := datum.UserMetadata()
userMetadata.RemoveGroup("group-name")
```

func (*UserMetadata) RemoveKey added in v0.11.0

func (md *UserMetadata) RemoveKey(group, key string)

RemoveKey removes a key from a group in the user metadata. If the key or group is not present, it's a no-op.

Usage example:

```go
userMetadata := datum.UserMetadata()
userMetadata.RemoveKey("group-name", "key")
```

func (*UserMetadata) SetKVGroup added in v0.11.0

func (md *UserMetadata) SetKVGroup(group string, kv map[string][]byte)

SetKVGroup sets a group of key-value pairs under the provided group name.

Usage example:

```go
userMetadata := NewUserMetadata(nil) // a nil map yields empty metadata
userMetadata.SetKVGroup("group-name", map[string][]byte{"key": []byte("value")})
```

func (*UserMetadata) Value added in v0.11.0

func (md *UserMetadata) Value(group, key string) []byte

Value returns the value of the user metadata for the given group and key. If the group or key is not present, it returns an empty slice.

Usage example:

```go
userMetadata := datum.UserMetadata()
value := userMetadata.Value("group-name", "key")
```
