googlecloudclustercomposer_impl

package
v0.52.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 3, 2026 License: Apache-2.0 Imports: 20 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

AirflowDagProcessorLogParseTask parses Airflow DAG processor manager logs.

AirflowSchedulerLogParseTask parses Airflow scheduler logs.

AirflowWorkerLogParseTask parses Airflow worker logs.

View Source
// AutocompleteComposerClusterNamesTask suggests the GKE cluster that backs the
// Composer environment entered by the user.
//
// The result is keyed by a digest of project ID, environment name and
// location; when the digest matches the previous run the previous result is
// returned without calling the cluster finder again. Lookup failures are
// reported through the Error field of the autocomplete result, never as a
// task error.
var AutocompleteComposerClusterNamesTask = inspectiontaskbase.NewCachedTask(googlecloudclustercomposer_contract.AutocompleteComposerClusterNamesTaskID, []taskid.UntypedTaskReference{
	googlecloudclustercomposer_contract.ComposerEnvironmentClusterFinderTaskID.Ref(),
	googlecloudcommon_contract.InputProjectIdTaskID.Ref(),
	googlecloudcommon_contract.InputLocationsTaskID.Ref(),
	googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID.Ref(),
	googlecloudclustercomposer_contract.AutocompleteComposerEnvironmentIdentityTaskID.Ref(),
}, func(ctx context.Context, prevValue inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[googlecloudk8scommon_contract.GoogleCloudClusterIdentity]]) (inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[googlecloudk8scommon_contract.GoogleCloudClusterIdentity]], error) {
	// Local aliases keep the long generic instantiations out of the logic below.
	type clusterIdentity = googlecloudk8scommon_contract.GoogleCloudClusterIdentity
	type taskResult = inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[clusterIdentity]]

	project := coretask.GetTaskResult(ctx, googlecloudcommon_contract.InputProjectIdTaskID.Ref())
	envName := coretask.GetTaskResult(ctx, googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID.Ref())
	loc := coretask.GetTaskResult(ctx, googlecloudcommon_contract.InputLocationsTaskID.Ref())

	digest := fmt.Sprintf("%s-%s-%s", project, envName, loc)

	// buildResult wraps a suggestion list and an error message with the current digest.
	buildResult := func(clusters []clusterIdentity, message string) taskResult {
		return taskResult{
			DependencyDigest: digest,
			Value: &inspectioncore_contract.AutocompleteResult[clusterIdentity]{
				Values: clusters,
				Error:  message,
			},
		}
	}
	// failWith reports a user-facing message together with an empty (non-nil) suggestion list.
	failWith := func(message string) (taskResult, error) {
		return buildResult([]clusterIdentity{}, message), nil
	}

	// Inputs are still being typed: nothing to look up yet.
	if project == "" || envName == "" {
		return failWith("Project ID or Composer environment name is empty")
	}

	// envName is guaranteed non-empty past the guard above, so the digest
	// comparison alone decides whether the previous result can be reused.
	if prevValue.DependencyDigest == digest {
		return prevValue, nil
	}

	finder := coretask.GetTaskResult(ctx, googlecloudclustercomposer_contract.ComposerEnvironmentClusterFinderTaskID.Ref())
	gkeCluster, err := finder.GetGKEClusterName(ctx, project, envName)
	switch {
	case err == nil:
		return buildResult([]clusterIdentity{
			{
				ClusterName: gkeCluster,
				ProjectID:   project,
				Location:    loc,
			},
		}, ""), nil
	case errors.Is(err, googlecloudclustercomposer_contract.ErrEnvironmentClusterNotFound):
		return failWith(`Not found. It works for the clusters existed in the past but make sure the cluster name is right if you believe the cluster should be there.
Note: Composer 3 does not run on your GKE. Please remove all Kubernetes/GKE questies from the previous section.`)
	default:
		return failWith("Failed to fetch the list GKE cluster. Please confirm if the Project ID is correct, or retry later")
	}
}, inspectioncore_contract.InspectionTypeLabel(googlecloudclustercomposer_contract.InspectionTypeId),
	coretask.WithSelectionPriority(1000),
)

