Skip to content

Commit e7f0641

Browse files
committed
feat: add sticky-session support in port-forwarding
1 parent 0a59e0a commit e7f0641

8 files changed

Lines changed: 70 additions & 16 deletions

File tree

README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,15 +81,14 @@ First time the k8sdebug command is called this file will be created.
8181

8282

8383
### 🔄 Smart Port Forwarding
84-
84+
![arch](./archport.png)
8585
```bash
8686
k8sdebug port-forward \
8787
-n my-namespace \
8888
--labels "app=backend" \
8989
--policy round-robin \
9090
--hostport 8080 \
9191
--containerport 80
92-
9392
# Supported policies:
9493
# - round-robin (default)
9594

archport.png

67.6 KB
Loading

pkg/forwarder/forwarder.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package forwarder
22

3+
import (
4+
"net"
5+
)
6+
37
type Forwarder interface {
4-
NextPort() string
8+
NextPort(net.Conn) string
59
}

pkg/portforward/mock/mock.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package mock
22

3-
import "github.com/revolyssup/k8sdebug/pkg/forwarder"
3+
import (
4+
"net"
5+
6+
"github.com/revolyssup/k8sdebug/pkg/forwarder"
7+
)
48

59
type MockForwarder struct {
610
ports []string
@@ -15,7 +19,7 @@ func New(ports ...string) forwarder.Forwarder {
1519
}
1620

1721
// Port returns the next predefined port or an error if exhausted.
18-
func (m *MockForwarder) NextPort() string {
22+
func (m *MockForwarder) NextPort(_ net.Conn) string {
1923
port := m.ports[m.index]
2024
m.index = (m.index + 1) % len(m.ports)
2125
return port

pkg/portforward/mock/mock_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ import (
1010
func TestRoundRobin(t *testing.T) {
1111
mock := mock.New("8080", "8081", "8082")
1212

13-
assert.Equal(t, "8080", mock.NextPort())
14-
assert.Equal(t, "8081", mock.NextPort())
15-
assert.Equal(t, "8082", mock.NextPort())
16-
assert.Equal(t, "8080", mock.NextPort())
17-
assert.Equal(t, "8081", mock.NextPort())
18-
assert.Equal(t, "8082", mock.NextPort())
13+
assert.Equal(t, "8080", mock.NextPort(nil))
14+
assert.Equal(t, "8081", mock.NextPort(nil))
15+
assert.Equal(t, "8082", mock.NextPort(nil))
16+
assert.Equal(t, "8080", mock.NextPort(nil))
17+
assert.Equal(t, "8081", mock.NextPort(nil))
18+
assert.Equal(t, "8082", mock.NextPort(nil))
1919
}

pkg/portforward/portforward.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/revolyssup/k8sdebug/pkg/forwarder"
1818
"github.com/revolyssup/k8sdebug/pkg/portforward/mock"
1919
"github.com/revolyssup/k8sdebug/pkg/portforward/roundrobin"
20+
"github.com/revolyssup/k8sdebug/pkg/portforward/sticky"
2021
"github.com/spf13/cobra"
2122
v1 "k8s.io/api/core/v1"
2223
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -48,8 +49,8 @@ func forwardToPod(hostConn net.Conn, podCon net.Conn) {
4849
io.Copy(podCon, hostConn)
4950
}
5051

51-
func getPodConnection(fw forwarder.Forwarder) (net.Conn, error) {
52-
port := fw.NextPort()
52+
func getPodConnection(fw forwarder.Forwarder, hostConn net.Conn) (net.Conn, error) {
53+
port := fw.NextPort(hostConn)
5354
if port == "" {
5455
return nil, fmt.Errorf("no available port")
5556
}
@@ -66,6 +67,8 @@ func getForwarder(policy string) forwarder.Forwarder {
6667
return roundrobin.New(&connPool)
6768
case "mock":
6869
return mock.New()
70+
case "sticky":
71+
return sticky.New(&connPool)
6972
}
7073
return nil
7174
}
@@ -107,7 +110,7 @@ func listenAndAccept(ctx context.Context, listener net.Listener, fw forwarder.Fo
107110
fmt.Printf("Accept error: %v", err)
108111
continue
109112
}
110-
podConn, err := getPodConnection(fw)
113+
podConn, err := getPodConnection(fw, hostConn)
111114
if err != nil {
112115
fmt.Printf("Pod connection error: %v", err)
113116
continue
@@ -247,7 +250,11 @@ func NewCommand() *cobra.Command {
247250

248251
cmd.PersistentFlags().StringVarP(&namespace, "namespace", "n", "default", "Name of the pod")
249252
cmd.PersistentFlags().StringVarP(&typ, "type", "t", "pod", "Name of the pod")
250-
cmd.PersistentFlags().StringVar(&policy, "policy", "round-robin", "policy to use while sending requests")
253+
cmd.PersistentFlags().StringVar(&policy, "policy", "round-robin", `policy to use while sending requests.
254+
Default policy is round-robin.
255+
round-robin: In this mode, requests are balanced across all the pods much like Kubernetes service.
256+
sticky: In this mode, requests from a particular source IP will always be directed to a single pod.
257+
`)
251258
cmd.Flags().StringVarP(&labels, "labels", "l", "", "list of key value pairs to use as labels while filtering pods.")
252259
cmd.Flags().StringVar(&hostport, "hostport", "3000", "host port on which requests will be sent")
253260
cmd.Flags().StringVar(&containerPort, "containerport", "80", "container port on which requests will be sent")

pkg/portforward/roundrobin/roundrobin.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func New(connPool *[]string) forwarder.Forwarder {
1919
connPool: connPool,
2020
}
2121
}
22-
func (rr *RoundRobin) NextPort() string {
22+
func (rr *RoundRobin) NextPort(_ net.Conn) string {
2323
rr.mx.Lock()
2424
defer rr.mx.Unlock()
2525

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package sticky
2+
3+
import (
4+
"net"
5+
"sync"
6+
7+
"github.com/revolyssup/k8sdebug/pkg/forwarder"
8+
"github.com/revolyssup/k8sdebug/pkg/portforward/roundrobin"
9+
)
10+
11+
type StickySession struct {
12+
ipTracker map[string]string // Source IP to Allotted port mapping
13+
mx sync.Mutex
14+
connPool *[]string
15+
roundRobin forwarder.Forwarder
16+
}
17+
18+
func New(connPool *[]string) forwarder.Forwarder {
19+
return &StickySession{
20+
connPool: connPool,
21+
ipTracker: make(map[string]string),
22+
roundRobin: roundrobin.New(connPool),
23+
}
24+
}
25+
func (ss *StickySession) NextPort(conn net.Conn) string {
26+
ss.mx.Lock()
27+
defer ss.mx.Unlock()
28+
29+
remoteAddr := conn.RemoteAddr().String()
30+
31+
// Split into IP and port
32+
host, _, _ := net.SplitHostPort(remoteAddr)
33+
if ss.ipTracker[host] != "" {
34+
return ss.ipTracker[host]
35+
}
36+
//fallback to roundrobin
37+
port := ss.roundRobin.NextPort(conn)
38+
ss.ipTracker[host] = port
39+
return port
40+
}

0 commit comments

Comments
 (0)