Documentation
¶
Index ¶
- Constants
- Variables
- func FormatSlaveFilePath(slaveID, filePath string) string
- func IsSlaveFile(filePath string) bool
- func IsValidSlaveID(slaveID string) bool
- func ParseSlaveFilePath(filePath string) (slaveID, remotePath string, err error)
- type Client
- func (c *Client) DownloadSlaveFile(ctx context.Context, slave *SlaveInfo, filePath string) (io.ReadCloser, error)
- func (c *Client) DownloadSlaveFileWithSize(ctx context.Context, slave *SlaveInfo, filePath string, maxSize int64) (io.ReadCloser, error)
- func (c *Client) PingSlaveHealth(ctx context.Context, slave *SlaveInfo) error
- func (c *Client) RequestAllSlaveFiles(ctx context.Context, registry *SlaveRegistry, req *TaskRequest) map[string]*TaskResponse
- func (c *Client) RequestAllSlaveFilesByContent(ctx context.Context, registry *SlaveRegistry, req *TaskRequest) map[string]*TaskResponse
- func (c *Client) RequestSlaveFiles(ctx context.Context, slave *SlaveInfo, req *TaskRequest) (*TaskResponse, error)
- func (c *Client) RequestSlaveFilesByContent(ctx context.Context, slave *SlaveInfo, req *TaskRequest) (*TaskResponse, error)
- type FileManager
- type FileTransferRequest
- type HeartbeatRequest
- type HeartbeatResponse
- type LimitedReadCloser
- type RegisterRequest
- type RegisterResponse
- type Server
- type SlaveFileInfo
- type SlaveInfo
- type SlaveRegistry
- func (r *SlaveRegistry) CheckAndCleanup(timeout time.Duration) []string
- func (r *SlaveRegistry) GetOnlineSlaves() []*SlaveInfo
- func (r *SlaveRegistry) GetSlave(slaveID string) (*SlaveInfo, bool)
- func (r *SlaveRegistry) GetSlaveCount() int
- func (r *SlaveRegistry) Register(slave *SlaveInfo) error
- func (r *SlaveRegistry) Unregister(slaveID string)
- func (r *SlaveRegistry) UpdateHeartbeat(slaveID string)
- type TaskRequest
- type TaskResponse
Constants ¶
const (
// SlaveStatusOnline represents online status.
SlaveStatusOnline = "online"
)
Variables ¶
var (
ErrSlaveRequestFailed = errors.New("slave request failed")
ErrSlaveHealthFailed = errors.New("slave health check failed")
)
Static errors for better error handling.
var (
ErrNotSlaveFilePath = errors.New("not a slave file path")
ErrInvalidSlaveFilePath = errors.New("invalid slave file path format")
ErrSlaveNotFound = errors.New("slave not found")
ErrSlaveNotOnline = errors.New("slave not online")
)
Static errors for better error handling.
var (
// ErrSlaveIDConflict indicates that a slave ID is already registered with a different IP/Port.
ErrSlaveIDConflict = errors.New("slave ID conflict")
)
Static errors for better error handling.
Functions ¶
func FormatSlaveFilePath ¶
FormatSlaveFilePath formats a slave file path using the slave:// protocol.
func IsSlaveFile ¶
IsSlaveFile reports whether the file path refers to a slave file.
func IsValidSlaveID ¶
IsValidSlaveID reports whether a slave ID is in the correct format (16 hex characters).
func ParseSlaveFilePath ¶
ParseSlaveFilePath parses a slave file path of the form slave://slaveID/absolutePath.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is the master-to-slave client.
func NewClient ¶
func NewClient(masterConfig *config.MasterConfig) *Client
NewClient creates a new master client.
func (*Client) DownloadSlaveFile ¶
func (c *Client) DownloadSlaveFile(ctx context.Context, slave *SlaveInfo, filePath string) (io.ReadCloser, error)
DownloadSlaveFile downloads a file from a slave.
func (*Client) DownloadSlaveFileWithSize ¶
func (c *Client) DownloadSlaveFileWithSize(ctx context.Context, slave *SlaveInfo, filePath string, maxSize int64) (io.ReadCloser, error)
DownloadSlaveFileWithSize downloads a file from a slave with a specific size limit.
func (*Client) PingSlaveHealth ¶
PingSlaveHealth checks slave health status.
func (*Client) RequestAllSlaveFiles ¶
func (c *Client) RequestAllSlaveFiles(ctx context.Context, registry *SlaveRegistry, req *TaskRequest) map[string]*TaskResponse
RequestAllSlaveFiles concurrently requests file info from all online slaves.
func (*Client) RequestAllSlaveFilesByContent ¶
func (c *Client) RequestAllSlaveFilesByContent(ctx context.Context, registry *SlaveRegistry, req *TaskRequest) map[string]*TaskResponse
RequestAllSlaveFilesByContent concurrently requests file info from all online slaves based on content time.
func (*Client) RequestSlaveFiles ¶
func (c *Client) RequestSlaveFiles(ctx context.Context, slave *SlaveInfo, req *TaskRequest) (*TaskResponse, error)
RequestSlaveFiles requests a slave to scan files.
func (*Client) RequestSlaveFilesByContent ¶
func (c *Client) RequestSlaveFilesByContent(ctx context.Context, slave *SlaveInfo, req *TaskRequest) (*TaskResponse, error)
RequestSlaveFilesByContent requests a slave to scan files based on content time.
type FileManager ¶
type FileManager struct {
// contains filtered or unexported fields
}
FileManager manages master-slave file transfer.
func NewFileManager ¶
func NewFileManager(client *Client, registry *SlaveRegistry) *FileManager
NewFileManager creates a file manager.
func (*FileManager) GetFileReader ¶
func (fm *FileManager) GetFileReader(ctx context.Context, fileInfo model.FileInfo) (io.ReadCloser, error)
GetFileReader returns a reader for the file, automatically handling both local and remote files.
type FileTransferRequest ¶
type FileTransferRequest struct {
FilePath string `json:"file_path"`
Offset int64 `json:"offset,omitempty"` // Resume transfer offset
ChunkSize int `json:"chunk_size,omitempty"` // Chunk size
MaxSize int64 `json:"max_size,omitempty"` // Maximum file size to read (for append files)
}
FileTransferRequest is the file transfer request structure.
type HeartbeatRequest ¶
type HeartbeatRequest struct {
SlaveID string `json:"slave_id"`
Version string `json:"version"`
Capabilities []string `json:"capabilities"`
Status string `json:"status"`
}
HeartbeatRequest is the heartbeat request structure.
type HeartbeatResponse ¶
type HeartbeatResponse struct {
Success bool `json:"success"`
Message string `json:"message,omitempty"`
NeedReRegister bool `json:"need_re_register,omitempty"` // If true, slave should re-register
}
HeartbeatResponse is the heartbeat response structure.
type LimitedReadCloser ¶
type LimitedReadCloser struct {
// contains filtered or unexported fields
}
LimitedReadCloser wraps an io.ReadCloser with a size limit.
func NewLimitedReadCloser ¶
func NewLimitedReadCloser(reader io.ReadCloser, limit int64) *LimitedReadCloser
func (*LimitedReadCloser) Close ¶
func (lrc *LimitedReadCloser) Close() error
type RegisterRequest ¶
type RegisterRequest struct {
SlaveID string `json:"slave_id"`
IP string `json:"ip"`
Port int `json:"port"`
Version string `json:"version"`
Capabilities []string `json:"capabilities"`
FilePrefix string `json:"file_prefix"` // File folder prefix
}
RegisterRequest is the slave registration request structure.
type RegisterResponse ¶
type RegisterResponse struct {
Success bool `json:"success"`
Message string `json:"message,omitempty"`
MasterID string `json:"master_id,omitempty"`
}
RegisterResponse is the slave registration response structure.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the master server.
func NewServer ¶
func NewServer(port int, masterConfig *config.MasterConfig) *Server
NewServer creates a new master server.
func (*Server) GetRegistry ¶
func (s *Server) GetRegistry() *SlaveRegistry
GetRegistry returns the slave registry.
type SlaveFileInfo ¶
SlaveFileInfo contains slave file information.
func (*SlaveFileInfo) GetRemotePath ¶
func (sfi *SlaveFileInfo) GetRemotePath() string
GetRemotePath returns remote file identifier (using slave:// protocol format).
type SlaveInfo ¶
type SlaveInfo struct {
ID string `json:"id"` // Unique slave identifier
IP string `json:"ip"` // Slave IP address
Port int `json:"port"` // Slave port
LastSeen time.Time `json:"last_seen"` // Last heartbeat time
Status string `json:"status"` // Status: online, offline, timeout
Version string `json:"version"` // Slave version
Capabilities []string `json:"capabilities"` // Features supported by slave
FilePrefix string `json:"file_prefix"` // File folder prefix
}
SlaveInfo stores slave node information.
type SlaveRegistry ¶
type SlaveRegistry struct {
// contains filtered or unexported fields
}
SlaveRegistry is a thread-safe registry of slaves.
func NewSlaveRegistry ¶
func NewSlaveRegistry() *SlaveRegistry
NewSlaveRegistry creates a new slave registry.
func (*SlaveRegistry) CheckAndCleanup ¶
func (r *SlaveRegistry) CheckAndCleanup(timeout time.Duration) []string
CheckAndCleanup checks and marks timed-out slaves.
func (*SlaveRegistry) GetOnlineSlaves ¶
func (r *SlaveRegistry) GetOnlineSlaves() []*SlaveInfo
GetOnlineSlaves returns all online slaves.
func (*SlaveRegistry) GetSlave ¶
func (r *SlaveRegistry) GetSlave(slaveID string) (*SlaveInfo, bool)
GetSlave returns information for a specific slave.
func (*SlaveRegistry) GetSlaveCount ¶
func (r *SlaveRegistry) GetSlaveCount() int
GetSlaveCount returns the total number of registered slaves.
func (*SlaveRegistry) Register ¶
func (r *SlaveRegistry) Register(slave *SlaveInfo) error
Register registers or updates a slave. It returns an error if the slave ID conflicts with an existing slave (different IP/Port).
func (*SlaveRegistry) Unregister ¶
func (r *SlaveRegistry) Unregister(slaveID string)
Unregister removes a slave from the registry.
func (*SlaveRegistry) UpdateHeartbeat ¶
func (r *SlaveRegistry) UpdateHeartbeat(slaveID string)
UpdateHeartbeat updates slave heartbeat timestamp.
type TaskRequest ¶
type TaskRequest struct {
TaskID string `json:"task_id"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
ScanFolders []string `json:"scan_folders"`
AdditionalFiles []string `json:"additional_files"`
WhiteList []string `json:"whitelist,omitempty"`
RecursivelyWalkDirs bool `json:"recursively_walk_dirs,omitempty"`
}
TaskRequest is the task request structure.
type TaskResponse ¶
type TaskResponse struct {
TaskID string `json:"task_id"`
Files []SlaveFileInfo `json:"files"`
Success bool `json:"success"`
Error string `json:"error,omitempty"`
}
TaskResponse is the task response structure.