cjvt-corpusparser/corpusparser/main.py
2019-04-22 09:06:02 +02:00

192 lines
6.4 KiB
Python

from pathlib import Path
from corpusparser import Parser
import argparse
import logging
import json
from pymongo import MongoClient
import pymongo
import sys
from multiprocessing import Pool
import time
# Corpus names accepted by the --corpus command-line option.
CORPORA = ["kres", "ssj"]

# Number of Kres file chunks. Stays at -1 until the __main__ section has
# split the input files; it is only interpolated into log messages.
n_chunks = -1

# Module logger: INFO and above, written to stdout with a timestamp prefix.
# Alternative sinks kept for reference:
#   logging.basicConfig(filename=Path("/var/tmp/corpusparser.log"), filemode='a', level=logging.INFO)
#   lfh = logging.FileHandler("/project/logs/fill-database.log")
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
lfh = logging.StreamHandler(sys.stdout)
lfh.setFormatter(formatter)
logger.addHandler(lfh)
def enriched_lemma(token):
    """Return the headword form of a token's lemma.

    A token whose MSD tag begins with "G" keeps its bare lemma; every other
    token gets an underscore appended to the lemma.

    Args:
        token: Mapping with the keys "lemma" and "msd".

    Returns:
        ``token["lemma"]`` unchanged when the MSD starts with "G", otherwise
        ``token["lemma"] + "_"``.
    """
    msd = token["msd"]
    # Check for emptiness before indexing: a missing/empty MSD is treated as
    # "not G" instead of raising IndexError/TypeError and aborting the caller.
    if msd and msd[0] == "G":
        return token["lemma"]
    return token["lemma"] + "_"
def _helper_tid_to_token(tid, tokens):
    """Look up a token by its "tid".

    Args:
        tid: Token id to search for.
        tokens: Iterable of token mappings, each with a "tid" key.

    Returns:
        The first token whose "tid" equals ``tid``, or None when no token
        matches.
    """
    matches = (candidate for candidate in tokens if candidate["tid"] == tid)
    return next(matches, None)
def _db_preprocess(e):
    """Add the derived "headwords" and "functors" fields to a sentence entry.

    The entry is modified in place and also returned.

    Args:
        e: Sentence mapping with the keys "srl_links" (None or a list of
            link mappings carrying "from" and "afun") and "tokens" (list of
            token mappings carrying "tid", "lemma" and "msd").

    Returns:
        The same mapping ``e`` with two added keys:
        "headwords" - enriched lemmas of the tokens that links originate
        from (one per distinct "from" tid), and
        "functors" - the distinct "afun" values of the links.
    """
    links = e["srl_links"]
    if not links:
        e["headwords"] = []
        e["functors"] = []
        return e

    # dict.fromkeys() de-duplicates like set() did, but keeps first-seen
    # order so the stored arrays are identical from run to run.
    hw_tids = list(dict.fromkeys(link["from"] for link in links))
    hw_tokens = [_helper_tid_to_token(tid, e["tokens"]) for tid in hw_tids]
    # A link can reference a tid that is absent from the token list; the
    # lookup then yields None. Skip those instead of crashing on None["lemma"].
    e["headwords"] = [enriched_lemma(tok) for tok in hw_tokens if tok is not None]
    e["functors"] = list(dict.fromkeys(link["afun"] for link in links))
    return e
# handler for concurrency
def _handle_kres_file_chunk(kres_file_chunk):
    """Parse one chunk of Kres files and write the result to file or MongoDB.

    Worker function passed to ``Pool.map``. It reads the module-level names
    ``args``, ``kres_parser``, ``outdir`` and ``n_chunks`` that the
    ``__main__`` section assigns before the pool is created.

    Behaviour by ``args.output``:
      * "file": each parsed file is dumped to ``<outdir>/kres_json/<stem>.json``.
      * "db": all sentences of the chunk are preprocessed and inserted into
        the ``valdb.kres`` collection with a single ``insert_many``.
      * anything else: files are parsed and the result is discarded.

    Files that fail to parse are logged and skipped. A failed DB insert is
    logged and does not raise, so one bad chunk does not abort the whole run.

    Args:
        kres_file_chunk: ``(chunk_index, files)`` pair; ``files`` is a
            sequence of ``Path`` objects.

    Returns:
        None.
    """
    tstart = time.time()
    kres_chunk_idx, kres_chunk = kres_file_chunk
    to_file = args.output == "file"
    to_db = args.output == "db"

    if to_file:
        kres_outdir = outdir / "kres_json"
        kres_outdir.mkdir(parents=True, exist_ok=True)

    db_payload = []
    for kres_file in kres_chunk:
        try:
            kres_data = kres_parser.parse_xml_file(kres_file)
        except Exception:
            # Best effort: one unparsable file must not lose the whole chunk.
            logger.error("Failed to parse file: {}".format(kres_file), exc_info=True)
            continue
        if to_file:
            # Everything after the first "." in the file name is dropped.
            kres_outfile = kres_outdir / Path(kres_file.name.split(".")[0]).with_suffix(".json")
            with kres_outfile.open("w") as fp:
                json.dump(kres_data, fp)
        elif to_db:
            # NOTE: an earlier "skip the chunk if its first sentence is
            # already in the DB" check was disabled at this point.
            db_payload.extend(_db_preprocess(x) for x in kres_data)

    if not to_db:
        # No client exists in this mode; nothing to insert or close.
        return
    if not db_payload:
        # insert_many() rejects an empty document list, so skip the call.
        logger.warning("No sentences to insert for kres files chunk ({}/{})".format(
            kres_chunk_idx, n_chunks))
        return

    # The Mongo client has to be created inside the worker (after forking).
    dbclient = _get_dbclient(args)
    try:
        # ordered=False: all inserts are attempted even if some fail
        # (e.g. on duplicate keys, should "sid" carry a unique index).
        dbclient.valdb["kres"].insert_many(db_payload, ordered=False)
    except Exception:
        logger.error("Failed inserting kres files chunk ({}/{}) in {:.2f} s".format(
            kres_chunk_idx, n_chunks, time.time() - tstart), exc_info=True)
    else:
        logger.info("Db insert: chunks ({}/{}) in {:.2f} s".format(
            kres_chunk_idx, n_chunks, time.time() - tstart))
    finally:
        dbclient.close()
