From 2ff6c5929a136e8dc44bf886e8a70af9b690f5ef Mon Sep 17 00:00:00 2001 From: Dmitry Rozhkov Date: Thu, 16 Aug 2018 17:31:19 +0300 Subject: [PATCH] Use annotated errors for tracing --- cmd/fpga_admissionwebhook/controller.go | 32 +++++----- cmd/fpga_admissionwebhook/controller_test.go | 20 +++---- .../fpga_admissionwebhook.go | 20 ++++--- cmd/fpga_admissionwebhook/patcher.go | 16 ++--- cmd/fpga_admissionwebhook/patcher_test.go | 4 +- cmd/fpga_crihook/main.go | 58 +++++++++---------- cmd/fpga_crihook/main_test.go | 10 ++-- cmd/fpga_plugin/fpga_plugin.go | 34 ++++++----- cmd/fpga_plugin/fpga_plugin_test.go | 14 +++-- cmd/gpu_plugin/gpu_plugin.go | 8 +-- cmd/gpu_plugin/gpu_plugin_test.go | 3 + cmd/qat_plugin/qat_plugin.go | 34 +++++------ cmd/qat_plugin/qat_plugin_test.go | 8 ++- internal/deviceplugin/manager.go | 3 +- internal/deviceplugin/server.go | 34 +++++------ internal/deviceplugin/server_test.go | 12 ++-- 16 files changed, 164 insertions(+), 146 deletions(-) diff --git a/cmd/fpga_admissionwebhook/controller.go b/cmd/fpga_admissionwebhook/controller.go index b3cde4a2..1fb76026 100644 --- a/cmd/fpga_admissionwebhook/controller.go +++ b/cmd/fpga_admissionwebhook/controller.go @@ -19,8 +19,9 @@ import ( "time" "github.com/golang/glog" + "github.com/pkg/errors" - "k8s.io/apimachinery/pkg/api/errors" + k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/rest" @@ -55,8 +56,7 @@ type controller struct { func newController(patcher *patcher, config *rest.Config) (*controller, error) { clientset, err := clientset.NewForConfig(config) if err != nil { - glog.Error("Failed to create REST clientset ", err) - return nil, err + return nil, errors.Wrap(err, "Failed to create REST clientset") } informerFactory := informers.NewSharedInformerFactory(clientset, resyncPeriod) @@ -94,11 +94,11 @@ func (c *controller) run(threadiness int) error { go c.informerFactory.Start(c.stopCh) if ok := cache.WaitForCacheSync(c.stopCh, c.afsSynced); !ok { - return fmt.Errorf("failed to wait for AF caches to sync") + return errors.New("failed to wait for AF caches to sync") } if ok := cache.WaitForCacheSync(c.stopCh, c.regionsSynced); !ok { - return fmt.Errorf("failed to wait for Region caches to sync") + return errors.New("failed to wait for Region caches to sync") } for i := 0; i < threadiness; i++ { @@ -107,7 +107,7 @@ func (c *controller) run(threadiness int) error { } }, time.Second, c.stopCh) } - glog.Info("Started controller workers") + fmt.Println("Started controller workers") <-c.stopCh return nil @@ -133,21 +133,21 @@ func (c *controller) processNextWorkItem() bool { if key, ok = obj.(fpgaObjectKey); !ok { c.queue.Forget(obj) - return fmt.Errorf("expected fpgaObjectKey in workqueue but got %#v", obj) + return errors.Errorf("expected fpgaObjectKey in workqueue but got %#v", obj) } switch key.kind { case "af": if err := c.syncAfHandler(key.name); err != nil { - return fmt.Errorf("error syncing '%s': %v", key.name, err) + return errors.Wrapf(err, "error syncing '%s'", key.name) } case "region": if err := c.syncRegionHandler(key.name); err != nil { - return fmt.Errorf("error syncing '%s': %v", key.name, err) + return errors.Wrapf(err, "error syncing '%s'", key.name) } default: c.queue.Forget(obj) - return fmt.Errorf("Unknown kind of object key: %s", key.kind) + return errors.Errorf("Unknown kind of object key: %s", key.kind) } // Finally, if no error occurs we Forget this item so it does not @@ -169,7 +169,7 @@ func (c *controller) syncAfHandler(key string) error { // Convert the namespace/name string into a distinct namespace and name namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + runtime.HandleError(errors.Errorf("invalid resource key: %s", key)) return nil } @@ -178,8 +178,8 @@ func (c *controller) syncAfHandler(key string) error { if err != nil { // The AcceleratorFunction resource may no longer exist, in which case we stop // processing. - if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("accelerated function '%s' in work queue no longer exists", key)) + if k8serrors.IsNotFound(err) { + runtime.HandleError(errors.Errorf("accelerated function '%s' in work queue no longer exists", key)) glog.V(2).Infof("AF '%s' no longer exists", key) c.patcher.removeAf(name) return nil @@ -197,7 +197,7 @@ func (c *controller) syncRegionHandler(key string) error { // Convert the namespace/name string into a distinct namespace and name namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + runtime.HandleError(errors.Errorf("invalid resource key: %s", key)) return nil } @@ -206,8 +206,8 @@ func (c *controller) syncRegionHandler(key string) error { if err != nil { // The FpgaRegion resource may no longer exist, in which case we stop // processing. - if errors.IsNotFound(err) { - runtime.HandleError(fmt.Errorf("FPGA region '%s' in work queue no longer exists", key)) + if k8serrors.IsNotFound(err) { + runtime.HandleError(errors.Errorf("FPGA region '%s' in work queue no longer exists", key)) glog.V(2).Infof("Region '%s' no longer exists", key) c.patcher.removeRegion(name) return nil diff --git a/cmd/fpga_admissionwebhook/controller_test.go b/cmd/fpga_admissionwebhook/controller_test.go index 76471538..9d54b5e2 100644 --- a/cmd/fpga_admissionwebhook/controller_test.go +++ b/cmd/fpga_admissionwebhook/controller_test.go @@ -98,18 +98,18 @@ func TestSyncAfHandler(t *testing.T) { for _, tt := range tcases { p, err := newPatcher(preprogrammed) if err != nil { - t.Fatalf("Test case '%s': %v", tt.name, err) + t.Fatalf("Test case '%s': %+v", tt.name, err) } c, err := newController(p, &rest.Config{}) if err != nil { - t.Fatalf("Test case '%s': %v", tt.name, err) + t.Fatalf("Test case '%s': %+v", tt.name, err) } if tt.afLister != nil { c.afLister = tt.afLister } err = c.syncAfHandler(tt.key) if err != nil && !tt.expectedErr { - t.Errorf("Test case '%s': unexpected error: %v", tt.name, err) + t.Errorf("Test case '%s': unexpected error: %+v", tt.name, err) } if err == nil && tt.expectedErr { t.Errorf("Test case '%s': expected error, but got success", tt.name) @@ -188,18 +188,18 @@ func TestSyncRegionHandler(t *testing.T) { for _, tt := range tcases { p, err := newPatcher(preprogrammed) if err != nil { - t.Fatalf("Test case '%s': %v", tt.name, err) + t.Fatalf("Test case '%s': %+v", tt.name, err) } c, err := newController(p, &rest.Config{}) if err != nil { - t.Fatalf("Test case '%s': %v", tt.name, err) + t.Fatalf("Test case '%s': %+v", tt.name, err) } if tt.regionLister != nil { c.regionLister = tt.regionLister } err = c.syncRegionHandler(tt.key) if err != nil && !tt.expectedErr { - t.Errorf("Test case '%s': unexpected error: %v", tt.name, err) + t.Errorf("Test case '%s': unexpected error: %+v", tt.name, err) } if err == nil && tt.expectedErr { t.Errorf("Test case '%s': expected error, but got success", tt.name) @@ -308,7 +308,7 @@ func TestProcessNextWorkItem(t *testing.T) { p := &patcher{} c, err := newController(p, &rest.Config{}) if err != nil { - t.Fatalf("Test case '%s': %v", tt.name, err) + t.Fatalf("Test case '%s': %+v", tt.name, err) } c.queue = &fakeQueue{ shutdown: tt.shutdown, @@ -349,12 +349,12 @@ func TestRun(t *testing.T) { p := &patcher{} c, err := newController(p, &rest.Config{}) if err != nil { - t.Fatalf("Test case '%s': %v", tt.name, err) + t.Fatalf("Test case '%s': %+v", tt.name, err) } close(c.stopCh) err = c.run(0) if err != nil && !tt.expectedErr { - t.Errorf("Test case '%s': unexpected error: %v", tt.name, err) + t.Errorf("Test case '%s': unexpected error: %+v", tt.name, err) } if err == nil && tt.expectedErr { t.Errorf("Test case '%s': expected error, but got success", tt.name) @@ -384,7 +384,7 @@ func TestNewController(t *testing.T) { p := &patcher{} c, err := newController(p, config) if err != nil && !tt.expectedErr { - t.Errorf("Test case '%s': unexpected error: %v", tt.name, err) + t.Errorf("Test case '%s': unexpected error: %+v", tt.name, err) } if err == nil && tt.expectedErr { t.Errorf("Test case '%s': expected error, but got success", tt.name) diff --git a/cmd/fpga_admissionwebhook/fpga_admissionwebhook.go b/cmd/fpga_admissionwebhook/fpga_admissionwebhook.go index 00279597..c463f9e5 100644 --- a/cmd/fpga_admissionwebhook/fpga_admissionwebhook.go +++ b/cmd/fpga_admissionwebhook/fpga_admissionwebhook.go @@ -25,6 +25,7 @@ import ( "strings" "github.com/golang/glog" + "github.com/pkg/errors" "k8s.io/api/admission/v1beta1" admissionregistrationv1beta1 "k8s.io/api/admissionregistration/v1beta1" @@ -60,7 +61,7 @@ func addToScheme(scheme *runtime.Scheme) { func getTLSConfig(certFile string, keyFile string) *tls.Config { sCert, err := tls.LoadX509KeyPair(certFile, keyFile) if err != nil { - glog.Fatal(err) + fatal(err) } return &tls.Config{ Certificates: []tls.Certificate{sCert}, @@ -82,7 +83,7 @@ func mutatePods(ar v1beta1.AdmissionReview, p *patcher) *v1beta1.AdmissionRespon pod := corev1.Pod{} deserializer := codecs.UniversalDeserializer() if _, _, err := deserializer.Decode(raw, nil, &pod); err != nil { - glog.Error(err) + fmt.Printf("ERROR: %+v\n", err) return toAdmissionResponse(err) } reviewResponse := v1beta1.AdmissionResponse{} @@ -148,7 +149,7 @@ func serve(w http.ResponseWriter, r *http.Request, admit admitFunc) { reviewResponse = toAdmissionResponse(err) } else { if ar.Request == nil { - err = fmt.Errorf("Request is empty") + err = errors.New("Request is empty") reviewResponse = toAdmissionResponse(err) } else { reqUID = ar.Request.UID @@ -187,6 +188,11 @@ func makePodsHandler(p *patcher) func(w http.ResponseWriter, r *http.Request) { } } +func fatal(err error) { + fmt.Printf("ERROR: %+v\n", err) + os.Exit(1) +} + func main() { var kubeconfig string var master string @@ -236,14 +242,12 @@ func main() { patcher, err := newPatcher(mode) if err != nil { - glog.Error(err) - os.Exit(1) + fatal(err) } controller, err := newController(patcher, config) if err != nil { - glog.Error(err) - os.Exit(1) + fatal(err) } go controller.run(controllerThreadNum) @@ -256,5 +260,5 @@ func main() { TLSConfig: getTLSConfig(certFile, keyFile), } - glog.Fatal(server.ListenAndServeTLS("", "")) + fatal(server.ListenAndServeTLS("", "")) } diff --git a/cmd/fpga_admissionwebhook/patcher.go b/cmd/fpga_admissionwebhook/patcher.go index bd2ff773..3e8082fd 100644 --- a/cmd/fpga_admissionwebhook/patcher.go +++ b/cmd/fpga_admissionwebhook/patcher.go @@ -21,6 +21,8 @@ import ( "strings" "sync" + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" fpgav1 "github.com/intel/intel-device-plugins-for-kubernetes/pkg/apis/fpga.intel.com/v1" @@ -66,7 +68,7 @@ type patcher struct { func newPatcher(mode string) (*patcher, error) { if mode != preprogrammed && mode != orchestrated { - return nil, fmt.Errorf("Unknown mode: %s", mode) + return nil, errors.Errorf("Unknown mode: %s", mode) } return &patcher{ @@ -117,7 +119,7 @@ func (p *patcher) getPatchOps(containerIdx int, container corev1.Container) ([]s return p.getPatchOpsOrchestrated(containerIdx, container) } - return nil, fmt.Errorf("Uknown mode: %s", p.mode) + return nil, errors.Errorf("Uknown mode: %s", p.mode) } func (p *patcher) getPatchOpsPreprogrammed(containerIdx int, container corev1.Container) ([]string, error) { @@ -171,7 +173,7 @@ func (p *patcher) getPatchOpsOrchestrated(containerIdx int, container corev1.Con } if mutated { - return nil, fmt.Errorf("Only one FPGA resource per container is supported in '%s' mode", orchestrated) + return nil, errors.Errorf("Only one FPGA resource per container is supported in '%s' mode", orchestrated) } op := fmt.Sprintf(resourceReplaceOp, containerIdx, "limits", rfc6901Escaper.Replace(string(resourceName)), @@ -201,7 +203,7 @@ func (p *patcher) getPatchOpsOrchestrated(containerIdx int, container corev1.Con } if mutated { - return nil, fmt.Errorf("Only one FPGA resource per container is supported in '%s' mode", orchestrated) + return nil, errors.Errorf("Only one FPGA resource per container is supported in '%s' mode", orchestrated) } op := fmt.Sprintf(resourceReplaceOp, containerIdx, "requests", rfc6901Escaper.Replace(string(resourceName)), @@ -231,7 +233,7 @@ func (p *patcher) parseResourceName(input string) (string, string, error) { case "Region": regionName = result[num] if interfaceID, ok = p.regionMap[result[num]]; !ok { - return "", "", fmt.Errorf("Unknown region name: %s", result[num]) + return "", "", errors.Errorf("Unknown region name: %s", result[num]) } case "Af": afName = result[num] @@ -240,7 +242,7 @@ func (p *patcher) parseResourceName(input string) (string, string, error) { if afName != "" { if afuID, ok = p.afMap[regionName+"-"+afName]; !ok { - return "", "", fmt.Errorf("Unknown AF name: %s", regionName+"-"+afName) + return "", "", errors.Errorf("Unknown AF name: %s", regionName+"-"+afName) } } @@ -251,7 +253,7 @@ func getEnvVars(container corev1.Container) (string, error) { var jsonstrings []string for _, envvar := range container.Env { if envvar.Name == "FPGA_REGION" || envvar.Name == "FPGA_AFU" { - return "", fmt.Errorf("The env var '%s' is not allowed", envvar.Name) + return "", errors.Errorf("The env var '%s' is not allowed", envvar.Name) } jsonbytes, err := json.Marshal(envvar) if err != nil { diff --git a/cmd/fpga_admissionwebhook/patcher_test.go b/cmd/fpga_admissionwebhook/patcher_test.go index bf3a3bb9..faeb8f38 100644 --- a/cmd/fpga_admissionwebhook/patcher_test.go +++ b/cmd/fpga_admissionwebhook/patcher_test.go @@ -278,7 +278,7 @@ func TestGetPatchOpsOrchestrated(t *testing.T) { t.Errorf("Test case '%s': no error returned", tt.name) } if !tt.expectedErr && err != nil { - t.Errorf("Test case '%s': unexpected error %v", tt.name, err) + t.Errorf("Test case '%s': unexpected error %+v", tt.name, err) } if len(ops) != tt.expectedOps { t.Errorf("test case '%s': expected %d ops, but got %d\n%v", tt.name, tt.expectedOps, len(ops), ops) @@ -366,7 +366,7 @@ func TestGetEnvVars(t *testing.T) { t.Errorf("Test case '%s': no error returned", tt.name) } if !tt.expectedErr && err != nil { - t.Errorf("Test case '%s': unexpected error %v", tt.name, err) + t.Errorf("Test case '%s': unexpected error %+v", tt.name, err) } } } diff --git a/cmd/fpga_crihook/main.go b/cmd/fpga_crihook/main.go index d374d003..a040b9ec 100644 --- a/cmd/fpga_crihook/main.go +++ b/cmd/fpga_crihook/main.go @@ -17,7 +17,6 @@ package main import ( "bytes" "encoding/json" - "flag" "fmt" "io" "io/ioutil" @@ -26,7 +25,7 @@ import ( "regexp" "strings" - "github.com/golang/glog" + "github.com/pkg/errors" utilsexec "k8s.io/utils/exec" ) @@ -46,7 +45,7 @@ func decodeJSONStream(reader io.Reader) (map[string]interface{}, error) { decoder := json.NewDecoder(reader) content := make(map[string]interface{}) err := decoder.Decode(&content) - return content, err + return content, errors.WithStack(err) } type hookEnv struct { @@ -78,29 +77,29 @@ func canonize(uuid string) string { func (he *hookEnv) getFPGAParams(content map[string]interface{}) (*fpgaParams, error) { bundle, ok := content["bundle"] if !ok { - return nil, fmt.Errorf("no 'bundle' field in the configuration") + return nil, errors.New("no 'bundle' field in the configuration") } configPath := path.Join(fmt.Sprint(bundle), he.config) configFile, err := os.Open(configPath) if err != nil { - return nil, err + return nil, errors.WithStack(err) } defer configFile.Close() content, err = decodeJSONStream(configFile) if err != nil { - return nil, fmt.Errorf("can't decode %s", configPath) + return nil, errors.WithMessage(err, "can't decode "+configPath) } process, ok := content["process"] if !ok { - return nil, fmt.Errorf("no 'process' field found in %s", configPath) + return nil, errors.Errorf("no 'process' field found in %s", configPath) } rawEnv, ok := process.(map[string]interface{})["env"] if !ok { - return nil, fmt.Errorf("no 'env' field found in the 'process' struct in %s", configPath) + return nil, errors.Errorf("no 'env' field found in the 'process' struct in %s", configPath) } dEnv := make(map[string]string) @@ -111,22 +110,22 @@ func (he *hookEnv) getFPGAParams(content map[string]interface{}) (*fpgaParams, e fpgaRegion, ok := dEnv[fpgaRegionEnv] if !ok { - return nil, fmt.Errorf("%s environment is not set in the 'process/env' list in %s", fpgaRegionEnv, configPath) + return nil, errors.Errorf("%s environment is not set in the 'process/env' list in %s", fpgaRegionEnv, configPath) } fpgaAfu, ok := dEnv[fpgaAfuEnv] if !ok { - return nil, fmt.Errorf("%s environment is not set in the 'process/env' list in %s", fpgaAfuEnv, configPath) + return nil, errors.Errorf("%s environment is not set in the 'process/env' list in %s", fpgaAfuEnv, configPath) } linux, ok := content["linux"] if !ok { - return nil, fmt.Errorf("no 'linux' field found in %s", configPath) + return nil, errors.Errorf("no 'linux' field found in %s", configPath) } rawDevices, ok := linux.(map[string]interface{})["devices"] if !ok { - return nil, fmt.Errorf("no 'devices' field found in the 'linux' struct in %s", configPath) + return nil, errors.Errorf("no 'devices' field found in the 'linux' struct in %s", configPath) } pattern := regexp.MustCompile(fpgaDevRegexp) @@ -137,48 +136,48 @@ func (he *hookEnv) getFPGAParams(content map[string]interface{}) (*fpgaParams, e } } - return nil, fmt.Errorf("no FPGA devices found in linux/devices list in %s", configPath) + return nil, errors.Errorf("no FPGA devices found in linux/devices list in %s", configPath) } func (he *hookEnv) validateBitStream(params *fpgaParams, fpgaBitStreamPath string) error { output, err := he.execer.Command("packager", "gbs-info", "--gbs", fpgaBitStreamPath).CombinedOutput() if err != nil { - return fmt.Errorf("%s/%s: can't get bitstream info: %v", params.region, params.afu, err) + return errors.Wrapf(err, "%s/%s: can't get bitstream info", params.region, params.afu) } reader := bytes.NewBuffer(output) content, err := decodeJSONStream(reader) if err != nil { - return fmt.Errorf("%s/%s: can't decode 'packager gbs-info' output: %v", params.region, params.afu, err) + return errors.WithMessage(err, fmt.Sprintf("%s/%s: can't decode 'packager gbs-info' output", params.region, params.afu)) } afuImage, ok := content["afu-image"] if !ok { - return fmt.Errorf("%s/%s: 'afu-image' field not found in the 'packager gbs-info' output", params.region, params.afu) + return errors.Errorf("%s/%s: 'afu-image' field not found in the 'packager gbs-info' output", params.region, params.afu) } interfaceUUID, ok := afuImage.(map[string]interface{})["interface-uuid"] if !ok { - return fmt.Errorf("%s/%s: 'interface-uuid' field not found in the 'packager gbs-info' output", params.region, params.afu) + return errors.Errorf("%s/%s: 'interface-uuid' field not found in the 'packager gbs-info' output", params.region, params.afu) } acceleratorClusters, ok := afuImage.(map[string]interface{})["accelerator-clusters"] if !ok { - return fmt.Errorf("%s/%s: 'accelerator-clusters' field not found in the 'packager gbs-info' output", params.region, params.afu) + return errors.Errorf("%s/%s: 'accelerator-clusters' field not found in the 'packager gbs-info' output", params.region, params.afu) } if canonize(interfaceUUID.(string)) != params.region { - return fmt.Errorf("bitstream is not for this device: region(%s) and interface-uuid(%s) don't match", params.region, interfaceUUID) + return errors.Errorf("bitstream is not for this device: region(%s) and interface-uuid(%s) don't match", params.region, interfaceUUID) } acceleratorTypeUUID, ok := acceleratorClusters.([]interface{})[0].(map[string]interface{})["accelerator-type-uuid"] if !ok { - return fmt.Errorf("%s/%s: 'accelerator-type-uuid' field not found in the 'packager gbs-info' output", params.region, params.afu) + return errors.Errorf("%s/%s: 'accelerator-type-uuid' field not found in the 'packager gbs-info' output", params.region, params.afu) } if canonize(acceleratorTypeUUID.(string)) != params.afu { - return fmt.Errorf("incorrect bitstream: AFU(%s) and accelerator-type-uuid(%s) don't match", params.afu, acceleratorTypeUUID) + return errors.Errorf("incorrect bitstream: AFU(%s) and accelerator-type-uuid(%s) don't match", params.afu, acceleratorTypeUUID) } return nil @@ -187,7 +186,7 @@ func (he *hookEnv) validateBitStream(params *fpgaParams, fpgaBitStreamPath strin func (he *hookEnv) programBitStream(params *fpgaParams, fpgaBitStreamPath string) error { output, err := he.execer.Command("fpgaconf", "-S", params.devNum, fpgaBitStreamPath).CombinedOutput() if err != nil { - return fmt.Errorf("failed to program AFU %s to socket %s, region %s: error: %v, output: %s", params.afu, params.devNum, params.region, err, string(output)) + return errors.Wrapf(err, "failed to program AFU %s to socket %s, region %s: output: %s", params.afu, params.devNum, params.region, string(output)) } programmedAfu, err := he.getProgrammedAfu(params.devNum) @@ -196,7 +195,7 @@ func (he *hookEnv) programBitStream(params *fpgaParams, fpgaBitStreamPath string } if programmedAfu != params.afu { - return fmt.Errorf("programmed function %s instead of %s", programmedAfu, params.afu) + return errors.Errorf("programmed function %s instead of %s", programmedAfu, params.afu) } return nil @@ -208,7 +207,7 @@ func (he *hookEnv) getProgrammedAfu(deviceNum string) (string, error) { afuIDPath := fmt.Sprintf(he.afuIDTemplate, deviceNum, deviceNum) data, err := ioutil.ReadFile(afuIDPath) if err != nil { - return "", err + return "", errors.WithStack(err) } return strings.TrimSpace(string(data)), nil } @@ -222,7 +221,7 @@ func (he *hookEnv) process(reader io.Reader) error { // Check if device plugin annotation is set annotations, ok := content["annotations"] if !ok { - return fmt.Errorf("no 'annotations' field in the configuration") + return errors.New("no 'annotations' field in the configuration") } annotation, ok := annotations.(map[string]interface{})[annotationName] @@ -237,7 +236,7 @@ func (he *hookEnv) process(reader io.Reader) error { params, err := he.getFPGAParams(content) if err != nil { - return fmt.Errorf("couldn't get FPGA region, AFU and device number: %v, skipping", err) + return errors.WithMessage(err, "couldn't get FPGA region, AFU and device number") } programmedAfu, err := he.getProgrammedAfu(params.devNum) @@ -252,7 +251,7 @@ func (he *hookEnv) process(reader io.Reader) error { fpgaBitStreamPath := path.Join(he.bitStreamDir, params.region, params.afu+fpgaBitStreamExt) if _, err = os.Stat(fpgaBitStreamPath); os.IsNotExist(err) { - return fmt.Errorf("%s/%s: bitstream is not found", params.region, params.afu) + return errors.Errorf("%s/%s: bitstream is not found", params.region, params.afu) } err = he.validateBitStream(params, fpgaBitStreamPath) @@ -269,9 +268,6 @@ func (he *hookEnv) process(reader io.Reader) error { } func main() { - //work around glog ERROR: logging before flag.Parse: I0618 - flag.Parse() - if os.Getenv("PATH") == "" { // runc doesn't set PATH when runs hooks os.Setenv("PATH", "/sbin:/usr/sbin:/usr/local/sbin:/usr/local/bin:/usr/bin:/bin") } @@ -280,7 +276,7 @@ func main() { err := he.process(os.Stdin) if err != nil { - glog.Error(err) + fmt.Printf("%+v\n", err) os.Exit(1) } } diff --git a/cmd/fpga_crihook/main_test.go b/cmd/fpga_crihook/main_test.go index 7c7c945d..abfde06d 100644 --- a/cmd/fpga_crihook/main_test.go +++ b/cmd/fpga_crihook/main_test.go @@ -108,7 +108,7 @@ func TestGetFPGAParams(t *testing.T) { content, err := decodeJSONStream(stdin) if err != nil { - t.Fatalf("can't decode json file %s: %v", tc.stdinJSON, err) + t.Fatalf("can't decode json file %s: %+v", tc.stdinJSON, err) } he := newHookEnv("", tc.configJSON, nil, "") @@ -117,7 +117,7 @@ func TestGetFPGAParams(t *testing.T) { if err != nil { if !tc.expectedErr { - t.Errorf("unexpected error: %v", err) + t.Errorf("unexpected error: %+v", err) } } else { if params.region != tc.expectedRegion { @@ -238,7 +238,7 @@ func TestValidateBitstream(t *testing.T) { he := newHookEnv("", "", &execer, "") err := he.validateBitStream(tc.params, "") if err != nil && !tc.expectedErr { - t.Errorf("unexpected error: %v", err) + t.Errorf("unexpected error: %+v", err) } } } @@ -300,7 +300,7 @@ func TestProgramBitStream(t *testing.T) { he.execer = &fakeexec.FakeExec{CommandScript: genFakeActions(&fcmd, len(fcmd.CombinedOutputScript))} err := he.programBitStream(tc.params, "") if err != nil && !tc.expectedErr { - t.Errorf("unexpected error: %v", err) + t.Errorf("unexpected error: %+v", err) } } } @@ -416,7 +416,7 @@ func TestProcess(t *testing.T) { err = he.process(stdin) if err != nil && !tc.expectedErr { - t.Errorf("unexpected error: %v", err) + t.Errorf("unexpected error: %+v", err) } } } diff --git a/cmd/fpga_plugin/fpga_plugin.go b/cmd/fpga_plugin/fpga_plugin.go index 1f8413df..8d2a7c9d 100644 --- a/cmd/fpga_plugin/fpga_plugin.go +++ b/cmd/fpga_plugin/fpga_plugin.go @@ -25,6 +25,7 @@ import ( "time" "github.com/golang/glog" + "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -182,7 +183,7 @@ func newDevicePlugin(sysfsDir string, devfsDir string, mode string) (*devicePlug getDevTree = getRegionDevelTree ignoreAfuIDs = true default: - return nil, fmt.Errorf("Wrong mode: '%s'", mode) + return nil, errors.Errorf("Wrong mode: '%s'", mode) } return &devicePlugin{ @@ -215,8 +216,7 @@ func (dp *devicePlugin) Scan(notifier dpapi.Notifier) error { for { devTree, err := dp.scanFPGAs() if err != nil { - glog.Error("Device scan failed: ", err) - return fmt.Errorf("Device scan failed: %v", err) + return err } notifier.Notify(devTree) @@ -228,7 +228,7 @@ func (dp *devicePlugin) Scan(notifier dpapi.Notifier) error { func (dp *devicePlugin) getDevNode(devName string) (string, error) { devNode := path.Join(dp.devfsDir, devName) if _, err := os.Stat(devNode); err != nil { - return "", fmt.Errorf("Device %s doesn't exist: %v", devNode, err) + return "", errors.Wrapf(err, "Device %s doesn't exist", devNode) } return devNode, nil @@ -241,7 +241,7 @@ func (dp *devicePlugin) scanFPGAs() (dpapi.DeviceTree, error) { fpgaFiles, err := ioutil.ReadDir(dp.sysfsDir) if err != nil { - return nil, fmt.Errorf("Can't read sysfs folder: %v", err) + return nil, errors.Wrap(err, "Can't read sysfs folder") } for _, fpgaFile := range fpgaFiles { @@ -254,7 +254,7 @@ func (dp *devicePlugin) scanFPGAs() (dpapi.DeviceTree, error) { deviceFolder := path.Join(dp.sysfsDir, fname) deviceFiles, err := ioutil.ReadDir(deviceFolder) if err != nil { - return nil, err + return nil, errors.WithStack(err) } regions, afus, err := dp.getSysFsInfo(deviceFolder, deviceFiles, fname) @@ -263,7 +263,7 @@ func (dp *devicePlugin) scanFPGAs() (dpapi.DeviceTree, error) { } if len(regions) == 0 { - return nil, fmt.Errorf("No regions found for device %s", fname) + return nil, errors.Errorf("No regions found for device %s", fname) } // Currently only one region per device is supported. @@ -285,12 +285,12 @@ func (dp *devicePlugin) getSysFsInfo(deviceFolder string, deviceFiles []os.FileI if dp.fmeReg.MatchString(name) { if len(regions) > 0 { - return nil, nil, fmt.Errorf("Detected more than one FPGA region for device %s. Only one region per FPGA device is supported", fname) + return nil, nil, errors.Errorf("Detected more than one FPGA region for device %s. Only one region per FPGA device is supported", fname) } interfaceIDFile := path.Join(deviceFolder, name, "pr", "interface_id") data, err := ioutil.ReadFile(interfaceIDFile) if err != nil { - return nil, nil, err + return nil, nil, errors.WithStack(err) } devNode, err := dp.getDevNode(name) if err != nil { @@ -310,7 +310,7 @@ func (dp *devicePlugin) getSysFsInfo(deviceFolder string, deviceFiles []os.FileI afuFile := path.Join(deviceFolder, name, "afu_id") data, err := ioutil.ReadFile(afuFile) if err != nil { - return nil, nil, err + return nil, nil, errors.WithStack(err) } afuID = strings.TrimSpace(string(data)) } @@ -329,6 +329,11 @@ func (dp *devicePlugin) getSysFsInfo(deviceFolder string, deviceFiles []os.FileI return regions, afus, nil } +func fatal(err error) { + fmt.Printf("ERROR: %+v\n", err) + os.Exit(1) +} + func main() { var mode string var kubeconfig string @@ -348,7 +353,7 @@ func main() { config, err = clientcmd.BuildConfigFromFlags(master, kubeconfig) } if err != nil { - glog.Fatal(err) + fatal(err) } // if NODE_NAME is not set then try to use hostname @@ -356,12 +361,12 @@ func main() { clientset, err := kubernetes.NewForConfig(config) if err != nil { - glog.Fatal(err) + fatal(err) } node, err := clientset.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{}) if err != nil { - glog.Fatal(err) + fatal(err) } if nodeMode, ok := node.ObjectMeta.Annotations["fpga.intel.com/device-plugin-mode"]; ok { @@ -371,8 +376,7 @@ func main() { plugin, err := newDevicePlugin(sysfsDirectory, devfsDirectory, mode) if err != nil { - glog.Error(err) - os.Exit(1) + fatal(err) } glog.Info("FPGA device plugin started in ", mode, " mode") diff --git a/cmd/fpga_plugin/fpga_plugin_test.go b/cmd/fpga_plugin/fpga_plugin_test.go index 68fcb440..26d1a283 100644 --- a/cmd/fpga_plugin/fpga_plugin_test.go +++ b/cmd/fpga_plugin/fpga_plugin_test.go @@ -24,6 +24,8 @@ import ( "testing" "time" + "github.com/pkg/errors" + pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" dpapi "github.com/intel/intel-device-plugins-for-kubernetes/internal/deviceplugin" @@ -35,19 +37,19 @@ func createTestDirs(devfs, sysfs string, devfsDirs, sysfsDirs []string, sysfsFil for _, devfsdir := range devfsDirs { err = os.MkdirAll(path.Join(devfs, devfsdir), 0755) if err != nil { - return fmt.Errorf("Failed to create fake device directory: %v", err) + return errors.Wrap(err, "Failed to create fake device directory") } } for _, sysfsdir := range sysfsDirs { err = os.MkdirAll(path.Join(sysfs, sysfsdir), 0755) if err != nil { - return fmt.Errorf("Failed to create fake device directory: %v", err) + return errors.Wrap(err, "Failed to create fake device directory") } } for filename, body := range sysfsFiles { err = ioutil.WriteFile(path.Join(sysfs, filename), body, 0644) if err != nil { - return fmt.Errorf("Failed to create fake vendor file: %v", err) + return errors.Wrap(err, "Failed to create fake vendor file") } } @@ -326,12 +328,12 @@ func TestScanFPGAs(t *testing.T) { for _, tcase := range tcases { err := createTestDirs(devfs, sysfs, tcase.devfsdirs, tcase.sysfsdirs, tcase.sysfsfiles) if err != nil { - t.Fatal(err) + t.Fatalf("%+v", err) } plugin, err := newDevicePlugin(sysfs, devfs, tcase.mode) if err != nil { - t.Fatal(err) + t.Fatalf("%+v", err) } plugin.getDevTree = func(devices []device) dpapi.DeviceTree { return dpapi.NewDeviceTree() @@ -343,7 +345,7 @@ func TestScanFPGAs(t *testing.T) { t.Errorf("Test case '%s': expected error '%s', but got '%v'", tcase.name, tcase.errorContains, err) } } else if err != nil { - t.Errorf("Test case '%s': expected no error, but got '%v'", tcase.name, err) + t.Errorf("Test case '%s': expected no error, but got '%+v'", tcase.name, err) } err = os.RemoveAll(tmpdir) diff --git a/cmd/gpu_plugin/gpu_plugin.go b/cmd/gpu_plugin/gpu_plugin.go index 9aa5e7ee..dfd5166e 100644 --- a/cmd/gpu_plugin/gpu_plugin.go +++ b/cmd/gpu_plugin/gpu_plugin.go @@ -25,6 +25,7 @@ import ( "time" "github.com/golang/glog" + "github.com/pkg/errors" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" @@ -67,8 +68,7 @@ func (dp *devicePlugin) Scan(notifier dpapi.Notifier) error { for { devTree, err := dp.scan() if err != nil { - glog.Error("Device scan failed: ", err) - return fmt.Errorf("Device scan failed: %v", err) + return err } notifier.Notify(devTree) @@ -80,7 +80,7 @@ func (dp *devicePlugin) Scan(notifier dpapi.Notifier) error { func (dp *devicePlugin) scan() (dpapi.DeviceTree, error) { files, err := ioutil.ReadDir(dp.sysfsDir) if err != nil { - return nil, fmt.Errorf("Can't read sysfs folder: %v", err) + return nil, errors.Wrap(err, "Can't read sysfs folder") } devTree := dpapi.NewDeviceTree() @@ -97,7 +97,7 @@ func (dp *devicePlugin) scan() (dpapi.DeviceTree, error) { drmFiles, err := ioutil.ReadDir(path.Join(dp.sysfsDir, f.Name(), "device/drm")) if err != nil { - return nil, fmt.Errorf("Can't read device folder: %v", err) + return nil, errors.Wrap(err, "Can't read device folder") } for _, drmFile := range drmFiles { diff --git a/cmd/gpu_plugin/gpu_plugin_test.go b/cmd/gpu_plugin/gpu_plugin_test.go index 3fb59c64..5278a125 100644 --- a/cmd/gpu_plugin/gpu_plugin_test.go +++ b/cmd/gpu_plugin/gpu_plugin_test.go @@ -105,6 +105,9 @@ func TestScan(t *testing.T) { if tcase.expectedErr && err == nil { t.Error("Expected error hasn't been triggered") } + if !tcase.expectedErr && err != nil { + t.Errorf("Unexpcted error: %+v", err) + } if tcase.expectedDevs != len(tree[deviceType]) { t.Errorf("Wrong number of discovered devices") } diff --git a/cmd/qat_plugin/qat_plugin.go b/cmd/qat_plugin/qat_plugin.go index b055b624..a8487ced 100644 --- a/cmd/qat_plugin/qat_plugin.go +++ b/cmd/qat_plugin/qat_plugin.go @@ -26,6 +26,7 @@ import ( "time" "github.com/golang/glog" + "github.com/pkg/errors" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" @@ -71,8 +72,7 @@ func (dp *devicePlugin) Scan(notifier deviceplugin.Notifier) error { for { devTree, err := dp.scan() if err != nil { - glog.Error("Device scan failed: ", err) - return fmt.Errorf("Device scan failed: %v", err) + return err } notifier.Notify(devTree) @@ -93,7 +93,7 @@ func (dp *devicePlugin) getDpdkDevice(id string) (string, error) { return "", err } if len(files) == 0 { - return "", fmt.Errorf("No devices found") + return "", errors.New("No devices found") } return files[0].Name(), nil @@ -101,20 +101,20 @@ func (dp *devicePlugin) getDpdkDevice(id string) (string, error) { vfioDirPath := path.Join(dp.pciDeviceDir, devicePCIAdd, iommuGroupSuffix) group, err := filepath.EvalSymlinks(vfioDirPath) if err != nil { - return "", err + return "", errors.WithStack(err) } s := strings.TrimPrefix(group, sysfsIommuGroupPrefix) fmt.Printf("The vfio device group detected is %v\n", s) return s, nil } - return "", fmt.Errorf("Unknown DPDK driver") + return "", errors.New("Unknown DPDK driver") } func (dp *devicePlugin) getDpdkDeviceNames(id string) ([]string, error) { dpdkDeviceName, err := dp.getDpdkDevice(id) if err != nil { - return []string{}, fmt.Errorf("Unable to get the dpdk device for creating device nodes: %v", err) + return []string{}, err } fmt.Printf("%s device: corresponding DPDK device detected is %s\n", id, dpdkDeviceName) @@ -131,13 +131,13 @@ func (dp *devicePlugin) getDpdkDeviceNames(id string) ([]string, error) { return []string{vfioDev1, vfioDev2}, nil } - return []string{}, fmt.Errorf("Unknown DPDK driver") + return []string{}, errors.New("Unknown DPDK driver") } func (dp *devicePlugin) getDpdkMountPaths(id string) ([]string, error) { dpdkDeviceName, err := dp.getDpdkDevice(id) if err != nil { - return []string{}, fmt.Errorf("Unable to get the dpdk device for mountPath: %v", err) + return []string{}, err } switch dp.dpdkDriver { @@ -150,13 +150,13 @@ func (dp *devicePlugin) getDpdkMountPaths(id string) ([]string, error) { return []string{}, nil } - return nil, fmt.Errorf("Unknown DPDK driver") + return nil, errors.New("Unknown DPDK driver") } func (dp *devicePlugin) getDeviceID(pciAddr string) (string, error) { devID, err := ioutil.ReadFile(path.Join(dp.pciDeviceDir, pciAddr, "device")) if err != nil { - return "", fmt.Errorf("Cannot obtain ID for the device %s: %v", pciAddr, err) + return "", errors.Wrapf(err, "Cannot obtain ID for the device %s", pciAddr) } return strings.TrimPrefix(string(bytes.TrimSpace(devID)), "0x"), nil @@ -170,19 +170,19 @@ func (dp *devicePlugin) bindDevice(id string) error { // Unbind from the kernel driver err := ioutil.WriteFile(unbindDevicePath, []byte(devicePCIAddr), 0644) if err != nil { - return fmt.Errorf("Unbinding from kernel driver failed for the device %s: %v", id, err) + return errors.Wrapf(err, "Unbinding from kernel driver failed for the device %s: %v", id) } vfdevID, err := dp.getDeviceID(devicePCIAddr) if err != nil { - return fmt.Errorf("Cannot obtain ID for the device %s: %v", id, err) + return err } bindDevicePath := path.Join(dp.pciDriverDir, dp.dpdkDriver, newIDSuffix) //Bind to the the dpdk driver err = ioutil.WriteFile(bindDevicePath, []byte(vendorPrefix+vfdevID), 0644) if err != nil { - return fmt.Errorf("Binding to the DPDK driver failed for the device %s: %v", id, err) + return errors.Wrapf(err, "Binding to the DPDK driver failed for the device %s: %v", id) } return nil @@ -210,7 +210,7 @@ func (dp *devicePlugin) scan() (deviceplugin.DeviceTree, error) { for _, driver := range append(dp.kernelVfDrivers, dp.dpdkDriver) { files, err := ioutil.ReadDir(path.Join(dp.pciDriverDir, driver)) if err != nil { - return nil, fmt.Errorf("Can't read sysfs for driver %s: %+v", driver, err) + return nil, errors.Wrapf(err, "Can't read sysfs for driver %s", driver) } n := 0 @@ -230,17 +230,17 @@ func (dp *devicePlugin) scan() (deviceplugin.DeviceTree, error) { if driver != dp.dpdkDriver { err := dp.bindDevice(vfpciaddr) if err != nil { - return nil, fmt.Errorf("Error in binding the device to the dpdk driver: %+v", err) + return nil, err } } devNodes, err := dp.getDpdkDeviceNames(vfpciaddr) if err != nil { - return nil, fmt.Errorf("Error in obtaining the device name: %+v", err) + return nil, err } devMounts, err := dp.getDpdkMountPaths(vfpciaddr) if err != nil { - return nil, fmt.Errorf("Error in obtaining the mount point: %+v", err) + return nil, err } devinfo := deviceplugin.DeviceInfo{ diff --git a/cmd/qat_plugin/qat_plugin_test.go b/cmd/qat_plugin/qat_plugin_test.go index dbe6650d..bcac7e9a 100644 --- a/cmd/qat_plugin/qat_plugin_test.go +++ b/cmd/qat_plugin/qat_plugin_test.go @@ -21,19 +21,21 @@ import ( "path" "testing" "time" + + "github.com/pkg/errors" ) func createTestFiles(prefix string, dirs []string, files map[string][]byte) error { for _, dir := range dirs { err := os.MkdirAll(path.Join(prefix, dir), 0755) if err != nil { - return fmt.Errorf("Failed to create fake device directory: %v", err) + return errors.Wrap(err, "Failed to create fake device directory") } } for filename, body := range files { err := ioutil.WriteFile(path.Join(prefix, filename), body, 0644) if err != nil { - return fmt.Errorf("Failed to create fake vendor file: %v", err) + return errors.Wrap(err, "Failed to create fake vendor file") } } @@ -163,7 +165,7 @@ func TestScanPrivate(t *testing.T) { t.Fatal(err) } if err := createTestFiles(tmpdir, tt.dirs, tt.files); err != nil { - t.Fatal(err) + t.Fatalf("%+v", err) } dp := &devicePlugin{ diff --git a/internal/deviceplugin/manager.go b/internal/deviceplugin/manager.go index 4b7e4555..458dd861 100644 --- a/internal/deviceplugin/manager.go +++ b/internal/deviceplugin/manager.go @@ -119,7 +119,8 @@ func (m *Manager) handleUpdate(update updateInfo) { go func() { err := m.servers[devType].Serve(m.namespace) if err != nil { - glog.Fatal(err) + fmt.Printf("Failed to serve %s/%s: %+v\n", m.namespace, devType, err) + os.Exit(1) } }() m.servers[devType].Update(devices) diff --git a/internal/deviceplugin/server.go b/internal/deviceplugin/server.go index 7c655af2..6b269279 100644 --- a/internal/deviceplugin/server.go +++ b/internal/deviceplugin/server.go @@ -25,6 +25,7 @@ import ( "github.com/fsnotify/fsnotify" "github.com/golang/glog" + "github.com/pkg/errors" "google.golang.org/grpc" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" @@ -71,7 +72,7 @@ func (srv *server) sendDevices(stream pluginapi.DevicePlugin_ListAndWatchServer) glog.V(2).Info("Sending to kubelet ", resp.Devices) if err := stream.Send(resp); err != nil { srv.Stop() - return fmt.Errorf("device-plugin: cannot update device list: %v", err) + return errors.Wrapf(err, "Cannot update device list") } return nil @@ -100,10 +101,10 @@ func (srv *server) Allocate(ctx context.Context, rqt *pluginapi.AllocateRequest) for _, id := range crqt.DevicesIDs { dev, ok := srv.devices[id] if !ok { - return nil, fmt.Errorf("Invalid allocation request with non-existing device %s", id) + return nil, errors.Errorf("Invalid allocation request with non-existing device %s", id) } if dev.State != pluginapi.Healthy { - return nil, fmt.Errorf("Invalid allocation request with unhealthy device %s", id) + return nil, errors.Errorf("Invalid allocation request with unhealthy device %s", id) } for _, devnode := range dev.Nodes { cresp.Devices = append(cresp.Devices, &pluginapi.DeviceSpec{ @@ -139,8 +140,7 @@ func (srv *server) Allocate(ctx context.Context, rqt *pluginapi.AllocateRequest) } func (srv *server) PreStartContainer(ctx context.Context, rqt *pluginapi.PreStartContainerRequest) (*pluginapi.PreStartContainerResponse, error) { - glog.Warning("PreStartContainer() should not be called") - return new(pluginapi.PreStartContainerResponse), nil + return nil, errors.New("PreStartContainer() should not be called") } // Serve starts a gRPC server to serve pluginapi.PluginInterfaceServer interface. @@ -151,7 +151,7 @@ func (srv *server) Serve(namespace string) error { // Stop stops serving pluginapi.PluginInterfaceServer interface. func (srv *server) Stop() error { if srv.grpcServer == nil { - return fmt.Errorf("Can't stop non-existing gRPC server. Calling Stop() before Serve()?") + return errors.New("Can't stop non-existing gRPC server. Calling Stop() before Serve()?") } srv.grpcServer.Stop() close(srv.updatesCh) @@ -173,13 +173,13 @@ func (srv *server) setupAndServe(namespace string, devicePluginPath string, kube pluginSocket := path.Join(devicePluginPath, pluginEndpoint) if err := waitForServer(pluginSocket, time.Second); err == nil { - return fmt.Errorf("Socket %s is already in use", pluginSocket) + return errors.Errorf("Socket %s is already in use", pluginSocket) } os.Remove(pluginSocket) lis, err := net.Listen("unix", pluginSocket) if err != nil { - return fmt.Errorf("Failed to listen to plugin socket: %+v", err) + return errors.Wrap(err, "Failed to listen to plugin socket") } srv.grpcServer = grpc.NewServer() @@ -193,20 +193,20 @@ func (srv *server) setupAndServe(namespace string, devicePluginPath string, kube // Wait for the server to start if err = waitForServer(pluginSocket, 10*time.Second); err != nil { - return fmt.Errorf("Failed to wait for plugin socket: %+v", err) + return err } // Register with Kubelet. err = registerWithKubelet(kubeletSocket, pluginEndpoint, resourceName) if err != nil { - return fmt.Errorf("Failed to register: %+v", err) + return err } fmt.Println("device-plugin registered") // Kubelet removes plugin socket when it (re)starts // plugin must restart in this case if err = watchFile(pluginSocket); err != nil { - return fmt.Errorf("error watching plugin socket: %+v", err) + return err } fmt.Printf("socket %s removed, restarting", pluginSocket) @@ -220,13 +220,13 @@ func (srv *server) setupAndServe(namespace string, devicePluginPath string, kube func watchFile(file string) error { watcher, err := fsnotify.NewWatcher() if err != nil { - return err + return errors.Wrapf(err, "Failed to create watcher for %s", file) } defer watcher.Close() err = watcher.Add(filepath.Dir(file)) if err != nil { - return err + return errors.Wrapf(err, "Failed to add %s to watcher", file) } for { @@ -236,7 +236,7 @@ func watchFile(file string) error { return nil } case err := <-watcher.Errors: - return err + return errors.WithStack(err) } } } @@ -248,7 +248,7 @@ func registerWithKubelet(kubeletSocket, pluginEndPoint, resourceName string) err })) defer conn.Close() if err != nil { - return fmt.Errorf("device-plugin: cannot connect to kubelet service: %v", err) + return errors.Wrap(err, "Cannot connect to kubelet service") } client := pluginapi.NewRegistrationClient(conn) reqt := &pluginapi.RegisterRequest{ @@ -259,7 +259,7 @@ func registerWithKubelet(kubeletSocket, pluginEndPoint, resourceName string) err _, err = client.Register(context.Background(), reqt) if err != nil { - return fmt.Errorf("device-plugin: cannot register to kubelet service: %v", err) + return errors.Wrap(err, "Cannot register to kubelet service") } return nil @@ -278,5 +278,5 @@ func waitForServer(socket string, timeout time.Duration) error { if conn != nil { conn.Close() } - return err + return errors.Wrapf(err, "Failed dial context at %s", socket) } diff --git a/internal/deviceplugin/server_test.go b/internal/deviceplugin/server_test.go index 39b1a743..d58a32e9 100644 --- a/internal/deviceplugin/server_test.go +++ b/internal/deviceplugin/server_test.go @@ -28,6 +28,8 @@ import ( "google.golang.org/grpc/metadata" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" + + "github.com/pkg/errors" ) const ( @@ -65,8 +67,7 @@ func (k *kubeletStub) start() error { os.Remove(k.socket) s, err := net.Listen("unix", k.socket) if err != nil { - fmt.Printf("Can't listen at the socket: %+v", err) - return err + return errors.Wrap(err, "Can't listen at the socket") } k.server = grpc.NewServer() @@ -89,7 +90,10 @@ func TestRegisterWithKublet(t *testing.T) { } kubelet := newKubeletStub(kubeletSocket) - kubelet.start() + err = kubelet.start() + if err != nil { + t.Fatalf("%+v", err) + } defer kubelet.server.Stop() err = registerWithKubelet(kubeletSocket, pluginSocket, resourceName) @@ -405,7 +409,7 @@ func TestListAndWatch(t *testing.T) { err := testServer.ListAndWatch(&pluginapi.Empty{}, server) if err != nil && tt.errorOnCall == 0 { - t.Errorf("Test case '%s': got unexpected error %v", tt.name, err) + t.Errorf("Test case '%s': got unexpected error %+v", tt.name, err) } if err == nil && tt.errorOnCall != 0 { t.Errorf("Test case '%s': expected an error, but got nothing", tt.name)