Documentation
¶
Index ¶
- func BuildSyncReqForLegacyOrNew(args interface{}) (*types.ExecutionRequest, error)
- func CleanupConfigFiles(workDir string, configs []types.JobConfig)
- func CreateDirectory(dirPath string) error
- func DeleteDirectory(dirPath string) error
- func ExtractJSONAndMarshal(output string) ([]byte, error)
- func GetConfigDir() string
- func GetDockerImageName(sourceType, version string) string
- func GetExecutorEnvironment() string
- func GetHostOutputDir(outputDir string) string
- func GetStateFileFromWorkdir(workflowID string, command types.Command) (string, error)
- func GetTelemetryUserID() string
- func GetWorkerEnvVars() map[string]string
- func GetWorkflowDirAndSubDir(workflowID string, command types.Command) (string, string)
- func GetWorkflowDirectory(operation types.Command, originalWorkflowID string) string
- func InitLogCleaner(logDir string, retentionPeriod int)
- func IsStateEmpty(state string) bool
- func PrepareWorkflowLogger(ctx context.Context, workflowID string, command types.Command) (context.Context, *logger.WorkflowLogFile, error)
- func ReadFile(filePath string) (string, error)
- func RemoveFlagFromArgs(arguments []string, flagName string) []string
- func RetryWithBackoff(fn func() error, maxRetries int, initialDelay time.Duration) error
- func RevertUpdatesInSchedule(req *types.ExecutionRequest)
- func SetupWorkDirectory(workDirPath string) error
- func Ternary(condition bool, trueValue, falseValue interface{}) interface{}
- func Unmarshal(from, object any) error
- func UpdateConfigForClearDestination(jobDetails types.JobData, req *types.ExecutionRequest) error
- func UpdateConfigWithJobDetails(jobData types.JobData, req *types.ExecutionRequest)
- func UpdateSyncRequestForLegacy(job types.JobData, req *types.ExecutionRequest)
- func WorkflowAlreadyLaunched(workdir string) bool
- func WorkflowHash(workflowID string) string
- func WriteConfigFiles(workDir string, configs []types.JobConfig) error
- func WriteFile(filePath string, data []byte) error
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BuildSyncReqForLegacyOrNew ¶
func BuildSyncReqForLegacyOrNew(args interface{}) (*types.ExecutionRequest, error)
BuildSyncReqForLegacyOrNew builds execution request based on the workflow signature
NEW workflow request - type: map[string]interface{} - return the req as it is
OLD workflow request - Deprecated - type: int - maps the jobID to the new request format and returns it
func CleanupConfigFiles ¶
func CreateDirectory ¶
CreateDirectory creates a directory with the specified permissions if it doesn't exist
func DeleteDirectory ¶
func ExtractJSONAndMarshal ¶
ExtractJSONAndMarshal extracts and returns the last valid JSON block from output
func GetConfigDir ¶
func GetConfigDir() string
func GetDockerImageName ¶
func GetExecutorEnvironment ¶
func GetExecutorEnvironment() string
func GetHostOutputDir ¶
getHostOutputDir returns the host output directory
func GetStateFileFromWorkdir ¶
func GetTelemetryUserID ¶
func GetTelemetryUserID() string
func GetWorkerEnvVars ¶
GetWorkerEnvVars returns the environment variables from the worker container.
func GetWorkflowDirAndSubDir ¶
func GetWorkflowDirectory ¶
GetWorkflowDirectory determines the directory name based on operation and workflow ID
func InitLogCleaner ¶
starts a log cleaner that removes old logs from the specified directory based on the retention period
func IsStateEmpty ¶
IsStateEmpty returns true if the state is empty or an empty JSON object
func PrepareWorkflowLogger ¶
func PrepareWorkflowLogger(ctx context.Context, workflowID string, command types.Command) (context.Context, *logger.WorkflowLogFile, error)
PrepareWorkflowLogger ensures the workflow directory exists and initializes the workflow logger. It returns the new context with the workflow logger attached, and the log file handle that must be closed when the workflow finishes.
func RemoveFlagFromArgs ¶
RemoveFlagFromArgs returns a new slice with the given flag and its associated value removed.
func RetryWithBackoff ¶
RetryWithBackoff retries a function with exponential backoff
func RevertUpdatesInSchedule ¶
func RevertUpdatesInSchedule(req *types.ExecutionRequest)
RevertUpdatesInSchedule reverts the updates made to the schedule for clear-destination request
func SetupWorkDirectory ¶
func Ternary ¶
func Ternary(condition bool, trueValue, falseValue interface{}) interface{}
Ternary returns trueValue if condition is true, otherwise returns falseValue
func UpdateConfigForClearDestination ¶
func UpdateConfigForClearDestination(jobDetails types.JobData, req *types.ExecutionRequest) error
func UpdateConfigWithJobDetails ¶
func UpdateConfigWithJobDetails(jobData types.JobData, req *types.ExecutionRequest)
func UpdateSyncRequestForLegacy ¶
func UpdateSyncRequestForLegacy(job types.JobData, req *types.ExecutionRequest)
UpdateSyncRequest updates the ExecutionRequest for deprecated sync workflow
func WorkflowAlreadyLaunched ¶
WorkflowAlreadyLaunched checks for olake.log file in the workdir/logs
workdir/logs/sync_<timestamp>/olake.log - present -> workflow has started already not present -> workflow is running for the first time
func WorkflowHash ¶
WorkflowHash returns a deterministic hash string for a given workflowID
Types ¶
This section is empty.