387 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			387 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from valency import k_utils
 | |
| import logging
 | |
| from time import time
 | |
| from valency.k_utils import dict_safe_key as dsk
 | |
| from copy import deepcopy as DC
 | |
| 
 | |
| log = logging.getLogger(__name__)
 | |
| 
 | |
| # Upper limit for how many senses a lemma can have.
 | |
| GUL = 20
 | |
| SLOWNET_CACHE = "slownet_glosses_cache"
 | |
| 
 | |
| 
 | |
class DictionaryInterface:
    """Base class for dictionary backends stored in vallex.db.

    Subclasses override find() (raw db lookup for a lemma) and
    sense_glosses() (per-sense gloss extraction).
    """

    def __init__(self, vallex, dictionary):
        self.vallex = vallex
        # FIX: was hard-coded to "interface", silently discarding the name
        # passed in by subclasses; cached_glosses() filters on this field,
        # so every cache query matched the wrong dictionary.
        self.dictionary = dictionary

    def find(self, lemma):
        """Return raw db entries for lemma; overridden by subclasses."""
        return []

    def contains(self, lemma, upper_limit=GUL):
        # useless. need to check if sense_glosses returns non empty list
        res = self.find(lemma)
        if upper_limit is not None and len(res) > upper_limit:
            return False
        # FIX: was `len(res) is not 0` — identity comparison on an int
        # literal only works by CPython small-int interning; use equality.
        return len(res) != 0

    def cached_glosses(self, lemma):
        """Return preprocessed glosses for lemma from the cache (not used)."""
        res = list(self.vallex.db.cached_glosses.find(
            {"lemma": lemma, "dictionary": self.dictionary}))
        if len(res) == 0:
            return []
        return res[0]["glosses"]

    def sense_glosses(self, lemma):
        # array: gloss for each sense
        # gloss: {"gloss": ["<sense>", ...], "def": ["<sense"], ...}
        return "dictionary_interface.py: not_yet_implemented"

    # Recursively pull strings out of a dictionary,
    # based on a list of keys.
    # uses self.recursion_buffer
    def pull_strings_wrapper(self, element, keys):
        if element is None:
            return []
        self.recursion_buffer = []
        self.pull_strings(element, keys)
        # return a copy so callers can't mutate the shared buffer
        return self.recursion_buffer[:]

    def pull_strings(self, element, keys):
        """Recursively collect string values found under any key in `keys`.

        Matches are appended to self.recursion_buffer (initialized by
        pull_strings_wrapper). Nested dicts, and dicts/strings inside
        lists, are descended into; other value types are ignored.
        """
        for k, e in element.items():
            if k not in keys:
                continue
            if isinstance(e, dict):
                self.pull_strings(e, keys)
            elif isinstance(e, str):
                self.recursion_buffer.append(e)
            elif isinstance(e, list):
                for ee in e:
                    if isinstance(ee, dict):
                        self.pull_strings(ee, keys)
                    elif isinstance(ee, str):
                        self.recursion_buffer.append(ee)
 | |
| 
 | |
| 
 | |
class Sskj(DictionaryInterface):
    """Lookup in the XML-derived sskj collection (TEI-style ns0: keys)."""

    def __init__(self, vallex):
        super().__init__(vallex, "sskj")

    def find(self, lemma):
        # Match the entry's orthographic headword.
        query = {"ns0:entry.ns0:form.ns0:orth": lemma}
        return list(self.vallex.db.sskj.find(query))

    def sense_glosses(self, lemma, upper_limit=GUL):
        """Return one {"gloss": [...], "def": [...]} dict per sense."""
        entries = self.find(lemma)
        if upper_limit is not None and len(entries) > upper_limit:
            log.info("sense_glosses({}): too many sense entries".format(lemma))
            return []
        if not entries:
            return []

        # Collect every sense node across all matching entries.
        sense_nodes = []
        for entry in entries:
            sense_nodes.extend(dsk(entry["ns0:entry"], "ns0:sense"))

        text_keys = [
            "ns0:def", "ns0:cit", "ns0:quote",
            "ns0:gloss", "ns0:sense", "ns0:orth",
            "ns0:form", "#text"
        ]
        result = []
        for node in sense_nodes:
            pulled = self.pull_strings_wrapper(node, text_keys)
            if not pulled:
                continue
            result.append({
                "gloss": pulled,
                "def": self.pull_strings_wrapper(
                    node, ["ns0:sense", "ns0:def"]),
            })
        return result
 | |
| 
 | |
| 
 | |
class SloWnet(DictionaryInterface):
    """Lookup in the slownet collection (WordNet-style synset entries)."""

    def __init__(self, vallex):
        super().__init__(vallex, "slownet")
        # Scratch buffer filled by hypernyms() while sense_glosses() runs.
        self.hypernym_buffer = []

    def slo_to_eng(self, lemma):
        """Take a Slovene lemma, return a list of English counterparts."""

        def helper_get_eng_lemmas(r):
            # Pull English literal texts out of the entry's SYNONYM nodes.
            res = []
            for literal in dsk(r, "SYNONYM"):
                if literal["@xml:lang"] == "en":
                    for lt in dsk(literal, "LITERAL"):
                        res.append(lt["#text"])
            return res

        results = self.find(lemma)
        eng_lemmas = []
        for r in results:
            eng_lemmas.extend(helper_get_eng_lemmas(r))
        return eng_lemmas

    def helper_get_hypernyms(self, entry):
        """Return the IDs referenced by the entry's hypernym ILR links."""
        res = []
        for d in dsk(entry, "ILR"):
            if d["@type"] == "hypernym":
                res.append(d["#text"])
        return res

    def helper_get_en_literals(self, entry):
        """Return English literal strings from the entry's SYNONYM nodes."""
        res = []
        for syn in dsk(entry, "SYNONYM"):
            if syn["@xml:lang"] == "en":
                for lit in dsk(syn, "LITERAL"):
                    res.append(lit["#text"])
        return res

    def rek_root_chain(self, slownet_id, _seen=None):
        """Recursively collect English literals along the hypernym chain.

        _seen (new, optional, backward compatible) guards against cycles
        in the hypernym graph, which would otherwise recurse forever.
        """
        if _seen is None:
            _seen = set()
        if slownet_id in _seen:
            return []
        _seen.add(slownet_id)
        entry = self.find_by_id(slownet_id)
        if entry is None:
            return []
        res = self.helper_get_en_literals(entry)
        # FIX: the original passed the *id string* to helper_get_hypernyms(),
        # which expects the entry dict — the recursion never descended.
        for hypernym_id in self.helper_get_hypernyms(entry):
            res.extend(self.rek_root_chain(hypernym_id, _seen))
        return res

    def root_chain(self, lemma):
        """English literals for lemma plus all hypernym ancestors (cached)."""
        cached = list(self.vallex.db.cached_root_chains.find({
            "lemma": lemma
        }))
        if cached:
            return cached[0]["data"]

        res = self.slo_to_eng(lemma)
        entries = self.find(lemma)
        start_hypernym_ids = []
        for ent in entries:
            start_hypernym_ids.extend(self.helper_get_hypernyms(ent))
        for shi in start_hypernym_ids:
            res.extend(self.rek_root_chain(shi))
        self.vallex.db.cached_root_chains.insert({
            "lemma": lemma,
            "data": res
        })
        return res

    def find_by_id(self, slownet_id):
        """Return the entry with ID == slownet_id, or None if absent."""
        res = list(self.vallex.db.slownet.find({"ID": slownet_id}))
        if len(res) == 0:
            log.error("ID: {} not in db.slownet.".format(slownet_id))
            return None
        return res[0]

    def find(self, lemma):
        # slo_lemma is a precomputed lookup field; the commented query below
        # documents the equivalent (slower) nested-array form.
        return list(self.vallex.db.slownet.find({"slo_lemma": lemma}))
        """
        # elemMatch for array query
        res = list(self.vallex.db.slownet.find({
            "SYNONYM": {'$elemMatch': {
                "LITERAL": {'$elemMatch': {"#text": lemma}}
            }}
        }))
        """

    def hypernyms(self, slownet_id, level, max_level=3):
        """Walk hypernym links up to max_level, filling hypernym_buffer.

        max_level generalizes the previously hard-coded depth cap of 3;
        the default preserves the old behavior.
        """
        if level >= max_level:
            return
        elements = list(self.vallex.db.slownet.find({"ID": slownet_id}))
        if len(elements) == 0:
            return
        for e in elements:
            ei = self.extract_element_info(e)
            self.hypernym_buffer.append({
                "def": ei["domain"] + ei["def"],
                "gloss": ei["domain"] + ei["def"] + ei["usage"]
            })
            for ilr in ei["ilr"]:
                self.hypernyms(ilr, level + 1, max_level)

    def extract_element_info(self, e):
        """Extract domain, English defs, hypernym ids and English usage
        examples from one slownet entry into plain lists."""
        domain = []
        for d in dsk(e, "DOMAIN"):
            domain.append(d)
        definition = []
        for d in dsk(e, "DEF"):
            if d["@xml:lang"] == "en":
                definition.append(d["#text"])
        ilr = []
        for d in dsk(e, "ILR"):
            if d["@type"] == "hypernym":
                ilr.append(d["#text"])
        usage = []
        for d in dsk(e, "USAGE"):
            if d["@xml:lang"] == "en":
                usage.append(d["#text"])
        return {
            "domain": domain,
            "def": definition,
            "ilr": ilr,
            "usage": usage,
        }

    def sense_glosses(self, lemma, upper_limit=GUL):
        """Per-entry glosses for lemma, enriched with hypernym info (cached).

        Returns [] when the entry count exceeds upper_limit.
        """
        # caching: key on both lemma and the limit that shaped the result
        db_key = {
            "lemma": lemma,
            "upper_limit": upper_limit
        }
        cache = list(self.vallex.db[SLOWNET_CACHE].find(db_key))
        if len(cache) > 0:
            return cache[0]["data"]

        entries = self.find(lemma)
        if upper_limit is not None and len(entries) > upper_limit:
            # log.info("sense_glosses({}): too many senses".format(lemma))
            return []
        ret_glosses = []
        for e in entries:
            defs = []
            glosses = []
            self.hypernym_buffer = []
            ei = self.extract_element_info(e)
            self.hypernym_buffer.append({
                "def": ei["domain"] + ei["def"],
                "gloss": ei["domain"] + ei["def"] + ei["usage"]
            })
            for ilr in ei["ilr"]:
                self.hypernyms(ilr, 1)

            # flatten the buffer (was a comprehension used for side effects)
            for item in self.hypernym_buffer:
                defs.extend(item["def"])
                glosses.extend(item["gloss"])
            ret_glosses.append({
                "def": defs,
                "gloss": glosses,
            })

        # caching
        db_entry = {
            "lemma": db_key["lemma"],
            "upper_limit": db_key["upper_limit"],
            "data": ret_glosses
        }
        # FIX: results were written to db.slownet_sense_glosses but read
        # back from db[SLOWNET_CACHE] above, so the cache never hit;
        # write to the same collection we query.
        self.vallex.db[SLOWNET_CACHE].update(
            db_key, db_entry, upsert=True
        )
        return ret_glosses
 | |
