mark apps that exceed the cluster capacity with a custom pod condition (#35)

* mark apps that exceed the cluster capacity with a custom pod condition

* address comments

* move params

* less logging duplication
onursatici authored Jun 5, 2019
1 parent af85a36 commit 075e989
Showing 5 changed files with 176 additions and 7 deletions.
13 changes: 12 additions & 1 deletion cmd/server.go
@@ -126,6 +126,8 @@ func initServer(ctx context.Context, info witchcraft.InitInfo) (func(), error) {
nodeInformer.Lister(),
)

binpacker := extender.SelectBinpacker(install.BinpackAlgo)

sparkSchedulerExtender := extender.NewExtender(
nodeInformer.Lister(),
extender.NewSparkPodLister(podInformer.Lister()),
@@ -135,7 +137,7 @@ func initServer(ctx context.Context, info witchcraft.InitInfo) (func(), error) {
sparkSchedulerClient.ScalerV1alpha1(),
apiExtensionsClient,
install.FIFO,
install.BinpackAlgo,
binpacker,
overheadComputer,
)

@@ -146,10 +148,19 @@ func initServer(ctx context.Context, info witchcraft.InitInfo) (func(), error) {

queueReporter := metrics.NewQueueReporter(podInformer.Lister())

unschedulablePodMarker := extender.NewUnschedulablePodMarker(
nodeInformer.Lister(),
podInformer.Lister(),
kubeClient.CoreV1(),
overheadComputer,
binpacker,
)

sparkSchedulerExtender.Start(ctx)
go resourceReporter.StartReportingResourceUsage(ctx)
go queueReporter.StartReportingQueues(ctx)
go overheadComputer.Start(ctx)
go unschedulablePodMarker.Start(ctx)

if err := registerExtenderEndpoints(info.Router, sparkSchedulerExtender); err != nil {
return nil, err
8 changes: 5 additions & 3 deletions internal/extender/binpack.go
@@ -23,17 +23,19 @@ const (
tightlyPack string = "tightly-pack"
)

type binpacker struct {
// Binpacker is a BinpackFunc with a known name
type Binpacker struct {
Name string
BinpackFunc binpack.SparkBinPackFunction
}

var binpackFunctions = map[string]*binpacker{
var binpackFunctions = map[string]*Binpacker{
tightlyPack: {tightlyPack, binpack.TightlyPack},
distributeEvenly: {distributeEvenly, binpack.DistributeEvenly},
}

func selectBinpacker(name string) *binpacker {
// SelectBinpacker selects the binpack function from the given name
func SelectBinpacker(name string) *Binpacker {
binpacker, ok := binpackFunctions[name]
if !ok {
return binpackFunctions[distributeEvenly]
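
For context, a minimal sketch of how the newly exported SelectBinpacker behaves, written as if it were a scratch function in the same extender package (the function itself is hypothetical and not part of this commit; the fallback to distribute-evenly is exactly what the lookup above does):

package extender

import "fmt"

// exampleSelectBinpacker illustrates the exported lookup: a known algorithm name
// returns the matching Binpacker, while unknown names fall back to the
// distribute-evenly packer, per SelectBinpacker above.
func exampleSelectBinpacker() {
	tight := SelectBinpacker(tightlyPack)       // "tightly-pack"
	fallback := SelectBinpacker("no-such-algo") // falls back to distribute-evenly
	fmt.Println(tight.Name, fallback.Name)
}

In cmd/server.go above, the selected *Binpacker is built once from install.BinpackAlgo and shared by both the scheduler extender and the new unschedulable pod marker.
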
6 changes: 3 additions & 3 deletions internal/extender/resource.go
@@ -68,7 +68,7 @@ type SparkSchedulerExtender struct {
demandCRDInitialized atomic.Bool

isFIFO bool
binpacker *binpacker
binpacker *Binpacker
overheadComputer *OverheadComputer
}

@@ -82,7 +82,7 @@ func NewExtender(
demandClient demandclient.ScalerV1alpha1Interface,
apiExtensionsClient apiextensionsclientset.Interface,
isFIFO bool,
binpackAlgo string,
binpacker *Binpacker,
overheadComputer *OverheadComputer) *SparkSchedulerExtender {
return &SparkSchedulerExtender{
nodeLister: nodeLister,
@@ -93,7 +93,7 @@ func NewExtender(
demandClient: demandClient,
apiExtensionsClient: apiExtensionsClient,
isFIFO: isFIFO,
binpacker: selectBinpacker(binpackAlgo),
binpacker: binpacker,
overheadComputer: overheadComputer,
}
}
2 changes: 2 additions & 0 deletions internal/extender/sparkpods.go
@@ -27,6 +27,8 @@ import (
)

const (
// SparkSchedulerName is the name of the kube-scheduler instance that talks with the extender
SparkSchedulerName = "spark-scheduler"
// SparkRoleLabel represents the label key for the spark-role of a pod
SparkRoleLabel = "spark-role"
// SparkAppIDLabel represents the label key for the spark application ID on a pod
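
To make the new SparkSchedulerName constant concrete: the marker added in unschedulablepods.go below only considers pods that name spark-scheduler as their scheduler, carry the spark-role driver label, have no node assigned, and have been pending for more than ten minutes. A hypothetical driver pod of that shape (the names and the "driver" label value are assumptions for illustration):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A pod shaped like the ones scanForUnschedulablePods scans for.
	driver := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "example-spark-driver", // example name
			Namespace: "default",
			Labels: map[string]string{
				"spark-role": "driver", // SparkRoleLabel; the "driver" value is assumed
			},
		},
		Spec: v1.PodSpec{
			SchedulerName: "spark-scheduler", // SparkSchedulerName
		},
	}
	fmt.Println(driver.Spec.SchedulerName)
}
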
154 changes: 154 additions & 0 deletions internal/extender/unschedulablepods.go
@@ -0,0 +1,154 @@
// Copyright (c) 2019 Palantir Technologies. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package extender

import (
	"context"
	"time"

	"github.com/palantir/k8s-spark-scheduler-lib/pkg/resources"
	"github.com/palantir/witchcraft-go-logging/wlog/svclog/svc1log"
	"github.com/palantir/witchcraft-go-logging/wlog/wapp"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	corelisters "k8s.io/client-go/listers/core/v1"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

const (
	podExceedsClusterCapacity       v1.PodConditionType = "PodExceedsClusterCapacity"
	unschedulablePollingInterval    time.Duration       = time.Minute
	unschedulableInClusterThreshold time.Duration       = 10 * time.Minute
)

// UnschedulablePodMarker checks for spark scheduler managed pending driver pods
// and checks if they can fit if the cluster was empty, else marks them with a
// custom pod condition.
type UnschedulablePodMarker struct {
	nodeLister       corelisters.NodeLister
	podLister        corelisters.PodLister
	coreClient       corev1.CoreV1Interface
	overheadComputer *OverheadComputer
	binpacker        *Binpacker
}

// NewUnschedulablePodMarker creates a new UnschedulablePodMarker
func NewUnschedulablePodMarker(
	nodeLister corelisters.NodeLister,
	podLister corelisters.PodLister,
	coreClient corev1.CoreV1Interface,
	overheadComputer *OverheadComputer,
	binpacker *Binpacker) *UnschedulablePodMarker {
	return &UnschedulablePodMarker{
		nodeLister:       nodeLister,
		podLister:        podLister,
		coreClient:       coreClient,
		overheadComputer: overheadComputer,
		binpacker:        binpacker,
	}
}

// Start starts periodic scanning for unschedulable applications
func (u *UnschedulablePodMarker) Start(ctx context.Context) {
	_ = wapp.RunWithFatalLogging(ctx, u.doStart)
}

func (u *UnschedulablePodMarker) doStart(ctx context.Context) error {
	t := time.NewTicker(unschedulablePollingInterval)
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-t.C:
			u.scanForUnschedulablePods(ctx)
		}
	}
}

func (u *UnschedulablePodMarker) scanForUnschedulablePods(ctx context.Context) {
	pods, err := u.podLister.List(labels.Everything())
	if err != nil {
		svc1log.FromContext(ctx).Error("failed to list pods", svc1log.Stacktrace(err))
		return
	}
	now := time.Now()
	for _, pod := range pods {
		if pod.Spec.SchedulerName == SparkSchedulerName &&
			len(pod.Spec.NodeName) == 0 &&
			pod.DeletionTimestamp == nil &&
			pod.Labels[SparkRoleLabel] == Driver &&
			pod.CreationTimestamp.Time.Add(unschedulableInClusterThreshold).Before(now) {

			ctx = svc1log.WithLoggerParams(
				ctx,
				svc1log.SafeParam("podName", pod.Name),
				svc1log.SafeParam("podNamespace", pod.Namespace))

			exceedsCapacity, err := u.doesPodExceedClusterCapacity(ctx, pod)
			if err != nil {
				svc1log.FromContext(ctx).Error("failed to check if pod was unschedulable",
					svc1log.Stacktrace(err))
				return
			}

			err = u.markPodClusterCapacityStatus(ctx, pod, exceedsCapacity)
			if err != nil {
				svc1log.FromContext(ctx).Error("failed to mark pod cluster capacity status",
					svc1log.Stacktrace(err))
			}
		}
	}
}

func (u *UnschedulablePodMarker) doesPodExceedClusterCapacity(ctx context.Context, driver *v1.Pod) (bool, error) {
	nodes, err := u.nodeLister.List(labels.Set(driver.Spec.NodeSelector).AsSelector())
	if err != nil {
		return false, err
	}
	nodeNames := make([]string, 0, len(nodes))
	for _, node := range nodes {
		nodeNames = append(nodeNames, node.Name)
	}
	availableResources := resources.AvailableForNodes(nodes, u.overheadComputer.GetOverhead(ctx, nodes))
	applicationResources, err := sparkResources(ctx, driver)
	if err != nil {
		return false, err
	}
	_, _, hasCapacity := u.binpacker.BinpackFunc(
		ctx,
		applicationResources.driverResources,
		applicationResources.executorResources,
		applicationResources.executorCount,
		nodeNames,
		nodeNames,
		availableResources)

	return !hasCapacity, nil
}

func (u *UnschedulablePodMarker) markPodClusterCapacityStatus(ctx context.Context, driver *v1.Pod, exceedsCapacity bool) error {
	exceedsCapacityStatus := v1.ConditionFalse
	if exceedsCapacity {
		exceedsCapacityStatus = v1.ConditionTrue
	}

	if !podutil.UpdatePodCondition(&driver.Status, &v1.PodCondition{Type: podExceedsClusterCapacity, Status: exceedsCapacityStatus}) {
		return nil
	}

	_, err := u.coreClient.Pods(driver.Namespace).UpdateStatus(driver)
	return err
}
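
Once the marker has updated a driver pod's status, downstream tooling can read the condition straight off the pod. A minimal sketch (the helper below is hypothetical and not part of this commit):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// exceedsClusterCapacity reports whether the marker has flagged this driver pod
// with the PodExceedsClusterCapacity condition set to True.
func exceedsClusterCapacity(pod *v1.Pod) bool {
	for _, cond := range pod.Status.Conditions {
		if cond.Type == "PodExceedsClusterCapacity" && cond.Status == v1.ConditionTrue {
			return true
		}
	}
	return false
}

func main() {
	// A pod status as it would look after markPodClusterCapacityStatus ran.
	pod := &v1.Pod{
		Status: v1.PodStatus{
			Conditions: []v1.PodCondition{
				{Type: "PodExceedsClusterCapacity", Status: v1.ConditionTrue},
			},
		},
	}
	fmt.Println(exceedsClusterCapacity(pod)) // true
}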
