VDDK: Fix NBD status coalescing for large blocks. (#3242)

* Mock NBD functions in VDDK unit tests.

Also add some example tests for GetBlockStatus.

Signed-off-by: Matthew Arnold <marnold@redhat.com>

* Avoid infinite loop by returning whole block.

Also add a unit test to trigger this code. Without the fix, this spins
"No new block status data" messages forever, as reported in the bug.
With the fix, this continues the data transfer without zero-range or
hole-punch optimizations.

Signed-off-by: Matthew Arnold <marnold@redhat.com>

* Correctly extend merged block length.

Bring in the previously proposed fix to make sure merged blocks have
correct lengths, avoiding the initial issue.

Signed-off-by: Matthew Arnold <marnold@redhat.com>

* Add two more GetBlockStatus unit tests.

Hopefully this makes it more obvious what GetBlockStatus actually does.

Signed-off-by: Matthew Arnold <marnold@redhat.com>

* Add unit tests for larger block sizes.

The first test intentionally overflows the result block's length field
and causes an infinite loop in GetBlockStatus, as pointed out in the
problem report. This will be fixed in the next commit.

Signed-off-by: Matthew Arnold <marnold@redhat.com>

* Avoid integer overflow and sort out casts.

Increase the size of BlockStatusData.Length to an int64, to avoid
overflow in GetBlockStatus. Also change BlockStatusData.Offset to an
int64 and remove a handful of unnerving integer conversions. Favor int64
over uint64 to match system libraries, and add a few necessary
conversions mostly isolated to libnbd interfaces.

Signed-off-by: Matthew Arnold <marnold@redhat.com>

* Fix alignment for multi-stage VDDK documentation.

Signed-off-by: Matthew Arnold <marnold@redhat.com>

---------

Signed-off-by: Matthew Arnold <marnold@redhat.com>
This commit is contained in:
Matthew Arnold 2024-05-10 20:52:25 -04:00 committed by GitHub
parent 3053513b10
commit 6f7809e7a4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 312 additions and 60 deletions

View File

@ -338,12 +338,12 @@ spec:
uuid: "52260566-b032-36cb-55b1-79bf29e30490"
thumbprint: "20:6C:8A:5D:44:40:B3:79:4B:28:EA:76:13:60:90:6E:49:D9:D9:A3" # SSL fingerprint of vCenter/ESX host
secretRef: "vddk-credentials"
finalCheckpoint: true
checkpoints:
- current: "snapshot-1"
previous: ""
- current: "snapshot-2"
previous: "snapshot-1"
finalCheckpoint: true
checkpoints:
- current: "snapshot-1"
previous: ""
- current: "snapshot-2"
previous: "snapshot-1"
pvc:
accessModes:
- ReadWriteOnce

View File

@ -26,6 +26,7 @@ import (
"context"
"errors"
"fmt"
"math"
"net/url"
"os"
"regexp"
@ -501,7 +502,7 @@ const MaxPreadLengthVC = (2 << 20)
// MaxPreadLength is the maxmimum read size to request from VMware. Default to
// the larger option, and reduce it in createVddkDataSource when connecting to
// vCenter endpoints.
var MaxPreadLength uint32 = MaxPreadLengthESX
var MaxPreadLength int = MaxPreadLengthESX
// NbdOperations provides a mockable interface for the things needed from libnbd.
type NbdOperations interface {
@ -513,8 +514,8 @@ type NbdOperations interface {
// BlockStatusData holds zero/hole status for one block of data
type BlockStatusData struct {
Offset uint64
Length uint32
Offset int64
Length int64
Flags uint32
}
@ -530,7 +531,13 @@ func GetBlockStatus(handle NbdOperations, extent types.DiskChangeExtent) []*Bloc
var blocks []*BlockStatusData
// Callback for libnbd.BlockStatus. Needs to modify blocks list above.
updateBlocksCallback := func(metacontext string, offset uint64, extents []uint32, err *int) int {
updateBlocksCallback := func(metacontext string, nbdOffset uint64, extents []uint32, err *int) int {
if nbdOffset > math.MaxInt64 {
klog.Errorf("Block status offset too big for conversion: 0x%x", nbdOffset)
return -2
}
offset := int64(nbdOffset)
if *err != 0 {
klog.Errorf("Block status callback error at offset %d: error code %d", offset, *err)
return *err
@ -544,17 +551,17 @@ func GetBlockStatus(handle NbdOperations, extent types.DiskChangeExtent) []*Bloc
return -1
}
for i := 0; i < len(extents); i += 2 {
length, flags := extents[i], extents[i+1]
length, flags := int64(extents[i]), extents[i+1]
if blocks != nil {
last := len(blocks) - 1
lastBlock := blocks[last]
lastFlags := lastBlock.Flags
lastOffset := lastBlock.Offset + uint64(lastBlock.Length)
lastOffset := lastBlock.Offset + lastBlock.Length
if lastFlags == flags && lastOffset == offset {
// Merge with previous block
blocks[last] = &BlockStatusData{
Offset: lastBlock.Offset,
Length: lastBlock.Length,
Length: lastBlock.Length + length,
Flags: lastFlags,
}
} else {
@ -563,15 +570,15 @@ func GetBlockStatus(handle NbdOperations, extent types.DiskChangeExtent) []*Bloc
} else {
blocks = append(blocks, &BlockStatusData{Offset: offset, Length: length, Flags: flags})
}
offset += uint64(length)
offset += length
}
return 0
}
if extent.Length < 1024*1024 {
blocks = append(blocks, &BlockStatusData{
Offset: uint64(extent.Start),
Length: uint32(extent.Length),
Offset: extent.Start,
Length: extent.Length,
Flags: 0})
return blocks
}
@ -579,30 +586,34 @@ func GetBlockStatus(handle NbdOperations, extent types.DiskChangeExtent) []*Bloc
lastOffset := extent.Start
endOffset := extent.Start + extent.Length
for lastOffset < endOffset {
var length uint64
var length int64
missingLength := endOffset - lastOffset
if missingLength > (MaxBlockStatusLength) {
length = (MaxBlockStatusLength)
length = MaxBlockStatusLength
} else {
length = uint64(missingLength)
length = missingLength
}
err := handle.BlockStatus(length, uint64(lastOffset), updateBlocksCallback, &fixedOptArgs)
if err != nil {
klog.Errorf("Error getting block status at offset %d, returning whole block instead. Error was: %v", lastOffset, err)
createWholeBlock := func() []*BlockStatusData {
block := &BlockStatusData{
Offset: uint64(extent.Start),
Length: uint32(extent.Length),
Offset: extent.Start,
Length: extent.Length,
Flags: 0,
}
blocks = []*BlockStatusData{block}
return blocks
}
last := len(blocks) - 1
newOffset := blocks[last].Offset + uint64(blocks[last].Length)
if uint64(lastOffset) == newOffset {
klog.Infof("No new block status data at offset %d", newOffset)
err := handle.BlockStatus(uint64(length), uint64(lastOffset), updateBlocksCallback, &fixedOptArgs)
if err != nil {
klog.Errorf("Error getting block status at offset %d, returning whole block instead. Error was: %v", lastOffset, err)
return createWholeBlock()
}
lastOffset = int64(newOffset)
last := len(blocks) - 1
newOffset := blocks[last].Offset + blocks[last].Length
if lastOffset == newOffset {
klog.Infof("No new block status data at offset %d, returning whole block.", newOffset)
return createWholeBlock()
}
lastOffset = newOffset
}
return blocks
@ -628,29 +639,29 @@ func CopyRange(handle NbdOperations, sink VDDKDataSink, block *BlockStatusData,
return err
}
buffer := bytes.Repeat([]byte{0}, int(MaxPreadLength))
count := uint32(0)
buffer := bytes.Repeat([]byte{0}, MaxPreadLength)
count := int64(0)
for count < block.Length {
if block.Length-count < MaxPreadLength {
if block.Length-count < int64(MaxPreadLength) {
buffer = bytes.Repeat([]byte{0}, int(block.Length-count))
}
length := len(buffer)
offset := block.Offset + uint64(count)
err := handle.Pread(buffer, offset, nil)
offset := block.Offset + count
err := handle.Pread(buffer, uint64(offset), nil)
if err != nil {
klog.Errorf("Error reading from source at offset %d: %v", offset, err)
return err
}
written, err := sink.Pwrite(buffer, offset)
written, err := sink.Pwrite(buffer, uint64(offset))
if err != nil {
klog.Errorf("Failed to write data block at offset %d to local file: %v", block.Offset, err)
return err
}
updateProgress(written)
count += uint32(length)
count += int64(length)
}
return nil
}
@ -661,7 +672,7 @@ func CopyRange(handle NbdOperations, sink VDDKDataSink, block *BlockStatusData,
type VDDKDataSink interface {
Pwrite(buf []byte, offset uint64) (int, error)
Write(buf []byte) (int, error)
ZeroRange(offset uint64, length uint32) error
ZeroRange(offset int64, length int64) error
Close()
}
@ -718,11 +729,11 @@ func (sink *VDDKFileSink) Write(buf []byte) (int, error) {
}
// ZeroRange fills the destination range with zero bytes
func (sink *VDDKFileSink) ZeroRange(offset uint64, length uint32) error {
punch := func(offset uint64, length uint32) error {
func (sink *VDDKFileSink) ZeroRange(offset int64, length int64) error {
punch := func(offset int64, length int64) error {
klog.Infof("Punching %d-byte hole at offset %d", length, offset)
flags := uint32(unix.FALLOC_FL_PUNCH_HOLE | unix.FALLOC_FL_KEEP_SIZE)
return syscall.Fallocate(int(sink.file.Fd()), flags, int64(offset), int64(length))
return syscall.Fallocate(int(sink.file.Fd()), flags, offset, length)
}
var err error
@ -734,8 +745,8 @@ func (sink *VDDKFileSink) ZeroRange(offset uint64, length uint32) error {
if err != nil {
klog.Errorf("Unable to stat destination file: %v", err)
} else { // Filesystem
if offset+uint64(length) > uint64(info.Size()) { // Truncate only if extending the file
err = syscall.Ftruncate(int(sink.file.Fd()), int64(offset+uint64(length)))
if offset+length > info.Size() { // Truncate only if extending the file
err = syscall.Ftruncate(int(sink.file.Fd()), offset+length)
} else { // Otherwise, try to punch a hole in the file
err = punch(offset, length)
}
@ -743,22 +754,22 @@ func (sink *VDDKFileSink) ZeroRange(offset uint64, length uint32) error {
}
if err != nil { // Fall back to regular pwrite
klog.Errorf("Unable to zero range %d - %d on destination, falling back to pwrite: %v", offset, offset+uint64(length), err)
klog.Errorf("Unable to zero range %d - %d on destination, falling back to pwrite: %v", offset, offset+length, err)
err = nil
count := uint32(0)
blocksize := uint32(16 << 20)
buffer := bytes.Repeat([]byte{0}, int(blocksize))
count := int64(0)
const blocksize = 16 << 20
buffer := bytes.Repeat([]byte{0}, blocksize)
for count < length {
remaining := length - count
if remaining < blocksize {
buffer = bytes.Repeat([]byte{0}, int(remaining))
}
written, err := sink.Pwrite(buffer, offset)
written, err := sink.Pwrite(buffer, uint64(offset))
if err != nil {
klog.Errorf("Unable to write %d zeroes at offset %d: %v", length, offset, err)
break
}
count += uint32(written)
count += int64(written)
}
}

View File

@ -60,6 +60,7 @@ var _ = Describe("VDDK data source", func() {
newTerminationChannel = createMockTerminationChannel
currentExport = defaultMockNbdExport()
currentVMwareFunctions = defaultMockVMwareFunctions()
currentMockNbdFunctions = defaultMockNbdFunctions()
})
AfterEach(func() {
@ -430,10 +431,10 @@ var _ = Describe("VDDK data source", func() {
}
_, err := NewVDDKDataSource("http://esx.test", "user", "pass", "aa:bb:cc:dd", "1-2-3-4", diskName, "", "", "", v1.PersistentVolumeFilesystem)
Expect(err).ToNot(HaveOccurred())
Expect(MaxPreadLength).To(Equal(uint32(MaxPreadLengthESX)))
Expect(MaxPreadLength).To(Equal(MaxPreadLengthESX))
_, err = NewVDDKDataSource("http://vcenter.test", "user", "pass", "aa:bb:cc:dd", "1-2-3-4", diskName, "", "", "", v1.PersistentVolumeFilesystem)
Expect(err).ToNot(HaveOccurred())
Expect(MaxPreadLength).To(Equal(uint32(MaxPreadLengthVC)))
Expect(MaxPreadLength).To(Equal(MaxPreadLengthVC))
})
It("GetTerminationMessage should contain VDDK connection information", func() {
@ -482,26 +483,266 @@ var _ = Describe("VDDK log watcher", func() {
)
})
var _ = Describe("VDDK get block status", func() {
defaultMockNbd := defaultMockNbdFunctions()
BeforeEach(func() {
currentMockNbdFunctions = defaultMockNbdFunctions()
})
It("should not request block status for a range smaller than 1MB", func() {
nbdCalled := false
currentMockNbdFunctions.GetSize = func() (uint64, error) {
nbdCalled = true
return defaultMockNbd.GetSize()
}
currentMockNbdFunctions.Pread = func(buf []byte, offset uint64, optargs *libnbd.PreadOptargs) error {
nbdCalled = true
return defaultMockNbd.Pread(buf, offset, optargs)
}
currentMockNbdFunctions.Close = func() *libnbd.LibnbdError {
nbdCalled = true
return defaultMockNbd.Close()
}
currentMockNbdFunctions.BlockStatus = func(length uint64, offset uint64, callback libnbd.ExtentCallback, optargs *libnbd.BlockStatusOptargs) error {
nbdCalled = true
return defaultMockNbd.BlockStatus(length, offset, callback, optargs)
}
extent := types.DiskChangeExtent{
Start: 0,
Length: 1024*1024 - 1,
}
expectedBlocks := []*BlockStatusData{
{
Offset: extent.Start,
Length: extent.Length,
Flags: 0,
},
}
blocks := GetBlockStatus(&mockNbdOperations{}, extent)
Expect(blocks).To(Equal(expectedBlocks))
Expect(nbdCalled).To(BeFalse())
})
It("should return the whole block when libnbd returns an error", func() {
nbdCalled := false
currentMockNbdFunctions.BlockStatus = func(length uint64, offset uint64, callback libnbd.ExtentCallback, optargs *libnbd.BlockStatusOptargs) error {
nbdCalled = true
return errors.New("Block status failure")
}
extent := types.DiskChangeExtent{
Start: 0,
Length: 1024 * 1024,
}
expectedBlocks := []*BlockStatusData{
{
Offset: extent.Start,
Length: extent.Length,
Flags: 0,
},
}
blocks := GetBlockStatus(&mockNbdOperations{}, extent)
Expect(blocks).To(Equal(expectedBlocks))
Expect(nbdCalled).To(BeTrue())
})
It("should return the whole block when block list is unchanged after status request", func() {
// Default mock block status callback always returns the same extents,
// so force it to return multiple blocks in the list by asking for more
// than the maximum single block status length. This triggers the
// "No new block status data" codepath in GetBlockStatus.
extent := types.DiskChangeExtent{
Start: 0,
Length: MaxBlockStatusLength + 1024,
}
expectedBlocks := []*BlockStatusData{
{
Offset: extent.Start,
Length: extent.Length,
Flags: 0,
},
}
blocks := GetBlockStatus(&mockNbdOperations{}, extent)
Expect(blocks).To(Equal(expectedBlocks))
})
It("should chunk up block status requests when asking for more than the maximum status request length", func() {
timesCalled := 0
currentMockNbdFunctions.BlockStatus = func(length uint64, offset uint64, callback libnbd.ExtentCallback, optargs *libnbd.BlockStatusOptargs) error {
if timesCalled == 0 {
Expect(offset).To(Equal(uint64(0)))
Expect(length).To(Equal(uint64(MaxBlockStatusLength)))
}
if timesCalled == 1 {
Expect(offset).To(Equal(uint64(MaxBlockStatusLength)))
Expect(length).To(Equal(uint64(1024)))
}
timesCalled++
err := 0
callback("base:allocation", offset, []uint32{uint32(length), 0}, &err)
return nil
}
extent := types.DiskChangeExtent{
Start: 0,
Length: MaxBlockStatusLength + 1024,
}
expectedBlocks := []*BlockStatusData{
{
Offset: extent.Start,
Length: extent.Length,
Flags: 0,
},
}
blocks := GetBlockStatus(&mockNbdOperations{}, extent)
Expect(blocks).To(Equal(expectedBlocks))
Expect(timesCalled).To(Equal(2))
})
It("should return multiple blocks when data source ranges have different flags", func() {
const blockSize = 1024 * 1024
currentMockNbdFunctions.BlockStatus = func(length uint64, offset uint64, callback libnbd.ExtentCallback, optargs *libnbd.BlockStatusOptargs) error {
err := 0
if offset == 0 {
callback("base:allocation", offset, []uint32{blockSize, 0}, &err)
}
if offset == blockSize {
callback("base:allocation", offset, []uint32{blockSize, 3}, &err)
}
return nil
}
extent := types.DiskChangeExtent{
Start: 0,
Length: 2 * blockSize,
}
expectedBlocks := []*BlockStatusData{
{
Offset: 0,
Length: blockSize,
Flags: 0,
},
{
Offset: blockSize,
Length: blockSize,
Flags: 3,
},
}
blocks := GetBlockStatus(&mockNbdOperations{}, extent)
Expect(blocks).To(Equal(expectedBlocks))
})
It("should handle large blocks with the same flags", func() {
const blockSize = MaxBlockStatusLength * 2
extent := types.DiskChangeExtent{
Start: 0,
Length: blockSize,
}
expectedBlocks := []*BlockStatusData{
{
Offset: extent.Start,
Length: extent.Length,
Flags: 0,
},
}
blocks := GetBlockStatus(&mockNbdOperations{}, extent)
Expect(blocks).To(Equal(expectedBlocks))
})
It("should handle large blocks with different flags", func() {
const blockSize = 1024 * 1024 * 1024 * 1024 * 1024
// Alternate flags between blocks, so GetBlockStatus returns a long
// list of different blocks instead of one giant block.
flagSetting := true
currentMockNbdFunctions.BlockStatus = func(length uint64, offset uint64, callback libnbd.ExtentCallback, optargs *libnbd.BlockStatusOptargs) error {
err := 0
len := uint32(MaxBlockStatusLength)
if length < MaxBlockStatusLength {
len = uint32(length)
}
if flagSetting {
callback("base:allocation", offset, []uint32{len, 0}, &err)
} else {
callback("base:allocation", offset, []uint32{len, 3}, &err)
}
flagSetting = !flagSetting
return nil
}
extent := types.DiskChangeExtent{
Start: 0,
Length: blockSize,
}
blocks := GetBlockStatus(&mockNbdOperations{}, extent)
for index, block := range blocks {
Expect(block.Offset).To(Equal(int64(index * MaxBlockStatusLength)))
Expect(block.Length).To(Equal(int64(MaxBlockStatusLength)))
if index%2 == 0 {
Expect(block.Flags).To(Equal(uint32(0)))
} else {
Expect(block.Flags).To(Equal(uint32(3)))
}
}
})
})
type mockNbdFunctions struct {
GetSize func() (uint64, error)
Pread func(buf []byte, offset uint64, optargs *libnbd.PreadOptargs) error
Close func() *libnbd.LibnbdError
BlockStatus func(length uint64, offset uint64, callback libnbd.ExtentCallback, optargs *libnbd.BlockStatusOptargs) error
}
func defaultMockNbdFunctions() mockNbdFunctions {
ops := mockNbdFunctions{}
ops.GetSize = func() (uint64, error) {
return currentExport.Size()
}
ops.Pread = func(buf []byte, offset uint64, optargs *libnbd.PreadOptargs) error {
fakebuf, err := currentExport.Read(offset)
copy(buf, fakebuf[offset:offset+uint64(len(buf))])
return err
}
ops.Close = func() *libnbd.LibnbdError {
return nil
}
ops.BlockStatus = func(length uint64, offset uint64, callback libnbd.ExtentCallback, optargs *libnbd.BlockStatusOptargs) error {
err := 0
callback("base:allocation", offset, []uint32{uint32(length), 0}, &err)
return nil
}
return ops
}
var currentMockNbdFunctions mockNbdFunctions
type mockNbdOperations struct{}
func (handle *mockNbdOperations) GetSize() (uint64, error) {
return currentExport.Size()
return currentMockNbdFunctions.GetSize()
}
func (handle *mockNbdOperations) Pread(buf []byte, offset uint64, optargs *libnbd.PreadOptargs) error {
fakebuf, err := currentExport.Read(offset)
copy(buf, fakebuf[offset:offset+uint64(len(buf))])
return err
return currentMockNbdFunctions.Pread(buf, offset, optargs)
}
func (handle *mockNbdOperations) Close() *libnbd.LibnbdError {
return nil
return currentMockNbdFunctions.Close()
}
func (handle *mockNbdOperations) BlockStatus(length uint64, offset uint64, callback libnbd.ExtentCallback, optargs *libnbd.BlockStatusOptargs) error {
err := 0
callback("base:allocation", offset, []uint32{uint32(length), 0}, &err)
return nil
return currentMockNbdFunctions.BlockStatus(length, offset, callback, optargs)
}
func createMockVddkDataSource(endpoint string, accessKey string, secKey string, thumbprint string, uuid string, backingFile string, currentCheckpoint string, previousCheckpoint string, finalCheckpoint string, volumeMode v1.PersistentVolumeMode) (*VDDKDataSource, error) {
@ -534,9 +775,9 @@ type mockVddkDataSink struct {
position int
}
func (sink *mockVddkDataSink) ZeroRange(offset uint64, length uint32) error {
func (sink *mockVddkDataSink) ZeroRange(offset int64, length int64) error {
buf := bytes.Repeat([]byte{0x00}, int(length))
_, err := sink.Pwrite(buf, offset)
_, err := sink.Pwrite(buf, uint64(offset))
return err
}