From 8107a9f647c5cfd118536b24cda999457842eea7 Mon Sep 17 00:00:00 2001 From: Ozbolt Menegatti Date: Sun, 17 Feb 2019 15:55:17 +0100 Subject: [PATCH] Adding parallel execution using subprocesses --- wani.py | 46 +++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 43 insertions(+), 3 deletions(-) diff --git a/wani.py b/wani.py index 44ba831..61f477b 100644 --- a/wani.py +++ b/wani.py @@ -7,6 +7,9 @@ import logging import argparse import pickle import time +import subprocess +import concurrent.futures +import tempfile from msd_translate import MSD_TRANSLATE @@ -968,9 +971,44 @@ def main(input_file, structures_file, args): colocation_ids = ColocationIds() matches = {s.id: [] for s in structures} - for words in load_files(args): - new_matches = match_file(words, structures) - matches = colocation_ids.merge_matches(matches, new_matches) + if args.parallel: + num_parallel = int(args.parallel) + + # make temporary directory to hold temporary files + with tempfile.TemporaryDirectory() as tmpdirname: + cmd = sys.argv + for inpt in args.input: + if inpt in cmd: + cmd.remove(inpt) + + # remove "--parallel X" + pidx = cmd.index('--parallel') + del cmd[pidx] + del cmd[pidx] + + def func(n): + cmdn = [sys.executable] + cmd + [args.input[n], "--match-to-file", "{}/{}.p".format(tmpdirname, n)] + subprocess.check_call(cmdn) + return n + + # use ThreadPoolExecutor to run subprocesses in parallel using py threads + with concurrent.futures.ThreadPoolExecutor(max_workers=num_parallel) as executor: + # fancy interface to wait for threads to finish + for id_input in executor.map(func, [i for i, _ in enumerate(args.input)]): + with open("{}/{}.p".format(tmpdirname, id_input), "rb") as fp: + new_matches = pickle.load(fp) + matches = colocation_ids.merge_matches(matches, new_matches) + + else: + for words in load_files(args): + new_matches = match_file(words, structures) + # just save to temporary file, used for children of a parallel process + if args.match_to_file is not None: + 
with open(args.match_to_file, "wb") as fp: + pickle.dump(new_matches, fp) + return + else: + matches = colocation_ids.merge_matches(matches, new_matches) writer.write_out(matches, structures, colocation_ids) @@ -993,6 +1031,8 @@ if __name__ == '__main__': parser.add_argument('--multiple-output', help='Generate one output for each syntactic structure', action='store_true') parser.add_argument('--pc-tag', help='Tag for separators, usually pc or c', default="pc") + parser.add_argument('--parallel', help='Run in multiple processes, should speed things up') + parser.add_argument('--match-to-file', help='Do not use!') args = parser.parse_args() logging.basicConfig(stream=sys.stderr, level=args.verbose.upper())