diff --git a/doc/datavolumes.md b/doc/datavolumes.md index 2b7c8cf64..451438fac 100644 --- a/doc/datavolumes.md +++ b/doc/datavolumes.md @@ -338,12 +338,12 @@ spec: uuid: "52260566-b032-36cb-55b1-79bf29e30490" thumbprint: "20:6C:8A:5D:44:40:B3:79:4B:28:EA:76:13:60:90:6E:49:D9:D9:A3" # SSL fingerprint of vCenter/ESX host secretRef: "vddk-credentials" - finalCheckpoint: true - checkpoints: - - current: "snapshot-1" - previous: "" - - current: "snapshot-2" - previous: "snapshot-1" + finalCheckpoint: true + checkpoints: + - current: "snapshot-1" + previous: "" + - current: "snapshot-2" + previous: "snapshot-1" pvc: accessModes: - ReadWriteOnce diff --git a/pkg/importer/vddk-datasource_amd64.go b/pkg/importer/vddk-datasource_amd64.go index 791e3a321..763e4e8d4 100644 --- a/pkg/importer/vddk-datasource_amd64.go +++ b/pkg/importer/vddk-datasource_amd64.go @@ -26,6 +26,7 @@ import ( "context" "errors" "fmt" + "math" "net/url" "os" "regexp" @@ -501,7 +502,7 @@ const MaxPreadLengthVC = (2 << 20) // MaxPreadLength is the maxmimum read size to request from VMware. Default to // the larger option, and reduce it in createVddkDataSource when connecting to // vCenter endpoints. -var MaxPreadLength uint32 = MaxPreadLengthESX +var MaxPreadLength int = MaxPreadLengthESX // NbdOperations provides a mockable interface for the things needed from libnbd. type NbdOperations interface { @@ -513,8 +514,8 @@ type NbdOperations interface { // BlockStatusData holds zero/hole status for one block of data type BlockStatusData struct { - Offset uint64 - Length uint32 + Offset int64 + Length int64 Flags uint32 } @@ -530,7 +531,13 @@ func GetBlockStatus(handle NbdOperations, extent types.DiskChangeExtent) []*Bloc var blocks []*BlockStatusData // Callback for libnbd.BlockStatus. Needs to modify blocks list above. - updateBlocksCallback := func(metacontext string, offset uint64, extents []uint32, err *int) int { + updateBlocksCallback := func(metacontext string, nbdOffset uint64, extents []uint32, err *int) int { + if nbdOffset > math.MaxInt64 { + klog.Errorf("Block status offset too big for conversion: 0x%x", nbdOffset) + return -2 + } + offset := int64(nbdOffset) + if *err != 0 { klog.Errorf("Block status callback error at offset %d: error code %d", offset, *err) return *err @@ -544,17 +551,17 @@ func GetBlockStatus(handle NbdOperations, extent types.DiskChangeExtent) []*Bloc return -1 } for i := 0; i < len(extents); i += 2 { - length, flags := extents[i], extents[i+1] + length, flags := int64(extents[i]), extents[i+1] if blocks != nil { last := len(blocks) - 1 lastBlock := blocks[last] lastFlags := lastBlock.Flags - lastOffset := lastBlock.Offset + uint64(lastBlock.Length) + lastOffset := lastBlock.Offset + lastBlock.Length if lastFlags == flags && lastOffset == offset { // Merge with previous block blocks[last] = &BlockStatusData{ Offset: lastBlock.Offset, - Length: lastBlock.Length, + Length: lastBlock.Length + length, Flags: lastFlags, } } else { @@ -563,15 +570,15 @@ func GetBlockStatus(handle NbdOperations, extent types.DiskChangeExtent) []*Bloc } else { blocks = append(blocks, &BlockStatusData{Offset: offset, Length: length, Flags: flags}) } - offset += uint64(length) + offset += length } return 0 } if extent.Length < 1024*1024 { blocks = append(blocks, &BlockStatusData{ - Offset: uint64(extent.Start), - Length: uint32(extent.Length), + Offset: extent.Start, + Length: extent.Length, Flags: 0}) return blocks } @@ -579,30 +586,34 @@ func GetBlockStatus(handle NbdOperations, extent types.DiskChangeExtent) []*Bloc lastOffset := extent.Start endOffset := extent.Start + extent.Length for lastOffset < endOffset { - var length uint64 + var length int64 missingLength := endOffset - lastOffset if missingLength > (MaxBlockStatusLength) { - length = (MaxBlockStatusLength) + length = MaxBlockStatusLength } else { - length = uint64(missingLength) + length = missingLength } - err := handle.BlockStatus(length, uint64(lastOffset), updateBlocksCallback, &fixedOptArgs) - if err != nil { - klog.Errorf("Error getting block status at offset %d, returning whole block instead. Error was: %v", lastOffset, err) + createWholeBlock := func() []*BlockStatusData { block := &BlockStatusData{ - Offset: uint64(extent.Start), - Length: uint32(extent.Length), + Offset: extent.Start, + Length: extent.Length, Flags: 0, } blocks = []*BlockStatusData{block} return blocks } - last := len(blocks) - 1 - newOffset := blocks[last].Offset + uint64(blocks[last].Length) - if uint64(lastOffset) == newOffset { - klog.Infof("No new block status data at offset %d", newOffset) + err := handle.BlockStatus(uint64(length), uint64(lastOffset), updateBlocksCallback, &fixedOptArgs) + if err != nil { + klog.Errorf("Error getting block status at offset %d, returning whole block instead. Error was: %v", lastOffset, err) + return createWholeBlock() } - lastOffset = int64(newOffset) + last := len(blocks) - 1 + newOffset := blocks[last].Offset + blocks[last].Length + if lastOffset == newOffset { + klog.Infof("No new block status data at offset %d, returning whole block.", newOffset) + return createWholeBlock() + } + lastOffset = newOffset } return blocks @@ -628,29 +639,29 @@ func CopyRange(handle NbdOperations, sink VDDKDataSink, block *BlockStatusData, return err } - buffer := bytes.Repeat([]byte{0}, int(MaxPreadLength)) - count := uint32(0) + buffer := bytes.Repeat([]byte{0}, MaxPreadLength) + count := int64(0) for count < block.Length { - if block.Length-count < MaxPreadLength { + if block.Length-count < int64(MaxPreadLength) { buffer = bytes.Repeat([]byte{0}, int(block.Length-count)) } length := len(buffer) - offset := block.Offset + uint64(count) - err := handle.Pread(buffer, offset, nil) + offset := block.Offset + count + err := handle.Pread(buffer, uint64(offset), nil) if err != nil { klog.Errorf("Error reading from source at offset %d: %v", offset, err) return err } - written, err := sink.Pwrite(buffer, offset) + written, err := sink.Pwrite(buffer, uint64(offset)) if err != nil { klog.Errorf("Failed to write data block at offset %d to local file: %v", block.Offset, err) return err } updateProgress(written) - count += uint32(length) + count += int64(length) } return nil } @@ -661,7 +672,7 @@ func CopyRange(handle NbdOperations, sink VDDKDataSink, block *BlockStatusData, type VDDKDataSink interface { Pwrite(buf []byte, offset uint64) (int, error) Write(buf []byte) (int, error) - ZeroRange(offset uint64, length uint32) error + ZeroRange(offset int64, length int64) error Close() } @@ -718,11 +729,11 @@ func (sink *VDDKFileSink) Write(buf []byte) (int, error) { } // ZeroRange fills the destination range with zero bytes -func (sink *VDDKFileSink) ZeroRange(offset uint64, length uint32) error { - punch := func(offset uint64, length uint32) error { +func (sink *VDDKFileSink) ZeroRange(offset int64, length int64) error { + punch := func(offset int64, length int64) error { klog.Infof("Punching %d-byte hole at offset %d", length, offset) flags := uint32(unix.FALLOC_FL_PUNCH_HOLE | unix.FALLOC_FL_KEEP_SIZE) - return syscall.Fallocate(int(sink.file.Fd()), flags, int64(offset), int64(length)) + return syscall.Fallocate(int(sink.file.Fd()), flags, offset, length) } var err error @@ -734,8 +745,8 @@ func (sink *VDDKFileSink) ZeroRange(offset uint64, length uint32) error { if err != nil { klog.Errorf("Unable to stat destination file: %v", err) } else { // Filesystem - if offset+uint64(length) > uint64(info.Size()) { // Truncate only if extending the file - err = syscall.Ftruncate(int(sink.file.Fd()), int64(offset+uint64(length))) + if offset+length > info.Size() { // Truncate only if extending the file + err = syscall.Ftruncate(int(sink.file.Fd()), offset+length) } else { // Otherwise, try to punch a hole in the file err = punch(offset, length) } @@ -743,22 +754,22 @@ func (sink *VDDKFileSink) ZeroRange(offset uint64, length uint32) error { } if err != nil { // Fall back to regular pwrite - klog.Errorf("Unable to zero range %d - %d on destination, falling back to pwrite: %v", offset, offset+uint64(length), err) + klog.Errorf("Unable to zero range %d - %d on destination, falling back to pwrite: %v", offset, offset+length, err) err = nil - count := uint32(0) - blocksize := uint32(16 << 20) - buffer := bytes.Repeat([]byte{0}, int(blocksize)) + count := int64(0) + const blocksize = 16 << 20 + buffer := bytes.Repeat([]byte{0}, blocksize) for count < length { remaining := length - count if remaining < blocksize { buffer = bytes.Repeat([]byte{0}, int(remaining)) } - written, err := sink.Pwrite(buffer, offset) + written, err := sink.Pwrite(buffer, uint64(offset)) if err != nil { klog.Errorf("Unable to write %d zeroes at offset %d: %v", length, offset, err) break } - count += uint32(written) + count += int64(written) } } diff --git a/pkg/importer/vddk-datasource_test.go b/pkg/importer/vddk-datasource_test.go index 13bfc665d..6c749a452 100644 --- a/pkg/importer/vddk-datasource_test.go +++ b/pkg/importer/vddk-datasource_test.go @@ -60,6 +60,7 @@ var _ = Describe("VDDK data source", func() { newTerminationChannel = createMockTerminationChannel currentExport = defaultMockNbdExport() currentVMwareFunctions = defaultMockVMwareFunctions() + currentMockNbdFunctions = defaultMockNbdFunctions() }) AfterEach(func() { @@ -430,10 +431,10 @@ var _ = Describe("VDDK data source", func() { } _, err := NewVDDKDataSource("http://esx.test", "user", "pass", "aa:bb:cc:dd", "1-2-3-4", diskName, "", "", "", v1.PersistentVolumeFilesystem) Expect(err).ToNot(HaveOccurred()) - Expect(MaxPreadLength).To(Equal(uint32(MaxPreadLengthESX))) + Expect(MaxPreadLength).To(Equal(MaxPreadLengthESX)) _, err = NewVDDKDataSource("http://vcenter.test", "user", "pass", "aa:bb:cc:dd", "1-2-3-4", diskName, "", "", "", v1.PersistentVolumeFilesystem) Expect(err).ToNot(HaveOccurred()) - Expect(MaxPreadLength).To(Equal(uint32(MaxPreadLengthVC))) + Expect(MaxPreadLength).To(Equal(MaxPreadLengthVC)) }) It("GetTerminationMessage should contain VDDK connection information", func() { @@ -482,26 +483,266 @@ var _ = Describe("VDDK log watcher", func() { ) }) +var _ = Describe("VDDK get block status", func() { + defaultMockNbd := defaultMockNbdFunctions() + BeforeEach(func() { + currentMockNbdFunctions = defaultMockNbdFunctions() + }) + + It("should not request block status for a range smaller than 1MB", func() { + nbdCalled := false + currentMockNbdFunctions.GetSize = func() (uint64, error) { + nbdCalled = true + return defaultMockNbd.GetSize() + } + currentMockNbdFunctions.Pread = func(buf []byte, offset uint64, optargs *libnbd.PreadOptargs) error { + nbdCalled = true + return defaultMockNbd.Pread(buf, offset, optargs) + } + currentMockNbdFunctions.Close = func() *libnbd.LibnbdError { + nbdCalled = true + return defaultMockNbd.Close() + } + currentMockNbdFunctions.BlockStatus = func(length uint64, offset uint64, callback libnbd.ExtentCallback, optargs *libnbd.BlockStatusOptargs) error { + nbdCalled = true + return defaultMockNbd.BlockStatus(length, offset, callback, optargs) + } + + extent := types.DiskChangeExtent{ + Start: 0, + Length: 1024*1024 - 1, + } + expectedBlocks := []*BlockStatusData{ + { + Offset: extent.Start, + Length: extent.Length, + Flags: 0, + }, + } + + blocks := GetBlockStatus(&mockNbdOperations{}, extent) + Expect(blocks).To(Equal(expectedBlocks)) + Expect(nbdCalled).To(BeFalse()) + }) + + It("should return the whole block when libnbd returns an error", func() { + nbdCalled := false + currentMockNbdFunctions.BlockStatus = func(length uint64, offset uint64, callback libnbd.ExtentCallback, optargs *libnbd.BlockStatusOptargs) error { + nbdCalled = true + return errors.New("Block status failure") + } + + extent := types.DiskChangeExtent{ + Start: 0, + Length: 1024 * 1024, + } + expectedBlocks := []*BlockStatusData{ + { + Offset: extent.Start, + Length: extent.Length, + Flags: 0, + }, + } + + blocks := GetBlockStatus(&mockNbdOperations{}, extent) + Expect(blocks).To(Equal(expectedBlocks)) + Expect(nbdCalled).To(BeTrue()) + }) + + It("should return the whole block when block list is unchanged after status request", func() { + // Default mock block status callback always returns the same extents, + // so force it to return multiple blocks in the list by asking for more + // than the maximum single block status length. This triggers the + // "No new block status data" codepath in GetBlockStatus. + extent := types.DiskChangeExtent{ + Start: 0, + Length: MaxBlockStatusLength + 1024, + } + expectedBlocks := []*BlockStatusData{ + { + Offset: extent.Start, + Length: extent.Length, + Flags: 0, + }, + } + + blocks := GetBlockStatus(&mockNbdOperations{}, extent) + Expect(blocks).To(Equal(expectedBlocks)) + }) + + It("should chunk up block status requests when asking for more than the maximum status request length", func() { + timesCalled := 0 + currentMockNbdFunctions.BlockStatus = func(length uint64, offset uint64, callback libnbd.ExtentCallback, optargs *libnbd.BlockStatusOptargs) error { + if timesCalled == 0 { + Expect(offset).To(Equal(uint64(0))) + Expect(length).To(Equal(uint64(MaxBlockStatusLength))) + } + if timesCalled == 1 { + Expect(offset).To(Equal(uint64(MaxBlockStatusLength))) + Expect(length).To(Equal(uint64(1024))) + } + timesCalled++ + err := 0 + callback("base:allocation", offset, []uint32{uint32(length), 0}, &err) + return nil + } + + extent := types.DiskChangeExtent{ + Start: 0, + Length: MaxBlockStatusLength + 1024, + } + expectedBlocks := []*BlockStatusData{ + { + Offset: extent.Start, + Length: extent.Length, + Flags: 0, + }, + } + + blocks := GetBlockStatus(&mockNbdOperations{}, extent) + Expect(blocks).To(Equal(expectedBlocks)) + Expect(timesCalled).To(Equal(2)) + }) + + It("should return multiple blocks when data source ranges have different flags", func() { + const blockSize = 1024 * 1024 + + currentMockNbdFunctions.BlockStatus = func(length uint64, offset uint64, callback libnbd.ExtentCallback, optargs *libnbd.BlockStatusOptargs) error { + err := 0 + if offset == 0 { + callback("base:allocation", offset, []uint32{blockSize, 0}, &err) + } + if offset == blockSize { + callback("base:allocation", offset, []uint32{blockSize, 3}, &err) + } + return nil + } + + extent := types.DiskChangeExtent{ + Start: 0, + Length: 2 * blockSize, + } + expectedBlocks := []*BlockStatusData{ + { + Offset: 0, + Length: blockSize, + Flags: 0, + }, + { + Offset: blockSize, + Length: blockSize, + Flags: 3, + }, + } + + blocks := GetBlockStatus(&mockNbdOperations{}, extent) + Expect(blocks).To(Equal(expectedBlocks)) + }) + + It("should handle large blocks with the same flags", func() { + const blockSize = MaxBlockStatusLength * 2 + + extent := types.DiskChangeExtent{ + Start: 0, + Length: blockSize, + } + expectedBlocks := []*BlockStatusData{ + { + Offset: extent.Start, + Length: extent.Length, + Flags: 0, + }, + } + + blocks := GetBlockStatus(&mockNbdOperations{}, extent) + Expect(blocks).To(Equal(expectedBlocks)) + }) + + It("should handle large blocks with different flags", func() { + const blockSize = 1024 * 1024 * 1024 * 1024 * 1024 + + // Alternate flags between blocks, so GetBlockStatus returns a long + // list of different blocks instead of one giant block. + flagSetting := true + currentMockNbdFunctions.BlockStatus = func(length uint64, offset uint64, callback libnbd.ExtentCallback, optargs *libnbd.BlockStatusOptargs) error { + err := 0 + len := uint32(MaxBlockStatusLength) + if length < MaxBlockStatusLength { + len = uint32(length) + } + if flagSetting { + callback("base:allocation", offset, []uint32{len, 0}, &err) + } else { + callback("base:allocation", offset, []uint32{len, 3}, &err) + } + flagSetting = !flagSetting + return nil + } + + extent := types.DiskChangeExtent{ + Start: 0, + Length: blockSize, + } + + blocks := GetBlockStatus(&mockNbdOperations{}, extent) + for index, block := range blocks { + Expect(block.Offset).To(Equal(int64(index * MaxBlockStatusLength))) + Expect(block.Length).To(Equal(int64(MaxBlockStatusLength))) + if index%2 == 0 { + Expect(block.Flags).To(Equal(uint32(0))) + } else { + Expect(block.Flags).To(Equal(uint32(3))) + } + } + }) +}) + +type mockNbdFunctions struct { + GetSize func() (uint64, error) + Pread func(buf []byte, offset uint64, optargs *libnbd.PreadOptargs) error + Close func() *libnbd.LibnbdError + BlockStatus func(length uint64, offset uint64, callback libnbd.ExtentCallback, optargs *libnbd.BlockStatusOptargs) error +} + +func defaultMockNbdFunctions() mockNbdFunctions { + ops := mockNbdFunctions{} + ops.GetSize = func() (uint64, error) { + return currentExport.Size() + } + ops.Pread = func(buf []byte, offset uint64, optargs *libnbd.PreadOptargs) error { + fakebuf, err := currentExport.Read(offset) + copy(buf, fakebuf[offset:offset+uint64(len(buf))]) + return err + } + ops.Close = func() *libnbd.LibnbdError { + return nil + } + ops.BlockStatus = func(length uint64, offset uint64, callback libnbd.ExtentCallback, optargs *libnbd.BlockStatusOptargs) error { + err := 0 + callback("base:allocation", offset, []uint32{uint32(length), 0}, &err) + return nil + } + return ops +} + +var currentMockNbdFunctions mockNbdFunctions + type mockNbdOperations struct{} func (handle *mockNbdOperations) GetSize() (uint64, error) { - return currentExport.Size() + return currentMockNbdFunctions.GetSize() } func (handle *mockNbdOperations) Pread(buf []byte, offset uint64, optargs *libnbd.PreadOptargs) error { - fakebuf, err := currentExport.Read(offset) - copy(buf, fakebuf[offset:offset+uint64(len(buf))]) - return err + return currentMockNbdFunctions.Pread(buf, offset, optargs) } func (handle *mockNbdOperations) Close() *libnbd.LibnbdError { - return nil + return currentMockNbdFunctions.Close() } func (handle *mockNbdOperations) BlockStatus(length uint64, offset uint64, callback libnbd.ExtentCallback, optargs *libnbd.BlockStatusOptargs) error { - err := 0 - callback("base:allocation", offset, []uint32{uint32(length), 0}, &err) - return nil + return currentMockNbdFunctions.BlockStatus(length, offset, callback, optargs) } func createMockVddkDataSource(endpoint string, accessKey string, secKey string, thumbprint string, uuid string, backingFile string, currentCheckpoint string, previousCheckpoint string, finalCheckpoint string, volumeMode v1.PersistentVolumeMode) (*VDDKDataSource, error) { @@ -534,9 +775,9 @@ type mockVddkDataSink struct { position int } -func (sink *mockVddkDataSink) ZeroRange(offset uint64, length uint32) error { +func (sink *mockVddkDataSink) ZeroRange(offset int64, length int64) error { buf := bytes.Repeat([]byte{0x00}, int(length)) - _, err := sink.Pwrite(buf, offset) + _, err := sink.Pwrite(buf, uint64(offset)) return err }