Examples¶
The following examples are based on SimPy’s examples.
Bank Renege¶
Note
The following images are referenced in the example below.
Reception icons created by Freepik - Flaticon
"""
Bank renege example
Covers:
- Resources: Resource
- Condition events
Scenario:
A counter with a random service time and customers who renege. Based on the
program bank08.py from TheBank tutorial of SimPy 2. (KGM)
"""
from IPython.display import display
import random
from simplay import VisualEnvironment, VisualProcess, VisualResource, VisualGrid
RANDOM_SEED = 42  # Passed to random.seed() before the simulation starts
NEW_CUSTOMERS = 6  # Total number of customers
INTERVAL_CUSTOMERS = 10.0  # Generate new customers roughly every x seconds
MIN_PATIENCE = 1  # Min. customer patience
MAX_PATIENCE = 3  # Max. customer patience
# Pool of grid coordinates for customers. Customer.__init__ takes the first
# entry and Customer.run puts it back when the customer leaves.
free_positions = [[1, 0], [1, 1], [1, 2]]
def source(env, number, interval, counter):
    """Create ``number`` customers, one after another, at random intervals.

    Every customer is started as a process right away. The generator then
    waits for an exponentially distributed time (rate ``1 / interval``)
    before it creates the next one.
    """
    for index in range(number):
        customer_name = 'Customer%02d' % index
        customer = Customer(env, customer_name, counter, time_in_bank=12.0)
        env.process(customer.run())
        pause = random.expovariate(1.0 / interval)
        yield env.timeout(pause)
class Customer(VisualProcess):
    """A bank customer who queues for the counter and reneges when impatient.

    On creation the customer takes the first entry of ``free_positions`` as
    its spot on the grid and puts it back when it leaves.
    """

    def __init__(self, env, id, counter, time_in_bank):
        """Create the visual process with a random tint.

        ``id`` is forwarded to ``VisualProcess``, ``counter`` is the resource
        the customer queues for and ``time_in_bank`` is the mean of the
        exponentially distributed service time.
        """
        color = int(random.random() * 0xFFFFFF)
        super().__init__(env, id, visual="CUSTOMER", tint=color)
        self.counter = counter
        self.time_in_bank = time_in_bank
        # Raises IndexError when every spot is currently taken.
        self.position = free_positions.pop(0)

    def _leave(self):
        """Hide the customer and return its grid spot to the pool."""
        self.is_invisible()
        self.is_no_longer_interacting_with(self.counter)
        free_positions.append(self.position)

    def run(self):
        """Customer arrives, is served and leaves."""
        self.is_visible()
        self.is_at(self.position[0], self.position[1])
        arrive = self.env.now
        print('%7.4f %s: Here I am' % (arrive, self.id))

        with self.counter.request() as req:
            patience = random.uniform(MIN_PATIENCE, MAX_PATIENCE)
            # Wait for the counter or abort at the end of our tether.
            # Use the customer's own environment instead of the module-level
            # ``env`` so the class does not depend on a global variable.
            results = yield req | self.env.timeout(patience)

            wait = self.env.now - arrive

            if req in results:
                # We got to the counter
                self.is_near(self.counter)
                self.is_interacting_with(self.counter)
                print('%7.4f %s: Waited %6.3f' % (self.env.now, self.id, wait))

                tib = random.expovariate(1.0 / self.time_in_bank)
                yield self.env.timeout(tib)
                self._leave()
                print('%7.4f %s: Finished' % (self.env.now, self.id))
            else:
                # We reneged
                print('%7.4f %s: RENEGED after %6.3f' %
                      (self.env.now, self.id, wait))
                self._leave()
# Setup and start the simulation
print('Bank renege')
random.seed(RANDOM_SEED)  # Seed the random module with the fixed RANDOM_SEED
env = VisualEnvironment()

# Build the grid, mark the "Counter" area on it and hand the grid to the
# visualization manager.
grid = VisualGrid(1000, 1000, 2, 3)
grid.set_area("counter", "Counter", 3, 1, 0, 0, 0x707070)
env.visualization_manager.set_grid(grid)

# Register the image files for the visual ids used by Customer ("CUSTOMER")
# and Counter ("COUNTER").
env.visualization_manager.register_visual(
    "CUSTOMER", "./bank_assets/user.png")
env.visualization_manager.register_visual(
    "COUNTER", "./bank_assets/counter.png")

# Start processes and run
class Counter(VisualResource):
    """The bank counter: a resource with capacity 1 shown as "COUNTER"."""

    def __init__(self, env):
        capacity = 1
        column, row = 0, 1
        super().__init__(env, "Counter", capacity, visual="COUNTER")
        self.is_at(column, row)
        self.is_visible()
# Create the counter and start the process that generates the customers.
counter = Counter(env)
env.process(source(env, NEW_CUSTOMERS, INTERVAL_CUSTOMERS, counter))
env.run()

# You can extract the output from the visualization manager ...
output = env.visualization_manager.serialize()

# ... or save the output to a file directly.
env.visualization_manager.write_to_file("output.simplay")

