Source code for acopy.solvers

# -*- coding: utf-8 -*-
import sys
import functools
import collections

from . import utils

[docs]@functools.total_ordering class Solution: """Tour for a graph. :param graph: a graph :type graph: :class:`networkx.Graph` :param start: starting node :param ant: ant responsible :type ant: :class:`~acopy.ant.Ant` """ def __init__(self, graph, start, ant=None): self.graph = graph self.start = start self.ant = ant self.current = start self.cost = 0 self.path = [] self.nodes = [start] self.visited = set(self.nodes) def __iter__(self): return iter(self.path) def __eq__(self, other): return self.cost == other.cost def __lt__(self, other): return self.cost < other.cost def __contains__(self, node): return node in self.visited or node == self.current def __repr__(self): easy_id = self.get_easy_id(sep=',', monospace=False) return '{}\t{}'.format(self.cost, easy_id) def __hash__(self): return hash(self.get_id())
[docs] def get_easy_id(self, sep=' ', monospace=True): nodes = [str(n) for n in self.get_id()] if monospace: size = max([len(n) for n in nodes]) nodes = [n.rjust(size) for n in nodes]
return sep.join(nodes)
[docs] def get_id(self): """Return the ID of the solution. The default implementation is just each of the nodes in visited order. :return: solution ID :rtype: tuple """ first = min(self.nodes) index = self.nodes.index(first)
return tuple(self.nodes[index:] + self.nodes[:index])
[docs] def add_node(self, node): """Record a node as visited. :param node: the node visited """ self.nodes.append(node) self.visited.add(node)
[docs] def close(self): """Close the tour so that the first and last nodes are the same."""
self._add_node(self.start) def _add_node(self, node): edge = self.current, node data = self.graph.edges[edge] self.path.append(edge) self.cost += data['weight'] self.current = node
[docs] def trace(self, q, rho=0): """Deposit pheromone on the edges. Note that by default no pheromone evaporates. :param float q: the amount of pheromone :param float rho: the percentage of pheromone to evaporate """ amount = q / self.cost for edge in self.path: self.graph.edges[edge]['pheromone'] += amount self.graph.edges[edge]['pheromone'] *= 1 - rho if not self.graph.edges[edge]['pheromone']:
self.graph.edges[edge]['pheromone'] = sys.float_info.min
[docs]class State: """Solver state. This class tracks the state of a solution in progress and is passed to each plugin hook. Specially it contains: ===================== ====================================== Attribute Description ===================== ====================================== ``graph`` graph being solved ``colony`` colony that generated the ants ``ants`` ants being used to solve the graph ``limit`` maximum number of iterations ``gen_size`` number of ants being used ``solutions`` solutions found this iteration ``best`` best solution found this iteration ``is_new_record`` whether the best is a new record ``record`` best solution found so far ``previous_record`` previously best solution ===================== ====================================== :param graph: a graph :type graph: :class:`networkx.Graph` :param list ants: the ants being used :param int limit: maximum number of iterations :param int gen_size: number of ants to use :param colony: source colony for the ants :type colony: :class:`~acopy.ant.Colony` """ def __init__(self, graph, ants, limit, gen_size, colony): self.graph = graph self.ants = ants self.limit = limit self.gen_size = gen_size self.colony = colony = None self.record = None self.previous_record = None self.is_new_record = False self._best = None @property def best(self): return self._best @best.setter def best(self, best): self.is_new_record = self.record is None or best < self.record if self.is_new_record: self.previous_record = self.record self.record = best
self._best = best
[docs]class Solver: """ACO solver. Solvers control the parameters related to pheromone deposit and evaporation. If top is not specified, it defaults to the number of ants used to solve a graph. :param float rho: percentage of pheromone that evaporates each iteration :param float q: amount of pheromone each ant can deposit :param int top: number of ants that deposit pheromone :param list plugins: zero or more solver plugins """ def __init__(self, rho=.03, q=1, top=None, plugins=None): self.rho = rho self.q = q = top self.plugins = collections.OrderedDict() if plugins: self.add_plugins(*plugins) def __repr__(self): return (f'{self.__class__.__name__}(rho={self.rho}, q={self.q}, ' f'top={})')
[docs] def solve(self, *args, **kwargs): """Find and return the best solution. Accepts exactly the same parameters as the :func:`~optimize` method. :return: best solution found :rtype: :class:`~Solution` """ best = None for solution in self.optimize(*args, **kwargs): best = solution
return best
[docs] def optimize(self, graph, colony, gen_size=None, limit=None): """Find and return increasingly better solutions. :param graph: graph to solve :type graph: :class:`networkx.Graph` :param colony: colony from which to source each :class:`~acopy.ant.Ant` :type colony: :class:`~acopy.ant.Colony` :param int gen_size: number of :class:`~acopy.ant.Ant` s to use (default is one per graph node) :param int limit: maximum number of iterations to perform (default is unlimited so it will run forever) :return: better solutions as they are found :rtype: iter """ # initialize the colony of ants and the graph gen_size = gen_size or len(graph.nodes) ants = colony.get_ants(gen_size) for u, v in graph.edges: graph.edges[u, v].setdefault('pheromone', 0) state = State(graph=graph, ants=ants, limit=limit, gen_size=gen_size, colony=colony) # call start hook for all plugins self._call_plugins('start', state=state) # find solutions and update the graph pheromone accordingly for __ in utils.looper(limit): solutions = self.find_solutions(state.graph, state.ants) # we want to ensure the ants are sorted with the solutions, but # since ants aren't directly comparable, so we interject a list of # unique numbers that satifies any two solutions that are equal data = list(zip(solutions, range(len(state.ants)), state.ants)) data.sort() solutions, __, ants = zip(*data) = solutions state.ants = ants self.global_update(state) # yield increasingly better solutions =[0] if state.is_new_record: yield state.record # call iteration hook for all plugins if self._call_plugins('iteration', state=state): break # call finish hook for all plugins
self._call_plugins('finish', state=state)
[docs] def find_solutions(self, graph, ants): """Return the solutions found for the given ants. :param graph: a graph :type graph: :class:`networkx.Graph` :param list ants: the ants to use :return: one solution per ant :rtype: list """
return [ant.tour(graph) for ant in ants]
[docs] def global_update(self, state): """Perform a global pheromone update. :param state: solver state :type state: :class:`~State` """ for edge in state.graph.edges: amount = 0 if solutions =[] else: solutions = for solution in solutions: if edge in solution.path: amount += self.q / solution.cost p = state.graph.edges[edge]['pheromone']
state.graph.edges[edge]['pheromone'] = (1 - self.rho) * p + amount
[docs] def add_plugin(self, plugin): """Add a single solver plugin. If plugins have the same name, only the last one added is kept. :param plugin: the plugin to add :type plugin: :class:`acopy.plugins.SolverPlugin` """
[docs] def add_plugins(self, *plugins): """Add one or more solver plugins.""" for plugin in plugins: plugin.initialize(self)
self.plugins[plugin.__class__.__qualname__] = plugin
[docs] def get_plugins(self): """Return the added plugins. :return: plugins previously added :rtype: list """
return self.plugins.values() def _call_plugins(self, hook, **kwargs): should_stop = False for plugin in self.get_plugins(): try: plugin(hook, **kwargs) except StopIteration: should_stop = True
return should_stop
[docs]class SolverPlugin: """Solver plugin. Solver plugins can be added to any solver to customize its behavior. Plugins are initialized once when added, once before the first solver iteration, once after each solver iteration has completed, and once after all iterations have completed. Implementing each hook is optional. """ #: unique name name = 'plugin' def __init__(self, **kwargs): self._params = kwargs def __repr__(self): params = ', '.join(f'{k}={v}'for k, v in self._params.items()) return f'<{self.__class__.__qualname__}({params})>' def __call__(self, hook, **kwargs): return getattr(self, f'on_{hook}')(**kwargs)
[docs] def initialize(self, solver): """Perform actions when being added to a solver. Though technically not required, this method should be probably be idempotent since the same plugin could be added to the same solver multiple times (perhaps even by mistake). :param solver: the solver to which the plugin is being added :type solver: :class:`acopy.solvers.Solver` """
self.solver = solver
[docs] def on_start(self, state): """Perform actions before the first iteration. :param state: solver state :type state: :class:`acopy.solvers.State` """
[docs] def on_iteration(self, state): """Perform actions after each iteration. :param state: solver state :type state: :class:`acopy.solvers.State` """
[docs] def on_finish(self, state): """Perform actions once all iterations have completed. :param state: solver state :type state: :class:`acopy.solvers.State` """