Task Nucleus#

UPSTAGE provides a more advanced signal-passing interface between tasks and actors with the Nucleus.

A Nucleus can be attached to an Actor, and States linked to the Nucleus. When one of those states changes, an interrupt will be sent to the relevant task networks.

The basic syntax is this:

# Create the nucleus object, attached to an actor
nuc = UP.TaskNetworkNucleus(actor=actor)
# From some task network:
task_net = task_net_factory.make_network()
# Add it to the actor
actor.add_task_network(task_net)
# Tell the nucleus object that this network changes if
# the state names given change
nuc.add_network(task_net, ["state name to watch", "other state"])

When any state given to the nucleus changes, nucleus pushes an interrupt to the task network. That interrupt is passed down as a cause to on_interrupt as an instance of type NucleusInterrupt.

class SomeTask(UP.Task):
    def task(...):
        ...

    def on_interrupt(self, *, actor, cause):
        if isinstance(cause, UP.NucleusInterrupt):
            state_that_changed: str = cause.state_name
            state_value: Any = cause.state_value

From there, you can decide how to handle the interrupt in the usual manner.

Here is a complete example showing the interaction. The actor has some number that defines the time it takes to change the number. Once the number changes, the nucleus interrupts the other task network, and the restarting task increments the results state.

 1class NumberHolder(UP.Actor):
 2    number = UP.State()
 3    results = UP.State(default=0)
 4
 5
 6class NumberChanger(UP.Task):
 7    def task(self, *, actor):
 8        yield UP.Wait(actor.number)
 9        print(f"{self.env.now:.2f}: About to change number")
10        actor.number += 1
11
12
13class InterruptedByNucleus(UP.Task):
14    def task(self, *, actor):
15        # Yielding an event makes the task halt forever EXCEPT
16        # when interrupted.
17        yield UP.Event()
18
19    def on_interrupt(self, *, actor, cause):
20        print(f"{self.env.now:.2f}: Interrupted with cause: {cause}")
21        actor.results += 1
22        return self.INTERRUPT.RESTART
23
24
25fact = UP.TaskNetworkFactory(
26    "changer",
27    {"Runner": NumberChanger},
28    {"Runner": {"default": "Runner", "allowed": ["Runner"]}},
29)
30
31fact2 = UP.TaskNetworkFactory(
32    "interrupted",
33    {"Side": InterruptedByNucleus},
34    {"Side": {"default": "Side", "allowed": ["Side"]}},
35)
36
37
38with UP.EnvironmentContext() as env:
39    actor = NumberHolder(
40        name="example",
41        number=3,
42        results=0,
43    )
44    nuc = UP.TaskNetworkNucleus(actor=actor)
45
46    task_net = fact.make_network()
47    actor.add_task_network(task_net)
48
49    task_net_2 = fact2.make_network()
50    actor.add_task_network(task_net_2)
51    nuc.add_network(task_net_2, ["number"])
52
53    actor.start_network_loop("changer", init_task_name="Runner")
54    actor.start_network_loop("interrupted", init_task_name="Side")
55
56    env.run(until=25)
57    print(f"Number of nucleus interrupts: {actor.results}")
58
59>>> 3.00: About to change number
60>>> 3.00: Interrupted with cause: NucleusInterrupt: number 4
61>>> 7.00: About to change number
62>>> 7.00: Interrupted with cause: NucleusInterrupt: number 5
63>>> 12.00: About to change number
64>>> 12.00: Interrupted with cause: NucleusInterrupt: number 6
65>>> 18.00: About to change number
66>>> 18.00: Interrupted with cause: NucleusInterrupt: number 7
67>>> Number of nucleus interrupts: 4

To note:

  • Line 51: The nucelus is watching for number to change the task network that has the InterruptedByNucleus task.

  • Line 57: Results will increment every time the interrupt runs.

Nucleus and Rehearsal#

Nucleus state watchers are not transferred when an Actor is cloned for rehearsal. When a rehearsing actor has a state change, it does not affect any other networks or the original Actor. Rehearsal only works on a single task network anyway, and so a nucleus rehearsal wouldn’t make sense at this point in UPSTAGE’s development.

State Sharing with Nucleus#

A use case for Nucleus is when multiple task networks are sharing a single state and modifying their processing based on that state. This has one key difficulty, which is that a task cannot interrept itself. If a TaskNetwork changes a state that it is watching, SimPy will fail. It

What follows is an example that implements a nucleus allocation. This is not recommended, but is included to demonstrate how far you can stretch Nucleus and UPSTAGE. Ultimately, it is just running on SimPy and you can do what you like. Here are some issues/caveats with the following example:

  • None of the tasks are rehearsal-safe (this is OK if you’re not going to rehearse)

  • Adding nucleus variables/networks within the network that uses them buries the interaction and increases the risk of bugs.

    • It’s preferable to define all Nucleus interactions near Actor instantiation for readability

    • In the future, it’d be better to have deeper conditions/information in the nucleus.

  • Using a DecisionTask helps avoid an if statement in the CPUProcess task to add the network to the nucleus

  • The business logic of the task is overpowered by assistance code, which UPSTAGE tries to avoid as much as possible.

# Copyright (C) 2024 by the Georgia Tech Research Institute (GTRI)