# Or, if you're working in JupyterLab and have the simplay-jupyter
# extension installed, you can display the output.
# If so, uncomment the following lines:
# from IPython.display import display
# display(env.visualization_manager.serialize_for_jupyter(), raw=True)
Carwash¶
Note
The following images are referenced in the example below.
Car icons created by Freepik - Flaticon
"""
Carwash example.
Covers:
- Waiting for other processes
- Resources: Resource
Scenario:
A carwash has a limited number of washing machines and defines
a washing processes that takes some (random) time.
Car processes arrive at the carwash at a random time. If one washing
machine is available, they start the washing process and wait for it
to finish. If not, they wait until they can use one.
"""
import random
from simplay import VisualEnvironment, VisualProcess, VisualResource, VisualGrid
RANDOM_SEED = 42  # Passed to random.seed() before the simulation starts
NUM_MACHINES = 2  # Number of machines in the carwash
WASHTIME = 5  # Minutes it takes to clean a car
T_INTER = 7  # Create a car every ~7 minutes
SIM_TIME = 20  # Simulation time in minutes
class Carwash(VisualResource):
    """A carwash has a limited number of machines (``num_machines``) to
    clean cars in parallel.

    Cars have to request one of the machines. When they got one, they
    can start the washing process and wait for it to finish (which
    takes ``washtime`` minutes).

    """

    def __init__(self, env, num_machines, washtime):
        """Create the resource with ``num_machines`` slots and show it."""
        super().__init__(env, "Carwash", num_machines, visual="CARWASH")
        self.washtime = washtime
        self.is_at(1, 3)
        self.is_visible()

    def wash(self, car):
        """The washing process. It takes a ``car`` (only used for the log
        message) and tries to clean it."""
        # Honour the wash time this instance was configured with instead of
        # the module-level WASHTIME constant.
        yield self.env.timeout(self.washtime)
        print("Carwash removed %d%% of %s's dirt." %
              (random.randint(50, 99), car))
class Car(VisualProcess):
    """A car that drives to the carwash, gets washed and leaves."""

    def __init__(self, env, id, cw):
        """Create the car with a random tint.

        ``id`` is forwarded to ``VisualProcess`` and used in the log
        messages; ``cw`` is the carwash resource to request a machine from.
        """
        color = int(random.random() * 0xFFFFFF)
        super().__init__(env, id, visual="CAR", tint=color)
        self.cw = cw

    def run(self):
        """The car process (each car has an ``id``) arrives at the carwash
        (``cw``) and requests a cleaning machine.

        It then starts the washing process, waits for it to finish and
        leaves to never come back ...

        """
        # Always use the car's own environment; the module-level ``env`` may
        # not exist (or may be a different one) where this class is reused.
        env = self.env
        print('%s arrives at the carwash at %.2f.' % (self.id, env.now))
        self.is_visible()
        self.is_near_cell(4, 2)
        with self.cw.request() as request:
            yield request
            self.is_near(self.cw)
            self.is_interacting_with(self.cw)

            print('%s enters the carwash at %.2f.' % (self.id, env.now))
            yield env.process(self.cw.wash(self.id))

            print('%s leaves the carwash at %.2f.' % (self.id, env.now))
            self.is_invisible()
            self.is_no_longer_interacting_with(self.cw)
def setup(env, num_machines, washtime, t_inter):
    """Build the carwash, start four cars immediately and afterwards start
    one more car roughly every ``t_inter`` minutes (``t_inter`` +/- 2)."""
    wash_station = Carwash(env, num_machines, washtime)

    def start_car(number):
        # Cars are named after the order in which they were created.
        env.process(Car(env, 'Car %d' % number, wash_station).run())

    # The four cars that are there from the beginning
    car_number = 0
    while car_number < 4:
        start_car(car_number)
        car_number += 1

    # Further cars arrive for as long as the simulation runs
    while True:
        gap = random.randint(t_inter - 2, t_inter + 2)
        yield env.timeout(gap)
        start_car(car_number)
        car_number += 1
# Setup and start the simulation
print('Carwash')
print('Check out http://youtu.be/fXXmeP9TvBg while simulating ... ;-)')
random.seed(RANDOM_SEED)  # This helps to reproduce the results

# Create an environment and start the setup process
env = VisualEnvironment()

# Build the grid, mark the "Carwash" area on it and hand the grid to the
# visualization manager.
grid = VisualGrid(1000, 1000, 6, 6)
grid.set_area("Carwash", "Carwash", 6, 3, 0, 0, 0x707070)
env.visualization_manager.set_grid(grid)

# Register the image files for the visual ids used by Car ("CAR") and
# Carwash ("CARWASH").
env.visualization_manager.register_visual(
    "CAR", "./carwash_assets/car.png")
env.visualization_manager.register_visual(
    "CARWASH", "./carwash_assets/carwash.png")
env.process(setup(env, NUM_MACHINES, WASHTIME, T_INTER))

# Execute!
env.run(until=SIM_TIME)

# You can extract the output from the visualization manager ...
output = env.visualization_manager.serialize()

# ... or save the output to a file directly.
env.visualization_manager.write_to_file("output.simplay")

# Or, if you're working in JupyterLab and have the simplay-jupyter
# extension installed, you can display the output.
# If so, uncomment the following lines:
#from IPython.display import display
#display(env.visualization_manager.serialize_for_jupyter(), raw=True)
Machine Shop¶
Note
The following images are referenced in the example below.
Machine icons created by Freepik - Flaticon
Note
This simulation takes a while to run.
"""
Machine shop example
Covers:
- Interrupts
- Resources: PreemptiveResource
Scenario:
A workshop has *n* identical machines. A stream of jobs (enough to
keep the machines busy) arrives. Each machine breaks down
periodically. Repairs are carried out by one repairman. The repairman
has other, less important tasks to perform, too. Broken machines
preempt these tasks. The repairman continues them when he is done
with the machine repair. The workshop works continuously.
"""
import random
from simplay import VisualEnvironment, VisualProcess, VisualPreemptiveResource, VisualGrid
import simpy
RANDOM_SEED = 42  # Passed to random.seed() before the simulation starts
PT_MEAN = 10.0  # Avg. processing time in minutes
PT_SIGMA = 2.0  # Sigma of processing time
MTTF = 300.0  # Mean time to failure in minutes
BREAK_MEAN = 1 / MTTF  # Param. for expovariate distribution
REPAIR_TIME = 30.0  # Time it takes to repair a machine in minutes
JOB_DURATION = 30.0  # Duration of other jobs in minutes
NUM_MACHINES = 10  # Number of machines in the machine shop
WEEKS = 4  # Simulation time in weeks
SIM_TIME = WEEKS * 7 * 24 * 60  # Simulation time in minutes
def time_per_part():
    """Return actual processing time for a concrete part.

    The time is drawn from a normal distribution with mean ``PT_MEAN`` and
    standard deviation ``PT_SIGMA``. A normal distribution can produce zero
    or negative values; those are discarded and drawn again, because the
    result is used as the delay of a timeout and has to be positive.
    """
    duration = random.normalvariate(PT_MEAN, PT_SIGMA)
    while duration <= 0:
        duration = random.normalvariate(PT_MEAN, PT_SIGMA)
    return duration
