Skip to content

Commit 2ef9dbd

Browse files
committed
feat: Move examples to a package and re-write the buggy Parquet datasource
feat: Show usage guide in README
1 parent 29054df commit 2ef9dbd

8 files changed

Lines changed: 397 additions & 203 deletions

File tree

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,4 +98,8 @@ TODO
9898
### Datasets ###
9999
datasets/
100100

101+
### Python Environment ###
102+
env/
103+
.venv/
104+
101105
# End of https://www.toptal.com/developers/gitignore/api/java,maven,visualstudiocode,gradle

README.md

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,4 +133,47 @@ queries.
133133
- Uses vectorized Volcano-style iterator model
134134
- Processes data in batches for efficiency
135135
- Supports push-down optimizations
136-
- Implements memory-efficient operations
136+
- Implements memory-efficient operations
137+
138+
### Running the examples
139+
140+
You will probably want to use an IDE like IntelliJ or — my personal recommendation — VSCode with the
141+
Java pack at least for working with the codebase but you are free to use ed or nano as well.
142+
143+
Running this thing will require Maven for no other reason than trying to run it without Maven
144+
has made me realize this will be the last and only time I write Java as a hobby or professionally.
145+
146+
If you don't want Maven, you should be able to figure it out.
147+
148+
```sh
149+
150+
$ export JDK_JAVA_OPTIONS="--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"
151+
$ export MAVEN_OPTS="--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED"
152+
153+
$ mvn compile exec:java --file glint/pom.xml
154+
```
155+
156+
```
157+
Schema [fields=[(name: passenger_count, type: Int(32, true)), (name: MAX, type: FloatingPoint(SINGLE))]]
158+
159+
Logical Plan: Aggregate: groupExpr=[#passenger_count], aggregateExpr=[MAX(CAST(#fare_amount AS FLOAT))]
160+
Scan: parquet_scan [projection=None]
161+
Optimized Plan: Aggregate: groupExpr=[#passenger_count], aggregateExpr=[MAX(CAST(#fare_amount AS FLOAT))]
162+
Scan: parquet_scan [projection=None]
163+
164+
Results:
165+
166+
0,36090.3
167+
1,623259.9
168+
2,492.5
169+
3,350.0
170+
4,500.0
171+
5,760.0
172+
6,262.5
173+
7,78.0
174+
8,87.0
175+
9,92.0
176+
null,103.2
177+
178+
Query took 2758 ms
179+
```

glint/pom.xml

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,17 @@
3333
<dependency>
3434
<groupId>org.apache.arrow</groupId>
3535
<artifactId>arrow-vector</artifactId>
36-
<version>9.0.0</version>
36+
<version>18.0.0</version>
3737
</dependency>
3838
<dependency>
3939
<groupId>org.apache.arrow</groupId>
4040
<artifactId>arrow-memory-netty</artifactId>
41-
<version>9.0.0</version>
41+
<version>18.0.0</version>
42+
</dependency>
43+
<dependency>
44+
<groupId>org.apache.arrow</groupId>
45+
<artifactId>arrow-dataset</artifactId>
46+
<version>18.0.0</version>
4247
</dependency>
4348
<dependency>
4449
<groupId>com.google.protobuf</groupId>
@@ -145,6 +150,18 @@
145150
<artifactId>maven-project-info-reports-plugin</artifactId>
146151
<version>3.0.0</version>
147152
</plugin>
153+
<plugin>
154+
<groupId>org.codehaus.mojo</groupId>
155+
<artifactId>exec-maven-plugin</artifactId>
156+
<version>3.5.0</version>
157+
<configuration>
158+
<mainClass>co.clflushopt.glint.App</mainClass>
159+
<arguments>
160+
<argument>--add-opens</argument>
161+
<argument>java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED</argument>
162+
</arguments>
163+
</configuration>
164+
</plugin>
148165
</plugins>
149166
</pluginManagement>
150167
</build>
Lines changed: 2 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,8 @@
11
package co.clflushopt.glint;
22

33
import java.io.FileNotFoundException;
4-
import java.util.Arrays;
5-
import java.util.Iterator;
6-
import java.util.List;
7-
import java.util.Optional;
84

9-
import org.apache.arrow.vector.types.pojo.ArrowType;
10-
11-
import co.clflushopt.glint.core.CsvReaderOptions;
12-
import co.clflushopt.glint.core.ExecutionContext;
13-
import co.clflushopt.glint.dataframe.DataFrame;
14-
import co.clflushopt.glint.query.logical.expr.AggregateExpr;
15-
import co.clflushopt.glint.query.logical.expr.CastExpr;
16-
import co.clflushopt.glint.query.logical.expr.ColumnExpr;
17-
import co.clflushopt.glint.query.logical.expr.LogicalExpr;
18-
import co.clflushopt.glint.query.logical.plan.LogicalPlan;
19-
import co.clflushopt.glint.query.optimizer.QueryOptimizer;
20-
import co.clflushopt.glint.types.ArrowTypes;
21-
import co.clflushopt.glint.types.Field;
22-
import co.clflushopt.glint.types.RecordBatch;
23-
import co.clflushopt.glint.types.Schema;
5+
import co.clflushopt.glint.examples.NYCYellowTrips;
246

