# -*- coding: utf-8 -*-
import logging
import uuid
from collections import defaultdict
from typing import Optional, Union, List
from zipfile import ZipFile
import pandas as pd
import rdflib
from rdflib import Graph
from rdflib.namespace import DC, RDF, DCTERMS, NamespaceManager
from rdflib.store import Store
from rdflib.term import IdentifiedNode, URIRef, Literal
from rdflib.plugins.stores import sparqlstore
from iribaker import to_iri
from .converters import nafConverter
from .nafdocument import NafDocument
from .nifobjects import (
NifContext,
NifContextCollection,
NifSentence,
)
from .utils import tokenize_text
from .const import ITSRDF, NIF, OLIA, DEFAULT_URI, DEFAULT_PREFIX
from .lemonobjects import Lexicon, LexicalEntry, Form
[docs]class NifGraph(Graph):
"""
An NIF Graph
The constructor accepts the same arguments as a `rdflib.Graph`.
:param file: name of the file to read
:param nafdocument: an xml file in NLP Annotation Format
:param collection: an NifContextCollection
"""
def __init__(
self,
file: str = None,
nafdocument: NafDocument = None,
collection: NifContextCollection = None,
URIScheme: str = None,
store: Union[Store, str] = "default",
identifier: Optional[Union[IdentifiedNode, str]] = None,
namespace_manager: Optional[NamespaceManager] = None,
base: Optional[str] = None,
bind_namespaces: str = "core",
):
"""
An NIF Graph
:param file: name of the file to read
:param nafdocument: an xml file in NLP Annotation Format
:param collection: an NifContextCollection
"""
super(NifGraph, self).__init__(
store=store,
identifier=identifier,
namespace_manager=namespace_manager,
base=base,
bind_namespaces=bind_namespaces,
)
self.URIScheme = URIScheme
self.bind("rdf", ITSRDF)
self.bind("rdfs", ITSRDF)
self.bind("itsrdf", ITSRDF)
self.bind("xsd", ITSRDF)
self.bind("dcterms", DCTERMS)
self.bind("dc", DC)
self.bind("nif", NIF)
self.bind("olia", OLIA)
self.open(file=file, nafdocument=nafdocument, collection=collection)
[docs] def open(
self,
file: str = None,
nafdocument: NafDocument = None,
collection: NifContextCollection = None,
):
"""
Read data from multiple sources into current `NifGraph` object.
:param file: name of the file to read
:param nafdocument: an xml file in NLP Annotation Format
:param collection: an NifContextCollection
:return: None
"""
if file is not None:
self.__parse_file(file=file)
elif nafdocument is not None:
self.__parse_nafdocument(nafdocument=nafdocument)
elif collection is not None:
self.__parse_collection(collection=collection)
return self
def __parse_nafdocument(self, nafdocument: NafDocument = None):
"""
Read data from an xml file in NLP Annotation Format
:param nafdocument: an xml file in NLP Annotation Format
:return: None
"""
logging.info(".. Parsing NafDocument to NifGraph")
doc_uri = nafdocument.header["public"]["{http://purl.org/dc/elements/1.1/}uri"]
doc_uuid = "nif-" + str(uuid.uuid3(uuid.NAMESPACE_DNS, doc_uri).hex)
base_uri = DEFAULT_URI
base_prefix = DEFAULT_PREFIX
collection = nafConverter(
collection_name="collection",
context_name=doc_uuid,
nafdocument=nafdocument,
base_uri=base_uri,
base_prefix=base_prefix,
URIScheme=self.URIScheme,
)
self.__parse_collection(collection)
# self.parse_collection(collection)
def __parse_collection(self, collection: NifContextCollection = None):
"""
Read data from a NifContextCollection object.
:param collection: a NifContextCollection
:return: None
"""
g = Graph(identifier=self.identifier)
for r in collection.triples():
g.add(r)
self += g
def __parse_file(self, file: str = None):
"""
Read data from a file.
filename ending with "naf.xml": file is read and parsed as
an xml file in NLP Annotation Format.
filename ending with "zip": file is extracted and content
is parsed.
:param file: a filename.
:return: None
"""
if file is not None:
if file[-7:].lower() == "naf.xml":
logging.info(".. Parsing file " + file + "")
nafdocument = NafDocument().open(file)
self.__parse_nafdocument(nafdocument=nafdocument)
else:
if file[-3:].lower() == "zip":
# if zip file then parse all files in zip
with ZipFile(file, mode="r") as zipfile:
logging.info(".. Reading zip file " + file)
for filename in zipfile.namelist():
with zipfile.open(filename) as f:
logging.info(
".. Parsing file " + filename + " from zip file"
)
if filename[-4:].lower() == "hext":
self += Graph().parse(
data=f.read().decode(), format="hext"
)
elif filename[-3:].lower() == "ttl":
self += Graph().parse(
data=f.read().decode(), format="turtle"
)
else:
self += Graph().parse(data=f.read().decode())
elif file[-4:].lower() == "hext":
# if file ends with .hext then parse as hext file
with open(file, encoding="utf-8") as f:
logging.info(".. Parsing file " + file + "")
self += Graph().parse(data=f.read(), format="hext")
else:
# otherwise let rdflib determine format
with open(file, encoding="utf-8") as f:
logging.info(".. Parsing file " + file + "")
self += Graph().parse(data=f.read())
@property
def contexts(self, uri: str = DEFAULT_URI) -> list:
"""
This property constructs and returns a nif:Context from the NifGraph.
return list of nif:Context in the graph
"""
uris = list(self.subjects(RDF.type, NIF.Context))
return [NifContext(uri=uri, graph=self) for uri in uris]
@property
def collections(self, uri: str = DEFAULT_URI) -> list:
"""
This property constructs and returns a list of nif:ContextCollection from the NifGraph.
return list of `nif:ContextCollection` in the graph
"""
uris = list(self.subjects(RDF.type, NIF.ContextCollection))
return [NifContextCollection(uri=uri, graph=self) for uri in uris]
@property
def collection(self, uri: str = DEFAULT_URI) -> NifContextCollection:
"""
This property constructs and returns the first nif:ContextCollection from the NifGraph.
return the first nif:ContextCollection in the graph
"""
for uri in list(self.subjects(RDF.type, NIF.ContextCollection)):
return NifContextCollection(uri=uri, graph=self)
# dict_collections = self.query_rdf_type(NIF.ContextCollection)
# dict_context = self.query_rdf_type(NIF.Context)
# logging.info(".. extracting nif statements")
# logging.info(
# ".... found " + str(len(dict_collections.keys())) + " collections."
# )
# logging.info(".... found " + str(len(dict_context.keys())) + " contexts.")
# for collection_uri in dict_collections.keys():
# collection = NifContextCollection(uri=collection_uri)
# for predicate in dict_collections[collection_uri].keys():
# if predicate == NIF.hasContext:
# for context_uri in dict_collections[collection_uri][predicate]:
# if isinstance(
# self.store,
# rdflib.plugins.stores.sparqlstore.SPARQLUpdateStore,
# ):
# graph = self.context_graph(uri=context_uri)
# else:
# graph = self
# nif_context = NifContext(
# URIScheme=self.URIScheme,
# uri=context_uri,
# graph=graph,
# )
# collection.add_context(context=nif_context)
# return collection
# else:
# collection = NifContextCollection(uri=uri)
# for context_uri in dict_context.keys():
# if isinstance(
# self.store, rdflib.plugins.stores.sparqlstore.SPARQLUpdateStore
# ):
# graph = self.context_graph(uri=context_uri)
# else:
# graph = self
# nif_context = NifContext(
# URIScheme=self.URIScheme,
# uri=context_uri,
# graph=graph,
# )
# collection.add_context(context=nif_context)
# return collection
@property
def catalog(self):
""" """
# derive the conformsTo from the collection
if isinstance(self.store, sparqlstore.SPARQLUpdateStore):
q = (
"""
SELECT ?s ?p ?o
WHERE {
SERVICE <"""
+ self.store.query_endpoint
+ """>
{
?s rdf:type nif:ContextCollection .
?s ?p ?o .
}
}"""
)
else:
q = """
SELECT ?s ?p ?o
WHERE {
?s rdf:type nif:ContextCollection .
?s ?p ?o .
}"""
results = self.query(q)
collections = defaultdict(dict)
for s, p, o in results:
if p == NIF.hasContext:
if collections[s].get(p, None) is None:
collections[s][p] = [o]
else:
collections[s][p].append(o)
else:
collections[s][p] = o
# find all context in the graphs with corresponding data
if isinstance(self.store, sparqlstore.SPARQLUpdateStore):
q = (
"""
SELECT ?s ?p ?o
WHERE {
SERVICE <"""
+ self.store.query_endpoint
+ """>
{
?s rdf:type nif:Context . ?s ?p ?o .
}
}"""
)
else:
q = """
SELECT ?s ?p ?o
WHERE {
?s rdf:type nif:Context . ?s ?p ?o .
}"""
results = self.query(q)
# construct DataFrame from query results
d = defaultdict(dict)
index = list()
columns = set()
for result in results:
idx = result[0]
col = result[1].n3(self.namespace_manager)
if isinstance(result[2], Literal):
val = result[2].value
else:
val = result[2]
if "dc:" in col or "dcterms:" in col:
d[idx][col] = val
columns.add(col)
if idx not in index:
index.append(idx)
df = pd.DataFrame(
index=index,
columns=list(columns),
data=[[d[idx][col] for col in columns] for idx in index],
)
for idx in df.index:
for c in collections.keys():
if idx in collections[c][NIF.hasContext]:
df.loc[idx, DCTERMS.conformsTo] = collections[c][DCTERMS.conformsTo]
df.loc[idx, NIF.ContextCollection] = c
df = df.reindex(sorted(df.columns), axis=1)
return df
# def query_rdf_type(self, rdf_type: URIRef = None):
# if isinstance(self.store, sparqlstore.SPARQLUpdateStore):
# q = (
# """
# SELECT ?s ?p ?o
# WHERE {
# SERVICE <"""
# + self.store.query_endpoint
# + """>
# {
# ?s rdf:type """
# + rdf_type.n3(self.namespace_manager)
# + """ .
# ?s ?p ?o .
# }
# }"""
# )
# else:
# q = (
# """
# SELECT ?s ?p ?o
# WHERE {
# ?s rdf:type """
# + rdf_type.n3(self.namespace_manager)
# + """ .
# ?s ?p ?o .
# }"""
# )
# results = self.query(q)
# d = defaultdict(dict)
# for result in results:
# idx = result[0]
# col = result[1]
# val = result[2]
# if col == NIF.hasContext:
# if col in d[idx].keys():
# d[idx][col].append(val)
# else:
# d[idx][col] = [val]
# elif val in OLIA:
# if col in d[idx].keys():
# d[idx][col].append(val)
# else:
# d[idx][col] = [val]
# else:
# d[idx][col] = val
# return d
[docs] def context_graph(self, uri: URIRef = None):
""" """
if isinstance(self.store, rdflib.plugins.stores.sparqlstore.SPARQLUpdateStore):
q = (
"""
SELECT ?s ?p ?o
WHERE {
SERVICE <"""
+ graph.store.query_endpoint
+ """>
{
?s nif:referenceContext """
+ uri.n3(graph.namespace_manager)
+ """ .
?s ?p ?o .
}
}"""
)
else:
q = (
"""
SELECT ?s ?p ?o
WHERE {
?s nif:referenceContext """
+ uri.n3(graph.namespace_manager)
+ """ .
?s ?p ?o .
}"""
)
results = self.query(q)
graph = Graph(store="SimpleMemory")
for s, p, o in results:
# necessary if data is read from http protocol
if isinstance(o, Literal) and isinstance(o.value, str):
o = Literal(o.value.replace("\r\n", "\n"), datatype=XSD.string)
graph.add((s, p, o))
return graph
@property
def lexicon(self):
""" """
def noNumber(s: str = ""):
return not s.replace(".", "", 1).replace(",", "", 1).isdigit()
# query for all anchorOfs of all word with optional lemma
if isinstance(self.store, rdflib.plugins.stores.sparqlstore.SPARQLUpdateStore):
q = (
"""
SELECT ?anchor ?lemma ?pos ?lang
WHERE {
SERVICE <"""
+ graph.store.query_endpoint
+ """>
{
?w rdf:type nif:Word .
?w nif:anchorOf ?anchor .
?w nif:referenceContext ?context .
OPTIONAL {?w nif:lemma ?lemma . } .
OPTIONAL {?w nif:pos ?pos . } .
OPTIONAL {?context dc:language ?lang }
}
}"""
)
else:
q = """
SELECT ?anchor ?lemma ?pos ?lang
WHERE {
?w rdf:type nif:Word .
?w nif:anchorOf ?anchor .
?w nif:referenceContext ?context .
OPTIONAL {?w nif:lemma ?lemma . } .
OPTIONAL {?w nif:pos ?pos . } .
OPTIONAL {?context dc:language ?lang }
}
"""
# execute the query
results = self.query(q)
lexica = dict()
for anchorOf, lemma, pos, lang in results:
if lemma is not None and noNumber(lemma):
# default language is "en"
if lang is None:
lang = "en"
# construct lexicon if necessary
if lang not in lexica.keys():
lexica[lang] = Lexicon(uri=URIRef(DEFAULT_URI + "lexicon/" + lang))
lexica[lang].set_language(lang)
# derive lexical entry uri from the lemma
if not isinstance(lemma, URIRef):
entry_uri = to_iri(str(lexica[lang].uri) + "/" + lemma)
else:
entry_uri = lemma
# create the lexical entry
entry = LexicalEntry(uri=entry_uri, language=lexica[lang].language)
# set canonicalForm (this is the lemma)
entry.set_canonicalForm(
Form(
uri=URIRef(entry_uri),
formVariant="canonicalForm",
writtenReps=[lemma],
)
)
# set otherForm if the anchorOf is not the same as the lemma
if anchorOf.value != lemma.value:
entry.set_otherForms(
[
Form(
uri=URIRef(entry_uri),
formVariant="otherForm",
writtenReps=[anchorOf],
)
]
)
# set part of speech if it exists
if pos is not None:
entry.set_partOfSpeechs([pos])
lexica[lang].add_entry(entry)
return lexica
[docs] def get(self, uri: URIRef = None):
""" """
if uri is None:
return None
else:
r = list(self.triples([uri, RDF.type, None]))
if len(r) > 0:
rdf_type = r[0][2]
else:
logging.warning("uri not found: " + str(uri))
return None
if rdf_type == NIF.ContextCollection:
collection = NifContextCollection(uri=uri, graph=self)
return collection
elif rdf_type == NIF.Context:
return NifContext(uri=uri, graph=self)
else:
context_uri = uri.split("&nif=")[0] + "&nif=context"
context = NifContext(uri=context_uri, graph=self)
if rdf_type == NIF.Sentence:
return NifSentence(uri=uri, referenceContext=context, graph=self)
elif rdf_type == NIF.Page:
return NifPage(uri=uri, referenceContext=context, graph=self)
elif rdf_type == NIF.Paragraph:
return NifParagraph(uri=uri, referenceContext=context, graph=self)
elif rdf_type == NIF.Phrase:
return NifPhrase(uri=uri, referenceContext=context, graph=self)
elif rdf_type == NIF.Word:
return NifWord(uri=uri, referenceContext=context, graph=self)