194 lines
6.5 KiB
Python
194 lines
6.5 KiB
Python
#!/usr/bin/env python3
|
|
"""extract_odf_links.py - Tool to extract links from ODF documents
|
|
|
|
Given one or more ODF documents, this tool finds links that refer to the local
|
|
filesystem, and writes their full paths to stdout.
|
|
"""
|
|
# Copyright © 2020 Brett Smith
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import argparse
|
|
import concurrent.futures as futmod
|
|
import logging
|
|
import multiprocessing
|
|
import os
|
|
import sys
|
|
import urllib.parse
|
|
|
|
from pathlib import Path
|
|
from zipfile import BadZipFile
|
|
|
|
import odf.opendocument # type:ignore[import]
|
|
import odf.text # type:ignore[import]
|
|
|
|
from typing import (
|
|
Dict,
|
|
Hashable,
|
|
Iterator,
|
|
Optional,
|
|
Sequence,
|
|
Set,
|
|
TextIO,
|
|
)
|
|
|
|
from .. import cliutil
|
|
|
|
# Type alias for the queue that carries link paths (and None end markers)
# from worker processes back to the main process.
# This should be Queue[Optional[Path]] but that requires newer typeshed
PathQ = multiprocessing.SimpleQueue

# Program name shown in usage messages and passed to the entry point factory.
PROGNAME = 'extract-odf-links'
# Registry of result queues. main() stores a queue here under a key and hands
# only that key to enqueue_links(), which looks the queue up again by key.
_QUEUES: Dict[Hashable, PathQ] = {}
logger = logging.getLogger('conservancy_beancount.tools.extract_odf_links')
|
|
|
|
def parse_delimiter(arg: str) -> str:
    """Decode Python backslash escapes in a delimiter given on the command line.

    The argument is wrapped in double quotes (escaping any double quotes it
    contains) and parsed as a Python string literal, so escapes such as
    ``\\n``, ``\\t``, ``\\0`` and ``\\uXXXX`` are all understood.

    Returns the decoded string.

    Raises ValueError if the argument does not form a single valid string
    literal, e.g. because it ends with a lone backslash. argparse reports
    that as a usage error when this function is used as a ``type=``.
    """
    # Imported here so the function carries its own dependency.
    import ast
    literal = '"{}"'.format(arg.replace('"', r'\"'))
    try:
        # literal_eval only accepts literals. A plain eval() would run
        # arbitrary expressions smuggled in through an argument such as
        # `\" + some_call() #`, where the caller's own backslash cancels the
        # escaping added above and closes the string early.
        retval = ast.literal_eval(literal)
    except (SyntaxError, TypeError, ValueError):
        retval = None
    if isinstance(retval, str):
        return retval
    raise ValueError(f"not a valid string: {arg!r}")
|
|
|
|
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
    """Build the command-line parser and return the processed arguments.

    Beyond plain parsing, the returned namespace is adjusted as follows:

    * Entries equal to ``cliutil.STDSTREAM_PATH`` are removed from
      ``odf_paths``, and ``read_stdin`` records whether any were present.
    * Reading from stdin without ``--relative-to`` is reported through
      ``parser.error``.
    * When ``--relative-to`` names a directory or a path that does not
      exist, ``PathStub.ods`` is appended to it (presumably so it can stand
      in for a document path when links are resolved - confirm against
      extract_links).
    """
    parser = argparse.ArgumentParser(prog=PROGNAME)
    for add_shared_argument in (
            cliutil.add_version_argument,
            cliutil.add_loglevel_argument,
            cliutil.add_jobs_argument,
    ):
        add_shared_argument(parser)
    parser.add_argument(
        '--delimiter', '-d',
        type=parse_delimiter,
        default='\\n',
        metavar='TEXT',
        help="""String to output between links. Accepts all backslash escapes
supported in Python like \\n, \\t, \\0, \\u, etc. Default `%(default)s`.
""")
    parser.add_argument(
        '--zero', '--null', '-z', '-0',
        dest='delimiter',
        action='store_const',
        const='\0',
        help="""Shortcut for --delimiter=\\0
""")
    parser.add_argument(
        '--relative-to', '-r',
        type=Path,
        metavar='PATH',
        help="""Try to resolve all links relative to this path, rather than each
spreadsheet's path
""")
    parser.add_argument(
        'odf_paths',
        type=Path,
        nargs=argparse.ONE_OR_MORE,
        metavar='ODF_PATH',
        help="""ODF file(s) to extract links from
""")
    args = parser.parse_args(arglist)

    # Separate real files from the stdin placeholder. A shorter list after
    # filtering means the placeholder was given at least once.
    requested = args.odf_paths
    args.odf_paths = [
        odf_path for odf_path in requested
        if odf_path != cliutil.STDSTREAM_PATH
    ]
    args.read_stdin = len(requested) > len(args.odf_paths)

    base = args.relative_to
    if base is None:
        if args.read_stdin:
            parser.error("--relative-to is required to read from stdin")
    elif base.is_dir() or not base.exists():
        args.relative_to = base / 'PathStub.ods'
    return args
|
|
|
|
def _is_within(path: Path, base: Path) -> bool:
    """Return True if `path` is `base` itself or lexically underneath it."""
    try:
        path.relative_to(base)
    except ValueError:
        return False
    return True


