From d12b69e86a9ba482662b314f48136ec62859fc88 Mon Sep 17 00:00:00 2001 From: siwreienta Date: Wed, 18 Mar 2026 02:34:28 +0300 Subject: [PATCH 1/2] synchronizer: first try --- .../tasks/synchronizer/StreamWriter.java | 15 +++++-- .../tasks/synchronizer/StreamingMonitor.java | 44 ++++++++++++++++++- .../tasks/synchronizer/Synchronizer.java | 16 ++++++- 3 files changed, 68 insertions(+), 7 deletions(-) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e66..a9b10dc4 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -26,11 +26,18 @@ public void attachMonitor(StreamingMonitor monitor) { @Override public void run() { - // Writer threads are intentionally infinite for the task contract. while (true) { - output.print(message); - onTick.run(); + try { + if (!monitor.allowTick(id)) { + if (monitor.finished()) return; + continue; + } + output.print(message); + onTick.run(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } } } - } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f279..4660156a 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,47 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + public class StreamingMonitor { - // impl your sync here + private final Map ticks_ = new HashMap<>(); + private final int ticks_per_writer_; + private final int writers_amount_; + private int current_idx_ = 0; + private int total_ticks_ = 0; + private final int max_ticks_; + private final List sorted_id_; + + public StreamingMonitor(List ids, int ticks_per_writer) { + this.sorted_id_ = ids; + this.ticks_per_writer_ = ticks_per_writer; + this.writers_amount_ = ids.size(); + this.max_ticks_ = writers_amount_ * ticks_per_writer_; + for (Integer id : ids) ticks_.put(id, 0); + } + + public synchronized boolean allowTick(int id) throws InterruptedException { + while (!sorted_id_.get(current_idx_).equals(id) && total_ticks_ < max_ticks_) wait(); + if (total_ticks_ >= max_ticks_) { + notifyAll(); + return false; + } + int count = ticks_.get(id); + if (count >= ticks_per_writer_) { + current_idx_ = (current_idx_ + 1) % writers_amount_; + notifyAll(); + return false; + } + ticks_.put(id, count + 1); + total_ticks_++; + current_idx_ = (current_idx_ + 1) % writers_amount_; + notifyAll(); + return true; + } + + public synchronized boolean finished() { + return total_ticks_ >= max_ticks_; + } + } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8aded..40704530 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -22,12 +22,24 @@ public Synchronizer(List tasks, int ticksPerWriter) { * in strict ascending id order. */ public void execute() { - // add monitor and sync + + List ids = tasks.stream() + .map(StreamWriter::getId) + .sorted() + .toList(); + + StreamingMonitor monitor = new StreamingMonitor(ids, ticksPerWriter); + for (StreamWriter writer : tasks) writer.attachMonitor(monitor); for (StreamWriter writer : tasks) { Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); worker.setDaemon(true); worker.start(); } + while (!monitor.finished()) { + try { + Thread.sleep(1); + } catch (InterruptedException ignored) { + } + } } - } From 7f0046ade835dfbe8181a013eb9225b489c78500 Mon Sep 17 00:00:00 2001 From: siwreienta Date: Wed, 18 Mar 2026 02:42:41 +0300 Subject: [PATCH 2/2] synchronizer: second try --- .../tasks/synchronizer/StreamWriter.java | 24 ++++---- .../tasks/synchronizer/StreamingMonitor.java | 60 +++++++++---------- .../tasks/synchronizer/Synchronizer.java | 44 ++++++-------- 3 files changed, 54 insertions(+), 74 deletions(-) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index a9b10dc4..e2de1df9 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -5,22 +5,20 @@ import java.io.PrintStream; public class StreamWriter implements Runnable { - - private final String message; - @Getter - private final int id; - private final PrintStream output; - private final Runnable onTick; + @Getter private final String message; + @Getter private final int id; + @Getter private final PrintStream output; + @Getter private final Runnable on_tick; private volatile StreamingMonitor monitor; - public StreamWriter(int id, String message, PrintStream output, Runnable onTick) { + public StreamWriter(int id, String message, PrintStream output, Runnable on_tick) { this.message = message; this.id = id; this.output = output; - this.onTick = onTick; + this.on_tick = on_tick; } - public void attachMonitor(StreamingMonitor monitor) { + public void attach_monitor(StreamingMonitor monitor) { this.monitor = monitor; } @@ -28,12 +26,10 @@ public void attachMonitor(StreamingMonitor monitor) { public void run() { while (true) { try { - if (!monitor.allowTick(id)) { - if (monitor.finished()) return; - continue; - } + monitor.await(id); output.print(message); - onTick.run(); + on_tick.run(); + monitor.tick_done(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 4660156a..8ce418a0 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,47 +1,41 @@ package hse.java.lectures.lecture6.tasks.synchronizer; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import lombok.Getter; public class StreamingMonitor { - private final Map ticks_ = new HashMap<>(); - private final int ticks_per_writer_; - private final int writers_amount_; - private int current_idx_ = 0; - private int total_ticks_ = 0; - private final int max_ticks_; - private final List sorted_id_; + @Getter private final int[] sorted_ids; + @Getter private final int[] counts; + @Getter private final int ticks_per_writer; + @Getter private int current_idx = 0; + @Getter private int remaining_ticks; - public StreamingMonitor(List ids, int ticks_per_writer) { - this.sorted_id_ = ids; - this.ticks_per_writer_ = ticks_per_writer; - this.writers_amount_ = ids.size(); - this.max_ticks_ = writers_amount_ * ticks_per_writer_; - for (Integer id : ids) ticks_.put(id, 0); + public StreamingMonitor(int[] sorted_ids, int ticks_per_writer) { + this.sorted_ids = sorted_ids; + this.ticks_per_writer = ticks_per_writer; + this.counts = new int[sorted_ids.length]; + this.remaining_ticks = sorted_ids.length * ticks_per_writer; } - public synchronized boolean allowTick(int id) throws InterruptedException { - while (!sorted_id_.get(current_idx_).equals(id) && total_ticks_ < max_ticks_) wait(); - if (total_ticks_ >= max_ticks_) { - notifyAll(); - return false; - } - int count = ticks_.get(id); - if (count >= ticks_per_writer_) { - current_idx_ = (current_idx_ + 1) % writers_amount_; + public synchronized void await(int id) throws InterruptedException { + while (remaining_ticks > 0 && sorted_ids[current_idx] != id) wait(); + while (remaining_ticks == 0) wait(); + } + + public synchronized void tick_done() { + counts[current_idx]++; + remaining_ticks--; + if (remaining_ticks == 0) { notifyAll(); - return false; + return; } - ticks_.put(id, count + 1); - total_ticks_++; - current_idx_ = (current_idx_ + 1) % writers_amount_; + int writers_count = sorted_ids.length; + int next = (current_idx + 1) % writers_count; + while (counts[next] >= ticks_per_writer) next = (next + 1) % writers_count; + current_idx = next; notifyAll(); - return true; } - public synchronized boolean finished() { - return total_ticks_ >= max_ticks_; + public synchronized void wait_all() throws InterruptedException { + while (remaining_ticks > 0) wait(); } - } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 40704530..c53b3e78 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -1,45 +1,35 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import java.util.Arrays; import java.util.List; -public class Synchronizer { +import lombok.Getter; +public class Synchronizer { public static final int DEFAULT_TICKS_PER_WRITER = 10; - private final List tasks; - private final int ticksPerWriter; + @Getter private final List tasks; + @Getter private final int ticks_per_writer; public Synchronizer(List tasks) { this(tasks, DEFAULT_TICKS_PER_WRITER); } - public Synchronizer(List tasks, int ticksPerWriter) { + public Synchronizer(List tasks, int ticks_per_writer) { this.tasks = tasks; - this.ticksPerWriter = ticksPerWriter; + this.ticks_per_writer = ticks_per_writer; } - /** - * Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks - * in strict ascending id order. - */ - public void execute() { - - List ids = tasks.stream() - .map(StreamWriter::getId) - .sorted() - .toList(); - - StreamingMonitor monitor = new StreamingMonitor(ids, ticksPerWriter); - for (StreamWriter writer : tasks) writer.attachMonitor(monitor); + public void execute() throws InterruptedException { + int[] ids = new int[tasks.size()]; + for (int i = 0; i < tasks.size(); i++) ids[i] = tasks.get(i).getId(); + Arrays.sort(ids); + StreamingMonitor monitor = new StreamingMonitor(ids, ticks_per_writer); + for (StreamWriter writer : tasks) writer.attach_monitor(monitor); for (StreamWriter writer : tasks) { - Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); - worker.setDaemon(true); - worker.start(); - } - while (!monitor.finished()) { - try { - Thread.sleep(1); - } catch (InterruptedException ignored) { - } + Thread thread = new Thread(writer, "stream-writer-" + writer.getId()); + thread.setDaemon(true); + thread.start(); } + monitor.wait_all(); } }