From 464e1de27d4adc97d351e2256368f2fc5f5f1627 Mon Sep 17 00:00:00 2001 From: Diana Date: Mon, 16 Mar 2026 16:50:04 +0300 Subject: [PATCH] synchronizer: first try --- .../tasks/synchronizer/StreamWriter.java | 15 +++++-- .../tasks/synchronizer/StreamingMonitor.java | 44 ++++++++++++++++++- .../tasks/synchronizer/Synchronizer.java | 19 +++++++- 3 files changed, 72 insertions(+), 6 deletions(-) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e66..147af5ab 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -28,9 +28,18 @@ public void attachMonitor(StreamingMonitor monitor) { public void run() { // Writer threads are intentionally infinite for the task contract. while (true) { - output.print(message); - onTick.run(); + try { + if(!monitor.checkTurn(id)) { + return; + } + output.print(message); + onTick.run(); + monitor.TickFinished(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } } } -} +} \ No newline at end of file diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f279..0872b182 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,45 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import java.util.List; + public class StreamingMonitor { - // impl your sync here -} + private int currentWriter; + private int ticksPerWriter; + private int totalWriters; + private int[] ticks; + private int total; + + StreamingMonitor(List tasks, int ticksPerWriter) { + this.ticksPerWriter = ticksPerWriter; + this.totalWriters = tasks.size(); + this.ticks = new int[totalWriters + 1]; + this.total = 0; + this.currentWriter = 1; + } + + synchronized boolean checkTurn(int writer) throws InterruptedException { + while (writer != currentWriter && total < totalWriters * ticksPerWriter) { + wait(); + } + return total < totalWriters * ticksPerWriter; + } + + synchronized void TickFinished() { + ticks[currentWriter]++; + total++; + + if (total == totalWriters * ticksPerWriter) { + notifyAll(); + return; + } + + int next = currentWriter % totalWriters + 1; + + while (ticks[next] >= ticksPerWriter) { + next = next % totalWriters + 1; + } + currentWriter = next; + + notifyAll(); + } +} \ No newline at end of file diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8aded..dd6629ad 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -1,5 +1,6 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import java.util.ArrayList; import java.util.List; public class Synchronizer { @@ -23,11 +24,27 @@ public Synchronizer(List tasks, int ticksPerWriter) { */ public void execute() { // add monitor and sync + StreamingMonitor monitor = new StreamingMonitor(tasks, ticksPerWriter); + for (StreamWriter writer : tasks) { + writer.attachMonitor(monitor); + } + + List workers = new ArrayList<>(); for (StreamWriter writer : tasks) { Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); worker.setDaemon(true); worker.start(); + workers.add(worker); + } + + for (Thread worker : workers) { + try { + worker.join(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } } } -} +} \ No newline at end of file