"""Reusable configuration."""
from __future__ import annotations
import json
from collections.abc import Callable
from pathlib import Path
from typing import Any, Literal, TypeAlias, TypeVar, overload
from pydantic import BaseModel, Field
from typing_extensions import Never, Self
from .api import (
RETURN_NONE_ERROR_TEXT,
Converter,
Reference,
ReferenceTuple,
)
# Public names exported by this module
__all__ = [
"BlockAction",
"BlocklistError",
"PostprocessingRewrites",
"PreprocessingBlocklists",
"PreprocessingConverter",
"PreprocessingRewrites",
"PreprocessingRules",
]
#: The action taken when the blocklist is invoked: ``"raise"`` raises a
#: :class:`BlocklistError`, ``"pass"`` makes the parsing method return ``None``
BlockAction: TypeAlias = Literal["raise", "pass"]
#: Type variable bound to :class:`Reference`, so that functions accepting a
#: ``reference_cls`` are typed as returning an instance of that same class
X = TypeVar("X", bound=Reference)
class PreprocessingBlocklists(BaseModel):
    """Blocklists that match a string by prefix, by suffix, or in full.

    The ``full``, ``prefix``, and ``suffix`` lists apply to every string. The
    ``resource_full`` and ``resource_prefix`` dictionaries are keyed by a context
    string and are consulted in addition to the global lists when a context is given.
    """

    full: list[str] = Field(default_factory=list)
    resource_full: dict[str, list[str]] = Field(default_factory=dict)
    prefix: list[str] = Field(default_factory=list)
    resource_prefix: dict[str, list[str]] = Field(default_factory=dict)
    suffix: list[str] = Field(default_factory=list)

    def _sort(self) -> None:
        """Sort every blocklist in place, including the per-context ones."""
        for entries in (self.full, self.prefix, self.suffix):
            entries.sort()
        for per_context in (self.resource_full, self.resource_prefix):
            for entries in per_context.values():
                entries.sort()

    def str_has_blocked_prefix(
        self, str_or_curie_or_uri: str, *, context: str | None = None
    ) -> bool:
        """Check if the string starts with a blocklisted prefix.

        :param str_or_curie_or_uri: The string to check
        :param context: If given (and non-empty), the prefixes registered for this
            context in ``resource_prefix`` are checked before the global ones
        :returns: True if any context-specific or global prefix matches
        """
        if context:
            scoped = self.resource_prefix.get(context)
            # ``str.startswith`` accepts a tuple of alternatives
            if scoped and str_or_curie_or_uri.startswith(tuple(scoped)):
                return True
        return str_or_curie_or_uri.startswith(tuple(self.prefix))

    def str_has_blocked_suffix(self, str_or_curie_or_uri: str) -> bool:
        """Check if the string ends with a blocklisted suffix.

        :param str_or_curie_or_uri: The string to check
        :returns: True if any entry of ``suffix`` matches the end of the string
        """
        return str_or_curie_or_uri.endswith(tuple(self.suffix))

    def str_is_blocked_full(self, str_or_curie_or_uri: str, *, context: str | None = None) -> bool:
        """Check if the entire string is blocklisted.

        :param str_or_curie_or_uri: The string to check
        :param context: If given (and non-empty), the entries registered for this
            context in ``resource_full`` are checked as well as the global ones
        :returns: True if the string exactly equals a context-specific or global entry
        """
        if str_or_curie_or_uri in self.full:
            return True
        if not context:
            return False
        return str_or_curie_or_uri in self.resource_full.get(context, ())

    def str_is_blocked(self, str_or_curie_or_uri: str, *, context: str | None = None) -> bool:
        """Check if the string is blocked by any prefix, suffix, or full-string rule.

        :param str_or_curie_or_uri: The string to check
        :param context: An optional context passed on to the prefix and full-string checks
        :returns: True if at least one of the three checks matches
        """
        if self.str_has_blocked_prefix(str_or_curie_or_uri, context=context):
            return True
        if self.str_has_blocked_suffix(str_or_curie_or_uri):
            return True
        return self.str_is_blocked_full(str_or_curie_or_uri, context=context)
class PostprocessingRewrites(BaseModel):
    """Rules applied to a reference after it has been parsed, keyed by its prefix."""

    # Maps a prefix to the suffixes that are stripped from the end of identifiers
    # parsed under that prefix
    suffix: dict[str, list[str]] = Field(default_factory=dict)
