Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 109 additions & 92 deletions valkey/templates/haproxy-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ data:
retries 3

frontend valkey_frontend_write
bind *:6379
bind *:{{ .Values.haproxy.service.port | default 6379 }}
mode tcp
option tcplog
default_backend valkey_backend_master

frontend valkey_frontend_read
bind *:6380
bind *:{{ .Values.haproxy.service.readPort | default 6380 }}
mode tcp
option tcplog
default_backend valkey_backend_read
Expand All @@ -37,26 +37,37 @@ data:
# the runtime socket (set server addr + enable/disable server).
# This prevents HAProxy's own DNS resolver from putting servers into
# DNS NX maintenance, which cannot be cleared by enable server.
{{ range $i := until (add (int .Values.replica.replicas) 1 | int) }}
server valkey-{{ $i }} 127.0.0.1:{{ $.Values.service.port }} no-check disabled
{{ end }}
{{- range $i := until (add (int .Values.replica.replicas) 1 | int) }}
server valkey-{{ $i }} 127.0.0.1:6379 check-send-proxy disabled
{{- end }}

backend valkey_backend_read
mode tcp
option tcp-check
tcp-check connect
tcp-check comment PING

# Step 1-2: Connect (must use SSL if TLS is enabled)
{{- if .Values.tls.enabled }}
tcp-check connect port {{ .Values.service.port }} ssl
{{- else }}
tcp-check connect port {{ .Values.service.port }}
{{- end }}

# Step 3: Send PING
tcp-check send "PING\r\n"
tcp-check expect rstring "\\+PONG|-NOAUTH"

# Step 4: Expect PONG or any Auth error (which proves the service is up)
# The regex is deliberately broad: it accepts "+PONG", "-NOAUTH" (any letter case), and any reply starting with "-ERR".
tcp-check expect rstring ^(\+PONG|-[Nn][Oo][Aa][Uu][Tt][Hh]|-[Ee][Rr][Rr])

timeout connect 2s
timeout check 5s
# Read backend uses health checks since any replica is acceptable.
# Servers start enabled; all will pass a simple PING check.
{{ range $i := until (add (int .Values.replica.replicas) 1 | int) }}
server valkey-{{ $i }} {{ include "valkey.fullname" $ }}-{{ $i }}.{{ include "valkey.headlessServiceName" $ }}.{{ $.Release.Namespace }}.svc.{{ $.Values.clusterDomain }}:{{ $.Values.service.port }} check inter 5s fall 3 rise 1 init-addr last,libc,none
{{ end }}
{{- range $i := until (add (int .Values.replica.replicas) 1 | int) }}
server valkey-{{ $i }} {{ include "valkey.fullname" $ }}-{{ $i }}.{{ include "valkey.headlessServiceName" $ }}.{{ $.Release.Namespace }}.svc.{{ $.Values.clusterDomain }}:{{ $.Values.service.port }} check inter 5s fall 3 rise 1 init-addr last,libc,none {{ if $.Values.tls.enabled }}ssl verify none{{ end }}
{{- end }}

sentinel-watcher.sh: |
sentinel-watcher.sh: |-
#!/bin/sh
# Sentinel watcher: polls Sentinel for master changes and updates HAProxy
# via the runtime socket.
Expand All @@ -69,134 +80,140 @@ data:
# DNS NX maintenance that cannot be overridden by `enable server`).
set -eu

SENTINEL_SVC="{{ include "valkey.fullname" . }}-sentinel.{{ .Release.Namespace }}.svc.{{ .Values.clusterDomain }}"
SENTINEL_PORT="{{ .Values.replica.sentinel.port }}"
MASTER_SET="{{ .Values.replica.sentinel.masterSet }}"
POLL_INTERVAL="{{ .Values.haproxy.config.checkInterval | default 2 }}"
VALKEY_PORT="{{ .Values.service.port }}"
HAPROXY_SOCKET="/var/run/haproxy/admin.sock"
BACKEND="valkey_backend_master"
POLL_INTERVAL=2
TOTAL_SERVERS="{{ add (int .Values.replica.replicas) 1 }}"

# Write a timestamped message to stderr so stdout stays clean for callers
# that capture command output.
#   $@ - message words; they are joined with single spaces.
# "$*" is used instead of "$1" so additional arguments are not silently
# dropped, and so a call with no arguments does not abort the script under
# `set -u` (an unset "$1" is an error there, "$*" is not).
log() { printf '%s %s\n' "$(date)" "$*" >&2; }

{{- if .Values.auth.enabled }}
# Authentication helper: print the password stored for a user.
#   $1 - username
#   $2 - optional key name inside /valkey-users-secret (defaults to $1)
# The users secret is consulted first, then the per-user file in the auth
# secret. Nothing is printed when neither file exists.
get_user_password() {
  username="$1"
  password_key="${2:-$username}"
  for secret_file in \
    "/valkey-users-secret/$password_key" \
    "/valkey-auth-secret/${username}-password"; do
    if [ -f "$secret_file" ]; then
      cat "$secret_file"
      return
    fi
  done
}

{{- $watcherUser := .Values.haproxy.sentinelWatcher.user | default "default" }}
{{- $userObj := index .Values.auth.sentinelAclUsers $watcherUser | default (dict "passwordKey" "") }}
{{- $passKey := $userObj.passwordKey | default $watcherUser }}

WATCHER_USER="{{ $watcherUser }}"
WATCHER_PASS=$(get_user_password "$WATCHER_USER" "{{ $passKey }}")
{{- end }}

# Base command assembly for Auth and TLS support
CLI_BASE="valkey-cli -p ${SENTINEL_PORT}"

{{- if .Values.auth.enabled }}
{{- $replUsername := .Values.replica.replicationUser }}
{{- $replUser := index .Values.auth.aclUsers $replUsername }}
{{- $replPasswordKey := $replUser.passwordKey | default $replUsername }}
AUTH_ARG="-a $(cat /valkey-auth-secret/{{ $replPasswordKey }}-password 2>/dev/null || cat /valkey-users-secret/{{ $replPasswordKey }} 2>/dev/null)"
{{- else }}
AUTH_ARG=""
if [ "$WATCHER_USER" != "default" ]; then
CLI_BASE="${CLI_BASE} --user ${WATCHER_USER} -a ${WATCHER_PASS}"
else
CLI_BASE="${CLI_BASE} -a ${WATCHER_PASS}"
fi
{{- end }}

log() {
echo "[sentinel-watcher] $(date '+%Y-%m-%dT%H:%M:%S') $*"
{{- if .Values.tls.enabled }}
CLI_BASE="${CLI_BASE} --tls --cacert /tls/{{ .Values.tls.caPublicKey }} --cert /tls/{{ .Values.tls.serverPublicKey }} --key /tls/{{ .Values.tls.serverKey }}"
{{- end }}

# Run a valkey-cli command against a single Sentinel host.
#   $1   - Sentinel hostname
#   $2.. - command and arguments forwarded to valkey-cli
# The script runs under #!/bin/sh, so the non-POSIX `local` builtin is
# avoided; a prefixed variable name keeps the global namespace tidy instead.
# CLI_BASE is expanded unquoted on purpose so its flags split into words.
sentinel_cmd() {
  _sentinel_cmd_host="$1"
  shift
  ${CLI_BASE} -h "${_sentinel_cmd_host}" "$@"
}

# Send one command line to the HAProxy runtime socket.
#   $1 - the complete command text, passed as a single argument
# Whatever socat reads back from the socket is written to stdout, and the
# function's exit status is socat's.
haproxy_cmd() {
  runtime_command="$1"
  echo "${runtime_command}" | socat stdio "${HAPROXY_SOCKET}"
}

# Resolve a hostname to an IPv4 address using getent (musl/Alpine libc resolver).
# Falls back to nslookup if getent fails.
resolve_ip() {
hostname="$1"
ip=$(getent hosts "${hostname}" 2>/dev/null | awk '{print $1}' | head -1)
if [ -z "${ip}" ]; then
ip=$(nslookup "${hostname}" 2>/dev/null \
| awk '/^Address:/{print $2}' \
| grep -v '#' | head -1)
fi
echo "${ip}"
}

# Query Sentinel: returns first line of get-master-addr-by-name (hostname or IP)
get_master_from_sentinel() {
valkey-cli -h "${SENTINEL_SVC}" \
{{- if .Values.tls.enabled }}
--tls --cacert /tls/{{ .Values.tls.caPublicKey }} \
{{- end }}
-p "${SENTINEL_PORT}" ${AUTH_ARG} \
SENTINEL get-master-addr-by-name "${MASTER_SET}" 2>/dev/null | head -1 || true
getent hosts "$1" | awk '{print $1}'
}
LAST_MASTER_HOST=""

# Extract the server index from a hostname like "valkey-1.valkey-headless...".
#   $1 - hostname; only the first DNS label (text before the first ".") is used
# Prints the digits after the last "-" of that label. Prints an empty string
# when the label does not end in "-<digits>" (for example a bare IP address
# such as "10.0.0.5", which previously yielded the bogus index "10"), so that
# callers can detect "no index" with a simple emptiness check.
# `local` is avoided because the script runs under #!/bin/sh.
index_from_host() {
  _ifh_label="${1%%.*}"
  case "${_ifh_label}" in
    *-*) _ifh_ordinal="${_ifh_label##*-}" ;;
    *) _ifh_ordinal="" ;;
  esac
  case "${_ifh_ordinal}" in
    "" | *[!0-9]*) echo "" ;;
    *) echo "${_ifh_ordinal}" ;;
  esac
}

