extract_odf_links: Support multiprocessing.
This cuts audit packet manifest build time in half on my 8-core laptop.
parent 552dae6ea5
commit 42442c401f

2 changed files with 61 additions and 12 deletions
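For context, a minimal sketch of how the tool might be driven once this lands. The `--jobs` option is the one wired up in the diff below via `cliutil.add_jobs_argument()`; the spreadsheet paths are hypothetical, and the module path is inferred from the logger name in the second file:

    from conservancy_beancount.tools import extract_odf_links

    # Hypothetical report paths; main() returns an exit code as shown below.
    exit_code = extract_odf_links.main(['--jobs=8', 'ProjectA.ods', 'ProjectB.ods'])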
@@ -242,7 +242,7 @@ def main(arglist: Optional[Sequence[str]]=None,
     if not args.force or len(missing_reports) == len(output_reports):
         return os.EX_UNAVAILABLE
 
-    arglist = [f'--delimiter={args.delimiter}']
+    arglist = [f'--delimiter={args.delimiter}', f'--jobs={args.jobs}']
     if repo_path is not None:
         arglist.append(f'--relative-to={repo_path}')
     arglist.extend(str(p) for p in output_reports if p not in missing_reports)
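As a concrete reading of the caller change above, with hypothetical values for the delimiter, job count, repository path, and report list, the argument list it assembles comes out as:

    delimiter = '\0'                                   # hypothetical args.delimiter
    jobs = 8                                           # hypothetical args.jobs
    repo_path = 'books'                                # hypothetical repo_path
    output_reports = ['ProjectA.ods', 'ProjectB.ods']  # hypothetical report paths
    missing_reports = []

    arglist = [f'--delimiter={delimiter}', f'--jobs={jobs}']
    if repo_path is not None:
        arglist.append(f'--relative-to={repo_path}')
    arglist.extend(p for p in output_reports if p not in missing_reports)
    # arglist == ['--delimiter=\x00', '--jobs=8', '--relative-to=books',
    #             'ProjectA.ods', 'ProjectB.ods']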
@@ -20,7 +20,9 @@ filesystem, and writes their full paths to stdout.
 # along with this program. If not, see <https://www.gnu.org/licenses/>.
 
 import argparse
+import concurrent.futures as futmod
 import logging
+import multiprocessing
 import os
 import sys
 import urllib.parse
@@ -32,6 +34,8 @@ import odf.opendocument # type:ignore[import]
 import odf.text # type:ignore[import]
 
 from typing import (
+    Dict,
+    Hashable,
     Iterator,
     Optional,
     Sequence,
@@ -41,7 +45,11 @@ from typing import (
 
 from .. import cliutil
 
+# This should be Queue[Optional[Path]] but that requires newer typeshed
+PathQ = multiprocessing.SimpleQueue
+
 PROGNAME = 'extract-odf-links'
+_QUEUES: Dict[Hashable, PathQ] = {}
 logger = logging.getLogger('conservancy_beancount.tools.extract_odf_links')
 
 def parse_delimiter(arg: str) -> str:
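A plausible reason for the module-level `_QUEUES` registry (my reading; the commit does not say): a `multiprocessing.SimpleQueue` cannot be pickled as an argument to `ProcessPoolExecutor.submit()`, so each worker is handed a hashable key instead and looks the queue up in the module global it inherited when the pool forked. A standalone sketch of that pattern with made-up names, assuming the fork start method (the Linux default):

    import concurrent.futures as futmod
    import multiprocessing
    from typing import Dict, Hashable

    PathQ = multiprocessing.SimpleQueue       # same alias trick as the diff
    _QUEUES: Dict[Hashable, PathQ] = {}

    def square_worker(queue_key: Hashable, n: int) -> None:
        queue = _QUEUES[queue_key]            # find the shared queue by key
        queue.put(n * n)
        queue.put(None)                       # sentinel: this task is finished

    def run() -> None:
        queue_key = 'demo'
        queue = _QUEUES[queue_key] = multiprocessing.SimpleQueue()
        with futmod.ProcessPoolExecutor(2) as pool:
            futures = [pool.submit(square_worker, queue_key, n) for n in range(4)]
            finished = 0
            while finished < len(futures):    # drain until every sentinel arrives
                item = queue.get()
                if item is None:
                    finished += 1
                else:
                    print(item)

    if __name__ == '__main__':
        run()

The same sentinel convention appears in the last hunk below: each `enqueue_links()` call puts `None` in a `finally` block, and `main()` keeps reading until it has seen one sentinel per submitted job.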
@@ -58,6 +66,7 @@ def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
     parser = argparse.ArgumentParser(prog=PROGNAME)
     cliutil.add_version_argument(parser)
     cliutil.add_loglevel_argument(parser)
+    cliutil.add_jobs_argument(parser)
     parser.add_argument(
         '--delimiter', '-d',
         metavar='TEXT',
@@ -88,8 +97,11 @@ spreadsheet's path
         help="""ODF file(s) to extract links from
 """)
     args = parser.parse_args(arglist)
+    paths_count = len(args.odf_paths)
+    args.odf_paths = [path for path in args.odf_paths if path != cliutil.STDSTREAM_PATH]
+    args.read_stdin = paths_count > len(args.odf_paths)
     if args.relative_to is None:
-        if any(path == cliutil.STDSTREAM_PATH for path in args.odf_paths):
+        if args.read_stdin:
             parser.error("--relative-to is required to read from stdin")
     elif args.relative_to.is_dir() or not args.relative_to.exists():
         args.relative_to /= 'PathStub.ods'
@@ -112,31 +124,68 @@ def extract_links(odf_doc: odf.opendocument.OpenDocument, rel_path: Path) -> Iterator[Path]:
             continue
         yield path
 
+def enqueue_links(odf_path: Path,
+                  rel_path: Optional[Path],
+                  queue_key: Hashable,
+                  stdin_fd: int,
+) -> None:
+    queue = _QUEUES[queue_key]
+    try:
+        with cliutil.bytes_output(odf_path, stdin_fd, 'r') as odf_file:
+            odf_doc = odf.opendocument.load(odf_file)
+            for path in extract_links(odf_doc, rel_path or odf_path):
+                queue.put(path)
+    finally:
+        queue.put(None)
+
 def main(arglist: Optional[Sequence[str]]=None,
          stdout: TextIO=sys.stdout,
          stderr: TextIO=sys.stderr,
 ) -> int:
     args = parse_arguments(arglist)
     cliutil.set_loglevel(logger, args.loglevel)
+    args.odf_paths.sort(key=lambda path: path.stat().st_size)
+    if not args.read_stdin:
+        # Set a value that passes type checking but will crash if opened.
+        stdin_fd = -1
+    else:
+        # multiprocessing closes subprocesses' stdin.
+        # Migrate it to a new fd they can read.
+        stdin_fd = os.dup(sys.stdin.fileno())
+        os.set_inheritable(stdin_fd, True)
+        args.odf_paths.append(cliutil.STDSTREAM_PATH)
+    args.odf_paths.reverse()
+
+    queue_key = id(args)
+    queue = _QUEUES[queue_key] = multiprocessing.SimpleQueue()
+    # `args.jobs - 1` because the main process does enough work to count as one.
+    max_procs = max(1, min(args.jobs - 1, len(args.odf_paths)))
+    with futmod.ProcessPoolExecutor(max_procs) as pool:
+        procs = {pool.submit(
+            enqueue_links, odf_path, args.relative_to, queue_key, stdin_fd,
+        ) for odf_path in args.odf_paths}
+        procs_left = len(procs)
+        seen: Set[Path] = set()
+        while procs_left:
+            path = queue.get()
+            if path is None:
+                procs_left -= 1
+            elif path not in seen:
+                seen.add(path)
+                if not path.exists():
+                    logger.warning("path %s not found", path)
+                print(path, end=args.delimiter, file=stdout)
 
     returncode = 0
-    links: Set[Path] = set()
-    for odf_path in args.odf_paths:
+    for odf_path, proc in zip(args.odf_paths, procs):
         try:
-            with cliutil.bytes_output(odf_path, sys.stdin, 'r') as odf_file:
-                odf_doc = odf.opendocument.load(odf_file)
-                links.update(extract_links(odf_doc, args.relative_to or odf_path))
+            proc.result()
         except IOError as error:
             logger.error("error reading %s: %s", odf_path, error.strerror)
             returncode = os.EX_DATAERR
         except BadZipFile as error:
             logger.error("error parsing %s: %s", odf_path, error.args[0])
             returncode = os.EX_DATAERR
-
-    for link in links:
-        if not link.exists():
-            logger.warning("path %s not found", link)
-        print(link, end=args.delimiter, file=stdout)
     return returncode
 
 entry_point = cliutil.make_entry_point(__name__, PROGNAME)
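Finally, a worked reading of the pool-size formula in `main()` above, with hypothetical numbers:

    jobs = 8              # from --jobs (hypothetical)
    n_spreadsheets = 5    # hypothetical number of ODF paths
    max_procs = max(1, min(jobs - 1, n_spreadsheets))
    # max_procs == 5: one slot is left for the main process, which spends its
    # time draining the queue and printing paths as workers produce them.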