diff --git a/Gemfile.lock b/Gemfile.lock index ed64b88..fdd6fe4 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -14,16 +14,16 @@ GEM daemons (1.1.8) diff-lcs (1.1.3) eventmachine (0.12.10) - json (1.7.3) + json (1.7.4) rake (0.9.2.2) - rspec (2.10.0) - rspec-core (~> 2.10.0) - rspec-expectations (~> 2.10.0) - rspec-mocks (~> 2.10.0) - rspec-core (2.10.1) - rspec-expectations (2.10.0) + rspec (2.11.0) + rspec-core (~> 2.11.0) + rspec-expectations (~> 2.11.0) + rspec-mocks (~> 2.11.0) + rspec-core (2.11.1) + rspec-expectations (2.11.2) diff-lcs (~> 1.1.3) - rspec-mocks (2.10.1) + rspec-mocks (2.11.1) PLATFORMS ruby diff --git a/README.markdown b/README.markdown index 60abcf4..3368b8a 100644 --- a/README.markdown +++ b/README.markdown @@ -69,6 +69,12 @@ Options: - :timeout => The number of seconds to wait for a lock to become available (default: wait forever). - :queue_max => If the lock queue length is greater than :queue_max then don't wait for the lock (default: infinite). +#### How to use n Semaphore + +To allow multiple clients to use the same lock you can use n Semaphore functionality. +Officer can use a n Semaphore in stead of a Mutex by adding #n to the lock name. + + client.lock 'some_lock_name#2' # will create a 2 semaphore ### Unlock diff --git a/lib/officer.rb b/lib/officer.rb index 2a0ad6d..6a6ff5e 100644 --- a/lib/officer.rb +++ b/lib/officer.rb @@ -23,3 +23,4 @@ require 'officer/runner' require 'officer/server' require 'officer/client' +require 'officer/semaphore' diff --git a/lib/officer/connection.rb b/lib/officer/connection.rb index ac2eabe..ddaa110 100644 --- a/lib/officer/connection.rb +++ b/lib/officer/connection.rb @@ -34,10 +34,10 @@ def unbind end module LockStoreCallbacks - def acquired name + def acquired name, lock_id @timers.delete(name).cancel if @timers[name] - send_result 'acquired', :name => name + send_result 'acquired', :name => name, :id => lock_id.to_s end def already_acquired name diff --git a/lib/officer/lock_store.rb b/lib/officer/lock_store.rb index 779da80..81a6e54 100644 --- a/lib/officer/lock_store.rb +++ b/lib/officer/lock_store.rb @@ -8,10 +8,14 @@ def to_host_a class Lock attr_reader :name + attr_reader :size attr_reader :queue + attr_reader :lock_ids - def initialize name + def initialize name, size = 1 @name = name + @size = size.to_i + @lock_ids = (0..@size-1).to_a @queue = LockQueue.new end end @@ -22,7 +26,9 @@ class LockStore def initialize @locks = {} # name => Lock @connections = {} # Connection => Set(name, ...) + @locked_connections = {} # {lock_id => Connection} @acquire_counter = 0 + @mutex = Mutex.new end def log_state @@ -52,6 +58,8 @@ def log_state end def acquire name, connection, options={} + name, size = split_name(name) + if options[:queue_max] lock = @locks[name] @@ -63,57 +71,68 @@ def acquire name, connection, options={} @acquire_counter += 1 - lock = @locks[name] ||= Lock.new(name) + lock = @locks[name] ||= Lock.new(name, size) - if lock.queue.include? connection - lock.queue.first == connection ? connection.already_acquired(name) : connection.queued(name, options) + if lock.queue[0..lock.size-1].include?(connection) + return connection.already_acquired(name) + end + if lock.queue.count < lock.size + lock.queue << connection + lock_id = lock_connection(connection, lock) + (@connections[connection] ||= Set.new) << name + + connection.acquired name, lock_id else lock.queue << connection (@connections[connection] ||= Set.new) << name - lock.queue.count == 1 ? connection.acquired(name) : connection.queued(name, options) + connection.queued(name, options) end end def release name, connection, options={} + name, size = split_name(name) + if options[:callback].nil? options[:callback] = true end - lock = @locks[name] names = @connections[connection] - # Client should only be able to release a lock that # exists and that it has previously queued. - if lock.nil? || !names.include?(name) + if lock.nil? || names.nil? || !names.include?(name) connection.release_failed(name) if options[:callback] return end # If connecton has the lock, release it and let the next # connection know that it has acquired the lock. - if lock.queue.first == connection - lock.queue.shift - connection.released name if options[:callback] + if index = lock.queue.index(connection) + if index < lock.size + lock.queue.delete_at(index) + release_connection(connection, lock) + + connection.released name if options[:callback] + + if next_connection = lock.queue[lock.size-1] + lock_id = lock_connection(next_connection, lock) + next_connection.acquired name, lock_id + end + @locks.delete name if lock.queue.count == 0 - if next_connection = lock.queue.first - next_connection.acquired name + # If the connection is queued and doesn't have the lock, + # dequeue it and leave the other connections alone. else - @locks.delete name + lock.queue.delete connection + connection.released name end - - # If the connection is queued and doesn't have the lock, - # dequeue it and leave the other connections alone. - else - lock.queue.delete connection - connection.released name + names.delete name end - - names.delete name end def reset connection + # names = @connections[connection] || [] names = @connections[connection] || Set.new names.each do |name| @@ -125,6 +144,8 @@ def reset connection end def timeout name, connection + name, size = split_name(name) + lock = @locks[name] names = @connections[connection] @@ -169,6 +190,36 @@ def close_idle_connections(max_idle) end end end + + protected + def split_name(name) + if name.include?("#") + name_array = name.split("#") + size = (name_array.last.to_i > 0 && name_array.size > 1) ? name_array.last.to_i : 1 + else + size = 1 + end + [name, size] + end + + def lock_connection(connection, lock) + @mutex.synchronize do + @locked_connections[lock.lock_ids.first] = connection + lock_id = lock.lock_ids.shift + lock.lock_ids.push(lock_id) + lock_id + end + end + + def release_connection(connection, lock) + @mutex.synchronize do + if lock_id = @locked_connections.key(connection) + lock.lock_ids.delete(lock_id) + lock.lock_ids.insert(0, lock_id) + lock_id + end + end + end end end diff --git a/lib/officer/semaphore.rb b/lib/officer/semaphore.rb new file mode 100644 index 0000000..8c4cf98 --- /dev/null +++ b/lib/officer/semaphore.rb @@ -0,0 +1,27 @@ +module Officer + class Semaphore + attr_accessor :size + + def initialize(host, port, name, size) + size ||= 1 + + @client = Officer::Client.new(:host => host, :port => port) + @name = "#{name}##{size}" + @size = size + end + + def lock + @client.lock(@name) + end + + def unlock + @client.unlock(@name) + end + + def synchronize + @client.with_lock @name do + yield + end + end + end +end \ No newline at end of file diff --git a/lib/officer/version.rb b/lib/officer/version.rb index c43f12b..dcaddce 100644 --- a/lib/officer/version.rb +++ b/lib/officer/version.rb @@ -1,3 +1,3 @@ module Officer - VERSION = "0.10.1" + VERSION = "0.10.2" end diff --git a/spec/integration/officer_spec.rb b/spec/integration/officer_spec.rb index 0878f4c..950155e 100644 --- a/spec/integration/officer_spec.rb +++ b/spec/integration/officer_spec.rb @@ -164,14 +164,14 @@ end it "should allow a client to request and release a lock" do - @client.lock("testlock").should eq({"result" => "acquired", "name" => "testlock"}) + @client.lock("testlock").should eq({"result" => "acquired", "name" => "testlock", "id" => "0"}) @client.my_locks.should eq({"value"=>["testlock"], "result"=>"my_locks"}) @client.unlock("testlock") @client.my_locks.should eq({"value"=>[], "result"=>"my_locks"}) end it "should inform the client they already have a lock if they previously locked it" do - @client.lock("testlock") + @client.lock("testlock").should eq({"result" => "acquired", "name" => "testlock", "id" => "0"}) @client.lock("testlock").should eq({"result" => "already_acquired", "name" => "testlock"}) end @@ -309,5 +309,66 @@ JSON.parse(@socket.gets("\n").chomp).should eq({"result" => "released", "name" => "testlock"}) end end + + describe "NEW: server support for multiple locks" do + before do + @client1 = Officer::Client.new + @client2 = Officer::Client.new + @client3 = Officer::Client.new + + @client1_src_port = @client1.instance_variable_get('@socket').addr[1] + @client2_src_port = @client2.instance_variable_get('@socket').addr[1] + @client3_src_port = @client2.instance_variable_get('@socket').addr[1] + end + + after do + @client1.send("disconnect") + @client1 = nil + @client2.send("disconnect") + @client2 = nil + @client3.send("disconnect") + @client3 = nil + end + + it "should allow a client to obtaining and releasing the number of allowed locks" do + @client1.lock("testlock#2").should eq({"result" => "acquired", "name" => "testlock#2", "id" => "0"}) + @client2.lock("testlock#2").should eq({"result" => "acquired", "name" => "testlock#2", "id" => "1"}) + @client1.locks.should eq({"value"=>{"testlock#2"=>["127.0.0.1:#{@client1_src_port}", "127.0.0.1:#{@client2_src_port}"]}, "result"=>"locks"}) + @client2.locks.should eq({"value"=>{"testlock#2"=>["127.0.0.1:#{@client1_src_port}", "127.0.0.1:#{@client2_src_port}"]}, "result"=>"locks"}) + @client1.my_locks.should eq({"value"=>["testlock#2"], "result"=>"my_locks"}) + @client2.my_locks.should eq({"value"=>["testlock#2"], "result"=>"my_locks"}) + @client1.unlock("testlock#2") + @client2.unlock("testlock#2") + @client1.locks.should eq({"value"=>{}, "result"=>"locks"}) + end + + it "should not allow a client to request a lock that is already acquired" do + @client1.lock("testlock#2").should eq({"result" => "acquired", "name" => "testlock#2", "id" => "0"}) + @client1.lock("testlock#2").should eq({"result" => "already_acquired", "name" => "testlock#2"}) + @client2.unlock("testlock#2") + end + + it "should allow timeout if number of allowed connections is reached" do + @client1.lock("testlock#2").should eq({"result" => "acquired", "name" => "testlock#2", "id" => "0"}) + @client2.lock("testlock#2").should eq({"result" => "acquired", "name" => "testlock#2", "id" => "1"}) + @client3.lock("testlock#2", :timeout => 0).should eq( + {"result"=>"timed_out", "name"=>"testlock#2", "queue"=>["127.0.0.1:#{@client1_src_port}", "127.0.0.1:#{@client2_src_port}"]} + ) + end + + it "should queue a connection if number of allowed connections is reached and allow connection if a lock is released" do + t = Thread.new do + @client1.lock("testlock#2").should eq({"result" => "acquired", "name" => "testlock#2", "id" => "0"}) + @client2.lock("testlock#2").should eq({"result" => "acquired", "name" => "testlock#2", "id" => "1"}) + @client3.lock("testlock#2").should eq({"result" => "acquired", "name" => "testlock#2", "id" => "1"}) + end + + t2 = Thread.new do + sleep 2 + @client2.unlock("testlock#2") + end + t.join + end + end end end