from pathlib import Path
from corpusparser import Parser
import argparse
import logging
import json
from pymongo import MongoClient
import pymongo
import sys
from multiprocessing import Pool
import time

# logging.basicConfig(filename=Path("/var/tmp/corpusparser.log"), filemode='a', level=logging.INFO)
logger = logging.getLogger(__name__)
# lfh = logging.FileHandler("/var/tmp/fill-database.log")
lfh = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
lfh.setFormatter(formatter)
logger.addHandler(lfh)
logger.setLevel(logging.INFO)

n_kres_files = -1  # for logging


def enriched_lemma(token):
    # Verbs (msd starting with "G") keep their plain lemma; all other
    # parts of speech get a trailing underscore to keep the keys disjoint.
    return token["lemma"] if token["msd"][0] == "G" else token["lemma"] + "_"


def _helper_tid_to_token(tid, tokens):
    for t in tokens:
        if t["tid"] == tid:
            return t
    return None


def _db_preprocess(e):
    # e is a parsed sentence dict with at least "sid", "tokens" (each token
    # carrying "tid", "lemma", "msd") and "srl_links" (a list of
    # {"from": tid, "afun": ...} links, or None). Derive the "headwords"
    # and "functors" index fields from the SRL links.
    if e["srl_links"] is None:
        e["headwords"] = []
        e["functors"] = []
    else:
        hw_tids = list(set(x["from"] for x in e["srl_links"]))
        hw_tokens = [_helper_tid_to_token(tid, e["tokens"]) for tid in hw_tids]
        e["headwords"] = [enriched_lemma(t) for t in hw_tokens]
        e["functors"] = list(set(x["afun"] for x in e["srl_links"]))
    return e


# Worker for concurrent Kres parsing; kres_parser, args, outdir and
# n_kres_files are module globals inherited via fork.
def _handle_kres_file_tpl(kres_file_tpl):
    tstart = time.time()
    kres_file_idx, kres_file = kres_file_tpl
    kres_data = kres_parser.parse_xml_file(kres_file)
    if args.output == "file":
        kres_outdir = outdir / "kres_json"
        kres_outdir.mkdir(parents=True, exist_ok=True)
        kres_outfile = kres_outdir / Path(kres_file.name.split(".")[0]).with_suffix(".json")
        with kres_outfile.open("w") as fp:
            json.dump(kres_data, fp)
    elif args.output == "db":
        # The MongoClient must be created after forking; pymongo
        # connections are not fork-safe.
        dbclient = MongoClient(
            "mongodb://{}".format(args.dbaddr),
            username=args.dbuser,
            password=args.dbpass,
            authSource="valdb",
            authMechanism='SCRAM-SHA-1'
        )
        valdb = dbclient.valdb
        kres_col = valdb["kres"]

        # Upserting sentence by sentence is a huge bottleneck:
        #   for sentence in kres_data:
        #       kres_col.replace_one({"sid": sentence["sid"]}, sentence, upsert=True)
        # insert_many below is much faster (just make sure sid has a unique index).

        # Skip the file if one of its sentences is already in the DB.
        if kres_col.count_documents({"sid": kres_data[0]["sid"]}) > 0:
            logger.info("File {} already in DB ({}/{})".format(
                kres_file, kres_file_idx, n_kres_files))
            return

        kres_data_1 = [_db_preprocess(x) for x in kres_data]
        kres_col.insert_many(kres_data_1)
        logger.info("Inserted data from {} ({}/{}) in {:.2f} s".format(
            kres_file, kres_file_idx, n_kres_files, time.time() - tstart))


def _get_dbclient(args):
    return MongoClient(
        "mongodb://{}".format(args.dbaddr),
        username=args.dbuser,
        password=args.dbpass,
        authSource="valdb",
        authMechanism='SCRAM-SHA-1'
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Parse the Kres and ssj500k corpora.")
    parser.add_argument('--kres-folder', required=True)
    parser.add_argument('--kres-srl-folder', required=True)
    parser.add_argument('--ssj-file', required=True)
    parser.add_argument('--output', required=False, default=None)
    parser.add_argument('--outdir', required=False, default=None)
    parser.add_argument('--dbaddr', required=False, default=None)
    parser.add_argument('--dbuser', required=False, default=None)
    parser.add_argument('--dbpass', required=False, default=None)
    parser.add_argument('--cores', required=False, type=int, default=1)
    args = parser.parse_args()

    outdir = None
    if args.output == "file":
        outdir = Path(args.outdir)
        outdir.mkdir(parents=True, exist_ok=True)
    elif args.output == "db":
        # Force a unique index on sid (insert_many relies on it);
        # headwords and functors are indexed for faster lookups.
        dbclient = _get_dbclient(args)
        for corpus in ["kres", "ssj"]:
            dbclient.valdb[corpus].create_index([("sid", pymongo.ASCENDING)], unique=True)
            dbclient.valdb[corpus].create_index([("headwords", pymongo.ASCENDING)])
            dbclient.valdb[corpus].create_index([("functors", pymongo.ASCENDING)])

    # ssj500k
    logger.info("Parsing ssj500k: {}".format(args.ssj_file))
    ssj_parser = Parser(logger=logger, corpus="ssj")
    ssj_data = ssj_parser.parse_xml_file(Path(args.ssj_file))
    if args.output == "file":
        ssj_outfile = outdir / "ssj500k.json"
        with ssj_outfile.open("w") as fp:
            json.dump(ssj_data, fp)
    elif args.output == "db":
        dbclient = _get_dbclient(args)
        ssj_col = dbclient.valdb["ssj"]
        for sentence in ssj_data:
            sentence = _db_preprocess(sentence)
            ssj_col.replace_one({"sid": sentence["sid"]}, sentence, upsert=True)

    # Kres
    logger.info("Parsing Kres: {}".format(args.kres_folder))
    kres_parser = Parser(
        logger=logger,
        corpus="kres",
        kres_srl_folder=args.kres_srl_folder
    )

    # [(idx, filepath)]
    kres_files = list(enumerate(Path(args.kres_folder).iterdir()))
    n_kres_files = len(kres_files)

    p = Pool(args.cores)
    p.map(_handle_kres_file_tpl, kres_files)

    logger.info("Finished parsing.")
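
# Example invocations (a sketch; the script name, paths, DB address and
# credentials below are hypothetical placeholders, not values from this repo):
#
#   Dump parsed corpora to JSON files:
#     python3 fill_database.py \
#         --kres-folder /data/kres/xml \
#         --kres-srl-folder /data/kres/srl_json \
#         --ssj-file /data/ssj500k.xml \
#         --output file --outdir /data/out
#
#   Write directly to MongoDB using 4 worker processes:
#     python3 fill_database.py \
#         --kres-folder /data/kres/xml \
#         --kres-srl-folder /data/kres/srl_json \
#         --ssj-file /data/ssj500k.xml \
#         --output db --dbaddr 127.0.0.1:27017 \
#         --dbuser valuser --dbpass valuserpass --cores 4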