AutocompleteComposerClusterNamesTask is an implementation of googlecloudk8scommon_contract.AutocompleteClusterNamesTaskID. The task returns the name of the GKE cluster where the provided Composer environment is running.

View Source
// AutocompleteComposerEnvironmentIdentityTask lists the Composer environments
// (name + location) seen in the project's monitoring metrics within the
// selected time range.
//
// The result is keyed by a digest of project ID, start time and end time and
// is reused while that digest is unchanged. A metrics query failure is
// surfaced through the Error field of the result; only a failure to create
// the monitoring client is returned as a task error.
var AutocompleteComposerEnvironmentIdentityTask = inspectiontaskbase.NewCachedTask(googlecloudclustercomposer_contract.AutocompleteComposerEnvironmentIdentityTaskID, []taskid.UntypedTaskReference{
	googlecloudcommon_contract.InputProjectIdTaskID.Ref(),
	googlecloudcommon_contract.InputStartTimeTaskID.Ref(),
	googlecloudcommon_contract.InputEndTimeTaskID.Ref(),
	googlecloudcommon_contract.APIClientFactoryTaskID.Ref(),
	googlecloudcommon_contract.APIClientCallOptionsInjectorTaskID.Ref(),
}, func(ctx context.Context, prevValue inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[googlecloudclustercomposer_contract.ComposerEnvironmentIdentity]]) (inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[googlecloudclustercomposer_contract.ComposerEnvironmentIdentity]], error) {
	// Local aliases keep the long generic instantiations out of the logic below.
	type envIdentity = googlecloudclustercomposer_contract.ComposerEnvironmentIdentity
	type taskResult = inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[envIdentity]]

	// End times older than this (24 periods of 30 days) trigger a completeness warning.
	const staleAfter = 24 * 30 * 24 * time.Hour
	const healthMetricFilter = `metric.type="composer.googleapis.com/environment/healthy" AND resource.type="cloud_composer_environment"`

	project := coretask.GetTaskResult(ctx, googlecloudcommon_contract.InputProjectIdTaskID.Ref())
	from := coretask.GetTaskResult(ctx, googlecloudcommon_contract.InputStartTimeTaskID.Ref())
	to := coretask.GetTaskResult(ctx, googlecloudcommon_contract.InputEndTimeTaskID.Ref())
	clientFactory := coretask.GetTaskResult(ctx, googlecloudcommon_contract.APIClientFactoryTaskID.Ref())
	callOptions := coretask.GetTaskResult(ctx, googlecloudcommon_contract.APIClientCallOptionsInjectorTaskID.Ref())

	digest := fmt.Sprintf("%s-%d-%d", project, from.Unix(), to.Unix())
	if prevValue.DependencyDigest == digest {
		return prevValue, nil
	}

	// buildResult wraps suggestions, error text and hint text with the current digest.
	buildResult := func(found []envIdentity, errMsg, hint string) taskResult {
		return taskResult{
			DependencyDigest: digest,
			Value: &inspectioncore_contract.AutocompleteResult[envIdentity]{
				Values: found,
				Error:  errMsg,
				Hint:   hint,
			},
		}
	}

	if project == "" {
		return buildResult([]envIdentity{}, "", "Composer environments are suggested after the project ID is provided."), nil
	}

	var hint string
	if to.Before(time.Now().Add(-staleAfter)) {
		hint = "The end time is more than 24 months ago. Suggested environment names may not be complete."
	}

	metricClient, err := clientFactory.MonitoringMetricClient(ctx, googlecloud.Project(project))
	if err != nil {
		return prevValue, fmt.Errorf("failed to create monitoring metric client: %w", err)
	}
	defer metricClient.Close()

	ctx = callOptions.InjectToCallContext(ctx, googlecloud.Project(project))

	labelKeys := []string{"resource.label.environment_name", "resource.label.location"}
	labelSets, queryErr := googlecloud.QueryResourceLabelsFromMetrics(ctx, metricClient, project, healthMetricFilter, from, to, labelKeys)

	// A query failure is shown to the user instead of failing the task.
	var errMsg string
	if queryErr != nil {
		errMsg = queryErr.Error()
	}

	// Only explain an empty list when no other message is already being shown.
	if hint == "" && errMsg == "" && len(labelSets) == 0 {
		hint = fmt.Sprintf("No Composer environments found between %s and %s. It is highly likely that the time range is incorrect. Please verify the time range, or proceed by manually entering the environment name.", from.Format(time.RFC3339), to.Format(time.RFC3339))
	}

	found := make([]envIdentity, 0, len(labelSets))
	for _, labelSet := range labelSets {
		name, loc := labelSet["environment_name"], labelSet["location"]
		if name == "" || loc == "" {
			// Skip label sets that do not identify an environment completely.
			continue
		}
		found = append(found, envIdentity{
			ProjectID:       project,
			Location:        loc,
			EnvironmentName: name,
		})
	}

	return buildResult(found, errMsg, hint), nil
})

AutocompleteComposerEnvironmentIdentityTask is the task that autocompletes composer environment identities.

View Source
// AutocompleteLocationForComposerEnvironmentTask suggests the locations in
// which the entered Composer environment name was found.
//
// It filters the identities produced by the environment identity autocomplete
// task down to those matching the entered environment name and returns their
// distinct locations. The result is keyed by a digest of project ID,
// environment name, start time and end time and is reused while the digest is
// unchanged. An error reported by the identity task is passed through in the
// Error field of the result.
var AutocompleteLocationForComposerEnvironmentTask = inspectiontaskbase.NewCachedTask(googlecloudclustercomposer_contract.AutocompleteLocationForComposerEnvironmentTaskID, []taskid.UntypedTaskReference{
	googlecloudclustercomposer_contract.AutocompleteComposerEnvironmentIdentityTaskID.Ref(),
	googlecloudcommon_contract.InputProjectIdTaskID.Ref(),
	googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID.Ref(),
	googlecloudcommon_contract.InputStartTimeTaskID.Ref(),
	googlecloudcommon_contract.InputEndTimeTaskID.Ref(),
}, func(ctx context.Context, prevValue inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[string]]) (inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[string]], error) {
	projectID := coretask.GetTaskResult(ctx, googlecloudcommon_contract.InputProjectIdTaskID.Ref())
	environmentName := coretask.GetTaskResult(ctx, googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID.Ref())
	startTime := coretask.GetTaskResult(ctx, googlecloudcommon_contract.InputStartTimeTaskID.Ref())
	endTime := coretask.GetTaskResult(ctx, googlecloudcommon_contract.InputEndTimeTaskID.Ref())
	identities := coretask.GetTaskResult(ctx, googlecloudclustercomposer_contract.AutocompleteComposerEnvironmentIdentityTaskID.Ref())

	currentDigest := fmt.Sprintf("%s-%s-%d-%d", projectID, environmentName, startTime.Unix(), endTime.Unix())
	if currentDigest == prevValue.DependencyDigest {
		return prevValue, nil
	}

	if projectID == "" {
		return inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[string]]{
			Value: &inspectioncore_contract.AutocompleteResult[string]{
				Values: []string{},
				Error:  "",
				Hint:   "Locations are suggested after the project ID is provided.",
			},
			DependencyDigest: currentDigest,
		}, nil
	}

	if environmentName == "" {
		return inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[string]]{
			Value: &inspectioncore_contract.AutocompleteResult[string]{
				Values: []string{},
				Error:  "",
				Hint:   "Locations are suggested after the environment name is provided.",
			},
			DependencyDigest: currentDigest,
		}, nil
	}

	// Propagate a failure of the upstream identity lookup instead of
	// presenting an empty list as if no location existed.
	if identities.Error != "" {
		return inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[string]]{
			Value: &inspectioncore_contract.AutocompleteResult[string]{
				Values: []string{},
				Error:  identities.Error,
				Hint:   identities.Hint,
			},
			DependencyDigest: currentDigest,
		}, nil
	}

	// Collect distinct locations in the order they appear in identities.Values.
	// Ranging over a map here would shuffle the suggestions on every
	// recomputation because Go does not define map iteration order.
	seen := make(map[string]struct{})
	locations := make([]string, 0)
	for _, identity := range identities.Values {
		if identity.EnvironmentName != environmentName {
			continue
		}
		if _, dup := seen[identity.Location]; dup {
			continue
		}
		seen[identity.Location] = struct{}{}
		locations = append(locations, identity.Location)
	}

	return inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[string]]{
		Value: &inspectioncore_contract.AutocompleteResult[string]{
			Values: locations,
			Error:  "",
			Hint:   identities.Hint,
		},
		DependencyDigest: currentDigest,
	}, nil
}, inspectioncore_contract.InspectionTypeLabel(googlecloudclustercomposer_contract.InspectionTypeId),
	coretask.WithSelectionPriority(1000),
)

ComposerClusterNamePrefixTask is the task that returns the composer cluster name prefix.

View Source
// ComposerDagProcessorManagerLogQueryTask is the Cloud Logging list-log task
// for the "dag-processor-manager" component of a Composer environment. It is
// labelled "Composer Environment/DAG Processor Manager" and depends on the
// project ID and Composer environment name inputs.
var ComposerDagProcessorManagerLogQueryTask = googlecloudcommon_contract.NewLegacyCloudLoggingListLogTask(
	googlecloudclustercomposer_contract.ComposerDagProcessorManagerLogQueryTaskID,
	"Composer Environment/DAG Processor Manager",
	enum.LogTypeComposerEnvironment,
	// Inputs that must be resolved before this task runs.
	[]taskid.UntypedTaskReference{
		googlecloudcommon_contract.InputProjectIdTaskID.Ref(),
		googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID.Ref(),
	},
	&googlecloudcommon_contract.ProjectIDDefaultResourceNamesGenerator{},
	// NOTE(review): createGenerator is defined elsewhere in the package; presumably it builds the query for the named component — confirm.
	createGenerator("dag-processor-manager"),
	// NOTE(review): fixed placeholder environment/project names; looks like a sample query for display — confirm.
	generateQueryForComponent("sample-composer-environment", "test-project", "dag-processor-manager"),
)

ComposerDagProcessorManagerLogQueryTask defines a task that gathers Cloud Composer DAG processor manager logs from Cloud Logging.

ComposerEnvironmentClusterFinderTask injects ComposerEnvironmentClusterFinder implementation.

ComposerEnvironmentListFetcherTask injects ComposerEnvironmentListFetcher implementation.

View Source
// ComposerMonitoringLogQueryTask is the Cloud Logging list-log task for the
// "airflow-monitoring" component of a Composer environment. It is labelled
// "Composer Environment/Airflow Monitoring" and depends on the project ID and
// Composer environment name inputs.
var ComposerMonitoringLogQueryTask = googlecloudcommon_contract.NewLegacyCloudLoggingListLogTask(
	googlecloudclustercomposer_contract.ComposerMonitoringLogQueryTaskID,
	"Composer Environment/Airflow Monitoring",
	enum.LogTypeComposerEnvironment,
	// Inputs that must be resolved before this task runs.
	[]taskid.UntypedTaskReference{
		googlecloudcommon_contract.InputProjectIdTaskID.Ref(),
		googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID.Ref(),
	},
	&googlecloudcommon_contract.ProjectIDDefaultResourceNamesGenerator{},
	// NOTE(review): createGenerator is defined elsewhere in the package; presumably it builds the query for the named component — confirm.
	createGenerator("airflow-monitoring"),
	// NOTE(review): fixed placeholder environment/project names; looks like a sample query for display — confirm.
	generateQueryForComponent("sample-composer-environment", "test-project", "airflow-monitoring"),
)

ComposerMonitoringLogQueryTask defines a task that gathers Cloud Composer Airflow monitoring logs from Cloud Logging.

View Source
// ComposerSchedulerLogQueryTask is the Cloud Logging list-log task for the
// "airflow-scheduler" component of a Composer environment. It is labelled
// "Composer Environment/Airflow Scheduler" and depends on the project ID and
// Composer environment name inputs.
var ComposerSchedulerLogQueryTask = googlecloudcommon_contract.NewLegacyCloudLoggingListLogTask(
	googlecloudclustercomposer_contract.ComposerSchedulerLogQueryTaskID,
	"Composer Environment/Airflow Scheduler",
	enum.LogTypeComposerEnvironment,
	// Inputs that must be resolved before this task runs.
	[]taskid.UntypedTaskReference{
		googlecloudcommon_contract.InputProjectIdTaskID.Ref(),
		googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID.Ref(),
	},
	&googlecloudcommon_contract.ProjectIDDefaultResourceNamesGenerator{},
	// NOTE(review): createGenerator is defined elsewhere in the package; presumably it builds the query for the named component — confirm.
	createGenerator("airflow-scheduler"),
	// NOTE(review): fixed placeholder environment/project names; looks like a sample query for display — confirm.
	generateQueryForComponent("sample-composer-environment", "test-project", "airflow-scheduler"),
)

ComposerSchedulerLogQueryTask defines a task that gathers Cloud Composer scheduler logs from Cloud Logging.

ComposerWorkerLogQueryTask defines a task that gathers Cloud Composer worker logs from Cloud Logging.

View Source
// InputComposerEnvironmentNameTask is the text form field for the Composer
// environment name.
//
// Suggestions come from the environment identity autocomplete task. When that
// task reported an error, no suggestions are offered. Because identities are
// per (environment, location) pair, the same environment name can occur more
// than once; names are de-duplicated before being sorted for autocomplete so
// the user is not shown repeated entries.
var InputComposerEnvironmentNameTask = formtask.NewTextFormTaskBuilder(googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID, googlecloudcommon_contract.PriorityForResourceIdentifierGroup+4400, "Composer Environment Name").WithDependencies(
	[]taskid.UntypedTaskReference{googlecloudclustercomposer_contract.AutocompleteComposerEnvironmentIdentityTaskID.Ref()},
).WithSuggestionsFunc(func(ctx context.Context, value string, previousValues []string) ([]string, error) {
	environments := coretask.GetTaskResult(ctx, googlecloudclustercomposer_contract.AutocompleteComposerEnvironmentIdentityTaskID.Ref())
	if environments.Error != "" {
		return []string{}, nil
	}

	// Keep the first occurrence of each name, preserving the upstream order.
	seen := make(map[string]struct{}, len(environments.Values))
	environmentNames := make([]string, 0, len(environments.Values))
	for _, env := range environments.Values {
		if _, dup := seen[env.EnvironmentName]; dup {
			continue
		}
		seen[env.EnvironmentName] = struct{}{}
		environmentNames = append(environmentNames, env.EnvironmentName)
	}
	return common.SortForAutocomplete(value, environmentNames), nil
}).Build()

InputComposerEnvironmentNameTask is the task that inputs composer environment name.

Functions

func Register

func Register(registry coreinspection.InspectionTaskRegistry) error

Register registers all googlecloudclustercomposer inspection tasks to the registry.

Types

This section is empty.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL