api: add PreferredAllocator interface

This adds a PreferredAllocator interface so that plugins can
optionally implement the API.

Signed-off-by: Ukri Niemimuukko <ukri.niemimuukko@intel.com>
This commit is contained in:
Ukri Niemimuukko 2020-12-28 16:44:21 +02:00
parent 5d80f2da3c
commit 4c92093ca7
5 changed files with 44 additions and 23 deletions

View File

@ -96,6 +96,12 @@ type PostAllocator interface {
PostAllocate(*pluginapi.AllocateResponse) error
}
// PreferredAllocator is an optional interface implemented by device plugins.
type PreferredAllocator interface {
// GetPreferredAllocation defines the list of devices preferred for allocating next.
GetPreferredAllocation(*pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error)
}
// ContainerPreStarter is an optional interface implemented by device plugins.
type ContainerPreStarter interface {
// PreStartContainer defines device initialization function before container is started.

View File

@ -22,6 +22,10 @@ import (
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
)
type postAllocateFunc func(*pluginapi.AllocateResponse) error
type preStartContainerFunc func(*pluginapi.PreStartContainerRequest) error
type getPreferredAllocationFunc func(*pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error)
// updateInfo contains info for added, updated and deleted devices.
type updateInfo struct {
Added DeviceTree
@ -73,7 +77,7 @@ type Manager struct {
devicePlugin Scanner
namespace string
servers map[string]devicePluginServer
createServer func(string, func(*pluginapi.AllocateResponse) error, func(*pluginapi.PreStartContainerRequest) error) devicePluginServer
createServer func(string, postAllocateFunc, preStartContainerFunc, getPreferredAllocationFunc) devicePluginServer
}
// NewManager creates a new instance of Manager.
@ -107,8 +111,9 @@ func (m *Manager) Run() {
func (m *Manager) handleUpdate(update updateInfo) {
klog.V(4).Info("Received dev updates:", update)
for devType, devices := range update.Added {
var postAllocate func(*pluginapi.AllocateResponse) error
var preStartContainer func(*pluginapi.PreStartContainerRequest) error
var postAllocate postAllocateFunc
var preStartContainer preStartContainerFunc
var getPreferredAllocation getPreferredAllocationFunc
if postAllocator, ok := m.devicePlugin.(PostAllocator); ok {
postAllocate = postAllocator.PostAllocate
@ -118,7 +123,11 @@ func (m *Manager) handleUpdate(update updateInfo) {
preStartContainer = containerPreStarter.PreStartContainer
}
m.servers[devType] = m.createServer(devType, postAllocate, preStartContainer)
if preferredAllocator, ok := m.devicePlugin.(PreferredAllocator); ok {
getPreferredAllocation = preferredAllocator.GetPreferredAllocation
}
m.servers[devType] = m.createServer(devType, postAllocate, preStartContainer, getPreferredAllocation)
go func(dt string) {
err := m.servers[dt].Serve(m.namespace)
if err != nil {

View File

@ -265,7 +265,7 @@ func TestHandleUpdate(t *testing.T) {
mgr := Manager{
devicePlugin: &devicePluginStub{},
servers: tt.servers,
createServer: func(string, func(*pluginapi.AllocateResponse) error, func(*pluginapi.PreStartContainerRequest) error) devicePluginServer {
createServer: func(string, postAllocateFunc, preStartContainerFunc, getPreferredAllocationFunc) devicePluginServer {
return &serverStub{}
},
}
@ -279,7 +279,7 @@ func TestHandleUpdate(t *testing.T) {
func TestRun(t *testing.T) {
mgr := NewManager("testnamespace", &devicePluginStub{})
mgr.createServer = func(string, func(*pluginapi.AllocateResponse) error, func(*pluginapi.PreStartContainerRequest) error) devicePluginServer {
mgr.createServer = func(string, postAllocateFunc, preStartContainerFunc, getPreferredAllocationFunc) devicePluginServer {
return &serverStub{}
}
mgr.Run()

View File

@ -51,27 +51,30 @@ type devicePluginServer interface {
// server implements devicePluginServer and pluginapi.PluginInterfaceServer interfaces.
type server struct {
devType string
grpcServer *grpc.Server
updatesCh chan map[string]DeviceInfo
devices map[string]DeviceInfo
postAllocate func(*pluginapi.AllocateResponse) error
preStartContainer func(*pluginapi.PreStartContainerRequest) error
state serverState
stateMutex sync.Mutex
devType string
grpcServer *grpc.Server
updatesCh chan map[string]DeviceInfo
devices map[string]DeviceInfo
postAllocate postAllocateFunc
preStartContainer preStartContainerFunc
getPreferredAllocation getPreferredAllocationFunc
state serverState
stateMutex sync.Mutex
}
// newServer creates a new server satisfying the devicePluginServer interface.
func newServer(devType string,
postAllocate func(*pluginapi.AllocateResponse) error,
preStartContainer func(*pluginapi.PreStartContainerRequest) error) devicePluginServer {
postAllocate postAllocateFunc,
preStartContainer preStartContainerFunc,
getPreferredAllocation getPreferredAllocationFunc) devicePluginServer {
return &server{
devType: devType,
updatesCh: make(chan map[string]DeviceInfo, 1), // TODO: is 1 needed?
devices: make(map[string]DeviceInfo),
postAllocate: postAllocate,
preStartContainer: preStartContainer,
state: uninitialized,
devType: devType,
updatesCh: make(chan map[string]DeviceInfo, 1), // TODO: is 1 needed?
devices: make(map[string]DeviceInfo),
postAllocate: postAllocate,
preStartContainer: preStartContainer,
getPreferredAllocation: getPreferredAllocation,
state: uninitialized,
}
}
@ -162,6 +165,9 @@ func (srv *server) PreStartContainer(ctx context.Context, rqt *pluginapi.PreStar
}
func (srv *server) GetPreferredAllocation(ctx context.Context, rqt *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
if srv.getPreferredAllocation != nil {
return srv.getPreferredAllocation(rqt)
}
return nil, errors.New("GetPreferredAllocation should not be called as this device plugin doesn't implement it")
}

View File

@ -530,7 +530,7 @@ func TestGetPreferredAllocation(t *testing.T) {
}
func TestNewServer(t *testing.T) {
_ = newServer("test", nil, nil)
_ = newServer("test", nil, nil, nil)
}
func TestUpdate(t *testing.T) {