Merge pull request #535 from uniemimu/preferred

api: add PreferredAllocator interface
This commit is contained in:
Dmitry Rozhkov 2020-12-29 17:39:45 +02:00 committed by GitHub
commit 231df24a2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 44 additions and 23 deletions

View File

@ -96,6 +96,12 @@ type PostAllocator interface {
PostAllocate(*pluginapi.AllocateResponse) error PostAllocate(*pluginapi.AllocateResponse) error
} }
// PreferredAllocator is an optional interface implemented by device plugins.
type PreferredAllocator interface {
// GetPreferredAllocation defines the list of devices preferred for allocating next.
GetPreferredAllocation(*pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error)
}
// ContainerPreStarter is an optional interface implemented by device plugins. // ContainerPreStarter is an optional interface implemented by device plugins.
type ContainerPreStarter interface { type ContainerPreStarter interface {
// PreStartContainer defines device initialization function before container is started. // PreStartContainer defines device initialization function before container is started.

View File

@ -22,6 +22,10 @@ import (
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
) )
type postAllocateFunc func(*pluginapi.AllocateResponse) error
type preStartContainerFunc func(*pluginapi.PreStartContainerRequest) error
type getPreferredAllocationFunc func(*pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error)
// updateInfo contains info for added, updated and deleted devices. // updateInfo contains info for added, updated and deleted devices.
type updateInfo struct { type updateInfo struct {
Added DeviceTree Added DeviceTree
@ -73,7 +77,7 @@ type Manager struct {
devicePlugin Scanner devicePlugin Scanner
namespace string namespace string
servers map[string]devicePluginServer servers map[string]devicePluginServer
createServer func(string, func(*pluginapi.AllocateResponse) error, func(*pluginapi.PreStartContainerRequest) error) devicePluginServer createServer func(string, postAllocateFunc, preStartContainerFunc, getPreferredAllocationFunc) devicePluginServer
} }
// NewManager creates a new instance of Manager. // NewManager creates a new instance of Manager.
@ -107,8 +111,9 @@ func (m *Manager) Run() {
func (m *Manager) handleUpdate(update updateInfo) { func (m *Manager) handleUpdate(update updateInfo) {
klog.V(4).Info("Received dev updates:", update) klog.V(4).Info("Received dev updates:", update)
for devType, devices := range update.Added { for devType, devices := range update.Added {
var postAllocate func(*pluginapi.AllocateResponse) error var postAllocate postAllocateFunc
var preStartContainer func(*pluginapi.PreStartContainerRequest) error var preStartContainer preStartContainerFunc
var getPreferredAllocation getPreferredAllocationFunc
if postAllocator, ok := m.devicePlugin.(PostAllocator); ok { if postAllocator, ok := m.devicePlugin.(PostAllocator); ok {
postAllocate = postAllocator.PostAllocate postAllocate = postAllocator.PostAllocate
@ -118,7 +123,11 @@ func (m *Manager) handleUpdate(update updateInfo) {
preStartContainer = containerPreStarter.PreStartContainer preStartContainer = containerPreStarter.PreStartContainer
} }
m.servers[devType] = m.createServer(devType, postAllocate, preStartContainer) if preferredAllocator, ok := m.devicePlugin.(PreferredAllocator); ok {
getPreferredAllocation = preferredAllocator.GetPreferredAllocation
}
m.servers[devType] = m.createServer(devType, postAllocate, preStartContainer, getPreferredAllocation)
go func(dt string) { go func(dt string) {
err := m.servers[dt].Serve(m.namespace) err := m.servers[dt].Serve(m.namespace)
if err != nil { if err != nil {

View File

@ -265,7 +265,7 @@ func TestHandleUpdate(t *testing.T) {
mgr := Manager{ mgr := Manager{
devicePlugin: &devicePluginStub{}, devicePlugin: &devicePluginStub{},
servers: tt.servers, servers: tt.servers,
createServer: func(string, func(*pluginapi.AllocateResponse) error, func(*pluginapi.PreStartContainerRequest) error) devicePluginServer { createServer: func(string, postAllocateFunc, preStartContainerFunc, getPreferredAllocationFunc) devicePluginServer {
return &serverStub{} return &serverStub{}
}, },
} }
@ -279,7 +279,7 @@ func TestHandleUpdate(t *testing.T) {
func TestRun(t *testing.T) { func TestRun(t *testing.T) {
mgr := NewManager("testnamespace", &devicePluginStub{}) mgr := NewManager("testnamespace", &devicePluginStub{})
mgr.createServer = func(string, func(*pluginapi.AllocateResponse) error, func(*pluginapi.PreStartContainerRequest) error) devicePluginServer { mgr.createServer = func(string, postAllocateFunc, preStartContainerFunc, getPreferredAllocationFunc) devicePluginServer {
return &serverStub{} return &serverStub{}
} }
mgr.Run() mgr.Run()

View File

@ -51,27 +51,30 @@ type devicePluginServer interface {
// server implements devicePluginServer and pluginapi.PluginInterfaceServer interfaces. // server implements devicePluginServer and pluginapi.PluginInterfaceServer interfaces.
type server struct { type server struct {
devType string devType string
grpcServer *grpc.Server grpcServer *grpc.Server
updatesCh chan map[string]DeviceInfo updatesCh chan map[string]DeviceInfo
devices map[string]DeviceInfo devices map[string]DeviceInfo
postAllocate func(*pluginapi.AllocateResponse) error postAllocate postAllocateFunc
preStartContainer func(*pluginapi.PreStartContainerRequest) error preStartContainer preStartContainerFunc
state serverState getPreferredAllocation getPreferredAllocationFunc
stateMutex sync.Mutex state serverState
stateMutex sync.Mutex
} }
// newServer creates a new server satisfying the devicePluginServer interface. // newServer creates a new server satisfying the devicePluginServer interface.
func newServer(devType string, func newServer(devType string,
postAllocate func(*pluginapi.AllocateResponse) error, postAllocate postAllocateFunc,
preStartContainer func(*pluginapi.PreStartContainerRequest) error) devicePluginServer { preStartContainer preStartContainerFunc,
getPreferredAllocation getPreferredAllocationFunc) devicePluginServer {
return &server{ return &server{
devType: devType, devType: devType,
updatesCh: make(chan map[string]DeviceInfo, 1), // TODO: is 1 needed? updatesCh: make(chan map[string]DeviceInfo, 1), // TODO: is 1 needed?
devices: make(map[string]DeviceInfo), devices: make(map[string]DeviceInfo),
postAllocate: postAllocate, postAllocate: postAllocate,
preStartContainer: preStartContainer, preStartContainer: preStartContainer,
state: uninitialized, getPreferredAllocation: getPreferredAllocation,
state: uninitialized,
} }
} }
@ -162,6 +165,9 @@ func (srv *server) PreStartContainer(ctx context.Context, rqt *pluginapi.PreStar
} }
func (srv *server) GetPreferredAllocation(ctx context.Context, rqt *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) { func (srv *server) GetPreferredAllocation(ctx context.Context, rqt *pluginapi.PreferredAllocationRequest) (*pluginapi.PreferredAllocationResponse, error) {
if srv.getPreferredAllocation != nil {
return srv.getPreferredAllocation(rqt)
}
return nil, errors.New("GetPreferredAllocation should not be called as this device plugin doesn't implement it") return nil, errors.New("GetPreferredAllocation should not be called as this device plugin doesn't implement it")
} }

View File

@ -530,7 +530,7 @@ func TestGetPreferredAllocation(t *testing.T) {
} }
func TestNewServer(t *testing.T) { func TestNewServer(t *testing.T) {
_ = newServer("test", nil, nil) _ = newServer("test", nil, nil, nil)
} }
func TestUpdate(t *testing.T) { func TestUpdate(t *testing.T) {