diff --git a/src/main/java/hse/java/lectures/lecture3/practice/randomSet/RandomSet.java b/src/main/java/hse/java/lectures/lecture3/practice/randomSet/RandomSet.java index 8af477b5..0d2c4750 100644 --- a/src/main/java/hse/java/lectures/lecture3/practice/randomSet/RandomSet.java +++ b/src/main/java/hse/java/lectures/lecture3/practice/randomSet/RandomSet.java @@ -1,21 +1,152 @@ package hse.java.lectures.lecture3.practice.randomSet; +import java.util.Random; + public class RandomSet { + private static final int INITIAL_CAPACITY = 16; + private static final double LOAD_FACTOR = 0.7; + + private Object[] data; + private int size; + + private Entry[] table; + private int capacity; + private int tableSize; + + private final Random random = new Random(); + + public RandomSet() { + data = new Object[INITIAL_CAPACITY]; + size = 0; + + capacity = INITIAL_CAPACITY; + table = (Entry[]) new Entry[capacity]; + tableSize = 0; + } + public boolean insert(T value) { - throw new UnsupportedOperationException("Not implemented"); + if (contains(value)) { + return false; + } + + ensureDataCapacity(); + ensureTableCapacity(); + + data[size] = value; + put(value, size); + + size++; + return true; } public boolean remove(T value) { - throw new UnsupportedOperationException("Not implemented"); + int index = findIndex(value); + if (index == -1) { + return false; + } + + int dataIndex = table[index].dataIndex; + + T lastElement = elementAt(size - 1); + data[dataIndex] = lastElement; + + int lastTableIndex = findIndex(lastElement); + table[lastTableIndex].dataIndex = dataIndex; + + data[size - 1] = null; + size--; + + table[index].deleted = true; + tableSize--; + + return true; } public boolean contains(T value) { - throw new UnsupportedOperationException("Not implemented"); + return findIndex(value) != -1; } public T getRandom() { - throw new UnsupportedOperationException("Not implemented"); + if (size == 0) { + throw new EmptySetException("Set is empty"); + } + + int index = random.nextInt(size); + return elementAt(index); + } + + private T elementAt(int index) { + return (T) data[index]; + } + + private void ensureDataCapacity() { + if (size >= data.length) { + Object[] newData = new Object[data.length * 2]; + System.arraycopy(data, 0, newData, 0, data.length); + data = newData; + } + } + + private void ensureTableCapacity() { + if ((double) tableSize / capacity >= LOAD_FACTOR) { + rehash(); + } + } + + private void rehash() { + Entry[] oldTable = table; + capacity *= 2; + table = (Entry[]) new Entry[capacity]; + tableSize = 0; + + for (Entry entry : oldTable) { + if (entry != null && !entry.deleted) { + put(entry.value, entry.dataIndex); + } + } + } + + private void put(T value, int dataIndex) { + int hash = hash(value); + while (table[hash] != null && !table[hash].deleted) { + hash = (hash + 1) % capacity; + } + table[hash] = new Entry<>(value, dataIndex); + tableSize++; + } + + private int findIndex(T value) { + int hash = hash(value); + int start = hash; + + while (table[hash] != null) { + if (!table[hash].deleted && table[hash].value.equals(value)) { + return hash; + } + hash = (hash + 1) % capacity; + if (hash == start) { + break; + } + } + + return -1; + } + + private int hash(T value) { + return (value == null ? 0 : Math.abs(value.hashCode())) % capacity; + } + + private static class Entry { + T value; + int dataIndex; + boolean deleted; + + Entry(T value, int dataIndex) { + this.value = value; + this.dataIndex = dataIndex; + this.deleted = false; + } } -} +} \ No newline at end of file diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java index 5c686cff..b35fe918 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java @@ -1,25 +1,59 @@ package hse.java.lectures.lecture6.tasks.queue; public class BoundedBlockingQueue { - + private final Object[] buffer; + private int head = 0; + private int tail = 0; + private int count = 0; + private final int capacity; public BoundedBlockingQueue(int capacity) { - + if (capacity <= 0) { + throw new IllegalArgumentException("Capacity error"); + } + this.capacity = capacity; + this.buffer = new Object[capacity]; } - public void put(T item) { + public void put(T item) throws InterruptedException { + if (item == null) { + throw new NullPointerException("Item error"); + } + + synchronized (this) { + while (count == capacity) { + wait(); + } + buffer[tail] = item; + tail = (tail + 1) % capacity; + count++; + + notifyAll(); + } } - public T take() { - return null; + public T take() throws InterruptedException { + synchronized (this) { + while (count == 0) { + wait(); + } + + T item = (T) buffer[head]; + buffer[head] = null; + head = (head + 1) % capacity; + count--; + + notifyAll(); + return item; + } } public int size() { - return 0; + return count; } public int capacity() { - return 0; + return capacity; } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e66..a4930622 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -28,8 +28,21 @@ public void attachMonitor(StreamingMonitor monitor) { public void run() { // Writer threads are intentionally infinite for the task contract. while (true) { + + if (monitor == null) { + break; + } + + if (!monitor.waitingForTurn(id)) { + break; + } + output.print(message); onTick.run(); + + if(!monitor.tickCompleated(id)) { + break; + } } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f279..8b0195c6 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,78 @@ package hse.java.lectures.lecture6.tasks.synchronizer; public class StreamingMonitor { - // impl your sync here + private int currentId = 1; + private final int writerCount; + private final int tickPerWriter; + private int[] totalTick; + private boolean completed = false; + + public StreamingMonitor(int writerCount, int tickPerWriter) { + this.writerCount = writerCount; + this.tickPerWriter = tickPerWriter; + this. totalTick = new int[writerCount+1]; + } + + public synchronized boolean waitingForTurn(int writerId) { + if (completed) { + return false; + } + + if (totalTick[writerId] >= tickPerWriter) { + return false; + } + + while (writerId != currentId && !completed) { + try { + wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + + if (completed) { + return false; + } + + if (totalTick[writerId] >= tickPerWriter) { + return false; + } + } + + if (completed || totalTick[writerId] >= tickPerWriter) { + return false; + } + + return true; + } + + public synchronized boolean tickCompleated(int writerId) { + totalTick[writerId]++; + + boolean allWriterCompleated = true; + + for (int i = 1; i <= writerCount; i++) { + if(totalTick[i] < tickPerWriter) { + allWriterCompleated = false; + break; + } + } + + if (allWriterCompleated) { + completed = true; + notifyAll(); + return false; + } + + do { + currentId = (currentId % writerCount) + 1; + } while (totalTick[currentId] >= tickPerWriter); + + notifyAll(); + return true; + } + + public synchronized boolean isCompleted() { + return completed; + } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8aded..7a910718 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -23,11 +23,28 @@ public Synchronizer(List tasks, int ticksPerWriter) { */ public void execute() { // add monitor and sync + StreamingMonitor monitor = new StreamingMonitor(tasks.size(), ticksPerWriter); + + for (StreamWriter writer : tasks) { + writer.attachMonitor(monitor); + } + for (StreamWriter writer : tasks) { Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); worker.setDaemon(true); worker.start(); } + + try { + synchronized (monitor) { + while (!monitor.isCompleted()) { + monitor.wait(); + } + } + + } catch(InterruptedException e) { + Thread.currentThread().interrupt(); + } } }