def time_to_failure():
    """Draw the time until a machine fails next.

    The value is exponentially distributed with rate ``BREAK_MEAN``.
    """
    failure_rate = BREAK_MEAN
    next_failure_in = random.expovariate(failure_rate)
    return next_failure_in
# Pool of grid coordinates for the machines; Machine.__init__ takes the
# first remaining entry, so there must be at least NUM_MACHINES entries.
machine_positions = [[0, 0], [1, 0], [2, 0], [3, 0],
                     [4, 0], [5, 0], [1, 1], [2, 1], [3, 1], [4, 1]]
class Machine(VisualProcess):
    """A machine produces parts and may get broken every now and then.

    If it breaks, it requests a *repairman* and continues the production
    after it is repaired.

    A machine has a *name* and a number of *parts_made* thus far.

    """

    def __init__(self, env, name, repairman: VisualPreemptiveResource):
        # Scaling by 0x00FFFF keeps the red byte of the random tint at zero;
        # presumably so it cannot be mistaken for the red (0xFF0000) tint
        # that break_machine() applies to a broken machine.
        color = int(random.random() * 0x00FFFF)
        super().__init__(env, name, "MACHINE", color)
        self.parts_made = 0
        self.broken = False
        self.repairman = repairman
        # Take the next unused grid coordinate from the shared pool.
        self.position = machine_positions.pop(0)
        self.is_at(self.position[0], self.position[1])
        self.is_visible()

        # Start "working" and "break_machine" processes for this machine.
        self.process = env.process(self.working())
        env.process(self.break_machine())

    def working(self):
        """Produce parts as long as the simulation runs.

        While making a part, the machine may break multiple times.
        Request a repairman when this happens.

        """
        while True:
            # Start making a new part
            done_in = time_per_part()
            while done_in:
                try:
                    # Working on the part
                    start = self.env.now
                    yield self.env.timeout(done_in)
                    done_in = 0  # Set to 0 to exit while loop.

                except simpy.Interrupt:
                    # break_machine() interrupted the work on this part.
                    self.broken = True
                    done_in -= self.env.now - start  # How much time left?

                    # Request a repairman. This will preempt its "other_job".
                    with self.repairman.request(priority=1) as req:
                        yield req
                        # Show the repairman at this machine during the repair
                        self.repairman.is_near(self)
                        self.repairman.is_interacting_with(self)
                        yield self.env.timeout(REPAIR_TIME)
                        self.repairman.is_no_longer_interacting_with(self)
                        # ... and send him back to his home cell afterwards.
                        self.repairman.is_at(
                            self.repairman.home[0], self.repairman.home[1])

                    self.broken = False
                    self.has_original_tint()

            # Part is done.
            self.parts_made += 1
            self.has_decorating_text("Parts made: " + str(self.parts_made))

    def break_machine(self):
        """Break the machine every now and then."""
        while True:
            yield self.env.timeout(time_to_failure())
            if not self.broken:
                # Only break the machine if it is currently working.
                self.process.interrupt()
                # Tint the machine red until working() restores the tint.
                self.has_tint(0xFF0000)
def other_jobs(env, repairman):
    """The repairman's other (unimportant) job.

    Runs forever: each job needs ``JOB_DURATION`` minutes of the repairman's
    time in total and is resumed with the remaining time after an interrupt.
    """
    while True:
        # Start a new job
        done_in = JOB_DURATION
        while done_in:
            # Retry the job until it is done.
            # Its priority is lower than that of machine repairs
            # (machines request the repairman with priority=1).
            with repairman.request(priority=2) as req:
                yield req
                try:
                    start = env.now
                    yield env.timeout(done_in)
                    done_in = 0
                except simpy.Interrupt:
                    # Interrupted: remember how much of the job is left.
                    done_in -= env.now - start
# Setup and start the simulation
print('Machine shop')
random.seed(RANDOM_SEED)  # This helps to reproduce the results

# Create an environment and start the setup process
env = VisualEnvironment()

# Build the grid, mark the "Other jobs" area on it and hand the grid to the
# visualization manager.
grid = VisualGrid(1000, 1000, 6, 5)
grid.set_area("OtherJobs", "Other jobs", 3, 6, 0, 2, 0x707070)
env.visualization_manager.set_grid(grid)

# Register the image files for the visual ids used by Repairman
# ("REPAIRMAN") and Machine ("MACHINE").
env.visualization_manager.register_visual(
    "REPAIRMAN", "./machine_shop_assets/repairman.png")
env.visualization_manager.register_visual(
    "MACHINE", "./machine_shop_assets/machine.png")
