From 6ef39b3c2f8d2a27d65b4d26e55c667822c07ae4 Mon Sep 17 00:00:00 2001 From: George Brocklehurst & Sebastian Abondano Date: Sat, 20 May 2017 23:53:06 -0400 Subject: [PATCH] Support pipelines. Two commands combined with the pipe character (`|`) will be run in parallel with the standard output of the first command connected to the standard input of the second command via an `IO.pipe`. --- lib/gitsh/commands/pipeline.rb | 50 +++++++++++++++ lib/gitsh/lexer.rb | 1 + lib/gitsh/parser.rb | 2 + lib/gitsh/pipeline_environment.rb | 27 +++++++++ lib/gitsh/shell_command_runner.rb | 1 + man/man1/gitsh.1.in | 9 +++ spec/integration/pipeline_spec.rb | 48 +++++++++++++++ spec/support/gitsh_runner.rb | 29 +++++++-- spec/units/commands/pipeline_spec.rb | 61 +++++++++++++++++++ spec/units/lexer_spec.rb | 5 ++ spec/units/parser_spec.rb | 8 +++ spec/units/pipeline_environment_spec.rb | 81 +++++++++++++++++++++++++ spec/units/shell_command_runner_spec.rb | 2 + 13 files changed, 318 insertions(+), 6 deletions(-) create mode 100644 lib/gitsh/commands/pipeline.rb create mode 100644 lib/gitsh/pipeline_environment.rb create mode 100644 spec/integration/pipeline_spec.rb create mode 100644 spec/units/commands/pipeline_spec.rb create mode 100644 spec/units/pipeline_environment_spec.rb diff --git a/lib/gitsh/commands/pipeline.rb b/lib/gitsh/commands/pipeline.rb new file mode 100644 index 00000000..fece9dee --- /dev/null +++ b/lib/gitsh/commands/pipeline.rb @@ -0,0 +1,50 @@ +require 'gitsh/pipeline_environment' + +module Gitsh + module Commands + class Pipeline + def initialize(left, right) + @left = left + @right = right + end + + def execute(env) + left_env, right_env = PipelineEnvironment.build_pair(env) + threads = [start_left_thread(left_env), start_right_thread(right_env)] + wait_for_threads(threads) + threads.map(&:value).all? + end + + private + + attr_reader :left, :right, :threads + + def start_left_thread(env) + Thread.new { execute_left(env) } + end + + def start_right_thread(env) + Thread.new { execute_right(env) } + end + + def execute_left(left_env) + left.execute(left_env) + ensure + left_env.output_stream.close + end + + def execute_right(right_env) + right.execute(right_env) + ensure + right_env.input_stream.close + end + + def wait_for_threads(threads) + threads.map(&:join) + rescue Interrupt + threads.each { |thread| thread.raise(Interrupt) } + retry + end + end + end +end diff --git a/lib/gitsh/lexer.rb b/lib/gitsh/lexer.rb index c8bcea4d..eebc51e6 100644 --- a/lib/gitsh/lexer.rb +++ b/lib/gitsh/lexer.rb @@ -36,6 +36,7 @@ def initialize(*args) rule(/\s*;\s*/) { :SEMICOLON } rule(/\s*&&\s*/) { :AND } rule(/\s*\|\|\s*/) { :OR } + rule(/\s*\|\s*/) { :PIPE } [:default, :soft_string].each do |state| rule(/\$\(\s*/, state) do diff --git a/lib/gitsh/parser.rb b/lib/gitsh/parser.rb index a059c486..49950130 100644 --- a/lib/gitsh/parser.rb +++ b/lib/gitsh/parser.rb @@ -9,6 +9,7 @@ require 'gitsh/commands/shell_command' require 'gitsh/commands/noop' require 'gitsh/commands/tree' +require 'gitsh/commands/pipeline' module Gitsh class Parser < RLTK::Parser @@ -36,6 +37,7 @@ class Parser < RLTK::Parser clause('.commands SEMICOLON .commands') { |c1, c2| Commands::Tree::Multi.new(c1, c2) } clause('.commands OR .commands') { |c1, c2| Commands::Tree::Or.new(c1, c2) } clause('.commands AND .commands') { |c1, c2| Commands::Tree::And.new(c1, c2) } + clause('.commands PIPE .commands') { |c1, c2| Commands::Pipeline.new(c1, c2) } end production(:command, 'word argument_list?') do |word, args| diff --git a/lib/gitsh/pipeline_environment.rb b/lib/gitsh/pipeline_environment.rb new file mode 100644 index 00000000..b36bad95 --- /dev/null +++ b/lib/gitsh/pipeline_environment.rb @@ -0,0 +1,27 @@ +require 'delegate' + +module Gitsh + class PipelineEnvironment < SimpleDelegator + def self.build_pair(env) + pipe_reader, pipe_writer = IO.pipe + [ + new(env, output_stream: pipe_writer), + new(env, input_stream: pipe_reader), + ] + end + + def initialize(env, options) + super(env) + @input_stream = options[:input_stream] + @output_stream = options[:output_stream] + end + + def input_stream + @input_stream || super + end + + def output_stream + @output_stream || super + end + end +end diff --git a/lib/gitsh/shell_command_runner.rb b/lib/gitsh/shell_command_runner.rb index d8bd17ee..b8758c93 100644 --- a/lib/gitsh/shell_command_runner.rb +++ b/lib/gitsh/shell_command_runner.rb @@ -12,6 +12,7 @@ def initialize(command_with_arguments, env) def run pid = Process.spawn( *command_with_arguments, + in: env.input_stream.to_i, out: env.output_stream.to_i, err: env.error_stream.to_i ) diff --git a/man/man1/gitsh.1.in b/man/man1/gitsh.1.in index f16d49cc..d7941dbe 100644 --- a/man/man1/gitsh.1.in +++ b/man/man1/gitsh.1.in @@ -100,6 +100,15 @@ execute .Ic left , and then .Ic right . +.It Ic left | right +execute +.Ic left +and +.Ic right +simultaneously, redirecting the standard output from +.Ic left +to the standard input of +.Ic right . .El .Pp As in diff --git a/spec/integration/pipeline_spec.rb b/spec/integration/pipeline_spec.rb new file mode 100644 index 00000000..78d1fda8 --- /dev/null +++ b/spec/integration/pipeline_spec.rb @@ -0,0 +1,48 @@ +require 'spec_helper' + +describe 'Pipeline' do + it 'passes output of first command to second command' do + GitshRunner.interactive do |gitsh| + gitsh.type('init') + gitsh.type('commit --allow-empty --message "Empty commit"') + gitsh.type('log --oneline | !wc -l') + + expect(gitsh).to output_no_errors + expect(gitsh).to output /\b1\b/ + end + end + + it 'runs processes in parallel' do + GitshRunner.interactive do |gitsh| + gitsh.type_without_waiting('!yes hello | !sed -e "s/ello/i/"') + gitsh.wait_for_output + gitsh.send_sigint + + expect(gitsh).to output_no_errors + expect(gitsh).to output /hi\nhi\n/ + end + end + + it 'considers the pipeline to have failed if either command fails' do + GitshRunner.interactive do |gitsh| + gitsh.type(':echo $unset | !wc && :echo Success') + + expect(gitsh).to output_error /unset/ + expect(gitsh).not_to output /Success/ + end + end + + it 'supports multi-stage pipelines' do + GitshRunner.interactive do |gitsh| + gitsh.type('init') + gitsh.type('commit --allow-empty -m First --author "A "') + gitsh.type('commit --allow-empty -m Second --author "B "') + gitsh.type('commit --allow-empty -m Third --author "A "') + gitsh.type('commit --allow-empty -m Fourth --author "C "') + gitsh.type('log --format="%aN" | !sort -u | !wc -l') + + expect(gitsh).to output_no_errors + expect(gitsh).to output /\b3\b/ + end + end +end diff --git a/spec/support/gitsh_runner.rb b/spec/support/gitsh_runner.rb index 4ba0f525..896b346d 100644 --- a/spec/support/gitsh_runner.rb +++ b/spec/support/gitsh_runner.rb @@ -17,7 +17,7 @@ def self.interactive(options={}, &block) end def initialize(options) - @input_stream = RSpec::Mocks::Double.new('STDIN', tty?: true) + @input_stream = RSpec::Mocks::Double.new('STDIN', tty?: true, to_i: 0) @output_stream = Tempfile.new('stdout') @error_stream = Tempfile.new('stderr') @line_editor = Gitsh::LineEditorHistoryFilter.new(FakeLineEditor.new) @@ -27,29 +27,46 @@ def initialize(options) end def run_interactive - runner = nil + @runner = nil with_a_temporary_home_directory do in_a_temporary_directory do setup_unix_env - runner = start_runner_thread + @runner = start_runner_thread wait_for_prompt yield(self) line_editor.type(':exit') - runner.join + @runner.join + @runner = nil end end rescue RSpec::Expectations::ExpectationNotMetError - runner.kill - runner.join + @runner.kill + @runner.join + @runner = nil raise end def type(string) + type_without_waiting(string) + wait_for_prompt + end + + def type_without_waiting(string) @error_position_before_command = error_stream.pos @position_before_command = output_stream.pos line_editor.type(string) + end + + def wait_for_output + while output_stream.pos == @position_before_command + sleep 0.001 + end + end + + def send_sigint + @runner.raise(Interrupt) wait_for_prompt end diff --git a/spec/units/commands/pipeline_spec.rb b/spec/units/commands/pipeline_spec.rb new file mode 100644 index 00000000..6f8f8336 --- /dev/null +++ b/spec/units/commands/pipeline_spec.rb @@ -0,0 +1,61 @@ +require 'spec_helper' +require 'gitsh/commands/pipeline' +require 'gitsh/commands/git_command' +require 'gitsh/commands/shell_command' + +describe Gitsh::Commands::Pipeline do + describe '#execute' do + it 'pipes output of left command to right command' do + left_command = create_command_double { 'string' } + right_command = create_command_double { |input| input.upcase } + env = build_env + pipeline = described_class.new(left_command, right_command) + + result = pipeline.execute(env) + + expect(result).to be true + expect(env.output_stream.string).to eq "STRING\n" + end + + context 'when the left command fails' do + it 'returns false' do + left_command = create_command_double(false) { '' } + right_command = create_command_double { |input| input.upcase } + pipeline = described_class.new(left_command, right_command) + + result = pipeline.execute(build_env) + + expect(result).to be false + end + end + + context 'when the right command fails' do + it 'returns false' do + left_command = create_command_double { 'string' } + right_command = create_command_double(false) { '' } + pipeline = described_class.new(left_command, right_command) + + result = pipeline.execute(build_env) + + expect(result).to be false + end + end + end + + def create_command_double(value=true) + command = instance_double(Gitsh::Commands::GitCommand) + allow(command).to receive(:execute) do |env| + input = env.input_stream.read + env.output_stream.puts yield(input) + value + end + command + end + + def build_env + Gitsh::Environment.new( + input_stream: instance_double(IO, read: ""), + output_stream: StringIO.new + ) + end +end diff --git a/spec/units/lexer_spec.rb b/spec/units/lexer_spec.rb index d6b508e9..d2629b73 100644 --- a/spec/units/lexer_spec.rb +++ b/spec/units/lexer_spec.rb @@ -29,6 +29,11 @@ to produce_tokens ['WORD(foo)', 'OR', 'WORD(bar)', 'EOS'] end + it 'recognises the | operator' do + expect('foo | bar'). + to produce_tokens ['WORD(foo)', 'PIPE', 'WORD(bar)', 'EOS'] + end + it 'recognises newlines' do expect("foo\nbar"). to produce_tokens ['WORD(foo)', 'EOL', 'WORD(bar)', 'EOS'] diff --git a/spec/units/parser_spec.rb b/spec/units/parser_spec.rb index abade65c..8077b887 100644 --- a/spec/units/parser_spec.rb +++ b/spec/units/parser_spec.rb @@ -169,6 +169,14 @@ expect(result).to be_a(Gitsh::Commands::Tree::Multi) end + it 'parses two commands combined with |' do + result = parse(tokens( + [:WORD, 'log'], [:PIPE], [:WORD, '!wc'], [:EOS], + )) + + expect(result).to be_a(Gitsh::Commands::Pipeline) + end + it 'parses two commands combined with newlines' do result = parse(tokens( [:WORD, 'add'], [:SPACE], [:WORD, '.'], diff --git a/spec/units/pipeline_environment_spec.rb b/spec/units/pipeline_environment_spec.rb new file mode 100644 index 00000000..0edbf849 --- /dev/null +++ b/spec/units/pipeline_environment_spec.rb @@ -0,0 +1,81 @@ +require 'spec_helper' +require 'gitsh/pipeline_environment' + +describe Gitsh::PipelineEnvironment do + describe '.build_pair' do + it 'returns environments for both ends of a pipeline' do + default_input_stream = double(:default_input_stream) + default_output_stream = double(:default_output_stream) + env = double( + :env, + input_stream: default_input_stream, + output_stream: default_output_stream, + ) + pipe_writer = double(:pipe_writer) + pipe_reader = double(:pipe_reader) + allow(IO).to receive(:pipe).and_return([pipe_reader, pipe_writer]) + + left, right = described_class.build_pair(env) + + expect(left.input_stream).to eq default_input_stream + expect(left.output_stream).to eq pipe_writer + expect(right.input_stream).to eq pipe_reader + expect(right.output_stream).to eq default_output_stream + end + end + + describe '#input_stream' do + context 'when constructed with a custom input stream' do + it 'returns the custom input stream' do + env = double(:env) + input_stream = double(:input_stream) + pipeline_env = described_class.new(env, input_stream: input_stream) + + expect(pipeline_env.input_stream).to eq input_stream + end + end + + context 'when constructed with no custom input stream' do + it 'returns the default environment\'s input stream' do + input_stream = double(:input_stream) + env = double(:env, input_stream: input_stream) + pipeline_env = described_class.new(env, output_stream: double) + + expect(pipeline_env.input_stream).to eq input_stream + end + end + end + + describe '#output_stream' do + context 'when constructed with a custom output stream' do + it 'returns the custom output stream' do + env = double(:env) + output_stream = double(:output_stream) + pipeline_env = described_class.new(env, output_stream: output_stream) + + expect(pipeline_env.output_stream).to eq output_stream + end + end + + context 'when constructed with no custom output stream' do + it 'returns the default environment\'s output stream' do + output_stream = double(:output_stream) + env = double(:env, output_stream: output_stream) + pipeline_env = described_class.new(env, input_stream: double) + + expect(pipeline_env.output_stream).to eq output_stream + end + end + end + + describe 'delegations' do + it 'delegates unknown methods to the wrapped environment' do + return_value = double(:return_value) + env = double(:env, foo: return_value) + pipeline_env = described_class.new(env, input_stream: double) + + expect(pipeline_env).to respond_to(:foo) + expect(pipeline_env.foo).to eq return_value + end + end +end diff --git a/spec/units/shell_command_runner_spec.rb b/spec/units/shell_command_runner_spec.rb index 48ebfcc3..dacf1ce0 100644 --- a/spec/units/shell_command_runner_spec.rb +++ b/spec/units/shell_command_runner_spec.rb @@ -17,6 +17,7 @@ expect(Process).to have_received(:spawn).with( 'echo', 'Hello world', + in: env.input_stream.to_i, out: env.output_stream.to_i, err: env.error_stream.to_i ) @@ -66,6 +67,7 @@ def ensure_exit_status_exists let(:env) do double('Environment', + input_stream: double('InputStream', to_i: 0), output_stream: double('OutputStream', to_i: 1), error_stream: double('ErrorStream', to_i: 2), puts_error: nil