Compare commits

...

17 Commits

3 changed files with 195 additions and 102 deletions

View File

@@ -3,24 +3,21 @@ import re
import json import json
from lxml import etree from lxml import etree
import logging import logging
import time
logging.basicConfig(level=logging.INFO) # logging.basicConfig(level=logging.INFO)
# Read input file(.xml, .json; kres or ssj500k). # Read input file(.xml, .json; kres or ssj500k).
# Create an iterator that outputs resulting sentences (python dict format). # Create an iterator that outputs resulting sentences (python dict format).
class Parser(): class Parser():
def __init__(self, corpus, infiles, logger=None): def __init__(self, corpus, kres_srl_folder=None, logger=None):
if corpus == "kres":
self.kres_folder = Path(infiles[0])
self.kres_srl_folder = Path(infiles[1])
elif corpus == "ssj":
self.ssj_file = Path(infiles[0])
else:
raise ValueError("Argument corpus should be 'ssj' or 'kres'.")
self.corpus = corpus self.corpus = corpus
if self.corpus == "kres":
self.kres_srl_folder = kres_srl_folder
self.W_TAGS = ['w'] self.W_TAGS = ['w']
self.C_TAGS = ['c'] self.C_TAGS = ['c']
self.S_TAGS = ['S', 'pc'] self.S_TAGS = ['S', 'pc']
@@ -30,6 +27,10 @@ class Parser():
"missing_srl": [] "missing_srl": []
} }
# for logging output
self.n_kres_files = -1
self.nth_kres_file = -1
def parse_jos_links(self, sent_el): def parse_jos_links(self, sent_el):
if self.corpus == "kres": if self.corpus == "kres":
return self.parse_jos_links_kres(sent_el) return self.parse_jos_links_kres(sent_el)
@@ -60,7 +61,10 @@ class Parser():
def parse_any_links_ssj(self, sent_el, links_type): def parse_any_links_ssj(self, sent_el, links_type):
lgrps = sent_el.findall(".//linkGrp") lgrps = sent_el.findall(".//linkGrp")
links = [x for x in lgrps if x.get("type") == links_type][0] try:
links = [x for x in lgrps if x.get("type") == links_type][0]
except:
return []
res_links = [] res_links = []
for link in links: for link in links:
tar = self.parse_ssj_target_arg(link.get("target")) tar = self.parse_ssj_target_arg(link.get("target"))
@@ -90,14 +94,34 @@ class Parser():
def sentence_generator(self): def sentence_generator(self):
# Using generators so we don't copy a whole corpus around in memory. # Using generators so we don't copy a whole corpus around in memory.
# Use parse_xml_file() instead for per-file processing (parallelism?)
if self.corpus == "kres": if self.corpus == "kres":
# some logging output
if self.n_kres_files == -1:
self.n_kres_files = len(list(Path(self.kres_folder).glob('*')))
for xml_file in self.kres_folder.iterdir(): for xml_file in self.kres_folder.iterdir():
# self.parse_xml_file(xml_file) self.nth_kres_file += 1
yield from self.parse_xml_file(xml_file) self.logger.info("{} ({}/{})".format(
xml_file, self.nth_kres_file, self.n_kres_files))
yield from self.xml_file_to_generator(xml_file)
else: else:
yield from self.parse_xml_file(self.ssj_file) yield from self.xml_file_to_generator(self.ssj_file)
def parse_xml_file(self, xml_file): def parse_xml_file(self, xml_file):
# tstart = time.time()
file_data = []
for tpl in self.xml_file_to_generator(xml_file):
file_data += [tpl[1]]
tend = time.time()
# self.logger.info("Parsed {} in {:.4f} s".format(xml_file, tend - tstart))
return file_data
def xml_file_to_generator(self, xml_file):
# for separate srl links, it will guess the srl file based on
# self.kres_srl_folder
srl_from_json = {} srl_from_json = {}
if self.corpus == "kres": if self.corpus == "kres":
# in case of kres, read the SRL links from a separate json file # in case of kres, read the SRL links from a separate json file
@@ -190,7 +214,7 @@ class Parser():
"text": sentence_text, "text": sentence_text,
"tokens": sentence_tokens, "tokens": sentence_tokens,
"jos_links": jos_links, "jos_links": jos_links,
"srl_links": srl_links_parsed "srl_links": srl_links_parsed,
} }
self.stats["parsed_count"] += 1 self.stats["parsed_count"] += 1
yield (xml_file, sentence_entry) yield (xml_file, sentence_entry)

View File

@@ -1 +1,2 @@
from corpusparser.Parser import Parser from corpusparser.Parser import Parser
from corpusparser.main import enriched_lemma

View File

