# Complex Cashier Full Source

# Copyright (C) 2024 by the Georgia Tech Research Institute (GTRI)

# Licensed under the BSD 3-Clause License.
# See the LICENSE file in the project root for license terms.

from collections.abc import Generator
from typing import Any

import simpy as SIM

import upstage.api as UP
from upstage.task import InterruptStates
from upstage.type_help import SIMPY_GEN, TASK_GEN


class Cashier(UP.Actor):
    """Actor representing a cashier working a checkout lane.

    The states hold the cashier's fixed parameters, running counters, and a
    store that other processes put job messages into.
    """

    # Fixed value that DoCheckout divides by the customer's item count.
    scan_speed = UP.State[float](valid_types=(float,), frozen=True)
    # Time allowed between breaks; also the delay used by CashierBreakTimer.
    time_until_break = UP.State[float](default=120.0, valid_types=(float,), frozen=True)
    # Number of breaks at which Break schedules NightBreak instead of ShortBreak.
    breaks_until_done = UP.State[int](default=2, valid_types=int)
    # Incremented by Break and reset to zero by TalkToBoss.
    breaks_taken = UP.State[int](default=0, valid_types=int, recording=True)
    # Incremented once per item scanned in DoCheckout.
    items_scanned = UP.State[int](default=0, valid_types=(int,), recording=True)
    # Activated at rate 1.0 while DoCheckout is scanning items.
    time_scanning = UP.LinearChangingState(default=0.0, valid_types=(float,))
    # Inbox read by the CashierMessages task.
    messages = UP.ResourceState[UP.SelfMonitoringStore](default=UP.SelfMonitoringStore)

    def time_left_to_break(self) -> float:
        """Return the time remaining before the next break is due.

        Reads the ``"start_time"`` knowledge entry (it must exist) and
        subtracts the time elapsed since then from ``time_until_break``.
        The result is negative once the break is overdue.
        """
        started_at = float(self.get_knowledge("start_time", must_exist=True))
        return self.time_until_break - (self.env.now - started_at)


class CheckoutLane(UP.Actor):
    """Actor representing one checkout lane and the customers waiting in it."""

    # Store that customer_spawner puts item counts into and WaitInLane gets from.
    customer_queue = UP.ResourceState[UP.SelfMonitoringStore](default=UP.SelfMonitoringStore)


class StoreBoss(UP.UpstageBase):
    """Hands out checkout lanes to cashiers and tracks which lane each one holds."""

    def __init__(self, lanes: list[CheckoutLane]) -> None:
        """Remember the available lanes and start with none assigned.

        Args:
            lanes: Every lane that may be handed to a cashier.
        """
        self.lanes = lanes
        # Occupied lane -> cashier working it.
        self._lane_map: dict[CheckoutLane, Cashier] = {}

    def get_lane(self, cashier: Cashier) -> CheckoutLane:
        """Assign a randomly chosen unoccupied lane to ``cashier``.

        Args:
            cashier: The cashier asking for a lane.

        Returns:
            The lane now recorded as belonging to ``cashier``.

        Note:
            When every lane is occupied the candidate list is empty and the
            error raised by the stage's ``random.choice`` propagates.
        """
        free_lanes = [candidate for candidate in self.lanes if candidate not in self._lane_map]
        chosen = self.stage.random.choice(free_lanes)
        self._lane_map[chosen] = cashier
        return chosen

    def clear_lane(self, cashier: Cashier) -> None:
        """Release every lane currently recorded for ``cashier``.

        Args:
            cashier: The cashier whose assignments are removed. Matching is
                by identity, and a cashier holding no lane is a no-op.
        """
        # Collect first so the mapping is not resized while being iterated.
        held = [lane for lane, worker in self._lane_map.items() if worker is cashier]
        for lane in held:
            self._lane_map.pop(lane)


class CashierBreakTimer(UP.Task):
    """Timer task that tells the cashier's job network when a break is due."""

    def task(self, *, actor: Cashier) -> TASK_GEN:
        """Wait ``actor.time_until_break`` and then interrupt "CashierJob".

        Args:
            actor: The cashier whose job network is interrupted.
        """
        yield UP.Wait(actor.time_until_break)
        break_cause = {"reason": "BREAK TIME"}
        actor.interrupt_network("CashierJob", cause=break_cause)


class InterruptibleTask(UP.Task):
    """Base task that converts an interrupt into a replacement task queue."""

    def on_interrupt(self, *, actor: Cashier, cause: dict[str, Any]) -> InterruptStates:
        """Rebuild the actor's task queue from an interrupt and decide its fate.

        Args:
            actor: The cashier whose task was interrupted.
            cause: Interrupt payload. ``cause["reason"]`` must be
                ``"BREAK TIME"`` or ``"NEW JOB"``; the latter also carries
                the requested task names in ``cause["job_list"]``.

        Returns:
            ``INTERRUPT.END`` when the running task set the ``"cancellable"``
            marker, otherwise ``INTERRUPT.IGNORE``. In both cases the actor's
            task queue has been replaced with the new job list.

        Raises:
            UP.SimulationError: If ``cause["reason"]`` is not recognised.
        """
        # We will only interrupt with a dictionary of data
        assert isinstance(cause, dict)
        job_list: list[str]

        if cause["reason"] == "BREAK TIME":
            job_list = ["Break"]
        elif cause["reason"] == "NEW JOB":
            # Copy the request: the list is edited below and must not change
            # the sender's payload as a side effect.
            job_list = list(cause["job_list"])
        else:
            raise UP.SimulationError("Unexpected interrupt cause")

        # determine time until break
        time_left = actor.time_left_to_break()
        # if there are only five minutes left, take the break and queue the task.
        if time_left <= 5.0 and "Break" not in job_list:
            job_list = ["Break"] + job_list

        # Ignore the interrupt, unless we've marked it to know otherwise
        marker = self.get_marker() or "none"
        # Already on a break: do not schedule another one right behind it.
        if marker == "on break" and "Break" in job_list:
            job_list.remove("Break")

        self.clear_actor_task_queue(actor)
        self.set_actor_task_queue(actor, job_list)
        if marker == "cancellable":
            return self.INTERRUPT.END
        return self.INTERRUPT.IGNORE


class GoToWork(UP.Task):
    """First task of the cashier's day: travel to the store."""

    def task(self, *, actor: Cashier) -> TASK_GEN:
        """Spend 15.0 time units getting to work.

        Args:
            actor: The cashier travelling to work (not otherwise used).
        """
        commute = UP.Wait(15.0)
        yield commute


class TalkToBoss(UP.DecisionTask):
    """Decision task in which the cashier is given a lane and starts the shift."""

    def make_decision(self, *, actor: Cashier) -> None:
        """Get a lane from the boss, reset break tracking, and start the timer.

        Args:
            actor: The cashier starting a shift.
        """
        assigned_lane = self.stage.boss.get_lane(actor)
        # overwrite=False: a lane left over from a previous shift is not replaced here.
        self.set_actor_knowledge(actor, "checkout_lane", assigned_lane, overwrite=False)
        actor.breaks_taken = 0
        self.set_actor_knowledge(actor, "start_time", self.env.now, overwrite=True)
        # The break countdown starts as soon as the shift does.
        break_timer = CashierBreakTimer()
        break_timer.run(actor=actor)


class WaitInLane(InterruptibleTask):
    """Idle at the assigned lane until a customer shows up."""

    def task(self, *, actor: Cashier) -> TASK_GEN:
        """Block on the lane's customer queue and remember who arrived.

        The task marks itself ``"cancellable"`` so that an interrupt handled
        by ``InterruptibleTask.on_interrupt`` ends the wait.

        Args:
            actor: The cashier waiting; must know its ``"checkout_lane"``.
        """
        lane: CheckoutLane = self.get_actor_knowledge(actor, "checkout_lane", must_exist=True)
        next_in_line = UP.Get(lane.customer_queue)

        self.set_marker(marker="cancellable")
        yield next_in_line

        # The queued value is the customer's item count, read later by DoCheckout.
        basket: int = next_in_line.get_value()
        self.set_actor_knowledge(actor, "customer", basket, overwrite=True)


class DoCheckout(InterruptibleTask):
    """Scan a customer's items one by one and then take payment."""

    def task(self, *, actor: Cashier) -> TASK_GEN:
        """Scan every item of the current customer, then wait for payment.

        Args:
            actor: The cashier; must hold the item count under ``"customer"``.
        """
        item_count: int = self.get_actor_knowledge(actor, "customer", must_exist=True)
        # NOTE(review): dividing by the item count makes the total scanning
        # time equal to scan_speed whatever the basket size - confirm intended.
        wait_per_item = actor.scan_speed / item_count
        actor.activate_linear_state(state="time_scanning", rate=1.0, task=self)
        for _scan in range(item_count):
            yield UP.Wait(wait_per_item)
            actor.items_scanned += 1
        # Stop accumulating time_scanning before the payment step.
        actor.deactivate_all_states(task=self)
        # assume 2 minutes to take payment
        payment = UP.Wait(2.0)
        yield payment


class Break(UP.DecisionTask):
    """Decision task that picks between a short break and going home."""

    def make_decision(self, *, actor: Cashier) -> None:
        """Count the break and queue the matching break task.

        Reaching ``breaks_until_done`` queues only ``"NightBreak"`` and drops
        any pending jobs. Otherwise ``"ShortBreak"`` is queued ahead of the
        jobs that were already waiting.

        Args:
            actor: The cashier going on break.

        Raises:
            UP.SimulationError: If ``"Break"`` is still in the task queue, or
                if more breaks have been taken than ``breaks_until_done``.
        """
        actor.breaks_taken += 1

        # Jobs may have been queued before the break started.
        pending = self.get_actor_task_queue(actor) or []
        if "Break" in pending:
            raise UP.SimulationError("Odd task network state")
        self.clear_actor_task_queue(actor)

        taken, limit = actor.breaks_taken, actor.breaks_until_done
        if taken > limit:
            raise UP.SimulationError("Too many breaks taken")
        upcoming = ["NightBreak"] if taken == limit else ["ShortBreak"] + pending
        self.set_actor_task_queue(actor, upcoming)


class ShortBreak(InterruptibleTask):
    """A 15.0 time-unit pause, after which the break countdown restarts."""

    def task(self, *, actor: Cashier) -> TASK_GEN:
        """Rest, then reset ``"start_time"`` and launch a new break timer.

        The ``"on break"`` marker lets ``InterruptibleTask.on_interrupt``
        drop a ``"Break"`` request that arrives during the pause.

        Args:
            actor: The cashier taking the break.
        """
        self.set_marker("on break")
        yield UP.Wait(15.0)
        back_at = self.env.now
        self.set_actor_knowledge(actor, "start_time", back_at, overwrite=True)
        next_timer = CashierBreakTimer()
        next_timer.run(actor=actor)


class NightBreak(UP.Task):
    """End of shift: give the lane back and stay away for twelve hours."""

    def task(self, *, actor: Cashier) -> TASK_GEN:
        """Forget the lane, release it with the boss, then wait 60 * 12 time units.

        Args:
            actor: The cashier going home.
        """
        self.clear_actor_knowledge(actor, "checkout_lane")
        boss = self.stage.boss
        boss.clear_lane(actor)
        time_away = 60 * 12.0
        yield UP.Wait(time_away)


class Restock(UP.Task):
    """Job requested by the manager: restock for a fixed duration."""

    def task(self, *, actor: Cashier) -> TASK_GEN:
        """Spend 10.0 time units restocking.

        Args:
            actor: The cashier doing the restocking (not otherwise used).
        """
        restocking = UP.Wait(10.0)
        yield restocking


# Task name -> task class for the cashier network. Each key is the class's
# own name, which is also the string used in task queues and task links.
task_classes = {
    task_cls.__name__: task_cls
    for task_cls in (
        GoToWork,
        TalkToBoss,
        WaitInLane,
        DoCheckout,
        Break,
        ShortBreak,
        NightBreak,
        Restock,
    )
}

# Transition table for the cashier network: for each task, the task that
# follows when nothing is queued (default) and the tasks that may follow it.
#
# "Restock" is never a default; it only arrives through the task queue that
# InterruptibleTask.on_interrupt builds from a manager message. Every task
# that can be followed by that queue therefore has to allow it:
#   * WaitInLane and DoCheckout - on_interrupt replaces their queue directly.
#   * ShortBreak - Break re-queues the pending jobs behind "ShortBreak".
# Break itself always queues "ShortBreak" or "NightBreak" first, so it does
# not need the extra entry.
task_links = {
    "GoToWork": UP.TaskLinks(default="TalkToBoss", allowed=["TalkToBoss"]),
    "TalkToBoss": UP.TaskLinks(default="WaitInLane", allowed=["WaitInLane"]),
    "WaitInLane": UP.TaskLinks(
        default="DoCheckout",
        allowed=["DoCheckout", "Break", "Restock"],
    ),
    "DoCheckout": UP.TaskLinks(
        default="WaitInLane",
        allowed=["WaitInLane", "Break", "Restock"],
    ),
    "Break": UP.TaskLinks(default="ShortBreak", allowed=["ShortBreak", "NightBreak"]),
    "ShortBreak": UP.TaskLinks(default="WaitInLane", allowed=["WaitInLane", "Restock"]),
    "NightBreak": UP.TaskLinks(default="GoToWork", allowed=["GoToWork"]),
    "Restock": UP.TaskLinks(default="WaitInLane", allowed=["WaitInLane", "Break"]),
}

# Factory for the cashier's main job network. The name "CashierJob" is the
# one CashierBreakTimer and CashierMessages pass to interrupt_network(), and
# the one manager_process passes to get_running_task().
cashier_task_network = UP.TaskNetworkFactory(
    name="CashierJob",
    task_classes=task_classes,
    task_links=task_links,
)


class CashierMessages(UP.Task):
    """Relay one message from the cashier's inbox to the job network."""

    def task(self, *, actor: Cashier) -> TASK_GEN:
        """Wait for a message and forward it as a "NEW JOB" interrupt.

        A message is either a single task name or a list of task names; a
        single name is wrapped in a list before being forwarded.

        Args:
            actor: The cashier whose ``messages`` store is read.
        """
        inbox_get = UP.Get(actor.messages)
        yield inbox_get
        requested: list[str] | str = inbox_get.get_value()
        if isinstance(requested, str):
            requested = [requested]
        new_job_cause = {"reason": "NEW JOB", "job_list": requested}
        actor.interrupt_network("CashierJob", cause=new_job_cause)


# Factory for a one-task network named "Messages" built around CashierMessages.
# Presumably the task is re-run in a loop (per from_single_looping) - confirm
# against the UPSTAGE documentation.
cashier_message_net = UP.TaskNetworkFactory.from_single_looping("Messages", CashierMessages)


