445 lines
14 KiB
Python
445 lines
14 KiB
Python
"""fill.py - PDF writer class"""
|
|
# Copyright © 2021 Brett Smith
|
|
# License: AGPLv3-or-later WITH Beancount-Plugin-Additional-Permission-1.0
|
|
#
|
|
# Full copyright and licensing details can be found at toplevel file
|
|
# LICENSE.txt in the repository.
|
|
|
|
import argparse
|
|
import contextlib
|
|
import inspect
|
|
import itertools
|
|
import logging
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
|
|
from codecs import BOM_UTF16_BE
|
|
from pathlib import Path
|
|
|
|
import yaml
|
|
|
|
from pdfminer import psparser # type:ignore[import]
|
|
from pdfminer.pdfdocument import PDFDocument # type:ignore[import]
|
|
from pdfminer.pdfparser import PDFParser # type:ignore[import]
|
|
from pdfminer.pdftypes import resolve1 # type:ignore[import]
|
|
|
|
from . import fields as fieldmod
|
|
from . import utils as pdfutils
|
|
from .. import cliutil
|
|
|
|
from typing import (
|
|
Any,
|
|
BinaryIO,
|
|
Dict,
|
|
Iterator,
|
|
List,
|
|
Mapping,
|
|
Match,
|
|
NamedTuple,
|
|
Optional,
|
|
Sequence,
|
|
TextIO,
|
|
Tuple,
|
|
Type,
|
|
Union,
|
|
cast,
|
|
)
|
|
|
|
# Stream of byte chunks produced by the PDFWriter.emit* generators.
EmitBytes = Iterator[bytes]
# One YAML form field entry, as loaded from the input file.
FieldSource = Mapping[str, Any]

PROGNAME = 'pdfform-fill'
# Named after this module. The previous name ended in `.extract`, so log level
# changes made by this tool landed on a logger named for another module.
logger = logging.getLogger('conservancy_beancount.pdfforms.fill')
|
|
|
|
def _accepted_value_types(field_class: Type[fieldmod.FormField]) -> Tuple[Type, ...]:
    """Return the type arguments of `field_class.set_value`'s `value` annotation.

    This reads `__args__` from the annotation, so it presumably expects a
    Union/Optional-style annotation on every subclass - TODO confirm in fields.
    """
    value_param = inspect.signature(field_class.set_value).parameters['value']
    return value_param.annotation.__args__


