mirror of
https://github.com/kubevirt/containerized-data-importer.git
synced 2025-06-03 06:30:22 +00:00

* Enable empty schedule in DataImportCron (#2711)
  Allow disabling the DataImportCron schedule and support an external trigger.
  Signed-off-by: Ido Aharon <iaharon@redhat.com>

* expand upon #2721 (#2731)
  Need to replace requeue bool with requeue duration.
  Signed-off-by: Michael Henriksen <mhenriks@redhat.com>

* Add clone from snapshot functionalities to clone-populator (#2724)
  - Add clone from snapshot functionalities to the clone populator
  - Update clone populator unit tests to cover clone from snapshot capabilities
  - Fix storage class assignment in temp-source claim for host-assisted clone from snapshot; this commit also includes other minor and styling-related fixes
  Signed-off-by: Alvaro Romero <alromero@redhat.com>

* Prepare CDI testing for the upcoming non-CSI lane (#2730)
  - Update functional tests to skip incompatible default storage classes
  - Enable the use of non-CSI HPP in testing lanes. This commit modifies several scripts to allow the usage of classic HPP as the default SC in tests, which lets us test our non-populator flow with a non-CSI provisioner.
  Signed-off-by: Alvaro Romero <alromero@redhat.com>

* Allow snapshots as format for DataImportCron created sources (#2700)
  - StorageProfile API for declaring the format of resulting cron disk images
  - Integrate the recommended format in the dataimportcron controller
  - Take snapclass existence into consideration when populating cloneStrategy and sourceFormat
  Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>

* Remove leader election test (#2745)
  Now that we are using the standard k8s leases from the controller-runtime library, there is no need to test our implementation, as it is no longer in use. This will save some testing time and random failures.
  Signed-off-by: Alexander Wels <awels@redhat.com>

* Integration of Data volume using CDI populators (#2722)
  - Move cleanup out of DV deletion. It seemed off to call cleanup in the prepare function just because we don't call cleanup unless the DV is deleting. Instead we check in the cleanup function itself whether it should be done, in these two specific cases: on deletion and when the DV succeeded. The cleanup will also be used in a future commit for population cleanup, which we want to happen not only on deletion.
  - Use populator if a CSI storage class exists. Add new DataVolume phase PendingPopulation to indicate WFFC when using populators; this new phase will be used in KubeVirt to know that no dummy pod is needed to pass the WFFC phase and that population will occur once the VM is created.
  - Update population target PVC with PVC prime annotations. The annotations will be used to update a DV that uses the populators.
  - Adjust UT with new behavior
  - Updates after review
  - Fix import populator progress report; the import pod should be taken from pvcPrime
  - Prevent requeue of upload DV when failing to find the progress report pod
  - Remove size inflation in populators. The populators handle existing PVCs, and the PVC already has a defined requested size; inflating the PVC' with fs overhead would only apply to the PVC' spec and would not reflect on the target PVC, which seems undesired. Instead, if the populator is used by a PVC that the DataVolume controller created, the inflation happens there if needed.
  - Adjust functional tests to handle DVs using populators
  - Fix clone test
  - Add shouldUpdateProgress variable to know whether progress needs updating
  - Change update of annotations from a denied list to an allowed list. Instead of checking whether an annotation on pvcPrime is undesired, go over the desired list and add the annotation if it exists.
  - Fix removing annotations from PV when rebinding
  - More fixes and UT
  Signed-off-by: Shelly Kagan <skagan@redhat.com>

* Run bazelisk run //robots/cmd/uploader:uploader -- -workspace /home/prow/go/src/github.com/kubevirt/project-infra/../containerized-data-importer/WORKSPACE -dry-run=false (#2751)
  Signed-off-by: kubevirt-bot <kubevirtbot@redhat.com>

* Allow dynamically linked build for non-bazel build (#2753)
  The current script always passes the static ldflag to the compiler, which results in a static binary. We would like to be able to build dynamically linked binaries instead. cdi-containerimage-server has to be static because we copy it into the context of a container disk container, which is most likely based on a scratch container and has no libraries for us to use.
  Signed-off-by: Alexander Wels <awels@redhat.com>

* Disable DV GC by default (#2754)
  DataVolume garbage collection is a nice feature, but unfortunately it violates a fundamental principle of Kubernetes: a CR should not be auto-deleted when it completes its role (a Job with TTLSecondsAfterFinished is an exception), and once a CR is created we can assume it is there until explicitly deleted. In addition, a CR should keep idempotency, so the same CR manifest can be applied multiple times, as long as it is a valid update (e.g. the DataVolume validation webhook does not allow updating the spec). When GC is enabled, some systems (e.g. GitOps / ArgoCD) may require a workaround (DV annotation deleteAfterCompletion = "false") to prevent GC and function correctly.
  On the next kubevirt-bot "Bump kubevirtci" PR (with bump-cdi), it will fail on all kubevirtci lanes with tests referring to DVs, as the test helper IsDataVolumeGC() looks at CDIConfig Spec.DataVolumeTTLSeconds and assumes GC is enabled by default. This should be fixed there.
  - Fix test waiting for PVC deletion with UID
  - Fix clone test assuming DV was GCed
  - Fix DIC controller DV/PVC deletion when snapshot is ready
  Signed-off-by: Arnon Gilboa <agilboa@redhat.com>

Co-authored-by: Ido Aharon <iaharon@redhat.com>
Co-authored-by: Michael Henriksen <mhenriks@redhat.com>
Co-authored-by: alromeros <alromero@redhat.com>
Co-authored-by: akalenyu <akalenyu@redhat.com>
Co-authored-by: Shelly Kagan <skagan@redhat.com>
Co-authored-by: kubevirt-bot <kubevirtbot@redhat.com>
Co-authored-by: Arnon Gilboa <agilboa@redhat.com>
1352 lines
46 KiB
Go
/*
Copyright 2021 The CDI Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
	"context"
	"fmt"
	"net/url"
	"reflect"
	"sort"
	"strings"
	"time"

	"github.com/containers/image/v5/docker/reference"
	"github.com/go-logr/logr"
	"github.com/gorhill/cronexpr"
	imagev1 "github.com/openshift/api/image/v1"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"

	snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	storagev1 "k8s.io/api/storage/v1"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/validation"
	"k8s.io/client-go/tools/record"
	"k8s.io/utils/pointer"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/manager"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"sigs.k8s.io/controller-runtime/pkg/source"

	cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
	"kubevirt.io/containerized-data-importer/pkg/common"
	cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
	cdv "kubevirt.io/containerized-data-importer/pkg/controller/datavolume"
	"kubevirt.io/containerized-data-importer/pkg/monitoring"
	"kubevirt.io/containerized-data-importer/pkg/operator"
	"kubevirt.io/containerized-data-importer/pkg/util"
	"kubevirt.io/containerized-data-importer/pkg/util/naming"
)

const (
	// ErrDataSourceAlreadyManaged provides a const to indicate DataSource already managed error
	ErrDataSourceAlreadyManaged = "ErrDataSourceAlreadyManaged"
	// MessageDataSourceAlreadyManaged provides a const to form DataSource already managed error message
	MessageDataSourceAlreadyManaged = "DataSource %s is already managed by DataImportCron %s"

	prometheusNsLabel       = "ns"
	prometheusCronNameLabel = "cron_name"
)

var (
	// DataImportCronOutdatedGauge is the metric we use to alert about DataImportCrons failing
	DataImportCronOutdatedGauge = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: monitoring.MetricOptsList[monitoring.DataImportCronOutdated].Name,
			Help: monitoring.MetricOptsList[monitoring.DataImportCronOutdated].Help,
		},
		[]string{prometheusNsLabel, prometheusCronNameLabel},
	)
)

// DataImportCronReconciler members
type DataImportCronReconciler struct {
	client          client.Client
	uncachedClient  client.Client
	recorder        record.EventRecorder
	scheme          *runtime.Scheme
	log             logr.Logger
	image           string
	pullPolicy      string
	cdiNamespace    string
	installerLabels map[string]string
}

const (
	// AnnSourceDesiredDigest is the digest of the pending updated image
	AnnSourceDesiredDigest = cc.AnnAPIGroup + "/storage.import.sourceDesiredDigest"
	// AnnImageStreamDockerRef is the ImageStream Docker reference
	AnnImageStreamDockerRef = cc.AnnAPIGroup + "/storage.import.imageStreamDockerRef"
	// AnnNextCronTime is the next time stamp which satisfies the cron expression
	AnnNextCronTime = cc.AnnAPIGroup + "/storage.import.nextCronTime"
	// AnnLastCronTime is the cron last execution time stamp
	AnnLastCronTime = cc.AnnAPIGroup + "/storage.import.lastCronTime"
	// AnnLastUseTime is the PVC last use time stamp
	AnnLastUseTime = cc.AnnAPIGroup + "/storage.import.lastUseTime"
	// AnnStorageClass is the cron DV's storage class
	AnnStorageClass = cc.AnnAPIGroup + "/storage.import.storageClass"

	dataImportControllerName    = "dataimportcron-controller"
	digestPrefix                = "sha256:"
	digestDvNameSuffixLength    = 12
	cronJobUIDSuffixLength      = 8
	defaultImportsToKeepPerCron = 3
)

// Reconcile loop for DataImportCronReconciler
func (r *DataImportCronReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	dataImportCron := &cdiv1.DataImportCron{}
	if err := r.client.Get(ctx, req.NamespacedName, dataImportCron); cc.IgnoreNotFound(err) != nil {
		return reconcile.Result{}, err
	} else if err != nil || dataImportCron.DeletionTimestamp != nil {
		err := r.cleanup(ctx, req.NamespacedName)
		return reconcile.Result{}, err
	}
	shouldReconcile, err := r.shouldReconcileCron(ctx, dataImportCron)
	if !shouldReconcile || err != nil {
		return reconcile.Result{}, err
	}

	if err := r.initCron(ctx, dataImportCron); err != nil {
		return reconcile.Result{}, err
	}

	return r.update(ctx, dataImportCron)
}

func (r *DataImportCronReconciler) shouldReconcileCron(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
	dataSource := &cdiv1.DataSource{}
	if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: cron.Spec.ManagedDataSource}, dataSource); err != nil {
		if k8serrors.IsNotFound(err) {
			return true, nil
		}
		return false, err
	}
	dataSourceCronLabel := dataSource.Labels[common.DataImportCronLabel]
	if dataSourceCronLabel == cron.Name || dataSourceCronLabel == "" {
		return true, nil
	}
	otherCron := &cdiv1.DataImportCron{}
	if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dataSourceCronLabel}, otherCron); err != nil {
		if k8serrors.IsNotFound(err) {
			return true, nil
		}
		return false, err
	}
	if otherCron.Spec.ManagedDataSource == dataSource.Name {
		msg := fmt.Sprintf(MessageDataSourceAlreadyManaged, dataSource.Name, otherCron.Name)
		r.recorder.Event(cron, corev1.EventTypeWarning, ErrDataSourceAlreadyManaged, msg)
		r.log.V(3).Info(msg)
		return false, nil
	}
	return true, nil
}

func (r *DataImportCronReconciler) initCron(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
	if dataImportCron.Spec.Schedule == "" {
		return nil
	}
	if isImageStreamSource(dataImportCron) {
		if dataImportCron.Annotations[AnnNextCronTime] == "" {
			cc.AddAnnotation(dataImportCron, AnnNextCronTime, time.Now().Format(time.RFC3339))
		}
		return nil
	}
	if !isURLSource(dataImportCron) {
		return nil
	}
	exists, err := r.cronJobExistsAndUpdated(ctx, dataImportCron)
	if exists || err != nil {
		return err
	}
	cronJob, err := r.newCronJob(dataImportCron)
	if err != nil {
		return err
	}
	if err := r.client.Create(ctx, cronJob); err != nil {
		return err
	}
	job, err := r.newInitialJob(dataImportCron, cronJob)
	if err != nil {
		return err
	}
	if err := r.client.Create(ctx, job); err != nil {
		return err
	}
	return nil
}

func (r *DataImportCronReconciler) getImageStream(ctx context.Context, imageStreamName, imageStreamNamespace string) (*imagev1.ImageStream, string, error) {
	if imageStreamName == "" || imageStreamNamespace == "" {
		return nil, "", errors.Errorf("Missing ImageStream name or namespace")
	}
	imageStream := &imagev1.ImageStream{}
	name, tag, err := splitImageStreamName(imageStreamName)
	if err != nil {
		return nil, "", err
	}
	imageStreamNamespacedName := types.NamespacedName{
		Namespace: imageStreamNamespace,
		Name:      name,
	}
	if err := r.client.Get(ctx, imageStreamNamespacedName, imageStream); err != nil {
		return nil, "", err
	}
	return imageStream, tag, nil
}

func getImageStreamDigest(imageStream *imagev1.ImageStream, imageStreamTag string) (string, string, error) {
	if imageStream == nil {
		return "", "", errors.Errorf("No ImageStream")
	}
	tags := imageStream.Status.Tags
	if len(tags) == 0 {
		return "", "", errors.Errorf("ImageStream %s has no tags", imageStream.Name)
	}

	tagIdx := 0
	if len(imageStreamTag) > 0 {
		tagIdx = -1
		for i, tag := range tags {
			if tag.Tag == imageStreamTag {
				tagIdx = i
				break
			}
		}
	}
	if tagIdx == -1 {
		return "", "", errors.Errorf("ImageStream %s has no tag %s", imageStream.Name, imageStreamTag)
	}

	if len(tags[tagIdx].Items) == 0 {
		return "", "", errors.Errorf("ImageStream %s tag %s has no items", imageStream.Name, imageStreamTag)
	}
	return tags[tagIdx].Items[0].Image, tags[tagIdx].Items[0].DockerImageReference, nil
}

func splitImageStreamName(imageStreamName string) (string, string, error) {
	if subs := strings.Split(imageStreamName, ":"); len(subs) == 1 {
		return imageStreamName, "", nil
	} else if len(subs) == 2 && len(subs[0]) > 0 && len(subs[1]) > 0 {
		return subs[0], subs[1], nil
	}
	return "", "", errors.Errorf("Illegal ImageStream name %s", imageStreamName)
}

func (r *DataImportCronReconciler) pollImageStreamDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
	if nextTimeStr := dataImportCron.Annotations[AnnNextCronTime]; nextTimeStr != "" {
		nextTime, err := time.Parse(time.RFC3339, nextTimeStr)
		if err != nil {
			return reconcile.Result{}, err
		}
		if nextTime.Before(time.Now()) {
			if err := r.updateImageStreamDesiredDigest(ctx, dataImportCron); err != nil {
				return reconcile.Result{}, err
			}
		}
	}
	return r.setNextCronTime(dataImportCron)
}

func (r *DataImportCronReconciler) setNextCronTime(dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
	now := time.Now()
	expr, err := cronexpr.Parse(dataImportCron.Spec.Schedule)
	if err != nil {
		return reconcile.Result{}, err
	}
	nextTime := expr.Next(now)
	diffSec := time.Duration(nextTime.Sub(now).Seconds()) + 1
	res := reconcile.Result{Requeue: true, RequeueAfter: diffSec * time.Second}
	cc.AddAnnotation(dataImportCron, AnnNextCronTime, nextTime.Format(time.RFC3339))
	return res, err
}

func isImageStreamSource(dataImportCron *cdiv1.DataImportCron) bool {
	regSource, err := getCronRegistrySource(dataImportCron)
	return err == nil && regSource.ImageStream != nil
}

func isURLSource(dataImportCron *cdiv1.DataImportCron) bool {
	regSource, err := getCronRegistrySource(dataImportCron)
	return err == nil && regSource.URL != nil
}

func getCronRegistrySource(cron *cdiv1.DataImportCron) (*cdiv1.DataVolumeSourceRegistry, error) {
	source := cron.Spec.Template.Spec.Source
	if source == nil || source.Registry == nil {
		return nil, errors.Errorf("Cron with no registry source %s", cron.Name)
	}
	return source.Registry, nil
}

func (r *DataImportCronReconciler) update(ctx context.Context, dataImportCron *cdiv1.DataImportCron) (reconcile.Result, error) {
	res := reconcile.Result{}

	dv, pvc, err := r.getImportState(ctx, dataImportCron)
	if err != nil {
		return res, err
	}

	dataImportCronCopy := dataImportCron.DeepCopy()
	imports := dataImportCron.Status.CurrentImports
	importSucceeded := false

	dataVolume := dataImportCron.Spec.Template
	explicitScName := getStorageClassFromTemplate(&dataVolume)
	dvStorageClass, err := cc.GetStorageClassByName(ctx, r.client, explicitScName)
	if err != nil {
		return res, err
	}
	if dvStorageClass != nil {
		cc.AddAnnotation(dataImportCron, AnnStorageClass, dvStorageClass.Name)
	}
	format, err := r.getSourceFormat(ctx, dataImportCron, dvStorageClass)
	if err != nil {
		return res, err
	}
	snapshot, err := r.getSnapshot(ctx, dataImportCron, format)
	if err != nil {
		return res, err
	}

	handlePopulatedPvc := func() error {
		if err := r.updateSource(ctx, dataImportCron, pvc); err != nil {
			return err
		}
		importSucceeded = true
		if err := r.handleCronFormat(ctx, dataImportCron, format, dvStorageClass); err != nil {
			return err
		}

		return nil
	}

	if dv != nil {
		switch dv.Status.Phase {
		case cdiv1.Succeeded:
			if err := handlePopulatedPvc(); err != nil {
				return res, err
			}
		case cdiv1.ImportScheduled:
			updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "Import is scheduled", scheduled)
		case cdiv1.ImportInProgress:
			updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionTrue, "Import is progressing", inProgress)
		default:
			dvPhase := string(dv.Status.Phase)
			updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, fmt.Sprintf("Import DataVolume phase %s", dvPhase), dvPhase)
		}
	} else if pvc != nil {
		// TODO: with plain populator PVCs (no DataVolumes) we may need to wait for corev1.Bound
		if err := handlePopulatedPvc(); err != nil {
			return res, err
		}
	} else if snapshot != nil {
		if err := r.updateSource(ctx, dataImportCron, snapshot); err != nil {
			return res, err
		}
		importSucceeded = true
	} else {
		if len(imports) > 0 {
			dataImportCron.Status.CurrentImports = imports[1:]
		}
		updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
	}

	if importSucceeded {
		if err := updateDataImportCronOnSuccess(dataImportCron); err != nil {
			return res, err
		}
		updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronProgressing, corev1.ConditionFalse, "No current import", noImport)
		if err := r.garbageCollectOldImports(ctx, dataImportCron); err != nil {
			return res, err
		}
	}

	if err := r.updateDataSource(ctx, dataImportCron, format); err != nil {
		return res, err
	}

	// We use the poller returned reconcile.Result for RequeueAfter if needed;
	// skip if we disabled schedule
	if isImageStreamSource(dataImportCron) && dataImportCron.Spec.Schedule != "" {
		res, err = r.pollImageStreamDigest(ctx, dataImportCron)
		if err != nil {
			return res, err
		}
	}

	desiredDigest := dataImportCron.Annotations[AnnSourceDesiredDigest]
	digestUpdated := desiredDigest != "" && (len(imports) == 0 || desiredDigest != imports[0].Digest)
	if digestUpdated {
		updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Source digest updated since last import", outdated)
		if dv != nil {
			if err := r.deleteErroneousDataVolume(ctx, dataImportCron, dv); err != nil {
				return res, err
			}
		}
		if importSucceeded || len(imports) == 0 {
			if err := r.createImportDataVolume(ctx, dataImportCron); err != nil {
				return res, err
			}
		}
	} else if importSucceeded {
		if err := r.updateDataImportCronSuccessCondition(ctx, dataImportCron, format, snapshot); err != nil {
			return res, err
		}
	} else if len(imports) > 0 {
		updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Import is progressing", inProgress)
	} else {
		updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "No source digest", noDigest)
	}

	if err := updateLastExecutionTimestamp(dataImportCron); err != nil {
		return res, err
	}

	if !reflect.DeepEqual(dataImportCron, dataImportCronCopy) {
		if err := r.client.Update(ctx, dataImportCron); err != nil {
			return res, err
		}
	}
	return res, nil
}

// Returns the current import DV if exists, and the last imported PVC
func (r *DataImportCronReconciler) getImportState(ctx context.Context, cron *cdiv1.DataImportCron) (*cdiv1.DataVolume, *corev1.PersistentVolumeClaim, error) {
	imports := cron.Status.CurrentImports
	if len(imports) == 0 {
		return nil, nil, nil
	}

	dvName := imports[0].DataVolumeName
	dv := &cdiv1.DataVolume{}
	if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, dv); err != nil {
		if !k8serrors.IsNotFound(err) {
			return nil, nil, err
		}
		dv = nil
	}

	pvc := &corev1.PersistentVolumeClaim{}
	if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: dvName}, pvc); err != nil {
		if !k8serrors.IsNotFound(err) {
			return nil, nil, err
		}
		pvc = nil
	}
	return dv, pvc, nil
}

// Returns the current import VolumeSnapshot, if the cron source format is snapshot
func (r *DataImportCronReconciler) getSnapshot(ctx context.Context, cron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) (*snapshotv1.VolumeSnapshot, error) {
	if format != cdiv1.DataImportCronSourceFormatSnapshot {
		return nil, nil
	}

	imports := cron.Status.CurrentImports
	if len(imports) == 0 {
		return nil, nil
	}

	snapName := imports[0].DataVolumeName
	snapshot := &snapshotv1.VolumeSnapshot{}
	if err := r.client.Get(ctx, types.NamespacedName{Namespace: cron.Namespace, Name: snapName}, snapshot); err != nil {
		if !k8serrors.IsNotFound(err) {
			return nil, err
		}
		return nil, nil
	}

	return snapshot, nil
}

func (r *DataImportCronReconciler) updateSource(ctx context.Context, cron *cdiv1.DataImportCron, obj client.Object) error {
	objCopy := obj.DeepCopyObject()
	cc.AddAnnotation(obj, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
	r.setDataImportCronResourceLabels(cron, obj)
	if !reflect.DeepEqual(obj, objCopy) {
		if err := r.client.Update(ctx, obj); err != nil {
			return err
		}
	}
	return nil
}

func (r *DataImportCronReconciler) deleteErroneousDataVolume(ctx context.Context, cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume) error {
	log := r.log.WithValues("name", dv.Name).WithValues("uid", dv.UID)
	if cond := cdv.FindConditionByType(cdiv1.DataVolumeRunning, dv.Status.Conditions); cond != nil {
		if cond.Status == corev1.ConditionFalse && cond.Reason == common.GenericError {
			log.Info("Delete DataVolume and reset DesiredDigest due to error", "message", cond.Message)
			// Unlabel the DV before deleting it, to eliminate reconcile before DIC is updated
			dv.Labels[common.DataImportCronLabel] = ""
			if err := r.client.Update(ctx, dv); cc.IgnoreNotFound(err) != nil {
				return err
			}
			if err := r.client.Delete(ctx, dv); cc.IgnoreNotFound(err) != nil {
				return err
			}
			cron.Status.CurrentImports = nil
		}
	}
	return nil
}

func (r *DataImportCronReconciler) updateImageStreamDesiredDigest(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
	log := r.log.WithValues("name", dataImportCron.Name).WithValues("uid", dataImportCron.UID)
	regSource, err := getCronRegistrySource(dataImportCron)
	if err != nil {
		return err
	}
	if regSource.ImageStream == nil {
		return nil
	}
	imageStream, imageStreamTag, err := r.getImageStream(ctx, *regSource.ImageStream, dataImportCron.Namespace)
	if err != nil {
		return err
	}
	digest, dockerRef, err := getImageStreamDigest(imageStream, imageStreamTag)
	if err != nil {
		return err
	}
	cc.AddAnnotation(dataImportCron, AnnLastCronTime, time.Now().Format(time.RFC3339))
	if digest != "" && dataImportCron.Annotations[AnnSourceDesiredDigest] != digest {
		log.Info("Updating DataImportCron", "digest", digest)
		cc.AddAnnotation(dataImportCron, AnnSourceDesiredDigest, digest)
		cc.AddAnnotation(dataImportCron, AnnImageStreamDockerRef, dockerRef)
	}
	return nil
}

func (r *DataImportCronReconciler) updateDataSource(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat) error {
	log := r.log.WithName("updateDataSource")
	dataSourceName := dataImportCron.Spec.ManagedDataSource
	dataSource := &cdiv1.DataSource{}
	if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dataSourceName}, dataSource); err != nil {
		if k8serrors.IsNotFound(err) {
			dataSource = r.newDataSource(dataImportCron)
			if err := r.client.Create(ctx, dataSource); err != nil {
				return err
			}
			log.Info("DataSource created", "name", dataSourceName, "uid", dataSource.UID)
		} else {
			return err
		}
	}
	if dataSource.Labels[common.DataImportCronLabel] == "" {
		log.Info("DataSource has no DataImportCron label, so it is not updated", "name", dataSourceName, "uid", dataSource.UID)
		return nil
	}
	dataSourceCopy := dataSource.DeepCopy()
	r.setDataImportCronResourceLabels(dataImportCron, dataSource)

	passCronLabelToDataSource(dataImportCron, dataSource, cc.LabelDefaultInstancetype)
	passCronLabelToDataSource(dataImportCron, dataSource, cc.LabelDefaultInstancetypeKind)
	passCronLabelToDataSource(dataImportCron, dataSource, cc.LabelDefaultPreference)
	passCronLabelToDataSource(dataImportCron, dataSource, cc.LabelDefaultPreferenceKind)

	sourcePVC := dataImportCron.Status.LastImportedPVC
	populateDataSource(format, dataSource, sourcePVC)

	if !reflect.DeepEqual(dataSource, dataSourceCopy) {
		if err := r.client.Update(ctx, dataSource); err != nil {
			return err
		}
	}

	return nil
}

func populateDataSource(format cdiv1.DataImportCronSourceFormat, dataSource *cdiv1.DataSource, sourcePVC *cdiv1.DataVolumeSourcePVC) {
	if sourcePVC == nil {
		return
	}

	switch format {
	case cdiv1.DataImportCronSourceFormatPvc:
		dataSource.Spec.Source = cdiv1.DataSourceSource{
			PVC: sourcePVC,
		}
	case cdiv1.DataImportCronSourceFormatSnapshot:
		dataSource.Spec.Source = cdiv1.DataSourceSource{
			Snapshot: &cdiv1.DataVolumeSourceSnapshot{
				Namespace: sourcePVC.Namespace,
				Name:      sourcePVC.Name,
			},
		}
	}
}

func updateDataImportCronOnSuccess(dataImportCron *cdiv1.DataImportCron) error {
	if dataImportCron.Status.CurrentImports == nil {
		return errors.Errorf("No CurrentImports in cron %s", dataImportCron.Name)
	}
	sourcePVC := &cdiv1.DataVolumeSourcePVC{
		Namespace: dataImportCron.Namespace,
		Name:      dataImportCron.Status.CurrentImports[0].DataVolumeName,
	}
	if dataImportCron.Status.LastImportedPVC == nil || *dataImportCron.Status.LastImportedPVC != *sourcePVC {
		dataImportCron.Status.LastImportedPVC = sourcePVC
		now := metav1.Now()
		dataImportCron.Status.LastImportTimestamp = &now
	}
	return nil
}

func updateLastExecutionTimestamp(cron *cdiv1.DataImportCron) error {
	lastTimeStr := cron.Annotations[AnnLastCronTime]
	if lastTimeStr == "" {
		return nil
	}
	lastTime, err := time.Parse(time.RFC3339, lastTimeStr)
	if err != nil {
		return err
	}
	if ts := cron.Status.LastExecutionTimestamp; ts == nil || ts.Time != lastTime {
		cron.Status.LastExecutionTimestamp = &metav1.Time{Time: lastTime}
	}
	return nil
}

func (r *DataImportCronReconciler) createImportDataVolume(ctx context.Context, dataImportCron *cdiv1.DataImportCron) error {
	dataSourceName := dataImportCron.Spec.ManagedDataSource
	digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
	if digest == "" {
		return nil
	}
	dvName, err := createDvName(dataSourceName, digest)
	if err != nil {
		return err
	}
	dataImportCron.Status.CurrentImports = []cdiv1.ImportStatus{{DataVolumeName: dvName, Digest: digest}}

	sources := []client.Object{&snapshotv1.VolumeSnapshot{}, &corev1.PersistentVolumeClaim{}}
	for _, src := range sources {
		if err := r.client.Get(ctx, types.NamespacedName{Namespace: dataImportCron.Namespace, Name: dvName}, src); err != nil {
			if !k8serrors.IsNotFound(err) && !meta.IsNoMatchError(err) {
				return err
			}
		} else {
			if err := r.updateSource(ctx, dataImportCron, src); err != nil {
				return err
			}
			// If the source already exists, don't create a DV
			return nil
		}
	}

	dv := r.newSourceDataVolume(dataImportCron, dvName)
	if err := r.client.Create(ctx, dv); err != nil && !k8serrors.IsAlreadyExists(err) {
		return err
	}

	return nil
}

func (r *DataImportCronReconciler) handleCronFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, dvStorageClass *storagev1.StorageClass) error {
	switch format {
	case cdiv1.DataImportCronSourceFormatPvc:
		return nil
	case cdiv1.DataImportCronSourceFormatSnapshot:
		return r.handleSnapshot(ctx, dataImportCron, &dataImportCron.Spec.Template, dvStorageClass)
	default:
		return fmt.Errorf("unknown source format %q", format)
	}
}

func (r *DataImportCronReconciler) handleSnapshot(ctx context.Context, dataImportCron *cdiv1.DataImportCron, dataVolume *cdiv1.DataVolume, dvStorageClass *storagev1.StorageClass) error {
	dataSourceName := dataImportCron.Spec.ManagedDataSource
	digest := dataImportCron.Annotations[AnnSourceDesiredDigest]
	if digest == "" {
		return nil
	}
	dvName, err := createDvName(dataSourceName, digest)
	if err != nil {
		return err
	}

	className, err := cc.GetSnapshotClassForSmartClone(dataVolume.Name, &dvStorageClass.Name, r.log, r.client)
	if err != nil {
		return err
	}
	labels := map[string]string{
		common.CDILabelKey:       common.CDILabelValue,
		common.CDIComponentLabel: "",
	}
	desiredSnapshot := &snapshotv1.VolumeSnapshot{
		ObjectMeta: metav1.ObjectMeta{
			Name:      dvName,
			Namespace: dataImportCron.Namespace,
			Labels:    labels,
		},
		Spec: snapshotv1.VolumeSnapshotSpec{
			Source: snapshotv1.VolumeSnapshotSource{
				PersistentVolumeClaimName: &dvName,
			},
			VolumeSnapshotClassName: &className,
		},
	}
	r.setDataImportCronResourceLabels(dataImportCron, desiredSnapshot)

	currentSnapshot := &snapshotv1.VolumeSnapshot{}
	if err := r.client.Get(ctx, client.ObjectKeyFromObject(desiredSnapshot), currentSnapshot); err != nil {
		if !k8serrors.IsNotFound(err) {
			return err
		}
		cc.AddAnnotation(desiredSnapshot, AnnLastUseTime, time.Now().UTC().Format(time.RFC3339Nano))
		if err := r.client.Create(ctx, desiredSnapshot); err != nil {
			return err
		}
	} else if cc.IsSnapshotReady(currentSnapshot) {
		// Clean up the DV/PVC as they are no longer needed
		r.log.Info("Deleting dv/pvc as snapshot is ready", "name", desiredSnapshot.Name)
		if err := r.deleteDvPvc(ctx, desiredSnapshot.Name, desiredSnapshot.Namespace); err != nil {
			return err
		}
	}

	return nil
}

func (r *DataImportCronReconciler) updateDataImportCronSuccessCondition(ctx context.Context, dataImportCron *cdiv1.DataImportCron, format cdiv1.DataImportCronSourceFormat, snapshot *snapshotv1.VolumeSnapshot) error {
	switch format {
	case cdiv1.DataImportCronSourceFormatPvc:
		updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
	case cdiv1.DataImportCronSourceFormatSnapshot:
		if snapshot == nil {
			// Snapshot create/update will trigger reconcile
			return nil
		}
		if cc.IsSnapshotReady(snapshot) {
			updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionTrue, "Latest import is up to date", upToDate)
		} else {
			updateDataImportCronCondition(dataImportCron, cdiv1.DataImportCronUpToDate, corev1.ConditionFalse, "Snapshot of imported data is progressing", inProgress)
		}
	default:
		return fmt.Errorf("unknown source format %q", format)
	}

	return nil
}

func (r *DataImportCronReconciler) getSourceFormat(ctx context.Context, dataImportCron *cdiv1.DataImportCron, dvStorageClass *storagev1.StorageClass) (cdiv1.DataImportCronSourceFormat, error) {
	format := cdiv1.DataImportCronSourceFormatPvc
	if dvStorageClass == nil {
		return format, nil
	}

	storageProfile := &cdiv1.StorageProfile{}
	if err := r.client.Get(ctx, types.NamespacedName{Name: dvStorageClass.Name}, storageProfile); err != nil {
		return format, err
	}
	if storageProfile.Status.DataImportCronSourceFormat != nil {
		format = *storageProfile.Status.DataImportCronSourceFormat
	}

	return format, nil
}

func getStorageClassFromTemplate(dataVolume *cdiv1.DataVolume) *string {
	if dataVolume.Spec.PVC != nil {
		return dataVolume.Spec.PVC.StorageClassName
	}

	if dataVolume.Spec.Storage != nil {
		return dataVolume.Spec.Storage.StorageClassName
	}

	return nil
}

func (r *DataImportCronReconciler) garbageCollectOldImports(ctx context.Context, cron *cdiv1.DataImportCron) error {
	if cron.Spec.GarbageCollect != nil && *cron.Spec.GarbageCollect != cdiv1.DataImportCronGarbageCollectOutdated {
		return nil
	}
	selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name})
	if err != nil {
		return err
	}

	maxImports := defaultImportsToKeepPerCron
	if cron.Spec.ImportsToKeep != nil && *cron.Spec.ImportsToKeep >= 0 {
		maxImports = int(*cron.Spec.ImportsToKeep)
	}

	if err := r.garbageCollectPVCs(ctx, cron.Namespace, selector, maxImports); err != nil {
		return err
	}
	if err := r.garbageCollectSnapshots(ctx, cron.Namespace, selector, maxImports); err != nil {
		return err
	}

	return nil
}

func (r *DataImportCronReconciler) garbageCollectPVCs(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
	pvcList := &corev1.PersistentVolumeClaimList{}
	if err := r.client.List(ctx, pvcList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
		return err
	}
	if len(pvcList.Items) > maxImports {
		// Sort by last use time, newest first, then delete the oldest imports
		sort.Slice(pvcList.Items, func(i, j int) bool {
			return pvcList.Items[i].Annotations[AnnLastUseTime] > pvcList.Items[j].Annotations[AnnLastUseTime]
		})
		for _, pvc := range pvcList.Items[maxImports:] {
			r.log.Info("Deleting dv/pvc", "name", pvc.Name, "pvc.uid", pvc.UID)
			if err := r.deleteDvPvc(ctx, pvc.Name, pvc.Namespace); err != nil {
				return err
			}
		}
	}

	return nil
}

// deleteDvPvc deletes the DataVolume, or the PVC directly if the DV was
// already garbage collected
func (r *DataImportCronReconciler) deleteDvPvc(ctx context.Context, name, namespace string) error {
	om := metav1.ObjectMeta{Name: name, Namespace: namespace}
	dv := &cdiv1.DataVolume{ObjectMeta: om}
	if err := r.client.Delete(ctx, dv); err == nil || !k8serrors.IsNotFound(err) {
		// Either the DV was deleted (err == nil) or deletion failed with an
		// unexpected error; in both cases there is nothing more to do
		return err
	}
	// The DV was not found (e.g. garbage collected), so delete the PVC itself
	pvc := &corev1.PersistentVolumeClaim{ObjectMeta: om}
	if err := r.client.Delete(ctx, pvc); err != nil && !k8serrors.IsNotFound(err) {
		return err
	}
	return nil
}

func (r *DataImportCronReconciler) garbageCollectSnapshots(ctx context.Context, namespace string, selector labels.Selector, maxImports int) error {
	snapList := &snapshotv1.VolumeSnapshotList{}
	if err := r.client.List(ctx, snapList, &client.ListOptions{Namespace: namespace, LabelSelector: selector}); err != nil {
		if meta.IsNoMatchError(err) {
			return nil
		}
		return err
	}
	if len(snapList.Items) > maxImports {
		sort.Slice(snapList.Items, func(i, j int) bool {
			return snapList.Items[i].Annotations[AnnLastUseTime] > snapList.Items[j].Annotations[AnnLastUseTime]
		})
		for _, snap := range snapList.Items[maxImports:] {
			r.log.Info("Deleting snapshot", "name", snap.Name, "uid", snap.UID)
			if err := r.client.Delete(ctx, &snap); err != nil && !k8serrors.IsNotFound(err) {
				return err
			}
		}
	}

	return nil
}

func (r *DataImportCronReconciler) cleanup(ctx context.Context, cron types.NamespacedName) error {
	// Don't keep alerting over a cron that's being deleted; the reconcile loop
	// will set the gauge back to 1 if needed
	DataImportCronOutdatedGauge.With(getPrometheusCronLabels(cron)).Set(0)
	if err := r.deleteJobs(ctx, cron); err != nil {
		return err
	}
	selector, err := getSelector(map[string]string{common.DataImportCronLabel: cron.Name, common.DataImportCronCleanupLabel: "true"})
	if err != nil {
		return err
	}
	opts := &client.DeleteAllOfOptions{ListOptions: client.ListOptions{Namespace: cron.Namespace, LabelSelector: selector}}
	if err := r.client.DeleteAllOf(ctx, &cdiv1.DataSource{}, opts); err != nil {
		return err
	}
	if err := r.client.DeleteAllOf(ctx, &cdiv1.DataVolume{}, opts); err != nil {
		return err
	}
	if err := r.client.DeleteAllOf(ctx, &corev1.PersistentVolumeClaim{}, opts); err != nil {
		return err
	}
	if err := r.client.DeleteAllOf(ctx, &snapshotv1.VolumeSnapshot{}, opts); cc.IgnoreIsNoMatchError(err) != nil {
		return err
	}
	return nil
}

func (r *DataImportCronReconciler) deleteJobs(ctx context.Context, cron types.NamespacedName) error {
	deletePropagationBackground := metav1.DeletePropagationBackground
	deleteOpts := &client.DeleteOptions{PropagationPolicy: &deletePropagationBackground}
	selector, err := getSelector(map[string]string{common.DataImportCronLabel: getCronJobLabelValue(cron.Namespace, cron.Name)})
	if err != nil {
		return err
	}
	cronJobList := &batchv1.CronJobList{}
	if err := r.client.List(ctx, cronJobList, &client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}); err != nil {
		return err
	}
	for _, cronJob := range cronJobList.Items {
		if err := r.client.Delete(ctx, &cronJob, deleteOpts); cc.IgnoreNotFound(err) != nil {
			return err
		}
	}
	jobList := &batchv1.JobList{}
	if err := r.client.List(ctx, jobList, &client.ListOptions{Namespace: r.cdiNamespace, LabelSelector: selector}); err != nil {
		return err
	}
	for _, job := range jobList.Items {
		if err := r.client.Delete(ctx, &job, deleteOpts); cc.IgnoreNotFound(err) != nil {
			return err
		}
	}
	return nil
}

// NewDataImportCronController creates a new instance of the DataImportCron controller
func NewDataImportCronController(mgr manager.Manager, log logr.Logger, importerImage, pullPolicy string, installerLabels map[string]string) (controller.Controller, error) {
	uncachedClient, err := client.New(mgr.GetConfig(), client.Options{
		Scheme: mgr.GetScheme(),
		Mapper: mgr.GetRESTMapper(),
	})
	if err != nil {
		return nil, err
	}
	reconciler := &DataImportCronReconciler{
		client:          mgr.GetClient(),
		uncachedClient:  uncachedClient,
		recorder:        mgr.GetEventRecorderFor(dataImportControllerName),
		scheme:          mgr.GetScheme(),
		log:             log.WithName(dataImportControllerName),
		image:           importerImage,
		pullPolicy:      pullPolicy,
		cdiNamespace:    util.GetNamespace(),
		installerLabels: installerLabels,
	}
	dataImportCronController, err := controller.New(dataImportControllerName, mgr, controller.Options{Reconciler: reconciler})
	if err != nil {
		return nil, err
	}
	if err := addDataImportCronControllerWatches(mgr, dataImportCronController, log); err != nil {
		return nil, err
	}
	log.Info("Initialized DataImportCron controller")
	return dataImportCronController, nil
}

func addDataImportCronControllerWatches(mgr manager.Manager, c controller.Controller, log logr.Logger) error {
	if err := c.Watch(&source.Kind{Type: &cdiv1.DataImportCron{}}, &handler.EnqueueRequestForObject{}); err != nil {
		return err
	}

	getCronName := func(obj client.Object) string {
		return obj.GetLabels()[common.DataImportCronLabel]
	}
	mapSourceObjectToCron := func(obj client.Object) []reconcile.Request {
		if cronName := getCronName(obj); cronName != "" {
			return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: cronName, Namespace: obj.GetNamespace()}}}
		}
		return nil
	}

	mapStorageProfileToCron := func(obj client.Object) (reqs []reconcile.Request) {
		// TODO: Get rid of this after at least one version; use an indexer on the
		// storage class annotation instead, otherwise we risk losing the storage
		// profile event
		var crons cdiv1.DataImportCronList
		if err := mgr.GetClient().List(context.TODO(), &crons); err != nil {
			c.GetLogger().Error(err, "Unable to list DataImportCrons")
			return
		}
		// Storage profiles are 1:1 with storage classes
		scName := obj.GetName()
		for _, cron := range crons.Items {
			dataVolume := cron.Spec.Template
			explicitScName := getStorageClassFromTemplate(&dataVolume)
			templateSc, err := cc.GetStorageClassByName(context.TODO(), mgr.GetClient(), explicitScName)
			if err != nil || templateSc == nil {
				c.GetLogger().Error(err, "Unable to get storage class", "templateSc", templateSc)
				return
			}
			if templateSc.Name == scName {
				reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: cron.Namespace, Name: cron.Name}})
			}
		}
		return
	}

	if err := c.Watch(&source.Kind{Type: &cdiv1.DataVolume{}},
		handler.EnqueueRequestsFromMapFunc(mapSourceObjectToCron),
		predicate.Funcs{
			CreateFunc: func(event.CreateEvent) bool { return false },
			UpdateFunc: func(e event.UpdateEvent) bool { return getCronName(e.ObjectNew) != "" },
			DeleteFunc: func(e event.DeleteEvent) bool { return getCronName(e.Object) != "" },
		},
	); err != nil {
		return err
	}

	if err := c.Watch(&source.Kind{Type: &cdiv1.DataSource{}},
		handler.EnqueueRequestsFromMapFunc(mapSourceObjectToCron),
		predicate.Funcs{
			CreateFunc: func(event.CreateEvent) bool { return false },
			UpdateFunc: func(e event.UpdateEvent) bool { return getCronName(e.ObjectNew) != "" },
			DeleteFunc: func(e event.DeleteEvent) bool { return getCronName(e.Object) != "" },
		},
	); err != nil {
		return err
	}

	if err := c.Watch(&source.Kind{Type: &cdiv1.StorageProfile{}},
		handler.EnqueueRequestsFromMapFunc(mapStorageProfileToCron),
		predicate.Funcs{
			CreateFunc: func(event.CreateEvent) bool { return true },
			DeleteFunc: func(event.DeleteEvent) bool { return false },
			UpdateFunc: func(e event.UpdateEvent) bool {
				profileOld, okOld := e.ObjectOld.(*cdiv1.StorageProfile)
				profileNew, okNew := e.ObjectNew.(*cdiv1.StorageProfile)
				return okOld && okNew && profileOld.Status.DataImportCronSourceFormat != profileNew.Status.DataImportCronSourceFormat
			},
		},
	); err != nil {
		return err
	}

	return nil
}

func (r *DataImportCronReconciler) cronJobExistsAndUpdated(ctx context.Context, cron *cdiv1.DataImportCron) (bool, error) {
	cronJob := &batchv1.CronJob{}
	cronJobKey := types.NamespacedName{Namespace: r.cdiNamespace, Name: GetCronJobName(cron)}
	if err := r.client.Get(ctx, cronJobKey, cronJob); err != nil {
		return false, cc.IgnoreNotFound(err)
	}

	cronJobCopy := cronJob.DeepCopy()
	if err := r.initCronJob(cron, cronJobCopy); err != nil {
		return false, err
	}

	if !reflect.DeepEqual(cronJob, cronJobCopy) {
		r.log.Info("Updating CronJob", "name", cronJob.GetName())
		if err := r.client.Update(ctx, cronJobCopy); err != nil {
			return false, cc.IgnoreNotFound(err)
		}
	}
	return true, nil
}

func (r *DataImportCronReconciler) newCronJob(cron *cdiv1.DataImportCron) (*batchv1.CronJob, error) {
	cronJob := &batchv1.CronJob{
		ObjectMeta: metav1.ObjectMeta{
			Name:      GetCronJobName(cron),
			Namespace: r.cdiNamespace,
		},
	}
	if err := r.initCronJob(cron, cronJob); err != nil {
		return nil, err
	}
	return cronJob, nil
}

// InitPollerPodSpec inits poller PodSpec
func InitPollerPodSpec(c client.Client, cron *cdiv1.DataImportCron, podSpec *corev1.PodSpec, image string, pullPolicy corev1.PullPolicy, log logr.Logger) error {
	regSource, err := getCronRegistrySource(cron)
	if err != nil {
		return err
	}
	if regSource.URL == nil {
		return errors.Errorf("No URL source in cron %s", cron.Name)
	}
	cdiConfig := &cdiv1.CDIConfig{}
	if err := c.Get(context.TODO(), types.NamespacedName{Name: common.ConfigName}, cdiConfig); err != nil {
		return err
	}
	insecureTLS, err := IsInsecureTLS(*regSource.URL, cdiConfig, log)
	if err != nil {
		return err
	}
	container := corev1.Container{
		Name:  "cdi-source-update-poller",
		Image: image,
		Command: []string{
			"/usr/bin/cdi-source-update-poller",
			"-ns", cron.Namespace,
			"-cron", cron.Name,
			"-url", *regSource.URL,
		},
		ImagePullPolicy:          pullPolicy,
		TerminationMessagePath:   corev1.TerminationMessagePathDefault,
		TerminationMessagePolicy: corev1.TerminationMessageReadFile,
	}

	var volumes []corev1.Volume
	hasCertConfigMap := regSource.CertConfigMap != nil && *regSource.CertConfigMap != ""
	if hasCertConfigMap {
		vm := corev1.VolumeMount{
			Name:      CertVolName,
			MountPath: common.ImporterCertDir,
		}
		container.VolumeMounts = append(container.VolumeMounts, vm)
		container.Command = append(container.Command, "-certdir", common.ImporterCertDir)
		volumes = append(volumes, createConfigMapVolume(CertVolName, *regSource.CertConfigMap))
	}

	if volName, _ := GetImportProxyConfig(cdiConfig, common.ImportProxyConfigMapName); volName != "" {
		vm := corev1.VolumeMount{
			Name:      ProxyCertVolName,
			MountPath: common.ImporterProxyCertDir,
		}
		container.VolumeMounts = append(container.VolumeMounts, vm)
		volumes = append(volumes, createConfigMapVolume(ProxyCertVolName, volName))
	}

	if regSource.SecretRef != nil && *regSource.SecretRef != "" {
		container.Env = append(container.Env,
			corev1.EnvVar{
				Name: common.ImporterAccessKeyID,
				ValueFrom: &corev1.EnvVarSource{
					SecretKeyRef: &corev1.SecretKeySelector{
						LocalObjectReference: corev1.LocalObjectReference{
							Name: *regSource.SecretRef,
						},
						Key: common.KeyAccess,
					},
				},
			},
			corev1.EnvVar{
				Name: common.ImporterSecretKey,
				ValueFrom: &corev1.EnvVarSource{
					SecretKeyRef: &corev1.SecretKeySelector{
						LocalObjectReference: corev1.LocalObjectReference{
							Name: *regSource.SecretRef,
						},
						Key: common.KeySecret,
					},
				},
			},
		)
	}

	addEnvVar := func(varName, value string) {
		container.Env = append(container.Env, corev1.EnvVar{Name: varName, Value: value})
	}

	if insecureTLS {
		addEnvVar(common.InsecureTLSVar, "true")
	}

	addEnvVarFromImportProxyConfig := func(varName string) {
		if value, err := GetImportProxyConfig(cdiConfig, varName); err == nil {
			addEnvVar(varName, value)
		}
	}

	addEnvVarFromImportProxyConfig(common.ImportProxyHTTP)
	addEnvVarFromImportProxyConfig(common.ImportProxyHTTPS)
	addEnvVarFromImportProxyConfig(common.ImportProxyNoProxy)

	imagePullSecrets, err := cc.GetImagePullSecrets(c)
	if err != nil {
		return err
	}
	workloadNodePlacement, err := cc.GetWorkloadNodePlacement(context.TODO(), c)
	if err != nil {
		return err
	}

	podSpec.RestartPolicy = corev1.RestartPolicyNever
	podSpec.TerminationGracePeriodSeconds = pointer.Int64(0)
	podSpec.Containers = []corev1.Container{container}
	podSpec.ServiceAccountName = common.CronJobServiceAccountName
	podSpec.Volumes = volumes
	podSpec.ImagePullSecrets = imagePullSecrets
	podSpec.NodeSelector = workloadNodePlacement.NodeSelector
	podSpec.Tolerations = workloadNodePlacement.Tolerations
	podSpec.Affinity = workloadNodePlacement.Affinity

	cc.SetRestrictedSecurityContext(podSpec)

	return nil
}

func (r *DataImportCronReconciler) initCronJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) error {
	cronJobSpec := &cronJob.Spec
	cronJobSpec.Schedule = cron.Spec.Schedule
	cronJobSpec.ConcurrencyPolicy = batchv1.ForbidConcurrent
	cronJobSpec.SuccessfulJobsHistoryLimit = pointer.Int32(1)
	cronJobSpec.FailedJobsHistoryLimit = pointer.Int32(1)

	jobSpec := &cronJobSpec.JobTemplate.Spec
	jobSpec.BackoffLimit = pointer.Int32(2)
	jobSpec.TTLSecondsAfterFinished = pointer.Int32(10)

	podSpec := &jobSpec.Template.Spec
	if err := InitPollerPodSpec(r.client, cron, podSpec, r.image, corev1.PullPolicy(r.pullPolicy), r.log); err != nil {
		return err
	}
	if err := r.setJobCommon(cron, cronJob); err != nil {
		return err
	}
	return nil
}

func (r *DataImportCronReconciler) newInitialJob(cron *cdiv1.DataImportCron, cronJob *batchv1.CronJob) (*batchv1.Job, error) {
	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      GetInitialJobName(cron),
			Namespace: cronJob.Namespace,
		},
		Spec: cronJob.Spec.JobTemplate.Spec,
	}
	if err := r.setJobCommon(cron, job); err != nil {
		return nil, err
	}
	return job, nil
}

func (r *DataImportCronReconciler) setJobCommon(cron *cdiv1.DataImportCron, obj metav1.Object) error {
	if err := operator.SetOwnerRuntime(r.uncachedClient, obj); err != nil {
		return err
	}
	util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
	labels := obj.GetLabels()
	labels[common.DataImportCronLabel] = getCronJobLabelValue(cron.Namespace, cron.Name)
	obj.SetLabels(labels)
	return nil
}

func (r *DataImportCronReconciler) newSourceDataVolume(cron *cdiv1.DataImportCron, dataVolumeName string) *cdiv1.DataVolume {
	var digestedURL string
	dv := cron.Spec.Template.DeepCopy()
	if isURLSource(cron) {
		digestedURL = untagDigestedDockerURL(*dv.Spec.Source.Registry.URL + "@" + cron.Annotations[AnnSourceDesiredDigest])
	} else if isImageStreamSource(cron) {
		// There is no way to import an image stream by name when we want a
		// specific digest, so we use its docker reference instead
		digestedURL = "docker://" + cron.Annotations[AnnImageStreamDockerRef]
		dv.Spec.Source.Registry.ImageStream = nil
	}
	dv.Spec.Source.Registry.URL = &digestedURL
	dv.Name = dataVolumeName
	dv.Namespace = cron.Namespace
	r.setDataImportCronResourceLabels(cron, dv)
	cc.AddAnnotation(dv, cc.AnnImmediateBinding, "true")
	passCronAnnotationToDv(cron, dv, cc.AnnPodRetainAfterCompletion)

	passCronLabelToDv(cron, dv, cc.LabelDefaultInstancetype)
	passCronLabelToDv(cron, dv, cc.LabelDefaultInstancetypeKind)
	passCronLabelToDv(cron, dv, cc.LabelDefaultPreference)
	passCronLabelToDv(cron, dv, cc.LabelDefaultPreferenceKind)

	return dv
}

func (r *DataImportCronReconciler) setDataImportCronResourceLabels(cron *cdiv1.DataImportCron, obj metav1.Object) {
	util.SetRecommendedLabels(obj, r.installerLabels, common.CDIControllerName)
	labels := obj.GetLabels()
	labels[common.DataImportCronLabel] = cron.Name
	if cron.Spec.RetentionPolicy != nil && *cron.Spec.RetentionPolicy == cdiv1.DataImportCronRetainNone {
		labels[common.DataImportCronCleanupLabel] = "true"
	}
	obj.SetLabels(labels)
}

func untagDigestedDockerURL(dockerURL string) string {
	if u, err := url.Parse(dockerURL); err == nil {
		// Use a name that doesn't shadow the url package
		ref := u.Host + u.Path
		subs := reference.ReferenceRegexp.FindStringSubmatch(ref)
		// Check for a tag alongside the digest
		if len(subs) > 2 && len(subs[2]) > 0 {
			if untaggedRef, err := reference.ParseDockerRef(ref); err == nil {
				return u.Scheme + "://" + untaggedRef.String()
			}
		}
	}
	return dockerURL
}

func passCronLabelToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, key string) {
	if val := cron.Labels[key]; val != "" {
		cc.AddLabel(dv, key, val)
	}
}

func passCronAnnotationToDv(cron *cdiv1.DataImportCron, dv *cdiv1.DataVolume, ann string) {
	if val := cron.Annotations[ann]; val != "" {
		cc.AddAnnotation(dv, ann, val)
	}
}

func passCronLabelToDataSource(cron *cdiv1.DataImportCron, ds *cdiv1.DataSource, key string) {
	if val := cron.Labels[key]; val != "" {
		cc.AddLabel(ds, key, val)
	}
}

func (r *DataImportCronReconciler) newDataSource(cron *cdiv1.DataImportCron) *cdiv1.DataSource {
	dataSource := &cdiv1.DataSource{
		ObjectMeta: metav1.ObjectMeta{
			Name:      cron.Spec.ManagedDataSource,
			Namespace: cron.Namespace,
		},
	}
	util.SetRecommendedLabels(dataSource, r.installerLabels, common.CDIControllerName)
	dataSource.Labels[common.DataImportCronLabel] = cron.Name
	return dataSource
}

// createDvName creates the DataVolume name based on the DataSource name plus a
// prefix of the sha256 digest
func createDvName(prefix, digest string) (string, error) {
	fromIdx := len(digestPrefix)
	toIdx := fromIdx + digestDvNameSuffixLength
	if !strings.HasPrefix(digest, digestPrefix) {
		return "", errors.Errorf("Digest has no supported prefix")
	}
	if len(digest) < toIdx {
		return "", errors.Errorf("Digest is too short")
	}
	return naming.GetResourceName(prefix, digest[fromIdx:toIdx]), nil
}

// GetCronJobName returns the CronJob name based on the cron name and UID
func GetCronJobName(cron *cdiv1.DataImportCron) string {
	return naming.GetResourceName(cron.Name, string(cron.UID)[:cronJobUIDSuffixLength])
}

// GetInitialJobName returns the initial job name based on the cron name and UID
func GetInitialJobName(cron *cdiv1.DataImportCron) string {
	return naming.GetResourceName("initial-job", GetCronJobName(cron))
}

func getSelector(matchLabels map[string]string) (labels.Selector, error) {
	return metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: matchLabels})
}

func getCronJobLabelValue(cronNamespace, cronName string) string {
	const maxLen = validation.DNS1035LabelMaxLength
	label := cronNamespace + "." + cronName
	if len(label) > maxLen {
		return label[:maxLen]
	}
	return label
}