eventrepo

package
v0.1.14 Latest
Warning: This package is not in the latest version of its module.
Published: Apr 29, 2026 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Overview

Package eventrepo contains service code for getting and managing cloudevent objects.

Constants

This section is empty.

Variables

This section is empty.

Functions

func AdvancedSearchOptionsToQueryMod

func AdvancedSearchOptionsToQueryMod(opts *grpc.AdvancedSearchOptions) []qm.QueryMod

Types

type CloudEventTypeSummary added in v0.1.3

type CloudEventTypeSummary struct {
	Type      string
	Count     uint64
	FirstSeen time.Time
	LastSeen  time.Time
}

CloudEventTypeSummary holds per-type aggregate metadata for a subject.

type ObjectGetter

type ObjectGetter interface {
	GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
	PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error)
}

ObjectGetter is an interface for getting objects from and putting objects into S3.

type ObjectInfo

type ObjectInfo struct {
	// Key is the index_key — a pointer into parquet (key#row) or a legacy JSON object.
	Key string
	// DataIndexKey, when non-empty, is the key in the blob bucket holding the
	// event's externalized payload. Set by the producer when the payload was
	// split out of the inline event.
	DataIndexKey string
}

ObjectInfo is the information about the object in S3.

type Presigner added in v0.1.4

type Presigner interface {
	PresignGetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.PresignOptions)) (*v4.PresignedHTTPRequest, error)
}

Presigner generates presigned S3 GET URLs.

type S3ReaderAt added in v0.0.23

type S3ReaderAt struct {
	// contains filtered or unexported fields
}

S3ReaderAt implements io.ReaderAt by downloading the entire S3 object into memory on creation. This avoids multiple round-trips (HeadObject + range GETs), which is optimal for the expected file sizes (< 100 KB).

func NewS3ReaderAt added in v0.0.23

func NewS3ReaderAt(ctx context.Context, client ObjectGetter, bucket, key string) (*S3ReaderAt, error)

NewS3ReaderAt downloads the S3 object into memory and returns a ReaderAt. Rejects objects larger than maxObjectSize.

func (*S3ReaderAt) ReadAt added in v0.0.23

func (r *S3ReaderAt) ReadAt(p []byte, off int64) (int, error)

ReadAt implements io.ReaderAt by reading from the in-memory buffer.

func (*S3ReaderAt) Size added in v0.0.23

func (r *S3ReaderAt) Size() int64

Size returns the total size of the S3 object in bytes.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service manages and retrieves data messages from indexed objects in S3.

func New

func New(chConn clickhouse.Conn, objGetter ObjectGetter, presigner Presigner, parquetBucket, blobBucket string) *Service

New creates a new instance of Service.

func (*Service) GetCloudEventFromIndex

func (s *Service) GetCloudEventFromIndex(ctx context.Context, index *cloudevent.CloudEvent[ObjectInfo], bucketName string) (cloudevent.RawEvent, error)

GetCloudEventFromIndex fetches and returns the cloud event for the given index. Events with a non-empty DataIndexKey have their payload externalized to the blob bucket; this method returns header-only since the data is meant to be served via presigned URL by the GraphQL layer (see PresignBlobURL).
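The header-only behaviour for externalized payloads can be pictured with the sketch below. All names in it (indexEntry, rawEvent, eventFromIndex, the fetch callback) are hypothetical stand-ins. Whether the real method still reads the stored header from S3 in the externalized case is not stated above; the sketch simply builds the header from the index entry.

```go
package main

import "fmt"

// indexEntry is a hypothetical stand-in for cloudevent.CloudEvent[ObjectInfo].
type indexEntry struct {
	ID           string
	Key          string // index_key
	DataIndexKey string // non-empty when the payload lives in the blob bucket
}

// rawEvent is a hypothetical stand-in for cloudevent.RawEvent.
type rawEvent struct {
	ID   string
	Data []byte
}

// eventFromIndex returns a header-only event when the payload is externalized,
// and otherwise loads the inline payload through fetch.
func eventFromIndex(e indexEntry, fetch func(key string) ([]byte, error)) (rawEvent, error) {
	if e.DataIndexKey != "" {
		// Payload is served separately (for example via a presigned URL), so skip it.
		return rawEvent{ID: e.ID}, nil
	}
	data, err := fetch(e.Key)
	if err != nil {
		return rawEvent{}, fmt.Errorf("fetch %q: %w", e.Key, err)
	}
	return rawEvent{ID: e.ID, Data: data}, nil
}

func main() {
	fetch := func(key string) ([]byte, error) { return []byte("payload for " + key), nil }
	inline, _ := eventFromIndex(indexEntry{ID: "a", Key: "events/a.json"}, fetch)
	external, _ := eventFromIndex(indexEntry{ID: "b", Key: "events/b.json", DataIndexKey: "blobs/b"}, fetch)
	fmt.Printf("inline has data: %t, external has data: %t\n", len(inline.Data) > 0, len(external.Data) > 0)
}
```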

func (*Service) GetCloudEventTypeSummaries added in v0.1.3

func (s *Service) GetCloudEventTypeSummaries(ctx context.Context, opts *grpc.SearchOptions) ([]CloudEventTypeSummary, error)

GetCloudEventTypeSummaries returns per-type counts and time ranges for the given search options. Tombstoned attestations are not suppressed; callers that want suppression should use GetCloudEventTypeSummariesAdvanced with includeDeleted=false.

func (*Service) GetCloudEventTypeSummariesAdvanced added in v0.1.9

func (s *Service) GetCloudEventTypeSummariesAdvanced(ctx context.Context, advancedOpts *grpc.AdvancedSearchOptions, includeDeleted bool) ([]CloudEventTypeSummary, error)

GetCloudEventTypeSummariesAdvanced returns event type summaries filtered by advanced search options. When includeDeleted is false, rows tombstoned by a dimo.tombstone event with a matching (source, target_id) pair for the same subject are excluded from the counts.
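The suppression rule can be read as a set-membership check on (subject, source, target_id). The sketch below applies it to an in-memory slice purely for illustration. The row struct and its field names are hypothetical, the real filtering presumably happens in the index query, and keeping the tombstone rows themselves in the output is an assumption.

```go
package main

import "fmt"

const tombstoneType = "dimo.tombstone"

// row is a hypothetical, flattened view of one index row.
type row struct {
	Subject  string
	Source   string
	ID       string
	Type     string
	TargetID string // set only on tombstone rows: the ID being voided
}

// voidKey identifies an event that a tombstone has voided.
type voidKey struct {
	subject, source, id string
}

// suppressTombstoned drops rows whose (subject, source, ID) matches a
// tombstone's (subject, source, TargetID). Tombstone rows are kept (assumption).
func suppressTombstoned(rows []row) []row {
	voided := make(map[voidKey]struct{})
	for _, r := range rows {
		if r.Type == tombstoneType && r.TargetID != "" {
			voided[voidKey{r.Subject, r.Source, r.TargetID}] = struct{}{}
		}
	}
	kept := make([]row, 0, len(rows))
	for _, r := range rows {
		if _, ok := voided[voidKey{r.Subject, r.Source, r.ID}]; ok {
			continue
		}
		kept = append(kept, r)
	}
	return kept
}

func main() {
	rows := []row{
		{Subject: "veh-1", Source: "srcA", ID: "att-1", Type: "dimo.attestation"},
		{Subject: "veh-1", Source: "srcA", ID: "att-2", Type: "dimo.attestation"},
		{Subject: "veh-1", Source: "srcA", ID: "ts-1", Type: tombstoneType, TargetID: "att-1"},
		{Subject: "veh-1", Source: "srcB", ID: "att-1", Type: "dimo.attestation"},
	}
	for _, r := range suppressTombstoned(rows) {
		fmt.Println(r.Source, r.ID, r.Type)
	}
}
```

Note that the row from srcB survives even though it shares an ID with the voided event, because the source differs.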

func (*Service) GetLatestCloudEvent

func (s *Service) GetLatestCloudEvent(ctx context.Context, bucketName string, opts *grpc.SearchOptions) (cloudevent.RawEvent, error)

GetLatestCloudEvent fetches and returns the latest cloud event that matches the given options. Tombstoned attestations are not suppressed; callers that want suppression should use GetLatestCloudEventAdvanced with includeDeleted=false.

func (*Service) GetLatestCloudEventAdvanced

func (s *Service) GetLatestCloudEventAdvanced(ctx context.Context, bucketName string, advancedOpts *grpc.AdvancedSearchOptions, includeDeleted bool) (cloudevent.RawEvent, error)

GetLatestCloudEventAdvanced fetches and returns the latest cloud event that matches the given advanced options.

func (*Service) GetLatestIndex

func (s *Service) GetLatestIndex(ctx context.Context, opts *grpc.SearchOptions) (cloudevent.CloudEvent[ObjectInfo], error)

GetLatestIndex returns the latest cloud event index that matches the given options. Tombstoned attestations are not suppressed; callers that want suppression should use GetLatestIndexAdvanced with includeDeleted=false.

func (*Service) GetLatestIndexAdvanced

func (s *Service) GetLatestIndexAdvanced(ctx context.Context, advancedOpts *grpc.AdvancedSearchOptions, includeDeleted bool) (cloudevent.CloudEvent[ObjectInfo], error)

GetLatestIndexAdvanced returns the latest cloud event index that matches the given advanced options. When includeDeleted is false, rows whose (source, id) appears in a dimo.tombstone row's voids_id for the same subject are suppressed.

func (*Service) GetObjectFromKey

func (s *Service) GetObjectFromKey(ctx context.Context, key, bucketName string) ([]byte, error)

GetObjectFromKey fetches and returns the raw object for the given key. Routes based on index_key format:

  • If key contains "#": Parquet reference -- reads the event via SeekToRow and returns data.
  • Otherwise: legacy S3 path -- fetches the entire object as before.
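The routing rule above can be sketched as a small parser. This is a hedged illustration: parseIndexKey is a hypothetical helper, and it assumes the row is a non-negative decimal integer following the last "#" in the key, which the documentation implies (key#row) but does not spell out.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseIndexKey splits an index_key into the S3 object key and, for Parquet
// references, the row number. Keys without "#" are treated as legacy objects.
func parseIndexKey(key string) (objectKey string, row int64, isParquet bool, err error) {
	i := strings.LastIndex(key, "#")
	if i < 0 {
		return key, 0, false, nil
	}
	row, err = strconv.ParseInt(key[i+1:], 10, 64)
	if err != nil {
		return "", 0, true, fmt.Errorf("invalid row in index key %q: %w", key, err)
	}
	if row < 0 {
		return "", 0, true, fmt.Errorf("negative row in index key %q", key)
	}
	return key[:i], row, true, nil
}

func main() {
	for _, k := range []string{"batches/2026/04/a.parquet#12", "legacy/event.json"} {
		obj, row, isParquet, err := parseIndexKey(k)
		fmt.Println(obj, row, isParquet, err)
	}
}
```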

func (*Service) ListCloudEvents

func (s *Service) ListCloudEvents(ctx context.Context, bucketName string, limit int, opts *grpc.SearchOptions) ([]cloudevent.RawEvent, error)

ListCloudEvents fetches and returns the cloud events that match the given options. Tombstoned attestations are not suppressed; callers that want suppression should use ListCloudEventsAdvanced with includeDeleted=false.

func (*Service) ListCloudEventsAdvanced

func (s *Service) ListCloudEventsAdvanced(ctx context.Context, bucketName string, limit int, advancedOpts *grpc.AdvancedSearchOptions, includeDeleted bool) ([]cloudevent.RawEvent, error)

ListCloudEventsAdvanced fetches and returns the cloud events that match the given advanced options.

func (*Service) ListCloudEventsFromIndexes

func (s *Service) ListCloudEventsFromIndexes(ctx context.Context, indexes []cloudevent.CloudEvent[ObjectInfo], bucketName string) ([]cloudevent.RawEvent, error)

ListCloudEventsFromIndexes fetches and returns the cloud events for the given indexes. Parquet refs sharing the same object key are grouped so they share one S3ReaderAt (avoiding duplicate downloads). Groups and JSON fetches run concurrently.
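The grouping step might look like the sketch below, which buckets Parquet references by object key while remembering each reference's position in the input so results can be put back in order. The names rowRef and groupRefs are hypothetical, and the key#row parsing is an assumption about the index_key format; the concurrent fetching itself is left out.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// rowRef records where a Parquet reference came from and which row it wants.
type rowRef struct {
	pos int   // position in the caller's input slice
	row int64 // row number inside the Parquet object
}

// groupRefs buckets Parquet references by object key so each object needs to
// be downloaded once. Positions of legacy (non-Parquet) keys are returned separately.
func groupRefs(keys []string) (map[string][]rowRef, []int, error) {
	groups := make(map[string][]rowRef)
	var legacy []int
	for pos, k := range keys {
		i := strings.LastIndex(k, "#")
		if i < 0 {
			legacy = append(legacy, pos)
			continue
		}
		row, err := strconv.ParseInt(k[i+1:], 10, 64)
		if err != nil {
			return nil, nil, fmt.Errorf("invalid row in index key %q: %w", k, err)
		}
		groups[k[:i]] = append(groups[k[:i]], rowRef{pos: pos, row: row})
	}
	return groups, legacy, nil
}

func main() {
	keys := []string{"a.parquet#0", "legacy/x.json", "a.parquet#3", "b.parquet#1"}
	groups, legacy, err := groupRefs(keys)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(groups), "parquet objects,", len(legacy), "legacy objects")
	fmt.Println("a.parquet rows:", groups["a.parquet"])
}
```

With this input, four index entries need only three downloads: one per Parquet object plus one legacy object.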

func (*Service) ListIndexes

func (s *Service) ListIndexes(ctx context.Context, limit int, opts *grpc.SearchOptions) ([]cloudevent.CloudEvent[ObjectInfo], error)

ListIndexes fetches and returns a list of indexes for cloud events that match the given options. Tombstoned attestations are not suppressed; callers that want suppression should use ListIndexesAdvanced with includeDeleted=false.

func (*Service) ListIndexesAdvanced

func (s *Service) ListIndexesAdvanced(ctx context.Context, limit int, advancedOpts *grpc.AdvancedSearchOptions, includeDeleted bool) ([]cloudevent.CloudEvent[ObjectInfo], error)

ListIndexesAdvanced fetches and returns a list of indexes for cloud events that match the given advanced options. When includeDeleted is false, rows tombstoned by a dimo.tombstone event with a matching (source, target_id) pair for the same subject are suppressed.

func (*Service) ListObjectsFromKeys

func (s *Service) ListObjectsFromKeys(ctx context.Context, keys []string, bucketName string) ([][]byte, error)

ListObjectsFromKeys fetches and returns the objects for the given keys concurrently.

func (*Service) PresignBlobURL added in v0.1.4

func (s *Service) PresignBlobURL(ctx context.Context, key string) (string, error)

PresignBlobURL returns a short-lived presigned GET URL for the given S3 key in the blob bucket.

func (*Service) StoreObject

func (s *Service) StoreObject(ctx context.Context, bucketName string, cloudHeader *cloudevent.CloudEventHeader, data []byte) error

StoreObject stores the given data in S3 with the given cloudevent header.
