Commit 20f8c9d2 authored by Ahmad Reza's avatar Ahmad Reza
Browse files

Merge branches '3D-swarm-sim' and 'leader_coating' of...

Merge branches '3D-swarm-sim' and 'leader_coating' of gitlab.cs.uni-duesseldorf.de:cheraghi/swarm-sim into leader_coating

Conflicts:
	grids/TriangularGrid.py
parent f9e0f21f
"""The world module provides the interface of the simulation world. In the simulation world
all the data of the particles, tiles, and locations are stored.
It also have the the coordination system and stated the maximum of the x and y coordinate.
"""
.. module:: particle
:platform: Unix, Windows
:synopsis: This module provides the interfaces of the robotics particle
.. moduleauthor:: Ahmad Reza Cheraghi
TODO: Erase Memory
.. todo:: What happens if the maximum y or x axis is passed? Either the start from the other side or turns back.
"""
import importlib
import logging
import random
import threading
import os
import datetime
from lib import csv_generator, particle, tile, location, vis3d
from lib.swarm_sim_header import eprint
class World:
def __init__(self, config_data):
"""
Initializing the world constructor
:param config_data: configuration data from config.ini file
"""
self.__round_counter = 1
self.__end = False
self.init_particles = []
self.particle_id_counter = 0
self.particles = []
self.particle_map_coordinates = {}
self.particle_map_id = {}
self.particles_created = []
self.particle_rm = []
self.__particle_deleted = False
self.new_particle = None
self.tiles = []
self.tile_map_coordinates = {}
self.tile_map_id = {}
self.tiles_created = []
self.tiles_rm = []
self.__tile_deleted = False
self.new_tile = None
self.__tile_deleted = False
self.locations = []
self.location_map_coordinates = {}
self.location_map_id = {}
self.locations_created = []
self.locations_rm = []
self.__location_deleted = False
self.new_location = None
self.config_data = config_data
self.grid = config_data.grid
self.csv_round = csv_generator.CsvRoundData(scenario=config_data.scenario,
solution=config_data.solution,
seed=config_data.seed_value,
directory=config_data.direction_name)
if config_data.visualization:
self.vis = vis3d.Visualization(self)
else:
self.vis = None
mod = importlib.import_module('scenario.' + self.config_data.scenario)
if config_data.visualization:
import threading
x = threading.Thread(target=mod.scenario, args=(self,))
self.vis.wait_for_thread(x, "loading scenario... please wait.", "Loading Scenario")
else:
mod.scenario(self)
if self.config_data.particle_random_order:
random.shuffle(self.particles)
def reset(self):
"""
resets everything (particles, tiles, locations) except for the logging in system.log and in the csv file...
reloads the scenario.
:return:
"""
self.__round_counter = 1
self.__end = False
self.init_particles = []
self.particle_id_counter = 0
self.particles = []
self.particles_created = []
self.particle_rm = []
self.particle_map_coordinates = {}
self.particle_map_id = {}
self.__particle_deleted = False
self.new_particle = None
from lib import csv_generator, matter
from lib.swarm_sim_header import *
class Particle(matter.Matter):
def __init__(self, world, coordinates, color, particle_counter=0):
"""Initializing the particle constructor"""
super().__init__(world, coordinates, color,
type="particle", mm_size=world.config_data.particle_mm_size)
self.number = particle_counter
self.__isCarried = False
self.carried_tile = None
self.carried_particle = None
self.steps = 0
self.csv_particle_writer = csv_generator.CsvParticleData(self.get_id(), self.number)
def has_tile(self):
if self.carried_tile is None:
return False
else:
return True
def has_particle(self):
if self.carried_particle is None:
return False
else:
return True
self.tiles = []
self.tiles_created = []
self.tiles_rm = []
self.tile_map_coordinates = {}
self.tile_map_id = {}
self.__tile_deleted = False
self.new_tile = None
def get_carried_status(self):
"""
Get the status if it is taken or not
self.locations = []
self.locations_created = []
self.location_map_coordinates = {}
self.location_map_id = {}
self.locations_rm = []
self.__location_deleted = False
self.new_location = None
:return: Tiles status
"""
return self.__isCarried
if self.config_data.visualization:
self.vis.reset()
def check_on_tile(self):
"""
Checks if the particle is on a tile
mod = importlib.import_module('scenario.' + self.config_data.scenario)
:return: True: On a tile; False: Not on a Tile
"""
if self.coordinates in self.world.tile_map_coordinates:
return True
else:
return False
if self.config_data.visualization:
# if visualization is on, run the scenario in a separate thread and show that the program runs..
x = threading.Thread(target=mod.scenario, args=(self,))
self.vis.wait_for_thread(x, "loading scenario... please wait.", "Loading Scenario")
self.vis.update_visualization_data()
def check_on_particle(self):
"""
Checks if the particle is on a particle
:return: True: On a particle; False: Not on a particle
"""
if self.coordinates in self.world.particle_map_coordinates:
return True
else:
# if no vis, just run the scenario on the main thread
mod.scenario(self)
return False
if self.config_data.particle_random_order:
random.shuffle(self.particles)
def check_on_location(self):
"""
Checks if the particle is on a location
def save_scenario(self):
:return: True: On a location; False: Not on a location
"""
if self.coordinates in self.world.location_map_coordinates:
return True
else:
return False
# create scenario folder, if it doesn't already exist.
if not os.path.exists("scenario") or not os.path.isdir("scenario"):
os.mkdir("scenario")
def move_to(self, direction):
"""
Moves the particle to the given direction
:param direction: The direction is defined by loaded grid class
:return: True: Success Moving; False: Non moving
"""
direction_coord = get_coordinates_in_direction(self.coordinates, direction)
direction_coord = self.check_within_border(direction, direction_coord)
if self.world.grid.are_valid_coordinates(direction_coord) \
and direction_coord not in self.world.particle_map_coordinates:
if self.coordinates in self.world.particle_map_coordinates:
del self.world.particle_map_coordinates[self.coordinates]
self.coordinates = direction_coord
self.world.particle_map_coordinates[self.coordinates] = self
if self.world.vis is not None:
self.world.vis.particle_changed(self)
logging.info("particle %s successfully moved to %s", str(self.get_id()), direction)
self.world.csv_round.update_metrics(steps=1)
self.csv_particle_writer.write_particle(steps=1)
self.check_for_carried_tile_or_particle()
return True
# if the scenario folder exists, try to create and save the new scenario file, if it fails print the error.
if os.path.exists("scenario") and os.path.isdir("scenario"):
now = datetime.datetime.now()
filename = str("scenario/%d-%d-%d_%d-%d-%d_scenario.py"
% (now.year, now.month, now.day, now.hour, now.minute, now.second))
try:
f = open(filename, "w+")
f.write("def scenario(world):\n")
for p in self.particle_map_coordinates.values():
f.write("\tworld.add_particle(%s, color=%s)\n" % (str(p.coordinates), str(p.get_color())))
for t in self.tile_map_coordinates.values():
f.write("\tworld.add_tile(%s, color=%s)\n" % (str(t.coordinates), str(t.get_color())))
for l in self.location_map_coordinates.values():
f.write("\tworld.add_location(%s, color=%s)\n" % (str(l.coordinates), str(l.get_color())))
f.flush()
f.close()
except IOError as e:
eprint(e)
return False
# checks if the file exists. If not, some unknown error occured while saving.
if not os.path.exists(filename) or not os.path.isfile(filename):
eprint("Error: scenario couldn't be saved due to unknown reasons.")
def check_for_carried_tile_or_particle(self):
if self.carried_tile is not None:
self.carried_tile.coordinates = self.coordinates
if self.world.vis is not None:
self.world.vis.tile_changed(self.carried_tile)
elif self.carried_particle is not None:
self.carried_particle.coordinates = self.coordinates
if self.world.vis is not None:
self.world.vis.particle_changed(self.carried_particle)
def check_within_border(self, direction, direction_coord):
if self.world.config_data.border == 1:
if self.world.config_data.type == 1:
if abs(direction_coord[0]) > self.world.get_x_size():
direction_coord = (-1 * (self.coordinates[0] - direction[0]), direction_coord[1], direction_coord[2])
if abs(direction_coord[1]) > self.world.get_y_size():
direction_coord = (direction_coord[0], -1* (self.coordinates[1] - direction[1]), direction_coord[2])
if abs(direction_coord[2]) > self.world.get_z_size():
direction_coord = (direction_coord[0], direction_coord[1], -1* (self.coordinates[2] - direction[2]))
else:
eprint("\"scenario\" folder couldn't be created.")
if abs(direction_coord[0]) > self.world.get_x_size():
direction_coord = (self.coordinates[0], direction_coord[1], direction_coord[2])
if abs(direction_coord[1]) > self.world.get_y_size():
direction_coord = (direction_coord[0], self.coordinates[1], direction_coord[2])
if abs(direction_coord[2]) > self.world.get_z_size():
direction_coord = (direction_coord[0], direction_coord[1], self.coordinates[2])
return direction_coord
def csv_aggregator(self):
self.csv_round.aggregate_metrics()
particle_csv = csv_generator.CsvParticleFile(self.config_data.direction_name)
for p in self.particles:
particle_csv.write_particle(p)
particle_csv.csv_file.close()
def read_from_with(self, target, key=None):
"""
Read the memories from the matters (particle, tile, or location object) memories with a given keyword
def set_successful_end(self):
self.csv_round.success()
# self.set_end()
:param target: The matter can be either a particle, tile, or location
:param key: A string keyword to searcg for the data in the memory
:return: The matters memory; None
"""
if key is not None:
tmp_memory = target.read_memory_with(key)
else:
tmp_memory = target.read_whole_memory()
if tmp_memory is not None \
and not (hasattr(tmp_memory, '__len__')) or len(tmp_memory) > 0:
if target.type == "particle":
self.world.csv_round.update_metrics(particle_read=1)
self.csv_particle_writer.write_particle(particle_read=1)
elif target.type == "tile":
self.world.csv_round.update_metrics(tile_read=1)
self.csv_particle_writer.write_particle(tile_read=1)
elif target.type == "location":
self.world.csv_round.update_metrics(location_read=1)
self.csv_particle_writer.write_particle(location_read=1)
return tmp_memory
return None
def matter_in(self, direction):
"""
:param direction: the directionection to check if a matter is there
:return: True: if a matter is there, False: if not
"""
if get_coordinates_in_direction(self.coordinates, direction) in self.world.get_tile_map_coordinates() \
or get_coordinates_in_direction(self.coordinates, direction) \
in self.world.get_particle_map_coordinates() \
or get_coordinates_in_direction(self.coordinates, direction) \
in self.world.get_location_map_coordinates():
return True
else:
return False
def get_max_round(self):
def tile_in(self, direction):
"""
:param direction: the direction to check if a tile is there
:return: True: if a tile is there, False: if not
"""
The max round number
if get_coordinates_in_direction(self.coordinates, direction) in self.world.get_tile_map_coordinates():
return True
else:
return False
:return: maximum round number
def particle_in(self, direction):
"""
:param direction: the direction to check if a particle is there
:return: True: if a particle is there, False: if not
"""
return self.config_data.max_round
if get_coordinates_in_direction(self.coordinates, direction) in self.world.get_particle_map_coordinates():
return True
else:
return False
def get_actual_round(self):
def location_in(self, direction):
"""
:param direction: the direction to check if a location is there
:return: True: if a location is there, False: if not
"""
The actual round number
if get_coordinates_in_direction(self.coordinates, direction) in self.world.get_location_map_coordinates():
return True
else:
return False
def get_matter_in(self, direction):
if get_coordinates_in_direction(self.coordinates, direction) in self.world.get_tile_map_coordinates():
return self.world.get_tile_map_coordinates()[get_coordinates_in_direction(self.coordinates, direction)]
elif get_coordinates_in_direction(self.coordinates, direction) in self.world.get_particle_map_coordinates():
return self.world.get_particle_map_coordinates()[get_coordinates_in_direction(self.coordinates, direction)]
elif get_coordinates_in_direction(self.coordinates, direction) in self.world.get_location_map_coordinates():
return self.world.get_location_map_coordinates()[get_coordinates_in_direction(self.coordinates, direction)]
else:
return False
def get_tile_in(self, direction):
if get_coordinates_in_direction(self.coordinates, direction) in self.world.get_tile_map_coordinates():
return self.world.get_tile_map_coordinates()[get_coordinates_in_direction(self.coordinates, direction)]
else:
return False
def get_particle_in(self, direction):
if get_coordinates_in_direction(self.coordinates, direction) in self.world.get_particle_map_coordinates():
return self.world.get_particle_map_coordinates()[get_coordinates_in_direction(self.coordinates, direction)]
else:
return False
:return: actual round number
def get_location_in(self, direction):
if get_coordinates_in_direction(self.coordinates, direction) in self.world.get_location_map_coordinates():
return self.world.get_location_map_coordinates()[get_coordinates_in_direction(self.coordinates, direction)]
else:
return False
def get_location(self):
if self.coordinates in self.world.location_map_coordinates:
return self.world.get_location_map_coordinates()[self.coordinates]
else:
return False
def get_tile(self):
if self.coordinates in self.world.get_tile_map_coordinates():
return self.world.get_tile_map_coordinates()[self.coordinates]
else:
return False
def write_to_with(self, target, key=None, data=None):
"""
return self.__round_counter
Writes data with given a keyword direction on the matters (particle, tile, or location object) memory
def set_unsuccessful_end(self):
:param target: The matter can be either a particle, tile, or location
:param key: A string keyword so to order the data that is written into the memory
:param data: The data that should be stored into the memory
:return: True: Successful written into the memory; False: Unsuccessful
"""
Allows to terminate before the max round is reached
if data is not None:
if key is None:
wrote = target.write_memory(data)
else:
wrote = target.write_memory_with(key, data)
if wrote:
if target.type == "particle":
self.world.csv_round.update_metrics(particle_write=1)
self.csv_particle_writer.write_particle(particle_write=1)
elif target.type == "tile":
self.world.csv_round.update_metrics(tile_write=1)
self.csv_particle_writer.write_particle(tile_write=1)
elif target.type == "location":
self.world.csv_round.update_metrics(location_write=1)
self.csv_particle_writer.write_particle(location_write=1)
return True
else:
return False
else:
return False
def scan_for_matters_within(self, matter_type='all', hop=1):
"""
self.__end = True
Scans for particles, tiles, or locations on a given hop distance and all the matters within the hop distance
def get_end(self):
:todo: If nothing then everything should be scanned
:param matter_type: For what matter this method should scan for.
Can be either particles, tiles, locations, or (default) all
:param hop: The hop distance from the actual position of the scanning particle
:return: A list of the founded matters
"""
Returns the end parameter values either True or False
within_hop_list = []
for i in range(hop + 1):
in_list = self.scan_for_matters_in(matter_type, i)
if in_list is not None:
within_hop_list.extend(in_list)
if len(within_hop_list) != 0:
return within_hop_list
else:
return None
def scan_for_matters_in(self, matter_type='all', hop=1):
"""
return self.__end
Scanning for particles, tiles, or locations on a given hop distance
def inc_round_counter_by(self, number=1):
:param matter_type: For what matter this method should scan for.
Can be either particles, tiles, locations, or (default) all
:param hop: The hop distance from thee actual position of the scanning particle
:return: A list of the founded matters
"""
Increases the the round counter by
:return:
logging.info("particle on %s is scanning for %s in %i hops", str(self.coordinates), matter_type, hop)
if matter_type == "particles":
scanned_list = scan_in(self.world.particle_map_coordinates, self.coordinates, hop, self.world.grid)
elif matter_type == "tiles":
scanned_list = scan_in(self.world.tile_map_coordinates, self.coordinates, hop, self.world.grid)
elif matter_type == "locations":
scanned_list = scan_in(self.world.location_map_coordinates, self.coordinates, hop, self.world.grid)
else:
scanned_list = []
scanned_list.extend(scan_in(self.world.particle_map_coordinates, self.coordinates, hop, self.world.grid))
scanned_list.extend(scan_in(self.world.tile_map_coordinates, self.coordinates, hop, self.world.grid))
scanned_list.extend(scan_in(self.world.location_map_coordinates, self.coordinates, hop, self.world.grid))
return scanned_list
def scan_for_particles_within(self, hop=1):
"""
self.__round_counter += number
Scans for particles on a given hop distance and all the matters within the hop distance
:todo: If nothing then everything should be scanned
def get_solution(self):
:param hop: The hop distance from the actual position of the scanning particle
:return: A list of the founded matters
"""
actual solution name
return scan_within(self.world.particle_map_coordinates, self.coordinates, hop, self.world.grid)
:return: actual solution name
def scan_for_particles_in(self, hop=1):
"""
return self.config_data.solution
Scanning for particles on a given hop distance
def get_amount_of_particles(self):
:param hop: The hop distance from thee actual position of the scanning particle
:return: A list of the founded matters
"""
Returns the actual number of particles in the world
:return: The actual number of Particles
return scan_in(self.world.particle_map_coordinates, self.coordinates, hop, self.world.grid)
def scan_for_tiles_within(self, hop=1):
"""
return len(self.particles)
Scans for tiles on a given hop distance and all the matters within the hop distance
:todo: If nothing then everything should be scanned
def get_particle_list(self):
:param hop: The hop distance from the actual position of the scanning particle
:return: A list of the founded matters
"""
Returns the actual number of particles in the world
:return: The actual number of Particles
return scan_within(self.world.tile_map_coordinates, self.coordinates, hop, self.world.grid)
def scan_for_tiles_in(self, hop=1):
"""
return self.particles
Scanning for tiles on a given hop distance
def get_particle_map_coordinates(self):
:param hop: The hop distance from thee actual position of the scanning particle
:return: A list of the founded matters
"""
Get a dictionary with all particles mapped with their actual coordinates
return scan_in(self.world.tile_map_coordinates, self.coordinates, hop, self.world.grid)
:return: a dictionary with particles and their coordinates
def scan_for_locations_within(self, hop=1):
"""
return self.particle_map_coordinates
Scans for particles, tiles, or location on a given hop distance and all the matters within the hop distance
:todo: If nothing then everything should be scanned
def get_particle_map_id(self):
:param hop: The hop distance from the actual position of the scanning particle
:return: A list of the founded matters
"""
Get a dictionary with all particles mapped with their own ids
:return: a dictionary with particles and their own ids
return scan_within(self.world.location_map_coordinates, self.coordinates, hop, self.world.grid)
def scan_for_locations_in(self, hop=1):
"""
return self.particle_map_id
Scanning for particles, tiles, or location on a given hop distance
def get_amount_of_tiles(self):
:param hop: The hop distance from thee actual position of the scanning particle
:return: A list of the founded matters
"""
Returns the actual number of particles in the world
return scan_in(self.world.location_map_coordinates, self.coordinates, hop, self.world.grid)
:return: The actual number of Particles
def take_me(self, coordinates):
"""
return len(self.tiles)
The particle is getting taken from the the other particle on the given coordinate
def get_tiles_list(self):
:param coordinates, the coordinates of the particle which takes this particle
:return: True: Successful taken; False: Cannot be taken or wrong Coordinates
"""
Returns the actual number of tiles in the world
:return: a list of all the tiles in the world
if not self.__isCarried:
if self.coordinates in self.world.particle_map_coordinates:
del self.world.particle_map_coordinates[self.coordinates]
self.__isCarried = True
self.coordinates = coordinates
if self.world.vis is not None:
self.world.vis.particle_changed(self)
return True
else:
return False
def drop_me(self, coordinates):