containerized-data-importer/pkg/controller/datavolume/clone-controller-base.go
akalenyu 3a4af3eff0
Introduce base clone controller and adjust PVC clone controller accordingly (#2544)
We are introducing cloning from snapshot soon as a new controller,
as part of those efforts, it was identified to be beneficial to have this split.

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>
2023-01-18 19:34:59 +01:00

591 lines
20 KiB
Go

/*
Copyright 2022 The CDI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package datavolume
import (
"context"
"fmt"
"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
"kubevirt.io/containerized-data-importer/pkg/common"
cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
"kubevirt.io/containerized-data-importer/pkg/token"
"kubevirt.io/containerized-data-importer/pkg/util"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// Event reasons, event message formats and annotation keys used by the
// DataVolume clone controllers.
const (
// ErrUnableToClone provides a const to indicate some errors are blocking the clone
ErrUnableToClone = "ErrUnableToClone"
// CloneScheduled provides a const to indicate clone is scheduled
CloneScheduled = "CloneScheduled"
// CloneInProgress provides a const to indicate clone is in progress
CloneInProgress = "CloneInProgress"
// SnapshotForSmartCloneInProgress provides a const to indicate snapshot creation for smart-clone is in progress
SnapshotForSmartCloneInProgress = "SnapshotForSmartCloneInProgress"
// SnapshotForSmartCloneCreated provides a const to indicate snapshot creation for smart-clone has been completed
SnapshotForSmartCloneCreated = "SnapshotForSmartCloneCreated"
// SmartClonePVCInProgress provides a const to indicate PVC creation for smart-clone is in progress
SmartClonePVCInProgress = "SmartClonePVCInProgress"
// SmartCloneSourceInUse provides a const to indicate a smart clone is being delayed because the source is in use
SmartCloneSourceInUse = "SmartCloneSourceInUse"
// CSICloneInProgress provides a const to indicate csi volume clone is in progress
CSICloneInProgress = "CSICloneInProgress"
// CSICloneSourceInUse provides a const to indicate a csi volume clone is being delayed because the source is in use
CSICloneSourceInUse = "CSICloneSourceInUse"
// HostAssistedCloneSourceInUse provides a const to indicate a host-assisted clone is being delayed because the source is in use
HostAssistedCloneSourceInUse = "HostAssistedCloneSourceInUse"
// CloneFailed provides a const to indicate clone has failed
CloneFailed = "CloneFailed"
// CloneSucceeded provides a const to indicate clone has succeeded
CloneSucceeded = "CloneSucceeded"
// MessageCloneScheduled provides a const to form clone is scheduled message
MessageCloneScheduled = "Cloning from %s/%s into %s/%s scheduled"
// MessageCloneInProgress provides a const to form clone is in progress message
MessageCloneInProgress = "Cloning from %s/%s into %s/%s in progress"
// MessageCloneFailed provides a const to form clone has failed message
MessageCloneFailed = "Cloning from %s/%s into %s/%s failed"
// MessageCloneSucceeded provides a const to form clone has succeeded message
MessageCloneSucceeded = "Successfully cloned from %s/%s into %s/%s"
// MessageSmartCloneInProgress provides a const to form snapshot for smart-clone is in progress message
MessageSmartCloneInProgress = "Creating snapshot for smart-clone is in progress (for pvc %s/%s)"
// MessageSmartClonePVCInProgress provides a const to form PVC creation for smart-clone is in progress message
MessageSmartClonePVCInProgress = "Creating PVC for smart-clone is in progress (for pvc %s/%s)"
// MessageCsiCloneInProgress provides a const to form a CSI Volume Clone in progress message
MessageCsiCloneInProgress = "CSI Volume clone in progress (for pvc %s/%s)"
// ExpansionInProgress is const representing target PVC expansion
ExpansionInProgress = "ExpansionInProgress"
// MessageExpansionInProgress is a const for reporting target expansion
MessageExpansionInProgress = "Expanding PersistentVolumeClaim for DataVolume %s/%s"
// NamespaceTransferInProgress is const representing target PVC transfer
NamespaceTransferInProgress = "NamespaceTransferInProgress"
// MessageNamespaceTransferInProgress is a const for reporting target transfer
MessageNamespaceTransferInProgress = "Transferring PersistentVolumeClaim for DataVolume %s/%s"
// SizeDetectionPodCreated provides a const to indicate that the size-detection pod has been created (reason)
SizeDetectionPodCreated = "SizeDetectionPodCreated"
// MessageSizeDetectionPodCreated provides a const to indicate that the size-detection pod has been created (message)
MessageSizeDetectionPodCreated = "Size-detection pod created"
// SizeDetectionPodNotReady reports that the size-detection pod has not finished its execution (reason)
SizeDetectionPodNotReady = "SizeDetectionPodNotReady"
// MessageSizeDetectionPodNotReady reports that the size-detection pod has not finished its execution (message)
MessageSizeDetectionPodNotReady = "The size detection pod is not finished yet"
// ImportPVCNotReady reports that it's not yet possible to access the source PVC (reason)
ImportPVCNotReady = "ImportPVCNotReady"
// MessageImportPVCNotReady reports that it's not yet possible to access the source PVC (message)
MessageImportPVCNotReady = "The source PVC is not fully imported"
// CloneValidationFailed reports that a clone wasn't admitted by our validation mechanism (reason)
CloneValidationFailed = "CloneValidationFailed"
// MessageCloneValidationFailed reports that a clone wasn't admitted by our validation mechanism (message)
MessageCloneValidationFailed = "The clone doesn't meet the validation requirements"
// CloneWithoutSource reports that the source PVC of a clone doesn't exist (reason)
CloneWithoutSource = "CloneWithoutSource"
// MessageCloneWithoutSource reports that the source PVC of a clone doesn't exist (message)
MessageCloneWithoutSource = "The source PVC %s doesn't exist"
// AnnCSICloneRequest annotation associates object with CSI Clone Request
AnnCSICloneRequest = "cdi.kubevirt.io/CSICloneRequest"
// AnnVirtualImageSize annotation contains the Virtual Image size of a PVC used for host-assisted cloning
// NOTE(review): the key prefix is "cdi.Kubervirt.io", unlike the "cdi.kubevirt.io" prefix of the
// other annotations in this block. Changing it would change the annotation key — confirm before touching.
AnnVirtualImageSize = "cdi.Kubervirt.io/virtualSize"
// AnnSourceCapacity annotation contains the storage capacity of a PVC used for host-assisted cloning
// NOTE(review): same "cdi.Kubervirt.io" prefix as AnnVirtualImageSize — confirm whether intentional.
AnnSourceCapacity = "cdi.Kubervirt.io/sourceCapacity"
// crossNamespaceFinalizer is the finalizer added to the DataVolume by initTransfer
crossNamespaceFinalizer = "cdi.kubevirt.io/dataVolumeFinalizer"
// annReadyForTransfer is set to "true" on the temporary PVC by expandPvcAfterClone and is listed
// as a required annotation on the ObjectTransfer source created by initTransfer
annReadyForTransfer = "cdi.kubevirt.io/readyForTransfer"
// annCloneType is an annotation key; presumably it records the selected clone strategy — verify against its users
annCloneType = "cdi.kubevirt.io/cloneType"
// cloneControllerName is presumably the name the clone controller is registered under — verify against its users
cloneControllerName = "datavolume-clone-controller"
)
// statusPhaseSync bundles the arguments given to syncDataVolumeStatusPhaseWithEvent
// so they can be stored on the sync result.
type statusPhaseSync struct {
// phase is the DataVolume phase that was requested
phase cdiv1.DataVolumePhase
// pvc is the PVC passed along with the phase; callers may pass nil
pvc *corev1.PersistentVolumeClaim
// event is the event passed along with the phase
event Event
}
// dataVolumeCloneSyncResult extends dataVolumeSyncResult with the status phase
// recorded during a clone sync.
type dataVolumeCloneSyncResult struct {
dataVolumeSyncResult
// phaseSync is set by syncDataVolumeStatusPhaseWithEvent, which refuses to overwrite a non-nil value
phaseSync *statusPhaseSync
}
// CloneReconcilerBase holds the members shared by the clone reconcilers;
// it embeds ReconcilerBase.
type CloneReconcilerBase struct {
ReconcilerBase
// clonerImage is the container image used for the expansion pod
clonerImage string
// importerImage is presumably the importer container image — not used in this file section, verify against callers
importerImage string
// pullPolicy is converted to a corev1.PullPolicy for the expansion pod container
pullPolicy string
// tokenValidator validates clone tokens
tokenValidator token.Validator
// tokenGenerator generates the extended clone token
tokenGenerator token.Generator
}
// addDataVolumeCloneControllerCommonWatches delegates to
// addDataVolumeControllerCommonWatches, passing dataVolumeClone as its third
// argument, and returns that call's error (nil on success).
func addDataVolumeCloneControllerCommonWatches(mgr manager.Manager, datavolumeController controller.Controller) error {
	return addDataVolumeControllerCommonWatches(mgr, datavolumeController, dataVolumeClone)
}
// ensureExtendedToken makes sure pvc carries the cc.AnnExtendedCloneToken
// annotation.
//
// When the annotation is absent, the token stored under cc.AnnCloneToken is
// validated, the PVC UID is added to the payload params under the "uid" key,
// a new token is generated from that payload, stored on the PVC and the PVC is
// updated. An error is returned when the clone token annotation is missing or
// when validation, generation or the update fails.
func (r *CloneReconcilerBase) ensureExtendedToken(pvc *corev1.PersistentVolumeClaim) error {
	if _, exists := pvc.Annotations[cc.AnnExtendedCloneToken]; exists {
		// Already extended, nothing to do.
		return nil
	}

	cloneToken, found := pvc.Annotations[cc.AnnCloneToken]
	if !found {
		return fmt.Errorf("token missing")
	}

	payload, err := r.tokenValidator.Validate(cloneToken)
	if err != nil {
		return err
	}
	if payload.Params == nil {
		payload.Params = make(map[string]string)
	}
	payload.Params["uid"] = string(pvc.UID)

	extended, err := r.tokenGenerator.Generate(payload)
	if err != nil {
		return err
	}

	pvc.Annotations[cc.AnnExtendedCloneToken] = extended
	return r.updatePVC(pvc)
}
// finishClone applies the follow-up actions once a clone has finished, such as
// namespace-transfer cleanup or size expansion.
//
// Until the PVC on syncRes carries cc.AnnCloneOf == "true" it does nothing and
// returns an empty result. Otherwise it runs the expansion for the
// non-namespace-transfer case with cdiv1.Succeeded as the next phase.
// transferName is currently not used by the body.
func (r *CloneReconcilerBase) finishClone(log logr.Logger, syncRes *dataVolumeCloneSyncResult, transferName string) (reconcile.Result, error) {
	cloneDone := syncRes.pvc.Annotations[cc.AnnCloneOf] == "true"
	if cloneDone {
		// expand for non-namespace case
		return r.expandPvcAfterClone(log, syncRes, syncRes.pvc, cdiv1.Succeeded, false)
	}

	// Not yet ready, do nothing.
	return reconcile.Result{}, nil
}
// setCloneOfOnPvc sets the cc.AnnCloneOf annotation to "true" on pvc and
// updates the PVC, unless the annotation already has that value, in which case
// nothing is written and nil is returned.
func (r *CloneReconcilerBase) setCloneOfOnPvc(pvc *corev1.PersistentVolumeClaim) error {
	// Reading from a nil map yields "", so this also covers a PVC without annotations.
	if pvc.Annotations[cc.AnnCloneOf] == "true" {
		return nil
	}

	if pvc.Annotations == nil {
		pvc.Annotations = make(map[string]string)
	}
	pvc.Annotations[cc.AnnCloneOf] = "true"

	return r.updatePVC(pvc)
}
// expandPvcAfterClone runs the expansion step for pvc and then records the
// resulting DataVolume phase on syncRes.
//
// While the expansion is not done, it requests a requeue and records
// cdiv1.ExpansionInProgress. Once done it records nextPhase. When isTemp is
// true (a temporary pvc is used when the clone source and target are in two
// distinct namespaces) the PVC is first annotated as ready for transfer and as
// populated for the DataVolume, and updated.
func (r *CloneReconcilerBase) expandPvcAfterClone(
	log logr.Logger,
	syncRes *dataVolumeCloneSyncResult,
	pvc *corev1.PersistentVolumeClaim,
	nextPhase cdiv1.DataVolumePhase,
	isTemp bool) (reconcile.Result, error) {
	expanded, err := r.expand(log, syncRes, pvc)
	switch {
	case err != nil:
		return reconcile.Result{}, err
	case !expanded:
		return reconcile.Result{Requeue: true},
			r.syncCloneStatusPhase(syncRes, cdiv1.ExpansionInProgress, pvc)
	}

	if isTemp {
		// trigger transfer and next reconcile should have pvcExists == true
		pvc.Annotations[annReadyForTransfer] = "true"
		pvc.Annotations[cc.AnnPopulatedFor] = syncRes.dvMutated.Name
		if updateErr := r.updatePVC(pvc); updateErr != nil {
			return reconcile.Result{}, updateErr
		}
	}

	return reconcile.Result{}, r.syncCloneStatusPhase(syncRes, nextPhase, pvc)
}
// doCrossNamespaceClone drives the cross-namespace part of a clone.
//
// It first initializes the transfer; while that is not complete it records the
// cdiv1.CloneScheduled phase and returns. It then looks up the temporary PVC
// named pvcName in the source PVC's namespace:
//   - not found: returns (nil, nil);
//   - found with cc.AnnCloneOf == "true": expands it and moves towards
//     cdiv1.NamespaceTransferInProgress;
//   - found without that annotation (clone still in progress): returns an empty
//     result when returnWhenCloneInProgress is set, otherwise (nil, nil).
//
// A nil result tells the caller that nothing was decided here.
// selectedCloneStrategy is currently not used by the body.
func (r *CloneReconcilerBase) doCrossNamespaceClone(log logr.Logger,
	syncRes *dataVolumeCloneSyncResult,
	pvcName string,
	returnWhenCloneInProgress bool,
	selectedCloneStrategy cloneStrategy) (*reconcile.Result, error) {
	dv := syncRes.dvMutated

	ready, err := r.initTransfer(log, syncRes, pvcName)
	if err != nil {
		return &reconcile.Result{}, err
	}
	if !ready {
		// get reconciled again v soon
		return &reconcile.Result{}, r.syncCloneStatusPhase(syncRes, cdiv1.CloneScheduled, nil)
	}

	key := types.NamespacedName{Namespace: dv.Spec.Source.PVC.Namespace, Name: pvcName}
	tmpPVC := &corev1.PersistentVolumeClaim{}
	err = r.client.Get(context.TODO(), key, tmpPVC)

	switch {
	case k8serrors.IsNotFound(err):
		return nil, nil
	case err != nil:
		return &reconcile.Result{}, err
	case tmpPVC.Annotations[cc.AnnCloneOf] == "true":
		res, expandErr := r.expandPvcAfterClone(log, syncRes, tmpPVC, cdiv1.NamespaceTransferInProgress, true)
		return &res, expandErr
	case returnWhenCloneInProgress:
		// AnnCloneOf != true, so cloneInProgress
		return &reconcile.Result{}, nil
	}

	return nil, nil
}
// initTransfer prepares the namespace transfer for the DataVolume in syncRes.
//
// It adds crossNamespaceFinalizer to the DataVolume when missing and creates
// the ObjectTransfer called name when it does not exist yet (after validating
// the DataVolume's clone token). It returns true only when both the finalizer
// and the ObjectTransfer were already in place; any step performed in this
// call yields false so the caller knows initialization was not yet complete.
func (r *CloneReconcilerBase) initTransfer(log logr.Logger, syncRes *dataVolumeCloneSyncResult, name string) (bool, error) {
	dv := syncRes.dvMutated
	log.Info("Initializing transfer")

	hadFinalizer := cc.HasFinalizer(dv, crossNamespaceFinalizer)
	if !hadFinalizer {
		cc.AddFinalizer(dv, crossNamespaceFinalizer)
	}

	existing := &cdiv1.ObjectTransfer{}
	getErr := r.client.Get(context.TODO(), types.NamespacedName{Name: name}, existing)
	if getErr == nil {
		// The transfer object is already there; only the finalizer state matters.
		return hadFinalizer, nil
	}
	if !k8serrors.IsNotFound(getErr) {
		return false, getErr
	}

	if err := cc.ValidateCloneTokenDV(r.tokenValidator, dv); err != nil {
		return false, err
	}

	source := cdiv1.TransferSource{
		Kind:      "PersistentVolumeClaim",
		Namespace: dv.Spec.Source.PVC.Namespace,
		Name:      name,
		RequiredAnnotations: map[string]string{
			annReadyForTransfer: "true",
		},
	}
	target := cdiv1.TransferTarget{
		Namespace: &dv.Namespace,
		Name:      &dv.Name,
	}
	transfer := &cdiv1.ObjectTransfer{
		ObjectMeta: metav1.ObjectMeta{
			Name: name,
			Labels: map[string]string{
				common.CDILabelKey:       common.CDILabelValue,
				common.CDIComponentLabel: "",
			},
		},
		Spec: cdiv1.ObjectTransferSpec{
			Source: source,
			Target: target,
		},
	}
	util.SetRecommendedLabels(transfer, r.installerLabels, "cdi-controller")

	if err := setAnnOwnedByDataVolume(transfer, dv); err != nil {
		return false, err
	}
	if err := r.client.Create(context.TODO(), transfer); err != nil {
		return false, err
	}

	// Just created: not initialized until the next pass observes it.
	return false, nil
}
// expansionPodName returns the name of the expansion pod for pvc: the
// "cdi-expand-" prefix followed by the PVC UID.
func expansionPodName(pvc *corev1.PersistentVolumeClaim) string {
	return fmt.Sprintf("cdi-expand-%s", pvc.UID)
}
// expand drives the expansion of pvc up to the storage size requested in
// syncRes.pvcSpec and reports whether the expansion has finished.
//
// Each call performs at most one step:
//  1. Fails if the PVC is not Bound or if any of the requested, current (spec)
//     or actual (status capacity) storage sizes is missing.
//  2. If the PVC's spec request is below the requested size, raises it, updates
//     the PVC and returns (false, nil).
//  3. If the actual capacity already satisfies the request and no expansion pod
//     exists, returns (true, nil).
//  4. If no expansion pod exists and the PVC has no FileSystemResizePending
//     condition, returns (false, nil) to wait for that condition.
//  5. Creates the expansion pod when it does not exist yet.
//  6. Once the pod has succeeded, deletes it and returns (false, nil) so that a
//     later call can observe step 3.
//
// The returned bool is only meaningful when the error is nil.
func (r *CloneReconcilerBase) expand(log logr.Logger, syncRes *dataVolumeCloneSyncResult, pvc *corev1.PersistentVolumeClaim) (bool, error) {
	if pvc.Status.Phase != corev1.ClaimBound {
		return false, fmt.Errorf("cannot expand volume in %q phase", pvc.Status.Phase)
	}

	requestedSize, hasRequested := syncRes.pvcSpec.Resources.Requests[corev1.ResourceStorage]
	currentSize, hasCurrent := pvc.Spec.Resources.Requests[corev1.ResourceStorage]
	actualSize, hasActual := pvc.Status.Capacity[corev1.ResourceStorage]
	if !hasRequested || !hasCurrent || !hasActual {
		return false, fmt.Errorf("PVC sizes missing")
	}

	expansionRequired := actualSize.Cmp(requestedSize) < 0
	updateRequestSizeRequired := currentSize.Cmp(requestedSize) < 0

	log.V(3).Info("Expand sizes", "req", requestedSize, "cur", currentSize, "act", actualSize, "exp", expansionRequired)

	if updateRequestSizeRequired {
		// hasCurrent guarantees the Requests map is non-nil here.
		pvc.Spec.Resources.Requests[corev1.ResourceStorage] = requestedSize
		if err := r.updatePVC(pvc); err != nil {
			return false, err
		}

		return false, nil
	}

	podName := expansionPodName(pvc)
	podExists := true
	pod := &corev1.Pod{}
	nn := types.NamespacedName{Namespace: pvc.Namespace, Name: podName}
	if err := r.client.Get(context.TODO(), nn, pod); err != nil {
		if !k8serrors.IsNotFound(err) {
			return false, err
		}

		podExists = false
	}

	if !expansionRequired && !podExists {
		// finally done
		return true, nil
	}

	hasPendingResizeCondition := false
	for _, cond := range pvc.Status.Conditions {
		if cond.Type == corev1.PersistentVolumeClaimFileSystemResizePending {
			hasPendingResizeCondition = true
			break
		}
	}

	if !podExists && !hasPendingResizeCondition {
		// wait for resize condition
		return false, nil
	}

	if !podExists {
		var err error
		pod, err = r.createExpansionPod(pvc, syncRes.dvMutated, podName)
		// Check if pod has failed and, in that case, record an event with the error
		if podErr := cc.HandleFailedPod(err, podName, pvc, r.recorder, r.client); podErr != nil {
			return false, podErr
		}
	}

	if pod.Status.Phase == corev1.PodSucceeded {
		// The pod has done its job, remove it. A NotFound error means the pod is
		// already gone, which is the same outcome as a successful delete, so it
		// is not reported as a failure. Completion is declared by a later call
		// once the pod is observed to be absent and the capacity is sufficient.
		if err := r.client.Delete(context.TODO(), pod); err != nil && !k8serrors.IsNotFound(err) {
			return false, err
		}

		return false, nil
	}

	return false, nil
}
// createExpansionPod builds and creates the expansion pod named podName in the
// PVC's namespace.
//
// The pod runs a single "dummy" container from the cloner image with pvc
// attached: as a volume device when the PVC is in block mode, otherwise as a
// mount at common.ClonerMountPath. Default pod resource requirements and the
// workload node placement are applied, the pod is annotated as owned by dv and
// given the restricted security context. An AlreadyExists error from the create
// call is tolerated; in that case the locally built pod object is returned.
func (r *CloneReconcilerBase) createExpansionPod(pvc *corev1.PersistentVolumeClaim, dv *cdiv1.DataVolume, podName string) (*corev1.Pod, error) {
	defaultResources, err := cc.GetDefaultPodResourceRequirements(r.client)
	if err != nil {
		return nil, err
	}

	placement, err := cc.GetWorkloadNodePlacement(r.client)
	if err != nil {
		return nil, err
	}

	container := corev1.Container{
		Name:            "dummy",
		Image:           r.clonerImage,
		ImagePullPolicy: corev1.PullPolicy(r.pullPolicy),
		Command:         []string{"/bin/bash"},
		Args:            []string{"-c", "echo", "'hello cdi'"},
	}

	isBlock := pvc.Spec.VolumeMode != nil && *pvc.Spec.VolumeMode == corev1.PersistentVolumeBlock
	if isBlock {
		container.VolumeDevices = cc.AddVolumeDevices()
	} else {
		container.VolumeMounts = []corev1.VolumeMount{
			{
				Name:      cc.DataVolName,
				MountPath: common.ClonerMountPath,
			},
		}
	}

	if defaultResources != nil {
		container.Resources = *defaultResources
	}

	claimVolume := corev1.Volume{
		Name: cc.DataVolName,
		VolumeSource: corev1.VolumeSource{
			PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
				ClaimName: pvc.Name,
			},
		},
	}

	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      podName,
			Namespace: pvc.Namespace,
			Annotations: map[string]string{
				cc.AnnCreatedBy: "yes",
			},
			Labels: map[string]string{
				common.CDILabelKey:       common.CDILabelValue,
				common.CDIComponentLabel: "cdi-expander",
			},
		},
		Spec: corev1.PodSpec{
			Containers:    []corev1.Container{container},
			RestartPolicy: corev1.RestartPolicyOnFailure,
			Volumes:       []corev1.Volume{claimVolume},
			NodeSelector:  placement.NodeSelector,
			Tolerations:   placement.Tolerations,
			Affinity:      placement.Affinity,
		},
	}
	util.SetRecommendedLabels(pod, r.installerLabels, "cdi-controller")

	if err := setAnnOwnedByDataVolume(pod, dv); err != nil {
		return nil, err
	}

	cc.SetRestrictedSecurityContext(&pod.Spec)

	// A pod that already exists is fine; any other create failure is returned.
	if err := r.client.Create(context.TODO(), pod); err != nil && !k8serrors.IsAlreadyExists(err) {
		return nil, err
	}

	return pod, nil
}
// syncCloneStatusPhase builds the event that corresponds to phase and hands
// phase, pvc and the event to syncDataVolumeStatusPhaseWithEvent.
//
// For the phases handled below the event is of type Normal with a
// phase-specific reason and message; for any other phase an empty Event is
// passed along.
func (r *CloneReconcilerBase) syncCloneStatusPhase(syncRes *dataVolumeCloneSyncResult, phase cdiv1.DataVolumePhase, pvc *corev1.PersistentVolumeClaim) error {
	dv := syncRes.dvMutated

	// The source PVC is only dereferenced in the phases whose message needs it.
	var reason, message string
	switch phase {
	case cdiv1.CloneScheduled:
		reason = CloneScheduled
		message = fmt.Sprintf(MessageCloneScheduled, dv.Spec.Source.PVC.Namespace, dv.Spec.Source.PVC.Name, dv.Namespace, dv.Name)
	case cdiv1.SnapshotForSmartCloneInProgress:
		reason = SnapshotForSmartCloneInProgress
		message = fmt.Sprintf(MessageSmartCloneInProgress, dv.Spec.Source.PVC.Namespace, dv.Spec.Source.PVC.Name)
	case cdiv1.CSICloneInProgress:
		reason = string(cdiv1.CSICloneInProgress)
		message = fmt.Sprintf(MessageCsiCloneInProgress, dv.Spec.Source.PVC.Namespace, dv.Spec.Source.PVC.Name)
	case cdiv1.ExpansionInProgress:
		reason = ExpansionInProgress
		message = fmt.Sprintf(MessageExpansionInProgress, dv.Namespace, dv.Name)
	case cdiv1.NamespaceTransferInProgress:
		reason = NamespaceTransferInProgress
		message = fmt.Sprintf(MessageNamespaceTransferInProgress, dv.Namespace, dv.Name)
	case cdiv1.Succeeded:
		reason = CloneSucceeded
		message = fmt.Sprintf(MessageCloneSucceeded, dv.Spec.Source.PVC.Namespace, dv.Spec.Source.PVC.Name, dv.Namespace, dv.Name)
	default:
		return r.syncDataVolumeStatusPhaseWithEvent(syncRes, phase, pvc, Event{})
	}

	normalEvent := Event{
		eventType: corev1.EventTypeNormal,
		reason:    reason,
		message:   message,
	}
	return r.syncDataVolumeStatusPhaseWithEvent(syncRes, phase, pvc, normalEvent)
}
// populateSourceIfSourceRef fills dv.Spec.Source from the referenced DataSource
// when dv.Spec.SourceRef is set; without a SourceRef it does nothing.
//
// Only the cdiv1.DataVolumeDataSource kind is supported. The DataSource is
// looked up in the SourceRef namespace when one is given and non-empty, and in
// the DataVolume's namespace otherwise. Only the PVC part of the DataSource's
// source is copied.
// Note that when the controller actually updates the DV (updateDataVolume), we nil out spec.Source when SourceRef is set
func (r *CloneReconcilerBase) populateSourceIfSourceRef(dv *cdiv1.DataVolume) error {
	ref := dv.Spec.SourceRef
	if ref == nil {
		return nil
	}
	if ref.Kind != cdiv1.DataVolumeDataSource {
		return errors.Errorf("Unsupported sourceRef kind %s, currently only %s is supported", ref.Kind, cdiv1.DataVolumeDataSource)
	}

	key := types.NamespacedName{Name: ref.Name, Namespace: dv.Namespace}
	if ref.Namespace != nil && *ref.Namespace != "" {
		key.Namespace = *ref.Namespace
	}

	dataSource := &cdiv1.DataSource{}
	if err := r.client.Get(context.TODO(), key, dataSource); err != nil {
		return err
	}

	dv.Spec.Source = &cdiv1.DataVolumeSource{PVC: dataSource.Spec.Source.PVC}
	return nil
}
// syncDataVolumeStatusPhaseWithEvent stores phase, pvc and event on syncRes as
// its phaseSync. It may only be called once per sync result: if a phaseSync is
// already present, an error is returned and the stored value is left untouched.
func (r *CloneReconcilerBase) syncDataVolumeStatusPhaseWithEvent(
	syncRes *dataVolumeCloneSyncResult,
	phase cdiv1.DataVolumePhase,
	pvc *corev1.PersistentVolumeClaim,
	event Event) error {
	if syncRes.phaseSync == nil {
		syncRes.phaseSync = &statusPhaseSync{
			phase: phase,
			pvc:   pvc,
			event: event,
		}
		return nil
	}

	return fmt.Errorf("phaseSync is already set")
}
// isCrossNamespaceClone reports whether dv has a PVC source with a non-empty
// namespace that differs from the DataVolume's own namespace.
func isCrossNamespaceClone(dv *cdiv1.DataVolume) bool {
	src := dv.Spec.Source.PVC
	return src != nil && src.Namespace != "" && src.Namespace != dv.Namespace
}
// getTransferName returns the name derived from the DataVolume UID: the
// "cdi-tmp-" prefix followed by the UID.
func getTransferName(dv *cdiv1.DataVolume) string {
	return "cdi-tmp-" + string(dv.UID)
}