parquet

package
v1.5.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 13, 2026 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ParquetRowToJob

func ParquetRowToJob(row *ParquetJobRow) (*schema.Job, *schema.JobData, error)

ParquetRowToJob converts a ParquetJobRow back into job metadata and metric data. This is the reverse of JobToParquetRow.

Types

type ClusterAwareParquetWriter

type ClusterAwareParquetWriter struct {
	// contains filtered or unexported fields
}

ClusterAwareParquetWriter organizes Parquet output by cluster. Each cluster gets its own subdirectory with a cluster.json config file.

func NewClusterAwareParquetWriter

func NewClusterAwareParquetWriter(target ParquetTarget, maxSizeMB int) *ClusterAwareParquetWriter

NewClusterAwareParquetWriter creates a writer that routes jobs to per-cluster ParquetWriters.

func (*ClusterAwareParquetWriter) AddJob

AddJob routes the job row to the appropriate per-cluster writer.

func (*ClusterAwareParquetWriter) Close

func (cw *ClusterAwareParquetWriter) Close() error

Close writes cluster.json files and flushes all per-cluster writers.

func (*ClusterAwareParquetWriter) SetClusterConfig

func (cw *ClusterAwareParquetWriter) SetClusterConfig(name string, cfg *schema.Cluster)

SetClusterConfig stores a cluster configuration to be written as cluster.json on Close.

type FileParquetSource

type FileParquetSource struct {
	// contains filtered or unexported fields
}

FileParquetSource reads parquet archives from a local filesystem directory.

func NewFileParquetSource

func NewFileParquetSource(path string) *FileParquetSource

func (*FileParquetSource) GetClusters

func (fs *FileParquetSource) GetClusters() ([]string, error)

func (*FileParquetSource) ListParquetFiles

func (fs *FileParquetSource) ListParquetFiles(cluster string) ([]string, error)

func (*FileParquetSource) ReadClusterConfig

func (fs *FileParquetSource) ReadClusterConfig(cluster string) (*schema.Cluster, error)

func (*FileParquetSource) ReadFile

func (fs *FileParquetSource) ReadFile(path string) ([]byte, error)

type FileTarget

type FileTarget struct {
	// contains filtered or unexported fields
}

FileTarget writes parquet files to a local filesystem directory.

func NewFileTarget

func NewFileTarget(path string) (*FileTarget, error)

func (*FileTarget) WriteFile

func (ft *FileTarget) WriteFile(name string, data []byte) error

type NodeStateParquetWriter

type NodeStateParquetWriter struct {
	// contains filtered or unexported fields
}

NodeStateParquetWriter batches ParquetNodeStateRows and flushes them to a target when the estimated size exceeds maxSizeBytes.

func NewNodeStateParquetWriter

func NewNodeStateParquetWriter(target ParquetTarget, maxSizeMB int) *NodeStateParquetWriter

NewNodeStateParquetWriter creates a new writer for node state parquet files.

func (*NodeStateParquetWriter) AddRow

AddRow adds a row to the current batch. If the estimated batch size exceeds the configured maximum, the batch is flushed first.

func (*NodeStateParquetWriter) Close

func (pw *NodeStateParquetWriter) Close() error

Close flushes any remaining rows and finalizes the writer.

func (*NodeStateParquetWriter) Flush

func (pw *NodeStateParquetWriter) Flush() error

Flush writes the current batch to a parquet file on the target.

type ParquetJobRow

type ParquetJobRow struct {
	JobID          int64   `parquet:"job_id"`
	Cluster        string  `parquet:"cluster"`
	SubCluster     string  `parquet:"sub_cluster"`
	Partition      string  `parquet:"partition,optional"`
	Project        string  `parquet:"project"`
	User           string  `parquet:"user"`
	State          string  `parquet:"job_state"`
	StartTime      int64   `parquet:"start_time"`
	Duration       int32   `parquet:"duration"`
	Walltime       int64   `parquet:"walltime"`
	NumNodes       int32   `parquet:"num_nodes"`
	NumHWThreads   int32   `parquet:"num_hwthreads"`
	NumAcc         int32   `parquet:"num_acc"`
	Exclusive      int32   `parquet:"exclusive"`
	Energy         float64 `parquet:"energy"`
	SMT            int32   `parquet:"smt"`
	ResourcesJSON  []byte  `parquet:"resources_json"`
	StatisticsJSON []byte  `parquet:"statistics_json,optional"`
	TagsJSON       []byte  `parquet:"tags_json,optional"`
	MetaDataJSON   []byte  `parquet:"meta_data_json,optional"`
	FootprintJSON  []byte  `parquet:"footprint_json,optional"`
	EnergyFootJSON []byte  `parquet:"energy_footprint_json,optional"`
	MetricDataGz   []byte  `parquet:"metric_data_gz"`
}

