diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
index 7dcaccc37..7e55f0bbf 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java
@@ -962,6 +962,11 @@ private void handleLeaderDoAssignment() {
       // get all current live instances
       liveInstances = _adapter.getLiveInstances();
 
+      if (liveInstances.isEmpty()) {
+        _log.info("handleLeaderDoAssignment: empty list of live instances, maybe this node is no longer a leader.");
+        return;
+      }
+
       // Map between instance to tasks assigned to the instance.
       previousAssignmentByInstance = _adapter.getAllAssignedDatastreamTasks();
 
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java
index cd8ced1d5..36f903b2a 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java
@@ -20,14 +20,17 @@
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.exception.ZkException;
 import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -126,6 +129,7 @@ public class ZkAdapter {
 
   private final Random randomGenerator = new Random();
   private final ZkLeaderElectionListener _leaderElectionListener = new ZkLeaderElectionListener();
+  private final ZkStateChangeListener _stateChangeListener = new ZkStateChangeListener();
   private ZkBackedTaskListProvider _assignmentList = null;
 
   // only the leader should maintain this list and listen to the changes of live instances
@@ -133,6 +137,9 @@ public class ZkAdapter {
   private ZkBackedLiveInstanceListProvider _liveInstancesProvider = null;
   private ZkTargetAssignmentProvider _targetAssignmentProvider = null;
 
+  // Guards _liveInstancesProvider: onBecomeFollower() closes and clears it, getLiveInstances() reads it
+  private final ReentrantLock _liveInstancesProviderMutex = new ReentrantLock();
+
   // Cache all live DatastreamTasks per instance for assignment strategy
   private Map<String, Set<DatastreamTask>> _liveTaskMap = new HashMap<>();
 
@@ -201,9 +208,16 @@ public void disconnect() {
    * the actions that need to be taken with them, which are implemented in the Coordinator class
    */
   public void connect() {
-    disconnect(); // Guard against leaking an existing zookeeper session
     _zkclient = new ZkClient(_zkServers, _sessionTimeout, _connectionTimeout);
+    _zkclient.subscribeStateChanges(_stateChangeListener);
+    initializeSession();
+  }
 
+  /**
+   * Initializes the session by creating the necessary znodes
+   * Called from connect() and again by ZkStateChangeListener#handleNewSession().
+   */
+  private void initializeSession() {
     // create a globally unique instance name and create a live instance node in ZooKeeper
     _instanceName = createLiveInstanceNode();
 
@@ -247,9 +261,14 @@ private void onBecomeFollower() {
       _datastreamList = null;
     }
 
-    if (_liveInstancesProvider != null) {
-      _liveInstancesProvider.close();
-      _liveInstancesProvider = null;
+    _liveInstancesProviderMutex.lock();
+    try {
+      if (_liveInstancesProvider != null) {
+        _liveInstancesProvider.close();
+        _liveInstancesProvider = null;
+      }
+    } finally {
+      _liveInstancesProviderMutex.unlock();
     }
 
     if (_targetAssignmentProvider != null) {
@@ -299,8 +318,7 @@ private void joinLeaderElection() {
     if (index < 0) {
       // only when the ZooKeeper session already expired by the time this adapter joins for leader election.
       // mostly because the zkclient session expiration timeout.
-      LOG.error("Failed to join leader election. Try reconnect the zookeeper");
-      connect();
+      LOG.error("Failed to join leader election. Let the state change handler handle the reconnect.");
       return;
     }
 
@@ -453,7 +471,13 @@ public void touchAllInstanceAssignments() {
    * Get all live instances in the cluster
    */
   public List<String> getLiveInstances() {
-    return _liveInstancesProvider.getLiveInstances();
+    _liveInstancesProviderMutex.lock();
+    try {
+      // No provider exists once onBecomeFollower() has cleared it; report an empty list instead of throwing an NPE
+      return _liveInstancesProvider == null ? new ArrayList<>() : _liveInstancesProvider.getLiveInstances();
+    } finally {
+      _liveInstancesProviderMutex.unlock();
+    }
   }
 
   /**
@@ -1252,6 +1276,43 @@ public void handleDataDeleted(String dataPath) throws Exception {
     }
   }
 
+  /**
+   * Listener for ZooKeeper state changes.
+   */
+  public class ZkStateChangeListener implements IZkStateListener {
+    /**
+     * On session expiry, closes and clears the assignment list; every other state is ignored.
+     * NOTE(review): no other provider is released here -- confirm that is intended.
+     */
+    @Override
+    public void handleStateChanged(Watcher.Event.KeeperState state) {
+      if (state == Watcher.Event.KeeperState.Expired) {
+        LOG.info("ZkStateChangeListener::Session expired.");
+        if (_assignmentList != null) {
+          _assignmentList.close();
+          _assignmentList = null;
+        }
+      }
+    }
+
+    /**
+     * Re-runs session initialization once a replacement session exists.
+     */
+    @Override
+    public void handleNewSession() {
+      LOG.info("ZkStateChangeListener::A new session established after the earlier session was expired.");
+      initializeSession();
+    }
+
+    /**
+     * Logs the failure; no recovery is attempted here.
+     */
+    @Override
+    public void handleSessionEstablishmentError(final Throwable error) {
+      LOG.warn("ZkStateChangeListener::Failed to establish session.", error);
+    }
+  }
+
   /**
    * ZkBackedTaskListProvider provides information about all DatastreamTasks existing in the cluster
    * for a given instance. In addition, it notifies the listener about changes that happened