Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/how-to-guides/test-an-app.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ The following are the supported KSQL statements in `run-ksql-test`:
- `UNSET`
- `INSERT INTO`
- `INSERT VALUES`
- `DEFINE`
- `UNDEFINE`

There are also four test only statements for verifying data.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@
import io.confluent.ksql.parser.tree.AssertValues;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateSource;
import io.confluent.ksql.parser.tree.DefineVariable;
import io.confluent.ksql.parser.tree.DropStatement;
import io.confluent.ksql.parser.tree.InsertValues;
import io.confluent.ksql.parser.tree.SetProperty;
import io.confluent.ksql.parser.tree.UndefineVariable;
import io.confluent.ksql.parser.tree.UnsetProperty;
import io.confluent.ksql.properties.PropertyOverrider;
import io.confluent.ksql.query.QueryId;
Expand Down Expand Up @@ -124,6 +126,7 @@ public class SqlTestExecutor implements Closeable {
private KafkaTopicClient topicClient;
private Path tmpFolder;
private final Map<String, Object> overrides;
private final Map<String, String> variables;
private final Map<QueryId, DriverAndProperties> drivers;

// populated during execution to handle the expected exception
Expand Down Expand Up @@ -204,6 +207,7 @@ public void onDeregister(final QueryMetadata query) {
this.formatInjector = new DefaultFormatInjector();
this.topicClient = requireNonNull(topicClient, "topicClient");
this.overrides = new HashMap<>();
this.variables = new HashMap<>();
this.drivers = drivers;
this.tmpFolder = requireNonNull(tmpFolder, "tmpFolder");
}
Expand Down Expand Up @@ -242,7 +246,7 @@ private void doAssert(final AssertStatement statement) {
}

private void execute(final ParsedStatement parsedStatement) {
final PreparedStatement<?> engineStatement = engine.prepare(parsedStatement);
final PreparedStatement<?> engineStatement = engine.prepare(parsedStatement, variables);
final ConfiguredStatement<?> configured = ConfiguredStatement
.of(engineStatement, SessionConfig.of(config, overrides));

Expand All @@ -257,6 +261,12 @@ private void execute(final ParsedStatement parsedStatement) {
} else if (engineStatement.getStatement() instanceof UnsetProperty) {
PropertyOverrider.unset((ConfiguredStatement<UnsetProperty>) configured, overrides);
return;
} else if (engineStatement.getStatement() instanceof DefineVariable variableStatement) {
variables.put(variableStatement.getVariableName(), variableStatement.getVariableValue());
return;
} else if (engineStatement.getStatement() instanceof UndefineVariable variableStatement) {
variables.remove(variableStatement.getVariableName());
return;
}

final ConfiguredStatement<?> injected = formatInjector.inject(configured);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,25 @@ CREATE OR REPLACE STREAM bar AS SELECT id, col1 + 1 as col1 FROM foo;
INSERT INTO foo (rowtime, id, col1) VALUES (1, 1, 1);
ASSERT VALUES bar (rowtime, id, col1) VALUES (1, 1, 2);

----------------------------------------------------------------------------------------------------
--@test: basic test with `DEFINE` statement
----------------------------------------------------------------------------------------------------
DEFINE someVar='bar';
CREATE STREAM foo (id INT KEY, col1 VARCHAR) WITH (kafka_topic='foo', value_format='JSON');

INSERT INTO foo (rowtime, id, col1) VALUES (1, 1, 'val-${someVar}');
ASSERT VALUES foo (rowtime, id, col1) VALUES (1, 1, 'val-bar');

----------------------------------------------------------------------------------------------------
--@test: basic test with `UNDEFINE` statement
----------------------------------------------------------------------------------------------------
DEFINE someVar='bar';
UNDEFINE someVar;
CREATE STREAM foo (id INT KEY, col1 VARCHAR) WITH (kafka_topic='foo', value_format='JSON');

INSERT INTO foo (rowtime, id, col1) VALUES (1, 1, 'val-${someVar}');
ASSERT VALUES foo (rowtime, id, col1) VALUES (1, 1, 'val-');

----------------------------------------------------------------------------------------------------
--@test: bad assert statement should fail

Expand Down