Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

/**
* Buffer to store aggregated metrics.
*/
public class AggregatedBuffer implements MetricsBuffer {

private static final Logger LOGGER = Logger.getLogger(AggregatedBuffer.class.getName());
private Map<String, Map<String, ArrayBlockingQueue<String>>> buffer;
private int maxBufferSize;
private int flushSize;
Expand Down Expand Up @@ -63,7 +65,19 @@ public final boolean addToBuffer(final String metric, final Aggregation aggregat
aggregatedBuffer.put(aggregationFrequency.toString(), aggregatedFreqBuffer);
buffer.put(aggregation.toString(), aggregatedBuffer);

return aggregatedFreqBuffer.offer(metric);
boolean inserted = aggregatedFreqBuffer.offer(metric);
if (!inserted) {
try {
LOGGER.fine("Buffer is full, trying to discard oldest metric.");
aggregatedFreqBuffer.remove();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of remove we could use pool ( https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ArrayBlockingQueue.html#poll() ) which would allow us to log the discarded metric if we wanted, which I think we want, and doesn't throw exception if it's empty - returns null).

Thinking out loud, should we drop more then one metric?

} catch (NoSuchElementException e) {
LOGGER.warning("Buffer has been emptied before trying to discard oldest entry.");
} finally {
inserted = aggregatedFreqBuffer.offer(metric);
}
}

return inserted;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.logging.Logger;

/**
* Buffer to store metrics.
*/
public class StandardBuffer implements MetricsBuffer {

private static final Logger LOGGER = Logger.getLogger(StandardBuffer.class.getName());
private ArrayBlockingQueue<String> buffer;
private int maxBufferSize;
private int flushSize;
Expand Down Expand Up @@ -40,7 +43,19 @@ public final ArrayBlockingQueue<String> getBuffer() {
* @return A {@link Boolean} with the success of the operation
*/
public final boolean addToBuffer(final String metric) {
return buffer.offer(metric);
boolean inserted = buffer.offer(metric);
if (!inserted) {
try {
LOGGER.fine("Buffer is full, trying to discard oldest metric.");
buffer.remove();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above

} catch (NoSuchElementException e) {
LOGGER.warning("Buffer has been emptied before trying to discard oldest entry.");
} finally {
inserted = buffer.offer(metric);
}
}

return inserted;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,7 @@ private int sanitizeSampleRate(final int sampleRate) {
private void putRaw(final String metric) {
boolean inserted = standardBuffer.addToBuffer(metric);
if (!inserted) {
// We should discard older metrics instead
LOGGER.warning("The buffer is full, metric ignored!.");
LOGGER.warning("Failed to add metric to buffer.");
}

if (standardBuffer.isTimeToFlush()) {
Expand All @@ -204,8 +203,7 @@ private void putRaw(final String metric) {
private void putAggregatedRaw(final String metric, final Aggregation aggregation, final AggregationFrequency aggregationFrequency) {
boolean inserted = aggregatedBuffer.addToBuffer(metric, aggregation, aggregationFrequency);
if (!inserted) {
// We should discard older metrics instead
LOGGER.warning("The buffer is full, metric ignored!.");
LOGGER.warning("Failed to add metric to buffer.");
}

if (aggregatedBuffer.isTimeToFlush()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void tearDown() {
}

@Test
public void shouldDiscardIfStandardBufferIsFull() {
public void shouldDiscardOlderMetricsIfStandardBufferIsFull() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should, somehow, test the behaviour of trying to empty it when it's empty, more so if we keep the current exception based implementation for when it's empty

// Given
when(configuration.getFlushSize()).thenReturn(10000);

Expand All @@ -76,6 +76,7 @@ public void shouldDiscardIfStandardBufferIsFull() {
// Then
List<String> buffer = subject.getStandardBuffer();
assertEquals("MetricsBuffer should have 5000 metrics", 5000, buffer.size());
assertTrue(buffer.contains("application.test_metric_overflow 500 123456789 100"));
}

@Test
Expand All @@ -95,6 +96,7 @@ public void shouldDiscardIfAggregatedBufferIsFull() {
// Then
Map<Aggregation, Map<AggregationFrequency, List<String>>> buffer = subject.getAggregatedBuffer();
assertEquals("MetricsBuffer should have 5000 metrics", 5000, buffer.get(Aggregation.AVG).get(AggregationFrequency.FREQ_10).size());
assertTrue(buffer.get(Aggregation.AVG).get(AggregationFrequency.FREQ_10).contains("application.test_metric_overflow 500 123456789 100"));
}

@Test
Expand Down