# Licensed under the BSD 3-Clause License.
# See the LICENSE file in the project root for complete license terms and disclaimers.

import simpy as SIM

import upstage.api as UP
from upstage.type_help import SIMPY_GEN, TASK_GEN


class CPU(UP.Actor):
    n_procs = UP.State[int](default=0, valid_types=int, recording=True)
    jobs = UP.ResourceState[SIM.Store](default=SIM.Store)

    @staticmethod
    def time_from_data(process_data: dict[str, float]) -> float:
        best_time = process_data["best time"]
        percent_complete = process_data["percent complete"]
        alloc = process_data["allocated"]
        work_left = best_time * (1 - percent_complete)
        time = work_left / alloc
        return time

    @staticmethod
    def left_from_partial(
        process_data: dict[str, float], start_time: float, curr_time: float
    ) -> float:
        alloc = process_data["allocated"]
        work_left = process_data["best time"] * (1 - process_data["percent complete"])
        elapsed = curr_time - start_time
        percent_of_left = 1 - (elapsed / (work_left / alloc))
        percent_left = (1 - process_data["percent complete"]) * percent_of_left
        return percent_left


class CPUProcessStart(UP.DecisionTask):
    def get_name(self) -> str:
        return f"{self._network_name}"

    def make_decision(self, *, actor: CPU) -> None:
        knowledge_name = self.get_name()
        process_data: dict[str, float] = self.get_actor_knowledge(
            actor, knowledge_name, must_exist=True
        )
        # the task takes some amount of time to finish based on its cpu amount
        assert process_data["percent complete"] == 0.0
        # Touch the nucleus variable before it affects this network
        actor.n_procs += 1
        # Now we can add ourselves to the nucleus
        nucleus = actor.get_nucleus()
        nucleus.add_network(knowledge_name, ["n_procs"])


class CPUProcess(UP.Task):
    def get_name(self) -> str:
        return f"{self._network_name}"

    def task(self, *, actor: CPU) -> TASK_GEN:
        knowledge_name = self.get_name()
        process_data: dict[str, float] = self.get_actor_knowledge(
            actor, knowledge_name, must_exist=True
        )
        # We know at this point we're part of the n_procs
        allocate_amount = 1 / (actor.n_procs)
        process_data["allocated"] = allocate_amount
        self.set_actor_knowledge(actor, knowledge_name, process_data, overwrite=True)
        self.set_marker("RUNNING")
        time = actor.time_from_data(process_data)
        print(
            f"{self.env.now:.2f}: Starting: {knowledge_name}\n\tAllocated: {allocate_amount:.2f}"
            f"\n\tTime Left: {time:.2f}"
        )
        yield UP.Wait(time)
        self.clear_actor_knowledge(actor, knowledge_name)
        actor.get_nucleus().remove_network(self.get_name())
        actor.n_procs -= 1
        print(f"{self.env.now:.2f}: Done with: {knowledge_name}")

    def on_interrupt(self, *, actor: CPU, cause: str | UP.NucleusInterrupt) -> UP.InterruptStates:
        if isinstance(cause, UP.NucleusInterrupt):
            assert cause.state_name == "n_procs"

            start_time = self.get_marker_time()
            assert start_time is not None
            knowledge_name = self.get_name()
            process_data: dict[str, float] = self.get_actor_knowledge(
                actor, knowledge_name, must_exist=True
            )
            perc = actor.left_from_partial(process_data, start_time, self.env.now)
            process_data["percent complete"] = perc
            self.set_actor_knowledge(actor, knowledge_name, process_data, overwrite=True)

            return self.INTERRUPT.RESTART
        raise UP.SimulationError("Unexpected interrupt state")


cpu_job_factory = UP.TaskNetworkFactory.from_ordered_terminating(
    name="SingleJob", task_classes=[CPUProcessStart, CPUProcess]
)


class CPUJobFarmer(UP.Task):
    def task(self, *, actor: CPU) -> TASK_GEN:
        job = yield UP.Get(actor.jobs)

        suggest = actor.suggest_network_name(cpu_job_factory)
        new_net = cpu_job_factory.make_network(other_name=suggest)
        actor.add_task_network(new_net)

        proc_know = {"best time": job, "percent complete": 0.0}
        self.set_actor_knowledge(actor, suggest, proc_know)
        actor.start_network_loop(suggest, init_task_name="CPUProcessStart")


cpu_farmer_factory = UP.TaskNetworkFactory.from_single_looping(
    name="Dispatch", task_class=CPUJobFarmer
)


def test_nucleus_sharing() -> None:
    job_time_list = [100.0, 10.0, 15.0, 13.0, 5.0, 25.0]
    job_start_delay = [0.0, 3.0, 5.0, 10.0, 10.0, 3.0]

    with UP.EnvironmentContext() as env:
        cpu = CPU(
            name="Magic Computer",
            n_procs=0,
        )
        _ = UP.TaskNetworkNucleus(actor=cpu)

        net = cpu_farmer_factory.make_network()
        cpu.add_task_network(net)
        cpu.start_network_loop(net.name, "CPUJobFarmer")

        def job_sender() -> SIMPY_GEN:
            for time, delay in zip(job_time_list, job_start_delay):
                yield env.timeout(delay)
                yield cpu.jobs.put(time)

        env.process(job_sender())
        env.run()
        print(env.now)