master

package
v1.1.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 28, 2025 License: Apache-2.0 Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// SlaveStatusOnline represents online status.
	SlaveStatusOnline = "online"
)

Variables

View Source
var (
	ErrSlaveRequestFailed = errors.New("slave request failed")
	ErrSlaveHealthFailed  = errors.New("slave health check failed")
)

Static errors for better error handling.

View Source
var (
	ErrNotSlaveFilePath     = errors.New("not a slave file path")
	ErrInvalidSlaveFilePath = errors.New("invalid slave file path format")
	ErrSlaveNotFound        = errors.New("slave not found")
	ErrSlaveNotOnline       = errors.New("slave not online")
)

Static errors for better error handling.

View Source
var (
	// ErrSlaveIDConflict indicates that a slave ID is already registered with different IP/Port.
	ErrSlaveIDConflict = errors.New("slave ID conflict")
)

Static errors for better error handling.

Functions

func FormatSlaveFilePath

func FormatSlaveFilePath(slaveID, filePath string) string

FormatSlaveFilePath formats a slave file path using the slave:// protocol.

func IsSlaveFile

func IsSlaveFile(filePath string) bool

IsSlaveFile checks if the file path is a slave file.

func IsValidSlaveID

func IsValidSlaveID(slaveID string) bool

IsValidSlaveID validates if a slave ID is in the correct format (16 hex characters).

func ParseSlaveFilePath

func ParseSlaveFilePath(filePath string) (slaveID, remotePath string, err error)

ParseSlaveFilePath parses slave file path: slave://slaveID/absolutePath

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client master to slave client.

func NewClient

func NewClient(masterConfig *config.MasterConfig) *Client

NewClient creates a new master client.

func (*Client) DownloadSlaveFile

func (c *Client) DownloadSlaveFile(ctx context.Context, slave *SlaveInfo, filePath string) (io.ReadCloser, error)

DownloadSlaveFile downloads file from slave.

func (*Client) DownloadSlaveFileWithSize

func (c *Client) DownloadSlaveFileWithSize(ctx context.Context, slave *SlaveInfo, filePath string, maxSize int64) (io.ReadCloser, error)

DownloadSlaveFileWithSize downloads file from slave with specific size limit.

func (*Client) PingSlaveHealth

func (c *Client) PingSlaveHealth(ctx context.Context, slave *SlaveInfo) error

PingSlaveHealth checks slave health status.

func (*Client) RequestAllSlaveFiles

func (c *Client) RequestAllSlaveFiles(ctx context.Context, registry *SlaveRegistry, req *TaskRequest) map[string]*TaskResponse

RequestAllSlaveFiles concurrently requests file info from all online slaves.

func (*Client) RequestAllSlaveFilesByContent

func (c *Client) RequestAllSlaveFilesByContent(ctx context.Context, registry *SlaveRegistry, req *TaskRequest) map[string]*TaskResponse

RequestAllSlaveFilesByContent concurrently requests file info from all online slaves based on content time.

func (*Client) RequestSlaveFiles

func (c *Client) RequestSlaveFiles(ctx context.Context, slave *SlaveInfo, req *TaskRequest) (*TaskResponse, error)

RequestSlaveFiles requests slave to scan files.

func (*Client) RequestSlaveFilesByContent

func (c *Client) RequestSlaveFilesByContent(ctx context.Context, slave *SlaveInfo, req *TaskRequest) (*TaskResponse, error)

RequestSlaveFilesByContent requests slave to scan files based on content time.

type FileManager

type FileManager struct {
	// contains filtered or unexported fields
}

FileManager manages master-slave file transfer.

func NewFileManager

func NewFileManager(client *Client, registry *SlaveRegistry) *FileManager

NewFileManager creates a file manager.

func (*FileManager) GetFileReader

func (fm *FileManager) GetFileReader(ctx context.Context, fileInfo model.FileInfo) (io.ReadCloser, error)

GetFileReader gets file reader, automatically handles local/remote files.

type FileTransferRequest

type FileTransferRequest struct {
	FilePath  string `json:"file_path"`
	Offset    int64  `json:"offset,omitempty"`     // Resume transfer offset
	ChunkSize int    `json:"chunk_size,omitempty"` // Chunk size
	MaxSize   int64  `json:"max_size,omitempty"`   // Maximum file size to read (for append files)
}

FileTransferRequest file transfer request structure.

type HeartbeatRequest

type HeartbeatRequest struct {
	SlaveID      string   `json:"slave_id"`
	Version      string   `json:"version"`
	Capabilities []string `json:"capabilities"`
	Status       string   `json:"status"`
}

HeartbeatRequest heartbeat request structure.

type HeartbeatResponse

type HeartbeatResponse struct {
	Success        bool   `json:"success"`
	Message        string `json:"message,omitempty"`
	NeedReRegister bool   `json:"need_re_register,omitempty"` // If true, slave should re-register
}

HeartbeatResponse heartbeat response structure.

type LimitedReadCloser

