Réseaux & Web

Sockets
Python

Communication réseau bas niveau — TCP, UDP, protocoles applicatifs et asyncio.

Modèle réseau TCP/IP

Les sockets opèrent à la couche Transport du modèle TCP/IP. Comprendre les couches permet de savoir quel outil utiliser selon le besoin.

CoucheProtocolesEn Python
ApplicationHTTP, FTP, SSH, DNS, SMTPrequests, ftplib, paramiko…
TransportTCP, UDPsocket ← ici
InternetIP, ICMPadresses IP, socket raw
RéseauEthernet, Wi-Fihors portée Python std
TCP
Connexion établie (3-way handshake)
Livraison garantie et ordonnée
Contrôle de flux et congestion
Usage : HTTP, SSH, BDD, fichiers
UDP
Pas de connexion
Pas de garantie de livraison
Très faible latence
Usage : DNS, jeux, vidéo/voix

TCP — 3-way handshake

Client
Serveur

connect()
──SYN──▶
listen()
◀─SYN-ACK─
accept() bloqué…
connexion établie
──ACK──▶
accept() retourne

send(data)
──data──▶
recv(data)
recv(resp)
◀──resp──
send(resp)

close()
──FIN──▶
close()

Qu'est-ce qu'un socket ?

Un socket est un point de communication identifié par une adresse IP et un numéro de port. C'est le mécanisme de base que HTTP, FTP, SSH utilisent tous sous le capot.

Créer un socket — options
import socket

# socket(famille, type)
# Familles :
#   AF_INET  = IPv4
#   AF_INET6 = IPv6
#   AF_UNIX  = socket de domaine Unix (même machine)
# Types :
#   SOCK_STREAM = TCP (flux, fiable)
#   SOCK_DGRAM  = UDP (datagramme, non fiable)

# TCP/IPv4 (le plus courant)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# UDP/IPv4
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

# Obtenir l'IP de la machine locale
hostname = socket.gethostname()
ip_locale = socket.gethostbyname(hostname)

# Résoudre un nom de domaine
ip = socket.gethostbyname("google.com")
print(ip)  # 142.250.X.X

# Numéros de port standards
# 80=HTTP, 443=HTTPS, 22=SSH, 25=SMTP
# 1-1023 : ports réservés (root requis)
# 1024-65535 : ports libres
MéthodeRôleCôté
bind((host, port))Attacher à une adresseServeur
listen(backlog)Attendre des connexionsServeur
accept()Accepter une connexion → (socket, addr)Serveur
connect((host, port))Se connecter au serveurClient
send(bytes)Envoyer des donnéesLes deux
recv(bufsize)Recevoir des donnéesLes deux
sendall(bytes)Envoyer tout (boucle interne)Les deux
close()Fermer la connexionLes deux
settimeout(sec)Timeout sur les opérationsLes deux
⚠️

recv(n) peut retourner moins de n octets — jamais supposer que tout est arrivé d'un coup. Il faut boucler jusqu'à recevoir le nombre d'octets attendus.

Serveur TCP

🖥️ Serveur
bind + listen + accept
◀──connexion──▶
TCP socket
💻 Client
connect + send/recv
Serveur TCP — version minimale
import socket

HOST = "127.0.0.1"
PORT = 9000

# Créer et configurer le socket serveur
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
    # Réutiliser le port immédiatement après fermeture
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind((HOST, PORT))
    srv.listen(5)     # file d'attente de 5 connexions
    print(f"Serveur à l'écoute sur {HOST}:{PORT}")

    while True:
        # accept() bloque jusqu'à une connexion entrante
        conn, addr = srv.accept()
        print(f"Connexion de {addr}")

        with conn:
            while True:
                # Recevoir par morceaux de 1024 octets
                data = conn.recv(1024)
                if not data:
                    break   # client déconnecté
                print(f"Reçu : {data.decode()}")
                # Écho — renvoyer ce qu'on reçoit
                conn.sendall(data)
Recevoir exactement N octets
def recv_exact(sock: socket.socket, n: int) -> bytes:
    """Reçoit exactement n octets (boucle si nécessaire)."""
    data = b""
    while len(data) < n:
        morceau = sock.recv(n - len(data))
        if not morceau:
            raise ConnectionError("Connexion fermée prématurément")
        data += morceau
    return data

# Avec makefile() — interface fichier sur un socket
with conn.makefile("rb") as f:
    ligne = f.readline()   # lire jusqu'à \n
💡

