Mirror of https://github.com/intel/intel-device-plugins-for-kubernetes.git (synced 2025-06-03 03:59:37 +00:00)
gpu: try to fetch PodList from kubelet API
In large clusters, and with resource management enabled, the load the GPU plugins place on the api-server can become heavy. This change fetches pod listings from the kubelet API and keeps the api-server as a backup; any error other than a timeout also moves the logic back to using the api-server.

Signed-off-by: Tuomas Katila <tuomas.katila@intel.com>

Conflicts:
	deployments/gpu_plugin/base/intel-gpu-plugin.yaml
This commit is contained in:
parent fa5b9da93d
commit 8393377e68
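The change, in brief: listPods() (added below) prefers the kubelet's local /pods endpoint, retries it up to kubeletAPIMaxRetries times on timeouts, and on any other error flips useKubelet off and falls back to the api-server for good. A condensed, self-contained sketch of that flow, assuming stand-in fetchFromKubelet/fetchFromAPIServer functions in place of the real listPodsFromKubelet/listPodsFromAPIServer:

package main

import (
	"errors"
	"fmt"
	"net"
)

const maxRetries = 5 // mirrors kubeletAPIMaxRetries

type podList struct{} // stand-in for v1.PodList

var useKubelet = true

// fetchFromKubelet is a placeholder for the real listPodsFromKubelet.
func fetchFromKubelet() (*podList, error) { return nil, errors.New("kubelet unreachable") }

// fetchFromAPIServer is a placeholder for the real listPodsFromAPIServer.
func fetchFromAPIServer() (*podList, error) { return &podList{}, nil }

func listPods() (*podList, error) {
	if useKubelet {
		var neterr net.Error

		for i := 0; i < maxRetries; i++ {
			if pods, err := fetchFromKubelet(); err == nil {
				return pods, nil
			} else if errors.As(err, &neterr) && neterr.Timeout() {
				continue // timeouts are worth retrying
			}

			break // any non-timeout error ends kubelet API use
		}

		useKubelet = false // permanent fallback to the api-server
	}

	return fetchFromAPIServer()
}

func main() {
	pods, err := listPods()
	fmt.Println(pods, err, "useKubelet:", useKubelet)
}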
@@ -17,7 +17,13 @@ package rm
 import (
 	"context"
 	"crypto/rand"
+	"crypto/tls"
+	"crypto/x509"
+	"encoding/json"
+	"io"
 	"math/big"
+	"net"
+	"net/http"
 	"os"
 	"sort"
 	"strconv"
@@ -37,6 +43,7 @@ import (
 	pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
 	podresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
 	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
+	"k8s.io/utils/strings/slices"
 )
 
 const (
@@ -49,6 +56,13 @@ const (
 	grpcAddress    = "unix:///var/lib/kubelet/pod-resources/kubelet.sock"
 	grpcBufferSize = 4 * 1024 * 1024
 	grpcTimeout    = 5 * time.Second
+
+	kubeletAPITimeout    = 5 * time.Second
+	kubeletAPIMaxRetries = 5
+	kubeletHTTPSCertPath = "/var/lib/kubelet/pki/kubelet.crt"
+	// This is detected incorrectly as credentials
+	//nolint:gosec
+	serviceAccountTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token"
 )
 
 // Errors.
@@ -102,12 +116,14 @@ type resourceManager struct {
 	prGetClientFunc  getClientFunc
 	assignments      map[string]podAssignmentDetails // pod name -> assignment details
 	nodeName         string
+	hostIP           string
 	skipID           string
 	fullResourceName string
 	retryTimeout     time.Duration
 	cleanupInterval  time.Duration
 	mutex            sync.RWMutex // for devTree updates during scan
 	cleanupMutex     sync.RWMutex // for assignment details during cleanup
+	useKubelet       bool
 }
 
 // NewDeviceInfo creates a new DeviceInfo.
@@ -137,6 +153,7 @@ func NewResourceManager(skipID, fullResourceName string) (ResourceManager, error) {
 
 	rm := resourceManager{
 		nodeName:         os.Getenv("NODE_NAME"),
+		hostIP:           os.Getenv("HOST_IP"),
 		clientset:        clientset,
 		skipID:           skipID,
 		fullResourceName: fullResourceName,
@@ -144,10 +161,22 @@ func NewResourceManager(skipID, fullResourceName string) (ResourceManager, error) {
 		assignments:      make(map[string]podAssignmentDetails),
 		retryTimeout:     1 * time.Second,
 		cleanupInterval:  20 * time.Minute,
+		useKubelet:       true,
 	}
 
 	klog.Info("GPU device plugin resource manager enabled")
 
+	// Try listing Pods once to detect if Kubelet API works
+	_, err = rm.listPodsFromKubelet()
+
+	if err != nil {
+		klog.V(2).Info("Not using Kubelet API")
+
+		rm.useKubelet = false
+	} else {
+		klog.V(2).Info("Using Kubelet API")
+	}
+
 	go func() {
 		getRandDuration := func() time.Duration {
 			cleanupIntervalSeconds := int(rm.cleanupInterval.Seconds())
@@ -167,10 +196,7 @@ func NewResourceManager(skipID, fullResourceName string) (ResourceManager, error) {
 			// Gather both running and pending pods. It might happen that
 			// cleanup is triggered between GetPreferredAllocation and Allocate
 			// and it would remove the assignment data for the soon-to-be allocated pod
-			running := rm.listPodsOnNodeWithState(string(v1.PodRunning))
-			for podName, podItem := range rm.listPodsOnNodeWithState(string(v1.PodPending)) {
-				running[podName] = podItem
-			}
+			running := rm.listPodsOnNodeWithStates([]string{string(v1.PodRunning), string(v1.PodPending)})
 
 			func() {
 				rm.cleanupMutex.Lock()
@@ -201,20 +227,129 @@ func getPodResourceKey(res *podresourcesv1.PodResources) string {
 	return res.Namespace + "&" + res.Name
 }
 
-func (rm *resourceManager) listPodsOnNodeWithState(state string) map[string]*v1.Pod {
-	pods := make(map[string]*v1.Pod)
-
-	selector, err := fields.ParseSelector("spec.nodeName=" + rm.nodeName +
-		",status.phase=" + state)
+func (rm *resourceManager) listPodsFromAPIServer() (*v1.PodList, error) {
+	selector, err := fields.ParseSelector("spec.nodeName=" + rm.nodeName)
 
 	if err != nil {
-		return pods
+		return &v1.PodList{}, err
 	}
 
+	klog.V(4).Info("Requesting pods from API server")
+
 	podList, err := rm.clientset.CoreV1().Pods(v1.NamespaceAll).List(context.Background(), metav1.ListOptions{
 		FieldSelector: selector.String(),
 	})
 
 	if err != nil {
 		klog.Error("pod listing failed:", err)
 
-		return pods
+		return &v1.PodList{}, err
 	}
 
+	return podList, nil
+}
+
+// +kubebuilder:rbac:groups="",resources=nodes/proxy,verbs=list;get
+
+func (rm *resourceManager) listPodsFromKubelet() (*v1.PodList, error) {
+	var podList v1.PodList
+
+	token, err := os.ReadFile(serviceAccountTokenPath)
+	if err != nil {
+		klog.Warning("Failed to read token for kubelet API access: ", err)
+
+		return &podList, err
+	}
+
+	kubeletCert, err := os.ReadFile(kubeletHTTPSCertPath)
+	if err != nil {
+		klog.Warning("Failed to read kubelet cert: ", err)
+
+		return &podList, err
+	}
+
+	certPool := x509.NewCertPool()
+	certPool.AppendCertsFromPEM(kubeletCert)
+
+	// There isn't an official documentation for the kubelet API. There is a blog post:
+	// https://www.deepnetwork.com/blog/2020/01/13/kubelet-api.html
+	// And a tool to work with the API:
+	// https://github.com/cyberark/kubeletctl
+
+	kubeletURL := "https://" + rm.hostIP + ":10250/pods"
+	req, _ := http.NewRequestWithContext(context.Background(), "GET", kubeletURL, nil)
+	req.Header.Set("Authorization", "Bearer "+string(token))
+
+	tr := &http.Transport{
+		TLSClientConfig: &tls.Config{
+			MinVersion: tls.VersionTLS12,
+			RootCAs:    certPool,
+			ServerName: rm.nodeName,
+		},
+	}
+	client := &http.Client{
+		Timeout:   kubeletAPITimeout,
+		Transport: tr,
+	}
+
+	klog.V(4).Infof("Requesting pods from kubelet (%s)", kubeletURL)
+
+	resp, err := (*client).Do(req)
+	if err != nil {
+		klog.Warning("Failed to read pods from kubelet API: ", err)
+
+		return &podList, err
+	}
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		klog.Warning("Failed to read http response body: ", err)
+
+		return &podList, err
+	}
+
+	resp.Body.Close()
+
+	err = json.Unmarshal(body, &podList)
+	if err != nil {
+		klog.Warning("Failed to unmarshal PodList from response: ", err)
+
+		return &podList, err
+	}
+
+	return &podList, nil
+}
+
+func (rm *resourceManager) listPods() (*v1.PodList, error) {
+	// Try to use kubelet API as long as it provides listings within retries
+	if rm.useKubelet {
+		var neterr net.Error
+
+		for i := 0; i < kubeletAPIMaxRetries; i++ {
+			if podList, err := rm.listPodsFromKubelet(); err == nil {
+				return podList, nil
+			} else if errors.As(err, &neterr) && neterr.Timeout() {
+				continue
+			}
+
+			// If error is non-timeout, break to stop using kubelet API
+			break
+		}
+
+		klog.Warning("Stopping Kubelet API use due to error/timeout")
+
+		rm.useKubelet = false
+	}
+
+	return rm.listPodsFromAPIServer()
+}
+
+func (rm *resourceManager) listPodsOnNodeWithStates(states []string) map[string]*v1.Pod {
+	pods := make(map[string]*v1.Pod)
+
+	podList, err := rm.listPods()
+	if err != nil {
+		klog.Error("pod listing failed:", err)
+
+		return pods
@@ -222,8 +357,11 @@ func (rm *resourceManager) listPodsOnNodeWithState(state string) map[string]*v1.Pod {
 	}
 
 	for i := range podList.Items {
-		key := getPodKey(&podList.Items[i])
-		pods[key] = &podList.Items[i]
+		phase := string(podList.Items[i].Status.Phase)
+		if slices.Contains(states, phase) {
+			key := getPodKey(&podList.Items[i])
+			pods[key] = &podList.Items[i]
+		}
 	}
 
 	return pods
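An aside on the retry condition in listPods() above: errors returned by an *http.Client wrap a *url.Error, which implements net.Error and reports Timeout() == true for deadline-style failures, so errors.As(err, &neterr) && neterr.Timeout() cleanly separates retryable timeouts from hard failures. A minimal standalone demonstration of the same pattern (example.com is just a placeholder; the 1ns timeout forces a timeout error):

package main

import (
	"errors"
	"fmt"
	"net"
	"net/http"
	"time"
)

func main() {
	// A 1ns client timeout makes the request fail with a timeout error.
	client := &http.Client{Timeout: 1 * time.Nanosecond}

	_, err := client.Get("https://example.com") // placeholder URL

	var neterr net.Error
	if errors.As(err, &neterr) && neterr.Timeout() {
		fmt.Println("timeout: worth retrying")
	} else {
		fmt.Println("non-timeout error: fall back to the api-server:", err)
	}
}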
@@ -528,7 +666,7 @@ func (rm *resourceManager) findAllocationPodCandidate() (*podCandidate, error) {
 
 // getNodePendingGPUPods returns a map of pod names -> pods that are pending and use the gpu.
 func (rm *resourceManager) getNodePendingGPUPods() (map[string]*v1.Pod, error) {
-	pendingPods := rm.listPodsOnNodeWithState(string(v1.PodPending))
+	pendingPods := rm.listPodsOnNodeWithStates([]string{string(v1.PodPending)})
 
 	for podName, pod := range pendingPods {
 		if numGPUUsingContainers(pod, rm.fullResourceName) == 0 {
@@ -105,6 +105,7 @@ func newMockResourceManager(pods []v1.Pod) ResourceManager {
 		fullResourceName: "gpu.intel.com/i915",
 		assignments:      make(map[string]podAssignmentDetails),
 		retryTimeout:     1 * time.Millisecond,
+		useKubelet:       false,
 	}
 
 	deviceInfoMap := NewDeviceInfoMap()
@@ -168,6 +169,9 @@ func TestGetPreferredFractionalAllocation(t *testing.T) {
 				},
 			},
 		},
+		Status: v1.PodStatus{
+			Phase: v1.PodPending,
+		},
 	}
 
 	gpuLessTestPod := v1.Pod{
@@ -326,6 +330,9 @@ func TestCreateFractionalResourceResponse(t *testing.T) {
 				},
 			},
 		},
+		Status: v1.PodStatus{
+			Phase: v1.PodPending,
+		},
 	}
 	unAnnotatedTestPod := *properTestPod.DeepCopy()
 	unAnnotatedTestPod.ObjectMeta.Annotations = nil
@@ -458,6 +465,9 @@ func TestCreateFractionalResourceResponseWithOneCardTwoTiles(t *testing.T) {
 				},
 			},
 		},
+		Status: v1.PodStatus{
+			Phase: v1.PodPending,
+		},
 	}
 
 	properPrefContainerRequests := []*v1beta1.ContainerPreferredAllocationRequest{
@@ -521,6 +531,9 @@ func TestCreateFractionalResourceResponseWithTwoCardsOneTile(t *testing.T) {
 				},
 			},
 		},
+		Status: v1.PodStatus{
+			Phase: v1.PodPending,
+		},
 	}
 
 	properPrefContainerRequests := []*v1beta1.ContainerPreferredAllocationRequest{
@@ -589,6 +602,9 @@ func TestCreateFractionalResourceResponseWithThreeCardsTwoTiles(t *testing.T) {
 				},
 			},
 		},
+		Status: v1.PodStatus{
+			Phase: v1.PodPending,
+		},
 	}
 
 	properPrefContainerRequests := []*v1beta1.ContainerPreferredAllocationRequest{
@@ -664,6 +680,9 @@ func TestCreateFractionalResourceResponseWithMultipleContainersTileEach(t *testing.T) {
 				},
 			},
 		},
+		Status: v1.PodStatus{
+			Phase: v1.PodPending,
+		},
 	}
 
 	properPrefContainerRequests := []*v1beta1.ContainerPreferredAllocationRequest{
@@ -32,6 +32,10 @@ spec:
           valueFrom:
             fieldRef:
              fieldPath: spec.nodeName
+        - name: HOST_IP
+          valueFrom:
+            fieldRef:
+              fieldPath: status.hostIP
        image: intel/intel-gpu-plugin:0.26.0
        imagePullPolicy: IfNotPresent
        securityContext:
@@ -0,0 +1,17 @@
+apiVersion: apps/v1
+kind: DaemonSet
+metadata:
+  name: intel-gpu-plugin
+spec:
+  template:
+    spec:
+      containers:
+      - name: intel-gpu-plugin
+        volumeMounts:
+        - name: kubeletcrt
+          mountPath: /var/lib/kubelet/pki/kubelet.crt
+      volumes:
+      - name: kubeletcrt
+        hostPath:
+          path: /var/lib/kubelet/pki/kubelet.crt
+          type: FileOrCreate
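A note on the hostPath type in the patch above: FileOrCreate keeps the DaemonSet schedulable even on nodes where /var/lib/kubelet/pki/kubelet.crt is absent; in that case the kubelet API probe in NewResourceManager fails and the plugin simply stays on the api-server path.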
@@ -4,5 +4,5 @@ metadata:
   name: gpu-manager-role
 rules:
 - apiGroups: [""]
-  resources: ["pods"]
-  verbs: ["list"]
+  resources: ["pods", "nodes/proxy"]
+  verbs: ["list", "get"]
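The added nodes/proxy get/list verbs are what authorize the plugin's direct GET against the kubelet's /pods endpoint (the same requirement documented by the // +kubebuilder:rbac marker above listPodsFromKubelet); the pods list verb remains for the api-server fallback path.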
@@ -9,3 +9,4 @@ patches:
 - path: add-podresource-mount.yaml
 - path: add-args.yaml
 - path: add-nodeselector-intel-gpu.yaml
+- path: add-kubelet-crt-mount.yaml
@@ -5,6 +5,13 @@ metadata:
   creationTimestamp: null
   name: gpu-manager-role
 rules:
+- apiGroups:
+  - ""
+  resources:
+  - nodes/proxy
+  verbs:
+  - get
+  - list
 - apiGroups:
   - ""
   resources:
@@ -5,6 +5,13 @@ metadata:
   creationTimestamp: null
   name: manager-role
 rules:
+- apiGroups:
+  - ""
+  resources:
+  - nodes/proxy
+  verbs:
+  - get
+  - list
 - apiGroups:
   - ""
   resources:
@@ -152,6 +152,8 @@ func (c *controller) NewDaemonSet(rawObj client.Object) *apps.DaemonSet {
 		daemonSet.Spec.Template.Spec.ServiceAccountName = serviceAccountName
 		addVolumeIfMissing(&daemonSet.Spec.Template.Spec, "podresources", "/var/lib/kubelet/pod-resources", v1.HostPathDirectory)
 		addVolumeMountIfMissing(&daemonSet.Spec.Template.Spec, "podresources", "/var/lib/kubelet/pod-resources")
+		addVolumeIfMissing(&daemonSet.Spec.Template.Spec, "kubeletcrt", "/var/lib/kubelet/pki/kubelet.crt", v1.HostPathFileOrCreate)
+		addVolumeMountIfMissing(&daemonSet.Spec.Template.Spec, "kubeletcrt", "/var/lib/kubelet/pki/kubelet.crt")
 	}
 
 	return daemonSet
@@ -74,6 +74,14 @@ func (c *controller) newDaemonSetExpected(rawObj client.Object) *apps.DaemonSet {
 					},
 				},
 			},
+			{
+				Name: "HOST_IP",
+				ValueFrom: &v1.EnvVarSource{
+					FieldRef: &v1.ObjectFieldSelector{
+						FieldPath: "status.hostIP",
+					},
+				},
+			},
 		},
 		Args:  getPodArgs(devicePlugin),
 		Image: devicePlugin.Spec.Image,
@@ -145,6 +153,8 @@ func (c *controller) newDaemonSetExpected(rawObj client.Object) *apps.DaemonSet {
 		daemonSet.Spec.Template.Spec.ServiceAccountName = serviceAccountName
 		addVolumeIfMissing(&daemonSet.Spec.Template.Spec, "podresources", "/var/lib/kubelet/pod-resources", v1.HostPathDirectory)
 		addVolumeMountIfMissing(&daemonSet.Spec.Template.Spec, "podresources", "/var/lib/kubelet/pod-resources")
+		addVolumeIfMissing(&daemonSet.Spec.Template.Spec, "kubeletcrt", "/var/lib/kubelet/pki/kubelet.crt", v1.HostPathFileOrCreate)
+		addVolumeMountIfMissing(&daemonSet.Spec.Template.Spec, "kubeletcrt", "/var/lib/kubelet/pki/kubelet.crt")
 	}
 
 	return &daemonSet
@@ -73,6 +73,7 @@ func GetDevicePluginCount(pluginKind string) int {
 // +kubebuilder:rbac:groups="",resources=serviceaccounts,verbs=get;list;watch;create;delete
 // +kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=clusterrolebindings,verbs=get;list;watch;create;delete
 // +kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch
+// +kubebuilder:rbac:groups="",resources=nodes/proxy,verbs=get;list
 // +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,verbs=create
 // +kubebuilder:rbac:groups=security.openshift.io,resources=securitycontextconstraints,resourceNames=privileged,verbs=use
 // +kubebuilder:rbac:groups=coordination.k8s.io,resources=leases,resourceNames=d1c7b6d5.intel.com,verbs=get;update