class Repairman(VisualPreemptiveResource):
    """The repairman resource; ``home`` is the cell he is shown at when he
    is not at a machine."""

    def __init__(self, env, capacity):
        super().__init__(env, "Repairman", capacity, visual="REPAIRMAN")
        home_x, home_y = 2, 3
        self.home = [home_x, home_y]
        self.is_at(home_x, home_y)
        self.is_visible()
# One repairman serves all machines and additionally works on his other jobs.
repairman = Repairman(env, capacity=1)
machines = [Machine(env, 'Machine %d' % i, repairman)
            for i in range(NUM_MACHINES)]
env.process(other_jobs(env, repairman))

# Execute!
env.run(until=SIM_TIME)

# Analysis/results
print('Machine shop results after %s weeks' % WEEKS)
for machine in machines:
    print('%s made %d parts.' % (machine.id, machine.parts_made))

# You can extract the output from the visualization manager ...
output = env.visualization_manager.serialize()

# ... or save the output to a file directly.
env.visualization_manager.write_to_file("output.simplay")

# Or, if you're working in JupyterLab and have the simplay-jupyter
# extension installed, you can display the output.
# If so, uncomment the following lines:
# from IPython.display import display
# display(env.visualization_manager.serialize_for_jupyter(), raw=True)
Movie Renege¶
Note
The following images are referenced in the example below.
Reception icons created by Freepik - Flaticon
"""
Movie renege example
Covers:
- Resources: Resource
- Condition events
- Shared events
Scenario:
A movie theatre has one ticket counter selling tickets for three
movies (next show only). When a movie is sold out, all people waiting
to buy tickets for that movie renege (leave queue).
"""
import collections
import random
from simplay import VisualEnvironment, VisualGrid, VisualResource, VisualProcess
RANDOM_SEED = 42  # Passed to random.seed() before the simulation starts
TICKETS = 50  # Number of tickets per movie
SIM_TIME = 120  # Simulate until
# Grid coordinate for the next Moviegoer; advanced by
# define_next_moviegoer_position(). It starts at x = 1 because the ticket
# counter is placed at (0, 0).
row_current_x = 1
row_current_y = 0
# Grid dimensions passed to VisualGrid(1000, 1000, COLS, ROWS)
COLS = 5
ROWS = 16
def define_next_moviegoer_position():
    """Advance the module-level grid position used for the next moviegoer.

    The position moves one column to the right. Once the last column
    (index ``COLS - 1``) has been used, it continues in column 0 of the next
    row. Wrapping only after ``row_current_x == COLS`` would hand out column
    ``COLS``, which lies outside a grid that is ``COLS`` columns wide.

    The row is not limited here: after ``ROWS`` rows the position leaves the
    grid at the bottom.
    """
    global row_current_x
    global row_current_y
    if row_current_x >= COLS - 1:
        row_current_y += 1
        row_current_x = 0
    else:
        row_current_x += 1
class Moviegoer(VisualProcess):
    """A customer who queues at the theater's counter to buy tickets."""

    def __init__(self, env, id, movie, num_tickets, theater):
        """Create the moviegoer with a random tint and show it right away.

        The moviegoer is placed on the current module-level grid position,
        which is then advanced for the next moviegoer.
        """
        color = int(random.random() * 0xFFFFFF)
        super().__init__(env, id, "CUSTOMER", color)
        self.movie = movie
        self.num_tickets = num_tickets
        self.theater = theater
        self.is_at(row_current_x, row_current_y)
        define_next_moviegoer_position()
        self.is_visible()

    def _leave(self):
        """Hide the moviegoer and end the interaction with the counter."""
        self.is_invisible()
        self.is_no_longer_interacting_with(self.theater.counter)

    def run(self):
        """A moviegoer tries to buy a number of tickets (*num_tickets*) for
        a certain *movie* in a *theater*.

        If the movie becomes sold out, she leaves the theater. If she gets
        to the counter, she tries to buy a number of tickets. If not enough
        tickets are left, she argues with the teller and leaves.

        If at most one ticket is left after the moviegoer bought her
        tickets, the *sold out* event for this movie is triggered causing
        all remaining moviegoers to leave.

        """
        theater = self.theater
        movie = self.movie
        with theater.counter.request() as my_turn:
            # Wait until it's our turn or until the movie is sold out
            result = yield my_turn | theater.sold_out[movie]
            self.is_near(theater.counter)
            self.is_interacting_with(theater.counter)

            # Check if it's our turn or if movie is sold out
            if my_turn not in result:
                theater.num_renegers[movie] += 1
                self._leave()
                return

            # Check if enough tickets left.
            if theater.available[movie] < self.num_tickets:
                # Moviegoer leaves after some discussion
                yield self.env.timeout(0.5)
                self._leave()
                return

            # Buy tickets
            theater.available[movie] -= self.num_tickets
            if theater.available[movie] < 2:
                # Trigger the "sold out" event for the movie
                theater.sold_out[movie].succeed()
                # Read the time from the moviegoer's own environment rather
                # than from the module-level ``env``.
                theater.when_sold_out[movie] = self.env.now
                theater.available[movie] = 0
            yield self.env.timeout(1)
            self._leave()
def customer_arrivals(env, theater):
    """Keep creating *moviegoers* for as long as the simulation runs.

    A moviegoer is only created when tickets are still available for the
    movie that was picked for her.
    """
    while True:
        delay = random.expovariate(1 / 0.5)
        yield env.timeout(delay)

        movie = random.choice(theater.movies)
        num_tickets = random.randint(1, 6)
        if not theater.available[movie]:
            # Nothing left for this movie, nobody shows up.
            continue

        moviegoer_id = "Movigoer" + str(random.random())
        moviegoer = Moviegoer(env, moviegoer_id, movie, num_tickets, theater)
        env.process(moviegoer.run())
