Examples

The following examples are based on SimPy’s examples.

Bank Renege

Note

The following images are referenced in the example below.

_images/counter.png

Reception icons created by Freepik - Flaticon

_images/user.png

Person icons created by DinosoftLabs - Flaticon

"""
Bank renege example

Covers:

- Resources: Resource
- Condition events

Scenario:
A counter with a random service time and customers who renege. Based on the
program bank08.py from TheBank tutorial of SimPy 2. (KGM)

"""
from IPython.display import display
import random
from simplay import VisualEnvironment, VisualProcess, VisualResource, VisualGrid

RANDOM_SEED = 42
NEW_CUSTOMERS = 6  # Total number of customers
INTERVAL_CUSTOMERS = 10.0  # Generate new customers roughly every x seconds
MIN_PATIENCE = 1  # Min. customer patience
MAX_PATIENCE = 3  # Max. customer patience

free_positions = [[1, 0], [1, 1], [1, 2]]


def source(env, number, interval, counter):
    """Source generates customers randomly"""
    for i in range(number):
        c = Customer(env, 'Customer%02d' % i, counter, time_in_bank=12.0)
        env.process(c.run())
        t = random.expovariate(1.0 / interval)
        yield env.timeout(t)


class Customer(VisualProcess):
    def __init__(self, env, id, counter, time_in_bank):
        color = int(random.random() * 0xFFFFFF)
        super().__init__(env, id, visual="CUSTOMER", tint=color)
        self.counter = counter
        self.time_in_bank = time_in_bank
        self.position = free_positions.pop(0)

    def run(self):
        """Customer arrives, is served and leaves."""
        self.is_visible()
        self.is_at(self.position[0], self.position[1])
        arrive = self.env.now
        print('%7.4f %s: Here I am' % (arrive, self.id))

        with self.counter.request() as req:
            patience = random.uniform(MIN_PATIENCE, MAX_PATIENCE)
            # Wait for the counter or abort at the end of our tether
            results = yield req | env.timeout(patience)

            wait = self.env.now - arrive

            if req in results:
                # We got to the counter
                self.is_near(self.counter)
                self.is_interacting_with(self.counter)
                print('%7.4f %s: Waited %6.3f' % (self.env.now, self.id, wait))

                tib = random.expovariate(1.0 / self.time_in_bank)
                yield self.env.timeout(tib)
                self.is_invisible()
                self.is_no_longer_interacting_with(self.counter)
                free_positions.append(self.position)
                print('%7.4f %s: Finished' % (self.env.now, self.id))

            else:
                # We reneged
                print('%7.4f %s: RENEGED after %6.3f' %
                      (self.env.now, self.id, wait))
                self.is_invisible()
                self.is_no_longer_interacting_with(self.counter)
                free_positions.append(self.position)


# Setup and start the simulation
print('Bank renege')
random.seed(RANDOM_SEED)
env = VisualEnvironment()
grid = VisualGrid(1000, 1000, 2, 3)
grid.set_area("counter", "Counter", 3, 1, 0, 0, 0x707070)
env.visualization_manager.set_grid(grid)
env.visualization_manager.register_visual(
    "CUSTOMER", "./bank_assets/user.png")
env.visualization_manager.register_visual(
    "COUNTER", "./bank_assets/counter.png")

# Start processes and run


class Counter(VisualResource):
    def __init__(self, env):
        super().__init__(env, "Counter", 1, visual="COUNTER")
        self.is_at(0, 1)
        self.is_visible()


counter = Counter(env)
env.process(source(env, NEW_CUSTOMERS, INTERVAL_CUSTOMERS, counter))
env.run()

# you can extract the output from the visualization manager
output = env.visualization_manager.serialize()

# or save the output to a file directly
env.visualization_manager.write_to_file("output.simplay")

# or, if you're working in jupyter lab and have the simplay-jupyter
# extensions installed, you can display the output

# if so, uncomment the following lines
# from IPython.display import display
# display(env.visualization_manager.serialize_for_jupyter(), raw=True)

Carwash

Note

The following images are referenced in the example below.

_images/car.png

Car icons created by Freepik - Flaticon

_images/carwash.png

Carwash icons created by smashingstocks - Flaticon

"""
Carwash example.

Covers:

- Waiting for other processes
- Resources: Resource

Scenario:
  A carwash has a limited number of washing machines and defines
  a washing processes that takes some (random) time.

  Car processes arrive at the carwash at a random time. If one washing
  machine is available, they start the washing process and wait for it
  to finish. If not, they wait until they an use one.

"""
import random

