table

package
v3.53.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 12, 2023 License: Apache-2.0 Imports: 13 Imported by: 56

Documentation

Overview

Example (AlterTable)
ctx := context.TODO()
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
if err != nil {
	fmt.Printf("failed connect: %v", err)
	return
}
defer db.Close(ctx) // cleanup resources
err = db.Table().Do(ctx,
	func(ctx context.Context, s table.Session) (err error) {
		return s.AlterTable(ctx, path.Join(db.Name(), "series"),
			options.WithAddColumn("series_id", types.Optional(types.TypeUint64)),
			options.WithAddColumn("title", types.Optional(types.TypeText)),
			options.WithSetTimeToLiveSettings(
				options.NewTTLSettings().ColumnDateType("expire_at").ExpireAfter(time.Hour),
			),
			options.WithDropTimeToLive(),
			options.WithAddIndex("idx_series_series_id",
				options.WithIndexColumns("series_id"),
				options.WithDataColumns("title"),
				options.WithIndexType(options.GlobalIndex()),
			),
			options.WithDropIndex("idx_series_title"),
			options.WithAlterAttribute("hello", "world"),
			options.WithAddAttribute("foo", "bar"),
			options.WithDropAttribute("baz"),
		)
	},
	table.WithIdempotent(),
)
if err != nil {
	fmt.Printf("unexpected error: %v", err)
}
Example (BulkUpsert)
ctx := context.TODO()
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
if err != nil {
	fmt.Printf("failed connect: %v", err)
	return
}
defer db.Close(ctx) // cleanup resources
type logMessage struct {
	App       string
	Host      string
	Timestamp time.Time
	HTTPCode  uint32
	Message   string
}
// prepare native go data
const batchSize = 10000
logs := make([]logMessage, 0, batchSize)
for i := 0; i < batchSize; i++ {
	logs = append(logs, logMessage{
		App:       fmt.Sprintf("App_%d", i/256),
		Host:      fmt.Sprintf("192.168.0.%d", i%256),
		Timestamp: time.Now().Add(time.Millisecond * time.Duration(i%1000)),
		HTTPCode:  200,
		Message:   "GET / HTTP/1.1",
	})
}
// execute bulk upsert with native ydb data
err = db.Table().Do( // Do retry operation on errors with best effort
	ctx, // context manage exiting from Do
	func(ctx context.Context, s table.Session) (err error) { // retry operation
		rows := make([]types.Value, 0, len(logs))
		for _, msg := range logs {
			rows = append(rows, types.StructValue(
				types.StructFieldValue("App", types.TextValue(msg.App)),
				types.StructFieldValue("Host", types.TextValue(msg.Host)),
				types.StructFieldValue("Timestamp", types.TimestampValueFromTime(msg.Timestamp)),
				types.StructFieldValue("HTTPCode", types.Uint32Value(msg.HTTPCode)),
				types.StructFieldValue("Message", types.TextValue(msg.Message)),
			))
		}
		return s.BulkUpsert(ctx, "/local/bulk_upsert_example", types.ListValue(rows...))
	},
	table.WithIdempotent(),
)
if err != nil {
	fmt.Printf("unexpected error: %v", err)
}
Example (BulkUpsertWithCompression)
ctx := context.TODO()
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
if err != nil {
	fmt.Printf("failed connect: %v", err)
	return
}
defer db.Close(ctx) // cleanup resources
type logMessage struct {
	App       string
	Host      string
	Timestamp time.Time
	HTTPCode  uint32
	Message   string
}
// prepare native go data
const batchSize = 10000
logs := make([]logMessage, 0, batchSize)
for i := 0; i < batchSize; i++ {
	logs = append(logs, logMessage{
		App:       fmt.Sprintf("App_%d", i/256),
		Host:      fmt.Sprintf("192.168.0.%d", i%256),
		Timestamp: time.Now().Add(time.Millisecond * time.Duration(i%1000)),
		HTTPCode:  200,
		Message:   "GET /images/logo.png HTTP/1.1",
	})
}
// execute bulk upsert with native ydb data
err = db.Table().Do( // Do retry operation on errors with best effort
	ctx, // context manage exiting from Do
	func(ctx context.Context, s table.Session) (err error) { // retry operation
		rows := make([]types.Value, 0, len(logs))
		for _, msg := range logs {
			rows = append(rows, types.StructValue(
				types.StructFieldValue("App", types.TextValue(msg.App)),
				types.StructFieldValue("Host", types.TextValue(msg.Host)),
				types.StructFieldValue("Timestamp", types.TimestampValueFromTime(msg.Timestamp)),
				types.StructFieldValue("HTTPCode", types.Uint32Value(msg.HTTPCode)),
				types.StructFieldValue("Message", types.TextValue(msg.Message)),
			))
		}
		return s.BulkUpsert(ctx, "/local/bulk_upsert_example", types.ListValue(rows...),
			options.WithCallOptions(grpc.UseCompressor(gzip.Name)),
		)
	},
	table.WithIdempotent(),
)
if err != nil {
	fmt.Printf("unexpected error: %v", err)
}
Example (CopyTables)
ctx := context.TODO()
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
if err != nil {
	fmt.Printf("failed connect: %v", err)
	return
}
defer db.Close(ctx) // cleanup resources
err = db.Table().Do(ctx,
	func(ctx context.Context, s table.Session) (err error) {
		return s.CopyTables(ctx,
			options.CopyTablesItem(
				path.Join(db.Name(), "from", "series"),
				path.Join(db.Name(), "to", "series"),
				true,
			),
		)
	},
	table.WithIdempotent(),
)
if err != nil {
	fmt.Printf("unexpected error: %v", err)
}
Example (CreateTable)
ctx := context.TODO()
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
if err != nil {
	fmt.Printf("failed connect: %v", err)
	return
}
defer db.Close(ctx) // cleanup resources
err = db.Table().Do(ctx,
	func(ctx context.Context, s table.Session) (err error) {
		return s.CreateTable(ctx, path.Join(db.Name(), "series"),
			options.WithColumn("series_id", types.Optional(types.TypeUint64)),
			options.WithColumn("title", types.Optional(types.TypeText)),
			options.WithColumn("series_info", types.Optional(types.TypeText)),
			options.WithColumn("release_date", types.Optional(types.TypeDate)),
			options.WithColumn("expire_at", types.Optional(types.TypeDate)),
			options.WithColumn("comment", types.Optional(types.TypeText)),
			options.WithPrimaryKeyColumn("series_id"),
			options.WithTimeToLiveSettings(
				options.NewTTLSettings().ColumnDateType("expire_at").ExpireAfter(time.Hour),
			),
			options.WithIndex("idx_series_title",
				options.WithIndexColumns("title"),
				options.WithIndexType(options.GlobalAsyncIndex()),
			),
		)
	},
	table.WithIdempotent(),
)
if err != nil {
	fmt.Printf("unexpected error: %v", err)
}
Example (DataQueryWithCompression)
ctx := context.TODO()
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
if err != nil {
	fmt.Printf("failed connect: %v", err)
	return
}
defer db.Close(ctx) // cleanup resources
var (
	query = `SELECT 42 as id, "my string" as myStr`
	id    int32  // required value
	myStr string // optional value
)
err = db.Table().Do( // Do retry operation on errors with best effort
	ctx, // context manage exiting from Do
	func(ctx context.Context, s table.Session) (err error) { // retry operation
		_, res, err := s.Execute(ctx, table.DefaultTxControl(), query, nil,
			options.WithCallOptions(
				grpc.UseCompressor(gzip.Name),
			),
		)
		if err != nil {
			return err // for auto-retry with driver
		}
		defer res.Close()                                // cleanup resources
		if err = res.NextResultSetErr(ctx); err != nil { // check single result set and switch to it
			return err // for auto-retry with driver
		}
		for res.NextRow() { // iterate over rows
			err = res.ScanNamed(
				named.Required("id", &id),
				named.OptionalWithDefault("myStr", &myStr),
			)
			if err != nil {
				return err // generally scan error not retryable, return it for driver check error
			}
			fmt.Printf("id=%v, myStr='%s'\n", id, myStr)
		}
		return res.Err() // return finally result error for auto-retry with driver
	},
	table.WithIdempotent(),
)
if err != nil {
	fmt.Printf("unexpected error: %v", err)
}
Example (LazyTransaction)
ctx := context.TODO()
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
if err != nil {
	fmt.Printf("failed connect: %v", err)
	return
}
defer db.Close(ctx)
err = db.Table().Do(ctx,
	func(ctx context.Context, session table.Session) (err error) {
		// execute query with opening lazy transaction
		tx, result, err := session.Execute(ctx,
			table.SerializableReadWriteTxControl(),
			"DECLARE $id AS Uint64; "+
				"SELECT `title`, `description` FROM `path/to/mytable` WHERE id = $id",
			table.NewQueryParameters(
				table.ValueParam("$id", types.Uint64Value(1)),
			),
		)
		if err != nil {
			return err
		}
		defer func() {
			_ = tx.Rollback(ctx)
			_ = result.Close()
		}()
		if !result.NextResultSet(ctx) {
			return retry.RetryableError(fmt.Errorf("no result sets"))
		}
		if !result.NextRow() {
			return retry.RetryableError(fmt.Errorf("no rows"))
		}
		var (
			id          uint64
			title       string
			description string
		)
		if err = result.ScanNamed(
			named.OptionalWithDefault("id", &id),
			named.OptionalWithDefault("title", &title),
			named.OptionalWithDefault("description", &description),
		); err != nil {
			return err
		}
		fmt.Println(id, title, description)
		// execute query with commit transaction
		_, err = tx.Execute(ctx,
			"DECLARE $id AS Uint64; "+
				"DECLARE $description AS Text; "+
				"UPSERT INTO `path/to/mytable` "+
				"(id, description) "+
				"VALUES ($id, $description);",
			table.NewQueryParameters(
				table.ValueParam("$id", types.Uint64Value(1)),
				table.ValueParam("$description", types.TextValue("changed description")),
			),
			options.WithCommit(),
		)
		if err != nil {
			return err
		}
		return result.Err()
	},
	table.WithIdempotent(),
)
if err != nil {
	fmt.Printf("unexpected error: %v", err)
}
Example (ScanQueryWithCompression)
ctx := context.TODO()
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
if err != nil {
	fmt.Printf("failed connect: %v", err)
	return
}
defer db.Close(ctx) // cleanup resources
var (
	query = `SELECT 42 as id, "my string" as myStr`
	id    int32  // required value
	myStr string // optional value
)
err = db.Table().Do( // Do retry operation on errors with best effort
	ctx, // context manage exiting from Do
	func(ctx context.Context, s table.Session) (err error) { // retry operation
		res, err := s.StreamExecuteScanQuery(ctx, query, nil,
			options.WithCallOptions(
				grpc.UseCompressor(gzip.Name),
			),
		)
		if err != nil {
			return err // for auto-retry with driver
		}
		defer res.Close()                                // cleanup resources
		if err = res.NextResultSetErr(ctx); err != nil { // check single result set and switch to it
			return err // for auto-retry with driver
		}
		for res.NextRow() { // iterate over rows
			err = res.ScanNamed(
				named.Required("id", &id),
				named.OptionalWithDefault("myStr", &myStr),
			)
			if err != nil {
				return err // generally scan error not retryable, return it for driver check error
			}
			fmt.Printf("id=%v, myStr='%s'\n", id, myStr)
		}
		return res.Err() // return finally result error for auto-retry with driver
	},
	table.WithIdempotent(),
)
if err != nil {
	fmt.Printf("unexpected error: %v", err)
}
Example (Select)
ctx := context.TODO()
db, err := ydb.Open(ctx, "grpc://localhost:2136/local")
if err != nil {
	fmt.Printf("failed connect: %v", err)
	return
}
defer db.Close(ctx) // cleanup resources
var (
	query = `SELECT 42 as id, "my string" as myStr`
	id    int32  // required value
	myStr string // optional value
)
err = db.Table().Do( // Do retry operation on errors with best effort
	ctx, // context manage exiting from Do
	func(ctx context.Context, s table.Session) (err error) { // retry operation
		_, res, err := s.Execute(ctx, table.DefaultTxControl(), query, nil)
		if err != nil {
			return err // for auto-retry with driver
		}
		defer res.Close()                                // cleanup resources
		if err = res.NextResultSetErr(ctx); err != nil { // check single result set and switch to it
			return err // for auto-retry with driver
		}
		for res.NextRow() { // iterate over rows
			err = res.ScanNamed(
				named.Required("id", &id),
				named.OptionalWithDefault("myStr", &myStr),
			)
			if err != nil {
				return err // generally scan error not retryable, return it for driver check error
			}
			fmt.Printf("id=%v, myStr='%s'\n", id, myStr)
		}
		return res.Err() // return finally result error for auto-retry with driver
	},
	table.WithIdempotent(),
)
if err != nil {
	fmt.Printf("unexpected error: %v", err)
}

