diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java index 816f3ee6..2575d77d 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java @@ -2,24 +2,38 @@ public class BoundedBlockingQueue { - + T[] items; public BoundedBlockingQueue(int capacity) { - + if (capacity <= 0) throw new IllegalArgumentException(); + items = (T[]) new Object[capacity]; } - public void put(T item) throws InterruptedException { - + int size, tail; + public synchronized void put(T item) throws InterruptedException { + if (item == null) throw new NullPointerException(); + while (size == items.length) wait(); + items[tail] = item; + if (++tail == items.length) tail = 0; + size += 1; + notifyAll(); } - public T take() throws InterruptedException { - return null; + int head; + public synchronized T take() throws InterruptedException { + while (size == 0) wait(); + T item = items[head]; + items[head] = null; + if (++head == items.length) head = 0; + size -= 1; + notifyAll(); + return item; } - public int size() { - return 0; + public synchronized int size() { + return size; } public int capacity() { - return 0; + return items.length; } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e66..d65e1b5e 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -27,9 +27,15 @@ public void attachMonitor(StreamingMonitor monitor) { @Override public void run() { // Writer threads are intentionally infinite for the task contract. - while (true) { - output.print(message); - onTick.run(); + try { + while (true) { + monitor.awaitTurn(id); + output.print(message); + onTick.run(); + monitor.tick(); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f279..d6a8e51e 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,24 @@ package hse.java.lectures.lecture6.tasks.synchronizer; public class StreamingMonitor { - // impl your sync here -} + int[] ids; + public StreamingMonitor(int[] ids, int ticksPerWriter) { + this.ids = ids; + left = ids.length * ticksPerWriter; + } + + int left, current; + public synchronized void awaitTurn(int id) throws InterruptedException { + while (left == 0 || ids[current] != id) wait(); + } + + public synchronized void awaitDone() throws InterruptedException { + while (left > 0) wait(); + } + + public synchronized void tick() { + left -= 1; + if (++current == ids.length) current = 0; + notifyAll(); + } +} \ No newline at end of file diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8aded..abcedbd3 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -1,5 +1,5 @@ package hse.java.lectures.lecture6.tasks.synchronizer; - +import java.util.Arrays; import java.util.List; public class Synchronizer { @@ -23,11 +23,24 @@ public Synchronizer(List tasks, int ticksPerWriter) { */ public void execute() { // add monitor and sync + int[] ids = new int[tasks.size()]; + for (int i = 0; i < tasks.size(); ++i) ids[i] = tasks.get(i).getId(); + + Arrays.sort(ids); + StreamingMonitor monitor = new StreamingMonitor(ids, ticksPerWriter); + for (StreamWriter writer : tasks) writer.attachMonitor(monitor); + for (StreamWriter writer : tasks) { Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); worker.setDaemon(true); worker.start(); } + + try { + monitor.awaitDone(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } }