From c91319d6d5a4bb3276308acdd3cfbf0e52aafce4 Mon Sep 17 00:00:00 2001 From: sdomalap Date: Fri, 24 Jan 2020 14:38:18 -0500 Subject: [PATCH 1/3] Handle the session re-establishment properly and avoid session leaks --- .../datastream/server/Coordinator.java | 5 ++ .../datastream/server/zk/ZkAdapter.java | 53 ++++++++++++++++--- 2 files changed, 52 insertions(+), 6 deletions(-) diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java index 7dcaccc37..7e55f0bbf 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java @@ -962,6 +962,11 @@ private void handleLeaderDoAssignment() { // get all current live instances liveInstances = _adapter.getLiveInstances(); + if (liveInstances.isEmpty()) { + _log.info("handleLeaderDoAssignment: empty list of live instances, maybe this node is no longer a leader."); + return; + } + // Map between instance to tasks assigned to the instance. previousAssignmentByInstance = _adapter.getAllAssignedDatastreamTasks(); diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java index cd8ced1d5..d510dc20a 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java @@ -20,14 +20,17 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import java.util.stream.Collectors; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; +import org.I0Itec.zkclient.IZkStateListener; import org.I0Itec.zkclient.exception.ZkException; import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,6 +129,7 @@ public class ZkAdapter { private final Random randomGenerator = new Random(); private final ZkLeaderElectionListener _leaderElectionListener = new ZkLeaderElectionListener(); + private final ZkStateChangeListener _stateChangeListener = new ZkStateChangeListener(); private ZkBackedTaskListProvider _assignmentList = null; // only the leader should maintain this list and listen to the changes of live instances @@ -133,6 +137,8 @@ public class ZkAdapter { private ZkBackedLiveInstanceListProvider _liveInstancesProvider = null; private ZkTargetAssignmentProvider _targetAssignmentProvider = null; + private ReentrantLock _liveInstancesProviderMutex = new ReentrantLock(); + // Cache all live DatastreamTasks per instance for assignment strategy private Map> _liveTaskMap = new HashMap<>(); @@ -203,7 +209,14 @@ public void disconnect() { public void connect() { disconnect(); // Guard against leaking an existing zookeeper session _zkclient = new ZkClient(_zkServers, _sessionTimeout, _connectionTimeout); + _zkclient.subscribeStateChanges(_stateChangeListener); + initializeSession(); + } + /** + * Initializes the session by creating the necessary znodes + */ + private void initializeSession() { // create a globally unique instance name and create a live instance node in ZooKeeper _instanceName = createLiveInstanceNode(); @@ -247,9 +260,14 @@ private void onBecomeFollower() { _datastreamList = null; } - if (_liveInstancesProvider != null) { - _liveInstancesProvider.close(); - _liveInstancesProvider = null; + _liveInstancesProviderMutex.lock(); + try { + if (_liveInstancesProvider != null) { + _liveInstancesProvider.close(); + _liveInstancesProvider = null; + } + } finally { + _liveInstancesProviderMutex.unlock(); } if (_targetAssignmentProvider != null) { @@ -299,8 +317,7 @@ private void joinLeaderElection() { if (index < 0) { // only when the ZooKeeper session already expired by the time this adapter joins for leader election. // mostly because the zkclient session expiration timeout. - LOG.error("Failed to join leader election. Try reconnect the zookeeper"); - connect(); + LOG.error("Failed to join leader election. Let the state change handler handle the reconnect."); return; } @@ -453,7 +470,12 @@ public void touchAllInstanceAssignments() { * Get all live instances in the cluster */ public List getLiveInstances() { - return _liveInstancesProvider.getLiveInstances(); + _liveInstancesProviderMutex.lock(); + try { + return _liveInstancesProvider == null ? new ArrayList<>() : _liveInstancesProvider.getLiveInstances(); + } finally { + _liveInstancesProviderMutex.unlock(); + } } /** @@ -1252,6 +1274,25 @@ public void handleDataDeleted(String dataPath) throws Exception { } } + /** + * Listener for ZooKeeper state changes. + */ + public class ZkStateChangeListener implements IZkStateListener { + @Override + public void handleStateChanged(Watcher.Event.KeeperState state) { + } + + @Override + public void handleNewSession() { + LOG.info("ZkStateChangeListener::A new session established after the earlier session was expired."); + initializeSession(); + } + + @Override + public void handleSessionEstablishmentError(final Throwable error) { + } + } + /** * ZkBackedTaskListProvider provides information about all DatastreamTasks existing in the cluster * for a given instance. In addition, it notifies the listener about changes that happened From 0a6e3fe85616c6573525e97534d72d3b1d3d752f Mon Sep 17 00:00:00 2001 From: sdomalap Date: Tue, 28 Jan 2020 11:24:15 -0500 Subject: [PATCH 2/3] Revert "Ensure any existing zookeeper sessions are closed before creating a new one (#671)" This reverts commit c3bd17b704e1df9317b96f58be100574107464f5. --- .../main/java/com/linkedin/datastream/server/zk/ZkAdapter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java index d510dc20a..39ba37f81 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java @@ -207,7 +207,6 @@ public void disconnect() { * the actions that need to be taken with them, which are implemented in the Coordinator class */ public void connect() { - disconnect(); // Guard against leaking an existing zookeeper session _zkclient = new ZkClient(_zkServers, _sessionTimeout, _connectionTimeout); _zkclient.subscribeStateChanges(_stateChangeListener); initializeSession(); From 024a08fe9d67880c5f71dd109c0ba75f512a1043 Mon Sep 17 00:00:00 2001 From: sdomalap Date: Tue, 25 Feb 2020 12:40:49 -0500 Subject: [PATCH 3/3] Handle session expiry --- .../java/com/linkedin/datastream/server/zk/ZkAdapter.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java index 39ba37f81..36f903b2a 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java @@ -1279,6 +1279,13 @@ public void handleDataDeleted(String dataPath) throws Exception { public class ZkStateChangeListener implements IZkStateListener { @Override public void handleStateChanged(Watcher.Event.KeeperState state) { + if (state == Watcher.Event.KeeperState.Expired) { + LOG.info("ZkStateChangeListener::Session expired."); + if (_assignmentList != null) { + _assignmentList.close(); + _assignmentList = null; + } + } } @Override @@ -1289,6 +1296,7 @@ public void handleNewSession() { @Override public void handleSessionEstablishmentError(final Throwable error) { + LOG.warn("ZkStateChangeListener::Failed to establish session.", error); } }