Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
// AirflowDagProcessorLogParseTask parses Airflow DAG processor manager logs.
//
// The parser is built for the "/home/airflow/gcs/dags/" path and reads the
// entries produced by the DAG processor manager log query task.
// NOTE(review): the positional arguments 102000 and true are interpreted by
// legacyparser.NewParserTaskFromParser; confirm their meaning there.
var AirflowDagProcessorLogParseTask = legacyparser.NewParserTaskFromParser(
	googlecloudclustercomposer_contract.AirflowDagProcessorManagerLogParserTaskID,
	airflowdagprocessor.NewAirflowDagProcessorParser(
		"/home/airflow/gcs/dags/",
		googlecloudclustercomposer_contract.ComposerDagProcessorManagerLogQueryTaskID.Ref(),
		enum.LogTypeComposerEnvironment,
	),
	102000,
	true,
	[]string{
		googlecloudclustercomposer_contract.InspectionTypeId,
	},
)
AirflowDagProcessorLogParseTask parses Airflow DAG processor manager logs.
// AirflowSchedulerLogParseTask parses Airflow scheduler logs.
//
// The parser reads the entries produced by the Composer scheduler log query
// task.
// NOTE(review): the positional arguments 100000 and true are interpreted by
// legacyparser.NewParserTaskFromParser; confirm their meaning there.
var AirflowSchedulerLogParseTask = legacyparser.NewParserTaskFromParser(
	googlecloudclustercomposer_contract.AirflowSchedulerLogParserTaskID,
	airflowscheduler.NewAirflowSchedulerParser(
		googlecloudclustercomposer_contract.ComposerSchedulerLogQueryTaskID.Ref(),
		enum.LogTypeComposerEnvironment,
	),
	100000,
	true,
	[]string{
		googlecloudclustercomposer_contract.InspectionTypeId,
	},
)
AirflowSchedulerLogParseTask parses Airflow scheduler logs.
// AirflowWorkerLogParseTask parses Airflow worker logs.
//
// The parser reads the entries produced by the Composer worker log query
// task.
// NOTE(review): the positional arguments 101000 and true are interpreted by
// legacyparser.NewParserTaskFromParser; confirm their meaning there.
var AirflowWorkerLogParseTask = legacyparser.NewParserTaskFromParser(
	googlecloudclustercomposer_contract.AirflowWorkerLogParserTaskID,
	airflowworker.NewAirflowWorkerParser(
		googlecloudclustercomposer_contract.ComposerWorkerLogQueryTaskID.Ref(),
		enum.LogTypeComposerEnvironment,
	),
	101000,
	true,
	[]string{
		googlecloudclustercomposer_contract.InspectionTypeId,
	},
)
AirflowWorkerLogParseTask parses Airflow worker logs.
var AutocompleteComposerClusterNamesTask = inspectiontaskbase.NewCachedTask(googlecloudclustercomposer_contract.AutocompleteComposerClusterNamesTaskID, []taskid.UntypedTaskReference{ googlecloudclustercomposer_contract.ComposerEnvironmentClusterFinderTaskID.Ref(), googlecloudcommon_contract.InputProjectIdTaskID.Ref(), googlecloudcommon_contract.InputLocationsTaskID.Ref(), googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID.Ref(), googlecloudclustercomposer_contract.AutocompleteComposerEnvironmentIdentityTaskID.Ref(), }, func(ctx context.Context, prevValue inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[googlecloudk8scommon_contract.GoogleCloudClusterIdentity]]) (inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[googlecloudk8scommon_contract.GoogleCloudClusterIdentity]], error) { projectID := coretask.GetTaskResult(ctx, googlecloudcommon_contract.InputProjectIdTaskID.Ref()) environment := coretask.GetTaskResult(ctx, googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID.Ref()) location := coretask.GetTaskResult(ctx, googlecloudcommon_contract.InputLocationsTaskID.Ref()) dependencyDigest := fmt.Sprintf("%s-%s-%s", projectID, environment, location) isWIP := projectID == "" || environment == "" if isWIP { return inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[googlecloudk8scommon_contract.GoogleCloudClusterIdentity]]{ DependencyDigest: dependencyDigest, Value: &inspectioncore_contract.AutocompleteResult[googlecloudk8scommon_contract.GoogleCloudClusterIdentity]{ Values: []googlecloudk8scommon_contract.GoogleCloudClusterIdentity{}, Error: "Project ID or Composer environment name is empty", }, }, nil } if environment != "" && dependencyDigest == prevValue.DependencyDigest { return prevValue, nil } clusterFinder := coretask.GetTaskResult(ctx, googlecloudclustercomposer_contract.ComposerEnvironmentClusterFinderTaskID.Ref()) clusterName, 
err := clusterFinder.GetGKEClusterName(ctx, projectID, environment) if err != nil { if errors.Is(err, googlecloudclustercomposer_contract.ErrEnvironmentClusterNotFound) { return inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[googlecloudk8scommon_contract.GoogleCloudClusterIdentity]]{ DependencyDigest: dependencyDigest, Value: &inspectioncore_contract.AutocompleteResult[googlecloudk8scommon_contract.GoogleCloudClusterIdentity]{ Values: []googlecloudk8scommon_contract.GoogleCloudClusterIdentity{}, Error: `Not found. It works for the clusters existed in the past but make sure the cluster name is right if you believe the cluster should be there. Note: Composer 3 does not run on your GKE. Please remove all Kubernetes/GKE questies from the previous section.`, }, }, nil } return inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[googlecloudk8scommon_contract.GoogleCloudClusterIdentity]]{ DependencyDigest: dependencyDigest, Value: &inspectioncore_contract.AutocompleteResult[googlecloudk8scommon_contract.GoogleCloudClusterIdentity]{ Values: []googlecloudk8scommon_contract.GoogleCloudClusterIdentity{}, Error: "Failed to fetch the list GKE cluster. Please confirm if the Project ID is correct, or retry later", }, }, nil } return inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[googlecloudk8scommon_contract.GoogleCloudClusterIdentity]]{ DependencyDigest: dependencyDigest, Value: &inspectioncore_contract.AutocompleteResult[googlecloudk8scommon_contract.GoogleCloudClusterIdentity]{ Values: []googlecloudk8scommon_contract.GoogleCloudClusterIdentity{ { ClusterName: clusterName, ProjectID: projectID, Location: location, }, }, }, }, nil }, inspectioncore_contract.InspectionTypeLabel(googlecloudclustercomposer_contract.InspectionTypeId), coretask.WithSelectionPriority(1000), )
AutocompleteComposerClusterNamesTask is an implementation of googlecloudk8scommon_contract.AutocompleteClusterNamesTaskID. The task returns the name of the GKE cluster where the provided Composer environment is running.
var AutocompleteComposerEnvironmentIdentityTask = inspectiontaskbase.NewCachedTask(googlecloudclustercomposer_contract.AutocompleteComposerEnvironmentIdentityTaskID, []taskid.UntypedTaskReference{ googlecloudcommon_contract.InputProjectIdTaskID.Ref(), googlecloudcommon_contract.InputStartTimeTaskID.Ref(), googlecloudcommon_contract.InputEndTimeTaskID.Ref(), googlecloudcommon_contract.APIClientFactoryTaskID.Ref(), googlecloudcommon_contract.APIClientCallOptionsInjectorTaskID.Ref(), }, func(ctx context.Context, prevValue inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[googlecloudclustercomposer_contract.ComposerEnvironmentIdentity]]) (inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[googlecloudclustercomposer_contract.ComposerEnvironmentIdentity]], error) { projectID := coretask.GetTaskResult(ctx, googlecloudcommon_contract.InputProjectIdTaskID.Ref()) startTime := coretask.GetTaskResult(ctx, googlecloudcommon_contract.InputStartTimeTaskID.Ref()) endTime := coretask.GetTaskResult(ctx, googlecloudcommon_contract.InputEndTimeTaskID.Ref()) cf := coretask.GetTaskResult(ctx, googlecloudcommon_contract.APIClientFactoryTaskID.Ref()) optionInjector := coretask.GetTaskResult(ctx, googlecloudcommon_contract.APIClientCallOptionsInjectorTaskID.Ref()) currentDigest := fmt.Sprintf("%s-%d-%d", projectID, startTime.Unix(), endTime.Unix()) if currentDigest == prevValue.DependencyDigest { return prevValue, nil } if projectID == "" { return inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[googlecloudclustercomposer_contract.ComposerEnvironmentIdentity]]{ Value: &inspectioncore_contract.AutocompleteResult[googlecloudclustercomposer_contract.ComposerEnvironmentIdentity]{ Values: []googlecloudclustercomposer_contract.ComposerEnvironmentIdentity{}, Error: "", Hint: "Composer environments are suggested after the project ID is provided.", }, DependencyDigest: currentDigest, }, nil } 
errorString := "" hintString := "" if endTime.Before(time.Now().Add(-time.Hour * 24 * 30 * 24)) { hintString = "The end time is more than 24 months ago. Suggested environment names may not be complete." } client, err := cf.MonitoringMetricClient(ctx, googlecloud.Project(projectID)) if err != nil { return prevValue, fmt.Errorf("failed to create monitoring metric client: %w", err) } defer client.Close() ctx = optionInjector.InjectToCallContext(ctx, googlecloud.Project(projectID)) filter := `metric.type="composer.googleapis.com/environment/healthy" AND resource.type="cloud_composer_environment"` metricsLabels, err := googlecloud.QueryResourceLabelsFromMetrics(ctx, client, projectID, filter, startTime, endTime, []string{"resource.label.environment_name", "resource.label.location"}) if err != nil { errorString = err.Error() } if hintString == "" && errorString == "" && len(metricsLabels) == 0 { hintString = fmt.Sprintf("No Composer environments found between %s and %s. It is highly likely that the time range is incorrect. Please verify the time range, or proceed by manually entering the environment name.", startTime.Format(time.RFC3339), endTime.Format(time.RFC3339)) } identities := make([]googlecloudclustercomposer_contract.ComposerEnvironmentIdentity, 0, len(metricsLabels)) for _, labels := range metricsLabels { envName := labels["environment_name"] location := labels["location"] if envName != "" && location != "" { identities = append(identities, googlecloudclustercomposer_contract.ComposerEnvironmentIdentity{ ProjectID: projectID, Location: location, EnvironmentName: envName, }) } } return inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[googlecloudclustercomposer_contract.ComposerEnvironmentIdentity]]{ DependencyDigest: currentDigest, Value: &inspectioncore_contract.AutocompleteResult[googlecloudclustercomposer_contract.ComposerEnvironmentIdentity]{ Values: identities, Error: errorString, Hint: hintString, }, }, nil })
AutocompleteComposerEnvironmentIdentityTask is the task that autocompletes composer environment identities.
var AutocompleteLocationForComposerEnvironmentTask = inspectiontaskbase.NewCachedTask(googlecloudclustercomposer_contract.AutocompleteLocationForComposerEnvironmentTaskID, []taskid.UntypedTaskReference{ googlecloudclustercomposer_contract.AutocompleteComposerEnvironmentIdentityTaskID.Ref(), googlecloudcommon_contract.InputProjectIdTaskID.Ref(), googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID.Ref(), googlecloudcommon_contract.InputStartTimeTaskID.Ref(), googlecloudcommon_contract.InputEndTimeTaskID.Ref(), }, func(ctx context.Context, prevValue inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[string]]) (inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[string]], error) { projectID := coretask.GetTaskResult(ctx, googlecloudcommon_contract.InputProjectIdTaskID.Ref()) environmentName := coretask.GetTaskResult(ctx, googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID.Ref()) startTime := coretask.GetTaskResult(ctx, googlecloudcommon_contract.InputStartTimeTaskID.Ref()) endTime := coretask.GetTaskResult(ctx, googlecloudcommon_contract.InputEndTimeTaskID.Ref()) identities := coretask.GetTaskResult(ctx, googlecloudclustercomposer_contract.AutocompleteComposerEnvironmentIdentityTaskID.Ref()) currentDigest := fmt.Sprintf("%s-%s-%d-%d", projectID, environmentName, startTime.Unix(), endTime.Unix()) if currentDigest == prevValue.DependencyDigest { return prevValue, nil } if projectID == "" { return inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[string]]{ Value: &inspectioncore_contract.AutocompleteResult[string]{ Values: []string{}, Error: "", Hint: "Locations are suggested after the project ID is provided.", }, DependencyDigest: currentDigest, }, nil } if environmentName == "" { return inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[string]]{ Value: &inspectioncore_contract.AutocompleteResult[string]{ 
Values: []string{}, Error: "", Hint: "Locations are suggested after the environment name is provided.", }, DependencyDigest: currentDigest, }, nil } if identities.Error != "" { return inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[string]]{ Value: &inspectioncore_contract.AutocompleteResult[string]{ Values: []string{}, Error: identities.Error, Hint: identities.Hint, }, DependencyDigest: currentDigest, }, nil } locationsMap := make(map[string]struct{}) for _, identity := range identities.Values { if identity.EnvironmentName == environmentName { locationsMap[identity.Location] = struct{}{} } } locations := make([]string, 0, len(locationsMap)) for location := range locationsMap { locations = append(locations, location) } return inspectiontaskbase.CacheableTaskResult[*inspectioncore_contract.AutocompleteResult[string]]{ Value: &inspectioncore_contract.AutocompleteResult[string]{ Values: locations, Error: "", Hint: identities.Hint, }, DependencyDigest: currentDigest, }, nil }, inspectioncore_contract.InspectionTypeLabel(googlecloudclustercomposer_contract.InspectionTypeId), coretask.WithSelectionPriority(1000), )
// ComposerClusterNamePrefixTask is the task that returns the Composer cluster
// name prefix. It has no dependencies and always yields the empty string.
var ComposerClusterNamePrefixTask = coretask.NewTask(
	googlecloudclustercomposer_contract.ComposerClusterNamePrefixTaskID,
	[]taskid.UntypedTaskReference{},
	func(_ context.Context) (string, error) {
		const noPrefix = ""
		return noPrefix, nil
	},
	inspectioncore_contract.InspectionTypeLabel(googlecloudclustercomposer_contract.InspectionTypeId),
)
ComposerClusterNamePrefixTask is the task that returns the composer cluster name prefix.
// ComposerDagProcessorManagerLogQueryTask defines the Cloud Logging list task
// labelled "Composer Environment/DAG Processor Manager".
//
// It depends on the project ID and Composer environment name inputs and uses
// the generator created for the "dag-processor-manager" component.
// NOTE(review): the last argument looks like a sample query built from
// placeholder names — confirm against NewLegacyCloudLoggingListLogTask.
var ComposerDagProcessorManagerLogQueryTask = googlecloudcommon_contract.NewLegacyCloudLoggingListLogTask(
	googlecloudclustercomposer_contract.ComposerDagProcessorManagerLogQueryTaskID,
	"Composer Environment/DAG Processor Manager",
	enum.LogTypeComposerEnvironment,
	[]taskid.UntypedTaskReference{
		googlecloudcommon_contract.InputProjectIdTaskID.Ref(),
		googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID.Ref(),
	},
	&googlecloudcommon_contract.ProjectIDDefaultResourceNamesGenerator{},
	createGenerator("dag-processor-manager"),
	generateQueryForComponent(
		"sample-composer-environment",
		"test-project",
		"dag-processor-manager",
	),
)
ComposerDagProcessorManagerLogQueryTask defines a task that gathers Cloud Composer DAG processor manager logs from Cloud Logging.
// ComposerEnvironmentClusterFinderTask injects the
// ComposerEnvironmentClusterFinder implementation.
//
// The task declares a dependency on the API client factory task and returns a
// fresh EnvironmentClusterFinderImpl on every run.
var ComposerEnvironmentClusterFinderTask = coretask.NewTask(
	googlecloudclustercomposer_contract.ComposerEnvironmentClusterFinderTaskID,
	[]taskid.UntypedTaskReference{
		googlecloudcommon_contract.APIClientFactoryTaskID.Ref(),
	},
	func(_ context.Context) (googlecloudclustercomposer_contract.ComposerEnvironmentClusterFinder, error) {
		finder := &googlecloudclustercomposer_contract.EnvironmentClusterFinderImpl{}
		return finder, nil
	},
)
ComposerEnvironmentClusterFinderTask injects ComposerEnvironmentClusterFinder implementation.
// ComposerEnvironmentListFetcherTask injects the
// ComposerEnvironmentListFetcher implementation.
//
// The task declares a dependency on the API client factory task and returns a
// fresh ComposerEnvironmentListFetcherImpl on every run.
var ComposerEnvironmentListFetcherTask = coretask.NewTask(
	googlecloudclustercomposer_contract.ComposerEnvironmentListFetcherTaskID,
	[]taskid.UntypedTaskReference{
		googlecloudcommon_contract.APIClientFactoryTaskID.Ref(),
	},
	func(_ context.Context) (googlecloudclustercomposer_contract.ComposerEnvironmentListFetcher, error) {
		fetcher := &googlecloudclustercomposer_contract.ComposerEnvironmentListFetcherImpl{}
		return fetcher, nil
	},
)
ComposerEnvironmentListFetcherTask injects ComposerEnvironmentListFetcher implementation.
// ComposerMonitoringLogQueryTask defines the Cloud Logging list task labelled
// "Composer Environment/Airflow Monitoring".
//
// It depends on the project ID and Composer environment name inputs and uses
// the generator created for the "airflow-monitoring" component.
// NOTE(review): the last argument looks like a sample query built from
// placeholder names — confirm against NewLegacyCloudLoggingListLogTask.
var ComposerMonitoringLogQueryTask = googlecloudcommon_contract.NewLegacyCloudLoggingListLogTask(
	googlecloudclustercomposer_contract.ComposerMonitoringLogQueryTaskID,
	"Composer Environment/Airflow Monitoring",
	enum.LogTypeComposerEnvironment,
	[]taskid.UntypedTaskReference{
		googlecloudcommon_contract.InputProjectIdTaskID.Ref(),
		googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID.Ref(),
	},
	&googlecloudcommon_contract.ProjectIDDefaultResourceNamesGenerator{},
	createGenerator("airflow-monitoring"),
	generateQueryForComponent(
		"sample-composer-environment",
		"test-project",
		"airflow-monitoring",
	),
)
ComposerMonitoringLogQueryTask defines a task that gathers Cloud Composer Airflow monitoring logs from Cloud Logging.
// ComposerSchedulerLogQueryTask defines the Cloud Logging list task labelled
// "Composer Environment/Airflow Scheduler".
//
// It depends on the project ID and Composer environment name inputs and uses
// the generator created for the "airflow-scheduler" component.
// NOTE(review): the last argument looks like a sample query built from
// placeholder names — confirm against NewLegacyCloudLoggingListLogTask.
var ComposerSchedulerLogQueryTask = googlecloudcommon_contract.NewLegacyCloudLoggingListLogTask(
	googlecloudclustercomposer_contract.ComposerSchedulerLogQueryTaskID,
	"Composer Environment/Airflow Scheduler",
	enum.LogTypeComposerEnvironment,
	[]taskid.UntypedTaskReference{
		googlecloudcommon_contract.InputProjectIdTaskID.Ref(),
		googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID.Ref(),
	},
	&googlecloudcommon_contract.ProjectIDDefaultResourceNamesGenerator{},
	createGenerator("airflow-scheduler"),
	generateQueryForComponent(
		"sample-composer-environment",
		"test-project",
		"airflow-scheduler",
	),
)
ComposerSchedulerLogQueryTask defines a task that gathers Cloud Composer scheduler logs from Cloud Logging.
// ComposerWorkerLogQueryTask defines the Cloud Logging list task labelled
// "Composer Environment/Airflow Worker".
//
// It depends on the project ID and Composer environment name inputs and uses
// the generator created for the "airflow-worker" component.
// NOTE(review): the last argument looks like a sample query built from
// placeholder names — confirm against NewLegacyCloudLoggingListLogTask.
var ComposerWorkerLogQueryTask = googlecloudcommon_contract.NewLegacyCloudLoggingListLogTask(
	googlecloudclustercomposer_contract.ComposerWorkerLogQueryTaskID,
	"Composer Environment/Airflow Worker",
	enum.LogTypeComposerEnvironment,
	[]taskid.UntypedTaskReference{
		googlecloudcommon_contract.InputProjectIdTaskID.Ref(),
		googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID.Ref(),
	},
	&googlecloudcommon_contract.ProjectIDDefaultResourceNamesGenerator{},
	createGenerator("airflow-worker"),
	generateQueryForComponent(
		"sample-composer-environment",
		"test-project",
		"airflow-worker",
	),
)
ComposerWorkerLogQueryTask defines a task that gathers Cloud Composer Airflow worker logs from Cloud Logging.
var InputComposerEnvironmentNameTask = formtask.NewTextFormTaskBuilder(googlecloudclustercomposer_contract.InputComposerEnvironmentNameTaskID, googlecloudcommon_contract.PriorityForResourceIdentifierGroup+4400, "Composer Environment Name").WithDependencies( []taskid.UntypedTaskReference{googlecloudclustercomposer_contract.AutocompleteComposerEnvironmentIdentityTaskID.Ref()}, ).WithSuggestionsFunc(func(ctx context.Context, value string, previousValues []string) ([]string, error) { environments := coretask.GetTaskResult(ctx, googlecloudclustercomposer_contract.AutocompleteComposerEnvironmentIdentityTaskID.Ref()) if environments.Error != "" { return []string{}, nil } environmentNames := make([]string, len(environments.Values)) for i, env := range environments.Values { environmentNames[i] = env.EnvironmentName } return common.SortForAutocomplete(value, environmentNames), nil }).Build()
InputComposerEnvironmentNameTask is the form task that takes the Composer environment name as input.
Functions ¶
func Register ¶
func Register(registry coreinspection.InspectionTaskRegistry) error
Register registers all googlecloudclustercomposer inspection tasks to the registry.
Types ¶
This section is empty.