diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java index 5c686cff..79bffb6c 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java @@ -1,25 +1,51 @@ package hse.java.lectures.lecture6.tasks.queue; +import java.util.LinkedList; +import java.util.Queue; + public class BoundedBlockingQueue { + int capacity; + Queue queue = new LinkedList(); public BoundedBlockingQueue(int capacity) { + if (capacity <= 0){ + throw new IllegalArgumentException(); + } + this.capacity = capacity; } - public void put(T item) { + public synchronized void put(T item) throws InterruptedException { + if (item == null){ + throw new IllegalArgumentException(); + } + + while (queue.size() == capacity){ + + wait(); + } + + queue.offer(item); + notifyAll(); } - public T take() { - return null; + public synchronized T take() throws InterruptedException { + while (queue.isEmpty()){ + + wait(); + } + T item = queue.poll(); + notifyAll(); + return item; } - public int size() { - return 0; + public synchronized int size() { + return queue.size(); } public int capacity() { - return 0; + return capacity; } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e66..7a1c68e9 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -28,7 +28,20 @@ public void attachMonitor(StreamingMonitor monitor) { public void run() { // Writer threads are intentionally infinite for the task contract. while (true) { + try { + monitor.startTick(id); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + if (monitor.writerEnded(id)){ + break; + } + output.print(message); + + monitor.endTick(id); + onTick.run(); } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f279..0aad7fc4 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -2,4 +2,53 @@ public class StreamingMonitor { // impl your sync here + private int totalWritersCount; + private int writersEnded; + private int expectedWriterIndex; + private int ticksPerWriter; + private int[] currWritersTicks; + + public StreamingMonitor(int writersCount, int ticksPerWriter){ + this.totalWritersCount = writersCount; + this.currWritersTicks = new int[writersCount]; + this.ticksPerWriter = ticksPerWriter; + this.expectedWriterIndex = 0; + this.writersEnded = 0; + } + + public synchronized void startTick(int writerId) throws InterruptedException { + var writerIndex = writerId - 1; + + if (currWritersTicks[writerIndex] == ticksPerWriter){ + return; + } + + while (writerIndex != expectedWriterIndex){ + wait(); + } + } + + public synchronized void endTick(int writerId){ + var writeIndex = writerId-1; + + currWritersTicks[writeIndex]++; + + if (currWritersTicks[writeIndex] == ticksPerWriter){ + writersEnded++; + } + expectedWriterIndex = (expectedWriterIndex + 1) % totalWritersCount; + notifyAll(); + } + + public synchronized void waitAllWriters() throws InterruptedException { + while (writersEnded < totalWritersCount){ + wait(); + } + } + + public boolean writerEnded(int writerId){ + var writerIndex = writerId - 1; + + return (currWritersTicks[writerIndex] == ticksPerWriter); + } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8aded..da119a2f 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -21,13 +21,21 @@ public Synchronizer(List tasks, int ticksPerWriter) { * Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks * in strict ascending id order. */ - public void execute() { + public void execute() throws InterruptedException { // add monitor and sync + StreamingMonitor monitor = new StreamingMonitor(tasks.size(), ticksPerWriter); + + for (StreamWriter writer : tasks) { + writer.attachMonitor(monitor); + } + for (StreamWriter writer : tasks) { Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); worker.setDaemon(true); worker.start(); } + + monitor.waitAllWriters(); } }