# Maps every concrete FormField subclass defined in the fields module to the
# value types its set_value() method is annotated to accept. The FormField
# base class itself is deliberately left out.
SUPPORTED_VALUE_TYPES: Mapping[Type[fieldmod.FormField], Tuple[Type, ...]] = {
    field_class: _accepted_value_types(field_class)
    for field_class in vars(fieldmod).values()
    if isinstance(field_class, type)
    and field_class is not fieldmod.FormField
    and issubclass(field_class, fieldmod.FormField)
}
|
|
|
|
class PDFWriter:
    """Serialize plain Python objects as a single-object FDF document.

    `emit` maps Python values onto PDF object syntax: None becomes `null`,
    bools become `true`/`false`, numbers are written in decimal, `str` becomes
    a string object, `PSLiteral` becomes a name, mappings become dictionaries
    (with their keys written as names), and other sequences become arrays.
    `write_document` wraps one emitted object between HEADER and FOOTER.
    """
    # FDF version line, a comment line of high-bit bytes (the conventional
    # marker that the file holds binary data), and the start of object 1.
    HEADER = b'%FDF-1.2\n%\xe2\xe3\xcf\xd3\n1 0 obj\n'
    # Closes object 1 and names it as the document root.
    FOOTER = b'\nendobj\ntrailer\n<</Root 1 0 R>>\n%%EOF\n'
    # From the PDF spec section 7.3.5 "Name Objects": inside a name, the
    # number sign, the delimiter characters, and anything outside the
    # printable range `!` through `~` must be written as #xx hex escapes.
    # Leaving a delimiter such as `/` or `(` raw would end the name early.
    LITERAL_ESC_RE = re.compile(rb'(?:[^\x21-\x7e]|[#%()/<>\[\]{}])+')
    # Characters that need a backslash inside a parenthesized string object.
    STRING_ESC = {ord(c): f'\\{c}' for c in '()\\'}

    @staticmethod
    def escape_literal(match: Match[bytes]) -> bytes:
        """Return the matched bytes as `#xx` escapes, two hex digits each.

        The spec requires exactly two digits, so small byte values must be
        zero-padded (a tab is `#09`, not `#9`).
        """
        return b''.join(b'#%02x' % byte for byte in match.group(0))

    def emit_array(self, obj: Sequence[Any]) -> EmitBytes:
        """Yield `obj` as a PDF array with one item per line."""
        yield b'[\n'
        for item in obj:
            yield from self.emit(item)
            yield b'\n'
        yield b']'

    def emit_boolean(self, obj: bool) -> EmitBytes:
        """Yield the PDF keyword `true` or `false`."""
        yield b'true' if obj else b'false'

    def emit_dictionary(self, obj: Mapping[str, Any]) -> EmitBytes:
        """Yield `obj` as a PDF dictionary with one `/Key value` pair per line."""
        yield b'<<\n'
        for key, value in obj.items():
            yield from self.emit_literal(key)
            yield b' '
            yield from self.emit(value)
            yield b'\n'
        yield b'>>'

    def emit_literal(self, obj: Union[str, psparser.PSLiteral]) -> EmitBytes:
        """Yield `obj` as a PDF name object (`/Name`), escaping as needed.

        Raises UnicodeEncodeError if the name contains non-ASCII characters.
        """
        if isinstance(obj, psparser.PSLiteral):
            obj = cast(str, obj.name)
        yield b'/'
        yield self.LITERAL_ESC_RE.sub(self.escape_literal, obj.encode('ascii'))

    def emit_null(self, obj: None=None) -> EmitBytes:
        """Yield the PDF keyword `null`."""
        yield b'null'

    def emit_number(self, obj: Union[int, float]) -> EmitBytes:
        """Yield `obj` in Python's default decimal text form."""
        yield str(obj).encode('ascii')

    def emit_string(self, obj: str) -> EmitBytes:
        """Yield `obj` as a parenthesized PDF string object."""
        yield b'('
        # NOTE(review): parentheses and backslashes are escaped in the text
        # *before* pdfutils.encode_text() runs. If that helper can produce a
        # multi-byte encoding, confirm the encoded bytes cannot themselves
        # contain unescaped `(`, `)` or `\` byte values.
        yield pdfutils.encode_text(obj.translate(self.STRING_ESC))
        yield b')'

    def emit(self, obj: Any) -> EmitBytes:
        """Yield the PDF syntax for `obj`, dispatching on its Python type.

        Raises ValueError for `bytes` and for any type with no PDF mapping.
        """
        if obj is None:
            yield from self.emit_null(obj)
        # bool is tested before the numeric types because bool subclasses int.
        elif isinstance(obj, bool):
            yield from self.emit_boolean(obj)
        elif isinstance(obj, psparser.PSLiteral):
            yield from self.emit_literal(obj)
        elif isinstance(obj, (int, float)):
            yield from self.emit_number(obj)
        # str and bytes are tested before Sequence because both are sequences.
        elif isinstance(obj, str):
            yield from self.emit_string(obj)
        elif isinstance(obj, bytes):
            raise ValueError("can't emit raw bytes")
        elif isinstance(obj, Mapping):
            yield from self.emit_dictionary(obj)
        elif isinstance(obj, Sequence):
            yield from self.emit_array(obj)
        else:
            raise ValueError(f"don't know how to emit {type(obj).__name__}")

    def write_document(self, obj: Any, out_file: BinaryIO) -> None:
        """Write HEADER, the emitted form of `obj`, and FOOTER to `out_file`."""
        out_file.write(self.HEADER)
        for out_bytes in self.emit(obj):
            out_file.write(out_bytes)
        out_file.write(self.FOOTER)
|
|
|
|
|
|
class FillProblem(NamedTuple):
    """A problem found while processing one entry of the YAML field list."""
    # Severity, as a `logging` level number.
    level: int
    # Zero-based position of the entry in the YAML field list.
    yaml_index: int
    # FDF name of the field, or None when the entry did not provide one.
    name: Optional[str]
    # Text that completes the sentence "YAML form field #N (name) ...".
    errdesc: str

    def log(self, logger: logging.Logger=logger) -> None:
        """Report this problem through `logger` at the problem's own level."""
        if self.name is None:
            separator = ' '
        else:
            separator = f' ({self.name}) '
        # People count entries from one, so shift the zero-based index.
        field_number = self.yaml_index + 1
        logger.log(
            self.level,
            "YAML form field #%d%s%s",
            field_number,
            separator,
            self.errdesc,
        )
|
|
|
|
|
|
def _ensure_field(
        field_map: Dict[str, fieldmod.FormField],
        key: str,
        field: Optional[FieldSource]=None,
        yaml_index: int=-2,
) -> Tuple[fieldmod.FormField, Optional[FillProblem]]:
    """Return the field registered under `key`, creating it when missing.

    A new field is built from the optional `type` and `options` entries of
    `field`, attached to its parent (which is created recursively from the
    dotted prefix of `key` if necessary), and stored in `field_map`.

    Returns a `(field, problem)` pair. `problem` is None unless `field` names
    a type that is not a member of `fieldmod.FieldType`.
    """
    try:
        known = field_map[key]
    except KeyError:
        pass
    else:
        return (known, None)

    spec: FieldSource = {} if field is None else field
    problem: Optional[FillProblem] = None
    # Only the last dotted component names this field; the rest is its parent.
    parent_key, _, kid_name = key.rpartition('.')
    kid_source: fieldmod.FieldSource = {'T': pdfutils.encode_text(kid_name)}

    if 'type' in spec:
        field_type = spec['type']
        try:
            field_type = fieldmod.FieldType[field_type.title()].value
        except KeyError:
            problem = FillProblem(
                logging.ERROR, yaml_index, key,
                f"has unknown FDF type {field_type!r}",
            )
        # An unrecognized type is still recorded, using the text as given.
        kid_source['FT'] = psparser.PSLiteralTable.intern(field_type)

    if 'options' in spec:
        kid_source['AP'] = {'N': dict.fromkeys(spec['options'])}

    kid = fieldmod.FormField.by_type(kid_source)
    if parent_key:
        # Any problem from creating the parent is discarded here.
        parent, _ = _ensure_field(field_map, parent_key)
        parent.add_kid(kid)
    field_map[key] = kid
    return (kid, problem)
|
|
|
|
def _set_field_value(
        field: fieldmod.FormField,
        value: Any,
        yaml_index: int=-2,
        yaml_key: Optional[str]=None,
) -> Iterator[FillProblem]:
    """Set `value` on `field`, yielding each problem found along the way.

    This is a generator: nothing happens, including the assignment itself,
    until the caller iterates it to completion.

    A None value is always passed through to `field.set_value()` with no
    checks. Any other value is checked against SUPPORTED_VALUE_TYPES for the
    field's class. An ERROR problem is yielded, and the value is NOT set, when
    the field's class has no entry there or the value has the wrong type.
    A WARNING problem is yielded when a non-None value targets a readonly
    field, but that alone does not stop the assignment.
    """
    set_ok = True
    if value is not None:
        field_type = type(field)
        try:
            supported_types = SUPPORTED_VALUE_TYPES[field_type]
        except KeyError:
            # Reported as an error, so treat it like a type mismatch and
            # leave the field untouched. Previously set_ok stayed True here
            # and set_value() was still called on the unsupported field.
            set_ok = False
            yield FillProblem(logging.ERROR, yaml_index, yaml_key,
                              "assigns a value to an unsupported field type")
        else:
            set_ok = isinstance(value, supported_types)
            # bools shouldn't be allowed in as ints for this purpose.
            if set_ok and isinstance(value, bool):
                set_ok = any(issubclass(t, bool) for t in supported_types)
            if not set_ok:
                set_type = type(value).__name__
                yield FillProblem(logging.ERROR, yaml_index, yaml_key,
                                  f"assigns a {set_type} value to a {field_type.__name__}")
        if field.is_readonly():
            yield FillProblem(logging.WARNING, yaml_index, yaml_key,
                              "assigns a value to a readonly field")
    if set_ok:
        field.set_value(value)
