Send device list upon reconnecting to kubelet

When kubelet notifies the plugin about its restart by removing
the plugin's socket, we reconnect to kubelet but don't
send it the current list of monitored devices. As a result,
kubelet is not aware of the discovered devices after it restarts.

Always send the current list of monitored devices to kubelet
upon a ListAndWatch() request.
This commit is contained in:
Dmitry Rozhkov 2018-07-11 09:53:01 +03:00 committed by Dmitry Rozhkov
parent 7f83feaf99
commit 8f977b7782
2 changed files with 34 additions and 16 deletions

View File

@ -68,20 +68,32 @@ func (dm *deviceManager) GetDevicePluginOptions(ctx context.Context, empty *plug
return new(pluginapi.DevicePluginOptions), nil return new(pluginapi.DevicePluginOptions), nil
} }
// sendDevices sends the device manager's current device list to kubelet over
// the given ListAndWatch stream.
//
// It builds a single ListAndWatchResponse containing one entry per element of
// dm.devices and passes it to stream.Send. If sending fails, dm.srv is stopped
// and an error describing the failure is returned; otherwise nil is returned.
func (dm *deviceManager) sendDevices(stream pluginapi.DevicePlugin_ListAndWatchServer) error {
	resp := new(pluginapi.ListAndWatchResponse)
	for id, device := range dm.devices {
		// Keyed fields keep this literal compiling if pluginapi.Device gains
		// more fields; an unkeyed literal must list every field in order.
		resp.Devices = append(resp.Devices, &pluginapi.Device{ID: id, Health: device.State})
	}
	glog.V(2).Info("Sending to kubelet ", resp.Devices)
	if err := stream.Send(resp); err != nil {
		// The stream is unusable at this point, so stop the server before
		// reporting the failure to the caller.
		dm.srv.Stop()
		return fmt.Errorf("device-plugin: cannot update device list: %v", err)
	}
	return nil
}
// ListAndWatch returns a list of devices // ListAndWatch returns a list of devices
// Whenever a device state change or a device disappears, ListAndWatch returns the new list // Whenever a device state change or a device disappears, ListAndWatch returns the new list
func (dm *deviceManager) ListAndWatch(empty *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) error { func (dm *deviceManager) ListAndWatch(empty *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) error {
glog.V(2).Info("Started ListAndWatch for ", dm.fpgaID) glog.V(2).Info("Started ListAndWatch for ", dm.fpgaID)
if err := dm.sendDevices(stream); err != nil {
return err
}
for dm.devices = range dm.ch { for dm.devices = range dm.ch {
resp := new(pluginapi.ListAndWatchResponse) if err := dm.sendDevices(stream); err != nil {
for id, device := range dm.devices { return err
resp.Devices = append(resp.Devices, &pluginapi.Device{id, device.State})
}
glog.V(2).Info("Sending to kubelet ", resp.Devices)
if err := stream.Send(resp); err != nil {
dm.srv.Stop()
return fmt.Errorf("device-plugin: cannot update device list: %v", err)
} }
} }

View File

@ -30,12 +30,14 @@ import (
// Minimal implementation of pluginapi.DevicePlugin_ListAndWatchServer // Minimal implementation of pluginapi.DevicePlugin_ListAndWatchServer
type listAndWatchServerStub struct { type listAndWatchServerStub struct {
testDM *deviceManager testDM *deviceManager
generateErr bool generateErr int
sendCounter int
cdata chan []*pluginapi.Device cdata chan []*pluginapi.Device
} }
func (s *listAndWatchServerStub) Send(resp *pluginapi.ListAndWatchResponse) error { func (s *listAndWatchServerStub) Send(resp *pluginapi.ListAndWatchResponse) error {
if s.generateErr { s.sendCounter = s.sendCounter + 1
if s.generateErr == s.sendCounter {
fmt.Println("listAndWatchServerStub::Send returns error") fmt.Println("listAndWatchServerStub::Send returns error")
return fmt.Errorf("Fake error") return fmt.Errorf("Fake error")
} }
@ -82,11 +84,15 @@ func TestListAndWatch(t *testing.T) {
tcases := []struct { tcases := []struct {
name string name string
updates []map[string]deviceplugin.DeviceInfo updates []map[string]deviceplugin.DeviceInfo
expectedErr bool errorOnCall int
}{ }{
{ {
name: "No updates and close", name: "No updates and close",
}, },
{
name: "No updates and close, but expect streaming error",
errorOnCall: 1,
},
{ {
name: "Send 1 update", name: "Send 1 update",
updates: []map[string]deviceplugin.DeviceInfo{ updates: []map[string]deviceplugin.DeviceInfo{
@ -108,7 +114,7 @@ func TestListAndWatch(t *testing.T) {
}, },
}, },
}, },
expectedErr: true, errorOnCall: 2,
}, },
} }
@ -118,8 +124,8 @@ func TestListAndWatch(t *testing.T) {
server := &listAndWatchServerStub{ server := &listAndWatchServerStub{
testDM: testDM, testDM: testDM,
generateErr: tt.expectedErr, generateErr: tt.errorOnCall,
cdata: make(chan []*pluginapi.Device, len(tt.updates)), cdata: make(chan []*pluginapi.Device, len(tt.updates)+1),
} }
// push device infos to DM's channel // push device infos to DM's channel
@ -129,10 +135,10 @@ func TestListAndWatch(t *testing.T) {
close(devCh) close(devCh)
err := testDM.ListAndWatch(&pluginapi.Empty{}, server) err := testDM.ListAndWatch(&pluginapi.Empty{}, server)
if err != nil && !tt.expectedErr { if err != nil && tt.errorOnCall == 0 {
t.Errorf("Test case '%s': got unexpected error %v", tt.name, err) t.Errorf("Test case '%s': got unexpected error %v", tt.name, err)
} }
if err == nil && tt.expectedErr { if err == nil && tt.errorOnCall != 0 {
t.Errorf("Test case '%s': expected an error, but got nothing", tt.name) t.Errorf("Test case '%s': expected an error, but got nothing", tt.name)
} }
} }