containerized-data-importer/pkg/controller/populators/populator-base.go
Alexander Wels 2b8c425ee9
Ensure priority class is assigned to pod populating pvc prime (#2864)
* Ensure priority class is assigned to pod populating pvc prime

Priority class was not being passed to the pvc prime from the PVC
and thus was not being added to the importer pod populating the
pvc prime. There is a list of allowed annotations that can be passed
and the priority class annotation was not in it. This commit adds
the annotation to the allowed list.

Cleaned up the unneeded log argument to a function related to passing the
annotations.

Signed-off-by: Alexander Wels <awels@redhat.com>

* Restore the logger as it was logging the pvc name as well
as the log message. Modified the name to make it clearer
this is the case.

Signed-off-by: Alexander Wels <awels@redhat.com>

* Have upload also use the populator, can't do clone
because the pod disappears too quickly without retain

Signed-off-by: Alexander Wels <awels@redhat.com>

* Test for priority class in upload as well as fix typo in cloning test.

Signed-off-by: Alexander Wels <awels@redhat.com>

---------

Signed-off-by: Alexander Wels <awels@redhat.com>
2023-08-25 02:02:36 +02:00

337 lines
11 KiB
Go

/*
Copyright 2023 The CDI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package populators
import (
"context"
"reflect"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
"kubevirt.io/containerized-data-importer/pkg/common"
cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
"kubevirt.io/containerized-data-importer/pkg/util"
)
// pvcModifierFunc is a callback that adjusts a PVC using the given population source object.
// createPVCPrime invokes it on the freshly assembled PVC' right before the PVC' is created.
type pvcModifierFunc func(pvc *corev1.PersistentVolumeClaim, source client.Object)
// ReconcilerBase bundles the dependencies and settings used by the common
// populator reconcile logic implemented in this file.
type ReconcilerBase struct {
// client is used to get, create, update and delete PVCs.
client client.Client
// recorder emits events on the target PVC (e.g. PVC' creation success/failure).
recorder record.EventRecorder
// scheme, log and featureGates are not referenced by the methods visible in this file;
// presumably they are consumed by the concrete populators — verify against their usage.
scheme *runtime.Scheme
log logr.Logger
featureGates featuregates.FeatureGates
// sourceKind is the dataSourceRef kind this populator handles; reconcileCommon
// ignores PVCs for which IsPVCDataSourceRefKind(pvc, sourceKind) is false.
sourceKind string
// installerLabels is passed to util.SetRecommendedLabels when labelling the PVC'.
installerLabels map[string]string
}
const (
// dataSourceRefField is the field-index name under which PVCs are indexed
// by the population source their dataSourceRef points to (see getIndexArgs).
dataSourceRefField = "spec.dataSourceRef"
// uidField is the field-index name under which PVCs are indexed by their UID.
uidField = "metadata.uid"
)
// populatorController is the set of populator-specific hooks that the common
// reconcile logic of ReconcilerBase calls into.
type populatorController interface {
// getPopulationSource returns the populator-specific CR referenced by the PVC.
// reconcileCommon does not create the PVC' while the returned object is nil.
getPopulationSource(pvc *corev1.PersistentVolumeClaim) (client.Object, error)
// updatePVCForPopulation prepares the PVC' to be populated according to the
// population source; it is handed to createPVCPrime as its pvcModifierFunc.
updatePVCForPopulation(pvc *corev1.PersistentVolumeClaim, source client.Object)
// reconcileTargetPVC reconciles the target PVC with populator-specific logic.
// It is called by reconcile once a PVC' exists and the target PVC is unbound,
// not yet complete, or has a multi-stage import in progress.
reconcileTargetPVC(pvc, pvcPrime *corev1.PersistentVolumeClaim) (reconcile.Result, error)
}
// indexArgs groups the three arguments of a single FieldIndexer.IndexField
// registration, as consumed by CreateCommonPopulatorIndexes.
type indexArgs struct {
// obj is the object type the index is built for.
obj client.Object
// field is the name of the index.
field string
// extractValue computes the index values for a given object.
extractValue client.IndexerFunc
}
// getIndexArgs returns the field-index definitions shared by all populators.
// Both indexes are built over PersistentVolumeClaims:
//   - dataSourceRefField: keyed by the populator index key of the PVC's
//     dataSourceRef; PVCs whose dataSourceRef is not valid get no entry.
//   - uidField: keyed by the object's UID.
func getIndexArgs() []indexArgs {
	// Maps a PVC to the index key of the population source it references.
	byDataSourceRef := func(obj client.Object) []string {
		claim := obj.(*corev1.PersistentVolumeClaim)
		ref := claim.Spec.DataSourceRef
		if !isDataSourceRefValid(ref) {
			return nil
		}
		sourceNamespace := getPopulationSourceNamespace(claim)
		group := *ref.APIGroup
		key := getPopulatorIndexKey(group, ref.Kind, sourceNamespace, ref.Name)
		return []string{key}
	}

	// Maps any object to its UID.
	byUID := func(obj client.Object) []string {
		uid := string(obj.GetUID())
		return []string{uid}
	}

	return []indexArgs{
		{obj: &corev1.PersistentVolumeClaim{}, field: dataSourceRefField, extractValue: byDataSourceRef},
		{obj: &corev1.PersistentVolumeClaim{}, field: uidField, extractValue: byUID},
	}
}
// CreateCommonPopulatorIndexes registers every index returned by getIndexArgs
// with the manager's field indexer. It stops at, and returns, the first
// registration error; nil means all indexes were registered.
func CreateCommonPopulatorIndexes(mgr manager.Manager) error {
	definitions := getIndexArgs()
	for i := range definitions {
		def := definitions[i]
		err := mgr.GetFieldIndexer().IndexField(context.TODO(), def.obj, def.field, def.extractValue)
		if err != nil {
			return err
		}
	}
	return nil
}
// addCommonPopulatorsWatches registers the watches shared by all populator controllers:
//   - PersistentVolumeClaims: a PVC whose dataSourceRef is of sourceKind enqueues itself;
//     a PVC' of this populator enqueues the PVC named by its controller owner reference
//     (in the same namespace).
//   - sourceType objects: enqueue every PVC found through the dataSourceRefField index
//     for that object. A failure to list the PVCs is logged and yields no requests.
//
// It returns the first error reported while registering a watch, or nil.
func addCommonPopulatorsWatches(mgr manager.Manager, c controller.Controller, log logr.Logger, sourceKind string, sourceType client.Object) error {
	mapPVCToTargetPVC := func(obj client.Object) []reconcile.Request {
		pvc := obj.(*corev1.PersistentVolumeClaim)
		if IsPVCDataSourceRefKind(pvc, sourceKind) {
			pvcKey := types.NamespacedName{Namespace: pvc.Namespace, Name: pvc.Name}
			return []reconcile.Request{{NamespacedName: pvcKey}}
		}
		if isPVCPrimeDataSourceRefKind(pvc, sourceKind) {
			owner := metav1.GetControllerOf(pvc)
			// GetControllerOf returns nil when there is no controller reference.
			// Guard here instead of relying on the helper above having verified it,
			// so a malformed PVC' can never panic the watch handler.
			if owner == nil {
				return nil
			}
			pvcKey := types.NamespacedName{Namespace: pvc.Namespace, Name: owner.Name}
			return []reconcile.Request{{NamespacedName: pvcKey}}
		}
		return nil
	}

	// Setup watches
	if err := c.Watch(&source.Kind{Type: &corev1.PersistentVolumeClaim{}},
		handler.EnqueueRequestsFromMapFunc(mapPVCToTargetPVC),
	); err != nil {
		return err
	}

	mapDataSourceRefToPVC := func(obj client.Object) []reconcile.Request {
		var pvcs corev1.PersistentVolumeClaimList
		matchingFields := client.MatchingFields{
			dataSourceRefField: getPopulatorIndexKey(cc.AnnAPIGroup, sourceKind, obj.GetNamespace(), obj.GetName()),
		}
		if err := mgr.GetClient().List(context.TODO(), &pvcs, matchingFields); err != nil {
			log.Error(err, "Unable to list PVCs", "matchingFields", matchingFields)
			return nil
		}
		reqs := make([]reconcile.Request, 0, len(pvcs.Items))
		// Index instead of ranging by value to avoid copying each PVC struct.
		for i := range pvcs.Items {
			item := &pvcs.Items[i]
			reqs = append(reqs, reconcile.Request{NamespacedName: types.NamespacedName{Namespace: item.Namespace, Name: item.Name}})
		}
		return reqs
	}

	return c.Watch(&source.Kind{Type: sourceType},
		handler.EnqueueRequestsFromMapFunc(mapDataSourceRefToPVC),
	)
}
// getPVCPrime looks up the PVC' of the given PVC: the claim named
// PVCPrimeName(pvc) in the PVC's namespace.
// It returns (nil, nil) when the PVC' does not exist, and (nil, err) for any
// other lookup failure.
func (r *ReconcilerBase) getPVCPrime(pvc *corev1.PersistentVolumeClaim) (*corev1.PersistentVolumeClaim, error) {
	key := types.NamespacedName{Namespace: pvc.Namespace, Name: PVCPrimeName(pvc)}
	found := &corev1.PersistentVolumeClaim{}
	err := r.client.Get(context.TODO(), key, found)
	switch {
	case err == nil:
		return found, nil
	case k8serrors.IsNotFound(err):
		// A missing PVC' is an expected state, not a failure.
		return nil, nil
	default:
		return nil, err
	}
}
// createPVCPrime assembles and creates the PVC' for the given target PVC.
//
// The PVC' lives in the target's namespace under PVCPrimeName(pvc), copies the
// target's access modes, resources, storage class and volume mode, carries the
// CDI label and the cc.AnnImmediateBinding annotation, and is controlled by the
// target PVC through an owner reference. When waitForFirstConsumer is true the
// target's cc.AnnSelectedNode annotation value is copied as well.
// cc.SetPvcAllowedAnnotations and util.SetRecommendedLabels are then applied,
// followed by the optional populator-specific updatePVCForPopulation hook.
//
// On success a Normal event is recorded on the target PVC and the created PVC'
// is returned; a creation failure is returned as (nil, err).
func (r *ReconcilerBase) createPVCPrime(pvc *corev1.PersistentVolumeClaim, source client.Object, waitForFirstConsumer bool, updatePVCForPopulation pvcModifierFunc) (*corev1.PersistentVolumeClaim, error) {
	primeAnnotations := map[string]string{cc.AnnImmediateBinding: ""}
	if waitForFirstConsumer {
		primeAnnotations[cc.AnnSelectedNode] = pvc.Annotations[cc.AnnSelectedNode]
	}
	primeLabels := map[string]string{common.CDILabelKey: common.CDILabelValue}
	pvcGVK := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "PersistentVolumeClaim"}

	// Assemble PVC' spec, owned by the target PVC
	prime := &corev1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:            PVCPrimeName(pvc),
			Namespace:       pvc.Namespace,
			Labels:          primeLabels,
			Annotations:     primeAnnotations,
			OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(pvc, pvcGVK)},
		},
		Spec: corev1.PersistentVolumeClaimSpec{
			AccessModes:      pvc.Spec.AccessModes,
			Resources:        pvc.Spec.Resources,
			StorageClassName: pvc.Spec.StorageClassName,
			VolumeMode:       pvc.Spec.VolumeMode,
		},
	}

	cc.SetPvcAllowedAnnotations(prime, pvc)
	util.SetRecommendedLabels(prime, r.installerLabels, "cdi-controller")

	// The populator-specific hook adds whatever annotations it requires
	if updatePVCForPopulation != nil {
		updatePVCForPopulation(prime, source)
	}

	err := r.client.Create(context.TODO(), prime)
	if err != nil {
		return nil, err
	}
	r.recorder.Eventf(pvc, corev1.EventTypeNormal, createdPVCPrimeSuccessfully, messageCreatedPVCPrimeSuccessfully)
	return prime, nil
}
// updatePVCAnnotationsFunc is an optional hook for updatePVCWithPVCPrimeAnnotations.
// It receives the working copy of the target PVC (after the desired annotations
// were synced) together with the PVC', and may modify the copy further before
// it is compared with the original and updated.
type updatePVCAnnotationsFunc func(pvc, pvcPrime *corev1.PersistentVolumeClaim)
// desiredAnnotations lists the annotations that updatePVCWithPVCPrimeAnnotations
// mirrors from the PVC' onto the target PVC: each one is copied when present on
// the PVC' and removed from the target when absent from the PVC'.
var desiredAnnotations = []string{cc.AnnPodPhase, cc.AnnPodReady, cc.AnnPodRestarts,
cc.AnnPreallocationRequested, cc.AnnPreallocationApplied, cc.AnnCurrentCheckpoint, cc.AnnMultiStageImportDone,
cc.AnnRunningCondition, cc.AnnRunningConditionMessage, cc.AnnRunningConditionReason}
// updatePVCWithPVCPrimeAnnotations syncs the desiredAnnotations from the PVC'
// onto a deep copy of the target PVC, runs the optional updateFunc on that
// copy, and issues an Update only if the copy's ObjectMeta ends up different
// from the original's. The pvc argument itself is never modified.
// It returns the Update error, or nil when nothing changed or the update succeeded.
func (r *ReconcilerBase) updatePVCWithPVCPrimeAnnotations(pvc, pvcPrime *corev1.PersistentVolumeClaim, updateFunc updatePVCAnnotationsFunc) error {
	updated := pvc.DeepCopy()

	for _, key := range desiredAnnotations {
		primeValue, onPrime := pvcPrime.GetAnnotations()[key]
		if onPrime {
			cc.AddAnnotation(updated, key, primeValue)
			continue
		}
		// The annotation is gone from PVC': drop it from the target as well.
		if _, onTarget := updated.GetAnnotations()[key]; onTarget {
			delete(updated.Annotations, key)
		}
	}

	if updateFunc != nil {
		updateFunc(updated, pvcPrime)
	}

	// Skip the API call when the metadata did not change.
	if reflect.DeepEqual(pvc.ObjectMeta, updated.ObjectMeta) {
		return nil
	}
	return r.client.Update(context.TODO(), updated)
}
// reconcile functions
// reconcile is the common entry point for populator reconciliation.
//
// It fetches the target PVC named by req (a missing PVC ends the reconcile
// without error), runs reconcileCommon to obtain the PVC', and stops when
// that yields an error or no PVC'. With a PVC' in hand, the populator's
// reconcileTargetPVC is invoked while the target is unbound, not complete,
// or in the middle of a multi-stage import; otherwise the PVC' is cleaned up.
func (r *ReconcilerBase) reconcile(req reconcile.Request, populator populatorController, pvcNameLogger logr.Logger) (reconcile.Result, error) {
	// Get the target PVC
	target := &corev1.PersistentVolumeClaim{}
	if getErr := r.client.Get(context.TODO(), req.NamespacedName, target); getErr != nil {
		if !k8serrors.IsNotFound(getErr) {
			return reconcile.Result{}, getErr
		}
		return reconcile.Result{}, nil
	}

	// Common steps come first; only a valid PVC' lets us go on.
	prime, err := r.reconcileCommon(target, populator, pvcNameLogger)
	switch {
	case err != nil:
		return reconcile.Result{}, err
	case prime == nil:
		return reconcile.Result{}, nil
	}

	populationFinished := !cc.IsUnbound(target) && cc.IsPVCComplete(target) && !cc.IsMultiStageImportInProgress(target)
	if populationFinished {
		// Making sure to clean PVC'
		return r.reconcileCleanup(prime)
	}

	// Each populator reconciles the target PVC in a different way
	return populator.reconcileTargetPVC(target, prime)
}
// reconcileCommon performs the reconcile steps shared by all populators and
// returns the PVC' if one already exists.
//
// It returns (nil, nil), meaning "nothing more to do in this pass", when:
//   - the PVC is being deleted,
//   - the PVC's dataSourceRef is not of this populator's sourceKind,
//   - the dataSourceRef is cross-namespace (not supported yet),
//   - the population source does not exist yet,
//   - claimReadyForPopulation reports the claim as not ready, or
//   - the PVC' was just created (a later reconcile will pick it up) or the
//     target PVC is already bound and needs no population.
//
// Errors from looking up the PVC' or the population source, from the readiness
// check, or from creating the PVC' are returned as (nil, err). A PVC' creation
// failure is additionally recorded as a Warning event on the target PVC.
func (r *ReconcilerBase) reconcileCommon(pvc *corev1.PersistentVolumeClaim, populator populatorController, pvcNameLogger logr.Logger) (*corev1.PersistentVolumeClaim, error) {
	if pvc.DeletionTimestamp != nil {
		pvcNameLogger.V(1).Info("PVC being terminated, ignoring")
		return nil, nil
	}

	// Get the PVC'. If it does exist, return it and skip the rest of the checks
	// If it doesn't, we'll attempt to create it.
	pvcPrime, err := r.getPVCPrime(pvc)
	if err != nil {
		return nil, err
	}
	if pvcPrime != nil {
		return pvcPrime, nil
	}

	// We should ignore PVCs that aren't for this populator to handle
	dataSourceRef := pvc.Spec.DataSourceRef
	if !IsPVCDataSourceRefKind(pvc, r.sourceKind) {
		pvcNameLogger.V(1).Info("reconciled unexpected PVC, ignoring")
		return nil, nil
	}
	// TODO: Remove this check once we support cross-namespace dataSourceRef
	if dataSourceRef.Namespace != nil {
		pvcNameLogger.V(1).Info("cross-namespace dataSourceRef not supported yet, ignoring")
		return nil, nil
	}

	// Wait until dataSourceRef exists. The error is checked first so that it is
	// never dropped, even if an implementation returns a non-nil object with it.
	populationSource, err := populator.getPopulationSource(pvc)
	if err != nil || populationSource == nil {
		return nil, err
	}

	// Check storage class
	ready, nodeName, err := claimReadyForPopulation(context.TODO(), r.client, pvc)
	if !ready || err != nil {
		return nil, err
	}

	// If PVC' doesn't exist and target PVC is not bound, we should create the PVC' to start the population.
	// We still return nil as we'll get called again once PVC' exists.
	// If target PVC is bound, we don't really need to populate anything.
	if cc.IsUnbound(pvc) {
		if _, err := r.createPVCPrime(pvc, populationSource, nodeName != "", populator.updatePVCForPopulation); err != nil {
			// Event, not Eventf: the error text is data, not a format string,
			// so any '%' it contains must be emitted verbatim.
			r.recorder.Event(pvc, corev1.EventTypeWarning, errCreatingPVCPrime, err.Error())
			return nil, err
		}
	}

	return nil, nil
}
// reconcileCleanup deletes the given PVC'. A nil pvcPrime is a no-op.
// A NotFound response from the delete is treated as success: the PVC' is
// already gone, which is the desired end state, so there is no reason to fail
// and requeue the reconcile. Any other delete error is returned.
func (r *ReconcilerBase) reconcileCleanup(pvcPrime *corev1.PersistentVolumeClaim) (reconcile.Result, error) {
	if pvcPrime == nil {
		return reconcile.Result{}, nil
	}
	if err := r.client.Delete(context.TODO(), pvcPrime); err != nil && !k8serrors.IsNotFound(err) {
		return reconcile.Result{}, err
	}
	return reconcile.Result{}, nil
}