intel-device-plugins-for-ku.../pkg/fpgacontroller/patcher/patcher.go
2020-11-24 18:35:56 +02:00

236 lines
7.1 KiB
Go

// Copyright 2018 Intel Corporation. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package patcher provides functionality required to patch pods by the FPGA admission webhook.
package patcher
import (
"bytes"
"fmt"
"strings"
"sync"
"text/template"
"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
fpgav2 "github.com/intel/intel-device-plugins-for-kubernetes/pkg/apis/fpga/v2"
"github.com/intel/intel-device-plugins-for-kubernetes/pkg/fpga"
"github.com/intel/intel-device-plugins-for-kubernetes/pkg/internal/containers"
)
const (
// namespace is the resource-name prefix handled by this patcher. It is used
// to build the keys of the patcher's lookup maps and is passed to
// containers.GetRequestedResources to select the requested resources.
namespace = "fpga.intel.com"
// "af" is the mode for which AddAf maps a resource to the device type
// returned by fpga.GetAfuDevType.
af = "af"
// "region" is the mode for which getPatchOps injects FPGA_REGION_<n> and
// FPGA_AFU_<n> environment variables into the container.
region = "region"
// "regiondevel" corresponds to the FPGA plugin's regiondevel mode. It requires
// FpgaRegion CRDs to be added to the cluster.
regiondevel = "regiondevel"
// resourceRemoveOp is a fmt format for a JSON Patch "remove" operation.
// Arguments: container index, resource section ("limits" or "requests"),
// RFC 6901-escaped resource name.
resourceRemoveOp = `{
"op": "remove",
"path": "/spec/containers/%d/resources/%s/%s"
}`
// resourceAddOp is a fmt format for a JSON Patch "add" operation.
// Arguments: container index, resource section ("limits" or "requests"),
// RFC 6901-escaped resource name, integer quantity (emitted as a JSON string).
resourceAddOp = `{
"op": "add",
"path": "/spec/containers/%d/resources/%s/%s",
"value": "%d"
}`
// envAddOpTpl is a text/template source for a JSON Patch "add" operation on
// a container's "env" member. It is executed with a value exposing
// ContainerIdx (int) and EnvVars (map of variable name to value) and emits
// one {"name", "value"} object per map entry, comma separated.
// NOTE(review): keys and values are written verbatim; text/template performs
// no JSON escaping, so the inputs must already be safe inside a JSON string.
envAddOpTpl = `{
"op": "add",
"path": "/spec/containers/{{- .ContainerIdx -}}/env",
"value": [
{{- $first := true -}}
{{- range $key, $value := .EnvVars -}}
{{- if not $first -}},{{- end -}}{
"name": "{{$key}}",
"value": "{{$value}}"
}
{{- $first = false -}}
{{- end -}}
]
}`
)
// rfc6901Escaper escapes a string for use as a single JSON Pointer (RFC 6901)
// reference token: "~" becomes "~0" and "/" becomes "~1".
var rfc6901Escaper = strings.NewReplacer(
	"~", "~0",
	"/", "~1",
)
// patcher holds the lookup tables used to translate FPGA resource requests of
// a container into JSON Patch operations. The embedded mutex is taken by every
// method that reads or writes the maps.
type patcher struct {
sync.Mutex
// log receives an error when a resource is registered with an unknown mode.
log logr.Logger
// afMap stores registered accelerator functions keyed by
// "fpga.intel.com/<name>".
afMap map[string]*fpgav2.AcceleratorFunction
// resourceMap maps "fpga.intel.com/<name>" to the RFC 6901-escaped name of
// the resource that replaces it in the pod.
resourceMap map[string]string
// resourceModeMap maps "fpga.intel.com/<name>" to the mode the resource was
// registered with ("af", "region" or "regiondevel" are the handled values).
resourceModeMap map[string]string
}
// newPatcher returns a patcher that reports through log and starts with empty
// accelerator-function, resource-name and resource-mode tables.
func newPatcher(log logr.Logger) *patcher {
	p := &patcher{log: log}

	// The maps must be allocated up front: the Add* methods write to them.
	p.afMap = map[string]*fpgav2.AcceleratorFunction{}
	p.resourceMap = map[string]string{}
	p.resourceModeMap = map[string]string{}

	return p
}
// AddAf registers (or replaces) the accelerator function accfunc under the key
// "fpga.intel.com/<accfunc.Name>".
//
// In "af" mode the resource is mapped to "fpga.intel.com/<devtype>", where
// devtype is computed by fpga.GetAfuDevType from the interface and AFU IDs.
// In every other mode it is mapped to "fpga.intel.com/region-<InterfaceID>".
// The mapped name is stored RFC 6901-escaped so it can be placed directly into
// a JSON Patch path.
//
// An error from fpga.GetAfuDevType is returned unchanged. In that case none of
// the patcher's maps is modified, so an earlier registration under the same
// name stays fully intact instead of being half overwritten.
func (p *patcher) AddAf(accfunc *fpgav2.AcceleratorFunction) error {
	key := namespace + "/" + accfunc.Name

	// Resolve the mapped resource name before touching any state: the only
	// step that can fail must not leave afMap out of sync with the other maps.
	mapped := namespace + "/region-" + accfunc.Spec.InterfaceID
	if accfunc.Spec.Mode == af {
		devtype, err := fpga.GetAfuDevType(accfunc.Spec.InterfaceID, accfunc.Spec.AfuID)
		if err != nil {
			return err
		}

		mapped = namespace + "/" + devtype
	}

	p.Lock()
	defer p.Unlock()

	p.afMap[key] = accfunc
	p.resourceMap[key] = rfc6901Escaper.Replace(mapped)
	p.resourceModeMap[key] = accfunc.Spec.Mode

	return nil
}
// AddRegion registers the FPGA region under "fpga.intel.com/<region.Name>" in
// "regiondevel" mode and maps it to the RFC 6901-escaped resource name
// "fpga.intel.com/region-<InterfaceID>".
func (p *patcher) AddRegion(region *fpgav2.FpgaRegion) {
	key := namespace + "/" + region.Name
	mapped := rfc6901Escaper.Replace(namespace + "/region-" + region.Spec.InterfaceID)

	p.Lock()
	defer p.Unlock()

	p.resourceModeMap[key] = regiondevel
	p.resourceMap[key] = mapped
}
// RemoveAf forgets the accelerator function registered under
// "fpga.intel.com/<name>" together with its resource mapping and mode.
// Removing a name that was never registered is a no-op.
func (p *patcher) RemoveAf(name string) {
	key := namespace + "/" + name

	p.Lock()
	defer p.Unlock()

	delete(p.afMap, key)
	delete(p.resourceMap, key)
	delete(p.resourceModeMap, key)
}
// RemoveRegion drops the resource mapping and mode stored under
// "fpga.intel.com/<name>". The accelerator-function table is not touched.
func (p *patcher) RemoveRegion(name string) {
	key := namespace + "/" + name

	p.Lock()
	defer p.Unlock()

	for _, table := range []map[string]string{p.resourceMap, p.resourceModeMap} {
		delete(table, key)
	}
}
// validateContainer rejects containers that already define an environment
// variable whose name starts with "FPGA_REGION" or "FPGA_AFU"; those names are
// the ones getPatchOps generates itself. It returns nil when no such variable
// is present.
func validateContainer(container corev1.Container) error {
	reserved := [...]string{"FPGA_REGION", "FPGA_AFU"}

	for i := range container.Env {
		name := container.Env[i].Name

		for _, prefix := range reserved {
			if strings.HasPrefix(name, prefix) {
				return errors.Errorf("environment variable '%s' is not allowed", name)
			}
		}
	}

	return nil
}
// getPatchOps builds the JSON Patch operations (one JSON object per returned
// string) that rewrite the FPGA resources requested by the container at index
// containerIdx of the pod spec.
//
// For every requested "fpga.intel.com/..." resource it emits "remove"
// operations for the original name under limits and requests, and afterwards
// "add" operations for the mapped resource names with the summed quantities.
// For resources registered in "region" mode it additionally injects one
// FPGA_REGION_<n>/FPGA_AFU_<n> pair of environment variables per requested
// unit, carrying the interface ID and AFU ID of the accelerator function.
//
// Environment handling: if the container has no environment variables, a
// single "add" operation creates the "env" array. If it already has some, each
// generated variable is appended with its own "add" operation on ".../env/-".
// The existing entries are never re-serialized, so variables defined through
// ValueFrom keep their source and user-supplied values never have to be
// escaped here.
//
// An error is returned when the container defines a reserved FPGA_* variable,
// when containers.GetRequestedResources fails, when a requested resource is
// unknown, when a resource is registered with an unknown mode, or when the
// container mixes resources operated in different modes.
//
// Requested resources are visited in map iteration order, so the order of the
// operations and the numbering of the FPGA_* pairs across different resources
// are not stable between calls; each pair always describes one function.
func (p *patcher) getPatchOps(containerIdx int, container corev1.Container) ([]string, error) {
	// envAppendOp appends one variable to an existing "env" array; "-" is the
	// JSON Pointer index addressing the position after the last element.
	// Arguments: container index, variable name, variable value.
	// NOTE(review): %q yields valid JSON for plain printable ASCII, which is
	// presumably all the interface/AFU IDs contain; confirm against the CRD.
	const envAppendOp = `{"op": "add", "path": "/spec/containers/%d/env/-", "value": {"name": %q, "value": %q}}`

	// generatedVar is one FPGA_* variable produced for "region" mode.
	type generatedVar struct {
		name, value string
	}

	if err := validateContainer(container); err != nil {
		return nil, err
	}

	requestedResources, err := containers.GetRequestedResources(container, namespace)
	if err != nil {
		return nil, err
	}

	p.Lock()
	defer p.Unlock()

	var generated []generatedVar

	fpgaPluginMode := ""
	resources := make(map[string]int64)
	counter := 0
	ops := make([]string, 0, 2*len(requestedResources))

	for rname, quantity := range requestedResources {
		mode, found := p.resourceModeMap[rname]
		if !found {
			return nil, errors.Errorf("no such resource: %q", rname)
		}

		switch mode {
		case regiondevel, af:
			// Do nothing.
			// The requested resources are exposed by FPGA plugins working in "regiondevel/af" mode.
			// In "regiondevel" mode the workload is supposed to program FPGA regions.
			// A cluster admin has to add FpgaRegion CRDs to allow this.
		case region:
			// Let fpga_crihook know how to program the regions by setting ENV variables.
			// The requested resources are exposed by FPGA plugins working in "region" mode.
			accfunc := p.afMap[rname]
			for i := int64(0); i < quantity; i++ {
				counter++
				generated = append(generated,
					generatedVar{fmt.Sprintf("FPGA_REGION_%d", counter), accfunc.Spec.InterfaceID},
					generatedVar{fmt.Sprintf("FPGA_AFU_%d", counter), accfunc.Spec.AfuID},
				)
			}
		default:
			// Let admin know about broken af CRD.
			modeErr := errors.Errorf("%q is registered with unknown mode %q instead of %q or %q",
				rname, mode, af, region)
			p.log.Error(modeErr, "unable to construct patching operations")

			return nil, modeErr
		}

		// All FPGA resources of one container must be served in the same mode.
		if fpgaPluginMode == "" {
			fpgaPluginMode = mode
		} else if fpgaPluginMode != mode {
			return nil, errors.New("container cannot be scheduled as it requires resources operated in different modes")
		}

		// Several requested names may resolve to the same mapped resource.
		resources[p.resourceMap[rname]] += quantity

		// Add operations to remove unresolved resources from the pod.
		escaped := rfc6901Escaper.Replace(rname)
		ops = append(ops,
			fmt.Sprintf(resourceRemoveOp, containerIdx, "limits", escaped),
			fmt.Sprintf(resourceRemoveOp, containerIdx, "requests", escaped),
		)
	}

	// Add operations to add resolved resources to the pod.
	for resource, quantity := range resources {
		ops = append(ops,
			fmt.Sprintf(resourceAddOp, containerIdx, "limits", resource, quantity),
			fmt.Sprintf(resourceAddOp, containerIdx, "requests", resource, quantity),
		)
	}

	// Add the ENV variables to the pod if needed.
	if len(generated) == 0 {
		return ops, nil
	}

	if len(container.Env) > 0 {
		// An "add" on ".../env" would replace the whole array and could only
		// carry name/value pairs, losing ValueFrom sources. Append instead.
		for _, v := range generated {
			ops = append(ops, fmt.Sprintf(envAppendOp, containerIdx, v.name, v.value))
		}

		return ops, nil
	}

	// No "env" array exists yet, so it has to be created in one operation.
	envVars := make(map[string]string, len(generated))
	for _, v := range generated {
		envVars[v.name] = v.value
	}

	data := struct {
		ContainerIdx int
		EnvVars      map[string]string
	}{
		ContainerIdx: containerIdx,
		EnvVars:      envVars,
	}

	t := template.Must(template.New("add_operation").Parse(envAddOpTpl))
	buf := new(bytes.Buffer)

	if err := t.Execute(buf, data); err != nil {
		return nil, errors.Wrap(err, "unable to execute template")
	}

	return append(ops, buf.String()), nil
}