diff --git a/README.md b/README.md index 6588309..222ad23 100644 --- a/README.md +++ b/README.md @@ -17,10 +17,10 @@ design and specifications of [Black][black]. > `--diff` or `--check` options. See [Usage](#usage) for more details. > [!IMPORTANT] -> **Recent Changes:** +> **Recent Changes:** > 1. **Rule and module directives are now sorted by default:** `snakefmt` will automatically sort the order of directives inside rules (e.g. `input`, `output`, `shell`) and modules into a consistent order. You can opt out of this by using the `--no-sort` CLI flag. > 2. **Black upgraded to v26:** The underlying `black` formatter has been upgraded to v26. You will see changes in how implicitly concatenated strings are wrapped (they are now collapsed onto a single line if they fit within the line limit) and other minor adjustments compared to previous versions. -> +> > **Example of expected differences:** > ```python > # Before (Snakefmt older versions) @@ -33,7 +33,7 @@ design and specifications of [Black][black]. > "b.txt", > input: > "a.txt", -> +> > # After (Directives sorted, strings collapsed by Black 26) > rule example: > input: @@ -56,13 +56,16 @@ design and specifications of [Black][black]. - [Usage](#usage) - [Basic Usage](#basic-usage) - [Full Usage](#full-usage) -- [Configuration](#configuration) - [Directive Sorting](#directive-sorting) + - [Format Directives](#format-directives) + - [Configuration](#configuration) - [Integration](#integration) - - [Editor Integration](#editor-integration) - - [Version Control Integration](#version-control-integration) - - [Github Actions](#github-actions) + - [Editor Integration](#editor-integration) + - [Version Control Integration](#version-control-integration) + - [GitHub Actions](#github-actions) - [Plug Us](#plug-us) + - [Markdown](#markdown) + - [ReStructuredText](#restructuredtext) - [Changes](#changes) - [Contributing](#contributing) - [Cite](#cite) @@ -280,20 +283,6 @@ Options: -v, --verbose Turns on debug-level logger. 
``` -## Configuration - -`snakefmt` is able to read project-specific default values for its command line options -from a `pyproject.toml` file. In addition, it will also load any [`black` -configurations][black-config] you have in the same file. - -By default, `snakefmt` will search in the parent directories of the formatted file(s) -for a file called `pyproject.toml` and use any configuration there. -If your configuration file is located somewhere else or called something different, -specify it using `--config`. - -Any options you pass on the command line will take precedence over default values in the -configuration file. - ### Directive Sorting By default, `snakefmt` sorts rule and module directives (like `input`, `output`, `shell`, etc.) into a consistent order. This makes rules easier to read and allows for quicker cross-referencing between inputs, outputs, and the resources used by the execution command. @@ -313,9 +302,104 @@ This ordering ensures that the directives most frequently used in execution bloc You can disable this feature using the `--no-sort` flag. +### Format Directives + +`snakefmt` supports comment directives to control formatting behaviour for specific regions of code. +Directives should appear as standalone comment lines; an inline occurrence (e.g. `input: # fmt: off`) is treated as a plain comment and has no effect. +All directives are scope-local: only the region they select is affected, while code before and after follows normal `snakefmt` formatting and spacing rules (equivalent to replacing the directive with a plain comment line). + +#### `# fmt: off` / `# fmt: on` + +Disables all formatting for the region between the two directives. +Both directives *must* appear at the same indentation level; a `# fmt: on` at a deeper indent than the matching `# fmt: off` has no effect. 
+ +```python +rule a: + input: + "a.txt", + + +# fmt: off +rule b: + input: "b.txt" + output: + "c.txt" +# fmt: on + + +rule c: + input: + "d.txt", +``` + +> **Note:** inside `run:` blocks and other Python contexts, `# fmt: off` / `# fmt: on` is passed through to [Black][black], which handles it natively. + +#### `# fmt: off[sort]` + +Disables directive sorting for the enclosed region while still applying all other formatting. +Directives between `# fmt: off[sort]` and `# fmt: on[sort]` are kept in their original order. +A plain `# fmt: on` also closes a `# fmt: off[sort]` region. + +```python +# fmt: off[sort] +rule keep_my_order: + output: + "result.txt", + input: + "source.txt", + shell: + "cp {input} {output}" +# fmt: on[sort] +``` + +#### `# fmt: off[next]` + +Disables formatting for the single next Snakemake keyword block (e.g. `rule`, `checkpoint`, `use rule`). +Only that block is left unformatted; all subsequent blocks are formatted normally. + +```python +rule formatted: + input: + "a.txt", + output: + "b.txt", + + +# fmt: off[next] +rule unformatted: + input: "a.txt" + output: "b.txt" + + +rule also_formatted: + input: + "a.txt", +``` + +#### `# fmt: skip` + +`# fmt: skip` preserves a single line exactly as written, without any formatting (see [Black's documentation][black-skip] for details). + +> **Note:** `# fmt: skip` is not yet supported within Snakemake rule blocks. +> It currently applies only to plain Python lines outside of rules, checkpoints, and similar Snakemake constructs. + +### Configuration + +`snakefmt` is able to read project-specific default values for its command line options +from a `pyproject.toml` file. In addition, it will also load any [`black` +configurations][black-config] you have in the same file. + +By default, `snakefmt` will search in the parent directories of the formatted file(s) +for a file called `pyproject.toml` and use any configuration there. 
+If your configuration file is located somewhere else or called something different, +specify it using `--config`. + +Any options you pass on the command line will take precedence over default values in the +configuration file. + #### Example -`pyproject.toml` +[`pyproject.toml`][pyproject] ```toml [tool.snakefmt] @@ -415,13 +499,13 @@ in your project. [![Code style: snakefmt](https://img.shields.io/badge/code%20style-snakefmt-000000.svg)](https://github.com/snakemake/snakefmt) -#### Markdown +### Markdown ```md [![Code style: snakefmt](https://img.shields.io/badge/code%20style-snakefmt-000000.svg)](https://github.com/snakemake/snakefmt) ``` -#### ReStructuredText +### ReStructuredText ```rst .. image:: https://img.shields.io/badge/code%20style-snakefmt-000000.svg @@ -459,6 +543,7 @@ See [CONTRIBUTING.md][contributing]. [snakemake]: https://snakemake.readthedocs.io/ [black]: https://black.readthedocs.io/en/stable/ [black-config]: https://github.com/psf/black#pyprojecttoml +[black-skip]: https://black.readthedocs.io/en/stable/usage_and_configuration/the_basics.html#ignoring-sections [pyproject]: https://github.com/snakemake/snakefmt/blob/master/pyproject.toml [contributing]: CONTRIBUTING.md [changes]: CHANGELOG.md diff --git a/snakefmt/formatter.py b/snakefmt/formatter.py index a2a3b4b..3ea3d20 100644 --- a/snakefmt/formatter.py +++ b/snakefmt/formatter.py @@ -65,7 +65,6 @@ def __init__( self.result: str = "" self.lagging_comments: str = "" self.no_formatting_yet: bool = True - self.sort_directives = sort_directives self.previous_result: str = "" self.keyword_spec: list[str] = [] self.keywords: dict[str, str] = {} # cache to sort @@ -75,7 +74,7 @@ def __init__( if line_length is not None: self.black_mode.line_length = line_length - super().__init__(snakefile) # Call to parse snakefile + super().__init__(snakefile, sort_directives=sort_directives) def get_formatted(self) -> str: return self.result @@ -90,10 +89,13 @@ def flush_buffer( from_python: bool = False, 
final_flush: bool = False, in_global_context: bool = False, + exiting_keywords: bool = False, ) -> None: if len(self.buffer) == 0 or self.buffer.isspace(): self.result += self.buffer self.buffer = "" + if exiting_keywords and self.no_formatting_yet and self.result.rstrip("\n"): + self.no_formatting_yet = False return if not from_python: @@ -103,6 +105,9 @@ def flush_buffer( else: # Invalid python syntax, eg lone 'else:' between two rules, can occur. # Below constructs valid code statements and formats them. + if self.fmt_off_expected_indent: + self.buffer += self.fmt_off_expected_indent + self.fmt_off_expected_indent = "" re_match = contextual_matcher.match(self.buffer) if re_match is not None: callback_keyword = re_match.group(2) @@ -119,11 +124,13 @@ def flush_buffer( ) formatted = self.run_black_format_str(to_format, self.block_indent) re_rematch = contextual_matcher.match(formatted) - if re_rematch is None: - raise ValueError( - "contextual_matcher failed to match for the given " - f"formatted string: {formatted}" - ) + assert re_rematch, ( + "This should always match as we just formatted it with the same " + "regex. If this error is raised, it's a bug in snakefmt's " + "handling of snakemake syntax. 
Please report this to the " + "developers with the code so we can fix it: " + "https://github.com/snakemake/snakefmt/issues" + ) if condition != "": callback_keyword += re_rematch.group(3) formatted = ( @@ -174,7 +181,7 @@ def process_keyword_param( context=param_context, ) param_formatted = self.format_params(param_context) - if self.sort_directives and not in_global_context and self.keyword_spec: + if self.sort_off_indent is None and not in_global_context and self.keyword_spec: self.keywords[param_context.keyword_name] = self.result + param_formatted self.result = "" else: @@ -188,13 +195,95 @@ def post_process_keyword(self): for keyword in self.keyword_spec: res = self.keywords.pop(keyword, "") self.previous_result += res - if self.keywords: - raise InvalidParameterSyntax( - "Unexpected keywords when sorted keywords: " - + (", ".join(self.keywords)) - ) + assert not self.keywords, ( + "All directives should have been consumed; " + "if not, this is a bug in snakefmt's handling of snakemake syntax. " + "It must be the coder's fault, not the user's. " + "So please report this to the developers with the code so we can fix it: " + "https://github.com/snakemake/snakefmt/issues" + ) self.result = self.previous_result + self.result self.previous_result = "" + # Keep no_formatting_yet when there is pending buffered content. + # This prevents premature separator insertion after fmt: off/on + # verbatim regions before the next flush occurs. + if self.no_formatting_yet and self.result.rstrip("\n") and not self.buffer: + self.no_formatting_yet = False + + def flush_fmt_off_region(self, verbatim: str): + """Blank-line rules: + + applied before the verbatim block: + - At global indent (fmt_off[0] == 0) and result not empty: + result should end with exactly 2 blank lines (``\\n\\n\\n``) + (standard separation between top-level constructs). 
+ - When the preceding Python code had a blank line before ``# fmt: off`` + (``fmt_off_preceded_by_blank_line``): + result should end with >= 1 blank line. + - ``# fmt: off[next]`` nested inside a Python block: + another ``\\n`` is prepended to any lagging comment + so the following keyword gets its normal blank-line separator. + + applied after the verbatim block: + - ``# fmt: off[next]``: sets ``no_formatting_yet := False``, + so the next formatted block gets its normal blank-line separator. + - Plain ``# fmt: off`` regions: sets ``no_formatting_yet := True``, + suppressing blank-line insertion in the next ``add_newlines`` call. + """ + + if self.no_formatting_yet: + self.result = self.result.lstrip("\n") + self.result += self.buffer + self.buffer = "" + if self.fmt_off: + if self.fmt_off[0] == 0 and not self.no_formatting_yet: + while not self.result.endswith("\n\n\n"): + self.result += "\n" + # When fmt:off[next] is inside a Python block (e.g. `if 1:`), the + # directive ends up as a lagging_comment after flushing that block. + is_nested_next = self.fmt_off[1] == "next" + else: + is_nested_next = False + if self.lagging_comments: + # For nested fmt:off[next], add the same \n separator that + # process_keyword_context/add_newlines would normally provide + # before the first keyword inside the Python block. + if is_nested_next and not self.no_formatting_yet: + self.result += "\n" + self.result += self.lagging_comments + self.lagging_comments = "" + self.no_formatting_yet = not is_nested_next + if self.fmt_off_preceded_by_blank_line: + if self.result and not self.result.endswith("\n\n"): + self.result += "\n" + self.fmt_off_preceded_by_blank_line = False + self.result += verbatim + self.last_recognised_keyword = "" + + def flush_sort_signal(self, verbatim): + """ + If "fmt: on sort" directive is in the keyword syntax, e.g.: + + rule: + directive1: ... + # fmt: off[sort] + directive2: ... + # fmt: on[sort] <- + # other comments + directive3: ... 
+ + the "other comments" should be kept with directive3. + This function is called when "fmt: on[sort]" reached, + and it flushes the pending comments into self.result. + """ + if self.keywords: + pending = "" + for keyword in self.keyword_spec: + pending += self.keywords.pop(keyword, "") + self.previous_result += pending + self.previous_result += self.result + verbatim + self.result = "" + self.last_recognised_keyword = "" def run_black_format_str( self, @@ -216,7 +305,6 @@ def run_black_format_str( and len(string.strip().splitlines()) > 1 and not no_nesting ) - if artificial_nest: string = f"if x:\n{textwrap.indent(string, TAB)}" @@ -473,6 +561,10 @@ def add_newlines( if comment_matches > 0: self.lagging_comments = "\n".join(all_lines[comment_break:]) + "\n" if final_flush: + # Preserve one intentional blank line before trailing + # comments at EOF (e.g. indented # fmt-like comments). + if comment_break > 0 and all_lines[comment_break - 1] == "": + self.result += "\n" self.result += self.lagging_comments else: self.result += formatted_string diff --git a/snakefmt/parser/parser.py b/snakefmt/parser/parser.py index cd0dc4a..3566865 100644 --- a/snakefmt/parser/parser.py +++ b/snakefmt/parser/parser.py @@ -1,6 +1,7 @@ +import re import tokenize from abc import ABC, abstractmethod -from typing import NamedTuple, Optional +from typing import Literal, NamedTuple, Optional from snakefmt.exceptions import UnsupportedSyntax from snakefmt.parser.grammar import PythonCode, SnakeGlobal @@ -15,6 +16,53 @@ ) from snakefmt.types import TAB, Token, TokenIterator, col_nb +_FMT_DIRECTIVE_RE = re.compile( + r"^# fmt: (off|on)(?:\[(\w+(?:,\s*\w+)*)\])?(?=$|\s{2}|\s#)" +) + + +class FMT_DIRECTIVE(NamedTuple): + disable: bool + modifiers: list[str] + + @classmethod + def from_token(cls, token: Token): + if token.type != tokenize.COMMENT: + return None + return cls.from_str(token.string) + + @classmethod + def from_str(cls, token_string: str): + """Parse a fmt directive comment. 
+ Returns (disable, modifiers) or None if not a fmt directive. + disable: True | False + modifiers: e.g. [] | ['sort'] | ['next'] | ['sort', 'next'] + """ + m = _FMT_DIRECTIVE_RE.match(token_string) + if m is None: + return None + disable = m.group(1) == "off" + mods = [s.strip() for s in m.group(2).split(",")] if m.group(2) else [] + return cls(disable, mods) # type: ignore[arg-type] + + +def split_token_lines(token: tokenize.TokenInfo): + """Token can be multiline. + e.g., `f'''\\nplaintext\\n'''` has these tokens: + + TokenInfo(type=61 (FSTRING_START), string="f'''", + start=(21, 0), end=(21, 4), line="f'''\\n") + TokenInfo(type=62 (FSTRING_MIDDLE), string='\\ncccccccc\\n', + start=(21, 4), end=(23, 0), line="f'''\\ncccccccc\\n'''\\n") + TokenInfo(type=63 (FSTRING_END), string="'''", + start=(23, 0), end=(23, 3), line="'''\\n") + + lines should be split to drop overlapping lines and keep unique ones. + """ + return zip( + range(token.start[0], token.end[0] + 1), token.line.splitlines(keepends=True) + ) + def not_a_comment_related_token(token: Token): return token.type not in { @@ -26,6 +74,32 @@ def not_a_comment_related_token(token: Token): } +def check_indent(line: str, indents: list[str]) -> int: + indents_len = len(indents) + for i, indent in enumerate(reversed(indents), 1): + if line.startswith(indent): + return indents_len - i + raise SyntaxError("Unexpected indent") + + +def token_indents_updated(token: Token, indents: list[str]) -> bool: + if token.type == tokenize.INDENT: + line = token.line + indent = line[: len(line) - len(line.lstrip())] + if indent not in indents: + indents.append(indent) + elif token.type == tokenize.DEDENT: + line = token.line + indent = line[: len(line) - len(line.lstrip())] + while indents and indents[-1] != indent: + indents.pop() + if not indents: + raise SyntaxError("Unexpected dedent") + else: + return False + return True + + class Snakefile(TokenIterator): """ Adapted from snakemake.parser.Snakefile @@ -84,7 +158,7 @@ class 
Parser(ABC): and the alternation in `:self.last_block_was_snakecode`. """ - def __init__(self, snakefile: Snakefile): + def __init__(self, snakefile: Snakefile, sort_directives=False): self.context = Context( SnakeGlobal(), KeywordSyntax("Global", keyword_indent=0, accepts_py=True) ) @@ -97,6 +171,18 @@ def __init__(self, snakefile: Snakefile): self.queriable = True self.in_fstring = False self.last_token: Optional[Token] = None + # for `# fmt: off`, (indent, kind) + # kind: "region" = off/on, "sort" = off[sort]/on[sort], "next" + self.fmt_off: Optional[tuple[int, Literal["next", "region"]]] = None + self.fmt_off_expected_indent: str = "" + self.fmt_off_preceded_by_blank_line: bool = False + # None: sorting enabled (no active off[sort]). + # >=0 : disabled at that indent level and below due to active off[sort] + # sorting can be initially disabled (-1), + # but will be enabled in contexts with `# fmt: on[sort]` + self.sort_off_indent = None if sort_directives else -1 + + self.indents: list[str] = [""] status = self.get_next_queriable() self.buffer = status.buffer @@ -112,13 +198,48 @@ def __init__(self, snakefile: Snakefile): break keyword = status.token.string + if fmt_label := FMT_DIRECTIVE.from_token(status.token): + if fmt_label.disable: + if not fmt_label.modifiers: + self.fmt_off = (status.cur_indent, "region") + self.fmt_off_expected_indent = status.token.line[ + : col_nb(status.token) + ] + elif "next" in fmt_label.modifiers: + self.fmt_off = (status.cur_indent, "next") + self.fmt_off_expected_indent = status.token.line[ + : col_nb(status.token) + ] + elif "sort" in fmt_label.modifiers: + self.sort_off_indent = status.cur_indent + elif self._check_fmt_on(fmt_label, status.token) == "sort": + if not self.from_python and self.keyword_indent: + # multiline string is impossible here + # and we assume that origin_indent is the same indent + # of this comment + token_indent = status.cur_indent + sort_on = token_indent * TAB + status.token.line.strip() + "\n" + 
self.flush_sort_signal(sort_on) + status = self.get_next_queriable() + self.buffer = status.buffer + continue + elif self.fmt_off and status.cur_indent <= self.fmt_off[0]: + self.fmt_off = None + elif ( + self.sort_off_indent is not None + and status.cur_indent < self.sort_off_indent + ): + self.sort_off_indent = None if self.vocab.recognises(keyword): + new_vocab, new_syntax_cls = self.vocab.get(keyword) + is_context_kw = new_vocab is not None and issubclass( + new_syntax_cls, KeywordSyntax + ) if status.cur_indent > self.keyword_indent: - in_if_else = self.buffer.startswith(("if", "else", "elif")) - if self.syntax.from_python or status.pythonable or in_if_else: + if self.syntax.from_python or status.pythonable: self.from_python = True - elif self.from_python: + elif self.from_python and not is_context_kw: # We are exiting python context, so force spacing out keywords self.last_recognised_keyword = "" self.from_python = self.syntax.from_python @@ -129,6 +250,20 @@ def __init__(self, snakefile: Snakefile): status = self.process_keyword(status, self.from_python) self.block_indent = status.cur_indent self.last_block_was_snakecode = True + elif self.fmt_off: + self.flush_buffer( + from_python=True, + in_global_context=self.in_global_context, + ) + if self.keyword_indent > 0: + self.syntax.add_processed_keyword(status.token, keyword) + status = self._consume_fmt_off( + status.token, min_indent=status.cur_indent + ) + if self.last_block_was_snakecode and not status.eof: + self.block_indent = status.block_indent + self.last_block_was_snakecode = self.keyword_indent > 0 + self.buffer = status.buffer.lstrip() else: if not self.syntax.accepts_python_code and not comment_start(keyword): raise SyntaxError( @@ -136,10 +271,9 @@ def __init__(self, snakefile: Snakefile): f"in {self.syntax.keyword_name} definition" ) else: - self.buffer += f"{keyword}" - status = self.get_next_queriable() + source, status = self._consume_python(status.token) + self.buffer += source if 
self.last_block_was_snakecode and not status.eof: - self.block_indent = status.block_indent self.last_block_was_snakecode = False self.buffer += status.buffer if ( @@ -193,6 +327,7 @@ def flush_buffer( from_python: bool = False, final_flush: bool = False, in_global_context: bool = False, + exiting_keywords: bool = False, ) -> None: """Processes the text in :self.buffer:""" @@ -211,6 +346,229 @@ def post_process_keyword(self) -> None: """Sort params when exiting a keyword context, eg after finishing parsing a 'rule:'""" + def _consume_python( + self, start_token: Token, vocab_recognises=True, added_indent: str = "" + ) -> tuple[str, Status]: + """Collect Python source lines until a snakemake keyword at correct indent, + or dedent below min_indent, or EOF. + Returns (source_text, next_status) where next_status carries the stopping token. + """ + origin_indent = col_nb(start_token) + + lines: dict[int, str] = {start_token.start[0]: start_token.line} + # Lines that are interior to a multiline token (string / f-string body). + # Their content must not be reindented. + string_interior_lines: set[int] = set() + self.queriable = False + prev_token = None + last_indent_token = None + min_indent = -1 + # If stop_at_min is True, also stop when dedenting back to min_indent level + # (used for fmt: off[next] to consume exactly one block). 
+ is_next_mode = self.fmt_off and self.fmt_off[1] == "next" + consuming_next = False # used with stop_at_min + seen_next_block_keyword = False + + def _init_min_indent(token: Token): + nonlocal min_indent + if not comment_start(token.string): + while not token.line.startswith(self.indents[-1]): + self.indents.pop() + min_indent = len(self.indents) - 1 + + _init_min_indent(start_token) + while True: + try: + token = next(self.snakefile) + except StopIteration: + eof_token = Token(tokenize.ENDMARKER, "", (0, 0), (0, 0), "") + self.snakefile.denext(eof_token) + break + if min_indent == -1: + _init_min_indent(token) + elif token.line[:origin_indent].strip(): + # non-whitespace before origin indent: stop + self.snakefile.denext(token) + break + self.last_token = token + self.in_fstring = fstring_processing(token, prev_token, self.in_fstring) + prev_token = token + if token.type == tokenize.ENDMARKER: + self.snakefile.denext(token) + break + elif token.type == tokenize.INDENT: + token_indents_updated(token, self.indents) + self.syntax.cur_indent = len(self.indents) - 1 + last_indent_token = token + if is_next_mode and len(self.indents) - 1 > min_indent: + consuming_next = True + continue + elif token.type == tokenize.DEDENT: + saved_indents = list(self.indents) + token_indents_updated(token, self.indents) + new_indent = len(self.indents) - 1 + last_indent_token = None + if new_indent < min_indent or ( + consuming_next and new_indent == min_indent + ): + # let get_next_queriable handle dedent below min_indent + self.indents = saved_indents + self.snakefile.denext(token) + break + self.syntax.cur_indent = new_indent + continue + elif is_newline(token): + self.queriable = True + lines.update(split_token_lines(token)) + continue + elif ( + (token.type == tokenize.NAME or token.string == "@") + and self.queriable + and not self.in_fstring + and self.vocab.recognises(token.string) + ): + if is_next_mode: + if seen_next_block_keyword: + # fmt: off[next] consumed one whole 
keyword block. + self._detent_last_indent(token, last_indent_token) + break + else: + seen_next_block_keyword = True + if vocab_recognises: + # snakemake keyword: stop, let main loop handle it + self._detent_last_indent(token, last_indent_token) + break + # `# fmt: off[next]` within Python code: stop and let main loop handle it. + elif self._consume_fmt_off_in_python(token, lines): + break + + self.queriable = False + lines.update(split_token_lines(token)) + # Mark interior lines of any multiline token as string content. + if token.start[0] != token.end[0]: + string_interior_lines.update( + range(token.start[0] + 1, token.end[0] + 1) + ) + + verbatim = self._reindent( + lines, string_interior_lines, origin_indent, added_indent + ) + next_status = self.get_next_queriable() + if consuming_next and verbatim: + # Strip extra trailing blank lines; the following block's separator + # logic (add_newlines) will provide the correct spacing. + while verbatim.endswith("\n\n"): + verbatim = verbatim[:-1] + return verbatim, next_status._replace( + pythonable=next_status.pythonable or bool(verbatim.strip()) + ) + + def _detent_last_indent(self, token: Token, last_indent_token: Optional[Token]): + """ + A whole keyword block consumed, + hand the next same-level block back to main loop. + """ + self.snakefile.denext(token) + if last_indent_token is not None: + self.snakefile.denext(last_indent_token) + self.indents.pop() + self.syntax.cur_indent = len(self.indents) - 1 + + def _consume_fmt_off_in_python(self, token: Token, lines: dict[int, str]): + """ + Consume `# fmt: off/on` directives within Python code. + lines is needed to: + 1. determine the effective indent of the comment token + (when fmt: off in global context, or fmt: off[next] in any context) + 2. 
record the lines of a fmt: off region (when fmt: on[region]) + Returns True if a fmt directive was consumed, + which should be handled by the main loop (and break there) + """ + fmt_label = FMT_DIRECTIVE.from_token(token) + if not fmt_label: + return False + if fmt_label.disable: + if fmt_label.modifiers: + # `# fmt: off[` isn't actual format disabler, affects limited + if not self.fmt_off or ( + # two following [next] + self.fmt_off[1] != "region" + and self._determine_comment_indent(token) == self.fmt_off[0] + ): + self.snakefile.denext(token) + return True + elif self.in_global_context: + # In global Python context, plain `# fmt: off` starts a parser + # verbatim region. In non-global Python contexts (e.g. run:), it + # stays inside Python and is handled by Black. + last_line = lines[max(lines)] if lines else "" + self.fmt_off_preceded_by_blank_line = not last_line.strip() + self.snakefile.denext(token) + return True + else: + sort_off_indent = self.sort_off_indent + if fmt_on := self._check_fmt_on(fmt_label, token): + if fmt_on == "region": + lines.update(split_token_lines(token)) + elif fmt_on == "sort": + if not self.from_python and self.keyword_indent: + # multiline string is impossible here + # and we assume that origin_indent is the same indent + # of this comment + token_indent = sort_off_indent or 0 + lines.update(split_token_lines(token)) + verbatim = self._reindent( + lines, set(), col_nb(token), token_indent * TAB + ) + self.flush_sort_signal(verbatim) + lines.clear() + else: + self.snakefile.denext(token) + return True + return False + + @abstractmethod + def flush_fmt_off_region(self, verbatim: str) -> None: + """Flush unformatted region introduced by a fmt: off directive into result""" + + @abstractmethod + def flush_sort_signal(self, verbatim: str) -> None: + """Commit fmt:on sort signal directly.""" + + def _consume_fmt_off(self, start_token: Token, min_indent: int): + verbatim, next_status = self._consume_python( + start_token, 
vocab_recognises=False, added_indent=TAB * min_indent + ) + self.flush_fmt_off_region(verbatim) + self.snakefile.denext(next_status.token) + self.queriable = True + if self.fmt_off and self.fmt_off[1] == "next": + self.fmt_off = None + return self.get_next_queriable() + + def _reindent( + self, + lines: dict[int, str], + string_interior_lines: set[int], + origin_indent: int, + added_indent: str = "", + ) -> str: + newlines = [] + for i in sorted(lines): + line = lines[i] + if i in string_interior_lines: + newlines.append(line) + elif line.strip(): + newline = line.rsplit("\n", 1) + if newline[0][:origin_indent].strip(): + newline[0] = added_indent + newline[0].lstrip() + else: + newline[0] = added_indent + newline[0][origin_indent:] + newlines.append("\n".join(newline)) + else: + newlines.append(line[origin_indent:]) + return "".join(newlines) + def process_keyword(self, status: Status, from_python: bool = False) -> Status: """Called when a snakemake keyword has been found. @@ -222,7 +580,7 @@ def process_keyword(self, status: Status, from_python: bool = False) -> Status: new_vocab, new_syntax = self.vocab.get(keyword) if new_vocab is not None and issubclass(new_syntax, KeywordSyntax): in_global_context = self.in_global_context - saved_context = self.context + saved_context: Context = self.context # 'use' keyword can not enter a new context self.context = Context( new_vocab(), @@ -244,8 +602,7 @@ def process_keyword(self, status: Status, from_python: bool = False) -> Status: self.queriable = True self.block_indent = self.syntax.keyword_indent + 1 status = self.get_next_queriable() - # lstrip forces the formatter deal with newlines - if self.context.syntax.accepts_python_code: # type: ignore + if self.context.syntax.accepts_python_code: self.buffer += status.buffer.lstrip("\n\r") else: self.buffer += status.buffer.lstrip() @@ -261,10 +618,13 @@ def process_keyword(self, status: Status, from_python: bool = False) -> Status: ) self.process_keyword_param(param_context, 
self.in_global_context) self.syntax.add_processed_keyword(status.token, status.token.string) + cur_indent = param_context.cur_indent + if param_context.token.type == tokenize.COMMENT and not param_context.eof: + cur_indent = self._determine_comment_indent(param_context.token) return Status( param_context.token, - param_context.cur_indent, - param_context.cur_indent, + cur_indent, + cur_indent, status.buffer, param_context.eof, self.from_python, @@ -278,7 +638,8 @@ def context_exit(self, status: Status) -> None: while self.keyword_indent > status.cur_indent: callback_context: Context = self.context_stack.pop() if callback_context.syntax.accepts_python_code: - self.flush_buffer() # Flushes any code inside 'run' directive + # Flushes any code inside 'run' directive + self.flush_buffer(exiting_keywords=True) else: callback_context.syntax.check_empty() self.context = self.context_stack[-1] @@ -288,6 +649,84 @@ def context_exit(self, status: Status) -> None: self.block_indent = self.cur_indent if self.keyword_indent > 0: self.syntax.keyword_indent = status.cur_indent + 1 + # ParameterSyntax consumes INDENT/DEDENT tokens without updating + # Parser.indents, leaving stale deeper-level entries. Trim them now + # so get_next_queriable computes the correct cur_indent for the next block. + while len(self.indents) - 1 > status.cur_indent: + self.indents.pop() + + def _determine_comment_indent(self, comment_token: Token) -> int: + """ + This function returns the real indent level of a comment token and + updates self.indents if needed, + which is determined by the following real code line and previous indents. + + During parsing self.snakefile, when a comment token is encountered, + its effective indent level is not directly knowable. 
+ + principles: + follow_indent = indent of the following real code line + if EOF: + follow_indent = 0 + rule 1 (always): + indent of comments >= follow_indent + rule 2 (if follow_indent < self.indents[-1]): + indent of comments = epsilon + max( + i for i in self.indents if i <= comment_indent + ) + + next(self.snakefile) until follow_indent is determined, + then put all peeked tokens back. + """ + # ── Step 1: peek ahead to find follow_indent ──────────────────────── + peeked: list[Token] = [] + saved_indents = list(self.indents) + follow_indent = len(self.indents) - 1 + try: + while True: + token = next(self.snakefile) + peeked.append(token) + if token_indents_updated(token, self.indents): + pass + elif token.type not in { + tokenize.NEWLINE, + tokenize.NL, + tokenize.COMMENT, + }: + follow_indent = check_indent(token.line, self.indents) + break + except StopIteration: + follow_indent = 0 + # restore indent stack and token stream unchanged + self.indents = saved_indents + for token in reversed(peeked): + self.snakefile.denext(token) + # Rule 1 (always): comment must not be indented below following code. + if len(self.indents) - 1 <= follow_indent: + return follow_indent + # Rule 2 (dedent is happening, standalone only): snap comment to the + # highest indent level fitting within the comment's column. 
+ return max(check_indent(comment_token.line, self.indents), follow_indent) + + def _check_fmt_on(self, fmt_label: FMT_DIRECTIVE, token: Token): + """Determine which fmt: on can turn on formatting""" + if self.fmt_off: + # `# fmt: on[sort]` no effect + if "sort" not in fmt_label.modifiers: + token_indent = self._determine_comment_indent(token) + if token_indent == self.fmt_off[0]: + self.fmt_off = None + return "region" + elif self.sort_off_indent is not None: + # `fmt: on[sort]` will turn on sorting + # `fmt: on` will also turn on sorting if no `fmt: off` set + if not fmt_label.modifiers or "sort" in fmt_label.modifiers: + token_indent = self._determine_comment_indent(token) + # but if sort is globally off, only `# fmt: on[sort]` + # can turn it on (self.sort_off_indent := -1) + if token_indent == self.sort_off_indent: + self.sort_off_indent = None + return "sort" def get_next_queriable(self) -> Status: """Produces the next word that could be a snakemake keyword, @@ -307,24 +746,51 @@ def get_next_queriable(self) -> Status: self.in_fstring = fstring_processing(token, prev_token, self.in_fstring) if block_indent == -1 and not_a_comment_related_token(token): block_indent = self.cur_indent - if token.type == tokenize.INDENT: - self.syntax.cur_indent += 1 - prev_token = None - newline = True - continue - elif token.type == tokenize.DEDENT: - if self.cur_indent > 0: - self.syntax.cur_indent -= 1 + if token_indents_updated(token, self.indents): prev_token = None newline = True + self.syntax.cur_indent = len(self.indents) - 1 continue elif token.type == tokenize.ENDMARKER: return Status( token, block_indent, self.cur_indent, buffer, True, pythonable ) elif token.type == tokenize.COMMENT: - if col_nb(token) == 0: + fmt_dir = FMT_DIRECTIVE.from_token(token) + if ( + fmt_dir + and col_nb(token) == 0 + and not (fmt_dir.disable and "next" in fmt_dir.modifiers) + ): + # col-0 comments report cur_indent=0 to trigger context_exit; + # fmt directives at other columns report 
actual cur_indent. return Status(token, block_indent, 0, buffer, False, pythonable) + # Comments arrive in the token stream *before* any following + # INDENT/DEDENT tokens, so self.cur_indent still reflects the + # previous (potentially higher) level. Delegate to + # _determine_comment_indent which peeks ahead and applies the + # two snapping rules. + effective_indent = self._determine_comment_indent(token) + self.syntax.cur_indent = effective_indent + if effective_indent < max(self.keyword_indent, self.block_indent): + return Status( + token, block_indent, effective_indent, buffer, False, pythonable + ) + # `# fmt: off[next]` always needs parser-level handling. + # Plain `# fmt: off` is parser-level only in global context; in other + # Python contexts it is handled by Black. + if ( + fmt_dir + and fmt_dir.disable + and ( + "next" in fmt_dir.modifiers + or "sort" in fmt_dir.modifiers + or (not fmt_dir.modifiers and self.in_global_context) + ) + ): + return Status( + token, block_indent, effective_indent, buffer, False, pythonable + ) elif is_newline(token): self.queriable, newline = True, True @@ -343,7 +809,11 @@ def get_next_queriable(self) -> Status: else: buffer += TAB * self.effective_indent - if (token.type == tokenize.NAME or token.string == "@") and self.queriable: + if ( + (token.type == tokenize.NAME or token.string == "@") + and self.queriable + and not self.in_fstring + ): self.queriable = False return Status( token, block_indent, self.cur_indent, buffer, False, pythonable diff --git a/snakefmt/parser/syntax.py b/snakefmt/parser/syntax.py index 3cee140..ae90fc5 100644 --- a/snakefmt/parser/syntax.py +++ b/snakefmt/parser/syntax.py @@ -309,7 +309,7 @@ def __init__( self.keyword_indent = keyword_indent self.cur_indent = max(self.keyword_indent - 1, 0) self.comment = "" - self.token = None + self.token: Token if snakefile is not None: self.validate_keyword_line(snakefile) @@ -516,21 +516,46 @@ def parse_params(self, snakefile: TokenIterator): 
self.flush_param(cur_param, skip_empty=True) self.eof = True break - if self.check_exit(cur_param): + if self.check_exit(cur_param, snakefile): break if self.num_params() == 0: raise NoParametersError(f"{self.line_nb}In {self.keyword_name} definition.") - def check_exit(self, cur_param: Parameter): + def check_exit(self, cur_param: Parameter, snakefile: TokenIterator): exit = False - if not self.found_newline: + if not self.found_newline or not self.token: return exit if not_empty(self.token): - # Special condition for comments: they appear before indents/dedents. if self.token.type == tokenize.COMMENT: if not cur_param.is_empty() and col_nb(self.token) < cur_param.col_nb: - exit = True + # comment appears before INDENT/DEDENT in the token stream; + # peek ahead with a temp counter so self.cur_indent stays + # untouched — the real processing will update it once tokens + # are put back. + temp_indent = self.cur_indent + cached_tokens: list[Token] = [] + try: + while True: + t = next(snakefile) + cached_tokens.append(t) + if t.type == tokenize.INDENT: + temp_indent += 1 + elif t.type == tokenize.DEDENT: + temp_indent = max(temp_indent - 1, 0) + elif t.type not in { + tokenize.NEWLINE, + tokenize.NL, + tokenize.COMMENT, + }: + # stop here; this token stays in cached_tokens + # and will be put back below — no double-denext + break + except StopIteration: + pass + for t in reversed(cached_tokens): + snakefile.denext(t) # type: ignore[attr-defined] + exit = temp_indent < self.keyword_indent else: exit = self.cur_indent < self.keyword_indent if exit: diff --git a/tests/test_formatter.py b/tests/test_formatter.py index 216622c..99f9685 100644 --- a/tests/test_formatter.py +++ b/tests/test_formatter.py @@ -8,6 +8,7 @@ from unittest import mock import black +import black.parsing import pytest from snakefmt.exceptions import InvalidPython @@ -47,58 +48,55 @@ def test_single_param_keyword_stays_on_same_line(self): assert actual == expected + example_shell_newline = ( + "rule 
a:\n" + f'{TAB * 1}shell: "for i in $(seq 1 5);"\n' + f'{TAB * 2}"do echo $i;"\n' + f'{TAB * 2}"done"', + "rule a:\n" + f"{TAB * 1}shell:\n" + f'{TAB * 2}"for i in $(seq 1 5);" "do echo $i;" "done"\n', + ) + def test_shell_param_newline_indented(self): - formatter = setup_formatter( - "rule a:\n" - f'{TAB * 1}shell: "for i in $(seq 1 5);"\n' - f'{TAB * 2}"do echo $i;"\n' - f'{TAB * 2}"done"' - ) - expected = ( - "rule a:\n" - f"{TAB * 1}shell:\n" - f'{TAB * 2}"for i in $(seq 1 5);" "do echo $i;" "done"\n' - ) - assert formatter.get_formatted() == expected + formatter = setup_formatter(self.example_shell_newline[0]) + assert formatter.get_formatted() == self.example_shell_newline[1] + + example_params_newline = ( + f"rule b: \n" + f'{TAB * 1}input: "a", "b",\n' + f'{TAB * 4}"c"\n' + f'{TAB * 1}wrapper: "mywrapper"', + f"rule b:\n" + f"{TAB * 1}input:\n" + f'{TAB * 2}"a",\n' + f'{TAB * 2}"b",\n' + f'{TAB * 2}"c",\n' + f"{TAB * 1}wrapper:\n" + f'{TAB * 2}"mywrapper"\n', + ) def test_single_param_keyword_in_rule_gets_newline_indented(self): - formatter = setup_formatter( - f"rule a: \n" - f'{TAB * 1}input: "a", "b",\n' - f'{TAB * 4}"c"\n' - f'{TAB * 1}wrapper: "mywrapper"' - ) - - actual = formatter.get_formatted() - expected = ( - "rule a:\n" - f"{TAB * 1}input:\n" - f'{TAB * 2}"a",\n' - f'{TAB * 2}"b",\n' - f'{TAB * 2}"c",\n' - f"{TAB * 1}wrapper:\n" - f'{TAB * 2}"mywrapper"\n' - ) - - assert actual == expected + formatter = setup_formatter(self.example_params_newline[0]) + assert formatter.get_formatted() == self.example_params_newline[1] + + example_input_threads_newline = ( + f"rule c: \n" + f'{TAB * 1}input: "c"\n' + f"{TAB * 1}threads:\n" + f"{TAB * 2}20\n" + f"{TAB * 1}default_target:\n" + f"{TAB * 2}True\n", + f"rule c:\n" + f"{TAB * 1}input:\n" + f'{TAB * 2}"c",\n' + f"{TAB * 1}threads: 20\n" + f"{TAB * 1}default_target: True\n", + ) def test_single_numeric_param_keyword_in_rule_stays_on_same_line(self): - formatter = setup_formatter( - "rule a: \n" - f'{TAB 
* 1}input: "c"\n' - f"{TAB * 1}threads:\n" - f"{TAB * 2}20\n" - f"{TAB * 1}default_target:\n" - f"{TAB * 2}True\n" - ) - - actual = formatter.get_formatted() - expected = ( - f'rule a:\n{TAB * 1}input:\n{TAB * 2}"c",\n{TAB * 1}threads: 20\n' - f"{TAB * 1}default_target: True\n" - ) - - assert actual == expected + formatter = setup_formatter(self.example_input_threads_newline[0]) + assert formatter.get_formatted() == self.example_input_threads_newline[1] class TestModuleFormatting: @@ -567,7 +565,7 @@ def test_python_code_after_nested_snakecode_gets_formatted(self): 'f("a")', 0, 3, no_nesting=True ) - assert mock_m.call_args_list[2] == mock.call("b = 2\n", 0) + assert mock_m.call_args_list[2] == mock.call("b=2\n", 0) formatter = setup_formatter(snakecode) expected = ( @@ -1718,15 +1716,29 @@ def test_double_block_comment_mid_run(self): class TestSortFormatting: + sort_simple = ( + "rule a:\n" + f"{TAB * 1}# annots\n" + f"{TAB * 1}threads: 1\n" + f'{TAB * 1}log: "b",\n' + f'{TAB * 1}output: "a", "fsdfdsdfd", "ccc"\n' + f"{TAB * 1}run:\n" + f'{TAB * 2}print("hello world")\n', + "rule a:\n" + f"{TAB * 1}output:\n" + f'{TAB * 2}"a",\n' + f'{TAB * 2}"fsdfdsdfd",\n' + f'{TAB * 2}"ccc",\n' + f"{TAB * 1}log:\n" + f'{TAB * 2}"b",\n' + f"{TAB * 1}# annots\n" + f"{TAB * 1}threads: 1\n" + f"{TAB * 1}run:\n" + f'{TAB * 2}print("hello world")\n', + ) + def test_sorting_of_params(self): - snakecode = ( - "rule a:\n" - f"{TAB * 1}# annots\n" - f"{TAB * 1}threads: 1\n" - f'{TAB * 1}log: "b",\n' - f'{TAB * 1}output: "a", "fsdfdsdfd", "ccc"\n' - f"{TAB * 1}run:\n" - f'{TAB * 2}print("hello world")\n' + snakecode = self.sort_simple[0] + ( "if 2:\n" f"{TAB * 1}rule b:\n" f'{TAB * 2}output: "b",\n' @@ -1742,18 +1754,8 @@ def test_sorting_of_params(self): f'{TAB * 1}print("error")\n' ) formatter = setup_formatter(snakecode, sort_params=True) - expected = ( - "rule a:\n" - f"{TAB * 1}output:\n" - f'{TAB * 2}"a",\n' - f'{TAB * 2}"fsdfdsdfd",\n' - f'{TAB * 2}"ccc",\n' - f"{TAB * 1}log:\n" - 
f'{TAB * 2}"b",\n' - f"{TAB * 1}# annots\n" - f"{TAB * 1}threads: 1\n" - f"{TAB * 1}run:\n" - f'{TAB * 2}print("hello world")\n\n\n' + expected = self.sort_simple[1] + ( + f"\n\n" "if 2:\n" "\n" f"{TAB * 1}rule b:\n" @@ -1776,127 +1778,121 @@ def test_sorting_of_params(self): ) assert formatter.get_formatted() == expected + sorting_comprehensive = ( + "rule all:\n" + f"{TAB}params: p=1\n" + f"{TAB}resources: mem_mb=100\n" + f"{TAB}threads: 4\n" + f"{TAB}conda: 'env.yaml'\n" + f"{TAB}message: 'finishing'\n" + f"{TAB}log: 'log.txt'\n" + f"{TAB}output: 'out.txt'\n" + f"{TAB}# Important input\n" + f"{TAB}input: 'in.txt'\n" + f"{TAB}name: 'myrule'\n" + f"{TAB}shell: 'echo done'\n", + "rule all:\n" + f"{TAB}name:\n" + f'{TAB * 2}"myrule"\n' + f"{TAB}# Important input\n" + f"{TAB}input:\n" + f'{TAB * 2}"in.txt",\n' + f"{TAB}output:\n" + f'{TAB * 2}"out.txt",\n' + f"{TAB}log:\n" + f'{TAB * 2}"log.txt",\n' + f"{TAB}conda:\n" + f'{TAB * 2}"env.yaml"\n' + f"{TAB}threads: 4\n" + f"{TAB}resources:\n" + f"{TAB * 2}mem_mb=100,\n" + f"{TAB}params:\n" + f"{TAB * 2}p=1,\n" + f"{TAB}message:\n" + f'{TAB * 2}"finishing"\n' + f"{TAB}shell:\n" + f'{TAB * 2}"echo done"\n', + ) + def test_sorting_comprehensive(self): - snakecode = ( - "rule all:\n" - f"{TAB}shell: 'echo done'\n" - f"{TAB}params: p=1\n" - f"{TAB}resources: mem_mb=100\n" - f"{TAB}threads: 4\n" - f"{TAB}conda: 'env.yaml'\n" - f"{TAB}message: 'finishing'\n" - f"{TAB}log: 'log.txt'\n" - f"{TAB}output: 'out.txt'\n" - f"{TAB}# Important input\n" - f"{TAB}input: 'in.txt'\n" - f"{TAB}name: 'myrule'\n" - ) - formatter = setup_formatter(snakecode, sort_params=True) - expected = ( - "rule all:\n" - f"{TAB}name:\n" - f'{TAB*2}"myrule"\n' - f"{TAB}# Important input\n" - f"{TAB}input:\n" - f'{TAB*2}"in.txt",\n' - f"{TAB}output:\n" - f'{TAB*2}"out.txt",\n' - f"{TAB}log:\n" - f'{TAB*2}"log.txt",\n' - f"{TAB}conda:\n" - f'{TAB*2}"env.yaml"\n' - f"{TAB}threads: 4\n" - f"{TAB}resources:\n" - f"{TAB*2}mem_mb=100,\n" - f"{TAB}params:\n" - 
f"{TAB*2}p=1,\n" - f"{TAB}message:\n" - f'{TAB*2}"finishing"\n' - f"{TAB}shell:\n" - f'{TAB*2}"echo done"\n' - ) - assert formatter.get_formatted() == expected + formatter = setup_formatter(self.sorting_comprehensive[0], sort_params=True) + assert formatter.get_formatted() == self.sorting_comprehensive[1] + + sort_with_comments = ( + "rule complex:\n" + f"{TAB}# Action comment\n" + f"{TAB}shell: 'do something'\n" + f"{TAB}# Resource comment\n" + f"{TAB}resources: res=1\n" + f"{TAB}# Input comment\n" + f"{TAB}input: 'i'\n", + "rule complex:\n" + f"{TAB}# Input comment\n" + f"{TAB}input:\n" + f'{TAB * 2}"i",\n' + f"{TAB}# Resource comment\n" + f"{TAB}resources:\n" + f"{TAB * 2}res=1,\n" + f"{TAB}# Action comment\n" + f"{TAB}shell:\n" + f'{TAB * 2}"do something"\n', + ) def test_sorting_with_comments_preservation(self): - snakecode = ( - "rule complex:\n" - f"{TAB}# Action comment\n" - f"{TAB}shell: 'do something'\n" - f"{TAB}# Resource comment\n" - f"{TAB}resources: res=1\n" - f"{TAB}# Input comment\n" - f"{TAB}input: 'i'\n" - ) - formatter = setup_formatter(snakecode, sort_params=True) - # Comments stay with their keywords - expected = ( - "rule complex:\n" - f"{TAB}# Input comment\n" - f"{TAB}input:\n" - f'{TAB*2}"i",\n' - f"{TAB}# Resource comment\n" - f"{TAB}resources:\n" - f"{TAB*2}res=1,\n" - f"{TAB}# Action comment\n" - f"{TAB}shell:\n" - f'{TAB*2}"do something"\n' - ) - actual = formatter.get_formatted() - assert actual == expected + """Comments stay with their keywords""" + formatter = setup_formatter(self.sort_with_comments[0], sort_params=True) + assert formatter.get_formatted() == self.sort_with_comments[1] + + sort_inline_comments = ( + "rule inline_comments:\n" + f"{TAB}shell: 'echo'\n" + f"{TAB}params:\n" + f"{TAB * 2}p=1, # parameter comment\n" + f"{TAB}input: 'i'\n", + "rule inline_comments:\n" + f"{TAB}input:\n" + f'{TAB * 2}"i",\n' + f"{TAB}params:\n" + f"{TAB * 2}p=1, # parameter comment\n" + f"{TAB}shell:\n" + f'{TAB * 2}"echo"\n', + ) def 
test_sorting_with_inline_parameter_comments(self): - snakecode = ( - "rule inline_comments:\n" - f"{TAB}shell: 'echo'\n" - f"{TAB}params:\n" - f"{TAB*2}p=1, # parameter comment\n" - f"{TAB}input: 'i'\n" - ) - formatter = setup_formatter(snakecode, sort_params=True) - expected = ( - "rule inline_comments:\n" - f"{TAB}input:\n" - f'{TAB*2}"i",\n' - f"{TAB}params:\n" - f"{TAB*2}p=1, # parameter comment\n" - f"{TAB}shell:\n" - f'{TAB*2}"echo"\n' - ) - actual = formatter.get_formatted() - assert actual == expected + formatter = setup_formatter(self.sort_inline_comments[0], sort_params=True) + assert formatter.get_formatted() == self.sort_inline_comments[1] + + sort_module = ( + "module other:\n" + f"{TAB}meta_wrapper: 'wrapper'\n" + f"{TAB}replace_prefix: 'rp'\n" + f"{TAB}prefix: 'p'\n" + f"{TAB}skip_validation: True\n" + f"{TAB}config: 'c'\n" + f"{TAB}snakefile: 's'\n" + f"{TAB}pathvars: ['pv']\n" + f"{TAB}name: 'n'\n", + "module other:\n" + f'{TAB}name: "n"\n' + f"{TAB}pathvars:\n" + f'{TAB * 2}["pv"],\n' + f"{TAB}snakefile:\n" + f'{TAB * 2}"s"\n' + f"{TAB}config:\n" + f'{TAB * 2}"c"\n' + f"{TAB}skip_validation:\n" + f"{TAB * 2}True\n" + f"{TAB}prefix:\n" + f'{TAB * 2}"p"\n' + f"{TAB}replace_prefix:\n" + f'{TAB * 2}"rp"\n' + f"{TAB}meta_wrapper:\n" + f'{TAB * 2}"wrapper"\n', + ) def test_sorting_module(self): - snakecode = ( - "module other:\n" - f"{TAB}meta_wrapper: 'wrapper'\n" - f"{TAB}replace_prefix: 'rp'\n" - f"{TAB}prefix: 'p'\n" - f"{TAB}skip_validation: True\n" - f"{TAB}config: 'c'\n" - f"{TAB}snakefile: 's'\n" - f"{TAB}pathvars: ['pv']\n" - f"{TAB}name: 'n'\n" - ) - formatter = setup_formatter(snakecode, sort_params=True) - expected = ( - "module other:\n" - f'{TAB}name: "n"\n' - f"{TAB}pathvars:\n" - f'{TAB*2}["pv"],\n' - f"{TAB}snakefile:\n" - f'{TAB*2}"s"\n' - f"{TAB}config:\n" - f'{TAB*2}"c"\n' - f"{TAB}skip_validation:\n" - f"{TAB*2}True\n" - f"{TAB}prefix:\n" - f'{TAB*2}"p"\n' - f"{TAB}replace_prefix:\n" - f'{TAB*2}"rp"\n' - f"{TAB}meta_wrapper:\n" - 
f'{TAB*2}"wrapper"\n' - ) - assert formatter.get_formatted() == expected + formatter = setup_formatter(self.sort_module[0], sort_params=True) + assert formatter.get_formatted() == self.sort_module[1] def test_sorting_checkpoint(self): snakecode = ( @@ -1909,11 +1905,11 @@ def test_sorting_checkpoint(self): expected = ( "checkpoint map_reads:\n" f"{TAB}input:\n" - f'{TAB*2}"in.txt",\n' + f'{TAB * 2}"in.txt",\n' f"{TAB}output:\n" - f'{TAB*2}"out.txt",\n' + f'{TAB * 2}"out.txt",\n' f"{TAB}shell:\n" - f'{TAB*2}"echo"\n' + f'{TAB * 2}"echo"\n' ) assert formatter.get_formatted() == expected @@ -2004,11 +2000,12 @@ def side_effect(*args, **kwargs): formatter.snakefile = smk formatter.black_mode = black.Mode() formatter.from_python = False + formatter.fmt_off = None from snakefmt.parser.parser import Context from snakefmt.parser.syntax import KeywordSyntax formatter.context = Context( - None, KeywordSyntax("Global", keyword_indent=0, accepts_py=True) + None, KeywordSyntax("Global", keyword_indent=0, accepts_py=True) # type: ignore ) # Manually set last_token to something that isn't DEDENT/ENDMARKER formatter.last_token = tokenize.TokenInfo( @@ -2091,3 +2088,789 @@ def test_index_of_first_docstring_match(): from snakefmt.formatter import index_of_first_docstring assert index_of_first_docstring('"""docstring"""') == 14 + + +class TestFmtOffOn: + """Tests for # fmt: off / # fmt: on directives.""" + + def test_fmt_off_at_start(self): + for code, formatted in ( + TestSimpleParamFormatting.example_shell_newline, + TestSimpleParamFormatting.example_params_newline, + TestSimpleParamFormatting.example_input_threads_newline, + ): + expected = "# fmt: off\n" + code + assert setup_formatter(expected).get_formatted() == expected + + def test_fmt_off_at_middle(self): + for code, formatted in ( + TestSimpleParamFormatting.example_shell_newline, + TestSimpleParamFormatting.example_params_newline, + TestSimpleParamFormatting.example_input_threads_newline, + ): + # baseline + code1 = code + 
"\n\n\n# fmtoff\n" + code + expected = formatted.strip() + "\n\n\n# fmtoff\n" + formatted + assert setup_formatter(code1).get_formatted() == expected + + # before `# fmt: off`, new lines are added as usual + code1 = code + "\n\n\n# fmt: off\n" + code + expected = formatted.strip() + "\n\n\n# fmt: off\n" + code + assert setup_formatter(code1).get_formatted() == expected + + def test_fmt_off_on(self): + for code, formatted in ( + TestSimpleParamFormatting.example_shell_newline, + TestSimpleParamFormatting.example_params_newline, + TestSimpleParamFormatting.example_input_threads_newline, + ): + # baseline + code1 = "\n# fmton\n" + code + expected = "# fmton\n" + formatted + assert setup_formatter(code1).get_formatted() == expected + + # before `# fmt: on`, empty lines are removed as usual + code1 = "\n\n# fmt: on\n" + code + expected = "# fmt: on\n" + formatted + assert setup_formatter(code1).get_formatted() == expected + + # also assert in `test_fmt_off_sort` + code1 = code + "\n\n# fmt: on\n" + code + expected = formatted + "\n\n# fmt: on\n" + formatted + assert setup_formatter(code1).get_formatted() == expected + + # fmt on can enable formatting after fmt off + code1 = "\n# fmt: off\n" + code + "\n# fmt: on\n" + code + expected = "# fmt: off\n" + code + "\n# fmt: on\n" + formatted + assert setup_formatter(code1).get_formatted() == expected + + def test_fmt_off_not_on(self): + """` + - `# fmt: on` at a deeper indentation level than `# fmt: off` has no effect + - `# fmt: off` keeps the rest of the code unformatted until a same-indent + `# fmt: on` found + """ + for code, formatted in ( + TestSimpleParamFormatting.example_shell_newline, + TestSimpleParamFormatting.example_params_newline, + TestSimpleParamFormatting.example_input_threads_newline, + ): + expected = ( + "# fmt: off\n" + + code + + "\nif 1:\n a=1\n # fmt: on\n b=2\n" + + code + ) + assert setup_formatter(expected).get_formatted() == expected + + def test_fmt_off_on_in_run(self): + """# fmt: off inside 
Python code is handled by Black.""" + code = ( + "# ?\n" + "x = [1,2,3]\n" + "# fmt: off\n" + "y = [ 1, 2]\n" + "s = f'''\n" + " {y} \n" + " '''\n" + "# fmt: on\n" + "z = [4,5,6]\n" + ) + expected = ( + "# ?\n" + "x = [1, 2, 3]\n" + "# fmt: off\n" + "y = [ 1, 2]\n" + "s = f'''\n" + " {y} \n" + " '''\n" + "# fmt: on\n" + "z = [4, 5, 6]\n" + ) + assert setup_formatter(code).get_formatted() == expected + bad_indent = " " + snakecode = "rule:\n" " run:\n" + ( + "".join(f"{bad_indent}{i}\n" for i in code.splitlines()) + ) + snakexpected = "rule:\n" f"{TAB * 1}run:\n" + ( + f"{TAB * 2}# ?\n" + f"{TAB * 2}x = [1, 2, 3]\n" + f"{TAB * 2}# fmt: off\n" + f"{TAB * 2}y = [ 1, 2]\n" + f"{TAB * 2}s = f'''\n" + f"{bad_indent} {{y}} \n" + f"{bad_indent} '''\n" + f"{TAB * 2}# fmt: on\n" + f"{TAB * 2}z = [4, 5, 6]\n" + ) + assert setup_formatter(snakecode).get_formatted() == snakexpected + + def test_fmt_off_on_in_run_complex(self): + code, formatted = TestSimpleParamFormatting.example_shell_newline + formatter = setup_formatter( + f"rule:\n" + f"{TAB * 1}run:\n" + f"{TAB * 2}# fmt: off\n" + f"{TAB * 2}x = [ 1,2,3]\n" + f"{TAB * 2}# fmt: on\n\n" + f"sth=1\n" + f"{code}" + ) + expected = ( + "rule:\n" + f"{TAB * 1}run:\n" + f"{TAB * 2}# fmt: off\n" + f"{TAB * 2}x = [ 1,2,3]\n" + f"{TAB * 2}# fmt: on\n\n\n" + f"sth = 1\n\n\n" + f"{formatted}" + ) + assert formatter.get_formatted() == expected + formatter = setup_formatter( + f"rule:\n" + f"{TAB * 1}run:\n" + f"{TAB * 2}# fmt: off\n" + f"{TAB * 2}x = [ 1,2,3]\n\n" + f"sth=1\n" + f"{code}" + ) + expected = ( + "rule:\n" + f"{TAB * 1}run:\n" + f"{TAB * 2}# fmt: off\n" + f"{TAB * 2}x = [ 1,2,3]\n\n\n" + f"sth = 1\n\n\n" + f"{formatted}" + ) + assert formatter.get_formatted() == expected + + def test_fmt_off_on_in_rule(self): + code, formatted = TestSimpleParamFormatting.example_shell_newline + formatter = setup_formatter( + f"rule:\n" + f"{TAB * 1}# fmt: off\n" + f"{TAB * 1}run:\n" + f"{TAB * 2}x = [ 1,2,3]\n" + f"sth=1\n" + f"{code}" + ) 
+ expected = ( + "rule:\n" + f"{TAB * 1}# fmt: off\n" + f"{TAB * 1}run:\n" + f"{TAB * 2}x = [ 1,2,3]\n\n\n" + f"sth = 1\n\n\n" + f"{formatted}" + ) + assert formatter.get_formatted() == expected + formatter = setup_formatter( + f"rule:\n" + f"{TAB * 1}message: 'finishing'\n" + f"{TAB * 1}# Important input\n" + f"{TAB * 1}input: 'in.txt'\n" + f"{TAB * 1}# fmt: off\n" + f"{TAB * 1}log: 'log.txt'\n" + f"{TAB * 1}name: 'myrule'\n" + f"{TAB * 1}# fmt: on\n" + f"{TAB * 1}output: 'out.txt'\n" + f"{TAB * 1}run:\n" + f"{TAB * 2}# fmt: off\n" + f"{TAB * 2}x = [ 1,2,3]\n\n" + f"sth=1\n" + f"{code}" + ) + expected = ( + "rule:\n" + f"{TAB}message:\n" + f'{TAB * 2}"finishing"\n' + f"{TAB}# Important input\n" + f"{TAB}input:\n" + f'{TAB * 2}"in.txt",\n' + f"{TAB}# fmt: off\n" + f"{TAB}log: 'log.txt'\n" + f"{TAB}name: 'myrule'\n" + f"{TAB}# fmt: on\n" + f"{TAB}output:\n" + f'{TAB * 2}"out.txt",\n' + f"{TAB}run:\n" + f"{TAB * 2}# fmt: off\n" + f"{TAB * 2}x = [ 1,2,3]\n\n\n" + f"sth = 1\n\n\n" + f"{formatted}" + ) + assert formatter.get_formatted() == expected + + def test_fmt_off_on_in_other(self): + formatter = setup_formatter( + "module a: \n" + f'{TAB * 1}snakefile: "other.smk"\n' + f"{TAB * 1}# fmt: off\n" + f"{TAB * 1}config: config\n" + f'{TAB * 1}prefix: "testmodule"\n' + f"{TAB * 1}# fmt: on\n" + f'{TAB * 1}replace_prefix: {{"results/": "results/testmodule/"}}\n' + f'{TAB * 1}meta_wrapper: "0.72.0/meta/bio/bwa_mapping"\n' + ) + expected = ( + "module a:\n" + f"{TAB * 1}snakefile:\n" + f'{TAB * 2}"other.smk"\n' + f"{TAB * 1}# fmt: off\n" + f"{TAB * 1}config: config\n" + f'{TAB * 1}prefix: "testmodule"\n' + f"{TAB * 1}# fmt: on\n" + f"{TAB * 1}replace_prefix:\n" + f'{TAB * 2}{{"results/": "results/testmodule/"}}\n' + f"{TAB * 1}meta_wrapper:\n" + f'{TAB * 2}"0.72.0/meta/bio/bwa_mapping"\n' + ) + assert formatter.get_formatted() == expected + + def test_fmt_off_lagging_comments(self): + expected = "if 1:\n" f"{TAB * 1}lagging_comments\n" "\n" f"{TAB * 1}# fmtany\n" + assert 
setup_formatter(expected).get_formatted() == expected + expected = ( + "if 1:\n" + f"{TAB * 1}lagging_comments\n" + "\n" + f"{TAB * 1}# fmt: off\n" + f"{TAB * 1}rule a:\n" + f'{TAB * 2}input: "sth"\n' + f'{TAB * 2}name: "sth"\n' + f"{TAB * 1}# fmt: on\n" + ) + assert setup_formatter(expected).get_formatted() == expected + expected = ( + "if 1:\n" + f"{TAB * 1}# lagging_comments\n" + f"{TAB * 1}# fmt: off\n" + f"{TAB * 1}rule a:\n" + f'{TAB * 2}input: "sth"\n' + f'{TAB * 2}name: "sth"\n' + f"{TAB * 1}# fmt: on\n" + ) + assert setup_formatter(expected).get_formatted() == expected + + def test_fmt_skip_in_python(self): + formatter = setup_formatter( + "if 1:\n" + f"{TAB}x = [ 1,2,3] # fmt: skip\n" + f"{TAB}sth=1 # comment no skip\n" + f"{TAB}y = [4,5,6]" + ) + expected = ( + "if 1:\n" + f"{TAB}x = [ 1,2,3] # fmt: skip\n" + f"{TAB}sth = 1 # comment no skip\n" + f"{TAB}y = [4, 5, 6]\n" + ) + assert formatter.get_formatted() == expected + + def test_fmt_skip_in_directive(self): + formatter = setup_formatter( + "rule a:\n" + " params:\n" + " x = [ 1,2,3] # fmt: skip\n" + " input: a= 'sth' # fmt: skip\n" + ) + expected = ( + "rule a:\n" + f"{TAB}params:\n" + f"{TAB * 2}x=[1, 2, 3], # fmt: skip\n" + f"{TAB}input:\n" + f'{TAB * 2}a="sth", # fmt: skip\n' + ) + # TODO: currently `# fmt: skip` in directives is not supported + assert formatter.get_formatted() # == expected + assert expected + + +class TestFmtOffSort: + def test_fmt_off_sort(self): + for code, formatted in ( + TestSortFormatting.sorting_comprehensive, + TestSortFormatting.sort_with_comments, + TestSortFormatting.sort_inline_comments, + TestSortFormatting.sort_module, + ): + # baseline: `# fmt: on` without a preceding `# fmt: off*` is a no-op + # and acts as a normal comment + code1 = code + "\n\n# fmt: on\n" + code + expected = formatted + "\n\n# fmt: on\n" + formatted + assert setup_formatter(code1, sort_params=True).get_formatted() == expected + + # `# fmt: off[sort]` disables sorting for the rest of the rule +
code1 = "# fmt: off[sort]\n" + code + expected = "# fmt: off[sort]\n" + setup_formatter(code).get_formatted() + assert setup_formatter(code1, sort_params=True).get_formatted() == expected + + # `# fmt: on[sort]` re-enables sorting after `# fmt: off[sort]` + code2 = code1 + "\n\n# fmt: on[sort]\n" + code + expected2 = expected + "\n\n# fmt: on[sort]\n" + formatted + assert setup_formatter(code2, sort_params=True).get_formatted() == expected2 + + # plain `# fmt: on` also re-enables sorting after `# fmt: off[sort]` + code2 = code1 + "\n\n# fmt: on\n" + code + expected2 = expected + "\n\n# fmt: on\n" + formatted + assert setup_formatter(code2, sort_params=True).get_formatted() == expected2 + + def test_fmt_off_sort_dedent(self): + """`# fmt: on` or `on[sort]` at a deeper indentation level than `off[sort]` + has no effect""" + code1, formatted1 = TestSortFormatting.sorting_comprehensive + formatted1 = setup_formatter(code1).get_formatted() + code2, formatted2 = TestSortFormatting.sort_with_comments + formatted2 = setup_formatter(code2).get_formatted() + code = ( + "# fmt: off[sort]\n" + "if 1:\n" + " # fmt: on\n" + + "".join(" " + i for i in code1.splitlines(keepends=True)).rstrip() + + "\n" + + code2.rstrip() + ) + expected = ( + "# fmt: off[sort]\n" + "if 1:\n" + "\n" + f"{TAB}# fmt: on\n" + + "".join(TAB + i for i in formatted1.splitlines(keepends=True)).rstrip() + + "\n" + "\n\n" + formatted2 + ) + assert setup_formatter(code, sort_params=True).get_formatted() == expected + + def test_fmt_off_sort_on_noeffect(self): + code1, formatted1 = TestSortFormatting.sorting_comprehensive + code2, formatted2 = TestSortFormatting.sort_with_comments + formatted2 = setup_formatter(code2).get_formatted() + code3, formatted3 = TestSortFormatting.sort_inline_comments + code = ( + code1.rstrip() + "\n" + "\n" + "if 1:\n" + " # fmt: off[sort]\n" + + "".join(" " + i for i in code2.splitlines(keepends=True)).rstrip() + + "\n" + "\n\n" + code3 + ) + expected = ( + formatted1 + "\n\n" + 
"if 1:\n" + "\n" + f"{TAB}# fmt: off[sort]\n" + + "".join(TAB + i for i in formatted2.splitlines(keepends=True)) + + "\n\n" + + formatted3 + ) + assert setup_formatter(code, sort_params=True).get_formatted() == expected + + def test_fmt_off_sort_nothing(self): + code1, formatted1 = TestSortFormatting.sorting_comprehensive + code3, formatted3 = TestSortFormatting.sort_inline_comments + code = code1.rstrip() + "\n" "\n" "if 1:\n" " pass\n" "\n\n" + code3 + expected = formatted1 + "\n\n" "if 1:\n" f"{TAB}pass\n" "\n\n" + formatted3 + assert setup_formatter(code, sort_params=True).get_formatted() == expected + + def test_fmt_off_sort_between_directive(self): + """ + if you turn off sorting around one directive half way through the rule, + you would sort the half above it and the half below it, + the directive(s) that is surrounded by `# fmt: off` remain + at the same index within the rule. + """ + code = ( + "rule all:\n" + f"{TAB}params: p=1\n" + f"{TAB}resources: mem_mb=100\n" + f"{TAB}threads: 4\n" + f"{TAB}conda: 'env.yaml'\n" + f"{TAB}message: 'finishing'\n" + f"{TAB}# fmt: off[sort]\n" + f"{TAB}log: 'log.txt'\n" + f"{TAB}output: 'out.txt'\n" + f"{TAB}# before fmt\n" + f"{TAB}# fmt: on[sort]\n" + f"{TAB}# Important input\n" + f"{TAB}input: 'in.txt'\n" + f"{TAB}name: 'myrule'\n" + f"{TAB}shell: 'echo done'\n" + ) + expected = ( + "rule all:\n" + f"{TAB}conda:\n" + f'{TAB * 2}"env.yaml"\n' + f"{TAB}threads: 4\n" + f"{TAB}resources:\n" + f"{TAB * 2}mem_mb=100,\n" + f"{TAB}params:\n" + f"{TAB * 2}p=1,\n" + f"{TAB}message:\n" + f'{TAB * 2}"finishing"\n' + f"{TAB}# fmt: off[sort]\n" + f"{TAB}log:\n" + f'{TAB * 2}"log.txt",\n' + f"{TAB}output:\n" + f'{TAB * 2}"out.txt",\n' + f"{TAB}# before fmt\n" + f"{TAB}# fmt: on[sort]\n" + f"{TAB}name:\n" + f'{TAB * 2}"myrule"\n' + f"{TAB}# Important input\n" + f"{TAB}input:\n" + f'{TAB * 2}"in.txt",\n' + f"{TAB}shell:\n" + f'{TAB * 2}"echo done"\n' + ) + assert setup_formatter(code, sort_params=True).get_formatted() == expected + + 
def test_fmt_off_sort_between_directive2(self): + """ + In this case, the `# fmt: on` is directly parsed from `Parser.process_keyword` + """ + code = ( + "rule all:\n" + " params: p=1\n" + " resources: mem_mb=100\n" + " threads: 4\n" + " conda: 'env.yaml'\n" + " message: 'finishing'\n" + " # fmt: off[sort]\n" + " log: 'log.txt'\n" + " output: 'out.txt'\n" + " # fmt: on[sort]\n" + " # Important input\n" + " input: 'in.txt'\n" + " name: 'myrule'\n" + " shell: 'echo done'\n" + ) + expected = ( + "rule all:\n" + f"{TAB}conda:\n" + f'{TAB * 2}"env.yaml"\n' + f"{TAB}threads: 4\n" + f"{TAB}resources:\n" + f"{TAB * 2}mem_mb=100,\n" + f"{TAB}params:\n" + f"{TAB * 2}p=1,\n" + f"{TAB}message:\n" + f'{TAB * 2}"finishing"\n' + f"{TAB}# fmt: off[sort]\n" + f"{TAB}log:\n" + f'{TAB * 2}"log.txt",\n' + f"{TAB}output:\n" + f'{TAB * 2}"out.txt",\n' + f"{TAB}# fmt: on[sort]\n" + f"{TAB}name:\n" + f'{TAB * 2}"myrule"\n' + f"{TAB}# Important input\n" + f"{TAB}input:\n" + f'{TAB * 2}"in.txt",\n' + f"{TAB}shell:\n" + f'{TAB * 2}"echo done"\n' + ) + assert setup_formatter(code, sort_params=True).get_formatted() == expected + + def test_fmt_off_sort_between_directive_empty(self): + code = ( + "rule all:\n" + f"{TAB}params: p=1\n" + f"{TAB}resources: mem_mb=100\n" + f"{TAB}threads: 4\n" + f"{TAB}conda: 'env.yaml'\n" + f"{TAB}message: 'finishing'\n" + f"{TAB}# fmt: off[sort]\n" + f"{TAB}# fmt: on\n" + f"{TAB}# Important input\n" + f"{TAB}input: 'in.txt'\n" + f"{TAB}name: 'myrule'\n" + f"{TAB}shell: 'echo done'\n" + ) + expected = ( + "rule all:\n" + f"{TAB}conda:\n" + f'{TAB * 2}"env.yaml"\n' + f"{TAB}threads: 4\n" + f"{TAB}resources:\n" + f"{TAB * 2}mem_mb=100,\n" + f"{TAB}params:\n" + f"{TAB * 2}p=1,\n" + f"{TAB}message:\n" + f'{TAB * 2}"finishing"\n' + f"{TAB}# fmt: off[sort]\n" + f"{TAB}# fmt: on\n" + f"{TAB}name:\n" + f'{TAB * 2}"myrule"\n' + f"{TAB}# Important input\n" + f"{TAB}input:\n" + f'{TAB * 2}"in.txt",\n' + f"{TAB}shell:\n" + f'{TAB * 2}"echo done"\n' + ) + assert 
setup_formatter(code, sort_params=True).get_formatted() == expected + + +class TestFmtOffNext: + def test_fmt_off_next(self): + for code, formatted in ( + TestSimpleParamFormatting.example_shell_newline, + TestSimpleParamFormatting.example_params_newline, + TestSimpleParamFormatting.example_input_threads_newline, + ): + code1 = "\n\n# fmt: off[next]\n" + code + "\n" + code + expected = "# fmt: off[next]\n" + code.strip("\n") + "\n\n\n" + formatted + assert setup_formatter(code1).get_formatted() == expected + code1 = code.rstrip() + "\n\n# fmtnext\n" + "\n\n\n" + code + expected = ( + formatted.rstrip() + "\n\n\n" + "# fmtnext\n" + "\n\n" + formatted + ) + assert setup_formatter(code1).get_formatted() == expected + code1 = code.rstrip() + "\n\n# fmt: off[next]\n" + code + "\n\n\n" + code + expected = ( + formatted.rstrip() + + "\n\n\n" + + "# fmt: off[next]\n" + + code.strip("\n") + + "\n\n\n" + + formatted + ) + assert setup_formatter(code1).get_formatted() == expected + code1 = code + "\n# fmt: off[next]\n" + code + expected = formatted + "\n\n# fmt: off[next]\n" + code + assert setup_formatter(code1).get_formatted() == expected + code1 = code + "\n# fmt: off[next]\n" + code + "\n\n" + expected = formatted + "\n\n# fmt: off[next]\n" + code.rstrip("\n") + "\n" + assert setup_formatter(code1).get_formatted() == expected + + def test_rule_if_rule(self): + code1, format1 = TestSimpleParamFormatting.example_shell_newline + code2, format2 = TestSimpleParamFormatting.example_params_newline + code3, format3 = TestSimpleParamFormatting.example_input_threads_newline + formatter = setup_formatter( + code1 + "\n" + "if 1:\n" + "".join(" " + i for i in code2.splitlines(keepends=True)) + "\n" + f"{code3}" + ) + expected = ( + format1 + + "\n\n" + + "if 1:\n\n" + + "".join(f"{TAB * 1}" + i for i in format2.splitlines(keepends=True)) + + "\n\n" + + format3 + ) + assert formatter.get_formatted() == expected + + def test_rule_if2_rule(self): + code1, format1 = 
TestSimpleParamFormatting.example_shell_newline + code2, format2 = TestSimpleParamFormatting.example_params_newline + code3, format3 = TestSimpleParamFormatting.example_input_threads_newline + formatter = setup_formatter( + code1 + "\n" + "if 1:\n" + " if 2:\n" + + "".join(" " + i for i in code2.splitlines(keepends=True)) + + "\n" + + "".join(" " + i for i in code2.splitlines(keepends=True)) + + "\n" + f"{code3}" + ) + expected = ( + format1 + + "\n\n" + + "if 1:\n" + + f"{TAB * 1}if 2:\n\n" + + "".join(f"{TAB * 2}" + i for i in format2.splitlines(keepends=True)) + + "\n" + + "".join(f"{TAB * 1}" + i for i in format2.splitlines(keepends=True)) + + "\n\n" + + format3 + ) + assert formatter.get_formatted() == expected + formatter = setup_formatter( + code1 + "\n" + "if 1:\n" + " if 2:\n" + + "".join(" " + i for i in code2.splitlines(keepends=True)).rstrip("\n") + + "\n" + " # fmt: off[next]\n" + + "".join(" " + i for i in code2.splitlines(keepends=True)) + + "\n" + + code3 + ) + expected = ( + format1 + + "\n\n" + + "if 1:\n" + + f"{TAB * 1}if 2:\n\n" + + "".join( + f"{TAB * 2}" + i for i in format2.splitlines(keepends=True) + ).rstrip("\n") + + "\n" + f"{TAB * 1}# fmt: off[next]\n" + + "".join(f"{TAB * 1}" + i for i in code2.splitlines(keepends=True)) + + "\n" + + "\n\n" + + format3 + ) + assert formatter.get_formatted() == expected + formatter = setup_formatter( + code1 + "\n" + "if 1:\n" + " if 2:\n" + " sth\n" + + "\n" + + "".join(" " + i for i in code2.splitlines(keepends=True)).rstrip("\n") + + "\n" + f"{code3}" + ) + expected = ( + format1 + "\n\n" + "if 1:\n" + f"{TAB * 1}if 2:\n" + f"{TAB * 2}sth\n" + + "\n" + + "".join(f"{TAB * 1}" + i for i in format2.splitlines(keepends=True)) + + "\n\n" + + format3 + ) + assert formatter.get_formatted() == expected + formatter = setup_formatter( + code1 + "\n" + "if 1:\n" + " if 2:\n" + " # fmt: off[next]\n" + + "".join(" " + i for i in code2.splitlines(keepends=True)) + + "\n" + + "".join(" " + i for i in 
code2.splitlines(keepends=True)).rstrip("\n") + + "\n" + f"{code3}" + ) + expected = ( + format1 + "\n\n" + "if 1:\n" + f"{TAB * 1}if 2:\n" + f"{TAB * 2}# fmt: off[next]\n" + + "".join(f"{TAB * 2}" + i for i in code2.splitlines(keepends=True)).rstrip( + "\n" + ) + + "\n\n" + + "".join(f"{TAB * 1}" + i for i in format2.splitlines(keepends=True)) + + "\n\n" + + format3 + ) + assert formatter.get_formatted() == expected + + def test_fmt_off_next_in_if(self): + code1, format1 = TestSimpleParamFormatting.example_shell_newline + code2, format2 = TestSimpleParamFormatting.example_params_newline + code3, format3 = TestSimpleParamFormatting.example_input_threads_newline + formatter = setup_formatter( + code1 + "\n# fmt: \n" + "if 1:\n" + + "".join(" " + i for i in code2.splitlines(keepends=True)) + + f"\n" + f"{code3}" + ) + expected = ( + format1 + + "\n\n# fmt:\n" + + "if 1:\n\n" + + "".join(f"{TAB * 1}" + i for i in format2.splitlines(keepends=True)) + + "\n" + + "\n" + + format3 + ) + assert formatter.get_formatted() == expected + formatter = setup_formatter( + code1.rstrip("\n") + "\n# fmt: off[next]\n" + "if 1:\n" + + "".join(" " + i for i in code2.splitlines(keepends=True)) + + "\n" + + code3 + ) + expected = ( + format1 + "\n\n# fmt: off[next]\n" + "if 1:\n" + + "".join(" " + i for i in code2.splitlines(keepends=True)).rstrip("\n") + + "\n\n\n" + + format3 + ) + assert formatter.get_formatted() == expected + + def test_fmt_off_next_in_2if(self): + code1, format1 = TestSimpleParamFormatting.example_shell_newline + code2, format2 = TestSimpleParamFormatting.example_params_newline + code3, format3 = TestSimpleParamFormatting.example_input_threads_newline + formatter = setup_formatter( + code1.rstrip("\n") + "\n" + "if 1:\n" + " \n# fmt:\n" + + "".join(" " + i for i in code2.splitlines(keepends=True)) + + "\n" + + "".join(" " + i for i in code3.splitlines(keepends=True)) + ) + expected = ( + format1.rstrip("\n") + "\n" + "\n\n" + "if 1:\n\n" + f"{TAB * 1}# fmt:\n" + + 
"".join(f"{TAB * 1}" + i for i in format2.splitlines(keepends=True)) + + "\n" + + "".join(f"{TAB * 1}" + i for i in format3.splitlines(keepends=True)) + ) + assert formatter.get_formatted() == expected + formatter = setup_formatter( + code1.rstrip("\n") + "\n" + "if 1:\n" + "\n # fmt: off[next]\n" + + "".join(" " + i for i in code2.splitlines(keepends=True)) + + "\n" + + "".join(" " + i for i in code3.splitlines(keepends=True)) + ) + expected = ( + format1.rstrip("\n") + "\n" + "\n\n" + "if 1:\n" + f"{TAB * 1}# fmt: off[next]\n" + + "".join(f"{TAB * 1}" + i for i in code2.splitlines(keepends=True)).strip( + "\n" + ) + + "\n\n" + + "".join(f"{TAB * 1}" + i for i in format3.splitlines(keepends=True)) + ) + assert formatter.get_formatted() == expected + + def test_fmt_off_2(self): + formatter = setup_formatter( + "if 1:\n" + " rule a:\n" + ' input: "foo"\n' + " # fmt: off[next]\n" + " rule b:\n" + ' input: "bar"\n' + "\n" + " # fmt: off[next]\n" + " rule c:\n" + ' input: "baz"\n' + "rule d:\n" + ' input: "qux"\n' + ) + assert formatter.get_formatted() == ( + f"if 1:\n" + f"\n" + f"{TAB}rule a:\n" + f"{TAB * 2}input:\n" + f'{TAB * 3}"foo",\n' + f"{TAB}# fmt: off[next]\n" + f"{TAB}rule b:\n" + f'{TAB} input: "bar"\n' + f"{TAB}# fmt: off[next]\n" + f"{TAB}rule c:\n" + f'{TAB} input: "baz"\n' + f"\n" + f"\n" + f"rule d:\n" + f"{TAB}input:\n" + f'{TAB * 2}"qux",\n' + ) diff --git a/uv.lock b/uv.lock index 0db0086..d659540 100644 --- a/uv.lock +++ b/uv.lock @@ -996,7 +996,7 @@ wheels = [ [[package]] name = "snakefmt" -version = "0.11.5" +version = "1.0.0" source = { editable = "." } dependencies = [ { name = "black" },