From 8f977b7782d1a46cf0aa15d3553dad1c46b4fe85 Mon Sep 17 00:00:00 2001 From: Dmitry Rozhkov Date: Wed, 11 Jul 2018 09:53:01 +0300 Subject: [PATCH] Send device list upon reconnecting to kubelet When kubelet notifies the plugin about its restart by removing the plugin's socket, we do reconnect to kubelet, but we don't send the current list of monitored devices to kubelet. As a result kubelet is not aware of discovered devices if it restarts. Always send the current list of monitored devices to kubelet upon ListAndWatch() request. --- cmd/fpga_plugin/fpga_plugin.go | 28 ++++++++++++++++++++-------- cmd/fpga_plugin/fpga_plugin_test.go | 22 ++++++++++++++-------- 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/cmd/fpga_plugin/fpga_plugin.go b/cmd/fpga_plugin/fpga_plugin.go index 687bd076..4bfe8b37 100644 --- a/cmd/fpga_plugin/fpga_plugin.go +++ b/cmd/fpga_plugin/fpga_plugin.go @@ -68,20 +68,32 @@ func (dm *deviceManager) GetDevicePluginOptions(ctx context.Context, empty *plug return new(pluginapi.DevicePluginOptions), nil } +func (dm *deviceManager) sendDevices(stream pluginapi.DevicePlugin_ListAndWatchServer) error { + resp := new(pluginapi.ListAndWatchResponse) + for id, device := range dm.devices { + resp.Devices = append(resp.Devices, &pluginapi.Device{id, device.State}) + } + glog.V(2).Info("Sending to kubelet ", resp.Devices) + if err := stream.Send(resp); err != nil { + dm.srv.Stop() + return fmt.Errorf("device-plugin: cannot update device list: %v", err) + } + + return nil +} + // ListAndWatch returns a list of devices // Whenever a device state change or a device disappears, ListAndWatch returns the new list func (dm *deviceManager) ListAndWatch(empty *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) error { glog.V(2).Info("Started ListAndWatch for ", dm.fpgaID) + if err := dm.sendDevices(stream); err != nil { + return err + } + for dm.devices = range dm.ch { - resp := new(pluginapi.ListAndWatchResponse) - for id, device := range 
dm.devices { - resp.Devices = append(resp.Devices, &pluginapi.Device{id, device.State}) - } - glog.V(2).Info("Sending to kubelet ", resp.Devices) - if err := stream.Send(resp); err != nil { - dm.srv.Stop() - return fmt.Errorf("device-plugin: cannot update device list: %v", err) + if err := dm.sendDevices(stream); err != nil { + return err } } diff --git a/cmd/fpga_plugin/fpga_plugin_test.go b/cmd/fpga_plugin/fpga_plugin_test.go index fc1120a0..bfdee389 100644 --- a/cmd/fpga_plugin/fpga_plugin_test.go +++ b/cmd/fpga_plugin/fpga_plugin_test.go @@ -30,12 +30,14 @@ import ( // Minimal implementation of pluginapi.DevicePlugin_ListAndWatchServer type listAndWatchServerStub struct { testDM *deviceManager - generateErr bool + generateErr int + sendCounter int cdata chan []*pluginapi.Device } func (s *listAndWatchServerStub) Send(resp *pluginapi.ListAndWatchResponse) error { - if s.generateErr { + s.sendCounter = s.sendCounter + 1 + if s.generateErr == s.sendCounter { fmt.Println("listAndWatchServerStub::Send returns error") return fmt.Errorf("Fake error") } @@ -82,11 +84,15 @@ func TestListAndWatch(t *testing.T) { tcases := []struct { name string updates []map[string]deviceplugin.DeviceInfo - expectedErr bool + errorOnCall int }{ { name: "No updates and close", }, + { + name: "No updates and close, but expect streaming error", + errorOnCall: 1, + }, { name: "Send 1 update", updates: []map[string]deviceplugin.DeviceInfo{ @@ -108,7 +114,7 @@ func TestListAndWatch(t *testing.T) { }, }, }, - expectedErr: true, + errorOnCall: 2, }, } @@ -118,8 +124,8 @@ func TestListAndWatch(t *testing.T) { server := &listAndWatchServerStub{ testDM: testDM, - generateErr: tt.expectedErr, - cdata: make(chan []*pluginapi.Device, len(tt.updates)), + generateErr: tt.errorOnCall, + cdata: make(chan []*pluginapi.Device, len(tt.updates)+1), } // push device infos to DM's channel @@ -129,10 +135,10 @@ func TestListAndWatch(t *testing.T) { close(devCh) err := testDM.ListAndWatch(&pluginapi.Empty{}, 
server) - if err != nil && !tt.expectedErr { + if err != nil && tt.errorOnCall == 0 { t.Errorf("Test case '%s': got unexpected error %v", tt.name, err) } - if err == nil && tt.expectedErr { + if err == nil && tt.errorOnCall != 0 { t.Errorf("Test case '%s': expected an error, but got nothing", tt.name) } }