processing kres files in chunks to reduce number of DB connections
This commit is contained in:
parent
3276619e6f
commit
792c0b03fd
|
@ -14,14 +14,14 @@ CORPORA = ["kres", "ssj"]
|
|||
# logging.basicConfig(filename=Path("/var/tmp/corpusparser.log"), filemode='a', level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
lfh = logging.FileHandler("/project/logs/fill-database.log")
|
||||
# lfh = logging.StreamHandler(sys.stdout)
|
||||
# lfh = logging.FileHandler("/project/logs/fill-database.log")
|
||||
lfh = logging.StreamHandler(sys.stdout)
|
||||
formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
|
||||
lfh.setFormatter(formatter)
|
||||
logger.addHandler(lfh)
|
||||
logger.setLevel(logging.INFO)
|
||||
|
||||
n_kres_files = -1 # for logging
|
||||
n_chunks = -1 # for logging
|
||||
|
||||
|
||||
def enriched_lemma(token):
|
||||
|
@ -51,18 +51,14 @@ def _db_preprocess(e):
|
|||
|
||||
|
||||
# handler for concurrency
|
||||
def _handle_kres_file_tpl(kres_file_tpl):
|
||||
def _handle_kres_file_chunk(kres_file_chunk):
|
||||
tstart = time.time()
|
||||
kres_file_idx = kres_file_tpl[0]
|
||||
kres_file = kres_file_tpl[1]
|
||||
kres_data = kres_parser.parse_xml_file(kres_file)
|
||||
if args.output == "file":
|
||||
kres_outdir = outdir / "kres_json"
|
||||
kres_outdir.mkdir(parents=True, exist_ok=True)
|
||||
kres_outfile = kres_outdir / Path(kres_file.name.split(".")[0]).with_suffix(".json")
|
||||
with kres_outfile.open("w") as fp:
|
||||
json.dump(kres_data, fp)
|
||||
elif args.output == "db":
|
||||
kres_chunk_idx = kres_file_chunk[0]
|
||||
kres_chunk = kres_file_chunk[1]
|
||||
|
||||
dbclient = None
|
||||
db_payload = []
|
||||
if args.output == "db":
|
||||
# mongoclient needs to be created after forking
|
||||
dbclient = MongoClient(
|
||||
"mongodb://{}".format(args.dbaddr),
|
||||
|
@ -71,25 +67,29 @@ def _handle_kres_file_tpl(kres_file_tpl):
|
|||
authSource="valdb",
|
||||
authMechanism='SCRAM-SHA-1'
|
||||
)
|
||||
valdb = dbclient.valdb
|
||||
kres_col = valdb["kres"]
|
||||
# dbclient.valdb["kres"]
|
||||
for kres_file in kres_chunk:
|
||||
kres_data = kres_parser.parse_xml_file(kres_file)
|
||||
if args.output == "file":
|
||||
kres_outdir = outdir / "kres_json"
|
||||
kres_outdir.mkdir(parents=True, exist_ok=True)
|
||||
kres_outfile = kres_outdir / Path(kres_file.name.split(".")[0]).with_suffix(".json")
|
||||
with kres_outfile.open("w") as fp:
|
||||
json.dump(kres_data, fp)
|
||||
|
||||
# HUUUUGE BOTTLENECK
|
||||
"""
|
||||
for sentence in kres_data:
|
||||
kres_col.update({"sid": sentence["sid"]}, sentence, upsert=True)
|
||||
"""
|
||||
elif args.output == "db":
|
||||
if dbclient.valdb["kres"].find({"sid": kres_data[0]["sid"]}).count() > 0:
|
||||
logger.info("File {} already in DB, closing chunk ({}/{})".format(
|
||||
kres_file, kres_chunk_idx, n_chunks))
|
||||
dbclient.close()
|
||||
return
|
||||
|
||||
# skip if one of the sentences is already in DB
|
||||
if kres_col.find({"sid": kres_data[0]["sid"]}).count() > 0:
|
||||
logger.info("File {} already in DB ({}/{})".format(
|
||||
kres_file, kres_file_idx, n_kres_files))
|
||||
return
|
||||
kres_data_1 = [_db_preprocess(x) for x in kres_data]
|
||||
db_payload += kres_data_1
|
||||
|
||||
kres_data_1 = [_db_preprocess(x) for x in kres_data]
|
||||
kres_col.insert_many(kres_data_1) # much much better (just make sure sid has a unique index)
|
||||
logger.info("Inserted data from {} ({}/{}) in {:.2f} s".format(
|
||||
kres_file, kres_file_idx, n_kres_files, time.time() - tstart))
|
||||
dbclient.valdb["kres"].insert_many(db_payload) # much much better (just make sure sid has a unique index)
|
||||
logger.info("Inserted kres files chunk ({}/{}) in {:.2f} s".format(
|
||||
kres_chunk_idx, n_chunks, time.time() - tstart))
|
||||
dbclient.close()
|
||||
|
||||
def _get_dbclient(args):
|
||||
|
@ -115,6 +115,7 @@ if __name__ == "__main__":
|
|||
parser.add_argument('--dbuser', required=False, default=None)
|
||||
parser.add_argument('--dbpass', required=False, default=None)
|
||||
parser.add_argument('--cores', required=False, default=1)
|
||||
parser.add_argument('--chunk-size', required=False, default=3)
|
||||
args = parser.parse_args()
|
||||
|
||||
|
||||
|
@ -161,13 +162,19 @@ if __name__ == "__main__":
|
|||
kres_srl_folder=args.kres_srl_folder
|
||||
)
|
||||
|
||||
# [(idx, filepath)]
|
||||
kres_files = [x for x in Path(args.kres_folder).iterdir()]
|
||||
kres_files = [x for x in enumerate(kres_files)]
|
||||
n_kres_files = len(kres_files)
|
||||
kres_files_chunks = []
|
||||
i = 0
|
||||
while i < len(kres_files):
|
||||
# kres_files_chunks += kres_files[i:(i+args.chunk_size)]
|
||||
new_i = i + int(args.chunk_size)
|
||||
kres_files_chunks += [kres_files[i:new_i]]
|
||||
i = new_i
|
||||
kres_files_chunks = [x for x in enumerate(kres_files_chunks)]
|
||||
n_chunks = len(kres_files_chunks)
|
||||
|
||||
p = Pool(int(args.cores))
|
||||
p.map(_handle_kres_file_tpl, kres_files)
|
||||
p.map(_handle_kres_file_chunk, kres_files_chunks)
|
||||
|
||||
|
||||
logger.info("Finished parsing.")
|
||||
|
|
Loading…
Reference in New Issue
Block a user