Documentation
¶
Overview ¶
Package server_utils shares utility functions used across multiple server packages (origin, cache, registry, director).
It should only import lower level packages (config, param, etc), or server_structs package. It should never import any server packages (origin, cache, registry, director) or upper level packages (launcher_utils, cmd, etc).
For structs used across multiple server packages, put them in common package instead
Index ¶
- Constants
- Variables
- func CheckOriginSentinelLocations(exports []OriginExport) (ok bool, err error)
- func CreateFedTok(ctx context.Context, server server_structs.XRootDServer) (tok string, err error)
- func FilterTopLevelPrefixes(nsAds []server_structs.NamespaceAdV2) []server_structs.NamespaceAdV2
- func GetAdvertisementTok(server server_structs.XRootDServer, directorUrl string) (tok string, err error)
- func GetDirectorAds() []server_structs.DirectorAd
- func GetJWKSFromIssUrl(issuer string) (*jwk.Set, error)
- func GetJWKSURLFromIssuerURL(issuerUrl string) (string, error)
- func GetNSIssuerURL(prefix string) (string, error)
- func GetServerMetadata(ctx context.Context, server server_structs.ServerType) (metadata server_structs.ServerRegistration, err error)
- func GetTopologyJSON(ctx context.Context) (*server_structs.TopologyNamespacesJSON, error)
- func HandleDirectorTestResponse(ctx *gin.Context, nChan chan bool)
- func InitSQLiteDB(dbPath string) (*gorm.DB, error)
- func IsDirectorAdFromSelf(ctx context.Context, ad server_structs.ServerBaseAdInterface) (bool, error)
- func IssuerUrlsHookFunc() mapstructure.DecodeHookFuncType
- func LaunchConcurrencyMonitoring(ctx context.Context, egrp *errgroup.Group, sType server_structs.ServerType)
- func LaunchPeriodicDirectorDiscovery(ctx context.Context, isDirector bool) error
- func LaunchPeriodicDirectorTimeout(ctx context.Context, egrp *errgroup.Group, nChan chan bool)
- func LaunchWatcherMaintenance(ctx context.Context, dirPaths []string, description string, ...)
- func MigrateDB(sqldb *sql.DB, migrationFS embed.FS) error
- func OriginExportsDecoderHook() mapstructure.DecodeHookFunc
- func QueryMyPrometheus(ctx context.Context, query string) (promParsed promQLParsed, err error)
- func RegisterOIDCAPI(engine *gin.RouterGroup, isDirector bool)
- func RegisterXrootdReset(fn func())
- func RemoveTrailingSlash(prefix string) string
- func ResetOriginExports()
- func ResetTestState()
- func SetFedTok(ctx context.Context, server server_structs.XRootDServer, tok string) error
- func ShutdownDB(db *gorm.DB) error
- func StringListToCapsHookFunc() mapstructure.DecodeHookFuncType
- func WaitUntilWorking(ctx context.Context, method, reqUrl, server string, expectedStatus int, ...) error
- type BaseOrigin
- type ContextKey
- type GlobusOrigin
- type HTTPSOrigin
- type Origin
- type OriginExport
- type PosixOrigin
- type S3Origin
- type TestFileTransfer
- type TestFileTransferImpl
- type TestType
- type XRootOrigin
Constants ¶
const ( SelfTestBody string = "This object was created by the Pelican self-test functionality" DirectorTestBody string = "This object was created by the Pelican director-test functionality" )
const MonitoringBaseNs string = "/pelican/monitoring" // The base namespace for monitoring objects
Variables ¶
var ( ErrInvalidOriginConfig = errors.New("invalid origin configuration") WarnExportVolumes string = "Using ExportVolumes from the command line (-v), from env vars or in your config causes Pelican to ignore exports " + "configured via 'Origin.Exports' configuration. " + "However, namespaces exported this way will inherit the Origin.Enable* settings from your configuration file. " + "For finer-grained control of each export, please configure them in your pelican.yaml file via 'Origin.Exports'" )
Functions ¶
func CheckOriginSentinelLocations ¶
func CheckOriginSentinelLocations(exports []OriginExport) (ok bool, err error)
Check the sentinel files from Origin.Exports
func CreateFedTok ¶
func CreateFedTok(ctx context.Context, server server_structs.XRootDServer) (tok string, err error)
GetFedTok retrieves a federation token from the Director, which can be passed to other federation services as proof of federation membership.
func FilterTopLevelPrefixes ¶
func FilterTopLevelPrefixes(nsAds []server_structs.NamespaceAdV2) []server_structs.NamespaceAdV2
Given a slice of NamespaceAdV2 objects, return a slice of unique top-level prefixes.
For example, given:
- /foo
- /foo/bar
- /foo/bar/baz
- /goo
- /some/path
the function should return /foo, /goo, and /some/path.
func GetAdvertisementTok ¶
func GetAdvertisementTok(server server_structs.XRootDServer, directorUrl string) (tok string, err error)
Get an advertisement token for the given server. Advertisement tokens are signed by the server and passed to the Director, which can then use them to check the server's identity. Tokens are valid when the Director can query the public key for the given server from the Registry.
func GetDirectorAds ¶
func GetDirectorAds() []server_structs.DirectorAd
Return a list of known director ads
func GetJWKSFromIssUrl ¶
Given an issuer URL, get the JWKS from the issuer's JWKS URL
func GetJWKSURLFromIssuerURL ¶
Given an issuer url, lookup the JWKS URL from the openid-configuration For example, if the issuer URL is https://registry.com:8446/api/v1.0/registry/test-namespace, this function will return the key indicated by the openid-configuration JSON hosted at https://registry.com:8446/api/v1.0/registry/test-namespace/.well-known/openid-configuration.
func GetNSIssuerURL ¶
For a given prefix, get the prefix's issuer URL, where we consider that the openid endpoint we use to look up a key location. Note that this is NOT the same as the issuer key -- to find that, follow openid-style discovery using the issuer URL as a base.
func GetServerMetadata ¶
func GetServerMetadata(ctx context.Context, server server_structs.ServerType) (metadata server_structs.ServerRegistration, err error)
Centralized code for determining the metadata of the server.
This function is called by a server to look up its own metadata from the registry using local configuration parameters. It cannot query metadata for other servers.
The server's name should be unique and human-friendly: for example, "UW_OSDF_CACHE". It will be registered at the registry to ensure uniqueness within the federation.
There are improvements to do here: once registered, the server should serialize the name. It should also be for the service itself, not specific to the "origin" or "cache" component.
In the current implementation, if the origin component is enabled, we always look up the registered "server name" for the hostname in the registry under /origins; otherwise, we look it up under /caches.
func GetTopologyJSON ¶
func GetTopologyJSON(ctx context.Context) (*server_structs.TopologyNamespacesJSON, error)
GetTopologyJSON returns the namespaces and caches from OSDF topology
func HandleDirectorTestResponse ¶
The director periodically uploads/downloads files to/from all online origins for testing. It sends a request reporting the status of the test result to this endpoint, and we will update origin internal health status metric by what director returns.
func IsDirectorAdFromSelf ¶
func IsDirectorAdFromSelf(ctx context.Context, ad server_structs.ServerBaseAdInterface) (bool, error)
Returns `true` if the provided ad was generated by the current process.
We define "self" to be any ad with our name and instance ID.
func IssuerUrlsHookFunc ¶
func IssuerUrlsHookFunc() mapstructure.DecodeHookFuncType
Decode the issuerUrls field in the Origin.Exports block
func LaunchConcurrencyMonitoring ¶
func LaunchConcurrencyMonitoring(ctx context.Context, egrp *errgroup.Group, sType server_structs.ServerType)
Launch the origin/cache's concurrency monitoring routine
The routine periodically scrapes the servers own prometheus endpoint to gather information about the IO concurrency it's seen over the last period. This is used to set a health status that gets reported to the Director, which can help inform the Director whether it needs to cool down redirects to the server.
func LaunchPeriodicDirectorDiscovery ¶
Launch goroutine that periodically discovers all the known directors in a federation.
func LaunchPeriodicDirectorTimeout ¶
Launch a go routine in errorgroup to report timeout if director-based health test response was not sent within the defined time limit
func LaunchWatcherMaintenance ¶
func LaunchWatcherMaintenance(ctx context.Context, dirPaths []string, description string, sleepTime time.Duration, maintenanceFunc func(notifyEvent bool) error)
Launch a maintenance goroutine. The maintenance routine will watch the directory `dirPath`, invoking `maintenanceFunc` whenever an event occurs in the directory. Note the behavior of directory watching differs across platforms; for example, an atomic rename might be one or two events for the destination file depending on Mac OS X or Linux.
Even if the filesystem watcher fails, this will invoke `maintenanceFunc` every `sleepTime` duration. The maintenance function will be called with `true` if invoked due to a directory change, false otherwise When generating error messages, `description` will be used to describe the task.
func MigrateDB ¶
Update database schema with the embedded migration files
The embedded migration files need to be under "/migrations" folder
func OriginExportsDecoderHook ¶
func OriginExportsDecoderHook() mapstructure.DecodeHookFunc
A small convenience function for composing all the relevant decoder hooks needed to work with Origin.Exports config
func QueryMyPrometheus ¶
Query the Prometheus PromQL endpoint on the director server at /api/v1.0/prometheus/query?query=
where the only arg is the query to execute, without "?query="
Example: queryPromtheus("up") // Get metric of the running Prometheus instances
func RegisterOIDCAPI ¶
func RegisterOIDCAPI(engine *gin.RouterGroup, isDirector bool)
func RegisterXrootdReset ¶
func RegisterXrootdReset(fn func())
RegisterXrootdReset allows the xrootd package to provide a reset hook without introducing import cycles.
func RemoveTrailingSlash ¶
Remove the trailing '/' in the prefix, if it exists. If the prefix is '/', leave it alone.
func ResetOriginExports ¶
func ResetOriginExports()
func ResetTestState ¶
func ResetTestState()
Reset the testing state, including: 1. viper settings, 2. preferred prefix, 3. transport object, 4. Federation metadata, 5. origin exports
func SetFedTok ¶
func SetFedTok(ctx context.Context, server server_structs.XRootDServer, tok string) error
SetFedTok does an atomic write of a federation token to the server's token location.
func ShutdownDB ¶
func StringListToCapsHookFunc ¶
func StringListToCapsHookFunc() mapstructure.DecodeHookFuncType
A decoder hook we can pass to viper.Unmarshal to convert a list of strings to a struct with boolean fields. In this case, we're converting a string slice (flow) from yaml:
Exports: Capabilities: ["PublicReads", "Writes"]
to a struct like:
ExportCapabilities{
PublicReads: true,
Writes: true,
Listings: false,
}
Here's a helpful tutorial on how to write these: https://sagikazarmark.hu/blog/decoding-custom-formats-with-viper/
func WaitUntilWorking ¶
func WaitUntilWorking(ctx context.Context, method, reqUrl, server string, expectedStatus int, statusMismatch bool) error
Wait until given `reqUrl` returns the expected status. Logging messages emitted will refer to `server` (e.g., origin, cache, director) The `statusMismatch` param tells the probe not to fail immediately when a bad code is returned, useful when the probed endpoint may be able to respond before it's fully initialized.
Types ¶
type BaseOrigin ¶
type BaseOrigin struct {
Exports []OriginExport
// contains filtered or unexported fields
}
Base origin struct that all other origin types will inherit from
func (*BaseOrigin) Type ¶
func (b *BaseOrigin) Type(o Origin) server_structs.OriginStorageType
type ContextKey ¶
type ContextKey string
const ( // Context value key; used to store a second context that will // indicate the director discovery should stop. // // Meant mostly for unit tests; director discovery is essential // functionality. DirectorDiscoveryShutdownKey ContextKey = "discovery_shutdown" )
type GlobusOrigin ¶
type GlobusOrigin struct {
BaseOrigin
}
Inherit from the base origin
func (*GlobusOrigin) Type ¶
func (o *GlobusOrigin) Type(_ Origin) server_structs.OriginStorageType
type HTTPSOrigin ¶
type HTTPSOrigin struct {
BaseOrigin
}
Inherit from the base origin
func (*HTTPSOrigin) Type ¶
func (o *HTTPSOrigin) Type(_ Origin) server_structs.OriginStorageType
type Origin ¶
type Origin interface {
Type(Origin) server_structs.OriginStorageType
// contains filtered or unexported methods
}
type OriginExport ¶
type OriginExport struct {
StoragePrefix string `json:"storagePrefix"`
FederationPrefix string `json:"federationPrefix"`
IssuerUrls []string `json:"issuerUrls"`
// Export fields specific to S3 backend. Other things like
// S3ServiceUrl, S3Region, etc are kept top-level in the config
S3Bucket string `json:"s3Bucket,omitempty"`
S3AccessKeyfile string `json:"s3AccessKeyfile,omitempty"`
S3SecretKeyfile string `json:"s3SecretKeyfile,omitempty"`
// Export fields specific to Globus backend
GlobusCollectionID string `json:"globusCollectionID,omitempty"`
GlobusCollectionName string `json:"globusCollectionName,omitempty"`
// Capabilities for the export
Capabilities server_structs.Capabilities `json:"capabilities"`
SentinelLocation string `json:"sentinelLocation"`
}
TODO: pull storage-specific fields into a separate struct and mixin
func GetOriginExports ¶
func GetOriginExports() ([]OriginExport, error)
GetOriginExports is the one-stop shop for parsing/configuring origin exports. It should only touch the yaml the first time it's called, and then return the in-memory value on subsequent calls.
type PosixOrigin ¶
type PosixOrigin struct {
BaseOrigin
}
Inherit from the base origin
func (*PosixOrigin) Type ¶
func (o *PosixOrigin) Type(_ Origin) server_structs.OriginStorageType
type S3Origin ¶
type S3Origin struct {
BaseOrigin
}
Inherit from the base origin
func (*S3Origin) Type ¶
func (o *S3Origin) Type(_ Origin) server_structs.OriginStorageType
type TestFileTransfer ¶
type TestFileTransferImpl ¶
type TestFileTransferImpl struct {
// contains filtered or unexported fields
}
func (TestFileTransferImpl) RunTests ¶
func (t TestFileTransferImpl) RunTests(ctx context.Context, baseUrl, audienceUrl, issuerUrl string, testType TestType) (bool, error)
Run a file transfer test suite with upload/download/delete a test file from the server and a xrootd service. It expects `baseUrl` to be the url to the xrootd endpoint, `issuerUrl` be the url to issue scitoken for file transfer, and the test file content/name be based on `testType`
Note that for this test to work, you need to have the `issuerUrl` registered in your xrootd as a list of trusted token issuers and the issuer is expected to follow WLCG rules for issuer metadata discovery and public key access
Read more: https://github.com/WLCG-AuthZ-WG/common-jwt-profile/blob/master/profile.md#token-verification
func (TestFileTransferImpl) TestCacheDownload ¶
func (t TestFileTransferImpl) TestCacheDownload(ctx context.Context, cacheUrl, issuerUrl string, filePath string, body string) (bool, error)
Run a file transfer test to download a test file from the server and a xrootd service. It expects `cacheUrl` to be the url to the xrootd cache, `issuerUrl` be the url to issue a scitoken for file transfer, `filePath“ to be the namespace and file name of the test file, and the test file to contain the string `body`
Note that for this test to work, you need to have the `issuerUrl` registered in your xrootd as a list of trusted token issuers and the issuer is expected to follow WLCG rules for issuer metadata discovery and public key access
Read more: https://github.com/WLCG-AuthZ-WG/common-jwt-profile/blob/master/profile.md#token-verification
type XRootOrigin ¶
type XRootOrigin struct {
BaseOrigin
}
Inherit from the base origin
func (*XRootOrigin) Type ¶
func (o *XRootOrigin) Type(_ Origin) server_structs.OriginStorageType