Allow snapshots as format for DataImportCron created sources (#2700)

* StorageProfile API for declaring format of resulting cron disk images

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>

* Integrate recommended format in dataimportcron controller

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>

* Take snapclass existence into consideration when populating cloneStrategy and sourceFormat

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>

---------

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>
This commit is contained in:
akalenyu 2023-06-08 18:29:01 +03:00 committed by GitHub
parent fab858e2fa
commit 33c55a5560
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 1405 additions and 357 deletions

View File

@ -27208,6 +27208,13 @@ func schema_pkg_apis_core_v1beta1_StorageProfileSpec(ref common.ReferenceCallbac
},
},
},
"dataImportCronSourceFormat": {
SchemaProps: spec.SchemaProps{
Description: "DataImportCronSourceFormat defines the format of the DataImportCron-created disk image sources",
Type: []string{"string"},
Format: "",
},
},
},
},
},
@ -27258,6 +27265,13 @@ func schema_pkg_apis_core_v1beta1_StorageProfileStatus(ref common.ReferenceCallb
},
},
},
"dataImportCronSourceFormat": {
SchemaProps: spec.SchemaProps{
Description: "DataImportCronSourceFormat defines the format of the DataImportCron-created disk image sources",
Type: []string{"string"},
Format: "",
},
},
},
},
},

View File

