Multiple DSS engines and multithreading
Parallel OpenDSS/AltDSS engines in the same process
This is a slightly modified version, ported directly from DSS-Python; the _run function is what changes the most. Visit https://dss-extensions.org/DSS-Python/examples/Multithreading.html for a few more comments, a comparison with multiprocessing, and notes on the internal parallel-machine implementation.
This version uses only the 13Bus circuit, but it can be easily extended.
General recommendations apply, like using a good memory allocator on Linux.
import os
import threading
from time import perf_counter
import numpy as np
from opendssdirect.OpenDSSDirect import OpenDSSDirect # for type hints
from opendssdirect import dss
# Adjust this if required
PATH_TO_DSS_SCRIPT = './../../tests/data/13Bus/IEEE13Nodeckt.dss'
Remember that OpenDSS changes the working directory of the process? To use multiple instances, we need to disable that behavior. When disabled, the AltDSS engine will track the required paths without changing the working directory.
dss.Basic.AllowChangeDir(False)
Now we need to create a list of cases to run. Let’s try 100 samples of LoadMult between 0.9 and 1.1:
fns = [PATH_TO_DSS_SCRIPT]
cases = []
for fn in fns:
    for load_mult in np.linspace(0.9, 1.1, 100):
        cases.append((fn, load_mult))
cases_to_run_threads = list(cases)
cases_to_run_seq = list(cases)
Decide how many instances to use based on the number of cases and the CPU count. For processors with Hyper-Threading, it might be best to run with the number of real cores. It all depends on the processor architecture, such as cache and memory bandwidth, and the characteristics of the DSS circuit being used. It is recommended to run a few tests to select the optimal number of threads, especially for large-scale circuits (IEEE13 is tiny).
# Use the CPU count or the number of cases, whichever is lower, as the number of threads
num = min(len(cases), os.cpu_count())
# Initialize a new context for each of the threads
ctxs = [dss.NewContext() for n in range(num)]
print(f"Using {len(ctxs)} DSS contexts")
Using 32 DSS contexts
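If you prefer to match the number of real (physical) cores instead of logical CPUs, a package such as psutil can report that. This is only an optional sketch; psutil is an extra dependency and is not used in the original example:
# Optional sketch (assumes psutil is installed): count physical cores instead of logical CPUs
import psutil
num = min(len(cases), psutil.cpu_count(logical=False) or os.cpu_count())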
Some dicts to keep the results:
tresults = {}
tconverged = {}
sresults = {}
sconverged = {}
This is the worker function that will run the workload, both in the threads and sequentially.
Note that it references some structures shared as input to the function. Since there is a GIL, we don’t need to use locks. You may need to adjust that for more general usage; a queue-based variant is sketched after the function.
Uncomment the print calls for some visual feedback while running.
def _run(ctx: OpenDSSDirect, case_list, converged, results):
    tname = threading.current_thread().name
    while case_list:
        fn, load_mult = case_list.pop()
        ctx.Text.Command('clear')
        try:
            ctx.Text.Command(f'redirect "{fn}"')
            ctx.Solution.LoadMult(load_mult)
            # print(f'{tname}: Running "{fn}", circuit "{ctx.Circuit.Name()}", mult={load_mult}')
            ctx.Text.Command('Solve mode=daily number=5000')
        except Exception as ex:
            print('ERROR:', tname, (fn, load_mult))
            print(' ', ex.args)
        # print(f'{tname}: Done "{fn}" (LoadMult={load_mult}), circuit "{ctx.Circuit.Name()}"')
        converged[(fn, load_mult)] = ctx.Solution.Converged()
        results[(fn, load_mult)] = ctx.Circuit.AllBusVolts()
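As mentioned above, the shared-list approach relies on the GIL making case_list.pop() safe. For more general usage, a thread-safe queue can distribute the work instead. This is only a minimal sketch; the _run_queue name and the use of queue.Queue are not part of the original example:
import queue

def _run_queue(ctx: OpenDSSDirect, case_queue, converged, results):
    # Pull cases from a thread-safe queue until it is empty
    while True:
        try:
            fn, load_mult = case_queue.get_nowait()
        except queue.Empty:
            return
        ctx.Text.Command('clear')
        ctx.Text.Command(f'redirect "{fn}"')
        ctx.Solution.LoadMult(load_mult)
        ctx.Text.Command('Solve mode=daily number=5000')
        converged[(fn, load_mult)] = ctx.Solution.Converged()
        results[(fn, load_mult)] = ctx.Circuit.AllBusVolts()

# Usage sketch: fill the queue from the cases list before starting the threads
# case_queue = queue.Queue()
# for case in cases:
#     case_queue.put(case)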
With all in place, let’s create and run the threads until completion. The threads will consume input scenarios from cases_to_run_threads, outputting to tconverged and tresults.
t0 = perf_counter()
threads = []
for ctx in ctxs:
    t = threading.Thread(target=_run, args=(ctx, cases_to_run_threads, tconverged, tresults))
    threads.append(t)
for t in threads:
    t.start()
for t in threads:
    t.join()
t1 = perf_counter()
# Check if all solutions converged
assert all(tconverged.values())
dt_thread = (t1 - t0)
print(f'Done in {dt_thread:.3f} s with {num} threads')
Done in 0.283 s with 32 threads
For comparison, let’s also run the same cases sequentially, in a single thread.
t0 = perf_counter()
_run(dss, cases_to_run_seq, sconverged, sresults)
t1 = perf_counter()
dt_seq = (t1 - t0)
print(f'Done in {dt_seq:.3f} s sequentially')
Done in 4.427 s sequentially
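The ratio of the two timings gives the overall speedup. This extra line is not part of the original example:
# Rough speedup estimate from the two measured wall-clock times
print(f'Speedup: {dt_seq / dt_thread:.1f}x with {num} threads')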
Check that each scenario produced the same results whether it ran in multiple threads or in a single thread:
for case in cases:
    np.testing.assert_equal(sresults[case], tresults[case])
That’s it! On modern processors, the difference between running in a single thread and in multiple threads can be significant.
Traditionally, a lot of OpenDSS users ran tasks in parallel with the multiprocessing module or other tools like Dask.Distributed, but depending on the use-case, multithreading can present interesting advantages, such as easier use of shared resources.
For an approach compatible with the current official OpenDSS versions, using either multiprocessing or the dss.Parallel functions would be required.
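For reference, a multiprocessing version of the same workload could look roughly like the sketch below. This is only an illustration, assuming it runs as a standalone script (the _run_case helper and the mconverged/mresults names are not part of the original example); each worker process imports its own DSS engine, so no explicit context management is needed:
from multiprocessing import Pool

def _run_case(case):
    # Each worker process has its own DSS engine
    from opendssdirect import dss
    fn, load_mult = case
    dss.Text.Command('clear')
    dss.Text.Command(f'redirect "{fn}"')
    dss.Solution.LoadMult(load_mult)
    dss.Text.Command('Solve mode=daily number=5000')
    return case, dss.Solution.Converged(), dss.Circuit.AllBusVolts()

if __name__ == '__main__':
    with Pool(processes=num) as pool:
        out = pool.map(_run_case, cases)
    mconverged = {case: ok for case, ok, _ in out}
    mresults = {case: volts for case, _, volts in out}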