containerized-data-importer/tests/dataimportcron_test.go
Alex Kalenyuk f4ddcc1748
Revert "Clean up leftover snapshot sources from DataImportCron tests" (#3667)
This reverts commit 21be57add8.

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>
2025-03-12 03:24:02 +01:00

856 lines
37 KiB
Go

package tests_test
import (
"context"
"fmt"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
cdiclientset "kubevirt.io/containerized-data-importer/pkg/client/clientset/versioned"
"kubevirt.io/containerized-data-importer/pkg/common"
"kubevirt.io/containerized-data-importer/pkg/controller"
cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
"kubevirt.io/containerized-data-importer/tests/framework"
"kubevirt.io/containerized-data-importer/tests/utils"
)
const (
dataImportCronTimeout = 4 * time.Minute
dataImportCronConvergeTimeout = 10 * time.Minute
scheduleEveryMinute = "* * * * *"
scheduleOnceAYear = "0 0 1 1 *"
importsToKeep = 1
emptySchedule = ""
testKubevirtIoKey = "test.kubevirt.io/test"
testKubevirtIoValue = "testvalue"
// Digest must be 64 characters long
errorDigest = "sha256:1234567890123456789012345678901234567890123456789012345678901234"
)
var _ = Describe("DataImportCron", Serial, func() {
var (
f = framework.NewFramework("dataimportcron-func-test")
log = logf.Log.WithName("dataimportcron_test")
dataSourceName = "datasource-test"
pollerPodName = "poller"
cronName = "cron-test"
cron *cdiv1.DataImportCron
reg *cdiv1.DataVolumeSourceRegistry
err error
ns string
scName string
originalProfileSpec *cdiv1.StorageProfileSpec
)
BeforeEach(func() {
ns = f.Namespace.Name
reg, err = getDataVolumeSourceRegistry(f)
Expect(err).ToNot(HaveOccurred())
scName = utils.DefaultStorageClass.GetName()
By(fmt.Sprintf("Get original storage profile: %s", scName))
spec, err := utils.GetStorageProfileSpec(f.CdiClient, scName)
Expect(err).ToNot(HaveOccurred())
originalProfileSpec = spec
})
AfterEach(func() {
if err = utils.RemoveInsecureRegistry(f.CrClient, *reg.URL); err != nil {
fmt.Fprintf(GinkgoWriter, "failed to remove registry; %v", err)
}
err = utils.DeletePodByName(f.K8sClient, pollerPodName, f.CdiInstallNs, nil)
Expect(err).ToNot(HaveOccurred())
By("[AfterEach] Restore the profile")
Expect(utils.UpdateStorageProfile(f.CrClient, scName, *originalProfileSpec)).Should(Succeed())
By("[AfterEach] Delete the DataImportCron under test")
// Delete the DataImportCron under test
_ = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Delete(context.TODO(), cronName, metav1.DeleteOptions{})
Eventually(func() bool {
_, err := f.CdiClient.CdiV1beta1().DataImportCrons(ns).Get(context.TODO(), cronName, metav1.GetOptions{})
return errors.IsNotFound(err)
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "DataImportCron was not deleted")
By("[AfterEach] Wait for all DataImportCrons UpToDate")
// Wait for all DataImportCrons to converge
dataImportCrons := &cdiv1.DataImportCronList{}
err = f.CrClient.List(context.TODO(), dataImportCrons, &client.ListOptions{Namespace: metav1.NamespaceAll})
Expect(err).ToNot(HaveOccurred())
for _, cronItem := range dataImportCrons.Items {
Eventually(func() bool {
cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(cronItem.Namespace).Get(context.TODO(), cronItem.Name, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
condProgressing := controller.FindDataImportCronConditionByType(cron, cdiv1.DataImportCronProgressing)
condUpToDate := controller.FindDataImportCronConditionByType(cron, cdiv1.DataImportCronUpToDate)
return condProgressing != nil && condProgressing.Status == corev1.ConditionFalse &&
condUpToDate != nil && condUpToDate.Status == corev1.ConditionTrue
}, dataImportCronConvergeTimeout, pollingInterval).Should(BeTrue(), "Timeout waiting for DataImportCron conditions %q", cronItem.Namespace+"/"+cronItem.Name)
}
})
updateDigest := func(digest string) func(cron *cdiv1.DataImportCron) *cdiv1.DataImportCron {
return func(cron *cdiv1.DataImportCron) *cdiv1.DataImportCron {
cc.AddAnnotation(cron, controller.AnnSourceDesiredDigest, digest)
return cron
}
}
waitForDigest := func() {
Eventually(func() string {
cron, err := f.CdiClient.CdiV1beta1().DataImportCrons(ns).Get(context.TODO(), cronName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
return cron.Annotations[controller.AnnSourceDesiredDigest]
}, dataImportCronTimeout, pollingInterval).ShouldNot(BeEmpty(), "Desired digest is empty")
}
waitForConditions := func(statusProgressing, statusUpToDate corev1.ConditionStatus) {
By(fmt.Sprintf("Wait for DataImportCron Progressing:%s, UpToDate:%s", statusProgressing, statusUpToDate))
Eventually(func() bool {
var err error
cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Get(context.TODO(), cronName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
condProgressing := controller.FindDataImportCronConditionByType(cron, cdiv1.DataImportCronProgressing)
condUpToDate := controller.FindDataImportCronConditionByType(cron, cdiv1.DataImportCronUpToDate)
return condProgressing != nil && condProgressing.Status == statusProgressing &&
condUpToDate != nil && condUpToDate.Status == statusUpToDate
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "Timeout waiting for DataImportCron conditions")
}
configureStorageProfileResultingFormat := func(format cdiv1.DataImportCronSourceFormat) {
By(fmt.Sprintf("configure storage profile %s", scName))
newProfileSpec := originalProfileSpec.DeepCopy()
newProfileSpec.DataImportCronSourceFormat = &format
err := utils.UpdateStorageProfile(f.CrClient, scName, *newProfileSpec)
Expect(err).ToNot(HaveOccurred())
Eventually(func() *cdiv1.DataImportCronSourceFormat {
profile, err := f.CdiClient.CdiV1beta1().StorageProfiles().Get(context.TODO(), scName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
return profile.Status.DataImportCronSourceFormat
}, 15*time.Second, time.Second).Should(Equal(&format))
}
verifySourceReady := func(format cdiv1.DataImportCronSourceFormat, name string) metav1.Object {
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
By(fmt.Sprintf("Verify pvc was created %s", name))
pvc, err := utils.WaitForPVC(f.K8sClient, ns, name)
Expect(err).ToNot(HaveOccurred())
By("Wait for import completion")
err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, name)
Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")
return pvc
case cdiv1.DataImportCronSourceFormatSnapshot:
snapshot := &snapshotv1.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
}
snapshot = utils.WaitSnapshotReady(f.CrClient, snapshot)
Expect(snapshot.Labels).To(HaveKeyWithValue(testKubevirtIoKey, testKubevirtIoValue))
deleted, err := utils.WaitPVCDeleted(f.K8sClient, name, ns, 30*time.Second)
if err != nil {
// work around https://github.com/kubernetes-csi/external-snapshotter/issues/957
// it does converge after the resync period of snapshot controller (15mins)
cc.AddAnnotation(snapshot, "workaround", "triggersync")
err = f.CrClient.Update(context.TODO(), snapshot)
Expect(err).ToNot(HaveOccurred())
// try again
deleted, err = utils.WaitPVCDeleted(f.K8sClient, name, ns, 30*time.Second)
}
Expect(err).ToNot(HaveOccurred())
Expect(deleted).To(BeTrue())
// check pvc is not recreated
Consistently(func() error {
_, err = f.K8sClient.CoreV1().PersistentVolumeClaims(ns).Get(context.TODO(), name, metav1.GetOptions{})
return err
}, 5*time.Second, 1*time.Second).Should(
SatisfyAll(HaveOccurred(), WithTransform(errors.IsNotFound, BeTrue())),
"PVC should not have been recreated",
)
return snapshot
}
return nil
}
deleteSource := func(format cdiv1.DataImportCronSourceFormat, name string) {
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
pvc, err := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).Get(context.TODO(), name, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
utils.CleanupDvPvcNoWait(f.K8sClient, f.CdiClient, f.Namespace.Name, name)
deleted, err := f.WaitPVCDeletedByUID(pvc, time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(deleted).To(BeTrue())
case cdiv1.DataImportCronSourceFormatSnapshot:
snapshot := &snapshotv1.VolumeSnapshot{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
}
// Probably good to ensure deletion by UID here too but re-import is long enough
// to not cause errors
Eventually(func() bool {
err := f.CrClient.Delete(context.TODO(), snapshot)
return err != nil && errors.IsNotFound(err)
}, time.Minute, time.Second).Should(BeTrue())
}
}
getDataSourceName := func(format cdiv1.DataImportCronSourceFormat, ds *cdiv1.DataSource) string {
var sourceName string
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
sourceName = ds.Spec.Source.PVC.Name
case cdiv1.DataImportCronSourceFormatSnapshot:
sourceName = ds.Spec.Source.Snapshot.Name
}
return sourceName
}
verifyRetention := func(format cdiv1.DataImportCronSourceFormat, name string) {
By("Verify DataSource retention")
_, err := f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
Consistently(func() *metav1.Time {
src := verifySourceReady(format, name)
return src.GetDeletionTimestamp()
}, 5*time.Second, time.Second).Should(BeNil())
}
verifyDeletion := func(format cdiv1.DataImportCronSourceFormat) {
By("Verify DataSource deletion")
Eventually(func() bool {
_, err := f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
return errors.IsNotFound(err)
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "DataSource was not deleted")
By("Verify sources deleted")
Eventually(func(g Gomega) bool {
pvcs, err := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
g.Expect(err).ToNot(HaveOccurred())
return len(pvcs.Items) == 0
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "PVCs were not deleted")
if format == cdiv1.DataImportCronSourceFormatSnapshot {
snapshots := &snapshotv1.VolumeSnapshotList{}
Eventually(func(g Gomega) bool {
err := f.CrClient.List(context.TODO(), snapshots, &client.ListOptions{Namespace: ns})
g.Expect(err).ToNot(HaveOccurred())
return len(snapshots.Items) == 0
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "snapshots were not deleted")
}
}
DescribeTable("should", func(retention, createErrorDv bool, repeat int, format cdiv1.DataImportCronSourceFormat) {
if format == cdiv1.DataImportCronSourceFormatSnapshot && !f.IsSnapshotStorageClassAvailable() {
Skip("Volumesnapshot support needed to test DataImportCron with Volumesnapshot sources")
}
configureStorageProfileResultingFormat(format)
By(fmt.Sprintf("Create new DataImportCron %s, url %s", cronName, *reg.URL))
cron = utils.NewDataImportCron(cronName, "5Gi", scheduleEveryMinute, dataSourceName, importsToKeep, *reg)
garbageCollect := cdiv1.DataImportCronGarbageCollectNever
cron.Spec.GarbageCollect = &garbageCollect
if !retention {
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy
}
cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
By("Verify cronjob was created")
Eventually(func() bool {
_, err := f.K8sClient.BatchV1().CronJobs(f.CdiInstallNs).Get(context.TODO(), controller.GetCronJobName(cron), metav1.GetOptions{})
if errors.IsNotFound(err) {
return false
}
Expect(err).ToNot(HaveOccurred())
return true
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "cronjob was not created")
var lastImportDv, currentImportDv string
for i := 0; i < repeat; i++ {
By(fmt.Sprintf("Iter #%d", i))
if i > 0 {
if createErrorDv {
By("Set desired digest to nonexisting one")
//get and update!!!
retryOnceOnErr(updateDataImportCron(f.CdiClient, ns, cronName, updateDigest(errorDigest))).Should(BeNil())
By("Wait for CurrentImports update")
Eventually(func() string {
cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Get(context.TODO(), cronName, metav1.GetOptions{})
currentImportDv = cron.Status.CurrentImports[0].DataVolumeName
Expect(currentImportDv).ToNot(BeEmpty())
return currentImportDv
}, dataImportCronTimeout, pollingInterval).ShouldNot(Equal(lastImportDv), "Current import was not updated")
lastImportDv = currentImportDv
} else {
By("Reset desired digest")
retryOnceOnErr(updateDataImportCron(f.CdiClient, ns, cronName, updateDigest(""))).Should(BeNil())
By(fmt.Sprintf("Delete last import %s, format: %s", currentImportDv, format))
deleteSource(format, currentImportDv)
lastImportDv = ""
By("Wait for non-empty desired digest")
waitForDigest()
}
}
waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)
By("Verify CurrentImports update")
currentImportDv = cron.Status.CurrentImports[0].DataVolumeName
Expect(currentImportDv).ToNot(BeEmpty())
Expect(currentImportDv).ToNot(Equal(lastImportDv))
lastImportDv = currentImportDv
currentSource := verifySourceReady(format, currentImportDv)
By("Verify DataSource was updated")
var dataSource *cdiv1.DataSource
Eventually(func(g Gomega) {
dataSource, err = f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), cron.Spec.ManagedDataSource, metav1.GetOptions{})
g.Expect(err).ToNot(HaveOccurred())
readyCond := controller.FindDataSourceConditionByType(dataSource, cdiv1.DataSourceReady)
g.Expect(readyCond).ToNot(BeNil())
g.Expect(readyCond.Status).To(Equal(corev1.ConditionTrue))
g.Expect(getDataSourceName(format, dataSource)).To(Equal(currentImportDv))
g.Expect(dataSource.Labels).To(HaveKeyWithValue(testKubevirtIoKey, testKubevirtIoValue))
}, dataImportCronTimeout, pollingInterval).Should(Succeed(), "DataSource was not updated")
By("Verify cron was updated")
Expect(cron.Status.LastImportedPVC).ToNot(BeNil())
Expect(cron.Status.LastImportedPVC.Name).To(Equal(currentImportDv))
By("Update DataSource to refer to a dummy name")
retryOnceOnErr(
updateDataSource(f.CdiClient, ns, cron.Spec.ManagedDataSource,
func(dataSource *cdiv1.DataSource) *cdiv1.DataSource {
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
dataSource.Spec.Source.PVC.Name = "dummy"
case cdiv1.DataImportCronSourceFormatSnapshot:
dataSource.Spec.Source.Snapshot.Name = "dummy"
}
return dataSource
},
)).Should(BeNil())
By("Verify name on DataSource was reconciled")
Eventually(func() bool {
dataSource, err = f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
return getDataSourceName(format, dataSource) == currentImportDv
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "DataSource name was not reconciled")
By("Delete DataSource")
err = f.CdiClient.CdiV1beta1().DataSources(ns).Delete(context.TODO(), dataSourceName, metav1.DeleteOptions{})
Expect(err).ToNot(HaveOccurred())
By("Verify DataSource was re-created")
Eventually(func() bool {
ds, err := f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
return err == nil && ds.UID != dataSource.UID
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "DataSource was not re-created")
By("Delete last imported source")
deleteSource(format, currentSource.GetName())
By("Verify last imported source was re-created")
recreatedSource := verifySourceReady(format, currentSource.GetName())
Expect(recreatedSource.GetUID()).ToNot(Equal(currentSource.GetUID()), "Last imported source was not re-created")
}
lastImportedPVC := cron.Status.LastImportedPVC
By("Delete cron")
err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Delete(context.TODO(), cronName, metav1.DeleteOptions{})
Expect(err).ToNot(HaveOccurred())
if retention {
verifyRetention(format, lastImportedPVC.Name)
} else {
verifyDeletion(format)
}
},
Entry("[test_id:7403] succeed importing initial PVC from registry URL", true, false, 1, cdiv1.DataImportCronSourceFormatPvc),
Entry("[test_id:7414] succeed importing PVC from registry URL on source digest update", true, false, 2, cdiv1.DataImportCronSourceFormatPvc),
Entry("[test_id:10031] succeed importing initially into a snapshot from registry URL", true, false, 1, cdiv1.DataImportCronSourceFormatSnapshot),
Entry("[test_id:10032] succeed importing to a snapshot from registry URL on source digest update", true, false, 2, cdiv1.DataImportCronSourceFormatSnapshot),
Entry("[test_id:8266] succeed deleting error DVs when importing new ones", false, true, 2, cdiv1.DataImportCronSourceFormatPvc),
)
It("[test_id:10040] Should get digest updated by external poller", func() {
By("Create DataImportCron with only initial poller job")
cron = utils.NewDataImportCron(cronName, "5Gi", scheduleOnceAYear, dataSourceName, importsToKeep, *reg)
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy
cron, err := f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
By("Wait for initial digest")
waitForDigest()
By("Set empty digest")
retryOnceOnErr(updateDataImportCron(f.CdiClient, ns, cron.Name, updateDigest(""))).Should(BeNil())
By("Create poller pod to update the DataImportCron digest")
importerImage := f.GetEnvVarValue("IMPORTER_IMAGE")
Expect(importerImage).ToNot(BeEmpty())
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pollerPodName}}
err = controller.InitPollerPodSpec(f.CrClient, cron, &pod.Spec, importerImage, corev1.PullIfNotPresent, log)
Expect(err).ToNot(HaveOccurred())
_, err = utils.CreatePod(f.K8sClient, f.CdiInstallNs, pod)
Expect(err).ToNot(HaveOccurred())
By("Wait for digest set by external poller")
waitForDigest()
})
It("[test_id:10360] Should allow an empty schedule to trigger an external update to the source", func() {
configureStorageProfileResultingFormat(cdiv1.DataImportCronSourceFormatPvc)
By("Create DataImportCron with empty schedule")
cron = utils.NewDataImportCron(cronName, "5Gi", emptySchedule, dataSourceName, importsToKeep, *reg)
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy
cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
By("Create poller pod to update the DataImportCron digest")
importerImage := f.GetEnvVarValue("IMPORTER_IMAGE")
Expect(importerImage).ToNot(BeEmpty())
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pollerPodName}}
err = controller.InitPollerPodSpec(f.CrClient, cron, &pod.Spec, importerImage, corev1.PullIfNotPresent, log)
Expect(err).ToNot(HaveOccurred())
_, err = utils.CreatePod(f.K8sClient, f.CdiInstallNs, pod)
Expect(err).ToNot(HaveOccurred())
By("Wait for digest set by external poller")
waitForDigest()
waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)
By("Verify CurrentImports update")
currentImportDv := cron.Status.CurrentImports[0].DataVolumeName
Expect(currentImportDv).ToNot(BeEmpty())
By(fmt.Sprintf("Verify pvc was created %s", currentImportDv))
_, err = utils.WaitForPVC(f.K8sClient, ns, currentImportDv)
Expect(err).ToNot(HaveOccurred())
By("Wait for import completion")
err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, currentImportDv)
Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")
By("Verify cronjob was not created")
_, err = f.K8sClient.BatchV1().CronJobs(f.CdiInstallNs).Get(context.TODO(), controller.GetCronJobName(cron), metav1.GetOptions{})
Expect(errors.IsNotFound(err)).To(BeTrue())
})
DescribeTable("Succeed garbage collecting sources when importing new ones", func(format cdiv1.DataImportCronSourceFormat) {
if format == cdiv1.DataImportCronSourceFormatSnapshot && !f.IsSnapshotStorageClassAvailable() {
Skip("Volumesnapshot support needed to test DataImportCron with Volumesnapshot sources")
}
const oldDvName = "old-version-dv"
configureStorageProfileResultingFormat(format)
garbageSources := 3
for i := 0; i < garbageSources; i++ {
srcName := fmt.Sprintf("src-garbage-%d", i)
By(fmt.Sprintf("Create %s", srcName))
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
pvc := utils.NewPVCDefinition(srcName, "1Gi",
map[string]string{controller.AnnLastUseTime: time.Now().UTC().Format(time.RFC3339Nano)},
map[string]string{common.DataImportCronLabel: cronName})
f.CreateBoundPVCFromDefinition(pvc)
case cdiv1.DataImportCronSourceFormatSnapshot:
pvc := utils.NewPVCDefinition(srcName, "1Gi",
map[string]string{controller.AnnLastUseTime: time.Now().UTC().Format(time.RFC3339Nano)},
map[string]string{common.DataImportCronLabel: cronName})
f.CreateBoundPVCFromDefinition(pvc)
snapClass := f.GetSnapshotClass()
snapshot := utils.NewVolumeSnapshot(srcName, ns, pvc.Name, &snapClass.Name)
snapshot.SetAnnotations(map[string]string{controller.AnnLastUseTime: time.Now().UTC().Format(time.RFC3339Nano)})
snapshot.SetLabels(map[string]string{common.DataImportCronLabel: cronName})
err = f.CrClient.Create(context.TODO(), snapshot)
Expect(err).ToNot(HaveOccurred())
utils.WaitSnapshotReady(f.CrClient, snapshot)
err = f.DeletePVC(pvc)
Expect(err).ToNot(HaveOccurred())
deleted, err := utils.WaitPVCDeleted(f.K8sClient, srcName, ns, 2*time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(deleted).To(BeTrue())
}
}
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
By(fmt.Sprintf("Create labeled DataVolume %s for old DVs garbage collection test", oldDvName))
dv := utils.NewDataVolumeWithRegistryImport(oldDvName, "5Gi", "")
dv.Spec.Source.Registry = reg
dv.Labels = map[string]string{common.DataImportCronLabel: cronName}
dv, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, ns, dv)
Expect(err).ToNot(HaveOccurred())
By("Wait for import completion")
f.ForceBindPvcIfDvIsWaitForFirstConsumer(dv)
err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, dv.Name)
Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")
By(fmt.Sprintf("Verify PVC was created %s", dv.Name))
pvc, err := utils.WaitForPVC(f.K8sClient, ns, dv.Name)
Expect(err).ToNot(HaveOccurred())
By(fmt.Sprintf("Verify DataImportCronLabel is passed to the PVC: %s", pvc.Labels[common.DataImportCronLabel]))
Expect(pvc.Labels[common.DataImportCronLabel]).To(Equal(cronName))
pvc.Labels[common.DataImportCronLabel] = ""
By("Update DataImportCron label to be empty in the PVC")
_, err = f.K8sClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Update(context.TODO(), pvc, metav1.UpdateOptions{})
Expect(err).ToNot(HaveOccurred())
Eventually(func() []corev1.PersistentVolumeClaim {
pvcList, err := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
Expect(err).ToNot(HaveOccurred())
return pvcList.Items
}, dataImportCronTimeout, pollingInterval).Should(HaveLen(garbageSources + 1))
case cdiv1.DataImportCronSourceFormatSnapshot:
snapshots := &snapshotv1.VolumeSnapshotList{}
err := f.CrClient.List(context.TODO(), snapshots, &client.ListOptions{Namespace: ns})
Expect(err).ToNot(HaveOccurred())
Expect(snapshots.Items).To(HaveLen(garbageSources))
}
By(fmt.Sprintf("Create new DataImportCron %s, url %s", cronName, *reg.URL))
cron = utils.NewDataImportCron(cronName, "1Gi", scheduleEveryMinute, dataSourceName, importsToKeep, *reg)
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy
cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)
By("Verify CurrentImports update")
currentImportDv := cron.Status.CurrentImports[0].DataVolumeName
Expect(currentImportDv).ToNot(BeEmpty())
currentSource := verifySourceReady(format, currentImportDv)
By("Check garbage collection")
switch format {
case cdiv1.DataImportCronSourceFormatPvc:
By("Check old DV garbage collection")
Eventually(func() error {
_, err := f.CdiClient.CdiV1beta1().DataVolumes(ns).Get(context.TODO(), oldDvName, metav1.GetOptions{})
return err
}, dataImportCronTimeout, pollingInterval).Should(Satisfy(errors.IsNotFound), "Garbage collection failed cleaning old DV")
pvcList := &corev1.PersistentVolumeClaimList{}
Eventually(func() int {
pvcList, err = f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
Expect(err).ToNot(HaveOccurred())
return len(pvcList.Items)
}, dataImportCronTimeout, pollingInterval).Should(Equal(importsToKeep), "Garbage collection failed cleaning old imports")
By("Check last import PVC is timestamped and not garbage collected")
found := false
for _, pvc := range pvcList.Items {
if pvc.UID == currentSource.GetUID() {
lastUse := pvc.Annotations[controller.AnnLastUseTime]
Expect(lastUse).ToNot(BeEmpty())
ts, err := time.Parse(time.RFC3339Nano, lastUse)
Expect(err).ToNot(HaveOccurred())
Expect(ts).To(BeTemporally("<", time.Now()))
found = true
break
}
}
Expect(found).To(BeTrue())
case cdiv1.DataImportCronSourceFormatSnapshot:
snapshots := &snapshotv1.VolumeSnapshotList{}
Eventually(func(g Gomega) int {
err := f.CrClient.List(context.TODO(), snapshots, &client.ListOptions{Namespace: ns})
g.Expect(err).ToNot(HaveOccurred())
return len(snapshots.Items)
}, dataImportCronTimeout, pollingInterval).Should(Equal(importsToKeep), "Garbage collection failed cleaning old imports")
By("Check last import snapshot is timestamped and not garbage collected")
found := false
for _, snap := range snapshots.Items {
if snap.UID == currentSource.GetUID() {
lastUse := snap.Annotations[controller.AnnLastUseTime]
Expect(lastUse).ToNot(BeEmpty())
ts, err := time.Parse(time.RFC3339Nano, lastUse)
Expect(err).ToNot(HaveOccurred())
Expect(ts).To(BeTemporally("<", time.Now()))
found = true
break
}
}
Expect(found).To(BeTrue())
}
},
Entry("[test_id:7406] with PVC & DV sources", cdiv1.DataImportCronSourceFormatPvc),
Entry("[test_id:10033] with snapshot sources", cdiv1.DataImportCronSourceFormatSnapshot),
)
It("[test_id:8033] should delete jobs on deletion", func() {
noSuchCM := "nosuch"
reg.CertConfigMap = &noSuchCM
cron = utils.NewDataImportCron("cron-test", "5Gi", scheduleEveryMinute, dataSourceName, importsToKeep, *reg)
By("Create new DataImportCron")
cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
By("Verify initial job created")
initialJobName := controller.GetInitialJobName(cron)
Eventually(func() *batchv1.Job {
job, _ := f.K8sClient.BatchV1().Jobs(f.CdiInstallNs).Get(context.TODO(), initialJobName, metav1.GetOptions{})
return job
}, dataImportCronTimeout, pollingInterval).ShouldNot(BeNil(), "initial job was not created")
By("Verify initial job pod created")
Eventually(func() *corev1.Pod {
pod, _ := utils.FindPodByPrefixOnce(f.K8sClient, f.CdiInstallNs, initialJobName, "")
return pod
}, dataImportCronTimeout, pollingInterval).ShouldNot(BeNil(), "initial job pod was not created")
By("Verify cronjob created and has active job")
cronJobName := controller.GetCronJobName(cron)
jobName := ""
Eventually(func() string {
cronjob, _ := f.K8sClient.BatchV1().CronJobs(f.CdiInstallNs).Get(context.TODO(), cronJobName, metav1.GetOptions{})
if cronjob != nil && len(cronjob.Status.Active) > 0 {
jobName = cronjob.Status.Active[0].Name
}
return jobName
}, dataImportCronTimeout, pollingInterval).ShouldNot(BeEmpty(), "cronjob has no active job")
By("Verify cronjob first job created")
Eventually(func() *batchv1.Job {
job, _ := f.K8sClient.BatchV1().Jobs(f.CdiInstallNs).Get(context.TODO(), jobName, metav1.GetOptions{})
return job
}, dataImportCronTimeout, pollingInterval).ShouldNot(BeNil(), "cronjob first job was not created")
By("Verify cronjob first job pod created")
Eventually(func() *corev1.Pod {
pod, _ := utils.FindPodByPrefixOnce(f.K8sClient, f.CdiInstallNs, jobName, "")
return pod
}, dataImportCronTimeout, pollingInterval).ShouldNot(BeNil(), "cronjob first job pod was not created")
By("Delete cron")
err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Delete(context.TODO(), cronName, metav1.DeleteOptions{})
Expect(err).ToNot(HaveOccurred())
By("Verify cronjob deleted")
Eventually(func() bool {
_, err := f.K8sClient.BatchV1().CronJobs(f.CdiInstallNs).Get(context.TODO(), cronJobName, metav1.GetOptions{})
return errors.IsNotFound(err)
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "cronjob was not deleted")
By("Verify initial job deleted")
Eventually(func() bool {
_, err := f.K8sClient.BatchV1().Jobs(f.CdiInstallNs).Get(context.TODO(), initialJobName, metav1.GetOptions{})
return errors.IsNotFound(err)
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "initial job was not deleted")
By("Verify initial job pod deleted")
Eventually(func() bool {
_, err := utils.FindPodByPrefixOnce(f.K8sClient, f.CdiInstallNs, initialJobName, "")
return errors.IsNotFound(err)
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "initial job pod was not deleted")
By("Verify cronjob first job deleted")
Eventually(func() bool {
_, err := f.K8sClient.BatchV1().Jobs(f.CdiInstallNs).Get(context.TODO(), jobName, metav1.GetOptions{})
return errors.IsNotFound(err)
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "cronjob first job was not deleted")
By("Verify cronjob first job pod deleted")
Eventually(func() bool {
_, err := utils.FindPodByPrefixOnce(f.K8sClient, f.CdiInstallNs, jobName, "")
return errors.IsNotFound(err)
}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "cronjob first job pod was not deleted")
})
Context("Change source format of existing DataImportCron", func() {
It("[test_id:10034] Should allow switching back and forth from PVC to snapshot sources", func() {
if !f.IsSnapshotStorageClassAvailable() {
Skip("Volumesnapshot support needed to test DataImportCron with Volumesnapshot sources")
}
size := "1Gi"
configureStorageProfileResultingFormat(cdiv1.DataImportCronSourceFormatPvc)
By(fmt.Sprintf("Create new DataImportCron %s, url %s", cronName, *reg.URL))
cron = utils.NewDataImportCronWithStorageSpec(cronName, size, scheduleOnceAYear, dataSourceName, importsToKeep, *reg)
retentionPolicy := cdiv1.DataImportCronRetainNone
cron.Spec.RetentionPolicy = &retentionPolicy
cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
Expect(err).ToNot(HaveOccurred())
waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)
By("Verify CurrentImports update")
currentImportDv := cron.Status.CurrentImports[0].DataVolumeName
Expect(currentImportDv).ToNot(BeEmpty())
_ = verifySourceReady(cdiv1.DataImportCronSourceFormatPvc, currentImportDv)
snapshots := &snapshotv1.VolumeSnapshotList{}
err = f.CrClient.List(context.TODO(), snapshots, &client.ListOptions{Namespace: ns})
Expect(err).ToNot(HaveOccurred())
Expect(snapshots.Items).To(BeEmpty())
// Ensure existing PVC clones from this source don't mess up future ones
cloneDV := utils.NewDataVolumeForImageCloningAndStorageSpec("target-dv-from-pvc", size, ns, currentImportDv, nil, nil)
cloneDV, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, cloneDV)
Expect(err).ToNot(HaveOccurred())
f.ForceBindPvcIfDvIsWaitForFirstConsumer(cloneDV)
err = utils.WaitForDataVolumePhase(f, cloneDV.Namespace, cdiv1.Succeeded, cloneDV.Name)
// Now simulate an upgrade, where a new CDI version has identified
// more storage types that scale better with snapshots
configureStorageProfileResultingFormat(cdiv1.DataImportCronSourceFormatSnapshot)
// Check snapshot now exists and PVC is gone
currentSource := verifySourceReady(cdiv1.DataImportCronSourceFormatSnapshot, currentImportDv)
waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)
// DataSource is updated to point to a snapshot
dataSource, err := f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), cron.Spec.ManagedDataSource, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
readyCond := controller.FindDataSourceConditionByType(dataSource, cdiv1.DataSourceReady)
Expect(readyCond.Status).To(Equal(corev1.ConditionTrue))
expectedSource := cdiv1.DataSourceSource{
Snapshot: &cdiv1.DataVolumeSourceSnapshot{
Name: currentSource.GetName(),
Namespace: currentSource.GetNamespace(),
},
}
Expect(dataSource.Spec.Source).To(Equal(expectedSource))
// Verify content
targetDV := utils.NewDataVolumeWithSourceRefAndStorageAPI("target-dv-from-snap", &size, dataSource.Namespace, dataSource.Name)
By(fmt.Sprintf("Create new target datavolume %s", targetDV.Name))
targetDataVolume, err := utils.CreateDataVolumeFromDefinition(f.CdiClient, ns, targetDV)
Expect(err).ToNot(HaveOccurred())
f.ForceBindPvcIfDvIsWaitForFirstConsumer(targetDataVolume)
By("Wait for clone DV Succeeded phase")
err = utils.WaitForDataVolumePhase(f, targetDataVolume.Namespace, cdiv1.Succeeded, targetDataVolume.Name)
Expect(err).ToNot(HaveOccurred())
By("Verify MD5")
pvc, err := f.K8sClient.CoreV1().PersistentVolumeClaims(targetDataVolume.Namespace).Get(context.TODO(), targetDataVolume.Name, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
path := utils.DefaultImagePath
volumeMode := pvc.Spec.VolumeMode
if volumeMode != nil && *volumeMode == corev1.PersistentVolumeBlock {
path = utils.DefaultPvcMountPath
}
same, err := f.VerifyTargetPVCContentMD5(f.Namespace, pvc, path, utils.UploadFileMD5, utils.UploadFileSize)
Expect(err).ToNot(HaveOccurred())
Expect(same).To(BeTrue())
By("Switch back from snap to PVC source")
configureStorageProfileResultingFormat(cdiv1.DataImportCronSourceFormatPvc)
currentSource = verifySourceReady(cdiv1.DataImportCronSourceFormatPvc, currentImportDv)
dataSource, err = f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), cron.Spec.ManagedDataSource, metav1.GetOptions{})
Expect(err).ToNot(HaveOccurred())
expectedSource = cdiv1.DataSourceSource{
PVC: &cdiv1.DataVolumeSourcePVC{
Name: currentSource.GetName(),
Namespace: currentSource.GetNamespace(),
},
}
Expect(dataSource.Spec.Source).To(Equal(expectedSource))
})
})
})
func getDataVolumeSourceRegistry(f *framework.Framework) (*cdiv1.DataVolumeSourceRegistry, error) {
reg := &cdiv1.DataVolumeSourceRegistry{}
var (
pullMethod cdiv1.RegistryPullMethod
url string
)
if utils.IsOpenshift(f.K8sClient) {
url = fmt.Sprintf(utils.TinyCoreIsoRegistryURL, f.CdiInstallNs)
pullMethod = cdiv1.RegistryPullPod
} else {
url = fmt.Sprintf(utils.TrustedRegistryURL, f.DockerPrefix)
pullMethod = cdiv1.RegistryPullNode
}
reg.URL = &url
reg.PullMethod = &pullMethod
if err := utils.AddInsecureRegistry(f.CrClient, url); err != nil {
return nil, err
}
return reg, nil
}
func updateDataImportCron(clientSet *cdiclientset.Clientset, namespace string, cronName string,
update func(cron *cdiv1.DataImportCron) *cdiv1.DataImportCron) func() error {
return func() error {
cron, err := clientSet.CdiV1beta1().DataImportCrons(namespace).Get(context.TODO(), cronName, metav1.GetOptions{})
if err != nil {
return err
}
cron = update(cron)
_, err = clientSet.CdiV1beta1().DataImportCrons(namespace).Update(context.TODO(), cron, metav1.UpdateOptions{})
return err
}
}
func updateDataSource(clientSet *cdiclientset.Clientset, namespace string, dataSourceName string,
update func(dataSource *cdiv1.DataSource) *cdiv1.DataSource) func() error {
return func() error {
dataSource, err := clientSet.CdiV1beta1().DataSources(namespace).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
if err != nil {
return err
}
dataSource = update(dataSource)
_, err = clientSet.CdiV1beta1().DataSources(namespace).Update(context.TODO(), dataSource, metav1.UpdateOptions{})
return err
}
}
func retryOnceOnErr(f func() error) Assertion {
err := f()
if err != nil {
err = f()
}
return Expect(err)
}