Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ jobs:
# Set up Docker Buildx
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
with:
buildkitd-config: /etc/buildkit/buildkitd.toml

# Build and push Docker image with Buildx (don't push on PR)
# https://github.com/docker/build-push-action
Expand Down
52 changes: 49 additions & 3 deletions pkg/engines/slinky/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"fmt"
"maps"
"net/http"
"slices"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -55,6 +57,7 @@ type SlinkyEngine struct {

type Params struct {
slurm.BaseParams `mapstructure:",squash"`

// Namespace specifies the namespace where Slinky cluster is deployed
Namespace string `mapstructure:"namespace"`
// PodSelector specifies slurmd pods
Expand Down Expand Up @@ -155,18 +158,22 @@ func (eng *SlinkyEngine) GetComputeInstances(ctx context.Context, _ engines.Envi
}

func (eng *SlinkyEngine) getClusterNodes(ctx context.Context) (*corev1.NodeList, map[string]string, *httperr.Error) {
nodes, err := k8s.GetNodes(ctx, eng.client, eng.params.nodeListOpt)
return getMatchingNodes(ctx, eng.client, eng.params.Namespace, eng.params.nodeListOpt, eng.params.podListOpt)
}

func getMatchingNodes(ctx context.Context, client kubernetes.Interface, namespace string, nodeListOpt *metav1.ListOptions, podListOpt *metav1.ListOptions) (*corev1.NodeList, map[string]string, *httperr.Error) {
nodes, err := k8s.GetNodes(ctx, client, nodeListOpt)
if err != nil {
return nil, nil, httperr.NewError(http.StatusBadGateway, err.Error())
}

pods, err := eng.client.CoreV1().Pods(eng.params.Namespace).List(ctx, *eng.params.podListOpt)
pods, err := client.CoreV1().Pods(namespace).List(ctx, *podListOpt)
if err != nil {
return nil, nil, httperr.NewError(http.StatusBadGateway,
fmt.Sprintf("failed to list SLURM pods in the cluster: %v", err))
}

klog.V(4).Infof("Found %d pods in %q namespace with selector %q", len(pods.Items), eng.params.Namespace, eng.params.podListOpt.LabelSelector)
klog.V(4).Infof("Found %d pods in %q namespace with selector %q", len(pods.Items), namespace, podListOpt.LabelSelector)

// map k8s host name to SLURM host name
nodeMap := make(map[string]string)
Expand Down Expand Up @@ -325,6 +332,45 @@ func (eng *SlinkyEngine) getPartitionNodes(ctx context.Context, partition string
return "", fmt.Errorf("getPartitionNodes expects a string parameter")
}

var podSelector metav1.LabelSelector
//Get the topology name for the partition
topoName := partition
for name, topo := range eng.params.Topologies {
if topo.Partition == partition {
topoName = name
break
}
}

topo, exists := eng.params.Topologies[topoName]
Comment thread
ravisoundar marked this conversation as resolved.
if exists {
//decode the pod selector from the other map
podSelectorConfig, ok := topo.Other["podSelector"]
if ok {
err := config.Decode(podSelectorConfig, &podSelector)
if err != nil {
return "", fmt.Errorf("failed to decode pod selector: %v", err)
}
}
}
if podSelector.MatchLabels != nil || len(podSelector.MatchExpressions) > 0 {
sel, err := metav1.LabelSelectorAsSelector(&podSelector)
if err != nil {
return "", fmt.Errorf("failed to convert pod selector to label selector: %v", err)
}
podListOpt := &metav1.ListOptions{
LabelSelector: sel.String(),
}
_, nodeMap, httpErr := getMatchingNodes(ctx, eng.client, namespace, eng.params.nodeListOpt, podListOpt)
if httpErr != nil {
return "", fmt.Errorf("failed to get matching nodes: %v", httpErr)
}

nodes := slices.Collect(maps.Values(nodeMap))
nodesStr := strings.Join(nodes, ",")
return fmt.Sprintf(" Nodes=%s", nodesStr), nil
}

labels := map[string]string{"app.kubernetes.io/component": "login"}
pods, err := k8s.GetPodsByLabels(ctx, eng.client, namespace, labels)
if err != nil {
Expand Down
92 changes: 49 additions & 43 deletions pkg/engines/slinky/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package slinky
import (
"context"
"fmt"
"maps"
"slices"
"testing"
"time"

Expand All @@ -35,7 +33,6 @@ import (
"github.com/NVIDIA/topograph/pkg/engines/slurm"
"github.com/NVIDIA/topograph/pkg/models"
"github.com/NVIDIA/topograph/pkg/topology"
"github.com/NVIDIA/topograph/pkg/translate"
)

func TestGetParameters(t *testing.T) {
Expand Down Expand Up @@ -254,7 +251,8 @@ func TestConfigMapAnnotationsAndMetadata(t *testing.T) {
},
{
name: "with plugin only",
params: &Params{Namespace: "test-namespace",
params: &Params{
Namespace: "test-namespace",
BaseParams: slurm.BaseParams{
Plugin: topology.TopologyBlock,
},
Expand Down Expand Up @@ -340,28 +338,23 @@ const (
- switch: sw3
children: sw[21-22]
- switch: sw21
children: sw[11-12]
children: sw11
- switch: sw22
children: sw[13-14]
children: sw14
- switch: sw11
- switch: sw12
- switch: sw13
- switch: sw14
`

//medium.yaml - block topology skeleton
mediumBlockTopologyYamlSkeleton = `- topology: topo-0
cluster_default: false
block:
blockSizes:
block_sizes:
- 1
- 2
- 4
- 8
blocks:
- block: block1
- block: block2
- block: block3
- block: block4
`
//medium.yaml - tree topology skeleton
mediumCombinedTopologyYamlSkeleton = `- topology: topo-0
Expand All @@ -371,29 +364,43 @@ const (
- switch: sw3
children: sw[21-22]
- switch: sw21
children: sw[11-12]
children: sw11
- switch: sw22
children: sw[13-14]
children: sw13
- switch: sw11
- switch: sw12
- switch: sw13
- switch: sw14
- topology: topo-1
cluster_default: false
block:
blockSizes:
block_sizes:
- 1
- 2
- 4
- 8
blocks:
- block: block1
- block: block2
- block: block3
- block: block4
`
)

// slurmTopologiesForDynamicTest builds per-partition slurm.Topology entries for BaseParams.Topologies.
// Each entry includes podSelector under Other (seeRemain) for getPartitionNodes, matching engine decoding in getPartitionNodes.
func slurmTopologiesForDynamicTest(plugins []string) map[string]*slurm.Topology {
podSel := map[string]any{"matchLabels": map[string]string{"app": "slinky"}}
out := make(map[string]*slurm.Topology, len(plugins))
for i, plugin := range plugins {
key := fmt.Sprintf("topo-%d", i)
out[key] = &slurm.Topology{
Plugin: plugin,
Partition: key,
Other: map[string]interface{}{
"podSelector": podSel,
},
}
}
return out
}

func TestGenerateDynamicNodesOutput(t *testing.T) {
slinkyPodSel := metav1.LabelSelector{MatchLabels: map[string]string{"app": "slinky"}}

fakeSuccessClient := func(slurmNames []string, createConfigMap bool) *fake.Clientset {
client := fake.NewSimpleClientset()
Expand Down Expand Up @@ -488,7 +495,7 @@ func TestGenerateDynamicNodesOutput(t *testing.T) {
topologyConfig: []string{topology.TopologyBlock},
slurmName: []string{"1101", "1301"},
expectTopologyYaml: mediumBlockTopologyYamlSkeleton,
expectTopologySpec: []string{"topo-0:block1", "topo-0:block3"},
expectTopologySpec: []string{"topo-0:block1", "topo-0:block2"},
expectError: false,
},
{
Expand All @@ -499,7 +506,7 @@ func TestGenerateDynamicNodesOutput(t *testing.T) {
topologyConfig: []string{topology.TopologyTree, topology.TopologyBlock},
slurmName: []string{"1101", "1302"},
expectTopologyYaml: mediumCombinedTopologyYamlSkeleton,
expectTopologySpec: []string{"topo-0:sw3:sw21:sw11,topo-1:block1", "topo-0:sw3:sw22:sw13,topo-1:block3"},
expectTopologySpec: []string{"topo-0:sw3:sw21:sw11,topo-1:block1", "topo-0:sw3:sw22:sw13,topo-1:block2"},
expectError: false,
},
{
Expand All @@ -518,8 +525,8 @@ func TestGenerateDynamicNodesOutput(t *testing.T) {
},
{
name: "error getting config map",
k8sClient: func([]string, bool) *fake.Clientset {
client := fake.NewSimpleClientset()
k8sClient: func(_ []string, _ bool) *fake.Clientset {
client := fakeSuccessClient([]string{"1101", "1402"}, true)
client.PrependReactor("get", "configmaps", func(action k8stesting.Action) (bool, runtime.Object, error) {
return true, nil, errors.NewInternalError(fmt.Errorf("failed to get config map"))
})
Expand All @@ -535,34 +542,33 @@ func TestGenerateDynamicNodesOutput(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
client := tc.k8sClient(tc.slurmName, tc.createConfigMap)

model, err := models.NewModelFromFile(tc.topologyFile)
require.NoError(t, err)
topo, _ := model.ToGraph()

podListSel, err := metav1.LabelSelectorAsSelector(&slinkyPodSel)
require.NoError(t, err)
podListOpt := &metav1.ListOptions{LabelSelector: podListSel.String()}

params := &Params{
Namespace: "test-ns",
ConfigMapName: "slurm-config",
ConfigPath: "topology.yaml",
podListOpt: &metav1.ListOptions{LabelSelector: "app=slinky"},
PodSelector: slinkyPodSel,
ReportingMode: ReportingModeDynamicNodes,
podListOpt: podListOpt,
nodeListOpt: &metav1.ListOptions{},
BaseParams: slurm.BaseParams{
Topologies: slurmTopologiesForDynamicTest(tc.topologyConfig),
},
}
engine := &SlinkyEngine{
client: client,
params: params,
}

//Get the topology tree, and topology config for the test
model, err := models.NewModelFromFile(tc.topologyFile)
require.NoError(t, err)
topo, inst2Node := model.ToGraph()
nodes := slices.Collect(maps.Keys(inst2Node))
topologyConfig := &translate.Config{
Topologies: make(map[string]*translate.TopologySpec),
}
for i, topo := range tc.topologyConfig {
topologyConfig.Topologies[fmt.Sprintf("topo-%d", i)] = &translate.TopologySpec{Plugin: topo, Nodes: nodes}
}

nt, err := translate.NewNetworkTopology(topo, topologyConfig)
require.NoError(t, err)

result, httpErr := engine.generateDynamicNodesOutput(context.Background(), nt)
result, httpErr := engine.GenerateOutput(context.Background(), topo, nil)

if tc.expectError {
require.Error(t, httpErr)
Expand Down
11 changes: 6 additions & 5 deletions pkg/engines/slurm/slurm.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,12 @@ type BaseParams struct {
}

type Topology struct {
Partition string `mapstructure:"partition"`
Plugin string `mapstructure:"plugin"`
BlockSizes []int `mapstructure:"blockSizes"`
Nodes []string `mapstructure:"nodes"`
Default bool `mapstructure:"clusterDefault"`
Partition string `mapstructure:"partition"`
Plugin string `mapstructure:"plugin"`
BlockSizes []int `mapstructure:"blockSizes"`
Nodes []string `mapstructure:"nodes"`
Default bool `mapstructure:"clusterDefault"`
Other map[string]interface{} `mapstructure:",remain"`
}

type Params struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/translate/yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Switch struct {
}

type BlockTopo struct {
BlockSizes []int `yaml:"blockSizes"`
BlockSizes []int `yaml:"block_sizes"`
Comment thread
ravisoundar marked this conversation as resolved.
Blocks []*Block `yaml:"blocks"`
parents map[string]string `yaml:"-"`
}
Expand Down
Loading
Loading