Documentation
¶
Index ¶
- func GetId() int64
- func TableCollationIdGen(tableName string) sqle.CollationID
- func TriggerAllChangeChannel(table string, changeIds map[string]string)
- func TriggerChangeChannel(table string)
- func WhichLastModified(a interface{}, b interface{}) bool
- type AskFlumeContext
- type AskFlumeMessage
- type FakeDFStat
- type FlowNameType
- type FlowType
- type PermissionUpdate
- type TTDINode
- func (dfs *TTDINode) EfficientLog(statMap map[string]interface{}, logF func(string, error))
- func (dfs *TTDINode) FinishStatistic(tfmContext *TrcFlowMachineContext, tfContext *TrcFlowContext, mod *kv.Modifier, ...)
- func (dfs *TTDINode) FinishStatisticLog()
- func (dfs *TTDINode) Log()
- func (dfs *TTDINode) MapStatistic(data map[string]interface{}, logger *log.Logger)
- func (dfs *TTDINode) RetrieveStatistic(mod *kv.Modifier, id string, indexPath string, idName string, flowG string, ...) error
- func (dfs *TTDINode) StatisticToMap(mod *kv.Modifier, dfst *TTDINode, enrichLastTested bool) map[string]interface{}
- func (dfs *TTDINode) UpdateDataFlowStatistic(flowG string, flowN string, stateN string, stateC string, mode int, ...)
- func (dfs *TTDINode) UpdateDataFlowStatisticWithTime(flowG string, flowN string, stateN string, stateC string, mode int, ...)
- type TrcFlowContext
- type TrcFlowMachineContext
- func (tfmContext *TrcFlowMachineContext) AddTableSchema(tableSchema sqle.PrimaryKeySchema, tfContext *TrcFlowContext)
- func (tfmContext *TrcFlowMachineContext) CallAPI(apiAuthHeaders map[string]string, host string, apiEndpoint string, ...) (map[string]interface{}, int, error)
- func (tfmContext *TrcFlowMachineContext) CallDBQuery(tfContext *TrcFlowContext, queryMap map[string]interface{}, ...) [][]interface{}
- func (tfmContext *TrcFlowMachineContext) CreateCompositeTableTriggers(trcfc *TrcFlowContext, iden1 string, iden2 string, ...)
- func (tfmContext *TrcFlowMachineContext) CreateDataFlowTableTriggers(trcfc *TrcFlowContext, iden1 string, iden2 string, iden3 string, ...)
- func (tfmContext *TrcFlowMachineContext) CreateTableTriggers(trcfc *TrcFlowContext, identityColumnName string)
- func (tfmContext *TrcFlowMachineContext) GetDbConn(tfContext *TrcFlowContext, dbUrl string, username string, ...) (*sql.DB, error)
- func (tfmContext *TrcFlowMachineContext) GetFlowConfiguration(trcfc *TrcFlowContext, flowTemplatePath string) (map[string]interface{}, bool)
- func (tfmContext *TrcFlowMachineContext) GetTableModifierLock() *sync.Mutex
- func (tfmContext *TrcFlowMachineContext) Init(sdbConnMap map[string]map[string]interface{}, tableNames []string, ...) error
- func (tfmContext *TrcFlowMachineContext) Log(msg string, err error)
- func (tfmContext *TrcFlowMachineContext) PathToTableRowHelper(tfContext *TrcFlowContext) ([]interface{}, error)
- func (tfmContext *TrcFlowMachineContext) ProcessFlow(config *eUtils.DriverConfig, tfContext *TrcFlowContext, ...) error
- func (tfmContext *TrcFlowMachineContext) SelectFlowChannel(tfContext *TrcFlowContext) <-chan bool
- func (tfmContext *TrcFlowMachineContext) SyncTableCycle(tfContext *TrcFlowContext, identityColumnName string, ...)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetId ¶
func GetId() int64
Keeps track of ID value for number of queries processed Should match up with ID for Flumeworld.MashupDetailedElementLibrary
func TableCollationIdGen ¶
func TableCollationIdGen(tableName string) sqle.CollationID
func TriggerAllChangeChannel ¶
func TriggerChangeChannel ¶
func TriggerChangeChannel(table string)
func WhichLastModified ¶
func WhichLastModified(a interface{}, b interface{}) bool
True if a time was most recent, false if b time was most recent.
Types ¶
type AskFlumeContext ¶
type AskFlumeContext struct {
GchatQueries chan *AskFlumeContext
DFQueries chan *AskFlumeContext
DFAnswers chan *AskFlumeContext
GchatAnswers chan *AskFlumeContext
Upsert chan *mashupsdk.MashupDetailedElementBundle
Close bool
FlowCase string
Query *AskFlumeMessage
Queries []*AskFlumeMessage
}
func InitAskFlume ¶
func InitAskFlume() (*AskFlumeContext, error)
Initializes the AskFlumeContext and returns the initialized context
type AskFlumeMessage ¶
type FakeDFStat ¶
type FakeDFStat struct {
mashupsdk.MashupDetailedElement
ChildNodes []FakeDFStat
}
type FlowNameType ¶
type FlowNameType string
var AskFlumeFlow FlowNameType = "AskFlumeFlow"
var DataFlowStatConfigurationsFlow FlowNameType = "DataFlowStatistics"
func (FlowNameType) ServiceName ¶
func (fnt FlowNameType) ServiceName() string
func (FlowNameType) TableName ¶
func (fnt FlowNameType) TableName() string
type PermissionUpdate ¶
type TTDINode ¶
type TTDINode struct {
*mashupsdk.MashupDetailedElement
//Data []byte
ChildNodes []*TTDINode
}
func InitArgosyFleet ¶
New API -> Argosy, return dataFlowGroups populated
func (*TTDINode) EfficientLog ¶
Doesn't deserialize statistic data for updatedataflowstatistic
func (*TTDINode) FinishStatistic ¶
func (*TTDINode) FinishStatisticLog ¶
func (dfs *TTDINode) FinishStatisticLog()
Set logFunc and logStat = false to use this otherwise it logs as states change with logStat = true
func (*TTDINode) MapStatistic ¶
func (*TTDINode) RetrieveStatistic ¶
func (*TTDINode) StatisticToMap ¶
func (dfs *TTDINode) StatisticToMap(mod *kv.Modifier, dfst *TTDINode, enrichLastTested bool) map[string]interface{}
Used for flow
func (*TTDINode) UpdateDataFlowStatistic ¶
type TrcFlowContext ¶
type TrcFlowContext struct {
RemoteDataSource map[string]interface{}
GoMod *helperkv.Modifier
Vault *sys.Vault
// Recommended not to store contexts, but because we
// are working with flows, this is a different pattern.
// This just means some analytic tools won't be able to
// perform analysis which are based on the Context.
ContextNotifyChan chan bool
Context context.Context
CancelContext context.CancelFunc
// I flow is complex enough, it can define
// it's own method for loading TrcDb
// from vault.
CustomSeedTrcDb func(*TrcFlowMachineContext, *TrcFlowContext) error
FlowSource string // The name of the flow source identified by project.
FlowSourceAlias string // May be a database name
Flow FlowNameType // May be a table name.
ChangeIdKey string
FlowPath string
FlowData interface{}
ChangeFlowName string // Change flow table name.
FlowState flowcorehelper.CurrentFlowState
FlowLock *sync.Mutex //This is for sync concurrent changes to FlowState
Restart bool
Init bool
ReadOnly bool
Inserter sqle.RowInserter
DataFlowStatistic FakeDFStat
Log *log.Logger
}
type TrcFlowMachineContext ¶
type TrcFlowMachineContext struct {
InitConfigWG *sync.WaitGroup
FlowControllerLock sync.Mutex
Region string
Env string
FlowControllerInit bool
FlowControllerUpdateLock sync.Mutex
FlowControllerUpdateAlert chan string
Config *eUtils.DriverConfig
Vault *sys.Vault
TierceronEngine *trcengine.TierceronEngine
ExtensionAuthData map[string]interface{}
ExtensionAuthDataReloader map[string]interface{}
GetAdditionalFlowsByState func(teststate string) []FlowNameType
ChannelMap map[FlowNameType]chan bool
FlowMap map[FlowNameType]*TrcFlowContext // Map of all running flows for engine
PermissionChan chan PermissionUpdate // This channel is used to alert for dynamic permissions when tables are loaded
}
func (*TrcFlowMachineContext) AddTableSchema ¶
func (tfmContext *TrcFlowMachineContext) AddTableSchema(tableSchema sqle.PrimaryKeySchema, tfContext *TrcFlowContext)
func (*TrcFlowMachineContext) CallAPI ¶
func (tfmContext *TrcFlowMachineContext) CallAPI(apiAuthHeaders map[string]string, host string, apiEndpoint string, bodyData io.Reader, getOrPost bool) (map[string]interface{}, int, error)
Utilizing provided api auth headers, endpoint, and body data this CB makes a call on behalf of the caller and returns a map representation of json data provided by the endpoint.
func (*TrcFlowMachineContext) CallDBQuery ¶
func (tfmContext *TrcFlowMachineContext) CallDBQuery(tfContext *TrcFlowContext, queryMap map[string]interface{}, bindings map[string]sqle.Expression, changed bool, operation string, flowNotifications []FlowNameType, flowtestState string) [][]interface{}
Make a call on Call back to insert or update using the provided query. If this is expected to result in a change to an existing table, thern trigger something to the changed channel.
func (*TrcFlowMachineContext) CreateCompositeTableTriggers ¶
func (tfmContext *TrcFlowMachineContext) CreateCompositeTableTriggers(trcfc *TrcFlowContext, iden1 string, iden2 string, insertT func(string, string, string, string) string, updateT func(string, string, string, string) string, deleteT func(string, string, string, string) string)
Set up call back to enable a trigger to track whenever a row in a table changes...
func (*TrcFlowMachineContext) CreateDataFlowTableTriggers ¶
func (tfmContext *TrcFlowMachineContext) CreateDataFlowTableTriggers(trcfc *TrcFlowContext, iden1 string, iden2 string, iden3 string, insertT func(string, string, string, string, string) string, updateT func(string, string, string, string, string) string, deleteT func(string, string, string, string, string) string)
Set up call back to enable a trigger to track whenever a row in a table changes...
func (*TrcFlowMachineContext) CreateTableTriggers ¶
func (tfmContext *TrcFlowMachineContext) CreateTableTriggers(trcfc *TrcFlowContext, identityColumnName string)
Set up call back to enable a trigger to track whenever a row in a table changes...
func (*TrcFlowMachineContext) GetDbConn ¶
func (tfmContext *TrcFlowMachineContext) GetDbConn(tfContext *TrcFlowContext, dbUrl string, username string, sourceDBConfig map[string]interface{}) (*sql.DB, error)
Open a database connection to the provided source using provided source configurations.
func (*TrcFlowMachineContext) GetFlowConfiguration ¶
func (tfmContext *TrcFlowMachineContext) GetFlowConfiguration(trcfc *TrcFlowContext, flowTemplatePath string) (map[string]interface{}, bool)
func (*TrcFlowMachineContext) GetTableModifierLock ¶
func (tfmContext *TrcFlowMachineContext) GetTableModifierLock() *sync.Mutex
func (*TrcFlowMachineContext) Init ¶
func (tfmContext *TrcFlowMachineContext) Init( sdbConnMap map[string]map[string]interface{}, tableNames []string, additionalFlowNames []FlowNameType, testFlowNames []FlowNameType, ) error
func (*TrcFlowMachineContext) Log ¶
func (tfmContext *TrcFlowMachineContext) Log(msg string, err error)
func (*TrcFlowMachineContext) PathToTableRowHelper ¶
func (tfmContext *TrcFlowMachineContext) PathToTableRowHelper(tfContext *TrcFlowContext) ([]interface{}, error)
func (*TrcFlowMachineContext) ProcessFlow ¶
func (tfmContext *TrcFlowMachineContext) ProcessFlow( config *eUtils.DriverConfig, tfContext *TrcFlowContext, processFlowController func(tfmContext *TrcFlowMachineContext, tfContext *TrcFlowContext) error, vaultDatabaseConfig map[string]interface{}, sourceDatabaseConnectionMap map[string]interface{}, flow FlowNameType, flowType FlowType) error
func (*TrcFlowMachineContext) SelectFlowChannel ¶
func (tfmContext *TrcFlowMachineContext) SelectFlowChannel(tfContext *TrcFlowContext) <-chan bool
func (*TrcFlowMachineContext) SyncTableCycle ¶
func (tfmContext *TrcFlowMachineContext) SyncTableCycle(tfContext *TrcFlowContext, identityColumnName string, indexColumnNames interface{}, getIndexedPathExt func(engine interface{}, rowDataMap map[string]interface{}, indexColumnNames interface{}, databaseName string, tableName string, dbCallBack func(interface{}, map[string]interface{}) (string, []string, [][]interface{}, error)) (string, error), flowPushRemote func(*TrcFlowContext, map[string]interface{}, map[string]interface{}) error, sqlState bool)