mirror of
https://github.com/kubevirt/containerized-data-importer.git
synced 2025-06-03 06:30:22 +00:00

As commented by Alex Kalenyuk: the condition check flakes because the switch to snapshot boot sources is almost instant, so the DataImportCron never becomes "not ready" following the switch. We also moved the DataImportCron (DIC) "ready" check after the snapshot "ready" check; otherwise we might check it before it becomes "not ready", in which case the check verifies nothing.

Signed-off-by: Arnon Gilboa <agilboa@redhat.com>
Co-authored-by: Arnon Gilboa <agilboa@redhat.com>
817 lines
34 KiB
Go
817 lines
34 KiB
Go
package tests_test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
cdiclientset "kubevirt.io/containerized-data-importer/pkg/client/clientset/versioned"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
|
|
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
|
|
. "github.com/onsi/ginkgo"
|
|
"github.com/onsi/ginkgo/extensions/table"
|
|
. "github.com/onsi/gomega"
|
|
|
|
batchv1 "k8s.io/api/batch/v1"
|
|
corev1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
logf "sigs.k8s.io/controller-runtime/pkg/log"
|
|
|
|
cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
|
|
"kubevirt.io/containerized-data-importer/pkg/common"
|
|
"kubevirt.io/containerized-data-importer/pkg/controller"
|
|
cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
|
|
"kubevirt.io/containerized-data-importer/tests/framework"
|
|
"kubevirt.io/containerized-data-importer/tests/utils"
|
|
)
|
|
|
|
// Timing and scheduling knobs shared by the DataImportCron specs.
const (
	// dataImportCronTimeout bounds every Eventually poll in this file.
	dataImportCronTimeout = 4 * time.Minute

	// Cron schedules handed to the DataImportCron under test.
	scheduleEveryMinute = "* * * * *"
	scheduleOnceAYear   = "0 0 1 1 *"
	emptySchedule       = ""
)

