Compare commits

..

5 Commits

Author SHA1 Message Date
01adf47b9b a 2019-04-22 09:06:02 +02:00
2582314c4d bugfix 2019-04-21 22:24:36 +02:00
792c0b03fd processing kres files in chunks to reduce number of DB connections 2019-04-21 21:50:56 +02:00
3276619e6f close dbclient after usage 2019-04-21 19:35:39 +02:00
bcc64c767c separate parsing of ssj and kres (mem management) 2019-04-21 19:18:19 +02:00

View File

@@ -9,17 +9,19 @@ import sys
from multiprocessing import Pool from multiprocessing import Pool
import time import time
CORPORA = ["kres", "ssj"]
# logging.basicConfig(filename=Path("/var/tmp/corpusparser.log"), filemode='a', level=logging.INFO) # logging.basicConfig(filename=Path("/var/tmp/corpusparser.log"), filemode='a', level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# lfh = logging.FileHandler("/var/tmp/fill-database.log") # lfh = logging.FileHandler("/project/logs/fill-database.log")
lfh = logging.StreamHandler(sys.stdout) lfh = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s") formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
lfh.setFormatter(formatter) lfh.setFormatter(formatter)
logger.addHandler(lfh) logger.addHandler(lfh)
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
n_kres_files = -1 # for logging n_chunks = -1
def enriched_lemma(token): def enriched_lemma(token):
@@ -49,18 +51,14 @@ def _db_preprocess(e):
# handler for concurrency # handler for concurrency
def _handle_kres_file_tpl(kres_file_tpl): def _handle_kres_file_chunk(kres_file_chunk):
tstart = time.time() tstart = time.time()
kres_file_idx = kres_file_tpl[0] kres_chunk_idx = kres_file_chunk[0]
kres_file = kres_file_tpl[1] kres_chunk = kres_file_chunk[1]
kres_data = kres_parser.parse_xml_file(kres_file)
if args.output == "file": dbclient = None
kres_outdir = outdir / "kres_json" db_payload = []
kres_outdir.mkdir(parents=True, exist_ok=True) if args.output == "db":
kres_outfile = kres_outdir / Path(kres_file.name.split(".")[0]).with_suffix(".json")
with kres_outfile.open("w") as fp:
json.dump(kres_data, fp)
elif args.output == "db":
# mongoclient needs to be created after forking # mongoclient needs to be created after forking
dbclient = MongoClient( dbclient = MongoClient(
"mongodb://{}".format(args.dbaddr), "mongodb://{}".format(args.dbaddr),
@@ -69,25 +67,40 @@ def _handle_kres_file_tpl(kres_file_tpl):
authSource="valdb", authSource="valdb",
authMechanism='SCRAM-SHA-1' authMechanism='SCRAM-SHA-1'
) )
valdb = dbclient.valdb # dbclient.valdb["kres"]
kres_col = valdb["kres"] for kres_file in kres_chunk:
try:
kres_data = kres_parser.parse_xml_file(kres_file)
except:
logger.error("Failed to parse file: {}".format(kres_file))
continue
if args.output == "file":
kres_outdir = outdir / "kres_json"
kres_outdir.mkdir(parents=True, exist_ok=True)
kres_outfile = kres_outdir / Path(kres_file.name.split(".")[0]).with_suffix(".json")
with kres_outfile.open("w") as fp:
json.dump(kres_data, fp)
# HUUUUGE BOTTLENECK elif args.output == "db":
""" """
for sentence in kres_data: if dbclient.valdb["kres"].find({"sid": kres_data[0]["sid"]}).count() > 0:
kres_col.update({"sid": sentence["sid"]}, sentence, upsert=True) logger.info("File {} already in DB, closing chunk ({}/{})".format(
""" kres_file, kres_chunk_idx, n_chunks))
dbclient.close()
return
"""
# skip if one of the sentences is already in DB kres_data_1 = [_db_preprocess(x) for x in kres_data]
if kres_col.find({"sid": kres_data[0]["sid"]}).count() > 0: db_payload += kres_data_1
logger.info("File {} already in DB ({}/{})".format(
kres_file, kres_file_idx, n_kres_files))
return
kres_data_1 = [_db_preprocess(x) for x in kres_data] try:
kres_col.insert_many(kres_data_1) # much much better (just make sure sid has a unique index) dbclient.valdb["kres"].insert_many(db_payload, ordered=False) # much much better (just make sure sid has a unique index)
logger.info("Inserted data from {} ({}/{}) in {:.2f} s".format( except:
kres_file, kres_file_idx, n_kres_files, time.time() - tstart)) logger.error("Failed inserting kres files chunk ({}/{}) in {:.2f} s".format(
kres_chunk_idx, n_chunks, time.time() - tstart))
logger.info("Db insert: chunks ({}/{}) in {:.2f} s".format(
kres_chunk_idx, n_chunks, time.time() - tstart))
dbclient.close()
def _get_dbclient(args): def _get_dbclient(args):
dbclient = MongoClient( dbclient = MongoClient(
@@ -100,72 +113,79 @@ def _get_dbclient(args):
return dbclient return dbclient
# wrap it in a function for better garbage collection
def parse_ssj(args):
logger.info("Parsing Ssj: {}".format(args.ssj_file))
ssj_parser = Parser(logger=logger, corpus="ssj")
ssj_data = ssj_parser.parse_xml_file(Path(args.ssj_file))
if args.output == "file":
ssj_outfile = outdir / "ssj500k.json"
with ssj_outfile.open("w") as fp:
json.dump(ssj_data, fp)
elif args.output == "db":
dbclient = _get_dbclient(args)
valdb = dbclient.valdb
ssj_col = valdb["ssj"]
for sentence in ssj_data:
sentence = _db_preprocess(sentence)
ssj_col.update({"sid": sentence["sid"]}, sentence, upsert=True)
del ssj_parser
del ssj_data
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Parsing corpora kres and ssj500k.") parser = argparse.ArgumentParser(description="Parsing corpora kres and ssj500k.")
parser.add_argument('--kres-folder', required=True) parser.add_argument('--corpus', required=True)
parser.add_argument('--kres-srl-folder', required=True) parser.add_argument('--kres-folder', required=False)
parser.add_argument('--ssj-file', required=True) parser.add_argument('--kres-srl-folder', required=False)
parser.add_argument('--ssj-file', required=False)
parser.add_argument('--output', required=False, default=None) parser.add_argument('--output', required=False, default=None)
parser.add_argument('--outdir', required=False, default=None) parser.add_argument('--outdir', required=False, default=None)
parser.add_argument('--dbaddr', required=False, default=None) parser.add_argument('--dbaddr', required=False, default=None)
parser.add_argument('--dbuser', required=False, default=None) parser.add_argument('--dbuser', required=False, default=None)
parser.add_argument('--dbpass', required=False, default=None) parser.add_argument('--dbpass', required=False, default=None)
parser.add_argument('--cores', required=False, default=1) parser.add_argument('--cores', required=False, default=1)
parser.add_argument('--chunk-size', required=False, default=3)
args = parser.parse_args() args = parser.parse_args()
corpus = args.corpus
assert (corpus in CORPORA), "Wrong corpus name."
outdir = None outdir = None
if args.output == "file": if args.output == "file":
outdir = Path(args.outdir) outdir = Path(args.outdir)
outdir.mkdir(parents=True, exist_ok=True) outdir.mkdir(parents=True, exist_ok=True)
elif args.output == "db": elif args.output == "db":
# Force unique sid
dbclient = _get_dbclient(args) dbclient = _get_dbclient(args)
for corpus in ["kres", "ssj"]: dbclient.valdb[corpus].ensure_index([("sid", pymongo.ASCENDING)])
dbclient.valdb[corpus].ensure_index([("sid", pymongo.ASCENDING)]) dbclient.valdb[corpus].ensure_index([("headwords", pymongo.ASCENDING)])
dbclient.valdb[corpus].ensure_index([("headwords", pymongo.ASCENDING)]) dbclient.valdb[corpus].ensure_index([("functors", pymongo.ASCENDING)])
dbclient.valdb[corpus].ensure_index([("functors", pymongo.ASCENDING)]) dbclient.close()
# SSJ if corpus == "ssj":
p = Pool(1) logger.info("Parsing Ssj: {}".format(args.ssj_file))
p.map(parse_ssj, [args]) ssj_parser = Parser(logger=logger, corpus="ssj")
ssj_data = ssj_parser.parse_xml_file(Path(args.ssj_file))
if args.output == "file":
ssj_outfile = outdir / "ssj500k.json"
with ssj_outfile.open("w") as fp:
json.dump(ssj_data, fp)
elif args.output == "db":
dbclient = _get_dbclient(args)
valdb = dbclient.valdb
ssj_col = valdb["ssj"]
for sentence in ssj_data:
sentence = _db_preprocess(sentence)
ssj_col.update({"sid": sentence["sid"]}, sentence, upsert=True)
dbclient.close()
time.sleep(30)
# Kres if corpus == "kres":
logger.info("Parsing Kres: {}".format(args.kres_folder)) # Kres
kres_parser = Parser( logger.info("Parsing Kres: {}".format(args.kres_folder))
logger=logger, kres_parser = Parser(
corpus="kres", logger=logger,
kres_srl_folder=args.kres_srl_folder corpus="kres",
) kres_srl_folder=args.kres_srl_folder
)
# [(idx, filepath)] kres_files = [x for x in Path(args.kres_folder).iterdir()]
kres_files = [x for x in Path(args.kres_folder).iterdir()] kres_files = sorted(kres_files, key=lambda x: x.name)
kres_files = [x for x in enumerate(kres_files)] kres_files_chunks = []
n_kres_files = len(kres_files) i = 0
while i < len(kres_files):
# kres_files_chunks += kres_files[i:(i+args.chunk_size)]
new_i = i + int(args.chunk_size)
kres_files_chunks += [kres_files[i:new_i]]
i = new_i
kres_files_chunks = [x for x in enumerate(kres_files_chunks)]
n_chunks = len(kres_files_chunks)
p = Pool(int(args.cores))
p.map(_handle_kres_file_chunk, kres_files_chunks)
p = Pool(int(args.cores))
p.map(_handle_kres_file_tpl, kres_files)
logger.info("Finished parsing.") logger.info("Finished parsing.")