from simplay import VisualEnvironment, VisualProcess, VisualResource, VisualGrid


RANDOM_SEED = 42
NUM_MACHINES = 2  # Number of machines in the carwash
WASHTIME = 5      # Minutes it takes to clean a car
T_INTER = 7       # Create a car every ~7 minutes
SIM_TIME = 20     # Simulation time in minutes

class Carwash(VisualResource):
    """A carwash has a limited number of machines (``NUM_MACHINES``) to
    clean cars in parallel.

    Cars have to request one of the machines. When they got one, they
    can start the washing processes and wait for it to finish (which
    takes ``washtime`` minutes).

    """

    def __init__(self, env, num_machines, washtime):
        super().__init__(env, "Carwash", num_machines, visual="CARWASH")
        self.washtime = washtime
        self.is_at(1, 3)
        self.is_visible()

    def wash(self, car):
        """The washing processes. It takes a ``car`` processes and tries
        to clean it."""
        yield self.env.timeout(WASHTIME)
        print("Carwash removed %d%% of %s's dirt." %
              (random.randint(50, 99), car))


class Car(VisualProcess):
    def __init__(self, env, id, cw):
        color = int(random.random() * 0xFFFFFF)
        super().__init__(env, id, visual="CAR", tint=color)
        self.cw = cw

    def run(self):
        """The car process (each car has a ``name``) arrives at the carwash
        (``cw``) and requests a cleaning machine.

        It then starts the washing process, waits for it to finish and
        leaves to never come back ...

        """
        print('%s arrives at the carwash at %.2f.' % (self.id, env.now))
        self.is_visible()
        self.is_near_cell(4, 2)
        with self.cw.request() as request:
            yield request
            self.is_near(self.cw)
            self.is_interacting_with(self.cw)
            print('%s enters the carwash at %.2f.' % (self.id, env.now))
            yield env.process(self.cw.wash(self.id))

            print('%s leaves the carwash at %.2f.' % (self.id, env.now))
            self.is_invisible()
            self.is_no_longer_interacting_with(self.cw)


def setup(env, num_machines, washtime, t_inter):
    """Create a carwash, a number of initial cars and keep creating cars
    approx. every ``t_inter`` minutes."""
    # Create the carwash
    carwash = Carwash(env, num_machines, washtime)

    # Create 4 initial cars
    for i in range(4):
        env.process(Car(env, 'Car %d' % i, carwash).run())

    # Create more cars while the simulation is running
    while True:
        yield env.timeout(random.randint(t_inter - 2, t_inter + 2))
        i += 1
        env.process(Car(env, 'Car %d' % i, carwash).run())


# Setup and start the simulation
print('Carwash')
print('Check out http://youtu.be/fXXmeP9TvBg while simulating ... ;-)')
random.seed(RANDOM_SEED)  # This helps reproducing the results

# Create an environment and start the setup process
env = VisualEnvironment()
grid = VisualGrid(1000, 1000, 6, 6)
grid.set_area("Carwash", "Carwash", 6, 3, 0, 0, 0x707070)
env.visualization_manager.set_grid(grid)
env.visualization_manager.register_visual(
    "CAR", "./carwash_assets/car.png")
env.visualization_manager.register_visual(
    "CARWASH", "./carwash_assets/carwash.png")
env.process(setup(env, NUM_MACHINES, WASHTIME, T_INTER))


# Execute!
env.run(until=SIM_TIME)
# you can extract the output from the visualization manager
output = env.visualization_manager.serialize()

# or save the output to a file directly
env.visualization_manager.write_to_file("output.simplay")

# or, if you're working in jupyter lab and have the simplay-jupyter
# extensions installed, you can display the output

# if so, uncomment the following lines
#from IPython.display import display
#display(env.visualization_manager.serialize_for_jupyter(), raw=True)

Machine Shop

Note

The following images are referenced in the example below.

_images/machine.png

Machine icons created by Freepik - Flaticon

_images/repairman.png

Person icons created by DinosoftLabs - Flaticon

Note

This simulation takes a while to run.

"""
Machine shop example

Covers:

- Interrupts
- Resources: PreemptiveResource

Scenario:
  A workshop has *n* identical machines. A stream of jobs (enough to
  keep the machines busy) arrives. Each machine breaks down
  periodically. Repairs are carried out by one repairman. The repairman
  has other, less important tasks to perform, too. Broken machines
  preempt theses tasks. The repairman continues them when he is done
  with the machine repair. The workshop works continuously.

"""
import random

from simplay import VisualEnvironment, VisualProcess, VisualPreemptiveResource, VisualGrid
import simpy