Index

Examples

Constants

View Source
const (
	SessionStatusUnknown = SessionStatus("unknown")
	SessionReady         = SessionStatus("ready")
	SessionBusy          = SessionStatus("busy")
	SessionClosing       = SessionStatus("closing")
	SessionClosed        = SessionStatus("closed")
)

Variables

This section is empty.

Functions

func WithID added in v3.53.0

func WithID(id string) idOption

func WithIdempotent added in v3.2.1

func WithIdempotent() idempotentOption

func WithRetryOptions added in v3.53.0

func WithRetryOptions(retryOptions []retry.Option) retryOptionsOption

func WithTrace added in v3.15.0

func WithTrace(t trace.Table) traceOption

func WithTxCommitOptions added in v3.5.0

func WithTxCommitOptions(opts ...options.CommitTransactionOption) txCommitOptionsOption

func WithTxSettings added in v3.5.0

func WithTxSettings(txSettings *TransactionSettings) txSettingsOption

Types

type Client

type Client interface {
	// CreateSession returns session or error for manually control of session lifecycle
	//
	// CreateSession implements internal busy loop until one of the following conditions is met:
	// - context was canceled or deadlined
	// - session was created
	//
	// Deprecated: don't use CreateSession explicitly. This method only for ORM's compatibility.
	// Use Do for queries with session
	CreateSession(ctx context.Context, opts ...Option) (s ClosableSession, err error)

	// Do provide the best effort for execute operation.
	//
	// Do implements internal busy loop until one of the following conditions is met:
	// - deadline was canceled or deadlined
	// - retry operation returned nil as error
	//
	// Warning: if context without deadline or cancellation func than Do can run indefinitely.
	Do(ctx context.Context, op Operation, opts ...Option) error

	// DoTx provide the best effort for execute transaction.
	//
	// DoTx implements internal busy loop until one of the following conditions is met:
	// - deadline was canceled or deadlined
	// - retry operation returned nil as error
	//
	// DoTx makes auto begin (with TxSettings, by default - SerializableReadWrite), commit and
	// rollback (on error) of transaction.
	//
	// If op TxOperation returns nil - transaction will be committed
	// If op TxOperation return non nil - transaction will be rollback
	// Warning: if context without deadline or cancellation func than DoTx can run indefinitely
	DoTx(ctx context.Context, op TxOperation, opts ...Option) error
}

