Solving Political Boundaries Through Simulation
Presentation
Adapted as a talk for the Boulder Python Meetup. Download archive here.
Press the space-bar to proceed to the next slide. See here for a brief tutorial
Expand Code
#!/usr/bin/env python3.6 | |
""" | |
Simulated Annealing and Genetic Algorithm solutions to districting problem. | |
Notes on implementation: | |
* I like properties and I use them in a couple spots... | |
* Use `pip install --user -r requirements.txt` on the requirements file | |
available in the root of this git repository. | |
* If by some strange happenstance you only have this file, go to the | |
following url to get the entire repo. | |
https://gitlab.com/thedataleek/politicalboundaries | |
* Test coverage is around 80% which I'm happy with. All the super important | |
things are tested. | |
TODO: Multithread | |
""" | |
# stdlib imports | |
import sys # exits and calls | |
import os # path manipulation | |
import argparse # argument handling | |
import math # exponent - faster than numpy for this | |
import random # built-in random func | |
import re # dynamically pull out algo names | |
import queue # used for SCC labelling | |
# typing | |
from typing import Union | |
# third-party imports | |
import numpy as np # heavy lifting | |
import matplotlib # visualization | |
matplotlib.use('agg') # switch backends for compatibility | |
import matplotlib.pyplot as plt # visualization | |
import matplotlib.animation as animation # animation | |
from moviepy import editor # More gif | |
from tqdm import tqdm # Progress bars are nice | |
from halo import Halo # Spinner | |
FIGSIZE = (4, 4) # For asset exporting | |
OUTDIR = './img' | |
INITIAL_COLORMAP = plt.get_cmap('bwr') | |
FILL_COLORMAP = plt.get_cmap('nipy_spectral') | |
DISTRICT_COLORMAP = plt.get_cmap('tab10') | |
STICKY_NUM = 100 | |
TARGET_VALUE = 35 | |
def main(): | |
args = get_args() | |
system = System(args.filename) | |
if args.numdistricts is None: | |
args.numdistricts = len(system.matrix[0]) | |
if args.full: | |
generate_report_assets(system, args.numdistricts, args.precision, True) | |
simulated_annealing(system, args.numdistricts, args.precision, True, True) | |
genetic_algorithm(system, args.numdistricts, args.precision, True, True) | |
elif args.report: | |
generate_report_assets(system, args.numdistricts, args.precision, args.gif) | |
elif args.annealing: | |
simulated_annealing(system, args.numdistricts, args.precision, | |
args.animate, args.gif) | |
elif args.genetic: | |
genetic_algorithm(system, args.numdistricts, args.precision, | |
args.animate, args.gif) | |
else: | |
print('Running in Demo Mode!!!') | |
print('First we\'ll use Simulated Annealing') | |
simulated_annealing(system, args.numdistricts, args.precision, | |
False, False) | |
print('Now we\'ll try the Genetic Algorithm') | |
genetic_algorithm(system, args.numdistricts, args.precision, | |
False, False) | |
def simulated_annealing(system, numdistricts, precision, animate, makegif): | |
""" | |
Perform simulated annealing on our system with a series of progressively | |
improving solutions. | |
""" | |
solution = get_good_start(system, numdistricts) | |
history = [solution] # Keep track of our history | |
k = 0.1 # Larger k => more chance of randomly accepting | |
Tvals = np.linspace(1, 1e-15, precision, | |
dtype=np.float128) | |
cval = solution.value | |
iterations_since_increase = 0 | |
print(f'Running Simulated Annealing with k={k:0.03f}, alpha={1.0 / precision:0.05f}') | |
print(f'num_iterations={len(Tvals)}') | |
for i, T in tqdm(enumerate(Tvals), total=len(Tvals)): | |
new_solution = solution.copy() # copy our current solution | |
new_solution.mutate() # Mutate the copy | |
dv = new_solution.value - cval # Look at delta of values | |
# If it's better, or random chance, we accept it | |
if dv > 0 or random.random() < math.exp(dv / (k * T)): | |
solution = new_solution | |
cval = solution.value | |
history.append(new_solution) | |
if dv > 0: | |
iterations_since_increase = 0 | |
else: | |
iterations_since_increase += 1 | |
if ((iterations_since_increase > STICKY_NUM) and | |
(cval >= TARGET_VALUE)): | |
print('Hit a ceiling, aborting algorithm.') | |
break | |
solution.count = len(Tvals) | |
solution.algo = 'Simulated Annealing' | |
print(solution) | |
print(solution.summary()) | |
plt.figure(figsize=FIGSIZE) | |
plt.plot(np.arange(len(history)), | |
[s.value for s in history]) | |
plt.title('Simulated Annealing Convergence') | |
plt.xlabel('Iteration Count') | |
plt.ylabel('Value') | |
plt.savefig(os.path.join(OUTDIR, 'simulated_annealing_values.png')) | |
if animate: | |
animate_history(system.filename, system.matrix, | |
history, solution.numdistricts, | |
makegif) | |
def get_good_start(system, numdistricts): | |
""" | |
Basically, instead of starting with a really bad initial solution for | |
simulated annealing sometimes we can rig it to start with a decent one... | |
""" | |
print('Acquiring a good initial solution') | |
solution = Solution(system, numdistricts) | |
solution.generate_random_solution() # start with random solution | |
for i in tqdm(range(500)): | |
new_solution = Solution(system, numdistricts) | |
new_solution.generate_random_solution() | |
if new_solution.value > solution.value: | |
solution = new_solution | |
print(f'Starting with Solution[{solution.value}]') | |
return solution | |
def genetic_algorithm(system, numdistricts, precision, animate, makegif): | |
""" | |
Use a genetic algorithm to find a good solution to our district problem | |
""" | |
def get_top_3(solutions): | |
solutions.sort(key=lambda s: -s.value) | |
return solutions[:3] | |
# Start with random initial solution space (3) | |
solutions = [Solution(system, numdistricts) for _ in range(100)] | |
for s in solutions: | |
s.generate_random_solution() # Initialize our solutions | |
solutions = get_top_3(solutions) | |
top_history = [] # Keep history of our top solution from each "frame" | |
iterations_since_increase = 0 | |
value_history = [] | |
iteration_num = 0 | |
with Halo(text='Running Algorithm', spinner='dots') as spinner: | |
while True: | |
new_solutions = [] | |
for _ in range(10): # Create 10 children per frame | |
s1, s2 = np.random.choice(solutions, size=2) | |
# Randomly combine two parents | |
combined = s1.combine(s2) | |
# Mutate as well | |
combined.mutate() | |
new_solutions.append(combined) | |
# Combine everything, giving 13 total solutions | |
full_solutions = new_solutions + solutions | |
# Keep the top 3 for next generation | |
solutions = sorted([(s, s.value) for s in full_solutions], | |
key=lambda tup: -tup[1]) | |
value_history += [(iteration_num, s[1]) for s in solutions] | |
solutions = [_[0] for _ in solutions[:3]] | |
# Only record top from generation, and only if it's changed | |
if len(top_history) == 0 or solutions[0] != top_history[-1]: | |
top_history.append(solutions[0]) | |
spinner.text = f'Current Generation Top Solution: {str(solutions[0].value)}' | |
iterations_since_increase = 0 | |
else: | |
iterations_since_increase += 1 | |
if ((iterations_since_increase > STICKY_NUM) and | |
(top_history[-1].value >= TARGET_VALUE)): | |
print('Hit a ceiling, aborting algorithm.') | |
break | |
iteration_num += 1 | |
solution = top_history[-1] | |
solution.count = precision | |
solution.algo = 'Genetic Algorithm' | |
print(solution) | |
print(solution.summary()) | |
value_history = np.array(value_history) | |
plt.figure(figsize=FIGSIZE) | |
plt.scatter(value_history[:, 0], value_history[:, 1], alpha=0.2, s=10) | |
plt.title('Genetic Algorithm Convergence') | |
plt.xlabel('Iteration Count') | |
plt.ylabel('Value') | |
plt.savefig(os.path.join(OUTDIR, 'genetic_algorithm_values.png')) | |
if animate: | |
animate_history(system.filename, system.matrix, | |
top_history, solution.numdistricts, | |
makegif) | |
def generate_report_assets(system, numdistricts, precision, makegif): | |
""" | |
Responsible for generating all plots and animations specific to the writeup. | |
In order this includes the following. | |
1. Basic initial voting areas | |
2. Random solution progression | |
3. Mutation demonstration | |
4. Genetic algorithm combination demonstration | |
""" | |
# First just plot initial map | |
plt.figure(figsize=FIGSIZE) | |
plt.imshow(system.matrix, interpolation='nearest', | |
cmap=INITIAL_COLORMAP) | |
plt.axis('off') | |
plt.title(system.filename) | |
plt.savefig(os.path.join(OUTDIR, system.filename.split('.')[0] + '_initial.png')) | |
# Now generate random solution | |
solution = Solution(system, numdistricts) | |
solution_history = solution.generate_random_solution(history=True) | |
animate_history(system.filename, system.matrix, | |
solution_history, solution.numdistricts, makegif, | |
algo_name='generate_random', | |
cmap=FILL_COLORMAP) | |
# Now show mutation | |
backup = solution.copy() | |
fig, axarr = plt.subplots(1, 3, figsize=FIGSIZE) | |
axarr[0].imshow(solution.full_mask, interpolation='nearest', | |
cmap=DISTRICT_COLORMAP) | |
axarr[0].axis('off') | |
axarr[0].set_title('Initial') | |
solution.mutate() | |
axarr[1].imshow(solution.full_mask, interpolation='nearest', | |
cmap=DISTRICT_COLORMAP) | |
axarr[1].axis('off') | |
axarr[1].set_title('Mutant') | |
axarr[2].imshow(np.abs(backup.full_mask - solution.full_mask), | |
interpolation='nearest', | |
cmap=FILL_COLORMAP) | |
axarr[2].axis('off') | |
axarr[2].set_title('Difference') | |
plt.savefig(os.path.join(OUTDIR, 'mutation.png')) | |
# Now show combination | |
solution.full_mask[:] = 0 | |
solution.generate_random_solution() | |
fig, axarr = plt.subplots(2, 2, figsize=FIGSIZE) | |
axarr[0, 0].imshow(backup.full_mask, interpolation='nearest', | |
cmap=FILL_COLORMAP, | |
vmin=0, | |
vmax=solution.numdistricts) | |
axarr[0, 0].axis('off') | |
axarr[0, 0].set_title('Parent 1') | |
axarr[0, 1].imshow(solution.full_mask, interpolation='nearest', | |
cmap=FILL_COLORMAP, | |
vmin=0, | |
vmax=solution.numdistricts) | |
axarr[0, 1].axis('off') | |
axarr[0, 1].set_title('Parent 2') | |
child, history = backup.combine(solution, keep_history=True) | |
axarr[1, 1].imshow(child.full_mask, interpolation='nearest', | |
cmap=FILL_COLORMAP, | |
vmin=0, | |
vmax=solution.numdistricts) | |
axarr[1, 1].axis('off') | |
axarr[1, 1].set_title('Child') | |
sol = axarr[1, 0].imshow(history[0].full_mask, interpolation='nearest', | |
cmap=FILL_COLORMAP, | |
vmin=0, | |
vmax=child.numdistricts) | |
axarr[1, 0].axis('off') | |
axarr[1, 0].set_title('Step by Step') | |
def update(i): | |
sol.set_data(history[i].full_mask) | |
return sol, | |
ani = animation.FuncAnimation(fig, update, len(history), | |
interval=500, blit=True) | |
filename = 'combine' | |
ani.save(os.path.join(OUTDIR, filename + '.mp4')) | |
editor.VideoFileClip(os.path.join(OUTDIR, filename + '.mp4'))\ | |
.write_gif(os.path.join(OUTDIR, filename + '.gif')) | |
# Now show the difference in k for simulated annealing | |
plt.figure(figsize=(6, 6)) | |
Tvals = np.arange(1, 1e-12, -1.0 / precision) | |
dv = -1 | |
determine_k = lambda T, k: np.exp(dv / (k * T)) | |
for k in np.linspace(0.01, 1, 100): | |
plt.plot(Tvals[::-1], determine_k(Tvals, k)) | |
plt.xlabel('Algorithm Iteration') | |
plt.ylabel('Chance of Accepting') | |
plt.title(r'Effect of Differing $k$') | |
plt.savefig(os.path.join(OUTDIR, 'kvals.png')) | |
def animate_history(filename, systemdata, history, numdistricts, makegif, algo_name=None, cmap=None): | |
""" | |
Take our given solution history, and animate it using matplotlib.animate. | |
Save to gif if asked. | |
""" | |
print('Saving Animation') | |
fig, axarr = plt.subplots(1, 2, figsize=FIGSIZE) | |
# Plot our "field" | |
systemplot = axarr[0].imshow(systemdata, interpolation='nearest', | |
cmap=INITIAL_COLORMAP) | |
axarr[0].axis('off') | |
# Plot our first solution | |
sol = axarr[1].imshow(history[0].full_mask, interpolation='nearest', | |
cmap=cmap or DISTRICT_COLORMAP, | |
vmin=0, | |
vmax=numdistricts) | |
axarr[1].set_title(f'value {history[0].value:0.03f}') | |
axarr[1].axis('off') | |
def update_plot(i, N): | |
"""Animation loop""" | |
sol.set_data(history[i].full_mask) | |
axarr[1].set_title(f'value {history[i].value:0.03f}') | |
plt.suptitle(f'Solution {i}') | |
return sol, | |
interval = 100 # milliseconds | |
ani = animation.FuncAnimation( | |
fig, | |
update_plot, | |
len(history), | |
fargs=(len(history) - 1,), | |
interval=interval, | |
blit=True | |
) | |
if not algo_name: | |
algo_name = re.sub(' ', '_', history[-1].algo.lower()) | |
filename = f'{algo_name}_solution_{filename.split(".")[0]}' | |
ani.save(os.path.join(OUTDIR, filename + '.mp4')) | |
if makegif: | |
editor.VideoFileClip(os.path.join(OUTDIR, filename + '.mp4'))\ | |
.write_gif(os.path.join(OUTDIR, filename + '.gif')) | |
# Save final solution as separate image | |
if history[-1].algo is not None: | |
plt.figure(figsize=FIGSIZE) | |
plt.imshow(history[-1].full_mask, interpolation='nearest', | |
cmap=DISTRICT_COLORMAP, | |
vmin=0, | |
vmax=numdistricts) | |
plt.title(history[-1].algo + ' Final Solution') | |
plt.axis('off') | |
plt.savefig(os.path.join(OUTDIR, filename + '.png')) | |
class Solution(object): | |
"""This is our unique solution class""" | |
def __init__(self, system, numdistricts): | |
self.system = system | |
self.numdistricts = numdistricts | |
if numdistricts is None: # If user doesn't specify | |
self.numdistricts = system.width | |
# Our solution is simply a numpy array | |
self.full_mask = np.zeros((system.height, system.width)) | |
self.algo = None | |
self.count = 0 | |
def __getitem__(self, key): | |
"""Allows us to easily index each district and get a Mask back""" | |
if key < 1 or key > self.numdistricts: | |
raise KeyError('District does not exist!') | |
else: | |
new_mask = Mask() # initialize new empty mask | |
# Set mask from district | |
new_mask.parse_list(self.get_solution(key)) | |
return new_mask | |
def __str__(self): | |
"""String version is just the string version of numpy array""" | |
return str(self.full_mask) | |
def __eq__(self, other): | |
return (self.full_mask == other.full_mask).all() | |
def __ne__(self, other): | |
return not (self == other) | |
def summary(self): | |
"""This is literally only here for the grading...""" | |
sep = (40 * '-') + '\n' | |
summary_string = '' | |
summary_string += sep | |
summary_string += f'Score: {self.value}\n' | |
summary_string += sep | |
total_size, percents = self.system.stats | |
summary_string += f'Total Population Size: {total_size}\n' | |
summary_string += sep | |
summary_string += 'Party Division in Population\n' | |
for k, v in percents.items(): | |
summary_string += f'{k}: {v:05f}\n' | |
summary_string += sep | |
majorities = {k:0 for k in self.system.names.keys()} | |
locations = [] | |
for i in range(1, self.numdistricts + 1): | |
majorities[self.system._name_arr[self.majority(i)]] += 1 | |
locations.append(self[i].location) | |
summary_string += 'Number of Districts with Majority by Party\n' | |
for k, v in majorities.items(): | |
summary_string += f'{k}: {v}\n' | |
summary_string += sep | |
summary_string += 'District Locations (zero-indexed, [y, x])\n' | |
for i, loc in enumerate(locations): | |
loc_string = ','.join(str(tup) for tup in loc) | |
summary_string += f'District {i + 1}:{loc_string}\n' | |
summary_string += sep | |
summary_string += f'Algorithm: {self.algo}\n' | |
summary_string += sep | |
summary_string += f'Valid Solution States Explored: {self.count}\n' | |
summary_string += sep | |
return summary_string[:-1] | |
def majority(self, i): | |
""" | |
Tell us who has majority in the specified district | |
""" | |
district = self.system.matrix[self[i].mask.astype(bool)] | |
if district.sum() > (len(district) / 2.0): | |
return 1 | |
else: | |
return 0 | |
def copy(self): | |
""" | |
So... Numpy uses memory instances of arrays, meaning you need to tell it | |
to actually copy the damn thing otherwise messing with the first will | |
mess with all of its successors | |
This was a bad bug... | |
""" | |
new_sol = Solution(self.system, self.numdistricts) | |
new_sol.full_mask = np.copy(self.full_mask) | |
return new_sol | |
def show(self, save=False, name='out.png'): | |
"""Debug function for individual plotting. Deprecated.""" | |
fig, axarr = plt.subplots(1, 2, figsize=FIGSIZE) | |
axarr[0].imshow(self.system.matrix, interpolation='nearest') | |
axarr[1].imshow(self.full_mask, interpolation='nearest') | |
axarr[1].set_title(f'Value: {self.value}') | |
axarr[0].axis('off') | |
axarr[1].axis('off') | |
if save: | |
plt.savefig(os.path.join(OUTDIR, name)) | |
else: | |
plt.show() | |
@property | |
def is_valid(self): | |
""" | |
A valid solution is one that covers everything. So we do two things | |
here, first of which is to make sure that no element in the mask is | |
zero, and second check that each district is valid. | |
""" | |
if (self.full_mask == 0).any(): | |
return False | |
for i in range(1, self.numdistricts + 1): | |
if not self[i].is_valid: | |
return False | |
return True | |
@property | |
def majorities(self): | |
""" | |
Tell us the number of districts with majority in each party | |
""" | |
majorities = {k:0 for k in self.system.names.keys()} | |
for i in range(1, self.numdistricts + 1): | |
majorities[self.system._name_arr[self.majority(i)]] += 1 | |
return majorities | |
@property | |
def value(self): | |
""" | |
This is our fitness function. | |
Here's what we're doing here | |
1. Make sure we have valid solution | |
2. Make sure that the population distribution matches the district | |
distribution within 10% | |
3. The value of a solution is just the sum of our district solutions | |
4. Each district has value equal to the absolute value difference | |
between party population sizes. For instance, a district with [R, D, D] | |
has value 2. | |
5. We also look for the optimal district size which is just | |
(width*height/numdistricts), and subtract 1 for every point off we are | |
from "optimal" | |
6. Lastly we say that independent voters are a fixed effect in "rogue" | |
districts, so every district with a rogue voter counts as 0.1 towards | |
the total. This can be seen as just the sum for the following district | |
[R, D, D] | |
sum([-0.9, 1, 1]) = 2.1 | |
""" | |
value = 0 | |
if not self.is_valid: # if we don't have a valid solution, return 0 | |
return 0 | |
# Make sure the number of districts tries to match population | |
# distribution within 10% | |
size, stats = self.system.stats | |
for k, v in self.majorities.items(): | |
if np.abs((float(v) / self.numdistricts) - stats[k]) >= 0.1: | |
return 0 | |
district_size = int(self.width * self.height / self.numdistricts) | |
# Sum up values of each district | |
for i in range(1, self.numdistricts + 1): | |
values = self.system.matrix[self[i].mask.astype(bool)] | |
if len(values) == 0: | |
value = 0 | |
return value | |
else: | |
# District value is simply abs(num_red - num_blue) | |
subvalue = np.abs(len(values[values == 0]) - len(values[values == 1])) | |
size_bonus = 0.25 * np.abs(len(values) - district_size) | |
if subvalue < len(values): | |
# For any non-uniform values, add 10% their value to account | |
# for independent voter turnout | |
subvalue += (len(values) - subvalue) * 0.1 | |
value += subvalue | |
value -= size_bonus | |
# Minimize neighbors (same as minimizing edge length) | |
value += -0.1 * len(self.get_district_neighbors(i)) | |
return value | |
def get_solution(self, i): | |
""" | |
Return array just showing district | |
If our full_mask looks like this | |
[[1, 1, 2], | |
[1, 2, 3], | |
[2, 2, 3]] | |
This function returns the following when i=2 | |
[[0, 0, 1], | |
[0, 1, 0], | |
[1, 1, 0]] | |
""" | |
return (self.full_mask == i).astype(int) | |
def get_random_openspot(self, value): | |
""" | |
Return a random location where our full mask is equal to value | |
If our full_mask is | |
[[1, 1, 2], | |
[1, 2, 3], | |
[2, 2, 3]] | |
self.get_random_openspot(1) could return any of | |
[[0, 0], [0, 1], [1, 0]] | |
""" | |
openspots = np.where(self.full_mask == value) | |
if len(openspots[0]) == 1: | |
choice = 0 | |
elif len(openspots[0]) == 0: | |
return None, None # if no spots exist, return None | |
else: | |
choice = np.random.randint(0, len(openspots[0]) - 1) | |
y = openspots[0][choice] | |
x = openspots[1][choice] | |
return y, x | |
def get_full_openspots(self, value): | |
""" | |
Instead of just returning one random openspot, return all of them. | |
If our full_mask is | |
[[1, 1, 2], | |
[1, 2, 3], | |
[2, 2, 3]] | |
self.get_full_openspots(1) will return (not necessarily sorted) | |
[[0, 0], [0, 1], [1, 0]] | |
""" | |
openspots = np.where(self.full_mask == value) | |
spots = [] | |
for i in range(len(openspots[0])): | |
spots.append((openspots[0][i], openspots[1][i])) | |
return spots | |
def get_neighbors(self, y, x): | |
""" | |
Get all neighbors of a point that fall within boundary | |
If our full_mask is | |
[[1, 1, 2], | |
[1, 2, 3], | |
[2, 2, 3]] | |
self.get_neighbors(0, 1) will return (not necessarily sorted) | |
[[0, 0], [1, 0], [1, 1], [1, 2], [0, 2]] | |
""" | |
neighbors = [(y + yi, x + xi) | |
for xi in range(-1, 2) | |
for yi in range(-1, 2) | |
if (0 <= y + yi < self.system.height) and | |
(0 <= x + xi < self.system.width) and | |
not (xi == 0 and yi == 0)] | |
return neighbors | |
def get_district_neighbors(self, i): | |
""" | |
Get all points on the edge of a district | |
If our full_mask is | |
[[1, 1, 2], | |
[1, 2, 3], | |
[2, 2, 3]] | |
self.get_district_neighbors(1) will return (not necessarily sorted) | |
[[2, 0], [2, 1], [1, 1], [1, 2], [0, 2]] | |
""" | |
y, x = self.get_random_openspot(i) | |
q = queue.Queue() | |
q.put((y, x)) | |
edges = [] | |
labels = np.zeros(self.full_mask.shape) | |
labels[y, x] = 1 | |
while not q.empty(): | |
y, x = q.get() | |
if self.full_mask[y, x] == i: | |
for yi, xi in self.get_neighbors(y, x): | |
if labels[yi, xi] == 0: | |
q.put((yi, xi)) | |
labels[yi, xi] = 1 | |
else: | |
edges.append((y, x)) | |
return edges | |
def get_filtered_district_neighbors(self, i, filter_list): | |
""" | |
Simply a handy filter on get_district_neighbors. Only includes values | |
that fall into the filter list | |
If our full_mask is | |
[[1, 1, 2], | |
[1, 2, 3], | |
[2, 2, 3]] | |
self.get_filtered_district_neighbors(1, [2]) will return (not necessarily | |
sorted) | |
[[2, 0], [2, 1], [1, 1], [0, 2]] | |
""" | |
return [(y, x) for y, x in self.get_district_neighbors(i) | |
if self.full_mask[y, x] in filter_list] | |
def fill(self, keep_history=False): | |
districts = list(range(1, self.numdistricts + 1)) | |
history = [] | |
while (self.full_mask == 0).any(): | |
try: | |
i = districts[random.randint(0, len(districts) - 1)] | |
except ValueError: | |
# So here's a neat bug... Sometimes if there's a zero in the | |
# corner, get filtered won't find it. So this code is here to | |
# forcibly fix this problem. | |
for j in range(1, self.numdistricts): | |
if len(self.get_filtered_district_neighbors(j, [0])) != 0: | |
districts = [j] | |
i = j | |
break | |
neighbors = self.get_filtered_district_neighbors(i, [0]) | |
if len(neighbors) == 0: | |
districts.remove(i) | |
else: | |
y, x = neighbors[random.randint(0, len(neighbors) - 1)] | |
self.full_mask[y, x] = i | |
if keep_history: | |
history.append(self.copy()) | |
return history | |
def generate_random_solution(self, history=False): | |
""" | |
Generate a random solution by picking spawn points and filling around | |
them. | |
Solutions are not guaranteed to be equal in size, as if one gets boxed | |
off it will stay small... | |
""" | |
solution_history = [self.copy()] | |
for i in range(1, self.numdistricts + 1): | |
y, x = self.get_random_openspot(0) | |
self.full_mask[y, x] = i | |
if history: | |
solution_history.append(self.copy()) | |
solution_history += self.fill(keep_history=history) | |
if history: | |
return solution_history | |
def mutate(self): | |
""" | |
Pick a random district, find a random neighbor, and if the other | |
district is at least size 2, replace the point with our district | |
""" | |
i = np.random.randint(1, self.numdistricts) | |
y, x = self.get_random_openspot(i) | |
if y is None: | |
raise IndexError('No open spots? Something is real bad') | |
traversed = set() | |
q = queue.Queue() | |
q.put((y, x)) | |
while not q.empty(): | |
y, x = q.get() | |
if (y, x) not in traversed: | |
traversed.add((y, x)) | |
if (self.full_mask[y, x] != i and | |
self[self.full_mask[y, x]].size > 1): | |
old_value = self.full_mask[y, x] | |
self.full_mask[y, x] = i | |
if not self.is_valid: # make sure new mutation is valid | |
# If not, reset and start over | |
self.full_mask[y, x] = old_value | |
else: | |
break | |
for ii, jj in self.get_neighbors(y, x): | |
q.put((ii, jj)) | |
def combine(self, other_solution, keep_history=False): | |
""" | |
Look at both solutions, alternate between them randomly, and try to | |
basically inject one side at a time. Afterwards fill the gaps in with | |
fill() | |
""" | |
new_solution = Solution(self.system, self.numdistricts) | |
# Randomly order parents to choose from | |
pick_order = [self, other_solution] | |
random.shuffle(pick_order) | |
# Randomly order districts to choose from | |
districts = list(range(1, self.numdistricts + 1)) | |
random.shuffle(districts) | |
cursor = 0 # alternates between parents | |
history = [new_solution.copy()] | |
# place districts | |
for i in districts: | |
parent_locations = pick_order[cursor][i].location | |
open_locations = new_solution.get_full_openspots(0) | |
district = Mask() | |
# We make every child valid | |
district.parse_locations(self.height, self.width, | |
[(y, x) for y, x in parent_locations | |
if ((y, x) in open_locations)]) | |
district.make_valid() | |
for y, x in district.location: | |
new_solution.full_mask[y, x] = i | |
cursor ^= 1 | |
if keep_history: | |
history.append(new_solution.copy()) | |
# Fill | |
for i in range(1, self.numdistricts + 1): | |
y, x = new_solution.get_random_openspot(i) | |
if y is None: | |
y, x = new_solution.get_random_openspot(0) | |
new_solution.full_mask[y, x] = i | |
if keep_history: | |
history.append(new_solution.copy()) | |
history += new_solution.fill(keep_history=True) | |
if random.random() < 0.1: | |
new_solution.mutate() | |
history.append(new_solution.copy()) | |
if keep_history: | |
return new_solution, history | |
return new_solution | |
@property | |
def height(self): | |
return self.full_mask.shape[0] | |
@property | |
def width(self): | |
return self.full_mask.shape[1] | |
class System(object): | |
""" | |
Solely for reading in the file and keeping track of where things are | |
""" | |
def __init__(self, filename): | |
self.filename = filename | |
self.matrix = None | |
self.names = dict() | |
self.num_names = 0 | |
self._read_file() | |
def __getitem__(self, key): | |
""" | |
Again, lets us access with self[i], and just return every index where | |
our matrix is equal to 'D' or 'R' | |
""" | |
if key not in list(self.names.keys()): | |
raise KeyError(f'{key} does not exist') | |
raw_spots = np.where(self.matrix == self.names[key]) | |
spots = [] | |
for i in range(len(raw_spots[0])): | |
spots.append([raw_spots[0][i], raw_spots[1][i]]) | |
return spots | |
@property | |
def width(self): | |
"""Just the width of the system""" | |
return self.matrix.shape[1] | |
@property | |
def height(self): | |
"""Just the height of the system""" | |
return self.matrix.shape[0] | |
@property | |
def _name_arr(self): | |
"""Internal use, in order list of names ['D', 'R'] probably""" | |
return [_[0] for _ in | |
sorted(self.names.items(), | |
key=lambda tup: tup[1])] | |
@property | |
def stats(self): | |
"""For grading, returns size of system, percent of each party""" | |
size = self.width * self.height | |
percents = {} | |
for k in self.names.keys(): | |
percents[k] = len(self[k]) / float(size) | |
return size, percents | |
def _read_file(self): | |
""" | |
We read in the file here. The input file needs to be of a very specific | |
format, where there are m rows and n columns, with fields separated by a | |
space. | |
D R D R D R R R | |
D D R D R R R R | |
D D D R R R R R | |
D D R R R R D R | |
R R D D D R R R | |
R D D D D D R R | |
R R R D D D D D | |
D D D D D D R D | |
""" | |
width = 0 | |
height = 0 | |
system = [] | |
with open(self.filename, 'r') as fileobj: | |
i = 0 | |
for line in [re.sub('\n', '', _) for _ in fileobj.readlines()]: | |
items = line.split(' ') | |
system.append(items) | |
width = len(items) | |
i += 1 | |
height = i | |
self.matrix = np.zeros((height, width)) | |
for i in range(height): | |
for j in range(width): | |
try: | |
num = self.names[system[i][j]] | |
except KeyError: | |
self.names[system[i][j]] = self.num_names | |
self.num_names += 1 | |
self.matrix[i, j] = self.names[system[i][j]] | |
def empty_state(self): | |
"""Return an empty version of the system. Deprecated.""" | |
return np.zeros(self.matrix.shape) | |
class Mask(object): | |
""" | |
This is the class that tracks each solution | |
Solutions are easy, as they're in the form of a bitmask | |
""" | |
def __init__(self, height=0, width=0): | |
self.mask = np.zeros((height, width)) | |
self.width, self.height = width, height | |
def __str__(self): | |
"""Numpy string version of array""" | |
return str(self.mask) | |
def __eq__(self, other: Union['Mask', np.ndarray]): | |
"""Tells us if two masks are the same. Used in test code""" | |
if isinstance(other, Mask): | |
return np.array_equal(self.mask, other.mask) | |
elif isinstance(other, np.ndarray): | |
return np.array_equal(self.mask, other) | |
else: | |
raise ValueError('Invalid Types Supplied') | |
@property | |
def size(self): | |
"""Number of elements in mask""" | |
return self.mask.sum() | |
def parse_list(self, listvals): | |
"""given some entry list, set our mask to be those vals""" | |
self.mask = np.array(listvals) | |
self.height, self.width = self.mask.shape | |
def parse_locations(self, height, width, locations): | |
self.mask = np.zeros((height, width)) | |
self.height = height | |
self.width = width | |
for y, x in locations: | |
self.mask[y, x] = 1 | |
def make_valid(self): | |
""" | |
Makes the mask valid, remains the same if already valid | |
Keeps a random connected component | |
""" | |
if not self.is_valid: | |
curlab, labels = self.get_labels() | |
num_components = labels.max() | |
keep = random.randint(1, num_components) | |
spots = np.where(labels != keep) | |
for i in range(len(spots[0])): | |
y, x = spots[0][i], spots[1][i] | |
self.mask[y, x] = 0 | |
assert self.is_valid # CAUSE IM SCRED | |
@property | |
def location(self): | |
""" | |
List of locations where mask == 1, returns (y, x) pairs | |
""" | |
raw_spots = np.where(self.mask == 1) | |
spots = [] | |
for i in range(len(raw_spots[0])): | |
spots.append((raw_spots[0][i], raw_spots[1][i])) | |
return spots | |
def get_labels(self): | |
""" | |
Valid masks have a single connected component. | |
https://en.wikipedia.org/wiki/Connected-component_labeling | |
This is what inspired much of the other code, this pattern is repeated | |
throughout the code. | |
""" | |
curlab = 1 | |
labels = np.zeros(self.mask.shape) | |
q = queue.Queue() | |
def unlabelled(i, j): | |
return ((self.mask[i, j] == 1) and (labels[i, j] == 0)) | |
for i in range(self.height): | |
for j in range(self.width): | |
if unlabelled(i, j): | |
labels[i, j] = curlab | |
q.put((i, j)) | |
while not q.empty(): | |
y0, x0 = q.get() | |
neighbors = [(y0 + y, x0 + x) | |
for x in range(-1, 2) | |
for y in range(-1, 2) | |
if (0 <= y0 + y < self.height) and | |
(0 <= x0 + x < self.width) and | |
not (x == 0 and y == 0)] | |
for ii, jj in neighbors: | |
if unlabelled(ii, jj): | |
labels[ii, jj] = curlab | |
q.put((ii, jj)) | |
curlab += 1 | |
return curlab, labels | |
@property | |
def is_valid(self): | |
curlab, _ = self.get_labels() | |
if curlab > 2: | |
return False | |
else: | |
return True | |
def get_args(): | |
"""Get our arguments""" | |
parser = argparse.ArgumentParser() | |
parser.add_argument('filename', metavar='F', type=str, nargs=1, | |
help='File to load') | |
parser.add_argument('-a', '--annealing', action='store_true', | |
default=False, | |
help='Use Simulated Annealing Algorithm?') | |
parser.add_argument('-g', '--genetic', action='store_true', | |
default=False, | |
help='Use Genetic Algorithm?') | |
parser.add_argument('-n', '--numdistricts', type=int, default=None, | |
help=('Number of districts to form. Defaults to the ' | |
'width of the system')) | |
parser.add_argument('-z', '--animate', action='store_true', default=False, | |
help='Animate algorithms?') | |
parser.add_argument('-p', '--precision', type=int, default=10000, | |
help=('Tweak precision, lower is less. ' | |
'In a nutshell, how many loops to run.')) | |
parser.add_argument('-r', '--report', action='store_true', default=False, | |
help='Generate all assets for the report') | |
parser.add_argument('-j', '--gif', action='store_true', default=False, | |
help='Generate gif versions of animations?') | |
parser.add_argument('-F', '--full', action='store_true', default=False, | |
help='Generate everything. Report assets, SA, and GA.') | |
args = parser.parse_args() | |
args.filename = args.filename[0] # We only allow 1 file at a time. | |
return args | |
if __name__ == '__main__': | |
sys.exit(main()) |
Writeup
In this writeup we’ll discuss two algorithms, simulated annealing and genetic algorithms, and show how they can be applied to the problem of drawing political boundaries while avoiding gerrymandering.
This writeup is available on GitHub, or my personal website.
Slides for this post are also available here
Table of Contents
- Table of Contents
- An Introduction to Simulated Annealing and Genetic Algorithms
- Drawing Political District Boundaries
- Using Provided Code
- Next Steps
An Introduction to Simulated Annealing and Genetic Algorithms
First let’s talk about our algorithms.
Simulated Annealing and Genetic Algorithms are both methods of finding solutions to problems through simulations. In a nutshell, basically testing a large amount of semi-random solutions, looking at random combinations of our solutions, and then keeping the best that we encounter.
The benefit of these algorithms is that we can relatively quickly approximate a good solution without much computation time. Our solution won’t be “the best” unless we get extraordinarily lucky, however it will be a good approximation.
What is a Fitness Function?
Let’s look at what the wikipedia page says on the matter.
A fitness function is a particular type of objective function that is used to summarise, as a single figure of merit, how close a given design solution is to achieving the set aims.
In other words, it’s a single number that basically tells us how “good” of a solution we have. For our genetic algorithm and simulated annealing approach we’ll want to maximize this number, thereby maximizing how “good” our solutions are.
What is Simulated Annealing?
Simulated annealing can be defined as follows.
- Generate a random solution
- Generate a “neighboring solution” to our generated solution
- Keep whichever is better, or (with decaying probability) take the new one regardless
- Go back to 2
Incomplete python code for this is below.
def simulated_annealing(g):
s, c = generate_solution(g)
T = 1
Tmin = 1e-9
alpha = 0.99
k = 1
i = 0
while T > Tmin:
sp, cp = generate_solution_neighbor(g, s, c)
DE = cp - c
if DE < 0:
s = sp
c = cp
elif random.random() < math.exp(-DE / (k * T)):
s = sp
c = cp
T *= alpha
i += 1
print(s, c, i)
What are Genetic Algorithms?
Genetic algorithms are very similar, and the algorithm can be defined as follows.
- Randomly generate an initial population of solutions
- Use our solution population to generate some large number of children (note, these children should inherit properties from their parents)
- Keep the best of our total population
- Go back to 2
Again, incomplete code is below.
for i in range(10):
print(f'Starting with {get_value(solutions)}')
new_solutions = gen_new(solutions)
print(f'Birthed {get_value(new_solutions)}')
full_solutions = solutions + new_solutions
solutions = get_top3(full_solutions)
print(f'Evolved to {get_value(solutions)}')
print('---')
Drawing Political District Boundaries
Now that we know what these monsters are, we can dig into how they can be applied to solving a system.
Let’s say we’re interested in determining how to section off a two-party system of voters into “equal” districts, for some definition of equal. Our system is defined in a provided file that simply denotes, for every index, the type of voter in that location. It looks like this
D R D R D R R R
D D R D R R R R
D D D R R R R R
D D R R R R D R
R R D D D R R R
R D D D D D R R
R R R D D D D D
D D D D D D R D
Which can be plotted for readability.
And our other (larger) state looks like the following.
This pattern will continue for the rest of the writeup, I’ll talk about (and show) the smaller version first, and then follow up with the larger version.
Procedure
So in the context of our problem, we can examine how the code actually works.
Overall Structure
The structure is as follows:
main
is responsible for running and calling everythinggenetic_algorithm
uses a genetic approach to solve the problemsimulated_annealing
likewise uses simulated annealing- Each solution uses a
System
instance which keeps track of the system state - Each solution is an instance of a
Solution
which also keeps track of its particular state - Each district is a
Mask
which provides an ease of access through abstraction.
▼ Mask : class
+__init__ : function
-__str__ : function
▼+get_labels : function
+unlabelled : function
+is_valid : function
+location : function
+make_valid : function
+overlap : function
+parse_list : function
+parse_locations : function
+size : function
▼ Solution : class
-__eq__ : function
-__getitem__ : function
+__init__ : function
-__ne__ : function
-__str__ : function
+combine : function
+copy : function
+fill : function
+generate_random_solution : function
+get_district_neighbors : function
+get_filtered_district_neighbors : function
+get_full_openspots : function
+get_neighbors : function
+get_random_openspot : function
+get_solution : function
+height : function
+is_valid : function
+majorities : function
+majority : function
+mutate : function
+show : function
+summary : function
+value : function
+width : function
▼ System : class
-__getitem__ : function
+__init__ : function
+_name_arr : function
+_read_file : function
+empty_state : function
+height : function
+stats : function
+width : function
▼+animate_history : function
+update_plot : function
▼+generate_report_assets : function
+update : function
+genetic_algorithm : function
+get_args : function
+get_good_start : function
+main : function
+simulated_annealing : function
▼ variables
FIGSIZE
OUTDIR
So basically how this works is we read in the file, creating a System
instance, and then create an empty Solution
instance to keep track of our changes.
Some Quick Notes
- There’s a lot of code here, and to be honest, it’s more than is really necessary. The reason for it however, is so that I can abstract away the more complicated parts and just say something like
solution.mutate()
instead of digging into how or why the mutation algorithm works. This abstraction is why the code to do simulated annealing is so terse, abstraction makes algorithms terse. - All indexing starts from the upper left point, which is at
(0, 0)
, and all indices are(y, x)
pairs. - There’s a hierarchy of the code, our system is represented by a
System
object, and each solution is represented by aSolution
object, which consists of severalMask
objects that provide a direct interface to each district. - I like properties, so if something looks like it might be doing something complicated under the hood (like the
value
function), it probably is.
Helpful Code
While solving this problem, there are a couple sub-problems that we need to consider and solve.
Finding Neighbors of a Point
Our first big problem is how we find neighbors of a single point. For any (y, x)
pair we can express its neighbors using the following algorithm.
- Iterate over range(-1, 2) for both x and y
-
For each loop, accept (y + yi, x + xi) if the following conditions hold:
- y + yi is within the range of the field
- x + xi is within our domain of the field
- xi and yi are not both equal to zero
In python, this is expressed as
neighbors = [(y0 + y, x0 + x)
for x in range(-1, 2)
for y in range(-1, 2)
if (0 <= y0 + y < self.height) and
(0 <= x0 + x < self.width) and
not (x == 0 and y == 0)]
Determining if a District is Valid
One of the problems we need to solve is to know if any given district we have is valid. In our case, valid simply means having a single connected component, which we can determine using connected component labelling. The easiest version of this (which is in the implementation) is just examining a single component at a time, and the algorithm is as follows (from wikipedia):
- Start from the first pixel in the image. Set “curlab” (short for “current label”) to 1. Go to (2).
- If this pixel is a foreground pixel and it is not already labelled, then give it the label “curlab” and add it as the first element in a queue, then go to (3). If it is a background pixel or it was already labelled, then repeat (2) for the next pixel in the image.
- Pop out an element from the queue, and look at its neighbours (based on any type of connectivity). If a neighbour is a foreground pixel and is not already labelled, give it the “curlab” label and add it to the queue. Repeat (3) until there are no more elements in the queue.
- Go to (2) for the next pixel in the image and increment “curlab” by 1.
Which is implemented in our code as the following.
class Mask(object):
...
def get_labels(self):
curlab = 1
labels = np.zeros(self.mask.shape)
q = queue.Queue()
def unlabelled(i, j):
return self.mask[i, j] == 1 and labels[i, j] == 0
for i in range(self.height):
for j in range(self.width):
if unlabelled(i, j):
labels[i, j] = curlab
q.put((i, j))
while not q.empty():
y0, x0 = q.get()
neighbors = [(y0 + y, x0 + x)
for x in range(-1, 2)
for y in range(-1, 2)
if (0 <= y0 + y < self.height) and
(0 <= x0 + x < self.width) and
not (x == 0 and y == 0)]
for ii, jj in neighbors:
if unlabelled(ii, jj):
labels[ii, jj] = curlab
q.put((ii, jj))
curlab += 1
return curlab, labels
...
Finding District Neighbors
Another huge problem is we have to find all neighbors of a given district. This is incredible similar to the Connected Component Labelling process above.
The basic algorithm is as follows.
- Get a random spot inside the given district
- Add this spot to a Queue
- Initialize an empty labelling array (as with connected component labelling)
- While the queue is not empty, get an new
(y, x)
pair. - If the point falls within the district, get all of the point’s neighbors, add them to the queue, and go back to (4)
- If the point does not fall into the district, add it to the list of district neighbors.
class Solution(object):
...
def get_district_neighbors(self, i):
y, x = self.get_random_openspot(i)
q = queue.Queue()
q.put((y, x))
edges = []
labels = np.zeros(self.full_mask.shape)
labels[y, x] = 1
while not q.empty():
y, x = q.get()
if self.full_mask[y, x] == i:
for yi, xi in self.get_neighbors(y, x):
if labels[yi, xi] == 0:
q.put((yi, xi))
labels[yi, xi] = 1
else:
edges.append((y, x))
return edges
...
Fitness Function
Taking a step back from the code and considering the real world, let’s think about what we’d ideally like to emphasize in a political districting system.
- We’d want districts to be homogeneous, i.e. each district is comprised of either all Republican or all Democrat voters.
- We want our district ratios to approximately match our population ratios. By this I mean, if we have 52% Republican voters in the general population, 52% of the districts should have a Republican majority, and vice-versa for the Democrat population.
- We’d want to avoid gerrymandering, which is the practice of shaping voting districts such that a certain political party has an unfair advantage. A real world example of the sort of district we’d like to avoid looks like this (which is in Texas)
- We want all districts to be around the same population size, i.e. there are an equal number (within reason) of voters in each district.
We can design our fitness function to meet these criteria. The final fitness function that we use emphasizes the following qualities in its assessment.
- Validity of solution
- Make sure the ratio of
R
toD
majority districts matches the ratio ofR
toD
in the general population. - Make sure each district is as homogeneous as possible
- Reduce the value of the district if its size isn’t close to the “ideal size”, which is
total_size / num_districts
. This is our attempt to reduce the “squiggliness” of a district, however it’s not perfect and squiggly districts still pop up. - We also take into account that in non-homogeneous districts voters that aren’t affiliated with the majority party might be swayed by targeted campaigns. To this effect we account each non-affiliated “zone” with a weight of -0.9 instead of -1.
- Finally, we can also minimize edge length as well as trying to keep each district the same size. This will result in hopefully ideal districts
class Solution(object):
...
@property
def value(self):
value = 0
if not self.is_valid: # if we don't have a valid solution, return 0
return 0
# Make sure the number of districts tries to match population
# distribution within 10%
size, stats = self.system.stats
for k, v in self.majorities.items():
if np.abs((float(v) / self.numdistricts) - stats[k]) >= 0.1:
return 0
district_size = int(self.width * self.height / self.numdistricts)
# Sum up values of each district
for i in range(1, self.numdistricts + 1):
values = self.system.matrix[self[i].mask.astype(bool)]
if len(values) == 0:
value = 0
return value
else:
# District value is simply abs(num_red - num_blue)
subvalue = np.abs(len(values[values == 0]) - len(values[values == 1]))
size_bonus = 0.25 * np.abs(len(values) - district_size)
if subvalue < len(values):
# For any non-uniform values, add 10% their value to account
# for independent voter turnout
subvalue += (len(values) - subvalue) * 0.1
value += subvalue
value -= size_bonus
# Minimize neighbors (same as minimizing edge length)
value += -0.1 * len(self.get_district_neighbors(i))
return value
...
Generating Random Solutions
This algorithm is very straightforward.
- Generate a number of “spawn points” equal to the number of districts.
- Fill.
The fill algorithm is also straightforward.
- Set a list of available districts.
- While there are any non-set points, pick a random district,
i
, from the list of available districts. - Get a list of all neighbors of the district, but filter to only 0-valued entries.
- If no such neighbors exist, remove this district from the list of available districts.
- Otherwise pick a neighbor at random and set it to
i
. - Loop back to (2).
class Solution(object):
...
def generate_random_solution(self, history=False):
solution_history = [self.copy()]
for i in range(1, self.numdistricts + 1):
y, x = self.get_random_openspot(0)
self.full_mask[y, x] = i
if history:
solution_history.append(self.copy())
solution_history += self.fill(keep_history=history)
if history:
return solution_history
def fill(self, keep_history=False):
districts = list(range(1, self.numdistricts + 1))
history = []
while (self.full_mask == 0).any():
try:
i = districts[random.randint(0, len(districts) - 1)]
except ValueError:
# So here's a neat bug... Sometimes if there's a zero in the
# corner, get filtered won't find it. So this code is here to
# forcibly fix this problem.
for j in range(1, self.numdistricts):
if len(self.get_filtered_district_neighbors(j, [0])) != 0:
districts = [j]
i = j
break
neighbors = self.get_filtered_district_neighbors(i, [0])
if len(neighbors) == 0:
districts.remove(i)
else:
y, x = neighbors[random.randint(0, len(neighbors) - 1)]
self.full_mask[y, x] = i
if keep_history:
history.append(self.copy())
return history
...
We can see how this looks for our two different state sizes.
Simulated Annealing
Now that we can do all of that, let’s talk about simulated annealing. The basic algorithm is as follows.
- Generate a random solution
- Generate a solution neighbor
- If the new solution is better than the old, set the current solution to the new one.
- If the solution is worse, but
random.random() < math.exp(dv / (k * T))
, wheredv
is the difference between solution values,k
is a set constant, andT
is the current iteration value, accept it.
def simulated_annealing(system, numdistricts, precision, animate, makegif):
solution = get_good_start(system, numdistricts)
history = [solution] # Keep track of our history
k = 0.8 # Larger k => more chance of randomly accepting
Tvals = np.arange(1, 1e-12, -1.0 / precision)
print('Running Simulated Annealing with k={:0.03f}, alpha={:0.05f}'\
.format(k, 1.0 / precision))
for i, T in tqdm(enumerate(Tvals), total=len(Tvals)):
new_solution = solution.copy() # copy our current solution
new_solution.mutate() # Mutate the copy
dv = new_solution.value - solution.value # Look at delta of values
# If it's better, or random chance, we accept it
if dv > 0 or random.random() < math.exp(dv / (k * T)):
solution = new_solution
history.append(new_solution)
To show how choice of k
effects the algorithm, we can plot this.
The entire process looks like this:
Which has the following final solution.
And for the large system,
Which has the following final solution.
Mutations
Much of simulated annealing rests on being able to find a valid neighboring solution to our current solution. We do this through a process I call “mutation”, which simply flips a zone from one district to another, based on some randomness and criteria to make the new solution valid.
The general algorithm can be thought of as follows.
- Find all district neighbors
- Pick a neighboring point at random.
- If the neighboring point’s district has at least size 2, set this neighboring point to our district.
- Otherwise, pick a different neighboring point.
class Solution(object):
...
def mutate(self):
i = np.random.randint(1, self.numdistricts)
y, x = self.get_random_openspot(i)
if y is None:
raise IndexError('No open spots? Something is real bad')
traversed = set()
q = queue.Queue()
q.put((y, x))
while not q.empty():
y, x = q.get()
if (y, x) not in traversed:
traversed.add((y, x))
if (self.full_mask[y, x] != i and
self[self.full_mask[y, x]].size > 1):
old_value = self.full_mask[y, x]
self.full_mask[y, x] = i
if not self.is_valid: # make sure new mutation is valid
# If not, reset and start over
self.full_mask[y, x] = old_value
else:
break
for ii, jj in self.get_neighbors(y, x):
q.put((ii, jj))
...
Which can be visualized as follows.
A Better Starting Point
Instead of naively choosing just one random solution to start with simulated annealing, we can instead generate many initial random solutions, and then pick the best for our algorithm.
This has the advantage of starting off from a better place, and theoretically have less “distance to climb”.
Genetic Algorithm
This algorithm is also straightforward, and is generally as follows.
- Generate a set of random “parent” solutions.
- From our parents, generate a large set of “children” solutions.
- Sort the entire population by their value.
- Set our parents to be the “best” of the current population, discard the rest.
- Go back to (2).
def genetic_algorithm(system, numdistricts, precision, animate, makegif):
# Start with random initial solution space (3)
solutions = [Solution(system, numdistricts) for _ in range(3)]
for s in solutions:
s.generate_random_solution() # Initialize our solutions
top_history = [] # Keep history of our top solution from each "frame"
for i in tqdm(range(precision)):
new_solutions = []
for _ in range(10): # Create 10 children per frame
s1, s2 = np.random.choice(solutions, size=2)
# Randomly combine two parents
new_solutions.append(s1.combine(s2))
# Combine everything, giving 13 total solutions
full_solutions = new_solutions + solutions
# Keep the top 3 for next generation
solutions = [_[0] for _ in
sorted([(s, s.value) for s in full_solutions],
key=lambda tup: -tup[1])[:3]]
# Only record top from generation, and only if it's changed
if len(top_history) == 0 or solutions[0] != top_history[-1]:
top_history.append(solutions[0])
The entire process looks like this:
Which has the following final solution.
And for the large system,
Which has the following final solution.
Combining Solutions
As simulated annealing relies on mutate()
to narrow down on a good solution, the genetic algorithm relies on combine()
to take two solutions and generate a “child” solution.
We can think of the code as follows.
- Shuffle our two parents in an array.
- Shuffle a list of districts.
- Set a cursor that points to the first parent in the array.
- Iterate through our districts with variable
i
- For the current district, find all points of the parent that our cursor is pointing to.
- Get all “open” (i.e. set to 0) points for our child solution
- For every point that matches between these two sets, make a new bitmask.
- If this bitmask is valid (i.e. one connected component), set all point in this child solution to our current district
- Otherwise, make the district valid and set the bits in the child solution
- Flip the cursor
The algorithm behind making a district valid is easy, if we have more than one connected component in a given district, pick one at random and discard the other connected components.
class Solution(object):
...
def combine(self, other_solution, keep_history=False):
"""
Look at both solutions, alternate between them randomly, and try to
basically inject one side at a time. Afterwards fill the gaps in with
fill()
"""
new_solution = Solution(self.system, self.numdistricts)
# Randomly order parents to choose from
pick_order = [self, other_solution]
random.shuffle(pick_order)
# Randomly order districts to choose from
districts = list(range(1, self.numdistricts + 1))
random.shuffle(districts)
cursor = 0 # alternates between parents
history = [new_solution.copy()]
for i in districts:
parent_locations = pick_order[cursor][i].location
open_locations = new_solution.get_full_openspots(0)
district = Mask()
# We make every child valid
district.parse_locations(self.height, self.width,
[(y, x) for y, x in parent_locations
if [y, x] in open_locations])
if not district.is_valid:
district.make_valid()
for y, x in district.location:
new_solution.full_mask[y, x] = i
cursor ^= 1
if keep_history:
history.append(new_solution.copy())
for i in range(1, self.numdistricts + 1):
y, x = new_solution.get_random_openspot(i)
if y is None:
y, x = new_solution.get_random_openspot(0)
new_solution.full_mask[y, x] = i
if keep_history:
history.append(new_solution.copy())
history += new_solution.fill(keep_history=True)
if random.random() < 0.1:
new_solution.mutate()
history.append(new_solution.copy())
if keep_history:
return new_solution, history
return new_solution
Which can be visualized as follows.
Final Thoughts
Both of these approaches can be applied to solve incredibly complex problems but much of their success hinges on the effectiveness of your fitness function.
We also realize that any given “final solution” is somewhat unique, or at the very least hard to obtain again. If we visualize our solution space as a two-dimensional plane, with each solution having some value, then a final solution that we find is merely one of these peaks, so a re-run will not necessarily yield the same solution again.
To get around this, we could theoretically run the code thousands of times, with different seeds each time, and then pick the best out of all runs.
Using Provided Code
If you’re a TA, this is straightforward! After installed the required libraries (again check the repository) just run
python3.6 ./politicalboundaries.py $FILE_TO_RUN
If you want to dig a little deeper, use the -h
flag to see what it can do, but here’s a short list as well.
- Use Simulated Annealing on the file
- Use the Genetic Algorithm on the file
- Set the number of districts for either solution type
- Set the precision (number of runs) for either algorithm
- Animate the solution process
- Create gifs of the solution process (otherwise just
.mp4
) - Generate report (
README.md
) assets. - Do all of the above in one go.
┬─[zoe@fillory:~/Dropbox/classwork/2016b/csci3202/hw5]
╰─>$ ./politicalboundaries.py -h
usage: politicalboundaries.py [-h] [-a] [-g] [-n NUMDISTRICTS] [-z] [-p PRECISION]
[-r] [-j] [-F]
F
positional arguments:
F File to load
optional arguments:
-h, --help show this help message and exit
-a, --annealing Use Simulated Annealing Algorithm?
-g, --genetic Use Genetic Algorithm?
-n NUMDISTRICTS, --numdistricts NUMDISTRICTS
Number of districts to form. Defaults to the width of
the system
-z, --animate Animate algorithms?
-p PRECISION, --precision PRECISION
Tweak precision, lower is less. In a nutshell, how
many loops to run.
-r, --report Generate all assets for the report
-j, --gif Generate gif versions of animations?
-F, --full Generate everything. Report assets, SA, and GA.
Next Steps
I want to do more for this project but I’m limited in the time I have. I do have a couple of ideas for next steps however.
- Parallelizing - Instead of just running simulations on a single thread, we could theoretically spin up a bunch of different threads and run simulations on them simultaneously, only keeping the best of all trials.
- Real Data - It would be amazing to take the approaches used in this writeup and apply it to real-world political data. A topic for a different blog post!