From 5dfe968b2f0842c7f72a567f21a0298a91a205d7 Mon Sep 17 00:00:00 2001 From: Mac Date: Sat, 14 Mar 2026 12:51:13 +0300 Subject: [PATCH 1/9] queue: first try --- .../tasks/queue/BoundedBlockingQueue.java | 36 ++++++++++++++++--- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java index 5c686cff..32fc54ef 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java @@ -1,25 +1,51 @@ package hse.java.lectures.lecture6.tasks.queue; +import java.util.LinkedList; +import java.util.Queue; + public class BoundedBlockingQueue { + int capacity; + Queue queue = new LinkedList(); public BoundedBlockingQueue(int capacity) { + if (capacity <= 0){ + throw new IllegalArgumentException(); + } + this.capacity = capacity; } public void put(T item) { + if (item == null){ + throw new IllegalArgumentException(); + } + + while (queue.size() == capacity){ + try {wait();} catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + queue.offer(item); } - public T take() { - return null; + public synchronized T take() { + while (queue.isEmpty()){ + try {wait();} catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + T item = queue.poll(); + notifyAll(); + return item; } - public int size() { - return 0; + public synchronized int size() { + return queue.size(); } public int capacity() { - return 0; + return capacity; } } From d52909c02e2f7c3b53a49bf36ce05fee3fc8e997 Mon Sep 17 00:00:00 2001 From: Mac Date: Sat, 14 Mar 2026 12:59:04 +0300 Subject: [PATCH 2/9] queue: added synchronized --- .../lectures/lecture6/tasks/queue/BoundedBlockingQueue.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java index 32fc54ef..9fbdebbf 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java @@ -16,14 +16,14 @@ public BoundedBlockingQueue(int capacity) { this.capacity = capacity; } - public void put(T item) { + public synchronized void put(T item) { if (item == null){ throw new IllegalArgumentException(); } while (queue.size() == capacity){ try {wait();} catch (InterruptedException e) { - throw new RuntimeException(e); + e.printStackTrace(); } } From a5865cde1d1032e2de7e3dc383dd8e520fe8af4f Mon Sep 17 00:00:00 2001 From: Mac Date: Sat, 14 Mar 2026 13:01:13 +0300 Subject: [PATCH 3/9] queue: deleted try/catch block --- .../lecture6/tasks/queue/BoundedBlockingQueue.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java index 9fbdebbf..f9508bcf 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java @@ -22,9 +22,7 @@ public synchronized void put(T item) { } while (queue.size() == capacity){ - try {wait();} catch (InterruptedException e) { - e.printStackTrace(); - } + wait(); } queue.offer(item); @@ -32,9 +30,7 @@ public synchronized void put(T item) { public synchronized T take() { while (queue.isEmpty()){ - try {wait();} catch (InterruptedException e) { - throw new RuntimeException(e); - } + wait(); } T item = queue.poll(); notifyAll(); From b74c80be74579fc1493f5e5d62f46acfade5a4cf Mon Sep 17 00:00:00 2001 From: Mac Date: Sat, 14 Mar 2026 13:04:11 +0300 Subject: [PATCH 4/9] queue: returned try/catch block --- .../lecture6/tasks/queue/BoundedBlockingQueue.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java index f9508bcf..9654d4db 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java @@ -22,7 +22,11 @@ public synchronized void put(T item) { } while (queue.size() == capacity){ - wait(); + try { + wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } queue.offer(item); @@ -30,7 +34,11 @@ public synchronized void put(T item) { public synchronized T take() { while (queue.isEmpty()){ - wait(); + try { + wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } T item = queue.poll(); notifyAll(); From b9eacab65b6e054808fdf1cdf19a7cbb7b82614c Mon Sep 17 00:00:00 2001 From: Mac Date: Sat, 14 Mar 2026 13:05:55 +0300 Subject: [PATCH 5/9] queue: added notifyAll() in put(...) --- .../java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java index 9654d4db..35cef791 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java @@ -30,6 +30,7 @@ public synchronized void put(T item) { } queue.offer(item); + notifyAll(); } public synchronized T take() { From a9f18d3ed73c6b1a08c0c6056a868956fbe131c2 Mon Sep 17 00:00:00 2001 From: Mac Date: Sat, 14 Mar 2026 13:08:59 +0300 Subject: [PATCH 6/9] queue: added throws ..., delete try/catch block --- .../tasks/queue/BoundedBlockingQueue.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java index 35cef791..79bffb6c 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java @@ -16,30 +16,25 @@ public BoundedBlockingQueue(int capacity) { this.capacity = capacity; } - public synchronized void put(T item) { + public synchronized void put(T item) throws InterruptedException { if (item == null){ throw new IllegalArgumentException(); } while (queue.size() == capacity){ - try { + wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + } queue.offer(item); notifyAll(); } - public synchronized T take() { + public synchronized T take() throws InterruptedException { while (queue.isEmpty()){ - try { + wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } } T item = queue.poll(); notifyAll(); From 9f58bb0b31081581e8590a70fdc07015fe7f866a Mon Sep 17 00:00:00 2001 From: Mac Date: Tue, 17 Mar 2026 21:01:10 +0300 Subject: [PATCH 7/9] synchronizer: test --- .../tasks/synchronizer/StreamWriter.java | 8 +++++ .../tasks/synchronizer/StreamingMonitor.java | 35 +++++++++++++++++++ .../tasks/synchronizer/Synchronizer.java | 10 +++++- 3 files changed, 52 insertions(+), 1 deletion(-) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e66..a99d3491 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -28,7 +28,15 @@ public void attachMonitor(StreamingMonitor monitor) { public void run() { // Writer threads are intentionally infinite for the task contract. while (true) { + try { + monitor.startTick(id); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } output.print(message); + + monitor.endTick(id); + onTick.run(); } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f279..d26696b7 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -2,4 +2,39 @@ public class StreamingMonitor { // impl your sync here + private int totalWritersCount; + private int writersEnded; + private int expectedWriterId; + private int ticksPerWriter; + private int[] currWritersTicks; + + public StreamingMonitor(int writersCount, int ticksPerWriter){ + this.totalWritersCount = writersCount; + this.currWritersTicks = new int[writersCount]; + this.ticksPerWriter = ticksPerWriter; + this.expectedWriterId = 0; + this.writersEnded = 0; + } + + public synchronized void startTick(int writerId) throws InterruptedException { + if (writerId != expectedWriterId){ + wait(); + } + } + + public synchronized void endTick(int writerId){ + currWritersTicks[writerId]++; + + if (currWritersTicks[writerId] == ticksPerWriter){ + writersEnded++; + } + + notifyAll(); + } + + public synchronized void waitAllWriters() throws InterruptedException { + while (writersEnded < totalWritersCount){ + wait(); + } + } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8aded..da119a2f 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -21,13 +21,21 @@ public Synchronizer(List tasks, int ticksPerWriter) { * Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks * in strict ascending id order. */ - public void execute() { + public void execute() throws InterruptedException { // add monitor and sync + StreamingMonitor monitor = new StreamingMonitor(tasks.size(), ticksPerWriter); + + for (StreamWriter writer : tasks) { + writer.attachMonitor(monitor); + } + for (StreamWriter writer : tasks) { Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); worker.setDaemon(true); worker.start(); } + + monitor.waitAllWriters(); } } From ad7b38c4e5d77bf2cd7203589fe9bd0e0193f1b5 Mon Sep 17 00:00:00 2001 From: Mac Date: Tue, 17 Mar 2026 21:22:28 +0300 Subject: [PATCH 8/9] synchronizer: test2 --- .../tasks/synchronizer/StreamingMonitor.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index d26696b7..c2491ec5 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -4,7 +4,7 @@ public class StreamingMonitor { // impl your sync here private int totalWritersCount; private int writersEnded; - private int expectedWriterId; + private int expectedWriterIndex; private int ticksPerWriter; private int[] currWritersTicks; @@ -12,23 +12,31 @@ public StreamingMonitor(int writersCount, int ticksPerWriter){ this.totalWritersCount = writersCount; this.currWritersTicks = new int[writersCount]; this.ticksPerWriter = ticksPerWriter; - this.expectedWriterId = 0; + this.expectedWriterIndex = 0; this.writersEnded = 0; } public synchronized void startTick(int writerId) throws InterruptedException { - if (writerId != expectedWriterId){ + var writerIndex = writerId - 1; + + if (currWritersTicks[writerIndex] == ticksPerWriter){ + return; + } + + while (writerIndex != expectedWriterIndex){ wait(); } } public synchronized void endTick(int writerId){ - currWritersTicks[writerId]++; + var writeIndex = writerId-1; - if (currWritersTicks[writerId] == ticksPerWriter){ + currWritersTicks[writeIndex]++; + + if (currWritersTicks[writeIndex] == ticksPerWriter){ writersEnded++; } - + expectedWriterIndex = (expectedWriterIndex + 1) % totalWritersCount; notifyAll(); } From b1c9ece46361d5d4145224d7eed77c66a2b892b4 Mon Sep 17 00:00:00 2001 From: Mac Date: Tue, 17 Mar 2026 21:33:11 +0300 Subject: [PATCH 9/9] synchronizer: test3 --- .../lectures/lecture6/tasks/synchronizer/StreamWriter.java | 5 +++++ .../lecture6/tasks/synchronizer/StreamingMonitor.java | 6 ++++++ 2 files changed, 11 insertions(+) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index a99d3491..7a1c68e9 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -33,6 +33,11 @@ public void run() { } catch (InterruptedException e) { throw new RuntimeException(e); } + + if (monitor.writerEnded(id)){ + break; + } + output.print(message); monitor.endTick(id); diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index c2491ec5..0aad7fc4 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -45,4 +45,10 @@ public synchronized void waitAllWriters() throws InterruptedException { wait(); } } + + public boolean writerEnded(int writerId){ + var writerIndex = writerId - 1; + + return (currWritersTicks[writerIndex] == ticksPerWriter); + } }