Documentation
¶
Overview ¶
Package storage defines an extensible storage interface. This package registers the "storage" config section, which maps to the Config struct. Use NewDataStore(cfg) to initialize a DataStore with the provided config. The package provides default implementations to access local, S3 (and minio), In-Memory, and Redis storage. Use NewCompositeDataStore to swap any portion of the DataStore interface with an external implementation (e.g. a cached protobuf store). Blob-store access is provided by the extensible "stow" library; use NewStowRawStore(cfg) to create a Raw store based on any other stow-supported config (e.g. Azure Blob Storage).
Index ¶
- Constants
- Variables
- func IsCursorEnd(cursor Cursor) bool
- func IsExceedsLimit(err error) bool
- func IsExists(err error) bool
- func IsFailedWriteToCache(err error) bool
- func IsNotFound(err error) bool
- func MapStrings(mapper func(string) string, strings ...string) []string
- func MergeMaps(dst map[string]string, src ...map[string]string)
- func RegisterStowKind(kind string, f func(string) DataReference) error
- type CachingConfig
- type ComposedProtobufStore
- type Config
- type ConnectionConfig
- type Cursor
- type CursorState
- type DataReference
- type DataStore
- type DefaultProtobufStore
- type HTTPClientConfig
- type InMemoryStore
- func (s *InMemoryStore) Clear(ctx context.Context) error
- func (c InMemoryStore) CopyRaw(ctx context.Context, source, destination DataReference, opts Options) error
- func (s *InMemoryStore) CreateSignedURL(ctx context.Context, reference DataReference, properties SignedURLProperties) (SignedURLResponse, error)
- func (s *InMemoryStore) Delete(ctx context.Context, reference DataReference) error
- func (s *InMemoryStore) GetBaseContainerFQN(ctx context.Context) DataReference
- func (s *InMemoryStore) Head(ctx context.Context, reference DataReference) (Metadata, error)
- func (s *InMemoryStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error)
- func (s *InMemoryStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error)
- func (s *InMemoryStore) WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, ...) (err error)
- type LimitsConfig
- type MemoryMetadata
- type Metadata
- type Options
- type ProtobufStore
- type RawStore
- type RedisConfig
- type RedisStore
- func (c RedisStore) CopyRaw(ctx context.Context, source, destination DataReference, opts Options) error
- func (s *RedisStore) CreateSignedURL(ctx context.Context, reference DataReference, properties SignedURLProperties) (SignedURLResponse, error)
- func (s *RedisStore) Delete(ctx context.Context, reference DataReference) error
- func (s *RedisStore) GetBaseContainerFQN(ctx context.Context) DataReference
- func (s *RedisStore) Head(ctx context.Context, reference DataReference) (Metadata, error)
- func (s *RedisStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error)
- func (s *RedisStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error)
- func (s *RedisStore) WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, ...) error
- type ReferenceConstructor
- type SignedURLConfig
- type SignedURLProperties
- type SignedURLResponse
- type StowConfig
- type StowMetadata
- type StowStore
- func (c StowStore) CopyRaw(ctx context.Context, source, destination DataReference, opts Options) error
- func (s *StowStore) CreateContainer(ctx context.Context, container string) (stow.Container, error)
- func (s *StowStore) CreateSignedURL(ctx context.Context, reference DataReference, properties SignedURLProperties) (SignedURLResponse, error)
- func (s *StowStore) Delete(ctx context.Context, reference DataReference) error
- func (s *StowStore) GetBaseContainerFQN(ctx context.Context) DataReference
- func (s *StowStore) Head(ctx context.Context, reference DataReference) (Metadata, error)
- func (s *StowStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error)
- func (s *StowStore) LoadContainer(ctx context.Context, container string, createIfNotFound bool) (stow.Container, error)
- func (s *StowStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error)
- func (s *StowStore) WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, ...) error
- type Type
- type URLPathConstructor
Examples ¶
Constants ¶
const ( KiB int64 = 1024 MiB int64 = 1024 * KiB )
const FailureTypeLabel contextutils.Key = "failure_type"
const FlyteContentMD5 = "flyteContentMD5"
Variables ¶
var ( ErrExceedsLimit stdErrs.ErrorCode = "LIMIT_EXCEEDED" ErrFailedToWriteCache stdErrs.ErrorCode = "CACHE_WRITE_FAILED" )
var (
ConfigSection = config.MustRegisterSection(configSectionKey, defaultConfig)
)
Functions ¶
func IsCursorEnd ¶
func IsExceedsLimit ¶
IsExceedsLimit gets a value indicating whether the root cause of error is a "limit exceeded" error.
func IsExists ¶
IsExists gets a value indicating whether the underlying error is an "already exists" error.
func IsFailedWriteToCache ¶
func IsNotFound ¶
IsNotFound gets a value indicating whether the underlying error is a Not Found error.
func RegisterStowKind ¶
func RegisterStowKind(kind string, f func(string) DataReference) error
RegisterStowKind registers a new kind of stow store.
Types ¶
type CachingConfig ¶
type CachingConfig struct {
// Maximum size of the cache where the Blob store data is cached in-memory
// Refer to https://github.com/coocood/freecache to understand how to set the value
// If not specified or set to 0, cache is not used
// NOTE: if Object sizes are larger than 1/1024 of the cache size, the entry will not be written to the cache
// Also refer to https://github.com/coocood/freecache/issues/17 to understand how to set the cache
MaxSizeMegabytes int `` /* 149-byte string literal not displayed */
// sets the garbage collection target percentage:
// a collection is triggered when the ratio of freshly allocated data
// to live data remaining after the previous collection reaches this percentage.
// refer to https://golang.org/pkg/runtime/debug/#SetGCPercent
// If not specified or set to 0, GC percent is not tweaked
TargetGCPercent int `json:"target_gc_percent" pflag:",Sets the garbage collection target percentage."`
}
type ComposedProtobufStore ¶
type ComposedProtobufStore interface {
RawStore
ProtobufStore
}
ComposedProtobufStore interface includes all the necessary data to allow a ProtobufStore to interact with storage through a RawStore.
type Config ¶
type Config struct {
Type Type `json:"type" pflag:",Sets the type of storage [s3/minio/local/mem/stow/redis]."`
// Deprecated: Please use StowConfig instead
Connection ConnectionConfig `json:"connection"`
Stow StowConfig `json:"stow,omitempty" pflag:",Storage config for stow backend."`
Redis RedisConfig `json:"redis,omitempty" pflag:"-,Storage config for the redis backend."`
// Container here is misleading: it refers to a bucket-like (AWS S3) blobstore entity. In some terms it could be a table.
InitContainer string `json:"container" pflag:",Initial container (in s3 a bucket) to create -if it doesn't exist-.'"`
// By default, if this is not enabled, multiple containers are not supported by the storage layer. Only requests for data in the configured `container` (InitContainer) will be allowed. But if enabled, data will be loaded from or written to any
// container specified in the DataReference.
MultiContainerEnabled bool `` /* 213-byte string literal not displayed */
// Caching is recommended to improve the performance of underlying systems. It caches the metadata and resolving
// inputs is accelerated. The size of the cache is large so understand how to configure the cache.
// TODO provide some default config choices
// If this section is skipped, Caching is disabled
Cache CachingConfig `json:"cache"`
Limits LimitsConfig `json:"limits" pflag:",Sets limits for stores."`
DefaultHTTPClient HTTPClientConfig `json:"defaultHttpClient" pflag:",Sets the default http client config."`
SignedURL SignedURLConfig `json:"signedUrl" pflag:",Sets config for SignedURL."`
}
Config is a common storage config.
type ConnectionConfig ¶
type ConnectionConfig struct {
Endpoint config.URL `json:"endpoint" pflag:",URL for storage client to connect to."`
AuthType string `json:"auth-type" pflag:",Auth Type to use [iam,accesskey]."`
AccessKey string `json:"access-key" pflag:",Access key to use. Only required when authtype is set to accesskey."`
SecretKey string `json:"secret-key" pflag:",Secret to use when accesskey is set."`
Region string `json:"region" pflag:",Region to connect to."`
DisableSSL bool `json:"disable-ssl" pflag:",Disables SSL connection. Should only be used for development."`
}
ConnectionConfig defines connection configurations.
type Cursor ¶
type Cursor struct {
// contains filtered or unexported fields
}
func NewCursorAtEnd ¶
func NewCursorAtEnd() Cursor
func NewCursorAtStart ¶
func NewCursorAtStart() Cursor
type CursorState ¶
type CursorState int
const (
// Enum representing state of the cursor
AtStartCursorState CursorState = 0
AtEndCursorState CursorState = 1
AtCustomPosCursorState CursorState = 2
)
type DataReference ¶
type DataReference string
DataReference defines a reference to data location.
func NewDataReference ¶
func NewDataReference(scheme string, container string, key string) DataReference
func (DataReference) Split ¶
func (r DataReference) Split() (scheme, container, key string, err error)
Split splits the data reference into parts.
func (DataReference) String ¶
func (r DataReference) String() string
type DataStore ¶
type DataStore struct {
ComposedProtobufStore
ReferenceConstructor
// contains filtered or unexported fields
}
DataStore is a simplified interface for accessing and storing data in one of the Cloud stores. Today we rely on Stow for multi-cloud support, but this interface abstracts that part.
func NewCompositeDataStore ¶
func NewCompositeDataStore(refConstructor ReferenceConstructor, composedProtobufStore ComposedProtobufStore) *DataStore
NewCompositeDataStore composes a new DataStore.
func NewDataStore ¶
NewDataStore creates a new Data Store with the supplied config.
Example ¶
testScope := promutils.NewTestScope()
ctx := context.Background()
fmt.Println("Creating in memory data store.")
store, err := NewDataStore(&Config{
Type: TypeMemory,
}, testScope.NewSubScope("exp_new"))
if err != nil {
fmt.Printf("Failed to create data store. Error: %v", err)
}
ref, err := store.ConstructReference(ctx, DataReference("root"), "subkey", "subkey2")
if err != nil {
fmt.Printf("Failed to construct data reference. Error: %v", err)
}
fmt.Printf("Constructed data reference [%v] and writing data to it.\n", ref)
dataToStore := "hello world"
err = store.WriteRaw(ctx, ref, int64(len(dataToStore)), Options{}, strings.NewReader(dataToStore))
if err != nil {
fmt.Printf("Failed to write data. Error: %v", err)
}
Output: Creating in memory data store. Constructed data reference [/root/subkey/subkey2] and writing data to it.
type DefaultProtobufStore ¶
type DefaultProtobufStore struct {
RawStore
// contains filtered or unexported fields
}
DefaultProtobufStore implements ProtobufStore to marshal and unmarshal protobufs to/from a RawStore.
func NewDefaultProtobufStore ¶
func NewDefaultProtobufStore(store RawStore, scope promutils.Scope) DefaultProtobufStore
func NewDefaultProtobufStoreWithMetrics ¶
func NewDefaultProtobufStoreWithMetrics(store RawStore, metrics *protoMetrics) DefaultProtobufStore
func (DefaultProtobufStore) ReadProtobuf ¶
func (s DefaultProtobufStore) ReadProtobuf(ctx context.Context, reference DataReference, msg proto.Message) error
func (DefaultProtobufStore) WriteProtobuf ¶
func (s DefaultProtobufStore) WriteProtobuf(ctx context.Context, reference DataReference, opts Options, msg proto.Message) error
type HTTPClientConfig ¶
type HTTPClientConfig struct {
Headers map[string][]string `json:"headers" pflag:"-,Sets http headers to set on the http client."`
Timeout config.Duration `json:"timeout" pflag:",Sets time out on the http client."`
// Zero values for the transport settings below fall back to the http.DefaultTransport values.
MaxIdleConns int `` /* 129-byte string literal not displayed */
MaxIdleConnsPerHost int `` /* 128-byte string literal not displayed */
MaxConnsPerHost int `json:"maxConnsPerHost" pflag:",Maximum number of connections per host; new requests block at the limit. Zero means no limit."`
IdleConnTimeout config.Duration `` /* 135-byte string literal not displayed */
}
HTTPClientConfig encapsulates common settings that can be applied to an HTTP Client.
type InMemoryStore ¶
type InMemoryStore struct {
// contains filtered or unexported fields
}
func (InMemoryStore) CopyRaw ¶
func (c InMemoryStore) CopyRaw(ctx context.Context, source, destination DataReference, opts Options) error
CopyRaw is a naive implementation for copy that reads all data locally then writes it to the destination. TODO: We should upstream an API change to stow to implement copy more natively. E.g. Use s3 copy: https://docs.aws.amazon.com/AmazonS3/latest/dev/CopyingObjectUsingREST.html
func (*InMemoryStore) CreateSignedURL ¶
func (s *InMemoryStore) CreateSignedURL(ctx context.Context, reference DataReference, properties SignedURLProperties) (SignedURLResponse, error)
CreateSignedURL creates a signed url with the provided properties.
func (*InMemoryStore) Delete ¶
func (s *InMemoryStore) Delete(ctx context.Context, reference DataReference) error
Delete removes the referenced data from the cache map.
func (*InMemoryStore) GetBaseContainerFQN ¶
func (s *InMemoryStore) GetBaseContainerFQN(ctx context.Context) DataReference
func (*InMemoryStore) Head ¶
func (s *InMemoryStore) Head(ctx context.Context, reference DataReference) (Metadata, error)
func (*InMemoryStore) List ¶
func (s *InMemoryStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error)
func (*InMemoryStore) ReadRaw ¶
func (s *InMemoryStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error)
type LimitsConfig ¶
type LimitsConfig struct {
GetLimitMegabytes int64 `json:"maxDownloadMBs" pflag:",Maximum allowed download size (in MBs) per call."`
}
LimitsConfig specifies limits for storage package.
type MemoryMetadata ¶
type MemoryMetadata struct {
// contains filtered or unexported fields
}
func (MemoryMetadata) ContentMD5 ¶
func (m MemoryMetadata) ContentMD5() string
func (MemoryMetadata) Etag ¶
func (m MemoryMetadata) Etag() string
func (MemoryMetadata) Exists ¶
func (m MemoryMetadata) Exists() bool
func (MemoryMetadata) Size ¶
func (m MemoryMetadata) Size() int64
type Metadata ¶
type Metadata interface {
Exists() bool
Size() int64
Etag() string
// ContentMD5 retrieves the value of a special metadata tag added by the system that
// contains the MD5 of the uploaded file. If there is no metadata attached
// or that `FlyteContentMD5` key isn't set, ContentMD5 will return empty.
ContentMD5() string
}
Metadata is a placeholder for data reference metadata.
type Options ¶
type Options struct {
Metadata map[string]interface{}
}
Options holds storage options. It is used to pass Metadata (like headers for S3) and also tags or labels for objects.
type ProtobufStore ¶
type ProtobufStore interface {
// ReadProtobuf retrieves the entire blob from blobstore and unmarshals it to the passed protobuf
ReadProtobuf(ctx context.Context, reference DataReference, msg proto.Message) error
// WriteProtobuf serializes and stores the protobuf.
WriteProtobuf(ctx context.Context, reference DataReference, opts Options, msg proto.Message) error
}
ProtobufStore defines an interface for reading and writing protobuf messages
type RawStore ¶
type RawStore interface {
// GetBaseContainerFQN returns a FQN DataReference with the configured base init container
GetBaseContainerFQN(ctx context.Context) DataReference
// CreateSignedURL creates a signed url with the provided properties.
CreateSignedURL(ctx context.Context, reference DataReference, properties SignedURLProperties) (SignedURLResponse, error)
// Head gets metadata about the reference. This should generally be a light weight operation.
Head(ctx context.Context, reference DataReference) (Metadata, error)
// List gets a list of items (relative path to the reference input) given a prefix, using a paginated API
List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error)
// ReadRaw retrieves a byte array from the Blob store or an error
ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error)
// WriteRaw stores a raw byte array.
WriteRaw(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error
// CopyRaw copies from source to destination.
CopyRaw(ctx context.Context, source, destination DataReference, opts Options) error
// Delete removes the referenced data from the blob store.
Delete(ctx context.Context, reference DataReference) error
}
RawStore defines a low level interface for accessing and storing bytes.
func NewInMemoryRawStore ¶
type RedisConfig ¶ added in v2.0.24
type RedisConfig struct {
// Addr is the host:port of the redis server.
Addr string `json:"addr"`
// Username for redis ACL authentication (redis 6+), if required.
Username string `json:"username"`
// Password for redis authentication, if required.
Password string `json:"password"`
// DB is the logical database to select after connecting.
DB int `json:"db"`
}
RedisConfig defines the connection for a redis-backed raw store, selected with type: redis. Objects are stored as redis string values keyed by the path portion of the DataReference (redis://<addr>/<key>), which keeps references interoperable with the flyte-sdk redis plugin.
type RedisStore ¶ added in v2.0.24
type RedisStore struct {
// contains filtered or unexported fields
}
RedisStore implements RawStore on top of a Redis server. Each object is a single Redis string value whose key is the path portion of the DataReference, verbatim: redis://<addr>/<key>. Directories are emulated as key prefixes (a SCAN match on "<prefix>/*"). It is intended for metadata-sized objects (inputs/outputs protobufs); Redis caps a string value at 512MiB and keeps it in memory, so large raw data should stay in a blob store.
The configured redis.addr is authoritative for which server is contacted; the host portion of a reference is advisory. The same server is commonly reachable under different addresses from different network vantage points (in-cluster service DNS vs. host port mapping), so references written elsewhere may legitimately carry a host that differs from this process's config — a mismatch is logged once for observability rather than rejected.
func (RedisStore) CopyRaw ¶ added in v2.0.24
func (c RedisStore) CopyRaw(ctx context.Context, source, destination DataReference, opts Options) error
CopyRaw is a naive implementation for copy that reads all data locally then writes it to the destination. TODO: We should upstream an API change to stow to implement copy more natively. E.g. Use s3 copy: https://docs.aws.amazon.com/AmazonS3/latest/dev/CopyingObjectUsingREST.html
func (*RedisStore) CreateSignedURL ¶ added in v2.0.24
func (s *RedisStore) CreateSignedURL(ctx context.Context, reference DataReference, properties SignedURLProperties) ( SignedURLResponse, error)
CreateSignedURL is not supported; Redis has no notion of pre-signed access.
func (*RedisStore) Delete ¶ added in v2.0.24
func (s *RedisStore) Delete(ctx context.Context, reference DataReference) error
func (*RedisStore) GetBaseContainerFQN ¶ added in v2.0.24
func (s *RedisStore) GetBaseContainerFQN(ctx context.Context) DataReference
func (*RedisStore) Head ¶ added in v2.0.24
func (s *RedisStore) Head(ctx context.Context, reference DataReference) (Metadata, error)
func (*RedisStore) List ¶ added in v2.0.24
func (s *RedisStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ( []DataReference, Cursor, error)
List returns up to maxItems keys under the reference prefix (or the whole container when the reference has no key), paginated by an index cursor.
It deliberately collects the full match set per call instead of exposing the SCAN cursor: SCAN's COUNT is only a hint (a batch may exceed maxItems) and its cursor cannot be rewound, so per-batch pagination either over-returns or silently drops the keys trimmed off a batch. A MATCH scan walks the whole keyspace server-side anyway, so full collection costs the same order as one page, and the keyspaces this store is intended for (run metadata) are small. Sorting makes pages deterministic across calls.
func (*RedisStore) ReadRaw ¶ added in v2.0.24
func (s *RedisStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error)
type ReferenceConstructor ¶
type ReferenceConstructor interface {
// ConstructReference creates a new dataReference that matches the storage structure.
ConstructReference(ctx context.Context, reference DataReference, nestedKeys ...string) (DataReference, error)
}
ReferenceConstructor defines an interface for building data reference paths.
type SignedURLConfig ¶
type SignedURLConfig struct {
StowConfigOverride map[string]string `json:"stowConfigOverride,omitempty" pflag:"-,Configuration for stow backend. Refer to github/flyteorg/stow"`
}
SignedURLConfig encapsulates configs specifically used for SignedURL behavior.
type SignedURLProperties ¶
type SignedURLProperties struct {
// Scope defines the permission level allowed for the generated URL.
Scope stow.ClientMethod
// ExpiresIn defines the expiration duration for the URL. Setting it is strongly recommended.
ExpiresIn time.Duration
// ContentMD5 defines the expected Base64-encoded 128-bit MD5 hash of the generated file.
// Setting it is strongly recommended for data integrity checks on the storage backend.
ContentMD5 string
// AddContentMD5Metadata adds ContentMD5 to the metadata of the signed URL if true.
AddContentMD5Metadata bool
}
SignedURLProperties encapsulates properties about the signedURL operation.
type SignedURLResponse ¶
type StowConfig ¶
type StowConfig struct {
Kind string `json:"kind,omitempty" pflag:",Kind of Stow backend to use. Refer to github/flyteorg/stow"`
Config map[string]string `json:"config,omitempty" pflag:",Configuration for stow backend. Refer to github/flyteorg/stow"`
}
StowConfig defines configs for stow as defined in github.com/flyteorg/stow
type StowMetadata ¶
type StowMetadata struct {
// contains filtered or unexported fields
}
StowMetadata is the metadata that will be returned; it provides the Exists, Size, Etag, and ContentMD5 accessors.
func (StowMetadata) ContentMD5 ¶
func (s StowMetadata) ContentMD5() string
func (StowMetadata) Etag ¶
func (s StowMetadata) Etag() string
func (StowMetadata) Exists ¶
func (s StowMetadata) Exists() bool
func (StowMetadata) Size ¶
func (s StowMetadata) Size() int64
type StowStore ¶
type StowStore struct {
// contains filtered or unexported fields
}
StowStore implements the RawStore methods to talk to a stow location store.
func NewStowRawStore ¶
func (StowStore) CopyRaw ¶
func (c StowStore) CopyRaw(ctx context.Context, source, destination DataReference, opts Options) error
CopyRaw is a naive implementation for copy that reads all data locally then writes it to the destination. TODO: We should upstream an API change to stow to implement copy more natively. E.g. Use s3 copy: https://docs.aws.amazon.com/AmazonS3/latest/dev/CopyingObjectUsingREST.html
func (*StowStore) CreateContainer ¶
func (*StowStore) CreateSignedURL ¶
func (s *StowStore) CreateSignedURL(ctx context.Context, reference DataReference, properties SignedURLProperties) (SignedURLResponse, error)
func (*StowStore) Delete ¶
func (s *StowStore) Delete(ctx context.Context, reference DataReference) error
Delete removes the referenced data from the blob store.
func (*StowStore) GetBaseContainerFQN ¶
func (s *StowStore) GetBaseContainerFQN(ctx context.Context) DataReference
func (*StowStore) List ¶
func (s *StowStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error)
func (*StowStore) LoadContainer ¶
func (*StowStore) ReadRaw ¶
func (s *StowStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error)
type URLPathConstructor ¶
type URLPathConstructor struct {
}
URLPathConstructor implements ReferenceConstructor that assumes paths are URL-compatible.
func NewURLPathConstructor ¶
func NewURLPathConstructor() URLPathConstructor
func (URLPathConstructor) ConstructReference ¶
func (URLPathConstructor) ConstructReference(ctx context.Context, reference DataReference, nestedKeys ...string) (DataReference, error)