lint: enable wsl check

Fixes: #392

Signed-off-by: Ed Bartosh <eduard.bartosh@intel.com>
This commit is contained in:
Ed Bartosh 2021-12-15 13:01:14 +02:00
parent 8058d3c1b1
commit cec004c398
80 changed files with 784 additions and 55 deletions

View File

@ -42,6 +42,7 @@ linters:
- unused
- varcheck
- whitespace
- wsl
linters-settings:
gofmt:

View File

@ -57,6 +57,7 @@ func (dp *DevicePlugin) Scan(notifier dpapi.Notifier) error {
defer dp.scanTicker.Stop()
var prevDevTree dpapi.DeviceTree
for {
devTree := dp.scan()
@ -64,6 +65,7 @@ func (dp *DevicePlugin) Scan(notifier dpapi.Notifier) error {
klog.V(1).Info("DLB scan update: pf: ", len(devTree[deviceTypePF]), " / vf: ", len(devTree[deviceTypeVF]))
prevDevTree = devTree
}
notifier.Notify(devTree)
select {

View File

@ -55,6 +55,7 @@ func createTestFiles(devfs string, devfsdirs []string, sysfs string, pfDevs []st
if err := os.MkdirAll(path.Join(sysfs, pfDev, "device"), 0750); err != nil {
return errors.Wrap(err, "Failed to create fake device directory")
}
if err := os.WriteFile(path.Join(sysfs, pfDev, "device", "sriov_numvfs"), []byte(sriovnumvfs[index]), 0600); err != nil {
return errors.Wrap(err, "Failed to create fake device directory")
}

View File

@ -50,6 +50,8 @@ func main() {
if plugin == nil {
klog.Fatal("Cannot create device plugin, please check above error messages.")
}
manager := dpapi.NewManager(namespace, plugin)
manager.Run()
}

View File

@ -43,8 +43,11 @@ func init() {
}
func main() {
var metricsAddr string
var enableLeaderElection bool
var (
metricsAddr string
enableLeaderElection bool
)
flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
"Enable leader election for controller manager. "+
@ -91,6 +94,7 @@ func main() {
}
setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)

View File

@ -79,12 +79,14 @@ func (dev *Device) getName() string {
if len(dev.name) == 0 {
dev.name = filepath.Base(dev.Path)
}
return dev.name
}
// decodeJSONStream reads a single JSON value from reader and decodes it
// into dest. The address of dest is handed to the decoder, and whatever
// Decode returns is passed through errors.WithStack before being returned.
func decodeJSONStream(reader io.Reader, dest interface{}) error {
	return errors.WithStack(json.NewDecoder(reader).Decode(&dest))
}
@ -110,6 +112,7 @@ func newHookEnv(bitstreamDir string, config string, newPort newPortFun) *hookEnv
func (he *hookEnv) getConfig(stdinJ *Stdin) (*Config, error) {
configPath := filepath.Join(stdinJ.Bundle, he.config)
configFile, err := os.Open(configPath)
if err != nil {
return nil, errors.WithStack(err)
@ -117,6 +120,7 @@ func (he *hookEnv) getConfig(stdinJ *Stdin) (*Config, error) {
defer configFile.Close()
var config Config
err = decodeJSONStream(configFile, &config)
if err != nil {
return nil, errors.WithMessage(err, "can't decode "+configPath)
@ -129,6 +133,7 @@ func (he *hookEnv) getConfig(stdinJ *Stdin) (*Config, error) {
if len(config.Linux.Devices) == 0 {
return nil, errors.Errorf("%s: linux.devices is empty", configPath)
}
return &config, nil
}
@ -136,6 +141,7 @@ func (he *hookEnv) getFPGAParams(config *Config) ([]fpgaParams, error) {
// parse FPGA_REGION_N and FPGA_AFU_N environment variables
regionEnv := make(map[string]string)
afuEnv := make(map[string]string)
for _, env := range config.Process.Env {
splitted := strings.SplitN(env, "=", 2)
if strings.HasPrefix(splitted[0], fpgaRegionEnvPrefix) {
@ -156,6 +162,7 @@ func (he *hookEnv) getFPGAParams(config *Config) ([]fpgaParams, error) {
}
params := []fpgaParams{}
for num, region := range regionEnv {
afu, ok := afuEnv[num]
if !ok {
@ -164,6 +171,7 @@ func (he *hookEnv) getFPGAParams(config *Config) ([]fpgaParams, error) {
// Find a device suitable for the region/interface id
found := false
for _, dev := range config.Linux.Devices {
deviceName := dev.getName()
// skip non-FPGA devices
@ -175,10 +183,12 @@ func (he *hookEnv) getFPGAParams(config *Config) ([]fpgaParams, error) {
if dev.processed {
continue
}
port, err := he.newPort(deviceName)
if err != nil {
return nil, err
}
if interfaceUUID := port.GetInterfaceUUID(); interfaceUUID == region {
params = append(params,
fpgaParams{
@ -189,18 +199,22 @@ func (he *hookEnv) getFPGAParams(config *Config) ([]fpgaParams, error) {
)
dev.processed = true
found = true
break
}
}
if !found {
return nil, errors.Errorf("can't find appropriate device for region %s", region)
}
}
return params, nil
}
func getStdin(reader io.Reader) (*Stdin, error) {
var stdinJ Stdin
err := decodeJSONStream(reader, &stdinJ)
if err != nil {
return nil, err

View File

@ -36,6 +36,7 @@ func createTestDirs(sysfs string, sysfsDirs []string, sysfsFiles map[string][]by
return errors.Wrap(err, "Failed to create fake device directory")
}
}
for filename, body := range sysfsFiles {
err := os.WriteFile(path.Join(sysfs, filename), body, 0600)
if err != nil {
@ -189,6 +190,7 @@ type testFpgaPort struct {
// GetInterfaceUUID returns the entry of interfaceUUIDS selected by the
// current call counter and then advances the counter by one.
func (p *testFpgaPort) GetInterfaceUUID() (id string) {
	id = p.interfaceUUIDS[p.callNo]
	p.callNo++

	return id
}
@ -196,6 +198,7 @@ func (p *testFpgaPort) GetInterfaceUUID() (id string) {
// GetAcceleratorTypeUUID returns the entry of accelTypeUUIDS selected by
// the current call counter and then advances the counter by one.
func (p *testFpgaPort) GetAcceleratorTypeUUID() string {
	current := p.accelTypeUUIDS[p.callNo]
	p.callNo++

	return current
}
@ -204,6 +207,7 @@ func (p *testFpgaPort) PR(bs bitstream.File, dryRun bool) error {
if p.failProgramming {
return errors.New("fail to program device")
}
return nil
}
@ -219,6 +223,7 @@ func TestGetFPGAParams(t *testing.T) {
if err != nil {
t.Fatalf("can't create temporary directory: %+v", err)
}
defer os.RemoveAll(tmpdir)
sysfsTest := path.Join(tmpdir, "sys", "class", "fpga")
@ -336,6 +341,7 @@ func TestProcess(t *testing.T) {
if err != nil {
t.Fatalf("can't create temporary directory: %+v", err)
}
defer os.RemoveAll(tmpdir)
sysfs := path.Join(tmpdir, "sys", "class", "fpga")

View File

@ -312,6 +312,7 @@ func TestScanFPGAsDFL(t *testing.T) {
if err != nil {
t.Fatalf("can't create temporary directory: %+v", err)
}
sysfs := path.Join(tmpdir, "sys")
dev := path.Join(tmpdir, "dev")
tcases := []struct {

View File

@ -65,8 +65,10 @@ func getRegionDevelTree(devices []device) dpapi.DeviceTree {
if region.interfaceID == unhealthyInterfaceID {
health = pluginapi.Unhealthy
}
devType := fmt.Sprintf("%s-%s", regionMode, region.interfaceID)
devNodes := make([]pluginapi.DeviceSpec, len(region.afus)+1)
for num, afu := range region.afus {
devNodes[num] = pluginapi.DeviceSpec{
HostPath: afu.devNode,
@ -74,6 +76,7 @@ func getRegionDevelTree(devices []device) dpapi.DeviceTree {
Permissions: "rw",
}
}
devNodes[len(region.afus)] = pluginapi.DeviceSpec{
HostPath: region.devNode,
ContainerPath: region.devNode,
@ -97,8 +100,10 @@ func getRegionTree(devices []device) dpapi.DeviceTree {
if region.interfaceID == unhealthyInterfaceID {
health = pluginapi.Unhealthy
}
devType := fmt.Sprintf("%s-%s", regionMode, region.interfaceID)
devNodes := make([]pluginapi.DeviceSpec, len(region.afus))
for num, afu := range region.afus {
devNodes[num] = pluginapi.DeviceSpec{
HostPath: afu.devNode,
@ -106,6 +111,7 @@ func getRegionTree(devices []device) dpapi.DeviceTree {
Permissions: "rw",
}
}
regionTree.AddDevice(devType, region.id, dpapi.NewDeviceInfo(health, devNodes, nil, nil))
}
}
@ -124,11 +130,13 @@ func getAfuTree(devices []device) dpapi.DeviceTree {
if afu.afuID == unhealthyAfuID {
health = pluginapi.Unhealthy
}
devType, err := fpga.GetAfuDevType(region.interfaceID, afu.afuID)
if err != nil {
klog.Warningf("failed to get devtype: %+v", err)
continue
}
devNodes := []pluginapi.DeviceSpec{
{
HostPath: afu.devNode,
@ -182,16 +190,20 @@ type devicePlugin struct {
// newDevicePlugin returns new instance of devicePlugin.
func newDevicePlugin(mode string, rootPath string) (*devicePlugin, error) {
var dp *devicePlugin
var err error
var (
dp *devicePlugin
err error
)
sysfsPathOPAE := path.Join(rootPath, sysfsDirectoryOPAE)
devfsPath := path.Join(rootPath, devfsDirectory)
if _, err = os.Stat(sysfsPathOPAE); os.IsNotExist(err) {
sysfsPathDFL := path.Join(rootPath, sysfsDirectoryDFL)
if _, err = os.Stat(sysfsPathDFL); os.IsNotExist(err) {
return nil, errors.Errorf("kernel driver is not loaded: neither %s nor %s sysfs entry exists", sysfsPathOPAE, sysfsPathDFL)
}
dp, err = newDevicePluginDFL(sysfsPathDFL, devfsPath, mode)
} else {
dp, err = newDevicePluginOPAE(sysfsPathOPAE, devfsPath, mode)
@ -224,6 +236,7 @@ func (dp *devicePlugin) PostAllocate(response *pluginapi.AllocateResponse) error
// Scan starts scanning FPGA devices on the host.
func (dp *devicePlugin) Scan(notifier dpapi.Notifier) error {
defer dp.scanTicker.Stop()
for {
devTree, err := dp.scanFPGAs()
if err != nil {
@ -242,6 +255,7 @@ func (dp *devicePlugin) Scan(notifier dpapi.Notifier) error {
func (dp *devicePlugin) getRegions(deviceFiles []os.DirEntry) ([]region, error) {
regions := map[string]region{}
for _, deviceFile := range deviceFiles {
name := deviceFile.Name()
if dp.portReg.MatchString(name) {
@ -249,6 +263,7 @@ func (dp *devicePlugin) getRegions(deviceFiles []os.DirEntry) ([]region, error)
if err != nil {
return nil, errors.Wrapf(err, "can't get port info for %s", name)
}
fme, err := port.GetFME()
if err != nil {
return nil, errors.Wrapf(err, "can't get FME info for %s", name)
@ -256,6 +271,7 @@ func (dp *devicePlugin) getRegions(deviceFiles []os.DirEntry) ([]region, error)
afuInfo := afu{id: port.GetName(), afuID: port.GetAcceleratorTypeUUID(), devNode: port.GetDevPath()}
regionName := fme.GetName()
reg, ok := regions[regionName]
if ok {
reg.afus = append(reg.afus, afuInfo)
@ -264,11 +280,13 @@ func (dp *devicePlugin) getRegions(deviceFiles []os.DirEntry) ([]region, error)
}
}
}
result := make([]region, 0, len(regions))
// Get list of regions from the map
for _, reg := range regions {
result = append(result, reg)
}
return result, nil
}
@ -280,6 +298,7 @@ func (dp *devicePlugin) scanFPGAs() (dpapi.DeviceTree, error) {
}
devices := []device{}
for _, file := range files {
devName := file.Name()
@ -301,6 +320,7 @@ func (dp *devicePlugin) scanFPGAs() (dpapi.DeviceTree, error) {
devices = append(devices, device{name: devName, regions: regions})
}
}
return dp.getDevTree(devices), nil
}
@ -327,10 +347,12 @@ func getPluginParams(mode string) (getDevTreeFunc, string, error) {
}
func main() {
var mode string
var kubeconfig string
var master string
var nodename string
var (
mode string
kubeconfig string
master string
nodename string
)
flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
flag.StringVar(&master, "master", "", "master url")

View File

@ -40,12 +40,14 @@ func createTestDirs(devfs, sysfs string, devfsDirs, sysfsDirs []string, sysfsFil
return errors.Wrap(err, "Failed to create fake device directory")
}
}
for _, sysfsdir := range sysfsDirs {
err = os.MkdirAll(path.Join(sysfs, sysfsdir), 0750)
if err != nil {
return errors.Wrap(err, "Failed to create fake device directory")
}
}
for filename, body := range sysfsFiles {
err = os.WriteFile(path.Join(sysfs, filename), body, 0600)
if err != nil {
@ -62,12 +64,14 @@ func validateDevTree(expectedDevTreeKeys map[string][]string, devTree dpapi.Devi
if _, ok := devTree[resource]; !ok {
return errors.Errorf("device tree: resource %s missing", resource)
}
for _, device := range devices {
if _, ok := devTree[resource][device]; !ok {
return errors.Errorf("device tree resource %s: device %s missing", resource, device)
}
}
}
return nil
}
@ -104,6 +108,7 @@ func TestNewDevicePlugin(t *testing.T) {
if err != nil {
t.Fatalf("can't create temporary directory: %+v", err)
}
defer os.RemoveAll(root)
tcases := []struct {
@ -185,6 +190,7 @@ func TestScan(t *testing.T) {
if err != nil {
t.Fatalf("can't create temporary directory: %+v", err)
}
defer os.RemoveAll(root)
sysfs := path.Join(root, "sys")

View File

@ -11,8 +11,10 @@ import (
)
func getModeOverrideFromCluster(nodeName, kubeConfig, master, mode string) (string, error) {
var config *rest.Config
var err error
var (
config *rest.Config
err error
)
if len(nodeName) == 0 {
return mode, errors.New("node name is not set")
@ -23,6 +25,7 @@ func getModeOverrideFromCluster(nodeName, kubeConfig, master, mode string) (stri
} else {
config, err = clientcmd.BuildConfigFromFlags(master, kubeConfig)
}
if err != nil {
return mode, err
}

View File

@ -264,6 +264,7 @@ func TestScanFPGAsOPAE(t *testing.T) {
if err != nil {
t.Fatalf("can't create temporary directory: %+v", err)
}
sysfs := path.Join(tmpdir, "sys")
dev := path.Join(tmpdir, "dev")
tcases := []struct {

View File

@ -33,10 +33,13 @@ const (
)
func main() {
var err error
var bitstream, device string
var dryRun, force, quiet bool
var port uint
var (
err error
bitstream, device string
dryRun, force, quiet bool
port uint
)
flag.StringVar(&bitstream, "b", "", "Path to bitstream file (GBS or AOCX)")
flag.StringVar(&device, "d", "", "Path to device node (FME or Port)")
flag.BoolVar(&dryRun, "dry-run", false, "Don't write/program, just validate and log")
@ -51,6 +54,7 @@ func main() {
}
cmd := flag.Arg(0)
err = validateFlags(cmd, bitstream, device, port)
if err != nil {
log.Fatalf("Invalid arguments: %+v", err)
@ -79,6 +83,7 @@ func main() {
default:
err = errors.Errorf("unknown command %+v", flag.Args())
}
if err != nil {
log.Fatalf("%+v", err)
}
@ -101,10 +106,12 @@ func validateFlags(cmd, bitstream, device string, port uint) error {
if bitstream == "" {
return errors.Errorf("bitstream filename is missing")
}
if device == "" {
return errors.Errorf("FPGA device name is missing")
}
}
return nil
}
@ -119,33 +126,41 @@ func installBitstream(fname string, dryRun, force, quiet bool) (err error) {
if !quiet {
fmt.Printf("Installing bitstream %q as %q\n", fname, installPath)
if dryRun {
fmt.Println("Dry-run: no copying performed")
return
}
}
err = os.MkdirAll(filepath.Dir(installPath), 0755)
if err != nil {
return errors.Wrap(err, "unable to create destination directory")
}
src, err := os.Open(fname)
if err != nil {
return errors.Wrap(err, "can't open bitstream file")
}
defer src.Close()
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
if !force {
flags = flags | os.O_EXCL
}
dst, err := os.OpenFile(installPath, flags, 0644)
if err != nil {
if os.IsExist(err) {
return errors.Wrapf(err, "destination file %q already exist. Use --force to overwrite it", installPath)
}
return errors.Wrap(err, "can't create destination file")
}
defer dst.Close()
_, err = io.Copy(dst, src)
return err
}
@ -160,13 +175,16 @@ func printBitstreamInfo(fname string, quiet bool) (err error) {
fmt.Printf("Accelerator Type UUID : %q\n", info.AcceleratorTypeUUID())
fmt.Printf("Unique UUID : %q\n", info.UniqueUUID())
fmt.Printf("Installation Path : %q\n", info.InstallPath(fpgaBitStreamDirectory))
extra := info.ExtraMetadata()
if len(extra) > 0 && !quiet {
fmt.Println("Extra:")
for k, v := range extra {
fmt.Printf("\t%s : %q\n", k, v)
}
}
return
}
@ -177,6 +195,7 @@ func fpgaInfo(fname string, quiet bool) error {
case fpga.IsFpgaPort(fname):
return portInfo(fname, quiet)
}
return errors.Errorf("unknown FPGA device file %s", fname)
}
@ -190,6 +209,7 @@ func fmeInfo(fname string, quiet bool) error {
if _, err := fme.GetAPIVersion(); err != nil {
return errors.Wrap(err, "kernel API mismatch")
}
return printFpgaFME(fme, quiet)
}
@ -198,20 +218,26 @@ func printFpgaFME(f fpga.FME, quiet bool) (err error) {
fmt.Printf("Name : %s\n", f.GetName())
fmt.Printf("Device Node : %s\n", f.GetDevPath())
fmt.Printf("SysFS Path : %s\n", f.GetSysFsPath())
pci, err := f.GetPCIDevice()
if err != nil {
return
}
printPCIeInfo(pci, quiet)
fmt.Printf("Interface UUID : %s\n", f.GetInterfaceUUID())
if !quiet {
if apiVer, err := f.GetAPIVersion(); err == nil {
fmt.Printf("Kernet API Version : %d\n", apiVer)
}
fmt.Printf("Ports Num : %d\n", f.GetPortsNum())
if id, err := f.GetSocketID(); err == nil {
fmt.Printf("Socket Id : %d\n", id)
}
fmt.Printf("Bitstream Id : %s\n", f.GetBitstreamID())
fmt.Printf("Bitstream Metadata : %s\n", f.GetBitstreamMetadata())
}
@ -229,9 +255,11 @@ func portReleaseOrAssign(fname string, port uint, release, quiet bool) error {
if _, err := fme.GetAPIVersion(); err != nil {
return errors.Wrap(err, "kernel API mismatch")
}
if release {
return fme.PortRelease(uint32(port))
}
return fme.PortAssign(uint32(port))
}
@ -254,20 +282,26 @@ func printFpgaPort(f fpga.Port, quiet bool) (err error) {
fmt.Printf("Name : %s\n", f.GetName())
fmt.Printf("Device Node : %s\n", f.GetDevPath())
fmt.Printf("SysFS Path : %s\n", f.GetSysFsPath())
pci, err := f.GetPCIDevice()
if err != nil {
return
}
printPCIeInfo(pci, quiet)
fme, err := f.GetFME()
if err != nil {
return
}
fmt.Printf("FME Name : %s\n", fme.GetName())
num, err := f.GetPortID()
if err != nil {
return
}
fmt.Printf("Port Id : %d\n", num)
fmt.Printf("Interface UUID : %s\n", f.GetInterfaceUUID())
fmt.Printf("Accelerator UUID : %s\n", f.GetAcceleratorTypeUUID())
@ -289,6 +323,7 @@ func printFpgaPort(f fpga.Port, quiet bool) (err error) {
}
fmt.Printf("Port Regions : %d\n", pi.Regions)
for idx := uint32(0); idx < pi.Regions; idx++ {
ri, err := f.PortGetRegionInfo(idx)
if err != nil {
@ -303,17 +338,22 @@ func printFpgaPort(f fpga.Port, quiet bool) (err error) {
func printPCIeInfo(pci *fpga.PCIDevice, quiet bool) {
fmt.Printf("PCIe s:b:d:f : %s\n", pci.BDF)
if pci.PhysFn != nil && !quiet {
fmt.Printf("Physical Function PCIe s:b:d:f : %s\n", pci.PhysFn.BDF)
}
fmt.Printf("Device Id : %s:%s\n", pci.Vendor, pci.Device)
if !quiet {
fmt.Printf("Device Class : %s\n", pci.Class)
fmt.Printf("Local CPUs : %s\n", pci.CPUs)
fmt.Printf("NUMA : %s\n", pci.NUMA)
if pci.VFs != "" {
fmt.Printf("SR-IOV Virtual Functions : %s\n", pci.VFs)
}
if pci.TotalVFs != "" {
fmt.Printf("SR-IOV maximum Virtual Functions : %s\n", pci.TotalVFs)
}
@ -341,6 +381,7 @@ func doPR(dev, fname string, dryRun, quiet bool) error {
fmt.Printf("Before: Interface ID: %q AFU ID: %q\n", port.GetInterfaceUUID(), port.GetAcceleratorTypeUUID())
fmt.Printf("Programming %q to port %q: ", fname, dev)
}
err = port.PR(bs, dryRun)
if !quiet {
if err != nil {
@ -348,28 +389,35 @@ func doPR(dev, fname string, dryRun, quiet bool) error {
} else {
fmt.Println("OK")
}
fmt.Printf("After : Interface ID: %q AFU ID: %q\n", port.GetInterfaceUUID(), port.GetAcceleratorTypeUUID())
}
return err
}
// listDevices prints the FME and/or port devices reported by
// fpga.ListFpgaDevices, one entry per line. listFMEs and listPorts select
// which of the two lists are printed; unless quiet is set, each printed
// list is preceded by a line with the number of detected entries.
// It always returns nil.
func listDevices(listFMEs, listPorts, quiet bool) error {
	fmeList, portList := fpga.ListFpgaDevices()
	verbose := !quiet

	if listFMEs {
		if verbose {
			fmt.Printf("Detected FPGA FMEs: %d\n", len(fmeList))
		}

		for _, fme := range fmeList {
			fmt.Println(fme)
		}
	}

	if listPorts {
		if verbose {
			fmt.Printf("Detected FPGA Ports: %d\n", len(portList))
		}

		for _, port := range portList {
			fmt.Println(port)
		}
	}

	return nil
}

View File

@ -111,6 +111,7 @@ func getEnvVarNumber(envVarName string) uint64 {
return val
}
}
return 0
}
@ -157,6 +158,7 @@ func (lm labelMap) addNumericLabel(labelName string, valueToAdd int64) {
if numstr, ok := lm[labelName]; ok {
_, _ = fmt.Sscanf(numstr, "%d", &value)
}
value += valueToAdd
lm[labelName] = strconv.FormatInt(value, 10)
}
@ -205,19 +207,23 @@ scanning:
if !strings.HasPrefix(line, prefix) {
continue
}
fields := strings.Split(line, ": ")
if len(fields) == 2 {
action(fields[1])
} else {
klog.Warningf("invalid '%s' line format: '%s'", file.Name(), line)
}
delete(searchStringActionMap, prefix)
if len(searchStringActionMap) == 0 {
break scanning
}
break
}
}
if gen == "" {
// TODO: drop gen label before engine types
// start to have diverging major gen values
@ -226,6 +232,7 @@ scanning:
} else if media != "" {
gen = media
}
if gen != "" {
// truncate to major value
gen = strings.SplitN(gen, ".", 2)[0]
@ -264,6 +271,7 @@ func (l *labeler) createLabels() error {
l.labels.addNumericLabel(labelNamespace+"memory.max", int64(memoryAmount))
}
gpuCount := len(gpuNameList)
if gpuCount > 0 {
// add gpu list label (example: "card0.card1.card2")

View File

@ -279,11 +279,13 @@ func (tc *testcase) createFiles(t *testing.T, sysfs, root string) {
t.Fatalf("Failed to create fake capability file: %+v", err)
}
}
for _, sysfsdir := range tc.sysfsdirs {
if err := os.MkdirAll(path.Join(sysfs, sysfsdir), 0750); err != nil {
t.Fatalf("Failed to create fake sysfs directory: %+v", err)
}
}
for filename, body := range tc.sysfsfiles {
if err := os.WriteFile(path.Join(sysfs, filename), body, 0600); err != nil {
t.Fatalf("Failed to create fake vendor file: %+v", err)

View File

@ -28,6 +28,7 @@ const (
func main() {
l := newLabeler(sysfsDRMDirectory, debugfsDRIDirectory)
err := l.createLabels()
if err != nil {
klog.Errorf("%+v", err)

View File

@ -69,6 +69,7 @@ func nonePolicy(req *pluginapi.ContainerPreferredAllocationRequest) []string {
deviceIds := req.AvailableDeviceIDs[0:req.AllocationSize]
klog.V(2).Infof("Allocate deviceIds: %q", deviceIds)
return deviceIds
}
@ -93,15 +94,19 @@ func balancedPolicy(req *pluginapi.ContainerPreferredAllocationRequest) []string
Index = append(Index, key)
sort.Strings(Card[key])
}
sort.Strings(Index)
need := req.AllocationSize
var deviceIds []string
// We choose one device ID from the GPU card that has most shared gpu IDs each time.
for {
var allocateCard string
var max int
var (
allocateCard string
max int
)
for _, key := range Index {
if Count[key] > max {
@ -123,6 +128,7 @@ func balancedPolicy(req *pluginapi.ContainerPreferredAllocationRequest) []string
}
klog.V(2).Infof("Allocate deviceIds: %q", deviceIds)
return deviceIds
}
@ -135,6 +141,7 @@ func packedPolicy(req *pluginapi.ContainerPreferredAllocationRequest) []string {
deviceIds = deviceIds[:req.AllocationSize]
klog.V(2).Infof("Allocate deviceIds: %q", deviceIds)
return deviceIds
}
@ -168,6 +175,7 @@ func newDevicePlugin(sysfsDir, devfsDir string, options cliOptions) *devicePlugi
if options.resourceManagement {
var err error
dp.resMan, err = rm.NewResourceManager(monitorID, namespace+"/"+deviceType)
if err != nil {
klog.Errorf("Failed to create resource manager: %+v", err)
@ -199,7 +207,9 @@ func (dp *devicePlugin) GetPreferredAllocation(rqt *pluginapi.PreferredAllocatio
// Add a sanity check here. This should never happen unless an error occurs in the kubelet device plugin manager.
if req.AllocationSize > int32(len(req.AvailableDeviceIDs)) {
klog.V(3).Info("req.AllocationSize must be not greater than len(req.AvailableDeviceIDs).")
var err = errors.Errorf("AllocationSize (%d) is greater then the number of available device IDs (%d)", req.AllocationSize, len(req.AvailableDeviceIDs))
return nil, err
}
@ -211,11 +221,13 @@ func (dp *devicePlugin) GetPreferredAllocation(rqt *pluginapi.PreferredAllocatio
response.ContainerResponses = append(response.ContainerResponses, resp)
}
return response, nil
}
func (dp *devicePlugin) Scan(notifier dpapi.Notifier) error {
defer dp.scanTicker.Stop()
var previouslyFound = -1
for {
@ -245,15 +257,18 @@ func (dp *devicePlugin) isCompatibleDevice(name string) bool {
klog.V(4).Info("Not compatible device: ", name)
return false
}
dat, err := os.ReadFile(path.Join(dp.sysfsDir, name, "device/vendor"))
if err != nil {
klog.Warning("Skipping. Can't read vendor file: ", err)
return false
}
if strings.TrimSpace(string(dat)) != vendorString {
klog.V(4).Info("Non-Intel GPU: ", name)
return false
}
return true
}
@ -264,8 +279,10 @@ func (dp *devicePlugin) scan() (dpapi.DeviceTree, error) {
}
var monitor []pluginapi.DeviceSpec
devTree := dpapi.NewDeviceTree()
rmDevInfos := rm.NewDeviceInfoMap()
for _, f := range files {
var nodes []pluginapi.DeviceSpec
@ -285,6 +302,7 @@ func (dp *devicePlugin) scan() (dpapi.DeviceTree, error) {
//Skipping possible drm control node
continue
}
devPath := path.Join(dp.devfsDir, drmFile.Name())
if _, err := os.Stat(devPath); err != nil {
continue
@ -296,23 +314,29 @@ func (dp *devicePlugin) scan() (dpapi.DeviceTree, error) {
ContainerPath: devPath,
Permissions: "rw",
}
if !isPFwithVFs {
klog.V(4).Infof("Adding %s to GPU %s", devPath, f.Name())
nodes = append(nodes, devSpec)
}
if dp.options.enableMonitoring {
klog.V(4).Infof("Adding %s to GPU %s/%s", devPath, monitorType, monitorID)
monitor = append(monitor, devSpec)
}
}
if len(nodes) > 0 {
deviceInfo := dpapi.NewDeviceInfo(pluginapi.Healthy, nodes, nil, nil)
for i := 0; i < dp.options.sharedDevNum; i++ {
devID := fmt.Sprintf("%s-%d", f.Name(), i)
// Currently only one device type (i915) is supported.
// TODO: check model ID to differentiate device models.
devTree.AddDevice(deviceType, devID, deviceInfo)
rmDevInfos[devID] = rm.NewDeviceInfo(nodes, nil, nil)
}
}
@ -362,6 +386,7 @@ func main() {
klog.Error("invalid value for preferredAllocationPolicy, the valid values: balanced, packed, none")
os.Exit(1)
}
klog.V(1).Infof("GPU device plugin started with %s preferred allocation policy", opts.preferredAllocationPolicy)
plugin := newDevicePlugin(sysfsDrmDirectory, devfsDriDirectory, opts)

View File

@ -62,16 +62,19 @@ func createTestFiles(root string, devfsdirs, sysfsdirs []string, sysfsfiles map[
return "", "", errors.Wrap(err, "Failed to create fake device directory")
}
}
for _, sysfsdir := range sysfsdirs {
if err := os.MkdirAll(path.Join(sysfs, sysfsdir), 0750); err != nil {
return "", "", errors.Wrap(err, "Failed to create fake device directory")
}
}
for filename, body := range sysfsfiles {
if err := os.WriteFile(path.Join(sysfs, filename), body, 0600); err != nil {
return "", "", errors.Wrap(err, "Failed to create fake vendor file")
}
}
return sysfs, devfs, nil
}
@ -79,6 +82,7 @@ func TestNewDevicePlugin(t *testing.T) {
if newDevicePlugin("", "", cliOptions{sharedDevNum: 2, resourceManagement: false}) == nil {
t.Error("Failed to create plugin")
}
if newDevicePlugin("", "", cliOptions{sharedDevNum: 2, resourceManagement: true}) != nil {
t.Error("Unexpectedly managed to create resource management enabled plugin inside unit tests")
}
@ -134,6 +138,7 @@ func TestGetPreferredAllocation(t *testing.T) {
func TestAllocate(t *testing.T) {
plugin := newDevicePlugin("", "", cliOptions{sharedDevNum: 2, resourceManagement: false})
_, err := plugin.Allocate(&v1beta1.AllocateRequest{})
if _, ok := err.(*dpapi.UseDefaultMethodError); !ok {
t.Errorf("Unexpected return value: %+v", err)
@ -141,6 +146,7 @@ func TestAllocate(t *testing.T) {
// mock the rm
plugin.resMan = &mockResourceManager{}
_, err = plugin.Allocate(&v1beta1.AllocateRequest{})
if _, ok := err.(*dpapi.UseDefaultMethodError); !ok {
t.Errorf("Unexpected return value: %+v", err)
@ -266,6 +272,7 @@ func TestScan(t *testing.T) {
if tc.options.sharedDevNum == 0 {
tc.options.sharedDevNum = 1
}
t.Run(tc.name, func(t *testing.T) {
root, err := os.MkdirTemp("", "test_new_device_plugin")
if err != nil {

View File

@ -140,6 +140,7 @@ func (rm *resourceManager) ReallocateWithFractionalResources(request *pluginapi.
if _, ok := err.(*retryErr); ok {
klog.Warning("retrying POD resolving after sleeping")
time.Sleep(retryTimeout)
podCandidate, err = rm.findAllocationPodCandidate()
}
@ -204,29 +205,37 @@ func (rm *resourceManager) findAllocationPodCandidate() (*podCandidate, error) {
case 1:
// perfect, only one option
klog.V(4).Info("only one pending pod")
if _, ok := candidates[0].pod.Annotations[gasCardAnnotation]; !ok {
klog.Warningf("Pending POD annotations from scheduler not yet visible for pod %q", candidates[0].pod.Name)
return nil, &retryErr{}
}
return &candidates[0], nil
default: // > 1 candidates, not good, need to pick the best
// look for scheduler timestamps and sort by them
klog.V(4).Infof("%v pods pending, picking oldest", numCandidates)
timestampedCandidates := []podCandidate{}
for _, candidate := range candidates {
if _, ok := pendingPods[candidate.name].Annotations[gasTSAnnotation]; ok {
timestampedCandidates = append(timestampedCandidates, candidate)
}
}
sort.Slice(timestampedCandidates,
func(i, j int) bool {
return pendingPods[timestampedCandidates[i].name].Annotations[gasTSAnnotation] <
pendingPods[timestampedCandidates[j].name].Annotations[gasTSAnnotation]
})
if len(timestampedCandidates) == 0 {
klog.Warning("Pending POD annotations from scheduler not yet visible")
return nil, &retryErr{}
}
return &timestampedCandidates[0], nil
}
}
@ -252,6 +261,7 @@ func (rm *resourceManager) getNodePendingGPUPods() (map[string]*v1.Pod, error) {
// make a map out of the list, accept only GPU-using pods
pendingPods := make(map[string]*v1.Pod)
for i := range pendingPodList.Items {
pod := &pendingPodList.Items[i]
@ -273,6 +283,7 @@ func (rm *resourceManager) findAllocationPodCandidates(pendingPods map[string]*v
}
defer clientConn.Close()
ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout)
defer cancel()
@ -282,9 +293,11 @@ func (rm *resourceManager) findAllocationPodCandidates(pendingPods map[string]*v
}
candidates := []podCandidate{}
for _, podRes := range resp.PodResources {
// count allocated gpu-using containers
numContainersAllocated := 0
for _, cont := range podRes.Containers {
for _, dev := range cont.Devices {
if dev.ResourceName == rm.fullResourceName {
@ -349,6 +362,7 @@ func (rm *resourceManager) createAllocateResponse(cards []string) (*pluginapi.Al
if cresp.Envs == nil {
cresp.Envs = make(map[string]string)
}
cresp.Envs[key] = value
}
}
@ -360,6 +374,7 @@ func (rm *resourceManager) createAllocateResponse(cards []string) (*pluginapi.Al
func numGPUUsingContainers(pod *v1.Pod, fullResourceName string) int {
num := 0
for _, container := range pod.Spec.Containers {
for reqName, quantity := range container.Resources.Requests {
resourceName := reqName.String()
@ -372,6 +387,7 @@ func numGPUUsingContainers(pod *v1.Pod, fullResourceName string) int {
}
}
}
return num
}
@ -383,6 +399,7 @@ func containerCards(pod *v1.Pod, gpuUsingContainerIndex int) []string {
klog.V(3).Infof("%s:%v", fullAnnotation, cardLists)
i := 0
for _, cardList := range cardLists {
cards := strings.Split(cardList, ",")
if len(cards) > 0 && len(cardList) > 0 {
@ -393,7 +410,9 @@ func containerCards(pod *v1.Pod, gpuUsingContainerIndex int) []string {
i++
}
}
klog.Warningf("couldn't find cards for gpu using container index %v", gpuUsingContainerIndex)
return nil
}

View File

@ -75,6 +75,7 @@ func (w *mockPodResources) List(ctx context.Context,
Name: pod.ObjectMeta.Name, Containers: []*podresourcesv1.ContainerResources{{}},
})
}
return &resp, nil
}
func (w *mockPodResources) GetAllocatableResources(ctx context.Context,
@ -216,6 +217,7 @@ func TestReallocateWithFractionalResources(t *testing.T) {
if (err != nil) && !tCase.expectErr {
t.Errorf("test %v unexpected failure, err:%v", tCase.name, err)
}
if err == nil {
if tCase.expectErr {
t.Errorf("test %v unexpected success", tCase.name)

View File

@ -31,5 +31,6 @@ func GetSriovNumVFs(sysFSPath string) string {
if err != nil {
return "-1"
}
return strings.TrimSpace(string(dat))
}

View File

@ -46,6 +46,7 @@ var (
setupLog = ctrl.Log.WithName("setup")
)
// nolint:wsl
func init() {
// Add schemes for DaemonSets, Pods etc...
_ = clientgoscheme.AddToScheme(scheme)
@ -78,6 +79,7 @@ func (flag *flagList) Set(value string) error {
}
*flag = append(*flag, value)
return nil
}
@ -87,14 +89,17 @@ func contains(arr []string, val string) bool {
return true
}
}
return false
}
func main() {
var metricsAddr string
var devicePluginNamespace string
var enableLeaderElection bool
var pm *patcher.Manager
var (
metricsAddr string
devicePluginNamespace string
enableLeaderElection bool
pm *patcher.Manager
)
ctrl.SetLogger(klogr.New())
@ -179,6 +184,7 @@ func main() {
}
setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)

View File

@ -114,22 +114,28 @@ func (dp *DevicePlugin) getDpdkDevice(vfBdf string) (string, error) {
switch dp.dpdkDriver {
case igbUio:
uioDirPath := filepath.Join(dp.pciDeviceDir, vfBdf, uioSuffix)
files, err := os.ReadDir(uioDirPath)
if err != nil {
return "", err
}
if len(files) == 0 {
return "", errors.New("No devices found")
}
return files[0].Name(), nil
case vfioPci:
vfioDirPath := filepath.Join(dp.pciDeviceDir, vfBdf, iommuGroupSuffix)
group, err := filepath.EvalSymlinks(vfioDirPath)
if err != nil {
return "", errors.WithStack(err)
}
s := filepath.Base(group)
return s, nil
default:
@ -142,6 +148,7 @@ func (dp *DevicePlugin) getDpdkDeviceSpecs(dpdkDeviceName string) []pluginapi.De
case igbUio:
//Setting up with uio
uioDev := filepath.Join(uioDevicePath, dpdkDeviceName)
return []pluginapi.DeviceSpec{
{
HostPath: uioDev,
@ -152,6 +159,7 @@ func (dp *DevicePlugin) getDpdkDeviceSpecs(dpdkDeviceName string) []pluginapi.De
case vfioPci:
//Setting up with vfio
vfioDev := filepath.Join(vfioDevicePath, dpdkDeviceName)
return []pluginapi.DeviceSpec{
{
HostPath: vfioDev,
@ -174,6 +182,7 @@ func (dp *DevicePlugin) getDpdkMounts(dpdkDeviceName string) []pluginapi.Mount {
case igbUio:
//Setting up with uio mountpoints
uioMountPoint := filepath.Join(uioMountPath, dpdkDeviceName, "/device")
return []pluginapi.Mount{
{
HostPath: uioMountPoint,
@ -205,16 +214,19 @@ func (dp *DevicePlugin) bindDevice(vfBdf string) error {
if err := os.WriteFile(unbindDevicePath, []byte(vfBdf), 0600); !os.IsNotExist(err) {
return errors.Wrapf(err, "Unbinding from kernel driver failed for the device %s", vfBdf)
}
vfdevID, err := dp.getDeviceID(vfBdf)
if err != nil {
return err
}
bindDevicePath := filepath.Join(dp.pciDriverDir, dp.dpdkDriver, newIDSuffix)
//Bind to the dpdk driver
err = os.WriteFile(bindDevicePath, []byte(vendorPrefix+vfdevID), 0600)
if err != nil {
return errors.Wrapf(err, "Binding to the DPDK driver failed for the device %s", vfBdf)
}
return nil
}
@ -224,6 +236,7 @@ func isValidKernelDriver(kernelvfDriver string) bool {
return true
}
}
return false
}
@ -232,6 +245,7 @@ func isValidDpdkDeviceDriver(dpdkDriver string) bool {
case igbUio, vfioPci:
return true
}
return false
}
@ -252,14 +266,18 @@ func (dp *DevicePlugin) isValidVfDeviceID(vfDevID string) bool {
// PostAllocate implements PostAllocator interface for vfio based QAT plugin.
//
// For every container response it replaces Envs with a new map holding the
// same values under keys built from envVarPrefix followed by a running index
// (0, 1, 2, ...). Each container gets its own map and its own numbering, so
// the variables of one container never leak into, or overwrite, those of
// another. Map iteration order is unspecified, so which value receives which
// index is not deterministic. The function always returns nil.
func (dp *DevicePlugin) PostAllocate(response *pluginapi.AllocateResponse) error {
	for _, cresp := range response.ContainerResponses {
		// A fresh map per container: sharing a single map across containers
		// would make every response point at the same merged set of variables.
		renamed := make(map[string]string, len(cresp.Envs))
		counter := 0

		for k := range cresp.Envs {
			renamed[strings.Join([]string{envVarPrefix, strconv.Itoa(counter)}, "")] = cresp.Envs[k]
			counter++
		}

		cresp.Envs = renamed
	}

	return nil
}
@ -278,6 +296,7 @@ func getPciDevicesWithPattern(pattern string) (pciDevices []string) {
klog.Warningf("unable to evaluate symlink: %s", devBdf)
continue
}
pciDevices = append(pciDevices, targetDev)
}
@ -316,11 +335,13 @@ func (dp *DevicePlugin) getVfDevices() []string {
// getCurrentDriver resolves the "driver" symlink inside the given device
// directory and returns the base name of its target. When the link cannot be
// resolved it logs that no driver is bound and returns an empty string.
func getCurrentDriver(device string) string {
	resolved, err := filepath.EvalSymlinks(filepath.Join(device, "driver"))
	if err == nil {
		return filepath.Base(resolved)
	}

	klog.Infof("no driver bound to device %q", filepath.Base(device))

	return ""
}
@ -339,8 +360,8 @@ func (dp *DevicePlugin) scan() (dpapi.DeviceTree, error) {
if !dp.isValidVfDeviceID(vfdevID) {
continue
}
n = n + 1
n = n + 1
if n > dp.maxDevices {
break
}

View File

@ -427,6 +427,7 @@ func eleInSlice(a string, list []string) bool {
return true
}
}
return false
}
func TestPostAllocate(t *testing.T) {
@ -452,10 +453,12 @@ func TestPostAllocate(t *testing.T) {
"03:04.3": {},
"03:04.4": {},
}
dp := &DevicePlugin{}
if err := dp.PostAllocate(response); err != nil {
t.Errorf("Unexpected error: %+v", err)
}
if len(response.ContainerResponses[0].Envs) != 4 {
t.Fatal("Set wrong number of Environment Variables")
}

View File

@ -76,19 +76,23 @@ func getDevTree(sysfs string, qatDevs []device, config map[string]section) (dpap
newDeviceSpec("/dev/qat_dev_processes"),
newDeviceSpec("/dev/usdm_drv"),
}
for _, qatDev := range qatDevs {
uiodevs, err := getUIODevices(sysfs, qatDev.devtype, qatDev.bsf)
if err != nil {
return nil, err
}
for _, uiodev := range uiodevs {
devs = append(devs, newDeviceSpec(filepath.Join("/dev/", uiodev)))
}
}
uniqID := 0
for sname, svalue := range config {
devType := fmt.Sprintf("cy%d_dc%d", svalue.cryptoEngines, svalue.compressionEngines)
for _, ep := range svalue.endpoints {
for i := 0; i < ep.processes; i++ {
envs := map[string]string{
@ -137,6 +141,7 @@ func (dp *DevicePlugin) getOnlineDevices(iommuOn bool) ([]device, error) {
}
devices := []device{}
for _, line := range strings.Split(string(outputBytes[:]), "\n") {
matches := adfCtlRegex.FindStringSubmatch(line)
if len(matches) == 0 {
@ -192,6 +197,7 @@ func getUIODevices(sysfs, devtype, bsf string) ([]string, error) {
func (dp *DevicePlugin) parseConfigs(devices []device) (map[string]section, error) {
devNum := 0
drvConfig := make(driverConfig)
for _, dev := range devices {
// Parse the configuration.
config, err := ini.Load(filepath.Join(dp.configDir, fmt.Sprintf("%s_%s.conf", dev.devtype, dev.id)))
@ -204,7 +210,9 @@ func (dp *DevicePlugin) parseConfigs(devices []device) (map[string]section, erro
if section.Name() == "GENERAL" || section.Name() == "KERNEL" || section.Name() == "KERNEL_QAT" || section.Name() == ini.DefaultSection {
continue
}
klog.V(4).Info(section.Name())
if err := drvConfig.update(dev.id, section); err != nil {
return nil, err
}
@ -226,15 +234,19 @@ func (drvConfig driverConfig) update(devID string, iniSection *ini.Section) erro
if err != nil {
return errors.Wrapf(err, "Can't parse NumProcesses in %s", iniSection.Name())
}
cryptoEngines, err := iniSection.Key("NumberCyInstances").Int()
if err != nil {
return errors.Wrapf(err, "Can't parse NumberCyInstances in %s", iniSection.Name())
}
compressionEngines, err := iniSection.Key("NumberDcInstances").Int()
if err != nil {
return errors.Wrapf(err, "Can't parse NumberDcInstances in %s", iniSection.Name())
}
pinned := false
if limitDevAccessKey, err := iniSection.GetKey("LimitDevAccess"); err == nil {
limitDevAccess, err := limitDevAccessKey.Bool()
if err != nil {
@ -251,9 +263,11 @@ func (drvConfig driverConfig) update(devID string, iniSection *ini.Section) erro
if old.pinned != pinned {
return errors.Errorf("Value of LimitDevAccess must be consistent across all devices in %s", iniSection.Name())
}
if !pinned && old.endpoints[0].processes != numProcesses {
return errors.Errorf("For not pinned section \"%s\" NumProcesses must be equal for all devices", iniSection.Name())
}
if old.cryptoEngines != cryptoEngines || old.compressionEngines != compressionEngines {
return errors.Errorf("NumberCyInstances and NumberDcInstances must be consistent across all devices in %s", iniSection.Name())
}
@ -329,15 +343,19 @@ func (dp *DevicePlugin) PostAllocate(response *pluginapi.AllocateResponse) error
envsToDelete := []string{}
envsToAdd := make(map[string]string)
counter := 0
for key, value := range containerResponse.Envs {
if !strings.HasPrefix(key, "QAT_SECTION_NAME_") {
continue
}
parts := strings.Split(key, "_")
if len(parts) != 6 {
return errors.Errorf("Wrong format of env variable name %s", key)
}
prefix := strings.Join(parts[0:5], "_")
envsToDelete = append(envsToDelete, key)
envsToAdd[fmt.Sprintf("%s_%d", prefix, counter)] = value
counter++

View File

@ -225,14 +225,17 @@ func TestParseConfigs(t *testing.T) {
expectedErr: true,
},
}
for _, tt := range tcases {
dp := &DevicePlugin{
configDir: "./test_data/" + tt.testData,
}
_, err := dp.parseConfigs(qatdevs)
if tt.expectedErr && err == nil {
t.Errorf("Test case '%s': expected error hasn't been triggered", tt.name)
}
if !tt.expectedErr && err != nil {
t.Errorf("Test case '%s': Unexpected error: %+v", tt.name, err)
}
@ -303,6 +306,7 @@ func TestGetDevTree(t *testing.T) {
expectedErr: true,
},
}
for _, tt := range tcases {
t.Run(tt.name, func(t *testing.T) {
var err error
@ -385,9 +389,11 @@ func TestPostAllocate(t *testing.T) {
if tc.expectedErr && err == nil {
t.Errorf("Test case '%s': expected error hasn't been triggered", tc.name)
}
if !tc.expectedErr && err != nil {
t.Errorf("Test case '%s': Unexpected error: %+v", tc.name, err)
}
klog.V(4).Info(response)
}
}

View File

@ -32,8 +32,10 @@ const (
)
func main() {
var plugin deviceplugin.Scanner
var err error
var (
plugin deviceplugin.Scanner
err error
)
mode := flag.String("mode", "dpdk", "plugin mode which can be either dpdk (default) or kernel")
@ -50,12 +52,15 @@ func main() {
default:
err = errors.Errorf("Unknown mode: %s", *mode)
}
if err != nil {
fmt.Println(err.Error())
os.Exit(1)
}
klog.V(1).Infof("QAT device plugin started in '%s' mode", *mode)
manager := deviceplugin.NewManager(namespace, plugin)
manager.Run()
}

View File

@ -36,8 +36,11 @@ func init() {
}
func main() {
var metricsAddr string
var enableLeaderElection bool
var (
metricsAddr string
enableLeaderElection bool
)
flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
"Enable leader election for controller manager. "+
@ -64,6 +67,7 @@ func main() {
})
setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)

View File

@ -45,6 +45,7 @@ type patchNodeOp struct {
func main() {
var register, affirm, label, daemon bool
flag.BoolVar(&register, "register", false, "register EPC as extended resource")
flag.BoolVar(&affirm, "affirm", false, "return error if EPC is not available")
flag.BoolVar(&label, "node-label", false, "create node label")
@ -55,11 +56,13 @@ func main() {
// get the EPC size
var epcSize uint64
if cpuid.CPU.SGX.Available {
for _, s := range cpuid.CPU.SGX.EPCSections {
epcSize += s.EPCSize
}
}
klog.Infof("epc capacity: %d bytes", epcSize)
if epcSize == 0 && affirm {
@ -77,7 +80,9 @@ func main() {
if daemon {
klog.Info("waiting for termination signal")
term := make(chan os.Signal, 1)
signal.Notify(term, os.Interrupt, syscall.SIGTERM)
<-term
}
@ -93,6 +98,7 @@ func updateNode(epcSize uint64, register, label bool) error {
Value: epcSize,
})
}
if label && epcSize > 0 {
payload = append(payload, patchNodeOp{
Op: "add",
@ -100,6 +106,7 @@ func updateNode(epcSize uint64, register, label bool) error {
Value: "true",
})
}
if len(payload) == 0 {
return nil
}
@ -129,5 +136,6 @@ func updateNode(epcSize uint64, register, label bool) error {
// patch the node
_, err = clientset.CoreV1().Nodes().Patch(context.TODO(), node.Name, types.JSONPatchType, payloadBytes, metav1.PatchOptions{}, "status")
return err
}

View File

@ -58,10 +58,12 @@ func (dp *devicePlugin) Scan(notifier dpapi.Notifier) error {
if err != nil {
return err
}
notifier.Notify(devTree)
// Wait forever to prevent manager run loop from exiting.
<-dp.scanDone
return nil
}
@ -71,10 +73,12 @@ func (dp *devicePlugin) scan() (dpapi.DeviceTree, error) {
// Assume that both /dev/sgx_enclave and /dev/sgx_provision must be present.
sgxEnclavePath := path.Join(dp.devfsDir, "sgx_enclave")
sgxProvisionPath := path.Join(dp.devfsDir, "sgx_provision")
if _, err := os.Stat(sgxEnclavePath); err != nil {
klog.Error("No SGX enclave file available: ", err)
return devTree, nil
}
if _, err := os.Stat(sgxProvisionPath); err != nil {
klog.Error("No SGX provision file available: ", err)
return devTree, nil
@ -92,11 +96,13 @@ func (dp *devicePlugin) scan() (dpapi.DeviceTree, error) {
nodes := []pluginapi.DeviceSpec{{HostPath: sgxEnclavePath, ContainerPath: sgxEnclavePath, Permissions: "rw"}}
devTree.AddDevice(deviceTypeEnclave, devID, dpapi.NewDeviceInfo(pluginapi.Healthy, nodes, deprecatedMounts, nil))
}
for i := uint(0); i < dp.nProvision; i++ {
devID := fmt.Sprintf("%s-%d", "sgx-provision", i)
nodes := []pluginapi.DeviceSpec{{HostPath: sgxProvisionPath, ContainerPath: sgxProvisionPath, Permissions: "rw"}}
devTree.AddDevice(deviceTypeProvision, devID, dpapi.NewDeviceInfo(pluginapi.Healthy, nodes, deprecatedMounts, nil))
}
return devTree, nil
}
@ -106,7 +112,6 @@ func getDefaultPodCount(nCPUs uint) uint {
// either via "--pods-per-core" or "--max-pods" kubelet options. We get the
// limit by multiplying the number of cores in the system with env variable
// "PODS_PER_CORE".
envPodsPerCore := os.Getenv(podsPerCoreEnvVariable)
if envPodsPerCore != "" {
tmp, err := strconv.ParseUint(envPodsPerCore, 10, 32)
@ -121,8 +126,7 @@ func getDefaultPodCount(nCPUs uint) uint {
}
func main() {
var enclaveLimit uint
var provisionLimit uint
var enclaveLimit, provisionLimit uint
podCount := getDefaultPodCount(uint(runtime.NumCPU()))

View File

@ -98,6 +98,7 @@ func getPciDeviceCounts(sysfsPciDevicesPath string, vendorID string, pidSearch [
}
}
}
return found, nil
}
@ -125,6 +126,7 @@ func newDevicePlugin(deviceCtx interface{}, sharedDevNum int) *devicePlugin {
klog.V(1).Info("The number of containers sharing the same VPU must greater than zero")
return nil
}
return &devicePlugin{
deviceCtx: deviceCtx,
sharedDevNum: sharedDevNum,
@ -135,6 +137,7 @@ func newDevicePlugin(deviceCtx interface{}, sharedDevNum int) *devicePlugin {
func (dp *devicePlugin) Scan(notifier dpapi.Notifier) error {
defer dp.scanTicker.Stop()
for {
devTree, err := dp.scan()
if err != nil {
@ -302,8 +305,7 @@ func (dp *devicePlugin) scan() (dpapi.DeviceTree, error) {
}
func main() {
var sharedDevNum int
var scanMode int
var sharedDevNum, scanMode int
flag.IntVar(&sharedDevNum, "shared-dev-num", 1, "number of containers sharing the same VPU device")
flag.IntVar(&scanMode, "mode", 1, "USB=1 PCI=2")
@ -312,6 +314,7 @@ func main() {
klog.V(1).Info("VPU device plugin started")
var plugin *devicePlugin
if scanMode == 1 {
// add lsusb here
ctx := gousb.NewContext()
@ -322,15 +325,19 @@ func main() {
// gousb (libusb) Debug levels are a 1:1 match to klog levels, just pass through.
ctx.Debug(verbosityLevel)
}
deviceCtxUsb := devicePluginUsb{usbContext: ctx, vendorID: vendorID, productIDs: productIDs}
plugin = newDevicePlugin(deviceCtxUsb, sharedDevNum)
} else if scanMode == 2 {
deviceCtxPci := devicePluginPci{sysfsPciDevicesPath: sysBusPCIDevice, vendorIDPCI: vendorIDIntel, productIDsPCI: productIDsPCI}
plugin = newDevicePlugin(deviceCtxPci, sharedDevNum)
}
if plugin == nil {
klog.Fatal("Cannot create device plugin, please check above error messages.")
}
manager := dpapi.NewManager(namespace, plugin)
manager.Run()
}

View File

@ -38,6 +38,7 @@ type testCase struct {
//OpenDevices tries to inject gousb compatible fake device info.
func (t *testCase) OpenDevices(opener func(desc *gousb.DeviceDesc) bool) ([]*gousb.Device, error) {
var ret []*gousb.Device
for _, p := range t.productIDs {
desc := &gousb.DeviceDesc{
Vendor: gousb.ID(t.vendorID),
@ -48,6 +49,7 @@ func (t *testCase) OpenDevices(opener func(desc *gousb.DeviceDesc) bool) ([]*gou
ret = append(ret, &gousb.Device{Desc: desc})
}
}
return ret, nil
}
@ -64,16 +66,20 @@ func createDevice(pciBusRootDir string, bdf string, vid string, pid string) erro
if err != nil {
return err
}
err = os.WriteFile(filepath.Join(pciBusRootDir, bdf, "device"), pidHex, 0444)
if err != nil {
return err
}
return nil
}
func createTestPCI(folder string, testPCI []PCIPidDeviceType) error {
var busNum = 1
var devNum = 3
//Loop for all supported device type
for _, pciPid := range testPCI {
//Loop for pid number
@ -87,6 +93,7 @@ func createTestPCI(folder string, testPCI []PCIPidDeviceType) error {
}
}
}
return nil
}
@ -115,7 +122,9 @@ func TestScanPci(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpPciDir)
//create supported PCI devices file
if err = createTestPCI(tmpPciDir, productIDsPCI); err != nil {
t.Fatal(err)
@ -128,15 +137,18 @@ func TestScanPci(t *testing.T) {
}
fN.scanDone = testPlugin.scanDone
err = testPlugin.Scan(&fN)
if err != nil {
t.Error("vpu plugin test failed with testPlugin.Scan()")
}
//Loop for all supported PCI device type
for _, pciPid := range productIDsPCI {
if len(fN.tree[pciPid.deviceType]) == 0 {
t.Error("vpu plugin test failed with testPlugin.Scan(): tree len is 0")
}
klog.V(4).Infof("tree len of pci %s is %d", pciPid.deviceType, len(fN.tree[pciPid.deviceType]))
}
@ -150,10 +162,12 @@ func TestScanPci(t *testing.T) {
}
fN.scanDone = testPlugin.scanDone
err = testPlugin.Scan(&fN)
if err != nil {
t.Error("vpu plugin test failed with testPlugin.Scan() in no hddl_service.sock case.")
}
if len(fN.tree[deviceType]) != 0 {
t.Error("vpu plugin test failed with testPlugin.Scan(): tree len should be 0 in no hddl_service.sock case.")
}
@ -186,13 +200,16 @@ func TestScan(t *testing.T) {
}
fN.scanDone = testPlugin.scanDone
err = testPlugin.Scan(&fN)
if err != nil {
t.Error("vpu plugin test failed with testPlugin.Scan()")
}
if len(fN.tree[deviceType]) == 0 {
t.Error("vpu plugin test failed with testPlugin.Scan(): tree len is 0")
}
klog.V(4).Infof("tree len of usb is %d", len(fN.tree[deviceType]))
//remove the hddl_service.sock and test with no hddl socket case
@ -205,10 +222,12 @@ func TestScan(t *testing.T) {
}
fN.scanDone = testPlugin.scanDone
err = testPlugin.Scan(&fN)
if err != nil {
t.Error("vpu plugin test failed with testPlugin.Scan() in no hddl_service.sock case.")
}
if len(fN.tree[deviceType]) != 0 {
t.Error("vpu plugin test failed with testPlugin.Scan(): tree len should be 0 in no hddl_service.sock case.")
}

View File

@ -67,9 +67,11 @@ func SGXPluginDaemonSet() *apps.DaemonSet {
// getDaemonset decodes the given YAML document into a newly allocated
// DaemonSet and returns a pointer to it. It panics if the content cannot be
// unmarshalled.
func getDaemonset(content []byte) *apps.DaemonSet {
	ds := new(apps.DaemonSet)

	if err := yaml.Unmarshal(content, ds); err != nil {
		panic(err)
	}

	return ds
}

View File

@ -29,7 +29,6 @@ const imageMinVersion string = "0.22.0"
func validatePluginImage(image, expectedImageName string, expectedMinVersion *version.Version) error {
// Ignore registry, vendor and extract the image name with the tag
parts := strings.SplitN(filepath.Base(image), ":", 2)
if len(parts) != 2 {
return errors.Errorf("incorrect image field %q", image)

View File

@ -81,6 +81,7 @@ func (c *controller) NewDaemonSet(rawObj client.Object) *apps.DaemonSet {
if len(devicePlugin.Spec.NodeSelector) > 0 {
daemonSet.Spec.Template.Spec.NodeSelector = devicePlugin.Spec.NodeSelector
}
daemonSet.ObjectMeta.Namespace = c.ns
daemonSet.Spec.Template.Spec.Containers[0].Args = getPodArgs(devicePlugin)

View File

@ -128,6 +128,7 @@ func (c *controller) newDaemonSetExpected(rawObj client.Object) *apps.DaemonSet
},
},
}
return &daemonSet
}

View File

@ -80,26 +80,32 @@ func (c *controller) GetTotalObjectCount(ctx context.Context, clnt client.Client
// removeInitContainer strips the plugin's init container (the one named
// inicontainerName) from the DaemonSet pod template, together with the
// "intel-dsa-config-volume" and "sys-devices" volumes. Both lists are
// replaced by new, non-nil slices holding the remaining entries in their
// original order. The dp argument is not used.
func removeInitContainer(ds *apps.DaemonSet, dp *devicepluginv1.DsaDevicePlugin) {
	podSpec := &ds.Spec.Template.Spec

	keptContainers := []v1.Container{}

	for i := range podSpec.InitContainers {
		if podSpec.InitContainers[i].Name != inicontainerName {
			keptContainers = append(keptContainers, podSpec.InitContainers[i])
		}
	}

	podSpec.InitContainers = keptContainers

	keptVolumes := []v1.Volume{}

	for i := range podSpec.Volumes {
		switch podSpec.Volumes[i].Name {
		case "intel-dsa-config-volume", "sys-devices":
			// Dropped along with the init container.
		default:
			keptVolumes = append(keptVolumes, podSpec.Volumes[i])
		}
	}

	podSpec.Volumes = keptVolumes
}
func addInitContainer(ds *apps.DaemonSet, dp *devicepluginv1.DsaDevicePlugin) {
yes := true
ds.Spec.Template.Spec.InitContainers = append(ds.Spec.Template.Spec.InitContainers, v1.Container{
Image: dp.Spec.InitImage,
ImagePullPolicy: "IfNotPresent",
@ -164,6 +170,7 @@ func (c *controller) NewDaemonSet(rawObj client.Object) *apps.DaemonSet {
if len(devicePlugin.Spec.NodeSelector) > 0 {
daemonSet.Spec.Template.Spec.NodeSelector = devicePlugin.Spec.NodeSelector
}
daemonSet.ObjectMeta.Namespace = c.ns
daemonSet.Spec.Template.Spec.Containers[0].Args = getPodArgs(devicePlugin)
daemonSet.Spec.Template.Spec.Containers[0].Image = devicePlugin.Spec.Image
@ -183,6 +190,7 @@ func provisioningUpdate(ds *apps.DaemonSet, dp *devicepluginv1.DsaDevicePlugin)
if container.Name == inicontainerName && container.Image != dp.Spec.InitImage {
found = true
update = true
break
}
}
@ -211,9 +219,11 @@ func (c *controller) UpdateDaemonSet(rawObj client.Object, ds *apps.DaemonSet) (
if provisioningUpdate(ds, dp) {
removeInitContainer(ds, dp)
if dp.Spec.InitImage != "" {
addInitContainer(ds, dp)
}
updated = true
}

View File

@ -81,7 +81,9 @@ func (c *controller) NewDaemonSet(rawObj client.Object) *apps.DaemonSet {
if len(devicePlugin.Spec.NodeSelector) > 0 {
daemonSet.Spec.Template.Spec.NodeSelector = devicePlugin.Spec.NodeSelector
}
daemonSet.ObjectMeta.Namespace = c.ns
daemonSet.Spec.Template.Spec.Containers[0].Args = getPodArgs(devicePlugin)
daemonSet.Spec.Template.Spec.Containers[0].Image = devicePlugin.Spec.Image

View File

@ -36,6 +36,7 @@ func (c *controller) newDaemonSetExpected(rawObj client.Object) *apps.DaemonSet
devicePlugin := rawObj.(*devicepluginv1.FpgaDevicePlugin)
yes := true
directoryOrCreate := v1.HostPathDirectoryOrCreate
return &apps.DaemonSet{
TypeMeta: metav1.TypeMeta{
Kind: "DaemonSet",

View File

@ -88,8 +88,10 @@ func (c *controller) NewServiceAccount(rawObj client.Object) *v1.ServiceAccount
Namespace: c.ns,
},
}
return &sa
}
return nil
}
@ -114,8 +116,10 @@ func (c *controller) NewClusterRoleBinding(rawObj client.Object) *rbacv1.Cluster
APIGroup: "rbac.authorization.k8s.io",
},
}
return &rb
}
return nil
}
@ -126,6 +130,7 @@ func (c *controller) NewDaemonSet(rawObj client.Object) *apps.DaemonSet {
if len(devicePlugin.Spec.NodeSelector) > 0 {
daemonSet.Spec.Template.Spec.NodeSelector = devicePlugin.Spec.NodeSelector
}
daemonSet.ObjectMeta.Namespace = c.ns
daemonSet.Spec.Template.Spec.Containers[0].Args = getPodArgs(devicePlugin)
daemonSet.Spec.Template.Spec.Containers[0].Image = devicePlugin.Spec.Image
@ -200,20 +205,25 @@ func setInitContainer(spec *v1.PodSpec, imageName string) {
// removeVolume returns a new, non-nil slice containing every volume whose
// Name differs from name, in the original order. The input slice is not
// modified.
func removeVolume(volumes []v1.Volume, name string) []v1.Volume {
	kept := []v1.Volume{}

	for i := range volumes {
		if volumes[i].Name == name {
			continue
		}

		kept = append(kept, volumes[i])
	}

	return kept
}
// removeVolumeMount returns a new, non-nil slice containing every volume
// mount whose Name differs from name, in the original order. The input slice
// is not modified.
func removeVolumeMount(volumeMounts []v1.VolumeMount, name string) []v1.VolumeMount {
	kept := []v1.VolumeMount{}

	for i := range volumeMounts {
		if volumeMounts[i].Name == name {
			continue
		}

		kept = append(kept, volumeMounts[i])
	}

	return kept
}
@ -256,6 +266,7 @@ func (c *controller) UpdateDaemonSet(rawObj client.Object, ds *apps.DaemonSet) (
if dp.Spec.ResourceManager {
newServiceAccountName = serviceAccountName
}
if ds.Spec.Template.Spec.ServiceAccountName != newServiceAccountName {
ds.Spec.Template.Spec.ServiceAccountName = newServiceAccountName
if dp.Spec.ResourceManager {
@ -265,6 +276,7 @@ func (c *controller) UpdateDaemonSet(rawObj client.Object, ds *apps.DaemonSet) (
ds.Spec.Template.Spec.Volumes = removeVolume(ds.Spec.Template.Spec.Volumes, "podresources")
ds.Spec.Template.Spec.Containers[0].VolumeMounts = removeVolumeMount(ds.Spec.Template.Spec.Containers[0].VolumeMounts, "podresources")
}
updated = true
}

View File

@ -182,6 +182,7 @@ func TestNewDamonSetGPU(t *testing.T) {
if tc.resourceManager {
plugin.Spec.ResourceManager = true
}
if tc.isInitImage {
plugin.Spec.InitImage = "intel/intel-gpu-initcontainer:devel"
}

View File

@ -82,9 +82,11 @@ func (c *controller) NewDaemonSet(rawObj client.Object) *apps.DaemonSet {
daemonSet := deployments.QATPluginDaemonSet()
daemonSet.Annotations = annotations
daemonSet.Spec.Template.Annotations = annotations
if len(devicePlugin.Spec.NodeSelector) > 0 {
daemonSet.Spec.Template.Spec.NodeSelector = devicePlugin.Spec.NodeSelector
}
daemonSet.ObjectMeta.Namespace = c.ns
daemonSet.Spec.Template.Spec.Containers[0].Args = getPodArgs(devicePlugin)
daemonSet.Spec.Template.Spec.Containers[0].Image = devicePlugin.Spec.Image
@ -172,6 +174,7 @@ func getPodArgs(qdp *devicepluginv1.QatDevicePlugin) []string {
for i, v := range qdp.Spec.KernelVfDrivers {
drvs[i] = string(v)
}
args = append(args, "-kernel-vf-drivers", strings.Join(drvs, ","))
} else {
args = append(args, "-kernel-vf-drivers", "dh895xccvf,c6xxvf,c3xxxvf,d15xxvf")

View File

@ -36,6 +36,7 @@ func (c *controller) newDaemonSetExpected(rawObj client.Object) *apps.DaemonSet
devicePlugin := rawObj.(*devicepluginv1.QatDevicePlugin)
yes := true
pluginAnnotations := devicePlugin.ObjectMeta.DeepCopy().Annotations
return &apps.DaemonSet{
TypeMeta: metav1.TypeMeta{
Kind: "DaemonSet",

View File

@ -203,6 +203,7 @@ func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
log.Error(err, "unable to list child Pods of the controlled daemon set")
return ctrl.Result{}, err
}
nodeNames := make([]string, len(pods.Items))
for i, pod := range pods.Items {
nodeNames[i] = pod.Spec.NodeName
@ -213,6 +214,7 @@ func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
if err != nil {
return ctrl.Result{}, err
}
if statusUpdated {
if err := r.Status().Update(ctx, devicePlugin); apierrors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
@ -357,6 +359,7 @@ func (r *reconciler) updateBookKeeper(ctx context.Context) error {
}
bKeeper.set(r.pluginKind, count)
return nil
}
@ -426,10 +429,12 @@ func (r *reconciler) maybeDeleteDaemonSets(ctx context.Context, err error, daemo
}
log.V(1).Info("deleted DaemonSets owned by deleted custom device plugin object")
return ctrl.Result{}, nil
}
log.Error(err, "unable to fetch custom device plugin object")
return ctrl.Result{}, err
}
@ -437,6 +442,7 @@ func (r *reconciler) maybeDeleteRedundantDaemonSets(ctx context.Context, dsets [
count := len(dsets)
if count > 1 {
log.V(0).Info("there are redundant DaemonSets", "redundantDS", count-1)
redundantSets := dsets[1:]
for i := range redundantSets {
if err := r.Delete(ctx, &redundantSets[i], client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {

View File

@ -120,7 +120,9 @@ func (c *controller) NewDaemonSet(rawObj client.Object) *apps.DaemonSet {
if len(devicePlugin.Spec.NodeSelector) > 0 {
daemonSet.Spec.Template.Spec.NodeSelector = devicePlugin.Spec.NodeSelector
}
daemonSet.ObjectMeta.Namespace = c.ns
daemonSet.Spec.Template.Spec.Containers[0].Args = getPodArgs(devicePlugin)
daemonSet.Spec.Template.Spec.Containers[0].Image = devicePlugin.Spec.Image

View File

@ -141,6 +141,7 @@ func (c *controller) newDaemonSetExpected(rawObj client.Object) *apps.DaemonSet
if devicePlugin.Spec.InitImage != "" {
setInitContainer(&daemonSet.Spec.Template.Spec, devicePlugin.Spec.InitImage)
}
return &daemonSet
}

View File

@ -51,7 +51,9 @@ func NewDeviceInfo(state string, nodes []pluginapi.DeviceSpec, mounts []pluginap
mounts: mounts,
envs: envs,
}
devPaths := []string{}
for _, node := range nodes {
devPaths = append(devPaths, node.HostPath)
}
@ -91,6 +93,7 @@ func (tree DeviceTree) AddDevice(devType, id string, info DeviceInfo) {
if _, present := tree[devType]; !present {
tree[devType] = make(map[string]DeviceInfo)
}
tree[devType][id] = info
}

View File

@ -55,6 +55,7 @@ func (n *notifier) Notify(newDeviceTree DeviceTree) {
if !reflect.DeepEqual(old, new) {
updated[devType] = new
}
delete(n.deviceTree, devType)
} else {
added[devType] = new
@ -101,6 +102,7 @@ func (m *Manager) Run() {
klog.Errorf("Device scan failed: %+v", err)
os.Exit(1)
}
close(updatesCh)
}()
@ -111,11 +113,14 @@ func (m *Manager) Run() {
func (m *Manager) handleUpdate(update updateInfo) {
klog.V(4).Info("Received dev updates:", update)
for devType, devices := range update.Added {
var allocate allocateFunc
var postAllocate postAllocateFunc
var preStartContainer preStartContainerFunc
var getPreferredAllocation getPreferredAllocationFunc
var (
allocate allocateFunc
postAllocate postAllocateFunc
preStartContainer preStartContainerFunc
getPreferredAllocation getPreferredAllocationFunc
)
if postAllocator, ok := m.devicePlugin.(PostAllocator); ok {
postAllocate = postAllocator.PostAllocate
@ -134,6 +139,7 @@ func (m *Manager) handleUpdate(update updateInfo) {
}
m.servers[devType] = m.createServer(devType, postAllocate, preStartContainer, getPreferredAllocation, allocate)
go func(dt string) {
err := m.servers[dt].Serve(m.namespace)
if err != nil {
@ -143,13 +149,16 @@ func (m *Manager) handleUpdate(update updateInfo) {
}(devType)
m.servers[devType].Update(devices)
}
for devType, devices := range update.Updated {
m.servers[devType].Update(devices)
}
for devType := range update.Removed {
if err := m.servers[devType].Stop(); err != nil {
klog.Errorf("Unable to stop gRPC server for %q: %+v", devType, err)
}
delete(m.servers, devType)
}
}

View File

@ -115,6 +115,7 @@ func TestNotify(t *testing.T) {
n.Notify(tcase.newmap)
var update updateInfo
select {
case update = <-ch:
default:
@ -123,9 +124,11 @@ func TestNotify(t *testing.T) {
if tcase.expectedAdded != len(update.Added) {
t.Errorf("Test case '%s': expected %d added device types, but got %d", tcase.name, tcase.expectedAdded, len(update.Added))
}
if tcase.expectedUpdated != len(update.Updated) {
t.Errorf("Test case '%s': expected %d updated device types, but got %d", tcase.name, tcase.expectedUpdated, len(update.Updated))
}
if tcase.expectedRemoved != len(update.Removed) {
t.Errorf("Test case '%s': expected %d removed device types, but got %d", tcase.name, tcase.expectedUpdated, len(update.Updated))
}
@ -153,6 +156,7 @@ func (*devicePluginStub) Scan(n Notifier) error {
nodes: make([]pluginapi.DeviceSpec, 0),
})
n.Notify(tree)
return nil
}
@ -266,6 +270,7 @@ func TestHandleUpdate(t *testing.T) {
if tt.servers == nil {
tt.servers = make(map[string]devicePluginServer)
}
mgr := Manager{
devicePlugin: &devicePluginStub{},
servers: tt.servers,
@ -273,7 +278,9 @@ func TestHandleUpdate(t *testing.T) {
return &serverStub{}
},
}
mgr.handleUpdate(tt.update)
if len(tt.servers) != tt.expectedServers {
t.Errorf("Test case '%s': expected %d runnig device managers, but got %d",
tt.name, tt.expectedServers, len(tt.servers))

View File

@ -101,7 +101,9 @@ func (srv *server) sendDevices(stream pluginapi.DevicePlugin_ListAndWatchServer)
Topology: device.topology,
})
}
klog.V(4).Info("Sending to kubelet", resp.Devices)
if err := stream.Send(resp); err != nil {
_ = srv.Stop()
return errors.Wrapf(err, "Cannot update device list")
@ -136,29 +138,37 @@ func (srv *server) Allocate(ctx context.Context, rqt *pluginapi.AllocateRequest)
}
response := new(pluginapi.AllocateResponse)
for _, crqt := range rqt.ContainerRequests {
cresp := new(pluginapi.ContainerAllocateResponse)
for _, id := range crqt.DevicesIDs {
dev, ok := srv.devices[id]
if !ok {
return nil, errors.Errorf("Invalid allocation request with non-existing device %s", id)
}
if dev.state != pluginapi.Healthy {
return nil, errors.Errorf("Invalid allocation request with unhealthy device %s", id)
}
for i := range dev.nodes {
cresp.Devices = append(cresp.Devices, &dev.nodes[i])
}
for i := range dev.mounts {
cresp.Mounts = append(cresp.Mounts, &dev.mounts[i])
}
for key, value := range dev.envs {
if cresp.Envs == nil {
cresp.Envs = make(map[string]string)
}
cresp.Envs[key] = value
}
}
response.ContainerResponses = append(response.ContainerResponses, cresp)
}
@ -168,6 +178,7 @@ func (srv *server) Allocate(ctx context.Context, rqt *pluginapi.AllocateRequest)
return nil, err
}
}
return response, nil
}
@ -183,6 +194,7 @@ func (srv *server) GetPreferredAllocation(ctx context.Context, rqt *pluginapi.Pr
if srv.getPreferredAllocation != nil {
return srv.getPreferredAllocation(rqt)
}
return nil, errors.New("GetPreferredAllocation should not be called as this device plugin doesn't implement it")
}
@ -196,9 +208,11 @@ func (srv *server) Stop() error {
if srv.grpcServer == nil {
return errors.New("Can't stop non-existing gRPC server. Calling Stop() before Serve()?")
}
srv.setState(terminating)
srv.grpcServer.Stop()
close(srv.updatesCh)
return nil
}
@ -216,6 +230,7 @@ func (srv *server) setState(state serverState) {
// getState returns the server's current state, read while stateMutex is held.
func (srv *server) getState() serverState {
	srv.stateMutex.Lock()
	current := srv.state
	srv.stateMutex.Unlock()

	return current
}
@ -246,6 +261,7 @@ func (srv *server) setupAndServe(namespace string, devicePluginPath string, kube
// Starts device plugin service.
go func() {
klog.V(1).Infof("Start server for %s at: %s", srv.devType, pluginSocket)
if serveErr := srv.grpcServer.Serve(lis); serveErr != nil {
klog.Errorf("unable to start gRPC server: %+v", serveErr)
}
@ -261,6 +277,7 @@ func (srv *server) setupAndServe(namespace string, devicePluginPath string, kube
if err != nil {
return err
}
klog.V(1).Infof("Device plugin for %s registered", srv.devType)
// Kubelet removes plugin socket when it (re)starts
@ -306,6 +323,7 @@ func watchFile(file string) error {
func (srv *server) registerWithKubelet(kubeletSocket, pluginEndPoint, resourceName string) error {
ctx := context.Background()
conn, err := grpc.DialContext(ctx, kubeletSocket, grpc.WithInsecure(),
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, "unix", addr)
@ -313,7 +331,9 @@ func (srv *server) registerWithKubelet(kubeletSocket, pluginEndPoint, resourceNa
if err != nil {
return errors.Wrap(err, "Cannot connect to kubelet service")
}
defer conn.Close()
client := pluginapi.NewRegistrationClient(conn)
reqt := &pluginapi.RegisterRequest{
Version: pluginapi.Version,
@ -334,7 +354,9 @@ func (srv *server) registerWithKubelet(kubeletSocket, pluginEndPoint, resourceNa
// by making grpc blocking connection to the server socket.
func waitForServer(socket string, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
conn, err := grpc.DialContext(ctx, socket, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, "unix", addr)
@ -343,5 +365,6 @@ func waitForServer(socket string, timeout time.Duration) error {
if conn != nil {
_ = conn.Close()
}
return errors.Wrapf(err, "Failed dial context at %s", socket)
}

View File

@ -88,11 +88,13 @@ func (k *kubeletStub) Register(ctx context.Context, r *pluginapi.RegisterRequest
k.Lock()
defer k.Unlock()
k.pluginEndpoint = r.Endpoint
return &pluginapi.Empty{}, nil
}
func (k *kubeletStub) start() error {
_ = os.Remove(k.socket)
s, err := net.Listen("unix", k.socket)
if err != nil {
return errors.Wrap(err, "Can't listen at the socket")
@ -101,6 +103,7 @@ func (k *kubeletStub) start() error {
k.server = grpc.NewServer()
pluginapi.RegisterRegistrationServer(k.server, k)
go maybeLogError(func() error { return k.server.Serve(s) }, "unable to start server")
// Wait till the grpcServer is ready to serve services.
@ -118,10 +121,12 @@ func TestRegisterWithKublet(t *testing.T) {
}
kubelet := newKubeletStub(kubeletSocket)
err = kubelet.start()
if err != nil {
t.Fatalf("%+v", err)
}
defer kubelet.server.Stop()
err = srv.registerWithKubelet(kubeletSocket, pluginSocket, resourceName)
@ -131,18 +136,22 @@ func TestRegisterWithKublet(t *testing.T) {
}
func TestSetupAndServe(t *testing.T) {
var pluginSocket string
var pEndpoint string
var (
pluginSocket string
pEndpoint string
)
kubelet := newKubeletStub(kubeletSocket)
if err := kubelet.start(); err != nil {
t.Fatalf("unable to start kubelet stub: %+v", err)
}
defer kubelet.server.Stop()
srv := newTestServer()
defer maybeLogError(srv.Stop, "unable to stop server")
go maybeLogError(func() error {
return srv.setupAndServe(namespace, devicePluginPath, kubeletSocket)
}, "unable to start server")
@ -152,12 +161,14 @@ func TestSetupAndServe(t *testing.T) {
kubelet.Lock()
pEndpoint = kubelet.pluginEndpoint
kubelet.Unlock()
pluginSocket = path.Join(devicePluginPath, pEndpoint)
if pEndpoint != "" {
if _, err := os.Stat(pluginSocket); err == nil {
break
}
}
time.Sleep(1 * time.Second)
}
@ -167,6 +178,7 @@ func TestSetupAndServe(t *testing.T) {
}
ctx := context.Background()
conn, err := grpc.DialContext(ctx, pluginSocket, grpc.WithInsecure(),
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, "unix", addr)
@ -183,36 +195,45 @@ func TestSetupAndServe(t *testing.T) {
},
},
})
if err != nil {
t.Errorf("Failed to allocate device dev1: %+v", err)
}
_ = conn.Close()
// Check if the plugin re-registers after its socket has been removed
kubelet.Lock()
pEndpoint = kubelet.pluginEndpoint
kubelet.Unlock()
if pEndpoint == "" {
t.Fatal("After successful Allocate() pluginEndpoint is empty")
}
_ = os.Remove(path.Join(devicePluginPath, pEndpoint))
for {
kubelet.Lock()
pEndpoint = kubelet.pluginEndpoint
kubelet.Unlock()
pluginSocket = path.Join(devicePluginPath, pEndpoint)
if pEndpoint != "" {
if _, err = os.Stat(pluginSocket); err == nil {
break
}
}
klog.V(1).Info("No plugin socket. Waiting...")
time.Sleep(1 * time.Second)
}
conn, err = grpc.DialContext(ctx, pluginSocket, grpc.WithInsecure(),
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, "unix", addr)
}))
if err != nil {
t.Fatalf("Failed to get connection: %+v", err)
}
@ -225,9 +246,11 @@ func TestSetupAndServe(t *testing.T) {
},
},
})
if err != nil {
t.Errorf("Failed to allocate device dev1: %+v", err)
}
_ = conn.Close()
}
@ -358,13 +381,16 @@ func TestAllocate(t *testing.T) {
t.Errorf("Test case '%s': no error returned", tt.name)
continue
}
if !tt.expectedErr && err != nil {
t.Errorf("Test case '%s': got unexpected error %+v", tt.name, err)
continue
}
if tt.expectedAllocated > 0 && len(resp.ContainerResponses[0].Devices) != tt.expectedAllocated {
t.Errorf("Test case '%s': allocated wrong number of devices", tt.name)
}
if tt.expectedAllocated > 1 {
if reflect.DeepEqual(resp.ContainerResponses[0].Devices[0], resp.ContainerResponses[0].Devices[1]) {
t.Errorf("Test case '%s': got equal dev nodes in the same response", tt.name)
@ -390,6 +416,7 @@ func (s *listAndWatchServerStub) Send(resp *pluginapi.ListAndWatchResponse) erro
klog.V(4).Info("listAndWatchServerStub::Send", resp.Devices)
s.cdata <- resp.Devices
return nil
}
@ -481,12 +508,14 @@ func TestListAndWatch(t *testing.T) {
for _, update := range tt.updates {
devCh <- update
}
close(devCh)
err := testServer.ListAndWatch(&pluginapi.Empty{}, server)
if err != nil && tt.errorOnCall == 0 {
t.Errorf("Test case '%s': got unexpected error %+v", tt.name, err)
}
if err == nil && tt.errorOnCall != 0 {
t.Errorf("Test case '%s': expected an error, but got nothing", tt.name)
}

View File

@ -59,12 +59,16 @@ func OpenAOCX(name string) (*FileAOCX, error) {
if err != nil {
return nil, errors.WithStack(err)
}
ff, err := NewFileAOCX(f)
if err != nil {
f.Close()
return nil, err
}
ff.closer = f
return ff, nil
}
@ -76,6 +80,7 @@ func (f *FileAOCX) Close() (err error) {
err = f.closer.Close()
f.closer = nil
}
return
}
@ -86,10 +91,12 @@ func setSection(f *FileAOCX, section *elf.Section) error {
if err != nil {
return errors.Wrap(err, "unable to read .acl.fpga.bin")
}
f.GBS, err = parseFpgaBin(data)
if err != nil {
return errors.Wrap(err, "unable to parse gbs")
}
return nil
}
@ -113,8 +120,10 @@ func setSection(f *FileAOCX, section *elf.Section) error {
if err != nil {
return errors.Wrapf(err, "%s: unable to get section data", name)
}
*field = strings.TrimSpace(string(data))
}
return nil
}
@ -125,6 +134,7 @@ func NewFileAOCX(r io.ReaderAt) (*FileAOCX, error) {
if err != nil {
return nil, errors.Wrap(err, "unable to read header")
}
f := new(FileAOCX)
for _, section := range el.Sections {
err = setSection(f, section)
@ -132,6 +142,7 @@ func NewFileAOCX(r io.ReaderAt) (*FileAOCX, error) {
return nil, err
}
}
return f, nil
}
@ -140,26 +151,32 @@ func parseFpgaBin(d []byte) (*FileGBS, error) {
if err != nil {
return nil, errors.Wrap(err, "unable to open file")
}
gz := gb.Section(".acl.gbs.gz")
if gz == nil {
return nil, errors.New("no .acl.gbs.gz section in .acl.fgpa.bin")
}
gzr, err := gzip.NewReader(gz.Open())
if err != nil {
return nil, errors.Wrap(err, "unable to open gzip reader for .acl.gbs.gz")
}
b, err := io.ReadAll(gzr)
if err != nil {
return nil, errors.Wrap(err, "unable to uncompress .acl.gbs.gz")
}
g, err := NewFileGBS(bytes.NewReader(b))
if err != nil {
return nil, err
}
if afuUUID := g.AcceleratorTypeUUID(); afuUUID != OpenCLUUID {
g.Close()
return nil, errors.Errorf("incorrect OpenCL BSP AFU UUID (%s)", afuUUID)
}
return g, nil
}
@ -168,6 +185,7 @@ func (f *FileAOCX) RawBitstreamReader() io.ReadSeeker {
if f.GBS != nil {
return f.GBS.Bitstream.Open()
}
return nil
}
@ -176,6 +194,7 @@ func (f *FileAOCX) RawBitstreamData() ([]byte, error) {
if f.GBS != nil {
return f.GBS.Bitstream.Data()
}
return nil, errors.Errorf("GBS section not found")
}
@ -190,6 +209,7 @@ func (f *FileAOCX) InterfaceUUID() (ret string) {
if f.GBS != nil {
ret = f.GBS.InterfaceUUID()
}
return
}
@ -198,6 +218,7 @@ func (f *FileAOCX) AcceleratorTypeUUID() (ret string) {
if f.GBS != nil {
ret = f.GBS.AcceleratorTypeUUID()
}
return
}
@ -205,9 +226,11 @@ func (f *FileAOCX) AcceleratorTypeUUID() (ret string) {
func (f *FileAOCX) InstallPath(root string) (ret string) {
	// The path is <root>/<interface UUID>/<unique UUID><AOCX extension>.
	// If either UUID is empty there is nothing sensible to build, so the
	// empty string is returned.
	ifaceUUID, uniqueUUID := f.InterfaceUUID(), f.UniqueUUID()
	if ifaceUUID == "" || uniqueUUID == "" {
		return ""
	}

	return filepath.Join(root, ifaceUUID, uniqueUUID+fileExtensionAOCX)
}

View File

@ -33,12 +33,14 @@ func GetFPGABitstream(bitstreamDir, region, afu string) (File, error) {
if os.IsNotExist(err) {
continue
}
if err != nil {
return nil, errors.Errorf("%s: stat error: %v", bitstreamPath, err)
}
return Open(bitstreamPath)
}
return nil, errors.Errorf("%s/%s: bitstream not found", region, afu)
}
@ -50,5 +52,6 @@ func Open(fname string) (File, error) {
case ".aocx":
return OpenAOCX(fname)
}
return nil, errors.Errorf("unsupported file format %s", fname)
}

View File

@ -99,6 +99,7 @@ func (b *Bitstream) Open() io.ReadSeeker { return io.NewSectionReader(b.sr, 0, 1
func (b *Bitstream) Data() ([]byte, error) {
	// Allocate a buffer of b.Size bytes and fill it from a fresh reader
	// returned by Open. Only the bytes actually read are returned, together
	// with whatever error io.ReadFull reported.
	buf := make([]byte, b.Size)
	read, err := io.ReadFull(b.Open(), buf)

	return buf[:read], err
}
@ -108,12 +109,15 @@ func OpenGBS(name string) (*FileGBS, error) {
if err != nil {
return nil, errors.WithStack(err)
}
ff, err := NewFileGBS(f)
if err != nil {
_ = f.Close()
return nil, err
}
ff.closer = f
return ff, nil
}
@ -125,6 +129,7 @@ func (f *FileGBS) Close() (err error) {
err = f.closer.Close()
f.closer = nil
}
return
}
@ -139,6 +144,7 @@ func (f *FileGBS) AcceleratorTypeUUID() (ret string) {
if len(f.Metadata.AfuImage.AcceleratorClusters) == 1 {
ret = strings.ToLower(strings.Replace(f.Metadata.AfuImage.AcceleratorClusters[0].AcceleratorTypeUUID, "-", "", -1))
}
return
}
@ -158,6 +164,7 @@ func NewFileGBS(r bitstreamReader) (*FileGBS, error) {
if _, err := sr.Seek(0, io.SeekStart); err != nil {
return nil, errors.Wrap(err, "unable to seek")
}
if err := binary.Read(sr, binary.LittleEndian, &f.Header); err != nil {
return nil, errors.Wrap(err, "unable to read header")
}
@ -169,25 +176,30 @@ func NewFileGBS(r bitstreamReader) (*FileGBS, error) {
if f.MetadataLength == 0 || f.MetadataLength >= 4096 {
return nil, errors.Errorf("incorrect length of GBS metadata %d", f.MetadataLength)
}
dec := json.NewDecoder(io.NewSectionReader(r, fileHeaderLength, int64(f.MetadataLength)))
if err := dec.Decode(&f.Metadata); err != nil {
return nil, errors.Wrap(err, "unable to parse GBS metadata")
}
if afus := len(f.Metadata.AfuImage.AcceleratorClusters); afus != 1 {
return nil, errors.Errorf("incorrect length of AcceleratorClusters in GBS metadata: %d", afus)
}
// 4. Create bitstream struct
b := new(Bitstream)
// 4.1. calculate offset/size
last, err := r.Seek(0, io.SeekEnd)
if err != nil {
return nil, errors.Wrap(err, "unable to determine file size")
}
b.Size = uint64(last - fileHeaderLength - int64(f.MetadataLength))
// 4.2. assign internal sr
b.sr = io.NewSectionReader(r, int64(fileHeaderLength+f.MetadataLength), int64(b.Size))
b.ReaderAt = b.sr
f.Bitstream = b
return f, nil
}
@ -213,9 +225,11 @@ func (f *FileGBS) UniqueUUID() string {
func (f *FileGBS) InstallPath(root string) (ret string) {
	// The path is <root>/<interface UUID>/<unique UUID><GBS extension>.
	// If either UUID is empty the empty string is returned instead.
	ifaceUUID, uniqueUUID := f.InterfaceUUID(), f.UniqueUUID()
	if ifaceUUID == "" || uniqueUUID == "" {
		return ""
	}

	return filepath.Join(root, ifaceUUID, uniqueUUID+fileExtensionGBS)
}

View File

@ -51,6 +51,7 @@ func TestOpenGBS(t *testing.T) {
func TestFileGBSMethods(t *testing.T) {
interfaceUUID := "69528db6eb31577a8c3668f9faa081f6"
typeUUID := "d8424dc4a4a3c413f89e433683f9040b"
gbs, err := OpenGBS(filepath.Join("testdata/intel.com/fpga", interfaceUUID, typeUUID) + ".gbs")
if err != nil {
t.Errorf("unexpected open error: %+v", err)

View File

@ -59,9 +59,11 @@ func NewDflFME(dev string) (FME, error) {
if err := checkPCIDeviceType(fme); err != nil {
return nil, err
}
if err := fme.updateProperties(); err != nil {
return nil, err
}
return fme, nil
}
@ -95,9 +97,11 @@ func NewDflPort(dev string) (Port, error) {
if err := checkPCIDeviceType(port); err != nil {
return nil, err
}
if err := port.updateProperties(); err != nil {
return nil, err
}
return port, nil
}
@ -146,11 +150,13 @@ func (f *DflPort) CheckExtension() (int, error) {
// from the status of FME's fpga manager.
func (f *DflFME) PortPR(port uint32, bitstream []byte) error {
	// Taking &bitstream[0] of an empty (or nil) slice panics with an index
	// out of range, so reject such input with an error before touching it.
	if len(bitstream) == 0 {
		return errors.New("unable to reconfigure port: bitstream is empty")
	}

	// Buffer_size is filled from a uint32 conversion; refuse lengths that
	// would be silently truncated by that conversion.
	if uint64(len(bitstream)) > math.MaxUint32 {
		return errors.New("unable to reconfigure port: bitstream is too large")
	}

	var value DflFpgaFmePortPR

	// Argsz carries the size of the argument structure itself.
	value.Argsz = uint32(unsafe.Sizeof(value))
	value.Port_id = port
	value.Buffer_size = uint32(len(bitstream))
	// The ioctl argument stores the address of the first bitstream byte as
	// a plain integer.
	value.Buffer_address = uint64(uintptr(unsafe.Pointer(&bitstream[0])))

	_, err := ioctlDev(f.DevPath, DFL_FPGA_FME_PORT_PR, uintptr(unsafe.Pointer(&value)))

	return err
}
@ -159,6 +165,7 @@ func (f *DflFME) PortPR(port uint32, bitstream []byte) error {
func (f *DflFME) PortRelease(port uint32) error {
	// The ioctl receives a pointer to a local copy of the port number.
	portID := port

	if _, err := ioctlDev(f.DevPath, DFL_FPGA_FME_PORT_RELEASE, uintptr(unsafe.Pointer(&portID))); err != nil {
		return err
	}

	return nil
}
@ -167,6 +174,7 @@ func (f *DflFME) PortRelease(port uint32) error {
func (f *DflFME) PortAssign(port uint32) error {
	// The ioctl receives a pointer to a local copy of the port number.
	portID := port

	if _, err := ioctlDev(f.DevPath, DFL_FPGA_FME_PORT_ASSIGN, uintptr(unsafe.Pointer(&portID))); err != nil {
		return err
	}

	return nil
}
@ -180,11 +188,14 @@ func (f *DflFME) GetSysFsPath() string {
if f.SysFsPath != "" {
return f.SysFsPath
}
sysfs, err := FindSysFsDevice(f.DevPath)
if err != nil {
return ""
}
f.SysFsPath = sysfs
return f.SysFsPath
}
@ -193,7 +204,9 @@ func (f *DflFME) GetName() string {
if f.Name != "" {
return f.Name
}
f.Name = filepath.Base(f.GetSysFsPath())
return f.Name
}
@ -202,11 +215,14 @@ func (f *DflFME) GetPCIDevice() (*PCIDevice, error) {
if f.PCIDevice != nil {
return f.PCIDevice, nil
}
pci, err := NewPCIDevice(f.GetSysFsPath())
if err != nil {
return nil, err
}
f.PCIDevice = pci
return f.PCIDevice, nil
}
@ -218,10 +234,12 @@ func (f *DflFME) GetPortsNum() int {
return -1
}
}
n, err := strconv.ParseUint(f.PortsNum, 10, 32)
if err != nil {
return -1
}
return int(n)
}
@ -233,6 +251,7 @@ func (f *DflFME) GetInterfaceUUID() (id string) {
return ""
}
}
return f.CompatID
}
@ -241,7 +260,9 @@ func (f *DflFME) GetSocketID() (uint32, error) {
if f.SocketID == "" {
return math.MaxUint32, errors.Errorf("n/a")
}
id, err := strconv.ParseUint(f.SocketID, 10, 32)
return uint32(id), err
}
@ -261,6 +282,7 @@ func (f *DflFME) updateProperties() error {
if err != nil {
return err
}
fileMap := map[string]*string{
"bitstream_id": &f.BitstreamID,
"bitstream_metadata": &f.BitstreamMetadata,
@ -269,6 +291,7 @@ func (f *DflFME) updateProperties() error {
"socket_id": &f.SocketID,
"dfl-fme-region.*/fpga_region/region*/compat_id": &f.CompatID,
}
return readFilesInDirectory(fileMap, filepath.Join(pci.SysFsPath, dflFpgaFmeGlobPCI))
}
@ -289,13 +312,16 @@ func (f *DflPort) PortReset() error {
// * Return: 0 on success, -errno on failure.
func (f *DflPort) PortGetInfo() (ret PortInfo, err error) {
	var info DflFpgaPortInfo

	// Argsz carries the size of the argument structure itself.
	info.Argsz = uint32(unsafe.Sizeof(info))

	// On failure the zero-valued result is returned together with the error.
	if _, err = ioctlDev(f.DevPath, DFL_FPGA_PORT_GET_INFO, uintptr(unsafe.Pointer(&info))); err != nil {
		return ret, err
	}

	// Copy the fields filled in by the ioctl into the generic PortInfo.
	ret.Flags = info.Flags
	ret.Regions = info.Regions
	ret.Umsgs = info.Umsgs

	return ret, nil
}
@ -306,8 +332,10 @@ func (f *DflPort) PortGetInfo() (ret PortInfo, err error) {
// * Return: 0 on success, -errno on failure.
func (f *DflPort) PortGetRegionInfo(index uint32) (ret PortRegionInfo, err error) {
var value DflFpgaPortRegionInfo
value.Argsz = uint32(unsafe.Sizeof(value))
value.Index = index
_, err = ioctlDev(f.DevPath, DFL_FPGA_PORT_GET_REGION_INFO, uintptr(unsafe.Pointer(&value)))
if err == nil {
ret.Flags = value.Flags
@ -315,6 +343,7 @@ func (f *DflPort) PortGetRegionInfo(index uint32) (ret PortRegionInfo, err error
ret.Offset = value.Offset
ret.Size = value.Size
}
return
}
@ -328,11 +357,14 @@ func (f *DflPort) GetSysFsPath() string {
if f.SysFsPath != "" {
return f.SysFsPath
}
sysfs, err := FindSysFsDevice(f.DevPath)
if err != nil {
return ""
}
f.SysFsPath = sysfs
return f.SysFsPath
}
@ -341,7 +373,9 @@ func (f *DflPort) GetName() string {
if f.Name != "" {
return f.Name
}
f.Name = filepath.Base(f.GetSysFsPath())
return f.Name
}
@ -350,11 +384,14 @@ func (f *DflPort) GetPCIDevice() (*PCIDevice, error) {
if f.PCIDevice != nil {
return f.PCIDevice, nil
}
pci, err := NewPCIDevice(f.GetSysFsPath())
if err != nil {
return nil, err
}
f.PCIDevice = pci
return f.PCIDevice, nil
}
@ -363,31 +400,39 @@ func (f *DflPort) GetFME() (fme FME, err error) {
if f.FME != nil {
return f.FME, nil
}
pci, err := f.GetPCIDevice()
if err != nil {
return
}
if pci.PhysFn != nil {
pci = pci.PhysFn
}
var dev string
fileMap := map[string]*string{
"dev": &dev,
}
if err = readFilesInDirectory(fileMap, filepath.Join(pci.SysFsPath, dflFpgaFmeGlobPCI)); err != nil {
return
}
realDev, err := filepath.EvalSymlinks(filepath.Join("/dev/char", dev))
if err != nil {
return
}
fme, err = NewDflFME(realDev)
if err != nil {
return
}
f.FME = fme
return
return fme, err
}
// GetPortID returns ID of the FPGA port within physical device.
@ -398,7 +443,9 @@ func (f *DflPort) GetPortID() (uint32, error) {
return math.MaxUint32, err
}
}
id, err := strconv.ParseUint(f.ID, 10, 32)
return uint32(id), err
}
@ -408,6 +455,7 @@ func (f *DflPort) GetAcceleratorTypeUUID() (afuID string) {
if err != nil || f.AFUID == "" {
return ""
}
return f.AFUID
}
@ -418,6 +466,7 @@ func (f *DflPort) GetInterfaceUUID() (id string) {
return ""
}
defer fme.Close()
return fme.GetInterfaceUUID()
}
@ -433,5 +482,6 @@ func (f *DflPort) updateProperties() error {
"dev": &f.Dev,
"id": &f.ID,
}
return readFilesInDirectory(fileMap, f.GetSysFsPath())
}

View File

@ -46,13 +46,16 @@ func NewPort(fname string) (Port, error) {
if strings.IndexByte(fname, byte('/')) < 0 {
fname = filepath.Join("/dev", fname)
}
devName := cleanBasename(fname)
switch {
case strings.HasPrefix(devName, dflFpgaPortPrefix):
return NewDflPort(fname)
case strings.HasPrefix(devName, intelFpgaPortPrefix):
return NewIntelFpgaPort(fname)
}
return nil, errors.Errorf("unknown type of FPGA port %s", fname)
}
@ -61,13 +64,16 @@ func NewFME(fname string) (FME, error) {
if strings.IndexByte(fname, byte('/')) < 0 {
fname = filepath.Join("/dev", fname)
}
devName := cleanBasename(fname)
switch {
case strings.HasPrefix(devName, dflFpgaFmePrefix):
return NewDflFME(fname)
case strings.HasPrefix(devName, intelFpgaFmePrefix):
return NewIntelFpgaFME(fname)
}
return nil, errors.Errorf("unknown type of FPGA FME %s", fname)
}
@ -77,8 +83,10 @@ func ListFpgaDevices() (FMEs, Ports []string) {
if err != nil {
return
}
for _, file := range files {
fname := file.Name()
switch {
case IsFpgaFME(fname):
FMEs = append(FMEs, fname)
@ -86,6 +94,7 @@ func ListFpgaDevices() (FMEs, Ports []string) {
Ports = append(Ports, fname)
}
}
return
}
@ -94,21 +103,27 @@ func genericPortPR(f Port, bs bitstream.File, dryRun bool) error {
if err != nil {
return err
}
ifID := fme.GetInterfaceUUID()
bsID := bs.InterfaceUUID()
if ifID != bsID {
return errors.Errorf("FME interface UUID %q is not compatible with bitstream UUID %q ", ifID, bsID)
}
pNum, err := f.GetPortID()
if err != nil {
return err
}
rawBistream, err := bs.RawBitstreamData()
if err != nil {
return err
}
if dryRun {
return nil
}
return fme.PortPR(pNum, rawBistream)
}

View File

@ -56,9 +56,11 @@ func NewIntelFpgaFME(dev string) (FME, error) {
if err := checkPCIDeviceType(fme); err != nil {
return nil, err
}
if err := fme.updateProperties(); err != nil {
return nil, err
}
return fme, nil
}
@ -80,6 +82,7 @@ func (f *IntelFpgaPort) Close() error {
if f.FME != nil {
defer f.FME.Close()
}
return nil
}
@ -90,10 +93,12 @@ func NewIntelFpgaPort(dev string) (Port, error) {
port.Close()
return nil, err
}
if err := port.updateProperties(); err != nil {
port.Close()
return nil, err
}
return port, nil
}
@ -141,11 +146,14 @@ func (f *IntelFpgaPort) CheckExtension() (int, error) {
// from the status of FME's fpga manager.
func (f *IntelFpgaFME) PortPR(port uint32, bitstream []byte) error {
	// Taking &bitstream[0] of an empty (or nil) slice panics with an index
	// out of range, so reject such input with an error before touching it.
	if len(bitstream) == 0 {
		return errors.New("unable to reconfigure port: bitstream is empty")
	}

	// Buffer_size is filled from a uint32 conversion; refuse lengths that
	// would be silently truncated by that conversion.
	if uint64(len(bitstream)) > math.MaxUint32 {
		return errors.New("unable to reconfigure port: bitstream is too large")
	}

	var value IntelFpgaFmePortPR

	// Argsz carries the size of the argument structure itself.
	value.Argsz = uint32(unsafe.Sizeof(value))
	value.Port_id = port
	value.Buffer_size = uint32(len(bitstream))
	// The ioctl argument stores the address of the first bitstream byte as
	// a plain integer.
	value.Buffer_address = uint64(uintptr(unsafe.Pointer(&bitstream[0])))

	_, err := ioctlDev(f.DevPath, FPGA_FME_PORT_PR, uintptr(unsafe.Pointer(&value)))

	return err
}
@ -153,9 +161,12 @@ func (f *IntelFpgaFME) PortPR(port uint32, bitstream []byte) error {
// * Return: 0 on success, -errno on failure.
func (f *IntelFpgaFME) PortRelease(port uint32) error {
	var req IntelFpgaFmePortRelease

	// Fill in the port to release and the size of the argument structure.
	req.Id = port
	req.Argsz = uint32(unsafe.Sizeof(req))

	if _, err := ioctlDev(f.DevPath, FPGA_FME_PORT_RELEASE, uintptr(unsafe.Pointer(&req))); err != nil {
		return err
	}

	return nil
}
@ -163,9 +174,12 @@ func (f *IntelFpgaFME) PortRelease(port uint32) error {
// * Return: 0 on success, -errno on failure.
func (f *IntelFpgaFME) PortAssign(port uint32) error {
	var req IntelFpgaFmePortAssign

	// Fill in the port to assign and the size of the argument structure.
	req.Id = port
	req.Argsz = uint32(unsafe.Sizeof(req))

	if _, err := ioctlDev(f.DevPath, FPGA_FME_PORT_ASSIGN, uintptr(unsafe.Pointer(&req))); err != nil {
		return err
	}

	return nil
}
@ -179,11 +193,14 @@ func (f *IntelFpgaFME) GetSysFsPath() string {
if f.SysFsPath != "" {
return f.SysFsPath
}
sysfs, err := FindSysFsDevice(f.DevPath)
if err != nil {
return ""
}
f.SysFsPath = sysfs
return f.SysFsPath
}
@ -192,7 +209,9 @@ func (f *IntelFpgaFME) GetName() string {
if f.Name != "" {
return f.Name
}
f.Name = filepath.Base(f.GetSysFsPath())
return f.Name
}
@ -201,11 +220,14 @@ func (f *IntelFpgaFME) GetPCIDevice() (*PCIDevice, error) {
if f.PCIDevice != nil {
return f.PCIDevice, nil
}
pci, err := NewPCIDevice(f.GetSysFsPath())
if err != nil {
return nil, err
}
f.PCIDevice = pci
return f.PCIDevice, nil
}
@ -217,10 +239,12 @@ func (f *IntelFpgaFME) GetPortsNum() int {
return -1
}
}
n, err := strconv.ParseUint(f.PortsNum, 10, 32)
if err != nil {
return -1
}
return int(n)
}
@ -232,6 +256,7 @@ func (f *IntelFpgaFME) GetInterfaceUUID() (id string) {
return ""
}
}
return f.CompatID
}
@ -240,7 +265,9 @@ func (f *IntelFpgaFME) GetSocketID() (uint32, error) {
if f.SocketID == "" {
return math.MaxUint32, errors.Errorf("n/a")
}
id, err := strconv.ParseUint(f.SocketID, 10, 32)
return uint32(id), err
}
@ -260,6 +287,7 @@ func (f *IntelFpgaFME) updateProperties() error {
if err != nil {
return err
}
fileMap := map[string]*string{
"bitstream_id": &f.BitstreamID,
"bitstream_metadata": &f.BitstreamMetadata,
@ -268,6 +296,7 @@ func (f *IntelFpgaFME) updateProperties() error {
"socket_id": &f.SocketID,
"pr/interface_id": &f.CompatID,
}
return readFilesInDirectory(fileMap, filepath.Join(pci.SysFsPath, intelFpgaFmeGlobPCI))
}
@ -288,13 +317,16 @@ func (f *IntelFpgaPort) PortReset() error {
// * Return: 0 on success, -errno on failure.
func (f *IntelFpgaPort) PortGetInfo() (ret PortInfo, err error) {
	var info IntelFpgaPortInfo

	// Argsz carries the size of the argument structure itself.
	info.Argsz = uint32(unsafe.Sizeof(info))

	// On failure the zero-valued result is returned together with the error.
	if _, err = ioctlDev(f.DevPath, FPGA_PORT_GET_INFO, uintptr(unsafe.Pointer(&info))); err != nil {
		return ret, err
	}

	// Copy the fields filled in by the ioctl into the generic PortInfo.
	ret.Flags = info.Flags
	ret.Regions = info.Regions
	ret.Umsgs = info.Umsgs

	return ret, nil
}
@ -305,8 +337,10 @@ func (f *IntelFpgaPort) PortGetInfo() (ret PortInfo, err error) {
// * Return: 0 on success, -errno on failure.
func (f *IntelFpgaPort) PortGetRegionInfo(index uint32) (ret PortRegionInfo, err error) {
var value IntelFpgaPortRegionInfo
value.Argsz = uint32(unsafe.Sizeof(value))
value.Index = index
_, err = ioctlDev(f.DevPath, FPGA_PORT_GET_REGION_INFO, uintptr(unsafe.Pointer(&value)))
if err == nil {
ret.Flags = value.Flags
@ -314,6 +348,7 @@ func (f *IntelFpgaPort) PortGetRegionInfo(index uint32) (ret PortRegionInfo, err
ret.Offset = value.Offset
ret.Size = value.Size
}
return
}
@ -327,11 +362,14 @@ func (f *IntelFpgaPort) GetSysFsPath() string {
if f.SysFsPath != "" {
return f.SysFsPath
}
sysfs, err := FindSysFsDevice(f.DevPath)
if err != nil {
return ""
}
f.SysFsPath = sysfs
return f.SysFsPath
}
@ -340,7 +378,9 @@ func (f *IntelFpgaPort) GetName() string {
if f.Name != "" {
return f.Name
}
f.Name = filepath.Base(f.GetSysFsPath())
return f.Name
}
@ -349,11 +389,14 @@ func (f *IntelFpgaPort) GetPCIDevice() (*PCIDevice, error) {
if f.PCIDevice != nil {
return f.PCIDevice, nil
}
pci, err := NewPCIDevice(f.GetSysFsPath())
if err != nil {
return nil, err
}
f.PCIDevice = pci
return f.PCIDevice, nil
}
@ -362,31 +405,38 @@ func (f *IntelFpgaPort) GetFME() (fme FME, err error) {
if f.FME != nil {
return f.FME, nil
}
pci, err := f.GetPCIDevice()
if err != nil {
return
}
if pci.PhysFn != nil {
pci = pci.PhysFn
}
var dev string
fileMap := map[string]*string{
"dev": &dev,
}
if err = readFilesInDirectory(fileMap, filepath.Join(pci.SysFsPath, intelFpgaFmeGlobPCI)); err != nil {
return
}
realDev, err := filepath.EvalSymlinks(filepath.Join("/dev/char", dev))
if err != nil {
return
}
fme, err = NewIntelFpgaFME(realDev)
if err != nil {
return
}
f.FME = fme
return
return fme, err
}
// GetPortID returns ID of the FPGA port within physical device.
@ -397,7 +447,9 @@ func (f *IntelFpgaPort) GetPortID() (uint32, error) {
return math.MaxUint32, err
}
}
id, err := strconv.ParseUint(f.ID, 10, 32)
return uint32(id), err
}
@ -407,6 +459,7 @@ func (f *IntelFpgaPort) GetAcceleratorTypeUUID() string {
if err != nil || f.AFUID == "" {
return ""
}
return f.AFUID
}
@ -417,6 +470,7 @@ func (f *IntelFpgaPort) GetInterfaceUUID() (id string) {
return ""
}
defer fme.Close()
return fme.GetInterfaceUUID()
}
@ -432,5 +486,6 @@ func (f *IntelFpgaPort) updateProperties() error {
"dev": &f.Dev,
"id": &f.ID,
}
return readFilesInDirectory(fileMap, f.GetSysFsPath())
}

View File

@ -27,6 +27,7 @@ func ioctl(fd uintptr, req uint, arg uintptr) (uintptr, error) {
if err != 0 {
return ret, err
}
return ret, nil
}
@ -37,5 +38,6 @@ func ioctlDev(dev string, req uint, arg uintptr) (ret uintptr, err error) {
return
}
defer f.Close()
return ioctl(f.Fd(), req, arg)
}

View File

@ -54,22 +54,29 @@ type PCIDevice struct {
// NewPCIDevice returns sysfs entry for specified PCI device.
func NewPCIDevice(devPath string) (*PCIDevice, error) {
realDevPath, err := filepath.EvalSymlinks(devPath)
if err != nil {
return nil, errors.Wrapf(err, "failed get realpath for %s", devPath)
}
pci := new(PCIDevice)
for p := realDevPath; strings.HasPrefix(p, "/sys/devices/pci"); p = filepath.Dir(p) {
subs := pciAddressRE.FindStringSubmatch(filepath.Base(p))
if subs == nil || len(subs) != 5 {
continue
}
pci.SysFsPath = p
pci.BDF = subs[0]
break
}
if pci.SysFsPath == "" || pci.BDF == "" {
return nil, errors.Errorf("can't find PCI device address for sysfs entry %s", realDevPath)
}
fileMap := map[string]*string{
"vendor": &pci.Vendor,
"device": &pci.Device,
@ -79,18 +86,23 @@ func NewPCIDevice(devPath string) (*PCIDevice, error) {
"sriov_numvfs": &pci.VFs,
"sriov_totalvfs": &pci.TotalVFs,
}
if err = readFilesInDirectory(fileMap, pci.SysFsPath); err != nil {
return nil, err
}
if pci.Vendor == "" || pci.Device == "" {
return nil, errors.Errorf("%s vendor or device id can't be empty (%q/%q)", pci.SysFsPath, pci.Vendor, pci.Device)
}
if physFn, err := NewPCIDevice(filepath.Join(pci.SysFsPath, "physfn")); err == nil {
pci.PhysFn = physFn
}
if driver, err := filepath.EvalSymlinks(filepath.Join(pci.SysFsPath, "driver")); err == nil {
pci.Driver = filepath.Base(driver)
}
return pci, nil
}
@ -99,6 +111,7 @@ func (pci *PCIDevice) NumVFs() int64 {
if numvfs, err := strconv.ParseInt(pci.VFs, 10, 32); err == nil {
return numvfs
}
return -1
}
@ -111,9 +124,11 @@ func (pci *PCIDevice) GetVFs() (ret []*PCIDevice, err error) {
if er != nil {
return nil, er
}
ret = append(ret, vf)
}
}
return
}
@ -125,13 +140,16 @@ func FindSysFsDevice(dev string) (string, error) {
if os.IsNotExist(err) {
return "", nil
}
return "", errors.Wrapf(err, "unable to get stat for %s", dev)
}
devType := "block"
rdev := fi.Sys().(*syscall.Stat_t).Dev
if mode := fi.Mode(); mode&os.ModeDevice != 0 {
rdev = fi.Sys().(*syscall.Stat_t).Rdev
if mode&os.ModeCharDevice != 0 {
devType = "char"
}
@ -139,13 +157,17 @@ func FindSysFsDevice(dev string) (string, error) {
major := unix.Major(rdev)
minor := unix.Minor(rdev)
if major == 0 {
return "", errors.Errorf("%s is a virtual device node", dev)
}
devPath := fmt.Sprintf("/sys/dev/%s/%d:%d", devType, major, minor)
realDevPath, err := filepath.EvalSymlinks(devPath)
if err != nil {
return "", errors.Wrapf(err, "failed get realpath for %s", devPath)
}
return realDevPath, nil
}

View File

@ -29,6 +29,7 @@ func readFilesInDirectory(fileMap map[string]*string, dir string) error {
if strings.ContainsAny(fname, "?*[") {
// The path contains wildcards, so use Glob to find the needed file.
files, err := filepath.Glob(fname)
switch {
case err != nil:
continue
@ -36,17 +37,22 @@ func readFilesInDirectory(fileMap map[string]*string, dir string) error {
// doesn't match unique file, skip it
continue
}
fname = files[0]
}
b, err := os.ReadFile(fname)
if err != nil {
if os.IsNotExist(err) {
continue
}
return errors.Wrapf(err, "%s: unable to read file %q", dir, k)
}
*v = strings.TrimSpace(string(b))
}
return nil
}
@ -56,6 +62,7 @@ func cleanBasename(name string) string {
if err != nil {
realPath = name
}
return filepath.Base(realPath)
}
@ -69,5 +76,6 @@ func checkPCIDeviceType(dev commonFpgaAPI) error {
if pci.Class != fpgaClass {
return errors.Errorf("unsupported PCI class device %s VID=%s PID=%s Class=%s", pci.BDF, pci.Vendor, pci.Device, pci.Class)
}
return nil
}

View File

@ -44,19 +44,23 @@ func (r *AcceleratorFunctionReconciler) Reconcile(ctx context.Context, req ctrl.
log := log.FromContext(ctx).WithValues("af", req.NamespacedName)
p := r.PatcherManager.GetPatcher(req.NamespacedName.Namespace)
var af fpgav2.AcceleratorFunction
if err := r.Get(ctx, req.NamespacedName, &af); err != nil {
if apierrors.IsNotFound(err) {
p.RemoveAf(req.NamespacedName.Name)
log.V(4).Info("removed from patcher")
return ctrl.Result{}, nil
}
log.Error(err, "unable to fetch AcceleratorFunction object")
return ctrl.Result{}, err
}
log.V(4).Info("received", "AcceleratorFunction", af)
return ctrl.Result{}, p.AddAf(&af)
}
@ -79,20 +83,24 @@ func (r *FpgaRegionReconciler) Reconcile(ctx context.Context, req ctrl.Request)
log := log.FromContext(ctx).WithValues("fpgaregion", req.NamespacedName)
p := r.PatcherManager.GetPatcher(req.NamespacedName.Namespace)
var region fpgav2.FpgaRegion
if err := r.Get(ctx, req.NamespacedName, &region); err != nil {
if apierrors.IsNotFound(err) {
p.RemoveRegion(req.NamespacedName.Name)
log.V(4).Info("removed from patcher")
return ctrl.Result{}, nil
}
log.Error(err, "unable to fetch FpgaRegion object")
return ctrl.Result{}, err
}
log.V(4).Info("received", "FpgaRegion", region)
p.AddRegion(&region)
return ctrl.Result{}, nil
}

View File

@ -38,6 +38,7 @@ var (
// init prepares shared state for the tests in this package: it installs
// klogr.New() as the logger via ctrl.SetLogger and passes the shared scheme
// to fpgav2.AddToScheme.
func init() {
ctrl.SetLogger(klogr.New())
// The error returned by AddToScheme is deliberately discarded.
// NOTE(review): presumably registration cannot fail here — confirm.
_ = fpgav2.AddToScheme(scheme)
}
@ -83,6 +84,7 @@ func TestAcceleratorFunctionReconcile(t *testing.T) {
func TestAcceleratorFunctionSetupWithManager(t *testing.T) {
r := &AcceleratorFunctionReconciler{}
err := r.SetupWithManager(&mockManager{
scheme: scheme,
log: logger,
@ -134,6 +136,7 @@ func TestFpgaRegionReconcile(t *testing.T) {
func TestFpgaRegionSetupWithManager(t *testing.T) {
r := &FpgaRegionReconciler{}
err := r.SetupWithManager(&mockManager{
scheme: scheme,
log: logger,

View File

@ -95,6 +95,7 @@ func (p *patcher) AddAf(accfunc *fpgav2.AcceleratorFunction) error {
p.Lock()
p.afMap[namespace+"/"+accfunc.Name] = accfunc
if accfunc.Spec.Mode == af {
devtype, err := fpga.GetAfuDevType(accfunc.Spec.InterfaceID, accfunc.Spec.AfuID)
if err != nil {
@ -105,6 +106,7 @@ func (p *patcher) AddAf(accfunc *fpgav2.AcceleratorFunction) error {
} else {
p.resourceMap[namespace+"/"+accfunc.Name] = rfc6901Escaper.Replace(namespace + "/region-" + accfunc.Spec.InterfaceID)
}
p.resourceModeMap[namespace+"/"+accfunc.Name] = accfunc.Spec.Mode
return nil
@ -141,6 +143,7 @@ func validateContainer(container corev1.Container) error {
return errors.Errorf("environment variable '%s' is not allowed", v.Name)
}
}
return nil
}
@ -162,6 +165,7 @@ func (p *patcher) getPatchOps(containerIdx int, container corev1.Container) ([]s
envVars := make(map[string]string)
counter := 0
ops := make([]string, 0, 2*len(requestedResources))
for rname, quantity := range requestedResources {
mode, found := p.resourceModeMap[rname]
if !found {
@ -179,6 +183,7 @@ func (p *patcher) getPatchOps(containerIdx int, container corev1.Container) ([]s
// The requested resources are exposed by FPGA plugins working in "region" mode.
for i := int64(0); i < quantity; i++ {
counter++
envVars[fmt.Sprintf("FPGA_REGION_%d", counter)] = p.afMap[rname].Spec.InterfaceID
envVars[fmt.Sprintf("FPGA_AFU_%d", counter)] = p.afMap[rname].Spec.AfuID
}
@ -187,6 +192,7 @@ func (p *patcher) getPatchOps(containerIdx int, container corev1.Container) ([]s
err := errors.Errorf("%q is registered with unknown mode %q instead of %q or %q",
rname, p.resourceModeMap[rname], af, region)
p.log.Error(err, "unable to construct patching operations")
return nil, err
}
@ -217,6 +223,7 @@ func (p *patcher) getPatchOps(containerIdx int, container corev1.Container) ([]s
for _, envvar := range container.Env {
envVars[envvar.Name] = envvar.Value
}
data := struct {
EnvVars map[string]string
ContainerIdx int
@ -224,11 +231,14 @@ func (p *patcher) getPatchOps(containerIdx int, container corev1.Container) ([]s
ContainerIdx: containerIdx,
EnvVars: envVars,
}
t := template.Must(template.New("add_operation").Parse(envAddOpTpl))
buf := new(bytes.Buffer)
if err := t.Execute(buf, data); err != nil {
return nil, errors.Wrap(err, "unable to execute template")
}
ops = append(ops, buf.String())
}

View File

@ -63,24 +63,29 @@ func TestPatcherStorageFunctions(t *testing.T) {
if err := p.AddAf(goodAf); err != nil {
t.Error("unexpected error")
}
if len(p.resourceModeMap) != 1 || len(p.afMap) != 1 || len(p.resourceMap) != 1 {
t.Error("Failed to add AF to patcher")
}
if err := p.AddAf(brokenAf); err == nil {
t.Error("AddAf() must fail")
}
p.RemoveAf(goodAf.Name)
if len(p.resourceModeMap) != 0 || len(p.afMap) != 0 || len(p.resourceMap) != 0 {
t.Error("Failed to remove AF from patcher")
}
p.AddRegion(region)
if len(p.resourceModeMap) != 1 || len(p.resourceMap) != 1 {
t.Error("Failed to add fpga region to patcher")
}
p.RemoveRegion(region.Name)
if len(p.resourceModeMap) != 0 || len(p.resourceMap) != 0 {
t.Error("Failed to remove fpga region from patcher")
}

View File

@ -78,11 +78,13 @@ func (pm *Manager) mutate(ctx context.Context, req webhook.AdmissionRequest) web
if req.Resource != podResource {
err := errors.Errorf("unexpected resource type %q", req.Resource)
pm.log.Error(err, "unable to mutate")
return toAdmissionResponse(err)
}
raw := req.Object.Raw
pod := corev1.Pod{}
deserializer := codecs.UniversalDeserializer()
if _, _, err := deserializer.Decode(raw, nil, &pod); err != nil {
pm.log.Error(err, "unable to decode")
@ -93,10 +95,12 @@ func (pm *Manager) mutate(ctx context.Context, req webhook.AdmissionRequest) web
if namespace == "" && req.Namespace != "" {
namespace = req.Namespace
}
name := pod.Name
if name == "" && pod.ObjectMeta.GenerateName != "" {
name = pod.ObjectMeta.GenerateName
}
pm.log.V(1).Info("Received pod", "Pod", name, "Namespace", namespace)
patcher := pm.GetPatcher(namespace)
@ -105,11 +109,13 @@ func (pm *Manager) mutate(ctx context.Context, req webhook.AdmissionRequest) web
}
ops := []string{}
for containerIdx, container := range pod.Spec.Containers {
patchOps, err := patcher.getPatchOps(containerIdx, container)
if err != nil {
return toAdmissionResponse(err)
}
ops = append(ops, patchOps...)
}
@ -118,6 +124,7 @@ func (pm *Manager) mutate(ctx context.Context, req webhook.AdmissionRequest) web
pt := admissionv1.PatchTypeJSONPatch
reviewResponse.PatchType = &pt
}
return webhook.AdmissionResponse{
AdmissionResponse: reviewResponse,
}

View File

@ -51,6 +51,7 @@ func TestGetPatcher(t *testing.T) {
pm: &Manager{patchers: map[string]*patcher{namespace: newPatcher(log)}},
},
}
for _, tt := range tcases {
t.Run(tt.name, func(t *testing.T) {
p := tt.pm.GetPatcher(namespace)
@ -102,10 +103,12 @@ func TestMutate(t *testing.T) {
},
},
}
podRaw, err := json.Marshal(pod)
if err != nil {
t.Fatal(err)
}
brokenPodRaw, err := json.Marshal(brokenPod)
if err != nil {
t.Fatal(err)

View File

@ -69,6 +69,7 @@ func NewDevicePlugin(sysfsDir, statePattern, devDir string, sharedDevNum int) *D
// Scan discovers devices and reports them to the upper level API.
func (dp *DevicePlugin) Scan(notifier dpapi.Notifier) error {
defer dp.scanTicker.Stop()
for {
devTree, err := dp.scan()
if err != nil {
@ -90,6 +91,7 @@ func readFile(fpath string) (string, error) {
if err != nil {
return "", errors.WithStack(err)
}
return strings.TrimSpace(string(data)), nil
}
@ -97,6 +99,7 @@ func readFile(fpath string) (string, error) {
func getDevNodes(devDir, charDevDir, wqName string) ([]pluginapi.DeviceSpec, error) {
// check if /dev/dsa/<work queue> device node exists
devPath := path.Join(devDir, wqName)
stat, err := os.Stat(devPath)
if err != nil {
return nil, errors.WithStack(err)
@ -110,10 +113,12 @@ func getDevNodes(devDir, charDevDir, wqName string) ([]pluginapi.DeviceSpec, err
// as libaccel-config requires it
rdev := stat.Sys().(*syscall.Stat_t).Rdev
charDevPath := path.Join(charDevDir, fmt.Sprintf("%d:%d", unix.Major(rdev), unix.Minor(rdev)))
stat, err = os.Lstat(charDevPath)
if err != nil {
return nil, errors.WithStack(err)
}
if stat.Mode()&os.ModeSymlink == 0 {
return nil, errors.Errorf("%s is not a symlink", charDevPath)
}
@ -122,6 +127,7 @@ func getDevNodes(devDir, charDevDir, wqName string) ([]pluginapi.DeviceSpec, err
if err != nil {
return nil, errors.WithStack(err)
}
if destPath != devPath {
return nil, errors.Errorf("%s points to %s instead of device node %s", charDevPath, destPath, devPath)
}
@ -164,6 +170,7 @@ func (dp *DevicePlugin) scan() (dpapi.DeviceTree, error) {
// Read queue mode
queueDir := filepath.Dir(fpath)
wqMode, err := readFile(path.Join(queueDir, "mode"))
if err != nil {
return nil, err
@ -189,7 +196,9 @@ func (dp *DevicePlugin) scan() (dpapi.DeviceTree, error) {
if wqMode != "shared" {
amount = 1
}
klog.V(4).Infof("%s: amount: %d, type: %s, mode: %s, nodes: %+v", wqName, amount, wqType, wqMode, devNodes)
for i := 0; i < amount; i++ {
deviceType := fmt.Sprintf("wq-%s-%s", wqType, wqMode)
deviceID := fmt.Sprintf("%s-%s-%d", deviceType, wqName, i)

View File

@ -36,8 +36,9 @@ const (
func getFakeDevNodes(devDir, charDevDir, wqName string) ([]pluginapi.DeviceSpec, error) {
devPath := path.Join(devDir, wqName)
var devNum int
var queueNum int
var devNum, queueNum int
fmt.Sscanf(wqName, "wq%d.%d", &devNum, &queueNum)
charDevPath := path.Join(charDevDir, fmt.Sprintf("%d:%d", dsaMajor, devNum*10+queueNum))
@ -212,6 +213,7 @@ func genTest(sysfs, statePattern string, tc testCase) func(t *testing.T) {
t.Fatalf("Failed to create fake sysfs directory: %+v", err)
}
}
for filename, body := range tc.sysfsFiles {
if err := os.WriteFile(path.Join(sysfs, filename), body, 0600); err != nil {
t.Fatalf("Failed to create fake sysfs entry: %+v", err)
@ -229,9 +231,11 @@ func genTest(sysfs, statePattern string, tc testCase) func(t *testing.T) {
if !tc.expectedError && err != nil {
t.Errorf("unexpected error: %+v", err)
}
if tc.expectedError && err == nil {
t.Errorf("unexpected success")
}
if err := checkDeviceTree(notifier.deviceTree, tc.expectedResult, tc.expectedError); err != nil {
t.Error(err)
}
@ -246,15 +250,19 @@ func checkDeviceTree(deviceTree dpapi.DeviceTree, expectedResult map[string]int,
if !ok {
return fmt.Errorf("%w: unexpected device type: %s", errUnitTest, key)
}
numberDev := len(deviceTree[key])
if numberDev != val {
return fmt.Errorf("%w: %s: unexpected number of devices: %d, expected: %d", errUnitTest, key, numberDev, val)
}
delete(expectedResult, key)
}
if len(expectedResult) > 0 {
return fmt.Errorf("%w: missing expected result(s): %+v", errUnitTest, expectedResult)
}
}
return nil
}

View File

@ -40,6 +40,7 @@ func GetRequestedResources(container corev1.Container, ns string) (map[string]in
}
resources := make(map[string]int64)
for resourceName, resourceQuantity := range container.Resources.Limits {
rname := strings.ToLower(string(resourceName))
if !strings.HasPrefix(rname, ns) {

View File

@ -64,17 +64,21 @@ func getDevicesFromVirtual(realDevPath string) (devs []string, err error) {
switch dir {
case "vfio/":
iommuGroup := filepath.Join(mockRoot, "/sys/kernel/iommu_groups", file, "devices")
files, err := os.ReadDir(iommuGroup)
if err != nil {
return nil, errors.Wrapf(err, "failed to read IOMMU group %s", iommuGroup)
}
for _, file := range files {
realDev, err := filepath.EvalSymlinks(filepath.Join(iommuGroup, file.Name()))
if err != nil {
return nil, errors.Wrapf(err, "failed to get real path for %s", file.Name())
}
devs = append(devs, realDev)
}
return devs, nil
default:
return nil, nil
@ -87,14 +91,17 @@ func getTopologyHint(sysFSPath string) (*Hint, error) {
"local_cpulist": &hint.CPUs,
"numa_node": &hint.NUMAs,
}
if err := readFilesInDirectory(fileMap, sysFSPath); err != nil {
return nil, err
}
// Workarounds for broken information provided by kernel
if hint.NUMAs == "-1" {
// non-NUMA aware device or system, ignore it
hint.NUMAs = ""
}
if hint.NUMAs != "" && hint.CPUs == "" {
// broken topology hint. BIOS reports socket id as NUMA node
// First, try to get hints from parent device or bus.
@ -102,17 +109,21 @@ func getTopologyHint(sysFSPath string) (*Hint, error) {
if er == nil {
cpulist := map[string]bool{}
numalist := map[string]bool{}
for _, h := range parentHints {
if h.CPUs != "" {
cpulist[h.CPUs] = true
}
if h.NUMAs != "" {
numalist[h.NUMAs] = true
}
}
if cpus := strings.Join(mapKeys(cpulist), ","); cpus != "" {
hint.CPUs = cpus
}
if numas := strings.Join(mapKeys(numalist), ","); numas != "" {
hint.NUMAs = numas
}
@ -123,6 +134,7 @@ func getTopologyHint(sysFSPath string) (*Hint, error) {
hint.NUMAs = ""
}
}
return &hint, nil
}
@ -130,30 +142,37 @@ func getTopologyHint(sysFSPath string) (*Hint, error) {
// dependent devices (e.g. RAID).
func NewTopologyHints(devPath string) (hints Hints, err error) {
hints = make(Hints)
realDevPath, err := filepath.EvalSymlinks(devPath)
if err != nil {
return nil, errors.Wrapf(err, "failed get realpath for %s", devPath)
}
for p := realDevPath; strings.HasPrefix(p, mockRoot+"/sys/devices/"); p = filepath.Dir(p) {
hint, err := getTopologyHint(p)
if err != nil {
return nil, err
hint, er := getTopologyHint(p)
if er != nil {
return nil, er
}
if hint.CPUs != "" || hint.NUMAs != "" || hint.Sockets != "" {
hints[hint.Provider] = *hint
break
}
}
fromVirtual, _ := getDevicesFromVirtual(realDevPath)
deps, _ := filepath.Glob(filepath.Join(realDevPath, "slaves/*"))
for _, device := range append(deps, fromVirtual...) {
deviceHints, er := NewTopologyHints(device)
if er != nil {
return nil, er
}
hints = MergeTopologyHints(hints, deviceHints)
}
return
return hints, err
}
// MergeTopologyHints combines org and hints.
@ -163,12 +182,15 @@ func MergeTopologyHints(org, hints Hints) (res Hints) {
} else {
res = make(Hints)
}
for k, v := range hints {
if _, ok := res[k]; ok {
continue
}
res[k] = v
}
return
}
@ -180,10 +202,12 @@ func (h *Hint) String() string {
cpus = "CPUs:" + h.CPUs
sep = ", "
}
if h.NUMAs != "" {
nodes = sep + "NUMAs:" + h.NUMAs
sep = ", "
}
if h.Sockets != "" {
sockets = sep + "sockets:" + h.Sockets
}
@ -202,13 +226,16 @@ func FindSysFsDevice(dev string) (string, error) {
if os.IsNotExist(err) {
return "", nil
}
return "", errors.Wrapf(err, "unable to get stat for %s", dev)
}
devType := "block"
rdev := fi.Sys().(*syscall.Stat_t).Dev
if mode := fi.Mode(); mode&os.ModeDevice != 0 {
rdev = fi.Sys().(*syscall.Stat_t).Rdev
if mode&os.ModeCharDevice != 0 {
devType = "char"
}
@ -216,14 +243,18 @@ func FindSysFsDevice(dev string) (string, error) {
major := unix.Major(rdev)
minor := unix.Minor(rdev)
if major == 0 {
return "", errors.Errorf("%s is a virtual device node", dev)
}
devPath := fmt.Sprintf("/sys/dev/%s/%d:%d", devType, major, minor)
realDevPath, err := filepath.EvalSymlinks(devPath)
if err != nil {
return "", errors.Wrapf(err, "failed get realpath for %s", devPath)
}
return filepath.Join(mockRoot, realDevPath), nil
}
@ -235,10 +266,13 @@ func readFilesInDirectory(fileMap map[string]*string, dir string) error {
if os.IsNotExist(err) {
continue
}
return errors.Wrapf(err, "%s: unable to read file %q", dir, k)
}
*v = strings.TrimSpace(string(b))
}
return nil
}
@ -246,17 +280,21 @@ func readFilesInDirectory(fileMap map[string]*string, dir string) error {
// mapKeys returns the keys of m in ascending lexical order.
//
// Go randomizes map iteration order, so the keys are sorted before they are
// returned; callers that join them into a single string therefore get the same
// result on every run. A nil or empty map yields an empty, non-nil slice.
func mapKeys(m map[string]bool) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}

	sort.Strings(keys)

	return keys
}
// GetTopologyInfo returns topology information for the list of device nodes.
func GetTopologyInfo(devs []string) (*pluginapi.TopologyInfo, error) {
var result pluginapi.TopologyInfo
nodeIDs := map[int64]struct{}{}
for _, dev := range devs {
sysfsDevice, err := FindSysFsDevice(dev)
if err != nil {
@ -279,9 +317,11 @@ func GetTopologyInfo(devs []string) (*pluginapi.TopologyInfo, error) {
if err != nil {
return nil, errors.Wrapf(err, "unable to convert numa node %s into int64", nNode)
}
if nNodeID < 0 {
return nil, errors.Wrapf(err, "numa node is negative: %d", nNodeID)
}
if _, ok := nodeIDs[nNodeID]; !ok {
result.Nodes = append(result.Nodes, &pluginapi.NUMANode{ID: nNodeID})
nodeIDs[nNodeID] = struct{}{}
@ -290,6 +330,8 @@ func GetTopologyInfo(devs []string) (*pluginapi.TopologyInfo, error) {
}
}
}
sort.Slice(result.Nodes, func(i, j int) bool { return result.Nodes[i].ID < result.Nodes[j].ID })
return &result, nil
}

View File

@ -29,13 +29,16 @@ func setupTestEnv(t *testing.T) func() {
if err != nil {
t.Fatal("unable to get current directory")
}
if path, err := filepath.EvalSymlinks(pwd); err == nil {
pwd = path
}
mockRoot = pwd + "/testdata"
teardown := func() {
mockRoot = ""
}
return teardown
}
@ -78,8 +81,10 @@ func TestFindSysFsDevice(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
teardown := setupTestEnv(t)
defer teardown()
cases := []struct {
name string
input string
@ -124,6 +129,7 @@ func TestFindSysFsDevice(t *testing.T) {
func TestReadFilesInDirectory(t *testing.T) {
var file, empty string
fname := "test-a"
content := []byte(" something\n")
expectedContent := "something"
@ -137,7 +143,9 @@ func TestReadFilesInDirectory(t *testing.T) {
if err != nil {
t.Fatalf("unable to create test directory: %+v", err)
}
defer os.RemoveAll(dir)
if err = os.WriteFile(filepath.Join(dir, fname), content, 0600); err != nil {
t.Fatalf("unexpected failure: %v", err)
}
@ -145,9 +153,11 @@ func TestReadFilesInDirectory(t *testing.T) {
if err = readFilesInDirectory(fileMap, dir); err != nil {
t.Fatalf("unexpected failure: %v", err)
}
if empty != "" {
t.Fatalf("unexpected content: %q", empty)
}
if file != expectedContent {
t.Fatalf("unexpected content: %q expected: %q", file, expectedContent)
}
@ -274,8 +284,10 @@ func TestNewTopologyHints(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
teardown := setupTestEnv(t)
defer teardown()
cases := []struct {
output Hints
name string
@ -320,8 +332,10 @@ func TestGetTopologyInfo(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
teardown := setupTestEnv(t)
defer teardown()
cases := []struct {
name string
output *pluginapi.TopologyInfo

View File

@ -69,6 +69,7 @@ func getAesmdVolume(needsAesmd bool, epcUserCount int32, aesmdPresent bool) *cor
// deployment detected. aesmd socket path is provided as a hostpath volume and mounted
// by all (SGX) containers.
dirOrCreate := corev1.HostPathDirectoryOrCreate
return &corev1.Volume{
Name: "aesmd-socket",
VolumeSource: corev1.VolumeSource{
@ -93,6 +94,7 @@ func warnWrongResources(resources map[string]int64) []string {
if ok {
warnings = append(warnings, provision+" should not be used in Pod spec directly")
}
return warnings
}
@ -186,9 +188,8 @@ func (s *Mutator) Handle(ctx context.Context, req admission.Request) admission.R
Name: "SGX_AESM_ADDR",
Value: "1",
})
// container mutate logic for non-aesmd quote providers
// case:
}
pod.Spec.Containers[idx] = container
}
@ -196,6 +197,7 @@ func (s *Mutator) Handle(ctx context.Context, req admission.Request) admission.R
if pod.Spec.Volumes == nil {
pod.Spec.Volumes = make([]corev1.Volume, 0)
}
pod.Spec.Volumes = append(pod.Spec.Volumes, *vol)
}

View File

@ -63,7 +63,9 @@ func setupFirstNode() []byte {
if err != nil {
framework.Failf("Error deleting orphaned namespaces: %v", err)
}
framework.Logf("Waiting for deletion of the following namespaces: %v", deleted)
if err = framework.WaitForNamespacesDeleted(c, deleted, framework.DefaultPodDeletionTimeout); err != nil {
framework.Failf("Failed to delete orphaned namespaces %v: %v", deleted, err)
}
@ -84,10 +86,12 @@ func setupFirstNode() []byte {
// Log the version of the server and this client.
framework.Logf("e2e test version: %s", version.Get().GitVersion)
serverVersion, err := c.DiscoveryClient.ServerVersion()
if err != nil {
framework.Logf("Unexpected server error retrieving version: %v", err)
}
if serverVersion != nil {
framework.Logf("kube-apiserver version: %s", serverVersion.GitVersion)
}

View File

@ -74,6 +74,7 @@ func runTestCase(fmw *framework.Framework, pluginKustomizationPath, mappingsColl
if err != nil {
framework.Failf("unable to create temp directory: %v", err)
}
defer os.RemoveAll(tmpDir)
err = utils.CreateKustomizationOverlay(fmw.Namespace.Name, filepath.Dir(pluginKustomizationPath)+"/../overlays/"+pluginMode, tmpDir)
@ -90,7 +91,9 @@ func runTestCase(fmw *framework.Framework, pluginKustomizationPath, mappingsColl
waitForPod(fmw, "intel-fpga-plugin")
resource := v1.ResourceName(nodeResource)
ginkgo.By("checking if the resource is allocatable")
if err := utils.WaitForNodesWithResource(fmw.ClientSet, resource, 30*time.Second); err != nil {
framework.Failf("unable to wait for nodes to have positive allocatable resource: %v", err)
}
@ -99,6 +102,7 @@ func runTestCase(fmw *framework.Framework, pluginKustomizationPath, mappingsColl
image := "intel/opae-nlb-demo:devel"
ginkgo.By("submitting a pod requesting correct FPGA resources")
pod := createPod(fmw, fmt.Sprintf("fpgaplugin-%s-%s-%s-correct", pluginMode, cmd1, cmd2), resource, image, []string{cmd1, "-S0"})
ginkgo.By("waiting the pod to finish successfully")
@ -109,6 +113,7 @@ func runTestCase(fmw *framework.Framework, pluginKustomizationPath, mappingsColl
//framework.RunKubectlOrDie(fmw.Namespace.Name, "--namespace", fmw.Namespace.Name, "logs", pod.ObjectMeta.Name)
ginkgo.By("submitting a pod requesting incorrect FPGA resources")
pod = createPod(fmw, fmt.Sprintf("fpgaplugin-%s-%s-%s-incorrect", pluginMode, cmd1, cmd2), resource, image, []string{cmd2, "-S0"})
ginkgo.By("waiting the pod failure")
@ -145,11 +150,13 @@ func createPod(fmw *framework.Framework, name string, resourceName v1.ResourceNa
pod, err := fmw.ClientSet.CoreV1().Pods(fmw.Namespace.Name).Create(context.TODO(),
podSpec, metav1.CreateOptions{})
framework.ExpectNoError(err, "pod Create API error")
return pod
}
func waitForPod(fmw *framework.Framework, name string) {
ginkgo.By(fmt.Sprintf("waiting for %s availability", name))
if _, err := e2epod.WaitForPodsWithLabelRunningReady(fmw.ClientSet, fmw.Namespace.Name,
labels.Set{"app": name}.AsSelector(), 1, 100*time.Second); err != nil {
framework.DumpAllNamespaceInfo(fmw.ClientSet, fmw.Namespace.Name)

View File

@ -73,12 +73,14 @@ func checkPodMutation(f *framework.Framework, mappingsNamespace string, source,
}
ginkgo.By("deploying webhook")
_ = utils.DeployWebhook(f, kustomizationPath)
ginkgo.By("deploying mappings")
framework.RunKubectlOrDie(f.Namespace.Name, "apply", "-n", mappingsNamespace, "-f", filepath.Dir(kustomizationPath)+"/../mappings-collection.yaml")
ginkgo.By("submitting a pod for admission")
podSpec := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "webhook-tester"},
Spec: v1.PodSpec{
@ -105,11 +107,13 @@ func checkPodMutation(f *framework.Framework, mappingsNamespace string, source,
framework.ExpectNoError(err, "pod Create API error")
ginkgo.By("checking the pod has been mutated")
q, ok := pod.Spec.Containers[0].Resources.Limits[expectedMutation]
if !ok {
framework.DumpAllNamespaceInfo(f.ClientSet, f.Namespace.Name)
kubectl.LogFailedContainers(f.ClientSet, f.Namespace.Name, framework.Logf)
framework.Fail("pod hasn't been mutated")
}
gomega.Expect(q.String()).To(gomega.Equal("1"))
}

View File

@ -43,10 +43,12 @@ func describeQatDpdkPlugin() {
if err != nil {
framework.Failf("unable to locate %q: %v", dpdkKustomizationYaml, err)
}
compressTestYamlPath, err := utils.LocateRepoFile(compressTestYaml)
if err != nil {
framework.Failf("unable to locate %q: %v", compressTestYaml, err)
}
cryptoTestYamlPath, err := utils.LocateRepoFile(cryptoTestYaml)
if err != nil {
framework.Failf("unable to locate %q: %v", cryptoTestYaml, err)

View File

@ -40,6 +40,7 @@ func init() {
func describe() {
f := framework.NewDefaultFramework("sgxwebhook")
var webhook v1.Pod
ginkgo.BeforeEach(func() {
@ -145,8 +146,10 @@ func checkMutatedResources(f *framework.Framework, r v1.ResourceRequirements, ex
kubectl.LogFailedContainers(f.ClientSet, f.Namespace.Name, framework.Logf)
framework.Fail("the pod has missing resources")
}
gomega.Expect(q.String()).To(gomega.Equal("1"))
}
for _, res := range forbiddenResources {
_, ok := r.Limits[res]
if ok {

View File

@ -43,7 +43,9 @@ const (
// WaitForNodesWithResource waits for nodes to have positive allocatable resource.
func WaitForNodesWithResource(c clientset.Interface, res v1.ResourceName, timeout time.Duration) error {
framework.Logf("Waiting up to %s for any positive allocatable resource %q", timeout, res)
start := time.Now()
err := wait.Poll(poll, timeout,
func() (bool, error) {
for t := time.Now(); time.Since(t) < nodeListTimeout; time.Sleep(poll) {
@ -66,6 +68,7 @@ func WaitForNodesWithResource(c clientset.Interface, res v1.ResourceName, timeou
return false, errors.New("unable to list nodes")
})
return err
}
@ -106,6 +109,7 @@ func LocateRepoFile(repopath string) (string, error) {
if _, err := os.Stat(path); !os.IsNotExist(err) {
return path, nil
}
path = filepath.Join(currentDir, "../../"+repopath)
if _, err := os.Stat(path); !os.IsNotExist(err) {
return path, err
@ -120,9 +124,10 @@ func CreateKustomizationOverlay(namespace, base, overlay string) error {
for range strings.Split(overlay[1:], "/") {
relPath = relPath + "../"
}
relPath = relPath + base[1:]
relPath = relPath + base[1:]
content := fmt.Sprintf("namespace: %s\nbases:\n - %s", namespace, relPath)
return os.WriteFile(overlay+"/kustomization.yaml", []byte(content), 0600)
}
@ -137,6 +142,7 @@ func DeployWebhook(f *framework.Framework, kustomizationPath string) v1.Pod {
if err != nil {
framework.Failf("unable to create temp directory: %v", err)
}
defer os.RemoveAll(tmpDir)
err = CreateKustomizationOverlay(f.Namespace.Name, filepath.Dir(kustomizationPath), tmpDir)
@ -145,6 +151,7 @@ func DeployWebhook(f *framework.Framework, kustomizationPath string) v1.Pod {
}
framework.RunKubectlOrDie(f.Namespace.Name, "apply", "-k", tmpDir)
podList, err := e2epod.WaitForPodsWithLabelRunningReady(f.ClientSet, f.Namespace.Name,
labels.Set{"control-plane": "controller-manager"}.AsSelector(), 1 /* one replica */, 30*time.Second)
if err != nil {
@ -167,14 +174,17 @@ func TestContainersRunAsNonRoot(pods []v1.Pod) error {
if c.SecurityContext.RunAsNonRoot == nil || !*c.SecurityContext.RunAsNonRoot {
return errors.Errorf("%s (container: %s): RunAsNonRoot is not true", p.Name, c.Name)
}
if c.SecurityContext.RunAsGroup == nil || *c.SecurityContext.RunAsGroup == 0 {
return errors.Errorf("%s (container: %s): RunAsGroup is root (0)", p.Name, c.Name)
}
if c.SecurityContext.RunAsUser == nil || *c.SecurityContext.RunAsUser == 0 {
return errors.Errorf("%s (container: %s): RunAsUser is root (0)", p.Name, c.Name)
}
}
}
return nil
}
@ -194,8 +204,10 @@ func TestPodsFileSystemInfo(pods []v1.Pod) error {
if c.SecurityContext.ReadOnlyRootFilesystem == nil || !*c.SecurityContext.ReadOnlyRootFilesystem {
return errors.Errorf("%s (container: %s): Writable root filesystem", p.Name, c.Name)
}
printVolumeMounts(c.VolumeMounts)
}
}
return nil
}

View File

@ -42,12 +42,14 @@ import (
// These tests use Ginkgo (BDD-style Go testing framework). Refer to
// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.
var cfg *rest.Config
var k8sClient client.Client
var k8sManager ctrl.Manager
var testEnv *envtest.Environment
var ctx context.Context
var cancel context.CancelFunc
var (
cfg *rest.Config
k8sClient client.Client
k8sManager ctrl.Manager
testEnv *envtest.Environment
ctx context.Context
cancel context.CancelFunc
)
func TestAPIs(t *testing.T) {
RegisterFailHandler(Fail)