eventrepo

package
v0.0.24 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 7, 2026 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Overview

Package eventrepo contains service code for getting and managing cloudevent objects.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func AdvancedSearchOptionsToQueryMod

func AdvancedSearchOptionsToQueryMod(opts *grpc.AdvancedSearchOptions) []qm.QueryMod

Types

type ObjectGetter

type ObjectGetter interface {
	GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
	PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error)
}

ObjectGetter is an interface for getting an object from S3.

type ObjectInfo

type ObjectInfo struct {
	Key string
}

ObjectInfo is the information about the object in S3.

type S3ReaderAt added in v0.0.23

type S3ReaderAt struct {
	// contains filtered or unexported fields
}

S3ReaderAt implements io.ReaderAt by downloading the entire S3 object into memory on creation. This avoids multiple round-trips (HeadObject + range GETs) which is optimal for the expected file sizes (< 100 KB).

func NewS3ReaderAt added in v0.0.23

func NewS3ReaderAt(ctx context.Context, client ObjectGetter, bucket, key string) (*S3ReaderAt, error)

NewS3ReaderAt downloads the S3 object into memory and returns a ReaderAt. Rejects objects larger than maxObjectSize.

func (*S3ReaderAt) ReadAt added in v0.0.23

func (r *S3ReaderAt) ReadAt(p []byte, off int64) (int, error)

ReadAt implements io.ReaderAt by reading from the in-memory buffer.

func (*S3ReaderAt) Size added in v0.0.23

func (r *S3ReaderAt) Size() int64

Size returns the total size of the S3 object in bytes.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service manages and retrieves data messages from indexed objects in S3.

func New

func New(chConn clickhouse.Conn, objGetter ObjectGetter, parquetBucket string) *Service

New creates a new instance of Service.

func (*Service) GetCloudEventFromIndex

func (s *Service) GetCloudEventFromIndex(ctx context.Context, index *cloudevent.CloudEvent[ObjectInfo], bucketName string) (cloudevent.RawEvent, error)

GetCloudEventFromIndex fetches and returns the cloud event for the given index.

func (*Service) GetLatestCloudEvent

func (s *Service) GetLatestCloudEvent(ctx context.Context, bucketName string, opts *grpc.SearchOptions) (cloudevent.RawEvent, error)

GetLatestCloudEvent fetches and returns the latest cloud event that matches the given options.

func (*Service) GetLatestCloudEventAdvanced

func (s *Service) GetLatestCloudEventAdvanced(ctx context.Context, bucketName string, advancedOpts *grpc.AdvancedSearchOptions) (cloudevent.RawEvent, error)

GetLatestCloudEventAdvanced fetches and returns the latest cloud event that matches the given advanced options.

func (*Service) GetLatestIndex

func (s *Service) GetLatestIndex(ctx context.Context, opts *grpc.SearchOptions) (cloudevent.CloudEvent[ObjectInfo], error)

GetLatestIndex returns the latest cloud event index that matches the given options.

func (*Service) GetLatestIndexAdvanced

func (s *Service) GetLatestIndexAdvanced(ctx context.Context, advancedOpts *grpc.AdvancedSearchOptions) (cloudevent.CloudEvent[ObjectInfo], error)

GetLatestIndexAdvanced returns the latest cloud event index that matches the given advanced options.

func (*Service) GetObjectFromKey

func (s *Service) GetObjectFromKey(ctx context.Context, key, bucketName string) ([]byte, error)

GetObjectFromKey fetches and returns the raw object for the given key. Routes based on index_key format:

  • If key contains "#": Parquet reference -- reads the event via SeekToRow and returns data.
  • Otherwise: legacy S3 path -- fetches the entire object as before.

func (*Service) ListCloudEvents

func (s *Service) ListCloudEvents(ctx context.Context, bucketName string, limit int, opts *grpc.SearchOptions) ([]cloudevent.RawEvent, error)

ListCloudEvents fetches and returns the cloud events that match the given options.

func (*Service) ListCloudEventsAdvanced

func (s *Service) ListCloudEventsAdvanced(ctx context.Context, bucketName string, limit int, advancedOpts *grpc.AdvancedSearchOptions) ([]cloudevent.RawEvent, error)

ListCloudEventsAdvanced fetches and returns the cloud events that match the given advanced options.

func (*Service) ListCloudEventsFromIndexes

func (s *Service) ListCloudEventsFromIndexes(ctx context.Context, indexes []cloudevent.CloudEvent[ObjectInfo], bucketName string) ([]cloudevent.RawEvent, error)

ListCloudEventsFromIndexes fetches and returns the cloud events for the given index. Parquet refs sharing the same object key are grouped so they share one S3ReaderAt (avoiding duplicate downloads). Groups and JSON fetches run concurrently.

func (*Service) ListIndexes

func (s *Service) ListIndexes(ctx context.Context, limit int, opts *grpc.SearchOptions) ([]cloudevent.CloudEvent[ObjectInfo], error)

ListIndexes fetches and returns a list of index for cloud events that match the given options.

func (*Service) ListIndexesAdvanced

func (s *Service) ListIndexesAdvanced(ctx context.Context, limit int, advancedOpts *grpc.AdvancedSearchOptions) ([]cloudevent.CloudEvent[ObjectInfo], error)

ListIndexesAdvanced fetches and returns a list of index for cloud events that match the given advanced options.

func (*Service) ListObjectsFromKeys

func (s *Service) ListObjectsFromKeys(ctx context.Context, keys []string, bucketName string) ([][]byte, error)

ListObjectsFromKeys fetches and returns the objects for the given keys concurrently.

func (*Service) StoreObject

func (s *Service) StoreObject(ctx context.Context, bucketName string, cloudHeader *cloudevent.CloudEventHeader, data []byte) error

StoreObject stores the given data in S3 with the given cloudevent header.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL