topic

package
v3.113.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 21, 2025 License: Apache-2.0 Imports: 7 Imported by: 2

README

topic - pure Go native client for YDB Topic

License Release PkgGoDev tests lint Go Report Card codecov Code lines View examples Telegram WebSite

See ydb-go-sdk for a description of all driver features.

YDB is an open-source Distributed SQL Database that combines high availability and scalability with strict consistency and ACID transactions with CDC and Data stream features.

Installation

go get -u github.com/ydb-platform/ydb-go-sdk/v3

Example Usage

  • connect to YDB
// Connect to the YDB endpoint; log the error and exit if the connection fails.
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
if err != nil {
    log.Fatal(err)
}
  • send messages
// Start a writer for the topic. StartWriter takes the topic path followed by
// options, so the message group ID is supplied through an option.
producerAndGroupID := "group-id"
writer, err := db.Topic().StartWriter("topicName",
    topicoptions.WithMessageGroupID(producerAndGroupID),
    topicoptions.WithCodec(topictypes.CodecGzip),
)
if err != nil {
    log.Fatal(err)
}

// The payload of each message is supplied as a reader over its bytes.
data1 := []byte{1, 2, 3}
data2 := []byte{4, 5, 6}
mess1 := topicwriter.Message{Data: bytes.NewReader(data1)}
mess2 := topicwriter.Message{Data: bytes.NewReader(data2)}

// Several messages can be passed to a single Write call.
err = writer.Write(ctx, mess1, mess2)
if err != nil {
    log.Fatal(err)
}
  • read messages
for {
    msg, err := reader.ReadMessage(ctx)
    if err != nil {
        log.Fatal(err)
    }
    content, err := io.ReadAll(msg)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(string(content))
    err = reader.Commit(msg.Context(), msg)
    if err != nil {
        log.Fatal(err)
    }
}
 

Documentation

Overview

Example (AlterTopic)
package main

import (
	"context"
	"log"
	"os"

	ydb "github.com/ydb-platform/ydb-go-sdk/v3"
	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes"
)

// main connects to YDB and alters the topic "topic-path" by adding a consumer.
func main() {
	ctx := context.TODO()

	// Use the local endpoint unless YDB_CONNECTION_STRING provides a non-empty value.
	dsn := "grpc://localhost:2136/local"
	if fromEnv := os.Getenv("YDB_CONNECTION_STRING"); fromEnv != "" {
		dsn = fromEnv
	}

	db, err := ydb.Open(ctx, dsn)
	if err != nil {
		log.Printf("failed connect: %v", err)

		return
	}
	defer db.Close(ctx) // cleanup resources

	newConsumer := topictypes.Consumer{
		Name:            "new-consumer",
		SupportedCodecs: []topictypes.Codec{topictypes.CodecRaw, topictypes.CodecGzip}, // optional
	}
	addConsumer := topicoptions.AlterWithAddConsumers(newConsumer)
	if err = db.Topic().Alter(ctx, "topic-path", addConsumer); err != nil {
		log.Printf("failed alter topic: %v", err)
	}
}
Example (CreateTopic)
package main

import (
	"context"
	"log"
	"os"

	ydb "github.com/ydb-platform/ydb-go-sdk/v3"
	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes"
)

// main connects to YDB and creates the topic "topic-path" with optional settings.
func main() {
	ctx := context.TODO()

	// Use the local endpoint unless YDB_CONNECTION_STRING provides a non-empty value.
	dsn := "grpc://localhost:2136/local"
	if fromEnv := os.Getenv("YDB_CONNECTION_STRING"); fromEnv != "" {
		dsn = fromEnv
	}

	db, err := ydb.Open(ctx, dsn)
	if err != nil {
		log.Printf("failed connect: %v", err)

		return
	}
	defer db.Close(ctx) // cleanup resources

	// Both options are optional.
	codecs := topicoptions.CreateWithSupportedCodecs(topictypes.CodecRaw, topictypes.CodecGzip)
	minPartitions := topicoptions.CreateWithMinActivePartitions(3)

	if err = db.Topic().Create(ctx, "topic-path", codecs, minPartitions); err != nil {
		log.Printf("failed create topic: %v", err)
	}
}
Example (DescribeTopic)
package main

import (
	"context"
	"fmt"
	"log"
	"os"

	ydb "github.com/ydb-platform/ydb-go-sdk/v3"
)

// main connects to YDB, describes the topic "topic-path" and prints the result.
func main() {
	ctx := context.TODO()

	// Use the local endpoint unless YDB_CONNECTION_STRING provides a non-empty value.
	dsn := "grpc://localhost:2136/local"
	if fromEnv := os.Getenv("YDB_CONNECTION_STRING"); fromEnv != "" {
		dsn = fromEnv
	}

	db, err := ydb.Open(ctx, dsn)
	if err != nil {
		log.Printf("failed connect: %v", err)

		return
	}
	defer db.Close(ctx) // cleanup resources

	topicInfo, err := db.Topic().Describe(ctx, "topic-path")
	if err != nil {
		log.Printf("failed describe topic: %v", err)

		return
	}

	// %#v prints the description in Go syntax.
	fmt.Printf("describe: %#v\n", topicInfo)
}
Example (DescribeTopicConsumer)
package main

import (
	"context"
	"fmt"
	"log"
	"os"

	ydb "github.com/ydb-platform/ydb-go-sdk/v3"
)

