"""Operations on triples."""
from collections import defaultdict
from collections.abc import Iterable
from typing import TypeAlias
from .filters import exclude_triples
from .model import TripleType
from .. import Reference
# The public API of this module. Note that ``exclude_triples`` is not defined
# here; it is imported from the sibling ``filters`` module and re-exported.
__all__ = [
    "PrefixPairStratifiedIndex",
    "exclude_prefix_stratified_many_to_many",
    "exclude_triples",
    "get_prefix_pair_stratified_indexes",
    "get_prefix_stratified_many_to_many",
    "get_reference_indexes",
]
#: A doubly-nested adjacency dictionary. The outer keys are the
#: local unique identifiers on one side of a triple (subject or
#: object), the inner keys are the local unique identifiers on
#: the opposite side, and the values are lists of triples.
AdjacencyDict = dict[str, dict[str, list[TripleType]]]
#: A pair of prefixes, one for each side (subject and object) of a triple
PrefixPair: TypeAlias = tuple[str, str]
#: A multi-level nested dictionary that represents many-to-many mappings.
#: The first key is a pair of prefixes (one per side of the triple), the second key is either a
#: subject identifier or an object identifier, the last key is the identifier on the opposite
#: side, and the values are lists of triples.
#:
#: This data structure can be used to index either forward or backward mappings,
#: as done inside :func:`get_prefix_pair_stratified_indexes`
PrefixPairStratifiedIndex: TypeAlias = dict[PrefixPair, AdjacencyDict[TripleType]]
[docs]
def exclude_prefix_stratified_many_to_many(
    triples: Iterable[TripleType], *, progress: bool = False
) -> Iterable[TripleType]:
    """Remove triples that take part in prefix pair-stratified many-to-many relationships.

    :param triples: An iterable of triples
    :param progress: Whether to show a progress bar; forwarded to
        :func:`exclude_triples`
    :returns: An iterable of triples, as produced by :func:`exclude_triples`
        when given the input triples and the set of many-to-many triples

    .. warning::

        The predicate is not considered when detecting many-to-many
        relationships. To apply this operation per predicate, group the
        triples by predicate first and call this function on each group.

    .. warning::

        The input iterable is fully consumed into a list, since it is needed
        once to compute the exclusion set and once more for the filtering.
    """
    # Materialize up front: a one-shot iterator could not be traversed twice.
    materialized = list(triples)
    return exclude_triples(
        materialized,
        get_prefix_stratified_many_to_many(materialized),
        progress=progress,
    )
def get_prefix_stratified_many_to_many(triples: Iterable[TripleType]) -> set[TripleType]:
    """Get the triples that are part of many-to-many relationships within a prefix pair.

    The forward and backward prefix pair-stratified indexes are each reduced
    with :func:`get_one_to_many`. The reduced backward index is then flipped
    so it is keyed the same way as the forward one, and the triples whose
    prefix pair, subject identifier, and object identifier appear in both are
    collected.

    :param triples: An iterable of triples. It is traversed once.
    :returns: The set of triples found in both reduced indexes
    """
    forward_index, backward_index = get_prefix_pair_stratified_indexes(triples)
    one_to_many = get_one_to_many(forward_index)
    many_to_one = flip_prefix_pair_stratified_index(get_one_to_many(backward_index))

    many_to_many: set[TripleType] = set()
    for prefix_pair, by_subject in one_to_many.items():
        by_subject_flipped = many_to_one.get(prefix_pair)
        if not by_subject_flipped:
            # Either the prefix pair is absent from the flipped index,
            # or nothing survived the flip's filtering for it.
            continue
        many_to_many |= _compare(by_subject, by_subject_flipped)
    return many_to_many
def get_prefix_pair_stratified_indexes(
    triples: Iterable[TripleType],
) -> tuple[PrefixPairStratifiedIndex[TripleType], PrefixPairStratifiedIndex[TripleType]]:
    """Index triples by prefix pair in both the forward and backward direction.

    :param triples: An iterable of triples
    :returns: A pair of forward and backward indexes, where:

        - The forward index is a triply-nested dictionary from (subject prefix,
          object prefix) pair to subject identifier to object identifier to
          list of triples.
        - The backward index is a triply-nested dictionary from (object prefix,
          subject prefix) pair to object identifier to subject identifier to
          list of triples.

        Both are plain dictionaries, not default dictionaries.
    """
    forward_index: _DD[TripleType] = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
    backward_index: _DD[TripleType] = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))

    for triple in triples:
        subject_prefix = triple.subject.prefix
        object_prefix = triple.object.prefix
        subject_id = triple.subject.identifier
        object_id = triple.object.identifier
        forward_index[subject_prefix, object_prefix][subject_id][object_id].append(triple)
        backward_index[object_prefix, subject_prefix][object_id][subject_id].append(triple)

    # Convert to plain dicts so later lookups of missing keys don't create entries
    return _downgrade_defaultdict(forward_index), _downgrade_defaultdict(backward_index)
