Creare un codice indipendente ed immutabile per far lavorare in multithreading un ciclo for parallelizzabile

di il
47 risposte

Creare un codice indipendente ed immutabile per far lavorare in multithreading un ciclo for parallelizzabile

A volte in java si crea un ciclo for che itera su di un vettore che viene modificato. Meno frequentemente le operazioni che vengono eseguite sono indipendenti. Pensate ad una matrice alla quale si vuole sommare 1 ad ogni elemento. Sarebbe molto bello se esistesse un ciclo for capace di operare in parallelo ma dato che non esiste ho deciso di crearlo. Per la verità io non ho creato nulla ma semplicemente implementato un framework che esisteva già. Il codice che ho creato funziona ma ha un problema: è difficilissimo da implementare nei diversi casi d’uso. Vorrei qualcosa di più snello da poter implementare nelle diverse classi @Service di Spring Boot. Ci sono soluzioni oppure mi devo rassegnare?

import java.util.List;
import java.util.concurrent.RecursiveTask;

public class Operazioni extends RecursiveTask<Long> {

    int sogliaSequenziale;
    List<Long> dati;
    int inizio;
    int fine;

    public Operazioni(int sogliaSequenziale, List<Long> dati, int inizio, int fine) {
        this.sogliaSequenziale = sogliaSequenziale;
        this.dati = dati;
        this.inizio = inizio;
        this.fine = fine;
    }

    protected Long compute(){
        Long operazioniParallele = 0L;
        if((fine-inizio)<=sogliaSequenziale){
            for(int i=inizio; i<fine; i++){
                dati.set(i,dati.get(i)+1);
                long somma = 0L;
                for(int j=0; j<2111111111; j++){
                    somma++;
                }
            }
            operazioniParallele = 1L;
        }else{
            int mezzo = (inizio+fine)/2;
            Operazioni sottoOperazioneA = new Operazioni(sogliaSequenziale, dati, inizio, mezzo);
            Operazioni sottoOperazioneB = new Operazioni(sogliaSequenziale, dati, mezzo, fine);
            sottoOperazioneA.fork();
            sottoOperazioneB.fork();
            operazioniParallele = sottoOperazioneA.join() + sottoOperazioneB.join();
        }
        return operazioniParallele;
    }

}


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

public class AvvioOperazioni {

    public AvvioOperazioni() {
    }

    public void avvioOperazioni() {

        long tempoInizio = System.nanoTime();
        ForkJoinPool fjp = new ForkJoinPool();
        List<Long> lista = new ArrayList<Long>();
        for(long i=0; i<1000; i++){
            lista.add(i);
        }
        Operazioni operazioni = new Operazioni(
                lista.size()/fjp.getParallelism(), // 5 secondi
                //lista.size(), // 19 secondi
                lista,
                0,
                lista.size()
        );
        Long operazioniParallele = fjp.invoke(operazioni);
        System.out.println("Primo valore della lista:         " + lista.get(0));
        System.out.println("Ultimo valore della lista:        " + lista.get(lista.size()-1));
        System.out.println("Lunghezza della lista:            " + lista.get(lista.size()-1));
        System.out.println("Operazioni eseguite in parallelo: " + operazioniParallele);
        long tempoFile = System.nanoTime();
        long tempoTrascorso = tempoFile - tempoInizio;
        System.out.println(tempoTrascorso/1000000000);

    }
}


public class Main {
    public static void main(String args[]) {
        AvvioOperazioni avvioOperazioni = new AvvioOperazioni();
        avvioOperazioni.avvioOperazioni();
    }
}

