diff --git a/api/dao/hierarchy.py b/api/dao/hierarchy.py
index e3bce623a..aa040ed5c 100644
--- a/api/dao/hierarchy.py
+++ b/api/dao/hierarchy.py
@@ -223,66 +223,102 @@ def check_cont(cont, reqs):
             return False
     return True
 
-def upsert_fileinfo(cont_name, _id, fileinfo):
+def upsert_fileinfo(cont_name, _id, fileinfo, ignore_hash_replace=False):
     cont_name = containerutil.pluralize(cont_name)
     _id = bson.ObjectId(_id)
     container_before = config.db[cont_name].find_one({'_id': _id})
 
-    container_after, file_before = None, None
+    container_after = None
+    saved_state = 'saved'
+
+    # Look to see if a file with the same name already exists in the container
     for f in container_before.get('files',[]):
-        # Fine file in result and set to file_after
+
+        # File already exists, respond accordingly
         if f['name'] == fileinfo['name']:
+
+            # If the existing file is deleted, always replace (but this is not considered a "replaced" saved state)
             if 'deleted' in f:
-                # Ugly hack: remove already existing file that has the 'deleted' tag
-                # This creates a gap in the delete functionality, ie. this file cannot be restored from this point on.
-                # Note that the previous file in storage will be unreferenced from the DB (unless CAS edge case...)
-                config.db[cont_name].find_one_and_update(
-                    {'_id': _id, 'files.name': fileinfo['name']},
-                    {'$pull': {'files': {'name': fileinfo['name']}}}
-                )
+                remove_file(cont_name, _id, fileinfo['name'])
+                container_after = add_file(cont_name, _id, fileinfo)
+                saved_state = 'saved'
+
+            # Files from a failed job should never replace existing files that are "accepted" (unless they are deleted)
+            elif fileinfo.get('from_failed_job') and not f.get('from_failed_job'):
+                saved_state = 'ignored'
+
+            # The file matches an existing file's hash and the caller has chosen to ignore it in this situation
+            elif f.get('hash') == fileinfo['hash'] and ignore_hash_replace:
+                saved_state = 'ignored'
+
+            # No special circumstances, proceed with a replace
             else:
-                file_before = f
+                container_after = replace_file(cont_name, _id, f, fileinfo)
+                saved_state = 'replaced'
+
             break
 
-    if file_before is None:
-        fileinfo['created'] = fileinfo['modified']
-        container_after = add_fileinfo(cont_name, _id, fileinfo)
+
     else:
-        container_after = update_fileinfo(cont_name, _id, fileinfo)
-    return container_before, container_after
+        # File was not already in the container, add as normal
+        container_after = add_file(cont_name, _id, fileinfo)
 
-def update_fileinfo(cont_name, _id, fileinfo):
-    if fileinfo.get('size') is not None:
-        if type(fileinfo['size']) != int:
-            log.warn('Fileinfo passed with non-integer size')
-            fileinfo['size'] = int(fileinfo['size'])
+    return container_before, container_after, saved_state
 
-    update_set = {'files.$.modified': datetime.datetime.utcnow()}
-    # in this method, we are overriding an existing file.
-    # update_set allows to update all the fileinfo like size, hash, etc.
-    for k,v in fileinfo.iteritems():
-        update_set['files.$.' + k] = v
+
+def add_file(cont_name, _id, fileinfo):
     return config.db[cont_name].find_one_and_update(
-        {'_id': _id, 'files.name': fileinfo['name']},
-        {'$set': update_set},
+        {'_id': _id},
+        {'$push': {'files': fileinfo}},
         return_document=pymongo.collection.ReturnDocument.AFTER
     )
 
-def add_fileinfo(cont_name, _id, fileinfo):
-    if fileinfo.get('size') is not None:
-        if type(fileinfo['size']) != int:
-            log.warn('Fileinfo passed with non-integer size')
-            fileinfo['size'] = int(fileinfo['size'])
+def replace_file(cont_name, _id, existing_fileinfo, fileinfo):
+    while True:
+
+        # Update the version and add the existing file to the "other_versions" list
+        fileinfo['version'] = existing_fileinfo['version'] + 1
+        fileinfo['other_versions'] = existing_fileinfo.pop('other_versions', [])
+        fileinfo['other_versions'].append(existing_fileinfo)
+
+        # Only update if the existing file has not changed since it was last loaded
+        container_after = config.db[cont_name].find_one_and_update(
+            {'_id': _id, 'files': {'$elemMatch': {
+                'name': fileinfo['name'],
+                'modified': existing_fileinfo['modified'],
+                'version': existing_fileinfo['version']}}},
+            {'$set': {'files.$': fileinfo}},
+            return_document=pymongo.collection.ReturnDocument.AFTER
+        )
+
+        if container_after is None:
+            # The existing file must have changed, grab the update from the db and retry
+            c = config.db[cont_name].find_one({'_id': _id, 'files.name': fileinfo['name']}) or {}
+            for f in c.get('files', []):
+                if f['name'] == fileinfo['name']:
+                    existing_fileinfo = f
+                    break
+            else:
+                # The file wasn't found, it must have been removed
+                fileinfo['version'] = 1
+                return add_file(cont_name, _id, fileinfo)
+        else:
+            return container_after
+
+
+def remove_file(cont_name, _id, filename):
     return config.db[cont_name].find_one_and_update(
-        {'_id': _id},
-        {'$push': {'files': fileinfo}},
+        {'_id': _id, 'files.name': filename},
+        {'$pull': {'files': {'name': filename}}},
         return_document=pymongo.collection.ReturnDocument.AFTER
     )
 
+
 def _group_id_fuzzy_match(group_id, project_label):
     existing_group_ids = [g['_id'] for g in config.db.groups.find(None, ['_id'])]
     if group_id.lower() in existing_group_ids:
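The heart of this change is the optimistic-concurrency loop in `replace_file`: the update only matches while the file's `modified`/`version` pair is unchanged since it was read, and a lost race is resolved by re-reading and retrying. A database-free sketch of the version bookkeeping it performs (the dict shapes follow this PR; all values are hypothetical):

```python
# Database-free sketch of replace_file()'s version bookkeeping.
# 'existing' and 'incoming' mimic the fileinfo dicts this PR stores.

def bump_version(existing, incoming):
    """Fold the existing fileinfo into the incoming one's history."""
    incoming['version'] = existing['version'] + 1
    # History rides along on the current entry: prior versions live in
    # 'other_versions', and the just-replaced entry (stripped of its own
    # history) becomes the newest item in that list.
    incoming['other_versions'] = existing.pop('other_versions', [])
    incoming['other_versions'].append(existing)
    return incoming

existing = {'name': 'scan.nii.gz', 'hash': 'v0-aaa', 'version': 1}
incoming = {'name': 'scan.nii.gz', 'hash': 'v0-bbb'}

replaced = bump_version(existing, incoming)
assert replaced['version'] == 2
assert [v['hash'] for v in replaced['other_versions']] == ['v0-aaa']
```
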
diff --git a/api/jobs/rules.py b/api/jobs/rules.py
index b111baf59..697aed600 100644
--- a/api/jobs/rules.py
+++ b/api/jobs/rules.py
@@ -208,17 +208,22 @@ def create_potential_jobs(db, container, container_type, file_):
 
     return potential_jobs
 
-def create_jobs(db, container_before, container_after, container_type):
+def create_jobs(db, container_before, container_after, container_type, replaced_files=None):
     """
     Given a before and after set of file attributes, enqueue a list of jobs
     that would only be possible after the changes.
     Returns the algorithm names that were queued.
     """
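The enqueue rule below now has two paths: jobs whose inputs include a replaced file are always enqueued, and everything else still goes through the before/after set difference. A standalone sketch of that decision (plain tuples stand in for `FileReference`, which must hash and compare by value for the set intersection to work; `'key'` stands in for `intention_equals`):

```python
# Standalone sketch of the create_jobs() enqueue decision. Tuples stand in
# for FileReference; 'key' stands in for Job.intention_equals().

def jobs_to_enqueue(jobs_before, jobs_after, replaced_files):
    replaced = set(replaced_files)
    enqueue = []
    for ja in jobs_after:
        if replaced.intersection(ja['inputs']):
            # One of the replaced files is an input: re-run even if an
            # identical job already existed before the upload.
            enqueue.append(ja)
        elif all(ja['key'] != jb['key'] for jb in jobs_before):
            # Genuinely new job made possible by the upload.
            enqueue.append(ja)
    return enqueue

scan = ('acquisition', 'acq1', 'scan.dcm')
before = [{'key': 'dcm2niix/scan.dcm', 'inputs': {scan}}]
after = [{'key': 'dcm2niix/scan.dcm', 'inputs': {scan}}]

assert jobs_to_enqueue(before, after, []) == []         # unchanged file: skip
assert jobs_to_enqueue(before, after, [scan]) == after  # replaced file: re-run
```
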
""" + # A list of FileContainerReferences that have been completely replaced + # Jobs with these as inputs should get enqueue even if they are in the jobs_before list + if not replaced_files: + replaced_files = [] + jobs_before, jobs_after, potential_jobs = [], [], [] files_before = container_before.get('files', []) - files_after = container_after['files'] # It should always have at least one file after + files_after = container_after.get('files', []) for f in files_before: jobs_before.extend(create_potential_jobs(db, container_before, container_type, f)) @@ -229,13 +234,18 @@ def create_jobs(db, container_before, container_after, container_type): # Using a uniqueness constraint, create a list of the set difference of jobs_after \ jobs_before # (members of jobs_after that are not in jobs_before) for ja in jobs_after: - new_job = True - for jb in jobs_before: - if ja['job'].intention_equals(jb['job']): - new_job = False - break # this job matched in both before and after, ignore - if new_job: + + if set(replaced_files).intersection(set(ja['job'].inputs.itervalues())): + # one of the replaced files is an input potential_jobs.append(ja) + else: + should_enqueue_job = True + for jb in jobs_before: + if ja['job'].intention_equals(jb['job']): + should_enqueue_job = False + break # this job matched in both before and after, ignore + if should_enqueue_job: + potential_jobs.append(ja) spawned_jobs = [] diff --git a/api/placer.py b/api/placer.py index 0c642d9fc..235282033 100644 --- a/api/placer.py +++ b/api/placer.py @@ -44,6 +44,9 @@ def __init__(self, container_type, container, id_, metadata, timestamp, origin, # A list of files that have been saved via save_file() usually returned by finalize() self.saved = [] + # A list of files that have been ignored by save_file() because a file with the same name and hash already existed + self.ignored = [] + def check(self): """ @@ -91,11 +94,24 @@ def save_file(self, field=None, file_attrs=None): # Update the DB if file_attrs is not None: - container_before, self.container = hierarchy.upsert_fileinfo(self.container_type, self.id_, file_attrs) + container_before, self.container, saved_state = hierarchy.upsert_fileinfo(self.container_type, self.id_, file_attrs) + + # If this file was ignored because an existing file with the same name and hash existed on this project, + # add the file to the ignored list and move on + if saved_state == 'ignored': + self.ignored.append(file_attrs) + + else: + self.saved.append(file_attrs) + + # create_jobs handles files that have been replaced differently + replaced_files = [] + if saved_state == 'replaced': + replaced_files.append(containerutil.FileReference(self.container_type, self.id_, file_attrs['name'])) + + rules.create_jobs(config.db, container_before, self.container, self.container_type, replaced_files=replaced_files) + - # Queue any jobs as a result of this upload, uploading to a gear will not make jobs though - if self.container_type != 'gear': - rules.create_jobs(config.db, container_before, self.container, self.container_type) def recalc_session_compliance(self): if self.container_type in ['session', 'acquisition'] and self.id_: @@ -121,7 +137,7 @@ def process_file_field(self, field, file_attrs): if self.metadata: file_attrs.update(self.metadata) self.save_file(field, file_attrs) - self.saved.append(file_attrs) + def finalize(self): self.recalc_session_compliance() @@ -181,21 +197,11 @@ def process_file_field(self, field, file_attrs): r_metadata = target['metadata'] file_attrs.update(r_metadata) - if 
diff --git a/api/upload.py b/api/upload.py
index 617a30355..67c996b67 100644
--- a/api/upload.py
+++ b/api/upload.py
@@ -122,11 +122,13 @@ def process_upload(request, strategy, container_type=None, id_=None, origin=None
         # Stands in for a dedicated object... for now.
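With this change every new upload enters the system as version 1, and `created`/`modified` share the upload timestamp until a later replace bumps them. A representative document, with hypothetical field values:

```python
import datetime

timestamp = datetime.datetime.utcnow()

# Representative file_attrs as built by process_upload (values hypothetical)
file_attrs = {
    'name': 'scan.dcm',
    'created': timestamp,    # new: stamped at upload time
    'modified': timestamp,   # equal to 'created' until a replace occurs
    'size': 1048576,
    'mimetype': 'application/octet-stream',
    'hash': 'v0-sha384-deadbeef',
    'origin': {'type': 'user', 'id': 'user@example.com'},
    'version': 1,            # new: replace_file() increments this

    'type': None,
    'modality': None,
}
```
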
         file_attrs = {
             'name': field.filename,
-            'modified': field.modified, #
+            'created': timestamp,
+            'modified': timestamp,
             'size': field.size,
             'mimetype': field.mimetype,
             'hash': field.hash,
             'origin': origin,
+            'version': 1,
 
             'type': None,
             'modality': None,
diff --git a/bin/database.py b/bin/database.py
index 375c4b76f..835a86eb0 100755
--- a/bin/database.py
+++ b/bin/database.py
@@ -22,7 +22,7 @@ from api.types import Origin
 from api.jobs import batch
 
-CURRENT_DATABASE_VERSION = 42 # An int that is bumped when a new schema change is made
+CURRENT_DATABASE_VERSION = 43 # An int that is bumped when a new schema change is made
 
 
 def get_db_version():
@@ -1360,6 +1360,27 @@ def upgrade_to_42():
         process_cursor(cursor, upgrade_to_42_closure, context=cont_name)
 
 
+def upgrade_to_43_closure(cont, cont_name):
+    """
+    Update all files in a container that do not have a version
+    """
+    files = cont.get('files', [])
+    for f in files:
+        if 'version' not in f:
+            f['version'] = 1
+    config.db[cont_name].update_one({'_id': cont['_id']}, {'$set': {'files': files}})
+    return True
+
+
+def upgrade_to_43():
+    """
+    Add initial file versioning to all files
+    """
+    for cont_name in ['projects', 'sessions', 'acquisitions', 'analyses', 'collections']:
+        cursor = config.db[cont_name].find({'files': {'$gt': []}, 'files.version': {'$exists': False}})
+        process_cursor(cursor, upgrade_to_43_closure, context=cont_name)
+
+
 ###
 ### BEGIN RESERVED UPGRADE SECTION
 ###
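A note on the migration query's semantics: `{'files.version': {'$exists': False}}` on an array path matches only containers where no file has a version yet, and `{'files': {'$gt': []}}` restricts it to non-empty file lists; the closure then stamps every unversioned file. A database-free sketch of that pass, over hypothetical documents:

```python
# Database-free sketch of the upgrade_to_43 pass (documents hypothetical).

containers = [
    {'_id': 1, 'files': [{'name': 'a.nii'}, {'name': 'b.nii'}]},  # unversioned
    {'_id': 2, 'files': [{'name': 'c.nii', 'version': 3}]},       # already versioned
    {'_id': 3, 'files': []},                                      # empty: not selected
]

def matches_query(cont):
    # Approximates {'files': {'$gt': []}, 'files.version': {'$exists': False}}
    return bool(cont['files']) and not any('version' in f for f in cont['files'])

for cont in containers:
    if matches_query(cont):
        for f in cont['files']:
            if 'version' not in f:
                f['version'] = 1

assert all(f['version'] == 1 for f in containers[0]['files'])
assert containers[1]['files'][0]['version'] == 3
```
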