builder

package
v3.4.5 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 11, 2025 License: AGPL-3.0 Imports: 46 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AppendInput

type AppendInput struct {
	// contains filtered or unexported fields
}

type Appender

type Appender struct {
	// contains filtered or unexported fields
}

func (*Appender) Append

func (w *Appender) Append(ctx context.Context, input AppendInput) ([]*chunk.Chunk, error)

func (*Appender) CutRemainingChunks

func (w *Appender) CutRemainingChunks(ctx context.Context) ([]*chunk.Chunk, error)

type BlockBuilder

type BlockBuilder struct {
	services.Service
	types.BuilderTransport
	// contains filtered or unexported fields
}

BlockBuilder is a slimmed-down version of the ingester, intended to ingest logs without WALs. Broadly, it accumulates logs into per-tenant chunks in the same way the existing ingester does, without a WAL. Index (TSDB) creation is also not an out-of-band procedure and must be called directly. In essence, this allows us to buffer data, flushing chunks to storage as necessary, and then when ready to commit this, relevant TSDBs (one per period) are created and flushed to storage. This allows an external caller to prepare a batch of data, build relevant chunks+indices, ensure they're flushed, and then return. As long as chunk+index creation is deterministic, this operation is also idempotent, making retries simple and preventing the introduction of duplicate data. It contains the following methods:

  • `Append(context.Context, logproto.PushRequest) error` Adds a push request to ingested data. May flush existing chunks when they're full/etc.
  • `Commit(context.Context) error` Serializes (cuts) any buffered data into chunks, flushes them to storage, then creates + flushes TSDB indices containing all chunk references. Finally, clears internal state.

func NewBlockBuilder

func NewBlockBuilder(
	id string,
	cfg Config,
	kafkaCfg kafka.Config,
	periodicConfigs []config.PeriodConfig,
	store stores.ChunkWriter,
	objStore *MultiStore,
	logger log.Logger,
	registerer prometheus.Registerer,
) (*BlockBuilder,
	error)

type Config

type Config struct {
	ConcurrentFlushes int `yaml:"concurrent_flushes"`
	ConcurrentWriters int `yaml:"concurrent_writers"`

	BlockSize       flagext.ByteSize `yaml:"chunk_block_size"`
	TargetChunkSize flagext.ByteSize `yaml:"chunk_target_size"`
	ChunkEncoding   string           `yaml:"chunk_encoding"`

	MaxChunkAge time.Duration `yaml:"max_chunk_age"`

	Backoff           backoff.Config `yaml:"backoff_config"`
	WorkerParallelism int            `yaml:"worker_parallelism"`
	SyncInterval      time.Duration  `yaml:"sync_interval"`
	PollInterval      time.Duration  `yaml:"poll_interval"`

	SchedulerAddress string `yaml:"scheduler_address"`
	// SchedulerGRPCClientConfig configures the gRPC connection between the block-builder and its scheduler.
	SchedulerGRPCClientConfig grpcclient.Config `yaml:"scheduler_grpc_client_config"`
	// contains filtered or unexported fields
}

func (*Config) RegisterFlags

func (cfg *Config) RegisterFlags(flags *flag.FlagSet)

RegisterFlags registers flags.

func (*Config) RegisterFlagsWithPrefix

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)

func (*Config) Validate

func (cfg *Config) Validate() error

type Head

type Head struct {
	// contains filtered or unexported fields
}

Head manages series for a single tenant.

func NewHead

func NewHead(userID string) *Head

func (*Head) Append

func (h *Head) Append(ls labels.Labels, fp uint64, chks index.ChunkMetas)

type MultiStore

type MultiStore struct {
	// contains filtered or unexported fields
}

func NewMultiStore

func NewMultiStore(
	periodicConfigs []config.PeriodConfig,
	storageConfig storage.Config,
	clientMetrics storage.ClientMetrics,
) (*MultiStore, error)

func (*MultiStore) DeleteObject

func (m *MultiStore) DeleteObject(ctx context.Context, objectKey string) error

func (*MultiStore) GetAttributes

func (m *MultiStore) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error)

func (*MultiStore) GetObject

func (m *MultiStore) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error)

func (*MultiStore) GetObjectRange

func (m *MultiStore) GetObjectRange(ctx context.Context, objectKey string, off, length int64) (io.ReadCloser, error)

func (*MultiStore) GetStoreFor

func (m *MultiStore) GetStoreFor(ts model.Time) (client.ObjectClient, error)

func (*MultiStore) IsObjectNotFoundErr

func (m *MultiStore) IsObjectNotFoundErr(err error) bool

func (*MultiStore) IsRetryableErr

func (m *MultiStore) IsRetryableErr(err error) bool

func (*MultiStore) List

func (m *MultiStore) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error)

func (*MultiStore) ObjectExists

func (m *MultiStore) ObjectExists(ctx context.Context, objectKey string) (bool, error)

func (*MultiStore) PutObject

func (m *MultiStore) PutObject(ctx context.Context, objectKey string, object io.Reader) error

func (*MultiStore) Stop

func (m *MultiStore) Stop()

type TsdbCreator

type TsdbCreator struct {
	// contains filtered or unexported fields
}

TsdbCreator accepts writes and builds TSDBs.

func (*TsdbCreator) Append

func (m *TsdbCreator) Append(userID string, ls labels.Labels, fprint uint64, chks index.ChunkMetas) error

Append adds a new series for the given user.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL