Source code for twin4build.model.semantic_model.semantic_model

# Standard library imports
import io
import json
import logging
import os
import shutil
import subprocess
import sys
import warnings
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple, Type, Union
from urllib.error import HTTPError

# Third party imports
import brickschema.brickify.src.handlers.Handler.TableHandler as table_handler
import pandas as pd
import pydotplus
import rdflib.tools.csv2rdf
import typer
from bs4 import BeautifulSoup
from openpyxl import load_workbook
from rdflib import RDF, RDFS, Graph, Literal, Namespace, URIRef
from rdflib.tools.rdf2dot import rdf2dot

# Local application imports
import twin4build.core as core
from twin4build.utils.mkdir_in_root import mkdir_in_root
from twin4build.utils.uppath import uppath


[docs] def get_short_name(uri: Union[str, URIRef], namespaces: Dict[str, Namespace]): for namespace in namespaces.values(): if namespace in str(uri): return str(uri).split(namespace)[-1] return None
[docs] class SemanticProperty: """Represents an ontology property""" def __init__(self, uri: Union[str, URIRef], graph: Graph): # Convert string URI to URIRef if needed self.uri = URIRef(uri) if isinstance(uri, str) else uri self.graph = graph # Property types to check for property_types = { URIRef("http://www.w3.org/2002/07/owl#ObjectProperty"), URIRef("http://www.w3.org/1999/02/22-rdf-syntax-ns#Property"), URIRef("http://www.w3.org/2002/07/owl#DatatypeProperty"), URIRef("http://www.w3.org/2002/07/owl#AnnotationProperty"), URIRef("http://www.w3.org/2002/07/owl#FunctionalProperty"), } # Check if URI represents a valid property is_property = any( (self.uri, RDF.type, prop_type) in self.graph for prop_type in property_types ) is_used_as_predicate = any(self.graph.triples((None, self.uri, None))) if not (is_property or is_used_as_predicate): raise ValueError( f"URI '{self.uri}' is not a valid property in the ontology" ) self._domain = None self._range = None @property def domain(self) -> List[URIRef]: """Get the domain (valid subject types) of this property""" if self._domain is None: self._domain = set(self.graph.objects(self.uri, RDFS.domain)) return self._domain @property def range(self) -> List[URIRef]: """Get the range (valid object types) of this property""" if self._range is None: self._range = set(self.graph.objects(self.uri, RDFS.range)) return self._range def __str__(self): return str(self.uri)
[docs] def get_short_name(self): for namespace in self.model.namespaces.values(): if namespace in str(self.uri): return str(self.uri).split(namespace)[-1] return None
[docs] def isproperty( self, cls: Union[ str, "SemanticProperty", Tuple[Union[str, "SemanticProperty"], ...], List[Union[str, "SemanticProperty"]], ], ) -> bool: """Check if this instance is of any of the given property types (including inheritance) Args: property: Single property or tuple/list of properties to check against Returns: True if instance matches any of the specified properties """ if not isinstance(cls, (tuple, list)): cls = (cls,) # Check each class in the tuple against all instance types for c in cls: if str(c) == str(self.uri): return True return False
[docs] class SemanticType: """Represents an ontology class with inheritance""" def __init__(self, uri: Union[str, URIRef], model: "SemanticModel", validate=False): # Convert string URI to URIRef if needed self.uri = URIRef(uri) if isinstance(uri, str) else uri self.model = model self.graph = model.graph if validate: # Built-in RDF/RDFS classes that are always valid BUILT_IN_CLASSES = { URIRef("http://www.w3.org/2000/01/rdf-schema#Resource"), URIRef("http://www.w3.org/2000/01/rdf-schema#Class"), URIRef("http://www.w3.org/2002/07/owl#Class"), } # Debug: Print all type triples for this URI # print(f"\nDebug - Checking type declarations for {self.uri}") # print("All triples where this URI is subject:") # for s, p, o in self.graph.triples((self.uri, None, None)): # print(f" {p} -> {o}") # print("All triples where this URI is object:") # for s, p, o in self.graph.triples((None, None, self.uri)): # print(f" {s} -> {p}") # Check if URI represents a valid type is_owl_class = ( self.uri, RDF.type, URIRef("http://www.w3.org/2002/07/owl#Class"), ) in self.graph is_rdfs_class = (self.uri, RDF.type, RDFS.Class) in self.graph is_built_in = self.uri in BUILT_IN_CLASSES # Additional checks for class-like behavior has_subclass = any(self.graph.triples((None, RDFS.subClassOf, self.uri))) is_subclass = any(self.graph.triples((self.uri, RDFS.subClassOf, None))) has_instances = any(self.graph.triples((None, RDF.type, self.uri))) # Check if it's a property (which should not be treated as a class) property_types = { URIRef("http://www.w3.org/2002/07/owl#ObjectProperty"), URIRef("http://www.w3.org/1999/02/22-rdf-syntax-ns#Property"), URIRef("http://www.w3.org/2002/07/owl#DatatypeProperty"), URIRef("http://www.w3.org/2002/07/owl#AnnotationProperty"), URIRef("http://www.w3.org/2002/07/owl#FunctionalProperty"), } is_property = any( (self.uri, RDF.type, prop_type) in self.graph for prop_type in property_types ) is_used_as_predicate = any(self.graph.triples((None, self.uri, None))) # Check if it's used in domain/range declarations (suggesting it's a class) is_in_domain = any(self.graph.triples((None, RDFS.domain, self.uri))) is_in_range = any(self.graph.triples((None, RDFS.range, self.uri))) # print(f"Debug - Class checks for {self.uri}:") # print(f" is_owl_class: {is_owl_class}") # print(f" is_rdfs_class: {is_rdfs_class}") # print(f" is_built_in: {is_built_in}") # print(f" has_subclass: {has_subclass}") # print(f" is_subclass: {is_subclass}") # print(f" has_instances: {has_instances}") # print(f" is_property: {is_property}") # print(f" is_used_as_predicate: {is_used_as_predicate}") # print(f" is_in_domain: {is_in_domain}") # print(f" is_in_range: {is_in_range}") if is_property or is_used_as_predicate: raise ValueError(f"URI '{self.uri}' is a property, not a class") # Consider it a valid class if any of these conditions are true if not ( is_owl_class or is_rdfs_class or is_built_in or has_subclass or is_subclass or has_instances or is_in_domain or is_in_range ): raise ValueError( f"URI '{self.uri}' is not declared as a valid class/type in the ontology" ) self._parent_classes = None self._attributes = None @property def parent_classes(self) -> Set[str]: """Get all parent classes (including indirect) using RDFS reasoning""" if self._parent_classes is None: self._parent_classes = set() for parent in self.graph.transitive_objects(self.uri, RDFS.subClassOf): self._parent_classes.add(str(parent)) return self._parent_classes
[docs] def get_type_attributes(self) -> Dict[str, List[Any]]: """Find all possible attributes (properties) that can be used with instances of this class. This method looks for all ObjectProperties defined in the ontology that could be used with instances of this class (self.uri). These are properties that either: 1. Have no domain restrictions (can be used with any class) 2. Have this class or any of its parent classes in their domain Returns: Dictionary mapping property names to lists of their allowed range values """ if self._attributes is None: self._attributes = {} # Find all ObjectProperties in the ontology for prop, _, _ in self.graph.triples( (None, RDF.type, URIRef("http://www.w3.org/2002/07/owl#ObjectProperty")) ): # We are looking explicitly for ObjectProperties. This could maybe be generalized? # pred_name = str(prop).split('#')[-1] # Get the domains (if any) for this property domains = list(self.graph.objects(prop, RDFS.domain)) # Property is valid if it has no domain restrictions or if this class/parents are in its domain if not domains or any( str(domain) in self.parent_classes or domain == self.uri for domain in domains ): # Get the ranges for this property ranges = list(self.graph.objects(prop, RDFS.range)) self._attributes[prop] = ranges return self._attributes
def __str__(self): return str(self.uri)
[docs] def get_short_name(self): for namespace in self.model.namespaces.values(): if namespace in str(self.uri): return str(self.uri).split(namespace)[-1] return None
[docs] def istype( self, cls: Union[ str, "SemanticType", Tuple[Union[str, "SemanticType"], ...], List[Union[str, "SemanticType"]], ], ) -> bool: """Check if this instance is of any of the given class types (including inheritance) Args: cls: Single class or tuple/list of classes to check against Returns: True if instance matches any of the specified classes """ # Convert single class to tuple for consistent handling if not isinstance(cls, (tuple, list)): cls = (cls,) # Check each class in the tuple against all instance types for c in cls: if str(c) == str(self.uri): return True elif str(c) in self.parent_classes: return True return False
[docs] def has_subclasses(self) -> bool: """Check if this type has any subclasses""" return any(self.graph.triples((None, RDFS.subClassOf, self.uri)))
[docs] class SemanticObject: """Class to represent an ontology instance or literal""" def __init__( self, value: Union[str, URIRef, Literal], model: "SemanticModel", datatype: Optional[URIRef] = None, lang: Optional[str] = None, ): """ Initialize a semantic object representing either a URI resource or a literal Args: value: The URI or literal value model: The semantic model this object belongs to datatype: Optional datatype URI for literals. If provided (not None), the value is treated as a literal. lang: Optional language tag for literals. If provided (not None), the value is treated as a literal. """ self.model = model # Handle different input types if isinstance(value, Literal): # Special handling for rdf:JSON literals since rdflib doesn't recognize this datatype if value.datatype and value.datatype == core.namespace.RDF.JSON: datatype = value.datatype # Extract JSON from the lexical form since rdflib couldn't parse it lexical_form = str(value) # Remove the datatype suffix if present if lexical_form.endswith("^^rdf:JSON"): json_str = lexical_form[:-9] # Remove '^^rdf:JSON' else: json_str = lexical_form # Create a new literal with the parsed JSON value value = json.loads(json_str) # Handle the case where rdflib serializes None as 'None' string elif str(value) == "None": value = None datatype = value.datatype if hasattr(value, "datatype") else None # Already a Literal object self.uri = Literal(value, datatype=datatype) self.is_literal = True elif datatype is not None or lang is not None: # String that should be treated as a literal if datatype and datatype == core.namespace.RDF.JSON: # Extract JSON from the lexical form since rdflib couldn't parse it lexical_form = str(value) # Remove the datatype suffix if present if lexical_form.endswith("^^rdf:JSON"): json_str = lexical_form[:-9] # Remove '^^rdf:JSON' else: json_str = lexical_form # Create a new literal with the parsed JSON value value = json.loads(json_str) # Handle the case where rdflib serializes None as 'None' string elif str(value) == "None": value = None self.uri = Literal(value, datatype=datatype, lang=lang) self.is_literal = True else: # URI reference self.uri = URIRef(value) if isinstance(value, str) else value self.is_literal = False self._types = None self._attributes = None @property def type(self) -> List[SemanticType]: """Get all types of this instance""" if self.is_literal: # For literals, return the datatype if self._types is None: if self.uri.datatype: self._types = {self.model.get_type(self.uri.datatype)} else: # Plain literals without datatype self._types = {self.model.get_type(RDFS.Literal)} return self._types if self._types is None: # First check direct RDF.type assertions direct_types = set(self.model.graph.objects(self.uri, RDF.type)) # Then check for type through owl:sameAs relations same_as = set( self.model.graph.objects( self.uri, URIRef("http://www.w3.org/2002/07/owl#sameAs") ) ) for same_as_uri in same_as: same_as_types = set(self.model.graph.objects(same_as_uri, RDF.type)) direct_types.update(same_as_types) # Convert all direct types to SemanticType objects and include their parent types types_with_parents = set() for t in direct_types: # Add the direct type type_obj = self.model.get_type(t) types_with_parents.add(type_obj) # Add parent types for parent_uri in type_obj.parent_classes: types_with_parents.add(self.model.get_type(parent_uri)) self._types = types_with_parents return self._types
[docs] def get_predicate_object_pairs(self) -> Dict[str, Any]: """Return all attributes of this instance""" if self.is_literal: # Literals don't have properties return {} if self._attributes is None: self._attributes = {} for pred, obj in self.model.graph.predicate_objects(self.uri): is_class = any( self.model.graph.triples( (obj, RDF.type, URIRef("http://www.w3.org/2002/07/owl#Class")) ) ) or any(self.model.graph.triples((obj, RDF.type, RDFS.Class))) if is_class: obj_instance = self.model.get_type(obj) else: obj_instance = self.model.get_instance(obj) # if pred != RDF.type: #It is not necessary to include the type triples when used with signature pattern as the type explicitly defined in the signature pattern # Handle both URI and literal objects if pred in self._attributes: self._attributes[pred].append(obj_instance) else: self._attributes[pred] = [obj_instance] return self._attributes
[docs] def isinstance( self, cls: Union[ str, SemanticType, Tuple[Union[str, SemanticType], ...], List[Union[str, SemanticType]], ], ) -> bool: """Check if this instance is of any of the given class types (including inheritance) Args: cls: Single class or tuple/list of classes to check against Returns: True if instance matches any of the specified classes """ # Convert single class to tuple for consistent handling if not isinstance(cls, (tuple, list)): cls = (cls,) # Handle literals differently if self.is_literal: # Check each class in the tuple against the literal's datatype for c in cls: c_str = str(c) # Direct datatype match if self.uri.datatype and c_str == str(self.uri.datatype): return True # Check for XSD.string match with language-tagged literals if self.uri.language and c_str == str(self.model.XSD.string): return True # Plain literals (no datatype) match with XSD.string if ( not self.uri.datatype and not self.uri.language and c_str == str(self.model.XSD.string) ): return True return False if self.type: # Check each class in the tuple against all instance types for c in cls: for instance_type in self.type: if str(c) == str(instance_type.uri): return True elif str(c) in instance_type.parent_classes: return True return False
def __str__(self): return str(self.uri)
[docs] def get_short_name(self): for namespace in self.model.namespaces.values(): if namespace in str(self.uri): return str(self.uri).split(namespace)[-1] return None
[docs] def get_namespace(self): for namespace in self.model.namespaces.values(): for instance_type in self.type: if namespace in str(instance_type.uri): return namespace return None
[docs] class SemanticModel: def __init__( self, rdf_file: Optional[str] = None, namespaces: Optional[Dict[str, str]] = None, format: Optional[str] = None, parse_namespaces=False, verbose=False, id: str = "semantic_model", dir_conf: List[str] = None, ): """ Initialize the ontology model Args: rdf_file: Path or URL to the ontology file namespaces: Optional additional namespace prefix-URI pairs format: Optional format specification ('xml', 'turtle', 'n3', 'nt', 'json-ld', etc.) """ self.id = id self.rdf_file = rdf_file if namespaces is None: self.namespaces = {} else: assert isinstance( namespaces, dict ), 'The "namespaces" argument must be a dictionary.' namespaces_ = {} for prefix, uri in namespaces.items(): namespaces_[prefix.upper()] = Namespace(uri) setattr(self, prefix.upper(), namespaces_[prefix.upper()]) namespaces = namespaces_ self.namespaces = namespaces self.ontologies = {} self.format = format self.parsed_namespaces = set() self.error_namespaces = set() # Cache for instances self._instances = {} self._types = {} self._properties = {} if dir_conf is None: self.dir_conf = ["generated_files", "models", self.id] else: self.dir_conf = dir_conf if rdf_file is not None: if verbose: self._init(namespaces=namespaces, parse_namespaces=parse_namespaces) else: logging.disable( sys.maxsize ) # https://stackoverflow.com/questions/2266646/how-to-disable-logging-on-the-standard-error-stream with warnings.catch_warnings(): warnings.simplefilter("ignore") self._init(namespaces=namespaces, parse_namespaces=parse_namespaces) else: self.graph = Graph() for prefix, namespace in self.namespaces.items(): self.graph.bind(prefix.lower(), namespace) # logging.disable(logging.NOTSET) def _init( self, namespaces: Optional[Dict[str, Namespace]] = None, parse_namespaces=False ): # Load and parse the RDF file self.graph = self.get_graph(self.rdf_file, self.format) # Common namespaces self.RDF = RDF self.RDFS = RDFS if parse_namespaces: self.parse_namespaces(self.graph, namespaces=namespaces) # Extract namespaces from the graph for prefix, uri in self.graph.namespaces(): if prefix: # Skip empty prefix self.namespaces[prefix.upper()] = Namespace(uri) setattr(self, prefix.upper(), self.namespaces[prefix.upper()]) for prefix, namespace in self.namespaces.items(): self.graph.bind(prefix.lower(), namespace)
[docs] def get_graph_copy(self): """Create a complete copy of the graph including namespace bindings. Returns: Graph: A new graph instance with all triples and namespace bindings copied """ new_graph = Graph() # Copy all namespace bindings for prefix, namespace in self.graph.namespaces(): new_graph.bind(prefix, namespace) # Copy all triples for s, p, o in self.graph.triples((None, None, None)): new_graph.add((s, p, o)) return new_graph
[docs] def parse_namespaces(self, graph, namespaces=None): if namespaces is None: namespaces = dict(graph.namespaces()) for prefix, uri in namespaces.items(): try: if graph == self.graph: if ( uri not in self.parsed_namespaces and uri not in self.error_namespaces ): graph.parse(uri) self.parsed_namespaces.add(uri) self.namespaces[prefix.upper()] = Namespace(uri) self.ontologies[prefix.upper()] = str(uri) setattr(self, prefix.upper(), self.namespaces[prefix.upper()]) else: graph.parse(uri) except HTTPError: warnings.warn( f"HTTPError: Cannot parse namespace {uri}. Sometimes this error occurs when the ontology is not available at the same address as the namespace." ) self._handle_namespace_fallback(graph, prefix, uri) except Exception as e: warnings.warn(f"Cannot parse namespace {uri}: {str(e)}") self._handle_namespace_fallback(graph, prefix, uri)
def _handle_namespace_fallback(self, graph, prefix, uri): """Handle fallback for namespace parsing failures""" if graph == self.graph: if uri not in self.parsed_namespaces and uri not in self.error_namespaces: # Check if we have a fallback namespace in core.ontologies if hasattr(core.namespace, prefix.upper()) and hasattr( core.ontology, prefix.upper() ): if prefix.upper() in core.ontologies.namespaces: fallback_namespace_uri = getattr(core.namespace, prefix.upper()) fallback_ontology_uri = getattr(core.ontology, prefix.upper()) warnings.warn( f"Found namespace in core.ontology fallback: {fallback_ontology_uri}" ) try: graph.parse(fallback_ontology_uri) self.parsed_namespaces.add(str(fallback_namespace_uri)) self.namespaces[prefix.upper()] = fallback_namespace_uri self.ontologies[prefix.upper()] = fallback_ontology_uri setattr( self, prefix.upper(), self.namespaces[prefix.upper()] ) return except Exception as e: warnings.warn( f"Fallback namespace {fallback_ontology_uri} also failed: {str(e)}" ) else: warnings.warn(f"No fallback namespace found for {prefix.upper()}") # If no fallback or fallback failed, add to error namespaces self.error_namespaces.add(uri) else: # For non-main graph, try fallback but don't track if hasattr(core.namespace, prefix.upper()) and hasattr( core.ontology, prefix.upper() ): if prefix.upper() in core.ontologies.namespaces: fallback_namespace_uri = getattr(core.namespace, prefix.upper()) fallback_ontology_uri = getattr(core.ontology, prefix.upper()) warnings.warn( f"Found namespace in core.ontologies fallback: {fallback_namespace_uri}" ) try: graph.parse(fallback_ontology_uri) return except Exception as e: warnings.warn( f"Fallback namespace {fallback_ontology_uri} also failed: {str(e)}" ) else: warnings.warn(f"No fallback namespace found for {prefix.upper()}")
[docs] def get_dir( self, folder_list: List[str] = None, filename: Optional[str] = None ) -> Tuple[str, bool]: """ Get the directory path for storing model-related files. Args: folder_list (List[str]): List of folder names to create. filename (Optional[str]): Name of the file to create. Returns: Tuple[str, bool]: The full path to the directory or file, and a boolean indicating if the file exists. """ if folder_list is None: folder_list = [] folder_list_ = self.dir_conf.copy() folder_list_.extend(folder_list) filename, isfile = mkdir_in_root(folder_list=folder_list_, filename=filename) return filename, isfile
[docs] def get_graph( self, filename: str, format: Optional[str] = None, mappings_dir: Optional[str] = None, ) -> Graph: ext = os.path.splitext(filename)[1][1:].lower() if ext == "xlsx" or ext == "xlsm": if os.path.isfile(filename): # Get extension # ext = os.path.splitext(filename)[1][1:].lower() graph = self.parse_spreadsheet(filename, mappings_dir=mappings_dir) else: raise FileNotFoundError(f"File {filename} not found.") else: graph = Graph() if format is None: graph.parse(filename) else: graph.parse(filename, format=format) return graph
[docs] def filter_graph(self, query: str) -> Graph: """Filter the graph based on class and predicate filters. The filtering is done using OR(class_filter) and OR(predicate_filter). Args: class_filter: List of class URIs to include (None = no class filtering) predicate_filter: List of predicates to include (None = no predicate filtering) Returns: Filtered graph """ new_graph = self.get_graph_copy() # self.get_graph(self.rdf_file, self.format) keep_triples = set() if query is not None: if query.strip().upper().startswith("CONSTRUCT"): result = self.graph.query(query) # Create new graph with only the constructed triples for row in result: keep_triples.add(row) else: raise ValueError("Query must start with CONSTRUCT") for s, p, o in new_graph.triples((None, None, None)): if (s, p, o) not in keep_triples: new_graph.remove((s, p, o)) return new_graph
[docs] def get_instance( self, value: Union[str, URIRef, Literal], datatype: Optional[URIRef] = None, lang: Optional[str] = None, ) -> SemanticObject: """ Get a specific instance by URI or create a literal Args: value: The URI or literal value is_literal: Flag to indicate if the value should be treated as a literal datatype: Optional datatype URI for literals lang: Optional language tag for literals Returns: SemanticObject representing the URI or Literal """ # Handle literals if isinstance(value, Literal) or datatype is not None or lang is not None: # Create a new literal object (we don't cache literals) if isinstance(value, Literal): return SemanticObject(value, self) else: return SemanticObject(value, self, datatype=datatype, lang=lang) # Handle URIs uri = URIRef(value) if isinstance(value, str) else value if uri not in self._instances: self._instances[uri] = SemanticObject(uri, self) return self._instances[uri]
[docs] def get_type(self, uri: str) -> SemanticType: """Get a specific type by URI""" uri = URIRef(uri) if isinstance(uri, str) else uri if uri not in self._types: self._types[uri] = SemanticType(uri, self) return self._types[uri]
[docs] def get_property(self, uri: str) -> SemanticProperty: """Get a specific property by URI""" uri = URIRef(uri) if isinstance(uri, str) else uri if uri not in self._properties: self._properties[uri] = SemanticProperty(uri, self.graph) return self._properties[uri]
[docs] def get_instances_of_type( self, class_uris: Union[str, URIRef, SemanticType, Tuple, List] ) -> List[SemanticObject]: """ Get all instances that match any of the specified types (including subtypes) Args: class_uris: Single URI or tuple/list of URIs representing the types to match Returns: List of SemanticObject instances that match any of the specified types """ # Convert single class_uri to tuple for consistent handling if not isinstance(class_uris, tuple): if isinstance(class_uris, (list, set)): class_uris = tuple(class_uris) else: class_uris = (class_uris,) # print("GET INSTANCES OF TYPE") # Check if any of the requested types are XSD datatypes # If so, we need to find literals with those datatypes xsd_datatypes = [] uri_types = [] for class_uri in class_uris: if isinstance(class_uri, str): uri = URIRef(class_uri) elif isinstance(class_uri, SemanticType): uri = class_uri.uri elif isinstance(class_uri, URIRef): uri = class_uri else: raise ValueError(f"Invalid class URI: {class_uri}") # Check if this is an XSD datatype if str(uri).startswith("http://www.w3.org/2001/XMLSchema#"): xsd_datatypes.append(uri) else: uri_types.append(uri) instances = [] processed_instances = set() # To avoid duplicates # First, handle regular URI instances if uri_types: # Process each type in the tuple for uri in uri_types: # Get the class and all its subclasses subclasses = set([uri]) for subclass in self.graph.transitive_subjects(RDFS.subClassOf, uri): subclasses.add(subclass) # Get instances of the class and its subclasses for subclass in subclasses: # print(f"SUBCLASS: {subclass}") # First check direct type assertions for instance in self.graph.subjects(RDF.type, subclass): if instance not in processed_instances: inst_obj = self.get_instance(instance) instances.append(inst_obj) processed_instances.add(instance) # Then check for indirect type assertions through owl:sameAs for instance in self.graph.subjects(RDF.type, subclass): for same_as in self.graph.objects( instance, URIRef("http://www.w3.org/2002/07/owl#sameAs") ): if same_as not in processed_instances: inst_obj = self.get_instance(same_as) instances.append(inst_obj) processed_instances.add(same_as) # Then, handle literals with the specified datatypes if xsd_datatypes: # Find all literals in the graph with the specified datatypes for s, p, o in self.graph.triples((None, None, None)): if isinstance(o, Literal) and o.datatype in xsd_datatypes: # Create a SemanticObject for this literal literal_obj = self.get_instance(o) instances.append(literal_obj) return instances
[docs] def count_instances(self) -> int: return len(list(self.graph.subjects(RDF.type, None)))
[docs] def count_triples(self, s=None, p=None, o=None) -> int: return len(list(self.graph.triples((s, p, o))))
[docs] def visualize(self, query=None, include_full_uri=True): """ Visualize RDF graph with optional class and predicate filtering. The filter acts as an OR filter. Args: query: SPARQL CONSTRUCT query to filter the graph include_full_uri: If True, include the last row with the full instance URI. If False, remove it. """ # Omit rdf:type triples by default if query is None: query = """ CONSTRUCT { ?s ?p ?o } WHERE { ?s ?p ?o . FILTER (?p != rdf:type && ?p != rdfs:subClassOf) } """ light_black = "#3B3838" dark_blue = "#44546A" orange = "#DC8665" # "#C55A11" red = "#873939" grey = "#666666" light_blue = "#8497B0" green = "#83AF9B" # "#BF9000" green = "#83AF9B" white = "#FFFFFF" # Here we set the attributes for the tables. # The lists are indexed by the row number. # The first element is the attribute for the first row, i.e. the class # The second element is the attribute for the second row, i.e. the instance name # The third element is the attribute for the third row, i.e. the full uri # The fourth element is the attribute all the literals fill_color_map = { "default": [light_black, light_black, None, None], core.namespace.S4BLDG.BuildingSpace: [light_black, light_black, None, None], core.namespace.S4BLDG.Controller: [orange, orange, None, None], core.namespace.S4BLDG.AirToAirHeatRecovery: [ dark_blue, dark_blue, None, None, ], core.namespace.S4BLDG.Coil: [red, red, None, None], core.namespace.S4BLDG.Damper: [dark_blue, dark_blue, None, None], core.namespace.S4BLDG.Valve: [red, red, None, None], core.namespace.S4BLDG.Fan: [dark_blue, dark_blue, None, None], core.namespace.S4BLDG.SpaceHeater: [red, red, None, None], core.namespace.SAREF.Sensor: [green, green, None, None], core.namespace.SAREF.Meter: [green, green, None, None], core.namespace.S4BLDG.Schedule: [grey, grey, None, None], core.namespace.S4BLDG.Pump: [red, red, None, None], core.namespace.S4SYST.System: [green, green, None, None], core.namespace.S4SYST.Connection: [light_blue, light_blue, None, None], core.namespace.S4SYST.ConnectionPoint: [light_blue, light_blue, None, None], } font_color_map = { "default": [white, white, None, None], core.namespace.S4BLDG.BuildingSpace: [white, white, None, None], core.namespace.S4BLDG.Controller: [white, white, None, None], core.namespace.S4BLDG.AirToAirHeatRecovery: [white, white, None, None], core.namespace.S4BLDG.Coil: [white, white, None, None], core.namespace.S4BLDG.Damper: [white, white, None, None], core.namespace.S4BLDG.Valve: [white, white, None, None], core.namespace.S4BLDG.Fan: [white, white, None, None], core.namespace.S4BLDG.SpaceHeater: [white, white, None, None], core.namespace.SAREF.Sensor: [white, white, None, None], core.namespace.SAREF.Meter: [white, white, None, None], core.namespace.S4BLDG.Schedule: [white, white, None, None], core.namespace.S4BLDG.Pump: [white, white, None, None], core.namespace.S4SYST.System: [white, white, None, None], core.namespace.S4SYST.Connection: [white, white, None, None], core.namespace.S4SYST.ConnectionPoint: [white, white, None, None], } font_size_map = { "default": [10, 8, 8, 6], core.namespace.S4BLDG.BuildingSpace: [10, 8, 8, 6], core.namespace.S4BLDG.Controller: [10, 8, 8, 6], core.namespace.S4BLDG.AirToAirHeatRecovery: [10, 8, 8, 6], core.namespace.S4BLDG.Coil: [10, 8, 8, 6], core.namespace.S4BLDG.Damper: [10, 8, 8, 6], core.namespace.S4BLDG.Valve: [10, 8, 8, 6], core.namespace.S4BLDG.Fan: [10, 8, 8, 6], core.namespace.S4BLDG.SpaceHeater: [10, 8, 8, 6], core.namespace.SAREF.Sensor: [10, 8, 8, 6], core.namespace.SAREF.Meter: [10, 8, 8, 6], core.namespace.S4BLDG.Schedule: [10, 8, 8, 6], core.namespace.S4BLDG.Pump: [10, 8, 8, 6], core.namespace.S4SYST.System: [10, 8, 8, 6], core.namespace.S4SYST.Connection: [7, 6, 6, 5], core.namespace.S4SYST.ConnectionPoint: [7, 6, 6, 5], } font_bold_map = { "default": [True, True, None, None], core.namespace.S4BLDG.BuildingSpace: [True, True, None, None], core.namespace.S4BLDG.Controller: [True, True, None, None], core.namespace.S4BLDG.AirToAirHeatRecovery: [True, True, None, None], core.namespace.S4BLDG.Coil: [True, True, None, None], core.namespace.S4BLDG.Damper: [True, True, None, None], core.namespace.S4BLDG.Valve: [True, True, None, None], core.namespace.S4BLDG.Fan: [True, True, None, None], core.namespace.S4BLDG.SpaceHeater: [True, True, None, None], core.namespace.SAREF.Sensor: [True, True, None, None], core.namespace.SAREF.Meter: [True, True, None, None], core.namespace.S4BLDG.Schedule: [True, True, None, None], core.namespace.S4BLDG.Pump: [True, True, None, None], core.namespace.S4SYST.System: [True, True, None, None], core.namespace.S4SYST.Connection: [False, False, None, None], core.namespace.S4SYST.ConnectionPoint: [False, False, None, None], } # Filter graph graph = self.filter_graph(query) # graph = self.graph # TODO: Remove this stream = io.StringIO() rdf2dot(graph, stream) dg = pydotplus.graph_from_dot_data(stream.getvalue()) # Add class type to node labels for node in dg.get_nodes(): if node.obj_dict["name"] == "node": # del node.obj_dict["attributes"]["fontname"] node.obj_dict["attributes"]["fontname"] = "Courier-Bold" if "label" in node.obj_dict["attributes"]: html_str = node.obj_dict["attributes"]["label"] soup = BeautifulSoup(html_str, "html.parser") soup.table.attrs.update({"border": "2", "width": "100%"}) row = soup.find_all("tr")[1] col = row.find_all("td")[0] uri = col.string inst = self.get_instance(uri) type_ = inst.type z_ = {e for e in type_ if e.has_subclasses() == False} z = {e.uri.n3(self.graph.namespace_manager) for e in z_} if len(z) == 0: z = {"Unknown class"} z = " | ".join(z) # data b = soup.new_tag("b", attrs={}) b.string = z new_col = soup.new_tag("td", attrs={"bgcolor": "grey", "colspan": "2"}) new_col.append(b) new_row = soup.new_tag("tr", attrs={"border": "1px solid black"}) new_row.append(new_col) first_row = soup.find_all("tr")[0] first_row.insert_before(new_row) for i, row in enumerate( soup.find_all("tr") ): # [:-1]: #All except the last row, which is the full inst URI (small blue text) for col in row.find_all("td"): attrs = {} bold = False i_ = i if i < 4 else 3 for t in type_: if t.uri in fill_color_map: # print(col.attrs) if t.uri in fill_color_map: if fill_color_map[t.uri][i_] is not None: col.attrs["bgcolor"] = fill_color_map[t.uri][i_] elif fill_color_map["default"][i_] is not None: col.attrs["bgcolor"] = fill_color_map["default"][i_] if t.uri in font_color_map: if font_color_map[t.uri][i_] is not None: attrs["color"] = font_color_map[t.uri][i_] elif font_color_map["default"][i_] is not None: attrs["color"] = font_color_map["default"][i_] if t.uri in font_size_map: if font_size_map[t.uri][i_] is not None: attrs["point-size"] = font_size_map[t.uri][i_] elif font_size_map["default"][i_] is not None: attrs["point-size"] = font_size_map["default"][i_] if t.uri in font_bold_map: if font_bold_map[t.uri][i_] is not None: bold = font_bold_map[t.uri][i_] elif font_bold_map["default"][i_] is not None: bold = font_bold_map["default"][i_] font = soup.new_tag("font", attrs=attrs) if col.string: s = col.string new_col = soup.new_tag("td", attrs=col.attrs) col.replace_with(new_col) new_col.append(font) elif col.find("b"): s = col.find("b").string col.find("b").replace_with(font) if bold: s_ = soup.new_tag("b", attrs={}) s_.string = s s = s_ font.append(s) # Remove the last row if include_full_uri is False if not include_full_uri: all_rows = soup.find_all("tr") if len(all_rows) > 0: all_rows[2].decompose() node.obj_dict["attributes"]["label"] = ( str(soup).replace("&lt;", "<").replace("&gt;", ">") ) def del_dir(dirname): for filename in os.listdir(dirname): file_path = os.path.join(dirname, filename) if os.path.isfile(file_path): os.remove(file_path) dirname, _ = self.get_dir(folder_list=["graphs", "temp"]) # Delete all files in dirname del_dir(dirname) dot_filename = os.path.join(dirname, "object_graph.dot") dg.write(dot_filename) ### ccomps ### dirname_ccomps, _ = self.get_dir(folder_list=["graphs", "temp", "ccomps"]) dot_filename_ccomps = os.path.join(dirname_ccomps, "object_graph_ccomps.dot") del_dir(dirname_ccomps) app_path = shutil.which("ccomps") assert app_path is not None, "ccomps not found" args = [app_path, "-x", f"-o{dot_filename_ccomps}", f"{dot_filename}"] subprocess.run(args=args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) ### dot ### # Get all filenames generated in the folder dirname app_path = shutil.which("dot") assert app_path is not None, "dot not found. Is Graphviz installed?" filenames = [] for filename in os.listdir(dirname_ccomps): file_path = os.path.join(dirname_ccomps, filename) if os.path.isfile(file_path): dot_filename_ccomps = file_path dot_filename_dot = os.path.join( dirname_ccomps, filename.replace("ccomps", "dot") ) args = [ app_path, "-q", f"-o{dot_filename_dot}", f"{dot_filename_ccomps}", ] subprocess.run( args=args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) filenames.append(dot_filename_dot) dot_filename_ccomps = os.path.join(dirname, "object_graph_ccomps_joined.dot") with open(dot_filename_ccomps, "wb") as wfd: for f in filenames: with open(f, "rb") as fd: shutil.copyfileobj(fd, wfd) ### gvpack ### dot_filename_gvpack = os.path.join(dirname, "object_graph_gvpack.dot") app_path = shutil.which("gvpack") assert app_path is not None, "gvpack not found" args = [ app_path, "-array3", f"-o{dot_filename_gvpack}", f"{dot_filename_ccomps}", ] subprocess.run(args=args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) ### neato ### semantic_model_png, _ = self.get_dir( folder_list=["graphs"], filename="semantic_model.png" ) app_path = shutil.which("neato") assert app_path is not None, "neato not found" args = [ app_path, "-Tpng", "-n2", "-Gsize=10!", "-Gdpi=2000", "-q", # "-v", # verbose f"-o{semantic_model_png}", f"{dot_filename_gvpack}", ] subprocess.run(args=args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
[docs] def parse_spreadsheet(self, spreadsheet, mappings_dir=None): """Parse spreadsheet into RDF graph using brickify tool""" # Overwrite typer progress bar to prevent it from printing to the console. class Overwriter: def __init__(self, iterable, *args, **kwargs): self.iterable = iterable def __enter__(self): return self.iterable def __exit__(self, exc_type, exc_value, traceback): pass def overwriter(iterable_object, *args, **kwargs): return Overwriter(iterable_object, *args, **kwargs) table_handler.progressbar = overwriter graph = Graph() # Get directories for storing files dirname, _ = self.get_dir(folder_list=["mappings"]) if mappings_dir is None: mappings_dir = os.path.join( uppath(os.path.abspath(__file__), 1), "mappings" ) # Read common macros content common_macros_path = os.path.join(mappings_dir, "common_macros.yml") with open(common_macros_path, "r") as fd: common_macros_content = fd.read() # Load workbook and process each sheet wb = load_workbook(spreadsheet) # # temp # # for sheet in wb.sheetnames: # csv_file = os.path.join(dirname, f"{sheet}.csv") # df = pd.read_excel(spreadsheet, sheet_name=sheet) # df.to_csv(csv_file, index=False) # ######## # aa for sheet in wb.sheetnames: # Check if there's a corresponding config file config_file = os.path.join(mappings_dir, f"{sheet}.yml") if os.path.exists(config_file): # Create new config file with macros in dirname new_config_path = os.path.join(dirname, f"{sheet}.yml") with open(config_file, "rb") as fd: with open(new_config_path, "wb") as wfd: shutil.copyfileobj(fd, wfd) wfd.write(b"\n") wfd.write(common_macros_content.encode()) # Convert sheet to CSV csv_file = os.path.join(dirname, f"{sheet}.csv") df = pd.read_excel(spreadsheet, sheet_name=sheet) df.to_csv(csv_file, index=False) # Run brickify with new config file output_file = os.path.join(dirname, f"{sheet}.ttl") # cmd = [ # "brickify", # csv_file, # "--output", output_file, # "--input-type", "csv", # "--config", new_config_path, # "--building-prefix", "bldg", # "--building-namespace", "http://example.org/building#" # ] # Parse the output file into our graph handler = table_handler.TableHandler( source=csv_file, input_format="csv", config_file=new_config_path, ) # Convert using the handler result_graph = handler.convert( "bldg", "http://example.org/building#", "site", "https://example.com/site#", ) # Serialize to the output file result_graph.serialize(destination=str(output_file), format="turtle") try: # subprocess.run(cmd, check=True) # Parse the output file into our graph graph.parse(output_file, format="turtle") except subprocess.CalledProcessError as e: print(f"Error running brickify for sheet {sheet}: {e}") except Exception as e: print(f"Error processing sheet {sheet}: {e}") return graph
[docs] def reason(self, namespaces=None): """Perform RDFS and OWL reasoning to infer additional triples. Currently, we infer: - Inverse properties - Symmetric properties - Transitive properties - Equivalent classes (owl:equivalentClass) - Subclass reasoning (rdfs:subClassOf) - Equivalent properties (owl:equivalentProperty) - Property chains (owl:propertyChainAxiom) - SameAs reasoning (owl:sameAs) """ if namespaces is None: # Convert Namespace objects to URI strings for parse_namespaces # namespaces = {prefix: str(uri) for prefix, uri in self.namespaces.items()} namespaces = self.namespaces ontology_graph = Graph() self.parse_namespaces(ontology_graph, namespaces=namespaces) # Track new triples to add new_triples = set() # Handle inverse properties for s, p, o in ontology_graph.triples( (None, URIRef("http://www.w3.org/2002/07/owl#inverseOf"), None) ): # Find and add inverse relationships for subj, _, obj in self.graph.triples((None, s, None)): new_triple = (obj, o, subj) new_triples.add(new_triple) for subj, _, obj in self.graph.triples((None, o, None)): new_triple = (obj, s, subj) new_triples.add(new_triple) # Handle symmetric properties symmetric_props = set( ontology_graph.subjects( RDF.type, URIRef("http://www.w3.org/2002/07/owl#SymmetricProperty") ) ) for prop in symmetric_props: for subj, _, obj in self.graph.triples((None, prop, None)): new_triples.add((obj, prop, subj)) # Handle transitive properties transitive_props = set( ontology_graph.subjects( RDF.type, URIRef("http://www.w3.org/2002/07/owl#TransitiveProperty") ) ) for prop in transitive_props: # Find all pairs connected by this property pairs = list(self.graph.triples((None, prop, None))) # For each pair, look for additional connections for s1, _, o1 in pairs: for s2, _, o2 in pairs: if o1 == s2: # Found a chain new_triples.add((s1, prop, o2)) # Handle equivalent classes (owl:equivalentClass) # This adds missing class assertions for instances using recursive reasoning def build_equivalence_classes( ontology_graph: Graph, ) -> Dict[URIRef, Set[URIRef]]: """ Recursively build equivalence classes by finding connected components. Args: ontology_graph: The ontology graph containing equivalentClass assertions Returns: Dictionary mapping each class to its complete equivalence class """ # Build adjacency list for equivalent classes adjacency = {} for class1, _, class2 in ontology_graph.triples( (None, URIRef("http://www.w3.org/2002/07/owl#equivalentClass"), None) ): if class1 not in adjacency: adjacency[class1] = set() if class2 not in adjacency: adjacency[class2] = set() adjacency[class1].add(class2) adjacency[class2].add(class1) # Find connected components using DFS visited = set() equivalence_map = {} def dfs(node: URIRef, component: Set[URIRef]): """Depth-first search to find connected component.""" if node in visited: return visited.add(node) component.add(node) if node in adjacency: for neighbor in adjacency[node]: dfs(neighbor, component) # Find all connected components for class_uri in adjacency: if class_uri not in visited: component = set() dfs(class_uri, component) # Map each class in the component to the full component for cls in component: equivalence_map[cls] = component return equivalence_map def propagate_class_assertions( equivalence_class: Set[URIRef], new_triples: set ): """ Propagate class assertions within an equivalence class. Args: equivalence_class: Set of equivalent classes new_triples: Set to collect new triples to add """ # Collect all instances of any class in this equivalence class all_instances = set() for class_uri in equivalence_class: instances = set(self.graph.subjects(RDF.type, class_uri)) all_instances.update(instances) # Add all instances to all classes in the equivalence class for instance in all_instances: for class_uri in equivalence_class: new_triples.add((instance, RDF.type, class_uri)) # Execute equivalent class reasoning equivalence_map = build_equivalence_classes(ontology_graph) for equivalence_class in equivalence_map.values(): propagate_class_assertions(equivalence_class, new_triples) # These have not been tested yet # # Handle subclass reasoning (rdfs:subClassOf) # # This adds missing class assertions for instances based on inheritance # for subclass, _, superclass in ontology_graph.triples((None, RDFS.subClassOf, None)): # # If an instance is of the subclass, it's also of the superclass # for instance in self.graph.subjects(RDF.type, subclass): # new_triples.add((instance, RDF.type, superclass)) # # Handle equivalent properties (owl:equivalentProperty) # for prop1, _, prop2 in ontology_graph.triples((None, URIRef("http://www.w3.org/2002/07/owl#equivalentProperty"), None)): # # If subject-property1-object exists, add subject-property2-object # for subj, _, obj in self.graph.triples((None, prop1, None)): # new_triples.add((subj, prop2, obj)) # # If subject-property2-object exists, add subject-property1-object # for subj, _, obj in self.graph.triples((None, prop2, None)): # new_triples.add((subj, prop1, obj)) # # Handle property chains (owl:propertyChainAxiom) # # This handles cases where property1 o property2 -> property3 # for chain_prop, _, chain_list in ontology_graph.triples((None, URIRef("http://www.w3.org/2002/07/owl#propertyChainAxiom"), None)): # if isinstance(chain_list, URIRef): # # Handle simple property chains (property1 o property2 -> property3) # # Find the chain properties and target property # chain_properties = list(ontology_graph.objects(chain_list, RDF.first)) # target_property = list(ontology_graph.objects(chain_list, RDF.rest)) # if len(chain_properties) == 2 and target_property: # prop1, prop2 = chain_properties[0], chain_properties[1] # target_prop = target_property[0] # # Find all chains: if a->b via prop1 and b->c via prop2, then a->c via target_prop # for s1, _, o1 in self.graph.triples((None, prop1, None)): # for s2, _, o2 in self.graph.triples((None, prop2, None)): # if o1 == s2: # Found a chain # new_triples.add((s1, target_prop, o2)) # # Handle sameAs reasoning (owl:sameAs) # # This propagates all properties from one individual to its sameAs individuals # for ind1, _, ind2 in self.graph.triples((None, URIRef("http://www.w3.org/2002/07/owl#sameAs"), None)): # # Copy all properties from ind1 to ind2 # for subj, pred, obj in self.graph.triples((ind1, None, None)): # if pred != URIRef("http://www.w3.org/2002/07/owl#sameAs"): # Avoid infinite loops # new_triples.add((ind2, pred, obj)) # # Copy all properties from ind2 to ind1 # for subj, pred, obj in self.graph.triples((ind2, None, None)): # if pred != URIRef("http://www.w3.org/2002/07/owl#sameAs"): # Avoid infinite loops # new_triples.add((ind1, pred, obj)) # # Handle functional properties (owl:FunctionalProperty) # # If a property is functional, ensure we don't have conflicting values # functional_props = set(ontology_graph.subjects(RDF.type, URIRef("http://www.w3.org/2002/07/owl#FunctionalProperty"))) # for prop in functional_props: # # Group by subject and keep only the first value for each subject # subject_values = {} # for subj, _, obj in self.graph.triples((None, prop, None)): # if subj not in subject_values: # subject_values[subj] = obj # # Note: In a real implementation, you might want to handle conflicts differently # # Handle inverse functional properties (owl:InverseFunctionalProperty) # # If a property is inverse functional, ensure we don't have conflicting subjects # inverse_functional_props = set(ontology_graph.subjects(RDF.type, URIRef("http://www.w3.org/2002/07/owl#InverseFunctionalProperty"))) # for prop in inverse_functional_props: # # Group by object and keep only the first subject for each object # object_subjects = {} # for subj, _, obj in self.graph.triples((None, prop, None)): # if obj not in object_subjects: # object_subjects[obj] = subj # # Note: In a real implementation, you might want to handle conflicts differently # Add all new triples to the graph for s, p, o in new_triples: self.graph.add((s, p, o))
[docs] def serialize(self, folder_list: List[str] = None): dirname, _ = self.get_dir( folder_list=folder_list, filename="semantic_model.ttl" ) self.graph.serialize(destination=str(dirname), format="turtle")