class PreprocessingRewrites(BaseModel):
    """Rewrite rules that replace either a whole string or just its prefix.

    Each kind of rule exists as a global table and as a table keyed by a context
    string; context-specific rules take precedence over global ones.
    """

    full: dict[str, str] = Field(
        default_factory=dict, description="Global remappings for an entire string"
    )
    resource_full: dict[str, dict[str, str]] = Field(
        default_factory=dict, description="Resource-keyed remappings for an entire string"
    )
    prefix: dict[str, str] = Field(
        default_factory=dict, description="Global remappings of just the prefix"
    )
    resource_prefix: dict[str, dict[str, str]] = Field(
        default_factory=dict, description="Resource-keyed remappings for just a prefix"
    )

    def remap_full(
        self,
        str_or_curie_or_uri: str,
        reference_cls: type[X],
        *,
        context: str | None = None,
    ) -> X | None:
        """Look up an exact-match rewrite and parse its target as a reference.

        :param str_or_curie_or_uri: The string to look up
        :param reference_cls: The class whose ``from_curie`` builds the result
        :param context: If given (and non-empty), the rewrites registered for this
            context in ``resource_full`` are consulted before the global ones
        :returns: The reference built from the rewrite target, or ``None`` if the
            string has no full rewrite
        """
        scoped: dict[str, str] = self.resource_full.get(context, {}) if context else {}
        # Context-specific rewrites win over global ones
        for table in (scoped, self.full):
            if str_or_curie_or_uri in table:
                return reference_cls.from_curie(table[str_or_curie_or_uri])
        return None

    def remap_prefix(self, str_or_curie_or_uri: str, *, context: str | None = None) -> str:
        """Replace the first matching prefix of the string.

        :param str_or_curie_or_uri: The string whose prefix might be rewritten
        :param context: If not ``None``, the rewrites registered for this context in
            ``resource_prefix`` are tried before the global ones
        :returns: The string with the first matching old prefix (in dictionary
            order) replaced by its new prefix, or the string unchanged if none match
        """
        tables: list[dict[str, str]] = []
        if context is not None:
            tables.append(self.resource_prefix.get(context, {}))
        tables.append(self.prefix)
        for table in tables:
            for old, new in table.items():
                if str_or_curie_or_uri.startswith(old):
                    remainder = str_or_curie_or_uri[len(old) :]
                    return new + remainder
        return str_or_curie_or_uri
class PreprocessingRules(BaseModel):
    """A model bundling blocklists, pre-processing rewrites, and post-processing rewrites."""

    blocklists: PreprocessingBlocklists = Field(default_factory=PreprocessingBlocklists)
    rewrites: PreprocessingRewrites = Field(default_factory=PreprocessingRewrites)
    postprocessing: PostprocessingRewrites = Field(default_factory=PostprocessingRewrites)

    @classmethod
    def lint_file(cls, path: str | Path) -> None:
        """Lint a file, in place, given a file path.

        The file is parsed as a rules document, its blocklists are sorted, and it is
        written back as JSON with sorted keys and two-space indentation. Fields that
        were not set or that equal their defaults are omitted from the output.

        :param path: The path to a JSON rules file; ``~`` is expanded
        """
        path = Path(path).expanduser().resolve()
        # JSON is UTF-8 by specification; do not depend on the platform's locale
        # encoding, which could mis-decode non-ASCII rules before they are rewritten
        rules = cls.model_validate_json(path.read_text(encoding="utf-8"))
        rules.blocklists._sort()
        path.write_text(
            json.dumps(
                rules.model_dump(exclude_unset=True, exclude_defaults=True),
                sort_keys=True,
                indent=2,
            ),
            encoding="utf-8",
        )

    def str_is_blocked(self, str_or_curie_or_uri: str, *, context: str | None = None) -> bool:
        """Check if the string is blocked by the blocklists.

        :param str_or_curie_or_uri: The string to check
        :param context: An optional context passed on to the blocklists
        :returns: True if any blocklist rule matches
        """
        return self.blocklists.str_is_blocked(str_or_curie_or_uri, context=context)

    def remap_full(
        self,
        str_or_curie_or_uri: str,
        reference_cls: type[X],
        *,
        context: str | None = None,
    ) -> X | None:
        """Remap the entire string to a reference, if a full rewrite exists.

        :param str_or_curie_or_uri: The string to look up
        :param reference_cls: The reference class used to build the result
        :param context: An optional context passed on to the rewrites
        :returns: The remapped reference, or ``None`` if there is no full rewrite
        """
        return self.rewrites.remap_full(
            str_or_curie_or_uri, reference_cls=reference_cls, context=context
        )

    def remap_prefix(self, str_or_curie_or_uri: str, *, context: str | None = None) -> str:
        """Remap the prefix of the string, if a prefix rewrite matches.

        :param str_or_curie_or_uri: The string whose prefix might be rewritten
        :param context: An optional context passed on to the rewrites
        :returns: The rewritten string, or the input unchanged if nothing matches
        """
        return self.rewrites.remap_prefix(str_or_curie_or_uri, context=context)
def _load_rules(rules: str | Path | PreprocessingRules) -> PreprocessingRules:
# TODO load remote?
if isinstance(rules, (str, Path)):
rules = Path(rules).expanduser().resolve()
rules = PreprocessingRules.model_validate_json(rules.read_text())
return rules
class BlocklistError(ValueError):
    """Raised when an input matches a blocklist and the block action is ``raise``."""
def _identity(x: str) -> str:
return x
class PreprocessingConverter(Converter):
    """A converter with pre-processing rules.

    Before delegating to the parent converter, every input is pre-cleaned, checked
    for a full-string rewrite, has its prefix rewritten, and is checked against the
    blocklists. Results coming back from the parent converter then have any
    configured identifier suffix removed.
    """

    def __init__(
        self,
        *args: Any,
        rules: PreprocessingRules | str | Path,
        reference_cls: type[X] | None = None,
        preclean: Callable[[str], str] | None = None,
        **kwargs: Any,
    ) -> None:
        """Instantiate a converter with a ruleset for pre-processing.

        :param args: Positional arguments passed to :meth:`curies.Converter.__init__`
        :param rules: A set of rules, or the path to a JSON file containing them
        :param reference_cls: The reference class to use. Defaults to
            :class:`curies.Reference`.
        :param preclean: An optional function used to preprocess strings, CURIEs, and
            URIs before parsing
        :param kwargs: Keyword arguments passed to :meth:`curies.Converter.__init__`
        """
        super().__init__(*args, **kwargs)
        self.rules = _load_rules(rules)
        self._reference_cls = Reference if reference_cls is None else reference_cls
        self._preclean = preclean if preclean is not None else _identity

    @classmethod
    def from_converter(
        cls, converter: Converter, rules: PreprocessingRules | str | Path, **kwargs: Any
    ) -> Self:
        """Wrap a converter with a ruleset.

        :param converter: A pre-instantiated converter
        :param rules: A pre-processing rules object, or path to a JSON file containing a
            pre-processing configuration
        :param kwargs: Additional keyword arguments forwarded to the constructor,
            e.g., ``reference_cls`` or ``preclean``
        :returns: A converter that uses the rules for pre-processing when parsing URIs
            and CURIEs.
        """
        return cls(records=converter.records, rules=rules, **kwargs)

    def _preprocess(
        self, text: str, *, context: str | None, block_action: BlockAction
    ) -> tuple[str | None, ReferenceTuple | None]:
        """Run the pre-processing steps shared by all parsing methods.

        :param text: The raw string, CURIE, or URI
        :param context: An optional context for the rewrite and blocklist rules
        :param block_action: What to do if the string is blocked
        :returns: A pair ``(text, result)``. If ``text`` is not ``None``, it is the
            pre-processed string that still has to be parsed. If ``text`` is ``None``,
            parsing must be skipped and ``result`` returned to the caller: it is the
            reference pair of a full rewrite, or ``None`` if the string was blocked
            and ``block_action`` is not ``"raise"``.
        :raises BlocklistError: If the string is blocked and ``block_action`` is
            ``"raise"``
        """
        text = self._preclean(text)
        remapped = self.rules.remap_full(text, reference_cls=self._reference_cls, context=context)
        if remapped:
            # A full rewrite short-circuits prefix rewriting, blocklists, and parsing
            return None, remapped.pair
        # Remap node's prefix (if necessary)
        text = self.rules.remap_prefix(text, context=context)
        if self.rules.str_is_blocked(text, context=context):
            if block_action == "raise":
                raise BlocklistError(f"blocked by pre-processing rules: {text!r}")
            return None, None
        return text, None

    def _post_process(self, rt: ReferenceTuple | None) -> ReferenceTuple | None:
        """Strip the first matching configured suffix from the identifier.

        :param rt: A parsed reference pair, or ``None``
        :returns: ``None`` if given ``None``; otherwise the pair, with the first
            suffix configured for its prefix that matches the end of the identifier
            removed
        """
        if rt is None:
            return None
        for suffix in self.rules.postprocessing.suffix.get(rt.prefix, ()):
            if rt.identifier.endswith(suffix):
                return ReferenceTuple(
                    prefix=rt.prefix, identifier=rt.identifier.removesuffix(suffix)
                )
        return rt

    # NOTE(review): this overload defaults ``strict`` to True while the implementation
    # defaults it to False - confirm which overload an argument-less call should match
    # docstr-coverage:excused `overload`
    @overload
    def parse(
        self,
        str_or_uri_or_curie: str,
        *,
        strict: Literal[True] = True,
        context: str | None = ...,
        block_action: BlockAction = ...,
    ) -> ReferenceTuple: ...

    # docstr-coverage:excused `overload`
    @overload
    def parse(
        self,
        str_or_uri_or_curie: str,
        *,
        strict: Literal[False] = False,
        context: str | None = ...,
        block_action: BlockAction = ...,
    ) -> ReferenceTuple | None: ...

    def parse(
        self,
        str_or_uri_or_curie: str,
        *,
        strict: bool = False,
        context: str | None = None,
        block_action: BlockAction = "raise",
    ) -> ReferenceTuple | None:
        """Parse a string, CURIE, or URI.

        :param str_or_uri_or_curie: The string, CURIE, or URI to parse and standardize
        :param strict: Passed on to the parent converter's ``parse``
        :param context: Is there a context, e.g., an ontology prefix that should be
            applied to the remapping and blocklist rules?
        :param block_action: What action should be taken when the blocklist is invoked?

            - **raise** - raise an exception
            - **pass** - return ``None``
        :returns: A tuple representing the parsed and standardized input, or ``None``
            if it was blocked and ``block_action`` is ``"pass"``
        :raises BlocklistError: If the input is blocked and ``block_action`` is
            ``"raise"``
        """
        text, early_result = self._preprocess(
            str_or_uri_or_curie, context=context, block_action=block_action
        )
        if text is None:
            return early_result
        rv = super().parse(text, strict=strict)  # type:ignore[call-overload]
        return self._post_process(rv)

    # docstr-coverage:excused `overload`
    @overload
    def parse_curie(
        self,
        curie: str,
        *,
        strict: Literal[False] = False,
        context: str | None = ...,
        block_action: BlockAction = ...,
    ) -> ReferenceTuple | None: ...

    # docstr-coverage:excused `overload`
    @overload
    def parse_curie(
        self,
        curie: str,
        *,
        strict: Literal[True] = True,
        context: str | None = ...,
        block_action: BlockAction = ...,
    ) -> ReferenceTuple: ...

    def parse_curie(
        self,
        curie: str,
        *,
        strict: bool = False,
        context: str | None = None,
        block_action: BlockAction = "raise",
    ) -> ReferenceTuple | None:
        """Parse and standardize a CURIE.

        :param curie: The CURIE to parse and standardize
        :param strict: If the CURIE can't be parsed, should an error be thrown? Defaults
            to false.
        :param context: Is there a context, e.g., an ontology prefix that should be
            applied to the remapping and blocklist rules?
        :param block_action: What action should be taken when the blocklist is invoked?

            - **raise** - raise an exception
            - **pass** - return ``None``
        :returns: A tuple representing a parsed and standardized CURIE
        :raises BlocklistError: If the CURIE is blocked
        """
        text, early_result = self._preprocess(curie, context=context, block_action=block_action)
        if text is None:
            return early_result
        rv = super().parse_curie(text, strict=strict)  # type:ignore[call-overload]
        return self._post_process(rv)

    # docstr-coverage:excused `overload`
    @overload
    def parse_uri(
        self,
        uri: str,
        *,
        strict: Literal[False] = ...,
        return_none: Literal[False] = ...,
        context: str | None = ...,
        block_action: BlockAction = ...,
    ) -> Never: ...

    # docstr-coverage:excused `overload`
    @overload
    def parse_uri(
        self,
        uri: str,
        *,
        strict: Literal[False] = ...,
        return_none: Literal[True] | None = ...,
        context: str | None = ...,
        block_action: BlockAction = ...,
    ) -> ReferenceTuple | None: ...

    # docstr-coverage:excused `overload`
    @overload
    def parse_uri(
        self,
        uri: str,
        *,
        strict: Literal[True] = True,
        return_none: bool | None = ...,
        context: str | None = ...,
        block_action: BlockAction = ...,
    ) -> ReferenceTuple: ...

    def parse_uri(
        self,
        uri: str,
        *,
        strict: bool = False,
        return_none: bool | None = None,
        context: str | None = None,
        block_action: BlockAction = "raise",
    ) -> ReferenceTuple | None:
        """Parse and standardize a URI.

        :param uri: The URI to parse and standardize
        :param strict: If the URI can't be parsed, should an error be thrown? Defaults
            to false.
        :param return_none: A dummy value, do not use. If given as False, will raise a
            not implemented error
        :param context: Is there a context, e.g., an ontology prefix that should be
            applied to the remapping and blocklist rules?
        :param block_action: What action should be taken when the blocklist is invoked?

            - **raise** - raise an exception
            - **pass** - return ``None``
        :returns: A tuple representing a parsed and standardized URI
        :raises BlocklistError: If the URI is blocked
        :raises NotImplementedError: If return_none is given as False
        """
        # Rejected before any pre-processing takes place
        if return_none is False:
            raise NotImplementedError(RETURN_NONE_ERROR_TEXT)
        text, early_result = self._preprocess(uri, context=context, block_action=block_action)
        if text is None:
            return early_result
        rv: ReferenceTuple | None = super().parse_uri(text, strict=strict)  # type:ignore[call-overload]
        return self._post_process(rv)