Compare commits

...

18 Commits

Author SHA1 Message Date
01adf47b9b a 2019-04-22 09:06:02 +02:00
2582314c4d bugfix 2019-04-21 22:24:36 +02:00
792c0b03fd processing kres files in chunks to reduce number of DB connections 2019-04-21 21:50:56 +02:00
3276619e6f close dbclient after usage 2019-04-21 19:35:39 +02:00
bcc64c767c separate parsing of ssj and kres (mem management) 2019-04-21 19:18:19 +02:00
00d9192993 moved parse_ssj into a subprocess 2019-04-21 17:04:20 +02:00
14c607c106 Merge branch 'master' of gitea.cjvt.si:kristjan/cjvt-corpusparser into my-fix 2019-04-21 16:07:15 +02:00
94d5a6cd73 added exception to parsing ssj 2019-04-21 16:07:08 +02:00
5ae1a9783c fixed some logging 2019-04-21 13:11:05 +02:00
voje c6b8426fb3 added adjective handling (appending _ to headwords) 2019-04-19 07:41:50 +02:00
af4f6045bb prevent duplicate entries in DB 2019-04-15 20:48:10 +02:00
f0b0abac1b added functors and headwords to db entry 2019-04-15 02:34:53 +02:00
86e56767dd added parallel processing 2019-04-15 00:25:26 +02:00
cce83045e8 adding per-file parsing, for parallel use 2019-04-14 17:16:45 +02:00
19945a9dd9 changed default mongo auth mechanism 2019-04-14 16:50:54 +02:00
c17361fbda added more logging 2019-04-14 04:18:52 +02:00
voje 2b7339ac5a update instead of insert, fixing sentence duplication in db 2019-04-11 07:55:44 +02:00
77c599dded bug fix 2019-03-24 13:44:47 +01:00
3 changed files with 195 additions and 103 deletions

corpusparser/Parser.py View File

@@ -3,24 +3,21 @@ import re
 import json
 from lxml import etree
 import logging
+import time
 
-logging.basicConfig(level=logging.INFO)
+# logging.basicConfig(level=logging.INFO)
 
 # Read input file(.xml, .json; kres or ssj500k).
 # Create an iterator that outputs resulting sentences (python dict format).
 
 class Parser():
 
-    def __init__(self, corpus, infiles, logger=None):
-        if corpus == "kres":
-            self.kres_folder = Path(infiles[0])
-            self.kres_srl_folder = Path(infiles[1])
-        elif corpus == "ssj":
-            self.ssj_file = Path(infiles[0])
-        else:
-            raise ValueError("Argument corpus should be 'ssj' or 'kres'.")
+    def __init__(self, corpus, kres_srl_folder=None, logger=None):
         self.corpus = corpus
+        if self.corpus == "kres":
+            self.kres_srl_folder = kres_srl_folder
         self.W_TAGS = ['w']
         self.C_TAGS = ['c']
         self.S_TAGS = ['S', 'pc']
@@ -30,6 +27,10 @@ class Parser():
             "missing_srl": []
         }
 
+        # for logging output
+        self.n_kres_files = -1
+        self.nth_kres_file = -1
+
     def parse_jos_links(self, sent_el):
         if self.corpus == "kres":
             return self.parse_jos_links_kres(sent_el)
@@ -60,7 +61,10 @@ class Parser():
     def parse_any_links_ssj(self, sent_el, links_type):
         lgrps = sent_el.findall(".//linkGrp")
-        links = [x for x in lgrps if x.get("type") == links_type][0]
+        try:
+            links = [x for x in lgrps if x.get("type") == links_type][0]
+        except:
+            return []
         res_links = []
         for link in links:
             tar = self.parse_ssj_target_arg(link.get("target"))
@@ -90,14 +94,34 @@ class Parser():
     def sentence_generator(self):
         # Using generators so we don't copy a whole corpu around in memory.
+        # Use parse_xml_file() instead for pre-file processing (parallelism?)
         if self.corpus == "kres":
+            # some logging output
+            if self.n_kres_files == -1:
+                self.n_kres_files = len(list(Path(self.kres_folder).glob('*')))
             for xml_file in self.kres_folder.iterdir():
-                # self.parse_xml_file(xml_file)
-                yield from self.parse_xml_file(xml_file)
+                self.nth_kres_file += 1
+                self.logger.info("{} ({}/{})".format(
+                    xml_file, self.nth_kres_file, self.n_kres_files))
+                yield from self.xml_file_to_generator(xml_file)
         else:
-            yield from self.parse_xml_file(self.ssj_file)
+            yield from self.xml_file_to_generator(self.ssj_file)
 
     def parse_xml_file(self, xml_file):
+        # tstart = time.time()
+        file_data = []
+        for tpl in self.xml_file_to_generator(xml_file):
+            file_data += [tpl[1]]
+        tend = time.time()
+        # self.logger.info("Parsed {} in {:.4f} s".format(xml_file, tend - tstart))
+        return file_data
+
+    def xml_file_to_generator(self, xml_file):
+        # for separate srl links, it will guess the srl file based on
+        # self.kres_srl_folder
         srl_from_json = {}
         if self.corpus == "kres":
             # in case of kres, read the SRL links form a separate json file
@@ -190,7 +214,7 @@ class Parser():
                 "text": sentence_text,
                 "tokens": sentence_tokens,
                 "jos_links": jos_links,
-                "srl_links": srl_links_parsed
+                "srl_links": srl_links_parsed,
             }
             self.stats["parsed_count"] += 1
             yield (xml_file, sentence_entry)
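
The net effect of the Parser.py changes is a two-level API: xml_file_to_generator() streams (xml_file, sentence_entry) tuples so a whole corpus never sits in memory, while parse_xml_file() drains that generator into a per-file list that a worker process can handle in isolation. Below is a minimal usage sketch under assumed paths (/data/kres_xml, /data/kres_srl and the file name are hypothetical); note that the new __init__ no longer sets kres_folder, so the sketch assigns it by hand.

import logging
from pathlib import Path
from corpusparser import Parser

logging.basicConfig(level=logging.INFO)
parser = Parser(corpus="kres", kres_srl_folder="/data/kres_srl",
                logger=logging.getLogger(__name__))
# sentence_generator() still reads self.kres_folder, which the new
# __init__ no longer sets, so a caller would have to assign it:
parser.kres_folder = Path("/data/kres_xml")

# Streaming: one (xml_file, sentence_entry) tuple at a time, constant memory.
for xml_file, sentence in parser.xml_file_to_generator(Path("/data/kres_xml/F0000001.xml")):
    print(sentence["sid"])

# Materialized: the whole file as a list of sentence dicts; this is the
# form the chunked workers in main.py consume.
file_data = parser.parse_xml_file(Path("/data/kres_xml/F0000001.xml"))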

corpusparser/__init__.py View File

@@ -1 +1,2 @@
 from corpusparser.Parser import Parser
+from corpusparser.main import enriched_lemma
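
The package __init__.py now also re-exports enriched_lemma from corpusparser.main. As defined in the main.py diff below, it returns the lemma unchanged for tokens whose MSD starts with "G" (a verb in the Slovene JOS tagset) and appends "_" to everything else, the adjective-handling convention from commit c6b8426fb3. A small illustration with made-up tokens (the MSD strings are placeholders in the parser's dict layout):

from corpusparser import enriched_lemma

verb_token = {"tid": "t1", "lemma": "delati", "msd": "Ggnste"}  # MSD starts with "G"
adj_token = {"tid": "t2", "lemma": "hiter", "msd": "Ppnmei"}    # any other MSD

enriched_lemma(verb_token)  # -> "delati"
enriched_lemma(adj_token)   # -> "hiter_"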

corpusparser/main.py View File

@@ -4,121 +4,188 @@ import argparse
 import logging
 import json
 from pymongo import MongoClient
+import pymongo
 import sys
+from multiprocessing import Pool
+import time
 
-logging.basicConfig(level=logging.INFO)
+CORPORA = ["kres", "ssj"]
+
+# logging.basicConfig(filename=Path("/var/tmp/corpusparser.log"), filemode='a', level=logging.INFO)
 logger = logging.getLogger(__name__)
-
-## Main handles command line arguments and writing to files / DB.
+# lfh = logging.FileHandler("/project/logs/fill-database.log")
+lfh = logging.StreamHandler(sys.stdout)
+formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
+lfh.setFormatter(formatter)
+logger.addHandler(lfh)
+logger.setLevel(logging.INFO)
+
+n_chunks = -1
 
-def ssj_to_json_file(sentence_generator, outfolder):
-    # this funciton is based on the fact that files are parsed sequentially
-    outfolder = Path(outfolder)
-    outfolder.mkdir(parents=True, exist_ok=True)
-    outfile = outfolder / "ssj500k.json"
-
-    data_buffer = []
-    for s in sentence_generator:
-        sdata = s[1]
-        data_buffer += [sdata]
-
-    # outfile = Path(outfile)
-    with outfile.open("w") as fp:
-        logger.info("Writing to {}".format(outfile))
-        json.dump(data_buffer, fp)
-
-def kres_to_json_files(sentence_generator, outfolder):
-    outfolder = Path(outfolder) / "kres_json"
-    outfolder.mkdir(parents=True, exist_ok=True)
-
-    def write_buffer_to_file(outfile, outfile_buffer):
-        logger.info("Writing file: {}".format(outfile))
-        with outfile.open("w") as fp:
-            json.dump(outfile_buffer, fp)
-
-    outfile_buffer = None
-    current_outfile = None
-    for s in sentence_generator:
-        infile = s[0]
-        outfile = outfolder / Path(infile.name.split(".")[0]).with_suffix(".json")
-
-        # parser sequentially parses files; when we're done with a file, write it out
-        if current_outfile is None:
-            current_outfile = outfile
-            outfile_buffer = []
-        elif outfile != current_outfile:
-            write_buffer_to_file(current_outfile, outfile_buffer)
-            current_outfile = outfile
-            outfile_buffer = []
-
-        # update buffer
-        sdata = s[1]
-        outfile_buffer += [sdata]
-
-    write_buffer_to_file(current_outfile, outfile_buffer)
-
-def data_to_valdb(sentence_generator, dbaddr, username, password, collection_name):
-    logger.info("Connecting to: {}".format(dbaddr))
-    client = MongoClient(
-        "mongodb://{}".format(dbaddr),
-        username=username,
-        password=password,
+def enriched_lemma(token):
+    return (token["lemma"] if token["msd"][0] == "G" else token["lemma"] + "_")
+
+def _helper_tid_to_token(tid, tokens):
+    for t in tokens:
+        if t["tid"] == tid:
+            return t
+    return None
+
+def _db_preprocess(e):
+    if e["srl_links"] is None:
+        e["headwords"] = []
+        e["functors"] = []
+    else:
+        hw_tids = list(set([x["from"] for x in e["srl_links"]]))
+        hw_tokens = [_helper_tid_to_token(tid, e["tokens"]) for tid in hw_tids]
+        headwords = [enriched_lemma(t) for t in hw_tokens]
+        e["headwords"] = headwords
+        functors = list(set([x["afun"] for x in e["srl_links"]]))
+        e["functors"] = functors
+    return e
+
+# handler for concurrency
+def _handle_kres_file_chunk(kres_file_chunk):
+    tstart = time.time()
+    kres_chunk_idx = kres_file_chunk[0]
+    kres_chunk = kres_file_chunk[1]
+    dbclient = None
+    db_payload = []
+    if args.output == "db":
+        # mongoclient needs to be created after forking
+        dbclient = MongoClient(
+            "mongodb://{}".format(args.dbaddr),
+            username=args.dbuser,
+            password=args.dbpass,
+            authSource="valdb",
+            authMechanism='SCRAM-SHA-1'
+        )
+        # dbclient.valdb["kres"]
+    for kres_file in kres_chunk:
+        try:
+            kres_data = kres_parser.parse_xml_file(kres_file)
+        except:
+            logger.error("Failed to parse file: {}".format(kres_file))
+            continue
+        if args.output == "file":
+            kres_outdir = outdir / "kres_json"
+            kres_outdir.mkdir(parents=True, exist_ok=True)
+            kres_outfile = kres_outdir / Path(kres_file.name.split(".")[0]).with_suffix(".json")
+            with kres_outfile.open("w") as fp:
+                json.dump(kres_data, fp)
+        elif args.output == "db":
+            """
+            if dbclient.valdb["kres"].find({"sid": kres_data[0]["sid"]}).count() > 0:
+                logger.info("File {} already in DB, closing chunk ({}/{})".format(
+                    kres_file, kres_chunk_idx, n_chunks))
+                dbclient.close()
+                return
+            """
+            kres_data_1 = [_db_preprocess(x) for x in kres_data]
+            db_payload += kres_data_1
+    try:
+        dbclient.valdb["kres"].insert_many(db_payload, ordered=False)  # much much better (just make sure sid has a unique index)
+    except:
+        logger.error("Failed inserting kres files chunk ({}/{}) in {:.2f} s".format(
+            kres_chunk_idx, n_chunks, time.time() - tstart))
+    logger.info("Db insert: chunks ({}/{}) in {:.2f} s".format(
+        kres_chunk_idx, n_chunks, time.time() - tstart))
+    dbclient.close()
+
+def _get_dbclient(args):
+    dbclient = MongoClient(
+        "mongodb://{}".format(args.dbaddr),
+        username=args.dbuser,
+        password=args.dbpass,
         authSource="valdb",
-        authMechanism='SCRAM-SHA-256'
+        authMechanism='SCRAM-SHA-1'
     )
-    valdb = client.valdb
-    logger.info("Writing data to {}.".format(collection_name))
-    col = valdb[collection_name]
-    for s in sentence_generator:
-        sdata = s[1]
-        col.insert_one(sdata)
+    return dbclient
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="Parsing corpora kres and ssj500k.")
-    parser.add_argument('--kres-folder', required=True)
-    parser.add_argument('--kres-srl-folder', required=True)
-    parser.add_argument('--ssj-file', required=True)
+    parser.add_argument('--corpus', required=True)
+    parser.add_argument('--kres-folder', required=False)
+    parser.add_argument('--kres-srl-folder', required=False)
+    parser.add_argument('--ssj-file', required=False)
     parser.add_argument('--output', required=False, default=None)
     parser.add_argument('--outdir', required=False, default=None)
     parser.add_argument('--dbaddr', required=False, default=None)
     parser.add_argument('--dbuser', required=False, default=None)
     parser.add_argument('--dbpass', required=False, default=None)
+    parser.add_argument('--cores', required=False, default=1)
+    parser.add_argument('--chunk-size', required=False, default=3)
     args = parser.parse_args()
 
-    # parse ssj
-    logger.info("Parsing ssj500k: {}".format(args.ssj_file))
-    ssj_parser = Parser(
-        corpus="ssj",
-        infiles=[args.ssj_file],
-    )
+    corpus = args.corpus
+    assert (corpus in CORPORA), "Wrong corpus name."
 
-    # ssj to json
+    outdir = None
     if args.output == "file":
-        ssj_to_json_file(ssj_parser.sentence_generator(), args.outdir)
+        outdir = Path(args.outdir)
+        outdir.mkdir(parents=True, exist_ok=True)
     elif args.output == "db":
-        data_to_valdb(
-            ssj_parser.sentence_generator(),
-            args.dbaddr,
-            args.dbuser,
-            args.dbpass,
-            collection_name="ssj"
-        )
+        dbclient = _get_dbclient(args)
+        dbclient.valdb[corpus].ensure_index([("sid", pymongo.ASCENDING)])
+        dbclient.valdb[corpus].ensure_index([("headwords", pymongo.ASCENDING)])
+        dbclient.valdb[corpus].ensure_index([("functors", pymongo.ASCENDING)])
+        dbclient.close()
 
-    sys.exit()
+    if corpus == "ssj":
+        logger.info("Parsing Ssj: {}".format(args.ssj_file))
+        ssj_parser = Parser(logger=logger, corpus="ssj")
+        ssj_data = ssj_parser.parse_xml_file(Path(args.ssj_file))
+        if args.output == "file":
+            ssj_outfile = outdir / "ssj500k.json"
+            with ssj_outfile.open("w") as fp:
+                json.dump(ssj_data, fp)
+        elif args.output == "db":
+            dbclient = _get_dbclient(args)
+            valdb = dbclient.valdb
+            ssj_col = valdb["ssj"]
+            for sentence in ssj_data:
+                sentence = _db_preprocess(sentence)
+                ssj_col.update({"sid": sentence["sid"]}, sentence, upsert=True)
+            dbclient.close()
 
-    # parse kres
-    logger.info("Parsing Kres: {}".format(args.ssj_file))
-    kres_parser = Parser(
-        corpus="kres",
-        infiles=[args.kres_folder, args.kres_srl_folder],
-    )
+    if corpus == "kres":
+        # Kres
+        logger.info("Parsing Kres: {}".format(args.kres_folder))
+        kres_parser = Parser(
+            logger=logger,
+            corpus="kres",
+            kres_srl_folder=args.kres_srl_folder
+        )
 
-    # kres to json
-    if args.output == "file":
-        kres_to_json_files(kres_parser.sentence_generator(), args.outdir)
-    elif args.output == "db":
-        data_to_valdb(
-            ssj_parser.sentence_generator(),
-            args.dbaddr,
-            args.dbuser,
-            args.dbpass,
-            collection_name="kres"
-        )
+        kres_files = [x for x in Path(args.kres_folder).iterdir()]
+        kres_files = sorted(kres_files, key=lambda x: x.name)
+        kres_files_chunks = []
+        i = 0
+        while i < len(kres_files):
+            # kres_files_chunks += kres_files[i:(i+args.chunk_size)]
+            new_i = i + int(args.chunk_size)
+            kres_files_chunks += [kres_files[i:new_i]]
+            i = new_i
+        kres_files_chunks = [x for x in enumerate(kres_files_chunks)]
+        n_chunks = len(kres_files_chunks)
+
+        p = Pool(int(args.cores))
+        p.map(_handle_kres_file_chunk, kres_files_chunks)
+
+    logger.info("Finished parsing.")
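
The heart of the rewrite is the chunked, multi-process load path: kres files are sorted, sliced into --chunk-size groups, and fed to a multiprocessing.Pool; each worker opens its own MongoClient only after the fork (pymongo clients cannot be shared across processes) and writes a whole chunk with a single insert_many(ordered=False), so one connection serves many files and duplicate sid values fail individually instead of aborting the batch. The ssj branch takes the slower per-sentence upsert path instead. A self-contained sketch of the same pattern, with a placeholder unauthenticated local MongoDB and stand-in documents instead of parsed sentences:

import time
from multiprocessing import Pool
from pymongo import MongoClient
from pymongo.errors import BulkWriteError

DB_URI = "mongodb://localhost:27017"  # placeholder address, no auth
CHUNK_SIZE = 3

def handle_chunk(indexed_chunk):
    chunk_idx, chunk = indexed_chunk
    tstart = time.time()
    # Create the client inside the worker: it must not cross a fork().
    client = MongoClient(DB_URI)
    payload = [{"sid": sid} for sid in chunk]  # stand-in for preprocessed sentences
    try:
        # ordered=False keeps inserting past duplicate-key errors on the
        # unique sid index instead of stopping at the first one.
        client.valdb["kres"].insert_many(payload, ordered=False)
    except BulkWriteError:
        print("chunk {}: some documents were already present".format(chunk_idx))
    print("chunk {} done in {:.2f} s".format(chunk_idx, time.time() - tstart))
    client.close()

if __name__ == "__main__":
    # The unique index that makes ordered=False inserts idempotent (the
    # diff uses the older ensure_index; create_index is the modern call).
    setup = MongoClient(DB_URI)
    setup.valdb["kres"].create_index("sid", unique=True)
    setup.close()

    sids = ["sentence-{}".format(i) for i in range(10)]  # stand-in for kres files
    chunks = [sids[i:i + CHUNK_SIZE] for i in range(0, len(sids), CHUNK_SIZE)]
    with Pool(2) as pool:
        pool.map(handle_chunk, list(enumerate(chunks)))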