func JobToParquetRow

func JobToParquetRow(meta *schema.Job, data *schema.JobData) (*ParquetJobRow, error)

JobToParquetRow converts job metadata and metric data into a flat ParquetJobRow. Nested fields are marshaled to JSON; metric data is gzip-compressed JSON.

func ReadParquetFile

func ReadParquetFile(data []byte) ([]ParquetJobRow, error)

ReadParquetFile reads all ParquetJobRow entries from parquet-encoded bytes.

type ParquetNodeStateRow

type ParquetNodeStateRow struct {
	TimeStamp       int64  `parquet:"time_stamp"`
	NodeState       string `parquet:"node_state"`
	HealthState     string `parquet:"health_state"`
	HealthMetrics   string `parquet:"health_metrics,optional"`
	CpusAllocated   int32  `parquet:"cpus_allocated"`
	MemoryAllocated int64  `parquet:"memory_allocated"`
	GpusAllocated   int32  `parquet:"gpus_allocated"`
	JobsRunning     int32  `parquet:"jobs_running"`
	Hostname        string `parquet:"hostname"`
	Cluster         string `parquet:"cluster"`
	SubCluster      string `parquet:"subcluster"`
}

type ParquetSource

type ParquetSource interface {
	GetClusters() ([]string, error)
	ListParquetFiles(cluster string) ([]string, error)
	ReadFile(path string) ([]byte, error)
	ReadClusterConfig(cluster string) (*schema.Cluster, error)
}

ParquetSource abstracts reading parquet archives from different storage backends.

type ParquetTarget

type ParquetTarget interface {
	WriteFile(name string, data []byte) error
}

ParquetTarget abstracts the destination for parquet file writes.

type ParquetWriter

type ParquetWriter struct {
	// contains filtered or unexported fields
}

ParquetWriter batches ParquetJobRows and flushes them to a target when the estimated size exceeds maxSizeBytes.

func NewParquetWriter

func NewParquetWriter(target ParquetTarget, maxSizeMB int) *ParquetWriter

NewParquetWriter creates a new writer that flushes batches to the given target. maxSizeMB sets the approximate maximum size per parquet file in megabytes.

func (*ParquetWriter) AddJob

func (pw *ParquetWriter) AddJob(row ParquetJobRow) error

AddJob adds a row to the current batch. If the estimated batch size exceeds the configured maximum, the batch is flushed to the target first.

func (*ParquetWriter) Close

func (pw *ParquetWriter) Close() error

Close flushes any remaining rows and finalizes the writer.

func (*ParquetWriter) Flush

func (pw *ParquetWriter) Flush() error

Flush writes the current batch to a parquet file on the target.

type S3ParquetSource

type S3ParquetSource struct {
	// contains filtered or unexported fields
}

S3ParquetSource reads parquet archives from an S3-compatible object store.

func NewS3ParquetSource

func NewS3ParquetSource(cfg S3TargetConfig) (*S3ParquetSource, error)

func (*S3ParquetSource) GetClusters

func (ss *S3ParquetSource) GetClusters() ([]string, error)

func (*S3ParquetSource) ListParquetFiles

func (ss *S3ParquetSource) ListParquetFiles(cluster string) ([]string, error)

func (*S3ParquetSource) ReadClusterConfig

func (ss *S3ParquetSource) ReadClusterConfig(cluster string) (*schema.Cluster, error)

func (*S3ParquetSource) ReadFile

func (ss *S3ParquetSource) ReadFile(path string) ([]byte, error)

type S3Target

type S3Target struct {
	// contains filtered or unexported fields
}

S3Target writes parquet files to an S3-compatible object store.

func NewS3Target

func NewS3Target(cfg S3TargetConfig) (*S3Target, error)

func (*S3Target) WriteFile

func (st *S3Target) WriteFile(name string, data []byte) error

type S3TargetConfig

type S3TargetConfig struct {
	Endpoint     string
	Bucket       string
	AccessKey    string
	SecretKey    string
	Region       string
	UsePathStyle bool
}

S3TargetConfig holds the configuration for an S3 parquet target.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL