Versions in this module Expand all Collapse all v1 v1.5.0 Jan 30, 2026 v1.4.2 Jan 19, 2026 Changes in this version + var ErrOffsetOutOfRange = errors.New("offset out of range") + func CountRecordBatchMessages(recordSet []byte) int + func PatchRecordBatchBaseOffset(batch *RecordBatch, baseOffset int64) + type AppendResult struct + BaseOffset int64 + LastOffset int64 + type ByteRange struct + End int64 + Start int64 + type IndexBuilder struct + func NewIndexBuilder(interval int32) *IndexBuilder + func (b *IndexBuilder) BuildBytes() ([]byte, error) + func (b *IndexBuilder) Entries() []*IndexEntry + func (b *IndexBuilder) MaybeAdd(offset int64, position int32, batchMessages int32) + type IndexEntry struct + Offset int64 + Position int32 + func ParseIndex(data []byte) ([]*IndexEntry, error) + type MemoryS3Client struct + func NewMemoryS3Client() *MemoryS3Client + func (m *MemoryS3Client) DownloadIndex(ctx context.Context, key string) ([]byte, error) + func (m *MemoryS3Client) DownloadSegment(ctx context.Context, key string, rng *ByteRange) ([]byte, error) + func (m *MemoryS3Client) EnsureBucket(ctx context.Context) error + func (m *MemoryS3Client) ListSegments(ctx context.Context, prefix string) ([]S3Object, error) + func (m *MemoryS3Client) UploadIndex(ctx context.Context, key string, body []byte) error + func (m *MemoryS3Client) UploadSegment(ctx context.Context, key string, body []byte) error + type PartitionLog struct + func NewPartitionLog(namespace string, topic string, partition int32, startOffset int64, ...) *PartitionLog + func (l *PartitionLog) AppendBatch(ctx context.Context, batch RecordBatch) (*AppendResult, error) + func (l *PartitionLog) EarliestOffset() int64 + func (l *PartitionLog) Flush(ctx context.Context) error + func (l *PartitionLog) Read(ctx context.Context, offset int64, maxBytes int32) ([]byte, error) + func (l *PartitionLog) RestoreFromS3(ctx context.Context) (int64, error) + type PartitionLogConfig struct + Buffer WriteBufferConfig + CacheEnabled bool + ReadAheadSegments int + Segment SegmentWriterConfig + type RecordBatch struct + BaseOffset int64 + Bytes []byte + LastOffsetDelta int32 + MessageCount int32 + func NewRecordBatchFromBytes(data []byte) (RecordBatch, error) + type S3Client interface + DownloadIndex func(ctx context.Context, key string) ([]byte, error) + DownloadSegment func(ctx context.Context, key string, rng *ByteRange) ([]byte, error) + EnsureBucket func(ctx context.Context) error + ListSegments func(ctx context.Context, prefix string) ([]S3Object, error) + UploadIndex func(ctx context.Context, key string, body []byte) error + UploadSegment func(ctx context.Context, key string, body []byte) error + func NewS3Client(ctx context.Context, cfg S3Config) (S3Client, error) + type S3Config struct + AccessKeyID string + Bucket string + Endpoint string + ForcePathStyle bool + KMSKeyARN string + Region string + SecretAccessKey string + SessionToken string + type S3Object struct + Key string + Size int64 + type SegmentArtifact struct + BaseOffset int64 + CreatedAt time.Time + IndexBytes []byte + LastOffset int64 + MessageCount int32 + RelativeIndex []*IndexEntry + SegmentBytes []byte + func BuildSegment(cfg SegmentWriterConfig, batches []RecordBatch, created time.Time) (*SegmentArtifact, error) + type SegmentWriterConfig struct + IndexIntervalMessages int32 + type WriteBuffer struct + func NewWriteBuffer(cfg WriteBufferConfig) *WriteBuffer + func (b *WriteBuffer) Append(batch RecordBatch) + func (b *WriteBuffer) Drain() []RecordBatch + func (b *WriteBuffer) ShouldFlush(now time.Time) bool + func (b *WriteBuffer) Size() int + type WriteBufferConfig struct + FlushInterval time.Duration + MaxBatches int + MaxBytes int + MaxMessages int