# The nested default dictionary used while building an index in
# get_prefix_pair_stratified_indexes, before _downgrade_defaultdict
# converts it to plain dictionaries.
_DD = defaultdict[PrefixPair, defaultdict[str, defaultdict[str, list[TripleType]]]]
def _downgrade_defaultdict(dd: _DD[TripleType]) -> PrefixPairStratifiedIndex[TripleType]:
    """Copy the three dictionary levels of a nested default dictionary into plain dictionaries.

    :param dd: The nested default dictionary to convert
    :returns: A plain dictionary with the same keys at all three levels.
        The innermost lists are reused, not copied.
    """
    plain: dict = {}
    for prefix_pair, adjacency in dd.items():
        plain_adjacency = {}
        for outer_id, by_inner_id in adjacency.items():
            plain_adjacency[outer_id] = dict(by_inner_id)
        plain[prefix_pair] = plain_adjacency
    return plain
def get_one_to_many(
    index: PrefixPairStratifiedIndex[TripleType],
) -> PrefixPairStratifiedIndex[TripleType]:
    """Keep only the entities that have more than one counterpart within each prefix pair.

    :param index: A prefix pair-stratified index
    :returns: A new index that contains, for each prefix pair, only the outer
        identifiers whose inner dictionary has more than one key. Prefix pairs
        for which no identifier remains are left out. The inner dictionaries
        are reused, not copied.
    """

    def _multi_target_only(adjacency):
        return {key: targets for key, targets in adjacency.items() if len(targets) >= 2}

    reduced = ((pair, _multi_target_only(adjacency)) for pair, adjacency in index.items())
    return {pair: kept for pair, kept in reduced if kept}
def flip_prefix_pair_stratified_index(
    index: PrefixPairStratifiedIndex[TripleType],
) -> PrefixPairStratifiedIndex[TripleType]:
    """Flip a one-to-many relationship index to a many-to-one relationship index.

    Each prefix pair is reversed and each adjacency dictionary is transposed,
    so that the former inner identifiers become the outer keys. After
    transposing, only outer identifiers whose inner dictionary has more than
    one key are kept.

    :param index: A prefix pair-stratified index
    :returns: The flipped index. A prefix pair for which no identifier remains
        after the filtering still appears, mapped to an empty dictionary.
    """
    flipped_index = {}
    for prefix_pair, adjacency in index.items():
        source_prefix, target_prefix = prefix_pair
        transposed: dict[str, dict[str, list[TripleType]]] = {}
        for source_id, by_target_id in adjacency.items():
            for target_id, triples in by_target_id.items():
                transposed.setdefault(target_id, {})[source_id] = triples
        flipped_index[target_prefix, source_prefix] = {
            target_id: by_source_id
            for target_id, by_source_id in transposed.items()
            if len(by_source_id) >= 2
        }
    return flipped_index
def _compare(
    left_adjacency_dict: AdjacencyDict[TripleType], right_adjacency_dict: AdjacencyDict[TripleType]
) -> set[TripleType]:
    """Collect the triples from the left dictionary whose key path also exists in the right one.

    :param left_adjacency_dict: The adjacency dictionary whose triples are collected
    :param right_adjacency_dict: The adjacency dictionary used only to check
        which outer/inner key combinations are shared
    :returns: The set of triples stored in the left dictionary under every
        outer key and inner key combination present in both dictionaries
    """
    shared: set = set()
    for outer_key, left_inner in left_adjacency_dict.items():
        if outer_key not in right_adjacency_dict:
            continue
        right_inner = right_adjacency_dict[outer_key]
        for inner_key, triples in left_inner.items():
            if inner_key in right_inner:
                shared.update(triples)
    return shared
#: A simple index from a reference to a set of references.
#: This can map either a subject to its objects or an object
#: to its subjects, depending on the direction of the index.
ReferenceIndex = dict[Reference, set[Reference]]
def get_reference_indexes(triples: Iterable[TripleType]) -> tuple[ReferenceIndex, ReferenceIndex]:
    """Get simple forward and backward entity indexes.

    :param triples: An iterable of triples
    :returns: A pair of plain dictionaries. The first maps each subject to the
        set of its objects and the second maps each object to the set of its
        subjects.
    """
    subject_to_objects: dict = {}
    object_to_subjects: dict = {}
    for t in triples:
        subject_to_objects.setdefault(t.subject, set()).add(t.object)
        object_to_subjects.setdefault(t.object, set()).add(t.subject)
    return subject_to_objects, object_to_subjects