extensions

package
v0.0.0-...-5ea0f31 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 12, 2024 License: Apache-2.0 Imports: 12 Imported by: 0

README

Extensions

The goal of xCherry is to require only a MINIMUM amount of code to support any database. To that end, xCherry defines a minimal interface that an extension has to implement.

  • SQL DB interfaces define the contract for implementing an extension for a SQL database
  • NoSQL DB interfaces define the contract for implementing an extension for a NoSQL database
    • This is a more generic interface for any database that doesn't work with the go-sql-driver

xCherry will support any database as long as it can:

  • Execute transactions on multiple tables with locking
  • Provide CDC (change data capture) support (ideally compatible with Apache Pulsar Connect)

Steps to implement a new database extension

TODO more details

Each sub-directory should contain all of the implementation that is specific to that database. For example, all the MySQL-specific logic should be in the mysql/ package, including:

  • SQL schemas
  • SQL implementations
  • Error handling
  • Testing
  • Tooling logic
  • etc

Resources

Documentation

Index

Constants

View Source
// Names of the cli options used by this package.
const (
	// CLIFlagEndpoint is the cli option for endpoint
	CLIFlagEndpoint = "endpoint"
	// CLIFlagPort is the cli option for port
	CLIFlagPort = "port"
	// CLIFlagUser is the cli option for user
	CLIFlagUser = "user"
	// CLIFlagPassword is the cli option for password
	CLIFlagPassword = "password"
	// CLIFlagDatabase is the cli option for database, and CLIFlagFile is the
	// cli option for file.
	// NOTE(review): "file" is presumably the schema file path consumed by
	// SetupSchemaByCli — confirm against the implementation.
	CLIFlagDatabase = "database"
	CLIFlagFile     = "file"
)

Variables

This section is empty.

Functions

func CreateDatabase

func CreateDatabase(cfg config.SQL, name string) error

func CreateDatabaseByCli

func CreateDatabaseByCli(cli *cli.Context, extensionName string) error

CreateDatabaseByCli creates a SQL database.

func DropDatabase

func DropDatabase(cfg config.SQL, name string) error

func RegisterSQLDBExtension

func RegisterSQLDBExtension(name string, ext SQLDBExtension)

RegisterSQLDBExtension will register a SQL extension

func SetupSchema

func SetupSchema(cfg *config.SQL, filePath string) error

func SetupSchemaByCli

func SetupSchemaByCli(cli *cli.Context, extensionName string) error

SetupSchemaByCli sets up the schema for a new database.

func ValidateConnectConfig

func ValidateConnectConfig(cfg *config.SQL) error

ValidateConnectConfig validates params

Types

type AppDatabaseTableRow

// AppDatabaseTableRow holds a table name together with two column-to-value
// maps: one for the primary-key columns and one for all other columns.
// Both column names and values are represented as strings.
// NOTE(review): presumably describes one row of an application-owned table —
// confirm against the callers.
type AppDatabaseTableRow struct {
	TableName               string
	PrimaryKeyColumnToValue map[string]string
	OtherColumnToValue      map[string]string
}

type AppDatabaseTableRowSelect

// AppDatabaseTableRowSelect holds a single column-to-value map, with both
// column names and values represented as strings.
// NOTE(review): presumably the result of selecting one row from an
// application-owned table — confirm against the callers.
type AppDatabaseTableRowSelect struct {
	ColumnToValue map[string]string
}

type AsyncStateExecutionRow

// AsyncStateExecutionRow carries the full set of async state execution fields
// declared in this package: the process execution id, the state id and its
// sequence, the status, the wait-until commands and their results, the last
// failure, and the Input and Info payloads. The command, failure, input and
// info payloads are all stored as types.JSONText.
type AsyncStateExecutionRow struct {
	ProcessExecutionId uuid.UUID
	// See the top of the file for why we need this field
	ProcessExecutionIdString string
	StateId                  string
	StateIdSequence          int32

	Status data_models.StateExecutionStatus

	WaitUntilCommands       types.JSONText
	WaitUntilCommandResults types.JSONText

	PreviousVersion int32 // for conditional check

	LastFailure types.JSONText

	Input types.JSONText
	Info  types.JSONText
}

type AsyncStateExecutionRowForUpdate