SO_REUSEADDR est indispensable en développement — sans lui, relancer le serveur après un crash donne "Address already in use" pendant quelques secondes (état TIME_WAIT du TCP).

Client TCP

Client TCP — bases
import socket

HOST = "127.0.0.1"
PORT = 9000

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.settimeout(10)              # timeout de 10s
    sock.connect((HOST, PORT))
    print("Connecté au serveur")

    # Envoyer un message (toujours en bytes)
    message = "Bonjour serveur!"
    sock.sendall(message.encode("utf-8"))

    # Recevoir la réponse
    reponse = sock.recv(1024)
    print(f"Réponse : {reponse.decode('utf-8')}")

# Connexion automatiquement fermée avec with
Client TCP robuste avec gestion d'erreurs
import socket

def envoyer_message(host: str, port: int, msg: str) -> str:
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(5)
            s.connect((host, port))
            s.sendall(msg.encode("utf-8"))

            # Recevoir jusqu'à la fin
            reponse = b""
            while True:
                morceau = s.recv(4096)
                if not morceau:
                    break
                reponse += morceau
            return reponse.decode("utf-8")

    except socket.timeout:
        raise TimeoutError(f"Pas de réponse après 5s")
    except ConnectionRefusedError:
        raise ConnectionError(f"Serveur {host}:{port} injoignable")
    except OSError as e:
        raise ConnectionError(f"Erreur réseau : {e}")

Serveur multi-clients

Un thread par client
import socket
import threading

def gerer_client(conn: socket.socket, addr: tuple) -> None:
    print(f"[+] Client {addr} connecté")
    with conn:
        while True:
            try:
                data = conn.recv(1024)
                if not data:
                    break
                reponse = traiter(data)
                conn.sendall(reponse)
            except ConnectionResetError:
                break
    print(f"[-] Client {addr} déconnecté")

def traiter(data: bytes) -> bytes:
    # Traitement métier — écho ici
    return data.upper()

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind(("0.0.0.0", 9000))
    srv.listen()
    print("Serveur démarré")

    while True:
        conn, addr = srv.accept()
        # Un thread par client
        t = threading.Thread(
            target=gerer_client,
            args=(conn, addr),
            daemon=True
        )
        t.start()
Serveur select() — I/O multiplexing
import socket, select

# select() surveille plusieurs sockets à la fois
# sans threads — un seul thread gère N clients
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.bind(("0.0.0.0", 9000))
srv.listen()
srv.setblocking(False)

sockets_surveilles = [srv]

while True:
    # Retourne les sockets prêts à lire
    lisibles, _, _ = select.select(sockets_surveilles, [], [])

    for s in lisibles:
        if s is srv:
            # Nouvelle connexion
            conn, addr = srv.accept()
            conn.setblocking(False)
            sockets_surveilles.append(conn)
        else:
            # Données d'un client existant
            data = s.recv(1024)
            if data:
                s.sendall(data.upper())
            else:
                sockets_surveilles.remove(s)
                s.close()
💡

Thread par client — simple, mais limité à quelques centaines de clients (overhead mémoire). select() — un seul thread, scalable, mais plus complexe. asyncio — la meilleure approche pour des milliers de clients.

UDP — sans connexion

UDP envoie des datagrammes sans établir de connexion. Plus rapide que TCP, mais sans garantie de livraison ni d'ordre. Idéal pour DNS, jeux en réseau, streaming vidéo.

Serveur UDP
import socket

with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
    sock.bind(("0.0.0.0", 9001))
    print("Serveur UDP prêt")

    while True:
        # recvfrom retourne (données, adresse_expéditeur)
        data, addr = sock.recvfrom(1024)
        print(f"{addr}: {data.decode()}")

        # Répondre directement à l'expéditeur
        sock.sendto(b"ACK", addr)
Client UDP
import socket

SERVER = ("127.0.0.1", 9001)

with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
    sock.settimeout(2)

    # Pas de connect() — envoyer directement
    sock.sendto(b"Bonjour UDP", SERVER)

    try:
        resp, addr = sock.recvfrom(1024)
        print(f"Réponse : {resp.decode()}")
    except socket.timeout:
        print("Pas de réponse")

# UDP broadcast — envoyer à tout le sous-réseau
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    sock.sendto(b"découverte", ("255.255.255.255", 9001))
TCPUDP
ConnexionÉtablie (3-way handshake)Aucune
FiabilitéGarantie (retransmission)Aucune (fire and forget)
OrdreGarantiNon garanti
VitessePlus lent (overhead)Plus rapide
UsageHTTP, SSH, BDD, transfert fichiersDNS, jeux, VoIP, streaming
API Pythonconnect/accept/send/recvsendto/recvfrom

Protocole applicatif

Un protocole applicatif définit comment les messages sont structurés et délimités sur le socket. Sans délimiteur, impossible de savoir où finit un message et où commence le suivant.

Protocole length-prefix (4 octets + payload)
import socket
import struct
import json

def envoyer_message(sock: socket.socket, data: dict) -> None:
    """Envoie un dict JSON précédé de sa taille (4 octets)."""
    payload = json.dumps(data).encode("utf-8")
    # !I = big-endian unsigned int (4 octets)
    longueur = struct.pack("!I", len(payload))
    sock.sendall(longueur + payload)

def recevoir_message(sock: socket.socket) -> dict:
    """Reçoit un message length-prefix et le décode."""
    # Lire les 4 premiers octets = longueur
    header = recv_exact(sock, 4)
    longueur = struct.unpack("!I", header)[0]
    # Lire exactement longueur octets
    payload = recv_exact(sock, longueur)
    return json.loads(payload.decode("utf-8"))

def recv_exact(sock, n):
    data = b""
    while len(data) < n:
        m = sock.recv(n - len(data))
        if not m: raise ConnectionError()
        data += m
    return data
Protocole ligne (newline-delimited)
import socket, json

def envoyer_ligne(sock, data: dict) -> None:
    """JSON sur une ligne (sans newline dans les valeurs)."""
    ligne = json.dumps(data, separators=(",", ":")) + "\n"
    sock.sendall(ligne.encode("utf-8"))

def recevoir_ligne(sock) -> dict:
    """Lire jusqu'au \n."""
    buffer = b""
    while not buffer.endswith(b"\n"):
        morceau = sock.recv(4096)
        if not morceau:
            raise ConnectionError("Connexion fermée")
        buffer += morceau
    return json.loads(buffer.decode("utf-8"))

# Format utilisé par : JSON Lines, certains protocoles IM
ℹ️

2 approches courantes : délimiteur fixe (\n, \r\n, \0) — simple mais les données ne peuvent pas contenir le délimiteur. Length-prefix — 4 octets de taille suivis du payload — plus robuste, supporte les données binaires.

asyncio & sockets

Serveur asyncio — milliers de clients
import asyncio

async def gerer_client(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter
) -> None:
    addr = writer.get_extra_info("peername")
    print(f"[+] {addr} connecté")

    try:
        while True:
            data = await reader.read(1024)
            if not data:
                break
            message = data.decode().strip()
            print(f"{addr}: {message}")

            reponse = f"Echo: {message}\n"
            writer.write(reponse.encode())
            await writer.drain()  # vider le buffer
    finally:
        writer.close()
        await writer.wait_closed()
        print(f"[-] {addr} déconnecté")

async def main() -> None:
    server = await asyncio.start_server(
        gerer_client, "0.0.0.0", 9000
    )
    async with server:
        print("Serveur asyncio démarré")
        await server.serve_forever()

asyncio.run(main())
Client asyncio
import asyncio

async def client() -> None:
    reader, writer = await asyncio.open_connection(
        "127.0.0.1", 9000
    )

    for msg in ["Bonjour", "Python", "asyncio"]:
        writer.write(f"{msg}\n".encode())
        await writer.drain()

        reponse = await reader.readline()
        print(f"Serveur : {reponse.decode().strip()}")

    writer.close()
    await writer.wait_closed()

asyncio.run(client())
💡

asyncio.StreamReader/StreamWriter sont les abstractions haut niveau d'asyncio pour les sockets — ils gèrent la bufferisation, les timeouts et la fermeture propre.

SSL/TLS — chiffrement

Client SSL — connexion chiffrée
import socket
import ssl

hostname = "www.google.com"
port     = 443

# Contexte SSL avec vérification du certificat
context = ssl.create_default_context()

with socket.create_connection((hostname, port)) as sock:
    with context.wrap_socket(sock, server_hostname=hostname) as ssock:
        print(f"Protocole : {ssock.version()}")  # TLSv1.3
        print(f"Chiffrement : {ssock.cipher()}")

        # Requête HTTP/1.1 brute
        requete = (
            f"GET / HTTP/1.1\r\n"
            f"Host: {hostname}\r\n"
            f"Connection: close\r\n\r\n"
        )
        ssock.sendall(requete.encode())
        reponse = b""
        while chunk := ssock.recv(4096):
            reponse += chunk
        print(reponse[:200].decode(errors="replace"))
