Task Interrupts

Task interruption handling is one of UPSTAGE’s convenience features for wrapping SimPy. It lets you use interruptions to modify the task network without wrapping every yield statement in exception handling.

Motivation

For reference, here is how you might handle interruptions in a SimPy process with two waits, a store get followed by a timeout:

import simpy as SIM

def env_print(env, message: str) -> None:
    msg = f"{env.now:.2f} :: {message}"
    print(msg)

def get_and_work(env: SIM.Environment, store: SIM.Store):
    get_event = store.get()
    try:
        item = yield get_event
        env_print(env, f"got the item: '{item}'")
    except SIM.Interrupt as interrupt:
        # No matter what, cancel the get and leave
        get_event.cancel()
        env_print(env, f"Interrupted in the get wait. Cause: {interrupt.cause}")
        return

    time_to_work = 2.0 # This could be a function of your item
    end_time = env.now + time_to_work
    while True:
        try:
            yield env.timeout(time_to_work)
            env_print(env, "Finished work")
            break
        except SIM.Interrupt as interrupt:
            env_print(env, f"Interrupted in the work wait with cause: {interrupt.cause}")
            if interrupt.cause == "CANCEL GET":
                # do nothing, the cancel is too late
                # but get ready to loop again!
                time_to_work = end_time - env.now
            elif interrupt.cause == "CANCEL WORK":
                return

def putter(env, store):
    yield env.timeout(1.0)
    yield store.put("THING")

The work in that process is very simple, but the event cancelling and the code required to handle interrupts that we may not care about complicate it greatly. In general, we prefer a task/process to state exactly what it’s trying to do, and to handle interrupts separately without clouding the business logic of the task.

In addition, if interrupts are sent by another process, we don’t want that process to have to introspect the actor or the task network to know whether it can or should interrupt. It’s preferable to let the interrupting process ask for an interrupt (with some data) and let the actor/task decide whether it’s a good idea. Finally, if the simulation builder forgets to cancel events (especially get and put), you may end up with a get request that takes an item from a store without delivering it to any process.

Here’s how those interrupts would look in SimPy:

# Interrupt in the get wait
env = SIM.Environment()
store = SIM.Store(env)

proc1 = env.process(putter(env, store))
proc2 = env.process(get_and_work(env, store))

env.run(until=0.5)
proc2.interrupt(cause="outer stop")
env.run()
env_print(env, "DONE")
env_print(env, f"Items in store: {store.items}")
>>> 0.50 :: Interrupted in the get wait. Cause: outer stop
>>> 1.00 :: DONE
>>> 1.00 :: Items in store: ['THING']

# Interrupt in the work wait with an ignorable cause
env = SIM.Environment()
store = SIM.Store(env)

proc1 = env.process(putter(env, store))
proc2 = env.process(get_and_work(env, store))

env.run(until=1.5)
proc2.interrupt(cause="CANCEL GET")
env.run()
env_print(env, "DONE")
>>> 1.00 :: got the item: 'THING'
>>> 1.50 :: Interrupted in the work wait with cause: CANCEL GET
>>> 3.00 :: Finished work
>>> 3.00 :: DONE

# Interrupt in the work wait with a real cause
env = SIM.Environment()
store = SIM.Store(env)

proc1 = env.process(putter(env, store))
proc2 = env.process(get_and_work(env, store))

env.run(until=1.5)
proc2.interrupt(cause="CANCEL WORK")
env.run()
env_print(env, "DONE")
>>> 1.00 :: got the item: 'THING'
>>> 1.50 :: Interrupted in the work wait with cause: CANCEL WORK
>>> 3.00 :: DONE

If you interrupt with a cause the process does not recognize, the missing final else (as in this example) means time_to_work is never updated, so the full 2.0 wait restarts and the work finishes at time 3.5.
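The arithmetic behind that 3.5 can be checked with a small stand-alone sketch. It does not use SimPy. `finish_time` is a hypothetical helper that only mirrors the bookkeeping of the while loop above, assuming the work starts at 1.0 (when the item arrives) and a single interrupt lands at 1.5.

```python
def finish_time(start: float, duration: float, interrupt_at: float, recompute: bool) -> float:
    """Return when the work wait ends if it is interrupted once and retried.

    Hypothetical helper that mirrors the while-loop bookkeeping; not SimPy code.
    """
    end_time = start + duration
    if recompute:
        # "CANCEL GET" branch: only wait for the time that is left.
        remaining = end_time - interrupt_at
    else:
        # Unrecognized cause: time_to_work is untouched, so the full wait restarts.
        remaining = duration
    return interrupt_at + remaining


print(finish_time(1.0, 2.0, 1.5, recompute=True))   # 3.0
print(finish_time(1.0, 2.0, 1.5, recompute=False))  # 3.5
```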

Critically, if you left out the get_event.cancel() line, SimPy would still process the get_event and take the item from the store, even though no process is waiting for it. This would effectively remove the item from the simulation.
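The mechanism can be illustrated with a toy model of a store that keeps a queue of pending get requests. This is a simplified sketch under that assumption, not SimPy's implementation. `Request`, `ToyStore`, and `items_left` are hypothetical names used only here.

```python
class Request:
    """A pending get; `item` is filled in when the store serves it."""

    def __init__(self) -> None:
        self.item = None


class ToyStore:
    """Toy FIFO store with a queue of pending gets (an illustration, not SimPy's code)."""

    def __init__(self) -> None:
        self.items = []
        self.get_queue = []

    def get(self) -> Request:
        # The request stays queued until it is served or cancelled.
        request = Request()
        self.get_queue.append(request)
        return request

    def cancel(self, request: Request) -> None:
        if request in self.get_queue:
            self.get_queue.remove(request)

    def put(self, item) -> None:
        self.items.append(item)
        # The oldest pending get is served even if nobody waits on it any more.
        if self.get_queue:
            request = self.get_queue.pop(0)
            request.item = self.items.pop(0)


def items_left(cancel_on_interrupt: bool) -> list:
    store = ToyStore()
    request = store.get()        # a process starts waiting for an item
    if cancel_on_interrupt:      # it is interrupted and walks away
        store.cancel(request)
    store.put("THING")           # the item arrives later
    return store.items


print(items_left(cancel_on_interrupt=True))   # ['THING']
print(items_left(cancel_on_interrupt=False))  # []
```

Without the cancel, the abandoned request still consumes the item, and no process ever reads it.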

UPSTAGE Interrupts

UPSTAGE’s interrupt handling system mitigates these key sources of error or frustration:

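  1. Forgetting to cancel get and put events when an interrupt arrives.

  2. Cluttering the main task with interrupt handling, which obscures what the task is doing.

  3. Handling interrupt causes and conditions through complicated, scattered checks.

  4. Forgetting to stop an ongoing state calculation when the task is interrupted.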

To access these features, do the following:

  1. Implement on_interrupt in the Task class.

  2. Optionally: use the marker features in the task and interrupt methods.

We’ll start simple, then add complexity to the interrupt.

Here’s what the above process would look like as an UPSTAGE Task:

 1import upstage.api as UP
 2import simpy as SIM
 3from typing import Any
 4
 5def env_print(env, message: str) -> None:
 6    msg = f"{env.now:.2f} :: {message}"
 7    print(msg)
 8
 9
10def putter(env, store):
11    yield env.timeout(1.0)
12    yield store.put("THING")
13
14
15class DoWork(UP.Task):
16    def task(self, *, actor: UP.Actor):
17        self.set_marker("getting")
18        item = yield UP.Get(actor.stage.store)
19        env_print(self.env, f"got the item: '{item}'")
20
21        self.set_marker("working")
22        yield UP.Wait(2.0)
23        env_print(self.env, "Finished work")
24
25    def on_interrupt(self, *, actor: UP.Actor, cause: Any) -> UP.InterruptStates:
26        marker = self.get_marker()
27        env_print(self.env, f"INTERRUPT:\n\tGot cause: '{cause}'\n\tDuring marker: '{marker}'")
28        return self.INTERRUPT.END

Then, when you run it:

with UP.EnvironmentContext() as env:
    actor = UP.Actor(name="worker")
    store = SIM.Store(env)
    UP.add_stage_variable("store", store)

    task = DoWork()
    proc = task.run(actor=actor)

    proc1 = env.process(putter(env, store))

    env.run(until=0.5)
    proc.interrupt(cause="because")
    env.run()
    env_print(env, f"Items in store: {store.items}")

>>> 0.50 :: INTERRUPT:
>>>     Got cause: 'because'
>>>     During marker: 'getting'
>>> 1.00 :: Items in store: ['THING']

Now the task is small and informative about what it’s supposed to do when it’s not interrupted. The marker features let us set and get introspection data cleanly.

Notice also that the Get() call does not need to be cancelled by the user; UPSTAGE does that for us (for all BaseEvent subclasses that implement cancel).

Some additional details:

  • Line 25: The on_interrupt method receives only the actor and the interrupt cause.

  • Line 21: If we didn’t do: self.set_marker("working") here, the Task would still think it was marked as "getting". Yields do not clear marks.

    • You can use clear_marker to clear it, and return to a default behavior if you like.

  • Line 26: If no marker is set, the get_marker method will return None

  • Line 28: More on INTERRUPT below.

INTERRUPT Types and Setting Markers

Interrupts allow 4 different outcomes to the task, which are signalled by the InterruptStates Enum (or INTERRUPT as part of self). The first three can be returned from on_interrupt to define how to handle the interrupt.

  1. END: Ends the task right there (and moves on in the task network). This cancels the pending event(s).

  2. IGNORE: Keeps the task moving along as if the interrupt didn’t happen.

  3. RESTART: Starts the task back over. This cancels the pending event(s).

An UPSTAGE Task is the process that SimPy sees. It runs the internal task() loop as its own generator and passes SimPy events to the event handler as needed. If no on_interrupt is defined, the Task assumes you want to END on an interrupt.

Markers allow some flexibility in handling interrupts. If you do not define an on_interrupt, then you can use self.set_marker(marker, self.INTERRUPT.IGNORE) to ignore interrupts while that marker is active.

If you implement on_interrupt, then the marker’s interrupt value is ignored.
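The rules in this section (a wrapper that drives task(), END as the default, a marker-level action, and on_interrupt taking precedence over the marker) can be sketched in plain Python generators. This is a toy illustration under simplifying assumptions, not UPSTAGE's implementation: `MiniTask`, `Interrupt`, `drive`, the string "events", and the simplified `on_interrupt(cause)` signature are all hypothetical, and there is no simulation clock.

```python
from enum import Enum


class InterruptStates(Enum):
    END = "end"
    IGNORE = "ignore"
    RESTART = "restart"


class Interrupt(Exception):
    """Stand-in for an interrupt thrown into the wrapper (not simpy.Interrupt)."""


class MiniTask:
    """Toy wrapper that drives task() and decides what an interrupt does."""

    def __init__(self) -> None:
        self.marker = None
        self.marker_action = None
        self.log = []

    def set_marker(self, marker, interrupt_action=None):
        self.marker = marker
        self.marker_action = interrupt_action

    def on_interrupt(self, cause):
        return NotImplemented  # base class: no user-defined handler

    def _decide(self, cause):
        action = self.on_interrupt(cause)
        if action is NotImplemented:
            # No handler: use the marker's action, falling back to END.
            action = self.marker_action or InterruptStates.END
        if not isinstance(action, InterruptStates):
            raise RuntimeError("on_interrupt must return an InterruptStates member")
        return action

    def run(self):
        inner = self.task()
        event = next(inner)
        while True:
            try:
                value = yield event  # the driver only ever sees this wrapper
            except Interrupt as exc:
                action = self._decide(exc.args[0])
                if action is InterruptStates.END:
                    inner.close()
                    return
                if action is InterruptStates.RESTART:
                    inner.close()
                    inner = self.task()
                    event = next(inner)
                continue  # IGNORE (or restarted): wait on `event` again
            try:
                event = inner.send(value)
            except StopIteration:
                return


class DoWork(MiniTask):
    def task(self):
        self.set_marker("getting")
        item = yield "get"
        self.log.append(f"got {item}")
        self.set_marker("working", InterruptStates.IGNORE)
        yield "wait"
        self.log.append("finished")


class StrictWork(DoWork):
    def on_interrupt(self, cause):
        return InterruptStates.END  # takes precedence over the marker's IGNORE


def drive(task):
    """Toy driver: deliver the item, interrupt the work wait, then finish it."""
    proc = task.run()
    next(proc)                # task is waiting on "get"
    proc.send("THING")        # item delivered; task is waiting on "wait"
    try:
        proc.throw(Interrupt("noise"))
        proc.send(None)       # the wait completes (only reached if not ended)
    except StopIteration:
        pass
    return task.log


print(drive(DoWork()))      # ['got THING', 'finished']
print(drive(StrictWork()))  # ['got THING']
```

In the first run the "working" marker's IGNORE action lets the task finish. In the second, the subclass's on_interrupt wins and the task ends at the interrupt.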

Advanced Interrupts and Marking

Let’s return to our example, and add more complicated interrupt handling, including with an active state on the actor.

 1class Worker(UP.Actor):
 2    time_worked: float = UP.LinearChangingState(default=0.0)
 3
 4class DoWork(UP.Task):
 5    def task(self, *, actor: Worker):
 6        self.set_marker("getting")
 7        actor.activate_linear_state(state="time_worked", rate=1.0, task=self)
 8        item = yield UP.Get(actor.stage.store)
 9        env_print(self.env, f"got the item: '{item}'")
10
11        self.set_marker("working")
12        yield UP.Wait(2.0)
13        actor.deactivate_all_states(task=self)
14        env_print(self.env, "Finished work")
15
16    def on_interrupt(self, *, actor: Worker, cause: Any) -> UP.InterruptStates:
17        marker = self.get_marker()
18        marker_time = self.get_marker_time()
19        env_print(self.env, f"INTERRUPT:\n\tGot cause: '{cause}'\n\tDuring marker: '{marker}'\n\tWhich started at: {marker_time}")
20
21        if marker == "getting":
22            if cause == "CANCEL GET":
23                time_passed = self.env.now - marker_time
24                # Allow some leeway that we won't cancel the wait if it's been long enough
25                if time_passed > 0.9:
26                    return self.INTERRUPT.IGNORE
27                else:
28                    return self.INTERRUPT.END
29            else:
30                return self.INTERRUPT.IGNORE
31        elif marker == "working":
32            if cause == "CANCEL WORK":
33                return self.INTERRUPT.END
34            else:
35                return self.INTERRUPT.IGNORE
36        # A return of None will cause an error, which might be what we want to know.

The new features are:

  • Lines 7 and 13: Activate a linear state on the actor, and deactivate it when the work is done

  • Line 18: Get the time we set the marker

  • Line 23: Use the marker time to determine how we want to interrupt

  • Line 36: Remind ourselves that returning None throws an exception

With these features we now have separated the logic of a successful task from one that is interrupted. It also allows more structure and streamlining of interrupt actions.

Here’s an example where the automatic cancelling of an active state is also shown:

with UP.EnvironmentContext() as env:
    actor = Worker(name="worker")
    store = SIM.Store(env)
    UP.add_stage_variable("store", store)

    task = DoWork()
    proc = task.run(actor=actor)

    proc1 = env.process(putter(env, store))

    env.run(until=1.75)
    proc.interrupt(cause="CANCEL WORK")
    env.run()
    env_print(env, f"Items in store: {store.items}")
    env_print(env, f"Time worked: {actor.time_worked}")

>>> 1.00 :: got the item: 'THING'
>>> 1.75 :: INTERRUPT:
>>>     Got cause: 'CANCEL WORK'
>>>     During marker: 'working'
>>>     Which started at: 1.0
>>> 3.00 :: Items in store: []
>>> 3.00 :: Time worked: 1.75

The interrupt automatically deactivates all states, keeping your Actors safe from runaway state values.
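To see why this matters, here is a minimal sketch of a linearly changing value that is computed from its activation time. It is an illustration only, not UPSTAGE's LinearChangingState. `LinearState` and `time_worked` are hypothetical names, and the times mirror the example above (activated at 0.0 with rate 1.0, interrupted at 1.75, read at 3.0).

```python
class LinearState:
    """Toy linearly changing value (an illustration, not UPSTAGE's class)."""

    def __init__(self, value: float = 0.0) -> None:
        self._value = value
        self._rate = 0.0
        self._since = 0.0

    def activate(self, now: float, rate: float) -> None:
        # Bank anything accrued so far, then start accruing at the new rate.
        self._value = self.value(now)
        self._rate = rate
        self._since = now

    def deactivate(self, now: float) -> None:
        # Freeze the accumulated value and stop accruing.
        self._value = self.value(now)
        self._rate = 0.0

    def value(self, now: float) -> float:
        return self._value + self._rate * (now - self._since)


def time_worked(deactivate_on_interrupt: bool) -> float:
    state = LinearState()
    state.activate(now=0.0, rate=1.0)
    if deactivate_on_interrupt:
        state.deactivate(now=1.75)   # what the interrupt does for us
    return state.value(now=3.0)      # read when the simulation ends


print(time_worked(True))   # 1.75
print(time_worked(False))  # 3.0
```

If the state were left active after the interrupt, it would keep growing until someone stopped it, which is the runaway value the automatic deactivation prevents.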

Getting the Process

If an actor is running a task network, you will need to get the current Task process to send an interrupt. Do that with the upstage.actor.Actor.get_running_tasks() method.

network_processes = actor.get_running_tasks()
task_name, task_process = network_processes[task_network_name]
task_process.interrupt(cause="Stop running")

# OR:
task_data = actor.get_running_task(task_network_name)
task_data.process.interrupt(cause="Stop running")

# OR:
actor.interrupt_network(task_network_name, cause="Stop running")

The first two methods are better if you need to check that the task name is the right one to interrupt. A well-defined task network should handle the interrupt anywhere, though.