Compare commits

...

5 Commits

Author SHA1 Message Date
f0b0abac1b added functors and headwords to db entry 2019-04-15 02:34:53 +02:00
86e56767dd added parallel processing 2019-04-15 00:25:26 +02:00
cce83045e8 adding per-file parsing, for parallel use 2019-04-14 17:16:45 +02:00
19945a9dd9 changed default mongo auth mechanism 2019-04-14 16:50:54 +02:00
c17361fbda added more logging 2019-04-14 04:18:52 +02:00
2 changed files with 135 additions and 96 deletions

View File

@@ -3,6 +3,7 @@ import re
import json import json
from lxml import etree from lxml import etree
import logging import logging
import time
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
@@ -10,17 +11,13 @@ logging.basicConfig(level=logging.INFO)
# Create an iterator that outputs resulting sentences (python dict format). # Create an iterator that outputs resulting sentences (python dict format).
class Parser(): class Parser():
def __init__(self, corpus, infiles, logger=None): def __init__(self, corpus, kres_srl_folder=None, logger=None):
if corpus == "kres":
self.kres_folder = Path(infiles[0])
self.kres_srl_folder = Path(infiles[1])
elif corpus == "ssj":
self.ssj_file = Path(infiles[0])
else:
raise ValueError("Argument corpus should be 'ssj' or 'kres'.")
self.corpus = corpus self.corpus = corpus
if self.corpus == "kres":
self.kres_srl_folder = kres_srl_folder
self.W_TAGS = ['w'] self.W_TAGS = ['w']
self.C_TAGS = ['c'] self.C_TAGS = ['c']
self.S_TAGS = ['S', 'pc'] self.S_TAGS = ['S', 'pc']
@@ -30,6 +27,10 @@ class Parser():
"missing_srl": [] "missing_srl": []
} }
# for logging output
self.n_kres_files = -1
self.nth_kres_file = -1
def parse_jos_links(self, sent_el): def parse_jos_links(self, sent_el):
if self.corpus == "kres": if self.corpus == "kres":
return self.parse_jos_links_kres(sent_el) return self.parse_jos_links_kres(sent_el)
@@ -90,14 +91,34 @@ class Parser():
def sentence_generator(self): def sentence_generator(self):
# Using generators so we don't copy a whole corpus around in memory. # Using generators so we don't copy a whole corpus around in memory.
# Use parse_xml_file() instead for per-file processing (parallelism?)
if self.corpus == "kres": if self.corpus == "kres":
# some logging output
if self.n_kres_files == -1:
self.n_kres_files = len(list(Path(self.kres_folder).glob('*')))
for xml_file in self.kres_folder.iterdir(): for xml_file in self.kres_folder.iterdir():
# self.parse_xml_file(xml_file) self.nth_kres_file += 1
yield from self.parse_xml_file(xml_file) self.logger.info("{} ({}/{})".format(
xml_file, self.nth_kres_file, self.n_kres_files))
yield from self.xml_file_to_generator(xml_file)
else: else:
yield from self.parse_xml_file(self.ssj_file) yield from self.xml_file_to_generator(self.ssj_file)
def parse_xml_file(self, xml_file):
    """Parse a single XML file eagerly and return its sentence entries.

    Unlike the lazy ``sentence_generator``, this collects the whole file
    into a list so one file can be handled as a unit (e.g. by a worker
    process).

    Args:
        xml_file: Path of the XML file, passed through unchanged to
            ``self.xml_file_to_generator``.

    Returns:
        A list holding the second element of every tuple yielded by
        ``self.xml_file_to_generator(xml_file)``, in the order yielded.
    """
    # The generator yields (xml_file, sentence_entry) tuples; only the
    # entry is kept because the caller already knows which file it passed.
    return [tpl[1] for tpl in self.xml_file_to_generator(xml_file)]
def xml_file_to_generator(self, xml_file):
# for separate srl links, it will guess the srl file based on
# self.kres_srl_folder
srl_from_json = {} srl_from_json = {}
if self.corpus == "kres": if self.corpus == "kres":
# in case of kres, read the SRL links from a separate json file # in case of kres, read the SRL links from a separate json file
@@ -190,7 +211,7 @@ class Parser():
"text": sentence_text, "text": sentence_text,
"tokens": sentence_tokens, "tokens": sentence_tokens,
"jos_links": jos_links, "jos_links": jos_links,
"srl_links": srl_links_parsed "srl_links": srl_links_parsed,
} }
self.stats["parsed_count"] += 1 self.stats["parsed_count"] += 1
yield (xml_file, sentence_entry) yield (xml_file, sentence_entry)