RANDOM_SEED = 42
PT_MEAN = 10.0         # Avg. processing time in minutes
PT_SIGMA = 2.0         # Sigma of processing time
MTTF = 300.0           # Mean time to failure in minutes
BREAK_MEAN = 1 / MTTF  # Param. for expovariate distribution
REPAIR_TIME = 30.0     # Time it takes to repair a machine in minutes
JOB_DURATION = 30.0    # Duration of other jobs in minutes
NUM_MACHINES = 10      # Number of machines in the machine shop
WEEKS = 4              # Simulation time in weeks
SIM_TIME = WEEKS * 7 * 24 * 60  # Simulation time in minutes


def time_per_part():
    """Return actual processing time for a concrete part."""
    return random.normalvariate(PT_MEAN, PT_SIGMA)


def time_to_failure():
    """Return time until next failure for a machine."""
    return random.expovariate(BREAK_MEAN)


machine_positions = [[0, 0], [1, 0], [2, 0], [3, 0],
                     [4, 0], [5, 0], [1, 1], [2, 1], [3, 1], [4, 1]]


class Machine(VisualProcess):
    """A machine produces parts and my get broken every now and then.

    If it breaks, it requests a *repairman* and continues the production
    after the it is repaired.

    A machine has a *name* and a numberof *parts_made* thus far.

    """

    def __init__(self, env, name, repairman: VisualPreemptiveResource):
        color = int(random.random() * 0x00FFFF)
        super().__init__(env, name, "MACHINE", color)
        self.parts_made = 0
        self.broken = False
        self.repairman = repairman
        self.position = machine_positions.pop(0)
        self.is_at(self.position[0], self.position[1])
        self.is_visible()

        # Start "working" and "break_machine" processes for this machine.
        self.process = env.process(self.working())
        env.process(self.break_machine())

    def working(self):
        """Produce parts as long as the simulation runs.

        While making a part, the machine may break multiple times.
        Request a repairman when this happens.

        """
        while True:
            # Start making a new part
            done_in = time_per_part()
            while done_in:
                try:
                    # Working on the part
                    start = self.env.now
                    yield self.env.timeout(done_in)
                    done_in = 0  # Set to 0 to exit while loop.

                except simpy.Interrupt:
                    self.broken = True
                    done_in -= self.env.now - start  # How much time left?

                    # Request a repairman. This will preempt its "other_job".
                    with self.repairman.request(priority=1) as req:
                        yield req
                        self.repairman.is_near(self)
                        self.repairman.is_interacting_with(self)
                        yield self.env.timeout(REPAIR_TIME)
                        self.repairman.is_no_longer_interacting_with(self)
                        self.repairman.is_at(
                            self.repairman.home[0], self.repairman.home[1])
                    self.broken = False
                    self.has_original_tint()

            # Part is done.
            self.parts_made += 1
            self.has_decorating_text("Parts made: " + str(self.parts_made))

    def break_machine(self):
        """Break the machine every now and then."""
        while True:
            yield self.env.timeout(time_to_failure())
            if not self.broken:
                # Only break the machine if it is currently working.
                self.process.interrupt()
                self.has_tint(0xFF0000)


def other_jobs(env, repairman):
    """The repairman's other (unimportant) job."""
    while True:
        # Start a new job
        done_in = JOB_DURATION
        while done_in:
            # Retry the job until it is done.
            # It's priority is lower than that of machine repairs.
            with repairman.request(priority=2) as req:
                yield req
                try:
                    start = env.now
                    yield env.timeout(done_in)
                    done_in = 0
                except simpy.Interrupt:
                    done_in -= env.now - start


# Setup and start the simulation
print('Machine shop')
random.seed(RANDOM_SEED)  # This helps reproducing the results

# Create an environment and start the setup process
env = VisualEnvironment()
grid = VisualGrid(1000, 1000, 6, 5)
grid.set_area("OtherJobs", "Other jobs", 3, 6, 0, 2, 0x707070)
env.visualization_manager.set_grid(grid)
env.visualization_manager.register_visual(
    "REPAIRMAN", "./machine_shop_assets/repairman.png")
env.visualization_manager.register_visual(
    "MACHINE", "./machine_shop_assets/machine.png")


class Repairman(VisualPreemptiveResource):
    def __init__(self, env, capacity):
        super().__init__(env, "Repairman", capacity, visual="REPAIRMAN")
        self.home = [2, 3]
        self.is_at(2, 3)
        self.is_visible()


repairman = Repairman(env, capacity=1)
machines = [Machine(env, 'Machine %d' % i, repairman)
            for i in range(NUM_MACHINES)]
