Documentation
¶
Index ¶
- Variables
- func AuthenticatedUsername(ctx context.Context) string
- func ErrorCode(writer *buffer.Writer, err error) error
- func GetAttribute(ctx context.Context, key string) (interface{}, bool)
- func IsSuperUser(ctx context.Context) bool
- func ListenAndServe(address string, handler ParseFn) error
- func NewErrMultipleCommandsStatements() error
- func NewErrUndefinedStatement() error
- func NewErrUnimplementedMessageType(t types.ClientMessage) error
- func NewErrUnkownStatement(name string) error
- func ParseParameters(query string) []uint32
- func RemoteAddress(ctx context.Context) net.Addr
- func SetAttribute(ctx context.Context, key string, value interface{}) bool
- func TypeMap(ctx context.Context) *pgtype.Map
- type AuthStrategy
- type BackendKeyDataFunc
- type BinaryCopyReader
- type CancelRequestFn
- type CloseFn
- type Column
- type Columns
- func (columns Columns) CopyIn(ctx context.Context, writer *buffer.Writer, format FormatCode) error
- func (columns Columns) Define(ctx context.Context, writer *buffer.Writer, formats []FormatCode) error
- func (columns Columns) Write(ctx context.Context, formats []FormatCode, writer *buffer.Writer, srcs []any) (err error)
- type CopyReader
- type DataWriter
- type DefaultPortalCache
- func (cache *DefaultPortalCache) Bind(ctx context.Context, name string, stmt *Statement, parameters []Parameter, ...) error
- func (cache *DefaultPortalCache) Close()
- func (cache *DefaultPortalCache) Execute(ctx context.Context, name string, limit Limit, reader *buffer.Reader, ...) (err error)
- func (cache *DefaultPortalCache) Get(ctx context.Context, name string) (*Portal, error)
- type DefaultStatementCache
- type FlushFn
- type FormatCode
- type Limit
- type OptionFn
- func BackendKeyData(fn BackendKeyDataFunc) OptionFn
- func CancelRequest(fn CancelRequestFn) OptionFn
- func ClientAuth(auth tls.ClientAuthType) OptionFn
- func CloseConn(fn CloseFn) OptionFn
- func ExtendTypes(fn func(*pgtype.Map)) OptionFn
- func FlushConn(fn FlushFn) OptionFn
- func GlobalParameters(params Parameters) OptionFn
- func Logger(logger *slog.Logger) OptionFn
- func MessageBufferSize(size int) OptionFn
- func Portals(handler func() PortalCache) OptionFn
- func SessionAuthStrategy(fn AuthStrategy) OptionFn
- func SessionMiddleware(fn SessionHandler) OptionFn
- func Statements(handler func() StatementCache) OptionFn
- func TLSConfig(config *tls.Config) OptionFn
- func TerminateConn(fn CloseFn) OptionFn
- func Version(version string) OptionFn
- func WithShutdownTimeout(timeout time.Duration) OptionFn
- type Parameter
- type ParameterStatus
- type Parameters
- type ParseFn
- type Portal
- type PortalCache
- type PreparedOptionFn
- type PreparedStatement
- type PreparedStatementFn
- type PreparedStatements
- type Scanner
- type Server
- func (srv *Server) Close() error
- func (srv *Server) Handshake(conn net.Conn) (_ net.Conn, version types.Version, reader *buffer.Reader, err error)
- func (srv *Server) ListenAndServe(address string) error
- func (srv *Server) Serve(listener net.Listener) error
- func (srv *Server) Shutdown(ctx context.Context) error
- type Session
- type SessionHandler
- type Statement
- type StatementCache
- type TextCopyReader
Constants ¶
This section is empty.
Variables ¶
var CopySignature = []byte("PGCOPY\n\377\r\n\000")
CopySignature is the signature used to identify the start of a copy-in stream and to determine where the copy-in data begins. https://www.postgresql.org/docs/current/sql-copy.html
var ErrClosedWriter = errors.New("closed writer")
ErrClosedWriter is returned when the data writer has been closed.
var ErrDataWritten = errors.New("data has already been written")
ErrDataWritten is returned when an attempt is made to send an empty result to the client after data has already been written.
var ErrRowLimitExceeded = pgerror.WithCode(errors.New("row limit exceeded"), codes.ProgramLimitExceeded)
var ErrUnknownOid = errors.New("unknown oid")
var QueryParameters = regexp.MustCompile(`\$(\d+)|\?`)
QueryParameters represents a regex which could be used to identify and lookup parameters defined inside a given query. Parameters could be defined as positional parameters and non-positional parameters.
Functions ¶
func AuthenticatedUsername ¶
AuthenticatedUsername returns the username of the authenticated user of the given connection context.
func ErrorCode ¶
ErrorCode writes an error message as response to a command with the given severity and error message. A ready for query message is written back to the client once the error has been written indicating the end of a command cycle. https://www.postgresql.org/docs/current/static/protocol-error-fields.html
func GetAttribute ¶
GetAttribute retrieves a custom attribute from the session by key. The first return value is the attribute value, which will be nil if the attribute doesn't exist. The second return value indicates whether the attribute was found.
Example:
tenantID, ok := wire.GetAttribute(ctx, "tenant_id")
if ok {
// Use tenantID
}
func IsSuperUser ¶
IsSuperUser checks whether the given connection context is a super user.
func ListenAndServe ¶
ListenAndServe opens a new Postgres server using the given address and default configurations. The given handler function is used to handle simple queries. This method should be used to construct a simple Postgres server for testing purposes or simple use cases.
func NewErrMultipleCommandsStatements ¶
func NewErrMultipleCommandsStatements() error
NewErrMultipleCommandsStatements is returned whenever multiple statements have been given within a single query during the extended query protocol.
func NewErrUndefinedStatement ¶
func NewErrUndefinedStatement() error
NewErrUndefinedStatement is returned whenever no statement has been defined within the incoming query.
func NewErrUnimplementedMessageType ¶
func NewErrUnimplementedMessageType(t types.ClientMessage) error
NewErrUnimplementedMessageType is called whenever an unimplemented message type is sent. This error indicates to the client that the sent message cannot be processed at this moment in time.
func NewErrUnkownStatement ¶
NewErrUnkownStatement is returned whenever no executable has been found for the given name.
func ParseParameters ¶
ParseParameters attempts to parse the parameters in the given string and returns the expected parameters. This is necessary for the query protocol where the parameter types are expected to be defined in the extended query protocol.
func RemoteAddress ¶
RemoteAddress returns the Postgres remote address connection info if it has been set inside the given context.
func SetAttribute ¶
SetAttribute sets a custom attribute in the session. The key is the attribute name, and value can be any type. Returns true if the attribute was set successfully, false if the session wasn't found.
Example:
wire.SetAttribute(ctx, "tenant_id", "tenant-123")
Types ¶
type AuthStrategy ¶
type AuthStrategy func(ctx context.Context, writer *buffer.Writer, reader *buffer.Reader) (_ context.Context, err error)
AuthStrategy represents an authentication strategy used to authenticate a user.
func ClearTextPassword ¶
func ClearTextPassword(validate func(ctx context.Context, database, username, password string) (context.Context, bool, error)) AuthStrategy
ClearTextPassword announces to the client to authenticate by sending a clear text password and validates if the provided username and password (received inside the client parameters) are valid. If the provided credentials are invalid or any unexpected error occurs, an error is returned and the connection should be closed.
type BackendKeyDataFunc ¶
BackendKeyDataFunc represents a function that generates backend key data for query cancellation. It should return a process ID and secret key that can be used by clients to cancel queries.
type BinaryCopyReader ¶
type BinaryCopyReader struct {
// contains filtered or unexported fields
}
func NewBinaryColumnReader ¶
func NewBinaryColumnReader(ctx context.Context, copy *CopyReader) (_ *BinaryCopyReader, err error)
NewBinaryColumnReader creates a new column reader that reads rows from the given copy reader and returns the values as a slice of any values. The columns are used to determine the format of the data that is read from the reader. If the end of the copy-in stream is reached, an io.EOF error is returned.
type CancelRequestFn ¶
CancelRequestFn function called when a cancel request is received. The function receives the process ID and secret key from the cancel request. It should return an error if the cancel request cannot be processed.
type Column ¶
type Column struct {
Table int32 // table id
ID int32 // column identifier
Attr int16 // column attribute number
Name string // column name
AttrNo int16 // column attribute no (optional)
Oid uint32
Width int16
TypeModifier int32
}
Column represents a table column and its attributes such as name, type and encode formatter.
func (Column) Define ¶
Define writes the column header values to the given writer. This method is used to define a column inside RowDescription message defining the column type, width, and name.
func (Column) Write ¶
func (column Column) Write(ctx context.Context, writer *buffer.Writer, format FormatCode, src any) (err error)
Write encodes the given source value using the column type definition and connection info. The encoded byte buffer is added to the given write buffer. This method is used to encode values and return them inside a DataRow message.
type Columns ¶
type Columns []Column
Columns represent a collection of columns.
func (Columns) CopyIn ¶
CopyIn sends a [CopyInResponse] to the client, to initiate a CopyIn operation. Based on the given columns within the prepared statement. https://www.postgresql.org/docs/current/protocol-message-formats.html#PROTOCOL-MESSAGE-FORMATS-COPYINRESPONSE
func (Columns) Define ¶
func (columns Columns) Define(ctx context.Context, writer *buffer.Writer, formats []FormatCode) error
Define writes the table RowDescription headers for the given table and the containing columns. The headers have to be written before any data rows can be sent back to the client. The given columns are encoded using the given format codes. Columns could be encoded as Text or Binary. If you provide a single format code, it will be applied to all columns.
func (Columns) Write ¶
func (columns Columns) Write(ctx context.Context, formats []FormatCode, writer *buffer.Writer, srcs []any) (err error)
Write writes the given column values back to the client. The given columns are encoded using the given format codes. Columns could be encoded as Text or Binary. If you provide a single format code, it will be applied to all columns.
type CopyReader ¶
func NewCopyReader ¶
NewCopyReader creates a new copy reader that reads copy-in data from the given reader and writes the data to the given writer. The columns are used to determine the format of the data that is read from the reader.
func (*CopyReader) Columns ¶
func (r *CopyReader) Columns() Columns
Columns returns the columns that are currently defined within the copy reader.
func (*CopyReader) Read ¶
func (r *CopyReader) Read() error
Read reads a single chunk from the copy-in stream. The read chunk is returned as a byte slice. If the end of the copy-in stream is reached, an io.EOF error is returned.
type DataWriter ¶
type DataWriter interface {
// Row writes a single data row containing the values inside the given slice to
// the underlying Postgres client. The column headers have to be written before
// sending rows. Each item inside the slice represents a single column value.
// The slice length needs to be the same length as the defined columns. Nil
// values are encoded as NULL values.
Row([]any) error
// Limit returns the maximum number of rows to be written passed within the
// wire protocol. A value of 0 indicates no limit.
Limit() uint32
// Written returns the number of rows written to the client.
Written() uint32
// Empty announces to the client an empty response and that no data rows should
// be expected.
Empty() error
// Columns returns the columns that are currently defined within the writer.
Columns() Columns
// Complete announces to the client that the command has been completed and
// no further data should be expected.
//
// See [CommandComplete] for the expected format for different queries.
//
// [CommandComplete]: https://www.postgresql.org/docs/current/protocol-message-formats.html#PROTOCOL-MESSAGE-FORMATS-COMMANDCOMPLETE
Complete(description string) error
// CopyIn sends a [CopyInResponse] to the client, to initiate a CopyIn
// operation. The copy operation can be used to send large amounts of data to
// the server in a single transaction. A column reader has to be used to read
// the data that is sent by the client to the CopyReader.
CopyIn(format FormatCode) (*CopyReader, error)
}
DataWriter represents a writer interface for writing columns and data rows using the Postgres wire to the connected client.
func NewDataWriter ¶
func NewDataWriter(ctx context.Context, columns Columns, formats []FormatCode, limit Limit, reader *buffer.Reader, writer *buffer.Writer) DataWriter
NewDataWriter constructs a new data writer using the given context and buffer. The returned writer should be handled with caution as it is not safe for concurrent use. Concurrent access to the same data without proper synchronization can result in unexpected behavior and data corruption.
type DefaultPortalCache ¶
type DefaultPortalCache struct {
// contains filtered or unexported fields
}
func (*DefaultPortalCache) Bind ¶
func (cache *DefaultPortalCache) Bind(ctx context.Context, name string, stmt *Statement, parameters []Parameter, formats []FormatCode) error
func (*DefaultPortalCache) Close ¶
func (cache *DefaultPortalCache) Close()
type DefaultStatementCache ¶
type DefaultStatementCache struct {
// contains filtered or unexported fields
}
func (*DefaultStatementCache) Close ¶
func (cache *DefaultStatementCache) Close()
func (*DefaultStatementCache) Get ¶
Get attempts to get the prepared statement for the given name. An error is returned when no statement has been found.
func (*DefaultStatementCache) Set ¶
func (cache *DefaultStatementCache) Set(ctx context.Context, name string, stmt *PreparedStatement) error
Set attempts to bind the given statement to the given name. Any previously defined statement is overridden.
type FormatCode ¶
type FormatCode int16
FormatCode represents the encoding format of a given column
const ( // TextFormat is the default, text format. TextFormat FormatCode = 0 // BinaryFormat is an alternative, binary, encoding. BinaryFormat FormatCode = 1 )
type Limit ¶
type Limit uint32
Limit represents the maximum number of rows to be written. Zero denotes “no limit”.
const NoLimit Limit = 0
type OptionFn ¶
OptionFn options pattern used to define and set options for the given PostgreSQL server.
func BackendKeyData ¶
func BackendKeyData(fn BackendKeyDataFunc) OptionFn
BackendKeyData sets the function that generates backend key data for query cancellation. The provided function should return a process ID and secret key that can be used by clients to cancel queries. If not set, no BackendKeyData message will be sent.
func CancelRequest ¶
func CancelRequest(fn CancelRequestFn) OptionFn
CancelRequest sets the cancel request handler for the server. This function is called when a client sends a cancel request with a process ID and secret key. The handler should validate the credentials and cancel the appropriate query if valid.
func ClientAuth ¶
func ClientAuth(auth tls.ClientAuthType) OptionFn
ClientAuth sets the client authentication type which is used to authenticate the client connection. The default value is tls.NoClientCert which means that no client authentication is performed.
func ExtendTypes ¶
ExtendTypes provides the ability to extend the underlying connection types. Types registered inside the given github.com/jackc/pgx/v5/pgtype.Map are registered to all incoming connections.
func FlushConn ¶
FlushConn registers a handler for Flush messages.
The provided handler is invoked when the frontend sends a Flush command. This allows the server to force any pending data in its output buffers to be delivered immediately.
Typically, a Flush is sent after an extended-query command (except Sync) when the frontend wants to inspect results before issuing more commands.
func GlobalParameters ¶
func GlobalParameters(params Parameters) OptionFn
GlobalParameters sets the server parameters which are sent back to the front-end (client) once a handshake has been established.
func Logger ¶
Logger sets the given slog.Logger as the logger for the given server.
func MessageBufferSize ¶
MessageBufferSize sets the message buffer size which is allocated once a new connection gets constructed. If a negative or zero value is provided, the default message buffer size is used.
func Portals ¶
func Portals(handler func() PortalCache) OptionFn
Portals sets the portals cache used to cache statements for later use. By default DefaultPortalCache is used.
func SessionAuthStrategy ¶
func SessionAuthStrategy(fn AuthStrategy) OptionFn
SessionAuthStrategy sets the given authentication strategy within the given server. The authentication strategy is called when a handshake is initiated.
func SessionMiddleware ¶
func SessionMiddleware(fn SessionHandler) OptionFn
SessionMiddleware sets the given session handler within the underlying server. The session handler is called when a new connection is opened and authenticated allowing for additional metadata to be wrapped around the connection context.
func Statements ¶
func Statements(handler func() StatementCache) OptionFn
Statements sets the statement cache used to cache statements for later use. By default DefaultStatementCache is used.
func TLSConfig ¶
TLSConfig sets the given TLS config to be used to initialize a secure connection between the front-end (client) and back-end (server).
func TerminateConn ¶
TerminateConn sets the terminate connection handle inside the given server instance.
func Version ¶
Version sets the PostgreSQL version for the server which is sent back to the front-end (client) once a handshake has been established.
func WithShutdownTimeout ¶
WithShutdownTimeout sets the timeout duration for graceful shutdown. When Shutdown is called, the server will wait up to this duration for active connections to finish before forcing closure. A timeout of 0 means wait indefinitely (no timeout).
type Parameter ¶
type Parameter struct {
// contains filtered or unexported fields
}
func NewParameter ¶
func NewParameter(types *pgtype.Map, format FormatCode, value []byte) Parameter
func (Parameter) Format ¶
func (p Parameter) Format() FormatCode
type ParameterStatus ¶
type ParameterStatus string
ParameterStatus represents a metadata key that could be defined inside a server/client metadata definition.
const ( ParamServerEncoding ParameterStatus = "server_encoding" ParamClientEncoding ParameterStatus = "client_encoding" ParamIsSuperuser ParameterStatus = "is_superuser" ParamSessionAuthorization ParameterStatus = "session_authorization" ParamApplicationName ParameterStatus = "application_name" ParamDatabase ParameterStatus = "database" ParamUsername ParameterStatus = "user" ParamServerVersion ParameterStatus = "server_version" )
At present there is a hard-wired set of parameters for which ParameterStatus will be generated. https://www.postgresql.org/docs/13/protocol-flow.html#PROTOCOL-ASYNC
type Parameters ¶
type Parameters map[ParameterStatus]string
Parameters represents a parameters collection of parameter status keys and their values.
func ClientParameters ¶
func ClientParameters(ctx context.Context) Parameters
ClientParameters returns the client connection parameters if they have been set inside the given context.
func ServerParameters ¶
func ServerParameters(ctx context.Context) Parameters
ServerParameters returns the server parameters if they have been set inside the given context.
type ParseFn ¶
type ParseFn func(ctx context.Context, query string) (PreparedStatements, error)
ParseFn parses the given query and returns a prepared statement which could be used to execute at a later point in time.
type PortalCache ¶
type PortalCache interface {
// Bind attempts to bind the given statement to the given name. Any
// previously defined statement is overridden.
Bind(ctx context.Context, name string, statement *Statement, parameters []Parameter, columns []FormatCode) error
// Get attempts to get the portal for the given name. An error is returned
// when no portal has been found.
Get(ctx context.Context, name string) (*Portal, error)
// Execute executes the prepared statement with the given name and parameters.
Execute(ctx context.Context, name string, limit Limit, reader *buffer.Reader, writer *buffer.Writer) error
// Close is called at the end of a connection. Close releases all resources
// held by the portal cache.
Close()
}
PortalCache represents a cache which could be used to bind and execute prepared statements with parameters.
func DefaultPortalCacheFn ¶
func DefaultPortalCacheFn() PortalCache
type PreparedOptionFn ¶
type PreparedOptionFn func(*PreparedStatement)
PreparedOptionFn options pattern used to define options while preparing a new statement.
func WithColumns ¶
func WithColumns(columns Columns) PreparedOptionFn
WithColumns sets the given columns as the columns which are returned by the prepared statement.
func WithParameters ¶
func WithParameters(parameters []uint32) PreparedOptionFn
WithParameters sets the given parameters as the parameters which are expected by the prepared statement.
type PreparedStatement ¶
type PreparedStatement struct {
// contains filtered or unexported fields
}
func NewStatement ¶
func NewStatement(fn PreparedStatementFn, options ...PreparedOptionFn) *PreparedStatement
NewStatement constructs a new prepared statement for the given function.
type PreparedStatementFn ¶
type PreparedStatementFn func(ctx context.Context, writer DataWriter, parameters []Parameter) error
PreparedStatementFn represents a query of which a statement has been prepared. The statement could be executed at any point in time with the given arguments and data writer.
type PreparedStatements ¶
type PreparedStatements []*PreparedStatement
func Prepared ¶
func Prepared(stmts ...*PreparedStatement) PreparedStatements
Prepared is a small wrapper function returning a list of prepared statements. More than one prepared statement could be returned within the simple query protocol. An error is returned when more than one prepared statement is returned in the extended query protocol.
type Scanner ¶
Scanner is a function that scans a byte slice and returns the value as an any
func NewScanner ¶
NewScanner creates a new scanner that scans a byte slice and returns the value as an any. The scanner uses the given map to decode the value and the given type to determine the format of the data that is scanned.
type Server ¶
type Server struct {
Auth AuthStrategy
BackendKeyData BackendKeyDataFunc
CancelRequest CancelRequestFn
BufferedMsgSize int
Parameters Parameters
TLSConfig *tls.Config
ClientAuth tls.ClientAuthType
Session SessionHandler
Statements func() StatementCache
Portals func() PortalCache
CloseConn CloseFn
TerminateConn CloseFn
FlushConn FlushFn
Version string
ShutdownTimeout time.Duration
// contains filtered or unexported fields
}
Server contains options for listening to an address.
func NewServer ¶
NewServer constructs a new Postgres server using the given address and server options.
func (*Server) Handshake ¶
func (srv *Server) Handshake(conn net.Conn) (_ net.Conn, version types.Version, reader *buffer.Reader, err error)
Handshake performs the connection handshake and returns the connection version and a buffered reader to read incoming messages sent by the client.
func (*Server) ListenAndServe ¶
ListenAndServe opens a new Postgres server on the preconfigured address and starts accepting and serving incoming client connections.
func (*Server) Serve ¶
Serve accepts and serves incoming Postgres client connections using the preconfigured configurations. The given listener will be closed once the server is gracefully closed.
func (*Server) Shutdown ¶
Shutdown gracefully shuts down the server with context and timeout support. It stops accepting new connections and waits for active connections to finish within the shorter of the context deadline or the server's configured ShutdownTimeout. If the context has no deadline, the server's ShutdownTimeout is used.
type Session ¶
type Session struct {
*Server
Statements StatementCache
Portals PortalCache
Attributes map[string]interface{}
}
func GetSession ¶
GetSession retrieves the session from the context. The first return value is the session object, which can be used to access all session data. The second return value indicates whether the session was found in the context.
type SessionHandler ¶
SessionHandler represents a wrapper function defining the state of a single session. This function allows the user to wrap additional metadata around the shared context.
type StatementCache ¶
type StatementCache interface {
// Set attempts to bind the given statement to the given name. Any
// previously defined statement is overridden.
Set(ctx context.Context, name string, fn *PreparedStatement) error
// Get attempts to get the prepared statement for the given name. An error
// is returned when no statement has been found.
Get(ctx context.Context, name string) (*Statement, error)
// Close is called at the end of a connection. Close releases all resources
// held by the statement cache.
Close()
}
StatementCache represents a cache which could be used to store and retrieve prepared statements bound to a name.
func DefaultStatementCacheFn ¶
func DefaultStatementCacheFn() StatementCache
type TextCopyReader ¶
type TextCopyReader struct {
// contains filtered or unexported fields
}
func NewTextColumnReader ¶
func NewTextColumnReader(ctx context.Context, copy *CopyReader, csvReader *csv.Reader, csvReaderBuffer *bytes.Buffer, nullValue string) (_ *TextCopyReader, err error)