Search Framework

Search Framework

Abstract Storage

from abc import ABC, abstractmethod
from typing import Any, Iterable, Optional

class Entity:
    """Base record type that carries only an integer identifier.

    ``__slots__`` keeps instances free of a per-instance ``__dict__``.
    ``__weakref__`` is listed explicitly: a class that defines ``__slots__``
    without it cannot be weakly referenced, so a plain ``Entity`` could not be
    placed in a weak-value container such as ``weakref.WeakValueDictionary``
    (doing so raises ``TypeError``).
    """

    __slots__ = ['id', '__weakref__']

    def __init__(self):
        # Starts out unset; a storage backend is expected to assign it.
        self.id: Optional[int] = None


class Storage(ABC):
    """Abstract contract for a record store addressed by integer id.

    Every method is abstract; the base bodies do nothing and return ``None``.
    """

    @abstractmethod
    def save(self, record: Entity) -> int:
        """Store ``record`` and return the integer id it is kept under."""

    @abstractmethod
    def read(self, id: int) -> Entity:
        """Return the record kept under ``id``."""

    @abstractmethod
    def update(self, id: int, record: Entity) -> Entity:
        """Replace the record kept under ``id`` with ``record`` and return it."""

    @abstractmethod
    def delete(self, id: int) -> bool:
        """Remove the record kept under ``id``; the result reports success."""

    @abstractmethod
    def get_all(self) -> Iterable[Entity]:
        """Return an iterable over every stored record."""


class Indexer(ABC):
    """Abstract contract for attribute-based lookup over records.

    Every method is abstract; the base bodies do nothing and return ``None``.
    """

    @abstractmethod
    def create_index(self, attr_name: str):
        """Begin maintaining an index over the attribute ``attr_name``."""

    @abstractmethod
    def update_index(self, record: Entity):
        """Reflect a newly added or changed ``record`` in the indexes."""

    @abstractmethod
    def remove_from_index(self, record: Entity):
        """Drop ``record`` from the indexes."""

    @abstractmethod
    def get_by_index(self, attr_name: str, attr_value: Any) -> Iterable[Entity]:
        """Look up entries whose ``attr_name`` equals ``attr_value``."""

Concrete Storage Implementation

import itertools
from threading import RLock
from typing import Any, Iterable, Optional
from weakref import WeakValueDictionary

from search.persistence.storage import Indexer, Storage, Entity

class InMemoryIndexer(Indexer):
    """In-memory equality index over selected record attributes.

    For every indexed attribute the indexer keeps a mapping from attribute
    value to the set of record ids that hold that value. Only ids are stored,
    so lookups yield ids rather than ``Entity`` objects. All public methods
    run under a re-entrant lock.

    Attribute values must be hashable; an unhashable value raises
    ``TypeError``. Creating an index does not back-fill it: only records
    passed to :meth:`update_index` afterwards appear in it.
    """

    def __init__(self, *attr_names: str):
        """Create the indexer and one empty index per name in ``attr_names``."""
        # attr_name -> {attr_value -> {record ids}}
        self._indexes = {}
        # record id -> {attr_name -> value the id is currently filed under}.
        # Needed because a record may have been mutated since it was indexed,
        # in which case its current attributes no longer name the old bucket.
        self._filed_values = {}
        self._lock = RLock()

        for attr_name in attr_names:
            self.create_index(attr_name)

    def create_index(self, attr_name: str):
        """Create an empty index for ``attr_name`` unless one already exists."""
        with self._lock:
            self._indexes.setdefault(attr_name, {})

    def update_index(self, record: Entity):
        """Re-file ``record`` under its current attribute values.

        Any previous entries for ``record.id`` are removed first, so a changed
        attribute does not leave the id behind in the old value's bucket.
        Attributes that are missing or ``None`` are not indexed.
        """
        with self._lock:
            self._unfile(record.id)
            filed = {}
            for attr_name, index in self._indexes.items():
                value = getattr(record, attr_name, None)
                if value is None:
                    continue
                index.setdefault(value, set()).add(record.id)
                filed[attr_name] = value
            if filed:
                self._filed_values[record.id] = filed

    def remove_from_index(self, record: Entity):
        """Remove every index entry recorded for ``record.id``."""
        with self._lock:
            self._unfile(record.id)

    def get_by_index(self, attr_name: str, attr_value: Any) -> Iterable[int]:
        """Return the ids of records filed under ``attr_value`` for ``attr_name``.

        The result is a fresh ``set`` (empty when the attribute is not indexed
        or the value is unknown), so callers can iterate or modify it without
        touching the index itself.
        """
        with self._lock:
            bucket = self._indexes.get(attr_name, {}).get(attr_value)
            return set(bucket) if bucket else set()

    def _unfile(self, record_id):
        """Drop ``record_id`` from every bucket it was last filed under.

        Caller must hold ``self._lock``.
        """
        for attr_name, value in self._filed_values.pop(record_id, {}).items():
            index = self._indexes.get(attr_name)
            if index is None:
                continue
            bucket = index.get(value)
            if bucket is None:
                continue
            bucket.discard(record_id)
            if not bucket:
                # Do not let empty buckets accumulate for values no longer in use.
                del index[value]


