Threads &
Synchronisation
threading, locks, Queue, ThreadPoolExecutor — et pourquoi le GIL change tout.
Processus vs Threads
Un processus est un programme en cours d'exécution avec son propre espace mémoire. Un thread est une unité d'exécution à l'intérieur d'un processus — les threads d'un même processus partagent la même mémoire.
| Processus | Thread | |
|---|---|---|
| Mémoire | Isolée (copie) | Partagée |
| Création | Lente (~50 ms) | Rapide (~1 ms) |
| Communication | IPC (pipe, socket…) | Variables partagées |
| Crash isolé | Oui | Non — plante tout |
| GIL Python | Contourné ✓ | Bloqué ✗ (CPU) |
| Idéal pour | Calcul intensif (CPU) | I/O (réseau, fichiers) |
Les threads partagent la mémoire — c'est leur force (communication facile) et leur danger (modifications simultanées non contrôlées). Toute variable partagée entre threads doit être protégée.
Cycle de vie d'un thread
t = Thread(target=fn) — le thread est créé mais pas encore démarrét.start() — le thread entre en file d'attente du scheduler OStarget(*args). Peut être interrompu à tout moment par le scheduler.t.join() attend cette étape.Le GIL — Global Interpreter Lock
Le GIL est un verrou interne à CPython qui n'autorise qu'un seul thread à exécuter du bytecode Python à la fois. Il protège les structures internes de l'interpréteur, mais empêche le vrai parallélisme CPU.
Impact concret du GIL
Le GIL est propre à CPython (l'implémentation standard). Jython et IronPython n'ont pas de GIL. Python 3.13 introduit un mode expérimental "free-threaded" sans GIL.
# I/O système (réseau, fichiers, stdin)
response = requests.get("https://...") # ← GIL libéré
data = file.read() # ← GIL libéré
time.sleep(1) # ← GIL libéré
# Extensions C qui le libèrent explicitement
numpy_result = np.dot(a, b) # NumPy libère le GIL
# Calcul pur Python → GIL NON libéré
total = sum(range(10_000_000)) # bloque les autres threads
Règle pratique : threads Python = idéal pour I/O (requêtes HTTP, lecture fichiers, BDD). Pour du calcul CPU (traitement d'image, ML, maths), utiliser multiprocessing ou numpy.
Quand utiliser quoi ?
Créer et démarrer un thread
import threading
import time
def telecharger(url: str, duree: float) -> None:
"""Simule un téléchargement."""
print(f"[{threading.current_thread().name}] Début {url}")
time.sleep(duree) # GIL libéré ici
print(f"[{threading.current_thread().name}] Fin {url}")
# Créer les threads
t1 = threading.Thread(
target=telecharger,
args=("https://site-a.com", 2),
name="T-SiteA"
)
t2 = threading.Thread(
target=telecharger,
args=("https://site-b.com", 3),
name="T-SiteB"
)
# Démarrer (non bloquant)
t1.start()
t2.start()
# Attendre la fin des deux
t1.join()
t2.join()
print("Tous les téléchargements terminés")
# Durée totale ≈ 3s (pas 5s) — parallélisme I/O
Paramètres de Thread()
| Paramètre | Type | Rôle |
|---|---|---|
target | callable | Fonction à exécuter |
args | tuple | Arguments positionnels |
kwargs | dict | Arguments nommés |
name | str | Nom (debug, logs) |
daemon | bool | Thread daemon (voir §07) |
# Dans n'importe quelle fonction
t = threading.current_thread()
print(t.name) # "T-SiteA"
print(t.ident) # ID système
print(t.is_alive()) # True / False
# Lister tous les threads actifs
for t in threading.enumerate():
print(t.name, t.is_alive())
join() sans argument attend indéfiniment. Passer un timeout : t.join(timeout=5) puis vérifier t.is_alive() pour détecter un blocage.
Thread par héritage
Pour encapsuler état et comportement, on peut hériter de threading.Thread et surcharger la méthode run().
import threading
import time
import requests
class WorkerThread(threading.Thread):
def __init__(self, url: str) -> None:
# Toujours appeler super().__init__()
super().__init__(name=f"Worker-{url[:20]}")
self.url = url
self.resultat = None
self.erreur = None
def run(self) -> None:
"""Appelée par start() — NE PAS appeler directement."""
try:
resp = requests.get(self.url, timeout=10)
self.resultat = resp.json()
except Exception as e:
self.erreur = e
# Utilisation
urls = [
"https://api.example.com/users",
"https://api.example.com/products",
]
workers = [WorkerThread(url) for url in urls]
for w in workers: w.start()
for w in workers: w.join()
for w in workers:
if w.erreur:
print(f"Erreur {w.url}: {w.erreur}")
else:
print(f"OK {w.url}: {w.resultat}")
run() vs start() — ne jamais appeler run() directement : cela exécute la méthode dans le thread courant, sans créer de nouveau thread. Toujours utiliser start().
L'héritage est utile quand le thread a un état propre (stocker un résultat, un statut) ou des méthodes auxiliaires. Pour les cas simples, préférer Thread(target=fn) — plus concis.
class MoniteurThread(threading.Thread):
def __init__(self) -> None:
super().__init__(daemon=True)
self._stop_event = threading.Event()
def stop(self) -> None:
self._stop_event.set()
def run(self) -> None:
while not self._stop_event.is_set():
# Travail périodique
self._stop_event.wait(5) # sleep interruptible
Threads daemon
Un thread daemon est tué automatiquement quand le programme principal se termine. Les threads non-daemon bloquent la sortie du programme jusqu'à leur fin.
import threading, time
def tache_longue():
time.sleep(10)
print("Fin de la tâche longue")
# Thread NON-daemon (défaut)
# → programme attend 10s même si main() est terminé
t1 = threading.Thread(target=tache_longue)
# Thread DAEMON
# → tué immédiatement à la fin du programme
t2 = threading.Thread(target=tache_longue, daemon=True)
# Ou après création :
t2.daemon = True # avant start() uniquement
t2.start()
print("Main terminé — le daemon sera tué")
# Programme se ferme ici sans attendre t2
| Non-daemon (défaut) | Daemon | |
|---|---|---|
| Fin du programme | Attend la fin du thread | Tue le thread |
| Risque | Programme "pendu" | Données non sauvegardées |
| Usage typique | Tâches critiques | Moniteurs, heartbeats, logs |
Un thread daemon tué brutalement ne ferme pas ses fichiers, ne flush pas ses buffers, n'exécute pas ses blocs finally. Ne jamais l'utiliser pour des opérations qui nécessitent un nettoyage propre.
Race conditions
Une race condition survient quand deux threads accèdent simultanément à une ressource partagée et que le résultat dépend de l'ordre d'exécution — ce qui n'est jamais garanti.
import threading
compteur = 0
def incrementer(n: int) -> None:
global compteur
for _ in range(n):
compteur += 1 # ← PAS atomique !
t1 = threading.Thread(target=incrementer, args=(100_000,))
t2 = threading.Thread(target=incrementer, args=(100_000,))
t1.start(); t2.start()
t1.join(); t2.join()
print(compteur)
# Attendu : 200 000
# Obtenu : 173 842 (valeur variable !)
compteur += 1 n'est pas atomiqueOpérations qui semblent atomiques mais ne le sont pas
# ✗ Lecture-modification-écriture
x += 1
liste.append(element) # OK en Python (GIL)
# ✗ Test puis action (check-then-act)
if cle not in dico: # Thread 2 peut s'insérer ici
dico[cle] = valeur # entre le test et l'écriture
# ✗ Séquence de plusieurs opérations
total = lire_solde() # Thread 2 peut modifier
nouveau = total - 100 # entre ces deux lignes
ecrire_solde(nouveau)
En Python, certaines opérations sont atomiques grâce au GIL : list.append(), dict[key] = val, x = y (affectation simple). Mais toute séquence de plusieurs opérations ne l'est pas.
Lock & RLock
Un Lock (verrou) garantit qu'un seul thread peut exécuter une section critique à la fois. RLock est un verrou réentrant — un même thread peut l'acquérir plusieurs fois.
import threading
class CompteurSafe:
def __init__(self) -> None:
self._valeur = 0
self._lock = threading.Lock()
def incrementer(self) -> None:
with self._lock: # acquire() + release() auto
self._valeur += 1 # section critique
def valeur(self) -> int:
with self._lock:
return self._valeur # lecture protégée aussi
# Test
compteur = CompteurSafe()
threads = [
threading.Thread(target=lambda: [compteur.incrementer()
for _ in range(100_000)])
for _ in range(2)
]
for t in threads: t.start()
for t in threads: t.join()
print(compteur.valeur()) # Toujours 200 000 ✓
# Lock ordinaire → deadlock si même thread acquiert 2x
lock = threading.Lock()
lock.acquire()
lock.acquire() # ← DEADLOCK ! attend éternellement
# RLock → même thread peut l'acquérir N fois
rlock = threading.RLock()
def methode_a(self):
with self._rlock:
self.methode_b() # OK avec RLock
def methode_b(self):
with self._rlock: # même thread → pas de blocage
...
lock = threading.Lock()
# Tentative d'acquisition avec timeout
acquis = lock.acquire(timeout=2.0)
if acquis:
try:
# section critique
...
finally:
lock.release()
else:
print("Impossible d'acquérir le lock")
Toujours utiliser with lock: plutôt que acquire()/release() manuels — un release() oublié (après une exception) causerait un deadlock permanent.
Semaphore
Un Semaphore est un compteur qui limite le nombre de threads pouvant accéder simultanément à une ressource. Un Lock est un cas particulier (Semaphore de valeur 1).
import threading, time
# Autoriser au plus 3 accès simultanés à la BDD
sem_db = threading.Semaphore(3)
def requete_db(worker_id: int) -> None:
print(f"Worker {worker_id} : en attente...")
with sem_db: # attend si 3 threads dedans
print(f"Worker {worker_id} : connexion BDD")
time.sleep(1) # simule requête
print(f"Worker {worker_id} : terminé")
# 10 workers → max 3 en BDD à la fois
threads = [
threading.Thread(target=requete_db, args=(i,))
for i in range(10)
]
for t in threads: t.start()
for t in threads: t.join()
BoundedSemaphore lève une exception si release() est appelé plus de fois que acquire() — utile pour détecter les bugs de synchronisation.
import threading, time
class RateLimiter:
"""Limite à N appels par seconde."""
def __init__(self, max_par_sec: int):
self.sem = threading.BoundedSemaphore(max_par_sec)
self.max = max_par_sec
def appel(self, fn, *args):
self.sem.acquire()
try:
return fn(*args)
finally:
threading.Timer(1.0, self.sem.release).start()
Event & Condition
Event — signal simple
Un Event est un drapeau booléen partagé. Les threads peuvent attendre qu'il soit levé avec wait().
pret = threading.Event()
def chargement():
time.sleep(2) # charge les données
pret.set() # signal "c'est prêt"
def traitement():
print("En attente des données...")
pret.wait() # bloque jusqu'à set()
print("Traitement en cours")
threading.Thread(target=chargement).start()
threading.Thread(target=traitement).start()
# event.clear() → remet à False
# event.is_set() → vérifie sans bloquer
# event.wait(timeout=5) → attente limitée
Condition — notification ciblée
Une Condition combine un Lock et la capacité de notifier des threads en attente — plus précise qu'un Event.
cond = threading.Condition()
donnees = []
def producteur():
for i in range(5):
with cond:
donnees.append(i)
cond.notify() # réveille 1 consommateur
time.sleep(0.5)
def consommateur():
while True:
with cond:
# wait() libère le lock et attend notify()
cond.wait_for(lambda: len(donnees) > 0)
item = donnees.pop(0)
print(f"Consommé : {item}")
Queue — Producteur / Consommateur
queue.Queue est la solution idiomatique pour la communication inter-threads. Elle est thread-safe de facto — pas besoin de Lock explicite.
Thread 1..N
maxsize=10
[ item1 | item2 | item3 ]
Thread 1..N
import threading
import queue
import time
# Queue bornée : bloque le prod si pleine
q: queue.Queue = queue.Queue(maxsize=10)
POISON_PILL = None # signal d'arrêt
def producteur(items: list) -> None:
for item in items:
q.put(item) # bloque si queue pleine
print(f"Produit : {item}")
q.put(POISON_PILL) # signal de fin
def consommateur() -> None:
while True:
item = q.get() # bloque si queue vide
if item is POISON_PILL:
q.task_done()
break
time.sleep(0.1) # traitement
print(f"Consommé : {item}")
q.task_done() # obligatoire après get()
prod = threading.Thread(
target=producteur, args=(["a", "b", "c", "d"],)
)
cons = threading.Thread(target=consommateur)
prod.start(); cons.start()
prod.join(); cons.join()
q.join() # attend que task_done() soit appelé pour chaque item
API Queue essentielle
| Méthode | Comportement |
|---|---|
q.put(item) | Ajoute — bloque si pleine (maxsize) |
q.put_nowait(item) | Ajoute — lève Full si pleine |
q.get() | Retire — bloque si vide |
q.get_nowait() | Retire — lève Empty si vide |
q.get(timeout=5) | Retire — lève Empty après 5s |
q.task_done() | Signale qu'un item est traité |
q.join() | Attend que tous les items soient traités |
q.qsize() | Nombre d'items (approximatif) |
q.empty() | True si vide (non fiable entre threads) |
Pour N consommateurs, envoyer N POISON_PILL — un par consommateur. Chaque consommateur retire une pilule et s'arrête.
ThreadPoolExecutor
concurrent.futures.ThreadPoolExecutor est l'API moderne et de haut niveau pour gérer un pool de threads. Elle gère automatiquement la création, la réutilisation et la récupération des erreurs.
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
urls = [
"https://api.github.com/users/python",
"https://api.github.com/users/django",
"https://api.github.com/users/flask",
]
def fetch(url: str) -> dict:
resp = requests.get(url, timeout=10)
resp.raise_for_status()
return resp.json()
── map() : simple, résultats dans l'ordre ──
with ThreadPoolExecutor(max_workers=5) as pool:
resultats = list(pool.map(fetch, urls))
# résultats[i] correspond à urls[i]
── submit() : plus de contrôle, gestion erreurs ──
with ThreadPoolExecutor(max_workers=5) as pool:
futures = {pool.submit(fetch, url): url for url in urls}
for future in as_completed(futures):
url = futures[future]
try:
data = future.result()
print(f"✓ {url}: {data['login']}")
except Exception as e:
print(f"✗ {url}: {e}")
Future — objet résultat différé
| Méthode | Rôle |
|---|---|
f.result(timeout=N) | Attend et retourne le résultat (ou lève l'exception) |
f.done() | True si terminé (succès ou erreur) |
f.running() | True si en cours d'exécution |
f.cancel() | Annule si pas encore démarré |
f.exception() | Retourne l'exception sans la relancer |
import os
# I/O-bound : plus de workers que de CPUs
# (règle empirique : 4 × nb_CPUs)
nb_io = (os.cpu_count() or 1) * 4
# CPU-bound : jamais > nb de CPUs
# (préférer multiprocessing dans ce cas)
nb_cpu = os.cpu_count() or 1
# Défaut Python 3.8+ : min(32, cpu_count + 4)
multiprocessing — contourner le GIL
Pour les tâches CPU-bound, utiliser multiprocessing qui crée de vrais processus parallèles — chacun avec son propre GIL.
from concurrent.futures import ProcessPoolExecutor
import math
def est_premier(n: int) -> bool:
"""Calcul CPU intensif."""
if n < 2: return False
for i in range(2, int(math.sqrt(n)) + 1):
if n % i == 0: return False
return True
nombres = range(10_000_000, 10_000_100)
# ThreadPoolExecutor → séquentiel (GIL)
# ProcessPoolExecutor → vrai parallélisme
with ProcessPoolExecutor() as pool:
resultats = list(pool.map(est_premier, nombres, chunksize=10))
premiers = [n for n, p in zip(nombres, resultats) if p]
print(premiers)
Sur Windows, le code multiprocessing doit être dans un bloc if __name__ == "__main__": pour éviter la récursion infinie lors du spawn des processus.
| ThreadPoolExecutor | ProcessPoolExecutor | |
|---|---|---|
| GIL | Partagé → bloque CPU | Un par processus → libre |
| Mémoire | Partagée | Copiée (pickle) |
| Overhead | Faible | Élevé (fork/spawn) |
| I/O-bound | ✓ Idéal | Overkill |
| CPU-bound | ✗ Inutile | ✓ Idéal |
asyncio — concurrence coopérative
asyncio réalise de la concurrence dans un seul thread via un event loop. Les coroutines cèdent le contrôle volontairement avec await — pas de préemption, pas de race condition.
import asyncio
import aiohttp
async def fetch(session, url: str) -> dict:
# await cède le contrôle pendant l'I/O
async with session.get(url) as resp:
return await resp.json()
async def main() -> None:
urls = ["https://..."] * 10
async with aiohttp.ClientSession() as session:
# Lance toutes les coroutines "simultanément"
tasks = [fetch(session, url) for url in urls]
resultats = await asyncio.gather(*tasks)
return resultats
# Point d'entrée
asyncio.run(main())
asyncio est la solution la plus efficace pour des milliers de connexions réseau simultanées (serveurs, scrapers massifs). Pour 10–50 tâches I/O, ThreadPoolExecutor est souvent plus simple à mettre en œuvre.
| Concept | threading | asyncio |
|---|---|---|
| Modèle | Préemptif (OS) | Coopératif (event loop) |
| Threads | N threads réels | 1 thread |
| Race conditions | Possibles | Très rares |
| Scalabilité | Centaines | Milliers+ |
| Syntaxe | Standard | async/await |
| Libs requises | Standard | async-compatibles |
Pièges classiques
| Piège | Symptôme | Solution |
|---|---|---|
| Deadlock | Programme bloqué indéfiniment — deux threads attendent chacun le lock de l'autre | Toujours acquérir les locks dans le même ordre. Utiliser timeout. Préférer les structures thread-safe (Queue). |
| Race condition | Résultats incohérents, aléatoires, différents entre exécutions | Protéger toute variable partagée avec Lock. Préférer Queue pour la communication. |
| Thread oublié | Programme ne se termine pas (thread non-daemon bloqué) | Toujours join() les threads, ou les passer en daemon=True s'ils sont auxiliaires. |
| Exception silencieuse | Thread planté silencieusement, pas de stack trace visible | Capturer les exceptions dans run() et les stocker. Vérifier future.exception() avec Executor. |
| Variable partagée en closure | Tous les threads reçoivent la même valeur (la dernière) | Capturer la valeur par défaut : lambda i=i: ... ou utiliser args=(i,). |
| GIL ignoré | Threads CPU-bound plus lents qu'un seul thread (overhead) | Utiliser multiprocessing ou des bibliothèques qui libèrent le GIL (NumPy). |
# ✗ Tous les threads reçoivent i=4
threads = []
for i in range(5):
t = threading.Thread(target=lambda: print(i))
threads.append(t)
# ✓ Capturer i par valeur
for i in range(5):
t = threading.Thread(target=lambda n=i: print(n))
# ou : target=print, args=(i,)
# ✗ Deadlock : ordre différent d'acquisition
def thread_a():
with lock1: # acquiert lock1
with lock2: ... # attend lock2
def thread_b():
with lock2: # acquiert lock2
with lock1: ... # attend lock1 → DEADLOCK
# ✓ Même ordre partout
def thread_a():
with lock1:
with lock2: ...
def thread_b():
with lock1: # même ordre que thread_a
with lock2: ...
Cheat sheet
threading — Thread
| Thread(target, args) | Créer un thread |
| t.start() | Démarrer |
| t.join(timeout) | Attendre la fin |
| t.is_alive() | En cours ? |
| t.daemon = True | Thread daemon |
| current_thread() | Thread courant |
| enumerate() | Tous les threads actifs |
Synchronisation
| Lock() | Verrou exclusif |
| RLock() | Verrou réentrant |
| Semaphore(n) | N accès simultanés |
| Event() | Signal booléen |
| Condition() | Attente conditionnelle |
| with lock: | Acquire + release auto |
| event.wait() | Bloque jusqu'à set() |
Queue
| Queue(maxsize) | File thread-safe |
| q.put(item) | Ajouter (bloque si pleine) |
| q.get() | Retirer (bloque si vide) |
| q.task_done() | Marquer item traité |
| q.join() | Attendre tous les items |
| POISON_PILL = None | Signal d'arrêt |
concurrent.futures
| ThreadPoolExecutor(n) | Pool de threads |
| ProcessPoolExecutor() | Pool de processus (CPU) |
| pool.map(fn, iterable) | Map parallèle (résultats ordonnés) |
| pool.submit(fn, *args) | Soumettre → Future |
| as_completed(futures) | Itérer au fil des fins |
| future.result() | Résultat (ou relance exception) |