OffheapInputWrapper.java (new file)
@@ -0,0 +1,32 @@
package org.apache.flink.benchmark.full;

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;

import java.lang.reflect.Field;

public class OffheapInputWrapper {
    public SpillingAdaptiveSpanningRecordDeserializer<?> reader;
    public Buffer buffer;
    public DataInputView dataInput;

    public OffheapInputWrapper(byte[] initialPayload) throws Exception {
        reader = new SpillingAdaptiveSpanningRecordDeserializer<>(new String[0]);
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledOffHeapMemory(initialPayload.length, this);
        segment.put(0, initialPayload);
        buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, true, initialPayload.length);
        Field nonSpanningWrapper = reader.getClass().getDeclaredField("nonSpanningWrapper");
        nonSpanningWrapper.setAccessible(true);
        dataInput = (DataInputView) nonSpanningWrapper.get(reader);
    }
Contributor comment on lines +18 to +26:
There are a couple of issues here.

First, this relies on a non-public Flink API (SpillingAdaptiveSpanningRecordDeserializer), which means it can change at any point in time and break the benchmark builds.

Secondly, this uses reflection to access a private field of an internal class, which makes this benchmark even more fragile (and could cause compilation issues like this recent one).

Is there a way to avoid using internal classes? If there really is no way, then it could be fixed similarly to how #40 is refactoring 15199 (take a look at the org.apache.flink.contrib.streaming.state.benchmark.StateBackendBenchmarkUtils class).

In this case, the OffheapInputWrapper class should be defined and tested in the Flink repository, with some comment/annotation noting that it is used by the micro benchmarks, and the code should be adjusted so that the reflection is no longer needed.
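
For reference, one way to drop both the internal class and the reflection would be to read from a plain direct ByteBuffer through org.apache.flink.core.memory.DataInputDeserializer, which also implements DataInputView. A rough sketch under that assumption (the class and field names below are hypothetical, and this has not been verified against the benchmark):

import org.apache.flink.core.memory.DataInputDeserializer;

import java.nio.ByteBuffer;

// Hypothetical alternative: off-heap input with no internal classes and no reflection.
public class DirectBufferInputWrapper {
    private final ByteBuffer payload;  // allocateDirect => off-heap memory
    public final DataInputDeserializer dataInput = new DataInputDeserializer();

    public DirectBufferInputWrapper(byte[] initialPayload) {
        payload = ByteBuffer.allocateDirect(initialPayload.length);
        payload.put(initialPayload);
    }

    public void reset() {
        ByteBuffer view = payload.duplicate();
        view.rewind();  // re-read the whole payload from offset 0
        dataInput.setBuffer(view);
    }
}

The trade-off is that this measures DataInputDeserializer's read path rather than the NonSpanningWrapper used by the actual network stack, so it may answer a slightly different question than this benchmark intends.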


    public void reset() throws Exception {
        reader.setNextBuffer(buffer);
    }

}
PojoSerializationBenchmark.java
@@ -5,19 +5,16 @@
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.benchmark.BenchmarkBase;
 import org.apache.flink.benchmark.SerializationFrameworkMiniBenchmarks;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.*;
 import org.apache.flink.formats.avro.typeutils.AvroSerializer;
 import org.openjdk.jmh.infra.Blackhole;
 import org.openjdk.jmh.runner.Runner;
 import org.openjdk.jmh.runner.RunnerException;
 import org.openjdk.jmh.runner.options.Options;
 import org.openjdk.jmh.runner.options.OptionsBuilder;
 import org.openjdk.jmh.runner.options.VerboseMode;
+import org.openjdk.jmh.annotations.*;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
@@ -39,13 +36,16 @@ public class PojoSerializationBenchmark extends BenchmarkBase {
     TypeSerializer<org.apache.flink.benchmark.avro.MyPojo> avroSerializer =
             new AvroSerializer<>(org.apache.flink.benchmark.avro.MyPojo.class);
 
-    ByteArrayInputStream pojoBuffer;
-    ByteArrayInputStream avroBuffer;
-    ByteArrayInputStream kryoBuffer;
+    OffheapInputWrapper pojoBuffer;
+    OffheapInputWrapper avroBuffer;
+    OffheapInputWrapper kryoBuffer;
+
+    DataOutputSerializer stream = new DataOutputSerializer(128);
+
+    public static final int INVOCATIONS = 1000;
 
     @Setup
-    public void setup() throws IOException {
+    public void setup() throws Exception {
         pojo = new SerializationFrameworkMiniBenchmarks.MyPojo(
                 0,
                 "myName",
@@ -70,9 +70,9 @@ public void setup() throws IOException {
                 2,
                 3,
                 "null");
-        pojoBuffer = new ByteArrayInputStream(write(pojoSerializer, pojo));
-        avroBuffer = new ByteArrayInputStream(write(avroSerializer, avroPojo));
-        kryoBuffer = new ByteArrayInputStream(write(kryoSerializer, pojo));
+        pojoBuffer = new OffheapInputWrapper(writePayload(pojoSerializer, pojo));
+        avroBuffer = new OffheapInputWrapper(writePayload(avroSerializer, avroPojo));
+        kryoBuffer = new OffheapInputWrapper(writePayload(kryoSerializer, pojo));
     }
 
     public static void main(String[] args)
@@ -86,42 +86,69 @@ public static void main(String[] args)
     }
 
     @Benchmark
-    public byte[] writePojo() throws IOException {
-        return write(pojoSerializer, pojo);
+    @OperationsPerInvocation(INVOCATIONS)
Contributor comment:

nit: could this be moved to the top of the class?
+    public int writePojo() throws IOException {
+        stream.pruneBuffer();
+        for (int i = 0; i < INVOCATIONS; i++) {
Contributor comment:
Could you motivate why we need multiple invocations in a single benchmark? Because of the potential costs of stream.pruneBuffer(); or pojoBuffer.reset();? Are they measurable?

As it is, I would be a little bit concerned about what magic the JIT can do after inlining pojoSerializer.serialize(pojo, stream); and unrolling the loop. That might be desirable (vectorisation), but as we are invoking it over and over again with the same parameters, some other magic could yield false results.
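
One common mitigation, sketched here against the fields of this class plus a hypothetical pojos array pre-filled in setup() with distinct instances (array length a power of two, for the index mask), is to serialize a different object on each iteration so the JIT cannot specialise on a single constant argument:

    private SerializationFrameworkMiniBenchmarks.MyPojo[] pojos;  // hypothetical field, filled in setup()

    @Benchmark
    @OperationsPerInvocation(INVOCATIONS)
    public int writePojoVaried() throws IOException {
        stream.pruneBuffer();
        for (int i = 0; i < INVOCATIONS; i++) {
            // A varying index keeps the JIT from constant-folding the argument,
            // while the work per call stays the same as in writePojo().
            pojoSerializer.serialize(pojos[i & (pojos.length - 1)], stream);
        }
        return stream.length();
    }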

+            pojoSerializer.serialize(pojo, stream);
+        }
+        return stream.length();
     }

     @Benchmark
-    public byte[] writeAvro() throws IOException {
-        return write(avroSerializer, avroPojo);
+    @OperationsPerInvocation(INVOCATIONS)
+    public int writeAvro() throws IOException {
+        stream.pruneBuffer();
+        for (int i = 0; i < INVOCATIONS; i++) {
+            avroSerializer.serialize(avroPojo, stream);
+        }
+        return stream.length();
     }

     @Benchmark
-    public byte[] writeKryo() throws IOException {
-        return write(kryoSerializer, pojo);
+    @OperationsPerInvocation(INVOCATIONS)
+    public int writeKryo() throws IOException {
+        stream.pruneBuffer();
+        for (int i = 0; i < INVOCATIONS; i++) {
+            kryoSerializer.serialize(pojo, stream);
+        }
+        return stream.length();
     }

     @Benchmark
-    public SerializationFrameworkMiniBenchmarks.MyPojo readPojo() throws IOException {
+    @OperationsPerInvocation(INVOCATIONS)
+    public void readPojo(Blackhole bh) throws Exception {
         pojoBuffer.reset();
-        return pojoSerializer.deserialize(new DataInputViewStreamWrapper(pojoBuffer));
+        for (int i = 0; i < INVOCATIONS; i++) {
+            bh.consume(pojoSerializer.deserialize(pojoBuffer.dataInput));
+        }
     }

     @Benchmark
-    public SerializationFrameworkMiniBenchmarks.MyPojo readKryo() throws IOException {
+    @OperationsPerInvocation(INVOCATIONS)
+    public void readKryo(Blackhole bh) throws Exception {
         kryoBuffer.reset();
-        return kryoSerializer.deserialize(new DataInputViewStreamWrapper(kryoBuffer));
+        for (int i = 0; i < INVOCATIONS; i++) {
+            bh.consume(kryoSerializer.deserialize(kryoBuffer.dataInput));
+        }
     }

     @Benchmark
-    public org.apache.flink.benchmark.avro.MyPojo readAvro() throws IOException {
+    @OperationsPerInvocation(INVOCATIONS)
+    public void readAvro(Blackhole bh) throws Exception {
         avroBuffer.reset();
-        return avroSerializer.deserialize(new DataInputViewStreamWrapper(avroBuffer));
+        for (int i = 0; i < INVOCATIONS; i++) {
+            bh.consume(avroSerializer.deserialize(avroBuffer.dataInput));
+        }
     }

-    private <T> byte[] write(TypeSerializer<T> serializer, T value) throws IOException {
+    private <T> byte[] writePayload(TypeSerializer<T> serializer, T value) throws IOException {
         ByteArrayOutputStream buffer = new ByteArrayOutputStream();
         DataOutputView out = new DataOutputViewStreamWrapper(buffer);
-        serializer.serialize(value, out);
+        for (int i = 0; i < INVOCATIONS; i++) {
+            serializer.serialize(value, out);
+        }
+        buffer.close();
         return buffer.toByteArray();
     }
 }
StringSerializationBenchmark.java
@@ -4,19 +4,15 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.benchmark.BenchmarkBase;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.memory.*;
 import org.openjdk.jmh.infra.Blackhole;
 import org.openjdk.jmh.runner.Runner;
 import org.openjdk.jmh.runner.RunnerException;
 import org.openjdk.jmh.runner.options.Options;
 import org.openjdk.jmh.runner.options.OptionsBuilder;
 import org.openjdk.jmh.runner.options.VerboseMode;
+import org.openjdk.jmh.annotations.*;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
@@ -50,11 +46,13 @@ public static void main(String[] args)

     ExecutionConfig config = new ExecutionConfig();
     TypeSerializer<String> serializer = TypeInformation.of(String.class).createSerializer(config);
-    ByteArrayInputStream serializedBuffer;
-    DataInputView serializedStream;
+    DataOutputSerializer serializedStream;
+    OffheapInputWrapper offheapInput;
 
+    public static final int INVOCATIONS = 1000;
+
     @Setup
-    public void setup() throws IOException {
+    public void setup() throws Exception {
         length = Integer.parseInt(lengthStr);
         switch (type) {
             case "ascii":
@@ -69,23 +67,32 @@ public void setup() throws IOException {
             default:
                 throw new IllegalArgumentException(type + " charset is not supported");
         }
-        byte[] stringBytes = stringWrite();
-        serializedBuffer = new ByteArrayInputStream(stringBytes);
-        serializedStream = new DataInputViewStreamWrapper(serializedBuffer);
+        serializedStream = new DataOutputSerializer(128);
+        DataOutputSerializer payloadWriter = new DataOutputSerializer(128);
+        for (int i = 0; i < INVOCATIONS; i++) {
+            serializer.serialize(input, payloadWriter);
+        }
+        byte[] payload = payloadWriter.getCopyOfBuffer();
+        offheapInput = new OffheapInputWrapper(payload);
     }

     @Benchmark
-    public byte[] stringWrite() throws IOException {
-        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
-        DataOutputView out = new DataOutputViewStreamWrapper(buffer);
-        serializer.serialize(input, out);
-        return buffer.toByteArray();
+    @OperationsPerInvocation(INVOCATIONS)
+    public int stringWrite() throws IOException {
+        serializedStream.pruneBuffer();
+        for (int i = 0; i < INVOCATIONS; i++) {
+            serializer.serialize(input, serializedStream);
+        }
+        return serializedStream.length();
     }

     @Benchmark
-    public String stringRead() throws IOException {
-        serializedBuffer.reset();
-        return serializer.deserialize(serializedStream);
+    @OperationsPerInvocation(INVOCATIONS)
+    public void stringRead(Blackhole bh) throws Exception {
+        offheapInput.reset();
+        for (int i = 0; i < INVOCATIONS; i++) {
+            bh.consume(serializer.deserialize(offheapInput.dataInput));
+        }
     }

     private String generate(char[] charset, int length) {