# Bundles the shared state of the theater: the ticket counter resource, the
# list of movies and one dict per movie-specific value.
Theater = collections.namedtuple('Theater', 'counter, movies, available, '
                                 'sold_out, when_sold_out, '
                                 'num_renegers')

# Setup and start the simulation
print('Movie renege')
random.seed(RANDOM_SEED)
env = VisualEnvironment()

# Register the image files for the visual ids used by the counter
# ("COUNTER") and by Moviegoer ("CUSTOMER"), then set up the grid.
env.visualization_manager.register_visual('COUNTER', './movie_renege_assets/counter.png')
env.visualization_manager.register_visual('CUSTOMER', './movie_renege_assets/customer.png')
grid = VisualGrid(1000, 1000, COLS, ROWS)
env.visualization_manager.set_grid(grid)

# Create movie theater
counter = VisualResource(env, "Counter", 1, "COUNTER")
counter.is_at(0, 0)
counter.is_visible()
movies = ['Python Unchained', 'Kill Process', 'Pulp Implementation']
available = {movie: TICKETS for movie in movies}
sold_out = {movie: env.event() for movie in movies}
when_sold_out = {movie: None for movie in movies}
num_renegers = {movie: 0 for movie in movies}
theater = Theater(counter, movies, available, sold_out, when_sold_out,
                  num_renegers)

# Start process and run
env.process(customer_arrivals(env, theater))
env.run(until=SIM_TIME)

# Analysis/results
for movie in movies:
    # Decide by the recorded sell-out time instead of the truthiness of the
    # event object: the time stays None for a movie that never sold out, and
    # None must not reach the '%.1f' format below.
    if theater.when_sold_out[movie] is not None:
        print('Movie "%s" sold out %.1f minutes after ticket counter '
              'opening.' % (movie, theater.when_sold_out[movie]))
        print(' Number of people leaving queue when film sold out: %s' %
              theater.num_renegers[movie])

# You can extract the output from the visualization manager ...
output = env.visualization_manager.serialize()

# ... or save the output to a file directly.
env.visualization_manager.write_to_file("output.simplay")

# Or, if you're working in JupyterLab and have the simplay-jupyter
# extension installed, you can display the output.
# If so, uncomment the following lines:
# from IPython.display import display
# display(env.visualization_manager.serialize_for_jupyter(), raw=True)
Gas Station Refueling¶
Note
The following images are referenced in the example below.
Car icons created by Freepik - Flaticon
Fuel icons created by Good Ware - Flaticon
Adapted from Gas Tank icon created by Freepik - Flaticon
Adapted from Gas Tank icon created by Freepik - Flaticon
Adapted from Gas Tank icon created by Freepik - Flaticon
Adapted from Gas Tank icon created by Freepik - Flaticon
Adapted from Gas Tank icon created by Freepik - Flaticon
"""
Gas Station Refueling example
Covers:
- Resources: Resource
- Resources: Container
- Waiting for other processes
Scenario:
A gas station has a limited number of gas pumps that share a common
fuel reservoir. Cars randomly arrive at the gas station, request one
of the fuel pumps and start refueling from that reservoir.
A gas station control process observes the gas station's fuel level
and calls a tank truck for refueling if the station's level drops
below a threshold.
"""
import itertools
import random
from simplay import (
VisualEnvironment, VisualContainer, VisualProcess,
VisualResource, VisualGrid
)
RANDOM_SEED = 42  # Passed to random.seed() before the simulation starts
GAS_STATION_SIZE = 500  # liters
THRESHOLD = 20  # Threshold for calling the tank truck (in %)
FUEL_TANK_SIZE = 50  # liters
FUEL_TANK_LEVEL = [5, 25]  # Min/max levels of fuel tanks (in liters)
REFUELING_SPEED = 2  # liters / second
TANK_TRUCK_TIME = 300  # Seconds it takes the tank truck to arrive
T_INTER = [10, 100]  # Create a car every [min, max] seconds
SIM_TIME = 10000  # Simulation time in seconds
class Car(VisualProcess):
    """A car that arrives at the gas station, refuels and leaves."""

    def __init__(self, name, env, gas_station, fuel_pump):
        """Create the car with a random tint.

        Note the parameter order: ``name`` comes before ``env``.
        ``gas_station`` is the resource the car requests a pump from and
        ``fuel_pump`` is the container the fuel is taken from.
        """
        color = int(random.random() * 0xFFFFFF)
        super().__init__(env, name, visual="CAR", tint=color)
        self.gas_station = gas_station
        self.fuel_pump = fuel_pump

    def run(self):
        """Request a gas pump, take the missing fuel and spend the time the
        refueling takes before leaving again."""
        self.is_visible()
        self.is_near_cell(2, 1)
        fuel_tank_level = random.randint(*FUEL_TANK_LEVEL)
        with self.gas_station.request() as req:
            # Request one of the gas pumps
            yield req
            self.is_near(self.gas_station)
            self.is_interacting_with(self.gas_station)

            # Get the required amount of fuel
            liters_required = FUEL_TANK_SIZE - fuel_tank_level
            yield self.fuel_pump.get(liters_required)

            # The "actual" refueling process takes some time
            yield self.env.timeout(liters_required / REFUELING_SPEED)

            self.is_no_longer_interacting_with(self.gas_station)
            self.is_invisible()
def gas_station_control(env, fuel_pump):
    """Watch the fill level of the *fuel_pump* and send for the tank truck
    whenever it is below ``THRESHOLD`` percent.

    The level is checked every 10 seconds; the same truck object is reused
    for every delivery.
    """
    tank_truck = TankTruck(env, fuel_pump)
    while True:
        level_too_low = fuel_pump.level / fuel_pump.capacity * 100 < THRESHOLD
        if level_too_low:
            # Call the tank truck and wait until it has arrived and
            # refilled the station.
            delivery = env.process(tank_truck.run())
            yield delivery

        yield env.timeout(10)  # Check every 10 seconds
