diff --git a/flinksketch-bench/src/main/java/dev/projectasap/flinksketch/DataStreamJob.java b/flinksketch-bench/src/main/java/dev/projectasap/flinksketch/DataStreamJob.java index 9a89687..a75876a 100644 --- a/flinksketch-bench/src/main/java/dev/projectasap/flinksketch/DataStreamJob.java +++ b/flinksketch-bench/src/main/java/dev/projectasap/flinksketch/DataStreamJob.java @@ -20,6 +20,7 @@ import net.sourceforge.argparse4j.ArgumentParsers; import net.sourceforge.argparse4j.inf.ArgumentParser; import net.sourceforge.argparse4j.inf.Namespace; +import org.apache.commons.math3.distribution.ZipfDistribution; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -136,9 +137,10 @@ private static Namespace parseArgs(String[] args) { parser .addArgument("--distribution") .type(String.class) - .choices("uniform", "normal") + .choices("uniform", "normal", "zipfian") .setDefault("uniform") - .help("Distribution type for generated values: uniform or normal (default: uniform)"); + .help( + "Distribution type for generated values: uniform, normal, or zipfian (default: uniform)"); Namespace parsedArgs = parser.parseArgsOrFail(args); return parsedArgs; @@ -179,10 +181,19 @@ private static DataPoint generateDataPoint( boolean infiniteCardinality, int keyCardinality, String distribution, - Random randomSource) { + Random randomSource, + ZipfDistribution zipf) { // Calculate timestamp so that N items fit into each tumbling window long timestamp = baseTimestamp + (index / itemsPerWindow) * millisecondTumblingWindow; - String key = infiniteCardinality ? "key" + index : "key" + ((index % keyCardinality) + 1); + + String key; + if (infiniteCardinality) key = "key" + index; + else if ("zipfian".equals(distribution)) { + int zipfkey = zipf.sample(); + key = "key" + zipfkey; + } else { + key = "key" + ((index % keyCardinality) + 1); + } // Generate value based on distribution type int value; @@ -191,6 +202,8 @@ private static DataPoint generateDataPoint( double normalValue = randomSource.nextGaussian() * 166667 + 500000; // Clamp to 1-1,000,000 range value = (int) Math.max(1, Math.min(1000000, normalValue)); + } else if ("zipfian".equals(distribution)) { + value = 1; } else { // Uniform distribution (default): 1-1,000,000 value = randomSource.nextInt(1000000) + 1; @@ -215,6 +228,11 @@ private static DataStream createDataGenStream( // Create first source Random random0 = new Random(40L); + + // exponent 1.07 is the YCSB (Yahoo! Cloud Serving Benchmark) benchmark + ZipfDistribution zipf = + "zipfian".equals(distribution) ? new ZipfDistribution(keyCardinality, 1.07) : null; + GeneratorFunction baseGeneratorFunction = index -> generateDataPoint( @@ -225,7 +243,8 @@ private static DataStream createDataGenStream( infiniteCardinality, keyCardinality, distribution, - random0); + random0, + zipf); DataGeneratorSource baseDataGenSource = new DataGeneratorSource<>( @@ -238,6 +257,8 @@ private static DataStream createDataGenStream( for (int i = 1; i < numSources; i++) { final int sourceId = i; Random randomSource = new Random(40L + i); + ZipfDistribution zipfSource = + "zipfian".equals(distribution) ? new ZipfDistribution(keyCardinality, 1.07) : null; GeneratorFunction generatorFunction = index -> generateDataPoint( @@ -248,7 +269,8 @@ private static DataStream createDataGenStream( infiniteCardinality, keyCardinality, distribution, - randomSource); + randomSource, + zipfSource); DataGeneratorSource dataGenSource = new DataGeneratorSource<>( diff --git a/pom.xml b/pom.xml index 8f06c4a..45f2673 100644 --- a/pom.xml +++ b/pom.xml @@ -158,6 +158,11 @@ under the License. commons-codec ${commons.codec.version} + + org.apache.commons + commons-math3 + 3.6.1 +