Sockets
Python
Communication réseau bas niveau — TCP, UDP, protocoles applicatifs et asyncio.
Modèle réseau TCP/IP
Les sockets opèrent à la couche Transport du modèle TCP/IP. Comprendre les couches permet de savoir quel outil utiliser selon le besoin.
| Couche | Protocoles | En Python |
|---|---|---|
| Application | HTTP, FTP, SSH, DNS, SMTP | requests, ftplib, paramiko… |
| Transport | TCP, UDP | socket ← ici |
| Internet | IP, ICMP | adresses IP, socket raw |
| Réseau | Ethernet, Wi-Fi | hors portée Python std |
Livraison garantie et ordonnée
Contrôle de flux et congestion
Usage : HTTP, SSH, BDD, fichiers
Pas de garantie de livraison
Très faible latence
Usage : DNS, jeux, vidéo/voix
TCP — 3-way handshake
Qu'est-ce qu'un socket ?
Un socket est un point de communication identifié par une adresse IP et un numéro de port. C'est le mécanisme de base que HTTP, FTP, SSH utilisent tous sous le capot.
import socket
# socket(famille, type)
# Familles :
# AF_INET = IPv4
# AF_INET6 = IPv6
# AF_UNIX = socket de domaine Unix (même machine)
# Types :
# SOCK_STREAM = TCP (flux, fiable)
# SOCK_DGRAM = UDP (datagramme, non fiable)
# TCP/IPv4 (le plus courant)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# UDP/IPv4
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Obtenir l'IP de la machine locale
hostname = socket.gethostname()
ip_locale = socket.gethostbyname(hostname)
# Résoudre un nom de domaine
ip = socket.gethostbyname("google.com")
print(ip) # 142.250.X.X
# Numéros de port standards
# 80=HTTP, 443=HTTPS, 22=SSH, 25=SMTP
# 1-1023 : ports réservés (root requis)
# 1024-65535 : ports libres
| Méthode | Rôle | Côté |
|---|---|---|
bind((host, port)) | Attacher à une adresse | Serveur |
listen(backlog) | Attendre des connexions | Serveur |
accept() | Accepter une connexion → (socket, addr) | Serveur |
connect((host, port)) | Se connecter au serveur | Client |
send(bytes) | Envoyer des données | Les deux |
recv(bufsize) | Recevoir des données | Les deux |
sendall(bytes) | Envoyer tout (boucle interne) | Les deux |
close() | Fermer la connexion | Les deux |
settimeout(sec) | Timeout sur les opérations | Les deux |
recv(n) peut retourner moins de n octets — jamais supposer que tout est arrivé d'un coup. Il faut boucler jusqu'à recevoir le nombre d'octets attendus.
Serveur TCP
bind + listen + accept
connect + send/recv
import socket
HOST = "127.0.0.1"
PORT = 9000
# Créer et configurer le socket serveur
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
# Réutiliser le port immédiatement après fermeture
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.bind((HOST, PORT))
srv.listen(5) # file d'attente de 5 connexions
print(f"Serveur à l'écoute sur {HOST}:{PORT}")
while True:
# accept() bloque jusqu'à une connexion entrante
conn, addr = srv.accept()
print(f"Connexion de {addr}")
with conn:
while True:
# Recevoir par morceaux de 1024 octets
data = conn.recv(1024)
if not data:
break # client déconnecté
print(f"Reçu : {data.decode()}")
# Écho — renvoyer ce qu'on reçoit
conn.sendall(data)
def recv_exact(sock: socket.socket, n: int) -> bytes:
"""Reçoit exactement n octets (boucle si nécessaire)."""
data = b""
while len(data) < n:
morceau = sock.recv(n - len(data))
if not morceau:
raise ConnectionError("Connexion fermée prématurément")
data += morceau
return data
# Avec makefile() — interface fichier sur un socket
with conn.makefile("rb") as f:
ligne = f.readline() # lire jusqu'à \n
SO_REUSEADDR est indispensable en développement — sans lui, relancer le serveur après un crash donne "Address already in use" pendant quelques secondes (état TIME_WAIT du TCP).
Client TCP
import socket
HOST = "127.0.0.1"
PORT = 9000
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(10) # timeout de 10s
sock.connect((HOST, PORT))
print("Connecté au serveur")
# Envoyer un message (toujours en bytes)
message = "Bonjour serveur!"
sock.sendall(message.encode("utf-8"))
# Recevoir la réponse
reponse = sock.recv(1024)
print(f"Réponse : {reponse.decode('utf-8')}")
# Connexion automatiquement fermée avec with
import socket
def envoyer_message(host: str, port: int, msg: str) -> str:
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(5)
s.connect((host, port))
s.sendall(msg.encode("utf-8"))
# Recevoir jusqu'à la fin
reponse = b""
while True:
morceau = s.recv(4096)
if not morceau:
break
reponse += morceau
return reponse.decode("utf-8")
except socket.timeout:
raise TimeoutError(f"Pas de réponse après 5s")
except ConnectionRefusedError:
raise ConnectionError(f"Serveur {host}:{port} injoignable")
except OSError as e:
raise ConnectionError(f"Erreur réseau : {e}")
Serveur multi-clients
import socket
import threading
def gerer_client(conn: socket.socket, addr: tuple) -> None:
print(f"[+] Client {addr} connecté")
with conn:
while True:
try:
data = conn.recv(1024)
if not data:
break
reponse = traiter(data)
conn.sendall(reponse)
except ConnectionResetError:
break
print(f"[-] Client {addr} déconnecté")
def traiter(data: bytes) -> bytes:
# Traitement métier — écho ici
return data.upper()
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.bind(("0.0.0.0", 9000))
srv.listen()
print("Serveur démarré")
while True:
conn, addr = srv.accept()
# Un thread par client
t = threading.Thread(
target=gerer_client,
args=(conn, addr),
daemon=True
)
t.start()
import socket, select
# select() surveille plusieurs sockets à la fois
# sans threads — un seul thread gère N clients
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.bind(("0.0.0.0", 9000))
srv.listen()
srv.setblocking(False)
sockets_surveilles = [srv]
while True:
# Retourne les sockets prêts à lire
lisibles, _, _ = select.select(sockets_surveilles, [], [])
for s in lisibles:
if s is srv:
# Nouvelle connexion
conn, addr = srv.accept()
conn.setblocking(False)
sockets_surveilles.append(conn)
else:
# Données d'un client existant
data = s.recv(1024)
if data:
s.sendall(data.upper())
else:
sockets_surveilles.remove(s)
s.close()
Thread par client — simple, mais limité à quelques centaines de clients (overhead mémoire). select() — un seul thread, scalable, mais plus complexe. asyncio — la meilleure approche pour des milliers de clients.
UDP — sans connexion
UDP envoie des datagrammes sans établir de connexion. Plus rapide que TCP, mais sans garantie de livraison ni d'ordre. Idéal pour DNS, jeux en réseau, streaming vidéo.
import socket
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.bind(("0.0.0.0", 9001))
print("Serveur UDP prêt")
while True:
# recvfrom retourne (données, adresse_expéditeur)
data, addr = sock.recvfrom(1024)
print(f"{addr}: {data.decode()}")
# Répondre directement à l'expéditeur
sock.sendto(b"ACK", addr)
import socket
SERVER = ("127.0.0.1", 9001)
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.settimeout(2)
# Pas de connect() — envoyer directement
sock.sendto(b"Bonjour UDP", SERVER)
try:
resp, addr = sock.recvfrom(1024)
print(f"Réponse : {resp.decode()}")
except socket.timeout:
print("Pas de réponse")
# UDP broadcast — envoyer à tout le sous-réseau
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.sendto(b"découverte", ("255.255.255.255", 9001))
| TCP | UDP | |
|---|---|---|
| Connexion | Établie (3-way handshake) | Aucune |
| Fiabilité | Garantie (retransmission) | Aucune (fire and forget) |
| Ordre | Garanti | Non garanti |
| Vitesse | Plus lent (overhead) | Plus rapide |
| Usage | HTTP, SSH, BDD, transfert fichiers | DNS, jeux, VoIP, streaming |
| API Python | connect/accept/send/recv | sendto/recvfrom |
Protocole applicatif
Un protocole applicatif définit comment les messages sont structurés et délimités sur le socket. Sans délimiteur, impossible de savoir où finit un message et où commence le suivant.
import socket
import struct
import json
def envoyer_message(sock: socket.socket, data: dict) -> None:
"""Envoie un dict JSON précédé de sa taille (4 octets)."""
payload = json.dumps(data).encode("utf-8")
# !I = big-endian unsigned int (4 octets)
longueur = struct.pack("!I", len(payload))
sock.sendall(longueur + payload)
def recevoir_message(sock: socket.socket) -> dict:
"""Reçoit un message length-prefix et le décode."""
# Lire les 4 premiers octets = longueur
header = recv_exact(sock, 4)
longueur = struct.unpack("!I", header)[0]
# Lire exactement longueur octets
payload = recv_exact(sock, longueur)
return json.loads(payload.decode("utf-8"))
def recv_exact(sock, n):
data = b""
while len(data) < n:
m = sock.recv(n - len(data))
if not m: raise ConnectionError()
data += m
return data
import socket, json
def envoyer_ligne(sock, data: dict) -> None:
"""JSON sur une ligne (sans newline dans les valeurs)."""
ligne = json.dumps(data, separators=(",", ":")) + "\n"
sock.sendall(ligne.encode("utf-8"))
def recevoir_ligne(sock) -> dict:
"""Lire jusqu'au \n."""
buffer = b""
while not buffer.endswith(b"\n"):
morceau = sock.recv(4096)
if not morceau:
raise ConnectionError("Connexion fermée")
buffer += morceau
return json.loads(buffer.decode("utf-8"))
# Format utilisé par : JSON Lines, certains protocoles IM
2 approches courantes : délimiteur fixe (\n, \r\n, \0) — simple mais les données ne peuvent pas contenir le délimiteur. Length-prefix — 4 octets de taille suivis du payload — plus robuste, supporte les données binaires.
asyncio & sockets
import asyncio
async def gerer_client(
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter
) -> None:
addr = writer.get_extra_info("peername")
print(f"[+] {addr} connecté")
try:
while True:
data = await reader.read(1024)
if not data:
break
message = data.decode().strip()
print(f"{addr}: {message}")
reponse = f"Echo: {message}\n"
writer.write(reponse.encode())
await writer.drain() # vider le buffer
finally:
writer.close()
await writer.wait_closed()
print(f"[-] {addr} déconnecté")
async def main() -> None:
server = await asyncio.start_server(
gerer_client, "0.0.0.0", 9000
)
async with server:
print("Serveur asyncio démarré")
await server.serve_forever()
asyncio.run(main())
import asyncio
async def client() -> None:
reader, writer = await asyncio.open_connection(
"127.0.0.1", 9000
)
for msg in ["Bonjour", "Python", "asyncio"]:
writer.write(f"{msg}\n".encode())
await writer.drain()
reponse = await reader.readline()
print(f"Serveur : {reponse.decode().strip()}")
writer.close()
await writer.wait_closed()
asyncio.run(client())
asyncio.StreamReader/StreamWriter sont les abstractions haut niveau d'asyncio pour les sockets — ils gèrent la bufferisation, les timeouts et la fermeture propre.
SSL/TLS — chiffrement
import socket
import ssl
hostname = "www.google.com"
port = 443
# Contexte SSL avec vérification du certificat
context = ssl.create_default_context()
with socket.create_connection((hostname, port)) as sock:
with context.wrap_socket(sock, server_hostname=hostname) as ssock:
print(f"Protocole : {ssock.version()}") # TLSv1.3
print(f"Chiffrement : {ssock.cipher()}")
# Requête HTTP/1.1 brute
requete = (
f"GET / HTTP/1.1\r\n"
f"Host: {hostname}\r\n"
f"Connection: close\r\n\r\n"
)
ssock.sendall(requete.encode())
reponse = b""
while chunk := ssock.recv(4096):
reponse += chunk
print(reponse[:200].decode(errors="replace"))
# Générer certificat auto-signé (développement)
# openssl req -x509 -newkey rsa:4096 \
# -keyout key.pem -out cert.pem \
# -days 365 -nodes -subj "/CN=localhost"
import socket, ssl
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain("cert.pem", "key.pem")
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.bind(("0.0.0.0", 9443))
srv.listen()
with context.wrap_socket(srv, server_side=True) as ssrv:
conn, addr = ssrv.accept()
with conn:
data = conn.recv(1024)
conn.sendall(data) # écho chiffré
Ne jamais utiliser ssl.PROTOCOL_TLSv1 ou TLS 1.1 — obsolètes et vulnérables. ssl.create_default_context() choisit automatiquement la version la plus récente supportée.
socketserver — serveurs prêts à l'emploi
import socketserver
class MonHandler(socketserver.BaseRequestHandler):
"""Appelé pour chaque nouvelle connexion."""
def handle(self) -> None:
# self.request = socket connecté
# self.client_address = (ip, port)
data = self.request.recv(1024).strip()
print(f"{self.client_address} : {data.decode()}")
self.request.sendall(data.upper())
# TCPServer standard (séquentiel)
with socketserver.TCPServer(("0.0.0.0", 9000), MonHandler) as srv:
srv.serve_forever()
# ThreadingTCPServer — un thread par client
with socketserver.ThreadingTCPServer(("0.0.0.0", 9000), MonHandler) as srv:
srv.serve_forever()
from http.server import HTTPServer, BaseHTTPRequestHandler
class MonServeurHTTP(BaseHTTPRequestHandler):
def do_GET(self) -> None:
corps = b"<h1>Bonjour!</h1>"
self.send_response(200)
self.send_header("Content-Type", "text/html")
self.send_header("Content-Length", len(corps))
self.end_headers()
self.wfile.write(corps)
def log_message(self, *args): pass # désactiver les logs
with HTTPServer(("", 8080), MonServeurHTTP) as srv:
print("http://localhost:8080")
srv.serve_forever()
# Équivalent en ligne de commande :
# python -m http.server 8080
Debug & outils réseau
# Tester un port TCP
nc -zv 127.0.0.1 9000 # netcat
telnet 127.0.0.1 9000
# Voir les ports en écoute
ss -tlnp # Linux
netstat -an | grep LISTEN # macOS/Windows
# Capturer le trafic réseau
tcpdump -i lo port 9000 # Linux
wireshark # GUI multiplateforme
# Scanner des ports
nmap -p 8000-9000 127.0.0.1
import socket
# Vérifier si un port est ouvert
def port_ouvert(host: str, port: int, timeout: float = 1) -> bool:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
try:
s.connect((host, port))
return True
except (socket.timeout, ConnectionRefusedError):
return False
print(port_ouvert("google.com", 443)) # True
print(port_ouvert("127.0.0.1", 9000)) # True si serveur tourne
# Obtenir son IP locale
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(("8.8.8.8", 80)) # pas de vrai trafic
ip_locale = s.getsockname()[0]
print(ip_locale)
Cheat sheet
Serveur TCP
| socket(AF_INET, SOCK_STREAM) | Créer socket TCP |
| setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) | Réutiliser port |
| bind(("0.0.0.0", port)) | Écouter toutes interfaces |
| listen(backlog) | File d'attente |
| conn, addr = accept() | Accepter connexion |
| recv(n) | Recevoir jusqu'à n octets |
| sendall(data) | Envoyer tout |
Client TCP
| socket(AF_INET, SOCK_STREAM) | Créer socket TCP |
| settimeout(sec) | Timeout |
| connect((host, port)) | Se connecter |
| sendall(msg.encode()) | Envoyer bytes |
| recv(n).decode() | Recevoir str |
| with socket(...) as s: | Fermeture auto |
UDP
| socket(AF_INET, SOCK_DGRAM) | Créer socket UDP |
| sendto(data, (host, port)) | Envoyer datagramme |
| data, addr = recvfrom(n) | Recevoir + adresse source |
| Pas de connect/accept | Sans connexion |
asyncio
| asyncio.start_server(cb, h, p) | Serveur async |
| asyncio.open_connection(h, p) | Client async |
| await reader.read(n) | Lire |
| writer.write(data) | Écrire |
| await writer.drain() | Vider buffer |
| await reader.readline() | Lire jusqu'à \n |