@@ -4,120 +4,188 @@ import argparse
import logging import logging
import json import json
from pymongo import MongoClient from pymongo import MongoClient
import pymongo
import sys import sys
from multiprocessing import Pool
import time
logging.basicConfig(level=logging.INFO) CORPORA = ["kres", "ssj"]
# logging.basicConfig(filename=Path("/var/tmp/corpusparser.log"), filemode='a', level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
## Main handles command line arguments and writing to files / DB. # lfh = logging.FileHandler("/project/logs/fill-database.log")
lfh = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
lfh.setFormatter(formatter)
logger.addHandler(lfh)
logger.setLevel(logging.INFO)
def ssj_to_json_file(sentence_generator, outfolder): n_chunks = -1
# this function is based on the fact that files are parsed sequentially
outfolder = Path(outfolder)
outfolder.mkdir(parents=True, exist_ok=True)
outfile = outfolder / "ssj500k.json"
data_buffer = []
for s in sentence_generator:
sdata = s[1]
data_buffer += [sdata]
# outfile = Path(outfile) def enriched_lemma(token):
with outfile.open("w") as fp: return (token["lemma"] if token["msd"][0] == "G" else token["lemma"] + "_")
logger.info("Writing to {}".format(outfile))
json.dump(data_buffer, fp)
def kres_to_json_files(sentence_generator, outfolder):
outfolder = Path(outfolder) / "kres_json"
outfolder.mkdir(parents=True, exist_ok=True)
def write_buffer_to_file(outfile, outfile_buffer): def _helper_tid_to_token(tid, tokens):
logger.info("Writing file: {}".format(outfile)) for t in tokens:
with outfile.open("w") as fp: if t["tid"] == tid:
json.dump(outfile_buffer, fp) return t
return None
outfile_buffer = None
current_outfile = None
for s in sentence_generator:
infile = s[0]
outfile = outfolder / Path(infile.name.split(".")[0]).with_suffix(".json")
# parser sequentially parses files; when we're done with a file, write it out def _db_preprocess(e):
if current_outfile is None: if e["srl_links"] is None:
current_outfile = outfile e["headwords"] = []
outfile_buffer = [] e["functors"] = []
elif outfile != current_outfile: else:
write_buffer_to_file(current_outfile, outfile_buffer) hw_tids = list(set([x["from"] for x in e["srl_links"]]))
current_outfile = outfile hw_tokens = [_helper_tid_to_token(tid, e["tokens"]) for tid in hw_tids]
outfile_buffer = [] headwords = [enriched_lemma(t) for t in hw_tokens]
e["headwords"] = headwords
# update buffer functors = list(set([x["afun"] for x in e["srl_links"]]))
sdata = s[1] e["functors"] = functors
outfile_buffer += [sdata] return e
write_buffer_to_file(current_outfile, outfile_buffer)
def data_to_valdb(sentence_generator, dbaddr, username, password, collection_name):
logger.info("Connecting to: {}".format(dbaddr)) # handler for concurrency
client = MongoClient( def _handle_kres_file_chunk(kres_file_chunk):
"mongodb://{}".format(dbaddr), tstart = time.time()
username=username, kres_chunk_idx = kres_file_chunk[0]
password=password, kres_chunk = kres_file_chunk[1]
dbclient = None
db_payload = []
if args.output == "db":
# mongoclient needs to be created after forking
dbclient = MongoClient(
"mongodb://{}".format(args.dbaddr),
username=args.dbuser,
password=args.dbpass,
authSource="valdb",
authMechanism='SCRAM-SHA-1'
)
# dbclient.valdb["kres"]
for kres_file in kres_chunk:
try:
kres_data = kres_parser.parse_xml_file(kres_file)
except:
logger.error("Failed to parse file: {}".format(kres_file))
continue
if args.output == "file":
kres_outdir = outdir / "kres_json"
kres_outdir.mkdir(parents=True, exist_ok=True)
kres_outfile = kres_outdir / Path(kres_file.name.split(".")[0]).with_suffix(".json")
with kres_outfile.open("w") as fp:
json.dump(kres_data, fp)
elif args.output == "db":
"""
if dbclient.valdb["kres"].find({"sid": kres_data[0]["sid"]}).count() > 0:
logger.info("File {} already in DB, closing chunk ({}/{})".format(
kres_file, kres_chunk_idx, n_chunks))
dbclient.close()
return
"""
kres_data_1 = [_db_preprocess(x) for x in kres_data]
db_payload += kres_data_1
try:
dbclient.valdb["kres"].insert_many(db_payload, ordered=False) # much much better (just make sure sid has a unique index)
except:
logger.error("Failed inserting kres files chunk ({}/{}) in {:.2f} s".format(
kres_chunk_idx, n_chunks, time.time() - tstart))
logger.info("Db insert: chunks ({}/{}) in {:.2f} s".format(
kres_chunk_idx, n_chunks, time.time() - tstart))
dbclient.close()
def _get_dbclient(args):
dbclient = MongoClient(
"mongodb://{}".format(args.dbaddr),
username=args.dbuser,
password=args.dbpass,
authSource="valdb", authSource="valdb",
authMechanism='SCRAM-SHA-256' authMechanism='SCRAM-SHA-1'
) )
valdb = client.valdb return dbclient
logger.info("Writing data to {}.".format(collection_name))
col = valdb[collection_name]
for s in sentence_generator:
sdata = s[1]
col.insert_one(sdata)
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Parsing corpora kres and ssj500k.") parser = argparse.ArgumentParser(description="Parsing corpora kres and ssj500k.")
parser.add_argument('--kres-folder', required=True) parser.add_argument('--corpus', required=True)
parser.add_argument('--kres-srl-folder', required=True) parser.add_argument('--kres-folder', required=False)
parser.add_argument('--ssj-file', required=True) parser.add_argument('--kres-srl-folder', required=False)
parser.add_argument('--ssj-file', required=False)
parser.add_argument('--output', required=False, default=None) parser.add_argument('--output', required=False, default=None)
parser.add_argument('--outdir', required=False, default=None) parser.add_argument('--outdir', required=False, default=None)
parser.add_argument('--dbaddr', required=False, default=None) parser.add_argument('--dbaddr', required=False, default=None)
parser.add_argument('--dbuser', required=False, default=None) parser.add_argument('--dbuser', required=False, default=None)
parser.add_argument('--dbpass', required=False, default=None) parser.add_argument('--dbpass', required=False, default=None)
parser.add_argument('--cores', required=False, default=1)
parser.add_argument('--chunk-size', required=False, default=3)
args = parser.parse_args() args = parser.parse_args()
# parse ssj corpus = args.corpus
logger.info("Parsing ssj500k: {}".format(args.ssj_file)) assert (corpus in CORPORA), "Wrong corpus name."
ssj_parser = Parser(
corpus="ssj",
infiles=[args.ssj_file], outdir = None
)
# ssj to json
if args.output == "file": if args.output == "file":
ssj_to_json_file(ssj_parser.sentence_generator(), args.outdir) outdir = Path(args.outdir)
outdir.mkdir(parents=True, exist_ok=True)
elif args.output == "db": elif args.output == "db":
data_to_valdb( dbclient = _get_dbclient(args)
ssj_parser.sentence_generator(), dbclient.valdb[corpus].ensure_index([("sid", pymongo.ASCENDING)])
args.dbaddr, dbclient.valdb[corpus].ensure_index([("headwords", pymongo.ASCENDING)])
args.dbuser, dbclient.valdb[corpus].ensure_index([("functors", pymongo.ASCENDING)])
args.dbpass, dbclient.close()
collection_name="ssj"
if corpus == "ssj":
logger.info("Parsing Ssj: {}".format(args.ssj_file))
ssj_parser = Parser(logger=logger, corpus="ssj")
ssj_data = ssj_parser.parse_xml_file(Path(args.ssj_file))
if args.output == "file":
ssj_outfile = outdir / "ssj500k.json"
with ssj_outfile.open("w") as fp:
json.dump(ssj_data, fp)
elif args.output == "db":
dbclient = _get_dbclient(args)
valdb = dbclient.valdb
ssj_col = valdb["ssj"]
for sentence in ssj_data:
sentence = _db_preprocess(sentence)
ssj_col.update({"sid": sentence["sid"]}, sentence, upsert=True)
dbclient.close()
if corpus == "kres":
# Kres
logger.info("Parsing Kres: {}".format(args.kres_folder))
kres_parser = Parser(
logger=logger,
corpus="kres",
kres_srl_folder=args.kres_srl_folder
) )
# parse kres kres_files = [x for x in Path(args.kres_folder).iterdir()]
logger.info("Parsing Kres: {}".format(args.ssj_file)) kres_files = sorted(kres_files, key=lambda x: x.name)
kres_parser = Parser( kres_files_chunks = []
corpus="kres", i = 0
infiles=[args.kres_folder, args.kres_srl_folder], while i < len(kres_files):
) # kres_files_chunks += kres_files[i:(i+args.chunk_size)]
# kres to json new_i = i + int(args.chunk_size)
if args.output == "file": kres_files_chunks += [kres_files[i:new_i]]
kres_to_json_files(kres_parser.sentence_generator(), args.outdir) i = new_i
elif args.output == "db": kres_files_chunks = [x for x in enumerate(kres_files_chunks)]
data_to_valdb( n_chunks = len(kres_files_chunks)
kres_parser.sentence_generator(),
args.dbaddr, p = Pool(int(args.cores))
args.dbuser, p.map(_handle_kres_file_chunk, kres_files_chunks)
args.dbpass,
collection_name="kres"
) logger.info("Finished parsing.")