From ea80dda00892b5e72eea5b5c02384b29be335b5a Mon Sep 17 00:00:00 2001 From: Ravi Shankar Date: Thu, 23 Apr 2026 14:27:05 -0700 Subject: [PATCH] chore(slinky-engine): Use PodSelecotr to replace scontrol show partition Signed-off-by: Ravi Shankar --- .github/workflows/docker.yml | 2 + pkg/engines/slinky/engine.go | 52 ++++++++++++++++- pkg/engines/slinky/engine_test.go | 92 ++++++++++++++++--------------- pkg/engines/slurm/slurm.go | 11 ++-- pkg/translate/yaml.go | 2 +- pkg/translate/yaml_test.go | 20 +++---- 6 files changed, 117 insertions(+), 62 deletions(-) diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 4ce5e73c..d23568ec 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -50,6 +50,8 @@ jobs: # Set up Docker Buildx - name: Set up Docker Buildx uses: docker/setup-buildx-action@v3 + with: + buildkitd-config: /etc/buildkit/buildkitd.toml # Build and push Docker image with Buildx (don't push on PR) # https://github.com/docker/build-push-action diff --git a/pkg/engines/slinky/engine.go b/pkg/engines/slinky/engine.go index d32ce8d3..1a5c6be3 100644 --- a/pkg/engines/slinky/engine.go +++ b/pkg/engines/slinky/engine.go @@ -22,6 +22,8 @@ import ( "fmt" "maps" "net/http" + "slices" + "strings" "time" corev1 "k8s.io/api/core/v1" @@ -55,6 +57,7 @@ type SlinkyEngine struct { type Params struct { slurm.BaseParams `mapstructure:",squash"` + // Namespace specifies the namespace where Slinky cluster is deployed Namespace string `mapstructure:"namespace"` // PodSelector specifies slurmd pods @@ -155,18 +158,22 @@ func (eng *SlinkyEngine) GetComputeInstances(ctx context.Context, _ engines.Envi } func (eng *SlinkyEngine) getClusterNodes(ctx context.Context) (*corev1.NodeList, map[string]string, *httperr.Error) { - nodes, err := k8s.GetNodes(ctx, eng.client, eng.params.nodeListOpt) + return getMatchingNodes(ctx, eng.client, eng.params.Namespace, eng.params.nodeListOpt, eng.params.podListOpt) +} + +func getMatchingNodes(ctx context.Context, client kubernetes.Interface, namespace string, nodeListOpt *metav1.ListOptions, podListOpt *metav1.ListOptions) (*corev1.NodeList, map[string]string, *httperr.Error) { + nodes, err := k8s.GetNodes(ctx, client, nodeListOpt) if err != nil { return nil, nil, httperr.NewError(http.StatusBadGateway, err.Error()) } - pods, err := eng.client.CoreV1().Pods(eng.params.Namespace).List(ctx, *eng.params.podListOpt) + pods, err := client.CoreV1().Pods(namespace).List(ctx, *podListOpt) if err != nil { return nil, nil, httperr.NewError(http.StatusBadGateway, fmt.Sprintf("failed to list SLURM pods in the cluster: %v", err)) } - klog.V(4).Infof("Found %d pods in %q namespace with selector %q", len(pods.Items), eng.params.Namespace, eng.params.podListOpt.LabelSelector) + klog.V(4).Infof("Found %d pods in %q namespace with selector %q", len(pods.Items), namespace, podListOpt.LabelSelector) // map k8s host name to SLURM host name nodeMap := make(map[string]string) @@ -325,6 +332,45 @@ func (eng *SlinkyEngine) getPartitionNodes(ctx context.Context, partition string return "", fmt.Errorf("getPartitionNodes expects a string parameter") } + var podSelector metav1.LabelSelector + //Get the topology name for the partition + topoName := partition + for name, topo := range eng.params.Topologies { + if topo.Partition == partition { + topoName = name + break + } + } + + topo, exists := eng.params.Topologies[topoName] + if exists { + //decode the pod selector from the other map + podSelectorConfig, ok := topo.Other["podSelector"] + if ok { + err := config.Decode(podSelectorConfig, &podSelector) + if err != nil { + return "", fmt.Errorf("failed to decode pod selector: %v", err) + } + } + } + if podSelector.MatchLabels != nil || len(podSelector.MatchExpressions) > 0 { + sel, err := metav1.LabelSelectorAsSelector(&podSelector) + if err != nil { + return "", fmt.Errorf("failed to convert pod selector to label selector: %v", err) + } + podListOpt := &metav1.ListOptions{ + LabelSelector: sel.String(), + } + _, nodeMap, httpErr := getMatchingNodes(ctx, eng.client, namespace, eng.params.nodeListOpt, podListOpt) + if httpErr != nil { + return "", fmt.Errorf("failed to get matching nodes: %v", httpErr) + } + + nodes := slices.Collect(maps.Values(nodeMap)) + nodesStr := strings.Join(nodes, ",") + return fmt.Sprintf(" Nodes=%s", nodesStr), nil + } + labels := map[string]string{"app.kubernetes.io/component": "login"} pods, err := k8s.GetPodsByLabels(ctx, eng.client, namespace, labels) if err != nil { diff --git a/pkg/engines/slinky/engine_test.go b/pkg/engines/slinky/engine_test.go index 7abf12f1..871797e0 100644 --- a/pkg/engines/slinky/engine_test.go +++ b/pkg/engines/slinky/engine_test.go @@ -19,8 +19,6 @@ package slinky import ( "context" "fmt" - "maps" - "slices" "testing" "time" @@ -35,7 +33,6 @@ import ( "github.com/NVIDIA/topograph/pkg/engines/slurm" "github.com/NVIDIA/topograph/pkg/models" "github.com/NVIDIA/topograph/pkg/topology" - "github.com/NVIDIA/topograph/pkg/translate" ) func TestGetParameters(t *testing.T) { @@ -254,7 +251,8 @@ func TestConfigMapAnnotationsAndMetadata(t *testing.T) { }, { name: "with plugin only", - params: &Params{Namespace: "test-namespace", + params: &Params{ + Namespace: "test-namespace", BaseParams: slurm.BaseParams{ Plugin: topology.TopologyBlock, }, @@ -340,12 +338,10 @@ const ( - switch: sw3 children: sw[21-22] - switch: sw21 - children: sw[11-12] + children: sw11 - switch: sw22 - children: sw[13-14] + children: sw14 - switch: sw11 - - switch: sw12 - - switch: sw13 - switch: sw14 ` @@ -353,15 +349,12 @@ const ( mediumBlockTopologyYamlSkeleton = `- topology: topo-0 cluster_default: false block: - blockSizes: + block_sizes: + - 1 - 2 - - 4 - - 8 blocks: - block: block1 - block: block2 - - block: block3 - - block: block4 ` //medium.yaml - tree topology skeleton mediumCombinedTopologyYamlSkeleton = `- topology: topo-0 @@ -371,29 +364,43 @@ const ( - switch: sw3 children: sw[21-22] - switch: sw21 - children: sw[11-12] + children: sw11 - switch: sw22 - children: sw[13-14] + children: sw13 - switch: sw11 - - switch: sw12 - switch: sw13 - - switch: sw14 - topology: topo-1 cluster_default: false block: - blockSizes: + block_sizes: + - 1 - 2 - - 4 - - 8 blocks: - block: block1 - block: block2 - - block: block3 - - block: block4 ` ) +// slurmTopologiesForDynamicTest builds per-partition slurm.Topology entries for BaseParams.Topologies. +// Each entry includes podSelector under Other (seeRemain) for getPartitionNodes, matching engine decoding in getPartitionNodes. +func slurmTopologiesForDynamicTest(plugins []string) map[string]*slurm.Topology { + podSel := map[string]any{"matchLabels": map[string]string{"app": "slinky"}} + out := make(map[string]*slurm.Topology, len(plugins)) + for i, plugin := range plugins { + key := fmt.Sprintf("topo-%d", i) + out[key] = &slurm.Topology{ + Plugin: plugin, + Partition: key, + Other: map[string]interface{}{ + "podSelector": podSel, + }, + } + } + return out +} + func TestGenerateDynamicNodesOutput(t *testing.T) { + slinkyPodSel := metav1.LabelSelector{MatchLabels: map[string]string{"app": "slinky"}} fakeSuccessClient := func(slurmNames []string, createConfigMap bool) *fake.Clientset { client := fake.NewSimpleClientset() @@ -488,7 +495,7 @@ func TestGenerateDynamicNodesOutput(t *testing.T) { topologyConfig: []string{topology.TopologyBlock}, slurmName: []string{"1101", "1301"}, expectTopologyYaml: mediumBlockTopologyYamlSkeleton, - expectTopologySpec: []string{"topo-0:block1", "topo-0:block3"}, + expectTopologySpec: []string{"topo-0:block1", "topo-0:block2"}, expectError: false, }, { @@ -499,7 +506,7 @@ func TestGenerateDynamicNodesOutput(t *testing.T) { topologyConfig: []string{topology.TopologyTree, topology.TopologyBlock}, slurmName: []string{"1101", "1302"}, expectTopologyYaml: mediumCombinedTopologyYamlSkeleton, - expectTopologySpec: []string{"topo-0:sw3:sw21:sw11,topo-1:block1", "topo-0:sw3:sw22:sw13,topo-1:block3"}, + expectTopologySpec: []string{"topo-0:sw3:sw21:sw11,topo-1:block1", "topo-0:sw3:sw22:sw13,topo-1:block2"}, expectError: false, }, { @@ -518,8 +525,8 @@ func TestGenerateDynamicNodesOutput(t *testing.T) { }, { name: "error getting config map", - k8sClient: func([]string, bool) *fake.Clientset { - client := fake.NewSimpleClientset() + k8sClient: func(_ []string, _ bool) *fake.Clientset { + client := fakeSuccessClient([]string{"1101", "1402"}, true) client.PrependReactor("get", "configmaps", func(action k8stesting.Action) (bool, runtime.Object, error) { return true, nil, errors.NewInternalError(fmt.Errorf("failed to get config map")) }) @@ -535,34 +542,33 @@ func TestGenerateDynamicNodesOutput(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { client := tc.k8sClient(tc.slurmName, tc.createConfigMap) + + model, err := models.NewModelFromFile(tc.topologyFile) + require.NoError(t, err) + topo, _ := model.ToGraph() + + podListSel, err := metav1.LabelSelectorAsSelector(&slinkyPodSel) + require.NoError(t, err) + podListOpt := &metav1.ListOptions{LabelSelector: podListSel.String()} + params := &Params{ Namespace: "test-ns", ConfigMapName: "slurm-config", ConfigPath: "topology.yaml", - podListOpt: &metav1.ListOptions{LabelSelector: "app=slinky"}, + PodSelector: slinkyPodSel, + ReportingMode: ReportingModeDynamicNodes, + podListOpt: podListOpt, nodeListOpt: &metav1.ListOptions{}, + BaseParams: slurm.BaseParams{ + Topologies: slurmTopologiesForDynamicTest(tc.topologyConfig), + }, } engine := &SlinkyEngine{ client: client, params: params, } - //Get the topology tree, and topology config for the test - model, err := models.NewModelFromFile(tc.topologyFile) - require.NoError(t, err) - topo, inst2Node := model.ToGraph() - nodes := slices.Collect(maps.Keys(inst2Node)) - topologyConfig := &translate.Config{ - Topologies: make(map[string]*translate.TopologySpec), - } - for i, topo := range tc.topologyConfig { - topologyConfig.Topologies[fmt.Sprintf("topo-%d", i)] = &translate.TopologySpec{Plugin: topo, Nodes: nodes} - } - - nt, err := translate.NewNetworkTopology(topo, topologyConfig) - require.NoError(t, err) - - result, httpErr := engine.generateDynamicNodesOutput(context.Background(), nt) + result, httpErr := engine.GenerateOutput(context.Background(), topo, nil) if tc.expectError { require.Error(t, httpErr) diff --git a/pkg/engines/slurm/slurm.go b/pkg/engines/slurm/slurm.go index 41d237f4..bb03c0b6 100644 --- a/pkg/engines/slurm/slurm.go +++ b/pkg/engines/slurm/slurm.go @@ -59,11 +59,12 @@ type BaseParams struct { } type Topology struct { - Partition string `mapstructure:"partition"` - Plugin string `mapstructure:"plugin"` - BlockSizes []int `mapstructure:"blockSizes"` - Nodes []string `mapstructure:"nodes"` - Default bool `mapstructure:"clusterDefault"` + Partition string `mapstructure:"partition"` + Plugin string `mapstructure:"plugin"` + BlockSizes []int `mapstructure:"blockSizes"` + Nodes []string `mapstructure:"nodes"` + Default bool `mapstructure:"clusterDefault"` + Other map[string]interface{} `mapstructure:",remain"` } type Params struct { diff --git a/pkg/translate/yaml.go b/pkg/translate/yaml.go index e9edf560..126a3740 100644 --- a/pkg/translate/yaml.go +++ b/pkg/translate/yaml.go @@ -40,7 +40,7 @@ type Switch struct { } type BlockTopo struct { - BlockSizes []int `yaml:"blockSizes"` + BlockSizes []int `yaml:"block_sizes"` Blocks []*Block `yaml:"blocks"` parents map[string]string `yaml:"-"` } diff --git a/pkg/translate/yaml_test.go b/pkg/translate/yaml_test.go index 4df611cc..206a0225 100644 --- a/pkg/translate/yaml_test.go +++ b/pkg/translate/yaml_test.go @@ -76,7 +76,7 @@ func TestBlockYamlTopology(t *testing.T) { expected := `- topology: topo1 cluster_default: true block: - blockSizes: + block_sizes: - 2 blocks: - block: block1 @@ -84,7 +84,7 @@ func TestBlockYamlTopology(t *testing.T) { - topology: topo2 cluster_default: false block: - blockSizes: + block_sizes: - 2 blocks: - block: block1 @@ -93,14 +93,14 @@ func TestBlockYamlTopology(t *testing.T) { expectedSkeleton := `- topology: topo1 cluster_default: true block: - blockSizes: + block_sizes: - 2 blocks: - block: block1 - topology: topo2 cluster_default: false block: - blockSizes: + block_sizes: - 2 blocks: - block: block1 @@ -164,7 +164,7 @@ func TestMixedYamlTopology(t *testing.T) { - topology: topo3 cluster_default: false block: - blockSizes: + block_sizes: - 2 blocks: - block: block1 @@ -172,7 +172,7 @@ func TestMixedYamlTopology(t *testing.T) { - topology: topo4 cluster_default: false block: - blockSizes: + block_sizes: - 2 blocks: - block: block1 @@ -218,7 +218,7 @@ func TestBlockOnlyYamlTopology(t *testing.T) { expected := `- topology: topo1 cluster_default: true block: - blockSizes: + block_sizes: - 2 blocks: - block: block1 @@ -226,7 +226,7 @@ func TestBlockOnlyYamlTopology(t *testing.T) { - topology: topo2 cluster_default: false block: - blockSizes: + block_sizes: - 2 blocks: - block: block1 @@ -272,7 +272,7 @@ func TestEmptyPartitionTopology(t *testing.T) { - topology: topo3 cluster_default: false block: - blockSizes: + block_sizes: - 2 blocks: - block: block1 @@ -299,7 +299,7 @@ func TestEmptyPartitionTopology(t *testing.T) { - topology: topo3 cluster_default: false block: - blockSizes: + block_sizes: - 2 blocks: - block: block1