topicreader

package
v3.104.4
Published: Mar 18, 2025 License: Apache-2.0 Imports: 8 Imported by: 6

Documentation

Overview

Package topicreader provides a Reader for receiving messages from YDB topics. More examples are available in the examples repository:

https://github.com/ydb-platform/ydb-go-sdk/tree/master/examples/topic/topicreader

Constants

This section is empty.

Variables

ErrCommitToExpiredSession is not a fatal error: the reader can continue working after it. Client code must check for it with errors.Is.

var ErrConcurrencyCall = xerrors.Wrap(errors.New("ydb: concurrency call denied"))

ErrConcurrencyCall is returned when a reader method is called concurrently in a way that is not allowed. Client code must check for it with errors.Is.

ErrUnexpectedCodec is returned when topicreader receives a message with an unknown codec. Client code must check for it with errors.Is.
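
The errors above are sentinels that may arrive wrapped with extra context, which is why a direct == comparison is not enough. The following is a minimal sketch of the errors.Is check. It uses locally declared stand-in errors (errExpiredSession, errConcurrencyCall) and a hypothetical commitSync helper so that it compiles without the SDK; real code would compare against the package variables instead.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package sentinels; declared here only so the
// sketch is self-contained. They are not the SDK's variables.
var (
	errExpiredSession  = errors.New("commit to expired session")
	errConcurrencyCall = errors.New("concurrency call denied")
)

// commitSync is a hypothetical call that returns a sentinel wrapped
// with additional context.
func commitSync() error {
	return fmt.Errorf("commit offset 42: %w", errExpiredSession)
}

// classify decides how the caller should react to err.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errExpiredSession):
		return "continue" // non-fatal: keep reading
	case errors.Is(err, errConcurrencyCall):
		return "fix caller" // programming error on the client side
	default:
		return "fail"
	}
}

func main() {
	err := commitSync()
	fmt.Println(err == errExpiredSession) // direct comparison misses the wrapped sentinel
	fmt.Println(classify(err))            // errors.Is walks the wrap chain
}
```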

Functions

This section is empty.

Types

type Batch

Batch is an ordered group of messages from one partition.

type CommitRangeGetter

type CommitRangeGetter = topicreadercommon.PublicCommitRangeGetter

CommitRangeGetter is an interface for getting the offset range to commit.

type Message

Message contains data and metadata read from the server.

type MessageContentUnmarshaler

type MessageContentUnmarshaler = topicreadercommon.PublicMessageContentUnmarshaler

MessageContentUnmarshaler is an interface for unmarshaling message content into a user-defined struct.
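
As an illustration of the idea, the sketch below decodes a JSON payload into a user-defined struct through an unmarshaler interface. The interface is declared locally, and its method name, UnmarshalYDBTopicMessage, is an assumption about the shape of the real interface that is not shown on this page; check the topicreadercommon package before relying on it. The order type and decodeOrder helper are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// contentUnmarshaler is a local stand-in; the method name is an assumption
// about MessageContentUnmarshaler, not taken from this page.
type contentUnmarshaler interface {
	UnmarshalYDBTopicMessage(data []byte) error
}

// order is a hypothetical application type carried in message bodies.
type order struct {
	ID    int    `json:"id"`
	Title string `json:"title"`
}

// UnmarshalYDBTopicMessage fills the struct from the raw message content.
func (o *order) UnmarshalYDBTopicMessage(data []byte) error {
	return json.Unmarshal(data, o)
}

// decodeOrder plays the role of the reader handing raw bytes to the
// user-provided unmarshaler.
func decodeOrder(payload []byte) (order, error) {
	var o order
	var dst contentUnmarshaler = &o
	err := dst.UnmarshalYDBTopicMessage(payload)
	return o, err
}

func main() {
	o, err := decodeOrder([]byte(`{"id":7,"title":"book"}`))
	fmt.Println(o.ID, o.Title, err)
}
```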

type ReadBatchOption

ReadBatchOption is the type for options of a batch read.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader allows reading messages from YDB topics. ReadMessage or ReadMessageBatch can be called concurrently with Commit; any other concurrent call is denied.

In other words, you can have one goroutine for reading messages and one goroutine for committing them.

Concurrency table (+ means the two methods may be called concurrently, - means they may not):

| Method           | ReadMessage | ReadMessageBatch | Commit | Close |
| ReadMessage      | -           | -                | +      | -     |
| ReadMessageBatch | -           | -                | +      | -     |
| Commit           | +           | +                | -      | -     |
| Close            | -           | -                | -      | -     |
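
The one-reader, one-committer arrangement described above can be sketched with two goroutines connected by a channel. The fakeReader type here is a hypothetical in-memory stand-in, not the SDK's Reader; it only shows the goroutine layout in which reads and commits never overlap with calls of their own kind.

```go
package main

import (
	"fmt"
	"sync"
)

// fakeReader is a hypothetical in-memory stand-in for the real Reader.
type fakeReader struct {
	mu        sync.Mutex
	next      int
	committed []int
}

// ReadMessage returns the next offset; only the reading goroutine calls it.
func (r *fakeReader) ReadMessage() int {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.next++
	return r.next
}

// Commit records an offset; only the committing goroutine calls it.
func (r *fakeReader) Commit(offset int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.committed = append(r.committed, offset)
}

// run reads n messages in one goroutine and commits them in another.
func run(n int) []int {
	r := &fakeReader{}
	toCommit := make(chan int)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // the single reading goroutine
		defer wg.Done()
		defer close(toCommit)
		for i := 0; i < n; i++ {
			toCommit <- r.ReadMessage()
		}
	}()
	go func() { // the single committing goroutine
		defer wg.Done()
		for offset := range toCommit {
			r.Commit(offset)
		}
	}()
	wg.Wait()
	return r.committed
}

func main() {
	fmt.Println(run(3))
}
```

Close is not concurrent with anything in the table, so in this layout it would be called only after both goroutines have finished.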

func NewReader

func NewReader(internalReader topicreaderinternal.Reader) *Reader

NewReader creates a new Reader. It is used internally only.

func (*Reader) Close

func (r *Reader) Close(ctx context.Context) error

Close stops work with the reader. It returns after the reader completes its internal work and flushes the commit buffer. Close the Reader after use and before exiting the program to avoid losing the last commits.
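
Because Close takes a context, shutdown can be bounded by a deadline while still giving buffered commits a chance to be sent. The sketch below assumes a hypothetical bufferedReader stand-in whose Close flushes pending commits; only the Close(ctx) error shape is taken from the signature above.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// closer is the subset of the Reader API used here.
type closer interface {
	Close(ctx context.Context) error
}

// bufferedReader is a hypothetical stand-in that keeps pending commits in memory.
type bufferedReader struct {
	pending []int
	flushed []int
}

// Close flushes pending commits unless the context is already done.
func (b *bufferedReader) Close(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	b.flushed = append(b.flushed, b.pending...)
	b.pending = nil
	return nil
}

// shutdown closes c, allowing at most timeout for the final flush.
func shutdown(c closer, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return c.Close(ctx)
}

// flushedAfterShutdown returns how many commits were flushed, or -1 on error.
func flushedAfterShutdown() int {
	r := &bufferedReader{pending: []int{1, 2, 3}}
	if err := shutdown(r, time.Second); err != nil {
		return -1
	}
	return len(r.flushed)
}

func main() {
	fmt.Println(flushedAfterShutdown())
}
```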

func (*Reader) Commit

func (r *Reader) Commit(ctx context.Context, obj CommitRangeGetter) error

Commit receives a Message, a Batch, or a single offset. It can be fast (the default) or synchronous, waiting for a response from the server; see topicoptions.CommitMode for details.

In fast mode (the default), commit info is only stored in an internal buffer and sent to the server later. Close the reader to wait until all commits are sent to the server.

In topicoptions.CommitModeSync mode, the method can return ErrCommitToExpiredSession. It means the message or batch was not committed because the connection broke or the server routed the partition to another reader. Client code should continue working normally.

Example
package main

import (
	"context"

	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
)

func main() {
	ctx := context.TODO()
	reader := readerConnect()

	for {
		batch, err := reader.ReadMessageBatch(ctx)
		if err != nil {
			// Stop on a read error instead of dereferencing batch.
			return
		}
		processBatch(batch.Context(), batch)

		// Commit may be fast (the default) or sync, depending on reader settings.
		_ = reader.Commit(batch.Context(), batch)
	}
}

func processBatch(ctx context.Context, batch *topicreader.Batch) {

	panic("example stub")
}

func readerConnect() *topicreader.Reader {
	panic("example stub")
}

func (*Reader) PopMessagesBatchTx added in v3.80.6

func (r *Reader) PopMessagesBatchTx(
	ctx context.Context,
	transaction tx.Identifier,
	opts ...ReadBatchOption,
) (
	resBatch *Batch,
	resErr error,
)

PopMessagesBatchTx reads a batch of messages and commits them within the transaction. If the transaction fails, the batch will be received again.

Currently this means reconnecting to the server and re-reading the messages into the reader's buffer. This is an expensive operation, so it is worth minimizing transaction failures.

The reconnect is an implementation detail and may change in the future.

Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental

func (*Reader) ReadMessage

func (r *Reader) ReadMessage(ctx context.Context) (*Message, error)

ReadMessage reads exactly one message. Exactly one of the returned message and error is nil.

Example
package main

import (
	"context"

	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
)

func main() {
	ctx := context.TODO()
	reader := readerConnect()

	for {
		msg, err := reader.ReadMessage(ctx)
		if err != nil {
			// Exactly one of msg and err is nil, so msg must not be used here.
			return
		}
		processMessage(msg.Context(), msg)
		_ = reader.Commit(msg.Context(), msg)
	}
}

func processMessage(ctx context.Context, m *topicreader.Message) {

	panic("example stub")
}

func readerConnect() *topicreader.Reader {
	panic("example stub")
}

func (*Reader) ReadMessageBatch deprecated

func (r *Reader) ReadMessageBatch(ctx context.Context, opts ...ReadBatchOption) (*Batch, error)

ReadMessageBatch

Deprecated: was experimental and is no longer relevant. Use ReadMessagesBatch instead. Will be removed after Oct 2024. Read about the versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated

Example
package main

import (
	"context"

	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
)

func main() {
	ctx := context.TODO()
	reader := readerConnect()

	for {
		batch, err := reader.ReadMessageBatch(ctx)
		if err != nil {
			// Stop on a read error instead of dereferencing batch.
			return
		}
		processBatch(batch.Context(), batch)
		_ = reader.Commit(batch.Context(), batch)
	}
}

func processBatch(ctx context.Context, batch *topicreader.Batch) {

	panic("example stub")
}

func readerConnect() *topicreader.Reader {
	panic("example stub")
}

func (*Reader) ReadMessagesBatch added in v3.52.3

func (r *Reader) ReadMessagesBatch(ctx context.Context, opts ...ReadBatchOption) (*Batch, error)

ReadMessagesBatch reads a batch of messages. A Batch is an ordered group of messages from one partition. Exactly one of Batch and err is nil. If Batch is not nil, the reader guarantees that all Batch.Messages are not nil.
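
The "exactly one of Batch and err is nil" contract means a caller checks the error first and touches the batch only on success. Below is a minimal sketch of such a loop. The batch and batchReader types and the errStopped error are hypothetical stand-ins that mimic the contract; they are not the SDK's types.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// batch is a hypothetical stand-in for a batch of messages.
type batch struct {
	Messages []string
}

// errStopped is a made-up terminal error for the sketch.
var errStopped = errors.New("reader stopped")

// batchReader serves queued batches, then reports errStopped.
type batchReader struct {
	queue []*batch
}

// ReadMessagesBatch returns either a non-nil batch or a non-nil error, never both.
func (r *batchReader) ReadMessagesBatch(ctx context.Context) (*batch, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	if len(r.queue) == 0 {
		return nil, errStopped
	}
	b := r.queue[0]
	r.queue = r.queue[1:]
	return b, nil
}

// consume counts messages until the reader returns an error.
func consume(ctx context.Context, r *batchReader) (int, error) {
	total := 0
	for {
		b, err := r.ReadMessagesBatch(ctx)
		if err != nil {
			return total, err // b is nil here and must not be used
		}
		total += len(b.Messages)
	}
}

// countAll runs the loop over two sample batches.
func countAll() int {
	r := &batchReader{queue: []*batch{
		{Messages: []string{"a", "b"}},
		{Messages: []string{"c"}},
	}}
	n, _ := consume(context.Background(), r)
	return n
}

func main() {
	fmt.Println(countAll())
}
```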

func (*Reader) WaitInit added in v3.52.3

func (r *Reader) WaitInit(ctx context.Context) error

WaitInit waits until the reader is initialized or an error occurs
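
Since WaitInit blocks until initialization finishes or fails, it is usually worth bounding it with a context deadline. The sketch below assumes a hypothetical slowReader stand-in whose WaitInit honors the context; only the WaitInit(ctx) error shape comes from the signature above.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// initWaiter is the subset of the Reader API used here.
type initWaiter interface {
	WaitInit(ctx context.Context) error
}

// slowReader is a hypothetical stand-in that becomes ready when its channel closes.
type slowReader struct {
	ready chan struct{}
}

// WaitInit returns nil once ready, or the context error if that comes first.
func (r *slowReader) WaitInit(ctx context.Context) error {
	select {
	case <-r.ready:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// waitReady bounds the initialization wait by timeout.
func waitReady(w initWaiter, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return w.WaitInit(ctx)
}

// demo reports whether a ready reader succeeds and a stuck one times out.
func demo() (readyOK bool, timedOut bool) {
	ready := &slowReader{ready: make(chan struct{})}
	close(ready.ready) // already initialized
	stuck := &slowReader{ready: make(chan struct{})}

	readyOK = waitReady(ready, time.Second) == nil
	timedOut = errors.Is(waitReady(stuck, 10*time.Millisecond), context.DeadlineExceeded)
	return readyOK, timedOut
}

func main() {
	ok, timedOut := demo()
	fmt.Println(ok, timedOut)
}
```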

type WithBatchMaxCount

type WithBatchMaxCount int

WithBatchMaxCount sets the maximum number of messages in a batch.

func (WithBatchMaxCount) Apply

Apply implements ReadBatchOption interface
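
WithBatchMaxCount is an int-based type that carries its value and applies it through an Apply method. The sketch below shows that pattern in isolation. The readOptions struct, the readOption interface, and the Apply signature used here are hypothetical, since the real Apply signature is not shown on this page.

```go
package main

import "fmt"

// readOptions is a hypothetical options struct for a batch read.
type readOptions struct {
	MaxCount int
}

// readOption is a hypothetical counterpart of ReadBatchOption.
type readOption interface {
	Apply(o readOptions) readOptions
}

// withMaxCount mirrors the "option as an int type" pattern.
type withMaxCount int

// Apply copies the option value into the options struct.
func (c withMaxCount) Apply(o readOptions) readOptions {
	o.MaxCount = int(c)
	return o
}

// buildOptions folds all options over the zero value, later ones winning.
func buildOptions(opts ...readOption) readOptions {
	var o readOptions
	for _, opt := range opts {
		o = opt.Apply(o)
	}
	return o
}

func main() {
	fmt.Println(buildOptions(withMaxCount(10)).MaxCount)
}
```

With the real package, the option is written as a type conversion at the call site, for example reader.ReadMessagesBatch(ctx, topicreader.WithBatchMaxCount(10)).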

type WithBatchPreferMinCount deprecated

type WithBatchPreferMinCount int

WithBatchPreferMinCount sets the preferred minimum number of messages in a batch. Sometimes the resulting batch can be smaller than count, for example if the internal buffer is full and can't receive more messages, or if the server stops sending messages in the partition.

count must be 1 or greater; it will panic if count < 1.

Deprecated: was experimental and is no longer relevant. The option will be removed to simplify the code internals. Will be removed after Oct 2024. Read about the versioning policy: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#deprecated

func (WithBatchPreferMinCount) Apply

Apply implements ReadBatchOption interface
