Speed Snakes - Leveraging Parallelism in Python

Zoë Farmer

Made with Jupyter and Reveal.js

Who am I

  • My name is Zoë Farmer
  • CU graduate with a BS in Applied Math and a CS Minor
  • I'm a co-coordinator of the Boulder Python Meetup
  • I'm a big fan of open source software
  • http://www.dataleek.io
  • @TheDataLeek, github.com/thedataleek, gitlab.com/thedataleek
  • Data Visualizer at Talus Analytics
In [37]:
Image('./talus-logo.png', height=300, width=300)
Out[37]:

Concurrency is not Parallelism

Python can do both

  • Lots of different ways to achieve either
  • Concurrency - async/await is a good example; it interleaves many quick, staggered interactions on a single thread (see the asyncio sketch below)
  • Parallelism - os.fork() is the classic example

Concurrency

In [82]:
Image('gil.png', height=800, width=800)
Out[82]:
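
The async/await flavor of concurrency mentioned above deserves a tiny illustration. This sketch is added for context and is not from the original deck; the fetch coroutine is hypothetical and just sleeps to stand in for slow I/O. All three awaits overlap on one thread, so the whole thing takes roughly one second rather than three.

In [ ]:
import asyncio

async def fetch(n):
    # stand-in for a slow network call; await hands control back to the event loop
    await asyncio.sleep(1)
    return f'result {n}'

async def main():
    # all three coroutines are "in flight" at once, on a single thread
    return await asyncio.gather(*(fetch(i) for i in range(3)))

# ~1 second total, not ~3 (inside Jupyter, use `await main()` instead of asyncio.run)
print(asyncio.run(main()))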

Parallelism

In [86]:
import os, signal, time

pid = os.fork()
print(pid)  # the parent sees the child's PID, the child sees 0
if pid == 0:
    # fork() returned 0, so we're the child: do a thing
    print('child starting', end='... ')
    time.sleep(10)
    print('child ending')
else:
    # fork() returned the child's PID, so we're the parent: interrupt the child
    os.kill(pid, signal.SIGINT)
60724
0
child starting... child ending

Python's Multiprocessing

  • import multiprocessing
  • Start up new processes without os.fork()
  • Communicate with queues and pipes (a minimal Pipe sketch follows after this list)
  • Queues are super handy: they handle shared memory, locks, and all the other nasty details you can read more about in The Little Book of Semaphores
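
Queues are demonstrated in the next example; as a companion, here is a minimal sketch (mine, not from the original deck) of the other channel mentioned above, multiprocessing.Pipe, which gives you two connected endpoints.

In [ ]:
import multiprocessing as mp

def child(conn):
    # child end of the pipe: receive a message, reply, close
    msg = conn.recv()
    conn.send(f'got: {msg}')
    conn.close()

parent_conn, child_conn = mp.Pipe()  # two connected endpoints
p = mp.Process(target=child, args=(child_conn,))
p.start()
parent_conn.send('hello')   # lands on the child's end
print(parent_conn.recv())   # 'got: hello'
p.join()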

Quick Example

In [23]:
import multiprocessing as mp

q = mp.Queue()

def worker(q, worknum):
    print(f'Worker {worknum} Starting')
    for i in range(2):
        print(f'Worker {worknum} {q.get()}')  # blocks until an item is available
    print(f'Worker {worknum} Exiting')

for i in range(3):
    mp.Process(
        name='example_worker',
        target=worker,
        args=(q, i)
    ).start()

for i in range(100): q.put(f'foo {i}', block=False)
Worker 0 Starting
Worker 0 foo 0
Worker 1 Starting
Worker 1 foo 1
Worker 0 foo 2
Worker 1 foo 3
Worker 2 Starting
Worker 0 Exiting
Worker 1 Exiting
Worker 2 foo 4
Worker 2 foo 5
Worker 2 Exiting
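
A side note (mine, not from the original slides): the toy example above never joins its workers or drains the queue; each worker simply exits after two items. A tidier pattern, and the same one Black Arrow uses later, is to poison-pill each worker with a sentinel and then join.

In [ ]:
import multiprocessing as mp

def worker(q, worknum):
    while True:
        item = q.get()        # blocks until an item arrives
        if item == 'EXIT':    # sentinel: one per worker
            break
        print(f'Worker {worknum} {item}')

q = mp.Queue()
workers = [mp.Process(target=worker, args=(q, i)) for i in range(3)]
for w in workers:
    w.start()
for i in range(10):
    q.put(f'foo {i}')
for _ in workers:
    q.put('EXIT')             # poison pill each worker
for w in workers:
    w.join()                  # wait for a clean shutdown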

Let's jump into a practical example

What's the goal?

  • Personal dev workflow:
    • I need to search for keywords in large codebases
    • find . -exec grep is slow
    • Can Python do better?

TL;DR: find arbitrary strings in a massive amount of text in a reasonably short amount of time, without an IDE.

Black Arrow

In [13]:
!ba "Boulder Python" -d /Users/zoe/projects
/Users/zoe/projects/Boulder Python/Speed Snakes.ipynb:44
	"* I'm a co-coordinator of the Boulder Python Meetup\n",
/Users/zoe/projects/Boulder Python/.ipynb_checkpoints/Speed Snakes-checkpoint.ipynb:44
	"* I'm a co-coordinator of the Boulder Python Meetup\n",
/Users/zoe/projects/Boulder Python/Speed Snakes.slides.html:11921
	<li>I'm a co-coordinator of the Boulder Python Meetup</li>
---------------
Files Searched: 418
Files Matched: 3
Lines Searched: 59372
Duration: 0.1414327621459961

Why is it this fast?

  • Multiprocessing
  • Native file access
    • Python uses C bindings for basic operations like reading files
  • Modern SSDs
    • Waaaaay faster than HDDs
    • No drive head bouncing around

Black Arrow's Architecture

  • Each block is a process
  • Each arrow is a queue (or multiple)

[architecture diagram]

Main Process - Start all the other processes

  1. start_indexer()
  2. start_workers()
  3. start_printer()
In [24]:
def main():
    args = get_args()
    processes, final_queue = ba.start_search(args)
    print_process = processes[-1]
    try:
        print_process.join()  # block the main process until the printer finishes
    except (KeyboardInterrupt, EOFError):  # kill everything on ctrl+c / ctrl+d
        for p in processes:  # tear down the whole pipeline
            p.terminate()
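
main() delegates to ba.start_search, which isn't shown on the slides. Purely as a sketch of how the pieces below could be wired together (the args attributes and the exact composition are my assumptions, not Black Arrow's actual implementation), it might look roughly like this:

In [ ]:
import time
import multiprocessing as mp

def start_search(args):
    # Hypothetical wiring, mirroring the architecture above:
    # one indexer -> N search workers -> one printer, connected by queues.
    search_queue = mp.Queue()  # indexer -> search workers (filenames)
    output = mp.Queue()        # search workers -> printer (matches)
    final_queue = mp.Queue()   # printer -> caller (summary info)

    indexer = mp.Process(
        target=index_worker,
        args=(args.directories, args.ignore_re, args.workers, search_queue, output),
    )
    searchers = [
        mp.Process(
            target=file_searching_worker,
            args=(args.regex, args.ignore_re, args.filename_re, args.replace,
                  search_queue, output),
        )
        for _ in range(args.workers)
    ]
    printer = mp.Process(
        target=print_worker,
        args=(time.time(), args.workers, output, final_queue, args.pipe, args.edit),
    )

    processes = [indexer, *searchers, printer]  # printer last, so main() can join it
    for p in processes:
        p.start()
    return processes, final_queue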

File Indexing - what files exist?

  1. for every file recursively below the given directories
    1. add it to the worker queue
In [ ]:
def index_worker(
    directories: List[str], ignore_re: RETYPE, workers: int, search_queue: mp.Queue, output: mp.Queue, block=False
) -> None:
    for dir in list(set(directories)):  # no duplicate input
        for subdir, _, files in os.walk(dir):
            for question_file in files:
                # we don't want to block, this process should be fastest
                search_queue.put(
                    subdir + "/" + question_file, block=block, timeout=10
                )  # faster than os.path.join
    for i in range(workers):
        search_queue.put("EXIT")  # poison pill workers

File Searching - do any files match?

  1. for every file pulled from the worker queue:
    1. scan each line with the compiled regex
    2. pass any matches forward to the printer
In [ ]:
def file_searching_worker(
    regex: RETYPE, ignore_re: RETYPE, filename_re: RETYPE, replace: Union[str, None],
    search_queue: mp.Queue, output: mp.Queue,
) -> None:
    # https://stackoverflow.com/questions/566746/how-to-get-console-window-width-in-python
    rows, columns = os.popen("stty size", "r").read().split()
    # Max width of a printed line is 3/4 of the terminal width
    maxwidth = int(3 * int(columns) / 4)
    while True:
        # block until the indexer hands us a filename
        name = search_queue.get()
        if name == "EXIT":  # poison pill from the indexer
            output.put("EXIT")  # forward the sentinel so the printer can count us out
            break
        try:
            with open(name, "r") as ofile:
                flag = []
                for i, line in enumerate(ofile, 1):  # 1-based line numbers
                    if regex.search(line):
                        found_string = line.split("\n")[0]
                        if len(found_string) > maxwidth:
                            found_string = found_string[:maxwidth] + "..."
                        flag.append((i, found_string))
                if flag:
                    for value in flag:
                        output.put((name, value[0], value[1], regex))
        except (OSError, UnicodeDecodeError):
            pass  # skip unreadable or binary files

Printing - print results

  1. print every result passed by workers
  2. print summary statistics
In [ ]:
def print_worker(
    start_time: float, worker_count: int, output: mp.Queue,
    final_queue: mp.Queue, pipemode: bool, editmode: bool,
) -> None:
    file_list = []
    exited_workers = 0
    while True:
        statement = output.get()
        if statement == "EXIT":  # one sentinel arrives per search worker
            exited_workers += 1
            if exited_workers == worker_count:
                break
            continue
        if len(statement) == 4:
            filename, linenum, matched, line = statement
            replace = None
        else:
            filename, linenum, matched, line, replace = statement
        final_queue.put((filename, linenum))  # a single tuple; put()'s second arg is `block`
        print(
            "{}:{}\n\t{}".format(
                filename,
                color.fg256("#00ff00", linenum),
                insert_colour(matched, line, extra_str=replace),
            )
        )
        file_list.append((linenum, filename))
    final_queue.put("EXIT")  # reachable once all workers have reported in

Implementation Details

Questions?