Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -962,6 +962,11 @@ private void handleLeaderDoAssignment() {
// get all current live instances
liveInstances = _adapter.getLiveInstances();

if (liveInstances.isEmpty()) {
_log.info("handleLeaderDoAssignment: empty list of live instances, maybe this node is no longer a leader.");
return;
}

// Map between instance to tasks assigned to the instance.
previousAssignmentByInstance = _adapter.getAllAssignedDatastreamTasks();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,17 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.exception.ZkException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -126,13 +129,16 @@ public class ZkAdapter {
private final Random randomGenerator = new Random();

private final ZkLeaderElectionListener _leaderElectionListener = new ZkLeaderElectionListener();
private final ZkStateChangeListener _stateChangeListener = new ZkStateChangeListener();
private ZkBackedTaskListProvider _assignmentList = null;

// only the leader should maintain this list and listen to the changes of live instances
private ZkBackedDMSDatastreamList _datastreamList = null;
private ZkBackedLiveInstanceListProvider _liveInstancesProvider = null;
private ZkTargetAssignmentProvider _targetAssignmentProvider = null;

private ReentrantLock _liveInstancesProviderMutex = new ReentrantLock();

// Cache all live DatastreamTasks per instance for assignment strategy
private Map<String, Set<DatastreamTask>> _liveTaskMap = new HashMap<>();

Expand Down Expand Up @@ -201,9 +207,15 @@ public void disconnect() {
* the actions that need to be taken with them, which are implemented in the Coordinator class
*/
public void connect() {
disconnect(); // Guard against leaking an existing zookeeper session
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing this will not be sufficient. On ZooKeeper session expiry, the live instance ephemeral node will be deleted, so the leader will become a follower and both need to create new live instance nodes. Once the role changes, all of the listener cleanup that currently happens in disconnect will be required.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review @vmaheshw.

It seems like you are correct. I might be leaking subscriptions. I may not have experienced any issues as the subscriptions are to the nodes that would never get updated.

I will add the clean up logic as necessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove this line anyway? If we are connecting or reconnecting, then we should not have an existing session. The guard code defensively ensures that any existing connection is definitely closed.

_zkclient = new ZkClient(_zkServers, _sessionTimeout, _connectionTimeout);
_zkclient.subscribeStateChanges(_stateChangeListener);
initializeSession();
}

/**
* Initializes the session by creating the necessary znodes
*/
private void initializeSession() {
// create a globally unique instance name and create a live instance node in ZooKeeper
_instanceName = createLiveInstanceNode();

Expand Down Expand Up @@ -247,9 +259,14 @@ private void onBecomeFollower() {
_datastreamList = null;
}

if (_liveInstancesProvider != null) {
_liveInstancesProvider.close();
_liveInstancesProvider = null;
_liveInstancesProviderMutex.lock();
try {
if (_liveInstancesProvider != null) {
_liveInstancesProvider.close();
_liveInstancesProvider = null;
}
} finally {
_liveInstancesProviderMutex.unlock();
}

if (_targetAssignmentProvider != null) {
Expand Down Expand Up @@ -299,8 +316,7 @@ private void joinLeaderElection() {
if (index < 0) {
// only when the ZooKeeper session already expired by the time this adapter joins for leader election.
// mostly because the zkclient session expiration timeout.
LOG.error("Failed to join leader election. Try reconnect the zookeeper");
connect();
LOG.error("Failed to join leader election. Let the state change handler handle the reconnect.");
return;
}

Expand Down Expand Up @@ -453,7 +469,12 @@ public void touchAllInstanceAssignments() {
* Get all live instances in the cluster
*/
public List<String> getLiveInstances() {
  // Hold the mutex for the whole read: onBecomeFollower() closes and nulls the provider
  // under this same lock, so the null check and the call below must not be separated.
  _liveInstancesProviderMutex.lock();
  try {
    if (_liveInstancesProvider == null) {
      // No provider is available; report an empty list instead of throwing a
      // NullPointerException.
      return new ArrayList<>();
    }
    return _liveInstancesProvider.getLiveInstances();
  } finally {
    _liveInstancesProviderMutex.unlock();
  }
}

/**
Expand Down Expand Up @@ -1252,6 +1273,33 @@ public void handleDataDeleted(String dataPath) throws Exception {
}
}

/**
* Listener for ZooKeeper state changes.
*/
public class ZkStateChangeListener implements IZkStateListener {
/**
 * Reacts to a ZooKeeper state change. Only the {@code Expired} state triggers any work:
 * the assignment list, if one exists, is closed and its reference cleared.
 *
 * @param state the state reported to this listener
 */
@Override
public void handleStateChanged(Watcher.Event.KeeperState state) {
  final boolean sessionExpired = Watcher.Event.KeeperState.Expired == state;
  if (!sessionExpired) {
    // Every other state transition is ignored here.
    return;
  }

  LOG.info("ZkStateChangeListener::Session expired.");

  if (_assignmentList == null) {
    // Nothing to release.
    return;
  }

  // Close the assignment list and drop the reference so it is not reused.
  _assignmentList.close();
  _assignmentList = null;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the situation where the session never gets re-established, handleNewSession will not be invoked. So, handleStateChanged with the Expired state needs to be handled.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted. Will post a new commit. Thanks


/**
 * Handles the establishment of a new session after the previous one expired:
 * logs the event and re-runs the session initialization.
 */
@Override
public void handleNewSession() {
LOG.info("ZkStateChangeListener::A new session established after the earlier session was expired.");
// Re-run the session setup (initializeSession creates the live instance node, among other things).
initializeSession();
}

/**
 * Handles a failure to establish a session. The error is only logged at warn level;
 * no other action is taken here.
 *
 * @param error the failure reported to this listener
 */
@Override
public void handleSessionEstablishmentError(final Throwable error) {
LOG.warn("ZkStateChangeListener::Failed to establish session.", error);
}
}

/**
* ZkBackedTaskListProvider provides information about all DatastreamTasks existing in the cluster
* for a given instance. In addition, it notifies the listener about changes that happened
Expand Down