Skip to content

Commit 7050230

Browse files
jfrancoaclaude
andcommitted
fix: replace asyncConfig on update instead of merging
Override merge_with_existing on _ReplicationConfigUpdate to: - Replace entire asyncConfig when provided (not merge), so omitted fields revert to server defaults. This follows the generative/reranker clear+replace pattern. - Pop asyncConfig from schema when asyncEnabled=False, so disabling async replication also clears the config. Add unit tests for all merge behaviors (replace, clear, preserve, reset) and integration tests for unsetting individual fields, resetting all fields via empty async_config(), and disabling async replication. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 0b99cef commit 7050230

5 files changed

Lines changed: 208 additions & 13 deletions

File tree

integration/test_collection_config.py

Lines changed: 108 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1575,10 +1575,6 @@ def test_replication_config(
15751575

15761576

15771577
def test_replication_config_without_async_config(collection_factory: CollectionFactory) -> None:
1578-
collection_dummy = collection_factory("dummy")
1579-
if collection_dummy._connection._weaviate_version.is_lower_than(1, 26, 0):
1580-
pytest.skip("async replication requires Weaviate >= 1.26.0")
1581-
15821578
collection = collection_factory(
15831579
replication_config=Configure.replication(factor=1, async_enabled=False),
15841580
)
@@ -1589,10 +1585,6 @@ def test_replication_config_without_async_config(collection_factory: CollectionF
15891585

15901586

15911587
def test_replication_config_with_async_config(collection_factory: CollectionFactory) -> None:
1592-
collection_dummy = collection_factory("dummy")
1593-
if collection_dummy._connection._weaviate_version.is_lower_than(1, 36, 0):
1594-
pytest.skip("async replication config requires Weaviate >= 1.36.0")
1595-
15961588
collection = collection_factory(
15971589
replication_config=Configure.replication(
15981590
factor=1,
@@ -1612,10 +1604,12 @@ def test_replication_config_with_async_config(collection_factory: CollectionFact
16121604
assert ac.hashtree_height == 20
16131605

16141606

1615-
def test_replication_config_remove_async_config(collection_factory: CollectionFactory) -> None:
1607+
def test_replication_config_remove_async_config_by_disabling_async_replication(
1608+
collection_factory: CollectionFactory,
1609+
) -> None:
16161610
collection_dummy = collection_factory("dummy")
1617-
if collection_dummy._connection._weaviate_version.is_lower_than(1, 36, 0):
1618-
pytest.skip("async replication config requires Weaviate >= 1.36.0")
1611+
if collection_dummy._connection._weaviate_version.is_lower_than(1, 34, 18):
1612+
pytest.skip("async replication config requires Weaviate >= 1.34.18")
16191613

16201614
collection = collection_factory(
16211615
replication_config=Configure.replication(
@@ -1641,6 +1635,109 @@ def test_replication_config_remove_async_config(collection_factory: CollectionFa
16411635
assert config.replication_config.async_config is None
16421636

16431637

1638+
def test_replication_config_remove_async_config(collection_factory: CollectionFactory) -> None:
1639+
collection_dummy = collection_factory("dummy")
1640+
if collection_dummy._connection._weaviate_version.is_lower_than(1, 34, 18):
1641+
pytest.skip("async replication config requires Weaviate >= 1.34.18")
1642+
1643+
collection = collection_factory(
1644+
replication_config=Configure.replication(
1645+
factor=1,
1646+
async_enabled=True,
1647+
async_config=Configure.Replication.async_config(
1648+
max_workers=8,
1649+
hashtree_height=20,
1650+
),
1651+
),
1652+
)
1653+
config = collection.config.get()
1654+
assert config.replication_config.async_config is not None
1655+
assert config.replication_config.async_config.max_workers == 8
1656+
1657+
collection.config.update(
1658+
replication_config=Reconfigure.replication(
1659+
factor=1, async_enabled=True, async_config=Reconfigure.Replication.async_config()
1660+
),
1661+
)
1662+
config = collection.config.get()
1663+
assert config.replication_config.async_enabled is True
1664+
assert config.replication_config.async_config is None
1665+
assert config.replication_config.factor == 1
1666+
1667+
1668+
def test_replication_config_unset_single_async_field(
1669+
collection_factory: CollectionFactory,
1670+
) -> None:
1671+
collection_dummy = collection_factory("dummy")
1672+
if collection_dummy._connection._weaviate_version.is_lower_than(1, 36, 0):
1673+
pytest.skip("async replication config requires Weaviate >= 1.36.0")
1674+
1675+
collection = collection_factory(
1676+
replication_config=Configure.replication(
1677+
factor=1,
1678+
async_enabled=True,
1679+
async_config=Configure.Replication.async_config(
1680+
max_workers=8,
1681+
hashtree_height=20,
1682+
),
1683+
),
1684+
)
1685+
config = collection.config.get()
1686+
ac = config.replication_config.async_config
1687+
assert ac is not None
1688+
assert ac.max_workers == 8
1689+
assert ac.hashtree_height == 20
1690+
1691+
# Update with only max_workers — hashtree_height reverts to server default
1692+
collection.config.update(
1693+
replication_config=Reconfigure.replication(
1694+
async_config=Reconfigure.Replication.async_config(
1695+
max_workers=8,
1696+
),
1697+
),
1698+
)
1699+
config = collection.config.get()
1700+
ac = config.replication_config.async_config
1701+
assert ac is not None
1702+
assert ac.max_workers == 8
1703+
assert ac.hashtree_height != 20
1704+
1705+
1706+
def test_replication_config_add_async_config_to_existing_collection(
1707+
collection_factory: CollectionFactory,
1708+
) -> None:
1709+
"""Test updating a collection that was created without async_config to add one.
1710+
1711+
This covers the case where the existing schema has no asyncConfig key
1712+
and merge_with_existing must handle the missing field gracefully.
1713+
"""
1714+
collection_dummy = collection_factory("dummy")
1715+
if collection_dummy._connection._weaviate_version.is_lower_than(1, 34, 18):
1716+
pytest.skip("async replication config requires Weaviate >= 1.34.18")
1717+
1718+
# Create without async_config
1719+
collection = collection_factory(
1720+
replication_config=Configure.replication(factor=1, async_enabled=True),
1721+
)
1722+
config = collection.config.get()
1723+
assert config.replication_config.async_config is None
1724+
1725+
# Update to add async_config
1726+
collection.config.update(
1727+
replication_config=Reconfigure.replication(
1728+
async_config=Reconfigure.Replication.async_config(
1729+
max_workers=12,
1730+
propagation_concurrency=4,
1731+
),
1732+
),
1733+
)
1734+
config = collection.config.get()
1735+
assert config.replication_config.async_config is not None
1736+
ac = config.replication_config.async_config
1737+
assert ac.max_workers == 12
1738+
assert ac.propagation_concurrency == 4
1739+
1740+
16441741
def test_update_property_descriptions(collection_factory: CollectionFactory) -> None:
16451742
collection = collection_factory(
16461743
vectorizer_config=Configure.Vectorizer.none(),

test/collection/test_config.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2986,6 +2986,31 @@ def test_replication_config_to_dict_without_async_config() -> None:
29862986
assert "asyncConfig" not in d
29872987

29882988

2989+
def test_replication_config_update_merge_with_missing_async_config() -> None:
2990+
"""Test that merge_with_existing handles a schema without asyncConfig.
2991+
2992+
When a collection was created without async replication config and we
2993+
update it to add one, the existing schema won't have the asyncConfig key.
2994+
merge_with_existing must not raise KeyError in this case.
2995+
"""
2996+
update = Reconfigure.replication(
2997+
async_config=Reconfigure.Replication.async_config(
2998+
max_workers=12,
2999+
propagation_concurrency=4,
3000+
),
3001+
)
3002+
# Simulate an existing schema that has no asyncConfig key
3003+
existing_schema = {
3004+
"factor": 3,
3005+
"asyncEnabled": True,
3006+
"deletionStrategy": "NoAutomatedResolution",
3007+
}
3008+
result = update.merge_with_existing(existing_schema)
3009+
assert result["asyncConfig"]["maxWorkers"] == 12
3010+
assert result["asyncConfig"]["propagationConcurrency"] == 4
3011+
assert result["factor"] == 3
3012+
3013+
29893014
def test_nested_property_with_id_name_is_allowed() -> None:
29903015
"""A nested property named 'id' must not raise — only top-level 'id' is reserved."""
29913016
prop = Property(

test/collection/test_config_update.py

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import pytest
22

33
from test.collection.schema import multi_vector_schema
4-
from weaviate.collections.classes.config import Reconfigure, _CollectionConfigUpdate
4+
from weaviate.collections.classes.config import (
5+
Reconfigure,
6+
_CollectionConfigUpdate,
7+
)
58
from weaviate.exceptions import WeaviateInvalidInputError
69

710

@@ -102,3 +105,58 @@ def test_enabling_sq_multi_vector(schema: dict, should_error: bool) -> None:
102105
assert new_schema["vectorConfig"]["boi"]["vectorIndexConfig"]["sq"]["enabled"]
103106

104107
assert new_schema["vectorConfig"]["yeh"] == schema["vectorConfig"]["yeh"]
108+
109+
110+
def test_replication_async_config_replace_on_update() -> None:
111+
"""Test asyncConfig is replaced (not merged) when provided in an update."""
112+
schema = {
113+
"factor": 1,
114+
"asyncEnabled": True,
115+
"asyncConfig": {"maxWorkers": 8, "hashtreeHeight": 20},
116+
}
117+
update = Reconfigure.replication(
118+
async_config=Reconfigure.Replication.async_config(max_workers=16),
119+
)
120+
result = update.merge_with_existing(schema)
121+
assert result["asyncConfig"] == {"maxWorkers": 16}
122+
assert "hashtreeHeight" not in result["asyncConfig"]
123+
124+
125+
def test_replication_async_config_cleared_when_async_disabled() -> None:
126+
"""Test asyncConfig is removed from schema when asyncEnabled is set to False."""
127+
schema = {
128+
"factor": 1,
129+
"asyncEnabled": True,
130+
"asyncConfig": {"maxWorkers": 8, "hashtreeHeight": 20},
131+
}
132+
update = Reconfigure.replication(async_enabled=False)
133+
result = update.merge_with_existing(schema)
134+
assert result["asyncEnabled"] is False
135+
assert "asyncConfig" not in result
136+
137+
138+
def test_replication_async_config_preserved_when_not_provided() -> None:
139+
"""Test asyncConfig is preserved when not provided in update."""
140+
schema = {
141+
"factor": 1,
142+
"asyncEnabled": True,
143+
"asyncConfig": {"maxWorkers": 8, "hashtreeHeight": 20},
144+
}
145+
update = Reconfigure.replication(factor=2)
146+
result = update.merge_with_existing(schema)
147+
assert result["factor"] == 2
148+
assert result["asyncConfig"] == {"maxWorkers": 8, "hashtreeHeight": 20}
149+
150+
151+
def test_replication_async_config_reset_all_fields() -> None:
152+
"""Passing empty async_config should replace with empty dict (server uses defaults)."""
153+
schema = {
154+
"factor": 1,
155+
"asyncEnabled": True,
156+
"asyncConfig": {"maxWorkers": 8, "hashtreeHeight": 20},
157+
}
158+
update = Reconfigure.replication(
159+
async_config=Reconfigure.Replication.async_config(),
160+
)
161+
result = update.merge_with_existing(schema)
162+
assert result["asyncConfig"] == {}

weaviate/collections/classes/config.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,21 @@ class _ReplicationConfigUpdate(_ConfigUpdateModel):
335335
asyncConfig: Optional[_AsyncReplicationConfigUpdate]
336336
deletionStrategy: Optional[ReplicationDeletionStrategy]
337337

338+
def merge_with_existing(self, schema: Dict[str, Any]) -> Dict[str, Any]:
339+
if self.factor is not None:
340+
schema["factor"] = self.factor
341+
if self.asyncEnabled is not None:
342+
schema["asyncEnabled"] = self.asyncEnabled
343+
if not self.asyncEnabled:
344+
schema.pop("asyncConfig", None)
345+
if self.deletionStrategy is not None:
346+
schema["deletionStrategy"] = str(self.deletionStrategy.value)
347+
if self.asyncConfig is not None:
348+
# Replace entire asyncConfig (like generative/reranker pattern)
349+
# rather than merging, so omitted fields revert to server defaults
350+
schema["asyncConfig"] = self.asyncConfig.model_dump(exclude_none=True)
351+
return schema
352+
338353

339354
class _BM25ConfigCreate(_ConfigCreateModel):
340355
b: float

weaviate/collections/classes/config_base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def merge_with_existing(self, schema: Dict[str, Any]) -> Dict[str, Any]:
4242
)
4343
schema[quantizer]["enabled"] = False
4444
elif isinstance(val, _ConfigUpdateModel):
45-
schema[cls_field] = val.merge_with_existing(schema[cls_field])
45+
schema[cls_field] = val.merge_with_existing(schema.get(cls_field, {}))
4646
else:
4747
pass # ignore unknown types so that individual classes can be extended
4848
return schema

0 commit comments

Comments
 (0)