diff --git a/perma_web/api/tests/test_link_authorization.py b/perma_web/api/tests/test_link_authorization.py index b46d64cf9..a920f13a9 100644 --- a/perma_web/api/tests/test_link_authorization.py +++ b/perma_web/api/tests/test_link_authorization.py @@ -65,11 +65,11 @@ def setUp(self): 'title': 'This is a new title', 'default_to_screenshot_view': True} - # mock the IA helpers that are called when the privacy toggle is used + # mock the IA tasks that are called when the privacy toggle is used self.ia_upload_from_privacy_toggle_patcher = patch('api.views.links.upload_link_to_internet_archive.delay') - self.ia_deletion_from_privacy_toggle_patcher = patch('api.views.links.request_internet_archive_deletion_from_privacy_toggle') + self.ia_deletion_from_privacy_toggle_patcher = patch('api.views.links.delete_link_from_daily_item.delay') self.mock_upload_link_to_internet_archive_delay = self.ia_upload_from_privacy_toggle_patcher.start() - self.mock_request_internet_archive_deletion_from_privacy_toggle = self.ia_deletion_from_privacy_toggle_patcher.start() + self.mock_delete_link_from_daily_item_delay = self.ia_deletion_from_privacy_toggle_patcher.start() def tearDown(self): self.ia_upload_from_privacy_toggle_patcher.stop() diff --git a/perma_web/api/views/links.py b/perma_web/api/views/links.py index 104c6b3e7..3e09189af 100644 --- a/perma_web/api/views/links.py +++ b/perma_web/api/views/links.py @@ -14,8 +14,8 @@ from rest_framework import status from perma.celery_tasks import ( + delete_link_from_daily_item, run_next_capture, - request_internet_archive_deletion_from_privacy_toggle, upload_link_to_internet_archive ) from perma.models import Capture, CaptureJob, Folder, Link, LinkBatch @@ -363,7 +363,7 @@ def patch(self, request, guid, format=None): link.internet_archive_upload_status = 'deletion_required' link.save(update_fields=["internet_archive_upload_status"]) logger.info(f"Link {link.guid} was toggled to private. Requesting the IA deletion.") - request_internet_archive_deletion_from_privacy_toggle(link) + delete_link_from_daily_item.delay(link.guid) # include remaining links in response links_remaining = request.user.get_links_remaining() diff --git a/perma_web/perma/admin.py b/perma_web/perma/admin.py index 28e1a6feb..8ac594707 100644 --- a/perma_web/perma/admin.py +++ b/perma_web/perma/admin.py @@ -744,8 +744,8 @@ class LinkBatchAdmin(admin.ModelAdmin): class InternetArchiveItemAdmin(admin.ModelAdmin): - list_display = ['identifier', 'confirmed_exists', 'cached_is_dark', 'added_date', 'span', 'cached_file_count', 'tasks_in_progress', 'complete', 'last_derived', 'derive_required', 'internet_archive_link'] - list_filter = [IAIdentifierFilter, IAItemTypeFilter, IAItemHasTasksFilter, 'confirmed_exists', 'complete', 'cached_is_dark'] + list_display = ['identifier', 'confirmed_exists', 'cached_is_dark', 'added_date', 'span', 'cached_file_count', 'tasks_in_progress', 'initial_uploads_complete', 'last_derived', 'derive_required', 'internet_archive_link'] + list_filter = [IAIdentifierFilter, IAItemTypeFilter, IAItemHasTasksFilter, 'confirmed_exists', 'initial_uploads_complete', 'cached_is_dark'] readonly_fields = ['identifier', 'cached_is_dark', 'added_date', 'span', 'cached_title', 'cached_description', 'cached_file_count', 'last_derived'] paginator = FasterAdminPaginator diff --git a/perma_web/perma/celery_tasks.py b/perma_web/perma/celery_tasks.py index 2dcc492a2..0856694d9 100644 --- a/perma_web/perma/celery_tasks.py +++ b/perma_web/perma/celery_tasks.py @@ -36,6 +36,11 @@ from perma.models import LinkUser, Link, Capture, \ CaptureJob, InternetArchiveItem, InternetArchiveFile, Folder, Sponsorship, UserOrganizationAffiliation +from perma.models.internet_archive import ( + UNEDITABLE_DAILY_ITEM_DATE_STRINGS, + daily_item_backlog_queryset_filter, + uneditable_daily_item_identifiers, +) from perma.exceptions import PermaPaymentsCommunicationException, ScoopAPINetworkException, ScoopAPIException from perma.utils import ( remove_whitespace, @@ -572,12 +577,9 @@ def queue_batched_tasks(task, query, batch_size=1000, **kwargs): logger.info(f"Queued {batches_queued} batches of size {batch_size}{' and a single batch of size ' + str(remainder) if remainder else ''}, pks {first}-{last}.") -def clear_link_ia_status_after_privacy_toggle_jobs(link): - """ - Clear internet_archive_upload_status field of a link. - Only applies to links marked upload_or_reupload_required or deletion_required after a privacy toggle. - """ - if link.internet_archive_upload_status in ('upload_or_reupload_required', 'deletion_required'): +def clear_pending_ia_status(link): + """Unset the privacy-toggle IA sync flag so Beat stops retrying this link.""" + if link.internet_archive_upload_status: link.internet_archive_upload_status = None link.save(update_fields=['internet_archive_upload_status']) @@ -594,6 +596,11 @@ def upload_link_to_internet_archive(link_guid, attempts=0, timeouts=0): link = Link.objects.get(guid=link_guid) if not link.can_upload_to_internet_archive(): logger.info(f"Queued Link {link_guid} no longer eligible for upload.") + # Keep status set while waiting for playback cache (cached_can_play_back is None on + # a discoverable link). Clear when ineligibility is definitive: not discoverable, + # or playback cache has run and the capture can't play back. + if link.cached_can_play_back is not None or not link.is_discoverable(): + clear_pending_ia_status(link) return # Get or create the appropriate InternetArchiveItem object for this link @@ -617,6 +624,7 @@ def upload_link_to_internet_archive(link_guid, attempts=0, timeouts=0): if perma_file: if perma_file.status == 'confirmed_present': logger.info(f"Not uploading {link_guid} to {identifier}: our records indicate it is already present.") + clear_pending_ia_status(link) return elif perma_file.status in ['deletion_attempted', 'deletion_submitted']: # If we find ourselves here, something has gotten very mixed up indeed. We probably need a human to have a look. @@ -629,6 +637,7 @@ def upload_link_to_internet_archive(link_guid, attempts=0, timeouts=0): logger.info(f"Uploading {link_guid} (previously deleted) to {identifier}.") else: logger.warning(f"Not uploading {link_guid} to {identifier}: task not implemented for InternetArchiveFiles with status '{perma_file.status}'.") + clear_pending_ia_status(link) return else: # A fresh one. Create the InternetArchiveFile here. @@ -823,12 +832,7 @@ def queue_file_uploaded_confirmation_tasks(limit=None): file_ids = InternetArchiveFile.objects.filter( status='upload_submitted' ).exclude( - item_id__in=[ - 'daily_perma_cc_2022-07-25', - 'daily_perma_cc_2022-07-21', - 'daily_perma_cc_2022-07-20', - 'daily_perma_cc_2022-07-19' - ] + item_id__in=uneditable_daily_item_identifiers() ).values_list( 'id', flat=True )[:limit] @@ -857,7 +861,7 @@ def confirm_file_uploaded_to_internet_archive(file_id, attempts=0, connection_er if perma_file.status == 'confirmed_present': logger.info(f"InternetArchiveFile {file_id} ({link.guid}) already confirmed to be uploaded to {perma_item.identifier}.") - clear_link_ia_status_after_privacy_toggle_jobs(link) + clear_pending_ia_status(link) return ia_session = get_ia_session() @@ -924,15 +928,24 @@ def confirm_file_uploaded_to_internet_archive(file_id, attempts=0, connection_er 'cached_file_count', 'tasks_in_progress' ]) - clear_link_ia_status_after_privacy_toggle_jobs(link) + clear_pending_ia_status(link) logger.info(f"Confirmed upload of {link.guid} to {perma_item.identifier}.") @shared_task(acks_late=True) def delete_link_from_daily_item(link_guid, attempts=0): - perma_file = InternetArchiveFile.objects.select_related('item').get(link_id=link_guid, item__span__isempty=False) + perma_file = InternetArchiveFile.objects.select_related('item', 'link').filter( + link_id=link_guid, + item__span__isempty=False, + ).first() + if not perma_file: + logger.info(f"No daily InternetArchiveFile for {link_guid}; nothing to delete.") + clear_pending_ia_status(Link.objects.get(guid=link_guid)) + return + perma_item = perma_file.item + link = perma_file.link identifier = perma_item.identifier def retry_deletion(attempt_count): @@ -942,6 +955,7 @@ def retry_deletion(attempt_count): if perma_file.status == 'confirmed_absent': logger.info(f"The daily InternetArchiveFile for {link_guid} is already confirmed absent from {identifier}.") + clear_pending_ia_status(link) return elif perma_file.status in ['upload_attempted', 'upload_submitted']: # If we find ourselves here, something has gotten very mixed up indeed. We probably need a human to have a look. @@ -954,6 +968,7 @@ def retry_deletion(attempt_count): logger.info(f"Deleting {link_guid} from {identifier}.") else: logger.warning(f"Not deleting {link_guid} from {identifier}: task not implemented for InternetArchiveFiles with status '{perma_file.status}'.") + clear_pending_ia_status(link) return # Record that we are attempting a deletion @@ -1079,7 +1094,7 @@ def confirm_file_deleted_from_daily_item(file_id, attempts=0, connection_errors= if perma_file.status == 'confirmed_absent': logger.info(f"InternetArchiveFile {file_id} ({guid}) already confirmed absent from {perma_item.identifier}.") - clear_link_ia_status_after_privacy_toggle_jobs(link) + clear_pending_ia_status(link) return ia_session = get_ia_session() @@ -1138,7 +1153,7 @@ def confirm_file_deleted_from_daily_item(file_id, attempts=0, connection_errors= 'cached_file_count', 'tasks_in_progress' ]) - clear_link_ia_status_after_privacy_toggle_jobs(link) + clear_pending_ia_status(link) logger.info(f"Confirmed deletion of {guid} from {perma_item.identifier}.") @@ -1164,12 +1179,7 @@ def queue_file_deleted_confirmation_tasks(limit=100): file_ids = InternetArchiveFile.objects.filter( status='deletion_submitted' ).exclude( - item_id__in=[ - 'daily_perma_cc_2022-07-25', - 'daily_perma_cc_2022-07-21', - 'daily_perma_cc_2022-07-20', - 'daily_perma_cc_2022-07-19' - ] + item_id__in=uneditable_daily_item_identifiers() ).values_list( 'id', flat=True )[:limit] @@ -1215,20 +1225,6 @@ def queue_internet_archive_deletions(limit=None): logger.info(f"Queued { len(queued) } links for deletion ({queued[0]} through {queued[-1]}).") -def request_internet_archive_deletion_from_privacy_toggle(link): - """ - Queue deletion of a link from its daily Internet Archive item, if a deletable file exists. - Triggered by the privacy toggle in the API. - """ - is_deletable = InternetArchiveFile.deletable_from_privacy_toggle().filter(link_id=link.guid).exists() - - if not is_deletable: - return False - - delete_link_from_daily_item.delay(link.guid) - return True - - def dispatch_link_guid_tasks(task, queryset=None, first_link=None, rest_of_links=None, log_label=None): """ Dispatch task.delay(link.guid) for each link, returning queued guids. @@ -1314,10 +1310,10 @@ def queue_internet_archive_uploads_for_date(date_string, limit=100): ) try: item = InternetArchiveItem.objects.get(identifier=identifier) - # Don't mark an item complete if it's yesterday's + # Don't mark initial uploads complete if it's yesterday's. if timezone.now() - item.span.lower > timedelta(days=3): - item.complete = True - item.save(update_fields=['complete']) + item.initial_uploads_complete = True + item.save(update_fields=['initial_uploads_complete']) logger.info(f"Found no pending links: marked IA Item {item.identifier} complete.") else: logger.info(f"Found no pending links for recent IA Item {item.identifier}; not marking complete.") @@ -1326,8 +1322,9 @@ def queue_internet_archive_uploads_for_date(date_string, limit=100): return 0 -@shared_task -def conditionally_queue_internet_archive_uploads_for_date_range(start_date_string, end_date_string, daily_limit=100, limit=None): +def _queue_internet_archive_initial_uploads_for_date_range( + start_date_string, end_date_string, daily_limit=100, limit=None +): """ Queues up to settings.INTERNET_ARCHIVE_MAX_SIMULTANEOUS_UPLOADS links for upload to IA, spread over a number of days such that no more than `daily_limit` are ever queued for a particular day. May @@ -1336,17 +1333,17 @@ def conditionally_queue_internet_archive_uploads_for_date_range(start_date_strin - there are submitted-but-as-of-yet-unfinished upload requests being processed by IA - there are not enough qualifying links in the date range - there are not enough qualifying links in the date range, while respecting daily_limit + + Returns the number of upload tasks queued, or None if queuing was skipped. """ tasks_in_ia_queue = redis.from_url(settings.CELERY_BROKER_URL).llen('ia') if tasks_in_ia_queue: logger.info(f"Skipped the queuing of file upload tasks: {tasks_in_ia_queue} task{pluralize(tasks_in_ia_queue)} in the ia queue.") - return + return None if not start_date_string: oldest_incomplete_daily_item_in_backlog = InternetArchiveItem.objects.filter( - span__isempty=False, - span__gt=('2021-11-10', '2021-11-11'), - complete=False, + **daily_item_backlog_queryset_filter(complete=False), ).order_by('span').first() start = oldest_incomplete_daily_item_in_backlog.span.lower.date() else: @@ -1364,7 +1361,7 @@ def conditionally_queue_internet_archive_uploads_for_date_range(start_date_strin if to_queue < 0: logger.error(f"Something is amiss with the IA upload process: InternetArchiveItem.inflight_task_count ({InternetArchiveItem.inflight_task_count()}) is larger than settings.INTERNET_ARCHIVE_MAX_SIMULTANEOUS_UPLOADS.") - return + return None if to_queue: @@ -1373,9 +1370,8 @@ def conditionally_queue_internet_archive_uploads_for_date_range(start_date_strin for day in date_range(start, end, timedelta(days=1)): if total_queued < to_queue: date_string = day.strftime('%Y-%m-%d') - if date_string in ['2022-07-25', '2022-07-21', '2022-07-20', '2022-07-19']: - # for now, skip these days: by accident, we don't presently have edit - # privileges for the IA Items with these identifiers + if date_string in UNEDITABLE_DAILY_ITEM_DATE_STRINGS: + # We do not presently have edit privileges for these IA Items. continue identifier = InternetArchiveItem.DAILY_IDENTIFIER.format( prefix=settings.INTERNET_ARCHIVE_DAILY_IDENTIFIER_PREFIX, @@ -1383,9 +1379,8 @@ def conditionally_queue_internet_archive_uploads_for_date_range(start_date_strin ) try: item = InternetArchiveItem.objects.get(identifier=identifier) - if item.complete: - # if this day is already complete, skip it, and move on to the - # next day in the range + if item.initial_uploads_complete: + # Initial uploads for this day are complete; skip to the next day. continue in_flight_for_this_day = item.tasks_in_progress except InternetArchiveItem.DoesNotExist: @@ -1404,8 +1399,64 @@ def conditionally_queue_internet_archive_uploads_for_date_range(start_date_strin else: logger.info("Prepared to upload 0 links to internet archive: no pending links in range.") + return total_queued + else: logger.info("Skipped the queuing of file upload tasks: max tasks already in progress.") + return None + + +@shared_task +def conditionally_queue_internet_archive_uploads_for_date_range(start_date_string, end_date_string, daily_limit=100, limit=None): + """ + Queues up to settings.INTERNET_ARCHIVE_MAX_SIMULTANEOUS_UPLOADS links for upload to IA, spread over + a number of days such that no more than `daily_limit` are ever queued for a particular day. May + queue fewer links, if an explicit `limit` is passed in, or if: + - there are pending tasks in the Celery queue + - there are submitted-but-as-of-yet-unfinished upload requests being processed by IA + - there are not enough qualifying links in the date range + - there are not enough qualifying links in the date range, while respecting daily_limit + """ + _queue_internet_archive_initial_uploads_for_date_range( + start_date_string, end_date_string, daily_limit=daily_limit, limit=limit + ) + + +@shared_task +def queue_internet_archive_pending_work( + start_date_string, end_date_string, daily_limit=100, limit=None +): + """ + First, queues any pending initial IA uploads for a date range. + If there aren't any, then queues any pending privacy-toggle uploads and deletions. + + If the queuing of pending initial IA uploads is skipped for any reason + (tasks are already in progress, rate-limiting has been triggered, a problem has been encountered, etc.), + then the queuing of privacy-toggle-related work is skipped too. + """ + initial_queued = _queue_internet_archive_initial_uploads_for_date_range( + start_date_string, end_date_string, daily_limit=daily_limit, limit=limit + ) + if initial_queued == 0: + logger.info("Queuing privacy-toggle uploads and deletions.") + queue_internet_archive_uploads_required_from_privacy_toggle() + queue_internet_archive_deletions_required_from_privacy_toggle() + elif initial_queued is None: + logger.info("Skipped the queuing of pending privacy-toggle tasks.") + else: + logger.info("Not queuing privacy-toggle tasks: routine uploads already in progress.") + + +@shared_task +def queue_internet_archive_privacy_toggled_still_pending(limit=100): + """ + Queues any pending privacy-toggle uploads and deletions. + + Schedule once daily, to catch any links whose processing has been + delayed or not otherwise handled by other scheduling. + """ + queue_internet_archive_uploads_required_from_privacy_toggle(limit=limit) + queue_internet_archive_deletions_required_from_privacy_toggle(limit=limit) # WACZ CONVERSION diff --git a/perma_web/perma/migrations/0064_alter_link_internet_archive_upload_status.py b/perma_web/perma/migrations/0064_alter_link_internet_archive_upload_status.py index 7a20afc9d..30a23e7f5 100644 --- a/perma_web/perma/migrations/0064_alter_link_internet_archive_upload_status.py +++ b/perma_web/perma/migrations/0064_alter_link_internet_archive_upload_status.py @@ -3,6 +3,22 @@ from django.db import migrations, models +LEGACY_IA_UPLOAD_STATUSES = ( + 'not_started', + 'completed', + 'failed', + 'deleted', + 'deletion_incomplete', +) + + +def null_legacy_internet_archive_upload_statuses(apps, schema_editor): + Link = apps.get_model('perma', 'Link') + Link.objects.filter( + internet_archive_upload_status__in=LEGACY_IA_UPLOAD_STATUSES + ).update(internet_archive_upload_status=None) + + class Migration(migrations.Migration): dependencies = [ @@ -15,4 +31,8 @@ class Migration(migrations.Migration): name='internet_archive_upload_status', field=models.CharField(choices=[('deletion_required', 'deletion_required'), ('upload_or_reupload_required', 'upload_or_reupload_required')], db_index=True, max_length=28, null=True), ), + migrations.RunPython( + null_legacy_internet_archive_upload_statuses, + migrations.RunPython.noop, + ), ] diff --git a/perma_web/perma/migrations/0065_rename_internetarchiveitem_complete.py b/perma_web/perma/migrations/0065_rename_internetarchiveitem_complete.py new file mode 100644 index 000000000..021ecc731 --- /dev/null +++ b/perma_web/perma/migrations/0065_rename_internetarchiveitem_complete.py @@ -0,0 +1,33 @@ +# Generated by Django 4.2.23 on 2026-06-11 21:04 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('perma', '0064_alter_link_internet_archive_upload_status'), + ] + + operations = [ + migrations.RenameField( + model_name='internetarchiveitem', + old_name='complete', + new_name='initial_uploads_complete', + ), + migrations.AlterField( + model_name='internetarchiveitem', + name='initial_uploads_complete', + field=models.BooleanField( + default=False, + help_text=( + "True when a daily upload task has found zero eligible links for this calendar day " + "that lack an InternetArchiveFile. Only set after the IA Item is older than yesterday, " + "to avoid missing any links. The daily upload scheduler skips days marked True. " + "This field does not consider whether the day's IA Item lacks links due to later " + "eligibility changes, such as changes in links' privacy." + ), + verbose_name='Initial uploads complete', + ), + ), + ] diff --git a/perma_web/perma/models/internet_archive.py b/perma_web/perma/models/internet_archive.py index f652d9175..2ee0be4d9 100644 --- a/perma_web/perma/models/internet_archive.py +++ b/perma_web/perma/models/internet_archive.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import date, datetime, timedelta from datetime import timezone as tz from django.conf import settings @@ -14,6 +14,26 @@ from perma.utils import protocol, remove_control_characters +# Daily items with span at or before this range are excluded from backlog scheduling +# and monitoring (early daily-item era, before the current pipeline). +# When the new pipeline is complete, we should be able to remove all +# references to this value. We keep it for now, to be cautious. +DAILY_ITEM_BACKLOG_SPAN_FLOOR = ('2021-11-10', '2021-11-11') + +# Links created after this date were only uploaded to daily IA items, not legacy per-link items. +LAST_INDIVIDUAL_LINK_IA_UPLOAD_DATE = '2022-10-03' + +# Daily IA items Perma cannot edit on Internet Archive; skip uploads and confirmation tasks. +# We need IA's help to resolve the situation; once they transfer ownership of these items, +# we should be able to remove all references to this value. +UNEDITABLE_DAILY_ITEM_DATE_STRINGS = frozenset({ + '2022-07-19', + '2022-07-20', + '2022-07-21', + '2022-07-25', +}) + + def get_empty_datetime_range(): return DateTimeTZRange(empty=True) @@ -72,7 +92,17 @@ class InternetArchiveItem(models.Model): cached_description = models.TextField(null=True, blank=True, default=None) tasks_in_progress = models.IntegerField(default=0, db_index=True, help_text="We have asked Internet Archive to run appx this many tasks for this item and have not yet confirmed that those tasks are complete; derivative tasks not counted.") - complete = models.BooleanField(default=False, help_text="Has all the files it ought to have; has no files it ought not have.") + initial_uploads_complete = models.BooleanField( + default=False, + verbose_name='Initial uploads complete', + help_text=( + "True when a daily upload task has found zero eligible links for this calendar day " + "that lack an InternetArchiveFile. Only set after the IA Item is older than yesterday, " + "to avoid missing any links. The daily upload scheduler skips days marked True. " + "This field does not consider whether the day's IA Item lacks links due to later " + "eligibility changes, such as changes in links' privacy." + ), + ) last_derived = models.DateTimeField(null=True, blank=True) derive_required = models.BooleanField(default=False) @@ -113,6 +143,91 @@ def inflight_task_count(cls): return cls.objects.aggregate(Sum('tasks_in_progress'))['tasks_in_progress__sum'] + """Filter kwargs for daily InternetArchiveItem backlog queries.""" +def daily_item_backlog_queryset_filter(*, initial_uploads_complete=None): + filters = { + 'span__isempty': False, + 'span__gt': DAILY_ITEM_BACKLOG_SPAN_FLOOR, + } + if initial_uploads_complete is not None: + filters['initial_uploads_complete'] = initial_uploads_complete + return filters + + +def _daily_item_span_overlap_range(window_start, window_end): + return DateTimeTZRange( + InternetArchiveItem.datetime(f'{window_start.isoformat()} 00:00:00'), + InternetArchiveItem.datetime(f'{(window_end + timedelta(days=1)).isoformat()} 00:00:00'), + ) + + +def daily_item_dates_in_window(window_start, window_end): + """ + Return dates that have a daily InternetArchiveItem whose span overlaps + [window_start, window_end] (inclusive calendar days). + """ + if window_start > window_end: + return frozenset() + + return frozenset( + item.span.lower.date() + for item in InternetArchiveItem.objects.filter( + **daily_item_backlog_queryset_filter(), + span__overlap=_daily_item_span_overlap_range(window_start, window_end), + ).only('span') + ) + + +def initial_uploads_incomplete_dates_in_window(window_start, window_end): + """Days in [window_start, window_end] where initial_uploads_complete is False, oldest first.""" + if window_start > window_end: + return [] + + return [ + span.lower.date() + for span in initial_uploads_incomplete_queryset().filter( + span__overlap=_daily_item_span_overlap_range(window_start, window_end), + ).order_by('span').values_list('span', flat=True) + ] + + +def initial_uploads_incomplete_queryset(): + return InternetArchiveItem.objects.filter( + **daily_item_backlog_queryset_filter(initial_uploads_complete=False), + ) + + +def initial_uploads_incomplete_stats(): + """ + Count and span bounds for days where the initial upload pass is not complete. + """ + qs = initial_uploads_incomplete_queryset() + count = qs.count() + if not count: + return {'count': 0, 'oldest': None, 'newest': None} + oldest_span = qs.order_by('span').values_list('span', flat=True).first() + newest_span = qs.order_by('-span').values_list('span', flat=True).first() + return { + 'count': count, + 'oldest': oldest_span.lower.date(), + 'newest': newest_span.lower.date(), + } + + +def uneditable_daily_item_identifiers(): + return [ + InternetArchiveItem.DAILY_IDENTIFIER.format( + prefix=settings.INTERNET_ARCHIVE_DAILY_IDENTIFIER_PREFIX, + date_string=date_string, + ) + for date_string in sorted(UNEDITABLE_DAILY_ITEM_DATE_STRINGS) + ] + + +def uneditable_daily_item_dates(): + return frozenset(date.fromisoformat(d) for d in UNEDITABLE_DAILY_ITEM_DATE_STRINGS) + + class InternetArchiveFile(models.Model): """ A file we uploaded to an Internet Archive "Item". @@ -155,23 +270,6 @@ def __str__(self): WARC_FILENAME = '{guid}.warc.gz' - # Statuses describing a file we should request deletion for from the privacy toggle - DELETION_FROM_PRIVACY_TOGGLE_INCLUDE_STATUSES = ( - 'confirmed_present', - 'deletion_attempted' - ) - - @classmethod - def deletable_from_privacy_toggle(cls): - """ - Files associated with daily IA items (i.e. excluding legacy single-link items, which have an empty `span`) - whose status indicates we can ask IA to delete them in response to a privacy toggle. - """ - return cls.objects.filter( - item__span__isempty=False, - status__in=cls.DELETION_FROM_PRIVACY_TOGGLE_INCLUDE_STATUSES, - ) - @classmethod def guid_from_filename(cls, filename): return filename.split('.')[0] diff --git a/perma_web/perma/models/link.py b/perma_web/perma/models/link.py index 5e0304708..e805ee7df 100644 --- a/perma_web/perma/models/link.py +++ b/perma_web/perma/models/link.py @@ -28,6 +28,7 @@ from .base import DeletableManager, DeletableModel, GenericStringTaggedItem from .folder import Folder +from .internet_archive import LAST_INDIVIDUAL_LINK_IA_UPLOAD_DATE from .organization import Organization from .user import LinkUser @@ -107,8 +108,8 @@ def ia_upload_pending(self, date_string, limit=100): # Get all Links we think should have been uploaded to IA, # and then filter out the ones that have already been uploaded # to a "daily" item. - if date_string > "2022-10-03": - # No links created after 2022-10-03 were uploaded to IA as individual Items: + if date_string > LAST_INDIVIDUAL_LINK_IA_UPLOAD_DATE: + # No links created after this date were uploaded to IA as individual Items: # use a simplified query logger.debug("Running simple IA eligibility query.") query = Link.objects.filter( diff --git a/perma_web/perma/settings/deployments/settings_common.py b/perma_web/perma/settings/deployments/settings_common.py index 7bb9bc440..e30403e52 100644 --- a/perma_web/perma/settings/deployments/settings_common.py +++ b/perma_web/perma/settings/deployments/settings_common.py @@ -496,6 +496,8 @@ def filter(self, record): 'perma.celery_tasks.queue_file_deleted_confirmation_tasks': {'queue': 'ia-readonly'}, 'perma.celery_tasks.confirm_file_deleted_from_daily_item': {'queue': 'ia-readonly'}, 'perma.celery_tasks.conditionally_queue_internet_archive_uploads_for_date_range': {'queue': 'ia-readonly'}, + 'perma.celery_tasks.queue_internet_archive_pending_work': {'queue': 'ia-readonly'}, + 'perma.celery_tasks.queue_internet_archive_privacy_toggled_still_pending': {'queue': 'ia-readonly'}, 'perma.celery_tasks.queue_internet_archive_deletions': {'queue': 'ia-readonly'}, 'perma.celery_tasks.queue_internet_archive_uploads_required_from_privacy_toggle': {'queue': 'ia-readonly'}, 'perma.celery_tasks.queue_internet_archive_deletions_required_from_privacy_toggle': {'queue': 'ia-readonly'}, diff --git a/perma_web/perma/settings/deployments/settings_prod.py b/perma_web/perma/settings/deployments/settings_prod.py index 702927dc4..77507d316 100644 --- a/perma_web/perma/settings/deployments/settings_prod.py +++ b/perma_web/perma/settings/deployments/settings_prod.py @@ -13,9 +13,8 @@ 'run-next-capture', 'sync_subscriptions_from_perma_payments', 'cache_playback_status_for_new_links', - 'conditionally_queue_internet_archive_uploads_for_date_range', - 'queue_internet_archive_uploads_required_from_privacy_toggle', - 'queue_internet_archive_deletions_required_from_privacy_toggle', + 'queue_internet_archive_pending_work', + 'queue_internet_archive_privacy_toggled_still_pending', 'confirm_files_uploaded_to_internet_archive', 'confirm_files_deleted_from_internet_archive', 'deactivate_expired_sponsored_users', diff --git a/perma_web/perma/settings/utils/post_processing.py b/perma_web/perma/settings/utils/post_processing.py index a253d8e7d..48ad323fc 100644 --- a/perma_web/perma/settings/utils/post_processing.py +++ b/perma_web/perma/settings/utils/post_processing.py @@ -62,21 +62,17 @@ def post_process_settings(settings): 'task': 'perma.celery_tasks.sync_subscriptions_from_perma_payments', 'schedule': crontab(hour='23', minute='0') }, - 'conditionally_queue_internet_archive_uploads_for_date_range': { - 'task': 'perma.celery_tasks.conditionally_queue_internet_archive_uploads_for_date_range', + 'queue_internet_archive_pending_work': { + 'task': 'perma.celery_tasks.queue_internet_archive_pending_work', 'schedule': crontab(minute="*/5"), 'args': ( os.environ.get('IA_UPLOAD_START_DATESTRING') or None, os.environ.get('IA_UPLOAD_END_DATESTRING') or None ) }, - 'queue_internet_archive_uploads_required_from_privacy_toggle': { - 'task': 'perma.celery_tasks.queue_internet_archive_uploads_required_from_privacy_toggle', - 'schedule': crontab(minute="10,40"), - }, - 'queue_internet_archive_deletions_required_from_privacy_toggle': { - 'task': 'perma.celery_tasks.queue_internet_archive_deletions_required_from_privacy_toggle', - 'schedule': crontab(minute="20,50"), + 'queue_internet_archive_privacy_toggled_still_pending': { + 'task': 'perma.celery_tasks.queue_internet_archive_privacy_toggled_still_pending', + 'schedule': crontab(hour='3', minute='0'), }, 'confirm_files_uploaded_to_internet_archive': { 'task': 'perma.celery_tasks.queue_file_uploaded_confirmation_tasks', diff --git a/perma_web/perma/utils.py b/perma_web/perma/utils.py index c8eebab84..e1019be37 100644 --- a/perma_web/perma/utils.py +++ b/perma_web/perma/utils.py @@ -1,7 +1,7 @@ from collections import OrderedDict from contextlib import contextmanager, redirect_stdout import csv -from datetime import datetime, timedelta +from datetime import date, datetime, timedelta from datetime import timezone as tz from functools import reduce, wraps import hashlib @@ -1036,6 +1036,199 @@ def ia_bucket_task_limit_approaching(s3_details): return s3_details['detail']['bucket_ration'] - s3_details['detail']['bucket_tasks_queued'] - settings.INTERNET_ARCHIVE_PERMITTED_PROXIMITY_TO_RATE_LIMIT <= 0 +DEFAULT_IA_UPLOAD_HEALTH_SPAN_DAYS = 10 + + +def parse_ia_upload_health_date(value, param_name): + if value is None or value == '': + return None + try: + return date.fromisoformat(str(value)) + except ValueError as e: + raise ValueError(f'{param_name} must be YYYY-MM-DD; got {value!r}') from e + + +def resolve_ia_upload_health_span(span_days, span_start=None, span_end=None): + """ + Resolve inclusive [start, end] and day count for an IA upload health report. + + span_days: positive integer, or 'all' for the full daily-item dataset + (from backlog floor through span_end, defaulting to today). + """ + from perma.models.internet_archive import DAILY_ITEM_BACKLOG_SPAN_FLOOR + + today = timezone.now().date() + parsed_start = parse_ia_upload_health_date(span_start, 'span_start') + parsed_end = parse_ia_upload_health_date(span_end, 'span_end') + full = str(span_days).lower() == 'all' + + if parsed_start and parsed_end: + if full: + raise ValueError('span_days=all cannot be combined with both span_start and span_end') + start, end = parsed_start, parsed_end + elif full: + start = parsed_start or date.fromisoformat(DAILY_ITEM_BACKLOG_SPAN_FLOOR[0]) + end = parsed_end or today + elif parsed_start: + span_days_int = int(span_days) + if span_days_int < 1: + raise ValueError('span_days must be a positive integer or all') + start = parsed_start + end = parsed_start + timedelta(days=span_days_int - 1) + elif parsed_end: + span_days_int = int(span_days) + if span_days_int < 1: + raise ValueError('span_days must be a positive integer or all') + end = parsed_end + start = parsed_end - timedelta(days=span_days_int - 1) + else: + span_days_int = int(span_days) + if span_days_int < 1: + raise ValueError('span_days must be a positive integer or all') + end = today + start = end - timedelta(days=span_days_int - 1) + + if start > end: + raise ValueError(f'span start {start} is after span end {end}') + + days = (end - start).days + 1 + return start, end, days, full + + +def _ia_upload_health_pending_links_by_date(dates): + from django.db.models import Count + + from perma.models import Link + + if not dates: + return {} + return { + row['creation_timestamp__date']: row['count'] + for row in Link.objects.visible_to_ia().filter( + internet_archive_files__isnull=True, + creation_timestamp__date__in=dates, + ).values('creation_timestamp__date').annotate(count=Count('guid')) + } + + +def build_ia_upload_health_report( + mode='auto', + auto_detail_max_days=10, + span_days=DEFAULT_IA_UPLOAD_HEALTH_SPAN_DAYS, + span_start=None, + span_end=None, +): + from perma.models import Link + from perma.models.internet_archive import ( + daily_item_dates_in_window, + initial_uploads_incomplete_dates_in_window, + initial_uploads_incomplete_stats, + uneditable_daily_item_dates, + ) + + mode = mode.lower() + auto_detail_max_days = int(auto_detail_max_days) + if auto_detail_max_days < 1: + raise ValueError('auto_detail_max_days must be a positive integer') + if mode not in ('auto', 'detailed', 'summary'): + raise ValueError(f'mode must be auto, detailed, or summary; got {mode!r}') + + span_start, span_end, span_day_count, span_full = resolve_ia_upload_health_span( + span_days, span_start=span_start, span_end=span_end, + ) + + initial_uploads_incomplete = initial_uploads_incomplete_stats() + initial_uploads_incomplete_dates = initial_uploads_incomplete_dates_in_window(span_start, span_end) + + existing_dates = daily_item_dates_in_window(span_start, span_end) + expected_dates = { + span_start + timedelta(days=i) + for i in range(span_day_count) + } + missing_dates = sorted( + day for day in (expected_dates - existing_dates) if day not in uneditable_daily_item_dates() + ) + + if mode == 'auto': + effective_mode = 'detailed' if ( + len(initial_uploads_incomplete_dates) <= auto_detail_max_days and len(missing_dates) <= auto_detail_max_days + ) else 'summary' + else: + effective_mode = mode + + report = { + 'mode_requested': mode, + 'mode': effective_mode, + 'span': { + 'start': span_start.isoformat(), + 'end': span_end.isoformat(), + 'days': span_day_count, + **({'full': True} if span_full else {}), + }, + 'global': { + 'initial_uploads_incomplete': { + 'count': initial_uploads_incomplete['count'], + 'oldest': initial_uploads_incomplete['oldest'].isoformat() if initial_uploads_incomplete['oldest'] else None, + 'newest': initial_uploads_incomplete['newest'].isoformat() if initial_uploads_incomplete['newest'] else None, + }, + 'privacy_toggle': { + 'uploads_pending': Link.objects.filter( + internet_archive_upload_status='upload_or_reupload_required', + ).count(), + 'deletions_pending': Link.objects.filter( + internet_archive_upload_status='deletion_required', + ).count(), + }, + }, + 'in_span': {}, + } + + if mode == 'auto': + report['auto_detail_max_days'] = auto_detail_max_days + + pending_links_filter = dict( + internet_archive_files__isnull=True, + creation_timestamp__date__gte=span_start, + creation_timestamp__date__lte=span_end, + ) + + if effective_mode == 'detailed': + dates_of_interest = set(initial_uploads_incomplete_dates) | set(missing_dates) + pending_by_date = _ia_upload_health_pending_links_by_date(dates_of_interest) + report['in_span']['initial_uploads_incomplete'] = { + 'count': len(initial_uploads_incomplete_dates), + 'days': [ + { + 'date': day.isoformat(), + 'links_pending': pending_by_date.get(day, 0), + } + for day in initial_uploads_incomplete_dates + ], + } + report['in_span']['missing_daily_items'] = { + 'count': len(missing_dates), + 'days': [ + { + 'date': day.isoformat(), + 'links_pending': pending_by_date.get(day, 0), + } + for day in missing_dates + ], + } + else: + initial_uploads_incomplete_section = {'count': len(initial_uploads_incomplete_dates)} + if initial_uploads_incomplete_dates: + initial_uploads_incomplete_section['oldest'] = initial_uploads_incomplete_dates[0].isoformat() + initial_uploads_incomplete_section['newest'] = initial_uploads_incomplete_dates[-1].isoformat() + report['in_span']['initial_uploads_incomplete'] = initial_uploads_incomplete_section + report['in_span']['missing_daily_items'] = {'count': len(missing_dates)} + report['in_span']['links_pending_total'] = Link.objects.visible_to_ia().filter( + **pending_links_filter + ).count() + + return report + + def date_range(start_date, end_date, delta): current = start_date while current <= end_date: diff --git a/perma_web/tasks/dev.py b/perma_web/tasks/dev.py index f46d19aec..ff3140c39 100644 --- a/perma_web/tasks/dev.py +++ b/perma_web/tasks/dev.py @@ -13,6 +13,7 @@ from perma.email import send_user_email, send_self_email, registrar_users, registrar_users_plus_stats from perma.models import Link, LinkUser, Registrar +from perma.utils import DEFAULT_IA_UPLOAD_HEALTH_SPAN_DAYS, build_ia_upload_health_report import logging logger = logging.getLogger(__name__) @@ -74,14 +75,52 @@ def init_db(ctx): @task -def count_pending_ia_links(ctx): +def report_ia_upload_health( + ctx, + mode='auto', + auto_detail_max_days=10, + span_days=DEFAULT_IA_UPLOAD_HEALTH_SPAN_DAYS, + span_start='', + span_end='', +): """ - For use in monitoring the size of the queue. + Report Internet Archive daily upload backlog health as JSON. + + Output is grouped by scope: + span — the analysis period (start, end, days) + global — metrics across all time (not limited to span) + in_span — metrics limited to span + + Span (pick one style): + span_days=10 (default) — last 10 days ending today, with per-day detail + span_days=90 --span-start DATE — N days starting on that date + span_days=90 --span-end DATE — N days ending on that date + span_start + span_end — explicit range (span_days ignored) + span_days=all — full daily-item dataset (backlog floor through today by + default; combine with span_start and/or span_end to adjust). May be expensive. + + mode: + auto (default) — per-day breakdown when the count of in-span days with initial_uploads_incomplete + and in-span missing days are both <= auto_detail_max_days; otherwise aggregate only + detailed — always per-day breakdown for the span + summary — always aggregate counts for the span + + Examples: + invoke report-ia-upload-health + invoke report-ia-upload-health --span-days 90 --span-start 2010-01-01 + invoke report-ia-upload-health --span-days 90 --span-end 2015-06-01 + invoke report-ia-upload-health --span-start 2010-01-01 --span-end 2010-06-30 + invoke report-ia-upload-health --span-days all --mode summary + invoke report-ia-upload-health --span-days all --span-start 2022-01-01 """ - count = Link.objects.visible_to_ia().filter( - internet_archive_upload_status__in=['not_started', 'failed', 'upload_or_reupload_required', 'deleted'] - ).count() - print(count) + report = build_ia_upload_health_report( + mode=mode, + auto_detail_max_days=auto_detail_max_days, + span_days=span_days, + span_start=span_start or None, + span_end=span_end or None, + ) + print(json.dumps(report, indent=2)) @task