Documentation
¶
Overview ¶
Package fluent provides full Fluent logging stack in Go, including:
- `fluent.Handler` - serializes structured logs (implements `slog.Handler`)
- `fluent.Client` - manages connections and writing to the Fluent server
- `fluent.Encoder` - provides a common encoder/buffer, bridging the `Handler` and `Client`
The stack is optimized for efficiency, and the `Handler` and `Client` are coupled only via their shared use of the `Encoder`.
Examples of efficiency optimizations:
- shared encoders/buffers
- comprehensive use of resource pooling to minimize heap allocations
- log preludes are only encoded once per pool, and only copied into each `Encoder` in the pool, no matter how many times the Encoder is used
- shared log attributes (`WithAttrs`) are encoded only once, no matter how many times they are used by the Handler
- where map values have a length that can change, we overallocate a single byte for the msgpack map header, so that we perform neither look-ahead nor extra copying when the length changes (an example of this occurs when the key of a child group `Attr` is an empty string, causing its own `Attr`s to get serialized into the parent's scope)
Copyright (c) 2009 The Go Authors. All rights reserved.
- original source code: https://cs.opensource.google/go/go/+/refs/tags/go1.21.3:src/testing/slogtest/slogtest.go
- license: https://github.com/golang/go/blob/master/LICENSE
Modifications: - ignore zero time case:
- original: tested as "omit zero time"
- modified: tested as "ignore zero time [feel free to use a valid default]"
- argument: most log protocols won't allow nil timestamps
Index ¶
- Constants
- Variables
- func InternalLogger() *log.Logger
- func SetInternalLogger(l *log.Logger)
- func TestHandler(h slog.Handler, results func() []map[string]any) error
- type Client
- type ClientOptions
- type Encoder
- type EncoderOptions
- type EncoderPool
- type EventTime
- type Handler
- func (h *Handler) Enabled(_ context.Context, level slog.Level) bool
- func (h *Handler) Handle(ctx context.Context, r slog.Record) error
- func (h *Handler) Shutdown(ctx context.Context) error
- func (h *Handler) WithAttrs(attrs []slog.Attr) slog.Handler
- func (h *Handler) WithGroup(name string) slog.Handler
- type HandlerOptions
- type Sink
Constants ¶
const ( // MessageMode indicates the Fluent Message mode. MessageMode = iota // ForwardMode indicates the Fluent Forward mode. ForwardMode // PackedForwardMode indicates the Fluent PackedForward mode. PackedForwardMode // CompressedForwardMode indicates the Fluent CompressedPackedForward mode. CompressedPackedForwardMode )
const ( TimeExtType = 0 TimeLen = 8 )
Variables ¶
var ContextKey *ccKey = &ccKey{}
ContextKey is used to extract a log value from context.Context. The value must be be `slog.Attr`.
Example:
ctx := context.WithValue(ctx, fluentHandler.ContextKey,
slog.Group("req",
slog.String("method", r.Method),
slog.String("url", r.URL.String()),
)
)
These attrs are added to the top scope of the Fluent message record.
Functions ¶
func InternalLogger ¶
InternalLogger returns the Logger used to write out internal logs, where logs get written when something goes wrong in the logging stack itself.
func SetInternalLogger ¶
SetInternalLogger makes l the internal logger. After this call, output from the log package's default Logger (as with log.Print, etc.) will be logged at LevelInfo using l's Handler.
func TestHandler ¶
TestHandler tests a slog.Handler. If TestHandler finds any misbehaviors, it returns an error for each, combined into a single error with errors.Join.
TestHandler installs the given Handler in a slog.Logger and makes several calls to the Logger's output methods.
The results function is invoked after all such calls. It should return a slice of map[string]any, one for each call to a Logger output method. The keys and values of the map should correspond to the keys and values of the Handler's output. Each group in the output should be represented as its own nested map[string]any. The standard keys slog.TimeKey, slog.LevelKey and slog.MessageKey should be used.
If the Handler outputs JSON, then calling encoding/json.Unmarshal with a `map[string]any` will create the right data structure.
If a Handler intentionally drops an attribute that is checked by a test, then the results function should check for its absence and add it to the map it returns.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client represents a Fluent client. If using multiple concurrent client workers, then the Client could considered a Fluent client pool, as each worker maintains an independent connection to the server.
func NewClient ¶
func NewClient(host string, opts *ClientOptions) (*Client, error)
New creates a new Fluent client and connects to the Fluent server immediately, returning an error if it is unable to establish the connection.
func NewClientContext ¶
NewClientContext creates a new Fluent client and connects to the Fluent server immediately, returning an error if it is unable to establish the initial connections. The Context is passed to `Connect()` can be used to cancel the `Connect` operation, or set a global deadline for connecting.
func (*Client) Send ¶
Send places the log payload Encoder into the write queue.
This operation is sync/blocking operation when:
- the queueDepth is 0, or
- the queue is full and dropIfQueueIsFull is false
This operation is async/non-blocking when:
- queueDepth > 0, and
- the queue is not full, or dropIfQueueIsFull is true
The payload should NOT include the Fluent `option` field, which the Client is responsible for adding if necessary.
func (*Client) Shutdown ¶
Shutdown is used to support graceful shutdown. It closes the write queue channel, so any further calls to Send* methods will panic. Shutdown blocks until the write queue is fully drained and all worker goroutines have stopped, or the context expires, whichever occurs first.
type ClientOptions ¶
type ClientOptions struct {
// Network protocol used to communicate with the server. Fluent protocol
// says "protocol [enum: tcp/udp/tls]". The default is "tcp".
// ref: https://docs.fluent.org/configuration/transport-section
Network string
// Port of the Fluent server. The default is 24224.
Port int
// DialTimeout sets the timeout for dialing the server. The default is 30s.
DialTimeout time.Duration
// MaxEagerDialTries limits the maximum number of times client workers will
// try to connect to establish an initial the server before the Client is
// returned from the constructor. This is not used if the `SkipEagerDial` is
// true, or for (re)connections that occur after the constructor returns. If
// the value is < 0, the constructor will not return until connections are
// successfully established. The default is 10.
MaxEagerDialTries int
// Concurrency controls the number of workers the Client will spin up. Each
// worker will independently pull messages from the write queue, and send
// messages to the server over its own connection. The default is 1.
Concurrency int
// QueueDepth sets the maximum number of write requests that can be buffered
// before writing to the server is blocked. If blocked and dropIfQueueIsFull
// is true, load shedding will occur, with later writes discarded until
// buffer space increases. The default depth is 0 (synchronous writes).
QueueDepth int
// WriteTimeout controls the timeout for each Write to the server. If
// WriteTimeout < 0, then no timeout will be set. The default is 10 seconds.
WriteTimeout time.Duration
// MaxWriteTries controls the number of times the net.Conn will try to send
// a message before inferring a broken pipe, tearing down the connection,
// and establishing a new one. This must be > 0. The default is 3.
MaxWriteTries int
// InsecureSkipVerify controls whether a client verifies the server's
// certificate chain and host name when using TLS.
InsecureSkipVerify bool
// SkipEagerDial enables returning clients that dial the server lazily.
SkipEagerDial bool
// DropIfQueueFull controls how write requests are handled when the
// writeQueue is full. The default is to block the log handler until the
// queue channel can receive the log message. With this option enabled,
// overflow requests will get dropped to the floor. This enables a tradeoff
// between log completeness and system performance predictability.
DropIfQueueFull bool
// Verbose controls whether debug logs are written to the internal logger.
Verbose bool
}
ClientOptions are used to customize the Fluent Client.
Invalid options are coerced ¶
NB: The struct pointer options approach is used to be consistent with the options used for the Handler, which uses the struct pointer approach to be consistent with the `HandlerOptions` used by log/slog.
func DefaultClientOptions ¶
func DefaultClientOptions() *ClientOptions
DefaultClientOptions returns *ClientOptions with all default values.
type Encoder ¶
Encoder provides a mspgack encoder and its underlying bytes.Buffer.
func NewEncoder ¶
NewEncoder returns a newly allocated Encoder.
func (*Encoder) EncodeEventTime ¶
EncodeTimestamp is a helper that by default encodes a time value as a custom msgpack type defined by Fluent (EventTime). If the Encoder is set to use coarse timestamps, then it encodes the time value as 64-bit integer representing Unix epoch.
func (*Encoder) Free ¶
func (e *Encoder) Free()
Free returns the encoder to the shared pool after eagerly resetting it.
func (*Encoder) Mode ¶
func (e *Encoder) Mode() eventMode
Mode returns the Fluent event mode of the Encoder.
func (*Encoder) RequestACK ¶
RequestACK controls whether explicit ACKS are requested from the server.
func (*Encoder) UseCoarseTimestamps ¶
UseCoarseTimestamps controls whether legacy (unix epoch) timestamps are used.
type EncoderOptions ¶
type EncoderOptions struct {
// Mode is the Fluent event mode (also referred to as the carrier mode in
// the protocol spec) that applies to all of the logs serialized using
// Encoders from one shared EncoderPool.
Mode eventMode
//NewBufferCap sets the capacity, in bytes, for newly created Encoder
//buffers. The minimum value is 64 bytes. The default is 1KiB (1<<10).
NewBufferCap int
// MaxBufferCap sets the maximum buffer capacity , in bytes, beyond which an
// Encoder will not be returned to the shared Encoder pool, to prevent rare,
// unusually large buffers from staying resident in memory. The minimum
// value is the `newBufferCap`. The default is 8KiB (1<<13).
MaxBufferCap int
// WithCoarseTimestamps controls whether the log time is serialized as Unix
// epoch, which is useful for pre-2016 legacy systems that do not support
// sub-second precision. The default is false, so timestamps are serialized
// as Fluent `EventTime` values.
UseCoarseTimestamps bool
// RequestACKs controls whether the logs include a ("chunk") request for
// the Fluent server to send back explicit ACKs.
RequestACKs bool
}
EncoderOptions are used to customize the Encoders and the Encoder pool.
NB: The struct pointer options approach is used to be consistent with the options used for the Handler, which uses the struct pointer approach to be consistent with the `HandlerOptions` used by log/slog.
func DefaultEncoderOptions ¶
func DefaultEncoderOptions() *EncoderOptions
DefaultEncoderOptions returns *DefaultEncoderOptions with all default values.
type EncoderPool ¶
type EncoderPool struct {
*EncoderOptions
// contains filtered or unexported fields
}
Pool defines a shared *Encoder pool, used to minimize heap allocations.
func NewEncoderPool ¶
func NewEncoderPool(tag string, opts *EncoderOptions) (*EncoderPool, error)
NewEncoderPool creates a shared *Encoder pool that returns Encoders with the log prelude, including the outer msgpack array and the tag, pre-encoded.
func (*EncoderPool) Get ¶
func (p *EncoderPool) Get() *Encoder
Get returns an Encoder with the prelude pre-rendered.
func (*EncoderPool) Put ¶
func (p *EncoderPool) Put(e *Encoder)
Put resets an Encoder and returns it to the shared pool.
type EventTime ¶
func (*EventTime) DecodeMsgpack ¶
DecodeMsgpack deserializes *Time values from the custom msgpack format defined in the Fluent protocol specification.
type Handler ¶
type Handler struct {
*HandlerOptions
// contains filtered or unexported fields
}
Handler is an adapter that serializes Go structured logs out to Fluent msgpack arrays, without first serializing them to intermediate data structures, such as map[string]any.
// Example of basic usage
h, err := fluent.NewHandler(fluentHost, fluentTag, nli)
if err != nil {
log.Fatalln(err)
}
logger := slog.New(h)
slog.SetDefault(logger)
slog.Info("unrecognized user", "user_id", user_id)
func NewHandler ¶
func NewHandler(host, tag string, opts *HandlerOptions) (*Handler, error)
NewHandler wraps uses an EncoderPool with default options, and a Client that uses default options except Concurrency is set to 2 and The QueueDepth is set to 16, for asynchronous sending.
For complete control over the `fluent.Client` and the encoding options used by the `fluent.Encoder`s, use the `NewHandlerCustom` constructor.
func NewHandlerCustom ¶
func NewHandlerCustom(client Sink, pool *EncoderPool, opts *HandlerOptions) *Handler
NewHandlerCustom allows creates a Handler that wrap a Client and an EncoderPool that are fully customizable by the caller.
func (*Handler) Enabled ¶
Enabled reports whether the handler handles records at the given level. The handler ignores records whose level is lower. It is called early, before any arguments are processed, to save effort if the log event should be discarded. If called from a Logger method, the first argument is the context passed to that method, or context.Background() if nil was passed or the method does not take a context. The context is passed so Enabled can use its values to make a decision.
func (*Handler) Handle ¶
Handle handles the Record. It will only be called when Enabled returns true. The Context argument is as for Enabled. It is present solely to provide Handlers access to the context's values. Canceling the context does not affect record processing.
Handle methods that produce output should observe the following rules:
- If r.Time is the zero time, ignore the time.
- If r.PC is zero, ignore it.
- Attr's values should be resolved.
- If an Attr's key and value are both the zero value, ignore the Attr.
- If a group's key is empty, inline the group's Attrs.
- If a group has no Attrs (even if it has a non-empty key), ignore it.
Additional rules are taken from test cases in `slogtest.go`. Handler passes all conformance tests in `testing/slogtest.go` except one. For "If r.Time is the zero time, ignore the time.", the test suite interprets "ignore" as "omit the time entirely." This is not acceptable in most logging protocols, including Fluent. Rather than interpretint "ignore" as "omit", Handler *ignores* zero values and uses time.Now() as a reasonable fallback. The modified test suite is included in `slogtest.go`.
func (*Handler) Shutdown ¶
Shutdown closes the writeQueue used to connect the logger to the Fluent client workers. You MUST NOT call any other logger methods after calling Shutdown. This method will block until writeQueue is fully drained.
func (*Handler) WithAttrs ¶
WithAttrs returns a new Handler whose attributes consist of both the receiver's attributes and the arguments. The Handler owns the slice: it may retain, modify or discard it.
func (*Handler) WithGroup ¶
WithGroup returns a new Handler with the given group appended to the receiver's existing groups, increasing the nesting level within the msgpack map serialization of the Fluent log record.
The new scope ends at the end of the log event. That is,
logger.WithGroup("s").LogAttrs(level, msg, slog.Int("a", 1), slog.Int("b", 2))
behaves like
logger.LogAttrs(level, msg, slog.Group("s", slog.Int("a", 1), slog.Int("b", 2)))
If the name is empty, WithGroup returns the receiver, which results in the nested attributes being inlined into the parent scope.
type HandlerOptions ¶
type HandlerOptions struct {
// Level reports the minimum record level that will be logged. The handler
// discards records with lower levels. If Level is nil, the handler assumes
// LevelInfo. The handler calls Level.Level for each record processed; to
// adjust the minimum level dynamically, use a LevelVar.
Level slog.Leveler
// WithTimeFormat configuration OptionFunc allows customization of the how time
// values inside log content will get serialized. This does not change the
// timestamp for the log itself, in the metadata, which is defined by the
// Fluent protocol specification. The default is time.RFC3339Nano.
TimeFormat string
// AddSource causes the handler to compute the source code position of the
// log statement and add a SourceKey attribute to the output.
AddSource bool
// Verbose controls whether debug logs are written to the internal logger.
Verbose bool
}
HandlerOptions are used to customize the Fluent slog.Handler.
NB: The struct pointer options approach is used to be consistent with the approach used in the standard library for `HandlerOptions`.
func DefaultHandlerOptions ¶
func DefaultHandlerOptions() *HandlerOptions
DefaultHandlerOptions returns *DefaultHandlerOptions with all default values.