// AsyncStateExecutionRowForUpdate has the same fields as
// AsyncStateExecutionRow except Input and Info.
// NOTE(review): presumably the payload of an update that is conditional on
// PreviousVersion — confirm against the implementation.
type AsyncStateExecutionRowForUpdate struct {
	ProcessExecutionId uuid.UUID
	// See the top of the file for why we need this field
	ProcessExecutionIdString string

	StateId         string
	StateIdSequence int32

	Status data_models.StateExecutionStatus

	WaitUntilCommands       types.JSONText
	WaitUntilCommandResults types.JSONText

	LastFailure types.JSONText

	PreviousVersion int32 // for conditional check
}

type AsyncStateExecutionRowForUpdateWithoutCommands

// AsyncStateExecutionRowForUpdateWithoutCommands has the same fields as
// AsyncStateExecutionRowForUpdate except WaitUntilCommands and
// WaitUntilCommandResults.
// NOTE(review): presumably used for updates that must leave the stored
// commands untouched — confirm against the implementation.
type AsyncStateExecutionRowForUpdateWithoutCommands struct {
	ProcessExecutionId uuid.UUID
	// See the top of the file for why we need this field
	ProcessExecutionIdString string

	StateId         string
	StateIdSequence int32

	Status data_models.StateExecutionStatus

	LastFailure types.JSONText

	PreviousVersion int32 // for conditional check
}

type AsyncStateExecutionSelectFilter

// AsyncStateExecutionSelectFilter carries a process execution id, a state id
// and a state id sequence.
// NOTE(review): presumably these three values together select one async
// state execution row — confirm against the implementation.
type AsyncStateExecutionSelectFilter struct {
	ProcessExecutionId uuid.UUID
	// See the top of the file for why we need this field
	ProcessExecutionIdString string

	StateId         string
	StateIdSequence int32
}

type ErrorChecker

// ErrorChecker classifies an error into one of several categories. Each
// method takes an error and reports, as a bool, whether it belongs to the
// category named by the method: duplicate entry, not found, timeout,
// throttling, or conditional update failure.
// NOTE(review): presumably implemented per database, since error
// representations differ between drivers — confirm.
type ErrorChecker interface {
	IsDupEntryError(err error) bool
	IsNotFoundError(err error) bool
	IsTimeoutError(err error) bool
	IsThrottlingError(err error) bool
	IsConditionalUpdateFailure(err error) bool
}

type ExecutionVisibilityRow

// ExecutionVisibilityRow carries the namespace, process id, process execution
// id (as both uuid.UUID and string), process type name, status, start time and
// close time of a process execution.
// NOTE(review): presumably backs listing/searching of executions; the value
// of CloseTime for a still-running execution is not visible here — confirm.
type ExecutionVisibilityRow struct {
	Namespace                string
	ProcessId                string
	ProcessExecutionId       uuid.UUID
	ProcessExecutionIdString string
	ProcessTypeName          string
	Status                   data_models.ProcessExecutionStatus
	StartTime                time.Time
	CloseTime                time.Time
}

type ImmediateTaskRangeDeleteFilter

// ImmediateTaskRangeDeleteFilter carries a shard id and a task sequence range
// whose minimum and maximum are both inclusive, as the field names state.
// NOTE(review): presumably selects the immediate tasks of one shard to delete
// in a single range operation — confirm against the implementation.
type ImmediateTaskRangeDeleteFilter struct {
	ShardId int32

	MinTaskSequenceInclusive int64
	MaxTaskSequenceInclusive int64
}

type ImmediateTaskRow

// ImmediateTaskRow carries an immediate task: its shard id and task sequence,
// the task type, the process execution and state it refers to, and an Info
// payload stored as types.JSONText.
type ImmediateTaskRow struct {
	ShardId      int32
	TaskSequence int64

	TaskType data_models.ImmediateTaskType

	ProcessExecutionId uuid.UUID
	// See the top of the file for why we need this field
	ProcessExecutionIdString string
	StateId                  string
	StateIdSequence          int32

	Info types.JSONText
}

type ImmediateTaskRowDeleteFilter

// ImmediateTaskRowDeleteFilter carries the shard id and task sequence of an
// immediate task, plus an optional partition key.
// NOTE(review): OptionalPartitionKey is a pointer and, going by its name, may
// be nil; which databases need it is not visible here — confirm.
type ImmediateTaskRowDeleteFilter struct {
	ShardId      int32
	TaskSequence int64

	OptionalPartitionKey *data_models.PartitionKey
}