|
|
|
|
def generate_form(
        form_source: Sequence[FieldSource],
) -> Tuple[Sequence[FieldSource], Sequence[FillProblem]]:
    """Build FDF field data purely from the YAML field entries.

    Each entry needs an FDF name under `entry['fdf']['name']`. Entries
    without one are reported and skipped. Fields, and their parents, are
    created on demand, and an entry's `value`, when present, is applied.

    Returns `(fields, problems)`, where `fields` holds the filled FDF form of
    every top-level field (one whose key contains no dot).
    """
    found_problems: List[FillProblem] = []
    fields_by_key: Dict[str, fieldmod.FormField] = {}
    for position, entry in enumerate(form_source):
        try:
            fdf_name = entry['fdf']['name']
        except KeyError:
            found_problems.append(
                FillProblem(logging.ERROR, position, None, "has no FDF name")
            )
            continue
        form_field, field_problem = _ensure_field(
            fields_by_key, fdf_name, entry['fdf'], position,
        )
        if field_problem is not None:
            found_problems.append(field_problem)
        if 'value' in entry:
            found_problems.extend(
                _set_field_value(form_field, entry['value'], position, fdf_name)
            )

    # Nested fields are reached through their parents, so only roots are listed.
    top_level = []
    for fdf_name, form_field in fields_by_key.items():
        if '.' in fdf_name:
            continue
        top_level.append(form_field.as_filled_fdf())
    return (top_level, found_problems)
|
|
|
|
def merge_form(
        form_fills: Sequence[FieldSource],
        form_source: Sequence[fieldmod.FieldSource],
) -> Tuple[Sequence[FieldSource], Sequence[FillProblem]]:
    """Apply YAML field entries to the fields of an existing PDF form.

    `form_source` is the form's field list as read from the PDF. Each entry
    of `form_fills` must name one of those fields under
    `entry['fdf']['name']`. Entries that have no name, or name a field the
    form lacks, are reported and skipped. A declared `type` that differs from
    the form's own field type only produces a warning.

    Returns `(fields, problems)`, where `fields` holds the filled FDF form of
    every field in `form_source`.
    """
    found_problems: List[FillProblem] = []
    source_fields = [
        fieldmod.FormField.by_type(resolve1(raw_field)) for raw_field in form_source
    ]
    fields_by_key = {}
    for source_field in source_fields:
        for mapped_key, mapped_field in source_field.as_mapping():
            fields_by_key[mapped_key] = mapped_field

    for position, entry in enumerate(form_fills):
        try:
            fdf_name = entry['fdf']['name']
        except KeyError:
            found_problems.append(
                FillProblem(logging.ERROR, position, None, "has no FDF name")
            )
            continue
        if fdf_name not in fields_by_key:
            found_problems.append(FillProblem(
                logging.ERROR, position, fdf_name,
                "refers to a field that does not exist in the source form",
            ))
            continue
        target = fields_by_key[fdf_name]

        # A missing `type` key and an unrecognized type name both raise
        # KeyError here, and both simply skip the type comparison.
        try:
            expected = fieldmod.FieldType[entry['fdf']['type'].title()]
        except KeyError:
            expected = None
        if expected is not None:
            try:
                actual = target.field_type()
            except ValueError:
                actual_name: Optional[str] = None
            else:
                actual_name = actual.value
            if expected.value != actual_name:
                found_problems.append(FillProblem(
                    logging.WARNING, position, fdf_name,
                    f"has type {expected.name} but source has type {actual_name}",
                ))

        if 'value' in entry:
            found_problems.extend(
                _set_field_value(target, entry['value'], position, fdf_name)
            )

    filled = [source_field.as_filled_fdf() for source_field in source_fields]
    return (filled, found_problems)
