From 55e1b56b3041391cc4af7a30e9c84741d37992ab Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Fri, 27 Mar 2026 11:59:34 +0100 Subject: [PATCH 1/5] Refactored the code consistent with the rest of the daemons using base Daemon, DaemonTask, BackgroundJob, BackgroundWorker. - Use ArgumentParser instead of the outdated OptionParser - Rename JobMulti -> HistoryDumpJob, JobMultiLoad -> HistoryLoadJob - Fix pidfile write: os.open() returns an int, not a context manager; wrap with os.fdopen() - Fix flags/mode tuple bug: comma made them a tuple instead of separate arguments --- svndumpsub.py | 542 ++++++++++++++++++-------------------------------- 1 file changed, 193 insertions(+), 349 deletions(-) diff --git a/svndumpsub.py b/svndumpsub.py index 7ddc8c0..771049e 100755 --- a/svndumpsub.py +++ b/svndumpsub.py @@ -26,32 +26,32 @@ # On startup SvnDumpSub starts listening to commits in all repositories. # -from io import BytesIO -import subprocess -import threading -import sys -import stat import os -import tempfile -import re -import json -import socket +import stat import boto3 +import tempfile +import argparse +import subprocess +import svnpubsub.util +import svnpubsub.logger import logging.handlers -try: - import queue -except ImportError: - import queue as Queue - -import optparse -import daemonize import svnpubsub.client -import svnpubsub.util +from svnpubsub import bgworker +from svnpubsub.client import Commit +from svnpubsub.daemon import Daemon, DaemonTask + +AWS = None +SVN = None +BUCKET = None +SVNROOT = None +CLOUDID = None +SVNLOOK = None +SVNADMIN = None HOST = "127.0.0.1" PORT = 2069 -#Will not handle commits if repo starts with any name icluded in REPO_EXCLUDES -REPO_EXCLUDES = ['demo', 'repo'] + +EXCLUDED_REPOS = [] # ['demo', 'repo'] s3client = boto3.client('s3') @@ -76,27 +76,40 @@ def check_call(*args, **kwds): raise subprocess.CalledProcessError(pipe.returncode, args) return pipe.returncode # is EXIT_OK -class Job(object): - def __init__(self, repo, rev, head): - self.repo = repo - self.rev = rev - self.head = head - self.env = {'LANG': 'en_US.UTF-8', 'LC_ALL': 'en_US.UTF-8'} - self.shard_size = 'shard0' +class Job(bgworker.BackgroundJob): + + def __init__(self, commit: Commit = None, shard_size: str = 'shard0', repo: str = None, rev: int = None, head: int = None): + if commit is not None: + super().__init__(repo=commit.repositoryname, + rev=commit.id, + head=commit.id, + commit=commit, + shard_size=shard_size, + env={'LANG': 'en_US.UTF-8', 'LC_ALL': 'en_US.UTF-8'}) + elif repo is not None: + super().__init__(repo=repo, + rev=rev, + head=head, + commit=None, + shard_size=shard_size, + env={'LANG': 'en_US.UTF-8', 'LC_ALL': 'en_US.UTF-8'}) + else: + raise Exception('No commit or repo specified') + def get_key(self, rev): - #/v1/Cloudid/reponame/shardX/0000001000/reponame-0000001000.svndump.gz - return '%s/%s' % (self._get_s3_base(rev = rev), self.get_name(rev)) + # /v1/Cloudid/reponame/shardX/0000001000/reponame-0000001000.svndump.gz + return '%s/%s' % (self.__get_s3_base(rev = rev), self.get_name(rev)) def get_name(self, rev): - revStr = str(rev) - revStr = revStr.zfill(10) - name = self.repo + '-' + revStr + '.svndump.gz' - #reponame-0000001000.svndump.gz + rev_str = str(rev) + rev_str = rev_str.zfill(10) + name = self.repo + '-' + rev_str + '.svndump.gz' + # reponame-0000001000.svndump.gz return name - def _get_s3_base(self, rev): + def __get_s3_base(self, rev): # Always using 1000 for folders, can not yet support >shard3. d = int(rev) / 1000 @@ -107,54 +120,45 @@ def _get_s3_base(self, rev): # v1/CLOUDID/demo1/shard0/0000000000 return '%s/%s/%s/%s/%s' % (version, CLOUDID, self.repo, self.shard_size, shard_number) - def _get_svn_dump_args(self, from_rev, to_rev): + def __get_svn_dump_args(self, from_rev, to_rev): path = '%s/%s' % (SVNROOT, self.repo) dump_rev = '-r%s:%s' % (from_rev, to_rev) - #svnadmin dump --incremental --deltas /srv/cms/svn/demo1 -r 237:237 + # svnadmin dump --incremental --deltas /srv/cms/svn/demo1 -r 237:237 return [SVNADMIN, 'dump', '--incremental', '--deltas', path, dump_rev] - - #def _get_aws_cp_args(self, rev): - # # aws s3 cp - s3://cms-review-jandersson/v1/jandersson/demo1/shard0/0000000000/demo1-0000000363.svndump.gz - # return [AWS, 's3', 'cp', '-', 's3://%s/%s' % (BUCKET, self.get_key(rev))] - - def _validate_shard(self, rev): + def validate_shard(self, rev): key = self.get_key(rev) try: response = s3client.head_object(Bucket=BUCKET, Key=key) logging.debug('Shard key exists: %s' % key) - if (not response["ContentLength"] > 0): + if not response["ContentLength"] > 0: logging.warning('Dump file empty: %s' % key) return False - #logging.info(response) + # logging.info(response) return True except Exception as err: logging.debug("S3 exception: {0}".format(err)) logging.info('Shard key does not exist: s3://%s/%s' % (BUCKET, key)) return False + def __get_validate_to_rev(self): + rev_round_down = int((self.head - 1) / 1000) + return rev_round_down * 1000 - #Will recursively check a bucket if (rev - 1) exists until it finds a rev dump. - def validate_rev(self, rev): - - validate_to_rev = self._get_validate_to_rev() - rev_to_validate = rev - 1 + def validate(self) -> bool: + # Will recursively check a bucket if (rev - 1) exists until it finds a rev dump. + validate_to_rev = self.__get_validate_to_rev() + rev_to_validate = self.rev - 1 if rev_to_validate < validate_to_rev: logging.info('At first possible rev in shard, will not validate further rev: %s', rev_to_validate) return True - return self._validate_shard(rev_to_validate) - + return self.validate_shard(rev_to_validate) - def _get_validate_to_rev(self): - rev_round_down = int((self.head - 1) / 1000) - return rev_round_down * 1000 - - def _backup_commit(self): + def run(self): logging.info('Dumping and uploading rev: %s from repo: %s' % (self.rev, self.repo)) - self.dump_zip_upload(self._get_svn_dump_args(self.rev, self.rev), self.rev) - + self.dump_zip_upload(self.__get_svn_dump_args(self.rev, self.rev), self.rev) def dump_zip_upload(self, dump_args, rev): shard_key = self.get_key(rev) @@ -182,16 +186,15 @@ def dump_zip_upload(self, dump_args, rev): raise Exception('Compressing shard failed') -# Processing one repo, specified in history option -class JobMulti(Job): - - def __init__(self, repo, shard_size): - self.shard_size = shard_size - self.env = {'LANG': 'en_US.UTF-8', 'LC_ALL': 'en_US.UTF-8'} - self.repo = repo - self.head = self._get_head(self.repo) - - if shard_size == 'shard3': +class HistoryDumpJob(Job): + """ + Processing one repo as specified in the history option. + """ + def __init__(self, repo, shard_size=None): + super().__init__(shard_size=shard_size, repo=repo, head=self.get_head(repo)) + if shard_size is None: + pass + elif shard_size == 'shard3': self.shard_div = 1000 self.rev_min = 0 elif shard_size == 'shard0': @@ -206,19 +209,19 @@ def __init__(self, repo, shard_size): logging.error('Unsupported shard type: %s' % shard_size) raise Exception('Unsupported shard type') - logging.info('Processing repo %s with head revision %s' % (self.repo, self.head)) - shards = self._get_shards(self.head) - self._run(shards) + def validate(self) -> bool: + return True - def _run(self, shards): + def run(self): + logging.info('Processing repo %s with head revision %s' % (self.repo, self.head)) + shards = self.get_shards(self.head) for shard in shards: - dump_exists = self._validate_shard(shard) + dump_exists = self.validate_shard(shard) if not dump_exists: logging.info('Shard is missing will dump and upload shard %s' % shard) - self._backup_shard(shard) - - def _get_head(self, repo): + self.__backup_shard(shard) + def get_head(self, repo) -> int: path = '%s/%s' % (SVNROOT, repo) # While using fs we can check that repo exists and provide a better error message. @@ -227,11 +230,11 @@ def _get_head(self, repo): raise Exception('Repository does not exist') # Considered using svn to enable rdump in the future. - #fqdn = socket.getfqdn() - #url = 'http://%s/svn/%s' % (fqdn, repo) + # fqdn = socket.getfqdn() + # url = 'http://%s/svn/%s' % (fqdn, repo) - #args = [SVN, 'info', url] - #grep_args = ['/bin/grep', 'Revision:'] + # args = [SVN, 'info', url] + # grep_args = ['/bin/grep', 'Revision:'] args = [SVNLOOK, 'youngest', path] grep_args = ['/bin/grep', '^[0-9]\+'] @@ -243,7 +246,7 @@ def _get_head(self, repo): logging.info('Repository %s youngest: %s' % (repo, rev)) return rev - def _get_shards(self, head): + def get_shards(self, head): shards = [] number_of_shards = int(head / self.shard_div) # python range excludes second arg from range. @@ -254,38 +257,38 @@ def _get_shards(self, head): return list(range(self.rev_min, int((head + 1) / self.shard_div) * self.shard_div, self.shard_div)) - def _backup_shard(self, shard): + def __backup_shard(self, shard): logging.info('Dumping and uploading shard: %s from repo: %s' % (shard, self.repo)) start_rev = str(shard) to_rev = str(((int(shard / self.shard_div) + 1) * self.shard_div) - 1) - svn_args = self._get_svn_dump_args(start_rev, to_rev) + svn_args = self.__get_svn_dump_args(start_rev, to_rev) self.dump_zip_upload(svn_args, start_rev) -class JobMultiLoad(JobMulti): +class HistoryLoadJob(HistoryDumpJob): def __init__(self, repo): - self.shard_size = '' - self.env = {'LANG': 'en_US.UTF-8', 'LC_ALL': 'en_US.UTF-8'} - self.repo = repo - self.head = self._get_head(self.repo) + super().__init__(repo=repo) if self.head == 0: # Empty repository is a special case because the current head rev can be loaded. self.rev_min = 0 else: self.rev_min = self.head + 1 + def validate(self) -> bool: + return True + + def run(self): logging.info('Processing repo %s with head revision %s' % (self.repo, self.head)) # First process large shards if local head is divisible with shard size. if self.rev_min % 1000 == 0: self.shard_size = 'shard3' self.shard_div = 1000 - shards = self._get_shards(self.rev_min + 1000 * self.shard_div) - self._run(shards) - + shards = self.get_shards(self.rev_min + 1000 * self.shard_div) + self.__run(shards) # Refresh head after potentially loading large dumps. - self.head = self._get_head(self.repo) + self.head = self.get_head(self.repo) if self.head == 0: # Empty repository is a special case because the current head rev can be loaded. self.rev_min = 0 @@ -296,16 +299,16 @@ def __init__(self, repo): self.shard_size = 'shard0' self.shard_div = 1 - shards = self._get_shards(self.rev_min + 999) - self._run(shards) + shards = self.get_shards(self.rev_min + 999) + self.__run(shards) - def _run(self, shards): + def __run(self, shards): logging.info('Shards length %s' % len(shards)) for shard in shards: - dump_exists = self._validate_shard(shard) + dump_exists = self.validate_shard(shard) if dump_exists: logging.info('Shard exists, will load shard %s' % shard) - self._load_shard(shard) + self.__load_shard(shard) continue else: logging.info('Shard does not exist, done for now - %s' % shard) @@ -314,8 +317,7 @@ def _run(self, shards): # Restart or raise the maximum number of shards. raise Exception('Maximum number of shards processed') - - def _load_shard(self, shard): + def __load_shard(self, shard): logging.info('Loading shard: %s from repo: %s' % (shard, self.repo)) start_rev = str(shard) to_rev = str(((int(shard / self.shard_div) + 1) * self.shard_div) - 1) @@ -323,7 +325,6 @@ def _load_shard(self, shard): logging.info('Loading shard %s' % shard) self.load_zip(start_rev) - def load_zip(self, rev): shard_key = self.get_key(rev) @@ -366,303 +367,146 @@ def load_zip(self, rev): # TODO Analyze output, should conclude with (ensure revision is correct for shard): # ------- Committed revision 4999 >>> -class BigDoEverythingClasss(object): - #removed the config object from __init__. - def __init__(self): - self.streams = ["http://%s:%d/commits" %(HOST, PORT)] - self.hook = None - self.svnbin = SVNADMIN - self.worker = BackgroundWorker(self.svnbin, self.hook) - self.watch = [ ] +class Task(DaemonTask): - def start(self): - logging.info('start') - - def commit(self, url, commit): - if commit.type != 'svn' or commit.format != 1: - logging.info("SKIP unknown commit format (%s.%d)", - commit.type, commit.format) - return - logging.info("COMMIT r%d (%d paths) from %s" - % (commit.id, len(commit.changed), url)) - - excluded = False - for repo in REPO_EXCLUDES: - if commit.repositoryname.startswith(repo): - logging.info('Commit in excluded repository, ignoring: %s' % commit.repositoryname) - excluded = True - - if not excluded: - job = Job(commit.repositoryname, commit.id, commit.id) - self.worker.add_job(job) - -# Start logging warnings if the work backlog reaches this many items -BACKLOG_TOO_HIGH = 500 - -class BackgroundWorker(threading.Thread): - def __init__(self, svnbin, hook): - threading.Thread.__init__(self) - - # The main thread/process should not wait for this thread to exit. - ### compat with Python 2.5 - self.setDaemon(True) + def __init__(self): + super().__init__(urls=["http://%s:%d/commits" % (HOST, PORT)], excluded_repos=EXCLUDED_REPOS, worker=bgworker.BackgroundWorker(recursive=True)) - self.svnbin = svnbin - self.hook = hook - self.q = queue.PriorityQueue() + def start(self): + logging.info('Daemon started.') - self.has_started = False + def commit(self, url: str, commit: Commit): + try: + job = Job(commit) + self.worker.queue(job) + except Exception: + logging.exception('Failed to queue a job for r%s in: %s.', job.rev, job.repo) - def run(self): - while True: - # This will block until something arrives - tuple = self.q.get() - job = tuple[1] - - # Warn if the queue is too long. - # (Note: the other thread might have added entries to self.q - # after the .get() and before the .qsize().) - qsize = self.q.qsize()+1 - if qsize > BACKLOG_TOO_HIGH: - logging.warn('worker backlog is at %d', qsize) - - try: - prev_exists = self._validate(job) - if prev_exists: - job._backup_commit() - else: - logging.info('Rev - 1 has not been dumped, adding it to the queue') - self.add_job(job) - self.add_job(Job(job.repo, job.rev - 1, job.head)) - - self.q.task_done() - except: - logging.exception('Exception in worker') - - def add_job(self, job): - # Start the thread when work first arrives. Thread-start needs to - # be delayed in case the process forks itself to become a daemon. - if not self.has_started: - self.start() - self.has_started = True - - self.q.put((job.rev, job)) - - def _validate(self, job, boot=False): - "Validate the specific job." - logging.info("Starting validation of rev: %s in repo: %s" % (job.rev, job.repo)) - return job.validate_rev(job.rev) - -class Daemon(daemonize.Daemon): - def __init__(self, logfile, pidfile, umask, bdec): - daemonize.Daemon.__init__(self, logfile, pidfile) - - self.umask = umask - self.bdec = bdec - - def setup(self): - # There is no setup which the parent needs to wait for. - pass - def run(self): - logging.info('svndumpsub started, pid=%d', os.getpid()) - - # Set the umask in the daemon process. Defaults to 000 for - # daemonized processes. Foreground processes simply inherit - # the value from the parent process. - if self.umask is not None: - umask = int(self.umask, 8) - os.umask(umask) - logging.info('umask set to %03o', umask) - - # Start the BDEC (on the main thread), then start the client - self.bdec.start() - - mc = svnpubsub.client.MultiClient(self.bdec.streams, - self.bdec.commit, - self._event) - mc.run_forever() - - def _event(self, url, event_name, event_arg): - if event_name == 'error': - logging.exception('from %s', url) - elif event_name == 'ping': - logging.debug('ping from %s', url) - else: - logging.info('"%s" from %s', event_name, url) +def main(): + global AWS, SVNADMIN, SVNROOT, BUCKET, CLOUDID, SVNLOOK, SVN + parser = argparse.ArgumentParser(description='An SvnPubSub client to keep working copies synchronized with a repository.', usage='Usage: %prog [options] CONFIG_FILE') + parser.add_argument('--logfile', help='filename for logging') + parser.add_argument('--pidfile', help="the process' PID will be written to this file") + parser.add_argument('--uid', help='switch to this UID before running') + parser.add_argument('--gid', help='switch to this GID before running') + parser.add_argument('--daemon', action='store_true', help='run as a background daemon') + parser.add_argument('--umask', help='set this (octal) umask before running') + parser.add_argument('--history', help='Will dump and backup repository in shard3 ranges (even thousands), e.g --history reponame') + parser.add_argument('--shardsize', default='shard3', help='Shard size used by --history. Assumes that shard3 is executed before shard0.') + parser.add_argument('--load', help='Will load repository from shards in size order (shard3 then shard0), e.g --load reponame') + parser.add_argument('--aws', help='path to aws executable e.g /usr/bin/aws') + parser.add_argument('--svnadmin', help='path to svnadmin executable e.g /usr/bin/svnadmin') + parser.add_argument('--svnlook', help='path to svnlook executable e.g /usr/bin/svnlook') + parser.add_argument('--svnroot', help='path to repository locations /srv/cms/svn') + parser.add_argument('--svn', help='path to svn executable only required when combined with --history e.g /usr/bin/svn') + parser.add_argument('--bucket', help='name of S3 bucket where dumps will be stored') + parser.add_argument('--cloudid', help='AWS cloud-id') + parser.add_argument('--log-level', type=int, default=logging.INFO, + help='log level (DEBUG: %d | INFO: %d | WARNING: %d | ERROR: %d | CRITICAL: %d) (default: %d)' % + (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL, logging.INFO)) -def prepare_logging(logfile): - "Log to the specified file, or to stdout if None." - if logfile: - # Rotate logs daily, keeping 7 days worth. - handler = logging.handlers.TimedRotatingFileHandler( - logfile, when='midnight', backupCount=7, - ) - else: - handler = logging.StreamHandler(sys.stdout) + args = parser.parse_args() - # Add a timestamp to the log records - formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s', - '%Y-%m-%d %H:%M:%S') - handler.setFormatter(formatter) + if not args.aws: + parser.error('A valid --aws has to be provided (path to aws executable)') - # Apply the handler to the root logger - root = logging.getLogger() - root.addHandler(handler) + AWS = args.aws - ### use logging.INFO for now. switch to cmdline option or a config? - root.setLevel(logging.INFO) + if not args.svnadmin: + parser.error('A valid --svnadmin has to be provided (path to svnadmin executable)') -def handle_options(options): + SVNADMIN = args.svnadmin - if not options.aws: - raise ValueError('A valid --aws has to be provided (path to aws executable)') - else: - global AWS - AWS = options.aws + if not args.svnroot: + parser.error('A valid --svnroot has to be provided (path to location of svn repositories)') - if not options.svnadmin: - raise ValueError('A valid --svnadmin has to be provided (path to svnadmin executable)') - else: - global SVNADMIN - SVNADMIN = options.svnadmin + SVNROOT = args.svnroot - if not options.svnroot: - raise ValueError('A valid --svnroot has to be provided (path to location of svn repositories)') - else: - global SVNROOT - SVNROOT = options.svnroot + if not args.bucket: + parser.error('A valid --bucket has to be provided (bucket where dump files will be stored)') - if not options.bucket: - raise ValueError('A valid --bucket has to be provided (bucket where dump files will be stored)') - else: - global BUCKET - BUCKET = options.bucket + BUCKET = args.bucket - if not options.cloudid: - raise ValueError('A valid --cloudid has to be provided (aws cloudid)') - else: - global CLOUDID - CLOUDID = options.cloudid + if not args.cloudid: + parser.error('A valid --cloudid has to be provided (aws cloudid)') - if options.svnlook: - global SVNLOOK - SVNLOOK = options.svnlook + CLOUDID = args.cloudid - if options.svn: - global SVN - SVN = options.svn + if args.svnlook: + SVNLOOK = args.svnlook - if options.history and not options.svnlook: - raise ValueError('A valid --svnlook has to be provided if combined with --history (path to svnlook executable)') + if args.svn: + SVN = args.svn - # Set up the logging, then process the rest of the options. + if args.history and not args.svnlook: + parser.error('A valid --svnlook has to be provided if combined with --history (path to svnlook executable)') + # Set up the logging, then process the rest of the args. # In daemon mode, we let the daemonize module handle the pidfile. # Otherwise, we should write this (foreground) PID into the file. - if options.pidfile and not options.daemon: + if args.pidfile and not args.daemon: pid = os.getpid() # Be wary of symlink attacks try: - os.remove(options.pidfile) + os.remove(args.pidfile) except OSError: pass - fd = os.open(options.pidfile, os.O_WRONLY | os.O_CREAT | os.O_EXCL, - stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) - os.write(fd, b'%d\n' % pid) - os.close(fd) - logging.info('pid %d written to %s', pid, options.pidfile) - - if options.gid: + flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL + mode = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH + fd = os.open(args.pidfile, flags, mode) + with os.fdopen(fd, 'wb') as f: + f.write(b'%d\n' % pid) + logging.info('PID: %d -> %s', pid, args.pidfile) + + if args.gid: try: - gid = int(options.gid) + gid = int(args.gid) except ValueError: import grp - gid = grp.getgrnam(options.gid)[2] - logging.info('setting gid %d', gid) + gid = grp.getgrnam(args.gid)[2] + logging.info('GID: %d', gid) os.setgid(gid) - if options.uid: + if args.uid: try: - uid = int(options.uid) + uid = int(args.uid) except ValueError: import pwd - uid = pwd.getpwnam(options.uid)[2] - logging.info('setting uid %d', uid) + uid = pwd.getpwnam(args.uid)[2] + logging.info('UID: %d', uid) os.setuid(uid) - prepare_logging(options.logfile) - -def main(args): - parser = optparse.OptionParser( - description='An SvnPubSub client to keep working copies synchronized ' - 'with a repository.', - usage='Usage: %prog [options] CONFIG_FILE', - ) - parser.add_option('--logfile', - help='filename for logging') - parser.add_option('--pidfile', - help="the process' PID will be written to this file") - parser.add_option('--uid', - help='switch to this UID before running') - parser.add_option('--gid', - help='switch to this GID before running') - parser.add_option('--daemon', action='store_true', - help='run as a background daemon') - parser.add_option('--umask', - help='set this (octal) umask before running') - parser.add_option('--history', - help='Will dump and backup repository in shard3 ranges (even thousands), e.g --history reponame') - parser.add_option('--shardsize', default='shard3', - help='Shard size used by --history. Assumes that shard3 is executed before shard0.') - parser.add_option('--load', - help='Will load repository from shards in size order (shard3 then shard0), e.g --load reponame') - parser.add_option('--aws', - help='path to aws executable e.g /usr/bin/aws') - parser.add_option('--svnadmin', - help='path to svnadmin executable e.g /usr/bin/svnadmin') - parser.add_option('--svnlook', - help='path to svnlook executable e.g /usr/bin/svnlook') - parser.add_option('--svnroot', - help='path to repository locations /srv/cms/svn') - parser.add_option('--svn', - help='path to svn executable only required when combined with --history e.g /usr/bin/svn') - parser.add_option('--bucket', - help='name of S3 bucket where dumps will be stored') - parser.add_option('--cloudid', - help='AWS cloud-id') - - options, extra = parser.parse_args(args) - - # Process any provided options. - handle_options(options) - - if options.history: - JobMulti(options.history, options.shardsize) - elif options.load: - JobMultiLoad(options.load) + # Set up a new logging handler with the specified log level + svnpubsub.logger.setup(logfile=args.logfile, level=args.log_level) + + if args.history: + HistoryDumpJob(args.history, args.shardsize).run() + elif args.load: + HistoryLoadJob(args.load).run() else: - if options.daemon and not options.logfile: + if args.daemon and not args.logfile: parser.error('LOGFILE is required when running as a daemon') - if options.daemon and not options.pidfile: + if args.daemon and not args.pidfile: parser.error('PIDFILE is required when running as a daemon') - bdec = BigDoEverythingClasss() - - # We manage the logfile ourselves (along with possible rotation). The - # daemon process can just drop stdout/stderr into /dev/null. - d = Daemon('/dev/null', os.path.abspath(options.pidfile), - options.umask, bdec) - if options.daemon: + # We manage the logfile ourselves (along with possible rotation). + # The daemon process can just drop stdout/stderr into /dev/null. + daemon = Daemon(name=os.path.basename(__file__), + logfile='/dev/null', + pidfile=os.path.abspath(args.pidfile) if args.pidfile else None, + umask=args.umask, + task=Task()) + if args.daemon: # Daemonize the process and call sys.exit() with appropriate code - d.daemonize_exit() + daemon.daemonize_exit() else: # Just run in the foreground (the default) - d.foreground() + daemon.foreground() if __name__ == "__main__": - main(sys.argv[1:]) + main() From 03c3383e79b61df0c70ac38c8cc0866d7328b87d Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Tue, 31 Mar 2026 08:13:22 +0200 Subject: [PATCH 2/5] Replace get_head pipe chain with execute() and remove dead check_call - get_head now uses execute() from util instead of a manual svnlook|grep pipe; svnlook youngest outputs only the revision number so grep was redundant - Remove unused check_call wrapper and its assert (dead code) --- svndumpsub.py | 38 +++++++------------------------------- 1 file changed, 7 insertions(+), 31 deletions(-) diff --git a/svndumpsub.py b/svndumpsub.py index 771049e..83ecd80 100755 --- a/svndumpsub.py +++ b/svndumpsub.py @@ -37,6 +37,7 @@ import logging.handlers import svnpubsub.client from svnpubsub import bgworker +from svnpubsub.util import execute from svnpubsub.client import Commit from svnpubsub.daemon import Daemon, DaemonTask @@ -55,27 +56,6 @@ s3client = boto3.client('s3') -assert hasattr(subprocess, 'check_call') - -def check_call(*args, **kwds): - """Wrapper around subprocess.check_call() that logs stderr upon failure, - with an optional list of exit codes to consider non-failure.""" - assert 'stderr' not in kwds - if '__okayexits' in kwds: - __okayexits = kwds['__okayexits'] - del kwds['__okayexits'] - else: - __okayexits = set([0]) # EXIT_SUCCESS - kwds.update(stderr=subprocess.PIPE) - pipe = subprocess.Popen(*args, **kwds) - output, errput = pipe.communicate() - if pipe.returncode not in __okayexits: - cmd = args[0] if len(args) else kwds.get('args', '(no command)') - logging.error('Command failed: returncode=%d command=%r stderr=%r', - pipe.returncode, cmd, errput) - raise subprocess.CalledProcessError(pipe.returncode, args) - return pipe.returncode # is EXIT_OK - class Job(bgworker.BackgroundJob): @@ -167,9 +147,9 @@ def dump_zip_upload(self, dump_args, rev): gz_args = [gz] # Svn admin dump - p1 = subprocess.Popen((dump_args), stdout=subprocess.PIPE, env=self.env) + p1 = subprocess.Popen(dump_args, stdout=subprocess.PIPE, env=self.env) # Zip stdout - p2 = subprocess.Popen((gz_args), stdin=p1.stdout, stdout=subprocess.PIPE) + p2 = subprocess.Popen(gz_args, stdin=p1.stdout, stdout=subprocess.PIPE) p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits. # Upload zip.stdout to s3 s3client.upload_fileobj(p2.stdout, BUCKET, shard_key) @@ -236,13 +216,10 @@ def get_head(self, repo) -> int: # args = [SVN, 'info', url] # grep_args = ['/bin/grep', 'Revision:'] - args = [SVNLOOK, 'youngest', path] - grep_args = ['/bin/grep', '^[0-9]\+'] - - p1 = subprocess.Popen((args), stdout=subprocess.PIPE) - output = subprocess.check_output((grep_args), stdin=p1.stdout) - - rev = int(''.join(filter(str.isdigit, output.decode("utf-8")))) + # svnlook youngest outputs only the revision number, so grep is redundant. + # Using execute() removes the manual pipe and decode handling. + _, output, _ = execute(SVNLOOK, 'youngest', path) + rev = int(output.strip()) logging.info('Repository %s youngest: %s' % (repo, rev)) return rev @@ -256,7 +233,6 @@ def get_shards(self, head): # Upper limit must be +1 before division (both shard3 and shard0). return list(range(self.rev_min, int((head + 1) / self.shard_div) * self.shard_div, self.shard_div)) - def __backup_shard(self, shard): logging.info('Dumping and uploading shard: %s from repo: %s' % (shard, self.repo)) start_rev = str(shard) From dd9312d1afbfe44f6a4bc5a46f32f856540fec4e Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Tue, 31 Mar 2026 08:27:48 +0200 Subject: [PATCH 3/5] Fix TypeError when two jobs with the same rev are queued concurrently heapq falls through to compare Job instances as a tiebreaker when rev is equal, which fails since BackgroundJob has no ordering. Add an itertools counter as a middle element in the priority tuple to prevent this. --- svnpubsub/bgworker.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/svnpubsub/bgworker.py b/svnpubsub/bgworker.py index d8876b9..f0720ec 100644 --- a/svnpubsub/bgworker.py +++ b/svnpubsub/bgworker.py @@ -1,4 +1,5 @@ import logging +import itertools from threading import Thread from queue import PriorityQueue @@ -12,6 +13,7 @@ def __init__(self, recursive=False, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) self.q = PriorityQueue() + self._counter = itertools.count() # Set the kwargs as class attributes self.daemon = True # The main thread/process should not wait for this thread to exit. self.recursive = recursive @@ -20,7 +22,7 @@ def __init__(self, recursive=False, **kwargs): def run(self): while True: # This will block until something arrives - job: BackgroundJob = self.q.get()[1] + job: BackgroundJob = self.q.get()[2] # Warn if the queue is too long. # Note: The other thread might have added entries to self.q after the .get() and before the .qsize() qsize = self.q.qsize() + 1 @@ -44,8 +46,9 @@ def queue(self, job): if not self.started: self.start() self.started = True - # Add the new job to the queue - self.q.put((job.rev, job)) + # Counter breaks ties when two jobs share the same rev, preventing + # heapq from falling through to compare Job instances (which have no ordering). + self.q.put((job.rev, next(self._counter), job)) def __validate(self, job): logging.info("Validating r%s in: %s" % (job.rev, job.repo)) From 0b3eb7cd144aff2286a5fc10526022691cd021b9 Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Tue, 31 Mar 2026 08:38:26 +0200 Subject: [PATCH 4/5] Fix method visibility across Job, HistoryDumpJob and HistoryLoadJob - _get_svn_dump_args: was __ which caused a name-mangling AttributeError when called from HistoryDumpJob; changed to _ since subclasses need it - Align _get_key, _get_name, _validate_shard, _get_head, _get_shards to single underscore where subclasses access them, double underscore where the method is strictly internal to the defining class --- svndumpsub.py | 40 ++++++++++++++++++++-------------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/svndumpsub.py b/svndumpsub.py index 83ecd80..d81cc04 100755 --- a/svndumpsub.py +++ b/svndumpsub.py @@ -78,11 +78,11 @@ def __init__(self, commit: Commit = None, shard_size: str = 'shard0', repo: str raise Exception('No commit or repo specified') - def get_key(self, rev): + def _get_key(self, rev): # /v1/Cloudid/reponame/shardX/0000001000/reponame-0000001000.svndump.gz - return '%s/%s' % (self.__get_s3_base(rev = rev), self.get_name(rev)) + return '%s/%s' % (self.__get_s3_base(rev = rev), self.__get_name(rev)) - def get_name(self, rev): + def __get_name(self, rev): rev_str = str(rev) rev_str = rev_str.zfill(10) name = self.repo + '-' + rev_str + '.svndump.gz' @@ -100,14 +100,14 @@ def __get_s3_base(self, rev): # v1/CLOUDID/demo1/shard0/0000000000 return '%s/%s/%s/%s/%s' % (version, CLOUDID, self.repo, self.shard_size, shard_number) - def __get_svn_dump_args(self, from_rev, to_rev): + def _get_svn_dump_args(self, from_rev, to_rev): path = '%s/%s' % (SVNROOT, self.repo) dump_rev = '-r%s:%s' % (from_rev, to_rev) # svnadmin dump --incremental --deltas /srv/cms/svn/demo1 -r 237:237 return [SVNADMIN, 'dump', '--incremental', '--deltas', path, dump_rev] - def validate_shard(self, rev): - key = self.get_key(rev) + def _validate_shard(self, rev): + key = self._get_key(rev) try: response = s3client.head_object(Bucket=BUCKET, Key=key) logging.debug('Shard key exists: %s' % key) @@ -134,14 +134,14 @@ def validate(self) -> bool: logging.info('At first possible rev in shard, will not validate further rev: %s', rev_to_validate) return True - return self.validate_shard(rev_to_validate) + return self._validate_shard(rev_to_validate) def run(self): logging.info('Dumping and uploading rev: %s from repo: %s' % (self.rev, self.repo)) - self.dump_zip_upload(self.__get_svn_dump_args(self.rev, self.rev), self.rev) + self.dump_zip_upload(self._get_svn_dump_args(self.rev, self.rev), self.rev) def dump_zip_upload(self, dump_args, rev): - shard_key = self.get_key(rev) + shard_key = self._get_key(rev) gz = '/bin/gzip' gz_args = [gz] @@ -171,7 +171,7 @@ class HistoryDumpJob(Job): Processing one repo as specified in the history option. """ def __init__(self, repo, shard_size=None): - super().__init__(shard_size=shard_size, repo=repo, head=self.get_head(repo)) + super().__init__(shard_size=shard_size, repo=repo, head=self._get_head(repo)) if shard_size is None: pass elif shard_size == 'shard3': @@ -194,14 +194,14 @@ def validate(self) -> bool: def run(self): logging.info('Processing repo %s with head revision %s' % (self.repo, self.head)) - shards = self.get_shards(self.head) + shards = self._get_shards(self.head) for shard in shards: - dump_exists = self.validate_shard(shard) + dump_exists = self._validate_shard(shard) if not dump_exists: logging.info('Shard is missing will dump and upload shard %s' % shard) self.__backup_shard(shard) - def get_head(self, repo) -> int: + def _get_head(self, repo) -> int: path = '%s/%s' % (SVNROOT, repo) # While using fs we can check that repo exists and provide a better error message. @@ -223,7 +223,7 @@ def get_head(self, repo) -> int: logging.info('Repository %s youngest: %s' % (repo, rev)) return rev - def get_shards(self, head): + def _get_shards(self, head): shards = [] number_of_shards = int(head / self.shard_div) # python range excludes second arg from range. @@ -238,7 +238,7 @@ def __backup_shard(self, shard): start_rev = str(shard) to_rev = str(((int(shard / self.shard_div) + 1) * self.shard_div) - 1) - svn_args = self.__get_svn_dump_args(start_rev, to_rev) + svn_args = self._get_svn_dump_args(start_rev, to_rev) self.dump_zip_upload(svn_args, start_rev) @@ -260,11 +260,11 @@ def run(self): if self.rev_min % 1000 == 0: self.shard_size = 'shard3' self.shard_div = 1000 - shards = self.get_shards(self.rev_min + 1000 * self.shard_div) + shards = self._get_shards(self.rev_min + 1000 * self.shard_div) self.__run(shards) # Refresh head after potentially loading large dumps. - self.head = self.get_head(self.repo) + self.head = self._get_head(self.repo) if self.head == 0: # Empty repository is a special case because the current head rev can be loaded. self.rev_min = 0 @@ -275,13 +275,13 @@ def run(self): self.shard_size = 'shard0' self.shard_div = 1 - shards = self.get_shards(self.rev_min + 999) + shards = self._get_shards(self.rev_min + 999) self.__run(shards) def __run(self, shards): logging.info('Shards length %s' % len(shards)) for shard in shards: - dump_exists = self.validate_shard(shard) + dump_exists = self._validate_shard(shard) if dump_exists: logging.info('Shard exists, will load shard %s' % shard) self.__load_shard(shard) @@ -302,7 +302,7 @@ def __load_shard(self, shard): self.load_zip(start_rev) def load_zip(self, rev): - shard_key = self.get_key(rev) + shard_key = self._get_key(rev) gz = '/bin/gunzip' gz_args = [gz, '-c'] From 8850bd9a0064d09322f5d227d222a5f81d35eb63 Mon Sep 17 00:00:00 2001 From: Omid Manikhi Date: Tue, 31 Mar 2026 09:40:52 +0200 Subject: [PATCH 5/5] Add test suite with pytest, JUnit XML reporting for CodeBuild - tests/bgworkertest.py: test for BackgroundWorker.queue with same-rev jobs - pytest.ini: configures testpaths, *test.py pattern, and JUnit XML output - run-tests.sh: convenience script delegating to pytest - .gitignore: exclude generated test-reports/ directory --- .gitignore | 3 +++ pytest.ini | 4 ++++ run-tests.sh | 2 ++ tests/bgworkertest.py | 26 ++++++++++++++++++++++++++ 4 files changed, 35 insertions(+) create mode 100644 pytest.ini create mode 100755 run-tests.sh create mode 100644 tests/bgworkertest.py diff --git a/.gitignore b/.gitignore index 664037e..b0708de 100644 --- a/.gitignore +++ b/.gitignore @@ -242,3 +242,6 @@ dmypy.json .pyre/ # End of https://www.toptal.com/developers/gitignore/api/python,pycharm,osx,backup + +# Test reports +test-reports/ diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..6fa58bc --- /dev/null +++ b/pytest.ini @@ -0,0 +1,4 @@ +[pytest] +testpaths = tests +python_files = *test.py +addopts = --junitxml=test-reports/results.xml diff --git a/run-tests.sh b/run-tests.sh new file mode 100755 index 0000000..40d0c70 --- /dev/null +++ b/run-tests.sh @@ -0,0 +1,2 @@ +#!/bin/sh +python -m pytest "$@" diff --git a/tests/bgworkertest.py b/tests/bgworkertest.py new file mode 100644 index 0000000..92edac6 --- /dev/null +++ b/tests/bgworkertest.py @@ -0,0 +1,26 @@ +import unittest +from svnpubsub.bgworker import BackgroundJob, BackgroundWorker + + +class ConcreteJob(BackgroundJob): + def validate(self): + return True + def run(self): + pass + + +class BackgroundWorkerTest(unittest.TestCase): + + def test_queue_two_jobs_same_rev(self): + """Two jobs at the same revision must not raise a TypeError. + heapq falls through to compare Job instances when rev is equal, + which fails unless a tiebreaker is in the tuple.""" + worker = BackgroundWorker() + job1 = ConcreteJob(repo='repo-a', rev=1, head=1) + job2 = ConcreteJob(repo='repo-b', rev=1, head=1) + worker.queue(job1) + worker.queue(job2) # would raise TypeError before the fix + + +if __name__ == '__main__': + unittest.main()