Documentation
¶
Index ¶
- Constants
- func MergeDataStructure(fileNames []string, topic *TopicDataId, offset int64, ...) error
- func NewEmptyChunk(start int64) SegmentChunk
- func ReadFileStructure(topicId *TopicDataId, offset int64, config conf.DatalogConfig) ([]string, error)
- type Datalog
- type LocalWriteItem
- type ReadItem
- type ReadSegmentChunk
- type ReplicationDataItem
- type ReplicationReader
- type SegmentReader
- type SegmentWriter
Constants ¶
Variables ¶
This section is empty.
Functions ¶
func MergeDataStructure ¶
func MergeDataStructure(fileNames []string, topic *TopicDataId, offset int64, config conf.DatalogConfig) error
func NewEmptyChunk ¶
func NewEmptyChunk(start int64) SegmentChunk
func ReadFileStructure ¶
func ReadFileStructure(topicId *TopicDataId, offset int64, config conf.DatalogConfig) ([]string, error)
Reads segment file names that will contain the data starting from offset
Types ¶
type Datalog ¶
type Datalog interface {
Initializer
// Seeks the position and fills the buffer with chunks until maxSize or maxRecords is reached.
// Opens and close the file handle. It may issue several reads to reach to the position.
ReadFileFrom(
buf []byte,
maxSize int,
segmentId int64,
startOffset int64,
maxRecords int,
topic *TopicDataId,
) ([]byte, error)
// Blocks until there's an available buffer to be used to stream.
// After use, it should be released
StreamBuffer() []byte
// Releases the stream buffer
ReleaseStreamBuffer(buf []byte)
// Gets the max producer offset from local.
// Returns an error when not found.
ReadProducerOffset(topicId *TopicDataId) (int64, error)
// Gets a sorted list of offsets representing the name of the segment files, where the offset is less than maxOffset
SegmentFileList(topic *TopicDataId, maxOffset int64) ([]int64, error)
}
func NewDatalog ¶
func NewDatalog(config conf.DatalogConfig) Datalog
type LocalWriteItem ¶
type LocalWriteItem interface {
SegmentChunk
Replication() ReplicationInfo
SetResult(error)
}
type ReadItem ¶
type ReadItem interface {
Origin() string // An identifier of the source of the poll used to determine whether the reader should use the last stored offset and not auto commit
CommitOnly() bool // Determines whether it should only commit and not read as part of this request
SetResult(error, SegmentChunk)
}
Represents a queued message to read from a segment. When the read is completed, `SetResult()` is invoked.
type ReadSegmentChunk ¶
type ReadSegmentChunk struct {
Buffer []byte
Start int64 // The offset of the first message
Length uint32 // The amount of messages in the chunk
}
func (*ReadSegmentChunk) DataBlock ¶
func (s *ReadSegmentChunk) DataBlock() []byte
func (*ReadSegmentChunk) RecordLength ¶
func (s *ReadSegmentChunk) RecordLength() uint32
func (*ReadSegmentChunk) StartOffset ¶
func (s *ReadSegmentChunk) StartOffset() int64
type ReplicationDataItem ¶
type ReplicationReader ¶
type ReplicationReader interface {
MergeFileStructure() (bool, error) // Merge the index files content and file structures
// Reads at least a chunk from a replica and returns the amount of bytes written in the buffer
StreamFile(
segmentId int64,
topic *TopicDataId,
startOffset int64,
maxRecords int,
buf []byte) (int, error)
}
type SegmentReader ¶
type SegmentReader struct {
Items chan ReadItem
Topic TopicDataId
TopicRangeClusterSize int
SourceVersion GenId // The version in which this reader was created, a consumer might be on Gen=v3 but the current is v4. In this case, source would be v4 and topic.Version = v3
MaxProducedOffset *int64 // When set, it determines the last offset produced for this topicId for an old generation, inclusive
// contains filtered or unexported fields
}
func NewSegmentReader ¶
func NewSegmentReader( group string, isLeader bool, replicationReader ReplicationReader, topic TopicDataId, topicRangeClusterSize int, sourceVersion GenId, initialOffset int64, offsetState OffsetState, maxProducedOffset *int64, datalog Datalog, config conf.DatalogConfig, ) (*SegmentReader, error)
Returns a log file reader.
The segment reader instance is valid for a single generation, closed when the generation ends or the broker is no longer the leader.
It aggressively reads ahead and maintains local cache, so there should there should be a different reader instance per consumer group.
func (*SegmentReader) HasStoppedReceiving ¶
func (s *SegmentReader) HasStoppedReceiving() bool
Determines that the reader has stopped polling the channel, no further ReadItems will be processed.
It signals that either the current offset info changed in a way that generations don't match, the during an offset
func (*SegmentReader) StoredOffsetAsCompleted ¶
func (s *SegmentReader) StoredOffsetAsCompleted() bool
Returns true when the offset state has been set as completed (previous generations only)
type SegmentWriter ¶
type SegmentWriter struct {
Items chan SegmentChunk
Topic TopicDataId
// contains filtered or unexported fields
}
SegmentWriter contains the logic to write segments on disk and replicate them.
There should be an instance per topic+token+generation. When the generation changes for a token, the channel should be closed.
func NewSegmentWriter ¶
func NewSegmentWriter( topic TopicDataId, gossiper Replicator, config conf.DatalogConfig, segmentId *int64, ) (*SegmentWriter, error)