Documentation
ΒΆ
Index ΒΆ
- type Applicative
- type Builder
- type Data
- type Fold
- type Fork
- type ForkRule
- type HealthInfo
- type InjectionCallback
- type Log
- type LogStore
- type Logger
- type Option
- type Packet
- type Pipe
- func (pipe *Pipe) Run(ctx context.Context, port string, gracePeriod time.Duration) error
- func (pipe *Pipe) Stream(stream Stream) Builder
- func (pipe *Pipe) StreamHTTP(id string, opts ...*Option) Builder
- func (pipe *Pipe) StreamSubscription(id string, sub Subscription, interval time.Duration, opts ...*Option) Builder
- func (pipe *Pipe) Use(args ...interface{})
- type Retriever
- type Sender
- type Stream
- type Subscription
Constants ΒΆ
This section is empty.
Variables ΒΆ
This section is empty.
Functions ΒΆ
This section is empty.
Types ΒΆ
type Applicative ΒΆ added in v0.3.0
Applicative type for applying a change to a typed.Typed
type Builder ΒΆ
type Builder interface {
Map(id string, a Applicative, options ...*Option) Builder
FoldLeft(id string, f Fold, options ...*Option) Builder
FoldRight(id string, f Fold, options ...*Option) Builder
Fork(id string, f Fork, options ...*Option) (Builder, Builder)
Link(id, target string, options ...*Option)
Transmit(id string, s Sender, options ...*Option)
}
Builder interface for plotting out the data flow of the system
type Fork ΒΆ added in v0.5.0
Fork func for splitting a payload into 2
var ( // ForkDuplicate is a SplitHandler that sends data to both outputs ForkDuplicate Fork = func(payload []*Packet) (a, b []*Packet) { payload2 := []*Packet{} buf := &bytes.Buffer{} enc, dec := gob.NewEncoder(buf), gob.NewDecoder(buf) _ = enc.Encode(payload) _ = dec.Decode(&payload2) for i, packet := range payload { payload2[i].span = packet.span } return payload, payload2 } // ForkError is a SplitHandler for splitting errors from successes ForkError Fork = func(payload []*Packet) (s, f []*Packet) { s = []*Packet{} f = []*Packet{} for _, packet := range payload { if packet.Error != nil { f = append(f, packet) } else { s = append(s, packet) } } return s, f } )
type ForkRule ΒΆ added in v0.5.0
ForkRule provides a SplitHandler for splitting based on the return bool
type HealthInfo ΒΆ added in v0.7.0
type HealthInfo struct {
StreamID string `json:"stream_id"`
LastPayload time.Time `json:"last_payload"`
// contains filtered or unexported fields
}
HealthInfo type for giving info on the stream
type InjectionCallback ΒΆ added in v0.7.0
type InjectionCallback func(logs ...*Log)
InjectionCallback func to run when the LogStore decides to restart the flow of orphaned Data
type Log ΒΆ added in v0.7.0
type Log struct {
OwnerID string `json:"owner_id"`
StreamID string `json:"stream_id"`
VertexID string `json:"vertex_id"`
VertexType string `json:"vertex_type"`
State string `json:"state"`
Packet *Packet `json:"packet"`
When time.Time `json:"when"`
}
Log type for holding the data that is recorded from the streams
type LogStore ΒΆ added in v0.7.0
type LogStore interface {
Join(id string, callback InjectionCallback, streamIDs ...string) error
Write(logs ...*Log)
Leave(id string) error
}
LogStore type for managing cluster state
type Logger ΒΆ added in v0.7.0
type Logger interface {
Error(...interface{})
Info(...interface{})
}
Logger type for accepting log messages
type Packet ΒΆ
type Packet struct {
ID string `json:"id"`
Data Data `json:"data"`
Error error `json:"error"`
// contains filtered or unexported fields
}
Packet type that holds information traveling through the machine
type Pipe ΒΆ added in v0.7.0
type Pipe struct {
// contains filtered or unexported fields
}
Pipe type for holding the server information for running http servers
func (*Pipe) StreamHTTP ΒΆ added in v0.7.0
StreamHTTP func for creating a Stream at the path /stream/<id>
func (*Pipe) StreamSubscription ΒΆ added in v0.7.0
func (pipe *Pipe) StreamSubscription(id string, sub Subscription, interval time.Duration, opts ...*Option) Builder
StreamSubscription func for creating a Stream at the that reads from a subscription
func (*Pipe) Use ΒΆ added in v0.7.0
func (pipe *Pipe) Use(args ...interface{})
Use Wraps fiber.App.Use
Use registers a middleware route that will match requests with the provided prefix (which is optional and defaults to "/").
app.Use(func(c *fiber.Ctx) error {
return c.Next()
})
app.Use("/api", func(c *fiber.Ctx) error {
return c.Next()
})
app.Use("/api", handler, func(c *fiber.Ctx) error {
return c.Next()
})
This method will match all HTTP verbs: GET, POST, PUT, HEAD etc...
