diff --git a/cmd/cdi-importer/importer.go b/cmd/cdi-importer/importer.go index 54eaf491d..f61c22156 100644 --- a/cmd/cdi-importer/importer.go +++ b/cmd/cdi-importer/importer.go @@ -226,7 +226,7 @@ func importCompleteTerminationMessage(preallocationApplied bool) error { func newDataProcessor(contentType string, volumeMode v1.PersistentVolumeMode, ds importer.DataSourceInterface, imageSize string, filesystemOverhead float64, preallocation bool) *importer.DataProcessor { dest := getImporterDestPath(contentType, volumeMode) - processor := importer.NewDataProcessor(ds, dest, common.ImporterDataDir, common.ScratchDataDir, imageSize, filesystemOverhead, preallocation) + processor := importer.NewDataProcessor(ds, dest, common.ImporterDataDir, common.ScratchDataDir, imageSize, filesystemOverhead, preallocation, os.Getenv(common.CacheMode)) return processor } diff --git a/hack/build/bazel-docker.sh b/hack/build/bazel-docker.sh index 840393300..5925adef5 100755 --- a/hack/build/bazel-docker.sh +++ b/hack/build/bazel-docker.sh @@ -121,6 +121,8 @@ fi if [ -n "$DOCKER_CA_CERT_FILE" ]; then volumes="$volumes -v ${DOCKER_CA_CERT_FILE}:${DOCKERIZED_CUSTOM_CA_PATH}:ro,z" fi +# add tmpfs path for testing purposes +volumes="$volumes --tmpfs /mnt/cditmpfs" # Ensure that a bazel server is running if [ -z "$(${CDI_CRI} ps --format '{{.Names}}' | grep ${BAZEL_BUILDER_SERVER})" ]; then diff --git a/pkg/common/common.go b/pkg/common/common.go index d11adb987..1e051dd02 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -125,6 +125,10 @@ const ( ImporterPreviousCheckpoint = "IMPORTER_PREVIOUS_CHECKPOINT" // ImporterFinalCheckpoint provides a constant to capture our env variable "IMPORTER_FINAL_CHECKPOINT" ImporterFinalCheckpoint = "IMPORTER_FINAL_CHECKPOINT" + // CacheMode provides a constant to capture our env variable "CACHE_MODE" + CacheMode = "CACHE_MODE" + // CacheModeTryNone provides a constant to capture our env variable value for "CACHE_MODE" that tries O_DIRECT writing if target supports it + CacheModeTryNone = "TRYNONE" // Preallocation provides a constant to capture out env variable "PREALLOCATION" Preallocation = "PREALLOCATION" // ImportProxyHTTP provides a constant to capture our env variable "http_proxy" diff --git a/pkg/controller/common/util.go b/pkg/controller/common/util.go index 1ec2c2926..c40312e6c 100644 --- a/pkg/controller/common/util.go +++ b/pkg/controller/common/util.go @@ -145,9 +145,14 @@ const ( // AnnVddkInitImageURL saves a per-DV VDDK image URL on the PVC AnnVddkInitImageURL = AnnAPIGroup + "/storage.pod.vddk.initimageurl" - // AnnRequiresScratch provides a const for our PVC requires scratch annotation + // AnnRequiresScratch provides a const for our PVC requiring scratch annotation AnnRequiresScratch = AnnAPIGroup + "/storage.import.requiresScratch" + // AnnRequiresDirectIO provides a const for our PVC requiring direct io annotation (due to OOMs we need to try qemu cache=none) + AnnRequiresDirectIO = AnnAPIGroup + "/storage.import.requiresDirectIo" + // OOMKilledReason provides a value that container runtimes must return in the reason field for an OOMKilled container + OOMKilledReason = "OOMKilled" + // AnnContentType provides a const for the PVC content-type AnnContentType = AnnAPIGroup + "/storage.contentType" @@ -707,7 +712,7 @@ func GetPriorityClass(pvc *corev1.PersistentVolumeClaim) string { // ShouldDeletePod returns whether the PVC workload pod should be deleted func ShouldDeletePod(pvc *corev1.PersistentVolumeClaim) bool { - return pvc.GetAnnotations()[AnnPodRetainAfterCompletion] != "true" || pvc.GetAnnotations()[AnnRequiresScratch] == "true" || pvc.DeletionTimestamp != nil + return pvc.GetAnnotations()[AnnPodRetainAfterCompletion] != "true" || pvc.GetAnnotations()[AnnRequiresScratch] == "true" || pvc.GetAnnotations()[AnnRequiresDirectIO] == "true" || pvc.DeletionTimestamp != nil } // AddFinalizer adds a finalizer to a resource diff --git a/pkg/controller/import-controller.go b/pkg/controller/import-controller.go index ef93437f8..254d7537f 100644 --- a/pkg/controller/import-controller.go +++ b/pkg/controller/import-controller.go @@ -99,6 +99,7 @@ type importPodEnvVar struct { certConfigMapProxy string extraHeaders []string secretExtraHeaders []string + cacheMode string } type importerPodArgs struct { @@ -374,16 +375,22 @@ func (r *ImportReconciler) updatePvcFromPod(pvc *corev1.PersistentVolumeClaim, p setAnnotationsFromPodWithPrefix(anno, pod, cc.AnnRunningCondition) scratchExitCode := false - if pod.Status.ContainerStatuses != nil && - pod.Status.ContainerStatuses[0].LastTerminationState.Terminated != nil && - pod.Status.ContainerStatuses[0].LastTerminationState.Terminated.ExitCode > 0 { - log.Info("Pod termination code", "pod.Name", pod.Name, "ExitCode", pod.Status.ContainerStatuses[0].LastTerminationState.Terminated.ExitCode) - if pod.Status.ContainerStatuses[0].LastTerminationState.Terminated.ExitCode == common.ScratchSpaceNeededExitCode { - log.V(1).Info("Pod requires scratch space, terminating pod, and restarting with scratch space", "pod.Name", pod.Name) - scratchExitCode = true - anno[cc.AnnRequiresScratch] = "true" - } else { - r.recorder.Event(pvc, corev1.EventTypeWarning, ErrImportFailedPVC, pod.Status.ContainerStatuses[0].LastTerminationState.Terminated.Message) + podModificationsNeeded := scratchExitCode + if statuses := pod.Status.ContainerStatuses; len(statuses) > 0 { + if isOOMKilled(statuses[0]) { + log.V(1).Info("Pod died of an OOM, deleting pod, and restarting with qemu cache mode=none if storage supports it", "pod.Name", pod.Name) + podModificationsNeeded = true + anno[cc.AnnRequiresDirectIO] = "true" + } + if terminated := statuses[0].LastTerminationState.Terminated; terminated != nil && terminated.ExitCode > 0 { + if terminated.ExitCode == common.ScratchSpaceNeededExitCode { + log.V(1).Info("Pod requires scratch space, terminating pod, and restarting with scratch space", "pod.Name", pod.Name) + podModificationsNeeded = true + anno[cc.AnnRequiresScratch] = "true" + } else { + log.Info("Pod termination code", "pod.Name", pod.Name, "ExitCode", terminated.ExitCode) + r.recorder.Event(pvc, corev1.EventTypeWarning, ErrImportFailedPVC, terminated.Message) + } } } @@ -392,12 +399,18 @@ func (r *ImportReconciler) updatePvcFromPod(pvc *corev1.PersistentVolumeClaim, p } anno[cc.AnnImportPod] = string(pod.Name) - if !scratchExitCode { + if !podModificationsNeeded { // No scratch exit code, update the phase based on the pod. If we do have scratch exit code we don't want to update the // phase, because the pod might terminate cleanly and mistakenly mark the import complete. anno[cc.AnnPodPhase] = string(pod.Status.Phase) } + for _, ev := range pod.Spec.Containers[0].Env { + if ev.Name == common.CacheMode && ev.Value == common.CacheModeTryNone { + anno[cc.AnnRequiresDirectIO] = "false" + } + } + // Check if the POD is waiting for scratch space, if so create some. if pod.Status.Phase == corev1.PodPending && r.requiresScratchSpace(pvc) { if err := r.createScratchPvcForPod(pvc, pod); err != nil { @@ -426,8 +439,8 @@ func (r *ImportReconciler) updatePvcFromPod(pvc *corev1.PersistentVolumeClaim, p log.V(1).Info("Updated PVC", "pvc.anno.Phase", anno[cc.AnnPodPhase], "pvc.anno.Restarts", anno[cc.AnnPodRestarts]) } - if cc.IsPVCComplete(pvc) || scratchExitCode { - if !scratchExitCode { + if cc.IsPVCComplete(pvc) || podModificationsNeeded { + if !podModificationsNeeded { r.recorder.Event(pvc, corev1.EventTypeNormal, ImportSucceededPVC, "Import Successful") log.V(1).Info("Import completed successfully") } @@ -621,6 +634,11 @@ func (r *ImportReconciler) createImportEnvVar(pvc *corev1.PersistentVolumeClaim) if err != nil { return nil, err } + + if v, ok := pvc.Annotations[cc.AnnRequiresDirectIO]; ok && v == "true" { + podEnvVar.cacheMode = common.CacheModeTryNone + } + return podEnvVar, nil } @@ -1299,6 +1317,10 @@ func makeImportEnv(podEnvVar *importPodEnvVar, uid types.UID) []corev1.EnvVar { Name: common.Preallocation, Value: strconv.FormatBool(podEnvVar.preallocation), }, + { + Name: common.CacheMode, + Value: podEnvVar.cacheMode, + }, } if podEnvVar.secretName != "" && podEnvVar.source != cc.SourceGCS { env = append(env, corev1.EnvVar{ @@ -1350,3 +1372,18 @@ func makeImportEnv(podEnvVar *importPodEnvVar, uid types.UID) []corev1.EnvVar { } return env } + +func isOOMKilled(status v1.ContainerStatus) bool { + if terminated := status.State.Terminated; terminated != nil { + if terminated.Reason == cc.OOMKilledReason { + return true + } + } + if terminated := status.LastTerminationState.Terminated; terminated != nil { + if terminated.Reason == cc.OOMKilledReason { + return true + } + } + + return false +} diff --git a/pkg/controller/import-controller_test.go b/pkg/controller/import-controller_test.go index 9391b3881..f87bfe297 100644 --- a/pkg/controller/import-controller_test.go +++ b/pkg/controller/import-controller_test.go @@ -447,10 +447,10 @@ var _ = Describe("Update PVC from POD", func() { }) It("Should create scratch PVC, if pod is pending and PVC is marked with scratch", func() { - scratchPvcName := &corev1.PersistentVolumeClaim{} - scratchPvcName.Name = "testPvc1-scratch" + scratchPvc := &corev1.PersistentVolumeClaim{} + scratchPvc.Name = "testPvc1-scratch" pvc := cc.CreatePvcInStorageClass("testPvc1", "default", &testStorageClass, map[string]string{cc.AnnEndpoint: testEndPoint, cc.AnnPodPhase: string(corev1.PodPending), cc.AnnRequiresScratch: "true"}, nil, corev1.ClaimBound) - pod := cc.CreateImporterTestPod(pvc, "testPvc1", scratchPvcName) + pod := cc.CreateImporterTestPod(pvc, "testPvc1", scratchPvc) pod.Status = corev1.PodStatus{ Phase: corev1.PodPending, ContainerStatuses: []v1.ContainerStatus{ @@ -468,7 +468,6 @@ var _ = Describe("Update PVC from POD", func() { Expect(err).ToNot(HaveOccurred()) By("Checking scratch PVC has been created") // Once all controllers are converted, we will use the runtime lib client instead of client-go and retrieval needs to change here. - scratchPvc := &v1.PersistentVolumeClaim{} err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "testPvc1-scratch", Namespace: "default"}, scratchPvc) Expect(err).ToNot(HaveOccurred()) Expect(scratchPvc.Spec.Resources).To(Equal(pvc.Spec.Resources)) @@ -533,9 +532,9 @@ var _ = Describe("Update PVC from POD", func() { It("Should NOT update phase on PVC, if pod exited with error state that is scratchspace exit", func() { pvc := cc.CreatePvcInStorageClass("testPvc1", "default", &testStorageClass, map[string]string{cc.AnnEndpoint: testEndPoint, cc.AnnPodPhase: string(corev1.PodRunning)}, nil, corev1.ClaimBound) - scratchPvcName := &corev1.PersistentVolumeClaim{} - scratchPvcName.Name = "testPvc1-scratch" - pod := cc.CreateImporterTestPod(pvc, "testPvc1", scratchPvcName) + scratchPvc := &corev1.PersistentVolumeClaim{} + scratchPvc.Name = "testPvc1-scratch" + pod := cc.CreateImporterTestPod(pvc, "testPvc1", scratchPvc) pod.Status = corev1.PodStatus{ Phase: corev1.PodPending, ContainerStatuses: []corev1.ContainerStatus{ @@ -634,9 +633,9 @@ var _ = Describe("Update PVC from POD", func() { It("Should copy VDDK connection information to annotations on PVC", func() { pvc := cc.CreatePvcInStorageClass("testPvc1", "default", &testStorageClass, map[string]string{cc.AnnEndpoint: testEndPoint, cc.AnnPodPhase: string(corev1.PodRunning), cc.AnnSource: cc.SourceVDDK}, nil, corev1.ClaimBound) - scratchPvcName := &corev1.PersistentVolumeClaim{} - scratchPvcName.Name = "testPvc1-scratch" - pod := cc.CreateImporterTestPod(pvc, "testPvc1", scratchPvcName) + scratchPvc := &corev1.PersistentVolumeClaim{} + scratchPvc.Name = "testPvc1-scratch" + pod := cc.CreateImporterTestPod(pvc, "testPvc1", scratchPvc) pod.Status = corev1.PodStatus{ Phase: corev1.PodSucceeded, ContainerStatuses: []corev1.ContainerStatus{ @@ -669,9 +668,10 @@ var _ = Describe("Update PVC from POD", func() { It("Should delete pod for scratch space even if retainAfterCompletion is set", func() { annotations := map[string]string{ - cc.AnnEndpoint: testEndPoint, - cc.AnnImportPod: "testpod", - cc.AnnRequiresScratch: "true", + cc.AnnEndpoint: testEndPoint, + cc.AnnImportPod: "testpod", + // gets added by controller + // cc.AnnRequiresScratch: "true", cc.AnnSource: cc.SourceVDDK, cc.AnnPodRetainAfterCompletion: "true", } @@ -706,6 +706,60 @@ var _ = Describe("Update PVC from POD", func() { Expect(err.Error()).To(ContainSubstring("\"importer-testPvc1\" not found")) }) + It("Should delete pod in favor of recreating with cache=trynone in case of OOMKilled", func() { + annotations := map[string]string{ + cc.AnnEndpoint: testEndPoint, + cc.AnnSource: cc.SourceRegistry, + cc.AnnRegistryImportMethod: string(cdiv1.RegistryPullNode), + } + pvc := cc.CreatePvcInStorageClass("testPvc1", "default", &testStorageClass, annotations, nil, corev1.ClaimPending) + reconciler = createImportReconciler(pvc) + + _, err := reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "testPvc1", Namespace: "default"}}) + Expect(err).ToNot(HaveOccurred()) + // First reconcile decides pods name, second creates it + _, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "testPvc1", Namespace: "default"}}) + Expect(err).ToNot(HaveOccurred()) + + // Simulate OOMKilled on pod + resPod := &corev1.Pod{} + err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "importer-testPvc1", Namespace: "default"}, resPod) + Expect(err).ToNot(HaveOccurred()) + resPod.Status = corev1.PodStatus{ + Phase: corev1.PodRunning, + ContainerStatuses: []corev1.ContainerStatus{ + { + State: v1.ContainerState{ + Terminated: &corev1.ContainerStateTerminated{ + ExitCode: 137, + // This is an API + // https://github.com/kubernetes/kubernetes/blob/e38531e9a2359c2ba1505cb04d62d6810edc616e/staging/src/k8s.io/cri-api/pkg/apis/runtime/v1/api.pb.go#L5822-L5823 + Reason: cc.OOMKilledReason, + }, + }, + }, + }, + } + err = reconciler.client.Status().Update(context.TODO(), resPod) + Expect(err).ToNot(HaveOccurred()) + // Reconcile picks OOMKilled and deletes pod + _, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "testPvc1", Namespace: "default"}}) + Expect(err).ToNot(HaveOccurred()) + err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "importer-testPvc1", Namespace: "default"}, resPod) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("\"importer-testPvc1\" not found")) + // Next reconcile recreates pod with cache=trynone + _, err = reconciler.Reconcile(context.TODO(), reconcile.Request{NamespacedName: types.NamespacedName{Name: "testPvc1", Namespace: "default"}}) + Expect(err).ToNot(HaveOccurred()) + err = reconciler.client.Get(context.TODO(), types.NamespacedName{Name: "importer-testPvc1", Namespace: "default"}, resPod) + Expect(err).ToNot(HaveOccurred()) + Expect(resPod.Spec.Containers[0].Env).To(ContainElement( + corev1.EnvVar{ + Name: common.CacheMode, + Value: common.CacheModeTryNone, + }, + )) + }) }) var _ = Describe("Create Importer Pod", func() { @@ -1111,6 +1165,10 @@ func createImportTestEnv(podEnvVar *importPodEnvVar, uid string) []corev1.EnvVar Name: common.Preallocation, Value: strconv.FormatBool(podEnvVar.preallocation), }, + { + Name: common.CacheMode, + Value: podEnvVar.cacheMode, + }, } if podEnvVar.secretName != "" { diff --git a/pkg/image/BUILD.bazel b/pkg/image/BUILD.bazel index 800b39503..90f834757 100644 --- a/pkg/image/BUILD.bazel +++ b/pkg/image/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ + "directio.go", "filefmt.go", "nbdkit.go", "qemu.go", @@ -33,6 +34,7 @@ go_test( ], embed = [":go_default_library"], deps = [ + "//pkg/common:go_default_library", "//pkg/system:go_default_library", "//tests/reporters:go_default_library", "//vendor/github.com/onsi/ginkgo:go_default_library", diff --git a/pkg/image/directio.go b/pkg/image/directio.go new file mode 100644 index 000000000..f5feadb83 --- /dev/null +++ b/pkg/image/directio.go @@ -0,0 +1,80 @@ +/* +Copyright 2023 The CDI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package image + +import ( + "io" + "os" + "syscall" + + "github.com/pkg/errors" + + "k8s.io/klog/v2" +) + +// DirectIOChecker checks if a certain destination supports direct I/O (bypassing page cache) +type DirectIOChecker interface { + CheckBlockDevice(path string) (bool, error) + CheckFile(path string) (bool, error) +} + +type directIOChecker struct{} + +// NewDirectIOChecker returns a new direct I/O checker +func NewDirectIOChecker() DirectIOChecker { + return &directIOChecker{} +} + +func (c *directIOChecker) CheckBlockDevice(path string) (bool, error) { + return c.check(path, syscall.O_RDONLY) +} + +func (c *directIOChecker) CheckFile(path string) (bool, error) { + flags := syscall.O_RDONLY + if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) { + // try to create the file and perform the check + flags = flags | syscall.O_CREAT + defer os.Remove(path) + } + return c.check(path, flags) +} + +// based on https://github.com/kubevirt/kubevirt/blob/c4fc4ab72a868399f5331438f35b8c33e7dd0720/pkg/virt-launcher/virtwrap/converter/converter.go#L346 +func (c *directIOChecker) check(path string, flags int) (bool, error) { + // #nosec No risk for path injection as we only open the file, not read from it. The function leaks only whether the directory to `path` exists. + f, err := os.OpenFile(path, flags|syscall.O_DIRECT, 0600) + if err != nil { + // EINVAL is returned if the filesystem does not support the O_DIRECT flag + if perr := (&os.PathError{}); errors.As(err, &perr) && errors.Is(perr, syscall.EINVAL) { + // #nosec No risk for path injection as we only open the file, not read from it. The function leaks only whether the directory to `path` exists. + f, err := os.OpenFile(path, flags & ^syscall.O_DIRECT, 0600) + if err == nil { + defer closeIOAndCheckErr(f) + return false, nil + } + } + return false, err + } + defer closeIOAndCheckErr(f) + return true, nil +} + +func closeIOAndCheckErr(c io.Closer) { + if ferr := c.Close(); ferr != nil { + klog.Errorf("Error when closing file: \n%s\n", ferr) + } +} diff --git a/pkg/image/qemu.go b/pkg/image/qemu.go index 99f63591e..03a4856a0 100644 --- a/pkg/image/qemu.go +++ b/pkg/image/qemu.go @@ -60,7 +60,7 @@ type ImgInfo struct { // QEMUOperations defines the interface for executing qemu subprocesses type QEMUOperations interface { - ConvertToRawStream(*url.URL, string, bool) error + ConvertToRawStream(*url.URL, string, bool, string) error Resize(string, resource.Quantity, bool) error Info(url *url.URL) (*ImgInfo, error) Validate(*url.URL, int64) error @@ -94,6 +94,7 @@ var ( {"--preallocation=falloc"}, {"--preallocation=full"}, } + odirectChecker = NewDirectIOChecker() ) func init() { @@ -114,9 +115,12 @@ func NewQEMUOperations() QEMUOperations { return &qemuOperations{} } -func convertToRaw(src, dest string, preallocate bool) error { - args := []string{"convert", "-t", "writeback", "-p", "-O", "raw", src, dest} - var err error +func convertToRaw(src, dest string, preallocate bool, cacheMode string) error { + cacheMode, err := getCacheMode(dest, cacheMode) + if err != nil { + return err + } + args := []string{"convert", "-t", cacheMode, "-p", "-O", "raw", src, dest} if preallocate { err = addPreallocation(args, convertPreallocationMethods, func(args []string) ([]byte, error) { @@ -138,11 +142,38 @@ func convertToRaw(src, dest string, preallocate bool) error { return nil } -func (o *qemuOperations) ConvertToRawStream(url *url.URL, dest string, preallocate bool) error { +func getCacheMode(path string, cacheMode string) (string, error) { + if cacheMode != common.CacheModeTryNone { + return "writeback", nil + } + + var supportDirectIO bool + isDevice, err := util.IsDevice(path) + if err != nil { + return "", err + } + + if isDevice { + supportDirectIO, err = odirectChecker.CheckBlockDevice(path) + } else { + supportDirectIO, err = odirectChecker.CheckFile(path) + } + if err != nil { + return "", err + } + + if supportDirectIO { + return "none", nil + } + + return "writeback", nil +} + +func (o *qemuOperations) ConvertToRawStream(url *url.URL, dest string, preallocate bool, cacheMode string) error { if len(url.Scheme) > 0 && url.Scheme != "nbd+unix" { return fmt.Errorf("not valid schema %s", url.Scheme) } - return convertToRaw(url.String(), dest, preallocate) + return convertToRaw(url.String(), dest, preallocate, cacheMode) } // convertQuantityToQemuSize translates a quantity string into a Qemu compatible string. @@ -242,8 +273,8 @@ func (o *qemuOperations) Validate(url *url.URL, availableSize int64) error { } // ConvertToRawStream converts an http accessible image to raw format without locally caching the image -func ConvertToRawStream(url *url.URL, dest string, preallocate bool) error { - return qemuIterface.ConvertToRawStream(url, dest, preallocate) +func ConvertToRawStream(url *url.URL, dest string, preallocate bool, cacheMode string) error { + return qemuIterface.ConvertToRawStream(url, dest, preallocate, cacheMode) } // Validate does basic validation of a qemu image @@ -295,9 +326,17 @@ func (o *qemuOperations) CreateBlankImage(dest string, size resource.Quantity, p return nil } -func execPreallocation(dest string, bs, count, offset int64) error { - args := []string{"if=/dev/zero", "of=" + dest, fmt.Sprintf("bs=%d", bs), fmt.Sprintf("count=%d", count), fmt.Sprintf("seek=%d", offset), "oflag=seek_bytes"} - _, err := qemuExecFunction(nil, nil, "dd", args...) +func execPreallocationBlock(dest string, bs, count, offset int64) error { + oflag := "oflag=seek_bytes" + supportDirectIO, err := odirectChecker.CheckBlockDevice(dest) + if err != nil { + return err + } + if supportDirectIO { + oflag += ",direct" + } + args := []string{"if=/dev/zero", "of=" + dest, fmt.Sprintf("bs=%d", bs), fmt.Sprintf("count=%d", count), fmt.Sprintf("seek=%d", offset), oflag} + _, err = qemuExecFunction(nil, nil, "dd", args...) if err != nil { return errors.Wrap(err, fmt.Sprintf("Could not preallocate blank block volume at %s, running dd for size %d, offset %d", dest, bs*count, offset)) } @@ -314,12 +353,12 @@ func PreallocateBlankBlock(dest string, size resource.Quantity) error { return errors.Wrap(err, fmt.Sprintf("Could not parse size for preallocating blank block volume at %s with size %s", dest, size.String())) } countBlocks, remainder := qemuSize/units.MiB, qemuSize%units.MiB - err = execPreallocation(dest, units.MiB, countBlocks, 0) + err = execPreallocationBlock(dest, units.MiB, countBlocks, 0) if err != nil { return err } if remainder != 0 { - return execPreallocation(dest, remainder, 1, countBlocks*units.MiB) + return execPreallocationBlock(dest, remainder, 1, countBlocks*units.MiB) } return nil } diff --git a/pkg/image/qemu_test.go b/pkg/image/qemu_test.go index a0bd45a08..5c091b847 100644 --- a/pkg/image/qemu_test.go +++ b/pkg/image/qemu_test.go @@ -19,6 +19,7 @@ import ( "fmt" "net/url" "os" + "os/exec" "path/filepath" "reflect" "strings" @@ -33,6 +34,7 @@ import ( dto "github.com/prometheus/client_model/go" + "kubevirt.io/containerized-data-importer/pkg/common" "kubevirt.io/containerized-data-importer/pkg/system" "github.com/prometheus/client_golang/prometheus" @@ -133,7 +135,9 @@ var _ = Describe("Convert to Raw", func() { var tmpDir, destPath string BeforeEach(func() { - tmpDir, err := os.MkdirTemp(os.TempDir(), "qemutestdest") + var err error + // dest is usually not tmpfs, stay honest in unit tests as well + tmpDir, err = os.MkdirTemp("/var/tmp", "qemutestdest") Expect(err).NotTo(HaveOccurred()) By("tmpDir: " + tmpDir) destPath = filepath.Join(tmpDir, "dest") @@ -147,14 +151,14 @@ var _ = Describe("Convert to Raw", func() { It("should return no error if exec function returns no error", func() { replaceExecFunction(mockExecFunction("", "", nil, "convert", "-p", "-O", "raw", "source", destPath), func() { - err := convertToRaw("source", destPath, false) + err := convertToRaw("source", destPath, false, "") Expect(err).NotTo(HaveOccurred()) }) }) It("should return conversion error if exec function returns error", func() { replaceExecFunction(mockExecFunction("", "exit 1", nil, "convert", "-p", "-O", "raw", "source", destPath), func() { - err := convertToRaw("source", destPath, false) + err := convertToRaw("source", destPath, false, "") Expect(err).To(HaveOccurred()) Expect(strings.Contains(err.Error(), "could not convert image to raw")).To(BeTrue()) }) @@ -164,7 +168,7 @@ var _ = Describe("Convert to Raw", func() { replaceExecFunction(mockExecFunction("", "", nil, "convert", "-p", "-O", "raw", "/somefile/somewhere", destPath), func() { ep, err := url.Parse("/somefile/somewhere") Expect(err).NotTo(HaveOccurred()) - err = ConvertToRawStream(ep, destPath, false) + err = ConvertToRawStream(ep, destPath, false, "") Expect(err).NotTo(HaveOccurred()) }) }) @@ -173,7 +177,7 @@ var _ = Describe("Convert to Raw", func() { replaceExecFunction(mockExecFunctionStrict("", "", nil, "convert", "-o", "preallocation=falloc", "-t", "writeback", "-p", "-O", "raw", "/somefile/somewhere", destPath), func() { ep, err := url.Parse("/somefile/somewhere") Expect(err).NotTo(HaveOccurred()) - err = ConvertToRawStream(ep, destPath, true) + err = ConvertToRawStream(ep, destPath, true, "") Expect(err).NotTo(HaveOccurred()) }) }) @@ -182,10 +186,53 @@ var _ = Describe("Convert to Raw", func() { replaceExecFunction(mockExecFunctionStrict("", "", nil, "convert", "-t", "writeback", "-p", "-O", "raw", "/somefile/somewhere", destPath), func() { ep, err := url.Parse("/somefile/somewhere") Expect(err).NotTo(HaveOccurred()) - err = ConvertToRawStream(ep, destPath, false) + err = ConvertToRawStream(ep, destPath, false, "") Expect(err).NotTo(HaveOccurred()) }) }) + + Context("cache mode adjusted according to O_DIRECT support", func() { + var tmpFsDir string + + BeforeEach(func() { + var err error + + tmpFsDir, err = os.MkdirTemp("/mnt/cditmpfs", "qemutestdestontmpfs") + Expect(err).NotTo(HaveOccurred()) + By("tmpFsDir: " + tmpFsDir) + }) + + AfterEach(func() { + os.RemoveAll(tmpFsDir) + }) + + It("should use cache=none when destination supports O_DIRECT", func() { + replaceExecFunction(mockExecFunctionStrict("", "", nil, "convert", "-t", "none", "-p", "-O", "raw", "/somefile/somewhere", destPath), func() { + ep, err := url.Parse("/somefile/somewhere") + Expect(err).NotTo(HaveOccurred()) + err = ConvertToRawStream(ep, destPath, false, common.CacheModeTryNone) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + It("should use cache=writeback when destination does not support O_DIRECT", func() { + // ensure tmpfs destination + out, err := exec.Command("/usr/bin/findmnt", "-T", tmpFsDir, "-o", "FSTYPE").CombinedOutput() + Expect(err).NotTo(HaveOccurred()) + Expect(string(out)).To(ContainSubstring("tmpfs")) + + tmpFsDestPath := filepath.Join(tmpFsDir, "dest") + _, err = os.Create(tmpFsDestPath) + Expect(err).NotTo(HaveOccurred()) + + replaceExecFunction(mockExecFunctionStrict("", "", nil, "convert", "-t", "writeback", "-p", "-O", "raw", "/somefile/somewhere", tmpFsDestPath), func() { + ep, err := url.Parse("/somefile/somewhere") + Expect(err).NotTo(HaveOccurred()) + err = ConvertToRawStream(ep, tmpFsDestPath, false, common.CacheModeTryNone) + Expect(err).NotTo(HaveOccurred()) + }) + }) + }) }) var _ = Describe("Resize", func() { @@ -298,7 +345,8 @@ var _ = Describe("Create blank image", func() { var tmpDir, destPath string BeforeEach(func() { - tmpDir, err := os.MkdirTemp(os.TempDir(), "qemutestdest") + var err error + tmpDir, err = os.MkdirTemp(os.TempDir(), "qemutestdest") Expect(err).NotTo(HaveOccurred()) By("tmpDir: " + tmpDir) destPath = filepath.Join(tmpDir, "dest") @@ -353,12 +401,45 @@ var _ = Describe("Create blank image", func() { }) var _ = Describe("Create preallocated blank block", func() { + var tmpDir, tmpFsDir, destPath string + + BeforeEach(func() { + var err error + // dest is usually not tmpfs, stay honest in unit tests as well + tmpDir, err = os.MkdirTemp("/var/tmp", "qemutestdest") + Expect(err).NotTo(HaveOccurred()) + By("tmpDir: " + tmpDir) + destPath = filepath.Join(tmpDir, "dest") + _, err = os.Create(destPath) + Expect(err).NotTo(HaveOccurred()) + + tmpFsDir, err = os.MkdirTemp("/mnt/cditmpfs", "qemutestdestontmpfs") + Expect(err).NotTo(HaveOccurred()) + By("tmpFsDir: " + tmpFsDir) + }) + + AfterEach(func() { + os.RemoveAll(tmpDir) + os.RemoveAll(tmpFsDir) + }) + It("Should complete successfully if preallocation succeeds", func() { quantity, err := resource.ParseQuantity("10Gi") Expect(err).NotTo(HaveOccurred()) - dest := "cdi-block-volume" - replaceExecFunction(mockExecFunction("", "", nil, "if=/dev/zero", "of="+dest, "bs=1048576", "count=10240", "seek=0", "oflag=seek_bytes"), func() { - err = PreallocateBlankBlock(dest, quantity) + replaceExecFunction(mockExecFunction("", "", nil, "if=/dev/zero", "of="+destPath, "bs=1048576", "count=10240", "seek=0", "oflag=seek_bytes,direct"), func() { + err = PreallocateBlankBlock(destPath, quantity) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + It("Should complete successfully with tmpfs dest without O_DIRECT if preallocation succeeds", func() { + tmpFsDestPath := filepath.Join(tmpFsDir, "dest") + _, err := os.Create(tmpFsDestPath) + Expect(err).NotTo(HaveOccurred()) + quantity, err := resource.ParseQuantity("10Gi") + Expect(err).NotTo(HaveOccurred()) + replaceExecFunction(mockExecFunction("", "", nil, "if=/dev/zero", "of="+tmpFsDestPath, "bs=1048576", "count=10240", "seek=0", "oflag=seek_bytes"), func() { + err = PreallocateBlankBlock(tmpFsDestPath, quantity) Expect(err).NotTo(HaveOccurred()) }) }) @@ -366,11 +447,10 @@ var _ = Describe("Create preallocated blank block", func() { It("Should complete successfully with value not aligned to 1MiB", func() { quantity, err := resource.ParseQuantity("5243392Ki") Expect(err).NotTo(HaveOccurred()) - dest := "cdi-block-volume" - firstCallArgs := []string{"if=/dev/zero", "of=" + dest, "bs=1048576", "count=5120", "seek=0", "oflag=seek_bytes"} - secondCallArgs := []string{"if=/dev/zero", "of=" + dest, "bs=524288", "count=1", "seek=5368709120", "oflag=seek_bytes"} + firstCallArgs := []string{"if=/dev/zero", "of=" + destPath, "bs=1048576", "count=5120", "seek=0", "oflag=seek_bytes,direct"} + secondCallArgs := []string{"if=/dev/zero", "of=" + destPath, "bs=524288", "count=1", "seek=5368709120", "oflag=seek_bytes,direct"} replaceExecFunction(mockExecFunctionTwoCalls("", "", nil, firstCallArgs, secondCallArgs), func() { - err = PreallocateBlankBlock(dest, quantity) + err = PreallocateBlankBlock(destPath, quantity) Expect(err).NotTo(HaveOccurred()) }) }) @@ -378,9 +458,8 @@ var _ = Describe("Create preallocated blank block", func() { It("Should fail if preallocation fails", func() { quantity, err := resource.ParseQuantity("10Gi") Expect(err).NotTo(HaveOccurred()) - dest := "cdi-block-volume" - replaceExecFunction(mockExecFunction("", "exit 1", nil, "if=/dev/zero", "of="+dest, "bs=1048576", "count=10240", "seek=0", "oflag=seek_bytes"), func() { - err = PreallocateBlankBlock(dest, quantity) + replaceExecFunction(mockExecFunction("", "exit 1", nil, "if=/dev/zero", "of="+destPath, "bs=1048576", "count=10240", "seek=0", "oflag=seek_bytes,direct"), func() { + err = PreallocateBlankBlock(destPath, quantity) Expect(strings.Contains(err.Error(), "Could not preallocate blank block volume at")).To(BeTrue()) }) }) diff --git a/pkg/importer/data-processor.go b/pkg/importer/data-processor.go index ca7b2e853..8a6ec9ddb 100644 --- a/pkg/importer/data-processor.go +++ b/pkg/importer/data-processor.go @@ -123,10 +123,13 @@ type DataProcessor struct { preallocationApplied bool // phaseExecutors is a mapping from the given processing phase to its execution function. The function returns the next processing phase or error. phaseExecutors map[ProcessingPhase]func() (ProcessingPhase, error) + // cacheMode is the mode in which we choose the qemu-img cache mode: + // TRY_NONE = bypass page cache if the target supports it, otherwise, fall back to using page cache + cacheMode string } // NewDataProcessor create a new instance of a data processor using the passed in data provider. -func NewDataProcessor(dataSource DataSourceInterface, dataFile, dataDir, scratchDataDir, requestImageSize string, filesystemOverhead float64, preallocation bool) *DataProcessor { +func NewDataProcessor(dataSource DataSourceInterface, dataFile, dataDir, scratchDataDir, requestImageSize string, filesystemOverhead float64, preallocation bool, cacheMode string) *DataProcessor { dp := &DataProcessor{ currentPhase: ProcessingPhaseInfo, source: dataSource, @@ -136,6 +139,7 @@ func NewDataProcessor(dataSource DataSourceInterface, dataFile, dataDir, scratch requestImageSize: requestImageSize, filesystemOverhead: filesystemOverhead, preallocation: preallocation, + cacheMode: cacheMode, } // Calculate available space before doing anything. dp.availableSpace = dp.calculateTargetSize() @@ -277,7 +281,7 @@ func (dp *DataProcessor) convert(url *url.URL) (ProcessingPhase, error) { return ProcessingPhaseError, err } klog.V(3).Infoln("Converting to Raw") - err = qemuOperations.ConvertToRawStream(url, dp.dataFile, dp.preallocation) + err = qemuOperations.ConvertToRawStream(url, dp.dataFile, dp.preallocation, dp.cacheMode) if err != nil { return ProcessingPhaseError, errors.Wrap(err, "Conversion to Raw failed") } diff --git a/pkg/importer/data-processor_test.go b/pkg/importer/data-processor_test.go index 0e1cbcdb2..9d1589bd6 100644 --- a/pkg/importer/data-processor_test.go +++ b/pkg/importer/data-processor_test.go @@ -148,7 +148,7 @@ var _ = Describe("Data Processor", func() { infoResponse: ProcessingPhaseTransferScratch, transferResponse: ProcessingPhaseComplete, } - dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false) + dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "") err := dp.ProcessData() Expect(err).ToNot(HaveOccurred()) Expect(2).To(Equal(len(mdp.calledPhases))) @@ -162,7 +162,7 @@ var _ = Describe("Data Processor", func() { infoResponse: ProcessingPhaseTransferDataDir, transferResponse: ProcessingPhaseComplete, } - dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false) + dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "") err := dp.ProcessData() Expect(err).ToNot(HaveOccurred()) Expect(2).To(Equal(len(mdp.calledPhases))) @@ -176,7 +176,7 @@ var _ = Describe("Data Processor", func() { infoResponse: ProcessingPhaseTransferScratch, transferResponse: ProcessingPhaseError, } - dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false) + dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "") err := dp.ProcessData() Expect(err).To(HaveOccurred()) Expect(2).To(Equal(len(mdp.calledPhases))) @@ -190,7 +190,7 @@ var _ = Describe("Data Processor", func() { transferResponse: ProcessingPhaseError, needsScratch: true, } - dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false) + dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "") err := dp.ProcessData() Expect(err).To(HaveOccurred()) Expect(ErrRequiresScratchSpace).To(Equal(err)) @@ -204,7 +204,7 @@ var _ = Describe("Data Processor", func() { infoResponse: ProcessingPhaseTransferDataFile, transferResponse: ProcessingPhaseComplete, } - dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false) + dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "") qemuOperations := NewFakeQEMUOperations(nil, nil, fakeInfoOpRetVal{&fakeZeroImageInfo, errors.New("Scratch space required, and none found ")}, nil, nil, nil) replaceQEMUOperations(qemuOperations, func() { err := dp.ProcessData() @@ -220,7 +220,7 @@ var _ = Describe("Data Processor", func() { infoResponse: ProcessingPhaseTransferDataFile, transferResponse: ProcessingPhaseError, } - dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false) + dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "") qemuOperations := NewQEMUAllErrors() replaceQEMUOperations(qemuOperations, func() { err := dp.ProcessData() @@ -235,7 +235,7 @@ var _ = Describe("Data Processor", func() { mdp := &MockDataProvider{ infoResponse: ProcessingPhase("invalidphase"), } - dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false) + dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "") err := dp.ProcessData() Expect(err).To(HaveOccurred()) Expect(1).To(Equal(len(mdp.calledPhases))) @@ -254,7 +254,7 @@ var _ = Describe("Data Processor", func() { transferResponse: ProcessingPhaseConvert, url: url, } - dp := NewDataProcessor(mdp, "", "dataDir", tmpDir, "1G", 0.055, false) + dp := NewDataProcessor(mdp, "", "dataDir", tmpDir, "1G", 0.055, false, "") dp.availableSpace = int64(1536000) usableSpace := dp.getUsableSpace() @@ -277,7 +277,7 @@ var _ = Describe("Data Processor", func() { }, fooResponse: ProcessingPhaseComplete, } - dp := NewDataProcessor(mcdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false) + dp := NewDataProcessor(mcdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "") dp.RegisterPhaseExecutor(ProcessingPhaseFoo, func() (ProcessingPhase, error) { return mcdp.Foo() }) @@ -299,7 +299,7 @@ var _ = Describe("Data Processor", func() { }, fooResponse: ProcessingPhaseInfo, } - dp := NewDataProcessor(mcdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false) + dp := NewDataProcessor(mcdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "") dp.RegisterPhaseExecutor(ProcessingPhaseFoo, func() (ProcessingPhase, error) { return mcdp.Foo() }) @@ -311,7 +311,7 @@ var _ = Describe("Data Processor", func() { mdp := &MockDataProvider{ infoResponse: "unknown", } - dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false) + dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "") err := dp.ProcessData() Expect(err).To(HaveOccurred()) }) @@ -324,7 +324,7 @@ var _ = Describe("Convert", func() { mdp := &MockDataProvider{ url: url, } - dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false) + dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "") qemuOperations := NewFakeQEMUOperations(nil, nil, fakeInfoOpRetVal{&fakeZeroImageInfo, errors.New("Scratch space required, and none found ")}, nil, nil, nil) replaceQEMUOperations(qemuOperations, func() { nextPhase, err := dp.convert(mdp.GetURL()) @@ -339,7 +339,7 @@ var _ = Describe("Convert", func() { mdp := &MockDataProvider{ url: url, } - dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false) + dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "") qemuOperations := NewFakeQEMUOperations(nil, nil, fakeInfoOpRetVal{&fakeZeroImageInfo, errors.New("Scratch space required, and none found ")}, errors.New("Validation failure"), nil, nil) replaceQEMUOperations(qemuOperations, func() { nextPhase, err := dp.convert(mdp.GetURL()) @@ -354,7 +354,7 @@ var _ = Describe("Convert", func() { mdp := &MockDataProvider{ url: url, } - dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false) + dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "1G", 0.055, false, "") qemuOperations := NewFakeQEMUOperations(errors.New("Conversion failure"), nil, fakeInfoOpRetVal{&fakeZeroImageInfo, errors.New("Scratch space required, and none found ")}, nil, nil, nil) replaceQEMUOperations(qemuOperations, func() { nextPhase, err := dp.convert(mdp.GetURL()) @@ -373,7 +373,7 @@ var _ = Describe("Resize", func() { mdp := &MockDataProvider{ url: url, } - dp := NewDataProcessor(mdp, tempDir, "dataDir", "scratchDataDir", "", 0.055, false) + dp := NewDataProcessor(mdp, tempDir, "dataDir", "scratchDataDir", "", 0.055, false, "") qemuOperations := NewFakeQEMUOperations(nil, nil, fakeInfoOpRetVal{&fakeZeroImageInfo, nil}, nil, nil, nil) replaceQEMUOperations(qemuOperations, func() { nextPhase, err := dp.resize() @@ -395,7 +395,7 @@ var _ = Describe("Resize", func() { mdp := &MockDataProvider{ url: url, } - dp := NewDataProcessor(mdp, tempDir, "dataDir", "scratchDataDir", "1G", 0.055, false) + dp := NewDataProcessor(mdp, tempDir, "dataDir", "scratchDataDir", "1G", 0.055, false, "") qemuOperations := NewFakeQEMUOperations(nil, nil, fakeInfoOpRetVal{&fakeZeroImageInfo, nil}, nil, nil, nil) replaceQEMUOperations(qemuOperations, func() { nextPhase, err := dp.resize() @@ -413,7 +413,7 @@ var _ = Describe("Resize", func() { mdp := &MockDataProvider{ url: url, } - dp := NewDataProcessor(mdp, tmpDir, tmpDir, "scratchDataDir", "1G", 0.055, false) + dp := NewDataProcessor(mdp, tmpDir, tmpDir, "scratchDataDir", "1G", 0.055, false, "") qemuOperations := NewFakeQEMUOperations(nil, nil, fakeInfoOpRetVal{&fakeZeroImageInfo, nil}, nil, nil, nil) replaceQEMUOperations(qemuOperations, func() { nextPhase, err := dp.resize() @@ -430,7 +430,7 @@ var _ = Describe("Resize", func() { mdp := &MockDataProvider{ url: url, } - dp := NewDataProcessor(mdp, "dest", tmpDir, "scratchDataDir", "1G", 0.055, false) + dp := NewDataProcessor(mdp, "dest", tmpDir, "scratchDataDir", "1G", 0.055, false, "") qemuOperations := NewQEMUAllErrors() replaceQEMUOperations(qemuOperations, func() { nextPhase, err := dp.resize() @@ -444,7 +444,7 @@ var _ = Describe("Resize", func() { return int64(100000), nil }, func() { mdp := &MockDataProvider{} - dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "", 0.055, false) + dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "", 0.055, false, "") Expect(int64(100000)).To(Equal(dp.calculateTargetSize())) }) }) @@ -454,7 +454,7 @@ var _ = Describe("Resize", func() { return int64(-1), errors.New("error") }, func() { mdp := &MockDataProvider{} - dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "", 0.055, false) + dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "", 0.055, false, "") // We just log the error if one happens. Expect(int64(-1)).To(Equal(dp.calculateTargetSize())) @@ -485,7 +485,7 @@ var _ = Describe("ResizeImage", func() { var _ = Describe("DataProcessorResume", func() { It("Should fail with an error if the data provider cannot resume", func() { mdp := &MockDataProvider{} - dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "", 0.055, false) + dp := NewDataProcessor(mdp, "dest", "dataDir", "scratchDataDir", "", 0.055, false, "") err := dp.ProcessDataResume() Expect(err).To(HaveOccurred()) }) @@ -494,7 +494,7 @@ var _ = Describe("DataProcessorResume", func() { amdp := &MockAsyncDataProvider{ ResumePhase: ProcessingPhaseComplete, } - dp := NewDataProcessor(amdp, "dest", "dataDir", "scratchDataDir", "", 0.055, false) + dp := NewDataProcessor(amdp, "dest", "dataDir", "scratchDataDir", "", 0.055, false, "") err := dp.ProcessDataResume() Expect(err).ToNot(HaveOccurred()) }) @@ -515,7 +515,7 @@ var _ = Describe("MergeDelta", func() { url: url, } - dp := NewDataProcessor(mdp, expectedBackingFile, "dataDir", "scratchDataDir", "", 0.055, false) + dp := NewDataProcessor(mdp, expectedBackingFile, "dataDir", "scratchDataDir", "", 0.055, false, "") err := errors.New("this operation should not be called") info := &image.ImgInfo{ Format: "", @@ -560,7 +560,7 @@ func NewFakeQEMUOperations(e2, e3 error, ret4 fakeInfoOpRetVal, e5 error, e6 err return &fakeQEMUOperations{e2, e3, ret4, e5, e6, targetResize} } -func (o *fakeQEMUOperations) ConvertToRawStream(*url.URL, string, bool) error { +func (o *fakeQEMUOperations) ConvertToRawStream(*url.URL, string, bool, string) error { return o.e2 } diff --git a/pkg/uploadserver/uploadserver.go b/pkg/uploadserver/uploadserver.go index 16d279698..7f503ce17 100644 --- a/pkg/uploadserver/uploadserver.go +++ b/pkg/uploadserver/uploadserver.go @@ -439,7 +439,7 @@ func newAsyncUploadStreamProcessor(stream io.ReadCloser, dest, imageSize string, } uds := importer.NewAsyncUploadDataSource(newContentReader(stream, sourceContentType)) - processor := importer.NewDataProcessor(uds, dest, common.ImporterVolumePath, common.ScratchDataDir, imageSize, filesystemOverhead, preallocation) + processor := importer.NewDataProcessor(uds, dest, common.ImporterVolumePath, common.ScratchDataDir, imageSize, filesystemOverhead, preallocation, "") return processor, processor.ProcessDataWithPause() } @@ -450,7 +450,7 @@ func newUploadStreamProcessor(stream io.ReadCloser, dest, imageSize string, file // Clone block device to block device or file system uds := importer.NewUploadDataSource(newContentReader(stream, sourceContentType), dvContentType) - processor := importer.NewDataProcessor(uds, dest, common.ImporterVolumePath, common.ScratchDataDir, imageSize, filesystemOverhead, preallocation) + processor := importer.NewDataProcessor(uds, dest, common.ImporterVolumePath, common.ScratchDataDir, imageSize, filesystemOverhead, preallocation, "") err := processor.ProcessData() return processor.PreallocationApplied(), err } diff --git a/pkg/uploadserver/uploadserver_test.go b/pkg/uploadserver/uploadserver_test.go index c3a857ce8..624b5bc52 100644 --- a/pkg/uploadserver/uploadserver_test.go +++ b/pkg/uploadserver/uploadserver_test.go @@ -154,11 +154,11 @@ func (amd *AsyncMockDataSource) GetResumePhase() importer.ProcessingPhase { } func saveAsyncProcessorSuccess(stream io.ReadCloser, dest, imageSize string, filesystemOverhead float64, preallocation bool, contentType string) (*importer.DataProcessor, error) { - return importer.NewDataProcessor(&AsyncMockDataSource{}, "", "", "", "", 0.055, false), nil + return importer.NewDataProcessor(&AsyncMockDataSource{}, "", "", "", "", 0.055, false, ""), nil } func saveAsyncProcessorFailure(stream io.ReadCloser, dest, imageSize string, filesystemOverhead float64, preallocation bool, contentType string) (*importer.DataProcessor, error) { - return importer.NewDataProcessor(&AsyncMockDataSource{}, "", "", "", "", 0.055, false), fmt.Errorf("Error using datastream") + return importer.NewDataProcessor(&AsyncMockDataSource{}, "", "", "", "", 0.055, false, ""), fmt.Errorf("Error using datastream") } func withAsyncProcessorSuccess(f func()) {