type LimitedReadCloser struct {
	// contains filtered or unexported fields
}

LimitedReadCloser wraps an io.ReadCloser with a size limit.

func NewLimitedReadCloser

func NewLimitedReadCloser(reader io.ReadCloser, limit int64) *LimitedReadCloser

func (*LimitedReadCloser) Close

func (lrc *LimitedReadCloser) Close() error

func (*LimitedReadCloser) Read

func (lrc *LimitedReadCloser) Read(p []byte) (n int, err error)

type RegisterRequest

type RegisterRequest struct {
	SlaveID      string   `json:"slave_id"`
	IP           string   `json:"ip"`
	Port         int      `json:"port"`
	Version      string   `json:"version"`
	Capabilities []string `json:"capabilities"`
	FilePrefix   string   `json:"file_prefix"` // File folder prefix
}

RegisterRequest slave registration request structure.

type RegisterResponse

type RegisterResponse struct {
	Success  bool   `json:"success"`
	Message  string `json:"message,omitempty"`
	MasterID string `json:"master_id,omitempty"`
}

RegisterResponse slave registration response structure.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server master server.

func NewServer

func NewServer(port int, masterConfig *config.MasterConfig) *Server

NewServer creates a new master server.

func (*Server) GetRegistry

func (s *Server) GetRegistry() *SlaveRegistry

GetRegistry returns the slave registry.

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

Start starts the server.

type SlaveFileInfo

type SlaveFileInfo struct {
	model.FileInfo
	SlaveID string `json:"slave_id"` // Source slave ID
}

SlaveFileInfo contains slave file information.

func (*SlaveFileInfo) GetRemotePath

func (sfi *SlaveFileInfo) GetRemotePath() string

GetRemotePath returns remote file identifier (using slave:// protocol format).

type SlaveInfo

type SlaveInfo struct {
	ID           string    `json:"id"`           // Unique slave identifier
	IP           string    `json:"ip"`           // Slave IP address
	Port         int       `json:"port"`         // Slave port
	LastSeen     time.Time `json:"last_seen"`    // Last heartbeat time
	Status       string    `json:"status"`       // Status: online, offline, timeout
	Version      string    `json:"version"`      // Slave version
	Capabilities []string  `json:"capabilities"` // Features supported by slave
	FilePrefix   string    `json:"file_prefix"`  // File folder prefix
}

SlaveInfo stores slave node information.

func (*SlaveInfo) GetAddr

func (s *SlaveInfo) GetAddr() string

GetAddr returns the complete slave address with proper IPv6 support.

func (*SlaveInfo) IsOnline

func (s *SlaveInfo) IsOnline() bool

IsOnline checks if slave is online.

type SlaveRegistry

type SlaveRegistry struct {
	// contains filtered or unexported fields
}

SlaveRegistry slave registry, thread-safe.

func NewSlaveRegistry

func NewSlaveRegistry() *SlaveRegistry

NewSlaveRegistry creates a new slave registry.

func (*SlaveRegistry) CheckAndCleanup

func (r *SlaveRegistry) CheckAndCleanup(timeout time.Duration) []string

CheckAndCleanup checks and marks timed-out slaves.

func (*SlaveRegistry) GetOnlineSlaves

func (r *SlaveRegistry) GetOnlineSlaves() []*SlaveInfo

GetOnlineSlaves returns all online slaves.

func (*SlaveRegistry) GetSlave

func (r *SlaveRegistry) GetSlave(slaveID string) (*SlaveInfo, bool)

GetSlave returns information for a specific slave.

func (*SlaveRegistry) GetSlaveCount

func (r *SlaveRegistry) GetSlaveCount() int

GetSlaveCount returns the total number of registered slaves.

func (*SlaveRegistry) Register

func (r *SlaveRegistry) Register(slave *SlaveInfo) error

Register registers or updates a slave. Returns error if slave ID conflicts with existing slave (different IP/Port).

func (*SlaveRegistry) Unregister

func (r *SlaveRegistry) Unregister(slaveID string)

Unregister removes a slave from registry.

func (*SlaveRegistry) UpdateHeartbeat

func (r *SlaveRegistry) UpdateHeartbeat(slaveID string)

UpdateHeartbeat updates slave heartbeat timestamp.

type TaskRequest

type TaskRequest struct {
	TaskID              string    `json:"task_id"`
	StartTime           time.Time `json:"start_time"`
	EndTime             time.Time `json:"end_time"`
	ScanFolders         []string  `json:"scan_folders"`
	AdditionalFiles     []string  `json:"additional_files"`
	WhiteList           []string  `json:"whitelist,omitempty"`
	RecursivelyWalkDirs bool      `json:"recursively_walk_dirs,omitempty"`
}

TaskRequest task request structure.

type TaskResponse

type TaskResponse struct {
	TaskID  string          `json:"task_id"`
	Files   []SlaveFileInfo `json:"files"`
	Success bool            `json:"success"`
	Error   string          `json:"error,omitempty"`
}

TaskResponse task response structure.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL