From 61091242f142f20f480ce4c728bed77193d4f2ec Mon Sep 17 00:00:00 2001 From: Timofeev_V_S Date: Sun, 22 Mar 2026 22:39:13 +0300 Subject: [PATCH] synchronizer: --- .../tasks/synchronizer/StreamWriter.java | 24 ++++--- .../tasks/synchronizer/StreamingMonitor.java | 67 ++++++++++++++++++- .../tasks/synchronizer/Synchronizer.java | 21 +++--- 3 files changed, 91 insertions(+), 21 deletions(-) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e66..079876ad 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -1,13 +1,9 @@ package hse.java.lectures.lecture6.tasks.synchronizer; -import lombok.Getter; - import java.io.PrintStream; public class StreamWriter implements Runnable { - private final String message; - @Getter private final int id; private final PrintStream output; private final Runnable onTick; @@ -24,13 +20,21 @@ public void attachMonitor(StreamingMonitor monitor) { this.monitor = monitor; } + public int getId() { + return id; + } + @Override public void run() { - // Writer threads are intentionally infinite for the task contract. - while (true) { - output.print(message); - onTick.run(); + try { + while (true) { + monitor.awaitTurn(id); + output.print(message); + onTick.run(); + monitor.tickCompleted(id); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } - -} +} \ No newline at end of file diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f279..33e65bd8 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,68 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import java.util.Arrays; + public class StreamingMonitor { - // impl your sync here -} + private final int[] sortedIds; + private final int[] remaining; + private final int totalTicks; + private int ticksDone; + private int currentIdx; + private boolean finished; + + public StreamingMonitor(int[] ids, int ticksPerWriter) { + this.sortedIds = ids.clone(); + Arrays.sort(this.sortedIds); + this.remaining = new int[this.sortedIds.length]; + for (int i = 0; i < this.sortedIds.length; i++) { + remaining[i] = ticksPerWriter; + } + this.totalTicks = ids.length * ticksPerWriter; + this.ticksDone = 0; + this.currentIdx = 0; + this.finished = false; + } + + public synchronized void awaitTurn(int id) throws InterruptedException { + while (!finished && (currentIdx >= sortedIds.length || sortedIds[currentIdx] != id)) { + wait(); + } + while (finished) { + wait(); + } + } + + + public synchronized void tickCompleted(int id) { + ticksDone++; + int idx = findIndex(id); + remaining[idx]--; + + if (ticksDone == totalTicks) { + finished = true; + notifyAll(); + return; + } + + int next = (currentIdx + 1) % sortedIds.length; + while (remaining[next] == 0 && next != currentIdx) { + next = (next + 1) % sortedIds.length; + } + currentIdx = next; + notifyAll(); + } + + + public synchronized void waitForCompletion() throws InterruptedException { + while (!finished) { + wait(); + } + } + + private int findIndex(int id) { + for (int i = 0; i < sortedIds.length; i++) { + if (sortedIds[i] == id) return i; + } + throw new IllegalArgumentException("Unknown writer id: " + id); + } +} \ No newline at end of file diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8aded..06873277 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -1,9 +1,9 @@ package hse.java.lectures.lecture6.tasks.synchronizer; import java.util.List; +import java.util.stream.IntStream; public class Synchronizer { - public static final int DEFAULT_TICKS_PER_WRITER = 10; private final List tasks; private final int ticksPerWriter; @@ -17,17 +17,20 @@ public Synchronizer(List tasks, int ticksPerWriter) { this.ticksPerWriter = ticksPerWriter; } - /** - * Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks - * in strict ascending id order. - */ - public void execute() { - // add monitor and sync + public void execute() throws InterruptedException { + int[] ids = tasks.stream().mapToInt(StreamWriter::getId).toArray(); + StreamingMonitor monitor = new StreamingMonitor(ids, ticksPerWriter); + + for (StreamWriter writer : tasks) { + writer.attachMonitor(monitor); + } + for (StreamWriter writer : tasks) { Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); worker.setDaemon(true); worker.start(); } - } -} + monitor.waitForCompletion(); + } +} \ No newline at end of file