env.process(other_jobs(env, repairman))

# Execute!
env.run(until=SIM_TIME)

# Analyis/results
print('Machine shop results after %s weeks' % WEEKS)
for machine in machines:
    print('%s made %d parts.' % (machine.id, machine.parts_made))

# you can extract the output from the visualization manager
output = env.visualization_manager.serialize()

# or save the output to a file directly
env.visualization_manager.write_to_file("output.simplay")

# or, if you're working in jupyter lab and have the simplay-jupyter
# extensions installed, you can display the output

# if so, uncomment the following lines
# from IPython.display import display
# display(env.visualization_manager.serialize_for_jupyter(), raw=True)

Movie Renege

Note

The following images are referenced in the example below.

_images/counter1.png

Reception icons created by Freepik - Flaticon

_images/customer.png

Person icons created by inkubators - Flaticon

"""
Movie renege example

Covers:

- Resources: Resource
- Condition events
- Shared events

Scenario:
  A movie theatre has one ticket counter selling tickets for three
  movies (next show only). When a movie is sold out, all people waiting
  to buy tickets for that movie renege (leave queue).

"""
import collections
import random

from simplay import VisualEnvironment, VisualGrid, VisualResource, VisualProcess


RANDOM_SEED = 42
TICKETS = 50  # Number of tickets per movie
SIM_TIME = 120  # Simulate until

row_current_x = 1
row_current_y = 0

COLS = 5
ROWS = 16

def define_next_moviegoer_position():
    global row_current_x
    global row_current_y
    if row_current_x == COLS:
        row_current_y += 1
        row_current_x = 0
    else:
        row_current_x += 1


class Moviegoer(VisualProcess):
    def __init__(self, env, id, movie, num_tickets, theater):
        color = int(random.random() * 0xFFFFFF)
        super().__init__(env, id, "CUSTOMER", color)
        self.movie = movie
        self.num_tickets = num_tickets
        self.theater = theater
        self.is_at(row_current_x, row_current_y)
        define_next_moviegoer_position()
        self.is_visible()

    def run(self):
        """A moviegoer tries to by a number of tickets (*num_tickets*) for
        a certain *movie* in a *theater*.

        If the movie becomes sold out, she leaves the theater. If she gets
        to the counter, she tries to buy a number of tickets. If not enough
        tickets are left, she argues with the teller and leaves.

        If at most one ticket is left after the moviegoer bought her
        tickets, the *sold out* event for this movie is triggered causing
        all remaining moviegoers to leave.

        """
        with self.theater.counter.request() as my_turn:
            # Wait until its our turn or until the movie is sold out
            result = yield my_turn | self.theater.sold_out[self.movie]
            self.is_near(self.theater.counter)
            self.is_interacting_with(self.theater.counter)
            # Check if it's our turn or if movie is sold out
            if my_turn not in result:
                self.theater.num_renegers[self.movie] += 1
                self.is_invisible()
                self.is_no_longer_interacting_with(self.theater.counter)
                return

            # Check if enough tickets left.
            if self.theater.available[self.movie] < self.num_tickets:
                # Moviegoer leaves after some discussion
                yield self.env.timeout(0.5)
                self.is_invisible()
                self.is_no_longer_interacting_with(self.theater.counter)
                return

            # Buy tickets
            self.theater.available[self.movie] -= self.num_tickets
            if self.theater.available[self.movie] < 2:
                # Trigger the "sold out" event for the movie
                self.theater.sold_out[self.movie].succeed()
                self.theater.when_sold_out[self.movie] = env.now
                self.theater.available[self.movie] = 0
            yield self.env.timeout(1)
            self.is_invisible()
            self.is_no_longer_interacting_with(self.theater.counter)


def customer_arrivals(env, theater):
    """Create new *moviegoers* until the sim time reaches 120."""
    while True:
        yield env.timeout(random.expovariate(1 / 0.5))

        movie = random.choice(theater.movies)
        num_tickets = random.randint(1, 6)
        if theater.available[movie]:
            env.process(Moviegoer(
                env, "Movigoer" + str(random.random()), movie, num_tickets, theater).run())


Theater = collections.namedtuple('Theater', 'counter, movies, available, '
                                            'sold_out, when_sold_out, '
                                            'num_renegers')


# Setup and start the simulation
print('Movie renege')
random.seed(RANDOM_SEED)
env = VisualEnvironment()
env.visualization_manager.register_visual('COUNTER', './movie_renege_assets/counter.png')
env.visualization_manager.register_visual('CUSTOMER', './movie_renege_assets/customer.png')
grid = VisualGrid(1000, 1000, COLS, ROWS)
env.visualization_manager.set_grid(grid)

