website/conservancy/content/projects/policies/publish-policy.py

358 lines
12 KiB
Python
Executable file

#!/usr/bin/env python3
import argparse
import contextlib
import functools
import locale
import os
import pathlib
import shutil
import subprocess
import sys
import tempfile
try:
import markdown
from markdown.extensions import sane_lists as mdx_sane_lists
from markdown.extensions import smarty as mdx_smarty
from markdown.extensions import tables as mdx_tables
from markdown.extensions import toc as mdx_toc
markdown_import_success = True
except ImportError:
if __name__ != '__main__':
raise
markdown_import_success = False
TEMPLATE_HEADER = """{% extends "base_projects.html" %}
{% block subtitle %}{% endblock %}
{% block submenuselection %}Policies{% endblock %}
{% block content %}
"""
TEMPLATE_FOOTER = """
{% endblock %}
"""
@contextlib.contextmanager
def run(cmd, encoding=None, ok_exitcodes=frozenset([0]), **kwargs):
kwargs.setdefault('stdout', subprocess.PIPE)
if encoding is None:
mode = 'rb'
no_data = b''
else:
mode = 'r'
no_data = ''
with contextlib.ExitStack() as exit_stack:
proc = exit_stack.enter_context(subprocess.Popen(cmd, **kwargs))
pipes = [
exit_stack.enter_context(open(
getattr(proc, name).fileno(), mode, encoding=encoding, closefd=False))
for name in ['stdout', 'stderr']
if kwargs.get(name) is subprocess.PIPE
]
if pipes:
yield (proc, *pipes)
else:
yield proc
for pipe in pipes:
for _ in iter(lambda: pipe.read(4096), no_data):
pass
if proc.returncode not in ok_exitcodes:
raise subprocess.CalledProcessError(proc.returncode, cmd)
class GitPath:
GIT_BIN = shutil.which('git')
CLEAN_ENV = {k: v for k, v in os.environ.items() if not k.startswith('GIT_')}
ANY_EXITCODE = range(-256, 257)
IGNORE_ERRORS = {
'ok_exitcodes': ANY_EXITCODE,
'stderr': subprocess.DEVNULL,
}
STATUS_CLEAN_OR_UNMANAGED = frozenset(' ?')
def __init__(self, path, encoding, env=None):
self.path = path
self.dir_path = path if path.is_dir() else path.parent
self.encoding = encoding
self.run_defaults = {
'cwd': str(self.dir_path),
'env': env,
}
@classmethod
def can_run(cls):
return cls.GIT_BIN is not None
def _run(self, cmd, encoding=None, ok_exitcodes=frozenset([0]), **kwargs):
return run(cmd, encoding, ok_exitcodes, **self.run_defaults, **kwargs)
def _cache(orig_func):
attr_name = '_cached_' + orig_func.__name__
@functools.wraps(orig_func)
def cache_wrapper(self):
try:
return getattr(self, attr_name)
except AttributeError:
setattr(self, attr_name, orig_func(self))
return getattr(self, attr_name)
return cache_wrapper
@_cache
def is_work_tree(self):
with self._run([self.GIT_BIN, 'rev-parse', '--is-inside-work-tree'],
self.encoding, **self.IGNORE_ERRORS) as (_, stdout):
return stdout.readline() == 'true\n'
@_cache
def status_lines(self):
with self._run([self.GIT_BIN, 'status', '-z'],
self.encoding) as (_, stdout):
return stdout.read().split('\0')
@_cache
def has_managed_modifications(self):
return any(line and line[1] not in self.STATUS_CLEAN_OR_UNMANAGED
for line in self.status_lines())
@_cache
def has_staged_changes(self):
return any(line and line[0] not in self.STATUS_CLEAN_OR_UNMANAGED
for line in self.status_lines())
def commit_at(self, revision):
with self._run([self.GIT_BIN, 'rev-parse', revision],
self.encoding) as (_, stdout):
return stdout.readline().rstrip('\n') or None
@_cache
def upstream_commit(self):
return self.commit_at('@{upstream}')
@_cache
def head_commit(self):
return self.commit_at('HEAD')
def in_sync_with_upstream(self):
return self.upstream_commit() == self.head_commit()
@_cache
def last_commit(self):
with self._run([self.GIT_BIN, 'log', '-n1', '--format=format:%H', self.path.name],
self.encoding, **self.IGNORE_ERRORS) as (_, stdout):
return stdout.readline().rstrip('\n') or None
def operate(self, subcmd, ok_exitcodes=frozenset([0])):
with self._run([self.GIT_BIN, *subcmd], None, ok_exitcodes, stdout=None):
pass
def add_parser_flag(argparser, dest, **kwargs):
kwargs.update(dest=dest, default=None)
switch_root = dest.replace('_', '-')
switch = '--' + switch_root
argparser.add_argument(switch, **kwargs, action='store_true')
kwargs['help'] = "Do not do {}".format(switch)
argparser.add_argument('--no-' + switch_root, **kwargs, action='store_false')
def parse_arguments(arglist):
parser = argparse.ArgumentParser(
epilog="""By default, the program will pull from Git if the output path
is a Git checkout with a tracking branch, and will commit and push if
that checkout is in sync with the tracking branch without any staged changes.
Setting any flag will always override the default behavior.
""",
)
parser.add_argument(
'--encoding', '-E',
default=locale.getpreferredencoding(),
help="Encoding to use for all I/O. "
"Default is your locale's encoding.",
)
parser.add_argument(
'--revision', '-r',
help="Revision string to version the published page. "
"Default determined from the revision of the source file.",
)
add_parser_flag(
parser, 'pull',
help="Try to pull the remote tracking branch to make the checkout "
"up-to-date before making changes"
)
add_parser_flag(
parser, 'commit',
help="Commit changes to the website repository",
)
parser.add_argument(
'-m', dest='commit_message',
default="Publish {filename} revision {revision}.",
help="Message for any commit",
)
add_parser_flag(
parser, 'push',
help="Push to the remote tracking branch after committing changes",
)
parser.add_argument(
'input_path', type=pathlib.Path,
help="Path to the Conservancy policy Markdown source",
)
parser.add_argument(
'output_path', type=pathlib.Path,
nargs='?', default=pathlib.Path(__file__).parent,
help="Path to the directory to write output files",
)
if not markdown_import_success:
parser.error("""markdown module is not installed.
Try `apt install python3-markdown` or `python3 -m pip install --user Markdown`.""")
args = parser.parse_args(arglist)
args.git_output = GitPath(args.output_path, args.encoding)
if args.pull or args.commit or args.push:
if not args.git_output.can_run():
parser.error("Git operation requested but `git` not found in PATH")
elif not args.git_output.is_work_tree():
parser.error("Git operation requested but {} is not a working path".format(
args.output_path.as_posix()))
if args.revision is None:
try:
args.revision = GitPath(args.input_path, args.encoding, GitPath.CLEAN_ENV).last_commit()
except subprocess.CalledProcessError:
pass
if args.revision is None:
parser.error("no --revision specified and not found from input path")
args.output_link_path = args.git_output.dir_path / args.input_path.with_suffix('.html').name
args.output_file_path = args.output_link_path.with_suffix('.{}.html'.format(args.revision))
return args
class GitOperation:
def __init__(self, args):
self.args = args
self.git_path = args.git_output
self.exitcode = None
self.on_work_tree = self.git_path.can_run() and self.git_path.is_work_tree()
def run(self):
arg_state = getattr(self.args, self.NAME)
if arg_state is None:
arg_state = self.should_run()
if not arg_state:
return
try:
self.exitcode = self.run_git() or 0
except subprocess.CalledProcessError as error:
self.exitcode = error.returncode
class GitPull(GitOperation):
NAME = 'pull'
def should_run(self):
return self.on_work_tree and not self.git_path.has_staged_changes()
def run_git(self):
self.git_path.operate(['fetch', '--no-tags'])
self.git_path.operate(['merge', '--ff-only'])
class GitCommit(GitOperation):
NAME = 'commit'
VERB = 'committed'
def __init__(self, args):
super().__init__(args)
try:
self._should_run = ((not self.git_path.has_staged_changes())
and self.git_path.in_sync_with_upstream())
except subprocess.CalledProcessError:
self._should_run = False
def should_run(self):
return self.on_work_tree and self._should_run
def run_git(self):
self.git_path.operate([
'add', str(self.args.output_file_path), str(self.args.output_link_path),
])
commit_message = self.args.commit_message.format(
filename=self.args.output_link_path.name,
revision=self.args.revision,
)
self.git_path.operate(['commit', '-m', commit_message])
class GitPush(GitCommit):
NAME = 'push'
VERB = 'pushed'
def run_git(self):
self.git_path.operate(['push'])
def write_output(args):
converter = markdown.Markdown(
extensions=[
mdx_tables.TableExtension(),
mdx_sane_lists.SaneListExtension(),
mdx_smarty.SmartyExtension(),
mdx_toc.TocExtension(),
],
output_format='html5',
)
header = TEMPLATE_HEADER
with args.input_path.open(encoding=args.encoding) as src_file:
for line in src_file:
if line.startswith('# '):
subtitle = line[2:].replace('Software Freedom Conservancy', '').strip()
header = header.replace(
'{% block subtitle %}',
'{{% block subtitle %}}{} - '.format(subtitle),
)
break
src_file.seek(0)
body = converter.convert(src_file.read())
with tempfile.NamedTemporaryFile(
'w',
encoding=args.encoding,
dir=args.git_output.dir_path.as_posix(),
suffix='.html',
delete=False,
) as tmp_out:
try:
tmp_out.write(header)
tmp_out.write(body)
tmp_out.write(TEMPLATE_FOOTER)
tmp_out.flush()
os.rename(tmp_out.name, str(args.output_file_path))
except BaseException:
os.unlink(tmp_out.name)
raise
if args.output_link_path.is_symlink():
args.output_link_path.unlink()
args.output_link_path.symlink_to(args.output_file_path.name)
def main(arglist=None, stdout=sys.stdout, stderr=sys.stderr):
args = parse_arguments(arglist)
pull = GitPull(args)
pull.run()
if pull.exitcode:
return pull.exitcode
write_output(args)
ops = [GitCommit(args), GitPush(args)]
for op in ops:
op.run()
if op.exitcode != 0:
exitcode = op.exitcode or 0
break
else:
exitcode = 0
print(args.input_path.name, "converted,",
", ".join(op.VERB if op.exitcode == 0 else "not " + op.VERB for op in ops),
file=stdout)
return exitcode
if __name__ == '__main__':
exit(main())