You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

171 lines
5.7 KiB

from pathlib import Path
from corpusparser import Parser
import argparse
import logging
import json
from pymongo import MongoClient
import pymongo
import sys
from multiprocessing import Pool
import time
# Corpus names this script accepts; the --corpus argument is checked against
# this list at startup.
CORPORA = ["kres", "ssj"]
# logging.basicConfig(filename=Path("/var/tmp/corpusparser.log"), filemode='a', level=logging.INFO)
# Module logger: records at INFO and above are written to a fixed log file as
# "<time> <level> <message>" lines.
logger = logging.getLogger(__name__)
# NOTE(review): the handler is created at import time with a hard-coded path,
# so the /project/logs directory presumably has to exist before this runs.
lfh = logging.FileHandler("/project/logs/fill-database.log")
# lfh = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
lfh.setFormatter(formatter)
logger.addHandler(lfh)
logger.setLevel(logging.INFO)
# Total number of Kres files. Assigned in the __main__ block before the worker
# pool is started and only used in progress log messages; -1 until then.
n_kres_files = -1
def enriched_lemma(token):
    """Return the token's lemma, suffixed with "_" unless its MSD starts with "G".

    Args:
        token: Mapping with a "lemma" string and an "msd" tag.

    Returns:
        The lemma unchanged when the first element of the MSD tag is "G",
        otherwise the lemma with "_" appended. An empty MSD tag is treated
        as not starting with "G".
    """
    msd = token["msd"]
    # Guard against an empty tag: msd[0] would raise IndexError, and an empty
    # tag cannot start with "G" anyway.
    if msd and msd[0] == "G":
        return token["lemma"]
    return token["lemma"] + "_"
def _helper_tid_to_token(tid, tokens):
for t in tokens:
if t["tid"] == tid:
return t
return None
def _db_preprocess(e):
if e["srl_links"] is None:
e["headwords"] = []
e["functors"] = []
else:
hw_tids = list(set([x["from"] for x in e["srl_links"]]))
hw_tokens = [_helper_tid_to_token(tid, e["tokens"]) for tid in hw_tids]
headwords = [enriched_lemma(t) for t in hw_tokens]
e["headwords"] = headwords
functors = list(set([x["afun"] for x in e["srl_links"]]))
e["functors"] = functors
return e
# handler for concurrency
def _handle_kres_file_tpl(kres_file_tpl):
    """Parse one Kres file and store the result as JSON or in MongoDB.

    Meant to run in a multiprocessing worker. It reads the module globals
    ``args``, ``outdir``, ``kres_parser`` and ``n_kres_files`` that the
    ``__main__`` block sets before the pool is started.

    Args:
        kres_file_tpl: ``(index, path)`` pair; the index is only used in
            log messages.

    Returns:
        None. In "db" mode the function returns early (without inserting)
        when the file yields no sentences or when its first sentence is
        already present in the ``kres`` collection.
    """
    tstart = time.time()
    kres_file_idx, kres_file = kres_file_tpl
    kres_data = kres_parser.parse_xml_file(kres_file)

    if args.output == "file":
        kres_outdir = outdir / "kres_json"
        kres_outdir.mkdir(parents=True, exist_ok=True)
        kres_outfile = kres_outdir / Path(kres_file.name.split(".")[0]).with_suffix(".json")
        with kres_outfile.open("w") as fp:
            json.dump(kres_data, fp)

    elif args.output == "db":
        # Nothing parsed: kres_data[0] below would fail and insert_many
        # rejects an empty batch, so skip the file explicitly.
        if not kres_data:
            logger.warning("No sentences parsed from {} ({}/{}), skipping".format(
                kres_file, kres_file_idx, n_kres_files))
            return

        # mongoclient needs to be created after forking
        dbclient = _get_dbclient(args)
        try:
            kres_col = dbclient.valdb["kres"]

            # skip if one of the sentences is already in DB
            # (find_one replaces Cursor.count(), which newer PyMongo removed)
            if kres_col.find_one({"sid": kres_data[0]["sid"]}) is not None:
                logger.info("File {} already in DB ({}/{})".format(
                    kres_file, kres_file_idx, n_kres_files))
                return

            # One bulk insert per file; upserting sentence by sentence was a
            # huge bottleneck. (Just make sure sid has a unique index.)
            kres_col.insert_many([_db_preprocess(x) for x in kres_data])
        finally:
            # Release the connections opened for this file.
            dbclient.close()

    logger.info("Inserted data from {} ({}/{}) in {:.2f} s".format(
        kres_file, kres_file_idx, n_kres_files, time.time() - tstart))