def _get_dbclient(args):
    """Create a MongoClient from the parsed command-line arguments.

    Args:
        args: Namespace providing ``dbaddr`` (host[:port] part of the
            connection URI), ``dbuser`` and ``dbpass``.

    Returns:
        A ``MongoClient`` for ``mongodb://<dbaddr>`` that authenticates
        against the "valdb" database using SCRAM-SHA-1.
    """
    connection_options = {
        "username": args.dbuser,
        "password": args.dbpass,
        "authSource": "valdb",
        "authMechanism": 'SCRAM-SHA-1',
    }
    return MongoClient("mongodb://{}".format(args.dbaddr), **connection_options)
if __name__ == "__main__":
    # Command-line entry point: parse either the ssj500k file or a folder of
    # Kres files and write the result to JSON files or to MongoDB.
    #
    # NOTE: args, outdir, kres_parser and n_chunks intentionally stay
    # module-level names. _handle_kres_file_chunk reads them inside the pool
    # workers, which relies on the workers being forked from this process.
    parser = argparse.ArgumentParser(description="Parsing corpora kres and ssj500k.")
    # choices= replaces the former assert, which is stripped under `python -O`.
    parser.add_argument('--corpus', required=True, choices=CORPORA)
    parser.add_argument('--kres-folder', required=False)
    parser.add_argument('--kres-srl-folder', required=False)
    parser.add_argument('--ssj-file', required=False)
    parser.add_argument('--output', required=False, default=None)
    parser.add_argument('--outdir', required=False, default=None)
    parser.add_argument('--dbaddr', required=False, default=None)
    parser.add_argument('--dbuser', required=False, default=None)
    parser.add_argument('--dbpass', required=False, default=None)
    parser.add_argument('--cores', required=False, default=1, type=int)
    parser.add_argument('--chunk-size', required=False, default=3, type=int)
    args = parser.parse_args()

    # Fail early with a usage message instead of a TypeError from Path(None)
    # or a chunking loop that never advances.
    if args.cores < 1:
        parser.error("--cores must be a positive integer.")
    if args.chunk_size < 1:
        parser.error("--chunk-size must be a positive integer.")
    if args.output == "file" and args.outdir is None:
        parser.error("--outdir is required with --output file.")
    if args.output == "db" and args.dbaddr is None:
        parser.error("--dbaddr is required with --output db.")
    if args.corpus == "ssj" and args.ssj_file is None:
        parser.error("--ssj-file is required with --corpus ssj.")
    if args.corpus == "kres" and args.kres_folder is None:
        parser.error("--kres-folder is required with --corpus kres.")

    corpus = args.corpus

    outdir = None
    if args.output == "file":
        outdir = Path(args.outdir)
        outdir.mkdir(parents=True, exist_ok=True)
    elif args.output == "db":
        # Create the lookup indexes once, up front, and close this client
        # before any worker processes are started.
        dbclient = _get_dbclient(args)
        try:
            collection = dbclient.valdb[corpus]
            # create_index() is the supported replacement for ensure_index().
            for field in ("sid", "headwords", "functors"):
                collection.create_index([(field, pymongo.ASCENDING)])
        finally:
            dbclient.close()

    if corpus == "ssj":
        logger.info("Parsing Ssj: {}".format(args.ssj_file))
        ssj_parser = Parser(logger=logger, corpus="ssj")
        ssj_data = ssj_parser.parse_xml_file(Path(args.ssj_file))
        if args.output == "file":
            ssj_outfile = outdir / "ssj500k.json"
            with ssj_outfile.open("w") as fp:
                json.dump(ssj_data, fp)
        elif args.output == "db":
            dbclient = _get_dbclient(args)
            try:
                ssj_col = dbclient.valdb["ssj"]
                for sentence in ssj_data:
                    sentence = _db_preprocess(sentence)
                    # Whole-document upsert keyed on "sid"; replace_one() is
                    # the supported replacement for Collection.update().
                    ssj_col.replace_one({"sid": sentence["sid"]}, sentence, upsert=True)
            finally:
                dbclient.close()

    if corpus == "kres":
        logger.info("Parsing Kres: {}".format(args.kres_folder))
        kres_parser = Parser(
            logger=logger,
            corpus="kres",
            kres_srl_folder=args.kres_srl_folder
        )
        kres_files = sorted(Path(args.kres_folder).iterdir(), key=lambda x: x.name)

        # Split the sorted files into (chunk_index, files) pairs; each pair
        # is handled by one worker call.
        chunk_size = args.chunk_size
        kres_files_chunks = list(enumerate(
            kres_files[start:start + chunk_size]
            for start in range(0, len(kres_files), chunk_size)
        ))
        n_chunks = len(kres_files_chunks)

        # The context manager shuts the pool down once map() has returned.
        with Pool(args.cores) as p:
            p.map(_handle_kres_file_chunk, kres_files_chunks)

    logger.info("Finished parsing.")