-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathx-cmds.py
More file actions
471 lines (370 loc) · 19.2 KB
/
x-cmds.py
File metadata and controls
471 lines (370 loc) · 19.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
#!/usr/bin/env python3
#[x-cmds]: UPDATE
"""Lists all Python files, executable as commands, in the current directory.
A short description and command arguments are displayed if available."""
from pathlib import Path
from typing import TypedDict, Optional, Literal, cast
from xulbux.base.types import ArgParseConfigs
from xulbux.console import Throbber
from xulbux.regex import LazyRegex
from xulbux import FormatCodes, Console, FileSys, String, System
import requests
import hashlib
import os
import re
"""
[1] WHICH FILES ARE CONSIDERED COMMANDS?
Only files that start with a Python shebang line (e.g. `#!/usr/bin/env python3`) are considered commands.
[2] WHICH FILES WILL BE CHECKED FOR UPDATES?
Only files that include the comment `#[x-cmds]: UPDATE` at the top of the file will be checked for updates from GitHub.
[3] COMMAND DESCRIPTION
The first multi-line comment (triple quotes) at the start of the file is used as a short description.
[4] COMMAND ARGUMENTS & OPTIONS
The use of `Console.get_args()` will automatically be parsed and displayed correctly.
When getting args using `sys.argv`, add a comment to describe the arguments on the line `sys.argv` is used.
The structure of the comment is similar to how the `**arg_parse_configs` kwargs are defined for `Console.get_args()`:
# [pos_arg1: before, arg2: {-a2, --arg2}, arg3: {-a3, --arg3}, pos_arg4: after]
"""
class GithubUpdatesConfig(TypedDict):
    """Schema for GitHub updates configuration."""
    # GitHub directory URLs whose Python files are offered as commands.
    github_repo_urls: list[str]
    # Whether commands that exist on GitHub but not locally are reported.
    # The deleted-commands check in `get_github_diffs()` is gated by this flag too.
    check_for_new_commands: bool
    # Whether local files carrying the update marker are compared with GitHub.
    check_for_command_updates: bool


class ScriptConfig(TypedDict):
    """Schema for the script configuration."""
    # Directory that is scanned for command files and receives downloads.
    command_dir: Path
    github_updates: GithubUpdatesConfig


# NOTE: The schema classes are defined BEFORE `CONFIG` on purpose. Module-level
# variable annotations are evaluated eagerly on Python < 3.14, so annotating with
# a class that is only defined further down raises `NameError` at import time.
CONFIG: ScriptConfig = {
    "command_dir": FileSys.script_dir,
    "github_updates": {
        "github_repo_urls": ["https://github.com/xulbux/python/tree/main/commands"],
        "check_for_new_commands": True,
        "check_for_command_updates": True,
    },
}
# Parsed command-line arguments; `update_check` is the `-u` / `--update` flag
# (read in `main()` via `ARGS.update_check.exists`).
ARGS = Console.get_args({"update_check": {"-u", "--update"}})
# All regular expressions used by this script.
# NOTE(review): `get_args` uses the recursive call `(?&brace)`, which the stdlib
# `re` module does not support — presumably `LazyRegex` compiles with the
# third-party `regex` engine; confirm.
PATTERNS = LazyRegex(
    # A line that starts (after optional whitespace) with `#!` and mentions "python".
    python_shebang=r"(?i)^\s*#!.*python",
    # A whole line of the form `#[x-cmds]: UPDATE` (whitespace-tolerant, case-insensitive).
    update_marker=r"(?i)^\s*#\s*\[x-cmds\]\s*:\s*UPDATE\s*$",
    # Skips leading `#` / `#!` lines, then captures the first triple-quoted string
    # (either quote style, quotes included) in group 1.
    desc=r"(?is)^(?:\s*#!?[^\n]+)*\s*(\"{3}(?:(?!\"\"\").)+\"{3}|'{3}(?:(?!''').)+'{3})",
    # `sys.argv`, optionally indexed/sliced; an optional trailing `# [...]` comment
    # is captured in group 1.
    sys_argv=r"(?m)sys\s*\.\s*argv(?:\[[-:0-9]+\])?(?:\s*#\s*(\[.+?\]))?",
    # One entry of an args-comment: name (group 1), optionally followed by
    # `: {flags}` (group 2) or `: before` / `: after` (group 3).
    args_comment=r"(\w+)(?:\s*:\s*(?:\{([^\}]*)\}|(before|after)))?",
    # A `Console.get_args(...)` call; the content of its dict literal is captured
    # in the named group `brace` (group 2), groups 1 and 3 are quote characters.
    # NOTE(review): `[^\1]` / `[^\3]` sit inside a character class, where `\1` is
    # likely NOT treated as a backreference — confirm the intended behavior.
    get_args=r"(?m)Console\s*\.\s*get_args\s*\(\s*(?:[\w]+\s*=\s*(['\"])[^\1]+\1\s*(?:,\s*)?)?(?:arg_parse_configs\s*=\s*)?\{(?P<brace>(?:[^{}\"']|\"(?:\\.|[^\"\\])*\"|'(?:\\.|[^'\\])*'|\{(?&brace)\})*)\}(?:\s*(?:,\s*)?(?:[\w]+\s*=\s*)?(['\"])[^\3]+\3)?\s*\)",
    # One `"key": value` pair: quote (group 1), key (group 2), rest of the line (group 3).
    arg=r"""\s*(['"])(\w+)\1\s*:\s*(.*)\s*,?""",
)
def is_python_file(filepath: str) -> bool:
    """Tell whether the first line of `filepath` is a Python shebang line.

    Any error while opening, decoding or matching counts as "not a Python file".
    """
    try:
        with open(filepath, "r", encoding="utf-8") as handle:
            first_line = handle.readline()
            shebang = PATTERNS.python_shebang.match(first_line)
    except Exception:
        return False
    return bool(shebang)
