added parallel processing

This commit is contained in:
voje 2019-04-15 00:25:26 +02:00
parent cce83045e8
commit 86e56767dd
2 changed files with 76 additions and 113 deletions

View File

@ -3,6 +3,7 @@ import re
import json import json
from lxml import etree from lxml import etree
import logging import logging
import time
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
@ -10,17 +11,13 @@ logging.basicConfig(level=logging.INFO)
# Create an iterator that outputs resulting sentences (python dict format). # Create an iterator that outputs resulting sentences (python dict format).
class Parser(): class Parser():
def __init__(self, corpus, infiles, logger=None): def __init__(self, corpus, kres_srl_folder=None, logger=None):
if corpus == "kres":
self.kres_folder = Path(infiles[0])
self.kres_srl_folder = Path(infiles[1])
elif corpus == "ssj":
self.ssj_file = Path(infiles[0])
else:
raise ValueError("Argument corpus should be 'ssj' or 'kres'.")
self.corpus = corpus self.corpus = corpus
if self.corpus == "kres":
self.kres_srl_folder = kres_srl_folder
self.W_TAGS = ['w'] self.W_TAGS = ['w']
self.C_TAGS = ['c'] self.C_TAGS = ['c']
self.S_TAGS = ['S', 'pc'] self.S_TAGS = ['S', 'pc']
@ -94,7 +91,7 @@ class Parser():
def sentence_generator(self): def sentence_generator(self):
# Using generators so we don't copy a whole corpu around in memory. # Using generators so we don't copy a whole corpu around in memory.
# Might be too complicated. Try per-file generator instead. # Use parse_xml_file() instead for pre-file processing (parallelism?)
if self.corpus == "kres": if self.corpus == "kres":
# some logging output # some logging output
@ -102,23 +99,23 @@ class Parser():
self.n_kres_files = len(list(Path(self.kres_folder).glob('*'))) self.n_kres_files = len(list(Path(self.kres_folder).glob('*')))
for xml_file in self.kres_folder.iterdir(): for xml_file in self.kres_folder.iterdir():
# self.parse_xml_file(xml_file)
self.nth_kres_file += 1 self.nth_kres_file += 1
self.logger.info("{} ({}/{})".format( self.logger.info("{} ({}/{})".format(
xml_file, self.nth_kres_file, self.n_kres_files)) xml_file, self.nth_kres_file, self.n_kres_files))
yield from self.parse_xml_file(xml_file) yield from self.xml_file_to_generator(xml_file)
else: else:
yield from self.parse_xml_file(self.ssj_file) yield from self.xml_file_to_generator(self.ssj_file)
def kres_to_json_file(self, in_xml_file, out_json_file):
out_buffer = []
for _, sentence_entry in parser.parser_xml_file(in_xml_file):
out_buffer += [sentence_entry]
with outfile.open("w") as fp:
json.dump(out_buffer, fp)
def parse_xml_file(self, xml_file): def parse_xml_file(self, xml_file):
tstart = time.time()
file_data = []
for tpl in self.xml_file_to_generator(xml_file):
file_data += [tpl[1]]
tend = time.time()
self.logger.info("Parsed {} in {:.4f} s".format(xml_file, tend - tstart))
return file_data
def xml_file_to_generator(self, xml_file):
# for separate srl links, it will guess the srl file based on # for separate srl links, it will guess the srl file based on
# self.kres_srl_folder # self.kres_srl_folder

View File

