You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
cjvt-valency/dip_src/valency/dictionary_interface.py

387 lines
12 KiB

from valency import k_utils
import logging
from time import time
from valency.k_utils import dict_safe_key as dsk
from copy import deepcopy as DC
log = logging.getLogger(__name__)
# Upper limit for how many senses a lemma can have.
GUL = 20
SLOWNET_CACHE = "slownet_glosses_cache"
class DictionaryInterface:
def __init__(self, vallex, dictionary):
self.vallex = vallex
self.dictionary = "interface"
def find(self, lemma):
return []
def contains(self, lemma, upper_limit=GUL):
# useless. need to check if sense_glosses returns non empty list
res = self.find(lemma)
if upper_limit is not None and len(res) > upper_limit:
return False
return (len(res) is not 0)
def cached_glosses(self, lemma):
# preprocessed self_glosses (not used)
res = list(self.vallex.db.cached_glosses.find(
{"lemma": lemma, "dictionary": self.dictionary}))
if len(res) == 0:
return []
return res[0]["glosses"]
def sense_glosses(self, lemma):
# array: gloss for each sense
# gloss: {"gloss": ["<sense>", ...], "def": ["<sense"], ...}
return "dictionary_interface.py: not_yet_implemented"
# Recursively pull strgins out of a dictionary,
# based on a list of keys.
# uses self.recursion_buffer
def pull_strings_wrapper(self, element, keys):
if element is None:
return []
self.recursion_buffer = []
self.pull_strings(element, keys)
return self.recursion_buffer[:]
def pull_strings(self, element, keys):
# Recursively pull values out of a dict.
# correct key + element as string or list of strings
for k, e in element.items():
if k not in keys:
continue
if isinstance(e, dict):
self.pull_strings(e, keys)
elif isinstance(e, str):
self.recursion_buffer.append(e)
elif isinstance(e, list):
for ee in e:
if isinstance(ee, dict):
self.pull_strings(ee, keys)
elif isinstance(ee, str):
self.recursion_buffer.append(ee)
class Sskj(DictionaryInterface):
def __init__(self, vallex):
super().__init__(vallex, "sskj")
def find(self, lemma):
res = list(self.vallex.db.sskj.find(
{"ns0:entry.ns0:form.ns0:orth": lemma}
))
return res
def sense_glosses(self, lemma, upper_limit=GUL):
entries = self.find(lemma)
if upper_limit is not None and len(entries) > upper_limit:
log.info("sense_glosses({}): too many sense entries".format(lemma))
return []
senses = []
if len(entries) == 0:
return []
for e in entries:
senses.extend(dsk(
e["ns0:entry"], "ns0:sense"))
keys = [
"ns0:def", "ns0:cit", "ns0:quote",
"ns0:gloss", "ns0:sense", "ns0:orth",
"ns0:form", "#text"
]
glosses = []
for s in senses:
gloss = self.pull_strings_wrapper(s, keys)
if len(gloss) == 0:
continue
glosses.append({
"gloss": gloss,
"def": self.pull_strings_wrapper(s, ["ns0:sense", "ns0:def"])
})
return glosses
class SloWnet(DictionaryInterface):
def __init__(self, vallex):
super().__init__(vallex, "slownet")
self.hypernym_buffer = []
def slo_to_eng(self, lemma):
def helper_get_eng_lemmas(r):
res = []
for literal in dsk(r, "SYNONYM"):
if literal["@xml:lang"] == "en":
for lt in dsk(literal, "LITERAL"):
res.append(lt["#text"])
return res
# takes a slo token, returns array of english counterparts
results = self.find(lemma)
eng_lemmas = []
for r in results:
eng_lemmas.extend(helper_get_eng_lemmas(r))
return eng_lemmas
def helper_get_hypernyms(self, entry):
res = []
dd = dsk(entry, "ILR")
for d in dd:
if d["@type"] == "hypernym":
res.append(d["#text"])
return res
def helper_get_en_literals(self, entry):
res = []
synonyms = dsk(entry, "SYNONYM")
for syn in synonyms:
if syn["@xml:lang"] == "en":
literals = dsk(syn, "LITERAL")
for lit in literals:
res.append(lit["#text"])
return res
def rek_root_chain(self, slownet_id):
entry = self.find_by_id(slownet_id)
if entry is None:
return []
res = self.helper_get_en_literals(entry)
for hypernym_id in self.helper_get_hypernyms(slownet_id):
res.extend(self.rek_root_chain(hypernym_id))
return res
def root_chain(self, lemma):
cached = list(self.vallex.db.cached_root_chains.find({
"lemma": lemma
}))
if cached:
return cached[0]["data"]
res = self.slo_to_eng(lemma)
entries = self.find(lemma)
start_hypernym_ids = []
for ent in entries:
start_hypernym_ids.extend(self.helper_get_hypernyms(ent))
for shi in start_hypernym_ids:
res.extend(self.rek_root_chain(shi))
self.vallex.db.cached_root_chains.insert({
"lemma": lemma,
"data": res
})
return res
def find_by_id(self, slownet_id):
res = list(self.vallex.db.slownet.find({"ID": slownet_id}))
if len(res) == 0:
log.error("ID: {} not in db.slownet.".format(slownet_id))
return None
return res[0]
def find(self, lemma):
return list(self.vallex.db.slownet.find({"slo_lemma": lemma}))
"""
# elemMatch for array query
res = list(self.vallex.db.slownet.find({
"SYNONYM": {'$elemMatch': {
"LITERAL": {'$elemMatch': {"#text": lemma}}
}}
}))
"""
def hypernyms(self, slownet_id, level):
if level == 3:
return
elements = list(self.vallex.db.slownet.find({"ID": slownet_id}))
if len(elements) == 0:
return
for e in elements:
ei = self.extract_element_info(e)
self.hypernym_buffer.append({
"def": ei["domain"] + ei["def"],
"gloss": ei["domain"] + ei["def"] + ei["usage"]
})
for ilr in ei["ilr"]:
self.hypernyms(ilr, level + 1)
def extract_element_info(self, e):
domain = []
dd = dsk(e, "DOMAIN")
for d in dd:
domain.append(d)
definition = []
dd = dsk(e, "DEF")
for d in dd:
if d["@xml:lang"] == "en":
definition.append(d["#text"])
ilr = []
dd = dsk(e, "ILR")
for d in dd:
if d["@type"] == "hypernym":
ilr.append(d["#text"])
usage = []
dd = dsk(e, "USAGE")
for d in dd:
if d["@xml:lang"] == "en":
usage.append(d["#text"])
return {
"domain": domain,
"def": definition,
"ilr": ilr,
"usage": usage,
}
def sense_glosses(self, lemma, upper_limit=GUL):
# stime = time()
# caching
db_key = {
"lemma": lemma,
"upper_limit": upper_limit
}
cache = list(self.vallex.db[SLOWNET_CACHE].find(db_key))
if len(cache) > 0:
return cache[0]["data"]
entries = self.find(lemma)
if upper_limit is not None and len(entries) > upper_limit:
# log.info("sense_glosses({}): too many senses".format(lemma))
return []
ret_glosses = []
for e in entries:
defs = []
glosses = []
self.hypernym_buffer = []
ei = self.extract_element_info(e)
self.hypernym_buffer.append({
"def": ei["domain"] + ei["def"],
"gloss": ei["domain"] + ei["def"] + ei["usage"]
})
for ilr in ei["ilr"]:
self.hypernyms(ilr, 1)
[defs.extend(x["def"]) for x in self.hypernym_buffer]
[glosses.extend(x["gloss"]) for x in self.hypernym_buffer]
ret_glosses.append({
"def": defs,
"gloss": glosses,
})
# log.debug("slownet.sense_glosses({}): {:.2f}s".format(
# lemma, time() - stime))
# caching
db_entry = {
"lemma": db_key["lemma"],
"upper_limit": db_key["upper_limit"],
"data": ret_glosses
}
self.vallex.db.slownet_sense_glosses.update(
db_key, db_entry, upsert=True
)
return ret_glosses
class Sskj2(DictionaryInterface):
def __init__(self, vallex):
super().__init__(vallex, "sskj")
def find(self, lemma):
pos = "glagol"
if lemma[-1] == "_":
pos = "pridevnik"
res = list(self.vallex.db.sskj.find({
"izt_clean": lemma,
"pos": pos
}))
return res
def count_senses(self, lemma):
entries = self.find(lemma)
if len(entries) == 0:
return 0
ol = dsk(entries[0], "ol")
if len(ol) == 0:
return 1
return len(ol[0]["li"])
def sense_glosses(self, lemma, upper_limit=GUL):
def helper_dict_safe_add(dic, key, el):
if key not in dic:
dic[key] = []
dic[key].append(el)
def helper_pull_rec(el_lst, res_dct):
for el in el_lst:
if isinstance(el, dict):
if ("@title" in el) and ("#text" in el):
helper_dict_safe_add(
res_dct, el["@title"], el["#text"])
if "span" in el:
helper_pull_rec(dsk(el, "span"), res_dct)
if ("ol" in el) and ("li" in el["ol"]):
helper_pull_rec(el["ol"]["li"], res_dct)
if "li" in el:
helper_pull_rec(el["li"], res_dct)
entries = self.find(lemma)
if len(entries) == 0:
return []
if len(entries) > 1:
log.warning("{} entries for {} in sskj2.".format(
len(entries), lemma))
glosses_per_entry = []
for idx, entry in enumerate(entries):
res_dict = {}
if "span" in entry:
helper_pull_rec(dsk(entry, "span"), res_dict)
# senses
res_dict["senses"] = []
if ("ol" in entry) and ("li" in entry["ol"]):
for el in dsk(entry["ol"], "li"):
tmp = {"sskj_sense_id": el["span"][0]}
helper_pull_rec(dsk(el, "span"), tmp)
helper_pull_rec(dsk(el, "ol"), tmp)
res_dict["senses"].append(DC(tmp))
def helper_create_gloss(dct):
keys = ["Razlaga", "Zgled", "Stranska razlaga", "Sopomenka"]
ret = []
for k in keys:
ret.extend(dsk(dct, k))
return ret
glosses = []
n_senses = len(res_dict["senses"])
if n_senses == 0:
glosses.append({
"sskj_sense_id": "1-1",
"gloss": helper_create_gloss(res_dict),
"def": dsk(res_dict, "Razlaga")
})
return glosses
for sense in res_dict["senses"]:
glosses.append({
"sskj_sense_id": "{}-{}".format(
sense["sskj_sense_id"], n_senses),
"gloss": helper_create_gloss(sense),
"def": dsk(sense, "Razlaga")
})
glosses_per_entry.append(glosses)
# add entry_id before the_sense id
# entry_id-sskj_sense_id-n_senses
all_glosses = []
for idx, glosses in enumerate(glosses_per_entry):
entry_id = idx + 1 # start with 1
for gloss in glosses:
gloss["sskj_sense_id"] = "{}-{}".format(
entry_id, gloss["sskj_sense_id"])
all_glosses.append(gloss)
return all_glosses