From 80b7b035767eabbea402e46a662ee0501a235e72 Mon Sep 17 00:00:00 2001 From: Dmitry Rozhkov Date: Fri, 8 Jun 2018 14:57:12 +0300 Subject: [PATCH] fpga_plugin: refactor FPGA scans This refactoring brings in device Cache running in its own thread and scanning FPGA devices once every 5 secs. Then no timers are used inside ListAndWatch() method of device managers and no need to run scanning periodically in every device manager's thread. Cache generates update events and the plugin creates, updates or deletes device managers on the fly upon receiving the events. Introducing new modes of operations is a matter of adding a single function converting and filtering the content of Cache. --- cmd/fpga_plugin/devicecache/devicecache.go | 284 +++++++++++ .../devicecache/devicecache_test.go | 439 +++++++++++++++++ cmd/fpga_plugin/fpga_plugin.go | 247 +++------- cmd/fpga_plugin/fpga_plugin_test.go | 448 +++++------------- 4 files changed, 919 insertions(+), 499 deletions(-) create mode 100644 cmd/fpga_plugin/devicecache/devicecache.go create mode 100644 cmd/fpga_plugin/devicecache/devicecache_test.go diff --git a/cmd/fpga_plugin/devicecache/devicecache.go b/cmd/fpga_plugin/devicecache/devicecache.go new file mode 100644 index 00000000..8b2cc0ae --- /dev/null +++ b/cmd/fpga_plugin/devicecache/devicecache.go @@ -0,0 +1,284 @@ +// Copyright 2018 Intel Corporation. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package devicecache + +import ( + "fmt" + "io/ioutil" + "os" + "path" + "reflect" + "regexp" + "strings" + "time" + + "github.com/golang/glog" + + pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" + + "github.com/intel/intel-device-plugins-for-kubernetes/internal/deviceplugin" +) + +// Device Cache's mode of operation +const ( + AfMode = "af" + RegionMode = "region" +) + +const ( + sysfsDirectory = "/sys/class/fpga" + devfsDirectory = "/dev" + deviceRE = `^intel-fpga-dev.[0-9]+$` + portRE = `^intel-fpga-port.[0-9]+$` + fmeRE = `^intel-fpga-fme.[0-9]+$` +) + +// UpdateInfo contains info added, updated and deleted FPGA devices +type UpdateInfo struct { + Added map[string]map[string]deviceplugin.DeviceInfo + Updated map[string]map[string]deviceplugin.DeviceInfo + Removed map[string]map[string]deviceplugin.DeviceInfo +} + +type getDevMapFunc func(devices []device) map[string]map[string]deviceplugin.DeviceInfo + +func getRegionMap(devices []device) map[string]map[string]deviceplugin.DeviceInfo { + regionMap := make(map[string]map[string]deviceplugin.DeviceInfo) + + for _, dev := range devices { + for _, region := range dev.regions { + if _, present := regionMap[region.interfaceID]; !present { + regionMap[region.interfaceID] = make(map[string]deviceplugin.DeviceInfo) + } + devNodes := make([]string, len(region.afus)+1) + for num, afu := range region.afus { + devNodes[num] = afu.devNode + } + devNodes[len(region.afus)] = region.devNode + regionMap[region.interfaceID][region.id] = deviceplugin.DeviceInfo{ + State: pluginapi.Healthy, + Nodes: devNodes, + } + } + } + + return regionMap +} + +func getAfuMap(devices []device) map[string]map[string]deviceplugin.DeviceInfo { + afuMap := make(map[string]map[string]deviceplugin.DeviceInfo) + + for _, dev := range devices { + for _, region := range dev.regions { + for _, afu := range region.afus { + if _, present := afuMap[afu.afuID]; !present { + afuMap[afu.afuID] = make(map[string]deviceplugin.DeviceInfo) + } + afuMap[afu.afuID][afu.id] = deviceplugin.DeviceInfo{ + State: pluginapi.Healthy, + Nodes: []string{afu.devNode}, + } + } + } + } + + return afuMap +} + +type afu struct { + id string + afuID string + devNode string +} + +type region struct { + id string + interfaceID string + devNode string + afus []afu +} + +type device struct { + name string + regions []region +} + +// Cache represents FPGA devices found on the host +type Cache struct { + sysfsDir string + devfsDir string + + deviceReg *regexp.Regexp + portReg *regexp.Regexp + fmeReg *regexp.Regexp + + devices []device + ch chan<- UpdateInfo + getDevMap getDevMapFunc +} + +// NewCache returns new instance of Cache +func NewCache(sysfsDir string, devfsDir string, mode string, ch chan<- UpdateInfo) (*Cache, error) { + var getDevMap getDevMapFunc + + switch mode { + case AfMode: + getDevMap = getAfuMap + case RegionMode: + getDevMap = getRegionMap + default: + return nil, fmt.Errorf("Wrong mode: '%s'", mode) + } + + return &Cache{ + sysfsDir: sysfsDir, + devfsDir: devfsDir, + deviceReg: regexp.MustCompile(deviceRE), + portReg: regexp.MustCompile(portRE), + fmeReg: regexp.MustCompile(fmeRE), + ch: ch, + getDevMap: getDevMap, + }, nil +} + +func (c *Cache) getDevNode(devName string) (string, error) { + devNode := path.Join(c.devfsDir, devName) + if _, err := os.Stat(devNode); err != nil { + return "", fmt.Errorf("Device %s doesn't exist: %v", devNode, err) + } + + return devNode, nil +} + +func (c *Cache) detectUpdates(devices []device) { + added := make(map[string]map[string]deviceplugin.DeviceInfo) + updated := make(map[string]map[string]deviceplugin.DeviceInfo) + + oldDevMap := c.getDevMap(c.devices) + + for fpgaID, new := range c.getDevMap(devices) { + if old, ok := oldDevMap[fpgaID]; ok { + if !reflect.DeepEqual(old, new) { + updated[fpgaID] = new + } + delete(oldDevMap, fpgaID) + } else { + added[fpgaID] = new + } + } + + if len(added) > 0 || len(updated) > 0 || len(oldDevMap) > 0 { + c.ch <- UpdateInfo{ + Added: added, + Updated: updated, + Removed: oldDevMap, + } + } +} + +func (c *Cache) scanFPGAs() error { + var devices []device + + glog.V(2).Info("Start new FPGA scan") + + fpgaFiles, err := ioutil.ReadDir(c.sysfsDir) + if err != nil { + return fmt.Errorf("Can't read sysfs folder: %v", err) + } + + for _, fpgaFile := range fpgaFiles { + fname := fpgaFile.Name() + + if !c.deviceReg.MatchString(fname) { + continue + } + + deviceFolder := path.Join(c.sysfsDir, fname) + deviceFiles, err := ioutil.ReadDir(deviceFolder) + if err != nil { + return err + } + + var regions []region + var afus []afu + for _, deviceFile := range deviceFiles { + name := deviceFile.Name() + + if c.fmeReg.MatchString(name) { + if len(regions) > 0 { + return fmt.Errorf("Detected more than one FPGA region for device %s. Only one region per FPGA device is supported", fname) + } + interfaceIDFile := path.Join(deviceFolder, name, "pr", "interface_id") + data, err := ioutil.ReadFile(interfaceIDFile) + if err != nil { + return err + } + devNode, err := c.getDevNode(name) + if err != nil { + return err + } + regions = append(regions, region{ + id: name, + interfaceID: strings.TrimSpace(string(data)), + devNode: devNode, + }) + } else if c.portReg.MatchString(name) { + afuFile := path.Join(deviceFolder, name, "afu_id") + data, err := ioutil.ReadFile(afuFile) + if err != nil { + return err + } + devNode, err := c.getDevNode(name) + if err != nil { + return err + } + afus = append(afus, afu{ + id: name, + afuID: strings.TrimSpace(string(data)), + devNode: devNode, + }) + } + } + + if len(regions) == 0 { + return fmt.Errorf("No regions found for device %s", fname) + } + + // Currently only one region per device is supported. + regions[0].afus = afus + devices = append(devices, device{ + name: fname, + regions: regions, + }) + } + + c.detectUpdates(devices) + c.devices = devices + + return nil +} + +// Run starts scanning of FPGA devices on the host +func (c *Cache) Run() error { + for { + err := c.scanFPGAs() + if err != nil { + glog.Error("Device scan failed: ", err) + return fmt.Errorf("Device scan failed: %v", err) + } + + time.Sleep(5 * time.Second) + } +} diff --git a/cmd/fpga_plugin/devicecache/devicecache_test.go b/cmd/fpga_plugin/devicecache/devicecache_test.go new file mode 100644 index 00000000..8c06d0be --- /dev/null +++ b/cmd/fpga_plugin/devicecache/devicecache_test.go @@ -0,0 +1,439 @@ +// Copyright 2018 Intel Corporation. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package devicecache + +import ( + "fmt" + "io/ioutil" + "os" + "path" + "reflect" + "strings" + "testing" + "time" + + pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" + + "github.com/intel/intel-device-plugins-for-kubernetes/internal/deviceplugin" +) + +func createTestDirs(devfs, sysfs string, devfsDirs, sysfsDirs []string, sysfsFiles map[string][]byte) error { + var err error + + for _, devfsdir := range devfsDirs { + err = os.MkdirAll(path.Join(devfs, devfsdir), 0755) + if err != nil { + return fmt.Errorf("Failed to create fake device directory: %v", err) + } + } + for _, sysfsdir := range sysfsDirs { + err = os.MkdirAll(path.Join(sysfs, sysfsdir), 0755) + if err != nil { + return fmt.Errorf("Failed to create fake device directory: %v", err) + } + } + for filename, body := range sysfsFiles { + err = ioutil.WriteFile(path.Join(sysfs, filename), body, 0644) + if err != nil { + return fmt.Errorf("Failed to create fake vendor file: %v", err) + } + } + + return nil +} + +func TestNewCache(t *testing.T) { + tcases := []struct { + mode string + expectedErr bool + }{ + { + mode: AfMode, + }, + { + mode: RegionMode, + }, + { + mode: "unparsable", + expectedErr: true, + }, + } + + for _, tcase := range tcases { + _, err := NewCache("", "", tcase.mode, nil) + if tcase.expectedErr && err == nil { + t.Error("No error generated when creating Cache with invalid parameters") + } + } +} + +// getDevices returns static list of device structs for testing purposes +func getDevices() []device { + return []device{ + { + name: "intel-fpga-dev.0", + regions: []region{ + { + id: "intel-fpga-fme.0", + interfaceID: "ce48969398f05f33946d560708be108a", + devNode: "/dev/intel-fpga-fme.0", + afus: []afu{ + { + id: "intel-fpga-port.0", + afuID: "d8424dc4a4a3c413f89e433683f9040b", + devNode: "/dev/intel-fpga-port.0", + }, + }, + }, + }, + }, + { + name: "intel-fpga-dev.1", + regions: []region{ + { + id: "intel-fpga-fme.1", + interfaceID: "ce48969398f05f33946d560708be108a", + devNode: "/dev/intel-fpga-fme.1", + afus: []afu{ + { + id: "intel-fpga-port.1", + afuID: "d8424dc4a4a3c413f89e433683f9040b", + devNode: "/dev/intel-fpga-port.1", + }, + }, + }, + }, + }, + } +} + +func TestGetRegionMap(t *testing.T) { + expected := map[string]map[string]deviceplugin.DeviceInfo{ + "ce48969398f05f33946d560708be108a": { + "intel-fpga-fme.0": { + State: pluginapi.Healthy, + Nodes: []string{"/dev/intel-fpga-port.0", "/dev/intel-fpga-fme.0"}, + }, + "intel-fpga-fme.1": { + State: pluginapi.Healthy, + Nodes: []string{"/dev/intel-fpga-port.1", "/dev/intel-fpga-fme.1"}, + }, + }, + } + + result := getRegionMap(getDevices()) + if !reflect.DeepEqual(result, expected) { + t.Error("Got unexpected result: ", result) + } +} + +func TestGetAfuMap(t *testing.T) { + expected := map[string]map[string]deviceplugin.DeviceInfo{ + "d8424dc4a4a3c413f89e433683f9040b": { + "intel-fpga-port.0": { + State: pluginapi.Healthy, + Nodes: []string{"/dev/intel-fpga-port.0"}, + }, + "intel-fpga-port.1": { + State: pluginapi.Healthy, + Nodes: []string{"/dev/intel-fpga-port.1"}, + }, + }, + } + + result := getAfuMap(getDevices()) + if !reflect.DeepEqual(result, expected) { + t.Error("Got unexpected result: ", result) + } +} + +func getDevMapClosure(oldmap map[string]map[string]deviceplugin.DeviceInfo, newmap map[string]map[string]deviceplugin.DeviceInfo) getDevMapFunc { + var callnum int + + if oldmap == nil { + oldmap = make(map[string]map[string]deviceplugin.DeviceInfo) + } + if newmap == nil { + newmap = make(map[string]map[string]deviceplugin.DeviceInfo) + } + + return func(devices []device) map[string]map[string]deviceplugin.DeviceInfo { + defer func() { callnum = callnum + 1 }() + + if callnum%2 == 0 { + return oldmap + + } + return newmap + } +} + +func TestDetectUpdates(t *testing.T) { + tcases := []struct { + name string + expectedAdded int + expectedUpdated int + expectedRemoved int + oldmap map[string]map[string]deviceplugin.DeviceInfo + newmap map[string]map[string]deviceplugin.DeviceInfo + }{ + { + name: "No devices found", + }, + { + name: "Added 1 new device type", + newmap: map[string]map[string]deviceplugin.DeviceInfo{ + "fpgaID": { + "intel-fpga-port.0": { + State: pluginapi.Healthy, + Nodes: []string{"/dev/intel-fpga-port.0"}, + }, + }, + }, + expectedAdded: 1, + }, + { + name: "Updated 1 new device type", + oldmap: map[string]map[string]deviceplugin.DeviceInfo{ + "fpgaID": { + "intel-fpga-port.0": { + State: pluginapi.Healthy, + Nodes: []string{"/dev/intel-fpga-port.0"}, + }, + }, + }, + newmap: map[string]map[string]deviceplugin.DeviceInfo{ + "fpgaID": { + "intel-fpga-port.1": { + State: pluginapi.Healthy, + Nodes: []string{"/dev/intel-fpga-port.1"}, + }, + }, + }, + expectedUpdated: 1, + }, + { + name: "Removed 1 new device type", + oldmap: map[string]map[string]deviceplugin.DeviceInfo{ + "fpgaID": { + "intel-fpga-port.0": { + State: pluginapi.Healthy, + Nodes: []string{"/dev/intel-fpga-port.0"}, + }, + }, + }, + expectedRemoved: 1, + }, + } + + for _, tcase := range tcases { + ch := make(chan UpdateInfo, 1) + cache, err := NewCache("", "", AfMode, ch) + if err != nil { + t.Fatal(err) + } + cache.getDevMap = getDevMapClosure(tcase.oldmap, tcase.newmap) + + cache.detectUpdates([]device{}) + + var update UpdateInfo + select { + case update = <-ch: + default: + } + + if tcase.expectedAdded != len(update.Added) { + t.Errorf("Test case '%s': expected %d added device types, but got %d", tcase.name, tcase.expectedAdded, len(update.Added)) + } + if tcase.expectedUpdated != len(update.Updated) { + t.Errorf("Test case '%s': expected %d updated device types, but got %d", tcase.name, tcase.expectedUpdated, len(update.Updated)) + } + if tcase.expectedRemoved != len(update.Removed) { + t.Errorf("Test case '%s': expected %d removed device types, but got %d", tcase.name, tcase.expectedUpdated, len(update.Updated)) + } + } +} + +func TestScanFPGAs(t *testing.T) { + tmpdir := fmt.Sprintf("/tmp/fpgaplugin-TestDiscoverFPGAs-%d", time.Now().Unix()) + sysfs := path.Join(tmpdir, "sys", "class", "fpga") + devfs := path.Join(tmpdir, "dev") + tcases := []struct { + name string + devfsdirs []string + sysfsdirs []string + sysfsfiles map[string][]byte + errorContains string + expectedDevices []device + }{ + { + name: "No sysfs folder given", + errorContains: "Can't read sysfs folder", + }, + { + name: "FPGA device without FME and ports", + sysfsdirs: []string{"intel-fpga-dev.0", "incorrect-file-name"}, + errorContains: "No regions found", + }, + { + name: "AFU without ID", + sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-port.0"}, + errorContains: "afu_id: no such file or directory", + }, + { + name: "No device node for detected AFU", + sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-port.0"}, + sysfsfiles: map[string][]byte{ + "intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"), + }, + errorContains: "/dev/intel-fpga-port.0: no such file or directory", + }, + { + name: "AFU without corresponding FME", + sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-port.0"}, + devfsdirs: []string{"intel-fpga-port.0"}, + sysfsfiles: map[string][]byte{ + "intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"), + }, + errorContains: "No regions found", + }, + { + name: "More than one FME per FPGA device", + sysfsdirs: []string{ + "intel-fpga-dev.0/intel-fpga-fme.0/pr", + "intel-fpga-dev.0/intel-fpga-fme.1/pr", + }, + sysfsfiles: map[string][]byte{ + "intel-fpga-dev.0/intel-fpga-fme.0/pr/interface_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"), + "intel-fpga-dev.0/intel-fpga-fme.1/pr/interface_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"), + }, + devfsdirs: []string{ + "intel-fpga-fme.0", + "intel-fpga-fme.1", + }, + errorContains: "Detected more than one FPGA region", + }, + { + name: "FME without interface ID", + sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-fme.0"}, + errorContains: "interface_id: no such file or directory", + }, + { + name: "No device node for detected region", + sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-fme.0/pr"}, + sysfsfiles: map[string][]byte{ + "intel-fpga-dev.0/intel-fpga-fme.0/pr/interface_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"), + }, + errorContains: "/dev/intel-fpga-fme.0: no such file or directory", + }, + { + name: "No errors expected", + sysfsdirs: []string{ + "intel-fpga-dev.0/intel-fpga-port.0", + "intel-fpga-dev.0/intel-fpga-fme.0/pr", + "intel-fpga-dev.1/intel-fpga-port.1", + "intel-fpga-dev.1/intel-fpga-fme.1/pr", + }, + sysfsfiles: map[string][]byte{ + "intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"), + "intel-fpga-dev.1/intel-fpga-port.1/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"), + "intel-fpga-dev.0/intel-fpga-fme.0/pr/interface_id": []byte("ce48969398f05f33946d560708be108a\n"), + "intel-fpga-dev.1/intel-fpga-fme.1/pr/interface_id": []byte("ce48969398f05f33946d560708be108a\n"), + }, + devfsdirs: []string{ + "intel-fpga-port.0", "intel-fpga-fme.0", + "intel-fpga-port.1", "intel-fpga-fme.1", + }, + expectedDevices: []device{ + { + name: "intel-fpga-dev.0", + regions: []region{ + { + id: "intel-fpga-fme.0", + interfaceID: "ce48969398f05f33946d560708be108a", + devNode: path.Join(devfs, "intel-fpga-fme.0"), + afus: []afu{ + { + id: "intel-fpga-port.0", + afuID: "d8424dc4a4a3c413f89e433683f9040b", + devNode: path.Join(devfs, "intel-fpga-port.0"), + }, + }, + }, + }, + }, + { + name: "intel-fpga-dev.1", + regions: []region{ + { + id: "intel-fpga-fme.1", + interfaceID: "ce48969398f05f33946d560708be108a", + devNode: path.Join(devfs, "intel-fpga-fme.1"), + afus: []afu{ + { + id: "intel-fpga-port.1", + afuID: "d8424dc4a4a3c413f89e433683f9040b", + devNode: path.Join(devfs, "intel-fpga-port.1"), + }, + }, + }, + }, + }, + }, + }, + } + + for _, tcase := range tcases { + err := createTestDirs(devfs, sysfs, tcase.devfsdirs, tcase.sysfsdirs, tcase.sysfsfiles) + if err != nil { + t.Fatal(err) + } + + cache, err := NewCache(sysfs, devfs, AfMode, nil) + if err != nil { + t.Fatal(err) + } + cache.getDevMap = func(devices []device) map[string]map[string]deviceplugin.DeviceInfo { + return make(map[string]map[string]deviceplugin.DeviceInfo) + } + + err = cache.scanFPGAs() + if tcase.errorContains != "" { + if err == nil || !strings.Contains(err.Error(), tcase.errorContains) { + t.Errorf("Test case '%s': expected error '%s', but got '%v'", tcase.name, tcase.errorContains, err) + } + } else if err != nil { + t.Errorf("Test case '%s': expected no error, but got '%v'", tcase.name, err) + } + if tcase.expectedDevices != nil && !reflect.DeepEqual(tcase.expectedDevices, cache.devices) { + t.Errorf("Test case '%s': expected devices '%v', but got '%v'", tcase.name, tcase.expectedDevices, cache.devices) + } + + err = os.RemoveAll(tmpdir) + if err != nil { + t.Fatal(err) + } + } +} + +func TestRun(t *testing.T) { + cache := Cache{ + sysfsDir: "/dev/null", + } + err := cache.Run() + if err == nil { + t.Error("Expected error, but got nil") + } +} diff --git a/cmd/fpga_plugin/fpga_plugin.go b/cmd/fpga_plugin/fpga_plugin.go index 7640bfec..92542a3b 100644 --- a/cmd/fpga_plugin/fpga_plugin.go +++ b/cmd/fpga_plugin/fpga_plugin.go @@ -17,157 +17,44 @@ package main import ( "flag" "fmt" - "io/ioutil" "os" - "path" - "reflect" - "regexp" - "sort" - "strings" - "time" "github.com/golang/glog" "golang.org/x/net/context" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" + "github.com/intel/intel-device-plugins-for-kubernetes/cmd/fpga_plugin/devicecache" "github.com/intel/intel-device-plugins-for-kubernetes/internal/deviceplugin" ) const ( sysfsDirectory = "/sys/class/fpga" devfsDirectory = "/dev" - deviceRE = `^intel-fpga-dev.[0-9]+$` - portRE = `^intel-fpga-port.[0-9]+$` - fmeRE = `^intel-fpga-fme.[0-9]+$` // Device plugin settings. pluginEndpointPrefix = "intel-fpga" resourceNamePrefix = "intel.com/fpga" ) -type pluginMode string - -const ( - afMode pluginMode = "af" - regionMode pluginMode = "region" -) - -func isValidPluginMode(m pluginMode) bool { - return m == afMode || m == regionMode -} - // deviceManager manages Intel FPGA devices. type deviceManager struct { srv deviceplugin.Server fpgaID string name string + ch chan map[string]deviceplugin.DeviceInfo devices map[string]deviceplugin.DeviceInfo - root string - mode pluginMode } -func newDeviceManager(resourceName string, fpgaID string, rootDir string, mode pluginMode) *deviceManager { +func newDeviceManager(resourceName string, fpgaID string, ch chan map[string]deviceplugin.DeviceInfo) *deviceManager { return &deviceManager{ fpgaID: fpgaID, name: resourceName, + ch: ch, devices: make(map[string]deviceplugin.DeviceInfo), - root: rootDir, - mode: mode, } } -// Discovers all FPGA devices available on the local node by walking `/sys/class/fpga` directory. -func discoverFPGAs(sysfsDir string, devfsDir string, mode pluginMode) (map[string]map[string]deviceplugin.DeviceInfo, error) { - deviceReg := regexp.MustCompile(deviceRE) - portReg := regexp.MustCompile(portRE) - fmeReg := regexp.MustCompile(fmeRE) - - result := make(map[string]map[string]deviceplugin.DeviceInfo) - - fpgaFiles, err := ioutil.ReadDir(sysfsDir) - if err != nil { - return nil, fmt.Errorf("Can't read sysfs folder: %v", err) - } - - for _, fpgaFile := range fpgaFiles { - fname := fpgaFile.Name() - if deviceReg.MatchString(fname) { - var interfaceID string - - deviceFolder := path.Join(sysfsDir, fname) - deviceFiles, err := ioutil.ReadDir(deviceFolder) - if err != nil { - return nil, err - } - fpgaNodes := make(map[string][]string) - - if mode == regionMode { - for _, deviceFile := range deviceFiles { - name := deviceFile.Name() - if fmeReg.MatchString(name) { - if len(interfaceID) > 0 { - return nil, fmt.Errorf("Detected more than one FPGA region for device %s. Only one region per FPGA device is supported", fname) - } - interfaceIDFile := path.Join(deviceFolder, name, "pr", "interface_id") - data, err := ioutil.ReadFile(interfaceIDFile) - if err != nil { - return nil, err - } - interfaceID = fmt.Sprintf("%s-%s", mode, strings.TrimSpace(string(data))) - fpgaNodes[interfaceID] = append(fpgaNodes[interfaceID], name) - } - } - } - - for _, deviceFile := range deviceFiles { - name := deviceFile.Name() - if portReg.MatchString(name) { - switch mode { - case regionMode: - if len(interfaceID) == 0 { - return nil, fmt.Errorf("No FPGA region found for %s", fname) - } - fpgaNodes[interfaceID] = append(fpgaNodes[interfaceID], name) - case afMode: - afuFile := path.Join(deviceFolder, name, "afu_id") - data, err := ioutil.ReadFile(afuFile) - if err != nil { - return nil, err - } - afuID := fmt.Sprintf("%s-%s", mode, strings.TrimSpace(string(data))) - fpgaNodes[afuID] = append(fpgaNodes[afuID], name) - default: - glog.Fatal("Unsupported mode") - } - } - } - if len(fpgaNodes) == 0 { - return nil, fmt.Errorf("No device nodes found for %s", fname) - } - for fpgaID, nodes := range fpgaNodes { - var devNodes []string - for _, node := range nodes { - devNode := path.Join(devfsDir, node) - if _, err := os.Stat(devNode); err != nil { - return nil, fmt.Errorf("Device %s doesn't exist: %+v", devNode, err) - } - devNodes = append(devNodes, devNode) - } - sort.Strings(devNodes) - if _, ok := result[fpgaID]; !ok { - result[fpgaID] = make(map[string]deviceplugin.DeviceInfo) - } - result[fpgaID][fname] = deviceplugin.DeviceInfo{ - State: pluginapi.Healthy, - Nodes: devNodes, - } - } - } - } - return result, nil -} - func (dm *deviceManager) GetDevicePluginOptions(ctx context.Context, empty *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) { fmt.Println("GetDevicePluginOptions: return empty options") return new(pluginapi.DevicePluginOptions), nil @@ -176,37 +63,21 @@ func (dm *deviceManager) GetDevicePluginOptions(ctx context.Context, empty *plug // ListAndWatch returns a list of devices // Whenever a device state change or a device disappears, ListAndWatch returns the new list func (dm *deviceManager) ListAndWatch(empty *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) error { - sysfsDir := path.Join(dm.root, sysfsDirectory) - devfsDir := path.Join(dm.root, devfsDirectory) - for { - devs, err := discoverFPGAs(sysfsDir, devfsDir, dm.mode) - if err != nil { + glog.V(2).Info("Started ListAndWatch for ", dm.fpgaID) + + for dm.devices = range dm.ch { + resp := new(pluginapi.ListAndWatchResponse) + for id, device := range dm.devices { + resp.Devices = append(resp.Devices, &pluginapi.Device{id, device.State}) + } + glog.V(2).Info("Sending to kubelet ", resp.Devices) + if err := stream.Send(resp); err != nil { dm.srv.Stop() - return fmt.Errorf("Device discovery failed: %+v", err) + return fmt.Errorf("device-plugin: cannot update device list: %v", err) } - devinfos, ok := devs[dm.fpgaID] - if !ok { - dm.srv.Stop() - return fmt.Errorf("AFU id %s disappeared", dm.fpgaID) - } - if !reflect.DeepEqual(dm.devices, devinfos) { - dm.devices = devinfos - resp := new(pluginapi.ListAndWatchResponse) - var keys []string - for key := range dm.devices { - keys = append(keys, key) - } - sort.Strings(keys) - for _, id := range keys { - resp.Devices = append(resp.Devices, &pluginapi.Device{id, dm.devices[id].State}) - } - if err := stream.Send(resp); err != nil { - dm.srv.Stop() - return fmt.Errorf("device-plugin: cannot update device list: %v", err) - } - } - time.Sleep(5 * time.Second) } + + return nil } func (dm *deviceManager) Allocate(ctx context.Context, rqt *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) { @@ -218,42 +89,58 @@ func (dm *deviceManager) PreStartContainer(ctx context.Context, rqt *pluginapi.P return new(pluginapi.PreStartContainerResponse), nil } -func main() { - var modeStr string - - flag.StringVar(&modeStr, "mode", string(afMode), fmt.Sprintf("device plugin mode: '%s' (default) or '%s'", afMode, regionMode)) - flag.Parse() - - mode := pluginMode(modeStr) - if !isValidPluginMode(mode) { - glog.Error("Wrong mode: ", modeStr) - os.Exit(1) - } - - fmt.Println("FPGA device plugin started in", mode, "mode") - - devs, err := discoverFPGAs(sysfsDirectory, devfsDirectory, mode) - if err != nil { - glog.Fatalf("Device discovery failed: %+v", err) - } - - if len(devs) == 0 { - glog.Error("No devices found. Waiting indefinitely.") - select {} - } - - ch := make(chan error) - for fpgaID := range devs { - resourceName := resourceNamePrefix + "-" + fpgaID - pPrefix := pluginEndpointPrefix + "-" + fpgaID - dm := newDeviceManager(resourceName, fpgaID, "/", mode) - - go func() { - ch <- dm.srv.Serve(dm, resourceName, pPrefix) - }() - } - err = <-ch +func startDeviceManager(dm *deviceManager, pluginPrefix string) { + err := dm.srv.Serve(dm, dm.name, pluginPrefix) if err != nil { glog.Fatal(err) } } + +func handleUpdate(dms map[string]*deviceManager, updateInfo devicecache.UpdateInfo, start func(*deviceManager, string)) { + glog.V(2).Info("Recieved dev updates: ", updateInfo) + for fpgaID, devices := range updateInfo.Added { + devCh := make(chan map[string]deviceplugin.DeviceInfo, 1) + resourceName := resourceNamePrefix + "-" + fpgaID + pPrefix := pluginEndpointPrefix + "-" + fpgaID + dms[fpgaID] = newDeviceManager(resourceName, fpgaID, devCh) + go start(dms[fpgaID], pPrefix) + devCh <- devices + } + for fpgaID, devices := range updateInfo.Updated { + dms[fpgaID].ch <- devices + } + for fpgaID := range updateInfo.Removed { + dms[fpgaID].srv.Stop() + close(dms[fpgaID].ch) + delete(dms, fpgaID) + } +} + +func main() { + var mode string + + flag.StringVar(&mode, "mode", string(devicecache.AfMode), fmt.Sprintf("device plugin mode: '%s' (default) or '%s'", devicecache.AfMode, devicecache.RegionMode)) + flag.Parse() + + updatesCh := make(chan devicecache.UpdateInfo) + + cache, err := devicecache.NewCache(sysfsDirectory, devfsDirectory, mode, updatesCh) + if err != nil { + glog.Error(err) + os.Exit(1) + } + + glog.Info("FPGA device plugin started in ", mode, " mode") + + go func() { + err := cache.Run() + if err != nil { + glog.Fatal(err) + } + }() + + dms := make(map[string]*deviceManager) + for updateInfo := range updatesCh { + handleUpdate(dms, updateInfo, startDeviceManager) + } +} diff --git a/cmd/fpga_plugin/fpga_plugin_test.go b/cmd/fpga_plugin/fpga_plugin_test.go index 4fc14869..c31f91ed 100644 --- a/cmd/fpga_plugin/fpga_plugin_test.go +++ b/cmd/fpga_plugin/fpga_plugin_test.go @@ -16,241 +16,30 @@ package main import ( "fmt" - "io/ioutil" - "os" - "path" - "reflect" "testing" - "time" "golang.org/x/net/context" "google.golang.org/grpc/metadata" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" + "github.com/intel/intel-device-plugins-for-kubernetes/cmd/fpga_plugin/devicecache" "github.com/intel/intel-device-plugins-for-kubernetes/internal/deviceplugin" ) -func createTestDirs(devfs, sysfs string, devfsDirs, sysfsDirs []string, sysfsFiles map[string][]byte) error { - var err error - - for _, devfsdir := range devfsDirs { - err = os.MkdirAll(path.Join(devfs, devfsdir), 0755) - if err != nil { - return fmt.Errorf("Failed to create fake device directory: %+v", err) - } - } - for _, sysfsdir := range sysfsDirs { - err = os.MkdirAll(path.Join(sysfs, sysfsdir), 0755) - if err != nil { - return fmt.Errorf("Failed to create fake device directory: %+v", err) - } - } - for filename, body := range sysfsFiles { - err = ioutil.WriteFile(path.Join(sysfs, filename), body, 0644) - if err != nil { - return fmt.Errorf("Failed to create fake vendor file: %+v", err) - } - } - - return nil -} - -func TestDiscoverFPGAs(t *testing.T) { - tmpdir := fmt.Sprintf("/tmp/fpgaplugin-TestDiscoverFPGAs-%d", time.Now().Unix()) - sysfs := path.Join(tmpdir, "sys", "class", "fpga") - devfs := path.Join(tmpdir, "dev") - tcases := []struct { - devfsdirs []string - sysfsdirs []string - sysfsfiles map[string][]byte - expectedResult map[string]map[string]deviceplugin.DeviceInfo - expectedErr bool - mode pluginMode - }{ - { - expectedResult: nil, - expectedErr: true, - mode: afMode, - }, - { - sysfsdirs: []string{"intel-fpga-dev.0"}, - expectedResult: nil, - expectedErr: true, - mode: afMode, - }, - { - sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-port.0"}, - expectedResult: nil, - expectedErr: true, - mode: afMode, - }, - { - sysfsdirs: []string{ - "intel-fpga-dev.0/intel-fpga-port.0", - }, - sysfsfiles: map[string][]byte{ - "intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"), - }, - expectedResult: nil, - expectedErr: true, - mode: afMode, - }, - { - sysfsdirs: []string{ - "intel-fpga-dev.0/intel-fpga-port.0", - "intel-fpga-dev.0/intel-fpga-fme.0", - "intel-fpga-dev.1/intel-fpga-port.1", - "intel-fpga-dev.1/intel-fpga-fme.1", - "intel-fpga-dev.2/intel-fpga-port.2", - "intel-fpga-dev.2/intel-fpga-fme.2", - }, - sysfsfiles: map[string][]byte{ - "intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"), - "intel-fpga-dev.1/intel-fpga-port.1/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"), - "intel-fpga-dev.2/intel-fpga-port.2/afu_id": []byte("47595d0fae972fbed0c51b4a41c7a349\n"), - }, - devfsdirs: []string{ - "intel-fpga-port.0", "intel-fpga-fme.0", - "intel-fpga-port.1", "intel-fpga-fme.1", - "intel-fpga-port.2", "intel-fpga-fme.2", - }, - expectedResult: map[string]map[string]deviceplugin.DeviceInfo{ - fmt.Sprintf("%s-d8424dc4a4a3c413f89e433683f9040b", afMode): { - "intel-fpga-dev.0": { - State: "Healthy", - Nodes: []string{ - path.Join(tmpdir, "/dev/intel-fpga-port.0"), - }, - }, - "intel-fpga-dev.1": { - State: "Healthy", - Nodes: []string{ - path.Join(tmpdir, "/dev/intel-fpga-port.1"), - }, - }, - }, - fmt.Sprintf("%s-47595d0fae972fbed0c51b4a41c7a349", afMode): { - "intel-fpga-dev.2": { - State: "Healthy", - Nodes: []string{ - path.Join(tmpdir, "/dev/intel-fpga-port.2"), - }, - }, - }, - }, - expectedErr: false, - mode: afMode, - }, - { - sysfsdirs: []string{ - "intel-fpga-dev.0/intel-fpga-fme.0/pr", - "intel-fpga-dev.0/intel-fpga-fme.1/pr", - }, - sysfsfiles: map[string][]byte{ - "intel-fpga-dev.0/intel-fpga-fme.0/pr/interface_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"), - "intel-fpga-dev.0/intel-fpga-fme.1/pr/interface_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"), - }, - expectedResult: nil, - expectedErr: true, - mode: regionMode, - }, - { - sysfsdirs: []string{ - "intel-fpga-dev.0/intel-fpga-port.0", - "intel-fpga-dev.1/intel-fpga-port.1", - }, - expectedResult: nil, - expectedErr: true, - mode: regionMode, - }, - { - sysfsdirs: []string{ - "intel-fpga-dev.0/intel-fpga-port.0", - "intel-fpga-dev.0/intel-fpga-fme.0/pr", - "intel-fpga-dev.1/intel-fpga-port.1", - "intel-fpga-dev.1/intel-fpga-fme.1/pr", - "intel-fpga-dev.2/intel-fpga-port.2", - "intel-fpga-dev.2/intel-fpga-fme.2/pr", - }, - sysfsfiles: map[string][]byte{ - "intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"), - "intel-fpga-dev.1/intel-fpga-port.1/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"), - "intel-fpga-dev.2/intel-fpga-port.2/afu_id": []byte("47595d0fae972fbed0c51b4a41c7a349\n"), - "intel-fpga-dev.0/intel-fpga-fme.0/pr/interface_id": []byte("ce48969398f05f33946d560708be108a\n"), - "intel-fpga-dev.1/intel-fpga-fme.1/pr/interface_id": []byte("ce48969398f05f33946d560708be108a\n"), - "intel-fpga-dev.2/intel-fpga-fme.2/pr/interface_id": []byte("fd967345645f05f338462a0748be0091\n"), - }, - devfsdirs: []string{ - "intel-fpga-port.0", "intel-fpga-fme.0", - "intel-fpga-port.1", "intel-fpga-fme.1", - "intel-fpga-port.2", "intel-fpga-fme.2", - }, - expectedResult: map[string]map[string]deviceplugin.DeviceInfo{ - fmt.Sprintf("%s-ce48969398f05f33946d560708be108a", regionMode): { - "intel-fpga-dev.0": { - State: "Healthy", - Nodes: []string{ - path.Join(tmpdir, "/dev/intel-fpga-fme.0"), - path.Join(tmpdir, "/dev/intel-fpga-port.0"), - }, - }, - "intel-fpga-dev.1": { - State: "Healthy", - Nodes: []string{ - path.Join(tmpdir, "/dev/intel-fpga-fme.1"), - path.Join(tmpdir, "/dev/intel-fpga-port.1"), - }, - }, - }, - fmt.Sprintf("%s-fd967345645f05f338462a0748be0091", regionMode): { - "intel-fpga-dev.2": { - State: "Healthy", - Nodes: []string{ - path.Join(tmpdir, "/dev/intel-fpga-fme.2"), - path.Join(tmpdir, "/dev/intel-fpga-port.2"), - }, - }, - }, - }, - expectedErr: false, - mode: regionMode, - }, - } - - for _, tcase := range tcases { - err := createTestDirs(devfs, sysfs, tcase.devfsdirs, tcase.sysfsdirs, tcase.sysfsfiles) - if err != nil { - t.Error(err) - } - - result, err := discoverFPGAs(sysfs, devfs, tcase.mode) - if tcase.expectedErr && err == nil { - t.Error("Expected error hasn't been triggered") - } - - if tcase.expectedResult != nil && !reflect.DeepEqual(result, tcase.expectedResult) { - t.Logf("Expected result: %+v", tcase.expectedResult) - t.Logf("Actual result: %+v", result) - t.Error("Got unexpeced result") - } - - err = os.RemoveAll(tmpdir) - if err != nil { - t.Fatalf("Failed to remove fake device directory: %+v", err) - } - } -} - // Minimal implementation of pluginapi.DevicePlugin_ListAndWatchServer type listAndWatchServerStub struct { - testDM *deviceManager - t *testing.T - cdata chan []*pluginapi.Device - cerr chan error + testDM *deviceManager + generateErr bool + cdata chan []*pluginapi.Device } func (s *listAndWatchServerStub) Send(resp *pluginapi.ListAndWatchResponse) error { + if s.generateErr { + fmt.Println("listAndWatchServerStub::Send returns error") + return fmt.Errorf("Fake error") + } + fmt.Println("listAndWatchServerStub::Send", resp.Devices) s.cdata <- resp.Devices return nil @@ -279,114 +68,78 @@ func (s *listAndWatchServerStub) SetHeader(m metadata.MD) error { func (s *listAndWatchServerStub) SetTrailer(m metadata.MD) { } -func TestListAndWatch(t *testing.T) { - afuID := "d8424dc4a4a3c413f89e433683f9040b" - tmpdir := fmt.Sprintf("/tmp/fpgaplugin-TestListAndWatch-%d", time.Now().Unix()) - sysfs := path.Join(tmpdir, "sys", "class", "fpga") - devfs := path.Join(tmpdir, "dev") +func TestGetDevicePluginOptions(t *testing.T) { + dm := &deviceManager{} + dm.GetDevicePluginOptions(nil, nil) +} +func TestPreStartContainer(t *testing.T) { + dm := &deviceManager{} + dm.PreStartContainer(nil, nil) +} + +func TestListAndWatch(t *testing.T) { tcases := []struct { - devfsdirs []string - sysfsdirs []string - sysfsfiles map[string][]byte - expectedResult []*pluginapi.Device - expectedErr bool + name string + updates []map[string]deviceplugin.DeviceInfo + expectedErr bool }{ { - expectedResult: nil, - expectedErr: true, + name: "No updates and close", }, { - sysfsdirs: []string{ - "intel-fpga-dev.0/intel-fpga-port.0", + name: "Send 1 update", + updates: []map[string]deviceplugin.DeviceInfo{ + { + "fake_id": { + State: pluginapi.Healthy, + Nodes: []string{"/dev/intel-fpga-port.0"}, + }, + }, }, - sysfsfiles: map[string][]byte{ - "intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\n"), - }, - devfsdirs: []string{ - "intel-fpga-port.0", "intel-fpga-fme.0", - }, - expectedResult: nil, - expectedErr: true, }, { - sysfsdirs: []string{ - "intel-fpga-dev.0/intel-fpga-port.0", - "intel-fpga-dev.0/intel-fpga-fme.0", - "intel-fpga-dev.1/intel-fpga-port.1", - "intel-fpga-dev.1/intel-fpga-fme.1", - "intel-fpga-dev.2/intel-fpga-port.2", - "intel-fpga-dev.2/intel-fpga-fme.2", + name: "Send 1 update, but expect streaming error", + updates: []map[string]deviceplugin.DeviceInfo{ + { + "fake_id": { + State: pluginapi.Healthy, + Nodes: []string{"/dev/intel-fpga-port.0"}, + }, + }, }, - sysfsfiles: map[string][]byte{ - "intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte(afuID), - "intel-fpga-dev.1/intel-fpga-port.1/afu_id": []byte(afuID), - "intel-fpga-dev.2/intel-fpga-port.2/afu_id": []byte("47595d0fae972fbed0c51b4a41c7a349\n"), - }, - devfsdirs: []string{ - "intel-fpga-port.0", "intel-fpga-fme.0", - "intel-fpga-port.1", "intel-fpga-fme.1", - "intel-fpga-port.2", "intel-fpga-fme.2", - }, - expectedResult: []*pluginapi.Device{ - {"intel-fpga-dev.0", "Healthy"}, - {"intel-fpga-dev.1", "Healthy"}, - }, - expectedErr: false, + expectedErr: true, }, } - resourceName := fmt.Sprintf("%s%s-%s", resourceNamePrefix, afMode, afuID) - testDM := newDeviceManager(resourceName, fmt.Sprintf("%s-%s", afMode, afuID), tmpdir, afMode) - if testDM == nil { - t.Fatal("Failed to create a deviceManager") - } + for _, tt := range tcases { + devCh := make(chan map[string]deviceplugin.DeviceInfo, len(tt.updates)) + testDM := newDeviceManager("intel.com/fpgatest-fpgaID", "fpgaID", devCh) - server := &listAndWatchServerStub{ - testDM: testDM, - t: t, - cdata: make(chan []*pluginapi.Device), - cerr: make(chan error), - } - - for _, tcase := range tcases { - err := createTestDirs(devfs, sysfs, tcase.devfsdirs, tcase.sysfsdirs, tcase.sysfsfiles) - if err != nil { - t.Error(err) + server := &listAndWatchServerStub{ + testDM: testDM, + generateErr: tt.expectedErr, + cdata: make(chan []*pluginapi.Device, len(tt.updates)), } - go func() { - err = testDM.ListAndWatch(&pluginapi.Empty{}, server) - if err != nil { - server.cerr <- err - } - }() - - select { - case result := <-server.cdata: - if tcase.expectedErr { - t.Error("Expected error hasn't been triggered") - } else if tcase.expectedResult != nil && !reflect.DeepEqual(result, tcase.expectedResult) { - t.Logf("Expected result: %+v", tcase.expectedResult) - t.Logf("Actual result: %+v", result) - t.Error("Got unexpeced result") - } - testDM.srv.Stop() - case err = <-server.cerr: - if !tcase.expectedErr { - t.Errorf("Unexpected error has been triggered: %+v", err) - } + // push device infos to DM's channel + for _, update := range tt.updates { + devCh <- update } + close(devCh) - err = os.RemoveAll(tmpdir) - if err != nil { - t.Fatalf("Failed to remove fake device directory: %+v", err) + err := testDM.ListAndWatch(&pluginapi.Empty{}, server) + if err != nil && !tt.expectedErr { + t.Errorf("Test case '%s': got unexpected error %v", tt.name, err) + } + if err == nil && tt.expectedErr { + t.Errorf("Test case '%s': expected an error, but got nothing", tt.name) } } } func TestAllocate(t *testing.T) { - testDM := newDeviceManager("", "", "", afMode) + testDM := newDeviceManager("", "", nil) if testDM == nil { t.Fatal("Failed to create a deviceManager") } @@ -399,10 +152,13 @@ func TestAllocate(t *testing.T) { }, } - testDM.devices["dev1"] = deviceplugin.DeviceInfo{pluginapi.Healthy, []string{"/dev/dev1"}} + testDM.devices["dev1"] = deviceplugin.DeviceInfo{ + State: pluginapi.Healthy, + Nodes: []string{"/dev/dev1"}, + } resp, err := testDM.Allocate(nil, rqt) if err != nil { - t.Fatalf("Failed to allocate healthy device: %+v", err) + t.Fatalf("Failed to allocate healthy device: %v", err) } if len(resp.ContainerResponses[0].Devices) != 1 { @@ -410,28 +166,82 @@ func TestAllocate(t *testing.T) { } } -func TestIsValidPluginMode(t *testing.T) { +func startDeviceManagerStub(dm *deviceManager, pluginPrefix string) { +} + +func TestHandleUpdate(t *testing.T) { tcases := []struct { - input pluginMode - output bool + name string + dms map[string]*deviceManager + updateInfo devicecache.UpdateInfo + expectedDMs int }{ { - input: afMode, - output: true, + name: "Empty update", + updateInfo: devicecache.UpdateInfo{}, + expectedDMs: 0, }, { - input: regionMode, - output: true, + name: "Add device manager", + updateInfo: devicecache.UpdateInfo{ + Added: map[string]map[string]deviceplugin.DeviceInfo{ + "ce48969398f05f33946d560708be108a": { + "intel-fpga-fme.0": { + State: pluginapi.Healthy, + Nodes: []string{"/dev/intel-fpga-port.0", "/dev/intel-fpga-fme.0"}, + }, + "intel-fpga-fme.1": { + State: pluginapi.Healthy, + Nodes: []string{"/dev/intel-fpga-port.1", "/dev/intel-fpga-fme.1"}, + }, + }, + }, + }, + expectedDMs: 1, }, { - input: pluginMode("unparsable"), - output: false, + name: "Update existing device manager", + dms: map[string]*deviceManager{ + "ce48969398f05f33946d560708be108a": &deviceManager{ + ch: make(chan map[string]deviceplugin.DeviceInfo, 1), + }, + }, + updateInfo: devicecache.UpdateInfo{ + Updated: map[string]map[string]deviceplugin.DeviceInfo{ + "ce48969398f05f33946d560708be108a": { + "intel-fpga-fme.1": { + State: pluginapi.Healthy, + Nodes: []string{"/dev/intel-fpga-port.1", "/dev/intel-fpga-fme.1"}, + }, + }, + }, + }, + expectedDMs: 1, + }, + { + name: "Remove device manager", + dms: map[string]*deviceManager{ + "ce48969398f05f33946d560708be108a": &deviceManager{ + ch: make(chan map[string]deviceplugin.DeviceInfo, 1), + }, + }, + updateInfo: devicecache.UpdateInfo{ + Removed: map[string]map[string]deviceplugin.DeviceInfo{ + "ce48969398f05f33946d560708be108a": {}, + }, + }, + expectedDMs: 0, }, } - for _, tcase := range tcases { - if isValidPluginMode(tcase.input) != tcase.output { - t.Error("Wrong output", tcase.output, "for the given input", tcase.input) + for _, tt := range tcases { + if tt.dms == nil { + tt.dms = make(map[string]*deviceManager) + } + handleUpdate(tt.dms, tt.updateInfo, startDeviceManagerStub) + if len(tt.dms) != tt.expectedDMs { + t.Errorf("Test case '%s': expected %d runnig device managers, but got %d", + tt.name, tt.expectedDMs, len(tt.dms)) } } }