diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java index d0a938e7022a..cd911ed99811 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java @@ -211,7 +211,7 @@ public DateTime getBlacklistedUntil() public boolean isValidVersion(String minVersion) { - return worker.getVersion().compareTo(minVersion) >= 0; + return !worker.isDisabled() && worker.getVersion().compareTo(minVersion) >= 0; } public boolean canRunTask(Task task, double parallelIndexTaskSlotRatio) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java index dadb557e84c8..21ed55115418 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java @@ -205,7 +205,8 @@ public boolean isRunningTask(String taskId) @UsedInGeneratedCode // See JavaScriptWorkerSelectStrategyTest public boolean isValidVersion(String minVersion) { - return worker.get().getVersion().compareTo(minVersion) >= 0; + final Worker w = worker.get(); + return !w.isDisabled() && w.getVersion().compareTo(minVersion) >= 0; } public void setWorker(Worker newWorker) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java index dc219d4f8b7f..74721fde6154 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java @@ -65,8 +65,11 @@ public class WorkerHolder public static final TypeReference> WORKER_SYNC_RESP_TYPE_REF = new TypeReference<>() {}; - private final Worker worker; - private Worker disabledWorker; + /** + * Pre-built views with isDisabled() set to false/true respectively. {@link #getWorker()} selects between them based on {@link #disabled}. + */ + private final Worker enabledWorker; + private final Worker disabledWorker; protected final AtomicBoolean disabled; private final AtomicBoolean syncedAtleastOnce = new AtomicBoolean(false); @@ -100,15 +103,16 @@ public WorkerHolder( this.httpClient = httpClient; this.config = config; this.listener = listener; - this.worker = worker; - //worker holder is created disabled and gets enabled after first sync success. + this.enabledWorker = workerWithDisabledState(worker, false); + this.disabledWorker = workerWithDisabledState(worker, true); + // WorkerHolder starts disabled and gets enabled after first successful sync. this.disabled = new AtomicBoolean(true); this.syncer = new ChangeRequestHttpSyncer<>( smileMapper, httpClient, workersSyncExec, - TaskRunnerUtils.makeWorkerURL(worker, "/"), + TaskRunnerUtils.makeWorkerURL(enabledWorker, "/"), "/druid-internal/v1/worker", WORKER_SYNC_RESP_TYPE_REF, config.getSyncRequestTimeout().toStandardDuration().getMillis(), @@ -125,7 +129,7 @@ public WorkerHolder( public Worker getWorker() { - return worker; + return disabled.get() ? disabledWorker : enabledWorker; } public DateTime getBlacklistedUntil() @@ -145,23 +149,8 @@ public void setBlacklistedUntil(DateTime blacklistedUntil) public ImmutableWorkerInfo toImmutable() { - Worker w = worker; - if (disabled.get()) { - if (disabledWorker == null) { - disabledWorker = new Worker( - worker.getScheme(), - worker.getHost(), - worker.getIp(), - worker.getCapacity(), - "", - worker.getCategory() - ); - } - w = disabledWorker; - } - return ImmutableWorkerInfo.fromWorkerAnnouncements( - w, + getWorker(), tasksSnapshotRef.get(), lastCompletedTaskTime.get(), blacklistedUntil.get() @@ -189,12 +178,12 @@ public boolean assignTask(Task task) log.info( "Received task[%s] assignment on worker[%s] when worker is disabled.", task.getId(), - worker.getHost() + enabledWorker.getHost() ); return false; } - URL url = TaskRunnerUtils.makeWorkerURL(worker, "/druid-internal/v1/worker/assignTask"); + URL url = TaskRunnerUtils.makeWorkerURL(enabledWorker, "/druid-internal/v1/worker/assignTask"); int numTries = config.getAssignRequestMaxRetries(); try { @@ -215,7 +204,7 @@ public boolean assignTask(Task task) throw new RE( "Failed to assign task[%s] to worker[%s]. Response Code[%s] and Message[%s]. Retrying...", task.getId(), - worker.getHost(), + enabledWorker.getHost(), response.getStatus().getCode(), response.getContent() ); @@ -226,7 +215,7 @@ public boolean assignTask(Task task) ex, "Request to assign task[%s] to worker[%s] failed. Retrying...", task.getId(), - worker.getHost() + enabledWorker.getHost() ); } }, @@ -235,14 +224,14 @@ public boolean assignTask(Task task) ); } catch (Exception ex) { - log.info("Not sure whether task[%s] was successfully assigned to worker[%s].", task.getId(), worker.getHost()); + log.info("Not sure whether task[%s] was successfully assigned to worker[%s].", task.getId(), enabledWorker.getHost()); return true; } } public void shutdownTask(String taskId) { - final URL url = TaskRunnerUtils.makeWorkerURL(worker, "/druid/worker/v1/task/%s/shutdown", taskId); + final URL url = TaskRunnerUtils.makeWorkerURL(enabledWorker, "/druid/worker/v1/task/%s/shutdown", taskId); try { RetryUtils.retry( @@ -257,17 +246,17 @@ public void shutdownTask(String taskId) if (response.getStatus().getCode() == 200) { log.info( "Sent shutdown message to worker: %s, status %s, response: %s", - worker.getHost(), + enabledWorker.getHost(), response.getStatus(), response.getContent() ); return null; } else { - throw new RE("Attempt to shutdown task[%s] on worker[%s] failed.", taskId, worker.getHost()); + throw new RE("Attempt to shutdown task[%s] on worker[%s] failed.", taskId, enabledWorker.getHost()); } } catch (ExecutionException e) { - throw new RE(e, "Error in handling post to [%s] for task [%s]", worker.getHost(), taskId); + throw new RE(e, "Error in handling post to [%s] for task [%s]", enabledWorker.getHost(), taskId); } }, e -> !(e instanceof InterruptedException), @@ -279,7 +268,7 @@ public void shutdownTask(String taskId) Thread.currentThread().interrupt(); } - log.error("Failed to shutdown task[%s] on worker[%s] failed.", taskId, worker.getHost()); + log.error("Failed to shutdown task[%s] on worker[%s] failed.", taskId, enabledWorker.getHost()); } } @@ -296,7 +285,7 @@ public void stop() public void waitForInitialization() throws InterruptedException { if (!syncer.awaitInitialization()) { - throw new RE("Failed to sync with worker[%s].", worker.getHost()); + throw new RE("Failed to sync with worker[%s].", enabledWorker.getHost()); } } @@ -347,7 +336,7 @@ public void fullSync(List changes) log.makeAlert( "Got unknown sync update[%s] from worker[%s]. Ignored.", change.getClass().getName(), - worker.getHost() + enabledWorker.getHost() ).emit(); } } @@ -359,7 +348,7 @@ public void fullSync(List changes) "task[%s] in state[%s] suddenly disappeared on worker[%s]. failing it.", announcement.getTaskId(), announcement.getStatus(), - worker.getHost() + enabledWorker.getHost() ); delta.add( TaskAnnouncement.create( @@ -402,7 +391,7 @@ public void deltaSync(List changes) "task[%s] in state[%s] suddenly disappeared on worker[%s]. failing it.", announcement.getTaskId(), announcement.getStatus(), - worker.getHost() + enabledWorker.getHost() ); delta.add( TaskAnnouncement.create( @@ -425,7 +414,7 @@ public void deltaSync(List changes) log.makeAlert( "Got unknown sync update[%s] from worker[%s]. Ignored.", change.getClass().getName(), - worker.getHost() + enabledWorker.getHost() ).emit(); } } @@ -444,7 +433,7 @@ private void notifyListener(List announcements, boolean isWork ex, "Unknown exception while updating task[%s] state from worker[%s].", announcement.getTaskId(), - worker.getHost() + enabledWorker.getHost() ); } } @@ -452,13 +441,21 @@ private void notifyListener(List announcements, boolean isWork syncedAtleastOnce.set(true); if (isWorkerDisabled != disabled.get()) { disabled.set(isWorkerDisabled); - log.info("Worker[%s] disabled set to [%s].", worker.getHost(), isWorkerDisabled); + log.info("Worker[%s] disabled set to [%s].", enabledWorker.getHost(), isWorkerDisabled); listener.stateChanged(!isWorkerDisabled, WorkerHolder.this); } } }; } + private static Worker workerWithDisabledState(Worker w, boolean disabled) + { + if (w.isDisabled() == disabled) { + return w; + } + return new Worker(w.getScheme(), w.getHost(), w.getIp(), w.getCapacity(), w.getVersion(), w.getCategory(), disabled); + } + public interface Listener { void taskAddedOrUpdated(TaskAnnouncement announcement, WorkerHolder workerHolder); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/Worker.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/Worker.java index 2394a464e5c2..405dd452e35a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/Worker.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/Worker.java @@ -21,8 +21,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.common.config.Configs; import org.apache.druid.guice.annotations.PublicApi; +import javax.annotation.Nullable; + /** * A container for worker metadata. */ @@ -35,6 +38,7 @@ public class Worker private final int capacity; private final String version; private final String category; + private final boolean disabled; @JsonCreator public Worker( @@ -43,7 +47,8 @@ public Worker( @JsonProperty("ip") String ip, @JsonProperty("capacity") int capacity, @JsonProperty("version") String version, - @JsonProperty("category") String category + @JsonProperty("category") String category, + @JsonProperty("disabled") @Nullable Boolean disabled ) { this.scheme = scheme == null ? "http" : scheme; // needed for backwards compatibility with older workers (pre-#4270) @@ -52,6 +57,19 @@ public Worker( this.capacity = capacity; this.version = version; this.category = category; + this.disabled = Configs.valueOrDefault(disabled, false); + } + + public Worker( + String scheme, + String host, + String ip, + int capacity, + String version, + String category + ) + { + this(scheme, host, ip, capacity, version, category, false); } @JsonProperty @@ -90,6 +108,12 @@ public String getCategory() return category; } + @JsonProperty + public boolean isDisabled() + { + return disabled; + } + @Override public boolean equals(Object o) { @@ -105,6 +129,9 @@ public boolean equals(Object o) if (capacity != worker.capacity) { return false; } + if (disabled != worker.disabled) { + return false; + } if (!scheme.equals(worker.scheme)) { return false; } @@ -129,6 +156,7 @@ public int hashCode() result = 31 * result + capacity; result = 31 * result + version.hashCode(); result = 31 * result + category.hashCode(); + result = 31 * result + (disabled ? 1 : 0); return result; } @@ -142,6 +170,7 @@ public String toString() ", capacity=" + capacity + ", version='" + version + '\'' + ", category='" + category + '\'' + + ", disabled=" + disabled + '}'; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java index 42d145edb2e9..528c230857e4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/WorkerResource.java @@ -100,13 +100,16 @@ public Response doDisable() { try { if (curatorCoordinator != null) { + // Dual-write disabled signal: legacy version="" for old overlords + disabled=true for new overlords. + // TODO: Safe to drop DISABLED_VERSION once backward compatibility with overlords is no longer required. final Worker disabledWorker = new Worker( enabledWorker.getScheme(), enabledWorker.getHost(), enabledWorker.getIp(), enabledWorker.getCapacity(), DISABLED_VERSION, - enabledWorker.getCategory() + enabledWorker.getCategory(), + true ); curatorCoordinator.updateWorkerAnnouncement(disabledWorker); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java index a36176a04e48..d0c2b29f7e65 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -560,7 +560,7 @@ public void testWorkerDisabled() throws Exception Assert.assertEquals(TaskState.SUCCESS, result.get().getStatusCode()); // Confirm RTR thinks the worker is disabled. - Assert.assertEquals("", Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getWorker().getVersion()); + Assert.assertTrue(Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getWorker().isDisabled()); } @Test diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java index 7c579f196763..af33b6fc9196 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java @@ -186,8 +186,9 @@ void disableWorker(Worker worker) throws Exception worker.getHost(), worker.getIp(), worker.getCapacity(), - "", - worker.getCategory() + worker.getVersion(), + worker.getCategory(), + true )) ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java index d9ea67f0ce99..7bd3be3e535b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolderTest.java @@ -90,7 +90,7 @@ public void stateChanged(boolean enabled, WorkerHolder workerHolder) ChangeRequestHttpSyncer.Listener syncListener = workerHolder.createSyncListener(); - Assert.assertTrue(workerHolder.disabled.get()); + Assert.assertFalse(workerHolder.isEnabled()); Assert.assertFalse(workerHolder.isInitialized()); syncListener.fullSync( @@ -114,7 +114,7 @@ public void stateChanged(boolean enabled, WorkerHolder workerHolder) ) ); - Assert.assertFalse(workerHolder.disabled.get()); + Assert.assertTrue(workerHolder.isEnabled()); Assert.assertEquals(4, updates.size()); @@ -153,7 +153,7 @@ public void stateChanged(boolean enabled, WorkerHolder workerHolder) ) ); - Assert.assertFalse(workerHolder.disabled.get()); + Assert.assertTrue(workerHolder.isEnabled()); Assert.assertEquals(2, updates.size()); Assert.assertEquals(task2.getId(), updates.get(0).getTaskId()); @@ -191,7 +191,7 @@ public void stateChanged(boolean enabled, WorkerHolder workerHolder) ) ); - Assert.assertTrue(workerHolder.disabled.get()); + Assert.assertFalse(workerHolder.isEnabled()); Assert.assertEquals(3, updates.size()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java index 582806416457..05b1ab36f497 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/setup/EqualDistributionWorkerSelectStrategyTest.java @@ -125,7 +125,6 @@ public void testFindWorkerForTaskWhenSameCurrCapacityUsed() @Test public void testOneDisableWorkerDifferentUsedCapacity() { - String disabledVersion = ""; final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(null, null); ImmutableWorkerInfo worker = strategy.findWorkerForTask( @@ -133,7 +132,7 @@ public void testOneDisableWorkerDifferentUsedCapacity() ImmutableMap.of( "lhost", new ImmutableWorkerInfo( - new Worker("http", "disableHost", "disableHost", 10, disabledVersion, WorkerConfig.DEFAULT_CATEGORY), 2, + new Worker("http", "disableHost", "disableHost", 10, "v1", WorkerConfig.DEFAULT_CATEGORY, true), 2, new HashSet<>(), new HashSet<>(), DateTimes.nowUtc() @@ -154,7 +153,6 @@ public void testOneDisableWorkerDifferentUsedCapacity() @Test public void testOneDisableWorkerSameUsedCapacity() { - String disabledVersion = ""; final EqualDistributionWorkerSelectStrategy strategy = new EqualDistributionWorkerSelectStrategy(null, null); ImmutableWorkerInfo worker = strategy.findWorkerForTask( @@ -162,7 +160,7 @@ public void testOneDisableWorkerSameUsedCapacity() ImmutableMap.of( "lhost", new ImmutableWorkerInfo( - new Worker("http", "disableHost", "disableHost", 10, disabledVersion, WorkerConfig.DEFAULT_CATEGORY), 5, + new Worker("http", "disableHost", "disableHost", 10, "v1", WorkerConfig.DEFAULT_CATEGORY, true), 5, new HashSet<>(), new HashSet<>(), DateTimes.nowUtc() diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java index df8e34ab1111..096688e5eb35 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/worker/http/WorkerResourceTest.java @@ -124,12 +124,14 @@ public void testDoDisable() throws Exception { Worker theWorker = JSON_MAPPER.readValue(cf.getData().forPath(ANNOUNCEMENT_PATH), Worker.class); Assert.assertEquals("v1", theWorker.getVersion()); + Assert.assertFalse(theWorker.isDisabled()); Response res = workerResource.doDisable(); Assert.assertEquals(Response.Status.OK.getStatusCode(), res.getStatus()); theWorker = JSON_MAPPER.readValue(cf.getData().forPath(ANNOUNCEMENT_PATH), Worker.class); Assert.assertTrue(theWorker.getVersion().isEmpty()); + Assert.assertTrue(theWorker.isDisabled()); } @Test @@ -140,11 +142,13 @@ public void testDoEnable() throws Exception Assert.assertEquals(Response.Status.OK.getStatusCode(), res.getStatus()); Worker theWorker = JSON_MAPPER.readValue(cf.getData().forPath(ANNOUNCEMENT_PATH), Worker.class); Assert.assertTrue(theWorker.getVersion().isEmpty()); + Assert.assertTrue(theWorker.isDisabled()); // Enable the worker res = workerResource.doEnable(); Assert.assertEquals(Response.Status.OK.getStatusCode(), res.getStatus()); theWorker = JSON_MAPPER.readValue(cf.getData().forPath(ANNOUNCEMENT_PATH), Worker.class); Assert.assertEquals("v1", theWorker.getVersion()); + Assert.assertFalse(theWorker.isDisabled()); } } diff --git a/web-console/src/views/services-view/services-view.tsx b/web-console/src/views/services-view/services-view.tsx index a6e41769f6f8..4b080e587335 100644 --- a/web-console/src/views/services-view/services-view.tsx +++ b/web-console/src/views/services-view/services-view.tsx @@ -261,7 +261,7 @@ function DetailCell({ original, workerInfoLookup }: DetailCellProps) { const workerInfo = workerInfoLookup[service]; if (!workerInfo) return null; - if (workerInfo.worker.version === '') return <>Disabled; + if (workerInfo.worker.disabled || workerInfo.worker.version === '') return <>Disabled; const details: string[] = []; if (workerInfo.lastCompletedTaskTime) { @@ -397,6 +397,7 @@ interface WorkerInfo { readonly scheme: string; readonly version: string; readonly category: string; + readonly disabled?: boolean; }; } @@ -1077,10 +1078,9 @@ ORDER BY ): BasicAction[] { const actions: BasicAction[] = []; - // Add worker-specific actions (enable/disable) if this is a worker if (workerInfo) { const { worker } = workerInfo; - const disabled = worker.version === ''; + const disabled = worker.disabled || worker.version === ''; if (disabled) { actions.push({