Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,38 @@

public class BoundedBlockingQueue<T> {


T[] items;
public BoundedBlockingQueue(int capacity) {

if (capacity <= 0) throw new IllegalArgumentException();
items = (T[]) new Object[capacity];
}

public void put(T item) throws InterruptedException {

int size, tail;
public synchronized void put(T item) throws InterruptedException {
if (item == null) throw new NullPointerException();
while (size == items.length) wait();
items[tail] = item;
if (++tail == items.length) tail = 0;
size += 1;
notifyAll();
}

public T take() throws InterruptedException {
return null;
int head;
public synchronized T take() throws InterruptedException {
while (size == 0) wait();
T item = items[head];
items[head] = null;
if (++head == items.length) head = 0;
size -= 1;
notifyAll();
return item;
}

public int size() {
return 0;
public synchronized int size() {
return size;
}

public int capacity() {
return 0;
return items.length;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,15 @@ public void attachMonitor(StreamingMonitor monitor) {
@Override
public void run() {
// Writer threads are intentionally infinite for the task contract.
while (true) {
output.print(message);
onTick.run();
try {
while (true) {
monitor.awaitTurn(id);
output.print(message);
onTick.run();
monitor.tick();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
package hse.java.lectures.lecture6.tasks.synchronizer;

public class StreamingMonitor {
// impl your sync here
}
int[] ids;
public StreamingMonitor(int[] ids, int ticksPerWriter) {
this.ids = ids;
left = ids.length * ticksPerWriter;
}

int left, current;
public synchronized void awaitTurn(int id) throws InterruptedException {
while (left == 0 || ids[current] != id) wait();
}

public synchronized void awaitDone() throws InterruptedException {
while (left > 0) wait();
}

public synchronized void tick() {
left -= 1;
if (++current == ids.length) current = 0;
notifyAll();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
package hse.java.lectures.lecture6.tasks.synchronizer;

import java.util.Arrays;
import java.util.List;

public class Synchronizer {
Expand All @@ -23,11 +23,24 @@ public Synchronizer(List<StreamWriter> tasks, int ticksPerWriter) {
*/
public void execute() {
// add monitor and sync
int[] ids = new int[tasks.size()];
for (int i = 0; i < tasks.size(); ++i) ids[i] = tasks.get(i).getId();

Arrays.sort(ids);
StreamingMonitor monitor = new StreamingMonitor(ids, ticksPerWriter);
for (StreamWriter writer : tasks) writer.attachMonitor(monitor);

for (StreamWriter writer : tasks) {
Thread worker = new Thread(writer, "stream-writer-" + writer.getId());
worker.setDaemon(true);
worker.start();
}

try {
monitor.awaitDone();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

}
Loading