# Block until the HAProxy runtime socket exists, checking once per second.
# A message is logged before the wait starts and again once the path named
# by HAPROXY_SOCKET is present as a socket.
wait_for_socket() {
  log "Waiting for HAProxy socket..."
  while [ ! -S "${HAPROXY_SOCKET}" ]; do
    sleep 1
  done
  log "HAProxy socket ready."
}
while true; do
MASTER_HOST=""

wait_for_sentinel() {
log "Waiting for Sentinel at ${SENTINEL_SVC}:${SENTINEL_PORT}..."
until valkey-cli -h "${SENTINEL_SVC}" \
{{- if .Values.tls.enabled }}
--tls --cacert /tls/{{ .Values.tls.caPublicKey }} \
{{- end }}
-p "${SENTINEL_PORT}" ${AUTH_ARG} PING 2>/dev/null | grep -q PONG; do
sleep 2
# 1. Ask Sentinels who the master is
i=0
while [ "${i}" -lt "${TOTAL_SERVERS}" ]; do
S_HOST="{{ include "valkey.fullname" . }}-${i}.{{ include "valkey.headlessServiceName" . }}.{{ .Release.Namespace }}.svc.{{ .Values.clusterDomain }}"

RESP=$(sentinel_cmd "${S_HOST}" sentinel get-master-addr-by-name "${MASTER_SET}" 2>/dev/null) || {
i=$((i + 1))
continue
}

if [ -n "${RESP}" ]; then
MASTER_HOST=$(echo "${RESP}" | head -n 1)
break
fi
i=$((i + 1))
done
log "Sentinel is ready."
}

wait_for_socket
wait_for_sentinel

LAST_MASTER_IDX=""

while true; do
MASTER_HOST=$(get_master_from_sentinel)

if [ -z "${MASTER_HOST}" ]; then
log "WARN: Could not get master from Sentinel, retrying..."
sleep "${POLL_INTERVAL}"
continue
fi

MASTER_IDX=$(index_from_host "${MASTER_HOST}")

if [ -z "${MASTER_IDX}" ]; then
log "WARN: Could not parse master index from '${MASTER_HOST}', retrying..."
# 2. Check if master actually changed
if [ "${MASTER_HOST}" = "${LAST_MASTER_HOST}" ]; then
sleep "${POLL_INTERVAL}"
continue
fi

if [ "${MASTER_IDX}" = "${LAST_MASTER_IDX}" ]; then
# 3. Find the actual HAProxy backend index for this master
MASTER_IDX=""
j=0
while [ "${j}" -lt "${TOTAL_SERVERS}" ]; do
EXPECTED_HOST="{{ include "valkey.fullname" . }}-${j}.{{ include "valkey.headlessServiceName" . }}.{{ .Release.Namespace }}.svc.{{ .Values.clusterDomain }}"
if [ "${MASTER_HOST}" = "${EXPECTED_HOST}" ]; then
MASTER_IDX="${j}"
break
fi
j=$((j + 1))
done

if [ -z "${MASTER_IDX}" ]; then
log "WARN: Unknown master host '${MASTER_HOST}'"
sleep "${POLL_INTERVAL}"
continue
fi

