Compare commits

No commits in common. "master" and "bug_fix" have entirely different histories.
@@ -5,7 +5,7 @@ from lxml import etree
 import logging
 import time
 
-# logging.basicConfig(level=logging.INFO)
+logging.basicConfig(level=logging.INFO)
 
 # Read input file(.xml, .json; kres or ssj500k).
 # Create an iterator that outputs resulting sentences (python dict format).
@@ -61,10 +61,7 @@ class Parser():
 
     def parse_any_links_ssj(self, sent_el, links_type):
         lgrps = sent_el.findall(".//linkGrp")
-        try:
-            links = [x for x in lgrps if x.get("type") == links_type][0]
-        except:
-            return []
+        links = [x for x in lgrps if x.get("type") == links_type][0]
         res_links = []
         for link in links:
             tar = self.parse_ssj_target_arg(link.get("target"))
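A note on the hunk above: dropping the bare try/except means parse_any_links_ssj now raises IndexError when a sentence has no linkGrp of the requested type, instead of returning []. If callers are expected to tolerate missing link groups, a minimal defensive sketch (not part of this diff) keeps the empty-list contract without swallowing unrelated errors:

    def parse_any_links_ssj(self, sent_el, links_type):
        lgrps = sent_el.findall(".//linkGrp")
        # next() with a default avoids both the bare except and the IndexError
        links = next((x for x in lgrps if x.get("type") == links_type), None)
        if links is None:
            return []
        # ... rest of the method unchanged

The remaining hunks below belong to the pipeline's entry script rather than the parser module.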
@@ -9,19 +9,10 @@ import sys
 from multiprocessing import Pool
 import time
 
-CORPORA = ["kres", "ssj"]
-
-# logging.basicConfig(filename=Path("/var/tmp/corpusparser.log"), filemode='a', level=logging.INFO)
+logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
-# lfh = logging.FileHandler("/project/logs/fill-database.log")
-lfh = logging.StreamHandler(sys.stdout)
-formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
-lfh.setFormatter(formatter)
-logger.addHandler(lfh)
-logger.setLevel(logging.INFO)
-
-n_chunks = -1
+n_kres_files = -1  # for logging
 
 
 def enriched_lemma(token):
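The hunk above swaps the hand-built handler/formatter setup for a single logging.basicConfig call. One behavioral difference worth flagging: the removed StreamHandler wrote to sys.stdout, while basicConfig's default handler writes to sys.stderr. A minimal sketch of how the two remaining pieces interact (standard library behavior, nothing project-specific):

    import logging

    logging.basicConfig(level=logging.INFO)  # configures the root logger once
    logger = logging.getLogger(__name__)     # module logger with no handler of its own
    logger.info("propagates up to the root handler (stderr by default)")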
@@ -51,14 +42,18 @@ def _db_preprocess(e):
 
 
 # handler for concurrency
-def _handle_kres_file_chunk(kres_file_chunk):
+def _handle_kres_file_tpl(kres_file_tpl):
     tstart = time.time()
-    kres_chunk_idx = kres_file_chunk[0]
-    kres_chunk = kres_file_chunk[1]
-
-    dbclient = None
-    db_payload = []
-    if args.output == "db":
+    kres_file_idx = kres_file_tpl[0]
+    kres_file = kres_file_tpl[1]
+    kres_data = kres_parser.parse_xml_file(kres_file)
+    if args.output == "file":
+        kres_outdir = outdir / "kres_json"
+        kres_outdir.mkdir(parents=True, exist_ok=True)
+        kres_outfile = kres_outdir / Path(kres_file.name.split(".")[0]).with_suffix(".json")
+        with kres_outfile.open("w") as fp:
+            json.dump(kres_data, fp)
+    elif args.output == "db":
         # mongoclient needs to be created after forking
         dbclient = MongoClient(
             "mongodb://{}".format(args.dbaddr),
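The "mongoclient needs to be created after forking" comment reflects a real PyMongo constraint: MongoClient instances are not fork-safe, so each Pool worker has to open its own connection, which is why the handler connects inside the function instead of reusing a module-level client. An alternative sketch using a per-worker initializer (hypothetical wiring, not what this branch does) pays the connection cost once per worker rather than once per file:

    from multiprocessing import Pool
    from pymongo import MongoClient

    dbclient = None  # filled in per worker process

    def _init_worker(dbaddr):
        # runs once in each worker, safely after the fork
        global dbclient
        dbclient = MongoClient("mongodb://{}".format(dbaddr))

    # p = Pool(int(args.cores), initializer=_init_worker, initargs=(args.dbaddr,))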
@@ -67,40 +62,25 @@ def _handle_kres_file_chunk(kres_file_chunk):
             authSource="valdb",
             authMechanism='SCRAM-SHA-1'
         )
-        # dbclient.valdb["kres"]
-    for kres_file in kres_chunk:
-        try:
-            kres_data = kres_parser.parse_xml_file(kres_file)
-        except:
-            logger.error("Failed to parse file: {}".format(kres_file))
-            continue
-        if args.output == "file":
-            kres_outdir = outdir / "kres_json"
-            kres_outdir.mkdir(parents=True, exist_ok=True)
-            kres_outfile = kres_outdir / Path(kres_file.name.split(".")[0]).with_suffix(".json")
-            with kres_outfile.open("w") as fp:
-                json.dump(kres_data, fp)
-
-        elif args.output == "db":
-            """
-            if dbclient.valdb["kres"].find({"sid": kres_data[0]["sid"]}).count() > 0:
-                logger.info("File {} already in DB, closing chunk ({}/{})".format(
-                    kres_file, kres_chunk_idx, n_chunks))
-                dbclient.close()
-                return
-            """
-
-            kres_data_1 = [_db_preprocess(x) for x in kres_data]
-            db_payload += kres_data_1
-
-    try:
-        dbclient.valdb["kres"].insert_many(db_payload, ordered=False)  # much much better (just make sure sid has a unique index)
-    except:
-        logger.error("Failed inserting kres files chunk ({}/{}) in {:.2f} s".format(
-            kres_chunk_idx, n_chunks, time.time() - tstart))
-    logger.info("Db insert: chunks ({}/{}) in {:.2f} s".format(
-        kres_chunk_idx, n_chunks, time.time() - tstart))
-    dbclient.close()
+        valdb = dbclient.valdb
+        kres_col = valdb["kres"]
+
+        # HUUUUGE BOTTLENECK
+        """
+        for sentence in kres_data:
+            kres_col.update({"sid": sentence["sid"]}, sentence, upsert=True)
+        """
+
+        # skip if one of the sentences is already in DB
+        if kres_col.find({"sid": kres_data[0]["sid"]}).count() > 0:
+            logging.info("File {} already in DB ({}/{})".format(
+                kres_file, kres_file_idx, n_kres_files))
+            return
+
+        kres_data_1 = [_db_preprocess(x) for x in kres_data]
+        kres_col.insert_many(kres_data_1)  # much much better (just make sure sid has a unique index)
+        logging.info("Inserted data from {} ({}/{}) in {:.2f} s".format(
+            kres_file, kres_file_idx, n_kres_files, time.time() - tstart))
 
 def _get_dbclient(args):
     dbclient = MongoClient(
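Two remarks on the insertion path above. Cursor.count() is deprecated in PyMongo; count_documents on the collection is the current API. And the "just make sure sid has a unique index" comment only pays off if that index is actually created with unique=True, which the ensure_index calls further down do not do. A hedged sketch of both points, wrapped in a hypothetical helper and assuming sid is meant to be unique:

    import pymongo
    from pymongo.errors import BulkWriteError

    def _insert_kres_file(kres_col, kres_data_1, first_sid):
        # assumes the collection was set up with:
        # kres_col.create_index([("sid", pymongo.ASCENDING)], unique=True)
        if kres_col.count_documents({"sid": first_sid}, limit=1) > 0:
            return  # file already ingested
        try:
            # ordered=False keeps inserting past individual duplicate-sid errors
            kres_col.insert_many(kres_data_1, ordered=False)
        except BulkWriteError:
            pass  # duplicates skipped; the rest of the batch was written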
@@ -112,42 +92,34 @@ def _get_dbclient(args):
     )
     return dbclient
 
-
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Parsing corpora kres and ssj500k.")
-    parser.add_argument('--corpus', required=True)
-    parser.add_argument('--kres-folder', required=False)
-    parser.add_argument('--kres-srl-folder', required=False)
-    parser.add_argument('--ssj-file', required=False)
+    parser.add_argument('--kres-folder', required=True)
+    parser.add_argument('--kres-srl-folder', required=True)
+    parser.add_argument('--ssj-file', required=True)
     parser.add_argument('--output', required=False, default=None)
     parser.add_argument('--outdir', required=False, default=None)
     parser.add_argument('--dbaddr', required=False, default=None)
    parser.add_argument('--dbuser', required=False, default=None)
     parser.add_argument('--dbpass', required=False, default=None)
     parser.add_argument('--cores', required=False, default=1)
-    parser.add_argument('--chunk-size', required=False, default=3)
     args = parser.parse_args()
 
-
-    corpus = args.corpus
-    assert (corpus in CORPORA), "Wrong corpus name."
-
-
     outdir = None
     if args.output == "file":
         outdir = Path(args.outdir)
         outdir.mkdir(parents=True, exist_ok=True)
     elif args.output == "db":
+        # Force unique sid
         dbclient = _get_dbclient(args)
-        dbclient.valdb[corpus].ensure_index([("sid", pymongo.ASCENDING)])
-        dbclient.valdb[corpus].ensure_index([("headwords", pymongo.ASCENDING)])
-        dbclient.valdb[corpus].ensure_index([("functors", pymongo.ASCENDING)])
-        dbclient.close()
+        for corpus in ["kres", "ssj"]:
+            dbclient.valdb[corpus].ensure_index([("sid", pymongo.ASCENDING)])
+            dbclient.valdb[corpus].ensure_index([("headwords", pymongo.ASCENDING)])
+            dbclient.valdb[corpus].ensure_index([("functors", pymongo.ASCENDING)])
 
-
-    if corpus == "ssj":
-        logger.info("Parsing Ssj: {}".format(args.ssj_file))
-        ssj_parser = Parser(logger=logger, corpus="ssj")
-        ssj_data = ssj_parser.parse_xml_file(Path(args.ssj_file))
+    # SSJ
+    logger.info("Parsing Ssj: {}".format(args.ssj_file))
+    ssj_parser = Parser(corpus="ssj")
+    ssj_data = ssj_parser.parse_xml_file(Path(args.ssj_file))
     if args.output == "file":
         ssj_outfile = outdir / "ssj500k.json"
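On the index setup above: ensure_index is deprecated in PyMongo (and removed in the 4.x series); create_index is the drop-in replacement. As noted at the insert_many call, the sid index presumably also wants unique=True. A sketch under those assumptions:

    for corpus in ["kres", "ssj"]:
        col = dbclient.valdb[corpus]
        col.create_index([("sid", pymongo.ASCENDING)], unique=True)  # assumption: sid is a natural key
        col.create_index([("headwords", pymongo.ASCENDING)])
        col.create_index([("functors", pymongo.ASCENDING)])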
@@ -160,32 +132,21 @@ if __name__ == "__main__":
         for sentence in ssj_data:
             sentence = _db_preprocess(sentence)
             ssj_col.update({"sid": sentence["sid"]}, sentence, upsert=True)
-            dbclient.close()
 
 
-    if corpus == "kres":
-        # Kres
-        logger.info("Parsing Kres: {}".format(args.kres_folder))
-        kres_parser = Parser(
-            logger=logger,
-            corpus="kres",
-            kres_srl_folder=args.kres_srl_folder
-        )
-
-        kres_files = [x for x in Path(args.kres_folder).iterdir()]
-        kres_files = sorted(kres_files, key=lambda x: x.name)
-        kres_files_chunks = []
-        i = 0
-        while i < len(kres_files):
-            # kres_files_chunks += kres_files[i:(i+args.chunk_size)]
-            new_i = i + int(args.chunk_size)
-            kres_files_chunks += [kres_files[i:new_i]]
-            i = new_i
-        kres_files_chunks = [x for x in enumerate(kres_files_chunks)]
-        n_chunks = len(kres_files_chunks)
-
-        p = Pool(int(args.cores))
-        p.map(_handle_kres_file_chunk, kres_files_chunks)
-
+    # Kres
+    logger.info("Parsing Kres: {}".format(args.kres_folder))
+    kres_parser = Parser(
+        corpus="kres",
+        kres_srl_folder=args.kres_srl_folder
+    )
+
+    # [(idx, filepath)]
+    kres_files = [x for x in Path(args.kres_folder).iterdir()]
+    kres_files = [x for x in enumerate(kres_files)]
+    n_kres_files = len(kres_files)
+
+    p = Pool(int(args.cores))
+    p.map(_handle_kres_file_tpl, kres_files)
 
     logger.info("Finished parsing.")
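Two observations on the parallel section above. First, the sorted() call from master was dropped, so Path.iterdir() now yields files in arbitrary order and the (idx/total) log lines are no longer deterministic. Second, the module globals the workers read (args, outdir, kres_parser, n_kres_files) are only visible because fork-based Pool workers inherit state set before Pool() is created; this would break under the spawn start method. A compact sketch that keeps the deterministic ordering (a suggestion, not what this branch does):

    kres_files = sorted(Path(args.kres_folder).iterdir(), key=lambda p: p.name)
    kres_files = list(enumerate(kres_files))   # [(idx, filepath)]
    n_kres_files = len(kres_files)             # set before Pool() so forked workers see it

    with Pool(int(args.cores)) as p:
        p.map(_handle_kres_file_tpl, kres_files)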