| 
 | |
| 
 | |
class Sskj2(DictionaryInterface):
    """Lookup in the HTML-derived sskj collection (izt_clean / pos fields)."""

    def __init__(self, vallex):
        super().__init__(vallex, "sskj")

    def find(self, lemma):
        # A trailing "_" marks an adjective lemma; everything else is a verb.
        # FIX: endswith() also handles the empty string, where the original
        # lemma[-1] raised IndexError.
        pos = "pridevnik" if lemma.endswith("_") else "glagol"
        return list(self.vallex.db.sskj.find({
            "izt_clean": lemma,
            "pos": pos
        }))

    def count_senses(self, lemma):
        """Number of senses for lemma: length of the first entry's <ol> list,
        1 for an entry without <ol>, 0 when the lemma is unknown."""
        entries = self.find(lemma)
        if len(entries) == 0:
            return 0
        ol = dsk(entries[0], "ol")
        if len(ol) == 0:
            return 1
        return len(ol[0]["li"])

    def sense_glosses(self, lemma, upper_limit=GUL):
        """Return [{"sskj_sense_id": ..., "gloss": [...], "def": [...]}, ...].

        sskj_sense_id has the form "entry_id-sense_id-n_senses" (entry_id
        prefixed in the final pass), except for the no-parsed-senses
        fallback, which returns early with the plain id "1-1".
        """

        def helper_dict_safe_add(dic, key, el):
            # Append el to dic[key], creating the list on first use.
            if key not in dic:
                dic[key] = []
            dic[key].append(el)

        def helper_pull_rec(el_lst, res_dct):
            # Recursively flatten titled spans / nested ol-li lists
            # into res_dct, keyed by each element's @title.
            for el in el_lst:
                if isinstance(el, dict):
                    if ("@title" in el) and ("#text" in el):
                        helper_dict_safe_add(
                            res_dct, el["@title"], el["#text"])
                    if "span" in el:
                        helper_pull_rec(dsk(el, "span"), res_dct)
                    if ("ol" in el) and ("li" in el["ol"]):
                        helper_pull_rec(el["ol"]["li"], res_dct)
                    if "li" in el:
                        helper_pull_rec(el["li"], res_dct)

        def helper_create_gloss(dct):
            # Gloss = explanation, example, side explanation and synonym.
            # (Hoisted out of the entries loop — was redefined per iteration.)
            keys = ["Razlaga", "Zgled", "Stranska razlaga", "Sopomenka"]
            ret = []
            for k in keys:
                ret.extend(dsk(dct, k))
            return ret

        entries = self.find(lemma)
        if len(entries) == 0:
            return []
        if len(entries) > 1:
            log.warning("{} entries for {} in sskj2.".format(
                len(entries), lemma))
        glosses_per_entry = []
        for entry in entries:  # FIX: dropped unused enumerate() index
            res_dict = {}
            if "span" in entry:
                helper_pull_rec(dsk(entry, "span"), res_dict)
            # senses
            res_dict["senses"] = []
            if ("ol" in entry) and ("li" in entry["ol"]):
                for el in dsk(entry["ol"], "li"):
                    tmp = {"sskj_sense_id": el["span"][0]}
                    helper_pull_rec(dsk(el, "span"), tmp)
                    helper_pull_rec(dsk(el, "ol"), tmp)
                    res_dict["senses"].append(DC(tmp))

            glosses = []
            n_senses = len(res_dict["senses"])
            if n_senses == 0:
                # Single implicit sense: glosses come from the entry itself.
                # NOTE(review): this returns immediately, skipping any later
                # entries and the entry_id prefixing below — kept as-is to
                # preserve the "1-1" ids existing consumers may rely on.
                glosses.append({
                    "sskj_sense_id": "1-1",
                    "gloss": helper_create_gloss(res_dict),
                    "def": dsk(res_dict, "Razlaga")
                })
                return glosses

            for sense in res_dict["senses"]:
                glosses.append({
                    "sskj_sense_id": "{}-{}".format(
                        sense["sskj_sense_id"], n_senses),
                    "gloss": helper_create_gloss(sense),
                    "def": dsk(sense, "Razlaga")
                })
            glosses_per_entry.append(glosses)

        # add entry_id before the_sense id
        # entry_id-sskj_sense_id-n_senses
        all_glosses = []
        for idx, glosses in enumerate(glosses_per_entry):
            entry_id = idx + 1  # start with 1
            for gloss in glosses:
                gloss["sskj_sense_id"] = "{}-{}".format(
                    entry_id, gloss["sskj_sense_id"])
                all_glosses.append(gloss)
        return all_glosses
 |