storage

package
v0.0.0-...-089e9e6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2025 License: MIT Imports: 16 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AppendRecord

func AppendRecord(topic string, partition uint32, recordBytes []byte) error

AppendRecord appends a record to the active segment of the specified partition.

func CleanupSegments

func CleanupSegments() error

CleanupSegments takes care of segment retention based on time and size

func CreateTopic

func CreateTopic(name string, numPartitions uint32) error

CreateTopic creates a new topic with the given number of partitions and initializes them.

func EnsurePartition

func EnsurePartition(topicName string, partitionIndex uint32) error

EnsurePartition creates a partition if it doesn't exist

func FlushDataToDisk

func FlushDataToDisk()

FlushDataToDisk synchronously flushes all partition data to disk by syncing their log and index files.

func GetPartitionDir

func GetPartitionDir(topic string, partition uint32) string

GetPartitionDir returns the directory path for the log of a specific partition.

func GetRecordBatch

func GetRecordBatch(offset uint64, topic string, partition uint32) ([]byte, error)

GetRecordBatch retrieves the RecordBatch bytes corresponding to the specified offset in the partition.

func LoadSegments

func LoadSegments(partitionDir string) ([]*types.Segment, error)

LoadSegments loads segments with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file.

func LoadTopicsState

func LoadTopicsState() (types.TopicsState, error)

LoadTopicsState loads the state of all topics from disk.

func NewRecordBatch

func NewRecordBatch(recordKey []byte, recordValue []byte, attributes uint16) types.RecordBatch

NewRecordBatch creates a RecordBatch given the key and value bytes TODO: handle multi records batches and compression

func NewSegment

func NewSegment(p *types.Partition) (*types.Segment, error)

NewSegment create a new segment for the given partition

func ReadRecordBatch

func ReadRecordBatch(b []byte) types.RecordBatch

ReadRecordBatch turns bytes into a RecordBatch struct

func ReadRecords

func ReadRecords(rb types.RecordBatch) []types.Record

ReadRecords transforms RecordBatch Record bytes into a slice of Record struct

func Shutdown

func Shutdown()

Shutdown gracefully shuts down the log management system and closes all segment files.

func Startup

func Startup(Config *types.Configuration, shutdown chan bool)

Startup initializes log management system

func SyncPartition

func SyncPartition(partition *types.Partition) error

SyncPartition syncs the log and index files of the active segment for the given partition.

func WriteRecordBatch

func WriteRecordBatch(rb types.RecordBatch) []byte

WriteRecordBatch encodes a record batch into bytes

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL