177 lines
5.0 KiB
Python
177 lines
5.0 KiB
Python
# mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
|
|
# mypy: no-warn-return-any, allow-any-generics
|
|
|
|
from __future__ import annotations
|
|
|
|
import importlib.util
|
|
import os
|
|
import shlex
|
|
import subprocess
|
|
import sys
|
|
from typing import Any
|
|
from typing import Callable
|
|
from typing import Dict
|
|
from typing import List
|
|
from typing import Optional
|
|
from typing import TYPE_CHECKING
|
|
from typing import Union
|
|
|
|
from .. import util
|
|
from ..util import compat
|
|
from ..util.pyfiles import _preserving_path_as_str
|
|
|
|
if TYPE_CHECKING:
|
|
from ..config import PostWriteHookConfig
|
|
|
|
REVISION_SCRIPT_TOKEN = "REVISION_SCRIPT_FILENAME"
|
|
|
|
_registry: dict = {}
|
|
|
|
|
|
def register(name: str) -> Callable:
|
|
"""A function decorator that will register that function as a write hook.
|
|
|
|
See the documentation linked below for an example.
|
|
|
|
.. seealso::
|
|
|
|
:ref:`post_write_hooks_custom`
|
|
|
|
|
|
"""
|
|
|
|
def decorate(fn):
|
|
_registry[name] = fn
|
|
return fn
|
|
|
|
return decorate
|
|
|
|
|
|
def _invoke(
|
|
name: str,
|
|
revision_path: Union[str, os.PathLike[str]],
|
|
options: PostWriteHookConfig,
|
|
) -> Any:
|
|
"""Invokes the formatter registered for the given name.
|
|
|
|
:param name: The name of a formatter in the registry
|
|
:param revision: string path to the revision file
|
|
:param options: A dict containing kwargs passed to the
|
|
specified formatter.
|
|
:raises: :class:`alembic.util.CommandError`
|
|
"""
|
|
revision_path = _preserving_path_as_str(revision_path)
|
|
try:
|
|
hook = _registry[name]
|
|
except KeyError as ke:
|
|
raise util.CommandError(
|
|
f"No formatter with name '{name}' registered"
|
|
) from ke
|
|
else:
|
|
return hook(revision_path, options)
|
|
|
|
|
|
def _run_hooks(
|
|
path: Union[str, os.PathLike[str]], hooks: list[PostWriteHookConfig]
|
|
) -> None:
|
|
"""Invoke hooks for a generated revision."""
|
|
|
|
for hook in hooks:
|
|
name = hook["_hook_name"]
|
|
try:
|
|
type_ = hook["type"]
|
|
except KeyError as ke:
|
|
raise util.CommandError(
|
|
f"Key '{name}.type' (or 'type' in toml) is required "
|
|
f"for post write hook {name!r}"
|
|
) from ke
|
|
else:
|
|
with util.status(
|
|
f"Running post write hook {name!r}", newline=True
|
|
):
|
|
_invoke(type_, path, hook)
|
|
|
|
|
|
def _parse_cmdline_options(cmdline_options_str: str, path: str) -> List[str]:
|
|
"""Parse options from a string into a list.
|
|
|
|
Also substitutes the revision script token with the actual filename of
|
|
the revision script.
|
|
|
|
If the revision script token doesn't occur in the options string, it is
|
|
automatically prepended.
|
|
"""
|
|
if REVISION_SCRIPT_TOKEN not in cmdline_options_str:
|
|
cmdline_options_str = REVISION_SCRIPT_TOKEN + " " + cmdline_options_str
|
|
cmdline_options_list = shlex.split(
|
|
cmdline_options_str, posix=compat.is_posix
|
|
)
|
|
cmdline_options_list = [
|
|
option.replace(REVISION_SCRIPT_TOKEN, path)
|
|
for option in cmdline_options_list
|
|
]
|
|
return cmdline_options_list
|
|
|
|
|
|
def _get_required_option(options: dict, name: str) -> str:
|
|
try:
|
|
return options[name]
|
|
except KeyError as ke:
|
|
raise util.CommandError(
|
|
f"Key {options['_hook_name']}.{name} is required for post "
|
|
f"write hook {options['_hook_name']!r}"
|
|
) from ke
|
|
|
|
|
|
def _run_hook(
|
|
path: str, options: dict, ignore_output: bool, command: List[str]
|
|
) -> None:
|
|
cwd: Optional[str] = options.get("cwd", None)
|
|
cmdline_options_str = options.get("options", "")
|
|
cmdline_options_list = _parse_cmdline_options(cmdline_options_str, path)
|
|
|
|
kw: Dict[str, Any] = {}
|
|
if ignore_output:
|
|
kw["stdout"] = kw["stderr"] = subprocess.DEVNULL
|
|
|
|
subprocess.run([*command, *cmdline_options_list], cwd=cwd, **kw)
|
|
|
|
|
|
@register("console_scripts")
|
|
def console_scripts(
|
|
path: str, options: dict, ignore_output: bool = False
|
|
) -> None:
|
|
entrypoint_name = _get_required_option(options, "entrypoint")
|
|
for entry in compat.importlib_metadata_get("console_scripts"):
|
|
if entry.name == entrypoint_name:
|
|
impl: Any = entry
|
|
break
|
|
else:
|
|
raise util.CommandError(
|
|
f"Could not find entrypoint console_scripts.{entrypoint_name}"
|
|
)
|
|
|
|
command = [
|
|
sys.executable,
|
|
"-c",
|
|
f"import {impl.module}; {impl.module}.{impl.attr}()",
|
|
]
|
|
_run_hook(path, options, ignore_output, command)
|
|
|
|
|
|
@register("exec")
|
|
def exec_(path: str, options: dict, ignore_output: bool = False) -> None:
|
|
executable = _get_required_option(options, "executable")
|
|
_run_hook(path, options, ignore_output, command=[executable])
|
|
|
|
|
|
@register("module")
|
|
def module(path: str, options: dict, ignore_output: bool = False) -> None:
|
|
module_name = _get_required_option(options, "module")
|
|
|
|
if importlib.util.find_spec(module_name) is None:
|
|
raise util.CommandError(f"Could not find module {module_name}")
|
|
|
|
command = [sys.executable, "-m", module_name]
|
|
_run_hook(path, options, ignore_output, command)
|