Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
api "github.com/cobaltcore-dev/cortex/api/external/nova"
"github.com/cobaltcore-dev/cortex/api/v1alpha1"
"github.com/cobaltcore-dev/cortex/internal/scheduling/lib"
resv "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations"
hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
"k8s.io/apimachinery/pkg/api/resource"
)
Expand Down Expand Up @@ -198,69 +199,17 @@ func (s *FilterHasEnoughCapacity) Run(traceLog *slog.Logger, request api.Externa
continue
}

// For CR reservations with allocations, compute the effective block:
// CommittedResourceReservations: compute the effective block:
// confirmed = sum of resources for VMs present in both Spec and Status allocations
// specOnly = sum of resources for VMs present in Spec but not yet in Status
// remaining = max(0, Spec.Resources - confirmed) [clamped: never negative]
// block = max(remaining, specOnly) [spec-only VM must be fully covered]
//
// Clamping: if confirmed VMs exceed slot size (e.g. after resize), block = 0.
// Oversize spec-only: if a pending VM is larger than the remaining slot, block its full size.
var resourcesToBlock map[hv1.ResourceName]resource.Quantity
if reservation.Spec.Type == v1alpha1.ReservationTypeCommittedResource &&
// When ignoring allocations (empty-datacenter scenario) VM resources are not
// deducted, so the confirmed-VM adjustment would under-block: always use the
// full slot instead.
!s.Options.IgnoreAllocations &&
// if the reservation is not being migrated, block only unused resources
reservation.Spec.TargetHost == reservation.Status.Host &&
reservation.Spec.CommittedResourceReservation != nil &&
len(reservation.Spec.CommittedResourceReservation.Allocations) > 0 {
confirmedResources := make(map[hv1.ResourceName]resource.Quantity)
specOnlyResources := make(map[hv1.ResourceName]resource.Quantity)

statusAllocs := map[string]string{}
if reservation.Status.CommittedResourceReservation != nil {
statusAllocs = reservation.Status.CommittedResourceReservation.Allocations
}

for instanceUUID, allocation := range reservation.Spec.CommittedResourceReservation.Allocations {
_, isConfirmed := statusAllocs[instanceUUID]
for resourceName, quantity := range allocation.Resources {
if isConfirmed {
existing := confirmedResources[resourceName]
existing.Add(quantity)
confirmedResources[resourceName] = existing
} else {
existing := specOnlyResources[resourceName]
existing.Add(quantity)
specOnlyResources[resourceName] = existing
}
}
}

resourcesToBlock = make(map[hv1.ResourceName]resource.Quantity)
zero := resource.Quantity{}
for resourceName, slotSize := range reservation.Spec.Resources {
confirmed := confirmedResources[resourceName]
specOnly := specOnlyResources[resourceName]

remaining := slotSize.DeepCopy()
remaining.Sub(confirmed)
if remaining.Cmp(zero) < 0 {
remaining = zero.DeepCopy()
}

if specOnly.Cmp(remaining) > 0 {
resourcesToBlock[resourceName] = specOnly.DeepCopy()
} else {
resourcesToBlock[resourceName] = remaining
}
}
} else {
// For other reservation types or CR without allocations, block full resources
resourcesToBlock = reservation.Spec.Resources
}
//
// FailoverReservations: block = Spec.Resources (always fully blocked).
resourcesToBlock := resv.UnusedReservationCapacity(&reservation, s.Options.IgnoreAllocations)

