Documentation
¶
Index ¶
- func ParquetRowToJob(row *ParquetJobRow) (*schema.Job, *schema.JobData, error)
- type ClusterAwareParquetWriter
- type FileParquetSource
- type FileTarget
- type NodeStateParquetWriter
- type ParquetJobRow
- type ParquetNodeStateRow
- type ParquetSource
- type ParquetTarget
- type ParquetWriter
- type S3ParquetSource
- type S3Target
- type S3TargetConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ParquetRowToJob ¶
func ParquetRowToJob(row *ParquetJobRow) (*schema.Job, *schema.JobData, error)
ParquetRowToJob converts a ParquetJobRow back into job metadata and metric data. This is the reverse of JobToParquetRow.
Types ¶
type ClusterAwareParquetWriter ¶
type ClusterAwareParquetWriter struct {
// contains filtered or unexported fields
}
ClusterAwareParquetWriter organizes Parquet output by cluster. Each cluster gets its own subdirectory with a cluster.json config file.
func NewClusterAwareParquetWriter ¶
func NewClusterAwareParquetWriter(target ParquetTarget, maxSizeMB int) *ClusterAwareParquetWriter
NewClusterAwareParquetWriter creates a writer that routes jobs to per-cluster ParquetWriters.
func (*ClusterAwareParquetWriter) AddJob ¶
func (cw *ClusterAwareParquetWriter) AddJob(row ParquetJobRow) error
AddJob routes the job row to the appropriate per-cluster writer.
func (*ClusterAwareParquetWriter) Close ¶
func (cw *ClusterAwareParquetWriter) Close() error
Close writes cluster.json files and flushes all per-cluster writers.
func (*ClusterAwareParquetWriter) SetClusterConfig ¶
func (cw *ClusterAwareParquetWriter) SetClusterConfig(name string, cfg *schema.Cluster)
SetClusterConfig stores a cluster configuration to be written as cluster.json on Close.
type FileParquetSource ¶
type FileParquetSource struct {
// contains filtered or unexported fields
}
FileParquetSource reads parquet archives from a local filesystem directory.
func NewFileParquetSource ¶
func NewFileParquetSource(path string) *FileParquetSource
func (*FileParquetSource) GetClusters ¶
func (fs *FileParquetSource) GetClusters() ([]string, error)
func (*FileParquetSource) ListParquetFiles ¶
func (fs *FileParquetSource) ListParquetFiles(cluster string) ([]string, error)
func (*FileParquetSource) ReadClusterConfig ¶
func (fs *FileParquetSource) ReadClusterConfig(cluster string) (*schema.Cluster, error)
type FileTarget ¶
type FileTarget struct {
// contains filtered or unexported fields
}
FileTarget writes parquet files to a local filesystem directory.
func NewFileTarget ¶
func NewFileTarget(path string) (*FileTarget, error)
type NodeStateParquetWriter ¶
type NodeStateParquetWriter struct {
// contains filtered or unexported fields
}
NodeStateParquetWriter batches ParquetNodeStateRows and flushes them to a target when the estimated size exceeds maxSizeBytes.
func NewNodeStateParquetWriter ¶
func NewNodeStateParquetWriter(target ParquetTarget, maxSizeMB int) *NodeStateParquetWriter
NewNodeStateParquetWriter creates a new writer for node state parquet files.
func (*NodeStateParquetWriter) AddRow ¶
func (pw *NodeStateParquetWriter) AddRow(row ParquetNodeStateRow) error
AddRow adds a row to the current batch. If the estimated batch size exceeds the configured maximum, the batch is flushed first.
func (*NodeStateParquetWriter) Close ¶
func (pw *NodeStateParquetWriter) Close() error
Close flushes any remaining rows and finalizes the writer.
func (*NodeStateParquetWriter) Flush ¶
func (pw *NodeStateParquetWriter) Flush() error
Flush writes the current batch to a parquet file on the target.
type ParquetJobRow ¶
type ParquetJobRow struct {
JobID int64 `parquet:"job_id"`
Cluster string `parquet:"cluster"`
SubCluster string `parquet:"sub_cluster"`
Partition string `parquet:"partition,optional"`
Project string `parquet:"project"`
User string `parquet:"user"`
State string `parquet:"job_state"`
StartTime int64 `parquet:"start_time"`
Duration int32 `parquet:"duration"`
Walltime int64 `parquet:"walltime"`
NumNodes int32 `parquet:"num_nodes"`
NumHWThreads int32 `parquet:"num_hwthreads"`
NumAcc int32 `parquet:"num_acc"`
Exclusive int32 `parquet:"exclusive"`
Energy float64 `parquet:"energy"`
SMT int32 `parquet:"smt"`
ResourcesJSON []byte `parquet:"resources_json"`
StatisticsJSON []byte `parquet:"statistics_json,optional"`
TagsJSON []byte `parquet:"tags_json,optional"`
MetaDataJSON []byte `parquet:"meta_data_json,optional"`
FootprintJSON []byte `parquet:"footprint_json,optional"`
EnergyFootJSON []byte `parquet:"energy_footprint_json,optional"`
MetricDataGz []byte `parquet:"metric_data_gz"`
}
func JobToParquetRow ¶
JobToParquetRow converts job metadata and metric data into a flat ParquetJobRow. Nested fields are marshaled to JSON; metric data is gzip-compressed JSON.
func ReadParquetFile ¶
func ReadParquetFile(data []byte) ([]ParquetJobRow, error)
ReadParquetFile reads all ParquetJobRow entries from parquet-encoded bytes.
type ParquetNodeStateRow ¶
type ParquetNodeStateRow struct {
TimeStamp int64 `parquet:"time_stamp"`
NodeState string `parquet:"node_state"`
HealthState string `parquet:"health_state"`
HealthMetrics string `parquet:"health_metrics,optional"`
CpusAllocated int32 `parquet:"cpus_allocated"`
MemoryAllocated int64 `parquet:"memory_allocated"`
GpusAllocated int32 `parquet:"gpus_allocated"`
JobsRunning int32 `parquet:"jobs_running"`
Hostname string `parquet:"hostname"`
Cluster string `parquet:"cluster"`
SubCluster string `parquet:"subcluster"`
}
type ParquetSource ¶
type ParquetSource interface {
GetClusters() ([]string, error)
ListParquetFiles(cluster string) ([]string, error)
ReadFile(path string) ([]byte, error)
ReadClusterConfig(cluster string) (*schema.Cluster, error)
}
ParquetSource abstracts reading parquet archives from different storage backends.
type ParquetTarget ¶
ParquetTarget abstracts the destination for parquet file writes.
type ParquetWriter ¶
type ParquetWriter struct {
// contains filtered or unexported fields
}
ParquetWriter batches ParquetJobRows and flushes them to a target when the estimated size exceeds maxSizeBytes.
func NewParquetWriter ¶
func NewParquetWriter(target ParquetTarget, maxSizeMB int) *ParquetWriter
NewParquetWriter creates a new writer that flushes batches to the given target. maxSizeMB sets the approximate maximum size per parquet file in megabytes.
func (*ParquetWriter) AddJob ¶
func (pw *ParquetWriter) AddJob(row ParquetJobRow) error
AddJob adds a row to the current batch. If the estimated batch size exceeds the configured maximum, the batch is flushed to the target first.
func (*ParquetWriter) Close ¶
func (pw *ParquetWriter) Close() error
Close flushes any remaining rows and finalizes the writer.
func (*ParquetWriter) Flush ¶
func (pw *ParquetWriter) Flush() error
Flush writes the current batch to a parquet file on the target.
type S3ParquetSource ¶
type S3ParquetSource struct {
// contains filtered or unexported fields
}
S3ParquetSource reads parquet archives from an S3-compatible object store.
func NewS3ParquetSource ¶
func NewS3ParquetSource(cfg S3TargetConfig) (*S3ParquetSource, error)
func (*S3ParquetSource) GetClusters ¶
func (ss *S3ParquetSource) GetClusters() ([]string, error)
func (*S3ParquetSource) ListParquetFiles ¶
func (ss *S3ParquetSource) ListParquetFiles(cluster string) ([]string, error)
func (*S3ParquetSource) ReadClusterConfig ¶
func (ss *S3ParquetSource) ReadClusterConfig(cluster string) (*schema.Cluster, error)
type S3Target ¶
type S3Target struct {
// contains filtered or unexported fields
}
S3Target writes parquet files to an S3-compatible object store.
func NewS3Target ¶
func NewS3Target(cfg S3TargetConfig) (*S3Target, error)