def extract_links(odf_doc: odf.opendocument.OpenDocument, rel_path: Path) -> Iterator[Path]:
    """Yield the filesystem path of each local hyperlink in an ODF document.

    Every ``text:a`` element in `odf_doc` is examined. Links with a scheme
    other than ``file``, and links with no path component, are ignored.
    Percent-escapes in the path are decoded. Relative links are joined onto
    `rel_path` and resolved to an absolute path; absolute links are yielded
    as written.

    Links that land on or underneath `rel_path` are skipped because they
    point inside the document itself.
    """
    # Relative links are resolved to absolute paths below, so a relative
    # `rel_path` could never match them and internal links would leak through.
    # Check against the resolved base as well as the path as given.
    bases = (rel_path, rel_path.resolve())
    for a_elem in odf_doc.getElementsByType(odf.text.A):
        parts = urllib.parse.urlparse(a_elem.getAttribute('href') or '')
        if (parts.scheme and parts.scheme != 'file') or not parts.path:
            continue
        path = Path(urllib.parse.unquote(parts.path))
        if not path.is_absolute():
            path = (rel_path / path).resolve()
        if any(_is_within(path, base) for base in bases):
            # Link points to another document inside the ODF. Skip it.
            continue
        yield path
|
|
|
|
def enqueue_links(odf_path: Path,
                  rel_path: Optional[Path],
                  queue_key: Hashable,
                  stdin_fd: int,
) -> None:
    """Load one ODF document and put each of its link paths on a queue.

    The destination queue is looked up in ``_QUEUES`` under `queue_key`.
    Links are resolved against `rel_path` when it is given, otherwise
    against `odf_path`. `stdin_fd` is passed through to
    ``cliutil.bytes_output`` along with `odf_path` to open the document.

    A final ``None`` is always put on the queue, even if loading or
    extraction raises, so the consumer can tell this document is finished.
    Any exception still propagates to the caller afterwards.
    """
    out_queue = _QUEUES[queue_key]
    link_base = rel_path or odf_path
    try:
        with cliutil.bytes_output(odf_path, stdin_fd, 'r') as source:
            document = odf.opendocument.load(source)
        for link in extract_links(document, link_base):
            out_queue.put(link)
    finally:
        # End-of-document marker; must be sent on failure as well.
        out_queue.put(None)
|
|
|
|
def main(arglist: Optional[Sequence[str]]=None,
         stdout: TextIO=sys.stdout,
         stderr: TextIO=sys.stderr,
) -> int:
    """Extract links from the requested ODF documents and print them.

    Each document is handed to a worker process running enqueue_links().
    The main process reads paths back from a shared queue, drops duplicates,
    logs a warning for paths that do not exist, and writes every unique path
    to `stdout` followed by the configured delimiter.

    `arglist` is the command line to parse (None lets argparse use
    sys.argv). `stderr` is accepted but not used by this function.

    Returns 0 on success, or os.EX_DATAERR if any document could not be
    read (IOError) or parsed (BadZipFile). Other worker exceptions propagate.
    """
    args = parse_arguments(arglist)
    cliutil.set_loglevel(logger, args.loglevel)
    # Sort ascending by size, then reverse below: stdin (if requested) is
    # submitted first, followed by the files from largest to smallest.
    args.odf_paths.sort(key=lambda path: path.stat().st_size)
    if not args.read_stdin:
        # Set a value that passes type checking but will crash if opened.
        stdin_fd = -1
    else:
        # multiprocessing closes subprocesses' stdin.
        # Migrate it to a new fd they can read.
        stdin_fd = os.dup(sys.stdin.fileno())
        os.set_inheritable(stdin_fd, True)
        args.odf_paths.append(cliutil.STDSTREAM_PATH)
    args.odf_paths.reverse()

    # NOTE(review): workers locate the queue through the module-level _QUEUES
    # dict, which presumably relies on child processes inheriting it (fork
    # start method) - confirm behavior on platforms that default to spawn.
    queue_key = id(args)
    queue = _QUEUES[queue_key] = multiprocessing.SimpleQueue()
    # `args.jobs - 1` because the main process does enough work to count as one.
    max_procs = max(1, min(args.jobs - 1, len(args.odf_paths)))
    try:
        with futmod.ProcessPoolExecutor(max_procs) as pool:
            # Keep the futures in a list: they are paired with
            # args.odf_paths by position below, and a set would iterate in
            # arbitrary order and blame errors on the wrong file.
            procs = [pool.submit(
                enqueue_links, odf_path, args.relative_to, queue_key, stdin_fd,
            ) for odf_path in args.odf_paths]
            procs_left = len(procs)
            seen: Set[Path] = set()
            # Every worker call ends by putting None on the queue, so count
            # those markers to know when all documents are done.
            while procs_left:
                path = queue.get()
                if path is None:
                    procs_left -= 1
                elif path not in seen:
                    seen.add(path)
                    if not path.exists():
                        logger.warning("link path not found: %s", path)
                    print(path, end=args.delimiter, file=stdout)
    finally:
        # The pool has shut down by now, so nothing else needs the queue
        # registration or the duplicated stdin descriptor.
        _QUEUES.pop(queue_key, None)
        if args.read_stdin:
            os.close(stdin_fd)

    returncode = 0
    for odf_path, proc in zip(args.odf_paths, procs):
        try:
            proc.result()
        except IOError as error:
            logger.error("error reading %s: %s", odf_path, error.strerror)
            returncode = os.EX_DATAERR
        except BadZipFile as error:
            logger.error("error parsing %s: %s", odf_path, error.args[0])
            returncode = os.EX_DATAERR
    return returncode
|
|
|
|
# Callable built by cliutil.make_entry_point from this module's name and
# PROGNAME; its return value is used as the process exit status below.
entry_point = cliutil.make_entry_point(__name__, PROGNAME)

if __name__ == '__main__':
    # Use sys.exit rather than the bare exit() helper, which is injected by
    # the site module for interactive use and is missing under `python -S`.
    sys.exit(entry_point())
|