containerized-data-importer/pkg/controller/transfer/controller.go
Michael Henriksen cc8dbc3bae
increase controller cuncurrency and cpu requests (#2862)
This commit ups the cpu request for for all our installed compopents
(cdi-deployment, cdi-apiserver, cdi-uploadproxy, cdi-operator)
for 10m (1% of a core) to 100m (10% of a core).
The main driver of this is BZ: 2216038.
Without this change, it is pretty easy to create a large number of
concurrent clone operations and get token timeout errors.
Upping resource requests and concurrency addresses the issue
in a very direct way.

Signed-off-by: Michael Henriksen <mhenriks@redhat.com>
2023-08-24 02:48:34 +02:00

450 lines
12 KiB
Go

/*
Copyright 2018 The CDI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package transfer
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
)
const (
// AnnObjectTransferName holds the name of related ObjectTransfer resource
AnnObjectTransferName = "cdi.kubevirt.io/objectTransferName"
objectTransferFinalizer = "cdi.kubevirt.io/objectTransfer"
defaultRequeue = 2 * time.Second
)
type transferHandler interface {
ReconcilePending(*cdiv1.ObjectTransfer) (time.Duration, error)
ReconcileRunning(*cdiv1.ObjectTransfer) (time.Duration, error)
}
type objectTransferHandler struct {
reconciler *ObjectTransferReconciler
}
// ObjectTransferReconciler members
type ObjectTransferReconciler struct {
Client client.Client
Recorder record.EventRecorder
Scheme *runtime.Scheme
Log logr.Logger
InstallerLabels map[string]string
}
func getTransferTargetName(ot *cdiv1.ObjectTransfer) string {
if ot.Spec.Target.Name != nil {
return *ot.Spec.Target.Name
}
return ot.Spec.Source.Name
}
func getTransferTargetNamespace(ot *cdiv1.ObjectTransfer) string {
if ot.Spec.Target.Namespace != nil {
return *ot.Spec.Target.Namespace
}
return ot.Spec.Source.Namespace
}
// NewObjectTransferController creates a new instance of the ObjectTransfer controller.
func NewObjectTransferController(mgr manager.Manager, log logr.Logger, installerLabels map[string]string) (controller.Controller, error) {
name := "transfer-controller"
client := mgr.GetClient()
reconciler := &ObjectTransferReconciler{
Client: client,
Scheme: mgr.GetScheme(),
Log: log.WithName(name),
Recorder: mgr.GetEventRecorderFor(name),
InstallerLabels: installerLabels,
}
ctrl, err := controller.New(name, mgr, controller.Options{
MaxConcurrentReconciles: 3,
Reconciler: reconciler,
})
if err != nil {
return nil, err
}
if err := addObjectTransferControllerWatches(mgr, ctrl); err != nil {
return nil, err
}
return ctrl, nil
}
func (r *ObjectTransferReconciler) logger(ot *cdiv1.ObjectTransfer) logr.Logger {
return r.Log.WithValues(
"namespace", ot.Namespace,
"name", ot.Name,
"kind", ot.Spec.Source.Kind,
"sName", ot.Spec.Source.Name,
"tNamespace", getTransferTargetNamespace(ot),
"tName", getTransferTargetName(ot),
)
}
// Reconcile the reconcile loop for the data volumes.
func (r *ObjectTransferReconciler) Reconcile(_ context.Context, req reconcile.Request) (reconcile.Result, error) {
ot := &cdiv1.ObjectTransfer{}
if err := r.Client.Get(context.TODO(), req.NamespacedName, ot); err != nil {
if errors.IsNotFound(err) {
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
r.logger(ot).V(1).Info("Handling request")
switch ot.Status.Phase {
case cdiv1.ObjectTransferEmpty:
return r.reconcileEmpty(ot)
case cdiv1.ObjectTransferPending:
handler, err := r.getHandler(ot)
if err != nil {
return reconcile.Result{}, err
}
if ot.DeletionTimestamp != nil {
return r.reconcileCleanup(ot)
}
if requeue, err := handler.ReconcilePending(ot); requeue > 0 || err != nil {
return reconcile.Result{RequeueAfter: requeue}, err
}
case cdiv1.ObjectTransferRunning, cdiv1.ObjectTransferError:
if ot.DeletionTimestamp != nil && ot.Status.Phase == cdiv1.ObjectTransferError {
return r.reconcileCleanup(ot)
}
handler, err := r.getHandler(ot)
if err != nil {
return reconcile.Result{}, err
}
if requeue, err := handler.ReconcileRunning(ot); requeue > 0 || err != nil {
return reconcile.Result{RequeueAfter: requeue}, err
}
case cdiv1.ObjectTransferComplete:
return r.reconcileCleanup(ot)
}
return reconcile.Result{}, nil
}
func (r *ObjectTransferReconciler) reconcileEmpty(ot *cdiv1.ObjectTransfer) (reconcile.Result, error) {
if !controllerutil.ContainsFinalizer(ot, objectTransferFinalizer) {
controllerutil.AddFinalizer(ot, objectTransferFinalizer)
if err := r.updateResource(ot, ot); err != nil {
return reconcile.Result{}, err
}
}
ot.Status.Phase = cdiv1.ObjectTransferPending
if err := r.setAndUpdateCompleteCondition(ot, corev1.ConditionFalse, "Initializing", ""); err != nil {
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}
func (r *ObjectTransferReconciler) reconcileCleanup(ot *cdiv1.ObjectTransfer) (reconcile.Result, error) {
if controllerutil.ContainsFinalizer(ot, objectTransferFinalizer) {
controllerutil.RemoveFinalizer(ot, objectTransferFinalizer)
return reconcile.Result{}, r.updateResource(ot, ot)
}
return reconcile.Result{}, nil
}
func (r *ObjectTransferReconciler) getHandler(ot *cdiv1.ObjectTransfer) (transferHandler, error) {
switch strings.ToLower(ot.Spec.Source.Kind) {
case "datavolume":
return &dataVolumeTransferHandler{
objectTransferHandler: objectTransferHandler{
reconciler: r,
},
}, nil
case "persistentvolumeclaim":
return &pvcTransferHandler{
objectTransferHandler: objectTransferHandler{
reconciler: r,
},
}, nil
}
return nil, fmt.Errorf("invalid kind %q", ot.Spec.Source.Kind)
}
func (r *ObjectTransferReconciler) getResource(ns, name string, obj client.Object) (bool, error) {
if err := r.Client.Get(context.TODO(), types.NamespacedName{Namespace: ns, Name: name}, obj); err != nil {
if errors.IsNotFound(err) {
return false, nil
}
return false, err
}
return true, nil
}
func (r *ObjectTransferReconciler) updateResource(ot *cdiv1.ObjectTransfer, obj client.Object) error {
log := r.logger(ot)
log.V(1).Info("Updating resource", "obj", obj)
if err := r.Client.Update(context.TODO(), obj); err != nil {
log.Error(err, "Update error")
return err
}
return nil
}
func (r *ObjectTransferReconciler) updateResourceStatus(ot *cdiv1.ObjectTransfer, obj client.Object) error {
log := r.logger(ot)
log.V(1).Info("Updating resource status", "obj", obj)
if err := r.Client.Status().Update(context.TODO(), obj); err != nil {
log.Error(err, "Update status error")
return err
}
return nil
}
func (r *ObjectTransferReconciler) getSourceResource(ot *cdiv1.ObjectTransfer, obj client.Object) (bool, error) {
return r.getResource(ot.Spec.Source.Namespace, ot.Spec.Source.Name, obj)
}
func (r *ObjectTransferReconciler) getTargetResource(ot *cdiv1.ObjectTransfer, obj client.Object) (bool, error) {
return r.getResource(getTransferTargetNamespace(ot), getTransferTargetName(ot), obj)
}
func (r *ObjectTransferReconciler) getCondition(ot *cdiv1.ObjectTransfer, t cdiv1.ObjectTransferConditionType) *cdiv1.ObjectTransferCondition {
for i := range ot.Status.Conditions {
c := &ot.Status.Conditions[i]
if c.Type == t {
return c
}
}
return nil
}
func (r *ObjectTransferReconciler) setAndUpdateCompleteCondition(ot *cdiv1.ObjectTransfer, status corev1.ConditionStatus, message, reason string) error {
return r.setAndUpdateCondition(ot, cdiv1.ObjectTransferConditionComplete, status, message, reason)
}
func (r *ObjectTransferReconciler) setCompleteConditionError(ot *cdiv1.ObjectTransfer, lastError error) error {
if ot.Status.Phase == cdiv1.ObjectTransferRunning {
ot.Status.Phase = cdiv1.ObjectTransferError
}
if err := r.setAndUpdateCompleteCondition(ot, corev1.ConditionFalse, "Error", lastError.Error()); err != nil {
return err
}
return lastError
}
func (r *ObjectTransferReconciler) setCompleteConditionRunning(ot *cdiv1.ObjectTransfer) error {
ot.Status.Phase = cdiv1.ObjectTransferRunning
return r.setAndUpdateCompleteCondition(ot, corev1.ConditionFalse, "Running", "")
}
func (r *ObjectTransferReconciler) setAndUpdateCondition(ot *cdiv1.ObjectTransfer, t cdiv1.ObjectTransferConditionType, status corev1.ConditionStatus, message, reason string) error {
r.logger(ot).V(1).Info("Updating condition", "type", t, "status", status, "message", message, "reason", reason)
ot2 := &cdiv1.ObjectTransfer{}
if _, err := r.getResource("", ot.Name, ot2); err != nil {
return err
}
if r.setCondition(ot, t, status, message, reason) || !apiequality.Semantic.DeepEqual(ot.Status, ot2.Status) {
return r.updateResourceStatus(ot, ot)
}
return nil
}
func (r *ObjectTransferReconciler) setCondition(ot *cdiv1.ObjectTransfer, t cdiv1.ObjectTransferConditionType, status corev1.ConditionStatus, message, reason string) bool {
updated := false
cond := r.getCondition(ot, t)
if cond == nil {
ot.Status.Conditions = append(ot.Status.Conditions, cdiv1.ObjectTransferCondition{Type: t})
cond = &ot.Status.Conditions[len(ot.Status.Conditions)-1]
updated = true
}
if cond.Status != status {
cond.Status = status
updated = true
}
if cond.Message != message {
cond.Message = message
updated = true
}
if cond.Reason != reason {
cond.Reason = reason
updated = true
}
if updated {
cond.LastTransitionTime = metav1.Now()
cond.LastHeartbeatTime = cond.LastTransitionTime
}
return updated
}
func (r *ObjectTransferReconciler) createObjectTransferTarget(ot *cdiv1.ObjectTransfer, obj client.Object, mutateFn func(client.Object)) error {
s, ok := ot.Status.Data["source"]
if !ok {
return fmt.Errorf("source spec missing")
}
if err := json.Unmarshal([]byte(s), obj); err != nil {
return err
}
metaObj, err := meta.Accessor(obj)
if err != nil {
return err
}
metaObj.SetNamespace(getTransferTargetNamespace(ot))
metaObj.SetName(getTransferTargetName(ot))
metaObj.SetGenerateName("")
metaObj.SetUID("")
metaObj.SetResourceVersion("")
metaObj.SetGeneration(0)
metaObj.SetSelfLink("")
metaObj.SetCreationTimestamp(metav1.Time{})
metaObj.SetDeletionTimestamp(nil)
metaObj.SetManagedFields(nil)
if ot.Spec.Target.Namespace != nil && *ot.Spec.Target.Namespace != ot.Spec.Source.Namespace {
metaObj.SetOwnerReferences(nil)
}
delete(metaObj.GetAnnotations(), AnnObjectTransferName)
if mutateFn != nil {
mutateFn(obj)
}
return r.Client.Create(context.TODO(), obj)
}
func (r *ObjectTransferReconciler) pendingHelper(ot *cdiv1.ObjectTransfer, obj client.Object, data map[string]string) error {
metaObj, err := meta.Accessor(obj)
if err != nil {
return r.setCompleteConditionError(ot, err)
}
if !r.hasRequiredAnnotations(ot, metaObj) {
if err := r.setAndUpdateCompleteCondition(ot, corev1.ConditionFalse, "Required annotation missing", ""); err != nil {
return err
}
return nil
}
v, ok := metaObj.GetAnnotations()[AnnObjectTransferName]
if ok && v != ot.Name {
if err := r.setAndUpdateCompleteCondition(ot, corev1.ConditionFalse, "Source in use by another transfer", v); err != nil {
return err
}
return nil
}
if !ok {
if metaObj.GetAnnotations() == nil {
metaObj.SetAnnotations(make(map[string]string))
}
metaObj.GetAnnotations()[AnnObjectTransferName] = ot.Name
if err := r.updateResource(ot, obj); err != nil {
return r.setCompleteConditionError(ot, err)
}
if err := r.setAndUpdateCompleteCondition(ot, corev1.ConditionFalse, "Pending", ""); err != nil {
return err
}
}
bs, err := json.Marshal(obj)
if err != nil {
return r.setCompleteConditionError(ot, err)
}
source := string(bs)
ot.Status.Data = map[string]string{
"source": source,
}
for k, v := range data {
ot.Status.Data[k] = v
}
ot.Status.Phase = cdiv1.ObjectTransferRunning
if err := r.setCompleteConditionRunning(ot); err != nil {
return err
}
return nil
}
func (r *ObjectTransferReconciler) hasRequiredAnnotations(ot *cdiv1.ObjectTransfer, obj metav1.Object) bool {
for rk, rv := range ot.Spec.Source.RequiredAnnotations {
if v, ok := obj.GetAnnotations()[rk]; !ok || v != rv {
return false
}
}
return true
}