Ben Sturmfels
531a97a3c9
The directory nesting is unnecessary here and confusing to navigate. I've moved all apps to the project subdirectory, currently called "www", but soon to be renamed "conservancy". I've also moved manage.py to the top-level directory.
356 lines
12 KiB
Python
Executable file
356 lines
12 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import contextlib
|
|
import functools
|
|
import locale
|
|
import os
|
|
import pathlib
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
|
|
try:
|
|
import markdown
|
|
from markdown.extensions import sane_lists as mdx_sane_lists
|
|
from markdown.extensions import smarty as mdx_smarty
|
|
from markdown.extensions import tables as mdx_tables
|
|
from markdown.extensions import toc as mdx_toc
|
|
markdown_import_success = True
|
|
except ImportError:
|
|
if __name__ != '__main__':
|
|
raise
|
|
markdown_import_success = False
|
|
|
|
TEMPLATE_HEADER = """{% extends "base_projects.html" %}
|
|
{% block subtitle %}{% endblock %}
|
|
{% block submenuselection %}Policies{% endblock %}
|
|
{% block content %}
|
|
|
|
"""
|
|
|
|
TEMPLATE_FOOTER = """
|
|
|
|
{% endblock %}
|
|
"""
|
|
|
|
@contextlib.contextmanager
|
|
def run(cmd, encoding=None, ok_exitcodes=frozenset([0]), **kwargs):
|
|
kwargs.setdefault('stdout', subprocess.PIPE)
|
|
if encoding is None:
|
|
mode = 'rb'
|
|
no_data = b''
|
|
else:
|
|
mode = 'r'
|
|
no_data = ''
|
|
with contextlib.ExitStack() as exit_stack:
|
|
proc = exit_stack.enter_context(subprocess.Popen(cmd, **kwargs))
|
|
pipes = [exit_stack.enter_context(open(
|
|
getattr(proc, name).fileno(), mode, encoding=encoding, closefd=False))
|
|
for name in ['stdout', 'stderr']
|
|
if kwargs.get(name) is subprocess.PIPE]
|
|
if pipes:
|
|
yield (proc, *pipes)
|
|
else:
|
|
yield proc
|
|
for pipe in pipes:
|
|
for _ in iter(lambda: pipe.read(4096), no_data):
|
|
pass
|
|
if proc.returncode not in ok_exitcodes:
|
|
raise subprocess.CalledProcessError(proc.returncode, cmd)
|
|
|
|
class GitPath:
|
|
GIT_BIN = shutil.which('git')
|
|
CLEAN_ENV = {k: v for k, v in os.environ.items() if not k.startswith('GIT_')}
|
|
ANY_EXITCODE = range(-256, 257)
|
|
IGNORE_ERRORS = {
|
|
'ok_exitcodes': ANY_EXITCODE,
|
|
'stderr': subprocess.DEVNULL,
|
|
}
|
|
STATUS_CLEAN_OR_UNMANAGED = frozenset(' ?')
|
|
|
|
def __init__(self, path, encoding, env=None):
|
|
self.path = path
|
|
self.dir_path = path if path.is_dir() else path.parent
|
|
self.encoding = encoding
|
|
self.run_defaults = {
|
|
'cwd': str(self.dir_path),
|
|
'env': env,
|
|
}
|
|
|
|
@classmethod
|
|
def can_run(cls):
|
|
return cls.GIT_BIN is not None
|
|
|
|
def _run(self, cmd, encoding=None, ok_exitcodes=frozenset([0]), **kwargs):
|
|
return run(cmd, encoding, ok_exitcodes, **self.run_defaults, **kwargs)
|
|
|
|
def _cache(orig_func):
|
|
attr_name = '_cached_' + orig_func.__name__
|
|
@functools.wraps(orig_func)
|
|
def cache_wrapper(self):
|
|
try:
|
|
return getattr(self, attr_name)
|
|
except AttributeError:
|
|
setattr(self, attr_name, orig_func(self))
|
|
return getattr(self, attr_name)
|
|
return cache_wrapper
|
|
|
|
@_cache
|
|
def is_work_tree(self):
|
|
with self._run([self.GIT_BIN, 'rev-parse', '--is-inside-work-tree'],
|
|
self.encoding, **self.IGNORE_ERRORS) as (_, stdout):
|
|
return stdout.readline() == 'true\n'
|
|
|
|
@_cache
|
|
def status_lines(self):
|
|
with self._run([self.GIT_BIN, 'status', '-z'],
|
|
self.encoding) as (_, stdout):
|
|
return stdout.read().split('\0')
|
|
|
|
@_cache
|
|
def has_managed_modifications(self):
|
|
return any(line and line[1] not in self.STATUS_CLEAN_OR_UNMANAGED
|
|
for line in self.status_lines())
|
|
|
|
@_cache
|
|
def has_staged_changes(self):
|
|
return any(line and line[0] not in self.STATUS_CLEAN_OR_UNMANAGED
|
|
for line in self.status_lines())
|
|
|
|
def commit_at(self, revision):
|
|
with self._run([self.GIT_BIN, 'rev-parse', revision],
|
|
self.encoding) as (_, stdout):
|
|
return stdout.readline().rstrip('\n') or None
|
|
|
|
@_cache
|
|
def upstream_commit(self):
|
|
return self.commit_at('@{upstream}')
|
|
|
|
@_cache
|
|
def head_commit(self):
|
|
return self.commit_at('HEAD')
|
|
|
|
def in_sync_with_upstream(self):
|
|
return self.upstream_commit() == self.head_commit()
|
|
|
|
@_cache
|
|
def last_commit(self):
|
|
with self._run([self.GIT_BIN, 'log', '-n1', '--format=format:%H', self.path.name],
|
|
self.encoding, **self.IGNORE_ERRORS) as (_, stdout):
|
|
return stdout.readline().rstrip('\n') or None
|
|
|
|
def operate(self, subcmd, ok_exitcodes=frozenset([0])):
|
|
with self._run([self.GIT_BIN, *subcmd], None, ok_exitcodes, stdout=None):
|
|
pass
|
|
|
|
|
|
def add_parser_flag(argparser, dest, **kwargs):
|
|
kwargs.update(dest=dest, default=None)
|
|
switch_root = dest.replace('_', '-')
|
|
switch = '--' + switch_root
|
|
argparser.add_argument(switch, **kwargs, action='store_true')
|
|
kwargs['help'] = "Do not do {}".format(switch)
|
|
argparser.add_argument('--no-' + switch_root, **kwargs, action='store_false')
|
|
|
|
def parse_arguments(arglist):
|
|
parser = argparse.ArgumentParser(
|
|
epilog="""By default, the program will pull from Git if the output path
|
|
is a Git checkout with a tracking branch, and will commit and push if
|
|
that checkout is in sync with the tracking branch without any staged changes.
|
|
Setting any flag will always override the default behavior.
|
|
""",
|
|
)
|
|
|
|
parser.add_argument(
|
|
'--encoding', '-E',
|
|
default=locale.getpreferredencoding(),
|
|
help="Encoding to use for all I/O. "
|
|
"Default is your locale's encoding.",
|
|
)
|
|
parser.add_argument(
|
|
'--revision', '-r',
|
|
help="Revision string to version the published page. "
|
|
"Default determined from the revision of the source file.",
|
|
)
|
|
add_parser_flag(
|
|
parser, 'pull',
|
|
help="Try to pull the remote tracking branch to make the checkout "
|
|
"up-to-date before making changes"
|
|
)
|
|
add_parser_flag(
|
|
parser, 'commit',
|
|
help="Commit changes to the website repository",
|
|
)
|
|
parser.add_argument(
|
|
'-m', dest='commit_message',
|
|
default="Publish {filename} revision {revision}.",
|
|
help="Message for any commit",
|
|
)
|
|
add_parser_flag(
|
|
parser, 'push',
|
|
help="Push to the remote tracking branch after committing changes",
|
|
)
|
|
parser.add_argument(
|
|
'input_path', type=pathlib.Path,
|
|
help="Path to the Conservancy policy Markdown source",
|
|
)
|
|
parser.add_argument(
|
|
'output_path', type=pathlib.Path,
|
|
nargs='?', default=pathlib.Path(__file__).parent,
|
|
help="Path to the directory to write output files",
|
|
)
|
|
|
|
if not markdown_import_success:
|
|
parser.error("""markdown module is not installed.
|
|
Try `apt install python3-markdown` or `python3 -m pip install --user Markdown`.""")
|
|
|
|
args = parser.parse_args(arglist)
|
|
args.git_output = GitPath(args.output_path, args.encoding)
|
|
if args.pull or args.commit or args.push:
|
|
if not args.git_output.can_run():
|
|
parser.error("Git operation requested but `git` not found in PATH")
|
|
elif not args.git_output.is_work_tree():
|
|
parser.error("Git operation requested but {} is not a working path".format(
|
|
args.output_path.as_posix()))
|
|
if args.revision is None:
|
|
try:
|
|
args.revision = GitPath(args.input_path, args.encoding, GitPath.CLEAN_ENV).last_commit()
|
|
except subprocess.CalledProcessError:
|
|
pass
|
|
if args.revision is None:
|
|
parser.error("no --revision specified and not found from input path")
|
|
args.output_link_path = args.git_output.dir_path / args.input_path.with_suffix('.html').name
|
|
args.output_file_path = args.output_link_path.with_suffix('.{}.html'.format(args.revision))
|
|
return args
|
|
|
|
class GitOperation:
|
|
def __init__(self, args):
|
|
self.args = args
|
|
self.git_path = args.git_output
|
|
self.exitcode = None
|
|
self.on_work_tree = self.git_path.can_run() and self.git_path.is_work_tree()
|
|
|
|
def run(self):
|
|
arg_state = getattr(self.args, self.NAME)
|
|
if arg_state is None:
|
|
arg_state = self.should_run()
|
|
if not arg_state:
|
|
return
|
|
try:
|
|
self.exitcode = self.run_git() or 0
|
|
except subprocess.CalledProcessError as error:
|
|
self.exitcode = error.returncode
|
|
|
|
|
|
class GitPull(GitOperation):
|
|
NAME = 'pull'
|
|
|
|
def should_run(self):
|
|
return self.on_work_tree and not self.git_path.has_staged_changes()
|
|
|
|
def run_git(self):
|
|
self.git_path.operate(['fetch', '--no-tags'])
|
|
self.git_path.operate(['merge', '--ff-only'])
|
|
|
|
|
|
class GitCommit(GitOperation):
|
|
NAME = 'commit'
|
|
VERB = 'committed'
|
|
|
|
def __init__(self, args):
|
|
super().__init__(args)
|
|
try:
|
|
self._should_run = ((not self.git_path.has_staged_changes())
|
|
and self.git_path.in_sync_with_upstream())
|
|
except subprocess.CalledProcessError:
|
|
self._should_run = False
|
|
|
|
def should_run(self):
|
|
return self.on_work_tree and self._should_run
|
|
|
|
def run_git(self):
|
|
self.git_path.operate([
|
|
'add', str(self.args.output_file_path), str(self.args.output_link_path),
|
|
])
|
|
commit_message = self.args.commit_message.format(
|
|
filename=self.args.output_link_path.name,
|
|
revision=self.args.revision,
|
|
)
|
|
self.git_path.operate(['commit', '-m', commit_message])
|
|
|
|
|
|
class GitPush(GitCommit):
|
|
NAME = 'push'
|
|
VERB = 'pushed'
|
|
|
|
def run_git(self):
|
|
self.git_path.operate(['push'])
|
|
|
|
|
|
def write_output(args):
|
|
converter = markdown.Markdown(
|
|
extensions=[
|
|
mdx_tables.TableExtension(),
|
|
mdx_sane_lists.SaneListExtension(),
|
|
mdx_smarty.SmartyExtension(),
|
|
mdx_toc.TocExtension(),
|
|
],
|
|
output_format='html5',
|
|
)
|
|
header = TEMPLATE_HEADER
|
|
with args.input_path.open(encoding=args.encoding) as src_file:
|
|
for line in src_file:
|
|
if line.startswith('# '):
|
|
subtitle = line[2:].replace('Software Freedom Conservancy', '').strip()
|
|
header = header.replace(
|
|
'{% block subtitle %}',
|
|
'{{% block subtitle %}}{} - '.format(subtitle),
|
|
)
|
|
break
|
|
src_file.seek(0)
|
|
body = converter.convert(src_file.read())
|
|
with tempfile.NamedTemporaryFile(
|
|
'w',
|
|
encoding=args.encoding,
|
|
dir=args.git_output.dir_path.as_posix(),
|
|
suffix='.html',
|
|
delete=False,
|
|
) as tmp_out:
|
|
try:
|
|
tmp_out.write(header)
|
|
tmp_out.write(body)
|
|
tmp_out.write(TEMPLATE_FOOTER)
|
|
tmp_out.flush()
|
|
os.rename(tmp_out.name, str(args.output_file_path))
|
|
except BaseException:
|
|
os.unlink(tmp_out.name)
|
|
raise
|
|
if args.output_link_path.is_symlink():
|
|
args.output_link_path.unlink()
|
|
args.output_link_path.symlink_to(args.output_file_path.name)
|
|
|
|
def main(arglist=None, stdout=sys.stdout, stderr=sys.stderr):
|
|
args = parse_arguments(arglist)
|
|
pull = GitPull(args)
|
|
pull.run()
|
|
if pull.exitcode:
|
|
return pull.exitcode
|
|
write_output(args)
|
|
ops = [GitCommit(args), GitPush(args)]
|
|
for op in ops:
|
|
op.run()
|
|
if op.exitcode != 0:
|
|
exitcode = op.exitcode or 0
|
|
break
|
|
else:
|
|
exitcode = 0
|
|
print(args.input_path.name, "converted,",
|
|
", ".join(op.VERB if op.exitcode == 0 else "not " + op.VERB for op in ops),
|
|
file=stdout)
|
|
return exitcode
|
|
|
|
if __name__ == '__main__':
|
|
exit(main())
|
|
|