diff --git a/cmd/cdi-controller/controller.go b/cmd/cdi-controller/controller.go index 99aa3815b..37487ddf6 100644 --- a/cmd/cdi-controller/controller.go +++ b/cmd/cdi-controller/controller.go @@ -209,6 +209,10 @@ func start(ctx context.Context, cfg *rest.Config) { klog.Errorf("Unable to setup dataimportcron controller: %v", err) os.Exit(1) } + if _, err := controller.NewDataSourceController(mgr, log, installerLabels); err != nil { + klog.Errorf("Unable to setup datasource controller: %v", err) + os.Exit(1) + } klog.V(1).Infoln("created cdi controllers") diff --git a/pkg/controller/BUILD.bazel b/pkg/controller/BUILD.bazel index b6d58295c..1efff3d4f 100644 --- a/pkg/controller/BUILD.bazel +++ b/pkg/controller/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "config-controller.go", "dataimportcron-conditions.go", "dataimportcron-controller.go", + "datasource-controller.go", "datavolume-conditions.go", "datavolume-controller.go", "import-controller.go", diff --git a/pkg/controller/dataimportcron-conditions.go b/pkg/controller/dataimportcron-conditions.go index dcbc2d327..baeaf4b75 100644 --- a/pkg/controller/dataimportcron-conditions.go +++ b/pkg/controller/dataimportcron-conditions.go @@ -29,9 +29,7 @@ const ( outdated = "Outdated" scheduled = "ImportScheduled" inProgress = "ImportProgressing" - failed = "ImportFailed" upToDate = "UpToDate" - ready = "Ready" ) func updateDataImportCronCondition(cron *cdiv1.DataImportCron, conditionType cdiv1.DataImportCronConditionType, status corev1.ConditionStatus, message, reason string) { @@ -61,26 +59,6 @@ func FindDataImportCronConditionByType(cron *cdiv1.DataImportCron, conditionType return nil } -func updateDataSourceCondition(ds *cdiv1.DataSource, conditionType cdiv1.DataSourceConditionType, status corev1.ConditionStatus, message, reason string) { - if condition := FindDataSourceConditionByType(ds, conditionType); condition != nil { - updateConditionState(&condition.ConditionState, status, message, reason) - } else { - condition = &cdiv1.DataSourceCondition{Type: conditionType} - updateConditionState(&condition.ConditionState, status, message, reason) - ds.Status.Conditions = append(ds.Status.Conditions, *condition) - } -} - -// FindDataSourceConditionByType finds DataSourceCondition by condition type -func FindDataSourceConditionByType(ds *cdiv1.DataSource, conditionType cdiv1.DataSourceConditionType) *cdiv1.DataSourceCondition { - for i, condition := range ds.Status.Conditions { - if condition.Type == conditionType { - return &ds.Status.Conditions[i] - } - } - return nil -} - func updateConditionState(condition *cdiv1.ConditionState, status corev1.ConditionStatus, message, reason string) { conditionStatusUpdated := condition.Status != status conditionUpdated := conditionStatusUpdated || condition.Message != message || condition.Reason != reason diff --git a/pkg/controller/dataimportcron-controller.go b/pkg/controller/dataimportcron-controller.go index 7e517f7d3..2f078f7fc 100644 --- a/pkg/controller/dataimportcron-controller.go +++ b/pkg/controller/dataimportcron-controller.go @@ -338,22 +338,7 @@ func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImp sourcePVC := dataImportCron.Status.LastImportedPVC if sourcePVC != nil { - dv := &cdiv1.DataVolume{} - if err := r.client.Get(ctx, types.NamespacedName{Namespace: sourcePVC.Namespace, Name: sourcePVC.Name}, dv); err != nil { - if k8serrors.IsNotFound(err) { - log.Info("DataVolume not found", "name", sourcePVC.Name) - updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "DataVolume not found", notFound) - } else { - return err - } - } else if dv.Status.Phase == cdiv1.Succeeded { - updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionTrue, "DataSource is ready to be consumed", ready) - } else { - updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dv.Status.Phase), string(dv.Status.Phase)) - } dataSource.Spec.Source.PVC = sourcePVC - } else { - updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "No imports yet", noImport) } if !reflect.DeepEqual(dataSource, dataSourceCopy) { if err := r.client.Update(ctx, dataSource); err != nil { diff --git a/pkg/controller/dataimportcron-controller_test.go b/pkg/controller/dataimportcron-controller_test.go index 2871679da..8d18ac1a4 100644 --- a/pkg/controller/dataimportcron-controller_test.go +++ b/pkg/controller/dataimportcron-controller_test.go @@ -57,12 +57,13 @@ const ( var _ = Describe("All DataImportCron Tests", func() { var _ = Describe("DataImportCron controller reconcile loop", func() { var ( - reconciler *DataImportCronReconciler - cron *cdiv1.DataImportCron - dataSource *cdiv1.DataSource - cronKey = types.NamespacedName{Name: cronName, Namespace: metav1.NamespaceDefault} - cronReq = reconcile.Request{NamespacedName: cronKey} - cronJobKey = func(cron *cdiv1.DataImportCron) types.NamespacedName { + reconciler *DataImportCronReconciler + dsReconciler *DataSourceReconciler + cron *cdiv1.DataImportCron + dataSource *cdiv1.DataSource + cronKey = types.NamespacedName{Name: cronName, Namespace: metav1.NamespaceDefault} + cronReq = reconcile.Request{NamespacedName: cronKey} + cronJobKey = func(cron *cdiv1.DataImportCron) types.NamespacedName { return types.NamespacedName{Name: GetCronJobName(cron), Namespace: reconciler.cdiNamespace} } dataSourceKey = func(cron *cdiv1.DataImportCron) types.NamespacedName { @@ -87,8 +88,24 @@ var _ = Describe("All DataImportCron Tests", func() { Expect(cronCond).ToNot(BeNil()) verifyConditionState(string(cdiv1.DataImportCronUpToDate), cronCond.ConditionState, isUpToDate, reasonUpToDate) if dataSource != nil { + imports := cron.Status.CurrentImports + Expect(imports).ToNot(BeNil()) + Expect(len(imports)).ToNot(BeZero()) + dvName := imports[0].DataVolumeName + Expect(dvName).ToNot(BeEmpty()) + + dv := &cdiv1.DataVolume{} + err = reconciler.client.Get(context.TODO(), dvKey(dvName), dv) + Expect(err).ToNot(HaveOccurred()) err = reconciler.client.Get(context.TODO(), dataSourceKey(cron), dataSource) Expect(err).ToNot(HaveOccurred()) + dsReconciler = createDataSourceReconciler(dataSource, dv) + dsReq := reconcile.Request{NamespacedName: dataSourceKey(cron)} + _, err = dsReconciler.Reconcile(context.TODO(), dsReq) + Expect(err).ToNot(HaveOccurred()) + + err = dsReconciler.client.Get(context.TODO(), dataSourceKey(cron), dataSource) + Expect(err).ToNot(HaveOccurred()) dsCond := FindDataSourceConditionByType(dataSource, cdiv1.DataSourceReady) Expect(dsCond).ToNot(BeNil()) verifyConditionState(string(cdiv1.DataSourceReady), dsCond.ConditionState, isReady, reasonReady) @@ -147,7 +164,7 @@ var _ = Describe("All DataImportCron Tests", func() { err := reconciler.client.Update(context.TODO(), cron) Expect(err).ToNot(HaveOccurred()) dataSource = &cdiv1.DataSource{} - verifyConditions("After DesiredDigest is set", false, false, false, noImport, outdated, noImport) + verifyConditions("After DesiredDigest is set", false, false, false, noImport, outdated, noPvc) imports := cron.Status.CurrentImports Expect(imports).ToNot(BeNil()) @@ -165,12 +182,12 @@ var _ = Describe("All DataImportCron Tests", func() { dv.Status.Phase = cdiv1.ImportScheduled err = reconciler.client.Update(context.TODO(), dv) Expect(err).ToNot(HaveOccurred()) - verifyConditions("Import scheduled", false, false, false, scheduled, inProgress, noImport) + verifyConditions("Import scheduled", false, false, false, scheduled, inProgress, noPvc) dv.Status.Phase = cdiv1.ImportInProgress err = reconciler.client.Update(context.TODO(), dv) Expect(err).ToNot(HaveOccurred()) - verifyConditions("Import in progress", true, false, false, inProgress, inProgress, noImport) + verifyConditions("Import in progress", true, false, false, inProgress, inProgress, noPvc) Expect(cron.Status.LastExecutionTimestamp).ToNot(BeNil()) dv.Status.Phase = cdiv1.Succeeded diff --git a/pkg/controller/datasource-controller.go b/pkg/controller/datasource-controller.go new file mode 100644 index 000000000..472b0ca4c --- /dev/null +++ b/pkg/controller/datasource-controller.go @@ -0,0 +1,144 @@ +/* +Copyright 2022 The CDI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +limitations under the License. +See the License for the specific language governing permissions and +*/ + +package controller + +import ( + "context" + "fmt" + "reflect" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/record" + cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +// DataSourceReconciler members +type DataSourceReconciler struct { + client client.Client + recorder record.EventRecorder + scheme *runtime.Scheme + log logr.Logger + installerLabels map[string]string +} + +const ( + ready = "Ready" + noPvc = "NoPvc" +) + +// Reconcile loop for DataSourceReconciler +func (r *DataSourceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + dataSource := &cdiv1.DataSource{} + if err := r.client.Get(ctx, req.NamespacedName, dataSource); err != nil { + if k8serrors.IsNotFound(err) { + return reconcile.Result{}, nil + } + return reconcile.Result{}, err + } + if err := r.update(ctx, dataSource); err != nil { + return reconcile.Result{}, err + } + return reconcile.Result{}, nil +} + +func (r *DataSourceReconciler) update(ctx context.Context, dataSource *cdiv1.DataSource) error { + dataSourceCopy := dataSource.DeepCopy() + sourcePVC := dataSource.Spec.Source.PVC + if sourcePVC != nil { + dv := &cdiv1.DataVolume{} + if err := r.client.Get(ctx, types.NamespacedName{Namespace: sourcePVC.Namespace, Name: sourcePVC.Name}, dv); err != nil { + if k8serrors.IsNotFound(err) { + r.log.Info("DataVolume not found", "name", sourcePVC.Name) + updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "DataVolume not found", notFound) + } else { + return err + } + } else if dv.Status.Phase == cdiv1.Succeeded { + updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionTrue, "DataSource is ready to be consumed", ready) + } else { + updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dv.Status.Phase), string(dv.Status.Phase)) + } + } else { + updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "No source PVC set", noPvc) + } + if !reflect.DeepEqual(dataSource, dataSourceCopy) { + if err := r.client.Update(ctx, dataSource); err != nil { + return err + } + } + return nil +} + +func updateDataSourceCondition(ds *cdiv1.DataSource, conditionType cdiv1.DataSourceConditionType, status corev1.ConditionStatus, message, reason string) { + if condition := FindDataSourceConditionByType(ds, conditionType); condition != nil { + updateConditionState(&condition.ConditionState, status, message, reason) + } else { + condition = &cdiv1.DataSourceCondition{Type: conditionType} + updateConditionState(&condition.ConditionState, status, message, reason) + ds.Status.Conditions = append(ds.Status.Conditions, *condition) + } +} + +// FindDataSourceConditionByType finds DataSourceCondition by condition type +func FindDataSourceConditionByType(ds *cdiv1.DataSource, conditionType cdiv1.DataSourceConditionType) *cdiv1.DataSourceCondition { + for i, condition := range ds.Status.Conditions { + if condition.Type == conditionType { + return &ds.Status.Conditions[i] + } + } + return nil +} + +// NewDataSourceController creates a new instance of the DataSource controller +func NewDataSourceController(mgr manager.Manager, log logr.Logger, installerLabels map[string]string) (controller.Controller, error) { + reconciler := &DataSourceReconciler{ + client: mgr.GetClient(), + recorder: mgr.GetEventRecorderFor(dataImportControllerName), + scheme: mgr.GetScheme(), + log: log.WithName(dataImportControllerName), + installerLabels: installerLabels, + } + DataSourceController, err := controller.New(dataImportControllerName, mgr, controller.Options{Reconciler: reconciler}) + if err != nil { + return nil, err + } + if err := addDataSourceControllerWatches(mgr, DataSourceController, log); err != nil { + return nil, err + } + log.Info("Initialized DataSource controller") + return DataSourceController, nil +} + +func addDataSourceControllerWatches(mgr manager.Manager, c controller.Controller, log logr.Logger) error { + if err := cdiv1.AddToScheme(mgr.GetScheme()); err != nil { + return err + } + if err := c.Watch(&source.Kind{Type: &cdiv1.DataSource{}}, &handler.EnqueueRequestForObject{}); err != nil { + return err + } + return nil +} diff --git a/pkg/controller/datasource-controller_test.go b/pkg/controller/datasource-controller_test.go new file mode 100644 index 000000000..e14caf34e --- /dev/null +++ b/pkg/controller/datasource-controller_test.go @@ -0,0 +1,119 @@ +/* +Copyright 2022 The CDI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/scheme" + cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +const ( + dsName = "test-datasource" + pvcName = "test-pvc" +) + +var _ = Describe("All DataSource Tests", func() { + var _ = Describe("DataSource controller reconcile loop", func() { + var ( + reconciler *DataSourceReconciler + ds *cdiv1.DataSource + dsKey = types.NamespacedName{Name: dsName, Namespace: metav1.NamespaceDefault} + dsReq = reconcile.Request{NamespacedName: dsKey} + ) + + // verifyConditions reconciles, gets DataSource, and verifies its status conditions + var verifyConditions = func(step string, isReady bool, reasonReady string) { + By(step) + _, err := reconciler.Reconcile(context.TODO(), dsReq) + Expect(err).ToNot(HaveOccurred()) + err = reconciler.client.Get(context.TODO(), dsKey, ds) + Expect(err).ToNot(HaveOccurred()) + dsCond := FindDataSourceConditionByType(ds, cdiv1.DataSourceReady) + Expect(dsCond).ToNot(BeNil()) + verifyConditionState(string(cdiv1.DataSourceReady), dsCond.ConditionState, isReady, reasonReady) + } + + It("Should do nothing and return nil when no DataSource exists", func() { + reconciler = createDataSourceReconciler() + _, err := reconciler.Reconcile(context.TODO(), dsReq) + Expect(err).ToNot(HaveOccurred()) + }) + + It("Should update Ready condition when DataSource has no source pvc", func() { + ds = createDataSource() + reconciler = createDataSourceReconciler(ds) + verifyConditions("No source pvc", false, noPvc) + }) + + It("Should update Ready condition when DataSource has source pvc", func() { + ds = createDataSource() + ds.Spec.Source.PVC = &cdiv1.DataVolumeSourcePVC{Namespace: metav1.NamespaceDefault, Name: pvcName} + reconciler = createDataSourceReconciler(ds) + verifyConditions("Source DV does not exist", false, notFound) + + dv := newImportDataVolume(pvcName) + err := reconciler.client.Create(context.TODO(), dv) + Expect(err).ToNot(HaveOccurred()) + + dv.Status.Phase = cdiv1.ImportInProgress + err = reconciler.client.Update(context.TODO(), dv) + Expect(err).ToNot(HaveOccurred()) + verifyConditions("Source DV ImportInProgress", false, string(dv.Status.Phase)) + + dv.Status.Phase = cdiv1.Succeeded + err = reconciler.client.Update(context.TODO(), dv) + Expect(err).ToNot(HaveOccurred()) + verifyConditions("Source DV Succeeded", true, ready) + + err = reconciler.client.Delete(context.TODO(), dv) + Expect(err).ToNot(HaveOccurred()) + verifyConditions("Source DV Deleted", false, notFound) + }) + }) +}) + +func createDataSourceReconciler(objects ...runtime.Object) *DataSourceReconciler { + s := scheme.Scheme + cdiv1.AddToScheme(s) + cl := fake.NewFakeClientWithScheme(s, objects...) + r := &DataSourceReconciler{ + client: cl, + scheme: s, + log: cronLog, + } + return r +} + +func createDataSource() *cdiv1.DataSource { + return &cdiv1.DataSource{ + TypeMeta: metav1.TypeMeta{APIVersion: cdiv1.SchemeGroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{ + Name: dsName, + Namespace: metav1.NamespaceDefault, + }, + } +}