Documentation
¶
Index ¶
- func CalculatePeriod(year, month, day int) (monthPeriod, weekPeriod, dayPeriod int)
- func DoNotifyApiGateway(fileKey, apiEndpoint, apiEndpointJson, notificationTemplate string, ...) error
- func EvalThrotting(submittedPipelinesCount, submittedTier1Count int64) (bool, error)
- func GetLastComponent(path string) (result string)
- func GetOutputTables(dbpool *pgxpool.Pool, pipelineExecutionKey int) ([]string, error)
- func GetSchemaProviderJsonFromPipelineKey(dbpool *pgxpool.Pool, peKey int) (string, string, error)
- func GetSchemaProviderJsonFromPipelineSession(dbpool *pgxpool.Pool, sessionId string) (string, error)
- func InsertSourcePeriod(dbpool *pgxpool.Pool, year, month, day int) (int, error)
- func ReserveSessionId(dbpool *pgxpool.Pool) (string, error)
- func RunUpdateDb(workspaceName string, serverArgs *[]string) (string, error)
- func SplitFileKeyIntoComponents(keyMap map[string]any, fileKey *string) map[string]any
- func UnitTestWorkspaceAction(ctx *DataTableContext, dataTableAction *DataTableAction, token string)
- type Column
- type DataTableAction
- type DataTableColumnDef
- type DataTableContext
- func (ctx *DataTableContext) AddWorkspaceFile(dataTableAction *DataTableAction, token string) (rb *[]byte, httpStatus int, err error)
- func (ctx *DataTableContext) DeleteAllWorkspaceChanges(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *DataTableContext) DeleteWorkspaceChanges(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *DataTableContext) DeleteWorkspaceFile(dataTableAction *DataTableAction, token string) (rb *[]byte, httpStatus int, err error)
- func (ctx *DataTableContext) DoPreviewFileAction(dataTableAction *DataTableAction, token string) (*map[string]interface{}, int, error)
- func (ctx *DataTableContext) DoReadAction(dataTableAction *DataTableAction, token string) (*map[string]interface{}, int, error)
- func (ctx *DataTableContext) DoWorkspaceReadAction(dataTableAction *DataTableAction, token string) (*map[string]interface{}, int, error)
- func (ctx *DataTableContext) DropTable(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *DataTableContext) ExecDataManagementStatement(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *DataTableContext) ExecRawQuery(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *DataTableContext) ExecRawQueryMap(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *DataTableContext) GetTaskThrottlingInfo(taskStatus string) (int64, int64, error)
- func (ctx *DataTableContext) GetWorkspaceFileContent(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *DataTableContext) InsertPipelineExecutionStatus(dataTableAction *DataTableAction, irow int, results *map[string]any, ...) (peKey int, httpStatus int, err error)
- func (ctx *DataTableContext) InsertRawRows(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *DataTableContext) InsertRows(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *DataTableContext) PutSchemaEventToS3(action *RegisterFileKeyAction, token string) (*map[string]any, int, error)
- func (ctx *DataTableContext) RegisterFileKeys(registerFileKeyAction *RegisterFileKeyAction, token string) (*map[string]any, int, error)
- func (ctx *DataTableContext) RegisterSchemaEvent(dbpool *pgxpool.Pool, schemaInfo map[string]any, token string) error
- func (ctx *DataTableContext) SaveWorkspaceClientConfig(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *DataTableContext) SaveWorkspaceFileContent(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *DataTableContext) StartPendingTasks() (err error)
- func (ctx *DataTableContext) StartPipelinesForInputRegistryV2(inputRegistryKey, sourcePeriodKey int, ...) error
- func (ctx *DataTableContext) SyncFileKeys(registerFileKeyAction *RegisterFileKeyAction, token string) (*map[string]any, int, error)
- func (ctx *DataTableContext) VerifyUserPermission(sqlStmt *SqlInsertDefinition, token string) (*user.User, error)
- func (ctx *DataTableContext) WorkspaceInsertRows(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
- func (ctx *DataTableContext) WorkspaceQueryStructure(dataTableAction *DataTableAction, token string) (results *[]byte, httpStatus int, err error)
- type FromClause
- type PendingTask
- type RegisterFileKeyAction
- type SourcePeriod
- type SqlInsertDefinition
- type StatusUpdate
- type ThrottlingSpec
- type WhereClause
- type WithClause
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CalculatePeriod ¶
Calculate the month, week, and day period since unix epoch (1/1/1970)
func DoNotifyApiGateway ¶
func EvalThrotting ¶
func GetLastComponent ¶
func GetOutputTables ¶
func GetSchemaProviderJsonFromPipelineKey ¶
Utility function to get the SchemaProvider json and session_id using the pipeline execution key
func GetSchemaProviderJsonFromPipelineSession ¶
func GetSchemaProviderJsonFromPipelineSession(dbpool *pgxpool.Pool, sessionId string) (string, error)
Utility function to get the SchemaProvider json using the pipeline execution session id
func InsertSourcePeriod ¶
Insert into source_period and return the source_period.key. If the row already exists in the table, return the key of that row without inserting a new one.
func RunUpdateDb ¶
Run update_db - function used by apiserver and server
func UnitTestWorkspaceAction ¶
func UnitTestWorkspaceAction(ctx *DataTableContext, dataTableAction *DataTableAction, token string)
Execute pipeline in unit test mode
Types ¶
type DataTableAction ¶
type DataTableAction struct {
Action string `json:"action"`
WorkspaceName string `json:"workspaceName"`
WorkspaceBranch string `json:"workspaceBranch"`
FeatureBranch string `json:"featureBranch"`
RawQuery string `json:"query"`
RawQueryMap map[string]string `json:"query_map"`
Columns []Column `json:"columns"`
FromClauses []FromClause `json:"fromClauses"`
WhereClauses []WhereClause `json:"whereClauses"`
WithClauses []WithClause `json:"withClauses"`
DistinctOnClauses []string `json:"distinctOnClauses"`
SortColumn string `json:"sortColumn"`
SortColumnTable string `json:"sortColumnTable"`
SortAscending bool `json:"sortAscending"`
Offset int `json:"offset"`
Limit int `json:"limit"`
// used for raw_query & raw_query_tool action only
RequestColumnDef bool `json:"requestColumnDef"`
// other non-query properties
SkipThrottling bool `json:"skipThrottling"`
Data []map[string]interface{} `json:"data"`
}
SQL access builder. SkipThrottling indicates not to put the pipeline in pending.
type DataTableColumnDef ¶
type DataTableColumnDef struct {
Index int `json:"index"`
Name string `json:"name"`
Label string `json:"label"`
Tooltips string `json:"tooltips"`
IsNumeric bool `json:"isnumeric"`
}
DataTableColumnDef used when returning the column definition obtained from db catalog
func (*DataTableColumnDef) String ¶
func (dc *DataTableColumnDef) String() string
type DataTableContext ¶
type DataTableContext struct {
Dbpool *pgxpool.Pool
DevMode bool
UsingSshTunnel bool
UnitTestDir *string
AdminEmail *string
CpipesEnv map[string]any
}
Environment and settings needed. Note: CpipesEnv is used ONLY in dev mode to pass environment variables to local_test_driver.
func NewDataTableContext ¶
func (*DataTableContext) AddWorkspaceFile ¶
func (ctx *DataTableContext) AddWorkspaceFile(dataTableAction *DataTableAction, token string) (rb *[]byte, httpStatus int, err error)
AddWorkspaceFile
func (*DataTableContext) DeleteAllWorkspaceChanges ¶
func (ctx *DataTableContext) DeleteAllWorkspaceChanges(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
DeleteAllWorkspaceChanges -------------------------------------------------------------------------- Function to delete workspace file changes based on rows in workspace_changes. Deletes the workspace_changes row and the associated large object.
func (*DataTableContext) DeleteWorkspaceChanges ¶
func (ctx *DataTableContext) DeleteWorkspaceChanges(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
DeleteWorkspaceChanges -------------------------------------------------------------------------- Function to delete workspace file changes based on rows in workspace_changes. Deletes the workspace_changes row and the associated large object. Restores files from stash, except for .db and .tgz files.
func (*DataTableContext) DeleteWorkspaceFile ¶
func (ctx *DataTableContext) DeleteWorkspaceFile(dataTableAction *DataTableAction, token string) (rb *[]byte, httpStatus int, err error)
DeleteWorkspaceFile
func (*DataTableContext) DoPreviewFileAction ¶
func (ctx *DataTableContext) DoPreviewFileAction(dataTableAction *DataTableAction, token string) (*map[string]interface{}, int, error)
DoPreviewFileAction ------------------------------------------------------
func (*DataTableContext) DoReadAction ¶
func (ctx *DataTableContext) DoReadAction(dataTableAction *DataTableAction, token string) (*map[string]interface{}, int, error)
DoReadAction ------------------------------------------------------
func (*DataTableContext) DoWorkspaceReadAction ¶
func (ctx *DataTableContext) DoWorkspaceReadAction(dataTableAction *DataTableAction, token string) (*map[string]interface{}, int, error)
DoWorkspaceReadAction ------------------------------------------------------
func (*DataTableContext) DropTable ¶
func (ctx *DataTableContext) DropTable(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
DropTable ------------------------------------------------------ These are queries to load reference data for widget, e.g. dropdown list of items
func (*DataTableContext) ExecDataManagementStatement ¶
func (ctx *DataTableContext) ExecDataManagementStatement(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
func (*DataTableContext) ExecRawQuery ¶
func (ctx *DataTableContext) ExecRawQuery(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
ExecRawQuery ------------------------------------------------------ These are queries to load reference data for widget, e.g. dropdown list of items
func (*DataTableContext) ExecRawQueryMap ¶
func (ctx *DataTableContext) ExecRawQueryMap(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
ExecRawQueryMap ------------------------------------------------------ These are queries to load reference data for widget, e.g. dropdown list of items
func (*DataTableContext) GetTaskThrottlingInfo ¶
func (ctx *DataTableContext) GetTaskThrottlingInfo(taskStatus string) (int64, int64, error)
func (*DataTableContext) GetWorkspaceFileContent ¶
func (ctx *DataTableContext) GetWorkspaceFileContent(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
GetWorkspaceFileContent -------------------------------------------------------------------------- Function to get the workspace file content based on the relative file name. Reads the file from the workspace on the file system since it's already in sync with the database.
func (*DataTableContext) InsertPipelineExecutionStatus ¶
func (ctx *DataTableContext) InsertPipelineExecutionStatus(dataTableAction *DataTableAction, irow int, results *map[string]any, token string) (peKey int, httpStatus int, err error)
Insert into pipeline_execution_status and in loader_execution_status
func (*DataTableContext) InsertRawRows ¶
func (ctx *DataTableContext) InsertRawRows(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
InsertRawRows ------------------------------------------------------ Insert row function using a raw text buffer containing csv/tsv rows. Delegates to InsertRows.
func (*DataTableContext) InsertRows ¶
func (ctx *DataTableContext) InsertRows(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
InsertRows ------------------------------------------------------ Main insert row function with pre-processing hooks for validating/authorizing the request and post-processing hooks for starting pipelines. Inserts rows using pre-defined sql statements, keyed by the table name provided in dataTableAction.
func (*DataTableContext) PutSchemaEventToS3 ¶
func (ctx *DataTableContext) PutSchemaEventToS3(action *RegisterFileKeyAction, token string) (*map[string]any, int, error)
Submit Schema Event to S3 (which will call RegisterFileKeys as a side effect).
func (*DataTableContext) RegisterFileKeys ¶
func (ctx *DataTableContext) RegisterFileKeys(registerFileKeyAction *RegisterFileKeyAction, token string) (*map[string]any, int, error)
Register file_key with the file_key_staging table and handle schema events as well.
func (*DataTableContext) RegisterSchemaEvent ¶
func (ctx *DataTableContext) RegisterSchemaEvent(dbpool *pgxpool.Pool, schemaInfo map[string]any, token string) error
API version to register schema event. This is used by the Jets_Loader process to avoid writing the event to s3 first.
func (*DataTableContext) SaveWorkspaceClientConfig ¶
func (ctx *DataTableContext) SaveWorkspaceClientConfig(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
SaveWorkspaceClientConfig -------------------------------------------------------------------------- Function to save the workspace file content in local workspace file system and in database
func (*DataTableContext) SaveWorkspaceFileContent ¶
func (ctx *DataTableContext) SaveWorkspaceFileContent(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
SaveWorkspaceFileContent -------------------------------------------------------------------------- Function to save the workspace file content in local workspace file system and in database
func (*DataTableContext) StartPendingTasks ¶
func (ctx *DataTableContext) StartPendingTasks() (err error)
func (*DataTableContext) StartPipelinesForInputRegistryV2 ¶
func (ctx *DataTableContext) StartPipelinesForInputRegistryV2(inputRegistryKey, sourcePeriodKey int, inputSessionId, client, objectType, fileKey, token string) error
Start process based on matching criteria:
- find pipelines that are ready to start with the input_registry key.
- Pipeline must have automated flag on
Note: the argument inputSessionId is the inputRegistryKey sessionId
func (*DataTableContext) SyncFileKeys ¶
func (ctx *DataTableContext) SyncFileKeys(registerFileKeyAction *RegisterFileKeyAction, token string) (*map[string]any, int, error)
SyncFileKeys ------------------------------------------------------ 12/17/2023: Replacing all keys in file_key_staging to be able to reset keys from source_config that are Part File sources
func (*DataTableContext) VerifyUserPermission ¶
func (ctx *DataTableContext) VerifyUserPermission(sqlStmt *SqlInsertDefinition, token string) (*user.User, error)
Check that the user has the required permission to execute the action
func (*DataTableContext) WorkspaceInsertRows ¶
func (ctx *DataTableContext) WorkspaceInsertRows(dataTableAction *DataTableAction, token string) (results *map[string]interface{}, httpStatus int, err error)
WorkspaceInsertRows ------------------------------------------------------ Main insert row function with pre-processing hooks for validating/authorizing the request and post-processing hooks to perform work async. Inserts rows using pre-defined sql statements, keyed by the table name provided in dataTableAction.
func (*DataTableContext) WorkspaceQueryStructure ¶
func (ctx *DataTableContext) WorkspaceQueryStructure(dataTableAction *DataTableAction, token string) (results *[]byte, httpStatus int, err error)
WorkspaceQueryStructure ------------------------------------------------------ Function to query the workspace structure; it returns a hierarchical structure modeled on the ui MenuEntry class. It uses a virtual table name to indicate the level of granularity of the structure, given in dataTableAction.FromClauses[0].Table:
case "workspace_file_structure": structure based on files of the workspace case "workspace_object_structure": structure based on object (rule, lookup, class, etc) of the workspace
Initial implementation uses workspace_file_structure. NOTE: routePath must correspond to the parametrized url (needed by ui MenuEntry). NOTE: routeParam contains the routePath parameters (needed by ui MenuEntry). Input dataTableAction.Data:
[
{
"key": "123",
"workspace_name": "jets_ws",
"user_email": "email here"
}
]
Output results:
{
"key": "123",
"workspace_name": "jets_ws",
"result_type": "workspace_file_structure",
"result_data": [
{
"key": "a1",
"type": "dir",
"label": "Jet Rules",
"route_path": "/workspace/:workspace_name/jetRules",
"route_params": {
"workspace_name": "jets_ws",
},
"children": [
{
"key": "a1.1",
"type": "dir",
"label": "folder name",
"children": [
{
"key": "a1.1.1",
"type": "file",
"label": "mapping_rules.jr",
"route_path": "/workspace/:workspace_name/wsFile/:file_name",
"route_params": {
"workspace_name": "jets_ws",
"file_name": "jet_rules%03mapping_rules.jr",
}
}
]
}
]
}
]
}
type FromClause ¶
type PendingTask ¶
type RegisterFileKeyAction ¶
type SourcePeriod ¶
type SourcePeriod struct {
Key int `json:"key"`
Year int `json:"year"`
Month int `json:"month"`
Day int `json:"day"`
MonthPeriod int `json:"month_period"`
WeekPeriod int `json:"week_period"`
DayPeriod int `json:"day_period"`
}
* TODO refactor to use SourcePeriod entity
func LoadSourcePeriod ¶
func LoadSourcePeriod(dbpool *pgxpool.Pool, key int) (sp SourcePeriod, err error)
Load source period info from database by key
type SqlInsertDefinition ¶
type SqlInsertDefinition struct {
Stmt string
ColumnKeys []string
AdminOnly bool
Capability string
}
Simple definition of sql statement for insert
type StatusUpdate ¶
type StatusUpdate struct {
CpipesMode bool
CpipesEnv map[string]any
UsingSshTunnel bool
Dbpool *pgxpool.Pool
PeKey int
Status string
FileKey string
FailureDetails string
DoNotNotifyApiGateway bool
}
Status Update command line arguments. When used as a delegate from apiserver, Dbpool is non-nil and the connection properties (AwsDsnSecret, DbPoolSize, UsingSshTunnel, AwsRegion) are not needed.
func (*StatusUpdate) CoordinateWork ¶
func (ca *StatusUpdate) CoordinateWork() error
func (*StatusUpdate) RegisterDbTableInputSource ¶
func (ca *StatusUpdate) RegisterDbTableInputSource(schemaProviderJson string) error
Register db_table Input Source in input_registry table. This is used by pipelines to rely on db_table input sources, generally for executing reports.
func (*StatusUpdate) RegisterDomainTables ¶
func (ca *StatusUpdate) RegisterDomainTables() error
Register Domain Table with input_registry
func (*StatusUpdate) RegisterFileInputSource ¶
func (ca *StatusUpdate) RegisterFileInputSource() error
Register File Input Source in input_registry table. This is used by the Jets_Loader process to register the input files.
func (*StatusUpdate) ValidateArguments ¶
func (ca *StatusUpdate) ValidateArguments() []string
Package Main Functions --------------------------------------------------------------------------------------
type ThrottlingSpec ¶
type ThrottlingSpec struct {
MaxConcurrentPipelines int `json:"max_concurrent"`
MaxPipeline int `json:"max_for_size"`
Size int `json:"size"`
}
Size in GiB
func (ThrottlingSpec) String ¶
func (t ThrottlingSpec) String() string
type WhereClause ¶
type WhereClause struct {
Table string `json:"table"`
Column string `json:"column"`
Values []string `json:"values"`
NotInValues []string `json:"not_in_values"`
JoinWith string `json:"joinWith"`
Like string `json:"like"`
Ge string `json:"ge"`
Le string `json:"le"`
// Adding a simple or clause
OrWith *WhereClause `json:"orWith"`
}