dblog

package
v0.0.70 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 22, 2025 License: Apache-2.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const DumpQuery = `SELECT * FROM %s WHERE ctid >= ($1::bigint, 0)::text::tid AND ctid <= ($2::bigint, 65535)::text::tid`

DumpQuery retrieves all the rows in the specified block range. pg14 and above knows how to directly access those blocks using a TID Range Scan node, so partial scans are efficient. The tid format is (block_number, offset_number), the offset number being an unsigned short integer (<= 65535). Note that we have to use the upper bound as is (and therefore add knowledge about the maximum offset number) rather than use (block_number + 1, 0), in the unlikely event that we were provided the maximum block number Note also that the caller is responsible for providing a properly quoted and fully qualified relation name.

Variables

View Source
var ErrAlreadyRegistered = errors.New("already registered")
View Source
var ErrAlreadyScheduled = errors.New("already scheduled")
View Source
var (
	ErrCaptureInitMessageRequired = errors.New("the first request should be a CaptureInit message")
)
View Source
var ErrEmptyURI = errors.New("first request uri should not be empty")
View Source
var ErrLSNFallBehind = errors.New("lsn fall behind")
View Source
var ErrLSNMissing = errors.New("missing lsn record")
View Source
var ErrMissingTable = errors.New("missing Schema or table")
View Source
var ErrURINotFound = errors.New("requested uri not found")

Functions

This section is empty.

Types

type AfterSchedule

type AfterSchedule func()

type AgentSource

type AgentSource struct {
	// contains filtered or unexported fields
}

func NewAgentSourceDumper

func NewAgentSourceDumper(ctx context.Context, url string) (*AgentSource, error)

func (*AgentSource) LoadDump

func (a *AgentSource) LoadDump(minLSN uint64, info *pb.DumpInfoResponse) (changes []*pb.Change, err error)

func (*AgentSource) Stop

func (a *AgentSource) Stop()

type CancelFunc

type CancelFunc func()

type Controller

type Controller struct {
	pb.UnimplementedDBLogControllerServer
	Scheduler Scheduler
	// contains filtered or unexported fields
}

func NewController

func NewController(scheduler Scheduler) *Controller

func (*Controller) PullDumpInfo

func (c *Controller) PullDumpInfo(server pb.DBLogController_PullDumpInfoServer) (err error)

func (*Controller) Schedule

func (c *Controller) Schedule(ctx context.Context, req *pb.ScheduleRequest) (*pb.ScheduleResponse, error)

func (*Controller) SetScheduleCoolDown

func (*Controller) StopSchedule

type DumpInfo

type DumpInfo struct {
	Resp *pb.DumpInfoResponse
	// contains filtered or unexported fields
}

func (*DumpInfo) Ack

func (i *DumpInfo) Ack(requeueReason string) error

type DumpInfoPuller

type DumpInfoPuller interface {
	Pull(ctx context.Context, uri string) chan DumpInfo
}

type GRPCDumpInfoPuller

type GRPCDumpInfoPuller struct {
	Client pb.DBLogControllerClient
}

func (*GRPCDumpInfoPuller) Pull

func (p *GRPCDumpInfoPuller) Pull(ctx context.Context, uri string) chan DumpInfo

type Gateway

type Gateway struct {
	pb.UnimplementedDBLogGatewayServer
	SourceResolver SourceResolver
	DumpInfoPuller DumpInfoPuller
}

func (*Gateway) Capture

func (s *Gateway) Capture(server pb.DBLogGateway_CaptureServer) error

func (*Gateway) Serve

func (s *Gateway) Serve(ctx context.Context, ln net.Listener, opts ...grpc.ServerOption) error

type MemoryScheduler

type MemoryScheduler struct {
	// contains filtered or unexported fields
}

func NewMemoryScheduler

func NewMemoryScheduler(interval time.Duration) *MemoryScheduler

func (*MemoryScheduler) Ack

func (s *MemoryScheduler) Ack(uri string, client string, requeue string)

func (*MemoryScheduler) Register

func (s *MemoryScheduler) Register(uri string, client string, fn OnSchedule) (CancelFunc, error)

func (*MemoryScheduler) Schedule

func (s *MemoryScheduler) Schedule(uri string, dumps []*pb.DumpInfoResponse, fn AfterSchedule) error

func (*MemoryScheduler) SetCoolDown

func (s *MemoryScheduler) SetCoolDown(uri string, dur time.Duration)

func (*MemoryScheduler) StopSchedule

func (s *MemoryScheduler) StopSchedule(uri string)

type OnSchedule

type OnSchedule func(response *pb.DumpInfoResponse) error

type PGXSourceDumper

type PGXSourceDumper struct {
	SkipLSNCheck bool
	// contains filtered or unexported fields
}

func NewPGXSourceDumper

func NewPGXSourceDumper(ctx context.Context, url string) (*PGXSourceDumper, error)

func (*PGXSourceDumper) LoadDump

func (p *PGXSourceDumper) LoadDump(minLSN uint64, info *pb.DumpInfoResponse) ([]*pb.Change, error)

func (*PGXSourceDumper) Stop

func (p *PGXSourceDumper) Stop()

type Scheduler

type Scheduler interface {
	Schedule(uri string, dumps []*pb.DumpInfoResponse, fn AfterSchedule) error
	Register(uri string, client string, fn OnSchedule) (CancelFunc, error)
	Ack(uri string, client string, requeue string)
	SetCoolDown(uri string, dur time.Duration)
	StopSchedule(uri string)
}

type SourceDumper

type SourceDumper interface {
	LoadDump(minLSN uint64, info *pb.DumpInfoResponse) ([]*pb.Change, error)
	Stop()
}

type SourceResolver

type SourceResolver interface {
	Source(ctx context.Context, uri string) (source.RequeueSource, error)
	Dumper(ctx context.Context, uri string) (SourceDumper, error)
}

type StaticAgentPulsarResolver

type StaticAgentPulsarResolver struct {
	// contains filtered or unexported fields
}

func NewStaticAgentPulsarResolver

func NewStaticAgentPulsarResolver(config map[string]StaticAgentPulsarURIConfig) *StaticAgentPulsarResolver

func (*StaticAgentPulsarResolver) Dumper

func (*StaticAgentPulsarResolver) Source

type StaticAgentPulsarURIConfig

type StaticAgentPulsarURIConfig struct {
	PulsarURL            string
	PulsarTopic          string
	PulsarSubscription   string
	PulsarReplicateState bool
	AgentURL             string
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL