Documentation
¶
Index ¶
- Variables
- func NewServer(m Mapper, inputOptions ...Option) numaflow.Server
- type Datum
- type Mapper
- type MapperFunc
- type Message
- func (m Message) Keys() []string
- func (m Message) Tags() []string
- func (m Message) UserMetadata() *UserMetadata
- func (m Message) Value() []byte
- func (m Message) WithKeys(keys []string) Message
- func (m Message) WithTags(tags []string) Message
- func (m Message) WithUserMetadata(userMetadata *UserMetadata) Message
- type Messages
- type Option
- type Service
- type SystemMetadata
- type UserMetadata
- func (md *UserMetadata) AddKV(group, key string, value []byte)
- func (md *UserMetadata) AddKVInt(group, key string, value int)
- func (md *UserMetadata) AddKVString(group, key, value string)
- func (md *UserMetadata) Groups() []string
- func (md *UserMetadata) Keys(group string) []string
- func (md *UserMetadata) RemoveGroup(group string)
- func (md *UserMetadata) RemoveKey(group, key string)
- func (md *UserMetadata) SetKVGroup(group string, kv map[string][]byte)
- func (md *UserMetadata) Value(group, key string) []byte
Constants ¶
This section is empty.
Variables ¶
var (
DROP = fmt.Sprintf("%U__DROP__", '\\') // U+005C__DROP__
)
Functions ¶
Types ¶
type Datum ¶
type Datum interface {
// Value returns the payload of the message.
Value() []byte
// EventTime returns the event time of the message.
EventTime() time.Time
// Watermark returns the watermark of the message.
Watermark() time.Time
// Headers returns the headers of the message.
Headers() map[string]string
// UserMetadata returns the user metadata of the message.
UserMetadata() *UserMetadata
// SystemMetadata returns the system metadata of the message.
SystemMetadata() *SystemMetadata
}
Datum contains methods to get the payload information.
func NewHandlerDatum ¶
func NewHandlerDatum(value []byte, eventTime time.Time, watermark time.Time, headers map[string]string, userMetadata *UserMetadata, systemMetadata *SystemMetadata) Datum
type Mapper ¶
type Mapper interface {
// Map is the function to process each coming message.
Map(ctx context.Context, keys []string, datum Datum) Messages
}
Mapper is the interface of map function implementation.
type MapperFunc ¶
MapperFunc is a utility type used to convert a map function to a Mapper.
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message is used to wrap the data return by Map functions
func (Message) UserMetadata ¶ added in v0.11.0
func (m Message) UserMetadata() *UserMetadata
UserMetadata returns message user metadata
func (Message) WithTags ¶
WithTags is used to assign the tags to the message tags will be used for conditional forwarding
func (Message) WithUserMetadata ¶ added in v0.11.0
func (m Message) WithUserMetadata(userMetadata *UserMetadata) Message
WithUserMetadata is used to assign the user metadata to the message
type Messages ¶
type Messages []Message
func MessagesBuilder ¶
func MessagesBuilder() Messages
MessagesBuilder returns an empty instance of Messages
type Option ¶
type Option func(*options)
Option is the interface to apply options.
func WithMaxMessageSize ¶
WithMaxMessageSize sets the server max receive message size and the server max send message size to the given size.
func WithServerInfoFilePath ¶
WithServerInfoFilePath sets the server info file path to the given path.
func WithSockAddr ¶
WithSockAddr start the server with the given sock addr. This is mainly used for testing purposes.
type Service ¶
type Service struct {
mappb.UnimplementedMapServer
Mapper Mapper
// contains filtered or unexported fields
}
Service implements the proto gen server interface and contains the map operation handler.
type SystemMetadata ¶ added in v0.11.0
type SystemMetadata struct {
// contains filtered or unexported fields
}
SystemMetadata is mapping of group name to key-value pairs SystemMetadata wraps system-generated metadata groups per message. It is read-only to UDFs
func NewSystemMetadata ¶ added in v0.11.0
func NewSystemMetadata(d map[string]map[string][]byte) *SystemMetadata
NewSystemMetadata wraps an existing map into SystemMetadata This is for internal and testing purposes only.
func (*SystemMetadata) Groups ¶ added in v0.11.0
func (md *SystemMetadata) Groups() []string
Groups returns the groups of the system metadata. If there are no groups, it returns an empty slice.
Usage example:
```go systemMetadata := datum.SystemMetadata() groups := systemMetadata.Groups() ```
func (*SystemMetadata) Keys ¶ added in v0.11.0
func (md *SystemMetadata) Keys(group string) []string
Keys returns the keys of the system metadata for the given group. If the group is not present, it returns an empty slice.
Usage example:
```go
systemMetadata := datum.SystemMetadata()
keys := systemMetadata.Keys("group-name")
```
func (*SystemMetadata) Value ¶ added in v0.11.0
func (md *SystemMetadata) Value(group, key string) []byte
Value returns the value of the system metadata for the given group and key. If the group or key is not present, it returns an empty slice.
Usage example:
```go
systemMetadata := datum.SystemMetadata()
value := systemMetadata.Value("group-name", "key")
```
type UserMetadata ¶ added in v0.11.0
type UserMetadata struct {
// contains filtered or unexported fields
}
UserMetadata wraps user-defined metadata groups per message.
func NewUserMetadata ¶ added in v0.11.0
func NewUserMetadata(d map[string]map[string][]byte) *UserMetadata
NewUserMetadata wraps an existing map into UserMetadata. If d is nil, an empty map is created.
func (*UserMetadata) AddKV ¶ added in v0.11.0
func (md *UserMetadata) AddKV(group, key string, value []byte)
AddKV adds a key-value pair to the user metadata.
Usage example:
```go
userMetadata := NewUserMetadata()
userMetadata.AddKV("group-name", "key", []byte("value"))
```
func (*UserMetadata) AddKVInt ¶ added in v0.11.0
func (md *UserMetadata) AddKVInt(group, key string, value int)
AddKVInt adds a key-value pair with value of int type to the user metadata
Usage example:
```go
userMetadata := NewUserMetadata()
userMetadata.AddKVInt("group-name", "key", 123)
```
func (*UserMetadata) AddKVString ¶ added in v0.11.0
func (md *UserMetadata) AddKVString(group, key, value string)
AddKVString adds a key-value pair with value of string type to the user metadata.
Usage example:
```go
userMetadata := NewUserMetadata()
userMetadata.AddKVString("group-name", "key", "value")
```
func (*UserMetadata) Groups ¶ added in v0.11.0
func (md *UserMetadata) Groups() []string
Groups returns the groups of the user metadata. If there are no groups, it returns an empty slice.
Usage example:
```go userMetadata := datum.UserMetadata() groups := userMetadata.Groups() ```
func (*UserMetadata) Keys ¶ added in v0.11.0
func (md *UserMetadata) Keys(group string) []string
Keys returns the keys of the user metadata for the given group. If the group is not present, it returns an empty slice.
Usage example:
```go
userMetadata := datum.UserMetadata()
keys := userMetadata.Keys("group-name")
```
func (*UserMetadata) RemoveGroup ¶ added in v0.11.0
func (md *UserMetadata) RemoveGroup(group string)
RemoveGroup removes a group from the user metadata. If the group is not present, it's a no-op.
Usage example:
```go
md := datum.Metadata()
userMetadata := md.UserMetadata()
userMetadata.RemoveGroup("group-name")
```
func (*UserMetadata) RemoveKey ¶ added in v0.11.0
func (md *UserMetadata) RemoveKey(group, key string)
RemoveKey removes a key from a group in the user metadata. If the key or group is not present, it's a no-op.
Usage example:
```go
md := datum.Metadata()
userMetadata := md.UserMetadata()
userMetadata.RemoveKey("group-name", "key")
```
func (*UserMetadata) SetKVGroup ¶ added in v0.11.0
func (md *UserMetadata) SetKVGroup(group string, kv map[string][]byte)
SetKVGroup sets a group of key-value pairs under the provided group name.
Usage example:
```go
userMetadata := NewUserMetadata()
userMetadata.SetKVGroup("group-name", map[string][]byte{"key": []byte("value")})
```
func (*UserMetadata) Value ¶ added in v0.11.0
func (md *UserMetadata) Value(group, key string) []byte
Value returns the value of the user metadata for the given group and key. If the group or key is not present, it returns an empty slice.
Usage example:
```go
userMetadata := datum.UserMetadata()
value := userMetadata.Value("group-name", "key")
```