356 lines
12 KiB
Python
Executable file
356 lines
12 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import contextlib
|
|
import functools
|
|
import locale
|
|
import os
|
|
import pathlib
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
|
|
try:
|
|
import markdown
|
|
from markdown.extensions import sane_lists as mdx_sane_lists
|
|
from markdown.extensions import smarty as mdx_smarty
|
|
from markdown.extensions import tables as mdx_tables
|
|
from markdown.extensions import toc as mdx_toc
|
|
markdown_import_success = True
|
|
except ImportError:
|
|
if __name__ != '__main__':
|
|
raise
|
|
markdown_import_success = False
|
|
|
|
# Django template text written before the converted Markdown body.
# write_output() splices the page title in right after the opening
# `{% block subtitle %}` tag, so that tag must stay spelled exactly this way.
TEMPLATE_HEADER = (
    '{% extends "base_projects.html" %}\n'
    '{% block subtitle %}{% endblock %}\n'
    '{% block submenuselection %}Policies{% endblock %}\n'
    '{% block content %}\n'
    '\n'
)

# Django template text written after the converted Markdown body; it closes
# the `content` block opened by TEMPLATE_HEADER.
TEMPLATE_FOOTER = (
    '\n'
    '\n'
    '{% endblock %}\n'
)
|
|
|
|
@contextlib.contextmanager
def run(cmd, encoding=None, ok_exitcodes=frozenset([0]), **kwargs):
    """Run ``cmd`` as a subprocess for the duration of a ``with`` block.

    stdout is piped unless the caller passes its own ``stdout`` keyword.
    Every stream requested as ``subprocess.PIPE`` is wrapped in a file
    object: binary when ``encoding`` is None, text decoded with ``encoding``
    otherwise.

    Yields ``(proc, *pipes)`` (pipes ordered stdout, then stderr) when at
    least one stream is piped, or just ``proc`` when none is.

    When the ``with`` body finishes normally, whatever is left in the pipes
    is read and discarded, and the process is waited for as the Popen
    context closes.

    Raises subprocess.CalledProcessError if the final return code is not in
    ``ok_exitcodes``. Remaining ``kwargs`` go to subprocess.Popen unchanged.
    """
    kwargs.setdefault('stdout', subprocess.PIPE)
    text_mode = encoding is not None
    mode = 'r' if text_mode else 'rb'
    # Value read() returns at end-of-file for the chosen mode.
    eof_marker = '' if text_mode else b''
    piped_names = [name for name in ('stdout', 'stderr')
                   if kwargs.get(name) is subprocess.PIPE]

    with contextlib.ExitStack() as stack:
        proc = stack.enter_context(subprocess.Popen(cmd, **kwargs))
        pipes = []
        for name in piped_names:
            fileno = getattr(proc, name).fileno()
            # closefd=False: the descriptor stays owned by the Popen object.
            wrapper = open(fileno, mode, encoding=encoding, closefd=False)
            pipes.append(stack.enter_context(wrapper))

        if pipes:
            handle = (proc, *pipes)
        else:
            handle = proc
        yield handle

        # Drain unread output in 4096-unit reads until end-of-file.
        for pipe in pipes:
            while pipe.read(4096) != eof_marker:
                pass

    if proc.returncode not in ok_exitcodes:
        raise subprocess.CalledProcessError(proc.returncode, cmd)
|
|
|
|
class GitPath:
    """Run Git queries and commands from the directory that holds a path.

    Commands run with the path's directory as the working directory (the
    path itself if it is a directory, otherwise its parent). Most query
    results are cached on the instance after their first successful call.
    """

    # Absolute path of the `git` executable, or None when it is not in PATH.
    GIT_BIN = shutil.which('git')
    # Copy of the environment without any GIT_* variables.
    CLEAN_ENV = {k: v for k, v in os.environ.items() if not k.startswith('GIT_')}
    ANY_EXITCODE = range(-256, 257)
    # run() keyword arguments that accept any exit status and hide stderr.
    IGNORE_ERRORS = {
        'ok_exitcodes': ANY_EXITCODE,
        'stderr': subprocess.DEVNULL,
    }
    # `git status` codes for "unmodified" (space) and "untracked" (?).
    STATUS_CLEAN_OR_UNMANAGED = frozenset(' ?')
    # `git status -z` codes (renamed, copied) whose entry is followed by an
    # extra NUL-terminated field holding the original path.
    STATUS_WITH_ORIGIN = frozenset('RC')

    def __init__(self, path, encoding, env=None):
        """Remember ``path`` and the settings shared by every Git call.

        ``path`` is a pathlib path, ``encoding`` decodes Git's output, and
        ``env`` is the environment for Git (None inherits the current one).
        """
        self.path = path
        self.dir_path = path if path.is_dir() else path.parent
        self.encoding = encoding
        self.run_defaults = {
            'cwd': str(self.dir_path),
            'env': env,
        }

    @classmethod
    def can_run(cls):
        """Return True if a `git` executable was found in PATH."""
        return cls.GIT_BIN is not None

    def _run(self, cmd, encoding=None, ok_exitcodes=frozenset([0]), **kwargs):
        """Call the module-level run() with this instance's cwd and env."""
        return run(cmd, encoding, ok_exitcodes, **self.run_defaults, **kwargs)

    def _cache(orig_func):
        """Decorate a no-argument method to compute its result only once.

        The result is stored on the instance as ``_cached_<method name>``.
        A call that raises stores nothing, so it is retried next time.
        """
        attr_name = '_cached_' + orig_func.__name__

        @functools.wraps(orig_func)
        def cache_wrapper(self):
            try:
                return getattr(self, attr_name)
            except AttributeError:
                setattr(self, attr_name, orig_func(self))
                return getattr(self, attr_name)
        return cache_wrapper

    @_cache
    def is_work_tree(self):
        """Return True if Git reports the directory is inside a work tree."""
        with self._run([self.GIT_BIN, 'rev-parse', '--is-inside-work-tree'],
                       self.encoding, **self.IGNORE_ERRORS) as (_, stdout):
            return stdout.readline() == 'true\n'

    @_cache
    def status_lines(self):
        """Return the raw NUL-separated fields printed by `git status -z`.

        The list includes the original-path fields that follow rename/copy
        entries and a trailing empty string after the last NUL.
        """
        with self._run([self.GIT_BIN, 'status', '-z'],
                       self.encoding) as (_, stdout):
            return stdout.read().split('\0')

    def _status_codes(self):
        """Yield the two-character XY status code of each status entry.

        The field after a renamed or copied entry is that entry's original
        path, not another status entry, so it is skipped rather than parsed.
        """
        fields = iter(self.status_lines())
        for field in fields:
            if len(field) < 2:
                continue
            code = field[:2]
            if not self.STATUS_WITH_ORIGIN.isdisjoint(code):
                next(fields, None)
            yield code

    @_cache
    def has_managed_modifications(self):
        """Return True if any entry's second status column shows a change."""
        return any(code[1] not in self.STATUS_CLEAN_OR_UNMANAGED
                   for code in self._status_codes())

    @_cache
    def has_staged_changes(self):
        """Return True if any entry's first status column shows a change."""
        return any(code[0] not in self.STATUS_CLEAN_OR_UNMANAGED
                   for code in self._status_codes())

    def commit_at(self, revision):
        """Return what `git rev-parse` prints for ``revision``, or None.

        Not cached. Raises subprocess.CalledProcessError when Git exits
        with a nonzero status.
        """
        with self._run([self.GIT_BIN, 'rev-parse', revision],
                       self.encoding) as (_, stdout):
            return stdout.readline().rstrip('\n') or None

    @_cache
    def upstream_commit(self):
        """Return the commit that ``@{upstream}`` resolves to."""
        return self.commit_at('@{upstream}')

    @_cache
    def head_commit(self):
        """Return the commit that ``HEAD`` resolves to."""
        return self.commit_at('HEAD')

    def in_sync_with_upstream(self):
        """Return True if HEAD and its upstream resolve to the same commit."""
        return self.upstream_commit() == self.head_commit()

    @_cache
    def last_commit(self):
        """Return the hash of the latest commit touching the path, or None.

        Git errors are ignored, so None is also the result when Git fails.
        """
        # `--` makes Git read the file name as a path, never as a revision
        # or an option.
        with self._run([self.GIT_BIN, 'log', '-n1', '--format=format:%H',
                        '--', self.path.name],
                       self.encoding, **self.IGNORE_ERRORS) as (_, stdout):
            return stdout.readline().rstrip('\n') or None

    def operate(self, subcmd, ok_exitcodes=frozenset([0])):
        """Run ``git <subcmd>`` with its output going to our own stdout.

        Raises subprocess.CalledProcessError if the exit status is not in
        ``ok_exitcodes``.
        """
        with self._run([self.GIT_BIN, *subcmd], None, ok_exitcodes, stdout=None):
            pass
|
|
|
|
|
|
def add_parser_flag(argparser, dest, **kwargs):
    """Add a ``--<flag>`` / ``--no-<flag>`` pair of switches to a parser.

    Both switches store into ``dest``: True for the positive switch, False
    for the negative one, and None when neither is given. Underscores in
    ``dest`` become hyphens in the switch names. Extra ``kwargs`` go to
    both add_argument() calls; the negative switch gets generated help.
    """
    flag_name = dest.replace('_', '-')
    positive = '--{}'.format(flag_name)
    negative = '--no-{}'.format(flag_name)

    # A default of None lets callers tell "not specified" from True/False.
    shared = dict(kwargs, dest=dest, default=None)
    argparser.add_argument(positive, action='store_true', **shared)

    shared['help'] = "Do not do {}".format(positive)
    argparser.add_argument(negative, action='store_false', **shared)
|
|
|
|
def parse_arguments(arglist):
    """Parse and validate command-line arguments.

    ``arglist`` is the list of argument strings (None means sys.argv[1:]).

    The returned namespace holds the parsed options plus three derived
    attributes:

    - ``git_output``: a GitPath for the output path
    - ``output_link_path``: ``<output dir>/<input stem>.html``
    - ``output_file_path``: ``<output dir>/<input stem>.<revision>.html``

    When --revision is omitted, it is taken from the last Git commit that
    touched the input file. Every validation failure is reported through
    parser.error(), which exits the program.
    """
    parser = argparse.ArgumentParser(
        epilog="""By default, the program will pull from Git if the output path
is a Git checkout with a tracking branch, and will commit and push if
that checkout is in sync with the tracking branch without any staged changes.
Setting any flag will always override the default behavior.
""",
    )

    parser.add_argument(
        '--encoding', '-E',
        default=locale.getpreferredencoding(),
        help="Encoding to use for all I/O. "
        "Default is your locale's encoding.",
    )
    parser.add_argument(
        '--revision', '-r',
        help="Revision string to version the published page. "
        "Default determined from the revision of the source file.",
    )
    add_parser_flag(
        parser, 'pull',
        help="Try to pull the remote tracking branch to make the checkout "
        "up-to-date before making changes"
    )
    add_parser_flag(
        parser, 'commit',
        help="Commit changes to the website repository",
    )
    parser.add_argument(
        '-m', dest='commit_message',
        default="Publish {filename} revision {revision}.",
        help="Message for any commit",
    )
    add_parser_flag(
        parser, 'push',
        help="Push to the remote tracking branch after committing changes",
    )
    parser.add_argument(
        'input_path', type=pathlib.Path,
        help="Path to the Conservancy policy Markdown source",
    )
    parser.add_argument(
        'output_path', type=pathlib.Path,
        nargs='?', default=pathlib.Path(__file__).parent,
        help="Path to the directory to write output files",
    )

    if not markdown_import_success:
        parser.error("""markdown module is not installed.
Try `apt install python3-markdown` or `python3 -m pip install --user Markdown`.""")

    args = parser.parse_args(arglist)
    args.git_output = GitPath(args.output_path, args.encoding)
    if args.pull or args.commit or args.push:
        if not args.git_output.can_run():
            parser.error("Git operation requested but `git` not found in PATH")
        elif not args.git_output.is_work_tree():
            parser.error("Git operation requested but {} is not a working path".format(
                args.output_path.as_posix()))
    # Only ask Git for a revision when Git exists. Without this guard a
    # missing `git` makes Popen fail with a TypeError traceback instead of
    # reaching the usage error below.
    if args.revision is None and GitPath.can_run():
        try:
            args.revision = GitPath(args.input_path, args.encoding, GitPath.CLEAN_ENV).last_commit()
        except (subprocess.CalledProcessError, OSError):
            # OSError covers a process that cannot start, for example when
            # the input file's directory does not exist.
            pass
    if args.revision is None:
        parser.error("no --revision specified and not found from input path")
    args.output_link_path = args.git_output.dir_path / args.input_path.with_suffix('.html').name
    args.output_file_path = args.output_link_path.with_suffix('.{}.html'.format(args.revision))
    return args
|
|
|
|
class GitOperation:
    """Base class for one optional Git step of the publishing run.

    Subclasses provide ``NAME`` (the attribute on ``args`` holding the
    user's tri-state choice), ``should_run()`` (the default when the user
    gave no choice) and ``run_git()`` (the actual Git work).
    """

    def __init__(self, args):
        """Keep ``args`` and check whether its output is a Git work tree."""
        git_path = args.git_output
        self.args = args
        self.git_path = git_path
        # None until run() actually executes the step.
        self.exitcode = None
        self.on_work_tree = git_path.can_run() and git_path.is_work_tree()

    def run(self):
        """Execute the step if requested, recording the result in exitcode.

        An explicit True/False from the command line wins; None defers to
        should_run(). ``exitcode`` stays None when the step is skipped. It
        becomes 0 (or run_git()'s truthy return value) on success, or the
        failing command's return code on CalledProcessError.
        """
        requested = getattr(self.args, self.NAME)
        wanted = self.should_run() if requested is None else requested
        if wanted:
            try:
                result = self.run_git()
            except subprocess.CalledProcessError as error:
                self.exitcode = error.returncode
            else:
                self.exitcode = result or 0
|
|
|
|
|
|
class GitPull(GitOperation):
    """Bring the output checkout up to date with its tracking branch."""

    NAME = 'pull'

    def should_run(self):
        """Return whether to pull when neither --pull nor --no-pull is given.

        True only if the output is a Git work tree with no staged changes
        and the current branch has an upstream. Without the upstream check,
        `merge --ff-only` fails on a checkout that has no tracking branch
        and the whole publish is aborted.
        """
        if not self.on_work_tree or self.git_path.has_staged_changes():
            return False
        try:
            # Use the uncached commit_at() so the upstream commit is not
            # memoized before the fetch; later sync checks must see the
            # value from after the pull.
            return self.git_path.commit_at('@{upstream}') is not None
        except subprocess.CalledProcessError:
            return False

    def run_git(self):
        """Fetch without tags, then fast-forward merge."""
        self.git_path.operate(['fetch', '--no-tags'])
        self.git_path.operate(['merge', '--ff-only'])
|
|
|
|
|
|
class GitCommit(GitOperation):
    """Stage and commit the generated page and its symlink."""

    NAME = 'commit'
    # Past-tense verb used in the summary line printed by main().
    VERB = 'committed'

    def __init__(self, args):
        """Record, at construction time, whether a default commit is safe.

        The default applies when nothing is staged and HEAD matches its
        upstream. Git is queried only on a usable work tree; otherwise the
        queries would crash when `git` is not installed, or print errors
        when the output directory is not a checkout.
        """
        super().__init__(args)
        try:
            self._should_run = (self.on_work_tree
                                and (not self.git_path.has_staged_changes())
                                and self.git_path.in_sync_with_upstream())
        except subprocess.CalledProcessError:
            self._should_run = False

    def should_run(self):
        """Return the default decision computed in __init__."""
        return self.on_work_tree and self._should_run

    def run_git(self):
        """Add both output files and commit them with the formatted message."""
        # Git runs with the output directory as its working directory and
        # both files live in that directory, so bare file names are used.
        # Full paths would double the directory part whenever the output
        # path is relative. `--` keeps the names from being read as options.
        self.git_path.operate([
            'add', '--',
            self.args.output_file_path.name, self.args.output_link_path.name,
        ])
        commit_message = self.args.commit_message.format(
            filename=self.args.output_link_path.name,
            revision=self.args.revision,
        )
        self.git_path.operate(['commit', '-m', commit_message])
|
|
|
|
|
|
class GitPush(GitCommit):
    """Push the checkout; inherits GitCommit's default run decision."""

    NAME = 'push'
    # Past-tense verb used in the summary line printed by main().
    VERB = 'pushed'

    def run_git(self):
        """Run a plain `git push`."""
        push_cmd = ['push']
        self.git_path.operate(push_cmd)
|
|
|
|
|
|
def write_output(args):
    """Convert the Markdown input to a templated HTML page and link to it.

    The page is the template header, the converted body and the template
    footer. If the source has a line starting with ``# ``, the first such
    line (minus 'Software Freedom Conservancy') becomes the page subtitle.
    The page is written to a temporary file in the output directory and
    renamed to ``args.output_file_path``; then ``args.output_link_path`` is
    made a symlink to that file, replacing any existing symlink.
    """
    extensions = [
        mdx_tables.TableExtension(),
        mdx_sane_lists.SaneListExtension(),
        mdx_smarty.SmartyExtension(),
        mdx_toc.TocExtension(),
    ]
    converter = markdown.Markdown(extensions=extensions, output_format='html5')

    with args.input_path.open(encoding=args.encoding) as src_file:
        # Stop scanning at the first top-level heading, if there is one.
        title_line = next(
            (line for line in src_file if line.startswith('# ')), None)
        # Rewind so the converter gets the whole document.
        src_file.seek(0)
        body = converter.convert(src_file.read())

    if title_line is None:
        header = TEMPLATE_HEADER
    else:
        subtitle = title_line[2:].replace('Software Freedom Conservancy', '').strip()
        header = TEMPLATE_HEADER.replace(
            '{% block subtitle %}',
            '{{% block subtitle %}}{} - '.format(subtitle),
        )

    out_dir = args.git_output.dir_path.as_posix()
    tmp_out = tempfile.NamedTemporaryFile(
        'w',
        encoding=args.encoding,
        dir=out_dir,
        suffix='.html',
        delete=False,
    )
    with tmp_out:
        try:
            for chunk in (header, body, TEMPLATE_FOOTER):
                tmp_out.write(chunk)
            tmp_out.flush()
            os.rename(tmp_out.name, str(args.output_file_path))
        except BaseException:
            # Remove the temporary file on any failure, then re-raise.
            os.unlink(tmp_out.name)
            raise

    link_path = args.output_link_path
    if link_path.is_symlink():
        link_path.unlink()
    # The target is relative, so the link resolves within its own directory.
    link_path.symlink_to(args.output_file_path.name)
|
|
|
|
def main(arglist=None, stdout=sys.stdout, stderr=sys.stderr):
    """Pull, convert, then commit and push; return the process exit code.

    A failed pull ends the run immediately with the pull's exit code and no
    output is written. After conversion, commit and push run in order and
    stop at the first step that is skipped or fails. A one-line summary is
    printed to ``stdout``.
    """
    args = parse_arguments(arglist)

    pull = GitPull(args)
    pull.run()
    if pull.exitcode:
        return pull.exitcode

    write_output(args)

    # Both operations are built before either runs.
    ops = [GitCommit(args), GitPush(args)]
    exitcode = 0
    for operation in ops:
        operation.run()
        if operation.exitcode != 0:
            # exitcode is None when the step was skipped; report that as 0.
            exitcode = operation.exitcode or 0
            break

    outcomes = [
        operation.VERB if operation.exitcode == 0 else "not " + operation.VERB
        for operation in ops
    ]
    print(args.input_path.name, "converted,", ", ".join(outcomes), file=stdout)
    return exitcode
|
|
|
|
if __name__ == '__main__':
    # sys.exit is always available; the bare exit() helper is installed by
    # the `site` module and is missing when Python runs with -S.
    sys.exit(main())
|
|
|