Merge pull request #24 from rojkov/devicecache

fpga_plugin: refactor FPGA scans
This commit is contained in:
Ed Bartosh 2018-06-15 12:10:27 +03:00 committed by GitHub
commit 8f30aaa873
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 919 additions and 499 deletions

View File

@ -0,0 +1,284 @@
// Copyright 2018 Intel Corporation. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package devicecache
import (
"fmt"
"io/ioutil"
"os"
"path"
"reflect"
"regexp"
"strings"
"time"
"github.com/golang/glog"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
"github.com/intel/intel-device-plugins-for-kubernetes/internal/deviceplugin"
)
// Device Cache's mode of operation
const (
AfMode = "af"
RegionMode = "region"
)
const (
sysfsDirectory = "/sys/class/fpga"
devfsDirectory = "/dev"
deviceRE = `^intel-fpga-dev.[0-9]+$`
portRE = `^intel-fpga-port.[0-9]+$`
fmeRE = `^intel-fpga-fme.[0-9]+$`
)
// UpdateInfo contains info added, updated and deleted FPGA devices
type UpdateInfo struct {
Added map[string]map[string]deviceplugin.DeviceInfo
Updated map[string]map[string]deviceplugin.DeviceInfo
Removed map[string]map[string]deviceplugin.DeviceInfo
}
type getDevMapFunc func(devices []device) map[string]map[string]deviceplugin.DeviceInfo
func getRegionMap(devices []device) map[string]map[string]deviceplugin.DeviceInfo {
regionMap := make(map[string]map[string]deviceplugin.DeviceInfo)
for _, dev := range devices {
for _, region := range dev.regions {
if _, present := regionMap[region.interfaceID]; !present {
regionMap[region.interfaceID] = make(map[string]deviceplugin.DeviceInfo)
}
devNodes := make([]string, len(region.afus)+1)
for num, afu := range region.afus {
devNodes[num] = afu.devNode
}
devNodes[len(region.afus)] = region.devNode
regionMap[region.interfaceID][region.id] = deviceplugin.DeviceInfo{
State: pluginapi.Healthy,
Nodes: devNodes,
}
}
}
return regionMap
}
func getAfuMap(devices []device) map[string]map[string]deviceplugin.DeviceInfo {
afuMap := make(map[string]map[string]deviceplugin.DeviceInfo)
for _, dev := range devices {
for _, region := range dev.regions {
for _, afu := range region.afus {
if _, present := afuMap[afu.afuID]; !present {
afuMap[afu.afuID] = make(map[string]deviceplugin.DeviceInfo)
}
afuMap[afu.afuID][afu.id] = deviceplugin.DeviceInfo{
State: pluginapi.Healthy,
Nodes: []string{afu.devNode},
}
}
}
}
return afuMap
}
type afu struct {
id string
afuID string
devNode string
}
type region struct {
id string
interfaceID string
devNode string
afus []afu
}
type device struct {
name string
regions []region
}
// Cache represents FPGA devices found on the host
type Cache struct {
sysfsDir string
devfsDir string
deviceReg *regexp.Regexp
portReg *regexp.Regexp
fmeReg *regexp.Regexp
devices []device
ch chan<- UpdateInfo
getDevMap getDevMapFunc
}
// NewCache returns new instance of Cache
func NewCache(sysfsDir string, devfsDir string, mode string, ch chan<- UpdateInfo) (*Cache, error) {
var getDevMap getDevMapFunc
switch mode {
case AfMode:
getDevMap = getAfuMap
case RegionMode:
getDevMap = getRegionMap
default:
return nil, fmt.Errorf("Wrong mode: '%s'", mode)
}
return &Cache{
sysfsDir: sysfsDir,
devfsDir: devfsDir,
deviceReg: regexp.MustCompile(deviceRE),
portReg: regexp.MustCompile(portRE),
fmeReg: regexp.MustCompile(fmeRE),
ch: ch,
getDevMap: getDevMap,
}, nil
}
func (c *Cache) getDevNode(devName string) (string, error) {
devNode := path.Join(c.devfsDir, devName)
if _, err := os.Stat(devNode); err != nil {
return "", fmt.Errorf("Device %s doesn't exist: %v", devNode, err)
}
return devNode, nil
}
func (c *Cache) detectUpdates(devices []device) {
added := make(map[string]map[string]deviceplugin.DeviceInfo)
updated := make(map[string]map[string]deviceplugin.DeviceInfo)
oldDevMap := c.getDevMap(c.devices)
for fpgaID, new := range c.getDevMap(devices) {
if old, ok := oldDevMap[fpgaID]; ok {
if !reflect.DeepEqual(old, new) {
updated[fpgaID] = new
}
delete(oldDevMap, fpgaID)
} else {
added[fpgaID] = new
}
}
if len(added) > 0 || len(updated) > 0 || len(oldDevMap) > 0 {
c.ch <- UpdateInfo{
Added: added,
Updated: updated,
Removed: oldDevMap,
}
}
}
func (c *Cache) scanFPGAs() error {
var devices []device
glog.V(2).Info("Start new FPGA scan")
fpgaFiles, err := ioutil.ReadDir(c.sysfsDir)
if err != nil {
return fmt.Errorf("Can't read sysfs folder: %v", err)
}
for _, fpgaFile := range fpgaFiles {
fname := fpgaFile.Name()
if !c.deviceReg.MatchString(fname) {
continue
}
deviceFolder := path.Join(c.sysfsDir, fname)
deviceFiles, err := ioutil.ReadDir(deviceFolder)
if err != nil {
return err
}
var regions []region
var afus []afu
for _, deviceFile := range deviceFiles {
name := deviceFile.Name()
if c.fmeReg.MatchString(name) {
if len(regions) > 0 {
return fmt.Errorf("Detected more than one FPGA region for device %s. Only one region per FPGA device is supported", fname)
}
interfaceIDFile := path.Join(deviceFolder, name, "pr", "interface_id")
data, err := ioutil.ReadFile(interfaceIDFile)
if err != nil {
return err
}
devNode, err := c.getDevNode(name)
if err != nil {
return err
}
regions = append(regions, region{
id: name,
interfaceID: strings.TrimSpace(string(data)),
devNode: devNode,
})
} else if c.portReg.MatchString(name) {
afuFile := path.Join(deviceFolder, name, "afu_id")
data, err := ioutil.ReadFile(afuFile)
if err != nil {
return err
}
devNode, err := c.getDevNode(name)
if err != nil {
return err
}
afus = append(afus, afu{
id: name,
afuID: strings.TrimSpace(string(data)),
devNode: devNode,
})
}
}
if len(regions) == 0 {
return fmt.Errorf("No regions found for device %s", fname)
}
// Currently only one region per device is supported.
regions[0].afus = afus
devices = append(devices, device{
name: fname,
regions: regions,
})
}
c.detectUpdates(devices)
c.devices = devices
return nil
}
// Run starts scanning of FPGA devices on the host
func (c *Cache) Run() error {
for {
err := c.scanFPGAs()
if err != nil {
glog.Error("Device scan failed: ", err)
return fmt.Errorf("Device scan failed: %v", err)
}
time.Sleep(5 * time.Second)
}
}

