From 5f19a244c24f24877530abc4f775f2a82b5b288f Mon Sep 17 00:00:00 2001 From: ntroutman Date: Fri, 19 Sep 2025 08:35:06 -0700 Subject: [PATCH 1/3] Use ArrayBlockingQueue in StatsDNonBlockingProcessor Swaps out the ConcurrentLinkedQueue for ArrayBlockingQueue which is bounded by default removing the need to track queue size independently and avoiding allocation of linked nodes, instead perfering a constant upfront allocation reducing GC churn. --- .../statsd/StatsDNonBlockingProcessor.java | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java index 481727b6..dcd913da 100644 --- a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java @@ -1,21 +1,18 @@ package com.timgroup.statsd; import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; public class StatsDNonBlockingProcessor extends StatsDProcessor { private final Queue messages; - private final AtomicInteger qsize; // qSize will not reflect actual size, but a close estimate. private class ProcessingTask extends StatsDProcessor.ProcessingTask { @Override protected Message getMessage() throws InterruptedException { final Message message = messages.poll(); if (message != null) { - qsize.decrementAndGet(); return message; } @@ -50,8 +47,7 @@ protected boolean haveMessages() { aggregatorFlushInterval, aggregatorShards, threadFactory); - this.qsize = new AtomicInteger(0); - this.messages = new ConcurrentLinkedQueue<>(); + this.messages = new ArrayBlockingQueue<>(qcapacity); } @Override @@ -62,11 +58,7 @@ protected ProcessingTask createProcessingTask() { @Override protected boolean send(final Message message) { if (!shutdown) { - if (qsize.get() < qcapacity) { - messages.offer(message); - qsize.incrementAndGet(); - return true; - } + return messages.offer(message); } return false; From cea33ec5c5172da2e3a901dcbbc21f9d85ff48e5 Mon Sep 17 00:00:00 2001 From: Nathaniel Troutman Date: Fri, 19 Sep 2025 09:41:38 -0700 Subject: [PATCH 2/3] use LinkedBlockingQueue for "large" queue sizes --- .../com/timgroup/statsd/StatsDNonBlockingProcessor.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java index dcd913da..3bdaa06b 100644 --- a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java @@ -2,6 +2,7 @@ import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; public class StatsDNonBlockingProcessor extends StatsDProcessor { @@ -47,7 +48,12 @@ protected boolean haveMessages() { aggregatorFlushInterval, aggregatorShards, threadFactory); - this.messages = new ArrayBlockingQueue<>(qcapacity); + if (qcapacity <= 8192) { + this.messages = new ArrayBlockingQueue<>(qcapacity); + } else { + this.messages = new LinkedBlockingQueue<>(qcapacity); + } + } @Override From acc7b285039f687068698fe0d570cfc191c12460 Mon Sep 17 00:00:00 2001 From: Nathaniel Troutman Date: Fri, 19 Sep 2025 10:45:04 -0700 Subject: [PATCH 3/3] mvn spotless:apply --- .../java/com/timgroup/statsd/StatsDNonBlockingProcessor.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java index 3bdaa06b..6521f980 100644 --- a/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java +++ b/src/main/java/com/timgroup/statsd/StatsDNonBlockingProcessor.java @@ -53,7 +53,6 @@ protected boolean haveMessages() { } else { this.messages = new LinkedBlockingQueue<>(qcapacity); } - } @Override