|
|
|
|
def parse_arguments(arglist: Optional[Sequence[str]]=None) -> argparse.Namespace:
    """Parse the pdfform-fill command line and return the resulting namespace.

    `arglist` is handed to `ArgumentParser.parse_args()`, so None means the
    process's own command line.
    """
    parser = argparse.ArgumentParser(prog=PROGNAME)
    cliutil.add_version_argument(parser)
    cliutil.add_loglevel_argument(parser)

    # (flags, add_argument keyword settings), in the order they are registered.
    argument_specs: List[Tuple[Tuple[str, ...], Dict[str, Any]]] = [
        (('--force',), {
            'action': 'count',
            'default': 0,
            'help': """Continue with filling the PDF even if there are problems in the
input YAML. Pass this option twice to continue even with major problems.
""",
        }),
        (('--pdftk',), {
            'type': Path,
            'default': Path('pdftk'),
            'help': """Path of the `pdftk` executable.
Default searched from your $PATH.
""",
        }),
        (('--form-key', '-f'), {
            'metavar': 'KEY',
            'help': """Key in the document catalog with form data.
Default is guessed by examining the document.
""",
        }),
        (('--output-file', '-O'), {
            'metavar': 'PATH',
            'type': Path,
            'help': """Write output to this file, or stdout when PATH is `-`.
Default is generated from the input filename.
""",
        }),
        (('yaml_file',), {
            'type': Path,
            'help': """YAML file with values generated from pdfform-extract
""",
        }),
        (('pdf_file',), {
            'nargs': '?',
            'type': Path,
            'help': """PDF file with forms to fill. If omitted, pdfform-fill generates
FDF output that you can give to `pdftk fill_form` later.
""",
        }),
    ]
    for flags, settings in argument_specs:
        parser.add_argument(*flags, **settings)
    return parser.parse_args(arglist)
|
|
|
|
def change_suffix(path: Path, suffix: str, backup: str='_filled') -> Path:
    """Return `path` renamed to end with `suffix`, never equal to `path` itself.

    Normally only the suffix is swapped. When `path` already ends with
    `suffix`, `backup` is inserted between the stem and the suffix instead,
    so the result cannot collide with the input file.
    """
    if path.suffix != suffix:
        return path.with_suffix(suffix)
    distinct_name = f'{path.stem}{backup}{suffix}'
    return path.with_name(distinct_name)
|
|
|
|
def main(arglist: Optional[Sequence[str]]=None,
         stdout: TextIO=sys.stdout,
         stderr: TextIO=sys.stderr,
) -> int:
    """Run pdfform-fill and return the process exit status.

    The YAML file is loaded and checked for a `fields` list. Without a PDF
    argument, FDF data is generated from the YAML alone. With one, the YAML
    values are merged into the PDF's own form fields and, if the configured
    pdftk executable can be run, pdftk fills the form. Otherwise the FDF
    data is written out instead.

    Every problem found is logged. By default any problem stops the run,
    `--force` tolerates problems up to WARNING level, and `--force --force`
    tolerates all of them.

    Returns `os.EX_NOINPUT` when the YAML or the PDF form key is unusable,
    `os.EX_DATAERR` when problems are fatal, pdftk's exit status when pdftk
    was run, and `os.EX_OK` otherwise. `stderr` is accepted but not used
    by this function.
    """
    args = parse_arguments(arglist)
    cliutil.set_loglevel(logger, args.loglevel)

    with args.yaml_file.open() as yaml_file:
        try:
            yaml_source = yaml.safe_load(yaml_file)
        except yaml.error.YAMLError as error:
            logger.critical("error parsing %s: %s", args.yaml_file, error)
            return os.EX_NOINPUT
    # An empty document loads as None, and a top-level list or scalar has no
    # .get() method, so check the document's shape before looking inside it.
    if not (isinstance(yaml_source, dict)
            and isinstance(yaml_source.get('fields'), list)):
        logger.critical("YAML file does not include a list of fields")
        return os.EX_NOINPUT

    if args.pdf_file is None:
        fill_mode = False
        if args.form_key is None:
            args.form_key = yaml_source.get('form key', 'FDF')
        fields, problems = generate_form(yaml_source['fields'])
    else:
        # The PDF stays open while its objects are resolved and merged.
        with args.pdf_file.open('rb') as pdf_file:
            parser = PDFParser(pdf_file)
            pdf_doc = PDFDocument(parser)
            if args.form_key is None:
                try:
                    args.form_key = pdfutils.guess_form_key(pdf_doc)
                except ValueError as error:
                    logger.error("%s", error.args[0])
                    logger.info("you can specify a form key using --form-key")
                    return os.EX_NOINPUT
            fields, problems = merge_form(
                yaml_source['fields'],
                resolve1(pdf_doc.catalog[args.form_key])['Fields'],
            )
        # Probe the same executable that will be run below, so that a custom
        # --pdftk path is honored instead of always testing `pdftk` on $PATH.
        fill_mode = cliutil.can_run([str(args.pdftk), '--version'])
        if not fill_mode:
            logger.warning("cannot run pdftk to fill the PDF form; writing FDF instead")

    worst_problem = -1
    for problem in problems:
        problem.log(logger)
        worst_problem = max(worst_problem, problem.level)
    if args.force > 1:
        problems_fatal = False
    elif args.force == 1:
        problems_fatal = worst_problem > logging.WARNING
    else:
        problems_fatal = worst_problem >= 0
    if problems_fatal:
        return os.EX_DATAERR

    if args.output_file is None:
        args.output_file = change_suffix(
            args.pdf_file or args.yaml_file,
            '.pdf' if fill_mode else '.fdf',
        )
        logger.info("writing output to %s", args.output_file)
    out_writer = PDFWriter()
    # pdftk always expects form fill data to be under the `FDF` key,
    # regardless of what the original PDF uses.
    out_doc = {'FDF': {'Fields': fields}}
    pdftk: Optional[subprocess.Popen] = None
    with contextlib.ExitStack() as exit_stack:
        if fill_mode:
            pdftk = exit_stack.enter_context(subprocess.Popen([
                str(args.pdftk), str(args.pdf_file),
                'fill_form', '-',
                'output', str(args.output_file),
            ], stdin=subprocess.PIPE))
            # Entered last so it is closed first: pdftk sees end-of-input
            # before the Popen context manager waits for it to exit.
            out_file = exit_stack.enter_context(cast(BinaryIO, pdftk.stdin))
        else:
            out_file = cliutil.bytes_output(args.output_file, stdout)
        out_writer.write_document(out_doc, out_file)
    if pdftk is None:
        return os.EX_OK
    return pdftk.returncode
|
|
|
|
# Callable built by cliutil from this module's name and program name. It is
# called with no arguments below and its result becomes the exit status.
entry_point = cliutil.make_entry_point(__name__, PROGNAME)

if __name__ == '__main__':
    # sys.exit() is always available. The bare exit() builtin is installed by
    # the `site` module for interactive use and is missing under `python -S`.
    sys.exit(entry_point())
|