importer, util: Consolidate stream data to file (#3721)

* move util/file functions to importer/file

This contributes to the goal of eliminating util functions.
In future commits certain redundancy that existed between the importer
and util can be eliminated.

Signed-off-by: Adi Aloni <aaloni@redhat.com>

* importer replace streamDataToFile with StreamDataToFile

Previously there were two functions that were used to stream data to a
device:
- importer/util streamDataToFile
- importer/file StreamDataToFile

StreamDataToFile was originally introduced to handle preallocation for
host assisted clones[1]. Aside from that ability, the two functions are
identical in functionality.

This commit replaces streamDataToFile with StreamDataToFile.

[1] https://github.com/kubevirt/containerized-data-importer/pull/3352

Signed-off-by: Adi Aloni <aaloni@redhat.com>

* importer: respect preallocation in StreamDataToFile

Previously, the preallocation setting that's used in the data-processor
was only respected in the upload-server's clone-processor.

With this change, preallocation is respected in StreamDataToFile used
throughout all relevant datasources.

This should also result in more efficient imports as the default of the
preallocation setting is false.

Signed-off-by: Adi Aloni <aaloni@redhat.com>

* importer, errors: introduce IsNoCapacityError

Previously StreamDataToFile would assert out-of-space errors by matching
a substring in the error's message. This did not work on all
filesystems; for example, it failed on IBM's GPFS.

This commit introduces IsNoCapacityError, which matches an error against
all insufficient-capacity errors (e.g., ENOSPC, EDQUOT), and uses it
in place of the earlier substring-based assertions.

Signed-off-by: Adi Aloni <aaloni@redhat.com>

---------

Signed-off-by: Adi Aloni <aaloni@redhat.com>
This commit is contained in:
Adi Aloni 2025-05-11 13:11:55 +03:00 committed by GitHub
parent c0572f854e
commit bd7d028287
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
35 changed files with 274 additions and 321 deletions

View File

@ -132,7 +132,7 @@ func main() {
os.Exit(1)
}
availableDestSpace, err := util.GetAvailableSpaceByVolumeMode(volumeMode)
availableDestSpace, err := importer.GetAvailableSpaceByVolumeMode(volumeMode)
if err != nil {
klog.Errorf("%+v", err)
os.Exit(1)

View File

@ -391,7 +391,8 @@ func addLabelsFromTerminationMessage(labels map[string]string, termMsg *common.T
func simplifyKnownMessage(msg string) string {
if strings.Contains(msg, "is larger than the reported available") ||
strings.Contains(msg, "no space left on device") ||
strings.Contains(msg, "file largest block is bigger than maxblock") {
strings.Contains(msg, "file largest block is bigger than maxblock") ||
strings.Contains(msg, "disk quota exceeded") {
return "DataVolume too small to contain image"
}

View File

@ -18,6 +18,7 @@ go_library(
"//pkg/util:go_default_library",
"//vendor/github.com/docker/go-units:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/golang.org/x/sys/unix:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
],

View File

@ -27,6 +27,7 @@ import (
"github.com/docker/go-units"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/klog/v2"
@ -133,16 +134,19 @@ func getCacheMode(path string, cacheMode string) (string, error) {
}
var supportDirectIO bool
isDevice, err := util.IsDevice(path)
if err != nil {
var stat unix.Stat_t
var err error
if err = unix.Stat(path, &stat); err != nil {
return "", err
}
if isDevice {
if (stat.Mode & unix.S_IFMT) == unix.S_IFBLK {
supportDirectIO, err = odirectChecker.CheckBlockDevice(path)
} else {
supportDirectIO, err = odirectChecker.CheckFile(path)
}
if err != nil {
return "", err
}

View File

@ -5,6 +5,7 @@ go_library(
srcs = [
"data-processor.go",
"errors.go",
"file.go",
"format-readers.go",
"gcs-datasource.go",
"http-datasource.go",
@ -44,7 +45,9 @@ go_library(
"//vendor/github.com/ovirt/go-ovirt-client-log-klog:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/github.com/ulikunitz/xz:go_default_library",
"//vendor/golang.org/x/sys/unix:go_default_library",
"//vendor/google.golang.org/api/option:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/klog/v2:go_default_library",
] + select({
@ -56,16 +59,8 @@ go_library(
"//vendor/github.com/vmware/govmomi/vim25/methods:go_default_library",
"//vendor/github.com/vmware/govmomi/vim25/mo:go_default_library",
"//vendor/github.com/vmware/govmomi/vim25/types:go_default_library",
"//vendor/golang.org/x/sys/unix:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/libguestfs.org/libnbd:go_default_library",
],
"@io_bazel_rules_go//go/platform:arm64": [
"//vendor/k8s.io/api/core/v1:go_default_library",
],
"@io_bazel_rules_go//go/platform:s390x": [
"//vendor/k8s.io/api/core/v1:go_default_library",
],
"//conditions:default": [],
}),
)
@ -74,6 +69,7 @@ go_test(
name = "go_default_test",
srcs = [
"data-processor_test.go",
"file_test.go",
"format-readers_test.go",
"gcs-datasource_test.go",
"http-datasource_test.go",

View File

@ -62,17 +62,17 @@ const (
)
// may be overridden in tests
var getAvailableSpaceBlockFunc = util.GetAvailableSpaceBlock
var getAvailableSpaceFunc = util.GetAvailableSpace
var getAvailableSpaceBlockFunc = GetAvailableSpaceBlock
var getAvailableSpaceFunc = GetAvailableSpace
// DataSourceInterface is the interface all data sources should implement.
type DataSourceInterface interface {
// Info is called to get initial information about the data.
Info() (ProcessingPhase, error)
// Transfer is called to transfer the data from the source to the path passed in.
Transfer(path string) (ProcessingPhase, error)
Transfer(path string, preallocation bool) (ProcessingPhase, error)
// TransferFile is called to transfer the data from the source to the file passed in.
TransferFile(fileName string) (ProcessingPhase, error)
TransferFile(fileName string, preallocation bool) (ProcessingPhase, error)
// Geturl returns the url that the data processor can use when converting the data.
GetURL() *url.URL
// GetTerminationMessage returns data to be serialized and used as the termination message of the importer.
@ -170,7 +170,7 @@ func (dp *DataProcessor) initDefaultPhases() {
return pp, err
})
dp.RegisterPhaseExecutor(ProcessingPhaseTransferScratch, func() (ProcessingPhase, error) {
pp, err := dp.source.Transfer(dp.scratchDataDir)
pp, err := dp.source.Transfer(dp.scratchDataDir, dp.preallocation)
if errors.Is(err, ErrInvalidPath) {
// Passed in invalid scratch space path, return scratch space needed error.
err = ErrRequiresScratchSpace
@ -180,14 +180,14 @@ func (dp *DataProcessor) initDefaultPhases() {
return pp, err
})
dp.RegisterPhaseExecutor(ProcessingPhaseTransferDataDir, func() (ProcessingPhase, error) {
pp, err := dp.source.Transfer(dp.dataDir)
pp, err := dp.source.Transfer(dp.dataDir, dp.preallocation)
if err != nil {
err = errors.Wrap(err, "Unable to transfer source data to target directory")
}
return pp, err
})
dp.RegisterPhaseExecutor(ProcessingPhaseTransferDataFile, func() (ProcessingPhase, error) {
pp, err := dp.source.TransferFile(dp.dataFile)
pp, err := dp.source.TransferFile(dp.dataFile, dp.preallocation)
if err != nil {
err = errors.Wrap(err, "Unable to transfer source data to target file")
}

View File

@ -63,7 +63,7 @@ func (m *MockDataProvider) Info() (ProcessingPhase, error) {
}
// Transfer is called to transfer the data from the source to the passed in path.
func (m *MockDataProvider) Transfer(path string) (ProcessingPhase, error) {
func (m *MockDataProvider) Transfer(path string, preallocation bool) (ProcessingPhase, error) {
m.calledPhases = append(m.calledPhases, m.infoResponse)
m.transferPath = path
if m.transferResponse == ProcessingPhaseError {
@ -76,7 +76,7 @@ func (m *MockDataProvider) Transfer(path string) (ProcessingPhase, error) {
}
// TransferFile is called to transfer the data from the source to the passed in file.
func (m *MockDataProvider) TransferFile(fileName string) (ProcessingPhase, error) {
func (m *MockDataProvider) TransferFile(fileName string, preallocation bool) (ProcessingPhase, error) {
m.calledPhases = append(m.calledPhases, ProcessingPhaseTransferDataFile)
m.transferFile = fileName
if m.transferResponse == ProcessingPhaseError {
@ -111,13 +111,13 @@ func (madp *MockAsyncDataProvider) Info() (ProcessingPhase, error) {
}
// Transfer is called to transfer the data from the source to the passed in path.
func (madp *MockAsyncDataProvider) Transfer(path string) (ProcessingPhase, error) {
return madp.MockDataProvider.Transfer(path)
func (madp *MockAsyncDataProvider) Transfer(path string, preallocation bool) (ProcessingPhase, error) {
return madp.MockDataProvider.Transfer(path, preallocation)
}
// TransferFile is called to transfer the data from the source to the passed in file.
func (madp *MockAsyncDataProvider) TransferFile(fileName string) (ProcessingPhase, error) {
return madp.MockDataProvider.TransferFile(fileName)
func (madp *MockAsyncDataProvider) TransferFile(fileName string, preallocation bool) (ProcessingPhase, error) {
return madp.MockDataProvider.TransferFile(fileName, preallocation)
}
// Close closes any readers or other open resources.

View File

@ -1,7 +1,9 @@
package importer
import (
"errors"
"fmt"
"syscall"
"kubevirt.io/containerized-data-importer/pkg/common"
)
@ -40,3 +42,9 @@ func (err *ImagePullFailedError) Error() string {
func (err *ImagePullFailedError) Unwrap() error {
return err.err
}
// IsNoCapacityError reports whether err indicates that the destination
// lacks the storage capacity required for the operation: an out-of-space
// condition (ENOSPC), an exceeded disk quota (EDQUOT), or a size
// validation failure (ValidationSizeError).
func IsNoCapacityError(err error) bool {
	return errors.Is(err, syscall.ENOSPC) ||
		errors.Is(err, syscall.EDQUOT) ||
		errors.As(err, &ValidationSizeError{})
}

View File

@ -1,4 +1,4 @@
package util
package importer
import (
"bytes"
@ -6,7 +6,6 @@ import (
"io"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"syscall"
@ -17,6 +16,10 @@ import (
"k8s.io/klog/v2"
)
var (
blockdevFileName = "/usr/sbin/blockdev"
)
// OpenFileOrBlockDevice opens the destination data file, whether it is a block device or regular file
func OpenFileOrBlockDevice(fileName string) (*os.File, error) {
var outFile *os.File
@ -37,27 +40,6 @@ func OpenFileOrBlockDevice(fileName string) (*os.File, error) {
return outFile, nil
}
// CopyFile copies a file from one location to another.
func CopyFile(src, dst string) error {
in, err := os.Open(src)
if err != nil {
return err
}
defer in.Close()
out, err := os.Create(dst)
if err != nil {
return err
}
defer out.Close()
_, err = io.Copy(out, in)
if err != nil {
return err
}
return out.Close()
}
// LinkFile symlinks the source to the target
func LinkFile(source, target string) error {
out, err := exec.Command("/usr/bin/ln", "-s", source, target).CombinedOutput()
@ -68,44 +50,6 @@ func LinkFile(source, target string) error {
return nil
}
// CopyDir copies a dir from one location to another.
func CopyDir(source string, dest string) error {
// get properties of source dir
sourceinfo, err := os.Stat(source)
if err != nil {
return err
}
// create dest dir
err = os.MkdirAll(dest, sourceinfo.Mode())
if err != nil {
return err
}
directory, _ := os.Open(source)
objects, err := directory.Readdir(-1)
for _, obj := range objects {
src := filepath.Join(source, obj.Name())
dst := filepath.Join(dest, obj.Name())
if obj.IsDir() {
// create sub-directories - recursively
err = CopyDir(src, dst)
if err != nil {
fmt.Println(err)
}
} else {
// perform copy
err = CopyFile(src, dst)
if err != nil {
fmt.Println(err)
}
}
}
return err
}
// GetAvailableSpace gets the amount of available space at the path specified.
func GetAvailableSpace(path string) (int64, error) {
var stat syscall.Statfs_t
@ -247,15 +191,18 @@ func StreamDataToFile(r io.Reader, fileName string, preallocate bool) (int64, in
bytesWritten = bytesRead
}
if err != nil {
os.Remove(outFile.Name())
if strings.Contains(err.Error(), "no space left on device") {
err = errors.Wrapf(err, "unable to write to file")
}
return bytesRead, bytesWritten, err
}
defer func() {
klog.Infof("Read %d bytes, wrote %d bytes to %s", bytesRead, bytesWritten, outFile.Name())
}()
klog.Infof("Read %d bytes, wrote %d bytes to %s", bytesRead, bytesWritten, outFile.Name())
if err != nil {
klog.Errorf("Unable to write file from dataReader: %v\n", err)
os.Remove(outFile.Name())
if IsNoCapacityError(err) {
return bytesRead, bytesWritten, fmt.Errorf("unable to write to file: %w", err)
}
return bytesRead, bytesWritten, NewImagePullFailedError(err)
}
err = outFile.Sync()

View File

@ -1,4 +1,4 @@
package util
package importer
import (
"bytes"
@ -12,52 +12,7 @@ import (
. "github.com/onsi/gomega"
)
const (
pattern = "^[a-zA-Z0-9]+$"
TestImagesDir = "../../tests/images"
)
var (
fileDir, _ = filepath.Abs(TestImagesDir)
)
var _ = Describe("All tests", func() {
var _ = Describe("Copy files", func() {
var destTmp string
var err error
BeforeEach(func() {
destTmp, err = os.MkdirTemp("", "dest")
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
err = os.RemoveAll(destTmp)
Expect(err).NotTo(HaveOccurred())
os.Remove("test.txt")
})
It("Should copy file from source to dest, with valid source and dest", func() {
err = CopyFile(filepath.Join(TestImagesDir, "content.tar"), filepath.Join(destTmp, "target.tar"))
Expect(err).ToNot(HaveOccurred())
sourceMd5, err := Md5sum(filepath.Join(TestImagesDir, "content.tar"))
Expect(err).ToNot(HaveOccurred())
targetMd5, err := Md5sum(filepath.Join(destTmp, "target.tar"))
Expect(err).ToNot(HaveOccurred())
Expect(sourceMd5).Should(Equal(targetMd5))
})
It("Should not copy file from source to dest, with invalid source", func() {
err = CopyFile(filepath.Join(TestImagesDir, "content.tar22"), filepath.Join(destTmp, "target.tar"))
Expect(err).To(HaveOccurred())
})
It("Should not copy file from source to dest, with invalid target", func() {
err = CopyFile(filepath.Join(TestImagesDir, "content.tar"), filepath.Join("/invalidpath", "target.tar"))
Expect(err).To(HaveOccurred())
})
})
var _ = Describe("Zero out ranges in files", func() {
var testFile *os.File
var testData []byte

View File

@ -15,7 +15,6 @@ import (
"k8s.io/klog/v2"
"kubevirt.io/containerized-data-importer/pkg/common"
"kubevirt.io/containerized-data-importer/pkg/util"
)
const (
@ -109,7 +108,7 @@ func (sd *GCSDataSource) Info() (ProcessingPhase, error) {
}
// Transfer is called to transfer the data from the source to a temporary location.
func (sd *GCSDataSource) Transfer(path string) (ProcessingPhase, error) {
func (sd *GCSDataSource) Transfer(path string, preallocation bool) (ProcessingPhase, error) {
klog.V(3).Infoln("GCS Importer: Transfer")
file := filepath.Join(path, tempFile)
@ -117,7 +116,7 @@ func (sd *GCSDataSource) Transfer(path string) (ProcessingPhase, error) {
return ProcessingPhaseError, err
}
size, _ := util.GetAvailableSpace(path)
size, _ := GetAvailableSpace(path)
if size <= int64(0) {
//Path provided is invalid.
@ -125,8 +124,7 @@ func (sd *GCSDataSource) Transfer(path string) (ProcessingPhase, error) {
return ProcessingPhaseError, ErrInvalidPath
}
err := streamDataToFile(sd.readers.TopReader(), file)
_, _, err := StreamDataToFile(sd.readers.TopReader(), file, preallocation)
if err != nil {
klog.V(3).Infoln("GCS Importer: Transfer Error: ", err)
return ProcessingPhaseError, err
@ -137,12 +135,12 @@ func (sd *GCSDataSource) Transfer(path string) (ProcessingPhase, error) {
}
// TransferFile is called to transfer the data from the source to the passed in file.
func (sd *GCSDataSource) TransferFile(fileName string) (ProcessingPhase, error) {
func (sd *GCSDataSource) TransferFile(fileName string, preallocation bool) (ProcessingPhase, error) {
if err := CleanAll(fileName); err != nil {
return ProcessingPhaseError, err
}
err := streamDataToFile(sd.readers.TopReader(), fileName)
_, _, err := StreamDataToFile(sd.readers.TopReader(), fileName, preallocation)
if err != nil {
return ProcessingPhaseError, err
}

View File

@ -243,7 +243,7 @@ var _ = Describe("Google Cloud Storage data source", func() {
result, err := sd.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferDataFile).To(Equal(result))
result, err = sd.TransferFile(filepath.Join(tmpDir, "file"))
result, err = sd.TransferFile(filepath.Join(tmpDir, "file"), false)
Expect(err).ToNot(HaveOccurred())
Expect(ProcessingPhaseResize).To(Equal(result))
})
@ -257,7 +257,7 @@ var _ = Describe("Google Cloud Storage data source", func() {
result, err := sd.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferScratch).To(Equal(result))
result, err = sd.TransferFile(filepath.Join(tmpDir, "file"))
result, err = sd.TransferFile(filepath.Join(tmpDir, "file"), false)
Expect(err).ToNot(HaveOccurred())
Expect(ProcessingPhaseResize).To(Equal(result))
})
@ -271,7 +271,7 @@ var _ = Describe("Google Cloud Storage data source", func() {
result, err := sd.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferDataFile).To(Equal(result))
result, err = sd.TransferFile("/invalidpath/invalidfile")
result, err = sd.TransferFile("/invalidpath/invalidfile", false)
Expect(err).To(HaveOccurred())
Expect(ProcessingPhaseError).To(Equal(result))
})
@ -285,7 +285,7 @@ var _ = Describe("Google Cloud Storage data source", func() {
result, err := sd.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferScratch).To(Equal(result))
result, err = sd.TransferFile("/invalidpath/invalidfile")
result, err = sd.TransferFile("/invalidpath/invalidfile", false)
Expect(err).To(HaveOccurred())
Expect(ProcessingPhaseError).To(Equal(result))
})
@ -301,7 +301,7 @@ var _ = Describe("Google Cloud Storage data source", func() {
result, err := sd.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferDataFile).To(Equal(result))
result, err = sd.TransferFile(filepath.Join(tmpDir, "file"))
result, err = sd.TransferFile(filepath.Join(tmpDir, "file"), false)
Expect(err).ToNot(HaveOccurred())
Expect(ProcessingPhaseResize).To(Equal(result))
})
@ -317,7 +317,7 @@ var _ = Describe("Google Cloud Storage data source", func() {
result, err := sd.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferScratch).To(Equal(result))
result, err = sd.TransferFile(filepath.Join(tmpDir, "file"))
result, err = sd.TransferFile(filepath.Join(tmpDir, "file"), false)
Expect(err).ToNot(HaveOccurred())
Expect(ProcessingPhaseResize).To(Equal(result))
})
@ -333,7 +333,7 @@ var _ = Describe("Google Cloud Storage data source", func() {
result, err := sd.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferDataFile).To(Equal(result))
result, err = sd.TransferFile("/invalidpath/invalidfile")
result, err = sd.TransferFile("/invalidpath/invalidfile", false)
Expect(err).To(HaveOccurred())
Expect(ProcessingPhaseError).To(Equal(result))
})
@ -349,7 +349,7 @@ var _ = Describe("Google Cloud Storage data source", func() {
result, err := sd.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferScratch).To(Equal(result))
result, err = sd.TransferFile("/invalidpath/invalidfile")
result, err = sd.TransferFile("/invalidpath/invalidfile", false)
Expect(err).To(HaveOccurred())
Expect(ProcessingPhaseError).To(Equal(result))
})
@ -363,7 +363,7 @@ var _ = Describe("Google Cloud Storage data source", func() {
result, err := sd.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferDataFile).To(Equal(result))
result, err = sd.TransferFile(filepath.Join(tmpDir, "file"))
result, err = sd.TransferFile(filepath.Join(tmpDir, "file"), false)
Expect(err).ToNot(HaveOccurred())
Expect(ProcessingPhaseResize).To(Equal(result))
})
@ -377,7 +377,7 @@ var _ = Describe("Google Cloud Storage data source", func() {
result, err := sd.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferScratch).To(Equal(result))
result, err = sd.TransferFile(filepath.Join(tmpDir, "file"))
result, err = sd.TransferFile(filepath.Join(tmpDir, "file"), false)
Expect(err).ToNot(HaveOccurred())
Expect(ProcessingPhaseResize).To(Equal(result))
})
@ -391,7 +391,7 @@ var _ = Describe("Google Cloud Storage data source", func() {
result, err := sd.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferDataFile).To(Equal(result))
result, err = sd.TransferFile("/invalidpath/invalidfile")
result, err = sd.TransferFile("/invalidpath/invalidfile", false)
Expect(err).To(HaveOccurred())
Expect(ProcessingPhaseError).To(Equal(result))
})
@ -405,7 +405,7 @@ var _ = Describe("Google Cloud Storage data source", func() {
result, err := sd.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferScratch).To(Equal(result))
result, err = sd.TransferFile("/invalidpath/invalidfile")
result, err = sd.TransferFile("/invalidpath/invalidfile", false)
Expect(err).To(HaveOccurred())
Expect(ProcessingPhaseError).To(Equal(result))
})
@ -421,7 +421,7 @@ var _ = Describe("Google Cloud Storage data source", func() {
result, err := sd.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferDataFile).To(Equal(result))
result, err = sd.TransferFile(filepath.Join(tmpDir, "file"))
result, err = sd.TransferFile(filepath.Join(tmpDir, "file"), false)
Expect(err).ToNot(HaveOccurred())
Expect(ProcessingPhaseResize).To(Equal(result))
})
@ -437,7 +437,7 @@ var _ = Describe("Google Cloud Storage data source", func() {
result, err := sd.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferScratch).To(Equal(result))
result, err = sd.TransferFile(filepath.Join(tmpDir, "file"))
result, err = sd.TransferFile(filepath.Join(tmpDir, "file"), false)
Expect(err).ToNot(HaveOccurred())
Expect(ProcessingPhaseResize).To(Equal(result))
})
@ -453,7 +453,7 @@ var _ = Describe("Google Cloud Storage data source", func() {
result, err := sd.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferDataFile).To(Equal(result))
result, err = sd.TransferFile("/invalidpath/invalidfile")
result, err = sd.TransferFile("/invalidpath/invalidfile", false)
Expect(err).To(HaveOccurred())
Expect(ProcessingPhaseError).To(Equal(result))
})
@ -469,7 +469,7 @@ var _ = Describe("Google Cloud Storage data source", func() {
result, err := sd.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferScratch).To(Equal(result))
result, err = sd.TransferFile("/invalidpath/invalidfile")
result, err = sd.TransferFile("/invalidpath/invalidfile", false)
Expect(err).To(HaveOccurred())
Expect(ProcessingPhaseError).To(Equal(result))
})

View File

@ -148,18 +148,18 @@ func (hs *HTTPDataSource) Info() (ProcessingPhase, error) {
}
// Transfer is called to transfer the data from the source to a scratch location.
func (hs *HTTPDataSource) Transfer(path string) (ProcessingPhase, error) {
func (hs *HTTPDataSource) Transfer(path string, preallocation bool) (ProcessingPhase, error) {
if hs.contentType == cdiv1.DataVolumeKubeVirt {
file := filepath.Join(path, tempFile)
if err := CleanAll(file); err != nil {
return ProcessingPhaseError, err
}
size, err := util.GetAvailableSpace(path)
size, err := GetAvailableSpace(path)
if err != nil || size <= 0 {
return ProcessingPhaseError, ErrInvalidPath
}
hs.readers.StartProgressUpdate()
err = streamDataToFile(hs.readers.TopReader(), file)
_, _, err = StreamDataToFile(hs.readers.TopReader(), file, preallocation)
if err != nil {
return ProcessingPhaseError, err
}
@ -177,12 +177,12 @@ func (hs *HTTPDataSource) Transfer(path string) (ProcessingPhase, error) {
}
// TransferFile is called to transfer the data from the source to the passed in file.
func (hs *HTTPDataSource) TransferFile(fileName string) (ProcessingPhase, error) {
func (hs *HTTPDataSource) TransferFile(fileName string, preallocation bool) (ProcessingPhase, error) {
if err := CleanAll(fileName); err != nil {
return ProcessingPhaseError, err
}
hs.readers.StartProgressUpdate()
err := streamDataToFile(hs.readers.TopReader(), fileName)
_, _, err := StreamDataToFile(hs.readers.TopReader(), fileName, preallocation)
if err != nil {
return ProcessingPhaseError, err
}

View File

@ -123,7 +123,7 @@ var _ = Describe("Http data source", func() {
Expect(err).NotTo(HaveOccurred())
_, err := dp.Info()
Expect(err).NotTo(HaveOccurred())
newPhase, err := dp.Transfer(scratchPath)
newPhase, err := dp.Transfer(scratchPath, false)
if !wantErr {
Expect(err).NotTo(HaveOccurred())
Expect(expectedPhase).To(Equal(newPhase))

View File

@ -123,7 +123,7 @@ func (is *ImageioDataSource) Info() (ProcessingPhase, error) {
}
// Transfer is called to transfer the data from the source to a scratch location.
func (is *ImageioDataSource) Transfer(path string) (ProcessingPhase, error) {
func (is *ImageioDataSource) Transfer(path string, preallocation bool) (ProcessingPhase, error) {
defer is.cleanupTransfer()
file := filepath.Join(path, tempFile)
err := CleanAll(file)
@ -131,13 +131,13 @@ func (is *ImageioDataSource) Transfer(path string) (ProcessingPhase, error) {
return ProcessingPhaseError, err
}
// we know that there won't be archives
size, _ := util.GetAvailableSpace(path)
size, _ := GetAvailableSpace(path)
if size <= int64(0) {
//Path provided is invalid.
return ProcessingPhaseError, ErrInvalidPath
}
is.readers.StartProgressUpdate()
err = streamDataToFile(is.readers.TopReader(), file)
_, _, err = StreamDataToFile(is.readers.TopReader(), file, preallocation)
if err != nil {
return ProcessingPhaseError, err
}
@ -166,7 +166,7 @@ func (is *ImageioDataSource) Transfer(path string) (ProcessingPhase, error) {
}
// TransferFile is called to transfer the data from the source to the passed in file.
func (is *ImageioDataSource) TransferFile(fileName string) (ProcessingPhase, error) {
func (is *ImageioDataSource) TransferFile(fileName string, preallocation bool) (ProcessingPhase, error) {
if !is.IsDeltaCopy() {
if err := CleanAll(fileName); err != nil {
return ProcessingPhaseError, err
@ -182,7 +182,7 @@ func (is *ImageioDataSource) TransferFile(fileName string) (ProcessingPhase, err
return ProcessingPhaseError, err
}
} else {
err := streamDataToFile(is.readers.TopReader(), fileName)
_, _, err := StreamDataToFile(is.readers.TopReader(), fileName, preallocation)
if err != nil {
return ProcessingPhaseError, err
}
@ -269,7 +269,7 @@ func (is *ImageioDataSource) getExtentsReader() (*extentReader, error) {
// StreamExtents requests individual extents from the ImageIO API and copies them to the destination.
// It skips downloading ranges of all zero bytes.
func (is *ImageioDataSource) StreamExtents(extentsReader *extentReader, fileName string) error {
outFile, err := util.OpenFileOrBlockDevice(fileName)
outFile, err := OpenFileOrBlockDevice(fileName)
if err != nil {
return err
}
@ -297,9 +297,9 @@ func (is *ImageioDataSource) StreamExtents(extentsReader *extentReader, fileName
preallocated := info.Size() >= int64(is.contentLength)
// Choose seek for regular files, and hole punching for block devices and pre-allocated files
zeroRange := util.AppendZeroWithTruncate
zeroRange := AppendZeroWithTruncate
if isBlock || preallocated {
zeroRange = util.PunchHole
zeroRange = PunchHole
}
// Transfer all the non-zero extents, and try to quickly write out blocks of all zero bytes for extents that only contain zero
@ -308,7 +308,7 @@ func (is *ImageioDataSource) StreamExtents(extentsReader *extentReader, fileName
err = zeroRange(outFile, extent.Start, extent.Length)
if err != nil {
klog.Infof("Initial zero method failed, trying AppendZeroWithWrite instead. Error was: %v", err)
zeroRange = util.AppendZeroWithWrite // If the initial choice fails, fall back to regular file writing
zeroRange = AppendZeroWithWrite // If the initial choice fails, fall back to regular file writing
err = zeroRange(outFile, extent.Start, extent.Length)
if err != nil {
return errors.Wrap(err, "failed to zero range on destination")

View File

@ -127,7 +127,7 @@ var _ = Describe("Imageio data source", func() {
It("NewImageioDataSource tranfer should fail if invalid path", func() {
dp, err := NewImageioDataSource(ts.URL, "", "", tempDir, diskID, "", "")
Expect(err).ToNot(HaveOccurred())
_, err = dp.Transfer("")
_, err = dp.Transfer("", false)
Expect(err).To(HaveOccurred())
})
@ -136,7 +136,7 @@ var _ = Describe("Imageio data source", func() {
Expect(err).ToNot(HaveOccurred())
_, err = dp.Info()
Expect(err).NotTo(HaveOccurred())
phase, err := dp.TransferFile("")
phase, err := dp.TransferFile("", false)
Expect(err).To(HaveOccurred())
Expect(phase).To(Equal(ProcessingPhaseError))
})

View File

@ -28,7 +28,6 @@ import (
"k8s.io/klog/v2"
"kubevirt.io/containerized-data-importer/pkg/common"
"kubevirt.io/containerized-data-importer/pkg/util"
)
const (
@ -82,13 +81,13 @@ func (rd *RegistryDataSource) Info() (ProcessingPhase, error) {
}
// Transfer is called to transfer the data from the source registry to a temporary location.
func (rd *RegistryDataSource) Transfer(path string) (ProcessingPhase, error) {
func (rd *RegistryDataSource) Transfer(path string, preallocation bool) (ProcessingPhase, error) {
rd.imageDir = filepath.Join(path, containerDiskImageDir)
if err := CleanAll(rd.imageDir); err != nil {
return ProcessingPhaseError, err
}
size, err := util.GetAvailableSpace(path)
size, err := GetAvailableSpace(path)
if err != nil {
return ProcessingPhaseError, err
}
@ -98,7 +97,7 @@ func (rd *RegistryDataSource) Transfer(path string) (ProcessingPhase, error) {
}
klog.V(1).Infof("Copying registry image to scratch space.")
rd.info, err = CopyRegistryImage(rd.endpoint, path, containerDiskImageDir, rd.accessKey, rd.secKey, rd.certDir, rd.insecureTLS)
rd.info, err = CopyRegistryImage(rd.endpoint, path, containerDiskImageDir, rd.accessKey, rd.secKey, rd.certDir, rd.insecureTLS, preallocation)
if err != nil {
return ProcessingPhaseError, errors.Wrapf(err, "Failed to read registry image")
}
@ -115,7 +114,7 @@ func (rd *RegistryDataSource) Transfer(path string) (ProcessingPhase, error) {
}
// TransferFile is called to transfer the data from the source to the passed in file.
func (rd *RegistryDataSource) TransferFile(fileName string) (ProcessingPhase, error) {
func (rd *RegistryDataSource) TransferFile(fileName string, preallocation bool) (ProcessingPhase, error) {
return ProcessingPhaseError, errors.New("Transferfile should not be called")
}
@ -217,7 +216,7 @@ func collectCerts(certDir, targetDir, targetPrefix string) error {
if !strings.HasSuffix(obj.Name(), ".crt") {
continue
}
if err := util.LinkFile(filepath.Join(certDir, obj.Name()), filepath.Join(targetDir, targetPrefix+obj.Name())); err != nil {
if err := LinkFile(filepath.Join(certDir, obj.Name()), filepath.Join(targetDir, targetPrefix+obj.Name())); err != nil {
return err
}
}

View File

@ -49,7 +49,7 @@ var _ = Describe("Registry data source", func() {
ds = NewRegistryDataSource(ep, accKey, secKey, certDir, insecureRegistry)
// Need to pass in a real path if we don't want scratch space needed error.
result, err := ds.Transfer(scratchPath)
result, err := ds.Transfer(scratchPath, false)
if !wantErr {
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseConvert).To(Equal(result))
@ -67,7 +67,7 @@ var _ = Describe("Registry data source", func() {
It("TransferFile should not be called", func() {
ds = NewRegistryDataSource("", "", "", "", true)
result, err := ds.TransferFile("file")
result, err := ds.TransferFile("file", false)
Expect(err).To(HaveOccurred())
Expect(ProcessingPhaseError).To(Equal(result))
})

View File

@ -16,7 +16,6 @@ import (
"k8s.io/klog/v2"
"kubevirt.io/containerized-data-importer/pkg/common"
"kubevirt.io/containerized-data-importer/pkg/util"
)
const (
@ -86,19 +85,19 @@ func (sd *S3DataSource) Info() (ProcessingPhase, error) {
}
// Transfer is called to transfer the data from the source to a temporary location.
func (sd *S3DataSource) Transfer(path string) (ProcessingPhase, error) {
func (sd *S3DataSource) Transfer(path string, preallocation bool) (ProcessingPhase, error) {
file := filepath.Join(path, tempFile)
if err := CleanAll(file); err != nil {
return ProcessingPhaseError, err
}
size, _ := util.GetAvailableSpace(path)
size, _ := GetAvailableSpace(path)
if size <= int64(0) {
//Path provided is invalid.
return ProcessingPhaseError, ErrInvalidPath
}
err := streamDataToFile(sd.readers.TopReader(), file)
_, _, err := StreamDataToFile(sd.readers.TopReader(), file, preallocation)
if err != nil {
return ProcessingPhaseError, err
}
@ -108,12 +107,12 @@ func (sd *S3DataSource) Transfer(path string) (ProcessingPhase, error) {
}
// TransferFile is called to transfer the data from the source to the passed in file.
func (sd *S3DataSource) TransferFile(fileName string) (ProcessingPhase, error) {
func (sd *S3DataSource) TransferFile(fileName string, preallocation bool) (ProcessingPhase, error) {
if err := CleanAll(fileName); err != nil {
return ProcessingPhaseError, err
}
err := streamDataToFile(sd.readers.TopReader(), fileName)
_, _, err := StreamDataToFile(sd.readers.TopReader(), fileName, preallocation)
if err != nil {
return ProcessingPhaseError, err
}

View File

@ -110,7 +110,7 @@ var _ = Describe("S3 data source", func() {
nextPhase, err := sd.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferScratch).To(Equal(nextPhase))
result, err := sd.Transfer(scratchPath)
result, err := sd.Transfer(scratchPath, false)
if !wantErr {
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseConvert).To(Equal(result))
@ -146,7 +146,7 @@ var _ = Describe("S3 data source", func() {
Expect(ProcessingPhaseTransferScratch).To(Equal(nextPhase))
err = sourceFile.Close()
Expect(err).NotTo(HaveOccurred())
result, err := sd.Transfer(tmpDir)
result, err := sd.Transfer(tmpDir, false)
Expect(err).To(HaveOccurred())
Expect(ProcessingPhaseError).To(Equal(result))
})
@ -162,7 +162,7 @@ var _ = Describe("S3 data source", func() {
result, err := sd.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferDataFile).To(Equal(result))
result, err = sd.TransferFile(filepath.Join(tmpDir, "file"))
result, err = sd.TransferFile(filepath.Join(tmpDir, "file"), false)
Expect(err).ToNot(HaveOccurred())
Expect(ProcessingPhaseResize).To(Equal(result))
})
@ -178,7 +178,7 @@ var _ = Describe("S3 data source", func() {
result, err := sd.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferDataFile).To(Equal(result))
result, err = sd.TransferFile("/invalidpath/invalidfile")
result, err = sd.TransferFile("/invalidpath/invalidfile", false)
Expect(err).To(HaveOccurred())
Expect(ProcessingPhaseError).To(Equal(result))
})

View File

@ -125,7 +125,8 @@ func processLayer(ctx context.Context,
destDir string,
pathPrefix string,
cache types.BlobInfoCache,
stopAtFirst bool) (bool, error) {
stopAtFirst bool,
preallocation bool) (bool, error) {
var reader io.ReadCloser
reader, _, err := src.GetBlob(ctx, layer, cache)
if err != nil {
@ -164,7 +165,7 @@ func processLayer(ctx context.Context,
return false, errors.Wrap(err, "Error creating output file's directory")
}
if err := streamDataToFile(tarReader, destFile); err != nil {
if _, _, err := StreamDataToFile(tarReader, destFile, preallocation); err != nil {
klog.Errorf("Error copying file: %v", err)
return false, errors.Wrap(err, "Error copying file")
}
@ -192,7 +193,7 @@ func safeJoinPaths(dir, path string) (v string, err error) {
return "", fmt.Errorf("%s: %s", "content filepath is tainted", path)
}
func copyRegistryImage(url, destDir, pathPrefix, accessKey, secKey, certDir string, insecureRegistry, stopAtFirst bool) (*types.ImageInspectInfo, error) {
func copyRegistryImage(url, destDir, pathPrefix, accessKey, secKey, certDir string, insecureRegistry, stopAtFirst bool, preallocation bool) (*types.ImageInspectInfo, error) {
klog.Infof("Downloading image from '%v', copying file from '%v' to '%v'", url, pathPrefix, destDir)
ctx, cancel := commandTimeoutContext()
@ -219,7 +220,7 @@ func copyRegistryImage(url, destDir, pathPrefix, accessKey, secKey, certDir stri
for _, layer := range layers {
klog.Infof("Processing layer %+v", layer)
found, err = processLayer(ctx, srcCtx, src, layer, destDir, pathPrefix, cache, stopAtFirst)
found, err = processLayer(ctx, srcCtx, src, layer, destDir, pathPrefix, cache, stopAtFirst, preallocation)
if found {
break
}
@ -286,8 +287,8 @@ func GetImageDigest(url, accessKey, secKey, certDir string, insecureRegistry boo
// secKey: secretKey for the registry described in url.
// certDir: directory public CA keys are stored for registry identity verification
// insecureRegistry: boolean if true will allow insecure registries.
func CopyRegistryImage(url, destDir, pathPrefix, accessKey, secKey, certDir string, insecureRegistry bool) (*types.ImageInspectInfo, error) {
return copyRegistryImage(url, destDir, pathPrefix, accessKey, secKey, certDir, insecureRegistry, true)
func CopyRegistryImage(url, destDir, pathPrefix, accessKey, secKey, certDir string, insecureRegistry bool, preallocation bool) (*types.ImageInspectInfo, error) {
return copyRegistryImage(url, destDir, pathPrefix, accessKey, secKey, certDir, insecureRegistry, true, preallocation)
}
// CopyRegistryImageAll download image from registry with docker image API. It will extract all files under the pathPrefix
@ -298,6 +299,6 @@ func CopyRegistryImage(url, destDir, pathPrefix, accessKey, secKey, certDir stri
// secKey: secretKey for the registry described in url.
// certDir: directory public CA keys are stored for registry identity verification
// insecureRegistry: boolean if true will allow insecure registries.
func CopyRegistryImageAll(url, destDir, pathPrefix, accessKey, secKey, certDir string, insecureRegistry bool) (*types.ImageInspectInfo, error) {
return copyRegistryImage(url, destDir, pathPrefix, accessKey, secKey, certDir, insecureRegistry, false)
func CopyRegistryImageAll(url, destDir, pathPrefix, accessKey, secKey, certDir string, insecureRegistry bool, preallocation bool) (*types.ImageInspectInfo, error) {
return copyRegistryImage(url, destDir, pathPrefix, accessKey, secKey, certDir, insecureRegistry, false, preallocation)
}

View File

@ -41,7 +41,7 @@ var _ = Describe("Registry Importer", func() {
})
DescribeTable("Should extract a single file", func(source string) {
info, err := CopyRegistryImage(source, tmpDir, "disk/cirros-0.3.4-x86_64-disk.img", "", "", "", false)
info, err := CopyRegistryImage(source, tmpDir, "disk/cirros-0.3.4-x86_64-disk.img", "", "", "", false, false)
Expect(err).ToNot(HaveOccurred())
Expect(info).ToNot(BeNil())
@ -52,7 +52,7 @@ var _ = Describe("Registry Importer", func() {
Entry("when one of the image layers is malformed", malformedSource),
)
It("Should extract files prefixed by path", func() {
info, err := CopyRegistryImageAll(source, tmpDir, "etc/", "", "", "", false)
info, err := CopyRegistryImageAll(source, tmpDir, "etc/", "", "", "", false, false)
Expect(err).ToNot(HaveOccurred())
Expect(info).ToNot(BeNil())
@ -63,7 +63,7 @@ var _ = Describe("Registry Importer", func() {
Expect(file).To(BeARegularFile())
})
It("Should return an error if a single file is not found", func() {
info, err := CopyRegistryImage(source, tmpDir, "disk/invalid.img", "", "", "", false)
info, err := CopyRegistryImage(source, tmpDir, "disk/invalid.img", "", "", "", false, false)
Expect(err).To(HaveOccurred())
Expect(info).To(BeNil())
@ -72,7 +72,7 @@ var _ = Describe("Registry Importer", func() {
Expect(err).To(HaveOccurred())
})
It("Should return an error if no files matches a prefix", func() {
info, err := CopyRegistryImageAll(source, tmpDir, "invalid/", "", "", "", false)
info, err := CopyRegistryImageAll(source, tmpDir, "invalid/", "", "", "", false, false)
Expect(err).To(HaveOccurred())
Expect(info).To(BeNil())
})

View File

@ -59,13 +59,13 @@ func (ud *UploadDataSource) Info() (ProcessingPhase, error) {
}
// Transfer is called to transfer the data from the source to the passed in path.
func (ud *UploadDataSource) Transfer(path string) (ProcessingPhase, error) {
func (ud *UploadDataSource) Transfer(path string, preallocation bool) (ProcessingPhase, error) {
if ud.contentType == cdiv1.DataVolumeKubeVirt {
file := filepath.Join(path, tempFile)
if err := CleanAll(file); err != nil {
return ProcessingPhaseError, err
}
size, err := util.GetAvailableSpace(path)
size, err := GetAvailableSpace(path)
if err != nil {
return ProcessingPhaseError, err
}
@ -73,7 +73,7 @@ func (ud *UploadDataSource) Transfer(path string) (ProcessingPhase, error) {
//Path provided is invalid.
return ProcessingPhaseError, ErrInvalidPath
}
err = streamDataToFile(ud.readers.TopReader(), file)
_, _, err = StreamDataToFile(ud.readers.TopReader(), file, preallocation)
if err != nil {
return ProcessingPhaseError, err
}
@ -91,11 +91,11 @@ func (ud *UploadDataSource) Transfer(path string) (ProcessingPhase, error) {
}
// TransferFile is called to transfer the data from the source to the passed in file.
func (ud *UploadDataSource) TransferFile(fileName string) (ProcessingPhase, error) {
func (ud *UploadDataSource) TransferFile(fileName string, preallocation bool) (ProcessingPhase, error) {
if err := CleanAll(fileName); err != nil {
return ProcessingPhaseError, err
}
err := streamDataToFile(ud.readers.TopReader(), fileName)
_, _, err := StreamDataToFile(ud.readers.TopReader(), fileName, preallocation)
if err != nil {
return ProcessingPhaseError, err
}
@ -146,12 +146,12 @@ func (aud *AsyncUploadDataSource) Info() (ProcessingPhase, error) {
}
// Transfer is called to transfer the data from the source to the passed in path.
func (aud *AsyncUploadDataSource) Transfer(path string) (ProcessingPhase, error) {
func (aud *AsyncUploadDataSource) Transfer(path string, preallocation bool) (ProcessingPhase, error) {
file := filepath.Join(path, tempFile)
if err := CleanAll(file); err != nil {
return ProcessingPhaseError, err
}
size, err := util.GetAvailableSpace(path)
size, err := GetAvailableSpace(path)
if err != nil {
return ProcessingPhaseError, err
}
@ -159,7 +159,7 @@ func (aud *AsyncUploadDataSource) Transfer(path string) (ProcessingPhase, error)
//Path provided is invalid.
return ProcessingPhaseError, ErrInvalidPath
}
err = streamDataToFile(aud.uploadDataSource.readers.TopReader(), file)
_, _, err = StreamDataToFile(aud.uploadDataSource.readers.TopReader(), file, preallocation)
if err != nil {
return ProcessingPhaseError, err
}
@ -170,11 +170,11 @@ func (aud *AsyncUploadDataSource) Transfer(path string) (ProcessingPhase, error)
}
// TransferFile is called to transfer the data from the source to the passed in file.
func (aud *AsyncUploadDataSource) TransferFile(fileName string) (ProcessingPhase, error) {
func (aud *AsyncUploadDataSource) TransferFile(fileName string, preallocation bool) (ProcessingPhase, error) {
if err := CleanAll(fileName); err != nil {
return ProcessingPhaseError, err
}
err := streamDataToFile(aud.uploadDataSource.readers.TopReader(), fileName)
_, _, err := StreamDataToFile(aud.uploadDataSource.readers.TopReader(), fileName, preallocation)
if err != nil {
return ProcessingPhaseError, err
}

View File

@ -91,7 +91,7 @@ var _ = Describe("Upload data source", func() {
ud = NewUploadDataSource(sourceFile, dvContentType)
_, err = ud.Info()
Expect(err).NotTo(HaveOccurred())
nextPhase, err := ud.Transfer(scratchPath)
nextPhase, err := ud.Transfer(scratchPath, false)
Expect(nextPhase).To(Equal(expectedPhase))
if nextPhase == ProcessingPhaseConvert {
Expect(err).NotTo(HaveOccurred())
@ -124,7 +124,7 @@ var _ = Describe("Upload data source", func() {
Expect(ProcessingPhaseTransferScratch).To(Equal(nextPhase))
err = sourceFile.Close()
Expect(err).NotTo(HaveOccurred())
result, err := ud.Transfer(tmpDir)
result, err := ud.Transfer(tmpDir, false)
Expect(err).To(HaveOccurred())
Expect(ProcessingPhaseError).To(Equal(result))
})
@ -137,7 +137,7 @@ var _ = Describe("Upload data source", func() {
result, err := ud.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferDataFile).To(Equal(result))
result, err = ud.TransferFile(filepath.Join(tmpDir, "file"))
result, err = ud.TransferFile(filepath.Join(tmpDir, "file"), false)
Expect(err).ToNot(HaveOccurred())
Expect(ProcessingPhaseResize).To(Equal(result))
})
@ -150,7 +150,7 @@ var _ = Describe("Upload data source", func() {
result, err := ud.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferDataFile).To(Equal(result))
result, err = ud.TransferFile("/invalidpath/invalidfile")
result, err = ud.TransferFile("/invalidpath/invalidfile", false)
Expect(err).To(HaveOccurred())
Expect(ProcessingPhaseError).To(Equal(result))
})
@ -225,7 +225,7 @@ var _ = Describe("Async Upload data source", func() {
nextPhase, err := aud.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferScratch).To(Equal(nextPhase))
result, err := aud.Transfer(scratchPath)
result, err := aud.Transfer(scratchPath, false)
if !wantErr {
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseValidatePause).To(Equal(result))
@ -248,7 +248,7 @@ var _ = Describe("Async Upload data source", func() {
Expect(ProcessingPhaseTransferScratch).To(Equal(nextPhase))
err = sourceFile.Close()
Expect(err).NotTo(HaveOccurred())
result, err := aud.Transfer(tmpDir)
result, err := aud.Transfer(tmpDir, false)
Expect(err).To(HaveOccurred())
Expect(ProcessingPhaseError).To(Equal(result))
})
@ -261,7 +261,7 @@ var _ = Describe("Async Upload data source", func() {
result, err := aud.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferDataFile).To(Equal(result))
result, err = aud.TransferFile(filepath.Join(tmpDir, "file"))
result, err = aud.TransferFile(filepath.Join(tmpDir, "file"), false)
Expect(err).ToNot(HaveOccurred())
Expect(ProcessingPhaseValidatePause).To(Equal(result))
Expect(ProcessingPhaseResize).To(Equal(aud.GetResumePhase()))
@ -275,7 +275,7 @@ var _ = Describe("Async Upload data source", func() {
result, err := aud.Info()
Expect(err).NotTo(HaveOccurred())
Expect(ProcessingPhaseTransferDataFile).To(Equal(result))
result, err = aud.TransferFile("/invalidpath/invalidfile")
result, err = aud.TransferFile("/invalidpath/invalidfile", false)
Expect(err).To(HaveOccurred())
Expect(ProcessingPhaseError).To(Equal(result))
})

View File

@ -1,7 +1,6 @@
package importer
import (
"io"
"net/url"
"os"
"os/signal"
@ -10,7 +9,7 @@ import (
"github.com/pkg/errors"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
"kubevirt.io/containerized-data-importer/pkg/common"
"kubevirt.io/containerized-data-importer/pkg/util"
@ -36,7 +35,7 @@ func ParseEndpoint(endpt string) (*url.URL, error) {
// CleanAll deletes all files at specified paths (recursively)
func CleanAll(paths ...string) error {
for _, p := range paths {
isDevice, err := util.IsDevice(p)
isDevice, err := IsDevice(p)
if err != nil {
return err
}
@ -58,6 +57,15 @@ func GetTerminationChannel() <-chan os.Signal {
return terminationChannel
}
// GetAvailableSpaceByVolumeMode calls another method based on the volumeMode parameter to get the amount of
// available space at the path specified.
func GetAvailableSpaceByVolumeMode(volumeMode v1.PersistentVolumeMode) (int64, error) {
	switch volumeMode {
	case v1.PersistentVolumeBlock:
		// Block volumes are written through the raw device path.
		return GetAvailableSpaceBlock(common.WriteBlockPath)
	default:
		// Filesystem volumes report space at the importer mount point.
		return GetAvailableSpace(common.ImporterVolumePath)
	}
}
// newTerminationChannel should be overridden for unit tests
var newTerminationChannel = GetTerminationChannel
@ -85,23 +93,3 @@ func envToLabel(env string) string {
return strings.ToLower(label)
}
// streamDataToFile copies everything from r into the local file (or block
// device) named by fileName and syncs it to stable storage before returning.
//
// On a copy failure the partially written file is removed. An out-of-space
// condition is wrapped and returned directly; any other copy failure is
// reported as an image-pull failure.
func streamDataToFile(r io.Reader, fileName string) error {
	outFile, err := util.OpenFileOrBlockDevice(fileName)
	if err != nil {
		return err
	}
	defer outFile.Close()

	klog.V(1).Infof("Writing data...\n")
	_, copyErr := io.Copy(outFile, r)
	if copyErr == nil {
		// Flush to disk so callers see a durable file on success.
		return outFile.Sync()
	}

	klog.Errorf("Unable to write file from dataReader: %v\n", copyErr)
	os.Remove(outFile.Name())
	// NOTE(review): matching on the message text is fragile — some
	// filesystems report capacity errors with different wording.
	if strings.Contains(copyErr.Error(), "no space left on device") {
		return errors.Wrapf(copyErr, "unable to write to file")
	}
	return NewImagePullFailedError(copyErr)
}

View File

@ -1,7 +1,6 @@
package importer
import (
"io"
"net/url"
"os"
"path/filepath"
@ -48,39 +47,6 @@ var _ = Describe("Parse endpoints", func() {
})
var _ = Describe("Stream Data To File", func() {
var (
err error
tmpDir string
)
BeforeEach(func() {
tmpDir, err = os.MkdirTemp("", "stream")
Expect(err).NotTo(HaveOccurred())
By("tmpDir: " + tmpDir)
})
AfterEach(func() {
os.RemoveAll(tmpDir)
})
DescribeTable("should", func(fileName string, useTmpDir bool, r io.Reader, errMsg string, wantErr bool) {
if useTmpDir {
fileName = filepath.Join(tmpDir, fileName)
}
err = streamDataToFile(r, fileName)
if !wantErr {
Expect(err).NotTo(HaveOccurred())
} else {
Expect(err).To(HaveOccurred())
Expect(strings.Contains(err.Error(), errMsg)).To(BeTrue())
}
},
Entry("succeed with valid reader and filename", "valid", true, strings.NewReader("test reader"), "", false),
Entry("fail with valid reader and invalid filename", "/invalidpath/invalidfile", false, strings.NewReader("test reader"), "no such file or directory", true),
)
})
var _ = Describe("Clean dir", func() {
var (
err error

View File

@ -945,7 +945,7 @@ func (vs *VDDKDataSource) GetTerminationMessage() *common.TerminationMessage {
}
// Transfer is called to transfer the data from the source to the path passed in.
func (vs *VDDKDataSource) Transfer(path string) (ProcessingPhase, error) {
func (vs *VDDKDataSource) Transfer(path string, preallocation bool) (ProcessingPhase, error) {
return ProcessingPhaseTransferDataFile, nil
}
@ -966,7 +966,7 @@ func (vs *VDDKDataSource) IsDeltaCopy() bool {
var MockableStat = os.Stat
// TransferFile is called to transfer the data from the source to the file passed in.
func (vs *VDDKDataSource) TransferFile(fileName string) (ProcessingPhase, error) {
func (vs *VDDKDataSource) TransferFile(fileName string, preallocation bool) (ProcessingPhase, error) {
defer func() { _ = vs.VMware.Close() }()
if !vs.IsWarm() {

View File

@ -20,11 +20,11 @@ func (V VDDKDataSource) Info() (ProcessingPhase, error) {
panic("not support")
}
func (V VDDKDataSource) Transfer(path string) (ProcessingPhase, error) {
func (V VDDKDataSource) Transfer(path string, preallocation bool) (ProcessingPhase, error) {
panic("not support")
}
func (V VDDKDataSource) TransferFile(fileName string) (ProcessingPhase, error) {
func (V VDDKDataSource) TransferFile(fileName string, preallocation bool) (ProcessingPhase, error) {
panic("not support")
}

View File

@ -21,11 +21,11 @@ func (V VDDKDataSource) Info() (ProcessingPhase, error) {
panic("not support")
}
func (V VDDKDataSource) Transfer(path string) (ProcessingPhase, error) {
func (V VDDKDataSource) Transfer(path string, preallocation bool) (ProcessingPhase, error) {
panic("not support")
}
func (V VDDKDataSource) TransferFile(fileName string) (ProcessingPhase, error) {
func (V VDDKDataSource) TransferFile(fileName string, preallocation bool) (ProcessingPhase, error) {
panic("not support")
}

View File

@ -119,7 +119,7 @@ var _ = Describe("VDDK data source", func() {
phase, err := dp.Info()
Expect(err).ToNot(HaveOccurred())
Expect(phase).To(Equal(ProcessingPhaseTransferDataFile))
phase, err = dp.TransferFile("")
phase, err = dp.TransferFile("", false)
Expect(err).ToNot(HaveOccurred())
Expect(phase).To(Equal(ProcessingPhaseResize))
})
@ -131,7 +131,7 @@ var _ = Describe("VDDK data source", func() {
phase, err := dp.Info()
Expect(err).ToNot(HaveOccurred())
Expect(phase).To(Equal(ProcessingPhaseTransferDataFile))
phase, err = dp.TransferFile("")
phase, err = dp.TransferFile("", false)
Expect(err).To(HaveOccurred())
Expect(phase).To(Equal(ProcessingPhaseError))
})
@ -187,7 +187,7 @@ var _ = Describe("VDDK data source", func() {
currentMockNbdFunctions.BlockStatus = func(uint64, uint64, libnbd.ExtentCallback, *libnbd.BlockStatusOptargs) error {
return fmt.Errorf("this should not be called on zero-change test")
}
phase, err := dp.TransferFile("")
phase, err := dp.TransferFile("", false)
Expect(err).ToNot(HaveOccurred())
Expect(phase).To(Equal(ProcessingPhaseComplete))
})
@ -211,7 +211,7 @@ var _ = Describe("VDDK data source", func() {
}
mockSinkBuffer = bytes.Repeat([]byte{0x00}, int(dp.Size))
phase, err := dp.TransferFile("target")
phase, err := dp.TransferFile("target", false)
Expect(err).ToNot(HaveOccurred())
Expect(phase).To(Equal(ProcessingPhaseResize))
@ -241,7 +241,7 @@ var _ = Describe("VDDK data source", func() {
MockableStat = func(string) (fs.FileInfo, error) {
return nil, nil
}
phase, err := snap1.TransferFile("target")
phase, err := snap1.TransferFile("target", false)
Expect(err).ToNot(HaveOccurred())
Expect(phase).To(Equal(ProcessingPhaseResize))
@ -301,7 +301,7 @@ var _ = Describe("VDDK data source", func() {
}
changedSourceSum := md5.Sum(sourceBytes) //nolint:gosec // This is test code
phase, err = snap2.TransferFile(".")
phase, err = snap2.TransferFile(".", false)
Expect(err).ToNot(HaveOccurred())
Expect(phase).To(Equal(ProcessingPhaseComplete))
@ -595,7 +595,7 @@ var _ = Describe("VDDK data source", func() {
ds, err := NewVDDKDataSource("http://vcenter.test", "user", "pass", "aa:bb:cc:dd", "1-2-3-4", diskName, snapshotName, changeID, "", v1.PersistentVolumeFilesystem)
Expect(err).ToNot(HaveOccurred())
_, err = ds.TransferFile("")
_, err = ds.TransferFile("", false)
Expect(err).ToNot(HaveOccurred())
Expect(queries[0]).To(BeTrue())
Expect(queries[1]).To(BeTrue())

View File

@ -487,13 +487,12 @@ func cloneProcessor(stream io.ReadCloser, contentType, dest string, preallocate
}
defer stream.Close()
bytesRead, bytesWrittenn, err := util.StreamDataToFile(stream, dest, preallocate)
_, _, err := importer.StreamDataToFile(stream, dest, preallocate)
if err != nil {
return false, err
}
klog.Infof("Read %d bytes, wrote %d bytes to %s", bytesRead, bytesWrittenn, dest)
return false, nil
}
@ -575,7 +574,7 @@ func newSnappyReadCloser(stream io.ReadCloser) io.ReadCloser {
}
func handleStreamError(w http.ResponseWriter, err error) {
if errors.As(err, &importer.ValidationSizeError{}) || strings.Contains(err.Error(), "no space left on device") {
if importer.IsNoCapacityError(err) {
w.WriteHeader(http.StatusBadRequest)
err = errors.New("effective image size is larger than the reported available storage. A larger PVC is required")
} else {

View File

@ -153,12 +153,12 @@ func (amd *AsyncMockDataSource) Info() (importer.ProcessingPhase, error) {
}
// Transfer is called to transfer the data from the source to the passed in path.
func (amd *AsyncMockDataSource) Transfer(path string) (importer.ProcessingPhase, error) {
func (amd *AsyncMockDataSource) Transfer(path string, preallocation bool) (importer.ProcessingPhase, error) {
return importer.ProcessingPhasePause, nil
}
// TransferFile is called to transfer the data from the source to the passed in file.
func (amd *AsyncMockDataSource) TransferFile(fileName string) (importer.ProcessingPhase, error) {
func (amd *AsyncMockDataSource) TransferFile(fileName string, preallocation bool) (importer.ProcessingPhase, error) {
return importer.ProcessingPhasePause, nil
}

View File

@ -2,17 +2,13 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"file.go",
"util.go",
],
srcs = ["util.go"],
importpath = "kubevirt.io/containerized-data-importer/pkg/util",
visibility = ["//visibility:public"],
deps = [
"//pkg/common:go_default_library",
"//staging/src/kubevirt.io/containerized-data-importer-api/pkg/apis/core/v1beta1:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/golang.org/x/sys/unix:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -23,7 +19,6 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"file_test.go",
"util_suite_test.go",
"util_test.go",
],

View File

@ -6,11 +6,13 @@ import (
"crypto/md5" //nolint:gosec // This is not a security-sensitive use case
"encoding/base64"
"encoding/hex"
"fmt"
"io"
"math"
"math/rand"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
@ -26,7 +28,6 @@ import (
)
const (
blockdevFileName = "/usr/sbin/blockdev"
// DefaultAlignBlockSize is the alignment size we use to align disk images, its a multiple of all known hardware block sizes 512/4k/8k/32k/64k.
DefaultAlignBlockSize = 1024 * 1024
)
@ -92,15 +93,6 @@ func (r *CountingReader) Close() error {
return r.Reader.Close()
}
// GetAvailableSpaceByVolumeMode calls another method based on the volumeMode parameter to get the amount of
// available space at the path specified.
func GetAvailableSpaceByVolumeMode(volumeMode v1.PersistentVolumeMode) (int64, error) {
if volumeMode == v1.PersistentVolumeBlock {
return GetAvailableSpaceBlock(common.WriteBlockPath)
}
return GetAvailableSpace(common.ImporterVolumePath)
}
// MinQuantity calculates the minimum of two quantities.
func MinQuantity(availableSpace, imageSize *resource.Quantity) resource.Quantity {
if imageSize.Cmp(*availableSpace) == 1 {
@ -262,3 +254,62 @@ func ResolveVolumeMode(volumeMode *v1.PersistentVolumeMode) v1.PersistentVolumeM
}
return retVolumeMode
}
// CopyFile copies a file from one location to another.
func CopyFile(src, dst string) error {
in, err := os.Open(src)
if err != nil {
return err
}
defer in.Close()
out, err := os.Create(dst)
if err != nil {
return err
}
defer out.Close()
_, err = io.Copy(out, in)
if err != nil {
return err
}
return out.Close()
}
// CopyDir copies a dir from one location to another.
func CopyDir(source string, dest string) error {
// get properties of source dir
sourceinfo, err := os.Stat(source)
if err != nil {
return err
}
// create dest dir
err = os.MkdirAll(dest, sourceinfo.Mode())
if err != nil {
return err
}
directory, _ := os.Open(source)
objects, err := directory.Readdir(-1)
for _, obj := range objects {
src := filepath.Join(source, obj.Name())
dst := filepath.Join(dest, obj.Name())
if obj.IsDir() {
// create sub-directories - recursively
err = CopyDir(src, dst)
if err != nil {
fmt.Println(err)
}
} else {
// perform copy
err = CopyFile(src, dst)
if err != nil {
fmt.Println(err)
}
}
}
return err
}

View File

@ -12,6 +12,51 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
)
const (
TestImagesDir = "../../tests/images"
pattern = "^[a-zA-Z0-9]+$"
)
var (
fileDir, _ = filepath.Abs(TestImagesDir)
)
var _ = Describe("Copy files", func() {
var destTmp string
var err error
BeforeEach(func() {
destTmp, err = os.MkdirTemp("", "dest")
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
err = os.RemoveAll(destTmp)
Expect(err).NotTo(HaveOccurred())
os.Remove("test.txt")
})
It("Should copy file from source to dest, with valid source and dest", func() {
err = CopyFile(filepath.Join(TestImagesDir, "content.tar"), filepath.Join(destTmp, "target.tar"))
Expect(err).ToNot(HaveOccurred())
sourceMd5, err := Md5sum(filepath.Join(TestImagesDir, "content.tar"))
Expect(err).ToNot(HaveOccurred())
targetMd5, err := Md5sum(filepath.Join(destTmp, "target.tar"))
Expect(err).ToNot(HaveOccurred())
Expect(sourceMd5).Should(Equal(targetMd5))
})
It("Should not copy file from source to dest, with invalid source", func() {
err = CopyFile(filepath.Join(TestImagesDir, "content.tar22"), filepath.Join(destTmp, "target.tar"))
Expect(err).To(HaveOccurred())
})
It("Should not copy file from source to dest, with invalid target", func() {
err = CopyFile(filepath.Join(TestImagesDir, "content.tar"), filepath.Join("/invalidpath", "target.tar"))
Expect(err).To(HaveOccurred())
})
})
var _ = Describe("Util", func() {
It("Should match RandAlphaNum", func() {
got := RandAlphaNum(8)