Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions lib/sumologic/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,21 @@ def list_collectors
Commands::ListCollectorsCommand.new(options, create_client).execute
end

desc 'list-sources', 'List sources from collectors'
desc 'list-sources', 'List sources from collectors with optional filters'
long_desc <<~DESC
List all sources from all collectors, or sources from a specific collector.
List sources from all collectors, or from a specific collector.
Supports filtering by collector name, source name, and category.

Examples:
# List all sources
sumo-query list-sources

# List sources for specific collector
sumo-query list-sources --collector "my-service" --name "nginx" -l 20
sumo-query list-sources --category "production"
sumo-query list-sources --collector-id 12345
DESC
option :collector_id, type: :string, desc: 'Collector ID to list sources for'
option :collector, type: :string, desc: 'Filter by collector name (case-insensitive)'
option :name, type: :string, aliases: '-n', desc: 'Filter by source name (case-insensitive)'
option :category, type: :string, desc: 'Filter by source category (case-insensitive)'
option :limit, type: :numeric, aliases: '-l', desc: 'Maximum total sources to return'
def list_sources
Commands::ListSourcesCommand.new(options, create_client).execute
end
Expand Down
10 changes: 7 additions & 3 deletions lib/sumologic/cli/commands/list_sources_command.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@ def list_sources_for_collector
end

def list_all_sources
warn 'Fetching all sources from all collectors...'
warn 'This may take a minute...'
warn 'Fetching sources from collectors...'

all_sources = client.list_all_sources
all_sources = client.list_all_sources(
collector: options[:collector],
name: options[:name],
category: options[:category],
limit: options[:limit]
)

