Communications
UPSTAGE provides a built-in method for passing communications between actors. The CommsManager class lets actors send messages and simplifies retry attempts and timeouts. It also allows communications blocking to be turned on and off on a point-to-point basis.
The Message class is used to describe a message, although strings and dictionaries can also be passed as messages, and UPSTAGE will convert them into the Message class.
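The conversion described above can be pictured with a small, library-free sketch. It is not UPSTAGE's implementation: `SketchMessage` and `to_message` are hypothetical names invented for illustration, and the real Message class may carry different fields.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class SketchMessage:
    """Hypothetical stand-in for a message record (not UPSTAGE's Message)."""

    sender: str
    destination: str
    content: Any


def to_message(raw: Any, sender: str, destination: str) -> SketchMessage:
    """Wrap strings and dicts in a message; pass existing messages through."""
    if isinstance(raw, SketchMessage):
        return raw
    if isinstance(raw, (str, dict)):
        return SketchMessage(sender=sender, destination=destination, content=raw)
    raise TypeError(f"Unsupported message type: {type(raw).__name__}")


# A plain string is wrapped; an already-built message is returned unchanged.
wrapped = to_message("Hello worker", "worker1", "worker2")
same = to_message(wrapped, "worker1", "worker2")
```

The point of normalizing at the boundary is that receiving code can always rely on one message shape, whatever the sender passed in.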
The communications manager needs to be instantiated and run. Any number of managers can run at once, each representing a different mode of communication. The following code shows how to create an actor class that has two communication interfaces, and then start the necessary comms managers.
import upstage.api as UP

class Worker(UP.Actor):
    # One receiving store per communication mode
    walkie = UP.CommunicationStore(mode="UHF")
    intercom = UP.CommunicationStore(mode="loudspeaker")

with UP.EnvironmentContext() as env:
    w1 = Worker(name="worker1")
    w2 = Worker(name="worker2")

    # One manager per mode; the mode matches the stores declared on Worker
    uhf_comms = UP.CommsManager(name="Walkies", mode="UHF")
    loudspeaker_comms = UP.CommsManager(name="Overhead", mode="loudspeaker")

    # Make the managers reachable from tasks through the stage
    UP.add_stage_variable("uhf", uhf_comms)
    UP.add_stage_variable("loudspeaker", loudspeaker_comms)

    uhf_comms.run()
    loudspeaker_comms.run()
The CommsManager class allows for explicitly connecting actors and the store that will receive messages, but using the CommunicationStore lets the manager auto-discover the proper store for a communications mode, so the simulation designer only needs to pass the source actor, destination actor, and message information to the manager.
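As a rough illustration of what mode-based auto-discovery means, the sketch below looks through an actor's attributes for a store whose mode matches the one requested. Everything here (`SketchStore`, `SketchActor`, `find_store`, `deliver`) is hypothetical and written without UPSTAGE; how the library actually locates stores is not described in this section.

```python
class SketchStore:
    """Hypothetical receiving store tagged with a communication mode."""

    def __init__(self, mode):
        self.mode = mode
        self.items = []


class SketchActor:
    """Hypothetical actor with one store per communication mode."""

    def __init__(self, name):
        self.name = name
        self.walkie = SketchStore(mode="UHF")
        self.intercom = SketchStore(mode="loudspeaker")


def find_store(actor, mode):
    """Return the actor's store whose mode matches, or raise if none does."""
    for value in vars(actor).values():
        if isinstance(value, SketchStore) and value.mode == mode:
            return value
    raise LookupError(f"{actor.name} has no store for mode '{mode}'")


def deliver(message, destination, mode):
    """Put the message into whichever store handles the given mode."""
    find_store(destination, mode).items.append(message)


w2 = SketchActor("worker2")
deliver("Hello worker", w2, mode="UHF")  # lands in w2.walkie, not w2.intercom
```

Because the lookup is keyed on the mode, the caller never names the store attribute, which is why only the source, destination, and message need to be supplied.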
To send a message, use the comms manager’s make_put method, which returns an UPSTAGE event to yield on to send the message.
class Talk(UP.Task):
    def task(self, *, actor: Worker):
        uhf = self.stage.uhf
        friend = self.get_actor_knowledge(actor, "friend", must_exist=True)
        # make_put returns an event; yielding on it sends the message
        msg_evt = uhf.make_put("Hello worker", actor, friend)
        yield msg_evt

class GetMessage(UP.Task):
    def task(self, *, actor: Worker):
        # Listen on both stores declared on Worker (walkie and intercom)
        get_uhf = UP.Get(actor.walkie)
        get_loud = UP.Get(actor.intercom)

        yield UP.Any(get_uhf, get_loud)

        if get_uhf.is_complete():
            msg = get_uhf.get_value()
            print(f"{msg.sender} sent '{msg.message}' at {msg.time_sent}")
        else:
            # The loudspeaker message arrived first; stop waiting on UHF
            get_uhf.cancel()
        ...
Stopping Communications
Communications can be halted for all transmissions of a single manager by setting comms_degraded to True at any time. Setting it back to False will allow comms to pass again, and any retries that are waiting (and have not exceeded a timeout) will go through.
Additionally, specific links can be stopped by adding (sender_actor, destination_actor) tuples to blocked_links, and restored by removing them. The same timeout rules apply.
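The interaction between blocking, retries, and timeouts can be sketched without the library. The model below is an assumption-laden toy: `SketchManager`, `can_pass`, and `try_send` are hypothetical names, `blocked_links` is modeled as a plain list, and retries are assumed to happen at a fixed interval. UPSTAGE's actual retry scheduling is not specified in this section.

```python
class SketchManager:
    """Hypothetical stand-in for a comms manager's blocking state."""

    def __init__(self):
        self.comms_degraded = False  # True halts every transmission
        self.blocked_links = []      # (sender, destination) tuples

    def can_pass(self, sender, destination):
        """A message passes only if comms are up and the link is not blocked."""
        if self.comms_degraded:
            return False
        return (sender, destination) not in self.blocked_links


def try_send(manager, sender, destination, retry_interval, timeout, update):
    """Retry at a fixed interval; return elapsed time on success, else None.

    `update(manager, elapsed)` lets a scenario change the blocking state
    between attempts (assumed mechanism, for illustration only).
    """
    elapsed = 0.0
    while elapsed <= timeout:
        update(manager, elapsed)
        if manager.can_pass(sender, destination):
            return elapsed
        elapsed += retry_interval
    return None


link = ("worker1", "worker2")
manager = SketchManager()
manager.blocked_links.append(link)


def restore_link(mgr, elapsed):
    # Scenario: the link is unblocked three time units after the first attempt.
    if elapsed >= 3.0 and link in mgr.blocked_links:
        mgr.blocked_links.remove(link)


# Waiting retry goes through once the link is restored, inside the timeout.
delivered_after = try_send(manager, *link, 1.0, 5.0, restore_link)

# With comms degraded for longer than the timeout, the send gives up.
down = SketchManager()
down.comms_degraded = True
gave_up = try_send(down, *link, 1.0, 2.0, lambda mgr, elapsed: None)
```

Note that a blocked link in this sketch is directional: blocking ("worker1", "worker2") leaves messages from worker2 to worker1 unaffected.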