Source code for ensemble.logic.publisher

"""
Publisher 
===========
This module dedicates a generic object to generate a publisher pattern implementation responsible of specific methods.
"""

# ============================================================================
# STANDARD  IMPORTS
# ============================================================================

from ensemble.metaclass.subject import AbsSubject

# ============================================================================
# CLASS AND DEFINITIONS
# ============================================================================


[docs]class Publisher(AbsSubject): """This generic class model implements a general publisher pattern to broadcast information towards different subscribers. Subscribers are intented to be objects such as vehicles or other objects that should be aware of the publisher. In particular this creates a subject that can notify to a specific channel where subscribers are registered. Example: Create a DataQuery for 2 type of channels, ``automated`` and ``regular`` and perform a subscription :: >>> channels = ('auto','regular') >>> p = Publisher(channels) >>> s = Subscriber(p,'auto') # Registers a s into p """ def __init__(self, channels=("default",)): # maps event names to subscribers # str -> dict # self._channels = {channel: {} for channel in channels} self._channels = {channel: set() for channel in channels} def __repr__(self): return f"{self.__class__.__name__}({self.channels})" @property def channels(self): return tuple(self._channels.keys())
[docs] def get_subscribers(self, channel): """Retreive subscribers in a particular channel""" return self._channels[channel]
[docs] def attach(self, observer, channel: str, callback=None): """Attach a new observer to a specific channel,, one can specify a method of the class to be called. Args: channel(str): channel name observer(observer): observer object callback(callable): method to be executed when publisher notifies. """ self._channels[channel].add(observer)
# Dictionary implementation has an issue # Old alternative: Not working since dictionary keys are immutable # if callback == None: # callback = getattr(observer, "update") # self.get_subscribers(channel)[observer] = callback
[docs] def detach(self, observer, channel: str): """Detach observer from the subject Args: channel(str): channel name observer(observer): observer object callback(callable): method to be executed when publisher notifies. """ # del self.get_subscribers(channel)[observer] self.get_subscribers(channel).remove(observer)
[docs] def dispatch(self, channel: str = "default", callback: str = "update"): """Dispatches a message to a specific channel Args: channel(str): channel name """ for obj in self.get_subscribers(channel): getattr(obj, callback)() # Every object needs an update method
# for _, callback in self.get_subscribers(channel).items(): # callback()
[docs] def foo(self): """ Demo function""" print("Hello world")