Skip to content
Open
6 changes: 3 additions & 3 deletions perma_web/api/tests/test_link_authorization.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ def setUp(self):
'title': 'This is a new title',
'default_to_screenshot_view': True}

# mock the IA helpers that are called when the privacy toggle is used
# mock the IA tasks that are called when the privacy toggle is used
self.ia_upload_from_privacy_toggle_patcher = patch('api.views.links.upload_link_to_internet_archive.delay')
self.ia_deletion_from_privacy_toggle_patcher = patch('api.views.links.request_internet_archive_deletion_from_privacy_toggle')
self.ia_deletion_from_privacy_toggle_patcher = patch('api.views.links.delete_link_from_daily_item.delay')
self.mock_upload_link_to_internet_archive_delay = self.ia_upload_from_privacy_toggle_patcher.start()
self.mock_request_internet_archive_deletion_from_privacy_toggle = self.ia_deletion_from_privacy_toggle_patcher.start()
self.mock_delete_link_from_daily_item_delay = self.ia_deletion_from_privacy_toggle_patcher.start()

def tearDown(self):
self.ia_upload_from_privacy_toggle_patcher.stop()
Expand Down
4 changes: 2 additions & 2 deletions perma_web/api/views/links.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
from rest_framework import status

from perma.celery_tasks import (
delete_link_from_daily_item,
run_next_capture,
request_internet_archive_deletion_from_privacy_toggle,
upload_link_to_internet_archive
)
from perma.models import Capture, CaptureJob, Folder, Link, LinkBatch
Expand Down Expand Up @@ -363,7 +363,7 @@ def patch(self, request, guid, format=None):
link.internet_archive_upload_status = 'deletion_required'
link.save(update_fields=["internet_archive_upload_status"])
logger.info(f"Link {link.guid} was toggled to private. Requesting the IA deletion.")
request_internet_archive_deletion_from_privacy_toggle(link)
delete_link_from_daily_item.delay(link.guid)

# include remaining links in response
links_remaining = request.user.get_links_remaining()
Expand Down
4 changes: 2 additions & 2 deletions perma_web/perma/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,8 +744,8 @@ class LinkBatchAdmin(admin.ModelAdmin):


class InternetArchiveItemAdmin(admin.ModelAdmin):
list_display = ['identifier', 'confirmed_exists', 'cached_is_dark', 'added_date', 'span', 'cached_file_count', 'tasks_in_progress', 'complete', 'last_derived', 'derive_required', 'internet_archive_link']
list_filter = [IAIdentifierFilter, IAItemTypeFilter, IAItemHasTasksFilter, 'confirmed_exists', 'complete', 'cached_is_dark']
list_display = ['identifier', 'confirmed_exists', 'cached_is_dark', 'added_date', 'span', 'cached_file_count', 'tasks_in_progress', 'initial_uploads_complete', 'last_derived', 'derive_required', 'internet_archive_link']
list_filter = [IAIdentifierFilter, IAItemTypeFilter, IAItemHasTasksFilter, 'confirmed_exists', 'initial_uploads_complete', 'cached_is_dark']
readonly_fields = ['identifier', 'cached_is_dark', 'added_date', 'span', 'cached_title', 'cached_description', 'cached_file_count', 'last_derived']

paginator = FasterAdminPaginator
Expand Down
157 changes: 104 additions & 53 deletions perma_web/perma/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@

from perma.models import LinkUser, Link, Capture, \
CaptureJob, InternetArchiveItem, InternetArchiveFile, Folder, Sponsorship, UserOrganizationAffiliation
from perma.models.internet_archive import (
UNEDITABLE_DAILY_ITEM_DATE_STRINGS,
daily_item_backlog_queryset_filter,
uneditable_daily_item_identifiers,
)
from perma.exceptions import PermaPaymentsCommunicationException, ScoopAPINetworkException, ScoopAPIException
from perma.utils import (
remove_whitespace,
Expand Down Expand Up @@ -572,12 +577,9 @@ def queue_batched_tasks(task, query, batch_size=1000, **kwargs):
logger.info(f"Queued {batches_queued} batches of size {batch_size}{' and a single batch of size ' + str(remainder) if remainder else ''}, pks {first}-{last}.")


def clear_link_ia_status_after_privacy_toggle_jobs(link):
"""
Clear internet_archive_upload_status field of a link.
Only applies to links marked upload_or_reupload_required or deletion_required after a privacy toggle.
"""
if link.internet_archive_upload_status in ('upload_or_reupload_required', 'deletion_required'):
def clear_pending_ia_status(link):
"""Unset the privacy-toggle IA sync flag so Beat stops retrying this link."""
if link.internet_archive_upload_status:
link.internet_archive_upload_status = None
link.save(update_fields=['internet_archive_upload_status'])

Expand All @@ -594,6 +596,11 @@ def upload_link_to_internet_archive(link_guid, attempts=0, timeouts=0):
link = Link.objects.get(guid=link_guid)
if not link.can_upload_to_internet_archive():
logger.info(f"Queued Link {link_guid} no longer eligible for upload.")
# Keep status set while waiting for playback cache (cached_can_play_back is None on
# a discoverable link). Clear when ineligibility is definitive: not discoverable,
# or playback cache has run and the capture can't play back.
if link.cached_can_play_back is not None or not link.is_discoverable():
clear_pending_ia_status(link)
return

# Get or create the appropriate InternetArchiveItem object for this link
Expand All @@ -617,6 +624,7 @@ def upload_link_to_internet_archive(link_guid, attempts=0, timeouts=0):
if perma_file:
if perma_file.status == 'confirmed_present':
logger.info(f"Not uploading {link_guid} to {identifier}: our records indicate it is already present.")
clear_pending_ia_status(link)
return
elif perma_file.status in ['deletion_attempted', 'deletion_submitted']:
# If we find ourselves here, something has gotten very mixed up indeed. We probably need a human to have a look.
Expand All @@ -629,6 +637,7 @@ def upload_link_to_internet_archive(link_guid, attempts=0, timeouts=0):
logger.info(f"Uploading {link_guid} (previously deleted) to {identifier}.")
else:
logger.warning(f"Not uploading {link_guid} to {identifier}: task not implemented for InternetArchiveFiles with status '{perma_file.status}'.")
clear_pending_ia_status(link)
return
else:
# A fresh one. Create the InternetArchiveFile here.
Expand Down Expand Up @@ -823,12 +832,7 @@ def queue_file_uploaded_confirmation_tasks(limit=None):
file_ids = InternetArchiveFile.objects.filter(
status='upload_submitted'
).exclude(
item_id__in=[
'daily_perma_cc_2022-07-25',
'daily_perma_cc_2022-07-21',
'daily_perma_cc_2022-07-20',
'daily_perma_cc_2022-07-19'
]
item_id__in=uneditable_daily_item_identifiers()
).values_list(
'id', flat=True
)[:limit]
Expand Down Expand Up @@ -857,7 +861,7 @@ def confirm_file_uploaded_to_internet_archive(file_id, attempts=0, connection_er

if perma_file.status == 'confirmed_present':
logger.info(f"InternetArchiveFile {file_id} ({link.guid}) already confirmed to be uploaded to {perma_item.identifier}.")
clear_link_ia_status_after_privacy_toggle_jobs(link)
clear_pending_ia_status(link)
return

ia_session = get_ia_session()
Expand Down Expand Up @@ -924,15 +928,24 @@ def confirm_file_uploaded_to_internet_archive(file_id, attempts=0, connection_er
'cached_file_count',
'tasks_in_progress'
])
clear_link_ia_status_after_privacy_toggle_jobs(link)
clear_pending_ia_status(link)

logger.info(f"Confirmed upload of {link.guid} to {perma_item.identifier}.")


@shared_task(acks_late=True)
def delete_link_from_daily_item(link_guid, attempts=0):
perma_file = InternetArchiveFile.objects.select_related('item').get(link_id=link_guid, item__span__isempty=False)
perma_file = InternetArchiveFile.objects.select_related('item', 'link').filter(
link_id=link_guid,
item__span__isempty=False,
).first()
if not perma_file:
logger.info(f"No daily InternetArchiveFile for {link_guid}; nothing to delete.")
clear_pending_ia_status(Link.objects.get(guid=link_guid))
return

perma_item = perma_file.item
link = perma_file.link
identifier = perma_item.identifier

def retry_deletion(attempt_count):
Expand All @@ -942,6 +955,7 @@ def retry_deletion(attempt_count):

if perma_file.status == 'confirmed_absent':
logger.info(f"The daily InternetArchiveFile for {link_guid} is already confirmed absent from {identifier}.")
clear_pending_ia_status(link)
return
elif perma_file.status in ['upload_attempted', 'upload_submitted']:
# If we find ourselves here, something has gotten very mixed up indeed. We probably need a human to have a look.
Expand All @@ -954,6 +968,7 @@ def retry_deletion(attempt_count):
logger.info(f"Deleting {link_guid} from {identifier}.")
else:
logger.warning(f"Not deleting {link_guid} from {identifier}: task not implemented for InternetArchiveFiles with status '{perma_file.status}'.")
clear_pending_ia_status(link)
return

# Record that we are attempting a deletion
Expand Down Expand Up @@ -1079,7 +1094,7 @@ def confirm_file_deleted_from_daily_item(file_id, attempts=0, connection_errors=

if perma_file.status == 'confirmed_absent':
logger.info(f"InternetArchiveFile {file_id} ({guid}) already confirmed absent from {perma_item.identifier}.")
clear_link_ia_status_after_privacy_toggle_jobs(link)
clear_pending_ia_status(link)
return

ia_session = get_ia_session()
Expand Down Expand Up @@ -1138,7 +1153,7 @@ def confirm_file_deleted_from_daily_item(file_id, attempts=0, connection_errors=
'cached_file_count',
'tasks_in_progress'
])
clear_link_ia_status_after_privacy_toggle_jobs(link)
clear_pending_ia_status(link)

logger.info(f"Confirmed deletion of {guid} from {perma_item.identifier}.")

Expand All @@ -1164,12 +1179,7 @@ def queue_file_deleted_confirmation_tasks(limit=100):
file_ids = InternetArchiveFile.objects.filter(
status='deletion_submitted'
).exclude(
item_id__in=[
'daily_perma_cc_2022-07-25',
'daily_perma_cc_2022-07-21',
'daily_perma_cc_2022-07-20',
'daily_perma_cc_2022-07-19'
]
item_id__in=uneditable_daily_item_identifiers()
).values_list(
'id', flat=True
)[:limit]
Expand Down Expand Up @@ -1215,20 +1225,6 @@ def queue_internet_archive_deletions(limit=None):
logger.info(f"Queued { len(queued) } links for deletion ({queued[0]} through {queued[-1]}).")


def request_internet_archive_deletion_from_privacy_toggle(link):
"""
Queue deletion of a link from its daily Internet Archive item, if a deletable file exists.
Triggered by the privacy toggle in the API.
"""
is_deletable = InternetArchiveFile.deletable_from_privacy_toggle().filter(link_id=link.guid).exists()

if not is_deletable:
return False

delete_link_from_daily_item.delay(link.guid)
return True


def dispatch_link_guid_tasks(task, queryset=None, first_link=None, rest_of_links=None, log_label=None):
"""
Dispatch task.delay(link.guid) for each link, returning queued guids.
Expand Down Expand Up @@ -1314,10 +1310,10 @@ def queue_internet_archive_uploads_for_date(date_string, limit=100):
)
try:
item = InternetArchiveItem.objects.get(identifier=identifier)
# Don't mark an item complete if it's yesterday's
# Don't mark initial uploads complete if it's yesterday's.
if timezone.now() - item.span.lower > timedelta(days=3):
item.complete = True
item.save(update_fields=['complete'])
item.initial_uploads_complete = True
item.save(update_fields=['initial_uploads_complete'])
logger.info(f"Found no pending links: marked IA Item {item.identifier} complete.")
else:
logger.info(f"Found no pending links for recent IA Item {item.identifier}; not marking complete.")
Expand All @@ -1326,8 +1322,9 @@ def queue_internet_archive_uploads_for_date(date_string, limit=100):
return 0


@shared_task
def conditionally_queue_internet_archive_uploads_for_date_range(start_date_string, end_date_string, daily_limit=100, limit=None):
def _queue_internet_archive_initial_uploads_for_date_range(
start_date_string, end_date_string, daily_limit=100, limit=None
):
"""
Queues up to settings.INTERNET_ARCHIVE_MAX_SIMULTANEOUS_UPLOADS links for upload to IA, spread over
a number of days such that no more than `daily_limit` are ever queued for a particular day. May
Expand All @@ -1336,17 +1333,17 @@ def conditionally_queue_internet_archive_uploads_for_date_range(start_date_strin
- there are submitted-but-as-of-yet-unfinished upload requests being processed by IA
- there are not enough qualifying links in the date range
- there are not enough qualifying links in the date range, while respecting daily_limit

Returns the number of upload tasks queued, or None if queuing was skipped.
"""
tasks_in_ia_queue = redis.from_url(settings.CELERY_BROKER_URL).llen('ia')
if tasks_in_ia_queue:
logger.info(f"Skipped the queuing of file upload tasks: {tasks_in_ia_queue} task{pluralize(tasks_in_ia_queue)} in the ia queue.")
return
return None

if not start_date_string:
oldest_incomplete_daily_item_in_backlog = InternetArchiveItem.objects.filter(
span__isempty=False,
span__gt=('2021-11-10', '2021-11-11'),
complete=False,
**daily_item_backlog_queryset_filter(complete=False),
).order_by('span').first()
start = oldest_incomplete_daily_item_in_backlog.span.lower.date()
else:
Expand All @@ -1364,7 +1361,7 @@ def conditionally_queue_internet_archive_uploads_for_date_range(start_date_strin

if to_queue < 0:
logger.error(f"Something is amiss with the IA upload process: InternetArchiveItem.inflight_task_count ({InternetArchiveItem.inflight_task_count()}) is larger than settings.INTERNET_ARCHIVE_MAX_SIMULTANEOUS_UPLOADS.")
return
return None

if to_queue:

Expand All @@ -1373,19 +1370,17 @@ def conditionally_queue_internet_archive_uploads_for_date_range(start_date_strin
for day in date_range(start, end, timedelta(days=1)):
if total_queued < to_queue:
date_string = day.strftime('%Y-%m-%d')
if date_string in ['2022-07-25', '2022-07-21', '2022-07-20', '2022-07-19']:
# for now, skip these days: by accident, we don't presently have edit
# privileges for the IA Items with these identifiers
if date_string in UNEDITABLE_DAILY_ITEM_DATE_STRINGS:
# We do not presently have edit privileges for these IA Items.
continue
identifier = InternetArchiveItem.DAILY_IDENTIFIER.format(
prefix=settings.INTERNET_ARCHIVE_DAILY_IDENTIFIER_PREFIX,
date_string=date_string
)
try:
item = InternetArchiveItem.objects.get(identifier=identifier)
if item.complete:
# if this day is already complete, skip it, and move on to the
# next day in the range
if item.initial_uploads_complete:
# Initial uploads for this day are complete; skip to the next day.
continue
in_flight_for_this_day = item.tasks_in_progress
except InternetArchiveItem.DoesNotExist:
Expand All @@ -1404,8 +1399,64 @@ def conditionally_queue_internet_archive_uploads_for_date_range(start_date_strin
else:
logger.info("Prepared to upload 0 links to internet archive: no pending links in range.")

return total_queued

else:
logger.info("Skipped the queuing of file upload tasks: max tasks already in progress.")
return None


@shared_task
def conditionally_queue_internet_archive_uploads_for_date_range(start_date_string, end_date_string, daily_limit=100, limit=None):
"""
Queues up to settings.INTERNET_ARCHIVE_MAX_SIMULTANEOUS_UPLOADS links for upload to IA, spread over
a number of days such that no more than `daily_limit` are ever queued for a particular day. May
queue fewer links, if an explicit `limit` is passed in, or if:
- there are pending tasks in the Celery queue
- there are submitted-but-as-of-yet-unfinished upload requests being processed by IA
- there are not enough qualifying links in the date range
- there are not enough qualifying links in the date range, while respecting daily_limit
"""
_queue_internet_archive_initial_uploads_for_date_range(
start_date_string, end_date_string, daily_limit=daily_limit, limit=limit
)


@shared_task
def queue_internet_archive_pending_work(
start_date_string, end_date_string, daily_limit=100, limit=None
):
"""
First, queues any pending initial IA uploads for a date range.
If there aren't any, then queues any pending privacy-toggle uploads and deletions.

If the queuing of pending initial IA uploads is skipped for any reason
(tasks are already in progress, rate-limiting has been triggered, a problem has been encountered, etc.),
then the queuing of privacy-toggle-related work is skipped too.
"""
initial_queued = _queue_internet_archive_initial_uploads_for_date_range(
start_date_string, end_date_string, daily_limit=daily_limit, limit=limit
)
if initial_queued == 0:
logger.info("Queuing privacy-toggle uploads and deletions.")
queue_internet_archive_uploads_required_from_privacy_toggle()
queue_internet_archive_deletions_required_from_privacy_toggle()
elif initial_queued is None:
logger.info("Skipped the queuing of pending privacy-toggle tasks.")
else:
logger.info("Not queuing privacy-toggle tasks: routine uploads already in progress.")


@shared_task
def queue_internet_archive_privacy_toggled_still_pending(limit=100):
"""
Queues any pending privacy-toggle uploads and deletions.

Schedule once daily, to catch any links whose processing has been
delayed or not otherwise handled by other scheduling.
"""
queue_internet_archive_uploads_required_from_privacy_toggle(limit=limit)
queue_internet_archive_deletions_required_from_privacy_toggle(limit=limit)


# WACZ CONVERSION
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,22 @@
from django.db import migrations, models


LEGACY_IA_UPLOAD_STATUSES = (
'not_started',
'completed',
'failed',
'deleted',
'deletion_incomplete',
)


def null_legacy_internet_archive_upload_statuses(apps, schema_editor):
Link = apps.get_model('perma', 'Link')
Link.objects.filter(
internet_archive_upload_status__in=LEGACY_IA_UPLOAD_STATUSES
).update(internet_archive_upload_status=None)


class Migration(migrations.Migration):

dependencies = [
Expand All @@ -15,4 +31,8 @@ class Migration(migrations.Migration):
name='internet_archive_upload_status',
field=models.CharField(choices=[('deletion_required', 'deletion_required'), ('upload_or_reupload_required', 'upload_or_reupload_required')], db_index=True, max_length=28, null=True),
),
migrations.RunPython(
null_legacy_internet_archive_upload_statuses,
migrations.RunPython.noop,
),
]
Loading