Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.linkedin.avro.api;

import java.util.List;

/**
 * A {@link List} of {@link Float} extended with methods that accept and return primitive
 * {@code float} values, intended to let callers avoid boxing.
 */
public interface PrimitiveFloatList extends List<Float> {
/**
 * Primitive-typed counterpart of {@link List#get(int)}.
 *
 * @param index index of the element to return
 * @return the element at the specified position in this list, as a primitive {@code float}
 */
float getPrimitive(int index);

/**
 * Primitive-typed counterpart of {@link List#add(Object)}.
 *
 * @param e element whose presence in this collection is to be ensured
 * @return {@code true} if this collection changed as a result of the call
 */
boolean addPrimitive(float e);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.avro.fastserde;

import com.linkedin.avro.api.PrimitiveFloatList;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.AbstractList;
Expand Down Expand Up @@ -33,8 +34,8 @@
*
* TODO: Provide arrays for other primitive types.
*/
public class PrimitiveFloatList extends AbstractList<Float>
implements GenericArray<Float>, Comparable<GenericArray<Float>> {
public class ByteBufferBackedPrimitiveFloatList extends AbstractList<Float>
implements GenericArray<Float>, Comparable<GenericArray<Float>>, PrimitiveFloatList {
private static final float[] EMPTY = new float[0];
private static final int FLOAT_SIZE = Float.BYTES;
private static final Schema FLOAT_SCHEMA = Schema.create(Schema.Type.FLOAT);
Expand All @@ -44,15 +45,15 @@ public class PrimitiveFloatList extends AbstractList<Float>
private boolean isCached = false;
private CompositeByteBuffer byteBuffer;

public PrimitiveFloatList(int capacity) {
public ByteBufferBackedPrimitiveFloatList(int capacity) {
if (capacity != 0) {
elements = new float[capacity];
}
// create empty ByteBuffer if capacity != 0 ( List<Float> interface usage case)
byteBuffer = new CompositeByteBuffer(capacity != 0);
}

public PrimitiveFloatList(Collection<Float> c) {
public ByteBufferBackedPrimitiveFloatList(Collection<Float> c) {
if (c != null) {
elements = new float[c.size()];
addAll(c);
Expand All @@ -61,21 +62,21 @@ public PrimitiveFloatList(Collection<Float> c) {
}

/**
* Instantiate (or re-use) and populate a {@link PrimitiveFloatList} from a {@link org.apache.avro.io.Decoder}.
* Instantiate (or re-use) and populate a {@link ByteBufferBackedPrimitiveFloatList} from a {@link org.apache.avro.io.Decoder}.
*
* N.B.: the caller must ensure the data is of the appropriate type by calling {@link #isFloatArray(Schema)}.
*
* @param old old {@link PrimitiveFloatList} to reuse
* @param old old {@link ByteBufferBackedPrimitiveFloatList} to reuse
* @param in {@link org.apache.avro.io.Decoder} to read new list from
* @return a {@link PrimitiveFloatList} with data, possibly the old argument reused
* @return a {@link ByteBufferBackedPrimitiveFloatList} with data, possibly the old argument reused
* @throws IOException on io errors
*/
public static Object readPrimitiveFloatArray(Object old, Decoder in) throws IOException {
long length = in.readArrayStart();
long totalLength = 0;

if (length > 0) {
PrimitiveFloatList array = (PrimitiveFloatList) newPrimitiveFloatArray(old);
ByteBufferBackedPrimitiveFloatList array = (ByteBufferBackedPrimitiveFloatList) newPrimitiveFloatArray(old);
int index = 0;

do {
Expand All @@ -90,11 +91,11 @@ public static Object readPrimitiveFloatArray(Object old, Decoder in) throws IOEx
setupElements(array, (int)totalLength);
return array;
} else {
return new PrimitiveFloatList(0);
return new ByteBufferBackedPrimitiveFloatList(0);
}
}

private static void setupElements(PrimitiveFloatList list, int totalSize) {
private static void setupElements(ByteBufferBackedPrimitiveFloatList list, int totalSize) {
if (list.elements.length != 0) {
if (totalSize <= list.getCapacity()) {
// reuse the float array directly
Expand All @@ -111,7 +112,7 @@ private static void setupElements(PrimitiveFloatList list, int totalSize) {

/**
* @param expected {@link Schema} to inspect
* @return true if the {@code expected} SCHEMA is of the right type to decode as a {@link PrimitiveFloatList}
* @return true if the {@code expected} SCHEMA is of the right type to decode as a {@link ByteBufferBackedPrimitiveFloatList}
* false otherwise
*/
public static boolean isFloatArray(Schema expected) {
Expand All @@ -120,15 +121,15 @@ public static boolean isFloatArray(Schema expected) {
}

private static Object newPrimitiveFloatArray(Object old) {
if (old instanceof PrimitiveFloatList) {
PrimitiveFloatList oldFloatList = (PrimitiveFloatList) old;
if (old instanceof ByteBufferBackedPrimitiveFloatList) {
ByteBufferBackedPrimitiveFloatList oldFloatList = (ByteBufferBackedPrimitiveFloatList) old;
oldFloatList.byteBuffer.clear();
oldFloatList.isCached = false;
oldFloatList.size = 0;
return oldFloatList;
} else {
// Just a place holder, will set up the elements later.
return new PrimitiveFloatList(0);
return new ByteBufferBackedPrimitiveFloatList(0);
}
}

Expand Down Expand Up @@ -282,8 +283,8 @@ public Float peek() {
@Override
public int compareTo(GenericArray<Float> that) {
cacheFromByteBuffer();
if (that instanceof PrimitiveFloatList) {
PrimitiveFloatList thatPrimitiveList = (PrimitiveFloatList) that;
if (that instanceof ByteBufferBackedPrimitiveFloatList) {
ByteBufferBackedPrimitiveFloatList thatPrimitiveList = (ByteBufferBackedPrimitiveFloatList) that;
if (this.size == thatPrimitiveList.size) {
for (int i = 0; i < this.size; i++) {
int compare = Float.compare(this.elements[i], thatPrimitiveList.elements[i]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,10 +589,10 @@ private void processArray(JVar arraySchemaVar, final String name, final Schema a

final JVar arrayVar = action.getShouldRead() ? declareValueVar(name, arraySchema, parentBody) : null;
/**
* Special optimization for float array by leveraging {@link PrimitiveFloatList}.
* Special optimization for float array by leveraging {@link ByteBufferBackedPrimitiveFloatList}.
*/
if (action.getShouldRead() && arraySchema.getElementType().getType().equals(Schema.Type.FLOAT)) {
JClass primitiveFloatList = codeModel.ref(PrimitiveFloatList.class);
JClass primitiveFloatList = codeModel.ref(ByteBufferBackedPrimitiveFloatList.class);
JExpression readPrimitiveFloatArrayInvocation = primitiveFloatList.staticInvoke("readPrimitiveFloatArray").
arg(reuseSupplier.get()).arg(JExpr.direct(DECODER));
JExpression castedResult =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.linkedin.avro.fastserde;

import org.apache.avro.generic.ColdGenericDatumReader;
import org.apache.avro.generic.ColdSpecificDatumReader;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
Expand Down Expand Up @@ -478,7 +480,7 @@ public static class FastDeserializerWithAvroSpecificImpl<V> implements FastDeser
private final SpecificDatumReader<V> datumReader;

public FastDeserializerWithAvroSpecificImpl(Schema writerSchema, Schema readerSchema) {
this.datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
this.datumReader = new ColdSpecificDatumReader<>(writerSchema, readerSchema);
}

@Override
Expand All @@ -491,7 +493,7 @@ public static class FastDeserializerWithAvroGenericImpl<V> implements FastDeseri
private final GenericDatumReader<V> datumReader;

public FastDeserializerWithAvroGenericImpl(Schema writerSchema, Schema readerSchema) {
this.datumReader = new GenericDatumReader<>(writerSchema, readerSchema);
this.datumReader = new ColdGenericDatumReader<>(writerSchema, readerSchema);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.linkedin.avro.fastserde.coldstart;

import com.linkedin.avro.api.PrimitiveFloatList;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

/**
 * A {@link PrimitiveFloatList} built directly on top of the vanilla Avro {@link GenericData.Array}.
 * It is equivalent in all respects to the vanilla Avro implementation, both in terms of
 * functionality and (lack of) performance: the primitive accessors required by the interface are
 * provided, but they simply box and unbox {@link Float} objects around the inherited
 * {@code get}/{@code add} calls, thus providing no GC benefit. This should be possible to improve
 * upon in the future, however.
 *
 * The main motivation for this class is merely to provide a guarantee that the extended API is
 * always available, even when Fast-Avro isn't warmed up yet.
 */
public class ColdPrimitiveFloatList extends GenericData.Array<Float> implements PrimitiveFloatList {
  /** Array-of-float schema handed to the {@link GenericData.Array} constructor by every instance. */
  private static final Schema FLOAT_ARRAY_SCHEMA = Schema.createArray(Schema.create(Schema.Type.FLOAT));

  /**
   * @param capacity capacity passed through to the {@link GenericData.Array} constructor
   */
  public ColdPrimitiveFloatList(int capacity) {
    super(capacity, FLOAT_ARRAY_SCHEMA);
  }

  /**
   * @param index index of the element to return
   * @return the boxed element stored at {@code index}, unboxed to a primitive {@code float}
   */
  @Override
  public float getPrimitive(int index) {
    final Float boxed = get(index);
    return boxed.floatValue();
  }

  /**
   * @param o value to box and append through the inherited {@code add}
   * @return whatever the inherited {@code add} returns
   */
  @Override
  public boolean addPrimitive(float o) {
    final Float boxed = Float.valueOf(o);
    return add(boxed);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.apache.avro.generic;

import com.linkedin.avro.fastserde.coldstart.ColdPrimitiveFloatList;
import java.util.Collection;
import org.apache.avro.Schema;


/**
 * An interface with a default implementation, used to share the array-allocation logic between
 * {@link ColdGenericDatumReader} and {@link ColdSpecificDatumReader} in order to defeat the lack
 * of multiple inheritance.
 */
public interface ColdDatumReaderMixIn {
  /**
   * Returns the array instance to populate for the given array schema.
   *
   * For arrays of floats, a {@link ColdPrimitiveFloatList} is always returned: {@code old} is
   * cleared and reused when it already is one, otherwise a new instance is allocated. Every
   * other element type is delegated to {@code fallBackFunction}.
   *
   * @param old candidate instance for reuse, may be {@code null}
   * @param size capacity to request when a new {@link ColdPrimitiveFloatList} is allocated
   * @param schema the array schema; its element type selects the branch taken
   * @param fallBackFunction allocation logic used for element types without a special case
   * @return the array instance to populate
   */
  default Object newArray(Object old, int size, Schema schema, NewArrayFunction fallBackFunction) {
    switch (schema.getElementType().getType()) {
      case FLOAT:
        // instanceof already evaluates to false for null, so no separate null check is needed.
        if (!(old instanceof ColdPrimitiveFloatList)) {
          return new ColdPrimitiveFloatList(size);
        }
        // Wildcard cast instead of a raw type: clear() does not depend on the element type.
        ((Collection<?>) old).clear();
        return old;
      // TODO: Add more cases when we support more primitive array types
      default:
        return fallBackFunction.newArray(old, size, schema);
    }
  }

  /**
   * Fallback used by {@link #newArray(Object, int, Schema, NewArrayFunction)} for element types
   * it does not special-case. Has the same shape as the three-argument {@code newArray} that the
   * implementing readers override, so they can pass {@code super::newArray}.
   */
  @FunctionalInterface
  interface NewArrayFunction {
    Object newArray(Object old, int size, Schema schema);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.apache.avro.generic;

import org.apache.avro.Schema;


/**
* A light-weight extension of {@link GenericDatumReader} which merely ensures that the types of the
* extended API are always returned.
*
* This class needs to be in the org.apache.avro.generic package in order to access protected methods.
*/
public class ColdGenericDatumReader<T> extends GenericDatumReader<T> implements ColdDatumReaderMixIn {
public ColdGenericDatumReader(Schema writerSchema, Schema readerSchema) {
super(writerSchema, readerSchema);
}

@Override
protected Object newArray(Object old, int size, Schema schema) {
return newArray(old, size, schema, super::newArray);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.apache.avro.generic;

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificDatumReader;


/**
* A light-weight extension of {@link SpecificDatumReader} which merely ensures that the types of
* the extended API are always returned.
*
* This class needs to be in the org.apache.avro.generic package in order to access protected methods.
*/
public class ColdSpecificDatumReader<T> extends SpecificDatumReader<T> implements ColdDatumReaderMixIn {
public ColdSpecificDatumReader(Schema writerSchema, Schema readerSchema) {
super(writerSchema, readerSchema);
}

@Override
protected Object newArray(Object old, int size, Schema schema) {
return newArray(old, size, schema, super::newArray);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void testPrimitiveFloatListAddPrimitive() {
long startTime = System.currentTimeMillis();

for (int i = 0; i < iteration; i++) {
PrimitiveFloatList list = new PrimitiveFloatList(array_size);
ByteBufferBackedPrimitiveFloatList list = new ByteBufferBackedPrimitiveFloatList(array_size);

for (int l = 0; l < array_size; l++) {
list.addPrimitive((float) l);
Expand Down
Loading