diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e66..e2de1df9 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -5,32 +5,35 @@ import java.io.PrintStream; public class StreamWriter implements Runnable { - - private final String message; - @Getter - private final int id; - private final PrintStream output; - private final Runnable onTick; + @Getter private final String message; + @Getter private final int id; + @Getter private final PrintStream output; + @Getter private final Runnable on_tick; private volatile StreamingMonitor monitor; - public StreamWriter(int id, String message, PrintStream output, Runnable onTick) { + public StreamWriter(int id, String message, PrintStream output, Runnable on_tick) { this.message = message; this.id = id; this.output = output; - this.onTick = onTick; + this.on_tick = on_tick; } - public void attachMonitor(StreamingMonitor monitor) { + public void attach_monitor(StreamingMonitor monitor) { this.monitor = monitor; } @Override public void run() { - // Writer threads are intentionally infinite for the task contract. while (true) { - output.print(message); - onTick.run(); + try { + monitor.await(id); + output.print(message); + on_tick.run(); + monitor.tick_done(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } } } - } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f279..8ce418a0 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,41 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import lombok.Getter; + public class StreamingMonitor { - // impl your sync here + @Getter private final int[] sorted_ids; + @Getter private final int[] counts; + @Getter private final int ticks_per_writer; + @Getter private int current_idx = 0; + @Getter private int remaining_ticks; + + public StreamingMonitor(int[] sorted_ids, int ticks_per_writer) { + this.sorted_ids = sorted_ids; + this.ticks_per_writer = ticks_per_writer; + this.counts = new int[sorted_ids.length]; + this.remaining_ticks = sorted_ids.length * ticks_per_writer; + } + + public synchronized void await(int id) throws InterruptedException { + while (remaining_ticks > 0 && sorted_ids[current_idx] != id) wait(); + while (remaining_ticks == 0) wait(); + } + + public synchronized void tick_done() { + counts[current_idx]++; + remaining_ticks--; + if (remaining_ticks == 0) { + notifyAll(); + return; + } + int writers_count = sorted_ids.length; + int next = (current_idx + 1) % writers_count; + while (counts[next] >= ticks_per_writer) next = (next + 1) % writers_count; + current_idx = next; + notifyAll(); + } + + public synchronized void wait_all() throws InterruptedException { + while (remaining_ticks > 0) wait(); + } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8aded..c53b3e78 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -1,33 +1,35 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import java.util.Arrays; import java.util.List; -public class Synchronizer { +import lombok.Getter; +public class Synchronizer { public static final int DEFAULT_TICKS_PER_WRITER = 10; - private final List tasks; - private final int ticksPerWriter; + @Getter private final List tasks; + @Getter private final int ticks_per_writer; public Synchronizer(List tasks) { this(tasks, DEFAULT_TICKS_PER_WRITER); } - public Synchronizer(List tasks, int ticksPerWriter) { + public Synchronizer(List tasks, int ticks_per_writer) { this.tasks = tasks; - this.ticksPerWriter = ticksPerWriter; + this.ticks_per_writer = ticks_per_writer; } - /** - * Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks - * in strict ascending id order. - */ - public void execute() { - // add monitor and sync + public void execute() throws InterruptedException { + int[] ids = new int[tasks.size()]; + for (int i = 0; i < tasks.size(); i++) ids[i] = tasks.get(i).getId(); + Arrays.sort(ids); + StreamingMonitor monitor = new StreamingMonitor(ids, ticks_per_writer); + for (StreamWriter writer : tasks) writer.attach_monitor(monitor); for (StreamWriter writer : tasks) { - Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); - worker.setDaemon(true); - worker.start(); + Thread thread = new Thread(writer, "stream-writer-" + writer.getId()); + thread.setDaemon(true); + thread.start(); } + monitor.wait_all(); } - }