db_common

package
v2.3.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 16, 2025 License: AGPL-3.0 Imports: 31 Imported by: 1

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrServiceInRecoveryMode is the sentinel error used to report that the
// service is in recovery mode. Compare against it with errors.Is.
var ErrServiceInRecoveryMode = errors.New("service is in recovery mode")
View Source
// Functions is the list of SQLFunction definitions (name, parameters, return
// type, language and body) provided by this package. Per the package
// documentation these are installed in the db 'steampipe_internal' schema.
var Functions = []SQLFunction{
	// glob rewrites a glob pattern by replacing every '*' with '%' and every
	// '?' with '_', and returns the rewritten text.
	// NOTE(review): presumably the result is used as a SQL LIKE pattern - confirm against callers.
	{
		Name:     "glob",
		Params:   map[string]string{"input_glob": "text"},
		Returns:  "text",
		Language: "plpgsql",
		Body: `
declare
	output_pattern text;
begin
	output_pattern = replace(input_glob, '*', '%');
	output_pattern = replace(output_pattern, '?', '_');
	return output_pattern;
end;
`,
	},
	// Cache set function: takes a text command and inserts a row into
	// steampipe_internal.steampipe_settings:
	//   'on'    -> ('cache', 'true')
	//   'off'   -> ('cache', 'false')
	//   'clear' -> ('cache_clear_time', '')
	// Any other command raises an exception.
	{
		Name:     constants.FunctionCacheSet,
		Params:   map[string]string{"command": "text"},
		Returns:  "void",
		Language: "plpgsql",
		Body: `
begin
	IF command = 'on' THEN
		INSERT INTO steampipe_internal.steampipe_settings("name","value") VALUES ('cache','true');
	ELSIF command = 'off' THEN
		INSERT INTO steampipe_internal.steampipe_settings("name","value") VALUES ('cache','false');
	ELSIF command = 'clear' THEN
		INSERT INTO steampipe_internal.steampipe_settings("name","value") VALUES ('cache_clear_time','');
	ELSE
		RAISE EXCEPTION 'Unknown value % for set_cache - valid values are on, off and clear.', $1;
	END IF;
end;
`,
	},
	// Connection cache clear function: inserts a
	// ('connection_cache_clear', <connection>) row into
	// steampipe_internal.steampipe_settings.
	{
		Name:     constants.FunctionConnectionCacheClear,
		Params:   map[string]string{"connection": "text"},
		Returns:  "void",
		Language: "plpgsql",
		Body: `
begin
		INSERT INTO steampipe_internal.steampipe_settings("name","value") VALUES ('connection_cache_clear',connection);
end;
`,
	},
	// Cache TTL function: inserts a ('cache_ttl', <duration>) row into
	// steampipe_internal.steampipe_settings. duration is declared as int.
	// NOTE(review): the unit of duration is not visible here - confirm.
	{
		Name:     constants.FunctionCacheSetTtl,
		Params:   map[string]string{"duration": "int"},
		Returns:  "void",
		Language: "plpgsql",
		Body: `
begin
	INSERT INTO steampipe_internal.steampipe_settings("name","value") VALUES ('cache_ttl',duration);
end;
`,
	},
}

Functions is a list of SQLFunction objects that are installed in the db 'steampipe_internal' schema at startup.

Functions

func AddRootCertToConfig

func AddRootCertToConfig(config *pgconn.Config, certLocation string) error

func AddSearchPathPrefix

func AddSearchPathPrefix(searchPathPrefix []string, searchPath []string) []string

func BuildSearchPathResult

func BuildSearchPathResult(searchPathString string) ([]string, error)

func CacheClear

func CacheClear(ctx context.Context, connection *pgx.Conn) error

CacheClear resets the max time on the cache; anything below this is not accepted.

func CanSetCacheTtl

func CanSetCacheTtl(ss *ServerSettings, newTtl int) (bool, string)

func EnsureInternalSchemaSuffix

func EnsureInternalSchemaSuffix(searchPath []string) []string

func ExecuteQuery

func ExecuteQuery(ctx context.Context, client Client, queryString string, args ...any) (*queryresult.ResultStreamer, error)

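ExecuteQuery executes a single query on the given client and returns a ResultStreamer for its results. Note: an earlier version of this comment said "if shutdownAfterCompletion is true, shutdown the client after completion", but the current signature has no shutdownAfterCompletion parameter.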

func ExecuteSystemClientCall

func ExecuteSystemClientCall(ctx context.Context, conn *pgx.Conn, executor SystemClientExecutor) error

ExecuteSystemClientCall creates a transaction, sets the application_name to the one used by the system client, executes the callback, and then sets the application name back to the client app name.

func GetCommentsQueryForPlugin

func GetCommentsQueryForPlugin(connectionName string, p map[string]*proto.TableSchema) string

func GetDeleteConnectionQuery

func GetDeleteConnectionQuery(name string) string

func GetMissingSchemaFromIsRelationNotFoundError

func GetMissingSchemaFromIsRelationNotFoundError(err error) (string, string, bool)

func GetUpdateConnectionQuery

func GetUpdateConnectionQuery(connectionName, pluginSchemaName string) string

func GetUserSearchPath

func GetUserSearchPath(ctx context.Context, conn *pgx.Conn) ([]string, error)

func IsClientAppName

func IsClientAppName(appName string) bool

func IsClientSystemAppName

func IsClientSystemAppName(appName string) bool

func IsRelationNotFoundError

func IsRelationNotFoundError(err error) bool

func IsSchemaNameValid

func IsSchemaNameValid(name string) (bool, string)

IsSchemaNameValid verifies that the given string is a valid pgsql schema name

func IsServiceAppName

func IsServiceAppName(appName string) bool

func LoadForeignSchemaNames

func LoadForeignSchemaNames(ctx context.Context, conn *pgx.Conn) ([]string, error)

func MaxDbConnections

func MaxDbConnections() int

func PgEscapeName

func PgEscapeName(name string) string

PgEscapeName escapes strings which will be used for Postgres object identifiers (table names, column names, schema names).

func PgEscapeSearchPath

func PgEscapeSearchPath(searchPath []string) []string

PgEscapeSearchPath applies postgres escaping to the search path and removes whitespace.

func PgEscapeString

func PgEscapeString(str string) string

PgEscapeString escapes strings which are to be inserted. It uses a custom escape tag to avoid the chance of a clash with the escaped text. See https://medium.com/@lnishada/postgres-dollar-quoting-6d23e4f186ec

func SetCacheEnabled

func SetCacheEnabled(ctx context.Context, enabled bool, connection *pgx.Conn) error

SetCacheEnabled enables/disables the cache

func SetCacheTtl

func SetCacheTtl(ctx context.Context, duration time.Duration, connection *pgx.Conn) error

SetCacheTtl sets the cache ttl on the client.

func ValidateClientCacheEnabled

func ValidateClientCacheEnabled(c Client) error_helpers.ErrorAndWarnings

func ValidateClientCacheSettings

func ValidateClientCacheSettings(c Client) error_helpers.ErrorAndWarnings

func ValidateClientCacheTtl

func ValidateClientCacheTtl(c Client) error_helpers.ErrorAndWarnings

func WaitForConnection

func WaitForConnection(ctx context.Context, connStr string, options ...WaitOption) (conn *pgx.Conn, err error)

func WaitForConnectionPing

func WaitForConnectionPing(ctx context.Context, connection *pgx.Conn, waitOptions ...WaitOption) (err error)

WaitForConnectionPing PINGs the DB, retrying after a backoff of constants.ServicePingInterval, but only for constants.DBConnectionTimeout. It returns the error from the database if the dbClient does not respond successfully before the timeout.

func WaitForPool

func WaitForPool(ctx context.Context, db *pgxpool.Pool, waitOptions ...WaitOption) (err error)

WaitForPool waits for the db to start accepting connections. It returns an error if the dbClient does not start within a stipulated time.

func WaitForRecovery

func WaitForRecovery(ctx context.Context, connection *pgx.Conn, waitOptions ...WaitOption) (err error)

WaitForRecovery returns an error (ErrServiceInRecoveryMode) if the service stays in recovery mode for more than constants.DBRecoveryWaitTimeout.

Types

type AcquireSessionResult

// AcquireSessionResult is the value returned by Client.AcquireSession: the
// acquired session together with an embedded ErrorAndWarnings.
type AcquireSessionResult struct {
	// Session is the acquired database session.
	Session *DatabaseSession
	error_helpers.ErrorAndWarnings
}

type Client

// Client is the interface implemented by database clients. It groups
// lifecycle and search path handling, connection/session acquisition, query
// execution, and access to schema metadata, server settings and
// notifications.
type Client interface {
	Close(context.Context) error
	LoadUserSearchPath(context.Context) error

	// search path accessors
	SetRequiredSessionSearchPath(context.Context) error
	GetRequiredSessionSearchPath() []string
	GetCustomSearchPath() []string

	// acquire a management database connection - must be closed
	AcquireManagementConnection(context.Context) (*pgxpool.Conn, error)
	// acquire a query execution session (with search path and cache options set) - must be closed
	AcquireSession(context.Context) *AcquireSessionResult

	// execute a query (with optional args), acquiring a session internally
	// NOTE(review): session handling is inferred from the *InSession variants below - confirm
	ExecuteSync(context.Context, string, ...any) (*pqueryresult.SyncQueryResult, error)
	Execute(context.Context, string, ...any) (*queryresult.Result, error)

	// execute a query (with optional args) in the given session
	// NOTE(review): the func() argument of ExecuteInSession is presumably a completion callback - confirm
	ExecuteSyncInSession(context.Context, *DatabaseSession, string, ...any) (*pqueryresult.SyncQueryResult, error)
	ExecuteInSession(context.Context, *DatabaseSession, func(), string, ...any) (*queryresult.Result, error)

	ResetPools(context.Context)
	GetSchemaFromDB(context.Context) (*SchemaMetadata, error)

	ServerSettings() *ServerSettings
	// register a callback which receives postgres notifications
	RegisterNotificationListener(f func(notification *pgconn.Notification))
}

type ColumnSchema

// ColumnSchema contains the details of a single column in a table.
type ColumnSchema struct {
	ID   string
	Name string
	// NotNull reports whether the column has a NOT NULL constraint
	// (inferred from the field name - confirm against the loader).
	NotNull     bool
	Type        string
	Default     string
	Description string
}

ColumnSchema contains the details of a single column in a table

type DatabaseSession

// DatabaseSession wraps the raw database connection, storing the backend pid
// and the search path alongside it.
type DatabaseSession struct {
	// BackendPid is serialised to JSON as "backend_pid".
	BackendPid uint32 `json:"backend_pid"`
	// SearchPath is excluded from JSON output.
	SearchPath []string `json:"-"`

	// this gets rewritten, since the database/sql gives back a new instance every time
	// (excluded from JSON output)
	Connection *pgxpool.Conn `json:"-"`
}

DatabaseSession wraps the raw database connection. Its purpose is to be able:

  • to store the current search path of the connection without having to make a database round-trip
  • To store the last scan_metadata id used on this connection

func NewDBSession

func NewDBSession(backendPid uint32) *DatabaseSession

func (*DatabaseSession) Close

func (s *DatabaseSession) Close(waitForCleanup bool)

type InitResult

// InitResult collects an error, warnings and messages, along with optional
// overrides for how messages and warnings are displayed.
type InitResult struct {
	Error    error
	Warnings []string
	Messages []string

	// allow overriding of the display functions
	DisplayMessage func(ctx context.Context, m string)
	DisplayWarning func(ctx context.Context, w string)
}

func (*InitResult) AddMessage

func (r *InitResult) AddMessage(message string)

func (*InitResult) AddWarnings

func (r *InitResult) AddWarnings(warnings ...string)

func (*InitResult) DisplayMessages

func (r *InitResult) DisplayMessages()

func (*InitResult) HasMessages

func (r *InitResult) HasMessages() bool

type NotificationListener

type NotificationListener struct {
	// contains filtered or unexported fields
}

func NewNotificationListener

func NewNotificationListener(ctx context.Context, conn *pgx.Conn) (*NotificationListener, error)

func (*NotificationListener) RegisterListener

func (c *NotificationListener) RegisterListener(onNotification func(*pgconn.Notification))

func (*NotificationListener) Stop

func (c *NotificationListener) Stop(ctx context.Context)

type QueryWithArgs

// QueryWithArgs pairs a query string with its arguments.
type QueryWithArgs struct {
	// Query is the query text.
	Query string
	// Args are the arguments accompanying Query.
	Args []any
}

type SQLFunction

// SQLFunction describes a SQL function definition.
type SQLFunction struct {
	// Name is the function name, e.g. "glob".
	Name string
	// Params maps parameter name to SQL type, e.g. {"input_glob": "text"}.
	Params map[string]string
	// Returns is the SQL return type, e.g. "text" or "void".
	Returns string
	// Body is the source of the function body.
	Body string
	// Language is the function's language, e.g. "plpgsql".
	Language string
}

SQLFunction is a struct for an sqlFunc

type SchemaMetadata

// SchemaMetadata is a struct to represent the schema of the database.
type SchemaMetadata struct {
	// map of schema name -> (map of table name -> table schema)
	Schemas map[string]map[string]TableSchema
	// the name of the temporary schema
	TemporarySchemaName string
}

SchemaMetadata is a struct to represent the schema of the database

func LoadSchemaMetadata

func LoadSchemaMetadata(ctx context.Context, conn *pgx.Conn, query string) (*SchemaMetadata, error)

func NewSchemaMetadata

func NewSchemaMetadata() *SchemaMetadata

func (*SchemaMetadata) GetSchemas

func (m *SchemaMetadata) GetSchemas() []string

GetSchemas returns all foreign schema names

func (*SchemaMetadata) GetTablesInSchema

func (m *SchemaMetadata) GetTablesInSchema(schemaName string) map[string]struct{}

GetTablesInSchema returns a lookup of all foreign tables in a given foreign schema

type ServerSettings

// ServerSettings holds server-level settings. The `db` tag on each field
// names the database column it maps to.
type ServerSettings struct {
	StartTime        time.Time `db:"start_time"`
	SteampipeVersion string    `db:"steampipe_version"`
	FdwVersion       string    `db:"fdw_version"`
	// NOTE(review): the unit of CacheMaxTtl is not visible here - confirm.
	CacheMaxTtl int `db:"cache_max_ttl"`
	// CacheMaxSizeMb is the maximum cache size (megabytes, per the field name).
	CacheMaxSizeMb int  `db:"cache_max_size_mb"`
	CacheEnabled   bool `db:"cache_enabled"`
}

type SystemClientExecutor

// SystemClientExecutor is the executor function that is called within a
// transaction. Make sure that by the time the executor finishes execution,
// the connection is freed; otherwise we will get a `conn is busy` error.
type SystemClientExecutor func(context.Context, pgx.Tx) error

SystemClientExecutor is the executor function that is called within a transaction. Make sure that by the time the executor finishes execution, the connection is freed; otherwise we will get a `conn is busy` error.

type TableSchema

// TableSchema contains the details of a single table in the schema.
type TableSchema struct {
	// map columnName -> columnSchema
	Columns map[string]ColumnSchema
	Name    string
	// NOTE(review): FullName is presumably the schema-qualified table name - confirm.
	FullName    string
	Schema      string
	Description string
}

TableSchema contains the details of a single table in the schema

type WaitOption

// WaitOption is a functional option which modifies a waitConfig. Values are
// produced by WithRetryInterval and WithTimeout and passed to the WaitFor*
// functions.
type WaitOption func(w *waitConfig)

func WithRetryInterval

func WithRetryInterval(d time.Duration) WaitOption

func WithTimeout

func WithTimeout(d time.Duration) WaitOption

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL