Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public DateTime getBlacklistedUntil()

public boolean isValidVersion(String minVersion)
{
return worker.getVersion().compareTo(minVersion) >= 0;
return !worker.isDisabled() && worker.getVersion().compareTo(minVersion) >= 0;
}

public boolean canRunTask(Task task, double parallelIndexTaskSlotRatio)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ public boolean isRunningTask(String taskId)
@UsedInGeneratedCode // See JavaScriptWorkerSelectStrategyTest
public boolean isValidVersion(String minVersion)
{
return worker.get().getVersion().compareTo(minVersion) >= 0;
final Worker w = worker.get();
return !w.isDisabled() && w.getVersion().compareTo(minVersion) >= 0;
}

public void setWorker(Worker newWorker)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ public class WorkerHolder
public static final TypeReference<ChangeRequestsSnapshot<WorkerHistoryItem>> WORKER_SYNC_RESP_TYPE_REF = new TypeReference<>() {};


private final Worker worker;
private Worker disabledWorker;
/**
* Pre-built views with isDisabled() set to false/true respectively. {@link #getWorker()} selects between them based on {@link #disabled}.
*/
private final Worker enabledWorker;
private final Worker disabledWorker;

protected final AtomicBoolean disabled;
private final AtomicBoolean syncedAtleastOnce = new AtomicBoolean(false);
Expand Down Expand Up @@ -100,15 +103,16 @@ public WorkerHolder(
this.httpClient = httpClient;
this.config = config;
this.listener = listener;
this.worker = worker;
//worker holder is created disabled and gets enabled after first sync success.
this.enabledWorker = workerWithDisabledState(worker, false);
this.disabledWorker = workerWithDisabledState(worker, true);
// WorkerHolder starts disabled and gets enabled after first successful sync.
this.disabled = new AtomicBoolean(true);

this.syncer = new ChangeRequestHttpSyncer<>(
smileMapper,
httpClient,
workersSyncExec,
TaskRunnerUtils.makeWorkerURL(worker, "/"),
TaskRunnerUtils.makeWorkerURL(enabledWorker, "/"),
"/druid-internal/v1/worker",
WORKER_SYNC_RESP_TYPE_REF,
config.getSyncRequestTimeout().toStandardDuration().getMillis(),
Expand All @@ -125,7 +129,7 @@ public WorkerHolder(

public Worker getWorker()
{
return worker;
return disabled.get() ? disabledWorker : enabledWorker;
}

public DateTime getBlacklistedUntil()
Expand All @@ -145,23 +149,8 @@ public void setBlacklistedUntil(DateTime blacklistedUntil)

public ImmutableWorkerInfo toImmutable()
{
Worker w = worker;
if (disabled.get()) {
if (disabledWorker == null) {
disabledWorker = new Worker(
worker.getScheme(),
worker.getHost(),
worker.getIp(),
worker.getCapacity(),
"",
Comment thread
jtuglu1 marked this conversation as resolved.
worker.getCategory()
);
}
w = disabledWorker;
}

return ImmutableWorkerInfo.fromWorkerAnnouncements(
w,
getWorker(),
tasksSnapshotRef.get(),
lastCompletedTaskTime.get(),
blacklistedUntil.get()
Expand Down Expand Up @@ -189,12 +178,12 @@ public boolean assignTask(Task task)
log.info(
"Received task[%s] assignment on worker[%s] when worker is disabled.",
task.getId(),
worker.getHost()
enabledWorker.getHost()
);
return false;
}

URL url = TaskRunnerUtils.makeWorkerURL(worker, "/druid-internal/v1/worker/assignTask");
URL url = TaskRunnerUtils.makeWorkerURL(enabledWorker, "/druid-internal/v1/worker/assignTask");
int numTries = config.getAssignRequestMaxRetries();

try {
Expand All @@ -215,7 +204,7 @@ public boolean assignTask(Task task)
throw new RE(
"Failed to assign task[%s] to worker[%s]. Response Code[%s] and Message[%s]. Retrying...",
task.getId(),
worker.getHost(),
enabledWorker.getHost(),
response.getStatus().getCode(),
response.getContent()
);
Expand All @@ -226,7 +215,7 @@ public boolean assignTask(Task task)
ex,
"Request to assign task[%s] to worker[%s] failed. Retrying...",
task.getId(),
worker.getHost()
enabledWorker.getHost()
);
}
},
Expand All @@ -235,14 +224,14 @@ public boolean assignTask(Task task)
);
}
catch (Exception ex) {
log.info("Not sure whether task[%s] was successfully assigned to worker[%s].", task.getId(), worker.getHost());
log.info("Not sure whether task[%s] was successfully assigned to worker[%s].", task.getId(), enabledWorker.getHost());
return true;
}
}

public void shutdownTask(String taskId)
{
final URL url = TaskRunnerUtils.makeWorkerURL(worker, "/druid/worker/v1/task/%s/shutdown", taskId);
final URL url = TaskRunnerUtils.makeWorkerURL(enabledWorker, "/druid/worker/v1/task/%s/shutdown", taskId);

try {
RetryUtils.retry(
Expand All @@ -257,17 +246,17 @@ public void shutdownTask(String taskId)
if (response.getStatus().getCode() == 200) {
log.info(
"Sent shutdown message to worker: %s, status %s, response: %s",
worker.getHost(),
enabledWorker.getHost(),
response.getStatus(),
response.getContent()
);
return null;
} else {
throw new RE("Attempt to shutdown task[%s] on worker[%s] failed.", taskId, worker.getHost());
throw new RE("Attempt to shutdown task[%s] on worker[%s] failed.", taskId, enabledWorker.getHost());
}
}
catch (ExecutionException e) {
throw new RE(e, "Error in handling post to [%s] for task [%s]", worker.getHost(), taskId);
throw new RE(e, "Error in handling post to [%s] for task [%s]", enabledWorker.getHost(), taskId);
}
},
e -> !(e instanceof InterruptedException),
Expand All @@ -279,7 +268,7 @@ public void shutdownTask(String taskId)
Thread.currentThread().interrupt();
}

log.error("Failed to shutdown task[%s] on worker[%s] failed.", taskId, worker.getHost());
log.error("Failed to shutdown task[%s] on worker[%s] failed.", taskId, enabledWorker.getHost());
}
}

Expand All @@ -296,7 +285,7 @@ public void stop()
public void waitForInitialization() throws InterruptedException
{
if (!syncer.awaitInitialization()) {
throw new RE("Failed to sync with worker[%s].", worker.getHost());
throw new RE("Failed to sync with worker[%s].", enabledWorker.getHost());
}
}

Expand Down Expand Up @@ -347,7 +336,7 @@ public void fullSync(List<WorkerHistoryItem> changes)
log.makeAlert(
"Got unknown sync update[%s] from worker[%s]. Ignored.",
change.getClass().getName(),
worker.getHost()
enabledWorker.getHost()
).emit();
}
}
Expand All @@ -359,7 +348,7 @@ public void fullSync(List<WorkerHistoryItem> changes)
"task[%s] in state[%s] suddenly disappeared on worker[%s]. failing it.",
announcement.getTaskId(),
announcement.getStatus(),
worker.getHost()
enabledWorker.getHost()
);
delta.add(
TaskAnnouncement.create(
Expand Down Expand Up @@ -402,7 +391,7 @@ public void deltaSync(List<WorkerHistoryItem> changes)
"task[%s] in state[%s] suddenly disappeared on worker[%s]. failing it.",
announcement.getTaskId(),
announcement.getStatus(),
worker.getHost()
enabledWorker.getHost()
);
delta.add(
TaskAnnouncement.create(
Expand All @@ -425,7 +414,7 @@ public void deltaSync(List<WorkerHistoryItem> changes)
log.makeAlert(
"Got unknown sync update[%s] from worker[%s]. Ignored.",
change.getClass().getName(),
worker.getHost()
enabledWorker.getHost()
).emit();
}
}
Expand All @@ -444,21 +433,29 @@ private void notifyListener(List<TaskAnnouncement> announcements, boolean isWork
ex,
"Unknown exception while updating task[%s] state from worker[%s].",
announcement.getTaskId(),
worker.getHost()
enabledWorker.getHost()
);
}
}

syncedAtleastOnce.set(true);
if (isWorkerDisabled != disabled.get()) {
disabled.set(isWorkerDisabled);
log.info("Worker[%s] disabled set to [%s].", worker.getHost(), isWorkerDisabled);
log.info("Worker[%s] disabled set to [%s].", enabledWorker.getHost(), isWorkerDisabled);
listener.stateChanged(!isWorkerDisabled, WorkerHolder.this);
}
}
};
}

private static Worker workerWithDisabledState(Worker w, boolean disabled)
{
if (w.isDisabled() == disabled) {
return w;
}
return new Worker(w.getScheme(), w.getHost(), w.getIp(), w.getCapacity(), w.getVersion(), w.getCategory(), disabled);
Comment thread
jtuglu1 marked this conversation as resolved.
}

public interface Listener
{
void taskAddedOrUpdated(TaskAnnouncement announcement, WorkerHolder workerHolder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.common.config.Configs;
import org.apache.druid.guice.annotations.PublicApi;

import javax.annotation.Nullable;

/**
* A container for worker metadata.
*/
Expand All @@ -35,6 +38,7 @@ public class Worker
private final int capacity;
private final String version;
private final String category;
private final boolean disabled;

@JsonCreator
public Worker(
Expand All @@ -43,7 +47,8 @@ public Worker(
@JsonProperty("ip") String ip,
@JsonProperty("capacity") int capacity,
@JsonProperty("version") String version,
@JsonProperty("category") String category
@JsonProperty("category") String category,
@JsonProperty("disabled") @Nullable Boolean disabled
)
{
this.scheme = scheme == null ? "http" : scheme; // needed for backwards compatibility with older workers (pre-#4270)
Expand All @@ -52,6 +57,19 @@ public Worker(
this.capacity = capacity;
this.version = version;
this.category = category;
this.disabled = Configs.valueOrDefault(disabled, false);
}

public Worker(
String scheme,
String host,
String ip,
int capacity,
String version,
String category
)
{
this(scheme, host, ip, capacity, version, category, false);
}

@JsonProperty
Expand Down Expand Up @@ -90,6 +108,12 @@ public String getCategory()
return category;
}

@JsonProperty
public boolean isDisabled()
{
return disabled;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -105,6 +129,9 @@ public boolean equals(Object o)
if (capacity != worker.capacity) {
return false;
}
if (disabled != worker.disabled) {
return false;
}
if (!scheme.equals(worker.scheme)) {
return false;
}
Expand All @@ -129,6 +156,7 @@ public int hashCode()
result = 31 * result + capacity;
result = 31 * result + version.hashCode();
result = 31 * result + category.hashCode();
result = 31 * result + (disabled ? 1 : 0);
return result;
}

Expand All @@ -142,6 +170,7 @@ public String toString()
", capacity=" + capacity +
", version='" + version + '\'' +
", category='" + category + '\'' +
", disabled=" + disabled +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,16 @@ public Response doDisable()
{
try {
if (curatorCoordinator != null) {
// Dual-write disabled signal: legacy version="" for old overlords + disabled=true for new overlords.
// TODO: Safe to drop DISABLED_VERSION once backward compatibility with overlords is no longer required.
final Worker disabledWorker = new Worker(
enabledWorker.getScheme(),
enabledWorker.getHost(),
enabledWorker.getIp(),
enabledWorker.getCapacity(),
DISABLED_VERSION,
enabledWorker.getCategory()
enabledWorker.getCategory(),
true
);
curatorCoordinator.updateWorkerAnnouncement(disabledWorker);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ public void testWorkerDisabled() throws Exception
Assert.assertEquals(TaskState.SUCCESS, result.get().getStatusCode());

// Confirm RTR thinks the worker is disabled.
Assert.assertEquals("", Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getWorker().getVersion());
Assert.assertTrue(Iterables.getOnlyElement(remoteTaskRunner.getWorkers()).getWorker().isDisabled());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,9 @@ void disableWorker(Worker worker) throws Exception
worker.getHost(),
worker.getIp(),
worker.getCapacity(),
"",
worker.getCategory()
worker.getVersion(),
worker.getCategory(),
true
))
);
}
Expand Down
Loading
Loading