type ClosableSession added in v3.5.0

type ClosableSession interface {
	closer.Closer

	Session
}

type DataQuery

type DataQuery interface {
	String() string
	ID() string
	YQL() string
}

DataQuery only for tracers

type DataQueryExplanation

type DataQueryExplanation struct {
	Explanation

	AST string
}

DataQueryExplanation is a result of ExplainDataQuery call.

type Explanation added in v3.7.0

type Explanation struct {
	Plan string
}

Explanation is a result of Explain calls.

type Operation added in v3.2.1

type Operation func(ctx context.Context, s Session) error

Operation is the interface that holds an operation for retry. if Operation returns not nil - operation will retry if Operation returns nil - retry loop will break

type Option added in v3.2.1

type Option interface {
	ApplyTableOption(opts *Options)
}

type Options added in v3.2.1

type Options struct {
	ID              string
	Idempotent      bool
	TxSettings      *TransactionSettings
	TxCommitOptions []options.CommitTransactionOption
	RetryOptions    []retry.Option
	Trace           *trace.Table
}

type ParameterOption

type ParameterOption interface {
	Name() string
	Value() types.Value
}

func ValueParam

func ValueParam(name string, v types.Value) ParameterOption

type QueryParameters

type QueryParameters struct {
	// contains filtered or unexported fields
}