class InMemoryStorage(Storage):
    """Dictionary-backed :class:`Storage` guarded by a re-entrant lock.

    Records are held by strong reference until :meth:`delete` removes them.
    Ids are taken from a counter that starts at 1 and only moves forward.
    """

    def __init__(self):
        # A plain dict keeps records alive. A weak-value mapping would drop a
        # record as soon as the caller released its own reference, and cannot
        # hold objects that do not support weak references.
        self._container = {}
        self._lock = RLock()
        self._id_generator = itertools.count(start=1)

    def save(self, record: Entity) -> int:
        """Assign the next id to ``record``, store it, and return that id."""
        with self._lock:
            record.id = next(self._id_generator)
            self._container[record.id] = record
            return record.id

    def read(self, id: int) -> Entity:
        """Return the record stored under ``id``.

        Raises:
            ValueError: if no record is stored under ``id``.
        """
        with self._lock:
            try:
                return self._container[id]
            except KeyError:
                raise ValueError(f"Record with id {id} not found") from None

    def update(self, id: int, record: Entity) -> Entity:
        """Replace the record stored under ``id`` with ``record`` and return it.

        ``record.id`` is set to ``id`` so the stored object always agrees with
        the key it is stored under.

        Raises:
            ValueError: if no record is stored under ``id``.
        """
        with self._lock:
            if id not in self._container:
                raise ValueError(f"Cannot update. Record with id {id} not found")
            record.id = id
            self._container[id] = record
            return record

    def delete(self, id: int) -> bool:
        """Remove the record stored under ``id``; return whether one existed."""
        with self._lock:
            return self._container.pop(id, None) is not None

    def get_all(self) -> Iterable[Entity]:
        """Return an iterator over a snapshot of the stored records.

        The snapshot is taken while the lock is held, so later saves or
        deletes cannot invalidate an iteration that is already in progress.
        """
        with self._lock:
            return iter(list(self._container.values()))


class StorageWithIndex(InMemoryStorage, InMemoryIndexer):
    """In-memory storage that keeps attribute indexes in step with its records.

    Each mutating operation changes the storage and the indexes while holding
    the lock, so other threads using the same lock never observe one updated
    without the other.
    """

    def __init__(self, *attr_names: str):
        """Set up the storage and one index per name in ``attr_names``."""
        # Neither base __init__ chains through super(), so both are invoked
        # explicitly. Without them _container, _indexes and _lock are never
        # created and the first create_index() call fails with AttributeError.
        # Both bases keep their lock in the same ``_lock`` attribute, so the
        # storage and the indexer end up sharing a single re-entrant lock.
        InMemoryStorage.__init__(self)
        InMemoryIndexer.__init__(self, *attr_names)

    def save(self, record: Entity) -> int:
        """Store ``record``, index it, and return its id."""
        with self._lock:
            record_id = super().save(record)
            self.update_index(record)
            return record_id

    def update(self, id: int, record: Entity) -> Entity:
        """Replace the record under ``id``, re-index it, and return it.

        Raises:
            ValueError: if no record is stored under ``id``.
        """
        with self._lock:
            previous = self._container.get(id)
            stored = super().update(id, record)  # raises ValueError for an unknown id
            if previous is not None:
                # Clear the entries of the record being replaced before filing
                # the new one, otherwise its values stay behind in the index.
                self.remove_from_index(previous)
            self.update_index(stored)
            return stored

    def delete(self, id: int) -> bool:
        """Delete the record under ``id`` and its index entries.

        Returns ``False`` when no record is stored under ``id``.
        """
        with self._lock:
            try:
                record = super().read(id)
            except ValueError:
                return False
            removed = super().delete(id)
            if removed:
                self.remove_from_index(record)
            return removed

    def get_by_index(self, attr_name: str, attr_value: Any) -> Iterable[Entity]:
        """Return the stored records whose ``attr_name`` equals ``attr_value``.

        The indexer only knows record ids; they are resolved to the stored
        records here. Ids that no longer resolve, and records whose attribute
        has changed since they were indexed, are left out.
        """
        with self._lock:
            matches = []
            for record_id in super().get_by_index(attr_name, attr_value):
                record = self._container.get(record_id)
                if record is None:
                    continue
                if getattr(record, attr_name, None) == attr_value:
                    matches.append(record)
            return matches

Query Utils

from abc import ABC, abstractmethod
from typing import Any
from enum import Enum, auto

from search.persistence.storage import Entity

