Source code for brickschema.merge

"""
The `merge` module implements data integration methods for merging Brick graphs
together. This is based on techniques described in 'Shepherding Metadata Through
the Building Lifecycle' published in BuildSys 2020
"""
from colorama import init as colorama_init
from colorama import Fore, Style

from rdflib import URIRef
from collections import defaultdict
import dedupe
from .graph import Graph
from dedupe._typing import (
    TrainingData,
    Literal,
)
import sys
from dedupe.core import unique
from typing import List, Tuple, Any

colorama_init()
DEBUG = False


def _unpack_linked_records(linked_records):
    """Returns the set of all entities mentioned in a list of linked record pairs."""
    s = set()
    for (e1, e2), _ in linked_records:
        s.add(e1)
        s.add(e2)
    return s


def unify_entities(G, e1, e2):
    """
    Replaces all instances of e2 with e1 in graph G
    """
    print(Style.BRIGHT + Fore.CYAN + f"Unifying {e1} and {e2}" + Style.RESET_ALL)
    e1 = URIRef(e1)
    e2 = URIRef(e2)
    # materialize the iterators before mutating the graph
    pos = list(G.predicate_objects(subject=e2))
    for (p, o) in pos:
        G.remove((e2, p, o))
        G.add((e1, p, o))
    sps = list(G.subject_predicates(object=e2))
    for (s, p) in sps:
        G.remove((s, p, e2))
        G.add((s, p, e1))
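

# A minimal usage sketch for `unify_entities`, assuming a brickschema Graph
# holding triples about two URIs that name the same physical device. The file
# name and URIs below are hypothetical placeholders:
#
#     g = Graph().load_file("mybuilding.ttl")
#     unify_entities(
#         g,
#         "http://example.com/bldg#ahu1",
#         "http://example.com/bldg#AHU-1",
#     )
#     # every triple that mentioned #AHU-1 now refers to #ahu1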


def get_entity_feature_vectors(g, namespace):
    """
    Returns a dictionary of features for each entity in graph 'g'. Entities are
    any node with at least one `rdf:type` edge whose URI is in the given namespace.
    """
    entities = g.query(
        f"""SELECT ?ent ?type ?label WHERE {{
        ?ent rdf:type ?type .
        OPTIONAL {{ ?ent rdfs:label ?label }} .
        FILTER(STRSTARTS(STR(?ent), STR(<{namespace}>)))
        }}"""
    )
    features = defaultdict(lambda: defaultdict(set))
    for row in entities:
        entity, etype, label = row
        features[str(entity)]["type"].add(str(etype))
        features[str(entity)]["label"].add(str(label))
        features[str(entity)]["uri"].add(str(entity))
    if DEBUG:
        for ent, featurelist in features.items():
            print(ent)
            for name, vals in featurelist.items():
                print(f"  {name} = {vals}")
    return features
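

# A sketch of the feature dictionary shape returned above, for a hypothetical
# namespace http://example.com/bldg# containing one labeled AHU:
#
#     feats = get_entity_feature_vectors(g, "http://example.com/bldg#")
#     # feats["http://example.com/bldg#ahu1"] == {
#     #     "type": {"https://brickschema.org/schema/Brick#AHU"},
#     #     "label": {"AHU One"},
#     #     "uri": {"http://example.com/bldg#ahu1"},
#     # }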


def flatten_features(features):
    """
    Collapses each multi-valued feature (list or set) to a single value, as
    required by dedupe's string fields. Note that this keeps an arbitrary
    element when a feature has more than one value.
    """
    for _, flist in features.items():
        for k, v in flist.items():
            if isinstance(v, (list, set)):
                flist[k] = list(v)[0]


def cluster_by_type(g1, g2, namespace):
    """
    Groups the entities of both graphs into clusters keyed by their rdf:type,
    so that record linkage only compares entities of the same type.
    """
    clusters = defaultdict(lambda: {"g1": {}, "g2": {}})
    g1_features = get_entity_feature_vectors(g1, namespace)
    g2_features = get_entity_feature_vectors(g2, namespace)
    for g1_ent, g1_ent_feat in g1_features.items():
        for etype in g1_ent_feat["type"]:
            flat_features = g1_ent_feat.copy()
            flat_features["type"] = etype
            clusters[etype]["g1"][g1_ent] = flat_features
    for g2_ent, g2_ent_feat in g2_features.items():
        for etype in g2_ent_feat["type"]:
            flat_features = g2_ent_feat.copy()
            flat_features["type"] = etype
            clusters[etype]["g2"][g2_ent] = flat_features
    if DEBUG:
        for ent_type, cluster in clusters.items():
            print(ent_type)
            print("  ", cluster["g1"].keys())
            print("  ", cluster["g2"].keys())
            print()
    return clusters
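

# A sketch of the cluster structure produced above, assuming both graphs
# contain AHU instances (entity URIs are hypothetical):
#
#     clusters = cluster_by_type(g1, g2, "http://example.com/bldg#")
#     ahu_cluster = clusters["https://brickschema.org/schema/Brick#AHU"]
#     # ahu_cluster["g1"] maps g1 entity URIs -> feature dicts, and
#     # ahu_cluster["g2"] does the same for g2. Because merge_type_cluster
#     # expands the graphs first, an entity appears once per inferred type.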


def _merge_features(fields, g1_features, g2_features):
    """
    Interactively links the records in g1_features to those in g2_features
    using dedupe, returning the linked pairs and the records left unlinked
    on each side.
    """
    leftover_g1 = {}
    leftover_g2 = {}
    while True:
        # if there is exactly one candidate on each side, link them directly
        if len(g1_features) == len(g2_features) == 1:
            linked_records = [
                (
                    (list(g1_features.keys())[0], list(g2_features.keys())[0]),
                    1.0,
                )
            ]
            g1_features = {}
            g2_features = {}
            break
        linker = dedupe.RecordLink(fields)
        linker.prepare_training(g1_features, g2_features)
        confirmed_matches = console_label(linker)
        linker.train()
        linked_records = list(
            linker.join(g1_features, g2_features, 0.0, constraint="one-to-one")
        )
        # remove records from linked_records that involve an entity the user
        # already confirmed during labeling
        for (e1, e2) in confirmed_matches:
            idx = 0
            while idx < len(linked_records):
                pair, _ = linked_records[idx]
                if e1 in pair or e2 in pair:
                    linked_records.pop(idx)
                else:
                    idx += 1
        # replace linked record scores with 1.0 if the user explicitly
        # marked them as equivalent. Then fill in other user-marked pairs
        # of linked records at the end
        idx = 0
        while idx < len(confirmed_matches):
            pair = confirmed_matches[idx]
            for lidx, (lpair, _) in enumerate(linked_records):
                if pair == lpair:
                    linked_records[lidx] = (pair, 1.0)
                    confirmed_matches.pop(idx)
                    idx -= 1  # cancel out the increment below
                    break
            idx += 1
        linked_records.extend([(pair, 1.0) for pair in confirmed_matches])

        print(
            Style.BRIGHT + Fore.YELLOW + "Is this matching correct?" + Style.RESET_ALL
        )
        for (e1, e2), similarity in linked_records:
            for fname in unique(f["field"] for f in fields):
                g1_val = g1_features[e1][fname]
                g2_val = g2_features[e2][fname]
                print(f"{g1_val:<50} | {g2_val:<50}")
            print(f"Similarity: {similarity}")
            print("-" * 20)
        ans = input(Fore.YELLOW + "[y/n]? " + Style.RESET_ALL)
        if ans.lower() == "y":
            print(
                Fore.GREEN
                + "All correct! Moving on to any stragglers"
                + Style.RESET_ALL
            )
            break
        else:
            print(Fore.RED + "Re-labeling..." + Style.RESET_ALL)
    linked_entities = _unpack_linked_records(linked_records)
    if len(linked_entities) != len(g1_features) or len(linked_entities) != len(
        g2_features
    ):
        leftover_g1 = set(g1_features.keys()).difference(linked_entities)
        leftover_g2 = set(g2_features.keys()).difference(linked_entities)
        leftover_g1 = {k: v for (k, v) in g1_features.items() if k in leftover_g1}
        leftover_g2 = {k: v for (k, v) in g2_features.items() if k in leftover_g2}
    return linked_records, leftover_g1, leftover_g2


def merge_type_cluster(g1, g2, namespace, similarity_threshold=0.9, merge_types=None):
    """
    Merges graph g2 into graph g1 using interactive, type-by-type record
    linkage. Only entities whose types appear in merge_types (defaulting to
    the types common to both graphs) are considered.
    """
    if merge_types is None:
        merge_types = list(map(str, get_common_types(g1, g2, namespace)))
    _g1 = Graph().load_file("Brick.ttl").from_triples(g1.triples((None, None, None)))
    _g1.expand("brick")
    _g2 = Graph().load_file("Brick.ttl").from_triples(g2.triples((None, None, None)))
    _g2.expand("brick")
    clusters = cluster_by_type(_g1, _g2, namespace)
    G = g1 + g2
    linked = set()
    for etype, cluster in clusters.items():
        if merge_types and etype not in merge_types:
            continue
        print(
            Style.BRIGHT
            + Fore.YELLOW
            + f"Handling clusters for {etype}"
            + Style.RESET_ALL
        )
        # drop entities already linked while handling another type; if the
        # clusters do not contain the same number of entities, type alignment
        # will be less successful
        for e in linked:
            if e in cluster["g1"]:
                cluster["g1"].pop(e)
            if e in cluster["g2"]:
                cluster["g2"].pop(e)
        if not len(cluster["g1"]) or not len(cluster["g2"]):
            continue
        g1_features = cluster["g1"].copy()
        g2_features = cluster["g2"].copy()
        flatten_features(g1_features)
        flatten_features(g2_features)
        fields = [
            {"field": "uri", "type": "String"},
            # {"field": "type", "type": "String"},
            {"field": "label", "type": "String"},
        ]
        while True:
            linked_records, leftover_g1, leftover_g2 = _merge_features(
                fields, g1_features, g2_features
            )
            for link in linked_records:
                (e1, e2), similarity = link
                if similarity < similarity_threshold:
                    print(
                        Fore.RED
                        + f"cannot merge {e1}, {e2}: similarity {similarity} is below threshold {similarity_threshold}"
                        + Style.RESET_ALL
                    )
                    continue
                linked.add(e1)
                linked.add(e2)
                unify_entities(G, e1, e2)
            if leftover_g1 and leftover_g2:
                print(
                    Style.BRIGHT
                    + Fore.YELLOW
                    + "More entities left to merge"
                    + Style.RESET_ALL
                )
                g1_features = leftover_g1
                g2_features = leftover_g2
                continue
            break
    return G
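

# An end-to-end usage sketch for the interactive merge, assuming two Turtle
# files describing the same building under one namespace (the file names are
# hypothetical). `merge_type_cluster` prompts on the console for labels:
#
#     g1 = Graph().load_file("bacnet_scrape.ttl")
#     g2 = Graph().load_file("haystack_export.ttl")
#     merged = merge_type_cluster(
#         g1, g2, "http://example.com/bldg#", similarity_threshold=0.9
#     )
#     merged.serialize("merged.ttl", format="turtle")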


# def merge_record_linkage(g1, g2, namespace):
#     g1_features = get_entity_feature_vectors(g1, namespace)
#     g2_features = get_entity_feature_vectors(g2, namespace)
#
#     flatten_features(g1_features)
#     flatten_features(g2_features)
#
#     fields = [
#         {"field": "uri", "type": "String"},
#         {"field": "type", "type": "String"},
#         {"field": "label", "type": "String"},
#     ]
#     linker = dedupe.RecordLink(fields)
#
#     linker.prepare_training(g2_features, g1_features)
#     dedupe.console_label(linker)
#     linker.train()
#     linked_records = linker.join(
#         g1_features, g2_features, 0.0, constraint="one-to-one"
#     )


def get_common_types(g1, g2, namespace):
    """
    Returns the list of types that are common to both graphs. A type is
    included if both graphs have instances of that type
    """
    g1ents = g1.query(
        f"""SELECT DISTINCT ?type WHERE {{
        ?ent rdf:type ?type .
        FILTER(STRSTARTS(STR(?ent), STR(<{namespace}>)))
        }}"""
    )
    g2ents = g2.query(
        f"""SELECT DISTINCT ?type WHERE {{
        ?ent rdf:type ?type .
        FILTER(STRSTARTS(STR(?ent), STR(<{namespace}>)))
        }}"""
    )
    g1types = {x[0] for x in g1ents}
    g2types = {x[0] for x in g2ents}
    return list(g1types.intersection(g2types))
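

# A quick sketch of restricting the merge to a subset of the common types,
# here hypothetically keeping only sensor classes (graphs assumed loaded):
#
#     common = get_common_types(g1, g2, "http://example.com/bldg#")
#     wanted = [t for t in map(str, common) if t.endswith("Sensor")]
#     merged = merge_type_cluster(
#         g1, g2, "http://example.com/bldg#", merge_types=wanted
#     )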


# adapted from dedupe's console_label (https://dedupe.io)
def console_label(deduper: dedupe.api.ActiveMatching) -> List[Tuple[str, str]]:  # noqa: C901
    """
    Train a matcher instance (Dedupe, RecordLink, or Gazetteer) from the
    command line. Unlike dedupe's built-in console_label, this variant also
    returns the list of (uri, uri) pairs the user explicitly confirmed as
    matches.

    Example

    .. code:: python

       > deduper = dedupe.Dedupe(variables)
       > deduper.prepare_training(data)
       > console_label(deduper)
    """
    confirmed_matches = []
    finished = False
    use_previous = False
    fields = unique(field.field for field in deduper.data_model.primary_fields)

    buffer_len = 1  # max number of previous operations
    examples_buffer: List[Tuple[Any, Literal["match", "distinct", "uncertain"]]] = []
    uncertain_pairs: List[Any] = []

    # don't re-use items that have already been confirmed as part of a mapping
    mapped_items = set()

    while not finished:
        if use_previous:
            record_pair, _ = examples_buffer.pop(0)
            use_previous = False
        else:
            try:
                if not uncertain_pairs:
                    uncertain_pairs = deduper.uncertain_pairs()
                while True:
                    record_pair = uncertain_pairs.pop()
                    if (
                        len(
                            set([x["uri"] for x in record_pair]).intersection(
                                mapped_items
                            )
                        )
                        > 0
                    ):
                        examples_buffer.insert(0, (record_pair, "distinct"))
                        # TODO: do i need to process these?
                    else:
                        break
            except IndexError:
                break

        n_match = len(deduper.training_pairs["match"]) + sum(
            label == "match" for _, label in examples_buffer
        )
        n_distinct = len(deduper.training_pairs["distinct"]) + sum(
            label == "distinct" for _, label in examples_buffer
        )

        for pair in record_pair:
            for field in fields:
                line = "%s : %s" % (field, pair[field])
                print(line, file=sys.stderr)
            print(file=sys.stderr)

        print(
            "{0}/10 positive, {1}/10 negative".format(n_match, n_distinct),
            file=sys.stderr,
        )
        print(
            Fore.YELLOW
            + "Do these records refer to the same thing?"
            + Style.RESET_ALL,
            file=sys.stderr,
        )

        valid_response = False
        user_input = ""
        while not valid_response:
            if examples_buffer:
                prompt = "(y)es / (n)o / (u)nsure / (f)inished / (p)revious"
                valid_responses = {"y", "n", "u", "f", "p"}
            else:
                prompt = "(y)es / (n)o / (u)nsure / (f)inished"
                valid_responses = {"y", "n", "u", "f"}
            print(Fore.YELLOW + prompt + Style.RESET_ALL, file=sys.stderr)
            user_input = input()
            if user_input in valid_responses:
                valid_response = True

        if user_input == "y":
            examples_buffer.insert(0, (record_pair, "match"))
            mapped_items.add(record_pair[0]["uri"])
            mapped_items.add(record_pair[1]["uri"])
            confirmed_matches.append((record_pair[0]["uri"], record_pair[1]["uri"]))
            # deduper.mark_pairs({'match': record_pair})
        elif user_input == "n":
            examples_buffer.insert(0, (record_pair, "distinct"))
        elif user_input == "u":
            examples_buffer.insert(0, (record_pair, "uncertain"))
        elif user_input == "f":
            print(Fore.GREEN + "Finished labeling" + Style.RESET_ALL, file=sys.stderr)
            finished = True
        elif user_input == "p":
            use_previous = True
            uncertain_pairs.append(record_pair)

        if len(examples_buffer) > buffer_len:
            record_pair, label = examples_buffer.pop()
            if label in {"distinct", "match"}:
                examples: TrainingData = {"distinct": [], "match": []}
                examples[label].append(record_pair)  # type: ignore
                deduper.mark_pairs(examples)

    for record_pair, label in examples_buffer:
        if label in ["distinct", "match"]:
            examples = {"distinct": [], "match": []}
            examples[label].append(record_pair)  # type: ignore
            deduper.mark_pairs(examples)

    return confirmed_matches