func NewQueryParameters

func NewQueryParameters(opts ...ParameterOption) *QueryParameters

func (*QueryParameters) Add

func (q *QueryParameters) Add(params ...ParameterOption)

func (*QueryParameters) Count added in v3.44.0

func (q *QueryParameters) Count() int

func (*QueryParameters) Each

func (q *QueryParameters) Each(it func(name string, v types.Value))

func (*QueryParameters) Params

func (q *QueryParameters) Params() queryParams

func (*QueryParameters) String

func (q *QueryParameters) String() string

type ScriptingYQLExplanation added in v3.7.0

type ScriptingYQLExplanation struct {
	Explanation

	ParameterTypes map[string]types.Type
}

ScriptingYQLExplanation is a result of Explain calls.

type Session

type Session interface {
	SessionInfo

	CreateTable(
		ctx context.Context,
		path string,
		opts ...options.CreateTableOption,
	) (err error)

	DescribeTable(
		ctx context.Context,
		path string,
		opts ...options.DescribeTableOption,
	) (desc options.Description, err error)

	DropTable(
		ctx context.Context,
		path string,
		opts ...options.DropTableOption,
	) (err error)

	AlterTable(
		ctx context.Context,
		path string,
		opts ...options.AlterTableOption,
	) (err error)

	CopyTable(
		ctx context.Context,
		dst, src string,
		opts ...options.CopyTableOption,
	) (err error)

	CopyTables(
		ctx context.Context,
		opts ...options.CopyTablesOption,
	) (err error)

	Explain(
		ctx context.Context,
		query string,
	) (exp DataQueryExplanation, err error)

	// Prepare prepares query for executing in the future
	Prepare(
		ctx context.Context,
		query string,
	) (stmt Statement, err error)

	// Execute executes query.
	//
	// By default, Execute have a flag options.WithKeepInCache(true) if params is not empty. For redefine behavior -
	// append option options.WithKeepInCache(false)
	Execute(
		ctx context.Context,
		tx *TransactionControl,
		query string,
		params *QueryParameters,
		opts ...options.ExecuteDataQueryOption,
	) (txr Transaction, r result.Result, err error)

	ExecuteSchemeQuery(
		ctx context.Context,
		query string,
		opts ...options.ExecuteSchemeQueryOption,
	) (err error)

	DescribeTableOptions(
		ctx context.Context,
	) (desc options.TableOptionsDescription, err error)

	StreamReadTable(
		ctx context.Context,
		path string,
		opts ...options.ReadTableOption,
	) (r result.StreamResult, err error)

	StreamExecuteScanQuery(
		ctx context.Context,
		query string,
		params *QueryParameters,
		opts ...options.ExecuteScanQueryOption,
	) (_ result.StreamResult, err error)

	BulkUpsert(
		ctx context.Context,
		table string,
		rows types.Value,
		opts ...options.BulkUpsertOption,
	) (err error)

	ReadRows(
		ctx context.Context,
		path string,
		keys types.Value,
		opts ...options.ReadRowsOption,
	) (_ result.Result, err error)

	BeginTransaction(
		ctx context.Context,
		tx *TransactionSettings,
	) (x Transaction, err error)

	KeepAlive(
		ctx context.Context,
	) error
}

