Compare commits
No commits in common. "master" and "bug_fix" have entirely different histories.
|
@ -5,7 +5,7 @@ from lxml import etree
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
|
|
||||||
# logging.basicConfig(level=logging.INFO)
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
|
||||||
# Read input file(.xml, .json; kres or ssj500k).
|
# Read input file(.xml, .json; kres or ssj500k).
|
||||||
# Create an iterator that outputs resulting sentences (python dict format).
|
# Create an iterator that outputs resulting sentences (python dict format).
|
||||||
|
@ -61,10 +61,7 @@ class Parser():
|
||||||
|
|
||||||
def parse_any_links_ssj(self, sent_el, links_type):
|
def parse_any_links_ssj(self, sent_el, links_type):
|
||||||
lgrps = sent_el.findall(".//linkGrp")
|
lgrps = sent_el.findall(".//linkGrp")
|
||||||
try:
|
|
||||||
links = [x for x in lgrps if x.get("type") == links_type][0]
|
links = [x for x in lgrps if x.get("type") == links_type][0]
|
||||||
except:
|
|
||||||
return []
|
|
||||||
res_links = []
|
res_links = []
|
||||||
for link in links:
|
for link in links:
|
||||||
tar = self.parse_ssj_target_arg(link.get("target"))
|
tar = self.parse_ssj_target_arg(link.get("target"))
|
||||||
|
|
|
@ -9,19 +9,10 @@ import sys
|
||||||
from multiprocessing import Pool
|
from multiprocessing import Pool
|
||||||
import time
|
import time
|
||||||
|
|
||||||
CORPORA = ["kres", "ssj"]
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
|
||||||
# logging.basicConfig(filename=Path("/var/tmp/corpusparser.log"), filemode='a', level=logging.INFO)
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
# lfh = logging.FileHandler("/project/logs/fill-database.log")
|
n_kres_files = -1 # for logging
|
||||||
lfh = logging.StreamHandler(sys.stdout)
|
|
||||||
formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
|
|
||||||
lfh.setFormatter(formatter)
|
|
||||||
logger.addHandler(lfh)
|
|
||||||
logger.setLevel(logging.INFO)
|
|
||||||
|
|
||||||
n_chunks = -1
|
|
||||||
|
|
||||||
|
|
||||||
def enriched_lemma(token):
|
def enriched_lemma(token):
|
||||||
|
@ -51,14 +42,18 @@ def _db_preprocess(e):
|
||||||
|
|
||||||
|
|
||||||
# handler for concurrency
|
# handler for concurrency
|
||||||
def _handle_kres_file_chunk(kres_file_chunk):
|
def _handle_kres_file_tpl(kres_file_tpl):
|
||||||
tstart = time.time()
|
tstart = time.time()
|
||||||
kres_chunk_idx = kres_file_chunk[0]
|
kres_file_idx = kres_file_tpl[0]
|
||||||
kres_chunk = kres_file_chunk[1]
|
kres_file = kres_file_tpl[1]
|
||||||
|
kres_data = kres_parser.parse_xml_file(kres_file)
|
||||||
dbclient = None
|
if args.output == "file":
|
||||||
db_payload = []
|
kres_outdir = outdir / "kres_json"
|
||||||
if args.output == "db":
|
kres_outdir.mkdir(parents=True, exist_ok=True)
|
||||||
|
kres_outfile = kres_outdir / Path(kres_file.name.split(".")[0]).with_suffix(".json")
|
||||||
|
with kres_outfile.open("w") as fp:
|
||||||
|
json.dump(kres_data, fp)
|
||||||
|
elif args.output == "db":
|
||||||
# mongoclient needs to be created after forking
|
# mongoclient needs to be created after forking
|
||||||
dbclient = MongoClient(
|
dbclient = MongoClient(
|
||||||
"mongodb://{}".format(args.dbaddr),
|
"mongodb://{}".format(args.dbaddr),
|
||||||
|
@ -67,40 +62,25 @@ def _handle_kres_file_chunk(kres_file_chunk):
|
||||||
authSource="valdb",
|
authSource="valdb",
|
||||||
authMechanism='SCRAM-SHA-1'
|
authMechanism='SCRAM-SHA-1'
|
||||||
)
|
)
|
||||||
# dbclient.valdb["kres"]
|
valdb = dbclient.valdb
|
||||||
for kres_file in kres_chunk:
|
kres_col = valdb["kres"]
|
||||||
try:
|
|
||||||
kres_data = kres_parser.parse_xml_file(kres_file)
|
|
||||||
except:
|
|
||||||
logger.error("Failed to parse file: {}".format(kres_file))
|
|
||||||
continue
|
|
||||||
if args.output == "file":
|
|
||||||
kres_outdir = outdir / "kres_json"
|
|
||||||
kres_outdir.mkdir(parents=True, exist_ok=True)
|
|
||||||
kres_outfile = kres_outdir / Path(kres_file.name.split(".")[0]).with_suffix(".json")
|
|
||||||
with kres_outfile.open("w") as fp:
|
|
||||||
json.dump(kres_data, fp)
|
|
||||||
|
|
||||||
elif args.output == "db":
|
# HUUUUGE BOTTLENECK
|
||||||
"""
|
"""
|
||||||
if dbclient.valdb["kres"].find({"sid": kres_data[0]["sid"]}).count() > 0:
|
for sentence in kres_data:
|
||||||
logger.info("File {} already in DB, closing chunk ({}/{})".format(
|
kres_col.update({"sid": sentence["sid"]}, sentence, upsert=True)
|
||||||
kres_file, kres_chunk_idx, n_chunks))
|
"""
|
||||||
dbclient.close()
|
|
||||||
|
# skip if one of the sentences is already in DB
|
||||||
|
if kres_col.find({"sid": kres_data[0]["sid"]}).count() > 0:
|
||||||
|
logging.info("File {} already in DB ({}/{})".format(
|
||||||
|
kres_file, kres_file_idx, n_kres_files))
|
||||||
return
|
return
|
||||||
"""
|
|
||||||
|
|
||||||
kres_data_1 = [_db_preprocess(x) for x in kres_data]
|
kres_data_1 = [_db_preprocess(x) for x in kres_data]
|
||||||
db_payload += kres_data_1
|
kres_col.insert_many(kres_data_1) # much much better (just make sure sid has a unique index)
|
||||||
|
logging.info("Inserted data from {} ({}/{}) in {:.2f} s".format(
|
||||||
try:
|
kres_file, kres_file_idx, n_kres_files, time.time() - tstart))
|
||||||
dbclient.valdb["kres"].insert_many(db_payload, ordered=False) # much much better (just make sure sid has a unique index)
|
|
||||||
except:
|
|
||||||
logger.error("Failed inserting kres files chunk ({}/{}) in {:.2f} s".format(
|
|
||||||
kres_chunk_idx, n_chunks, time.time() - tstart))
|
|
||||||
logger.info("Db insert: chunks ({}/{}) in {:.2f} s".format(
|
|
||||||
kres_chunk_idx, n_chunks, time.time() - tstart))
|
|
||||||
dbclient.close()
|
|
||||||
|
|
||||||
def _get_dbclient(args):
|
def _get_dbclient(args):
|
||||||
dbclient = MongoClient(
|
dbclient = MongoClient(
|
||||||
|
@ -112,42 +92,34 @@ def _get_dbclient(args):
|
||||||
)
|
)
|
||||||
return dbclient
|
return dbclient
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
parser = argparse.ArgumentParser(description="Parsing corpora kres and ssj500k.")
|
parser = argparse.ArgumentParser(description="Parsing corpora kres and ssj500k.")
|
||||||
parser.add_argument('--corpus', required=True)
|
parser.add_argument('--kres-folder', required=True)
|
||||||
parser.add_argument('--kres-folder', required=False)
|
parser.add_argument('--kres-srl-folder', required=True)
|
||||||
parser.add_argument('--kres-srl-folder', required=False)
|
parser.add_argument('--ssj-file', required=True)
|
||||||
parser.add_argument('--ssj-file', required=False)
|
|
||||||
parser.add_argument('--output', required=False, default=None)
|
parser.add_argument('--output', required=False, default=None)
|
||||||
parser.add_argument('--outdir', required=False, default=None)
|
parser.add_argument('--outdir', required=False, default=None)
|
||||||
parser.add_argument('--dbaddr', required=False, default=None)
|
parser.add_argument('--dbaddr', required=False, default=None)
|
||||||
parser.add_argument('--dbuser', required=False, default=None)
|
parser.add_argument('--dbuser', required=False, default=None)
|
||||||
parser.add_argument('--dbpass', required=False, default=None)
|
parser.add_argument('--dbpass', required=False, default=None)
|
||||||
parser.add_argument('--cores', required=False, default=1)
|
parser.add_argument('--cores', required=False, default=1)
|
||||||
parser.add_argument('--chunk-size', required=False, default=3)
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
|
||||||
corpus = args.corpus
|
|
||||||
assert (corpus in CORPORA), "Wrong corpus name."
|
|
||||||
|
|
||||||
|
|
||||||
outdir = None
|
outdir = None
|
||||||
if args.output == "file":
|
if args.output == "file":
|
||||||
outdir = Path(args.outdir)
|
outdir = Path(args.outdir)
|
||||||
outdir.mkdir(parents=True, exist_ok=True)
|
outdir.mkdir(parents=True, exist_ok=True)
|
||||||
elif args.output == "db":
|
elif args.output == "db":
|
||||||
|
# Force unique sid
|
||||||
dbclient = _get_dbclient(args)
|
dbclient = _get_dbclient(args)
|
||||||
|
for corpus in ["kres", "ssj"]:
|
||||||
dbclient.valdb[corpus].ensure_index([("sid", pymongo.ASCENDING)])
|
dbclient.valdb[corpus].ensure_index([("sid", pymongo.ASCENDING)])
|
||||||
dbclient.valdb[corpus].ensure_index([("headwords", pymongo.ASCENDING)])
|
dbclient.valdb[corpus].ensure_index([("headwords", pymongo.ASCENDING)])
|
||||||
dbclient.valdb[corpus].ensure_index([("functors", pymongo.ASCENDING)])
|
dbclient.valdb[corpus].ensure_index([("functors", pymongo.ASCENDING)])
|
||||||
dbclient.close()
|
|
||||||
|
|
||||||
|
# SSJ
|
||||||
if corpus == "ssj":
|
|
||||||
logger.info("Parsing Ssj: {}".format(args.ssj_file))
|
logger.info("Parsing Ssj: {}".format(args.ssj_file))
|
||||||
ssj_parser = Parser(logger=logger, corpus="ssj")
|
ssj_parser = Parser(corpus="ssj")
|
||||||
ssj_data = ssj_parser.parse_xml_file(Path(args.ssj_file))
|
ssj_data = ssj_parser.parse_xml_file(Path(args.ssj_file))
|
||||||
if args.output == "file":
|
if args.output == "file":
|
||||||
ssj_outfile = outdir / "ssj500k.json"
|
ssj_outfile = outdir / "ssj500k.json"
|
||||||
|
@ -160,32 +132,21 @@ if __name__ == "__main__":
|
||||||
for sentence in ssj_data:
|
for sentence in ssj_data:
|
||||||
sentence = _db_preprocess(sentence)
|
sentence = _db_preprocess(sentence)
|
||||||
ssj_col.update({"sid": sentence["sid"]}, sentence, upsert=True)
|
ssj_col.update({"sid": sentence["sid"]}, sentence, upsert=True)
|
||||||
dbclient.close()
|
|
||||||
|
|
||||||
|
|
||||||
if corpus == "kres":
|
|
||||||
# Kres
|
# Kres
|
||||||
logger.info("Parsing Kres: {}".format(args.kres_folder))
|
logger.info("Parsing Kres: {}".format(args.kres_folder))
|
||||||
kres_parser = Parser(
|
kres_parser = Parser(
|
||||||
logger=logger,
|
|
||||||
corpus="kres",
|
corpus="kres",
|
||||||
kres_srl_folder=args.kres_srl_folder
|
kres_srl_folder=args.kres_srl_folder
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# [(idx, filepath)]
|
||||||
kres_files = [x for x in Path(args.kres_folder).iterdir()]
|
kres_files = [x for x in Path(args.kres_folder).iterdir()]
|
||||||
kres_files = sorted(kres_files, key=lambda x: x.name)
|
kres_files = [x for x in enumerate(kres_files)]
|
||||||
kres_files_chunks = []
|
n_kres_files = len(kres_files)
|
||||||
i = 0
|
|
||||||
while i < len(kres_files):
|
|
||||||
# kres_files_chunks += kres_files[i:(i+args.chunk_size)]
|
|
||||||
new_i = i + int(args.chunk_size)
|
|
||||||
kres_files_chunks += [kres_files[i:new_i]]
|
|
||||||
i = new_i
|
|
||||||
kres_files_chunks = [x for x in enumerate(kres_files_chunks)]
|
|
||||||
n_chunks = len(kres_files_chunks)
|
|
||||||
|
|
||||||
p = Pool(int(args.cores))
|
p = Pool(int(args.cores))
|
||||||
p.map(_handle_kres_file_chunk, kres_files_chunks)
|
p.map(_handle_kres_file_tpl, kres_files)
|
||||||
|
|
||||||
|
|
||||||
logger.info("Finished parsing.")
|
logger.info("Finished parsing.")
|
||||||
|
|
Loading…
Reference in New Issue
Block a user