from pathlib import Path
from corpusparser import Parser
import argparse
import logging
import json
from pymongo import MongoClient
import pymongo
import sys
from multiprocessing import Pool
import time

CORPORA = ["kres", "ssj"]

# logging.basicConfig(filename=Path("/var/tmp/corpusparser.log"), filemode='a', level=logging.INFO)
logger = logging.getLogger(__name__)
# lfh = logging.FileHandler("/project/logs/fill-database.log")
lfh = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
lfh.setFormatter(formatter)
logger.addHandler(lfh)
logger.setLevel(logging.INFO)

n_chunks = -1


def enriched_lemma(token):
    # Suffix non-verb lemmas with "_" so they don't collide with verb lemmas.
    return token["lemma"] if token["msd"][0] == "G" else token["lemma"] + "_"


def _helper_tid_to_token(tid, tokens):
    for t in tokens:
        if t["tid"] == tid:
            return t
    return None


def _db_preprocess(e):
    # Derive the "headwords" and "functors" index fields from the SRL links.
    if e["srl_links"] is None:
        e["headwords"] = []
        e["functors"] = []
    else:
        hw_tids = list(set(x["from"] for x in e["srl_links"]))
        hw_tokens = [_helper_tid_to_token(tid, e["tokens"]) for tid in hw_tids]
        e["headwords"] = [enriched_lemma(t) for t in hw_tokens]
        e["functors"] = list(set(x["afun"] for x in e["srl_links"]))
    return e


# Worker for concurrency; runs in a forked process and relies on the globals
# args, outdir, kres_parser and n_chunks inherited from the parent at fork time.
def _handle_kres_file_chunk(kres_file_chunk):
    tstart = time.time()
    kres_chunk_idx, kres_chunk = kres_file_chunk
    dbclient = None
    db_payload = []
    if args.output == "db":
        # The MongoClient must be created after forking.
        dbclient = MongoClient(
            "mongodb://{}".format(args.dbaddr),
            username=args.dbuser,
            password=args.dbpass,
            authSource="valdb",
            authMechanism="SCRAM-SHA-1"
        )
    for kres_file in kres_chunk:
        try:
            kres_data = kres_parser.parse_xml_file(kres_file)
        except Exception:
            logger.exception("Failed to parse file: {}".format(kres_file))
            continue
        if args.output == "file":
            kres_outdir = outdir / "kres_json"
            kres_outdir.mkdir(parents=True, exist_ok=True)
            kres_outfile = kres_outdir / Path(kres_file.name.split(".")[0]).with_suffix(".json")
            with kres_outfile.open("w") as fp:
                json.dump(kres_data, fp)
        elif args.output == "db":
            """
            if dbclient.valdb["kres"].find({"sid": kres_data[0]["sid"]}).count() > 0:
                logger.info("File {} already in DB, closing chunk ({}/{})".format(
                    kres_file, kres_chunk_idx, n_chunks))
                dbclient.close()
                return
            """
            db_payload += [_db_preprocess(x) for x in kres_data]
    if dbclient is not None:
        try:
            # One bulk insert per chunk is much faster than per-sentence
            # updates; the unique index on sid rejects duplicates, and
            # ordered=False lets the remaining documents go through.
            dbclient.valdb["kres"].insert_many(db_payload, ordered=False)
            logger.info("Db insert: chunk ({}/{}) in {:.2f} s".format(
                kres_chunk_idx, n_chunks, time.time() - tstart))
        except Exception:
            logger.exception("Failed inserting kres files chunk ({}/{}) in {:.2f} s".format(
                kres_chunk_idx, n_chunks, time.time() - tstart))
        dbclient.close()


def _get_dbclient(args):
    return MongoClient(
        "mongodb://{}".format(args.dbaddr),
        username=args.dbuser,
        password=args.dbpass,
        authSource="valdb",
        authMechanism="SCRAM-SHA-1"
    )


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Parsing corpora kres and ssj500k.")
    parser.add_argument('--corpus', required=True)
    parser.add_argument('--kres-folder', required=False)
    parser.add_argument('--kres-srl-folder', required=False)
    parser.add_argument('--ssj-file', required=False)
    parser.add_argument('--output', required=False, default=None)
    parser.add_argument('--outdir', required=False, default=None)
    parser.add_argument('--dbaddr', required=False, default=None)
    parser.add_argument('--dbuser', required=False, default=None)
    parser.add_argument('--dbpass', required=False, default=None)
    parser.add_argument('--cores', required=False, type=int, default=1)
    parser.add_argument('--chunk-size', required=False, type=int, default=3)
    args = parser.parse_args()

    corpus = args.corpus
    assert corpus in CORPORA, "Wrong corpus name."

    outdir = None
    if args.output == "file":
        outdir = Path(args.outdir)
        outdir.mkdir(parents=True, exist_ok=True)
    elif args.output == "db":
        dbclient = _get_dbclient(args)
        # ensure_index() is deprecated; create_index() is a no-op if the index
        # already exists.  sid must be unique for the bulk inserts above.
        dbclient.valdb[corpus].create_index([("sid", pymongo.ASCENDING)], unique=True)
        dbclient.valdb[corpus].create_index([("headwords", pymongo.ASCENDING)])
        dbclient.valdb[corpus].create_index([("functors", pymongo.ASCENDING)])
        dbclient.close()

    if corpus == "ssj":
        logger.info("Parsing Ssj: {}".format(args.ssj_file))
        ssj_parser = Parser(logger=logger, corpus="ssj")
        ssj_data = ssj_parser.parse_xml_file(Path(args.ssj_file))
        if args.output == "file":
            ssj_outfile = outdir / "ssj500k.json"
            with ssj_outfile.open("w") as fp:
                json.dump(ssj_data, fp)
        elif args.output == "db":
            dbclient = _get_dbclient(args)
            ssj_col = dbclient.valdb["ssj"]
            for sentence in ssj_data:
                sentence = _db_preprocess(sentence)
                # update() is deprecated; replace_one() with upsert keeps
                # re-runs idempotent.
                ssj_col.replace_one({"sid": sentence["sid"]}, sentence, upsert=True)
            dbclient.close()

    if corpus == "kres":
        logger.info("Parsing Kres: {}".format(args.kres_folder))
        kres_parser = Parser(
            logger=logger,
            corpus="kres",
            kres_srl_folder=args.kres_srl_folder
        )
        kres_files = sorted(Path(args.kres_folder).iterdir(), key=lambda x: x.name)

        # Split the file list into fixed-size chunks, then index them so the
        # workers can report progress as (chunk_idx / n_chunks).
        kres_files_chunks = [
            kres_files[i:i + args.chunk_size]
            for i in range(0, len(kres_files), args.chunk_size)
        ]
        kres_files_chunks = list(enumerate(kres_files_chunks))
        n_chunks = len(kres_files_chunks)

        p = Pool(args.cores)
        p.map(_handle_kres_file_chunk, kres_files_chunks)

    logger.info("Finished parsing.")