View File

@ -0,0 +1,439 @@
// Copyright 2018 Intel Corporation. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package devicecache
import (
"fmt"
"io/ioutil"
"os"
"path"
"reflect"
"strings"
"testing"
"time"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
"github.com/intel/intel-device-plugins-for-kubernetes/internal/deviceplugin"
)
func createTestDirs(devfs, sysfs string, devfsDirs, sysfsDirs []string, sysfsFiles map[string][]byte) error {
var err error
for _, devfsdir := range devfsDirs {
err = os.MkdirAll(path.Join(devfs, devfsdir), 0755)
if err != nil {
return fmt.Errorf("Failed to create fake device directory: %v", err)
}
}
for _, sysfsdir := range sysfsDirs {
err = os.MkdirAll(path.Join(sysfs, sysfsdir), 0755)
if err != nil {
return fmt.Errorf("Failed to create fake device directory: %v", err)
}
}
for filename, body := range sysfsFiles {
err = ioutil.WriteFile(path.Join(sysfs, filename), body, 0644)
if err != nil {
return fmt.Errorf("Failed to create fake vendor file: %v", err)
}
}
return nil
}
func TestNewCache(t *testing.T) {
tcases := []struct {
mode string
expectedErr bool
}{
{
mode: AfMode,
},
{
mode: RegionMode,
},
{
mode: "unparsable",
expectedErr: true,
},
}
for _, tcase := range tcases {
_, err := NewCache("", "", tcase.mode, nil)
if tcase.expectedErr && err == nil {
t.Error("No error generated when creating Cache with invalid parameters")
}
}
}
// getDevices returns static list of device structs for testing purposes
func getDevices() []device {
return []device{
{
name: "intel-fpga-dev.0",
regions: []region{
{
id: "intel-fpga-fme.0",
interfaceID: "ce48969398f05f33946d560708be108a",
devNode: "/dev/intel-fpga-fme.0",
afus: []afu{
{
id: "intel-fpga-port.0",
afuID: "d8424dc4a4a3c413f89e433683f9040b",
devNode: "/dev/intel-fpga-port.0",
},
},
},
},
},
{
name: "intel-fpga-dev.1",
regions: []region{
{
id: "intel-fpga-fme.1",
interfaceID: "ce48969398f05f33946d560708be108a",
devNode: "/dev/intel-fpga-fme.1",
afus: []afu{
{
id: "intel-fpga-port.1",
afuID: "d8424dc4a4a3c413f89e433683f9040b",
devNode: "/dev/intel-fpga-port.1",
},
},
},
},
},
}
}
func TestGetRegionMap(t *testing.T) {
expected := map[string]map[string]deviceplugin.DeviceInfo{
"ce48969398f05f33946d560708be108a": {
"intel-fpga-fme.0": {
State: pluginapi.Healthy,
Nodes: []string{"/dev/intel-fpga-port.0", "/dev/intel-fpga-fme.0"},
},
"intel-fpga-fme.1": {
State: pluginapi.Healthy,
Nodes: []string{"/dev/intel-fpga-port.1", "/dev/intel-fpga-fme.1"},
},
},
}
result := getRegionMap(getDevices())
if !reflect.DeepEqual(result, expected) {
t.Error("Got unexpected result: ", result)
}
}
func TestGetAfuMap(t *testing.T) {
expected := map[string]map[string]deviceplugin.DeviceInfo{
"d8424dc4a4a3c413f89e433683f9040b": {
"intel-fpga-port.0": {
State: pluginapi.Healthy,
Nodes: []string{"/dev/intel-fpga-port.0"},
},
"intel-fpga-port.1": {
State: pluginapi.Healthy,
Nodes: []string{"/dev/intel-fpga-port.1"},
},
},
}
result := getAfuMap(getDevices())
if !reflect.DeepEqual(result, expected) {
t.Error("Got unexpected result: ", result)
}
}
func getDevMapClosure(oldmap map[string]map[string]deviceplugin.DeviceInfo, newmap map[string]map[string]deviceplugin.DeviceInfo) getDevMapFunc {
var callnum int
if oldmap == nil {
oldmap = make(map[string]map[string]deviceplugin.DeviceInfo)
}
if newmap == nil {
newmap = make(map[string]map[string]deviceplugin.DeviceInfo)
}
return func(devices []device) map[string]map[string]deviceplugin.DeviceInfo {
defer func() { callnum = callnum + 1 }()
if callnum%2 == 0 {
return oldmap
}
return newmap
}
}
func TestDetectUpdates(t *testing.T) {
tcases := []struct {
name string
expectedAdded int
expectedUpdated int
expectedRemoved int
oldmap map[string]map[string]deviceplugin.DeviceInfo
newmap map[string]map[string]deviceplugin.DeviceInfo
}{
{
name: "No devices found",
},
{
name: "Added 1 new device type",
newmap: map[string]map[string]deviceplugin.DeviceInfo{
"fpgaID": {
"intel-fpga-port.0": {
State: pluginapi.Healthy,
Nodes: []string{"/dev/intel-fpga-port.0"},
},
},
},
expectedAdded: 1,
},
{
name: "Updated 1 new device type",
oldmap: map[string]map[string]deviceplugin.DeviceInfo{
"fpgaID": {
"intel-fpga-port.0": {
State: pluginapi.Healthy,
Nodes: []string{"/dev/intel-fpga-port.0"},
},
},
},
newmap: map[string]map[string]deviceplugin.DeviceInfo{
"fpgaID": {
"intel-fpga-port.1": {
State: pluginapi.Healthy,
Nodes: []string{"/dev/intel-fpga-port.1"},
},
},
},
expectedUpdated: 1,
},
{
name: "Removed 1 new device type",
oldmap: map[string]map[string]deviceplugin.DeviceInfo{
"fpgaID": {
"intel-fpga-port.0": {
State: pluginapi.Healthy,
Nodes: []string{"/dev/intel-fpga-port.0"},
},
},
},
expectedRemoved: 1,
},
}
for _, tcase := range tcases {
ch := make(chan UpdateInfo, 1)
cache, err := NewCache("", "", AfMode, ch)
if err != nil {
t.Fatal(err)
}
cache.getDevMap = getDevMapClosure(tcase.oldmap, tcase.newmap)
cache.detectUpdates([]device{})
var update UpdateInfo
select {
case update = <-ch:
default:
}
if tcase.expectedAdded != len(update.Added) {
t.Errorf("Test case '%s': expected %d added device types, but got %d", tcase.name, tcase.expectedAdded, len(update.Added))
}
if tcase.expectedUpdated != len(update.Updated) {
t.Errorf("Test case '%s': expected %d updated device types, but got %d", tcase.name, tcase.expectedUpdated, len(update.Updated))
}
if tcase.expectedRemoved != len(update.Removed) {
t.Errorf("Test case '%s': expected %d removed device types, but got %d", tcase.name, tcase.expectedUpdated, len(update.Updated))
}
}
}
func TestScanFPGAs(t *testing.T) {
tmpdir := fmt.Sprintf("/tmp/fpgaplugin-TestDiscoverFPGAs-%d", time.Now().Unix())
sysfs := path.Join(tmpdir, "sys", "class", "fpga")
devfs := path.Join(tmpdir, "dev")
tcases := []struct {
name string
devfsdirs []string
sysfsdirs []string
sysfsfiles map[string][]byte
errorContains string
expectedDevices []device
}{
{
name: "No sysfs folder given",
errorContains: "Can't read sysfs folder",
},
{
name: "FPGA device without FME and ports",
sysfsdirs: []string{"intel-fpga-dev.0", "incorrect-file-name"},
errorContains: "No regions found",
},
{
name: "AFU without ID",
sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-port.0"},
errorContains: "afu_id: no such file or directory",
},
{
name: "No device node for detected AFU",
sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-port.0"},
sysfsfiles: map[string][]byte{
"intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
},
errorContains: "/dev/intel-fpga-port.0: no such file or directory",
},
{
name: "AFU without corresponding FME",
sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-port.0"},
devfsdirs: []string{"intel-fpga-port.0"},
sysfsfiles: map[string][]byte{
"intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
},
errorContains: "No regions found",
},
{
name: "More than one FME per FPGA device",
sysfsdirs: []string{
"intel-fpga-dev.0/intel-fpga-fme.0/pr",
"intel-fpga-dev.0/intel-fpga-fme.1/pr",
},
sysfsfiles: map[string][]byte{
"intel-fpga-dev.0/intel-fpga-fme.0/pr/interface_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
"intel-fpga-dev.0/intel-fpga-fme.1/pr/interface_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
},
devfsdirs: []string{
"intel-fpga-fme.0",
"intel-fpga-fme.1",
},
errorContains: "Detected more than one FPGA region",
},
{
name: "FME without interface ID",
sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-fme.0"},
errorContains: "interface_id: no such file or directory",
},
{
name: "No device node for detected region",
sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-fme.0/pr"},
sysfsfiles: map[string][]byte{
"intel-fpga-dev.0/intel-fpga-fme.0/pr/interface_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
},
errorContains: "/dev/intel-fpga-fme.0: no such file or directory",
},
{
name: "No errors expected",
sysfsdirs: []string{
"intel-fpga-dev.0/intel-fpga-port.0",
"intel-fpga-dev.0/intel-fpga-fme.0/pr",
"intel-fpga-dev.1/intel-fpga-port.1",
"intel-fpga-dev.1/intel-fpga-fme.1/pr",
},
sysfsfiles: map[string][]byte{
"intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
"intel-fpga-dev.1/intel-fpga-port.1/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
"intel-fpga-dev.0/intel-fpga-fme.0/pr/interface_id": []byte("ce48969398f05f33946d560708be108a\n"),
"intel-fpga-dev.1/intel-fpga-fme.1/pr/interface_id": []byte("ce48969398f05f33946d560708be108a\n"),
},
devfsdirs: []string{
"intel-fpga-port.0", "intel-fpga-fme.0",
"intel-fpga-port.1", "intel-fpga-fme.1",
},
expectedDevices: []device{
{
name: "intel-fpga-dev.0",
regions: []region{
{
id: "intel-fpga-fme.0",
interfaceID: "ce48969398f05f33946d560708be108a",
devNode: path.Join(devfs, "intel-fpga-fme.0"),
afus: []afu{
{
id: "intel-fpga-port.0",
afuID: "d8424dc4a4a3c413f89e433683f9040b",
devNode: path.Join(devfs, "intel-fpga-port.0"),
},
},
},
},
},
{
name: "intel-fpga-dev.1",
regions: []region{
{
id: "intel-fpga-fme.1",
interfaceID: "ce48969398f05f33946d560708be108a",
devNode: path.Join(devfs, "intel-fpga-fme.1"),
afus: []afu{
{
id: "intel-fpga-port.1",
afuID: "d8424dc4a4a3c413f89e433683f9040b",
devNode: path.Join(devfs, "intel-fpga-port.1"),
},
},
},
},
},
},
},
}
for _, tcase := range tcases {
err := createTestDirs(devfs, sysfs, tcase.devfsdirs, tcase.sysfsdirs, tcase.sysfsfiles)
if err != nil {
t.Fatal(err)
}
cache, err := NewCache(sysfs, devfs, AfMode, nil)
if err != nil {
t.Fatal(err)
}
cache.getDevMap = func(devices []device) map[string]map[string]deviceplugin.DeviceInfo {
return make(map[string]map[string]deviceplugin.DeviceInfo)
}
err = cache.scanFPGAs()
if tcase.errorContains != "" {
if err == nil || !strings.Contains(err.Error(), tcase.errorContains) {
t.Errorf("Test case '%s': expected error '%s', but got '%v'", tcase.name, tcase.errorContains, err)
}
} else if err != nil {
t.Errorf("Test case '%s': expected no error, but got '%v'", tcase.name, err)
}
if tcase.expectedDevices != nil && !reflect.DeepEqual(tcase.expectedDevices, cache.devices) {
t.Errorf("Test case '%s': expected devices '%v', but got '%v'", tcase.name, tcase.expectedDevices, cache.devices)
}
err = os.RemoveAll(tmpdir)
if err != nil {
t.Fatal(err)
}
}
}
func TestRun(t *testing.T) {
cache := Cache{
sysfsDir: "/dev/null",
}
err := cache.Run()
if err == nil {
t.Error("Expected error, but got nil")
}
}