47 Risposte

  • Re: Creare un codice indipendente ed immutabile per far lavorare in multithreading un ciclo for parallelizzabile

    iBaffiPro ha scritto:


    Sarebbe molto bello se esistesse un ciclo for capace di operare in parallelo ma dato che non esiste ho deciso di crearlo. Per la verità io non ho creato nulla ma semplicemente implementato un framework che esisteva già. Il codice che ho creato funziona ma ha un problema: è difficilissimo da implementare nei diversi casi d’uso. Vorrei qualcosa di più snello da poter implementare nelle diverse classi @Service di Spring Boot. Ci sono soluzioni oppure mi devo rassegnare?
    No, allora chiariamo bene. Innanzitutto per usare direttamente il Fork/Join Framework di Java 7+ serve chiaramente uno scenario abbastanza ben delineato. Generalizzare purtroppo NON è facile e non è sempre possibile.

    Poi comunque non l'hai neanche usato benissimo. Quando si usa il Fork/Join, tipicamente delle due sottoparti una si esegue forkando (cioè fork() poi join()) e l'altra conviene invocarla in modo sincrono con il compute(). Perché? Perché così si ottimizza l'uso dei thread e si agevola l'ulteriore splitting.
    Come hai fatto tu, fai fork() su entrambe le sottoparti e poi "sprechi" il thread tenendolo bloccato solo per attendere entrambe con i join. Studia di più

    Comunque i benchmark con i System.nanoTime() possono anche andare bene. Ma ne devi fare tanti in ciclo prendendo sempre il tempo minore. Oppure facendo una fase iniziale di warm-up con un po' di cicli non cronometrati. Altrimenti con 1 sola prova, il tempo che ottieni è assolutamente inattendibile/fuorviante.

    Tornando al multithreading: la cosa più semplice per lavorare in maniera "funzionale" sulle collezioni è usare la Stream API. Che è anche facilmente parallelizzabile se ci sono i presupposti giusti/validi (grandi quantità di dati, operazioni stateless e non interferenti con la collezione sorgente).
  • Re: Creare un codice indipendente ed immutabile per far lavorare in multithreading un ciclo for parallelizzabile

    Grazie mille per la risposta.
    Sapresti scrivermi una classe immutabile che possa essere usata per spezzare in N thread un ciclo for di questo genere con Stream API?
    
    for(int i=inizio; i<fine; i++){
    	dati.set(i,dati.get(i)+1);
    	long somma = 0L;
    	for(int j=0; j<2111111111; j++){
    		somma++;
    	}
    }
    
    In pratica si tratta di scorrere su di una lista e aggiungere 1 al numero presente. Il pezzo seguente serve solo per rendere il ciclo più pesante in termini di calcoli e rallentare le operazioni.
    
    for(int j=0; j<2111111111; j++){
    	somma++;
    }
    
    Sarebbe bello se si potesse fare una cosa di questo tipo:
    In un comune programma Java:
    
    Multithreading multithreading = new Multithreading();
    multithreading.forMultithreading(List<T> lista, int inizio, int fine,){
    	// le operazioni che l'utente desidera fare su List<T> lista
    }
    
    In Spring Boot 2.0:
    
    @Autowired
    Multithreading multithreading;
    multithreading.forMultithreading(List<T> lista, int inizio, int fine,){
    	// le operazioni che l'utente desidera fare su List<T> lista
    }
    
    In modo da avere a disposizione un ciclo for Multithreading da usare all’occorrenza.
    Pensi che una cosa così potrebbe essere utile ed impostata bene?
    Framework a parte, imposteresti il problema in modo diverso?

    Con Java 7 si può calcolare il numero di Thread disponibili sulla CPU (nel mio caso 4) e spezzare lista.size() in N parti automaticamente, con Stream API, creato con Java 8, presumo che si possa fare allo stesso modo dato che è un framework più recente.
  • Re: Creare un codice indipendente ed immutabile per far lavorare in multithreading un ciclo for parallelizzabile

    iBaffiPro ha scritto:


    Sapresti scrivermi una classe immutabile che possa essere usata per spezzare in N thread un ciclo for di questo genere con Stream API?
    
    for(int i=inizio; i<fine; i++){
    	dati.set(i,dati.get(i)+1);
    	long somma = 0L;
    	for(int j=0; j<2111111111; j++){
    		somma++;
    	}
    }
    
    Non ho ben capito cosa devi fare e a che scopo ma ... giusto per dare una idea: se hai una lista di numeri (List<Integer>) e vuoi aggiungere 1 a tutti i valori, con la Stream API si fa così:
    List<Integer> numeriA = Arrays.asList(47, 25, 56, 19, 32, 20);
    
    List<Integer> numeriB = numeriA.stream().map(n -> n+1).collect(Collectors.toList());
    Questo però crea una nuova lista, NON modifica quella originale (numeriA).

    Se si vuole la versione "parallela": numeriA.parallelStream().blabla

    Ma attenzione: perché abbia senso la parallelizzazione devi trattare tanti, TANTI dati. Se ne hai solo 10, 100 o 1000, la parallelizzazione non ti dà benefici. Perché la parallelizzazione "costa" molto!
  • Re: Creare un codice indipendente ed immutabile per far lavorare in multithreading un ciclo for parallelizzabile

    Se ho la lista 'List<Componente> componenti' e desidero fare diverse verifiche su ciascun componente (verificare se l'oggetto contiene un dato valore in una data variabile, se il volume del componente calcolato con alcune variabili è inferiore ad un certo valore, ecc...) posso processare contemporaneamente il primo elemento della lista insieme al secondo e così via.
    Il tuo codice non sembra adattabile a qualunque scenario. Se avessi un ciclo for con 1000 righe di codice che volessi parallelizzare con Stream API queste 1000 righe di codice dove le potrei scrivere? In buona sostanza a me interessa rendere parallelo questo codice (è un esempio inventato, il mio caso reale sono le 1000 righe di codice, calcolo di volumi, perimetri, pesi specifici, ecc...):
    
    for(int i=0; i<dati.size(); i++){
    	dati.set(i,dati.get(i)+1);
    	long somma = 0L;
    	for(int j=0; j<2111111111; j++){
    		somma++;
    	}
    }
    
    senza modificare di una virgola questo codice:
    
    	dati.set(i,dati.get(i)+1);
    	long somma = 0L;
    	for(int j=0; j<2111111111; j++){
    		somma++;
    	}
    
    Quando programmo vorrei poter ragionare in questo modo:
    1) Il ciclo è lungo e richiede tempo? > Bene uso un ciclo for le cui operazioni interne avvengono in parallelo.
    2) Il ciclo non è lungo? > Bene uso un ciclo for classico.
    Per farti capire nel codice io dovrei vedere il codice seguente immutato da qualche parte
    
    	dati.set(i,dati.get(i)+1);
    	long somma = 0L;
    	for(int j=0; j<2111111111; j++){
    		somma++;
    	}
    
    e da qualche altra parte una classe, un paio di classi o anche 100 classi immutabili che posso usare per trasformare un ciclo for in un ciclo for multithreading. Da quello che scrivi Stream API non sembra essere la soluzione giusta per il mio caso.
    In buona sostanza un metodo che mi permetta di passare da qui:
    
    for(int i=0; i<dati.size(); i++){
    	dati.set(i,dati.get(i)+1);
    	long somma = 0L;
    	for(int j=0; j<2111111111; j++){
    		somma++;
    	}
    }
    
    a qui:
    
    Multithreading multithreading = new Multithreading();
    List<Integer> dati = multithreading.forMultithreading(List<Integer> dati, int inizio, int fine, int i){
    	dati.set(i,dati.get(i)+1);
    	long somma = 0L;
    	for(int j=0; j<2111111111; j++){
    		somma++;
    	}
    }
    
    o qui:
    
    List<Integer> lista = ...;
    Multithreading multithreading = new Multithreading(List<Integer> dati, int inizio, int fine, int i){
    	dati.set(i,dati.get(i)+1);
    	long somma = 0L;
    	for(int j=0; j<2111111111; j++){
    		somma++;
    	}
    }
    
    Quello che dovrei riuscire a fare sono:
    1) Classi immutabili che parallelizzano (es. Multithreading);
    2) Qualche riga di codice nella classe @Service che sto scrivendo in sostituzione di questo:
    
    for(int i=0; i<dati.size(); i++){
    }
    
    3) Codice parallelizzabile immutabile (esempio stupido ma buono per testare l'efficienza del metodo):
    
    	dati.set(i,dati.get(i)+1);
    	long somma = 0L;
    	for(int j=0; j<2111111111; j++){
    		somma++;
    	}
    
    P.S.: In merito alla tua nota sulla parallelizzazione ti ringrazio molto per l’appunto ma sono al corrente di quello che dici. L’unico dubbio che ho è sull’uso del multithreading quando in queste 1000 righe di codice di cui parlavo sopra ci sono anche delle query. La mia intenzione era quella di non parallelizzare i cicli for che contengono query perché queste ultime sono infinitamente più lente rispetto a Java. Concordi?
  • Re: Creare un codice indipendente ed immutabile per far lavorare in multithreading un ciclo for parallelizzabile

    iBaffiPro ha scritto:


    queste 1000 righe di codice dove le potrei scrivere?
    Guarda che con la stream API non è quello il problema. Nell'esempio ho usato una lambda expression cortissima ( quel n -> n+1 ) ma in quel mapping potrebbe esserci la chiamata esplicita ad un metodo di 1000 righe o eventualmente un method reference che fa riferimento a quel metodo di 1000 righe. Non è quella la questione!!

    iBaffiPro ha scritto:


    
    for(int i=0; i<dati.size(); i++){
    	dati.set(i,dati.get(i)+1);
    
    Ok, partiamo da questa questione. Qui fai un dati.set( .... ) cioè stai SETTANDO il valore ad un indice di una lista. Finché fai tutto "sincrono" con un ciclo for così, non ci sono particolari questioni.
    Ma se questo set fosse parallelizzato E la collezione non è thread-safe (tipo un puro ArrayList), combina solo macelli pazzeschi, perché senza una apposita sincronizzazione, NON c'è garanzia di "visibilità" delle modifiche da parte di altri thread oltre alla eventualità concreta di "corrompere" la struttura dati interna al ArrayList.
    EDIT: precisazione, effettivamente con un set "secco" ad indice c'è poca probabilità di corrompere la struttura. Ci sarebbe invece con add/insert/remove che potenzialmente impattano sugli altri elementi. Ma il discorso comunque non cambia.

    Se al contrario la collezione fosse thread-safe (tipo un Vector o anche ArrayList ma incapsulato con Collections.synchronizedList() ) ci sarebbe garanzia di visibilità delle modifiche ma in queste collezioni thread-safe il locking avviene tipicamente con un lock esclusivo a livello di istanza della collezione. Quindi ci sarebbe un alto contendimento del lock da parte dei thread e questo farebbe degradare ulteriormente le prestazioni della parallelizzazione.

    Il mio grosso dubbio è che temo davvero non hai bene idea di cosa voglia dire usare il multi-threading, riguardo le questioni di sincronizzazione, mutua-esclusione, visibilità delle modifiche, ecc...
    E se non hai queste basi (come temo), diventa anche difficile risponderti e farti capire.
  • Re: Creare un codice indipendente ed immutabile per far lavorare in multithreading un ciclo for parallelizzabile

    Le liste di cui parlo sono di 3 tipi:
    List<OggettoSpecifico> lista1 = new ArrayList<OggettoSpecifico>();
    List<List<Long>> lista2 = new ArrayList<List<Long>>();
    List<List<String>> lista3 = new ArrayList<List<String>>();
    Io in questi contesti eseguo solo ed esclusivamente delle modifiche, mai add/insert/remove.
    Su Stream API non mi pronuncio perché non ho mai avuto modo di testarlo ma quando si parla di multithread spesso nel codice si setta una variabile come 'shared' che permette di essere appunto condivisa tra i diversi thread. Forse Collections.synchronizedList() è quello che io chiamo 'shared' ma non mi pronuncio neppure su questo.
    E' chiaro che la condivisione di una risorsa un po' riduce le performance del programma ma in linea generale c'è un guadagno.
    In ogni caso i pasticci di cui parli nel mio caso non dovrebbero avvenire perché ogni singolo elemento della mia lista (OggettoSpecifico/Long/String) viene editato 1 volta sola e da 1 solo thread (vedi l'esempio con ForkJoinPool).
    Il programma che ho creato (vedi primo post) fa esattamente quello che desidero e funziona anche molto bene, l'unico difetto è che non riesco ad implementarlo nel codice con delle modalità comode come quelle di cui ti ho raccontato.
    Stream API ha il difetto di ricreare una nuova lista.
    Oggi finisco un programmino e domani provo a lavorare su questo Stream API.
  • Re: Creare un codice indipendente ed immutabile per far lavorare in multithreading un ciclo for parallelizzabile

    Vorrei farti capire bene la questione MOLTO il generale. Altrimenti ti assicuro che non vai "avanti".

    Prova il seguente codice che ho scritto poco fa:
    import java.util.ArrayList;
    import java.util.List;
    
    public class ProvaMultiThread {
        public static void main(String[] args) throws InterruptedException {
            List<List<String>> listeStringhe = new ArrayList<>();
    
            for (int i = 0; i < 10; i++) {
                listeStringhe.add(new ArrayList<>());
            }
    
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    String threadName = Thread.currentThread().getName();
    
                    for (int n = 0; n < 1000; n++) {
                        listeStringhe.get(n % 10).add(threadName + "-" + n);
                    }
                }
            };
    
            Thread[] threads = new Thread[5];
            for (int i = 0; i < threads.length; i++) {
                threads[i] = new Thread(runnable, "T"+i);
            }
            for (Thread t : threads) {
                t.start();
            }
            for (Thread t : threads) {
                t.join();
            }
    
            for (int i = 0; i < 10; i++) {
                System.out.format("lista[%d]: %d elementi%n", i, listeStringhe.get(i).size());
            }
        }
    }
    C'è una lista principale che contiene 10 "sotto" liste (inizialmente vuote). Ci sono 5 thread, ciascuno inserisce 1000 valori sparpagliandoli in modo uniforme tra le varie sotto liste. Nota che la lista principale NON viene mai modificata dai thread, solo le sotto liste vengono modificate.
    Se la matematica non è una opinione, vuol dire che il main alla fine dovrebbe stampare che ci sono esattamente 500 elementi in ogni lista.

    Esegui il codice un po' di volte, cosa succede? Hai sempre lo stesso risultato? O .... risultati MOLTO diversi?
    Perché secondo te avviene quello che vedi nei risultati? E infine, la domanda "clou": quale sarebbe la cosa più semplice da fare per garantire il risultato che ci si aspetta in teoria (500 elementi in ogni lista)?


    EDIT 19/12: comunque chiedevi se è possibile realizzare qualcosa di molto riutilizzabile per parallelizzare la modifica di una lista. Sì, è perfettamente possibile. L'ho appena fatto e provato in 20 minuti usando come base il ForkJoinPool.
  • Re: Creare un codice indipendente ed immutabile per far lavorare in multithreading un ciclo for parallelizzabile

    Eccomi ritornato, scusa per il lungo ritardo ma ho avuto un sacco di problemi.
    Ho letto il tuo codice, molto bello. Per risolvere basta sostituire questo:
    
            for (Thread t : threads) {
                t.start();
            }
            for (Thread t : threads) {
                t.join();
            }
    
    con questo:
    
            for (Thread t : threads) {
                t.start();
                t.join();
            }
    
    Tutti gli oggetti Thread condividono il metodo run(). Senza l'uso del metodo join() quando il primo fra i 4 thread termina la risorsa comune definita dal metodo run() cessa di essere fruibile agli altri 3 thread.
    Con il primo codice, i metodi join() vengono lanciati quando tutti i thread sono già partiti e non si fa in tempo a ricordare a tutti i thread di lasciare run() disponibile. Uno dei 4 thread termina senza che abbia ricevuto la direttiva join() rendendo run() non più disponibile. Questo fa sì che alcuni thread non riescano per poco a completare la serie di operazioni e quindi le liste immagazzinano un numero di elementi diversi.

    Comunque non riesco a capire come potrei utilizzare questo approccio per risolvere il mio problema. Sono curiosissimo di vedere cosa sei riuscito a scrivere se vuoi condividerlo.

    Grazie
  • Re: Creare un codice indipendente ed immutabile per far lavorare in multithreading un ciclo for parallelizzabile

    iBaffiPro ha scritto:


    Per risolvere basta sostituire questo:
    [...]

    con questo:
    
            for (Thread t : threads) {
                t.start();
                t.join();
            }
    
    Ma non è quella la soluzione! Se fai così, non hai più concorrenza/parallelismo. Hai 1 solo thread per volta. Così ovvio che "funziona".
    Davvero non hai idea del perché succede quello che si vede provandolo? E non sapresti neanche dire cosa invece sarebbe giusto cambiare nel codice per avere il risultato SEMPRE corretto?
    Domanda extra: sai tra l'altro la differenza tra concorrenza e parallelismo?

    iBaffiPro ha scritto:


    Tutti gli oggetti Thread condividono il metodo run().
    Sì, nel mio codice c'è 1 solo oggetto Runnable che è quindi "condiviso" dai thread. Ma non è quello il punto o problema (avrei potuto creare 5 Runnable distinti e il problema ci sarebbe ancora perché è listeStringhe la risorsa "condivisa" che è il nocciolo della questione).

    iBaffiPro ha scritto:


    Senza l'uso del metodo join() quando il primo fra i 4 thread termina la risorsa comune definita dal metodo run() cessa di essere fruibile agli altri 3 thread.
    Con il primo codice, i metodi join() vengono lanciati quando tutti i thread sono già partiti e non si fa in tempo a ricordare a tutti i thread di lasciare run() disponibile. Uno dei 4 thread termina senza che abbia ricevuto la direttiva join() rendendo run() non più disponibile.
    Non vuol dire nulla. L'oggetto Runnable "esiste" fino alla fine del main.

    iBaffiPro ha scritto:


    Questo fa sì che alcuni thread non riescano per poco a completare la serie di operazioni e quindi le liste immagazzinano un numero di elementi diversi.
    No, mi spiace, NON è quello il problema. Non dipende da quando un thread finisce prima di un altro.

    iBaffiPro ha scritto:


    Comunque non riesco a capire come potrei utilizzare questo approccio per risolvere il mio problema.
    Il codice sopra era solo per vedere se avevi le idee chiare sul multithreading. Ho dedotto ora che non le hai molto ....

    iBaffiPro ha scritto:


    Sono curiosissimo di vedere cosa sei riuscito a scrivere se vuoi condividerlo.
    RIguardo quello che avevo detto prima

    andbin ha scritto:


    comunque chiedevi se è possibile realizzare qualcosa di molto riutilizzabile per parallelizzare la modifica di una lista. Sì, è perfettamente possibile. L'ho appena fatto e provato in 20 minuti usando come base il ForkJoinPool.
    Il codice dovrei averlo ancora (non mi pare di averlo eliminato). Lo recupero e lo posto appena possibile.
  • Re: Creare un codice indipendente ed immutabile per far lavorare in multithreading un ciclo for parallelizzabile

    E' strano che non ci sia parallelismo con la mia soluzione perché alzando il numero delle iterazioni ed osservando il tash manager di windows vedo tutti i core al 100%.
    Si ho chiara la differenza tra concorrenza e parallelismo e ti dico anche che a me interessa il parallelismo perché mentre eseguo una data operazione con il mio IDE la cpu non viaggia al 100%. Io voglio parallelizzare le operazioni del mio IDE per ridurre i tempi di calcolo.
    Sono curioso di vedere la tua soluzione.
    Grazie
  • Re: Creare un codice indipendente ed immutabile per far lavorare in multithreading un ciclo for parallelizzabile

    iBaffiPro ha scritto:


    Sono curioso di vedere la tua soluzione.
    Ok, ho ripreso il codice che avevo fatto. Se si vuole modificare una lista di oggetti in modo "parallelizzabile" (modificando quella lista, NON creando una nuova lista), si può fare usando il fork-join pool. Ma va fatto oculatamente, scrivendo la implementazione una volta sola e permettendo invece di "parametrizzare" il suo comportamento in modo opportuno.

    Innanzitutto si fa una functional interface.
    @FunctionalInterface
    public interface ElementMutator<T> {
        T apply(int index, T value);
    }
    Perché ho fatto una nuova interfaccia? Semplicemente perché tra le 43 functional interface in java.util.function non ce n'è purtroppo una con quella forma ( (int,T) --> T ). C'è il mio cheat-sheet su queste 43 interfacce.

    Nota che questa interfaccia NON ha alcuna nozione di lista o collezione. Riceve solamente un oggetto (e l'indice a cui si trova l'oggetto nella lista, che "potrebbe" tornare utile in certi casi) e può/deve restituire:
    - un nuovo oggetto, se gli oggetti sono immutabili
    - lo stesso oggetto modificato come stato, se gli oggetti sono mutabili

    Se l'indice non servisse, si potrebbe definire il tutto usando un più semplice UnaryOperator<T> la cui forma è T --> T.

    Poi la implementazione della modifica sulla lista con il fork-join pool.
    import java.util.List;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveAction;
    
    public class ForkJoinListMutator {
        public static final int DEFAULT_SEQ_THRESHOLD = 10000;
    
        private static final ForkJoinListMutator defaultInstance =
                new ForkJoinListMutator(ForkJoinPool.commonPool());
    
        private final ForkJoinPool forkJoinPool;
    
        public ForkJoinListMutator(ForkJoinPool forkJoinPool) {
            this.forkJoinPool = forkJoinPool;
        }
    
        public static ForkJoinListMutator getDefault() {
            return defaultInstance;
        }
    
        public <T> void mutate(List<T> list, ElementMutator<T> mutator) {
            mutate(list, DEFAULT_SEQ_THRESHOLD, mutator);
        }
    
        public <T> void mutate(List<T> list, int seqThreshold, ElementMutator<T> mutator) {
            MutateTask<T> mainTask = new MutateTask<>(list, seqThreshold, mutator);
            forkJoinPool.invoke(mainTask);
        }
    
    
        private static class MutateTask<T> extends RecursiveAction {
            private static final long serialVersionUID = 1L;
    
            private final List<T> list;
            private final int start;
            private final int end;
            private final int seqThreshold;
            private final ElementMutator<T> mutator;
    
            public MutateTask(List<T> list, int seqThreshold, ElementMutator<T> mutator) {
                this(list, 0, list.size(), seqThreshold, mutator);
            }
    
            public MutateTask(List<T> list, int start, int end, int seqThreshold, ElementMutator<T> mutator) {
                this.list = list;
                this.start = start;
                this.end = end;
                this.seqThreshold = seqThreshold;
                this.mutator = mutator;
            }
    
            @Override
            protected void compute() {
                final int length = end - start;
    
                if (length <= seqThreshold) {
                    computeSequentially();
                } else {
                    MutateTask<T> leftTask = new MutateTask<>(list, start, start+length/2, seqThreshold, mutator);
                    leftTask.fork();
                    MutateTask<T> rightTask = new MutateTask<>(list, start+length/2, end, seqThreshold, mutator);
                    rightTask.compute();
                    leftTask.join();
                }
            }
    
            private void computeSequentially() {
                for (int i = start; i < end; i++) {
                    list.set(i, mutator.apply(i, list.get(i)));
                }
            }
        }
    }
    Un ForkJoinListMutator incapsula semplicemente un ForkJoinPool ed è immutabile. Lo stesso oggetto ForkJoinListMutator lo puoi RI-usare più volte per modificare tante liste anche di tipi differenti. Perché sono i due metodi mutate che sfruttano i generics (e non il ForkJoinListMutator in sé).

    La API è molto semplice: puoi ottenere un ForkJoinListMutator di "default" (che usa il "common" pool di default) oppure creare un nuovo ForkJoinListMutator con un ForkJoinPool esplicito (magari configurato diversamente dal common).
    Poi basta usare uno dei due mutate(), con/senza seqThreshold esplicito.

    Quindi es.:
    ForkJoinListMutator listMutator = ForkJoinListMutator.getDefault();
    Se hai una lista di stringhe e vuoi fare il reverse di tutte le stringhe:
    List<String> lista = //........
    listMutator.mutate(lista, (i, str) -> new StringBuilder(str).reverse().toString());
    Se avessi una lista di LocalDate e vuoi aggiungere 1 giorno a tutte le date:
    List<LocalDate> lista = //........
    listMutator.mutate(lista, (i, data) -> data.plusDays(1));
    Se (e ripeto SE) il ForkJoinListMutator possa risultare più performante di una modifica puramente sequenziale con un banale for-each, dipende da 2 fattori: 1) quanti elementi ci sono nella lista, 2) quanto "pesa" computazionalmente il lavoro di modifica di ciascun elemento.

    Per dire: se hai solo 1000 stringhe e vuoi fare il reverse di ciascuna, allora sicuramente il ForkJoinListMutator NON velocizza un bel niente.

    CONCLUSIONI:
    In generale, è così che devi ragionare quando c'è da fare cose di questo tipo. Si realizza un certo algoritmo (una volta sola!) e lo si rende "parametrabile". Parametrabile non solo in termini di semplici "dati" (es. quel seqThreshold) ma anche e soprattutto come "comportamenti". Cioè devi saper "staccare" l'algoritmo in sé da quella che è la logica particolare da fare per ciascun caso specifico che può essere anche molto variabile e differente. Lo si fa applicando il principio di astrazione e tipicamente tramite l'uso delle interfacce.
    Io l'ho fatto con una interfaccia che è in modo specifico una functional interface.


    Domanda aggiuntiva-extra: nel mio esempio di mutazione di un List<String> ho usato un StringBuilder per il reverse. Se la lista avesse es. 1000000 di stringhe, crea purtroppo 1000000 di oggetti StringBuilder che poi butta via. Ma nel contesto del computeSequentially() che lavora sequenzialmente su es. 10000 (default) oggetti, si POTREBBE benissimo riusare lo stesso oggetto StringBuilder (ad esempio resettando il suo length). Nota però che NON puoi banalmente usare un StringBuilder totalmente "globale", perché la lambda di implementazione viene chiamata da più/molti thread differenti, che causerebbe solo grandi disastri.

    Modificando leggermente il MutateTask (e modificando/aggiungendo metodi mutate() ), cosa pensi si potrebbe aggiungere per fare in modo che la lambda (o comunque la implementazione della functional interface) possa RI-usare uno/più oggetti di "contesto" ma SOLO all'interno di ciascun "run" del computeSequentially()? Se non ci arrivi ... te lo dico io.
  • Re: Creare un codice indipendente ed immutabile per far lavorare in multithreading un ciclo for parallelizzabile

    Intanto grazie mille per il codice perché è uno spettacolo.
    L'unica cosa che non capisco è l'uso di questo codice:
    private static final long serialVersionUID = 1L;
    che scommetto ti ha aggiunto Eclipse. Su internet ci sono scritte molte cose ma io non ho mai capito in quali casi dimenticando la variabile nelle mie webapp possano nascere problemi. Nel mio progetto serialVersionUID non c'è in nessuna classe e funziona tutto senza intoppi.
    In merito alla domanda che hai posto sarei tentato di dirti che basta aggiungere questo oggetto dentro ForkJoinListMutator oppure dentro la sua classe annidata. Non saprei risponderti sinceramente.
    Ora provo ad implementare il tuo codice al mio progetto e vedo cosa esce fuori. Ho fatto qualche esperimento e nel mio caso (4 core) riduco di 4 volte i tempi di calcolo. Hehehe... mica noccioline Andrea!
  • Re: Creare un codice indipendente ed immutabile per far lavorare in multithreading un ciclo for parallelizzabile

    iBaffiPro ha scritto:


    L'unica cosa che non capisco è l'uso di questo codice:
    private static final long serialVersionUID = 1L;
    che scommetto ti ha aggiunto Eclipse.
    Guarda, è la cosa MENO importante di tutto. Sì, l'ha aggiunta Eclipse ma dietro mia azione con il mouse.
    Riguarda la serializzazione degli oggetti, per il "versionamento" degli oggetti serializzati. La serializzazione non si usa quasi mai (è davvero raro, a meno di usare es. HttpSession in cui devi mettere oggetti serializzabili perché la sessione può essere "passivata", cioè spostata su disco).

    Ma se una classe deriva da Serializable (RecursiveAction è Serializable), il serialVersionUID va messo, altrimenti un IDE/compilatore "pignolo" dà un warning, anche se poi non si usa nulla della serializzazione!
    Solo un "proforma", insomma ...

    iBaffiPro ha scritto:


    In merito alla domanda che hai posto sarei tentato di dirti che basta aggiungere questo oggetto dentro ForkJoinListMutator oppure dentro la sua classe annidata. Non saprei risponderti sinceramente.
    No, no, sei lontano anni-luce!! Lo dico io ... non ci arriveresti neanche prima della fine dell'attuale Sanremo musicale ...

    Il ForkJoinListMutator (e il MutateTask) NON devono "sapere" nulla di un eventuale StringBuilder. Per un altro tipo di compito potrebbe servire un altro oggetto di contesto .. o magari più oggetti di contesto. Sarebbe impossibile/ingestibile trattare questo nel ForkJoinListMutator.

    Semplicemente: invece di passare al mutate() e MutateTask un ElementMutator<T>, passi un Supplier<ElementMutator<T>>

    Un Supplier serve generalmente per creare un nuovo oggetto ad ogni invocazione del suo get(), in pratica fa da "produttore" di oggetti.

    Il computeSequentially() cambia solo leggermente:
            private void computeSequentially() {
                ElementMutator<T> mutator = mutatorSupplier.get();
                // resto come prima ...
    Il computeSequentially semplicemente "stacca" un nuovo oggetto ElementMutator e lo usa SOLO per quel "run" del computeSequentially.

    Quello che poi cambia è nella chiamata al mutate().
    Invece di:
    listMutator.mutate(lista, (i, str) -> new StringBuilder(str).reverse().toString());
    Ci sarà:
    listMutator.mutate(lista, () -> {
        StringBuilder buffer = new StringBuilder();
        return (i, str) -> {
            buffer.setLength(0);
            return buffer.append(str).reverse().toString();
        };
    });
    Sì è una lambda che restituisce un'altra lambda!
    Quando il get() del Supplier viene invocato, viene eseguito quel primo blocco { } del Supplier. Vuol dire che crea quella variabile locale buffer e restituisce un nuovo oggetto Mutator<String> implementato dalla seconda lambda interna.
    Ma questo oggetto si tiene "in pancia" il riferimento al buffer. Quindi per tutto il run del computeSequentially viene RI-usato lo stesso StringBuilder, quindi MENO oggetti creati e buttati via. E' tutto assolutamente safe perché dentro il computeSequentially è tutto sequenziale, non ci sono problemi.
    Ad un'altra invocazione di computeSequentially verrà fornito un altro Mutator<String> ecc...

    E il ForkJoinListMutator/MutateTask non ne sanno assolutamente nulla di tutto questo!!

    Ci saresti mai potuto arrivare?
  • Re: Creare un codice indipendente ed immutabile per far lavorare in multithreading un ciclo for parallelizzabile

    No no, assolutamente no!
Devi accedere o registrarti per scrivere nel forum
47 risposte