diff --git a/corpusparser/main.py b/corpusparser/main.py
index b12478e..83b41fc 100644
--- a/corpusparser/main.py
+++ b/corpusparser/main.py
@@ -14,14 +14,14 @@ CORPORA = ["kres", "ssj"]
 
 # logging.basicConfig(filename=Path("/var/tmp/corpusparser.log"), filemode='a', level=logging.INFO)
 logger = logging.getLogger(__name__)
-lfh = logging.FileHandler("/project/logs/fill-database.log")
-# lfh = logging.StreamHandler(sys.stdout)
+# lfh = logging.FileHandler("/project/logs/fill-database.log")
+lfh = logging.StreamHandler(sys.stdout)
 formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
 lfh.setFormatter(formatter)
 logger.addHandler(lfh)
 logger.setLevel(logging.INFO)
 
-n_kres_files = -1  # for logging
+n_chunks = -1  # for logging
 
 
 def enriched_lemma(token):
@@ -51,18 +51,13 @@ def _db_preprocess(e):
 
 
 # handler for concurrency
-def _handle_kres_file_tpl(kres_file_tpl):
+def _handle_kres_file_chunk(kres_file_chunk):
     tstart = time.time()
-    kres_file_idx = kres_file_tpl[0]
-    kres_file = kres_file_tpl[1]
-    kres_data = kres_parser.parse_xml_file(kres_file)
-    if args.output == "file":
-        kres_outdir = outdir / "kres_json"
-        kres_outdir.mkdir(parents=True, exist_ok=True)
-        kres_outfile = kres_outdir / Path(kres_file.name.split(".")[0]).with_suffix(".json")
-        with kres_outfile.open("w") as fp:
-            json.dump(kres_data, fp)
-    elif args.output == "db":
+    kres_chunk_idx, kres_chunk = kres_file_chunk
+
+    dbclient = None
+    db_payload = []  # collects preprocessed sentences for one bulk insert per chunk
+    if args.output == "db":
         # mongoclient needs to be created after forking
         dbclient = MongoClient(
             "mongodb://{}".format(args.dbaddr),
@@ -71,25 +66,31 @@
             authSource="valdb",
             authMechanism='SCRAM-SHA-1'
         )
-        valdb = dbclient.valdb
-        kres_col = valdb["kres"]
-
-        # HUUUUGE BOTTLENECK
-        """
-        for sentence in kres_data:
-            kres_col.update({"sid": sentence["sid"]}, sentence, upsert=True)
-        """
-
-        # skip if one of the sentences is already in DB
-        if kres_col.find({"sid": kres_data[0]["sid"]}).count() > 0:
-            logger.info("File {} already in DB ({}/{})".format(
-                kres_file, kres_file_idx, n_kres_files))
-            return
-
-        kres_data_1 = [_db_preprocess(x) for x in kres_data]
-        kres_col.insert_many(kres_data_1)  # much much better (just make sure sid has a unique index)
-        logger.info("Inserted data from {} ({}/{}) in {:.2f} s".format(
-            kres_file, kres_file_idx, n_kres_files, time.time() - tstart))
-    dbclient.close()
+    for kres_file in kres_chunk:
+        kres_data = kres_parser.parse_xml_file(kres_file)
+        if args.output == "file":
+            kres_outdir = outdir / "kres_json"
+            kres_outdir.mkdir(parents=True, exist_ok=True)
+            kres_outfile = kres_outdir / Path(kres_file.name.split(".")[0]).with_suffix(".json")
+            with kres_outfile.open("w") as fp:
+                json.dump(kres_data, fp)
+
+        elif args.output == "db":
+            # a hit on the first sid means this chunk was already fully inserted
+            if dbclient.valdb["kres"].find({"sid": kres_data[0]["sid"]}).count() > 0:
+                logger.info("File {} already in DB, closing chunk ({}/{})".format(
+                    kres_file, kres_chunk_idx, n_chunks))
+                dbclient.close()
+                return
+
+            kres_data_1 = [_db_preprocess(x) for x in kres_data]
+            db_payload += kres_data_1
+
+    if args.output == "db":
+        # single bulk insert per chunk is much faster (make sure sid has a unique index)
+        dbclient.valdb["kres"].insert_many(db_payload)
+        logger.info("Inserted kres files chunk ({}/{}) in {:.2f} s".format(
+            kres_chunk_idx, n_chunks, time.time() - tstart))
+        dbclient.close()
 
 def _get_dbclient(args):
@@ -115,6 +116,7 @@ if __name__ == "__main__":
     parser.add_argument('--dbuser', required=False, default=None)
     parser.add_argument('--dbpass', required=False, default=None)
     parser.add_argument('--cores', required=False, default=1)
+    parser.add_argument('--chunk-size', required=False, default=3)
 
     args = parser.parse_args()
 
@@ -161,13 +163,15 @@ if __name__ == "__main__":
         kres_srl_folder=args.kres_srl_folder
     )
 
-    # [(idx, filepath)]
     kres_files = [x for x in Path(args.kres_folder).iterdir()]
-    kres_files = [x for x in enumerate(kres_files)]
-    n_kres_files = len(kres_files)
+    # split the file list into fixed-size chunks: [(chunk_idx, [files])]
+    chunk_size = int(args.chunk_size)
+    kres_files_chunks = [kres_files[i:i + chunk_size] for i in range(0, len(kres_files), chunk_size)]
+    kres_files_chunks = list(enumerate(kres_files_chunks))
+    n_chunks = len(kres_files_chunks)
 
     p = Pool(int(args.cores))
-    p.map(_handle_kres_file_tpl, kres_files)
+    p.map(_handle_kres_file_chunk, kres_files_chunks)
 
     logger.info("Finished parsing.")
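For reference, the chunking pattern this patch introduces can be sketched in isolation. The snippet below is a minimal, self-contained illustration rather than the project's code: `chunked`, `handle_chunk`, and the sample file list are hypothetical placeholder names, and the string transform stands in for the real per-file parsing. What it demonstrates is why batching helps here: each pool task pays its setup cost (in the patch, constructing a MongoClient after the fork) once per chunk instead of once per file, and the worker can issue a single insert_many for the whole chunk.

    from multiprocessing import Pool

    def chunked(items, size):
        """Split a list into consecutive chunks of at most `size` items."""
        return [items[i:i + size] for i in range(0, len(items), size)]

    def handle_chunk(indexed_chunk):
        # Placeholder worker: the real one opens its DB connection here
        # (after the fork), parses every file in the chunk, then issues a
        # single bulk insert before closing the connection.
        idx, chunk = indexed_chunk
        return idx, [name.upper() for name in chunk]  # stand-in for parsing

    if __name__ == "__main__":
        files = ["f1.xml", "f2.xml", "f3.xml", "f4.xml", "f5.xml"]
        # -> [(0, ['f1.xml', 'f2.xml', 'f3.xml']), (1, ['f4.xml', 'f5.xml'])]
        chunks = list(enumerate(chunked(files, 3)))
        with Pool(2) as pool:
            print(pool.map(handle_chunk, chunks))

The trade-off is coarser progress logging and coarser resume granularity: a chunk is skipped or inserted as a whole, which is why the patched worker checks only the first sid of each file and returns as soon as it sees data that is already in the database.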