Documentation
¶
Index ¶
- func AutoAck[T any]() goflux.Middleware[T]
- func ForwardMessageID[T any]() goflux.Middleware[T]
- func InjectHeader[T any]() goflux.Middleware[T]
- func InjectMessageID[T any]() goflux.Middleware[T]
- func RetryAck[T any](policy RetryPolicy) goflux.Middleware[T]
- type RetryAction
- type RetryDecision
- type RetryPolicy
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AutoAck ¶
func AutoAck[T any]() goflux.Middleware[T]
AutoAck returns a Middleware that acknowledges messages automatically: nil handler error triggers Ack, non-nil error triggers Nak. Messages without an acker (fire-and-forget transports) are passed through as-is.
Example ¶
ExampleAutoAck demonstrates the AutoAck middleware. On handler success, Ack is called; on handler error, Nak is called.
package main
import (
"context"
"fmt"
"github.com/foomo/goflux"
"github.com/foomo/goflux/middleware"
)
// ExampleAutoAck demonstrates the AutoAck middleware. On handler success,
// Ack is called; on handler error, Nak is called.
func main() {
// Spy acker records which method was called.
acker := &spyAcker{}
msg := goflux.NewMessage("events", "payload").WithAcker(acker)
handler := middleware.AutoAck[string]()(func(_ context.Context, _ goflux.Message[string]) error {
return nil // success
})
_ = handler(context.Background(), msg)
fmt.Println("acked:", acker.acked)
fmt.Println("naked:", acker.naked)
}
// spyAcker records ack/nak calls for testing.
type spyAcker struct {
acked bool
naked bool
}
func (s *spyAcker) Ack() error { s.acked = true; return nil }
func (s *spyAcker) Nak() error { s.naked = true; return nil }
Output: acked: true naked: false
func ForwardMessageID ¶ added in v0.4.0
func ForwardMessageID[T any]() goflux.Middleware[T]
ForwardMessageID returns a goflux.Middleware that preserves the message ID from the incoming context through the handler chain. Use this with pipe's [pipe.WithMiddleware] to forward message IDs across pipe stages.
Unlike InjectMessageID (which reads the ID from message headers set by transports), ForwardMessageID reads from context — suitable for in-process handler chains where the ID is already in context.
Example ¶
package main
import (
"context"
"fmt"
"github.com/foomo/goflux"
"github.com/foomo/goflux/middleware"
)
func main() {
inner := func(ctx context.Context, msg goflux.Message[string]) error {
fmt.Println(goflux.MessageID(ctx))
return nil
}
handler := middleware.ForwardMessageID[string]()(inner)
ctx := goflux.WithMessageID(context.Background(), "msg-123")
_ = handler(ctx, goflux.NewMessage("test", "payload"))
}
Output: msg-123
func InjectHeader ¶
func InjectHeader[T any]() goflux.Middleware[T]
InjectHeader returns a goflux.Middleware that injects the message's goflux.Header into the context via goflux.WithHeader. Downstream code can read it with goflux.HeaderFromContext.
Push-based transports and the goflux.Processor do this automatically. This middleware is for handler chains that bypass built-in injection.
Example ¶
ExampleInjectHeader demonstrates injecting the full message header into the context so downstream code can access it via HeaderFromContext.
package main
import (
"context"
"fmt"
"github.com/foomo/goflux"
"github.com/foomo/goflux/middleware"
)
func main() {
header := goflux.Header{}
header.Set("X-Tenant", "acme")
header.Set("X-Region", "eu-west-1")
msg := goflux.NewMessageWithHeader("events", "payload", header)
handler := middleware.InjectHeader[string]()(
func(ctx context.Context, msg goflux.Message[string]) error {
h := goflux.HeaderFromContext(ctx)
fmt.Println("tenant:", h.Get("X-Tenant"))
fmt.Println("region:", h.Get("X-Region"))
return nil
},
)
_ = handler(context.Background(), msg)
}
Output: tenant: acme region: eu-west-1
func InjectMessageID ¶
func InjectMessageID[T any]() goflux.Middleware[T]
InjectMessageID returns a goflux.Middleware that reads the message ID from the message's goflux.Header (using goflux.MessageIDHeader as the key) and injects it into the context via goflux.WithMessageID.
Push-based transports and the goflux.Processor do this automatically. This middleware is for handler chains that bypass built-in injection — for example, when composing handlers directly with goflux.Subscriber.Subscribe.
Example ¶
ExampleInjectMessageID demonstrates extracting a message ID from the message header and injecting it into the context. This is useful when composing handlers with Subscriber.Subscribe directly, where the transport may not inject the message ID automatically.
package main
import (
"context"
"fmt"
"github.com/foomo/goflux"
"github.com/foomo/goflux/middleware"
)
func main() {
header := goflux.Header{}
header.Set(goflux.MessageIDHeader, "order-123")
msg := goflux.NewMessageWithHeader("events", "payload", header)
handler := middleware.InjectMessageID[string]()(
func(ctx context.Context, msg goflux.Message[string]) error {
fmt.Println("id:", goflux.MessageID(ctx))
return nil
},
)
_ = handler(context.Background(), msg)
}
Output: id: order-123
func RetryAck ¶
func RetryAck[T any](policy RetryPolicy) goflux.Middleware[T]
RetryAck returns a Middleware that acknowledges messages based on handler outcome and a RetryPolicy:
- nil error: goflux.Message.Ack
- non-nil error + RetryTerm: goflux.Message.Term
- non-nil error + RetryNakWithDelay: goflux.Message.NakWithDelay
- non-nil error + RetryNak: goflux.Message.Nak
Messages without an acker (fire-and-forget transports) are passed through.
Types ¶
type RetryAction ¶
type RetryAction int
RetryAction describes how a failed message should be acknowledged.
const ( // RetryNak triggers immediate redelivery via [goflux.Message.Nak]. RetryNak RetryAction = iota // RetryNakWithDelay triggers delayed redelivery via [goflux.Message.NakWithDelay]. // The delay is specified in [RetryDecision.Delay]. RetryNakWithDelay // RetryTerm terminates the message via [goflux.Message.Term] — no further // redelivery will occur. RetryTerm )
type RetryDecision ¶
type RetryDecision struct {
Action RetryAction
Delay time.Duration
}
RetryDecision holds the action and optional delay for a failed message.
type RetryPolicy ¶
type RetryPolicy func(err error) RetryDecision
RetryPolicy inspects a handler error and decides how to acknowledge the failed message.
func NewRetryPolicy ¶
func NewRetryPolicy(delay time.Duration) RetryPolicy
NewRetryPolicy returns a RetryPolicy that terminates non-retryable errors (see goflux.ErrNonRetryable) and naks all other errors with the given delay.
Example ¶
ExampleNewRetryPolicy demonstrates creating and using a RetryPolicy that classifies errors into retry actions.
package main
import (
"errors"
"fmt"
"time"
"github.com/foomo/goflux"
"github.com/foomo/goflux/middleware"
)
func main() {
policy := middleware.NewRetryPolicy(2 * time.Second)
// Retryable error.
d := policy(errors.New("connection reset"))
fmt.Printf("action=%d delay=%v\n", d.Action, d.Delay)
// Non-retryable error.
d = policy(goflux.NonRetryable(errors.New("schema mismatch")))
fmt.Printf("action=%d\n", d.Action)
}
Output: action=1 delay=2s action=2