intel-device-plugins-for-ku.../cmd/fpga_admissionwebhook/patcher.go
Graham Whaley 71d08224ee fpga: move to using klog for logs and debug
Move all the fpga components to using klog for logging
and debug. This includes replacing our homebrew 'fatal()'
with klog.Error().

Modify the deployment files to move from `-debug` to
`-v`, and set their default level to '1' (Info), rather
than full debug mode ('4').

Signed-off-by: Graham Whaley <graham.whaley@intel.com>
2020-03-24 14:31:53 +00:00

345 lines
9.3 KiB
Go

// Copyright 2018 Intel Corporation. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"bytes"
"fmt"
"regexp"
"strings"
"sync"
"text/template"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/klog"
fpgav1 "github.com/intel/intel-device-plugins-for-kubernetes/pkg/apis/fpga.intel.com/v1"
)
// Templates and names used to build the JSON patch operations returned by
// the patcher methods in this file.
const (
// namespace is the prefix shared by all FPGA resource names handled here.
namespace = "fpga.intel.com"
// resourceReplaceOp is a fmt format for a "remove" operation followed by an
// "add" operation on a container's resources. Arguments, in order: container
// index, resources section, old resource name, container index, resources
// section, new resource name, value. The value is inserted verbatim (it is
// not wrapped in quotes by the format).
resourceReplaceOp = `{
"op": "remove",
"path": "/spec/containers/%d/resources/%s/%s"
}, {
"op": "add",
"path": "/spec/containers/%d/resources/%s/%s",
"value": %s
}`
// resourceRemoveOp is a fmt format for a single "remove" operation.
// Arguments: container index, resources section, resource name.
resourceRemoveOp = `{
"op": "remove",
"path": "/spec/containers/%d/resources/%s/%s"
}`
// resourceAddOp is a fmt format for a single "add" operation whose value is
// an integer rendered inside quotes. Arguments: container index, resources
// section, resource name, integer value.
resourceAddOp = `{
"op": "add",
"path": "/spec/containers/%d/resources/%s/%s",
"value": "%d"
}`
// envAddOpTpl is a text/template source for an "add" operation that sets the
// container's "env" path to an array of name/value objects, one per entry of
// .EnvVars. $first tracks whether a separating comma is needed. Keys and
// values are written between double quotes without any escaping.
envAddOpTpl = `{
"op": "add",
"path": "/spec/containers/{{- .ContainerIdx -}}/env",
"value": [
{{- $first := true -}}
{{- range $key, $value := .EnvVars -}}
{{- if not $first -}},{{- end -}}{
"name": "{{$key}}",
"value": "{{$value}}"
}
{{- $first = false -}}
{{- end -}}
]
}`
)
var (
// rfc6901Escaper rewrites "~" as "~0" and "/" as "~1" so that a resource
// name can be embedded as a single segment of a JSON patch path.
rfc6901Escaper = strings.NewReplacer("~", "~0", "/", "~1")
// resourceRe matches namespace + "/" followed by a "Region" group of
// alphanumerics and dots, optionally followed by "-" and an alphanumeric
// "Af" group. The expression is not anchored, and the dots coming from
// namespace are not escaped.
resourceRe = regexp.MustCompile(namespace + `/(?P<Region>[[:alnum:].]+)(-(?P<Af>[[:alnum:]]+))?`)
)
// patcher translates FPGA resource names found in container specs into JSON
// patch operations. The embedded mutex is taken by the methods that read or
// write the maps below.
type patcher struct {
sync.Mutex
// mode selects the patching strategy; newPatcher accepts only
// preprogrammed or orchestrated.
mode string
// regionMap maps a region name to its interface ID.
regionMap map[string]string
// afMap maps an accelerator function name to its AFU ID.
afMap map[string]string
// resourceMap maps namespace + "/" + name to the already-escaped
// translated resource name.
resourceMap map[string]string
}
// newPatcher creates a patcher with empty lookup tables for the given mode.
// It returns an error when mode is neither preprogrammed nor orchestrated.
func newPatcher(mode string) (*patcher, error) {
	supported := mode == preprogrammed || mode == orchestrated
	if !supported {
		return nil, errors.Errorf("Unknown mode: %s", mode)
	}

	created := &patcher{
		mode:        mode,
		regionMap:   map[string]string{},
		afMap:       map[string]string{},
		resourceMap: map[string]string{},
	}

	return created, nil
}
// addAf registers an accelerator function: its name is mapped to its AFU ID,
// and the namespaced resource name is mapped to the escaped "af-<AFU ID>"
// resource name.
func (p *patcher) addAf(af *fpgav1.AcceleratorFunction) {
	p.Lock()
	defer p.Unlock()

	afuID := af.Spec.AfuID
	resourceKey := namespace + "/" + af.Name
	translated := rfc6901Escaper.Replace(namespace + "/af-" + afuID)

	p.afMap[af.Name] = afuID
	p.resourceMap[resourceKey] = translated
}
// addRegion registers an FPGA region: its name is mapped to its interface ID,
// and the namespaced resource name is mapped to the escaped
// "region-<interface ID>" resource name.
func (p *patcher) addRegion(region *fpgav1.FpgaRegion) {
	p.Lock()
	defer p.Unlock()

	interfaceID := region.Spec.InterfaceID
	resourceKey := namespace + "/" + region.Name
	translated := rfc6901Escaper.Replace(namespace + "/region-" + interfaceID)

	p.regionMap[region.Name] = interfaceID
	p.resourceMap[resourceKey] = translated
}
// removeAf forgets the accelerator function with the given name, dropping
// both its AFU ID entry and its resource name translation.
func (p *patcher) removeAf(name string) {
	p.Lock()
	defer p.Unlock()

	resourceKey := namespace + "/" + name
	delete(p.afMap, name)
	delete(p.resourceMap, resourceKey)
}
// removeRegion forgets the region with the given name, dropping both its
// interface ID entry and its resource name translation.
func (p *patcher) removeRegion(name string) {
	p.Lock()
	defer p.Unlock()

	resourceKey := namespace + "/" + name
	delete(p.regionMap, name)
	delete(p.resourceMap, resourceKey)
}
// getPatchOps returns the JSON patch operations for the container at
// containerIdx, delegating to the implementation that matches the patcher's
// mode. It returns an error if the mode is not one of the supported values.
func (p *patcher) getPatchOps(containerIdx int, container corev1.Container) ([]string, error) {
	switch p.mode {
	case preprogrammed:
		return p.getPatchOpsPreprogrammed(containerIdx, container)
	case orchestrated:
		return p.getPatchOpsOrchestrated(containerIdx, container)
	default:
		// Same wording as the constructors' validation error.
		return nil, errors.Errorf("Unknown mode: %s", p.mode)
	}
}
// getPatchOpsPreprogrammed builds one replace operation (remove + add) for
// every FPGA resource found in the container's limits and then in its
// requests, renaming the resource to its translated name while keeping the
// quantity. Resources that translate to an empty name are left untouched.
// Any translation error aborts the whole call.
func (p *patcher) getPatchOpsPreprogrammed(containerIdx int, container corev1.Container) ([]string, error) {
	// Limits are processed before requests.
	sections := []struct {
		field     string
		resources corev1.ResourceList
	}{
		{field: "limits", resources: container.Resources.Limits},
		{field: "requests", resources: container.Resources.Requests},
	}

	var ops []string

	for _, section := range sections {
		for oldName, quantity := range section.resources {
			translated, err := p.translateFpgaResourceName(oldName)
			if err != nil {
				return nil, err
			}
			if translated == "" {
				continue
			}

			escapedOld := rfc6901Escaper.Replace(string(oldName))
			ops = append(ops, fmt.Sprintf(resourceReplaceOp,
				containerIdx, section.field, escapedOld,
				containerIdx, section.field, translated, quantity.String()))
		}
	}

	return ops, nil
}
// translateFpgaResourceName looks up the translated name for a resource.
// The name is lowercased first. Names that do not start with the FPGA
// namespace yield an empty string and no error; names with the prefix that
// are missing from the resource map yield an error.
func (p *patcher) translateFpgaResourceName(oldname corev1.ResourceName) (string, error) {
	lowered := strings.ToLower(string(oldname))
	if !strings.HasPrefix(lowered, namespace) {
		return "", nil
	}

	p.Lock()
	defer p.Unlock()

	translated, known := p.resourceMap[lowered]
	if !known {
		return "", errors.Errorf("Unknown FPGA resource: %s", lowered)
	}

	return translated, nil
}
// checkResourceRequests verifies that every FPGA resource listed in the
// container's requests has a limit of the same value. Resources for which
// parseResourceName yields no interface ID are skipped. It returns the parse
// error, or an error naming the first resource whose limit is missing or
// differs from its request.
func (p *patcher) checkResourceRequests(container corev1.Container) error {
	for resourceName, resourceQuantity := range container.Resources.Requests {
		interfaceID, _, err := p.parseResourceName(string(resourceName))
		if err != nil {
			return err
		}

		if interfaceID == "" {
			// Skip non-FPGA resources
			continue
		}

		// Compare by numeric value: struct equality on quantities also
		// compares their internal representation, so equal values could be
		// reported as different. A missing limit is still rejected.
		limit, ok := container.Resources.Limits[resourceName]
		if !ok || limit.Cmp(resourceQuantity) != 0 {
			return errors.Errorf("'limits' and 'requests' for %s must be equal", string(resourceName))
		}
	}

	return nil
}
// getPatchOpsOrchestrated builds the JSON patch operations for one container
// in orchestrated mode.
//
// The container is rejected if it already defines an environment variable
// whose name starts with "FPGA_REGION" or "FPGA_AFU", if an FPGA resource's
// limit and request are not equal, or if an FPGA quantity is not an integer.
//
// For every FPGA resource in the limits the function:
//   - removes the resource from both limits and requests,
//   - adds its quantity to the total of the region (interface ID) it maps to,
//   - records one FPGA_REGION_<n>/FPGA_AFU_<n> pair per requested unit.
//
// It then adds a "region-<interface ID>" resource with the summed quantity
// to limits and requests, and, if any variables were recorded, one operation
// that sets the container's env to the recorded variables plus the existing
// ones.
func (p *patcher) getPatchOpsOrchestrated(containerIdx int, container corev1.Container) ([]string, error) {
	var ops []string

	for _, v := range container.Env {
		if strings.HasPrefix(v.Name, "FPGA_REGION") || strings.HasPrefix(v.Name, "FPGA_AFU") {
			return nil, errors.Errorf("The environment variable '%s' is not allowed", v.Name)
		}
	}

	if err := p.checkResourceRequests(container); err != nil {
		return nil, err
	}

	regions := make(map[string]int64)
	envVars := make(map[string]string)
	counter := 0
	for resourceName, resourceQuantity := range container.Resources.Limits {
		interfaceID, afuID, err := p.parseResourceName(string(resourceName))
		if err != nil {
			return nil, err
		}

		if interfaceID == "" && afuID == "" {
			// Skip non-FPGA resources
			continue
		}

		// Compare by numeric value: struct equality on quantities also
		// compares their internal representation, so equal values could be
		// reported as different. A missing request is still rejected.
		request, ok := container.Resources.Requests[resourceName]
		if !ok || request.Cmp(resourceQuantity) != 0 {
			return nil, errors.Errorf("'limits' and 'requests' for %s must be equal", string(resourceName))
		}

		quantity, ok := resourceQuantity.AsInt64()
		if !ok {
			return nil, errors.New("Resource quantity isn't of integral type")
		}

		regions[interfaceID] += quantity

		// One numbered variable pair per requested unit; the counter runs
		// across all FPGA resources of the container.
		for i := int64(0); i < quantity; i++ {
			counter++
			envVars[fmt.Sprintf("FPGA_REGION_%d", counter)] = interfaceID
			envVars[fmt.Sprintf("FPGA_AFU_%d", counter)] = afuID
		}

		escapedName := rfc6901Escaper.Replace(string(resourceName))
		ops = append(ops, fmt.Sprintf(resourceRemoveOp, containerIdx, "limits", escapedName))
		ops = append(ops, fmt.Sprintf(resourceRemoveOp, containerIdx, "requests", escapedName))
	}

	for interfaceID, quantity := range regions {
		escapedRegion := rfc6901Escaper.Replace(namespace + "/region-" + interfaceID)
		ops = append(ops, fmt.Sprintf(resourceAddOp, containerIdx, "limits", escapedRegion, quantity))
		ops = append(ops, fmt.Sprintf(resourceAddOp, containerIdx, "requests", escapedRegion, quantity))
	}

	if len(envVars) > 0 {
		// The env operation writes the whole env array, so the existing
		// variables are merged in to keep them.
		// NOTE(review): only Name and Value are carried over, and they are
		// rendered into the template without escaping; confirm that no other
		// EnvVar fields or special characters need to be preserved.
		for _, envvar := range container.Env {
			envVars[envvar.Name] = envvar.Value
		}

		data := struct {
			ContainerIdx int
			EnvVars      map[string]string
		}{
			ContainerIdx: containerIdx,
			EnvVars:      envVars,
		}

		t := template.Must(template.New("add_operation").Parse(envAddOpTpl))
		buf := new(bytes.Buffer)
		// A failed render must not be emitted as a truncated operation.
		if err := t.Execute(buf, data); err != nil {
			return nil, errors.Wrap(err, "unable to render the env patch operation")
		}
		ops = append(ops, buf.String())
	}

	return ops, nil
}
// parseResourceName resolves an FPGA resource name into the interface ID of
// its region and, when the name carries an accelerator function part, the
// AFU ID registered under "<region>-<af>".
//
// Input that does not match resourceRe yields empty strings and no error.
// An unregistered region or accelerator function yields an error.
func (p *patcher) parseResourceName(input string) (string, string, error) {
	match := resourceRe.FindStringSubmatch(input)
	if match == nil {
		return "", "", nil
	}

	// Pick the named groups out of the submatch slice.
	var regionName, afName string
	for idx, groupName := range resourceRe.SubexpNames() {
		switch groupName {
		case "Region":
			regionName = match[idx]
		case "Af":
			afName = match[idx]
		}
	}

	p.Lock()
	defer p.Unlock()

	interfaceID, found := p.regionMap[regionName]
	if !found {
		return "", "", errors.Errorf("Unknown region name: %s", regionName)
	}

	if afName == "" {
		return interfaceID, "", nil
	}

	afKey := regionName + "-" + afName
	afuID, found := p.afMap[afKey]
	if !found {
		return "", "", errors.Errorf("Unknown AF name: %s", afKey)
	}

	return interfaceID, afuID, nil
}
// patcherManager keeps track of patchers registered for different Kubernetes namespaces.
type patcherManager struct {
// defaultMode is the mode given to patchers created by getPatcher.
defaultMode string
// patchers holds the patcher created for each namespace, keyed by
// namespace name. Access to it is not synchronized in this type.
patchers map[string]*patcher
}
// newPatcherManager creates a manager with no patchers whose future patchers
// use defaultMode. It returns an error when defaultMode is neither
// preprogrammed nor orchestrated.
func newPatcherManager(defaultMode string) (*patcherManager, error) {
	supported := defaultMode == preprogrammed || defaultMode == orchestrated
	if !supported {
		return nil, errors.Errorf("Unknown mode: %s", defaultMode)
	}

	manager := &patcherManager{
		defaultMode: defaultMode,
		patchers:    map[string]*patcher{},
	}

	return manager, nil
}
// getPatcher returns the patcher registered for the given Kubernetes
// namespace, creating and registering one with the manager's default mode on
// first use. It returns an error if the patcher cannot be created.
func (pm *patcherManager) getPatcher(namespace string) (*patcher, error) {
	if p, ok := pm.patchers[namespace]; ok {
		return p, nil
	}

	p, err := newPatcher(pm.defaultMode)
	if err != nil {
		return nil, err
	}

	pm.patchers[namespace] = p
	// Use a format string: Info joins two string operands without a space,
	// which glued the namespace onto the message.
	klog.V(4).Infof("created new patcher for namespace %s", namespace)

	return p, nil
}