257
/**
268
* Hello world!
@@ -30,78 +12,10 @@ public class App {
3012
public static void main(String[] args) {
3113
System.out.println("Welcome to the Glint query compiler");
3214
try {
33-
nycTripsBenchmark(args);
15+
NYCYellowTrips.runParquetExample();
3416
} catch (FileNotFoundException e) {
3517
e.printStackTrace();
3618
}
3719
}
3820

39-
public static void nycTripsBenchmark(String[] args) throws FileNotFoundException {
40-
// Create execution context
41-
ExecutionContext ctx = ExecutionContext.create().build();
42-
43-
long startTime = System.currentTimeMillis();
44-
try {
45-
// Define the schema for NYC Taxi dataset
46-
Schema schema = new Schema(Arrays.asList(new Field("VendorID", ArrowTypes.Int32Type),
47-
new Field("tpep_pickup_datetime", ArrowTypes.StringType), // Could be Timestamp
48-
new Field("tpep_dropoff_datetime", ArrowTypes.StringType), // Could be Timestamp
49-
new Field("passenger_count", ArrowTypes.Int32Type),
50-
new Field("trip_distance", ArrowTypes.DoubleType),
51-
new Field("pickup_longitude", ArrowTypes.DoubleType),
52-
new Field("pickup_latitude", ArrowTypes.DoubleType),
53-
new Field("RatecodeID", ArrowTypes.Int32Type),
54-
new Field("store_and_fwd_flag", ArrowTypes.StringType),
55-
new Field("dropoff_longitude", ArrowTypes.DoubleType),
56-
new Field("dropoff_latitude", ArrowTypes.DoubleType),
57-
new Field("payment_type", ArrowTypes.Int32Type),
58-
new Field("fare_amount", ArrowTypes.DoubleType),
59-
new Field("extra", ArrowTypes.DoubleType),
60-
new Field("mta_tax", ArrowTypes.DoubleType),
61-
new Field("tip_amount", ArrowTypes.DoubleType),
62-
new Field("tolls_amount", ArrowTypes.DoubleType),
63-
new Field("improvement_surcharge", ArrowTypes.DoubleType),
64-
new Field("total_amount", ArrowTypes.DoubleType)));
65-
// Create DataFrame and apply transformations
66-
DataFrame df = ctx
67-
.readCsv("./datasets/yellow_tripdata_example.csv", Optional.of(schema),
68-
CsvReaderOptions.builder().delimiter(',').hasHeader(true).build())
69-
.aggregate(List.of(col("passenger_count")),
70-
List.of(max(cast(col("fare_amount"), ArrowTypes.FloatType))));
71-
72-
System.out.println("Logical Plan:\t" + LogicalPlan.format(df.getLogicalPlan()));
73-
System.out.println("Schema:\t" + df.getSchema());
74-
75-
// Optimize and execute the plan
76-
LogicalPlan optimizedPlan = QueryOptimizer.optimize(df.getLogicalPlan());
77-
System.out.println("Optimized Plan:\t" + LogicalPlan.format(optimizedPlan));
78-
79-
// Execute and print results
80-
Iterator<RecordBatch> results = ctx.execute(optimizedPlan);
81-
82-
while (results.hasNext()) {
83-
RecordBatch batch = results.next();
84-
System.out.println(batch.getSchema());
85-
System.out.println(batch.toCsv());
86-
87-
}
88-
89-
} finally {
90-
long endTime = System.currentTimeMillis();
91-
System.out.println("Query took " + (endTime - startTime) + " ms");
92-
}
93-
}
94-
95-
// Helper methods for creating expressions
96-
private static LogicalExpr col(String name) {
97-
return new ColumnExpr(name);
98-
}
99-
100-
private static LogicalExpr cast(LogicalExpr expr, ArrowType targetType) {
101-
return new CastExpr(expr, targetType);
102-
}
103-
104-
private static AggregateExpr max(LogicalExpr expr) {
105-
return new AggregateExpr.Max(expr);
106-
}
10721
}

glint/src/main/java/co/clflushopt/glint/core/ExecutionContext.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import co.clflushopt.glint.dataframe.DataFrame;
1010
import co.clflushopt.glint.dataframe.DataFrameImpl;
1111
import co.clflushopt.glint.datasource.CsvDataSource;
12+
import co.clflushopt.glint.datasource.DataSource;
1213
import co.clflushopt.glint.datasource.ParquetDataSource;
1314
import co.clflushopt.glint.query.logical.plan.LogicalPlan;
1415
import co.clflushopt.glint.query.logical.plan.Scan;
@@ -109,7 +110,12 @@ public DataFrame readCsv(String path, Schema schema, CsvReaderOptions options)
109110
* @param df
110111
*/
111112
public DataFrame readParquet(String path, Optional<Schema> schema) {
112-
var source = new ParquetDataSource(path);
113+
DataSource source;
114+
if (schema.isPresent()) {
115+
source = new ParquetDataSource(path, schema.get());
116+
} else {
117+
source = new ParquetDataSource(path);
118+
}
113119
return new DataFrameImpl(new Scan("parquet_scan", source, Collections.emptyList()));
114120
}
115121

0 commit comments

Comments
 (0)