From c54592ad621f7007e9088c1ceebed8592bd9d7d0 Mon Sep 17 00:00:00 2001 From: Tyler Mathis <35553152+tsmathis@users.noreply.github.com> Date: Mon, 26 Feb 2024 17:40:11 -0800 Subject: [PATCH 01/17] Create Dask executor for mrun --- src/maggma/cli/dask_executor.py | 80 +++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 src/maggma/cli/dask_executor.py diff --git a/src/maggma/cli/dask_executor.py b/src/maggma/cli/dask_executor.py new file mode 100644 index 000000000..ca30d8df0 --- /dev/null +++ b/src/maggma/cli/dask_executor.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python +# coding utf-8 + +from logging import getLogger +from typing import List + +from maggma.cli.settings import CLISettings +from maggma.core import Builder + +try: + import dask + from dask.distributed import LocalCluster, SSHCluster +except ImportError: + raise ImportError("Both dask and distributed are required to use Dask as a broker") + +settings = CLISettings() + + +def dask_executor( + scheduler_address: str, + scheduler_port: int, + dask_hosts: str, + builders: List[Builder], + dask_workers: int, + processes: bool, +): + """ + Dask executor for processing builders. Constructs Dask task graphs + that will be submitted to a Dask scheduler for distributed processing + on a Dask cluster.
+ """ + logger = getLogger("Scheduler") + + if dask_hosts: + with open(dask_hosts) as file: + dask_hosts = file.read().split() + + logger.info( + f"""Starting distributed Dask cluster, with scheduler at {dask_hosts[0]}:{scheduler_port}, + and workers at: {dask_hosts[1:]}:{scheduler_port}...""" + ) + else: + logger.info(f"Starting Dask LocalCluster with scheduler at: {scheduler_address}:{scheduler_port}...") + + client = setup_dask( + address=scheduler_address, port=scheduler_port, hosts=dask_hosts, n_workers=dask_workers, processes=processes + ) + + logger.info(f"Dask dashboard available at: {client.dashboard_link}") + + for builder in builders: + logger.info(f"Working on {builder.__class__.__name__}") + builder.connect() + items = builder.get_items() + + task_graph = [] + for chunk in items: + docs = dask.delayed(builder.get_processed_docs)(chunk) + built_docs = dask.delayed(builder.process_item)(docs) + update_store = dask.delayed(builder.update_targets)(built_docs) + task_graph.append(update_store) + + dask.compute(*task_graph) + + client.shutdown() + + +def setup_dask(address: str, port: int, hosts: List[str], n_workers: int, processes: bool): + logger = getLogger("Cluster") + + logger.info("Starting clutser...") + + if hosts: + cluster = SSHCluster(hosts=hosts, scheduler_port=port, n_workers=n_workers) + else: + cluster = LocalCluster(host=address, scheduler_port=port, n_workers=n_workers, processes=processes) + + logger.info(f"Cluster started with config: {cluster}") + + return cluster.get_client() From 538da32a1ddc383d6a113d8b8ff68f4959dde479 Mon Sep 17 00:00:00 2001 From: Tyler Mathis <35553152+tsmathis@users.noreply.github.com> Date: Mon, 26 Feb 2024 17:40:52 -0800 Subject: [PATCH 02/17] Dask entry point and cli options for mrun --- src/maggma/cli/__init__.py | 72 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/src/maggma/cli/__init__.py b/src/maggma/cli/__init__.py index ef87e2b3e..3fb41f47c 100644 --- 
a/src/maggma/cli/__init__.py +++ b/src/maggma/cli/__init__.py @@ -7,6 +7,7 @@ import sys from datetime import datetime from itertools import chain +from typing import List import click from monty.serialization import loadfn @@ -23,6 +24,10 @@ settings = CLISettings() +class BrokerExcepton(Exception): + pass + + @click.command() @click.argument("builders", nargs=-1, type=click.Path(exists=True), required=True) @click.option( @@ -83,6 +88,52 @@ type=str, help="Prefix to use in queue names when RabbitMQ is select as the broker", ) +@click.option("--dask", is_flag=True, help="Enables the use of Dask as the work broker") +@click.option( + "--processes", + default=False, + is_flag=True, + help="""**only applies when running Dask on a single machine**\n + Whether or not the Dask cluster uses thread-based or process-based parallelism.""", +) +@click.option( + "--dask-workers", + "dask_workers", + default=1, + type=int, + help="""Number of 'workers' to start. If using a distributed cluster, + this will set the number of workers, or processes, per Dask Worker""", +) +@click.option( + "--memory-limit", + "memory_limit", + default=None, + type=str, + help="""Amount of memory ('512MB', '4GB', etc.) to be allocated to each worker (process) for Dask. + Default is no limit""", +) +@click.option( + "--scheduler-address", + "scheduler_address", + type=str, + default="127.0.0.1", + help="Address for Dask scheduler", +) +@click.option( + "--scheduler-port", + "scheduler_port", + default=8786, + type=int, + help="Port for the Dask scheduler to communicate with workers over", +) +@click.option( + "--hosts", + "hosts", + default=None, + type=click.Path(exists=True), + help="""Path to file containing addresses of host machines for creating a Dask SSHcluster. 
+ A Dask LocalCluster will be created if no 'hosts' are provided""", +) @click.option( "-m", "--memray", @@ -114,6 +165,13 @@ def run( no_bars, num_processes, rabbitmq, + dask, + processes, + dask_workers, + memory_limit, + scheduler_address, + scheduler_port, + hosts, queue_prefix, memray, memray_dir, @@ -147,8 +205,13 @@ def run( ) # Import proper manager and worker + if rabbitmq and dask: + raise BrokerExcepton("Use of multiple work brokers is not supported") + if rabbitmq: from maggma.cli.rabbitmq import manager, worker + elif dask: + from maggma.cli.dask_executor import dask_executor else: from maggma.cli.distributed import manager, worker @@ -214,6 +277,15 @@ def run( ) else: worker(url=url, port=port, num_processes=num_processes, no_bars=no_bars) + elif dask: + dask_executor( + scheduler_address=scheduler_address, + scheduler_port=scheduler_port, + dask_hosts=hosts, + builders=builder_objects, + dask_workers=dask_workers, + processes=-processes, + ) else: if num_processes == 1: for builder in builder_objects: From 5d0f4e3a84f7c650ded7bbf3f4fe4f64c8bd4958 Mon Sep 17 00:00:00 2001 From: Tyler Mathis <35553152+tsmathis@users.noreply.github.com> Date: Mon, 26 Feb 2024 17:41:26 -0800 Subject: [PATCH 03/17] Dask dependencies --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index c12fd347d..42e7ca64c 100644 --- a/setup.py +++ b/setup.py @@ -49,6 +49,7 @@ extras_require={ "vault": ["hvac>=0.9.5"], "memray": ["memray>=1.7.0"], + "dask": ["dask>=2024.1.1", "distributed>=2024.1.1", "bokeh!=3.0.*,>=2.4.2", "aysncssh>=2.14.2"], "montydb": ["montydb>=2.3.12"], "notebook_runner": ["IPython>=8.11", "nbformat>=5.0", "regex>=2020.6"], "azure": ["azure-storage-blob>=12.16.0", "azure-identity>=1.12.0"], From 58dd6fd76de289ed4f685b494299d38aa9e43d1a Mon Sep 17 00:00:00 2001 From: Tyler Mathis <35553152+tsmathis@users.noreply.github.com> Date: Tue, 27 Feb 2024 13:45:50 -0800 Subject: [PATCH 04/17] remove explicit setting of Python arg names --- 
src/maggma/cli/__init__.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/maggma/cli/__init__.py b/src/maggma/cli/__init__.py index 3fb41f47c..c2c9420bd 100644 --- a/src/maggma/cli/__init__.py +++ b/src/maggma/cli/__init__.py @@ -98,7 +98,6 @@ class BrokerExcepton(Exception): ) @click.option( "--dask-workers", - "dask_workers", default=1, type=int, help="""Number of 'workers' to start. If using a distributed cluster, @@ -106,7 +105,6 @@ class BrokerExcepton(Exception): ) @click.option( "--memory-limit", - "memory_limit", default=None, type=str, help="""Amount of memory ('512MB', '4GB', etc.) to be allocated to each worker (process) for Dask. @@ -114,21 +112,18 @@ class BrokerExcepton(Exception): ) @click.option( "--scheduler-address", - "scheduler_address", type=str, default="127.0.0.1", help="Address for Dask scheduler", ) @click.option( "--scheduler-port", - "scheduler_port", default=8786, type=int, help="Port for the Dask scheduler to communicate with workers over", ) @click.option( "--hosts", - "hosts", default=None, type=click.Path(exists=True), help="""Path to file containing addresses of host machines for creating a Dask SSHcluster. From 266c374dc9ff272b8c91c405b9b52dcaf808b52d Mon Sep 17 00:00:00 2001 From: Tyler Mathis <35553152+tsmathis@users.noreply.github.com> Date: Tue, 27 Feb 2024 13:46:57 -0800 Subject: [PATCH 05/17] update scheduler help --- src/maggma/cli/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/maggma/cli/__init__.py b/src/maggma/cli/__init__.py index c2c9420bd..d90263e84 100644 --- a/src/maggma/cli/__init__.py +++ b/src/maggma/cli/__init__.py @@ -114,7 +114,8 @@ class BrokerExcepton(Exception): "--scheduler-address", type=str, default="127.0.0.1", - help="Address for Dask scheduler", + help="""Address for Dask scheduler. 
If a host file is provided, + the first entry in the file will be used for the scheduler""", ) @click.option( "--scheduler-port", From c0cc38a051d7484ee304418fecaf818b44ef2b8d Mon Sep 17 00:00:00 2001 From: Tyler Mathis <35553152+tsmathis@users.noreply.github.com> Date: Tue, 27 Feb 2024 13:47:41 -0800 Subject: [PATCH 06/17] add dask-threads option --- src/maggma/cli/__init__.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/maggma/cli/__init__.py b/src/maggma/cli/__init__.py index d90263e84..909b12fe6 100644 --- a/src/maggma/cli/__init__.py +++ b/src/maggma/cli/__init__.py @@ -103,6 +103,14 @@ class BrokerExcepton(Exception): help="""Number of 'workers' to start. If using a distributed cluster, this will set the number of workers, or processes, per Dask Worker""", ) +@click.option( + "--dask-threads", + default=0, + type=int, + help="""Number of threads per worker process. + Defaults to number of cores divided by the number of + processes per host.""", +) @click.option( "--memory-limit", default=None, From 7754bd0eaf431b8eb85476a27c0fd89967689f32 Mon Sep 17 00:00:00 2001 From: Tyler Mathis <35553152+tsmathis@users.noreply.github.com> Date: Tue, 27 Feb 2024 13:49:17 -0800 Subject: [PATCH 07/17] change mem limit accepted values to match dask cli + update help text --- src/maggma/cli/__init__.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/maggma/cli/__init__.py b/src/maggma/cli/__init__.py index 909b12fe6..ec76d2fe7 100644 --- a/src/maggma/cli/__init__.py +++ b/src/maggma/cli/__init__.py @@ -113,10 +113,13 @@ class BrokerExcepton(Exception): ) @click.option( "--memory-limit", - default=None, - type=str, - help="""Amount of memory ('512MB', '4GB', etc.) to be allocated to each worker (process) for Dask. - Default is no limit""", + default="auto", + show_default=True, + help="""Bytes of memory that the worker can use. 
+ This can be an integer (bytes), + float (fraction of total system memory), + string (like '5GB' or '5000M'), + 'auto', or 0, for no memory management""", ) @click.option( "--scheduler-address", From 34cc7e0963173ab2f705c8ffe3f367ba989d2cf6 Mon Sep 17 00:00:00 2001 From: Tyler Mathis <35553152+tsmathis@users.noreply.github.com> Date: Tue, 27 Feb 2024 13:49:30 -0800 Subject: [PATCH 08/17] option for dashboard port --- src/maggma/cli/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/maggma/cli/__init__.py b/src/maggma/cli/__init__.py index ec76d2fe7..16707ede6 100644 --- a/src/maggma/cli/__init__.py +++ b/src/maggma/cli/__init__.py @@ -134,6 +134,7 @@ class BrokerExcepton(Exception): type=int, help="Port for the Dask scheduler to communicate with workers over", ) +@click.option("--dashboard-port", "dashboard_port", default=8787, type=int, help="") @click.option( "--hosts", default=None, From 46b1f524a9bfced45d454126ce21666ebdd7c721 Mon Sep 17 00:00:00 2001 From: Tyler Mathis <35553152+tsmathis@users.noreply.github.com> Date: Tue, 27 Feb 2024 13:50:06 -0800 Subject: [PATCH 09/17] change hosts to hostfile to be consistent with dask cli --- src/maggma/cli/__init__.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/maggma/cli/__init__.py b/src/maggma/cli/__init__.py index 16707ede6..cc7b54330 100644 --- a/src/maggma/cli/__init__.py +++ b/src/maggma/cli/__init__.py @@ -136,11 +136,10 @@ class BrokerExcepton(Exception): ) @click.option("--dashboard-port", "dashboard_port", default=8787, type=int, help="") @click.option( - "--hosts", + "--hostfile", default=None, type=click.Path(exists=True), - help="""Path to file containing addresses of host machines for creating a Dask SSHcluster. 
- A Dask LocalCluster will be created if no 'hosts' are provided""", + help="Textfile with hostnames/IP addresses for creating Dask SSHCluster", ) @click.option( "-m", @@ -179,7 +178,7 @@ def run( memory_limit, scheduler_address, scheduler_port, - hosts, + hostfile, queue_prefix, memray, memray_dir, From d81ff894456282072be23ee9748c18a8d1ab3e3d Mon Sep 17 00:00:00 2001 From: Tyler Mathis <35553152+tsmathis@users.noreply.github.com> Date: Tue, 27 Feb 2024 15:31:27 -0800 Subject: [PATCH 10/17] typo in asyncssh name --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 42e7ca64c..3ffb7afa7 100644 --- a/setup.py +++ b/setup.py @@ -49,7 +49,7 @@ extras_require={ "vault": ["hvac>=0.9.5"], "memray": ["memray>=1.7.0"], - "dask": ["dask>=2024.1.1", "distributed>=2024.1.1", "bokeh!=3.0.*,>=2.4.2", "aysncssh>=2.14.2"], + "dask": ["dask>=2024.1.1", "distributed>=2024.1.1", "bokeh!=3.0.*,>=2.4.2", "asyncssh>=2.14.2"], "montydb": ["montydb>=2.3.12"], "notebook_runner": ["IPython>=8.11", "nbformat>=5.0", "regex>=2020.6"], "azure": ["azure-storage-blob>=12.16.0", "azure-identity>=1.12.0"], From 41f58ecfe5b011659e48ff3e2a4a92a8ae2f1e3d Mon Sep 17 00:00:00 2001 From: Tyler Mathis <35553152+tsmathis@users.noreply.github.com> Date: Tue, 27 Feb 2024 15:33:22 -0800 Subject: [PATCH 11/17] sort dask related args --- src/maggma/cli/__init__.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/maggma/cli/__init__.py b/src/maggma/cli/__init__.py index cc7b54330..4cb2b4ee2 100644 --- a/src/maggma/cli/__init__.py +++ b/src/maggma/cli/__init__.py @@ -173,12 +173,14 @@ def run( num_processes, rabbitmq, dask, - processes, + dashboard_port, + dask_threads, dask_workers, + hostfile, memory_limit, + processes, scheduler_address, scheduler_port, - hostfile, queue_prefix, memray, memray_dir, @@ -286,12 +288,15 @@ def run( worker(url=url, port=port, num_processes=num_processes, no_bars=no_bars) elif dask: 
dask_executor( - scheduler_address=scheduler_address, - scheduler_port=scheduler_port, - dask_hosts=hosts, builders=builder_objects, + dashboard_port=dashboard_port, + dask_threads=dask_threads, dask_workers=dask_workers, + hostfile=hostfile, + memory_limit=memory_limit, processes=-processes, + scheduler_address=scheduler_address, + scheduler_port=scheduler_port, ) else: if num_processes == 1: From 1742485dd9d7d4157598de064365c25c951bbbd2 Mon Sep 17 00:00:00 2001 From: Tyler Mathis <35553152+tsmathis@users.noreply.github.com> Date: Tue, 27 Feb 2024 15:34:23 -0800 Subject: [PATCH 12/17] update arg names and cluster kwargs --- src/maggma/cli/dask_executor.py | 60 +++++++++++++++++++++++++-------- 1 file changed, 46 insertions(+), 14 deletions(-) diff --git a/src/maggma/cli/dask_executor.py b/src/maggma/cli/dask_executor.py index ca30d8df0..57097266b 100644 --- a/src/maggma/cli/dask_executor.py +++ b/src/maggma/cli/dask_executor.py @@ -2,7 +2,7 @@ # coding utf-8 from logging import getLogger -from typing import List +from typing import List, Union from maggma.cli.settings import CLISettings from maggma.core import Builder @@ -17,12 +17,15 @@ def dask_executor( - scheduler_address: str, - scheduler_port: int, - dask_hosts: str, builders: List[Builder], + dashboard_port: int, + hostfile: str, + dask_threads: int, dask_workers: int, + memory_limit, processes: bool, + scheduler_address: str, + scheduler_port: int, ): """ Dask executor for processing builders. 
Constructs Dask task graphs @@ -31,19 +34,27 @@ def dask_executor( """ logger = getLogger("Scheduler") - if dask_hosts: - with open(dask_hosts) as file: - dask_hosts = file.read().split() + if hostfile: + with open(hostfile) as file: + hostnames = file.read().split() logger.info( - f"""Starting distributed Dask cluster, with scheduler at {dask_hosts[0]}:{scheduler_port}, - and workers at: {dask_hosts[1:]}:{scheduler_port}...""" + f"""Starting distributed Dask cluster, with scheduler at {hostnames[0]}:{scheduler_port}, + and workers at: {hostnames[1:]}:{scheduler_port}...""" ) else: + hostnames = None logger.info(f"Starting Dask LocalCluster with scheduler at: {scheduler_address}:{scheduler_port}...") client = setup_dask( - address=scheduler_address, port=scheduler_port, hosts=dask_hosts, n_workers=dask_workers, processes=processes + dashboard_port=dashboard_port, + hostnames=hostnames, + memory_limit=memory_limit, + n_workers=dask_workers, + nthreads=dask_threads, + processes=processes, + scheduler_address=scheduler_address, + scheduler_port=scheduler_port, ) logger.info(f"Dask dashboard available at: {client.dashboard_link}") @@ -65,15 +76,36 @@ def dask_executor( client.shutdown() -def setup_dask(address: str, port: int, hosts: List[str], n_workers: int, processes: bool): +def setup_dask( + dashboard_port: int, + hostnames: Union(List[str], None), + memory_limit, + n_workers: int, + nthreads: int, + processes: bool, + scheduler_address: str, + scheduler_port: int, +): logger = getLogger("Cluster") logger.info("Starting clutser...") - if hosts: - cluster = SSHCluster(hosts=hosts, scheduler_port=port, n_workers=n_workers) + if hostnames: + cluster = SSHCluster( + hosts=hostnames, + scheduler_options={"port": scheduler_port, "dashboard_address": f":{dashboard_port}"}, + worker_options={"n_workers": n_workers, "nthreads": nthreads, "memory_limit": memory_limit}, + ) else: - cluster = LocalCluster(host=address, scheduler_port=port, n_workers=n_workers, 
processes=processes) + cluster = LocalCluster( + dashboard_address=f":{dashboard_port}", + host=scheduler_address, + memory_limit=memory_limit, + n_workers=n_workers, + processes=processes, + scheduler_port=scheduler_port, + threads_per_worker=nthreads, + ) logger.info(f"Cluster started with config: {cluster}") From 8a5aacab7128607037534299379e3b57f797a7c2 Mon Sep 17 00:00:00 2001 From: Tyler Mathis <35553152+tsmathis@users.noreply.github.com> Date: Tue, 27 Feb 2024 15:37:26 -0800 Subject: [PATCH 13/17] cluster misspelled --- src/maggma/cli/dask_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/maggma/cli/dask_executor.py b/src/maggma/cli/dask_executor.py index 57097266b..b896c004c 100644 --- a/src/maggma/cli/dask_executor.py +++ b/src/maggma/cli/dask_executor.py @@ -88,7 +88,7 @@ def setup_dask( ): logger = getLogger("Cluster") - logger.info("Starting clutser...") + logger.info("Starting cluster...") if hostnames: cluster = SSHCluster( From 73737685ce3e65c2c948cf281422c3792aaeb3c8 Mon Sep 17 00:00:00 2001 From: Tyler Mathis <35553152+tsmathis@users.noreply.github.com> Date: Thu, 29 Feb 2024 10:31:01 -0800 Subject: [PATCH 14/17] Wrong brackets for Union type --- src/maggma/cli/dask_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/maggma/cli/dask_executor.py b/src/maggma/cli/dask_executor.py index b896c004c..2469bfd97 100644 --- a/src/maggma/cli/dask_executor.py +++ b/src/maggma/cli/dask_executor.py @@ -78,7 +78,7 @@ def dask_executor( def setup_dask( dashboard_port: int, - hostnames: Union(List[str], None), + hostnames: Union[List[str], None], memory_limit, n_workers: int, nthreads: int, From 05597d92d479e68eb048a266ceb261e625cfa5dc Mon Sep 17 00:00:00 2001 From: Tyler Mathis <35553152+tsmathis@users.noreply.github.com> Date: Wed, 6 Mar 2024 11:35:01 -0800 Subject: [PATCH 15/17] Settings thread count to 0 is deprecated --- src/maggma/cli/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) 
diff --git a/src/maggma/cli/__init__.py b/src/maggma/cli/__init__.py index 4cb2b4ee2..cb56df8e0 100644 --- a/src/maggma/cli/__init__.py +++ b/src/maggma/cli/__init__.py @@ -105,7 +105,7 @@ class BrokerExcepton(Exception): ) @click.option( "--dask-threads", - default=0, + default=None, type=int, help="""Number of threads per worker process. Defaults to number of cores divided by the number of From a28f00196c5da51c43ce8dc8a8b9fae9f4ff355c Mon Sep 17 00:00:00 2001 From: Tyler Mathis <35553152+tsmathis@users.noreply.github.com> Date: Wed, 6 Mar 2024 12:22:12 -0800 Subject: [PATCH 16/17] add builder class name to dask task key, tokenize index of each chunk for use as post-fix for dask task key --- src/maggma/cli/dask_executor.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/maggma/cli/dask_executor.py b/src/maggma/cli/dask_executor.py index 2469bfd97..ba69e9d93 100644 --- a/src/maggma/cli/dask_executor.py +++ b/src/maggma/cli/dask_executor.py @@ -60,15 +60,23 @@ def dask_executor( logger.info(f"Dask dashboard available at: {client.dashboard_link}") for builder in builders: - logger.info(f"Working on {builder.__class__.__name__}") + builder_name = builder.__class__.__name__ + logger.info(f"Working on {builder_name}") builder.connect() items = builder.get_items() task_graph = [] - for chunk in items: - docs = dask.delayed(builder.get_processed_docs)(chunk) - built_docs = dask.delayed(builder.process_item)(docs) - update_store = dask.delayed(builder.update_targets)(built_docs) + for idx, chunk in enumerate(items): + chunk_token = dask.base.tokenize(idx) + docs = dask.delayed(builder.get_processed_docs)( + chunk, dask_key_name=f"{builder_name}.get_processed_docs-" + chunk_token + ) + built_docs = dask.delayed(builder.process_item)( + docs, dask_key_name=f"{builder_name}.process_item-" + chunk_token + ) + update_store = dask.delayed(builder.update_targets)( + built_docs, dask_key_name=f"{builder_name}.update_targets-" + chunk_token 
+ ) task_graph.append(update_store) dask.compute(*task_graph) From 245cf3ef69611e8cd654118346ad04f253455f57 Mon Sep 17 00:00:00 2001 From: Tyler Mathis <35553152+tsmathis@users.noreply.github.com> Date: Wed, 6 Mar 2024 14:01:15 -0800 Subject: [PATCH 17/17] add option for saving diagnostics report from dask dashboard --- src/maggma/cli/__init__.py | 6 ++++ src/maggma/cli/dask_executor.py | 64 +++++++++++++++++++-------------- 2 files changed, 44 insertions(+), 26 deletions(-) diff --git a/src/maggma/cli/__init__.py b/src/maggma/cli/__init__.py index cb56df8e0..daf6f6f03 100644 --- a/src/maggma/cli/__init__.py +++ b/src/maggma/cli/__init__.py @@ -121,6 +121,8 @@ class BrokerExcepton(Exception): string (like '5GB' or '5000M'), 'auto', or 0, for no memory management""", ) +@click.option("--perf-report", default=False, is_flag=True, help="Turn on to save diagnostic report for Dask dashboard") +@click.option("--report-name", default="dask_report.html", help="File name for Dask diagnostic report") @click.option( "--scheduler-address", type=str, @@ -179,6 +181,8 @@ def run( hostfile, memory_limit, processes, + perf_report, + report_name, scheduler_address, scheduler_port, queue_prefix, @@ -295,6 +299,8 @@ def run( hostfile=hostfile, memory_limit=memory_limit, processes=-processes, + perf_report=perf_report, + report_name=report_name, scheduler_address=scheduler_address, scheduler_port=scheduler_port, ) diff --git a/src/maggma/cli/dask_executor.py b/src/maggma/cli/dask_executor.py index ba69e9d93..7f315ffde 100644 --- a/src/maggma/cli/dask_executor.py +++ b/src/maggma/cli/dask_executor.py @@ -9,7 +9,7 @@ try: import dask - from dask.distributed import LocalCluster, SSHCluster + from dask.distributed import LocalCluster, SSHCluster, performance_report except ImportError: raise ImportError("Both dask and distributed are required to use Dask as a broker") @@ -24,6 +24,8 @@ def dask_executor( dask_workers: int, memory_limit, processes: bool, + perf_report: bool, + report_name: 
str, scheduler_address: str, scheduler_port: int, ): @@ -32,19 +34,19 @@ def dask_executor( that will be submitted to a Dask scheduler for distributed processing on a Dask cluster. """ - logger = getLogger("Scheduler") + scheduler_logger = getLogger("Scheduler") if hostfile: with open(hostfile) as file: hostnames = file.read().split() - logger.info( + scheduler_logger.info( f"""Starting distributed Dask cluster, with scheduler at {hostnames[0]}:{scheduler_port}, and workers at: {hostnames[1:]}:{scheduler_port}...""" ) else: hostnames = None - logger.info(f"Starting Dask LocalCluster with scheduler at: {scheduler_address}:{scheduler_port}...") + scheduler_logger.info(f"Starting Dask LocalCluster with scheduler at: {scheduler_address}:{scheduler_port}...") client = setup_dask( dashboard_port=dashboard_port, @@ -57,29 +59,13 @@ def dask_executor( scheduler_port=scheduler_port, ) - logger.info(f"Dask dashboard available at: {client.dashboard_link}") + scheduler_logger.info(f"Dask dashboard available at: {client.dashboard_link}") - for builder in builders: - builder_name = builder.__class__.__name__ - logger.info(f"Working on {builder_name}") - builder.connect() - items = builder.get_items() - - task_graph = [] - for idx, chunk in enumerate(items): - chunk_token = dask.base.tokenize(idx) - docs = dask.delayed(builder.get_processed_docs)( - chunk, dask_key_name=f"{builder_name}.get_processed_docs-" + chunk_token - ) - built_docs = dask.delayed(builder.process_item)( - docs, dask_key_name=f"{builder_name}.process_item-" + chunk_token - ) - update_store = dask.delayed(builder.update_targets)( - built_docs, dask_key_name=f"{builder_name}.update_targets-" + chunk_token - ) - task_graph.append(update_store) - - dask.compute(*task_graph) + if perf_report: + with performance_report(report_name): + run_builders(builders, scheduler_logger) + else: + run_builders(builders, scheduler_logger) client.shutdown() @@ -118,3 +104,29 @@ def setup_dask( logger.info(f"Cluster started with 
config: {cluster}") return cluster.get_client() + + +def run_builders(builders, logger): + for builder in builders: + builder_name = builder.__class__.__name__ + logger.info(f"Working on {builder_name}") + + builder.connect() + items = builder.get_items() + + task_graph = [] + + for idx, chunk in enumerate(items): + chunk_token = dask.base.tokenize(idx) + docs = dask.delayed(builder.get_processed_docs)( + chunk, dask_key_name=f"{builder_name}.get_processed_docs-" + chunk_token + ) + built_docs = dask.delayed(builder.process_item)( + docs, dask_key_name=f"{builder_name}.process_item-" + chunk_token + ) + update_store = dask.delayed(builder.update_targets)( + built_docs, dask_key_name=f"{builder_name}.update_targets-" + chunk_token + ) + task_graph.append(update_store) + + dask.compute(*task_graph)