diff --git a/integration/test_collection_config.py b/integration/test_collection_config.py index d0693ee33..371405a1d 100644 --- a/integration/test_collection_config.py +++ b/integration/test_collection_config.py @@ -1574,6 +1574,170 @@ def test_replication_config( assert config.replication_config.deletion_strategy == deletion_strategy +def test_replication_config_without_async_config(collection_factory: CollectionFactory) -> None: + collection = collection_factory( + replication_config=Configure.replication(factor=1, async_enabled=False), + ) + config = collection.config.get() + assert config.replication_config.factor == 1 + assert config.replication_config.async_enabled is False + assert config.replication_config.async_config is None + + +def test_replication_config_with_async_config(collection_factory: CollectionFactory) -> None: + collection = collection_factory( + replication_config=Configure.replication( + factor=1, + async_enabled=True, + async_config=Configure.Replication.async_config( + max_workers=8, + hashtree_height=20, + ), + ), + ) + config = collection.config.get() + assert config.replication_config.factor == 1 + assert config.replication_config.async_enabled is True + assert config.replication_config.async_config is not None + ac = config.replication_config.async_config + assert ac.max_workers == 8 + assert ac.hashtree_height == 20 + + +def test_replication_config_remove_async_config_by_disabling_async_replication( + collection_factory: CollectionFactory, +) -> None: + collection_dummy = collection_factory("dummy") + if collection_dummy._connection._weaviate_version.is_lower_than(1, 34, 18): + pytest.skip("async replication config requires Weaviate >= 1.34.18") + + collection = collection_factory( + replication_config=Configure.replication( + factor=1, + async_enabled=True, + async_config=Configure.Replication.async_config( + max_workers=8, + hashtree_height=20, + ), + ), + ) + config = collection.config.get() + assert config.replication_config.async_config is not None + assert config.replication_config.async_config.max_workers == 8 + + collection.config.update( + replication_config=Reconfigure.replication( + async_enabled=False, + ), + ) + config = collection.config.get() + assert config.replication_config.async_enabled is False + assert config.replication_config.async_config is None + + +def test_replication_config_remove_async_config(collection_factory: CollectionFactory) -> None: + collection_dummy = collection_factory("dummy") + if collection_dummy._connection._weaviate_version.is_lower_than(1, 34, 18): + pytest.skip("async replication config requires Weaviate >= 1.34.18") + + collection = collection_factory( + replication_config=Configure.replication( + factor=1, + async_enabled=True, + async_config=Configure.Replication.async_config( + max_workers=8, + hashtree_height=20, + ), + ), + ) + config = collection.config.get() + assert config.replication_config.async_config is not None + assert config.replication_config.async_config.max_workers == 8 + + collection.config.update( + replication_config=Reconfigure.replication( + factor=1, async_enabled=True, async_config=Reconfigure.Replication.async_config() + ), + ) + config = collection.config.get() + assert config.replication_config.async_enabled is True + assert config.replication_config.async_config is None + assert config.replication_config.factor == 1 + + +def test_replication_config_unset_single_async_field( + collection_factory: CollectionFactory, +) -> None: + collection_dummy = collection_factory("dummy") + if collection_dummy._connection._weaviate_version.is_lower_than(1, 36, 0): + pytest.skip("async replication config requires Weaviate >= 1.36.0") + + collection = collection_factory( + replication_config=Configure.replication( + factor=1, + async_enabled=True, + async_config=Configure.Replication.async_config( + max_workers=8, + hashtree_height=20, + ), + ), + ) + config = collection.config.get() + ac = config.replication_config.async_config + assert ac is not None + assert ac.max_workers == 8 + assert ac.hashtree_height == 20 + + # Update with only max_workers — hashtree_height reverts to server default + collection.config.update( + replication_config=Reconfigure.replication( + async_config=Reconfigure.Replication.async_config( + max_workers=8, + ), + ), + ) + config = collection.config.get() + ac = config.replication_config.async_config + assert ac is not None + assert ac.max_workers == 8 + assert ac.hashtree_height != 20 + + +def test_replication_config_add_async_config_to_existing_collection( + collection_factory: CollectionFactory, +) -> None: + """Test updating a collection that was created without async_config to add one. + + This covers the case where the existing schema has no asyncConfig key + and merge_with_existing must handle the missing field gracefully. + """ + collection_dummy = collection_factory("dummy") + if collection_dummy._connection._weaviate_version.is_lower_than(1, 34, 18): + pytest.skip("async replication config requires Weaviate >= 1.34.18") + + # Create without async_config + collection = collection_factory( + replication_config=Configure.replication(factor=1, async_enabled=True), + ) + config = collection.config.get() + assert config.replication_config.async_config is None + + # Update to add async_config + collection.config.update( + replication_config=Reconfigure.replication( + async_config=Reconfigure.Replication.async_config( + max_workers=12, + propagation_concurrency=4, + ), + ), + ) + config = collection.config.get() + assert config.replication_config.async_config is not None + ac = config.replication_config.async_config + assert ac.max_workers == 12 + assert ac.propagation_concurrency == 4 + + def test_update_property_descriptions(collection_factory: CollectionFactory) -> None: collection = collection_factory( vectorizer_config=Configure.Vectorizer.none(), diff --git a/test/collection/test_config.py b/test/collection/test_config.py index 73c3e56ae..523ddc980 100644 --- a/test/collection/test_config.py +++ b/test/collection/test_config.py @@ -4,6 +4,8 @@ from pydantic import ValidationError from weaviate.collections.classes.config import ( + _AsyncReplicationConfig, + _ReplicationConfig, _ReplicationConfigUpdate, Configure, DataType, @@ -2938,6 +2940,77 @@ def test_reconfigure_with_replication(config: _ReplicationConfigUpdate, expected assert config.model_dump() == expected +def test_replication_config_to_dict_with_async_config() -> None: + """Test that _ReplicationConfig.to_dict() includes asyncConfig when present.""" + config = _ReplicationConfig( + factor=3, + async_enabled=True, + deletion_strategy=ReplicationDeletionStrategy.TIME_BASED_RESOLUTION, + async_config=_AsyncReplicationConfig( + max_workers=8, + hashtree_height=20, + frequency=None, + frequency_while_propagating=None, + alive_nodes_checking_frequency=3, + logging_frequency=None, + diff_batch_size=None, + diff_per_node_timeout=None, + pre_propagation_timeout=None, + propagation_timeout=None, + propagation_limit=None, + propagation_delay=None, + propagation_concurrency=None, + propagation_batch_size=None, + ), + ) + d = config.to_dict() + assert d["factor"] == 3 + assert d["asyncEnabled"] is True + assert d["deletionStrategy"] == "TimeBasedResolution" + assert d["asyncConfig"]["maxWorkers"] == 8 + assert d["asyncConfig"]["hashtreeHeight"] == 20 + assert d["asyncConfig"]["aliveNodesCheckingFrequency"] == 3 + + +def test_replication_config_to_dict_without_async_config() -> None: + """Test that _ReplicationConfig.to_dict() omits asyncConfig when None.""" + config = _ReplicationConfig( + factor=1, + async_enabled=False, + deletion_strategy=ReplicationDeletionStrategy.NO_AUTOMATED_RESOLUTION, + async_config=None, + ) + d = config.to_dict() + assert d["factor"] == 1 + assert d["asyncEnabled"] is False + assert "asyncConfig" not in d + + +def test_replication_config_update_merge_with_missing_async_config() -> None: + """Test that merge_with_existing handles a schema without asyncConfig. + + When a collection was created without async replication config and we + update it to add one, the existing schema won't have the asyncConfig key. + merge_with_existing must not raise KeyError in this case. + """ + update = Reconfigure.replication( + async_config=Reconfigure.Replication.async_config( + max_workers=12, + propagation_concurrency=4, + ), + ) + # Simulate an existing schema that has no asyncConfig key + existing_schema = { + "factor": 3, + "asyncEnabled": True, + "deletionStrategy": "NoAutomatedResolution", + } + result = update.merge_with_existing(existing_schema) + assert result["asyncConfig"]["maxWorkers"] == 12 + assert result["asyncConfig"]["propagationConcurrency"] == 4 + assert result["factor"] == 3 + + def test_nested_property_with_id_name_is_allowed() -> None: """A nested property named 'id' must not raise — only top-level 'id' is reserved.""" prop = Property( diff --git a/test/collection/test_config_update.py b/test/collection/test_config_update.py index 8875f3368..680337291 100644 --- a/test/collection/test_config_update.py +++ b/test/collection/test_config_update.py @@ -1,7 +1,10 @@ import pytest from test.collection.schema import multi_vector_schema -from weaviate.collections.classes.config import Reconfigure, _CollectionConfigUpdate +from weaviate.collections.classes.config import ( + Reconfigure, + _CollectionConfigUpdate, +) from weaviate.exceptions import WeaviateInvalidInputError @@ -102,3 +105,58 @@ def test_enabling_sq_multi_vector(schema: dict, should_error: bool) -> None: assert new_schema["vectorConfig"]["boi"]["vectorIndexConfig"]["sq"]["enabled"] assert new_schema["vectorConfig"]["yeh"] == schema["vectorConfig"]["yeh"] + + +def test_replication_async_config_replace_on_update() -> None: + """Test asyncConfig is replaced (not merged) when provided in an update.""" + schema = { + "factor": 1, + "asyncEnabled": True, + "asyncConfig": {"maxWorkers": 8, "hashtreeHeight": 20}, + } + update = Reconfigure.replication( + async_config=Reconfigure.Replication.async_config(max_workers=16), + ) + result = update.merge_with_existing(schema) + assert result["asyncConfig"] == {"maxWorkers": 16} + assert "hashtreeHeight" not in result["asyncConfig"] + + +def test_replication_async_config_cleared_when_async_disabled() -> None: + """Test asyncConfig is removed from schema when asyncEnabled is set to False.""" + schema = { + "factor": 1, + "asyncEnabled": True, + "asyncConfig": {"maxWorkers": 8, "hashtreeHeight": 20}, + } + update = Reconfigure.replication(async_enabled=False) + result = update.merge_with_existing(schema) + assert result["asyncEnabled"] is False + assert "asyncConfig" not in result + + +def test_replication_async_config_preserved_when_not_provided() -> None: + """Test asyncConfig is preserved when not provided in update.""" + schema = { + "factor": 1, + "asyncEnabled": True, + "asyncConfig": {"maxWorkers": 8, "hashtreeHeight": 20}, + } + update = Reconfigure.replication(factor=2) + result = update.merge_with_existing(schema) + assert result["factor"] == 2 + assert result["asyncConfig"] == {"maxWorkers": 8, "hashtreeHeight": 20} + + +def test_replication_async_config_reset_all_fields() -> None: + """Passing empty async_config should replace with empty dict (server uses defaults).""" + schema = { + "factor": 1, + "asyncEnabled": True, + "asyncConfig": {"maxWorkers": 8, "hashtreeHeight": 20}, + } + update = Reconfigure.replication( + async_config=Reconfigure.Replication.async_config(), + ) + result = update.merge_with_existing(schema) + assert result["asyncConfig"] == {} diff --git a/weaviate/collections/classes/config.py b/weaviate/collections/classes/config.py index 699a3c2a8..2a8b8d600 100644 --- a/weaviate/collections/classes/config.py +++ b/weaviate/collections/classes/config.py @@ -335,6 +335,21 @@ class _ReplicationConfigUpdate(_ConfigUpdateModel): asyncConfig: Optional[_AsyncReplicationConfigUpdate] deletionStrategy: Optional[ReplicationDeletionStrategy] + def merge_with_existing(self, schema: Dict[str, Any]) -> Dict[str, Any]: + if self.factor is not None: + schema["factor"] = self.factor + if self.asyncEnabled is not None: + schema["asyncEnabled"] = self.asyncEnabled + if not self.asyncEnabled: + schema.pop("asyncConfig", None) + if self.deletionStrategy is not None: + schema["deletionStrategy"] = str(self.deletionStrategy.value) + if self.asyncConfig is not None: + # Replace entire asyncConfig (like generative/reranker pattern) + # rather than merging, so omitted fields revert to server defaults + schema["asyncConfig"] = self.asyncConfig.model_dump(exclude_none=True) + return schema + class _BM25ConfigCreate(_ConfigCreateModel): b: float @@ -1746,11 +1761,33 @@ def to_dict(self) -> Dict[str, Any]: ReferencePropertyConfig = _ReferenceProperty +@dataclass +class _AsyncReplicationConfig(_ConfigBase): + max_workers: Optional[int] + hashtree_height: Optional[int] + frequency: Optional[int] + frequency_while_propagating: Optional[int] + alive_nodes_checking_frequency: Optional[int] + logging_frequency: Optional[int] + diff_batch_size: Optional[int] + diff_per_node_timeout: Optional[int] + pre_propagation_timeout: Optional[int] + propagation_timeout: Optional[int] + propagation_limit: Optional[int] + propagation_delay: Optional[int] + propagation_concurrency: Optional[int] + propagation_batch_size: Optional[int] + + +AsyncReplicationConfig = _AsyncReplicationConfig + + @dataclass class _ReplicationConfig(_ConfigBase): factor: int async_enabled: bool deletion_strategy: ReplicationDeletionStrategy + async_config: Optional[_AsyncReplicationConfig] = None ReplicationConfig = _ReplicationConfig diff --git a/weaviate/collections/classes/config_base.py b/weaviate/collections/classes/config_base.py index 98099e8b1..fc696fdfb 100644 --- a/weaviate/collections/classes/config_base.py +++ b/weaviate/collections/classes/config_base.py @@ -42,7 +42,7 @@ def merge_with_existing(self, schema: Dict[str, Any]) -> Dict[str, Any]: ) schema[quantizer]["enabled"] = False elif isinstance(val, _ConfigUpdateModel): - schema[cls_field] = val.merge_with_existing(schema[cls_field]) + schema[cls_field] = val.merge_with_existing(schema.get(cls_field, {})) else: pass # ignore unknown types so that individual classes can be extended return schema diff --git a/weaviate/collections/classes/config_methods.py b/weaviate/collections/classes/config_methods.py index 841d1ff2c..c150394f1 100644 --- a/weaviate/collections/classes/config_methods.py +++ b/weaviate/collections/classes/config_methods.py @@ -14,6 +14,7 @@ VectorFilterStrategy, VectorIndexType, Vectorizers, + _AsyncReplicationConfig, _BM25Config, _BQConfig, _CollectionConfig, @@ -380,6 +381,26 @@ def _collection_config_from_json(schema: Dict[str, Any]) -> _CollectionConfig: if "deletionStrategy" in schema["replicationConfig"] else ReplicationDeletionStrategy.NO_AUTOMATED_RESOLUTION ), + async_config=( + _AsyncReplicationConfig( + max_workers=async_cfg.get("maxWorkers"), + hashtree_height=async_cfg.get("hashtreeHeight"), + frequency=async_cfg.get("frequency"), + frequency_while_propagating=async_cfg.get("frequencyWhilePropagating"), + alive_nodes_checking_frequency=async_cfg.get("aliveNodesCheckingFrequency"), + logging_frequency=async_cfg.get("loggingFrequency"), + diff_batch_size=async_cfg.get("diffBatchSize"), + diff_per_node_timeout=async_cfg.get("diffPerNodeTimeout"), + pre_propagation_timeout=async_cfg.get("prePropagationTimeout"), + propagation_timeout=async_cfg.get("propagationTimeout"), + propagation_limit=async_cfg.get("propagationLimit"), + propagation_delay=async_cfg.get("propagationDelay"), + propagation_concurrency=async_cfg.get("propagationConcurrency"), + propagation_batch_size=async_cfg.get("propagationBatchSize"), + ) + if (async_cfg := schema["replicationConfig"].get("asyncConfig")) + else None + ), ), reranker_config=__get_rerank_config(schema), sharding_config=( diff --git a/weaviate/outputs/config.py b/weaviate/outputs/config.py index 754f8a09e..17ebebf0e 100644 --- a/weaviate/outputs/config.py +++ b/weaviate/outputs/config.py @@ -1,4 +1,5 @@ from weaviate.collections.classes.config import ( + AsyncReplicationConfig, BM25Config, CollectionConfig, CollectionConfigSimple, @@ -30,6 +31,7 @@ ) __all__ = [ + "AsyncReplicationConfig", "BM25Config", "CollectionConfig", "CollectionConfigSimple",