Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,25 +1,53 @@
package hse.java.lectures.lecture6.tasks.queue;

import java.util.LinkedList;
import java.util.Queue;

public class BoundedBlockingQueue<T> {

private int capacity;
private Queue<T> queue;
private int size;

public BoundedBlockingQueue(int capacity) {

if (capacity > 0) {
this.queue = new LinkedList<>();
this.capacity = capacity;
}
else {
throw new IllegalArgumentException("capacity should be positive!");
}
}

public void put(T item) throws InterruptedException {

if (item == null) {
throw new IllegalArgumentException("item should be not null!");
}
synchronized(queue) {
while (queue.size() >= capacity) {
queue.wait();
}
queue.add(item);
queue.notifyAll();
}
}

public T take() throws InterruptedException {
return null;
synchronized (queue) {
while (queue.size() == 0) {
queue.wait();
}
T takenElem = queue.poll();
queue.notifyAll();
return takenElem;
}
}

public int size() {
return 0;
public synchronized int size() {
return queue.size();
}

public int capacity() {
return 0;
return capacity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,34 @@ public void attachMonitor(StreamingMonitor monitor) {

@Override
public void run() {
// Writer threads are intentionally infinite for the task contract.
while (true) {
output.print(message);
onTick.run();
while(true) {
synchronized (monitor) {
if (monitor.tickCounters.getOrDefault(this.id, 0) == monitor.ticksPerWriter) {
break;
}

while (monitor.currentId != this.id && !monitor.done) {
try {
monitor.wait();
}
catch (InterruptedException exception) {
Thread.currentThread().interrupt();
return;
}
}

if (monitor.done) {
break;
}

output.print(message);
onTick.run();

monitor.tickCounters.put(monitor.currentId, monitor.tickCounters.get(monitor.currentId) + 1);
monitor.globalTicks++;
monitor.currentId = monitor.getNextId();
monitor.notifyAll();
}
}
}

}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,39 @@
package hse.java.lectures.lecture6.tasks.synchronizer;

import java.util.*;

public class StreamingMonitor {
// impl your sync here
}
List<Integer> tasksIds;
Map<Integer, Integer> tickCounters;
int currentId;
boolean done;
int globalTicks;
int ticksPerWriter;

public StreamingMonitor(List<StreamWriter> tasks, int ticksPerWriter) {
this.tasksIds = new ArrayList<>();
this.tickCounters = new HashMap<>();
this.ticksPerWriter = ticksPerWriter;
this.done = false;
this.globalTicks = 0;

for (StreamWriter elem : tasks) {
int id = elem.getId();
tickCounters.put(id, 0);
tasksIds.add(id);
}

Collections.sort(tasksIds);
this.currentId = tasksIds.get(0);
}

public int getNextId() {
if (globalTicks == tasksIds.size() * ticksPerWriter) {
this.done = true;
return -1;
}
else {
return tasksIds.get((tasksIds.indexOf(currentId) + 1) % tasksIds.size());
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package hse.java.lectures.lecture6.tasks.synchronizer;

import java.util.ArrayList;
import java.util.List;

public class Synchronizer {
Expand All @@ -17,17 +18,25 @@ public Synchronizer(List<StreamWriter> tasks, int ticksPerWriter) {
this.ticksPerWriter = ticksPerWriter;
}

/**
* Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks
* in strict ascending id order.
*/
public void execute() {
// add monitor and sync
StreamingMonitor monitor = new StreamingMonitor(tasks, ticksPerWriter);

List<Thread> workers = new ArrayList<>();

for (StreamWriter writer : tasks) {
writer.attachMonitor(monitor);
Thread worker = new Thread(writer, "stream-writer-" + writer.getId());
workers.add(worker);
worker.setDaemon(true);
worker.start();
}
}

}
for (Thread worker : workers) {
try {
worker.join();
} catch (InterruptedException exception) {
throw new RuntimeException(exception);
}
}
}
}
Loading