Serveur SSL avec certificat auto-signé
# Générer certificat auto-signé (développement)
# openssl req -x509 -newkey rsa:4096 \
#   -keyout key.pem -out cert.pem \
#   -days 365 -nodes -subj "/CN=localhost"

import socket, ssl

context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain("cert.pem", "key.pem")

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as srv:
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    srv.bind(("0.0.0.0", 9443))
    srv.listen()

    with context.wrap_socket(srv, server_side=True) as ssrv:
        conn, addr = ssrv.accept()
        with conn:
            data = conn.recv(1024)
            conn.sendall(data)  # écho chiffré
⚠️

Ne jamais utiliser ssl.PROTOCOL_TLSv1 ou TLS 1.1 — obsolètes et vulnérables. ssl.create_default_context() choisit automatiquement la version la plus récente supportée.

socketserver — serveurs prêts à l'emploi

socketserver.TCPServer
import socketserver

class MonHandler(socketserver.BaseRequestHandler):
    """Appelé pour chaque nouvelle connexion."""

    def handle(self) -> None:
        # self.request = socket connecté
        # self.client_address = (ip, port)
        data = self.request.recv(1024).strip()
        print(f"{self.client_address} : {data.decode()}")
        self.request.sendall(data.upper())

# TCPServer standard (séquentiel)
with socketserver.TCPServer(("0.0.0.0", 9000), MonHandler) as srv:
    srv.serve_forever()

# ThreadingTCPServer — un thread par client
with socketserver.ThreadingTCPServer(("0.0.0.0", 9000), MonHandler) as srv:
    srv.serve_forever()
Serveur HTTP simple (socketserver)
from http.server import HTTPServer, BaseHTTPRequestHandler

class MonServeurHTTP(BaseHTTPRequestHandler):
    def do_GET(self) -> None:
        corps = b"<h1>Bonjour!</h1>"
        self.send_response(200)
        self.send_header("Content-Type", "text/html")
        self.send_header("Content-Length", len(corps))
        self.end_headers()
        self.wfile.write(corps)

    def log_message(self, *args): pass  # désactiver les logs

with HTTPServer(("", 8080), MonServeurHTTP) as srv:
    print("http://localhost:8080")
    srv.serve_forever()

# Équivalent en ligne de commande :
# python -m http.server 8080

Debug & outils réseau

Outils en ligne de commande
# Tester un port TCP
nc -zv 127.0.0.1 9000        # netcat
telnet 127.0.0.1 9000

# Voir les ports en écoute
ss -tlnp                     # Linux
netstat -an | grep LISTEN    # macOS/Windows

# Capturer le trafic réseau
tcpdump -i lo port 9000      # Linux
wireshark                    # GUI multiplateforme

# Scanner des ports
nmap -p 8000-9000 127.0.0.1
Debug depuis Python
import socket

# Vérifier si un port est ouvert
def port_ouvert(host: str, port: int, timeout: float = 1) -> bool:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(timeout)
        try:
            s.connect((host, port))
            return True
        except (socket.timeout, ConnectionRefusedError):
            return False

print(port_ouvert("google.com", 443))  # True
print(port_ouvert("127.0.0.1", 9000))  # True si serveur tourne

# Obtenir son IP locale
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
    s.connect(("8.8.8.8", 80))         # pas de vrai trafic
    ip_locale = s.getsockname()[0]
    print(ip_locale)

Cheat sheet

Serveur TCP

socket(AF_INET, SOCK_STREAM)Créer socket TCP
setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)Réutiliser port
bind(("0.0.0.0", port))Écouter toutes interfaces
listen(backlog)File d'attente
conn, addr = accept()Accepter connexion
recv(n)Recevoir jusqu'à n octets
sendall(data)Envoyer tout

Client TCP

socket(AF_INET, SOCK_STREAM)Créer socket TCP
settimeout(sec)Timeout
connect((host, port))Se connecter
sendall(msg.encode())Envoyer bytes
recv(n).decode()Recevoir str
with socket(...) as s:Fermeture auto

UDP

socket(AF_INET, SOCK_DGRAM)Créer socket UDP
sendto(data, (host, port))Envoyer datagramme
data, addr = recvfrom(n)Recevoir + adresse source
Pas de connect/acceptSans connexion

asyncio

asyncio.start_server(cb, h, p)Serveur async
asyncio.open_connection(h, p)Client async
await reader.read(n)Lire
writer.write(data)Écrire
await writer.drain()Vider buffer
await reader.readline()Lire jusqu'à \n