def get_python_files() -> set[str]:
    """Collect the names of all files in the command directory that start with a Python shebang."""
    return {
        entry.name
        for entry in CONFIG["command_dir"].iterdir()
        if entry.is_file() and is_python_file(str(entry))
    }
def get_xcmds_options(filepath: str) -> dict[str, bool]:
    """Read the `#[x-cmds]: …` option comments from the head of a file.

    Only the leading run of shebang / marker lines is inspected. Errors while
    reading are swallowed and whatever was collected so far is returned.
    """
    found: dict[str, bool] = {}
    try:
        with open(filepath, "r", encoding="utf-8") as handle:
            for header_line in handle:
                if PATTERNS.python_shebang.match(header_line):
                    continue  # SHEBANG LINES ARE SIMPLY SKIPPED
                if PATTERNS.update_marker.match(header_line):
                    found["update_check"] = True
                    continue
                break  # FIRST LINE THAT IS NEITHER SHEBANG NOR MARKER ENDS THE HEADER
    except Exception:
        pass
    return found
def sort_flags(flags: list[str]) -> list[str]:
    """Return a new list of flags ordered by number of leading dashes, then by the flag string itself."""

    def dash_rank(flag: str) -> tuple[int, str]:
        leading_dashes = len(flag) - len(flag.lstrip("-"))
        return leading_dashes, flag

    ordered = list(flags)
    ordered.sort(key=dash_rank)
    return ordered
def arguments_desc(arg_parse_configs: Optional[ArgParseConfigs]) -> str:
    """Build the formatted "Takes … Arguments" / "Has … Options" text for one command.

    Each config value is classified as either a positional argument (described by
    a single string) or an option (described by its sorted list of flags).
    If no configs are given, a generic "(unknown)" note is returned instead.
    """
    if not arg_parse_configs or len(arg_parse_configs) < 1:
        return f"\n\n[b](Takes Options/Arguments) [dim]([[i](unknown)])"
    # One entry per config, in config order: `str` = positional argument, `list` = option flags
    arg_descs: list[str | list[str]] = []
    keys = list(arg_parse_configs.keys())
    for key, val in arg_parse_configs.items():
        # NOTE(review): this check runs before the type checks, so it assumes
        # every config value supports `len()` — confirm.
        if len(val) < 1:
            arg_descs.append(f"non-flagged argument at position [b]({keys.index(key) + 1})")
        elif isinstance(val, str):
            if val.lower() == "before":
                arg_descs.append("all non flagged arguments [b](before) first flag")
            elif val.lower() == "after":
                arg_descs.append("all non flagged arguments [b](after) last flag's value")
            else:
                arg_descs.append(val)
        elif isinstance(val, dict) and "flags" in val.keys():
            arg_descs.append(sort_flags(list(val["flags"])))
        elif isinstance(val, (list, tuple, set, frozenset)):
            arg_descs.append(sort_flags(list(val)))
        else:
            arg_descs.append(repr(val))
    # Flags of each option joined into one display string
    opt_descs = ["[_c], [br:blue]".join(d) for d in arg_descs if isinstance(d, (list, tuple, set, frozenset))]
    # Pop the key of every option out of `keys`; `j` counts the keys already
    # removed, so `i - j` is the option's current index in the shrinking list.
    # Afterwards `keys` holds only the keys of the positional arguments.
    opt_keys = [keys.pop(i - j) for j, (i, _) in enumerate((i, d) for i, d in enumerate(arg_descs) if isinstance(d, (list, tuple, set, frozenset)))]
    arg_descs = [d for d in arg_descs if isinstance(d, str)]
    arg_keys = [f"<{keys[i]}>" for i, _ in enumerate(arg_descs)]
    # Width of the left column, measured on the text after `FormatCodes.remove()`
    left_part_len = max(len(FormatCodes.remove(x)) for x in opt_descs + arg_keys)
    # Extra characters the format codes add to each option string, so the
    # padding below can compensate for them
    opt_len_diff = [len(d) - len(FormatCodes.remove(d)) for d in opt_descs]
    opt_descs = [
        f"[br:blue]({d:<{left_part_len + opt_len_diff[i]}})"
        f" [blue]({FormatCodes.escape(f'[{opt_keys[i]}]')})"
        for i, d in enumerate(opt_descs)
    ]
    arg_descs = [
        f"[br:cyan]({arg_keys[i]:<{left_part_len}})"
        f" [cyan]({d})"
        for i, d in enumerate(arg_descs)
    ]
    # Arguments section first, then options section; empty sections are omitted
    return (
        (f"\n\n[b](Takes {len(arg_descs)} Argument{'' if len(arg_descs) == 1 else 's'}:)"
         f"\n {'\n '.join(cast(list[str], arg_descs))}") if len(arg_descs) > 0 else ""
    ) + (
        (f"\n\n[b](Has {len(opt_descs)} Option{'' if len(opt_descs) == 1 else 's'}:)"
         f"\n {'\n '.join(opt_descs)}") if len(opt_descs) > 0 else ""
    )
def parse_args_comment(comment_str: str) -> ArgParseConfigs:
    """Parse an args-describing comment into argument parse configs.

    The expected form is `[pos_arg1: before, arg2: {-a2, --arg2}, pos_arg4: after]`.
    Every name becomes a key; its value is `"before"` / `"after"` when given,
    otherwise the set of flags listed in braces (an empty set if there are none).

    Returns an empty dict when `comment_str` does not start with a bracketed list,
    instead of failing on the missing match.
    """
    result: ArgParseConfigs = {}
    bracketed = re.match(r"\[(.*)\]", comment_str)
    if bracketed is None:
        return result  # NOTHING TO PARSE
    for match in PATTERNS.args_comment.finditer(bracketed.group(1)):
        key = str(match.group(1))
        position = match.group(3)
        if position in {"before", "after"}:
            result[key] = cast(Literal["before", "after"], position)
            continue
        # DROP EMPTY FRAGMENTS (E.G. FROM `{-a,,--arg}` OR `{ }`) SO NO "" FLAG IS REPORTED
        raw_flags = match.group(2) or ""
        result[key] = {flag.strip() for flag in raw_flags.split(",") if flag.strip()}
    return result
def get_commands_str(python_files: set[str]) -> str:
    """Render the formatted overview of all command files.

    For every file (sorted by name) this emits a numbered title bar, the file's
    leading triple-quoted description if present, and a description of its
    arguments taken from a `Console.get_args()` call or, failing that, from
    comments next to `sys.argv` usages.
    """
    chunks: list[str] = []
    for number, filename in enumerate(sorted(python_files), 1):
        name = Path(filename).stem
        title_len = len(str(number)) + len(name) + 4
        chunks.append(
            f"\n[b|br:white|bg:br:white]([[black]{number}[br:white]][in|black]( {name} [bg:black]{'━' * (Console.w - title_len)}))"
        )

        with open(CONFIG["command_dir"] / filename, "r", encoding="utf-8") as source_file:
            source = source_file.read()

        desc_match = PATTERNS.desc.match(source)
        if desc_match:
            description = desc_match.group(1).strip("\n\"'")
            chunks.append(f"\n\n[i]{description}[_]")
        argv_comments = PATTERNS.sys_argv.findall(source)
        call_bodies = [groups[1] for groups in PATTERNS.get_args.findall(source) if groups[1]]

        configs: ArgParseConfigs = {}
        if call_bodies:
            try:
                # TAKE THE FIRST Console.get_args() CALL WHOSE DICT IS NOT BLANK
                if len(call_bodies) == 1:
                    body = call_bodies[0]
                else:
                    body = next((text for text in map(str.strip, call_bodies) if text), "")
                # TURN EVERY `"key": value` PAIR INTO A CONFIG ENTRY
                for pair in PATTERNS.arg.finditer(body):
                    key, val = pair.group(2), pair.group(3)
                    if key and val:
                        configs[key.strip()] = String.to_type(val.strip().rstrip(","))
                chunks.append(arguments_desc(configs))
            except Exception:
                pass
        elif argv_comments:
            # MERGE ALL BRACKETED COMMENTS FOUND NEXT TO sys.argv
            for raw_comment in argv_comments:
                comment = raw_comment.strip()
                if comment.startswith("["):
                    configs.update(parse_args_comment(comment))
            chunks.append(arguments_desc(configs))
        chunks.append("\n\n")
    return "".join(chunks)
class GitHubDiffs(TypedDict):
    """Differences between the local command directory and the GitHub sources,
    as produced by `get_github_diffs()`."""
    # Command names (file stems) that exist on GitHub but not locally.
    new_commands: list[str]
    # Command names whose local content hash differs from GitHub's blob SHA.
    updated_commands: list[str]
    # Command names with a local update marker that are missing on GitHub.
    deleted_commands: list[str]
    # Maps the GitHub filename of every new/updated command to its download URL.
    download_urls: dict[str, str]
    # True when none of the configured GitHub URLs could be fetched.
    fetch_failed: bool
def get_github_diffs(local_files: set[str]) -> GitHubDiffs:
    """Check for new, updated and deleted files on GitHub compared to the local command-directory.

    `local_files` are the filenames (inside the command directory) of all local commands.

    The returned dict lists command names (file stems) per category and the
    download URLs of everything new or updated. `fetch_failed` is set when not a
    single configured GitHub URL could be fetched.

    Deletions are only reported when EVERY configured URL was fetched
    successfully (and at least one URL is configured). Otherwise a command that
    only lives in an unreachable repo would wrongly be offered for deletion.

    Unexpected errors are swallowed; whatever was collected up to that point is
    returned.
    """
    result: GitHubDiffs = {
        "new_commands": [],
        "updated_commands": [],
        "deleted_commands": [],
        "download_urls": {},
        "fetch_failed": False,
    }
    try:
        updates_config = CONFIG["github_updates"]
        repo_urls = updates_config["github_repo_urls"]
        # MERGE FILES FROM ALL GITHUB REPO URLS
        github_files: dict[str, dict[str, str]] = {}
        successful_fetches = 0
        for repo_url in repo_urls:
            try:
                # PARSE THE URL TO EXTRACT REPO INFO
                url_pattern = re.match(r"https?://github\.com/([^/]+)/([^/]+)(?:/(?:tree|blob)/([^/]+)(/.*)?)?", repo_url)
                if not url_pattern:
                    continue
                user, repo, branch, path = url_pattern.groups()
                branch, path = branch or "main", (path or "").strip("/")
                # USE GITHUB API TO GET DIRECTORY CONTENTS
                api_url = f"https://api.github.com/repos/{user}/{repo}/contents/{path}?ref={branch}"
                response = requests.get(api_url, timeout=10)
                response.raise_for_status()
                listing = response.json()
                if not isinstance(listing, list):
                    raise ValueError(f"unexpected GitHub API response for {repo_url}")
                # COLLECT INTO A TEMPORARY DICT FIRST, SO A REPO THAT FAILS
                # HALFWAY DOES NOT LEAVE PARTIAL ENTRIES BEHIND
                repo_files = {
                    Path(item["name"]).stem: {
                        "filename": item["name"],
                        "download_url": item["download_url"],
                        "sha": item["sha"],
                    }
                    for item in listing
                    if item["type"] == "file" and item["name"].endswith((".py", ".pyw"))
                }
            except Exception:
                continue  # SKIP REPOS THAT CAN'T BE ACCESSED
            # MERGE FILES FROM THIS REPO (LATER URLS OVERRIDE EARLIER ONES IF SAME NAME)
            github_files.update(repo_files)
            successful_fetches += 1
        if successful_fetches == 0 and len(repo_urls) > 0:
            result["fetch_failed"] = True
            return result  # BAIL OUT TO PREVENT FALSE DELETIONS WHEN GITHUB API REQUESTS FAIL
        # THE GITHUB LISTING IS ONLY COMPLETE IF EVERY CONFIGURED URL WAS FETCHED
        all_sources_fetched = 0 < successful_fetches == len(repo_urls)
        # CREATE MAPPING FROM CMD NAME TO ACTUAL FILENAME FOR LOCAL FILES
        local_file_map = {Path(f).stem: f for f in local_files}
        # GET LOCAL FILES THAT HAVE UPDATE MARKER
        local_updateable_files: set[str] = {
            Path(filename).stem
            for filename in local_files
            if get_xcmds_options(str(CONFIG["command_dir"] / filename)).get("update_check")
        }
        # CHECK FOR NEW FILES
        if updates_config["check_for_new_commands"]:
            for cmd_name, remote in github_files.items():
                if cmd_name not in local_file_map:
                    result["new_commands"].append(cmd_name)
                    result["download_urls"][remote["filename"]] = remote["download_url"]
        # CHECK FOR UPDATED FILES (ONLY THOSE WITH UPDATE MARKER)
        if updates_config["check_for_command_updates"]:
            for cmd_name in local_updateable_files:
                remote = github_files.get(cmd_name)
                if remote is None:
                    continue
                try:
                    local_path = CONFIG["command_dir"] / local_file_map[cmd_name]
                    # READ AS TEXT AND NORMALIZE TO LF (UNIX) LINE ENDINGS LIKE GITHUB
                    with open(local_path, "r", encoding="utf-8", newline="") as f:
                        local_content = f.read().replace("\r\n", "\n").replace("\r", "\n").encode("utf-8")
                    # GITHUB USES: "blob " + FILESIZE + "\0" + CONTENT THEN SHA1 HASH
                    local_sha = hashlib.sha1(f"blob {len(local_content)}\0".encode() + local_content).hexdigest()
                except Exception:
                    continue  # SKIP FILES THAT CAN'T BE COMPARED
                # COMPARE WITH GITHUB'S SHA
                if local_sha != remote["sha"]:
                    result["updated_commands"].append(cmd_name)
                    result["download_urls"][remote["filename"]] = remote["download_url"]
        # CHECK FOR DELETED FILES (LOCAL FILES WITH UPDATE MARKER NOT IN GITHUB)
        # ONLY TRUSTWORTHY WHEN THE GITHUB LISTING IS COMPLETE
        if updates_config["check_for_new_commands"] and all_sources_fetched:
            for cmd_name in local_updateable_files:
                if cmd_name not in github_files and cmd_name not in result["updated_commands"]:
                    result["deleted_commands"].append(cmd_name)
    except Exception:
        pass  # RETURN WHATEVER WAS COLLECTED SO FAR IF THE GITHUB CHECK FAILS
    return result
def github_diffs_str(github_diffs: GitHubDiffs) -> str:
    """Render the GitHub comparison result as formatted text.

    Produces either a fetch-failure notice, an "everything is up-to-date" note,
    or a title bar followed by one list per kind of change.
    """
    if github_diffs.get("fetch_failed", False):
        return (
            "[br:red]✗ Failed to fetch command updates from GitHub.\n"
            " [dim]Check your internet connection or configuration.[_]\n\n"
        )

    # (HEADING, COLOR, SINGULAR NOUN, COMMAND NAMES) PER KIND OF CHANGE
    sections = (
        ("New Commands:", "br:green", "new command", github_diffs["new_commands"]),
        ("Updated Commands:", "br:blue", "command update", github_diffs["updated_commands"]),
        ("Deleted Commands:", "br:red", "command deletion", github_diffs["deleted_commands"]),
    )
    total_changes = sum(len(names) for *_, names in sections)

    if total_changes == 0:
        if not CONFIG["github_updates"]["check_for_new_commands"]:
            return "[magenta](ⓘ [i](All your command-files are up-to-date.))\n\n"
        if CONFIG["github_updates"]["check_for_command_updates"]:
            suffix = " and they're all up-to-date"
        else:
            suffix = ""
        return f"[magenta](ⓘ [i](You have all available command-files{suffix}.))\n\n"

    # BUILD TITLE
    title_parts = [
        f"{len(names)} {noun}{'' if len(names) == 1 else 's'}"
        for _, _, noun, names in sections
        if names
    ]
    if len(title_parts) == 1:
        verb = "is" if total_changes == 1 else "are"
        title = f"There {verb} {title_parts[0]} available."
    elif len(title_parts) == 2:
        title = f"There are {' and '.join(title_parts)} available."
    else:
        first, second, third = title_parts
        title = f"There are {first}, {second}, and {third} available."

    title_len = len(title) + 5
    pieces = [
        f"[b|magenta|bg:magenta]([[black]⇣[magenta]][in|black]( {title} [bg:black]{'━' * (Console.w - title_len)}))"
    ]
    for heading, color, _, names in sections:
        if not names:
            continue
        listing = "\n ".join(f"[{color}]{cmd}[_]" for cmd in sorted(names))
        pieces.append(f"\n\n[b]({heading})\n " + listing)
    return "".join(pieces)
def download_files(github_diffs: GitHubDiffs) -> None:
    """Apply the GitHub changes after asking the user for confirmation.

    Downloads every new/updated file into the command directory, removes the
    files of deleted commands, and prints one status line per operation plus a
    final summary. Does nothing when there are no pending operations.
    """
    pending_downloads = github_diffs["download_urls"].items()
    pending_deletions = github_diffs["deleted_commands"]
    total_operations = len(pending_downloads) + len(pending_deletions)
    if not total_operations:
        return

    confirmed = Console.confirm("\n[b](Execute these updates?)", end="\n", default_is_yes=True)
    if not confirmed:
        FormatCodes.print(f"[dim|magenta](✗ Not updating commands from GitHub)\n\n")
        return

    completed = 0

    # DOWNLOAD NEW AND UPDATED FILES
    for filename, url in pending_downloads:
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            # KEEP THE EXTENSION ON WINDOWS, DROP IT EVERYWHERE ELSE
            cmd_name = Path(filename).stem
            destination = CONFIG["command_dir"] / (filename if System.is_win else cmd_name)
            with open(destination, "w", encoding="utf-8") as out:
                out.write(response.text)
            # MAKE EXECUTABLE ON UNIX-LIKE SYSTEMS
            if not System.is_win:
                os.chmod(destination, 0o755)
            if cmd_name in github_diffs["new_commands"]:
                action = "Added"
            else:
                action = "Updated"
            FormatCodes.print(f"[br:green](✓ {action} [b]({cmd_name}))")
            completed += 1
        except Exception as e:
            FormatCodes.print(f"[br:red](✗ Failed to download [b]({filename}) [dim]/({e})[_])")

    # DELETE REMOVED FILES
    for cmd_name in pending_deletions:
        try:
            # FIRST EXISTING CANDIDATE WINS: `.py`, THEN `.pyw`, THEN NO EXTENSION
            candidates = (CONFIG["command_dir"] / f"{cmd_name}{ext}" for ext in (".py", ".pyw", ""))
            target = next((path for path in candidates if path.exists()), None)
            if target is None:
                FormatCodes.print(f"[dim|br:yellow](⚠ Could not find [b]({cmd_name}) to delete)")
            else:
                target.unlink()
                FormatCodes.print(f"[br:green](✓ Deleted [b]({cmd_name}))")
                completed += 1
        except Exception as e:
            FormatCodes.print(f"[br:red](✗ Failed to delete [b]({cmd_name}) [dim]/({e})[_])")

    # GREEN = EVERYTHING WORKED, RED = NOTHING WORKED, YELLOW = PARTIAL SUCCESS
    if completed == total_operations:
        color = "br:green"
    elif completed == 0:
        color = "br:red"
    else:
        color = "br:yellow"
    plural = "s" if total_operations > 1 else ""
    FormatCodes.print(f"\nSuccessfully completed [{color}]([b]({completed})/{total_operations}) operation{plural}!\n\n")
def main() -> None:
    """Print the command overview and, if the update flag was passed, run the GitHub update check."""
    commands = get_python_files()
    FormatCodes.print(get_commands_str(commands))

    if not ARGS.update_check.exists:
        return

    spinner = Throbber(label="⟳ Checking for updates")
    spinner.set_format(["[magenta]({l})", "[b|magenta]({a})"])
    with spinner.context():
        diffs = get_github_diffs(commands)

    FormatCodes.print(github_diffs_str(diffs))
    download_files(diffs)
# Script entry point: only runs when the file is executed directly.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Interrupted by the user: just end the current output line, no traceback.
        print()
    except Exception as e:
        # Any other error is handed to `Console.fail()` — presumably it prints
        # the error and exits; confirm against the xulbux documentation.
        Console.fail(e, start="\n", end="\n\n")