class TankTruck(VisualProcess):
    """The truck that refills the station's fuel pump."""

    def __init__(self, env, fuel_pump):
        super().__init__(env, "Tank Truck", visual="TANK_TRUCK", tint=0xFF0000)
        self.fuel_pump = fuel_pump

    def run(self):
        """Arrive after ``TANK_TRUCK_TIME`` seconds and fill the pump up to
        its capacity."""
        pump = self.fuel_pump

        self.is_visible()
        self.is_at(0, 0)
        self.is_near(pump)
        self.is_interacting_with(pump)

        yield self.env.timeout(TANK_TRUCK_TIME)

        # Deliver exactly what is missing at the time of arrival.
        missing_liters = pump.capacity - pump.level
        yield pump.put(missing_liters)

        self.is_no_longer_interacting_with(pump)
        self.is_invisible()
def car_generator(env, gas_station, fuel_pump):
    """Endlessly create cars that arrive at the gas station.

    The gap between two cars is a random whole number of seconds from the
    ``T_INTER`` range.
    """
    car_number = 0
    while True:
        gap = random.randint(*T_INTER)
        yield env.timeout(gap)
        new_car = Car("Car %d" % car_number, env, gas_station, fuel_pump)
        env.process(new_car.run())
        car_number += 1
class GasStation(VisualResource):
    """The gas station: a resource with capacity 3, shown as "GAS_STATION"."""

    def __init__(self, env):
        capacity = 3
        column, row = 3, 1
        super().__init__(
            env,
            "Gas Station",
            capacity,
            visual="GAS_STATION",
            tint=0x00FF00,
        )
        self.is_at(column, row)
        self.is_visible()
class FuelPump(VisualContainer):
    """The station's fuel reservoir, drawn with one of five sprite frames
    depending on its fill level."""

    def __init__(self, env):
        super().__init__(env, "Fuel Pump", capacity=GAS_STATION_SIZE,
                         init=GAS_STATION_SIZE, visual="FUEL_PUMP",
                         tint=0x000001)
        self.is_at(1, 1)
        self.is_visible()
        # The reservoir starts full, so start with the last frame.
        self.has_frame(4)

    def update_sprite(self):
        """Select the frame that matches the current fill level."""
        fill_ratio = self.level / self.capacity
        # The frame index is the number of limits that have been reached:
        # below 0.25 -> 0, below 0.5 -> 1, below 0.75 -> 2, below 1 -> 3,
        # otherwise 4.
        limits = (0.25, 0.5, 0.75, 1)
        frame = sum(fill_ratio >= limit for limit in limits)
        self.has_frame(frame)

    def get(self, amount):
        """Request ``amount`` from the container, then refresh the sprite."""
        request = super().get(amount)
        self.update_sprite()
        return request

    def put(self, amount):
        """Add ``amount`` to the container, then refresh the sprite."""
        request = super().put(amount)
        self.update_sprite()
        return request
# Setup and start the simulation
random.seed(RANDOM_SEED)

# Create environment and start processes
env = VisualEnvironment()

# Register the image files for the visual ids used by Car ("CAR"),
# TankTruck ("TANK_TRUCK") and GasStation ("GAS_STATION").
env.visualization_manager.register_visual("CAR", "./gas_station_assets/car.png")
env.visualization_manager.register_visual(
    "TANK_TRUCK", "./gas_station_assets/truck.png")
env.visualization_manager.register_visual(
    "GAS_STATION", "./gas_station_assets/gaspump.png")

# FuelPump selects one of these five images by index via has_frame(0..4).
env.visualization_manager.register_sprites(
    "FUEL_PUMP",
    [
        "./gas_station_assets/pump_000.png",
        "./gas_station_assets/pump_025.png",
        "./gas_station_assets/pump_050.png",
        "./gas_station_assets/pump_075.png",
        "./gas_station_assets/pump_100.png",
    ],
)

grid = VisualGrid(500, 500, 5, 3)
grid.set_area("gasstation01", "GAS_STATION", 3, 5, 0, 0, 0xbdbbbb)
env.visualization_manager.set_grid(grid)

gas_station = GasStation(env)
fuel_pump = FuelPump(env)
env.process(gas_station_control(env, fuel_pump))
env.process(car_generator(env, gas_station, fuel_pump))

# Execute!
env.run(until=SIM_TIME)

# You can extract the output from the visualization manager ...
output = env.visualization_manager.serialize()

# ... or save the output to a file directly.
env.visualization_manager.write_to_file("output.simplay")

