Skip to content

Spells (API reference)

This is an API reference for all the Invoke tasks and helper functions that are included in the Conjuring package.

conjuring.spells.aws

AWS: ECR login.

clean_ecr_url(c, url=None)

Clean an AWS ECR URL.

Source code in src/conjuring/spells/aws.py
def clean_ecr_url(c: Context, url: str | None = None) -> str:
    """Return the bare AWS ECR registry host for ``url``.

    When ``url`` is empty, the account and the region are picked interactively
    and the registry host is built from them.
    Otherwise the host part of ``url`` is returned; the scheme is optional, so
    both ``https://<host>/<repo>`` and ``<host>/<repo>`` give ``<host>``.
    """
    if not url:
        account = fzf_aws_account(c)
        region = fzf_aws_region(c)
        return f"{account}.dkr.ecr.{region}.amazonaws.com"

    # urlparse() only recognises a network location when it is introduced by "//";
    # without it the host would land in "path" and netloc would be empty.
    candidate = url if "//" in url else f"//{url}"
    return urlparse(candidate).netloc

ecr_login(c, url='')

Log in to AWS ECR.

Using Amazon ECR with the AWS CLI - Amazon ECR

Source code in src/conjuring/spells/aws.py
@task
def ecr_login(c: Context, url: str = "") -> None:
    """Log in to AWS ECR.

    [Using Amazon ECR with the AWS CLI - Amazon ECR](https://docs.aws.amazon.com/AmazonECR/latest/userguide/getting-started-cli.html#cli-authenticate-registry)
    """
    # The profile is resolved before the registry host.
    aws_profile = fzf_aws_profile(c)
    registry = clean_ecr_url(c, url)

    # The password printed by the AWS CLI is piped straight into "docker login".
    pieces = (
        "aws ecr get-login-password --profile",
        aws_profile,
        "| docker login --username AWS --password-stdin",
        registry,
    )
    run_command(c, *pieces)

fzf_aws_account(c)

Select an AWS account from the config file.

Source code in src/conjuring/spells/aws.py
def fzf_aws_account(c: Context) -> str:
    """Pick one of the AWS account IDs found in the AWS config file, using fzf."""
    # Field 4 of each "aws:iam::<account>" match (split on ":") is the account ID.
    list_accounts = f"rg -o 'aws:iam::[^:]+' {AWS_CONFIG} | cut -d ':' -f 4 | sort -u"
    return run_with_fzf(c, list_accounts)

fzf_aws_profile(c, partial_name=None)

Select an AWS profile from a partial profile name using fzf.

Source code in src/conjuring/spells/aws.py
def fzf_aws_profile(c: Context, partial_name: str | None = None) -> str:
    """Return an AWS profile name.

    An explicit ``partial_name`` is used as the initial fzf query.
    Without it, a non-empty ``AWS_PROFILE`` environment variable is returned as is;
    otherwise fzf is opened with an empty query.
    """
    if partial_name:
        return run_with_fzf(c, LIST_AWS_PROFILES_COMMAND, query=partial_name)

    aws_profile = os.environ.get("AWS_PROFILE")
    if aws_profile:
        typer.echo(f"Using env variable AWS_PROFILE (set to '{aws_profile}')")
        return aws_profile

    return run_with_fzf(c, LIST_AWS_PROFILES_COMMAND, query="")

fzf_aws_region(c)

Select an AWS region from the config file.

Source code in src/conjuring/spells/aws.py
def fzf_aws_region(c: Context) -> str:
    """Pick one of the regions found in the AWS config file, using fzf."""
    # Lines starting with "region" are stripped of spaces; the value after "=" is kept.
    list_regions = f"rg -o '^region.+' {AWS_CONFIG} | tr -d ' ' | cut -d'=' -f 2 | sort -u"
    return run_with_fzf(c, list_regions)

list_aws_profiles(c)

List AWS profiles from the config file.

Source code in src/conjuring/spells/aws.py
def list_aws_profiles(c: Context) -> list[str]:
    """Return the output lines of the command that lists AWS profiles."""
    profiles = run_lines(c, LIST_AWS_PROFILES_COMMAND)
    return profiles

run_aws_vault(c, *pieces, profile=None)

Run AWS vault commands in a subshell, or open a subshell if no commands were provided.

Source code in src/conjuring/spells/aws.py
def run_aws_vault(c: Context, *pieces: str, profile: str | None = None) -> Result:
    """Run ``aws-vault exec <profile> -- <pieces>``.

    The profile is resolved with ``fzf_aws_profile``; with no ``pieces`` the
    command ends right after the ``--`` separator.
    """
    chosen_profile = fzf_aws_profile(c, profile)
    return run_command(c, "aws-vault exec", chosen_profile, "--", *pieces, pty=False)

conjuring.spells.direnv

direnv: init local dir.

init(c, source_up=False, dotenv=False, all_=False)

Configure direnv in the local dir.

Source code in src/conjuring/spells/direnv.py
@task()
def init(c: Context, source_up: bool = False, dotenv: bool = False, all_: bool = False) -> None:
    """Configure direnv in the local dir."""
    # --all switches both individual options on.
    if all_:
        source_up, dotenv = True, True
    if not source_up and not dotenv:
        print_error("Choose one of the options: --source-up, --dotenv, --all")
        return

    envrc_path = Path(ENVRC)
    text = envrc_path.read_text() if envrc_path.exists() else ""

    # Each snippet is appended only when requested and its marker is not in the file yet.
    snippets = (
        (source_up, SOURCE_UP_IF_EXISTS, SOURCE_UP_IF_EXISTS_TEMPLATE),
        (dotenv, DOTENV_IF_EXISTS, DOTENV_IF_EXISTS_TEMPLATE),
    )
    for requested, marker, template in snippets:
        if requested and marker not in text:
            text += template

    if text:
        envrc_path.write_text(text)
        c.run("direnv allow")

    bat(c, ".env*")

conjuring.spells.docker

Docker: remove containers and volumes.

rm_containers(c, container='', all_=False, exited=False)

Remove Docker containers.

Source code in src/conjuring/spells/docker.py
@task(
    help={
        "container": "Container name to remove (regexp)",
        "all_": "All containers",
        "exited": "Exited containers",
    },
)
def rm_containers(c: Context, container: str = "", all_: bool = False, exited: bool = False) -> None:
    """Remove Docker containers."""
    # Only one selector is honoured, in this order of precedence: all, exited, container.
    cmd = []
    if all_:
        cmd = ["docker ps -a"]
    elif exited:
        cmd = ["docker ps -a -f status=exited"]
    elif container:
        cmd = ["docker ps -a | grep -e", container]
    if not cmd:
        print_error("Choose one argument. Run with --help to see available argument")
        return

    # Show what is going to be removed (always executed, even on a dry run).
    run_command(c, *cmd, dry=False)
    # Skip the "CONTAINER ID ..." header by content instead of by position:
    # the grep variant has already filtered the header out, so dropping the first
    # line unconditionally would skip the first matching container.
    run_command(c, *cmd, "| awk '$1 != \"CONTAINER\" {print $1}' | xargs docker rm -f")
    # Show what is left afterwards.
    run_command(c, *cmd)

rm_volumes(c, dangling=False)

Remove Docker volumes.

Source code in src/conjuring/spells/docker.py
@task(help=({"dangling": "Dangling volumes"}))
def rm_volumes(c: Context, dangling: bool = False) -> None:
    """Remove Docker volumes."""
    # "dangling" is the only selector available; without it there is nothing to do.
    if not dangling:
        print_error("Choose one argument. Run with --help to see available argument")
        return

    list_volumes = 'docker volume ls -f "dangling=true"'
    # List (always executed, even on a dry run), remove by ID, then list again.
    run_command(c, list_volumes, dry=False)
    run_command(c, f"docker volume rm $({list_volumes} -q)")
    run_command(c, list_volumes)

conjuring.spells.duplicity

Backup and restore with Duplicity.

backup(c)

Backup files with Duplicity.

Source code in src/conjuring/spells/duplicity.py
@task
def backup(c: Context) -> None:
    """Backup files with Duplicity."""
    hostname = print_hostname(c)
    # To back up directly on OneDrive:
    # target_url = f"onedrive://Backup/{hostname}/duplicity/"
    target_url = f"file://{BACKUP_DIR}/{hostname}/duplicity/"
    typer.echo(f"Backup dir: {target_url}")

    template_path = Path("~/dotfiles/backup-duplicity-template.cfg").expanduser()
    typer.echo(f"Template file: {template_path}")

    # $HOME placeholders in the template are replaced with the real home directory.
    rendered_filelist = Template(template_path.read_text()).substitute(HOME=Path.home())

    # delete=False: the rendered file list is left on disk after the block ends.
    with NamedTemporaryFile("r+", delete=False) as filelist:
        filelist.write(rendered_filelist)
        # Flush so duplicity sees the full content when it opens the file by name.
        filelist.flush()
        duplicity_args = (
            "duplicity",
            f"--name='{hostname}-backup'",
            "-v info",
            f"--include-filelist={filelist.name}",
            "--exclude='**' $HOME/",
            target_url,
        )
        run_command(c, *duplicity_args)

print_hostname(c)

Print the hostname of the current machine.

Source code in src/conjuring/spells/duplicity.py
def print_hostname(c: Context) -> str:
    """Print and return the hostname of the current machine.

    A trailing ``.local`` suffix is removed from the name. The command is always
    executed (``dry=False``) because callers need the real value.
    """
    # The dot is escaped and the pattern anchored to the end, so only a literal
    # ".local" suffix is stripped (an unanchored ".local" would also eat
    # "<any char>local" in the middle of a name).
    host = c.run(r"hostname | sed 's/\.local$//'", dry=False).stdout.strip()
    typer.echo(f"Host: {host}")
    return host

restore(c)

Restore files with Duplicity. You will be prompted to choose the source dir. Restore dir is ~/Downloads.

Source code in src/conjuring/spells/duplicity.py
@task
def restore(c: Context) -> None:
    """Restore files with Duplicity. You will be prompted to choose the source dir. Restore dir is ~/Downloads."""
    print_hostname(c)
    source_dir = run_with_fzf(c, f"fd -d 2 -t d duplicity {BACKUP_DIR}")
    if source_dir:
        # The parent dir of the chosen "duplicity" dir names the computer that made the backup.
        computer_name = Path(source_dir).parent.name
        c.run(f"duplicity restore file://{source_dir} ~/Downloads/duplicity-restore/{computer_name}/")

conjuring.spells.fork

GitHub forks: configure remote and sync.

remote(c, username, remote_='upstream')

Configure a remote for a fork.

Source code in src/conjuring/spells/fork.py
@task(
    help={
        "username": "The owner of the original repository. Required",
        "remote": "The remote to sync with (default: upstream)",
    },
)
def remote(c: Context, username: str, remote_: str = "upstream") -> None:
    """[Configure a remote for a fork](https://docs.github.com/en/github/collaborating-with-issues-and-pull-requests/configuring-a-remote-for-a-fork)."""
    # A leading dash means an option was parsed as the positional username.
    if username.startswith("-"):
        message = "Arguments should be: username [--remote]"
        raise Exit(message)

    # An empty remote name falls back to the username.
    remote_name = remote_ or username

    # The project name is taken from the first "origin" line of "git remote -v".
    find_project = r"git remote -v | rg origin | head -1 | rg -o '/(.+)\.git' -r '$1'"
    project_name = c.run(find_project, pty=False).stdout.strip()

    # warn=True: a failure to add the remote does not abort the task.
    c.run(f"git remote add {remote_name} https://github.com/{username}/{project_name}.git", warn=True)
    c.run("git remote -v")

sync(c, remote_='upstream')

Sync a fork.

Source code in src/conjuring/spells/fork.py
@task(help={"remote": "The remote to sync with (default: upstream)"})
def sync(c: Context, remote_: str = "upstream") -> None:
    """[Sync a fork](https://docs.github.com/en/github/collaborating-with-issues-and-pull-requests/syncing-a-fork)."""
    c.run(f"git fetch {remote_}")

    # checkout() tries each branch in order and returns "" when none could be checked out.
    existing_branch = Git(c).checkout("master", "main")
    if not existing_branch:
        # Stop here instead of running "git merge <remote>/" with an empty branch name.
        msg = "Could not check out any of these branches: master, main"
        raise Exit(msg)

    c.run(f"git merge {remote_}/{existing_branch}")
    c.run("git push")

conjuring.spells.generic

Generic spells: list to-do items in files.

Location dataclass

Location of a to-do item in a file.

Source code in src/conjuring/spells/generic.py
@dataclass
class Location:
    """Where a to-do item was found: file path, line number and the comment text."""

    # Path of the file that contains the item
    path: str
    # Line number; a numeric string is accepted and converted to int on creation
    line: int | str
    # Comment text; surrounding whitespace is removed on creation
    comment: str

    def __post_init__(self) -> None:
        """Normalise the line number to ``int`` and trim the comment."""
        line_number = int(self.line)
        trimmed_comment = self.comment.strip()
        self.line = line_number
        self.comment = trimmed_comment

ToDoItem dataclass

A to-do item.

Source code in src/conjuring/spells/generic.py
@dataclass(frozen=True)
class ToDoItem:
    """An immutable to-do item: its kind, its assignee and its description."""

    which: str
    assignee: str
    description: str

    @property
    def sort_key(self) -> str:
        """Case-insensitive key used to order items.

        The three fields are case-folded and joined with dashes into one string.
        A single concatenated key is used on purpose: comparing the fields
        separately with ``and`` conditions didn't give the expected order
        (fix-me tasks first, then to-do tasks).
        """
        fields = (self.which, self.assignee, self.description)
        return "-".join(field.casefold() for field in fields)

    def __lt__(self, other: ToDoItem) -> bool:
        """Order items by their ``sort_key``."""
        mine, theirs = self.sort_key, other.sort_key
        return mine < theirs

sort_key: str property

Key to sort the instance.

String concatenation works. Checking both fields separately with and conditions didn't work: sort order was not as expected (meaning fix-me tasks first, then to-do tasks).

todo(c, cz=False, valid=True, invalid=True, short=False, priority='', markdown=False, dir_='')

List to-dos and fix-mes in code. Optionally check if the description follows Conventional Commits (cz check).

Source code in src/conjuring/spells/generic.py
@task(
    help={
        "cz": "Run commitizen (cz check) to validate the description of the to-do item as a commit message",
        "valid": "When using cz check, print valid to-do items",
        "invalid": "When using cz check, print invalid to-do items",
        "short": "Short format: only the description, without the lines of code where to-do items were found",
        "priority": "Specify an assignee and show only higher priority tasks for them"
        f" ({FIX_ME} or {TO_DO}(<assignee>)",
        "markdown": "Print the output in Markdown format",
        "dir": "Partial directory names to search for items. Use multiple times or a comma-separated list",
    },
    iterable=["dir"],
)
def todo(  # noqa: C901,PLR0913,PLR0912
    c: Context,
    cz: bool = False,
    valid: bool = True,
    invalid: bool = True,
    short: bool = False,
    priority: str = "",
    markdown: bool = False,
    dir_: str = "",
) -> None:
    """List to-dos and fix-mes in code. Optionally check if the description follows Conventional Commits (cz check)."""
    # Flatten the directory option: every value may itself be a comma-separated list.
    dir_names: list[str] = []
    if dir_:
        for entry in always_iterable(dir_):
            dir_names.extend(entry.split(","))

    all_todos: dict[ToDoItem, list[Location]] = _parse_all_todos(c, priority, dir_names)

    if markdown:
        _print_todos_as_markdown(all_todos, short)
        return

    for item, locations in sorted(all_todos.items()):
        printer = print_success
        if cz:
            # The description is validated as if it were a commit message.
            check = run_command(c, "cz check -m", quote(item.description), hide=True, warn=True)
            passed = check.ok
            # Skip the item when its category (valid/invalid) was switched off.
            if not (valid if passed else invalid):
                continue
            if not passed:
                printer = print_error

        assignee_str = f"({item.assignee.upper()})" if item.assignee else ""
        printer(f"{item.which}{assignee_str}: {item.description}")

        if not short:
            for loc in locations:
                typer.echo(f"   {loc.path}:{loc.line} {loc.comment}")

conjuring.spells.git

Git: update all, extract subtree, rewrite history, ...

Git

Git helpers.

Source code in src/conjuring/spells/git.py
class Git:
    """Git helper commands bound to an Invoke context."""

    # Every file ever listed in the log; "tail +2" removes the blank line at the top
    SHOW_ALL_FILE_HISTORY = 'git log --pretty="format:" --name-only | sort -u | tail +2'

    def __init__(self, context: Context) -> None:
        self.context = context

    @property
    def github_username(self) -> str:
        """The GitHub username configured in the global settings."""
        github_section = global_config()["github"]
        return github_section["user"]

    def current_branch(self) -> str:
        """Return the name of the branch that is currently checked out."""
        branch_name = run_stdout(self.context, "git branch --show-current")
        return branch_name

    def default_branch(self) -> str:
        """Return the default branch name (master/main/develop/development).

        It is the first match, in sorted order, among all local and remote branches.
        """
        find_default = "git branch -a | rg -o -e /master -e /develop.+ -e /main | sort -u | cut -b 2- | head -1"
        return run_stdout(self.context, find_default)

    def checkout(self, *branches: str) -> str:
        """Check out the first branch that works and return its name ("" if none did)."""
        for candidate in branches:
            try:
                self.context.run(f"git checkout {candidate}")
            except UnexpectedExit:  # noqa: PERF203
                # This branch could not be checked out; try the next one.
                continue
            return candidate
        return ""

    def choose_local_branch(self, branch: str) -> str:
        """Choose a local branch with fzf, using ``branch`` as the initial query."""
        # Branches with "develop" in the listing line are left out of the choices.
        list_branches = "git branch --list | rg -v develop | cut -b 3-"
        return run_with_fzf(self.context, list_branches, query=branch)

github_username: str property

The GitHub username configured in the global settings.

checkout(*branches)

Try checking out the specified branches in order.

Source code in src/conjuring/spells/git.py
def checkout(self, *branches: str) -> str:
    """Check out the first branch that works and return its name ("" if none did)."""
    for candidate in branches:
        try:
            self.context.run(f"git checkout {candidate}")
        except UnexpectedExit:  # noqa: PERF203
            # This branch could not be checked out; try the next one.
            continue
        return candidate
    return ""

choose_local_branch(branch)

Choose a local branch.

Source code in src/conjuring/spells/git.py
def choose_local_branch(self, branch: str) -> str:
    """Choose a local branch with fzf, using ``branch`` as the initial query."""
    # Branches with "develop" in the listing line are left out of the choices.
    list_branches = "git branch --list | rg -v develop | cut -b 3-"
    return run_with_fzf(self.context, list_branches, query=branch)

current_branch()

Return the current branch name.

Source code in src/conjuring/spells/git.py
def current_branch(self) -> str:
    """Return the name of the branch that is currently checked out."""
    branch_name = run_stdout(self.context, "git branch --show-current")
    return branch_name

default_branch()

Return the default branch name (master/main/develop/development).

Source code in src/conjuring/spells/git.py
def default_branch(self) -> str:
    """Return the default branch name (master/main/develop/development).

    It is the first match, in sorted order, among all local and remote branches.
    """
    find_default = "git branch -a | rg -o -e /master -e /develop.+ -e /main | sort -u | cut -b 2- | head -1"
    return run_stdout(self.context, find_default)

PrefixBranch dataclass

Tuple of prefix and branch name.

Source code in src/conjuring/spells/git.py
@dataclass(frozen=True)
class PrefixBranch:
    """Immutable pair of a subtree prefix and the name of its branch."""

    # Directory prefix
    prefix: str
    # Branch name paired with the prefix
    branch: str

body(c, prefix=False, original_order=False)

Prepare a commit body to be used on pull requests and squashed commits.

Source code in src/conjuring/spells/git.py
@task(
    help={
        "prefix": "Keep the Conventional Commits prefix",
        "original_order": "Don't sort bullets, keep them in original order",
    },
)
def body(c: Context, prefix: bool = False, original_order: bool = False) -> None:
    """Prepare a commit body to be used on pull requests and squashed commits."""
    base_branch = set_default_branch(c)

    # Lines containing any of these markers (merges and reverts) are not worth a bullet.
    skip_markers = ("Merge branch", "Merge remote-tracking branch", "Revert ", "This reverts")

    bullets = []
    for raw_line in run_lines(c, f"git log {base_branch}..", "--format=%s%n%b"):
        text = raw_line.strip(" -")
        if not text or any(marker in text for marker in skip_markers):
            continue

        # Remove the Jira ticket, then the empty brackets it may leave behind
        text = REGEX_JIRA.sub("", text).replace("()", "").replace("[]", "").strip(" -")

        # Keep only what comes after the first colon (the Conventional Commit prefix)
        if ":" in text and not prefix:
            text = text.partition(":")[2].strip()

        bullets.append(f"- {text}")

    if original_order:
        output_lines = bullets
    else:
        # Sorting also removes duplicated bullets.
        output_lines = sorted(set(bullets))
    typer.echo("\n".join(output_lines))

changes_since_tag(c, tag='', files=False, verbose=False, by_author=False)

Display changes (commits or files) since the last tag (or a chosen tag).

Source code in src/conjuring/spells/git.py
@task(
    help={
        "tag": "Name of the tag to compare to (default: last created tag)",
        "files": "Display files instead of commits (default: false)",
        "verbose": "Files: display changes/insertions/deletion."
        " Commits: display the full commit message, author... (default: False)",
        "by_author": "Group commits by author. Doesn't work with --files or --verbose. (default: False)",
    },
)
def changes_since_tag(
    c: Context,
    tag: str = "",
    files: bool = False,
    verbose: bool = False,
    by_author: bool = False,
) -> None:
    """Display changes (commits or files) since the last tag (or a chosen tag)."""
    if files:
        # Without an explicit tag, use the most recently created one.
        which_tag = tag or run_stdout(c, "git tag --list --sort -creatordate | head -1", hide=False, dry=False)
        default_branch = set_default_branch(c)
        option = "" if verbose else " --name-only"
        c.run(f"git diff --stat {which_tag} origin/{default_branch}{option}")
        return

    # Without an explicit tag, let the shell resolve the closest tag.
    which_tag = tag or "$(git describe --tags --abbrev=0)"
    option = " --format='%aN|%s' | sort -u" if by_author else "" if verbose else " --oneline"
    cmd = f"git log {which_tag}..HEAD{option}"
    if not by_author:
        c.run(cmd)
        return

    commits_by_author = defaultdict(list)
    for line in run_lines(c, cmd):
        if not line:
            continue
        # Split on the first "|" only: the commit subject may contain pipes too.
        author, _, commit = line.partition("|")
        commits_by_author[author].append(commit)
    for author, commits in commits_by_author.items():
        print(f"\n{author}:")  # noqa: T201
        for commit in commits:
            print(f"  {commit}")  # noqa: T201

extract_subtree(c, new_project_dir, reset=False, keep=False)

Extract files from subdirectories of the current Git repo to another repo, using git subtree.

The files will be moved to the root of the new repo.

Solutions adapted from: - https://serebrov.github.io/html/2021-09-13-git-move-history-to-another-repository.html - https://stackoverflow.com/questions/25574407/git-subtree-split-two-directories/58253979#58253979

Source code in src/conjuring/spells/git.py
@task(
    help={
        "new_project_dir": "Dir of the project to be created. The dir might exist or not",
        "reset": "Remove the new dir and start over",
        "keep": "Keep branches and remote after the extracting is done",
    },
)
def extract_subtree(c: Context, new_project_dir: str, reset: bool = False, keep: bool = False) -> None:
    """Extract files from subdirectories of the current Git repo to another repo, using git subtree.

    The files will be moved to the root of the new repo.

    Solutions adapted from:
    - https://serebrov.github.io/html/2021-09-13-git-move-history-to-another-repository.html
    - https://stackoverflow.com/questions/25574407/git-subtree-split-two-directories/58253979#58253979
    """
    new_project_path: Path = Path(new_project_dir).expanduser().absolute()
    if reset:
        c.run(f"rm -rf {new_project_path}")

    # parents=False: the parent of the new project dir must already exist
    new_project_path.mkdir(parents=False, exist_ok=True)
    old_project_path = Path.cwd()

    # Every file that appears in the Git history, and the subset the user wants to keep
    all_files = set(run_lines(c, Git.SHOW_ALL_FILE_HISTORY, dry=False))
    chosen_files = set(
        run_with_fzf(
            c,
            Git.SHOW_ALL_FILE_HISTORY,
            dry=False,
            header="Use TAB to choose the files you want to KEEP",
            multi=True,
            preview="test -f {} && head -20 {} || echo FILE NOT FOUND, IT EXISTS ONLY IN GIT HISTORY",
        ),
    )
    # Directory part of each chosen file; a path without "/" is kept as is by rsplit
    sub_dirs = {part.rsplit("/", 1)[0] for part in chosen_files}
    # Files in history that were not chosen are the ones to be removed
    obliterate = set(all_files.difference(chosen_files))

    # Earliest committer date in the log (ISO dates, sorted), reused for the first commit of the new repo
    first_date = run_stdout(c, 'git log --format="%cI" --root | sort -u | head -1')

    # Prefixes are the chosen sub dirs, relative to the current dir, each ending with a slash
    prefixes: list[str] = []
    for sub_dir in sorted(sub_dirs):
        absolute_subdir = Path(sub_dir).expanduser().absolute()
        # Add slash to the end
        prefixes.append(str(absolute_subdir.relative_to(Path.cwd())).rstrip("/") + "/")

    with c.cd(new_project_dir):
        # Create the new repo with a first commit and fetch the old project as the "upstream" remote.
        # NOTE(review): the "master" branch name is hard-coded here and below.
        run_multiple(
            c,
            "git init",
            "touch README.md",
            "git add README.md",
            f'git commit -m "chore: first commit" --date {first_date}',
            f"git remote add -f upstream {old_project_path}",
            "git checkout -b upstream_master upstream/master",
            pty=False,
        )
        pairs: set[PrefixBranch] = set()
        for prefix in prefixes:
            # NOTE(review): this check is done by Python with a relative path; presumably c.cd() only
            #  affects commands run through the context, so it looks at the old project dir - confirm.
            if not Path(prefix).exists():
                print_error(f"Skipping non-existent prefix {prefix}...")
                continue

            clean = prefix.strip(" /").replace("/", "_")
            branch = f"upstream_subtree_{clean}"
            # Unwanted files under this prefix, with the prefix removed from their paths
            local_obliterate = {f[len(prefix) :] for f in obliterate if f.startswith(prefix)}
            pairs.add(PrefixBranch(prefix, branch))

            run_multiple(
                c,
                "git checkout upstream_master",
                f"git subtree split --prefix={prefix} -b {branch}",
                f"git checkout {branch}",
                # NOTE(review): the condition tests "obliterate", not "local_obliterate"; when the latter
                #  is empty, the command has no file arguments - confirm this is intended.
                "git obliterate " + " ".join(sorted(local_obliterate)) if obliterate else "",
                "git checkout master",
                # TODO: fix: deal with files that have the same name in different subdirs
                #  The files are merged in the root, without prefix.
                #  What happens if a file has the same name in multiple subdirs? e.g.: bin/file.py and src/file.py
                f"git merge {branch} --allow-unrelated-histories -m 'refactor: merge subtree {prefix}'",
            )

        # Final pass over the unwanted files, this time using their full paths
        if obliterate:
            c.run("git obliterate " + " ".join(sorted(obliterate)))
        # Unless --keep was given, remove the helper branches and the "upstream" remote
        if not keep:
            run_multiple(
                c,
                "git branch -D upstream_master",
                *[f"git branch -D {pair.branch}" for pair in pairs],
                "git remote remove upstream",
            )
        history(c, full=True)
    print_error("Don't forget to switch to the new repo:", f"  cd {new_project_dir}", join_nl=True)
    print_success(
        "Next steps:",
        "- Run 'git obliterate' manually for files in Git history (listed above) you still want to remove",
        "- Run 'invoke git.rewrite' to fix dates and authors",
        "- Create a new empty repo on https://github.com/new without initializing it (no README/.gitignore/license)",
        "- Follow the instructions to add a remote (from 'push an existing repository from the command line')",
        "- Push files to the new repo with:",
        "  git push -u origin master",
        join_nl=True,
    )