type SessionInfo

type SessionInfo interface {
	ID() string
	NodeID() uint32
	Status() SessionStatus
	LastUsage() time.Time
}

type SessionStatus added in v3.37.7

type SessionStatus = string

type Statement

type Statement interface {
	Execute(
		ctx context.Context,
		tx *TransactionControl,
		params *QueryParameters,
		opts ...options.ExecuteDataQueryOption,
	) (txr Transaction, r result.Result, err error)
	NumInput() int
	Text() string
}

type Transaction

type Transaction interface {
	TransactionActor

	CommitTx(
		ctx context.Context,
		opts ...options.CommitTransactionOption,
	) (r result.Result, err error)
	Rollback(
		ctx context.Context,
	) (err error)
}

type TransactionActor added in v3.5.2

type TransactionActor interface {
	TransactionIdentifier

	Execute(
		ctx context.Context,
		query string,
		params *QueryParameters,
		opts ...options.ExecuteDataQueryOption,
	) (result.Result, error)
	ExecuteStatement(
		ctx context.Context,
		stmt Statement,
		params *QueryParameters,
		opts ...options.ExecuteDataQueryOption,
	) (result.Result, error)
}

type TransactionControl

type TransactionControl struct {
	// contains filtered or unexported fields
}

func DefaultTxControl added in v3.20.0