class Operator(Enum):
    """Comparison operators that can be applied to a pair of operands."""

    EQ = auto()
    GT = auto()
    GTE = auto()
    LT = auto()
    LTE = auto()

    def apply(self, op1: Any, op2: Any) -> bool:
        """Return the result of comparing ``op1`` with ``op2`` using this operator.

        ``op1`` is the left-hand operand. Whatever the underlying comparison
        raises (for example ``TypeError`` for unorderable operands) propagates.

        Raises:
            ValueError: if this member has no comparison associated with it.
        """
        if self is Operator.EQ:
            return op1 == op2
        if self is Operator.GT:
            return op1 > op2
        if self is Operator.GTE:
            return op1 >= op2
        if self is Operator.LT:
            return op1 < op2
        if self is Operator.LTE:
            return op1 <= op2
        raise ValueError(f"Unsupported operator: {self}")


class Query(ABC):
    """Abstract condition that a record either satisfies or does not."""

    @abstractmethod
    def evaluate(self, record: Entity) -> bool:
        """Report whether ``record`` satisfies this query."""


class Predicate(Query):
    """Leaf query that compares one record attribute with a fixed value."""

    def __init__(self, name: str, value: Any, operator: Operator):
        """Remember the attribute name, the comparison value and the operator."""
        self.attr_name, self.attr_value, self.operator = name, value, operator

    def evaluate(self, record: Entity) -> bool:
        """Apply the operator to the record's attribute and the stored value.

        The record's attribute is the left-hand operand. A record whose
        attribute is missing or ``None`` never matches.
        """
        actual = getattr(record, self.attr_name, None)
        if actual is not None:
            return self.operator.apply(actual, self.attr_value)
        return False


class CompositeQuery(Query):
    """Base class for queries that combine several child queries.

    It does not implement ``evaluate`` and therefore stays abstract;
    subclasses decide how the children's results are combined.
    """

    def __init__(self, *predicates: Query):
        """Keep the child queries, in the order given, as a tuple."""
        # Annotated as Query rather than Predicate: children are only used
        # through ``evaluate``, so another composite is a valid child too.
        self.predicates = predicates


class And(CompositeQuery):
    """Composite that matches only when every child query matches."""

    def evaluate(self, record: Entity) -> bool:
        """Return ``True`` unless some child rejects ``record``.

        Children are tried in order and evaluation stops at the first
        rejection; with no children the result is ``True``.
        """
        for child in self.predicates:
            if not child.evaluate(record):
                return False
        return True


class Or(CompositeQuery):
    """Composite that matches as soon as any child query matches."""

    def evaluate(self, record: Entity) -> bool:
        """Return ``True`` if some child accepts ``record``.

        Children are tried in order and evaluation stops at the first
        acceptance; with no children the result is ``False``.
        """
        for child in self.predicates:
            if child.evaluate(record):
                return True
        return False

Search Engine

import itertools
from typing import Any, Callable, Iterable
from search.persistence.storage import Entity, Storage
from search.query.criteria import Query


class Paginator:
    """Cuts an iterable of records into fixed-size, 1-based pages."""

    def __init__(self, records: Iterable[Entity], page_size: int):
        """Keep a reference to ``records`` (not copied) and the page size."""
        self.page_size = page_size
        self.records = records

    def get_page(self, page_number: int) -> Iterable[Entity]:
        """Return a lazy slice holding the records of page ``page_number``.

        Page 1 starts at the first record. The slice is taken directly from
        ``self.records``, so when that is a one-shot iterator, consuming a
        page also consumes the underlying iterator.
        """
        first = self.page_size * (page_number - 1)
        return itertools.islice(self.records, first, first + self.page_size)


class SearchEngine:
    """Filtering, sorting and paging helpers on top of a ``Storage``."""

    def __init__(self, storage: Storage):
        """Remember the storage whose records will be searched."""
        self.storage = storage

    def filter(self, query: Query) -> Iterable[Entity]:
        """Lazily yield the stored records for which ``query.evaluate`` is truthy.

        Every record returned by ``storage.get_all()`` is examined.
        """
        candidates = self.storage.get_all()
        return filter(lambda rec: query.evaluate(rec), candidates)

    # def indexed_filter(self, attr_name: str, attr_value: Any) -> Iterable[Entity]:
    #     return self.storage.get_by_index(attr_name, attr_value)

    def sort(self, records: Iterable[Entity], key: Callable[[Entity], Any], reverse: bool = False) -> Iterable[Entity]:
        """Return a new list of ``records`` ordered by ``key``.

        The sort is stable; ``reverse=True`` gives descending order.
        """
        ordered = list(records)
        ordered.sort(key=key, reverse=reverse)
        return ordered

    def page(self, records: Iterable[Entity], page_size: int, page_no: int) -> Iterable[Entity]:
        """Return page ``page_no`` of ``records``, ``page_size`` records per page.

        Delegates to ``Paginator``.
        """
        return Paginator(records, page_size).get_page(page_no)