Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.commons.math3.distribution.ZipfDistribution;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand Down Expand Up @@ -136,9 +137,10 @@ private static Namespace parseArgs(String[] args) {
parser
.addArgument("--distribution")
.type(String.class)
.choices("uniform", "normal")
.choices("uniform", "normal", "zipfian")
.setDefault("uniform")
.help("Distribution type for generated values: uniform or normal (default: uniform)");
.help(
"Distribution type for generated values: uniform, normal, or zipfian (default: uniform)");

Namespace parsedArgs = parser.parseArgsOrFail(args);
return parsedArgs;
Expand Down Expand Up @@ -179,10 +181,19 @@ private static DataPoint generateDataPoint(
boolean infiniteCardinality,
int keyCardinality,
String distribution,
Random randomSource) {
Random randomSource,
ZipfDistribution zipf) {
// Calculate timestamp so that N items fit into each tumbling window
long timestamp = baseTimestamp + (index / itemsPerWindow) * millisecondTumblingWindow;
String key = infiniteCardinality ? "key" + index : "key" + ((index % keyCardinality) + 1);

String key;
if (infiniteCardinality) key = "key" + index;
else if ("zipfian".equals(distribution)) {
int zipfkey = zipf.sample();
key = "key" + zipfkey;
} else {
key = "key" + ((index % keyCardinality) + 1);
}

// Generate value based on distribution type
int value;
Expand All @@ -191,6 +202,8 @@ private static DataPoint generateDataPoint(
double normalValue = randomSource.nextGaussian() * 166667 + 500000;
// Clamp to 1-1,000,000 range
value = (int) Math.max(1, Math.min(1000000, normalValue));
} else if ("zipfian".equals(distribution)) {
value = 1;
} else {
// Uniform distribution (default): 1-1,000,000
value = randomSource.nextInt(1000000) + 1;
Expand All @@ -215,6 +228,11 @@ private static DataStream<DataPoint> createDataGenStream(

// Create first source
Random random0 = new Random(40L);

// exponent 1.07 is the YCSB (Yahoo! Cloud Serving Benchmark) benchmark
ZipfDistribution zipf =
"zipfian".equals(distribution) ? new ZipfDistribution(keyCardinality, 1.07) : null;

GeneratorFunction<Long, DataPoint> baseGeneratorFunction =
index ->
generateDataPoint(
Expand All @@ -225,7 +243,8 @@ private static DataStream<DataPoint> createDataGenStream(
infiniteCardinality,
keyCardinality,
distribution,
random0);
random0,
zipf);

DataGeneratorSource<DataPoint> baseDataGenSource =
new DataGeneratorSource<>(
Expand All @@ -238,6 +257,8 @@ private static DataStream<DataPoint> createDataGenStream(
for (int i = 1; i < numSources; i++) {
final int sourceId = i;
Random randomSource = new Random(40L + i);
ZipfDistribution zipfSource =
"zipfian".equals(distribution) ? new ZipfDistribution(keyCardinality, 1.07) : null;
GeneratorFunction<Long, DataPoint> generatorFunction =
index ->
generateDataPoint(
Expand All @@ -248,7 +269,8 @@ private static DataStream<DataPoint> createDataGenStream(
infiniteCardinality,
keyCardinality,
distribution,
randomSource);
randomSource,
zipfSource);

DataGeneratorSource<DataPoint> dataGenSource =
new DataGeneratorSource<>(
Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,11 @@ under the License.
<artifactId>commons-codec</artifactId>
<version>${commons.codec.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>3.6.1</version>
</dependency>

<!-- Logging framework -->
<dependency>
Expand Down