/* Copyright 2018 The CDI Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package transfer import ( "context" "encoding/json" "fmt" "strings" "time" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1" ) const ( // AnnObjectTransferName holds the name of related ObjectTransfer resource AnnObjectTransferName = "cdi.kubevirt.io/objectTransferName" objectTransferFinalizer = "cdi.kubevirt.io/objectTransfer" defaultRequeue = 2 * time.Second ) type transferHandler interface { ReconcilePending(*cdiv1.ObjectTransfer) (time.Duration, error) ReconcileRunning(*cdiv1.ObjectTransfer) (time.Duration, error) } type objectTransferHandler struct { reconciler *ObjectTransferReconciler } // ObjectTransferReconciler members type ObjectTransferReconciler struct { Client client.Client Recorder record.EventRecorder Scheme *runtime.Scheme Log logr.Logger InstallerLabels map[string]string } func getTransferTargetName(ot *cdiv1.ObjectTransfer) string { if ot.Spec.Target.Name != nil { return *ot.Spec.Target.Name } return ot.Spec.Source.Name } func getTransferTargetNamespace(ot *cdiv1.ObjectTransfer) string { if ot.Spec.Target.Namespace != nil { return *ot.Spec.Target.Namespace } return ot.Spec.Source.Namespace } // NewObjectTransferController creates a new instance of the ObjectTransfer controller. func NewObjectTransferController(mgr manager.Manager, log logr.Logger, installerLabels map[string]string) (controller.Controller, error) { name := "transfer-controller" client := mgr.GetClient() reconciler := &ObjectTransferReconciler{ Client: client, Scheme: mgr.GetScheme(), Log: log.WithName(name), Recorder: mgr.GetEventRecorderFor(name), InstallerLabels: installerLabels, } ctrl, err := controller.New(name, mgr, controller.Options{ MaxConcurrentReconciles: 3, Reconciler: reconciler, }) if err != nil { return nil, err } if err := addObjectTransferControllerWatches(mgr, ctrl); err != nil { return nil, err } return ctrl, nil } func (r *ObjectTransferReconciler) logger(ot *cdiv1.ObjectTransfer) logr.Logger { return r.Log.WithValues( "namespace", ot.Namespace, "name", ot.Name, "kind", ot.Spec.Source.Kind, "sName", ot.Spec.Source.Name, "tNamespace", getTransferTargetNamespace(ot), "tName", getTransferTargetName(ot), ) } // Reconcile the reconcile loop for the data volumes. func (r *ObjectTransferReconciler) Reconcile(_ context.Context, req reconcile.Request) (reconcile.Result, error) { ot := &cdiv1.ObjectTransfer{} if err := r.Client.Get(context.TODO(), req.NamespacedName, ot); err != nil { if errors.IsNotFound(err) { return reconcile.Result{}, nil } return reconcile.Result{}, err } r.logger(ot).V(1).Info("Handling request") switch ot.Status.Phase { case cdiv1.ObjectTransferEmpty: return r.reconcileEmpty(ot) case cdiv1.ObjectTransferPending: handler, err := r.getHandler(ot) if err != nil { return reconcile.Result{}, err } if ot.DeletionTimestamp != nil { return r.reconcileCleanup(ot) } if requeue, err := handler.ReconcilePending(ot); requeue > 0 || err != nil { return reconcile.Result{RequeueAfter: requeue}, err } case cdiv1.ObjectTransferRunning, cdiv1.ObjectTransferError: if ot.DeletionTimestamp != nil && ot.Status.Phase == cdiv1.ObjectTransferError { return r.reconcileCleanup(ot) } handler, err := r.getHandler(ot) if err != nil { return reconcile.Result{}, err } if requeue, err := handler.ReconcileRunning(ot); requeue > 0 || err != nil { return reconcile.Result{RequeueAfter: requeue}, err } case cdiv1.ObjectTransferComplete: return r.reconcileCleanup(ot) } return reconcile.Result{}, nil } func (r *ObjectTransferReconciler) reconcileEmpty(ot *cdiv1.ObjectTransfer) (reconcile.Result, error) { if !controllerutil.ContainsFinalizer(ot, objectTransferFinalizer) { controllerutil.AddFinalizer(ot, objectTransferFinalizer) if err := r.updateResource(ot, ot); err != nil { return reconcile.Result{}, err } } ot.Status.Phase = cdiv1.ObjectTransferPending if err := r.setAndUpdateCompleteCondition(ot, corev1.ConditionFalse, "Initializing", ""); err != nil { return reconcile.Result{}, err } return reconcile.Result{}, nil } func (r *ObjectTransferReconciler) reconcileCleanup(ot *cdiv1.ObjectTransfer) (reconcile.Result, error) { if controllerutil.ContainsFinalizer(ot, objectTransferFinalizer) { controllerutil.RemoveFinalizer(ot, objectTransferFinalizer) return reconcile.Result{}, r.updateResource(ot, ot) } return reconcile.Result{}, nil } func (r *ObjectTransferReconciler) getHandler(ot *cdiv1.ObjectTransfer) (transferHandler, error) { switch strings.ToLower(ot.Spec.Source.Kind) { case "datavolume": return &dataVolumeTransferHandler{ objectTransferHandler: objectTransferHandler{ reconciler: r, }, }, nil case "persistentvolumeclaim": return &pvcTransferHandler{ objectTransferHandler: objectTransferHandler{ reconciler: r, }, }, nil } return nil, fmt.Errorf("invalid kind %q", ot.Spec.Source.Kind) } func (r *ObjectTransferReconciler) getResource(ns, name string, obj client.Object) (bool, error) { if err := r.Client.Get(context.TODO(), types.NamespacedName{Namespace: ns, Name: name}, obj); err != nil { if errors.IsNotFound(err) { return false, nil } return false, err } return true, nil } func (r *ObjectTransferReconciler) updateResource(ot *cdiv1.ObjectTransfer, obj client.Object) error { log := r.logger(ot) log.V(1).Info("Updating resource", "obj", obj) if err := r.Client.Update(context.TODO(), obj); err != nil { log.Error(err, "Update error") return err } return nil } func (r *ObjectTransferReconciler) updateResourceStatus(ot *cdiv1.ObjectTransfer, obj client.Object) error { log := r.logger(ot) log.V(1).Info("Updating resource status", "obj", obj) if err := r.Client.Status().Update(context.TODO(), obj); err != nil { log.Error(err, "Update status error") return err } return nil } func (r *ObjectTransferReconciler) getSourceResource(ot *cdiv1.ObjectTransfer, obj client.Object) (bool, error) { return r.getResource(ot.Spec.Source.Namespace, ot.Spec.Source.Name, obj) } func (r *ObjectTransferReconciler) getTargetResource(ot *cdiv1.ObjectTransfer, obj client.Object) (bool, error) { return r.getResource(getTransferTargetNamespace(ot), getTransferTargetName(ot), obj) } func (r *ObjectTransferReconciler) getCondition(ot *cdiv1.ObjectTransfer, t cdiv1.ObjectTransferConditionType) *cdiv1.ObjectTransferCondition { for i := range ot.Status.Conditions { c := &ot.Status.Conditions[i] if c.Type == t { return c } } return nil } func (r *ObjectTransferReconciler) setAndUpdateCompleteCondition(ot *cdiv1.ObjectTransfer, status corev1.ConditionStatus, message, reason string) error { return r.setAndUpdateCondition(ot, cdiv1.ObjectTransferConditionComplete, status, message, reason) } func (r *ObjectTransferReconciler) setCompleteConditionError(ot *cdiv1.ObjectTransfer, lastError error) error { if ot.Status.Phase == cdiv1.ObjectTransferRunning { ot.Status.Phase = cdiv1.ObjectTransferError } if err := r.setAndUpdateCompleteCondition(ot, corev1.ConditionFalse, "Error", lastError.Error()); err != nil { return err } return lastError } func (r *ObjectTransferReconciler) setCompleteConditionRunning(ot *cdiv1.ObjectTransfer) error { ot.Status.Phase = cdiv1.ObjectTransferRunning return r.setAndUpdateCompleteCondition(ot, corev1.ConditionFalse, "Running", "") } func (r *ObjectTransferReconciler) setAndUpdateCondition(ot *cdiv1.ObjectTransfer, t cdiv1.ObjectTransferConditionType, status corev1.ConditionStatus, message, reason string) error { r.logger(ot).V(1).Info("Updating condition", "type", t, "status", status, "message", message, "reason", reason) ot2 := &cdiv1.ObjectTransfer{} if _, err := r.getResource("", ot.Name, ot2); err != nil { return err } if r.setCondition(ot, t, status, message, reason) || !apiequality.Semantic.DeepEqual(ot.Status, ot2.Status) { return r.updateResourceStatus(ot, ot) } return nil } func (r *ObjectTransferReconciler) setCondition(ot *cdiv1.ObjectTransfer, t cdiv1.ObjectTransferConditionType, status corev1.ConditionStatus, message, reason string) bool { updated := false cond := r.getCondition(ot, t) if cond == nil { ot.Status.Conditions = append(ot.Status.Conditions, cdiv1.ObjectTransferCondition{Type: t}) cond = &ot.Status.Conditions[len(ot.Status.Conditions)-1] updated = true } if cond.Status != status { cond.Status = status updated = true } if cond.Message != message { cond.Message = message updated = true } if cond.Reason != reason { cond.Reason = reason updated = true } if updated { cond.LastTransitionTime = metav1.Now() cond.LastHeartbeatTime = cond.LastTransitionTime } return updated } func (r *ObjectTransferReconciler) createObjectTransferTarget(ot *cdiv1.ObjectTransfer, obj client.Object, mutateFn func(client.Object)) error { s, ok := ot.Status.Data["source"] if !ok { return fmt.Errorf("source spec missing") } if err := json.Unmarshal([]byte(s), obj); err != nil { return err } metaObj, err := meta.Accessor(obj) if err != nil { return err } metaObj.SetNamespace(getTransferTargetNamespace(ot)) metaObj.SetName(getTransferTargetName(ot)) metaObj.SetGenerateName("") metaObj.SetUID("") metaObj.SetResourceVersion("") metaObj.SetGeneration(0) metaObj.SetSelfLink("") metaObj.SetCreationTimestamp(metav1.Time{}) metaObj.SetDeletionTimestamp(nil) metaObj.SetManagedFields(nil) if ot.Spec.Target.Namespace != nil && *ot.Spec.Target.Namespace != ot.Spec.Source.Namespace { metaObj.SetOwnerReferences(nil) } delete(metaObj.GetAnnotations(), AnnObjectTransferName) if mutateFn != nil { mutateFn(obj) } return r.Client.Create(context.TODO(), obj) } func (r *ObjectTransferReconciler) pendingHelper(ot *cdiv1.ObjectTransfer, obj client.Object, data map[string]string) error { metaObj, err := meta.Accessor(obj) if err != nil { return r.setCompleteConditionError(ot, err) } if !r.hasRequiredAnnotations(ot, metaObj) { if err := r.setAndUpdateCompleteCondition(ot, corev1.ConditionFalse, "Required annotation missing", ""); err != nil { return err } return nil } v, ok := metaObj.GetAnnotations()[AnnObjectTransferName] if ok && v != ot.Name { if err := r.setAndUpdateCompleteCondition(ot, corev1.ConditionFalse, "Source in use by another transfer", v); err != nil { return err } return nil } if !ok { if metaObj.GetAnnotations() == nil { metaObj.SetAnnotations(make(map[string]string)) } metaObj.GetAnnotations()[AnnObjectTransferName] = ot.Name if err := r.updateResource(ot, obj); err != nil { return r.setCompleteConditionError(ot, err) } if err := r.setAndUpdateCompleteCondition(ot, corev1.ConditionFalse, "Pending", ""); err != nil { return err } } bs, err := json.Marshal(obj) if err != nil { return r.setCompleteConditionError(ot, err) } source := string(bs) ot.Status.Data = map[string]string{ "source": source, } for k, v := range data { ot.Status.Data[k] = v } ot.Status.Phase = cdiv1.ObjectTransferRunning if err := r.setCompleteConditionRunning(ot); err != nil { return err } return nil } func (r *ObjectTransferReconciler) hasRequiredAnnotations(ot *cdiv1.ObjectTransfer, obj metav1.Object) bool { for rk, rv := range ot.Spec.Source.RequiredAnnotations { if v, ok := obj.GetAnnotations()[rk]; !ok || v != rv { return false } } return true }