Documentation
¶
Index ¶
- Constants
- func ConvertSpecTo[T any](res *resource.Resource) (*T, error)
- func ConvertSpecsTo[T any](res []*resource.Resource) ([]*T, error)
- func ParseBool(data any) (string, error)
- func ParseDateTime(data any, sourceTimeFormats []string, outPutType string) (string, error)
- func ParseNum(data any, precision int) (string, error)
- func ParseString(data any) (string, error)
- func SyncDriveFileToOSS(ctx context.Context, driveClient *gdrive.GDrive, driveFile *drive.File, ...) error
- func SyncDriveFolderToOSS(ctx context.Context, driveClient *gdrive.GDrive, ossClient *oss.Client, ...) error
- func ToOtherExternalSQLString(projectName, schemaName string, serdeProperties map[string]string, ...) (string, error)
- func URNFor(res *resource.Resource) (resource.URN, error)
- type Char
- type Client
- type ClientProvider
- type Cluster
- type ColumnRecord
- type Decimal
- type ExtTableClients
- type ExternalSource
- type ExternalTable
- type ExternalTableHandle
- type ExternalTableSourceType
- type ExternalTableSources
- type Field
- type FunctionHandle
- type MCFunction
- type MapSchema
- type MaskingPolicyHandle
- type MaxCompute
- func (MaxCompute) Backup(_ context.Context, _ *resource.Backup, _ []*resource.Resource) (*resource.BackupResult, error)
- func (MaxCompute) BatchUpdate(_ context.Context, _ []*resource.Resource) error
- func (m MaxCompute) Create(ctx context.Context, res *resource.Resource) error
- func (m MaxCompute) Exist(ctx context.Context, tnnt tenant.Tenant, urn resource.URN) (bool, error)
- func (MaxCompute) GetURN(res *resource.Resource) (resource.URN, error)
- func (m MaxCompute) Update(ctx context.Context, res *resource.Resource) error
- func (MaxCompute) Validate(r *resource.Resource) error
- type MaxComputeClient
- func (c *MaxComputeClient) ExternalTableHandleFrom(projectSchema ProjectSchema, getter TenantDetailsGetter, ...) TableResourceHandle
- func (c *MaxComputeClient) FunctionHandleFrom(projectSchema ProjectSchema) TableResourceHandle
- func (c *MaxComputeClient) GetDDLView(_ context.Context, table string) (string, error)
- func (c *MaxComputeClient) SchemaHandleFrom(projectSchema ProjectSchema) TableResourceHandle
- func (c *MaxComputeClient) TableHandleFrom(projectSchema ProjectSchema, maskingPolicyHandle TableMaskingPolicyHandle) TableResourceHandle
- func (c *MaxComputeClient) TableMaskingPolicyHandleFrom(projectSchema ProjectSchema, logger log.Logger) TableMaskingPolicyHandle
- func (c *MaxComputeClient) ViewHandleFrom(projectSchema ProjectSchema) TableResourceHandle
- type MaxComputeClientProvider
- type McExternalTable
- type McSQLExecutor
- type McSchema
- type McTable
- type McTableInstance
- type McTableWrapper
- type McTables
- type Partition
- type ProjectSchema
- type ResourceHandle
- type ResourceURN
- type ResourceURNWithUpstreams
- type ResourceURNWithUpstreamsList
- type ResourceURNs
- type Schema
- type SchemaDetails
- type SchemaHandle
- type SchemaInteractor
- type SecretProvider
- type SortColumn
- type SyncRepo
- type SyncerService
- func (s *SyncerService) GetExternalTablesDueForSync(ctx context.Context, tnnt tenant.Tenant, resources []*resource.Resource, ...) ([]*resource.Resource, []*resource.Resource, error)
- func (s *SyncerService) GetSourceRevisionInfo(ctx context.Context, tnnt tenant.Tenant, resources []*resource.Resource) ([]*resource.SourceRevisionInfo, error)
- func (*SyncerService) GetSyncInterval(res *resource.Resource) (int64, error)
- func (s *SyncerService) Sync(ctx context.Context, res *resource.Resource) error
- func (s *SyncerService) SyncBatch(ctx context.Context, tnnt tenant.Tenant, resources []*resource.Resource) ([]resource.SyncStatus, error)
- func (s *SyncerService) TouchUnModified(ctx context.Context, projectName tenant.ProjectName, ...) error
- type Table
- type TableHandle
- type TableMaskingPolicyHandle
- type TableResourceHandle
- type TenantDetailsGetter
- type VarChar
- type View
- type ViewHandle
- type ViewSQLExecutor
- type ViewSchema
- type ViewTable
Constants ¶
View Source
const ( CSVHandler = "com.aliyun.odps.CsvStorageHandler" TSVHandler = "com.aliyun.odps.TsvStorageHandler" )
View Source
const ( CSV string = "CSV" TSV string = "TSV" JSON string = "JSON" TxtFile string = "TEXTFILE" RcFile string = "RCFILE" ORC string = "ORC" OrcFile string = "ORCFILE" SeqFile string = "SEQUENCEFILE" Parquet string = "PARQUET" Avro string = "AVRO" GoogleSheet ExternalTableSourceType = "GOOGLE_SHEETS" GoogleDrive ExternalTableSourceType = "GOOGLE_DRIVE" LarkSheet ExternalTableSourceType = "LARK_SHEET" OSS ExternalTableSourceType = "OSS" )
View Source
const ( EntityProject = "project" EntitySchema = "schema" EntityFunction = "function" ProjectSchemaSections = 2 TableNameSections = 3 )
View Source
const ( KindTable string = "table" KindView string = "view" KindSchema string = "schema" KindExternalTable string = "external_table" KindExternalTableGoogle string = "external_table_google" KindExternalTableLark string = "external_table_lark" KindFunction string = "function" )
View Source
const ( GsheetCredsKey = "GOOGLE_SHEETS_ACCOUNT" LarkCredentialsKey = "LARK_SHEETS_ACCOUNT" OSSCredsKey = "OSS_CREDS" ExtLocation = "EXT_LOCATION" MaxSyncInterval = 24 UseQuoteSerde = "odps.text.option.use.quote" AssumeRoleSerde = "odps.properties.rolearn" AssumeRoleProjectConfig = "EXTERNAL_TABLE_ASSUME_RAM_USER" )
View Source
const (
EntityExternalTable = "resource_external_table"
)
View Source
const (
EntityFormatter = "CSVFormatter"
)
View Source
const (
EntityTable = "resource_table"
)
View Source
const (
EntityView = "resource_view"
)
Variables ¶
This section is empty.
Functions ¶
func ConvertSpecsTo ¶ added in v0.20.0
func ParseDateTime ¶
func ParseString ¶
func SyncDriveFileToOSS ¶
func SyncDriveFolderToOSS ¶
func ToOtherExternalSQLString ¶
func ToOtherExternalSQLString(projectName, schemaName string, serdeProperties map[string]string, schema tableschema.TableSchema, format string) (string, error)
Types ¶
type Client ¶
type Client interface {
TableHandleFrom(projectSchema ProjectSchema, maskingPolicyHandle TableMaskingPolicyHandle) TableResourceHandle
ViewHandleFrom(projectSchema ProjectSchema) TableResourceHandle
ExternalTableHandleFrom(schema ProjectSchema, getter TenantDetailsGetter, maskingPolicyHandle TableMaskingPolicyHandle) TableResourceHandle
TableMaskingPolicyHandleFrom(projectSchema ProjectSchema, logger log.Logger) TableMaskingPolicyHandle
SchemaHandleFrom(projectSchema ProjectSchema) TableResourceHandle
FunctionHandleFrom(projectSchema ProjectSchema) TableResourceHandle
}
type ClientProvider ¶
type Cluster ¶
type Cluster struct {
Using []string `mapstructure:"using,omitempty"`
Type string `mapstructure:"type,omitempty"`
SortBy []SortColumn `mapstructure:"sort_by,omitempty"`
Buckets int `mapstructure:"buckets,omitempty"`
}
Cluster configuration. Using: defines the columns used for clustering.
Type: type of clustering to use for the table.
Hash: https://www.alibabacloud.com/help/en/maxcompute/use-cases/hash-clustering Range: https://www.alibabacloud.com/help/en/maxcompute/use-cases/range-clustering
SortBy: columns to use for sorting. Buckets: buckets to fill data in.
type ColumnRecord ¶
type ColumnRecord struct {
// contains filtered or unexported fields
}
type Decimal ¶
type ExtTableClients ¶ added in v0.20.0
type ExternalSource ¶
type ExternalSource struct {
SourceType ExternalTableSourceType `mapstructure:"type,omitempty"`
ContentType string `mapstructure:"content_type,omitempty"`
SourceURIs []string `mapstructure:"uris,omitempty"`
// Additional configs for CSV, GoogleSheets, LarkSheets formats.
SerdeProperties map[string]string `mapstructure:"serde_properties"`
TableProperties map[string]string `mapstructure:"table_properties"`
SyncInterval int64 `mapstructure:"sync_interval_in_hrs,omitempty"`
GetFormattedDate bool `mapstructure:"fetch_formatted_datetime,omitempty"`
GetFormattedData bool `mapstructure:"fetch_formatted_data,omitempty"`
CleanGDriveCSV bool `mapstructure:"clean_gdrive_csv,omitempty"`
Jars []string `mapstructure:"jars,omitempty"`
Location string `mapstructure:"location,omitempty"`
Range string `mapstructure:"range,omitempty"`
}
func (ExternalSource) GetHeaderCount ¶
func (e ExternalSource) GetHeaderCount() (int, error)
func (ExternalSource) Validate ¶
func (e ExternalSource) Validate() error
type ExternalTable ¶
type ExternalTable struct {
Name string `mapstructure:"name,omitempty"`
Project string `mapstructure:"project,omitempty"`
Database string `mapstructure:"database,omitempty"`
Description string `mapstructure:"description,omitempty"`
Schema Schema `mapstructure:"schema,omitempty"`
Source *ExternalSource `mapstructure:"source,omitempty"`
Hints map[string]string `mapstructure:"hints,omitempty"`
}
func (*ExternalTable) FullName ¶
func (e *ExternalTable) FullName() string
func (*ExternalTable) GetSourceType ¶ added in v0.20.0
func (e *ExternalTable) GetSourceType() ExternalTableSourceType
func (*ExternalTable) GetSyncDelayTolerance ¶ added in v0.20.5
func (e *ExternalTable) GetSyncDelayTolerance() time.Duration
func (*ExternalTable) Validate ¶
func (e *ExternalTable) Validate() error
type ExternalTableHandle ¶
type ExternalTableHandle struct {
// contains filtered or unexported fields
}
func NewExternalTableHandle ¶
func NewExternalTableHandle( mcSQLExecutor McSQLExecutor, mcSchema McSchema, mcExternalTable McExternalTable, getter TenantDetailsGetter, maskingPolicyHandle TableMaskingPolicyHandle, ) *ExternalTableHandle
func (ExternalTableHandle) Create ¶
func (e ExternalTableHandle) Create(res *resource.Resource) error
func (ExternalTableHandle) Exists ¶
func (e ExternalTableHandle) Exists(tableName string) bool
type ExternalTableSourceType ¶ added in v0.20.0
type ExternalTableSourceType string
func NewExternalTableSource ¶ added in v0.20.0
func NewExternalTableSource(s string) (ExternalTableSourceType, error)
func (ExternalTableSourceType) String ¶ added in v0.20.0
func (e ExternalTableSourceType) String() string
type ExternalTableSources ¶ added in v0.20.0
type ExternalTableSources []ExternalTableSourceType
func (*ExternalTableSources) Append ¶ added in v0.20.0
func (es *ExternalTableSources) Append(source ExternalTableSourceType)
func (*ExternalTableSources) Has ¶ added in v0.20.0
func (es *ExternalTableSources) Has(sources ...ExternalTableSourceType) bool
type Field ¶
type Field struct {
Name string `mapstructure:"name,omitempty"`
Type string `mapstructure:"type,omitempty"`
Description string `mapstructure:"description,omitempty"`
// The first label should be the primary label; any others are treated as extended labels.
Labels []string `mapstructure:"labels,omitempty"`
DefaultValue string `mapstructure:"default_value,omitempty"`
Required bool `mapstructure:"required,omitempty"`
SourceTimeFormat string `mapstructure:"source_time_format,omitempty"`
SourceTimeFormats []string `mapstructure:"source_time_formats,omitempty"`
Decimal *Decimal `mapstructure:"decimal,omitempty"`
Char *Char `mapstructure:"char,omitempty"`
VarChar *VarChar `mapstructure:"varchar,omitempty"`
StructSchema []Field `mapstructure:"struct,omitempty"`
ArraySchema *Field `mapstructure:"array,omitempty"`
MapSchema *MapSchema `mapstructure:"map,omitempty"`
// masking policy fields
MaskPolicy string `mapstructure:"mask_policy,omitempty"`
UnmaskPolicy string `mapstructure:"unmask_policy,omitempty"`
}
type FunctionHandle ¶ added in v0.21.8
type FunctionHandle struct {
// contains filtered or unexported fields
}
func NewFunctionHandle ¶ added in v0.21.8
func NewFunctionHandle(mcFunction MCFunction) *FunctionHandle
func (*FunctionHandle) Create ¶ added in v0.21.8
func (*FunctionHandle) Create(_ *resource.Resource) error
func (*FunctionHandle) Exists ¶ added in v0.21.8
func (fh *FunctionHandle) Exists(tableName string) bool
type MCFunction ¶ added in v0.21.8
type MaskingPolicyHandle ¶
type MaskingPolicyHandle struct {
// contains filtered or unexported fields
}
type MaxCompute ¶
type MaxCompute struct {
SyncRepo SyncRepo
// contains filtered or unexported fields
}
func NewMaxComputeDataStore ¶
func NewMaxComputeDataStore(logger log.Logger, secretProvider SecretProvider, clientProvider ClientProvider, tenantProvider TenantDetailsGetter, syncRepo SyncRepo, maxFileSizeSupported, driveFileCleanupSizeLimit int, maxSyncDelayTolerance time.Duration) *MaxCompute
func (MaxCompute) Backup ¶
func (MaxCompute) Backup(_ context.Context, _ *resource.Backup, _ []*resource.Resource) (*resource.BackupResult, error)
func (MaxCompute) BatchUpdate ¶
type MaxComputeClient ¶
func NewClient ¶
func NewClient(svcAccount string) (*MaxComputeClient, error)
func (*MaxComputeClient) ExternalTableHandleFrom ¶
func (c *MaxComputeClient) ExternalTableHandleFrom(projectSchema ProjectSchema, getter TenantDetailsGetter, maskingPolicyHandle TableMaskingPolicyHandle) TableResourceHandle
func (*MaxComputeClient) FunctionHandleFrom ¶ added in v0.21.8
func (c *MaxComputeClient) FunctionHandleFrom(projectSchema ProjectSchema) TableResourceHandle
func (*MaxComputeClient) GetDDLView ¶
func (*MaxComputeClient) SchemaHandleFrom ¶ added in v0.20.6
func (c *MaxComputeClient) SchemaHandleFrom(projectSchema ProjectSchema) TableResourceHandle
func (*MaxComputeClient) TableHandleFrom ¶
func (c *MaxComputeClient) TableHandleFrom(projectSchema ProjectSchema, maskingPolicyHandle TableMaskingPolicyHandle) TableResourceHandle
func (*MaxComputeClient) TableMaskingPolicyHandleFrom ¶
func (c *MaxComputeClient) TableMaskingPolicyHandleFrom(projectSchema ProjectSchema, logger log.Logger) TableMaskingPolicyHandle
func (*MaxComputeClient) ViewHandleFrom ¶
func (c *MaxComputeClient) ViewHandleFrom(projectSchema ProjectSchema) TableResourceHandle
type MaxComputeClientProvider ¶
type MaxComputeClientProvider struct{}
func NewClientProvider ¶
func NewClientProvider() *MaxComputeClientProvider
type McExternalTable ¶
type McExternalTable interface {
CreateExternal(
schema tableschema.TableSchema,
createIfNotExists bool,
serdeProperties map[string]string,
jars []string,
hints, alias map[string]string,
) error
BatchLoadTables(tableNames []string) ([]*odps.Table, error)
Delete(tableName string, ifExists bool) error
}
type McSQLExecutor ¶
type McTable ¶
type McTable interface {
Create(schema tableschema.TableSchema, createIfNotExists bool, hints, alias map[string]string) error
BatchLoadTables(tableNames []string) ([]*odps.Table, error)
}
type McTableInstance ¶
type McTableInstance interface {
Load() error
ColumnMaskInfos() ([]odps.ColumnMaskInfo, error)
}
type McTableWrapper ¶
type McTableWrapper struct {
McTable
}
McTableWrapper wraps McTable and replaces only BatchLoadTables, so that the returned tables satisfy the McTableInstance interface.
func (McTableWrapper) BatchLoadTables ¶
func (t McTableWrapper) BatchLoadTables(tableNames []string) ([]McTableInstance, error)
type McTables ¶
type McTables interface {
BatchLoadTables(tableNames []string) ([]McTableInstance, error)
}
type ProjectSchema ¶
func ProjectSchemaFor ¶
func ProjectSchemaFor(name resource.Name) (ProjectSchema, error)
func ProjectSchemaFrom ¶
func ProjectSchemaFrom(project, schemaName string) (ProjectSchema, error)
func (ProjectSchema) FullName ¶
func (ps ProjectSchema) FullName() string
type ResourceHandle ¶
type ResourceURN ¶
type ResourceURN struct {
Project string `mapstructure:"project"`
Schema string `mapstructure:"database"`
Name string `mapstructure:"name"`
}
func NewResourceURN ¶
func NewResourceURN(project, schema, name string) (ResourceURN, error)
func NewResourceURNFromResourceName ¶
func NewResourceURNFromResourceName(resourceName string) (ResourceURN, error)
func (ResourceURN) URN ¶
func (n ResourceURN) URN() string
type ResourceURNWithUpstreams ¶
type ResourceURNWithUpstreams struct {
ResourceURN ResourceURN
Upstreams []*ResourceURNWithUpstreams
}
type ResourceURNWithUpstreamsList ¶
type ResourceURNWithUpstreamsList []*ResourceURNWithUpstreams
func (ResourceURNWithUpstreamsList) FlattenUnique ¶
func (rs ResourceURNWithUpstreamsList) FlattenUnique() []*ResourceURNWithUpstreams
type ResourceURNs ¶
type ResourceURNs []ResourceURN
func (ResourceURNs) GroupByProjectschema ¶
func (n ResourceURNs) GroupByProjectschema() map[ProjectSchema][]string
type Schema ¶
type Schema []*Field
func (Schema) ContainsDateTimeColumns ¶
func (Schema) ToMaxComputeColumns ¶
func (s Schema) ToMaxComputeColumns(partitionColumn map[string]struct{}, clusterColumn *Cluster, schemaBuilder *tableschema.SchemaBuilder, tableType string) error
type SchemaDetails ¶ added in v0.20.6
type SchemaDetails struct {
Name resource.Name `mapstructure:"name,omitempty"`
Project string `mapstructure:"project,omitempty"`
Database string `mapstructure:"database,omitempty"`
Description string `mapstructure:"description,omitempty"`
}
func ConvertSpecToSchemaDetails ¶ added in v0.20.6
func ConvertSpecToSchemaDetails(res *resource.Resource) (*SchemaDetails, error)
func (*SchemaDetails) Validate ¶ added in v0.20.6
func (v *SchemaDetails) Validate() error
type SchemaHandle ¶ added in v0.20.6
type SchemaHandle struct {
// contains filtered or unexported fields
}
func NewSchemaHandle ¶ added in v0.20.6
func NewSchemaHandle(schema McSchema, schemaInteractor SchemaInteractor) *SchemaHandle
func (SchemaHandle) Create ¶ added in v0.20.6
func (sh SchemaHandle) Create(res *resource.Resource) error
func (SchemaHandle) Exists ¶ added in v0.20.6
func (sh SchemaHandle) Exists(_ string) bool
type SchemaInteractor ¶ added in v0.20.6
type SecretProvider ¶
type SortColumn ¶
type SyncRepo ¶
type SyncRepo interface {
Upsert(ctx context.Context, projectName tenant.ProjectName, entityType, identifier string, remarks map[string]string, success bool) error
Touch(ctx context.Context, projectName tenant.ProjectName, identifiers []string) error
UpsertRevision(ctx context.Context, projectName tenant.ProjectName, entityType, identifier string, remarks map[string]string, revision int, success bool) error
}
type SyncerService ¶
type SyncerService struct {
SyncRepo SyncRepo
MaxSyncDelayTolerance time.Duration
// contains filtered or unexported fields
}
func NewSyncer ¶
func NewSyncer(log log.Logger, secretProvider SecretProvider, tenantDetailsGetter TenantDetailsGetter, syncRepo SyncRepo, maxFileSizeSupported, driveFileCleanupSizeLimit int, maxSyncDelayTolerance time.Duration, ) *SyncerService
func (*SyncerService) GetExternalTablesDueForSync ¶ added in v0.20.0
func (*SyncerService) GetSourceRevisionInfo ¶ added in v0.21.14
func (s *SyncerService) GetSourceRevisionInfo(ctx context.Context, tnnt tenant.Tenant, resources []*resource.Resource) ([]*resource.SourceRevisionInfo, error)
func (*SyncerService) GetSyncInterval ¶
func (*SyncerService) GetSyncInterval(res *resource.Resource) (int64, error)
func (*SyncerService) SyncBatch ¶
func (s *SyncerService) SyncBatch(ctx context.Context, tnnt tenant.Tenant, resources []*resource.Resource) ([]resource.SyncStatus, error)
func (*SyncerService) TouchUnModified ¶
func (s *SyncerService) TouchUnModified(ctx context.Context, projectName tenant.ProjectName, resources []*resource.Resource) error
type Table ¶
type Table struct {
Name string `mapstructure:"name,omitempty"`
Project string `mapstructure:"project,omitempty"`
Database string `mapstructure:"database,omitempty"`
Description string `mapstructure:"description,omitempty"`
Schema Schema `mapstructure:"schema,omitempty"`
Cluster *Cluster `mapstructure:"cluster,omitempty"`
Partition *Partition `mapstructure:"partition,omitempty"`
Lifecycle int `mapstructure:"lifecycle,omitempty"`
Type string `mapstructure:"type,omitempty"`
TableProperties map[string]string `mapstructure:"table_properties,omitempty"`
Hints map[string]string `mapstructure:"hints,omitempty"`
ExtraConfig map[string]interface{} `mapstructure:",remain"`
}
Table defines the spec for a MaxCompute table. We are trying to keep the spec similar to BigQuery to make maintenance easier. Cluster: we accept the names of columns for clustering; check Cluster for more details. Partition: we accept the names of columns for partitioning and provide them in config. ExtraConfig:
hints: hints to be provided to the table for creation, map<string, string>. alias: alias for the table to be passed to the table, map<string, string>.
type TableHandle ¶
type TableHandle struct {
// contains filtered or unexported fields
}
func NewTableHandle ¶
func NewTableHandle(mcSQLExecutor McSQLExecutor, mcSchema McSchema, mcTable McTable, maskingPolicyHandle TableMaskingPolicyHandle) *TableHandle
func (TableHandle) Exists ¶
func (t TableHandle) Exists(tableName string) bool
type TableMaskingPolicyHandle ¶
func NewMaskingPolicyHandle ¶
func NewMaskingPolicyHandle(mcSQLExecutor McSQLExecutor, mcTables McTables, logger log.Logger) TableMaskingPolicyHandle
type TableResourceHandle ¶
type TableResourceHandle interface {
ResourceHandle
}
type TenantDetailsGetter ¶
type View ¶
type View struct {
Name string `mapstructure:"name,omitempty"`
Project string `mapstructure:"project,omitempty"`
Database string `mapstructure:"database,omitempty"`
Description string `mapstructure:"description,omitempty"`
Columns []string `mapstructure:"columns,omitempty"`
ViewQuery string `mapstructure:"view_query,omitempty"`
Lifecycle int `mapstructure:"lifecycle,omitempty"`
Hints map[string]string `mapstructure:"hints,omitempty"`
}
type ViewHandle ¶
type ViewHandle struct {
// contains filtered or unexported fields
}
func NewViewHandle ¶
func NewViewHandle(viewSQLExecutor ViewSQLExecutor, viewSchema ViewSchema, viewTable ViewTable) *ViewHandle
func (ViewHandle) Exists ¶
func (v ViewHandle) Exists(tableName string) bool
type ViewSQLExecutor ¶
type ViewSQLExecutor interface {
CurrentSchemaName() string
}
type ViewSchema ¶
Source Files
¶
Click to show internal directories.
Click to hide internal directories.