# Create movie theater
counter = VisualResource(env, "Counter", 1, "COUNTER")
counter.is_at(0, 0)
counter.is_visible()
movies = ['Python Unchained', 'Kill Process', 'Pulp Implementation']
available = {movie: TICKETS for movie in movies}
sold_out = {movie: env.event() for movie in movies}
when_sold_out = {movie: None for movie in movies}
num_renegers = {movie: 0 for movie in movies}
theater = Theater(counter, movies, available, sold_out, when_sold_out,
                  num_renegers)

# Start process and run
env.process(customer_arrivals(env, theater))
env.run(until=SIM_TIME)

# Analysis/results
for movie in movies:
    if theater.sold_out[movie]:
        print('Movie "%s" sold out %.1f minutes after ticket counter '
              'opening.' % (movie, theater.when_sold_out[movie]))
        print('  Number of people leaving queue when film sold out: %s' %
              theater.num_renegers[movie])

# you can extract the output from the visualization manager
output = env.visualization_manager.serialize()

# or save the output to a file directly
env.visualization_manager.write_to_file("output.simplay")

# or, if you're working in jupyter lab and have the simplay-jupyter
# extensions installed, you can display the output

# if so, uncomment the following lines
# from IPython.display import display
# display(env.visualization_manager.serialize_for_jupyter(), raw=True)

Gas Station Refueling

"""
Gas Station Refueling example
Covers:
- Resources: Resource
- Resources: Container
- Waiting for other processes
Scenario:
  A gas station has a limited number of gas pumps that share a common
  fuel reservoir. Cars randomly arrive at the gas station, request one
  of the fuel pumps and start refueling from that reservoir.
  A gas station control process observes the gas station's fuel level
  and calls a tank truck for refueling if the station's level drops
  below a threshold.
"""
import itertools
import random

from simplay import (
    VisualEnvironment, VisualContainer, VisualProcess,
    VisualResource, VisualGrid
)

RANDOM_SEED = 42
GAS_STATION_SIZE = 500  # liters
THRESHOLD = 20  # Threshold for calling the tank truck (in %)
FUEL_TANK_SIZE = 50  # liters
FUEL_TANK_LEVEL = [5, 25]  # Min/max levels of fuel tanks (in liters)
REFUELING_SPEED = 2  # liters / second
TANK_TRUCK_TIME = 300  # Seconds it takes the tank truck to arrive
T_INTER = [10, 100]  # Create a car every [min, max] seconds
SIM_TIME = 10000  # Simulation time in seconds


class Car(VisualProcess):
    def __init__(self, name, env, gas_station, fuel_pump):
        color = int(random.random() * 0xFFFFFF)
        super().__init__(env, name, visual="CAR", tint=color)
        self.gas_station = gas_station
        self.fuel_pump = fuel_pump

    def run(self):
        self.is_visible()
        self.is_near_cell(2, 1)
        fuel_tank_level = random.randint(*FUEL_TANK_LEVEL)

        with self.gas_station.request() as req:
            start = self.env.now
            # Request one of the gas pumps
            yield req
            self.is_near(self.gas_station)
            self.is_interacting_with(self.gas_station)

            # Get the required amount of fuel
            liters_required = FUEL_TANK_SIZE - fuel_tank_level
            yield self.fuel_pump.get(liters_required)

            # The "actual" refueling process takes some time
            yield self.env.timeout(liters_required / REFUELING_SPEED)
            self.is_no_longer_interacting_with(self.gas_station)

        self.is_invisible()


def gas_station_control(env, fuel_pump):
    """Periodically check the level of the *fuel_pump* and call the tank
    truck if the level falls below a threshold."""
    truck = TankTruck(env, fuel_pump)
    while True:
        if fuel_pump.level / fuel_pump.capacity * 100 < THRESHOLD:
            # We need to call the tank truck now!
            # Wait for the tank truck to arrive and refuel the station
            yield env.process(truck.run())

        yield env.timeout(10)  # Check every 10 seconds


class TankTruck(VisualProcess):
    def __init__(self, env, fuel_pump):
        super().__init__(env, "Tank Truck", visual="TANK_TRUCK", tint=0xFF0000)
        self.fuel_pump = fuel_pump

    def run(self):
        self.is_visible()
        self.is_at(0, 0)
        self.is_near(self.fuel_pump)
        self.is_interacting_with(self.fuel_pump)
        yield self.env.timeout(TANK_TRUCK_TIME)
        ammount = self.fuel_pump.capacity - self.fuel_pump.level
        yield self.fuel_pump.put(ammount)
        self.is_no_longer_interacting_with(self.fuel_pump)
        self.is_invisible()


