Concurrency & multi-threading in Python¶

  • In Python, only one thread per process can execute bytecode at a time.
  • With a ThreadPoolExecutor, each thread runs for a short interval, after which it must release the GIL so that other threads can run.
  • Each process has its own interpreter and memory space, and this limitation applies only to threads belonging to the same process.
  • Two or more processes can run on the CPU simultaneously, but they cannot access a shared resource the way threads sharing one memory space can.
  • We need a mutex lock to prevent race conditions and enforce consistency when concurrent threads or processes touch shared state.
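The mutex idea in the last bullet can be sketched with threading.Lock (an illustrative example, not part of the notebook; add_many and counter are made-up names):

```python
from threading import Thread, Lock

counter = 0
lock = Lock()

def add_many(n: int) -> None:
    global counter
    for _ in range(n):
        with lock:  # mutex: only one thread mutates counter at a time
            counter += 1

threads = [Thread(target=add_many, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 40000: the lock keeps every increment consistent
```

The same pattern applies between processes, using multiprocessing.Lock instead, as shown later in this notebook.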

What is GIL?¶

The Global Interpreter Lock (GIL) in Python, specifically in CPython (the standard implementation), is a mutex that allows only one thread to execute Python bytecode at a time within a single process. This is exactly why it is advised to use multi-threading only when there's an I/O operation involved.
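One rough way to see this is to time a purely CPU-bound loop run sequentially versus on two threads (a sketch; busy is a made-up workload and exact timings vary by machine, but on a GIL build the two wall times come out roughly equal):

```python
import time
from threading import Thread

def busy(n: int = 2_000_000) -> int:
    # purely CPU-bound: no I/O, so the GIL is the bottleneck
    total = 0
    for i in range(n):
        total += i
    return total

# run twice sequentially
t0 = time.perf_counter()
busy()
busy()
sequential = time.perf_counter() - t0

# run twice on two threads: the GIL serialises the bytecode,
# so the wall time is roughly the same as the sequential run
t0 = time.perf_counter()
threads = [Thread(target=busy) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded = time.perf_counter() - t0

print(f"sequential: {sequential:.2f} s, threaded: {threaded:.2f} s")
```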

Using Threads¶

In [1]:
import time

def fn_takes_time(time_in_sec: int = 2):
    print("Before calling sleep function")
    time.sleep(time_in_sec)  # time.sleep takes its argument in seconds
    print(f"After calling sleep for {time_in_sec * 1000} ms")
In [2]:
from threading import Thread

th = Thread(target = fn_takes_time, args = [1])
th.start()
print("Main thread executing")
Before calling sleep function
Main thread executing
After calling sleep for 1000 ms
In [3]:
th = Thread(target = fn_takes_time, args = [1])
th.start()
th.join()  # blocks the main thread until this thread finishes
print("Main thread executing")
Before calling sleep function
After calling sleep for 1000 ms
Main thread executing
In [4]:
from concurrent.futures import ThreadPoolExecutor as executor

with executor(max_workers = 1) as ex:
    future = ex.submit(fn_takes_time, 1)
    if not future.done():
        print("Function running in background...")
    else:
        print("Function completed execution")

    # this line will not be executed until the future returns the result of computation
    print(f"Function returned = {future.result()}")
Before calling sleep function
Function running in background...
After calling sleep for 1000 ms
Function returned = None
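When several futures are submitted, concurrent.futures.as_completed yields them as they finish rather than in submission order (a small sketch; slow_square is a made-up helper):

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_square(x: int) -> int:
    time.sleep(0.1)  # simulated I/O wait
    return x * x

with ThreadPoolExecutor(max_workers=2) as ex:
    futures = [ex.submit(slow_square, n) for n in (1, 2, 3)]
    # as_completed yields futures in completion order, not submission order
    results = sorted(f.result() for f in as_completed(futures))

print(results)  # [1, 4, 9]
```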
In [5]:
some_shared_resource = 0
timings = [1, 2, 2, 1]

def increment(time_in_sec: int = 1) -> int:
    print("Before calling sleep function")
    global some_shared_resource
    some_shared_resource += time_in_sec
    time.sleep(time_in_sec)
    print(f"After calling sleep for {time_in_sec * 1000} ms")
    
    return some_shared_resource

Using ThreadPoolExecutor¶

  • CPU time: The total time the CPU actively spends executing a program's instructions, excluding waiting periods.
  • Wall time: The total elapsed real-world time from the start to the completion of a program, including computation, I/O waits, and other delays.

ThreadPoolExecutor.map submits one task per argument to the pool; at most max_workers tasks run concurrently, each on a worker thread.

Using a single worker¶

In [6]:
%%time
some_shared_resource = 0

with executor(max_workers = 1) as ex:
    future = ex.map(increment, timings)

    for result in future:
        print(f"Function returned = {result}\n")
Before calling sleep function
After calling sleep for 1000 ms
Before calling sleep function
Function returned = 1

After calling sleep for 2000 ms
Before calling sleep function
Function returned = 3

After calling sleep for 2000 ms
Before calling sleep function
Function returned = 5

After calling sleep for 1000 ms
Function returned = 6

CPU times: user 12.9 ms, sys: 0 ns, total: 12.9 ms
Wall time: 6.01 s

Using 2 workers¶

In [7]:
%%time
some_shared_resource = 0

with executor(max_workers = 2) as ex:
    future = ex.map(increment, timings)

    for result in future:
        print(f"Function returned = {result}\n")
Before calling sleep function
Before calling sleep function
After calling sleep for 1000 ms
Before calling sleep function
Function returned = 3

After calling sleep for 2000 ms
Before calling sleep function
Function returned = 5

After calling sleep for 2000 ms
Function returned = 6

After calling sleep for 1000 ms
Function returned = 6

CPU times: user 10.6 ms, sys: 3.62 ms, total: 14.2 ms
Wall time: 3.01 s

Using 3 workers¶

In [8]:
%%time
some_shared_resource = 0

with executor(max_workers = 3) as ex:
    future = ex.map(increment, timings)

    for result in future:
        print(f"Function returned = {result}\n")
Before calling sleep function
Before calling sleep function
Before calling sleep function
After calling sleep for 1000 ms
Before calling sleep function
Function returned = 5

After calling sleep for 1000 ms
After calling sleep for 2000 ms
Function returned = 6

After calling sleep for 2000 ms
Function returned = 6

Function returned = 6

CPU times: user 6.63 ms, sys: 4.29 ms, total: 10.9 ms
Wall time: 2.01 s

Using 4 workers¶

In [9]:
%%time
some_shared_resource = 0

with executor(max_workers = 4) as ex:
    future = ex.map(increment, timings)

    for result in future:
        print(f"Function returned = {result}\n")
Before calling sleep function
Before calling sleep function
Before calling sleep function
Before calling sleep function
After calling sleep for 1000 ms
Function returned = 6

After calling sleep for 1000 ms
After calling sleep for 2000 ms
Function returned = 6

After calling sleep for 2000 ms
Function returned = 6

Function returned = 6

CPU times: user 13.1 ms, sys: 482 µs, total: 13.5 ms
Wall time: 2.01 s

How does it work?¶

  • Each worker thread holds the GIL for up to the switch interval (controlled by sys.setswitchinterval(), which defaults to 5 ms), after which the interpreter asks it to release the GIL. Note that this is time-based, not a count of bytecode instructions.
  • Another waiting thread then acquires the GIL and starts its execution.
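The switch interval can be inspected and tuned at runtime (a sketch):

```python
import sys

print(sys.getswitchinterval())  # 0.005 s (5 ms) by default

sys.setswitchinterval(0.001)  # request more frequent GIL hand-offs
print(sys.getswitchinterval())

sys.setswitchinterval(0.005)  # restore the default
```

A smaller interval makes the interpreter more responsive to waiting threads at the cost of more context-switch overhead.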

Using ProcessPoolExecutor¶

Task: Let's create a pool of several processes, each of which computes a part of the sum of $n$ numbers and adds it to a shared integer variable.

In [10]:
from multiprocessing import Value, Lock

"""
'i': Signed integer (equivalent to C’s int).
'I': Unsigned integer (equivalent to C’s unsigned int).
'h': Signed short integer.
'H': Unsigned short integer.
'l': Signed long integer.
'L': Unsigned long integer.
"""

result = Value("i", 0)  # Shared integer, initialized to 0
lock = Lock()  # Process-safe lock
In [11]:
def calculate(start: int, end: int, shared_integer, mutex_lock) -> int:
    if start > end:
        raise ValueError("Start cannot be greater than end")

    # assume each addition takes 1s of time
    n = end - start + 1
    time.sleep(n)

    with mutex_lock:
        # Protected by lock to avoid race conditions
        shared_integer.value += sum(range(start, end + 1))

    return shared_integer.value

Let's calculate the sum of the first 10 natural numbers using 3 processes.¶

In [12]:
%%time
from typing import Tuple
from concurrent.futures import ProcessPoolExecutor

partitions = [(1, 3), (4, 6), (7, 10)]

def calculate_sum(args: Tuple[int]) -> int:
    return calculate(args[0], args[1], result, lock)

with ProcessPoolExecutor(max_workers = len(partitions)) as executor:
    results = executor.map(calculate_sum, partitions)
    results = list(results)
    print(results)
[6, 21, 55]
CPU times: user 16.4 ms, sys: 16.4 ms, total: 32.8 ms
Wall time: 4.03 s

Using 1 process¶

In [13]:
%%time
result = Value("i", 0)  # Shared integer, initialized to 0
lock = Lock()  # Process-safe lock

with ProcessPoolExecutor(max_workers = 1) as executor:
    future = executor.submit(calculate_sum, (1, 10))
    print(future.result())
55
CPU times: user 8.43 ms, sys: 8.79 ms, total: 17.2 ms
Wall time: 10 s

Since the computation of each partial sum doesn't depend on the results of the other processes, we can run each process independently of the others.

Why Lambda Functions Fail with Shared Variables and Locks in ProcessPoolExecutor¶

In Python's multiprocessing module, ProcessPoolExecutor relies on pickling (serialization via the pickle module) to pass functions and their arguments to worker processes. However, using shared variables (e.g., multiprocessing.Value) or mutex locks (e.g., multiprocessing.Lock) inside a lambda function or directly in the parameter list of ProcessPoolExecutor.map can lead to errors due to pickling limitations.

Issues with Lambda Functions¶

Lambda functions (anonymous functions defined with lambda) cannot be pickled because they lack a stable name or module reference required by the pickle module. When used in ProcessPoolExecutor.map, such as in:

results = list(executor.map(lambda x: process_range(x, counter, lock), args))

a PicklingError occurs, typically with a message like:

PicklingError: Can't pickle <function <lambda> at ...>: attribute lookup <lambda> on __main__ failed

This happens because ProcessPoolExecutor tries to serialize the lambda function to send it to worker processes, but lambdas are not picklable.

Issues with Shared Variables and Locks in Parameter Lists¶

Shared variables (multiprocessing.Value) and mutex locks (multiprocessing.Lock) are designed to be shared through inheritance: create them in the parent process before the workers start, so the workers can see them (via a fork start method, or via the executor's initializer hook). Unlike lambdas they can be transferred to a child process at creation time, but pickling them as ordinary task arguments with the plain pickle machinery raises a RuntimeError saying they should only be shared between processes through inheritance. Using them inside a lambda fails even earlier, because the lambda itself is not picklable. For example:

counter = Value("i", 0)
lock = Lock()
results = list(executor.map(lambda x: some_function(x, counter, lock), args))

This fails with a PicklingError due to the lambda, not because of counter or lock. The earlier cells work because calculate_sum is a named module-level function (and therefore picklable) and result and lock are module-level globals that forked worker processes inherit from the parent.