diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e66..9af525be 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -4,33 +4,46 @@ import java.io.PrintStream; -public class StreamWriter implements Runnable { - - private final String message; - @Getter - private final int id; - private final PrintStream output; - private final Runnable onTick; - private volatile StreamingMonitor monitor; - - public StreamWriter(int id, String message, PrintStream output, Runnable onTick) { - this.message = message; - this.id = id; - this.output = output; - this.onTick = onTick; - } - - public void attachMonitor(StreamingMonitor monitor) { - this.monitor = monitor; - } - - @Override - public void run() { - // Writer threads are intentionally infinite for the task contract. - while (true) { - output.print(message); - onTick.run(); - } - } - +public class StreamWriter implements Runnable +{ + @Getter + private final String message; + @Getter + private final int id; + @Getter + private final PrintStream output; + @Getter + private final Runnable onTick; + @Getter + private volatile StreamingMonitor monitor; + + public StreamWriter(int id, String message, PrintStream output, Runnable onTick) { + this.message = message; + this.id = id; + this.output = output; + this.onTick = onTick; + } + + public void attach(StreamingMonitor monitor) + { + this.monitor = monitor; + } + + @Override + public void run() + { + while (true) + { + try + { + monitor.await(id); + output.print(message); + onTick.run(); + monitor.tickDone(); + } catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f279..177802cd 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,55 @@ -package hse.java.lectures.lecture6.tasks.synchronizer; + package hse.java.lectures.lecture6.tasks.synchronizer; -public class StreamingMonitor { - // impl your sync here -} +import lombok.Getter; + +public class StreamingMonitor + { + @Getter + private final int[] sortedIds; + @Getter + private final int[] counts; + @Getter + private final int ticksPerWriter; + @Getter + private int currentIdx = 0; + @Getter + private int n; + + StreamingMonitor(int[] sortedIds, int ticksPerWriter) + { + this.sortedIds = sortedIds; + this.ticksPerWriter = ticksPerWriter; + this.counts = new int[sortedIds.length]; + this.n = sortedIds.length * ticksPerWriter; + } + + public synchronized void await(int id) throws InterruptedException + { + while (n > 0 && sortedIds[currentIdx] != id) wait(); + while (n == 0) wait(); + } + + public synchronized void tickDone() + { + counts[currentIdx]++; + n--; + + if (n == 0) + { + notifyAll(); + } + else + { + int n = sortedIds.length; + int next = (currentIdx + 1) % n; + while (counts[next] >= ticksPerWriter) next = (next + 1) % n; + currentIdx = next; + notifyAll(); + } + } + + public synchronized void waitAll() throws InterruptedException + { + while (n > 0) wait(); + } + } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8aded..bb24f0c2 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -1,33 +1,47 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import java.util.Arrays; import java.util.List; -public class Synchronizer { - - public static final int DEFAULT_TICKS_PER_WRITER = 10; - private final List tasks; - private final int ticksPerWriter; - - public Synchronizer(List tasks) { - this(tasks, DEFAULT_TICKS_PER_WRITER); - } - - public Synchronizer(List tasks, int ticksPerWriter) { - this.tasks = tasks; - this.ticksPerWriter = ticksPerWriter; - } - - /** - * Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks - * in strict ascending id order. - */ - public void execute() { - // add monitor and sync - for (StreamWriter writer : tasks) { - Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); - worker.setDaemon(true); - worker.start(); - } - } +import lombok.Getter; +public class Synchronizer +{ + @Getter + public static final int DEFAULT_TICKS_PER_WRITER = 10; + @Getter + private final List tasks; + @Getter + private final int ticksPerWriter; + + public Synchronizer(List tasks) + { + this(tasks, DEFAULT_TICKS_PER_WRITER); + } + + public Synchronizer(List tasks, int ticksPerWriter) + { + this.tasks = tasks; + this.ticksPerWriter = ticksPerWriter; + } + + public void execute() throws InterruptedException + { + + int[] ids = new int[tasks.size()]; + for (int i = 0; i < tasks.size(); i++) ids[i] = tasks.get(i).getId(); + Arrays.sort(ids); + StreamingMonitor monitor = new StreamingMonitor(ids, ticksPerWriter); + + + for (StreamWriter writer : tasks) writer.attach(monitor); + + for (StreamWriter writer : tasks) + { + Thread thread = new Thread(writer, "stream-writer-" + writer.getId()); + thread.setDaemon(true); + thread.start(); + } + monitor.waitAll(); + } }