# Or, if you're working in JupyterLab and have the simplay-jupyter
# extension installed, you can display the output.
# If so, uncomment the following lines:
#from IPython.display import display
#display(env.visualization_manager.serialize_for_jupyter(), raw=True)
Process Communication¶
Note
The following images are referenced in the example below.
Generator icons created by Freepik - Flaticon
"""
Process communication example
Covers:
- Resources: Store
Scenario:
This example shows how to interconnect simulation model elements
together using :class:`~simpy.resources.store.Store` for one-to-one,
and many-to-one asynchronous processes. For one-to-many a simple
BroadCastPipe class is constructed from Store.
When Useful:
When a consumer process does not always wait on a generating process
and these processes run asynchronously. This example shows how to
create a buffer and also tell if the consumer process was late
yielding to the event from a generating process.
This is also useful when some information needs to be broadcast to
many receiving processes
Finally, using pipes can simplify how processes are interconnected to
each other in a simulation model.
Example By:
Keith Smith
"""
import random
from simplay import VisualEnvironment, VisualStore, VisualGrid, VisualProcess
import simpy
RANDOM_SEED = 42  # Passed to random.seed() before the simulation starts
SIM_TIME = 100  # Passed to env.run(until=...)
# Pool of grid coordinates; BroadcastPipe.get_output_conn() takes the first
# remaining entry for every output pipe it creates.
output_conn_positions = [[0, 1], [2, 1]]
# Pool of grid coordinates; Message_generator.__init__ takes the first
# remaining entry.
generator_positions = [[1, 0], [2, 0]]
class BroadcastPipe:
    """Fan-out pipe: one sender, any number of receivers.

    Every receiver gets its own buffering store, so receivers may consume
    the messages more slowly than the sender produces them.

    ``capacity`` is used for each store that :meth:`get_output_conn`
    creates.

    """

    def __init__(self, env, capacity=1000):
        self.env = env
        self.capacity = capacity
        self.pipes = []

    def put(self, value):
        """Put *value* into every output store.

        Returns a condition event over all individual put events. Raises
        ``RuntimeError`` when no output store exists yet.
        """
        if len(self.pipes) == 0:
            raise RuntimeError('There are no output pipes.')

        pending = []
        for store in self.pipes:
            pending.append(store.put(value))
        return self.env.all_of(pending)

    def get_output_conn(self):
        """Create, show and return a new output store for this pipe.

        The store's grid coordinate is taken from ``output_conn_positions``
        and is also used to build the store's id.
        """
        column, row = output_conn_positions.pop(0)
        store_id = "pipe" + str(column) + str(row)
        store = VisualStore(self.env, store_id, "PIPE", capacity=self.capacity)
        store.is_at(column, row)
        store.is_visible()
        self.pipes.append(store)
        return store

    def has_tint(self, color):
        """Apply the tint *color* to every output store."""
        for store in self.pipes:
            store.has_tint(color)
class Message_generator(VisualProcess):
    """Process that sends a time-stamped message every 6 to 10 time units.

    ``out_pipe`` may be a single store or an object with a ``pipes``
    attribute (such as ``BroadcastPipe``); in the latter case the generator
    is shown interacting with every store in ``pipes``.
    """

    def __init__(self, env: VisualEnvironment, id: str, out_pipe: VisualStore):
        super().__init__(env, id, "GENERATOR", 0xFFFFFF)
        self.out_pipe = out_pipe
        column, row = generator_positions.pop(0)
        self.is_at(column, row)
        self.is_visible()

    def _receiving_stores(self):
        """Return the stores the generator is shown interacting with."""
        if hasattr(self.out_pipe, "pipes"):
            return self.out_pipe.pipes
        return [self.out_pipe]

    def run(self):
        """Generate messages at random intervals, forever."""
        while True:
            # Wait for the next transmission
            pause = random.randint(6, 10)
            yield self.env.timeout(pause)

            # The send time travels with the message so that the consumer
            # can tell whether it picked the message up late. Looking at
            # event.triggered instead is unreliable: when generator and
            # consumer act at the same env.now, the result depends on which
            # of the two happens to run first.
            sent_at = self.env.now
            msg = (sent_at, '%s says hello at %d' % (self.id, self.env.now))

            for store in self._receiving_stores():
                self.is_interacting_with(store)

            self.out_pipe.put(msg)
            # Mark the pipe yellow; the consumer changes the tint again as
            # soon as it has received a message.
            self.out_pipe.has_tint(0xFFFF00)
            yield self.env.timeout(1)

            for store in self._receiving_stores():
                self.is_no_longer_interacting_with(store)
def message_consumer(name, env: VisualEnvironment, in_pipe: VisualStore):
    """Receive messages from *in_pipe* forever and report each one.

    A message whose time stamp is older than the current time is reported
    as late and tints the pipe red; otherwise the pipe is tinted green.
    """
    while True:
        # Wait for the next message
        msg = yield in_pipe.get()

        is_late = msg[0] < env.now
        if is_late:
            # The message had been sitting in the pipe before we got to it.
            # Whether that matters depends on what is being modeled.
            console_format = 'LATE Getting Message: at time %d: %s received message: %s'
            label_format = 'LATE Getting Message: at time %d: %s received message: \n %s'
            tint = 0xFF0000
        else:
            # Consumer and generator are in sync.
            console_format = 'at time %d: %s received message: %s.'
            label_format = 'at time %d: %s received message: \n %s.'
            tint = 0x00FF00

        fields = (env.now, name, msg[1])
        print(console_format % fields)
        in_pipe.has_decorating_text(label_format % fields)
        in_pipe.has_tint(tint)

        # Do some other work; further messages may pile up in the meantime.
        busy_for = random.randint(4, 8)
        yield env.timeout(busy_for)
# Setup and start the simulation
print('Process communication')
random.seed(RANDOM_SEED)
env = VisualEnvironment()

# Register the image files for the visual ids used by the stores ("PIPE")
# and by Message_generator ("GENERATOR"), then set up the grid.
env.visualization_manager.register_visual(
    "PIPE", "./process_communication_assets/message_consumer.png")
env.visualization_manager.register_visual(
    "GENERATOR", "./process_communication_assets/generator.png")
grid = VisualGrid(1000, 1000, 3, 3)
env.visualization_manager.set_grid(grid)

# For one-to-one or many-to-one type pipes, use Store
pipe = VisualStore(env, "pipe", "PIPE", capacity=1000)
pipe.is_at(1, 1)
pipe.is_visible()
env.process(Message_generator(env, 'Generator A', pipe).run())
env.process(message_consumer('Consumer A', env, pipe))
print('\nOne-to-one pipe communication\n')

# For one-to-many use BroadcastPipe: uncomment the BroadcastPipe lines below
# and comment out the one-to-one block above (from "pipe = VisualStore(...)"
# to the print call).
# (Note: could also be used for one-to-one, many-to-one or many-to-many)
# bc_pipe = BroadcastPipe(env)
# env.process(Message_generator(env, 'Generator A', bc_pipe).run())
# env.process(message_consumer('Consumer A', env, bc_pipe.get_output_conn()))
# env.process(message_consumer('Consumer B', env, bc_pipe.get_output_conn()))
# print('\nOne-to-many pipe communication\n')

env.run(until=SIM_TIME)

# You can extract the output from the visualization manager ...
output = env.visualization_manager.serialize()

# ... or save the output to a file directly.
env.visualization_manager.write_to_file("output.simplay")

# Or, if you're working in JupyterLab and have the simplay-jupyter
# extension installed, you can display the output.
# If so, uncomment the following lines:
# from IPython.display import display
# display(env.visualization_manager.serialize_for_jupyter(), raw=True)
Event Latency¶
Note
The following images are referenced in the example below.
Cable icons created by Freepik - Flaticon
Inbox icons created by Pixel perfect - Flaticon
"""
Event Latency example
Covers:
- Resources: Store
Scenario:
This example shows how to separate the time delay of events between
processes from the processes themselves.
When Useful:
When modeling physical things such as cables, RF propagation, etc. it is
better encapsulation to keep this propagation mechanism outside of the
sending and receiving processes.
Can also be used to interconnect processes sending messages
Example by:
Keith Smith
"""
import simpy
from simplay import VisualEnvironment, VisualGrid, VisualProcess, VisualStore
SIM_DURATION = 100  # Passed to env.run(until=...)
class Cable(VisualStore):
    """A store that models a cable: a message that is put into it only
    arrives in the store after ``delay`` time units."""

    def __init__(self, env, delay):
        super().__init__(env, "Cable", "CABLE", capacity=1000)
        self.delay = delay
        self.is_at(1, 0)
        self.number_of_messages_in_cable = 0
        self.is_visible()

    def _show_message_count(self):
        """Display the current message counter as decorating text."""
        label = 'number of messages in cable %d' % self.number_of_messages_in_cable
        self.has_decorating_text(label)

    def latency(self, value):
        """Wait for the cable's delay, then store *value*."""
        yield self.env.timeout(self.delay)
        super().put(value)

    def put(self, value):
        """Count the message, tint the cable yellow and start the delayed
        delivery of *value*. Unlike the parent's ``put`` this returns
        nothing."""
        self.number_of_messages_in_cable += 1
        self._show_message_count()
        self.has_tint(0xFFFF00)
        self.env.process(self.latency(value))

    def get(self):
        """Return the store's get event; the message counter is decreased
        already at the time of the request."""
        if self.number_of_messages_in_cable > 0:
            self.number_of_messages_in_cable -= 1
            # NOTE(review): the listing lost its indentation; the label
            # update is assumed to belong to this branch - confirm.
            self._show_message_count()
        return super().get()
class Sender(VisualProcess):
    """Process that puts a message into the cable every 5 time units."""

    def __init__(self, env: VisualEnvironment, cable: Cable):
        super().__init__(env, "Sender", "SENDER")
        self.cable = cable
        self.is_at(0, 0)
        self.is_visible()

    def run(self):
        """Send messages forever: idle for 4 time units, then spend 1 time
        unit interacting with the cable before the message is put."""
        env, cable = self.env, self.cable
        while True:
            # Wait for the next transmission
            yield env.timeout(4)

            self.is_interacting_with(cable)
            yield env.timeout(1)
            message = 'Sender sent this at %d' % env.now
            cable.put(message)
            self.is_no_longer_interacting_with(cable)
class Receiver(VisualProcess):
    """Process that takes messages out of the cable and reports them."""

    def __init__(self, env: VisualEnvironment, cable: Cable):
        super().__init__(env, "Receiver", "RECEIVER")
        self.cable = cable
        self.is_at(2, 0)
        self.is_visible()

    def run(self):
        """Receive messages forever; handling one takes 1 time unit."""
        env, cable = self.env, self.cable
        while True:
            # Wait for the next message from the cable
            msg = yield cable.get()

            self.is_interacting_with(cable)
            yield env.timeout(1)

            report = 'Received this at %d while %s' % (env.now, msg)
            print(report)
            self.has_decorating_text(report)
            self.is_no_longer_interacting_with(cable)
# Setup and start the simulation
print('Event Latency')
env = VisualEnvironment()
grid = VisualGrid(1000, 1000, 3, 1)
env.visualization_manager.set_grid(grid)

# Register the image files for the visual ids used by Sender ("SENDER"),
# Receiver ("RECEIVER") and Cable ("CABLE").
env.visualization_manager.register_visual(
    "SENDER", "./event_latency_assets/sender.png")
env.visualization_manager.register_visual(
    "RECEIVER", "./event_latency_assets/receiver.png")
env.visualization_manager.register_visual(
    "CABLE", "./event_latency_assets/cable.png")

# The second argument is the cable's delay.
cable = Cable(env, 9)
env.process(Sender(env, cable).run())
env.process(Receiver(env, cable).run())
env.run(until=SIM_DURATION)

# You can extract the output from the visualization manager ...
output = env.visualization_manager.serialize()

# ... or save the output to a file directly.
env.visualization_manager.write_to_file("output.simplay")

# Or, if you're working in JupyterLab and have the simplay-jupyter
# extension installed, you can display the output.
# If so, uncomment the following lines:
# from IPython.display import display
# display(env.visualization_manager.serialize_for_jupyter(), raw=True)