func DefaultTxControl() *TransactionControl

DefaultTxControl returns default transaction control with serializable read-write isolation mode and auto-commit

func OnlineReadOnlyTxControl added in v3.36.0

func OnlineReadOnlyTxControl(opts ...TxOnlineReadOnlyOption) *TransactionControl

OnlineReadOnlyTxControl returns online read-only transaction control

func SerializableReadWriteTxControl added in v3.37.0

func SerializableReadWriteTxControl(opts ...TxControlOption) *TransactionControl

SerializableReadWriteTxControl returns transaction control with serializable read-write isolation mode

func StaleReadOnlyTxControl added in v3.36.0

func StaleReadOnlyTxControl() *TransactionControl

StaleReadOnlyTxControl returns stale read-only transaction control

func TxControl

func TxControl(opts ...TxControlOption) *TransactionControl

TxControl makes transaction control from given options

func (*TransactionControl) Desc

type TransactionIdentifier added in v3.5.2

type TransactionIdentifier interface {
	ID() string
}

type TransactionSettings

type TransactionSettings struct {
	// contains filtered or unexported fields
}

func TxSettings

func TxSettings(opts ...TxOption) *TransactionSettings

TxSettings returns transaction settings

func (*TransactionSettings) Settings

type TxControlOption

type TxControlOption func(*txControlDesc)

func BeginTx

func BeginTx(opts ...TxOption) TxControlOption

BeginTx returns begin transaction control option

func CommitTx

func CommitTx() TxControlOption

CommitTx returns commit transaction control option

func WithTxID added in v3.41.0

func WithTxID(txID string) TxControlOption

type TxOnlineReadOnlyOption

type TxOnlineReadOnlyOption func(*txOnlineReadOnly)

func WithInconsistentReads

func WithInconsistentReads() TxOnlineReadOnlyOption

type TxOperation added in v3.5.0

type TxOperation func(ctx context.Context, tx TransactionActor) error

TxOperation is the interface that holds an operation for retry. if TxOperation returns not nil - operation will retry if TxOperation returns nil - retry loop will break

type TxOption

type TxOption func(*txDesc)

Transaction control options

func WithOnlineReadOnly

func WithOnlineReadOnly(opts ...TxOnlineReadOnlyOption) TxOption

func WithSerializableReadWrite

func WithSerializableReadWrite() TxOption

func WithSnapshotReadOnly added in v3.38.0

func WithSnapshotReadOnly() TxOption

func WithStaleReadOnly

func WithStaleReadOnly() TxOption

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL