Multiple DSS engines and multithreading


Parallel OpenDSS/AltDSS engines in the same process

This example is ported directly from DSS-Python with slight modifications, mostly in the _run function. Visit https://dss-extensions.org/DSS-Python/examples/Multithreading.html for a few more comments, a comparison with multiprocessing, and notes on the internal parallel-machine implementation.

This version uses only the 13Bus circuit, but can be easily extended.

General recommendations apply, like using a good memory allocator on Linux.

import os
import threading
from time import perf_counter
import numpy as np

from opendssdirect.OpenDSSDirect import OpenDSSDirect # for type hints
from opendssdirect import dss

# Adjust this if required
PATH_TO_DSS_SCRIPT = './../../tests/data/13Bus/IEEE13Nodeckt.dss'

Recall that OpenDSS changes the working directory of the process. To use multiple instances, we need to disable that behavior. When it is disabled, the AltDSS engine tracks the required paths without changing the working directory.

dss.Basic.AllowChangeDir(False)

Now we need to create a list of cases to run. Let’s try 100 samples of LoadMult between 0.9 and 1.1:

fns = [PATH_TO_DSS_SCRIPT]

cases = []
for fn in fns:
    for load_mult in np.linspace(0.9, 1.1, 100):
        cases.append((fn, load_mult))

cases_to_run_threads = list(cases)
cases_to_run_seq = list(cases)
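To extend the case list to more than one circuit, the nested loops above can also be written with itertools.product. This is only a sketch: the two script paths are hypothetical placeholders, and the load multipliers are built in plain Python to keep the snippet self-contained (np.linspace(0.9, 1.1, 100) gives the same spacing).

```python
from itertools import product

# Hypothetical script paths, for illustration only; replace with real files
fns = [
    './circuits/feeder_a/Master.dss',
    './circuits/feeder_b/Master.dss',
]

num_samples = 100
# Evenly spaced LoadMult values from 0.9 to 1.1, endpoints included
load_mults = [0.9 + 0.2 * i / (num_samples - 1) for i in range(num_samples)]

# Same ordering as the nested loops: all multipliers for the first file, then the next
cases = list(product(fns, load_mults))
print(f'{len(cases)} cases')
```

Each entry is still a (fn, load_mult) tuple, so the rest of the example works unchanged.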

Decide how many instances to use based on the number of cases and the CPU count. For processors with Hyper-threading, it might be best to run with the number of real cores. It all depends on the processor architecture, such as cache and memory bandwidth, and the characteristics of the DSS circuit being used. It is recommended to run a few tests to select the optimal number of threads, especially for large-scale circuits (IEEE13 is tiny).

# Use the smaller of the CPU count and the number of cases as the number of threads
# (os.cpu_count() can return None, so fall back to 1)
num = min(len(cases), os.cpu_count() or 1)

# Initialize a new context for each of the threads
ctxs = [dss.NewContext() for n in range(num)]
print(f"Using {len(ctxs)} DSS contexts")
Using 32 DSS contexts
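One way to run the recommended tests is to time the same workload with a few candidate thread counts. The sketch below shows only the shape of such a harness: fake_solve and time_with_threads are hypothetical helpers, and time.sleep stands in for the real DSS solve, so the numbers it prints say nothing about an actual circuit.

```python
import threading
import time
from time import perf_counter


def fake_solve():
    # Placeholder for one DSS solve (an assumption for illustration only).
    # time.sleep releases the GIL, so the threads can overlap here.
    time.sleep(0.005)


def time_with_threads(num_threads, num_cases=16):
    """Run num_cases placeholder solves on num_threads threads; return elapsed seconds."""
    remaining = list(range(num_cases))

    def worker():
        while True:
            try:
                remaining.pop()
            except IndexError:
                return  # no cases left
            fake_solve()

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    t0 = perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return perf_counter() - t0


candidates = [1, 2, 4, 8]
timings = {n: time_with_threads(n) for n in candidates}
best = min(timings, key=timings.get)

for n, dt in timings.items():
    print(f'{n} threads: {dt:.3f} s')
print(f'Fastest in this trial: {best} threads')
```

For a real measurement, replace fake_solve with the actual workload and one DSS context per thread, and repeat each trial a few times, since a single timing can be noisy.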

Some dicts to keep the results:

tresults = {}
tconverged = {}
sresults = {}
sconverged = {}

This is the worker function that will run the workload, both in the threads and sequentially.

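Note that it references some structures shared as input to the function. In standard CPython, the GIL makes each individual list and dict operation used here (pop(), item assignment) atomic, so this example does not use locks. You may need to adjust that for more general usage.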

Uncomment the print calls for some visual feedback while running.

def _run(ctx: OpenDSSDirect, case_list, converged, results):
    tname = threading.current_thread().name
    while True:
        try:
            # Take the next case; another thread may have just taken the last one
            fn, load_mult = case_list.pop()
        except IndexError:
            break

        ctx.Text.Command('clear')
        try:
            ctx.Text.Command(f'redirect "{fn}"')
            ctx.Solution.LoadMult(load_mult)
            # print(f'{tname}: Running "{fn}", circuit "{ctx.Circuit.Name()}", mult={load_mult}')
            ctx.Text.Command('Solve mode=daily number=5000')
        except Exception as ex:
            print('ERROR:', tname, (fn, load_mult))
            print('      ', ex.args)

        # print(f'{tname}: Done "{fn}" (LoadMult={load_mult}), circuit "{ctx.Circuit.Name()}"')
        converged[(fn, load_mult)] = ctx.Solution.Converged()
        results[(fn, load_mult)] = ctx.Circuit.AllBusVolts()
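For more general usage, where relying on the GIL alone is not enough, one option is to hand out the cases through queue.Queue from the standard library, which does its own locking. The sketch below is not part of the original example: solve_case is a hypothetical stand-in for the DSS commands in _run, and the file name is a placeholder.

```python
import queue
import threading


def solve_case(case):
    """Hypothetical stand-in for the DSS commands; returns a dummy result."""
    fn, load_mult = case
    return load_mult * 2


def run_from_queue(case_queue, results):
    while True:
        try:
            # get_nowait() removes and returns one item, or raises queue.Empty
            case = case_queue.get_nowait()
        except queue.Empty:
            return
        # Each case is handled by exactly one thread, so keys never collide
        results[case] = solve_case(case)


cases = [('circuit.dss', 0.9 + i / 100) for i in range(20)]
case_queue = queue.Queue()
for case in cases:
    case_queue.put(case)

results = {}
threads = [
    threading.Thread(target=run_from_queue, args=(case_queue, results))
    for _ in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f'{len(results)} cases processed')
```

The results dict is still shared without a lock here, which relies on the same CPython behavior as the worker above; a threading.Lock around the assignment would remove that assumption.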

With everything in place, let’s create and run the threads until completion.

The threads will consume input scenarios from cases_to_run_threads, outputting to tconverged and tresults.

t0 = perf_counter()
threads = []
for ctx in ctxs:
    t = threading.Thread(target=_run, args=(ctx, cases_to_run_threads, tconverged, tresults))
    threads.append(t)

for t in threads:
    t.start()

for t in threads:
    t.join()

t1 = perf_counter()

# Check if all solutions converged
assert all(tconverged.values())

dt_thread = (t1 - t0)
print(f'Done in {dt_thread:.3f} s with {num} threads')
Done in 0.283 s with 32 threads

For comparison, let’s also run the same cases sequentially in a single thread.

t0 = perf_counter()

_run(dss, cases_to_run_seq, sconverged, sresults)

t1 = perf_counter()
dt_seq = (t1 - t0)
print(f'Done in {dt_seq:.3f} s sequentially')
Done in 4.427 s sequentially
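To put the two timings side by side, the speedup and the parallel efficiency (speedup divided by the number of threads) can be computed directly. The snippet below hardcodes the values printed above so that it runs on its own; in the notebook, dt_seq, dt_thread and num are already defined.

```python
# Values copied from the outputs above
dt_thread = 0.283  # seconds, 32 threads
dt_seq = 4.427     # seconds, sequential
num = 32

speedup = dt_seq / dt_thread
efficiency = speedup / num

print(f'Speedup: {speedup:.1f}x, parallel efficiency: {efficiency:.0%}')
```

For these numbers the speedup is about 15.6x, roughly half of the thread count. That is in line with the earlier remarks about Hyper-threading, memory bandwidth and the very small size of this circuit, although those are not the only possible causes.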

Check that each scenario gives the same results whether it ran in multiple threads or in a single thread:

for case in cases:
    np.testing.assert_equal(sresults[case], tresults[case])

That’s it! On modern processors, the difference between running in a single thread and running in multiple threads can be significant.

Traditionally, a lot of OpenDSS users have run tasks in parallel with the multiprocessing module or other tools like Dask.Distributed, but depending on the use case, multithreading can present interesting advantages, such as easier use of shared resources.

An approach compatible with the current official OpenDSS versions would require either multiprocessing or the dss.Parallel functions.