Compare commits


18 Commits

Author SHA1 Message Date
01adf47b9b a 2019-04-22 09:06:02 +02:00
2582314c4d bugfix 2019-04-21 22:24:36 +02:00
792c0b03fd processing kres files in chunks to reduce number of DB connections 2019-04-21 21:50:56 +02:00
3276619e6f close dbclient after usage 2019-04-21 19:35:39 +02:00
bcc64c767c separate parsing of ssj and kres (mem management) 2019-04-21 19:18:19 +02:00
00d9192993 moved parse_ssj into a subprocess 2019-04-21 17:04:20 +02:00
14c607c106 Merge branch 'master' of gitea.cjvt.si:kristjan/cjvt-corpusparser into my-fix 2019-04-21 16:07:15 +02:00
94d5a6cd73 added exception to parsing ssj 2019-04-21 16:07:08 +02:00
5ae1a9783c fixed some logging 2019-04-21 13:11:05 +02:00
voje c6b8426fb3 added adjective handling (appending _ to headwords) 2019-04-19 07:41:50 +02:00
af4f6045bb prevent duplicate entries in DB 2019-04-15 20:48:10 +02:00
f0b0abac1b added functors and headwords to db entry 2019-04-15 02:34:53 +02:00
86e56767dd added parallel processing 2019-04-15 00:25:26 +02:00
cce83045e8 adding per-file parsing, for parallel use 2019-04-14 17:16:45 +02:00
19945a9dd9 changed default mongo auth mechanism 2019-04-14 16:50:54 +02:00
c17361fbda added more logging 2019-04-14 04:18:52 +02:00
voje 2b7339ac5a update instead of insert, fixing sentence duplication in db 2019-04-11 07:55:44 +02:00
77c599dded bug fix 2019-03-24 13:44:47 +01:00
3 changed files with 195 additions and 103 deletions

corpusparser/Parser.py

@@ -3,24 +3,21 @@ import re
import json
from lxml import etree
import logging
import time
logging.basicConfig(level=logging.INFO)
# logging.basicConfig(level=logging.INFO)
# Read input file (.xml, .json; kres or ssj500k).
# Create an iterator that outputs resulting sentences (python dict format).
class Parser():
def __init__(self, corpus, infiles, logger=None):
if corpus == "kres":
self.kres_folder = Path(infiles[0])
self.kres_srl_folder = Path(infiles[1])
elif corpus == "ssj":
self.ssj_file = Path(infiles[0])
else:
raise ValueError("Argument corpus should be 'ssj' or 'kres'.")
def __init__(self, corpus, kres_srl_folder=None, logger=None):
self.corpus = corpus
if self.corpus == "kres":
self.kres_srl_folder = kres_srl_folder
self.W_TAGS = ['w']
self.C_TAGS = ['c']
self.S_TAGS = ['S', 'pc']
@@ -30,6 +27,10 @@ class Parser():
"missing_srl": []
}
# for logging output
self.n_kres_files = -1
self.nth_kres_file = -1
def parse_jos_links(self, sent_el):
if self.corpus == "kres":
return self.parse_jos_links_kres(sent_el)
@@ -60,7 +61,10 @@ class Parser():
def parse_any_links_ssj(self, sent_el, links_type):
lgrps = sent_el.findall(".//linkGrp")
links = [x for x in lgrps if x.get("type") == links_type][0]
try:
links = [x for x in lgrps if x.get("type") == links_type][0]
except:
return []
res_links = []
for link in links:
tar = self.parse_ssj_target_arg(link.get("target"))
@@ -90,14 +94,34 @@ class Parser():
def sentence_generator(self):
# Using generators so we don't copy a whole corpus around in memory.
# Use parse_xml_file() instead for per-file processing (parallelism?)
if self.corpus == "kres":
# some logging output
if self.n_kres_files == -1:
self.n_kres_files = len(list(Path(self.kres_folder).glob('*')))
for xml_file in self.kres_folder.iterdir():
# self.parse_xml_file(xml_file)
yield from self.parse_xml_file(xml_file)
self.nth_kres_file += 1
self.logger.info("{} ({}/{})".format(
xml_file, self.nth_kres_file, self.n_kres_files))
yield from self.xml_file_to_generator(xml_file)
else:
yield from self.parse_xml_file(self.ssj_file)
yield from self.xml_file_to_generator(self.ssj_file)
def parse_xml_file(self, xml_file):
# tstart = time.time()
file_data = []
for tpl in self.xml_file_to_generator(xml_file):
file_data += [tpl[1]]
tend = time.time()
# self.logger.info("Parsed {} in {:.4f} s".format(xml_file, tend - tstart))
return file_data
def xml_file_to_generator(self, xml_file):
# for separate srl links, it will guess the srl file based on
# self.kres_srl_folder
srl_from_json = {}
if self.corpus == "kres":
# in case of kres, read the SRL links from a separate json file
@@ -190,7 +214,7 @@ class Parser():
"text": sentence_text,
"tokens": sentence_tokens,
"jos_links": jos_links,
"srl_links": srl_links_parsed
"srl_links": srl_links_parsed,
}
self.stats["parsed_count"] += 1
yield (xml_file, sentence_entry)
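
The refactor above splits parsing into two entry points: xml_file_to_generator() lazily yields (xml_file, sentence_dict) tuples, while the new parse_xml_file() materializes one file's sentences into a list that a worker process can build and hand back (or write out) whole. A minimal usage sketch, assuming the new constructor shown above; the data paths are hypothetical:

import logging
from pathlib import Path
from corpusparser import Parser

parser = Parser(corpus="kres", kres_srl_folder="/data/kres_srl",
                logger=logging.getLogger(__name__))

# Lazy: stream sentences one at a time without holding a whole file in memory.
for xml_file, sentence in parser.xml_file_to_generator(Path("/data/kres/F0000001.xml")):
    print(sentence["sid"], sentence["text"][:40])

# Eager: one file's sentences as a list, the unit a multiprocessing worker
# can parse independently and return in one piece.
sentences = parser.parse_xml_file(Path("/data/kres/F0000001.xml"))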

corpusparser/__init__.py

@@ -1 +1,2 @@
from corpusparser.Parser import Parser
from corpusparser.Parser import Parser
from corpusparser.main import enriched_lemma
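
The package now re-exports enriched_lemma() from main.py (shown below). It returns the lemma unchanged for verbs (MSD category "G" in the Slovene tagset) and appends "_" to everything else, which is how headwords are normalized before they reach the DB. A tiny illustration; the tokens and MSD strings are made up:

from corpusparser import enriched_lemma

verb = {"lemma": "delati", "msd": "Ggnspm"}  # MSD starts with "G" (verb): lemma kept as-is
noun = {"lemma": "miza", "msd": "Sozei"}     # any other category: "_" appended

assert enriched_lemma(verb) == "delati"
assert enriched_lemma(noun) == "miza_"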

corpusparser/main.py

@@ -4,121 +4,188 @@ import argparse
import logging
import json
from pymongo import MongoClient
import pymongo
import sys
from multiprocessing import Pool
import time
logging.basicConfig(level=logging.INFO)
CORPORA = ["kres", "ssj"]
# logging.basicConfig(filename=Path("/var/tmp/corpusparser.log"), filemode='a', level=logging.INFO)
logger = logging.getLogger(__name__)
## Main handles command line arguments and writing to files / DB.
# lfh = logging.FileHandler("/project/logs/fill-database.log")
lfh = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
lfh.setFormatter(formatter)
logger.addHandler(lfh)
logger.setLevel(logging.INFO)
def ssj_to_json_file(sentence_generator, outfolder):
# this function is based on the fact that files are parsed sequentially
outfolder = Path(outfolder)
outfolder.mkdir(parents=True, exist_ok=True)
outfile = outfolder / "ssj500k.json"
n_chunks = -1
data_buffer = []
for s in sentence_generator:
sdata = s[1]
data_buffer += [sdata]
# outfile = Path(outfile)
with outfile.open("w") as fp:
logger.info("Writing to {}".format(outfile))
json.dump(data_buffer, fp)
def enriched_lemma(token):
return (token["lemma"] if token["msd"][0] == "G" else token["lemma"] + "_")
def kres_to_json_files(sentence_generator, outfolder):
outfolder = Path(outfolder) / "kres_json"
outfolder.mkdir(parents=True, exist_ok=True)
def write_buffer_to_file(outfile, outfile_buffer):
logger.info("Writing file: {}".format(outfile))
with outfile.open("w") as fp:
json.dump(outfile_buffer, fp)
def _helper_tid_to_token(tid, tokens):
for t in tokens:
if t["tid"] == tid:
return t
return None
outfile_buffer = None
current_outfile = None
for s in sentence_generator:
infile = s[0]
outfile = outfolder / Path(infile.name.split(".")[0]).with_suffix(".json")
# parser sequentially parses files; when we're done with a file, write it out
if current_outfile is None:
current_outfile = outfile
outfile_buffer = []
elif outfile != current_outfile:
write_buffer_to_file(current_outfile, outfile_buffer)
current_outfile = outfile
outfile_buffer = []
def _db_preprocess(e):
if e["srl_links"] is None:
e["headwords"] = []
e["functors"] = []
else:
hw_tids = list(set([x["from"] for x in e["srl_links"]]))
hw_tokens = [_helper_tid_to_token(tid, e["tokens"]) for tid in hw_tids]
headwords = [enriched_lemma(t) for t in hw_tokens]
e["headwords"] = headwords
# update buffer
sdata = s[1]
outfile_buffer += [sdata]
write_buffer_to_file(current_outfile, outfile_buffer)
functors = list(set([x["afun"] for x in e["srl_links"]]))
e["functors"] = functors
return e
def data_to_valdb(sentence_generator, dbaddr, username, password, collection_name):
logger.info("Connecting to: {}".format(dbaddr))
client = MongoClient(
"mongodb://{}".format(dbaddr),
username=username,
password=password,
# handler for concurrency
def _handle_kres_file_chunk(kres_file_chunk):
tstart = time.time()
kres_chunk_idx = kres_file_chunk[0]
kres_chunk = kres_file_chunk[1]
dbclient = None
db_payload = []
if args.output == "db":
# mongoclient needs to be created after forking
dbclient = MongoClient(
"mongodb://{}".format(args.dbaddr),
username=args.dbuser,
password=args.dbpass,
authSource="valdb",
authMechanism='SCRAM-SHA-1'
)
# dbclient.valdb["kres"]
for kres_file in kres_chunk:
try:
kres_data = kres_parser.parse_xml_file(kres_file)
except:
logger.error("Failed to parse file: {}".format(kres_file))
continue
if args.output == "file":
kres_outdir = outdir / "kres_json"
kres_outdir.mkdir(parents=True, exist_ok=True)
kres_outfile = kres_outdir / Path(kres_file.name.split(".")[0]).with_suffix(".json")
with kres_outfile.open("w") as fp:
json.dump(kres_data, fp)
elif args.output == "db":
"""
if dbclient.valdb["kres"].find({"sid": kres_data[0]["sid"]}).count() > 0:
logger.info("File {} already in DB, closing chunk ({}/{})".format(
kres_file, kres_chunk_idx, n_chunks))
dbclient.close()
return
"""
kres_data_1 = [_db_preprocess(x) for x in kres_data]
db_payload += kres_data_1
try:
dbclient.valdb["kres"].insert_many(db_payload, ordered=False) # much much better (just make sure sid has a unique index)
except:
logger.error("Failed inserting kres files chunk ({}/{}) in {:.2f} s".format(
kres_chunk_idx, n_chunks, time.time() - tstart))
logger.info("Db insert: chunks ({}/{}) in {:.2f} s".format(
kres_chunk_idx, n_chunks, time.time() - tstart))
dbclient.close()
def _get_dbclient(args):
dbclient = MongoClient(
"mongodb://{}".format(args.dbaddr),
username=args.dbuser,
password=args.dbpass,
authSource="valdb",
authMechanism='SCRAM-SHA-256'
authMechanism='SCRAM-SHA-1'
)
valdb = client.valdb
logger.info("Writing data to {}.".format(collection_name))
col = valdb[collection_name]
for s in sentence_generator:
sdata = s[1]
col.insert_one(sdata)
return dbclient
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Parsing corpora kres and ssj500k.")
parser.add_argument('--kres-folder', required=True)
parser.add_argument('--kres-srl-folder', required=True)
parser.add_argument('--ssj-file', required=True)
parser.add_argument('--corpus', required=True)
parser.add_argument('--kres-folder', required=False)
parser.add_argument('--kres-srl-folder', required=False)
parser.add_argument('--ssj-file', required=False)
parser.add_argument('--output', required=False, default=None)
parser.add_argument('--outdir', required=False, default=None)
parser.add_argument('--dbaddr', required=False, default=None)
parser.add_argument('--dbuser', required=False, default=None)
parser.add_argument('--dbpass', required=False, default=None)
parser.add_argument('--cores', required=False, default=1)
parser.add_argument('--chunk-size', required=False, default=3)
args = parser.parse_args()
# parse ssj
logger.info("Parsing ssj500k: {}".format(args.ssj_file))
ssj_parser = Parser(
corpus="ssj",
infiles=[args.ssj_file],
)
# ssj to json
corpus = args.corpus
assert (corpus in CORPORA), "Wrong corpus name."
outdir = None
if args.output == "file":
ssj_to_json_file(ssj_parser.sentence_generator(), args.outdir)
outdir = Path(args.outdir)
outdir.mkdir(parents=True, exist_ok=True)
elif args.output == "db":
data_to_valdb(
ssj_parser.sentence_generator(),
args.dbaddr,
args.dbuser,
args.dbpass,
collection_name="ssj"
dbclient = _get_dbclient(args)
dbclient.valdb[corpus].ensure_index([("sid", pymongo.ASCENDING)])
dbclient.valdb[corpus].ensure_index([("headwords", pymongo.ASCENDING)])
dbclient.valdb[corpus].ensure_index([("functors", pymongo.ASCENDING)])
dbclient.close()
if corpus == "ssj":
logger.info("Parsing Ssj: {}".format(args.ssj_file))
ssj_parser = Parser(logger=logger, corpus="ssj")
ssj_data = ssj_parser.parse_xml_file(Path(args.ssj_file))
if args.output == "file":
ssj_outfile = outdir / "ssj500k.json"
with ssj_outfile.open("w") as fp:
json.dump(ssj_data, fp)
elif args.output == "db":
dbclient = _get_dbclient(args)
valdb = dbclient.valdb
ssj_col = valdb["ssj"]
for sentence in ssj_data:
sentence = _db_preprocess(sentence)
ssj_col.update({"sid": sentence["sid"]}, sentence, upsert=True)
dbclient.close()
if corpus == "kres":
# Kres
logger.info("Parsing Kres: {}".format(args.kres_folder))
kres_parser = Parser(
logger=logger,
corpus="kres",
kres_srl_folder=args.kres_srl_folder
)
sys.exit()
# parse kres
logger.info("Parsing Kres: {}".format(args.ssj_file))
kres_parser = Parser(
corpus="kres",
infiles=[args.kres_folder, args.kres_srl_folder],
)
# kres to json
if args.output == "file":
kres_to_json_files(kres_parser.sentence_generator(), args.outdir)
elif args.output == "db":
data_to_valdb(
ssj_parser.sentence_generator(),
args.dbaddr,
args.dbuser,
args.dbpass,
collection_name="kres"
)
kres_files = [x for x in Path(args.kres_folder).iterdir()]
kres_files = sorted(kres_files, key=lambda x: x.name)
kres_files_chunks = []
i = 0
while i < len(kres_files):
# kres_files_chunks += kres_files[i:(i+args.chunk_size)]
new_i = i + int(args.chunk_size)
kres_files_chunks += [kres_files[i:new_i]]
i = new_i
kres_files_chunks = [x for x in enumerate(kres_files_chunks)]
n_chunks = len(kres_files_chunks)
p = Pool(int(args.cores))
p.map(_handle_kres_file_chunk, kres_files_chunks)
logger.info("Finished parsing.")