intel-device-plugins-for-ku.../internal/deviceplugin/deviceplugin.go
Ed Bartosh 7310a98343 fix golint warnings
Fixed the following golint warnings:
./cmd/fpga_plugin/fpga_plugin.go:71:2: struct field fpgaId should be fpgaID
./cmd/fpga_plugin/fpga_plugin.go:78:44: func parameter fpgaId should be fpgaID
./cmd/fpga_plugin/fpga_plugin.go:104:8: var interfaceId should be interfaceID
./cmd/fpga_plugin/fpga_plugin.go:120:7: var interfaceIdFile should be interfaceIDFile
./cmd/fpga_plugin/fpga_plugin.go:156:8: range var fpgaId should be fpgaID
./cmd/fpga_plugin/fpga_plugin.go:254:6: range var fpgaId should be fpgaID
./cmd/fpga_plugin/fpga_plugin.go:254:14: should omit 2nd value from range; this loop is equivalent to `for fpgaId := range ...`
./internal/deviceplugin/deviceplugin.go:30:6: exported type DeviceInfo should have comment or be unexported
./internal/deviceplugin/deviceplugin.go:35:6: exported type Server should have comment or be unexported
./internal/deviceplugin/deviceplugin.go:39:1: exported method Server.Serve should have comment or be unexported
./internal/deviceplugin/deviceplugin.go:43:1: exported method Server.Stop should have comment or be unexported
2018-05-28 16:53:37 +03:00

170 lines
5.3 KiB
Go

// Copyright 2017 Intel Corporation. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package deviceplugin
import (
"context"
"fmt"
"net"
"os"
"path"
"time"
"google.golang.org/grpc"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
)
// DeviceInfo contains information about a device maintained by Device Plugin.
type DeviceInfo struct {
// State is the health state of the device. MakeAllocateResponse only
// accepts devices whose State equals pluginapi.Healthy.
State string
// Nodes lists the device node paths belonging to the device.
// MakeAllocateResponse exposes each of them to the container under the
// same path it has on the host.
Nodes []string
}
// Server keeps the state of a Device Plugin gRPC server.
type Server struct {
// grpcServer is the server created by setupAndServe. It stays nil until
// then, which is how Stop detects that nothing is being served yet.
grpcServer *grpc.Server
}
// Serve starts a gRPC server exposing the Device Plugin implementation dm and
// registers it with kubelet under resourceName. The plugin socket is named
// after pluginPrefix and lives in the default device plugin directory; the
// default kubelet socket is used for registration.
//
// The returned error is whatever setupAndServe reports.
func (srv *Server) Serve(dm pluginapi.DevicePluginServer, resourceName string, pluginPrefix string) error {
	socketDir := pluginapi.DevicePluginPath
	kubelet := pluginapi.KubeletSocket
	return srv.setupAndServe(dm, resourceName, pluginPrefix, socketDir, kubelet)
}
// Stop shuts down the gRPC server that serves the Device Plugin.
//
// It returns an error when no server has been created yet, i.e. when it is
// called before Serve had a chance to set one up.
func (srv *Server) Stop() error {
	if server := srv.grpcServer; server != nil {
		server.Stop()
		return nil
	}
	return fmt.Errorf("Can't stop non-existing gRPC server. Calling Stop() before Serve()?")
}
// setupAndServe binds the given device manager to a fresh gRPC server, starts
// it on the plugin socket and registers it with kubelet.
//
// The plugin socket is <devicePluginPath>/<pluginPrefix>.sock. After a
// successful registration the socket file is polled once per second; when it
// disappears the server is stopped and the whole listen/serve/register
// sequence is repeated.
//
// The function only returns on failure:
//   - something already answers on the plugin socket,
//   - listening on the plugin socket fails,
//   - the freshly started server does not become reachable within 10 seconds,
//   - registration with kubelet (via kubeletSocket) fails.
//
// In the last two cases the server started for this attempt is stopped before
// returning, so that neither its listener nor its serving goroutine outlive
// the call.
func (srv *Server) setupAndServe(dm pluginapi.DevicePluginServer, resourceName string, pluginPrefix string, devicePluginPath string, kubeletSocket string) error {
	// The socket location does not change between restarts.
	pluginEndpoint := pluginPrefix + ".sock"
	pluginSocket := path.Join(devicePluginPath, pluginEndpoint)

	for {
		// A successful connection means another server owns the socket.
		if err := waitForServer(pluginSocket, time.Second); err == nil {
			return fmt.Errorf("Socket %s is already in use", pluginSocket)
		}
		// Best-effort cleanup of a stale socket file; if it cannot be
		// removed, net.Listen below reports the problem.
		os.Remove(pluginSocket)

		lis, err := net.Listen("unix", pluginSocket)
		if err != nil {
			return fmt.Errorf("Failed to listen to plugin socket: %+v", err)
		}

		// Keep a local handle so the goroutine below always serves the
		// server created in this iteration, regardless of later updates
		// to srv.grpcServer.
		grpcServer := grpc.NewServer()
		srv.grpcServer = grpcServer
		pluginapi.RegisterDevicePluginServer(grpcServer, dm)

		// Starts device plugin service. A server that fails to serve is
		// expected to surface as a waitForServer timeout below.
		go func() {
			fmt.Printf("device-plugin start server at: %s\n", pluginSocket)
			grpcServer.Serve(lis)
		}()

		// Wait for the server to start
		if err = waitForServer(pluginSocket, 10*time.Second); err != nil {
			grpcServer.Stop()
			return fmt.Errorf("Failed to wait for plugin socket: %+v", err)
		}

		// Register with Kubelet.
		if err = registerWithKubelet(kubeletSocket, pluginEndpoint, resourceName); err != nil {
			grpcServer.Stop()
			return fmt.Errorf("Failed to register: %+v", err)
		}
		fmt.Println("device-plugin registered")

		// Kubelet removes plugin socket when it (re)starts
		// plugin must restart in this case
		for {
			if _, err := os.Stat(pluginSocket); os.IsNotExist(err) {
				fmt.Println("plugin socket removed, stop server")
				grpcServer.Stop()
				break
			}
			time.Sleep(1 * time.Second)
		}
	}
}
// registerWithKubelet announces the plugin to kubelet.
//
// It dials the unix socket kubeletSocket and sends a RegisterRequest carrying
// the Device Plugin API version, the plugin's socket name (pluginEndPoint)
// and the resource name it serves.
//
// An error is returned when the connection cannot be set up or when kubelet
// rejects the registration.
func registerWithKubelet(kubeletSocket, pluginEndPoint, resourceName string) error {
	conn, err := grpc.Dial(kubeletSocket, grpc.WithInsecure(),
		grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
			return net.DialTimeout("unix", addr, timeout)
		}))
	if err != nil {
		return fmt.Errorf("device-plugin: cannot connect to kubelet service: %v", err)
	}
	// Schedule Close only after the dial succeeded: on failure conn is nil
	// and closing it would panic instead of reporting the error above.
	defer conn.Close()

	client := pluginapi.NewRegistrationClient(conn)
	reqt := &pluginapi.RegisterRequest{
		Version:      pluginapi.Version,
		Endpoint:     pluginEndPoint,
		ResourceName: resourceName,
	}

	if _, err = client.Register(context.Background(), reqt); err != nil {
		return fmt.Errorf("device-plugin: cannot register to kubelet service: %v", err)
	}

	return nil
}
// waitForServer reports whether a gRPC server is reachable on the given unix
// socket. It performs a blocking gRPC dial bounded by timeout and returns the
// dial error, or nil when the connection was established. Any connection that
// was obtained is closed before returning.
func waitForServer(socket string, timeout time.Duration) error {
	unixDialer := func(addr string, d time.Duration) (net.Conn, error) {
		return net.DialTimeout("unix", addr, d)
	}

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	conn, err := grpc.DialContext(ctx, socket,
		grpc.WithInsecure(),
		grpc.WithBlock(),
		grpc.WithDialer(unixDialer),
	)
	if conn == nil {
		return err
	}
	conn.Close()
	return err
}
// MakeAllocateResponse builds the reply for an Allocate gRPC call.
//
// For every container request in rqt it produces one container response that
// lists, for each requested device ID, all device nodes of that device as
// found in devices. Each node is mapped to the same path inside the container
// with "mrw" permissions.
//
// The whole request is rejected with an error (and a nil response) as soon as
// a requested device ID is missing from devices or refers to a device whose
// state is not pluginapi.Healthy.
func MakeAllocateResponse(rqt *pluginapi.AllocateRequest, devices map[string]DeviceInfo) (*pluginapi.AllocateResponse, error) {
	response := &pluginapi.AllocateResponse{}

	for _, containerReq := range rqt.ContainerRequests {
		containerResp := &pluginapi.ContainerAllocateResponse{}

		for _, deviceID := range containerReq.DevicesIDs {
			info, known := devices[deviceID]
			switch {
			case !known:
				return nil, fmt.Errorf("Invalid allocation request with non-existing device %s", deviceID)
			case info.State != pluginapi.Healthy:
				return nil, fmt.Errorf("Invalid allocation request with unhealthy device %s", deviceID)
			}

			for i := range info.Nodes {
				spec := &pluginapi.DeviceSpec{
					HostPath:      info.Nodes[i],
					ContainerPath: info.Nodes[i],
					Permissions:   "mrw",
				}
				containerResp.Devices = append(containerResp.Devices, spec)
			}
		}

		response.ContainerResponses = append(response.ContainerResponses, containerResp)
	}

	return response, nil
}