From 42442c401f9aa814cd54f8ac8f38da91e7cfc3aa Mon Sep 17 00:00:00 2001
From: Brett Smith
Date: Thu, 3 Sep 2020 09:40:47 -0400
Subject: [PATCH] extract_odf_links: Support multiprocessing.

This cuts audit packet manifest build time in half on my 8-core laptop.
---
 conservancy_beancount/tools/audit_report.py   |  2 +-
 .../tools/extract_odf_links.py                | 71 ++++++++++++++++---
 2 files changed, 61 insertions(+), 12 deletions(-)
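The parallel design in extract_odf_links.py (second diff below) is a fan-out with streamed results: every worker process writes the paths it finds to a shared multiprocessing.SimpleQueue, then posts a None sentinel, and the parent prints results as they arrive until it has counted one sentinel per worker. Because SimpleQueue objects cannot be pickled into pool workers, the queue travels through the module-level _QUEUES dict and plain fork() inheritance. A minimal self-contained sketch of that pattern, with illustrative names (produce_squares, demo) that are not part of this patch; the sketch pins the 'fork' start method that this sharing scheme requires:

import concurrent.futures as futmod
import multiprocessing
from typing import Dict, Hashable, List

# Forked workers look their queue up here, mirroring the patch's _QUEUES.
_QUEUES: Dict[Hashable, multiprocessing.SimpleQueue] = {}

def produce_squares(n: int, queue_key: Hashable) -> None:
    queue = _QUEUES[queue_key]
    try:
        for i in range(n):
            queue.put(i * i)
    finally:
        queue.put(None)  # exactly one sentinel per worker, even on error

def demo(jobs: int = 4) -> List[int]:
    fork = multiprocessing.get_context('fork')  # queue sharing relies on fork()
    queue_key = 'demo'
    # The queue must exist before the pool does, so workers inherit it.
    queue = _QUEUES[queue_key] = fork.SimpleQueue()
    with futmod.ProcessPoolExecutor(jobs, mp_context=fork) as pool:
        procs = [pool.submit(produce_squares, n, queue_key) for n in (2, 3, 4)]
        procs_left = len(procs)
        results: List[int] = []
        while procs_left:  # drain until every worker has signed off
            item = queue.get()
            if item is None:
                procs_left -= 1
            else:
                results.append(item)
    return sorted(results)

if __name__ == '__main__':
    print(demo())  # [0, 0, 0, 1, 1, 1, 4, 4, 9]

Submitting the biggest inputs first, as main() below does with its sort-then-reverse, is the usual longest-job-first trick so a large spreadsheet hit last does not leave the rest of the pool idle.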
diff --git a/conservancy_beancount/tools/audit_report.py b/conservancy_beancount/tools/audit_report.py
index 4b1fa4f..709d1ea 100644
--- a/conservancy_beancount/tools/audit_report.py
+++ b/conservancy_beancount/tools/audit_report.py
@@ -242,7 +242,7 @@ def main(arglist: Optional[Sequence[str]]=None,
     if not args.force or len(missing_reports) == len(output_reports):
         return os.EX_UNAVAILABLE
 
-    arglist = [f'--delimiter={args.delimiter}']
+    arglist = [f'--delimiter={args.delimiter}', f'--jobs={args.jobs}']
     if repo_path is not None:
         arglist.append(f'--relative-to={repo_path}')
     arglist.extend(str(p) for p in output_reports if p not in missing_reports)
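The hunk above forwards --jobs to extract-odf-links the same way it already forwards --delimiter; the diff below declares the option through cliutil.add_jobs_argument, a helper that lives elsewhere in the tree and is not shown here. As a rough sketch of the shape such a helper might take (the default and help text are guesses, not the actual cliutil code):

import argparse
import os

def add_jobs_argument(parser: argparse.ArgumentParser) -> argparse.Action:
    # Guessed implementation: default to the CPU count so tools
    # parallelize out of the box; --jobs=1 forces serial operation.
    return parser.add_argument(
        '--jobs', '-j',
        metavar='NUMBER',
        type=int,
        default=os.cpu_count() or 1,
        help="Number of worker processes to run in parallel",
    )

Wired up this way, a --jobs=8 given to the audit report tool reaches extract-odf-links unchanged.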
diff --git a/conservancy_beancount/tools/extract_odf_links.py b/conservancy_beancount/tools/extract_odf_links.py
index 0659e58..71b0ca2 100644
--- a/conservancy_beancount/tools/extract_odf_links.py
+++ b/conservancy_beancount/tools/extract_odf_links.py
@@ -20,7 +20,9 @@ filesystem, and writes their full paths to stdout.
 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
 
 import argparse
+import concurrent.futures as futmod
 import logging
+import multiprocessing
 import os
 import sys
 import urllib.parse
@@ -32,6 +34,8 @@
 import odf.opendocument  # type:ignore[import]
 import odf.text  # type:ignore[import]
 from typing import (
+    Dict,
+    Hashable,
     Iterator,
     Optional,
     Sequence,
@@ -41,7 +45,11 @@ from typing import (
 from .. import cliutil
 
+# This should be Queue[Optional[Path]] but that requires newer typeshed
+PathQ = multiprocessing.SimpleQueue
+
 PROGNAME = 'extract-odf-links'
+_QUEUES: Dict[Hashable, PathQ] = {}
 logger = logging.getLogger('conservancy_beancount.tools.extract_odf_links')
 
 
 def parse_delimiter(arg: str) -> str:
@@ -58,6 +66,7 @@ def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace
     parser = argparse.ArgumentParser(prog=PROGNAME)
     cliutil.add_version_argument(parser)
     cliutil.add_loglevel_argument(parser)
+    cliutil.add_jobs_argument(parser)
     parser.add_argument(
         '--delimiter', '-d',
         metavar='TEXT',
@@ -88,8 +97,11 @@ spreadsheet's path
         help="""ODF file(s) to extract links from
 """)
     args = parser.parse_args(arglist)
+    paths_count = len(args.odf_paths)
+    args.odf_paths = [path for path in args.odf_paths if path != cliutil.STDSTREAM_PATH]
+    args.read_stdin = paths_count > len(args.odf_paths)
     if args.relative_to is None:
-        if any(path == cliutil.STDSTREAM_PATH for path in args.odf_paths):
+        if args.read_stdin:
             parser.error("--relative-to is required to read from stdin")
     elif args.relative_to.is_dir() or not args.relative_to.exists():
         args.relative_to /= 'PathStub.ods'
@@ -112,31 +124,68 @@ def extract_links(odf_doc: odf.opendocument.OpenDocument, rel_path: Path) -> Ite
             continue
         yield path
 
+def enqueue_links(odf_path: Path,
+                  rel_path: Optional[Path],
+                  queue_key: Hashable,
+                  stdin_fd: int,
+) -> None:
+    queue = _QUEUES[queue_key]
+    try:
+        with cliutil.bytes_output(odf_path, stdin_fd, 'r') as odf_file:
+            odf_doc = odf.opendocument.load(odf_file)
+        for path in extract_links(odf_doc, rel_path or odf_path):
+            queue.put(path)
+    finally:
+        queue.put(None)
+
 def main(arglist: Optional[Sequence[str]]=None,
          stdout: TextIO=sys.stdout,
          stderr: TextIO=sys.stderr,
 ) -> int:
     args = parse_arguments(arglist)
     cliutil.set_loglevel(logger, args.loglevel)
+    args.odf_paths.sort(key=lambda path: path.stat().st_size)
+    if not args.read_stdin:
+        # Set a value that passes type checking but will crash if opened.
+        stdin_fd = -1
+    else:
+        # multiprocessing closes subprocesses' stdin.
+        # Migrate it to a new fd they can read.
+        stdin_fd = os.dup(sys.stdin.fileno())
+        os.set_inheritable(stdin_fd, True)
+        args.odf_paths.append(cliutil.STDSTREAM_PATH)
+    args.odf_paths.reverse()
+
+    queue_key = id(args)
+    queue = _QUEUES[queue_key] = multiprocessing.SimpleQueue()
+    # `args.jobs - 1` because the main process does enough work to count as one.
+    max_procs = max(1, min(args.jobs - 1, len(args.odf_paths)))
+    with futmod.ProcessPoolExecutor(max_procs) as pool:
+        procs = [pool.submit(
+            enqueue_links, odf_path, args.relative_to, queue_key, stdin_fd,
+        ) for odf_path in args.odf_paths]
+        procs_left = len(procs)
+        seen: Set[Path] = set()
+        while procs_left:
+            path = queue.get()
+            if path is None:
+                procs_left -= 1
+            elif path not in seen:
+                seen.add(path)
+                if not path.exists():
+                    logger.warning("path %s not found", path)
+                print(path, end=args.delimiter, file=stdout)
     returncode = 0
-    links: Set[Path] = set()
-    for odf_path in args.odf_paths:
+    for odf_path, proc in zip(args.odf_paths, procs):
         try:
-            with cliutil.bytes_output(odf_path, sys.stdin, 'r') as odf_file:
-                odf_doc = odf.opendocument.load(odf_file)
-            links.update(extract_links(odf_doc, args.relative_to or odf_path))
+            proc.result()
         except IOError as error:
             logger.error("error reading %s: %s", odf_path, error.strerror)
             returncode = os.EX_DATAERR
         except BadZipFile as error:
             logger.error("error parsing %s: %s", odf_path, error.args[0])
             returncode = os.EX_DATAERR
-
-    for link in links:
-        if not link.exists():
-            logger.warning("path %s not found", link)
-        print(link, end=args.delimiter, file=stdout)
     return returncode
 
 
 entry_point = cliutil.make_entry_point(__name__, PROGNAME)
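One subtlety worth restating outside the diff: multiprocessing closes stdin in worker processes, so handing sys.stdin itself to enqueue_links would fail. main() therefore duplicates the descriptor with os.dup(), marks it inheritable, and passes workers the raw fd number to reopen. A standalone sketch of that technique (read_from_fd is an illustrative name; like the patch, it relies on the 'fork' start method):

import concurrent.futures as futmod
import multiprocessing
import os
import sys

def read_from_fd(fd: int) -> bytes:
    # closefd=False keeps the descriptor usable by any sibling workers.
    with open(fd, 'rb', closefd=False) as stream:
        return stream.read()

def main() -> None:
    stdin_fd = os.dup(sys.stdin.fileno())  # survives multiprocessing closing fd 0
    os.set_inheritable(stdin_fd, True)     # as in the patch (fork children inherit open fds regardless)
    fork = multiprocessing.get_context('fork')
    with futmod.ProcessPoolExecutor(1, mp_context=fork) as pool:
        data = pool.submit(read_from_fd, stdin_fd).result()
    print(f'read {len(data)} bytes from the inherited descriptor')

if __name__ == '__main__':
    main()

Running it with piped input (e.g. `echo hi |` in front) should report 3 bytes read in the worker.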