Mirror of https://github.com/intel/intel-device-plugins-for-kubernetes.git (synced 2025-06-03 03:59:37 +00:00)
gpu_plugin: fractional resource management

Fractional resource management feature

Signed-off-by: Ukri Niemimuukko <ukri.niemimuukko@intel.com>
Signed-off-by: Dmitry Rozhkov <dmitry.rozhkov@intel.com>

parent 518fb93461, commit 2c4d529d66
@@ -119,6 +119,30 @@ $ kubectl apply -k deployments/gpu_plugin/overlays/nfd_labeled_nodes
 daemonset.apps/intel-gpu-plugin created
 ```
 
+The experimental fractional-resource feature can be enabled by running:
+
+```bash
+$ kubectl apply -k deployments/gpu_plugin/overlays/fractional_resources
+serviceaccount/resource-reader-sa created
+clusterrole.rbac.authorization.k8s.io/resource-reader created
+clusterrolebinding.rbac.authorization.k8s.io/resource-reader-rb created
+daemonset.apps/intel-gpu-plugin created
+```
+
+Usage of fractional GPU resources, such as GPU memory, requires that the cluster has node
+extended resources with the name prefix `gpu.intel.com/`. Those can be created with NFD
+by running the hook installed by the plugin initcontainer. When fractional resources are
+enabled, the plugin lets a scheduler extender make the card selection decisions based on resource
+availability and the amount of extended resources requested in the pod spec.
+
+The scheduler extender then needs to annotate the pod objects with unique,
+increasing numeric timestamps in the `gas-ts` annotation and with container card selections in
+the `gas-container-cards` annotation. The latter uses `|` as the container separator and `,` as the
+card separator. Example for a pod with two containers, each getting two cards:
+`gas-container-cards:card0,card1|card2,card3`. Enabling the fractional-resource support
+in the plugin without running such an annotation-adding scheduler extender in the cluster
+will only slow down GPU deployments, so do not enable this feature unnecessarily.
+
 > **Note**: It is also possible to run the GPU device plugin using a non-root user. To do this,
 the nodes' DAC rules must be configured to allow device plugin socket creation and kubelet registration.
 Furthermore, the deployment's `securityContext` must be configured with an appropriate `runAsUser`/`runAsGroup`.
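
For reference, a minimal sketch of the pod shape the fractional-resource path operates on: a container requesting a `gpu.intel.com/`-prefixed extended resource, plus the `gas-ts` and `gas-container-cards` annotations that the scheduler extender is expected to write. The pod name and card value are illustrative only, not something the plugin produces.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Illustrative pod: one container requesting a single i915 device.
	// The gas-* annotations are normally added by the GPU scheduler
	// extender, not written by hand; the values here are examples only.
	pod := v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "example-gpu-pod", // hypothetical name
			Annotations: map[string]string{
				"gas-ts":              "1",
				"gas-container-cards": "card0",
			},
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name: "workload",
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{
						"gpu.intel.com/i915": resource.MustParse("1"),
					},
				},
			}},
		},
	}

	fmt.Println(pod.Name, pod.Annotations["gas-container-cards"])
}
```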
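
The `gas-container-cards` format described above can be split with plain `strings` operations. The helper below is a simplified, standalone sketch of that parsing; `splitCards` is not part of the plugin API (the plugin's own parsing lives in the `containerCards` function added by this commit).

```go
package main

import (
	"fmt"
	"strings"
)

// splitCards splits a gas-container-cards value into per-container card
// lists: '|' separates containers, ',' separates cards within a container.
func splitCards(annotation string) [][]string {
	var perContainer [][]string
	for _, cardList := range strings.Split(annotation, "|") {
		if cardList == "" {
			continue
		}
		perContainer = append(perContainer, strings.Split(cardList, ","))
	}
	return perContainer
}

func main() {
	// The README example: two containers, two cards each.
	for i, cards := range splitCards("card0,card1|card2,card3") {
		fmt.Printf("container %d -> %v\n", i, cards)
	}
}
```
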
@@ -29,6 +29,7 @@ import (
 	"k8s.io/klog/v2"
 	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
 
+	"github.com/intel/intel-device-plugins-for-kubernetes/cmd/gpu_plugin/rm"
 	dpapi "github.com/intel/intel-device-plugins-for-kubernetes/pkg/deviceplugin"
 )
 
@@ -52,8 +53,9 @@ const (
 )
 
 type cliOptions struct {
-	sharedDevNum     int
-	enableMonitoring bool
+	sharedDevNum       int
+	enableMonitoring   bool
+	resourceManagement bool
 }
 
 type devicePlugin struct {
@@ -67,10 +69,12 @@ type devicePlugin struct {
 
 	scanTicker *time.Ticker
 	scanDone   chan bool
+
+	resMan rm.ResourceManager
 }
 
 func newDevicePlugin(sysfsDir, devfsDir string, options cliOptions) *devicePlugin {
-	return &devicePlugin{
+	dp := &devicePlugin{
 		sysfsDir:   sysfsDir,
 		devfsDir:   devfsDir,
 		options:    options,
@@ -79,6 +83,17 @@ func newDevicePlugin(sysfsDir, devfsDir string, options cliOptions) *devicePlugin {
 		scanTicker: time.NewTicker(scanPeriod),
 		scanDone:   make(chan bool, 1), // buffered as we may send to it before Scan starts receiving from it
 	}
+
+	if options.resourceManagement {
+		var err error
+		dp.resMan, err = rm.NewResourceManager(monitorID, namespace+"/"+deviceType)
+		if err != nil {
+			klog.Errorf("Failed to create resource manager: %+v", err)
+			return nil
+		}
+	}
+
+	return dp
 }
 
 func (dp *devicePlugin) Scan(notifier dpapi.Notifier) error {
@@ -132,6 +147,7 @@ func (dp *devicePlugin) scan() (dpapi.DeviceTree, error) {
 
 	var monitor []pluginapi.DeviceSpec
 	devTree := dpapi.NewDeviceTree()
+	rmDevInfos := rm.NewDeviceInfoMap()
 	for _, f := range files {
 		var nodes []pluginapi.DeviceSpec
 
@@ -180,6 +196,7 @@ func (dp *devicePlugin) scan() (dpapi.DeviceTree, error) {
 				// Currently only one device type (i915) is supported.
 				// TODO: check model ID to differentiate device models.
 				devTree.AddDevice(deviceType, devID, deviceInfo)
+				rmDevInfos[devID] = rm.NewDeviceInfo(nodes, nil, nil)
 			}
 		}
 	}
@@ -189,18 +206,36 @@ func (dp *devicePlugin) scan() (dpapi.DeviceTree, error) {
 		devTree.AddDevice(monitorType, monitorID, deviceInfo)
 	}
 
+	if dp.resMan != nil {
+		dp.resMan.SetDevInfos(rmDevInfos)
+	}
+
 	return devTree, nil
 }
 
+func (dp *devicePlugin) Allocate(request *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
+	if dp.resMan != nil {
+		return dp.resMan.ReallocateWithFractionalResources(request)
+	}
+
+	return nil, &dpapi.UseDefaultMethodError{}
+}
+
 func main() {
 	var opts cliOptions
 
 	flag.BoolVar(&opts.enableMonitoring, "enable-monitoring", false, "whether to enable 'i915_monitoring' (= all GPUs) resource")
+	flag.BoolVar(&opts.resourceManagement, "resource-manager", false, "fractional GPU resource management")
 	flag.IntVar(&opts.sharedDevNum, "shared-dev-num", 1, "number of containers sharing the same GPU device")
 	flag.Parse()
 
 	if opts.sharedDevNum < 1 {
-		klog.Warning("The number of containers sharing the same GPU must greater than zero")
+		klog.Error("The number of containers sharing the same GPU must be greater than zero")
 		os.Exit(1)
 	}
 
+	if opts.sharedDevNum == 1 && opts.resourceManagement {
+		klog.Error("Trying to use fractional resources with shared-dev-num 1 is pointless")
+		os.Exit(1)
+	}
+
@@ -22,7 +22,9 @@ import (
 	"testing"
 
 	"github.com/pkg/errors"
+	"k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
 
+	"github.com/intel/intel-device-plugins-for-kubernetes/cmd/gpu_plugin/rm"
 	dpapi "github.com/intel/intel-device-plugins-for-kubernetes/pkg/deviceplugin"
 )
 
@@ -44,6 +46,13 @@ func (n *mockNotifier) Notify(newDeviceTree dpapi.DeviceTree) {
 	n.scanDone <- true
 }
 
+type mockResourceManager struct{}
+
+func (m *mockResourceManager) ReallocateWithFractionalResources(*v1beta1.AllocateRequest) (*v1beta1.AllocateResponse, error) {
+	return &v1beta1.AllocateResponse{}, &dpapi.UseDefaultMethodError{}
+}
+func (m *mockResourceManager) SetDevInfos(rm.DeviceInfoMap) {}
+
 func createTestFiles(root string, devfsdirs, sysfsdirs []string, sysfsfiles map[string][]byte) (string, string, error) {
 	sysfs := path.Join(root, "sys")
 	devfs := path.Join(root, "dev")
@@ -66,6 +75,30 @@ func createTestFiles(root string, devfsdirs, sysfsdirs []string, sysfsfiles map[string][]byte) (string, string, error) {
 	return sysfs, devfs, nil
 }
 
+func TestNewDevicePlugin(t *testing.T) {
+	if newDevicePlugin("", "", cliOptions{sharedDevNum: 2, resourceManagement: false}) == nil {
+		t.Error("Failed to create plugin")
+	}
+	if newDevicePlugin("", "", cliOptions{sharedDevNum: 2, resourceManagement: true}) != nil {
+		t.Error("Unexpectedly managed to create resource management enabled plugin inside unit tests")
+	}
+}
+
+func TestAllocate(t *testing.T) {
+	plugin := newDevicePlugin("", "", cliOptions{sharedDevNum: 2, resourceManagement: false})
+	_, err := plugin.Allocate(&v1beta1.AllocateRequest{})
+	if _, ok := err.(*dpapi.UseDefaultMethodError); !ok {
+		t.Errorf("Unexpected return value: %+v", err)
+	}
+
+	// mock the rm
+	plugin.resMan = &mockResourceManager{}
+	_, err = plugin.Allocate(&v1beta1.AllocateRequest{})
+	if _, ok := err.(*dpapi.UseDefaultMethodError); !ok {
+		t.Errorf("Unexpected return value: %+v", err)
+	}
+}
+
 func TestScan(t *testing.T) {
 	tcases := []struct {
 		name string
@@ -204,6 +237,8 @@ func TestScan(t *testing.T) {
 			scanDone: plugin.scanDone,
 		}
 
+		plugin.resMan = &mockResourceManager{}
+
 		err = plugin.Scan(notifier)
 		// Scans in GPU plugin never fail
 		if err != nil {
cmd/gpu_plugin/rm/gpu_plugin_resource_manager.go (new file, 408 lines)
@@ -0,0 +1,408 @@
// Copyright 2021 Intel Corporation. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rm

import (
	"context"
	"os"
	"sort"
	"strings"
	"sync"
	"time"

	dpapi "github.com/intel/intel-device-plugins-for-kubernetes/pkg/deviceplugin"
	"github.com/pkg/errors"
	"google.golang.org/grpc"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/klog/v2"
	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
	podresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
)

const (
	gasTSAnnotation   = "gas-ts"
	gasCardAnnotation = "gas-container-cards"

	grpcAddress    = "unix:///var/lib/kubelet/pod-resources/kubelet.sock"
	grpcBufferSize = 4 * 1024 * 1024
	grpcTimeout    = 5 * time.Second

	retryTimeout = 1 * time.Second
)

// Errors.
type retryErr struct{}
type zeroPendingErr struct{}

func (e *retryErr) Error() string {
	return "things didn't work out, but perhaps a retry will help"
}
func (e *zeroPendingErr) Error() string {
	return "there are no pending pods anymore in this node"
}

type podCandidate struct {
	pod                 *v1.Pod
	name                string
	allocatedContainers int
	allocationTargetNum int
}

type deviceInfo struct {
	nodes  []pluginapi.DeviceSpec
	mounts []pluginapi.Mount
	envs   map[string]string
}

type getClientFunc func(string, time.Duration, int) (podresourcesv1.PodResourcesListerClient, *grpc.ClientConn, error)

type ResourceManager interface {
	ReallocateWithFractionalResources(*pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error)
	SetDevInfos(DeviceInfoMap)
}

type resourceManager struct {
	mutex            sync.RWMutex // for devTree updates during scan
	deviceInfos      DeviceInfoMap
	nodeName         string
	clientset        kubernetes.Interface
	skipID           string
	fullResourceName string
	prGetClientFunc  getClientFunc
}

func NewDeviceInfo(nodes []pluginapi.DeviceSpec, mounts []pluginapi.Mount, envs map[string]string) *deviceInfo {
	return &deviceInfo{
		nodes:  nodes,
		mounts: mounts,
		envs:   envs,
	}
}

type DeviceInfoMap map[string]*deviceInfo

func NewDeviceInfoMap() DeviceInfoMap {
	return DeviceInfoMap{}
}

// NewResourceManager creates a new resource manager.
func NewResourceManager(skipID, fullResourceName string) (ResourceManager, error) {
	clientset, err := getClientset()

	if err != nil {
		return nil, errors.Wrap(err, "couldn't get clientset")
	}

	rm := resourceManager{
		nodeName:         os.Getenv("NODE_NAME"),
		clientset:        clientset,
		skipID:           skipID,
		fullResourceName: fullResourceName,
		prGetClientFunc:  podresources.GetV1Client,
	}

	klog.Info("GPU device plugin resource manager enabled")

	return &rm, nil
}

// ReallocateWithFractionalResources runs the fractional resource logic.
// This intentionally only logs errors and returns with the UseDefaultMethodError,
// in case any errors are hit. This is to avoid clusters filling up with unexpected admission errors.
func (rm *resourceManager) ReallocateWithFractionalResources(request *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
	if !isInputOk(request, rm.skipID) {
		// it is better to leave allocated gpu devices as is and return
		return nil, &dpapi.UseDefaultMethodError{}
	}

	podCandidate, err := rm.findAllocationPodCandidate()
	if _, ok := err.(*retryErr); ok {
		klog.Warning("retrying POD resolving after sleeping")
		time.Sleep(retryTimeout)
		podCandidate, err = rm.findAllocationPodCandidate()
	}

	if err != nil {
		if _, ok := err.(*zeroPendingErr); !ok {
			klog.Error("allocation candidate not found, perhaps the GPU scheduler extender is not called, err:", err)
		}
		// it is better to leave allocated gpu devices as is and return
		return nil, &dpapi.UseDefaultMethodError{}
	}

	pod := podCandidate.pod
	cards := containerCards(pod, podCandidate.allocatedContainers)

	return rm.createAllocateResponse(cards)
}

func isInputOk(rqt *pluginapi.AllocateRequest, skipID string) bool {
	// so far kubelet calls allocate for each container separately. If that changes, we need to refine our logic.
	if len(rqt.ContainerRequests) != 1 {
		klog.Warning("multi-container allocation request not supported")
		return false
	}

	crqt := rqt.ContainerRequests[0]
	for _, id := range crqt.DevicesIDs {
		if id == skipID {
			return false // intentionally not printing anything, this request is skipped
		}
	}

	return true
}

// findAllocationPodCandidate tries to find the best allocation candidate pod, which must be:
// -pending for this node
// -using GPU resources in its spec
// -is found via grpc service with unallocated GPU devices
// returns:
// -the candidate pod struct pointer and no error, or
// -errRetry if unsuccessful, but there is perhaps hope of trying again with better luck
// -errZeroPending if no pending pods exist anymore (which is fine)
// -any grpc communication errors
func (rm *resourceManager) findAllocationPodCandidate() (*podCandidate, error) {
	// get map of pending pods for this node
	pendingPods, err := rm.getNodePendingGPUPods()
	if err != nil {
		return nil, err
	}

	candidates, err := rm.findAllocationPodCandidates(pendingPods)
	if err != nil {
		return nil, err
	}

	numCandidates := len(candidates)
	switch numCandidates {
	case 0:
		// fine, this typically happens when deployment is deleted before PODs start
		klog.V(4).Info("zero pending pods")
		return nil, &zeroPendingErr{}
	case 1:
		// perfect, only one option
		klog.V(4).Info("only one pending pod")
		if _, ok := candidates[0].pod.Annotations[gasCardAnnotation]; !ok {
			klog.Warningf("Pending POD annotations from scheduler not yet visible for pod %q", candidates[0].pod.Name)
			return nil, &retryErr{}
		}
		return &candidates[0], nil
	default: // > 1 candidates, not good, need to pick the best
		// look for scheduler timestamps and sort by them
		klog.V(4).Infof("%v pods pending, picking oldest", numCandidates)
		timestampedCandidates := []podCandidate{}
		for _, candidate := range candidates {
			if _, ok := pendingPods[candidate.name].Annotations[gasTSAnnotation]; ok {
				timestampedCandidates = append(timestampedCandidates, candidate)
			}
		}
		sort.Slice(timestampedCandidates,
			func(i, j int) bool {
				return pendingPods[timestampedCandidates[i].name].Annotations[gasTSAnnotation] <
					pendingPods[timestampedCandidates[j].name].Annotations[gasTSAnnotation]
			})
		if len(timestampedCandidates) == 0 {
			klog.Warning("Pending POD annotations from scheduler not yet visible")
			return nil, &retryErr{}
		}
		return &timestampedCandidates[0], nil
	}
}

// +kubebuilder:rbac:groups="",resources=pods,verbs=list

// getNodePendingGPUPods returns a map of pod names -> pods that are pending and use the gpu.
func (rm *resourceManager) getNodePendingGPUPods() (map[string]*v1.Pod, error) {
	selector, err := fields.ParseSelector("spec.nodeName=" + rm.nodeName +
		",status.phase=" + string(v1.PodPending))

	if err != nil {
		return nil, errors.Wrap(err, "unable to parse selector")
	}

	pendingPodList, err := rm.clientset.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), metav1.ListOptions{
		FieldSelector: selector.String(),
	})

	if err != nil {
		return nil, errors.Wrap(err, "unable to list pods")
	}

	// make a map out of the list, accept only GPU-using pods
	pendingPods := make(map[string]*v1.Pod)
	for i := range pendingPodList.Items {
		pod := &pendingPodList.Items[i]

		if numGPUUsingContainers(pod, rm.fullResourceName) > 0 {
			pendingPods[pod.Name] = pod
		}
	}

	return pendingPods, nil
}

// findAllocationPodCandidates returns a slice of all potential allocation candidate pods.
// This goes through the PODs listed in the podresources grpc service and finds those among pending
// pods which don't have all GPU devices allocated.
func (rm *resourceManager) findAllocationPodCandidates(pendingPods map[string]*v1.Pod) ([]podCandidate, error) {
	resListerClient, clientConn, err := rm.prGetClientFunc(grpcAddress, grpcTimeout, grpcBufferSize)
	if err != nil {
		return nil, errors.Wrap(err, "Could not get a grpc client for reading plugin resources")
	}

	defer clientConn.Close()
	ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout)
	defer cancel()

	resp, err := resListerClient.List(ctx, &podresourcesv1.ListPodResourcesRequest{})
	if err != nil {
		return nil, errors.Wrap(err, "Could not read plugin resources via grpc")
	}

	candidates := []podCandidate{}
	for _, podRes := range resp.PodResources {
		// count allocated gpu-using containers
		numContainersAllocated := 0
		for _, cont := range podRes.Containers {
			for _, dev := range cont.Devices {
				if dev.ResourceName == rm.fullResourceName {
					numContainersAllocated++
					break
				}
			}
		}

		if pod, pending := pendingPods[podRes.Name]; pending {
			allocationTargetNum := numGPUUsingContainers(pod, rm.fullResourceName)
			if numContainersAllocated < allocationTargetNum {
				candidate := podCandidate{
					pod:                 pod,
					name:                podRes.Name,
					allocatedContainers: numContainersAllocated,
					allocationTargetNum: allocationTargetNum,
				}
				candidates = append(candidates, candidate)
			}
		}
	}

	return candidates, nil
}

func (rm *resourceManager) SetDevInfos(deviceInfos DeviceInfoMap) {
	rm.mutex.Lock()
	defer rm.mutex.Unlock()
	rm.deviceInfos = deviceInfos
}

func (rm *resourceManager) createAllocateResponse(cards []string) (*pluginapi.AllocateResponse, error) {
	rm.mutex.Lock()
	defer rm.mutex.Unlock()

	allocateResponse := pluginapi.AllocateResponse{}
	cresp := pluginapi.ContainerAllocateResponse{}

	for _, card := range cards {
		newDeviceID := card + "-0"

		dev, ok := rm.deviceInfos[newDeviceID]
		if !ok {
			klog.Warningf("No device info for %q, using default allocation method devices", newDeviceID)
			return nil, &dpapi.UseDefaultMethodError{}
		}

		// add new devices
		nodes := dev.nodes
		for i := range nodes {
			cresp.Devices = append(cresp.Devices, &nodes[i])
		}

		// add new mounts
		mounts := dev.mounts
		for i := range mounts {
			cresp.Mounts = append(cresp.Mounts, &mounts[i])
		}

		for key, value := range dev.envs {
			if cresp.Envs == nil {
				cresp.Envs = make(map[string]string)
			}
			cresp.Envs[key] = value
		}
	}

	allocateResponse.ContainerResponses = append(allocateResponse.ContainerResponses, &cresp)

	return &allocateResponse, nil
}

func numGPUUsingContainers(pod *v1.Pod, fullResourceName string) int {
	num := 0
	for _, container := range pod.Spec.Containers {
		for reqName, quantity := range container.Resources.Requests {
			resourceName := reqName.String()
			if resourceName == fullResourceName {
				value, _ := quantity.AsInt64()
				if value > 0 {
					num++
					break
				}
			}
		}
	}
	return num
}

// containerCards returns the cards to use for a single container.
// gpuUsingContainerIndex 0 == first gpu-using container in the pod.
func containerCards(pod *v1.Pod, gpuUsingContainerIndex int) []string {
	fullAnnotation := pod.Annotations[gasCardAnnotation]
	cardLists := strings.Split(fullAnnotation, "|")
	klog.V(3).Infof("%s:%v", fullAnnotation, cardLists)

	i := 0
	for _, cardList := range cardLists {
		cards := strings.Split(cardList, ",")
		if len(cards) > 0 && len(cardList) > 0 {
			if gpuUsingContainerIndex == i {
				klog.V(3).Infof("Cards for container nr %v in pod %v are %v", gpuUsingContainerIndex, pod.Name, cards)
				return cards
			}
			i++
		}
	}
	klog.Warningf("couldn't find cards for gpu using container index %v", gpuUsingContainerIndex)
	return nil
}

func getClientset() (*kubernetes.Clientset, error) {
	config, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}

	return clientset, nil
}
cmd/gpu_plugin/rm/gpu_plugin_resource_manager_test.go (new file, 241 lines)
@@ -0,0 +1,241 @@
// Copyright 2021 Intel Corporation. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rm

import (
	"context"
	"fmt"
	"os"
	"testing"
	"time"

	"google.golang.org/grpc"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	fakecorev1 "k8s.io/client-go/kubernetes/typed/core/v1/fake"
	"k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
	podresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
)

// mockClient implements enough of k8s API for the resource manager tests.
type mockClient struct {
	fake.Clientset
	mockCoreV1
}

func (m *mockClient) CoreV1() corev1.CoreV1Interface {
	return m
}

type mockCoreV1 struct {
	fakecorev1.FakeCoreV1
	mockPods
}

func (m *mockCoreV1) Pods(namespace string) corev1.PodInterface {
	return m
}

type mockPods struct {
	fakecorev1.FakePods
	pods []v1.Pod
}

func (m *mockPods) List(ctx context.Context, opts metav1.ListOptions) (*v1.PodList, error) {
	return &v1.PodList{
		Items: m.pods,
	}, nil
}

type mockPodResources struct {
	pods []v1.Pod
}

func (w *mockPodResources) List(ctx context.Context,
	in *podresourcesv1.ListPodResourcesRequest,
	opts ...grpc.CallOption) (*podresourcesv1.ListPodResourcesResponse, error) {
	resp := podresourcesv1.ListPodResourcesResponse{}
	for _, pod := range w.pods {
		resp.PodResources = append(resp.PodResources, &podresourcesv1.PodResources{
			Name: pod.ObjectMeta.Name, Containers: []*podresourcesv1.ContainerResources{{}},
		})
	}
	return &resp, nil
}
func (w *mockPodResources) GetAllocatableResources(ctx context.Context,
	in *podresourcesv1.AllocatableResourcesRequest,
	opts ...grpc.CallOption) (*podresourcesv1.AllocatableResourcesResponse, error) {
	return nil, nil
}

func newMockResourceManager(pods []v1.Pod) ResourceManager {
	client, err := grpc.Dial("", grpc.WithInsecure())
	if err != nil {
		os.Exit(1)
	}

	mc := &mockClient{}
	mc.mockCoreV1.mockPods.pods = pods
	rm := resourceManager{
		clientset: mc,
		nodeName:  "TestNode",
		prGetClientFunc: func(string, time.Duration, int) (podresourcesv1.PodResourcesListerClient, *grpc.ClientConn, error) {
			return &mockPodResources{pods: pods}, client, nil
		},
		skipID:           "all",
		fullResourceName: "gpu.intel.com/i915",
	}

	deviceInfoMap := NewDeviceInfoMap()
	deviceInfoMap["card0-0"] = NewDeviceInfo([]v1beta1.DeviceSpec{
		{
			ContainerPath: "containerpath",
			HostPath:      "hostpath",
			Permissions:   "rw",
		},
	},
		[]v1beta1.Mount{{}},
		map[string]string{"more": "coverage"})
	deviceInfoMap["card1-0"] = NewDeviceInfo([]v1beta1.DeviceSpec{{}}, nil, nil)
	rm.SetDevInfos(deviceInfoMap)

	return &rm
}

func TestNewResourceManager(t *testing.T) {
	// normal clientset is unavailable inside the unit tests
	_, err := NewResourceManager("foo", "bar")

	if err == nil {
		t.Errorf("unexpected success")
	}
}

func TestReallocateWithFractionalResources(t *testing.T) {
	properTestPod := v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Annotations: map[string]string{gasCardAnnotation: "card0"},
			Name:        "TestPod",
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Resources: v1.ResourceRequirements{
						Requests: v1.ResourceList{
							"gpu.intel.com/i915": resource.MustParse("1"),
						},
					},
				},
			},
		},
	}
	unAnnotatedTestPod := *properTestPod.DeepCopy()
	unAnnotatedTestPod.ObjectMeta.Annotations = nil
	properTestPod2 := *properTestPod.DeepCopy()
	properTestPod2.ObjectMeta.Name = "TestPod2"

	timeStampedProperTestPod := *properTestPod.DeepCopy()
	timeStampedProperTestPod.ObjectMeta.Annotations[gasTSAnnotation] = "2"

	timeStampedProperTestPod2 := *properTestPod2.DeepCopy()
	timeStampedProperTestPod2.ObjectMeta.Annotations[gasTSAnnotation] = "1"

	properContainerRequests := []*v1beta1.ContainerAllocateRequest{{DevicesIDs: []string{"card0-0"}}}

	testCases := []struct {
		name              string
		pods              []v1.Pod
		expectErr         bool
		containerRequests []*v1beta1.ContainerAllocateRequest
	}{
		{
			name:              "Wrong number of container requests should fail",
			pods:              []v1.Pod{properTestPod},
			containerRequests: []*v1beta1.ContainerAllocateRequest{},
			expectErr:         true,
		},
		{
			name:              "Request for monitor resource should fail",
			pods:              []v1.Pod{properTestPod},
			containerRequests: []*v1beta1.ContainerAllocateRequest{{DevicesIDs: []string{"all"}}},
			expectErr:         true,
		},
		{
			name:              "Zero pending pods should fail",
			pods:              []v1.Pod{},
			containerRequests: properContainerRequests,
			expectErr:         true,
		},
		{
			name:              "Single pending pod without annotations should fail",
			pods:              []v1.Pod{unAnnotatedTestPod},
			containerRequests: properContainerRequests,
			expectErr:         true,
		},
		{
			name:              "Single pending pod should succeed",
			pods:              []v1.Pod{properTestPod},
			containerRequests: properContainerRequests,
			expectErr:         false,
		},
		{
			name:              "Two pending pods without timestamps should fail",
			pods:              []v1.Pod{properTestPod, properTestPod2},
			containerRequests: properContainerRequests,
			expectErr:         true,
		},
		{
			name:              "Two pending pods with timestamps should reduce to one candidate and succeed",
			pods:              []v1.Pod{timeStampedProperTestPod, timeStampedProperTestPod2},
			containerRequests: properContainerRequests,
			expectErr:         false,
		},
	}

	for _, tCase := range testCases {
		rm := newMockResourceManager(tCase.pods)
		resp, err := rm.ReallocateWithFractionalResources(&v1beta1.AllocateRequest{
			ContainerRequests: tCase.containerRequests,
		})

		if (err != nil) && !tCase.expectErr {
			t.Errorf("test %v unexpected failure, err:%v", tCase.name, err)
		}
		if err == nil {
			if tCase.expectErr {
				t.Errorf("test %v unexpected success", tCase.name)
			} else {
				// check response
				expectTruef(len(resp.ContainerResponses) == 1, t, tCase.name, "wrong number of container responses, expected 1")
				expectTruef(len(resp.ContainerResponses[0].Envs) == 1, t, tCase.name, "wrong number of env variables in container response, expected 1")
				expectTruef(resp.ContainerResponses[0].Envs["more"] == "coverage", t, tCase.name, "env not set for container response")
				expectTruef(len(resp.ContainerResponses[0].Devices) == 1, t, tCase.name, "wrong number of devices, expected 1")
				expectTruef(resp.ContainerResponses[0].Devices[0].HostPath == "hostpath", t, tCase.name, "HostPath not set for device")
				expectTruef(resp.ContainerResponses[0].Devices[0].ContainerPath == "containerpath", t, tCase.name, "ContainerPath not set for device")
				expectTruef(resp.ContainerResponses[0].Devices[0].Permissions == "rw", t, tCase.name, "permissions not set for device")
			}
		}
	}
}

func expectTruef(predicate bool, t *testing.T, testName, format string, args ...interface{}) {
	if !predicate {
		t.Helper()
		t.Errorf(fmt.Sprintf("in test %q ", testName)+format, args...)
	}
}
@@ -0,0 +1,13 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: intel-gpu-plugin
spec:
  template:
    spec:
      containers:
      - name: intel-gpu-plugin
        args:
          - "-shared-dev-num=300"
          - "-enable-monitoring"
          - "-resource-manager"
@@ -0,0 +1,17 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: intel-gpu-plugin
spec:
  template:
    spec:
      containers:
      - name: intel-gpu-plugin
        volumeMounts:
        - name: podresources
          mountPath: /var/lib/kubelet/pod-resources
      volumes:
      - name: podresources
        hostPath:
          path: /var/lib/kubelet/pod-resources
@@ -0,0 +1,8 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: intel-gpu-plugin
spec:
  template:
    spec:
      serviceAccountName: resource-reader-sa
@@ -0,0 +1,10 @@
bases:
- ../../base
resources:
- resource-cluster-role-binding.yaml
- resource-cluster-role.yaml
- resource-reader-sa.yaml
patches:
- add-serviceaccount.yaml
- add-podresource-mount.yaml
- add-args.yaml
@@ -0,0 +1,12 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: resource-reader-rb
subjects:
- kind: ServiceAccount
  name: resource-reader-sa
  namespace: default
roleRef:
  kind: ClusterRole
  name: resource-reader
  apiGroup: rbac.authorization.k8s.io
@@ -0,0 +1,8 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: resource-reader
rules:
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["list"]
@@ -0,0 +1,4 @@
apiVersion: v1
kind: ServiceAccount
metadata:
  name: resource-reader-sa