Documentation
¶
Overview ¶
Package sc contains definitions and functions for parsing Capillaries scripts
Index ¶
- Constants
- Variables
- func BuildKey(fieldMap map[string]any, idxDef *IdxDef) (string, error)
- func CalculateFieldValue(fieldName string, fieldDef *WriteTableFieldDef, srcVars eval.VarValuesMap, ...) (any, error)
- func CheckValueType(val any, fieldType TableFieldType) error
- func DefaultCassandraDecimal2() *inf.Dec
- func DefaultDateTime() time.Time
- func DefaultDecimal2() decimal.Decimal
- func GetDefaultFieldTypeValue(fieldType TableFieldType) any
- func IsValidFieldType(fieldType TableFieldType) bool
- func JsonOrYamlUnmarshal(scriptType ScriptType, in []byte, out any) error
- func NewScriptDefCache() *expirable.LRU[string, ScriptInitResult]
- func NewScriptFromFileBytes(caPath string, privateKeys map[string]string, scriptUrl string, ...) (*ScriptDef, ScriptInitProblemType, error)
- func NewScriptFromFiles(caPath string, privateKeys map[string]string, scriptUrl string, ...) (*ScriptDef, ScriptInitProblemType, error)
- func ParseRawGolangExpressionStringAndHarvestFieldRefs(strExp string, usedFields *FieldRefs) (ast.Expr, error)
- func ParseRawRelaxedGolangExpressionStringAndHarvestFieldRefs(strExp string, usedFields *FieldRefs, parserFlags FieldRefParserFlag) (ast.Expr, error)
- func ValidateNodeType(nodeType NodeType) error
- func ValidateRerunPolicy(rerunPolicy NodeRerunPolicy) error
- func ValidateStartPolicy(startPolicy NodeStartPolicy) error
- type AggFinderVisitor
- type CsvCreatorSettings
- type CsvReaderColumnSettings
- type CsvReaderSettings
- type CustomProcessorDef
- type CustomProcessorDefFactory
- type DependencyPolicyDef
- type DependencyRule
- type FieldRef
- type FieldRefParserFlag
- type FieldRefs
- func (fieldRefs *FieldRefs) Append(otherFieldRefs FieldRefs)
- func (fieldRefs *FieldRefs) AppendWithFilter(otherFieldRefs FieldRefs, tableFilter string)
- func (fieldRefs *FieldRefs) FindByFieldName(fieldName string) (*FieldRef, bool)
- func (fieldRefs *FieldRefs) HasFieldsWithTableAlias(tableAlias string) bool
- type FileColumnIndexingMode
- type FileCreatorDef
- func (creatorDef *FileCreatorDef) CalculateFileRecordFromSrcVars(srcVars eval.VarValuesMap) ([]any, error)
- func (creatorDef *FileCreatorDef) CheckFileRecordHavingCondition(fileRecord []any) (bool, error)
- func (creatorDef *FileCreatorDef) Deserialize(rawWriter json.RawMessage) error
- func (creatorDef *FileCreatorDef) GetFieldRefsUsedInAllTargetFileExpressions() FieldRefs
- func (creatorDef *FileCreatorDef) HasTop() bool
- type FileReaderColumnDef
- type FileReaderDef
- type IdxCaseSensitivity
- type IdxComponentDef
- type IdxDef
- type IdxDefMap
- type IdxSortOrder
- type IdxUniqueness
- type IndexRef
- type LookupDef
- type LookupJoinType
- type NodeRerunPolicy
- type NodeStartPolicy
- type NodeType
- type ParquetCodecType
- type ParquetCreatorSettings
- type ParquetReaderColumnSettings
- type ReadyToRunNodeCmdType
- type ScriptDef
- type ScriptInitProblemType
- type ScriptInitResult
- type ScriptNodeDef
- func (node *ScriptNodeDef) Deserialize(customProcessorDefFactory CustomProcessorDefFactory, ...) error
- func (node *ScriptNodeDef) GetTargetName() string
- func (node *ScriptNodeDef) GetTokenIntervalsByNumberOfBatches() ([][]int64, error)
- func (node *ScriptNodeDef) GetUniqueIndexesFieldRefs() *FieldRefs
- func (node *ScriptNodeDef) HasCustomProcessor() bool
- func (node *ScriptNodeDef) HasFileCreator() bool
- func (node *ScriptNodeDef) HasFileReader() bool
- func (node *ScriptNodeDef) HasLookup() bool
- func (node *ScriptNodeDef) HasTableCreator() bool
- func (node *ScriptNodeDef) HasTableReader() bool
- type ScriptType
- type TableCreatorDef
- func (tcDef *TableCreatorDef) CalculateTableRecordFromSrcVars(canUseAggFunc bool, srcVars eval.VarValuesMap) (map[string]any, error)
- func (tcDef *TableCreatorDef) CheckTableRecordHavingCondition(tableRecord map[string]any) (bool, error)
- func (tcDef *TableCreatorDef) Deserialize(rawWriter json.RawMessage) error
- func (tcDef *TableCreatorDef) GetFieldDefaultReadyForDb(fieldName string) (any, error)
- func (tcDef *TableCreatorDef) GetFieldRefs() *FieldRefs
- func (tcDef *TableCreatorDef) GetFieldRefsWithAlias(useTableAlias string) *FieldRefs
- func (tcDef *TableCreatorDef) GetSingleUniqueIndexDef() (string, *IdxDef, error)
- type TableFieldType
- type TableReaderDef
- type TableUpdaterDef
- type TopDef
- type WriteCsvColumnSettings
- type WriteFileColumnDef
- type WriteParquetColumnSettings
- type WriteTableFieldDef
Constants ¶
const ( CreatorFileTypeUnknown int = 0 CreatorFileTypeCsv int = 1 CreatorFileTypeParquet int = 2 )
const ( ReaderFileTypeUnknown int = 0 ReaderFileTypeCsv int = 1 ReaderFileTypeParquet int = 2 )
const ( DefaultStringComponentLen int64 = 64 MinStringComponentLen int64 = 16 MaxStringComponentLen int64 = 1024 )
const ( ReservedParamBatchIdx string = "{batch_idx|string}" ReservedParamRunId string = "{run_id|string}" )
const ( HandlerExeTypeGeneric string = "capi_daemon" HandlerExeTypeToolbelt string = "capi_toolbelt" HandlerExeTypeWebapi string = "capi_webapi" )
const AllowedIdxNameRegex = "^idx[A-Za-z0-9_]+"
const AllowedTableNameRegex = "[A-Za-z0-9_]+"
const BeginningOfTimeMicro = int64(-62135596800000000) // time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC).UnixMicro()
const CassandraDatetimeFormat string = "2006-01-02T15:04:05.000-07:00"
Cassandra timestamps are milliseconds. No microsecond support. On writes: - allows (but does not require) ":" in the timezone - allows (but does not require) "T" as the date/time separator
const CreatorAlias string = "w"
const CustomProcessorAlias string = "p"
const DefaultBool bool = false
const DefaultFloat float64 = float64(0.0)
const DefaultInt int64 = int64(0)
const DefaultPolicyCheckerConfJson string = `` /* 1464-byte string literal not displayed */
This conf should never be referenced in prod code. It's always in the config.json. Or in the unit tests. Or in helper tools.
const DefaultRowsetSize int = 1000 // 1000 seems to work on c7g.large without OOM, careful with using bigger values
const DefaultString string = ""
const (
FieldNameUnknown = "unknown_field_name"
)
const LookupAlias string = "l"
const MaxAcceptedBatchesByTableReader int = 1000000
// MaxFileCreatorTopLimit is the upper bound related to a file creator's "top"
// setting (presumably the maximum number of rows TopDef may request - confirm
// against the validation code). The value is 500k, which is what the
// accompanying documentation describes as conservative; the previous literal
// (5000000) was ten times larger than documented.
const MaxFileCreatorTopLimit int = 500000
500k is conservative
const MaxRowsetSize int = 100000
const MaxTableNameLen int = 42 // Amazon keyspaces 48, minus RunIdSuffixLen
const ProhibitedTableNameRegex = "^idx|^wf|^system"
const ReaderAlias string = "r"
const RunIdSuffixFormat string = "_%05d" // _00001
const RunIdSuffixLen int = 6 // _00001
// ScriptDefCacheElementLife is the lifetime of one script definition cache
// element. A bare time.Duration literal is counted in nanoseconds, so the
// previous value of 1 meant "one nanosecond" and made cached entries expire
// almost immediately. The unit is now explicit.
// NOTE(review): one minute is an assumed unit - confirm the intended lifetime.
const ScriptDefCacheElementLife time.Duration = 1 * time.Minute
const ScriptDefCacheMaxElements int = 50
Variables ¶
var ( ScriptDefCacheHitCounter = prometheus.NewCounter(prometheus.CounterOpts{ Name: "capi_script_def_cache_hit_count", Help: "Capillaries script def cache hits", }) ScriptDefCacheMissCounter = prometheus.NewCounter(prometheus.CounterOpts{ Name: "capi_script_def_cache_miss_count", Help: "Capillaries script def cache miss", }) )
Used by Daemon and Webapi
var ScriptDefCache *expirable.LRU[string, ScriptInitResult]
WARNING: a deserialized ScriptDef can take megabytes (big python scripts, big tag maps), so keep an eye on memory consumption. If worse comes to worst, implement file-based caching (will require implementing take/restore ScriptDef snapshot code).
Functions ¶
func CalculateFieldValue ¶
func CalculateFieldValue(fieldName string, fieldDef *WriteTableFieldDef, srcVars eval.VarValuesMap, canUseAggFunc bool) (any, error)
func CheckValueType ¶
func CheckValueType(val any, fieldType TableFieldType) error
func DefaultDateTime ¶
func DefaultDecimal2 ¶
func GetDefaultFieldTypeValue ¶
func GetDefaultFieldTypeValue(fieldType TableFieldType) any
func IsValidFieldType ¶
func IsValidFieldType(fieldType TableFieldType) bool
func JsonOrYamlUnmarshal ¶ added in v1.1.22
func JsonOrYamlUnmarshal(scriptType ScriptType, in []byte, out any) error
func NewScriptDefCache ¶ added in v1.2.0
func NewScriptDefCache() *expirable.LRU[string, ScriptInitResult]
func NewScriptFromFileBytes ¶ added in v1.1.13
func NewScriptFromFileBytes( caPath string, privateKeys map[string]string, scriptUrl string, jsonOrYamlBytesScript []byte, scriptParamsUrl string, jsonOrYamlBytesParams []byte, customProcessorDefFactoryInstance CustomProcessorDefFactory, customProcessorsSettings map[string]json.RawMessage) (*ScriptDef, ScriptInitProblemType, error)
func NewScriptFromFiles ¶
func NewScriptFromFiles(caPath string, privateKeys map[string]string, scriptUrl string, scriptParamsUrl string, customProcessorDefFactoryInstance CustomProcessorDefFactory, customProcessorsSettings map[string]json.RawMessage) (*ScriptDef, ScriptInitProblemType, error)
func ParseRawRelaxedGolangExpressionStringAndHarvestFieldRefs ¶ added in v1.1.4
func ValidateNodeType ¶
func ValidateRerunPolicy ¶
func ValidateRerunPolicy(rerunPolicy NodeRerunPolicy) error
func ValidateStartPolicy ¶
func ValidateStartPolicy(startPolicy NodeStartPolicy) error
Types ¶
type AggFinderVisitor ¶
// AggFinderVisitor exposes a single Error field.
// NOTE(review): judging by the name, this is presumably an AST visitor that
// looks for aggregate-function calls and records a failure in Error - confirm
// against its methods, which are not shown here.
type AggFinderVisitor struct {
// Error is the error value carried by the visitor, nil when none was recorded.
Error error
}
type CsvCreatorSettings ¶ added in v1.1.9
// CsvCreatorSettings holds the CSV-specific settings of a file creator
// (it is the type of FileCreatorDef.Csv).
type CsvCreatorSettings struct {
// Separator is stored under the "separator" key in both JSON and YAML.
Separator string `json:"separator" yaml:"separator"`
}
type CsvReaderColumnSettings ¶ added in v1.1.9
type CsvReaderSettings ¶ added in v1.1.9
// CsvReaderSettings holds the CSV-specific settings of a file reader
// (it is the type of FileReaderDef.Csv).
type CsvReaderSettings struct {
// SrcFileHdrLineIdx is stored under the "hdr_line_idx" key.
SrcFileHdrLineIdx int `json:"hdr_line_idx" yaml:"hdr_line_idx"`
// SrcFileFirstDataLineIdx is stored under the optional "first_data_line_idx" key.
SrcFileFirstDataLineIdx int `json:"first_data_line_idx,omitempty" yaml:"first_data_line_idx,omitempty"`
// Separator is stored under the optional "separator" key.
Separator string `json:"separator,omitempty" yaml:"separator,omitempty"`
// ColumnIndexingMode is excluded from JSON (tag "-"); presumably it is
// computed while deserializing the reader definition - confirm.
ColumnIndexingMode FileColumnIndexingMode `json:"-"`
}
type CustomProcessorDef ¶
// CustomProcessorDef is the interface a custom processor definition must
// implement so that it can be deserialized from a script and queried for the
// field references it uses.
type CustomProcessorDef interface {
// Deserialize initializes the definition from the raw processor section
// (raw) and the processor-specific settings (customProcSettings). The script
// type, CA path and private keys are passed through as well.
Deserialize(raw json.RawMessage, customProcSettings json.RawMessage, scriptType ScriptType, caPath string, privateKeys map[string]string) error
// GetFieldRefs returns a pointer to the processor's field references.
GetFieldRefs() *FieldRefs
// GetUsedInTargetExpressionsFields returns a pointer to the field references
// that, going by the name, are used in target expressions.
GetUsedInTargetExpressionsFields() *FieldRefs
}
type CustomProcessorDefFactory ¶
// CustomProcessorDefFactory creates CustomProcessorDef instances by processor
// type name.
type CustomProcessorDefFactory interface {
// Create returns a CustomProcessorDef for processorType. The bool result
// presumably reports whether the type is known to the factory - confirm
// against the implementations.
Create(processorType string) (CustomProcessorDef, bool)
}
type DependencyPolicyDef ¶
// DependencyPolicyDef describes a dependency policy: a raw event priority
// order string, a default-policy flag and a list of rules.
type DependencyPolicyDef struct {
// EventPriorityOrderString is the raw "event_priority_order" setting.
EventPriorityOrderString string `json:"event_priority_order" yaml:"event_priority_order"`
// IsDefault is the "is_default" flag.
IsDefault bool `json:"is_default" yaml:"is_default"`
// Rules is the "rules" list.
Rules []DependencyRule `json:"rules" yaml:"rules"`
// OrderIdxDef has no struct tags; presumably it is built from
// EventPriorityOrderString during Deserialize - confirm.
OrderIdxDef IdxDef
}
func (*DependencyPolicyDef) Deserialize ¶
func (polDef *DependencyPolicyDef) Deserialize(rawPol json.RawMessage, scriptType ScriptType) error
type DependencyRule ¶
// DependencyRule pairs a ReadyToRunNodeCmdType command with an expression.
type DependencyRule struct {
// Cmd is the "cmd" setting of the rule.
Cmd ReadyToRunNodeCmdType `json:"cmd" yaml:"cmd"`
// RawExpression is the "expression" setting of the rule, as text.
RawExpression string `json:"expression" yaml:"expression"`
// ParsedExpression has no struct tags; presumably the parsed form of
// RawExpression - confirm.
ParsedExpression ast.Expr
}
type FieldRef ¶
// FieldRef identifies a field by table name, field name and field type.
type FieldRef struct {
// TableName is the table part of the reference (e.g. "db_system" for the
// system references built by the helper functions of this package).
TableName string
// FieldName is the field part of the reference.
FieldName string
// FieldType is the TableFieldType of the referenced field.
FieldType TableFieldType
}
func IdxKeyFieldRef ¶
func IdxKeyFieldRef() FieldRef
// RunBatchKeyTokenFieldRef builds the FieldRef whose TableName is "db_system",
// whose FieldName is "token(run_id,batch_idx,key)" and whose FieldType is
// FieldTypeInt.
func RunBatchKeyTokenFieldRef() FieldRef {
	var ref FieldRef
	ref.TableName = "db_system"
	ref.FieldName = "token(run_id,batch_idx,key)"
	ref.FieldType = FieldTypeInt
	return ref
}
func KeyTokenFieldRef ¶
func KeyTokenFieldRef() FieldRef
func RowidFieldRef ¶
func RowidTokenFieldRef ¶
func RowidTokenFieldRef() FieldRef
func (*FieldRef) GetAliasHash ¶
type FieldRefParserFlag ¶ added in v1.1.4
type FieldRefParserFlag uint32
const ( FieldRefStrict FieldRefParserFlag = 0 FieldRefAllowUnknownIdents FieldRefParserFlag = 1 << iota FieldRefAllowWhateverFeatureYouAreAddingHere )
func (FieldRefParserFlag) HasFlag ¶ added in v1.1.4
func (f FieldRefParserFlag) HasFlag(flag FieldRefParserFlag) bool
type FieldRefs ¶
type FieldRefs []FieldRef
func GetFieldRefsUsedInAllTargetExpressions ¶
func GetFieldRefsUsedInAllTargetExpressions(fieldDefMap map[string]*WriteTableFieldDef) FieldRefs
func JoinFieldRefs ¶
func NewFieldRefsFromNodeEvent ¶
func NewFieldRefsFromNodeEvent() *FieldRefs
func (*FieldRefs) AppendWithFilter ¶
func (*FieldRefs) FindByFieldName ¶
func (*FieldRefs) HasFieldsWithTableAlias ¶
type FileColumnIndexingMode ¶
type FileColumnIndexingMode string
const ( FileColumnIndexingName FileColumnIndexingMode = "name" FileColumnIndexingIdx FileColumnIndexingMode = "idx" FileColumnIndexingUnknown FileColumnIndexingMode = "unknown" )
type FileCreatorDef ¶
// FileCreatorDef describes a file creator (writer): the target URL template,
// an optional "having" condition, an optional "top" section, format-specific
// settings ("csv", "parquet") and the list of output columns.
//
// Every script-facing field carries both a json and a yaml tag with the same
// key, as UrlTemplate, Columns and TableCreatorDef.RawHaving do. Without an
// explicit yaml tag, a YAML decoder that falls back to the lowercased field
// name would look for "rawhaving" rather than "having".
//
// Having, UsedInHavingFields, UsedInTargetExpressionsFields and
// CreatorFileType are excluded from JSON (tag "-"); presumably they are
// populated by Deserialize - confirm.
type FileCreatorDef struct {
	UrlTemplate                   string                 `json:"url_template" yaml:"url_template"`
	RawHaving                     string                 `json:"having,omitempty" yaml:"having,omitempty"`
	Top                           TopDef                 `json:"top,omitempty" yaml:"top,omitempty"`
	Csv                           CsvCreatorSettings     `json:"csv,omitempty" yaml:"csv,omitempty"`
	Parquet                       ParquetCreatorSettings `json:"parquet,omitempty" yaml:"parquet,omitempty"`
	Columns                       []WriteFileColumnDef   `json:"columns" yaml:"columns"`
	Having                        ast.Expr               `json:"-"`
	UsedInHavingFields            FieldRefs              `json:"-"`
	UsedInTargetExpressionsFields FieldRefs              `json:"-"`
	CreatorFileType               int                    `json:"-"`
}
func (*FileCreatorDef) CalculateFileRecordFromSrcVars ¶
func (creatorDef *FileCreatorDef) CalculateFileRecordFromSrcVars(srcVars eval.VarValuesMap) ([]any, error)
func (*FileCreatorDef) CheckFileRecordHavingCondition ¶
func (creatorDef *FileCreatorDef) CheckFileRecordHavingCondition(fileRecord []any) (bool, error)
func (*FileCreatorDef) Deserialize ¶
func (creatorDef *FileCreatorDef) Deserialize(rawWriter json.RawMessage) error
func (*FileCreatorDef) GetFieldRefsUsedInAllTargetFileExpressions ¶
func (creatorDef *FileCreatorDef) GetFieldRefsUsedInAllTargetFileExpressions() FieldRefs
func (*FileCreatorDef) HasTop ¶
func (creatorDef *FileCreatorDef) HasTop() bool
type FileReaderColumnDef ¶
// FileReaderColumnDef describes one column read by a file reader
// (it is the value type of FileReaderDef.Columns).
type FileReaderColumnDef struct {
// DefaultValue is stored under the optional "col_default_value" key.
DefaultValue string `json:"col_default_value,omitempty" yaml:"col_default_value,omitempty"` // Optional. If omitted, zero value is used
// Type is the column's TableFieldType, stored under "col_type".
Type TableFieldType `json:"col_type" yaml:"col_type"`
// Csv holds the CSV-specific column settings (optional "csv" section).
Csv CsvReaderColumnSettings `json:"csv,omitempty" yaml:"csv,omitempty"`
// Parquet holds the Parquet-specific column settings (optional "parquet" section).
Parquet ParquetReaderColumnSettings `json:"parquet,omitempty" yaml:"parquet,omitempty"`
}
type FileReaderDef ¶
// FileReaderDef describes a file reader: source URLs, CSV settings and the
// columns to read.
type FileReaderDef struct {
// SrcFileUrls is the list of source file URLs, stored under "urls".
SrcFileUrls []string `json:"urls" yaml:"urls"`
// Csv holds the CSV-specific reader settings (optional "csv" section).
Csv CsvReaderSettings `json:"csv,omitempty" yaml:"csv,omitempty"`
// Columns maps column names to their definitions, stored under "columns".
Columns map[string]*FileReaderColumnDef `json:"columns" yaml:"columns"` // Keys are names used in table writer
// ReaderFileType is excluded from JSON (tag "-"); presumably one of the
// ReaderFileType* constants, set during Deserialize - confirm.
ReaderFileType int `json:"-"`
}
func (*FileReaderDef) Deserialize ¶
func (frDef *FileReaderDef) Deserialize(rawReader json.RawMessage) error
func (*FileReaderDef) ReadCsvLineToValuesMap ¶ added in v1.1.9
func (frDef *FileReaderDef) ReadCsvLineToValuesMap(line *[]string, colVars eval.VarValuesMap) error
func (*FileReaderDef) ResolveCsvColumnIndexesFromNames ¶ added in v1.1.9
func (frDef *FileReaderDef) ResolveCsvColumnIndexesFromNames(srcHdrLine []string) error
type IdxCaseSensitivity ¶
type IdxCaseSensitivity string
const ( IdxCaseSensitive IdxCaseSensitivity = "case_sensitive" IdxIgnoreCase IdxCaseSensitivity = "ignore_case" IdxCaseSensitivityUnknown IdxCaseSensitivity = "case_sensitivity_unknown" )
type IdxComponentDef ¶
// IdxComponentDef describes one component (field) of an index definition.
type IdxComponentDef struct {
// FieldName is the name of the indexed field.
FieldName string
// CaseSensitivity is the component's IdxCaseSensitivity setting.
CaseSensitivity IdxCaseSensitivity
// SortOrder is the component's IdxSortOrder setting.
SortOrder IdxSortOrder
StringLen int64 // For string fields only, default 64
FieldType TableFieldType // Populated from tgt_table def
}
type IdxDef ¶
// IdxDef is an index definition: a uniqueness setting plus an ordered list of
// components.
type IdxDef struct {
// Uniqueness is the index's IdxUniqueness setting.
Uniqueness IdxUniqueness
// Components lists the index components.
Components []IdxComponentDef
}
type IdxSortOrder ¶
type IdxSortOrder string
const ( IdxSortAsc IdxSortOrder = "asc" IdxSortDesc IdxSortOrder = "desc" IdxSortUnknown IdxSortOrder = "unknown" )
type IdxUniqueness ¶
type IdxUniqueness string
const ( IdxUnique IdxUniqueness = "unique" IdxNonUnique IdxUniqueness = "non_unique" IdxUniquenessUnknown IdxUniqueness = "unknown" )
type LookupDef ¶
// LookupDef describes the lookup ("l") section of a script node: the index to
// look up, the join and filter expressions, the join type and read batch sizes.
type LookupDef struct {
// IndexName is stored under "index_name".
IndexName string `json:"index_name" yaml:"index_name"`
// RawJoinOn is the "join_on" setting, as text.
RawJoinOn string `json:"join_on" yaml:"join_on"`
// IsGroup is the "group" flag.
IsGroup bool `json:"group" yaml:"group"`
// RawFilter is the "filter" setting, as text.
RawFilter string `json:"filter" yaml:"filter"`
// LookupJoin is the "join_type" setting.
LookupJoin LookupJoinType `json:"join_type" yaml:"join_type"`
// IdxReadBatchSize is stored under "idx_read_batch_size".
IdxReadBatchSize int `json:"idx_read_batch_size" yaml:"idx_read_batch_size"`
// RightLookupReadBatchSize is stored under "right_lookup_read_batch_size".
RightLookupReadBatchSize int `json:"right_lookup_read_batch_size" yaml:"right_lookup_read_batch_size"`
LeftTableFields FieldRefs // In the same order as lookup idx - important
TableCreator *TableCreatorDef // Populated when walking through all nodes
// UsedInFilterFields has no struct tags; presumably the field references
// harvested from RawFilter - confirm.
UsedInFilterFields FieldRefs
// Filter is excluded from YAML (tag "-"); presumably the parsed form of
// RawFilter - confirm.
Filter ast.Expr `yaml:"-"`
}
func (*LookupDef) CheckFilterCondition ¶
func (lkpDef *LookupDef) CheckFilterCondition(varsFromLookup eval.VarValuesMap) (bool, error)
func (*LookupDef) CheckPagedBatchSize ¶
func (*LookupDef) ParseFilter ¶
func (*LookupDef) UsesFilter ¶
func (*LookupDef) ValidateJoinType ¶
type LookupJoinType ¶
type LookupJoinType string
const ( LookupJoinInner LookupJoinType = "inner" LookupJoinLeft LookupJoinType = "left" )
type NodeRerunPolicy ¶
type NodeRerunPolicy string
const ( NodeRerun NodeRerunPolicy = "rerun" // Default NodeFail NodeRerunPolicy = "fail" )
type NodeStartPolicy ¶
type NodeStartPolicy string
const ( NodeStartManual NodeStartPolicy = "manual" NodeStartAuto NodeStartPolicy = "auto" // Default )
type NodeType ¶
type NodeType string
const ( NodeTypeNone NodeType = "none" NodeTypeFileTable NodeType = "file_table" NodeTypeTableTable NodeType = "table_table" NodeTypeTableLookupTable NodeType = "table_lookup_table" NodeTypeTableFile NodeType = "table_file" NodeTypeTableCustomTfmTable NodeType = "table_custom_tfm_table" NodeTypeDistinctTable NodeType = "distinct_table" )
type ParquetCodecType ¶ added in v1.1.9
type ParquetCodecType string
const ( ParquetCodecGzip ParquetCodecType = "gzip" ParquetCodecSnappy ParquetCodecType = "snappy" ParquetCodecUncompressed ParquetCodecType = "uncompressed" )
type ParquetCreatorSettings ¶ added in v1.1.9
// ParquetCreatorSettings holds the Parquet-specific settings of a file creator
// (it is the type of FileCreatorDef.Parquet).
type ParquetCreatorSettings struct {
// Codec is the ParquetCodecType stored under the "codec" key.
Codec ParquetCodecType `json:"codec" yaml:"codec"`
}
type ParquetReaderColumnSettings ¶ added in v1.1.9
// ParquetReaderColumnSettings holds the Parquet-specific settings of a file
// reader column (it is the type of FileReaderColumnDef.Parquet).
//
// SrcColName is stored under the "col_name" key in both JSON and YAML. The
// yaml tag is spelled out, as in the other settings structs, because a YAML
// decoder that defaults to the lowercased field name would otherwise look for
// "srccolname".
type ParquetReaderColumnSettings struct {
	SrcColName string `json:"col_name" yaml:"col_name"`
}
type ReadyToRunNodeCmdType ¶
type ReadyToRunNodeCmdType string
const ( NodeNone ReadyToRunNodeCmdType = "none" NodeGo ReadyToRunNodeCmdType = "go" NodeWait ReadyToRunNodeCmdType = "wait" NodeNogo ReadyToRunNodeCmdType = "nogo" )
func ReadyToRunNodeCmdTypeFromString ¶ added in v1.1.26
func ReadyToRunNodeCmdTypeFromString(s string) (ReadyToRunNodeCmdType, error)
type ScriptDef ¶
// ScriptDef is the top-level script definition: the script nodes and the raw
// dependency policies, plus two node lookup maps.
type ScriptDef struct {
// ScriptNodes maps node names to node definitions, stored under "nodes".
ScriptNodes map[string]*ScriptNodeDef `json:"nodes" yaml:"nodes"`
// RawDependencyPolicies maps policy names to their raw (not yet
// deserialized) definitions, stored under "dependency_policies".
RawDependencyPolicies map[string]json.RawMessage `json:"dependency_policies" yaml:"dependency_policies"`
// TableCreatorNodeMap has no struct tags; presumably it maps a created
// table name to the node that creates it - confirm.
TableCreatorNodeMap map[string](*ScriptNodeDef)
// IndexNodeMap has no struct tags; presumably it maps an index name to the
// node that creates it - confirm.
IndexNodeMap map[string](*ScriptNodeDef)
}
func (*ScriptDef) Deserialize ¶
func (scriptDef *ScriptDef) Deserialize(jsonOrYamlBytesScript []byte, scriptType ScriptType, customProcessorDefFactory CustomProcessorDefFactory, customProcessorsSettings map[string]json.RawMessage, caPath string, privateKeys map[string]string) error
func (*ScriptDef) GetAffectedNodes ¶
Returns all nodes that will receive RabbitMQ/ActiveMQ messages when a run is started with startNodeNames The tricky part is not to include nodes that have "manual" nodes between them and the start nodes (see addChildrenToManual)
type ScriptInitProblemType ¶ added in v1.1.3
type ScriptInitProblemType int
const ScriptInitConnectivityProblem ScriptInitProblemType = 3
const ScriptInitContentProblem ScriptInitProblemType = 2
const ScriptInitNoProblem ScriptInitProblemType = 0
const ScriptInitUrlProblem ScriptInitProblemType = 1
type ScriptInitResult ¶ added in v1.1.26
// ScriptInitResult bundles the three values produced by script initialization
// (script definition, problem type, error). It is the element type of
// ScriptDefCache.
type ScriptInitResult struct {
// Def is the script definition.
Def *ScriptDef
// InitProblem is the ScriptInitProblemType classification of the outcome.
InitProblem ScriptInitProblemType
// Err is the error associated with the outcome, if any.
Err error
}
type ScriptNodeDef ¶
// ScriptNodeDef describes one script node: its type and policies, the raw
// reader ("r"), processor ("p") and writer ("w") sections, and the typed
// definitions kept alongside them.
type ScriptNodeDef struct {
Name string // Get it from the key
// Type is the node's NodeType, stored under "type".
Type NodeType `json:"type" yaml:"type"`
// Desc is the node description, stored under "desc".
Desc string `json:"desc" yaml:"desc"`
// StartPolicy is stored under "start_policy".
StartPolicy NodeStartPolicy `json:"start_policy" yaml:"start_policy"`
// RerunPolicy is stored under the optional "rerun_policy" key.
RerunPolicy NodeRerunPolicy `json:"rerun_policy,omitempty" yaml:"rerun_policy,omitempty"`
// CustomProcessorType is stored under the optional "custom_proc_type" key.
CustomProcessorType string `json:"custom_proc_type,omitempty" yaml:"custom_proc_type,omitempty"`
// HandlerExeType is stored under the optional "handler_exe_type" key.
HandlerExeType string `json:"handler_exe_type,omitempty" yaml:"handler_exe_type,omitempty"`
// MaxBatchProcessingTime is stored under the optional
// "max_batch_processing_time" key; the unit is not visible here.
MaxBatchProcessingTime int `json:"max_batch_processing_time,omitempty" yaml:"max_batch_processing_time,omitempty"`
RawReader json.RawMessage `json:"r" yaml:"r"` // This depends on tfm type
// TableReader and FileReader have no struct tags; presumably one of them is
// populated from RawReader depending on the node type - confirm.
TableReader TableReaderDef
FileReader FileReaderDef
// Lookup is the lookup section, stored under "l".
Lookup LookupDef `json:"l" yaml:"l"`
RawProcessorDef json.RawMessage `json:"p" yaml:"p"` // This depends on tfm type
CustomProcessor CustomProcessorDef // Also should implement CustomProcessorRunner
RawWriter json.RawMessage `json:"w" yaml:"w"` // This depends on tfm type
// DependencyPolicyName is stored under "dependency_policy".
DependencyPolicyName string `json:"dependency_policy" yaml:"dependency_policy"`
// TableCreator, TableUpdater and FileCreator have no struct tags;
// presumably one of them is populated from RawWriter depending on the node
// type - confirm.
TableCreator TableCreatorDef
TableUpdater TableUpdaterDef
FileCreator FileCreatorDef
// DepPolDef has no struct tags; presumably the policy resolved from
// DependencyPolicyName - confirm.
DepPolDef *DependencyPolicyDef
}
func (*ScriptNodeDef) Deserialize ¶
func (node *ScriptNodeDef) Deserialize(customProcessorDefFactory CustomProcessorDefFactory, customProcessorsSettings map[string]json.RawMessage, scriptType ScriptType, caPath string, privateKeys map[string]string) error
func (*ScriptNodeDef) GetTargetName ¶
func (node *ScriptNodeDef) GetTargetName() string
func (*ScriptNodeDef) GetTokenIntervalsByNumberOfBatches ¶
func (node *ScriptNodeDef) GetTokenIntervalsByNumberOfBatches() ([][]int64, error)
func (*ScriptNodeDef) GetUniqueIndexesFieldRefs ¶
func (node *ScriptNodeDef) GetUniqueIndexesFieldRefs() *FieldRefs
func (*ScriptNodeDef) HasCustomProcessor ¶
func (node *ScriptNodeDef) HasCustomProcessor() bool
func (*ScriptNodeDef) HasFileCreator ¶
func (node *ScriptNodeDef) HasFileCreator() bool
func (*ScriptNodeDef) HasFileReader ¶
func (node *ScriptNodeDef) HasFileReader() bool
func (*ScriptNodeDef) HasLookup ¶
func (node *ScriptNodeDef) HasLookup() bool
func (*ScriptNodeDef) HasTableCreator ¶
func (node *ScriptNodeDef) HasTableCreator() bool
func (*ScriptNodeDef) HasTableReader ¶
func (node *ScriptNodeDef) HasTableReader() bool
type ScriptType ¶ added in v1.1.22
type ScriptType string
const ( ScriptJson ScriptType = "json" ScriptYaml ScriptType = "yaml" ScriptUnknown ScriptType = "unknown" )
type TableCreatorDef ¶
// TableCreatorDef describes a table creator (writer): table name and options,
// an optional "having" condition, the fields to write and the raw index
// definitions.
type TableCreatorDef struct {
// Name is the table name, stored under "name".
Name string `json:"name" yaml:"name"`
// CreateProperties is stored under "table_options".
CreateProperties string `json:"table_options" yaml:"table_options"`
// RawHaving is the optional "having" setting, as text.
RawHaving string `json:"having,omitempty" yaml:"having,omitempty"`
// Having, UsedInHavingFields and UsedInTargetExpressionsFields are excluded
// from JSON (tag "-"); presumably populated by Deserialize - confirm.
Having ast.Expr `json:"-"`
UsedInHavingFields FieldRefs `json:"-"`
UsedInTargetExpressionsFields FieldRefs `json:"-"`
// Fields maps field names to write definitions (optional "fields" section).
Fields map[string]*WriteTableFieldDef `json:"fields,omitempty" yaml:"fields,omitempty"`
// RawIndexes maps index names to their textual definitions (optional
// "indexes" section).
RawIndexes map[string]string `json:"indexes,omitempty" yaml:"indexes,omitempty"`
// Indexes is excluded from JSON (tag "-"); presumably the parsed form of
// RawIndexes - confirm.
Indexes IdxDefMap `json:"-"`
}
func (*TableCreatorDef) CalculateTableRecordFromSrcVars ¶
func (tcDef *TableCreatorDef) CalculateTableRecordFromSrcVars(canUseAggFunc bool, srcVars eval.VarValuesMap) (map[string]any, error)
func (*TableCreatorDef) CheckTableRecordHavingCondition ¶
func (tcDef *TableCreatorDef) CheckTableRecordHavingCondition(tableRecord map[string]any) (bool, error)
func (*TableCreatorDef) Deserialize ¶
func (tcDef *TableCreatorDef) Deserialize(rawWriter json.RawMessage) error
func (*TableCreatorDef) GetFieldDefaultReadyForDb ¶
func (tcDef *TableCreatorDef) GetFieldDefaultReadyForDb(fieldName string) (any, error)
func (*TableCreatorDef) GetFieldRefs ¶
func (tcDef *TableCreatorDef) GetFieldRefs() *FieldRefs
func (*TableCreatorDef) GetFieldRefsWithAlias ¶
func (tcDef *TableCreatorDef) GetFieldRefsWithAlias(useTableAlias string) *FieldRefs
func (*TableCreatorDef) GetSingleUniqueIndexDef ¶ added in v1.1.18
func (tcDef *TableCreatorDef) GetSingleUniqueIndexDef() (string, *IdxDef, error)
type TableFieldType ¶
type TableFieldType string
const ( FieldTypeString TableFieldType = "string" FieldTypeInt TableFieldType = "int" // sign+18digit string FieldTypeFloat TableFieldType = "float" // sign+64digit string, 32 digits after point FieldTypeBool TableFieldType = "bool" // F or T FieldTypeDecimal2 TableFieldType = "decimal2" // sign + 18digit+point+2 FieldTypeDateTime TableFieldType = "datetime" // int unix epoch milliseconds FieldTypeUnknown TableFieldType = "unknown" )
type TableReaderDef ¶
// TableReaderDef describes a table reader: the source table, the expected
// number of batches and the rowset size.
type TableReaderDef struct {
// TableName is the source table name, stored under "table".
TableName string `json:"table" yaml:"table"`
// ExpectedBatchesTotal is stored under the optional "expected_batches_total" key.
ExpectedBatchesTotal int `json:"expected_batches_total,omitempty" yaml:"expected_batches_total,omitempty"`
RowsetSize int `json:"rowset_size,omitempty" yaml:"rowset_size,omitempty"` // DefaultRowsetSize = 1000, careful with higher values - watch for OOM
// TableCreator is excluded from JSON (tag "-"); presumably a pointer to the
// creator definition of the table being read - confirm.
TableCreator *TableCreatorDef `json:"-"`
}
type TableUpdaterDef ¶
// TableUpdaterDef describes a table updater: a map of fields to write.
type TableUpdaterDef struct {
// Fields maps field names to write definitions, stored under "fields".
Fields map[string]*WriteTableFieldDef `json:"fields" yaml:"fields"`
}
type WriteCsvColumnSettings ¶ added in v1.1.9
type WriteFileColumnDef ¶
// WriteFileColumnDef describes one output column of a file creator
// (it is the element type of FileCreatorDef.Columns): the expression that
// produces the value, the column name and type, and format-specific settings.
//
// Every script-facing field carries both a json and a yaml tag with the same
// key, matching RawExpression and the sibling WriteTableFieldDef struct.
//
// ParsedExpression and UsedFields are excluded from JSON (tag "-");
// presumably they are derived from RawExpression - confirm.
type WriteFileColumnDef struct {
	RawExpression    string                     `json:"expression" yaml:"expression"`
	Name             string                     `json:"name" yaml:"name"` // To be used in Having
	Type             TableFieldType             `json:"type" yaml:"type"` // To be checked when checking expressions and to be used in Having
	Csv              WriteCsvColumnSettings     `json:"csv,omitempty" yaml:"csv,omitempty"`
	Parquet          WriteParquetColumnSettings `json:"parquet,omitempty" yaml:"parquet,omitempty"`
	ParsedExpression ast.Expr                   `json:"-"`
	UsedFields       FieldRefs                  `json:"-"`
}
type WriteParquetColumnSettings ¶ added in v1.1.9
// WriteParquetColumnSettings holds the Parquet-specific settings of a written
// file column (it is the type of WriteFileColumnDef.Parquet).
type WriteParquetColumnSettings struct {
// ColumnName is stored under the "column_name" key in both JSON and YAML.
ColumnName string `json:"column_name" yaml:"column_name"`
}
type WriteTableFieldDef ¶
// WriteTableFieldDef describes one field written by a table creator or
// updater: the expression that produces the value, the field type and an
// optional default value.
type WriteTableFieldDef struct {
// RawExpression is the "expression" setting, as text.
RawExpression string `json:"expression" yaml:"expression"`
// Type is the field's TableFieldType, stored under "type".
Type TableFieldType `json:"type" yaml:"type"`
DefaultValue string `json:"default_value,omitempty" yaml:"default_value,omitempty"` // Optional. If omitted, default zero value is used
// ParsedExpression and UsedFields are excluded from JSON (tag "-");
// presumably they are derived from RawExpression - confirm.
ParsedExpression ast.Expr `json:"-"`
UsedFields FieldRefs `json:"-"`
}
Source Files
¶
- custom_processor_def.go
- dependency_policy_def.go
- doc.go
- field_ref.go
- file_creator_def.go
- file_reader_def.go
- index_def.go
- key.go
- lookup_def.go
- prometheus_counters.go
- script_def.go
- script_def_cache.go
- script_def_loader.donotcover.go
- script_def_loader.go
- script_node_def.go
- table_creator_def.go
- table_def.go
- table_reader_def.go
- util.go
- write_table_field_def.go