@ -5,74 +5,30 @@ import logging
import json import json
from pymongo import MongoClient from pymongo import MongoClient
import sys import sys
from multiprocessing import Pool
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
## Main handles command line arguments and writing to files / DB. n_kres_files = -1 # for logging
def ssj_to_json_file(sentence_generator, outfolder):
# this funciton is based on the fact that files are parsed sequentially
outfolder = Path(outfolder)
outfolder.mkdir(parents=True, exist_ok=True)
outfile = outfolder / "ssj500k.json"
data_buffer = []
for s in sentence_generator:
sdata = s[1]
data_buffer += [sdata]
# outfile = Path(outfile)
with outfile.open("w") as fp:
logger.info("Writing to {}".format(outfile))
json.dump(data_buffer, fp)
def kres_to_json_files(sentence_generator, outfolder):
outfolder = Path(outfolder) / "kres_json"
outfolder.mkdir(parents=True, exist_ok=True)
def write_buffer_to_file(outfile, outfile_buffer):
logger.info("Writing file: {}".format(outfile))
with outfile.open("w") as fp:
json.dump(outfile_buffer, fp)
outfile_buffer = None
current_outfile = None
for s in sentence_generator:
infile = s[0]
outfile = outfolder / Path(infile.name.split(".")[0]).with_suffix(".json")
# parser sequentially parses files; when we're done with a file, write it out
if current_outfile is None:
current_outfile = outfile
outfile_buffer = []
elif outfile != current_outfile:
write_buffer_to_file(current_outfile, outfile_buffer)
current_outfile = outfile
outfile_buffer = []
# update buffer
sdata = s[1]
outfile_buffer += [sdata]
write_buffer_to_file(current_outfile, outfile_buffer)
def data_to_valdb(sentence_generator, dbaddr, username, password, collection_name):
logger.info("Connecting to: {}".format(dbaddr))
client = MongoClient(
"mongodb://{}".format(dbaddr),
username=username,
password=password,
authSource="valdb",
authMechanism='SCRAM-SHA-1'
)
valdb = client.valdb
logger.info("Writing data to {}.".format(collection_name))
col = valdb[collection_name]
for s in sentence_generator:
sdata = s[1]
# col.insert_one(sdata)
col.update({"sid": sdata["sid"]}, sdata, upsert=True)
# handler for concurrency
def _handle_kres_file_tpl(kres_file_tpl):
kres_file_idx = kres_file_tpl[0]
kres_file = kres_file_tpl[1]
logging.info("Handling {} ({}/{})".format(
kres_file, kres_file_idx, n_kres_files))
kres_data = kres_parser.parse_xml_file(kres_file)
if args.output == "file":
kres_outdir = outdir / "kres_json"
kres_outdir.mkdir(parents=True, exist_ok=True)
kres_outfile = kres_outdir / Path(kres_file.name.split(".")[0]).with_suffix(".json")
with kres_outfile.open("w") as fp:
json.dump(kres_data, fp)
elif args.output == "db":
kres_col = valdb["kres"]
for sentence in kres_data:
kres_col.update({"sid": sentence["sid"]}, sentence, upsert=True)
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Parsing corpora kres and ssj500k.") parser = argparse.ArgumentParser(description="Parsing corpora kres and ssj500k.")
@ -84,41 +40,51 @@ if __name__ == "__main__":
parser.add_argument('--dbaddr', required=False, default=None) parser.add_argument('--dbaddr', required=False, default=None)
parser.add_argument('--dbuser', required=False, default=None) parser.add_argument('--dbuser', required=False, default=None)
parser.add_argument('--dbpass', required=False, default=None) parser.add_argument('--dbpass', required=False, default=None)
parser.add_argument('--cores', required=False, default=1)
args = parser.parse_args() args = parser.parse_args()
outdir = None
# parse ssj valdb = None
logger.info("Parsing ssj500k: {}".format(args.ssj_file))
ssj_parser = Parser(
corpus="ssj",
infiles=[args.ssj_file],
)
# ssj to json
if args.output == "file": if args.output == "file":
ssj_to_json_file(ssj_parser.sentence_generator(), args.outdir) outdir = Path(args.outdir)
outdir.mkdir(parents=True, exist_ok=True)
elif args.output == "db": elif args.output == "db":
data_to_valdb( dbclient = MongoClient(
ssj_parser.sentence_generator(), "mongodb://{}".format(args.dbaddr),
args.dbaddr, username=args.dbuser,
args.dbuser, password=args.dbpass,
args.dbpass, authSource="valdb",
collection_name="ssj" authMechanism='SCRAM-SHA-1'
) )
valdb = dbclient.valdb
# parse kres # SSJ
logger.info("Parsing Kres: {}".format(args.ssj_file)) logger.info("Parsing Ssj: {}".format(args.ssj_file))
ssj_parser = Parser(corpus="ssj")
ssj_data = ssj_parser.parse_xml_file(Path(args.ssj_file))
if args.output == "file":
ssj_outfile = outdir / "ssj500k.json"
with ssj_outfile.open("w") as fp:
json.dump(ssj_data, fp)
elif args.output == "db":
ssj_col = valdb["ssj"]
for sentence in ssj_data:
ssj_col.update({"sid": sentence["sid"]}, sentence, upsert=True)
# Kres
logger.info("Parsing Kres: {}".format(args.kres_folder))
kres_parser = Parser( kres_parser = Parser(
corpus="kres", corpus="kres",
infiles=[args.kres_folder, args.kres_srl_folder], kres_srl_folder=args.kres_srl_folder
) )
# kres to json
if args.output == "file": # [(idx, filepath)]
kres_to_json_files(kres_parser.sentence_generator(), args.outdir) kres_files = [x for x in Path(args.kres_folder).iterdir()]
elif args.output == "db": kres_files = [x for x in enumerate(kres_files)]
data_to_valdb( n_kres_files = len(kres_files)
kres_parser.sentence_generator(),
args.dbaddr, p = Pool(int(args.cores))
args.dbuser, p.map(_handle_kres_file_tpl, kres_files)
args.dbpass,
collection_name="kres" logger.info("Finished parsing.")
)