Concurrency Utilities

Java has supported multithreading and synchronization from the start, using features such as the Runnable interface, the Thread class, the synchronized keyword and the wait()/notify() methods (see MultiThreaded Programming). However, later versions of Java (starting with Java 5) added and improved concurrency features, especially to handle the intensive use of multiple threads; features such as semaphores, thread pools and execution managers have now been added. One such feature is the Fork/Join Framework, which, when used on servers that have multiple CPU cores, makes it easier to develop truly parallel programs (not just time-slicing). However, multithreaded applications also pose some issues: it becomes more complex to make sure that an application behaves itself, and debugging and testing become harder.

Java parallel programming is a very advanced and complex topic. I will cover the basics to get you going and touch on the most common concurrency features, then leave you to the internet to extend your knowledge further. One important thing to remember is that complex parallel programs require more testing, and their real running conditions are very difficult to replicate.

Using synchronization, we are trying to prevent two conditions from happening.

Before we start taking a look at the concurrency utilities, I would like to make one point: these do not replace the traditional multithreading and synchronization techniques but complement them when extra control is needed. A basic understanding of threads is required.

I have covered a number of thread-safe queues in my data structures queues section.

I have not covered MapReduce, a software framework for easily writing applications that process vast amounts of data in parallel on large clusters of many nodes in a reliable, fault-tolerant manner; I will leave you to the internet for more information.

Concurrent API Packages

The Java concurrency utilities are in the java.util.concurrent package and its two subpackages, java.util.concurrent.atomic and java.util.concurrent.locks.

java.util.concurrent defines the core features, grouped as follows; the synchronizers offer high-level synchronization between multiple threads:

  • Synchronizers
    • Semaphore - implements classic semaphore
    • CountDownLatch - waits until a specified number of events have occurred
    • CyclicBarrier - enables a group of threads to wait at a predefined execution point
    • Exchanger - exchanges data between two threads
    • Phaser - synchronizes threads that advance through multiple phases of an operation
  • Executors
  • Concurrent collections
  • Fork/Join Framework
java.util.concurrent.atomic lets you use variables safely in a concurrent environment: it allows you to change variables without the use of locks, and classes such as AtomicInteger and AtomicLong provide this.
java.util.concurrent.locks provides an alternative to synchronized methods. You use the Lock interface, which defines the basic mechanism used to acquire and relinquish access to an object. The key methods are lock(), tryLock() and unlock(), which give you greater control over synchronization.

Synchronization Objects

Synchronization objects are provided by Semaphore, CountDownLatch, CyclicBarrier, Exchanger and Phaser. Each one handles synchronization differently and is intended for a different kind of synchronization problem.

Firstly, we will start with Semaphore, which controls access to a shared resource through the use of a counter. If the counter is greater than zero, access is granted; if it is zero, a thread requesting access blocks until a permit becomes available. A thread must acquire a permit from the semaphore before it gains access to the resource. Each acquired permit decreases the counter, and when the counter hits zero no more access is allowed; once a thread has finished, it releases its permit and the counter is increased again. At a minimum, the Semaphore constructor takes the starting value of the counter, which indicates how many threads can have access at the same time. You also have the option of passing a fairness flag, which ensures that waiting threads are granted permits in the order in which they requested access.

Semaphores are great for producer/consumer types of solutions.
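A minimal sketch of the counting and fairness behaviour described above, assuming Java 8 or later for the lambdas; SemaphoreLimitSketch and run() are hypothetical names used only for illustration. Several threads compete for a fair Semaphore, and the code records the largest number of threads that ever held a permit at the same time, which should never exceed the number of permits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphoreLimitSketch {

    // Starts 'workers' threads that compete for a fair semaphore with 'permits' permits and
    // returns the highest number of threads that held a permit at the same time.
    static int run(int permits, int workers) {
        Semaphore sem = new Semaphore(permits, true);  // true = fair: permits granted in request order
        AtomicInteger inside = new AtomicInteger();     // threads currently holding a permit
        AtomicInteger peak = new AtomicInteger();       // highest value 'inside' has reached
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                try {
                    sem.acquire();                      // blocks while the counter is zero
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;                             // no permit acquired, so nothing to release
                }
                try {
                    peak.accumulateAndGet(inside.incrementAndGet(), Math::max);
                    Thread.sleep(20);                   // simulate work on the shared resource
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inside.decrementAndGet();
                    sem.release();                      // hand the permit back, counter goes up by one
                }
            });
            threads.add(t);
            t.start();
        }

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("Most threads holding a permit at once: " + run(2, 6));
    }
}
```

The permit is released in a finally block, and only after acquire() has succeeded, so a thread interrupted while waiting never hands back a permit it did not have.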

Semaphore example
import java.util.concurrent.Semaphore;

public class ConcurrencyTest1 {

    public static void main(String[] args) {

        Semaphore sem = new Semaphore(1);             // create the semaphore with only 1 permit

        new IncThread(sem, "A");                      // create a Runnable class that starts the Thread upon construction 
        new DecThread(sem, "B");                      // create a Runnable class that starts the Thread upon construction
    }
}

// A shared resource.
class Shared {
    static int count = 0;
}

// A thread of execution that increments count.
class IncThread implements Runnable {
    String name;
    Semaphore sem;

    IncThread(Semaphore s, String n) {
        sem = s;
        name = n;
        new Thread(this).start();                             // start the Thread straight away
    }

    public void run() {

        System.out.println("Starting " + name);

        try {
            // First, acquire a permit.
            System.out.println(name + " is waiting for a permit.");
            sem.acquire();                                                 
            System.out.println(name + " gets a permit.");

            // Now, access shared resource.
            for(int i=0; i < 5; i++) {
                Shared.count++;
                System.out.println(name + ": " + Shared.count);

                // Now, allow a context switch -- if possible.
                Thread.sleep(10);
            }
        } catch (InterruptedException exc) {
            System.out.println(exc);
        }

        // Release the permit.
        System.out.println(name + " releases the permit.");
        sem.release();
    }
}

// A thread of execution that decrements count.
class DecThread implements Runnable {
    String name;
    Semaphore sem;

    DecThread(Semaphore s, String n) {
        sem = s;
        name = n;
        new Thread(this).start();                             // start the Thread straight away
    }

    public void run() {

        System.out.println("Starting " + name);

        try {
            // First, acquire a permit.
            System.out.println(name + " is waiting for a permit.");
            sem.acquire();
            System.out.println(name + " gets a permit.");

            // Now, access shared resource.
            for(int i=0; i < 5; i++) {
                Shared.count--;
                System.out.println(name + ": " + Shared.count);

                // Now, allow a context switch -- if possible.
                Thread.sleep(10);
            }
        } catch (InterruptedException exc) {
            System.out.println(exc);
        }

        // Release the permit.
        System.out.println(name + " releases the permit.");
        sem.release();
    }
}

Output
------------------------------------------
Starting A
Starting B
B is waiting for a permit.
A is waiting for a permit.
B gets a permit.
B: -1
B: -2
B: -3
B: -4
B: -5
B releases the permit.
A gets a permit.
A: -4
A: -3
A: -2
A: -1
A: 0
A releases the permit.

Consumer/Producer using Semaphore
import java.util.concurrent.Semaphore; 
 
class Q { 
  int n; 
 
  // Start with consumer semaphore unavailable. 
  static Semaphore semCon = new Semaphore(0); 
  static Semaphore semProd = new Semaphore(1); 
 
  void get() { 
    try { 
      semCon.acquire(); 
    } catch(InterruptedException e) { 
      System.out.println("InterruptedException caught"); 
    } 
 
     System.out.println("Got: " + n); 
     semProd.release(); 
  } 
 
  void put(int n) { 
    try { 
      semProd.acquire(); 
    } catch(InterruptedException e) { 
      System.out.println("InterruptedException caught"); 
    } 
 
    this.n = n; 
    System.out.println("Put: " + n); 
    semCon.release(); 
  } 
} 
 
class Producer implements Runnable { 
  Q q; 
 
  Producer(Q q) { 
    this.q = q; 
    new Thread(this, "Producer").start(); 
  } 
 
  public void run() { 
    for(int i=0; i < 20; i++) q.put(i); 
  } 
} 
 
class Consumer implements Runnable { 
  Q q; 
 
  Consumer(Q q) { 
    this.q = q; 
    new Thread(this, "Consumer").start(); 
  } 
 
  public void run() { 
    for(int i=0; i < 20; i++)  q.get(); 
  } 
} 
 
class ProdCon { 
  public static void main(String args[]) { 
    Q q = new Q(); 
    new Consumer(q); 
    new Producer(q); 
  } 
}

Output
---------------------------------
Put: 0
Got: 0
Put: 1
Got: 1
Put: 2
Got: 2
Put: 3
Got: 3
Put: 4
...

Now we look at CountDownLatch, which makes a thread wait until a number of events have occurred. When you create the CountDownLatch object, you specify the number of events that must occur before the latch opens. Each time an event happens, countDown() is called and the count is decremented; when the count reaches zero, the latch opens and any threads waiting in await() are released.

CountDownLatch example
import java.util.concurrent.CountDownLatch;

public class ConcurrencyTest3 {

    public static void main(String[] args) {

        // 5 events have to happen before the latch is opened
        CountDownLatch cdl = new CountDownLatch(5);

        System.out.println("Starting");

        new MyThread(cdl);

        try {
            cdl.await();
        } catch (InterruptedException exc) {
            System.out.println(exc);
        }

        System.out.println("Done");
    }
}

class MyThread implements Runnable {
    CountDownLatch latch;

    MyThread(CountDownLatch c) {
        latch = c;
        new Thread(this).start();
    }

    public void run() {
        for(int i = 0; i < 5; i++) {
            System.out.println(i);
            latch.countDown(); // decrement count
        }
    }
}

Output
--------------------------
Starting
0
1
2
3
4
Done
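Another common way to use CountDownLatch is as a starting gate: a latch with a count of 1 holds a group of worker threads back until the main thread opens it, and a second latch lets the main thread wait for all of them to finish. A minimal sketch, assuming Java 8 or later; LatchGateSketch and run() are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchGateSketch {

    // Starts 'workers' threads that all wait for a start signal, opens the gate,
    // then waits for every worker to finish. Returns how many workers did their work.
    static int run(int workers) {
        CountDownLatch startSignal = new CountDownLatch(1);       // one event: "go"
        CountDownLatch doneSignal = new CountDownLatch(workers);  // one event per worker
        AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < workers; i++) {
            final int id = i;
            new Thread(() -> {
                try {
                    startSignal.await();          // every worker blocks here until the gate opens
                    System.out.println("Worker " + id + " running");
                    completed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    doneSignal.countDown();       // always count down so the main thread is not stuck
                }
            }).start();
        }

        System.out.println("All workers started, opening the gate");
        startSignal.countDown();                  // count 1 -> 0 releases all waiting workers

        try {
            doneSignal.await();                   // wait until every worker has counted down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Workers completed: " + run(3));
    }
}
```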

A CyclicBarrier enables you to define a synchronization object that suspends threads until a specified number of threads have reached the barrier point; an optional barrier action, which is a Runnable, is then executed. When you create a CyclicBarrier, you specify the number of threads that must reach the barrier and the action to be taken when they do. The barrier is called cyclic because it can be reused once the waiting threads have been released.

CyclicBarrier example
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class ConcurrencyTest4 {

    public static void main(String[] args) {
        CyclicBarrier cb = new CyclicBarrier(3, new BarAction() );

        System.out.println("Starting");

        new MyThread4(cb, "A");
        new MyThread4(cb, "B");
        new MyThread4(cb, "C");
    }
}

// A thread of execution that uses a CyclicBarrier.
class MyThread4 implements Runnable {
    CyclicBarrier cbar;
    String name;

    MyThread4(CyclicBarrier c, String n) {
        cbar = c;
        name = n;
        new Thread(this).start();
    }

    public void run() {
        System.out.println(name);

        try {
            cbar.await();
        } catch (BrokenBarrierException exc) {
            System.out.println(exc);
        } catch (InterruptedException exc) {
            System.out.println(exc);
        }
    }
}

// An object of this class is called when the CyclicBarrier ends.
class BarAction implements Runnable {
    public void run() {
        System.out.println("Barrier Reached!");
    }
}

Output
--------------------
Starting
A
B
C
Barrier Reached!
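Because the barrier resets itself after it trips, the same CyclicBarrier can be reused for several rounds. A sketch with hypothetical names (CyclicReuseSketch, run()), assuming Java 8 or later: three threads meet at the barrier twice, and the barrier action runs once per round.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class CyclicReuseSketch {

    // 'parties' threads each pass the same barrier 'rounds' times.
    // Returns how many times the barrier action ran.
    static int run(int parties, int rounds) {
        AtomicInteger tripped = new AtomicInteger();
        // The barrier action runs once per round, in the last thread to arrive.
        CyclicBarrier barrier = new CyclicBarrier(parties,
                () -> System.out.println("Round " + tripped.incrementAndGet() + " complete"));

        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < parties; p++) {
            final String name = "T" + p;
            Thread t = new Thread(() -> {
                try {
                    for (int r = 1; r <= rounds; r++) {
                        System.out.println(name + " reached the barrier in round " + r);
                        barrier.await();          // the barrier resets itself after each trip
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    System.out.println(name + ": " + e);
                }
            });
            threads.add(t);
            t.start();
        }

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return tripped.get();
    }

    public static void main(String[] args) {
        System.out.println("Barrier tripped " + run(3, 2) + " times");
    }
}
```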

An Exchanger allows two threads to exchange data. It simply waits until two separate threads call its exchange() method; when this occurs, it swaps the data supplied by the two threads, so each thread receives the other's object. This is ideal if you have one thread preparing a buffer for receiving data and another thread that fills the buffer with data.

Exchanger example
import java.util.concurrent.Exchanger;

public class ConcurrencyTest5 {

    public static void main(String[] args) {
        Exchanger<String> exgr = new Exchanger<String>();

        new UseString(exgr);
        new MakeString(exgr);
    }
}

// A Thread that constructs a string.
class MakeString implements Runnable {
    Exchanger<String> ex;
    String str;

    MakeString(Exchanger<String> c) {
        ex = c;
        str = new String();

        new Thread(this).start();
    }

    public void run() {
        char ch = 'A';

        for(int i = 0; i < 3; i++) {

            // Fill Buffer
            for(int j = 0; j < 5; j++)
                str += ch++;

            try {
                // Exchange a full buffer for an empty one.
                str = ex.exchange(str);
            } catch(InterruptedException exc) {
                System.out.println(exc);
            }
        }
    }
}

// A Thread that uses a string.
class UseString implements Runnable {
    Exchanger<String> ex;
    String str;

    UseString(Exchanger<String> c) {
        ex = c;
        new Thread(this).start();
    }

    public void run() {

        for(int i=0; i < 3; i++) {
            try {
                // Exchange an empty buffer for a full one.
                str = ex.exchange(new String());
                System.out.println("Got: " + str);
            } catch(InterruptedException exc) {
                System.out.println(exc);
            }
        }
    }
}

Output
----------------------------------------
Got: ABCDE
Got: FGHIJ
Got: KLMNO
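The same buffer-swapping idea can be sketched with lists and the timed version of exchange(), which throws a TimeoutException if no partner thread turns up within the given time instead of waiting forever. ExchangerBufferSketch and run() are hypothetical names, Java 8 or later is assumed, and the one-second timeout is an arbitrary choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ExchangerBufferSketch {

    // A filler thread fills a list and swaps it for the consumer's empty list, 'rounds' times.
    // Returns every value the consumer received, in order.
    static List<Integer> run(int rounds, int bufferSize) {
        Exchanger<List<Integer>> exchanger = new Exchanger<>();
        List<Integer> received = new ArrayList<>();

        Thread filler = new Thread(() -> {
            List<Integer> fillBuffer = new ArrayList<>();
            int next = 0;
            try {
                for (int r = 0; r < rounds; r++) {
                    while (fillBuffer.size() < bufferSize) {
                        fillBuffer.add(next++);
                    }
                    // Hand over the full list and receive an empty one (give up after 1 second).
                    fillBuffer = exchanger.exchange(fillBuffer, 1, TimeUnit.SECONDS);
                }
            } catch (InterruptedException | TimeoutException e) {
                System.out.println("Filler gave up: " + e);
            }
        });
        filler.start();

        List<Integer> emptyBuffer = new ArrayList<>();
        try {
            for (int r = 0; r < rounds; r++) {
                // Hand over the empty list and receive a full one.
                List<Integer> full = exchanger.exchange(emptyBuffer, 1, TimeUnit.SECONDS);
                received.addAll(full);
                full.clear();             // this list is now ours; empty it for the next swap
                emptyBuffer = full;
            }
            filler.join();
        } catch (InterruptedException | TimeoutException e) {
            System.out.println("Consumer gave up: " + e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println("Received: " + run(3, 5));
    }
}
```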

A Phaser enables the synchronization of threads that represent one or more phases of activity. For example, say you have three phases in booking a ticket at a cinema: first you get the movie information, second you take the payment and third you email the tickets, with each of these phases handled by threads. A Phaser is a bit like a CyclicBarrier, but the threads it synchronizes are called parties, and parties can register and deregister as it advances through its phases. You can construct a Phaser with a registration count of zero, or with a given number of parties. First you create a Phaser, then register one or more parties, either by calling the register() method or by specifying the number of parties in the constructor. For each phase, the Phaser waits until all registered parties have completed that phase; a party signals this by calling one of the methods arrive(), arriveAndAwaitAdvance() or arriveAndDeregister().

Phaser example
import java.util.concurrent.Phaser;

public class ConcurrencyTest6 {

    public static void main(String[] args) {

        Phaser phsr = new Phaser(1);
        int curPhase;

        System.out.println("Starting");

        new MyThread6(phsr, "A");
        new MyThread6(phsr, "B");
        new MyThread6(phsr, "C");

        // Wait for all threads to complete phase one.
        curPhase = phsr.getPhase();
        phsr.arriveAndAwaitAdvance();
        System.out.println("Phase " + curPhase + " Complete");

        // Wait for all threads to complete phase two.
        curPhase =  phsr.getPhase();
        phsr.arriveAndAwaitAdvance();
        System.out.println("Phase " + curPhase + " Complete");

        curPhase =  phsr.getPhase();
        phsr.arriveAndAwaitAdvance();
        System.out.println("Phase " + curPhase + " Complete");

        // Deregister the main thread.
        phsr.arriveAndDeregister();

        if(phsr.isTerminated())
            System.out.println("The Phaser is terminated");
    }
}

// A thread of execution that uses a Phaser.
class MyThread6 implements Runnable {
    Phaser phsr;
    String name;

    MyThread6(Phaser p, String n) {
        phsr = p;
        name = n;
        phsr.register();
        new Thread(this).start();
    }

    public void run() {

        System.out.println("Thread " + name + " Beginning Phase One");
        phsr.arriveAndAwaitAdvance(); // Signal arrival.

        // Pause a bit to prevent jumbled output. This is for illustration
        // only. It is not required for the proper operation of the phaser.
        try {
            Thread.sleep(10);
        } catch(InterruptedException e) {
            System.out.println(e);
        }

        System.out.println("Thread " + name + " Beginning Phase Two");
        phsr.arriveAndAwaitAdvance(); // Signal arrival.

        // Pause a bit to prevent jumbled output. This is for illustration
        // only. It is not required for the proper operation of the phaser.
        try {
            Thread.sleep(10);
        } catch(InterruptedException e) {
            System.out.println(e);
        }

        System.out.println("Thread " + name + " Beginning Phase Three");
        phsr.arriveAndDeregister(); // Signal arrival and deregister.
    }
}

Output
---------------------------
Starting
Thread C Beginning Phase One
Thread A Beginning Phase One
Thread B Beginning Phase One
Phase 0 Complete
Thread A Beginning Phase Two
Thread C Beginning Phase Two
Thread B Beginning Phase Two
Phase 1 Complete
Thread C Beginning Phase Three
Thread A Beginning Phase Three
Thread B Beginning Phase Three
Phase 2 Complete
The Phaser is terminated

Phaser example 2
import java.util.concurrent.Phaser;

public class ConcurrencyTest7 {

    public static void main(String[] args) {
        MyPhaser phsr = new MyPhaser(1, 4);     // parties = 1, phaseCount = 4

        System.out.println("Starting\n");

        new MyThread7(phsr, "A");
        new MyThread7(phsr, "B");
        new MyThread7(phsr, "C");

        // Wait for the specified number of phases to complete.
        while(!phsr.isTerminated()) {
            phsr.arriveAndAwaitAdvance();
        }

        System.out.println("The Phaser is terminated");
    }
}

// Extend Phaser to allow only a specific number of phases
// to be executed.
class MyPhaser extends Phaser {
    int numPhases;

    MyPhaser(int parties, int phaseCount) {
        super(parties);
        numPhases = phaseCount - 1;
    }

    // Override onAdvance() to execute the specified
    // number of phases.
    protected boolean onAdvance(int p, int regParties) {
        // This println() statement is for illustration only.
        // Normally, onAdvance() will not display output.
        System.out.println("Phase " + p + " completed.\n");

        // If all phases have completed, return true
        if(p == numPhases || regParties == 0) return true;

        // Otherwise, return false.
        return false;
    }
}

// A thread of execution that uses a Phaser.
class MyThread7 implements Runnable {
    Phaser phsr;
    String name;

    MyThread7(Phaser p, String n) {
        phsr = p;
        name = n;
        phsr.register();
        new Thread(this).start();
    }

    public void run() {

        while(!phsr.isTerminated()) {
            System.out.println("Thread " + name + " Beginning Phase " +
                    phsr.getPhase());

            phsr.arriveAndAwaitAdvance();

            // Pause a bit to prevent jumbled output. This is for illustration
            // only. It is not required for the proper operation of the phaser.
            try {
                Thread.sleep(10);
            } catch(InterruptedException e) {
                System.out.println(e);
            }
        }

    }
}

Output
--------------------------------------------
Starting

Thread C Beginning Phase 0
Thread A Beginning Phase 0
Thread B Beginning Phase 0
Phase 0 completed.

Thread B Beginning Phase 1
Thread A Beginning Phase 1
Thread C Beginning Phase 1
Phase 1 completed.

Thread B Beginning Phase 2
Thread A Beginning Phase 2
Thread C Beginning Phase 2
Phase 2 completed.

Thread B Beginning Phase 3
Thread A Beginning Phase 3
Thread C Beginning Phase 3
Phase 3 completed.

The Phaser is terminated
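The cinema booking scenario mentioned earlier can be sketched with a Phaser whose onAdvance() stops it after the last phase. This is an illustration only: the phase names, CinemaPhaserSketch and run() are hypothetical, Java 8 or later is assumed, and each booking thread simply prints the step it is on rather than doing real work. All parties are registered through the constructor before any thread starts, so no phase can advance before every booking has joined.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

class CinemaPhaserSketch {

    // Hypothetical phase names for the cinema booking scenario.
    static final String[] PHASES = {"get movie information", "take payment", "email tickets"};

    // Runs one thread per booking through all phases in lock-step.
    // Returns how many phases the Phaser completed.
    static int run(int bookings) {
        AtomicInteger phasesDone = new AtomicInteger();

        // Register every booking thread up front via the constructor.
        Phaser phaser = new Phaser(bookings) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // Called once per phase, after every registered party has arrived.
                phasesDone.incrementAndGet();
                System.out.println("Phase " + phase + " (" + PHASES[phase] + ") complete");
                // Returning true terminates the Phaser; do so after the last phase.
                return phase == PHASES.length - 1 || registeredParties == 0;
            }
        };

        List<Thread> threads = new ArrayList<>();
        for (int b = 0; b < bookings; b++) {
            final String name = "Booking " + b;
            Thread t = new Thread(() -> {
                for (String step : PHASES) {
                    System.out.println(name + ": " + step);
                    phaser.arriveAndAwaitAdvance();   // wait until all bookings finish this step
                }
            });
            threads.add(t);
            t.start();
        }

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        System.out.println("Terminated: " + phaser.isTerminated());
        return phasesDone.get();
    }

    public static void main(String[] args) {
        System.out.println("Phases completed: " + run(3));
    }
}
```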

Using an Executor

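An Executor initiates and controls the execution of tasks, offering an alternative to managing threads yourself through the Thread class. The Executor interface defines a single method, execute(Runnable command), which runs the given Runnable. ExecutorService extends Executor with methods for submitting tasks and managing the executor's life cycle, such as submit() and shutdown(), and the Executors class provides factory methods such as newFixedThreadPool() that create thread pools.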

Executor example
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConcurrencyTest8 {

    public static void main(String[] args) {
        CountDownLatch cdl = new CountDownLatch(5);
        CountDownLatch cdl2 = new CountDownLatch(5);
        CountDownLatch cdl3 = new CountDownLatch(5);
        CountDownLatch cdl4 = new CountDownLatch(5);
        ExecutorService es = Executors.newFixedThreadPool(2);

        System.out.println("Starting");

        // Submit the tasks; the pool of 2 threads runs them.
        es.execute(new MyThread8(cdl, "A"));
        es.execute(new MyThread8(cdl2, "B"));
        es.execute(new MyThread8(cdl3, "C"));
        es.execute(new MyThread8(cdl4, "D"));

        try {
            cdl.await();
            cdl2.await();
            cdl3.await();
            cdl4.await();
        } catch (InterruptedException exc) {
            System.out.println(exc);
        }

        es.shutdown();
        System.out.println("Done");
    }
}

class MyThread8 implements Runnable {
    String name;
    CountDownLatch latch;

    MyThread8(CountDownLatch c, String n) {
        latch = c;
        name = n;
        // No Thread is created here: the ExecutorService supplies the threads.
    }

    public void run() {
        for(int i = 0; i < 5; i++) {
            System.out.println(name + ": " + i);
            latch.countDown();
        }
    }
}
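Using one CountDownLatch per task works, but an alternative is to let the ExecutorService itself tell you when the work is done: call shutdown(), which stops new tasks from being accepted while letting already-submitted ones finish, then wait with awaitTermination(). A minimal sketch, assuming Java 8 or later; ExecutorShutdownSketch and run() are hypothetical names, and the 10-second timeout is an arbitrary choice.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorShutdownSketch {

    // Runs 'tasks' Runnables on a pool of 2 threads and waits for them with
    // shutdown() + awaitTermination() instead of one CountDownLatch per task.
    static int run(int tasks) {
        ExecutorService es = Executors.newFixedThreadPool(2);
        AtomicInteger linesPrinted = new AtomicInteger();

        for (int t = 0; t < tasks; t++) {
            final String name = String.valueOf((char) ('A' + t));   // task names A, B, C, ...
            es.execute(() -> {
                for (int i = 0; i < 5; i++) {
                    System.out.println(name + ": " + i);
                    linesPrinted.incrementAndGet();
                }
            });
        }

        es.shutdown();   // accept no new tasks; already-submitted tasks still run
        try {
            if (!es.awaitTermination(10, TimeUnit.SECONDS)) {
                es.shutdownNow();   // give up on anything still running after the timeout
            }
        } catch (InterruptedException e) {
            es.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return linesPrinted.get();
    }

    public static void main(String[] args) {
        System.out.println("Lines printed: " + run(4));
        System.out.println("Done");
    }
}
```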

Callable and Future

A Callable represents a task that returns a value: you can use Callable objects to compute a result and then return it to the invoking thread. This is a powerful mechanism because it can be used in numerical computations in which partial results are computed simultaneously; you can also use it to return a status code indicating whether the task was successful. A Future represents the value that will be returned by a Callable object; the name reflects the fact that the value is obtained at some point in the future. You submit a Callable to an ExecutorService with submit(), which returns a Future, and call the Future's get() method to wait for the result.

The TimeUnit enumeration is used to specify the granularity of timing. For example, Future's get() method has a version that takes a timeout value and a TimeUnit, which the example below uses with TimeUnit.SECONDS.

Callable and Future example
import java.util.concurrent.*;

public class ConcurrencyTest9 {

    public static void main(String[] args) {

        ExecutorService es = Executors.newFixedThreadPool(3);
        Future<Integer> f = null;
        Future<Double> f2;
        Future<Integer> f3;

        System.out.println("Starting");

        f = es.submit(new Sum(10));
        f2 = es.submit(new Hypot(3, 4));
        f3 = es.submit(new Factorial(5));

        try {
            System.out.println(f.get(10, TimeUnit.SECONDS));    // Can use TimeUnit
            System.out.println(f2.get());
            System.out.println(f3.get());
        } catch (InterruptedException exc) {
            System.out.println(exc);
        }
        catch (ExecutionException exc) {
            System.out.println(exc);
        }
        catch (TimeoutException exc) {    // f.get() waited longer than 10 seconds
            System.out.println(exc);
        }

        es.shutdown();
        System.out.println("Done");
    }
}

// Following are three computational tasks.
class Sum implements Callable<Integer> {
    int stop;

    Sum(int v) { stop = v; }

    public Integer call() {
        int sum = 0;
        for(int i = 1; i <= stop; i++) {
            sum += i;
        }
        return sum;
    }
}

class Hypot implements Callable<Double> {
    double side1, side2;

    Hypot(double s1, double s2) {
        side1 = s1;
        side2 = s2;
    }

    public Double call() {
        return Math.sqrt((side1*side1) + (side2*side2));
    }
}

class Factorial implements Callable<Integer> {
    int stop;

    Factorial(int v) { stop = v; }

    public Integer call() {
        int fact = 1;
        for(int i = 2; i <= stop; i++) {
            fact *= i;
        }
        return fact;
    }
}

Output
------------------------------
Starting
55
5.0
120
Done
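The timed get() can also be used to give up on a slow task. In this sketch (FutureTimeoutSketch and run() are hypothetical names, Java 8 or later is assumed, and the 5-second sleep stands in for a long computation) the caller waits only briefly, catches the TimeoutException and then calls cancel(true), which interrupts the thread running the task.

```java
import java.util.concurrent.*;

class FutureTimeoutSketch {

    // Submits a deliberately slow Callable, waits only 'waitMillis' for its result,
    // and cancels it on timeout. Returns true if the timeout occurred.
    static boolean run(long waitMillis) {
        ExecutorService es = Executors.newSingleThreadExecutor();
        Callable<Integer> slow = () -> {
            Thread.sleep(5_000);       // stands in for a long computation
            return 42;
        };
        Future<Integer> f = es.submit(slow);
        boolean timedOut = false;
        try {
            System.out.println("Result: " + f.get(waitMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            timedOut = true;
            f.cancel(true);            // true = interrupt the worker thread running the task
            System.out.println("Timed out; cancelled = " + f.isCancelled());
        } catch (InterruptedException | ExecutionException e) {
            System.out.println(e);
        } finally {
            es.shutdown();
        }
        return timedOut;
    }

    public static void main(String[] args) {
        System.out.println("Timed out: " + run(100));
    }
}
```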

Concurrent Collections

I have covered the Collections Framework in another section; however, there are a number of collections that have been engineered for concurrent operations. They offer similar features to the standard collections but can be used safely in multithreaded environments.

Concurrent Collections
  • ArrayBlockingQueue
  • ConcurrentHashMap
  • ConcurrentLinkedDeque
  • ConcurrentLinkedQueue
  • ConcurrentSkipListMap
  • ConcurrentSkipListSet
  • CopyOnWriteArrayList
  • CopyOnWriteArraySet
  • DelayQueue
  • LinkedBlockingDeque
  • LinkedBlockingQueue
  • LinkedTransferQueue
  • PriorityBlockingQueue
  • SynchronousQueue
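As an illustration of one of these collections, the sketch below has several threads count words into a shared ConcurrentHashMap without any explicit locking. It assumes Java 8 or later for merge() and the lambdas; ConcurrentMapSketch and run() are hypothetical names. With a plain HashMap the same code could lose updates or corrupt the map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ConcurrentMapSketch {

    // Several threads count the same words into one shared map, with no explicit locking.
    static Map<String, Integer> run(int threadCount) {
        ConcurrentHashMap<String, Integer> counts = new ConcurrentHashMap<>();
        String[] words = {"apple", "pear", "apple", "plum", "apple"};
        List<Thread> threads = new ArrayList<>();

        for (int t = 0; t < threadCount; t++) {
            Thread th = new Thread(() -> {
                for (String w : words) {
                    // merge() performs the read-modify-write for this key atomically.
                    counts.merge(w, 1, Integer::sum);
                }
            });
            threads.add(th);
            th.start();
        }

        for (Thread th : threads) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(run(4));
    }
}
```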

Locks

Java provides support for locks (ReentrantLock being one of them), which are objects that offer an alternative to using synchronized to gain access to a shared resource. A lock protects a shared resource: acquiring the lock gives you access to the resource, and other threads that try to acquire it are suspended until the lock becomes available. Two methods, lock() and unlock(), are used to control access to the resource, and you can also use the tryLock() method, which acquires the lock only if it is available and returns false instead of waiting if it is not. The downside is that you may forget to call unlock(), or an exception may be thrown before the unlock is reached, which leaves the resource locked forever. To get around this problem, always call unlock() in a finally block (try/finally or try/catch/finally), so the resource is always unlocked. There is also the lockInterruptibly() method, which acquires the lock unless the waiting thread is interrupted, in which case it throws an InterruptedException.

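If you require a read/write lock, you can use ReentrantReadWriteLock. Multiple threads can hold the read lock at the same time, but while a thread holds the write lock no readers can gain access, and the write lock can only be obtained when no other thread holds the read or write lock. Used consistently, this protects the data from race conditions while still allowing concurrent reads.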

lock example
import java.util.concurrent.locks.ReentrantLock;

public class ConcurrencyTest10 {

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();

        new LockThread(lock, "A");
        new LockThread(lock, "B");
    }
}

// A static shared resource.
class Shared10 {
    static int count = 0;
}

// A thread of execution that increments count.
class LockThread implements Runnable {
    String name;
    ReentrantLock lock;

    LockThread(ReentrantLock lk, String n) {
        lock = lk;
        name = n;
        new Thread(this).start();
    }

    public void run() {

        System.out.println("Starting " + name);

        try {
            // First, lock count.
            System.out.println(name + " is waiting to lock count.");
            lock.lock();
            System.out.println(name + " is locking count.");

            Shared10.count++;
            System.out.println(name + ": " + Shared10.count);

            // Now, allow a context switch -- if possible.
            System.out.println(name + " is sleeping.");
            Thread.sleep(1000);
        // you could use lockInterruptibly() and catch an InterruptedException
        } catch (InterruptedException exc) {
            System.out.println("Perform some cleanup: " + exc);
        } finally {
            // Always use a finally block to unlock any locks
            System.out.println(name + " is unlocking count.");
            lock.unlock();
        }
    }
}

Output
---------------------------------
Starting A
Starting B
A is waiting to lock count.
B is waiting to lock count.
B is locking count.
B: 1
B is sleeping.
B is unlocking count.
A is locking count.
A: 2
A is sleeping.
A is unlocking count.
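The example above uses a single ReentrantLock. For the read/write case described earlier, here is a minimal sketch with ReentrantReadWriteLock, assuming Java 8 or later; RwCounterSketch and RwCounter are hypothetical names. Several writer threads increment a shared value under the write lock, while get() uses the read lock, so many readers could read at the same time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class RwCounterSketch {

    // Starts 'writers' threads that each call increment() 'times' times, then reads the total.
    static int run(int writers, int times) {
        RwCounter counter = new RwCounter();
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < writers; w++) {
            Thread t = new Thread(() -> {
                for (int i = 0; i < times; i++) {
                    counter.increment();
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("Total: " + run(4, 1000));
    }
}

// Hypothetical shared counter guarded by a read/write lock.
class RwCounter {
    private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
    private int value;

    int get() {
        rw.readLock().lock();          // shared: many readers may hold the read lock together
        try {
            return value;
        } finally {
            rw.readLock().unlock();
        }
    }

    void increment() {
        rw.writeLock().lock();         // exclusive: waits until no other thread holds either lock
        try {
            value++;
        } finally {
            rw.writeLock().unlock();
        }
    }
}
```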

Atomic Operations

An atomic operation is an operation, or a set of operations, that appears to the rest of the system to occur at once: it must appear as a single step (all or nothing).

Another alternative to synchronization is the java.util.concurrent.atomic package, which allows you to access shared data without using synchronized blocks or locks. It provides classes such as AtomicInteger, AtomicLong and AtomicReference, and later Java releases have added more features to the package, so it is best to look up what is on offer. There are a number of methods that you can use, such as get(), set(), getAndSet(), incrementAndGet() and compareAndSet().

Atomic example
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentTest11 {

    public static void main(String[] args) {
        new AtomThread("A");
        new AtomThread("B");
        new AtomThread("C");
    }
}

class Shared11 {
    static AtomicInteger ai = new AtomicInteger(0);
}

// A thread of execution that repeatedly swaps in a new value with getAndSet().
class AtomThread implements Runnable {
    String name;

    AtomThread(String n) {
        name = n;
        new Thread(this).start();
    }

    public void run() {

        System.out.println("Starting " + name);

        for(int i=1; i <= 3; i++)
            System.out.println(name + " got: " +
                    Shared11.ai.getAndSet(i));
    }
}

Output
--------------------------
Starting A
Starting C
Starting B
C got: 1
B got: 1
A got: 0
B got: 2
C got: 1
B got: 2
A got: 2
C got: 3
A got: 3
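Because getAndSet() simply swaps values, the example above does not show the most common use of the atomic classes: updating a shared counter from many threads without locks. A minimal sketch, assuming Java 8 or later, with AtomicCounterSketch, run() and casDemo() as hypothetical names: incrementAndGet() performs the read-modify-write as a single atomic step, and compareAndSet() only changes the value if it still holds the expected value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounterSketch {

    static final AtomicInteger counter = new AtomicInteger(0);

    // Each of 'threadCount' threads increments the shared counter 'perThread' times, with no locks.
    static int run(int threadCount, int perThread) {
        counter.set(0);
        List<Thread> threads = new ArrayList<>();
        for (int t = 0; t < threadCount; t++) {
            Thread th = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.incrementAndGet();   // atomic read-modify-write, so no update is lost
                }
            });
            threads.add(th);
            th.start();
        }
        for (Thread th : threads) {
            try {
                th.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }

    // compareAndSet() changes the value only if it still equals the expected value.
    static boolean casDemo() {
        AtomicInteger ai = new AtomicInteger(5);
        boolean first = ai.compareAndSet(5, 10);   // succeeds: value was 5, becomes 10
        boolean second = ai.compareAndSet(5, 20);  // fails: value is now 10, not 5
        return first && !second && ai.get() == 10;
    }

    public static void main(String[] args) {
        System.out.println("Counter: " + run(4, 10_000));
        System.out.println("compareAndSet behaved as expected: " + casDemo());
    }
}
```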

Fork/Join Framework

Lastly, I want to cover the Fork/Join Framework (added in Java 7). This is an advanced feature for truly parallel processing on multi-core servers (not time-slicing): it simplifies the creation and use of multiple threads, and by default it makes use of all available CPU cores. This is a big advantage when using cloud technologies, where systems can grow and shrink automatically depending on load. Ideal applications include sorting, transforming or searching large amounts of data, where multiple threads work on different cores simultaneously. You can also specify the level of parallelism you want when you construct the pool.

The Fork/Join Framework includes a number of main classes as seen below

Fork/Join Classes
  • ForkJoinTask - an abstract class that defines a task
  • ForkJoinPool - manages the execution of ForkJoinTasks
  • RecursiveAction - used for tasks that do not return values
  • RecursiveTask - used for tasks that do return values

The Fork/Join Framework is ideal for divide-and-conquer strategies because of its recursive nature, hence the classes RecursiveAction and RecursiveTask. In the simplest terms, you first create the tasks and then create the pool that will execute them. I have given you a couple of demos, but you can also cancel a task, reinitialize a task so it can be run again, determine a task's completion status and execute a task asynchronously; I point you to the Java documentation for the remainder of the features the Fork/Join Framework offers.
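As a sketch of a RecursiveTask (a task that returns a value) together with an explicit level of parallelism, the code below sums an array by splitting it in half until each piece is at or below a threshold. ForkJoinSumSketch, SumTask and the threshold of 1,000 are assumptions for illustration, and Java 7 or later is assumed. It uses the common idiom of forking one half, computing the other half in the current thread and then joining.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ForkJoinSumSketch {

    // Sums data[start, end) by splitting the range until it is small enough.
    static class SumTask extends RecursiveTask<Long> {
        static final int THRESHOLD = 1_000;   // arbitrary; tune by profiling
        final long[] data;
        final int start, end;

        SumTask(long[] data, int start, int end) {
            this.data = data;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            if (end - start <= THRESHOLD) {
                long sum = 0;
                for (int i = start; i < end; i++) sum += data[i];
                return sum;                   // small enough: sum sequentially
            }
            int middle = (start + end) >>> 1;
            SumTask left = new SumTask(data, start, middle);
            SumTask right = new SumTask(data, middle, end);
            left.fork();                      // run the left half asynchronously
            long rightSum = right.compute();  // work on the right half in this thread
            return left.join() + rightSum;    // wait for the left half's result
        }
    }

    static long run(int size, int parallelism) {
        long[] data = new long[size];
        for (int i = 0; i < size; i++) data[i] = i;
        ForkJoinPool pool = new ForkJoinPool(parallelism);   // level of parallelism set here
        try {
            return pool.invoke(new SumTask(data, 0, size));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum: " + run(100_000, 4));
    }
}
```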

Fork/Join simple example
package uk.co.datadisk;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

class SimpleRecursiveAction extends RecursiveAction {

    private int simulatedWork;

    public SimpleRecursiveAction(int simulatedWork) {
        this.simulatedWork = simulatedWork;
    }

    @Override
    protected void compute() {

        if( simulatedWork > 100 ) {

            System.out.println("Parallel execution and split the tasks..." + simulatedWork);

            SimpleRecursiveAction simpleRecursiveAction1 = new SimpleRecursiveAction(simulatedWork/2);
            SimpleRecursiveAction simpleRecursiveAction2 = new SimpleRecursiveAction(simulatedWork/2);

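            simpleRecursiveAction1.fork();
            simpleRecursiveAction2.fork();

            // Wait for both subtasks to finish; without join() the root task can
            // complete (and main() return) before the forked subtasks have run.
            simpleRecursiveAction1.join();
            simpleRecursiveAction2.join();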

        } else {
            System.out.println("No need for parallel execution, sequential is OK for this task..." + simulatedWork);
        }
    }
}

public class ForkJoin1 {

    public static void main(String[] args) {

        ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        SimpleRecursiveAction simpleRecursiveAction = new SimpleRecursiveAction(400);
        forkJoinPool.invoke(simpleRecursiveAction);
    }
}

Fork/Join example
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ConcurrencyTest12 {

    public static void main(String[] args) {

        // Create a task pool.
        ForkJoinPool fjp = new ForkJoinPool();

        double[] nums = new double[100000];

        // Give nums some values. 
        for(int i = 0; i < nums.length; i++)
            nums[i] = (double) i;

        System.out.println("A portion of the original sequence:");

        for(int i=0; i < 10; i++)
            System.out.print(nums[i] + " ");
        System.out.println("\n");

        SqrtTransform task = new SqrtTransform(nums, 0, nums.length);

        // Start the main ForkJoinTask.
        fjp.invoke(task);

        System.out.println("A portion of the transformed sequence (to four decimal places):");
        for(int i=0; i < 10; i++)
            System.out.format("%.4f ", nums[i]);
        System.out.println();
    }
}

// A ForkJoinTask (via RecursiveAction) that transforms
// the elements in an array of doubles into their square roots.
class SqrtTransform extends RecursiveAction {
    // The threshold value is arbitrarily set at 1,000 in this example.
    // In real world code, its optimal value can be determined by
    // profiling and experimentation.
    final int seqThreshold = 1000;

    // Array to be accessed.
    double[] data;

    // Determines what part of data to process.
    int start, end;

    SqrtTransform(double[] vals, int s, int e ) {
        data = vals;
        start = s;
        end = e;
    }

    // This is the method in which parallel computation will occur.
    protected void compute() {

        // If number of elements is below the sequential threshold,
        // then process sequentially.
        if((end - start) < seqThreshold) {
            // Transform each element into its square root.
            for(int i = start; i < end; i++) {
                data[i] = Math.sqrt(data[i]);
            }
        }
        else {
            // Otherwise, continue to break the data into smaller pieces.

            // Find the midpoint.
            int middle = (start + end) / 2;

            // Invoke new tasks, using the subdivided data.
            invokeAll(new SqrtTransform(data, start, middle),
                    new SqrtTransform(data, middle, end));
        }
    }
}

Output
-----------------------------------
A portion of the original sequence:
0.0 1.0 2.0 3.0 4.0 5.0 6.0 7.0 8.0 9.0 

A portion of the transformed sequence (to four decimal places):
0.0000 1.0000 1.4142 1.7321 2.0000 2.2361 2.4495 2.6458 2.8284 3.0000 
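The SqrtTransform comments say that the best threshold is found by profiling and experimentation. A rough starting point for that experiment is sketched below. ThresholdExperiment is a hypothetical harness that reruns an equivalent square-root transform (SqrtTask, also a made-up name) with several thresholds. It times each run with System.nanoTime() and checks the result. The threshold must be at least 2. With a threshold of 1, a one-element range would never be below it and would keep splitting forever. Single timings like these are noisy and include JIT warm-up, so treat them as a first indication rather than a benchmark.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical harness: runs a square-root transform with several
// sequential thresholds, times each run and checks the result.
class ThresholdExperiment {

    // Fills a fresh array with 0..n-1, transforms it with the given threshold and returns it.
    static double[] transform(int n, int threshold, ForkJoinPool pool) {
        if (threshold < 2)
            throw new IllegalArgumentException("threshold must be at least 2");
        double[] data = new double[n];
        for (int i = 0; i < n; i++) data[i] = i;
        pool.invoke(new SqrtTask(data, 0, n, threshold));
        return data;
    }

    // True if every element equals the square root of its index.
    static boolean isCorrect(double[] data) {
        for (int i = 0; i < data.length; i++)
            if (data[i] != Math.sqrt(i)) return false;
        return true;
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        int[] thresholds = { 100, 1_000, 10_000, 100_000 };
        for (int t : thresholds) {
            long start = System.nanoTime();
            double[] result = transform(1_000_000, t, pool);
            long micros = (System.nanoTime() - start) / 1_000;
            System.out.println("threshold " + t + ": " + micros + " us, correct=" + isCorrect(result));
        }
        pool.shutdown();
    }
}

// Same logic as SqrtTransform, but with the threshold passed in.
class SqrtTask extends RecursiveAction {
    private final double[] data;
    private final int start, end, threshold;

    SqrtTask(double[] data, int start, int end, int threshold) {
        this.data = data;
        this.start = start;
        this.end = end;
        this.threshold = threshold;
    }

    @Override
    protected void compute() {
        if (end - start < threshold) {
            for (int i = start; i < end; i++) data[i] = Math.sqrt(data[i]);
        } else {
            int middle = (start + end) >>> 1;   // unsigned shift avoids int overflow
            invokeAll(new SqrtTask(data, start, middle, threshold),
                      new SqrtTask(data, middle, end, threshold));
        }
    }
}
```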
Fork/Join example
package uk.co.datadisk;

import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class MaximumFindTask extends RecursiveTask<Long> {

    private long[] nums;
    private int lowIndex;
    private int highIndex;

    public MaximumFindTask(long[] nums, int lowIndex, int highIndex) {
        this.highIndex = highIndex;
        this.lowIndex = lowIndex;
        this.nums = nums;
    }

    @Override
    protected Long compute() {

        if( highIndex - lowIndex < ForkJoin2.THREASHOLD ) {
            return sequentialMaxFinding();
        } else {

            int middleIndex = (lowIndex+highIndex) / 2;

            MaximumFindTask leftSubtask = new MaximumFindTask(nums, lowIndex, middleIndex);
            MaximumFindTask rightSubtask = new MaximumFindTask(nums, middleIndex, highIndex);

            invokeAll(leftSubtask, rightSubtask);

            return Math.max(leftSubtask.join(), rightSubtask.join());
        }
    }

    private long sequentialMaxFinding() {

        // Start from the first element of this task's range, not nums[0].
        long max = nums[lowIndex];

        for(int i=lowIndex; i < highIndex;++i)
            if( nums[i] > max)
                max = nums[i];

        return max;
    }
}

class SequentialMaxFind {

    public long sequentialMaxFinding(long[] nums, int highIndex) {

        long max = nums[0];

        for (int i = 0; i < highIndex; ++i)
            if (nums[i] > max)
                max = nums[i];

        return max;
    }
}

public class ForkJoin2 {

    public static int THREASHOLD = 0;

    public static void main(String[] args) {

        long[] nums = initializeNums();
        THREASHOLD =  nums.length / Runtime.getRuntime().availableProcessors();

        SequentialMaxFind normalMaxFind = new SequentialMaxFind();

        long start = System.currentTimeMillis();
        System.out.println("Max: " + normalMaxFind.sequentialMaxFinding(nums, nums.length));
        System.out.println("Time taken: " + (System.currentTimeMillis() - start) + "ms");

        System.out.println();

        ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
        MaximumFindTask findTask = new MaximumFindTask(nums, 0, nums.length);

        start = System.currentTimeMillis();
        System.out.println("Max: " + forkJoinPool.invoke(findTask));
        System.out.println("Time taken: " + (System.currentTimeMillis() - start) + "ms");
    }

    private static long[] initializeNums() {

        Random random = new Random();

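        // 300 million longs need roughly 2.4 GB of heap, so you may need to run
        // with a larger heap (for example -Xmx4g) or reduce the array size.
        long[] nums = new long[300000000];

        for(int i = 0; i < nums.length; ++i)
            nums[i] = random.nextInt(100);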

        return nums;
    }
}
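The RecursiveTask Javadoc's Fibonacci example forks one subtask and calls compute() on the other directly. This keeps the current worker thread doing useful work instead of only waiting in join(). The sketch below applies that idiom to the maximum search. MaxTask and MaxFindDemo are hypothetical names, and the threshold is passed in rather than read from a static field. Each leaf starts its scan from its own range. Timing uses System.nanoTime(), which is intended for measuring elapsed time. The array is 10 million elements, so it should fit in a default heap.

```java
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo of the "fork one half, compute the other" idiom.
class MaxFindDemo {

    // Maximum of a non-empty array; threshold must be at least 2 so every leaf range is non-empty.
    static long parallelMax(long[] nums, int threshold) {
        if (nums.length == 0 || threshold < 2)
            throw new IllegalArgumentException("need a non-empty array and threshold >= 2");
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new MaxTask(nums, 0, nums.length, threshold));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Random random = new Random(42);
        long[] nums = new long[10_000_000];
        for (int i = 0; i < nums.length; i++) nums[i] = random.nextInt(100);

        // Same idea as ForkJoin2: roughly one chunk per available processor.
        int threshold = Math.max(2, nums.length / Runtime.getRuntime().availableProcessors());
        long start = System.nanoTime();
        long max = parallelMax(nums, threshold);
        long ms = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Max: " + max + " (" + ms + " ms)");
    }
}

class MaxTask extends RecursiveTask<Long> {
    private final long[] nums;
    private final int low, high, threshold;

    MaxTask(long[] nums, int low, int high, int threshold) {
        this.nums = nums;
        this.low = low;
        this.high = high;
        this.threshold = threshold;
    }

    @Override
    protected Long compute() {
        if (high - low < threshold) {
            long max = nums[low];                // start inside this task's range
            for (int i = low + 1; i < high; i++)
                if (nums[i] > max) max = nums[i];
            return max;
        }
        int middle = (low + high) >>> 1;
        MaxTask left = new MaxTask(nums, low, middle, threshold);
        MaxTask right = new MaxTask(nums, middle, high, threshold);
        left.fork();                             // queue the left half for another worker
        long rightMax = right.compute();         // do the right half in this thread
        return Math.max(left.join(), rightMax);  // wait for the left half and combine
    }
}
```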
RecursiveTask and increased parallelism example
import java.util.concurrent.*;

// Demonstrate parallel execution.
class ConcurrencyTest13 {
    public static void main(String args[]) {
        // Create a task pool.
        ForkJoinPool fjp = new ForkJoinPool(4);             // use parallelism of 4

        double[] nums = new double[5000];

        // Initialize nums with values that alternate between
        // positive and negative.
        for(int i=0; i < nums.length; i++)
            nums[i] = (double) (((i%2) == 0) ? i : -i) ;

        Sum task = new Sum(nums, 0, nums.length);

        // Start the ForkJoinTasks.  Notice that in this case,
        // invoke() returns a result.
        double summation = fjp.invoke(task);

        System.out.println("Summation " + summation);
    }
}

// A RecursiveTask that computes the summation of an array of doubles.
class Sum extends RecursiveTask<Double> {

    // The sequential threshold value.
    final int seqThresHold = 500;

    // Array to be accessed.
    double[] data;

    // Determines what part of data to process.
    int start, end;

    Sum(double[] vals, int s, int e ) {
        data = vals;
        start = s;
        end = e;
    }

    // Find the summation of an array of doubles.
    protected Double compute() {
        double sum = 0;

        // If number of elements is below the sequential threshold,
        // then process sequentially.
        if((end - start) < seqThresHold) {
            // Sum the elements.
            for(int i = start; i < end; i++) sum += data[i];
        }
        else {
            // Otherwise, continue to break the data into smaller pieces.

            // Find the midpoint.
            int middle = (start + end) / 2;

            // Invoke new tasks, using the subdivided data.
            Sum subTaskA = new Sum(data, start, middle);
            Sum subTaskB = new Sum(data, middle, end);

            // Start each subtask by forking.
            subTaskA.fork();
            subTaskB.fork();

            // Wait for the subtasks to return, and aggregate the results.
            sum = subTaskA.join() + subTaskB.join();
        }
        // Return the final sum.
        return sum;
    }
}

Output
-----------------------------
Summation -2500.0
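The parallelism value passed to the ForkJoinPool constructor sets the target number of worker threads. It changes how the subtasks are scheduled, not what they compute. The hypothetical ParallelismDemo below runs the same alternating sequence in pools of parallelism 1, 2, 4 and 8. SumTask mirrors the Sum class above, with the same threshold of 500 and the same fork/fork/join/join pattern.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo: the same summation in pools of different parallelism.
class ParallelismDemo {

    // Sums 0, -1, 2, -3, ... (n values) in a pool with the given parallelism.
    static double sumWithParallelism(int parallelism, int n) {
        double[] nums = new double[n];
        for (int i = 0; i < n; i++) nums[i] = (i % 2 == 0) ? i : -i;
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            return pool.invoke(new SumTask(nums, 0, n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (int p = 1; p <= 8; p *= 2)
            System.out.println("parallelism " + p + ": summation " + sumWithParallelism(p, 5000));
    }
}

// Same structure as Sum: split until a range has fewer than 500 elements.
class SumTask extends RecursiveTask<Double> {
    private static final int SEQ_THRESHOLD = 500;
    private final double[] data;
    private final int start, end;

    SumTask(double[] data, int start, int end) {
        this.data = data;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Double compute() {
        if (end - start < SEQ_THRESHOLD) {
            double sum = 0;
            for (int i = start; i < end; i++) sum += data[i];
            return sum;
        }
        int middle = (start + end) >>> 1;
        SumTask subTaskA = new SumTask(data, start, middle);
        SumTask subTaskB = new SumTask(data, middle, end);
        subTaskA.fork();
        subTaskB.fork();
        return subTaskA.join() + subTaskB.join();   // wait for both halves and combine
    }
}
```

Every line reports a summation of -2500.0. Each partial sum is a whole number far below 2^53, so the double additions are exact. The order in which the workers combine the partial sums therefore cannot change the answer.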