Java-based Apache Pulsar Functions project for processing temperature sensor events in two stages:
- Enforce and enrich incoming message schema with a broker publish timestamp.
- Convert Fahrenheit temperature values to Celsius and add a processing timestamp.
This project defines two Pulsar Functions that can be used as separate flows.
Function: TempSchemaEnforcerFunction
- Input type: SensorReadingTemp
- Output type: SensorReadingTempSchema
- Purpose:
  - Standardize the message shape before further processing.
  - Add pulsar_timestamp using the Pulsar publish time from message metadata.
Flow:
input message -> schema message
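A minimal sketch of how pulsar_timestamp could be derived. It assumes the publish time arrives as epoch milliseconds, which is how Pulsar's Message.getPublishTime() reports it, and that the output follows the nanosecond pattern used in this README's schema listings. The class and method names are hypothetical, and the project's actual implementation may differ.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of the project sources.
class PublishTimestampSketch {
    // Pattern copied from the schema listings in this README.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS");

    // Converts a publish time in epoch milliseconds to a UTC timestamp string.
    static String formatPublishTime(long publishTimeMillis) {
        LocalDateTime utc = LocalDateTime.ofInstant(
                Instant.ofEpochMilli(publishTimeMillis), ZoneOffset.UTC);
        return utc.format(FORMAT);
    }

    public static void main(String[] args) {
        long publishTimeMillis = Instant.parse("2026-03-15T11:45:12.900Z").toEpochMilli();
        // prints 2026-03-15T11:45:12.900000000
        System.out.println(formatPublishTime(publishTimeMillis));
    }
}
```

Because the broker reports milliseconds, the last six digits of the formatted value are always zero in this sketch.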
Functions:
- TempSchemaEnforcerFunction
- TemperatureProcessorFunction
TemperatureProcessorFunction details:
- Input type: SensorReadingTempSchema
- Output type: SensorReadingProcessedTemp
- Purpose:
  - Keep the original Fahrenheit value.
  - Compute the Celsius value in updated_temperature_celsius.
  - Add processed_timestamp in UTC format.
Flow:
input message -> schema message -> processed message
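The conversion and timestamp steps above can be sketched as follows. This is an illustration under assumptions, not the project's code: the class name, method names, and the injected Clock (used only to make the output reproducible) are hypothetical.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of the project sources.
class TemperatureConversionSketch {
    // Pattern copied from the schema listings in this README.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS");

    // Standard Fahrenheit-to-Celsius formula: C = (F - 32) * 5 / 9.
    static double fahrenheitToCelsius(double fahrenheit) {
        return (fahrenheit - 32.0) * 5.0 / 9.0;
    }

    // Formats the clock's current time as a UTC, nanosecond-padded string.
    static String processedTimestamp(Clock clock) {
        return LocalDateTime.ofInstant(clock.instant(), ZoneOffset.UTC).format(FORMAT);
    }

    public static void main(String[] args) {
        // prints 25.0
        System.out.println(fahrenheitToCelsius(77.0));

        // A fixed clock makes the example deterministic; a real function
        // would more likely use Clock.systemUTC().
        Clock fixed = Clock.fixed(Instant.parse("2026-03-15T11:45:13.0045Z"), ZoneOffset.UTC);
        // prints 2026-03-15T11:45:13.004500000
        System.out.println(processedTimestamp(fixed));
    }
}
```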
Functions are under src/main/java/org/example, and schema classes are under src/main/java/org/example/schema.
SensorReadingTemp:
{
"message_id": "string",
"temperature": 0.0,
"timestamp": "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS",
"payload": "string"
}

SensorReadingTempSchema:
{
"message_id": "string",
"temperature": 0.0,
"timestamp": "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS",
"pulsar_timestamp": "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS",
"payload": "string"
}

SensorReadingProcessedTemp:
{
"message_id": "string",
"temperature_fahrenheit": 0.0,
"updated_temperature_celsius": 0.0,
"produced_timestamp": "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS",
"processed_timestamp": "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS",
"pulsar_timestamp": "yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS",
"payload": "string"
}

Raw input topic -> TempSchemaEnforcerFunction -> schema topic
Raw input topic -> TempSchemaEnforcerFunction -> schema topic -> TemperatureProcessorFunction -> processed topic
- Java 17+
- Maven 3.8+
- Apache Pulsar cluster with Pulsar Functions enabled
mvn clean package

The build uses maven-shade-plugin to package dependencies into a deployable jar in target/.
Update topic names, tenant/namespace, broker URL, and jar path for your environment.
docker exec -it pulsar bin/pulsar-admin functions create \
--tenant public \
--namespace default \
--name temp-schema-fn \
--jar /pulsar/functions/PulsarFunction-1.0-SNAPSHOT.jar \
--classname org.example.TempSchemaEnforcerFunction \
--inputs persistent://public/default/temp \
--output persistent://public/default/temp_schema \
--parallelism 1

docker exec -it pulsar bin/pulsar-admin functions create \
--tenant public \
--namespace default \
--name temp-processing-fn \
--jar /pulsar/functions/PulsarFunction-1.0-SNAPSHOT.jar \
--classname org.example.TemperatureProcessorFunction \
--inputs persistent://public/default/temp_schema \
--output persistent://public/default/processed_temp \
--parallelism 1

Input message (SensorReadingTemp):
{
"message_id": "m-1001",
"temperature": 77.0,
"timestamp": "2026-03-15T11:45:12.123456789",
"payload": "sensor=A12"
}

Schema message output (SensorReadingTempSchema):
{
"message_id": "m-1001",
"temperature": 77.0,
"timestamp": "2026-03-15T11:45:12.123456789",
"pulsar_timestamp": "2026-03-15T11:45:12.900000000",
"payload": "sensor=A12"
}

Input message (SensorReadingTemp):
{
"message_id": "m-1001",
"temperature": 77.0,
"timestamp": "2026-03-15T11:45:12.123456789",
"payload": "sensor=A12"
}

Schema message (intermediate, SensorReadingTempSchema):
{
"message_id": "m-1001",
"temperature": 77.0,
"timestamp": "2026-03-15T11:45:12.123456789",
"pulsar_timestamp": "2026-03-15T11:45:12.900000000",
"payload": "sensor=A12"
}

Processed message output (SensorReadingProcessedTemp):
{
"message_id": "m-1001",
"temperature_fahrenheit": 77.0,
"updated_temperature_celsius": 25.0,
"produced_timestamp": "2026-03-15T11:45:12.123456789",
"processed_timestamp": "2026-03-15T11:45:13.004500000",
"pulsar_timestamp": "2026-03-15T11:45:12.900000000",
"payload": "sensor=A12"
}

- Both functions return null when input is null.
- Timestamps generated by functions are UTC-based and nanosecond-formatted strings.
- Maven configuration is defined in pom.xml with Pulsar Functions API 4.0.0.
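The null-handling note, together with the field mapping visible in the example messages (temperature becomes temperature_fahrenheit, timestamp becomes produced_timestamp), can be sketched as below. The records are hypothetical stand-ins for the project's schema classes, and the processed timestamp is passed in as a parameter to keep the example deterministic.

```java
// Hypothetical sketch; SchemaMsg and ProcessedMsg are stand-ins for
// SensorReadingTempSchema and SensorReadingProcessedTemp.
class NullSafeMappingSketch {
    record SchemaMsg(String messageId, double temperature, String timestamp,
                     String pulsarTimestamp, String payload) {}

    record ProcessedMsg(String messageId, double temperatureFahrenheit,
                        double updatedTemperatureCelsius, String producedTimestamp,
                        String processedTimestamp, String pulsarTimestamp, String payload) {}

    // Maps a schema message to a processed message; null input yields null output.
    static ProcessedMsg process(SchemaMsg in, String processedTimestamp) {
        if (in == null) {
            return null;
        }
        double celsius = (in.temperature() - 32.0) * 5.0 / 9.0;
        return new ProcessedMsg(in.messageId(), in.temperature(), celsius,
                in.timestamp(), processedTimestamp, in.pulsarTimestamp(), in.payload());
    }

    public static void main(String[] args) {
        SchemaMsg in = new SchemaMsg("m-1001", 77.0, "2026-03-15T11:45:12.123456789",
                "2026-03-15T11:45:12.900000000", "sensor=A12");
        ProcessedMsg out = process(in, "2026-03-15T11:45:13.004500000");
        System.out.println(out.updatedTemperatureCelsius()); // prints 25.0
        System.out.println(out.producedTimestamp());         // prints 2026-03-15T11:45:12.123456789
        System.out.println(process(null, "unused"));         // prints null
    }
}
```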