Use annotated errors for tracing

This commit is contained in:
Dmitry Rozhkov 2018-08-16 17:31:19 +03:00
parent 52c63fade1
commit 2ff6c5929a
16 changed files with 164 additions and 146 deletions

View File

@ -19,8 +19,9 @@ import (
"time"
"github.com/golang/glog"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
@ -55,8 +56,7 @@ type controller struct {
func newController(patcher *patcher, config *rest.Config) (*controller, error) {
clientset, err := clientset.NewForConfig(config)
if err != nil {
glog.Error("Failed to create REST clientset ", err)
return nil, err
return nil, errors.Wrap(err, "Failed to create REST clientset")
}
informerFactory := informers.NewSharedInformerFactory(clientset, resyncPeriod)
@ -94,11 +94,11 @@ func (c *controller) run(threadiness int) error {
go c.informerFactory.Start(c.stopCh)
if ok := cache.WaitForCacheSync(c.stopCh, c.afsSynced); !ok {
return fmt.Errorf("failed to wait for AF caches to sync")
return errors.New("failed to wait for AF caches to sync")
}
if ok := cache.WaitForCacheSync(c.stopCh, c.regionsSynced); !ok {
return fmt.Errorf("failed to wait for Region caches to sync")
return errors.New("failed to wait for Region caches to sync")
}
for i := 0; i < threadiness; i++ {
@ -107,7 +107,7 @@ func (c *controller) run(threadiness int) error {
}
}, time.Second, c.stopCh)
}
glog.Info("Started controller workers")
fmt.Println("Started controller workers")
<-c.stopCh
return nil
@ -133,21 +133,21 @@ func (c *controller) processNextWorkItem() bool {
if key, ok = obj.(fpgaObjectKey); !ok {
c.queue.Forget(obj)
return fmt.Errorf("expected fpgaObjectKey in workqueue but got %#v", obj)
return errors.Errorf("expected fpgaObjectKey in workqueue but got %#v", obj)
}
switch key.kind {
case "af":
if err := c.syncAfHandler(key.name); err != nil {
return fmt.Errorf("error syncing '%s': %v", key.name, err)
return errors.Wrapf(err, "error syncing '%s'", key.name)
}
case "region":
if err := c.syncRegionHandler(key.name); err != nil {
return fmt.Errorf("error syncing '%s': %v", key.name, err)
return errors.Wrapf(err, "error syncing '%s'", key.name)
}
default:
c.queue.Forget(obj)
return fmt.Errorf("Unknown kind of object key: %s", key.kind)
return errors.Errorf("Unknown kind of object key: %s", key.kind)
}
// Finally, if no error occurs we Forget this item so it does not
@ -169,7 +169,7 @@ func (c *controller) syncAfHandler(key string) error {
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
runtime.HandleError(errors.Errorf("invalid resource key: %s", key))
return nil
}
@ -178,8 +178,8 @@ func (c *controller) syncAfHandler(key string) error {
if err != nil {
// The AcceleratorFunction resource may no longer exist, in which case we stop
// processing.
if errors.IsNotFound(err) {
runtime.HandleError(fmt.Errorf("accelerated function '%s' in work queue no longer exists", key))
if k8serrors.IsNotFound(err) {
runtime.HandleError(errors.Errorf("accelerated function '%s' in work queue no longer exists", key))
glog.V(2).Infof("AF '%s' no longer exists", key)
c.patcher.removeAf(name)
return nil
@ -197,7 +197,7 @@ func (c *controller) syncRegionHandler(key string) error {
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
runtime.HandleError(errors.Errorf("invalid resource key: %s", key))
return nil
}
@ -206,8 +206,8 @@ func (c *controller) syncRegionHandler(key string) error {
if err != nil {
// The FpgaRegion resource may no longer exist, in which case we stop
// processing.
if errors.IsNotFound(err) {
runtime.HandleError(fmt.Errorf("FPGA region '%s' in work queue no longer exists", key))
if k8serrors.IsNotFound(err) {
runtime.HandleError(errors.Errorf("FPGA region '%s' in work queue no longer exists", key))
glog.V(2).Infof("Region '%s' no longer exists", key)
c.patcher.removeRegion(name)
return nil

View File

@ -98,18 +98,18 @@ func TestSyncAfHandler(t *testing.T) {
for _, tt := range tcases {
p, err := newPatcher(preprogrammed)
if err != nil {
t.Fatalf("Test case '%s': %v", tt.name, err)
t.Fatalf("Test case '%s': %+v", tt.name, err)
}
c, err := newController(p, &rest.Config{})
if err != nil {
t.Fatalf("Test case '%s': %v", tt.name, err)
t.Fatalf("Test case '%s': %+v", tt.name, err)
}
if tt.afLister != nil {
c.afLister = tt.afLister
}
err = c.syncAfHandler(tt.key)
if err != nil && !tt.expectedErr {
t.Errorf("Test case '%s': unexpected error: %v", tt.name, err)
t.Errorf("Test case '%s': unexpected error: %+v", tt.name, err)
}
if err == nil && tt.expectedErr {
t.Errorf("Test case '%s': expected error, but got success", tt.name)
@ -188,18 +188,18 @@ func TestSyncRegionHandler(t *testing.T) {
for _, tt := range tcases {
p, err := newPatcher(preprogrammed)
if err != nil {
t.Fatalf("Test case '%s': %v", tt.name, err)
t.Fatalf("Test case '%s': %+v", tt.name, err)
}
c, err := newController(p, &rest.Config{})
if err != nil {
t.Fatalf("Test case '%s': %v", tt.name, err)
t.Fatalf("Test case '%s': %+v", tt.name, err)
}
if tt.regionLister != nil {
c.regionLister = tt.regionLister
}
err = c.syncRegionHandler(tt.key)
if err != nil && !tt.expectedErr {
t.Errorf("Test case '%s': unexpected error: %v", tt.name, err)
t.Errorf("Test case '%s': unexpected error: %+v", tt.name, err)
}
if err == nil && tt.expectedErr {
t.Errorf("Test case '%s': expected error, but got success", tt.name)
@ -308,7 +308,7 @@ func TestProcessNextWorkItem(t *testing.T) {
p := &patcher{}
c, err := newController(p, &rest.Config{})
if err != nil {
t.Fatalf("Test case '%s': %v", tt.name, err)
t.Fatalf("Test case '%s': %+v", tt.name, err)
}
c.queue = &fakeQueue{
shutdown: tt.shutdown,
@ -349,12 +349,12 @@ func TestRun(t *testing.T) {
p := &patcher{}
c, err := newController(p, &rest.Config{})
if err != nil {
t.Fatalf("Test case '%s': %v", tt.name, err)
t.Fatalf("Test case '%s': %+v", tt.name, err)
}
close(c.stopCh)
err = c.run(0)
if err != nil && !tt.expectedErr {
t.Errorf("Test case '%s': unexpected error: %v", tt.name, err)
t.Errorf("Test case '%s': unexpected error: %+v", tt.name, err)
}
if err == nil && tt.expectedErr {
t.Errorf("Test case '%s': expected error, but got success", tt.name)
@ -384,7 +384,7 @@ func TestNewController(t *testing.T) {
p := &patcher{}
c, err := newController(p, config)
if err != nil && !tt.expectedErr {
t.Errorf("Test case '%s': unexpected error: %v", tt.name, err)
t.Errorf("Test case '%s': unexpected error: %+v", tt.name, err)
}
if err == nil && tt.expectedErr {
t.Errorf("Test case '%s': expected error, but got success", tt.name)

View File

@ -25,6 +25,7 @@ import (
"strings"
"github.com/golang/glog"
"github.com/pkg/errors"
"k8s.io/api/admission/v1beta1"
admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1"
@ -60,7 +61,7 @@ func addToScheme(scheme *runtime.Scheme) {
func getTLSConfig(certFile string, keyFile string) *tls.Config {
sCert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
glog.Fatal(err)
fatal(err)
}
return &tls.Config{
Certificates: []tls.Certificate{sCert},
@ -82,7 +83,7 @@ func mutatePods(ar v1beta1.AdmissionReview, p *patcher) *v1beta1.AdmissionRespon
pod := corev1.Pod{}
deserializer := codecs.UniversalDeserializer()
if _, _, err := deserializer.Decode(raw, nil, &pod); err != nil {
glog.Error(err)
fmt.Printf("ERROR: %+v\n", err)
return toAdmissionResponse(err)
}
reviewResponse := v1beta1.AdmissionResponse{}
@ -148,7 +149,7 @@ func serve(w http.ResponseWriter, r *http.Request, admit admitFunc) {
reviewResponse = toAdmissionResponse(err)
} else {
if ar.Request == nil {
err = fmt.Errorf("Request is empty")
err = errors.New("Request is empty")
reviewResponse = toAdmissionResponse(err)
} else {
reqUID = ar.Request.UID
@ -187,6 +188,11 @@ func makePodsHandler(p *patcher) func(w http.ResponseWriter, r *http.Request) {
}
}
func fatal(err error) {
fmt.Printf("ERROR: %+v\n", err)
os.Exit(1)
}
func main() {
var kubeconfig string
var master string
@ -236,14 +242,12 @@ func main() {
patcher, err := newPatcher(mode)
if err != nil {
glog.Error(err)
os.Exit(1)
fatal(err)
}
controller, err := newController(patcher, config)
if err != nil {
glog.Error(err)
os.Exit(1)
fatal(err)
}
go controller.run(controllerThreadNum)
@ -256,5 +260,5 @@ func main() {
TLSConfig: getTLSConfig(certFile, keyFile),
}
glog.Fatal(server.ListenAndServeTLS("", ""))
fatal(server.ListenAndServeTLS("", ""))
}

View File

@ -21,6 +21,8 @@ import (
"strings"
"sync"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
fpgav1 "github.com/intel/intel-device-plugins-for-kubernetes/pkg/apis/fpga.intel.com/v1"
@ -66,7 +68,7 @@ type patcher struct {
func newPatcher(mode string) (*patcher, error) {
if mode != preprogrammed && mode != orchestrated {
return nil, fmt.Errorf("Unknown mode: %s", mode)
return nil, errors.Errorf("Unknown mode: %s", mode)
}
return &patcher{
@ -117,7 +119,7 @@ func (p *patcher) getPatchOps(containerIdx int, container corev1.Container) ([]s
return p.getPatchOpsOrchestrated(containerIdx, container)
}
return nil, fmt.Errorf("Uknown mode: %s", p.mode)
return nil, errors.Errorf("Uknown mode: %s", p.mode)
}
func (p *patcher) getPatchOpsPreprogrammed(containerIdx int, container corev1.Container) ([]string, error) {
@ -171,7 +173,7 @@ func (p *patcher) getPatchOpsOrchestrated(containerIdx int, container corev1.Con
}
if mutated {
return nil, fmt.Errorf("Only one FPGA resource per container is supported in '%s' mode", orchestrated)
return nil, errors.Errorf("Only one FPGA resource per container is supported in '%s' mode", orchestrated)
}
op := fmt.Sprintf(resourceReplaceOp, containerIdx, "limits", rfc6901Escaper.Replace(string(resourceName)),
@ -201,7 +203,7 @@ func (p *patcher) getPatchOpsOrchestrated(containerIdx int, container corev1.Con
}
if mutated {
return nil, fmt.Errorf("Only one FPGA resource per container is supported in '%s' mode", orchestrated)
return nil, errors.Errorf("Only one FPGA resource per container is supported in '%s' mode", orchestrated)
}
op := fmt.Sprintf(resourceReplaceOp, containerIdx, "requests", rfc6901Escaper.Replace(string(resourceName)),
@ -231,7 +233,7 @@ func (p *patcher) parseResourceName(input string) (string, string, error) {
case "Region":
regionName = result[num]
if interfaceID, ok = p.regionMap[result[num]]; !ok {
return "", "", fmt.Errorf("Unknown region name: %s", result[num])
return "", "", errors.Errorf("Unknown region name: %s", result[num])
}
case "Af":
afName = result[num]
@ -240,7 +242,7 @@ func (p *patcher) parseResourceName(input string) (string, string, error) {
if afName != "" {
if afuID, ok = p.afMap[regionName+"-"+afName]; !ok {
return "", "", fmt.Errorf("Unknown AF name: %s", regionName+"-"+afName)
return "", "", errors.Errorf("Unknown AF name: %s", regionName+"-"+afName)
}
}
@ -251,7 +253,7 @@ func getEnvVars(container corev1.Container) (string, error) {
var jsonstrings []string
for _, envvar := range container.Env {
if envvar.Name == "FPGA_REGION" || envvar.Name == "FPGA_AFU" {
return "", fmt.Errorf("The env var '%s' is not allowed", envvar.Name)
return "", errors.Errorf("The env var '%s' is not allowed", envvar.Name)
}
jsonbytes, err := json.Marshal(envvar)
if err != nil {

View File

@ -278,7 +278,7 @@ func TestGetPatchOpsOrchestrated(t *testing.T) {
t.Errorf("Test case '%s': no error returned", tt.name)
}
if !tt.expectedErr && err != nil {
t.Errorf("Test case '%s': unexpected error %v", tt.name, err)
t.Errorf("Test case '%s': unexpected error %+v", tt.name, err)
}
if len(ops) != tt.expectedOps {
t.Errorf("test case '%s': expected %d ops, but got %d\n%v", tt.name, tt.expectedOps, len(ops), ops)
@ -366,7 +366,7 @@ func TestGetEnvVars(t *testing.T) {
t.Errorf("Test case '%s': no error returned", tt.name)
}
if !tt.expectedErr && err != nil {
t.Errorf("Test case '%s': unexpected error %v", tt.name, err)
t.Errorf("Test case '%s': unexpected error %+v", tt.name, err)
}
}
}

View File

@ -17,7 +17,6 @@ package main
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io"
"io/ioutil"
@ -26,7 +25,7 @@ import (
"regexp"
"strings"
"github.com/golang/glog"
"github.com/pkg/errors"
utilsexec "k8s.io/utils/exec"
)
@ -46,7 +45,7 @@ func decodeJSONStream(reader io.Reader) (map[string]interface{}, error) {
decoder := json.NewDecoder(reader)
content := make(map[string]interface{})
err := decoder.Decode(&content)
return content, err
return content, errors.WithStack(err)
}
type hookEnv struct {
@ -78,29 +77,29 @@ func canonize(uuid string) string {
func (he *hookEnv) getFPGAParams(content map[string]interface{}) (*fpgaParams, error) {
bundle, ok := content["bundle"]
if !ok {
return nil, fmt.Errorf("no 'bundle' field in the configuration")
return nil, errors.New("no 'bundle' field in the configuration")
}
configPath := path.Join(fmt.Sprint(bundle), he.config)
configFile, err := os.Open(configPath)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
defer configFile.Close()
content, err = decodeJSONStream(configFile)
if err != nil {
return nil, fmt.Errorf("can't decode %s", configPath)
return nil, errors.WithMessage(err, "can't decode "+configPath)
}
process, ok := content["process"]
if !ok {
return nil, fmt.Errorf("no 'process' field found in %s", configPath)
return nil, errors.Errorf("no 'process' field found in %s", configPath)
}
rawEnv, ok := process.(map[string]interface{})["env"]
if !ok {
return nil, fmt.Errorf("no 'env' field found in the 'process' struct in %s", configPath)
return nil, errors.Errorf("no 'env' field found in the 'process' struct in %s", configPath)
}
dEnv := make(map[string]string)
@ -111,22 +110,22 @@ func (he *hookEnv) getFPGAParams(content map[string]interface{}) (*fpgaParams, e
fpgaRegion, ok := dEnv[fpgaRegionEnv]
if !ok {
return nil, fmt.Errorf("%s environment is not set in the 'process/env' list in %s", fpgaRegionEnv, configPath)
return nil, errors.Errorf("%s environment is not set in the 'process/env' list in %s", fpgaRegionEnv, configPath)
}
fpgaAfu, ok := dEnv[fpgaAfuEnv]
if !ok {
return nil, fmt.Errorf("%s environment is not set in the 'process/env' list in %s", fpgaAfuEnv, configPath)
return nil, errors.Errorf("%s environment is not set in the 'process/env' list in %s", fpgaAfuEnv, configPath)
}
linux, ok := content["linux"]
if !ok {
return nil, fmt.Errorf("no 'linux' field found in %s", configPath)
return nil, errors.Errorf("no 'linux' field found in %s", configPath)
}
rawDevices, ok := linux.(map[string]interface{})["devices"]
if !ok {
return nil, fmt.Errorf("no 'devices' field found in the 'linux' struct in %s", configPath)
return nil, errors.Errorf("no 'devices' field found in the 'linux' struct in %s", configPath)
}
pattern := regexp.MustCompile(fpgaDevRegexp)
@ -137,48 +136,48 @@ func (he *hookEnv) getFPGAParams(content map[string]interface{}) (*fpgaParams, e
}
}
return nil, fmt.Errorf("no FPGA devices found in linux/devices list in %s", configPath)
return nil, errors.Errorf("no FPGA devices found in linux/devices list in %s", configPath)
}
func (he *hookEnv) validateBitStream(params *fpgaParams, fpgaBitStreamPath string) error {
output, err := he.execer.Command("packager", "gbs-info", "--gbs", fpgaBitStreamPath).CombinedOutput()
if err != nil {
return fmt.Errorf("%s/%s: can't get bitstream info: %v", params.region, params.afu, err)
return errors.Wrapf(err, "%s/%s: can't get bitstream info", params.region, params.afu)
}
reader := bytes.NewBuffer(output)
content, err := decodeJSONStream(reader)
if err != nil {
return fmt.Errorf("%s/%s: can't decode 'packager gbs-info' output: %v", params.region, params.afu, err)
return errors.WithMessage(err, fmt.Sprintf("%s/%s: can't decode 'packager gbs-info' output", params.region, params.afu))
}
afuImage, ok := content["afu-image"]
if !ok {
return fmt.Errorf("%s/%s: 'afu-image' field not found in the 'packager gbs-info' output", params.region, params.afu)
return errors.Errorf("%s/%s: 'afu-image' field not found in the 'packager gbs-info' output", params.region, params.afu)
}
interfaceUUID, ok := afuImage.(map[string]interface{})["interface-uuid"]
if !ok {
return fmt.Errorf("%s/%s: 'interface-uuid' field not found in the 'packager gbs-info' output", params.region, params.afu)
return errors.Errorf("%s/%s: 'interface-uuid' field not found in the 'packager gbs-info' output", params.region, params.afu)
}
acceleratorClusters, ok := afuImage.(map[string]interface{})["accelerator-clusters"]
if !ok {
return fmt.Errorf("%s/%s: 'accelerator-clusters' field not found in the 'packager gbs-info' output", params.region, params.afu)
return errors.Errorf("%s/%s: 'accelerator-clusters' field not found in the 'packager gbs-info' output", params.region, params.afu)
}
if canonize(interfaceUUID.(string)) != params.region {
return fmt.Errorf("bitstream is not for this device: region(%s) and interface-uuid(%s) don't match", params.region, interfaceUUID)
return errors.Errorf("bitstream is not for this device: region(%s) and interface-uuid(%s) don't match", params.region, interfaceUUID)
}
acceleratorTypeUUID, ok := acceleratorClusters.([]interface{})[0].(map[string]interface{})["accelerator-type-uuid"]
if !ok {
return fmt.Errorf("%s/%s: 'accelerator-type-uuid' field not found in the 'packager gbs-info' output", params.region, params.afu)
return errors.Errorf("%s/%s: 'accelerator-type-uuid' field not found in the 'packager gbs-info' output", params.region, params.afu)
}
if canonize(acceleratorTypeUUID.(string)) != params.afu {
return fmt.Errorf("incorrect bitstream: AFU(%s) and accelerator-type-uuid(%s) don't match", params.afu, acceleratorTypeUUID)
return errors.Errorf("incorrect bitstream: AFU(%s) and accelerator-type-uuid(%s) don't match", params.afu, acceleratorTypeUUID)
}
return nil
@ -187,7 +186,7 @@ func (he *hookEnv) validateBitStream(params *fpgaParams, fpgaBitStreamPath strin
func (he *hookEnv) programBitStream(params *fpgaParams, fpgaBitStreamPath string) error {
output, err := he.execer.Command("fpgaconf", "-S", params.devNum, fpgaBitStreamPath).CombinedOutput()
if err != nil {
return fmt.Errorf("failed to program AFU %s to socket %s, region %s: error: %v, output: %s", params.afu, params.devNum, params.region, err, string(output))
return errors.Wrapf(err, "failed to program AFU %s to socket %s, region %s: output: %s", params.afu, params.devNum, params.region, string(output))
}
programmedAfu, err := he.getProgrammedAfu(params.devNum)
@ -196,7 +195,7 @@ func (he *hookEnv) programBitStream(params *fpgaParams, fpgaBitStreamPath string
}
if programmedAfu != params.afu {
return fmt.Errorf("programmed function %s instead of %s", programmedAfu, params.afu)
return errors.Errorf("programmed function %s instead of %s", programmedAfu, params.afu)
}
return nil
@ -208,7 +207,7 @@ func (he *hookEnv) getProgrammedAfu(deviceNum string) (string, error) {
afuIDPath := fmt.Sprintf(he.afuIDTemplate, deviceNum, deviceNum)
data, err := ioutil.ReadFile(afuIDPath)
if err != nil {
return "", err
return "", errors.WithStack(err)
}
return strings.TrimSpace(string(data)), nil
}
@ -222,7 +221,7 @@ func (he *hookEnv) process(reader io.Reader) error {
// Check if device plugin annotation is set
annotations, ok := content["annotations"]
if !ok {
return fmt.Errorf("no 'annotations' field in the configuration")
return errors.New("no 'annotations' field in the configuration")
}
annotation, ok := annotations.(map[string]interface{})[annotationName]
@ -237,7 +236,7 @@ func (he *hookEnv) process(reader io.Reader) error {
params, err := he.getFPGAParams(content)
if err != nil {
return fmt.Errorf("couldn't get FPGA region, AFU and device number: %v, skipping", err)
return errors.WithMessage(err, "couldn't get FPGA region, AFU and device number")
}
programmedAfu, err := he.getProgrammedAfu(params.devNum)
@ -252,7 +251,7 @@ func (he *hookEnv) process(reader io.Reader) error {
fpgaBitStreamPath := path.Join(he.bitStreamDir, params.region, params.afu+fpgaBitStreamExt)
if _, err = os.Stat(fpgaBitStreamPath); os.IsNotExist(err) {
return fmt.Errorf("%s/%s: bitstream is not found", params.region, params.afu)
return errors.Errorf("%s/%s: bitstream is not found", params.region, params.afu)
}
err = he.validateBitStream(params, fpgaBitStreamPath)
@ -269,9 +268,6 @@ func (he *hookEnv) process(reader io.Reader) error {
}
func main() {
//work around glog ERROR: logging before flag.Parse: I0618
flag.Parse()
if os.Getenv("PATH") == "" { // runc doesn't set PATH when runs hooks
os.Setenv("PATH", "/sbin:/usr/sbin:/usr/local/sbin:/usr/local/bin:/usr/bin:/bin")
}
@ -280,7 +276,7 @@ func main() {
err := he.process(os.Stdin)
if err != nil {
glog.Error(err)
fmt.Printf("%+v\n", err)
os.Exit(1)
}
}

View File

@ -108,7 +108,7 @@ func TestGetFPGAParams(t *testing.T) {
content, err := decodeJSONStream(stdin)
if err != nil {
t.Fatalf("can't decode json file %s: %v", tc.stdinJSON, err)
t.Fatalf("can't decode json file %s: %+v", tc.stdinJSON, err)
}
he := newHookEnv("", tc.configJSON, nil, "")
@ -117,7 +117,7 @@ func TestGetFPGAParams(t *testing.T) {
if err != nil {
if !tc.expectedErr {
t.Errorf("unexpected error: %v", err)
t.Errorf("unexpected error: %+v", err)
}
} else {
if params.region != tc.expectedRegion {
@ -238,7 +238,7 @@ func TestValidateBitstream(t *testing.T) {
he := newHookEnv("", "", &execer, "")
err := he.validateBitStream(tc.params, "")
if err != nil && !tc.expectedErr {
t.Errorf("unexpected error: %v", err)
t.Errorf("unexpected error: %+v", err)
}
}
}
@ -300,7 +300,7 @@ func TestProgramBitStream(t *testing.T) {
he.execer = &fakeexec.FakeExec{CommandScript: genFakeActions(&fcmd, len(fcmd.CombinedOutputScript))}
err := he.programBitStream(tc.params, "")
if err != nil && !tc.expectedErr {
t.Errorf("unexpected error: %v", err)
t.Errorf("unexpected error: %+v", err)
}
}
}
@ -416,7 +416,7 @@ func TestProcess(t *testing.T) {
err = he.process(stdin)
if err != nil && !tc.expectedErr {
t.Errorf("unexpected error: %v", err)
t.Errorf("unexpected error: %+v", err)
}
}
}

View File

@ -25,6 +25,7 @@ import (
"time"
"github.com/golang/glog"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
@ -182,7 +183,7 @@ func newDevicePlugin(sysfsDir string, devfsDir string, mode string) (*devicePlug
getDevTree = getRegionDevelTree
ignoreAfuIDs = true
default:
return nil, fmt.Errorf("Wrong mode: '%s'", mode)
return nil, errors.Errorf("Wrong mode: '%s'", mode)
}
return &devicePlugin{
@ -215,8 +216,7 @@ func (dp *devicePlugin) Scan(notifier dpapi.Notifier) error {
for {
devTree, err := dp.scanFPGAs()
if err != nil {
glog.Error("Device scan failed: ", err)
return fmt.Errorf("Device scan failed: %v", err)
return err
}
notifier.Notify(devTree)
@ -228,7 +228,7 @@ func (dp *devicePlugin) Scan(notifier dpapi.Notifier) error {
func (dp *devicePlugin) getDevNode(devName string) (string, error) {
devNode := path.Join(dp.devfsDir, devName)
if _, err := os.Stat(devNode); err != nil {
return "", fmt.Errorf("Device %s doesn't exist: %v", devNode, err)
return "", errors.Wrapf(err, "Device %s doesn't exist", devNode)
}
return devNode, nil
@ -241,7 +241,7 @@ func (dp *devicePlugin) scanFPGAs() (dpapi.DeviceTree, error) {
fpgaFiles, err := ioutil.ReadDir(dp.sysfsDir)
if err != nil {
return nil, fmt.Errorf("Can't read sysfs folder: %v", err)
return nil, errors.Wrap(err, "Can't read sysfs folder")
}
for _, fpgaFile := range fpgaFiles {
@ -254,7 +254,7 @@ func (dp *devicePlugin) scanFPGAs() (dpapi.DeviceTree, error) {
deviceFolder := path.Join(dp.sysfsDir, fname)
deviceFiles, err := ioutil.ReadDir(deviceFolder)
if err != nil {
return nil, err
return nil, errors.WithStack(err)
}
regions, afus, err := dp.getSysFsInfo(deviceFolder, deviceFiles, fname)
@ -263,7 +263,7 @@ func (dp *devicePlugin) scanFPGAs() (dpapi.DeviceTree, error) {
}
if len(regions) == 0 {
return nil, fmt.Errorf("No regions found for device %s", fname)
return nil, errors.Errorf("No regions found for device %s", fname)
}
// Currently only one region per device is supported.
@ -285,12 +285,12 @@ func (dp *devicePlugin) getSysFsInfo(deviceFolder string, deviceFiles []os.FileI
if dp.fmeReg.MatchString(name) {
if len(regions) > 0 {
return nil, nil, fmt.Errorf("Detected more than one FPGA region for device %s. Only one region per FPGA device is supported", fname)
return nil, nil, errors.Errorf("Detected more than one FPGA region for device %s. Only one region per FPGA device is supported", fname)
}
interfaceIDFile := path.Join(deviceFolder, name, "pr", "interface_id")
data, err := ioutil.ReadFile(interfaceIDFile)
if err != nil {
return nil, nil, err
return nil, nil, errors.WithStack(err)
}
devNode, err := dp.getDevNode(name)
if err != nil {
@ -310,7 +310,7 @@ func (dp *devicePlugin) getSysFsInfo(deviceFolder string, deviceFiles []os.FileI
afuFile := path.Join(deviceFolder, name, "afu_id")
data, err := ioutil.ReadFile(afuFile)
if err != nil {
return nil, nil, err
return nil, nil, errors.WithStack(err)
}
afuID = strings.TrimSpace(string(data))
}
@ -329,6 +329,11 @@ func (dp *devicePlugin) getSysFsInfo(deviceFolder string, deviceFiles []os.FileI
return regions, afus, nil
}
func fatal(err error) {
fmt.Printf("ERROR: %+v\n", err)
os.Exit(1)
}
func main() {
var mode string
var kubeconfig string
@ -348,7 +353,7 @@ func main() {
config, err = clientcmd.BuildConfigFromFlags(master, kubeconfig)
}
if err != nil {
glog.Fatal(err)
fatal(err)
}
// if NODE_NAME is not set then try to use hostname
@ -356,12 +361,12 @@ func main() {
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
glog.Fatal(err)
fatal(err)
}
node, err := clientset.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
if err != nil {
glog.Fatal(err)
fatal(err)
}
if nodeMode, ok := node.ObjectMeta.Annotations["fpga.intel.com/device-plugin-mode"]; ok {
@ -371,8 +376,7 @@ func main() {
plugin, err := newDevicePlugin(sysfsDirectory, devfsDirectory, mode)
if err != nil {
glog.Error(err)
os.Exit(1)
fatal(err)
}
glog.Info("FPGA device plugin started in ", mode, " mode")

View File

@ -24,6 +24,8 @@ import (
"testing"
"time"
"github.com/pkg/errors"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
dpapi "github.com/intel/intel-device-plugins-for-kubernetes/internal/deviceplugin"
@ -35,19 +37,19 @@ func createTestDirs(devfs, sysfs string, devfsDirs, sysfsDirs []string, sysfsFil
for _, devfsdir := range devfsDirs {
err = os.MkdirAll(path.Join(devfs, devfsdir), 0755)
if err != nil {
return fmt.Errorf("Failed to create fake device directory: %v", err)
return errors.Wrap(err, "Failed to create fake device directory")
}
}
for _, sysfsdir := range sysfsDirs {
err = os.MkdirAll(path.Join(sysfs, sysfsdir), 0755)
if err != nil {
return fmt.Errorf("Failed to create fake device directory: %v", err)
return errors.Wrap(err, "Failed to create fake device directory")
}
}
for filename, body := range sysfsFiles {
err = ioutil.WriteFile(path.Join(sysfs, filename), body, 0644)
if err != nil {
return fmt.Errorf("Failed to create fake vendor file: %v", err)
return errors.Wrap(err, "Failed to create fake vendor file")
}
}
@ -326,12 +328,12 @@ func TestScanFPGAs(t *testing.T) {
for _, tcase := range tcases {
err := createTestDirs(devfs, sysfs, tcase.devfsdirs, tcase.sysfsdirs, tcase.sysfsfiles)
if err != nil {
t.Fatal(err)
t.Fatalf("%+v", err)
}
plugin, err := newDevicePlugin(sysfs, devfs, tcase.mode)
if err != nil {
t.Fatal(err)
t.Fatalf("%+v", err)
}
plugin.getDevTree = func(devices []device) dpapi.DeviceTree {
return dpapi.NewDeviceTree()
@ -343,7 +345,7 @@ func TestScanFPGAs(t *testing.T) {
t.Errorf("Test case '%s': expected error '%s', but got '%v'", tcase.name, tcase.errorContains, err)
}
} else if err != nil {
t.Errorf("Test case '%s': expected no error, but got '%v'", tcase.name, err)
t.Errorf("Test case '%s': expected no error, but got '%+v'", tcase.name, err)
}
err = os.RemoveAll(tmpdir)

View File

@ -25,6 +25,7 @@ import (
"time"
"github.com/golang/glog"
"github.com/pkg/errors"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
@ -67,8 +68,7 @@ func (dp *devicePlugin) Scan(notifier dpapi.Notifier) error {
for {
devTree, err := dp.scan()
if err != nil {
glog.Error("Device scan failed: ", err)
return fmt.Errorf("Device scan failed: %v", err)
return err
}
notifier.Notify(devTree)
@ -80,7 +80,7 @@ func (dp *devicePlugin) Scan(notifier dpapi.Notifier) error {
func (dp *devicePlugin) scan() (dpapi.DeviceTree, error) {
files, err := ioutil.ReadDir(dp.sysfsDir)
if err != nil {
return nil, fmt.Errorf("Can't read sysfs folder: %v", err)
return nil, errors.Wrap(err, "Can't read sysfs folder")
}
devTree := dpapi.NewDeviceTree()
@ -97,7 +97,7 @@ func (dp *devicePlugin) scan() (dpapi.DeviceTree, error) {
drmFiles, err := ioutil.ReadDir(path.Join(dp.sysfsDir, f.Name(), "device/drm"))
if err != nil {
return nil, fmt.Errorf("Can't read device folder: %v", err)
return nil, errors.Wrap(err, "Can't read device folder")
}
for _, drmFile := range drmFiles {

View File

@ -105,6 +105,9 @@ func TestScan(t *testing.T) {
if tcase.expectedErr && err == nil {
t.Error("Expected error hasn't been triggered")
}
if !tcase.expectedErr && err != nil {
t.Errorf("Unexpcted error: %+v", err)
}
if tcase.expectedDevs != len(tree[deviceType]) {
t.Errorf("Wrong number of discovered devices")
}

View File

@ -26,6 +26,7 @@ import (
"time"
"github.com/golang/glog"
"github.com/pkg/errors"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
@ -71,8 +72,7 @@ func (dp *devicePlugin) Scan(notifier deviceplugin.Notifier) error {
for {
devTree, err := dp.scan()
if err != nil {
glog.Error("Device scan failed: ", err)
return fmt.Errorf("Device scan failed: %v", err)
return err
}
notifier.Notify(devTree)
@ -93,7 +93,7 @@ func (dp *devicePlugin) getDpdkDevice(id string) (string, error) {
return "", err
}
if len(files) == 0 {
return "", fmt.Errorf("No devices found")
return "", errors.New("No devices found")
}
return files[0].Name(), nil
@ -101,20 +101,20 @@ func (dp *devicePlugin) getDpdkDevice(id string) (string, error) {
vfioDirPath := path.Join(dp.pciDeviceDir, devicePCIAdd, iommuGroupSuffix)
group, err := filepath.EvalSymlinks(vfioDirPath)
if err != nil {
return "", err
return "", errors.WithStack(err)
}
s := strings.TrimPrefix(group, sysfsIommuGroupPrefix)
fmt.Printf("The vfio device group detected is %v\n", s)
return s, nil
}
return "", fmt.Errorf("Unknown DPDK driver")
return "", errors.New("Unknown DPDK driver")
}
func (dp *devicePlugin) getDpdkDeviceNames(id string) ([]string, error) {
dpdkDeviceName, err := dp.getDpdkDevice(id)
if err != nil {
return []string{}, fmt.Errorf("Unable to get the dpdk device for creating device nodes: %v", err)
return []string{}, err
}
fmt.Printf("%s device: corresponding DPDK device detected is %s\n", id, dpdkDeviceName)
@ -131,13 +131,13 @@ func (dp *devicePlugin) getDpdkDeviceNames(id string) ([]string, error) {
return []string{vfioDev1, vfioDev2}, nil
}
return []string{}, fmt.Errorf("Unknown DPDK driver")
return []string{}, errors.New("Unknown DPDK driver")
}
func (dp *devicePlugin) getDpdkMountPaths(id string) ([]string, error) {
dpdkDeviceName, err := dp.getDpdkDevice(id)
if err != nil {
return []string{}, fmt.Errorf("Unable to get the dpdk device for mountPath: %v", err)
return []string{}, err
}
switch dp.dpdkDriver {
@ -150,13 +150,13 @@ func (dp *devicePlugin) getDpdkMountPaths(id string) ([]string, error) {
return []string{}, nil
}
return nil, fmt.Errorf("Unknown DPDK driver")
return nil, errors.New("Unknown DPDK driver")
}
func (dp *devicePlugin) getDeviceID(pciAddr string) (string, error) {
devID, err := ioutil.ReadFile(path.Join(dp.pciDeviceDir, pciAddr, "device"))
if err != nil {
return "", fmt.Errorf("Cannot obtain ID for the device %s: %v", pciAddr, err)
return "", errors.Wrapf(err, "Cannot obtain ID for the device %s", pciAddr)
}
return strings.TrimPrefix(string(bytes.TrimSpace(devID)), "0x"), nil
@ -170,19 +170,19 @@ func (dp *devicePlugin) bindDevice(id string) error {
// Unbind from the kernel driver
err := ioutil.WriteFile(unbindDevicePath, []byte(devicePCIAddr), 0644)
if err != nil {
return fmt.Errorf("Unbinding from kernel driver failed for the device %s: %v", id, err)
return errors.Wrapf(err, "Unbinding from kernel driver failed for the device %s: %v", id)
}
vfdevID, err := dp.getDeviceID(devicePCIAddr)
if err != nil {
return fmt.Errorf("Cannot obtain ID for the device %s: %v", id, err)
return err
}
bindDevicePath := path.Join(dp.pciDriverDir, dp.dpdkDriver, newIDSuffix)
//Bind to the the dpdk driver
err = ioutil.WriteFile(bindDevicePath, []byte(vendorPrefix+vfdevID), 0644)
if err != nil {
return fmt.Errorf("Binding to the DPDK driver failed for the device %s: %v", id, err)
return errors.Wrapf(err, "Binding to the DPDK driver failed for the device %s: %v", id)
}
return nil
@ -210,7 +210,7 @@ func (dp *devicePlugin) scan() (deviceplugin.DeviceTree, error) {
for _, driver := range append(dp.kernelVfDrivers, dp.dpdkDriver) {
files, err := ioutil.ReadDir(path.Join(dp.pciDriverDir, driver))
if err != nil {
return nil, fmt.Errorf("Can't read sysfs for driver %s: %+v", driver, err)
return nil, errors.Wrapf(err, "Can't read sysfs for driver %s", driver)
}
n := 0
@ -230,17 +230,17 @@ func (dp *devicePlugin) scan() (deviceplugin.DeviceTree, error) {
if driver != dp.dpdkDriver {
err := dp.bindDevice(vfpciaddr)
if err != nil {
return nil, fmt.Errorf("Error in binding the device to the dpdk driver: %+v", err)
return nil, err
}
}
devNodes, err := dp.getDpdkDeviceNames(vfpciaddr)
if err != nil {
return nil, fmt.Errorf("Error in obtaining the device name: %+v", err)
return nil, err
}
devMounts, err := dp.getDpdkMountPaths(vfpciaddr)
if err != nil {
return nil, fmt.Errorf("Error in obtaining the mount point: %+v", err)
return nil, err
}
devinfo := deviceplugin.DeviceInfo{

View File

@ -21,19 +21,21 @@ import (
"path"
"testing"
"time"
"github.com/pkg/errors"
)
func createTestFiles(prefix string, dirs []string, files map[string][]byte) error {
for _, dir := range dirs {
err := os.MkdirAll(path.Join(prefix, dir), 0755)
if err != nil {
return fmt.Errorf("Failed to create fake device directory: %v", err)
return errors.Wrap(err, "Failed to create fake device directory")
}
}
for filename, body := range files {
err := ioutil.WriteFile(path.Join(prefix, filename), body, 0644)
if err != nil {
return fmt.Errorf("Failed to create fake vendor file: %v", err)
return errors.Wrap(err, "Failed to create fake vendor file")
}
}
@ -163,7 +165,7 @@ func TestScanPrivate(t *testing.T) {
t.Fatal(err)
}
if err := createTestFiles(tmpdir, tt.dirs, tt.files); err != nil {
t.Fatal(err)
t.Fatalf("%+v", err)
}
dp := &devicePlugin{

View File

@ -119,7 +119,8 @@ func (m *Manager) handleUpdate(update updateInfo) {
go func() {
err := m.servers[devType].Serve(m.namespace)
if err != nil {
glog.Fatal(err)
fmt.Printf("Failed to serve %s/%s: %+v\n", m.namespace, devType, err)
os.Exit(1)
}
}()
m.servers[devType].Update(devices)

View File

@ -25,6 +25,7 @@ import (
"github.com/fsnotify/fsnotify"
"github.com/golang/glog"
"github.com/pkg/errors"
"google.golang.org/grpc"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
@ -71,7 +72,7 @@ func (srv *server) sendDevices(stream pluginapi.DevicePlugin_ListAndWatchServer)
glog.V(2).Info("Sending to kubelet ", resp.Devices)
if err := stream.Send(resp); err != nil {
srv.Stop()
return fmt.Errorf("device-plugin: cannot update device list: %v", err)
return errors.Wrapf(err, "Cannot update device list")
}
return nil
@ -100,10 +101,10 @@ func (srv *server) Allocate(ctx context.Context, rqt *pluginapi.AllocateRequest)
for _, id := range crqt.DevicesIDs {
dev, ok := srv.devices[id]
if !ok {
return nil, fmt.Errorf("Invalid allocation request with non-existing device %s", id)
return nil, errors.Errorf("Invalid allocation request with non-existing device %s", id)
}
if dev.State != pluginapi.Healthy {
return nil, fmt.Errorf("Invalid allocation request with unhealthy device %s", id)
return nil, errors.Errorf("Invalid allocation request with unhealthy device %s", id)
}
for _, devnode := range dev.Nodes {
cresp.Devices = append(cresp.Devices, &pluginapi.DeviceSpec{
@ -139,8 +140,7 @@ func (srv *server) Allocate(ctx context.Context, rqt *pluginapi.AllocateRequest)
}
func (srv *server) PreStartContainer(ctx context.Context, rqt *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) {
glog.Warning("PreStartContainer() should not be called")
return new(pluginapi.PreStartContainerResponse), nil
return nil, errors.New("PreStartContainer() should not be called")
}
// Serve starts a gRPC server to serve pluginapi.PluginInterfaceServer interface.
@ -151,7 +151,7 @@ func (srv *server) Serve(namespace string) error {
// Stop stops serving pluginapi.PluginInterfaceServer interface.
func (srv *server) Stop() error {
if srv.grpcServer == nil {
return fmt.Errorf("Can't stop non-existing gRPC server. Calling Stop() before Serve()?")
return errors.New("Can't stop non-existing gRPC server. Calling Stop() before Serve()?")
}
srv.grpcServer.Stop()
close(srv.updatesCh)
@ -173,13 +173,13 @@ func (srv *server) setupAndServe(namespace string, devicePluginPath string, kube
pluginSocket := path.Join(devicePluginPath, pluginEndpoint)
if err := waitForServer(pluginSocket, time.Second); err == nil {
return fmt.Errorf("Socket %s is already in use", pluginSocket)
return errors.Errorf("Socket %s is already in use", pluginSocket)
}
os.Remove(pluginSocket)
lis, err := net.Listen("unix", pluginSocket)
if err != nil {
return fmt.Errorf("Failed to listen to plugin socket: %+v", err)
return errors.Wrap(err, "Failed to listen to plugin socket")
}
srv.grpcServer = grpc.NewServer()
@ -193,20 +193,20 @@ func (srv *server) setupAndServe(namespace string, devicePluginPath string, kube
// Wait for the server to start
if err = waitForServer(pluginSocket, 10*time.Second); err != nil {
return fmt.Errorf("Failed to wait for plugin socket: %+v", err)
return err
}
// Register with Kubelet.
err = registerWithKubelet(kubeletSocket, pluginEndpoint, resourceName)
if err != nil {
return fmt.Errorf("Failed to register: %+v", err)
return err
}
fmt.Println("device-plugin registered")
// Kubelet removes plugin socket when it (re)starts
// plugin must restart in this case
if err = watchFile(pluginSocket); err != nil {
return fmt.Errorf("error watching plugin socket: %+v", err)
return err
}
fmt.Printf("socket %s removed, restarting", pluginSocket)
@ -220,13 +220,13 @@ func (srv *server) setupAndServe(namespace string, devicePluginPath string, kube
func watchFile(file string) error {
watcher, err := fsnotify.NewWatcher()
if err != nil {
return err
return errors.Wrapf(err, "Failed to create watcher for %s", file)
}
defer watcher.Close()
err = watcher.Add(filepath.Dir(file))
if err != nil {
return err
return errors.Wrapf(err, "Failed to add %s to watcher", file)
}
for {
@ -236,7 +236,7 @@ func watchFile(file string) error {
return nil
}
case err := <-watcher.Errors:
return err
return errors.WithStack(err)
}
}
}
@ -248,7 +248,7 @@ func registerWithKubelet(kubeletSocket, pluginEndPoint, resourceName string) err
}))
defer conn.Close()
if err != nil {
return fmt.Errorf("device-plugin: cannot connect to kubelet service: %v", err)
return errors.Wrap(err, "Cannot connect to kubelet service")
}
client := pluginapi.NewRegistrationClient(conn)
reqt := &pluginapi.RegisterRequest{
@ -259,7 +259,7 @@ func registerWithKubelet(kubeletSocket, pluginEndPoint, resourceName string) err
_, err = client.Register(context.Background(), reqt)
if err != nil {
return fmt.Errorf("device-plugin: cannot register to kubelet service: %v", err)
return errors.Wrap(err, "Cannot register to kubelet service")
}
return nil
@ -278,5 +278,5 @@ func waitForServer(socket string, timeout time.Duration) error {
if conn != nil {
conn.Close()
}
return err
return errors.Wrapf(err, "Failed dial context at %s", socket)
}

View File

@ -28,6 +28,8 @@ import (
"google.golang.org/grpc/metadata"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
"github.com/pkg/errors"
)
const (
@ -65,8 +67,7 @@ func (k *kubeletStub) start() error {
os.Remove(k.socket)
s, err := net.Listen("unix", k.socket)
if err != nil {
fmt.Printf("Can't listen at the socket: %+v", err)
return err
return errors.Wrap(err, "Can't listen at the socket")
}
k.server = grpc.NewServer()
@ -89,7 +90,10 @@ func TestRegisterWithKublet(t *testing.T) {
}
kubelet := newKubeletStub(kubeletSocket)
kubelet.start()
err = kubelet.start()
if err != nil {
t.Fatalf("%+v", err)
}
defer kubelet.server.Stop()
err = registerWithKubelet(kubeletSocket, pluginSocket, resourceName)
@ -405,7 +409,7 @@ func TestListAndWatch(t *testing.T) {
err := testServer.ListAndWatch(&pluginapi.Empty{}, server)
if err != nil && tt.errorOnCall == 0 {
t.Errorf("Test case '%s': got unexpected error %v", tt.name, err)
t.Errorf("Test case '%s': got unexpected error %+v", tt.name, err)
}
if err == nil && tt.errorOnCall != 0 {
t.Errorf("Test case '%s': expected an error, but got nothing", tt.name)