From 86e56767ddb72b83adcb144c32373b3e92e215dc Mon Sep 17 00:00:00 2001
From: voje
Date: Mon, 15 Apr 2019 00:25:26 +0200
Subject: [PATCH] added parallel processing

---
 corpusparser/Parser.py |  39 +++++------
 corpusparser/main.py   | 150 ++++++++++++++++------------------------
 2 files changed, 76 insertions(+), 113 deletions(-)

diff --git a/corpusparser/Parser.py b/corpusparser/Parser.py
index 531e23c..f45247a 100644
--- a/corpusparser/Parser.py
+++ b/corpusparser/Parser.py
@@ -3,6 +3,7 @@ import re
 import json
 from lxml import etree
 import logging
+import time
 
 logging.basicConfig(level=logging.INFO)
 
@@ -10,17 +11,13 @@ logging.basicConfig(level=logging.INFO)
 # Create an iterator that outputs resulting sentences (python dict format).
 class Parser():
 
-    def __init__(self, corpus, infiles, logger=None):
-
-        if corpus == "kres":
-            self.kres_folder = Path(infiles[0])
-            self.kres_srl_folder = Path(infiles[1])
-        elif corpus == "ssj":
-            self.ssj_file = Path(infiles[0])
-        else:
-            raise ValueError("Argument corpus should be 'ssj' or 'kres'.")
+    def __init__(self, corpus, kres_folder=None, kres_srl_folder=None, logger=None):
         self.corpus = corpus
+
+        if self.corpus == "kres":
+            self.kres_folder = Path(kres_folder) if kres_folder else None
+            self.kres_srl_folder = Path(kres_srl_folder) if kres_srl_folder else None
 
         self.W_TAGS = ['w']
         self.C_TAGS = ['c']
         self.S_TAGS = ['S', 'pc']
@@ -94,7 +91,7 @@ class Parser():
 
     def sentence_generator(self):
         # Using generators so we don't copy a whole corpus around in memory.
-        # Might be too complicated. Try per-file generator instead.
+        # Use parse_xml_file() instead for per-file processing (parallelism).
 
         if self.corpus == "kres":
             # some logging output
@@ -102,23 +99,23 @@ class Parser():
             self.n_kres_files = len(list(Path(self.kres_folder).glob('*')))
 
             for xml_file in self.kres_folder.iterdir():
-                # self.parse_xml_file(xml_file)
                 self.nth_kres_file += 1
                 self.logger.info("{} ({}/{})".format(
                     xml_file, self.nth_kres_file, self.n_kres_files))
-                yield from self.parse_xml_file(xml_file)
+                yield from self.xml_file_to_generator(xml_file)
         else:
-            yield from self.parse_xml_file(self.ssj_file)
-
-    def kres_to_json_file(self, in_xml_file, out_json_file):
-        out_buffer = []
-        for _, sentence_entry in parser.parser_xml_file(in_xml_file):
-            out_buffer += [sentence_entry]
-
-        with outfile.open("w") as fp:
-            json.dump(out_buffer, fp)
+            yield from self.xml_file_to_generator(self.ssj_file)
 
     def parse_xml_file(self, xml_file):
+        tstart = time.time()
+        file_data = []
+        for _, sentence_entry in self.xml_file_to_generator(xml_file):
+            file_data.append(sentence_entry)
+        tend = time.time()
+        self.logger.info("Parsed {} in {:.4f} s".format(xml_file, tend - tstart))
+        return file_data
+
+    def xml_file_to_generator(self, xml_file):
         # for separate srl links, it will guess the srl file based on
         # self.kres_srl_folder
diff --git a/corpusparser/main.py b/corpusparser/main.py
index 05a2577..d5c5bd5 100644
--- a/corpusparser/main.py
+++ b/corpusparser/main.py
@@ -5,74 +5,30 @@ import logging
 import json
 from pymongo import MongoClient
 import sys
+from multiprocessing import Pool
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
-## Main handles command line arguments and writing to files / DB.
-
-def ssj_to_json_file(sentence_generator, outfolder):
-    # this funciton is based on the fact that files are parsed sequentially
-    outfolder = Path(outfolder)
-    outfolder.mkdir(parents=True, exist_ok=True)
-    outfile = outfolder / "ssj500k.json"
-
-    data_buffer = []
-    for s in sentence_generator:
-        sdata = s[1]
-        data_buffer += [sdata]
-
-    # outfile = Path(outfile)
-    with outfile.open("w") as fp:
-        logger.info("Writing to {}".format(outfile))
-        json.dump(data_buffer, fp)
-
-def kres_to_json_files(sentence_generator, outfolder):
-    outfolder = Path(outfolder) / "kres_json"
-    outfolder.mkdir(parents=True, exist_ok=True)
-
-    def write_buffer_to_file(outfile, outfile_buffer):
-        logger.info("Writing file: {}".format(outfile))
-        with outfile.open("w") as fp:
-            json.dump(outfile_buffer, fp)
-
-    outfile_buffer = None
-    current_outfile = None
-    for s in sentence_generator:
-        infile = s[0]
-        outfile = outfolder / Path(infile.name.split(".")[0]).with_suffix(".json")
-
-        # parser sequentially parses files; when we're done with a file, write it out
-        if current_outfile is None:
-            current_outfile = outfile
-            outfile_buffer = []
-        elif outfile != current_outfile:
-            write_buffer_to_file(current_outfile, outfile_buffer)
-            current_outfile = outfile
-            outfile_buffer = []
-
-        # update buffer
-        sdata = s[1]
-        outfile_buffer += [sdata]
-    write_buffer_to_file(current_outfile, outfile_buffer)
-
-def data_to_valdb(sentence_generator, dbaddr, username, password, collection_name):
-    logger.info("Connecting to: {}".format(dbaddr))
-    client = MongoClient(
-        "mongodb://{}".format(dbaddr),
-        username=username,
-        password=password,
-        authSource="valdb",
-        authMechanism='SCRAM-SHA-1'
-    )
-    valdb = client.valdb
-    logger.info("Writing data to {}.".format(collection_name))
-    col = valdb[collection_name]
-    for s in sentence_generator:
-        sdata = s[1]
-        # col.insert_one(sdata)
-        col.update({"sid": sdata["sid"]}, sdata, upsert=True)
+n_kres_files = -1  # for logging; set in __main__ before the Pool is created
 
+# handler for concurrency; relies on fork-inherited globals (kres_parser, args, outdir, valdb)
+def _handle_kres_file_tpl(kres_file_tpl):
+    kres_file_idx = kres_file_tpl[0]
+    kres_file = kres_file_tpl[1]
+    logger.info("Handling {} ({}/{})".format(
+        kres_file, kres_file_idx, n_kres_files))
+    kres_data = kres_parser.parse_xml_file(kres_file)
+    if args.output == "file":
+        kres_outdir = outdir / "kres_json"
+        kres_outdir.mkdir(parents=True, exist_ok=True)
+        kres_outfile = kres_outdir / Path(kres_file.name.split(".")[0]).with_suffix(".json")
+        with kres_outfile.open("w") as fp:
+            json.dump(kres_data, fp)
+    elif args.output == "db":
+        kres_col = valdb["kres"]
+        for sentence in kres_data:
+            kres_col.replace_one({"sid": sentence["sid"]}, sentence, upsert=True)
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Parsing corpora kres and ssj500k.")
@@ -84,41 +40,51 @@ if __name__ == "__main__":
     parser.add_argument('--dbaddr', required=False, default=None)
     parser.add_argument('--dbuser', required=False, default=None)
     parser.add_argument('--dbpass', required=False, default=None)
+    parser.add_argument('--cores', required=False, type=int, default=1)
     args = parser.parse_args()
-
-    # parse ssj
-    logger.info("Parsing ssj500k: {}".format(args.ssj_file))
-    ssj_parser = Parser(
-        corpus="ssj",
-        infiles=[args.ssj_file],
-    )
 
-    # ssj to json
+    outdir = None
+    valdb = None
     if args.output == "file":
-        ssj_to_json_file(ssj_parser.sentence_generator(), args.outdir)
+        outdir = Path(args.outdir)
+        outdir.mkdir(parents=True, exist_ok=True)
     elif args.output == "db":
-        data_to_valdb(
-            ssj_parser.sentence_generator(),
-            args.dbaddr,
-            args.dbuser,
-            args.dbpass,
-            collection_name="ssj"
+        dbclient = MongoClient(
+            "mongodb://{}".format(args.dbaddr),
+            username=args.dbuser,
+            password=args.dbpass,
+            authSource="valdb",
+            authMechanism='SCRAM-SHA-1'
         )
+        valdb = dbclient.valdb
 
-    # parse kres
-    logger.info("Parsing Kres: {}".format(args.ssj_file))
+    # SSJ
+    logger.info("Parsing ssj500k: {}".format(args.ssj_file))
+    ssj_parser = Parser(corpus="ssj")
+    ssj_data = ssj_parser.parse_xml_file(Path(args.ssj_file))
+    if args.output == "file":
+        ssj_outfile = outdir / "ssj500k.json"
+        with ssj_outfile.open("w") as fp:
+            json.dump(ssj_data, fp)
+    elif args.output == "db":
+        ssj_col = valdb["ssj"]
+        for sentence in ssj_data:
+            ssj_col.replace_one({"sid": sentence["sid"]}, sentence, upsert=True)
+
+
+    # Kres
+    logger.info("Parsing Kres: {}".format(args.kres_folder))
     kres_parser = Parser(
         corpus="kres",
-        infiles=[args.kres_folder, args.kres_srl_folder],
+        kres_folder=args.kres_folder, kres_srl_folder=args.kres_srl_folder
     )
 
-    # kres to json
-    if args.output == "file":
-        kres_to_json_files(kres_parser.sentence_generator(), args.outdir)
-    elif args.output == "db":
-        data_to_valdb(
-            kres_parser.sentence_generator(),
-            args.dbaddr,
-            args.dbuser,
-            args.dbpass,
-            collection_name="kres"
-        )
+    # [(idx, filepath)]
+    kres_files = list(Path(args.kres_folder).iterdir())
+    kres_files = list(enumerate(kres_files))
+    n_kres_files = len(kres_files)
+
+    with Pool(args.cores) as p:
+        p.map(_handle_kres_file_tpl, kres_files)
+
+    logger.info("Finished parsing.")
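
Note on the Pool above: the worker reads kres_parser, args, outdir and valdb as module globals, so it only works with the fork start method (the default on Linux), and pymongo warns against sharing a MongoClient across fork(). A minimal fork-safe sketch using a Pool initializer so each worker opens its own client; the names _init_worker and _worker_valdb are illustrative, not part of this patch:

    from multiprocessing import Pool
    from pymongo import MongoClient

    _worker_valdb = None  # set once per worker process

    def _init_worker(dbaddr, dbuser, dbpass):
        # Runs once in each worker after it is forked/spawned,
        # so no MongoClient instance is ever shared between processes.
        global _worker_valdb
        client = MongoClient(
            "mongodb://{}".format(dbaddr),
            username=dbuser,
            password=dbpass,
            authSource="valdb",
            authMechanism='SCRAM-SHA-1'
        )
        _worker_valdb = client.valdb

    # in __main__, in place of the plain Pool(args.cores):
    # with Pool(args.cores, initializer=_init_worker,
    #           initargs=(args.dbaddr, args.dbuser, args.dbpass)) as p:
    #     p.map(_handle_kres_file_tpl, kres_files)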
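For reference, the refactored per-file API can also be driven without main.py. A usage sketch, assuming Parser is importable the way main.py imports it; the input paths are hypothetical examples:

    from pathlib import Path
    from corpusparser import Parser  # assumed import path

    # hypothetical example locations
    srl_folder = Path("data/kres_srl")
    xml_file = Path("data/kres/F0000001.xml")

    kres_parser = Parser(corpus="kres", kres_srl_folder=srl_folder)
    sentences = kres_parser.parse_xml_file(xml_file)  # list of sentence dicts; logs the parse time
    for sentence in sentences[:3]:
        print(sentence["sid"])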