fluent

package module
v0.0.0-...-ae7757c
Published: Nov 30, 2023 License: MIT Imports: 19 Imported by: 0

README

fluent

A full Fluent logging stack in Go, including:

  • fluent.Handler - serializes structured logs (implements slog.Handler)
  • fluent.Client - manages connections and writing to the Fluent server
  • fluent.Encoder - provides a common encoder/buffer, bridging the Handler and Client

import (
    // ...
    "github.com/bitdabbler/fluent"
)

Purpose

Why write yet another Fluent client and yet another structured log handler? In short, efficiency.

Let's focus on the Fluent Message event mode, with the msgpack form: [tag, time, record, option], where record is equivalent to the Go type map[string]any.

ref: https://github.com/fluent/fluent/wiki/Forward-Protocol-Specification-v1#message-modes

Consider an API such as:

// NOT this library
client.Send(tag string, timestamp time.Time, record map[string]any)

This hypothetical API (similar to those found in existing libraries) demonstrates the inefficiencies we seek to avoid. First, the log attributes (key-value pairs) have to be collected into a map[string]any for the record field. Second, all of the fields of the Message have to be copied across one extra function boundary. Additionally, those fields are often then collected into an object that represents one Message.

For optimal efficiency, the fluent.Client sends directly from the buffers used by the fluent.Handler. This allows log handlers to serialize log values without first marshaling them into intermediate objects, avoiding redundant serialization steps and excess copying.

Efficiency optimizations:

  • the Handler and the Client directly use the same encoders/buffers
  • comprehensive resource pooling to minimize heap allocations
  • log preludes are encoded only once per pool, and are copied into each Encoder only once, no matter how many times the Encoder is used
  • shared log attributes (WithAttrs) are encoded only once, no matter how many times they are used by the Handler
  • where map values have a length that can change, we overallocate a single byte for the msgpack map header, so that we perform neither look-ahead nor extra copying when the length changes (an example of this occurs when the key of a child group Attr is an empty string, causing its own Attrs to get serialized into the parent's scope)
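The last optimization above can be pictured as follows. The sketch assumes a fixmap, which holds at most 15 entries and whose entire header fits in one byte. It reserves that byte and writes the entries. When an empty-key group is inlined, the parent's count grows. The sketch then patches the header in place, with no look-ahead and no shifting of bytes already written. The `attr` type and helper functions are hypothetical simplifications, not this package's code.

```go
package main

import "fmt"

// attr is a hypothetical, simplified stand-in for slog.Attr with string values.
type attr struct {
	key     string
	value   string // used when isGroup is false
	isGroup bool
	group   []attr // children, used when isGroup is true
}

// appendStr encodes s as a msgpack fixstr (this sketch only handles < 32 bytes).
func appendStr(b []byte, s string) []byte {
	if len(s) >= 32 {
		panic("this sketch only emits fixstr")
	}
	b = append(b, 0xa0|byte(len(s)))
	return append(b, s...)
}

// appendMap reserves one placeholder byte for the fixmap header, writes the
// entries, then patches the header with the final entry count.
func appendMap(b []byte, attrs []attr) ([]byte, int) {
	hdr := len(b)
	b = append(b, 0) // placeholder header byte
	var n int
	b, n = appendEntries(b, attrs)
	if n > 15 {
		panic("this sketch only emits fixmap headers")
	}
	b[hdr] = 0x80 | byte(n)
	return b, n
}

// appendEntries writes key/value pairs for attrs and returns how many map
// entries it added to the current scope.
func appendEntries(b []byte, attrs []attr) ([]byte, int) {
	n := 0
	for _, a := range attrs {
		switch {
		case !a.isGroup:
			b = appendStr(b, a.key)
			b = appendStr(b, a.value)
			n++
		case len(a.group) == 0:
			// a group with no attrs is ignored
		case a.key == "":
			// empty-key group: its attrs are inlined into the parent scope,
			// which changes the parent's entry count
			var m int
			b, m = appendEntries(b, a.group)
			n += m
		default:
			b = appendStr(b, a.key)
			b, _ = appendMap(b, a.group)
			n++
		}
	}
	return b, n
}

func main() {
	attrs := []attr{
		{key: "a", value: "1"},
		{isGroup: true, group: []attr{{key: "b", value: "2"}, {key: "c", value: "3"}}},
		{key: "g", isGroup: true, group: []attr{{key: "d", value: "4"}}},
	}
	out, n := appendMap(nil, attrs)
	fmt.Printf("entries=%d header=%#x bytes=% x\n", n, out[0], out)
}
```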

Basic Usage

h, err := fluent.NewHandler(fluentHost, fluentTag, nil)
if err != nil {
    log.Fatal(err)
}
l := slog.New(h)

// use locally
l.Info("the message", slog.Int("key", 42))

// or set it as the slog package-level logger
slog.SetDefault(l)

// and then use it globally
slog.Info("the message", slog.Int("key", 42))

Details

In the above example, we let the Handler deal with setting up the Client and the EncoderPool.

For the Client, it sets the level of concurrency to 2, and the send queue depth to 16, making writes to the server asynchronous. For the EncoderPool, it uses only the default EncoderOptions. However, you can use an alternative constructor to fully customize everything.

// customize the Client
c, err := fluent.NewClient(fluentHost, &fluent.ClientOptions{
    Port: fluentPort,
    DialTimeout: 5 * time.Second,
    SkipEagerDial: true,
})
if err != nil {
    log.Fatal(err)
}

// customize the EncoderPool
p, err := fluent.NewEncoderPool(fluentTag, &fluent.EncoderOptions{
    UseCoarseTimestamps: true,
})
if err != nil {
    log.Fatal(err)
}
}

// customize the Handler
h := fluent.NewHandlerCustom(c, p, &fluent.HandlerOptions{
    AddSource: true,
    TimeFormat: time.RFC1123Z,
})

l := slog.New(h)
slog.SetDefault(l)
slog.Info("another message", slog.String("path", "/enlightenment"))

fluent.Handler

Constructors:

  • NewHandler(host, tag string, opts *HandlerOptions) (*Handler, error)
  • NewHandlerCustom(client Sink, pool *EncoderPool, opts *HandlerOptions) *Handler

Configuration options

  Option       Type           Default
  AddSource    bool           false
  TimeFormat   string         time.RFC3339Nano
  Level        slog.Leveler   slog.LevelInfo
  Verbose      bool           false

Passing log values through context.Context

A Handler can extract a slog.Attr from a context.Context. You can use a slog.Group to add multiple values.

// use the fluent.ContextKey
ctx := context.WithValue(context.Background(), fluent.ContextKey,
	slog.Group("req",
		slog.String("method", r.Method),
		slog.String("url", r.URL.String()),
	),
)

// log with context, resulting in a payload with the record field:
// {level:INFO,msg:success,req:{method:GET,url:www.example.com}}
slog.InfoContext(ctx, "success")

Graceful shutdown

The Shutdown method calls Client.Shutdown(). That immediately closes the send queue channel, so the caller must guarantee that no more calls to the Handler methods will occur.

Shutdown blocks until the send queue is drained and all workers have shut down.

// we: 
//   - are in a higher level graceful shutdown function
//   - used `slog.SetDefault` to ensure it was used globally

// create a new Handler that only logs locally to stdout
l := slog.New(slog.NewJSONHandler(os.Stdout, nil))

// atomically switch over to that logger, so that no subsequent
// logging calls will use the `Handler` instance
slog.SetDefault(l)

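// now it is safe to shut down the Handler instance's Client
//
// this blocks until either
//   (a) the write queue is completely drained, or
//   (b) the timeout expires (there is no limit with context.Background())
timeoutCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // 10s is only an example
defer cancel()
if err := h.Shutdown(timeoutCtx); err != nil {
    log.Println("fluent handler shutdown:", err)
}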

fluent.Client

Constructors:

  • NewClient(host string, opts *ClientOptions) (*Client, error)
  • NewClientContext(ctx context.Context, host string, opts *ClientOptions) (*Client, error)

Configuration options

  Option               Type            Default
  Port                 int             24224
  Network              string          tcp
  InsecureSkipVerify   bool            false
  DialTimeout          time.Duration   30 seconds
  SkipEagerDial        bool            false (connect eagerly in NewClient)
  MaxEagerDialTries    int             10
  Concurrency          int             1
  QueueDepth           int             0 (writes are synchronous)
  DropIfQueueFull      bool            false (blocks if queue is full)
  WriteTimeout         time.Duration   0 (no timeout)
  MaxWriteTries        int             3
  Verbose              bool            false

Concurrency

Use the concurrency settings to have the Client spin up multiple workers internally. Each worker maintains its own independent connection to the server, which provides thread safety with minimal locking. The default concurrency level is 1, so all logs are written out serially.
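The worker model described above can be sketched with the standard library. Several goroutines share one buffered channel, which plays the role of the send queue. Each goroutine writes only to its own connection, so those writes need no lock. In this sketch, `bytes.Buffer` values stand in for `net.Conn`, and `worker` and `runWorkers` are hypothetical names rather than this package's internals.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// worker drains the shared queue and writes to its own connection. A
// bytes.Buffer stands in for the worker's net.Conn; since no other goroutine
// touches it, the writes need no lock.
func worker(queue <-chan []byte, conn *bytes.Buffer, wg *sync.WaitGroup) {
	defer wg.Done()
	for payload := range queue {
		conn.Write(payload)
	}
}

// runWorkers starts n workers sharing one queue of the given depth, sends
// every payload, then closes the queue and waits for the workers to drain it.
func runWorkers(n, depth int, payloads [][]byte) []*bytes.Buffer {
	queue := make(chan []byte, depth)
	conns := make([]*bytes.Buffer, n)
	var wg sync.WaitGroup
	for i := range conns {
		conns[i] = new(bytes.Buffer)
		wg.Add(1)
		go worker(queue, conns[i], &wg)
	}
	for _, p := range payloads {
		queue <- p
	}
	close(queue)
	wg.Wait()
	return conns
}

func main() {
	conns := runWorkers(2, 16, [][]byte{[]byte("a"), []byte("b"), []byte("c")})
	for i, c := range conns {
		fmt.Printf("worker %d wrote %q\n", i, c.String())
	}
}
```

Because each payload goes to whichever worker receives it first, ordering across workers is not guaranteed. That is why a concurrency level of 1 is the one that keeps logs strictly serial.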

fluent.Encoder(Pool)

Constructors:

  • NewEncoderPool(tag string, opts *EncoderOptions) (*EncoderPool, error)
  • NewEncoder(bufferCap int) *Encoder

Configuration options

  Option                Type               Default
  Mode                  fluent.EventMode   MessageMode
  NewBufferCap          int                1KiB
  MaxBufferCap          int                8KiB
  UseCoarseTimestamps   bool               false
  RequestACKs           bool               false

Design Decisions, Tradeoffs, and Current Limitations

Not Implemented (yet):

  • Handshake messages
  • [Compressed][Packed]Forward event modes (and related options)
  • explicit ACKing

The current structures and interfaces were designed with Forward event mode and explicit ACK support in mind, so the path to implement them should be smooth.

Explicit ACKs

In the Option section, the protocol specification discusses the chunk option, stating:

"chunk: Clients MAY send the chunk option to confirm the server receives event records. The value is a string of Base64 representation of 128 bits unique_id which is an ID of a set of events." (emphasis added).

The chunk option is used for explicit ACKing. Whether it is intended to apply to the Message event mode is ambiguous. The quoted text refers to a "set of events", which relates to the other event modes, not the Message event mode, where each message includes only a single event. Additionally, prior to the Option section, the spec repeatedly and exclusively uses "chunk" to refer to a binary chunk of a MessagePackEventStream.

On the other hand, the Message specification includes an optional 4th "option" value in the msgpack array, chunk/ACK support is the only option that appears applicable to Message event mode, and other libraries have included ACK support with this mode.

JSON serialization

JSON serialization is not implemented, as it is less efficient and offers no functional advantage. The log forwarder and the tools used to review logs are separate concerns. The serialized key-value pairs should appear the same regardless of how they are serialized and transported.

Attr Rewrite Hook

We do not currently provide an Attr rewriting hook analogous to the ReplaceAttr hook in the standard library's HandlerOptions, which is used by the built-in TextHandler and JSONHandler. Omitting it provides only a minimal efficiency gain, and it means that Attr keys cannot be rewritten dynamically.

However, the main use case for Attr rewriting is to redact sensitive values or change the logged value, not the key. This functionality is better handled by wrapping the value in a type that implements LogValuer, as seen in the example https://pkg.go.dev/log/slog#example-LogValuer-Secret.

Protocol/Specification References

Documentation

Overview

Package fluent provides a full Fluent logging stack in Go, including:

  • `fluent.Handler` - serializes structured logs (implements `slog.Handler`)
  • `fluent.Client` - manages connections and writing to the Fluent server
  • `fluent.Encoder` - provides a common encoder/buffer, bridging the `Handler` and `Client`

The stack is optimized for efficiency, and the `Handler` and `Client` are coupled only via their shared use of the `Encoder`.

Examples of efficiency optimizations:

  • shared encoders/buffers
  • comprehensive use of resource pooling to minimize heap allocations
  • log preludes are encoded only once per pool, and copied into each `Encoder` in the pool only once, no matter how many times the Encoder is used
  • shared log attributes (`WithAttrs`) are encoded only once, no matter how many times they are used by the Handler
  • where map values have a length that can change, we overallocate a single byte for the msgpack map header, so that we perform neither look-ahead nor extra copying when the length changes (an example of this occurs when the key of a child group `Attr` is an empty string, causing its own `Attr`s to get serialized into the parent's scope)

Copyright (c) 2009 The Go Authors. All rights reserved.

Modifications (ignore zero time case):

  • original: tested as "omit zero time"
  • modified: tested as "ignore zero time [feel free to use a valid default]"
  • argument: most log protocols won't allow nil timestamps

Index

Constants

const (
	// MessageMode indicates the Fluent Message mode.
	MessageMode = iota

	// ForwardMode indicates the Fluent Forward mode.
	ForwardMode

	// PackedForwardMode indicates the Fluent PackedForward mode.
	PackedForwardMode

	// CompressedPackedForwardMode indicates the Fluent CompressedPackedForward mode.
	CompressedPackedForwardMode
)
const (
	TimeExtType = 0
	TimeLen     = 8
)

Variables

var ContextKey *ccKey = &ccKey{}

ContextKey is used to extract a log value from context.Context. The value must be a `slog.Attr`.

	Example:
	ctx = context.WithValue(ctx, fluent.ContextKey,
		slog.Group("req",
			slog.String("method", r.Method),
			slog.String("url", r.URL.String()),
		),
	)

These attrs are added to the top scope of the Fluent message record.

Functions

func InternalLogger

func InternalLogger() *log.Logger

InternalLogger returns the Logger used to write out internal logs, where logs get written when something goes wrong in the logging stack itself.

func SetInternalLogger

func SetInternalLogger(l *log.Logger)

SetInternalLogger makes l the internal logger. After this call, output from the log package's default Logger (as with log.Print, etc.) will be logged at LevelInfo using l's Handler.

func TestHandler

func TestHandler(h slog.Handler, results func() []map[string]any) error

TestHandler tests a slog.Handler. If TestHandler finds any misbehaviors, it returns an error for each, combined into a single error with errors.Join.

TestHandler installs the given Handler in a slog.Logger and makes several calls to the Logger's output methods.

The results function is invoked after all such calls. It should return a slice of map[string]any, one for each call to a Logger output method. The keys and values of the map should correspond to the keys and values of the Handler's output. Each group in the output should be represented as its own nested map[string]any. The standard keys slog.TimeKey, slog.LevelKey and slog.MessageKey should be used.

If the Handler outputs JSON, then calling encoding/json.Unmarshal with a `map[string]any` will create the right data structure.

If a Handler intentionally drops an attribute that is checked by a test, then the results function should check for its absence and add it to the map it returns.
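Judging by the copyright note in the overview, this function appears to be adapted from the standard library's `testing/slogtest.TestHandler` (Go 1.21+), and it has the same signature. The sketch below shows the calling pattern with the stdlib version and a `slog.JSONHandler`, parsing each output line into a `map[string]any`. For the fluent Handler, the results function would need to decode msgpack records instead, which is not shown here.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"log/slog"
	"testing/slogtest"
)

// checkJSONHandler runs the stdlib conformance suite against a JSONHandler,
// turning each line of output into one map[string]any.
func checkJSONHandler() error {
	var buf bytes.Buffer
	h := slog.NewJSONHandler(&buf, nil)
	results := func() []map[string]any {
		var ms []map[string]any
		for _, line := range bytes.Split(buf.Bytes(), []byte{'\n'}) {
			if len(line) == 0 {
				continue
			}
			var m map[string]any
			if err := json.Unmarshal(line, &m); err != nil {
				panic(err)
			}
			ms = append(ms, m)
		}
		return ms
	}
	return slogtest.TestHandler(h, results)
}

func main() {
	if err := checkJSONHandler(); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("JSONHandler passed")
}
```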

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client represents a Fluent client. If using multiple concurrent client workers, the Client can be considered a Fluent client pool, as each worker maintains an independent connection to the server.

func NewClient

func NewClient(host string, opts *ClientOptions) (*Client, error)

NewClient creates a new Fluent client and connects to the Fluent server immediately, returning an error if it is unable to establish the connection.

func NewClientContext

func NewClientContext(ctx context.Context, host string, opts *ClientOptions) (*Client, error)

NewClientContext creates a new Fluent client and connects to the Fluent server immediately, returning an error if it is unable to establish the initial connections. The Context is passed to `Connect()` and can be used to cancel the `Connect` operation, or to set a global deadline for connecting.

func (*Client) Send

func (c *Client) Send(enc *Encoder)

Send places the log payload Encoder into the write queue.

This operation is synchronous (blocking) when:

  • QueueDepth is 0, or
  • the queue is full and DropIfQueueFull is false

This operation is asynchronous (non-blocking) when:

  • QueueDepth > 0, and
  • the queue is not full, or DropIfQueueFull is true

The payload should NOT include the Fluent `option` field, which the Client is responsible for adding if necessary.
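The blocking rules above map naturally onto Go channel operations. A plain send blocks. A `select` with a `default` case sheds load when a buffered queue is full. The sketch below is a hypothetical illustration of those semantics, not the Client's actual implementation, and `send` and its parameters are invented names.

```go
package main

import "fmt"

// send mirrors the rules above. With an unbuffered queue (QueueDepth 0), or
// when dropping is disabled, it blocks until the payload is accepted.
// Otherwise a full queue sheds the payload and send returns false.
func send(queue chan<- []byte, payload []byte, dropIfFull bool) bool {
	if cap(queue) == 0 || !dropIfFull {
		queue <- payload
		return true
	}
	select {
	case queue <- payload:
		return true
	default:
		return false // queue full: the payload is dropped
	}
}

func main() {
	queue := make(chan []byte, 2) // QueueDepth 2, with no worker draining it
	for i := 0; i < 3; i++ {
		fmt.Println(send(queue, []byte{byte(i)}, true)) // true, true, then false
	}
}
```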

func (*Client) Shutdown

func (c *Client) Shutdown(ctx context.Context) error

Shutdown is used to support graceful shutdown. It closes the write queue channel, so any further calls to Send* methods will panic. Shutdown blocks until the write queue is fully drained and all worker goroutines have stopped, or the context expires, whichever occurs first.

type ClientOptions

type ClientOptions struct {

	// Network protocol used to communicate with the server. Fluent protocol
	// says "protocol [enum: tcp/udp/tls]". The default is "tcp".
	//   ref: https://docs.fluent.org/configuration/transport-section
	Network string

	// Port of the Fluent server. The default is 24224.
	Port int

	// DialTimeout sets the timeout for dialing the server. The default is 30s.
	DialTimeout time.Duration

	// MaxEagerDialTries limits the number of times client workers will try to
	// establish an initial connection to the server before the Client is
	// returned from the constructor. This is not used if `SkipEagerDial` is
	// true, or for (re)connections that occur after the constructor returns. If
	// the value is < 0, the constructor will not return until connections are
	// successfully established. The default is 10.
	MaxEagerDialTries int

	// Concurrency controls the number of workers the Client will spin up. Each
	// worker will independently pull messages from the write queue, and send
	// messages to the server over its own connection. The default is 1.
	Concurrency int

	// QueueDepth sets the maximum number of write requests that can be buffered
	// before writing to the server is blocked. If blocked and DropIfQueueFull
	// is true, load shedding will occur, with later writes discarded until
	// buffer space increases. The default depth is 0 (synchronous writes).
	QueueDepth int

	// WriteTimeout controls the timeout for each Write to the server. If
	// WriteTimeout < 0, then no timeout will be set. The default is 10 seconds.
	WriteTimeout time.Duration

	// MaxWriteTries controls the number of times the net.Conn will try to send
	// a message before inferring a broken pipe, tearing down the connection,
	// and establishing a new one. This must be > 0. The default is 3.
	MaxWriteTries int

	// InsecureSkipVerify controls whether a client verifies the server's
	// certificate chain and host name when using TLS.
	InsecureSkipVerify bool

	// SkipEagerDial enables returning clients that dial the server lazily.
	SkipEagerDial bool

	// DropIfQueueFull controls how write requests are handled when the
	// writeQueue is full. The default is to block the log handler until the
	// queue channel can receive the log message. With this option enabled,
	// overflow requests will get dropped to the floor. This enables a tradeoff
	// between log completeness and system performance predictability.
	DropIfQueueFull bool

	// Verbose controls whether debug logs are written to the internal logger.
	Verbose bool
}

ClientOptions are used to customize the Fluent Client.

Invalid options are coerced to valid values.

NB: The struct pointer options approach is used to be consistent with the options used for the Handler, which uses the struct pointer approach to be consistent with the `HandlerOptions` used by log/slog.

func DefaultClientOptions

func DefaultClientOptions() *ClientOptions

DefaultClientOptions returns *ClientOptions with all default values.

type Encoder

type Encoder struct {
	*bytes.Buffer
	*msgpack.Encoder
	// contains filtered or unexported fields
}

Encoder provides a msgpack encoder and its underlying bytes.Buffer.

func NewEncoder

func NewEncoder(bufferCap int) *Encoder

NewEncoder returns a newly allocated Encoder.

func (*Encoder) DeepCopy

func (e *Encoder) DeepCopy() *Encoder

DeepCopy returns a deep copy of the Encoder.

func (*Encoder) EncodeEventTime

func (e *Encoder) EncodeEventTime(utc time.Time) error

EncodeEventTime is a helper that, by default, encodes a time value as the custom msgpack type defined by Fluent (EventTime). If the Encoder is set to use coarse timestamps, it instead encodes the time value as a 64-bit integer of seconds since the Unix epoch.

func (*Encoder) Free

func (e *Encoder) Free()

Free returns the encoder to the shared pool after eagerly resetting it.

func (*Encoder) Mode

func (e *Encoder) Mode() eventMode

Mode returns the Fluent event mode of the Encoder.

func (*Encoder) RequestACK

func (e *Encoder) RequestACK() bool

RequestACK reports whether explicit ACKs are requested from the server.

func (*Encoder) UseCoarseTimestamps

func (e *Encoder) UseCoarseTimestamps() bool

UseCoarseTimestamps reports whether legacy (Unix epoch) timestamps are used.

type EncoderOptions

type EncoderOptions struct {
	// Mode is the Fluent event mode (also referred to as the carrier mode in
	// the protocol spec) that applies to all of the logs serialized using
	// Encoders from one shared EncoderPool.
	Mode eventMode

	// NewBufferCap sets the capacity, in bytes, for newly created Encoder
	// buffers. The minimum value is 64 bytes. The default is 1KiB (1<<10).
	NewBufferCap int

	// MaxBufferCap sets the maximum buffer capacity, in bytes, beyond which an
	// Encoder will not be returned to the shared Encoder pool, to prevent rare,
	// unusually large buffers from staying resident in memory. The minimum
	// value is `NewBufferCap`. The default is 8KiB (1<<13).
	MaxBufferCap int

	// UseCoarseTimestamps controls whether the log time is serialized as Unix
	// epoch, which is useful for pre-2016 legacy systems that do not support
	// sub-second precision. The default is false, so timestamps are serialized
	// as Fluent `EventTime` values.
	UseCoarseTimestamps bool

	// RequestACKs controls whether the logs include a ("chunk") request for
	// the Fluent server to send back explicit ACKs.
	RequestACKs bool
}

EncoderOptions are used to customize the Encoders and the Encoder pool.

NB: The struct pointer options approach is used to be consistent with the options used for the Handler, which uses the struct pointer approach to be consistent with the `HandlerOptions` used by log/slog.

func DefaultEncoderOptions

func DefaultEncoderOptions() *EncoderOptions

DefaultEncoderOptions returns *EncoderOptions with all default values.

type EncoderPool

type EncoderPool struct {
	*EncoderOptions
	// contains filtered or unexported fields
}

EncoderPool defines a shared *Encoder pool, used to minimize heap allocations.

func NewEncoderPool

func NewEncoderPool(tag string, opts *EncoderOptions) (*EncoderPool, error)

NewEncoderPool creates a shared *Encoder pool that returns Encoders with the log prelude, including the outer msgpack array and the tag, pre-encoded.

func (*EncoderPool) Get

func (p *EncoderPool) Get() *Encoder

Get returns an Encoder with the prelude pre-rendered.

func (*EncoderPool) Put

func (p *EncoderPool) Put(e *Encoder)

Put resets an Encoder and returns it to the shared pool.

type EventTime

type EventTime time.Time

func (*EventTime) DecodeMsgpack

func (t *EventTime) DecodeMsgpack(dec *msgpack.Decoder) error

DecodeMsgpack deserializes *Time values from the custom msgpack format defined in the Fluent protocol specification.

func (*EventTime) EncodeMsgpack

func (t *EventTime) EncodeMsgpack(enc *msgpack.Encoder) error

EncodeMsgpack serializes *Time values to the custom msgpack format defined in the Fluent protocol specification.

type Handler

type Handler struct {
	*HandlerOptions
	// contains filtered or unexported fields
}

Handler is an adapter that serializes Go structured logs out to Fluent msgpack arrays, without first serializing them to intermediate data structures, such as map[string]any.

// Example of basic usage
h, err := fluent.NewHandler(fluentHost, fluentTag, nil)
if err != nil {
    log.Fatalln(err)
}
}

logger := slog.New(h)
slog.SetDefault(logger)

slog.Info("unrecognized user", "user_id", user_id)

func NewHandler

func NewHandler(host, tag string, opts *HandlerOptions) (*Handler, error)

NewHandler creates a Handler that uses an EncoderPool with default options and a Client with default options, except that Concurrency is set to 2 and QueueDepth is set to 16, for asynchronous sending.

For complete control over the `fluent.Client` and the encoding options used by the `fluent.Encoder`s, use the `NewHandlerCustom` constructor.

func NewHandlerCustom

func NewHandlerCustom(client Sink, pool *EncoderPool, opts *HandlerOptions) *Handler

NewHandlerCustom creates a Handler that wraps a Client (any Sink) and an EncoderPool, both of which are fully customizable by the caller.

func (*Handler) Enabled

func (h *Handler) Enabled(_ context.Context, level slog.Level) bool

Enabled reports whether the handler handles records at the given level. The handler ignores records whose level is lower. It is called early, before any arguments are processed, to save effort if the log event should be discarded. If called from a Logger method, the first argument is the context passed to that method, or context.Background() if nil was passed or the method does not take a context. The context is passed so Enabled can use its values to make a decision.

func (*Handler) Handle

func (h *Handler) Handle(ctx context.Context, r slog.Record) error

Handle handles the Record. It will only be called when Enabled returns true. The Context argument is as for Enabled. It is present solely to provide Handlers access to the context's values. Canceling the context does not affect record processing.

Handle methods that produce output should observe the following rules:

  • If r.Time is the zero time, ignore the time.
  • If r.PC is zero, ignore it.
  • Attr's values should be resolved.
  • If an Attr's key and value are both the zero value, ignore the Attr.
  • If a group's key is empty, inline the group's Attrs.
  • If a group has no Attrs (even if it has a non-empty key), ignore it.

Additional rules are taken from test cases in `slogtest.go`. Handler passes all conformance tests in `testing/slogtest.go` except one. For "If r.Time is the zero time, ignore the time.", the test suite interprets "ignore" as "omit the time entirely." This is not acceptable in most logging protocols, including Fluent. Rather than interpreting "ignore" as "omit", Handler *ignores* the zero value and uses time.Now() as a reasonable fallback. The modified test suite is included in `slogtest.go`.

func (*Handler) Shutdown

func (h *Handler) Shutdown(ctx context.Context) error

Shutdown closes the writeQueue used to connect the logger to the Fluent client workers. You MUST NOT call any other logger methods after calling Shutdown. This method will block until writeQueue is fully drained.

func (*Handler) WithAttrs

func (h *Handler) WithAttrs(attrs []slog.Attr) slog.Handler

WithAttrs returns a new Handler whose attributes consist of both the receiver's attributes and the arguments. The Handler owns the slice: it may retain, modify or discard it.

func (*Handler) WithGroup

func (h *Handler) WithGroup(name string) slog.Handler

WithGroup returns a new Handler with the given group appended to the receiver's existing groups, increasing the nesting level within the msgpack map serialization of the Fluent log record.

The new scope ends at the end of the log event. That is,

logger.WithGroup("s").LogAttrs(ctx, level, msg, slog.Int("a", 1), slog.Int("b", 2))

behaves like

logger.LogAttrs(ctx, level, msg, slog.Group("s", slog.Int("a", 1), slog.Int("b", 2)))

If the name is empty, WithGroup returns the receiver, which results in the nested attributes being inlined into the parent scope.
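The equivalence above is part of the general `slog.Handler` contract, so it can be checked with the standard `slog.JSONHandler`. The sketch renders both forms with the time attribute dropped, to keep the output deterministic, and compares them. It assumes that the fluent Handler, which documents the same rule, would produce matching msgpack records. `render` and `compareGroupForms` are hypothetical helpers.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"log/slog"
)

// render runs f against a JSONHandler that drops the time attribute, so the
// output is deterministic, and returns what was written.
func render(f func(l *slog.Logger)) string {
	var buf bytes.Buffer
	opts := &slog.HandlerOptions{
		ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
			if len(groups) == 0 && a.Key == slog.TimeKey {
				return slog.Attr{} // a zero Attr is discarded
			}
			return a
		},
	}
	f(slog.New(slog.NewJSONHandler(&buf, opts)))
	return buf.String()
}

// compareGroupForms logs the same attrs via WithGroup and via slog.Group.
func compareGroupForms() (string, string) {
	ctx := context.Background()
	viaWithGroup := render(func(l *slog.Logger) {
		l.WithGroup("s").LogAttrs(ctx, slog.LevelInfo, "msg", slog.Int("a", 1), slog.Int("b", 2))
	})
	viaGroup := render(func(l *slog.Logger) {
		l.LogAttrs(ctx, slog.LevelInfo, "msg", slog.Group("s", slog.Int("a", 1), slog.Int("b", 2)))
	})
	return viaWithGroup, viaGroup
}

func main() {
	a, b := compareGroupForms()
	fmt.Print(a, b)
	fmt.Println("identical:", a == b)
}
```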

type HandlerOptions

type HandlerOptions struct {

	// Level reports the minimum record level that will be logged. The handler
	// discards records with lower levels. If Level is nil, the handler assumes
	// LevelInfo. The handler calls Level.Level for each record processed; to
	// adjust the minimum level dynamically, use a LevelVar.
	Level slog.Leveler

	// TimeFormat controls how time values inside the log content are
	// serialized. This does not change the timestamp of the log itself, in the
	// metadata, which is defined by the Fluent protocol specification. The
	// default is time.RFC3339Nano.
	TimeFormat string

	// AddSource causes the handler to compute the source code position of the
	// log statement and add a SourceKey attribute to the output.
	AddSource bool

	// Verbose controls whether debug logs are written to the internal logger.
	Verbose bool
}

HandlerOptions are used to customize the Fluent slog.Handler.

NB: The struct pointer options approach is used to be consistent with the approach used in the standard library for `HandlerOptions`.
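The Level comment notes that the level is consulted for each record and that a LevelVar allows dynamic adjustment. The sketch below demonstrates that with the standard `slog.TextHandler`, whose `HandlerOptions.Level` has the same `slog.Leveler` type. It assumes, without verifying, that passing the same `*slog.LevelVar` as `fluent.HandlerOptions.Level` behaves identically. `debugEnabledBeforeAfter` is a hypothetical helper.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"log/slog"
)

// debugEnabledBeforeAfter reports whether Debug records are enabled before
// and after lowering a shared LevelVar, without rebuilding the handler.
func debugEnabledBeforeAfter() (bool, bool) {
	var level slog.LevelVar // the zero value is slog.LevelInfo
	h := slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: &level})
	ctx := context.Background()
	before := h.Enabled(ctx, slog.LevelDebug)
	level.Set(slog.LevelDebug)
	after := h.Enabled(ctx, slog.LevelDebug)
	return before, after
}

func main() {
	before, after := debugEnabledBeforeAfter()
	fmt.Println(before, after) // false true
}
```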

func DefaultHandlerOptions

func DefaultHandlerOptions() *HandlerOptions

DefaultHandlerOptions returns *HandlerOptions with all default values.

type Sink

type Sink interface {
	Send(*Encoder)
	Shutdown(context.Context) error
}

Sink interface defines the Client API.
