utils

package
v0.0.0-...-f8747f6 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 9, 2026 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildSyncReqForLegacyOrNew

func BuildSyncReqForLegacyOrNew(args interface{}) (*types.ExecutionRequest, error)

BuildSyncReqForLegacyOrNew builds an execution request based on the workflow signature.

NEW workflow request - type: map[string]interface{} - returns the request as is

OLD workflow request - Deprecated - type: int - maps the jobID to the new request format and returns it

func CleanupConfigFiles

func CleanupConfigFiles(workDir string, configs []types.JobConfig)

func CreateDirectory

func CreateDirectory(dirPath string) error

CreateDirectory creates a directory with the specified permissions if it doesn't exist

func DeleteDirectory

func DeleteDirectory(dirPath string) error

func ExtractJSONAndMarshal

func ExtractJSONAndMarshal(output string) ([]byte, error)

ExtractJSONAndMarshal extracts and returns the last valid JSON block from output
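
As an illustration of "last valid JSON block", the following is a minimal sketch and not the package's implementation. The helper name extractLastJSON is hypothetical, and it assumes each JSON block is an object on its own line of the output; multi-line blocks are not handled.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// extractLastJSON is a hypothetical stand-in for ExtractJSONAndMarshal.
// Assumption: every JSON block is a single-line JSON object.
func extractLastJSON(output string) ([]byte, error) {
	lines := strings.Split(output, "\n")
	// Walk backwards so the last valid JSON line wins.
	for i := len(lines) - 1; i >= 0; i-- {
		line := strings.TrimSpace(lines[i])
		if !strings.HasPrefix(line, "{") {
			continue
		}
		var parsed map[string]any
		if err := json.Unmarshal([]byte(line), &parsed); err != nil {
			continue // not valid JSON, keep looking further up
		}
		// Re-marshal so the caller gets normalized JSON bytes.
		return json.Marshal(parsed)
	}
	return nil, errors.New("no valid JSON block found in output")
}

func main() {
	output := "starting\n{\"status\":\"running\"}\nsome log line\n{\"status\":\"done\"}\ntrailing {broken"
	data, err := extractLastJSON(output)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(data)) // {"status":"done"}
}
```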

func GetConfigDir

func GetConfigDir() string

func GetDockerImageName

func GetDockerImageName(sourceType, version string) string

func GetExecutorEnvironment

func GetExecutorEnvironment() string

func GetHostOutputDir

func GetHostOutputDir(outputDir string) string

GetHostOutputDir returns the host output directory

func GetStateFileFromWorkdir

func GetStateFileFromWorkdir(workflowID string, command types.Command) (string, error)

func GetTelemetryUserID

func GetTelemetryUserID() string

func GetWorkerEnvVars

func GetWorkerEnvVars() map[string]string

GetWorkerEnvVars returns the environment variables from the worker container.

func GetWorkflowDirAndSubDir

func GetWorkflowDirAndSubDir(workflowID string, command types.Command) (string, string)

func GetWorkflowDirectory

func GetWorkflowDirectory(operation types.Command, originalWorkflowID string) string

GetWorkflowDirectory determines the directory name based on operation and workflow ID

func InitLogCleaner

func InitLogCleaner(logDir string, retentionPeriod int)

InitLogCleaner starts a log cleaner that removes old logs from the specified directory based on the retention period

func IsStateEmpty

func IsStateEmpty(state string) bool

IsStateEmpty returns true if the state is empty or an empty JSON object
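
The check described above could look roughly like the sketch below. The helper isStateEmpty is hypothetical, and it assumes that surrounding whitespace is ignored and that only the literal "{}" counts as an empty JSON object.

```go
package main

import (
	"fmt"
	"strings"
)

// isStateEmpty is a hypothetical stand-in for IsStateEmpty.
// Assumption: "{}" with optional surrounding whitespace is the only
// JSON form treated as empty; "{ }" or "null" are not handled here.
func isStateEmpty(state string) bool {
	trimmed := strings.TrimSpace(state)
	return trimmed == "" || trimmed == "{}"
}

func main() {
	fmt.Println(isStateEmpty(""), isStateEmpty("{}"), isStateEmpty(`{"cursor":"42"}`))
	// true true false
}
```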

func PrepareWorkflowLogger

func PrepareWorkflowLogger(ctx context.Context, workflowID string, command types.Command) (context.Context, *logger.WorkflowLogFile, error)

PrepareWorkflowLogger ensures the workflow directory exists and initializes the workflow logger. It returns the new context with the workflow logger attached, and the log file handle that must be closed when the workflow finishes.

func ReadFile

func ReadFile(filePath string) (string, error)

ReadFile reads the file at filePath and returns its contents as a string

func RemoveFlagFromArgs

func RemoveFlagFromArgs(arguments []string, flagName string) []string

RemoveFlagFromArgs returns a new slice with the given flag and its associated value removed.
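
A minimal sketch of this behavior follows. The helper removeFlag is hypothetical, and it assumes the flag and its value are separate tokens (for example "--state" followed by a path); the "--flag=value" form is not handled.

```go
package main

import "fmt"

// removeFlag is a hypothetical stand-in for RemoveFlagFromArgs.
// Assumption: the value is the token immediately after the flag.
func removeFlag(arguments []string, flagName string) []string {
	result := make([]string, 0, len(arguments))
	for i := 0; i < len(arguments); i++ {
		if arguments[i] == flagName {
			i++ // also skip the value that follows the flag
			continue
		}
		result = append(result, arguments[i])
	}
	return result
}

func main() {
	args := []string{"sync", "--state", "/tmp/state.json", "--config", "c.json"}
	fmt.Println(removeFlag(args, "--state")) // [sync --config c.json]
	fmt.Println(len(args))                   // 5: the input slice is left untouched
}
```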

func RetryWithBackoff

func RetryWithBackoff(fn func() error, maxRetries int, initialDelay time.Duration) error

RetryWithBackoff retries a function with exponential backoff
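
For readers unfamiliar with the pattern, here is a sketch of exponential backoff with the same signature. It is not the package's code: it assumes maxRetries is the total number of attempts and that the delay doubles after each failure, neither of which the documentation states. The names retryWithBackoff and errTransient are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient is a made-up error used only by the demo.
var errTransient = errors.New("transient failure")

// retryWithBackoff is a hypothetical stand-in for RetryWithBackoff.
// Assumptions: maxRetries is the total attempt count, delay doubles each time.
func retryWithBackoff(fn func() error, maxRetries int, initialDelay time.Duration) error {
	if maxRetries < 1 {
		maxRetries = 1 // always try at least once
	}
	delay := initialDelay
	var err error
	for attempt := 0; attempt < maxRetries; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		// Sleep only if another attempt will follow.
		if attempt < maxRetries-1 {
			time.Sleep(delay)
			delay *= 2
		}
	}
	return fmt.Errorf("failed after %d attempts: %w", maxRetries, err)
}

func main() {
	calls := 0
	err := retryWithBackoff(func() error {
		calls++
		if calls < 3 {
			return errTransient // fail the first two calls
		}
		return nil
	}, 5, 10*time.Millisecond)
	fmt.Println(calls, err) // 3 <nil>
}
```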

func RevertUpdatesInSchedule

func RevertUpdatesInSchedule(req *types.ExecutionRequest)

RevertUpdatesInSchedule reverts the updates made to the schedule for a clear-destination request

func SetupWorkDirectory

func SetupWorkDirectory(workDirPath string) error

func Ternary

func Ternary(condition bool, trueValue, falseValue interface{}) interface{}

Ternary returns trueValue if condition is true, otherwise returns falseValue
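
Because the signature works on interface{}, callers need a type assertion on the result, and both branch values are evaluated before the call. The sketch below reimplements the documented behavior under the hypothetical name ternary to show this.

```go
package main

import "fmt"

// ternary mirrors the documented behavior of Ternary (hypothetical copy).
func ternary(condition bool, trueValue, falseValue interface{}) interface{} {
	if condition {
		return trueValue
	}
	return falseValue
}

func main() {
	retries := 0
	// The result is an interface{}, so assert the concrete type.
	effective := ternary(retries > 0, retries, 3).(int)
	fmt.Println(effective) // 3
}
```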

func Unmarshal

func Unmarshal(from, object any) error

Unmarshal serializes from and then deserializes the result into object
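
A common way to get this behavior is a JSON round trip, sketched below. This assumes JSON is the intermediate encoding, which the documentation does not state; unmarshal and jobInfo are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jobInfo is a made-up target type for the demo.
type jobInfo struct {
	Name string `json:"name"`
	ID   int    `json:"id"`
}

// unmarshal is a hypothetical stand-in for Unmarshal.
// Assumption: the conversion goes through encoding/json.
func unmarshal(from, object any) error {
	data, err := json.Marshal(from)
	if err != nil {
		return fmt.Errorf("marshal source: %w", err)
	}
	if err := json.Unmarshal(data, object); err != nil {
		return fmt.Errorf("unmarshal into target: %w", err)
	}
	return nil
}

func main() {
	source := map[string]any{"name": "orders", "id": 7}
	var info jobInfo
	if err := unmarshal(source, &info); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%+v\n", info) // {Name:orders ID:7}
}
```

The target must be passed as a pointer, as with json.Unmarshal.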

func UpdateConfigForClearDestination

func UpdateConfigForClearDestination(jobDetails types.JobData, req *types.ExecutionRequest) error

func UpdateConfigWithJobDetails

func UpdateConfigWithJobDetails(jobData types.JobData, req *types.ExecutionRequest)

func UpdateSyncRequestForLegacy

func UpdateSyncRequestForLegacy(job types.JobData, req *types.ExecutionRequest)

UpdateSyncRequestForLegacy updates the ExecutionRequest for the deprecated sync workflow

func WorkflowAlreadyLaunched

func WorkflowAlreadyLaunched(workdir string) bool

WorkflowAlreadyLaunched checks for olake.log file in the workdir/logs

workdir/logs/sync_<timestamp>/olake.log
present -> workflow has already started
not present -> workflow is running for the first time
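
The lookup for workdir/logs/sync_<timestamp>/olake.log can be sketched with a glob, as below. This is an illustration only: workflowAlreadyLaunched and runDemo are hypothetical, and the sketch assumes any directory matching sync_* qualifies, since the timestamp format is not documented.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// workflowAlreadyLaunched is a hypothetical stand-in for WorkflowAlreadyLaunched.
// Assumption: any workdir/logs/sync_*/olake.log match means "already launched".
func workflowAlreadyLaunched(workdir string) bool {
	pattern := filepath.Join(workdir, "logs", "sync_*", "olake.log")
	matches, err := filepath.Glob(pattern)
	// Glob only fails on a malformed pattern; treat that as "not launched".
	return err == nil && len(matches) > 0
}

// runDemo checks a fresh temporary workdir before and after creating the log file.
func runDemo() (before, after bool, err error) {
	workdir, err := os.MkdirTemp("", "workflow-")
	if err != nil {
		return false, false, err
	}
	defer os.RemoveAll(workdir)

	before = workflowAlreadyLaunched(workdir)

	// The timestamp format below is made up for the demo.
	logDir := filepath.Join(workdir, "logs", "sync_2026-03-09_10-00-00")
	if err := os.MkdirAll(logDir, 0o755); err != nil {
		return before, false, err
	}
	logFile := filepath.Join(logDir, "olake.log")
	if err := os.WriteFile(logFile, []byte("started\n"), 0o644); err != nil {
		return before, false, err
	}

	after = workflowAlreadyLaunched(workdir)
	return before, after, nil
}

func main() {
	before, after, err := runDemo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("before=%v after=%v\n", before, after) // before=false after=true
}
```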

func WorkflowHash

func WorkflowHash(workflowID string) string

WorkflowHash returns a deterministic hash string for a given workflowID
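
"Deterministic" here means the same workflowID always yields the same string. The sketch below shows that property using SHA-256 with hex encoding; the actual algorithm and output length used by WorkflowHash are not documented, so both are assumptions, and workflowHash is a hypothetical name.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// workflowHash is a hypothetical stand-in for WorkflowHash.
// Assumption: SHA-256 with hex encoding; the real algorithm may differ.
func workflowHash(workflowID string) string {
	sum := sha256.Sum256([]byte(workflowID))
	return hex.EncodeToString(sum[:])
}

func main() {
	a := workflowHash("sync-job-42")
	b := workflowHash("sync-job-42")
	fmt.Println(a == b) // true: same input, same hash
	fmt.Println(len(a)) // 64 hex characters for SHA-256
}
```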

func WriteConfigFiles

func WriteConfigFiles(workDir string, configs []types.JobConfig) error

func WriteFile

func WriteFile(filePath string, data []byte) error

WriteFile writes data to a file, creating the directory if necessary
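
Creating the parent directory before writing is typically done with os.MkdirAll, as in the sketch below. The helpers writeFile and runDemo are hypothetical, and the permission modes 0o755 and 0o644 are assumptions, not values taken from the package.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeFile is a hypothetical stand-in for WriteFile.
// Assumption: directories are created with 0o755 and files with 0o644.
func writeFile(filePath string, data []byte) error {
	if err := os.MkdirAll(filepath.Dir(filePath), 0o755); err != nil {
		return fmt.Errorf("create parent directory: %w", err)
	}
	if err := os.WriteFile(filePath, data, 0o644); err != nil {
		return fmt.Errorf("write file: %w", err)
	}
	return nil
}

// runDemo writes into a nested path that does not exist yet and reads it back.
func runDemo() (string, error) {
	root, err := os.MkdirTemp("", "writefile-")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(root)

	target := filepath.Join(root, "configs", "job", "source.json")
	if err := writeFile(target, []byte(`{"host":"localhost"}`)); err != nil {
		return "", err
	}
	content, err := os.ReadFile(target)
	if err != nil {
		return "", err
	}
	return string(content), nil
}

func main() {
	content, err := runDemo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(content) // {"host":"localhost"}
}
```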

Types

This section is empty.
