diff --git a/README.md b/README.md index f3b386c..959bf82 100644 --- a/README.md +++ b/README.md @@ -198,3 +198,42 @@ git push --follow-tags ## Help If you have any questions or run into any problems, please create a Github issue and we'll try our best to help. + +## Configurable Batching + +The scraper now supports configurable batching to control the rate and size of data sent to Typesense. This helps prevent overwhelming the Typesense server with too many small requests. + +### Configuration Options + +You can configure batching using either **JSON config files** or **environment variables**. + +#### JSON Config File + +Add these parameters to your JSON configuration file: + +```json +{ + "index_name": "my_docs", + "batch_size": 400, + "buffer_size_limit": 100, + "flush_interval_seconds": 60, + "start_urls": [...], + "selectors": {...} +} +``` + +#### Environment Variables + +Set these environment variables to configure batching behavior: + +- `TYPESENSE_BUFFER_SIZE_LIMIT`: Maximum number of records to buffer before flushing (default: batch_size \* 2) +- `TYPESENSE_FLUSH_INTERVAL_SECONDS`: Time interval in seconds to flush buffered records (default: 60) + +### How It Works + +The scraper will flush records to Typesense when either: + +1. The buffer reaches the size limit (`buffer_size_limit`) +2. The time interval has elapsed (`flush_interval_seconds`) + +This ensures a controlled, predictable rate of data ingestion that won't overwhelm your Typesense server. diff --git a/configs/public/typesense_docs.json b/configs/public/typesense_docs.json index 9cd7479..b089783 100644 --- a/configs/public/typesense_docs.json +++ b/configs/public/typesense_docs.json @@ -20,6 +20,8 @@ "text": ".content__default p, .content__default ul li, .content__default table tbody tr" } }, + "buffer_size_limit": 5000, + "flush_interval_seconds": 1, "scrape_start_urls": false, "strip_chars": " .,;:#", "custom_settings": { diff --git a/scraper/src/config/config_loader.py b/scraper/src/config/config_loader.py index 6b154a2..71a607c 100644 --- a/scraper/src/config/config_loader.py +++ b/scraper/src/config/config_loader.py @@ -27,6 +27,8 @@ class ConfigLoader: allowed_domains = None api_key = None app_id = None + buffer_size_limit = 1000 + flush_interval_seconds = 60 custom_settings = None dns_resolver = None extra_records = [] @@ -119,6 +121,11 @@ def _parse(self): if self.index_name_tmp is None: self.index_name_tmp = os.environ.get('INDEX_NAME_TMP', f'{self.index_name}_{int(datetime.now().timestamp())}') + if self.buffer_size_limit is None: + self.buffer_size_limit = os.environ.get('TYPESENSE_BUFFER_SIZE_LIMIT', 1000) + + if self.flush_interval_seconds is None: + self.flush_interval_seconds = os.environ.get('TYPESENSE_FLUSH_INTERVAL_SECONDS', 60) # Parse config self.selectors = SelectorsParser().parse(self.selectors) diff --git a/scraper/src/config/config_validator.py b/scraper/src/config/config_validator.py index af41379..efeab8d 100644 --- a/scraper/src/config/config_validator.py +++ b/scraper/src/config/config_validator.py @@ -62,3 +62,15 @@ def validate(self): if self.config.nb_hits_max and not isinstance(self.config.nb_hits_max, int): raise Exception('nb_hits_max should be integer') + + if self.config.buffer_size_limit is not None and not isinstance(self.config.buffer_size_limit, int): + raise Exception('buffer_size_limit should be an integer') + + if self.config.buffer_size_limit is not None and not self.config.buffer_size_limit > 0: + raise Exception('buffer_size_limit should be a positive integer') + + if self.config.flush_interval_seconds is not None and not isinstance(self.config.flush_interval_seconds, int): + raise Exception('flush_interval_seconds should be an integer') + + if self.config.flush_interval_seconds is not None and not self.config.flush_interval_seconds > 0: + raise Exception('flush_interval_seconds should be a positive integer') diff --git a/scraper/src/index.py b/scraper/src/index.py index 7a48c9b..d597cb3 100644 --- a/scraper/src/index.py +++ b/scraper/src/index.py @@ -40,7 +40,9 @@ def run_config(config): typesense_helper = TypesenseHelper( config.index_name, config.index_name_tmp, - config.custom_settings + config.custom_settings, + config.buffer_size_limit, + config.flush_interval_seconds, ) typesense_helper.create_tmp_collection() diff --git a/scraper/src/tests/config_loader/buffer_size_limit_test.py b/scraper/src/tests/config_loader/buffer_size_limit_test.py new file mode 100644 index 0000000..44ec5e6 --- /dev/null +++ b/scraper/src/tests/config_loader/buffer_size_limit_test.py @@ -0,0 +1,69 @@ +# coding: utf-8 +from ...config.config_loader import ConfigLoader +from .abstract import config +import pytest + + +class TestBufferSizeLimit: + @staticmethod + def test_default_buffer_size_limit(): + """Should use default buffer_size_limit of 1000 when not specified""" + c = config({}) + + config_loaded = ConfigLoader(c) + + assert config_loaded.buffer_size_limit == 1000 + + def test_custom_buffer_size_limit(self): + """Should use custom buffer_size_limit when specified in config""" + c = config({"buffer_size_limit": 500}) + + config_loaded = ConfigLoader(c) + + assert config_loaded.buffer_size_limit == 500 + + def test_buffer_size_limit_zero(self): + """Should raise exception when buffer_size_limit is 0""" + c = config({"buffer_size_limit": 0}) + + with pytest.raises(Exception, match="buffer_size_limit should be a positive integer"): + ConfigLoader(c) + + def test_buffer_size_limit_negative(self): + """Should raise exception when buffer_size_limit is negative""" + c = config({"buffer_size_limit": -100}) + + with pytest.raises(Exception, match="buffer_size_limit should be a positive integer"): + ConfigLoader(c) + + def test_buffer_size_limit_string(self): + """Should raise exception when buffer_size_limit is a string""" + c = config({"buffer_size_limit": "500"}) + + with pytest.raises(Exception, match="buffer_size_limit should be an integer"): + ConfigLoader(c) + + def test_buffer_size_limit_float(self): + """Should raise exception when buffer_size_limit is a float""" + c = config({"buffer_size_limit": 500.5}) + + with pytest.raises(Exception, match="buffer_size_limit should be an integer"): + ConfigLoader(c) + + def test_buffer_size_limit_large_value(self): + """Should accept large buffer_size_limit values""" + + c = config({"buffer_size_limit": 10000}) + + config_loaded = ConfigLoader(c) + + assert config_loaded.buffer_size_limit == 10000 + + def test_buffer_size_limit_one(self): + """Should accept buffer_size_limit of 1""" + + c = config({"buffer_size_limit": 1}) + + config_loaded = ConfigLoader(c) + + assert config_loaded.buffer_size_limit == 1 diff --git a/scraper/src/tests/config_loader/flush_interval_seconds_test.py b/scraper/src/tests/config_loader/flush_interval_seconds_test.py new file mode 100644 index 0000000..095e627 --- /dev/null +++ b/scraper/src/tests/config_loader/flush_interval_seconds_test.py @@ -0,0 +1,69 @@ +# coding: utf-8 +from ...config.config_loader import ConfigLoader +from .abstract import config +import pytest + + +class TestFlushIntervalSeconds: + @staticmethod + def test_default_flush_interval_seconds(): + """Should use default flush_interval_seconds of 60 when not specified""" + c = config({}) + + config_loaded = ConfigLoader(c) + + assert config_loaded.flush_interval_seconds == 60 + + def test_custom_flush_interval_seconds(self): + """Should use custom flush_interval_seconds when specified in config""" + c = config({"flush_interval_seconds": 30}) + + config_loaded = ConfigLoader(c) + + assert config_loaded.flush_interval_seconds == 30 + + def test_flush_interval_seconds_zero(self): + """Should raise exception when flush_interval_seconds is 0""" + c = config({"flush_interval_seconds": 0}) + + with pytest.raises(Exception, match="flush_interval_seconds should be a positive integer"): + ConfigLoader(c) + + def test_flush_interval_seconds_negative(self): + """Should raise exception when flush_interval_seconds is negative""" + c = config({"flush_interval_seconds": -30}) + + with pytest.raises(Exception, match="flush_interval_seconds should be a positive integer"): + ConfigLoader(c) + + def test_flush_interval_seconds_string(self): + """Should raise exception when flush_interval_seconds is a string""" + c = config({"flush_interval_seconds": "30"}) + + with pytest.raises(Exception, match="flush_interval_seconds should be an integer"): + ConfigLoader(c) + + def test_flush_interval_seconds_float(self): + """Should raise exception when flush_interval_seconds is a float""" + c = config({"flush_interval_seconds": 30.5}) + + with pytest.raises(Exception, match="flush_interval_seconds should be an integer"): + ConfigLoader(c) + + def test_flush_interval_seconds_large_value(self): + """Should accept large flush_interval_seconds values""" + + c = config({"flush_interval_seconds": 3600}) + + config_loaded = ConfigLoader(c) + + assert config_loaded.flush_interval_seconds == 3600 + + def test_flush_interval_seconds_one(self): + """Should accept flush_interval_seconds of 1""" + + c = config({"flush_interval_seconds": 1}) + + config_loaded = ConfigLoader(c) + + assert config_loaded.flush_interval_seconds == 1 diff --git a/scraper/src/tests/typesense_helper/commit_tmp_test.py b/scraper/src/tests/typesense_helper/commit_tmp_test.py index f0cde86..4896283 100644 --- a/scraper/src/tests/typesense_helper/commit_tmp_test.py +++ b/scraper/src/tests/typesense_helper/commit_tmp_test.py @@ -261,7 +261,7 @@ def typesense_client(): def test_create_tmp_collection(typesense_client): """Test that a temporary collection is created with the expected schema""" # Arrange - original_helper = TypesenseHelper('test_alias', 'collection', {}) + original_helper = TypesenseHelper('test_alias', 'collection', {}, 1000, 60) original_helper.create_tmp_collection() # Act @@ -305,7 +305,7 @@ def test_create_tmp_collection_already_exists(typesense_client): 'token_separators': ['_', '-'], } ) - original_helper = TypesenseHelper('test_alias', 'collection', {}) + original_helper = TypesenseHelper('test_alias', 'collection', {}, 1000, 60) original_helper.create_tmp_collection() # Act @@ -458,7 +458,7 @@ def test_add_records(typesense_client): url = "http://example.com" from_sitemap = True - helper = TypesenseHelper('test_alias', 'collection', {}) + helper = TypesenseHelper('test_alias', 'collection', {}, 1, 60) helper.create_tmp_collection() # Call the method under test @@ -615,7 +615,7 @@ def test_commit_tmp_collection(typesense_client): url = "http://example.com" from_sitemap = True - helper = TypesenseHelper('test_alias', 'collection', {}) + helper = TypesenseHelper('test_alias', 'collection', {}, 1000, 60) helper.create_tmp_collection() original_synonyms = typesense_client.collections['collection'].synonyms.retrieve()[ @@ -630,7 +630,7 @@ def test_commit_tmp_collection(typesense_client): helper.commit_tmp_collection() # Act - tmp_collection_helper = TypesenseHelper('test_alias', 'collection_tmp', {}) + tmp_collection_helper = TypesenseHelper('test_alias', 'collection_tmp', {}, 1000, 60) tmp_collection_helper.create_tmp_collection() tmp_collection_helper.add_records(records, url, from_sitemap) tmp_collection_helper.commit_tmp_collection() @@ -728,7 +728,7 @@ def test_commit_tmp_collection_with_curation_rules(typesense_client): url = "http://example.com" from_sitemap = True - helper = TypesenseHelper('test_alias_curation', 'collection', {}) + helper = TypesenseHelper('test_alias_curation', 'collection', {}, 1000, 60) helper.create_tmp_collection() override = { @@ -757,7 +757,7 @@ def test_commit_tmp_collection_with_curation_rules(typesense_client): # Act tmp_collection_helper = TypesenseHelper( - 'test_alias_curation', 'collection_tmp_curation', {} + 'test_alias_curation', 'collection_tmp_curation', {}, 1000, 60 ) tmp_collection_helper.create_tmp_collection() tmp_collection_helper.add_records(records, url, from_sitemap) diff --git a/scraper/src/typesense_helper.py b/scraper/src/typesense_helper.py index bb35ec5..acd9530 100644 --- a/scraper/src/typesense_helper.py +++ b/scraper/src/typesense_helper.py @@ -4,6 +4,7 @@ import json import os from builtins import range +import time import typesense from typesense import exceptions @@ -12,7 +13,7 @@ class TypesenseHelper: """TypesenseHelper""" - def __init__(self, alias_name, collection_name_tmp, custom_settings): + def __init__(self, alias_name, collection_name_tmp, custom_settings, buffer_size_limit, flush_interval_seconds): self.typesense_client = typesense.Client( { 'api_key': os.environ.get('TYPESENSE_API_KEY', None), @@ -35,6 +36,15 @@ def __init__(self, alias_name, collection_name_tmp, custom_settings): self.collection_name_tmp = collection_name_tmp self.collection_locale = os.environ.get('TYPESENSE_COLLECTION_LOCALE', 'en') self.custom_settings = custom_settings + self.buffer_size_limit = buffer_size_limit + self.flush_interval_seconds = flush_interval_seconds + + print(f'\033[93m> DocSearch: Batching config - buffer_size_limit: {self.buffer_size_limit}, flush_interval_seconds: {self.flush_interval_seconds}\033[0m') + + # buffer for batching records + self.records_buffer = [] + self.buffer_stats = {"total_records": 0, "urls_processed": 0} + self.last_flush_time = time.time() def create_tmp_collection(self): """Create a temporary index to add records to""" @@ -163,33 +173,73 @@ def create_tmp_collection(self): self.typesense_client.collections.create(schema) def add_records(self, records, url, from_sitemap): - """Add new records to the temporary index""" + """Add new records to the buffer and flush based on size/time thresholds""" transformed_records = list(map(TypesenseHelper.transform_record, records)) record_count = len(transformed_records) - for i in range(0, record_count, 50): - result = self.typesense_client.collections[ - self.collection_name_tmp - ].documents.import_(transformed_records[i : i + 50]) + # Add to buffer + self.records_buffer.extend(transformed_records) + self.buffer_stats["total_records"] += record_count + self.buffer_stats["urls_processed"] += 1 - # Check for failed items directly without double-decoding - failed_items = [ - r for r in result if r.get('success') is False - ] - if len(failed_items) > 0: - print(failed_items) - raise Exception + current_time = time.time() - color = "96" if from_sitemap else "94" + flush = { + 'should_flush': False, + 'reason': '' + } + + # Check size-based flush + if len(self.records_buffer) >= self.buffer_size_limit: + flush['should_flush'] = True + flush['reason'] = f"buffer size ({len(self.records_buffer)} >= {self.buffer_size_limit})" + + # Check time-based flush + elif current_time - self.last_flush_time >= self.flush_interval_seconds and self.records_buffer: + flush['should_flush'] = True + flush['reason'] = f"time interval ({self.flush_interval_seconds}s)" + + if flush['should_flush']: + self._flush_buffer(flush.get('reason', "unknown reason")) + + color = "96" if from_sitemap else "94" print( - '\033[{}m> DocSearch: \033[0m{}\033[93m {} records\033[0m)'.format( - color, url, record_count + "\033[{}m> DocSearch: \033[0m{}\033[93m {} records (buffered: {})\033[0m".format( + color, url, record_count, len(self.records_buffer) ) ) + def _flush_buffer(self, reason): + """Send buffered records to Typesense""" + if not self.records_buffer: + return + + print( + f'\033[94m> DocSearch: Flushing {len(self.records_buffer)} records from {self.buffer_stats["urls_processed"]} URLs (Reason: {reason})\033[0m' + ) + + result = self.typesense_client.collections[ + self.collection_name_tmp + ].documents.import_(self.records_buffer) + + failed_items = [r for r in result if r.get("success") is False] + if len(failed_items) > 0: + print(failed_items) + raise Exception + + print( + f"\033[94m> DocSearch: Successfully imported {len(self.records_buffer)} records\033[0m" + ) + + self.records_buffer = [] + self.last_flush_time = time.time() + def commit_tmp_collection(self): """Update alias to point to new collection""" + if self.records_buffer: + self._flush_buffer("commit_tmp_collection") + old_collection_name = self._get_old_collection_name() if old_collection_name: