diff --git a/caldav/async_davclient.py b/caldav/async_davclient.py index d929ec18..2c4bd006 100644 --- a/caldav/async_davclient.py +++ b/caldav/async_davclient.py @@ -1095,7 +1095,11 @@ async def check_dav_support(self) -> str | None: Returns the DAV header from an OPTIONS request, or None if not supported. """ - response = await self.options(str(self.url)) + try: + principal = await self.principal() + response = await self.options(principal.url) + except Exception: + response = await self.options(str(self.url)) return response.headers.get("DAV") async def check_cdav_support(self) -> bool: diff --git a/caldav/calendarobjectresource.py b/caldav/calendarobjectresource.py index 1a9a098b..a0f33976 100644 --- a/caldav/calendarobjectresource.py +++ b/caldav/calendarobjectresource.py @@ -2267,6 +2267,8 @@ def complete( self._complete_ical(completion_timestamp=completion_timestamp) self.save() + ## TODO: there is TERRIBLY much code duplication here. We should try to consolidate + ## the sync and async code better. async def _async_complete( self, completion_timestamp: datetime, @@ -2275,13 +2277,94 @@ async def _async_complete( ) -> None: """Async implementation of complete().""" if "RRULE" in self.icalendar_component and handle_rrule: - # _complete_recurring_* methods are sync-only for now; they internally - # call self.save() which would return an unawaited coroutine in async mode. - # This is a known limitation - handle_rrule is not yet async-safe. - raise NotImplementedError("handle_rrule=True is not yet supported for async clients") + await getattr(self, "_async_complete_recurring_%s" % rrule_mode)(completion_timestamp) + return self._complete_ical(completion_timestamp=completion_timestamp) await self.save() + async def _async_complete_recurring_safe(self, completion_timestamp: datetime) -> None: + """Async version of _complete_recurring_safe.""" + if not self._reduce_count(): + return await self._async_complete(completion_timestamp, handle_rrule=False) + next_dtstart = self._next(completion_timestamp) + if not next_dtstart: + return await self._async_complete(completion_timestamp, handle_rrule=False) + + completed = self.copy() + completed.url = self.parent.url.join(completed.id + ".ics") + completed.icalendar_component.pop("RRULE") + await completed.save() + completed._complete_ical(completion_timestamp=completion_timestamp) + await completed.save() + + duration = self.get_duration() + i = self.icalendar_component + i.pop("DTSTART", None) + i.add("DTSTART", next_dtstart) + self.set_duration(duration, movable_attr="DUE") + await self.save() + + async def _async_complete_recurring_thisandfuture(self, completion_timestamp: datetime) -> None: + """Async version of _complete_recurring_thisandfuture.""" + recurrences = self.icalendar_instance.subcomponents + orig = recurrences[0] + if "STATUS" not in orig: + orig["STATUS"] = "NEEDS-ACTION" + + if len(recurrences) == 1: + just_completed = orig.copy() + just_completed.pop("RRULE") + just_completed.add("RECURRENCE-ID", orig.get("DTSTART", completion_timestamp)) + seqno = just_completed.pop("SEQUENCE", 0) + just_completed.add("SEQUENCE", seqno + 1) + recurrences.append(just_completed) + + prev = recurrences[-1] + rrule = prev.get("RRULE", orig["RRULE"]) + thisandfuture = prev.copy() + seqno = thisandfuture.pop("SEQUENCE", 0) + thisandfuture.add("SEQUENCE", seqno + 1) + + if len(recurrences) > 2: + if prev["RECURRENCE-ID"].params.get("RANGE", None) == "THISANDFUTURE": + prev["RECURRENCE-ID"].params.pop("RANGE") + else: + raise NotImplementedError( + "multiple instances found, but last one is not of type THISANDFUTURE, possibly this has been created by some incompatible client, but we should deal with it" + ) + self._complete_ical(prev, completion_timestamp) + + thisandfuture.pop("RECURRENCE-ID", None) + thisandfuture.add("RECURRENCE-ID", self._next(i=prev, rrule=rrule)) + thisandfuture["RECURRENCE-ID"].params["RANGE"] = "THISANDFUTURE" + rrule2 = thisandfuture.pop("RRULE", None) + + if rrule2 is not None: + count = rrule2.get("COUNT", None) + if count is not None and count[0] in (0, 1): + for i in recurrences: + self._complete_ical(i, completion_timestamp=completion_timestamp) + thisandfuture.add("RRULE", rrule2) + else: + count = rrule.get("COUNT", None) + if count is not None and count[0] <= len( + [x for x in recurrences if not self.is_pending(x)] + ): + self._complete_ical(recurrences[0], completion_timestamp=completion_timestamp) + await self.save(increase_seqno=False) + return + + rrule = rrule2 or rrule + + duration = self._get_duration(i=prev) + thisandfuture.pop("DTSTART", None) + thisandfuture.pop("DUE", None) + next_dtstart = self._next(i=prev, rrule=rrule, ts=completion_timestamp) + thisandfuture.add("DTSTART", next_dtstart) + self._set_duration(i=thisandfuture, duration=duration, movable_attr="DUE") + self.icalendar_instance.subcomponents.append(thisandfuture) + await self.save(increase_seqno=False) + def _complete_ical(self, i=None, completion_timestamp=None) -> None: if i is None: i = self.icalendar_component diff --git a/caldav/collection.py b/caldav/collection.py index b3519281..37b87b96 100644 --- a/caldav/collection.py +++ b/caldav/collection.py @@ -456,6 +456,17 @@ def calendar( It will not initiate any communication with the server. """ if not cal_url: + ## For full-URL cal_id, skip calendar_home_set (which may be async-lazy) + if cal_id and ( + isinstance(cal_id, URL) + or ( + isinstance(cal_id, str) + and (cal_id.startswith("https://") or cal_id.startswith("http://")) + ) + ): + if self.client is None: + raise ValueError("Unexpected value None for self.client") + return Calendar(self.client, url=URL.objectify(cal_id)) return self.calendar_home_set.calendar(name, cal_id) else: if self.client is None: @@ -611,10 +622,14 @@ async def _async_freebusy_request(self, outbox, fb_obj) -> dict: response = await self.client.post(outbox.url, fb_obj.data, headers) return response._parse_scheduling_response_objects(parent=self) - def calendar_user_address_set(self) -> list[str | None]: + def calendar_user_address_set( + self, + ) -> "list[str | None] | Coroutine[Any, Any, list[str | None]]": """ defined in RFC6638 """ + if self.is_async_client: + return self._async_calendar_user_address_set() _addresses: _Element | None = self.get_property( cdav.CalendarUserAddressSet(), parse_props=False ) @@ -629,6 +644,15 @@ def calendar_user_address_set(self) -> list[str | None]: addresses.sort(key=lambda x: -int(x.get("preferred", 0))) return [x.text for x in addresses] + async def _async_calendar_user_address_set(self) -> list[str | None]: + _addresses = await self.get_property(cdav.CalendarUserAddressSet(), parse_props=False) + if _addresses is None: + raise error.NotFoundError("No calendar user addresses given from server") + assert not [x for x in _addresses if x.tag != dav.Href().tag] + addresses = list(_addresses) + addresses.sort(key=lambda x: -int(x.get("preferred", 0))) + return [x.text for x in addresses] + def schedule_inbox(self) -> "ScheduleInbox | Coroutine[Any, Any, ScheduleInbox]": """ Returns the schedule inbox, as defined in RFC6638. @@ -1512,7 +1536,9 @@ def search( self, server_expand, split_expanded, props, xml, post_filter, _hacks ) - def freebusy_request(self, start: datetime, end: datetime) -> "FreeBusy": + def freebusy_request( + self, start: datetime, end: datetime + ) -> "FreeBusy | Coroutine[Any, Any, FreeBusy]": """ Search the calendar, but return only the free/busy information. @@ -1521,13 +1547,18 @@ def freebusy_request(self, start: datetime, end: datetime) -> "FreeBusy": end : same as above. Returns: - [FreeBusy(), ...] + FreeBusy object (or a coroutine for async clients) """ - ## TODO: async variant? root = cdav.FreeBusyQuery() + [cdav.TimeRange(start, end)] + if self.is_async_client: + return self._async_freebusy_request(root) response = self._query(root, 1, "report") return FreeBusy(self, response.raw) + async def _async_freebusy_request(self, root) -> "FreeBusy": + response = await self._query(root, 1, "report") + return FreeBusy(self, response.raw) + def get_todos( self, sort_keys: Sequence[str] = ("due", "priority"), diff --git a/tests/test_async_integration.py b/tests/test_async_integration.py index f17ca163..97565781 100644 --- a/tests/test_async_integration.py +++ b/tests/test_async_integration.py @@ -9,13 +9,15 @@ import asyncio import uuid -from datetime import datetime, timedelta, timezone +from datetime import date, datetime, timedelta, timezone from functools import wraps from typing import Any +import icalendar import pytest import pytest_asyncio +from caldav import Event, FreeBusy, Todo from caldav.compatibility_hints import FeatureSet from .test_caldav import ( @@ -29,6 +31,10 @@ from .test_caldav import todo as todo_static # avoids clash with local var in add_todo() from .test_caldav import todo2 as todo2_static # avoids clash with todo2() generator from .test_caldav import todo3 as todo3_static +from .test_caldav import todo4 as todo4_static +from .test_caldav import todo5 as todo5_static +from .test_caldav import todo6 as todo6_static +from .test_caldav import todo8 as todo8_static from .test_servers import TestServer, get_available_servers @@ -176,6 +182,9 @@ def skip_unless_support(self, feature): msg = self._features.find_feature(feature).get("description", feature) pytest.skip("Test skipped due to server incompatibility issue: " + msg) + def check_compatibility_flag(self, flag: str) -> bool: + return flag in getattr(self._features, "_old_flags", []) + @pytest.fixture(scope="class") def test_server(self) -> TestServer: """Get the test server for this class.""" @@ -823,6 +832,1397 @@ async def test_multi_get(self, async_calendar: Any) -> None: await event1.load_by_multiget() + # ==================== Group B – Sync Tokens ==================== + + @pytest.mark.asyncio + async def test_object_by_sync_token(self, async_calendar: Any) -> None: + """Sync-token cycle via get_objects_by_sync_token().""" + self.skip_unless_support("save-load.event") + + c = async_calendar + + sync_info = self.is_supported("sync-token", return_type=dict) + is_time_based = sync_info.get("behaviour") == "time-based" + is_fragile = sync_info.get("support") in ( + "fragile", + "broken", + "unsupported", + "ungraceful", + ) + + objcnt = 0 + if self.is_supported("save-load.todo.mixed-calendar"): + objcnt += len(await c.get_todos()) + objcnt += len(await c.get_events()) + + obj = await c.add_event(ev1_static) + objcnt += 1 + if self.is_supported("save-load.event.recurrences"): + await c.add_event(evr_static) + objcnt += 1 + if self.is_supported("save-load.todo.mixed-calendar"): + await c.add_todo(todo_static) + await c.add_todo(todo2_static) + await c.add_todo(todo3_static) + objcnt += 3 + + if is_time_based: + await asyncio.sleep(1) + + my_objects = await c.objects() + assert my_objects.sync_token != "" + assert len(list(my_objects)) == objcnt + + is_using_fallback = my_objects.sync_token.startswith("fake-") + if not is_using_fallback: + for some_obj in my_objects: + assert some_obj.data is None + + if is_time_based: + await asyncio.sleep(1) + + my_changed_objects = await c.get_objects_by_sync_token(sync_token=my_objects.sync_token) + if not is_fragile: + assert len(list(my_changed_objects)) == 0 + + self.skip_unless_support("save-load.mutable") + + if is_time_based: + await asyncio.sleep(1) + obj.icalendar_instance.subcomponents[0]["SUMMARY"] = "foobar" + await obj.save() + + if is_time_based: + await asyncio.sleep(1) + + my_changed_objects = await c.get_objects_by_sync_token( + sync_token=my_changed_objects.sync_token, load_objects=True + ) + if is_fragile: + assert len(list(my_changed_objects)) >= 1 + else: + assert len(list(my_changed_objects)) == 1 + assert list(my_changed_objects)[0].data is not None + + if is_time_based: + await asyncio.sleep(1) + + my_changed_objects = await c.get_objects_by_sync_token( + sync_token=my_changed_objects.sync_token + ) + if not is_fragile: + assert len(list(my_changed_objects)) == 0 + + if is_time_based: + await asyncio.sleep(1) + obj3 = await c.add_event(ev3_static) + if is_time_based: + await asyncio.sleep(1) + my_changed_objects = await c.get_objects_by_sync_token( + sync_token=my_changed_objects.sync_token + ) + if not is_fragile: + assert len(list(my_changed_objects)) == 1 + + if is_time_based: + await asyncio.sleep(1) + + my_changed_objects = await c.get_objects_by_sync_token( + sync_token=my_changed_objects.sync_token + ) + if not is_fragile: + assert len(list(my_changed_objects)) == 0 + + if is_time_based: + await asyncio.sleep(1) + + await obj.delete() + self.skip_unless_support("sync-token.delete") + + if is_time_based: + await asyncio.sleep(1) + my_changed_objects = await c.get_objects_by_sync_token( + sync_token=my_changed_objects.sync_token, load_objects=True + ) + if not is_fragile: + assert len(list(my_changed_objects)) == 1 + if is_time_based: + await asyncio.sleep(1) + assert list(my_changed_objects)[0].data is None + + my_changed_objects = await c.get_objects_by_sync_token( + sync_token=my_changed_objects.sync_token + ) + if not is_fragile: + assert len(list(my_changed_objects)) == 0 + + @pytest.mark.asyncio + async def test_sync(self, async_calendar: Any) -> None: + """Sync-token cycle via SynchronizableCalendarObjectCollection.sync().""" + self.skip_unless_support("save-load.event") + + c = async_calendar + + sync_info = self.is_supported("sync-token", return_type=dict) + is_time_based = sync_info.get("behaviour") == "time-based" + is_fragile = sync_info.get("support") in ( + "fragile", + "broken", + "unsupported", + "ungraceful", + ) + + objcnt = 0 + if self.is_supported("save-load.todo.mixed-calendar"): + objcnt += len(await c.get_todos()) + objcnt += len(await c.get_events()) + + obj = await c.add_event(ev1_static) + objcnt += 1 + if self.is_supported("save-load.event.recurrences"): + await c.add_event(evr_static) + objcnt += 1 + if self.is_supported("save-load.todo.mixed-calendar"): + await c.add_todo(todo_static) + await c.add_todo(todo2_static) + await c.add_todo(todo3_static) + objcnt += 3 + + if is_time_based: + await asyncio.sleep(1) + + my_objects = await c.objects(load_objects=True) + assert my_objects.sync_token != "" + assert len(list(my_objects)) == objcnt + + if is_time_based: + await asyncio.sleep(1) + + updated, deleted = await my_objects.sync() + if not is_fragile: + assert len(list(updated)) == 0 + assert len(list(deleted)) == 0 + + if is_time_based: + await asyncio.sleep(1) + + self.skip_unless_support("save-load.mutable") + + obj.icalendar_instance.subcomponents[0]["SUMMARY"] = "foobar" + await obj.save() + + if is_time_based: + await asyncio.sleep(1) + + updated, deleted = await my_objects.sync() + if not is_fragile: + assert len(list(updated)) == 1 + assert len(list(deleted)) == 0 + assert "foobar" in my_objects.objects_by_url()[obj.url].data + + if is_time_based: + await asyncio.sleep(1) + + obj3 = await c.add_event(ev3_static) + + if is_time_based: + await asyncio.sleep(1) + + updated, deleted = await my_objects.sync() + if not is_fragile: + assert len(list(updated)) == 1 + assert len(list(deleted)) == 0 + assert obj3.url in my_objects.objects_by_url() + + self.skip_unless_support("sync-token.delete") + + if is_time_based: + await asyncio.sleep(1) + + await obj.delete() + if is_time_based: + await asyncio.sleep(1) + updated, deleted = await my_objects.sync() + if not is_fragile: + assert len(list(updated)) == 0 + assert len(list(deleted)) == 1 + assert obj.url not in my_objects.objects_by_url() + + if is_time_based: + await asyncio.sleep(1) + + updated, deleted = await my_objects.sync() + if not is_fragile: + assert len(list(updated)) == 0 + assert len(list(deleted)) == 0 + + # ==================== Group C – Search ==================== + + @pytest.mark.asyncio + async def test_search_should_yield_data(self, async_calendar: Any) -> None: + """search(event=True) must return objects with non-empty .data.""" + self.skip_unless_support("search.unlimited-time-range") + c = async_calendar + if self.is_supported("save-load.event"): + await c.add_event(ev1_static) + await c.add_event(ev2_static) + await c.add_event(ev3_static) + objects = await c.search(event=True) + assert objects + assert objects[0].data + + @pytest.mark.asyncio + async def test_search_event(self, async_calendar: Any) -> None: + """Comprehensive event search: UID, class, category, text, sort.""" + self.skip_unless_support("save-load.event") + self.skip_unless_support("search") + self.skip_unless_support("search.time-range.event.old-dates") + c = async_calendar + + await c.add_event(ev1_static) + await c.add_event(ev3_static) + await c.add_event(evr_static) + + all_events = await c.search() + assert len(all_events) <= 3 + + all_events = await c.search(comp_class=Event) + assert len(all_events) == 3 + + try: + no_events = await c.search(todo=True) + except Exception: + no_events = [] + assert len(no_events) == 0 + + some_events = await c.search( + comp_class=Event, + expand=False, + start=datetime(2006, 7, 13, 13, 0), + end=datetime(2006, 7, 15, 13, 0), + ) + assert len(some_events) == 1 + + some_events = await c.search(comp_class=Event, uid="19970901T130000Z-123403@example.com") + assert len(some_events) == 1 + + some_events = await c.search(comp_class=Event, class_="CONFIDENTIAL") + assert len(some_events) == 1 + + some_events = await c.search(comp_class=Event, no_class=True) + assert ( + len(some_events) == 2 + or len([x for x in all_events if x.icalendar_component["class"] == "PUBLIC"]) == 2 + ) + + some_events = await c.search(comp_class=Event, no_category=True) + assert len(some_events) == 2 + + some_events = await c.search(comp_class=Event, no_dtend=True) + assert len(some_events) == 1 + + some_events = await c.search(comp_class=Event, category="PERSONAL") + assert len(some_events) == 1 + + some_events = await c.search(comp_class=Event, summary="Bastille Day Party") + assert len(some_events) == 1 + + all_events = await c.search(sort_keys=("DTSTART",)) + assert len(all_events) == 3 + assert str(all_events[0].icalendar_component["DTSTART"].dt) < str( + all_events[1].icalendar_component["DTSTART"].dt + ) + + @pytest.mark.asyncio + async def test_search_comp_type(self, async_calendar: Any) -> None: + """get_events() and get_todos() must filter by component type.""" + self.skip_unless_support("save-load.todo") + self.skip_unless_support("save-load.event") + self.skip_unless_support("save-load.todo.mixed-calendar") + c = async_calendar + + event = await c.add_event( + summary="Test Event for Component-Type Filtering", + dtstart=datetime(2025, 1, 1, 12, 0, 0), + dtend=datetime(2025, 1, 1, 13, 0, 0), + ) + todo_obj = await c.add_todo( + summary="Test TODO for Component-Type Filtering", + dtstart=date(2025, 1, 2), + ) + + events = await c.get_events() + event_summaries = [e.component["summary"] for e in events] + todos = await c.get_todos(include_completed=True) + todo_summaries = [t.component["summary"] for t in todos] + + assert "Test Event for Component-Type Filtering" in event_summaries + assert "Test TODO for Component-Type Filtering" not in event_summaries + assert "Test TODO for Component-Type Filtering" in todo_summaries + assert "Test Event for Component-Type Filtering" not in todo_summaries + + await event.delete() + await todo_obj.delete() + + @pytest.mark.asyncio + async def test_search_without_comp_type(self, async_calendar: Any) -> None: + """search() with no filter must return all objects.""" + self.skip_unless_support("save-load.todo.mixed-calendar") + c = async_calendar + await c.add_todo(todo_static) + await c.add_event(ev1_static) + objects = await c.search() + assert len(objects) >= 2 + type_names = {type(x).__name__ for x in objects} + assert "Todo" in type_names + assert "Event" in type_names + + @pytest.mark.asyncio + async def test_search_sort_todo(self, async_task_list: Any) -> None: + """Todos are sorted correctly by various sort_keys.""" + self.skip_unless_support("save-load.todo") + self.skip_unless_support("search") + self.skip_unless_support("search.unlimited-time-range") + c = async_task_list + + pre_todos = await c.get_todos() + pre_uid_map = {x.icalendar_component["uid"] for x in pre_todos} + + def cleanse(tasks: list) -> list: + return [x for x in tasks if x.icalendar_component["uid"] not in pre_uid_map] + + t1 = await c.add_todo( + summary="1 task overdue", + due=date(2022, 12, 12), + dtstart=date(2022, 10, 11), + uid="async-sort-test1", + ) + t2 = await c.add_todo( + summary="2 task future", + due=datetime.now() + timedelta(hours=15), + dtstart=datetime.now() + timedelta(minutes=15), + uid="async-sort-test2", + ) + t3 = await c.add_todo( + summary="3 task future due", + due=datetime.now() + timedelta(hours=15), + dtstart=datetime(2022, 12, 11, 10, 9, 8), + uid="async-sort-test3", + ) + t4 = await c.add_todo( + summary="4 task priority is set to nine which is the lowest", + priority=9, + uid="async-sort-test4", + ) + t5 = await c.add_todo( + summary="5 task status is set to COMPLETED and this will disappear from the ordinary todo search", + status="COMPLETED", + uid="async-sort-test5", + ) + t6 = await c.add_todo( + summary="6 task has categories", + categories="home,garden,sunshine", + uid="async-sort-test6", + ) + + def check_order(tasks: list, order: tuple) -> None: + assert [str(x.icalendar_component["uid"]) for x in tasks] == [ + "async-sort-test" + str(x) for x in order + ] + + all_tasks = cleanse(await c.search(todo=True, sort_keys=("uid",))) + check_order(all_tasks, (1, 2, 3, 4, 6)) + + all_tasks = cleanse(await c.search(sort_keys=("summary",))) + check_order(all_tasks, (1, 2, 3, 4, 5, 6)) + + all_tasks = cleanse( + await c.search( + sort_keys=("isnt_overdue", "categories", "dtstart", "priority", "status") + ) + ) + check_order(all_tasks, (1, 5, 4, 3, 2, 6)) + + @pytest.mark.asyncio + async def test_date_search_and_freebusy(self, async_calendar: Any) -> None: + """Date-range search and freebusy request on a non-recurring event.""" + self.skip_unless_support("save-load.event") + self.skip_unless_support("search") + self.skip_unless_support("search.time-range.event.old-dates") + c = async_calendar + + e = await c.add_event(ev1_static) + + r = await c.search( + event=True, + start=datetime(2006, 7, 13, 17, 0, 0), + end=datetime(2006, 7, 15, 17, 0, 0), + expand=False, + ) + assert len(r) == 1 + assert str(e.vobject_instance.vevent.uid) == str(r[0].vobject_instance.vevent.uid) + + self.skip_unless_support("save-load.mutable") + + e.data = ev2_static + await e.save() + + r = await c.search( + event=True, + start=datetime(2006, 7, 13, 17, 0, 0), + end=datetime(2006, 7, 15, 17, 0, 0), + expand=False, + ) + assert len(r) == 0 + + r = await c.search( + event=True, + start=datetime(2007, 7, 13, 17, 0, 0), + end=datetime(2007, 7, 15, 17, 0, 0), + expand=False, + ) + assert len(r) == 1 + + self.skip_unless_support("freebusy-query") + + freebusy = await c.freebusy_request( + datetime(2007, 7, 13, 17, 0, 0), datetime(2007, 7, 15, 17, 0, 0) + ) + assert isinstance(freebusy, FreeBusy) + assert freebusy.vobject_instance.vfreebusy + + @pytest.mark.asyncio + async def test_recurring_date_search(self, async_calendar: Any) -> None: + """Recurring event can be found and expanded across multiple occurrences.""" + self.skip_unless_support("save-load.event") + self.skip_unless_support("search.recurrences.includes-implicit.event") + c = async_calendar + + await c.add_event(evr_static) + + r = await c.search( + event=True, + start=datetime(2008, 11, 1, 17, 0, 0), + end=datetime(2008, 11, 3, 17, 0, 0), + expand=False, + ) + assert len(r) == 1 + + r = await c.search( + event=True, + start=datetime(2008, 11, 1, 17, 0, 0), + end=datetime(2008, 11, 3, 17, 0, 0), + expand=True, + ) + assert len(r) == 1 + assert r[0].data.count("END:VEVENT") == 1 + assert r[0].data.count("DTSTART;VALUE=DATE:2008") == 1 + + r2 = await c.search( + event=True, + start=datetime(2008, 11, 1, 17, 0, 0), + end=datetime(2009, 11, 3, 17, 0, 0), + expand=True, + ) + assert len(r2) == 2 + assert "RRULE" not in r2[0].data + assert "RRULE" not in r2[1].data + + @pytest.mark.asyncio + async def test_recurring_date_with_exception_search(self, async_calendar: Any) -> None: + """Bi-weekly event with exception: expanded search returns correct RECURRENCE-IDs.""" + self.skip_unless_support("search") + self.skip_unless_support("search.time-range.event.old-dates") + c = async_calendar + + await c.add_event(evr2_static) + + rc = await c.search( + start=datetime(2024, 3, 31, 0, 0), + end=datetime(2024, 5, 4, 0, 0), + event=True, + expand=True, + ) + rs = await c.search( + start=datetime(2024, 3, 31, 0, 0), + end=datetime(2024, 5, 4, 0, 0), + event=True, + server_expand=True, + ) + + if self.is_supported("save-load.event.recurrences.exception") or self.is_supported( + "search.recurrences.expanded.exception" + ): + assert len(rc) == 2 + assert "RRULE" not in rc[0].data + assert "RRULE" not in rc[1].data + + if self.is_supported("search.recurrences.expanded.event") and self.is_supported( + "search.recurrences.expanded.exception" + ): + assert len(rs) == 2 + + asserts_on_results = [] + if self.is_supported("save-load.event.recurrences.exception"): + asserts_on_results.append(rc) + if self.is_supported("search.recurrences.expanded.exception"): + asserts_on_results.append(rs) + + for r in asserts_on_results: + recurrence_ids = [] + for event in r: + recurrence_id = event.icalendar_component.get( + "RECURRENCE-ID" + ) or event.icalendar_component.get("DTSTART") + assert recurrence_id is not None + assert isinstance(recurrence_id, icalendar.vDDDTypes) + recurrence_ids.append(recurrence_id.dt.replace(tzinfo=None)) + assert set(recurrence_ids) == { + datetime(2024, 4, 11, 12, 30, 0), + datetime(2024, 4, 25, 12, 30, 0), + } + + @pytest.mark.asyncio + async def test_alarm(self, async_calendar: Any) -> None: + """alarm_start/alarm_end search finds events with matching VALARM trigger.""" + c = async_calendar + await c.add_event( + dtstart=datetime(2015, 10, 10, 8, 0, 0), + summary="This is a test event", + uid="async-alarm-test1", + dtend=datetime(2016, 10, 10, 9, 0, 0), + alarm_trigger=timedelta(minutes=-15), + alarm_action="AUDIO", + ) + + self.skip_unless_support("search.time-range.alarm") + + assert ( + len( + await c.search( + event=True, + alarm_start=datetime(2015, 10, 10, 8, 1), + alarm_end=datetime(2015, 10, 10, 8, 7), + ) + ) + == 0 + ) + assert ( + len( + await c.search( + event=True, + alarm_start=datetime(2015, 10, 10, 7, 40), + alarm_end=datetime(2015, 10, 10, 7, 55), + ) + ) + == 1 + ) + + # ==================== Group D – Todos ==================== + + @pytest.mark.asyncio + async def test_todos(self, async_task_list: Any) -> None: + """get_todos() sort order and include_completed filtering.""" + self.skip_unless_support("save-load.todo") + self.skip_unless_support("search.unlimited-time-range") + c = async_task_list + + t1 = await c.add_todo(todo_static) + t2 = await c.add_todo(todo2_static) + t4 = await c.add_todo(todo4_static) + + todos = await c.get_todos() + assert len(todos) == 3 + + def uids(lst: list) -> list: + return [x.vobject_instance.vtodo.uid for x in lst] + + assert uids(todos) == uids([t2, t1, t4]) + + todos = await c.get_todos(sort_keys=("priority",)) + todos2 = await c.get_todos(sort_key="priority") + + def pri(lst: list) -> list: + return [ + x.vobject_instance.vtodo.priority.value + for x in lst + if hasattr(x.vobject_instance.vtodo, "priority") + ] + + assert pri(todos) == pri([t4, t2]) + assert pri(todos2) == pri([t4, t2]) + + todos = await c.get_todos(sort_keys=("summary", "priority")) + assert uids(todos) == uids([t4, t2, t1]) + + @pytest.mark.asyncio + async def test_todo_completion(self, async_task_list: Any) -> None: + """complete() transitions STATUS; pending/include_completed filtering works.""" + self.skip_unless_support("save-load.todo") + self.skip_unless_support("search.unlimited-time-range") + c = async_task_list + + t1 = await c.add_todo(todo_static) + t2 = await c.add_todo(todo2_static) + t3 = await c.add_todo(todo3_static, status="NEEDS-ACTION") + + todos = await c.get_todos() + assert len(todos) == 3 + + await t3.complete() + + todos = await c.get_todos() + assert len(todos) == 2 + + todos = await c.get_todos(include_completed=True) + assert len(todos) == 3 + t3_ = await c.get_todo_by_uid(t3.id) + assert t3_.vobject_instance.vtodo.summary == t3.vobject_instance.vtodo.summary + assert t3_.vobject_instance.vtodo.uid == t3.vobject_instance.vtodo.uid + + await t2.delete() + + todos = await c.get_todos(include_completed=True) + assert len(todos) == 2 + + @pytest.mark.asyncio + async def test_todo_recurring_complete_safe(self, async_task_list: Any) -> None: + """complete(handle_rrule=True, rrule_mode='safe') advances a recurring todo.""" + self.skip_unless_support("save-load.todo") + self.skip_unless_support("search.unlimited-time-range") + c = async_task_list + + assert len(await c.get_todos()) == 0 + t6 = await c.add_todo(todo6_static, status="NEEDS-ACTION") + assert len(await c.get_todos()) == 1 + if self.is_supported("save-load.todo.recurrences.count"): + t8 = await c.add_todo(todo8_static) + assert len(await c.get_todos()) == 2 + else: + assert len(await c.get_todos()) == 1 + + await t6.complete(handle_rrule=True, rrule_mode="safe") + + if not self.is_supported("save-load.todo.recurrences.count"): + assert len(await c.get_todos()) == 1 + assert len(await c.get_todos(include_completed=True)) == 2 + (await c.get_todos())[0].delete() + + self.skip_unless_support("save-load.todo.recurrences.count") + assert len(await c.get_todos()) == 2 + assert len(await c.get_todos(include_completed=True)) == 3 + + await t8.complete(handle_rrule=True, rrule_mode="safe") + assert len(await c.get_todos()) == 2 + await t8.complete(handle_rrule=True, rrule_mode="safe") + await t8.complete(handle_rrule=True, rrule_mode="safe") + assert len(await c.get_todos()) == 1 + assert len(await c.get_todos(include_completed=True)) == 5 + for x in await c.get_todos(include_completed=True): + await x.delete() + + @pytest.mark.asyncio + async def test_todo_recurring_complete_thisandfuture(self, async_task_list: Any) -> None: + """complete(handle_rrule=True, rrule_mode='thisandfuture') truncates the series.""" + self.skip_unless_support("save-load.todo") + self.skip_unless_support("save-load.todo.recurrences.thisandfuture") + c = async_task_list + + assert len(await c.get_todos()) == 0 + t6 = await c.add_todo(todo6_static, status="NEEDS-ACTION") + if self.is_supported("save-load.todo.recurrences.count"): + t8 = await c.add_todo(todo8_static) + assert len(await c.get_todos()) == 2 + else: + assert len(await c.get_todos()) == 1 + + await t6.complete(handle_rrule=True, rrule_mode="thisandfuture") + all_todos = await c.get_todos(include_completed=True) + if not self.is_supported("save-load.todo.recurrences.count"): + assert len(await c.get_todos()) == 1 + assert len(all_todos) == 1 + + self.skip_unless_support("save-load.todo.recurrences.count") + assert len(await c.get_todos()) == 2 + assert len(all_todos) == 2 + + await t8.complete(handle_rrule=True, rrule_mode="thisandfuture") + assert len(await c.get_todos()) == 2 + await t8.complete(handle_rrule=True, rrule_mode="thisandfuture") + await t8.complete(handle_rrule=True, rrule_mode="thisandfuture") + assert len(await c.get_todos()) == 1 + + @pytest.mark.asyncio + async def test_todo_datesearch(self, async_task_list: Any) -> None: + """search() with start/end date range filters todos by DUE/DTSTART.""" + self.skip_unless_support("save-load.todo") + self.skip_unless_support("search.time-range.todo") + self.skip_unless_support("search.time-range.todo.old-dates") + c = async_task_list + + await c.add_todo(todo_static) + await c.add_todo(todo2_static) + await c.add_todo(todo3_static) + await c.add_todo(todo4_static) + await c.add_todo(todo5_static) + await c.add_todo(todo6_static) + + todos = await c.get_todos() + assert len(todos) == 6 + + todos2 = await c.search( + start=datetime(1997, 4, 14), + end=datetime(2015, 5, 14), + todo=True, + expand=True, + split_expanded=False, + include_completed=True, + ) + + implicit_fragile = ( + self.is_supported("search.recurrences.includes-implicit.todo", str) == "fragile" + ) + foo = 5 + if not self.is_supported("search.recurrences.includes-implicit.todo"): + foo -= 1 + if self.check_compatibility_flag( + "vtodo_datesearch_nodtstart_task_is_skipped" + ) or self.check_compatibility_flag( + "vtodo_datesearch_nodtstart_task_is_skipped_in_closed_date_range" + ): + foo -= 2 + elif self.check_compatibility_flag("vtodo_datesearch_notime_task_is_skipped"): + foo -= 1 + + if implicit_fragile: + assert len(todos2) in (foo, foo + 1) + else: + assert len(todos2) == foo + + @pytest.mark.asyncio + async def test_create_journal_list_and_journal_entry(self, async_journal_list: Any) -> None: + """add_journal() and get_journals() work; search(journal=True) returns entries.""" + self.skip_unless_support("save-load.journal") + c = async_journal_list + + j1 = await c.add_journal(journal_static) + journals = await c.get_journals() + assert len(journals) == 1 + j1_ = await c.get_journal_by_uid(j1.id) + assert j1_.get_icalendar_instance() == journals[0].get_icalendar_instance() + + await c.add_journal( + dtstart=date(2011, 11, 11), + summary="A childbirth in a hospital in Kupchino", + description="A quick birth, in the middle of the night", + uid="async-ctuid1", + ) + assert len(await c.get_journals()) == 2 + assert len(await c.search(journal=True)) == 2 + assert await c.get_todos() == [] + assert await c.get_events() == [] + + # ==================== Group E – Properties & Meta ==================== + + def _skip_on_compatibility_flag(self, flag: str) -> None: + if self.check_compatibility_flag(flag): + pytest.skip(f"Test skipped due to compatibility flag: {flag}") + + @pytest.mark.asyncio + async def test_support(self, async_client: Any) -> None: + """check_dav_support / check_cdav_support / check_scheduling_support.""" + self._skip_on_compatibility_flag("dav_not_supported") + assert await async_client.check_dav_support() + assert await async_client.check_cdav_support() + if self.is_supported("scheduling", return_type=str) != "unknown": + assert await async_client.check_scheduling_support() == self.is_supported("scheduling") + + @pytest.mark.asyncio + async def test_scheduling_info(self, async_client: Any) -> None: + """calendar_user_address_set() and get_vcal_address() on the principal.""" + self.skip_unless_support("scheduling.calendar-user-address-set") + principal = await async_client.principal() + calendar_user_address_set = await principal.calendar_user_address_set() + me_a_participant = await principal.get_vcal_address() + + @pytest.mark.asyncio + async def test_scheduling_mailboxes(self, async_client: Any) -> None: + """schedule_inbox() and schedule_outbox() return without error.""" + self.skip_unless_support("scheduling.mailbox") + principal = await async_client.principal() + inbox = await principal.schedule_inbox() + outbox = await principal.schedule_outbox() + + @pytest.mark.asyncio + async def test_propfind(self, async_client: Any) -> None: + """Raw XML propfind returns a multistatus response.""" + from caldav.lib.python_utilities import to_local + + self._skip_on_compatibility_flag("propfind_allprop_failure") + principal = await async_client.principal() + foo = await async_client.propfind( + principal.url, + props='' + '' + " " + "", + ) + assert "multistatus" in to_local(foo.raw) + + @pytest.mark.asyncio + async def test_get_calendar_home_set(self, async_client: Any) -> None: + """get_properties([CalendarHomeSet()]) must contain the key.""" + from caldav.elements import cdav + + principal = await async_client.principal() + chs = await principal.get_properties([cdav.CalendarHomeSet()]) + assert "{urn:ietf:params:xml:ns:caldav}calendar-home-set" in chs + + @pytest.mark.asyncio + async def test_get_default_calendar(self, async_client: Any) -> None: + """get_calendars() must be non-empty (gated on get-current-user-principal.has-calendar).""" + self.skip_unless_support("get-current-user-principal.has-calendar") + principal = await async_client.principal() + assert len(await principal.get_calendars()) != 0 + + @pytest.mark.asyncio + async def test_get_calendar(self, async_calendar: Any) -> None: + """Calendar has a URL; repr() includes class name and URL.""" + c = async_calendar + assert c.url is not None + repr_ = repr(c) + assert "Calendar" in repr_ + assert str(c.url) in repr_ + + @pytest.mark.asyncio + async def test_principal(self, async_client: Any) -> None: + """All items returned by get_calendars() are Calendar instances.""" + from caldav.aio import AsyncCalendar + + principal = await async_client.principal() + collections = await principal.get_calendars() + for c in collections: + assert isinstance(c, AsyncCalendar) + + @pytest.mark.asyncio + async def test_principals(self, async_client: Any) -> None: + """caldav.principals() list-all and by-name search.""" + from caldav.aio import AsyncPrincipal + + self.skip_unless_support("principal-search") + if self.is_supported("principal-search.by-name.self"): + principal = await async_client.principal() + my_name = await principal.get_display_name() + my_principals = await async_client.principals(name=my_name) + assert isinstance(my_principals, list) + assert len(my_principals) == 1 + assert my_principals[0].url == principal.url + + self.skip_unless_support("principal-search.list-all") + all_principals = await async_client.principals() + assert isinstance(all_principals, list) + if all_principals: + assert all(isinstance(x, AsyncPrincipal) for x in all_principals) + + @pytest.mark.asyncio + async def test_create_delete_calendar(self, async_client: Any) -> None: + """make_calendar() creates; delete() removes it; auto-creation check.""" + from caldav.lib import error + + self.skip_unless_support("create-calendar") + self.skip_unless_support("delete-calendar") + from caldav.aio import AsyncPrincipal + from caldav.lib.error import AuthorizationError, NotFoundError + + from .fixture_helpers import cleanup_calendar_objects + + principal = None + try: + principal = await AsyncPrincipal.create(async_client) + except (NotFoundError, AuthorizationError): + pytest.skip("Cannot discover principal") + + cal_id = "pythoncaldav-async-createdelete-test" + try: + existing = principal.calendar(cal_id=cal_id) + await cleanup_calendar_objects(existing) + await existing.delete() + except Exception: + pass + + c = await principal.make_calendar(name="Yep", cal_id=cal_id) + assert c.url is not None + events = await c.get_events() + assert len(events) == 0 + await c.delete() + + @pytest.mark.asyncio + async def test_calendar_by_full_url(self, async_calendar: Any, async_client: Any) -> None: + """Passing a full URL as cal_id should find the same calendar.""" + principal = await async_client.principal() + samecal = principal.calendar(cal_id=str(async_calendar.url)) + assert async_calendar.url.canonical() == samecal.url.canonical() + samecal2 = principal.calendar(cal_id=async_calendar.url) + assert async_calendar.url.canonical() == samecal2.url.canonical() + + @pytest.mark.asyncio + async def test_find_calendar_owner(self, async_calendar: Any, async_client: Any) -> None: + """get_property(Owner()) returns the owner URL; construct Principal from it.""" + from caldav.aio import AsyncPrincipal + from caldav.elements import dav + + owner = await async_calendar.get_property(dav.Owner()) + if owner is None: + return + + if self.is_supported("scheduling.calendar-user-address-set"): + owner_principal = AsyncPrincipal(client=async_client, url=owner) + address = await owner_principal.get_vcal_address() + assert address is not None + + @pytest.mark.asyncio + async def test_set_calendar_properties(self, async_client: Any) -> None: + """get_properties/set_properties round-trip for DisplayName.""" + from caldav.aio import AsyncPrincipal + from caldav.elements import dav + from caldav.lib.error import AuthorizationError, NotFoundError + + from .fixture_helpers import cleanup_calendar_objects + + self.skip_unless_support("create-calendar.set-displayname") + self.skip_unless_support("delete-calendar") + self.skip_unless_support("create-calendar") + + principal = None + try: + principal = await AsyncPrincipal.create(async_client) + except (NotFoundError, AuthorizationError): + pytest.skip("Cannot discover principal") + + cal_id = "pythoncaldav-async-props-test" + try: + existing = principal.calendar(cal_id=cal_id) + await cleanup_calendar_objects(existing) + await existing.delete() + except Exception: + pass + + c = await principal.make_calendar(name="Yep", cal_id=cal_id) + try: + props = await c.get_properties([dav.DisplayName()]) + assert "Yep" == props[dav.DisplayName.tag] + + await c.set_properties([dav.DisplayName("hooray")]) + props = await c.get_properties([dav.DisplayName()]) + assert props[dav.DisplayName.tag] == "hooray" + finally: + try: + await c.delete() + except Exception: + pass + + # ==================== Group F – Regressions ==================== + + @pytest.mark.asyncio + async def test_issue_397(self, async_calendar: Any) -> None: + """Recurring VEVENT with RECURRENCE-ID override stores and retrieves correctly.""" + self.skip_unless_support("save-load.event.recurrences.exception") + c = async_calendar + await c.add_event( + """BEGIN:VCALENDAR +VERSION:2.0 +PRODID:-//PeterB//caldav//en_DK +BEGIN:VEVENT +SUMMARY:recurrence with attendee one single item +DTSTART;TZID=Europe/Zurich:20240101T090000 +DTEND;TZID=Europe/Zurich:20240101T180000 +UID:test1 +DESCRIPTION:this is the recurrent series +TRANSP:OPAQUE +RRULE:FREQ=WEEKLY;BYDAY=TU,WE,TH +END:VEVENT +BEGIN:VEVENT +SUMMARY:single item +DTSTART;TZID=Europe/Zurich:20240605T090000 +DTEND;TZID=Europe/Zurich:20240605T170000 +UID:test1 +DESCRIPTION:this is the single item assigning a attendee to just one event +ATTENDEE:foo.bar@corge.baz +RECURRENCE-ID:20240605T070000Z +END:VEVENT +END:VCALENDAR +""" + ) + object_by_id = await c.get_object_by_uid("test1", comp_class=Event) + instance = object_by_id.icalendar_instance + events = [e for e in instance.subcomponents if isinstance(e, icalendar.Event)] + assert len(events) == 2 + + @pytest.mark.asyncio + async def test_issue_399_change_attendee_status(self, async_client: Any) -> None: + """change_attendee_status() works with username-as-email fallback (issue #399).""" + self.skip_unless_support("scheduling") + username = getattr(async_client, "username", None) + if not username or "@" not in str(username): + pytest.skip("Client username is not an email address; cannot build matching ATTENDEE") + my_email = "mailto:" + username + + invite_data = ( + "BEGIN:VCALENDAR\r\nVERSION:2.0\r\nPRODID:-//Test//Test//EN\r\nMETHOD:REQUEST\r\n" + "BEGIN:VEVENT\r\n" + f"UID:test-issue-399-{uuid.uuid4()}@test.example\r\n" + f"DTSTAMP:{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')}\r\n" + f"DTSTART:{(datetime.now(timezone.utc) + timedelta(days=10)).strftime('%Y%m%dT%H%M%SZ')}\r\n" + f"DTEND:{(datetime.now(timezone.utc) + timedelta(days=10, hours=1)).strftime('%Y%m%dT%H%M%SZ')}\r\n" + "SUMMARY:Test invite for issue 399\r\n" + "ORGANIZER:mailto:organizer@test.example\r\n" + f"ATTENDEE;PARTSTAT=NEEDS-ACTION:{my_email}\r\n" + "END:VEVENT\r\nEND:VCALENDAR\r\n" + ) + ev = Event(client=async_client, data=invite_data) + ## Pass the email explicitly since change_attendee_status() without attendee + ## calls self.client.principal() which is async-only and can't work synchronously. + ev.change_attendee_status(attendee=username, partstat="ACCEPTED") + attendee = ev.icalendar_component["attendee"] + assert attendee.params.get("PARTSTAT") == "ACCEPTED" + + @pytest.mark.asyncio + async def test_add_organizer_full(self, async_client: Any, async_calendar: Any) -> None: + """add_organizer() with explicit string and vCalAddress args (pure in-memory).""" + from icalendar import vCalAddress + + c = async_calendar + event = Event(client=async_client, data=ev1(), parent=c) + + event.add_organizer("organizer@example.com") + org = event.icalendar_component.get("organizer") + assert org is not None + assert "organizer@example.com" in str(org) + + addr = vCalAddress("mailto:addr@example.com") + event.add_organizer(addr) + org = event.icalendar_component.get("organizer") + assert str(org) == "mailto:addr@example.com" + + @pytest.mark.asyncio + async def test_change_attendee_status_with_email_given( + self, async_calendar: Any, async_client: Any + ) -> None: + """change_attendee_status(attendee=email) updates PARTSTAT correctly.""" + self.skip_unless_support("save-load.event") + c = async_calendar + event = await c.add_event( + uid="test1", + dtstart=datetime(2015, 10, 10, 8, 7, 6), + dtend=datetime(2015, 10, 10, 9, 7, 6), + ical_fragment="ATTENDEE;ROLE=OPT-PARTICIPANT;PARTSTAT=TENTATIVE:MAILTO:testuser@example.com", + ) + event.change_attendee_status(attendee="testuser@example.com", PARTSTAT="ACCEPTED") + await event.save() + event2 = await c.get_event_by_uid("test1") + + @pytest.mark.asyncio + async def test_add_orphaned_recurrence(self, async_calendar: Any) -> None: + """add_event() with orphaned RECURRENCE-ID must not raise NotFoundError.""" + from caldav.lib import error + + self.skip_unless_support("save-load.event") + c = async_calendar + orphaned_recurrence = """BEGIN:VCALENDAR +VERSION:2.0 +PRODID:-//Example//CalDAV test//EN +BEGIN:VEVENT +UID:orphaned-recurrence-test-uid@example.com +DTSTAMP:20200101T000000Z +DTSTART:20200115T100000Z +DTEND:20200115T110000Z +RECURRENCE-ID:20200115T100000Z +SUMMARY:Orphaned recurrence with no master +END:VEVENT +END:VCALENDAR""" + try: + await c.add_event(orphaned_recurrence) + except error.NotFoundError: + pytest.fail( + "add_event() raised NotFoundError for an orphaned recurrence; " + "see commit 7269f179 (graceful adding of orphaned recurrences)" + ) + except Exception: + pass + + @pytest.mark.asyncio + async def test_edit_single_recurrence(self, async_calendar: Any) -> None: + """Expand a recurring event, edit one recurrence, verify only that day changed.""" + self.skip_unless_support("search.recurrences.includes-implicit.event") + self.skip_unless_support("search.text") + cal = async_calendar + + await cal.add_event( + uid="test1", + summary="daily test", + dtstart=datetime(2015, 1, 1, 8, 7, 6), + dtend=datetime(2015, 1, 1, 9, 7, 6), + rrule={"FREQ": "DAILY"}, + ) + + async def search(month): + recurrence = await cal.search( + event=True, + start=datetime(2015, month, 1), + end=datetime(2015, month, 2), + expand=True, + ) + assert len(recurrence) == 1 + return recurrence[0] + + async def summary_by_month(month): + return (await search(month)).icalendar_component["summary"] + + recurrence = await search(7) + recurrence.icalendar_component["summary"] = "half a year of daily testing" + await recurrence.save() + + assert await summary_by_month(6) == "daily test" + assert await summary_by_month(7) == "half a year of daily testing" + assert await summary_by_month(8) == "daily test" + + recurrence = await search(2) + recurrence.icalendar_component["summary"] = "one month of daily testing" + await recurrence.save() + + assert await summary_by_month(1) == "daily test" + assert await summary_by_month(2) == "one month of daily testing" + assert await summary_by_month(7) == "half a year of daily testing" + + recurrence = await search(7) + recurrence.icalendar_component["summary"] = "six months of daily testing" + await recurrence.save() + assert await summary_by_month(7) == "six months of daily testing" + + recurrence = await search(9) + recurrence.icalendar_component["summary"] = "daily testing" + await recurrence.save(all_recurrences=True) + assert await summary_by_month(1) == "daily testing" + assert await summary_by_month(2) == "one month of daily testing" + assert await summary_by_month(3) == "daily testing" + assert await summary_by_month(7) == "six months of daily testing" + + # ==================== Group G – Auth errors & misc ==================== + + @pytest.mark.asyncio + async def test_wrong_auth_type(self, async_client: Any) -> None: + """At least one of digest/bearer auth_type must raise AuthorizationError.""" + from caldav.lib import error + + if not self.server.password or self.server.password == "any-password-seems-to-work": + pytest.skip("Server does not require a password") + + raised = False + for auth_type in ("digest", "bearer"): + try: + c = await self._make_async_client_with_params(auth_type=auth_type) + await c.principal() + except error.AuthorizationError: + raised = True + break + assert raised, "Neither digest nor bearer auth_type raised AuthorizationError" + + @pytest.mark.asyncio + async def test_wrong_password(self, async_client: Any) -> None: + """Bad password must raise AuthorizationError.""" + import codecs + + from caldav.lib import error + + self.skip_unless_support("wrong-password-check") + if not self.server.password or self.server.password == "any-password-seems-to-work": + pytest.skip("Server does not require a password") + + with pytest.raises(error.AuthorizationError): + bad = await self._make_async_client_with_params( + password=codecs.encode(self.server.password, "rot13") + "!" + ) + await bad.principal() + + @pytest.mark.asyncio + async def test_create_child_parent(self, async_calendar: Any) -> None: + """Add parent/child/grandparent events; verify RELATED-TO structure.""" + self.skip_unless_support("save-load.event") + self.skip_unless_support("save-load.icalendar.related-to") + c = async_calendar + parent = await c.add_event( + dtstart=datetime(2022, 12, 26, 19, 15), + dtend=datetime(2022, 12, 26, 20, 0), + summary="parent event", + uid="ctuid1", + ) + child = await c.add_event( + dtstart=datetime(2022, 12, 26, 19, 17), + dtend=datetime(2022, 12, 26, 20, 0), + summary="child event", + parent=[parent.id], + uid="ctuid2", + ) + grandparent = await c.add_event( + dtstart=datetime(2022, 12, 26, 19, 0), + dtend=datetime(2022, 12, 26, 20, 0), + summary="grandparent event", + child=[parent.id], + uid="ctuid3", + ) + + parent_ = await c.get_event_by_uid(parent.id) + child_ = await c.get_event_by_uid(child.id) + grandparent_ = await c.get_event_by_uid(grandparent.id) + + rt = grandparent_.icalendar_component["RELATED-TO"] + if isinstance(rt, list): + assert len(rt) == 1 + rt = rt[0] + assert rt == parent.id + assert rt.params["RELTYPE"] == "CHILD" + + rt = parent_.icalendar_component["RELATED-TO"] + assert len(rt) == 2 + assert set([str(rt[0]), str(rt[1])]) == set([grandparent.id, child.id]) + assert set([rt[0].params["RELTYPE"], rt[1].params["RELTYPE"]]) == set(["CHILD", "PARENT"]) + + rt = child_.icalendar_component["RELATED-TO"] + if isinstance(rt, list): + assert len(rt) == 1 + rt = rt[0] + assert rt == parent.id + assert rt.params["RELTYPE"] == "PARENT" + + foo = await parent_.get_relatives(reltypes={"PARENT"}) + assert len(foo) == 1 + assert len(foo["PARENT"]) == 1 + + @pytest.mark.asyncio + async def test_offset_url(self, async_client: Any, async_calendar: Any) -> None: + """Connecting with url=principal.url or url=calendar.url still works.""" + principal = await async_client.principal() + urls = [principal.url, async_calendar.url] + for url in urls: + conn = await self._make_async_client_with_params(url=url) + p = await conn.principal() + calendars = await p.get_calendars() + + @pytest.mark.asyncio + async def test_utf8_event(self, async_client: Any) -> None: + """Calendar with non-ASCII name; event with non-ASCII summary.""" + self.skip_unless_support("save-load.event") + self.skip_unless_support("create-calendar") + + from caldav.aio import AsyncPrincipal + from caldav.lib.error import AuthorizationError, NotFoundError + + from .fixture_helpers import cleanup_calendar_objects + + principal = None + try: + principal = await AsyncPrincipal.create(async_client) + except (NotFoundError, AuthorizationError): + pytest.skip("Cannot discover principal") + + cal_id = "pythoncaldav-async-utf8-test" + try: + existing = await principal.calendar(cal_id=cal_id) + await cleanup_calendar_objects(existing) + await existing.delete() + except Exception: + pass + + c = await principal.make_calendar(name="Yølp", cal_id=cal_id) + try: + await c.add_event(ev1_static.replace("Bastille Day Party", "Bringebærsyltetøyfestival")) + events = await c.get_events() + if "zimbra" not in str(c.url): + assert len(events) == 1 + finally: + try: + await c.delete() + except Exception: + pass + + @pytest.mark.asyncio + async def test_create_calendar_and_event_from_vobject(self, async_calendar: Any) -> None: + """Add event from vobject.readOne(); verify count.""" + vobject = pytest.importorskip("vobject") + self.skip_unless_support("save-load.event") + c = async_calendar + cnt = len(await c.get_events()) + ve1 = vobject.readOne(ev1_static) + await c.add_event(ve1) + cnt += 1 + events = await c.get_events() + assert len(events) == cnt + + @pytest.mark.asyncio + async def test_create_event_from_ical(self, async_calendar: Any) -> None: + """Add event from icalendar.Calendar and icalendar.Event objects.""" + self.skip_unless_support("save-load.event") + c = async_calendar + try: + icalcal = icalendar.Calendar.new() + except Exception: + pytest.skip("Newer icalendar version required (icalendar 7+)") + + start = datetime.now() + timedelta(days=30) + end = start + timedelta(hours=1) + icalevent = icalendar.Event.new( + uid="ctuid1", + start=start, + end=end, + summary="This is a test event", + ) + icalcal.add_component(icalevent) + + for obj in [icalcal, icalevent]: + await c.add_event(obj) + events = await c.get_events() + assert any(e.icalendar_component["uid"] == "ctuid1" for e in events), ( + f"Event with uid ctuid1 not found after adding {type(obj).__name__}" + ) + + @pytest.mark.asyncio + async def test_set_due(self, async_task_list: Any) -> None: + """set_due() updates DUE and optionally moves DTSTART.""" + self.skip_unless_support("save-load.todo") + c = async_task_list + utc = timezone.utc + + some_todo = await c.add_todo( + dtstart=datetime(2022, 12, 26, 19, 15, tzinfo=utc), + due=datetime(2022, 12, 26, 20, 0, tzinfo=utc), + summary="Some task", + uid="ctuid5", + ) + some_todo.set_due(datetime(2022, 12, 26, 20, 10, tzinfo=utc)) + assert some_todo.icalendar_component["DUE"].dt == datetime(2022, 12, 26, 20, 10, tzinfo=utc) + assert some_todo.icalendar_component["DTSTART"].dt == datetime( + 2022, 12, 26, 19, 15, tzinfo=utc + ) + + some_todo.set_due(datetime(2022, 12, 26, 20, 20, tzinfo=utc), move_dtstart=True) + assert some_todo.icalendar_component["DUE"].dt == datetime(2022, 12, 26, 20, 20, tzinfo=utc) + assert some_todo.icalendar_component["DTSTART"].dt == datetime( + 2022, 12, 26, 19, 25, tzinfo=utc + ) + + await some_todo.save() + + @pytest.mark.asyncio + async def test_create_task_list_and_todo(self, async_task_list: Any) -> None: + """add_todo(); get_todos(); get_object_by_uid().""" + self.skip_unless_support("save-load.todo") + c = async_task_list + t = await c.add_todo(uid="well_known_t1", summary="Well-known async task") + todos = await c.get_todos() + assert any(str(x.icalendar_component.get("uid", "")) == "well_known_t1" for x in todos) + obj = await c.get_object_by_uid("well_known_t1") + assert obj.component["summary"] == "Well-known async task" + class _AsyncTestSchedulingBase: """