Documentation
¶
Index ¶
- func AppendRecord(topic string, partition uint32, recordBytes []byte) error
- func CleanupSegments() error
- func CreateTopic(name string, numPartitions uint32) error
- func EnsurePartition(topicName string, partitionIndex uint32) error
- func FlushDataToDisk()
- func GetPartitionDir(topic string, partition uint32) string
- func GetRecordBatch(offset uint64, topic string, partition uint32) ([]byte, error)
- func LoadSegments(partitionDir string) ([]*types.Segment, error)
- func LoadTopicsState() (types.TopicsState, error)
- func NewRecordBatch(recordKey []byte, recordValue []byte, attributes uint16) types.RecordBatch
- func NewSegment(p *types.Partition) (*types.Segment, error)
- func ReadRecordBatch(b []byte) types.RecordBatch
- func ReadRecords(rb types.RecordBatch) []types.Record
- func Shutdown()
- func Startup(Config *types.Configuration, shutdown chan bool)
- func SyncPartition(partition *types.Partition) error
- func WriteRecordBatch(rb types.RecordBatch) []byte
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func AppendRecord ¶
AppendRecord appends a record to the active segment of the specified partition.
func CleanupSegments ¶
func CleanupSegments() error
CleanupSegments takes care of segment retention based on time and size
func CreateTopic ¶
CreateTopic creates a new topic with the given number of partitions and initializes them.
func EnsurePartition ¶
EnsurePartition creates a partition if it doesn't exist
func FlushDataToDisk ¶
func FlushDataToDisk()
FlushDataToDisk synchronously flushes all partition data to disk by syncing their log and index files.
func GetPartitionDir ¶
GetPartitionDir returns the directory path for the log of a specific partition.
func GetRecordBatch ¶
GetRecordBatch retrieves the RecordBatch bytes corresponding to the specified offset in the partition.
func LoadSegments ¶
LoadSegments loads segments with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file.
func LoadTopicsState ¶
func LoadTopicsState() (types.TopicsState, error)
LoadTopicsState loads the state of all topics from disk.
func NewRecordBatch ¶
func NewRecordBatch(recordKey []byte, recordValue []byte, attributes uint16) types.RecordBatch
NewRecordBatch creates a RecordBatch given the key and value bytes TODO: handle multi records batches and compression
func NewSegment ¶
NewSegment create a new segment for the given partition
func ReadRecordBatch ¶
func ReadRecordBatch(b []byte) types.RecordBatch
ReadRecordBatch turns bytes into a RecordBatch struct
func ReadRecords ¶
func ReadRecords(rb types.RecordBatch) []types.Record
ReadRecords transforms RecordBatch Record bytes into a slice of Record struct
func Shutdown ¶
func Shutdown()
Shutdown gracefully shuts down the log management system and closes all segment files.
func Startup ¶
func Startup(Config *types.Configuration, shutdown chan bool)
Startup initializes log management system
func SyncPartition ¶
SyncPartition syncs the log and index files of the active segment for the given partition.
func WriteRecordBatch ¶
func WriteRecordBatch(rb types.RecordBatch) []byte
WriteRecordBatch encodes a record batch into bytes
Types ¶
This section is empty.