def car_generator(env, gas_station, fuel_pump):
    """Generate new cars that arrive at the gas station."""
    for i in itertools.count():
        yield env.timeout(random.randint(*T_INTER))
        car = Car("Car %d" % i, env, gas_station, fuel_pump)
        env.process(car.run())


class GasStation(VisualResource):
    def __init__(self, env):
        super().__init__(env, "Gas Station", 3, visual="GAS_STATION",
                         tint=0x00FF00)
        self.is_at(3, 1)
        self.is_visible()


class FuelPump(VisualContainer):
    def __init__(self, env):
        super().__init__(
            env,
            "Fuel Pump",
            capacity=GAS_STATION_SIZE,
            init=GAS_STATION_SIZE,
            visual="FUEL_PUMP",
            tint=0x000001,
        )
        self.is_at(1, 1)
        self.is_visible()
        self.has_frame(4)

    def update_sprite(self):
        fillPercentage = self.level / self.capacity
        if fillPercentage < 0.25:
            self.has_frame(0)
        elif fillPercentage < 0.5:
            self.has_frame(1)
        elif fillPercentage < 0.75:
            self.has_frame(2)
        elif fillPercentage < 1:
            self.has_frame(3)
        else:
            self.has_frame(4)

    def get(self, amount):
        cget = super().get(amount)
        self.update_sprite()
        return cget

    def put(self, amount):
        cput = super().put(amount)
        self.update_sprite()
        return cput


# Setup and start the simulation
random.seed(RANDOM_SEED)

# Create environment and start processes
env = VisualEnvironment()
env.visualization_manager.register_visual("CAR", "./gas_station_assets/car.png")
env.visualization_manager.register_visual(
    "TANK_TRUCK", "./gas_station_assets/truck.png")
env.visualization_manager.register_visual(
    "GAS_STATION", "./gas_station_assets/gaspump.png")
env.visualization_manager.register_sprites(
    "FUEL_PUMP",
    [
        "./gas_station_assets/pump_000.png",
        "./gas_station_assets/pump_025.png",
        "./gas_station_assets/pump_050.png",
        "./gas_station_assets/pump_075.png",
        "./gas_station_assets/pump_100.png",
    ],
)

grid = VisualGrid(500, 500, 5, 3)
grid.set_area("gasstation01", "GAS_STATION", 3, 5, 0, 0, 0xbdbbbb)
env.visualization_manager.set_grid(grid)

gas_station = GasStation(env)
fuel_pump = FuelPump(env)
env.process(gas_station_control(env, fuel_pump))
env.process(car_generator(env, gas_station, fuel_pump))

# Execute!
env.run(until=SIM_TIME)

# you can extract the output from the visualization manager
output = env.visualization_manager.serialize()

# or save the output to a file directly
env.visualization_manager.write_to_file("output.simplay")

# or, if you're working in jupyter lab and have the simplay-jupyter
# extensions installed, you can display the output

# if so, uncomment the following lines

#from IPython.display import display
#display(env.visualization_manager.serialize_for_jupyter(), raw=True)

Process Communication

Note

The following images are referenced in the example below.

_images/generator.png

Generator icons created by Freepik - Flaticon

_images/message_consumer.png

Seo and web icons created by BizzBox - Flaticon

"""
Process communication example

Covers:

- Resources: Store

Scenario:
  This example shows how to interconnect simulation model elements
  together using :class:`~simpy.resources.store.Store` for one-to-one,
  and many-to-one asynchronous processes. For one-to-many a simple
  BroadCastPipe class is constructed from Store.

When Useful:
  When a consumer process does not always wait on a generating process
  and these processes run asynchronously. This example shows how to
  create a buffer and also tell is the consumer process was late
  yielding to the event from a generating process.

  This is also useful when some information needs to be broadcast to
  many receiving processes

  Finally, using pipes can simplify how processes are interconnected to
  each other in a simulation model.

Example By:
  Keith Smith

"""
import random

from simplay import VisualEnvironment, VisualStore, VisualGrid, VisualProcess

import simpy


RANDOM_SEED = 42
SIM_TIME = 100

output_conn_positions = [[0, 1], [2, 1]]
generator_positions = [[1, 0], [2, 0]]


class BroadcastPipe(object):
    """A Broadcast pipe that allows one process to send messages to many.

    This construct is useful when message consumers are running at
    different rates than message generators and provides an event
    buffering to the consuming processes.

    The parameters are used to create a new
    :class:`~simpy.resources.store.Store` instance each time
    :meth:`get_output_conn()` is called.

    """

    def __init__(self, env, capacity=1000):
        self.env = env
        self.capacity = capacity
        self.pipes = []

    def put(self, value):
        """Broadcast a *value* to all receivers."""
        if not self.pipes:
            raise RuntimeError('There are no output pipes.')
        events = [store.put(value) for store in self.pipes]
        return self.env.all_of(events)  # Condition event for all "events"

    def get_output_conn(self):
        """Get a new output connection for this broadcast pipe.

        The return value is a :class:`~simpy.resources.store.Store`.

        """
        position = output_conn_positions.pop(0)
        pipe = VisualStore(
            self.env, "pipe" + str(position[0]) + str(position[1]), "PIPE", capacity=self.capacity)
        pipe.is_at(position[0], position[1])
        pipe.is_visible()
        self.pipes.append(pipe)
        return pipe

    def has_tint(self, color):
        [store.has_tint(color) for store in self.pipes]


class Message_generator(VisualProcess):

    def __init__(self, env: VisualEnvironment, id: str, out_pipe: VisualStore):
        super().__init__(env, id, "GENERATOR", 0xFFFFFF)
        self.out_pipe = out_pipe
        position = generator_positions.pop(0)
        self.is_at(position[0], position[1])
        self.is_visible()

    def run(self):
        """A process which randomly generates messages."""
        while True:
            # wait for next transmission
            yield self.env.timeout(random.randint(6, 10))

            # messages are time stamped to later check if the consumer was
            # late getting them.  Note, using event.triggered to do this may
            # result in failure due to FIFO nature of simulation yields.
            # (i.e. if at the same env.now, message_generator puts a message
            # in the pipe first and then message_consumer gets from pipe,
            # the event.triggered will be True in the other order it will be
            # False
            msg = (self.env.now, '%s says hello at %d' %
                   (self.id, self.env.now))
            if hasattr(self.out_pipe, "pipes"):
                [self.is_interacting_with(store)
                 for store in self.out_pipe.pipes]
            else:
                self.is_interacting_with(self.out_pipe)
            self.out_pipe.put(msg)
            # If the out_pipe has a delay in consuming the pipe will be marked yellow
            self.out_pipe.has_tint(0xFFFF00)
            yield self.env.timeout(1)
            if hasattr(self.out_pipe, "pipes"):
                [self.is_no_longer_interacting_with(
                    store) for store in self.out_pipe.pipes]
            else:
                self.is_no_longer_interacting_with(self.out_pipe)


def message_consumer(name, env: VisualEnvironment, in_pipe: VisualStore):
    """A process which consumes messages."""
    while True:
        # Get event for message pipe
        msg = yield in_pipe.get()

        if msg[0] < env.now:
            # if message was already put into pipe, then
            # message_consumer was late getting to it. Depending on what
            # is being modeled this, may, or may not have some
            # significance
            print('LATE Getting Message: at time %d: %s received message: %s' %
                  (env.now, name, msg[1]))
            in_pipe.has_decorating_text('LATE Getting Message: at time %d: %s received message: \n %s' %
                                        (env.now, name, msg[1]))
            in_pipe.has_tint(0xFF0000)

        else:
            # message_consumer is synchronized with message_generator
            print('at time %d: %s received message: %s.' %
                  (env.now, name, msg[1]))
            in_pipe.has_decorating_text(str('at time %d: %s received message: \n %s.' %
                                            (env.now, name, msg[1])))
            in_pipe.has_tint(0x00FF00)

        # Process does some other work, which may result in missing messages
        yield env.timeout(random.randint(4, 8))


# Setup and start the simulation
print('Process communication')
random.seed(RANDOM_SEED)
env = VisualEnvironment()
env.visualization_manager.register_visual(
    "PIPE", "./process_communication_assets/message_consumer.png")
env.visualization_manager.register_visual(
    "GENERATOR", "./process_communication_assets/generator.png")
grid = VisualGrid(1000, 1000, 3, 3)
env.visualization_manager.set_grid(grid)

# For one-to-one or many-to-one type pipes, use Store
pipe = VisualStore(env, "pipe", "PIPE", capacity=1000)
pipe.is_at(1, 1)
pipe.is_visible()
env.process(Message_generator(env, 'Generator A', pipe).run())
env.process(message_consumer('Consumer A', env, pipe))
print('\nOne-to-one pipe communication\n')

# For one-to many use BroadcastPipe comment out the lines 176 to 181 and comment the lines 168 to 173
# (Note: could also be used for one-to-one,many-to-one or many-to-many)
# bc_pipe = BroadcastPipe(env)
# env.process(Message_generator(env, 'Generator A', bc_pipe).run())
# env.process(message_consumer('Consumer A', env, bc_pipe.get_output_conn()))
# env.process(message_consumer('Consumer B', env, bc_pipe.get_output_conn()))
# print('\nOne-to-many pipe communication\n')

env.run(until=SIM_TIME)

# you can extract the output from the visualization manager
output = env.visualization_manager.serialize()

# or save the output to a file directly
env.visualization_manager.write_to_file("output.simplay")

# or, if you're working in jupyter lab and have the simplay-jupyter
# extensions installed, you can display the output

# if so, uncomment the following lines
# from IPython.display import display
# display(env.visualization_manager.serialize_for_jupyter(), raw=True)

Event Latency

"""
Event Latency example

Covers:

- Resources: Store

Scenario:
  This example shows how to separate the time delay of events between
  processes from the processes themselves.

When Useful:
  When modeling physical things such as cables, RF propagation, etc.  it
  better encapsulation to keep this propagation mechanism outside of the
  sending and receiving processes.

  Can also be used to interconnect processes sending messages

Example by:
  Keith Smith

"""
import simpy
from simplay import VisualEnvironment, VisualGrid, VisualProcess, VisualStore


SIM_DURATION = 100


class Cable(VisualStore):
    """This class represents the propagation through a cable."""

    def __init__(self, env, delay):
        super().__init__(env, "Cable", "CABLE", capacity=1000)
        self.delay = delay
        self.is_at(1, 0)
        self.number_of_messages_in_cable = 0
        self.is_visible()

    def latency(self, value):
        yield self.env.timeout(self.delay)
        super().put(value)

    def put(self, value):
        self.number_of_messages_in_cable += 1
        self.has_decorating_text(
            str('number of messages in cable %d' % self.number_of_messages_in_cable))
        self.has_tint(0xFFFF00)
        self.env.process(self.latency(value))

    def get(self):
        if self.number_of_messages_in_cable > 0:
            self.number_of_messages_in_cable -= 1
        self.has_decorating_text(
            str('number of messages in cable %d' % self.number_of_messages_in_cable))
        return super().get()


class Sender(VisualProcess):
    def __init__(self, env: VisualEnvironment, cable: Cable):
        super().__init__(env, "Sender", "SENDER")
        self.cable = cable
        self.is_at(0, 0)
        self.is_visible()

    def run(self):
        """A process which randomly generates messages."""
        while True:
            # wait for next transmission
            yield self.env.timeout(4)
            self.is_interacting_with(self.cable)
            yield self.env.timeout(1)
            self.cable.put('Sender sent this at %d' % self.env.now)
            self.is_no_longer_interacting_with(self.cable)


class Receiver(VisualProcess):
    def __init__(self, env: VisualEnvironment, cable: Cable):
        super().__init__(env, "Receiver", "RECEIVER")
        self.cable = cable
        self.is_at(2, 0)
        self.is_visible()

    def run(self):
        """A process which consumes messages."""
        while True:
            # Get event for message pipe
            msg = yield self.cable.get()
            self.is_interacting_with(self.cable)
            yield self.env.timeout(1)
            print('Received this at %d while %s' % (self.env.now, msg))
            self.has_decorating_text(
                str('Received this at %d while %s' % (self.env.now, msg)))
            self.is_no_longer_interacting_with(self.cable)


# Setup and start the simulation
print('Event Latency')
env = VisualEnvironment()
grid = VisualGrid(1000, 1000, 3, 1)
env.visualization_manager.set_grid(grid)
env.visualization_manager.register_visual(
    "SENDER", "./event_latency_assets/sender.png")
env.visualization_manager.register_visual(
    "RECEIVER", "./event_latency_assets/receiver.png")
env.visualization_manager.register_visual(
    "CABLE", "./event_latency_assets/cable.png")

cable = Cable(env, 9)
env.process(Sender(env, cable).run())
env.process(Receiver(env, cable).run())

env.run(until=SIM_DURATION)

# you can extract the output from the visualization manager
output = env.visualization_manager.serialize()

# or save the output to a file directly
env.visualization_manager.write_to_file("output.simplay")

# or, if you're working in jupyter lab and have the simplay-jupyter
# extensions installed, you can display the output

# if so, uncomment the following lines
# from IPython.display import display
# display(env.visualization_manager.serialize_for_jupyter(), raw=True)