output_json(
total_collectors: all_sources.size,
Expand Down
10 changes: 7 additions & 3 deletions lib/sumologic/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,15 @@ def list_sources(collector_id:)
@source.list(collector_id: collector_id)
end

# List all sources from all collectors
# List all sources from all collectors with optional filtering
#
# @param collector [String, nil] Filter collectors by name
# @param name [String, nil] Filter sources by name
# @param category [String, nil] Filter sources by category
# @param limit [Integer, nil] Maximum total sources to return
# @return [Array<Hash>] Array of { 'collector' => Hash, 'sources' => Array<Hash> }
def list_all_sources
@source.list_all
def list_all_sources(collector: nil, name: nil, category: nil, limit: nil)
@source.list_all(collector: collector, name: name, category: category, limit: limit)
end

# Discover source metadata from actual log data
Expand Down
48 changes: 44 additions & 4 deletions lib/sumologic/metadata/source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,28 @@ def list(collector_id:)
raise Error, "Failed to list sources for collector #{collector_id}: #{e.message}"
end

# List all sources from all collectors
# List all sources from all collectors with optional filtering
# Returns array of hashes with collector info and their sources
# Uses parallel fetching with thread pool for better performance
def list_all
#
# @param collector [String, nil] Filter collectors by name (case-insensitive substring)
# @param name [String, nil] Filter sources by name (case-insensitive substring)
# @param category [String, nil] Filter sources by category (case-insensitive substring)
# @param limit [Integer, nil] Maximum total sources to return
def list_all(collector: nil, name: nil, category: nil, limit: nil)
collectors = @collector_client.list
active_collectors = collectors.select { |c| c['alive'] }
active_collectors = filter_collectors(active_collectors, collector) if collector

log_info "Fetching sources for #{active_collectors.size} active collectors in parallel..."

result = @fetcher.fetch_all(active_collectors) do |collector|
fetch_collector_sources(collector)
result = @fetcher.fetch_all(active_collectors) do |c|
fetch_collector_sources(c)
end

result = filter_sources(result, name: name, category: category)
result = apply_source_limit(result, limit) if limit

log_info "Total: #{result.size} collectors with sources"
result
rescue StandardError => e
Expand All @@ -52,6 +61,37 @@ def list_all

private

def filter_collectors(collectors, pattern)
pattern = pattern.downcase
collectors.select { |c| (c['name'] || '').downcase.include?(pattern) }
end

def filter_sources(result, name:, category:)
matcher = source_matcher(name&.downcase, category&.downcase)
result.filter_map do |entry|
filtered = entry['sources'].select(&matcher)
{ 'collector' => entry['collector'], 'sources' => filtered } unless filtered.empty?
end
end

def source_matcher(name_pattern, cat_pattern)
lambda do |s|
(!name_pattern || (s['name'] || '').downcase.include?(name_pattern)) &&
(!cat_pattern || (s['category'] || '').downcase.include?(cat_pattern))
end
end

def apply_source_limit(result, limit)
remaining = limit
result.each_with_object([]) do |entry, acc|
break acc if remaining <= 0

sources = entry['sources'].take(remaining)
acc << { 'collector' => entry['collector'], 'sources' => sources }
remaining -= sources.size
end
end

# Fetch sources for a single collector
# @return [Hash] collector and sources data
def fetch_collector_sources(collector)
Expand Down
19 changes: 16 additions & 3 deletions spec/sumologic/cli/commands/commands_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,26 @@ def capture_stdout_stderr(command)
end

it 'lists all sources when no collector_id given' do
allow(client).to receive(:list_all_sources).and_return(
[{ 'collector' => { 'id' => '1' }, 'sources' => [{ 'id' => 's1' }] }]
)
allow(client).to receive(:list_all_sources)
.with(collector: nil, name: nil, category: nil, limit: nil)
.and_return(
[{ 'collector' => { 'id' => '1' }, 'sources' => [{ 'id' => 's1' }] }]
)

command = described_class.new(options, client)
expect { command.execute }.to output(/"total_collectors": 1/).to_stdout
end

it 'passes filter options when listing all sources' do
allow(client).to receive(:list_all_sources)
.with(collector: 'web', name: 'nginx', category: 'prod', limit: 10)
.and_return([])

command = described_class.new(
options.merge(collector: 'web', name: 'nginx', category: 'prod', limit: 10), client
)
expect { command.execute }.to output(/"total_collectors": 0/).to_stdout
end
end

describe Sumologic::CLI::Commands::ListFoldersCommand do
Expand Down
129 changes: 129 additions & 0 deletions spec/sumologic/metadata/source_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# frozen_string_literal: true

RSpec.describe Sumologic::Metadata::Source do
let(:http_client) { instance_double('Sumologic::Http::Client') }
let(:collector_client) { instance_double(Sumologic::Metadata::Collector) }
let(:config) { instance_double(Sumologic::Configuration, max_workers: 1, request_delay: 0) }
let(:source) { described_class.new(http_client: http_client, collector_client: collector_client, config: config) }

let(:collectors) do
[
{ 'id' => '1', 'name' => 'prod-web-01', 'alive' => true },
{ 'id' => '2', 'name' => 'prod-api-01', 'alive' => true },
{ 'id' => '3', 'name' => 'staging-web-01', 'alive' => true },
{ 'id' => '4', 'name' => 'dead-collector', 'alive' => false }
]
end

let(:web_sources) do
{
'sources' => [
{ 'id' => 's1', 'name' => 'nginx_access', 'category' => 'production/web' },
{ 'id' => 's2', 'name' => 'nginx_error', 'category' => 'production/web' }
]
}
end

let(:api_sources) do
{
'sources' => [
{ 'id' => 's3', 'name' => 'app_logs', 'category' => 'production/api' }
]
}
end

let(:staging_sources) do
{
'sources' => [
{ 'id' => 's4', 'name' => 'nginx_access', 'category' => 'staging/web' }
]
}
end

before do
allow(collector_client).to receive(:list).and_return(collectors)
end

describe '#list' do
it 'returns sources for a specific collector' do
allow(http_client).to receive(:request)
.with(method: :get, path: '/collectors/1/sources')
.and_return(web_sources)

result = source.list(collector_id: '1')
expect(result.size).to eq(2)
end

it 'raises Error on failure' do
allow(http_client).to receive(:request).and_raise(StandardError, 'timeout')
expect { source.list(collector_id: '1') }.to raise_error(Sumologic::Error, /Failed to list sources/)
end
end

describe '#list_all' do
before do
allow(http_client).to receive(:request)
.with(method: :get, path: '/collectors/1/sources').and_return(web_sources)
allow(http_client).to receive(:request)
.with(method: :get, path: '/collectors/2/sources').and_return(api_sources)
allow(http_client).to receive(:request)
.with(method: :get, path: '/collectors/3/sources').and_return(staging_sources)
end

it 'returns sources from all active collectors' do
result = source.list_all
expect(result.size).to eq(3)
total_sources = result.sum { |r| r['sources'].size }
expect(total_sources).to eq(4)
end

it 'skips dead collectors' do
result = source.list_all
collector_ids = result.map { |r| r['collector']['id'] }
expect(collector_ids).not_to include('4')
end

it 'filters collectors by name' do
result = source.list_all(collector: 'web')
collector_names = result.map { |r| r['collector']['name'] }
expect(collector_names).to eq(%w[prod-web-01 staging-web-01])
end

it 'filters sources by name' do
result = source.list_all(name: 'nginx')
sources = result.flat_map { |r| r['sources'] }
expect(sources.map { |s| s['id'] }).to eq(%w[s1 s2 s4])
end

it 'filters sources by category' do
result = source.list_all(category: 'staging')
expect(result.size).to eq(1)
expect(result.first['collector']['name']).to eq('staging-web-01')
end

it 'combines collector and source filters' do
result = source.list_all(collector: 'prod', name: 'nginx')
sources = result.flat_map { |r| r['sources'] }
expect(sources.map { |s| s['id'] }).to eq(%w[s1 s2])
end

it 'limits total sources across collectors' do
result = source.list_all(limit: 2)
total_sources = result.sum { |r| r['sources'].size }
expect(total_sources).to eq(2)
end

it 'applies filters before limit' do
result = source.list_all(name: 'nginx', limit: 1)
total_sources = result.sum { |r| r['sources'].size }
expect(total_sources).to eq(1)
expect(result.first['sources'].first['id']).to eq('s1')
end

it 'excludes collectors with no matching sources after filtering' do
result = source.list_all(name: 'app_logs')
expect(result.size).to eq(1)
expect(result.first['collector']['name']).to eq('prod-api-01')
end
end
end