def _get_dbclient(args):
    """Create a MongoClient from the parsed command-line arguments.

    Args:
        args: Namespace providing ``dbaddr``, ``dbuser`` and ``dbpass``.

    Returns:
        A MongoClient for ``mongodb://<dbaddr>`` that authenticates against
        the ``valdb`` database using SCRAM-SHA-1.
    """
    uri = "mongodb://{}".format(args.dbaddr)
    auth_options = {
        "username": args.dbuser,
        "password": args.dbpass,
        "authSource": "valdb",
        "authMechanism": 'SCRAM-SHA-1',
    }
    return MongoClient(uri, **auth_options)
if __name__ == "__main__":
    # Command-line entry point: parse one corpus (kres or ssj) and write the
    # sentences either to JSON files or to the MongoDB database "valdb".
    #
    # NOTE: this intentionally stays a flat script block. The pool worker
    # _handle_kres_file_tpl reads args, outdir, kres_parser and n_kres_files
    # as module globals, so they must be assigned at module level here.
    parser = argparse.ArgumentParser(description="Parsing corpora kres and ssj500k.")
    # choices replaces the former assert, which is stripped under "python -O".
    parser.add_argument('--corpus', required=True, choices=CORPORA)
    parser.add_argument('--kres-folder', required=False)
    parser.add_argument('--kres-srl-folder', required=False)
    parser.add_argument('--ssj-file', required=False)
    parser.add_argument('--output', required=False, default=None)
    parser.add_argument('--outdir', required=False, default=None)
    parser.add_argument('--dbaddr', required=False, default=None)
    parser.add_argument('--dbuser', required=False, default=None)
    parser.add_argument('--dbpass', required=False, default=None)
    parser.add_argument('--cores', required=False, default=1, type=int)
    args = parser.parse_args()

    corpus = args.corpus

    outdir = None
    if args.output == "file":
        if args.outdir is None:
            parser.error("--outdir is required when --output is 'file'")
        outdir = Path(args.outdir)
        outdir.mkdir(parents=True, exist_ok=True)
    elif args.output == "db":
        # create_index is idempotent and replaces ensure_index, which newer
        # PyMongo releases no longer provide. The client is closed again
        # right away so that no open client is carried across the fork.
        dbclient = _get_dbclient(args)
        try:
            corpus_col = dbclient.valdb[corpus]
            for field in ("sid", "headwords", "functors"):
                corpus_col.create_index([(field, pymongo.ASCENDING)])
        finally:
            dbclient.close()

    if corpus == "ssj":
        if args.ssj_file is None:
            parser.error("--ssj-file is required for corpus 'ssj'")
        logger.info("Parsing Ssj: {}".format(args.ssj_file))
        ssj_parser = Parser(logger=logger, corpus="ssj")
        ssj_data = ssj_parser.parse_xml_file(Path(args.ssj_file))
        if args.output == "file":
            ssj_outfile = outdir / "ssj500k.json"
            with ssj_outfile.open("w") as fp:
                json.dump(ssj_data, fp)
        elif args.output == "db":
            dbclient = _get_dbclient(args)
            try:
                ssj_col = dbclient.valdb["ssj"]
                for sentence in ssj_data:
                    sentence = _db_preprocess(sentence)
                    # Whole-document upsert keyed by sid; replace_one is the
                    # supported spelling of the old update(..., upsert=True).
                    ssj_col.replace_one({"sid": sentence["sid"]}, sentence, upsert=True)
            finally:
                dbclient.close()

    if corpus == "kres":
        # Kres
        if args.kres_folder is None:
            parser.error("--kres-folder is required for corpus 'kres'")
        logger.info("Parsing Kres: {}".format(args.kres_folder))
        kres_parser = Parser(
            logger=logger,
            corpus="kres",
            kres_srl_folder=args.kres_srl_folder
        )

        # [(idx, filepath)]; sorted so the indices in the log are reproducible
        kres_files = list(enumerate(sorted(Path(args.kres_folder).iterdir())))
        n_kres_files = len(kres_files)

        # map() blocks until every file is handled; the with-block then
        # shuts the worker processes down.
        with Pool(args.cores) as p:
            p.map(_handle_kres_file_tpl, kres_files)

    logger.info("Finished parsing.")