Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
35e7957
Add metrics instrumentation for service health checks and upstream av…
jitendradhawan Apr 8, 2026
b22adac
Refactor RangerZkUpstreamConfiguration to use a single zookeeper stri…
jitendradhawan Apr 8, 2026
07cb447
[Fix] Collision Check concurrency handling
Apr 9, 2026
79378f7
Merge pull request #102 from himneesh9/fix/collision-checker-fix
r0goyal Apr 13, 2026
ac60486
[Fix] Use of Nanoseconds in Id generator
Apr 9, 2026
50dbb53
Merge pull request #103 from himneesh9/fix/collision-check-nanosecond…
r0goyal Apr 13, 2026
a59e0e1
Version Bump
Apr 15, 2026
3dddf43
Merge pull request #105 from himneesh9/fix/version-bump
r0goyal Apr 15, 2026
4945fbc
Removed legacy nexus plugin
santanusinha Apr 8, 2026
e990fc7
Add metrics instrumentation to service data sources and update relate…
jitendradhawan May 31, 2026
9b41df9
resolve merge conflicts with main branch
jitendradhawan May 31, 2026
7991b11
Fix the unit tests
jitendradhawan May 31, 2026
85888b2
Add integration tests for Drove API and Node Data Source metrics reco…
jitendradhawan May 31, 2026
9fbfe1c
Add integration tests for HttpApiCommunicator and HttpServiceDataSour…
jitendradhawan May 31, 2026
2cd0e22
Add integration tests for ZkNodeDataSink and ZkNodeDataSource metrics…
jitendradhawan May 31, 2026
adbcf41
Add integration tests for FinderUtils and HealthChecker metrics recor…
jitendradhawan May 31, 2026
59dfd9c
Update performance test mean operations for IdGenerator
jitendradhawan May 31, 2026
101e545
reverse merge with parent branch 2.x and resolve merge conflicts
jitendradhawan Jun 1, 2026
bf89c32
remove redundant MonitoredFunction annotations and function metrics d…
jitendradhawan Jun 1, 2026
3e2df50
Refactor HealthChecker and MetricRecorder to include DataStoreType in…
jitendradhawan Jun 1, 2026
647ea5a
Update performance test metrics and enhance timer assertions in Servi…
jitendradhawan Jun 1, 2026
6287778
Fix all the sonar issues
jitendradhawan Jun 1, 2026
f08051d
Add aggregate meters for null/empty responses and parse failures
jitendradhawan Jun 2, 2026
7b6941b
Enhance metrics recording for service nodes and services returned; up…
jitendradhawan Jun 2, 2026
ac4485f
Refactor metrics recording to include DataStoreType in metric names f…
jitendradhawan Jun 2, 2026
e612bff
Refactor HealthChecker to remove DataStoreType and metricId parameter…
jitendradhawan Jun 2, 2026
6106154
Address code review comments
jitendradhawan Jun 2, 2026
8cb0427
Add node count metrics for service registry updates and stale data re…
jitendradhawan Jun 2, 2026
44e92ba
Add unit tests for MetricRecorder methods including node count metrics
jitendradhawan Jun 2, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
@SuperBuilder
public abstract class AbstractRangerHubClient<T, R extends ServiceRegistry<T>, D extends Deserializer<T>> implements RangerHubClient<T,R> {

private final String upstreamId;
private final String namespace;
private final ObjectMapper mapper;
private final D deserializer;
Expand All @@ -60,6 +61,7 @@ public abstract class AbstractRangerHubClient<T, R extends ServiceRegistry<T>, D

@Override
public void start() {
requireNonNull(upstreamId, "upstreamId can't be null");
requireNonNull(mapper, "Mapper can't be null");
requireNonNull(namespace, "namespace can't be null");
requireNonNull(deserializer, "deserializer can't be null");
Expand Down Expand Up @@ -88,7 +90,7 @@ public void start() {
this.excludedServices = Objects.requireNonNullElseGet(this.excludedServices, Set::of);

if(null == this.serviceDataSource){
this.serviceDataSource = getDefaultDataSource();
this.serviceDataSource = getDefaultDataSource(upstreamId);
}

this.hub = buildHub();
Expand Down Expand Up @@ -193,7 +195,7 @@ public CompletableFuture<?> addService(Service service) {
}


protected abstract ServiceDataSource getDefaultDataSource();
protected abstract ServiceDataSource getDefaultDataSource(String upstreamId);

protected abstract ServiceFinderFactory<T, R> getFinderFactory();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ protected void postBuild(ServiceFinderHub<TestNodeData, ListBasedServiceRegistry
}

@Override
protected ServiceDataSource getDefaultDataSource() {
protected ServiceDataSource getDefaultDataSource(String upstreamId) {
return new StaticDataSource(Set.of(RangerHubTestUtils.service));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class TestServiceFinderFactory implements ServiceFinderFactory<TestNodeD
@Override
public ServiceFinder<TestNodeData, ListBasedServiceRegistry<TestNodeData>> buildFinder(Service service) {
val finder = new TestSimpleUnshardedServiceFinder<TestNodeData>()
.withUpstreamId("test-metric")
.withNamespace(service.getNamespace())
.withServiceName(service.getServiceName())
.withDeserializer(new Deserializer<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@

import io.appform.ranger.core.finder.SimpleUnshardedServiceFinder;
import io.appform.ranger.core.finder.SimpleUnshardedServiceFinderBuilder;
import io.appform.ranger.core.model.Deserializer;
import io.appform.ranger.core.model.NodeDataSource;
import io.appform.ranger.core.model.Service;
import io.appform.ranger.core.model.ServiceNode;
import io.appform.ranger.core.model.*;
import io.appform.ranger.core.units.TestNodeData;
import lombok.Builder;

Expand All @@ -38,12 +35,22 @@ public SimpleUnshardedServiceFinder<TestNodeData> build() {
}

@Override
protected NodeDataSource<TestNodeData, Deserializer<TestNodeData>> dataSource(Service service) {
protected NodeDataSource<TestNodeData, Deserializer<TestNodeData>> dataSource(String upstreamId, Service service) {
return new TestDataSource();
}

static class TestDataSource implements NodeDataSource<TestNodeData, Deserializer<TestNodeData>>{

@Override
public String getUpstreamId() {
return "testDataSource";
}

@Override
public DataStoreType getDataStoreType() {
return DataStoreType.HTTP;
}

@Override
public Optional<List<ServiceNode<TestNodeData>>> refresh(Deserializer<TestNodeData> deserializer) {
return Optional.of(Collections.singletonList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public static RangerTestHub getTestHub(){
.nodeRefreshTimeMs(1000)
.initialCriteria(new TestCriteria())
.deserializer(new TestDeserializer<>())
.upstreamId("test-metric")
.build();
}

Expand All @@ -51,6 +52,7 @@ public static RangerTestHub getTestHubWithDataSource(){
.useDefaultDataSource(false)
.serviceDataSource(new StaticDataSource(Set.of(RangerHubTestUtils.service)))
.deserializer(new TestDeserializer<>())
.upstreamId("test-metric")
.build();
}

Expand Down
5 changes: 5 additions & 0 deletions ranger-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@
<artifactId>jakarta.validation-api</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>io.dropwizard</groupId>
<artifactId>dropwizard-metrics</artifactId>
<version>${dropwizard.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public abstract class BaseServiceFinderBuilder
B extends BaseServiceFinderBuilder<T, R, F, B, D>,
D extends Deserializer<T>> {

protected String upstreamId;
protected String namespace;
protected String serviceName;
protected int nodeRefreshIntervalMs;
Expand All @@ -58,6 +59,11 @@ public abstract class BaseServiceFinderBuilder
protected final List<Consumer<Void>> startSignalHandlers = new ArrayList<>();
protected final List<Consumer<Void>> stopSignalHandlers = new ArrayList<>();

public B withUpstreamId(final String upstreamId) {
this.upstreamId = upstreamId;
return (B)this;
}

public B withNamespace(final String namespace) {
this.namespace = namespace;
return (B)this;
Expand Down Expand Up @@ -136,6 +142,7 @@ public B withStopSignalHandlers(List<Consumer<Void>> stopSignalHandlers) {
public abstract F build();

protected F buildFinder() {
requireNonNull(upstreamId);
requireNonNull(namespace);
requireNonNull(serviceName);
requireNonNull(deserializer);
Expand All @@ -149,7 +156,7 @@ protected F buildFinder() {
val finder = buildFinder(service, shardSelector, nodeSelector);
val registry = finder.getServiceRegistry();
val signalGenerators = new ArrayList<Signal<T>>();
val nodeDataSource = dataSource(service);
val nodeDataSource = dataSource(upstreamId, service);

signalGenerators.add(new ScheduledRegistryUpdateSignal<>(service, nodeRefreshIntervalMs));
additionalRefreshSignals.addAll(implementationSpecificRefreshSignals(service, nodeDataSource));
Expand Down Expand Up @@ -178,7 +185,7 @@ protected List<Signal<T>> implementationSpecificRefreshSignals(Service service,
return Collections.emptyList();
}

protected abstract NodeDataSource<T, D> dataSource(Service service);
protected abstract NodeDataSource<T, D> dataSource(String upstreamId, Service service);

protected abstract F buildFinder(
Service service,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import io.appform.ranger.core.healthcheck.HealthcheckStatus;
import io.appform.ranger.core.model.Deserializer;
import io.appform.ranger.core.model.NodeDataSource;
import io.appform.ranger.core.model.ServiceNode;
import io.appform.ranger.core.model.ServiceRegistry;
import io.appform.ranger.core.signals.Signal;
import io.appform.ranger.core.util.Exceptions;
import io.appform.ranger.core.util.FinderUtils;
import io.appform.ranger.core.util.MetricRecorder;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
Expand Down Expand Up @@ -139,17 +141,24 @@ private void updateRegistry() throws InterruptedException {
log.debug("Checking for updates on data source for service: {}",
serviceRegistry.getService().getServiceName());
var callFailed = false;
if (nodeDataSource.isActive()) { //Source should implement circuit breaker to fail fast and reopen after some
// time
if (nodeDataSource.isActive()) { //Source should implement circuit breaker to fail fast and reopen after some time
val stopwatch = Stopwatch.createStarted();
try {
val nodeList = nodeDataSource.refresh(deserializer).orElse(null);
if (null != nodeList) {
MetricRecorder.recordNodesFetchedCount(serviceRegistry.getService().getServiceName(),
nodeDataSource.getDataStoreType(), nodeDataSource.getUpstreamId(), nodeList.size());
log.debug("Updating nodeList of size: {} for [{}]", nodeList.size(),
serviceRegistry.getService().getServiceName());
val livenessCheckMaxAge = nodeDataSource.healthcheckZombieCheckThresholdTime(serviceRegistry.getService());
//Remove all stale nodes before updating. This is done centrally to ensure some data sources
//don't skip this check. Some control is still provided so that they can overload.
serviceRegistry.updateNodes(FinderUtils.filterValidNodes(serviceRegistry.getService(), nodeList, livenessCheckMaxAge));
List<ServiceNode<T>> validNodes = FinderUtils.filterValidNodes(serviceRegistry.getService(), nodeList, livenessCheckMaxAge);
MetricRecorder.recordServiceRegistryUpdateNodeCount(serviceRegistry.getService().getServiceName(),
nodeDataSource.getDataStoreType(), nodeDataSource.getUpstreamId(), validNodes.size());
serviceRegistry.updateNodes(validNodes);
MetricRecorder.recordNodeDataRefreshSuccess(nodeDataSource.getDataStoreType(), nodeDataSource.getUpstreamId(),
stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
else {
log.warn("Empty list returned from node data source. We are in a weird state. Keeping old list for {}",
Expand All @@ -161,18 +170,26 @@ private void updateRegistry() throws InterruptedException {
e.getClass().getSimpleName(),
e.getMessage());
callFailed = true;
MetricRecorder.recordNodeDataRefreshFailure(serviceRegistry.getService().getServiceName(),
nodeDataSource.getDataStoreType(), nodeDataSource.getUpstreamId(),
stopwatch.elapsed(TimeUnit.MILLISECONDS));
} finally {
stopwatch.stop();
}
}
if (!nodeDataSource.isActive() || callFailed) {
val currTime = System.currentTimeMillis();
log.warn("Node data source seems to be down. Keeping old list for {}." +
" Will update timestamp to keep stale date relevant.",
serviceRegistry.getService().getServiceName());
serviceRegistry.updateNodes(serviceRegistry.nodeList()
.stream()
.filter(node -> HealthcheckStatus.healthy == node.getHealthcheckStatus())
.map(node -> node.setLastUpdatedTimeStamp(currTime))
.toList());
val retainedNodes = serviceRegistry.nodeList()
.stream()
.filter(node -> HealthcheckStatus.healthy == node.getHealthcheckStatus())
.map(node -> node.setLastUpdatedTimeStamp(currTime))
.toList();
serviceRegistry.updateNodes(retainedNodes);
MetricRecorder.recordStaleDataRetained(serviceRegistry.getService().getServiceName(),
nodeDataSource.getDataStoreType(), nodeDataSource.getUpstreamId(), retainedNodes.size());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.appform.ranger.core.healthcheck;

import io.appform.ranger.core.util.MetricRecorder;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

Expand Down Expand Up @@ -57,11 +58,13 @@ private boolean refreshHealth() {
catch (Exception e) {
log.error("Error running healthcheck. Setting node to unhealthy", e);
healthcheckStatus = HealthcheckStatus.unhealthy;
MetricRecorder.recordHealthcheckFailure();
}
if (HealthcheckStatus.unhealthy == healthcheckStatus) {
break;
}
}
MetricRecorder.recordHealthcheckStatus(HealthcheckStatus.healthy == healthcheckStatus);
//Trigger update only if state change has happened
//Conditions on which update will be triggered
//1. First time
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2024 Authors, Flipkart Internet Pvt. Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.appform.ranger.core.model;

public enum DataStoreType {
ZK,
HTTP,
DROVE,
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,10 @@
*
*/
public interface NodeDataSink<T, S extends Serializer<T>> extends NodeDataStoreConnector<T> {

DataStoreType getDataStoreType();

String getUpstreamId();

void updateState(S serializer, ServiceNode<T> serviceNode);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
@SuppressWarnings("unused")
public interface NodeDataSource<T, D extends Deserializer<T>> extends NodeDataStoreConnector<T> {

String getUpstreamId();

DataStoreType getDataStoreType();

Optional<List<ServiceNode<T>>> refresh(D deserializer) throws CommunicationException;

default long healthcheckZombieCheckThresholdTime(Service service) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static java.util.Objects.requireNonNull;

Expand All @@ -51,6 +49,7 @@
@SuppressWarnings({"unchecked", "unused", "UnusedReturnValue"})
public abstract class BaseServiceProviderBuilder<T, B extends BaseServiceProviderBuilder<T, B, S>, S extends Serializer<T>> {

protected String upstreamId;
protected String namespace;
protected String serviceName;
protected S serializer;
Expand All @@ -70,6 +69,11 @@ public abstract class BaseServiceProviderBuilder<T, B extends BaseServiceProvide
/* list of isolated monitors */
private final List<IsolatedHealthMonitor<HealthcheckStatus>> isolatedMonitors = new ArrayList<>();

public B withUpstreamId(final String upstreamId) {
this.upstreamId = upstreamId;
return (B)this;
}

public BaseServiceProviderBuilder<T, B, S> withNamespace(final String namespace) {
this.namespace = namespace;
return this;
Expand Down Expand Up @@ -175,10 +179,11 @@ public B healthUpdateHandler(final HealthUpdateHandler<T> healthUpdateHandler) {
}

protected final ServiceProvider<T, S> buildProvider() {
Preconditions.checkNotNull(namespace);
Preconditions.checkNotNull(serviceName);
Preconditions.checkNotNull(serializer);
Preconditions.checkNotNull(hostname);
requireNonNull(upstreamId);
requireNonNull(namespace);
requireNonNull(serviceName);
requireNonNull(serializer);
requireNonNull(hostname);
Preconditions.checkNotNull(healthUpdateHandler);
Preconditions.checkArgument(port > 0);
Preconditions.checkArgument(!healthchecks.isEmpty() || !isolatedMonitors.isEmpty());
Expand All @@ -200,7 +205,7 @@ protected final ServiceProvider<T, S> buildProvider() {

healthchecks.add(serviceHealthAggregator);
val service = Service.builder().namespace(namespace).serviceName(serviceName).build();
val usableNodeDataSource = dataSink(service);
val usableNodeDataSource = dataSink(upstreamId, service);

val healthcheckUpdateSignalGenerator
= new ScheduledSignal<>(
Expand Down Expand Up @@ -246,5 +251,5 @@ protected final ServiceProvider<T, S> buildProvider() {

public abstract ServiceProvider<T,S> build();

protected abstract NodeDataSink<T,S> dataSink(final Service service);
protected abstract NodeDataSink<T,S> dataSink(String upstreamId, final Service service);
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public static <T> boolean isValidNode(
return false;
}
if(serviceNode.getLastUpdatedTimeStamp() < healthcheckZombieCheckThresholdTime) {
MetricRecorder.recordZombieNodeFound(service.getServiceName());
log.warn("Zombie node [{}:{}] found for [{}]",
serviceNode.getHost(), serviceNode.getPort(), service.getServiceName());
return false;
Expand Down
Loading
Loading