First Simulation Full Source#

# Copyright (C) 2024 by the Georgia Tech Research Institute (GTRI)

# Licensed under the BSD 3-Clause License.
# See the LICENSE file in the project root for complete license terms and disclaimers.

from collections.abc import Generator

import simpy as SIM

import upstage.api as UP
from upstage.type_help import TASK_GEN


class Cashier(UP.Actor):
    scan_speed = UP.State[float](
        valid_types=(float,),
        frozen=True,
    )
    time_until_break = UP.State[float](
        default=120.0,
        valid_types=(float,),
        frozen=True,
    )
    breaks_until_done = UP.State[int](default=2, valid_types=int)
    breaks_taken = UP.State[int](default=0, valid_types=int, recording=True)
    items_scanned = UP.State[int](
        default=0,
        valid_types=(int,),
        recording=True,
    )
    time_scanning = UP.LinearChangingState(
        default=0.0,
        valid_types=(float,),
    )


class CheckoutLane(UP.Actor):
    customer_queue = UP.ResourceState[UP.SelfMonitoringStore](
        default=UP.SelfMonitoringStore,
    )


class StoreBoss(UP.UpstageBase):
    def __init__(self, lanes: list[CheckoutLane]) -> None:
        self.lanes = lanes
        self._lane_map: dict[CheckoutLane, Cashier] = {}

    def get_lane(self, cashier: Cashier) -> CheckoutLane:
        possible = [lane for lane in self.lanes if lane not in self._lane_map]
        lane = self.stage.random.choice(possible)
        self._lane_map[lane] = cashier
        return lane


class GoToWork(UP.Task):
    def task(self, *, actor: Cashier) -> TASK_GEN:
        """Go to work"""
        yield UP.Wait(15.0)


class TalkToBoss(UP.DecisionTask):
    def make_decision(self, *, actor: Cashier) -> None:
        """Zero-time task to get information."""
        boss: StoreBoss = self.stage.boss
        lane = boss.get_lane(actor)
        self.set_actor_knowledge(actor, "checkout_lane", lane, overwrite=False)
        actor.breaks_taken = 0
        self.set_actor_knowledge(actor, "start_time", self.env.now)


class WaitInLane(UP.Task):
    def task(self, *, actor: Cashier) -> TASK_GEN:
        """Wait until break time, or a customer."""
        lane: CheckoutLane = self.get_actor_knowledge(
            actor,
            "checkout_lane",
            must_exist=True,
        )
        customer_arrival = UP.Get(lane.customer_queue)

        start_time = self.get_actor_knowledge(
            actor,
            "start_time",
            must_exist=True,
        )
        break_start = start_time + actor.time_until_break
        wait_until_break = break_start - self.env.now
        if wait_until_break < 0:
            self.set_actor_task_queue(actor, ["Break"])
            return

        break_event = UP.Wait(wait_until_break)

        yield UP.Any(customer_arrival, break_event)

        if customer_arrival.is_complete():
            customer: int = customer_arrival.get_value()
            self.set_actor_knowledge(actor, "customer", customer, overwrite=True)
        else:
            customer_arrival.cancel()
            self.set_actor_task_queue(actor, ["Break"])


class DoCheckout(UP.Task):
    def task(self, *, actor: Cashier) -> TASK_GEN:
        """Do the checkout"""
        items: int = self.get_actor_knowledge(
            actor,
            "customer",
            must_exist=True,
        )
        per_item_time = actor.scan_speed / items
        actor.activate_linear_state(
            state="time_scanning",
            rate=1.0,
            task=self,
        )
        for _ in range(items):
            yield UP.Wait(per_item_time)
            actor.items_scanned += 1
        actor.deactivate_all_states(task=self)
        # assume 2 minutes to take payment
        yield UP.Wait(2.0)


class Break(UP.DecisionTask):
    def make_decision(self, *, actor: Cashier) -> None:
        """Decide what kind of break we are taking."""
        actor.breaks_taken += 1
        if actor.breaks_taken == actor.breaks_until_done:
            self.set_actor_task_queue(actor, ["NightBreak"])
        elif actor.breaks_taken > actor.breaks_until_done:
            raise UP.SimulationError("Too many breaks taken")
        else:
            self.set_actor_task_queue(actor, ["ShortBreak"])


class ShortBreak(UP.Task):
    def task(self, *, actor: Cashier) -> TASK_GEN:
        """Take a short break."""
        yield UP.Wait(15.0)
        self.set_actor_knowledge(actor, "start_time", self.env.now, overwrite=True)


class NightBreak(UP.Task):
    def task(self, *, actor: Cashier) -> TASK_GEN:
        """Go home and rest."""
        self.clear_actor_knowledge(actor, "checkout_lane")
        yield UP.Wait(60 * 12.0)


task_classes = {
    "GoToWork": GoToWork,
    "TalkToBoss": TalkToBoss,
    "WaitInLane": WaitInLane,
    "DoCheckout": DoCheckout,
    "Break": Break,
    "ShortBreak": ShortBreak,
    "NightBreak": NightBreak,
}

task_links = {
    "GoToWork": UP.TaskLinks(default="TalkToBoss", allowed=["TalkToBoss"]),
    "TalkToBoss": UP.TaskLinks(default="WaitInLane", allowed=["WaitInLane"]),
    "WaitInLane": UP.TaskLinks(default="DoCheckout", allowed=["DoCheckout", "Break"]),
    "DoCheckout": UP.TaskLinks(default="WaitInLane", allowed=["WaitInLane"]),
    "Break": UP.TaskLinks(default="ShortBreak", allowed=["ShortBreak", "NightBreak"]),
    "ShortBreak": UP.TaskLinks(default="WaitInLane", allowed=["WaitInLane"]),
    "NightBreak": UP.TaskLinks(default="GoToWork", allowed=["GoToWork"]),
}


cashier_task_network = UP.TaskNetworkFactory(
    name="CashierJob",
    task_classes=task_classes,
    task_links=task_links,
)


def customer_spawner(
    env: SIM.Environment,
    lanes: list[CheckoutLane],
) -> Generator[SIM.Event, None, None]:
    # sneaky way to get access to stage
    stage = lanes[0].stage
    while True:
        hrs = env.now / 60
        time_of_day = hrs // 24
        if time_of_day <= 8 or time_of_day >= 15.5:
            time_until_open = (24 - time_of_day) + 8
            yield env.timeout(time_until_open)

        lane_pick = stage.random.choice(lanes)
        number_pick = stage.random.randint(3, 17)
        yield lane_pick.customer_queue.put(number_pick)
        yield UP.Wait.from_random_uniform(5.0, 30.0).as_event()


def test_cashier_example() -> None:
    with UP.EnvironmentContext(initial_time=8 * 60) as env:
        UP.add_stage_variable("time_unit", "min")
        cashier = Cashier(
            name="Bob",
            scan_speed=1.0,
            time_until_break=120.0,
            breaks_until_done=4,
            debug_log=True,
        )
        lane_1 = CheckoutLane(name="Lane 1")
        lane_2 = CheckoutLane(name="Lane 2")
        boss = StoreBoss(lanes=[lane_1, lane_2])

        UP.add_stage_variable("boss", boss)

        net = cashier_task_network.make_network()
        cashier.add_task_network(net)
        cashier.start_network_loop(net.name, "GoToWork")

        customer_proc = customer_spawner(env, [lane_1, lane_2])
        _ = env.process(customer_proc)

        env.run(until=20 * 60)

    for line in cashier.get_log():
        print(line)


if __name__ == "__main__":
    test_cashier_example()

This file is auto-generated.