[release-v1.57] Manual backport of "Use direct io with qemu-img convert if pod is OOMKilled" (#3309)

* Use direct io with qemu-img convert if target supports it

For a while now we have been switching between cache=none (direct io) and cache=writeback (page cache)
for qemu-img's writes.

We have settled on cache=writeback for quite some time since https://github.com/kubevirt/containerized-data-importer/pull/1919,
however, this has proven to be problematic;
Our pods live in a constrained memory environment (default limit 600M).
cgroupsv1 compares utilization of page cache vs the host's dirty_ratio.
This means that on a standard system (30% dirty ratio) pages only get forced to disk at 0.3 * HOST_MEM (basically never),
easily triggering OOM on hosts with lots of free memory.

cgroupsv2 does come to the rescue here:
- It considers dirty_ratio against CGROUP_MEM
- Has a new memory.high knob that throttles instead of OOM killing
Sadly, k8s is yet to capitalize on memory.high since this feature is still alpha:
https://kubernetes.io/blog/2023/05/05/qos-memory-resources/
Leaving us with no way to avoid frequent OOMs.

This commit changes the way we write to bypass page cache if the target supports it,
otherwise, fall back to cache=writeback (use page cache).
There have previously been issues where target did not support O_DIRECT. A quick example is tmpfs (ram-based)

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>

* Capitalize on cache mode=trynone if importer is being OOMKilled

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>

---------

Signed-off-by: Alex Kalenyuk <akalenyu@redhat.com>
This commit is contained in:
Alex Kalenyuk 2024-06-10 13:36:27 +03:00 committed by GitHub
parent 154d28f0e5
commit 099057c2ee
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 399 additions and 89 deletions

View File

@ -226,7 +226,7 @@ func importCompleteTerminationMessage(preallocationApplied bool) error {
func newDataProcessor(contentType string, volumeMode v1.PersistentVolumeMode, ds importer.DataSourceInterface, imageSize string, filesystemOverhead float64, preallocation bool) *importer.DataProcessor {
dest := getImporterDestPath(contentType, volumeMode)
processor := importer.NewDataProcessor(ds, dest, common.ImporterDataDir, common.ScratchDataDir, imageSize, filesystemOverhead, preallocation)
processor := importer.NewDataProcessor(ds, dest, common.ImporterDataDir, common.ScratchDataDir, imageSize, filesystemOverhead, preallocation, os.Getenv(common.CacheMode))
return processor
}

View File

@ -121,6 +121,8 @@ fi
if [ -n "$DOCKER_CA_CERT_FILE" ]; then
volumes="$volumes -v ${DOCKER_CA_CERT_FILE}:${DOCKERIZED_CUSTOM_CA_PATH}:ro,z"
fi
# add tmpfs path for testing purposes
volumes="$volumes --tmpfs /mnt/cditmpfs"
# Ensure that a bazel server is running
if [ -z "$(${CDI_CRI} ps --format '{{.Names}}' | grep ${BAZEL_BUILDER_SERVER})" ]; then

View File

@ -125,6 +125,10 @@ const (
ImporterPreviousCheckpoint = "IMPORTER_PREVIOUS_CHECKPOINT"
// ImporterFinalCheckpoint provides a constant to capture our env variable "IMPORTER_FINAL_CHECKPOINT"
ImporterFinalCheckpoint = "IMPORTER_FINAL_CHECKPOINT"
// CacheMode provides a constant to capture our env variable "CACHE_MODE"
CacheMode = "CACHE_MODE"
// CacheModeTryNone provides a constant to capture our env variable value for "CACHE_MODE" that tries O_DIRECT writing if target supports it
CacheModeTryNone = "TRYNONE"
// Preallocation provides a constant to capture out env variable "PREALLOCATION"
Preallocation = "PREALLOCATION"
// ImportProxyHTTP provides a constant to capture our env variable "http_proxy"

View File

@ -145,9 +145,14 @@ const (
// AnnVddkInitImageURL saves a per-DV VDDK image URL on the PVC
AnnVddkInitImageURL = AnnAPIGroup + "/storage.pod.vddk.initimageurl"
// AnnRequiresScratch provides a const for our PVC requires scratch annotation
// AnnRequiresScratch provides a const for our PVC requiring scratch annotation
AnnRequiresScratch = AnnAPIGroup + "/storage.import.requiresScratch"
// AnnRequiresDirectIO provides a const for our PVC requiring direct io annotation (due to OOMs we need to try qemu cache=none)
AnnRequiresDirectIO = AnnAPIGroup + "/storage.import.requiresDirectIo"
// OOMKilledReason provides a value that container runtimes must return in the reason field for an OOMKilled container
OOMKilledReason = "OOMKilled"
// AnnContentType provides a const for the PVC content-type
AnnContentType = AnnAPIGroup + "/storage.contentType"
@ -707,7 +712,7 @@ func GetPriorityClass(pvc *corev1.PersistentVolumeClaim) string {
// ShouldDeletePod returns whether the PVC workload pod should be deleted
func ShouldDeletePod(pvc *corev1.PersistentVolumeClaim) bool {
return pvc.GetAnnotations()[AnnPodRetainAfterCompletion] != "true" || pvc.GetAnnotations()[AnnRequiresScratch] == "true" || pvc.DeletionTimestamp != nil
return pvc.GetAnnotations()[AnnPodRetainAfterCompletion] != "true" || pvc.GetAnnotations()[AnnRequiresScratch] == "true" || pvc.GetAnnotations()[AnnRequiresDirectIO] == "true" || pvc.DeletionTimestamp != nil
}
// AddFinalizer adds a finalizer to a resource

View File

@ -99,6 +99,7 @@ type importPodEnvVar struct {
certConfigMapProxy string
extraHeaders []string
secretExtraHeaders []string
cacheMode string
}
type importerPodArgs struct {
@ -374,16 +375,22 @@ func (r *ImportReconciler) updatePvcFromPod(pvc *corev1.PersistentVolumeClaim, p
setAnnotationsFromPodWithPrefix(anno, pod, cc.AnnRunningCondition)
scratchExitCode := false
if pod.Status.ContainerStatuses != nil &&
pod.Status.ContainerStatuses[0].LastTerminationState.Terminated != nil &&
pod.Status.ContainerStatuses[0].LastTerminationState.Terminated.ExitCode > 0 {
log.Info("Pod termination code", "pod.Name", pod.Name, "ExitCode", pod.Status.ContainerStatuses[0].LastTerminationState.Terminated.ExitCode)
if pod.Status.ContainerStatuses[0].LastTerminationState.Terminated.ExitCode == common.ScratchSpaceNeededExitCode {
log.V(1).Info("Pod requires scratch space, terminating pod, and restarting with scratch space", "pod.Name", pod.Name)
scratchExitCode = true
anno[cc.AnnRequiresScratch] = "true"
} else {
r.recorder.Event(pvc, corev1.EventTypeWarning, ErrImportFailedPVC, pod.Status.ContainerStatuses[0].LastTerminationState.Terminated.Message)
podModificationsNeeded := scratchExitCode
if statuses := pod.Status.ContainerStatuses; len(statuses) > 0 {
if isOOMKilled(statuses[0]) {
log.V(1).Info("Pod died of an OOM, deleting pod, and restarting with qemu cache mode=none if storage supports it", "pod.Name", pod.Name)
podModificationsNeeded = true
anno[cc.AnnRequiresDirectIO] = "true"
}
if terminated := statuses[0].LastTerminationState.Terminated; terminated != nil && terminated.ExitCode > 0 {
if terminated.ExitCode == common.ScratchSpaceNeededExitCode {
log.V(1).Info("Pod requires scratch space, terminating pod, and restarting with scratch space", "pod.Name", pod.Name)
podModificationsNeeded = true
anno[cc.AnnRequiresScratch] = "true"
} else {
log.Info("Pod termination code", "pod.Name", pod.Name, "ExitCode", terminated.ExitCode)
r.recorder.Event(pvc, corev1.EventTypeWarning, ErrImportFailedPVC, terminated.Message)
}
}
}
@ -392,12 +399,18 @@ func (r *ImportReconciler) updatePvcFromPod(pvc *corev1.PersistentVolumeClaim, p
}
anno[cc.AnnImportPod] = string(pod.Name)
if !scratchExitCode {
if !podModificationsNeeded {
// No scratch exit code, update the phase based on the pod. If we do have scratch exit code we don't want to update the
// phase, because the pod might terminate cleanly and mistakenly mark the import complete.
anno[cc.AnnPodPhase] = string(pod.Status.Phase)
}
for _, ev := range pod.Spec.Containers[0].Env {
if ev.Name == common.CacheMode && ev.Value == common.CacheModeTryNone {
anno[cc.AnnRequiresDirectIO] = "false"
}
}
// Check if the POD is waiting for scratch space, if so create some.
if pod.Status.Phase == corev1.PodPending && r.requiresScratchSpace(pvc) {
if err := r.createScratchPvcForPod(pvc, pod); err != nil {
@ -426,8 +439,8 @@ func (r *ImportReconciler) updatePvcFromPod(pvc *corev1.PersistentVolumeClaim, p
log.V(1).Info("Updated PVC", "pvc.anno.Phase", anno[cc.AnnPodPhase], "pvc.anno.Restarts", anno[cc.AnnPodRestarts])
}
if cc.IsPVCComplete(pvc) || scratchExitCode {
if !scratchExitCode {
if cc.IsPVCComplete(pvc) || podModificationsNeeded {
if !podModificationsNeeded {
r.recorder.Event(pvc, corev1.EventTypeNormal, ImportSucceededPVC, "Import Successful")
log.V(1).Info("Import completed successfully")
}
@ -621,6 +634,11 @@ func (r *ImportReconciler) createImportEnvVar(pvc *corev1.PersistentVolumeClaim)
if err != nil {
return nil, err
}
if v, ok := pvc.Annotations[cc.AnnRequiresDirectIO]; ok && v == "true" {
podEnvVar.cacheMode = common.CacheModeTryNone
}
return podEnvVar, nil
}
@ -1299,6 +1317,10 @@ func makeImportEnv(podEnvVar *importPodEnvVar, uid types.UID) []corev1.EnvVar {
Name: common.Preallocation,
Value: strconv.FormatBool(podEnvVar.preallocation),
},
{
Name: common.CacheMode,
Value: podEnvVar.cacheMode,
},
}
if podEnvVar.secretName != "" && podEnvVar.source != cc.SourceGCS {
env = append(env, corev1.EnvVar{
@ -1350,3 +1372,18 @@ func makeImportEnv(podEnvVar *importPodEnvVar, uid types.UID) []corev1.EnvVar {
}
return env
}
// isOOMKilled reports whether the container was OOM killed, checking both its
// current terminated state and its last terminated state (the container may
// already have been restarted by the kubelet when we observe it).
func isOOMKilled(status v1.ContainerStatus) bool {
	states := []*v1.ContainerStateTerminated{
		status.State.Terminated,
		status.LastTerminationState.Terminated,
	}
	for _, terminated := range states {
		if terminated != nil && terminated.Reason == cc.OOMKilledReason {
			return true
		}
	}
	return false
}

View File

@ -447,10 +447,10 @@ var _ = Describe("Update PVC from POD", func() {
})
It("Should create scratch PVC, if pod is pending and PVC is marked with scratch", func() {
scratchPvcName := &corev1.PersistentVolumeClaim{}
scratchPvcName.Name = "testPvc1-scratch"
scratchPvc := &corev1.PersistentVolumeClaim{}
scratchPvc.Name = "testPvc1-scratch"
pvc := cc.CreatePvcInStorageClass("testPvc1", "default", &testStorageClass, map[string]string{cc.AnnEndpoint: testEndPoint, cc.AnnPodPhase: string(corev1.PodPending), cc.AnnRequiresScratch: "true"}, nil, corev1.ClaimBound)
pod := cc.CreateImporterTestPod(pvc, "testPvc1", scratchPvcName)
pod := cc.CreateImporterTestPod(pvc, "testPvc1", scratchPvc)
pod.Status = corev1.PodStatus{
Phase: corev1.PodPending,
ContainerStatuses: []v1.ContainerStatus{
@ -468,7 +468,6 @@ var _ = Describe("Update PVC from POD", func() {
Expect(err).ToNot(HaveOccurred())
By("Checking scratch PVC has been created")
// Once all controllers are converted, we will use the runtime lib client instead of client-go and retrieval needs to change here.
scratchPvc := &v1.PersistentVolumeClaim{}
err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "testPvc1-scratch", Namespace: "default"}, scratchPvc)
Expect(err).ToNot(HaveOccurred())
Expect(scratchPvc.Spec.Resources).To(Equal(pvc.Spec.Resources))
@ -533,9 +532,9 @@ var _ = Describe("Update PVC from POD", func() {
It("Should NOT update phase on PVC, if pod exited with error state that is scratchspace exit", func() {
pvc := cc.CreatePvcInStorageClass("testPvc1", "default", &testStorageClass, map[string]string{cc.AnnEndpoint: testEndPoint, cc.AnnPodPhase: string(corev1.PodRunning)}, nil, corev1.ClaimBound)
scratchPvcName := &corev1.PersistentVolumeClaim{}
scratchPvcName.Name = "testPvc1-scratch"
pod := cc.CreateImporterTestPod(pvc, "testPvc1", scratchPvcName)
scratchPvc := &corev1.PersistentVolumeClaim{}
scratchPvc.Name = "testPvc1-scratch"
pod := cc.CreateImporterTestPod(pvc, "testPvc1", scratchPvc)
pod.Status = corev1.PodStatus{
Phase: corev1.PodPending,
ContainerStatuses: []corev1.ContainerStatus{
@ -634,9 +633,9 @@ var _ = Describe("Update PVC from POD", func() {
It("Should copy VDDK connection information to annotations on PVC", func() {
pvc := cc.CreatePvcInStorageClass("testPvc1", "default", &testStorageClass, map[string]string{cc.AnnEndpoint: testEndPoint, cc.AnnPodPhase: string(corev1.PodRunning), cc.AnnSource: cc.SourceVDDK}, nil, corev1.ClaimBound)
scratchPvcName := &corev1.PersistentVolumeClaim{}
scratchPvcName.Name = "testPvc1-scratch"
pod := cc.CreateImporterTestPod(pvc, "testPvc1", scratchPvcName)
scratchPvc := &corev1.PersistentVolumeClaim{}
scratchPvc.Name = "testPvc1-scratch"
pod := cc.CreateImporterTestPod(pvc, "testPvc1", scratchPvc)
pod.Status = corev1.PodStatus{
Phase: corev1.PodSucceeded,
ContainerStatuses: []corev1.ContainerStatus{
@ -669,9 +668,10 @@ var _ = Describe("Update PVC from POD", func() {
It("Should delete pod for scratch space even if retainAfterCompletion is set", func() {
annotations := map[string]string{
cc.AnnEndpoint: testEndPoint,
cc.AnnImportPod: "testpod",
cc.AnnRequiresScratch: "true",
cc.AnnEndpoint: testEndPoint,
cc.AnnImportPod: "testpod",
// gets added by controller
// cc.AnnRequiresScratch: "true",
cc.AnnSource: cc.SourceVDDK,
cc.AnnPodRetainAfterCompletion: "true",
}
@ -706,6 +706,60 @@ var _ = Describe("Update PVC from POD", func() {
Expect(err.Error()).To(ContainSubstring("\"importer-testPvc1\" not found"))
})
It("Should delete pod in favor of recreating with cache=trynone in case of OOMKilled", func() {
annotations := map[string]string{
cc.AnnEndpoint: testEndPoint,
cc.AnnSource: cc.SourceRegistry,
cc.AnnRegistryImportMethod: string(cdiv1.RegistryPullNode),
}
pvc := cc.CreatePvcInStorageClass("testPvc1", "default", &testStorageClass, annotations, nil, corev1.ClaimPending)
reconciler = createImportReconciler(pvc)
_, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "testPvc1", Namespace: "default"}})
Expect(err).ToNot(HaveOccurred())
// First reconcile decides pods name, second creates it
_, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "testPvc1", Namespace: "default"}})
Expect(err).ToNot(HaveOccurred())
// Simulate OOMKilled on pod
resPod := &corev1.Pod{}
err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "importer-testPvc1", Namespace: "default"}, resPod)
Expect(err).ToNot(HaveOccurred())
resPod.Status = corev1.PodStatus{
Phase: corev1.PodRunning,
ContainerStatuses: []corev1.ContainerStatus{
{
State: v1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{
ExitCode: 137,
// This is an API
// https://github.com/kubernetes/kubernetes/blob/e38531e9a2359c2ba1505cb04d62d6810edc616e/staging/src/k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go#L5822-L5823
Reason: cc.OOMKilledReason,
},
},
},
},
}
err = reconciler.client.Status().Update(context.TODO(), resPod)
Expect(err).ToNot(HaveOccurred())
// Reconcile picks OOMKilled and deletes pod
_, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "testPvc1", Namespace: "default"}})
Expect(err).ToNot(HaveOccurred())
err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "importer-testPvc1", Namespace: "default"}, resPod)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("\"importer-testPvc1\" not found"))
// Next reconcile recreates pod with cache=trynone
_, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "testPvc1", Namespace: "default"}})
Expect(err).ToNot(HaveOccurred())
err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "importer-testPvc1", Namespace: "default"}, resPod)
Expect(err).ToNot(HaveOccurred())
Expect(resPod.Spec.Containers[0].Env).To(ContainElement(
corev1.EnvVar{
Name: common.CacheMode,
Value: common.CacheModeTryNone,
},
))
})
})
var _ = Describe("Create Importer Pod", func() {
@ -1111,6 +1165,10 @@ func createImportTestEnv(podEnvVar *importPodEnvVar, uid string) []corev1.EnvVar
Name: common.Preallocation,
Value: strconv.FormatBool(podEnvVar.preallocation),
},
{
Name: common.CacheMode,
Value: podEnvVar.cacheMode,
},
}
if podEnvVar.secretName != "" {

View File

@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"directio.go",
"filefmt.go",
"nbdkit.go",
"qemu.go",
@ -33,6 +34,7 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//pkg/common:go_default_library",
"//pkg/system:go_default_library",
"//tests/reporters:go_default_library",
"//vendor/github.com/onsi/ginkgo:go_default_library",

80
pkg/image/directio.go Normal file
View File

@ -0,0 +1,80 @@
/*
Copyright 2023 The CDI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package image
import (
"io"
"os"
"syscall"
"github.com/pkg/errors"
"k8s.io/klog/v2"
)
// DirectIOChecker checks if a certain destination supports direct I/O (bypassing page cache)
type DirectIOChecker interface {
	// CheckBlockDevice reports whether the block device at path supports direct I/O.
	CheckBlockDevice(path string) (bool, error)
	// CheckFile reports whether the file at path supports direct I/O; the file
	// may be created temporarily for the probe if it does not exist yet.
	CheckFile(path string) (bool, error)
}

// directIOChecker is the stateless default implementation of DirectIOChecker.
type directIOChecker struct{}

// NewDirectIOChecker returns a new direct I/O checker
func NewDirectIOChecker() DirectIOChecker {
	return &directIOChecker{}
}
// CheckBlockDevice reports whether the block device at path can be opened with
// O_DIRECT. The device is opened read-only; nothing is read or written.
func (c *directIOChecker) CheckBlockDevice(path string) (bool, error) {
	return c.check(path, syscall.O_RDONLY)
}
// CheckFile reports whether the file at path can be opened with O_DIRECT.
// If the file does not exist yet, it is created for the duration of the probe
// and removed again once the check has run.
func (c *directIOChecker) CheckFile(path string) (bool, error) {
	flags := syscall.O_RDONLY
	_, statErr := os.Stat(path)
	if errors.Is(statErr, os.ErrNotExist) {
		// try to create the file and perform the check
		flags |= syscall.O_CREAT
		defer os.Remove(path)
	}
	return c.check(path, flags)
}
// based on https://github.com/kubevirt/kubevirt/blob/c4fc4ab72a868399f5331438f35b8c33e7dd0720/pkg/virt-launcher/virtwrap/converter/converter.go#L346
func (c *directIOChecker) check(path string, flags int) (bool, error) {
// #nosec No risk for path injection as we only open the file, not read from it. The function leaks only whether the directory to `path` exists.
f, err := os.OpenFile(path, flags|syscall.O_DIRECT, 0600)
if err != nil {
// EINVAL is returned if the filesystem does not support the O_DIRECT flag
if perr := (&os.PathError{}); errors.As(err, &perr) && errors.Is(perr, syscall.EINVAL) {
// #nosec No risk for path injection as we only open the file, not read from it. The function leaks only whether the directory to `path` exists.
f, err := os.OpenFile(path, flags & ^syscall.O_DIRECT, 0600)
if err == nil {
defer closeIOAndCheckErr(f)
return false, nil
}
}
return false, err
}
defer closeIOAndCheckErr(f)
return true, nil
}
// closeIOAndCheckErr closes c and logs (but does not propagate) any close
// error; intended for use with defer where the return value is already fixed.
func closeIOAndCheckErr(c io.Closer) {
	if ferr := c.Close(); ferr != nil {
		klog.Errorf("Error when closing file: \n%s\n", ferr)
	}
}

View File

@ -60,7 +60,7 @@ type ImgInfo struct {
// QEMUOperations defines the interface for executing qemu subprocesses
type QEMUOperations interface {
ConvertToRawStream(*url.URL, string, bool) error
ConvertToRawStream(*url.URL, string, bool, string) error
Resize(string, resource.Quantity, bool) error
Info(url *url.URL) (*ImgInfo, error)
Validate(*url.URL, int64) error
@ -94,6 +94,7 @@ var (
{"--preallocation=falloc"},
{"--preallocation=full"},
}
odirectChecker = NewDirectIOChecker()
)
func init() {
@ -114,9 +115,12 @@ func NewQEMUOperations() QEMUOperations {
return &qemuOperations{}
}
func convertToRaw(src, dest string, preallocate bool) error {
args := []string{"convert", "-t", "writeback", "-p", "-O", "raw", src, dest}
var err error
func convertToRaw(src, dest string, preallocate bool, cacheMode string) error {
cacheMode, err := getCacheMode(dest, cacheMode)
if err != nil {
return err
}
args := []string{"convert", "-t", cacheMode, "-p", "-O", "raw", src, dest}
if preallocate {
err = addPreallocation(args, convertPreallocationMethods, func(args []string) ([]byte, error) {
@ -138,11 +142,38 @@ func convertToRaw(src, dest string, preallocate bool) error {
return nil
}
func (o *qemuOperations) ConvertToRawStream(url *url.URL, dest string, preallocate bool) error {
// getCacheMode picks the qemu-img cache mode for writes to path:
// "none" (O_DIRECT, bypasses page cache) when cacheMode requests trying it and
// the destination supports it, otherwise "writeback" (page cache).
func getCacheMode(path string, cacheMode string) (string, error) {
	if cacheMode != common.CacheModeTryNone {
		// Default behavior: write through the page cache.
		return "writeback", nil
	}
	var supportDirectIO bool
	isDevice, err := util.IsDevice(path)
	if err != nil {
		return "", err
	}
	// Block devices and files need different probing (O_CREAT only makes sense for files).
	if isDevice {
		supportDirectIO, err = odirectChecker.CheckBlockDevice(path)
	} else {
		supportDirectIO, err = odirectChecker.CheckFile(path)
	}
	if err != nil {
		return "", err
	}
	if supportDirectIO {
		return "none", nil
	}
	// Fall back to page cache when the target rejects O_DIRECT (e.g. tmpfs).
	return "writeback", nil
}
func (o *qemuOperations) ConvertToRawStream(url *url.URL, dest string, preallocate bool, cacheMode string) error {
if len(url.Scheme) > 0 && url.Scheme != "nbd+unix" {
return fmt.Errorf("not valid schema %s", url.Scheme)
}
return convertToRaw(url.String(), dest, preallocate)
return convertToRaw(url.String(), dest, preallocate, cacheMode)
}
// convertQuantityToQemuSize translates a quantity string into a Qemu compatible string.
@ -242,8 +273,8 @@ func (o *qemuOperations) Validate(url *url.URL, availableSize int64) error {
}
// ConvertToRawStream converts an http accessible image to raw format without locally caching the image
func ConvertToRawStream(url *url.URL, dest string, preallocate bool) error {
return qemuIterface.ConvertToRawStream(url, dest, preallocate)
func ConvertToRawStream(url *url.URL, dest string, preallocate bool, cacheMode string) error {
return qemuIterface.ConvertToRawStream(url, dest, preallocate, cacheMode)
}
// Validate does basic validation of a qemu image
@ -295,9 +326,17 @@ func (o *qemuOperations) CreateBlankImage(dest string, size resource.Quantity, p
return nil
}
func execPreallocation(dest string, bs, count, offset int64) error {
args := []string{"if=/dev/zero", "of=" + dest, fmt.Sprintf("bs=%d", bs), fmt.Sprintf("count=%d", count), fmt.Sprintf("seek=%d", offset), "oflag=seek_bytes"}
_, err := qemuExecFunction(nil, nil, "dd", args...)
func execPreallocationBlock(dest string, bs, count, offset int64) error {
oflag := "oflag=seek_bytes"
supportDirectIO, err := odirectChecker.CheckBlockDevice(dest)
if err != nil {
return err
}
if supportDirectIO {
oflag += ",direct"
}
args := []string{"if=/dev/zero", "of=" + dest, fmt.Sprintf("bs=%d", bs), fmt.Sprintf("count=%d", count), fmt.Sprintf("seek=%d", offset), oflag}
_, err = qemuExecFunction(nil, nil, "dd", args...)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("Could not preallocate blank block volume at %s, running dd for size %d, offset %d", dest, bs*count, offset))
}
@ -314,12 +353,12 @@ func PreallocateBlankBlock(dest string, size resource.Quantity) error {
return errors.Wrap(err, fmt.Sprintf("Could not parse size for preallocating blank block volume at %s with size %s", dest, size.String()))
}
countBlocks, remainder := qemuSize/units.MiB, qemuSize%units.MiB
err = execPreallocation(dest, units.MiB, countBlocks, 0)
err = execPreallocationBlock(dest, units.MiB, countBlocks, 0)
if err != nil {
return err
}
if remainder != 0 {
return execPreallocation(dest, remainder, 1, countBlocks*units.MiB)
return execPreallocationBlock(dest, remainder, 1, countBlocks*units.MiB)
}
return nil
}

View File

@ -19,6 +19,7 @@ import (
"fmt"
"net/url"
"os"
"os/exec"
"path/filepath"
"reflect"
"strings"
@ -33,6 +34,7 @@ import (
dto "github.com/prometheus/client_model/go"
"kubevirt.io/containerized-data-importer/pkg/common"
"kubevirt.io/containerized-data-importer/pkg/system"
"github.com/prometheus/client_golang/prometheus"
@ -133,7 +135,9 @@ var _ = Describe("Convert to Raw", func() {
var tmpDir, destPath string
BeforeEach(func() {
tmpDir, err := os.MkdirTemp(os.TempDir(), "qemutestdest")
var err error
// dest is usually not tmpfs, stay honest in unit tests as well
tmpDir, err = os.MkdirTemp("/var/tmp", "qemutestdest")
Expect(err).NotTo(HaveOccurred())
By("tmpDir: " + tmpDir)
destPath = filepath.Join(tmpDir, "dest")
@ -147,14 +151,14 @@ var _ = Describe("Convert to Raw", func() {
It("should return no error if exec function returns no error", func() {
replaceExecFunction(mockExecFunction("", "", nil, "convert", "-p", "-O", "raw", "source", destPath), func() {
err := convertToRaw("source", destPath, false)
err := convertToRaw("source", destPath, false, "")
Expect(err).NotTo(HaveOccurred())
})
})
It("should return conversion error if exec function returns error", func() {
replaceExecFunction(mockExecFunction("", "exit 1", nil, "convert", "-p", "-O", "raw", "source", destPath), func() {
err := convertToRaw("source", destPath, false)
err := convertToRaw("source", destPath, false, "")
Expect(err).To(HaveOccurred())
Expect(strings.Contains(err.Error(), "could not convert image to raw")).To(BeTrue())
})
@ -164,7 +168,7 @@ var _ = Describe("Convert to Raw", func() {
replaceExecFunction(mockExecFunction("", "", nil, "convert", "-p", "-O", "raw", "/somefile/somewhere", destPath), func() {
ep, err := url.Parse("/somefile/somewhere")
Expect(err).NotTo(HaveOccurred())
err = ConvertToRawStream(ep, destPath, false)
err = ConvertToRawStream(ep, destPath, false, "")
Expect(err).NotTo(HaveOccurred())
})
})
@ -173,7 +177,7 @@ var _ = Describe("Convert to Raw", func() {
replaceExecFunction(mockExecFunctionStrict("", "", nil, "convert", "-o", "preallocation=falloc", "-t", "writeback", "-p", "-O", "raw", "/somefile/somewhere", destPath), func() {
ep, err := url.Parse("/somefile/somewhere")
Expect(err).NotTo(HaveOccurred())
err = ConvertToRawStream(ep, destPath, true)
err = ConvertToRawStream(ep, destPath, true, "")
Expect(err).NotTo(HaveOccurred())
})
})
@ -182,10 +186,53 @@ var _ = Describe("Convert to Raw", func() {
replaceExecFunction(mockExecFunctionStrict("", "", nil, "convert", "-t", "writeback", "-p", "-O", "raw", "/somefile/somewhere", destPath), func() {
ep, err := url.Parse("/somefile/somewhere")
Expect(err).NotTo(HaveOccurred())
err = ConvertToRawStream(ep, destPath, false)
err = ConvertToRawStream(ep, destPath, false, "")
Expect(err).NotTo(HaveOccurred())
})
})
Context("cache mode adjusted according to O_DIRECT support", func() {
var tmpFsDir string
BeforeEach(func() {
var err error
tmpFsDir, err = os.MkdirTemp("/mnt/cditmpfs", "qemutestdestontmpfs")
Expect(err).NotTo(HaveOccurred())
By("tmpFsDir: " + tmpFsDir)
})
AfterEach(func() {
os.RemoveAll(tmpFsDir)
})
It("should use cache=none when destination supports O_DIRECT", func() {
replaceExecFunction(mockExecFunctionStrict("", "", nil, "convert", "-t", "none", "-p", "-O", "raw", "/somefile/somewhere", destPath), func() {
ep, err := url.Parse("/somefile/somewhere")
Expect(err).NotTo(HaveOccurred())
err = ConvertToRawStream(ep, destPath, false, common.CacheModeTryNone)
Expect(err).NotTo(HaveOccurred())
})
})
It("should use cache=writeback when destination does not support O_DIRECT", func() {
// ensure tmpfs destination
out, err := exec.Command("/usr/bin/findmnt", "-T", tmpFsDir, "-o", "FSTYPE").CombinedOutput()
Expect(err).NotTo(HaveOccurred())
Expect(string(out)).To(ContainSubstring("tmpfs"))
tmpFsDestPath := filepath.Join(tmpFsDir, "dest")
_, err = os.Create(tmpFsDestPath)
Expect(err).NotTo(HaveOccurred())
replaceExecFunction(mockExecFunctionStrict("", "", nil, "convert", "-t", "writeback", "-p", "-O", "raw", "/somefile/somewhere", tmpFsDestPath), func() {
ep, err := url.Parse("/somefile/somewhere")
Expect(err).NotTo(HaveOccurred())
err = ConvertToRawStream(ep, tmpFsDestPath, false, common.CacheModeTryNone)
Expect(err).NotTo(HaveOccurred())
})
})
})
})
var _ = Describe("Resize", func() {
@ -298,7 +345,8 @@ var _ = Describe("Create blank image", func() {
var tmpDir, destPath string
BeforeEach(func() {
tmpDir, err := os.MkdirTemp(os.TempDir(), "qemutestdest")
var err error
tmpDir, err = os.MkdirTemp(os.TempDir(), "qemutestdest")
Expect(err).NotTo(HaveOccurred())
By("tmpDir: " + tmpDir)
destPath = filepath.Join(tmpDir, "dest")
@ -353,12 +401,45 @@ var _ = Describe("Create blank image", func() {
})
var _ = Describe("Create preallocated blank block", func() {
var tmpDir, tmpFsDir, destPath string
BeforeEach(func() {
var err error
// dest is usually not tmpfs, stay honest in unit tests as well
tmpDir, err = os.MkdirTemp("/var/tmp", "qemutestdest")
Expect(err).NotTo(HaveOccurred())
By("tmpDir: " + tmpDir)
destPath = filepath.Join(tmpDir, "dest")
_, err = os.Create(destPath)
Expect(err).NotTo(HaveOccurred())
tmpFsDir, err = os.MkdirTemp("/mnt/cditmpfs", "qemutestdestontmpfs")
Expect(err).NotTo(HaveOccurred())
By("tmpFsDir: " + tmpFsDir)
})
AfterEach(func() {
os.RemoveAll(tmpDir)
os.RemoveAll(tmpFsDir)
})
It("Should complete successfully if preallocation succeeds", func() {
quantity, err := resource.ParseQuantity("10Gi")
Expect(err).NotTo(HaveOccurred())
dest := "cdi-block-volume"
replaceExecFunction(mockExecFunction("", "", nil, "if=/dev/zero", "of="+dest, "bs=1048576", "count=10240", "seek=0", "oflag=seek_bytes"), func() {
err = PreallocateBlankBlock(dest, quantity)
replaceExecFunction(mockExecFunction("", "", nil, "if=/dev/zero", "of="+destPath, "bs=1048576", "count=10240", "seek=0", "oflag=seek_bytes,direct"), func() {
err = PreallocateBlankBlock(destPath, quantity)
Expect(err).NotTo(HaveOccurred())
})
})
It("Should complete successfully with tmpfs dest without O_DIRECT if preallocation succeeds", func() {
tmpFsDestPath := filepath.Join(tmpFsDir, "dest")
_, err := os.Create(tmpFsDestPath)
Expect(err).NotTo(HaveOccurred())
quantity, err := resource.ParseQuantity("10Gi")
Expect(err).NotTo(HaveOccurred())
replaceExecFunction(mockExecFunction("", "", nil, "if=/dev/zero", "of="+tmpFsDestPath, "bs=1048576", "count=10240", "seek=0", "oflag=seek_bytes"), func() {
err = PreallocateBlankBlock(tmpFsDestPath, quantity)
Expect(err).NotTo(HaveOccurred())
})
})
@ -366,11 +447,10 @@ var _ = Describe("Create preallocated blank block", func() {
It("Should complete successfully with value not aligned to 1MiB", func() {
quantity, err := resource.ParseQuantity("5243392Ki")
Expect(err).NotTo(HaveOccurred())
dest := "cdi-block-volume"
firstCallArgs := []string{"if=/dev/zero", "of=" + dest, "bs=1048576", "count=5120", "seek=0", "oflag=seek_bytes"}
secondCallArgs := []string{"if=/dev/zero", "of=" + dest, "bs=524288", "count=1", "seek=5368709120", "oflag=seek_bytes"}
firstCallArgs := []string{"if=/dev/zero", "of=" + destPath, "bs=1048576", "count=5120", "seek=0", "oflag=seek_bytes,direct"}
secondCallArgs := []string{"if=/dev/zero", "of=" + destPath, "bs=524288", "count=1", "seek=5368709120", "oflag=seek_bytes,direct"}
replaceExecFunction(mockExecFunctionTwoCalls("", "", nil, firstCallArgs, secondCallArgs), func() {
err = PreallocateBlankBlock(dest, quantity)
err = PreallocateBlankBlock(destPath, quantity)
Expect(err).NotTo(HaveOccurred())
})
})
@ -378,9 +458,8 @@ var _ = Describe("Create preallocated blank block", func() {
It("Should fail if preallocation fails", func() {
quantity, err := resource.ParseQuantity("10Gi")
Expect(err).NotTo(HaveOccurred())
dest := "cdi-block-volume"
replaceExecFunction(mockExecFunction("", "exit 1", nil, "if=/dev/zero", "of="+dest, "bs=1048576", "count=10240", "seek=0", "oflag=seek_bytes"), func() {
err = PreallocateBlankBlock(dest, quantity)
replaceExecFunction(mockExecFunction("", "exit 1", nil, "if=/dev/zero", "of="+destPath, "bs=1048576", "count=10240", "seek=0", "oflag=seek_bytes,direct"), func() {
err = PreallocateBlankBlock(destPath, quantity)
Expect(strings.Contains(err.Error(), "Could not preallocate blank block volume at")).To(BeTrue())
})
})

View File

@ -123,10 +123,13 @@ type DataProcessor struct {
preallocationApplied bool
// phaseExecutors is a mapping from the given processing phase to its execution function. The function returns the next processing phase or error.
phaseExecutors map[ProcessingPhase]func() (ProcessingPhase, error)
// cacheMode is the mode in which we choose the qemu-img cache mode:
// TRY_NONE = bypass page cache if the target supports it, otherwise, fall back to using page cache
cacheMode string
}
// NewDataProcessor create a new instance of a data processor using the passed in data provider.
func NewDataProcessor(dataSource DataSourceInterface, dataFile, dataDir, scratchDataDir, requestImageSize string, filesystemOverhead float64, preallocation bool) *DataProcessor {
func NewDataProcessor(dataSource DataSourceInterface, dataFile, dataDir, scratchDataDir, requestImageSize string, filesystemOverhead float64, preallocation bool, cacheMode string) *DataProcessor {
dp := &DataProcessor{
currentPhase: ProcessingPhaseInfo,
source: dataSource,
@ -136,6 +139,7 @@ func NewDataProcessor(dataSource DataSourceInterface, dataFile, dataDir, scratch
requestImageSize: requestImageSize,
filesystemOverhead: filesystemOverhead,
preallocation: preallocation,
cacheMode: cacheMode,
}
// Calculate available space before doing anything.
dp.availableSpace = dp.calculateTargetSize()
@ -277,7 +281,7 @@ func (dp *DataProcessor) convert(url *url.URL) (ProcessingPhase, error) {
return ProcessingPhaseError, err
}
klog.V(3).Infoln("Converting to Raw")
err = qemuOperations.ConvertToRawStream(url, dp.dataFile, dp.preallocation)
err = qemuOperations.ConvertToRawStream(url, dp.dataFile, dp.preallocation, dp.cacheMode)
if err != nil {
return ProcessingPhaseError, errors.Wrap(err, "Conversion to Raw failed")
}

View File

@ -148,7 +148,7 @@ var _ = Describe("Data Processor", func() {
infoResponse: ProcessingPhaseTransferScratch,
transferResponse: ProcessingPhaseComplete,
}
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false)
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "")
err := dp.ProcessData()
Expect(err).ToNot(HaveOccurred())
Expect(2).To(Equal(len(mdp.calledPhases)))
@ -162,7 +162,7 @@ var _ = Describe("Data Processor", func() {
infoResponse: ProcessingPhaseTransferDataDir,
transferResponse: ProcessingPhaseComplete,
}
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false)
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "")
err := dp.ProcessData()
Expect(err).ToNot(HaveOccurred())
Expect(2).To(Equal(len(mdp.calledPhases)))
@ -176,7 +176,7 @@ var _ = Describe("Data Processor", func() {
infoResponse: ProcessingPhaseTransferScratch,
transferResponse: ProcessingPhaseError,
}
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false)
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "")
err := dp.ProcessData()
Expect(err).To(HaveOccurred())
Expect(2).To(Equal(len(mdp.calledPhases)))
@ -190,7 +190,7 @@ var _ = Describe("Data Processor", func() {
transferResponse: ProcessingPhaseError,
needsScratch: true,
}
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false)
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "")
err := dp.ProcessData()
Expect(err).To(HaveOccurred())
Expect(ErrRequiresScratchSpace).To(Equal(err))
@ -204,7 +204,7 @@ var _ = Describe("Data Processor", func() {
infoResponse: ProcessingPhaseTransferDataFile,
transferResponse: ProcessingPhaseComplete,
}
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false)
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "")
qemuOperations := NewFakeQEMUOperations(nil, nil, fakeInfoOpRetVal{&fakeZeroImageInfo, errors.New("Scratch space required, and none found ")}, nil, nil, nil)
replaceQEMUOperations(qemuOperations, func() {
err := dp.ProcessData()
@ -220,7 +220,7 @@ var _ = Describe("Data Processor", func() {
infoResponse: ProcessingPhaseTransferDataFile,
transferResponse: ProcessingPhaseError,
}
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false)
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "")
qemuOperations := NewQEMUAllErrors()
replaceQEMUOperations(qemuOperations, func() {
err := dp.ProcessData()
@ -235,7 +235,7 @@ var _ = Describe("Data Processor", func() {
mdp := &MockDataProvider{
infoResponse: ProcessingPhase("invalidphase"),
}
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false)
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "")
err := dp.ProcessData()
Expect(err).To(HaveOccurred())
Expect(1).To(Equal(len(mdp.calledPhases)))
@ -254,7 +254,7 @@ var _ = Describe("Data Processor", func() {
transferResponse: ProcessingPhaseConvert,
url: url,
}
dp := NewDataProcessor(mdp, "", "dataDir", tmpDir, "1G", 0.055, false)
dp := NewDataProcessor(mdp, "", "dataDir", tmpDir, "1G", 0.055, false, "")
dp.availableSpace = int64(1536000)
usableSpace := dp.getUsableSpace()
@ -277,7 +277,7 @@ var _ = Describe("Data Processor", func() {
},
fooResponse: ProcessingPhaseComplete,
}
dp := NewDataProcessor(mcdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false)
dp := NewDataProcessor(mcdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "")
dp.RegisterPhaseExecutor(ProcessingPhaseFoo, func() (ProcessingPhase, error) {
return mcdp.Foo()
})
@ -299,7 +299,7 @@ var _ = Describe("Data Processor", func() {
},
fooResponse: ProcessingPhaseInfo,
}
dp := NewDataProcessor(mcdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false)
dp := NewDataProcessor(mcdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "")
dp.RegisterPhaseExecutor(ProcessingPhaseFoo, func() (ProcessingPhase, error) {
return mcdp.Foo()
})
@ -311,7 +311,7 @@ var _ = Describe("Data Processor", func() {
mdp := &MockDataProvider{
infoResponse: "unknown",
}
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false)
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "")
err := dp.ProcessData()
Expect(err).To(HaveOccurred())
})
@ -324,7 +324,7 @@ var _ = Describe("Convert", func() {
mdp := &MockDataProvider{
url: url,
}
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false)
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "")
qemuOperations := NewFakeQEMUOperations(nil, nil, fakeInfoOpRetVal{&fakeZeroImageInfo, errors.New("Scratch space required, and none found ")}, nil, nil, nil)
replaceQEMUOperations(qemuOperations, func() {
nextPhase, err := dp.convert(mdp.GetURL())
@ -339,7 +339,7 @@ var _ = Describe("Convert", func() {
mdp := &MockDataProvider{
url: url,
}
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false)
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "")
qemuOperations := NewFakeQEMUOperations(nil, nil, fakeInfoOpRetVal{&fakeZeroImageInfo, errors.New("Scratch space required, and none found ")}, errors.New("Validation failure"), nil, nil)
replaceQEMUOperations(qemuOperations, func() {
nextPhase, err := dp.convert(mdp.GetURL())
@ -354,7 +354,7 @@ var _ = Describe("Convert", func() {
mdp := &MockDataProvider{
url: url,
}
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false)
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "")
qemuOperations := NewFakeQEMUOperations(errors.New("Conversion failure"), nil, fakeInfoOpRetVal{&fakeZeroImageInfo, errors.New("Scratch space required, and none found ")}, nil, nil, nil)
replaceQEMUOperations(qemuOperations, func() {
nextPhase, err := dp.convert(mdp.GetURL())
@ -373,7 +373,7 @@ var _ = Describe("Resize", func() {
mdp := &MockDataProvider{
url: url,
}
dp := NewDataProcessor(mdp, tempDir, "dataDir", "scratchDataDir", "", 0.055, false)
dp := NewDataProcessor(mdp, tempDir, "dataDir", "scratchDataDir", "", 0.055, false, "")
qemuOperations := NewFakeQEMUOperations(nil, nil, fakeInfoOpRetVal{&fakeZeroImageInfo, nil}, nil, nil, nil)
replaceQEMUOperations(qemuOperations, func() {
nextPhase, err := dp.resize()
@ -395,7 +395,7 @@ var _ = Describe("Resize", func() {
mdp := &MockDataProvider{
url: url,
}
dp := NewDataProcessor(mdp, tempDir, "dataDir", "scratchDataDir", "1G", 0.055, false)
dp := NewDataProcessor(mdp, tempDir, "dataDir", "scratchDataDir", "1G", 0.055, false, "")
qemuOperations := NewFakeQEMUOperations(nil, nil, fakeInfoOpRetVal{&fakeZeroImageInfo, nil}, nil, nil, nil)
replaceQEMUOperations(qemuOperations, func() {
nextPhase, err := dp.resize()
@ -413,7 +413,7 @@ var _ = Describe("Resize", func() {
mdp := &MockDataProvider{
url: url,
}
dp := NewDataProcessor(mdp, tmpDir, tmpDir, "scratchDataDir", "1G", 0.055, false)
dp := NewDataProcessor(mdp, tmpDir, tmpDir, "scratchDataDir", "1G", 0.055, false, "")
qemuOperations := NewFakeQEMUOperations(nil, nil, fakeInfoOpRetVal{&fakeZeroImageInfo, nil}, nil, nil, nil)
replaceQEMUOperations(qemuOperations, func() {
nextPhase, err := dp.resize()
@ -430,7 +430,7 @@ var _ = Describe("Resize", func() {
mdp := &MockDataProvider{
url: url,
}
dp := NewDataProcessor(mdp, "dest", tmpDir, "scratchDataDir", "1G", 0.055, false)
dp := NewDataProcessor(mdp, "dest", tmpDir, "scratchDataDir", "1G", 0.055, false, "")
qemuOperations := NewQEMUAllErrors()
replaceQEMUOperations(qemuOperations, func() {
nextPhase, err := dp.resize()
@ -444,7 +444,7 @@ var _ = Describe("Resize", func() {
return int64(100000), nil
}, func() {
mdp := &MockDataProvider{}
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "", 0.055, false)
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "", 0.055, false, "")
Expect(int64(100000)).To(Equal(dp.calculateTargetSize()))
})
})
@ -454,7 +454,7 @@ var _ = Describe("Resize", func() {
return int64(-1), errors.New("error")
}, func() {
mdp := &MockDataProvider{}
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "", 0.055, false)
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "", 0.055, false, "")
// We just log the error if one happens.
Expect(int64(-1)).To(Equal(dp.calculateTargetSize()))
@ -485,7 +485,7 @@ var _ = Describe("ResizeImage", func() {
var _ = Describe("DataProcessorResume", func() {
It("Should fail with an error if the data provider cannot resume", func() {
mdp := &MockDataProvider{}
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "", 0.055, false)
dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "", 0.055, false, "")
err := dp.ProcessDataResume()
Expect(err).To(HaveOccurred())
})
@ -494,7 +494,7 @@ var _ = Describe("DataProcessorResume", func() {
amdp := &MockAsyncDataProvider{
ResumePhase: ProcessingPhaseComplete,
}
dp := NewDataProcessor(amdp, "dest", "dataDir", "scratchDataDir", "", 0.055, false)
dp := NewDataProcessor(amdp, "dest", "dataDir", "scratchDataDir", "", 0.055, false, "")
err := dp.ProcessDataResume()
Expect(err).ToNot(HaveOccurred())
})
@ -515,7 +515,7 @@ var _ = Describe("MergeDelta", func() {
url: url,
}
dp := NewDataProcessor(mdp, expectedBackingFile, "dataDir", "scratchDataDir", "", 0.055, false)
dp := NewDataProcessor(mdp, expectedBackingFile, "dataDir", "scratchDataDir", "", 0.055, false, "")
err := errors.New("this operation should not be called")
info := &image.ImgInfo{
Format: "",
@ -560,7 +560,7 @@ func NewFakeQEMUOperations(e2, e3 error, ret4 fakeInfoOpRetVal, e5 error, e6 err
return &fakeQEMUOperations{e2, e3, ret4, e5, e6, targetResize}
}
func (o *fakeQEMUOperations) ConvertToRawStream(*url.URL, string, bool) error {
func (o *fakeQEMUOperations) ConvertToRawStream(*url.URL, string, bool, string) error {
return o.e2
}

View File

@ -439,7 +439,7 @@ func newAsyncUploadStreamProcessor(stream io.ReadCloser, dest, imageSize string,
}
uds := importer.NewAsyncUploadDataSource(newContentReader(stream, sourceContentType))
processor := importer.NewDataProcessor(uds, dest, common.ImporterVolumePath, common.ScratchDataDir, imageSize, filesystemOverhead, preallocation)
processor := importer.NewDataProcessor(uds, dest, common.ImporterVolumePath, common.ScratchDataDir, imageSize, filesystemOverhead, preallocation, "")
return processor, processor.ProcessDataWithPause()
}
@ -450,7 +450,7 @@ func newUploadStreamProcessor(stream io.ReadCloser, dest, imageSize string, file
// Clone block device to block device or file system
uds := importer.NewUploadDataSource(newContentReader(stream, sourceContentType), dvContentType)
processor := importer.NewDataProcessor(uds, dest, common.ImporterVolumePath, common.ScratchDataDir, imageSize, filesystemOverhead, preallocation)
processor := importer.NewDataProcessor(uds, dest, common.ImporterVolumePath, common.ScratchDataDir, imageSize, filesystemOverhead, preallocation, "")
err := processor.ProcessData()
return processor.PreallocationApplied(), err
}

View File

@ -154,11 +154,11 @@ func (amd *AsyncMockDataSource) GetResumePhase() importer.ProcessingPhase {
}
func saveAsyncProcessorSuccess(stream io.ReadCloser, dest, imageSize string, filesystemOverhead float64, preallocation bool, contentType string) (*importer.DataProcessor, error) {
return importer.NewDataProcessor(&AsyncMockDataSource{}, "", "", "", "", 0.055, false), nil
return importer.NewDataProcessor(&AsyncMockDataSource{}, "", "", "", "", 0.055, false, ""), nil
}
func saveAsyncProcessorFailure(stream io.ReadCloser, dest, imageSize string, filesystemOverhead float64, preallocation bool, contentType string) (*importer.DataProcessor, error) {
return importer.NewDataProcessor(&AsyncMockDataSource{}, "", "", "", "", 0.055, false), fmt.Errorf("Error using datastream")
return importer.NewDataProcessor(&AsyncMockDataSource{}, "", "", "", "", 0.055, false, ""), fmt.Errorf("Error using datastream")
}
func withAsyncProcessorSuccess(f func()) {