global_config() cached

Global Git configuration.

Source code in src/conjuring/spells/git.py
@lru_cache
def global_config() -> ConfigParser:
    """Return the parsed global Git configuration (the result is cached)."""
    parser = ConfigParser()
    # read() silently skips a file that cannot be opened, leaving the parser empty.
    parser.read(GLOBAL_GITCONFIG_PATH)
    return parser

history(c, full=False, files=False, author=False, dates=False)

Grep the whole Git log and display information.

Source code in src/conjuring/spells/git.py
@task(
    help={
        "full": "Display all info: files, authors, dates",
        "files": "Display all files in Git history, even the ones that were deleted and don't exist anymore",
        "author": "Display authors",
        "dates": "Display committer and author dates in different colors",
    },
)
def history(c: Context, full: bool = False, files: bool = False, author: bool = False, dates: bool = False) -> None:
    """Grep the whole Git log and display information."""
    # --full is a shortcut for all the other options.
    if full:
        files = author = dates = True

    # Nothing was asked for: fail before running any command.
    if not (files or author or dates):
        msg = "Choose at least one option: --full, --files, --author, --dates"
        raise Exit(msg, 1)

    if files:
        c.run(Git.SHOW_ALL_FILE_HISTORY)

    if author:
        c.run("git log --name-only | rg author | sort -u")

    if dates:
        log_lines = run_lines(c, 'git log --format="%H|%cI|%aI|%GK|%s"', hide=False)
        for index, line in enumerate(log_lines):
            # The legend and the column titles are printed once, right before the first commit.
            if index == 0:
                print_success("Green = dates are equal")
                print_error("Red = dates are different")
                typer.echo(
                    "Commit                                   Committer Date            "
                    "Author Date               GPG key          Subject",
                )

            fields = line.split("|")
            committer_date = fields[1]
            author_date = fields[2]
            if committer_date == author_date:
                print_success(*fields)
            else:
                print_error(*fields)

merge_default(c, remote=False, update=True, push=True, rebase=False)

Merge the default branch of the repo. Also set it with "git config", if not already set.

Source code in src/conjuring/spells/git.py
@task(
    help={
        "remote": "List remote branches (default: False)",
        "update": "Update the repo before merging (default: True)",
        "push": "Push the merge to the remote (default: True)",
        "rebase": "Rebase the default branch before merging (default: False)",
    },
)
def merge_default(
    c: Context,
    remote: bool = False,
    update: bool = True,
    push: bool = True,
    rebase: bool = False,
) -> None:
    """Merge the default branch of the repo. Also set it with "git config", if not already set."""
    base_branch = set_default_branch(c, remote)

    if update:
        tidy_up(c)

    # A rebase rewrites history, so the push that follows has to be forced (with lease).
    if rebase:
        git_verb, push_option = "rebase", "--force-with-lease"
    else:
        git_verb, push_option = "merge", ""

    run_command(c, f"git {git_verb}", f"origin/{base_branch}")
    if push:
        run_command(c, "git push", push_option)

rewrite(c, commit='--root', gpg=True, author=True)

Rewrite a range of commits, signing with GPG and setting the author.

https://git-scm.com/docs/git-commit https://git-scm.com/docs/git-rebase

Source code in src/conjuring/spells/git.py
@task(
    help={
        "commit": "Base commit to be used for the range (default: --root)",
        "gpg": "Sign the commit (default: True)",
        "author": "Set the current author (from 'git config') on the commit range",
    },
)
def rewrite(c: Context, commit: str = "--root", gpg: bool = True, author: bool = True) -> None:
    """Rewrite a range of commits, signing with GPG and setting the author.

    https://git-scm.com/docs/git-commit
    https://git-scm.com/docs/git-rebase
    """
    if gpg:
        sign_option = " --gpg-sign"
    else:
        sign_option = " --no-gpg-sign"

    author_option = ""
    if author:
        # The author comes from the Git config; both values are read even on a dry run.
        name = run_stdout(c, "git config user.name", dry=False)
        email = run_stdout(c, "git config user.email", dry=False)
        author_option = f' --author "{name} <{email}>"'

    # Save hash, committer date, author date and subject of the commits to a temp file...
    save_hashlist = f'git log --format="%H %cI %aI %s" {commit} > $TMPDIR/rebase_sign_hashlist'
    c.run(save_hashlist)

    # ...then amend each commit of the rebase, setting the committer date to the third field
    # (the author date) of the line that matches the commit being rewritten.
    rebase_command = (
        "git rebase --committer-date-is-author-date --exec 'GIT_COMMITTER_DATE="
        '$(fgrep -m 1 "$(git log -1 --format="%aI %s" $GIT_COMMIT)" $TMPDIR/rebase_sign_hashlist'
        f' | cut -d" " -f3) git commit --amend --no-edit -n{author_option}{sign_option}\' -i {commit}'
    )
    c.run(rebase_command)

    history(c, dates=True)
    for note in (
        None,
        "NOTE: If commits were modified during the rebase above, their committer date will be the current date",
        "Rebase again with this command, without changing any commit, and all dates should be green",
    ):
        if note is None:
            typer.echo()
        else:
            typer.echo(note)

set_default_branch(c, remote=False)

Set the default branch config on the repo, if not configured yet.

Source code in src/conjuring/spells/git.py
def set_default_branch(c: Context, remote: bool = False) -> str:
    """Return the default branch of the repo, asking for it and saving it when not configured.

    The value is read from the ``git-extras.default-branch`` config. When it is empty,
    a branch is chosen with fzf (remote branches are listed too when ``remote`` is set)
    and stored in both ``git-extras.default-branch`` and ``init.defaultBranch``.
    """
    config_command = "git config git-extras.default-branch"
    configured_branch = run_stdout(c, config_command, warn=True, dry=False)
    if configured_branch:
        return configured_branch

    all_option = "--all" if remote else ""
    chosen_branch = run_with_fzf(
        c,
        "git branch --list",
        all_option,
        "| cut -b 3- | grep -v HEAD | sed -E 's#remotes/[^/]+/##g' | sort -u",
    )
    # The same "git config" command writes the value when one is appended to it.
    run_command(c, config_command, chosen_branch)
    run_command(c, "git config init.defaultBranch", chosen_branch)
    run_command(c, "git config --list | rg default.*branch")
    return chosen_branch

switch_url_to(c, remote='origin', https=False)

Set an SSH or HTTPS URL for a remote.

Source code in src/conjuring/spells/git.py
@task
def switch_url_to(c: Context, remote: str = "origin", https: bool = False) -> None:
    """Set an SSH or HTTPS URL for a remote."""
    # Each direction needs its own pattern to capture host and repo from the current URL,
    # its own way to join them back, and its own URL prefix.
    if https:
        pattern = r"'git@(.+\.com):(.+/.+)\.git\s'"
        replacement = "'$1/$2'"
        url_prefix = "https://"
    else:
        pattern = r"'/([^/]+\.com)/([^/]+/.+)\s\('"
        replacement = "'$1:$2'"
        url_prefix = "git@"

    # warn=True: a failed match is reported below instead of aborting the task
    result = c.run(f"git remote -v | rg {remote} | head -1 | rg -o {pattern} -r {replacement}", warn=True, pty=False)
    found = result.stdout.strip()
    if found:
        new_url = f"{url_prefix}{found}"
        if not new_url.endswith(".git"):
            new_url += ".git"
        c.run(f"git remote set-url {remote} {new_url}")
    else:
        typer.echo(f"{Color.BOLD_RED.value}Match not found{Color.NONE.value}")

    c.run("git remote -v")

tidy_up(c)

Prune remotes, update all branches of the repo, delete merged/squashed branches.

Source code in src/conjuring/spells/git.py
@task
def tidy_up(c: Context) -> None:
    """Prune remotes, update all branches of the repo, delete merged/squashed branches."""
    for command in ("gitup .", "git delete-merged-branches"):
        c.run(command)

    # warn=True is needed; apparently, this command fails when there is no branch, and execution is stopped
    c.run("git delete-squashed-branches", warn=True)

    # The list of remotes is always read, even on a dry run.
    for remote_name in run_lines(c, "git remote", dry=False):
        c.run(f"git remote prune {remote_name}")

update_all(c, group='')

Run gita super to update and clean branches.

Source code in src/conjuring/spells/git.py
@task(klass=MagicTask)
def update_all(c: Context, group: str = "") -> None:
    """Run gita super to update and clean branches."""
    # The optional group name goes right after "gita super".
    gita_super = f"gita super {group}" if group else "gita super"
    c.run(f"{gita_super} up && {gita_super} delete-merged-branches")

watch(c)

Watch a build on GitHub Actions, then open a pull request or repo after the build is over.

Source code in src/conjuring/spells/git.py
@task()
def watch(c: Context) -> None:
    """Watch a build on GitHub Actions, then open a pull request or repo after the build is over."""
    branch = Git(c).current_branch()
    print_success(f"Current branch = {branch}")

    # warn=True on both commands: a failure does not abort the task.
    c.run("gh run watch", warn=True)
    pr_view = c.run(f"gh pr view {branch} --web", warn=True)

    # Without a pull request for the branch, open the repo page instead.
    if "no pull requests found for branch" in pr_view.stdout.strip():
        c.run("gh repo view --web")

conjuring.spells.jrnl

Query tags and entries with the jrnl note-taking tool.

edit_last(c, journal='')

Edit the last jrnl entry.

Source code in src/conjuring/spells/jrnl.py
@task
def edit_last(c: Context, journal: str = "") -> None:
    """Open the most recent jrnl entry in the editor."""
    # An empty journal name is dropped so jrnl falls back to its default journal.
    pieces = ("jrnl", journal, "-1 --edit")
    c.run(" ".join(piece for piece in pieces if piece))

query(c, n=0, contains='', edit=False, fancy=False, short=False, journal='')

Query jrnl entries.

Source code in src/conjuring/spells/jrnl.py
@task
def query(  # noqa: PLR0913
    c: Context,
    n: int = 0,
    contains: str = "",
    edit: bool = False,
    fancy: bool = False,
    short: bool = False,
    journal: str = "",
) -> None:
    """Search jrnl entries and print (or edit) the ones found.

    The output format is "pretty" unless ``fancy`` or ``short`` is set;
    ``fancy`` wins when both are given.
    """
    if fancy:
        format_ = "fancy"
    elif short:
        format_ = "short"
    else:
        format_ = "pretty"

    # Optional pieces collapse to an empty string and are filtered out before joining.
    pieces = [
        "jrnl",
        journal,
        f"-n {n}" if n else "",
        f"--format {format_}",
        f"-contains {contains}" if contains else "",
        "--edit" if edit else "",
    ]
    c.run(" ".join(piece for piece in pieces if piece))

tags(c, sort=False, rg='', journal='')

Query jrnl tags.

Source code in src/conjuring/spells/jrnl.py
@task
def tags(c: Context, sort: bool = False, rg: str = "", journal: str = "") -> None:
    """List jrnl tags, optionally sorted and/or filtered through rg."""
    pieces = [
        "jrnl",
        journal,
        "--tags",
        "| sort -u" if sort else "",
        f"| rg {rg}" if rg else "",
    ]
    # Empty pieces are the options that were not requested.
    c.run(" ".join(piece for piece in pieces if piece))

conjuring.spells.k8s

Kubernetes: get pods, show variables from config maps, validate score and more.

Kubectl dataclass

Kubectl commands.

Source code in src/conjuring/spells/k8s.py
@dataclass
class Kubectl:
    """Kubectl commands."""

    # Invoke context used to run every kubectl command.
    context: Context

    def choose_apps(self, partial_name: str | None = None, *, multi: bool = False) -> list[str]:
        """Select apps from Kubernetes deployments, using a partial app name and fzf.

        Use the current dir as the app name if no partial app name is provided.

        NOTE(review): the fzf result is only cast, not converted; a single
        selection (``multi=False``) appears to come back as a plain string.
        """
        if not partial_name:
            return [Path.cwd().name]

        return cast(
            list[str],
            run_with_fzf(
                self.context,
                """kubectl get deployments.apps -o jsonpath='{range .items[*]}{.metadata.name}{"\\n"}{end}'""",
                query=partial_name,
                multi=multi,
            ),
        )

    @staticmethod
    def _app_selector(apps: list[str]) -> str:
        """Return the ``-l`` label selector for one or more apps.

        A bare string is treated as a single app name; iterating it would
        otherwise build a selector out of its individual characters.
        """
        if isinstance(apps, str):
            apps = [apps]
        sorted_unique_apps = sorted(set(apps))
        if len(sorted_unique_apps) == 1:
            return f"-l app={sorted_unique_apps[0]}"
        selector = f" in ({', '.join(sorted_unique_apps)})"
        return f"-l 'app{selector}'"

    def cmd_get(self, resource: str, apps: list[str]) -> str:
        """Return the kubectl get command for one or more apps."""
        return f"kubectl get {resource} {self._app_selector(apps)}"

    def run_get(self, resource: str, apps: list[str]) -> Result:
        """Run the kubectl get command for one or more apps."""
        return run_command(self.context, self.cmd_get(resource, apps))

choose_apps(partial_name=None, *, multi=False)

Select apps from Kubernetes deployments, using a partial app name and fzf.

Use the current dir as the app name if no partial app name is provided.

Source code in src/conjuring/spells/k8s.py
def choose_apps(self, partial_name: str | None = None, *, multi: bool = False) -> list[str]:
    """Pick app names among the Kubernetes deployments with fzf.

    Without a partial name, the name of the current directory is returned
    as the only app and kubectl/fzf are not called at all.
    """
    if partial_name:
        selection = run_with_fzf(
            self.context,
            """kubectl get deployments.apps -o jsonpath='{range .items[*]}{.metadata.name}{"\\n"}{end}'""",
            query=partial_name,
            multi=multi,
        )
        return cast(list[str], selection)

    return [Path.cwd().name]

cmd_get(resource, apps)

Return the kubectl get command for one or more apps.

Source code in src/conjuring/spells/k8s.py
def cmd_get(self, resource: str, apps: list[str]) -> str:
    """Build the ``kubectl get`` command line for a resource, filtered by app label."""
    selector = self._app_selector(apps)
    return f"kubectl get {resource} {selector}"

run_get(resource, apps)

Run the kubectl get command for one or more apps.

Source code in src/conjuring/spells/k8s.py
def run_get(self, resource: str, apps: list[str]) -> Result:
    """Execute the ``kubectl get`` command built by ``cmd_get`` and return its result."""
    command = self.cmd_get(resource, apps)
    return run_command(self.context, command)

config_map(c, app, rg='')

Show the config map for an app.

Source code in src/conjuring/spells/k8s.py
@task(help={"rg": "Filter results with rg"})
def config_map(c: Context, app: str, rg: str = "") -> None:
    """Show the config map for an app.

    The config map names are read from the deployment of the chosen app,
    then the data of each config map is printed (optionally filtered with rg).
    """
    chosen = Kubectl(c).choose_apps(app)
    # choose_apps() can return a list (e.g. the current dir name); interpolating
    # the list itself would produce "deployment/['name']", so use its first item.
    chosen_app = chosen if isinstance(chosen, str) else chosen[0]
    run_command(
        c,
        f"kubectl get deployment/{chosen_app} -o json",
        "| jq -r .spec.template.spec.containers[].envFrom[].configMapRef.name",
        "| rg -v null | xargs -I % kubectl get configmap/% -o json | jq -r .data",
        f"| rg {rg}" if rg else "",
    )

exec_(c, app='')

Exec into the first pod found for the chosen app.

Source code in src/conjuring/spells/k8s.py
@task(name="exec")
def exec_(c: Context, app: str = "") -> None:
    """Exec into the first pod found for the chosen app.

    The pod is picked with fzf among the pods of the chosen app,
    then an interactive bash session is opened in it.
    """
    kubectl = Kubectl(c)
    chosen = kubectl.choose_apps(app)
    # A single fzf selection comes back as a plain string; wrap it so the label
    # selector is built from the app name and not from its individual characters.
    chosen_apps = [chosen] if isinstance(chosen, str) else list(chosen)
    chosen_pod = run_with_fzf(
        c,
        kubectl.cmd_get("pods", chosen_apps),
        "--no-headers",
        "-o custom-columns=NAME:.metadata.name",
    )
    run_command(c, f"kubectl exec -it {chosen_pod} -- bash")

pods(c, app='', replica_set=False)

Show the pods and replica sets for an app.

Source code in src/conjuring/spells/k8s.py
@task(
    help={
        "app": "Show the pods for an app; if not provided, the current directory name is used.",
        "replica_set": "Show the replica sets for an app",
    },
)
def pods(c: Context, app: str = "", replica_set: bool = False) -> None:
    """List the pods of the chosen app(s) and, on request, the replica sets that own them."""
    kubectl = Kubectl(c)
    selected_apps = kubectl.choose_apps(app, multi=True)
    kubectl.run_get("pods", selected_apps)
    if not replica_set:
        return

    # The owner reference of each pod is its replica set; "sort -u" removes repeated names.
    owner_jsonpath = """-o jsonpath='{range .items[*]}{.metadata.ownerReferences[0].name}{"\\n"}{end}'"""
    owners = run_lines(c, kubectl.cmd_get("pods", selected_apps), owner_jsonpath, "| sort -u")
    for owner in owners:
        run_command(c, f"kubectl get replicaset {owner}")

validate_score(c)

Validate and score files that were changed from the master branch.

Source code in src/conjuring/spells/k8s.py
@task()
def validate_score(c: Context) -> None:
    """Run kubeval and kubectl score on the files that differ from the master branch."""
    # TODO: handle branches named "main"
    changed_files = "git diff master.. --name-only"

    # Validation errors must not prevent the scoring step from running.
    c.run(f"{changed_files} | xargs kubeval", warn=True)
    c.run(f"{changed_files} | xargs kubectl score")

conjuring.spells.media

Media files: remove empty dirs, clean up picture dirs, download YouTube videos, transcribe audio.

CompareDirsAction

Bases: Enum

Actions to take when comparing two directories.

Source code in src/conjuring/spells/media.py
class CompareDirsAction(Enum):
    """Actions to take when comparing two directories.

    ``compare_dirs`` uses the value of the chosen action as the name of a
    sub-directory of its output dir (``output_dir / action.value / ...``).
    """

    # keep-sorted start
    DELETE_IDENTICAL = "identical_deleted"
    DIFF_FAILED = "diff_failed"
    # No value: compare_dirs only prints the file description and moves on to the next file.
    DO_NOTHING = None
    MOVE_IDENTICAL = "identical"
    MOVE_NOT_FOUND = "not_found"

cleanup(c, browse=False)

Cleanup pictures.

Source code in src/conjuring/spells/media.py
@task
def cleanup(c: Context, browse: bool = False) -> None:
    """Cleanup pictures.

    Steps, in order: delete .DS_Store/.nomedia files and empty dirs below the
    current dir, unhide Picasa originals, swap "originals" dirs with their
    parents, merge "_Copy" dirs found under the OneDrive pictures dir into
    their main dirs, and finally list the dirs that still contain "_Copy" files.

    Set ``browse`` to open each main dir before it is merged with its copy.
    """
    c.run(f"fd -H -0 -tf -i {DOT_DS_STORE} | xargs -0 rm -v")
    c.run(f"fd -H -0 -tf -i {DOT_NOMEDIA} | xargs -0 rm -v")
    c.run("find . -mindepth 1 -type d -empty -print -delete")

    # Unhide Picasa originals dir.
    # Paths are quoted (like in the merge step below) so dirs with spaces don't break "mv".
    for line in c.run("fd -H -t d .picasaoriginals", pty=False).stdout.splitlines():
        original_dir = Path(line)
        c.run(f"mv '{original_dir}' '{original_dir.parent}/Picasa_Originals'")

    # Keep the original dir as the main dir and rename parent dir to "_Copy"
    for line in c.run("fd -t d originals", pty=False).stdout.splitlines():
        original_dir = Path(line)
        parent_dir = original_dir.parent
        c.run(f"mv '{original_dir}' '{parent_dir}_Temp'")
        c.run(f"mv '{parent_dir}' '{parent_dir}_Copy'")
        c.run(f"mv '{parent_dir}_Temp' '{parent_dir}'")

    # Merge the copy dir with the main one
    for line in run_command(c, "fd -a -uu -t d --color never _copy", str(ONEDRIVE_PICTURES_DIR)).stdout.splitlines():
        copy_dir = Path(line)
        original_dir = Path(line.replace("_Copy", ""))
        if original_dir.exists():
            if browse:
                c.run(f"open '{original_dir}'")
            c.run(f"merge-dirs '{original_dir}' '{copy_dir}'")
        else:
            c.run(f"mv '{copy_dir}' '{original_dir}'")

    # List dirs with _Copy files
    copy_file_lines = run_command(
        c,
        "fd -H -t f --color never _copy",
        str(ONEDRIVE_PICTURES_DIR),
        hide=True,
    ).stdout.splitlines()
    copy_dirs = {Path(line).parent for line in copy_file_lines}

    for dir_ in sorted(copy_dirs):
        typer.echo(dir_)

compare_dirs(c, from_dir, to_dir, count=MAX_COUNT, size=MAX_SIZE, delete=False, move=False, wildcard_search=False)

Compare files in two directories. Stops when it reaches max count or size.

Source code in src/conjuring/spells/media.py
@task(
    help={
        "from_dir": "Source root directory to compare from",
        "to_dir": "Destination root directory to compare to",
        "count": f"Max number of files to compare. Default: {MAX_COUNT}",
        "size": f"Max size of files to compare. Default: {naturalsize(MAX_SIZE)}",
        "delete": "Delete identical files from the source dir",
        "move": "Move identical files from the source dir to the output dir",
        "wildcard_search": "If not found with the exact name,"
        " search the destination dir using the file name with wildcards",
    },
)
def compare_dirs(  # noqa: PLR0913
    c: Context,
    from_dir: str,
    to_dir: str,
    count: int = MAX_COUNT,
    size: int = MAX_SIZE,
    delete: bool = False,
    move: bool = False,
    wildcard_search: bool = False,
) -> None:
    """Compare files in two directories. Stops when it reaches max count or size.

    Files of ``from_dir`` are listed with fd (sorted, at most ``count`` results)
    and each one is checked against the file with the same relative path under
    ``to_dir``. The action decided for each file is executed with an output path
    under ``DOWNLOADS_DIR / "compare-dirs-output" / <slug of from_dir>``.

    Without ``delete`` or ``move`` (or when the context is in dry mode) the
    task runs in dry mode. Passing both ``delete`` and ``move`` is rejected.
    """
    if delete and move:
        print_error("Choose either --delete or --move, not both")
        return
    # Dry run unless an action was explicitly requested
    dry = not (delete or move) or c.config.run.dry

    # Use a slug to compare multiple dirs at the same time
    abs_from_dir = Path(from_dir).expanduser().absolute()
    # relative_to() raises ValueError when from_dir is not inside the home dir
    slug = str(abs_from_dir.relative_to(Path.home())).replace(os.sep, "-")
    output_dir = DOWNLOADS_DIR / "compare-dirs-output" / slug

    print_success("Output dir:", str(output_dir), "/ Stop file or dir:", str(STOP_FILE_OR_DIR))

    current_count = 0
    current_size = 0

    # The max count is enforced by fd itself; a count of 0 means no limit
    max_results = f"--max-results {count}" if count else ""
    lines = run_lines(c, "fd -t f -u", max_results, ".", str(abs_from_dir), "| sort", dry=False)

    with tqdm(lines) as pbar:
        for line in pbar:
            # Allows the user to interrupt a long run (see the "Stop file or dir" printed above)
            if check_stop_file():
                break

            source_file: Path = Path(line).absolute()
            if source_file.name == DOT_DS_STORE:
                continue

            current_count += 1
            file_size = source_file.stat().st_size
            current_size += file_size

            # Same relative path, but under the destination root
            partial_source_path = source_file.relative_to(abs_from_dir)
            destination_file: Path = to_dir / partial_source_path

            action, file_description = _determine_action(
                c,
                source_file,
                to_dir,
                destination_file,
                delete,
                move,
                wildcard_search,
            )

            pbar.set_postfix(count=current_count, size=naturalsize(file_size), total_size=naturalsize(current_size))

            # Check the file size after running the diff, so remote on-demand files are downloaded locally
            if size and current_size > size:
                print_error(
                    f"Current size ({naturalsize(current_size)})",
                    f"exceeded --size ({naturalsize(size)}), stopping",
                    dry=dry,
                )
                break

            if action == CompareDirsAction.DO_NOTHING:
                print_normal(file_description, dry=dry)
                continue

            # The action value is the name of the output sub-directory
            _execute(
                action,
                source_file,
                file_description,
                output=output_dir / action.value / partial_source_path,
                dry=dry,
            )

empty_dirs(c, dir_, delete=False, fd=True)

Remove some hidden files first, then remove empty dirs.

The ending slash is needed to search OneDrive, now that its behaviour changed in macOS Monterey.

Source code in src/conjuring/spells/media.py
@task(
    help={
        "dir": "Directory to clean up. Default: current dir",
        "fd": "Use https://github.com/sharkdp/fd instead of 'find'",
        "delete": "Delete the actual files (dotfiles are always deleted). Default: False",
    },
    iterable=["dir_"],
)
def empty_dirs(c: Context, dir_: list[str | Path], delete: bool = False, fd: bool = True) -> None:
    """Delete .DS_Store/.nomedia files, then print (and optionally delete) empty dirs.

    Every directory is passed with an ending slash, which is needed to search
    OneDrive now that its behaviour changed in macOS Monterey.
    """
    requested = dir_ or [Path.cwd()]
    unique_dirs = list({str(Path(item).expanduser().absolute()) for item in requested})
    *leading_dirs, last_dir = unique_dirs

    remove_each = "xargs -0 -n 1 rm -v"
    all_dirs_with_slash = " ".join(f"{one_dir}/" for one_dir in unique_dirs)
    for hidden_file in (DOT_DS_STORE, DOT_NOMEDIA):
        if not fd:
            # "find" is called once per directory
            for one_dir in unique_dirs:
                c.run(f"find {one_dir}/ -type f -iname {hidden_file} -print0 | {remove_each}")
            continue
        # "fd" searches all directories in a single call
        c.run(f"fd -uu -0 -tf -i {hidden_file} {all_dirs_with_slash} | {remove_each}")

    # All dirs but the last are given to "find" with -f; the last one is a plain argument
    f_option = " ".join(f"-f {one_dir}/" for one_dir in leading_dirs)
    if delete:
        run_command(c, "find", f_option, f"{last_dir}/ -mindepth 1 -type d -empty -print", "-delete")
    else:
        run_command(c, "find", f_option, f"{last_dir}/ -mindepth 1 -type d -empty -print", "")
        print_warning("Run with --delete to actually delete the files", dry=True)

invidious(c)

Parse Invidious instances to be used in the Chrome plugin random instance pool.

"Invidious random instance pool (comma-separated)" text field config from Privacy Redirect.

Source code in src/conjuring/spells/media.py
@task
def invidious(c: Context) -> None:
    """Copy a comma-separated list of Invidious instances to the clipboard.

    The list is meant for the "Invidious random instance pool (comma-separated)"
    text field config from
    [Privacy Redirect](https://chromewebstore.google.com/detail/privacy-redirect/pmcmeagblkinmogikoikkdjiligflglb).
    Instances ending in ".onion" or ".i2p" are left out.
    """
    hidden_suffixes = (".onion", ".i2p")
    uris = run_lines(c, "curl -s https://api.invidious.io/instances.json | jq -rs '.[].[][1].uri'")
    filtered = ",".join(uri for uri in uris if not uri.endswith(hidden_suffixes))
    run_command(c, f"echo {filtered} | pbcopy")

slideshow(c, start_at='')

Show pictures in the current dir with feh.

Source code in src/conjuring/spells/media.py
@task
def slideshow(c: Context, start_at: str = "") -> None:
    """Run a feh slideshow with the pictures of the current dir, optionally starting at one of them."""
    if start_at:
        start_at_option = f"--start-at {start_at}"
    else:
        start_at_option = ""
    run_command(c, "feh -r -. -g 1790x1070 -B black --caption-path .", start_at_option)

unzip_tree(c, dir_, count=1, delete=False)

Unzip .tar.gz files in a directory tree.

Source code in src/conjuring/spells/media.py
@task(
    help={
        "dir": "Root directory to unzip. Default: current dir",
        "count": "Max number of files to unzip. Default: 1",
        "delete": "Delete the .tar.gz file after unzipping with success. Default: False",
    },
    iterable=["dir_"],
)
def unzip_tree(c: Context, dir_: list[str | Path], count: int = 1, delete: bool = False) -> None:
    """Extract the .tar.gz files found in a directory tree, each one into its own directory."""
    for root in dir_ or [Path.cwd()]:
        archives = iter_path_with_progress(
            c,
            "-t f .tar.gz",
            str(root),
            "| sort --ignore-case",
            max_count=count,
        )
        for archive in archives:
            extraction = run_command(c, f"gtar -xzf '{archive}' -C '{archive.parent}'")
            # Only remove the archive when it was extracted with success
            if delete and extraction.ok:
                run_command(c, f"rm '{archive}'")

whisper(c, dir_)

Transcribe multiple audio files that haven't been transcribed yet, using whisper.

Source code in src/conjuring/spells/media.py
@task(help={"dir_": "Directory with audios to transcribe"})
def whisper(c: Context, dir_: str | Path) -> None:
    """Transcribe the audio files that don't have a transcript yet, using whisper.

    Audio files that already have a .txt transcript next to them are not
    transcribed again; their transcript is opened instead.
    """
    base_dir = Path(dir_).expanduser()
    # Collect every audio file before running anything
    audios: list[Path] = [audio for extension in AUDIO_EXTENSIONS for audio in base_dir.glob(f"*.{extension}")]
    for audio in audios:
        transcript = audio.with_suffix(".txt")
        if transcript.exists():
            c.run(f"open '{transcript}'")
        else:
            c.run(f"whisper --language pt -f txt '{audio}' --output_dir '{audio.parent}'")

youtube_dl(c, url, min_height=360, download_archive_path='')

Download video URLs, try different low-res formats until it finds one.

Source code in src/conjuring/spells/media.py
@task
def youtube_dl(c: Context, url: str, min_height: int = 360, download_archive_path: str = "") -> None:
    """Download a video URL, going through low-res formats until one of them works.

    The download archive path falls back to the YOUTUBE_DL_DOWNLOAD_ARCHIVE_PATH
    environment variable when it is not provided.
    """
    archive_path = download_archive_path or os.environ.get("YOUTUBE_DL_DOWNLOAD_ARCHIVE_PATH", "")
    if archive_path:
        archive_option = f"--download-archive {archive_path!r}"
    else:
        archive_option = ""

    # Heights to try, lowest first; the final 0 means "no format restriction"
    heights = [candidate for candidate in (240, 360, 480) if candidate >= min_height]
    heights.append(0)

    for height in heights:
        # https://github.com/ytdl-org/youtube-dl#format-selection-examples
        # Download best format available but no better than the chosen height
        fmt = ""
        if height:
            fmt = f"-f 'bestvideo[height<={height}]+bestaudio/best[height<={height}]'"

        attempt = run_command(
            c,
            "youtube-dl --ignore-errors --restrict-filenames",
            archive_option,
            fmt,
            url,
            warn=True,
        )
        # Stop on success, or when trying other formats would not help
        if attempt.ok or "Unsupported URL:" in attempt.stdout:
            break

zip_tree(c, dir_, count=1, depth=5, delete=False)

Zip files in a directory tree, creating a .tar.gz file.

Source code in src/conjuring/spells/media.py
@task(
    help={
        "dir": "Root directory to zip. Default: current dir",
        "count": "Max number of sub dirs to zip. Default: 1",
        "delete": "Delete the directory after zipping with success. Default: False",
    },
    iterable=["dir_"],
)
def zip_tree(c: Context, dir_: list[str | Path], count: int = 1, depth: int = 5, delete: bool = False) -> None:
    """Create one .tar.gz file for each sub directory (with files) of a directory tree."""
    for raw_dir in dir_ or [Path.cwd()]:
        root_dir = Path(raw_dir)
        candidates = iter_path_with_progress(
            c,
            "-t f --exclude '*.tar.gz' .",
            str(raw_dir),
            "--exec echo {//} | sort --unique --ignore-case",
            max_count=count,
            reverse_depth=depth,
        )
        for candidate in candidates:
            # The root dir itself is never zipped, only the dirs below it
            if candidate == root_dir:
                continue

            archive = unique_file_name(candidate.with_suffix(".tar.gz"))
            remove_option = "--remove-files" if delete else ""
            # The archive is created next to the directory being zipped
            with c.cd(candidate.parent):
                run_command(
                    c,
                    "gtar",
                    remove_option,
                    "--exclude='*.tar.gz'",
                    f'-czf "{archive.name}" -C . "./{candidate.name}"',
                    warn=True,
                )

conjuring.spells.mkdocs

MkDocs spells: install, build, deploy to GitHub, serve locally.

browse(c)

Open the static HTML docs website on your browser.

Source code in src/conjuring/spells/mkdocs.py
@task
def browse(c: Context) -> None:
    """Open the locally served docs website (port 8000) on your browser."""
    local_docs_url = "http://127.0.0.1:8000/"
    c.run(f"open {local_docs_url}")

build(c)

Build docs.

Source code in src/conjuring/spells/mkdocs.py
@task
def build(c: Context) -> None:
    """Build the docs into the "site" directory, in strict and verbose mode."""
    options = "--strict --verbose --site-dir site"
    c.run(f"mkdocs build {options}")

deploy(c)

Deploy docs to GitHub pages.

Source code in src/conjuring/spells/mkdocs.py
@task(pre=[build])
def deploy(c: Context) -> None:
    """Publish the docs to GitHub pages; the docs are built first."""
    command = "mkdocs gh-deploy"
    c.run(command)

install(c, force=False)

Install MkDocs globally with the Material plugin. Upgrade if it already exists.

Source code in src/conjuring/spells/mkdocs.py
@task(help={"force": "Force re-installation of MkDocs."})
def install(c: Context, force: bool = False) -> None:
    """Install MkDocs globally with pipx, then inject its extensions and the local project."""
    # NOTE(review): with force, "pipx upgrade" only runs when "pipx install" fails -
    # confirm this matches the "force re-installation" wording of the help text.
    install_command = "pipx install mkdocs"
    if force:
        install_command += " || pipx upgrade mkdocs"
    c.run(install_command, warn=True)

    for extension_name in EXTENSIONS:
        c.run(f"pipx inject mkdocs {extension_name}")

    # Inject the local project into the global MkDocs installation.
    if not has_pyproject_toml():
        return
    c.run("pipx inject mkdocs -e .")

serve(c)

Start the live-reloading server to test the docs locally.

Source code in src/conjuring/spells/mkdocs.py
@task(pre=[build])
def serve(c: Context) -> None:
    """Serve the docs locally with live reloading; the docs are built first."""
    command = "mkdocs serve"
    c.run(command)

uninstall(c)

Uninstall MkDocs globally.

Source code in src/conjuring/spells/mkdocs.py
@task
def uninstall(c: Context) -> None:
    """Remove the global MkDocs installation made with pipx."""
    command = "pipx uninstall mkdocs"
    c.run(command)

conjuring.spells.mr

myrepos repository management tool: grep text in repos.

MyRepos dataclass

Find and interact with myrepos config files.

Source code in src/conjuring/spells/mr.py
@dataclass
class MyRepos:
    """Locate myrepos config files and choose among them."""

    # Invoke context used to run commands.
    context: Context

    def find_configs(self, partial_name: str, echo: bool = False) -> list[Path]:
        """Return the config files found in the current dir or in the closest parent dir.

        With an empty partial name, only the default config file is returned.
        Otherwise the files are chosen with fzf, using the partial name as the query.

        Raises:
            FileNotFoundError: when no dir has a matching config file.
        """
        needle = partial_name.lower()
        pattern = f"{MRCONFIG_FILE}*{needle}*" if needle else MRCONFIG_FILE
        base_dir = self._find_dir_with_mrconfigs(pattern)
        if base_dir is None:
            msg = f"No {MRCONFIG_FILE}* file was found in {Path.cwd()} or its parents"
            raise FileNotFoundError(msg)

        if not needle:
            return [base_dir / MRCONFIG_FILE]

        # Let fzf choose among all the config files of the dir
        with self.context.cd(str(base_dir)):
            picked = run_with_fzf(
                self.context,
                "ls -1",
                f"{MRCONFIG_FILE}*",
                query=needle,
                multi=True,
                echo=echo,
                hide=not echo,
            )
        return sorted({base_dir / name for name in picked})

    @staticmethod
    def _find_dir_with_mrconfigs(glob_pattern: str) -> Path | None:
        """Return the first dir (current dir, then its parents) with a file matching the pattern."""
        for candidate in chain([Path.cwd()], Path.cwd().parents):
            # One matching file is enough; fzf will handle the rest
            if any(True for _ in candidate.glob(glob_pattern)):
                return candidate
        return None

find_configs(partial_name, echo=False)

Find config files in the current dir or dirs above.

Source code in src/conjuring/spells/mr.py
def find_configs(self, partial_name: str, echo: bool = False) -> list[Path]:
    """Return the config files found in the current dir or in the closest parent dir.

    With an empty partial name, only the default config file is returned.
    Otherwise the files are chosen with fzf, using the partial name as the query.

    Raises:
        FileNotFoundError: when no dir has a matching config file.
    """
    needle = partial_name.lower()
    pattern = f"{MRCONFIG_FILE}*{needle}*" if needle else MRCONFIG_FILE
    base_dir = self._find_dir_with_mrconfigs(pattern)
    if not base_dir:
        msg = f"No {MRCONFIG_FILE}* file was found in {Path.cwd()} or its parents"
        raise FileNotFoundError(msg)

    if not needle:
        return [base_dir / MRCONFIG_FILE]

    # Let fzf choose among all the config files of the dir
    with self.context.cd(str(base_dir)):
        picked = run_with_fzf(
            self.context,
            "ls -1",
            f"{MRCONFIG_FILE}*",
            query=needle,
            multi=True,
            echo=echo,
            hide=not echo,
        )
    return sorted({base_dir / name for name in picked})

grep(c, search_text, config='', echo=False)

Grep mr repositories with a search text and print the directories in which the text was found.

Needs mr to be preconfigured with files starting with the ".mrconfig" prefix.

Source code in src/conjuring/spells/mr.py
@task(
    help={
        "config": f"Specific config file to use. Use fzf if multiple are found. Default: {MRCONFIG_FILE}",
        "echo": "Echo the commands being executed, for debugging purposes. Default: False",
    },
)
def grep(c: Context, search_text: str, config: str = "", echo: bool = False) -> None:
    """Search a text in mr repositories and print the directories in which it was found.

    Needs mr to be preconfigured with files starting with the ".mrconfig" prefix.
    """
    config_files = MyRepos(c).find_configs(config, echo=echo)
    for config_file in config_files:
        # For some reason, using run_command() prints a "\r" char at the end of each line;
        # the solution is to get output as a string and use print().
        found_dirs = run_stdout(
            c,
            "mr -c",
            str(config_file),
            "-m grep",
            search_text,
            "| rg --color=never 'mr grep: (.+)$' --replace '$1'",
            echo=echo,
        )
        typer.echo(found_dirs)

conjuring.spells.onedrive

OneDrive: list files with conflicts.

conflicts(c, dir_)

List files with conflicts.

Source code in src/conjuring/spells/onedrive.py
@task(
    help={"dir": "Directory; can be used multiple times. Default: current dir"},
    iterable=["dir_"],
)
def conflicts(c: Context, dir_: list[str | Path]) -> None:
    """List files with conflicts.

    A conflict copy is a file whose stem ends with "-<short hostname>".
    Each copy is compared to the file without that suffix, and the diff is printed.
    """
    if not dir_:
        dir_ = [Path.cwd()]

    hostname = run_stdout(c, "hostname -s").strip()
    suffix = f"-{hostname}"
    unique_dirs = {str(Path(d).expanduser().absolute()) for d in dir_}
    for one_dir in unique_dirs:
        # Paths are quoted because file and directory names may contain spaces
        for line in run_lines(c, f"fd -t f {hostname} '{one_dir}' | sort"):
            duplicated = Path(line)
            stem = duplicated.stem
            # fd matches the hostname anywhere in the name; only stems that really end
            # with the suffix (and have something before it) are conflict copies.
            if not stem.endswith(suffix) or len(stem) == len(suffix):
                continue
            original = duplicated.with_stem(stem.removesuffix(suffix))
            typer.echo(run_stdout(c, f"diff '{duplicated}' '{original}'", warn=True).strip())

force_downloads(c, dir_)

Force downloads of remote OneDrive files by reading them with a dummy "diff".

Source code in src/conjuring/spells/onedrive.py
@task(
    help={"dir": "Directory; can be used multiple times. Default: current dir"},
    iterable=["dir_"],
)
def force_downloads(c: Context, dir_: list[str | Path]) -> None:
    """Force downloads of remote OneDrive files by reading them with a dummy "diff".

    Every file is compared to an empty temporary file created in the home dir;
    the temporary file is removed at the end, even when a command fails.
    """
    if not dir_:
        dir_ = [Path.cwd()]
    temp_file = Path.home() / "delete-me"
    temp_file.touch(exist_ok=True)
    try:
        for one_dir in dir_:
            # diff exits with a non-zero code for files that differ from the empty file
            # (i.e. almost all of them), so failures are expected: warn instead of aborting.
            c.run(f"fd -t f -0 . '{one_dir}' | sort -z | xargs -0 -n 1 diff '{temp_file}'", warn=True)
    finally:
        temp_file.unlink(missing_ok=True)

conjuring.spells.paperless

Paperless: maintenance, renamer, sanity, delete duplicates.

Document dataclass

A paperless document.

Source code in src/conjuring/spells/paperless.py
@dataclass
class Document:
    """A paperless document."""

    document_id: int
    title: str
    # Not a constructor argument (init=False); every instance starts with its own empty list.
    errors: list = field(default_factory=list, init=False)

OrphanFile dataclass

A paperless orphan file.

Source code in src/conjuring/spells/paperless.py
@dataclass
class OrphanFile:
    """A paperless orphan file."""

    source: Path
    destination: Path

    def __lt__(self, other: OrphanFile) -> bool:
        # Orphan files are ordered by their source path only.
        return self.source < other.source

delete_failed_duplicates(c, max_delete=100)

Delete records that are marked as duplicates but cannot be downloaded, so the PDF files can be reimported.

Source code in src/conjuring/spells/paperless.py
@task
def delete_failed_duplicates(c: Context, max_delete: int = 100) -> None:
    """Delete records that are marked as duplicates but cannot be downloaded, so the PDF files can be reimported.

    Failed tasks are read from the Paperless API. For each failure that mentions
    a duplicate, the document is deleted only when its download returns "not found"
    while the document record itself still exists.

    Raises:
        SystemExit: after ``max_delete`` documents were deleted.
    """
    delete_count = 0
    # The context manager closes the HTTP session on every exit path, including SystemExit
    with requests.Session() as session:
        session.headers.update({"authorization": f"token {paperless_token()}"})

        req_tasks = session.get(f"{paperless_url()}/api/tasks/?format=json")
        for obj in req_tasks.json():
            if obj["status"] != "FAILURE":
                continue

            raw_line = obj["result"]
            if DUPLICATE_OF not in raw_line:
                print_error(f"Unknown error: {raw_line}")
                continue

            clean_line = raw_line.replace(" Not consuming ", "").replace(DUPLICATE_OF, "")
            parts = clean_line.split(":", maxsplit=2)
            if len(parts) != 3:  # noqa: PLR2004
                # A line without the expected two colons cannot be parsed; skip it instead of crashing
                print_error("Line doesn't have the expected format", clean_line)
                continue

            first, second, duplicate_with_id = parts
            if first != second:
                # Only reported; the line is still processed
                print_error(f"Files are different: {first=} / {second=}")
            match = REGEX_TITLE_WITH_ID.match(duplicate_with_id)
            if not match:
                print_error(f"Line doesn't match regex {duplicate_with_id=}", clean_line)
                continue

            data = match.groupdict()
            document_id = data["id"]

            api_document_url = f"{paperless_url()}/api/documents/{document_id}/"
            document_url = f"{paperless_url()}/documents/{document_id}"
            url = f"{api_document_url}download/"
            req_download = session.head(url)
            if req_download.status_code != HTTPStatus.NOT_FOUND:
                print_success(document_url, f"Document exists {req_download.status_code=}", clean_line)
                continue

            req_document = session.head(api_document_url)
            if req_document.status_code == HTTPStatus.NOT_FOUND:
                print_warning(document_url, "Document already deleted before", clean_line)
                continue

            req_delete = session.delete(api_document_url)
            if req_delete.status_code == HTTPStatus.NO_CONTENT:
                # Count first, so the first deleted document is reported as #1
                delete_count += 1
                print_success(document_url, f"Document deleted #{delete_count}", clean_line)
                if delete_count >= max_delete:
                    raise SystemExit
                continue

            # Unexpected status on delete: open the document for manual inspection
            print_error(document_url, clean_line, f"Something wrong: {req_delete.status_code=}")
            c.run(f"open {document_url}")

maintenance(c, reindex=True, optimize=True, thumbnails=True)

Reindex all docs and optionally optimize them.

https://docs.paperless-ngx.com/administration/#index https://docs.paperless-ngx.com/administration/#thumbnails

Source code in src/conjuring/spells/paperless.py
@task
def maintenance(c: Context, reindex: bool = True, optimize: bool = True, thumbnails: bool = True) -> None:
    """Run the Paperless maintenance commands: reindex, optimize the index, regenerate thumbnails.

    Each step runs only when its flag is set.

    https://docs.paperless-ngx.com/administration/#index
    https://docs.paperless-ngx.com/administration/#thumbnails
    """
    steps = (
        (reindex, "document_index reindex"),
        (optimize, "document_index optimize"),
        (thumbnails, "document_thumbnails"),
    )
    for enabled, subcommand in steps:
        if enabled:
            c.run(f"{paperless_cmd()} {subcommand}")

paperless_cmd()

Command to run Paperless with Docker.

Source code in src/conjuring/spells/paperless.py
def paperless_cmd() -> str:
    """Command prefix to run Paperless with Docker.

    The compose file is looked up as ``PAPERLESS_COMPOSE_YAML`` via ``lazy_env_variable``;
    the returned string ends in ``exec webserver`` so callers append the command to run.
    """
    compose_yaml = lazy_env_variable(
        "PAPERLESS_COMPOSE_YAML",
        "path to the Paperless Docker compose YAML file",
    )
    return f"docker compose -f {compose_yaml} exec webserver"

paperless_documents_dir()

Directory where Paperless stores documents.

Source code in src/conjuring/spells/paperless.py
def paperless_documents_dir() -> Path:
    """Directory where Paperless stores documents, with ``~`` expanded.

    The raw value is looked up as ``PAPERLESS_MEDIA_DOCUMENTS_DIR`` via ``lazy_env_variable``.
    """
    raw_dir = lazy_env_variable(
        "PAPERLESS_MEDIA_DOCUMENTS_DIR",
        "directory where Paperless stores documents",
    )
    configured_dir = Path(raw_dir)
    return configured_dir.expanduser()

paperless_token()

Auth token to access Paperless API.

Source code in src/conjuring/spells/paperless.py
def paperless_token() -> str:
    """Auth token to access the Paperless API.

    Looked up as ``PAPERLESS_TOKEN`` via ``lazy_env_variable``.
    """
    token = lazy_env_variable("PAPERLESS_TOKEN", "auth token to access Paperless API")
    return token

paperless_url()

URL where Paperless is running.

Source code in src/conjuring/spells/paperless.py
def paperless_url() -> str:
    """Base URL where Paperless is running.

    Looked up as ``PAPERLESS_URL`` via ``lazy_env_variable``.
    """
    base_url = lazy_env_variable("PAPERLESS_URL", "URL where Paperless is running")
    return base_url

rename(c)

Rename files.

https://docs.paperless-ngx.com/administration/#renamer

Source code in src/conjuring/spells/paperless.py
@task
def rename(c: Context) -> None:
    """Rename files.

    Runs the ``document_renamer`` management command through the Paperless command prefix.

    https://docs.paperless-ngx.com/administration/#renamer
    """
    renamer_command = f"{paperless_cmd()} document_renamer"
    c.run(renamer_command)

sanity(c, hide=True, orphans=False, thumbnails=False, documents=False, unknown=True, together=False, fix=False, move=False)

Sanity checker. Optionally fix orphan files (copies or moves them to the download dir).

https://docs.paperless-ngx.com/administration/#sanity-checker

Source code in src/conjuring/spells/paperless.py
@task(
    help={
        "hide": "Hide progress bar of sanity command",
        "orphans": "Show orphan files",
        "thumbnails": "Show thumbnail files",
        "documents": "Show documents with issues",
        "unknown": "Show unknown lines from the log",
        "together": f"Keep {ORPHAN_ORIGINALS} and {ORPHAN_ARCHIVE} in the same output directory",
        "fix": "Fix broken files by copying them to the downloads dir",
        "move": "Move files instead of copying",
    },
)
def sanity(  # noqa: PLR0913
    c: Context,
    hide: bool = True,
    orphans: bool = False,
    thumbnails: bool = False,
    documents: bool = False,
    unknown: bool = True,
    together: bool = False,
    fix: bool = False,
    move: bool = False,
) -> None:
    """Sanity checker. Optionally fix orphan files (copies or moves them to the download dir).

    Runs the Paperless ``document_sanity_checker`` command and classifies each output line as
    a progress bar line, an orphaned file, the header of a document with issues, an error that
    belongs to the current document, or an unknown line. Each group is then handed to
    ``_handle_items`` together with the flag that controls whether it is shown.

    Raises ``RuntimeError`` when ``fix`` is set and the documents directory doesn't exist.

    https://docs.paperless-ngx.com/administration/#sanity-checker
    """
    # Fail fast if the env var is not set
    documents_dir = paperless_documents_dir() if fix else None
    if documents_dir and not documents_dir.exists():
        msg = f"Documents directory doesn't exist: {documents_dir}"
        raise RuntimeError(msg)

    # TODO: fix(paperless): implement dry-run mode with dry=False and actually avoid files being copied/moved
    lines = run_lines(c, paperless_cmd(), "document_sanity_checker", hide=hide, warn=True, pty=True)

    progress_bar: list[str] = []
    original_or_archive_files: dict[str, list[OrphanFile]] = defaultdict(list)
    matched_files: list[OrphanFile] = []
    unmatched_files: list[OrphanFile] = []
    orphan_files: list[str] = []
    thumbnail_files: list[str] = []
    current_document: Document | None = None
    documents_with_issues: list[Document] = []
    unknown_lines = []
    for line in lines:
        if "it/s]" in line:
            progress_bar.append(line)
            continue

        if (msg := "Orphaned file in media dir: ") in line:
            partial_path = Path(line.split(msg)[1].replace(USR_SRC_DOCUMENTS, ""))
            _process_orphans(partial_path, documents_dir, original_or_archive_files, orphan_files, thumbnail_files)
            continue

        if (msg := "Detected following issue(s) with document #") in line:
            # Append the previous document
            if current_document is not None:
                documents_with_issues.append(current_document)

            # Split once, so a title that itself contains ", titled " doesn't break the unpacking
            document_id, title = line.split(msg)[1].split(", titled ", 1)
            current_document = Document(int(document_id), title)
            continue

        if current_document is not None:
            # partition() never raises: a line without the logger marker is reported as unknown
            # instead of aborting the whole task with a ValueError
            _, marker, error = line.partition("[paperless.sanity_checker] ")
            if marker:
                current_document.errors.append(error)
                continue

        unknown_lines.append(line)

    # The loop only stores a document when the next one starts, so the last one is stored here
    if current_document is not None:
        documents_with_issues.append(current_document)

    _split_matched_unmatched(original_or_archive_files, matched_files, unmatched_files, together)

    _handle_items(fix, move, orphans, "Matched files", matched_files)
    _handle_items(fix, move, orphans, "Unmatched files", unmatched_files)
    _handle_items(False, move, orphans, "Orphan files", orphan_files)
    # TODO: feat(paperless): move thumbnail files to downloads dir
    _handle_items(fix, move, thumbnails, "Thumbnail files", thumbnail_files)
    _handle_items(False, move, documents, "Documents with issues", documents_with_issues)
    _handle_items(False, move, unknown, "Unknown lines", unknown_lines)

conjuring.spells.pre_commit

pre-commit: install, uninstall, run/autoupdate selected hooks.

auto(c, repo='', bleed=False)

Autoupdate a Git hook or all hooks with the latest tag. Needs fzf and yq.

Source code in src/conjuring/spells/pre_commit.py
@task()
def auto(c: Context, repo: str = "", bleed: bool = False) -> None:
    """Autoupdate a Git hook or all hooks with the latest tag. Needs fzf and yq.

    ``repo`` is a partial repo name used as the fzf query over the repos listed in
    ``.pre-commit-config.yaml``; ``bleed`` adds ``--bleeding-edge``.
    """
    repo_option = ""
    if repo:
        selected_repo = run_with_fzf(c, "yq e '.repos[].repo' .pre-commit-config.yaml", query=repo, dry=False)
        repo_option = f"--repo {selected_repo}"

    bleeding_edge_option = "--bleeding-edge" if bleed else ""
    run_command(c, "pre-commit autoupdate", bleeding_edge_option, repo_option)

get_hook_types(commit_msg, desired_hooks=None)

Prepare a list of hook types to install/uninstall.

Source code in src/conjuring/spells/pre_commit.py
def get_hook_types(commit_msg: bool, desired_hooks: list[str] | None = None) -> str:
    """Prepare a list of hook types to install/uninstall."""
    hooks = ["pre-commit"]
    if desired_hooks:
        hooks.extend(desired_hooks)
    if commit_msg:
        hooks.append("commit-msg")
        hooks.append("prepare-commit-msg")
    return " ".join([f"--hook-type {h}" for h in hooks])

install(c, before, gc=False, commit_msg=True)

Pre-commit install hooks.

Source code in src/conjuring/spells/pre_commit.py
@task(
    help={
        "gc": "Run the garbage collector to remove unused venvs",
        "commit_msg": "Install commit message hooks",
        "before": "Config files to run before the current one.",
    },
    iterable=["before"],
)
def install(c: Context, before: list[str], gc: bool = False, commit_msg: bool = True) -> None:
    """Pre-commit install hooks.

    Optionally runs the garbage collector first, then installs the hook types returned by
    ``get_hook_types`` and, when ``before`` is given, patches the pre-commit configs with it.
    """
    if gc:
        _run_garbage_collector(c)

    hook_types = get_hook_types(commit_msg)
    c.run(f"pre-commit install {hook_types} --install-hooks")

    if not before:
        return
    _patch_pre_commit_configs(before)

run(c, hooks)

Pre-commit run all hooks or a specific one. Don't stop on failures. Needs fzf and yq.

Source code in src/conjuring/spells/pre_commit.py
@task(
    help={
        "hooks": "Comma-separated list of partial hook IDs (fzf will be used to match them)."
        " Use 'all', '.' or '-' to run all hooks.",
    },
)
def run(c: Context, hooks: str) -> None:
    """Pre-commit run all hooks or a specific one. Don't stop on failures. Needs fzf and yq.

    Each partial hook ID is resolved with fzf against the hook IDs in ``.pre-commit-config.yaml``,
    unless one of the special values is present, in which case a single run without a hook ID
    is made.
    """
    requested_hooks = hooks.split(",")
    run_everything = any(alias in requested_hooks for alias in ("all", ".", "-"))

    if run_everything:
        # An empty hook ID means no specific hook is passed to "pre-commit run"
        selected_hooks = [""]
    else:
        selected_hooks = []
        for partial_hook in requested_hooks:
            hook_id = run_with_fzf(
                c,
                "yq e '.repos[].hooks[].id' .pre-commit-config.yaml | sort -u",
                query=partial_hook,
                dry=False,
            )
            selected_hooks.append(hook_id)

    for hook_id in selected_hooks:
        # warn=True keeps going when a hook fails
        run_command(c, "pre-commit run --all-files", hook_id, warn=True)

uninstall(c, gc=False, commit_msg=True)

Pre-commit uninstall ALL hooks.

Source code in src/conjuring/spells/pre_commit.py
@task(help={"gc": "Run the garbage collector to remove unused venvs", "commit_msg": "Uninstall commit message hooks"})
def uninstall(c: Context, gc: bool = False, commit_msg: bool = True) -> None:
    """Pre-commit uninstall ALL hooks.

    Every file listed in ``.git/hooks`` whose name doesn't contain ``.sample`` is treated as an
    installed hook and passed to ``pre-commit uninstall`` as a hook type.
    """
    if gc:
        _run_garbage_collector(c)

    hooks_listing = run_stdout(c, "ls .git/hooks", dry=False)
    installed_hooks = []
    for hook in hooks_listing.splitlines():
        if ".sample" in hook:
            continue
        installed_hooks.append(hook)

    hook_types = get_hook_types(commit_msg, installed_hooks)
    c.run(f"pre-commit uninstall {hook_types}")

conjuring.spells.py

Python and Poetry.

Install venvs, run tests and coverage, install debug tools.

Poetry dataclass

Poetry-related tasks.

Source code in src/conjuring/spells/py.py
@dataclass()
class Poetry:
    """Poetry-related tasks."""

    # Invoke context used to run every shell command of this class
    context: Context

    def used_in_project(self, display_error: bool = True) -> bool:
        """Check if Poetry is being used.

        Counts the bytes of the ``tool.poetry`` lines grepped from the pyproject file;
        a count of zero means Poetry is not used. An error is printed in that case,
        unless ``display_error`` is false.
        """
        used = int(
            run_command(
                self.context,
                f"grep tool.poetry {PYPROJECT_TOML} 2>/dev/null | wc -c",
                hide=True,
                warn=True,
            ).stdout.strip(),
        )
        if not used and display_error:
            print_error("This task only works with Poetry projects (so far).")
        return bool(used)

    @staticmethod
    def parse_python_version(venv: str) -> str:
        """Extract the Python version from a venv name. For now, assuming we only have Poetry venvs.

        The version is whatever follows the LAST ``-py`` in the first space-separated word,
        so a project name that itself contains ``-py`` (e.g. ``my-pytools-AbCd-py3.11``) still works.

        Raises ``IndexError`` when the name has no ``-py`` suffix.
        """
        venv_name = venv.split(" ")[0]
        return venv_name.rsplit("-py", 1)[1]

    def remove_venv(self, python_version: str) -> Result:
        """Remove a Poetry venv."""
        return self.context.run(f"poetry env remove python{python_version}")

    def guess_python_version(self) -> str:
        """Guess Python version from pyproject.toml.

        Collects the values of the lines matching ``^python `` or ``python_version``, with
        trailing comments, ``^``, ``~``, quotes and spaces removed.

        Raises ``SystemExit`` (after printing an error) when no version or more than one
        distinct version is found.
        """
        # TODO: rewrite this hack and use a TOML package to read the values directly
        pyproject_lines = run_lines(
            self.context,
            f"rg --no-line-number -e '^python ' -e python_version {PYPROJECT_TOML}",
        )
        versions: set[str] = set()
        for line in pyproject_lines:
            # Split on the first "=" only, so a value like ">=3.9" is not cut in the middle;
            # matched lines without any "=" carry no value and are skipped
            _, separator, value_with_comment = line.partition("=")
            if not separator:
                continue
            value_only = value_with_comment.split("#")[0]
            clean_version = value_only.replace("^", "").replace("~", "").strip('" ')
            versions.add(clean_version)
        if not versions:
            # Without this check, next() below would leak a bare StopIteration
            print_error(f"No Python version found in {PYPROJECT_TOML}")
            raise SystemExit
        if len(versions) > 1:
            print_error(f"Multiple Python versions found in {PYPROJECT_TOML}: {versions=}")
            raise SystemExit
        return next(iter(versions))

    def use_venv(self, python_version: str) -> Result:
        """Use a Poetry venv. Only the major and minor parts of the version are passed to Poetry."""
        version_obj = parse(python_version)
        return self.context.run(f"poetry env use python{version_obj.major}.{version_obj.minor}")

guess_python_version()

Guess Python version from pyproject.toml.

Source code in src/conjuring/spells/py.py
def guess_python_version(self) -> str:
    """Guess Python version from pyproject.toml.

    Collects the values of the lines matching ``^python `` or ``python_version``, with
    trailing comments, ``^``, ``~``, quotes and spaces removed.

    Raises ``SystemExit`` (after printing an error) when no version or more than one
    distinct version is found.
    """
    # TODO: rewrite this hack and use a TOML package to read the values directly
    pyproject_lines = run_lines(
        self.context,
        f"rg --no-line-number -e '^python ' -e python_version {PYPROJECT_TOML}",
    )
    versions: set[str] = set()
    for line in pyproject_lines:
        # Split on the first "=" only, so a value like ">=3.9" is not cut in the middle;
        # matched lines without any "=" carry no value and are skipped
        _, separator, value_with_comment = line.partition("=")
        if not separator:
            continue
        value_only = value_with_comment.split("#")[0]
        clean_version = value_only.replace("^", "").replace("~", "").strip('" ')
        versions.add(clean_version)
    if not versions:
        # Without this check, next() below would leak a bare StopIteration
        print_error(f"No Python version found in {PYPROJECT_TOML}")
        raise SystemExit
    if len(versions) > 1:
        print_error(f"Multiple Python versions found in {PYPROJECT_TOML}: {versions=}")
        raise SystemExit
    return next(iter(versions))

parse_python_version(venv) staticmethod

For now, assuming we only have Poetry venvs.

Source code in src/conjuring/spells/py.py
@staticmethod
def parse_python_version(venv: str) -> str:
    """For now, assuming we only have Poetry venvs."""
    return venv.split(" ")[0].split("-py")[1]

remove_venv(python_version)

Remove a Poetry venv.

Source code in src/conjuring/spells/py.py
def remove_venv(self, python_version: str) -> Result:
    """Remove the Poetry venv of the given Python version and return the command result."""
    interpreter = f"python{python_version}"
    return self.context.run(f"poetry env remove {interpreter}")

use_venv(python_version)

Use a Poetry venv.

Source code in src/conjuring/spells/py.py
def use_venv(self, python_version: str) -> Result:
    """Use a Poetry venv.

    Only the major and minor parts of the parsed version are passed to ``poetry env use``.
    """
    parsed = parse(python_version)
    major_minor = f"{parsed.major}.{parsed.minor}"
    return self.context.run(f"poetry env use python{major_minor}")

used_in_project(display_error=True)

Check if Poetry is being used.

Source code in src/conjuring/spells/py.py
def used_in_project(self, display_error: bool = True) -> bool:
    """Check if Poetry is being used.

    Counts the bytes of the ``tool.poetry`` lines grepped from the pyproject file; zero bytes
    means Poetry is not used. An error is printed in that case, unless ``display_error`` is false.
    """
    grep_result = run_command(
        self.context,
        f"grep tool.poetry {PYPROJECT_TOML} 2>/dev/null | wc -c",
        hide=True,
        warn=True,
    )
    byte_count = int(grep_result.stdout.strip())
    if byte_count:
        return True

    if display_error:
        print_error("This task only works with Poetry projects (so far).")
    return False

PyEnv dataclass

pyenv-related tasks.

Source code in src/conjuring/spells/py.py
@dataclass
class PyEnv:
    """pyenv-related tasks."""

    # Invoke context used to run every pyenv command of this class
    context: Context

    def has_local(self) -> bool:
        """Check if a local Python version is set.

        Returns ``True`` only when ``pyenv local`` prints something and that output
        doesn't contain "no local version".
        """
        output = self.context.run("pyenv local", warn=True).stdout.strip()
        # bool() is needed: "output and ..." would return the empty string itself, not False
        return bool(output) and "no local version" not in output

    def set_local(self, python_version: str) -> Result:
        """Set the local pyenv version.

        Uses the last entry returned by ``list_versions`` for ``python_version``;
        raises ``IndexError`` when no installed version matches.
        """
        latest = self.list_versions(python_version)[-1]
        return self.context.run(f"pyenv local {latest}")

    def list_versions(self, python_version: str | None = None) -> list[str]:
        """List all installed Python versions, or only the ones matching the desired version.

        Matching is a plain prefix comparison against each line of ``pyenv versions --bare``.
        """
        all_versions = run_lines(self.context, "pyenv versions --bare")
        if not python_version:
            return all_versions

        return [version for version in all_versions if version.startswith(python_version)]

has_local()

Check if a local Python version is set.

Source code in src/conjuring/spells/py.py
def has_local(self) -> bool:
    """Check if a local Python version is set.

    Returns ``True`` only when ``pyenv local`` prints something and that output
    doesn't contain "no local version".
    """
    output = self.context.run("pyenv local", warn=True).stdout.strip()
    # bool() is needed: "output and ..." would return the empty string itself, not False
    return bool(output) and "no local version" not in output

list_versions(python_version=None)

List all installed Python versions, or only the ones matching the desired version.

Source code in src/conjuring/spells/py.py
def list_versions(self, python_version: str | None = None) -> list[str]:
    """List all installed Python versions, or only the ones matching the desired version.

    Matching is a plain prefix comparison against each line of ``pyenv versions --bare``.
    """
    installed = run_lines(self.context, "pyenv versions --bare")
    if python_version:
        return [candidate for candidate in installed if candidate.startswith(python_version)]
    return installed

set_local(python_version)

Set the local pyenv version.

Source code in src/conjuring/spells/py.py
def set_local(self, python_version: str) -> Result:
    """Set the local pyenv version.

    Uses the last entry returned by ``list_versions`` for ``python_version``.
    """
    matching_versions = self.list_versions(python_version)
    newest = matching_versions[-1]
    return self.context.run(f"pyenv local {newest}")

Pytest

Pytest-related tasks.

Source code in src/conjuring/spells/py.py
class Pytest:
    """Pytest-related tasks."""

    @staticmethod
    def command(s: bool) -> str:
        """Build the pytest command: always verbose, plus ``-s`` when ``s`` is true."""
        parts = ["pytest -v"]
        if s:
            parts.append("-s")
        return " ".join(parts)

command(s) staticmethod

Build pytest command.

Source code in src/conjuring/spells/py.py
@staticmethod
def command(s: bool) -> str:
    """Build pytest command."""
    command = "pytest -v"
    if s:
        command += " -s"
    return command

coverage(c, show_all=False, s=False)

Run tests with pytest and coverage.

Source code in src/conjuring/spells/py.py
@task(
    help={
        "show_all": "Show all lines, even if they are covered",
        "s": "Don't capture output (same shortcut as pytest)",
    },
)
def coverage(c: Context, show_all: bool = False, s: bool = False) -> None:
    """Run tests with pytest and coverage.

    Coverage is measured for each of ``src``, a directory named like the current directory,
    and ``app`` that exists relative to the current directory.
    """
    if not Poetry(c).used_in_project():
        return

    candidate_sources = ("src", Path.cwd().name, "app")
    cov_options = [f"--cov={candidate}" for candidate in candidate_sources if Path(candidate).exists()]

    if show_all:
        report_option = "--cov-report=term-missing"
    else:
        report_option = "--cov-report=term-missing:skip-covered"

    run_command(c, "poetry run", Pytest.command(s), *cov_options, report_option)

debug_tools(c, all_=False, ipython=False, ipdb=False, pudb=False, icecream=False, devtools=False, watch=False, watcher=False)

Install debug tools.

Source code in src/conjuring/spells/py.py
@task(
    help={
        "all_": "Install all debug tools",
        "ipython": "Install https://pypi.org/project/ipython/",
        "ipdb": "Install https://pypi.org/project/ipdb/",
        "pudb": "Install https://pypi.org/project/pudb/",
        "icecream": "Install https://pypi.org/project/icecream/",
        "devtools": "Install https://pypi.org/project/devtools/",
        "watch": "Install https://github.com/joeyespo/pytest-watch",
        "watcher": "Install https://github.com/olzhasar/pytest-watcher",
    },
)
def debug_tools(  # noqa: PLR0913
    c: Context,
    all_: bool = False,
    ipython: bool = False,
    ipdb: bool = False,
    pudb: bool = False,
    icecream: bool = False,
    devtools: bool = False,
    watch: bool = False,
    watcher: bool = False,
) -> None:
    """Install debug tools.

    ``pip`` itself is always upgraded; every other tool is installed when its own flag or
    ``all_`` is set. ``watch`` and ``watcher`` can't be combined: choosing one uninstalls
    the other package first.
    """
    if not Poetry(c).used_in_project():
        return

    if watch and watcher:
        print_error("Use only one of --watch and --watcher.")
        return

    # Uninstall the counterpart of the chosen watch tool, if one was chosen
    counterpart = ""
    if watch:
        counterpart = "pytest-watcher"
    elif watcher:
        counterpart = "pytest-watch"
    if counterpart:
        c.run(f"poetry run python -m pip uninstall -y {counterpart}")

    optional_tools = (
        (ipython, "ipython"),
        (ipdb, "ipdb"),
        (pudb, "pudb"),
        (icecream, "icecream"),
        (devtools, "devtools[pygments]"),
        (watch, "pytest-watch pytest-testmon"),
        (watcher, "pytest-watcher pytest-testmon"),
    )
    tools = ["pip"]
    # Tools that were not requested stay in the list as empty strings
    tools.extend(package if requested or all_ else "" for requested, package in optional_tools)
    run_command(c, "poetry run pip install --upgrade", *tools)

editable(c, inject='')

Hack to install a Poetry package as editable until Poetry supports PEP660 hooks.

It won't be needed anymore when https://github.com/python-poetry/poetry-core/pull/182 is merged.

Source code in src/conjuring/spells/py.py
@task(help={"inject": "Pipx repo to inject this project into"})
def editable(c: Context, inject: str = "") -> None:
    """Hack to install a Poetry package as editable until Poetry supports PEP660 hooks.

    Builds the package, extracts the generated ``setup.py`` from the tarball and, when a pipx
    venv was chosen with ``inject``, injects the project into it as editable. The pyproject
    file is moved aside while pipx runs and is always moved back, even if pipx fails.

    It won't be needed anymore when https://github.com/python-poetry/poetry-core/pull/182 is merged.
    """
    if not Poetry(c).used_in_project():
        return

    chosen_repo = ""
    if inject:
        # Ask for the repo before doing anything else... to fail fast if no repo is chosen
        chosen_repo = run_with_fzf(c, "ls -1 ~/.local/pipx/venvs/", query=inject)
        if not chosen_repo:
            return

    c.run("poetry build")
    c.run("tar -xvzf dist/*.gz --strip-components 1 */setup.py")
    # Ignore errors, it might not be installed
    c.run("black setup.py", warn=True)

    if not chosen_repo:
        print_error("Use --inject to inject this repo into a pipx virtualenv.")
        return

    c.run(f"mv {PYPROJECT_TOML} _{PYPROJECT_TOML}")
    try:
        run_command(c, "pipx inject -e", chosen_repo, ".")
    finally:
        # Restore the project files even when pipx fails; otherwise the repo would be left
        # with the pyproject file renamed and a stray setup.py
        c.run(f"mv _{PYPROJECT_TOML} {PYPROJECT_TOML}")
        c.run("rm setup.py")

install(c, version='', force=False, delete_all=False, pipx=False, editable=False)

Install a Python virtual environment. For now, only works with Poetry.

Source code in src/conjuring/spells/py.py
@task(
    help={
        "version": "Python version",
        "force": "Recreate the environment",
        "delete_all": "Delete all environments",
        "pipx": "Install with pipx",
        "editable": "Install as editable",
    },
)
def install(  # noqa: PLR0913
    c: Context,
    version: str = "",
    force: bool = False,
    delete_all: bool = False,
    pipx: bool = False,
    editable: bool = False,
) -> None:
    """Install a Python virtual environment. For now, only works with Poetry.

    When no ``version`` is given, it is guessed from the pyproject file. The local pyenv version
    is set when ``force`` is used or none is set yet; then the Poetry venv is selected and the
    dependencies are installed.
    """
    venv_list = run_lines(c, "poetry env list", hide=False)
    poetry = Poetry(c)
    if not poetry.used_in_project():
        return

    if delete_all:
        for venv_name in venv_list:
            if ".venv" not in venv_name:
                poetry.remove_venv(poetry.parse_python_version(venv_name))
                continue
            # An in-project venv is a plain directory, removed directly
            c.run("rm -rf .venv")

    version = version or poetry.guess_python_version()

    pyenv = PyEnv(c)
    needs_local_version = force or not pyenv.has_local()
    if needs_local_version:
        # TODO: if tox.ini is present in the repo, set all versions from there
        pyenv.set_local(version)
    if force and not delete_all:
        poetry.remove_venv(version)
    poetry.use_venv(version)

    c.run("poetry lock --check && poetry install")
    if pipx:
        editable_option = " --editable" if editable else ""
        run_command(c, "pipx install", "--python", f"python{version}", editable_option, ".")
    c.run("poetry env list")

test(c, s=False)

Run tests with pytest.

Source code in src/conjuring/spells/py.py
@task(
    help={
        "s": "Don't capture output (same shortcut as pytest)",
    },
)
def test(c: Context, s: bool = False) -> None:
    """Run tests with pytest.

    Does nothing besides the Poetry check when the project doesn't use Poetry.
    """
    poetry = Poetry(c)
    if poetry.used_in_project():
        pytest_command = Pytest.command(s)
        run_command(c, "poetry run", pytest_command)

watch(c)

Watch changed files and run tests with pytest.

Source code in src/conjuring/spells/py.py
@task
def watch(c: Context) -> None:
    """Watch changed files and run tests with pytest.

    Does nothing besides the Poetry check when the project doesn't use Poetry.
    """
    poetry = Poetry(c)
    if poetry.used_in_project():
        run_command(c, "poetry run", "ptw . --testmon")

conjuring.spells.shell

Shell: install/uninstall completion.

completion_install(c, app)

Install shell completion. For now, only for the Bash shell, and only for Click projects.

Source code in src/conjuring/spells/shell.py
@task
def completion_install(c: Context, app: str) -> None:
    """Install shell completion. For now, only for the Bash shell, and only for Click projects.

    Generates the completion script by running ``app`` with Click's ``_<APP>_COMPLETE`` variable
    set to ``bash_source``, writes it under ``COMPAT_DIR`` and then displays the resulting file.

    - [Shell Completion — Click Documentation (8.0.x)](https://click.palletsprojects.com/en/8.0.x/shell-completion/)
    - [click-contrib/click-completion: Add or enhance bash, fish, zsh and powershell completion in Click](https://github.com/click-contrib/click-completion)
    """
    completion_file = f"{COMPAT_DIR}{app}.bash-completion"
    # Click expects the program name in uppercase with dashes replaced by underscores;
    # a dash would also make the shell variable assignment invalid
    env_var = f"_{app.upper().replace('-', '_')}_COMPLETE"
    c.run(f"{env_var}=bash_source {app} > {completion_file}")
    c.run(f"eza -l {completion_file}")
    c.run(f"bat {completion_file}")

completion_list(c)

List existing shell completions.

Source code in src/conjuring/spells/shell.py
@task
def completion_list(c: Context) -> None:
    """List existing shell completions, one ``eza -l`` listing per completion dir."""
    for completion_dir in COMPLETION_DIRS:
        listing_command = f"eza -l {completion_dir}"
        c.run(listing_command)

completion_uninstall(c, app)

Uninstall shell completion from both completion dirs.

Source code in src/conjuring/spells/shell.py
@task
def completion_uninstall(c: Context, app: str) -> None:
    """Uninstall shell completion from both completion dirs.

    Removes every file starting with ``app`` in each dir (failures only warn),
    then lists what is left.
    """
    remove_command = f"rm -v {app}*"
    for directory in COMPLETION_DIRS:
        with c.cd(directory):
            # warn=True: the dir might have no file for this app
            c.run(remove_command, warn=True)
    completion_list(c)