Documentation
¶
Index ¶
- Constants
- Variables
- type AfterSchedule
- type AgentSource
- type CancelFunc
- type Controller
- func (c *Controller) PullDumpInfo(server pb.DBLogController_PullDumpInfoServer) (err error)
- func (c *Controller) Schedule(ctx context.Context, req *pb.ScheduleRequest) (*pb.ScheduleResponse, error)
- func (c *Controller) SetScheduleCoolDown(ctx context.Context, req *pb.SetScheduleCoolDownRequest) (*pb.SetScheduleCoolDownResponse, error)
- func (c *Controller) StopSchedule(ctx context.Context, req *pb.StopScheduleRequest) (*pb.StopScheduleResponse, error)
- type DumpInfo
- type DumpInfoPuller
- type GRPCDumpInfoPuller
- type Gateway
- type MemoryScheduler
- func (s *MemoryScheduler) Ack(uri string, client string, requeue string)
- func (s *MemoryScheduler) Register(uri string, client string, fn OnSchedule) (CancelFunc, error)
- func (s *MemoryScheduler) Schedule(uri string, dumps []*pb.DumpInfoResponse, fn AfterSchedule) error
- func (s *MemoryScheduler) SetCoolDown(uri string, dur time.Duration)
- func (s *MemoryScheduler) StopSchedule(uri string)
- type OnSchedule
- type PGXSourceDumper
- type Scheduler
- type SourceDumper
- type SourceResolver
- type StaticAgentPulsarResolver
- type StaticAgentPulsarURIConfig
Constants ¶
View Source
const DumpQuery = `SELECT * FROM %s WHERE ctid >= ($1::bigint, 0)::text::tid AND ctid <= ($2::bigint, 65535)::text::tid`
DumpQuery retrieves all the rows in the specified block range. PostgreSQL 14 and above know how to directly access those blocks using a TID Range Scan node, so partial scans are efficient. The tid format is (block_number, offset_number), the offset number being an unsigned short integer (<= 65535). Note that we have to use the upper bound as is (and therefore add knowledge about the maximum offset number) rather than use (block_number + 1, 0), in the unlikely event that we were provided the maximum block number. Note also that the caller is responsible for providing a properly quoted and fully qualified relation name.
Variables ¶
View Source
var ErrAlreadyRegistered = errors.New("already registered")
View Source
var ErrAlreadyScheduled = errors.New("already scheduled")
View Source
var (
ErrCaptureInitMessageRequired = errors.New("the first request should be a CaptureInit message")
)
View Source
var ErrEmptyURI = errors.New("first request uri should not be empty")
View Source
var ErrLSNFallBehind = errors.New("lsn fall behind")
View Source
var ErrLSNMissing = errors.New("missing lsn record")
View Source
var ErrMissingTable = errors.New("missing Schema or table")
View Source
var ErrURINotFound = errors.New("requested uri not found")
Functions ¶
This section is empty.
Types ¶
type AfterSchedule ¶
type AfterSchedule func()
type AgentSource ¶
type AgentSource struct {
// contains filtered or unexported fields
}
func NewAgentSourceDumper ¶
func NewAgentSourceDumper(ctx context.Context, url string) (*AgentSource, error)
func (*AgentSource) LoadDump ¶
func (a *AgentSource) LoadDump(minLSN uint64, info *pb.DumpInfoResponse) (changes []*pb.Change, err error)
func (*AgentSource) Stop ¶
func (a *AgentSource) Stop()
type CancelFunc ¶
type CancelFunc func()
type Controller ¶
type Controller struct {
pb.UnimplementedDBLogControllerServer
Scheduler Scheduler
// contains filtered or unexported fields
}
func NewController ¶
func NewController(scheduler Scheduler) *Controller
func (*Controller) PullDumpInfo ¶
func (c *Controller) PullDumpInfo(server pb.DBLogController_PullDumpInfoServer) (err error)
func (*Controller) Schedule ¶
func (c *Controller) Schedule(ctx context.Context, req *pb.ScheduleRequest) (*pb.ScheduleResponse, error)
func (*Controller) SetScheduleCoolDown ¶
func (c *Controller) SetScheduleCoolDown(ctx context.Context, req *pb.SetScheduleCoolDownRequest) (*pb.SetScheduleCoolDownResponse, error)
func (*Controller) StopSchedule ¶
func (c *Controller) StopSchedule(ctx context.Context, req *pb.StopScheduleRequest) (*pb.StopScheduleResponse, error)
type DumpInfo ¶
type DumpInfo struct {
Resp *pb.DumpInfoResponse
// contains filtered or unexported fields
}
type DumpInfoPuller ¶
type GRPCDumpInfoPuller ¶
type GRPCDumpInfoPuller struct {
Client pb.DBLogControllerClient
}
type Gateway ¶
type Gateway struct {
pb.UnimplementedDBLogGatewayServer
SourceResolver SourceResolver
DumpInfoPuller DumpInfoPuller
}
type MemoryScheduler ¶
type MemoryScheduler struct {
// contains filtered or unexported fields
}
func NewMemoryScheduler ¶
func NewMemoryScheduler(interval time.Duration) *MemoryScheduler
func (*MemoryScheduler) Ack ¶
func (s *MemoryScheduler) Ack(uri string, client string, requeue string)
func (*MemoryScheduler) Register ¶
func (s *MemoryScheduler) Register(uri string, client string, fn OnSchedule) (CancelFunc, error)
func (*MemoryScheduler) Schedule ¶
func (s *MemoryScheduler) Schedule(uri string, dumps []*pb.DumpInfoResponse, fn AfterSchedule) error
func (*MemoryScheduler) SetCoolDown ¶
func (s *MemoryScheduler) SetCoolDown(uri string, dur time.Duration)
func (*MemoryScheduler) StopSchedule ¶
func (s *MemoryScheduler) StopSchedule(uri string)
type OnSchedule ¶
type OnSchedule func(response *pb.DumpInfoResponse) error
type PGXSourceDumper ¶
type PGXSourceDumper struct {
SkipLSNCheck bool
// contains filtered or unexported fields
}
func NewPGXSourceDumper ¶
func NewPGXSourceDumper(ctx context.Context, url string) (*PGXSourceDumper, error)
func (*PGXSourceDumper) LoadDump ¶
func (p *PGXSourceDumper) LoadDump(minLSN uint64, info *pb.DumpInfoResponse) ([]*pb.Change, error)
func (*PGXSourceDumper) Stop ¶
func (p *PGXSourceDumper) Stop()
type Scheduler ¶
type Scheduler interface {
Schedule(uri string, dumps []*pb.DumpInfoResponse, fn AfterSchedule) error
Register(uri string, client string, fn OnSchedule) (CancelFunc, error)
Ack(uri string, client string, requeue string)
SetCoolDown(uri string, dur time.Duration)
StopSchedule(uri string)
}
type SourceDumper ¶
type SourceResolver ¶
type StaticAgentPulsarResolver ¶
type StaticAgentPulsarResolver struct {
// contains filtered or unexported fields
}
func NewStaticAgentPulsarResolver ¶
func NewStaticAgentPulsarResolver(config map[string]StaticAgentPulsarURIConfig) *StaticAgentPulsarResolver
func (*StaticAgentPulsarResolver) Dumper ¶
func (r *StaticAgentPulsarResolver) Dumper(ctx context.Context, uri string) (SourceDumper, error)
func (*StaticAgentPulsarResolver) Source ¶
func (r *StaticAgentPulsarResolver) Source(ctx context.Context, uri string) (source.RequeueSource, error)
Click to show internal directories.
Click to hide internal directories.