Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 130 additions & 9 deletions evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,16 @@ public void testAppendOrAdd() throws Exception {
}

private void refreshEVCache() {
// Close the previous DI container before building a new one. setupEnv() creates a fresh
// LifecycleInjector (Eureka client, Spectator registry, connection pools) on every call; without
// closing the old one its background threads keep it alive and it leaks, eventually exhausting the heap.
if (lifecycleManager != null) {
try {
lifecycleManager.close();
} catch (Exception e) {
log.warn("Failed to close previous lifecycle manager during refresh", e);
}
}
setupEnv();
testEVCache();
}
Expand All @@ -278,6 +288,7 @@ public void functionalTestsWithAppLevelAndASGLevelHashingScenarios() throws Exce
refreshEVCache();
assertTrue(manager.getEVCacheConfig().getPropertyRepository().get(appName + ".hash.key", Boolean.class).orElse(false).get());
doFunctionalTests(true);
testBulkHashed();
propertiesToSet.remove(appName + ".hash.key");

// hashing at app level due to auto hashing as a consequence of a large key
Expand Down Expand Up @@ -331,6 +342,84 @@ public void functionalTestsWithAppLevelAndASGLevelHashingScenarios() throws Exce
refreshEVCache();
}

@Test(dependsOnMethods = { "functionalTestsWithAppLevelAndASGLevelHashingScenarios" })
public void testChunkingScenarios() throws Exception {
// chunking only (no hashing): large values are split into chunks and reassembled on read
propertiesToSet.put(appName + ".chunk.data", "true");
refreshEVCache();
assertTrue(manager.getEVCacheConfig().getPropertyRepository().get(appName + ".chunk.data", Boolean.class).orElse(false).get());
doChunkingTests(false);

// chunking + hashing together: the value is wrapped in an EVCacheValue envelope and then chunked under the
// hashed key. This exercises the mixed-key, chunk-aware getBulk (two-step decode after chunk reassembly).
propertiesToSet.put(appName + ".hash.key", "true");
refreshEVCache();
assertTrue(manager.getEVCacheConfig().getPropertyRepository().get(appName + ".hash.key", Boolean.class).orElse(false).get());
doChunkingTests(true);
propertiesToSet.remove(appName + ".hash.key");

propertiesToSet.remove(appName + ".chunk.data");
refreshEVCache();
}

private void doChunkingTests(boolean hashingEnabled) throws Exception {
final EVCacheClient client = manager.getEVCacheClientPool(appName).getEVCacheClientForRead();

// single large value -> chunked set/get
final String largeKey = "chunk_large_" + System.nanoTime();
final String largeValue = buildLargeValue(4000);
EVCacheLatch latch = evCache.set(largeKey, largeValue, EVCacheLatch.Policy.ALL);
latch.await(10000, TimeUnit.MILLISECONDS);

// verify the value was actually chunked (guards against it being too small / compressed below chunk.size).
// For hashed keys the stored key is the hash key, so we only introspect the chunk layout for plain keys.
if (!hashingEnabled) {
final Map<String, ?> chunks = client.getAllChunks("cid:" + largeKey);
assertNotNull(chunks, "large value should exist in cache");
assertFalse(chunks.containsKey("cid:" + largeKey),
"value should have been chunked, but was stored as a single key (too small / compressed below chunk.size)");
}

assertEquals(evCache.get(largeKey), largeValue, "chunked single get did not return the written value");

// bulk get of multiple chunked values (sync getBulk; async bulk does not support chunking)
final int count = 3;
final Map<String, String> kv = new HashMap<>(count);
for (int i = 0; i < count; i++) {
final String key = "chunk_bulk_" + i + "_" + System.nanoTime();
final String value = buildLargeValue(3000 + i);
kv.put(key, value);
EVCacheLatch l = evCache.set(key, value, EVCacheLatch.Policy.ALL);
l.await(10000, TimeUnit.MILLISECONDS);
}
final Map<String, String> results = evCache.getBulk(kv.keySet().toArray(new String[0]));
assertNotNull(results);
assertEquals(results.size(), kv.size(), "chunked getBulk returned wrong number of entries");
for (Map.Entry<String, String> entry : kv.entrySet()) {
assertEquals(results.get(entry.getKey()), entry.getValue(), "chunked getBulk failed for key " + entry.getKey());
}

// cleanup
for (Future<Boolean> f : evCache.delete(largeKey)) {
f.get();
}
for (String key : kv.keySet()) {
for (Future<Boolean> f : evCache.delete(key)) {
f.get();
}
}
}