type ImmediateTaskRowForInsert

// ImmediateTaskRowForInsert has the same fields as ImmediateTaskRow except
// TaskSequence.
// NOTE(review): presumably TaskSequence is assigned by the database at insert
// time — confirm against the schema.
type ImmediateTaskRowForInsert struct {
	ShardId  int32
	TaskType data_models.ImmediateTaskType

	ProcessExecutionId uuid.UUID
	// See the top of the file for why we need this field
	ProcessExecutionIdString string
	// StateId and StateIdSequence will be "" and 0 when TaskType is persistence.ImmediateTaskTypeNewLocalQueueMessages
	StateId         string
	StateIdSequence int32

	Info types.JSONText
}

type LatestProcessExecutionRow

// LatestProcessExecutionRow associates a namespace and process id with a
// process execution id.
// NOTE(review): going by the type name, this is presumably the most recent
// execution for that namespace/process id pair — confirm.
type LatestProcessExecutionRow struct {
	Namespace          string
	ProcessId          string
	ProcessExecutionId uuid.UUID
	// See the top of the file for why we need this field
	ProcessExecutionIdString string
}

type LocalAttributeRow

// LocalAttributeRow carries one key/value pair attached to a process
// execution. The key is a string and the value is stored as types.JSONText.
type LocalAttributeRow struct {
	ProcessExecutionId       uuid.UUID
	ProcessExecutionIdString string
	Key                      string
	Value                    types.JSONText
}

type LocalQueueMessageRow

// LocalQueueMessageRow carries one local queue message: the process execution
// it belongs to, the queue name, a dedup id and a payload stored as
// types.JSONText. Both ids are held as a uuid.UUID and as a string.
// NOTE(review): presumably DedupId is used to de-duplicate messages — confirm.
type LocalQueueMessageRow struct {
	ProcessExecutionId uuid.UUID
	// See the top of the file for why we need this field
	ProcessExecutionIdString string

	QueueName string

	DedupId uuid.UUID
	// See the top of the file for why we need this field
	DedupIdString string

	Payload types.JSONText
}

type ProcessExecutionRow

// ProcessExecutionRow carries the full set of process execution fields
// declared in this package: the execution id, shard id, status, history event
// id sequence, the state execution sequence maps and local queues (both
// types.JSONText), the namespace, process id, start time, timeout in seconds,
// an Info payload, and whether a graceful complete has been requested.
type ProcessExecutionRow struct {
	ProcessExecutionId uuid.UUID
	// See the top of the file for why we need this field
	ProcessExecutionIdString string

	ShardId int32

	Status                 data_models.ProcessExecutionStatus
	HistoryEventIdSequence int32

	StateExecutionSequenceMaps types.JSONText
	StateExecutionLocalQueues  types.JSONText

	Namespace string

	ProcessId                 string
	StartTime                 time.Time
	TimeoutSeconds            int32
	Info                      types.JSONText
	GracefulCompleteRequested bool
}

type ProcessExecutionRowForUpdate

// ProcessExecutionRowForUpdate has the same fields as ProcessExecutionRow
// except Namespace, ProcessId, StartTime, TimeoutSeconds and Info.
// NOTE(review): presumably the omitted fields are immutable after the
// execution is created — confirm against the implementation.
type ProcessExecutionRowForUpdate struct {
	ProcessExecutionId uuid.UUID
	// See the top of the file for why we need this field
	ProcessExecutionIdString string

	ShardId int32

	Status                 data_models.ProcessExecutionStatus
	HistoryEventIdSequence int32

	StateExecutionSequenceMaps types.JSONText
	StateExecutionLocalQueues  types.JSONText

	GracefulCompleteRequested bool
}

type SQLAdminDBSession

// SQLAdminDBSession is a session for admin operations: creating a database,
// dropping a database, and executing a schema DDL query. The three operations
// take a context.Context and return an error. Close takes no arguments and
// also returns an error.
type SQLAdminDBSession interface {
	CreateDatabase(ctx context.Context, database string) error
	DropDatabase(ctx context.Context, database string) error
	ExecuteSchemaDDL(ctx context.Context, ddlQuery string) error
	Close() error
}