View File

@@ -4,75 +4,83 @@ import argparse
import logging import logging
import json import json
from pymongo import MongoClient from pymongo import MongoClient
import pymongo
import sys import sys
from multiprocessing import Pool
import time
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
## Main handles command line arguments and writing to files / DB. n_kres_files = -1 # for logging
def ssj_to_json_file(sentence_generator, outfolder):
# this function is based on the fact that files are parsed sequentially
outfolder = Path(outfolder)
outfolder.mkdir(parents=True, exist_ok=True)
outfile = outfolder / "ssj500k.json"
data_buffer = [] def _helper_tid_to_token(tid, tokens):
for s in sentence_generator: for t in tokens:
sdata = s[1] if t["tid"] == tid:
data_buffer += [sdata] return t
return None
# outfile = Path(outfile)
with outfile.open("w") as fp:
logger.info("Writing to {}".format(outfile))
json.dump(data_buffer, fp)
def kres_to_json_files(sentence_generator, outfolder): def _db_preprocess(e):
outfolder = Path(outfolder) / "kres_json" if e["srl_links"] is None:
outfolder.mkdir(parents=True, exist_ok=True) e["headwords"] = []
e["functors"] = []
else:
hw_tids = list(set([x["from"] for x in e["srl_links"]]))
hw_tokens = [_helper_tid_to_token(tid, e["tokens"]) for tid in hw_tids]
headwords = [(t["lemma"] if t["msd"][0] == "G" else t["lemma"] + "_") for t in hw_tokens]
e["headwords"] = headwords
def write_buffer_to_file(outfile, outfile_buffer): functors = list(set([x["afun"] for x in e["srl_links"]]))
logger.info("Writing file: {}".format(outfile)) e["functors"] = functors
with outfile.open("w") as fp: return e
json.dump(outfile_buffer, fp)
outfile_buffer = None
current_outfile = None
for s in sentence_generator:
infile = s[0]
outfile = outfolder / Path(infile.name.split(".")[0]).with_suffix(".json")
# parser sequentially parses files; when we're done with a file, write it out # handler for concurrency
if current_outfile is None: def _handle_kres_file_tpl(kres_file_tpl):
current_outfile = outfile tstart = time.time()
outfile_buffer = [] kres_file_idx = kres_file_tpl[0]
elif outfile != current_outfile: kres_file = kres_file_tpl[1]
write_buffer_to_file(current_outfile, outfile_buffer) kres_data = kres_parser.parse_xml_file(kres_file)
current_outfile = outfile if args.output == "file":
outfile_buffer = [] kres_outdir = outdir / "kres_json"
kres_outdir.mkdir(parents=True, exist_ok=True)
# update buffer kres_outfile = kres_outdir / Path(kres_file.name.split(".")[0]).with_suffix(".json")
sdata = s[1] with kres_outfile.open("w") as fp:
outfile_buffer += [sdata] json.dump(kres_data, fp)
write_buffer_to_file(current_outfile, outfile_buffer) elif args.output == "db":
# mongoclient needs to be created after forking
def data_to_valdb(sentence_generator, dbaddr, username, password, collection_name): dbclient = MongoClient(
logger.info("Connecting to: {}".format(dbaddr)) "mongodb://{}".format(args.dbaddr),
client = MongoClient( username=args.dbuser,
"mongodb://{}".format(dbaddr), password=args.dbpass,
username=username,
password=password,
authSource="valdb", authSource="valdb",
authMechanism='SCRAM-SHA-256' authMechanism='SCRAM-SHA-1'
) )
valdb = client.valdb valdb = dbclient.valdb
logger.info("Writing data to {}.".format(collection_name)) kres_col = valdb["kres"]
col = valdb[collection_name]
for s in sentence_generator:
sdata = s[1]
# col.insert_one(sdata)
col.update({"sid": sdata["sid"]}, sdata, upsert=True)
# HUUUUGE BOTTLENECK
"""
for sentence in kres_data:
kres_col.update({"sid": sentence["sid"]}, sentence, upsert=True)
"""
kres_data_1 = [_db_preprocess(x) for x in kres_data]
kres_col.insert_many(kres_data_1) # much much better (just make sure sid has a unique index)
logging.info("Handled {} ({}/{}) in {:.2f} s".format(
kres_file, kres_file_idx, n_kres_files, time.time() - tstart))
def _get_dbclient(args):
    """Construct and return a MongoClient from the parsed CLI arguments.

    Args:
        args: Object exposing ``dbaddr``, ``dbuser`` and ``dbpass``
            attributes (the argparse namespace built in ``__main__``).

    Returns:
        A ``MongoClient`` created for ``mongodb://<dbaddr>`` with the
        given credentials, using "valdb" as the auth source and
        SCRAM-SHA-1 as the auth mechanism.
    """
    mongo_uri = "mongodb://{}".format(args.dbaddr)
    auth_options = {
        "username": args.dbuser,
        "password": args.dbpass,
        "authSource": "valdb",
        "authMechanism": 'SCRAM-SHA-1',
    }
    return MongoClient(mongo_uri, **auth_options)
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Parsing corpora kres and ssj500k.") parser = argparse.ArgumentParser(description="Parsing corpora kres and ssj500k.")
@@ -84,41 +92,51 @@ if __name__ == "__main__":
parser.add_argument('--dbaddr', required=False, default=None) parser.add_argument('--dbaddr', required=False, default=None)
parser.add_argument('--dbuser', required=False, default=None) parser.add_argument('--dbuser', required=False, default=None)
parser.add_argument('--dbpass', required=False, default=None) parser.add_argument('--dbpass', required=False, default=None)
parser.add_argument('--cores', required=False, default=1)
args = parser.parse_args() args = parser.parse_args()
outdir = None
# parse ssj
logger.info("Parsing ssj500k: {}".format(args.ssj_file))
ssj_parser = Parser(
corpus="ssj",
infiles=[args.ssj_file],
)
# ssj to json
if args.output == "file": if args.output == "file":
ssj_to_json_file(ssj_parser.sentence_generator(), args.outdir) outdir = Path(args.outdir)
outdir.mkdir(parents=True, exist_ok=True)
elif args.output == "db": elif args.output == "db":
data_to_valdb( # Force unique sid
ssj_parser.sentence_generator(), dbclient = _get_dbclient(args)
args.dbaddr, for corpus in ["kres", "ssj"]:
args.dbuser, dbclient.valdb[corpus].ensure_index([("sid", pymongo.ASCENDING)])
args.dbpass, dbclient.valdb[corpus].ensure_index([("headwords", pymongo.ASCENDING)])
collection_name="ssj" dbclient.valdb[corpus].ensure_index([("functors", pymongo.ASCENDING)])
)
# parse kres # SSJ
logger.info("Parsing Kres: {}".format(args.ssj_file)) logger.info("Parsing Ssj: {}".format(args.ssj_file))
ssj_parser = Parser(corpus="ssj")
ssj_data = ssj_parser.parse_xml_file(Path(args.ssj_file))
if args.output == "file":
ssj_outfile = outdir / "ssj500k.json"
with ssj_outfile.open("w") as fp:
json.dump(ssj_data, fp)
elif args.output == "db":
dbclient = _get_dbclient(args)
valdb = dbclient.valdb
ssj_col = valdb["ssj"]
for sentence in ssj_data:
sentence = _db_preprocess(sentence)
ssj_col.update({"sid": sentence["sid"]}, sentence, upsert=True)
# Kres
logger.info("Parsing Kres: {}".format(args.kres_folder))
kres_parser = Parser( kres_parser = Parser(
corpus="kres", corpus="kres",
infiles=[args.kres_folder, args.kres_srl_folder], kres_srl_folder=args.kres_srl_folder
)
# kres to json
if args.output == "file":
kres_to_json_files(kres_parser.sentence_generator(), args.outdir)
elif args.output == "db":
data_to_valdb(
kres_parser.sentence_generator(),
args.dbaddr,
args.dbuser,
args.dbpass,
collection_name="kres"
) )
# [(idx, filepath)]
kres_files = [x for x in Path(args.kres_folder).iterdir()]
kres_files = [x for x in enumerate(kres_files)]
n_kres_files = len(kres_files)
p = Pool(int(args.cores))
p.map(_handle_kres_file_tpl, kres_files)
logger.info("Finished parsing.")