From 8a083b4855d7bece68b60a501e4eeb38886d0fa6 Mon Sep 17 00:00:00 2001 From: Jose Luis Franco Arza Date: Thu, 26 Mar 2026 14:43:10 +0100 Subject: [PATCH 1/5] feat: parse asyncConfig from replication config response The server returns asyncConfig inside replicationConfig on collection GET, but the Python client was not parsing it. Add _AsyncReplicationConfig dataclass, wire it into _collection_config_from_json, and export AsyncReplicationConfig from the public API surface. Co-Authored-By: Claude Opus 4.6 (1M context) --- test/collection/test_config.py | 140 ++++++++++++++++++ weaviate/collections/classes/config.py | 22 +++ .../collections/classes/config_methods.py | 21 +++ weaviate/outputs/config.py | 2 + 4 files changed, 185 insertions(+) diff --git a/test/collection/test_config.py b/test/collection/test_config.py index 73c3e56ae..6a926d792 100644 --- a/test/collection/test_config.py +++ b/test/collection/test_config.py @@ -4,6 +4,8 @@ from pydantic import ValidationError from weaviate.collections.classes.config import ( + _AsyncReplicationConfig, + _ReplicationConfig, _ReplicationConfigUpdate, Configure, DataType, @@ -18,6 +20,7 @@ _ReplicationConfigCreate, ReplicationDeletionStrategy, ) +from weaviate.collections.classes.config_methods import _collection_config_from_json from weaviate.collections.classes.config_named_vectors import _NamedVectorConfigCreate from weaviate.collections.classes.config_vectorizers import ( Multi2VecField, @@ -2938,6 +2941,143 @@ def test_reconfigure_with_replication(config: _ReplicationConfigUpdate, expected assert config.model_dump() == expected +def test_collection_config_from_json_replication_without_async_config() -> None: + """Test that _collection_config_from_json parses replication config without asyncConfig.""" + from test.collection.schema import multi_vector_schema + + schema = multi_vector_schema() + # Schema has: {"asyncEnabled": False, "factor": 1} — no asyncConfig + config = _collection_config_from_json(schema) + assert config.replication_config.factor == 1 + assert config.replication_config.async_enabled is False + assert ( + config.replication_config.deletion_strategy + == ReplicationDeletionStrategy.NO_AUTOMATED_RESOLUTION + ) + assert config.replication_config.async_config is None + + +def test_collection_config_from_json_replication_with_async_config() -> None: + """Test that _collection_config_from_json parses asyncConfig from replication config.""" + from test.collection.schema import multi_vector_schema + + schema = multi_vector_schema() + schema["replicationConfig"] = { + "asyncEnabled": True, + "factor": 3, + "deletionStrategy": "TimeBasedResolution", + "asyncConfig": { + "maxWorkers": 8, + "hashtreeHeight": 20, + "frequency": 60, + "frequencyWhilePropagating": 30, + "aliveNodesCheckingFrequency": 3, + "loggingFrequency": 15, + "diffBatchSize": 100, + "diffPerNodeTimeout": 10, + "prePropagationTimeout": 20, + "propagationTimeout": 300, + "propagationLimit": 1000, + "propagationDelay": 5, + "propagationConcurrency": 4, + "propagationBatchSize": 50, + }, + } + config = _collection_config_from_json(schema) + assert config.replication_config.factor == 3 + assert config.replication_config.async_enabled is True + assert ( + config.replication_config.deletion_strategy + == ReplicationDeletionStrategy.TIME_BASED_RESOLUTION + ) + assert config.replication_config.async_config is not None + ac = config.replication_config.async_config + assert ac.max_workers == 8 + assert ac.hashtree_height == 20 + assert ac.frequency == 60 + assert ac.frequency_while_propagating == 30 + assert ac.alive_nodes_checking_frequency == 3 + assert ac.logging_frequency == 15 + assert ac.diff_batch_size == 100 + assert ac.diff_per_node_timeout == 10 + assert ac.pre_propagation_timeout == 20 + assert ac.propagation_timeout == 300 + assert ac.propagation_limit == 1000 + assert ac.propagation_delay == 5 + assert ac.propagation_concurrency == 4 + assert ac.propagation_batch_size == 50 + + +def test_collection_config_from_json_replication_async_config_partial() -> None: + """Test that asyncConfig with only some fields set parses correctly (rest are None).""" + from test.collection.schema import multi_vector_schema + + schema = multi_vector_schema() + schema["replicationConfig"] = { + "asyncEnabled": True, + "factor": 3, + "asyncConfig": { + "maxWorkers": 8, + "hashtreeHeight": 20, + }, + } + config = _collection_config_from_json(schema) + ac = config.replication_config.async_config + assert ac is not None + assert ac.max_workers == 8 + assert ac.hashtree_height == 20 + assert ac.frequency is None + assert ac.frequency_while_propagating is None + assert ac.alive_nodes_checking_frequency is None + assert ac.propagation_batch_size is None + + +def test_replication_config_to_dict_with_async_config() -> None: + """Test that _ReplicationConfig.to_dict() includes asyncConfig when present.""" + config = _ReplicationConfig( + factor=3, + async_enabled=True, + deletion_strategy=ReplicationDeletionStrategy.TIME_BASED_RESOLUTION, + async_config=_AsyncReplicationConfig( + max_workers=8, + hashtree_height=20, + frequency=None, + frequency_while_propagating=None, + alive_nodes_checking_frequency=3, + logging_frequency=None, + diff_batch_size=None, + diff_per_node_timeout=None, + pre_propagation_timeout=None, + propagation_timeout=None, + propagation_limit=None, + propagation_delay=None, + propagation_concurrency=None, + propagation_batch_size=None, + ), + ) + d = config.to_dict() + assert d["factor"] == 3 + assert d["asyncEnabled"] is True + assert d["deletionStrategy"] == "TimeBasedResolution" + assert d["asyncConfig"]["maxWorkers"] == 8 + assert d["asyncConfig"]["hashtreeHeight"] == 20 + assert d["asyncConfig"]["aliveNodesCheckingFrequency"] == 3 + + +def test_replication_config_to_dict_without_async_config() -> None: + """Test that _ReplicationConfig.to_dict() omits asyncConfig when None.""" + config = _ReplicationConfig( + factor=1, + async_enabled=False, + deletion_strategy=ReplicationDeletionStrategy.NO_AUTOMATED_RESOLUTION, + async_config=None, + ) + d = config.to_dict() + assert d["factor"] == 1 + assert d["asyncEnabled"] is False + assert "asyncConfig" not in d + + def test_nested_property_with_id_name_is_allowed() -> None: """A nested property named 'id' must not raise — only top-level 'id' is reserved.""" prop = Property( diff --git a/weaviate/collections/classes/config.py b/weaviate/collections/classes/config.py index 699a3c2a8..ece2c326e 100644 --- a/weaviate/collections/classes/config.py +++ b/weaviate/collections/classes/config.py @@ -1746,11 +1746,33 @@ def to_dict(self) -> Dict[str, Any]: ReferencePropertyConfig = _ReferenceProperty +@dataclass +class _AsyncReplicationConfig(_ConfigBase): + max_workers: Optional[int] + hashtree_height: Optional[int] + frequency: Optional[int] + frequency_while_propagating: Optional[int] + alive_nodes_checking_frequency: Optional[int] + logging_frequency: Optional[int] + diff_batch_size: Optional[int] + diff_per_node_timeout: Optional[int] + pre_propagation_timeout: Optional[int] + propagation_timeout: Optional[int] + propagation_limit: Optional[int] + propagation_delay: Optional[int] + propagation_concurrency: Optional[int] + propagation_batch_size: Optional[int] + + +AsyncReplicationConfig = _AsyncReplicationConfig + + @dataclass class _ReplicationConfig(_ConfigBase): factor: int async_enabled: bool deletion_strategy: ReplicationDeletionStrategy + async_config: Optional[_AsyncReplicationConfig] ReplicationConfig = _ReplicationConfig diff --git a/weaviate/collections/classes/config_methods.py b/weaviate/collections/classes/config_methods.py index 841d1ff2c..c150394f1 100644 --- a/weaviate/collections/classes/config_methods.py +++ b/weaviate/collections/classes/config_methods.py @@ -14,6 +14,7 @@ VectorFilterStrategy, VectorIndexType, Vectorizers, + _AsyncReplicationConfig, _BM25Config, _BQConfig, _CollectionConfig, @@ -380,6 +381,26 @@ def _collection_config_from_json(schema: Dict[str, Any]) -> _CollectionConfig: if "deletionStrategy" in schema["replicationConfig"] else ReplicationDeletionStrategy.NO_AUTOMATED_RESOLUTION ), + async_config=( + _AsyncReplicationConfig( + max_workers=async_cfg.get("maxWorkers"), + hashtree_height=async_cfg.get("hashtreeHeight"), + frequency=async_cfg.get("frequency"), + frequency_while_propagating=async_cfg.get("frequencyWhilePropagating"), + alive_nodes_checking_frequency=async_cfg.get("aliveNodesCheckingFrequency"), + logging_frequency=async_cfg.get("loggingFrequency"), + diff_batch_size=async_cfg.get("diffBatchSize"), + diff_per_node_timeout=async_cfg.get("diffPerNodeTimeout"), + pre_propagation_timeout=async_cfg.get("prePropagationTimeout"), + propagation_timeout=async_cfg.get("propagationTimeout"), + propagation_limit=async_cfg.get("propagationLimit"), + propagation_delay=async_cfg.get("propagationDelay"), + propagation_concurrency=async_cfg.get("propagationConcurrency"), + propagation_batch_size=async_cfg.get("propagationBatchSize"), + ) + if (async_cfg := schema["replicationConfig"].get("asyncConfig")) + else None + ), ), reranker_config=__get_rerank_config(schema), sharding_config=( diff --git a/weaviate/outputs/config.py b/weaviate/outputs/config.py index 754f8a09e..17ebebf0e 100644 --- a/weaviate/outputs/config.py +++ b/weaviate/outputs/config.py @@ -1,4 +1,5 @@ from weaviate.collections.classes.config import ( + AsyncReplicationConfig, BM25Config, CollectionConfig, CollectionConfigSimple, @@ -30,6 +31,7 @@ ) __all__ = [ + "AsyncReplicationConfig", "BM25Config", "CollectionConfig", "CollectionConfigSimple", From d26429df0d3b2a1af7dd05e3f86001580023166c Mon Sep 17 00:00:00 2001 From: Jose Luis Franco Arza Date: Thu, 26 Mar 2026 14:47:05 +0100 Subject: [PATCH 2/5] fix: default async_config to None in _ReplicationConfig Existing callers construct _ReplicationConfig without async_config. Adding a default prevents a breaking positional argument error. Co-Authored-By: Claude Opus 4.6 (1M context) --- weaviate/collections/classes/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weaviate/collections/classes/config.py b/weaviate/collections/classes/config.py index ece2c326e..b4f8cd3b2 100644 --- a/weaviate/collections/classes/config.py +++ b/weaviate/collections/classes/config.py @@ -1772,7 +1772,7 @@ class _ReplicationConfig(_ConfigBase): factor: int async_enabled: bool deletion_strategy: ReplicationDeletionStrategy - async_config: Optional[_AsyncReplicationConfig] + async_config: Optional[_AsyncReplicationConfig] = None ReplicationConfig = _ReplicationConfig From 0b99cefd7331fd5f77b664c168b077bf11e021e5 Mon Sep 17 00:00:00 2001 From: Jose Luis Franco Arza Date: Thu, 26 Mar 2026 14:58:37 +0100 Subject: [PATCH 3/5] Address PR review feedback - Default async_config to None for backwards compatibility (copilot) - Use `is not None` instead of truthiness check for asyncConfig parsing so empty dict is not silently dropped (copilot) - Replace unit tests calling _collection_config_from_json with integration round-trip tests (dirkkul) - Add integration test for creating with async_config then removing it Co-Authored-By: Claude Opus 4.6 (1M context) --- integration/test_collection_config.py | 67 ++++++++++++++ test/collection/test_config.py | 92 ------------------- .../collections/classes/config_methods.py | 2 +- 3 files changed, 68 insertions(+), 93 deletions(-) diff --git a/integration/test_collection_config.py b/integration/test_collection_config.py index d0693ee33..cd37ae2ee 100644 --- a/integration/test_collection_config.py +++ b/integration/test_collection_config.py @@ -1574,6 +1574,73 @@ def test_replication_config( assert config.replication_config.deletion_strategy == deletion_strategy +def test_replication_config_without_async_config(collection_factory: CollectionFactory) -> None: + collection_dummy = collection_factory("dummy") + if collection_dummy._connection._weaviate_version.is_lower_than(1, 26, 0): + pytest.skip("async replication requires Weaviate >= 1.26.0") + + collection = collection_factory( + replication_config=Configure.replication(factor=1, async_enabled=False), + ) + config = collection.config.get() + assert config.replication_config.factor == 1 + assert config.replication_config.async_enabled is False + assert config.replication_config.async_config is None + + +def test_replication_config_with_async_config(collection_factory: CollectionFactory) -> None: + collection_dummy = collection_factory("dummy") + if collection_dummy._connection._weaviate_version.is_lower_than(1, 36, 0): + pytest.skip("async replication config requires Weaviate >= 1.36.0") + + collection = collection_factory( + replication_config=Configure.replication( + factor=1, + async_enabled=True, + async_config=Configure.Replication.async_config( + max_workers=8, + hashtree_height=20, + ), + ), + ) + config = collection.config.get() + assert config.replication_config.factor == 1 + assert config.replication_config.async_enabled is True + assert config.replication_config.async_config is not None + ac = config.replication_config.async_config + assert ac.max_workers == 8 + assert ac.hashtree_height == 20 + + +def test_replication_config_remove_async_config(collection_factory: CollectionFactory) -> None: + collection_dummy = collection_factory("dummy") + if collection_dummy._connection._weaviate_version.is_lower_than(1, 36, 0): + pytest.skip("async replication config requires Weaviate >= 1.36.0") + + collection = collection_factory( + replication_config=Configure.replication( + factor=1, + async_enabled=True, + async_config=Configure.Replication.async_config( + max_workers=8, + hashtree_height=20, + ), + ), + ) + config = collection.config.get() + assert config.replication_config.async_config is not None + assert config.replication_config.async_config.max_workers == 8 + + collection.config.update( + replication_config=Reconfigure.replication( + async_enabled=False, + ), + ) + config = collection.config.get() + assert config.replication_config.async_enabled is False + assert config.replication_config.async_config is None + + def test_update_property_descriptions(collection_factory: CollectionFactory) -> None: collection = collection_factory( vectorizer_config=Configure.Vectorizer.none(), diff --git a/test/collection/test_config.py b/test/collection/test_config.py index 6a926d792..4b1573e1f 100644 --- a/test/collection/test_config.py +++ b/test/collection/test_config.py @@ -20,7 +20,6 @@ _ReplicationConfigCreate, ReplicationDeletionStrategy, ) -from weaviate.collections.classes.config_methods import _collection_config_from_json from weaviate.collections.classes.config_named_vectors import _NamedVectorConfigCreate from weaviate.collections.classes.config_vectorizers import ( Multi2VecField, @@ -2941,97 +2940,6 @@ def test_reconfigure_with_replication(config: _ReplicationConfigUpdate, expected assert config.model_dump() == expected -def test_collection_config_from_json_replication_without_async_config() -> None: - """Test that _collection_config_from_json parses replication config without asyncConfig.""" - from test.collection.schema import multi_vector_schema - - schema = multi_vector_schema() - # Schema has: {"asyncEnabled": False, "factor": 1} — no asyncConfig - config = _collection_config_from_json(schema) - assert config.replication_config.factor == 1 - assert config.replication_config.async_enabled is False - assert ( - config.replication_config.deletion_strategy - == ReplicationDeletionStrategy.NO_AUTOMATED_RESOLUTION - ) - assert config.replication_config.async_config is None - - -def test_collection_config_from_json_replication_with_async_config() -> None: - """Test that _collection_config_from_json parses asyncConfig from replication config.""" - from test.collection.schema import multi_vector_schema - - schema = multi_vector_schema() - schema["replicationConfig"] = { - "asyncEnabled": True, - "factor": 3, - "deletionStrategy": "TimeBasedResolution", - "asyncConfig": { - "maxWorkers": 8, - "hashtreeHeight": 20, - "frequency": 60, - "frequencyWhilePropagating": 30, - "aliveNodesCheckingFrequency": 3, - "loggingFrequency": 15, - "diffBatchSize": 100, - "diffPerNodeTimeout": 10, - "prePropagationTimeout": 20, - "propagationTimeout": 300, - "propagationLimit": 1000, - "propagationDelay": 5, - "propagationConcurrency": 4, - "propagationBatchSize": 50, - }, - } - config = _collection_config_from_json(schema) - assert config.replication_config.factor == 3 - assert config.replication_config.async_enabled is True - assert ( - config.replication_config.deletion_strategy - == ReplicationDeletionStrategy.TIME_BASED_RESOLUTION - ) - assert config.replication_config.async_config is not None - ac = config.replication_config.async_config - assert ac.max_workers == 8 - assert ac.hashtree_height == 20 - assert ac.frequency == 60 - assert ac.frequency_while_propagating == 30 - assert ac.alive_nodes_checking_frequency == 3 - assert ac.logging_frequency == 15 - assert ac.diff_batch_size == 100 - assert ac.diff_per_node_timeout == 10 - assert ac.pre_propagation_timeout == 20 - assert ac.propagation_timeout == 300 - assert ac.propagation_limit == 1000 - assert ac.propagation_delay == 5 - assert ac.propagation_concurrency == 4 - assert ac.propagation_batch_size == 50 - - -def test_collection_config_from_json_replication_async_config_partial() -> None: - """Test that asyncConfig with only some fields set parses correctly (rest are None).""" - from test.collection.schema import multi_vector_schema - - schema = multi_vector_schema() - schema["replicationConfig"] = { - "asyncEnabled": True, - "factor": 3, - "asyncConfig": { - "maxWorkers": 8, - "hashtreeHeight": 20, - }, - } - config = _collection_config_from_json(schema) - ac = config.replication_config.async_config - assert ac is not None - assert ac.max_workers == 8 - assert ac.hashtree_height == 20 - assert ac.frequency is None - assert ac.frequency_while_propagating is None - assert ac.alive_nodes_checking_frequency is None - assert ac.propagation_batch_size is None - - def test_replication_config_to_dict_with_async_config() -> None: """Test that _ReplicationConfig.to_dict() includes asyncConfig when present.""" config = _ReplicationConfig( diff --git a/weaviate/collections/classes/config_methods.py b/weaviate/collections/classes/config_methods.py index c150394f1..16a77de09 100644 --- a/weaviate/collections/classes/config_methods.py +++ b/weaviate/collections/classes/config_methods.py @@ -398,7 +398,7 @@ def _collection_config_from_json(schema: Dict[str, Any]) -> _CollectionConfig: propagation_concurrency=async_cfg.get("propagationConcurrency"), propagation_batch_size=async_cfg.get("propagationBatchSize"), ) - if (async_cfg := schema["replicationConfig"].get("asyncConfig")) + if (async_cfg := schema["replicationConfig"].get("asyncConfig")) is not None else None ), ), From 7050230de7d979d480689dc359762b0e9a5d655f Mon Sep 17 00:00:00 2001 From: Jose Luis Franco Arza Date: Thu, 26 Mar 2026 15:49:16 +0100 Subject: [PATCH 4/5] fix: replace asyncConfig on update instead of merging Override merge_with_existing on _ReplicationConfigUpdate to: - Replace entire asyncConfig when provided (not merge), so omitted fields revert to server defaults. This follows the generative/reranker clear+replace pattern. - Pop asyncConfig from schema when asyncEnabled=False, so disabling async replication also clears the config. Add unit tests for all merge behaviors (replace, clear, preserve, reset) and integration tests for unsetting individual fields, resetting all fields via empty async_config(), and disabling async replication. Co-Authored-By: Claude Opus 4.6 (1M context) --- integration/test_collection_config.py | 119 ++++++++++++++++++-- test/collection/test_config.py | 25 ++++ test/collection/test_config_update.py | 60 +++++++++- weaviate/collections/classes/config.py | 15 +++ weaviate/collections/classes/config_base.py | 2 +- 5 files changed, 208 insertions(+), 13 deletions(-) diff --git a/integration/test_collection_config.py b/integration/test_collection_config.py index cd37ae2ee..371405a1d 100644 --- a/integration/test_collection_config.py +++ b/integration/test_collection_config.py @@ -1575,10 +1575,6 @@ def test_replication_config( def test_replication_config_without_async_config(collection_factory: CollectionFactory) -> None: - collection_dummy = collection_factory("dummy") - if collection_dummy._connection._weaviate_version.is_lower_than(1, 26, 0): - pytest.skip("async replication requires Weaviate >= 1.26.0") - collection = collection_factory( replication_config=Configure.replication(factor=1, async_enabled=False), ) @@ -1589,10 +1585,6 @@ def test_replication_config_without_async_config(collection_factory: CollectionF def test_replication_config_with_async_config(collection_factory: CollectionFactory) -> None: - collection_dummy = collection_factory("dummy") - if collection_dummy._connection._weaviate_version.is_lower_than(1, 36, 0): - pytest.skip("async replication config requires Weaviate >= 1.36.0") - collection = collection_factory( replication_config=Configure.replication( factor=1, @@ -1612,10 +1604,12 @@ def test_replication_config_with_async_config(collection_factory: CollectionFact assert ac.hashtree_height == 20 -def test_replication_config_remove_async_config(collection_factory: CollectionFactory) -> None: +def test_replication_config_remove_async_config_by_disabling_async_replication( + collection_factory: CollectionFactory, +) -> None: collection_dummy = collection_factory("dummy") - if collection_dummy._connection._weaviate_version.is_lower_than(1, 36, 0): - pytest.skip("async replication config requires Weaviate >= 1.36.0") + if collection_dummy._connection._weaviate_version.is_lower_than(1, 34, 18): + pytest.skip("async replication config requires Weaviate >= 1.34.18") collection = collection_factory( replication_config=Configure.replication( @@ -1641,6 +1635,109 @@ def test_replication_config_remove_async_config(collection_factory: CollectionFa assert config.replication_config.async_config is None +def test_replication_config_remove_async_config(collection_factory: CollectionFactory) -> None: + collection_dummy = collection_factory("dummy") + if collection_dummy._connection._weaviate_version.is_lower_than(1, 34, 18): + pytest.skip("async replication config requires Weaviate >= 1.34.18") + + collection = collection_factory( + replication_config=Configure.replication( + factor=1, + async_enabled=True, + async_config=Configure.Replication.async_config( + max_workers=8, + hashtree_height=20, + ), + ), + ) + config = collection.config.get() + assert config.replication_config.async_config is not None + assert config.replication_config.async_config.max_workers == 8 + + collection.config.update( + replication_config=Reconfigure.replication( + factor=1, async_enabled=True, async_config=Reconfigure.Replication.async_config() + ), + ) + config = collection.config.get() + assert config.replication_config.async_enabled is True + assert config.replication_config.async_config is None + assert config.replication_config.factor == 1 + + +def test_replication_config_unset_single_async_field( + collection_factory: CollectionFactory, +) -> None: + collection_dummy = collection_factory("dummy") + if collection_dummy._connection._weaviate_version.is_lower_than(1, 36, 0): + pytest.skip("async replication config requires Weaviate >= 1.36.0") + + collection = collection_factory( + replication_config=Configure.replication( + factor=1, + async_enabled=True, + async_config=Configure.Replication.async_config( + max_workers=8, + hashtree_height=20, + ), + ), + ) + config = collection.config.get() + ac = config.replication_config.async_config + assert ac is not None + assert ac.max_workers == 8 + assert ac.hashtree_height == 20 + + # Update with only max_workers — hashtree_height reverts to server default + collection.config.update( + replication_config=Reconfigure.replication( + async_config=Reconfigure.Replication.async_config( + max_workers=8, + ), + ), + ) + config = collection.config.get() + ac = config.replication_config.async_config + assert ac is not None + assert ac.max_workers == 8 + assert ac.hashtree_height != 20 + + +def test_replication_config_add_async_config_to_existing_collection( + collection_factory: CollectionFactory, +) -> None: + """Test updating a collection that was created without async_config to add one. + + This covers the case where the existing schema has no asyncConfig key + and merge_with_existing must handle the missing field gracefully. + """ + collection_dummy = collection_factory("dummy") + if collection_dummy._connection._weaviate_version.is_lower_than(1, 34, 18): + pytest.skip("async replication config requires Weaviate >= 1.34.18") + + # Create without async_config + collection = collection_factory( + replication_config=Configure.replication(factor=1, async_enabled=True), + ) + config = collection.config.get() + assert config.replication_config.async_config is None + + # Update to add async_config + collection.config.update( + replication_config=Reconfigure.replication( + async_config=Reconfigure.Replication.async_config( + max_workers=12, + propagation_concurrency=4, + ), + ), + ) + config = collection.config.get() + assert config.replication_config.async_config is not None + ac = config.replication_config.async_config + assert ac.max_workers == 12 + assert ac.propagation_concurrency == 4 + + def test_update_property_descriptions(collection_factory: CollectionFactory) -> None: collection = collection_factory( vectorizer_config=Configure.Vectorizer.none(), diff --git a/test/collection/test_config.py b/test/collection/test_config.py index 4b1573e1f..523ddc980 100644 --- a/test/collection/test_config.py +++ b/test/collection/test_config.py @@ -2986,6 +2986,31 @@ def test_replication_config_to_dict_without_async_config() -> None: assert "asyncConfig" not in d +def test_replication_config_update_merge_with_missing_async_config() -> None: + """Test that merge_with_existing handles a schema without asyncConfig. + + When a collection was created without async replication config and we + update it to add one, the existing schema won't have the asyncConfig key. + merge_with_existing must not raise KeyError in this case. + """ + update = Reconfigure.replication( + async_config=Reconfigure.Replication.async_config( + max_workers=12, + propagation_concurrency=4, + ), + ) + # Simulate an existing schema that has no asyncConfig key + existing_schema = { + "factor": 3, + "asyncEnabled": True, + "deletionStrategy": "NoAutomatedResolution", + } + result = update.merge_with_existing(existing_schema) + assert result["asyncConfig"]["maxWorkers"] == 12 + assert result["asyncConfig"]["propagationConcurrency"] == 4 + assert result["factor"] == 3 + + def test_nested_property_with_id_name_is_allowed() -> None: """A nested property named 'id' must not raise — only top-level 'id' is reserved.""" prop = Property( diff --git a/test/collection/test_config_update.py b/test/collection/test_config_update.py index 8875f3368..680337291 100644 --- a/test/collection/test_config_update.py +++ b/test/collection/test_config_update.py @@ -1,7 +1,10 @@ import pytest from test.collection.schema import multi_vector_schema -from weaviate.collections.classes.config import Reconfigure, _CollectionConfigUpdate +from weaviate.collections.classes.config import ( + Reconfigure, + _CollectionConfigUpdate, +) from weaviate.exceptions import WeaviateInvalidInputError @@ -102,3 +105,58 @@ def test_enabling_sq_multi_vector(schema: dict, should_error: bool) -> None: assert new_schema["vectorConfig"]["boi"]["vectorIndexConfig"]["sq"]["enabled"] assert new_schema["vectorConfig"]["yeh"] == schema["vectorConfig"]["yeh"] + + +def test_replication_async_config_replace_on_update() -> None: + """Test asyncConfig is replaced (not merged) when provided in an update.""" + schema = { + "factor": 1, + "asyncEnabled": True, + "asyncConfig": {"maxWorkers": 8, "hashtreeHeight": 20}, + } + update = Reconfigure.replication( + async_config=Reconfigure.Replication.async_config(max_workers=16), + ) + result = update.merge_with_existing(schema) + assert result["asyncConfig"] == {"maxWorkers": 16} + assert "hashtreeHeight" not in result["asyncConfig"] + + +def test_replication_async_config_cleared_when_async_disabled() -> None: + """Test asyncConfig is removed from schema when asyncEnabled is set to False.""" + schema = { + "factor": 1, + "asyncEnabled": True, + "asyncConfig": {"maxWorkers": 8, "hashtreeHeight": 20}, + } + update = Reconfigure.replication(async_enabled=False) + result = update.merge_with_existing(schema) + assert result["asyncEnabled"] is False + assert "asyncConfig" not in result + + +def test_replication_async_config_preserved_when_not_provided() -> None: + """Test asyncConfig is preserved when not provided in update.""" + schema = { + "factor": 1, + "asyncEnabled": True, + "asyncConfig": {"maxWorkers": 8, "hashtreeHeight": 20}, + } + update = Reconfigure.replication(factor=2) + result = update.merge_with_existing(schema) + assert result["factor"] == 2 + assert result["asyncConfig"] == {"maxWorkers": 8, "hashtreeHeight": 20} + + +def test_replication_async_config_reset_all_fields() -> None: + """Passing empty async_config should replace with empty dict (server uses defaults).""" + schema = { + "factor": 1, + "asyncEnabled": True, + "asyncConfig": {"maxWorkers": 8, "hashtreeHeight": 20}, + } + update = Reconfigure.replication( + async_config=Reconfigure.Replication.async_config(), + ) + result = update.merge_with_existing(schema) + assert result["asyncConfig"] == {} diff --git a/weaviate/collections/classes/config.py b/weaviate/collections/classes/config.py index b4f8cd3b2..2a8b8d600 100644 --- a/weaviate/collections/classes/config.py +++ b/weaviate/collections/classes/config.py @@ -335,6 +335,21 @@ class _ReplicationConfigUpdate(_ConfigUpdateModel): asyncConfig: Optional[_AsyncReplicationConfigUpdate] deletionStrategy: Optional[ReplicationDeletionStrategy] + def merge_with_existing(self, schema: Dict[str, Any]) -> Dict[str, Any]: + if self.factor is not None: + schema["factor"] = self.factor + if self.asyncEnabled is not None: + schema["asyncEnabled"] = self.asyncEnabled + if not self.asyncEnabled: + schema.pop("asyncConfig", None) + if self.deletionStrategy is not None: + schema["deletionStrategy"] = str(self.deletionStrategy.value) + if self.asyncConfig is not None: + # Replace entire asyncConfig (like generative/reranker pattern) + # rather than merging, so omitted fields revert to server defaults + schema["asyncConfig"] = self.asyncConfig.model_dump(exclude_none=True) + return schema + class _BM25ConfigCreate(_ConfigCreateModel): b: float diff --git a/weaviate/collections/classes/config_base.py b/weaviate/collections/classes/config_base.py index 98099e8b1..fc696fdfb 100644 --- a/weaviate/collections/classes/config_base.py +++ b/weaviate/collections/classes/config_base.py @@ -42,7 +42,7 @@ def merge_with_existing(self, schema: Dict[str, Any]) -> Dict[str, Any]: ) schema[quantizer]["enabled"] = False elif isinstance(val, _ConfigUpdateModel): - schema[cls_field] = val.merge_with_existing(schema[cls_field]) + schema[cls_field] = val.merge_with_existing(schema.get(cls_field, {})) else: pass # ignore unknown types so that individual classes can be extended return schema From 53c968774dd371c29d792fa34ea93769db5ec9fa Mon Sep 17 00:00:00 2001 From: Jose Luis Franco Arza Date: Thu, 26 Mar 2026 18:15:28 +0100 Subject: [PATCH 5/5] fix: use truthiness check for asyncConfig parsing Revert to truthiness check when parsing asyncConfig from server response. An empty dict {} means no custom config and should be treated as absent (None), not as an _AsyncReplicationConfig with all None fields. Co-Authored-By: Claude Opus 4.6 (1M context) --- weaviate/collections/classes/config_methods.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weaviate/collections/classes/config_methods.py b/weaviate/collections/classes/config_methods.py index 16a77de09..c150394f1 100644 --- a/weaviate/collections/classes/config_methods.py +++ b/weaviate/collections/classes/config_methods.py @@ -398,7 +398,7 @@ def _collection_config_from_json(schema: Dict[str, Any]) -> _CollectionConfig: propagation_concurrency=async_cfg.get("propagationConcurrency"), propagation_batch_size=async_cfg.get("propagationBatchSize"), ) - if (async_cfg := schema["replicationConfig"].get("asyncConfig")) is not None + if (async_cfg := schema["replicationConfig"].get("asyncConfig")) else None ), ),