387 lines
12 KiB
Python
387 lines
12 KiB
Python
|
from valency import k_utils
|
||
|
import logging
|
||
|
from time import time
|
||
|
from valency.k_utils import dict_safe_key as dsk
|
||
|
from copy import deepcopy as DC
|
||
|
|
||
|
log = logging.getLogger(__name__)
|
||
|
|
||
|
# Upper limit for how many senses a lemma can have.
|
||
|
GUL = 20
|
||
|
SLOWNET_CACHE = "slownet_glosses_cache"
|
||
|
|
||
|
|
||
|
class DictionaryInterface:
|
||
|
def __init__(self, vallex, dictionary):
|
||
|
self.vallex = vallex
|
||
|
self.dictionary = "interface"
|
||
|
|
||
|
def find(self, lemma):
|
||
|
return []
|
||
|
|
||
|
def contains(self, lemma, upper_limit=GUL):
|
||
|
# useless. need to check if sense_glosses returns non empty list
|
||
|
res = self.find(lemma)
|
||
|
if upper_limit is not None and len(res) > upper_limit:
|
||
|
return False
|
||
|
return (len(res) is not 0)
|
||
|
|
||
|
def cached_glosses(self, lemma):
|
||
|
# preprocessed self_glosses (not used)
|
||
|
res = list(self.vallex.db.cached_glosses.find(
|
||
|
{"lemma": lemma, "dictionary": self.dictionary}))
|
||
|
if len(res) == 0:
|
||
|
return []
|
||
|
return res[0]["glosses"]
|
||
|
|
||
|
def sense_glosses(self, lemma):
|
||
|
# array: gloss for each sense
|
||
|
# gloss: {"gloss": ["<sense>", ...], "def": ["<sense"], ...}
|
||
|
return "dictionary_interface.py: not_yet_implemented"
|
||
|
|
||
|
# Recursively pull strgins out of a dictionary,
|
||
|
# based on a list of keys.
|
||
|
# uses self.recursion_buffer
|
||
|
def pull_strings_wrapper(self, element, keys):
|
||
|
if element is None:
|
||
|
return []
|
||
|
self.recursion_buffer = []
|
||
|
self.pull_strings(element, keys)
|
||
|
return self.recursion_buffer[:]
|
||
|
|
||
|
def pull_strings(self, element, keys):
|
||
|
# Recursively pull values out of a dict.
|
||
|
# correct key + element as string or list of strings
|
||
|
for k, e in element.items():
|
||
|
if k not in keys:
|
||
|
continue
|
||
|
if isinstance(e, dict):
|
||
|
self.pull_strings(e, keys)
|
||
|
elif isinstance(e, str):
|
||
|
self.recursion_buffer.append(e)
|
||
|
elif isinstance(e, list):
|
||
|
for ee in e:
|
||
|
if isinstance(ee, dict):
|
||
|
self.pull_strings(ee, keys)
|
||
|
elif isinstance(ee, str):
|
||
|
self.recursion_buffer.append(ee)
|
||
|
|
||
|
|
||
|
class Sskj(DictionaryInterface):
|
||
|
def __init__(self, vallex):
|
||
|
super().__init__(vallex, "sskj")
|
||
|
|
||
|
def find(self, lemma):
|
||
|
res = list(self.vallex.db.sskj.find(
|
||
|
{"ns0:entry.ns0:form.ns0:orth": lemma}
|
||
|
))
|
||
|
return res
|
||
|
|
||
|
def sense_glosses(self, lemma, upper_limit=GUL):
|
||
|
entries = self.find(lemma)
|
||
|
if upper_limit is not None and len(entries) > upper_limit:
|
||
|
log.info("sense_glosses({}): too many sense entries".format(lemma))
|
||
|
return []
|
||
|
senses = []
|
||
|
if len(entries) == 0:
|
||
|
return []
|
||
|
for e in entries:
|
||
|
senses.extend(dsk(
|
||
|
e["ns0:entry"], "ns0:sense"))
|
||
|
keys = [
|
||
|
"ns0:def", "ns0:cit", "ns0:quote",
|
||
|
"ns0:gloss", "ns0:sense", "ns0:orth",
|
||
|
"ns0:form", "#text"
|
||
|
]
|
||
|
glosses = []
|
||
|
for s in senses:
|
||
|
gloss = self.pull_strings_wrapper(s, keys)
|
||
|
if len(gloss) == 0:
|
||
|
continue
|
||
|
glosses.append({
|
||
|
"gloss": gloss,
|
||
|
"def": self.pull_strings_wrapper(s, ["ns0:sense", "ns0:def"])
|
||
|
})
|
||
|
return glosses
|
||
|
|
||
|
|
||
|
class SloWnet(DictionaryInterface):
|
||
|
def __init__(self, vallex):
|
||
|
super().__init__(vallex, "slownet")
|
||
|
self.hypernym_buffer = []
|
||
|
|
||
|
def slo_to_eng(self, lemma):
|
||
|
|
||
|
def helper_get_eng_lemmas(r):
|
||
|
res = []
|
||
|
for literal in dsk(r, "SYNONYM"):
|
||
|
if literal["@xml:lang"] == "en":
|
||
|
for lt in dsk(literal, "LITERAL"):
|
||
|
res.append(lt["#text"])
|
||
|
return res
|
||
|
|
||
|
# takes a slo token, returns array of english counterparts
|
||
|
results = self.find(lemma)
|
||
|
eng_lemmas = []
|
||
|
for r in results:
|
||
|
eng_lemmas.extend(helper_get_eng_lemmas(r))
|
||
|
return eng_lemmas
|
||
|
|
||
|
def helper_get_hypernyms(self, entry):
|
||
|
res = []
|
||
|
dd = dsk(entry, "ILR")
|
||
|
for d in dd:
|
||
|
if d["@type"] == "hypernym":
|
||
|
res.append(d["#text"])
|
||
|
return res
|
||
|
|
||
|
def helper_get_en_literals(self, entry):
|
||
|
res = []
|
||
|
synonyms = dsk(entry, "SYNONYM")
|
||
|
for syn in synonyms:
|
||
|
if syn["@xml:lang"] == "en":
|
||
|
literals = dsk(syn, "LITERAL")
|
||
|
for lit in literals:
|
||
|
res.append(lit["#text"])
|
||
|
return res
|
||
|
|
||
|
def rek_root_chain(self, slownet_id):
|
||
|
entry = self.find_by_id(slownet_id)
|
||
|
if entry is None:
|
||
|
return []
|
||
|
res = self.helper_get_en_literals(entry)
|
||
|
for hypernym_id in self.helper_get_hypernyms(slownet_id):
|
||
|
res.extend(self.rek_root_chain(hypernym_id))
|
||
|
return res
|
||
|
|
||
|
def root_chain(self, lemma):
|
||
|
cached = list(self.vallex.db.cached_root_chains.find({
|
||
|
"lemma": lemma
|
||
|
}))
|
||
|
if cached:
|
||
|
return cached[0]["data"]
|
||
|
|
||
|
res = self.slo_to_eng(lemma)
|
||
|
entries = self.find(lemma)
|
||
|
start_hypernym_ids = []
|
||
|
for ent in entries:
|
||
|
start_hypernym_ids.extend(self.helper_get_hypernyms(ent))
|
||
|
for shi in start_hypernym_ids:
|
||
|
res.extend(self.rek_root_chain(shi))
|
||
|
self.vallex.db.cached_root_chains.insert({
|
||
|
"lemma": lemma,
|
||
|
"data": res
|
||
|
})
|
||
|
return res
|
||
|
|
||
|
def find_by_id(self, slownet_id):
|
||
|
res = list(self.vallex.db.slownet.find({"ID": slownet_id}))
|
||
|
if len(res) == 0:
|
||
|
log.error("ID: {} not in db.slownet.".format(slownet_id))
|
||
|
return None
|
||
|
return res[0]
|
||
|
|
||
|
def find(self, lemma):
|
||
|
return list(self.vallex.db.slownet.find({"slo_lemma": lemma}))
|
||
|
"""
|
||
|
# elemMatch for array query
|
||
|
res = list(self.vallex.db.slownet.find({
|
||
|
"SYNONYM": {'$elemMatch': {
|
||
|
"LITERAL": {'$elemMatch': {"#text": lemma}}
|
||
|
}}
|
||
|
}))
|
||
|
"""
|
||
|
|
||
|
def hypernyms(self, slownet_id, level):
|
||
|
if level == 3:
|
||
|
return
|
||
|
elements = list(self.vallex.db.slownet.find({"ID": slownet_id}))
|
||
|
if len(elements) == 0:
|
||
|
return
|
||
|
for e in elements:
|
||
|
ei = self.extract_element_info(e)
|
||
|
self.hypernym_buffer.append({
|
||
|
"def": ei["domain"] + ei["def"],
|
||
|
"gloss": ei["domain"] + ei["def"] + ei["usage"]
|
||
|
})
|
||
|
for ilr in ei["ilr"]:
|
||
|
self.hypernyms(ilr, level + 1)
|
||
|
|
||
|
def extract_element_info(self, e):
|
||
|
domain = []
|
||
|
dd = dsk(e, "DOMAIN")
|
||
|
for d in dd:
|
||
|
domain.append(d)
|
||
|
definition = []
|
||
|
dd = dsk(e, "DEF")
|
||
|
for d in dd:
|
||
|
if d["@xml:lang"] == "en":
|
||
|
definition.append(d["#text"])
|
||
|
ilr = []
|
||
|
dd = dsk(e, "ILR")
|
||
|
for d in dd:
|
||
|
if d["@type"] == "hypernym":
|
||
|
ilr.append(d["#text"])
|
||
|
usage = []
|
||
|
dd = dsk(e, "USAGE")
|
||
|
for d in dd:
|
||
|
if d["@xml:lang"] == "en":
|
||
|
usage.append(d["#text"])
|
||
|
return {
|
||
|
"domain": domain,
|
||
|
"def": definition,
|
||
|
"ilr": ilr,
|
||
|
"usage": usage,
|
||
|
}
|
||
|
|
||
|
def sense_glosses(self, lemma, upper_limit=GUL):
|
||
|
# stime = time()
|
||
|
|
||
|
# caching
|
||
|
db_key = {
|
||
|
"lemma": lemma,
|
||
|
"upper_limit": upper_limit
|
||
|
}
|
||
|
cache = list(self.vallex.db[SLOWNET_CACHE].find(db_key))
|
||
|
if len(cache) > 0:
|
||
|
return cache[0]["data"]
|
||
|
|
||
|
entries = self.find(lemma)
|
||
|
if upper_limit is not None and len(entries) > upper_limit:
|
||
|
# log.info("sense_glosses({}): too many senses".format(lemma))
|
||
|
return []
|
||
|
ret_glosses = []
|
||
|
for e in entries:
|
||
|
defs = []
|
||
|
glosses = []
|
||
|
self.hypernym_buffer = []
|
||
|
ei = self.extract_element_info(e)
|
||
|
self.hypernym_buffer.append({
|
||
|
"def": ei["domain"] + ei["def"],
|
||
|
"gloss": ei["domain"] + ei["def"] + ei["usage"]
|
||
|
})
|
||
|
for ilr in ei["ilr"]:
|
||
|
self.hypernyms(ilr, 1)
|
||
|
|
||
|
[defs.extend(x["def"]) for x in self.hypernym_buffer]
|
||
|
[glosses.extend(x["gloss"]) for x in self.hypernym_buffer]
|
||
|
ret_glosses.append({
|
||
|
"def": defs,
|
||
|
"gloss": glosses,
|
||
|
})
|
||
|
|
||
|
# log.debug("slownet.sense_glosses({}): {:.2f}s".format(
|
||
|
# lemma, time() - stime))
|
||
|
|
||
|
# caching
|
||
|
db_entry = {
|
||
|
"lemma": db_key["lemma"],
|
||
|
"upper_limit": db_key["upper_limit"],
|
||
|
"data": ret_glosses
|
||
|
}
|
||
|
self.vallex.db.slownet_sense_glosses.update(
|
||
|
db_key, db_entry, upsert=True
|
||
|
)
|
||
|
return ret_glosses
|
||
|
|
||
|
|
||
|
class Sskj2(DictionaryInterface):
|
||
|
def __init__(self, vallex):
|
||
|
super().__init__(vallex, "sskj")
|
||
|
|
||
|
def find(self, lemma):
|
||
|
pos = "glagol"
|
||
|
if lemma[-1] == "_":
|
||
|
pos = "pridevnik"
|
||
|
res = list(self.vallex.db.sskj.find({
|
||
|
"izt_clean": lemma,
|
||
|
"pos": pos
|
||
|
}))
|
||
|
return res
|
||
|
|
||
|
def count_senses(self, lemma):
|
||
|
entries = self.find(lemma)
|
||
|
if len(entries) == 0:
|
||
|
return 0
|
||
|
ol = dsk(entries[0], "ol")
|
||
|
if len(ol) == 0:
|
||
|
return 1
|
||
|
return len(ol[0]["li"])
|
||
|
|
||
|
def sense_glosses(self, lemma, upper_limit=GUL):
|
||
|
|
||
|
def helper_dict_safe_add(dic, key, el):
|
||
|
if key not in dic:
|
||
|
dic[key] = []
|
||
|
dic[key].append(el)
|
||
|
|
||
|
def helper_pull_rec(el_lst, res_dct):
|
||
|
for el in el_lst:
|
||
|
if isinstance(el, dict):
|
||
|
if ("@title" in el) and ("#text" in el):
|
||
|
helper_dict_safe_add(
|
||
|
res_dct, el["@title"], el["#text"])
|
||
|
if "span" in el:
|
||
|
helper_pull_rec(dsk(el, "span"), res_dct)
|
||
|
if ("ol" in el) and ("li" in el["ol"]):
|
||
|
helper_pull_rec(el["ol"]["li"], res_dct)
|
||
|
if "li" in el:
|
||
|
helper_pull_rec(el["li"], res_dct)
|
||
|
|
||
|
entries = self.find(lemma)
|
||
|
if len(entries) == 0:
|
||
|
return []
|
||
|
if len(entries) > 1:
|
||
|
log.warning("{} entries for {} in sskj2.".format(
|
||
|
len(entries), lemma))
|
||
|
glosses_per_entry = []
|
||
|
for idx, entry in enumerate(entries):
|
||
|
res_dict = {}
|
||
|
if "span" in entry:
|
||
|
helper_pull_rec(dsk(entry, "span"), res_dict)
|
||
|
# senses
|
||
|
res_dict["senses"] = []
|
||
|
if ("ol" in entry) and ("li" in entry["ol"]):
|
||
|
for el in dsk(entry["ol"], "li"):
|
||
|
tmp = {"sskj_sense_id": el["span"][0]}
|
||
|
helper_pull_rec(dsk(el, "span"), tmp)
|
||
|
helper_pull_rec(dsk(el, "ol"), tmp)
|
||
|
res_dict["senses"].append(DC(tmp))
|
||
|
|
||
|
def helper_create_gloss(dct):
|
||
|
keys = ["Razlaga", "Zgled", "Stranska razlaga", "Sopomenka"]
|
||
|
ret = []
|
||
|
for k in keys:
|
||
|
ret.extend(dsk(dct, k))
|
||
|
return ret
|
||
|
|
||
|
glosses = []
|
||
|
n_senses = len(res_dict["senses"])
|
||
|
if n_senses == 0:
|
||
|
glosses.append({
|
||
|
"sskj_sense_id": "1-1",
|
||
|
"gloss": helper_create_gloss(res_dict),
|
||
|
"def": dsk(res_dict, "Razlaga")
|
||
|
})
|
||
|
return glosses
|
||
|
|
||
|
for sense in res_dict["senses"]:
|
||
|
glosses.append({
|
||
|
"sskj_sense_id": "{}-{}".format(
|
||
|
sense["sskj_sense_id"], n_senses),
|
||
|
"gloss": helper_create_gloss(sense),
|
||
|
"def": dsk(sense, "Razlaga")
|
||
|
})
|
||
|
glosses_per_entry.append(glosses)
|
||
|
|
||
|
# add entry_id before the_sense id
|
||
|
# entry_id-sskj_sense_id-n_senses
|
||
|
all_glosses = []
|
||
|
for idx, glosses in enumerate(glosses_per_entry):
|
||
|
entry_id = idx + 1 # start with 1
|
||
|
for gloss in glosses:
|
||
|
gloss["sskj_sense_id"] = "{}-{}".format(
|
||
|
entry_id, gloss["sskj_sense_id"])
|
||
|
all_glosses.append(gloss)
|
||
|
return all_glosses
|