// Builds an incompressible value (random UUIDs) so its encoded size stays above the chunk size and actually
// triggers chunking. A repeating/low-entropy value would be compressed below chunk.size and never chunk.
private String buildLargeValue(int approxBytes) {
final StringBuilder sb = new StringBuilder(approxBytes + 36);
while (sb.length() < approxBytes) {
sb.append(UUID.randomUUID().toString());
}
return sb.toString();
}

private void testWithLargeKey() throws Exception {
StringBuilder sb = new StringBuilder();
for (int i= 0; i < 100; i++) {
Expand All @@ -354,11 +443,43 @@ private void testWithLargeKey() throws Exception {
}
}

private void testWithMixedKeys() throws Exception {
private void testBulkHashed() throws Exception {
final int count = 3;
Map<String, String> kv = new HashMap<>(count);
for (int i = 0; i < count; i++) {
String key = "bulkhashed_" + i;
String value = "val_bulkhashed_" + i;
kv.put(key, value);
EVCacheLatch latch = evCache.set(key, value, EVCacheLatch.Policy.ALL);
latch.await(10000, TimeUnit.MILLISECONDS);
}

EVCache[] evcacheInstance = new EVCache[2];
evcacheInstance[0] = getNewBuilder().setAppName(appName).setCachePrefix("cid").enableRetry().build();
evcacheInstance[1] = this.evCache;
Map<String, String> results = evCache.getBulk(kv.keySet().toArray(new String[0]));
assertNotNull(results);
assertEquals(results.size(), kv.size());
for (Map.Entry<String, String> entry : kv.entrySet()) {
assertEquals(results.get(entry.getKey()), entry.getValue(),
"getBulk with all hashed keys failed for key " + entry.getKey());
}

CompletableFuture<Map<String, String>> future = evCache.getAsyncBulk(kv.keySet().toArray(new String[0]));
results = future.get(10000, TimeUnit.MILLISECONDS);
assertNotNull(results);
assertEquals(results.size(), kv.size());
for (Map.Entry<String, String> entry : kv.entrySet()) {
assertEquals(results.get(entry.getKey()), entry.getValue(),
"getAsyncBulk with all hashed keys failed for key " + entry.getKey());
}

for (String key : kv.keySet()) {
Future<Boolean>[] deleteFutures = evCache.delete(key);
for (Future<Boolean> deleteFuture : deleteFutures) {
deleteFuture.get();
}
}
}

private void testWithMixedKeys() throws Exception {

Map<String, String> kv = new HashMap<>(6);
String oneLargeKey = null;
Expand Down Expand Up @@ -449,17 +570,17 @@ private void testWithMixedKeysAndCustomTranscoder() throws Exception {

// async bulk get
for (int op : new int[]{0, 1}) {
Map<String, Movie> results = new HashMap<>();
Map<String, Movie> results;
if (op == 0) {
CompletableFuture<Map<String, Movie>> future = evCache.getAsyncBulk(kv.keySet().toArray(new String[0]));
results = future.get(10000, TimeUnit.MILLISECONDS);
// } else {
// TODO: getBulk api is known to be broken for un-hashed keys not decoding correctly when request contains both hashed and unhashed keys
// results = evCache.getBulk(kv.keySet().toArray(new String[0]));
} else {
results = evCache.getBulk(kv.keySet().toArray(new String[0]));
}

assertEquals(results.size(), kv.size());
for (Map.Entry<String, Movie> result : results.entrySet()) {
assertEquals(results.size(), kv.size());

assertEquals(result.getValue(), kv.get(result.getKey()), "Did not get the written value back with op " + (op == 0 ? "getAsyncBulk" : "getBulk"));
}
}
Expand Down
74 changes: 22 additions & 52 deletions evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -1969,60 +1969,30 @@ private <T> Map<EVCacheKey, T> buildKeyValueResult(Map<String, T> objMap,

private <T> Map<EVCacheKey, T> getBulkData(EVCacheClient client, Collection<EVCacheKey> evcacheKeys, Transcoder<T> tc, boolean throwException, boolean hasZF) throws Exception {
try {
boolean hasHashedKey = false;
final Map<String, EVCacheKey> keyMap = new HashMap<>(evcacheKeys.size() * 2);
for(EVCacheKey evcKey : evcacheKeys) {
String key = evcKey.getCanonicalKey(client.isDuetClient());
String hashKey = evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxDigestBytes(), client.getMaxHashLength(), client.getBaseEncoder());
if(hashKey != null) {
if (log.isDebugEnabled() && shouldLog()) log.debug("APP " + _appName + ", key [" + key + "], has been hashed [" + hashKey + "]");
key = hashKey;
hasHashedKey = true;
}
keyMap.put(key, evcKey);
}
if(hasHashedKey) {
final Map<String, Object> objMap = client.getBulk(keyMap.keySet(), evcacheValueTranscoder, throwException, hasZF);
final Map<EVCacheKey, T> retMap = new HashMap<>((int) (objMap.size() / 0.75) + 1);
for (Map.Entry<String, Object> i : objMap.entrySet()) {
final Object obj = i.getValue();
if(obj instanceof EVCacheValue) {
if (log.isDebugEnabled() && shouldLog()) log.debug("APP " + _appName + ", The value for key [" + i.getKey() + "] is EVCache Value");
final EVCacheValue val = (EVCacheValue)obj;
final CachedData cd = new CachedData(val.getFlags(), val.getValue(), CachedData.MAX_SIZE);
final T tVal;
if(tc == null) {
tVal = (T)client.getTranscoder().decode(cd);
} else {
tVal = tc.decode(cd);
}
final EVCacheKey evcKey = keyMap.get(i.getKey());
if(evcKey.getCanonicalKey(client.isDuetClient()).equals(val.getKey())) {
if (log.isDebugEnabled() && shouldLog()) log.debug("APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey);
retMap.put(evcKey, tVal);
} else {
if (log.isDebugEnabled() && shouldLog()) log.debug("CACHE COLLISION : APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey);
incrementFailure(EVCacheMetricsFactory.KEY_HASH_COLLISION, Call.BULK.name(), EVCacheMetricsFactory.READ);
}
} else {
final EVCacheKey evcKey = keyMap.get(i.getKey());
if (log.isDebugEnabled() && shouldLog()) log.debug("APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey);
retMap.put(evcKey, (T)obj);
}
final KeyMapDto keyMapDto = buildKeyMap(client, evcacheKeys);
final Set<String> plainKeys = keyMapDto.getPlainKeysMap().keySet();
final Set<String> hashedKeys = keyMapDto.getHashedKeysMap().keySet();

final BiPredicate<String, String> collisionChecker = (hashedKey, decodedKey) -> {
final EVCacheKey evcKey = keyMapDto.getHashedKeysMap().get(hashedKey);
if (evcKey.getCanonicalKey(client.isDuetClient()).equals(decodedKey)) {
if (log.isDebugEnabled() && shouldLog())
log.debug("APP " + _appName + ", key [" + hashedKey + "] EVCacheKey " + evcKey);
} else {
if (log.isDebugEnabled() && shouldLog())
log.debug("CACHE COLLISION : APP " + _appName + ", key [" + hashedKey + "] EVCacheKey " + evcKey + " with decodedKey [" + decodedKey + "]");
incrementFailure(EVCacheMetricsFactory.KEY_HASH_COLLISION, Call.BULK.name(), EVCacheMetricsFactory.READ);
return true;
}
return retMap;
return false;
};

} else {
if(tc == null && _transcoder != null) tc = (Transcoder<T>)_transcoder;
final Map<String, T> objMap = client.getBulk(keyMap.keySet(), tc, throwException, hasZF);
final Map<EVCacheKey, T> retMap = new HashMap<EVCacheKey, T>((int)(objMap.size()/0.75) + 1);
for (Map.Entry<String, T> i : objMap.entrySet()) {
final EVCacheKey evcKey = keyMap.get(i.getKey());
if (log.isDebugEnabled() && shouldLog()) log.debug("APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey);
retMap.put(evcKey, i.getValue());
}
return retMap;
}
final Transcoder<T> valueTranscoder = (tc == null) ? ((_transcoder == null) ? (Transcoder<T>) client.getTranscoder() : (Transcoder<T>) _transcoder) : tc;
if (log.isDebugEnabled() && shouldLog())
log.debug("fetching bulk data with set of keys containing hashed key(s) {} ", evcacheKeys);

final Map<String, T> objMap = client.getBulk(plainKeys, hashedKeys, valueTranscoder, evcacheValueTranscoder, _appName, shouldLog(), collisionChecker, throwException, hasZF);
return buildKeyValueResult(objMap, keyMapDto);
} catch (Exception ex) {
if (log.isDebugEnabled() && shouldLog()) log.debug("Exception while getBulk data for APP " + _appName + ", key : " + evcacheKeys, ex);
if (!throwException || hasZF) return null;
Expand Down
Loading