// main connects to YDB, describes the consumer "new-consumer" of the topic
// "topic-path" and prints the result.
func main() {
	ctx := context.TODO()

	// Use the local endpoint unless YDB_CONNECTION_STRING provides a non-empty value.
	dsn := "grpc://localhost:2136/local"
	if fromEnv := os.Getenv("YDB_CONNECTION_STRING"); fromEnv != "" {
		dsn = fromEnv
	}

	db, err := ydb.Open(ctx, dsn)
	if err != nil {
		log.Printf("failed connect: %v", err)

		return
	}
	defer db.Close(ctx) // cleanup resources

	consumerInfo, err := db.Topic().DescribeTopicConsumer(ctx, "topic-path", "new-consumer")
	if err != nil {
		log.Printf("failed describe topic consumer: %v", err)

		return
	}

	// %#v prints the description in Go syntax.
	fmt.Printf("describe consumer: %#v\n", consumerInfo)
}
Example (DropTopic)
package main

import (
	"context"
	"log"
	"os"

	ydb "github.com/ydb-platform/ydb-go-sdk/v3"
)

// main connects to YDB and drops the topic "topic-path".
func main() {
	ctx := context.TODO()

	// Use the local endpoint unless YDB_CONNECTION_STRING provides a non-empty value.
	dsn := "grpc://localhost:2136/local"
	if fromEnv := os.Getenv("YDB_CONNECTION_STRING"); fromEnv != "" {
		dsn = fromEnv
	}

	db, err := ydb.Open(ctx, dsn)
	if err != nil {
		log.Printf("failed connect: %v", err)

		return
	}
	defer db.Close(ctx) // cleanup resources

	if err = db.Topic().Drop(ctx, "topic-path"); err != nil {
		log.Printf("failed drop topic: %v", err)
	}
}
Example (ReadMessage)
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	ydb "github.com/ydb-platform/ydb-go-sdk/v3"
	"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
)

// main connects to YDB, starts a reader for the topic "/topic/path" on behalf
// of the consumer "consumer" and prints the content of every message it reads.
// The loop stops at the first error.
func main() {
	ctx := context.TODO()
	connectionString := os.Getenv("YDB_CONNECTION_STRING")
	if connectionString == "" {
		connectionString = "grpc://localhost:2136/local"
	}
	db, err := ydb.Open(ctx, connectionString)
	if err != nil {
		log.Printf("failed connect: %v", err)

		return
	}
	defer db.Close(ctx) // cleanup resources

	reader, err := db.Topic().StartReader("consumer", topicoptions.ReadTopic("/topic/path"))
	if err != nil {
		log.Printf("failed start reader: %v", err)

		return
	}
	// Release the reader when main returns; the close error is deliberately
	// ignored because the program is exiting anyway.
	defer func() { _ = reader.Close(ctx) }()

	for {
		mess, err := reader.ReadMessage(ctx)
		if err != nil {
			// Report the step that actually failed instead of "start reader".
			log.Printf("failed read message: %v", err)

			return
		}

		// The message is passed to io.ReadAll to obtain its whole payload.
		content, err := io.ReadAll(mess)
		if err != nil {
			log.Printf("failed read message content: %v", err)

			return
		}
		fmt.Println(string(content))
	}
}

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

// Client is the interface of the topic client.
type Client interface {
	// Alter changes the options of a topic.
	Alter(ctx context.Context, path string, opts ...topicoptions.AlterOption) error

	// Create creates a topic.
	Create(ctx context.Context, path string, opts ...topicoptions.CreateOption) error

	// Describe returns the description of a topic.
	Describe(ctx context.Context, path string, opts ...topicoptions.DescribeOption) (topictypes.TopicDescription, error)

	// DescribeTopicConsumer returns the description of a topic consumer.
	DescribeTopicConsumer(
		ctx context.Context, path string, consumer string, opts ...topicoptions.DescribeConsumerOption,
	) (topictypes.TopicConsumerDescription, error)

	// Drop drops a topic.
	Drop(ctx context.Context, path string, opts ...topicoptions.DropOption) error

	// StartListener starts listening to a topic with the given handler.
	// It is a fast, non-blocking call; the connection starts in the background.
	//
	// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
	StartListener(
		consumer string,
		handler topiclistener.EventHandler,
		readSelectors topicoptions.ReadSelectors,
		opts ...topicoptions.ListenerOption,
	) (*topiclistener.TopicListener, error)

	// StartReader starts reading messages from a topic.
	// It is a fast, non-blocking call; the connection starts in the background.
	StartReader(
		consumer string,
		readSelectors topicoptions.ReadSelectors,
		opts ...topicoptions.ReaderOption,
	) (*topicreader.Reader, error)

	// StartWriter starts a write session to a topic.
	// It is a fast, non-blocking call; the connection starts in the background.
	StartWriter(topicPath string, opts ...topicoptions.WriterOption) (*topicwriter.Writer, error)

	// StartTransactionalWriter starts a writer that writes messages within a transaction.
	//
	// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
	StartTransactionalWriter(
		tx tx.Identifier,
		topicPath string,
		opts ...topicoptions.WriterOption,
	) (*topicwriter.TxWriter, error)
}

Client is the interface for the topic client. Attention: the interface may be extended in the future.

Directories

Path Synopsis
Package topicreader provides Reader to receive messages from YDB topics. More examples are in the examples repository.
Package topicreader provides Reader to receive messages from YDB topics. More examples are in the examples repository.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL