-
Notifications
You must be signed in to change notification settings - Fork 36
Adding Dask support to mrun #927
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
tsmathis
wants to merge
17
commits into
main
Choose a base branch
from
maggma-dask
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
c54592a
Create Dask executor for mrun
tsmathis 538da32
Dask entry point and cli options for mrun
tsmathis 5d0f4e3
Dask dependencies
tsmathis 58dd6fd
remove explicit setting of Python arg names
tsmathis 266c374
update scheduler help
tsmathis c0cc38a
add dask-threads option
tsmathis 7754bd0
change mem limit accepted values to match dask cli + update help text
tsmathis 34cc7e0
option for dashboard port
tsmathis 46b1f52
change hosts to hostfile to be consistent with dask cli
tsmathis d81ff89
typo in asyncssh name
tsmathis 41f58ec
sort dask related args
tsmathis 1742485
update arg names and cluster kwargs
tsmathis 8a5aaca
cluster misspelled
tsmathis 7373768
Wrong brackets for Union type
tsmathis 05597d9
Setting thread count to 0 is deprecated
tsmathis a28f001
add builder class name to dask task key, tokenize index of each chunk…
tsmathis 245cf3e
add option for saving diagnostics report from dask dashboard
tsmathis File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,132 @@ | ||
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Dask execution support for maggma's ``mrun`` CLI.

Provides helpers to spin up a Dask cluster (local or SSH-based) and to
submit builder pipelines to it as Dask task graphs.
"""

from logging import getLogger
from typing import List, Union

from maggma.cli.settings import CLISettings
from maggma.core import Builder

# dask/distributed are optional extras: fail at import time with a clear
# message rather than an opaque NameError later, and chain the original
# ImportError for debuggability.
try:
    import dask
    from dask.distributed import LocalCluster, SSHCluster, performance_report
except ImportError as err:
    raise ImportError("Both dask and distributed are required to use Dask as a broker") from err

settings = CLISettings()
|
|
||
|
|
||
def dask_executor(
    builders: List[Builder],
    dashboard_port: int,
    hostfile: str,
    dask_threads: int,
    dask_workers: int,
    memory_limit,
    processes: bool,
    perf_report: bool,
    report_name: str,
    scheduler_address: str,
    scheduler_port: int,
):
    """
    Entry point for running builders on Dask.

    Builds a Dask cluster (SSH-based when *hostfile* is provided, otherwise a
    LocalCluster), turns each builder into a Dask task graph via
    ``run_builders``, optionally wraps execution in a Dask performance report,
    and shuts the client down when done.
    """
    scheduler_logger = getLogger("Scheduler")

    # Resolve worker/scheduler hosts from the hostfile, if one was given.
    hosts = None
    if hostfile:
        with open(hostfile) as fh:
            hosts = fh.read().split()
        # First host becomes the scheduler; the rest become workers.
        scheduler_logger.info(
            f"""Starting distributed Dask cluster, with scheduler at {hosts[0]}:{scheduler_port},
            and workers at: {hosts[1:]}:{scheduler_port}..."""
        )
    else:
        scheduler_logger.info(
            f"Starting Dask LocalCluster with scheduler at: {scheduler_address}:{scheduler_port}..."
        )

    client = setup_dask(
        dashboard_port=dashboard_port,
        hostnames=hosts,
        memory_limit=memory_limit,
        n_workers=dask_workers,
        nthreads=dask_threads,
        processes=processes,
        scheduler_address=scheduler_address,
        scheduler_port=scheduler_port,
    )

    scheduler_logger.info(f"Dask dashboard available at: {client.dashboard_link}")

    # Optionally capture a diagnostics report from the Dask dashboard.
    if perf_report:
        with performance_report(report_name):
            run_builders(builders, scheduler_logger)
    else:
        run_builders(builders, scheduler_logger)

    client.shutdown()
|
|
||
|
|
||
def setup_dask(
    dashboard_port: int,
    hostnames: Union[List[str], None],
    memory_limit,
    n_workers: int,
    nthreads: int,
    processes: bool,
    scheduler_address: str,
    scheduler_port: int,
):
    """
    Start a Dask cluster and return a client connected to it.

    When *hostnames* is a non-empty list, an ``SSHCluster`` is created with the
    first host as scheduler and the rest as workers; otherwise a
    ``LocalCluster`` is started on *scheduler_address*.
    """
    logger = getLogger("Cluster")

    logger.info("Starting cluster...")

    if not hostnames:
        # No hostfile supplied: everything runs on this machine.
        cluster = LocalCluster(
            dashboard_address=f":{dashboard_port}",
            host=scheduler_address,
            memory_limit=memory_limit,
            n_workers=n_workers,
            processes=processes,
            scheduler_port=scheduler_port,
            threads_per_worker=nthreads,
        )
    else:
        # Distributed cluster over SSH; per-worker options mirror the
        # LocalCluster settings.
        cluster = SSHCluster(
            hosts=hostnames,
            scheduler_options={"port": scheduler_port, "dashboard_address": f":{dashboard_port}"},
            worker_options={"n_workers": n_workers, "nthreads": nthreads, "memory_limit": memory_limit},
        )

    logger.info(f"Cluster started with config: {cluster}")

    return cluster.get_client()
|
|
||
|
|
||
def run_builders(builders, logger):
    """
    Build and execute a Dask task graph for each builder.

    For every chunk yielded by ``builder.get_items()``, chains three delayed
    stages (``get_processed_docs`` -> ``process_item`` -> ``update_targets``)
    whose task keys embed the builder class name plus a token derived from the
    chunk index, then computes the graph for that builder.
    """
    for builder in builders:
        cls_name = builder.__class__.__name__
        logger.info(f"Working on {cls_name}")

        builder.connect()

        delayed_tasks = []

        for chunk_idx, chunk in enumerate(builder.get_items()):
            # Token keeps task keys unique per chunk while staying deterministic.
            token = dask.base.tokenize(chunk_idx)
            fetched = dask.delayed(builder.get_processed_docs)(
                chunk, dask_key_name=f"{cls_name}.get_processed_docs-" + token
            )
            processed = dask.delayed(builder.process_item)(
                fetched, dask_key_name=f"{cls_name}.process_item-" + token
            )
            stored = dask.delayed(builder.update_targets)(
                processed, dask_key_name=f"{cls_name}.update_targets-" + token
            )
            delayed_tasks.append(stored)

        dask.compute(*delayed_tasks)
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.