Multiple DSS engines, multithreading vs. multiprocessing#
Parallel OpenDSS/AltDSS engines in the same process
General recommendations apply, like using a good memory allocator on Linux.
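For example, on Linux, an alternative allocator can be preloaded when launching Python. This is only an illustration; the allocator and library path below are assumptions and vary by distribution:

# Hypothetical invocation, run from a shell before Python starts (adjust the path):
#   LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libmimalloc.so python my_study_script.py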
For a similar example using OpenDSSDirect.py (the _run function is what changes), see: https://dss-extensions.org/OpenDSSDirect.py/notebooks/Multithreading
If you need to create multiple, independent OpenDSS engines, DSS.NewContext() can be used. This was introduced in DSS-Python 0.12.0, after a lot of work in the underlying DSS C-API engine. This feature is also unique to DSS-Extensions.
Although multithreading in Python is not great due to the GIL (Global Interpreter Lock), it is important to note that DSS-Python releases the GIL when API calls are made, i.e. multiple Python threads that spend most of their time in power flow solutions should still be performant.
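As a minimal sketch of this independence (the circuit names below are made up for illustration), each context created by DSS.NewContext() holds its own engine state, so commands issued through one context do not affect the others:

# Illustrative sketch: two independent engines in the same process
from dss import dss
dss_a = dss.NewContext()
dss_b = dss.NewContext()
dss_a('new circuit.circA')
dss_b('new circuit.circB')
# Each context reports its own active circuit
print(dss_a.ActiveCircuit.Name, dss_b.ActiveCircuit.Name)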
Check other DSS-Extensions for implementations of multi-threading using multiple DSS contexts in other programming languages:
If you are not comfortable with threads, we recommend using Python’s multiprocessing, especially for larger circuits. multiprocessing can be used with most OpenDSS implementations. Besides the example in this document, we also provide a basic example in the OpenDSS forum.
General setup#
import os
import threading
from time import perf_counter
import numpy as np
from dss import dss, IDSS # IDSS for type hints and better autocomplete
NUM_SOLVE below is used as the number of solutions for each scenario. Read more at the end of the notebook.
NUM_SOLVE = 96
Remember that OpenDSS changes the working directory of the process? To use multiple instances, we need to disable that behavior. When disabled, the AltDSS engine will track the required paths without changing the working directory.
dss.AllowChangeDir = False
We usually don’t want the editor popping up and/or interrupting this kind of batch run.
dss.AllowForms = False
dss.AllowEditor = False
Let’s provide some circuits to run. Remember to adjust BASE_DIR to match where the test circuits are located on your system.
BASE_DIR = './electricdss-tst'
fns = [
f"{BASE_DIR}/Version8/Distrib/EPRITestCircuits/epri_dpv/M1/Master_NoPV.dss",
f"{BASE_DIR}/Version8/Distrib/EPRITestCircuits/epri_dpv/K1/Master_NoPV.dss",
f"{BASE_DIR}/Version8/Distrib/EPRITestCircuits/epri_dpv/J1/Master_withPV.dss",
f"{BASE_DIR}/Version8/Distrib/IEEETestCases/8500-Node/Master-unbal.dss",
f"{BASE_DIR}/Version8/Distrib/IEEETestCases/NEVTestCase/NEVMASTER.DSS",
]
Assemble a list of all scenarios to run:
cases = []
for fn in fns:
    for load_mult in (0.9, 0.95, 1.0, 1.05, 1.1):
        cases.append((fn, load_mult))
Decide how many instances to use based on the number of cases and the CPU count. For processors with Hyper-Threading, it might be best to run with the number of physical cores. It all depends on the processor architecture, such as cache and memory bandwidth, and the characteristics of the DSS circuit being used. It is recommended to run a few tests to select the optimal number of threads, especially for large-scale circuits.
# Use as many threads as CPUs, capped by the number of cases
num = min(len(cases), os.cpu_count())
# Initialize a new context for each of the threads
ctxs = [dss.NewContext() for n in range(num)]
print(f"Using {len(ctxs)} DSS contexts")
Using 25 DSS contexts
Some dicts to keep the results:
tresults = {}
tconverged = {}
sresults = {}
sconverged = {}
This is the worker function that will run the workload, both in the threads and sequentially.
Note that it references some structures shared as input to the function. Since there is a GIL, we don’t need to use locks. You may need to adjust that for more general usage.
Uncomment the print calls for some visual feedback while running.
def _run(ctx: IDSS, case_list, converged, results):
    tname = threading.current_thread().name
    circ = ctx.ActiveCircuit
    while case_list:
        fn, load_mult = case_list.pop()
        ctx('clear')
        try:
            ctx(f'redirect "{fn}"')
            circ.Solution.LoadMult = load_mult
            # print(f'{tname}: Running "{fn}", circuit "{ctx.ActiveCircuit.Name}", mult={load_mult}')
            ctx(f'Solve mode=daily number={NUM_SOLVE}')
        except Exception as ex:
            print('ERROR:', tname, (fn, load_mult))
            print('    ', ex.args)

        # print(f'{tname}: Done "{fn}" (LoadMult={load_mult}), circuit "{circ.Name}"')
        converged[(fn, load_mult)] = circ.Solution.Converged

        # Just get the voltages to compare later; an actual study could get other
        # useful values or calculate specific indices for each scenario
        results[(fn, load_mult)] = circ.AllBusVolts
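As noted above, the bare case_list.pop() is safe only because of the GIL. If you need something more general, or want to be explicit about thread safety, a queue.Queue is a simple alternative. The sketch below is just one way the consumption could be restructured, not part of the original example:

import queue

# Thread-safe work queue; Queue handles its own locking internally
case_queue = queue.Queue()
for case in cases:
    case_queue.put(case)

def _next_case():
    # Returns None when there is no work left
    try:
        return case_queue.get_nowait()
    except queue.Empty:
        return None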
Multithreaded run#
With everything in place, let’s create and run the threads until completion. The threads will consume the input scenarios from cases_to_run_threads, outputting to tconverged and tresults.
# Copy the list of scenarios
cases_to_run_threads = list(cases)
t0 = perf_counter()
threads = []
for ctx in ctxs:
    t = threading.Thread(target=_run, args=(ctx, cases_to_run_threads, tconverged, tresults))
    threads.append(t)

for t in threads:
    t.start()

for t in threads:
    t.join()
t1 = perf_counter()
# Check if all solutions converged
assert all(tconverged.values())
dt_thread = (t1 - t0)
print(f'Done in {dt_thread:.3f} s with {num} threads')
Done in 0.746 s with 25 threads
Single-threaded / sequential run#
For comparison, let’s also run the same cases sequentially, in a single thread.
# Copy the list of scenarios
cases_to_run_seq = list(cases)
t0 = perf_counter()
_run(dss, cases_to_run_seq, sconverged, sresults)
t1 = perf_counter()
dt_seq = (t1 - t0)
print(f'Done in {dt_seq:.3f} s sequentially')
Done in 3.935 s sequentially
Validation of the results#
Check that each scenario has the same results whether it ran in multiple threads or in a single thread:
for case in cases:
    np.testing.assert_equal(sresults[case], tresults[case])
That’s it! On modern processors, the difference between running in a single thread and in multiple threads can be significant.
Traditionally, a lot of OpenDSS users ran tasks in parallel with the multiprocessing module or other tools like Dask.Distributed, but depending on the use case, multithreading can present interesting advantages, such as easier use of shared resources.
For an approach compatible with the current official OpenDSS versions, using either multiprocessing or the dss.Parallel functions would be required.
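As a contrived illustration of the shared-resources point (the names here are hypothetical): threads can read one large array in place, whereas multiprocessing workers would each receive a serialized copy of it:

# Hypothetical shared data, e.g. a long load profile used by every scenario
shared_profile = np.random.rand(1_000_000)

def _profile_value(hour: int) -> float:
    # Every thread reads the same array object, with no copies or pickling;
    # with multiprocessing, each worker would deserialize its own copy
    return float(shared_profile[hour % shared_profile.size])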
Using multiprocessing#
For completeness, let’s also run the cases using the multiprocessing module, which is part of Python’s standard library.
Here, instead of writing to shared structures directly, we have to keep in mind that the input data will be serialized, and so will the output data from the function.
def _run_mp(fn, load_mult):
    from dss import dss as ctx
    circ = ctx.ActiveCircuit
    ctx('clear')
    try:
        ctx(f'redirect "{fn}"')
        circ.Solution.LoadMult = load_mult
        # print(f'Running "{fn}", circuit "{ctx.ActiveCircuit.Name}", mult={load_mult}')
        ctx(f'Solve mode=daily number={NUM_SOLVE}')
    except Exception as ex:
        print('ERROR:', (fn, load_mult))
        print('    ', ex.args)

    # print(f'Done "{fn}" (LoadMult={load_mult}), circuit "{circ.Name}"')
    return (fn, load_mult, circ.Solution.Converged, circ.AllBusVolts)
Let’s run using the same number of workers as before. After running, assemble the results back into dicts.
import multiprocessing as mp
t0 = perf_counter()
pool = mp.Pool(processes=num)
results_mp = pool.starmap(_run_mp, cases)
# Assemble back the results
mpresults = {(item[0], item[1]): item[3] for item in results_mp}
mpconverged = {(item[0], item[1]): item[2] for item in results_mp}
t1 = perf_counter()
dt_mp = (t1 - t0)
print(f'Done in {dt_mp:.3f} s using {num} processes')
Done in 0.870 s using 25 processes
And validate the results, just to be sure:
for case in cases:
    np.testing.assert_equal(sresults[case], mpresults[case])
Performance notes#
If each task is short-lived, keeping the processes alive and reusing the engines is more efficient, but that’s not the point of this example. Try rerunning this notebook with NUM_SOLVE=960 (or a larger number) vs. NUM_SOLVE=96. At some point, the task is long enough that it doesn’t matter whether processes or threads are used, since the overhead is diluted. That is also true when using/comparing the scenarios that can be run using the internal Parallel interface from OpenDSS.
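As a sketch of the keep-the-processes-alive idea (the helper names below are assumptions, not part of this example), a Pool initializer can pay the redirect cost once per worker, leaving each task to only adjust LoadMult and solve:

def _init_worker(fn):
    # Runs once per worker process: compile the circuit a single time
    global _worker_ctx
    from dss import dss as _worker_ctx
    _worker_ctx.AllowChangeDir = False
    _worker_ctx(f'redirect "{fn}"')

def _solve_one(load_mult):
    # Reuse the already-compiled circuit held by this worker; a real study
    # would also reset the solution state (e.g. time) between tasks
    circ = _worker_ctx.ActiveCircuit
    circ.Solution.LoadMult = load_mult
    _worker_ctx(f'Solve mode=daily number={NUM_SOLVE}')
    return (load_mult, circ.Solution.Converged, circ.AllBusVolts)

# Example usage for a single circuit and several load multipliers:
# with mp.Pool(processes=num, initializer=_init_worker, initargs=(fns[0],)) as pool:
#     reuse_results = pool.map(_solve_one, (0.9, 0.95, 1.0, 1.05, 1.1))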
For shorter tasks and small circuits, threads should always win if most of the time is spent in the DSS engine, since the engine releases the Python GIL.
Parallel/PM implementation#
The DSS-Extensions implementation of the OpenDSS engine does include support for the parallel-machine (PM) concepts of the official implementation. The internals are completely different and, in fact, allow a wider range of scenarios than the official version. Although the PM interface is useful in other use cases, such as running through COM in Excel/VBA or MATLAB, using threads and multiple DSS engines explicitly provides more control. Sometimes threading is not ideal and multiprocessing is preferred anyway, so there are few situations where the PM alternative would be recommended with DSS-Python.
For the Parallel interface and the parallel-machine commands from OpenDSS, check the official documentation, as well as the API docs for the interface in DSS-Python.
Currently, DSS-Extensions no longer implement the diakoptics methods. They were disabled and will be re-enabled after the developers on DSS-Extensions have an opportunity to address them.
Future expectations#
Specific to DSS-Extensions, new features could enable automatic sharing of some data. Some of the required infrastructure is already in place, but there are higher-priority tasks to finish before such a system is fully implemented.
There have been recent changes in Python, and more are expected for Python 3.13 (expected by October 2024, first beta in May 2024), that would enable better performance when using multiple threads through subinterpreters (see PEP 734). When the toolset is mature, the projects on DSS-Extensions will try to integrate it.