diff --git a/.github/workflows/release-ipfix-collector.yaml b/.github/workflows/release-ipfix-collector.yaml new file mode 100644 index 000000000..9d9b2e5b8 --- /dev/null +++ b/.github/workflows/release-ipfix-collector.yaml @@ -0,0 +1,115 @@ +# +name: Create and publish an IPFIX Collector image + +on: + push: + paths: + - '.github/workflows/release-ipfix-collector.yaml' + - 'Containerfiles/IPFIX-Collector-Containerfile' + branches: + - development + - main + workflow_dispatch: + inputs: + imageTag: + description: 'Set tag for the image' + required: true + default: '1.0.0-alpine' + type: choice + options: + - 1.0.0-alpine + - 1.1.0-alpine + - latest + +# Defines two custom environment variables for the workflow. These are used for the Container registry domain, and a name for the Docker image that this workflow builds. +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }} + DEF_TAG_NAME: 1.0.0-alpine + +# There is a single job in this workflow. It's configured to run on the latest available version of Ubuntu. +jobs: + build-and-push-image: + outputs: + MY_DATE: ${{ steps.mydate.outputs.MY_DATE }} + MY_CONTAINER: ${{ steps.mycontainer.outputs.MY_CONTAINER }} + runs-on: ubuntu-latest + # Sets the permissions granted to the `GITHUB_TOKEN` for the actions in this job. + permissions: + contents: read + packages: write + steps: + - name: Checkout repository + uses: actions/checkout@v4 + # Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here. + - name: Log in to the Container registry + uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1 + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + # ghcr only allows lowercase repository names + - name: lowercase repo name + run: | + echo "IMAGE_NAME=${GITHUB_REPOSITORY,,}" >>${GITHUB_ENV} + # This step uses [docker/metadata-action](https://github.com/docker/metadata-action#about) to extract tags and labels that will be applied to the specified image. The `id` "meta" allows the output of this step to be referenced in a subsequent step. The `images` value provides the base name for the tags and labels. + - name: Extract metadata (tags, labels) for Docker + id: meta + uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + # This step uses the `docker/build-push-action` action to build the image, based on your repository's `Dockerfile`. If the build succeeds, it pushes the image to GitHub Packages. + # It uses the `context` parameter to define the build's context as the set of files located in the specified path. For more information, see "[Usage](https://github.com/docker/build-push-action#usage)" in the README of the `docker/build-push-action` repository. + # It uses the `tags` and `labels` parameters to tag and label the image with the output from the "meta" step. + - name: Dynamically set MY_DATE environment variable + run: echo "MY_DATE=$(date +%s)" >> $GITHUB_ENV + - name: Build and push Docker image + uses: docker/build-push-action@f2a1d5e99d037542a71f64918e516c093c6f3fc4 + with: + context: . + file: Containerfiles/IPFIX-Collector-Containerfile + push: true + tags: | + ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}/ipfix-collector:${{ github.event.inputs.imageTag || env.DEF_TAG_NAME }} + ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}/ipfix-collector:${{ github.event.inputs.imageTag || env.DEF_TAG_NAME }}-${{ env.MY_DATE }} + labels: ${{ steps.meta.outputs.labels }} + - name: Dynamically set MY_CONTAINER output option + id: mycontainer + run: echo "MY_CONTAINER=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}/ipfix-collector:${{ github.event.inputs.imageTag || env.DEF_TAG_NAME }}-${{ env.MY_DATE }}" >> $GITHUB_OUTPUT + - name: Dynamically set MY_DATE output option + id: mydate + run: echo "MY_DATE=${{ env.MY_DATE }}" >> $GITHUB_OUTPUT + + change-original-images: + runs-on: ubuntu-latest + needs: [build-and-push-image] + permissions: + contents: write + pull-requests: write + steps: + - name: Checkout repository + uses: actions/checkout@v4 + - name: Dynamically update the original images file + run: jq '. + ["${{ needs.build-and-push-image.outputs.MY_CONTAINER }}"] | sort' .original-images.json | tee .original-images.json.new + - name: Rewrite original images file + run: mv .original-images.json.new .original-images.json + - name: Create Pull Request + id: cpr + uses: peter-evans/create-pull-request@v7 + with: + commit-message: Update original images with new IPFIX collector container + committer: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> + author: ${{ github.actor }} <${{ github.actor_id }}+${{ github.actor }}@users.noreply.github.com> + signoff: false + branch: ipfix-collector-${{ needs.build-and-push-image.outputs.MY_DATE }} + sign-commits: true + delete-branch: true + title: 'chore: Update original images with IPFIX collector' + body: | + Update container image + - Updated original image file with container ${{needs.build-and-push-image.outputs.MY_CONTAINER}} + change request Auto-generated + labels: | + container images + automated pr + draft: false diff --git a/.original-images.json b/.original-images.json index 78900ae1b..c781d3550 100644 --- a/.original-images.json +++ b/.original-images.json @@ -1,5 +1,8 @@ [ "cr.fluentbit.io/fluent/fluent-bit:4.0.7", + "docker.io/bitnamilegacy/memcached-exporter:0.15.0-debian-12-r3", + "docker.io/bitnamilegacy/memcached:1.6.33-debian-12-r0", + "docker.io/bitnamilegacy/os-shell:12-debian-12-r50", "docker.io/docker:17.07.0", "docker.io/kolla/ubuntu-source-nova-compute-ironic:master", "docker.io/library/postgres:14.5", @@ -13,9 +16,11 @@ "docker.io/openstackhelm/horizon:2023.1-ubuntu_jammy", "docker.io/openstackhelm/ironic:2024.1-ubuntu_jammy", "docker.io/openstackhelm/magnum:2024.1-ubuntu_jammy", + "docker.io/openstackhelm/manila:2024.1-ubuntu_jammy", + "docker.io/openstackhelm/masakari-monitors:2024.1-ubuntu_jammy", "docker.io/openstackhelm/masakari-monitors:2024.1-ubuntu_jammy", "docker.io/openstackhelm/masakari:2024.1-ubuntu_jammy", - "docker.io/openstackhelm/manila:2024.1-ubuntu_jammy", + "docker.io/openstackhelm/masakari:2024.1-ubuntu_jammy", "docker.io/openstackhelm/neutron:2024.1-ubuntu_jammy", "docker.io/openstackhelm/osh-selenium:latest-ubuntu_jammy", "docker.io/openstackhelm/ospurge:latest", @@ -25,6 +30,7 @@ "docker.io/wrouesnel/postgres_exporter:v0.4.6", "docker.io/xrally/xrally-openstack:2.0.0", "gcr.io/google_containers/hyperkube-amd64:v1.11.6", + "ghcr.io/lukerepko/genestack/ipfix-collector:1.0.0-alpine-1773344112", "ghcr.io/rackerlabs/genestack/ceilometer:2024.1-ubuntu_jammy-1738626813", "ghcr.io/rackerlabs/genestack/cinder-volume-rxt:2024.1-ubuntu_jammy-1731085441", "ghcr.io/rackerlabs/genestack/glance:2024.1-ubuntu_jammy-1740121591", @@ -43,15 +49,10 @@ "ghcr.io/rackerlabs/keystone-rxt:2024.1-ubuntu_jammy-1747958291", "ghcr.io/rackerlabs/skyline-rxt:master-ubuntu_jammy-1748595671", "ghcr.io/vexxhost/netoffload:v1.0.1", - "quay.io/airshipit/kubernetes-entrypoint:latest-ubuntu_jammy", - "quay.io/airshipit/porthole-postgresql-utility:latest-ubuntu_bionic", - "quay.io/airshipit/freezer:2025.1-ubuntu_jammy", - "quay.io/airshipit/freezer-api:2025.1-ubuntu_jammy", "quay.io/airshipit/blazar:2025.1-ubuntu_jammy", "quay.io/airshipit/cloudkitty:2024.1-ubuntu_jammy", - "docker.io/openstackhelm/masakari:2024.1-ubuntu_jammy", - "docker.io/openstackhelm/masakari-monitors:2024.1-ubuntu_jammy", - "docker.io/bitnamilegacy/memcached:1.6.33-debian-12-r0", - "docker.io/bitnamilegacy/memcached-exporter:0.15.0-debian-12-r3", - "docker.io/bitnamilegacy/os-shell:12-debian-12-r50" + "quay.io/airshipit/freezer-api:2025.1-ubuntu_jammy", + "quay.io/airshipit/freezer:2025.1-ubuntu_jammy", + "quay.io/airshipit/kubernetes-entrypoint:latest-ubuntu_jammy", + "quay.io/airshipit/porthole-postgresql-utility:latest-ubuntu_bionic" ] diff --git a/Containerfiles/IPFIX-Collector-Containerfile b/Containerfiles/IPFIX-Collector-Containerfile new file mode 100644 index 000000000..fa34ed2bf --- /dev/null +++ b/Containerfiles/IPFIX-Collector-Containerfile @@ -0,0 +1,27 @@ +FROM python:3.13-alpine + +# Install runtime dependencies +RUN apk add --no-cache \ + bash \ + ca-certificates \ + tzdata \ + curl \ + nfdump + +# Install Python dependencies for rollup script +RUN pip install --no-cache-dir \ + requests \ + urllib3 + +# Create flow directory +RUN mkdir -p /var/lib/ipfix /var/lib/ipfix/processed && \ + chmod 755 /var/lib/ipfix /var/lib/ipfix/processed + +# Add non-root user +RUN adduser -D -u 1000 ipfix + +WORKDIR /var/lib/ipfix + +USER ipfix + +ENTRYPOINT ["/bin/bash"] diff --git a/base-helm-configs/clickhouse/clickhouse-helm-overrides.yaml b/base-helm-configs/clickhouse/clickhouse-helm-overrides.yaml new file mode 100644 index 000000000..790159773 --- /dev/null +++ b/base-helm-configs/clickhouse/clickhouse-helm-overrides.yaml @@ -0,0 +1,34 @@ +# Operator goes in the clickhouse namespace and watches that namespace only. +createCRDs: true +watchNamespaces: + - clickhouse + +affinity: + # Schedule the operator only on worker nodes. + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: node-role.kubernetes.io/worker + operator: In + values: + - worker + # Spread away from other operator pods (if HA operator is enabled later). + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 100 + podAffinityTerm: + labelSelector: + matchExpressions: + - key: app.kubernetes.io/name + operator: In + values: ["altinity-clickhouse-operator"] + topologyKey: "kubernetes.io/hostname" + +# Keep it quiet-ish but observable +env: + - name: LOG_LEVEL + value: "info" + +metrics: + enabled: true diff --git a/base-helm-configs/ipfix/ipfix-helm-overrides.yaml b/base-helm-configs/ipfix/ipfix-helm-overrides.yaml new file mode 100644 index 000000000..627e7e1aa --- /dev/null +++ b/base-helm-configs/ipfix/ipfix-helm-overrides.yaml @@ -0,0 +1,43 @@ +ipfix: + namespace: ipfix + + # Schedule collectors only on nodes we expect VIP egress on. + nodeSelector: + openstack-network-node: "enabled" + + # nfcapd + listenPort: 4739 + rotateSeconds: 300 + compress: true + + # Local flow storage on each node (hostPath: /var/lib/ipfix) + # Flows are stored locally on each node, surviving reboots + # No shared storage needed - each node processes its own flows + + # Rollup interval (seconds) + rollupEverySeconds: 300 + + # ClickHouse HTTP endpoint (override here to match your cluster) + clickhouse: + namespace: clickhouse + httpUrl: "http://clickhouse-http.clickhouse.svc.cluster.local:8123" + database: "ipfix" + table: "vip_hourly_node" + # secret name for creds; install script creates if missing + secretName: "ipfix-clickhouse-creds" + + # OVS bridge to export IPFIX from + # br-int = integration bridge where VM traffic flows (OVN/kube-ovn) + # br-ex = external bridge (typically just management traffic) + ovs: + providerBridge: "br-int" + + # Optional: restrict rollup to known VIPs + # (ConfigMap with vips.txt can be added later) + vipListConfigMap: "" + + s3: + enabled: false + rawBucket: "s3://org-ipfix-raw-dev" + prefix: "clusterA/${NODE_NAME}/" + secretName: "s3-credentials" diff --git a/base-kustomize/clickhouse/base/chi-server-base.yaml b/base-kustomize/clickhouse/base/chi-server-base.yaml new file mode 100644 index 000000000..3b6a7fe97 --- /dev/null +++ b/base-kustomize/clickhouse/base/chi-server-base.yaml @@ -0,0 +1,84 @@ +apiVersion: clickhouse.altinity.com/v1 +kind: ClickHouseInstallation +metadata: + name: server + namespace: clickhouse +spec: + taskID: "1" + + configuration: + # No Keeper here for base config (single node / non-replicated) + + users: + # Disable dangerous defaults; create reader/writer explicitly. + default/readonly: 1 + + writer/password_sha256_hex: + valueFrom: + secretKeyRef: + name: clickhouse-db-passwords + key: writer_password_sha256 + writer/profile: default + writer/quota: default + writer/networks/ip: "::/0" + + reader/password_sha256_hex: + valueFrom: + secretKeyRef: + name: clickhouse-db-passwords + key: reader_password_sha256 + reader/profile: readonly + reader/quota: default + reader/networks/ip: "::/0" + + profiles: + readonly/readonly: 1 + + clusters: + - name: c + layout: + shardsCount: 1 + replicasCount: 1 + templates: + podTemplate: ch-pod + volumeClaimTemplate: ch-data + + templates: + podTemplates: + - name: ch-pod + spec: + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: node-role.kubernetes.io/worker + operator: In + values: + - worker + podAntiAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchLabels: + clickhouse.altinity.com/chi: server + topologyKey: "kubernetes.io/hostname" + containers: + - name: clickhouse + # Injected by envsubst in install script from helm-chart-versions.yaml + image: ${CLICKHOUSE_SERVER_IMAGE} + imagePullPolicy: IfNotPresent + ports: + - name: http + containerPort: 8123 + - name: native + containerPort: 9000 + - name: inter + containerPort: 9009 + volumeClaimTemplates: + - name: ch-data + spec: + accessModes: ["ReadWriteOnce"] + storageClassName: general + resources: + requests: + storage: 10Gi diff --git a/base-kustomize/clickhouse/base/kustomization.yaml b/base-kustomize/clickhouse/base/kustomization.yaml new file mode 100644 index 000000000..f18c78729 --- /dev/null +++ b/base-kustomize/clickhouse/base/kustomization.yaml @@ -0,0 +1,6 @@ +--- +sortOptions: + order: fifo +resources: + - chi-server-base.yaml + - svc-clickhouse-http.yaml diff --git a/base-kustomize/clickhouse/base/svc-clickhouse-http.yaml b/base-kustomize/clickhouse/base/svc-clickhouse-http.yaml new file mode 100644 index 000000000..3c3fc60c6 --- /dev/null +++ b/base-kustomize/clickhouse/base/svc-clickhouse-http.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: Service +metadata: + name: clickhouse-http + namespace: clickhouse + labels: + app: clickhouse +spec: + type: ClusterIP + selector: + clickhouse.altinity.com/chi: server + ports: + - name: http + port: 8123 + targetPort: 8123 diff --git a/base-kustomize/clickhouse/ipfix/ch-pdb-s01.yaml b/base-kustomize/clickhouse/ipfix/ch-pdb-s01.yaml new file mode 100644 index 000000000..ebab08240 --- /dev/null +++ b/base-kustomize/clickhouse/ipfix/ch-pdb-s01.yaml @@ -0,0 +1,12 @@ +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: ch-pdb-s01 + namespace: clickhouse +spec: + minAvailable: 1 + selector: + matchLabels: + clickhouse.altinity.com/chi: server + clickhouse.altinity.com/cluster: ipfix + clickhouse.altinity.com/shard: s01 diff --git a/base-kustomize/clickhouse/ipfix/ch-pdb-s02.yaml b/base-kustomize/clickhouse/ipfix/ch-pdb-s02.yaml new file mode 100644 index 000000000..a83bcc0b0 --- /dev/null +++ b/base-kustomize/clickhouse/ipfix/ch-pdb-s02.yaml @@ -0,0 +1,12 @@ +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: ch-pdb-s02 + namespace: clickhouse +spec: + minAvailable: 1 + selector: + matchLabels: + clickhouse.altinity.com/chi: server + clickhouse.altinity.com/cluster: ipfix + clickhouse.altinity.com/shard: s02 diff --git a/base-kustomize/clickhouse/ipfix/ch-pdb-s03.yaml b/base-kustomize/clickhouse/ipfix/ch-pdb-s03.yaml new file mode 100644 index 000000000..0a526f9f4 --- /dev/null +++ b/base-kustomize/clickhouse/ipfix/ch-pdb-s03.yaml @@ -0,0 +1,12 @@ +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: ch-pdb-s03 + namespace: clickhouse +spec: + minAvailable: 1 + selector: + matchLabels: + clickhouse.altinity.com/chi: server + clickhouse.altinity.com/cluster: ipfix + clickhouse.altinity.com/shard: s03 diff --git a/base-kustomize/clickhouse/ipfix/chi-server-ipfix.yaml b/base-kustomize/clickhouse/ipfix/chi-server-ipfix.yaml new file mode 100644 index 000000000..b010d7a4c --- /dev/null +++ b/base-kustomize/clickhouse/ipfix/chi-server-ipfix.yaml @@ -0,0 +1,71 @@ +apiVersion: clickhouse.altinity.com/v1 +kind: ClickHouseInstallation +metadata: + name: server + namespace: clickhouse +spec: + configuration: + # Keeper for replication in this overlay + zookeeper: + nodes: + - host: keeper-keeper + port: 2181 + + # Override cluster layout to 3 shards × 2 replicas + clusters: + - name: ipfix + layout: + shardsCount: 3 + replicasCount: 2 + templates: + podTemplate: ch-pod + volumeClaimTemplate: ch-data + + # NOTE: Schema initialization (database, tables) is handled by the IPFIX + # install script, not here. This keeps ClickHouse generic and lets IPFIX + # own its schema requirements. + + templates: + podTemplates: + - name: ch-pod + spec: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: node-role.kubernetes.io/worker + operator: In + values: + - worker + + # Prefer spreading across nodes cluster-wide + affinity: + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 100 + podAffinityTerm: + labelSelector: + matchLabels: + clickhouse.altinity.com/chi: server + topologyKey: kubernetes.io/hostname + + containers: + - name: clickhouse + image: ${CLICKHOUSE_SERVER_IMAGE} + imagePullPolicy: IfNotPresent + ports: + - name: http + containerPort: 8123 + - name: native + containerPort: 9000 + - name: inter + containerPort: 9009 + + volumeClaimTemplates: + - name: ch-data + spec: + accessModes: ["ReadWriteOnce"] + storageClassName: general + resources: + requests: + storage: 10Gi diff --git a/base-kustomize/clickhouse/ipfix/chk-keeper.yaml b/base-kustomize/clickhouse/ipfix/chk-keeper.yaml new file mode 100644 index 000000000..693ff76bf --- /dev/null +++ b/base-kustomize/clickhouse/ipfix/chk-keeper.yaml @@ -0,0 +1,47 @@ +apiVersion: clickhouse-keeper.altinity.com/v1 +kind: ClickHouseKeeperInstallation +metadata: + name: keeper + namespace: clickhouse +spec: + configuration: + clusters: + - name: c + layout: + replicasCount: 1 + templates: + podTemplates: + - name: keeper-pod + spec: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: node-role.kubernetes.io/worker + operator: In + values: + - worker + affinity: + podAntiAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchLabels: + clickhouse-keeper.altinity.com/chi: keeper + topologyKey: "kubernetes.io/hostname" + containers: + - name: clickhouse-keeper + # Injected by envsubst in install script + image: ${CLICKHOUSE_KEEPER_IMAGE} + imagePullPolicy: IfNotPresent + volumeClaimTemplates: + - name: keeper-data + spec: + accessModes: ["ReadWriteOnce"] + storageClassName: general + resources: + requests: + storage: 10Gi + defaults: + templates: + podTemplate: keeper-pod + volumeClaimTemplate: keeper-data diff --git a/base-kustomize/clickhouse/ipfix/keeper-pdb.yaml b/base-kustomize/clickhouse/ipfix/keeper-pdb.yaml new file mode 100644 index 000000000..a5196b579 --- /dev/null +++ b/base-kustomize/clickhouse/ipfix/keeper-pdb.yaml @@ -0,0 +1,10 @@ +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: keeper-pdb + namespace: clickhouse +spec: + minAvailable: 2 + selector: + matchLabels: + clickhouse-keeper.altinity.com/chi: keeper diff --git a/base-kustomize/clickhouse/ipfix/kustomization.yaml b/base-kustomize/clickhouse/ipfix/kustomization.yaml new file mode 100644 index 000000000..32a6f213f --- /dev/null +++ b/base-kustomize/clickhouse/ipfix/kustomization.yaml @@ -0,0 +1,15 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +namespace: clickhouse + +resources: + - ../base + - chk-keeper.yaml + - keeper-pdb.yaml + - ch-pdb-s01.yaml + - ch-pdb-s02.yaml + - ch-pdb-s03.yaml + +patchesStrategicMerge: + - chi-server-ipfix.yaml diff --git a/base-kustomize/ipfix/base/kustomization.yaml b/base-kustomize/ipfix/base/kustomization.yaml new file mode 100644 index 000000000..c53866a7e --- /dev/null +++ b/base-kustomize/ipfix/base/kustomization.yaml @@ -0,0 +1,5 @@ +--- +sortOptions: + order: fifo +resources: + - all.yaml diff --git a/base-kustomize/ipfix/overlay/kustomization.yaml b/base-kustomize/ipfix/overlay/kustomization.yaml new file mode 100644 index 000000000..3a5aaf3b5 --- /dev/null +++ b/base-kustomize/ipfix/overlay/kustomization.yaml @@ -0,0 +1,9 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +# NOTE: Do NOT set namespace here - Helm templates already specify namespace +# and Helm's --create-namespace handles namespace creation. +# Setting namespace here causes ownership conflicts with Helm. + +resources: + - ../base diff --git a/bin/install-clickhouse.sh b/bin/install-clickhouse.sh new file mode 100755 index 000000000..929d69b9d --- /dev/null +++ b/bin/install-clickhouse.sh @@ -0,0 +1,198 @@ +#!/usr/bin/env bash +# Description: Fetches the version for SERVICE_NAME from the specified +# YAML file and executes a helm upgrade/install command with dynamic values files. + +# Disable SC2124 (unused array), SC2145 (array expansion issue), SC2294 (eval) +# SC2016 (intentionallay not expanding) +# shellcheck disable=SC2124,SC2145,SC2294,SC2016 +set -euo pipefail + +# Service +SERVICE_NAME="clickhouse" +SERVICE_NAMESPACE="clickhouse" + +# Helm +HELM_REPO_NAME="altinity" +HELM_REPO_URL="https://helm.altinity.com" + +# Base directories provided by the environment +GENESTACK_BASE_DIR="${GENESTACK_BASE_DIR:-/opt/genestack}" +GENESTACK_OVERRIDES_DIR="${GENESTACK_OVERRIDES_DIR:-/etc/genestack}" + +# Define service-specific override directories based on the framework +SERVICE_BASE_OVERRIDES="${GENESTACK_BASE_DIR}/base-helm-configs/${SERVICE_NAME}" +SERVICE_CUSTOM_OVERRIDES="${GENESTACK_OVERRIDES_DIR}/helm-configs/${SERVICE_NAME}" +GLOBAL_OVERRIDES_DIR="${GENESTACK_OVERRIDES_DIR}/helm-configs/global_overrides" + +# Read the desired chart version from VERSION_FILE +VERSION_FILE="${GENESTACK_OVERRIDES_DIR}/helm-chart-versions.yaml" +KUSTOMIZE_DIR="${GENESTACK_OVERRIDES_DIR}/kustomize/clickhouse/overlay" +OP_RELEASE="altinity-operator" + +need() { command -v "$1" >/dev/null || { echo "Missing required command: $1" >&2; exit 1; }; } +need helm +need kubectl +need awk +need sha256sum +need openssl +need envsubst + +echo "==> Ensuring namespace '${SERVICE_NAMESPACE}' exists" +kubectl get ns "${SERVICE_NAMESPACE}" >/dev/null 2>&1 || kubectl create ns "${SERVICE_NAMESPACE}" + +# --- Create/reuse DB password secret --- +echo "==> Ensuring DB password secret exists in namespace '${SERVICE_NAMESPACE}'" +if ! kubectl -n "${SERVICE_NAMESPACE}" get secret clickhouse-db-passwords >/dev/null 2>&1; then + WRITER_PLAIN="$(openssl rand -hex 16)" + READER_PLAIN="$(openssl rand -hex 16)" + WRITER_SHA256="$(printf "%s" "${WRITER_PLAIN}" | sha256sum | awk "{print \$1}")" + READER_SHA256="$(printf "%s" "${READER_PLAIN}" | sha256sum | awk "{print \$1}")" + kubectl -n "${SERVICE_NAMESPACE}" apply -f - <&2 + exit 1 +fi + +# Prepare an array to collect -f arguments +overrides_args=() + +# Include all YAML files from the BASE configuration directory +if [[ -d "$SERVICE_BASE_OVERRIDES" ]]; then + echo "Including base overrides from directory: $SERVICE_BASE_OVERRIDES" + for file in "$SERVICE_BASE_OVERRIDES"/*.yaml; do + # Check that there is at least one match + if [[ -e "$file" ]]; then + echo " - $file" + overrides_args+=("-f" "$file") + fi + done +else + echo "Warning: Base override directory not found: $SERVICE_BASE_OVERRIDES" +fi + +# Include all YAML files from the GLOBAL configuration directory +if [[ -d "$GLOBAL_OVERRIDES_DIR" ]]; then + echo "Including overrides from global config directory:" + for file in "$GLOBAL_OVERRIDES_DIR"/*.yaml; do + if [[ -e "$file" ]]; then + echo " - $file" + overrides_args+=("-f" "$file") + fi + done +else + echo "Warning: Global config directory not found: $GLOBAL_OVERRIDES_DIR" +fi + +# Include all YAML files from the custom SERVICE configuration directory +if [[ -d "$SERVICE_CUSTOM_OVERRIDES" ]]; then + echo "Including overrides from service config directory:" + for file in "$SERVICE_CUSTOM_OVERRIDES"/*.yaml; do + if [[ -e "$file" ]]; then + echo " - $file" + overrides_args+=("-f" "$file") + fi + done +else + echo "Warning: Service config directory not found: $SERVICE_CUSTOM_OVERRIDES" +fi + +echo + +echo "==> Helm repo add/update for operator chart: ${OP_CHART} @ ${OP_VERSION}" +helm repo add "$HELM_REPO_NAME" "$HELM_REPO_URL" +helm repo update + +# Collect all --set arguments, executing commands and quoting safely +set_args=() + +helm_command=( + helm upgrade --install "$OP_RELEASE" "$OP_CHART" + --version "$OP_VERSION" + --namespace="$SERVICE_NAMESPACE" + --timeout 120m + + "${overrides_args[@]}" + "${set_args[@]}" + + "$@" +) + +echo "==> Executing Helm command (arguments are quoted safely):" +printf '%q ' "${helm_command[@]}" +echo + +# Execute the command directly from the array +"${helm_command[@]}" + +echo "==> Waiting for operator to be ready" +kubectl -n "${SERVICE_NAMESPACE}" rollout status deploy/altinity-operator-altinity-clickhouse-operator --timeout=300s + +# --- Apply Kustomize with envsubsted images from versions file --- +export CLICKHOUSE_SERVER_IMAGE="${CH_SERVER_IMAGE}" +export CLICKHOUSE_KEEPER_IMAGE="${CH_KEEPER_IMAGE}" + +echo "==> Applying ClickHouse Keeper + Cluster (kustomize + envsubst)" +# We envsubst only image placeholders present in manifests. +kubectl kustomize "${KUSTOMIZE_DIR}" | envsubst '${CLICKHOUSE_SERVER_IMAGE} ${CLICKHOUSE_KEEPER_IMAGE}' | kubectl apply -n "${SERVICE_NAMESPACE}" -f - + +echo "==> Service endpoint (HTTP 8123)" +kubectl -n "${SERVICE_NAMESPACE}" get svc clickhouse-http -o wide + +# Print connection hints using stored plaintext (if present) +WRITER_PLAIN="$(kubectl -n "${SERVICE_NAMESPACE}" get secret clickhouse-db-passwords -o jsonpath='{.data.writer_password_plain}' 2>/dev/null | base64 -d || true)" +READER_PLAIN="$(kubectl -n "${SERVICE_NAMESPACE}" get secret clickhouse-db-passwords -o jsonpath='{.data.reader_password_plain}' 2>/dev/null | base64 -d || true)" + +# Print out the in-cluster endpoint, and various service info +cat </dev/null || { echo "Missing required command: $1" >&2; exit 1; }; } +need helm +need kubectl +need openssl +need yq + +# --- Ensure namespace exists with Helm ownership labels --- +# Create namespace with Helm labels so Helm can adopt it during install +echo "==> Ensuring namespace '${SERVICE_NAMESPACE}' exists with Helm ownership" +if ! kubectl get ns "${SERVICE_NAMESPACE}" >/dev/null 2>&1; then + kubectl create ns "${SERVICE_NAMESPACE}" +fi +# Add Helm ownership labels/annotations so Helm can manage it +kubectl label ns "${SERVICE_NAMESPACE}" app.kubernetes.io/managed-by=Helm --overwrite +kubectl annotate ns "${SERVICE_NAMESPACE}" meta.helm.sh/release-name="${SERVICE_NAME}" --overwrite +kubectl annotate ns "${SERVICE_NAMESPACE}" meta.helm.sh/release-namespace="${SERVICE_NAMESPACE}" --overwrite + +# --- Read image versions from YAML --- +if [ ! -f "$VERSION_FILE" ]; then + echo "Error: helm-chart-versions.yaml not found at $VERSION_FILE" >&2 + exit 1 +fi + +# Extract IPFIX image versions +IPFIX_COLLECTOR_IMAGE=$(yq eval '.charts.ipfix.images.collector // ""' "$VERSION_FILE") +IPFIX_S3_IMAGE=$(yq eval '.charts.ipfix.images.s3 // "docker.io/amazon/aws-cli:latest"' "$VERSION_FILE") +IPFIX_OVS_EXPORTER_IMAGE=$(yq eval '.charts.ipfix.images.ovsExporter // "docker.io/openvswitch/ovs:3.4.1"' "$VERSION_FILE") + +if [ -z "$IPFIX_COLLECTOR_IMAGE" ]; then + echo "Error: Could not extract IPFIX collector image from $VERSION_FILE" >&2 + echo "Please ensure charts.ipfix.images.collector is defined" >&2 + exit 1 +fi + +echo "Found IPFIX collector image: $IPFIX_COLLECTOR_IMAGE" +echo "Found S3 image: $IPFIX_S3_IMAGE" +echo "Found OVS exporter image: $IPFIX_OVS_EXPORTER_IMAGE" + +# --- Copy ClickHouse credentials to ipfix namespace --- +echo "==> Setting up ClickHouse credentials in namespace '${SERVICE_NAMESPACE}'" +if ! kubectl -n clickhouse get secret clickhouse-db-passwords >/dev/null 2>&1; then + echo "Error: clickhouse-db-passwords secret not found in clickhouse namespace" >&2 + echo "Please install ClickHouse first: ./bin/install-clickhouse.sh" >&2 + exit 1 +fi + +WRITER_PASS="$(kubectl -n clickhouse get secret clickhouse-db-passwords -o jsonpath='{.data.writer_password_plain}' | base64 -d)" +kubectl -n "${SERVICE_NAMESPACE}" create secret generic ipfix-clickhouse-creds \ + --from-literal=password="${WRITER_PASS}" \ + --dry-run=client -o yaml | kubectl apply -f - +echo " Synced clickhouse credentials to ${SERVICE_NAMESPACE}/ipfix-clickhouse-creds" + +# Prepare an array to collect -f arguments +overrides_args=() + +# Include all YAML files from the BASE configuration directory +if [[ -d "$SERVICE_BASE_OVERRIDES" ]]; then + echo "Including base overrides from directory: $SERVICE_BASE_OVERRIDES" + for file in "$SERVICE_BASE_OVERRIDES"/*.yaml; do + if [[ -e "$file" ]]; then + echo " - $file" + overrides_args+=("-f" "$file") + fi + done +else + echo "Warning: Base override directory not found: $SERVICE_BASE_OVERRIDES" +fi + +# Include all YAML files from the GLOBAL configuration directory +if [[ -d "$GLOBAL_OVERRIDES_DIR" ]]; then + echo "Including overrides from global config directory:" + for file in "$GLOBAL_OVERRIDES_DIR"/*.yaml; do + if [[ -e "$file" ]]; then + echo " - $file" + overrides_args+=("-f" "$file") + fi + done +else + echo "Warning: Global config directory not found: $GLOBAL_OVERRIDES_DIR" +fi + +# Include all YAML files from the custom SERVICE configuration directory +if [[ -d "$SERVICE_CUSTOM_OVERRIDES" ]]; then + echo "Including overrides from service config directory:" + for file in "$SERVICE_CUSTOM_OVERRIDES"/*.yaml; do + if [[ -e "$file" ]]; then + echo " - $file" + overrides_args+=("-f" "$file") + fi + done +else + echo "Warning: Service config directory not found: $SERVICE_CUSTOM_OVERRIDES" +fi + +echo + +# Collect all --set arguments for image overrides +set_args=( + --set "ipfix.images.collector=${IPFIX_COLLECTOR_IMAGE}" + --set "ipfix.images.s3=${IPFIX_S3_IMAGE}" + --set "ipfix.images.ovsExporter=${IPFIX_OVS_EXPORTER_IMAGE}" +) + +helm_command=( + helm upgrade --install "$SERVICE_NAME" "$HELM_CHART_PATH" + --namespace="$SERVICE_NAMESPACE" + --timeout 120m + --create-namespace + + "${overrides_args[@]}" + "${set_args[@]}" + + # Post-renderer configuration + --post-renderer "$GENESTACK_OVERRIDES_DIR/kustomize/kustomize.sh" + --post-renderer-args "$SERVICE_NAME/overlay" + + "$@" +) + +echo "==> Executing Helm command (arguments are quoted safely):" +printf '%q ' "${helm_command[@]}" +echo + +# Execute the command directly from the array +"${helm_command[@]}" + +echo "==> Waiting for IPFIX collector DaemonSet to be ready" +kubectl -n "${SERVICE_NAMESPACE}" rollout status daemonset/ipfix-collector --timeout=300s || true + +echo +cat <:4739 + +EOF diff --git a/helm-chart-versions.yaml b/helm-chart-versions.yaml index 5326454e4..58c431ae0 100755 --- a/helm-chart-versions.yaml +++ b/helm-chart-versions.yaml @@ -7,6 +7,9 @@ charts: ceilometer: 2024.2.115+13651f45-628a320c cert-manager: v1.19.2 cinder: 2024.2.409+13651f45-628a320c + clickhouse-operator: 0.25.5 + clickhouse-server: 24.8.14.10544.altinitystable + clickhouse-keeper: 24.8.14.10544.altinitystable-alpine cloudkitty: 2025.1.2+ebb1488dc designate: 2025.2.6+f3906fe15 envoyproxy-gateway: v1.7.0 @@ -17,6 +20,11 @@ charts: grafana: 10.1.0 heat: 2024.2.294+13651f45-628a320c horizon: 2024.2.264+13651f45-628a320c + ipfix: + images: + collector: ghcr.io/lukerepko/genestack/ipfix-collector:1.0.0-alpine + s3: docker.io/amazon/aws-cli:latest + ovsExporter: ghcr.io/rackerlabs/genestack-images/ovs:v3.5.1-latest ironic: 2024.2.121+13651f45-628a320c keystone: 2024.2.386+13651f45-628a320c kubernetes-event-exporter: 3.6.3 diff --git a/helm-charts/ipfix/.helmignore b/helm-charts/ipfix/.helmignore new file mode 100644 index 000000000..29b560c93 --- /dev/null +++ b/helm-charts/ipfix/.helmignore @@ -0,0 +1,7 @@ +# Patterns to ignore when building packages. +*.swp +*.bak +*.tmp +*.orig +*~ +.DS_Store diff --git a/helm-charts/ipfix/Chart.yaml b/helm-charts/ipfix/Chart.yaml new file mode 100644 index 000000000..1bd97d6dc --- /dev/null +++ b/helm-charts/ipfix/Chart.yaml @@ -0,0 +1,6 @@ +apiVersion: v2 +name: ipfix +description: IPFIX flow collector for bandwidth telemetry +type: application +version: 0.1.0 +appVersion: "1.0" diff --git a/helm-charts/ipfix/README.md b/helm-charts/ipfix/README.md new file mode 100644 index 000000000..f6f549cfd --- /dev/null +++ b/helm-charts/ipfix/README.md @@ -0,0 +1,238 @@ +# IPFIX Collector Helm Chart + +IPFIX flow collection and aggregation for bandwidth telemetry. Collects flows from OVS bridges and aggregates them into ClickHouse for billing and analytics. + +## Components + +| Component | Type | Description | +|-----------|------|-------------| +| `ipfix-collector` | DaemonSet | Receives IPFIX flows (nfcapd) and rolls up to ClickHouse | +| `ipfix-ovs-exporter` | DaemonSet | Configures OVS bridges to export IPFIX flows | +| `ipfix-clickhouse-schema-init` | Job | Initializes ClickHouse database and tables (runs on install/upgrade) | + +## Prerequisites + +- ClickHouse must be installed and running +- The `clickhouse-db-passwords` secret must exist in the `clickhouse` namespace +- Nodes must have the `openstack-network-node: enabled` label + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Node (per node) │ +├─────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌─────────────┐ UDP 4739 ┌─────────────────────────┐ │ +│ │ OVS │ ─────────────────► │ ipfix-collector pod │ │ +│ │ br-ex │ │ ┌─────────┐ ┌────────┐ │ │ +│ │ │ │ │ nfcapd │ │ rollup │ │ │ +│ └─────────────┘ │ └────┬────┘ └───┬────┘ │ │ +│ ▲ │ │ │ │ │ +│ │ │ ▼ │ │ │ +│ ┌─────┴───────┐ │ /var/lib/ipfix │ │ │ +│ │ ovs-exporter│ │ │ │ │ │ +│ │ pod │ └───────┼──────────┼──────┘ │ +│ └─────────────┘ │ │ │ +│ │ ▼ │ +└─────────────────────────────────────────────┼───────────────────┘ + │ ClickHouse + │ (cluster) + └────────► +``` + +## Troubleshooting + +### Kubernetes Level + +```bash +# Check pod status +kubectl -n ipfix get pods -o wide + +# Check collector logs +kubectl -n ipfix logs -l app=ipfix-collector -c nfcapd --tail=100 +kubectl -n ipfix logs -l app=ipfix-collector -c rollup --tail=100 -f + +# Check OVS exporter logs +kubectl -n ipfix logs -l app=ipfix-ovs-exporter --tail=100 + +# Describe pods for events +kubectl -n ipfix describe pod -l app=ipfix-collector +kubectl -n ipfix describe pod -l app=ipfix-ovs-exporter +``` + +### Host Level - OVS Exporter + +SSH to a node with `openstack-network-node=enabled` label. + +```bash +# Verify OVS bridge exists +ovs-vsctl list-br + +# Check IPFIX configuration on bridge +ovs-vsctl list IPFIX + +# Expected output shows: +# targets: ["127.0.0.1:4739"] +# obs_domain_id: 1001 +# obs_point_id: +# cache_active_timeout: 60 +# cache_max_flows: 4096 + +# Check which bridge has IPFIX configured +ovs-vsctl get Bridge br-ex ipfix + +# View IPFIX statistics (if available) +ovs-ofctl dump-ipfix-bridge br-ex +ovs-ofctl dump-ipfix-flow br-ex + +# Manually clear IPFIX config (if needed for debugging) +# WARNING: This stops flow export until pod reconfigures +ovs-vsctl clear Bridge br-ex ipfix + +# Check OVS logs for IPFIX errors +journalctl -u ovs-vswitchd --since "1 hour ago" | grep -i ipfix +``` + +### Host Level - Collector + +SSH to a node with `openstack-network-node=enabled` label. + +```bash +# Check flow files directory +ls -la /var/lib/ipfix/ + +# Expected files: +# nfcapd.current. - Currently being written (active) +# nfcapd.YYYYMMDDHHmm - Rotated files (ready for processing) + +# Check processed files +ls -la /var/lib/ipfix/processed/ + +# View flow file contents (requires nfdump) +nfdump -r /var/lib/ipfix/nfcapd.202602251430 | head -20 + +# Check disk usage +du -sh /var/lib/ipfix/ +du -sh /var/lib/ipfix/processed/ + +# Verify nfcapd is listening +ss -ulnp | grep 4739 + +# Check for UDP packet drops (kernel level) +cat /proc/net/udp | head -5 +netstat -su | grep -A5 "Udp:" + +# Check socket buffer settings +sysctl net.core.rmem_max +sysctl net.core.rmem_default +``` + +### Verifying Flow Export + +```bash +# On the node, capture IPFIX traffic to verify flows are being sent +tcpdump -i lo -n udp port 4739 -c 10 + +# Expected: UDP packets from 127.0.0.1 to 127.0.0.1:4739 + +# If no packets, check OVS IPFIX config and bridge traffic +ovs-vsctl list IPFIX +ovs-ofctl dump-flows br-ex | head -10 +``` + +### ClickHouse Schema Issues + +If the `ipfix` database or tables don't exist: + +```bash +# Check if schema init job ran successfully +kubectl -n ipfix get jobs +kubectl -n ipfix logs job/ipfix-clickhouse-schema-init + +# Re-run schema init by deleting and re-installing +kubectl -n ipfix delete job ipfix-clickhouse-schema-init +# Then re-run: bin/install-ipfix.sh + +# Or manually verify tables exist +kubectl -n clickhouse exec -it chi-server-ipfix-0-0-0 -- \ + clickhouse-client --query "SHOW TABLES FROM ipfix" +``` + +### ClickHouse Verification + +```bash +# Port-forward to ClickHouse +kubectl -n clickhouse port-forward svc/clickhouse-http 8123:8123 & + +# Get credentials +READER_PASS=$(kubectl -n clickhouse get secret clickhouse-db-passwords \ + -o jsonpath='{.data.reader_password_plain}' | base64 -d) + +# Check if data is arriving +curl -s "http://localhost:8123/?user=reader&password=${READER_PASS}" \ + --data "SELECT count() FROM ipfix.vip_hourly_node" + +# Check recent data by node +curl -s "http://localhost:8123/?user=reader&password=${READER_PASS}" \ + --data "SELECT node, count(), sum(bytes) FROM ipfix.vip_hourly_node + WHERE hour_ts > now() - INTERVAL 1 HOUR + GROUP BY node ORDER BY sum(bytes) DESC" + +# Check data freshness +curl -s "http://localhost:8123/?user=reader&password=${READER_PASS}" \ + --data "SELECT node, max(hour_ts) as latest FROM ipfix.vip_hourly_node GROUP BY node" +``` + +## Common Issues + +### No flow files appearing in /var/lib/ipfix/ + +1. Check OVS IPFIX is configured: `ovs-vsctl list IPFIX` +2. Check nfcapd is running: `ss -ulnp | grep 4739` +3. Check there's actual traffic on the bridge: `ovs-ofctl dump-flows br-ex` +4. Verify OVS exporter pod is running: `kubectl -n ipfix get pods -l app=ipfix-ovs-exporter` + +### Flow files not being processed (piling up) + +1. Check rollup container logs: `kubectl -n ipfix logs -l app=ipfix-collector -c rollup` +2. Verify ClickHouse connectivity from the node +3. Check ClickHouse credentials secret exists: `kubectl -n ipfix get secret ipfix-clickhouse-creds` +4. Files younger than 60 seconds are skipped (still being written) + +### OVS exporter fails to configure + +1. Check bridge exists: `ovs-vsctl br-exists br-ex` +2. Check pod has privileged access +3. Check `/var/run/openvswitch` is mounted +4. Review pod logs for specific error + +### High memory usage in rollup container + +1. Large flow files can consume memory during parsing +2. Consider reducing `rotateSeconds` to create smaller files +3. Check for backlog of unprocessed files + +## File Locations + +| Path | Description | +|------|-------------| +| `/var/lib/ipfix/` | Flow files directory (hostPath) | +| `/var/lib/ipfix/nfcapd.current.*` | Active file being written | +| `/var/lib/ipfix/nfcapd.YYYYMMDDHHmm` | Rotated files awaiting processing | +| `/var/lib/ipfix/processed/` | Successfully uploaded files | +| `/var/run/openvswitch/` | OVS socket directory | +| `/scripts/` | Mounted ConfigMap scripts | + +## Configuration Reference + +See `values.yaml` for all configuration options. Key settings: + +| Setting | Default | Description | +|---------|---------|-------------| +| `ipfix.listenPort` | 4739 | UDP port for IPFIX collection | +| `ipfix.rotateSeconds` | 300 | Flow file rotation interval | +| `ipfix.rollupEverySeconds` | 300 | How often to process and upload | +| `ipfix.ovs.providerBridge` | br-ex | OVS bridge to export from | +| `ipfix.ovs.cacheActiveTimeout` | 60 | Export active flows every N seconds | +| `ipfix.ovs.samplingRate` | 1 | 1 = all packets, N = 1 in N | diff --git a/helm-charts/ipfix/templates/_helpers.tpl b/helm-charts/ipfix/templates/_helpers.tpl new file mode 100644 index 000000000..de687417b --- /dev/null +++ b/helm-charts/ipfix/templates/_helpers.tpl @@ -0,0 +1,49 @@ +{{/* +Expand the name of the chart. +*/}} +{{- define "ipfix.name" -}} +{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }} +{{- end }} + +{{/* +Create a default fully qualified app name. +*/}} +{{- define "ipfix.fullname" -}} +{{- if .Values.fullnameOverride }} +{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }} +{{- else }} +{{- $name := default .Chart.Name .Values.nameOverride }} +{{- if contains $name .Release.Name }} +{{- .Release.Name | trunc 63 | trimSuffix "-" }} +{{- else }} +{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }} +{{- end }} +{{- end }} +{{- end }} + +{{/* +Create chart name and version as used by the chart label. +*/}} +{{- define "ipfix.chart" -}} +{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }} +{{- end }} + +{{/* +Common labels +*/}} +{{- define "ipfix.labels" -}} +helm.sh/chart: {{ include "ipfix.chart" . }} +{{ include "ipfix.selectorLabels" . }} +{{- if .Chart.AppVersion }} +app.kubernetes.io/version: {{ .Chart.AppVersion | quote }} +{{- end }} +app.kubernetes.io/managed-by: {{ .Release.Service }} +{{- end }} + +{{/* +Selector labels +*/}} +{{- define "ipfix.selectorLabels" -}} +app.kubernetes.io/name: {{ include "ipfix.name" . }} +app.kubernetes.io/instance: {{ .Release.Name }} +{{- end }} diff --git a/helm-charts/ipfix/templates/clickhouse-schema-job.yaml b/helm-charts/ipfix/templates/clickhouse-schema-job.yaml new file mode 100644 index 000000000..8d9bd8fb1 --- /dev/null +++ b/helm-charts/ipfix/templates/clickhouse-schema-job.yaml @@ -0,0 +1,121 @@ +{{- if .Values.ipfix.clickhouse.initSchema.enabled }} +apiVersion: batch/v1 +kind: Job +metadata: + name: ipfix-clickhouse-schema-init + namespace: {{ .Values.ipfix.namespace }} + labels: + app: ipfix-schema-init + annotations: + # Helm hooks to run after install/upgrade + "helm.sh/hook": post-install,post-upgrade + "helm.sh/hook-weight": "-5" + "helm.sh/hook-delete-policy": before-hook-creation +spec: + ttlSecondsAfterFinished: 3600 + backoffLimit: 5 + template: + metadata: + labels: + app: ipfix-schema-init + spec: + restartPolicy: OnFailure + containers: + - name: init-schema + image: docker.io/curlimages/curl:8.5.0 + command: + - /bin/sh + - -c + - | + set -e + echo "=== IPFIX ClickHouse Schema Initialization ===" + echo "ClickHouse URL: {{ .Values.ipfix.clickhouse.httpUrl }}" + echo "Cluster name: {{ .Values.ipfix.clickhouse.clusterName }}" + + CH_URL="{{ .Values.ipfix.clickhouse.httpUrl }}" + CH_USER="{{ .Values.ipfix.clickhouse.initSchema.user }}" + + echo "Waiting for ClickHouse to be ready..." + until curl -sf "${CH_URL}/?user=${CH_USER}&password=${CLICKHOUSE_PASSWORD}" -d "SELECT 1" >/dev/null 2>&1; do + echo "ClickHouse not ready, waiting 5s..." + sleep 5 + done + echo "ClickHouse is ready!" + + run_query() { + echo "Running: $1" + HTTP_CODE=$(curl -s -w "%{http_code}" -o /tmp/response.txt "${CH_URL}/?user=${CH_USER}&password=${CLICKHOUSE_PASSWORD}" -d "$1") + RESULT=$(cat /tmp/response.txt) + if [ "$HTTP_CODE" != "200" ]; then + echo "Query failed (HTTP $HTTP_CODE):" + echo "$RESULT" + return 1 + fi + [ -n "$RESULT" ] && echo "$RESULT" + return 0 + } + + echo "" + echo "Creating database..." + run_query "CREATE DATABASE IF NOT EXISTS {{ .Values.ipfix.clickhouse.database }} ON CLUSTER '{{ .Values.ipfix.clickhouse.clusterName }}'" + + echo "" + echo "Creating vip_hourly_node table..." + run_query "CREATE TABLE IF NOT EXISTS {{ .Values.ipfix.clickhouse.database }}.{{ .Values.ipfix.clickhouse.table }} ON CLUSTER '{{ .Values.ipfix.clickhouse.clusterName }}' (hour_ts DateTime, vip LowCardinality(String), dir LowCardinality(String), bytes UInt64, packets UInt64, node LowCardinality(String)) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/{{ .Values.ipfix.clickhouse.table }}','{replica}') PARTITION BY toYYYYMMDD(hour_ts) ORDER BY (hour_ts, vip, dir, node)" + + echo "" + echo "Creating vip_hourly_mv table..." + run_query "CREATE TABLE IF NOT EXISTS {{ .Values.ipfix.clickhouse.database }}.vip_hourly_mv ON CLUSTER '{{ .Values.ipfix.clickhouse.clusterName }}' (hour_ts DateTime, vip LowCardinality(String), dir LowCardinality(String), bytes UInt64, packets UInt64) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/vip_hourly_mv','{replica}') PARTITION BY toYYYYMMDD(hour_ts) ORDER BY (hour_ts, vip, dir)" + + echo "" + echo "Creating vip_hourly_mv materialized view..." + run_query "CREATE MATERIALIZED VIEW IF NOT EXISTS {{ .Values.ipfix.clickhouse.database }}.vip_hourly_mv__mv ON CLUSTER '{{ .Values.ipfix.clickhouse.clusterName }}' TO {{ .Values.ipfix.clickhouse.database }}.vip_hourly_mv AS SELECT hour_ts, vip, dir, sum(bytes) AS bytes, sum(packets) AS packets FROM {{ .Values.ipfix.clickhouse.database }}.{{ .Values.ipfix.clickhouse.table }} GROUP BY hour_ts, vip, dir" + + echo "" + echo "Creating vip_hourly_node_dist distributed table..." + run_query "CREATE TABLE IF NOT EXISTS {{ .Values.ipfix.clickhouse.database }}.vip_hourly_node_dist ON CLUSTER '{{ .Values.ipfix.clickhouse.clusterName }}' AS {{ .Values.ipfix.clickhouse.database }}.{{ .Values.ipfix.clickhouse.table }} ENGINE = Distributed('{{ .Values.ipfix.clickhouse.clusterName }}', '{{ .Values.ipfix.clickhouse.database }}', '{{ .Values.ipfix.clickhouse.table }}', rand())" + + echo "" + echo "Creating vip_hourly_mv_dist distributed table..." + run_query "CREATE TABLE IF NOT EXISTS {{ .Values.ipfix.clickhouse.database }}.vip_hourly_mv_dist ON CLUSTER '{{ .Values.ipfix.clickhouse.clusterName }}' AS {{ .Values.ipfix.clickhouse.database }}.vip_hourly_mv ENGINE = Distributed('{{ .Values.ipfix.clickhouse.clusterName }}', '{{ .Values.ipfix.clickhouse.database }}', 'vip_hourly_mv', rand())" + + echo "" + echo "Creating flows_local table..." + run_query "CREATE TABLE IF NOT EXISTS {{ .Values.ipfix.clickhouse.database }}.flows_local ON CLUSTER '{{ .Values.ipfix.clickhouse.clusterName }}' (flow_start DateTime64(3), flow_end DateTime64(3), fip IPv6, src_ip IPv6, dst_ip IPv6, src_port UInt16, dst_port UInt16, proto UInt8, bytes UInt64, packets UInt64, exporter_id LowCardinality(String)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/flows_local','{replica}') PARTITION BY toDate(flow_start) ORDER BY (fip, flow_start, src_ip, dst_ip, src_port, dst_port, proto) SETTINGS index_granularity = 8192" + + echo "" + echo "Creating flows_dist distributed table..." + run_query "CREATE TABLE IF NOT EXISTS {{ .Values.ipfix.clickhouse.database }}.flows_dist ON CLUSTER '{{ .Values.ipfix.clickhouse.clusterName }}' AS {{ .Values.ipfix.clickhouse.database }}.flows_local ENGINE = Distributed('{{ .Values.ipfix.clickhouse.clusterName }}', '{{ .Values.ipfix.clickhouse.database }}', 'flows_local', cityHash64(fip))" + + echo "" + echo "Creating flows_by_fip_1h table..." + run_query "CREATE TABLE IF NOT EXISTS {{ .Values.ipfix.clickhouse.database }}.flows_by_fip_1h ON CLUSTER '{{ .Values.ipfix.clickhouse.clusterName }}' (ts DateTime, fip IPv6, bytes UInt64, packets UInt64) ENGINE = ReplicatedSummingMergeTree('/clickhouse/tables/{shard}/flows_by_fip_1h','{replica}') PARTITION BY toDate(ts) ORDER BY (fip, ts)" + + echo "" + echo "Creating flows_by_fip_1h_dist distributed table..." + run_query "CREATE TABLE IF NOT EXISTS {{ .Values.ipfix.clickhouse.database }}.flows_by_fip_1h_dist ON CLUSTER '{{ .Values.ipfix.clickhouse.clusterName }}' AS {{ .Values.ipfix.clickhouse.database }}.flows_by_fip_1h ENGINE = Distributed('{{ .Values.ipfix.clickhouse.clusterName }}', '{{ .Values.ipfix.clickhouse.database }}', 'flows_by_fip_1h', cityHash64(fip))" + + echo "" + echo "Creating mv_flows_by_fip_1h materialized view..." + run_query "CREATE MATERIALIZED VIEW IF NOT EXISTS {{ .Values.ipfix.clickhouse.database }}.mv_flows_by_fip_1h ON CLUSTER '{{ .Values.ipfix.clickhouse.clusterName }}' TO {{ .Values.ipfix.clickhouse.database }}.flows_by_fip_1h AS SELECT toStartOfHour(flow_start) AS ts, fip, sum(bytes) AS bytes, sum(packets) AS packets FROM {{ .Values.ipfix.clickhouse.database }}.flows_local GROUP BY ts, fip" + + echo "" + echo "=== Schema initialization complete! ===" + echo "Verifying tables..." + run_query "SHOW TABLES FROM {{ .Values.ipfix.clickhouse.database }}" + echo "" + echo "Done!" + env: + - name: CLICKHOUSE_PASSWORD + valueFrom: + secretKeyRef: + name: {{ .Values.ipfix.clickhouse.secretName }} + key: password + resources: + requests: + cpu: 50m + memory: 32Mi + limits: + cpu: 200m + memory: 64Mi +{{- end }} diff --git a/helm-charts/ipfix/templates/collector-daemonset.yaml b/helm-charts/ipfix/templates/collector-daemonset.yaml new file mode 100644 index 000000000..5660d160d --- /dev/null +++ b/helm-charts/ipfix/templates/collector-daemonset.yaml @@ -0,0 +1,171 @@ +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: ipfix-collector + namespace: {{ .Values.ipfix.namespace }} + labels: + app: ipfix-collector +spec: + selector: + matchLabels: + app: ipfix-collector + updateStrategy: + type: RollingUpdate + rollingUpdate: + maxUnavailable: 20% + template: + metadata: + labels: + app: ipfix-collector + annotations: + # Force pod restart when configmap or values change + checksum/config: {{ include (print $.Template.BasePath "/configmap.yaml") . | sha256sum }} + spec: + nodeSelector: + {{- range $key, $value := .Values.ipfix.nodeSelector }} + {{ $key }}: {{ $value | quote }} + {{- end }} + hostNetwork: true + dnsPolicy: ClusterFirstWithHostNet + + # Init container to set up directory structure with proper permissions + initContainers: + - name: init-dirs + image: {{ .Values.ipfix.images.collector }} + command: ["/bin/sh", "-c"] + args: + - | + mkdir -p /var/lib/ipfix/processed + chmod 755 /var/lib/ipfix /var/lib/ipfix/processed + echo "Directory structure initialized" + volumeMounts: + - name: flows + mountPath: /var/lib/ipfix + securityContext: + runAsUser: 0 + + containers: + - name: nfcapd + image: {{ .Values.ipfix.images.collector }} + command: ["/bin/sh", "/scripts/nfcapd-entrypoint.sh"] + securityContext: + runAsUser: 0 + ports: + - name: ipfix + containerPort: {{ .Values.ipfix.listenPort }} + protocol: UDP + hostPort: {{ .Values.ipfix.listenPort }} + volumeMounts: + - name: flows + mountPath: /var/lib/ipfix + - name: scripts + mountPath: /scripts + resources: + {{- toYaml .Values.ipfix.resources.nfcapd | nindent 10 }} + + - name: rollup + image: {{ .Values.ipfix.images.collector }} + command: ["/bin/sh", "-c"] + args: + - | + echo "Starting rollup loop (every {{ .Values.ipfix.rollupEverySeconds }}s)" + while true; do + sleep {{ .Values.ipfix.rollupEverySeconds }} + echo "$(date): Running rollup" + python3 /scripts/rollup.py || echo "Rollup failed, will retry" + done + securityContext: + runAsUser: 0 + env: + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + - name: CLICKHOUSE_URL + value: {{ .Values.ipfix.clickhouse.httpUrl | quote }} + - name: CLICKHOUSE_USER + value: "writer" + - name: CLICKHOUSE_PASSWORD + valueFrom: + secretKeyRef: + name: {{ .Values.ipfix.clickhouse.secretName }} + key: password + - name: CLICKHOUSE_DATABASE + value: {{ .Values.ipfix.clickhouse.database | quote }} + - name: CLICKHOUSE_TABLE + value: {{ .Values.ipfix.clickhouse.table | quote }} + - name: NFDUMP_FILTER + value: {{ .Values.ipfix.nfdumpFilter | quote }} + {{- if .Values.ipfix.vipListConfigMap }} + - name: VIP_LIST_FILE + value: "/etc/vips/vips.txt" + {{- end }} + volumeMounts: + - name: flows + mountPath: /var/lib/ipfix + - name: scripts + mountPath: /scripts + {{- if .Values.ipfix.vipListConfigMap }} + - name: vip-list + mountPath: /etc/vips + {{- end }} + resources: + {{- toYaml .Values.ipfix.resources.rollup | nindent 10 }} + + {{- if .Values.ipfix.s3.enabled }} + - name: s3-archive + image: {{ .Values.ipfix.images.s3 }} + command: ["/bin/sh", "-c"] + args: + - | + echo "Starting S3 archival loop (every 3600s)" + while true; do + sleep 3600 + echo "$(date): Running S3 archival" + /scripts/s3-archive.sh || echo "S3 archival failed, will retry" + done + securityContext: + runAsUser: 0 + env: + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + - name: AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: {{ .Values.ipfix.s3.secretName }} + key: access_key_id + - name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: {{ .Values.ipfix.s3.secretName }} + key: secret_access_key + volumeMounts: + - name: flows + mountPath: /var/lib/ipfix + - name: scripts + mountPath: /scripts + resources: + requests: + cpu: 100m + memory: 256Mi + limits: + cpu: 500m + memory: 512Mi + {{- end }} + + volumes: + - name: flows + hostPath: + path: /var/lib/ipfix + type: DirectoryOrCreate + - name: scripts + configMap: + name: ipfix-collector-scripts + defaultMode: 0755 + {{- if .Values.ipfix.vipListConfigMap }} + - name: vip-list + configMap: + name: {{ .Values.ipfix.vipListConfigMap }} + {{- end }} diff --git a/helm-charts/ipfix/templates/configmap.yaml b/helm-charts/ipfix/templates/configmap.yaml new file mode 100644 index 000000000..6ec9c4f98 --- /dev/null +++ b/helm-charts/ipfix/templates/configmap.yaml @@ -0,0 +1,575 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: ipfix-collector-scripts + namespace: {{ .Values.ipfix.namespace }} +data: + nfcapd-entrypoint.sh: | + #!/bin/sh + set -e + + # Create flow directory + mkdir -p /var/lib/ipfix + mkdir -p /var/lib/ipfix/processed + + echo "=== nfcapd configuration ===" + echo "Flow directory: /var/lib/ipfix" + echo "Listen port: {{ .Values.ipfix.listenPort }}" + echo "Rotation interval: {{ .Values.ipfix.rotateSeconds }}s" + echo "Compression: {{ if .Values.ipfix.compress }}LZO{{ else }}none{{ end }}" + echo "Ident: any" + echo "" + echo "File naming convention:" + echo " Active file: nfcapd.current." + echo " Rotated files: nfcapd.YYYYMMDDHHmm" + echo "" + echo "Starting nfcapd with verbose output..." + + # Run nfcapd in foreground with verbose logging + # nfcapd 1.7.6+ syntax: + # -w: output directory for flow files + # -p: UDP port to listen on + # -t: rotation interval in seconds + # -z=lzo: compress rotated files with LZO + # -S 0: no subdirectory hierarchy (flat structure) + # -P: PID file + # -I: ident string in filenames + # -v: verbose (can use multiple times for more verbosity) + exec nfcapd \ + -w /var/lib/ipfix \ + -p {{ .Values.ipfix.listenPort }} \ + -t {{ .Values.ipfix.rotateSeconds }} \ + {{ if .Values.ipfix.compress }}-z=lzo{{ end }} \ + -S 0 \ + -P /var/run/nfcapd.pid \ + -I any \ + -v + + rollup.py: | + #!/usr/bin/env python3 + """ + IPFIX flow rollup processor. + Reads nfcapd binary files, aggregates by VIP/node/hour, inserts to ClickHouse. + + Features: + - Exponential backoff with jitter for ClickHouse retries + - Files only moved to processed/ after successful upload + - Failed files remain in place for retry on next rollup cycle + - Per-file processing to avoid losing data on partial failures + """ + import os + import sys + import time + import glob + import random + import subprocess + import json + from datetime import datetime + from collections import defaultdict + import requests + from requests.adapters import HTTPAdapter + from urllib3.util.retry import Retry + + # Configuration from environment + FLOW_DIR = "/var/lib/ipfix" + PROCESSED_DIR = "/var/lib/ipfix/processed" + CH_URL = os.getenv("CLICKHOUSE_URL", "http://clickhouse-http.clickhouse.svc.cluster.local:8123") + CH_USER = os.getenv("CLICKHOUSE_USER", "writer") + CH_PASS = os.getenv("CLICKHOUSE_PASSWORD", "") + CH_DB = os.getenv("CLICKHOUSE_DATABASE", "ipfix") + CH_TABLE = os.getenv("CLICKHOUSE_TABLE", "vip_hourly_node") + NODE_NAME = os.getenv("NODE_NAME", "unknown") + VIP_LIST_FILE = os.getenv("VIP_LIST_FILE", "") + NFDUMP_FILTER = os.getenv("NFDUMP_FILTER", "") + + # Retry configuration + MAX_RETRIES = 5 + BASE_DELAY = 1.0 # seconds + MAX_DELAY = 60.0 # seconds + JITTER_FACTOR = 0.5 # +/- 50% jitter + + # File age threshold (seconds) - skip files still being written + MIN_FILE_AGE = 60 + + + def get_delay_with_jitter(attempt: int) -> float: + """Calculate exponential backoff delay with jitter.""" + delay = min(BASE_DELAY * (2 ** attempt), MAX_DELAY) + jitter = delay * JITTER_FACTOR * (2 * random.random() - 1) + return max(0.1, delay + jitter) + + + def create_session() -> requests.Session: + """Create a requests session with connection pooling.""" + session = requests.Session() + adapter = HTTPAdapter( + pool_connections=1, + pool_maxsize=1, + max_retries=0 # We handle retries manually for better control + ) + session.mount("http://", adapter) + session.mount("https://", adapter) + return session + + + def load_vip_list() -> set | None: + """Load VIP list from file if configured.""" + if not VIP_LIST_FILE or not os.path.exists(VIP_LIST_FILE): + return None + try: + with open(VIP_LIST_FILE) as f: + vips = set(line.strip() for line in f if line.strip()) + print(f"Loaded {len(vips)} VIPs from {VIP_LIST_FILE}") + return vips + except Exception as e: + print(f"Warning: Failed to load VIP list: {e}", file=sys.stderr) + return None + + + def parse_nfcapd_file(filepath: str, vip_set: set | None = None) -> dict: + """Parse nfcapd file and aggregate flows by hour/VIP/direction.""" + cmd = ["nfdump", "-r", filepath, "-o", "json", "-q"] + + # Add filter to exclude private IPs if configured + if NFDUMP_FILTER: + cmd.append(NFDUMP_FILTER) + print(f"Applying nfdump filter: {NFDUMP_FILTER}") + + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=300) + except subprocess.TimeoutExpired: + print(f"Timeout parsing {filepath}", file=sys.stderr) + return {} + + if result.returncode != 0: + print(f"Error reading {filepath}: {result.stderr}", file=sys.stderr) + return {} + + aggregates = defaultdict(lambda: {"bytes": 0, "packets": 0}) + parse_errors = 0 + flow_count = 0 + + # nfdump -o json outputs a JSON array, not newline-delimited objects + # Parse the entire output as a JSON array + try: + flows = json.loads(result.stdout) + if not isinstance(flows, list): + flows = [flows] + print(f"Parsed {len(flows)} flow records from JSON") + except json.JSONDecodeError as e: + print(f"Failed to parse JSON from {filepath}: {e}", file=sys.stderr) + print(f"First 500 chars of output: {result.stdout[:500]}", file=sys.stderr) + return {} + + for flow in flows: + + try: + # Skip non-flow entries (nfdump may include stat records) + if flow.get("type") != "FLOW": + continue + + # Get IP addresses - nfdump uses src4_addr/dst4_addr for IPv4 + # and src6_addr/dst6_addr for IPv6 + src_ip = flow.get("src4_addr") or flow.get("src6_addr") or flow.get("src_ip", "") + dst_ip = flow.get("dst4_addr") or flow.get("dst6_addr") or flow.get("dst_ip", "") + + # Get byte/packet counts - nfdump uses in_bytes/in_packets + bytes_val = int(flow.get("in_bytes", 0) or flow.get("bytes", 0)) + packets_val = int(flow.get("in_packets", 0) or flow.get("packets", 0)) + + # Use 'received' timestamp (when nfcapd got the flow) since OVS + # sends relative timestamps in 'first'/'last' that show as 1970 dates + # Format: "2026-03-10T19:34:59.905" + received_str = flow.get("received", "") + if not received_str: + continue + + try: + # Parse ISO format timestamp, handle milliseconds + if "." in received_str: + received_dt = datetime.strptime(received_str, "%Y-%m-%dT%H:%M:%S.%f") + else: + received_dt = datetime.strptime(received_str, "%Y-%m-%dT%H:%M:%S") + except ValueError: + continue + + # Determine hour bucket from received time + hour_ts = received_dt.replace(minute=0, second=0, microsecond=0) + + # Check if src or dst is a VIP (or process all if no VIP list) + for ip, direction in [(src_ip, "from"), (dst_ip, "to")]: + if ip and (vip_set is None or ip in vip_set): + key = (hour_ts, ip, direction) + aggregates[key]["bytes"] += bytes_val + aggregates[key]["packets"] += packets_val + flow_count += 1 + + except (KeyError, ValueError, TypeError) as e: + parse_errors += 1 + if parse_errors <= 5: + print(f"Error parsing flow: {e}", file=sys.stderr) + continue + + if parse_errors > 5: + print(f"... and {parse_errors - 5} more parse errors", file=sys.stderr) + + if flow_count > 0: + print(f"Parsed {flow_count} flow records into {len(aggregates)} aggregates") + + return dict(aggregates) + + + def insert_to_clickhouse(session: requests.Session, aggregates: dict) -> bool: + """ + Insert aggregated data to ClickHouse with exponential backoff retry. + Returns True on success, False on failure after all retries exhausted. + """ + if not aggregates: + return True + + # Build INSERT query + values = [] + for (hour_ts, vip, direction), stats in aggregates.items(): + # Escape single quotes in VIP (shouldn't happen but be safe) + safe_vip = vip.replace("'", "\\'") + values.append( + f"('{hour_ts.strftime('%Y-%m-%d %H:%M:%S')}', '{safe_vip}', " + f"'{direction}', {stats['bytes']}, {stats['packets']}, '{NODE_NAME}')" + ) + + query = f"INSERT INTO {CH_DB}.{CH_TABLE} (hour_ts, vip, dir, bytes, packets, node) VALUES {','.join(values)}" + + # Log query details (truncate if too long) + query_preview = query[:500] + "..." if len(query) > 500 else query + print(f"ClickHouse query ({len(values)} rows, {len(query)} bytes): {query_preview}") + + last_error = None + for attempt in range(MAX_RETRIES): + try: + print(f"Attempt {attempt + 1}/{MAX_RETRIES}: POST to {CH_URL}") + response = session.post( + CH_URL, + params={"user": CH_USER, "password": CH_PASS}, + data=query.encode('utf-8'), + timeout=60 + ) + + # Log response details + print(f"Response status: {response.status_code} {response.reason}") + print(f"Response headers: {dict(response.headers)}") + if response.text: + response_preview = response.text[:1000] + "..." if len(response.text) > 1000 else response.text + print(f"Response body: {response_preview}") + + response.raise_for_status() + print(f"SUCCESS: Inserted {len(aggregates)} aggregates to ClickHouse") + return True + + except requests.exceptions.Timeout as e: + last_error = e + print(f"TIMEOUT on attempt {attempt + 1}/{MAX_RETRIES}: {e}", file=sys.stderr) + + except requests.exceptions.ConnectionError as e: + last_error = e + print(f"CONNECTION ERROR on attempt {attempt + 1}/{MAX_RETRIES}: {e}", file=sys.stderr) + + except requests.exceptions.HTTPError as e: + last_error = e + status_code = e.response.status_code if e.response else None + print(f"HTTP ERROR {status_code} on attempt {attempt + 1}/{MAX_RETRIES}", file=sys.stderr) + if e.response is not None: + print(f" Status: {e.response.status_code} {e.response.reason}", file=sys.stderr) + print(f" Headers: {dict(e.response.headers)}", file=sys.stderr) + print(f" Body: {e.response.text}", file=sys.stderr) + print(f" Exception: {e}", file=sys.stderr) + + # Don't retry on client errors (4xx) except 429 (rate limit) + if status_code and 400 <= status_code < 500 and status_code != 429: + print(f"Client error {status_code}, not retrying", file=sys.stderr) + return False + + except Exception as e: + last_error = e + print(f"UNEXPECTED ERROR on attempt {attempt + 1}/{MAX_RETRIES}: {type(e).__name__}: {e}", file=sys.stderr) + import traceback + traceback.print_exc() + + # Calculate delay and wait before retry (unless this was the last attempt) + if attempt < MAX_RETRIES - 1: + delay = get_delay_with_jitter(attempt) + print(f"Retrying in {delay:.2f}s...") + time.sleep(delay) + + print(f"FAILED to insert after {MAX_RETRIES} attempts. Last error: {last_error}", file=sys.stderr) + return False + + + def process_single_file(session: requests.Session, filepath: str, vip_set: set | None) -> bool: + """ + Process a single flow file: parse, aggregate, upload to ClickHouse. + Returns True if successful (file can be moved), False otherwise. + """ + print(f"Processing {filepath}") + + # Parse the file + aggregates = parse_nfcapd_file(filepath, vip_set) + + if not aggregates: + print(f"No aggregates from {filepath} (empty or parse failed)") + # Consider empty files as "processed" - move them + return True + + # Upload to ClickHouse with retries + success = insert_to_clickhouse(session, aggregates) + + if success: + print(f"Successfully processed {filepath}: {len(aggregates)} aggregates") + else: + print(f"Failed to upload {filepath}, will retry on next cycle", file=sys.stderr) + + return success + + + def process_flows(): + """Main processing function.""" + os.makedirs(PROCESSED_DIR, exist_ok=True) + + # Load VIP list if configured + vip_set = load_vip_list() + + # List all files in flow directory for debugging + print(f"=== Scanning {FLOW_DIR} ===") + try: + all_entries = os.listdir(FLOW_DIR) + print(f"All entries in {FLOW_DIR}: {sorted(all_entries)}") + except Exception as e: + print(f"Error listing {FLOW_DIR}: {e}", file=sys.stderr) + return + + # Find all nfcapd files (excluding current file being written) + all_files = sorted(glob.glob(os.path.join(FLOW_DIR, "nfcapd.*"))) + print(f"Files matching nfcapd.*: {[os.path.basename(f) for f in all_files]}") + + # CRITICAL: Exclude nfcapd.current.* files - these are actively being written by nfcapd + # nfcapd will rename these to nfcapd.YYYYMMDDHHmm when rotating + # If we move them, nfcapd's rename will fail with "No such file or directory" + flow_files = [f for f in all_files if not os.path.basename(f).startswith("nfcapd.current.")] + + excluded = [os.path.basename(f) for f in all_files if os.path.basename(f).startswith("nfcapd.current.")] + if excluded: + print(f"Excluding active files (nfcapd.current.*): {excluded}") + + print(f"Eligible rotated files: {[os.path.basename(f) for f in flow_files]}") + + if not flow_files: + print("No rotated flow files to process") + return + + # Filter out files that are too recent (still being written) + now = time.time() + eligible_files = [] + for filepath in flow_files: + try: + file_age = now - os.path.getmtime(filepath) + if file_age >= MIN_FILE_AGE: + eligible_files.append(filepath) + else: + print(f"Skipping {filepath} (age: {file_age:.0f}s < {MIN_FILE_AGE}s)") + except OSError as e: + print(f"Error checking {filepath}: {e}", file=sys.stderr) + + if not eligible_files: + print("No eligible flow files to process") + return + + print(f"Processing {len(eligible_files)} flow files") + + # Create session for connection reuse + session = create_session() + + success_count = 0 + fail_count = 0 + + for filepath in eligible_files: + try: + if process_single_file(session, filepath, vip_set): + # Only move to processed after successful upload + dest = os.path.join(PROCESSED_DIR, os.path.basename(filepath)) + os.rename(filepath, dest) + print(f"Moved {filepath} -> {dest}") + success_count += 1 + else: + fail_count += 1 + except Exception as e: + print(f"Unexpected error processing {filepath}: {e}", file=sys.stderr) + fail_count += 1 + + session.close() + + print(f"Processing complete: {success_count} succeeded, {fail_count} failed") + if fail_count > 0: + print(f"Failed files will be retried on next rollup cycle") + + + if __name__ == "__main__": + print(f"IPFIX rollup processor starting (node: {NODE_NAME})") + print(f"ClickHouse: {CH_URL}, DB: {CH_DB}, Table: {CH_TABLE}") + print(f"ClickHouse user: {CH_USER}, password: {'*' * len(CH_PASS) if CH_PASS else '(empty)'}") + print(f"Retry config: max_retries={MAX_RETRIES}, base_delay={BASE_DELAY}s, max_delay={MAX_DELAY}s") + + # Test ClickHouse connectivity + print(f"\n=== Testing ClickHouse connectivity ===") + try: + test_session = create_session() + test_response = test_session.get( + CH_URL, + params={"user": CH_USER, "password": CH_PASS, "query": "SELECT 1"}, + timeout=10 + ) + print(f"Connectivity test: {test_response.status_code} {test_response.reason}") + print(f"Response: {test_response.text.strip()}") + + # Test table exists + table_check = test_session.get( + CH_URL, + params={"user": CH_USER, "password": CH_PASS, "query": f"DESCRIBE TABLE {CH_DB}.{CH_TABLE}"}, + timeout=10 + ) + print(f"\nTable check ({CH_DB}.{CH_TABLE}): {table_check.status_code}") + if table_check.status_code == 200: + print(f"Table schema:\n{table_check.text[:500]}") + else: + print(f"Table check failed: {table_check.text}", file=sys.stderr) + test_session.close() + except Exception as e: + print(f"Connectivity test FAILED: {type(e).__name__}: {e}", file=sys.stderr) + print(f"=== End connectivity test ===\n") + + process_flows() + print("Processing complete") + + s3-archive.sh: | + #!/bin/sh + set -e + + # Archive processed flows to S3 + PROCESSED_DIR="/var/lib/ipfix/processed" + S3_BUCKET="{{ .Values.ipfix.s3.rawBucket }}" + S3_PREFIX="{{ .Values.ipfix.s3.prefix }}" + + if [ ! -d "$PROCESSED_DIR" ]; then + echo "No processed directory found" + exit 0 + fi + + # Find files older than 1 hour + find "$PROCESSED_DIR" -name "nfcapd.*" -mmin +60 | while read -r file; do + echo "Archiving $file to S3" + aws s3 cp "$file" "${S3_BUCKET}/${S3_PREFIX}$(basename $file)" && rm -f "$file" + done + + echo "S3 archival complete" + + ovs-ipfix-config.sh: | + #!/bin/sh + set -e + + # OVS IPFIX exporter configuration script + # Configures OVS bridge to export IPFIX flows to the local collector + # Idempotent: safe to run multiple times without breaking OVS + + BR="${PROVIDER_BRIDGE:-br-ex}" + TARGET="${IPFIX_TARGET:-127.0.0.1:4739}" + OBS_DOMAIN="${OBS_DOMAIN_ID:-1001}" + CACHE_TIMEOUT="${CACHE_ACTIVE_TIMEOUT:-60}" + CACHE_MAX="${CACHE_MAX_FLOWS:-4096}" + SAMPLING="${SAMPLING_RATE:-1}" + + # Generate a unique obs_point_id from node name (deterministic per node) + OBS_POINT=$(echo "${NODE_NAME}" | cksum | awk '{print $1 % 100000}') + + echo "Configuring IPFIX export on bridge ${BR}" + echo " Target: ${TARGET}" + echo " Observation Domain ID: ${OBS_DOMAIN}" + echo " Observation Point ID: ${OBS_POINT} (derived from ${NODE_NAME})" + echo " Cache Active Timeout: ${CACHE_TIMEOUT}s" + echo " Cache Max Flows: ${CACHE_MAX}" + echo " Sampling Rate: 1/${SAMPLING}" + + # Check if bridge exists + if ! ovs-vsctl br-exists "${BR}" 2>/dev/null; then + echo "Error: Bridge ${BR} does not exist" + echo "Available bridges:" + ovs-vsctl list-br + exit 1 + fi + + # Check current IPFIX configuration + CURRENT_IPFIX=$(ovs-vsctl get Bridge "${BR}" ipfix 2>/dev/null || echo "[]") + + if [ "${CURRENT_IPFIX}" != "[]" ]; then + echo "IPFIX already configured on ${BR}, checking if update needed..." + + # Get current target + CURRENT_TARGET=$(ovs-vsctl get IPFIX "${CURRENT_IPFIX}" targets 2>/dev/null | tr -d '[]"' || echo "") + + if [ "${CURRENT_TARGET}" = "\"${TARGET}\"" ] || [ "${CURRENT_TARGET}" = "${TARGET}" ]; then + echo "IPFIX configuration is current, updating parameters..." + + # Update existing IPFIX entry (idempotent) + ovs-vsctl set IPFIX "${CURRENT_IPFIX}" \ + targets="[\"${TARGET}\"]" \ + obs_domain_id="${OBS_DOMAIN}" \ + obs_point_id="${OBS_POINT}" \ + cache_active_timeout="${CACHE_TIMEOUT}" \ + cache_max_flows="${CACHE_MAX}" \ + sampling="${SAMPLING}" + + echo "IPFIX configuration updated successfully" + else + echo "IPFIX target changed from ${CURRENT_TARGET} to ${TARGET}, reconfiguring..." + + # Clear existing and reconfigure + ovs-vsctl clear Bridge "${BR}" ipfix + ovs-vsctl -- --id=@ipfix create IPFIX \ + targets="[\"${TARGET}\"]" \ + obs_domain_id="${OBS_DOMAIN}" \ + obs_point_id="${OBS_POINT}" \ + cache_active_timeout="${CACHE_TIMEOUT}" \ + cache_max_flows="${CACHE_MAX}" \ + sampling="${SAMPLING}" \ + -- set Bridge "${BR}" ipfix=@ipfix + + echo "IPFIX configuration replaced successfully" + fi + else + echo "No existing IPFIX configuration, creating new..." + + # Create new IPFIX configuration + ovs-vsctl -- --id=@ipfix create IPFIX \ + targets="[\"${TARGET}\"]" \ + obs_domain_id="${OBS_DOMAIN}" \ + obs_point_id="${OBS_POINT}" \ + cache_active_timeout="${CACHE_TIMEOUT}" \ + cache_max_flows="${CACHE_MAX}" \ + sampling="${SAMPLING}" \ + -- set Bridge "${BR}" ipfix=@ipfix + + echo "IPFIX configuration created successfully" + fi + + # Verify configuration + echo "" + echo "Current IPFIX configuration:" + ovs-vsctl list IPFIX 2>/dev/null || echo " (none)" + + echo "" + echo "IPFIX exporter configured. Sleeping to keep pod alive..." + echo "Pod will reconfigure on restart if needed." + + # Sleep forever - pod stays running to maintain the configuration + # If pod is deleted, OVS config persists until explicitly cleared + while true; do + sleep 3600 + echo "$(date): IPFIX exporter still running on ${BR} -> ${TARGET}" + done diff --git a/helm-charts/ipfix/templates/ovs-exporter-daemonset.yaml b/helm-charts/ipfix/templates/ovs-exporter-daemonset.yaml new file mode 100644 index 000000000..32dafffc3 --- /dev/null +++ b/helm-charts/ipfix/templates/ovs-exporter-daemonset.yaml @@ -0,0 +1,67 @@ +apiVersion: apps/v1 +kind: DaemonSet +metadata: + name: ipfix-ovs-exporter + namespace: {{ .Values.ipfix.namespace }} + labels: + app: ipfix-ovs-exporter +spec: + selector: + matchLabels: + app: ipfix-ovs-exporter + template: + metadata: + labels: + app: ipfix-ovs-exporter + spec: + hostPID: true + hostNetwork: true + serviceAccountName: ipfix-ovs-exporter + nodeSelector: + {{- range $key, $value := .Values.ipfix.nodeSelector }} + {{ $key }}: {{ $value | quote }} + {{- end }} + containers: + - name: ovs-config + image: {{ .Values.ipfix.images.ovsExporter }} + securityContext: + privileged: true + command: ["/bin/sh", "/scripts/ovs-ipfix-config.sh"] + env: + - name: NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + - name: PROVIDER_BRIDGE + value: {{ .Values.ipfix.ovs.providerBridge | quote }} + - name: IPFIX_TARGET + value: {{ .Values.ipfix.ovs.target | quote }} + - name: OBS_DOMAIN_ID + value: {{ .Values.ipfix.ovs.obsDomainId | quote }} + - name: CACHE_ACTIVE_TIMEOUT + value: {{ .Values.ipfix.ovs.cacheActiveTimeout | quote }} + - name: CACHE_MAX_FLOWS + value: {{ .Values.ipfix.ovs.cacheMaxFlows | quote }} + - name: SAMPLING_RATE + value: {{ .Values.ipfix.ovs.samplingRate | quote }} + volumeMounts: + - name: scripts + mountPath: /scripts + - name: ovs-run + mountPath: /var/run/openvswitch + resources: + requests: + cpu: 10m + memory: 32Mi + limits: + cpu: 100m + memory: 64Mi + volumes: + - name: scripts + configMap: + name: ipfix-collector-scripts + defaultMode: 0755 + - name: ovs-run + hostPath: + path: /var/run/openvswitch + type: Directory diff --git a/helm-charts/ipfix/templates/ovs-exporter-rbac.yaml b/helm-charts/ipfix/templates/ovs-exporter-rbac.yaml new file mode 100644 index 000000000..8f92f7603 --- /dev/null +++ b/helm-charts/ipfix/templates/ovs-exporter-rbac.yaml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: ipfix-ovs-exporter + namespace: {{ .Values.ipfix.namespace }} + labels: + app: ipfix-ovs-exporter diff --git a/helm-charts/ipfix/templates/service.yaml b/helm-charts/ipfix/templates/service.yaml new file mode 100644 index 000000000..8daeae23e --- /dev/null +++ b/helm-charts/ipfix/templates/service.yaml @@ -0,0 +1,16 @@ +apiVersion: v1 +kind: Service +metadata: + name: ipfix-collector + namespace: {{ .Values.ipfix.namespace }} + labels: + app: ipfix-collector +spec: + type: ClusterIP + clusterIP: None + selector: + app: ipfix-collector + ports: + - name: ipfix + port: {{ .Values.ipfix.listenPort }} + protocol: UDP diff --git a/helm-charts/ipfix/values.yaml b/helm-charts/ipfix/values.yaml new file mode 100644 index 000000000..082b8dea8 --- /dev/null +++ b/helm-charts/ipfix/values.yaml @@ -0,0 +1,92 @@ +ipfix: + namespace: ipfix + + nodeSelector: + openstack-network-node: "enabled" + + # nfcapd configuration + listenPort: 4739 + rotateSeconds: 300 + compress: true + + # Local flow storage on each node (hostPath) + # Flows are stored in /var/lib/ipfix on each node + # This survives reboots and avoids shared storage bottlenecks + + # Rollup interval (seconds) - how often the sidecar processes flows + rollupEverySeconds: 300 + + # ClickHouse HTTP endpoint + clickhouse: + namespace: clickhouse + httpUrl: "http://clickhouse-http.clickhouse.svc.cluster.local:8123" + database: "ipfix" + table: "vip_hourly_node" + secretName: "ipfix-clickhouse-creds" + # ClickHouse cluster name (must match ClickHouseInstallation cluster name) + clusterName: "ipfix" + + # Schema initialization job configuration + initSchema: + enabled: true + # User with CREATE privileges + user: "writer" + + # Optional: restrict rollup to known VIPs + vipListConfigMap: "" + + # nfdump filter applied when reading flow files + # Use this to exclude private/internal IPs from rollup + # Default is empty (no filtering - collect all flows) + # Example to exclude RFC1918, CGNAT, and link-local: + # "not net 10.0.0.0/8 and not net 172.16.0.0/12 and not net 192.168.0.0/16 and not net 100.64.0.0/10 and not net 169.254.0.0/16" + nfdumpFilter: "" + + # S3 archival + s3: + enabled: false + rawBucket: "s3://org-ipfix-raw-dev" + prefix: "clusterA/${NODE_NAME}/" + secretName: "s3-credentials" + + # Container images + images: + # Single unified image with nfdump pre-installed + collector: "ghcr.io/rackerlabs/genestack/ipfix-collector:1.0.0-alpine" + s3: "docker.io/amazon/aws-cli:latest" + # OVS exporter needs ovs-vsctl - use Genestack OVS image + ovsExporter: "ghcr.io/rackerlabs/genestack-images/ovs:v3.5.1-latest" + + # OVS IPFIX exporter configuration + ovs: + # Target for IPFIX export (collector address) + target: "127.0.0.1:4739" + # OVS bridge to export from + # br-int = integration bridge where VM traffic flows (OVN/kube-ovn) + # br-ex = external bridge (typically just north-south traffic) + providerBridge: "br-int" + # IPFIX observation domain ID (identifies this cluster/deployment) + obsDomainId: 1001 + # How often to export active flows (seconds) + cacheActiveTimeout: 60 + # Maximum flows to cache before forced export + cacheMaxFlows: 4096 + # Sampling rate (1 = every packet, 100 = 1 in 100) + samplingRate: 1 + + # Resource limits + resources: + nfcapd: + requests: + cpu: 500m + memory: 1Gi + limits: + cpu: 2000m + memory: 2Gi + rollup: + requests: + cpu: 500m + memory: 1Gi + limits: + cpu: 2000m + memory: 2Gi