containerized-data-importer/pkg/controller/populators/upload-populator.go
Shelly Kagan e6c835c7c1
Upload populator (#2678)
* Create CRD for volumeuploadsource populator

This CRD will be used in the DataSourceRef on PVCs
to trigger population that uploads to the volume.
This will be performed by the upload populator
that will be added in future commits.

Signed-off-by: Shelly Kagan <skagan@redhat.com>

* Create upload populator controller

The upload populator controller can be used
standalone without the need of datavolume.
It reconciles pvc with upload dataSourceRef
and uses the populators API to populate the pvc
with an upload command.
The controller creates pvc' with upload
annotation. After the upload completes it
rebinds the pv to the original target pvc and
deletes pvc prime.
Eventually we get a bound PVC which is already
populated.

Signed-off-by: Shelly Kagan <skagan@redhat.com>

* Adjust upload-proxy to handle upload population

In case of pvc with datasourceref to upload population
we should create the url to the upload server with the
pvc' name.

Signed-off-by: Shelly Kagan <skagan@redhat.com>

* Add tests for upload population

Signed-off-by: Shelly Kagan <skagan@redhat.com>

* Add unit tests for upload populator

Signed-off-by: Shelly Kagan <skagan@redhat.com>

* Add preallocation to volumeuploadsource crd

Also some other small fixes

Signed-off-by: Shelly Kagan <skagan@redhat.com>

---------

Signed-off-by: Shelly Kagan <skagan@redhat.com>
2023-05-04 08:24:42 +02:00

172 lines
5.8 KiB
Go

/*
Copyright 2023 The CDI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package populators
import (
"context"
"fmt"
"strconv"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
cdiv1 "kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1"
cc "kubevirt.io/containerized-data-importer/pkg/controller/common"
featuregates "kubevirt.io/containerized-data-importer/pkg/feature-gates"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)
// Controller name, event reasons and event message formats used by the
// upload populator.
const (
// uploadPopulatorName is passed as the controller name, the logger name and
// the event recorder name when the upload populator is constructed.
uploadPopulatorName = "upload-populator"
// errUploadFailed is the event reason used for the warning event emitted
// when the upload pod phase is Failed.
errUploadFailed = "UploadFailed"
// uploadSucceeded is the event reason used for the normal event emitted
// once the upload pod phase is Succeeded and the PV has been rebound.
uploadSucceeded = "UploadSucceeded"
// messageUploadFailed is the format string of the upload-failed event
// message; %s is filled with the target PVC name.
messageUploadFailed = "Upload into %s failed"
// messageUploadSucceeded is the format string of the upload-succeeded event
// message; %s is the single string operand it expects.
messageUploadSucceeded = "Successfully uploaded into %s"
)
// UploadPopulatorReconciler reconciles PVCs whose DataSourceRef is of
// VolumeUploadSource kind. It embeds ReconcilerBase (client, logger, event
// recorder, feature gates, installer labels and source kind are set on it by
// NewUploadPopulator) and adds the upload-specific methods defined in this
// file.
type UploadPopulatorReconciler struct {
ReconcilerBase
}
// NewUploadPopulator creates the upload populator controller.
//
// It builds an UploadPopulatorReconciler from the manager's client, scheme
// and event recorder, creates a controller named uploadPopulatorName via
// controller.New, and installs the common populator watches for the
// VolumeUploadSource kind. The ctx parameter is not used by this function.
// Any error from controller creation or from adding the watches is returned
// unchanged together with a nil controller.
func NewUploadPopulator(
	ctx context.Context,
	mgr manager.Manager,
	log logr.Logger,
	installerLabels map[string]string,
) (controller.Controller, error) {
	// Named k8sClient so the imported "client" package is not shadowed.
	k8sClient := mgr.GetClient()

	options := controller.Options{
		Reconciler: &UploadPopulatorReconciler{
			ReconcilerBase: ReconcilerBase{
				client:          k8sClient,
				scheme:          mgr.GetScheme(),
				log:             log.WithName(uploadPopulatorName),
				recorder:        mgr.GetEventRecorderFor(uploadPopulatorName),
				featureGates:    featuregates.NewFeatureGates(k8sClient),
				installerLabels: installerLabels,
				sourceKind:      cdiv1.VolumeUploadSourceRef,
			},
		},
	}

	ctrl, err := controller.New(uploadPopulatorName, mgr, options)
	if err != nil {
		return nil, err
	}

	err = addCommonPopulatorsWatches(mgr, ctrl, log, cdiv1.VolumeUploadSourceRef, &cdiv1.VolumeUploadSource{})
	if err != nil {
		return nil, err
	}

	return ctrl, nil
}
// Reconcile is the reconcile loop entry point for a PVC with a DataSourceRef
// of VolumeUploadSource kind. It attaches the request's namespaced name to
// the logger and hands the request, together with this reconciler, to the
// reconcile method. The incoming context is ignored.
func (r *UploadPopulatorReconciler) Reconcile(_ context.Context, req reconcile.Request) (reconcile.Result, error) {
	pvcLog := r.log.WithValues("PVC", req.NamespacedName)
	pvcLog.V(1).Info("reconciling Upload Source PVCs")

	return r.reconcile(req, r, pvcLog)
}
// getPopulationSource is the upload-specific implementation of
// getPopulationSource. It fetches the VolumeUploadSource identified by
// namespace and name.
//
// A missing source is not treated as an error: (nil, nil) is returned in
// that case. Any other lookup error is returned as is.
func (r *UploadPopulatorReconciler) getPopulationSource(namespace, name string) (client.Object, error) {
	key := types.NamespacedName{Namespace: namespace, Name: name}
	found := &cdiv1.VolumeUploadSource{}

	err := r.client.Get(context.TODO(), key, found)
	switch {
	case err == nil:
		return found, nil
	case k8serrors.IsNotFound(err):
		// reconcile will be triggered once created
		return nil, nil
	default:
		return nil, err
	}
}
// updatePVCForPopulation is the upload-specific implementation of
// updatePVCForPopulation. It annotates pvc so that it is handled as an
// upload request:
//   - cc.AnnUploadRequest is set (to the empty string),
//   - cc.AnnContentType is derived from the source's Spec.ContentType,
//   - cc.AnnPopulatorKind is set to the VolumeUploadSource kind,
//   - cc.AnnPreallocationRequested is derived from the source's
//     Spec.Preallocation via cc.GetPreallocation.
//
// source must be a *cdiv1.VolumeUploadSource; any other type makes the type
// assertion panic, since this function has no error return to report it.
func (r *UploadPopulatorReconciler) updatePVCForPopulation(pvc *corev1.PersistentVolumeClaim, source client.Object) {
	// Writing to a nil map panics, so make sure the annotation map exists
	// before adding entries to it.
	if pvc.Annotations == nil {
		pvc.Annotations = make(map[string]string)
	}
	pvc.Annotations[cc.AnnUploadRequest] = ""
	uploadSource := source.(*cdiv1.VolumeUploadSource)
	pvc.Annotations[cc.AnnContentType] = cc.GetContentType(string(uploadSource.Spec.ContentType))
	pvc.Annotations[cc.AnnPopulatorKind] = cdiv1.VolumeUploadSourceRef
	pvc.Annotations[cc.AnnPreallocationRequested] = strconv.FormatBool(cc.GetPreallocation(r.client, uploadSource.Spec.Preallocation))
}
// updatePVCPrimeNameAnnotation stores pvcPrimeName on pvc under the
// AnnPVCPrimeName annotation and persists the PVC through the client.
//
// If the annotation key is already present the PVC is left untouched and
// (false, nil) is returned. Otherwise the annotation is set (creating the
// annotation map when it is nil) and the PVC is updated; the result is
// (true, nil) on success and (false, err) when the update fails.
func (r *UploadPopulatorReconciler) updatePVCPrimeNameAnnotation(pvc *corev1.PersistentVolumeClaim, pvcPrimeName string) (bool, error) {
	_, alreadySet := pvc.Annotations[AnnPVCPrimeName]
	if alreadySet {
		return false, nil
	}

	if pvc.Annotations == nil {
		pvc.Annotations = map[string]string{}
	}
	pvc.Annotations[AnnPVCPrimeName] = pvcPrimeName

	err := r.client.Update(context.TODO(), pvc)
	return err == nil, err
}
// removePVCPrimeNameAnnotation drops the AnnPVCPrimeName annotation from pvc
// and persists the PVC through the client. When the annotation is not
// present nothing is changed and nil is returned; otherwise the result of
// the update call is returned.
func (r *UploadPopulatorReconciler) removePVCPrimeNameAnnotation(pvc *corev1.PersistentVolumeClaim) error {
	if _, present := pvc.Annotations[AnnPVCPrimeName]; present {
		delete(pvc.Annotations, AnnPVCPrimeName)
		return r.client.Update(context.TODO(), pvc)
	}
	return nil
}
// reconcileTargetPVC advances the upload population of the target pvc based
// on the state of its pvcPrime.
//
// It first makes sure the target PVC carries the AnnPVCPrimeName annotation;
// if the annotation had to be added (or adding it failed) it returns right
// away. It then looks at the pod phase annotation (cc.AnnPodPhase) on
// pvcPrime:
//   - Failed: a warning event is recorded on pvcPrime.
//   - Succeeded: the AnnPVCPrimeName annotation is removed from the target
//     PVC, the PV is rebound from pvcPrime to the target PVC, and a normal
//     event is recorded on the target PVC.
//   - any other value: nothing is done.
//
// An empty reconcile.Result is always returned; the error is non-nil when
// updating the annotation or rebinding the PV fails.
func (r *UploadPopulatorReconciler) reconcileTargetPVC(pvc, pvcPrime *corev1.PersistentVolumeClaim) (reconcile.Result, error) {
	updated, err := r.updatePVCPrimeNameAnnotation(pvc, pvcPrime.Name)
	if updated || err != nil {
		// wait for the annotation to be updated
		return reconcile.Result{}, err
	}

	// Wait until the upload completes
	switch pvcPrime.Annotations[cc.AnnPodPhase] {
	case string(corev1.PodFailed):
		// We'll get called later once it succeeds.
		// The message is already formatted, so use Event rather than Eventf
		// to avoid interpreting it as a format string a second time.
		r.recorder.Event(pvcPrime, corev1.EventTypeWarning, errUploadFailed, fmt.Sprintf(messageUploadFailed, pvc.Name))
	case string(corev1.PodSucceeded):
		if err := r.removePVCPrimeNameAnnotation(pvc); err != nil {
			return reconcile.Result{}, err
		}
		// Once the upload has succeeded, we rebind the PV from PVC' to target PVC
		if err := r.rebindPV(pvc, pvcPrime); err != nil {
			return reconcile.Result{}, err
		}
		// Format with the PVC name; passing the PVC object itself to %s
		// would dump the whole struct into the event message.
		r.recorder.Event(pvc, corev1.EventTypeNormal, uploadSucceeded, fmt.Sprintf(messageUploadSucceeded, pvc.Name))
	}
	return reconcile.Result{}, nil
}