func NewSQLAdminSession

func NewSQLAdminSession(cfg *config.SQL) (SQLAdminDBSession, error)

NewSQLAdminSession returns an admin DB session (SQLAdminDBSession).

type SQLDBExtension

// SQLDBExtension is what a SQL database extension has to implement. It opens
// the two kinds of session used by this package, each from a *config.SQL.
// Extensions are registered by name through RegisterSQLDBExtension.
type SQLDBExtension interface {
	// StartDBSession starts the session for regular business logic
	StartDBSession(cfg *config.SQL) (SQLDBSession, error)
	// StartAdminDBSession starts the session for admin operation like DDL
	StartAdminDBSession(cfg *config.SQL) (SQLAdminDBSession, error)
}

type SQLDBSession

// SQLDBSession is the session for regular (non-admin) operations. It embeds
// ErrorChecker, so a session can also classify errors.
type SQLDBSession interface {
	ErrorChecker
	// StartTransaction begins a transaction using the given context and
	// *sql.TxOptions, returning a SQLTransaction.
	StartTransaction(ctx context.Context, opts *sql.TxOptions) (SQLTransaction, error)
	// Close closes the session and returns an error.
	Close() error
	// contains filtered or unexported methods
}

func NewSQLSession

func NewSQLSession(cfg *config.SQL) (SQLDBSession, error)

NewSQLSession returns a regular session

type SQLTransaction

// SQLTransaction is the transaction handle returned by
// SQLDBSession.StartTransaction. Its exported methods are Commit and
// Rollback, each of which returns an error.
type SQLTransaction interface {
	Commit() error
	Rollback() error
	// contains filtered or unexported methods
}

type TimerTaskRangeSelectFilter

// TimerTaskRangeSelectFilter carries a shard id, an inclusive upper bound on
// the fire time in unix seconds, and a page size.
// NOTE(review): presumably PageSize caps the number of timer tasks returned
// by one select — confirm against the implementation.
type TimerTaskRangeSelectFilter struct {
	ShardId int32

	MaxFireTimeUnixSecondsInclusive int64
	PageSize                        int32
}

type TimerTaskRow

// TimerTaskRow carries a timer task: its shard id, fire time in unix seconds
// and task sequence, the task type, the process execution and state it refers
// to, and an Info payload stored as types.JSONText.
type TimerTaskRow struct {
	ShardId             int32
	FireTimeUnixSeconds int64
	TaskSequence        int64

	TaskType data_models.TimerTaskType

	ProcessExecutionId uuid.UUID
	// See the top of the file for why we need this field
	ProcessExecutionIdString string
	StateId                  string
	StateIdSequence          int32

	Info types.JSONText
}

type TimerTaskRowDeleteFilter

// TimerTaskRowDeleteFilter carries the shard id, fire time in unix seconds
// and task sequence of a timer task, plus an optional partition key.
// NOTE(review): OptionalPartitionKey is a pointer and, going by its name, may
// be nil; which databases need it is not visible here — confirm.
type TimerTaskRowDeleteFilter struct {
	ShardId             int32
	FireTimeUnixSeconds int64
	TaskSequence        int64

	OptionalPartitionKey *data_models.PartitionKey
}

type TimerTaskRowForInsert

// TimerTaskRowForInsert has the same fields as TimerTaskRow except
// TaskSequence.
// NOTE(review): presumably TaskSequence is assigned by the database at insert
// time — confirm against the schema.
type TimerTaskRowForInsert struct {
	ShardId             int32
	FireTimeUnixSeconds int64
	TaskType            data_models.TimerTaskType

	ProcessExecutionId uuid.UUID
	// See the top of the file for why we need this field
	ProcessExecutionIdString string
	StateId                  string
	StateIdSequence          int32

	Info types.JSONText
}

type TimerTaskSelectByTimestampsFilter

// TimerTaskSelectByTimestampsFilter carries a shard id, a list of fire times
// in unix seconds, and an inclusive lower bound on the task sequence.
// NOTE(review): presumably selects the timer tasks of one shard whose fire
// time is in the list and whose sequence is at least the minimum — confirm.
type TimerTaskSelectByTimestampsFilter struct {
	ShardId int32

	FireTimeUnixSeconds      []int64
	MinTaskSequenceInclusive int64
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL