mirror of
https://github.com/kubevirt/containerized-data-importer.git
synced 2025-06-03 06:30:22 +00:00
Add DataSource controller to update the Ready condition (#2085)
even when there is no DataImportCron associated Signed-off-by: Arnon Gilboa <agilboa@redhat.com>
This commit is contained in:
parent
ebe8a09c7a
commit
d77abc3fa9
@ -209,6 +209,10 @@ func start(ctx context.Context, cfg *rest.Config) {
|
||||
klog.Errorf("Unable to setup dataimportcron controller: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
if _, err := controller.NewDataSourceController(mgr, log, installerLabels); err != nil {
|
||||
klog.Errorf("Unable to setup datasource controller: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
klog.V(1).Infoln("created cdi controllers")
|
||||
|
||||
|
@ -7,6 +7,7 @@ go_library(
|
||||
"config-controller.go",
|
||||
"dataimportcron-conditions.go",
|
||||
"dataimportcron-controller.go",
|
||||
"datasource-controller.go",
|
||||
"datavolume-conditions.go",
|
||||
"datavolume-controller.go",
|
||||
"import-controller.go",
|
||||
|
@ -29,9 +29,7 @@ const (
|
||||
outdated = "Outdated"
|
||||
scheduled = "ImportScheduled"
|
||||
inProgress = "ImportProgressing"
|
||||
failed = "ImportFailed"
|
||||
upToDate = "UpToDate"
|
||||
ready = "Ready"
|
||||
)
|
||||
|
||||
func updateDataImportCronCondition(cron *cdiv1.DataImportCron, conditionType cdiv1.DataImportCronConditionType, status corev1.ConditionStatus, message, reason string) {
|
||||
@ -61,26 +59,6 @@ func FindDataImportCronConditionByType(cron *cdiv1.DataImportCron, conditionType
|
||||
return nil
|
||||
}
|
||||
|
||||
func updateDataSourceCondition(ds *cdiv1.DataSource, conditionType cdiv1.DataSourceConditionType, status corev1.ConditionStatus, message, reason string) {
|
||||
if condition := FindDataSourceConditionByType(ds, conditionType); condition != nil {
|
||||
updateConditionState(&condition.ConditionState, status, message, reason)
|
||||
} else {
|
||||
condition = &cdiv1.DataSourceCondition{Type: conditionType}
|
||||
updateConditionState(&condition.ConditionState, status, message, reason)
|
||||
ds.Status.Conditions = append(ds.Status.Conditions, *condition)
|
||||
}
|
||||
}
|
||||
|
||||
// FindDataSourceConditionByType finds DataSourceCondition by condition type
|
||||
func FindDataSourceConditionByType(ds *cdiv1.DataSource, conditionType cdiv1.DataSourceConditionType) *cdiv1.DataSourceCondition {
|
||||
for i, condition := range ds.Status.Conditions {
|
||||
if condition.Type == conditionType {
|
||||
return &ds.Status.Conditions[i]
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func updateConditionState(condition *cdiv1.ConditionState, status corev1.ConditionStatus, message, reason string) {
|
||||
conditionStatusUpdated := condition.Status != status
|
||||
conditionUpdated := conditionStatusUpdated || condition.Message != message || condition.Reason != reason
|
||||
|
@ -338,22 +338,7 @@ func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImp
|
||||
|
||||
sourcePVC := dataImportCron.Status.LastImportedPVC
|
||||
if sourcePVC != nil {
|
||||
dv := &cdiv1.DataVolume{}
|
||||
if err := r.client.Get(ctx, types.NamespacedName{Namespace: sourcePVC.Namespace, Name: sourcePVC.Name}, dv); err != nil {
|
||||
if k8serrors.IsNotFound(err) {
|
||||
log.Info("DataVolume not found", "name", sourcePVC.Name)
|
||||
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "DataVolume not found", notFound)
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
} else if dv.Status.Phase == cdiv1.Succeeded {
|
||||
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionTrue, "DataSource is ready to be consumed", ready)
|
||||
} else {
|
||||
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dv.Status.Phase), string(dv.Status.Phase))
|
||||
}
|
||||
dataSource.Spec.Source.PVC = sourcePVC
|
||||
} else {
|
||||
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "No imports yet", noImport)
|
||||
}
|
||||
if !reflect.DeepEqual(dataSource, dataSourceCopy) {
|
||||
if err := r.client.Update(ctx, dataSource); err != nil {
|
||||
|
@ -57,12 +57,13 @@ const (
|
||||
var _ = Describe("All DataImportCron Tests", func() {
|
||||
var _ = Describe("DataImportCron controller reconcile loop", func() {
|
||||
var (
|
||||
reconciler *DataImportCronReconciler
|
||||
cron *cdiv1.DataImportCron
|
||||
dataSource *cdiv1.DataSource
|
||||
cronKey = types.NamespacedName{Name: cronName, Namespace: metav1.NamespaceDefault}
|
||||
cronReq = reconcile.Request{NamespacedName: cronKey}
|
||||
cronJobKey = func(cron *cdiv1.DataImportCron) types.NamespacedName {
|
||||
reconciler *DataImportCronReconciler
|
||||
dsReconciler *DataSourceReconciler
|
||||
cron *cdiv1.DataImportCron
|
||||
dataSource *cdiv1.DataSource
|
||||
cronKey = types.NamespacedName{Name: cronName, Namespace: metav1.NamespaceDefault}
|
||||
cronReq = reconcile.Request{NamespacedName: cronKey}
|
||||
cronJobKey = func(cron *cdiv1.DataImportCron) types.NamespacedName {
|
||||
return types.NamespacedName{Name: GetCronJobName(cron), Namespace: reconciler.cdiNamespace}
|
||||
}
|
||||
dataSourceKey = func(cron *cdiv1.DataImportCron) types.NamespacedName {
|
||||
@ -87,8 +88,24 @@ var _ = Describe("All DataImportCron Tests", func() {
|
||||
Expect(cronCond).ToNot(BeNil())
|
||||
verifyConditionState(string(cdiv1.DataImportCronUpToDate), cronCond.ConditionState, isUpToDate, reasonUpToDate)
|
||||
if dataSource != nil {
|
||||
imports := cron.Status.CurrentImports
|
||||
Expect(imports).ToNot(BeNil())
|
||||
Expect(len(imports)).ToNot(BeZero())
|
||||
dvName := imports[0].DataVolumeName
|
||||
Expect(dvName).ToNot(BeEmpty())
|
||||
|
||||
dv := &cdiv1.DataVolume{}
|
||||
err = reconciler.client.Get(context.TODO(), dvKey(dvName), dv)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = reconciler.client.Get(context.TODO(), dataSourceKey(cron), dataSource)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
dsReconciler = createDataSourceReconciler(dataSource, dv)
|
||||
dsReq := reconcile.Request{NamespacedName: dataSourceKey(cron)}
|
||||
_, err = dsReconciler.Reconcile(context.TODO(), dsReq)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
err = dsReconciler.client.Get(context.TODO(), dataSourceKey(cron), dataSource)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
dsCond := FindDataSourceConditionByType(dataSource, cdiv1.DataSourceReady)
|
||||
Expect(dsCond).ToNot(BeNil())
|
||||
verifyConditionState(string(cdiv1.DataSourceReady), dsCond.ConditionState, isReady, reasonReady)
|
||||
@ -147,7 +164,7 @@ var _ = Describe("All DataImportCron Tests", func() {
|
||||
err := reconciler.client.Update(context.TODO(), cron)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
dataSource = &cdiv1.DataSource{}
|
||||
verifyConditions("After DesiredDigest is set", false, false, false, noImport, outdated, noImport)
|
||||
verifyConditions("After DesiredDigest is set", false, false, false, noImport, outdated, noPvc)
|
||||
|
||||
imports := cron.Status.CurrentImports
|
||||
Expect(imports).ToNot(BeNil())
|
||||
@ -165,12 +182,12 @@ var _ = Describe("All DataImportCron Tests", func() {
|
||||
dv.Status.Phase = cdiv1.ImportScheduled
|
||||
err = reconciler.client.Update(context.TODO(), dv)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
verifyConditions("Import scheduled", false, false, false, scheduled, inProgress, noImport)
|
||||
verifyConditions("Import scheduled", false, false, false, scheduled, inProgress, noPvc)
|
||||
|
||||
dv.Status.Phase = cdiv1.ImportInProgress
|
||||
err = reconciler.client.Update(context.TODO(), dv)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
verifyConditions("Import in progress", true, false, false, inProgress, inProgress, noImport)
|
||||
verifyConditions("Import in progress", true, false, false, inProgress, inProgress, noPvc)
|
||||
Expect(cron.Status.LastExecutionTimestamp).ToNot(BeNil())
|
||||
|
||||
dv.Status.Phase = cdiv1.Succeeded
|
||||
|
144
pkg/controller/datasource-controller.go
Normal file
144
pkg/controller/datasource-controller.go
Normal file
@ -0,0 +1,144 @@
|
||||
/*
|
||||
Copyright 2022 The CDI Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
limitations under the License.
|
||||
See the License for the specific language governing permissions and
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/tools/record"
|
||||
cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller"
|
||||
"sigs.k8s.io/controller-runtime/pkg/handler"
|
||||
"sigs.k8s.io/controller-runtime/pkg/manager"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
"sigs.k8s.io/controller-runtime/pkg/source"
|
||||
)
|
||||
|
||||
// DataSourceReconciler members
|
||||
type DataSourceReconciler struct {
|
||||
client client.Client
|
||||
recorder record.EventRecorder
|
||||
scheme *runtime.Scheme
|
||||
log logr.Logger
|
||||
installerLabels map[string]string
|
||||
}
|
||||
|
||||
const (
|
||||
ready = "Ready"
|
||||
noPvc = "NoPvc"
|
||||
)
|
||||
|
||||
// Reconcile loop for DataSourceReconciler
|
||||
func (r *DataSourceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
|
||||
dataSource := &cdiv1.DataSource{}
|
||||
if err := r.client.Get(ctx, req.NamespacedName, dataSource); err != nil {
|
||||
if k8serrors.IsNotFound(err) {
|
||||
return reconcile.Result{}, nil
|
||||
}
|
||||
return reconcile.Result{}, err
|
||||
}
|
||||
if err := r.update(ctx, dataSource); err != nil {
|
||||
return reconcile.Result{}, err
|
||||
}
|
||||
return reconcile.Result{}, nil
|
||||
}
|
||||
|
||||
func (r *DataSourceReconciler) update(ctx context.Context, dataSource *cdiv1.DataSource) error {
|
||||
dataSourceCopy := dataSource.DeepCopy()
|
||||
sourcePVC := dataSource.Spec.Source.PVC
|
||||
if sourcePVC != nil {
|
||||
dv := &cdiv1.DataVolume{}
|
||||
if err := r.client.Get(ctx, types.NamespacedName{Namespace: sourcePVC.Namespace, Name: sourcePVC.Name}, dv); err != nil {
|
||||
if k8serrors.IsNotFound(err) {
|
||||
r.log.Info("DataVolume not found", "name", sourcePVC.Name)
|
||||
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "DataVolume not found", notFound)
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
} else if dv.Status.Phase == cdiv1.Succeeded {
|
||||
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionTrue, "DataSource is ready to be consumed", ready)
|
||||
} else {
|
||||
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dv.Status.Phase), string(dv.Status.Phase))
|
||||
}
|
||||
} else {
|
||||
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "No source PVC set", noPvc)
|
||||
}
|
||||
if !reflect.DeepEqual(dataSource, dataSourceCopy) {
|
||||
if err := r.client.Update(ctx, dataSource); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func updateDataSourceCondition(ds *cdiv1.DataSource, conditionType cdiv1.DataSourceConditionType, status corev1.ConditionStatus, message, reason string) {
|
||||
if condition := FindDataSourceConditionByType(ds, conditionType); condition != nil {
|
||||
updateConditionState(&condition.ConditionState, status, message, reason)
|
||||
} else {
|
||||
condition = &cdiv1.DataSourceCondition{Type: conditionType}
|
||||
updateConditionState(&condition.ConditionState, status, message, reason)
|
||||
ds.Status.Conditions = append(ds.Status.Conditions, *condition)
|
||||
}
|
||||
}
|
||||
|
||||
// FindDataSourceConditionByType finds DataSourceCondition by condition type
|
||||
func FindDataSourceConditionByType(ds *cdiv1.DataSource, conditionType cdiv1.DataSourceConditionType) *cdiv1.DataSourceCondition {
|
||||
for i, condition := range ds.Status.Conditions {
|
||||
if condition.Type == conditionType {
|
||||
return &ds.Status.Conditions[i]
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// NewDataSourceController creates a new instance of the DataSource controller
|
||||
func NewDataSourceController(mgr manager.Manager, log logr.Logger, installerLabels map[string]string) (controller.Controller, error) {
|
||||
reconciler := &DataSourceReconciler{
|
||||
client: mgr.GetClient(),
|
||||
recorder: mgr.GetEventRecorderFor(dataImportControllerName),
|
||||
scheme: mgr.GetScheme(),
|
||||
log: log.WithName(dataImportControllerName),
|
||||
installerLabels: installerLabels,
|
||||
}
|
||||
DataSourceController, err := controller.New(dataImportControllerName, mgr, controller.Options{Reconciler: reconciler})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := addDataSourceControllerWatches(mgr, DataSourceController, log); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
log.Info("Initialized DataSource controller")
|
||||
return DataSourceController, nil
|
||||
}
|
||||
|
||||
func addDataSourceControllerWatches(mgr manager.Manager, c controller.Controller, log logr.Logger) error {
|
||||
if err := cdiv1.AddToScheme(mgr.GetScheme()); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.Watch(&source.Kind{Type: &cdiv1.DataSource{}}, &handler.EnqueueRequestForObject{}); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
119
pkg/controller/datasource-controller_test.go
Normal file
119
pkg/controller/datasource-controller_test.go
Normal file
@ -0,0 +1,119 @@
|
||||
/*
|
||||
Copyright 2022 The CDI Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client/fake"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
)
|
||||
|
||||
const (
|
||||
dsName = "test-datasource"
|
||||
pvcName = "test-pvc"
|
||||
)
|
||||
|
||||
var _ = Describe("All DataSource Tests", func() {
|
||||
var _ = Describe("DataSource controller reconcile loop", func() {
|
||||
var (
|
||||
reconciler *DataSourceReconciler
|
||||
ds *cdiv1.DataSource
|
||||
dsKey = types.NamespacedName{Name: dsName, Namespace: metav1.NamespaceDefault}
|
||||
dsReq = reconcile.Request{NamespacedName: dsKey}
|
||||
)
|
||||
|
||||
// verifyConditions reconciles, gets DataSource, and verifies its status conditions
|
||||
var verifyConditions = func(step string, isReady bool, reasonReady string) {
|
||||
By(step)
|
||||
_, err := reconciler.Reconcile(context.TODO(), dsReq)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
err = reconciler.client.Get(context.TODO(), dsKey, ds)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
dsCond := FindDataSourceConditionByType(ds, cdiv1.DataSourceReady)
|
||||
Expect(dsCond).ToNot(BeNil())
|
||||
verifyConditionState(string(cdiv1.DataSourceReady), dsCond.ConditionState, isReady, reasonReady)
|
||||
}
|
||||
|
||||
It("Should do nothing and return nil when no DataSource exists", func() {
|
||||
reconciler = createDataSourceReconciler()
|
||||
_, err := reconciler.Reconcile(context.TODO(), dsReq)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
})
|
||||
|
||||
It("Should update Ready condition when DataSource has no source pvc", func() {
|
||||
ds = createDataSource()
|
||||
reconciler = createDataSourceReconciler(ds)
|
||||
verifyConditions("No source pvc", false, noPvc)
|
||||
})
|
||||
|
||||
It("Should update Ready condition when DataSource has source pvc", func() {
|
||||
ds = createDataSource()
|
||||
ds.Spec.Source.PVC = &cdiv1.DataVolumeSourcePVC{Namespace: metav1.NamespaceDefault, Name: pvcName}
|
||||
reconciler = createDataSourceReconciler(ds)
|
||||
verifyConditions("Source DV does not exist", false, notFound)
|
||||
|
||||
dv := newImportDataVolume(pvcName)
|
||||
err := reconciler.client.Create(context.TODO(), dv)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
dv.Status.Phase = cdiv1.ImportInProgress
|
||||
err = reconciler.client.Update(context.TODO(), dv)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
verifyConditions("Source DV ImportInProgress", false, string(dv.Status.Phase))
|
||||
|
||||
dv.Status.Phase = cdiv1.Succeeded
|
||||
err = reconciler.client.Update(context.TODO(), dv)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
verifyConditions("Source DV Succeeded", true, ready)
|
||||
|
||||
err = reconciler.client.Delete(context.TODO(), dv)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
verifyConditions("Source DV Deleted", false, notFound)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
func createDataSourceReconciler(objects ...runtime.Object) *DataSourceReconciler {
|
||||
s := scheme.Scheme
|
||||
cdiv1.AddToScheme(s)
|
||||
cl := fake.NewFakeClientWithScheme(s, objects...)
|
||||
r := &DataSourceReconciler{
|
||||
client: cl,
|
||||
scheme: s,
|
||||
log: cronLog,
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
func createDataSource() *cdiv1.DataSource {
|
||||
return &cdiv1.DataSource{
|
||||
TypeMeta: metav1.TypeMeta{APIVersion: cdiv1.SchemeGroupVersion.String()},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: dsName,
|
||||
Namespace: metav1.NamespaceDefault,
|
||||
},
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user