// Block the calculated resources on each host
for host := range hostsToBlock {
Expand Down
57 changes: 52 additions & 5 deletions internal/scheduling/reservations/capacity/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,17 @@ func (c *Controller) reconcileAll(ctx context.Context) error {

azs := availabilityZones(hvList.Items)

// Compute reservation memory blocks once per cycle — shared across all (group × AZ) pairs.
blockedByReservations, err := c.blockedMemoryByHost(ctx)
if err != nil {
logger.Error(err, "failed to compute blocked memory by host, placeable slot counts may be overstated")
blockedByReservations = map[string]int64{}
}

var succeeded, failed int
for groupName, groupData := range flavorGroups {
for _, az := range azs {
if err := c.reconcileOne(ctx, groupName, groupData, az, hvByName, hvList.Items); err != nil {
if err := c.reconcileOne(ctx, groupName, groupData, az, hvByName, hvList.Items, blockedByReservations); err != nil {
logger.Error(err, "failed to reconcile flavor group capacity",
"flavorGroup", groupName, "az", az)
failed++
Expand Down Expand Up @@ -118,6 +125,7 @@ func (c *Controller) reconcileOne(
az string,
hvByName map[string]hv1.Hypervisor,
allHVs []hv1.Hypervisor,
blockedByReservations map[string]int64,
) error {

smallestFlavorBytes := int64(groupData.SmallestFlavor.MemoryMB) * 1024 * 1024 //nolint:gosec
Expand Down Expand Up @@ -162,8 +170,8 @@ func (c *Controller) reconcileOne(
cur := existingByName[flavor.Name]
cur.FlavorName = flavor.Name

totalVMSlots, totalHosts, totalErr := c.probeScheduler(ctx, flavor, az, c.config.TotalPipeline, hvByName, true)
placeableVMs, placeableHosts, placeableErr := c.probeScheduler(ctx, flavor, az, c.config.PlaceablePipeline, hvByName, false)
totalVMSlots, totalHosts, totalErr := c.probeScheduler(ctx, flavor, az, c.config.TotalPipeline, hvByName, true, nil)
placeableVMs, placeableHosts, placeableErr := c.probeScheduler(ctx, flavor, az, c.config.PlaceablePipeline, hvByName, false, blockedByReservations)

if totalErr != nil {
allFresh = false
Expand Down Expand Up @@ -258,14 +266,15 @@ func (c *Controller) reconcileOne(
// probeScheduler calls the scheduler with the given pipeline and returns VM slots + host count.
// Capacity is computed as sum of floor(hostMemory / flavorMemory) across returned hosts.
// When ignoreAllocations is true (total/empty-datacenter probe), raw effective capacity is used.
// When false (placeable probe), hv.Status.Allocation is subtracted first so that slots reflect
// remaining capacity after running VMs.
// When false (placeable probe), hv.Status.Allocation and blockedByReservations are subtracted so
// that slots reflect remaining capacity after running VMs and active reservation blocks.
func (c *Controller) probeScheduler(
ctx context.Context,
flavor compute.FlavorInGroup,
az, pipeline string,
hvByName map[string]hv1.Hypervisor,
ignoreAllocations bool,
blockedByReservations map[string]int64,
) (capacity, hosts int64, err error) {

flavorBytes := int64(flavor.MemoryMB) * 1024 * 1024 //nolint:gosec
Expand Down Expand Up @@ -318,6 +327,7 @@ func (c *Controller) probeScheduler(
if alloc, ok := hv.Status.Allocation[hv1.ResourceMemory]; ok {
capBytes -= alloc.Value()
}
capBytes -= blockedByReservations[hostName]
if capBytes < 0 {
capBytes = 0
}
Expand All @@ -329,6 +339,43 @@ func (c *Controller) probeScheduler(
return capacity, hosts, nil
}

// blockedMemoryByHost lists every Reservation and sums, per host name, the memory
// (in bytes) that the reservation blocks according to UnusedReservationCapacity
// (called with ignoreAllocations=false).
//
// A reservation contributes to Spec.TargetHost and to Status.Host when they are
// non-empty; if both name the same host it is counted once, and if they differ
// (migration in progress) both hosts are charged. Reservations with neither host
// set, or whose blocked resources carry no memory entry, are skipped.
//
// Returns a nil map and a wrapped error when the List call fails.
//
// NOTE(review): no readiness/type filtering is applied here — confirm this matches
// the skip conditions used by filter_has_enough_capacity.
func (c *Controller) blockedMemoryByHost(ctx context.Context) (map[string]int64, error) {
	var reservationList v1alpha1.ReservationList
	if err := c.client.List(ctx, &reservationList); err != nil {
		return nil, fmt.Errorf("failed to list reservations: %w", err)
	}

	blocked := make(map[string]int64)
	for idx := range reservationList.Items {
		// Take the address of the slice element to avoid copying the Reservation.
		r := &reservationList.Items[idx]

		target, current := r.Spec.TargetHost, r.Status.Host
		if target == "" && current == "" {
			// Not placed anywhere yet: nothing to block.
			continue
		}

		memQty, found := reservations.UnusedReservationCapacity(r, false)[hv1.ResourceMemory]
		if !found {
			continue
		}
		memBytes := memQty.Value()

		if target != "" {
			blocked[target] += memBytes
		}
		// Charge the current host as well, unless it is the same host as the target.
		if current != "" && current != target {
			blocked[current] += memBytes
		}
	}
	return blocked, nil
}

// sumCommittedCapacity sums AcceptedSpec.Amount (or Spec.Amount as fallback) across all
// CommittedResource CRDs for the given (flavorGroup, az) pair with an active state
// (guaranteed or confirmed) and resource type memory. Returns the total in slots.
Expand Down
60 changes: 52 additions & 8 deletions internal/scheduling/reservations/capacity/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func TestReconcileOne_CreatesCRD(t *testing.T) {
}
hvByName := map[string]hv1.Hypervisor{"host-1": *hv}

if err := ctrl.reconcileOne(context.Background(), groupName, groupData, az, hvByName, []hv1.Hypervisor{*hv}); err != nil {
if err := ctrl.reconcileOne(context.Background(), groupName, groupData, az, hvByName, []hv1.Hypervisor{*hv}, map[string]int64{}); err != nil {
t.Fatalf("reconcileOne failed: %v", err)
}

Expand Down Expand Up @@ -293,7 +293,7 @@ func TestReconcileOne_SetsReadyConditionFalseOnSchedulerError(t *testing.T) {
}

// reconcileOne returns no error itself (it continues on probe failure), but sets Ready=False
if err := ctrl.reconcileOne(context.Background(), groupName, groupData, az, map[string]hv1.Hypervisor{}, []hv1.Hypervisor{}); err != nil {
if err := ctrl.reconcileOne(context.Background(), groupName, groupData, az, map[string]hv1.Hypervisor{}, []hv1.Hypervisor{}, map[string]int64{}); err != nil {
t.Fatalf("reconcileOne failed: %v", err)
}

Expand Down Expand Up @@ -358,11 +358,11 @@ func TestReconcileOne_IdempotentUpdate(t *testing.T) {
hvByName := map[string]hv1.Hypervisor{"host-1": *hv}

// First call
if err := ctrl.reconcileOne(context.Background(), groupName, groupData, az, hvByName, []hv1.Hypervisor{*hv}); err != nil {
if err := ctrl.reconcileOne(context.Background(), groupName, groupData, az, hvByName, []hv1.Hypervisor{*hv}, map[string]int64{}); err != nil {
t.Fatalf("first reconcileOne failed: %v", err)
}
// Second call — should not error on the already-existing CRD
if err := ctrl.reconcileOne(context.Background(), groupName, groupData, az, hvByName, []hv1.Hypervisor{*hv}); err != nil {
if err := ctrl.reconcileOne(context.Background(), groupName, groupData, az, hvByName, []hv1.Hypervisor{*hv}, map[string]int64{}); err != nil {
t.Fatalf("second reconcileOne failed: %v", err)
}

Expand Down Expand Up @@ -429,7 +429,7 @@ func TestProbeScheduler_CapacityCalculation(t *testing.T) {
}
flavor := compute.FlavorInGroup{Name: "test-flavor", MemoryMB: memMB}

capacity, hosts, err := c.probeScheduler(context.Background(), flavor, "az-a", "test-pipeline", hvByName, true)
capacity, hosts, err := c.probeScheduler(context.Background(), flavor, "az-a", "test-pipeline", hvByName, true, nil)
if err != nil {
t.Fatalf("probeScheduler failed: %v", err)
}
Expand Down Expand Up @@ -467,7 +467,7 @@ func TestProbeScheduler_SubtractsAllocationsWhenNotIgnored(t *testing.T) {
flavor := compute.FlavorInGroup{Name: "test-flavor", MemoryMB: memMB}

// Total probe (ignoreAllocations=true): raw capacity → 2 slots.
totalCap, _, err := c.probeScheduler(context.Background(), flavor, "az-a", "total-pipeline", hvByName, true)
totalCap, _, err := c.probeScheduler(context.Background(), flavor, "az-a", "total-pipeline", hvByName, true, nil)
if err != nil {
t.Fatalf("probeScheduler (total) failed: %v", err)
}
Expand All @@ -476,7 +476,7 @@ func TestProbeScheduler_SubtractsAllocationsWhenNotIgnored(t *testing.T) {
}

// Placeable probe (ignoreAllocations=false): capacity − allocation → 1 slot.
placeableCap, _, err := c.probeScheduler(context.Background(), flavor, "az-a", "placeable-pipeline", hvByName, false)
placeableCap, _, err := c.probeScheduler(context.Background(), flavor, "az-a", "placeable-pipeline", hvByName, false, nil)
if err != nil {
t.Fatalf("probeScheduler (placeable) failed: %v", err)
}
Expand Down Expand Up @@ -576,7 +576,7 @@ func TestReconcileOne_ZeroMemoryFlavorReturnsError(t *testing.T) {
groupData := compute.FlavorGroupFeature{
SmallestFlavor: compute.FlavorInGroup{Name: "bad-flavor", MemoryMB: 0},
}
err := c.reconcileOne(context.Background(), "hana-v2", groupData, "az-a", nil, nil)
err := c.reconcileOne(context.Background(), "hana-v2", groupData, "az-a", nil, nil, nil)
if err == nil {
t.Error("expected error for zero-memory flavor")
}
Expand Down Expand Up @@ -648,3 +648,47 @@ func TestSumCommittedCapacity(t *testing.T) {
t.Errorf("sumCommittedCapacity = %d, want 3", got)
}
}

// TestProbeScheduler_SubtractsReservationBlocksWhenNotIgnored checks that the placeable
// probe (ignoreAllocations=false) deducts the per-host reservation block on top of
// hv.Status.Allocation, while the total probe (ignoreAllocations=true) deducts nothing.
func TestProbeScheduler_SubtractsReservationBlocksWhenNotIgnored(t *testing.T) {
	const (
		flavorMemMB = 4096
		slotBytes   = int64(flavorMemMB) * 1024 * 1024
	)

	scheme := newTestScheme(t)

	// The host fits three flavor-sized slots; one of them is taken by a running VM.
	host := newHypervisor("host-1", "az-a", slotBytes*3)
	host.Status.Allocation = map[hv1.ResourceName]resource.Quantity{
		hv1.ResourceMemory: *resource.NewQuantity(slotBytes, resource.BinarySI),
	}

	k8sClient := fake.NewClientBuilder().WithScheme(scheme).Build()
	schedulerSrv := newMockSchedulerServer(t, []string{"host-1"})
	defer schedulerSrv.Close()

	ctrl := NewController(k8sClient, Config{SchedulerURL: schedulerSrv.URL})
	hypervisors := map[string]hv1.Hypervisor{"host-1": *host}
	testFlavor := compute.FlavorInGroup{Name: "test-flavor", MemoryMB: flavorMemMB}

	// Total probe: allocations are ignored, so all three raw slots are reported.
	gotTotal, _, err := ctrl.probeScheduler(context.Background(), testFlavor, "az-a", "total-pipeline", hypervisors, true, nil)
	if err != nil {
		t.Fatalf("probeScheduler (total) failed: %v", err)
	}
	if gotTotal != 3 {
		t.Errorf("total capacity = %d, want 3", gotTotal)
	}

	// Placeable probe: one slot's worth of memory is additionally blocked by a
	// reservation, leaving 3 - 1 (allocation) - 1 (reservation) = 1 slot.
	reservationBlocks := map[string]int64{"host-1": slotBytes}
	gotPlaceable, _, err := ctrl.probeScheduler(context.Background(), testFlavor, "az-a", "placeable-pipeline", hypervisors, false, reservationBlocks)
	if err != nil {
		t.Fatalf("probeScheduler (placeable) failed: %v", err)
	}
	if gotPlaceable != 1 {
		t.Errorf("placeable capacity = %d, want 1 (3 slots − 1 alloc − 1 reservation)", gotPlaceable)
	}
}
75 changes: 75 additions & 0 deletions internal/scheduling/reservations/capacity_accounting.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright SAP SE
// SPDX-License-Identifier: Apache-2.0

package reservations

import (
hv1 "github.com/cobaltcore-dev/openstack-hypervisor-operator/api/v1"
"k8s.io/apimachinery/pkg/api/resource"

"github.com/cobaltcore-dev/cortex/api/v1alpha1"
)

// UnusedReservationCapacity returns the resources a Reservation should block on its host(s).
// This is the single source of truth used by both the capacity controller and
// filter_has_enough_capacity to ensure consistent accounting.
//
// The full Spec.Resources is returned unless ALL of the following hold:
//   - the reservation is of type CommittedResource,
//   - ignoreAllocations is false,
//   - the reservation is not mid-migration (Spec.TargetHost == Status.Host),
//   - Spec.CommittedResourceReservation lists at least one allocation.
//
// This means FailoverReservations, and CommittedResource reservations that have no
// allocations, are mid-migration, or are evaluated with ignoreAllocations=true, are
// always blocked in full. In that case the returned map is res.Spec.Resources itself
// (not a copy), so callers must treat the result as read-only.
//
// Otherwise confirmed VMs already appear in hv.Status.Allocation, so blocking the full
// slot would double-count them. For every resource in Spec.Resources the block is:
//
//	confirmed = sum over VMs present in both Spec and Status allocations
//	specOnly  = sum over VMs present in Spec but not yet in Status
//	remaining = max(0, slot − confirmed)   // clamped: never negative
//	block     = max(remaining, specOnly)   // a pending VM must be fully covered
//
// Resource names that occur only in allocations, but not in Spec.Resources, do not
// appear in the result.
func UnusedReservationCapacity(res *v1alpha1.Reservation, ignoreAllocations bool) map[hv1.ResourceName]resource.Quantity {
	cr := res.Spec.CommittedResourceReservation

	// Guard clause: every case that does not qualify for the confirmed-VM
	// adjustment blocks the full slot.
	if res.Spec.Type != v1alpha1.ReservationTypeCommittedResource ||
		ignoreAllocations ||
		res.Spec.TargetHost != res.Status.Host ||
		cr == nil ||
		len(cr.Allocations) == 0 {
		return res.Spec.Resources
	}

	// A VM counts as confirmed once its UUID also shows up in the status allocations.
	statusAllocs := map[string]string{}
	if res.Status.CommittedResourceReservation != nil {
		statusAllocs = res.Status.CommittedResourceReservation.Allocations
	}

	confirmed := make(map[hv1.ResourceName]resource.Quantity)
	specOnly := make(map[hv1.ResourceName]resource.Quantity)
	for instanceUUID, allocation := range cr.Allocations {
		bucket := specOnly
		if _, isConfirmed := statusAllocs[instanceUUID]; isConfirmed {
			bucket = confirmed
		}
		for resourceName, quantity := range allocation.Resources {
			sum := bucket[resourceName]
			sum.Add(quantity)
			bucket[resourceName] = sum
		}
	}

	result := make(map[hv1.ResourceName]resource.Quantity, len(res.Spec.Resources))
	for resourceName, slotSize := range res.Spec.Resources {
		remaining := slotSize.DeepCopy()
		remaining.Sub(confirmed[resourceName])
		if remaining.Sign() < 0 {
			// Confirmed VMs exceed the slot (e.g. after a resize): nothing left to block.
			remaining = resource.Quantity{}
		}

		block := remaining
		if pending := specOnly[resourceName]; pending.Cmp(remaining) > 0 {
			// A pending VM larger than the remaining slot must be blocked at its full size.
			block = pending.DeepCopy()
		}
		result[resourceName] = block
	}
	return result
}
Loading