Documentation
¶
Index ¶
- type Applicative
- type Builder
- type Data
- type Fold
- type Fork
- type ForkRule
- type HealthInfo
- type InjectionCallback
- type Log
- type LogStore
- type Logger
- type Option
- type Packet
- type Pipe
- func (pipe *Pipe) Run(ctx context.Context, port string, gracePeriod time.Duration) error
- func (pipe *Pipe) StreamHTTP(id string, opts ...*Option) Builder
- func (pipe *Pipe) StreamSubscription(id string, sub Subscription, interval time.Duration, opts ...*Option) Builder
- func (pipe *Pipe) Use(args ...interface{})
- type Retriever
- type Sender
- type Stream
- type Subscription
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Applicative ¶ added in v0.3.0
Applicative type for applying a change to a typed.Typed
type Builder ¶
// Builder is the interface for plotting out the data flow of the system.
// Map, FoldLeft and FoldRight each return a Builder; Fork returns two
// Builders; Link and Transmit return nothing.
type Builder interface {
// Map takes an id, an Applicative and optional Options, and returns a Builder.
Map(id string, a Applicative, options ...*Option) Builder
// FoldLeft takes an id, a Fold and optional Options, and returns a Builder.
FoldLeft(id string, f Fold, options ...*Option) Builder
// FoldRight has the same signature as FoldLeft; presumably it folds in the
// opposite direction — confirm against the implementation.
FoldRight(id string, f Fold, options ...*Option) Builder
// Fork takes an id, a Fork func and optional Options, and returns two
// Builders (presumably one per output of the Fork — confirm).
Fork(id string, f Fork, options ...*Option) (Builder, Builder)
// Link takes an id and a target id plus optional Options; it returns nothing.
Link(id, target string, options ...*Option)
// Transmit takes an id, a Sender and optional Options; it returns nothing.
Transmit(id string, s Sender, options ...*Option)
}
Builder interface for plotting out the data flow of the system
type Fork ¶ added in v0.5.0
Fork func for splitting a payload into 2
var ( // ForkDuplicate is a SplitHandler that sends data to both outputs ForkDuplicate Fork = func(payload []*Packet) (a, b []*Packet) { payload2 := []*Packet{} buf := &bytes.Buffer{} enc, dec := gob.NewEncoder(buf), gob.NewDecoder(buf) _ = enc.Encode(payload) _ = dec.Decode(&payload2) for i, packet := range payload { payload2[i].span = packet.span } return payload, payload2 } // ForkError is a SplitHandler for splitting errors from successes ForkError Fork = func(payload []*Packet) (s, f []*Packet) { s = []*Packet{} f = []*Packet{} for _, packet := range payload { if packet.Error != nil { f = append(f, packet) } else { s = append(s, packet) } } return s, f } )
type ForkRule ¶ added in v0.5.0
ForkRule provides a Fork that splits a payload based on the bool returned by the rule
type HealthInfo ¶ added in v0.7.0
// HealthInfo gives info on a stream.
type HealthInfo struct {
// StreamID is serialized to JSON as "stream_id".
StreamID string `json:"stream_id"`
// LastPayload is a timestamp serialized to JSON as "last_payload";
// presumably the time the stream last saw a payload — confirm.
LastPayload time.Time `json:"last_payload"`
// contains filtered or unexported fields
}
HealthInfo type for giving info on the stream
type InjectionCallback ¶ added in v0.7.0
// InjectionCallback is the func to run when the LogStore decides to restart
// the flow of orphaned Data. It receives zero or more *Log values and returns
// nothing.
type InjectionCallback func(logs ...*Log)
InjectionCallback func to run when the LogStore decides to restart the flow of orphaned Data
type Log ¶ added in v0.7.0
// Log holds the data that is recorded from the streams. Every field is
// exported and carries an explicit snake_case JSON name.
type Log struct {
// OwnerID is serialized as "owner_id".
OwnerID string `json:"owner_id"`
// StreamID is serialized as "stream_id".
StreamID string `json:"stream_id"`
// VertexID is serialized as "vertex_id".
VertexID string `json:"vertex_id"`
// VertexType is serialized as "vertex_type".
VertexType string `json:"vertex_type"`
// State is a free-form string serialized as "state"; the set of valid
// values is not visible here.
State string `json:"state"`
// Packet points to the Packet recorded with this entry; serialized as "packet".
Packet *Packet `json:"packet"`
// When is the timestamp of the entry; serialized as "when".
When time.Time `json:"when"`
}
Log type for holding the data that is recorded from the streams
type LogStore ¶ added in v0.7.0
// LogStore is the type for managing cluster state.
type LogStore interface {
// Join takes an id, an InjectionCallback and zero or more stream IDs, and
// reports failure through the returned error.
Join(id string, callback InjectionCallback, streamIDs ...string) error
// Write accepts zero or more *Log values; it returns nothing, so failures
// are not reported to the caller.
Write(logs ...*Log)
// Leave takes an id and reports failure through the returned error;
// presumably the counterpart of Join — confirm.
Leave(id string) error
}
LogStore type for managing cluster state
type Logger ¶ added in v0.7.0
// Logger is the type for accepting log messages. Both methods take any
// number of arguments of any type and return nothing.
type Logger interface {
// Error accepts the values to log at error level.
Error(...interface{})
// Info accepts the values to log at info level.
Info(...interface{})
}
Logger type for accepting log messages
type Packet ¶
// Packet holds information traveling through the machine.
type Packet struct {
// ID is serialized to JSON as "id".
ID string `json:"id"`
// Data is the packet's payload, serialized to JSON as "data".
Data Data `json:"data"`
// Error is serialized to JSON as "error".
// NOTE(review): Error is an interface value; encoding/json marshals it via
// its concrete type, so errors without exported fields or a MarshalJSON
// method (e.g. from errors.New) encode as {} — confirm this is intended.
Error error `json:"error"`
// contains filtered or unexported fields
}
Packet type that holds information traveling through the machine
type Pipe ¶ added in v0.7.0
// Pipe holds the server information for running http servers. It has no
// exported fields; it is used through its methods (Run, StreamHTTP,
// StreamSubscription, Use).
type Pipe struct {
// contains filtered or unexported fields
}
Pipe type for holding the server information for running http servers
func (*Pipe) StreamHTTP ¶ added in v0.7.0
StreamHTTP func for creating a Stream at the path /stream/<id>
func (*Pipe) StreamSubscription ¶ added in v0.7.0
func (pipe *Pipe) StreamSubscription(id string, sub Subscription, interval time.Duration, opts ...*Option) Builder
StreamSubscription func for creating a Stream that reads from a subscription
func (*Pipe) Use ¶ added in v0.7.0
func (pipe *Pipe) Use(args ...interface{})
Use wraps fiber.App.Use.
Use registers a middleware route that will match requests with the provided prefix (which is optional and defaults to "/").
app.Use(func(c *fiber.Ctx) error {
return c.Next()
})
app.Use("/api", func(c *fiber.Ctx) error {
return c.Next()
})
app.Use("/api", handler, func(c *fiber.Ctx) error {
return c.Next()
})
This method will match all HTTP verbs: GET, POST, PUT, HEAD etc...