@ -54,9 +54,7 @@ func (p *SnapshotClonePhase) Reconcile(ctx context.Context) (*reconcile.Result,
return nil, fmt.Errorf("source snapshot does not exist")
}
if snapshot.Status == nil ||
snapshot.Status.ReadyToUse == nil ||
!*snapshot.Status.ReadyToUse {
if !cc.IsSnapshotReady(snapshot) {
return &reconcile.Result{}, nil
}

View File

@ -22,6 +22,7 @@ go_library(
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/storage/v1:go_default_library",
"//vendor/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",

View File

@ -38,6 +38,7 @@ import (
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
@ -1660,3 +1661,77 @@ func ValidateSnapshotCloneProvisioners(ctx context.Context, c client.Client, sna
// converting volume mode is possible but has security implications
return true, nil
}
// GetSnapshotClassForSmartClone looks up the snapshot class based on the storage class
func GetSnapshotClassForSmartClone(dvName string, targetPvcStorageClassName *string, log logr.Logger, client client.Client) (string, error) {
logger := log.WithName("GetSnapshotClassForSmartClone").V(3)
// Check if relevant CRDs are available
if !isCsiCrdsDeployed(client, log) {
logger.Info("Missing CSI snapshotter CRDs, falling back to host assisted clone")
return "", nil
}
targetStorageClass, err := GetStorageClassByName(context.TODO(), client, targetPvcStorageClassName)
if err != nil {
return "", err
}
if targetStorageClass == nil {
logger.Info("Target PVC's Storage Class not found")
return "", nil
}
// List the snapshot classes
scs := &snapshotv1.VolumeSnapshotClassList{}
if err := client.List(context.TODO(), scs); err != nil {
logger.Info("Cannot list snapshot classes, falling back to host assisted clone")
return "", err
}
for _, snapshotClass := range scs.Items {
// Validate association between snapshot class and storage class
if snapshotClass.Driver == targetStorageClass.Provisioner {
logger.Info("smart-clone is applicable for datavolume", "datavolume",
dvName, "snapshot class", snapshotClass.Name)
return snapshotClass.Name, nil
}
}
logger.Info("Could not match snapshotter with storage class, falling back to host assisted clone")
return "", nil
}
// isCsiCrdsDeployed checks whether the CSI snapshotter CRD are deployed
func isCsiCrdsDeployed(c client.Client, log logr.Logger) bool {
version := "v1"
vsClass := "volumesnapshotclasses." + snapshotv1.GroupName
vsContent := "volumesnapshotcontents." + snapshotv1.GroupName
vs := "volumesnapshots." + snapshotv1.GroupName
return isCrdDeployed(c, vsClass, version, log) &&
isCrdDeployed(c, vsContent, version, log) &&
isCrdDeployed(c, vs, version, log)
}
// isCrdDeployed checks whether a CRD is deployed
func isCrdDeployed(c client.Client, name, version string, log logr.Logger) bool {
crd := &extv1.CustomResourceDefinition{}
err := c.Get(context.TODO(), types.NamespacedName{Name: name}, crd)
if err != nil {
if !k8serrors.IsNotFound(err) {
log.Info("Error looking up CRD", "crd name", name, "version", version, "error", err)
}
return false
}
for _, v := range crd.Spec.Versions {
if v.Name == version && v.Served {
return true
}
}
return false
}
// IsSnapshotReady indicates if a volume snapshot is ready to be used
func IsSnapshotReady(snapshot *snapshotv1.VolumeSnapshot) bool {
return snapshot.Status != nil && snapshot.Status.ReadyToUse != nil && *snapshot.Status.ReadyToUse
}

View File

@ -32,9 +32,12 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
@ -106,6 +109,8 @@ const (
AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
// AnnLastUseTime is the PVC last use time stamp
AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
// AnnStorageClass is the cron DV's storage class
AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"
dataImportControllerName = "dataimportcron-controller"
digestPrefix = "sha256:"
@ -311,13 +316,43 @@ func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *c
dataImportCronCopy := dataImportCron.DeepCopy()
imports := dataImportCron.Status.CurrentImports
importSucceeded := false
dataVolume := dataImportCron.Spec.Template
explicitScName := getStorageClassFromTemplate(&dataVolume)
dvStorageClass, err := cc.GetStorageClassByName(ctx, r.client, explicitScName)
if err != nil {
return res, err
}
if dvStorageClass != nil {
cc.AddAnnotation(dataImportCron, AnnStorageClass, dvStorageClass.Name)
}
format, err := r.getSourceFormat(ctx, dataImportCron, dvStorageClass)
if err != nil {
return res, err
}
snapshot, err := r.getSnapshot(ctx, dataImportCron, format)
if err != nil {
return res, err
}
handlePopulatedPvc := func() error {
if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
return err
}
importSucceeded = true
if err := r.handleCronFormat(ctx, dataImportCron, format, dvStorageClass); err != nil {
return err
}
return nil
}
if dv != nil {
switch dv.Status.Phase {
case cdiv1.Succeeded:
if err = r.updatePvc(ctx, dataImportCron, pvc); err != nil {
if err := handlePopulatedPvc(); err != nil {
return res, err
}
importSucceeded = true
case cdiv1.ImportScheduled:
updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
case cdiv1.ImportInProgress:
@ -327,7 +362,12 @@ func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *c
updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
}
} else if pvc != nil {
if err = r.updatePvc(ctx, dataImportCron, pvc); err != nil {
// TODO: with plain populator PVCs (no DataVolumes) we may need to wait for corev1.Bound
if err := handlePopulatedPvc(); err != nil {
return res, err
}
} else if snapshot != nil {
if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
return res, err
}
importSucceeded = true
@ -348,14 +388,14 @@ func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *c
}
}
if err := r.updateDataSource(ctx, dataImportCron); err != nil {
if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
return res, err
}
// We use the poller returned reconcile.Result for RequeueAfter if needed
// skip if we disabled schedule
if isImageStreamSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
res, err = r.pollImageStreamDigest(ctx, dataImportCron)
res, err := r.pollImageStreamDigest(ctx, dataImportCron)
if err != nil {
return res, err
}
@ -376,7 +416,9 @@ func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *c
}
}
} else if importSucceeded {
updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
if err := r.updateDataImportCronSuccessCondition(ctx, dataImportCron, format, snapshot); err != nil {
return res, err
}
} else if len(imports) > 0 {
updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
} else {
@ -421,12 +463,35 @@ func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdi
return dv, pvc, nil
}
func (r *DataImportCronReconciler) updatePvc(ctx context.Context, cron *cdiv1.DataImportCron, pvc *corev1.PersistentVolumeClaim) error {
pvcCopy := pvc.DeepCopy()
cc.AddAnnotation(pvc, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
r.setDataImportCronResourceLabels(cron, pvc)
if !reflect.DeepEqual(pvc, pvcCopy) {
if err := r.client.Update(ctx, pvc); err != nil {
// Returns the current import DV if exists, and the last imported PVC
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) (*snapshotv1.VolumeSnapshot, error) {
if format != cdiv1.DataImportCronSourceFormatSnapshot {
return nil, nil
}
imports := cron.Status.CurrentImports
if len(imports) == 0 {
return nil, nil
}
snapName := imports[0].DataVolumeName
snapshot := &snapshotv1.VolumeSnapshot{}
if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
if !k8serrors.IsNotFound(err) {
return nil, err
}
return nil, nil
}
return snapshot, nil
}
func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
objCopy := obj.DeepCopyObject()
cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
r.setDataImportCronResourceLabels(cron, obj)
if !reflect.DeepEqual(obj, objCopy) {
if err := r.client.Update(ctx, obj); err != nil {
return err
}
}
@ -478,7 +543,7 @@ func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Co
return nil
}
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
log := r.log.WithName("updateDataSource")
dataSourceName := dataImportCron.Spec.ManagedDataSource
dataSource := &cdiv1.DataSource{}
@ -506,17 +571,37 @@ func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImp
passCronLabelToDataSource(dataImportCron, dataSource, cc.LabelDefaultPreferenceKind)
sourcePVC := dataImportCron.Status.LastImportedPVC
if sourcePVC != nil {
dataSource.Spec.Source.PVC = sourcePVC
}
populateDataSource(format, dataSource, sourcePVC)
if !reflect.DeepEqual(dataSource, dataSourceCopy) {
if err := r.client.Update(ctx, dataSource); err != nil {
return err
}
}
return nil
}
func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
if sourcePVC == nil {
return
}
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
dataSource.Spec.Source = cdiv1.DataSourceSource{
PVC: sourcePVC,
}
case cdiv1.DataImportCronSourceFormatSnapshot:
dataSource.Spec.Source = cdiv1.DataSourceSource{
Snapshot: &cdiv1.DataVolumeSourceSnapshot{
Namespace: sourcePVC.Namespace,
Name: sourcePVC.Name,
},
}
}
}
func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
if dataImportCron.Status.CurrentImports == nil {
return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
@ -558,22 +643,145 @@ func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, d
if err != nil {
return err
}
// If PVC exists don't create DV
pvc := &corev1.PersistentVolumeClaim{}
if err = r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, pvc); err != nil {
dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
for _, src := range sources {
if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
return err
}
} else {
if err := r.updateSource(ctx, dataImportCron, src); err != nil {
return err
}
// If source exists don't create DV
return nil
}
}
dv := r.newSourceDataVolume(dataImportCron, dvName)
if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
return err
}
return nil
}
func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, dvStorageClass *storagev1.StorageClass) error {
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
return nil
case cdiv1.DataImportCronSourceFormatSnapshot:
return r.handleSnapshot(ctx, dataImportCron, &dataImportCron.Spec.Template, dvStorageClass)
default:
return fmt.Errorf("unknown source format for snapshot")
}
}
func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, dataVolume *cdiv1.DataVolume, dvStorageClass *storagev1.StorageClass) error {
dataSourceName := dataImportCron.Spec.ManagedDataSource
digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
if digest == "" {
return nil
}
dvName, err := createDvName(dataSourceName, digest)
if err != nil {
return err
}
className, err := cc.GetSnapshotClassForSmartClone(dataVolume.Name, &dvStorageClass.Name, r.log, r.client)
if err != nil {
return err
}
labels := map[string]string{
common.CDILabelKey: common.CDILabelValue,
common.CDIComponentLabel: "",
}
desiredSnapshot := &snapshotv1.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: dvName,
Namespace: dataImportCron.Namespace,
Labels: labels,
},
Spec: snapshotv1.VolumeSnapshotSpec{
Source: snapshotv1.VolumeSnapshotSource{
PersistentVolumeClaimName: &dvName,
},
VolumeSnapshotClassName: &className,
},
}
r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)
currentSnapshot := &snapshotv1.VolumeSnapshot{}
if err := r.client.Get(context.TODO(), client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
if !k8serrors.IsNotFound(err) {
return err
}
dv := r.newSourceDataVolume(dataImportCron, dvName)
if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
if err := r.client.Create(ctx, desiredSnapshot); err != nil {
return err
}
} else {
if err := r.updatePvc(ctx, dataImportCron, pvc); err != nil {
return err
if cc.IsSnapshotReady(currentSnapshot) {
// Clean up PVC as that is not needed any more
pvc := &corev1.PersistentVolumeClaim{ObjectMeta: metav1.ObjectMeta{Name: desiredSnapshot.Name, Namespace: desiredSnapshot.Namespace}}
if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
return err
}
}
}
dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}
return nil
}
func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
case cdiv1.DataImportCronSourceFormatSnapshot:
if snapshot == nil {
// Snapshot create/update will trigger reconcile
return nil
}
if cc.IsSnapshotReady(snapshot) {
updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
} else {
updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
}
default:
return fmt.Errorf("unknown source format for snapshot")
}
return nil
}
func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, dvStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
format := cdiv1.DataImportCronSourceFormatPvc
if dvStorageClass == nil {
return format, nil
}
storageProfile := &cdiv1.StorageProfile{}
if err := r.client.Get(context.TODO(), types.NamespacedName{Name: dvStorageClass.Name}, storageProfile); err != nil {
return format, err
}
if storageProfile.Status.DataImportCronSourceFormat != nil {
format = *storageProfile.Status.DataImportCronSourceFormat
}
return format, nil
}
func getStorageClassFromTemplate(dataVolume *cdiv1.DataVolume) *string {
if dataVolume.Spec.PVC != nil {
return dataVolume.Spec.PVC.StorageClassName
}
if dataVolume.Spec.Storage != nil {
return dataVolume.Spec.Storage.StorageClassName
}
return nil
}
@ -592,8 +800,20 @@ func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context,
maxImports = int(*cron.Spec.ImportsToKeep)
}
if err := r.garbageCollectPVCs(ctx, cron.Namespace, selector, maxImports); err != nil {
return err
}
if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
return err
}
return nil
}
func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
pvcList := &corev1.PersistentVolumeClaimList{}
if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}); err != nil {
if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
return err
}
if len(pvcList.Items) > maxImports {
@ -612,6 +832,30 @@ func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context,
}
}
}
return nil
}
func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
snapList := &snapshotv1.VolumeSnapshotList{}
if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
if meta.IsNoMatchError(err) {
return nil
}
return err
}
if len(snapList.Items) > maxImports {
sort.Slice(snapList.Items, func(i, j int) bool {
return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
})
for _, snap := range snapList.Items[maxImports:] {
if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
return err
}
}
}
return nil
}
@ -635,6 +879,9 @@ func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.Names
if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
return err
}
if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
return err
}
return nil
}
@ -705,24 +952,40 @@ func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Contro
getCronName := func(obj client.Object) string {
return obj.GetLabels()[common.DataImportCronLabel]
}
mapToCron := func(obj client.Object) []reconcile.Request {
mapSourceObjectToCron := func(obj client.Object) []reconcile.Request {
if cronName := getCronName(obj); cronName != "" {
return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
}
return nil
}
if err := c.Watch(&source.Kind{Type: &cdiv1.DataVolume{}},
handler.EnqueueRequestsFromMapFunc(mapToCron),
predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool { return false },
UpdateFunc: func(e event.UpdateEvent) bool { return getCronName(e.ObjectNew) != "" },
DeleteFunc: func(e event.DeleteEvent) bool { return getCronName(e.Object) != "" },
},
); err != nil {
return err
mapStorageProfileToCron := func(obj client.Object) (reqs []reconcile.Request) {
// TODO: Get rid of this after at least one version; use indexer on storage class annotation instead
// Otherwise we risk losing the storage profile event
var crons cdiv1.DataImportCronList
if err := mgr.GetClient().List(context.TODO(), &crons); err != nil {
c.GetLogger().Error(err, "Unable to list DataImportCrons")
return
}
// Storage profiles are 1:1 to storage classes
scName := obj.GetName()
for _, cron := range crons.Items {
dataVolume := cron.Spec.Template
explicitScName := getStorageClassFromTemplate(&dataVolume)
templateSc, err := cc.GetStorageClassByName(context.TODO(), mgr.GetClient(), explicitScName)
if err != nil || templateSc == nil {
c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
return
}
if templateSc.Name == scName {
reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
}
}
return
}
if err := c.Watch(&source.Kind{Type: &cdiv1.DataSource{}},
handler.EnqueueRequestsFromMapFunc(mapToCron),
if err := c.Watch(&source.Kind{Type: &cdiv1.DataVolume{}},
handler.EnqueueRequestsFromMapFunc(mapSourceObjectToCron),
predicate.Funcs{
CreateFunc: func(event.CreateEvent) bool { return false },
UpdateFunc: func(e event.UpdateEvent) bool { return getCronName(e.ObjectNew) != "" },
@ -731,6 +994,33 @@ func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Contro
); err != nil {
return err
}
if err := c.Watch(&source.Kind{Type: &cdiv1.DataSource{}},
handler.EnqueueRequestsFromMapFunc(mapSourceObjectToCron),
predicate.Funcs{
CreateFunc: func(event.CreateEvent) bool { return false },
UpdateFunc: func(e event.UpdateEvent) bool { return getCronName(e.ObjectNew) != "" },
DeleteFunc: func(e event.DeleteEvent) bool { return getCronName(e.Object) != "" },
},
); err != nil {
return err
}
if err := c.Watch(&source.Kind{Type: &cdiv1.StorageProfile{}},
handler.EnqueueRequestsFromMapFunc(mapStorageProfileToCron),
predicate.Funcs{
CreateFunc: func(event.CreateEvent) bool { return true },
DeleteFunc: func(event.DeleteEvent) bool { return false },
UpdateFunc: func(e event.UpdateEvent) bool {
profileOld, okOld := e.ObjectOld.(*cdiv1.StorageProfile)
profileNew, okNew := e.ObjectNew.(*cdiv1.StorageProfile)
return okOld && okNew && profileOld.Status.DataImportCronSourceFormat != profileNew.Status.DataImportCronSourceFormat
},
},
); err != nil {
return err
}
return nil
}
@ -778,7 +1068,7 @@ func InitPollerPodSpec(c client.Client, cron *cdiv1.DataImportCron, podSpec *cor
return errors.Errorf("No URL source in cron %s", cron.Name)
}
cdiConfig := &cdiv1.CDIConfig{}
if err = c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
return err
}
insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
@ -941,7 +1231,7 @@ func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCro
if isURLSource(cron) {
digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
} else if isImageStreamSource(cron) {
// No way to import image stream by name when we want speciific digest, so we use its docker reference
// No way to import image stream by name when we want specific digest, so we use its docker reference
digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
dv.Spec.Source.Registry.ImageStream = nil
}

View File

@ -24,6 +24,7 @@ import (
"time"
"github.com/google/uuid"
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
. "github.com/onsi/ginkgo"
. "github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"
@ -32,6 +33,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -111,7 +113,7 @@ var _ = Describe("All DataImportCron Tests", func() {
)
// verifyConditions reconciles, gets DataImportCron and DataSource, and verifies their status conditions
var verifyConditions = func(step string, isProgressing, isUpToDate, isReady bool, reasonProgressing, reasonUpToDate, reasonReady string) {
var verifyConditions = func(step string, isProgressing, isUpToDate, isReady bool, reasonProgressing, reasonUpToDate, reasonReady string, sourceObj client.Object) {
By(step)
_, err := reconciler.Reconcile(context.TODO(), cronReq)
Expect(err).ToNot(HaveOccurred())
@ -141,12 +143,11 @@ var _ = Describe("All DataImportCron Tests", func() {
Expect(err).ToNot(HaveOccurred())
} else {
Expect(k8serrors.IsNotFound(err)).To(BeTrue())
pvc := &corev1.PersistentVolumeClaim{}
err = reconciler.client.Get(context.TODO(), dvKey(dvName), pvc)
err = reconciler.client.Get(context.TODO(), dvKey(dvName), sourceObj)
Expect(err).ToNot(HaveOccurred())
err = reconciler.client.Get(context.TODO(), dataSourceKey(cron), dataSource)
Expect(err).ToNot(HaveOccurred())
dsReconciler = createDataSourceReconciler(dataSource, pvc)
dsReconciler = createDataSourceReconciler(dataSource, sourceObj)
dsReq := reconcile.Request{NamespacedName: dataSourceKey(cron)}
_, err = dsReconciler.Reconcile(context.TODO(), dsReq)
Expect(err).ToNot(HaveOccurred())
@ -387,13 +388,13 @@ var _ = Describe("All DataImportCron Tests", func() {
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy
reconciler = createDataImportCronReconciler(cron)
verifyConditions("Before DesiredDigest is set", false, false, false, noImport, noDigest, "")
verifyConditions("Before DesiredDigest is set", false, false, false, noImport, noDigest, "", &corev1.PersistentVolumeClaim{})
cc.AddAnnotation(cron, AnnSourceDesiredDigest, testDigest)
err := reconciler.client.Update(context.TODO(), cron)
Expect(err).ToNot(HaveOccurred())
dataSource = &cdiv1.DataSource{}
verifyConditions("After DesiredDigest is set", false, false, false, noImport, outdated, noSource)
verifyConditions("After DesiredDigest is set", false, false, false, noImport, outdated, noSource, &corev1.PersistentVolumeClaim{})
imports := cron.Status.CurrentImports
Expect(imports).ToNot(BeNil())
@ -418,12 +419,12 @@ var _ = Describe("All DataImportCron Tests", func() {
dv.Status.Phase = cdiv1.ImportScheduled
err = reconciler.client.Update(context.TODO(), dv)
Expect(err).ToNot(HaveOccurred())
verifyConditions("Import scheduled", false, false, false, scheduled, inProgress, noSource)
verifyConditions("Import scheduled", false, false, false, scheduled, inProgress, noSource, &corev1.PersistentVolumeClaim{})
dv.Status.Phase = cdiv1.ImportInProgress
err = reconciler.client.Update(context.TODO(), dv)
Expect(err).ToNot(HaveOccurred())
verifyConditions("Import in progress", true, false, false, inProgress, inProgress, noSource)
verifyConditions("Import in progress", true, false, false, inProgress, inProgress, noSource, &corev1.PersistentVolumeClaim{})
dv.Status.Phase = cdiv1.Succeeded
err = reconciler.client.Update(context.TODO(), dv)
@ -433,7 +434,7 @@ var _ = Describe("All DataImportCron Tests", func() {
err = reconciler.client.Create(context.TODO(), pvc)
Expect(err).ToNot(HaveOccurred())
verifyConditions("Import succeeded", false, true, true, noImport, upToDate, ready)
verifyConditions("Import succeeded", false, true, true, noImport, upToDate, ready, &corev1.PersistentVolumeClaim{})
sourcePVC := cdiv1.DataVolumeSourcePVC{
Namespace: cron.Namespace,
@ -474,7 +475,7 @@ var _ = Describe("All DataImportCron Tests", func() {
cron = newDataImportCron(cronName)
dataSource = nil
reconciler = createDataImportCronReconciler(cron)
verifyConditions("Before DesiredDigest is set", false, false, false, noImport, noDigest, "")
verifyConditions("Before DesiredDigest is set", false, false, false, noImport, noDigest, "", &corev1.PersistentVolumeClaim{})
for i := 0; i < nPVCs; i++ {
digest := strings.Repeat(strconv.Itoa(i), 12)
@ -494,7 +495,7 @@ var _ = Describe("All DataImportCron Tests", func() {
_, err = reconciler.Reconcile(context.TODO(), cronReq)
Expect(err).ToNot(HaveOccurred())
verifyConditions("Import succeeded", false, true, true, noImport, upToDate, ready)
verifyConditions("Import succeeded", false, true, true, noImport, upToDate, ready, &corev1.PersistentVolumeClaim{})
imports := cron.Status.CurrentImports
Expect(imports).To(HaveLen(1))
@ -659,7 +660,7 @@ var _ = Describe("All DataImportCron Tests", func() {
pvc := cc.CreatePvc(dv.Name, dv.Namespace, nil, nil)
err = reconciler.client.Create(context.TODO(), pvc)
Expect(err).ToNot(HaveOccurred())
verifyConditions("Import succeeded", false, true, true, noImport, upToDate, ready)
verifyConditions("Import succeeded", false, true, true, noImport, upToDate, ready, &corev1.PersistentVolumeClaim{})
now := metav1.Now()
cron.DeletionTimestamp = &now
@ -747,6 +748,163 @@ var _ = Describe("All DataImportCron Tests", func() {
Expect(reconciler.client.Get(context.TODO(), dataSourceKey(cron), dataSource)).To(Succeed())
ExpectInstancetypeLabels(dataSource.Labels)
})
Context("Snapshot source format", func() {
BeforeEach(func() {
snapFormat := cdiv1.DataImportCronSourceFormatSnapshot
sc := cc.CreateStorageClass(storageClassName, map[string]string{cc.AnnDefaultStorageClass: "true"})
sp := &cdiv1.StorageProfile{
ObjectMeta: metav1.ObjectMeta{
Name: storageClassName,
},
Status: cdiv1.StorageProfileStatus{
DataImportCronSourceFormat: &snapFormat,
},
}
reconciler = createDataImportCronReconciler(sc, sp)
storageProfile := &cdiv1.StorageProfile{ObjectMeta: metav1.ObjectMeta{Name: storageClassName}}
err := reconciler.client.Get(context.TODO(), client.ObjectKeyFromObject(storageProfile), storageProfile)
Expect(err).ToNot(HaveOccurred())
})
It("Should proceed to at least creating a PVC when no default storage class", func() {
// Simulate an environment without default storage class
sc := &storagev1.StorageClass{ObjectMeta: metav1.ObjectMeta{Name: storageClassName}}
sp := &cdiv1.StorageProfile{ObjectMeta: metav1.ObjectMeta{Name: storageClassName}}
err := reconciler.client.Delete(context.TODO(), sc)
Expect(err).ToNot(HaveOccurred())
err = reconciler.client.Delete(context.TODO(), sp)
Expect(err).ToNot(HaveOccurred())
cron = newDataImportCron(cronName)
dataSource = nil
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy
err = reconciler.client.Create(context.TODO(), cron)
Expect(err).ToNot(HaveOccurred())
verifyConditions("Before DesiredDigest is set", false, false, false, noImport, noDigest, "", &corev1.PersistentVolumeClaim{})
cc.AddAnnotation(cron, AnnSourceDesiredDigest, testDigest)
err = reconciler.client.Update(context.TODO(), cron)
Expect(err).ToNot(HaveOccurred())
dataSource = &cdiv1.DataSource{}
verifyConditions("After DesiredDigest is set", false, false, false, noImport, outdated, noSource, &corev1.PersistentVolumeClaim{})
imports := cron.Status.CurrentImports
Expect(imports).ToNot(BeNil())
Expect(imports).ToNot(BeEmpty())
dvName := imports[0].DataVolumeName
Expect(dvName).ToNot(BeEmpty())
digest := imports[0].Digest
Expect(digest).To(Equal(testDigest))
dv := &cdiv1.DataVolume{}
err = reconciler.client.Get(context.TODO(), dvKey(dvName), dv)
Expect(err).ToNot(HaveOccurred())
Expect(*dv.Spec.Source.Registry.URL).To(Equal(testRegistryURL + "@" + testDigest))
Expect(dv.Annotations[cc.AnnImmediateBinding]).To(Equal("true"))
})
It("Should create snapshot, and update DataImportCron and DataSource once its ready to use", func() {
cron = newDataImportCron(cronName)
dataSource = nil
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy
err := reconciler.client.Create(context.TODO(), cron)
Expect(err).ToNot(HaveOccurred())
verifyConditions("Before DesiredDigest is set", false, false, false, noImport, noDigest, "", &snapshotv1.VolumeSnapshot{})
cc.AddAnnotation(cron, AnnSourceDesiredDigest, testDigest)
err = reconciler.client.Update(context.TODO(), cron)
Expect(err).ToNot(HaveOccurred())
dataSource = &cdiv1.DataSource{}
verifyConditions("After DesiredDigest is set", false, false, false, noImport, outdated, noSource, &snapshotv1.VolumeSnapshot{})
imports := cron.Status.CurrentImports
Expect(imports).ToNot(BeNil())
Expect(imports).ToNot(BeEmpty())
dvName := imports[0].DataVolumeName
Expect(dvName).ToNot(BeEmpty())
digest := imports[0].Digest
Expect(digest).To(Equal(testDigest))
dv := &cdiv1.DataVolume{}
err = reconciler.client.Get(context.TODO(), dvKey(dvName), dv)
Expect(err).ToNot(HaveOccurred())
Expect(*dv.Spec.Source.Registry.URL).To(Equal(testRegistryURL + "@" + testDigest))
Expect(dv.Annotations[cc.AnnImmediateBinding]).To(Equal("true"))
pvc := cc.CreatePvc(dv.Name, dv.Namespace, nil, nil)
err = reconciler.client.Create(context.TODO(), pvc)
Expect(err).ToNot(HaveOccurred())
// DV GCed after hitting succeeded
err = reconciler.client.Delete(context.TODO(), dv)
Expect(err).ToNot(HaveOccurred())
// Reconcile that gets snapshot created
verifyConditions("Snap creation reconcile", false, false, false, noImport, outdated, "SnapshotNotReady", &snapshotv1.VolumeSnapshot{})
// Reconcile so created snapshot can be fetched
verifyConditions("Snap creation triggered reconcile", false, false, false, noImport, inProgress, "SnapshotNotReady", &snapshotv1.VolumeSnapshot{})
// Make snap ready so we reach UpToDate
snap := &snapshotv1.VolumeSnapshot{}
err = reconciler.client.Get(context.TODO(), dvKey(dvName), snap)
Expect(err).ToNot(HaveOccurred())
snap.Status = &snapshotv1.VolumeSnapshotStatus{
ReadyToUse: pointer.Bool(true),
}
err = reconciler.client.Update(context.TODO(), snap)
Expect(err).ToNot(HaveOccurred())
verifyConditions("Import succeeded", false, true, true, noImport, upToDate, ready, &snapshotv1.VolumeSnapshot{})
sourcePVC := cdiv1.DataVolumeSourcePVC{
Namespace: cron.Namespace,
Name: dvName,
}
expectedSource := cdiv1.DataSourceSource{
Snapshot: &cdiv1.DataVolumeSourceSnapshot{
Namespace: cron.Namespace,
Name: dvName,
},
}
Expect(dataSource.Spec.Source).To(Equal(expectedSource))
Expect(cron.Status.LastImportedPVC).ToNot(BeNil())
Expect(*cron.Status.LastImportedPVC).To(Equal(sourcePVC))
Expect(cron.Status.LastImportTimestamp).ToNot(BeNil())
// PVCs not around anymore, they are not needed, we are using a snapshot source
pvcList := &corev1.PersistentVolumeClaimList{}
err = reconciler.client.List(context.TODO(), pvcList, &client.ListOptions{})
Expect(err).ToNot(HaveOccurred())
Expect(pvcList.Items).To(BeEmpty())
snap = &snapshotv1.VolumeSnapshot{}
err = reconciler.client.Get(context.TODO(), dvKey(dvName), snap)
Expect(err).ToNot(HaveOccurred())
Expect(*snap.Status.ReadyToUse).To(BeTrue())
Expect(*snap.Spec.Source.PersistentVolumeClaimName).To(Equal(dvName))
now := metav1.Now()
cron.DeletionTimestamp = &now
err = reconciler.client.Update(context.TODO(), cron)
Expect(err).ToNot(HaveOccurred())
_, err = reconciler.Reconcile(context.TODO(), cronReq)
Expect(err).ToNot(HaveOccurred())
// Should delete DataSource and DataVolume on DataImportCron deletion as RetentionPolicy is RetainNone
err = reconciler.client.Get(context.TODO(), dataSourceKey(cron), dataSource)
Expect(err).To(HaveOccurred())
dvList := &cdiv1.DataVolumeList{}
err = reconciler.client.List(context.TODO(), dvList, &client.ListOptions{})
Expect(err).ToNot(HaveOccurred())
Expect(dvList.Items).To(BeEmpty())
pvcList = &corev1.PersistentVolumeClaimList{}
err = reconciler.client.List(context.TODO(), pvcList, &client.ListOptions{})
Expect(err).ToNot(HaveOccurred())
Expect(pvcList.Items).To(BeEmpty())
snapList := &snapshotv1.VolumeSnapshotList{}
err = reconciler.client.List(context.TODO(), snapList, &client.ListOptions{})
Expect(err).ToNot(HaveOccurred())
Expect(snapList.Items).To(BeEmpty())
})
})
})
})
@ -775,6 +933,7 @@ func createDataImportCronReconcilerWithoutConfig(objects ...runtime.Object) *Dat
_ = cdiv1.AddToScheme(s)
_ = imagev1.Install(s)
_ = extv1.AddToScheme(s)
_ = snapshotv1.AddToScheme(s)
cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objs...).Build()
rec := record.NewFakeRecorder(1)

View File

@ -52,8 +52,9 @@ type DataSourceReconciler struct {
}
const (
ready = "Ready"
noSource = "NoSource"
ready = "Ready"
noSource = "NoSource"
dataSourceControllerName = "datasource-controller"
)
// Reconcile loop for DataSourceReconciler
@ -136,7 +137,7 @@ func (r *DataSourceReconciler) handleSnapshotSource(ctx context.Context, sourceS
}
r.log.Info("Snapshot not found", "name", sourceSnapshot.Name)
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "Snapshot not found", cc.NotFound)
} else if snapshot.Status != nil && snapshot.Status.ReadyToUse != nil && *snapshot.Status.ReadyToUse {
} else if cc.IsSnapshotReady(snapshot) {
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionTrue, "DataSource is ready to be consumed", ready)
} else {
updateDataSourceCondition(dataSource, cdiv1.DataSourceReady, corev1.ConditionFalse, "Snapshot phase is not ready", "SnapshotNotReady")
@ -169,12 +170,12 @@ func FindDataSourceConditionByType(ds *cdiv1.DataSource, conditionType cdiv1.Dat
func NewDataSourceController(mgr manager.Manager, log logr.Logger, installerLabels map[string]string) (controller.Controller, error) {
reconciler := &DataSourceReconciler{
client: mgr.GetClient(),
recorder: mgr.GetEventRecorderFor(dataImportControllerName),
recorder: mgr.GetEventRecorderFor(dataSourceControllerName),
scheme: mgr.GetScheme(),
log: log.WithName(dataImportControllerName),
log: log.WithName(dataSourceControllerName),
installerLabels: installerLabels,
}
DataSourceController, err := controller.New(dataImportControllerName, mgr, controller.Options{Reconciler: reconciler})
DataSourceController, err := controller.New(dataSourceControllerName, mgr, controller.Options{Reconciler: reconciler})
if err != nil {
return nil, err
}

View File

@ -30,7 +30,6 @@ go_library(
"//vendor/k8s.io/api/authorization/v1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/storage/v1:go_default_library",
"//vendor/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",

View File

@ -29,7 +29,6 @@ import (
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -330,7 +329,7 @@ func (r *PvcCloneReconciler) syncClone(log logr.Logger, req reconcile.Request) (
if pvc == nil {
if selectedCloneStrategy == SmartClone {
snapshotClassName, err := r.getSnapshotClassForSmartClone(datavolume, pvcSpec)
snapshotClassName, err := cc.GetSnapshotClassForSmartClone(datavolume.Name, pvcSpec.StorageClassName, r.log, r.client)
if err != nil {
return syncRes, err
}
@ -462,7 +461,7 @@ func (r *PvcCloneReconciler) selectCloneStrategy(datavolume *cdiv1.DataVolume, p
return CsiClone, nil
}
} else if preferredCloneStrategy != nil && *preferredCloneStrategy == cdiv1.CloneStrategySnapshot {
snapshotClassName, err := r.getSnapshotClassForSmartClone(datavolume, pvcSpec)
snapshotClassName, err := cc.GetSnapshotClassForSmartClone(datavolume.Name, pvcSpec.StorageClassName, r.log, r.client)
if err != nil {
return NoClone, err
}
@ -728,82 +727,6 @@ func (r *PvcCloneReconciler) cleanup(syncState *dvSyncState) error {
return nil
}
func (r *PvcCloneReconciler) getSnapshotClassForSmartClone(dataVolume *cdiv1.DataVolume, targetStorageSpec *corev1.PersistentVolumeClaimSpec) (string, error) {
log := r.log.WithName("getSnapshotClassForSmartClone").V(3)
// Check if relevant CRDs are available
if !isCsiCrdsDeployed(r.client, r.log) {
log.Info("Missing CSI snapshotter CRDs, falling back to host assisted clone")
return "", nil
}
targetPvcStorageClassName := targetStorageSpec.StorageClassName
targetStorageClass, err := cc.GetStorageClassByName(context.TODO(), r.client, targetPvcStorageClassName)
if err != nil {
return "", err
}
if targetStorageClass == nil {
log.Info("Target PVC's Storage Class not found")
return "", nil
}
targetPvcStorageClassName = &targetStorageClass.Name
// Fetch the source storage class
srcStorageClass := &storagev1.StorageClass{}
if err := r.client.Get(context.TODO(), types.NamespacedName{Name: *targetPvcStorageClassName}, srcStorageClass); err != nil {
log.Info("Unable to retrieve storage class, falling back to host assisted clone", "storage class", *targetPvcStorageClassName)
return "", err
}
// List the snapshot classes
scs := &snapshotv1.VolumeSnapshotClassList{}
if err := r.client.List(context.TODO(), scs); err != nil {
log.Info("Cannot list snapshot classes, falling back to host assisted clone")
return "", err
}
for _, snapshotClass := range scs.Items {
// Validate association between snapshot class and storage class
if snapshotClass.Driver == srcStorageClass.Provisioner {
log.Info("smart-clone is applicable for datavolume", "datavolume",
dataVolume.Name, "snapshot class", snapshotClass.Name)
return snapshotClass.Name, nil
}
}
log.Info("Could not match snapshotter with storage class, falling back to host assisted clone")
return "", nil
}
// isCsiCrdsDeployed checks whether the CSI snapshotter CRD are deployed
func isCsiCrdsDeployed(c client.Client, log logr.Logger) bool {
version := "v1"
vsClass := "volumesnapshotclasses." + snapshotv1.GroupName
vsContent := "volumesnapshotcontents." + snapshotv1.GroupName
vs := "volumesnapshots." + snapshotv1.GroupName
return isCrdDeployed(c, vsClass, version, log) &&
isCrdDeployed(c, vsContent, version, log) &&
isCrdDeployed(c, vs, version, log)
}
// isCrdDeployed checks whether a CRD is deployed
func isCrdDeployed(c client.Client, name, version string, log logr.Logger) bool {
crd := &extv1.CustomResourceDefinition{}
err := c.Get(context.TODO(), types.NamespacedName{Name: name}, crd)
if err != nil {
if !k8serrors.IsNotFound(err) {
log.Info("Error looking up CRD", "crd name", name, "version", version, "error", err)
}
return false
}
for _, v := range crd.Spec.Versions {
if v.Name == version && v.Served {
return true
}
}
return false
}
// Returns true if methods different from HostAssisted are possible,
// both snapshot and csi volume clone share the same basic requirements
func (r *PvcCloneReconciler) advancedClonePossible(dataVolume *cdiv1.DataVolume, targetStorageSpec *corev1.PersistentVolumeClaimSpec) (bool, error) {

View File

@ -309,7 +309,7 @@ var _ = Describe("All DataVolume Tests", func() {
AnnDefaultStorageClass: "true",
})
reconciler = createCloneReconciler(dv, sc)
snapclass, err := reconciler.getSnapshotClassForSmartClone(dv, dv.Spec.PVC)
snapclass, err := GetSnapshotClassForSmartClone(dv.Name, dv.Spec.PVC.StorageClassName, reconciler.log, reconciler.client)
Expect(err).ToNot(HaveOccurred())
Expect(snapclass).To(BeEmpty())
})
@ -321,7 +321,7 @@ var _ = Describe("All DataVolume Tests", func() {
AnnDefaultStorageClass: "true",
})
reconciler = createCloneReconciler(dv, sc, createVolumeSnapshotContentCrd(), createVolumeSnapshotClassCrd(), createVolumeSnapshotCrd())
snapshotClass, err := reconciler.getSnapshotClassForSmartClone(dv, dv.Spec.PVC)
snapshotClass, err := GetSnapshotClassForSmartClone(dv.Name, dv.Spec.PVC.StorageClassName, reconciler.log, reconciler.client)
Expect(err).ToNot(HaveOccurred())
Expect(snapshotClass).To(BeEmpty())
})
@ -371,7 +371,7 @@ var _ = Describe("All DataVolume Tests", func() {
dv.Spec.PVC.StorageClassName = &scName
pvc := CreatePvcInStorageClass("test", metav1.NamespaceDefault, &scName, nil, nil, corev1.ClaimBound)
reconciler = createCloneReconciler(dv, pvc, createVolumeSnapshotContentCrd(), createVolumeSnapshotClassCrd(), createVolumeSnapshotCrd())
snapclass, err := reconciler.getSnapshotClassForSmartClone(dv, dv.Spec.PVC)
snapclass, err := GetSnapshotClassForSmartClone(dv.Name, dv.Spec.PVC.StorageClassName, reconciler.log, reconciler.client)
Expect(err).ToNot(HaveOccurred())
Expect(snapclass).To(BeEmpty())
})
@ -385,7 +385,7 @@ var _ = Describe("All DataVolume Tests", func() {
dv.Spec.PVC.StorageClassName = &scName
pvc := CreatePvcInStorageClass("test", metav1.NamespaceDefault, &scName, nil, nil, corev1.ClaimBound)
reconciler = createCloneReconciler(sc, dv, pvc)
snapclass, err := reconciler.getSnapshotClassForSmartClone(dv, dv.Spec.PVC)
snapclass, err := GetSnapshotClassForSmartClone(dv.Name, dv.Spec.PVC.StorageClassName, reconciler.log, reconciler.client)
Expect(err).ToNot(HaveOccurred())
Expect(snapclass).To(BeEmpty())
})
@ -401,7 +401,7 @@ var _ = Describe("All DataVolume Tests", func() {
expectedSnapshotClass := "snap-class"
snapClass := createSnapshotClass(expectedSnapshotClass, nil, "csi-plugin")
reconciler = createCloneReconciler(sc, dv, pvc, snapClass, createVolumeSnapshotContentCrd(), createVolumeSnapshotClassCrd(), createVolumeSnapshotCrd())
snapclass, err := reconciler.getSnapshotClassForSmartClone(dv, dv.Spec.PVC)
snapclass, err := GetSnapshotClassForSmartClone(dv.Name, dv.Spec.PVC.StorageClassName, reconciler.log, reconciler.client)
Expect(err).ToNot(HaveOccurred())
Expect(snapclass).To(Equal(expectedSnapshotClass))
})

View File

@ -228,7 +228,7 @@ func (r *SmartCloneReconciler) reconcileSnapshot(log logr.Logger, snapshot *snap
return reconcile.Result{}, nil
}
if snapshot.Status == nil || snapshot.Status.ReadyToUse == nil || !*snapshot.Status.ReadyToUse {
if !cc.IsSnapshotReady(snapshot) {
// wait for ready to use
return reconcile.Result{}, nil
}

View File

@ -565,7 +565,7 @@ func (r *SnapshotCloneReconciler) isSnapshotValidForClone(snapshot *snapshotv1.V
r.log.V(3).Info("Snapshot does not have status populated yet")
return false, nil
}
if snapshot.Status.ReadyToUse == nil || !*snapshot.Status.ReadyToUse {
if !cc.IsSnapshotReady(snapshot) {
r.log.V(3).Info("snapshot not ReadyToUse, while we allow this, probably going to be an issue going forward", "namespace", snapshot.Namespace, "name", snapshot.Name)
}
if snapshot.Status.Error != nil {

View File

@ -6,10 +6,12 @@ import (
"reflect"
"github.com/go-logr/logr"
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
"github.com/prometheus/client_golang/prometheus"
v1 "k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
@ -84,7 +86,12 @@ func (r *StorageProfileReconciler) reconcileStorageProfile(sc *storagev1.Storage
storageProfile.Status.StorageClass = &sc.Name
storageProfile.Status.Provisioner = &sc.Provisioner
storageProfile.Status.CloneStrategy = r.reconcileCloneStrategy(sc, storageProfile.Spec.CloneStrategy)
snapClass, err := cc.GetSnapshotClassForSmartClone("", &sc.Name, r.log, r.client)
if err != nil {
return reconcile.Result{}, err
}
storageProfile.Status.CloneStrategy = r.reconcileCloneStrategy(sc, storageProfile.Spec.CloneStrategy, snapClass)
storageProfile.Status.DataImportCronSourceFormat = r.reconcileDataImportCronSourceFormat(sc, storageProfile.Spec.DataImportCronSourceFormat, snapClass)
var claimPropertySets []cdiv1.ClaimPropertySet
@ -145,7 +152,7 @@ func (r *StorageProfileReconciler) getStorageProfile(sc *storagev1.StorageClass)
func (r *StorageProfileReconciler) reconcilePropertySets(sc *storagev1.StorageClass) []cdiv1.ClaimPropertySet {
claimPropertySets := []cdiv1.ClaimPropertySet{}
capabilities, found := storagecapabilities.Get(r.client, sc)
capabilities, found := storagecapabilities.GetCapabilities(r.client, sc)
if found {
for i := range capabilities {
claimPropertySet := cdiv1.ClaimPropertySet{
@ -158,30 +165,78 @@ func (r *StorageProfileReconciler) reconcilePropertySets(sc *storagev1.StorageCl
return claimPropertySets
}
func (r *StorageProfileReconciler) reconcileCloneStrategy(sc *storagev1.StorageClass, clonestrategy *cdiv1.CDICloneStrategy) *cdiv1.CDICloneStrategy {
if clonestrategy == nil {
if sc.Annotations["cdi.kubevirt.io/clone-strategy"] == "copy" {
strategy := cdiv1.CloneStrategyHostAssisted
return &strategy
} else if sc.Annotations["cdi.kubevirt.io/clone-strategy"] == "snapshot" {
strategy := cdiv1.CloneStrategySnapshot
return &strategy
} else if sc.Annotations["cdi.kubevirt.io/clone-strategy"] == "csi-clone" {
strategy := cdiv1.CloneStrategyCsiClone
return &strategy
} else {
return clonestrategy
}
func (r *StorageProfileReconciler) reconcileCloneStrategy(sc *storagev1.StorageClass, desiredCloneStrategy *cdiv1.CDICloneStrategy, snapClass string) *cdiv1.CDICloneStrategy {
if desiredCloneStrategy != nil {
return desiredCloneStrategy
}
return clonestrategy
if annStrategyVal, ok := sc.Annotations["cdi.kubevirt.io/clone-strategy"]; ok {
return r.getCloneStrategyFromStorageClass(annStrategyVal)
}
// Default to trying snapshot clone unless volume snapshot class missing
hostAssistedStrategy := cdiv1.CloneStrategyHostAssisted
strategy := hostAssistedStrategy
if snapClass != "" {
strategy = cdiv1.CloneStrategySnapshot
}
if knownStrategy, ok := storagecapabilities.GetAdvisedCloneStrategy(sc); ok {
strategy = knownStrategy
}
if strategy == cdiv1.CloneStrategySnapshot && snapClass == "" {
r.log.Info("No VolumeSnapshotClass found for storage class, falling back to host assisted cloning", "StorageClass.Name", sc.Name)
return &hostAssistedStrategy
}
return &strategy
}
func (r *StorageProfileReconciler) getCloneStrategyFromStorageClass(annStrategyVal string) *cdiv1.CDICloneStrategy {
var strategy cdiv1.CDICloneStrategy
switch annStrategyVal {
case "copy":
strategy = cdiv1.CloneStrategyHostAssisted
case "snapshot":
strategy = cdiv1.CloneStrategySnapshot
case "csi-clone":
strategy = cdiv1.CloneStrategyCsiClone
}
return &strategy
}
func (r *StorageProfileReconciler) reconcileDataImportCronSourceFormat(sc *storagev1.StorageClass, desiredFormat *cdiv1.DataImportCronSourceFormat, snapClass string) *cdiv1.DataImportCronSourceFormat {
if desiredFormat != nil {
return desiredFormat
}
// This can be changed later on
// for example, if at some point we're confident snapshot sources should be the default
pvcFormat := cdiv1.DataImportCronSourceFormatPvc
format := pvcFormat
if knownFormat, ok := storagecapabilities.GetAdvisedSourceFormat(sc); ok {
format = knownFormat
}
if format == cdiv1.DataImportCronSourceFormatSnapshot && snapClass == "" {
// No point using snapshots without a corresponding snapshot class
r.log.Info("No VolumeSnapshotClass found for storage class, falling back to pvc sources for DataImportCrons", "StorageClass.Name", sc.Name)
return &pvcFormat
}
return &format
}
func (r *StorageProfileReconciler) createEmptyStorageProfile(sc *storagev1.StorageClass) (*cdiv1.StorageProfile, error) {
storageProfile := MakeEmptyStorageProfileSpec(sc.Name)
util.SetRecommendedLabels(storageProfile, r.installerLabels, "cdi-controller")
// uncachedClient is used to directly get the resource, SetOwnerRuntime requires some cluster-scoped resources
// normal/cached client does list resource, a cdi user might not have the rights to list cluster scope resource
// uncachedClient is used to directly get the config map
// the controller runtime client caches objects that are read once, and thus requires a list/watch
// should be cheaper than watching
if err := operator.SetOwnerRuntime(r.uncachedClient, storageProfile); err != nil {
return nil, err
}
@ -285,9 +340,11 @@ func addStorageProfileControllerWatches(mgr manager.Manager, c controller.Contro
if err := c.Watch(&source.Kind{Type: &storagev1.StorageClass{}}, &handler.EnqueueRequestForObject{}); err != nil {
return err
}
if err := c.Watch(&source.Kind{Type: &cdiv1.StorageProfile{}}, &handler.EnqueueRequestForObject{}); err != nil {
return err
}
if err := c.Watch(&source.Kind{Type: &v1.PersistentVolume{}}, handler.EnqueueRequestsFromMapFunc(
func(obj client.Object) []reconcile.Request {
return []reconcile.Request{{
@ -302,6 +359,36 @@ func addStorageProfileControllerWatches(mgr manager.Manager, c controller.Contro
}); err != nil {
return err
}
mapSnapshotClassToProfile := func(obj client.Object) (reqs []reconcile.Request) {
var scList storagev1.StorageClassList
if err := mgr.GetClient().List(context.TODO(), &scList); err != nil {
c.GetLogger().Error(err, "Unable to list StorageClasses")
return
}
vsc := obj.(*snapshotv1.VolumeSnapshotClass)
for _, sc := range scList.Items {
if sc.Provisioner == vsc.Driver {
reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Name: sc.Name}})
}
}
return
}
if err := mgr.GetClient().List(context.TODO(), &snapshotv1.VolumeSnapshotClassList{}, &client.ListOptions{Limit: 1}); err != nil {
if meta.IsNoMatchError(err) {
// Back out if there's no point to attempt watch
return nil
}
if !cc.IsErrCacheNotStarted(err) {
return err
}
}
if err := c.Watch(&source.Kind{Type: &snapshotv1.VolumeSnapshotClass{}},
handler.EnqueueRequestsFromMapFunc(mapSnapshotClassToProfile),
); err != nil {
return err
}
return nil
}

View File

@ -19,7 +19,9 @@ package controller
import (
"context"
"fmt"
"reflect"
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
. "github.com/onsi/ginkgo"
"github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"
@ -30,6 +32,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
v1 "k8s.io/api/core/v1"
extv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -311,8 +314,61 @@ var _ = Describe("Storage profile controller reconcile loop", func() {
table.Entry("Clone", cdiv1.CloneStrategyCsiClone),
)
table.DescribeTable("should set advised source format for dataimportcrons", func(provisioner string, expectedFormat cdiv1.DataImportCronSourceFormat, deploySnapClass bool) {
storageClass := CreateStorageClassWithProvisioner(storageClassName, map[string]string{AnnDefaultStorageClass: "true"}, map[string]string{}, provisioner)
reconciler := createStorageProfileReconciler(storageClass, createVolumeSnapshotContentCrd(), createVolumeSnapshotClassCrd(), createVolumeSnapshotCrd())
if deploySnapClass {
snapClass := createSnapshotClass(storageClassName+"-snapclass", nil, provisioner)
err := reconciler.client.Create(context.TODO(), snapClass)
Expect(err).ToNot(HaveOccurred())
}
_, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: storageClassName}})
Expect(err).ToNot(HaveOccurred())
storageProfileList := &cdiv1.StorageProfileList{}
err = reconciler.client.List(context.TODO(), storageProfileList, &client.ListOptions{})
Expect(err).ToNot(HaveOccurred())
Expect(storageProfileList.Items).To(HaveLen(1))
sp := storageProfileList.Items[0]
Expect(*sp.Status.StorageClass).To(Equal(storageClassName))
Expect(*sp.Status.DataImportCronSourceFormat).To(Equal(expectedFormat))
},
table.Entry("provisioners where snapshot source is more appropriate", "rook-ceph.rbd.csi.ceph.com", cdiv1.DataImportCronSourceFormatSnapshot, true),
table.Entry("provisioners where snapshot source is more appropriate but lack volumesnapclass", "rook-ceph.rbd.csi.ceph.com", cdiv1.DataImportCronSourceFormatPvc, false),
table.Entry("provisioners where there is no known preferred format", "format.unknown.provisioner.csi.com", cdiv1.DataImportCronSourceFormatPvc, false),
)
table.DescribeTable("should set cloneStrategy", func(provisioner string, expectedCloneStrategy cdiv1.CDICloneStrategy, deploySnapClass bool) {
storageClass := CreateStorageClassWithProvisioner(storageClassName, map[string]string{AnnDefaultStorageClass: "true"}, map[string]string{}, provisioner)
reconciler := createStorageProfileReconciler(storageClass, createVolumeSnapshotContentCrd(), createVolumeSnapshotClassCrd(), createVolumeSnapshotCrd())
if deploySnapClass {
snapClass := createSnapshotClass(storageClassName+"-snapclass", nil, provisioner)
err := reconciler.client.Create(context.TODO(), snapClass)
Expect(err).ToNot(HaveOccurred())
}
_, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: storageClassName}})
Expect(err).ToNot(HaveOccurred())
storageProfileList := &cdiv1.StorageProfileList{}
err = reconciler.client.List(context.TODO(), storageProfileList, &client.ListOptions{})
Expect(err).ToNot(HaveOccurred())
Expect(storageProfileList.Items).To(HaveLen(1))
sp := storageProfileList.Items[0]
Expect(*sp.Status.StorageClass).To(Equal(storageClassName))
Expect(*sp.Status.CloneStrategy).To(Equal(expectedCloneStrategy))
},
table.Entry("provisioner with volumesnapshotclass and no known advised strategy", "strategy.unknown.provisioner.csi.com", cdiv1.CloneStrategySnapshot, true),
table.Entry("provisioner without volumesnapshotclass and no known advised strategy", "strategy.unknown.provisioner.csi.com", cdiv1.CloneStrategyHostAssisted, false),
table.Entry("provisioner that is known to prefer csi clone", "csi-powermax.dellemc.com", cdiv1.CloneStrategyCsiClone, false),
)
table.DescribeTable("Should set the IncompleteProfileGauge correctly", func(provisioner string, count int) {
reconciler := createStorageProfileReconciler(CreateStorageClassWithProvisioner(storageClassName, map[string]string{AnnDefaultStorageClass: "true"}, map[string]string{}, provisioner))
storageClass := CreateStorageClassWithProvisioner(storageClassName, map[string]string{AnnDefaultStorageClass: "true"}, map[string]string{}, provisioner)
reconciler := createStorageProfileReconciler(storageClass)
_, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: storageClassName}})
Expect(err).ToNot(HaveOccurred())
storageProfileList := &cdiv1.StorageProfileList{}
@ -343,6 +399,8 @@ func createStorageProfileReconciler(objects ...runtime.Object) *StorageProfileRe
// Register operator types with the runtime scheme.
s := scheme.Scheme
_ = cdiv1.AddToScheme(s)
_ = snapshotv1.AddToScheme(s)
_ = extv1.AddToScheme(s)
// Create a fake client to mock API calls.
cl := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(objs...).Build()
@ -379,3 +437,98 @@ func CreatePv(name string, storageClassName string) *v1.PersistentVolume {
}
return pv
}
func createSnapshotClass(name string, annotations map[string]string, snapshotter string) *snapshotv1.VolumeSnapshotClass {
return &snapshotv1.VolumeSnapshotClass{
TypeMeta: metav1.TypeMeta{
Kind: "VolumeSnapshotClass",
APIVersion: snapshotv1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Annotations: annotations,
},
Driver: snapshotter,
}
}
func createVolumeSnapshotContentCrd() *extv1.CustomResourceDefinition {
pluralName := "volumesnapshotcontents"
return &extv1.CustomResourceDefinition{
TypeMeta: metav1.TypeMeta{
Kind: "CustomResourceDefinition",
APIVersion: extv1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: pluralName + "." + snapshotv1.GroupName,
},
Spec: extv1.CustomResourceDefinitionSpec{
Group: snapshotv1.GroupName,
Scope: extv1.ClusterScoped,
Names: extv1.CustomResourceDefinitionNames{
Plural: pluralName,
Kind: reflect.TypeOf(snapshotv1.VolumeSnapshotContent{}).Name(),
},
Versions: []extv1.CustomResourceDefinitionVersion{
{
Name: snapshotv1.SchemeGroupVersion.Version,
Served: true,
},
},
},
}
}
func createVolumeSnapshotClassCrd() *extv1.CustomResourceDefinition {
pluralName := "volumesnapshotclasses"
return &extv1.CustomResourceDefinition{
TypeMeta: metav1.TypeMeta{
Kind: "CustomResourceDefinition",
APIVersion: extv1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: pluralName + "." + snapshotv1.GroupName,
},
Spec: extv1.CustomResourceDefinitionSpec{
Group: snapshotv1.GroupName,
Scope: extv1.ClusterScoped,
Names: extv1.CustomResourceDefinitionNames{
Plural: pluralName,
Kind: reflect.TypeOf(snapshotv1.VolumeSnapshotClass{}).Name(),
},
Versions: []extv1.CustomResourceDefinitionVersion{
{
Name: snapshotv1.SchemeGroupVersion.Version,
Served: true,
},
},
},
}
}
func createVolumeSnapshotCrd() *extv1.CustomResourceDefinition {
pluralName := "volumesnapshots"
return &extv1.CustomResourceDefinition{
TypeMeta: metav1.TypeMeta{
Kind: "CustomResourceDefinition",
APIVersion: extv1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: pluralName + "." + snapshotv1.GroupName,
},
Spec: extv1.CustomResourceDefinitionSpec{
Group: snapshotv1.GroupName,
Scope: extv1.NamespaceScoped,
Names: extv1.CustomResourceDefinitionNames{
Plural: pluralName,
Kind: reflect.TypeOf(snapshotv1.VolumeSnapshot{}).Name(),
},
Versions: []extv1.CustomResourceDefinitionVersion{
{
Name: snapshotv1.SchemeGroupVersion.Version,
Served: true,
},
},
},
}
}

View File

@ -6879,6 +6879,10 @@ spec:
description: CloneStrategy defines the preferred method for performing
a CDI clone
type: string
dataImportCronSourceFormat:
description: DataImportCronSourceFormat defines the format of the
DataImportCron-created disk image sources
type: string
type: object
status:
description: StorageProfileStatus provides the most recently observed
@ -6908,6 +6912,10 @@ spec:
description: CloneStrategy defines the preferred method for performing
a CDI clone
type: string
dataImportCronSourceFormat:
description: DataImportCronSourceFormat defines the format of the
DataImportCron-created disk image sources
type: string
provisioner:
description: The Storage class provisioner plugin name
type: string

View File

@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/util:go_default_library",
"//staging/src/kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/storage/v1:go_default_library",
"//vendor/sigs.k8s.io/controller-runtime/pkg/client:go_default_library",

View File

@ -10,6 +10,8 @@ import (
storagev1 "k8s.io/api/storage/v1"
"kubevirt.io/containerized-data-importer/pkg/util"
"sigs.k8s.io/controller-runtime/pkg/client"
cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
)
// StorageCapabilities is a simple holder of storage capabilities (accessMode etc.)
@ -97,6 +99,23 @@ var CapabilitiesByProvisionerKey = map[string][]StorageCapabilities{
"csi.ovirt.org": createRWOBlockAndFilesystemCapabilities(),
}
// SourceFormatsByProvisionerKey defines the advised data import cron source format
// Certain storage provisioners will scale better cloning from a single source VolumeSnapshot source
var SourceFormatsByProvisionerKey = map[string]cdiv1.DataImportCronSourceFormat{
"rook-ceph.rbd.csi.ceph.com": cdiv1.DataImportCronSourceFormatSnapshot,
"openshift-storage.rbd.csi.ceph.com": cdiv1.DataImportCronSourceFormatSnapshot,
}
// CloneStrategyByProvisionerKey defines the advised clone strategy for a provisioner
var CloneStrategyByProvisionerKey = map[string]cdiv1.CDICloneStrategy{
"csi-vxflexos.dellemc.com": cdiv1.CloneStrategyCsiClone,
"csi-isilon.dellemc.com": cdiv1.CloneStrategyCsiClone,
"csi-powermax.dellemc.com": cdiv1.CloneStrategyCsiClone,
"csi-powerstore.dellemc.com": cdiv1.CloneStrategyCsiClone,
"hspc.csi.hitachi.com": cdiv1.CloneStrategyCsiClone,
"csi.hpe.com": cdiv1.CloneStrategyCsiClone,
}
// ProvisionerNoobaa is the provisioner string for the Noobaa object bucket provisioner which does not work with CDI
const ProvisionerNoobaa = "openshift-storage.noobaa.io/obc"
@ -107,8 +126,8 @@ var UnsupportedProvisioners = map[string]struct{}{
ProvisionerNoobaa: {},
}
// Get finds and returns a predefined StorageCapabilities for a given StorageClass
func Get(cl client.Client, sc *storagev1.StorageClass) ([]StorageCapabilities, bool) {
// GetCapabilities finds and returns a predefined StorageCapabilities for a given StorageClass
func GetCapabilities(cl client.Client, sc *storagev1.StorageClass) ([]StorageCapabilities, bool) {
provisionerKey := storageProvisionerKey(sc)
if provisionerKey == "kubernetes.io/no-provisioner" {
return capabilitiesForNoProvisioner(cl, sc)
@ -117,6 +136,20 @@ func Get(cl client.Client, sc *storagev1.StorageClass) ([]StorageCapabilities, b
return capabilities, found
}
// GetAdvisedSourceFormat finds and returns the advised format for dataimportcron sources
func GetAdvisedSourceFormat(sc *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, bool) {
provisionerKey := storageProvisionerKey(sc)
format, found := SourceFormatsByProvisionerKey[provisionerKey]
return format, found
}
// GetAdvisedCloneStrategy finds and returns the advised clone strategy
func GetAdvisedCloneStrategy(sc *storagev1.StorageClass) (cdiv1.CDICloneStrategy, bool) {
provisionerKey := storageProvisionerKey(sc)
strategy, found := CloneStrategyByProvisionerKey[provisionerKey]
return strategy, found
}
func isLocalStorageOperator(sc *storagev1.StorageClass) bool {
_, found := sc.Labels["local.storage.openshift.io/owner-name"]
return found

View File

@ -414,6 +414,8 @@ type StorageProfileSpec struct {
CloneStrategy *CDICloneStrategy `json:"cloneStrategy,omitempty"`
// ClaimPropertySets is a provided set of properties applicable to PVC
ClaimPropertySets []ClaimPropertySet `json:"claimPropertySets,omitempty"`
// DataImportCronSourceFormat defines the format of the DataImportCron-created disk image sources
DataImportCronSourceFormat *DataImportCronSourceFormat `json:"dataImportCronSourceFormat,omitempty"`
}
// StorageProfileStatus provides the most recently observed status of the StorageProfile
@ -426,6 +428,8 @@ type StorageProfileStatus struct {
CloneStrategy *CDICloneStrategy `json:"cloneStrategy,omitempty"`
// ClaimPropertySets computed from the spec and detected in the system
ClaimPropertySets []ClaimPropertySet `json:"claimPropertySets,omitempty"`
// DataImportCronSourceFormat defines the format of the DataImportCron-created disk image sources
DataImportCronSourceFormat *DataImportCronSourceFormat `json:"dataImportCronSourceFormat,omitempty"`
}
// ClaimPropertySet is a set of properties applicable to PVC
@ -834,6 +838,17 @@ const (
CloneStrategyCsiClone CDICloneStrategy = "csi-clone"
)
// DataImportCronSourceFormat defines the format of the DataImportCron-created disk image sources
type DataImportCronSourceFormat string
const (
// DataImportCronSourceFormatSnapshot implies using a VolumeSnapshot as the resulting DataImportCron disk image source
DataImportCronSourceFormatSnapshot DataImportCronSourceFormat = "snapshot"
// DataImportCronSourceFormatPvc implies using a PVC as the resulting DataImportCron disk image source
DataImportCronSourceFormatPvc DataImportCronSourceFormat = "pvc"
)
// CDIUninstallStrategy defines the state to leave CDI on uninstall
type CDIUninstallStrategy string

View File

@ -180,19 +180,21 @@ func (StorageProfile) SwaggerDoc() map[string]string {
func (StorageProfileSpec) SwaggerDoc() map[string]string {
return map[string]string{
"": "StorageProfileSpec defines specification for StorageProfile",
"cloneStrategy": "CloneStrategy defines the preferred method for performing a CDI clone",
"claimPropertySets": "ClaimPropertySets is a provided set of properties applicable to PVC",
"": "StorageProfileSpec defines specification for StorageProfile",
"cloneStrategy": "CloneStrategy defines the preferred method for performing a CDI clone",
"claimPropertySets": "ClaimPropertySets is a provided set of properties applicable to PVC",
"dataImportCronSourceFormat": "DataImportCronSourceFormat defines the format of the DataImportCron-created disk image sources",
}
}
func (StorageProfileStatus) SwaggerDoc() map[string]string {
return map[string]string{
"": "StorageProfileStatus provides the most recently observed status of the StorageProfile",
"storageClass": "The StorageClass name for which capabilities are defined",
"provisioner": "The Storage class provisioner plugin name",
"cloneStrategy": "CloneStrategy defines the preferred method for performing a CDI clone",
"claimPropertySets": "ClaimPropertySets computed from the spec and detected in the system",
"": "StorageProfileStatus provides the most recently observed status of the StorageProfile",
"storageClass": "The StorageClass name for which capabilities are defined",
"provisioner": "The Storage class provisioner plugin name",
"cloneStrategy": "CloneStrategy defines the preferred method for performing a CDI clone",
"claimPropertySets": "ClaimPropertySets computed from the spec and detected in the system",
"dataImportCronSourceFormat": "DataImportCronSourceFormat defines the format of the DataImportCron-created disk image sources",
}
}

View File

@ -1494,6 +1494,11 @@ func (in *StorageProfileSpec) DeepCopyInto(out *StorageProfileSpec) {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.DataImportCronSourceFormat != nil {
in, out := &in.DataImportCronSourceFormat, &out.DataImportCronSourceFormat
*out = new(DataImportCronSourceFormat)
**out = **in
}
return
}
@ -1532,6 +1537,11 @@ func (in *StorageProfileStatus) DeepCopyInto(out *StorageProfileStatus) {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.DataImportCronSourceFormat != nil {
in, out := &in.DataImportCronSourceFormat, &out.DataImportCronSourceFormat
*out = new(DataImportCronSourceFormat)
**out = **in
}
return
}

View File

@ -91,28 +91,11 @@ var _ = Describe("Clone Populator tests", func() {
Expect(err).ToNot(HaveOccurred())
snapClass := f.GetSnapshotClass()
snapshot := &snapshotv1.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: sourceName,
Namespace: f.Namespace.Name,
},
Spec: snapshotv1.VolumeSnapshotSpec{
Source: snapshotv1.VolumeSnapshotSource{
PersistentVolumeClaimName: &pvc.Name,
},
VolumeSnapshotClassName: &snapClass.Name,
},
}
snapshot := utils.NewVolumeSnapshot(sourceName, f.Namespace.Name, pvc.Name, &snapClass.Name)
err = f.CrClient.Create(context.TODO(), snapshot)
Expect(err).ToNot(HaveOccurred())
Eventually(func() bool {
err = f.CrClient.Get(context.TODO(), client.ObjectKeyFromObject(snapshot), snapshot)
if err != nil {
return false
}
return snapshot.Status != nil && snapshot.Status.ReadyToUse != nil && *snapshot.Status.ReadyToUse
}, 10*time.Second, 1*time.Second).Should(BeTrue())
snapshot = utils.WaitSnapshotReady(f.CrClient, snapshot)
By("Snapshot ready, no need to keep PVC around")
err = f.DeletePVC(pvc)
Expect(err).ToNot(HaveOccurred())

View File

@ -1470,6 +1470,7 @@ var _ = Describe("all clone tests", func() {
Context("CloneStrategy on storageclass annotation", func() {
cloneType := cdiv1.CloneStrategyCsiClone
var originalStrategy *cdiv1.CDICloneStrategy
BeforeEach(func() {
if !f.IsCSIVolumeCloneStorageClassAvailable() {
@ -1479,8 +1480,8 @@ var _ = Describe("all clone tests", func() {
By(fmt.Sprintf("Get original storage profile: %s", cloneStorageClassName))
storageProfile, err := f.CdiClient.CdiV1beta1().StorageProfiles().Get(context.TODO(), cloneStorageClassName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
originalStrategy = storageProfile.Status.CloneStrategy
Expect(storageProfile.Spec.CloneStrategy).To(BeNil())
Expect(storageProfile.Status.CloneStrategy).To(BeNil())
storageclass, err := f.K8sClient.StorageV1().StorageClasses().Get(context.TODO(), cloneStorageClassName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
@ -1501,6 +1502,7 @@ var _ = Describe("all clone tests", func() {
}, time.Minute, time.Second).Should(Equal(cloneType))
})
AfterEach(func() {
By("[AfterEach] Restore the storage class - remove annotation ")
storageclass, err := f.K8sClient.StorageV1().StorageClasses().Get(context.TODO(), cloneStorageClassName, metav1.GetOptions{})
@ -1514,10 +1516,10 @@ var _ = Describe("all clone tests", func() {
storageProfile, err := f.CdiClient.CdiV1beta1().StorageProfiles().Get(context.TODO(), cloneStorageClassName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
return storageProfile.Status.CloneStrategy
}, time.Minute, time.Second).Should(BeNil())
}, time.Minute, time.Second).Should(Equal(originalStrategy))
})
It("Should clone with correct strategy from storageclass annotation ", func() {
It("Should clone with correct strategy from storageclass annotation ", func() {
pvcDef := utils.NewPVCDefinition(sourcePVCName, "1Gi", nil, nil)
pvcDef.Spec.StorageClassName = &cloneStorageClassName
pvcDef.Namespace = f.Namespace.Name
@ -2628,31 +2630,17 @@ var _ = Describe("all clone tests", func() {
Expect(err).ToNot(HaveOccurred())
snapClass := f.GetSnapshotClass()
snapshot = &snapshotv1.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: "snap-" + snapSourceDv.Name,
Namespace: f.Namespace.Name,
},
Spec: snapshotv1.VolumeSnapshotSpec{
Source: snapshotv1.VolumeSnapshotSource{
PersistentVolumeClaimName: &pvc.Name,
},
VolumeSnapshotClassName: &snapClass.Name,
},
}
snapshot = utils.NewVolumeSnapshot("snap-"+snapSourceDv.Name, f.Namespace.Name, pvc.Name, &snapClass.Name)
err = f.CrClient.Create(context.TODO(), snapshot)
Expect(err).ToNot(HaveOccurred())
Eventually(func() bool {
err = f.CrClient.Get(context.TODO(), client.ObjectKeyFromObject(snapshot), snapshot)
if err != nil {
return false
}
return snapshot.Status != nil && snapshot.Status.ReadyToUse != nil && *snapshot.Status.ReadyToUse
}, 10*time.Second, 1*time.Second).Should(BeTrue())
snapshot = utils.WaitSnapshotReady(f.CrClient, snapshot)
By("Snapshot ready, no need to keep PVC around")
err = f.DeletePVC(pvc)
Expect(err).ToNot(HaveOccurred())
deleted, err := utils.WaitPVCDeleted(f.K8sClient, pvc.Name, f.Namespace.Name, 2*time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(deleted).To(BeTrue())
}
BeforeEach(func() {

View File

@ -6,7 +6,9 @@ import (
"time"
cdiclientset "kubevirt.io/containerized-data-importer/pkg/client/clientset/versioned"
"sigs.k8s.io/controller-runtime/pkg/client"
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
. "github.com/onsi/ginkgo"
"github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"
@ -36,21 +38,30 @@ const (
var _ = Describe("DataImportCron", func() {
var (
f = framework.NewFramework(namespacePrefix)
log = logf.Log.WithName("dataimportcron_test")
dataSourceName = "datasource-test"
pollerPodName = "poller"
cronName = "cron-test"
cron *cdiv1.DataImportCron
reg *cdiv1.DataVolumeSourceRegistry
err error
ns string
f = framework.NewFramework(namespacePrefix)
log = logf.Log.WithName("dataimportcron_test")
dataSourceName = "datasource-test"
pollerPodName = "poller"
cronName = "cron-test"
cron *cdiv1.DataImportCron
reg *cdiv1.DataVolumeSourceRegistry
err error
ns string
scName string
originalProfileSpec *cdiv1.StorageProfileSpec
)
BeforeEach(func() {
ns = f.Namespace.Name
reg, err = getDataVolumeSourceRegistry(f)
Expect(err).ToNot(HaveOccurred())
scName = utils.DefaultStorageClass.GetName()
By(fmt.Sprintf("Get original storage profile: %s", scName))
spec, err := utils.GetStorageProfileSpec(f.CdiClient, scName)
Expect(err).ToNot(HaveOccurred())
originalProfileSpec = spec
})
AfterEach(func() {
@ -59,6 +70,9 @@ var _ = Describe("DataImportCron", func() {
}
err = utils.DeletePodByName(f.K8sClient, pollerPodName, f.CdiInstallNs, nil)
Expect(err).ToNot(HaveOccurred())
By("[AfterEach] Restore the profile")
Expect(utils.UpdateStorageProfile(f.CrClient, scName, *originalProfileSpec)).Should(Succeed())
})
updateDigest := func(digest string) func(cron *cdiv1.DataImportCron) *cdiv1.DataImportCron {
@ -89,7 +103,135 @@ var _ = Describe("DataImportCron", func() {
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "Timeout waiting for DataImportCron conditions")
}
table.DescribeTable("should", func(retention, createErrorDv bool, repeat int) {
configureStorageProfileResultingFormat := func(format cdiv1.DataImportCronSourceFormat) {
By(fmt.Sprintf("configure storage profile %s", scName))
newProfileSpec := originalProfileSpec.DeepCopy()
newProfileSpec.DataImportCronSourceFormat = &format
err := utils.UpdateStorageProfile(f.CrClient, scName, *newProfileSpec)
Expect(err).ToNot(HaveOccurred())
Eventually(func() *cdiv1.DataImportCronSourceFormat {
profile, err := f.CdiClient.CdiV1beta1().StorageProfiles().Get(context.TODO(), scName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
return profile.Status.DataImportCronSourceFormat
}, 15*time.Second, time.Second).Should(Equal(&format))
}
verifySourceReady := func(format cdiv1.DataImportCronSourceFormat, name string) metav1.Object {
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
By(fmt.Sprintf("Verify pvc was created %s", name))
pvc, err := utils.WaitForPVC(f.K8sClient, ns, name)
Expect(err).ToNot(HaveOccurred())
By("Wait for import completion")
err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, name)
Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")
return pvc
case cdiv1.DataImportCronSourceFormatSnapshot:
snapshot := &snapshotv1.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
}
snapshot = utils.WaitSnapshotReady(f.CrClient, snapshot)
deleted, err := utils.WaitPVCDeleted(f.K8sClient, name, ns, 2*time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(deleted).To(BeTrue())
// check pvc is not recreated
Consistently(func() error {
_, err = f.K8sClient.CoreV1().PersistentVolumeClaims(ns).Get(context.TODO(), name, metav1.GetOptions{})
return err
}, 5*time.Second, 1*time.Second).Should(
SatisfyAll(HaveOccurred(), WithTransform(errors.IsNotFound, BeTrue())),
"PVC should not have been recreated",
)
return snapshot
}
return nil
}
deleteSource := func(format cdiv1.DataImportCronSourceFormat, name string) {
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
pvc, err := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).Get(context.TODO(), name, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
deleteDvPvc(f, name)
deleted, err := f.WaitPVCDeletedByUID(pvc, time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(deleted).To(BeTrue())
case cdiv1.DataImportCronSourceFormatSnapshot:
snapshot := &snapshotv1.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
}
// Probably good to ensure deletion by UID here too but re-import is long enough
// to not cause errors
Eventually(func() bool {
err := f.CrClient.Delete(context.TODO(), snapshot)
return err != nil && errors.IsNotFound(err)
}, time.Minute, time.Second).Should(BeTrue())
}
}
getDataSourceName := func(format cdiv1.DataImportCronSourceFormat, ds *cdiv1.DataSource) string {
var sourceName string
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
sourceName = ds.Spec.Source.PVC.Name
case cdiv1.DataImportCronSourceFormatSnapshot:
sourceName = ds.Spec.Source.Snapshot.Name
}
return sourceName
}
verifyRetention := func(format cdiv1.DataImportCronSourceFormat, name string) {
By("Verify DataSource retention")
_, err := f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
Consistently(func() *metav1.Time {
src := verifySourceReady(format, name)
return src.GetDeletionTimestamp()
}, 5*time.Second, time.Second).Should(BeNil())
}
verifyDeletion := func(format cdiv1.DataImportCronSourceFormat) {
By("Verify DataSource deletion")
Eventually(func() bool {
_, err := f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
return errors.IsNotFound(err)
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "DataSource was not deleted")
By("Verify sources deleted")
Eventually(func(g Gomega) bool {
pvcs, err := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
g.Expect(err).ToNot(HaveOccurred())
return len(pvcs.Items) == 0
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "PVCs were not deleted")
if format == cdiv1.DataImportCronSourceFormatSnapshot {
snapshots := &snapshotv1.VolumeSnapshotList{}
Eventually(func(g Gomega) bool {
err := f.CrClient.List(context.TODO(), snapshots, &client.ListOptions{Namespace: ns})
g.Expect(err).ToNot(HaveOccurred())
return len(snapshots.Items) == 0
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "snapshots were not deleted")
}
}
table.DescribeTable("should", func(retention, createErrorDv bool, repeat int, format cdiv1.DataImportCronSourceFormat) {
if format == cdiv1.DataImportCronSourceFormatSnapshot && !f.IsSnapshotStorageClassAvailable() {
Skip("Volumesnapshot support needed to test DataImportCron with Volumesnapshot sources")
}
configureStorageProfileResultingFormat(format)
By(fmt.Sprintf("Create new DataImportCron %s, url %s", cronName, *reg.URL))
cron = utils.NewDataImportCron(cronName, "5Gi", scheduleEveryMinute, dataSourceName, importsToKeep, *reg)
@ -135,8 +277,8 @@ var _ = Describe("DataImportCron", func() {
By("Reset desired digest")
retryOnceOnErr(updateDataImportCron(f.CdiClient, ns, cronName, updateDigest(""))).Should(BeNil())
By("Delete last import PVC " + currentImportDv)
deleteDvPvc(f, currentImportDv)
By(fmt.Sprintf("Delete last import %s, format: %s", currentImportDv, format))
deleteSource(format, currentImportDv)
lastImportDv = ""
By("Wait for non-empty desired digest")
@ -151,13 +293,7 @@ var _ = Describe("DataImportCron", func() {
Expect(currentImportDv).ToNot(Equal(lastImportDv))
lastImportDv = currentImportDv
By(fmt.Sprintf("Verify pvc was created %s", currentImportDv))
currentPvc, err := utils.WaitForPVC(f.K8sClient, ns, currentImportDv)
Expect(err).ToNot(HaveOccurred())
By("Wait for import completion")
err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, currentImportDv)
Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")
currentSource := verifySourceReady(format, currentImportDv)
By("Verify DataSource was updated")
var dataSource *cdiv1.DataSource
@ -169,29 +305,33 @@ var _ = Describe("DataImportCron", func() {
Expect(err).ToNot(HaveOccurred())
readyCond := controller.FindDataSourceConditionByType(dataSource, cdiv1.DataSourceReady)
return readyCond != nil && readyCond.Status == corev1.ConditionTrue &&
dataSource.Spec.Source.PVC != nil && dataSource.Spec.Source.PVC.Name == currentImportDv
getDataSourceName(format, dataSource) == currentImportDv
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "DataSource was not updated")
By("Verify cron was updated")
Expect(cron.Status.LastImportedPVC).ToNot(BeNil())
Expect(cron.Status.LastImportedPVC.Name).To(Equal(currentImportDv))
By("Update DataSource pvc with dummy name")
dataSource.Spec.Source.PVC.Name = "dummy"
By("Update DataSource to refer to a dummy name")
retryOnceOnErr(
updateDataSource(f.CdiClient, ns, cron.Spec.ManagedDataSource,
func(dataSource *cdiv1.DataSource) *cdiv1.DataSource {
dataSource.Spec.Source.PVC.Name = "dummy"
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
dataSource.Spec.Source.PVC.Name = "dummy"
case cdiv1.DataImportCronSourceFormatSnapshot:
dataSource.Spec.Source.Snapshot.Name = "dummy"
}
return dataSource
},
)).Should(BeNil())
By("Verify DataSource pvc name was reconciled")
By("Verify name on DataSource was reconciled")
Eventually(func() bool {
dataSource, err = f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
return dataSource.Spec.Source.PVC.Name == currentImportDv
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "DataSource pvc name was not reconciled")
return getDataSourceName(format, dataSource) == currentImportDv
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "DataSource name was not reconciled")
By("Delete DataSource")
err = f.CdiClient.CdiV1beta1().DataSources(ns).Delete(context.TODO(), dataSourceName, metav1.DeleteOptions{})
@ -202,17 +342,11 @@ var _ = Describe("DataImportCron", func() {
return err == nil && ds.UID != dataSource.UID
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "DataSource was not re-created")
By("Delete last imported PVC")
deleteDvPvc(f, currentPvc.Name)
By("Verify last imported PVC was re-created")
Eventually(func() bool {
pvc, err := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).Get(context.TODO(), currentPvc.Name, metav1.GetOptions{})
return err == nil && pvc.UID != currentPvc.UID
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "Last imported PVC was not re-created")
By("Wait for import completion")
err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, currentImportDv)
Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")
By("Delete last imported source")
deleteSource(format, currentSource.GetName())
By("Verify last imported source was re-created")
recreatedSource := verifySourceReady(format, currentSource.GetName())
Expect(recreatedSource.GetUID()).ToNot(Equal(currentSource.GetUID()), "Last imported source was not re-created")
}
lastImportedPVC := cron.Status.LastImportedPVC
@ -222,31 +356,16 @@ var _ = Describe("DataImportCron", func() {
Expect(err).ToNot(HaveOccurred())
if retention {
By("Verify DataSource retention")
_, err := f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
By("Verify last PVC retention")
_, err = f.K8sClient.CoreV1().PersistentVolumeClaims(ns).Get(context.TODO(), lastImportedPVC.Name, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
verifyRetention(format, lastImportedPVC.Name)
} else {
By("Verify DataSource deletion")
Eventually(func() bool {
_, err := f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
return errors.IsNotFound(err)
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "DataSource was not deleted")
By("Verify PVCs deletion")
Eventually(func() bool {
pvcs, err := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
Expect(err).ToNot(HaveOccurred())
return len(pvcs.Items) == 0
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "PVCs were not deleted")
verifyDeletion(format)
}
},
table.Entry("[test_id:7403] succeed importing initial PVC from registry URL", true, false, 1),
table.Entry("[test_id:7414] succeed importing PVC from registry URL on source digest update", true, false, 2),
table.Entry("[test_id:8266] succeed deleting error DVs when importing new ones", false, true, 2),
table.Entry("[test_id:7403] succeed importing initial PVC from registry URL", true, false, 1, cdiv1.DataImportCronSourceFormatPvc),
table.Entry("[test_id:7414] succeed importing PVC from registry URL on source digest update", true, false, 2, cdiv1.DataImportCronSourceFormatPvc),
table.Entry("[test_id:10031] succeed importing initially into a snapshot from registry URL", true, false, 1, cdiv1.DataImportCronSourceFormatSnapshot),
table.Entry("[test_id:10032] succeed importing to a snapshot from registry URL on source digest update", true, false, 2, cdiv1.DataImportCronSourceFormatSnapshot),
table.Entry("[test_id:8266] succeed deleting error DVs when importing new ones", false, true, 2, cdiv1.DataImportCronSourceFormatPvc),
)
It("[test_id:10040] Should get digest updated by external poller", func() {
@ -279,6 +398,8 @@ var _ = Describe("DataImportCron", func() {
})
It("[test_id:XXXX] Should allow an empty schedule to trigger an external update to the source", func() {
configureStorageProfileResultingFormat(cdiv1.DataImportCronSourceFormatPvc)
By("Create DataImportCron with empty schedule")
cron = utils.NewDataImportCron(cronName, "5Gi", emptySchedule, dataSourceName, importsToKeep, *reg)
retentionPolicy := cdiv1.DataImportCronRetainNone
@ -319,20 +440,54 @@ var _ = Describe("DataImportCron", func() {
Expect(errors.IsNotFound(err)).To(BeTrue())
})
It("[test_id:7406] succeed garbage collecting old PVCs when importing new ones", func() {
garbagePVCs := 3
for i := 0; i < garbagePVCs; i++ {
pvcName := fmt.Sprintf("pvc-garbage-%d", i)
By(fmt.Sprintf("Create %s", pvcName))
pvc := utils.NewPVCDefinition(pvcName, "1Gi",
map[string]string{controller.AnnLastUseTime: time.Now().UTC().Format(time.RFC3339Nano)},
map[string]string{common.DataImportCronLabel: cronName})
f.CreateBoundPVCFromDefinition(pvc)
table.DescribeTable("Succeed garbage collecting sources when importing new ones", func(format cdiv1.DataImportCronSourceFormat) {
if format == cdiv1.DataImportCronSourceFormatSnapshot && !f.IsSnapshotStorageClassAvailable() {
Skip("Volumesnapshot support needed to test DataImportCron with Volumesnapshot sources")
}
pvcList, err := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
Expect(err).ToNot(HaveOccurred())
Expect(pvcList.Items).To(HaveLen(garbagePVCs))
configureStorageProfileResultingFormat(format)
garbageSources := 3
for i := 0; i < garbageSources; i++ {
srcName := fmt.Sprintf("src-garbage-%d", i)
By(fmt.Sprintf("Create %s", srcName))
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
pvc := utils.NewPVCDefinition(srcName, "1Gi",
map[string]string{controller.AnnLastUseTime: time.Now().UTC().Format(time.RFC3339Nano)},
map[string]string{common.DataImportCronLabel: cronName})
f.CreateBoundPVCFromDefinition(pvc)
case cdiv1.DataImportCronSourceFormatSnapshot:
pvc := utils.NewPVCDefinition(srcName, "1Gi",
map[string]string{controller.AnnLastUseTime: time.Now().UTC().Format(time.RFC3339Nano)},
map[string]string{common.DataImportCronLabel: cronName})
f.CreateBoundPVCFromDefinition(pvc)
snapClass := f.GetSnapshotClass()
snapshot := utils.NewVolumeSnapshot(srcName, ns, pvc.Name, &snapClass.Name)
snapshot.SetAnnotations(map[string]string{controller.AnnLastUseTime: time.Now().UTC().Format(time.RFC3339Nano)})
snapshot.SetLabels(map[string]string{common.DataImportCronLabel: cronName})
err = f.CrClient.Create(context.TODO(), snapshot)
Expect(err).ToNot(HaveOccurred())
utils.WaitSnapshotReady(f.CrClient, snapshot)
err = f.DeletePVC(pvc)
Expect(err).ToNot(HaveOccurred())
deleted, err := utils.WaitPVCDeleted(f.K8sClient, srcName, ns, 2*time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(deleted).To(BeTrue())
}
}
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
pvcList, err := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
Expect(err).ToNot(HaveOccurred())
Expect(pvcList.Items).To(HaveLen(garbageSources))
case cdiv1.DataImportCronSourceFormatSnapshot:
snapshots := &snapshotv1.VolumeSnapshotList{}
err := f.CrClient.List(context.TODO(), snapshots, &client.ListOptions{Namespace: ns})
Expect(err).ToNot(HaveOccurred())
Expect(snapshots.Items).To(HaveLen(garbageSources))
}
By(fmt.Sprintf("Create new DataImportCron %s, url %s", cronName, *reg.URL))
cron = utils.NewDataImportCron(cronName, "1Gi", scheduleEveryMinute, dataSourceName, importsToKeep, *reg)
@ -347,36 +502,59 @@ var _ = Describe("DataImportCron", func() {
currentImportDv := cron.Status.CurrentImports[0].DataVolumeName
Expect(currentImportDv).ToNot(BeEmpty())
By(fmt.Sprintf("Verify pvc was created %s", currentImportDv))
currentPvc, err := utils.WaitForPVC(f.K8sClient, ns, currentImportDv)
Expect(err).ToNot(HaveOccurred())
By("Wait for import completion")
err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, currentImportDv)
Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")
currentSource := verifySourceReady(format, currentImportDv)
By("Check garbage collection")
Eventually(func() int {
pvcList, err = f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
Expect(err).ToNot(HaveOccurred())
return len(pvcList.Items)
}, dataImportCronTimeout, pollingInterval).Should(Equal(importsToKeep), "Garbage collection failed cleaning old imports")
By("Check last import PVC is timestamped and not garbage collected")
pvcFound := false
for _, pvc := range pvcList.Items {
if pvc.UID == currentPvc.UID {
lastUse := pvc.Annotations[controller.AnnLastUseTime]
Expect(lastUse).ToNot(BeEmpty())
ts, err := time.Parse(time.RFC3339Nano, lastUse)
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
pvcList := &corev1.PersistentVolumeClaimList{}
Eventually(func() int {
pvcList, err = f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
Expect(err).ToNot(HaveOccurred())
Expect(ts).To(BeTemporally("<", time.Now()))
pvcFound = true
break
return len(pvcList.Items)
}, dataImportCronTimeout, pollingInterval).Should(Equal(importsToKeep), "Garbage collection failed cleaning old imports")
By("Check last import PVC is timestamped and not garbage collected")
found := false
for _, pvc := range pvcList.Items {
if pvc.UID == currentSource.GetUID() {
lastUse := pvc.Annotations[controller.AnnLastUseTime]
Expect(lastUse).ToNot(BeEmpty())
ts, err := time.Parse(time.RFC3339Nano, lastUse)
Expect(err).ToNot(HaveOccurred())
Expect(ts).To(BeTemporally("<", time.Now()))
found = true
break
}
}
Expect(found).To(BeTrue())
case cdiv1.DataImportCronSourceFormatSnapshot:
snapshots := &snapshotv1.VolumeSnapshotList{}
Eventually(func(g Gomega) int {
err := f.CrClient.List(context.TODO(), snapshots, &client.ListOptions{Namespace: ns})
g.Expect(err).ToNot(HaveOccurred())
return len(snapshots.Items)
}, dataImportCronTimeout, pollingInterval).Should(Equal(importsToKeep), "Garbage collection failed cleaning old imports")
By("Check last import snapshot is timestamped and not garbage collected")
found := false
for _, snap := range snapshots.Items {
if snap.UID == currentSource.GetUID() {
lastUse := snap.Annotations[controller.AnnLastUseTime]
Expect(lastUse).ToNot(BeEmpty())
ts, err := time.Parse(time.RFC3339Nano, lastUse)
Expect(err).ToNot(HaveOccurred())
Expect(ts).To(BeTemporally("<", time.Now()))
found = true
break
}
}
Expect(found).To(BeTrue())
}
Expect(pvcFound).To(BeTrue())
})
},
table.Entry("[test_id:7406] with PVC sources", cdiv1.DataImportCronSourceFormatPvc),
table.Entry("[test_id:10033] with snapshot sources", cdiv1.DataImportCronSourceFormatSnapshot),
)
It("[test_id:8033] should delete jobs on deletion", func() {
noSuchCM := "nosuch"
@ -456,6 +634,77 @@ var _ = Describe("DataImportCron", func() {
return errors.IsNotFound(err)
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "cronjob first job pod was not deleted")
})
Context("Change source format of existing DataImportCron", func() {
It("[test_id:10034] Should switch from PVC to snapshot sources once format changes", func() {
if !f.IsSnapshotStorageClassAvailable() {
Skip("Volumesnapshot support needed to test DataImportCron with Volumesnapshot sources")
}
configureStorageProfileResultingFormat(cdiv1.DataImportCronSourceFormatPvc)
By(fmt.Sprintf("Create new DataImportCron %s, url %s", cronName, *reg.URL))
cron = utils.NewDataImportCronWithStorageSpec(cronName, "1Gi", scheduleOnceAYear, dataSourceName, importsToKeep, *reg)
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy
cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)
By("Verify CurrentImports update")
currentImportDv := cron.Status.CurrentImports[0].DataVolumeName
Expect(currentImportDv).ToNot(BeEmpty())
_ = verifySourceReady(cdiv1.DataImportCronSourceFormatPvc, currentImportDv)
snapshots := &snapshotv1.VolumeSnapshotList{}
err = f.CrClient.List(context.TODO(), snapshots, &client.ListOptions{Namespace: ns})
Expect(err).ToNot(HaveOccurred())
Expect(snapshots.Items).To(BeEmpty())
// Now simulate an upgrade, where a new CDI version has identified
// more storage types that scale better with snapshots
configureStorageProfileResultingFormat(cdiv1.DataImportCronSourceFormatSnapshot)
// Switches to not ready because the snapshot wasn't created yet
waitForConditions(corev1.ConditionFalse, corev1.ConditionFalse)
waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)
// Check snapshot now exists and PVC is gone
currentSource := verifySourceReady(cdiv1.DataImportCronSourceFormatSnapshot, currentImportDv)
// DataSource is updated to point to a snapshot
dataSource, err := f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), cron.Spec.ManagedDataSource, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
readyCond := controller.FindDataSourceConditionByType(dataSource, cdiv1.DataSourceReady)
Expect(readyCond.Status).To(Equal(corev1.ConditionTrue))
expectedSource := cdiv1.DataSourceSource{
Snapshot: &cdiv1.DataVolumeSourceSnapshot{
Name: currentSource.GetName(),
Namespace: currentSource.GetNamespace(),
},
}
Expect(dataSource.Spec.Source).To(Equal(expectedSource))
// Verify content
targetDV := utils.NewDataVolumeWithSourceRefAndStorageAPI("target-dv", "1Gi", dataSource.Namespace, dataSource.Name)
By(fmt.Sprintf("Create new target datavolume %s", targetDV.Name))
targetDataVolume, err := utils.CreateDataVolumeFromDefinition(f.CdiClient, ns, targetDV)
Expect(err).ToNot(HaveOccurred())
f.ForceBindPvcIfDvIsWaitForFirstConsumer(targetDataVolume)
By("Wait for clone DV Succeeded phase")
err = utils.WaitForDataVolumePhase(f, targetDataVolume.Namespace, cdiv1.Succeeded, targetDataVolume.Name)
Expect(err).ToNot(HaveOccurred())
By("Verify MD5")
pvc, err := f.K8sClient.CoreV1().PersistentVolumeClaims(targetDataVolume.Namespace).Get(context.TODO(), targetDataVolume.Name, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
path := utils.DefaultImagePath
volumeMode := pvc.Spec.VolumeMode
if volumeMode != nil && *volumeMode == corev1.PersistentVolumeBlock {
path = utils.DefaultPvcMountPath
}
same, err := f.VerifyTargetPVCContentMD5(f.Namespace, pvc, path, utils.UploadFileMD5, utils.UploadFileSize)
Expect(err).ToNot(HaveOccurred())
Expect(same).To(BeTrue())
})
})
})
func getDataVolumeSourceRegistry(f *framework.Framework) (*cdiv1.DataVolumeSourceRegistry, error) {

View File

@ -184,18 +184,7 @@ var _ = Describe("DataSource", func() {
f.ForceBindIfWaitForFirstConsumer(pvc)
snapClass := f.GetSnapshotClass()
snapshot := &snapshotv1.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: pvc.Namespace,
},
Spec: snapshotv1.VolumeSnapshotSpec{
Source: snapshotv1.VolumeSnapshotSource{
PersistentVolumeClaimName: &pvc.Name,
},
VolumeSnapshotClassName: &snapClass.Name,
},
}
snapshot := utils.NewVolumeSnapshot(name, pvc.Namespace, pvc.Name, &snapClass.Name)
err = f.CrClient.Create(context.TODO(), snapshot)
Expect(err).ToNot(HaveOccurred())

View File

@ -305,18 +305,7 @@ var _ = Describe("Population tests", func() {
By("Creating Snapshot")
snapshotClassName := getSnapshotClassName()
snapshot := &snapshotv1.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: "snapshot-" + pvcDef.Name,
Namespace: pvcDef.Namespace,
},
Spec: snapshotv1.VolumeSnapshotSpec{
Source: snapshotv1.VolumeSnapshotSource{
PersistentVolumeClaimName: &pvcDef.Name,
},
VolumeSnapshotClassName: &snapshotClassName,
},
}
snapshot := utils.NewVolumeSnapshot("snapshot-"+pvcDef.Name, pvcDef.Namespace, pvcDef.Name, &snapshotClassName)
snapshotAPIGroup := snapshotAPIName
dataSource := &corev1.TypedLocalObjectReference{
APIGroup: &snapshotAPIGroup,
@ -327,11 +316,7 @@ var _ = Describe("Population tests", func() {
Expect(err).ToNot(HaveOccurred())
By("Waiting for Snapshot to be ready to use")
Eventually(func() bool {
err := f.CrClient.Get(context.TODO(), crclient.ObjectKeyFromObject(snapshot), snapshot)
Expect(err).ToNot(HaveOccurred())
return snapshot.Status != nil && snapshot.Status.ReadyToUse != nil && *snapshot.Status.ReadyToUse
}, timeout, pollingInterval).Should(BeTrue())
snapshot = utils.WaitSnapshotReady(f.CrClient, snapshot)
By(fmt.Sprintf("Creating target datavolume %s", dataVolumeName))
// PVC API because some provisioners only allow exact match between source size and restore size

View File

@ -17,6 +17,7 @@ go_library(
"services.go",
"storageprofile.go",
"upload.go",
"volumesnapshot.go",
],
importpath = "kubevirt.io/containerized-data-importer/tests/utils",
visibility = ["//visibility:public"],
@ -31,6 +32,7 @@ go_library(
"//staging/src/kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1:go_default_library",
"//staging/src/kubevirt.io/containerized-data-importer-api/pkg/apis/upload/v1beta1:go_default_library",
"//vendor/github.com/klauspost/compress/zstd:go_default_library",
"//vendor/github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1:go_default_library",
"//vendor/github.com/onsi/ginkgo:go_default_library",
"//vendor/github.com/onsi/gomega:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",

View File

@ -36,3 +36,31 @@ func NewDataImportCron(name, size, schedule, dataSource string, importsToKeep in
},
}
}
// NewDataImportCronWithStorageSpec initializes a DataImportCron struct with storage defaults-inferring API
func NewDataImportCronWithStorageSpec(name, size, schedule, dataSource string, importsToKeep int32, source cdiv1.DataVolumeSourceRegistry) *cdiv1.DataImportCron {
return &cdiv1.DataImportCron{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: cdiv1.DataImportCronSpec{
Template: cdiv1.DataVolume{
Spec: cdiv1.DataVolumeSpec{
Source: &cdiv1.DataVolumeSource{
Registry: &source,
},
Storage: &cdiv1.StorageSpec{
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse(size),
},
},
},
},
},
Schedule: schedule,
ManagedDataSource: dataSource,
ImportsToKeep: &importsToKeep,
},
}
}

View File

@ -181,6 +181,30 @@ func NewDataVolumeWithSourceRef(dataVolumeName string, size, sourceRefNamespace,
}
}
// NewDataVolumeWithSourceRefAndStorageAPI initializes a DataVolume struct with DataSource SourceRef and storage defaults-inferring API
func NewDataVolumeWithSourceRefAndStorageAPI(dataVolumeName string, size, sourceRefNamespace, sourceRefName string) *cdiv1.DataVolume {
return &cdiv1.DataVolume{
ObjectMeta: metav1.ObjectMeta{
Name: dataVolumeName,
Annotations: map[string]string{},
},
Spec: cdiv1.DataVolumeSpec{
SourceRef: &cdiv1.DataVolumeSourceRef{
Kind: cdiv1.DataVolumeDataSource,
Namespace: &sourceRefNamespace,
Name: sourceRefName,
},
Storage: &cdiv1.StorageSpec{
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: resource.MustParse(size),
},
},
},
},
}
}
// NewPvcDataSource initializes a DataSource struct with PVC source
func NewPvcDataSource(dataSourceName, dataSourceNamespace, pvcName, pvcNamespace string) *cdiv1.DataSource {
return &cdiv1.DataSource{

View File

@ -0,0 +1,43 @@
package utils
import (
"context"
"time"
"github.com/onsi/gomega"
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
)
// NewVolumeSnapshot initializes a VolumeSnapshot struct
func NewVolumeSnapshot(name, namespace, sourcePvcName string, snapshotClassName *string) *snapshotv1.VolumeSnapshot {
return &snapshotv1.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: snapshotv1.VolumeSnapshotSpec{
Source: snapshotv1.VolumeSnapshotSource{
PersistentVolumeClaimName: &sourcePvcName,
},
VolumeSnapshotClassName: snapshotClassName,
},
}
}
// WaitSnapshotReady waits until the snapshot is ready to be used
func WaitSnapshotReady(c client.Client, snapshot *snapshotv1.VolumeSnapshot) *snapshotv1.VolumeSnapshot {
gomega.Eventually(func() bool {
err := c.Get(context.TODO(), client.ObjectKeyFromObject(snapshot), snapshot)
if err != nil {
return false
}
return cc.IsSnapshotReady(snapshot)
}, 4*time.Minute, 2*time.Second).Should(gomega.BeTrue())
return snapshot
}

View File

@ -5,7 +5,6 @@ import (
"fmt"
"time"
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
. "github.com/onsi/ginkgo"
. "github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"
@ -288,18 +287,7 @@ var _ = Describe("Clone Auth Webhook tests", func() {
pvc := f.CreateAndPopulateSourcePVC(srcPVCDef, "fill-source", fmt.Sprintf("echo \"hello world\" > %s/data.txt", utils.DefaultPvcMountPath))
snapClass := f.GetSnapshotClass()
snapshot := &snapshotv1.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: "snap-" + pvc.Name,
Namespace: pvc.Namespace,
},
Spec: snapshotv1.VolumeSnapshotSpec{
Source: snapshotv1.VolumeSnapshotSource{
PersistentVolumeClaimName: &pvc.Name,
},
VolumeSnapshotClassName: &snapClass.Name,
},
}
snapshot := utils.NewVolumeSnapshot("snap-"+pvc.Name, pvc.Namespace, pvc.Name, &snapClass.Name)
err = f.CrClient.Create(context.TODO(), snapshot)
Expect(err).ToNot(HaveOccurred())
volumeMode := corev1.PersistentVolumeMode(corev1.PersistentVolumeFilesystem)