diff --git a/core/src/main/java/com/statful/client/core/buffer/AggregatedBuffer.java b/core/src/main/java/com/statful/client/core/buffer/AggregatedBuffer.java index fadfecd..9333e93 100644 --- a/core/src/main/java/com/statful/client/core/buffer/AggregatedBuffer.java +++ b/core/src/main/java/com/statful/client/core/buffer/AggregatedBuffer.java @@ -7,12 +7,14 @@ import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Logger; /** * Buffer to store aggregated metrics. */ public class AggregatedBuffer implements MetricsBuffer { + private static final Logger LOGGER = Logger.getLogger(AggregatedBuffer.class.getName()); private Map>> buffer; private int maxBufferSize; private int flushSize; @@ -63,7 +65,19 @@ public final boolean addToBuffer(final String metric, final Aggregation aggregat aggregatedBuffer.put(aggregationFrequency.toString(), aggregatedFreqBuffer); buffer.put(aggregation.toString(), aggregatedBuffer); - return aggregatedFreqBuffer.offer(metric); + boolean inserted = aggregatedFreqBuffer.offer(metric); + if (!inserted) { + try { + LOGGER.fine("Buffer is full, trying to discard oldest metric."); + aggregatedFreqBuffer.remove(); + } catch (NoSuchElementException e) { + LOGGER.warning("Buffer has been emptied before trying to discard oldest entry."); + } finally { + inserted = aggregatedFreqBuffer.offer(metric); + } + } + + return inserted; } /** diff --git a/core/src/main/java/com/statful/client/core/buffer/StandardBuffer.java b/core/src/main/java/com/statful/client/core/buffer/StandardBuffer.java index 8a18e3c..573d5e0 100644 --- a/core/src/main/java/com/statful/client/core/buffer/StandardBuffer.java +++ b/core/src/main/java/com/statful/client/core/buffer/StandardBuffer.java @@ -4,13 +4,16 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.NoSuchElementException; import java.util.concurrent.ArrayBlockingQueue; +import java.util.logging.Logger; /** * Buffer to store metrics. */ public class StandardBuffer implements MetricsBuffer { + private static final Logger LOGGER = Logger.getLogger(StandardBuffer.class.getName()); private ArrayBlockingQueue buffer; private int maxBufferSize; private int flushSize; @@ -40,7 +43,19 @@ public final ArrayBlockingQueue getBuffer() { * @return A {@link Boolean} with the success of the operation */ public final boolean addToBuffer(final String metric) { - return buffer.offer(metric); + boolean inserted = buffer.offer(metric); + if (!inserted) { + try { + LOGGER.fine("Buffer is full, trying to discard oldest metric."); + buffer.remove(); + } catch (NoSuchElementException e) { + LOGGER.warning("Buffer has been emptied before trying to discard oldest entry."); + } finally { + inserted = buffer.offer(metric); + } + } + + return inserted; } /** diff --git a/core/src/main/java/com/statful/client/core/sender/BufferedMetricsSender.java b/core/src/main/java/com/statful/client/core/sender/BufferedMetricsSender.java index a578eff..61d7d17 100644 --- a/core/src/main/java/com/statful/client/core/sender/BufferedMetricsSender.java +++ b/core/src/main/java/com/statful/client/core/sender/BufferedMetricsSender.java @@ -192,8 +192,7 @@ private int sanitizeSampleRate(final int sampleRate) { private void putRaw(final String metric) { boolean inserted = standardBuffer.addToBuffer(metric); if (!inserted) { - // We should discard older metrics instead - LOGGER.warning("The buffer is full, metric ignored!."); + LOGGER.warning("Failed to add metric to buffer."); } if (standardBuffer.isTimeToFlush()) { @@ -204,8 +203,7 @@ private void putRaw(final String metric) { private void putAggregatedRaw(final String metric, final Aggregation aggregation, final AggregationFrequency aggregationFrequency) { boolean inserted = aggregatedBuffer.addToBuffer(metric, aggregation, aggregationFrequency); if (!inserted) { - // We should discard older metrics instead - LOGGER.warning("The buffer is full, metric ignored!."); + LOGGER.warning("Failed to add metric to buffer."); } if (aggregatedBuffer.isTimeToFlush()) { diff --git a/core/src/test/java/com/statful/client/core/sender/BufferedMetricsSenderAPITest.java b/core/src/test/java/com/statful/client/core/sender/BufferedMetricsSenderAPITest.java index 97182e7..6a4a295 100644 --- a/core/src/test/java/com/statful/client/core/sender/BufferedMetricsSenderAPITest.java +++ b/core/src/test/java/com/statful/client/core/sender/BufferedMetricsSenderAPITest.java @@ -60,7 +60,7 @@ public void tearDown() { } @Test - public void shouldDiscardIfStandardBufferIsFull() { + public void shouldDiscardOlderMetricsIfStandardBufferIsFull() { // Given when(configuration.getFlushSize()).thenReturn(10000); @@ -76,6 +76,7 @@ public void shouldDiscardIfStandardBufferIsFull() { // Then List buffer = subject.getStandardBuffer(); assertEquals("MetricsBuffer should have 5000 metrics", 5000, buffer.size()); + assertTrue(buffer.contains("application.test_metric_overflow 500 123456789 100")); } @Test @@ -95,6 +96,7 @@ public void shouldDiscardIfAggregatedBufferIsFull() { // Then Map>> buffer = subject.getAggregatedBuffer(); assertEquals("MetricsBuffer should have 5000 metrics", 5000, buffer.get(Aggregation.AVG).get(AggregationFrequency.FREQ_10).size()); + assertTrue(buffer.get(Aggregation.AVG).get(AggregationFrequency.FREQ_10).contains("application.test_metric_overflow 500 123456789 100")); } @Test