From 37de3bbf8b3cc45eb4b8e5599727e5cd9cfd026c Mon Sep 17 00:00:00 2001 From: VictoriaGrudtsyna Date: Mon, 16 Mar 2026 21:04:08 +0300 Subject: [PATCH 1/2] queue: all methods are implemented --- .../tasks/queue/BoundedBlockingQueue.java | 40 ++++++++++++++++--- 1 file changed, 34 insertions(+), 6 deletions(-) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java index 816f3ee..2a1bcf4 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java @@ -1,25 +1,53 @@ package hse.java.lectures.lecture6.tasks.queue; +import java.util.LinkedList; +import java.util.Queue; + public class BoundedBlockingQueue { + private int capacity; + private Queue queue; + private int size; public BoundedBlockingQueue(int capacity) { - + if (capacity > 0) { + this.queue = new LinkedList<>(); + this.capacity = capacity; + } + else { + throw new IllegalArgumentException("capacity should be positive!"); + } } public void put(T item) throws InterruptedException { - + if (item == null) { + throw new IllegalArgumentException("item should be not null!"); + } + synchronized(queue) { + while (queue.size() >= capacity) { + queue.wait(); + } + queue.add(item); + queue.notifyAll(); + } } public T take() throws InterruptedException { - return null; + synchronized (queue) { + while (queue.size() == 0) { + queue.wait(); + } + T takenElem = queue.poll(); + queue.notifyAll(); + return takenElem; + } } - public int size() { - return 0; + public synchronized int size() { + return queue.size(); } public int capacity() { - return 0; + return capacity; } } From 566e25df1e3e97001a4b0d17513be86aefa60bf2 Mon Sep 17 00:00:00 2001 From: VictoriaGrudtsyna Date: Wed, 18 Mar 2026 16:10:58 +0300 Subject: [PATCH 2/2] synchronizer: first attempt --- .../tasks/synchronizer/StreamWriter.java | 35 ++++++++++++++--- .../tasks/synchronizer/StreamingMonitor.java | 38 ++++++++++++++++++- .../tasks/synchronizer/Synchronizer.java | 23 +++++++---- 3 files changed, 81 insertions(+), 15 deletions(-) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e6..dcc291c 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -26,11 +26,34 @@ public void attachMonitor(StreamingMonitor monitor) { @Override public void run() { - // Writer threads are intentionally infinite for the task contract. - while (true) { - output.print(message); - onTick.run(); + while(true) { + synchronized (monitor) { + if (monitor.tickCounters.getOrDefault(this.id, 0) == monitor.ticksPerWriter) { + break; + } + + while (monitor.currentId != this.id && !monitor.done) { + try { + monitor.wait(); + } + catch (InterruptedException exception) { + Thread.currentThread().interrupt(); + return; + } + } + + if (monitor.done) { + break; + } + + output.print(message); + onTick.run(); + + monitor.tickCounters.put(monitor.currentId, monitor.tickCounters.get(monitor.currentId) + 1); + monitor.globalTicks++; + monitor.currentId = monitor.getNextId(); + monitor.notifyAll(); + } } } - -} +} \ No newline at end of file diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f27..d5558fc 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,39 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import java.util.*; + public class StreamingMonitor { - // impl your sync here -} + List tasksIds; + Map tickCounters; + int currentId; + boolean done; + int globalTicks; + int ticksPerWriter; + + public StreamingMonitor(List tasks, int ticksPerWriter) { + this.tasksIds = new ArrayList<>(); + this.tickCounters = new HashMap<>(); + this.ticksPerWriter = ticksPerWriter; + this.done = false; + this.globalTicks = 0; + + for (StreamWriter elem : tasks) { + int id = elem.getId(); + tickCounters.put(id, 0); + tasksIds.add(id); + } + + Collections.sort(tasksIds); + this.currentId = tasksIds.get(0); + } + + public int getNextId() { + if (globalTicks == tasksIds.size() * ticksPerWriter) { + this.done = true; + return -1; + } + else { + return tasksIds.get((tasksIds.indexOf(currentId) + 1) % tasksIds.size()); + } + } +} \ No newline at end of file diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8ade..f9152d3 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -1,5 +1,6 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import java.util.ArrayList; import java.util.List; public class Synchronizer { @@ -17,17 +18,25 @@ public Synchronizer(List tasks, int ticksPerWriter) { this.ticksPerWriter = ticksPerWriter; } - /** - * Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks - * in strict ascending id order. - */ public void execute() { - // add monitor and sync + StreamingMonitor monitor = new StreamingMonitor(tasks, ticksPerWriter); + + List workers = new ArrayList<>(); + for (StreamWriter writer : tasks) { + writer.attachMonitor(monitor); Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); + workers.add(worker); worker.setDaemon(true); worker.start(); } - } -} + for (Thread worker : workers) { + try { + worker.join(); + } catch (InterruptedException exception) { + throw new RuntimeException(exception); + } + } + } +} \ No newline at end of file