Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 164 additions & 0 deletions integration/test_collection_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1574,6 +1574,170 @@ def test_replication_config(
assert config.replication_config.deletion_strategy == deletion_strategy


def test_replication_config_without_async_config(collection_factory: CollectionFactory) -> None:
collection = collection_factory(
replication_config=Configure.replication(factor=1, async_enabled=False),
)
config = collection.config.get()
assert config.replication_config.factor == 1
assert config.replication_config.async_enabled is False
assert config.replication_config.async_config is None


def test_replication_config_with_async_config(collection_factory: CollectionFactory) -> None:
collection = collection_factory(
replication_config=Configure.replication(
factor=1,
async_enabled=True,
async_config=Configure.Replication.async_config(
max_workers=8,
hashtree_height=20,
),
),
)
config = collection.config.get()
assert config.replication_config.factor == 1
assert config.replication_config.async_enabled is True
assert config.replication_config.async_config is not None
ac = config.replication_config.async_config
assert ac.max_workers == 8
assert ac.hashtree_height == 20


def test_replication_config_remove_async_config_by_disabling_async_replication(
collection_factory: CollectionFactory,
) -> None:
collection_dummy = collection_factory("dummy")
if collection_dummy._connection._weaviate_version.is_lower_than(1, 34, 18):
pytest.skip("async replication config requires Weaviate >= 1.34.18")

collection = collection_factory(
replication_config=Configure.replication(
factor=1,
async_enabled=True,
async_config=Configure.Replication.async_config(
max_workers=8,
hashtree_height=20,
),
),
)
config = collection.config.get()
assert config.replication_config.async_config is not None
assert config.replication_config.async_config.max_workers == 8

collection.config.update(
replication_config=Reconfigure.replication(
async_enabled=False,
),
)
config = collection.config.get()
assert config.replication_config.async_enabled is False
assert config.replication_config.async_config is None


def test_replication_config_remove_async_config(collection_factory: CollectionFactory) -> None:
collection_dummy = collection_factory("dummy")
if collection_dummy._connection._weaviate_version.is_lower_than(1, 34, 18):
pytest.skip("async replication config requires Weaviate >= 1.34.18")

collection = collection_factory(
replication_config=Configure.replication(
factor=1,
async_enabled=True,
async_config=Configure.Replication.async_config(
max_workers=8,
hashtree_height=20,
),
),
)
config = collection.config.get()
assert config.replication_config.async_config is not None
assert config.replication_config.async_config.max_workers == 8

collection.config.update(
replication_config=Reconfigure.replication(
factor=1, async_enabled=True, async_config=Reconfigure.Replication.async_config()
),
)
config = collection.config.get()
assert config.replication_config.async_enabled is True
assert config.replication_config.async_config is None
assert config.replication_config.factor == 1


def test_replication_config_unset_single_async_field(
collection_factory: CollectionFactory,
) -> None:
collection_dummy = collection_factory("dummy")
if collection_dummy._connection._weaviate_version.is_lower_than(1, 36, 0):
pytest.skip("async replication config requires Weaviate >= 1.36.0")

collection = collection_factory(
replication_config=Configure.replication(
factor=1,
async_enabled=True,
async_config=Configure.Replication.async_config(
max_workers=8,
hashtree_height=20,
),
),
)
config = collection.config.get()
ac = config.replication_config.async_config
assert ac is not None
assert ac.max_workers == 8
assert ac.hashtree_height == 20

# Update with only max_workers — hashtree_height reverts to server default
collection.config.update(
replication_config=Reconfigure.replication(
async_config=Reconfigure.Replication.async_config(
max_workers=8,
),
),
)
config = collection.config.get()
ac = config.replication_config.async_config
assert ac is not None
assert ac.max_workers == 8
assert ac.hashtree_height != 20


def test_replication_config_add_async_config_to_existing_collection(
collection_factory: CollectionFactory,
) -> None:
"""Test updating a collection that was created without async_config to add one.

This covers the case where the existing schema has no asyncConfig key
and merge_with_existing must handle the missing field gracefully.
"""
collection_dummy = collection_factory("dummy")
if collection_dummy._connection._weaviate_version.is_lower_than(1, 34, 18):
pytest.skip("async replication config requires Weaviate >= 1.34.18")

# Create without async_config
collection = collection_factory(
replication_config=Configure.replication(factor=1, async_enabled=True),
)
config = collection.config.get()
assert config.replication_config.async_config is None

# Update to add async_config
collection.config.update(
replication_config=Reconfigure.replication(
async_config=Reconfigure.Replication.async_config(
max_workers=12,
propagation_concurrency=4,
),
),
)
config = collection.config.get()
assert config.replication_config.async_config is not None
ac = config.replication_config.async_config
assert ac.max_workers == 12
assert ac.propagation_concurrency == 4


def test_update_property_descriptions(collection_factory: CollectionFactory) -> None:
collection = collection_factory(
vectorizer_config=Configure.Vectorizer.none(),
Expand Down
73 changes: 73 additions & 0 deletions test/collection/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from pydantic import ValidationError

from weaviate.collections.classes.config import (
_AsyncReplicationConfig,
_ReplicationConfig,
_ReplicationConfigUpdate,
Configure,
DataType,
Expand Down Expand Up @@ -2938,6 +2940,77 @@ def test_reconfigure_with_replication(config: _ReplicationConfigUpdate, expected
assert config.model_dump() == expected


def test_replication_config_to_dict_with_async_config() -> None:
"""Test that _ReplicationConfig.to_dict() includes asyncConfig when present."""
config = _ReplicationConfig(
factor=3,
async_enabled=True,
deletion_strategy=ReplicationDeletionStrategy.TIME_BASED_RESOLUTION,
async_config=_AsyncReplicationConfig(
max_workers=8,
hashtree_height=20,
frequency=None,
frequency_while_propagating=None,
alive_nodes_checking_frequency=3,
logging_frequency=None,
diff_batch_size=None,
diff_per_node_timeout=None,
pre_propagation_timeout=None,
propagation_timeout=None,
propagation_limit=None,
propagation_delay=None,
propagation_concurrency=None,
propagation_batch_size=None,
),
)
d = config.to_dict()
assert d["factor"] == 3
assert d["asyncEnabled"] is True
assert d["deletionStrategy"] == "TimeBasedResolution"
assert d["asyncConfig"]["maxWorkers"] == 8
assert d["asyncConfig"]["hashtreeHeight"] == 20
assert d["asyncConfig"]["aliveNodesCheckingFrequency"] == 3


def test_replication_config_to_dict_without_async_config() -> None:
"""Test that _ReplicationConfig.to_dict() omits asyncConfig when None."""
config = _ReplicationConfig(
factor=1,
async_enabled=False,
deletion_strategy=ReplicationDeletionStrategy.NO_AUTOMATED_RESOLUTION,
async_config=None,
)
d = config.to_dict()
assert d["factor"] == 1
assert d["asyncEnabled"] is False
assert "asyncConfig" not in d


def test_replication_config_update_merge_with_missing_async_config() -> None:
"""Test that merge_with_existing handles a schema without asyncConfig.

When a collection was created without async replication config and we
update it to add one, the existing schema won't have the asyncConfig key.
merge_with_existing must not raise KeyError in this case.
"""
update = Reconfigure.replication(
async_config=Reconfigure.Replication.async_config(
max_workers=12,
propagation_concurrency=4,
),
)
# Simulate an existing schema that has no asyncConfig key
existing_schema = {
"factor": 3,
"asyncEnabled": True,
"deletionStrategy": "NoAutomatedResolution",
}
result = update.merge_with_existing(existing_schema)
assert result["asyncConfig"]["maxWorkers"] == 12
assert result["asyncConfig"]["propagationConcurrency"] == 4
assert result["factor"] == 3


def test_nested_property_with_id_name_is_allowed() -> None:
"""A nested property named 'id' must not raise — only top-level 'id' is reserved."""
prop = Property(
Expand Down
60 changes: 59 additions & 1 deletion test/collection/test_config_update.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import pytest

from test.collection.schema import multi_vector_schema
from weaviate.collections.classes.config import Reconfigure, _CollectionConfigUpdate
from weaviate.collections.classes.config import (
Reconfigure,
_CollectionConfigUpdate,
)
from weaviate.exceptions import WeaviateInvalidInputError


Expand Down Expand Up @@ -102,3 +105,58 @@ def test_enabling_sq_multi_vector(schema: dict, should_error: bool) -> None:
assert new_schema["vectorConfig"]["boi"]["vectorIndexConfig"]["sq"]["enabled"]

assert new_schema["vectorConfig"]["yeh"] == schema["vectorConfig"]["yeh"]


def test_replication_async_config_replace_on_update() -> None:
"""Test asyncConfig is replaced (not merged) when provided in an update."""
schema = {
"factor": 1,
"asyncEnabled": True,
"asyncConfig": {"maxWorkers": 8, "hashtreeHeight": 20},
}
update = Reconfigure.replication(
async_config=Reconfigure.Replication.async_config(max_workers=16),
)
result = update.merge_with_existing(schema)
assert result["asyncConfig"] == {"maxWorkers": 16}
assert "hashtreeHeight" not in result["asyncConfig"]


def test_replication_async_config_cleared_when_async_disabled() -> None:
"""Test asyncConfig is removed from schema when asyncEnabled is set to False."""
schema = {
"factor": 1,
"asyncEnabled": True,
"asyncConfig": {"maxWorkers": 8, "hashtreeHeight": 20},
}
update = Reconfigure.replication(async_enabled=False)
result = update.merge_with_existing(schema)
assert result["asyncEnabled"] is False
assert "asyncConfig" not in result


def test_replication_async_config_preserved_when_not_provided() -> None:
"""Test asyncConfig is preserved when not provided in update."""
schema = {
"factor": 1,
"asyncEnabled": True,
"asyncConfig": {"maxWorkers": 8, "hashtreeHeight": 20},
}
update = Reconfigure.replication(factor=2)
result = update.merge_with_existing(schema)
assert result["factor"] == 2
assert result["asyncConfig"] == {"maxWorkers": 8, "hashtreeHeight": 20}


def test_replication_async_config_reset_all_fields() -> None:
"""Passing empty async_config should replace with empty dict (server uses defaults)."""
schema = {
"factor": 1,
"asyncEnabled": True,
"asyncConfig": {"maxWorkers": 8, "hashtreeHeight": 20},
}
update = Reconfigure.replication(
async_config=Reconfigure.Replication.async_config(),
)
result = update.merge_with_existing(schema)
assert result["asyncConfig"] == {}
37 changes: 37 additions & 0 deletions weaviate/collections/classes/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,21 @@ class _ReplicationConfigUpdate(_ConfigUpdateModel):
asyncConfig: Optional[_AsyncReplicationConfigUpdate]
deletionStrategy: Optional[ReplicationDeletionStrategy]

def merge_with_existing(self, schema: Dict[str, Any]) -> Dict[str, Any]:
if self.factor is not None:
schema["factor"] = self.factor
if self.asyncEnabled is not None:
schema["asyncEnabled"] = self.asyncEnabled
if not self.asyncEnabled:
schema.pop("asyncConfig", None)
if self.deletionStrategy is not None:
schema["deletionStrategy"] = str(self.deletionStrategy.value)
if self.asyncConfig is not None:
# Replace entire asyncConfig (like generative/reranker pattern)
# rather than merging, so omitted fields revert to server defaults
schema["asyncConfig"] = self.asyncConfig.model_dump(exclude_none=True)
return schema


class _BM25ConfigCreate(_ConfigCreateModel):
b: float
Expand Down Expand Up @@ -1746,11 +1761,33 @@ def to_dict(self) -> Dict[str, Any]:
ReferencePropertyConfig = _ReferenceProperty


@dataclass
class _AsyncReplicationConfig(_ConfigBase):
max_workers: Optional[int]
hashtree_height: Optional[int]
frequency: Optional[int]
frequency_while_propagating: Optional[int]
alive_nodes_checking_frequency: Optional[int]
logging_frequency: Optional[int]
diff_batch_size: Optional[int]
diff_per_node_timeout: Optional[int]
pre_propagation_timeout: Optional[int]
propagation_timeout: Optional[int]
propagation_limit: Optional[int]
propagation_delay: Optional[int]
propagation_concurrency: Optional[int]
propagation_batch_size: Optional[int]


AsyncReplicationConfig = _AsyncReplicationConfig


@dataclass
class _ReplicationConfig(_ConfigBase):
factor: int
async_enabled: bool
deletion_strategy: ReplicationDeletionStrategy
async_config: Optional[_AsyncReplicationConfig] = None


ReplicationConfig = _ReplicationConfig
Expand Down
2 changes: 1 addition & 1 deletion weaviate/collections/classes/config_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def merge_with_existing(self, schema: Dict[str, Any]) -> Dict[str, Any]:
)
schema[quantizer]["enabled"] = False
elif isinstance(val, _ConfigUpdateModel):
schema[cls_field] = val.merge_with_existing(schema[cls_field])
schema[cls_field] = val.merge_with_existing(schema.get(cls_field, {}))
else:
pass # ignore unknown types so that individual classes can be extended
return schema
Expand Down
Loading
Loading