mirror of
https://github.com/intel/intel-device-plugins-for-kubernetes.git
synced 2025-06-03 03:59:37 +00:00
refactor device plugins to increase code reuse
Every device plugin is supposed to implement PluginInterfaceServer interface to be exposed as a gRPC service. But this functionality is common for all our device plugins and can be hidden in a Manager which manages all gRPC servers dynamically. The only mandatory functionality that needs to be provided by a device plugin and which differentiate one plugin from another is the code scanning the host for devices present on it. Refactor the internal deviceplugin package to accept only one mandatory method implementation from device plugins - Scan(). In addition to that a device plugin can optionally implement a PostAllocate() method which mutates responses returned by PluginInterfaceServer.Allocate() method. Also to narrow the gap between these device plugins and the kubevirt's collection the naming scheme for resources has been changed. Now device plugins provide a namespace for the device types they operate with. E.g. for resources in format "color.example.com/<color>" the namespace would be "color.example.com". So, the resource name "intel.com/fpga-region-fffffff" becomes "fpga.intel.com/region-fffffff".
This commit is contained in:
parent
5bb12f515a
commit
bbee3fde77
@ -1,336 +0,0 @@
|
||||
// Copyright 2018 Intel Corporation. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package devicecache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
|
||||
|
||||
"github.com/intel/intel-device-plugins-for-kubernetes/internal/deviceplugin"
|
||||
)
|
||||
|
||||
// Device Cache's mode of operation
|
||||
const (
|
||||
AfMode = "af"
|
||||
RegionMode = "region"
|
||||
RegionDevelMode = "regiondevel"
|
||||
)
|
||||
|
||||
const (
|
||||
sysfsDirectory = "/sys/class/fpga"
|
||||
devfsDirectory = "/dev"
|
||||
deviceRE = `^intel-fpga-dev.[0-9]+$`
|
||||
portRE = `^intel-fpga-port.[0-9]+$`
|
||||
fmeRE = `^intel-fpga-fme.[0-9]+$`
|
||||
)
|
||||
|
||||
// UpdateInfo contains info added, updated and deleted FPGA devices
|
||||
type UpdateInfo struct {
|
||||
Added map[string]map[string]deviceplugin.DeviceInfo
|
||||
Updated map[string]map[string]deviceplugin.DeviceInfo
|
||||
Removed map[string]map[string]deviceplugin.DeviceInfo
|
||||
}
|
||||
|
||||
type getDevMapFunc func(devices []device) map[string]map[string]deviceplugin.DeviceInfo
|
||||
|
||||
// getRegionMap returns mapping of region interface IDs to AF ports and FME devices
|
||||
func getRegionDevelMap(devices []device) map[string]map[string]deviceplugin.DeviceInfo {
|
||||
regionMap := make(map[string]map[string]deviceplugin.DeviceInfo)
|
||||
|
||||
for _, dev := range devices {
|
||||
for _, region := range dev.regions {
|
||||
fpgaID := fmt.Sprintf("%s-%s", RegionMode, region.interfaceID)
|
||||
if _, present := regionMap[fpgaID]; !present {
|
||||
regionMap[fpgaID] = make(map[string]deviceplugin.DeviceInfo)
|
||||
}
|
||||
devNodes := make([]string, len(region.afus)+1)
|
||||
for num, afu := range region.afus {
|
||||
devNodes[num] = afu.devNode
|
||||
}
|
||||
devNodes[len(region.afus)] = region.devNode
|
||||
regionMap[fpgaID][region.id] = deviceplugin.DeviceInfo{
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: devNodes,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return regionMap
|
||||
}
|
||||
|
||||
// getRegionMap returns mapping of region interface IDs to AF ports only
|
||||
func getRegionMap(devices []device) map[string]map[string]deviceplugin.DeviceInfo {
|
||||
regionMap := make(map[string]map[string]deviceplugin.DeviceInfo)
|
||||
|
||||
for _, dev := range devices {
|
||||
for _, region := range dev.regions {
|
||||
fpgaID := fmt.Sprintf("%s-%s", RegionMode, region.interfaceID)
|
||||
if _, present := regionMap[fpgaID]; !present {
|
||||
regionMap[fpgaID] = make(map[string]deviceplugin.DeviceInfo)
|
||||
}
|
||||
devNodes := make([]string, len(region.afus))
|
||||
for num, afu := range region.afus {
|
||||
devNodes[num] = afu.devNode
|
||||
}
|
||||
regionMap[fpgaID][region.id] = deviceplugin.DeviceInfo{
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: devNodes,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return regionMap
|
||||
}
|
||||
|
||||
// getAfuMap returns mapping of AFU IDs to AF ports
|
||||
func getAfuMap(devices []device) map[string]map[string]deviceplugin.DeviceInfo {
|
||||
afuMap := make(map[string]map[string]deviceplugin.DeviceInfo)
|
||||
|
||||
for _, dev := range devices {
|
||||
for _, region := range dev.regions {
|
||||
for _, afu := range region.afus {
|
||||
fpgaID := fmt.Sprintf("%s-%s", AfMode, afu.afuID)
|
||||
if _, present := afuMap[fpgaID]; !present {
|
||||
afuMap[fpgaID] = make(map[string]deviceplugin.DeviceInfo)
|
||||
}
|
||||
afuMap[fpgaID][afu.id] = deviceplugin.DeviceInfo{
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{afu.devNode},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return afuMap
|
||||
}
|
||||
|
||||
type afu struct {
|
||||
id string
|
||||
afuID string
|
||||
devNode string
|
||||
}
|
||||
|
||||
type region struct {
|
||||
id string
|
||||
interfaceID string
|
||||
devNode string
|
||||
afus []afu
|
||||
}
|
||||
|
||||
type device struct {
|
||||
name string
|
||||
regions []region
|
||||
}
|
||||
|
||||
// Cache represents FPGA devices found on the host
|
||||
type Cache struct {
|
||||
sysfsDir string
|
||||
devfsDir string
|
||||
|
||||
deviceReg *regexp.Regexp
|
||||
portReg *regexp.Regexp
|
||||
fmeReg *regexp.Regexp
|
||||
|
||||
devices []device
|
||||
ch chan<- UpdateInfo
|
||||
getDevMap getDevMapFunc
|
||||
ignoreAfuIDs bool
|
||||
}
|
||||
|
||||
// NewCache returns new instance of Cache
|
||||
func NewCache(sysfsDir string, devfsDir string, mode string, ch chan<- UpdateInfo) (*Cache, error) {
|
||||
var getDevMap getDevMapFunc
|
||||
|
||||
ignoreAfuIDs := false
|
||||
switch mode {
|
||||
case AfMode:
|
||||
getDevMap = getAfuMap
|
||||
case RegionMode:
|
||||
getDevMap = getRegionMap
|
||||
ignoreAfuIDs = true
|
||||
case RegionDevelMode:
|
||||
getDevMap = getRegionDevelMap
|
||||
ignoreAfuIDs = true
|
||||
default:
|
||||
return nil, fmt.Errorf("Wrong mode: '%s'", mode)
|
||||
}
|
||||
|
||||
return &Cache{
|
||||
sysfsDir: sysfsDir,
|
||||
devfsDir: devfsDir,
|
||||
deviceReg: regexp.MustCompile(deviceRE),
|
||||
portReg: regexp.MustCompile(portRE),
|
||||
fmeReg: regexp.MustCompile(fmeRE),
|
||||
ch: ch,
|
||||
getDevMap: getDevMap,
|
||||
ignoreAfuIDs: ignoreAfuIDs,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *Cache) getDevNode(devName string) (string, error) {
|
||||
devNode := path.Join(c.devfsDir, devName)
|
||||
if _, err := os.Stat(devNode); err != nil {
|
||||
return "", fmt.Errorf("Device %s doesn't exist: %v", devNode, err)
|
||||
}
|
||||
|
||||
return devNode, nil
|
||||
}
|
||||
|
||||
func (c *Cache) detectUpdates(devices []device) {
|
||||
added := make(map[string]map[string]deviceplugin.DeviceInfo)
|
||||
updated := make(map[string]map[string]deviceplugin.DeviceInfo)
|
||||
|
||||
oldDevMap := c.getDevMap(c.devices)
|
||||
|
||||
for fpgaID, new := range c.getDevMap(devices) {
|
||||
if old, ok := oldDevMap[fpgaID]; ok {
|
||||
if !reflect.DeepEqual(old, new) {
|
||||
updated[fpgaID] = new
|
||||
}
|
||||
delete(oldDevMap, fpgaID)
|
||||
} else {
|
||||
added[fpgaID] = new
|
||||
}
|
||||
}
|
||||
|
||||
if len(added) > 0 || len(updated) > 0 || len(oldDevMap) > 0 {
|
||||
c.ch <- UpdateInfo{
|
||||
Added: added,
|
||||
Updated: updated,
|
||||
Removed: oldDevMap,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Cache) scanFPGAs() error {
|
||||
var devices []device
|
||||
|
||||
glog.V(2).Info("Start new FPGA scan")
|
||||
|
||||
fpgaFiles, err := ioutil.ReadDir(c.sysfsDir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Can't read sysfs folder: %v", err)
|
||||
}
|
||||
|
||||
for _, fpgaFile := range fpgaFiles {
|
||||
fname := fpgaFile.Name()
|
||||
|
||||
if !c.deviceReg.MatchString(fname) {
|
||||
continue
|
||||
}
|
||||
|
||||
deviceFolder := path.Join(c.sysfsDir, fname)
|
||||
deviceFiles, err := ioutil.ReadDir(deviceFolder)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
regions, afus, err := c.getSysFsInfo(deviceFolder, deviceFiles, fname)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(regions) == 0 {
|
||||
return fmt.Errorf("No regions found for device %s", fname)
|
||||
}
|
||||
|
||||
// Currently only one region per device is supported.
|
||||
regions[0].afus = afus
|
||||
devices = append(devices, device{
|
||||
name: fname,
|
||||
regions: regions,
|
||||
})
|
||||
}
|
||||
|
||||
c.detectUpdates(devices)
|
||||
c.devices = devices
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Cache) getSysFsInfo(deviceFolder string, deviceFiles []os.FileInfo, fname string) ([]region, []afu, error) {
|
||||
var regions []region
|
||||
var afus []afu
|
||||
for _, deviceFile := range deviceFiles {
|
||||
name := deviceFile.Name()
|
||||
|
||||
if c.fmeReg.MatchString(name) {
|
||||
if len(regions) > 0 {
|
||||
return nil, nil, fmt.Errorf("Detected more than one FPGA region for device %s. Only one region per FPGA device is supported", fname)
|
||||
}
|
||||
interfaceIDFile := path.Join(deviceFolder, name, "pr", "interface_id")
|
||||
data, err := ioutil.ReadFile(interfaceIDFile)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
devNode, err := c.getDevNode(name)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
regions = append(regions, region{
|
||||
id: name,
|
||||
interfaceID: strings.TrimSpace(string(data)),
|
||||
devNode: devNode,
|
||||
})
|
||||
} else if c.portReg.MatchString(name) {
|
||||
var afuID string
|
||||
|
||||
if c.ignoreAfuIDs {
|
||||
afuID = "unused_afu_id"
|
||||
} else {
|
||||
afuFile := path.Join(deviceFolder, name, "afu_id")
|
||||
data, err := ioutil.ReadFile(afuFile)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
afuID = strings.TrimSpace(string(data))
|
||||
}
|
||||
devNode, err := c.getDevNode(name)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
afus = append(afus, afu{
|
||||
id: name,
|
||||
afuID: afuID,
|
||||
devNode: devNode,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return regions, afus, nil
|
||||
}
|
||||
|
||||
// Run starts scanning of FPGA devices on the host
|
||||
func (c *Cache) Run() error {
|
||||
for {
|
||||
err := c.scanFPGAs()
|
||||
if err != nil {
|
||||
glog.Error("Device scan failed: ", err)
|
||||
return fmt.Errorf("Device scan failed: %v", err)
|
||||
}
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
}
|
@ -1,526 +0,0 @@
|
||||
// Copyright 2018 Intel Corporation. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package devicecache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
|
||||
|
||||
"github.com/intel/intel-device-plugins-for-kubernetes/internal/deviceplugin"
|
||||
)
|
||||
|
||||
func createTestDirs(devfs, sysfs string, devfsDirs, sysfsDirs []string, sysfsFiles map[string][]byte) error {
|
||||
var err error
|
||||
|
||||
for _, devfsdir := range devfsDirs {
|
||||
err = os.MkdirAll(path.Join(devfs, devfsdir), 0755)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to create fake device directory: %v", err)
|
||||
}
|
||||
}
|
||||
for _, sysfsdir := range sysfsDirs {
|
||||
err = os.MkdirAll(path.Join(sysfs, sysfsdir), 0755)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to create fake device directory: %v", err)
|
||||
}
|
||||
}
|
||||
for filename, body := range sysfsFiles {
|
||||
err = ioutil.WriteFile(path.Join(sysfs, filename), body, 0644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to create fake vendor file: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestNewCache(t *testing.T) {
|
||||
tcases := []struct {
|
||||
mode string
|
||||
expectedErr bool
|
||||
}{
|
||||
{
|
||||
mode: AfMode,
|
||||
},
|
||||
{
|
||||
mode: RegionMode,
|
||||
},
|
||||
{
|
||||
mode: RegionDevelMode,
|
||||
},
|
||||
{
|
||||
mode: "unparsable",
|
||||
expectedErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tcase := range tcases {
|
||||
_, err := NewCache("", "", tcase.mode, nil)
|
||||
if tcase.expectedErr && err == nil {
|
||||
t.Error("No error generated when creating Cache with invalid parameters")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// getDevices returns static list of device structs for testing purposes
|
||||
func getDevices() []device {
|
||||
return []device{
|
||||
{
|
||||
name: "intel-fpga-dev.0",
|
||||
regions: []region{
|
||||
{
|
||||
id: "intel-fpga-fme.0",
|
||||
interfaceID: "ce48969398f05f33946d560708be108a",
|
||||
devNode: "/dev/intel-fpga-fme.0",
|
||||
afus: []afu{
|
||||
{
|
||||
id: "intel-fpga-port.0",
|
||||
afuID: "d8424dc4a4a3c413f89e433683f9040b",
|
||||
devNode: "/dev/intel-fpga-port.0",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "intel-fpga-dev.1",
|
||||
regions: []region{
|
||||
{
|
||||
id: "intel-fpga-fme.1",
|
||||
interfaceID: "ce48969398f05f33946d560708be108a",
|
||||
devNode: "/dev/intel-fpga-fme.1",
|
||||
afus: []afu{
|
||||
{
|
||||
id: "intel-fpga-port.1",
|
||||
afuID: "d8424dc4a4a3c413f89e433683f9040b",
|
||||
devNode: "/dev/intel-fpga-port.1",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetRegionDevelMap(t *testing.T) {
|
||||
expected := map[string]map[string]deviceplugin.DeviceInfo{
|
||||
RegionMode + "-ce48969398f05f33946d560708be108a": {
|
||||
"intel-fpga-fme.0": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.0", "/dev/intel-fpga-fme.0"},
|
||||
},
|
||||
"intel-fpga-fme.1": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.1", "/dev/intel-fpga-fme.1"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
result := getRegionDevelMap(getDevices())
|
||||
if !reflect.DeepEqual(result, expected) {
|
||||
t.Error("Got unexpected result: ", result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetRegionMap(t *testing.T) {
|
||||
expected := map[string]map[string]deviceplugin.DeviceInfo{
|
||||
RegionMode + "-ce48969398f05f33946d560708be108a": {
|
||||
"intel-fpga-fme.0": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.0"},
|
||||
},
|
||||
"intel-fpga-fme.1": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.1"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
result := getRegionMap(getDevices())
|
||||
if !reflect.DeepEqual(result, expected) {
|
||||
t.Error("Got unexpected result: ", result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetAfuMap(t *testing.T) {
|
||||
expected := map[string]map[string]deviceplugin.DeviceInfo{
|
||||
AfMode + "-d8424dc4a4a3c413f89e433683f9040b": {
|
||||
"intel-fpga-port.0": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.0"},
|
||||
},
|
||||
"intel-fpga-port.1": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.1"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
result := getAfuMap(getDevices())
|
||||
if !reflect.DeepEqual(result, expected) {
|
||||
t.Error("Got unexpected result: ", result)
|
||||
}
|
||||
}
|
||||
|
||||
func getDevMapClosure(oldmap map[string]map[string]deviceplugin.DeviceInfo, newmap map[string]map[string]deviceplugin.DeviceInfo) getDevMapFunc {
|
||||
var callnum int
|
||||
|
||||
if oldmap == nil {
|
||||
oldmap = make(map[string]map[string]deviceplugin.DeviceInfo)
|
||||
}
|
||||
if newmap == nil {
|
||||
newmap = make(map[string]map[string]deviceplugin.DeviceInfo)
|
||||
}
|
||||
|
||||
return func(devices []device) map[string]map[string]deviceplugin.DeviceInfo {
|
||||
defer func() { callnum = callnum + 1 }()
|
||||
|
||||
if callnum%2 == 0 {
|
||||
return oldmap
|
||||
|
||||
}
|
||||
return newmap
|
||||
}
|
||||
}
|
||||
|
||||
func TestDetectUpdates(t *testing.T) {
|
||||
tcases := []struct {
|
||||
name string
|
||||
expectedAdded int
|
||||
expectedUpdated int
|
||||
expectedRemoved int
|
||||
oldmap map[string]map[string]deviceplugin.DeviceInfo
|
||||
newmap map[string]map[string]deviceplugin.DeviceInfo
|
||||
}{
|
||||
{
|
||||
name: "No devices found",
|
||||
},
|
||||
{
|
||||
name: "Added 1 new device type",
|
||||
newmap: map[string]map[string]deviceplugin.DeviceInfo{
|
||||
"fpgaID": {
|
||||
"intel-fpga-port.0": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.0"},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedAdded: 1,
|
||||
},
|
||||
{
|
||||
name: "Updated 1 new device type",
|
||||
oldmap: map[string]map[string]deviceplugin.DeviceInfo{
|
||||
"fpgaID": {
|
||||
"intel-fpga-port.0": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.0"},
|
||||
},
|
||||
},
|
||||
},
|
||||
newmap: map[string]map[string]deviceplugin.DeviceInfo{
|
||||
"fpgaID": {
|
||||
"intel-fpga-port.1": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedUpdated: 1,
|
||||
},
|
||||
{
|
||||
name: "Removed 1 new device type",
|
||||
oldmap: map[string]map[string]deviceplugin.DeviceInfo{
|
||||
"fpgaID": {
|
||||
"intel-fpga-port.0": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.0"},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedRemoved: 1,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tcase := range tcases {
|
||||
ch := make(chan UpdateInfo, 1)
|
||||
cache, err := NewCache("", "", AfMode, ch)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cache.getDevMap = getDevMapClosure(tcase.oldmap, tcase.newmap)
|
||||
|
||||
cache.detectUpdates([]device{})
|
||||
|
||||
var update UpdateInfo
|
||||
select {
|
||||
case update = <-ch:
|
||||
default:
|
||||
}
|
||||
|
||||
if tcase.expectedAdded != len(update.Added) {
|
||||
t.Errorf("Test case '%s': expected %d added device types, but got %d", tcase.name, tcase.expectedAdded, len(update.Added))
|
||||
}
|
||||
if tcase.expectedUpdated != len(update.Updated) {
|
||||
t.Errorf("Test case '%s': expected %d updated device types, but got %d", tcase.name, tcase.expectedUpdated, len(update.Updated))
|
||||
}
|
||||
if tcase.expectedRemoved != len(update.Removed) {
|
||||
t.Errorf("Test case '%s': expected %d removed device types, but got %d", tcase.name, tcase.expectedUpdated, len(update.Updated))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestScanFPGAs(t *testing.T) {
|
||||
tmpdir := fmt.Sprintf("/tmp/fpgaplugin-TestDiscoverFPGAs-%d", time.Now().Unix())
|
||||
sysfs := path.Join(tmpdir, "sys", "class", "fpga")
|
||||
devfs := path.Join(tmpdir, "dev")
|
||||
tcases := []struct {
|
||||
name string
|
||||
devfsdirs []string
|
||||
sysfsdirs []string
|
||||
sysfsfiles map[string][]byte
|
||||
errorContains string
|
||||
expectedDevices []device
|
||||
mode string
|
||||
}{
|
||||
{
|
||||
name: "No sysfs folder given",
|
||||
mode: AfMode,
|
||||
errorContains: "Can't read sysfs folder",
|
||||
},
|
||||
{
|
||||
name: "FPGA device without FME and ports",
|
||||
mode: AfMode,
|
||||
sysfsdirs: []string{"intel-fpga-dev.0", "incorrect-file-name"},
|
||||
errorContains: "No regions found",
|
||||
},
|
||||
{
|
||||
name: "AFU without ID",
|
||||
mode: AfMode,
|
||||
sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-port.0"},
|
||||
errorContains: "afu_id: no such file or directory",
|
||||
},
|
||||
{
|
||||
name: "No device node for detected AFU",
|
||||
mode: AfMode,
|
||||
sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-port.0"},
|
||||
sysfsfiles: map[string][]byte{
|
||||
"intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
|
||||
},
|
||||
errorContains: "/dev/intel-fpga-port.0: no such file or directory",
|
||||
},
|
||||
{
|
||||
name: "AFU without corresponding FME",
|
||||
mode: AfMode,
|
||||
sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-port.0"},
|
||||
devfsdirs: []string{"intel-fpga-port.0"},
|
||||
sysfsfiles: map[string][]byte{
|
||||
"intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
|
||||
},
|
||||
errorContains: "No regions found",
|
||||
},
|
||||
{
|
||||
name: "More than one FME per FPGA device",
|
||||
mode: AfMode,
|
||||
sysfsdirs: []string{
|
||||
"intel-fpga-dev.0/intel-fpga-fme.0/pr",
|
||||
"intel-fpga-dev.0/intel-fpga-fme.1/pr",
|
||||
},
|
||||
sysfsfiles: map[string][]byte{
|
||||
"intel-fpga-dev.0/intel-fpga-fme.0/pr/interface_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
|
||||
"intel-fpga-dev.0/intel-fpga-fme.1/pr/interface_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
|
||||
},
|
||||
devfsdirs: []string{
|
||||
"intel-fpga-fme.0",
|
||||
"intel-fpga-fme.1",
|
||||
},
|
||||
errorContains: "Detected more than one FPGA region",
|
||||
},
|
||||
{
|
||||
name: "FME without interface ID",
|
||||
mode: AfMode,
|
||||
sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-fme.0"},
|
||||
errorContains: "interface_id: no such file or directory",
|
||||
},
|
||||
{
|
||||
name: "No device node for detected region",
|
||||
mode: AfMode,
|
||||
sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-fme.0/pr"},
|
||||
sysfsfiles: map[string][]byte{
|
||||
"intel-fpga-dev.0/intel-fpga-fme.0/pr/interface_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
|
||||
},
|
||||
errorContains: "/dev/intel-fpga-fme.0: no such file or directory",
|
||||
},
|
||||
{
|
||||
name: "No errors expected in af mode",
|
||||
mode: AfMode,
|
||||
sysfsdirs: []string{
|
||||
"intel-fpga-dev.0/intel-fpga-port.0",
|
||||
"intel-fpga-dev.0/intel-fpga-fme.0/pr",
|
||||
"intel-fpga-dev.1/intel-fpga-port.1",
|
||||
"intel-fpga-dev.1/intel-fpga-fme.1/pr",
|
||||
},
|
||||
sysfsfiles: map[string][]byte{
|
||||
"intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
|
||||
"intel-fpga-dev.1/intel-fpga-port.1/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
|
||||
"intel-fpga-dev.0/intel-fpga-fme.0/pr/interface_id": []byte("ce48969398f05f33946d560708be108a\n"),
|
||||
"intel-fpga-dev.1/intel-fpga-fme.1/pr/interface_id": []byte("ce48969398f05f33946d560708be108a\n"),
|
||||
},
|
||||
devfsdirs: []string{
|
||||
"intel-fpga-port.0", "intel-fpga-fme.0",
|
||||
"intel-fpga-port.1", "intel-fpga-fme.1",
|
||||
},
|
||||
expectedDevices: []device{
|
||||
{
|
||||
name: "intel-fpga-dev.0",
|
||||
regions: []region{
|
||||
{
|
||||
id: "intel-fpga-fme.0",
|
||||
interfaceID: "ce48969398f05f33946d560708be108a",
|
||||
devNode: path.Join(devfs, "intel-fpga-fme.0"),
|
||||
afus: []afu{
|
||||
{
|
||||
id: "intel-fpga-port.0",
|
||||
afuID: "d8424dc4a4a3c413f89e433683f9040b",
|
||||
devNode: path.Join(devfs, "intel-fpga-port.0"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "intel-fpga-dev.1",
|
||||
regions: []region{
|
||||
{
|
||||
id: "intel-fpga-fme.1",
|
||||
interfaceID: "ce48969398f05f33946d560708be108a",
|
||||
devNode: path.Join(devfs, "intel-fpga-fme.1"),
|
||||
afus: []afu{
|
||||
{
|
||||
id: "intel-fpga-port.1",
|
||||
afuID: "d8424dc4a4a3c413f89e433683f9040b",
|
||||
devNode: path.Join(devfs, "intel-fpga-port.1"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "No errors expected in region mode",
|
||||
mode: RegionMode,
|
||||
sysfsdirs: []string{
|
||||
"intel-fpga-dev.0/intel-fpga-port.0",
|
||||
"intel-fpga-dev.0/intel-fpga-fme.0/pr",
|
||||
"intel-fpga-dev.1/intel-fpga-port.1",
|
||||
"intel-fpga-dev.1/intel-fpga-fme.1/pr",
|
||||
},
|
||||
sysfsfiles: map[string][]byte{
|
||||
"intel-fpga-dev.0/intel-fpga-fme.0/pr/interface_id": []byte("ce48969398f05f33946d560708be108a\n"),
|
||||
"intel-fpga-dev.1/intel-fpga-fme.1/pr/interface_id": []byte("ce48969398f05f33946d560708be108a\n"),
|
||||
},
|
||||
devfsdirs: []string{
|
||||
"intel-fpga-port.0", "intel-fpga-fme.0",
|
||||
"intel-fpga-port.1", "intel-fpga-fme.1",
|
||||
},
|
||||
expectedDevices: []device{
|
||||
{
|
||||
name: "intel-fpga-dev.0",
|
||||
regions: []region{
|
||||
{
|
||||
id: "intel-fpga-fme.0",
|
||||
interfaceID: "ce48969398f05f33946d560708be108a",
|
||||
devNode: path.Join(devfs, "intel-fpga-fme.0"),
|
||||
afus: []afu{
|
||||
{
|
||||
id: "intel-fpga-port.0",
|
||||
afuID: "unused_afu_id",
|
||||
devNode: path.Join(devfs, "intel-fpga-port.0"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "intel-fpga-dev.1",
|
||||
regions: []region{
|
||||
{
|
||||
id: "intel-fpga-fme.1",
|
||||
interfaceID: "ce48969398f05f33946d560708be108a",
|
||||
devNode: path.Join(devfs, "intel-fpga-fme.1"),
|
||||
afus: []afu{
|
||||
{
|
||||
id: "intel-fpga-port.1",
|
||||
afuID: "unused_afu_id",
|
||||
devNode: path.Join(devfs, "intel-fpga-port.1"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tcase := range tcases {
|
||||
err := createTestDirs(devfs, sysfs, tcase.devfsdirs, tcase.sysfsdirs, tcase.sysfsfiles)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cache, err := NewCache(sysfs, devfs, tcase.mode, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cache.getDevMap = func(devices []device) map[string]map[string]deviceplugin.DeviceInfo {
|
||||
return make(map[string]map[string]deviceplugin.DeviceInfo)
|
||||
}
|
||||
|
||||
err = cache.scanFPGAs()
|
||||
if tcase.errorContains != "" {
|
||||
if err == nil || !strings.Contains(err.Error(), tcase.errorContains) {
|
||||
t.Errorf("Test case '%s': expected error '%s', but got '%v'", tcase.name, tcase.errorContains, err)
|
||||
}
|
||||
} else if err != nil {
|
||||
t.Errorf("Test case '%s': expected no error, but got '%v'", tcase.name, err)
|
||||
}
|
||||
if tcase.expectedDevices != nil && !reflect.DeepEqual(tcase.expectedDevices, cache.devices) {
|
||||
t.Errorf("Test case '%s': expected devices '%v', but got '%v'", tcase.name, tcase.expectedDevices, cache.devices)
|
||||
}
|
||||
|
||||
err = os.RemoveAll(tmpdir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRun(t *testing.T) {
|
||||
cache := Cache{
|
||||
sysfsDir: "/dev/null",
|
||||
}
|
||||
err := cache.Run()
|
||||
if err == nil {
|
||||
t.Error("Expected error, but got nil")
|
||||
}
|
||||
}
|
@ -17,10 +17,14 @@ package main
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
@ -29,8 +33,7 @@ import (
|
||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
|
||||
utilnode "k8s.io/kubernetes/pkg/util/node"
|
||||
|
||||
"github.com/intel/intel-device-plugins-for-kubernetes/cmd/fpga_plugin/devicecache"
|
||||
"github.com/intel/intel-device-plugins-for-kubernetes/internal/deviceplugin"
|
||||
dpapi "github.com/intel/intel-device-plugins-for-kubernetes/internal/deviceplugin"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -38,111 +41,276 @@ const (
|
||||
devfsDirectory = "/dev"
|
||||
|
||||
// Device plugin settings.
|
||||
pluginEndpointPrefix = "intel-fpga"
|
||||
resourceNamePrefix = "intel.com/fpga"
|
||||
annotationName = "com.intel.fpga.mode"
|
||||
namespace = "fpga.intel.com"
|
||||
annotationName = "com.intel.fpga.mode"
|
||||
|
||||
// Scanner's mode of operation
|
||||
afMode = "af"
|
||||
regionMode = "region"
|
||||
regionDevelMode = "regiondevel"
|
||||
|
||||
deviceRE = `^intel-fpga-dev.[0-9]+$`
|
||||
portRE = `^intel-fpga-port.[0-9]+$`
|
||||
fmeRE = `^intel-fpga-fme.[0-9]+$`
|
||||
)
|
||||
|
||||
// deviceManager manages Intel FPGA devices.
|
||||
type deviceManager struct {
|
||||
srv deviceplugin.Server
|
||||
fpgaID string
|
||||
name string
|
||||
mode string
|
||||
ch chan map[string]deviceplugin.DeviceInfo
|
||||
devices map[string]deviceplugin.DeviceInfo
|
||||
}
|
||||
type getDevTreeFunc func(devices []device) dpapi.DeviceTree
|
||||
|
||||
func newDeviceManager(resourceName string, fpgaID string, mode string, ch chan map[string]deviceplugin.DeviceInfo) *deviceManager {
|
||||
return &deviceManager{
|
||||
fpgaID: fpgaID,
|
||||
name: resourceName,
|
||||
mode: mode,
|
||||
ch: ch,
|
||||
devices: make(map[string]deviceplugin.DeviceInfo),
|
||||
}
|
||||
}
|
||||
// getRegionDevelTree returns mapping of region interface IDs to AF ports and FME devices
|
||||
func getRegionDevelTree(devices []device) dpapi.DeviceTree {
|
||||
regionTree := dpapi.NewDeviceTree()
|
||||
|
||||
func (dm *deviceManager) GetDevicePluginOptions(ctx context.Context, empty *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
|
||||
fmt.Println("GetDevicePluginOptions: return empty options")
|
||||
return new(pluginapi.DevicePluginOptions), nil
|
||||
}
|
||||
|
||||
func (dm *deviceManager) sendDevices(stream pluginapi.DevicePlugin_ListAndWatchServer) error {
|
||||
resp := new(pluginapi.ListAndWatchResponse)
|
||||
for id, device := range dm.devices {
|
||||
resp.Devices = append(resp.Devices, &pluginapi.Device{id, device.State})
|
||||
}
|
||||
glog.V(2).Info("Sending to kubelet ", resp.Devices)
|
||||
if err := stream.Send(resp); err != nil {
|
||||
dm.srv.Stop()
|
||||
return fmt.Errorf("device-plugin: cannot update device list: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListAndWatch returns a list of devices
|
||||
// Whenever a device state change or a device disappears, ListAndWatch returns the new list
|
||||
func (dm *deviceManager) ListAndWatch(empty *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) error {
|
||||
glog.V(2).Info("Started ListAndWatch for ", dm.fpgaID)
|
||||
|
||||
if err := dm.sendDevices(stream); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for dm.devices = range dm.ch {
|
||||
if err := dm.sendDevices(stream); err != nil {
|
||||
return err
|
||||
for _, dev := range devices {
|
||||
for _, region := range dev.regions {
|
||||
devType := fmt.Sprintf("%s-%s", regionMode, region.interfaceID)
|
||||
devNodes := make([]string, len(region.afus)+1)
|
||||
for num, afu := range region.afus {
|
||||
devNodes[num] = afu.devNode
|
||||
}
|
||||
devNodes[len(region.afus)] = region.devNode
|
||||
regionTree.AddDevice(devType, region.id, dpapi.DeviceInfo{
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: devNodes,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return regionTree
|
||||
}
|
||||
|
||||
func (dm *deviceManager) Allocate(ctx context.Context, rqt *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
|
||||
response, err := deviceplugin.MakeAllocateResponse(rqt, dm.devices)
|
||||
// Set container annotations when programming is allowed
|
||||
if dm.mode == devicecache.RegionMode {
|
||||
for _, containerResponse := range response.GetContainerResponses() {
|
||||
containerResponse.Annotations = map[string]string{
|
||||
annotationName: fmt.Sprintf("%s-%s", resourceNamePrefix, dm.mode),
|
||||
// getRegionTree returns mapping of region interface IDs to AF ports only
|
||||
func getRegionTree(devices []device) dpapi.DeviceTree {
|
||||
regionTree := dpapi.NewDeviceTree()
|
||||
|
||||
for _, dev := range devices {
|
||||
for _, region := range dev.regions {
|
||||
devType := fmt.Sprintf("%s-%s", regionMode, region.interfaceID)
|
||||
devNodes := make([]string, len(region.afus))
|
||||
for num, afu := range region.afus {
|
||||
devNodes[num] = afu.devNode
|
||||
}
|
||||
regionTree.AddDevice(devType, region.id, dpapi.DeviceInfo{
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: devNodes,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return regionTree
|
||||
}
|
||||
|
||||
// getAfuTree returns mapping of AFU IDs to AF ports
|
||||
func getAfuTree(devices []device) dpapi.DeviceTree {
|
||||
afuTree := dpapi.NewDeviceTree()
|
||||
|
||||
for _, dev := range devices {
|
||||
for _, region := range dev.regions {
|
||||
for _, afu := range region.afus {
|
||||
devType := fmt.Sprintf("%s-%s", afMode, afu.afuID)
|
||||
afuTree.AddDevice(devType, afu.id, dpapi.DeviceInfo{
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{afu.devNode},
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
return response, err
|
||||
|
||||
return afuTree
|
||||
}
|
||||
|
||||
func (dm *deviceManager) PreStartContainer(ctx context.Context, rqt *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
|
||||
glog.Warning("PreStartContainer() should not be called")
|
||||
return new(pluginapi.PreStartContainerResponse), nil
|
||||
type afu struct {
|
||||
id string
|
||||
afuID string
|
||||
devNode string
|
||||
}
|
||||
|
||||
func startDeviceManager(dm *deviceManager, pluginPrefix string) {
|
||||
err := dm.srv.Serve(dm, dm.name, pluginPrefix)
|
||||
type region struct {
|
||||
id string
|
||||
interfaceID string
|
||||
devNode string
|
||||
afus []afu
|
||||
}
|
||||
|
||||
type device struct {
|
||||
name string
|
||||
regions []region
|
||||
}
|
||||
|
||||
type devicePlugin struct {
|
||||
sysfsDir string
|
||||
devfsDir string
|
||||
|
||||
deviceReg *regexp.Regexp
|
||||
portReg *regexp.Regexp
|
||||
fmeReg *regexp.Regexp
|
||||
|
||||
getDevTree getDevTreeFunc
|
||||
ignoreAfuIDs bool
|
||||
annotationValue string
|
||||
}
|
||||
|
||||
// newDevicePlugin returns new instance of devicePlugin
|
||||
func newDevicePlugin(sysfsDir string, devfsDir string, mode string) (*devicePlugin, error) {
|
||||
var getDevTree getDevTreeFunc
|
||||
|
||||
ignoreAfuIDs := false
|
||||
annotationValue := ""
|
||||
switch mode {
|
||||
case afMode:
|
||||
getDevTree = getAfuTree
|
||||
case regionMode:
|
||||
getDevTree = getRegionTree
|
||||
ignoreAfuIDs = true
|
||||
annotationValue = fmt.Sprintf("%s/%s", namespace, regionMode)
|
||||
case regionDevelMode:
|
||||
getDevTree = getRegionDevelTree
|
||||
ignoreAfuIDs = true
|
||||
default:
|
||||
return nil, fmt.Errorf("Wrong mode: '%s'", mode)
|
||||
}
|
||||
|
||||
return &devicePlugin{
|
||||
sysfsDir: sysfsDir,
|
||||
devfsDir: devfsDir,
|
||||
deviceReg: regexp.MustCompile(deviceRE),
|
||||
portReg: regexp.MustCompile(portRE),
|
||||
fmeReg: regexp.MustCompile(fmeRE),
|
||||
getDevTree: getDevTree,
|
||||
ignoreAfuIDs: ignoreAfuIDs,
|
||||
annotationValue: annotationValue,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (dp *devicePlugin) PostAllocate(response *pluginapi.AllocateResponse) error {
|
||||
// Set container annotations when programming is allowed
|
||||
if len(dp.annotationValue) > 0 {
|
||||
for _, containerResponse := range response.GetContainerResponses() {
|
||||
containerResponse.Annotations = map[string]string{
|
||||
annotationName: dp.annotationValue,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Scan starts scanning of FPGA devices on the host
|
||||
func (dp *devicePlugin) Scan(notifier dpapi.Notifier) error {
|
||||
for {
|
||||
devTree, err := dp.scanFPGAs()
|
||||
if err != nil {
|
||||
glog.Error("Device scan failed: ", err)
|
||||
return fmt.Errorf("Device scan failed: %v", err)
|
||||
}
|
||||
|
||||
notifier.Notify(devTree)
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func (dp *devicePlugin) getDevNode(devName string) (string, error) {
|
||||
devNode := path.Join(dp.devfsDir, devName)
|
||||
if _, err := os.Stat(devNode); err != nil {
|
||||
return "", fmt.Errorf("Device %s doesn't exist: %v", devNode, err)
|
||||
}
|
||||
|
||||
return devNode, nil
|
||||
}
|
||||
|
||||
func (dp *devicePlugin) scanFPGAs() (dpapi.DeviceTree, error) {
|
||||
var devices []device
|
||||
|
||||
glog.V(2).Info("Start new FPGA scan")
|
||||
|
||||
fpgaFiles, err := ioutil.ReadDir(dp.sysfsDir)
|
||||
if err != nil {
|
||||
glog.Fatal(err)
|
||||
return nil, fmt.Errorf("Can't read sysfs folder: %v", err)
|
||||
}
|
||||
|
||||
for _, fpgaFile := range fpgaFiles {
|
||||
fname := fpgaFile.Name()
|
||||
|
||||
if !dp.deviceReg.MatchString(fname) {
|
||||
continue
|
||||
}
|
||||
|
||||
deviceFolder := path.Join(dp.sysfsDir, fname)
|
||||
deviceFiles, err := ioutil.ReadDir(deviceFolder)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
regions, afus, err := dp.getSysFsInfo(deviceFolder, deviceFiles, fname)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(regions) == 0 {
|
||||
return nil, fmt.Errorf("No regions found for device %s", fname)
|
||||
}
|
||||
|
||||
// Currently only one region per device is supported.
|
||||
regions[0].afus = afus
|
||||
devices = append(devices, device{
|
||||
name: fname,
|
||||
regions: regions,
|
||||
})
|
||||
}
|
||||
|
||||
return dp.getDevTree(devices), nil
|
||||
}
|
||||
|
||||
func handleUpdate(dms map[string]*deviceManager, updateInfo devicecache.UpdateInfo, start func(*deviceManager, string), mode string) {
|
||||
glog.V(2).Info("Received dev updates: ", updateInfo)
|
||||
for fpgaID, devices := range updateInfo.Added {
|
||||
devCh := make(chan map[string]deviceplugin.DeviceInfo, 1)
|
||||
resourceName := resourceNamePrefix + "-" + fpgaID
|
||||
pPrefix := pluginEndpointPrefix + "-" + fpgaID
|
||||
dms[fpgaID] = newDeviceManager(resourceName, fpgaID, mode, devCh)
|
||||
go start(dms[fpgaID], pPrefix)
|
||||
devCh <- devices
|
||||
}
|
||||
for fpgaID, devices := range updateInfo.Updated {
|
||||
dms[fpgaID].ch <- devices
|
||||
}
|
||||
for fpgaID := range updateInfo.Removed {
|
||||
dms[fpgaID].srv.Stop()
|
||||
close(dms[fpgaID].ch)
|
||||
delete(dms, fpgaID)
|
||||
func (dp *devicePlugin) getSysFsInfo(deviceFolder string, deviceFiles []os.FileInfo, fname string) ([]region, []afu, error) {
|
||||
var regions []region
|
||||
var afus []afu
|
||||
for _, deviceFile := range deviceFiles {
|
||||
name := deviceFile.Name()
|
||||
|
||||
if dp.fmeReg.MatchString(name) {
|
||||
if len(regions) > 0 {
|
||||
return nil, nil, fmt.Errorf("Detected more than one FPGA region for device %s. Only one region per FPGA device is supported", fname)
|
||||
}
|
||||
interfaceIDFile := path.Join(deviceFolder, name, "pr", "interface_id")
|
||||
data, err := ioutil.ReadFile(interfaceIDFile)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
devNode, err := dp.getDevNode(name)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
regions = append(regions, region{
|
||||
id: name,
|
||||
interfaceID: strings.TrimSpace(string(data)),
|
||||
devNode: devNode,
|
||||
})
|
||||
} else if dp.portReg.MatchString(name) {
|
||||
var afuID string
|
||||
|
||||
if dp.ignoreAfuIDs {
|
||||
afuID = "unused_afu_id"
|
||||
} else {
|
||||
afuFile := path.Join(deviceFolder, name, "afu_id")
|
||||
data, err := ioutil.ReadFile(afuFile)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
afuID = strings.TrimSpace(string(data))
|
||||
}
|
||||
devNode, err := dp.getDevNode(name)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
afus = append(afus, afu{
|
||||
id: name,
|
||||
afuID: afuID,
|
||||
devNode: devNode,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return regions, afus, nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
@ -154,8 +322,8 @@ func main() {
|
||||
|
||||
flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
|
||||
flag.StringVar(&master, "master", "", "master url")
|
||||
flag.StringVar(&mode, "mode", string(devicecache.AfMode),
|
||||
fmt.Sprintf("device plugin mode: '%s' (default), '%s' or '%s'", devicecache.AfMode, devicecache.RegionMode, devicecache.RegionDevelMode))
|
||||
flag.StringVar(&mode, "mode", string(afMode),
|
||||
fmt.Sprintf("device plugin mode: '%s' (default), '%s' or '%s'", afMode, regionMode, regionDevelMode))
|
||||
flag.Parse()
|
||||
|
||||
if kubeconfig == "" {
|
||||
@ -185,9 +353,7 @@ func main() {
|
||||
mode = nodeMode
|
||||
}
|
||||
|
||||
updatesCh := make(chan devicecache.UpdateInfo)
|
||||
|
||||
cache, err := devicecache.NewCache(sysfsDirectory, devfsDirectory, mode, updatesCh)
|
||||
plugin, err := newDevicePlugin(sysfsDirectory, devfsDirectory, mode)
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
os.Exit(1)
|
||||
@ -195,15 +361,6 @@ func main() {
|
||||
|
||||
glog.Info("FPGA device plugin started in ", mode, " mode")
|
||||
|
||||
go func() {
|
||||
err := cache.Run()
|
||||
if err != nil {
|
||||
glog.Fatal(err)
|
||||
}
|
||||
}()
|
||||
|
||||
dms := make(map[string]*deviceManager)
|
||||
for updateInfo := range updatesCh {
|
||||
handleUpdate(dms, updateInfo, startDeviceManager, mode)
|
||||
}
|
||||
manager := dpapi.NewManager(namespace, plugin)
|
||||
manager.Run()
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Copyright 2017 Intel Corporation. All Rights Reserved.
|
||||
// Copyright 2018 Intel Corporation. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
@ -16,252 +16,336 @@ package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"time"
|
||||
|
||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
|
||||
|
||||
"github.com/intel/intel-device-plugins-for-kubernetes/cmd/fpga_plugin/devicecache"
|
||||
"github.com/intel/intel-device-plugins-for-kubernetes/internal/deviceplugin"
|
||||
dpapi "github.com/intel/intel-device-plugins-for-kubernetes/internal/deviceplugin"
|
||||
)
|
||||
|
||||
// Minimal implementation of pluginapi.DevicePlugin_ListAndWatchServer
|
||||
type listAndWatchServerStub struct {
|
||||
testDM *deviceManager
|
||||
generateErr int
|
||||
sendCounter int
|
||||
cdata chan []*pluginapi.Device
|
||||
}
|
||||
func createTestDirs(devfs, sysfs string, devfsDirs, sysfsDirs []string, sysfsFiles map[string][]byte) error {
|
||||
var err error
|
||||
|
||||
func (s *listAndWatchServerStub) Send(resp *pluginapi.ListAndWatchResponse) error {
|
||||
s.sendCounter = s.sendCounter + 1
|
||||
if s.generateErr == s.sendCounter {
|
||||
fmt.Println("listAndWatchServerStub::Send returns error")
|
||||
return fmt.Errorf("Fake error")
|
||||
for _, devfsdir := range devfsDirs {
|
||||
err = os.MkdirAll(path.Join(devfs, devfsdir), 0755)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to create fake device directory: %v", err)
|
||||
}
|
||||
}
|
||||
for _, sysfsdir := range sysfsDirs {
|
||||
err = os.MkdirAll(path.Join(sysfs, sysfsdir), 0755)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to create fake device directory: %v", err)
|
||||
}
|
||||
}
|
||||
for filename, body := range sysfsFiles {
|
||||
err = ioutil.WriteFile(path.Join(sysfs, filename), body, 0644)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to create fake vendor file: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Println("listAndWatchServerStub::Send", resp.Devices)
|
||||
s.cdata <- resp.Devices
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *listAndWatchServerStub) Context() context.Context {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *listAndWatchServerStub) RecvMsg(m interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *listAndWatchServerStub) SendMsg(m interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *listAndWatchServerStub) SendHeader(m metadata.MD) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *listAndWatchServerStub) SetHeader(m metadata.MD) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *listAndWatchServerStub) SetTrailer(m metadata.MD) {
|
||||
}
|
||||
|
||||
func TestGetDevicePluginOptions(t *testing.T) {
|
||||
dm := &deviceManager{}
|
||||
dm.GetDevicePluginOptions(nil, nil)
|
||||
}
|
||||
|
||||
func TestPreStartContainer(t *testing.T) {
|
||||
dm := &deviceManager{}
|
||||
dm.PreStartContainer(nil, nil)
|
||||
}
|
||||
|
||||
func TestListAndWatch(t *testing.T) {
|
||||
func TestNewDevicePlugin(t *testing.T) {
|
||||
tcases := []struct {
|
||||
name string
|
||||
updates []map[string]deviceplugin.DeviceInfo
|
||||
errorOnCall int
|
||||
mode string
|
||||
expectedErr bool
|
||||
}{
|
||||
{
|
||||
name: "No updates and close",
|
||||
mode: afMode,
|
||||
},
|
||||
{
|
||||
name: "No updates and close, but expect streaming error",
|
||||
errorOnCall: 1,
|
||||
mode: regionMode,
|
||||
},
|
||||
{
|
||||
name: "Send 1 update",
|
||||
updates: []map[string]deviceplugin.DeviceInfo{
|
||||
{
|
||||
"fake_id": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.0"},
|
||||
},
|
||||
},
|
||||
},
|
||||
mode: regionDevelMode,
|
||||
},
|
||||
{
|
||||
name: "Send 1 update, but expect streaming error",
|
||||
updates: []map[string]deviceplugin.DeviceInfo{
|
||||
{
|
||||
"fake_id": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.0"},
|
||||
},
|
||||
},
|
||||
},
|
||||
errorOnCall: 2,
|
||||
mode: "unparsable",
|
||||
expectedErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tcases {
|
||||
devCh := make(chan map[string]deviceplugin.DeviceInfo, len(tt.updates))
|
||||
testDM := newDeviceManager("intel.com/fpgatest-fpgaID", "fpgaID", devicecache.AfMode, devCh)
|
||||
|
||||
server := &listAndWatchServerStub{
|
||||
testDM: testDM,
|
||||
generateErr: tt.errorOnCall,
|
||||
cdata: make(chan []*pluginapi.Device, len(tt.updates)+1),
|
||||
}
|
||||
|
||||
// push device infos to DM's channel
|
||||
for _, update := range tt.updates {
|
||||
devCh <- update
|
||||
}
|
||||
close(devCh)
|
||||
|
||||
err := testDM.ListAndWatch(&pluginapi.Empty{}, server)
|
||||
if err != nil && tt.errorOnCall == 0 {
|
||||
t.Errorf("Test case '%s': got unexpected error %v", tt.name, err)
|
||||
}
|
||||
if err == nil && tt.errorOnCall != 0 {
|
||||
t.Errorf("Test case '%s': expected an error, but got nothing", tt.name)
|
||||
for _, tcase := range tcases {
|
||||
_, err := newDevicePlugin("", "", tcase.mode)
|
||||
if tcase.expectedErr && err == nil {
|
||||
t.Error("No error generated when creating Cache with invalid parameters")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestAllocate(t *testing.T) {
|
||||
testDM := newDeviceManager("", "", devicecache.RegionMode, nil)
|
||||
if testDM == nil {
|
||||
t.Fatal("Failed to create a deviceManager")
|
||||
// getDevices returns static list of device structs for testing purposes
|
||||
func getDevices() []device {
|
||||
return []device{
|
||||
{
|
||||
name: "intel-fpga-dev.0",
|
||||
regions: []region{
|
||||
{
|
||||
id: "intel-fpga-fme.0",
|
||||
interfaceID: "ce48969398f05f33946d560708be108a",
|
||||
devNode: "/dev/intel-fpga-fme.0",
|
||||
afus: []afu{
|
||||
{
|
||||
id: "intel-fpga-port.0",
|
||||
afuID: "d8424dc4a4a3c413f89e433683f9040b",
|
||||
devNode: "/dev/intel-fpga-port.0",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "intel-fpga-dev.1",
|
||||
regions: []region{
|
||||
{
|
||||
id: "intel-fpga-fme.1",
|
||||
interfaceID: "ce48969398f05f33946d560708be108a",
|
||||
devNode: "/dev/intel-fpga-fme.1",
|
||||
afus: []afu{
|
||||
{
|
||||
id: "intel-fpga-port.1",
|
||||
afuID: "d8424dc4a4a3c413f89e433683f9040b",
|
||||
devNode: "/dev/intel-fpga-port.1",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
rqt := &pluginapi.AllocateRequest{
|
||||
ContainerRequests: []*pluginapi.ContainerAllocateRequest{
|
||||
{
|
||||
DevicesIDs: []string{"dev1"},
|
||||
func TestGetRegionDevelTree(t *testing.T) {
|
||||
expected := dpapi.NewDeviceTree()
|
||||
expected.AddDevice(regionMode+"-ce48969398f05f33946d560708be108a", "intel-fpga-fme.0", dpapi.DeviceInfo{
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.0", "/dev/intel-fpga-fme.0"},
|
||||
})
|
||||
expected.AddDevice(regionMode+"-ce48969398f05f33946d560708be108a", "intel-fpga-fme.1", dpapi.DeviceInfo{
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.1", "/dev/intel-fpga-fme.1"},
|
||||
})
|
||||
|
||||
result := getRegionDevelTree(getDevices())
|
||||
if !reflect.DeepEqual(result, expected) {
|
||||
t.Error("Got unexpected result: ", result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetRegionTree(t *testing.T) {
|
||||
expected := dpapi.NewDeviceTree()
|
||||
expected.AddDevice(regionMode+"-ce48969398f05f33946d560708be108a", "intel-fpga-fme.0", dpapi.DeviceInfo{
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.0"},
|
||||
})
|
||||
expected.AddDevice(regionMode+"-ce48969398f05f33946d560708be108a", "intel-fpga-fme.1", dpapi.DeviceInfo{
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.1"},
|
||||
})
|
||||
|
||||
result := getRegionTree(getDevices())
|
||||
if !reflect.DeepEqual(result, expected) {
|
||||
t.Error("Got unexpected result: ", result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetAfuTree(t *testing.T) {
|
||||
expected := dpapi.NewDeviceTree()
|
||||
expected.AddDevice(afMode+"-d8424dc4a4a3c413f89e433683f9040b", "intel-fpga-port.0", dpapi.DeviceInfo{
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.0"},
|
||||
})
|
||||
expected.AddDevice(afMode+"-d8424dc4a4a3c413f89e433683f9040b", "intel-fpga-port.1", dpapi.DeviceInfo{
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.1"},
|
||||
})
|
||||
|
||||
result := getAfuTree(getDevices())
|
||||
if !reflect.DeepEqual(result, expected) {
|
||||
t.Error("Got unexpected result: ", result)
|
||||
}
|
||||
}
|
||||
|
||||
func TestScanFPGAs(t *testing.T) {
|
||||
tmpdir := fmt.Sprintf("/tmp/fpgaplugin-TestDiscoverFPGAs-%d", time.Now().Unix())
|
||||
sysfs := path.Join(tmpdir, "sys", "class", "fpga")
|
||||
devfs := path.Join(tmpdir, "dev")
|
||||
tcases := []struct {
|
||||
name string
|
||||
devfsdirs []string
|
||||
sysfsdirs []string
|
||||
sysfsfiles map[string][]byte
|
||||
errorContains string
|
||||
expectedDevTree map[string]map[string]dpapi.DeviceInfo
|
||||
mode string
|
||||
}{
|
||||
{
|
||||
name: "No sysfs folder given",
|
||||
mode: afMode,
|
||||
errorContains: "Can't read sysfs folder",
|
||||
},
|
||||
{
|
||||
name: "FPGA device without FME and ports",
|
||||
mode: afMode,
|
||||
sysfsdirs: []string{"intel-fpga-dev.0", "incorrect-file-name"},
|
||||
errorContains: "No regions found",
|
||||
},
|
||||
{
|
||||
name: "AFU without ID",
|
||||
mode: afMode,
|
||||
sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-port.0"},
|
||||
errorContains: "afu_id: no such file or directory",
|
||||
},
|
||||
{
|
||||
name: "No device node for detected AFU",
|
||||
mode: afMode,
|
||||
sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-port.0"},
|
||||
sysfsfiles: map[string][]byte{
|
||||
"intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
|
||||
},
|
||||
errorContains: "/dev/intel-fpga-port.0: no such file or directory",
|
||||
},
|
||||
{
|
||||
name: "AFU without corresponding FME",
|
||||
mode: afMode,
|
||||
sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-port.0"},
|
||||
devfsdirs: []string{"intel-fpga-port.0"},
|
||||
sysfsfiles: map[string][]byte{
|
||||
"intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
|
||||
},
|
||||
errorContains: "No regions found",
|
||||
},
|
||||
{
|
||||
name: "More than one FME per FPGA device",
|
||||
mode: afMode,
|
||||
sysfsdirs: []string{
|
||||
"intel-fpga-dev.0/intel-fpga-fme.0/pr",
|
||||
"intel-fpga-dev.0/intel-fpga-fme.1/pr",
|
||||
},
|
||||
sysfsfiles: map[string][]byte{
|
||||
"intel-fpga-dev.0/intel-fpga-fme.0/pr/interface_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
|
||||
"intel-fpga-dev.0/intel-fpga-fme.1/pr/interface_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
|
||||
},
|
||||
devfsdirs: []string{
|
||||
"intel-fpga-fme.0",
|
||||
"intel-fpga-fme.1",
|
||||
},
|
||||
errorContains: "Detected more than one FPGA region",
|
||||
},
|
||||
{
|
||||
name: "FME without interface ID",
|
||||
mode: afMode,
|
||||
sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-fme.0"},
|
||||
errorContains: "interface_id: no such file or directory",
|
||||
},
|
||||
{
|
||||
name: "No device node for detected region",
|
||||
mode: afMode,
|
||||
sysfsdirs: []string{"intel-fpga-dev.0/intel-fpga-fme.0/pr"},
|
||||
sysfsfiles: map[string][]byte{
|
||||
"intel-fpga-dev.0/intel-fpga-fme.0/pr/interface_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
|
||||
},
|
||||
errorContains: "/dev/intel-fpga-fme.0: no such file or directory",
|
||||
},
|
||||
{
|
||||
name: "No errors expected in af mode",
|
||||
mode: afMode,
|
||||
sysfsdirs: []string{
|
||||
"intel-fpga-dev.0/intel-fpga-port.0",
|
||||
"intel-fpga-dev.0/intel-fpga-fme.0/pr",
|
||||
"intel-fpga-dev.1/intel-fpga-port.1",
|
||||
"intel-fpga-dev.1/intel-fpga-fme.1/pr",
|
||||
},
|
||||
sysfsfiles: map[string][]byte{
|
||||
"intel-fpga-dev.0/intel-fpga-port.0/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
|
||||
"intel-fpga-dev.1/intel-fpga-port.1/afu_id": []byte("d8424dc4a4a3c413f89e433683f9040b\n"),
|
||||
"intel-fpga-dev.0/intel-fpga-fme.0/pr/interface_id": []byte("ce48969398f05f33946d560708be108a\n"),
|
||||
"intel-fpga-dev.1/intel-fpga-fme.1/pr/interface_id": []byte("ce48969398f05f33946d560708be108a\n"),
|
||||
},
|
||||
devfsdirs: []string{
|
||||
"intel-fpga-port.0", "intel-fpga-fme.0",
|
||||
"intel-fpga-port.1", "intel-fpga-fme.1",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "No errors expected in region mode",
|
||||
mode: regionMode,
|
||||
sysfsdirs: []string{
|
||||
"intel-fpga-dev.0/intel-fpga-port.0",
|
||||
"intel-fpga-dev.0/intel-fpga-fme.0/pr",
|
||||
"intel-fpga-dev.1/intel-fpga-port.1",
|
||||
"intel-fpga-dev.1/intel-fpga-fme.1/pr",
|
||||
},
|
||||
sysfsfiles: map[string][]byte{
|
||||
"intel-fpga-dev.0/intel-fpga-fme.0/pr/interface_id": []byte("ce48969398f05f33946d560708be108a\n"),
|
||||
"intel-fpga-dev.1/intel-fpga-fme.1/pr/interface_id": []byte("ce48969398f05f33946d560708be108a\n"),
|
||||
},
|
||||
devfsdirs: []string{
|
||||
"intel-fpga-port.0", "intel-fpga-fme.0",
|
||||
"intel-fpga-port.1", "intel-fpga-fme.1",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
testDM.devices["dev1"] = deviceplugin.DeviceInfo{
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/dev1"},
|
||||
}
|
||||
resp, err := testDM.Allocate(nil, rqt)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to allocate healthy device: %v", err)
|
||||
}
|
||||
for _, tcase := range tcases {
|
||||
err := createTestDirs(devfs, sysfs, tcase.devfsdirs, tcase.sysfsdirs, tcase.sysfsfiles)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(resp.ContainerResponses[0].Devices) != 1 {
|
||||
t.Fatal("Allocated wrong number of devices")
|
||||
}
|
||||
plugin, err := newDevicePlugin(sysfs, devfs, tcase.mode)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
plugin.getDevTree = func(devices []device) dpapi.DeviceTree {
|
||||
return dpapi.NewDeviceTree()
|
||||
}
|
||||
|
||||
if len(resp.ContainerResponses[0].Annotations) != 1 {
|
||||
_, err = plugin.scanFPGAs()
|
||||
if tcase.errorContains != "" {
|
||||
if err == nil || !strings.Contains(err.Error(), tcase.errorContains) {
|
||||
t.Errorf("Test case '%s': expected error '%s', but got '%v'", tcase.name, tcase.errorContains, err)
|
||||
}
|
||||
} else if err != nil {
|
||||
t.Errorf("Test case '%s': expected no error, but got '%v'", tcase.name, err)
|
||||
}
|
||||
|
||||
err = os.RemoveAll(tmpdir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPostAllocate(t *testing.T) {
|
||||
response := new(pluginapi.AllocateResponse)
|
||||
cresp := new(pluginapi.ContainerAllocateResponse)
|
||||
response.ContainerResponses = append(response.ContainerResponses, cresp)
|
||||
|
||||
testValue := "some value"
|
||||
|
||||
dp := &devicePlugin{
|
||||
annotationValue: testValue,
|
||||
}
|
||||
dp.PostAllocate(response)
|
||||
|
||||
if len(response.ContainerResponses[0].Annotations) != 1 {
|
||||
t.Fatal("Set wrong number of annotations")
|
||||
}
|
||||
|
||||
annotation, ok := resp.ContainerResponses[0].Annotations[annotationName]
|
||||
annotation, ok := response.ContainerResponses[0].Annotations[annotationName]
|
||||
if ok == false {
|
||||
t.Fatalf("%s annotation is not set", annotationName)
|
||||
}
|
||||
|
||||
expectedAnnotationValue := fmt.Sprintf("%s-%s", resourceNamePrefix, devicecache.RegionMode)
|
||||
if annotation != expectedAnnotationValue {
|
||||
t.Fatalf("Set wrong %s annotation value %s, should be %s", resourceNamePrefix, annotation, expectedAnnotationValue)
|
||||
}
|
||||
}
|
||||
|
||||
func startDeviceManagerStub(dm *deviceManager, pluginPrefix string) {
|
||||
}
|
||||
|
||||
func TestHandleUpdate(t *testing.T) {
|
||||
tcases := []struct {
|
||||
name string
|
||||
dms map[string]*deviceManager
|
||||
updateInfo devicecache.UpdateInfo
|
||||
expectedDMs int
|
||||
}{
|
||||
{
|
||||
name: "Empty update",
|
||||
updateInfo: devicecache.UpdateInfo{},
|
||||
expectedDMs: 0,
|
||||
},
|
||||
{
|
||||
name: "Add device manager",
|
||||
updateInfo: devicecache.UpdateInfo{
|
||||
Added: map[string]map[string]deviceplugin.DeviceInfo{
|
||||
"ce48969398f05f33946d560708be108a": {
|
||||
"intel-fpga-fme.0": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.0", "/dev/intel-fpga-fme.0"},
|
||||
},
|
||||
"intel-fpga-fme.1": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.1", "/dev/intel-fpga-fme.1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedDMs: 1,
|
||||
},
|
||||
{
|
||||
name: "Update existing device manager",
|
||||
dms: map[string]*deviceManager{
|
||||
"ce48969398f05f33946d560708be108a": {
|
||||
ch: make(chan map[string]deviceplugin.DeviceInfo, 1),
|
||||
},
|
||||
},
|
||||
updateInfo: devicecache.UpdateInfo{
|
||||
Updated: map[string]map[string]deviceplugin.DeviceInfo{
|
||||
"ce48969398f05f33946d560708be108a": {
|
||||
"intel-fpga-fme.1": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.1", "/dev/intel-fpga-fme.1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedDMs: 1,
|
||||
},
|
||||
{
|
||||
name: "Remove device manager",
|
||||
dms: map[string]*deviceManager{
|
||||
"ce48969398f05f33946d560708be108a": {
|
||||
ch: make(chan map[string]deviceplugin.DeviceInfo, 1),
|
||||
},
|
||||
},
|
||||
updateInfo: devicecache.UpdateInfo{
|
||||
Removed: map[string]map[string]deviceplugin.DeviceInfo{
|
||||
"ce48969398f05f33946d560708be108a": {},
|
||||
},
|
||||
},
|
||||
expectedDMs: 0,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tcases {
|
||||
if tt.dms == nil {
|
||||
tt.dms = make(map[string]*deviceManager)
|
||||
}
|
||||
handleUpdate(tt.dms, tt.updateInfo, startDeviceManagerStub, devicecache.AfMode)
|
||||
if len(tt.dms) != tt.expectedDMs {
|
||||
t.Errorf("Test case '%s': expected %d runnig device managers, but got %d",
|
||||
tt.name, tt.expectedDMs, len(tt.dms))
|
||||
}
|
||||
if annotation != testValue {
|
||||
t.Fatalf("Set wrong annotation %s", annotation)
|
||||
}
|
||||
}
|
||||
|
@ -25,11 +25,10 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
|
||||
|
||||
"github.com/intel/intel-device-plugins-for-kubernetes/internal/deviceplugin"
|
||||
dpapi "github.com/intel/intel-device-plugins-for-kubernetes/internal/deviceplugin"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -40,133 +39,98 @@ const (
|
||||
vendorString = "0x8086"
|
||||
|
||||
// Device plugin settings.
|
||||
pluginEndpointPrefix = "intelGPU"
|
||||
resourceName = "intel.com/gpu"
|
||||
namespace = "gpu.intel.com"
|
||||
deviceType = "i915"
|
||||
)
|
||||
|
||||
// deviceManager manages Intel gpu devices.
|
||||
type deviceManager struct {
|
||||
srv deviceplugin.Server
|
||||
devices map[string]deviceplugin.DeviceInfo
|
||||
type devicePlugin struct {
|
||||
sysfsDir string
|
||||
devfsDir string
|
||||
|
||||
gpuDeviceReg *regexp.Regexp
|
||||
controlDeviceReg *regexp.Regexp
|
||||
}
|
||||
|
||||
func newDeviceManager() *deviceManager {
|
||||
return &deviceManager{
|
||||
devices: make(map[string]deviceplugin.DeviceInfo),
|
||||
func newDevicePlugin(sysfsDir string, devfsDir string) *devicePlugin {
|
||||
return &devicePlugin{
|
||||
sysfsDir: sysfsDir,
|
||||
devfsDir: devfsDir,
|
||||
gpuDeviceReg: regexp.MustCompile(gpuDeviceRE),
|
||||
controlDeviceReg: regexp.MustCompile(controlDeviceRE),
|
||||
}
|
||||
}
|
||||
|
||||
// Discovers all GPU devices available on the local node by walking `/sys/class/drm` directory.
|
||||
func (dm *deviceManager) discoverGPUs(sysfsDrmDir string, devfsDriDir string) error {
|
||||
reg := regexp.MustCompile(gpuDeviceRE)
|
||||
ctlReg := regexp.MustCompile(controlDeviceRE)
|
||||
files, err := ioutil.ReadDir(sysfsDrmDir)
|
||||
func (dp *devicePlugin) Scan(notifier dpapi.Notifier) error {
|
||||
for {
|
||||
devTree, err := dp.scan()
|
||||
if err != nil {
|
||||
glog.Error("Device scan failed: ", err)
|
||||
return fmt.Errorf("Device scan failed: %v", err)
|
||||
}
|
||||
|
||||
notifier.Notify(devTree)
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func (dp *devicePlugin) scan() (dpapi.DeviceTree, error) {
|
||||
files, err := ioutil.ReadDir(dp.sysfsDir)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Can't read sysfs folder: %v", err)
|
||||
return nil, fmt.Errorf("Can't read sysfs folder: %v", err)
|
||||
}
|
||||
|
||||
devTree := dpapi.NewDeviceTree()
|
||||
for _, f := range files {
|
||||
if reg.MatchString(f.Name()) {
|
||||
dat, err := ioutil.ReadFile(path.Join(sysfsDrmDir, f.Name(), "device/vendor"))
|
||||
if dp.gpuDeviceReg.MatchString(f.Name()) {
|
||||
dat, err := ioutil.ReadFile(path.Join(dp.sysfsDir, f.Name(), "device/vendor"))
|
||||
if err != nil {
|
||||
fmt.Println("Oops can't read vendor file")
|
||||
glog.Warning("Skipping. Can't read vendor file: ", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if strings.TrimSpace(string(dat)) == vendorString {
|
||||
var nodes []string
|
||||
|
||||
drmFiles, err := ioutil.ReadDir(path.Join(sysfsDrmDir, f.Name(), "device/drm"))
|
||||
drmFiles, err := ioutil.ReadDir(path.Join(dp.sysfsDir, f.Name(), "device/drm"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("Can't read device folder: %v", err)
|
||||
return nil, fmt.Errorf("Can't read device folder: %v", err)
|
||||
}
|
||||
|
||||
for _, drmFile := range drmFiles {
|
||||
if ctlReg.MatchString(drmFile.Name()) {
|
||||
if dp.controlDeviceReg.MatchString(drmFile.Name()) {
|
||||
//Skipping possible drm control node
|
||||
continue
|
||||
}
|
||||
devPath := path.Join(devfsDriDir, drmFile.Name())
|
||||
devPath := path.Join(dp.devfsDir, drmFile.Name())
|
||||
if _, err := os.Stat(devPath); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
fmt.Printf("Adding '%s' to GPU '%s'\n", devPath, f.Name())
|
||||
glog.V(2).Info("Adding ", devPath, " to GPU ", f.Name())
|
||||
nodes = append(nodes, devPath)
|
||||
}
|
||||
|
||||
if len(nodes) > 0 {
|
||||
dm.devices[f.Name()] = deviceplugin.DeviceInfo{
|
||||
// Currently only one device type (i915) is supported.
|
||||
// TODO: check model ID to differentiate device models.
|
||||
devTree.AddDevice(deviceType, f.Name(), dpapi.DeviceInfo{
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: nodes,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dm *deviceManager) getDeviceState(DeviceName string) string {
|
||||
// TODO: calling tools to figure out actual device state
|
||||
return pluginapi.Healthy
|
||||
}
|
||||
|
||||
// Implements DevicePlugin service functions
|
||||
func (dm *deviceManager) GetDevicePluginOptions(ctx context.Context, empty *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
|
||||
fmt.Println("GetDevicePluginOptions: return empty options")
|
||||
return new(pluginapi.DevicePluginOptions), nil
|
||||
}
|
||||
|
||||
func (dm *deviceManager) ListAndWatch(empty *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) error {
|
||||
fmt.Println("device-plugin: ListAndWatch start")
|
||||
changed := true
|
||||
for {
|
||||
for id, dev := range dm.devices {
|
||||
state := dm.getDeviceState(id)
|
||||
if dev.State != state {
|
||||
changed = true
|
||||
dev.State = state
|
||||
dm.devices[id] = dev
|
||||
}
|
||||
}
|
||||
if changed {
|
||||
resp := new(pluginapi.ListAndWatchResponse)
|
||||
for id, dev := range dm.devices {
|
||||
resp.Devices = append(resp.Devices, &pluginapi.Device{id, dev.State})
|
||||
}
|
||||
fmt.Printf("ListAndWatch: send devices %v\n", resp)
|
||||
if err := stream.Send(resp); err != nil {
|
||||
dm.srv.Stop()
|
||||
return fmt.Errorf("device-plugin: cannot update device states: %v", err)
|
||||
}
|
||||
}
|
||||
changed = false
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func (dm *deviceManager) Allocate(ctx context.Context, rqt *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
|
||||
return deviceplugin.MakeAllocateResponse(rqt, dm.devices)
|
||||
}
|
||||
|
||||
func (dm *deviceManager) PreStartContainer(ctx context.Context, rqt *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
|
||||
glog.Warning("PreStartContainer() should not be called")
|
||||
return new(pluginapi.PreStartContainerResponse), nil
|
||||
return devTree, nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
fmt.Println("GPU device plugin started")
|
||||
dm := newDeviceManager()
|
||||
glog.Info("GPU device plugin started")
|
||||
|
||||
err := dm.discoverGPUs(sysfsDrmDirectory, devfsDriDirectory)
|
||||
if err != nil {
|
||||
glog.Fatal(err)
|
||||
}
|
||||
|
||||
err = dm.srv.Serve(dm, resourceName, pluginEndpointPrefix)
|
||||
if err != nil {
|
||||
glog.Fatal(err)
|
||||
}
|
||||
plugin := newDevicePlugin(sysfsDrmDirectory, devfsDriDirectory)
|
||||
manager := dpapi.NewManager(namespace, plugin)
|
||||
manager.Run()
|
||||
}
|
||||
|
@ -15,121 +15,15 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc/metadata"
|
||||
|
||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
|
||||
|
||||
"github.com/intel/intel-device-plugins-for-kubernetes/internal/deviceplugin"
|
||||
)
|
||||
|
||||
// Minimal implementation of pluginapi.DevicePlugin_ListAndWatchServer
|
||||
type listAndWatchServerStub struct {
|
||||
counter int
|
||||
testDM *deviceManager
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
func (s *listAndWatchServerStub) Send(resp *pluginapi.ListAndWatchResponse) error {
|
||||
if s.counter > 0 {
|
||||
return errors.New("Fake error when sending response")
|
||||
}
|
||||
|
||||
if len(resp.Devices) != 1 {
|
||||
s.t.Error("Wrong number of sent device infos")
|
||||
}
|
||||
|
||||
s.testDM.devices["dev1"] = deviceplugin.DeviceInfo{
|
||||
State: pluginapi.Unhealthy,
|
||||
Nodes: []string{"/dev/dev1"},
|
||||
}
|
||||
s.counter++
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *listAndWatchServerStub) Context() context.Context {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *listAndWatchServerStub) RecvMsg(m interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *listAndWatchServerStub) SendMsg(m interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *listAndWatchServerStub) SendHeader(m metadata.MD) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *listAndWatchServerStub) SetHeader(m metadata.MD) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *listAndWatchServerStub) SetTrailer(m metadata.MD) {
|
||||
}
|
||||
|
||||
func TestListAndWatch(t *testing.T) {
|
||||
testDM := newDeviceManager()
|
||||
|
||||
if testDM == nil {
|
||||
t.Fatal("Failed to create a deviceManager")
|
||||
}
|
||||
|
||||
testDM.devices["dev1"] = deviceplugin.DeviceInfo{
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/dev1"},
|
||||
}
|
||||
|
||||
server := &listAndWatchServerStub{
|
||||
testDM: testDM,
|
||||
t: t,
|
||||
}
|
||||
|
||||
testDM.ListAndWatch(&pluginapi.Empty{}, server)
|
||||
}
|
||||
|
||||
func TestAllocate(t *testing.T) {
|
||||
testDM := newDeviceManager()
|
||||
|
||||
if testDM == nil {
|
||||
t.Fatal("Failed to create a deviceManager")
|
||||
}
|
||||
|
||||
rqt := &pluginapi.AllocateRequest{
|
||||
ContainerRequests: []*pluginapi.ContainerAllocateRequest{
|
||||
{
|
||||
DevicesIDs: []string{"dev1"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
testDM.devices["dev1"] = deviceplugin.DeviceInfo{
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/dev1"},
|
||||
}
|
||||
resp, err := testDM.Allocate(nil, rqt)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to allocate healthy device: %+v", err)
|
||||
}
|
||||
|
||||
if len(resp.ContainerResponses[0].Devices) != 1 {
|
||||
t.Fatal("Allocated wrong number of devices")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDiscoverGPUs(t *testing.T) {
|
||||
var err error
|
||||
|
||||
func TestScan(t *testing.T) {
|
||||
tmpdir := fmt.Sprintf("/tmp/gpuplugin-test-%d", time.Now().Unix())
|
||||
sysfs := path.Join(tmpdir, "sysfs")
|
||||
devfs := path.Join(tmpdir, "devfs")
|
||||
@ -181,37 +75,37 @@ func TestDiscoverGPUs(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
testDM := newDeviceManager()
|
||||
testPlugin := newDevicePlugin(sysfs, devfs)
|
||||
|
||||
if testDM == nil {
|
||||
if testPlugin == nil {
|
||||
t.Fatal("Failed to create a deviceManager")
|
||||
}
|
||||
|
||||
for _, tcase := range tcases {
|
||||
for _, devfsdir := range tcase.devfsdirs {
|
||||
err = os.MkdirAll(path.Join(devfs, devfsdir), 0755)
|
||||
err := os.MkdirAll(path.Join(devfs, devfsdir), 0755)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create fake device directory: %+v", err)
|
||||
}
|
||||
}
|
||||
for _, sysfsdir := range tcase.sysfsdirs {
|
||||
err = os.MkdirAll(path.Join(sysfs, sysfsdir), 0755)
|
||||
err := os.MkdirAll(path.Join(sysfs, sysfsdir), 0755)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create fake device directory: %+v", err)
|
||||
}
|
||||
}
|
||||
for filename, body := range tcase.sysfsfiles {
|
||||
err = ioutil.WriteFile(path.Join(sysfs, filename), body, 0644)
|
||||
err := ioutil.WriteFile(path.Join(sysfs, filename), body, 0644)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create fake vendor file: %+v", err)
|
||||
}
|
||||
}
|
||||
|
||||
err = testDM.discoverGPUs(sysfs, devfs)
|
||||
tree, err := testPlugin.scan()
|
||||
if tcase.expectedErr && err == nil {
|
||||
t.Error("Expected error hasn't been triggered")
|
||||
}
|
||||
if tcase.expectedDevs != len(testDM.devices) {
|
||||
if tcase.expectedDevs != len(tree[deviceType]) {
|
||||
t.Errorf("Wrong number of discovered devices")
|
||||
}
|
||||
|
||||
|
66
internal/deviceplugin/api.go
Normal file
66
internal/deviceplugin/api.go
Normal file
@ -0,0 +1,66 @@
|
||||
// Copyright 2018 Intel Corporation. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package deviceplugin
|
||||
|
||||
import (
|
||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
|
||||
)
|
||||
|
||||
// DeviceInfo contains information about device maintained by Device Plugin
|
||||
type DeviceInfo struct {
|
||||
State string
|
||||
Nodes []string
|
||||
Mounts []string
|
||||
Envs map[string]string
|
||||
}
|
||||
|
||||
// DeviceTree contains a tree-like structure of device type -> device ID -> device info.
|
||||
type DeviceTree map[string]map[string]DeviceInfo
|
||||
|
||||
// NewDeviceTree creates an instance of DeviceTree
|
||||
func NewDeviceTree() DeviceTree {
|
||||
return make(map[string]map[string]DeviceInfo)
|
||||
}
|
||||
|
||||
// AddDevice adds device info to DeviceTree.
|
||||
func (tree DeviceTree) AddDevice(devType, id string, info DeviceInfo) {
|
||||
if _, present := tree[devType]; !present {
|
||||
tree[devType] = make(map[string]DeviceInfo)
|
||||
}
|
||||
tree[devType][id] = info
|
||||
}
|
||||
|
||||
// Notifier receives updates from Scanner, detects changes and sends the
|
||||
// detected changes to a channel given by the creator of a Notifier object.
|
||||
type Notifier interface {
|
||||
// Notify notifies manager with a device tree constructed by device
|
||||
// plugin during scanning process.
|
||||
Notify(DeviceTree)
|
||||
}
|
||||
|
||||
// Scanner serves as an interface between Manager and a device plugin.
|
||||
type Scanner interface {
|
||||
// Scan scans the host for devices and sends all found devices to
|
||||
// a Notifier instance. It's called only once for every device plugin by
|
||||
// Manager in a goroutine and operates in an endless loop.
|
||||
Scan(Notifier) error
|
||||
}
|
||||
|
||||
// PostAllocator is an optional interface implemented by device plugins.
|
||||
type PostAllocator interface {
|
||||
// PostAllocate modifies responses returned by Allocate() by e.g.
|
||||
// adding annotations consumed by CRI hooks to the responses.
|
||||
PostAllocate(*pluginapi.AllocateResponse) error
|
||||
}
|
@ -1,264 +0,0 @@
|
||||
// Copyright 2017 Intel Corporation. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package deviceplugin
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
|
||||
)
|
||||
|
||||
const (
|
||||
devicePluginPath = "/tmp/"
|
||||
kubeletSocket = devicePluginPath + "kubelet-test.sock"
|
||||
pluginEndpoint = "plugin-test.sock"
|
||||
resourceName = "intel.com/testdev"
|
||||
)
|
||||
|
||||
type kubeletStub struct {
|
||||
sync.Mutex
|
||||
socket string
|
||||
pluginEndpoint string
|
||||
server *grpc.Server
|
||||
}
|
||||
|
||||
// newKubeletStub returns an initialized kubeletStub for testing purpose.
|
||||
func newKubeletStub(socket string) *kubeletStub {
|
||||
return &kubeletStub{
|
||||
socket: socket,
|
||||
}
|
||||
}
|
||||
|
||||
// Minimal implementation of deviceplugin.RegistrationServer interface
|
||||
|
||||
func (k *kubeletStub) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
|
||||
k.Lock()
|
||||
defer k.Unlock()
|
||||
k.pluginEndpoint = r.Endpoint
|
||||
return &pluginapi.Empty{}, nil
|
||||
}
|
||||
|
||||
func (k *kubeletStub) start() error {
|
||||
os.Remove(k.socket)
|
||||
s, err := net.Listen("unix", k.socket)
|
||||
if err != nil {
|
||||
fmt.Printf("Can't listen at the socket: %+v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
k.server = grpc.NewServer()
|
||||
|
||||
pluginapi.RegisterRegistrationServer(k.server, k)
|
||||
go k.server.Serve(s)
|
||||
|
||||
// Wait till the grpcServer is ready to serve services.
|
||||
waitForServer(k.socket, 10*time.Second)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type pluginStub struct {
|
||||
}
|
||||
|
||||
// Minimal implementation of pluginapi.DevicePluginServer
|
||||
|
||||
func (ps *pluginStub) GetDevicePluginOptions(ctx context.Context, empty *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
|
||||
return new(pluginapi.DevicePluginOptions), nil
|
||||
}
|
||||
|
||||
func (ps *pluginStub) ListAndWatch(empty *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ps *pluginStub) Allocate(ctx context.Context, rqt *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
|
||||
fmt.Println("Called fake Allocate")
|
||||
resp := new(pluginapi.AllocateResponse)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (ps *pluginStub) PreStartContainer(ctx context.Context, rqt *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
|
||||
return new(pluginapi.PreStartContainerResponse), nil
|
||||
}
|
||||
|
||||
func TestRegisterWithKublet(t *testing.T) {
|
||||
pluginSocket := path.Join(devicePluginPath, pluginEndpoint)
|
||||
|
||||
err := registerWithKubelet(kubeletSocket, pluginSocket, resourceName)
|
||||
if err == nil {
|
||||
t.Error("No error triggered when kubelet is not accessible")
|
||||
}
|
||||
|
||||
kubelet := newKubeletStub(kubeletSocket)
|
||||
kubelet.start()
|
||||
defer kubelet.server.Stop()
|
||||
|
||||
err = registerWithKubelet(kubeletSocket, pluginSocket, resourceName)
|
||||
if err != nil {
|
||||
t.Errorf("Can't register device plugin: %+v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetupAndServe(t *testing.T) {
|
||||
var pluginSocket string
|
||||
var pEndpoint string
|
||||
var srv Server
|
||||
|
||||
kubelet := newKubeletStub(kubeletSocket)
|
||||
kubelet.start()
|
||||
defer kubelet.server.Stop()
|
||||
|
||||
testPlugin := &pluginStub{}
|
||||
defer srv.Stop()
|
||||
|
||||
go srv.setupAndServe(testPlugin, resourceName, "testPlugin", devicePluginPath, kubeletSocket)
|
||||
|
||||
// Wait till the grpcServer is ready to serve services.
|
||||
for {
|
||||
kubelet.Lock()
|
||||
pEndpoint = kubelet.pluginEndpoint
|
||||
kubelet.Unlock()
|
||||
pluginSocket = path.Join(devicePluginPath, pEndpoint)
|
||||
if pEndpoint != "" {
|
||||
if _, err := os.Stat(pluginSocket); err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
err := srv.setupAndServe(testPlugin, resourceName, "testPlugin", devicePluginPath, kubeletSocket)
|
||||
if err == nil {
|
||||
t.Fatalf("Server was able to start on occupied socket %s: %+v", pluginSocket, err)
|
||||
}
|
||||
|
||||
conn, err := grpc.Dial(pluginSocket, grpc.WithInsecure(),
|
||||
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
|
||||
return net.DialTimeout("unix", addr, timeout)
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get connection: %+v", err)
|
||||
}
|
||||
|
||||
client := pluginapi.NewDevicePluginClient(conn)
|
||||
_, err = client.Allocate(context.Background(), &pluginapi.AllocateRequest{
|
||||
ContainerRequests: []*pluginapi.ContainerAllocateRequest{
|
||||
{
|
||||
DevicesIDs: []string{"dev1", "dev2"},
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("Failed to allocate device dev1: %+v", err)
|
||||
}
|
||||
conn.Close()
|
||||
|
||||
// Check if plugins re-registers after its socket has been removed
|
||||
kubelet.Lock()
|
||||
pEndpoint = kubelet.pluginEndpoint
|
||||
kubelet.Unlock()
|
||||
if pEndpoint == "" {
|
||||
t.Fatal("After successful Allocate() pluginEndpoint is empty")
|
||||
}
|
||||
os.Remove(path.Join(devicePluginPath, pEndpoint))
|
||||
for {
|
||||
kubelet.Lock()
|
||||
pEndpoint = kubelet.pluginEndpoint
|
||||
kubelet.Unlock()
|
||||
pluginSocket = path.Join(devicePluginPath, pEndpoint)
|
||||
if pEndpoint != "" {
|
||||
if _, err = os.Stat(pluginSocket); err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
fmt.Println("No plugin socket. Waiting...")
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
conn, err = grpc.Dial(pluginSocket, grpc.WithInsecure(),
|
||||
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
|
||||
return net.DialTimeout("unix", addr, timeout)
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get connection: %+v", err)
|
||||
}
|
||||
|
||||
client = pluginapi.NewDevicePluginClient(conn)
|
||||
_, err = client.Allocate(context.Background(), &pluginapi.AllocateRequest{
|
||||
ContainerRequests: []*pluginapi.ContainerAllocateRequest{
|
||||
{
|
||||
DevicesIDs: []string{"dev1", "dev2"},
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("Failed to allocate device dev1: %+v", err)
|
||||
}
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
func TestStop(t *testing.T) {
|
||||
srv := &Server{}
|
||||
if err := srv.Stop(); err == nil {
|
||||
t.Error("Calling Stop() before Serve() is successful")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMakeAllocateResponse(t *testing.T) {
|
||||
rqt := &pluginapi.AllocateRequest{
|
||||
ContainerRequests: []*pluginapi.ContainerAllocateRequest{
|
||||
{
|
||||
DevicesIDs: []string{"dev1"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
_, err := MakeAllocateResponse(rqt, nil)
|
||||
if err == nil {
|
||||
t.Fatal("No error when allocating non-existing device")
|
||||
}
|
||||
|
||||
devices := map[string]DeviceInfo{
|
||||
"dev1": {
|
||||
State: pluginapi.Unhealthy,
|
||||
Nodes: []string{"/dev/dev1"},
|
||||
},
|
||||
}
|
||||
|
||||
_, err = MakeAllocateResponse(rqt, devices)
|
||||
if err == nil {
|
||||
t.Fatal("No error when allocating unhealthy device")
|
||||
}
|
||||
|
||||
devices["dev1"] = DeviceInfo{
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/dev1"},
|
||||
}
|
||||
resp, err := MakeAllocateResponse(rqt, devices)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to allocate healthy device: %+v", err)
|
||||
}
|
||||
|
||||
if len(resp.ContainerResponses[0].Devices) != 1 {
|
||||
t.Fatal("Allocated wrong number of devices")
|
||||
}
|
||||
}
|
134
internal/deviceplugin/manager.go
Normal file
134
internal/deviceplugin/manager.go
Normal file
@ -0,0 +1,134 @@
|
||||
// Copyright 2018 Intel Corporation. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package deviceplugin
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
|
||||
)
|
||||
|
||||
// updateInfo contains info for added, updated and deleted devices.
|
||||
type updateInfo struct {
|
||||
Added DeviceTree
|
||||
Updated DeviceTree
|
||||
Removed DeviceTree
|
||||
}
|
||||
|
||||
// notifier implements Notifier interface.
|
||||
type notifier struct {
|
||||
deviceTree DeviceTree
|
||||
updatesCh chan<- updateInfo
|
||||
}
|
||||
|
||||
func newNotifier(updatesCh chan<- updateInfo) *notifier {
|
||||
return ¬ifier{
|
||||
updatesCh: updatesCh,
|
||||
}
|
||||
}
|
||||
|
||||
func (n *notifier) Notify(newDeviceTree DeviceTree) {
|
||||
added := NewDeviceTree()
|
||||
updated := NewDeviceTree()
|
||||
|
||||
for devType, new := range newDeviceTree {
|
||||
if old, ok := n.deviceTree[devType]; ok {
|
||||
if !reflect.DeepEqual(old, new) {
|
||||
updated[devType] = new
|
||||
}
|
||||
delete(n.deviceTree, devType)
|
||||
} else {
|
||||
added[devType] = new
|
||||
}
|
||||
}
|
||||
|
||||
if len(added) > 0 || len(updated) > 0 || len(n.deviceTree) > 0 {
|
||||
n.updatesCh <- updateInfo{
|
||||
Added: added,
|
||||
Updated: updated,
|
||||
Removed: n.deviceTree,
|
||||
}
|
||||
}
|
||||
|
||||
n.deviceTree = newDeviceTree
|
||||
}
|
||||
|
||||
// Manager manages life cycle of device plugins and handles the scan results
|
||||
// received from them.
|
||||
type Manager struct {
|
||||
devicePlugin Scanner
|
||||
namespace string
|
||||
servers map[string]devicePluginServer
|
||||
createServer func(string, func(*pluginapi.AllocateResponse) error) devicePluginServer
|
||||
}
|
||||
|
||||
// NewManager creates a new instance of Manager
|
||||
func NewManager(namespace string, devicePlugin Scanner) *Manager {
|
||||
return &Manager{
|
||||
devicePlugin: devicePlugin,
|
||||
namespace: namespace,
|
||||
servers: make(map[string]devicePluginServer),
|
||||
createServer: newServer,
|
||||
}
|
||||
}
|
||||
|
||||
// Run prepares and launches event loop for updates from Scanner
|
||||
func (m *Manager) Run() {
|
||||
updatesCh := make(chan updateInfo)
|
||||
|
||||
go func() {
|
||||
err := m.devicePlugin.Scan(newNotifier(updatesCh))
|
||||
if err != nil {
|
||||
fmt.Printf("Device scan failed: %+v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
close(updatesCh)
|
||||
}()
|
||||
|
||||
for update := range updatesCh {
|
||||
m.handleUpdate(update)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) handleUpdate(update updateInfo) {
|
||||
glog.V(2).Info("Received dev updates: ", update)
|
||||
for devType, devices := range update.Added {
|
||||
var postAllocate func(*pluginapi.AllocateResponse) error
|
||||
|
||||
if postAllocator, ok := m.devicePlugin.(PostAllocator); ok {
|
||||
postAllocate = postAllocator.PostAllocate
|
||||
}
|
||||
|
||||
m.servers[devType] = m.createServer(devType, postAllocate)
|
||||
go func() {
|
||||
err := m.servers[devType].Serve(m.namespace)
|
||||
if err != nil {
|
||||
glog.Fatal(err)
|
||||
}
|
||||
}()
|
||||
m.servers[devType].Update(devices)
|
||||
}
|
||||
for devType, devices := range update.Updated {
|
||||
m.servers[devType].Update(devices)
|
||||
}
|
||||
for devType := range update.Removed {
|
||||
m.servers[devType].Stop()
|
||||
delete(m.servers, devType)
|
||||
}
|
||||
}
|
220
internal/deviceplugin/manager_test.go
Normal file
220
internal/deviceplugin/manager_test.go
Normal file
@ -0,0 +1,220 @@
|
||||
// Copyright 2018 Intel Corporation. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package deviceplugin
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
|
||||
)
|
||||
|
||||
func TestNotify(t *testing.T) {
|
||||
tcases := []struct {
|
||||
name string
|
||||
expectedAdded int
|
||||
expectedUpdated int
|
||||
expectedRemoved int
|
||||
oldmap map[string]map[string]DeviceInfo
|
||||
newmap map[string]map[string]DeviceInfo
|
||||
}{
|
||||
{
|
||||
name: "No devices found",
|
||||
},
|
||||
{
|
||||
name: "Added 1 new device type",
|
||||
newmap: map[string]map[string]DeviceInfo{
|
||||
"someDeviceType": {
|
||||
"intel-fpga-port.0": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.0"},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedAdded: 1,
|
||||
},
|
||||
{
|
||||
name: "Updated 1 new device type",
|
||||
oldmap: map[string]map[string]DeviceInfo{
|
||||
"someDeviceType": {
|
||||
"intel-fpga-port.0": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.0"},
|
||||
},
|
||||
},
|
||||
},
|
||||
newmap: map[string]map[string]DeviceInfo{
|
||||
"someDeviceType": {
|
||||
"intel-fpga-port.1": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedUpdated: 1,
|
||||
},
|
||||
{
|
||||
name: "Removed 1 new device type",
|
||||
oldmap: map[string]map[string]DeviceInfo{
|
||||
"someDeviceType": {
|
||||
"intel-fpga-port.0": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.0"},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedRemoved: 1,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tcase := range tcases {
|
||||
ch := make(chan updateInfo, 1)
|
||||
n := newNotifier(ch)
|
||||
n.deviceTree = tcase.oldmap
|
||||
|
||||
n.Notify(tcase.newmap)
|
||||
|
||||
var update updateInfo
|
||||
select {
|
||||
case update = <-ch:
|
||||
default:
|
||||
}
|
||||
|
||||
if tcase.expectedAdded != len(update.Added) {
|
||||
t.Errorf("Test case '%s': expected %d added device types, but got %d", tcase.name, tcase.expectedAdded, len(update.Added))
|
||||
}
|
||||
if tcase.expectedUpdated != len(update.Updated) {
|
||||
t.Errorf("Test case '%s': expected %d updated device types, but got %d", tcase.name, tcase.expectedUpdated, len(update.Updated))
|
||||
}
|
||||
if tcase.expectedRemoved != len(update.Removed) {
|
||||
t.Errorf("Test case '%s': expected %d removed device types, but got %d", tcase.name, tcase.expectedUpdated, len(update.Updated))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type serverStub struct{}
|
||||
|
||||
func (*serverStub) Serve(string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*serverStub) Update(map[string]DeviceInfo) {}
|
||||
|
||||
func (*serverStub) Stop() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type devicePluginStub struct{}
|
||||
|
||||
func (*devicePluginStub) Scan(n Notifier) error {
|
||||
tree := NewDeviceTree()
|
||||
tree.AddDevice("testdevice", "dev1", DeviceInfo{
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: make([]string, 0),
|
||||
})
|
||||
n.Notify(tree)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (*devicePluginStub) PostAllocate(*pluginapi.AllocateResponse) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestHandleUpdate(t *testing.T) {
|
||||
tcases := []struct {
|
||||
name string
|
||||
servers map[string]devicePluginServer
|
||||
update updateInfo
|
||||
expectedServers int
|
||||
}{
|
||||
{
|
||||
name: "Empty update",
|
||||
update: updateInfo{},
|
||||
expectedServers: 0,
|
||||
},
|
||||
{
|
||||
name: "Add device manager",
|
||||
update: updateInfo{
|
||||
Added: map[string]map[string]DeviceInfo{
|
||||
"ce48969398f05f33946d560708be108a": {
|
||||
"intel-fpga-fme.0": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.0", "/dev/intel-fpga-fme.0"},
|
||||
},
|
||||
"intel-fpga-fme.1": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.1", "/dev/intel-fpga-fme.1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedServers: 1,
|
||||
},
|
||||
{
|
||||
name: "Update existing device manager",
|
||||
servers: map[string]devicePluginServer{
|
||||
"ce48969398f05f33946d560708be108a": &serverStub{},
|
||||
},
|
||||
update: updateInfo{
|
||||
Updated: map[string]map[string]DeviceInfo{
|
||||
"ce48969398f05f33946d560708be108a": {
|
||||
"intel-fpga-fme.1": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.1", "/dev/intel-fpga-fme.1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedServers: 1,
|
||||
},
|
||||
{
|
||||
name: "Remove device manager",
|
||||
servers: map[string]devicePluginServer{
|
||||
"ce48969398f05f33946d560708be108a": &serverStub{},
|
||||
},
|
||||
update: updateInfo{
|
||||
Removed: map[string]map[string]DeviceInfo{
|
||||
"ce48969398f05f33946d560708be108a": {},
|
||||
},
|
||||
},
|
||||
expectedServers: 0,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tcases {
|
||||
if tt.servers == nil {
|
||||
tt.servers = make(map[string]devicePluginServer)
|
||||
}
|
||||
mgr := Manager{
|
||||
devicePlugin: &devicePluginStub{},
|
||||
servers: tt.servers,
|
||||
createServer: func(string, func(*pluginapi.AllocateResponse) error) devicePluginServer {
|
||||
return &serverStub{}
|
||||
},
|
||||
}
|
||||
mgr.handleUpdate(tt.update)
|
||||
if len(tt.servers) != tt.expectedServers {
|
||||
t.Errorf("Test case '%s': expected %d runnig device managers, but got %d",
|
||||
tt.name, tt.expectedServers, len(tt.servers))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRun(t *testing.T) {
|
||||
mgr := NewManager("testnamespace", &devicePluginStub{})
|
||||
mgr.createServer = func(string, func(*pluginapi.AllocateResponse) error) devicePluginServer {
|
||||
return &serverStub{}
|
||||
}
|
||||
mgr.Run()
|
||||
}
|
@ -24,39 +24,150 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/golang/glog"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
|
||||
)
|
||||
|
||||
// DeviceInfo contains information about device maintained by Device Plugin
|
||||
type DeviceInfo struct {
|
||||
State string
|
||||
Nodes []string
|
||||
DeviceMountPath []string
|
||||
// devicePluginServer maintains a gRPC server satisfying
|
||||
// pluginapi.PluginInterfaceServer interfaces.
|
||||
// This internal unexposed interface simplifies unit testing.
|
||||
type devicePluginServer interface {
|
||||
Serve(namespace string) error
|
||||
Stop() error
|
||||
Update(devices map[string]DeviceInfo)
|
||||
}
|
||||
|
||||
// Server structure to keep server parameters
|
||||
type Server struct {
|
||||
grpcServer *grpc.Server
|
||||
// server implements devicePluginServer and pluginapi.PluginInterfaceServer interfaces.
|
||||
type server struct {
|
||||
devType string
|
||||
grpcServer *grpc.Server
|
||||
updatesCh chan map[string]DeviceInfo
|
||||
devices map[string]DeviceInfo
|
||||
postAllocate func(*pluginapi.AllocateResponse) error
|
||||
}
|
||||
|
||||
// Serve serves starts gRPC server to serve Device Plugin functionality
|
||||
func (srv *Server) Serve(dm pluginapi.DevicePluginServer, resourceName string, pluginPrefix string) error {
|
||||
return srv.setupAndServe(dm, resourceName, pluginPrefix, pluginapi.DevicePluginPath, pluginapi.KubeletSocket)
|
||||
// newServer creates a new server satisfying the devicePluginServer interface.
|
||||
func newServer(devType string, postAllocate func(*pluginapi.AllocateResponse) error) devicePluginServer {
|
||||
return &server{
|
||||
devType: devType,
|
||||
updatesCh: make(chan map[string]DeviceInfo, 1), // TODO: is 1 needed?
|
||||
devices: make(map[string]DeviceInfo),
|
||||
postAllocate: postAllocate,
|
||||
}
|
||||
}
|
||||
|
||||
// Stop stops serving Device Plugin
|
||||
func (srv *Server) Stop() error {
|
||||
func (srv *server) GetDevicePluginOptions(ctx context.Context, empty *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
|
||||
fmt.Println("GetDevicePluginOptions: return empty options")
|
||||
return new(pluginapi.DevicePluginOptions), nil
|
||||
}
|
||||
|
||||
func (srv *server) sendDevices(stream pluginapi.DevicePlugin_ListAndWatchServer) error {
|
||||
resp := new(pluginapi.ListAndWatchResponse)
|
||||
for id, device := range srv.devices {
|
||||
resp.Devices = append(resp.Devices, &pluginapi.Device{id, device.State})
|
||||
}
|
||||
glog.V(2).Info("Sending to kubelet ", resp.Devices)
|
||||
if err := stream.Send(resp); err != nil {
|
||||
srv.Stop()
|
||||
return fmt.Errorf("device-plugin: cannot update device list: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (srv *server) ListAndWatch(empty *pluginapi.Empty, stream pluginapi.DevicePlugin_ListAndWatchServer) error {
|
||||
glog.V(2).Info("Started ListAndWatch for ", srv.devType)
|
||||
|
||||
if err := srv.sendDevices(stream); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for srv.devices = range srv.updatesCh {
|
||||
if err := srv.sendDevices(stream); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (srv *server) Allocate(ctx context.Context, rqt *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
|
||||
response := new(pluginapi.AllocateResponse)
|
||||
for _, crqt := range rqt.ContainerRequests {
|
||||
cresp := new(pluginapi.ContainerAllocateResponse)
|
||||
for _, id := range crqt.DevicesIDs {
|
||||
dev, ok := srv.devices[id]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Invalid allocation request with non-existing device %s", id)
|
||||
}
|
||||
if dev.State != pluginapi.Healthy {
|
||||
return nil, fmt.Errorf("Invalid allocation request with unhealthy device %s", id)
|
||||
}
|
||||
for _, devnode := range dev.Nodes {
|
||||
cresp.Devices = append(cresp.Devices, &pluginapi.DeviceSpec{
|
||||
HostPath: devnode,
|
||||
ContainerPath: devnode,
|
||||
Permissions: "mrw",
|
||||
})
|
||||
}
|
||||
for _, devmount := range dev.Mounts {
|
||||
cresp.Mounts = append(cresp.Mounts, &pluginapi.Mount{
|
||||
HostPath: devmount,
|
||||
ContainerPath: devmount,
|
||||
ReadOnly: false,
|
||||
})
|
||||
}
|
||||
for key, value := range dev.Envs {
|
||||
if cresp.Envs == nil {
|
||||
cresp.Envs = make(map[string]string)
|
||||
}
|
||||
cresp.Envs[key] = value
|
||||
}
|
||||
}
|
||||
response.ContainerResponses = append(response.ContainerResponses, cresp)
|
||||
}
|
||||
|
||||
if srv.postAllocate != nil {
|
||||
err := srv.postAllocate(response)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func (srv *server) PreStartContainer(ctx context.Context, rqt *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
|
||||
glog.Warning("PreStartContainer() should not be called")
|
||||
return new(pluginapi.PreStartContainerResponse), nil
|
||||
}
|
||||
|
||||
// Serve starts a gRPC server to serve pluginapi.PluginInterfaceServer interface.
|
||||
func (srv *server) Serve(namespace string) error {
|
||||
return srv.setupAndServe(namespace, pluginapi.DevicePluginPath, pluginapi.KubeletSocket)
|
||||
}
|
||||
|
||||
// Stop stops serving pluginapi.PluginInterfaceServer interface.
|
||||
func (srv *server) Stop() error {
|
||||
if srv.grpcServer == nil {
|
||||
return fmt.Errorf("Can't stop non-existing gRPC server. Calling Stop() before Serve()?")
|
||||
}
|
||||
srv.grpcServer.Stop()
|
||||
close(srv.updatesCh)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Update sends updates from Manager to ListAndWatch's event loop.
|
||||
func (srv *server) Update(devices map[string]DeviceInfo) {
|
||||
srv.updatesCh <- devices
|
||||
}
|
||||
|
||||
// setupAndServe binds given gRPC server to device manager, starts it and registers it with kubelet.
|
||||
func (srv *Server) setupAndServe(dm pluginapi.DevicePluginServer, resourceName string, pluginPrefix string, devicePluginPath string, kubeletSocket string) error {
|
||||
func (srv *server) setupAndServe(namespace string, devicePluginPath string, kubeletSocket string) error {
|
||||
resourceName := namespace + "/" + srv.devType
|
||||
pluginPrefix := namespace + "-" + srv.devType
|
||||
|
||||
for {
|
||||
pluginEndpoint := pluginPrefix + ".sock"
|
||||
pluginSocket := path.Join(devicePluginPath, pluginEndpoint)
|
||||
@ -72,7 +183,7 @@ func (srv *Server) setupAndServe(dm pluginapi.DevicePluginServer, resourceName s
|
||||
}
|
||||
|
||||
srv.grpcServer = grpc.NewServer()
|
||||
pluginapi.RegisterDevicePluginServer(srv.grpcServer, dm)
|
||||
pluginapi.RegisterDevicePluginServer(srv.grpcServer, srv)
|
||||
|
||||
// Starts device plugin service.
|
||||
go func() {
|
||||
@ -169,29 +280,3 @@ func waitForServer(socket string, timeout time.Duration) error {
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// MakeAllocateResponse creates response data for Allocate GRPC call
|
||||
func MakeAllocateResponse(rqt *pluginapi.AllocateRequest, devices map[string]DeviceInfo) (*pluginapi.AllocateResponse, error) {
|
||||
resp := new(pluginapi.AllocateResponse)
|
||||
for _, crqt := range rqt.ContainerRequests {
|
||||
cresp := new(pluginapi.ContainerAllocateResponse)
|
||||
for _, id := range crqt.DevicesIDs {
|
||||
dev, ok := devices[id]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Invalid allocation request with non-existing device %s", id)
|
||||
}
|
||||
if dev.State != pluginapi.Healthy {
|
||||
return nil, fmt.Errorf("Invalid allocation request with unhealthy device %s", id)
|
||||
}
|
||||
for _, devnode := range dev.Nodes {
|
||||
cresp.Devices = append(cresp.Devices, &pluginapi.DeviceSpec{
|
||||
HostPath: devnode,
|
||||
ContainerPath: devnode,
|
||||
Permissions: "mrw",
|
||||
})
|
||||
}
|
||||
}
|
||||
resp.ContainerResponses = append(resp.ContainerResponses, cresp)
|
||||
}
|
||||
return resp, nil
|
||||
}
|
435
internal/deviceplugin/server_test.go
Normal file
435
internal/deviceplugin/server_test.go
Normal file
@ -0,0 +1,435 @@
|
||||
// Copyright 2018 Intel Corporation. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package deviceplugin
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/metadata"
|
||||
|
||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
|
||||
)
|
||||
|
||||
const (
|
||||
devicePluginPath = "/tmp/"
|
||||
kubeletSocket = devicePluginPath + "kubelet-test.sock"
|
||||
namespace = "test.intel.com"
|
||||
pluginEndpoint = namespace + "-testdevicetype.sock"
|
||||
resourceName = namespace + "/testdevicetype"
|
||||
)
|
||||
|
||||
type kubeletStub struct {
|
||||
sync.Mutex
|
||||
socket string
|
||||
pluginEndpoint string
|
||||
server *grpc.Server
|
||||
}
|
||||
|
||||
// newKubeletStub returns an initialized kubeletStub for testing purpose.
|
||||
func newKubeletStub(socket string) *kubeletStub {
|
||||
return &kubeletStub{
|
||||
socket: socket,
|
||||
}
|
||||
}
|
||||
|
||||
// Minimal implementation of deviceplugin.RegistrationServer interface
|
||||
|
||||
func (k *kubeletStub) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
|
||||
k.Lock()
|
||||
defer k.Unlock()
|
||||
k.pluginEndpoint = r.Endpoint
|
||||
return &pluginapi.Empty{}, nil
|
||||
}
|
||||
|
||||
func (k *kubeletStub) start() error {
|
||||
os.Remove(k.socket)
|
||||
s, err := net.Listen("unix", k.socket)
|
||||
if err != nil {
|
||||
fmt.Printf("Can't listen at the socket: %+v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
k.server = grpc.NewServer()
|
||||
|
||||
pluginapi.RegisterRegistrationServer(k.server, k)
|
||||
go k.server.Serve(s)
|
||||
|
||||
// Wait till the grpcServer is ready to serve services.
|
||||
waitForServer(k.socket, 10*time.Second)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestRegisterWithKublet(t *testing.T) {
|
||||
pluginSocket := path.Join(devicePluginPath, pluginEndpoint)
|
||||
|
||||
err := registerWithKubelet(kubeletSocket, pluginSocket, resourceName)
|
||||
if err == nil {
|
||||
t.Error("No error triggered when kubelet is not accessible")
|
||||
}
|
||||
|
||||
kubelet := newKubeletStub(kubeletSocket)
|
||||
kubelet.start()
|
||||
defer kubelet.server.Stop()
|
||||
|
||||
err = registerWithKubelet(kubeletSocket, pluginSocket, resourceName)
|
||||
if err != nil {
|
||||
t.Errorf("Can't register device plugin: %+v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSetupAndServe(t *testing.T) {
|
||||
var pluginSocket string
|
||||
var pEndpoint string
|
||||
|
||||
kubelet := newKubeletStub(kubeletSocket)
|
||||
kubelet.start()
|
||||
defer kubelet.server.Stop()
|
||||
|
||||
srv := &server{
|
||||
devType: "testtype",
|
||||
devices: map[string]DeviceInfo{
|
||||
"dev1": {
|
||||
State: pluginapi.Healthy,
|
||||
},
|
||||
"dev2": {
|
||||
State: pluginapi.Healthy,
|
||||
},
|
||||
},
|
||||
updatesCh: make(chan map[string]DeviceInfo),
|
||||
}
|
||||
|
||||
defer srv.Stop()
|
||||
go srv.setupAndServe(namespace, devicePluginPath, kubeletSocket)
|
||||
|
||||
// Wait till the grpcServer is ready to serve services.
|
||||
for {
|
||||
kubelet.Lock()
|
||||
pEndpoint = kubelet.pluginEndpoint
|
||||
kubelet.Unlock()
|
||||
pluginSocket = path.Join(devicePluginPath, pEndpoint)
|
||||
if pEndpoint != "" {
|
||||
if _, err := os.Stat(pluginSocket); err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
err := srv.setupAndServe(namespace, devicePluginPath, kubeletSocket)
|
||||
if err == nil {
|
||||
t.Fatalf("Server was able to start on occupied socket %s: %+v", pluginSocket, err)
|
||||
}
|
||||
|
||||
conn, err := grpc.Dial(pluginSocket, grpc.WithInsecure(),
|
||||
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
|
||||
return net.DialTimeout("unix", addr, timeout)
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get connection: %+v", err)
|
||||
}
|
||||
|
||||
client := pluginapi.NewDevicePluginClient(conn)
|
||||
_, err = client.Allocate(context.Background(), &pluginapi.AllocateRequest{
|
||||
ContainerRequests: []*pluginapi.ContainerAllocateRequest{
|
||||
{
|
||||
DevicesIDs: []string{"dev1", "dev2"},
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("Failed to allocate device dev1: %+v", err)
|
||||
}
|
||||
conn.Close()
|
||||
|
||||
// Check if plugins re-registers after its socket has been removed
|
||||
kubelet.Lock()
|
||||
pEndpoint = kubelet.pluginEndpoint
|
||||
kubelet.Unlock()
|
||||
if pEndpoint == "" {
|
||||
t.Fatal("After successful Allocate() pluginEndpoint is empty")
|
||||
}
|
||||
os.Remove(path.Join(devicePluginPath, pEndpoint))
|
||||
for {
|
||||
kubelet.Lock()
|
||||
pEndpoint = kubelet.pluginEndpoint
|
||||
kubelet.Unlock()
|
||||
pluginSocket = path.Join(devicePluginPath, pEndpoint)
|
||||
if pEndpoint != "" {
|
||||
if _, err = os.Stat(pluginSocket); err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
fmt.Println("No plugin socket. Waiting...")
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
conn, err = grpc.Dial(pluginSocket, grpc.WithInsecure(),
|
||||
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
|
||||
return net.DialTimeout("unix", addr, timeout)
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get connection: %+v", err)
|
||||
}
|
||||
|
||||
client = pluginapi.NewDevicePluginClient(conn)
|
||||
_, err = client.Allocate(context.Background(), &pluginapi.AllocateRequest{
|
||||
ContainerRequests: []*pluginapi.ContainerAllocateRequest{
|
||||
{
|
||||
DevicesIDs: []string{"dev1", "dev2"},
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("Failed to allocate device dev1: %+v", err)
|
||||
}
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
func TestStop(t *testing.T) {
|
||||
srv := &server{}
|
||||
if err := srv.Stop(); err == nil {
|
||||
t.Error("Calling Stop() before Serve() is successful")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAllocate(t *testing.T) {
|
||||
rqt := &pluginapi.AllocateRequest{
|
||||
ContainerRequests: []*pluginapi.ContainerAllocateRequest{
|
||||
{
|
||||
DevicesIDs: []string{"dev1"},
|
||||
},
|
||||
},
|
||||
}
|
||||
srv := &server{}
|
||||
|
||||
tcases := []struct {
|
||||
name string
|
||||
devices map[string]DeviceInfo
|
||||
postAllocate func(*pluginapi.AllocateResponse) error
|
||||
expectedAllocated int
|
||||
expectedErr bool
|
||||
}{
|
||||
{
|
||||
name: "Allocate non-existing device",
|
||||
expectedErr: true,
|
||||
},
|
||||
{
|
||||
name: "Allocate unhealthy devices",
|
||||
devices: map[string]DeviceInfo{
|
||||
"dev1": {
|
||||
State: pluginapi.Unhealthy,
|
||||
Nodes: []string{"/dev/dev1"},
|
||||
},
|
||||
},
|
||||
expectedErr: true,
|
||||
},
|
||||
{
|
||||
name: "Allocate healthy device",
|
||||
devices: map[string]DeviceInfo{
|
||||
"dev1": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/dev1"},
|
||||
},
|
||||
},
|
||||
expectedAllocated: 1,
|
||||
},
|
||||
{
|
||||
name: "Allocate healthy device with postAllocate hook",
|
||||
devices: map[string]DeviceInfo{
|
||||
"dev1": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/dev1"},
|
||||
Mounts: []string{"/dev"},
|
||||
Envs: map[string]string{
|
||||
"testname": "testvalue",
|
||||
},
|
||||
},
|
||||
},
|
||||
postAllocate: func(resp *pluginapi.AllocateResponse) error {
|
||||
return nil
|
||||
},
|
||||
expectedAllocated: 1,
|
||||
},
|
||||
{
|
||||
name: "Allocate healthy device with failing postAllocate hook",
|
||||
devices: map[string]DeviceInfo{
|
||||
"dev1": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/dev1"},
|
||||
},
|
||||
},
|
||||
postAllocate: func(resp *pluginapi.AllocateResponse) error {
|
||||
return fmt.Errorf("Fake error for %s", "dev1")
|
||||
},
|
||||
expectedErr: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tcases {
|
||||
srv.devices = tt.devices
|
||||
srv.postAllocate = tt.postAllocate
|
||||
resp, err := srv.Allocate(nil, rqt)
|
||||
|
||||
if tt.expectedErr && err == nil {
|
||||
t.Errorf("Test case '%s': no error returned", tt.name)
|
||||
continue
|
||||
}
|
||||
if !tt.expectedErr && err != nil {
|
||||
t.Errorf("Test case '%s': got unexpected error %+v", tt.name, err)
|
||||
continue
|
||||
}
|
||||
if tt.expectedAllocated > 0 && len(resp.ContainerResponses[0].Devices) != tt.expectedAllocated {
|
||||
t.Errorf("Test case '%s': allocated wrong number of devices", tt.name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Minimal implementation of pluginapi.DevicePlugin_ListAndWatchServer
|
||||
type listAndWatchServerStub struct {
|
||||
testServer *server
|
||||
generateErr int
|
||||
sendCounter int
|
||||
cdata chan []*pluginapi.Device
|
||||
}
|
||||
|
||||
func (s *listAndWatchServerStub) Send(resp *pluginapi.ListAndWatchResponse) error {
|
||||
s.sendCounter = s.sendCounter + 1
|
||||
if s.generateErr == s.sendCounter {
|
||||
fmt.Println("listAndWatchServerStub::Send returns error")
|
||||
return fmt.Errorf("Fake error")
|
||||
}
|
||||
|
||||
fmt.Println("listAndWatchServerStub::Send", resp.Devices)
|
||||
s.cdata <- resp.Devices
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *listAndWatchServerStub) Context() context.Context {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *listAndWatchServerStub) RecvMsg(m interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *listAndWatchServerStub) SendMsg(m interface{}) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *listAndWatchServerStub) SendHeader(m metadata.MD) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *listAndWatchServerStub) SetHeader(m metadata.MD) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *listAndWatchServerStub) SetTrailer(m metadata.MD) {
|
||||
}
|
||||
|
||||
func TestListAndWatch(t *testing.T) {
|
||||
tcases := []struct {
|
||||
name string
|
||||
updates []map[string]DeviceInfo
|
||||
errorOnCall int
|
||||
}{
|
||||
{
|
||||
name: "No updates and close",
|
||||
},
|
||||
{
|
||||
name: "No updates and close, but expect streaming error",
|
||||
errorOnCall: 1,
|
||||
},
|
||||
{
|
||||
name: "Send 1 update",
|
||||
updates: []map[string]DeviceInfo{
|
||||
{
|
||||
"fake_id": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.0"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Send 1 update, but expect streaming error",
|
||||
updates: []map[string]DeviceInfo{
|
||||
{
|
||||
"fake_id": {
|
||||
State: pluginapi.Healthy,
|
||||
Nodes: []string{"/dev/intel-fpga-port.0"},
|
||||
},
|
||||
},
|
||||
},
|
||||
errorOnCall: 2,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tcases {
|
||||
devCh := make(chan map[string]DeviceInfo, len(tt.updates))
|
||||
testServer := &server{
|
||||
updatesCh: devCh,
|
||||
}
|
||||
|
||||
server := &listAndWatchServerStub{
|
||||
testServer: testServer,
|
||||
generateErr: tt.errorOnCall,
|
||||
cdata: make(chan []*pluginapi.Device, len(tt.updates)+1),
|
||||
}
|
||||
|
||||
// push device infos to DM's channel
|
||||
for _, update := range tt.updates {
|
||||
devCh <- update
|
||||
}
|
||||
close(devCh)
|
||||
|
||||
err := testServer.ListAndWatch(&pluginapi.Empty{}, server)
|
||||
if err != nil && tt.errorOnCall == 0 {
|
||||
t.Errorf("Test case '%s': got unexpected error %v", tt.name, err)
|
||||
}
|
||||
if err == nil && tt.errorOnCall != 0 {
|
||||
t.Errorf("Test case '%s': expected an error, but got nothing", tt.name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetDevicePluginOptions(t *testing.T) {
|
||||
srv := &server{}
|
||||
srv.GetDevicePluginOptions(nil, nil)
|
||||
}
|
||||
|
||||
func TestPreStartContainer(t *testing.T) {
|
||||
srv := &server{}
|
||||
srv.PreStartContainer(nil, nil)
|
||||
}
|
||||
|
||||
func TestNewServer(t *testing.T) {
|
||||
_ = newServer("test", nil)
|
||||
}
|
||||
|
||||
func TestUpdate(t *testing.T) {
|
||||
srv := &server{
|
||||
updatesCh: make(chan map[string]DeviceInfo, 1),
|
||||
}
|
||||
srv.Update(make(map[string]DeviceInfo))
|
||||
}
|
Loading…
Reference in New Issue
Block a user