Introduction
Lecture de deux sockets :
Socket s1 = new Socket(host, port);
Socket s2 = new Socket(host2, port2);
InputStream in1 = s1.getInputStream();
InputStream in2 = s2.getInputStream();
OutputStream out = s2.getOutputStream();
int a = in1.read();
int b = in2.read();
int c = a+b;
out.write(c);
Exemple de l’architecture client-serveur
Dans une architecture client/serveur, deux clients qui veulent communiquer ensemble, doivent passer par un serveur.
Exemple de l’architecture client-serveur
String message = aliceIn.readLine();
bobOut.println(message);
message = bobIn.readLine();
aliceOut.println(message);
Serveur multithreadé
Calculs intensifs
En plus de la nécessité d’utiliser les threads pour pouvoir faire plusieurs choses en même temps. Il est parfois utile de pouvoir paralléliser un calcul pour utiliser plus efficacement les mutliple coeurs du CPU (pour effectuer un calcul plus vite).
Dans ce context le problème est parfois plus compliqué car des threads travaillant sur des données communes doivent faire attention lorsqu’ils accèdent de manière concurrenteau même données.
Les processus et les threads
Différences entre processus et threads
Un processus est un programme qui s’exécute. Il possède son propre espace mémoire.
Un thread est un ensemble d’instructions qui s’exécutent (un sous-ensemble des instructions du processus en cours). Il utilise le même espace mémoire et les mêmes resources que le processus dans lequel il s’exécute. Cependant, il peut s’exécuter de manière indépendantes de (parfois en même temps que) son processus (possiblement sur un coeur différent du CPU ).
Le processus principal
La class Runtime
définie un objet qui représente un environement d’exécution. On obtient l’environement de l’application courante à l’aide de la méthode statique Runtime.getRuntime()
Runtime monApplication = Runtime.getRuntime();
Création de processus
A partir d’un objet Runtime
on peut créer des processus (objects de type Process
) avec la méthode exec
. Ou bien en utilisant la classe ProcessBuilder
.
Runtime monApplication = Runtime.getRuntime();
Process p1 = monApplication.exec("cat index.html");
Process p2 = monApplication.exec("python", "script.py");
ProcessBuilder pb = new ProcessBuilder("espeak", "hello, how are you?");
Process p3 = pb.start();
pb = new ProcessBuilder("say", "Bonjour tout le monde!");
Process p4 = pb.start();
// Waits for the command to finish.
p1.waitFor();
p2.waitFor();
p3.waitFor();
p4.waitFor();
Communication avec un processus
Les méthodes getOutputStream()
, getInputStream()
, et getErrorStream()
permettent de communiquer avec le processus.
Process p2 = monApplication.exec("python script.py");
BufferedReader in = new BufferedReader(new InputStreamReader(p.getErrorStream()));
String line;
while ((line = in.readLine()) != null) {
System.out.println(line);
}
/usr/local/bin/python: can't open file 'script.py': [Errno 2] No such file or directory
Avantage des Threads
Dans un processus Java, on peut créer plusieurs fils d’exécution (threads) internes
- un seul processus (au sens système d’expl)
- possibilités de contrôle plus fin (priorité, interruption…)
- c’est la JVM qui assure l’ordonnancement (concurrence)
- espace mémoire commun entre les différents threads
Créer un Thread
Java propose la classe Thread
. Lorsqu’un objet Thread
est démarré avec la méthode start()
, le code contenu dans la méthode run()
est éxécuté de manière indépendante.
Il y a deux possibilités pour créer son propre thread :
- On peut créer une sous-classe de
Thread
et redéfinir la méthoderun()
- On peut créer une classe qui implémente l’interface fonctionelle
Runnable
et créer unThread
en donnant une instance de notre classe en paramètre du constructeur
Créer une classe qui implémente Runnable
class TacheDeFond implements Runnable {
@Override
public void run() {
int i = 40;
while(i-->0) {
System.out.println(i);
}
}
}
Runnable t = new TacheDeFond();
new Thread(t).start();
System.out.println("Le thread a été lancé");
Runnable t = new TacheDeFond();
new Thread(t).start();
new Thread(t).start();
System.out.println("Les threads ont été lancés");
Créer une classe anonyme qui implémente Runnable
Runnable t = new Runnable() {
@Override
public void run() {
int i = 40;
while(i-->0) {
System.out.println(i);
}
}
}; // la classe est définie en même temps que son instanciation
new Thread(t).start();
new Thread(t).start();
System.out.println("Les threads ont été lancés");
Créer un lambda Runnable
(java 8)
Runnable t = () -> {
int i = 40;
while(i-->0) {
System.out.println(i);
}
};
new Thread(t).start();
new Thread(t).start();
System.out.println("Les threads ont été lancés");
Créer une classe qui implémente Runnable
class TacheDeFond implements Runnable {
private int iter;
TacheDeFond(int iter) {
this.iter = iter;
}
@Override
public void run() {
while(iter-->0) {
System.out.println(iter);
}
}
}
ici les deux threads utilisent le même objet Runnable
donc le même attribut iter
Runnable t = new TacheDeFond(50);
new Thread(t).start();
new Thread(t).start();
System.out.println("Les threads ont été lancés");
Créer une classe qui implémente Runnable
class TacheDeFond implements Runnable {
private int iter;
TacheDeFond(int iter) {
this.iter = iter;
}
@Override
public void run() {
while(iter-->0) {
System.out.println(iter);
}
}
}
Runnable t = new TacheDeFond(50);
new Thread(t).start();
t = new TacheDeFond(50);
new Thread(t).start();
System.out.println("Les threads ont été lancés");
Gestion des Threads
Propriété d’un thread
Chaque thread possède :
- un nom :
[get/set]Name()
- une priorité :
[get/set]Priority()
- un statut daemon :
[is/set]Daemon()
La Machine Virtuelle Java continue de tourner jusqu’à ce que:
- soit la méthode
exit()
de la classeRuntime
soit appelée - soit tous les threads non marqués “daemon” soient terminés. On peut savoir si un thread est terminé via la méthode
isAlive()
Le thread courant
On peut récuperer l’instance du thead courant à l’aide de la méthode statique Thread.currentThread()
.
new Thread(() -> {
int i = 20;
while(i-->0) {
System.out.println(Thread.currentThread().getName()+" : "+i);
}
}, "mon super thread").start();
System.out.println(Thread.currentThread().getName()+" : le thread a été lancé");
Gestion d’un thread
Un thread est lancé avec la méthode start()
. Le code de la méthode run()
est alors exécuté. Un thread ne peut pas être intérrompu lorsqu’il effectue des calculs.
Thread t1 = new Thread(() -> {
int i = 10;
while(i-->0) {
System.out.println("t1: "+i);
longComputation();
}
});
t1.start();
Thread.sleep(5000);
System.out.println("trying to stop the thread");
t1.interrupt();
ici, le thread n’est pas interrompu au milieu.
Interrumption d’un tread
Un thread peut vérifier si un autre thread tente de l’interrompre à l’aide des méthodes interrupted()
ou isInterruped()
.
Thread t2 = new Thread(() -> {
int i = 10;
while(i-->0 && !Thread.currentThread().interrupted()) {
System.out.println("t2: "+i);
longComputation();
}
});
t2.start();
Thread.sleep(5000);
t2.interrupt();
System.out.println("trying to stop the thread");
Différence entre interrupted()
et isInterruped()
? Le status “interrompu” d’un thread est réinitiaisé à false
après un appelle à interrupted()
.
Interruption d’un thread
Un thread peut aussi être interrompu lorsqu’il est en attente de quelque chose. Par exemple avec la méthode sleep(long milliseconds)
(elle lance une exception InterruptedException
).
Thread t2 = new Thread(() -> {
int i = 10;
try {
while(i-->0) {
System.out.println("t2: "+i);
longComputation();
Thread.sleep(1000);
}
} catch(Exception e) { System.out.println(e); }
});
t2.start();
Thread.sleep(5000);
t2.interrupt();
System.out.println("trying to stop the thread");
Attendre la fin d’un autre thread
Le méthode bloquante join()
permet d’attendre un thread. Comme elle “attend”, on peut être interrompu dans cette attente (elle lance une exception InterruptedException
)
Thread t1 = new Thread(() -> {
longComputation();
longComputation();
});
Thread t2 = new Thread(() -> {
try {
longComputation();
t1.join();
longComputation();
} catch(Exception e) { }
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Les threads ont terminé");
Execution des Threads
Dans les exemples précédents, on a vu comment créer des threads “à la main” à partir d’un code qu’on voulait exécuter (le Runnable). C’est utile lorsque l’on a une application qui doit absolument faire certaines choses sur des threads différents.
Cependant, on veut souvent uniquement exécuter des tâches sans se soucier réellement de la manière dont elle sont exécutées. Dans ce cas on va utiliser l’outil Executor
.
Execution des Threads
Un Executor
doit être utilisé lorsque l’on veut exécuter des tâches (potentiellement gourmandes), de manière efficace (réparties plusieurs coeurs du CPU), de manière asynchrone (être averti lorsqu’elles sont terminées).
Ex: charger des textures dans un jeu vidéo, appliquer des filtres dans un logiciel de retouche photo, recevoir des données d’un client et y repondre.
Execution des Threads
Avantages d’un Executor :
- la manière dont les tâches s’exécutent est abstraite
- la JVM peut optimiser l’utilisation des threads servant à exécuter ces tâches
- on ne crée pas un Thread par tâche (la création d’un Thread est couteuse)
Executor
et ExecutorService
Executor
est une interface définissant la capacité d’un objet à exécuter des tâches Runnable
. L’interface ExecutorService
définit en plus la capacité à exécuter des Callable
et ajoute des méthodes pour gérer l’avancement des tâches.
On peut créer un objet de type ThreadPoolExecutor
(qui implémente ExecutorService
) avec la méthode statique Executors.newFixedThreadPool(int nThreads)
. L’exécution des tâches se fait alors sur un ensemble fini de threads.
Utilisation de Executor
ExecutorService pool = Executors.newFixedThreadPool(2);
pool.submit(fonctionCompliquee1);
pool.submit(fonctionCompliquee2);
pool.submit(fonctionCompliquee3);
try {
System.out.println("attempt to shutdown executor");
pool.shutdown();
pool.awaitTermination(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
System.err.println("tasks interrupted");
}
fonctions Callable
lorsque les fonctions que l’on souhaite exécutées retournent une valeur alors ce sont des Callable<V>
(interface fonctionnelle contenant une seule méthode V call()
)
ExecutorService pool = Executors.newFixedThreadPool(2);
pool.submit(() -> { return racineCarrePrecise(2,10000); } );
pool.submit(() -> { return racineCarrePrecise(3,100000); } );
Mais alors comment récupérer la valeur de retour des fonctions ?
ExecutorService pool = Executors.newFixedThreadPool(2);
BigDecimal r2 = pool.submit(() -> racineCarrePrecise(2,10000););
BigDecimal r3 = pool.submit(() -> racineCarrePrecise(3,100000););
System.out.println(r2);
System.out.println(r3);
Problème ? la fonction submit
retourne tout de suite mais la tâche à effectuer prend du temps et le résultat sera connu dans le futur
la classe du Turfu
ExecutorService pool = Executors.newFixedThreadPool(2);
Future<BigDecimal> r2 = pool.submit(()->racineCarrePrecise(2,10000));
Future<BigDecimal> r3 = pool.submit(()->racineCarrePrecise(3,100000));
System.out.println(r2.get());
if(r3.isDone())
System.out.println(r3.get());
else
r3.cancel(true);
Un objet Future
représent un objet qui est le résultat d’une fonction asynchrone.
V get()
: attend que le résultat soit disponnible, puis le retourneisDone()
: retournetrue
si le résultat est disponniblecancel(boolean interrupt)
: annule le calcul (force l’interruption siinterrupt
esttrue
)
Interface ScheduledExecutorService
Ajoute des notions temporelles (de planifications). doc
Classes et interfaces autour de Future<T>
Accès Concurrents et Synchronisation
Problèmes liés au processeur
- certaines variables peuvent être stockées dans des registres pour éviter les aller-retour à la mémoire
- les instructions sont ré-ordonnancées à l’intérieur d’un thread pour de meilleures performances
- Les long et double sont chargés en 2 fois sur une machine 32bit
public class A implements Runnable {
public boolean start; // false
public int value;
public void run() {
while(!this.start) ; // attente active
System.out.println(value); // jamais atteint
}
}
public void doSomething(A a) {
a.value = 3;
a.start = true;
}
start
n’est pas mise à jour dans la mémoire centrale
public class A implements Runnable {
public boolean start; // false
public int value;
public void run() {
while(!this.start) ; // attente active
System.out.println(value); // affiche 0
}
}
public void doSomething(A a) {
a.value = 3;
a.start = true;
}
a.start = true;
a.value = 3;
Le mot clé volatile
Il est utilisé comme un modificateur de champ. Il empêche les problèmes liés à l’optimisation.
- Assure la cohérence entre la mémoire de travail et la mémoire principale (pas de cache possible). La mise à jour est forcée à chaque lecture/écriture pour prendre en compte les dernières modifications
- Impose une barrière mémoire. Interdit le réordonnancement entre les variables volatiles et les autres
- Assure l’atomicité de la lecture et de l’écriture des variables de type double et long
Accès Concurrents
Exécution d’opérations en parallèle => problèmes lors de l’accès à une resource. Même si le CPU n’a qu’un seul coeur!!
Exemple d’accès concurrents
class Account{
private int balance;
public Account(int b){
balance=b;
}
public boolean draw(int x) {
if(balance < x) return false;
balance -= x;
return true;
}
public int getBalance() { return balance; }
}
Account a = new Account(30);
Runnable r = () -> { while(a.draw(1)); };
Thread t1 = new Thread(r); t1.start();
Thread t2 = new Thread(r); t2.start();
t1.join();
System.out.println(a.getBalance());
Problème
Si balance
vaut initialement 1, cette exécution permet malgré tout de retirer 2, et d’obtenir une balance négative.
Exclusion Mutuelle
Même si chaque instruction de base est atomique, impossible d’assurer qu’un thread ne « perdra » pas le processeur entre deux instructions.
On ne peut « que » exclure mutuellement plusieurs threads, grâce à la notion de moniteur
Les moniteurs
Chaque Object
est un moniteur (ou moniteur verrou). Ce moniteur sert à la synchronisation des threads, l’accès exlusif à l’état d’un objet.
Un thread peut acquiérir un verrou lorsqu’il veut un accès exclusif à une donnée, puis le libérer lorsqu’il a terminé. Pendant ce temps, on dit qu’il possède ce verrou.
Lorsqu’un thread possède un vérrou, aucun autre thread ne peut l’acquiérir. Un autre thread qui essaie de l’acquérir attend que le verrou se libère.
L’action de libèrer un verrou se produit strictement avant toute autre acquisition.
Acquisition d’un verrou
L’acquisition du verrou d’un moniteur se fait avec le mot clé synchronized
synchronized (monitor) {
// bloc protégé par monitor
}
class Account{
private final Object lock = new Object();
private int balance;
public Account(int b){
balance=b;
}
public boolean draw(int x) {
synchronized(lock) {
if(balance < x) return false;
balance -= x;
return true;
}
}
public int getBalance() { synchronized(lock) { return balance; } }
}
Exécution possible
Méthode synchronized
On peut utiliser l’objet courant this
comme moniteur
class Account{
private volatile int balance;
public Account(int b){
balance=b;
}
public synchronized boolean draw(int x) {
if(balance < x) return false;
if(x < 0) return false;
balance -= x;
return true;
}
public synchronized boolean add(int x) {
if(x < 0) return false;
balance += x;
return true;
}
public int getBalance() { return balance; }
}
Synchronized this
Cependant il est préférable d’utiliser un moniteur différent de this
.
this
est peut-etre déjà utilisé comme moniteur par une autre partie du code- cela permet de définir la zone d’exclusion mutuelle de manière plus précise
- cela permet de définir des moniteurs différents pour des resources différentes
Un moniteur par resource
class Account{
private final Object balanceLock = new Object();
private final Object nameLock = new Object();
private int balance;
private String name;
private List<String> previousNames =
new ArrayList<String>();
public Account(int b){
balance=b;
}
public boolean draw(int x) {
synchronized(balanceLock) {
if(balance < x) return false;
balance -= x;
return true;
}
}
public void setName(String name) {
synchronized(nameLock) {
previousNames.add(name);
this.name = name;
}
}
}
Les moniteurs sont réentrants
Si un thread détient un moniteur m
alors il peut exécuter d’autres section de code nécessitant l’acquisition de m
class Account{
private final Object lock = new Object();
private int balance;
public Account(int b){
balance=b;
}
public boolean draw(int x) {
synchronized(lock) { // acquiert lock
if(getBalance() < x) // appelle getBalance()
return false;
balance -= x;
return true;
}
}
public int getBalance() {
synchronized(lock) { // lock déjà acquit => OK
return balance;
}
}
}
Attention à l’inter-blocage
class Account{
private final Object lock = new Object();
private int balance;
public Account(int b){
balance=b;
}
public boolean draw(int x) {
synchronized(lock) {
if(balance < x) return false;
balance -= x;
return true;
}
}
public boolean transfertTo(Account account, int amount) {
synchronized(lock) {
synchronized(account.lock) {
if(!draw(amount)) return false;
account.add(amount);
return true;
}
}
}
}
Attention à l’inter-blocage
Account a = new Account(100);
Account b = new Account(100);
Thread t1 = new Thread(() -> {
a.transfertTo(b, 50);
});
t1.start();
Thread t2 = new Thread(() -> {
b.transfertTo(a, 50);
});
t1.join(); // ne termine jamais
t2.join();
System.out.println(a.getBalance());
Synchronisation et mémoire
- Le relâchement d’un moniteur (release) force l’écriture dans la mémoire principale de tout ce qui a été modifié dans le cache (registres) avant ou pendant le bloc synchronisé
- L’acquisition d’un moniteur (acquire) invalide le cache (registres) et force la relecture depuis la mémoire principale
La notion d’atomicité tend à garantir que l’état de la mémoire sera cohérent
La notion d’exclusion mutuelle (synchronized) ne fait qu’assurer que du code n’aura pas accès à un état de la mémoire incohérent
Synchronisation et notification
On a vu les méthodes sleep
, resp. join
, qui permettent d’attendre un temps donné, resp. la terminaison d’un thread.
Il est aussi possible pour un thread d’attendre une notification avec la méthode wait
.
wait/notify
Un thread A
peut appelé la méthode o.wait()
sur un objet quelconque o
. Pour cela le thread doit détenir le verrou du moniteur o
.
Après l’appelle de wait
:
- le thread
A
libère le verrou deo
- le thread attend
- Lorsque un autre thread envoie une notification en utilisant
o
avec la méthodeo.notify()
ouo.notifyAll()
, le threadA
est révéillé et tente à nouveau d’acquiérir le verrou.
wait/notify
Pour pouvoir appeler notify
ou notifyAll
sur un objet o
, un thread doit détenir le verrou de o
.
Lorsqu’un thread B
appelle de o.notify()
: un seul thread en attente sur l’objet o
(chosi aléatoirement) est réveillé.
Lorsqu’un thread B
appelle de o.notifyAll()
: tous les threads en attente sur l’objet o
sont réveillés. Ils sont alors en compétition pour acquérir le verrou de o
.
Exemple typique
Toujours faire les wait()
dans une boucle vérifiant la condition que l’on souhaite. Après le réveil et la ré-acquisition du moniteur, cela assure que la condition requise est toujours valide! En particulier, on sait qu’elle n’a pas été consommée par un tiers.
// code des processus légers
// intéressés par les événements
synchronized (o) {
while (ok==0) {
o.wait();
}
ok--;
}
traitement();
// code du processus léger
// qui signale les événements
synchronized (o) {
ok++;
o.notify();
}
wait()
Le méthode wait()
“attend” donc le thread qui l’appelle peut être interrompu dans cette attente (elle lance une exception InterruptedException
)
Comme les autres méthodes de synchronisation Thread.join
, Future<T>.get
, …. elle accepte un paramètre long
qui définit l’attente maximale autorisée.
Problèmes
Data Race
C’est lorsqu’il peut y avoir une “course aux données”. C’est-à-dire lorsque : (a) un thread écrit dans une variable (b) un autre thread lit dans cette variable (c) l’écriture et la lecture ne sont pas ordonnées explicitement par synchronisation.
public class SimpleDataRace {
static int a = 0;
public static void main(String[] args) {
new Thread(() -> { System.out.println(a); }).start();
a = 1;
} /* Peut afficher 1 comme 0. */
}
Race Condition
class FileDownloader {
setFile(File file) { ... }
isAuthorized(User u) { ... }
download(User u) { ... }
}
doDownload(User u, FileDownloader f){
if(f.isAutgorized()){
f.download(u);
}
}
f n’est pas immuable donc si un autre thread appelle setFile entre isAuthorized et download(), alors un fichier potentiellement privé sera envoyé. Même si FileDownloader
est thread-safe.

Serveur Multithread
Architecture d’un serveur acceptant plusieurs connexion
Le plus simple est d’écouter l’arrivée de nouvelles connexions dans le thread principal.
Pour pouvoir accepter de nouvelle connexion, il ne faut pas que l’écoute d’une socket particulière s’exécute sur le thread principal
Donc lorsqu’un connexion arrive, il faut que le serveur délègue la gestion de cette connexion à un autre thread.
Exemple Basique
import java.util.*;
import java.net.*;
import java.io.*;
class Server {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(3014);
while(true) {
Socket clientSocket = serverSocket.accept();
// on a une nouvelle connexion, on la traite.
PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
BufferedReader in =
new BufferedReader(
new InputStreamReader(clientSocket.getInputStream())
);
System.out.println("connection of "+
clientSocket.getRemoteSocketAddress());
String inputLine = "";
while ((inputLine = in.readLine()) != null) {
System.out.println(inputLine);
if (inputLine.equals("Bye"))
{
out.println("Bye");
break;
}
out.println("You said "+inputLine);
}
System.out.println("disconnection of "+
clientSocket.getRemoteSocketAddress());
out.close();
in.close();
clientSocket.close();
// le client est parti, on peut accepter un nouvelle connexion
}
}
}
Exemple Basique
class Server {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(3014);
while(true) {
Socket clientSocket = serverSocket.accept();
// on a une nouvelle connexion, on la traite.
// ... code qui gère un client
// ... contient des fonctions bloquantes!!!
// le client est parti, on peut accepter un nouvelle connexion
}
}
}
Solution Basique
class Server {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(3014);
while(true) {
Socket clientSocket = serverSocket.accept();
// on a une nouvelle connexion, on la traite.
new Thread(() -> {
// ... code qui gère un client
}).start();
// le client est géré dans un thread séparé,
// on peut accepter un nouvelle connexion
}
}
}
Solution avec un executor
import java.util.concurrent.*;
class Server {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(3014);
ExecutorService executor = Executors.newFixedThreadPool(30);
while(true) {
Socket clientSocket = serverSocket.accept();
// on a une nouvelle connexion, on la traite.
executor.submit(() -> {
// ... code qui gère un client
});
// le client est géré séparément,
// on peut accepter un nouvelle connexion
}
}
}
S’il y a trop de connexions, elles seront mises en attente par l’exécutor, on n’arrête jamais d’accepter de nouvelles connexions.
Solution Complète
Comme la gestion d’un client peut être assez complexe, ça vaut le coup de créer une classe dédiée.
class ClientHandler implements Runnable {
Socket client;
ClientHandler(Socket s) { client = s; }
@Override
public void run() {
// ... code qui gère un client
}
}
Solution Complète
Comme la gestion d’un client peut être assez complexe, ça vaut le coup de créer une classe dédiée.
import java.util.concurrent.*;
class Server {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(3014);
ExecutorService executor = Executors.newFixedThreadPool(30);
while(true) {
Socket clientSocket = serverSocket.accept();
// on a une nouvelle connexion, on la traite.
executor.submit(new ClientHandler(clientSocket));
// le client est géré séparément,
// on peut accepter un nouvelle connexion
}
}
}