log "Master changed: index ${LAST_MASTER_IDX:-none} -> ${MASTER_IDX} (host: ${MASTER_HOST})"
log "Master changed: ${LAST_MASTER_HOST:-none} -> ${MASTER_HOST} (HAProxy Backend Index: valkey-${MASTER_IDX})"

# Resolve hostname to IP so HAProxy connects directly (no DNS resolver involved)
MASTER_IP=$(resolve_ip "${MASTER_HOST}")
if [ -z "${MASTER_IP}" ]; then
log "WARN: Could not resolve IP for '${MASTER_HOST}', retrying..."
sleep "${POLL_INTERVAL}"
continue
fi
log "Resolved master IP: ${MASTER_IP}"

# Disable old master servers first (no traffic interruption gap)
i=0
while [ "${i}" -lt "${TOTAL_SERVERS}" ]; do
if [ "${i}" != "${MASTER_IDX}" ]; then
log "Disabling server valkey-${i} in ${BACKEND}"
haproxy_cmd "disable server ${BACKEND}/valkey-${i}" > /dev/null
# 4. Update HAProxy routing safely
j=0
while [ "${j}" -lt "${TOTAL_SERVERS}" ]; do
if [ "${j}" != "${MASTER_IDX}" ]; then
haproxy_cmd "disable server ${BACKEND}/valkey-${j}" > /dev/null || true
fi
i=$((i + 1))
j=$((j + 1))
done

# Point the new master server to its actual Pod IP and enable it
log "Setting server valkey-${MASTER_IDX} addr ${MASTER_IP}:${VALKEY_PORT} and enabling"
haproxy_cmd "set server ${BACKEND}/valkey-${MASTER_IDX} addr ${MASTER_IP} port ${VALKEY_PORT}" > /dev/null
haproxy_cmd "enable server ${BACKEND}/valkey-${MASTER_IDX}" > /dev/null

LAST_MASTER_IDX="${MASTER_IDX}"
log "HAProxy updated: valkey-${MASTER_IDX} (${MASTER_IP}:${VALKEY_PORT}) is now active in ${BACKEND}"
LAST_MASTER_HOST="${MASTER_HOST}"
sleep "${POLL_INTERVAL}"
done
{{- end }}
{{- end }}
25 changes: 18 additions & 7 deletions valkey/templates/haproxy-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,30 @@ spec:
app.kubernetes.io/component: haproxy
template:
metadata:
annotations:
checksum/config: {{ include (print $.Template.BasePath "/haproxy-configmap.yaml") . | sha256sum }}
labels:
{{- include "valkey.selectorLabels" . | nindent 8 }}
{{- if .Values.haproxy.enabled }}
app.kubernetes.io/component: haproxy
{{- end }}
{{- with .Values.commonLabels }}
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.podLabels }}
{{- toYaml . | nindent 8 }}
{{- end }}
annotations:
{{- with .Values.podAnnotations }}
{{- toYaml . | nindent 8 }}
{{- end }}
checksum/config: {{ include (print $.Template.BasePath "/haproxy-configmap.yaml") . | sha256sum }}
spec:
automountServiceAccountToken: {{ .Values.serviceAccount.automount }}
serviceAccountName: {{ include "valkey.serviceAccountName" . }}
{{- (include "valkey.imagePullSecrets" .) | nindent 6 }}
{{- with .Values.haproxy.extraInitContainers }}
initContainers:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- (include "valkey.imagePullSecrets" .) | nindent 6 }}
{{- with .Values.haproxy.extraInitContainers }}
initContainers:
{{- toYaml . | nindent 8 }}
{{- end }}
containers:
- name: haproxy
{{- if .Values.haproxy.image.registry }}
Expand Down
8 changes: 4 additions & 4 deletions valkey/templates/haproxy-service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ metadata:
spec:
type: {{ .Values.haproxy.service.type }}
ports:
- port: {{ .Values.haproxy.service.port }}
targetPort: 6379
- port: {{ .Values.haproxy.service.port | default 6379 }}
targetPort: {{ .Values.haproxy.service.port | default 6379 }}
protocol: TCP
name: valkey-write
- port: 6380
targetPort: 6380
- port: {{ .Values.haproxy.service.readPort | default 6380 }}
targetPort: {{ .Values.haproxy.service.readPort | default 6380 }}
protocol: TCP
name: valkey-read
selector:
Expand Down
Loading