containerized-data-importer/tests/dataimportcron_test.go
kubevirt-bot da1c19776b
Fix flaky DataImportCron volume snapshot test (#3583)
As commented by Alex Kalenyuk: the condition check flakes because the
switch to snapshot boot sources is almost instant, so the DataImportCron
never becomes "not ready" following the switch. We also moved the DIC
"ready" check after the snapshot "ready" check; otherwise we may run it
before the cron has gone "not ready", in which case it verifies nothing.

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>
Co-authored-by: Arnon Gilboa <agilboa@redhat.com>
2025-01-06 18:52:24 +01:00

package tests_test
import (
"context"
"fmt"
"time"
cdiclientset "kubevirt.io/containerized-data-importer/pkg/client/clientset/versioned"
"sigs.k8s.io/controller-runtime/pkg/client"
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
. "github.com/onsi/ginkgo"
"github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
logf "sigs.k8s.io/controller-runtime/pkg/log"
cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
"kubevirt.io/containerized-data-importer/pkg/common"
"kubevirt.io/containerized-data-importer/pkg/controller"
cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
"kubevirt.io/containerized-data-importer/tests/framework"
"kubevirt.io/containerized-data-importer/tests/utils"
)
const (
dataImportCronTimeout = 4 * time.Minute
scheduleEveryMinute = "* * * * *"
scheduleOnceAYear = "0 0 1 1 *"
importsToKeep = 1
emptySchedule = ""
errorDigest = "sha256:12345678900987654321"
)
var _ = Describe("DataImportCron", func() {
var (
f = framework.NewFramework("dataimportcron-func-test")
log = logf.Log.WithName("dataimportcron_test")
dataSourceName = "datasource-test"
pollerPodName = "poller"
cronName = "cron-test"
cron *cdiv1.DataImportCron
reg *cdiv1.DataVolumeSourceRegistry
err error
ns string
scName string
originalProfileSpec *cdiv1.StorageProfileSpec
)
BeforeEach(func() {
ns = f.Namespace.Name
reg, err = getDataVolumeSourceRegistry(f)
Expect(err).ToNot(HaveOccurred())
scName = utils.DefaultStorageClass.GetName()
By(fmt.Sprintf("Get original storage profile: %s", scName))
spec, err := utils.GetStorageProfileSpec(f.CdiClient, scName)
Expect(err).ToNot(HaveOccurred())
originalProfileSpec = spec
})
AfterEach(func() {
if err = utils.RemoveInsecureRegistry(f.CrClient, *reg.URL); err != nil {
fmt.Fprintf(GinkgoWriter, "failed to remove registry: %v\n", err)
}
err = utils.DeletePodByName(f.K8sClient, pollerPodName, f.CdiInstallNs, nil)
Expect(err).ToNot(HaveOccurred())
By("[AfterEach] Restore the profile")
Expect(utils.UpdateStorageProfile(f.CrClient, scName, *originalProfileSpec)).Should(Succeed())
})
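// updateDigest returns a mutation function that sets the desired digest annotation on a DataImportCron.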
updateDigest := func(digest string) func(cron *cdiv1.DataImportCron) *cdiv1.DataImportCron {
return func(cron *cdiv1.DataImportCron) *cdiv1.DataImportCron {
cc.AddAnnotation(cron, controller.AnnSourceDesiredDigest, digest)
return cron
}
}
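// waitForDigest waits until the desired digest annotation on the cron is non-empty.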
waitForDigest := func() {
Eventually(func() string {
cron, err := f.CdiClient.CdiV1beta1().DataImportCrons(ns).Get(context.TODO(), cronName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
return cron.Annotations[controller.AnnSourceDesiredDigest]
}, dataImportCronTimeout, pollingInterval).ShouldNot(BeEmpty(), "Desired digest is empty")
}
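// waitForConditions waits until the cron's Progressing and UpToDate conditions reach the given statuses.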
waitForConditions := func(statusProgressing, statusUpToDate corev1.ConditionStatus) {
By(fmt.Sprintf("Wait for DataImportCron Progressing:%s, UpToDate:%s", statusProgressing, statusUpToDate))
Eventually(func() bool {
var err error
cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Get(context.TODO(), cronName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
condProgressing := controller.FindDataImportCronConditionByType(cron, cdiv1.DataImportCronProgressing)
condUpToDate := controller.FindDataImportCronConditionByType(cron, cdiv1.DataImportCronUpToDate)
return condProgressing != nil && condProgressing.Status == statusProgressing &&
condUpToDate != nil && condUpToDate.Status == statusUpToDate
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "Timeout waiting for DataImportCron conditions")
}
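// configureStorageProfileResultingFormat sets the storage profile's DataImportCronSourceFormat
// and waits for the profile status to reflect it.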
configureStorageProfileResultingFormat := func(format cdiv1.DataImportCronSourceFormat) {
By(fmt.Sprintf("configure storage profile %s", scName))
newProfileSpec := originalProfileSpec.DeepCopy()
newProfileSpec.DataImportCronSourceFormat = &format
err := utils.UpdateStorageProfile(f.CrClient, scName, *newProfileSpec)
Expect(err).ToNot(HaveOccurred())
Eventually(func() *cdiv1.DataImportCronSourceFormat {
profile, err := f.CdiClient.CdiV1beta1().StorageProfiles().Get(context.TODO(), scName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
return profile.Status.DataImportCronSourceFormat
}, 15*time.Second, time.Second).Should(Equal(&format))
}
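// verifySourceReady waits for the cron source to become ready and returns it: for the PVC format it
// waits for the PVC and for import completion; for the snapshot format it waits for the VolumeSnapshot
// to be ready and for the source PVC to be deleted and not recreated.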
verifySourceReady := func(format cdiv1.DataImportCronSourceFormat, name string) metav1.Object {
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
By(fmt.Sprintf("Verify pvc was created %s", name))
pvc, err := utils.WaitForPVC(f.K8sClient, ns, name)
Expect(err).ToNot(HaveOccurred())
By("Wait for import completion")
err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, name)
Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")
return pvc
case cdiv1.DataImportCronSourceFormatSnapshot:
snapshot := &snapshotv1.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
}
snapshot = utils.WaitSnapshotReady(f.CrClient, snapshot)
deleted, err := utils.WaitPVCDeleted(f.K8sClient, name, ns, 30*time.Second)
if err != nil {
// work around https://github.com/kubernetes-csi/external-snapshotter/issues/957
// it does converge after the resync period of snapshot controller (15mins)
cc.AddAnnotation(snapshot, "workaround", "triggersync")
err = f.CrClient.Update(context.TODO(), snapshot)
Expect(err).ToNot(HaveOccurred())
// try again
deleted, err = utils.WaitPVCDeleted(f.K8sClient, name, ns, 30*time.Second)
}
Expect(err).ToNot(HaveOccurred())
Expect(deleted).To(BeTrue())
// check pvc is not recreated
Consistently(func() error {
_, err = f.K8sClient.CoreV1().PersistentVolumeClaims(ns).Get(context.TODO(), name, metav1.GetOptions{})
return err
}, 5*time.Second, 1*time.Second).Should(
SatisfyAll(HaveOccurred(), WithTransform(errors.IsNotFound, BeTrue())),
"PVC should not have been recreated",
)
return snapshot
}
return nil
}
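// deleteSource deletes the current cron source (DV/PVC or VolumeSnapshot) and waits until it is gone.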
deleteSource := func(format cdiv1.DataImportCronSourceFormat, name string) {
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
pvc, err := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).Get(context.TODO(), name, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
utils.CleanupDvPvcNoWait(f.K8sClient, f.CdiClient, f.Namespace.Name, name)
deleted, err := f.WaitPVCDeletedByUID(pvc, time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(deleted).To(BeTrue())
case cdiv1.DataImportCronSourceFormatSnapshot:
snapshot := &snapshotv1.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
}
// Probably good to ensure deletion by UID here too but re-import is long enough
// to not cause errors
Eventually(func() bool {
err := f.CrClient.Delete(context.TODO(), snapshot)
return err != nil && errors.IsNotFound(err)
}, time.Minute, time.Second).Should(BeTrue())
}
}
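// getDataSourceName returns the PVC or snapshot name referenced by the DataSource, depending on format.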
getDataSourceName := func(format cdiv1.DataImportCronSourceFormat, ds *cdiv1.DataSource) string {
var sourceName string
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
sourceName = ds.Spec.Source.PVC.Name
case cdiv1.DataImportCronSourceFormatSnapshot:
sourceName = ds.Spec.Source.Snapshot.Name
}
return sourceName
}
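// verifyRetention checks that the DataSource still exists and its source is not being deleted.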
verifyRetention := func(format cdiv1.DataImportCronSourceFormat, name string) {
By("Verify DataSource retention")
_, err := f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
Consistently(func() *metav1.Time {
src := verifySourceReady(format, name)
return src.GetDeletionTimestamp()
}, 5*time.Second, time.Second).Should(BeNil())
}
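// verifyDeletion checks that the DataSource and all its sources (PVCs, and snapshots if relevant) are deleted.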
verifyDeletion := func(format cdiv1.DataImportCronSourceFormat) {
By("Verify DataSource deletion")
Eventually(func() bool {
_, err := f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
return errors.IsNotFound(err)
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "DataSource was not deleted")
By("Verify sources deleted")
Eventually(func(g Gomega) bool {
pvcs, err := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
g.Expect(err).ToNot(HaveOccurred())
return len(pvcs.Items) == 0
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "PVCs were not deleted")
if format == cdiv1.DataImportCronSourceFormatSnapshot {
snapshots := &snapshotv1.VolumeSnapshotList{}
Eventually(func(g Gomega) bool {
err := f.CrClient.List(context.TODO(), snapshots, &client.ListOptions{Namespace: ns})
g.Expect(err).ToNot(HaveOccurred())
return len(snapshots.Items) == 0
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "snapshots were not deleted")
}
}
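// Table-driven: retention keeps the DataSource and source after cron deletion, createErrorDv triggers an
// import of a nonexistent digest, repeat sets the number of import iterations, format selects PVC or
// snapshot sources.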
table.DescribeTable("should", func(retention, createErrorDv bool, repeat int, format cdiv1.DataImportCronSourceFormat) {
if format == cdiv1.DataImportCronSourceFormatSnapshot && !f.IsSnapshotStorageClassAvailable() {
Skip("Volumesnapshot support needed to test DataImportCron with Volumesnapshot sources")
}
configureStorageProfileResultingFormat(format)
By(fmt.Sprintf("Create new DataImportCron %s, url %s", cronName, *reg.URL))
cron = utils.NewDataImportCron(cronName, "5Gi", scheduleEveryMinute, dataSourceName, importsToKeep, *reg)
garbageCollect := cdiv1.DataImportCronGarbageCollectNever
cron.Spec.GarbageCollect = &garbageCollect
if !retention {
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy
}
cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
By("Verify cronjob was created")
Eventually(func() bool {
_, err := f.K8sClient.BatchV1().CronJobs(f.CdiInstallNs).Get(context.TODO(), controller.GetCronJobName(cron), metav1.GetOptions{})
if errors.IsNotFound(err) {
return false
}
Expect(err).ToNot(HaveOccurred())
return true
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "cronjob was not created")
var lastImportDv, currentImportDv string
for i := 0; i < repeat; i++ {
By(fmt.Sprintf("Iter #%d", i))
if i > 0 {
if createErrorDv {
By("Set desired digest to nonexisting one")
// Get the latest cron and update its desired digest annotation (retried on conflict below)
retryOnceOnErr(updateDataImportCron(f.CdiClient, ns, cronName, updateDigest(errorDigest))).Should(BeNil())
By("Wait for CurrentImports update")
Eventually(func() string {
cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Get(context.TODO(), cronName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
currentImportDv = cron.Status.CurrentImports[0].DataVolumeName
Expect(currentImportDv).ToNot(BeEmpty())
return currentImportDv
}, dataImportCronTimeout, pollingInterval).ShouldNot(Equal(lastImportDv), "Current import was not updated")
lastImportDv = currentImportDv
} else {
By("Reset desired digest")
retryOnceOnErr(updateDataImportCron(f.CdiClient, ns, cronName, updateDigest(""))).Should(BeNil())
By(fmt.Sprintf("Delete last import %s, format: %s", currentImportDv, format))
deleteSource(format, currentImportDv)
lastImportDv = ""
By("Wait for non-empty desired digest")
waitForDigest()
}
}
waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)
By("Verify CurrentImports update")
currentImportDv = cron.Status.CurrentImports[0].DataVolumeName
Expect(currentImportDv).ToNot(BeEmpty())
Expect(currentImportDv).ToNot(Equal(lastImportDv))
lastImportDv = currentImportDv
currentSource := verifySourceReady(format, currentImportDv)
By("Verify DataSource was updated")
var dataSource *cdiv1.DataSource
Eventually(func() bool {
dataSource, err = f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), cron.Spec.ManagedDataSource, metav1.GetOptions{})
if errors.IsNotFound(err) {
return false
}
Expect(err).ToNot(HaveOccurred())
readyCond := controller.FindDataSourceConditionByType(dataSource, cdiv1.DataSourceReady)
return readyCond != nil && readyCond.Status == corev1.ConditionTrue &&
getDataSourceName(format, dataSource) == currentImportDv
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "DataSource was not updated")
By("Verify cron was updated")
Expect(cron.Status.LastImportedPVC).ToNot(BeNil())
Expect(cron.Status.LastImportedPVC.Name).To(Equal(currentImportDv))
By("Update DataSource to refer to a dummy name")
retryOnceOnErr(
updateDataSource(f.CdiClient, ns, cron.Spec.ManagedDataSource,
func(dataSource *cdiv1.DataSource) *cdiv1.DataSource {
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
dataSource.Spec.Source.PVC.Name = "dummy"
case cdiv1.DataImportCronSourceFormatSnapshot:
dataSource.Spec.Source.Snapshot.Name = "dummy"
}
return dataSource
},
)).Should(BeNil())
By("Verify name on DataSource was reconciled")
Eventually(func() bool {
dataSource, err = f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
return getDataSourceName(format, dataSource) == currentImportDv
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "DataSource name was not reconciled")
By("Delete DataSource")
err = f.CdiClient.CdiV1beta1().DataSources(ns).Delete(context.TODO(), dataSourceName, metav1.DeleteOptions{})
Expect(err).ToNot(HaveOccurred())
By("Verify DataSource was re-created")
Eventually(func() bool {
ds, err := f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
return err == nil && ds.UID != dataSource.UID
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "DataSource was not re-created")
By("Delete last imported source")
deleteSource(format, currentSource.GetName())
By("Verify last imported source was re-created")
recreatedSource := verifySourceReady(format, currentSource.GetName())
Expect(recreatedSource.GetUID()).ToNot(Equal(currentSource.GetUID()), "Last imported source was not re-created")
}
lastImportedPVC := cron.Status.LastImportedPVC
By("Delete cron")
err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Delete(context.TODO(), cronName, metav1.DeleteOptions{})
Expect(err).ToNot(HaveOccurred())
if retention {
verifyRetention(format, lastImportedPVC.Name)
} else {
verifyDeletion(format)
}
},
table.Entry("[test_id:7403] succeed importing initial PVC from registry URL", true, false, 1, cdiv1.DataImportCronSourceFormatPvc),
table.Entry("[test_id:7414] succeed importing PVC from registry URL on source digest update", true, false, 2, cdiv1.DataImportCronSourceFormatPvc),
table.Entry("[test_id:10031] succeed importing initially into a snapshot from registry URL", true, false, 1, cdiv1.DataImportCronSourceFormatSnapshot),
table.Entry("[test_id:10032] succeed importing to a snapshot from registry URL on source digest update", true, false, 2, cdiv1.DataImportCronSourceFormatSnapshot),
table.Entry("[test_id:8266] succeed deleting error DVs when importing new ones", false, true, 2, cdiv1.DataImportCronSourceFormatPvc),
)
It("[test_id:10040] Should get digest updated by external poller", func() {
By("Create DataImportCron with only initial poller job")
cron = utils.NewDataImportCron(cronName, "5Gi", scheduleOnceAYear, dataSourceName, importsToKeep, *reg)
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy
cron, err := f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
By("Wait for initial digest")
waitForDigest()
By("Set empty digest")
retryOnceOnErr(updateDataImportCron(f.CdiClient, ns, cron.Name, updateDigest(""))).Should(BeNil())
By("Create poller pod to update the DataImportCron digest")
importerImage := f.GetEnvVarValue("IMPORTER_IMAGE")
Expect(importerImage).ToNot(BeEmpty())
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pollerPodName}}
err = controller.InitPollerPodSpec(f.CrClient, cron, &pod.Spec, importerImage, corev1.PullIfNotPresent, log)
Expect(err).ToNot(HaveOccurred())
_, err = utils.CreatePod(f.K8sClient, f.CdiInstallNs, pod)
Expect(err).ToNot(HaveOccurred())
By("Wait for digest set by external poller")
waitForDigest()
})
It("[test_id:XXXX] Should allow an empty schedule to trigger an external update to the source", func() {
configureStorageProfileResultingFormat(cdiv1.DataImportCronSourceFormatPvc)
By("Create DataImportCron with empty schedule")
cron = utils.NewDataImportCron(cronName, "5Gi", emptySchedule, dataSourceName, importsToKeep, *reg)
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy
cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
By("Create poller pod to update the DataImportCron digest")
importerImage := f.GetEnvVarValue("IMPORTER_IMAGE")
Expect(importerImage).ToNot(BeEmpty())
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pollerPodName}}
err = controller.InitPollerPodSpec(f.CrClient, cron, &pod.Spec, importerImage, corev1.PullIfNotPresent, log)
Expect(err).ToNot(HaveOccurred())
_, err = utils.CreatePod(f.K8sClient, f.CdiInstallNs, pod)
Expect(err).ToNot(HaveOccurred())
By("Wait for digest set by external poller")
waitForDigest()
waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)
By("Verify CurrentImports update")
currentImportDv := cron.Status.CurrentImports[0].DataVolumeName
Expect(currentImportDv).ToNot(BeEmpty())
By(fmt.Sprintf("Verify pvc was created %s", currentImportDv))
_, err = utils.WaitForPVC(f.K8sClient, ns, currentImportDv)
Expect(err).ToNot(HaveOccurred())
By("Wait for import completion")
err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, currentImportDv)
Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")
By("Verify cronjob was not created")
_, err = f.K8sClient.BatchV1().CronJobs(f.CdiInstallNs).Get(context.TODO(), controller.GetCronJobName(cron), metav1.GetOptions{})
Expect(errors.IsNotFound(err)).To(BeTrue())
})
table.DescribeTable("Succeed garbage collecting sources when importing new ones", func(format cdiv1.DataImportCronSourceFormat) {
if format == cdiv1.DataImportCronSourceFormatSnapshot && !f.IsSnapshotStorageClassAvailable() {
Skip("Volumesnapshot support needed to test DataImportCron with Volumesnapshot sources")
}
const oldDvName = "old-version-dv"
configureStorageProfileResultingFormat(format)
garbageSources := 3
for i := 0; i < garbageSources; i++ {
srcName := fmt.Sprintf("src-garbage-%d", i)
By(fmt.Sprintf("Create %s", srcName))
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
pvc := utils.NewPVCDefinition(srcName, "1Gi",
map[string]string{controller.AnnLastUseTime: time.Now().UTC().Format(time.RFC3339Nano)},
map[string]string{common.DataImportCronLabel: cronName})
f.CreateBoundPVCFromDefinition(pvc)
case cdiv1.DataImportCronSourceFormatSnapshot:
pvc := utils.NewPVCDefinition(srcName, "1Gi",
map[string]string{controller.AnnLastUseTime: time.Now().UTC().Format(time.RFC3339Nano)},
map[string]string{common.DataImportCronLabel: cronName})
f.CreateBoundPVCFromDefinition(pvc)
snapClass := f.GetSnapshotClass()
snapshot := utils.NewVolumeSnapshot(srcName, ns, pvc.Name, &snapClass.Name)
snapshot.SetAnnotations(map[string]string{controller.AnnLastUseTime: time.Now().UTC().Format(time.RFC3339Nano)})
snapshot.SetLabels(map[string]string{common.DataImportCronLabel: cronName})
err = f.CrClient.Create(context.TODO(), snapshot)
Expect(err).ToNot(HaveOccurred())
utils.WaitSnapshotReady(f.CrClient, snapshot)
err = f.DeletePVC(pvc)
Expect(err).ToNot(HaveOccurred())
deleted, err := utils.WaitPVCDeleted(f.K8sClient, srcName, ns, 2*time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(deleted).To(BeTrue())
}
}
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
By(fmt.Sprintf("Create labeled DataVolume %s for old DVs garbage collection test", oldDvName))
dv := utils.NewDataVolumeWithRegistryImport(oldDvName, "5Gi", "")
dv.Spec.Source.Registry = reg
dv.Labels = map[string]string{common.DataImportCronLabel: cronName}
cc.AddAnnotation(dv, cc.AnnDeleteAfterCompletion, "false")
dv, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, ns, dv)
Expect(err).ToNot(HaveOccurred())
By("Wait for import completion")
f.ForceBindPvcIfDvIsWaitForFirstConsumer(dv)
err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, dv.Name)
Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")
By(fmt.Sprintf("Verify PVC was created %s", dv.Name))
pvc, err := utils.WaitForPVC(f.K8sClient, ns, dv.Name)
Expect(err).ToNot(HaveOccurred())
By(fmt.Sprintf("Verify DataImportCronLabel is passed to the PVC: %s", pvc.Labels[common.DataImportCronLabel]))
Expect(pvc.Labels[common.DataImportCronLabel]).To(Equal(cronName))
pvc.Labels[common.DataImportCronLabel] = ""
By("Update DataImportCron label to be empty in the PVC")
_, err = f.K8sClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Update(context.TODO(), pvc, metav1.UpdateOptions{})
Expect(err).ToNot(HaveOccurred())
Eventually(func() []corev1.PersistentVolumeClaim {
pvcList, err := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
Expect(err).ToNot(HaveOccurred())
return pvcList.Items
}, dataImportCronTimeout, pollingInterval).Should(HaveLen(garbageSources + 1))
case cdiv1.DataImportCronSourceFormatSnapshot:
snapshots := &snapshotv1.VolumeSnapshotList{}
err := f.CrClient.List(context.TODO(), snapshots, &client.ListOptions{Namespace: ns})
Expect(err).ToNot(HaveOccurred())
Expect(snapshots.Items).To(HaveLen(garbageSources))
}
By(fmt.Sprintf("Create new DataImportCron %s, url %s", cronName, *reg.URL))
cron = utils.NewDataImportCron(cronName, "1Gi", scheduleEveryMinute, dataSourceName, importsToKeep, *reg)
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy
cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)
By("Verify CurrentImports update")
currentImportDv := cron.Status.CurrentImports[0].DataVolumeName
Expect(currentImportDv).ToNot(BeEmpty())
currentSource := verifySourceReady(format, currentImportDv)
By("Check garbage collection")
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
By("Check old DV garbage collection")
Eventually(func() error {
_, err := f.CdiClient.CdiV1beta1().DataVolumes(ns).Get(context.TODO(), oldDvName, metav1.GetOptions{})
return err
}, dataImportCronTimeout, pollingInterval).Should(Satisfy(errors.IsNotFound), "Garbage collection failed cleaning old DV")
pvcList := &corev1.PersistentVolumeClaimList{}
Eventually(func() int {
pvcList, err = f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
Expect(err).ToNot(HaveOccurred())
return len(pvcList.Items)
}, dataImportCronTimeout, pollingInterval).Should(Equal(importsToKeep), "Garbage collection failed cleaning old imports")
By("Check last import PVC is timestamped and not garbage collected")
found := false
for _, pvc := range pvcList.Items {
if pvc.UID == currentSource.GetUID() {
lastUse := pvc.Annotations[controller.AnnLastUseTime]
Expect(lastUse).ToNot(BeEmpty())
ts, err := time.Parse(time.RFC3339Nano, lastUse)
Expect(err).ToNot(HaveOccurred())
Expect(ts).To(BeTemporally("<", time.Now()))
found = true
break
}
}
Expect(found).To(BeTrue())
case cdiv1.DataImportCronSourceFormatSnapshot:
snapshots := &snapshotv1.VolumeSnapshotList{}
Eventually(func(g Gomega) int {
err := f.CrClient.List(context.TODO(), snapshots, &client.ListOptions{Namespace: ns})
g.Expect(err).ToNot(HaveOccurred())
return len(snapshots.Items)
}, dataImportCronTimeout, pollingInterval).Should(Equal(importsToKeep), "Garbage collection failed cleaning old imports")
By("Check last import snapshot is timestamped and not garbage collected")
found := false
for _, snap := range snapshots.Items {
if snap.UID == currentSource.GetUID() {
lastUse := snap.Annotations[controller.AnnLastUseTime]
Expect(lastUse).ToNot(BeEmpty())
ts, err := time.Parse(time.RFC3339Nano, lastUse)
Expect(err).ToNot(HaveOccurred())
Expect(ts).To(BeTemporally("<", time.Now()))
found = true
break
}
}
Expect(found).To(BeTrue())
}
},
table.Entry("[test_id:7406] with PVC sources", cdiv1.DataImportCronSourceFormatPvc),
table.Entry("[test_id:10033] with snapshot sources", cdiv1.DataImportCronSourceFormatSnapshot),
)
It("[test_id:8033] should delete jobs on deletion", func() {
noSuchCM := "nosuch"
reg.CertConfigMap = &noSuchCM
cron = utils.NewDataImportCron("cron-test", "5Gi", scheduleEveryMinute, dataSourceName, importsToKeep, *reg)
By("Create new DataImportCron")
cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
By("Verify initial job created")
initialJobName := controller.GetInitialJobName(cron)
Eventually(func() *batchv1.Job {
job, _ := f.K8sClient.BatchV1().Jobs(f.CdiInstallNs).Get(context.TODO(), initialJobName, metav1.GetOptions{})
return job
}, dataImportCronTimeout, pollingInterval).ShouldNot(BeNil(), "initial job was not created")
By("Verify initial job pod created")
Eventually(func() *corev1.Pod {
pod, _ := utils.FindPodByPrefixOnce(f.K8sClient, f.CdiInstallNs, initialJobName, "")
return pod
}, dataImportCronTimeout, pollingInterval).ShouldNot(BeNil(), "initial job pod was not created")
By("Verify cronjob created and has active job")
cronJobName := controller.GetCronJobName(cron)
jobName := ""
Eventually(func() string {
cronjob, _ := f.K8sClient.BatchV1().CronJobs(f.CdiInstallNs).Get(context.TODO(), cronJobName, metav1.GetOptions{})
if cronjob != nil && len(cronjob.Status.Active) > 0 {
jobName = cronjob.Status.Active[0].Name
}
return jobName
}, dataImportCronTimeout, pollingInterval).ShouldNot(BeEmpty(), "cronjob has no active job")
By("Verify cronjob first job created")
Eventually(func() *batchv1.Job {
job, _ := f.K8sClient.BatchV1().Jobs(f.CdiInstallNs).Get(context.TODO(), jobName, metav1.GetOptions{})
return job
}, dataImportCronTimeout, pollingInterval).ShouldNot(BeNil(), "cronjob first job was not created")
By("Verify cronjob first job pod created")
Eventually(func() *corev1.Pod {
pod, _ := utils.FindPodByPrefixOnce(f.K8sClient, f.CdiInstallNs, jobName, "")
return pod
}, dataImportCronTimeout, pollingInterval).ShouldNot(BeNil(), "cronjob first job pod was not created")
By("Delete cron")
err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Delete(context.TODO(), cronName, metav1.DeleteOptions{})
Expect(err).ToNot(HaveOccurred())
By("Verify cronjob deleted")
Eventually(func() bool {
_, err := f.K8sClient.BatchV1().CronJobs(f.CdiInstallNs).Get(context.TODO(), cronJobName, metav1.GetOptions{})
return errors.IsNotFound(err)
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "cronjob was not deleted")
By("Verify initial job deleted")
Eventually(func() bool {
_, err := f.K8sClient.BatchV1().Jobs(f.CdiInstallNs).Get(context.TODO(), initialJobName, metav1.GetOptions{})
return errors.IsNotFound(err)
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "initial job was not deleted")
By("Verify initial job pod deleted")
Eventually(func() bool {
_, err := utils.FindPodByPrefixOnce(f.K8sClient, f.CdiInstallNs, initialJobName, "")
return errors.IsNotFound(err)
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "initial job pod was not deleted")
By("Verify cronjob first job deleted")
Eventually(func() bool {
_, err := f.K8sClient.BatchV1().Jobs(f.CdiInstallNs).Get(context.TODO(), jobName, metav1.GetOptions{})
return errors.IsNotFound(err)
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "cronjob first job was not deleted")
By("Verify cronjob first job pod deleted")
Eventually(func() bool {
_, err := utils.FindPodByPrefixOnce(f.K8sClient, f.CdiInstallNs, jobName, "")
return errors.IsNotFound(err)
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "cronjob first job pod was not deleted")
})
Context("Change source format of existing DataImportCron", func() {
It("[test_id:10034] Should switch from PVC to snapshot sources once format changes", func() {
if !f.IsSnapshotStorageClassAvailable() {
Skip("Volumesnapshot support needed to test DataImportCron with Volumesnapshot sources")
}
size := "1Gi"
configureStorageProfileResultingFormat(cdiv1.DataImportCronSourceFormatPvc)
By(fmt.Sprintf("Create new DataImportCron %s, url %s", cronName, *reg.URL))
cron = utils.NewDataImportCronWithStorageSpec(cronName, size, scheduleOnceAYear, dataSourceName, importsToKeep, *reg)
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy
cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)
By("Verify CurrentImports update")
currentImportDv := cron.Status.CurrentImports[0].DataVolumeName
Expect(currentImportDv).ToNot(BeEmpty())
_ = verifySourceReady(cdiv1.DataImportCronSourceFormatPvc, currentImportDv)
snapshots := &snapshotv1.VolumeSnapshotList{}
err = f.CrClient.List(context.TODO(), snapshots, &client.ListOptions{Namespace: ns})
Expect(err).ToNot(HaveOccurred())
Expect(snapshots.Items).To(BeEmpty())
// Ensure existing PVC clones from this source don't mess up future ones
cloneDV := utils.NewDataVolumeForImageCloningAndStorageSpec("target-dv-from-pvc", size, ns, currentImportDv, nil, nil)
cloneDV, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, cloneDV)
Expect(err).ToNot(HaveOccurred())
f.ForceBindPvcIfDvIsWaitForFirstConsumer(cloneDV)
err = utils.WaitForDataVolumePhase(f, cloneDV.Namespace, cdiv1.Succeeded, cloneDV.Name)
Expect(err).ToNot(HaveOccurred())
// Now simulate an upgrade, where a new CDI version has identified
// more storage types that scale better with snapshots
configureStorageProfileResultingFormat(cdiv1.DataImportCronSourceFormatSnapshot)
// Check snapshot now exists and PVC is gone
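// As noted in the commit message: the switch to snapshot sources is almost instant, so the cron
// may never report "not ready"; verify the snapshot is ready first, then check the cron conditions.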
currentSource := verifySourceReady(cdiv1.DataImportCronSourceFormatSnapshot, currentImportDv)
waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)
// DataSource is updated to point to a snapshot
dataSource, err := f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), cron.Spec.ManagedDataSource, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
readyCond := controller.FindDataSourceConditionByType(dataSource, cdiv1.DataSourceReady)
Expect(readyCond.Status).To(Equal(corev1.ConditionTrue))
expectedSource := cdiv1.DataSourceSource{
Snapshot: &cdiv1.DataVolumeSourceSnapshot{
Name: currentSource.GetName(),
Namespace: currentSource.GetNamespace(),
},
}
Expect(dataSource.Spec.Source).To(Equal(expectedSource))
// Verify content
targetDV := utils.NewDataVolumeWithSourceRefAndStorageAPI("target-dv-from-snap", &size, dataSource.Namespace, dataSource.Name)
By(fmt.Sprintf("Create new target datavolume %s", targetDV.Name))
targetDataVolume, err := utils.CreateDataVolumeFromDefinition(f.CdiClient, ns, targetDV)
Expect(err).ToNot(HaveOccurred())
f.ForceBindPvcIfDvIsWaitForFirstConsumer(targetDataVolume)
By("Wait for clone DV Succeeded phase")
err = utils.WaitForDataVolumePhase(f, targetDataVolume.Namespace, cdiv1.Succeeded, targetDataVolume.Name)
Expect(err).ToNot(HaveOccurred())
By("Verify MD5")
pvc, err := f.K8sClient.CoreV1().PersistentVolumeClaims(targetDataVolume.Namespace).Get(context.TODO(), targetDataVolume.Name, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
path := utils.DefaultImagePath
volumeMode := pvc.Spec.VolumeMode
if volumeMode != nil && *volumeMode == corev1.PersistentVolumeBlock {
path = utils.DefaultPvcMountPath
}
same, err := f.VerifyTargetPVCContentMD5(f.Namespace, pvc, path, utils.UploadFileMD5, utils.UploadFileSize)
Expect(err).ToNot(HaveOccurred())
Expect(same).To(BeTrue())
})
})
})
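// getDataVolumeSourceRegistry returns a registry source for the test registry, using the pod pull
// method on OpenShift and the node pull method otherwise, and marks the registry as insecure.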
func getDataVolumeSourceRegistry(f *framework.Framework) (*cdiv1.DataVolumeSourceRegistry, error) {
reg := &cdiv1.DataVolumeSourceRegistry{}
var (
pullMethod cdiv1.RegistryPullMethod
url string
)
if utils.IsOpenshift(f.K8sClient) {
url = fmt.Sprintf(utils.TinyCoreIsoRegistryURL, f.CdiInstallNs)
pullMethod = cdiv1.RegistryPullPod
} else {
url = fmt.Sprintf(utils.TrustedRegistryURL, f.DockerPrefix)
pullMethod = cdiv1.RegistryPullNode
}
reg.URL = &url
reg.PullMethod = &pullMethod
if err := utils.AddInsecureRegistry(f.CrClient, url); err != nil {
return nil, err
}
return reg, nil
}
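// updateDataImportCron returns a function that re-fetches the DataImportCron and applies the given
// update, so callers can retry on update conflicts.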
func updateDataImportCron(clientSet *cdiclientset.Clientset, namespace string, cronName string,
update func(cron *cdiv1.DataImportCron) *cdiv1.DataImportCron) func() error {
return func() error {
cron, err := clientSet.CdiV1beta1().DataImportCrons(namespace).Get(context.TODO(), cronName, metav1.GetOptions{})
if err != nil {
return err
}
cron = update(cron)
_, err = clientSet.CdiV1beta1().DataImportCrons(namespace).Update(context.TODO(), cron, metav1.UpdateOptions{})
return err
}
}
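// updateDataSource returns a function that re-fetches the DataSource and applies the given update,
// so callers can retry on update conflicts.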
func updateDataSource(clientSet *cdiclientset.Clientset, namespace string, dataSourceName string,
update func(dataSource *cdiv1.DataSource) *cdiv1.DataSource) func() error {
return func() error {
dataSource, err := clientSet.CdiV1beta1().DataSources(namespace).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
if err != nil {
return err
}
dataSource = update(dataSource)
_, err = clientSet.CdiV1beta1().DataSources(namespace).Update(context.TODO(), dataSource, metav1.UpdateOptions{})
return err
}
}
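// retryOnceOnErr runs f, retries once on error (e.g. an update conflict), and returns an assertion
// on the final error.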
func retryOnceOnErr(f func() error) Assertion {
err := f()
if err != nil {
err = f()
}
return Expect(err)
}