containerized-data-importer/pkg/controller/controller.go


package controller

import (
    "fmt"
    "time"

    "github.com/golang/glog"
    "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
)

const (
    // pvc annotations
    annEndpoint = "kubevirt.io/storage.import.endpoint"
    annSecret   = "kubevirt.io/storage.import.secretName"
    annStatus   = "kubevirt.io/storage.import.status"
    // importer pod annotations
    annCreatedBy = "kubevirt.io/storage.createdByController"
    // pvc statuses
    pvcStatusInProcess = "In-process"
    pvcStatusSuccess   = "Success"
    pvcStatusFailed    = "Failed"
)
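
// Controller watches PersistentVolumeClaims and creates an importer pod for
// each pvc that requests a data import via the annEndpoint annotation.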
type Controller struct {
    clientset        kubernetes.Interface
    queue            workqueue.RateLimitingInterface
    pvcInformer      cache.SharedIndexInformer
    pvcListWatcher   cache.ListerWatcher
    importerImageTag string
}
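
// NewController returns a Controller wired to the given client, work queue,
// pvc informer, pvc list-watcher, and importer image tag.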
func NewController(
    client kubernetes.Interface,
    queue workqueue.RateLimitingInterface,
    pvcInformer cache.SharedIndexInformer,
    pvcListWatcher cache.ListerWatcher,
    importerTag string,
) *Controller {
    return &Controller{
        clientset:        client,
        queue:            queue,
        pvcInformer:      pvcInformer,
        pvcListWatcher:   pvcListWatcher,
        importerImageTag: importerTag,
    }
}
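
// Run starts the pvc informer, waits for its cache to sync, and then launches
// threadiness worker goroutines. Run blocks until stopCh is closed.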
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
    defer c.queue.ShutDown()
    glog.Infoln("Starting CDI controller loop")
    if threadiness < 1 {
        return fmt.Errorf("controller.Run: expected >0 threads, got %d", threadiness)
    }
    go c.pvcInformer.Run(stopCh)
    if !cache.WaitForCacheSync(stopCh, c.pvcInformer.HasSynced) {
        return fmt.Errorf("controller.Run: timeout waiting for cache sync")
    }
    glog.Infoln("Controller cache has synced")
    for i := 0; i < threadiness; i++ {
        go wait.Until(c.runWorkers, time.Second, stopCh)
    }
    <-stopCh
    return nil
}
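
// runWorkers drains the work queue until it is shut down.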
func (c *Controller) runWorkers() {
    for c.processNextItem() {
    }
}

// processNextItem selects pvcs that carry the annEndpoint annotation and do
// not yet carry the annStatus annotation, and hands them to processItem.
func (c *Controller) processNextItem() bool {
    key, shutdown := c.queue.Get()
    if shutdown {
        return false
    }
    defer c.queue.Done(key)
    pvc, err := c.pvcFromKey(key)
    if err != nil {
        glog.Errorf("processNextItem: error converting key to pvc: %v", err)
        c.queue.Forget(key)
        return true
    }
    if pvc == nil {
        c.queue.Forget(key)
        return true
    }
    glog.Infof("processNextItem: next pvc to process: %s\n", key)
    if !metav1.HasAnnotation(pvc.ObjectMeta, annEndpoint) {
        glog.Infof("processNextItem: annotation %q not found, skipping pvc\n", annEndpoint)
        c.queue.Forget(key)
        return true
    }
    if metav1.HasAnnotation(pvc.ObjectMeta, annStatus) {
        glog.Infof("processNextItem: annotation %q present, skipping pvc\n", annStatus)
        c.queue.Forget(key)
        return true
    }
    if err := c.processItem(pvc); err != nil {
        glog.Errorf("processNextItem: error processing key %q: %v", key, err)
        c.queue.Forget(key)
    }
    return true
}

// processItem creates the importer pod and, if needed, its secret. A watch is
// placed on the importer pod so the pvc can be annotated when the pod terminates.
func (c *Controller) processItem(pvc *v1.PersistentVolumeClaim) error {
    ep, err := getEndpoint(pvc)
    if err != nil {
        return fmt.Errorf("processItem: %v", err)
    }
    secretName, err := c.getSecretName(pvc)
    if err != nil {
        return fmt.Errorf("processItem: %v", err)
    }
    if secretName == "" {
        glog.Infof("processItem: no secret will be supplied to endpoint %q\n", ep)
    }
    // Mark the pvc as in-process before launching the importer pod.
    locPVC, err := c.setPVCStatus(pvc, pvcStatusInProcess)
    if err != nil {
        return fmt.Errorf("processItem: %v", err)
    }
    _, err = c.createImporterPod(ep, secretName, locPVC) // TODO: may need importer pod later
    if err != nil {
        return fmt.Errorf("processItem: %v", err)
    }
    return nil
}
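
// The wiring below is a rough usage sketch, not code from this package: it
// assumes the caller already has a kubernetes.Interface named client, a stop
// channel stopCh, and placeholder values resyncPeriod and importerTag, and it
// additionally imports k8s.io/apimachinery/pkg/fields. The queue, list-watcher,
// informer, and event handler are built with standard client-go calls.
//
//    lw := cache.NewListWatchFromClient(
//        client.CoreV1().RESTClient(), "persistentvolumeclaims", v1.NamespaceAll, fields.Everything())
//    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
//    informer := cache.NewSharedIndexInformer(lw, &v1.PersistentVolumeClaim{}, resyncPeriod, cache.Indexers{})
//    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
//        AddFunc: func(obj interface{}) {
//            if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
//                queue.AddRateLimited(key)
//            }
//        },
//    })
//    controller := NewController(client, queue, informer, lw, importerTag)
//    if err := controller.Run(2, stopCh); err != nil {
//        glog.Fatalf("controller exited: %v", err)
//    }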