Documentation
¶
Overview ¶
Package topic is a client library for topic descriptions and partitioned writes.
Index ¶
- Constants
- Variables
- func EnumeratePartitions(name string, partitions int) func() []journal.Name
- func ModuloPartitionMapping(partitions func() []journal.Name, routingKey func(Message, []byte) []byte) func(Message) journal.Name
- func UnpackLine(r *bufio.Reader) ([]byte, error)
- type Description
- type Envelope
- type Fixupable
- type Framing
- type MemoryWriter
- type Message
- type Partition
- type Publisher
Constants ¶
const FixedFrameHeaderLength = 8
Variables ¶
var ( // Error returned by Unmarshal upon detection of an invalid frame. ErrDesyncDetected = errors.New("detected de-synchronization") )
var FixedFraming = new(fixedFraming)
FixedFraming is a Framing implementation which encodes messages in a binary format with a fixed-length header. Messages must support Size and MarshalTo functions for marshal support (eg, generated Protobuf messages satisfy this interface). Messages are encoded as a 4-byte magic word for de-synchronization detection, followed by a little-endian uint32 length, followed by payload bytes.
var JsonFraming = new(jsonFraming)
JsonFraming is a Framing implementation which encodes messages as line- delimited JSON. Messages must be encode-able by the encoding/json package.
Functions ¶
func EnumeratePartitions ¶
EnumeratePartitions returns a closure suitable for use as Description.Partitions, which defines |partitions| sorted partitions prefixed with |name| and having a "part-123" suffix.
func ModuloPartitionMapping ¶
func ModuloPartitionMapping(partitions func() []journal.Name, routingKey func(Message, []byte) []byte) func(Message) journal.Name
ModuloPartitionMapping returns a closure which maps a Message into a stable member of |partitions| using modulo arithmetic. It requires a |routingKey| function, which extracts and encodes a key from Message, returning the result of appending it to the argument []byte.
Types ¶
type Description ¶
type Description struct {
// Name of the Topic. Topics are typically named and arranged in a directory
// structure using forward-slash delimiters, and the topic Name is a prefix
// for all its Partition names. This is a convention only and is not enforced,
// and some use cases may motivate exceptions to the rule.
Name string
// Partitions returns Journal partitions of this Topic. The returned Journals
// may be held constant, or may change across invocations (eg, in response to
// a dynamic topic watch). Partitions is called frequently, and care should
// be taken to avoid excessive allocation. However, a returned slice may not
// be mutated (shallow equality must be sufficient to detect changes). The
// returned Journals may be empty.
Partitions func() []journal.Name `json:"-"`
// MappedPartition returns the journal.Name which Message maps to, under
// Topic routing. It is not required that the returned Journal be a member
// of Partitions: MappedPartition may be used to implicitly create new
// Journals as required.
MappedPartition func(Message) journal.Name `json:"-"`
// Builds or obtains a zero-valued instance of the topic message type.
GetMessage func() Message `json:"-"`
// If non-nil, returns a used instance of the message type. This is
// typically used for pooling of message instances.
PutMessage func(Message) `json:"-"`
// Serialization used for Topic messages.
Framing
}
Description details the required properties of a Topic implementation.
type Envelope ¶
type Envelope struct {
// Topic of the message.
Topic *Description
// Journal & offset of the message.
journal.Mark
// Message value.
Message
}
Envelope combines a Message with its topic and specific journal Mark.
type Fixupable ¶
type Fixupable interface {
Fixup() error
}
Fixupable is an optional Message type capable of being "fixed up" after decoding. This provides an opportunity to apply migrations or initialization after a code-generated decode implementation has completed.
type Framing ¶
type Framing interface {
// Encode appends the serialization of |msg| onto buffer |b|,
// returning the resulting buffer.
Encode(msg Message, b []byte) ([]byte, error)
// Unpack reads and returns a complete framed message from the Reader,
// including any applicable message header or suffix. It returns an error of
// the underlying Reader, or of a framing corruption. The returned []byte may
// be invalidated by a subsequent use of the Reader or Extract call.
Unpack(*bufio.Reader) ([]byte, error)
// Unmarshals Message from the supplied frame, previously produced by Extract.
// It returns a Message-level decoding error, which does not invalidate the
// framing or Reader (eg, further frames may be extracted).
Unmarshal([]byte, Message) error
}
Framing specifies the serialization used to encode Messages within a topic.
type MemoryWriter ¶
type MemoryWriter struct {
Messages []Envelope
// contains filtered or unexported fields
}
MemoryWriter is an implementation of journal.Writer which uses a provided Framing and message initializer to decode and capture messages as they are written. The intended use is within unit tests which publish and subsequently verify expected messages.
func NewMemoryWriter ¶
func NewMemoryWriter(framing Framing, new func() Message) *MemoryWriter
func (*MemoryWriter) ReadFrom ¶
func (w *MemoryWriter) ReadFrom(j journal.Name, r io.Reader) (*journal.AsyncAppend, error)
func (*MemoryWriter) Write ¶
func (w *MemoryWriter) Write(j journal.Name, b []byte) (*journal.AsyncAppend, error)
type Partition ¶
type Partition struct {
Topic *Description
Journal journal.Name
}
Partition pairs a Gazette Journal with the topic it implements.
type Publisher ¶
A Publisher publishes Messages to a Topic.
func NewPublisher ¶
func (Publisher) Publish ¶
func (p Publisher) Publish(msg Message, to *Description) (*journal.AsyncAppend, error)
Publish frames |msg|, routes it to the appropriate Topic partition, and writes the resulting encoding. If |msg| implements `Validate() error`, the message is Validated prior to framing, and any validation error returned.