Setting Up the Script
Writing code using cvpl_tools requires some setup code that configures the dask library and other utilities needed for an image processing pipeline. The sections below briefly describe this setup.
Dask Cluster and Temporary Directory
cvpl_tools depends on the dask library; many cvpl_tools functions take dask arrays as input or produce them as output.
Dask is a multithreaded and distributed computing library, in which temporary results may not fit in memory. In such cases, they are written to the temporary directory set in the dask config's temporary_directory variable. When working on an HPC system on Compute Canada, make sure this path is set to a /scratch directory where the allowed number of files is large enough.
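For example, on a Compute Canada cluster this configuration might look roughly like the sketch below; the exact scratch path is an assumption and depends on your own account and allocation.

import os
import dask

# Hypothetical path: point Dask's temporary directory at your scratch space;
# replace with the /scratch path of your own account or project.
SCRATCH_TMP = os.path.expandvars('/scratch/$USER/dask_tmp')
dask.config.set({'temporary_directory': SCRATCH_TMP})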
Dask Client setup is described in the Dask quickstart page. See this SO post to determine whether you need to initialize a client or not. The examples below are modified from the simplest setup in the quickstart guide.
if __name__ == '__main__':
    import dask
    import dask.config
    import dask.array as da
    from dask.distributed import Client

    TMP_PATH = "path/to/tmp/dir"  # CHANGE THIS TO AN EMPTY PATH YOU WOULD LIKE TO USE FOR CACHING
    with dask.config.set({'temporary_directory': TMP_PATH}):
        client = Client(threads_per_worker=6, n_workers=1)
        print((da.zeros((3, 3)) + 1).compute().sum().item())  # will output 9
The line if __name__ == '__main__': ensures that only the main process executes the task creation code. Client() spawns worker processes, and on spawn-based start methods each worker process re-imports the module and re-executes its top-level code. If the guard is absent, that re-execution reaches the Client creation again and crashes the program.
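For contrast, a minimal sketch of the unguarded pattern that triggers this re-execution problem (do not use this):

# BROKEN sketch: no __main__ guard. On spawn-based start methods, each worker
# process re-imports this module, reaches the Client(...) call again and tries
# to spawn its own workers, which crashes the program.
from dask.distributed import Client

client = Client(threads_per_worker=6, n_workers=1)  # re-executed on every re-import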
The next problem concerns contention for thread resources between Dask and Napari:
if __name__ == '__main__':
    import dask
    import dask.config
    import dask.array as da
    import napari
    from dask.distributed import Client

    TMP_PATH = "path/to/tmp/dir"
    with dask.config.set({'temporary_directory': TMP_PATH}):
        client = Client(threads_per_worker=6, n_workers=1)
        viewer = napari.Viewer(ndisplay=2)
        viewer.add_image(...)  # add a large ome zarr image
        viewer.show(block=True)  # here all available threads will be used to fetch data to show the image
Napari uses all threads of the current process to load image chunks as we navigate the camera across chunks. With a typical Dask setup, however, the threads spawned by Client() take up all threads except the main thread. If viewer.show(block=True) is called on the main thread, the Napari viewer does not get the remaining threads, and loading is slow. The issue is not in adding the image to the viewer but in the call to viewer.show(), where the loading happens. It is slow regardless of whether threads_per_worker is 1 or more.
Calling client.close() to release threads before viewer.show(block=True) solves the problem:
viewer.add_image(...) # adding image itself does not take too much time
client.close() # here resources are freed up
viewer.show(block=True) # threads are now available to fetch data to show image
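Putting the pieces together, the corrected flow of the Napari example above looks roughly like this sketch:

if __name__ == '__main__':
    import dask
    import dask.config
    import napari
    from dask.distributed import Client

    TMP_PATH = "path/to/tmp/dir"
    with dask.config.set({'temporary_directory': TMP_PATH}):
        client = Client(threads_per_worker=6, n_workers=1)
        viewer = napari.Viewer(ndisplay=2)
        viewer.add_image(...)  # add a large ome zarr image
        client.close()  # free the worker threads held by the client
        viewer.show(block=True)  # napari can now use all threads to load chunks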
Dask Logging Setup
Python's logging module is supported by Dask, but I've had some issues getting it right, so I looked around and found that Dask provides a simple strategy for debug logging, as described in this page. The solution is to use logging as usual on the main thread, and to use dask.distributed.print to print debugging messages when inside a worker thread. For convenience I also echo the stdout and stderr outputs into separate logfiles so they persist even if you accidentally close the command window. Below is an example:
if __name__ == '__main__':
    import sys
    import cvpl_tools.tools.fs as tlfs
    import numpy as np
    from dask.distributed import print as dprint

    # echo stdout and stderr into logfiles so they persist after the window closes
    logfile_stdout = open('log_stdout.txt', mode='w')
    logfile_stderr = open('log_stderr.txt', mode='w')
    sys.stdout = tlfs.MultiOutputStream(sys.stdout, logfile_stdout)
    sys.stderr = tlfs.MultiOutputStream(sys.stderr, logfile_stderr)

    import dask
    import dask.config
    import dask.array as da
    from dask.distributed import Client

    TMP_PATH = "path/to/tmp/dir"
    with dask.config.set({'temporary_directory': TMP_PATH}):
        client = Client(threads_per_worker=6, n_workers=1)
        print((da.zeros((3, 3)) + 1).compute().sum().item())  # will output 9

        def map_fn(block, block_info=None):
            dprint(f'map_fn is called with input {block}')  # prints from worker threads
            return block + 1

        arr = da.zeros((3, 3), dtype=np.uint8).map_blocks(map_fn, meta=np.array(tuple(), dtype=np.uint8))
        print('result is:', arr.compute())
After running this program, you should see outputs in both the command window and the log_stdout.txt and log_stderr.txt files under your working directory.
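Conceptually, the stream echoing works by wrapping several output streams in one object that forwards every write to all of them. The class below is only a sketch of that idea under the hypothetical name EchoStream, not the actual cvpl_tools.tools.fs.MultiOutputStream implementation:

class EchoStream:
    """Forward writes and flushes to every wrapped stream (sketch of the idea)."""

    def __init__(self, *streams):
        self.streams = streams

    def write(self, text):
        for stream in self.streams:
            stream.write(text)

    def flush(self):
        for stream in self.streams:
            stream.flush()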
Cache Directory
Separately from Dask's temporary directory, cvpl_tools.tools.fs provides APIs for caching intermediate results. A multi-step segmentation pipeline may produce many intermediate results; some of them can be discarded once computed, while others (like the final output) we may want to cache on disk so they can be accessed later without redoing the computation. To cache a result we need a fixed path that does not change across program executions. cvpl_tools.tools.fs.cdir_init and cvpl_tools.tools.fs.cdir_commit are the functions used to commit a result and to check whether it already exists or needs to be computed from scratch.
In a program, we may cache hierarchically: a root cache directory is created or loaded when the program starts, and each cache directory may contain subdirectories as well as step-specific caches.
if __name__ == '__main__':
    import cvpl_tools.tools.fs as tlfs

    # Use case #1. Create a data directory for caching computation results
    cache_path = f'{TMP_PATH}/CacheDirectory/some_cache_path'
    is_commit = tlfs.cdir_init(cache_path).commit
    if not is_commit:
        pass  # PUT CODE HERE: write your data under cache_path, commit it, and load it back later

    # Use case #2. Create a sub-directory and pass it to other processes for caching
    def multi_step_computation(cache_at: str):
        cache_path1 = f'{cache_at}/A'
        is_commit1 = tlfs.cdir_init(cache_path1).commit
        if not is_commit1:
            A = computeA()
            save(cache_path1, A)  # note cache_path1 is an existing directory, not a file
            tlfs.cdir_commit(cache_path1)  # mark this cache as complete so later runs skip computeA
        A = load(cache_path1)

        cache_path2 = f'{cache_at}/B'
        is_commit2 = tlfs.cdir_init(cache_path2).commit
        if not is_commit2:
            B = computeBFromA(A)
            save(cache_path2, B)  # note cache_path2 is an existing directory, not a file
            tlfs.cdir_commit(cache_path2)  # mark this cache as complete so later runs skip computeBFromA
        B = load(cache_path2)
        return B

    result = multi_step_computation(cache_at=f'{cache_path}/multi_step_cache')
After running the above code once, cache files will be created. The second time the code is run, the computation steps will be skipped. This sort of hierarchical caching is convenient for working with complex processes that can be hierarchically broken down into smaller and simpler compute steps.
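Assuming save writes its output inside the directory it is given, the cache layout on disk after the first run would look roughly like the following (cdir_init and cdir_commit may also create bookkeeping files of their own):

<TMP_PATH>/CacheDirectory/some_cache_path/
    multi_step_cache/
        A/    (cached result of computeA)
        B/    (cached result of computeBFromA)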
A Quicker Setup
You can use the following code to get a quick start locally. This is currently pretty bare-bones, but it should allow you to run any Dask computation defined in the cvpl_tools library as well as your custom SegProcess functions. The qsetup.py code automatically creates two log files in your current directory containing the program's stdout and stderr, since those capture the text output of Dask's distributed print function.
if __name__ == '__main__':
    import cvpl_tools.im.process.qsetup as qsetup
    import napari
    from dask.distributed import Client
    # IMPORT YOUR LIBRARIES HERE

    TMP_PATH = "C:/ProgrammingTools/ComputerVision/RobartsResearch/data/lightsheet/tmp"  # CHANGE THIS TO YOUR OWN TMP PATH
    plc = qsetup.PLComponents(TMP_PATH,
                              'CacheDirectory',
                              get_client=lambda: Client(threads_per_worker=12, n_workers=1))
    viewer = napari.Viewer(ndisplay=2)
    # DO DASK COMPUTATION, AND SHOW RESULTS IN viewer
    plc.close()
    viewer.show(block=True)
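As a hypothetical filler for the DO DASK COMPUTATION placeholder above (placed before plc.close()), you could, for example, compute a small Dask array and show the result in the viewer:

import dask.array as da

# Threshold a random dask array and display the computed result in napari;
# any of your own dask computations would go here instead.
noise = da.random.random((256, 256), chunks=(128, 128))
mask = (noise > 0.99).astype('uint8').compute()
viewer.add_image(mask, name='thresholded noise')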
If anyone would like more features with this setup, please let me know.