diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java index 5c686cff..816f3ee6 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java @@ -7,11 +7,11 @@ public BoundedBlockingQueue(int capacity) { } - public void put(T item) { + public void put(T item) throws InterruptedException { } - public T take() { + public T take() throws InterruptedException { return null; } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e66..32af1f32 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -28,8 +28,12 @@ public void attachMonitor(StreamingMonitor monitor) { public void run() { // Writer threads are intentionally infinite for the task contract. while (true) { + if (!monitor.acquire(this)) { + return; + } output.print(message); onTick.run(); + monitor.release(this); } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f279..84e97dd4 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,46 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import java.util.List; + public class StreamingMonitor { - // impl your sync here + + private final List writers; + private final int totalTicks; + private int nextIndex = 0; + private int ticks = 0; + + public StreamingMonitor(List writers, int ticksPerWriter) { + this.writers = writers.stream() + .sorted((a, b) -> Integer.compare(a.getId(), b.getId())) + .toList(); + this.totalTicks = writers.size() * ticksPerWriter; + } + + public synchronized boolean acquire(StreamWriter writer) { + while (ticks < totalTicks && writers.get(nextIndex) != writer) { + try { + wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + return ticks < totalTicks; + } + + public synchronized void release(StreamWriter writer) { + ++ticks; + nextIndex = (nextIndex + 1) % writers.size(); + notifyAll(); + } + + public synchronized void awaitCompletion() { + while (ticks < totalTicks) { + try { + wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8aded..0828257c 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -22,12 +22,19 @@ public Synchronizer(List tasks, int ticksPerWriter) { * in strict ascending id order. */ public void execute() { - // add monitor and sync + StreamingMonitor monitor = new StreamingMonitor(tasks, ticksPerWriter); + + for (StreamWriter writer : tasks) { + writer.attachMonitor(monitor); + } + for (StreamWriter writer : tasks) { Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); worker.setDaemon(true); worker.start(); } + + monitor.awaitCompletion(); } }