Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: shared informer priority queue improvements #1218

Merged
merged 5 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 11 additions & 14 deletions scrapers/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/flanksource/config-db/scrapers/kubernetes"
"github.com/flanksource/config-db/utils/kube"
"github.com/flanksource/duty/job"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

Expand Down Expand Up @@ -48,8 +47,9 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q
config.BaseScraper = config.BaseScraper.ApplyPlugins(plugins...)

var (
objs []*unstructured.Unstructured
queuedTime = map[string]time.Time{}
objs []*unstructured.Unstructured
deletedObjects []string
queuedTime = map[string]time.Time{}

seenObjects = map[string]struct{}{}
objectsFromEvents = map[string]v1.InvolvedObject{}
Expand All @@ -73,6 +73,11 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q
}
obj := queueItem.Obj

if queueItem.Operation == kubernetes.QueueItemOperationDelete {
deletedObjects = append(deletedObjects, string(obj.GetUID()))
continue
}

if obj.GetKind() == "Event" {
involvedObjectRaw, ok, _ := unstructured.NestedMap(obj.Object, "involvedObject")
if !ok {
Expand Down Expand Up @@ -124,7 +129,7 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q
// a way that no two objects in a batch have the same id.

objs = dedup(objs)
if err := consumeResources(ctx, *sc.ScrapeConfig(), *config, objs); err != nil {
if err := consumeResources(ctx, *sc.ScrapeConfig(), *config, objs, deletedObjects); err != nil {
ctx.History.AddErrorf("failed to consume resources: %v", err)
return err
}
Expand All @@ -140,7 +145,7 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q
}
}

func consumeResources(ctx job.JobRuntime, scrapeConfig v1.ScrapeConfig, config v1.Kubernetes, objs []*unstructured.Unstructured) error {
func consumeResources(ctx job.JobRuntime, scrapeConfig v1.ScrapeConfig, config v1.Kubernetes, objs []*unstructured.Unstructured, deletedResourcesIDs []string) error {
cc := api.NewScrapeContext(ctx.Context).WithScrapeConfig(&scrapeConfig).WithJobHistory(ctx.History).AsIncrementalScrape()
cc.Context = cc.Context.WithoutName().WithName(fmt.Sprintf("watch[%s/%s]", cc.GetNamespace(), cc.GetName()))
results, err := processObjects(cc, config, objs)
Expand All @@ -162,15 +167,7 @@ func consumeResources(ctx job.JobRuntime, scrapeConfig v1.ScrapeConfig, config v
}
}

_deleteCh, ok := kubernetes.DeleteResourceBuffer.Load(config.Hash())
if !ok {
return fmt.Errorf("no resource watcher channel found for config (scrapeconfig: %s)", config.Hash())
}
deleteChan := _deleteCh.(chan string)

if len(deleteChan) > 0 {
deletedResourcesIDs, _, _, _ := lo.Buffer(deleteChan, len(deleteChan))

if len(deletedResourcesIDs) > 0 {
total, err := db.SoftDeleteConfigItems(ctx.Context, deletedResourcesIDs...)
if err != nil {
return fmt.Errorf("failed to delete %d resources: %w", len(deletedResourcesIDs), err)
Expand Down
104 changes: 88 additions & 16 deletions scrapers/kubernetes/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/flanksource/is-healthy/pkg/health"
"github.com/google/uuid"
"github.com/samber/lo"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/informers"
Expand All @@ -29,10 +30,6 @@ var (
// WatchQueue stores a sync buffer per kubernetes config
WatchQueue = sync.Map{}

// DeleteResourceBuffer stores a buffer per kubernetes config
// that contains the ids of resources that have been deleted.
DeleteResourceBuffer = sync.Map{}

informerLagBuckets = []float64{1_000, 5_000, 30_000, 120_000, 300_000, 600_000, 900_000, 1_800_000}
)

Expand All @@ -43,9 +40,6 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) (*pq.Queue, err
priorityQueue = loaded.(*pq.Queue)
}

deleteBuffer := make(chan string, BufferSize)
DeleteResourceBuffer.Store(config.Hash(), deleteBuffer)

if config.Kubeconfig != nil {
var err error
c, err := ctx.WithKubeconfig(*config.Kubeconfig)
Expand All @@ -56,7 +50,7 @@ func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) (*pq.Queue, err
}

for _, watchResource := range lo.Uniq(config.Watch) {
if err := globalSharedInformerManager.Register(ctx, watchResource, priorityQueue, deleteBuffer); err != nil {
if err := globalSharedInformerManager.Register(ctx, watchResource, priorityQueue); err != nil {
return nil, fmt.Errorf("failed to register informer: %w", err)
}
}
Expand Down Expand Up @@ -92,7 +86,7 @@ type SharedInformerManager struct {

type DeleteObjHandler func(ctx context.Context, id string) error

func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1.KubernetesResourceToWatch, queue *pq.Queue, deleteBuffer chan<- string) error {
func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1.KubernetesResourceToWatch, queue *pq.Queue) error {
start := time.Now()

apiVersion, kind := watchResource.ApiVersion, watchResource.Kind
Expand Down Expand Up @@ -123,7 +117,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1
return
}

queue.Enqueue(NewQueueItem(u))
queue.Enqueue(NewQueueItem(u, QueueItemOperationAdd))

if ctx.Properties().On(false, "scraper.log.items") {
ctx.Logger.V(4).Infof("added: %s %s %s", u.GetUID(), u.GetKind(), u.GetName())
Expand Down Expand Up @@ -166,7 +160,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1
).Record(time.Duration(time.Since(*lastUpdatedTime).Milliseconds()))
}

queue.Enqueue(NewQueueItem(u))
queue.Enqueue(NewQueueItem(u, QueueItemOperationUpdate))
},
DeleteFunc: func(obj any) {
u, err := getUnstructuredFromInformedObj(watchResource, obj)
Expand Down Expand Up @@ -197,7 +191,7 @@ func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1
).Record(time.Duration(time.Since(u.GetDeletionTimestamp().Time).Milliseconds()))
}

deleteBuffer <- string(u.GetUID())
queue.Enqueue(NewQueueItem(u, QueueItemOperationDelete))
},
})
if err != nil {
Expand Down Expand Up @@ -329,28 +323,106 @@ func kubeConfigIdentifier(ctx api.ScrapeContext) string {
return rs.Host
}

type QueueItemOperation int

const (
QueueItemOperationAdd QueueItemOperation = iota + 1
QueueItemOperationUpdate
QueueItemOperationDelete
)

func (t *QueueItemOperation) Priority() int {
// smaller value represents higher priority
priority := map[QueueItemOperation]int{
QueueItemOperationAdd: 1,
QueueItemOperationUpdate: 2,
QueueItemOperationDelete: 3,
}

return priority[*t]
}

type QueueItem struct {
Timestamp time.Time // Queued time
Obj *unstructured.Unstructured
Operation QueueItemOperation
}

func NewQueueItem(obj *unstructured.Unstructured) *QueueItem {
func NewQueueItem(obj *unstructured.Unstructured, operation QueueItemOperation) *QueueItem {
return &QueueItem{
Timestamp: time.Now(),
Obj: obj,
Operation: operation,
}
}

func pqComparator(a, b any) int {
var aTimestamp, bTimestamp time.Time
qa := a.(*QueueItem)
qb := b.(*QueueItem)

if qa.Obj.GetCreationTimestamp().Time.Before(qb.Obj.GetCreationTimestamp().Time) {
if opResult := pqCompareOperation(qa.Operation, qb.Operation); opResult != 0 {
return opResult
}

if opResult := pqCompareOwnerRef(qa.Obj.GetOwnerReferences(), qb.Obj.GetOwnerReferences()); opResult != 0 {
return opResult
}

if opResult := pqCompareKind(qa.Obj.GetKind(), qb.Obj.GetKind()); opResult != 0 {
return opResult
}

lastUpdatedTimeA := *health.GetLastUpdatedTime(qa.Obj)
lastUpdatedTimeB := *health.GetLastUpdatedTime(qb.Obj)

if lastUpdatedTimeA.Before(lastUpdatedTimeB) {
return -1
} else if aTimestamp.Equal(bTimestamp) {
} else if lastUpdatedTimeA.Equal(lastUpdatedTimeB) {
return 0
} else {
return 1
}
}

func pqCompareOperation(a, b QueueItemOperation) int {
return a.Priority() - b.Priority()
}

func pqCompareOwnerRef(a, b []metav1.OwnerReference) int {
if len(a) == len(b) {
return 0
}

return len(b) - len(a)
}

func pqCompareKind(a, b string) int {
// smaller means earlier in the queue
priority := map[string]int{
"Namespace": 1,
"Deployment": 2,
"StatefulSet": 2,
"DaemonSet": 2,
"Service": 2,
"ClusterRole": 2,
"Role": 2,
"HelmChart": 2,
"HelmRepository": 2,
"OCIRepository": 2,
"ClusterRoleBinding": 3,
"RoleBinding": 3,
"Endpoints": 3,
"CronJob": 3,
"Job": 3,
"ReplicaSet": 3,
"Pod": 4,
"Event": 5,
}

const unknownKindPriority = 3 // set medium priority for unknown kinds

pa := lo.CoalesceOrEmpty(priority[a], unknownKindPriority)
pb := lo.CoalesceOrEmpty(priority[b], unknownKindPriority)

return pa - pb
}
Loading
Loading