View File

@ -17,157 +17,44 @@ package main
import (
"flag"
"fmt"
"io/ioutil"
"os"
"path"
"reflect"
"regexp"
"sort"
"strings"
"time"
"github.com/golang/glog"
"golang.org/x/net/context"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
"github.com/intel/intel-device-plugins-for-kubernetes/cmd/fpga_plugin/devicecache"
"github.com/intel/intel-device-plugins-for-kubernetes/internal/deviceplugin"
)
const (
sysfsDirectory = "/sys/class/fpga"
devfsDirectory = "/dev"
deviceRE = `^intel-fpga-dev.[0-9]+$`
portRE = `^intel-fpga-port.[0-9]+$`
fmeRE = `^intel-fpga-fme.[0-9]+$`
// Device plugin settings.
pluginEndpointPrefix = "intel-fpga"
resourceNamePrefix = "intel.com/fpga"
)
type pluginMode string
const (
afMode pluginMode = "af"
regionMode pluginMode = "region"
)
func isValidPluginMode(m pluginMode) bool {
return m == afMode || m == regionMode
}
// deviceManager manages Intel FPGA devices.
type deviceManager struct {
srv deviceplugin.Server
fpgaID string
name string
ch chan map[string]deviceplugin.DeviceInfo
devices map[string]deviceplugin.DeviceInfo
root string
mode pluginMode
}
func newDeviceManager(resourceName string, fpgaID string, rootDir string, mode pluginMode) *deviceManager {
func newDeviceManager(resourceName string, fpgaID string, ch chan map[string]deviceplugin.DeviceInfo) *deviceManager {
return &deviceManager{
fpgaID: fpgaID,
name: resourceName,
ch: ch,
devices: make(map[string]deviceplugin.DeviceInfo),
root: rootDir,
mode: mode,
}
}
// Discovers all FPGA devices available on the local node by walking `/sys/class/fpga` directory.
func discoverFPGAs(sysfsDir string, devfsDir string, mode pluginMode) (map[string]map[string]deviceplugin.DeviceInfo, error) {
deviceReg := regexp.MustCompile(deviceRE)
portReg := regexp.MustCompile(portRE)
fmeReg := regexp.MustCompile(fmeRE)
result := make(map[string]map[string]deviceplugin.DeviceInfo)
fpgaFiles, err := ioutil.ReadDir(sysfsDir)
if err != nil {
return nil, fmt.Errorf("Can't read sysfs folder: %v", err)
}
for _, fpgaFile := range fpgaFiles {
fname := fpgaFile.Name()
if deviceReg.MatchString(fname) {
var interfaceID string
deviceFolder := path.Join(sysfsDir, fname)
deviceFiles, err := ioutil.ReadDir(deviceFolder)
if err != nil {
return nil, err
}
fpgaNodes := make(map[string][]string)
if mode == regionMode {
for _, deviceFile := range deviceFiles {
name := deviceFile.Name()
if fmeReg.MatchString(name) {
if len(interfaceID) > 0 {
return nil, fmt.Errorf("Detected more than one FPGA region for device %s. Only one region per FPGA device is supported", fname)
}
interfaceIDFile := path.Join(deviceFolder, name, "pr", "interface_id")
data, err := ioutil.ReadFile(interfaceIDFile)
if err != nil {
return nil, err
}
interfaceID = fmt.Sprintf("%s-%s", mode, strings.TrimSpace(string(data)))
fpgaNodes[interfaceID] = append(fpgaNodes[interfaceID], name)
}
}
}
for _, deviceFile := range deviceFiles {
name := deviceFile.Name()
if portReg.MatchString(name) {
switch mode {
case regionMode:
if len(interfaceID) == 0 {
return nil, fmt.Errorf("No FPGA region found for %s", fname)
}
fpgaNodes[interfaceID] = append(fpgaNodes[interfaceID], name)
case afMode:
afuFile := path.Join(deviceFolder, name, "afu_id")
data, err := ioutil.ReadFile(afuFile)
if err != nil {
return nil, err
}
afuID := fmt.Sprintf("%s-%s", mode, strings.TrimSpace(string(data)))
fpgaNodes[afuID] = append(fpgaNodes[afuID], name)
default:
glog.Fatal("Unsupported mode")
}
}
}
if len(fpgaNodes) == 0 {
return nil, fmt.Errorf("No device nodes found for %s", fname)
}
for fpgaID, nodes := range fpgaNodes {
var devNodes []string
for _, node := range nodes {
devNode := path.Join(devfsDir, node)
if _, err := os.Stat(devNode); err != nil {
return nil, fmt.Errorf("Device %s doesn't exist: %+v", devNode, err)
}
devNodes = append(devNodes, devNode)
}
sort.Strings(devNodes)
if _, ok := result[fpgaID]; !ok {
result[fpgaID] = make(map[string]deviceplugin.DeviceInfo)
}
result[fpgaID][fname] = deviceplugin.DeviceInfo{
State: pluginapi.Healthy,
Nodes: devNodes,
}
}
}
}
return result, nil
}
func (dm *deviceManager) GetDevicePluginOptions(ctx context.Context, empty *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
fmt.Println("GetDevicePluginOptions: return empty options")
return new(pluginapi.DevicePluginOptions), nil
@ -176,37 +63,21 @@ func (dm *deviceManager) GetDevicePluginOptions(ctx context.Context, empty *plug
// ListAndWatch returns a list of devices
// Whenever a device state change or a device disappears, ListAndWatch returns the new list
func (dm *deviceManager) ListAndWatch(empty *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) error {
sysfsDir := path.Join(dm.root, sysfsDirectory)
devfsDir := path.Join(dm.root, devfsDirectory)
for {
devs, err := discoverFPGAs(sysfsDir, devfsDir, dm.mode)
if err != nil {
dm.srv.Stop()
return fmt.Errorf("Device discovery failed: %+v", err)
}
devinfos, ok := devs[dm.fpgaID]
if !ok {
dm.srv.Stop()
return fmt.Errorf("AFU id %s disappeared", dm.fpgaID)
}
if !reflect.DeepEqual(dm.devices, devinfos) {
dm.devices = devinfos
glog.V(2).Info("Started ListAndWatch for ", dm.fpgaID)
for dm.devices = range dm.ch {
resp := new(pluginapi.ListAndWatchResponse)
var keys []string
for key := range dm.devices {
keys = append(keys, key)
}
sort.Strings(keys)
for _, id := range keys {
resp.Devices = append(resp.Devices, &pluginapi.Device{id, dm.devices[id].State})
for id, device := range dm.devices {
resp.Devices = append(resp.Devices, &pluginapi.Device{id, device.State})
}
glog.V(2).Info("Sending to kubelet ", resp.Devices)
if err := stream.Send(resp); err != nil {
dm.srv.Stop()
return fmt.Errorf("device-plugin: cannot update device list: %v", err)
}
}
time.Sleep(5 * time.Second)
}
return nil
}
func (dm *deviceManager) Allocate(ctx context.Context, rqt *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
@ -218,42 +89,58 @@ func (dm *deviceManager) PreStartContainer(ctx context.Context, rqt *pluginapi.P
return new(pluginapi.PreStartContainerResponse), nil
}
func main() {
var modeStr string
flag.StringVar(&modeStr, "mode", string(afMode), fmt.Sprintf("device plugin mode: '%s' (default) or '%s'", afMode, regionMode))
flag.Parse()
mode := pluginMode(modeStr)
if !isValidPluginMode(mode) {
glog.Error("Wrong mode: ", modeStr)
os.Exit(1)
}
fmt.Println("FPGA device plugin started in", mode, "mode")
devs, err := discoverFPGAs(sysfsDirectory, devfsDirectory, mode)
if err != nil {
glog.Fatalf("Device discovery failed: %+v", err)
}
if len(devs) == 0 {
glog.Error("No devices found. Waiting indefinitely.")
select {}
}
ch := make(chan error)
for fpgaID := range devs {
resourceName := resourceNamePrefix + "-" + fpgaID
pPrefix := pluginEndpointPrefix + "-" + fpgaID
dm := newDeviceManager(resourceName, fpgaID, "/", mode)
go func() {
ch <- dm.srv.Serve(dm, resourceName, pPrefix)
}()
}
err = <-ch
func startDeviceManager(dm *deviceManager, pluginPrefix string) {
err := dm.srv.Serve(dm, dm.name, pluginPrefix)
if err != nil {
glog.Fatal(err)
}
}
func handleUpdate(dms map[string]*deviceManager, updateInfo devicecache.UpdateInfo, start func(*deviceManager, string)) {
glog.V(2).Info("Recieved dev updates: ", updateInfo)
for fpgaID, devices := range updateInfo.Added {
devCh := make(chan map[string]deviceplugin.DeviceInfo, 1)
resourceName := resourceNamePrefix + "-" + fpgaID
pPrefix := pluginEndpointPrefix + "-" + fpgaID
dms[fpgaID] = newDeviceManager(resourceName, fpgaID, devCh)
go start(dms[fpgaID], pPrefix)
devCh <- devices
}
for fpgaID, devices := range updateInfo.Updated {
dms[fpgaID].ch <- devices
}
for fpgaID := range updateInfo.Removed {
dms[fpgaID].srv.Stop()
close(dms[fpgaID].ch)
delete(dms, fpgaID)
}
}
func main() {
var mode string
flag.StringVar(&mode, "mode", string(devicecache.AfMode), fmt.Sprintf("device plugin mode: '%s' (default) or '%s'", devicecache.AfMode, devicecache.RegionMode))
flag.Parse()
updatesCh := make(chan devicecache.UpdateInfo)
cache, err := devicecache.NewCache(sysfsDirectory, devfsDirectory, mode, updatesCh)
if err != nil {
glog.Error(err)
os.Exit(1)
}
glog.Info("FPGA device plugin started in ", mode, " mode")
go func() {
err := cache.Run()
if err != nil {
glog.Fatal(err)
}
}()
dms := make(map[string]*deviceManager)
for updateInfo := range updatesCh {
handleUpdate(dms, updateInfo, startDeviceManager)
}
}

View File

@ -16,241 +16,30 @@ package main
import (
"fmt"
"io/ioutil"
"os"
"path"
"reflect"
"testing"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
"github.com/intel/intel-device-plugins-for-kubernetes/cmd/fpga_plugin/devicecache"
"github.com/intel/intel-device-plugins-for-kubernetes/internal/deviceplugin"
)
func createTestDirs(devfs, sysfs string, devfsDirs, sysfsDirs []string, sysfsFiles map[string][]byte) error {
var err error
for _, devfsdir := range devfsDirs {
err = os.MkdirAll(path.Join(devfs, devfsdir), 0755)
if err != nil {
return fmt.Errorf("Failed to create fake device directory: %+v", err)
}
}
for _, sysfsdir := range sysfsDirs {
err = os.MkdirAll(path.Join(sysfs, sysfsdir), 0755)
if err != nil {
return fmt.Errorf("Failed to create fake device directory: %+v", err)
}
}
for filename, body := range sysfsFiles {
err = ioutil.WriteFile(path.Join(sysfs, filename), body, 0644)
if err != nil {
return fmt.Errorf("Failed to create fake vendor file: %+v", err)
}
}
return nil
}
func TestDiscoverFPGAs(t *testing.T) {
tmpdir := fmt.Sprintf("/tmp/fpgaplugin-TestDiscoverFPGAs-%d", time.Now().Unix())
sysfs := path.Join(tmpdir, "sys", "class", "fpga")
devfs := path.Join(tmpdir, "dev")
tcases := []struct {
devfsdirs []string
sysfsdirs []string
sysfsfiles map[string][]byte
expectedResult map[string]map[string]deviceplugin.DeviceInfo
expectedErr bool
mode pluginMode
}{
{
expectedResult: nil,
expectedErr: true,
mode: afMode,
},
{
sysfsdirs: []string{"intel-fpga-dev.0"},
expectedResult: nil,
expectedErr: true,
mode: afMode,
},
{
sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-port.0"},
expectedResult: nil,
expectedErr: true,
mode: afMode,
},
{
sysfsdirs: []string{
"intel-fpga-dev.0/intel-fpga-port.0",
},
sysfsfiles: map[string][]byte{
"intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
},
expectedResult: nil,
expectedErr: true,
mode: afMode,
},
{
sysfsdirs: []string{
"intel-fpga-dev.0/intel-fpga-port.0",
"intel-fpga-dev.0/intel-fpga-fme.0",
"intel-fpga-dev.1/intel-fpga-port.1",
"intel-fpga-dev.1/intel-fpga-fme.1",
"intel-fpga-dev.2/intel-fpga-port.2",
"intel-fpga-dev.2/intel-fpga-fme.2",
},
sysfsfiles: map[string][]byte{
"intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
"intel-fpga-dev.1/intel-fpga-port.1/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
"intel-fpga-dev.2/intel-fpga-port.2/afu_id": []byte("47595d0fae972fbed0c51b4a41c7a349\n"),
},
devfsdirs: []string{
"intel-fpga-port.0", "intel-fpga-fme.0",
"intel-fpga-port.1", "intel-fpga-fme.1",
"intel-fpga-port.2", "intel-fpga-fme.2",
},
expectedResult: map[string]map[string]deviceplugin.DeviceInfo{
fmt.Sprintf("%s-d8424dc4a4a3c413f89e433683f9040b", afMode): {
"intel-fpga-dev.0": {
State: "Healthy",
Nodes: []string{
path.Join(tmpdir, "/dev/intel-fpga-port.0"),
},
},
"intel-fpga-dev.1": {
State: "Healthy",
Nodes: []string{
path.Join(tmpdir, "/dev/intel-fpga-port.1"),
},
},
},
fmt.Sprintf("%s-47595d0fae972fbed0c51b4a41c7a349", afMode): {
"intel-fpga-dev.2": {
State: "Healthy",
Nodes: []string{
path.Join(tmpdir, "/dev/intel-fpga-port.2"),
},
},
},
},
expectedErr: false,
mode: afMode,
},
{
sysfsdirs: []string{
"intel-fpga-dev.0/intel-fpga-fme.0/pr",
"intel-fpga-dev.0/intel-fpga-fme.1/pr",
},
sysfsfiles: map[string][]byte{
"intel-fpga-dev.0/intel-fpga-fme.0/pr/interface_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
"intel-fpga-dev.0/intel-fpga-fme.1/pr/interface_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
},
expectedResult: nil,
expectedErr: true,
mode: regionMode,
},
{
sysfsdirs: []string{
"intel-fpga-dev.0/intel-fpga-port.0",
"intel-fpga-dev.1/intel-fpga-port.1",
},
expectedResult: nil,
expectedErr: true,
mode: regionMode,
},
{
sysfsdirs: []string{
"intel-fpga-dev.0/intel-fpga-port.0",
"intel-fpga-dev.0/intel-fpga-fme.0/pr",
"intel-fpga-dev.1/intel-fpga-port.1",
"intel-fpga-dev.1/intel-fpga-fme.1/pr",
"intel-fpga-dev.2/intel-fpga-port.2",
"intel-fpga-dev.2/intel-fpga-fme.2/pr",
},
sysfsfiles: map[string][]byte{
"intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
"intel-fpga-dev.1/intel-fpga-port.1/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
"intel-fpga-dev.2/intel-fpga-port.2/afu_id": []byte("47595d0fae972fbed0c51b4a41c7a349\n"),
"intel-fpga-dev.0/intel-fpga-fme.0/pr/interface_id": []byte("ce48969398f05f33946d560708be108a\n"),
"intel-fpga-dev.1/intel-fpga-fme.1/pr/interface_id": []byte("ce48969398f05f33946d560708be108a\n"),
"intel-fpga-dev.2/intel-fpga-fme.2/pr/interface_id": []byte("fd967345645f05f338462a0748be0091\n"),
},
devfsdirs: []string{
"intel-fpga-port.0", "intel-fpga-fme.0",
"intel-fpga-port.1", "intel-fpga-fme.1",
"intel-fpga-port.2", "intel-fpga-fme.2",
},
expectedResult: map[string]map[string]deviceplugin.DeviceInfo{
fmt.Sprintf("%s-ce48969398f05f33946d560708be108a", regionMode): {
"intel-fpga-dev.0": {
State: "Healthy",
Nodes: []string{
path.Join(tmpdir, "/dev/intel-fpga-fme.0"),
path.Join(tmpdir, "/dev/intel-fpga-port.0"),
},
},
"intel-fpga-dev.1": {
State: "Healthy",
Nodes: []string{
path.Join(tmpdir, "/dev/intel-fpga-fme.1"),
path.Join(tmpdir, "/dev/intel-fpga-port.1"),
},
},
},
fmt.Sprintf("%s-fd967345645f05f338462a0748be0091", regionMode): {
"intel-fpga-dev.2": {
State: "Healthy",
Nodes: []string{
path.Join(tmpdir, "/dev/intel-fpga-fme.2"),
path.Join(tmpdir, "/dev/intel-fpga-port.2"),
},
},
},
},
expectedErr: false,
mode: regionMode,
},
}
for _, tcase := range tcases {
err := createTestDirs(devfs, sysfs, tcase.devfsdirs, tcase.sysfsdirs, tcase.sysfsfiles)
if err != nil {
t.Error(err)
}
result, err := discoverFPGAs(sysfs, devfs, tcase.mode)
if tcase.expectedErr && err == nil {
t.Error("Expected error hasn't been triggered")
}
if tcase.expectedResult != nil && !reflect.DeepEqual(result, tcase.expectedResult) {
t.Logf("Expected result: %+v", tcase.expectedResult)
t.Logf("Actual result: %+v", result)
t.Error("Got unexpeced result")
}
err = os.RemoveAll(tmpdir)
if err != nil {
t.Fatalf("Failed to remove fake device directory: %+v", err)
}
}
}
// Minimal implementation of pluginapi.DevicePlugin_ListAndWatchServer
type listAndWatchServerStub struct {
testDM *deviceManager
t *testing.T
generateErr bool
cdata chan []*pluginapi.Device
cerr chan error
}
func (s *listAndWatchServerStub) Send(resp *pluginapi.ListAndWatchResponse) error {
if s.generateErr {
fmt.Println("listAndWatchServerStub::Send returns error")
return fmt.Errorf("Fake error")
}
fmt.Println("listAndWatchServerStub::Send", resp.Devices)
s.cdata <- resp.Devices
return nil
@ -279,114 +68,78 @@ func (s *listAndWatchServerStub) SetHeader(m metadata.MD) error {
func (s *listAndWatchServerStub) SetTrailer(m metadata.MD) {
}
func TestListAndWatch(t *testing.T) {
afuID := "d8424dc4a4a3c413f89e433683f9040b"
tmpdir := fmt.Sprintf("/tmp/fpgaplugin-TestListAndWatch-%d", time.Now().Unix())
sysfs := path.Join(tmpdir, "sys", "class", "fpga")
devfs := path.Join(tmpdir, "dev")
func TestGetDevicePluginOptions(t *testing.T) {
dm := &deviceManager{}
dm.GetDevicePluginOptions(nil, nil)
}
func TestPreStartContainer(t *testing.T) {
dm := &deviceManager{}
dm.PreStartContainer(nil, nil)
}
func TestListAndWatch(t *testing.T) {
tcases := []struct {
devfsdirs []string
sysfsdirs []string
sysfsfiles map[string][]byte
expectedResult []*pluginapi.Device
name string
updates []map[string]deviceplugin.DeviceInfo
expectedErr bool
}{
{
expectedResult: nil,
expectedErr: true,
name: "No updates and close",
},
{
sysfsdirs: []string{
"intel-fpga-dev.0/intel-fpga-port.0",
name: "Send 1 update",
updates: []map[string]deviceplugin.DeviceInfo{
{
"fake_id": {
State: pluginapi.Healthy,
Nodes: []string{"/dev/intel-fpga-port.0"},
},
sysfsfiles: map[string][]byte{
"intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\n"),
},
devfsdirs: []string{
"intel-fpga-port.0", "intel-fpga-fme.0",
},
expectedResult: nil,
expectedErr: true,
},
{
sysfsdirs: []string{
"intel-fpga-dev.0/intel-fpga-port.0",
"intel-fpga-dev.0/intel-fpga-fme.0",
"intel-fpga-dev.1/intel-fpga-port.1",
"intel-fpga-dev.1/intel-fpga-fme.1",
"intel-fpga-dev.2/intel-fpga-port.2",
"intel-fpga-dev.2/intel-fpga-fme.2",
name: "Send 1 update, but expect streaming error",
updates: []map[string]deviceplugin.DeviceInfo{
{
"fake_id": {
State: pluginapi.Healthy,
Nodes: []string{"/dev/intel-fpga-port.0"},
},
sysfsfiles: map[string][]byte{
"intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte(afuID),
"intel-fpga-dev.1/intel-fpga-port.1/afu_id": []byte(afuID),
"intel-fpga-dev.2/intel-fpga-port.2/afu_id": []byte("47595d0fae972fbed0c51b4a41c7a349\n"),
},
devfsdirs: []string{
"intel-fpga-port.0", "intel-fpga-fme.0",
"intel-fpga-port.1", "intel-fpga-fme.1",
"intel-fpga-port.2", "intel-fpga-fme.2",
},
expectedResult: []*pluginapi.Device{
{"intel-fpga-dev.0", "Healthy"},
{"intel-fpga-dev.1", "Healthy"},
},
expectedErr: false,
expectedErr: true,
},
}
resourceName := fmt.Sprintf("%s%s-%s", resourceNamePrefix, afMode, afuID)
testDM := newDeviceManager(resourceName, fmt.Sprintf("%s-%s", afMode, afuID), tmpdir, afMode)
if testDM == nil {
t.Fatal("Failed to create a deviceManager")
}
for _, tt := range tcases {
devCh := make(chan map[string]deviceplugin.DeviceInfo, len(tt.updates))
testDM := newDeviceManager("intel.com/fpgatest-fpgaID", "fpgaID", devCh)
server := &listAndWatchServerStub{
testDM: testDM,
t: t,
cdata: make(chan []*pluginapi.Device),
cerr: make(chan error),
generateErr: tt.expectedErr,
cdata: make(chan []*pluginapi.Device, len(tt.updates)),
}
for _, tcase := range tcases {
err := createTestDirs(devfs, sysfs, tcase.devfsdirs, tcase.sysfsdirs, tcase.sysfsfiles)
if err != nil {
t.Error(err)
// push device infos to DM's channel
for _, update := range tt.updates {
devCh <- update
}
close(devCh)
go func() {
err = testDM.ListAndWatch(&pluginapi.Empty{}, server)
if err != nil {
server.cerr <- err
err := testDM.ListAndWatch(&pluginapi.Empty{}, server)
if err != nil && !tt.expectedErr {
t.Errorf("Test case '%s': got unexpected error %v", tt.name, err)
}
}()
select {
case result := <-server.cdata:
if tcase.expectedErr {
t.Error("Expected error hasn't been triggered")
} else if tcase.expectedResult != nil && !reflect.DeepEqual(result, tcase.expectedResult) {
t.Logf("Expected result: %+v", tcase.expectedResult)
t.Logf("Actual result: %+v", result)
t.Error("Got unexpeced result")
}
testDM.srv.Stop()
case err = <-server.cerr:
if !tcase.expectedErr {
t.Errorf("Unexpected error has been triggered: %+v", err)
}
}
err = os.RemoveAll(tmpdir)
if err != nil {
t.Fatalf("Failed to remove fake device directory: %+v", err)
if err == nil && tt.expectedErr {
t.Errorf("Test case '%s': expected an error, but got nothing", tt.name)
}
}
}
func TestAllocate(t *testing.T) {
testDM := newDeviceManager("", "", "", afMode)
testDM := newDeviceManager("", "", nil)
if testDM == nil {
t.Fatal("Failed to create a deviceManager")
}
@ -399,10 +152,13 @@ func TestAllocate(t *testing.T) {
},
}
testDM.devices["dev1"] = deviceplugin.DeviceInfo{pluginapi.Healthy, []string{"/dev/dev1"}}
testDM.devices["dev1"] = deviceplugin.DeviceInfo{
State: pluginapi.Healthy,
Nodes: []string{"/dev/dev1"},
}
resp, err := testDM.Allocate(nil, rqt)
if err != nil {
t.Fatalf("Failed to allocate healthy device: %+v", err)
t.Fatalf("Failed to allocate healthy device: %v", err)
}
if len(resp.ContainerResponses[0].Devices) != 1 {
@ -410,28 +166,82 @@ func TestAllocate(t *testing.T) {
}
}
func TestIsValidPluginMode(t *testing.T) {
func startDeviceManagerStub(dm *deviceManager, pluginPrefix string) {
}
func TestHandleUpdate(t *testing.T) {
tcases := []struct {
input pluginMode
output bool
name string
dms map[string]*deviceManager
updateInfo devicecache.UpdateInfo
expectedDMs int
}{
{
input: afMode,
output: true,
name: "Empty update",
updateInfo: devicecache.UpdateInfo{},
expectedDMs: 0,
},
{
input: regionMode,
output: true,
name: "Add device manager",
updateInfo: devicecache.UpdateInfo{
Added: map[string]map[string]deviceplugin.DeviceInfo{
"ce48969398f05f33946d560708be108a": {
"intel-fpga-fme.0": {
State: pluginapi.Healthy,
Nodes: []string{"/dev/intel-fpga-port.0", "/dev/intel-fpga-fme.0"},
},
"intel-fpga-fme.1": {
State: pluginapi.Healthy,
Nodes: []string{"/dev/intel-fpga-port.1", "/dev/intel-fpga-fme.1"},
},
},
},
},
expectedDMs: 1,
},
{
input: pluginMode("unparsable"),
output: false,
name: "Update existing device manager",
dms: map[string]*deviceManager{
"ce48969398f05f33946d560708be108a": &deviceManager{
ch: make(chan map[string]deviceplugin.DeviceInfo, 1),
},
},
updateInfo: devicecache.UpdateInfo{
Updated: map[string]map[string]deviceplugin.DeviceInfo{
"ce48969398f05f33946d560708be108a": {
"intel-fpga-fme.1": {
State: pluginapi.Healthy,
Nodes: []string{"/dev/intel-fpga-port.1", "/dev/intel-fpga-fme.1"},
},
},
},
},
expectedDMs: 1,
},
{
name: "Remove device manager",
dms: map[string]*deviceManager{
"ce48969398f05f33946d560708be108a": &deviceManager{
ch: make(chan map[string]deviceplugin.DeviceInfo, 1),
},
},
updateInfo: devicecache.UpdateInfo{
Removed: map[string]map[string]deviceplugin.DeviceInfo{
"ce48969398f05f33946d560708be108a": {},
},
},
expectedDMs: 0,
},
}
for _, tcase := range tcases {
if isValidPluginMode(tcase.input) != tcase.output {
t.Error("Wrong output", tcase.output, "for the given input", tcase.input)
for _, tt := range tcases {
if tt.dms == nil {
tt.dms = make(map[string]*deviceManager)
}
handleUpdate(tt.dms, tt.updateInfo, startDeviceManagerStub)
if len(tt.dms) != tt.expectedDMs {
t.Errorf("Test case '%s': expected %d runnig device managers, but got %d",
tt.name, tt.expectedDMs, len(tt.dms))
}
}
}