def customer_spawner(
    env: SIM.Environment,
    lanes: list[CheckoutLane],
) -> Generator[SIM.Event, None, None]:
    """Feed customers into randomly chosen lanes during opening hours.

    The simulation clock is in minutes. The store is treated as open from
    08:00 up to (but not including) 15:30 each day. Outside that window the
    process sleeps until the next 08:00. Each customer is an item count
    between 3 and 17, and arrivals are 5 to 30 minutes apart.

    Args:
        env: The SimPy environment that provides the clock and timeouts.
        lanes: Lanes that can receive customers; must not be empty.

    Yields:
        SimPy events: the wait until opening, the queue put, and the
        inter-arrival wait.
    """
    # sneaky way to get access to stage
    stage = lanes[0].stage
    minutes_per_day = 24 * 60
    opens_at = 8 * 60
    closes_at = 15.5 * 60
    while True:
        # Position within the current day; modulo (not floor division) so the
        # value wraps every 24 hours instead of counting whole days.
        minute_of_day = env.now % minutes_per_day
        if minute_of_day < opens_at:
            # Early morning: opening time is later on the same day.
            yield env.timeout(opens_at - minute_of_day)
        elif minute_of_day >= closes_at:
            # After closing: finish today, then wait until tomorrow's opening.
            yield env.timeout((minutes_per_day - minute_of_day) + opens_at)

        lane_pick = stage.random.choice(lanes)
        number_pick = stage.random.randint(3, 17)
        yield lane_pick.customer_queue.put(number_pick)
        yield UP.Wait.from_random_uniform(5.0, 30.0).as_event()


def manager_process(boss: StoreBoss, cashiers: list[Cashier]) -> SIMPY_GEN:
    """Periodically ask a random working cashier to restock.

    Every 30 to 90 time units one cashier whose "CashierJob" network is not
    running ``NightBreak`` is sent a ``["Restock"]`` message. When nobody is
    available the round is skipped and the manager tries again after the
    next wait, so requests resume once cashiers return.

    Args:
        boss: Supplies access to the stage's random number generator.
        cashiers: The cashiers that may be asked to restock.

    Yields:
        SimPy events: the wait between rounds and the message put.
    """
    while True:
        # Use the random uniform feature, but convert the UPSTAGE event to simpy
        # because this is a simpy only process
        yield UP.Wait.from_random_uniform(30.0, 90.0).as_event()
        possible = [
            cash
            for cash in cashiers
            # get_running_task may yield nothing with a name; treat that as available.
            if getattr(cash.get_running_task("CashierJob"), "name", "") != "NightBreak"
        ]
        if not possible:
            # Everyone has gone home; keep the manager alive for the next shift.
            continue
        cash = boss.stage.random.choice(possible)
        yield cash.messages.put(["Restock"])


def test_cashier_example() -> None:
    """Run the cashier example from 08:00 to 20:00 and print a summary.

    Builds one cashier, two lanes and a boss, starts the job and message
    networks plus the customer and manager processes, and runs the
    environment. Afterwards it prints every log entry containing
    "Interrupt" and the number of items scanned.
    """
    with UP.EnvironmentContext(initial_time=8 * 60) as env:
        UP.add_stage_variable("time_unit", "min")
        cashier = Cashier(
            name="Bob",
            scan_speed=1.0,
            time_until_break=120.0,
            breaks_until_done=4,
            debug_log=True,
        )
        all_lanes = [CheckoutLane(name="Lane 1"), CheckoutLane(name="Lane 2")]
        boss = StoreBoss(lanes=all_lanes)

        # TalkToBoss and NightBreak look the boss up on the stage.
        UP.add_stage_variable("boss", boss)

        job_net = cashier_task_network.make_network()
        cashier.add_task_network(job_net)
        cashier.start_network_loop(job_net.name, "GoToWork")

        message_net = cashier_message_net.make_network()
        cashier.add_task_network(message_net)
        cashier.start_network_loop(message_net.name, "CashierMessages")

        # The spawner gets its own list of the same two lanes.
        env.process(customer_spawner(env, list(all_lanes)))
        env.process(manager_process(boss, [cashier]))

        env.run(until=20 * 60)

    interrupt_entries = [entry for entry in cashier.get_log() if "Interrupt" in entry]
    for entry in interrupt_entries:
        print(entry)

    print(cashier.items_scanned)


# Run the example simulation when this file is executed as a script.
if __name__ == "__main__":
    test_cashier_example()

# This file is auto-generated.