diff --git a/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java b/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java index e69280e3..e1e4fbb8 100644 --- a/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java +++ b/evcache-client/test/com/netflix/evcache/test/EVCacheTestDI.java @@ -261,6 +261,16 @@ public void testAppendOrAdd() throws Exception { } private void refreshEVCache() { + // Close the previous DI container before building a new one. setupEnv() creates a fresh + // LifecycleInjector (Eureka client, Spectator registry, connection pools) on every call; without + // closing the old one its background threads keep it alive and it leaks, eventually exhausting the heap. + if (lifecycleManager != null) { + try { + lifecycleManager.close(); + } catch (Exception e) { + log.warn("Failed to close previous lifecycle manager during refresh", e); + } + } setupEnv(); testEVCache(); } @@ -278,6 +288,7 @@ public void functionalTestsWithAppLevelAndASGLevelHashingScenarios() throws Exce refreshEVCache(); assertTrue(manager.getEVCacheConfig().getPropertyRepository().get(appName + ".hash.key", Boolean.class).orElse(false).get()); doFunctionalTests(true); + testBulkHashed(); propertiesToSet.remove(appName + ".hash.key"); // hashing at app level due to auto hashing as a consequence of a large key @@ -331,6 +342,84 @@ public void functionalTestsWithAppLevelAndASGLevelHashingScenarios() throws Exce refreshEVCache(); } + @Test(dependsOnMethods = { "functionalTestsWithAppLevelAndASGLevelHashingScenarios" }) + public void testChunkingScenarios() throws Exception { + // chunking only (no hashing): large values are split into chunks and reassembled on read + propertiesToSet.put(appName + ".chunk.data", "true"); + refreshEVCache(); + assertTrue(manager.getEVCacheConfig().getPropertyRepository().get(appName + ".chunk.data", Boolean.class).orElse(false).get()); + doChunkingTests(false); + + // chunking + hashing together: the value is wrapped in an EVCacheValue envelope and then chunked under the + // hashed key. This exercises the mixed-key, chunk-aware getBulk (two-step decode after chunk reassembly). + propertiesToSet.put(appName + ".hash.key", "true"); + refreshEVCache(); + assertTrue(manager.getEVCacheConfig().getPropertyRepository().get(appName + ".hash.key", Boolean.class).orElse(false).get()); + doChunkingTests(true); + propertiesToSet.remove(appName + ".hash.key"); + + propertiesToSet.remove(appName + ".chunk.data"); + refreshEVCache(); + } + + private void doChunkingTests(boolean hashingEnabled) throws Exception { + final EVCacheClient client = manager.getEVCacheClientPool(appName).getEVCacheClientForRead(); + + // single large value -> chunked set/get + final String largeKey = "chunk_large_" + System.nanoTime(); + final String largeValue = buildLargeValue(4000); + EVCacheLatch latch = evCache.set(largeKey, largeValue, EVCacheLatch.Policy.ALL); + latch.await(10000, TimeUnit.MILLISECONDS); + + // verify the value was actually chunked (guards against it being too small / compressed below chunk.size). + // For hashed keys the stored key is the hash key, so we only introspect the chunk layout for plain keys. + if (!hashingEnabled) { + final Map chunks = client.getAllChunks("cid:" + largeKey); + assertNotNull(chunks, "large value should exist in cache"); + assertFalse(chunks.containsKey("cid:" + largeKey), + "value should have been chunked, but was stored as a single key (too small / compressed below chunk.size)"); + } + + assertEquals(evCache.get(largeKey), largeValue, "chunked single get did not return the written value"); + + // bulk get of multiple chunked values (sync getBulk; async bulk does not support chunking) + final int count = 3; + final Map kv = new HashMap<>(count); + for (int i = 0; i < count; i++) { + final String key = "chunk_bulk_" + i + "_" + System.nanoTime(); + final String value = buildLargeValue(3000 + i); + kv.put(key, value); + EVCacheLatch l = evCache.set(key, value, EVCacheLatch.Policy.ALL); + l.await(10000, TimeUnit.MILLISECONDS); + } + final Map results = evCache.getBulk(kv.keySet().toArray(new String[0])); + assertNotNull(results); + assertEquals(results.size(), kv.size(), "chunked getBulk returned wrong number of entries"); + for (Map.Entry entry : kv.entrySet()) { + assertEquals(results.get(entry.getKey()), entry.getValue(), "chunked getBulk failed for key " + entry.getKey()); + } + + // cleanup + for (Future f : evCache.delete(largeKey)) { + f.get(); + } + for (String key : kv.keySet()) { + for (Future f : evCache.delete(key)) { + f.get(); + } + } + } + + // Builds an incompressible value (random UUIDs) so its encoded size stays above the chunk size and actually + // triggers chunking. A repeating/low-entropy value would be compressed below chunk.size and never chunk. + private String buildLargeValue(int approxBytes) { + final StringBuilder sb = new StringBuilder(approxBytes + 36); + while (sb.length() < approxBytes) { + sb.append(UUID.randomUUID().toString()); + } + return sb.toString(); + } + private void testWithLargeKey() throws Exception { StringBuilder sb = new StringBuilder(); for (int i= 0; i < 100; i++) { @@ -354,11 +443,43 @@ private void testWithLargeKey() throws Exception { } } - private void testWithMixedKeys() throws Exception { + private void testBulkHashed() throws Exception { + final int count = 3; + Map kv = new HashMap<>(count); + for (int i = 0; i < count; i++) { + String key = "bulkhashed_" + i; + String value = "val_bulkhashed_" + i; + kv.put(key, value); + EVCacheLatch latch = evCache.set(key, value, EVCacheLatch.Policy.ALL); + latch.await(10000, TimeUnit.MILLISECONDS); + } - EVCache[] evcacheInstance = new EVCache[2]; - evcacheInstance[0] = getNewBuilder().setAppName(appName).setCachePrefix("cid").enableRetry().build(); - evcacheInstance[1] = this.evCache; + Map results = evCache.getBulk(kv.keySet().toArray(new String[0])); + assertNotNull(results); + assertEquals(results.size(), kv.size()); + for (Map.Entry entry : kv.entrySet()) { + assertEquals(results.get(entry.getKey()), entry.getValue(), + "getBulk with all hashed keys failed for key " + entry.getKey()); + } + + CompletableFuture> future = evCache.getAsyncBulk(kv.keySet().toArray(new String[0])); + results = future.get(10000, TimeUnit.MILLISECONDS); + assertNotNull(results); + assertEquals(results.size(), kv.size()); + for (Map.Entry entry : kv.entrySet()) { + assertEquals(results.get(entry.getKey()), entry.getValue(), + "getAsyncBulk with all hashed keys failed for key " + entry.getKey()); + } + + for (String key : kv.keySet()) { + Future[] deleteFutures = evCache.delete(key); + for (Future deleteFuture : deleteFutures) { + deleteFuture.get(); + } + } + } + + private void testWithMixedKeys() throws Exception { Map kv = new HashMap<>(6); String oneLargeKey = null; @@ -449,17 +570,17 @@ private void testWithMixedKeysAndCustomTranscoder() throws Exception { // async bulk get for (int op : new int[]{0, 1}) { - Map results = new HashMap<>(); + Map results; if (op == 0) { CompletableFuture> future = evCache.getAsyncBulk(kv.keySet().toArray(new String[0])); results = future.get(10000, TimeUnit.MILLISECONDS); - // } else { - // TODO: getBulk api is known to be broken for un-hashed keys not decoding correctly when request contains both hashed and unhashed keys - // results = evCache.getBulk(kv.keySet().toArray(new String[0])); + } else { + results = evCache.getBulk(kv.keySet().toArray(new String[0])); } + assertEquals(results.size(), kv.size()); for (Map.Entry result : results.entrySet()) { - assertEquals(results.size(), kv.size()); + assertEquals(result.getValue(), kv.get(result.getKey()), "Did not get the written value back with op " + (op == 0 ? "getAsyncBulk" : "getBulk")); } } diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java index 67f7a1fd..e0395ebf 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java @@ -1969,60 +1969,30 @@ private Map buildKeyValueResult(Map objMap, private Map getBulkData(EVCacheClient client, Collection evcacheKeys, Transcoder tc, boolean throwException, boolean hasZF) throws Exception { try { - boolean hasHashedKey = false; - final Map keyMap = new HashMap<>(evcacheKeys.size() * 2); - for(EVCacheKey evcKey : evcacheKeys) { - String key = evcKey.getCanonicalKey(client.isDuetClient()); - String hashKey = evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxDigestBytes(), client.getMaxHashLength(), client.getBaseEncoder()); - if(hashKey != null) { - if (log.isDebugEnabled() && shouldLog()) log.debug("APP " + _appName + ", key [" + key + "], has been hashed [" + hashKey + "]"); - key = hashKey; - hasHashedKey = true; - } - keyMap.put(key, evcKey); - } - if(hasHashedKey) { - final Map objMap = client.getBulk(keyMap.keySet(), evcacheValueTranscoder, throwException, hasZF); - final Map retMap = new HashMap<>((int) (objMap.size() / 0.75) + 1); - for (Map.Entry i : objMap.entrySet()) { - final Object obj = i.getValue(); - if(obj instanceof EVCacheValue) { - if (log.isDebugEnabled() && shouldLog()) log.debug("APP " + _appName + ", The value for key [" + i.getKey() + "] is EVCache Value"); - final EVCacheValue val = (EVCacheValue)obj; - final CachedData cd = new CachedData(val.getFlags(), val.getValue(), CachedData.MAX_SIZE); - final T tVal; - if(tc == null) { - tVal = (T)client.getTranscoder().decode(cd); - } else { - tVal = tc.decode(cd); - } - final EVCacheKey evcKey = keyMap.get(i.getKey()); - if(evcKey.getCanonicalKey(client.isDuetClient()).equals(val.getKey())) { - if (log.isDebugEnabled() && shouldLog()) log.debug("APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey); - retMap.put(evcKey, tVal); - } else { - if (log.isDebugEnabled() && shouldLog()) log.debug("CACHE COLLISION : APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey); - incrementFailure(EVCacheMetricsFactory.KEY_HASH_COLLISION, Call.BULK.name(), EVCacheMetricsFactory.READ); - } - } else { - final EVCacheKey evcKey = keyMap.get(i.getKey()); - if (log.isDebugEnabled() && shouldLog()) log.debug("APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey); - retMap.put(evcKey, (T)obj); - } + final KeyMapDto keyMapDto = buildKeyMap(client, evcacheKeys); + final Set plainKeys = keyMapDto.getPlainKeysMap().keySet(); + final Set hashedKeys = keyMapDto.getHashedKeysMap().keySet(); + + final BiPredicate collisionChecker = (hashedKey, decodedKey) -> { + final EVCacheKey evcKey = keyMapDto.getHashedKeysMap().get(hashedKey); + if (evcKey.getCanonicalKey(client.isDuetClient()).equals(decodedKey)) { + if (log.isDebugEnabled() && shouldLog()) + log.debug("APP " + _appName + ", key [" + hashedKey + "] EVCacheKey " + evcKey); + } else { + if (log.isDebugEnabled() && shouldLog()) + log.debug("CACHE COLLISION : APP " + _appName + ", key [" + hashedKey + "] EVCacheKey " + evcKey + " with decodedKey [" + decodedKey + "]"); + incrementFailure(EVCacheMetricsFactory.KEY_HASH_COLLISION, Call.BULK.name(), EVCacheMetricsFactory.READ); + return true; } - return retMap; + return false; + }; - } else { - if(tc == null && _transcoder != null) tc = (Transcoder)_transcoder; - final Map objMap = client.getBulk(keyMap.keySet(), tc, throwException, hasZF); - final Map retMap = new HashMap((int)(objMap.size()/0.75) + 1); - for (Map.Entry i : objMap.entrySet()) { - final EVCacheKey evcKey = keyMap.get(i.getKey()); - if (log.isDebugEnabled() && shouldLog()) log.debug("APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey); - retMap.put(evcKey, i.getValue()); - } - return retMap; - } + final Transcoder valueTranscoder = (tc == null) ? ((_transcoder == null) ? (Transcoder) client.getTranscoder() : (Transcoder) _transcoder) : tc; + if (log.isDebugEnabled() && shouldLog()) + log.debug("fetching bulk data with set of keys containing hashed key(s) {} ", evcacheKeys); + + final Map objMap = client.getBulk(plainKeys, hashedKeys, valueTranscoder, evcacheValueTranscoder, _appName, shouldLog(), collisionChecker, throwException, hasZF); + return buildKeyValueResult(objMap, keyMapDto); } catch (Exception ex) { if (log.isDebugEnabled() && shouldLog()) log.debug("Exception while getBulk data for APP " + _appName + ", key : " + evcacheKeys, ex); if (!throwException || hasZF) return null; diff --git a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java index 3a0dbcd9..83c7a6b5 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java +++ b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java @@ -34,6 +34,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -568,7 +569,34 @@ private ChunkInfo getChunkInfo(String firstKey, String metadata) { return ci; } - private Map assembleChunks(Collection keyList, Transcoder tc, boolean hasZF) { + private T decodeForKey(String key, CachedData raw, Collection plainKeys, Set hashedKeys, Transcoder valueTranscoder, EVCacheTranscoder evcacheValueTranscoder, BiPredicate collisionChecker, boolean hasZF) { + if (raw == null) return null; + // hashed keys require 2 step decoding, first using envelopeTranscoder then using valueTranscoder + if (hashedKeys != null && hashedKeys.contains(key)) { + if (evcacheValueTranscoder == null) throw new IllegalStateException("Both transcoders required for 2-step decode, failed on key " + key + + " of bulk get for plain keys [" + plainKeys + "] and hashed keys [" + hashedKeys + "]"); + Object obj; + try { + obj = evcacheValueTranscoder.decode(raw); + } catch (Exception e) { + throw new RuntimeException("Failed to decode key " + key + " using envelopeTranscoder " + evcacheValueTranscoder.getClass().getName(), e); + } + if (obj instanceof EVCacheValue) { + final EVCacheValue val = (EVCacheValue) obj; + boolean collision = collisionChecker.test(key, val.getKey()); + if (!collision) { + return valueTranscoder.decode(new CachedData(val.getFlags(), val.getValue(), valueTranscoder.getMaxSize())); + } + } + return null; + } + return valueTranscoder.decode(raw); + } + + private Map assembleChunks(Collection plainKeys, Set hashedKeys, Transcoder valueTranscoder, EVCacheTranscoder evcacheValueTranscoder, BiPredicate collisionChecker, boolean hasZF) { + final Set keyList = new HashSet<>(); + if (plainKeys != null) keyList.addAll(plainKeys); + if (hashedKeys != null) keyList.addAll(hashedKeys); final List firstKeys = new ArrayList<>(); for (String key : keyList) { firstKeys.add(key); @@ -583,7 +611,7 @@ private Map assembleChunks(Collection keyList, Transcoder for (String key : keyList) { if (metadataMap.containsKey(key)) { CachedData val = metadataMap.remove(key); - returnMap.put(key, tc.decode(val)); + returnMap.put(key, decodeForKey(key, val, plainKeys, hashedKeys, valueTranscoder, evcacheValueTranscoder, collisionChecker, hasZF)); } } @@ -652,7 +680,7 @@ private Map assembleChunks(Collection keyList, Transcoder final boolean checksumPass = checkCRCChecksum(data, ci, hasZF); if (data != null && checksumPass) { final CachedData cd = new CachedData(ci.getFlags(), data, Integer.MAX_VALUE); - returnMap.put(ci.getKey(), tc.decode(cd)); + returnMap.put(ci.getKey(), decodeForKey(ci.getKey(), cd, plainKeys, hashedKeys, valueTranscoder, evcacheValueTranscoder, collisionChecker, hasZF)); } else { returnMap.put(ci.getKey(), null); } @@ -664,6 +692,14 @@ private Map assembleChunks(Collection keyList, Transcoder return null; } + /** + * Plain-only chunk assembly. Delegates to the mixed-key variant with an empty hashed-key set, so every key is + * decoded in a single step (identical to the legacy single-transcoder behavior). + */ + private Map assembleChunks(Collection keyList, Transcoder tc, boolean hasZF) { + return assembleChunks(keyList, Collections.emptySet(), tc, null, null, hasZF); + } + private Single> assembleChunks(Collection keyList, Transcoder tc, boolean hasZF, Scheduler scheduler) { final List firstKeys = new ArrayList<>(); for (String key : keyList) { @@ -973,6 +1009,12 @@ public Single getAndTouch(String key, Transcoder transcoder, int timeT } } + /** + * @deprecated Does not support a mix of plain and hashed keys. Use + * {@link #getBulk(Collection, Set, Transcoder, EVCacheTranscoder, String, boolean, BiPredicate, boolean, boolean)}, + * which handles plain and hashed keys (and chunking) in a single request. + */ + @Deprecated public Map getBulk(Collection canonicalKeys, Transcoder tc, boolean _throwException, boolean hasZF) throws Exception { final Map returnVal; @@ -998,6 +1040,38 @@ public Map getBulk(Collection canonicalKeys, Transcoder Map getBulk(Collection plainKeys, Set hashedKeys, + Transcoder valueTranscoder, EVCacheTranscoder evcacheValueTranscoder, + String appName, boolean shouldLog, BiPredicate collisionChecker, + boolean _throwException, boolean hasZF) throws Exception { + try { + if (valueTranscoder == null) valueTranscoder = (Transcoder) getTranscoder(); + if (enableChunking.get()) { + return assembleChunks(plainKeys, hashedKeys, valueTranscoder, evcacheValueTranscoder, collisionChecker, hasZF); + } + final BiPredicate validator = (node, key) -> { + NodeValidationResult result = validateNodeForRead(node, Call.BULK, 2 * maxReadQueueSize.get()); + if (result != NodeValidationResult.OK) { + return false; + } + return true; + }; + return evcacheMemcachedClient + .asyncGetBulk(plainKeys, hashedKeys, valueTranscoder, evcacheValueTranscoder, validator, appName, shouldLog, collisionChecker) + .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF); + } catch (Exception e) { + if (_throwException) throw e; + return Collections. emptyMap(); + } + } + /** * @Deprecated This method does NOT support a mix of plain and hashed keys in {@code keys}. All keys are * decoded exactly using the given transcoder (note that hashed keys require two step decoding).