// Import bookkeeping values used by the DataImportCron specs.
const (
	// importsToKeep is the ImportsToKeep value passed to every created cron;
	// the garbage collection spec expects exactly this many sources to survive.
	importsToKeep = 1

	// errorDigest is set as the desired digest to provoke a failing import.
	errorDigest = "sha256:12345678900987654321"
)
|
|
|
|
var _ = Describe("DataImportCron", func() {
|
|
var (
|
|
f = framework.NewFramework("dataimportcron-func-test")
|
|
log = logf.Log.WithName("dataimportcron_test")
|
|
dataSourceName = "datasource-test"
|
|
pollerPodName = "poller"
|
|
cronName = "cron-test"
|
|
cron *cdiv1.DataImportCron
|
|
reg *cdiv1.DataVolumeSourceRegistry
|
|
err error
|
|
ns string
|
|
scName string
|
|
originalProfileSpec *cdiv1.StorageProfileSpec
|
|
)
|
|
|
|
// Per-spec setup: record the namespace, resolve the registry source and
// remember the default storage class's profile spec so it can be restored.
BeforeEach(func() {
	ns = f.Namespace.Name

	reg, err = getDataVolumeSourceRegistry(f)
	Expect(err).ToNot(HaveOccurred())

	scName = utils.DefaultStorageClass.GetName()
	By(fmt.Sprintf("Get original storage profile: %s", scName))

	profileSpec, getErr := utils.GetStorageProfileSpec(f.CdiClient, scName)
	Expect(getErr).ToNot(HaveOccurred())
	originalProfileSpec = profileSpec
})
|
|
|
|
// Per-spec cleanup: drop the insecure registry entry, delete the poller pod
// and restore the storage profile captured in BeforeEach.
//
// AfterEach may run even when BeforeEach failed part-way, so every pointer
// that BeforeEach is responsible for is checked before it is dereferenced.
// The profile is restored before the pod-deletion result is asserted, so a
// failed pod deletion cannot leave a modified profile behind for later specs.
AfterEach(func() {
	if reg != nil && reg.URL != nil {
		// Best effort: a failure here is only reported, not asserted.
		if err = utils.RemoveInsecureRegistry(f.CrClient, *reg.URL); err != nil {
			fmt.Fprintf(GinkgoWriter, "failed to remove registry; %v", err)
		}
	}

	// Remember the outcome and assert it only after the profile is restored.
	podErr := utils.DeletePodByName(f.K8sClient, pollerPodName, f.CdiInstallNs, nil)

	if originalProfileSpec != nil {
		By("[AfterEach] Restore the profile")
		Expect(utils.UpdateStorageProfile(f.CrClient, scName, *originalProfileSpec)).Should(Succeed())
	}

	Expect(podErr).ToNot(HaveOccurred())
})
|
|
|
|
// updateDigest builds a mutator that sets the desired-source-digest
// annotation of a DataImportCron to digest and hands the same object back.
updateDigest := func(digest string) func(cron *cdiv1.DataImportCron) *cdiv1.DataImportCron {
	setDigest := func(target *cdiv1.DataImportCron) *cdiv1.DataImportCron {
		cc.AddAnnotation(target, controller.AnnSourceDesiredDigest, digest)
		return target
	}
	return setDigest
}
|
|
|
|
// waitForDigest polls the DataImportCron named cronName until its
// desired-source-digest annotation is non-empty.
waitForDigest := func() {
	desiredDigest := func() string {
		current, getErr := f.CdiClient.CdiV1beta1().DataImportCrons(ns).Get(context.TODO(), cronName, metav1.GetOptions{})
		Expect(getErr).ToNot(HaveOccurred())
		return current.Annotations[controller.AnnSourceDesiredDigest]
	}

	Eventually(desiredDigest, dataImportCronTimeout, pollingInterval).ShouldNot(BeEmpty(), "Desired digest is empty")
}
|
|
|
|
// waitForConditions polls until the DataImportCron's Progressing and UpToDate
// conditions both exist and carry the requested statuses. Each poll stores the
// freshly fetched object in the shared cron variable, so callers can read its
// status right after this returns.
waitForConditions := func(statusProgressing, statusUpToDate corev1.ConditionStatus) {
	By(fmt.Sprintf("Wait for DataImportCron Progressing:%s, UpToDate:%s", statusProgressing, statusUpToDate))

	conditionsMatch := func() bool {
		var getErr error
		cron, getErr = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Get(context.TODO(), cronName, metav1.GetOptions{})
		Expect(getErr).ToNot(HaveOccurred())

		progressing := controller.FindDataImportCronConditionByType(cron, cdiv1.DataImportCronProgressing)
		upToDate := controller.FindDataImportCronConditionByType(cron, cdiv1.DataImportCronUpToDate)
		if progressing == nil || progressing.Status != statusProgressing {
			return false
		}
		return upToDate != nil && upToDate.Status == statusUpToDate
	}

	Eventually(conditionsMatch, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "Timeout waiting for DataImportCron conditions")
}
|
|
|
|
// configureStorageProfileResultingFormat sets DataImportCronSourceFormat on a
// copy of the original profile spec, applies it to the default storage
// class's profile, and waits for the profile status to report that format.
configureStorageProfileResultingFormat := func(format cdiv1.DataImportCronSourceFormat) {
	By(fmt.Sprintf("configure storage profile %s", scName))

	desiredSpec := originalProfileSpec.DeepCopy()
	desiredSpec.DataImportCronSourceFormat = &format
	Expect(utils.UpdateStorageProfile(f.CrClient, scName, *desiredSpec)).ToNot(HaveOccurred())

	reportedFormat := func() *cdiv1.DataImportCronSourceFormat {
		profile, getErr := f.CdiClient.CdiV1beta1().StorageProfiles().Get(context.TODO(), scName, metav1.GetOptions{})
		Expect(getErr).ToNot(HaveOccurred())
		return profile.Status.DataImportCronSourceFormat
	}
	Eventually(reportedFormat, 15*time.Second, time.Second).Should(Equal(&format))
}
|
|
|
|
// verifySourceReady waits until the import source called name is usable and
// returns it.
//
// For the PVC format it waits for the PVC and for the DataVolume to reach
// Succeeded. For the snapshot format it waits for the snapshot to be ready,
// for the same-named PVC to be deleted, and checks the PVC stays absent.
// Any other format yields nil.
verifySourceReady := func(format cdiv1.DataImportCronSourceFormat, name string) metav1.Object {
	if format == cdiv1.DataImportCronSourceFormatPvc {
		By(fmt.Sprintf("Verify pvc was created %s", name))
		pvc, err := utils.WaitForPVC(f.K8sClient, ns, name)
		Expect(err).ToNot(HaveOccurred())

		By("Wait for import completion")
		err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, name)
		Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")
		return pvc
	}

	if format != cdiv1.DataImportCronSourceFormatSnapshot {
		return nil
	}

	snapshot := utils.WaitSnapshotReady(f.CrClient, &snapshotv1.VolumeSnapshot{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: ns,
		},
	})

	deleted, err := utils.WaitPVCDeleted(f.K8sClient, name, ns, 30*time.Second)
	if err != nil {
		// work around https://github.com/kubernetes-csi/external-snapshotter/issues/957
		// it does converge after the resync period of snapshot controller (15mins)
		cc.AddAnnotation(snapshot, "workaround", "triggersync")
		Expect(f.CrClient.Update(context.TODO(), snapshot)).ToNot(HaveOccurred())
		// try again
		deleted, err = utils.WaitPVCDeleted(f.K8sClient, name, ns, 30*time.Second)
	}
	Expect(err).ToNot(HaveOccurred())
	Expect(deleted).To(BeTrue())

	// check pvc is not recreated
	lookupPVC := func() error {
		_, getErr := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).Get(context.TODO(), name, metav1.GetOptions{})
		return getErr
	}
	Consistently(lookupPVC, 5*time.Second, 1*time.Second).Should(
		SatisfyAll(HaveOccurred(), WithTransform(errors.IsNotFound, BeTrue())),
		"PVC should not have been recreated",
	)
	return snapshot
}
|
|
|
|
// deleteSource removes the import source called name.
//
// PVC format: clean up the DV/PVC and wait until the PVC with the original
// UID is gone. Snapshot format: keep issuing Delete until it reports
// NotFound. Other formats are ignored.
deleteSource := func(format cdiv1.DataImportCronSourceFormat, name string) {
	if format == cdiv1.DataImportCronSourceFormatPvc {
		pvc, err := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).Get(context.TODO(), name, metav1.GetOptions{})
		Expect(err).ToNot(HaveOccurred())

		utils.CleanupDvPvcNoWait(f.K8sClient, f.CdiClient, f.Namespace.Name, name)

		gone, err := f.WaitPVCDeletedByUID(pvc, time.Minute)
		Expect(err).ToNot(HaveOccurred())
		Expect(gone).To(BeTrue())
		return
	}

	if format == cdiv1.DataImportCronSourceFormatSnapshot {
		target := &snapshotv1.VolumeSnapshot{
			ObjectMeta: metav1.ObjectMeta{
				Name:      name,
				Namespace: ns,
			},
		}
		// Probably good to ensure deletion by UID here too but re-import is long enough
		// to not cause errors
		snapshotGone := func() bool {
			delErr := f.CrClient.Delete(context.TODO(), target)
			if delErr == nil {
				return false
			}
			return errors.IsNotFound(delErr)
		}
		Eventually(snapshotGone, time.Minute, time.Second).Should(BeTrue())
	}
}
|
|
|
|
getDataSourceName := func(format cdiv1.DataImportCronSourceFormat, ds *cdiv1.DataSource) string {
|
|
var sourceName string
|
|
|
|
switch format {
|
|
case cdiv1.DataImportCronSourceFormatPvc:
|
|
sourceName = ds.Spec.Source.PVC.Name
|
|
case cdiv1.DataImportCronSourceFormatSnapshot:
|
|
sourceName = ds.Spec.Source.Snapshot.Name
|
|
}
|
|
|
|
return sourceName
|
|
}
|
|
|
|
// verifyRetention checks that the managed DataSource still exists and that,
// for a few seconds, the source called name stays ready with no deletion
// timestamp.
verifyRetention := func(format cdiv1.DataImportCronSourceFormat, name string) {
	By("Verify DataSource retention")
	_, getErr := f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
	Expect(getErr).ToNot(HaveOccurred())

	deletionTimestamp := func() *metav1.Time {
		return verifySourceReady(format, name).GetDeletionTimestamp()
	}
	Consistently(deletionTimestamp, 5*time.Second, time.Second).Should(BeNil())
}
|
|
|
|
// verifyDeletion waits for the managed DataSource to disappear, then for the
// namespace to hold no PVCs and, for the snapshot format, no VolumeSnapshots.
verifyDeletion := func(format cdiv1.DataImportCronSourceFormat) {
	By("Verify DataSource deletion")
	dataSourceGone := func() bool {
		_, getErr := f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
		return errors.IsNotFound(getErr)
	}
	Eventually(dataSourceGone, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "DataSource was not deleted")

	By("Verify sources deleted")
	noPVCsLeft := func(g Gomega) bool {
		pvcList, listErr := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
		g.Expect(listErr).ToNot(HaveOccurred())
		return len(pvcList.Items) == 0
	}
	Eventually(noPVCsLeft, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "PVCs were not deleted")

	if format != cdiv1.DataImportCronSourceFormatSnapshot {
		return
	}

	snapshotList := &snapshotv1.VolumeSnapshotList{}
	noSnapshotsLeft := func(g Gomega) bool {
		listErr := f.CrClient.List(context.TODO(), snapshotList, &client.ListOptions{Namespace: ns})
		g.Expect(listErr).ToNot(HaveOccurred())
		return len(snapshotList.Items) == 0
	}
	Eventually(noSnapshotsLeft, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "snapshots were not deleted")
}
|
|
|
|
// End-to-end import flow, parameterised by retention policy, whether an
// erroneous DataVolume is provoked, the number of import iterations and the
// source format.
//
// Each iteration waits for the cron to be up to date, verifies the current
// import source and the managed DataSource, then checks that a tampered or
// deleted DataSource and a deleted source are reconciled. After the loop the
// cron is deleted and either retention or deletion of its sources is checked.
table.DescribeTable("should", func(retention, createErrorDv bool, repeat int, format cdiv1.DataImportCronSourceFormat) {
	if format == cdiv1.DataImportCronSourceFormatSnapshot && !f.IsSnapshotStorageClassAvailable() {
		Skip("Volumesnapshot support needed to test DataImportCron with Volumesnapshot sources")
	}

	configureStorageProfileResultingFormat(format)

	By(fmt.Sprintf("Create new DataImportCron %s, url %s", cronName, *reg.URL))
	cron = utils.NewDataImportCron(cronName, "5Gi", scheduleEveryMinute, dataSourceName, importsToKeep, *reg)

	garbageCollect := cdiv1.DataImportCronGarbageCollectNever
	cron.Spec.GarbageCollect = &garbageCollect

	if !retention {
		retentionPolicy := cdiv1.DataImportCronRetainNone
		cron.Spec.RetentionPolicy = &retentionPolicy
	}
	cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
	Expect(err).ToNot(HaveOccurred())

	By("Verify cronjob was created")
	Eventually(func() bool {
		_, err := f.K8sClient.BatchV1().CronJobs(f.CdiInstallNs).Get(context.TODO(), controller.GetCronJobName(cron), metav1.GetOptions{})
		if errors.IsNotFound(err) {
			return false
		}
		Expect(err).ToNot(HaveOccurred())
		return true
	}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "cronjob was not created")

	var lastImportDv, currentImportDv string
	for i := 0; i < repeat; i++ {
		By(fmt.Sprintf("Iter #%d", i))
		if i > 0 {
			if createErrorDv {
				By("Set desired digest to nonexisting one")

				// get and update
				retryOnceOnErr(updateDataImportCron(f.CdiClient, ns, cronName, updateDigest(errorDigest))).Should(BeNil())

				By("Wait for CurrentImports update")
				// The poll uses its own Gomega so that a failed Get or a
				// not-yet-populated CurrentImports list is retried instead of
				// panicking on a nil object or an out-of-range index.
				Eventually(func(g Gomega) string {
					cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Get(context.TODO(), cronName, metav1.GetOptions{})
					g.Expect(err).ToNot(HaveOccurred())
					g.Expect(cron.Status.CurrentImports).ToNot(BeEmpty())
					currentImportDv = cron.Status.CurrentImports[0].DataVolumeName
					g.Expect(currentImportDv).ToNot(BeEmpty())
					return currentImportDv
				}, dataImportCronTimeout, pollingInterval).ShouldNot(Equal(lastImportDv), "Current import was not updated")
				lastImportDv = currentImportDv
			} else {
				By("Reset desired digest")
				retryOnceOnErr(updateDataImportCron(f.CdiClient, ns, cronName, updateDigest(""))).Should(BeNil())

				By(fmt.Sprintf("Delete last import %s, format: %s", currentImportDv, format))
				deleteSource(format, currentImportDv)
				lastImportDv = ""

				By("Wait for non-empty desired digest")
				waitForDigest()
			}
		}

		// waitForConditions refreshes the shared cron object on every poll.
		waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)
		By("Verify CurrentImports update")
		// Fail with a readable message rather than an index-out-of-range panic.
		Expect(cron.Status.CurrentImports).ToNot(BeEmpty())
		currentImportDv = cron.Status.CurrentImports[0].DataVolumeName
		Expect(currentImportDv).ToNot(BeEmpty())
		Expect(currentImportDv).ToNot(Equal(lastImportDv))
		lastImportDv = currentImportDv

		currentSource := verifySourceReady(format, currentImportDv)

		By("Verify DataSource was updated")
		var dataSource *cdiv1.DataSource
		Eventually(func() bool {
			dataSource, err = f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), cron.Spec.ManagedDataSource, metav1.GetOptions{})
			if errors.IsNotFound(err) {
				return false
			}
			Expect(err).ToNot(HaveOccurred())
			readyCond := controller.FindDataSourceConditionByType(dataSource, cdiv1.DataSourceReady)
			return readyCond != nil && readyCond.Status == corev1.ConditionTrue &&
				getDataSourceName(format, dataSource) == currentImportDv
		}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "DataSource was not updated")

		By("Verify cron was updated")
		Expect(cron.Status.LastImportedPVC).ToNot(BeNil())
		Expect(cron.Status.LastImportedPVC.Name).To(Equal(currentImportDv))

		By("Update DataSource to refer to a dummy name")
		retryOnceOnErr(
			updateDataSource(f.CdiClient, ns, cron.Spec.ManagedDataSource,
				func(dataSource *cdiv1.DataSource) *cdiv1.DataSource {
					switch format {
					case cdiv1.DataImportCronSourceFormatPvc:
						dataSource.Spec.Source.PVC.Name = "dummy"
					case cdiv1.DataImportCronSourceFormatSnapshot:
						dataSource.Spec.Source.Snapshot.Name = "dummy"
					}
					return dataSource
				},
			)).Should(BeNil())

		By("Verify name on DataSource was reconciled")
		Eventually(func() bool {
			dataSource, err = f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
			Expect(err).ToNot(HaveOccurred())
			return getDataSourceName(format, dataSource) == currentImportDv
		}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "DataSource name was not reconciled")

		By("Delete DataSource")
		err = f.CdiClient.CdiV1beta1().DataSources(ns).Delete(context.TODO(), dataSourceName, metav1.DeleteOptions{})
		Expect(err).ToNot(HaveOccurred())
		By("Verify DataSource was re-created")
		// A different UID shows this is a new object, not the deleted one.
		Eventually(func() bool {
			ds, err := f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
			return err == nil && ds.UID != dataSource.UID
		}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "DataSource was not re-created")

		By("Delete last imported source")
		deleteSource(format, currentSource.GetName())
		By("Verify last imported source was re-created")
		recreatedSource := verifySourceReady(format, currentSource.GetName())
		Expect(recreatedSource.GetUID()).ToNot(Equal(currentSource.GetUID()), "Last imported source was not re-created")
	}

	// Captured before deletion; the cron object is not fetched again below.
	lastImportedPVC := cron.Status.LastImportedPVC

	By("Delete cron")
	err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Delete(context.TODO(), cronName, metav1.DeleteOptions{})
	Expect(err).ToNot(HaveOccurred())

	if retention {
		// Guard the dereference below: with no completed iteration there is
		// no last imported PVC to check.
		Expect(lastImportedPVC).ToNot(BeNil())
		verifyRetention(format, lastImportedPVC.Name)
	} else {
		verifyDeletion(format)
	}
},
	table.Entry("[test_id:7403] succeed importing initial PVC from registry URL", true, false, 1, cdiv1.DataImportCronSourceFormatPvc),
	table.Entry("[test_id:7414] succeed importing PVC from registry URL on source digest update", true, false, 2, cdiv1.DataImportCronSourceFormatPvc),
	table.Entry("[test_id:10031] succeed importing initially into a snapshot from registry URL", true, false, 1, cdiv1.DataImportCronSourceFormatSnapshot),
	table.Entry("[test_id:10032] succeed importing to a snapshot from registry URL on source digest update", true, false, 2, cdiv1.DataImportCronSourceFormatSnapshot),
	table.Entry("[test_id:8266] succeed deleting error DVs when importing new ones", false, true, 2, cdiv1.DataImportCronSourceFormatPvc),
)
|
|
|
|
// A cron scheduled once a year only gets its initial poll. The spec clears
// the resulting digest and then expects a manually created poller pod to set
// the digest again.
It("[test_id:10040] Should get digest updated by external poller", func() {
	By("Create DataImportCron with only initial poller job")
	cron = utils.NewDataImportCron(cronName, "5Gi", scheduleOnceAYear, dataSourceName, importsToKeep, *reg)
	retainNone := cdiv1.DataImportCronRetainNone
	cron.Spec.RetentionPolicy = &retainNone
	// The created object is kept in a local; the shared cron variable keeps
	// pointing at the definition that was submitted.
	created, err := f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
	Expect(err).ToNot(HaveOccurred())

	By("Wait for initial digest")
	waitForDigest()

	By("Set empty digest")
	retryOnceOnErr(updateDataImportCron(f.CdiClient, ns, created.Name, updateDigest(""))).Should(BeNil())

	By("Create poller pod to update the DataImportCron digest")
	importerImage := f.GetEnvVarValue("IMPORTER_IMAGE")
	Expect(importerImage).ToNot(BeEmpty())

	pollerPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pollerPodName}}
	err = controller.InitPollerPodSpec(f.CrClient, created, &pollerPod.Spec, importerImage, corev1.PullIfNotPresent, log)
	Expect(err).ToNot(HaveOccurred())

	_, err = utils.CreatePod(f.K8sClient, f.CdiInstallNs, pollerPod)
	Expect(err).ToNot(HaveOccurred())

	By("Wait for digest set by external poller")
	waitForDigest()
})
|
|
|
|
// With an empty schedule the spec drives the import through a manually
// created poller pod, waits for the import to succeed, and finally checks
// that no CronJob exists for the cron.
It("[test_id:XXXX] Should allow an empty schedule to trigger an external update to the source", func() {
	configureStorageProfileResultingFormat(cdiv1.DataImportCronSourceFormatPvc)

	By("Create DataImportCron with empty schedule")
	cron = utils.NewDataImportCron(cronName, "5Gi", emptySchedule, dataSourceName, importsToKeep, *reg)
	retainNone := cdiv1.DataImportCronRetainNone
	cron.Spec.RetentionPolicy = &retainNone

	cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
	Expect(err).ToNot(HaveOccurred())

	By("Create poller pod to update the DataImportCron digest")
	importerImage := f.GetEnvVarValue("IMPORTER_IMAGE")
	Expect(importerImage).ToNot(BeEmpty())

	pollerPod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: pollerPodName}}
	err = controller.InitPollerPodSpec(f.CrClient, cron, &pollerPod.Spec, importerImage, corev1.PullIfNotPresent, log)
	Expect(err).ToNot(HaveOccurred())

	_, err = utils.CreatePod(f.K8sClient, f.CdiInstallNs, pollerPod)
	Expect(err).ToNot(HaveOccurred())

	By("Wait for digest set by external poller")
	waitForDigest()

	// Refreshes the shared cron object, whose status is read just below.
	waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)
	By("Verify CurrentImports update")
	currentImportDv := cron.Status.CurrentImports[0].DataVolumeName
	Expect(currentImportDv).ToNot(BeEmpty())

	By(fmt.Sprintf("Verify pvc was created %s", currentImportDv))
	_, err = utils.WaitForPVC(f.K8sClient, ns, currentImportDv)
	Expect(err).ToNot(HaveOccurred())

	By("Wait for import completion")
	err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, currentImportDv)
	Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")

	By("Verify cronjob was not created")
	cronJobName := controller.GetCronJobName(cron)
	_, err = f.K8sClient.BatchV1().CronJobs(f.CdiInstallNs).Get(context.TODO(), cronJobName, metav1.GetOptions{})
	Expect(errors.IsNotFound(err)).To(BeTrue())
})
|
|
|
|
// Garbage collection flow: pre-create labeled sources (and, for PVCs, an old
// labeled DataVolume), create the cron, and expect only importsToKeep
// sources to remain, with the current import carrying a last-use timestamp.
table.DescribeTable("Succeed garbage collecting sources when importing new ones", func(format cdiv1.DataImportCronSourceFormat) {
	if format == cdiv1.DataImportCronSourceFormatSnapshot && !f.IsSnapshotStorageClassAvailable() {
		Skip("Volumesnapshot support needed to test DataImportCron with Volumesnapshot sources")
	}
	const oldDvName = "old-version-dv"

	configureStorageProfileResultingFormat(format)

	// Annotation and label sets stamped on every pre-created garbage source.
	lastUsedNow := func() map[string]string {
		return map[string]string{controller.AnnLastUseTime: time.Now().UTC().Format(time.RFC3339Nano)}
	}
	ownedByCron := func() map[string]string {
		return map[string]string{common.DataImportCronLabel: cronName}
	}

	garbageSources := 3
	for idx := 0; idx < garbageSources; idx++ {
		srcName := fmt.Sprintf("src-garbage-%d", idx)
		By(fmt.Sprintf("Create %s", srcName))
		switch format {
		case cdiv1.DataImportCronSourceFormatPvc:
			garbagePVC := utils.NewPVCDefinition(srcName, "1Gi", lastUsedNow(), ownedByCron())
			f.CreateBoundPVCFromDefinition(garbagePVC)
		case cdiv1.DataImportCronSourceFormatSnapshot:
			// Snapshot a throwaway PVC, then drop the PVC so only the
			// snapshot is left behind.
			backingPVC := utils.NewPVCDefinition(srcName, "1Gi", lastUsedNow(), ownedByCron())
			f.CreateBoundPVCFromDefinition(backingPVC)
			snapClass := f.GetSnapshotClass()
			garbageSnap := utils.NewVolumeSnapshot(srcName, ns, backingPVC.Name, &snapClass.Name)
			garbageSnap.SetAnnotations(lastUsedNow())
			garbageSnap.SetLabels(ownedByCron())
			err = f.CrClient.Create(context.TODO(), garbageSnap)
			Expect(err).ToNot(HaveOccurred())
			utils.WaitSnapshotReady(f.CrClient, garbageSnap)
			err = f.DeletePVC(backingPVC)
			Expect(err).ToNot(HaveOccurred())
			pvcGone, waitErr := utils.WaitPVCDeleted(f.K8sClient, srcName, ns, 2*time.Minute)
			Expect(waitErr).ToNot(HaveOccurred())
			Expect(pvcGone).To(BeTrue())
		}
	}

	switch format {
	case cdiv1.DataImportCronSourceFormatPvc:
		By(fmt.Sprintf("Create labeled DataVolume %s for old DVs garbage collection test", oldDvName))
		oldDv := utils.NewDataVolumeWithRegistryImport(oldDvName, "5Gi", "")
		oldDv.Spec.Source.Registry = reg
		oldDv.Labels = map[string]string{common.DataImportCronLabel: cronName}
		cc.AddAnnotation(oldDv, cc.AnnDeleteAfterCompletion, "false")
		oldDv, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, ns, oldDv)
		Expect(err).ToNot(HaveOccurred())

		By("Wait for import completion")
		f.ForceBindPvcIfDvIsWaitForFirstConsumer(oldDv)
		err = utils.WaitForDataVolumePhase(f, ns, cdiv1.Succeeded, oldDv.Name)
		Expect(err).ToNot(HaveOccurred(), "Datavolume not in phase succeeded in time")

		By(fmt.Sprintf("Verify PVC was created %s", oldDv.Name))
		oldPVC, pvcErr := utils.WaitForPVC(f.K8sClient, ns, oldDv.Name)
		Expect(pvcErr).ToNot(HaveOccurred())
		By(fmt.Sprintf("Verify DataImportCronLabel is passed to the PVC: %s", oldPVC.Labels[common.DataImportCronLabel]))
		Expect(oldPVC.Labels[common.DataImportCronLabel]).To(Equal(cronName))

		oldPVC.Labels[common.DataImportCronLabel] = ""
		By("Update DataImportCron label to be empty in the PVC")
		_, pvcErr = f.K8sClient.CoreV1().PersistentVolumeClaims(oldPVC.Namespace).Update(context.TODO(), oldPVC, metav1.UpdateOptions{})
		Expect(pvcErr).ToNot(HaveOccurred())

		// The garbage PVCs plus the old DV's PVC.
		listPVCs := func() []corev1.PersistentVolumeClaim {
			pvcs, listErr := f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
			Expect(listErr).ToNot(HaveOccurred())
			return pvcs.Items
		}
		Eventually(listPVCs, dataImportCronTimeout, pollingInterval).Should(HaveLen(garbageSources + 1))
	case cdiv1.DataImportCronSourceFormatSnapshot:
		existing := &snapshotv1.VolumeSnapshotList{}
		listErr := f.CrClient.List(context.TODO(), existing, &client.ListOptions{Namespace: ns})
		Expect(listErr).ToNot(HaveOccurred())
		Expect(existing.Items).To(HaveLen(garbageSources))
	}

	By(fmt.Sprintf("Create new DataImportCron %s, url %s", cronName, *reg.URL))
	cron = utils.NewDataImportCron(cronName, "1Gi", scheduleEveryMinute, dataSourceName, importsToKeep, *reg)
	retainNone := cdiv1.DataImportCronRetainNone
	cron.Spec.RetentionPolicy = &retainNone

	cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
	Expect(err).ToNot(HaveOccurred())

	// Refreshes the shared cron object, whose status is read just below.
	waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)
	By("Verify CurrentImports update")
	currentImportDv := cron.Status.CurrentImports[0].DataVolumeName
	Expect(currentImportDv).ToNot(BeEmpty())

	currentSource := verifySourceReady(format, currentImportDv)

	By("Check garbage collection")
	switch format {
	case cdiv1.DataImportCronSourceFormatPvc:
		By("Check old DV garbage collection")
		lookupOldDv := func() error {
			_, getErr := f.CdiClient.CdiV1beta1().DataVolumes(ns).Get(context.TODO(), oldDvName, metav1.GetOptions{})
			return getErr
		}
		Eventually(lookupOldDv, dataImportCronTimeout, pollingInterval).Should(Satisfy(errors.IsNotFound), "Garbage collection failed cleaning old DV")

		// remaining holds the last listing, inspected after the wait.
		remaining := &corev1.PersistentVolumeClaimList{}
		countPVCs := func() int {
			remaining, err = f.K8sClient.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{})
			Expect(err).ToNot(HaveOccurred())
			return len(remaining.Items)
		}
		Eventually(countPVCs, dataImportCronTimeout, pollingInterval).Should(Equal(importsToKeep), "Garbage collection failed cleaning old imports")

		By("Check last import PVC is timestamped and not garbage collected")
		found := false
		for i := range remaining.Items {
			candidate := &remaining.Items[i]
			if candidate.UID != currentSource.GetUID() {
				continue
			}
			lastUse := candidate.Annotations[controller.AnnLastUseTime]
			Expect(lastUse).ToNot(BeEmpty())
			ts, parseErr := time.Parse(time.RFC3339Nano, lastUse)
			Expect(parseErr).ToNot(HaveOccurred())
			Expect(ts).To(BeTemporally("<", time.Now()))
			found = true
			break
		}
		Expect(found).To(BeTrue())
	case cdiv1.DataImportCronSourceFormatSnapshot:
		remaining := &snapshotv1.VolumeSnapshotList{}
		countSnapshots := func(g Gomega) int {
			listErr := f.CrClient.List(context.TODO(), remaining, &client.ListOptions{Namespace: ns})
			g.Expect(listErr).ToNot(HaveOccurred())
			return len(remaining.Items)
		}
		Eventually(countSnapshots, dataImportCronTimeout, pollingInterval).Should(Equal(importsToKeep), "Garbage collection failed cleaning old imports")

		By("Check last import snapshot is timestamped and not garbage collected")
		found := false
		for i := range remaining.Items {
			candidate := &remaining.Items[i]
			if candidate.UID != currentSource.GetUID() {
				continue
			}
			lastUse := candidate.Annotations[controller.AnnLastUseTime]
			Expect(lastUse).ToNot(BeEmpty())
			ts, parseErr := time.Parse(time.RFC3339Nano, lastUse)
			Expect(parseErr).ToNot(HaveOccurred())
			Expect(ts).To(BeTemporally("<", time.Now()))
			found = true
			break
		}
		Expect(found).To(BeTrue())
	}
},
	table.Entry("[test_id:7406] with PVC sources", cdiv1.DataImportCronSourceFormatPvc),
	table.Entry("[test_id:10033] with snapshot sources", cdiv1.DataImportCronSourceFormatSnapshot),
)
|
|
|
|
// Creates a cron whose registry source points at a non-existent cert
// ConfigMap, waits for the initial job, the CronJob's first job and their
// pods to exist, then deletes the cron and expects all of them to go away.
It("[test_id:8033] should delete jobs on deletion", func() {
	noSuchCM := "nosuch"
	reg.CertConfigMap = &noSuchCM
	// Use the shared name so that creation and the deletion below always
	// refer to the same object.
	cron = utils.NewDataImportCron(cronName, "5Gi", scheduleEveryMinute, dataSourceName, importsToKeep, *reg)
	By("Create new DataImportCron")
	cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
	Expect(err).ToNot(HaveOccurred())

	// getJob returns the named job in the CDI install namespace, or nil when
	// the lookup fails. The typed client hands back a non-nil empty object
	// alongside an error, so the error has to be checked for the
	// "ShouldNot(BeNil())" assertions below to mean the job exists.
	getJob := func(name string) *batchv1.Job {
		job, err := f.K8sClient.BatchV1().Jobs(f.CdiInstallNs).Get(context.TODO(), name, metav1.GetOptions{})
		if err != nil {
			return nil
		}
		return job
	}

	By("Verify initial job created")
	initialJobName := controller.GetInitialJobName(cron)
	Eventually(func() *batchv1.Job {
		return getJob(initialJobName)
	}, dataImportCronTimeout, pollingInterval).ShouldNot(BeNil(), "initial job was not created")

	By("Verify initial job pod created")
	Eventually(func() *corev1.Pod {
		// NOTE(review): relies on FindPodByPrefixOnce returning a nil pod
		// when nothing matches; confirm against the helper.
		pod, _ := utils.FindPodByPrefixOnce(f.K8sClient, f.CdiInstallNs, initialJobName, "")
		return pod
	}, dataImportCronTimeout, pollingInterval).ShouldNot(BeNil(), "initial job pod was not created")

	By("Verify cronjob created and has active job")
	cronJobName := controller.GetCronJobName(cron)
	jobName := ""
	Eventually(func() string {
		cronjob, err := f.K8sClient.BatchV1().CronJobs(f.CdiInstallNs).Get(context.TODO(), cronJobName, metav1.GetOptions{})
		if err == nil && cronjob != nil && len(cronjob.Status.Active) > 0 {
			jobName = cronjob.Status.Active[0].Name
		}
		return jobName
	}, dataImportCronTimeout, pollingInterval).ShouldNot(BeEmpty(), "cronjob has no active job")

	By("Verify cronjob first job created")
	Eventually(func() *batchv1.Job {
		return getJob(jobName)
	}, dataImportCronTimeout, pollingInterval).ShouldNot(BeNil(), "cronjob first job was not created")

	By("Verify cronjob first job pod created")
	Eventually(func() *corev1.Pod {
		pod, _ := utils.FindPodByPrefixOnce(f.K8sClient, f.CdiInstallNs, jobName, "")
		return pod
	}, dataImportCronTimeout, pollingInterval).ShouldNot(BeNil(), "cronjob first job pod was not created")

	By("Delete cron")
	err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Delete(context.TODO(), cronName, metav1.DeleteOptions{})
	Expect(err).ToNot(HaveOccurred())

	By("Verify cronjob deleted")
	Eventually(func() bool {
		_, err := f.K8sClient.BatchV1().CronJobs(f.CdiInstallNs).Get(context.TODO(), cronJobName, metav1.GetOptions{})
		return errors.IsNotFound(err)
	}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "cronjob was not deleted")

	By("Verify initial job deleted")
	Eventually(func() bool {
		_, err := f.K8sClient.BatchV1().Jobs(f.CdiInstallNs).Get(context.TODO(), initialJobName, metav1.GetOptions{})
		return errors.IsNotFound(err)
	}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "initial job was not deleted")

	By("Verify initial job pod deleted")
	Eventually(func() bool {
		_, err := utils.FindPodByPrefixOnce(f.K8sClient, f.CdiInstallNs, initialJobName, "")
		return errors.IsNotFound(err)
	}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "initial job pod was not deleted")

	By("Verify cronjob first job deleted")
	Eventually(func() bool {
		_, err := f.K8sClient.BatchV1().Jobs(f.CdiInstallNs).Get(context.TODO(), jobName, metav1.GetOptions{})
		return errors.IsNotFound(err)
	}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "cronjob first job was not deleted")

	By("Verify cronjob first job pod deleted")
	Eventually(func() bool {
		_, err := utils.FindPodByPrefixOnce(f.K8sClient, f.CdiInstallNs, jobName, "")
		return errors.IsNotFound(err)
	}, dataImportCronTimeout, pollingInterval).Should(BeTrue(), "cronjob first job pod was not deleted")
})
|
|
|
|
Context("Change source format of existing DataImportCron", func() {
	// Exercises a storage profile switching its resulting source format from
	// PVC to snapshot while a DataImportCron already has a current import.
	It("[test_id:10034] Should switch from PVC to snapshot sources once format changes", func() {
		if !f.IsSnapshotStorageClassAvailable() {
			Skip("Volumesnapshot support needed to test DataImportCron with Volumesnapshot sources")
		}
		size := "1Gi"

		// Start out with PVC-formatted sources.
		configureStorageProfileResultingFormat(cdiv1.DataImportCronSourceFormatPvc)

		By(fmt.Sprintf("Create new DataImportCron %s, url %s", cronName, *reg.URL))
		cron = utils.NewDataImportCronWithStorageSpec(cronName, size, scheduleOnceAYear, dataSourceName, importsToKeep, *reg)
		retentionPolicy := cdiv1.DataImportCronRetainNone
		cron.Spec.RetentionPolicy = &retentionPolicy

		cron, err = f.CdiClient.CdiV1beta1().DataImportCrons(ns).Create(context.TODO(), cron, metav1.CreateOptions{})
		Expect(err).ToNot(HaveOccurred())

		waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)
		By("Verify CurrentImports update")
		// Guard the index below so a missing import fails the assertion
		// instead of panicking with an out-of-range access.
		Expect(cron.Status.CurrentImports).ToNot(BeEmpty())
		currentImportDv := cron.Status.CurrentImports[0].DataVolumeName
		Expect(currentImportDv).ToNot(BeEmpty())

		_ = verifySourceReady(cdiv1.DataImportCronSourceFormatPvc, currentImportDv)
		// While the format is PVC, no VolumeSnapshot is expected in the namespace.
		snapshots := &snapshotv1.VolumeSnapshotList{}
		err = f.CrClient.List(context.TODO(), snapshots, &client.ListOptions{Namespace: ns})
		Expect(err).ToNot(HaveOccurred())
		Expect(snapshots.Items).To(BeEmpty())
		// Ensure existing PVC clones from this source don't mess up future ones
		cloneDV := utils.NewDataVolumeForImageCloningAndStorageSpec("target-dv-from-pvc", size, ns, currentImportDv, nil, nil)
		cloneDV, err = utils.CreateDataVolumeFromDefinition(f.CdiClient, f.Namespace.Name, cloneDV)
		Expect(err).ToNot(HaveOccurred())
		f.ForceBindPvcIfDvIsWaitForFirstConsumer(cloneDV)
		err = utils.WaitForDataVolumePhase(f, cloneDV.Namespace, cdiv1.Succeeded, cloneDV.Name)
		// The clone must actually succeed before the format switch; without
		// this check a failed or timed-out clone would go unnoticed.
		Expect(err).ToNot(HaveOccurred())

		// Now simulate an upgrade, where a new CDI version has identified
		// more storage types that scale better with snapshots
		configureStorageProfileResultingFormat(cdiv1.DataImportCronSourceFormatSnapshot)
		// Check snapshot now exists and PVC is gone
		currentSource := verifySourceReady(cdiv1.DataImportCronSourceFormatSnapshot, currentImportDv)
		// The cron conditions are checked only after the snapshot source is
		// ready, so the check observes the state following the switch.
		waitForConditions(corev1.ConditionFalse, corev1.ConditionTrue)
		// DataSource is updated to point to a snapshot
		dataSource, err := f.CdiClient.CdiV1beta1().DataSources(ns).Get(context.TODO(), cron.Spec.ManagedDataSource, metav1.GetOptions{})
		Expect(err).ToNot(HaveOccurred())
		readyCond := controller.FindDataSourceConditionByType(dataSource, cdiv1.DataSourceReady)
		// Fail with a readable assertion if the Ready condition is absent,
		// rather than dereferencing a nil condition.
		Expect(readyCond).ToNot(BeNil())
		Expect(readyCond.Status).To(Equal(corev1.ConditionTrue))
		expectedSource := cdiv1.DataSourceSource{
			Snapshot: &cdiv1.DataVolumeSourceSnapshot{
				Name:      currentSource.GetName(),
				Namespace: currentSource.GetNamespace(),
			},
		}
		Expect(dataSource.Spec.Source).To(Equal(expectedSource))
		// Verify content
		targetDV := utils.NewDataVolumeWithSourceRefAndStorageAPI("target-dv-from-snap", &size, dataSource.Namespace, dataSource.Name)
		By(fmt.Sprintf("Create new target datavolume %s", targetDV.Name))
		targetDataVolume, err := utils.CreateDataVolumeFromDefinition(f.CdiClient, ns, targetDV)
		Expect(err).ToNot(HaveOccurred())
		f.ForceBindPvcIfDvIsWaitForFirstConsumer(targetDataVolume)

		By("Wait for clone DV Succeeded phase")
		err = utils.WaitForDataVolumePhase(f, targetDataVolume.Namespace, cdiv1.Succeeded, targetDataVolume.Name)
		Expect(err).ToNot(HaveOccurred())
		By("Verify MD5")
		pvc, err := f.K8sClient.CoreV1().PersistentVolumeClaims(targetDataVolume.Namespace).Get(context.TODO(), targetDataVolume.Name, metav1.GetOptions{})
		Expect(err).ToNot(HaveOccurred())
		// Block-mode volumes are verified at the PVC mount path; otherwise
		// the default image path is used.
		path := utils.DefaultImagePath
		volumeMode := pvc.Spec.VolumeMode
		if volumeMode != nil && *volumeMode == corev1.PersistentVolumeBlock {
			path = utils.DefaultPvcMountPath
		}
		same, err := f.VerifyTargetPVCContentMD5(f.Namespace, pvc, path, utils.UploadFileMD5, utils.UploadFileSize)
		Expect(err).ToNot(HaveOccurred())
		Expect(same).To(BeTrue())
	})
})
|
|
})
|
|
|
|
func getDataVolumeSourceRegistry(f *framework.Framework) (*cdiv1.DataVolumeSourceRegistry, error) {
|
|
reg := &cdiv1.DataVolumeSourceRegistry{}
|
|
var (
|
|
pullMethod cdiv1.RegistryPullMethod
|
|
url string
|
|
)
|
|
if utils.IsOpenshift(f.K8sClient) {
|
|
url = fmt.Sprintf(utils.TinyCoreIsoRegistryURL, f.CdiInstallNs)
|
|
pullMethod = cdiv1.RegistryPullPod
|
|
} else {
|
|
url = fmt.Sprintf(utils.TrustedRegistryURL, f.DockerPrefix)
|
|
pullMethod = cdiv1.RegistryPullNode
|
|
}
|
|
reg.URL = &url
|
|
reg.PullMethod = &pullMethod
|
|
if err := utils.AddInsecureRegistry(f.CrClient, url); err != nil {
|
|
return nil, err
|
|
}
|
|
return reg, nil
|
|
}
|
|
|
|
func updateDataImportCron(clientSet *cdiclientset.Clientset, namespace string, cronName string,
|
|
update func(cron *cdiv1.DataImportCron) *cdiv1.DataImportCron) func() error {
|
|
|
|
return func() error {
|
|
cron, err := clientSet.CdiV1beta1().DataImportCrons(namespace).Get(context.TODO(), cronName, metav1.GetOptions{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
cron = update(cron)
|
|
|
|
_, err = clientSet.CdiV1beta1().DataImportCrons(namespace).Update(context.TODO(), cron, metav1.UpdateOptions{})
|
|
return err
|
|
}
|
|
}
|
|
|
|
func updateDataSource(clientSet *cdiclientset.Clientset, namespace string, dataSourceName string,
|
|
update func(dataSource *cdiv1.DataSource) *cdiv1.DataSource) func() error {
|
|
return func() error {
|
|
dataSource, err := clientSet.CdiV1beta1().DataSources(namespace).Get(context.TODO(), dataSourceName, metav1.GetOptions{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
dataSource = update(dataSource)
|
|
|
|
_, err = clientSet.CdiV1beta1().DataSources(namespace).Update(context.TODO(), dataSource, metav1.UpdateOptions{})
|
|
return err
|
|
}
|
|
}
|
|
|
|
func retryOnceOnErr(f func() error) Assertion {
|
|
err := f()
|
|
if err != nil {
|
|
err = f()
|
|
}
|
|
|
|
return Expect(err)
|
|
}
|