Tuesday, August 28, 2012

Concurrency- APIs



Nowadays everyone wants to learn the new Concurrency API, but after a lot of googling I could not find a single place that covers all of its features with examples. So I prepared the following notes; I hope they help you understand the Concurrency API.

The java.util.concurrent package in J2SE 5.0 provides classes and interfaces aiming to simplify the development of concurrent classes and applications by providing high quality implementations of common building blocks used in concurrent applications. The package includes classes optimized for concurrent access, including: 

1) Task scheduling framework: The Executor framework standardizes invocation, scheduling, execution, and control of asynchronous tasks according to a set of execution policies. Tasks can be executed within the submitting thread, in a single background thread (as with events in Swing), in a newly created thread, or in a thread pool. The built-in implementations also offer configurable policies such as queue length limits and saturation policies, which can improve the stability of applications by preventing runaway resource consumption.

2) Concurrent collections: New collection classes have been added, including Queue and BlockingQueue interfaces, as well as high-performance concurrent implementations of Map, List, and Queue.

3) Atomic variables: The java.util.concurrent.atomic package provides classes for atomically manipulating single variables, which can be primitive types or references. The implementations offer higher performance than would be available by using synchronization. This is useful for implementing high-performance concurrent algorithms and for implementing counters and sequence number generators.

4) Synchronizers: General purpose synchronization classes that facilitate coordination between threads have been added including: semaphores, mutexes, barriers, latches, and exchangers

5) Locks: The synchronized keyword can be used for locking, but the built-in monitor has limitations. The java.util.concurrent.locks package provides a high-performance lock implementation and also lets developers specify a timeout when attempting to acquire a lock, use multiple condition variables per lock, acquire and release a lock outside a single lexical block (non-lexically scoped locks), and interrupt threads that are waiting to acquire a lock.

6) Nanosecond-granularity timing: The new java.lang.System.nanoTime() method gives access to a nanosecond-granularity time source for making relative time measurements. Methods that accept timeouts, such as BlockingQueue.offer(), BlockingQueue.poll(), Lock.tryLock(), Condition.await(), and Thread.sleep(), can take timeout values in nanoseconds. Note that the actual precision of nanoTime() is platform-dependent.
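
To make item 1 a bit more concrete, here is a minimal sketch of a pool with a queue length limit and a saturation policy. The class name BoundedPoolDemo, the pool size of 2 and the queue size of 4 are only illustrative choices, not recommendations.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedPoolDemo {

    // Runs taskCount trivial tasks on a small bounded pool and returns how many ran.
    static int runTasks(int taskCount) throws InterruptedException {
        final AtomicInteger completed = new AtomicInteger();

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2,                                        // core and maximum pool size
                0L, TimeUnit.MILLISECONDS,                   // keep-alive above core size (unused here)
                new ArrayBlockingQueue<Runnable>(4),         // queue length limit
                new ThreadPoolExecutor.CallerRunsPolicy());  // saturation policy

        for (int i = 0; i < taskCount; i++) {
            // When both threads are busy and the queue is full,
            // CallerRunsPolicy makes the submitting thread run the task itself.
            pool.execute(new Runnable() {
                public void run() {
                    completed.incrementAndGet();
                }
            });
        }

        pool.shutdown();                              // queued tasks still run
        pool.awaitTermination(10, TimeUnit.SECONDS);  // wait for them to finish
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Completed tasks: " + runTasks(20));
    }
}
```

With CallerRunsPolicy no task is dropped; the submitter is simply slowed down when the pool is saturated. Other policies, such as ThreadPoolExecutor.AbortPolicy, reject the task instead.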


The following sections describe the high-level Java concurrency API in detail. You should already be familiar with the low-level Thread API before moving on; it will help you understand the high-level API.

1) Java Concurrency: Executors and thread pools

In this section we'll learn how to start new threads cleanly and how to manage thread pools. In Java, if you have a Runnable like this:

Runnable runnable = new Runnable(){
   public void run(){
      System.out.println("Run");
   }
};

You can easily run it in a new thread :

new Thread(runnable).start();


This is very simple and clean. But what if you have several long-running tasks that you want to run in parallel and then wait for all of them to complete? That is a little harder to code, and if you also want the return value of every task, it becomes really difficult to keep the code clean.

But as for almost any problem, Java has a solution for you: the Executors class. This simple class allows you to create thread pools and thread factories.

A thread pool is represented by an instance of the ExecutorService interface. With an ExecutorService, you can submit tasks that will be completed in the future. Here are the types of thread pools you can create with the Executors class:

Single Thread Executor : A thread pool with only one thread, so all the submitted tasks are executed sequentially. Method : Executors.newSingleThreadExecutor()

Cached Thread Pool : A thread pool that creates as many threads as it needs to execute the tasks in parallel. Idle threads are reused for new tasks. If a thread is not used for 60 seconds, it is terminated and removed from the pool. Method : Executors.newCachedThreadPool()

Fixed Thread Pool : A thread pool with a fixed number of threads. If no thread is available for a task, the task is put in a queue and waits for another task to end. Method : Executors.newFixedThreadPool(int nThreads)

Scheduled Thread Pool : A thread pool made to schedule future tasks. Method : Executors.newScheduledThreadPool(int corePoolSize)

Single Thread Scheduled Pool : A thread pool with only one thread to schedule future tasks. Method : Executors.newSingleThreadScheduledExecutor()


Once you have a thread pool, you can submit tasks to it using the different submit methods. You can submit a Runnable or a Callable to the thread pool. The method returns a Future representing the pending result of the task. If you submitted a Runnable, the get() method of the Future returns null once the task has finished.
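
Here is a small sketch of the difference between the two submit variants. The class name SubmitDemo and its helper methods are only illustrative.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitDemo {

    // Submits a Runnable and returns what its Future yields (always null).
    static Object resultOfRunnable(ExecutorService pool) throws Exception {
        Future<?> future = pool.submit(new Runnable() {
            public void run() {
                // side effects only, nothing to return
            }
        });
        return future.get(); // blocks until the task has finished
    }

    // Submits a Callable and returns the value computed by call().
    static String resultOfCallable(ExecutorService pool) throws Exception {
        Future<String> future = pool.submit(new Callable<String>() {
            public String call() {
                return "Run";
            }
        });
        return future.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println(resultOfRunnable(pool)); // prints "null"
            System.out.println(resultOfCallable(pool)); // prints "Run"
        } finally {
            pool.shutdown();
        }
    }
}
```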

For example, if you have this Callable:

private final class StringTask implements Callable<String> {
   public String call(){
      //Long operations

      return "Run";
   }
}

If you want to execute that task 10 times using 4 threads, you can use this code:

ExecutorService pool = Executors.newFixedThreadPool(4);

for(int i = 0; i < 10; i++){
   pool.submit(new StringTask());
}


But you must shut down the thread pool in order to terminate all of its threads:

pool.shutdown();

If you don't do that, the JVM may not exit because some threads are still alive. You can also force the shutdown of the pool using shutdownNow(): the currently running tasks are interrupted and the tasks not yet started are not started at all.
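
A common way to combine the two calls is to try an orderly shutdown first and only force it after a timeout. The following is a sketch under that assumption; the helper name shutdownGracefully and the timeout value are made up for the example.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownDemo {

    // Tries an orderly shutdown first and falls back to shutdownNow().
    // Returns true if the pool terminated within the allowed time.
    static boolean shutdownGracefully(ExecutorService pool, long timeoutSeconds)
            throws InterruptedException {
        pool.shutdown(); // no new tasks are accepted, queued tasks still run
        if (pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
            return true;
        }
        // Interrupts running tasks and returns the tasks that never started
        List<Runnable> neverStarted = pool.shutdownNow();
        System.out.println("Cancelled " + neverStarted.size() + " queued tasks");
        return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(new Runnable() {
            public void run() {
                System.out.println("Working");
            }
        });
        System.out.println("Terminated: " + shutdownGracefully(pool, 5));
    }
}
```

Note that shutdownNow() can only interrupt the running tasks; a task that ignores interruption keeps its thread alive.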

But with that example, you cannot get the result of the task. So let’s get the Future objects of the tasks:

ExecutorService pool = Executors.newFixedThreadPool(4);

List<Future<String>> futures = new ArrayList<Future<String>>(10);

for(int i = 0; i < 10; i++){
   futures.add(pool.submit(new StringTask()));
}

for(Future<String> future : futures){
   //get() blocks until the task is done and may throw
   //InterruptedException or ExecutionException
   String result = future.get();

   //Compute the result
}

pool.shutdown();


But this code is a bit complicated, and it has a disadvantage. If the first task takes a long time to compute and all the other tasks end before it, the current thread cannot process any result before the first task ends. Once again, Java has a solution for you: CompletionService.

A CompletionService makes it easier to wait for the results of tasks submitted to an executor. The implementation is ExecutorCompletionService, which relies on an ExecutorService to do the work. So let's try:

ExecutorService threadPool = Executors.newFixedThreadPool(4);
CompletionService<String> pool = new ExecutorCompletionService<String>(threadPool);

for(int i = 0; i < 10; i++){
   pool.submit(new StringTask());
}

for(int i = 0; i < 10; i++){
   String result = pool.take().get();

   //Compute the result
}

threadPool.shutdown();

With that, you get the results in the order in which the tasks complete, and you don't have to keep a collection of Future objects.
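
To see the completion order in action, here is a self-contained sketch that submits a slow task before a fast one. The names CompletionOrderDemo and sleepyTask and the sleep durations are assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {

    // A task that sleeps for the given time and then returns its name.
    static Callable<String> sleepyTask(final String name, final long millis) {
        return new Callable<String>() {
            public String call() throws InterruptedException {
                Thread.sleep(millis);
                return name;
            }
        };
    }

    // Submits the slow task first, then collects results as they complete.
    static List<String> collect() throws Exception {
        ExecutorService threadPool = Executors.newFixedThreadPool(2);
        try {
            CompletionService<String> pool =
                    new ExecutorCompletionService<String>(threadPool);
            pool.submit(sleepyTask("slow", 500));
            pool.submit(sleepyTask("fast", 10));

            List<String> results = new ArrayList<String>();
            for (int i = 0; i < 2; i++) {
                results.add(pool.take().get()); // take() waits for the next finished task
            }
            return results;
        } finally {
            threadPool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(collect());
    }
}
```

Because both tasks run in parallel, the fast task normally comes back first even though it was submitted second.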

Here we are: you now have the tools to launch tasks in parallel using efficient thread pools. Using Executors, ExecutorService and CompletionService, you can build complex algorithms out of several tasks. With these tools, it's really easy to change the number of threads running in parallel or to add more tasks without changing a lot of code.


2) Java Concurrency: Concurrent collections

The java.util.concurrent package includes a number of additions to the Java Collections Framework. These are most easily categorized by the collection interfaces provided:

BlockingQueue defines a first-in-first-out data structure that blocks or times out when you attempt to add to a full queue, or retrieve from an empty queue. 

ConcurrentMap is a subinterface of java.util.Map that defines useful atomic operations. These operations remove or replace a key-value pair only if the key is present, or add a key-value pair only if the key is absent. Making these operations atomic helps avoid synchronization. The standard general-purpose implementation of ConcurrentMap is ConcurrentHashMap, which is a concurrent analog of HashMap. 

ConcurrentNavigableMap is a subinterface of ConcurrentMap that supports approximate matches. The standard general-purpose implementation of ConcurrentNavigableMap is ConcurrentSkipListMap, which is a concurrent analog of TreeMap. 

All of these collections help avoid Memory Consistency Errors by defining a happens-before relationship between an operation that adds an object to the collection with subsequent operations that access or remove that object.
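
The three interfaces described above can be sketched as follows. The class and method names are illustrative; the collection classes and their methods come from java.util.concurrent.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;

class ConcurrentCollectionsDemo {

    // BlockingQueue: offer() with a timeout gives up when the queue stays full.
    static boolean offerToFullQueue() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
        queue.put("first");                                      // fills the queue
        return queue.offer("second", 50, TimeUnit.MILLISECONDS); // times out
    }

    // ConcurrentMap: atomic check-then-act operations.
    static int atomicMapOperations() {
        ConcurrentMap<String, Integer> hits = new ConcurrentHashMap<String, Integer>();
        hits.putIfAbsent("page", 1);   // added, the key was absent
        hits.putIfAbsent("page", 100); // ignored, the key is present
        hits.replace("page", 1, 2);    // replaced, the current value is 1
        hits.replace("page", 1, 3);    // ignored, the current value is now 2
        return hits.get("page");
    }

    // ConcurrentNavigableMap: approximate (closest match) lookups.
    static Integer closestKeyAtOrBelow(int key) {
        ConcurrentNavigableMap<Integer, String> map =
                new ConcurrentSkipListMap<Integer, String>();
        map.put(10, "ten");
        map.put(20, "twenty");
        map.put(30, "thirty");
        return map.floorKey(key);      // greatest key <= the argument, or null
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(offerToFullQueue());      // false
        System.out.println(atomicMapOperations());   // 2
        System.out.println(closestKeyAtOrBelow(25)); // 20
    }
}
```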

3) Java Concurrency: Atomic variables
When data (typically a variable) can be accessed by several threads, you must synchronize access to it to ensure visibility and correctness.

For example, if you have a simple counter:

public class Counter {
    private int value;

    public int getValue(){
        return value;
    }

    public int getNextValue(){
        return value++;
    }

    public int getPreviousValue(){
        return value--;
    }
}

This class works really well in a single-threaded environment, but it is not thread-safe: when several threads access the same Counter instance, updates can be lost because value++ is not an atomic operation. You can solve the problem using synchronized at method level:

public class SynchronizedCounter {
    private int value;

    public synchronized int getValue(){
        return value;
    }

    public synchronized int getNextValue(){
        return value++;
    }

    public synchronized int getPreviousValue(){
        return value--;
    }
}

This class now works well. But locking is not a lightweight mechanism and has several disadvantages.

When several threads try to acquire the same lock, one or more of them will be suspended and resumed later. When the critical section is small, this overhead is heavy, especially when the lock is acquired often and there is a lot of contention.

Another disadvantage is that the threads waiting for the lock cannot do anything else while waiting, and if the thread holding the lock is delayed (due to a page fault or the end of its time quantum, for example), the other threads cannot take their turn.

So how do we avoid these disadvantages? We must use non-blocking algorithms. These algorithms don't use blocking mechanisms and are therefore usually more scalable and perform better.

These algorithms use low-level atomic machine instructions to ensure the atomicity of higher-level operations. While locking is a pessimistic approach, we can also use an optimistic technique to develop algorithms.

This time, we detect collisions between threads; when one occurs, the operation fails and we do something else (often retrying the same operation).

Current processors provide several instructions that greatly simplify the implementation of these non-blocking algorithms; the most used today is the compare-and-swap operation (CAS). This operation takes three parameters: the memory address, the expected current value and the new value.

It atomically updates the value at the given memory address if the current value is the expected one; otherwise it does nothing. In both cases, the operation returns the value that was at the address before the operation. So when several threads try to execute the CAS operation at the same time, one thread wins and the others do nothing. The caller can then choose to retry or to do something else. We often use this operation to implement another operation, compare-and-set. It does exactly the same thing as CAS but returns a boolean indicating whether the operation succeeded.
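
As a small illustration of compare-and-set, the sketch below uses AtomicInteger.compareAndSet() from java.util.concurrent.atomic. The class name CompareAndSetDemo and the helper tryUpdate are made up for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CompareAndSetDemo {

    // Tries to move the value from expected to update and reports success.
    static boolean tryUpdate(AtomicInteger value, int expected, int update) {
        return value.compareAndSet(expected, update);
    }

    public static void main(String[] args) {
        AtomicInteger value = new AtomicInteger(5);
        System.out.println(tryUpdate(value, 5, 6)); // true: 5 was the current value
        System.out.println(tryUpdate(value, 5, 7)); // false: the value is now 6
        System.out.println(value.get());            // 6
    }
}
```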

Before Java 5.0, this operation was not directly available to developers, but Java 5.0 added several atomic variable classes (for int, long, boolean and reference values). The int and long versions also support numeric operations. The JVM implements these classes with the best operation the hardware provides: a native CAS instruction where available, or otherwise a fallback that uses a lock. Here are the classes:

AtomicInteger 
AtomicLong 
AtomicBoolean 
AtomicReference

All these classes support compare-and-set (via the compareAndSet() method) and other operations (get(), set() and getAndSet()). Read-modify-write operations such as getAndSet() can be built on top of compareAndSet(). These classes support multi-threaded access and scale better than synchronizing all the operations. Here is how we can rewrite our counter using an AtomicInteger:

public class AtomicCounter {
    private final AtomicInteger value = new AtomicInteger(0);

    public int getValue(){
        return value.get();
    }

    public int getNextValue(){
        return value.incrementAndGet();
    }

    public int getPreviousValue(){
        return value.decrementAndGet();
    }
}


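The incrementAndGet() and decrementAndGet() methods are two of the numeric operations provided by the AtomicLong and AtomicInteger classes. They return the updated value, whereas value++ and value-- in the earlier counters return the previous one; use getAndIncrement() and getAndDecrement() if you need that behaviour. You also have getAndAdd(int delta) and addAndGet(int delta).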
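
The difference between the "getAnd..." and "...AndGet" variants is easy to mix up, so here is a short sketch. The class name AtomicNumericDemo and the starting value 10 are arbitrary.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

class AtomicNumericDemo {

    // Returns the values produced by four numeric operations plus the final value.
    static int[] demo() {
        AtomicInteger value = new AtomicInteger(10);
        int a = value.getAndIncrement(); // returns 10, value becomes 11
        int b = value.incrementAndGet(); // value becomes 12, returns 12
        int c = value.getAndAdd(5);      // returns 12, value becomes 17
        int d = value.addAndGet(-7);     // value becomes 10, returns 10
        return new int[] {a, b, c, d, value.get()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // [10, 12, 12, 10, 10]
    }
}
```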

The AtomicCounter version is thread-safe and usually faster than the synchronized one.
If you only had compareAndSet(), here is how you could implement an increment() method with it:

public void increment(AtomicInteger integer){
    while(true){
        int current = integer.get();
        int next = current + 1;

        if(integer.compareAndSet(current, next)){
            return;
        }
    }
}

This looks complicated, but it is the cost of non-blocking algorithms. When we detect a collision, we retry until the operation succeeds. This is the common pattern for non-blocking algorithms.
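
To check that the retry loop really loses no update, the sketch below runs it from several threads at once. The class name CasIncrementDemo, the thread count and the iteration count are assumptions for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CasIncrementDemo {

    // Same retry loop as above: read, compute, publish only if unchanged.
    static void increment(AtomicInteger integer) {
        while (true) {
            int current = integer.get();
            if (integer.compareAndSet(current, current + 1)) {
                return;
            }
            // Another thread changed the value in between: retry
        }
    }

    // Starts threadCount threads that each increment a shared counter.
    static int countWithThreads(int threadCount, final int incrementsPerThread)
            throws InterruptedException {
        final AtomicInteger counter = new AtomicInteger(0);
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(new Runnable() {
                public void run() {
                    for (int j = 0; j < incrementsPerThread; j++) {
                        increment(counter);
                    }
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            thread.join(); // wait until every thread is done
        }
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countWithThreads(4, 10000)); // 40000
    }
}
```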

Here is a thread-safe Stack implemented using AtomicReference :
public class Stack {
    private final AtomicReference<Element> head = new AtomicReference<Element>(null);

    public void push(String value){
        Element newElement = new Element(value);

        while(true){
            Element oldHead = head.get();
            newElement.next = oldHead;

            //Trying to set the new element as the head
            if(head.compareAndSet(oldHead, newElement)){
                return;
            }
        }
    }



    public String pop(){
        while(true){
            Element oldHead = head.get();

            //The stack is empty
            if(oldHead == null){
                return null;
            }

            Element newHead = oldHead.next;

            //Trying to set the new element as the head
            if(head.compareAndSet(oldHead, newHead)){
                return oldHead.value;
            }
        }
    }

    private static final class Element {
        private final String value;
        private Element next;

        private Element(String value) {
            this.value = value;
        }
    }
}


It is clearly more complicated than using synchronized on the two methods, but it usually performs better under contention (and often even without contention).
To conclude this section, atomic variable classes are a really good way to implement non-blocking algorithms and are also a very good alternative to volatile variables, because they provide both atomicity and visibility.

5) Java Concurrency: Locks
A lock is a thread synchronization mechanism like synchronized blocks, except that locks can be more sophisticated than Java's synchronized blocks. The simple locks implemented in this text are built using synchronized blocks, so we cannot get rid of the synchronized keyword entirely.

From Java 5 the package java.util.concurrent.locks contains several lock implementations, so you may not have to implement your own locks. But you will still need to know how to use them, and it can still be useful to know the theory behind their implementation. For more details, see my tutorial on the java.util.concurrent.locks.Lock interface. 
Here is a list of the topics covered in this text: 

1. A Simple Lock 
2. Lock Reentrance 
3. Lock Fairness 
4. Calling unlock() From a finally-clause 

A Simple Lock
Let's start out by looking at a synchronized block of Java code: 
public class Counter{

  private int count = 0;

  public int inc(){
    synchronized(this){
      return ++count;
    }
  }
}

Notice the synchronized(this) block in the inc() method. This block makes sure that only one thread can execute the return ++count at a time. The code in the synchronized block could have been more advanced, but the simple ++count suffices to get the point across. 
The Counter class could have been written like this instead, using a Lock instead of a synchronized block: 

public class Counter{

  private Lock lock = new Lock();
  private int count = 0;

  //lock() of the simple Lock shown below may throw InterruptedException
  public int inc() throws InterruptedException{
    lock.lock();
    int newCount = ++count;
    lock.unlock();
    return newCount;
  }
}

The lock() method locks the Lock instance so that all threads calling lock() are blocked until unlock() is executed. 
Here is a simple Lock implementation: 
public class Lock{
  
  private boolean isLocked = false;
  
  public synchronized void lock()
  throws InterruptedException{
    while(isLocked){
      wait();
    }
    isLocked = true;
  }
  
  public synchronized void unlock(){
    isLocked = false;
    notify();
  }
}

Notice the while(isLocked) loop, which is also called a "spin lock". Spin locks and the methods wait() and notify() are covered in more detail in the text Thread Signaling. While isLocked is true, the thread calling lock() is parked waiting in the wait() call. In case the thread should return unexpectedly from the wait() call without having received a notify() call (AKA a Spurious Wakeup) the thread re-checks the isLocked condition to see if it is safe to proceed or not, rather than just assume that being awakened means it is safe to proceed. If isLocked is false, the thread exits the while(isLocked) loop, and sets isLocked back to true, to lock the Lock instance for other threads calling lock(). 

When the thread is done with the code in the critical section (the code between lock() and unlock()), the thread calls unlock(). Executing unlock() sets isLocked back to false, and notifies (awakens) one of the threads waiting in the wait() call in the lock() method, if any. 
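
For comparison, the same counter can be written with the JDK's own lock. This is only a sketch: it uses java.util.concurrent.locks.Lock and ReentrantLock, not the hand-written Lock class above (the names clash, so the two cannot be mixed in one file without qualification), and the class name LockedCounter is made up. The unlock() call sits in a finally-clause, a construct explained later in this text.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class LockedCounter {

    private final Lock lock = new ReentrantLock();
    private int count = 0;

    public int inc() {
        lock.lock();           // blocks until the lock is available
        try {
            return ++count;    // critical section
        } finally {
            lock.unlock();     // always released, even if an exception is thrown
        }
    }

    public static void main(String[] args) {
        LockedCounter counter = new LockedCounter();
        counter.inc();
        System.out.println(counter.inc()); // 2
    }
}
```

Unlike the simple Lock above, ReentrantLock.lock() does not throw InterruptedException; use lockInterruptibly() when a waiting thread should be interruptible.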

Lock Reentrance
Synchronized blocks in Java are reentrant. This means that if a Java thread enters a synchronized block of code, and thereby takes the lock on the monitor object the block is synchronized on, the thread can enter other Java code blocks synchronized on the same monitor object. Here is an example:
public class Reentrant{

  public synchronized void outer(){
    inner();
  }

  public synchronized void inner(){
    //do something
  }
}
Notice how both outer() and inner() are declared synchronized, which in Java is equivalent to a synchronized(this) block. If a thread calls outer() there is no problem calling inner() from inside outer(), since both methods (or blocks) are synchronized on the same monitor object ("this"). If a thread already holds the lock on a monitor object, it has access to all blocks synchronized on the same monitor object. This is called reentrance. The thread can reenter any block of code for which it already holds the lock. 

The lock implementation shown earlier is not reentrant. If we rewrite the Reentrant class like below, the thread calling outer() will be blocked inside the lock.lock() in the inner() method. 

public class Reentrant2{

  Lock lock = new Lock();

  public void outer() throws InterruptedException{
    lock.lock();
    inner();
    lock.unlock();
  }

  public void inner() throws InterruptedException{
    lock.lock();
    //do something
    lock.unlock();
  }
}

A thread calling outer() will first lock the Lock instance. Then it will call inner(). Inside the inner() method the thread will again try to lock the Lock instance. This will fail (meaning the thread will be blocked), since the Lock instance was locked already in the outer() method. 

The reason the thread will be blocked the second time it calls lock() without having called unlock() in between, is apparent when we look at the lock() implementation: 

public class Lock{

  boolean isLocked = false;

  public synchronized void lock()
  throws InterruptedException{
    while(isLocked){
      wait();
    }
    isLocked = true;
  }

  ...
}

It is the condition inside the while loop (spin lock) that determines if a thread is allowed to exit the lock() method or not. Currently the condition is that isLocked must be false for this to be allowed, regardless of what thread locked it. 

To make the Lock class reentrant we need to make a small change: 

public class Lock{

  boolean isLocked = false;
  Thread  lockedBy = null;
  int     lockedCount = 0;

  public synchronized void lock()
  throws InterruptedException{
    Thread callingThread = Thread.currentThread();
    while(isLocked && lockedBy != callingThread){
      wait();
    }
    isLocked = true;
    lockedCount++;
    lockedBy = callingThread;
  }


  public synchronized void unlock(){
    if(Thread.currentThread() == this.lockedBy){
      lockedCount--;

      if(lockedCount == 0){
        isLocked = false;
        notify();
      }
    }
  }

  ...
}


Notice how the while loop (spin lock) now also takes the thread that locked the Lock instance into consideration. If either the lock is unlocked (isLocked = false) or the calling thread is the thread that locked the Lock instance, the while loop will not execute, and the thread calling lock() will be allowed to exit the method. 

Additionally, we need to count the number of times the lock has been locked by the same thread. Otherwise, a single call to unlock() will unlock the lock, even if the lock has been locked multiple times. We don't want the lock to be unlocked until the thread that locked it has executed the same number of unlock() calls as lock() calls.

The Lock class is now reentrant. 
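
The JDK's ReentrantLock keeps the same kind of per-thread count. The sketch below, with the made-up class name ReentranceDemo, uses getHoldCount() to show it.

```java
import java.util.concurrent.locks.ReentrantLock;

class ReentranceDemo {

    private final ReentrantLock lock = new ReentrantLock();

    // Returns how many times the calling thread holds the lock inside inner().
    int outer() {
        lock.lock();
        try {
            return inner();
        } finally {
            lock.unlock();
        }
    }

    int inner() {
        lock.lock(); // does not block: the calling thread already owns the lock
        try {
            return lock.getHoldCount();
        } finally {
            lock.unlock();
        }
    }

    boolean isFree() {
        return !lock.isLocked();
    }

    public static void main(String[] args) {
        ReentranceDemo demo = new ReentranceDemo();
        System.out.println(demo.outer());  // 2
        System.out.println(demo.isFree()); // true
    }
}
```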

Lock Fairness
Java's synchronized blocks make no guarantees about the sequence in which threads trying to enter them are granted access. Therefore, if many threads are constantly competing for access to the same synchronized block, there is a risk that one or more of the threads are never granted access, because access is always granted to other threads.

This is called starvation. To avoid this a Lock should be fair. Since the Lock implementations shown in this text use synchronized blocks internally, they do not guarantee fairness. Starvation and fairness are discussed in more detail in the text Starvation and Fairness.
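
With the JDK classes, fairness is a constructor option of ReentrantLock. A minimal sketch (the class and field names are illustrative):

```java
import java.util.concurrent.locks.ReentrantLock;

class FairLockDemo {

    // true requests a fair ordering policy: under contention the lock
    // favours the thread that has been waiting the longest.
    static final ReentrantLock FAIR = new ReentrantLock(true);

    // The no-argument constructor creates a non-fair lock.
    static final ReentrantLock DEFAULT = new ReentrantLock();

    public static void main(String[] args) {
        System.out.println(FAIR.isFair());    // true
        System.out.println(DEFAULT.isFair()); // false
    }
}
```

Fair locks generally have lower throughput than non-fair ones, so fairness is best enabled only where starvation is a real concern.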

Calling unlock() From a finally-clause
When guarding a critical section with a Lock, and the critical section may throw exceptions, it is important to call the unlock() method from inside a finally-clause. Doing so makes sure that the Lock is unlocked so other threads can lock it. Here is an example: 

lock.lock();
try{
  //do critical section code, which may throw exception
} finally {
  lock.unlock();
}

This little construct makes sure that the Lock is unlocked in case an exception is thrown from the code in the critical section. If unlock() was not called from inside a finally-clause, and an exception was thrown from the critical section, the Lock would remain locked forever, causing all threads calling lock() on that Lock instance to halt indefinitely.
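
The same construct applies when acquiring a java.util.concurrent.locks.Lock with a timeout. The sketch below is one possible shape; the helper name runIfAvailable and the timeout values are assumptions. Note that unlock() is only called when tryLock() actually succeeded.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class TryLockDemo {

    // Runs the action under the lock, or gives up after the timeout.
    // Returns true if the action ran.
    static boolean runIfAvailable(Lock lock, Runnable action, long timeoutMillis)
            throws InterruptedException {
        if (!lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false; // lock not acquired, so there is nothing to unlock
        }
        try {
            action.run(); // critical section, may throw
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Lock lock = new ReentrantLock();
        boolean ran = runIfAvailable(lock, new Runnable() {
            public void run() {
                System.out.println("In critical section");
            }
        }, 100);
        System.out.println("Action ran: " + ran);
    }
}
```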


6) Java Concurrency: Nanosecond-granularity timing
The new java.lang.System.nanoTime() method gives access to a nanosecond-granularity time source for making relative time measurements. Methods that accept timeouts, such as BlockingQueue.offer(), BlockingQueue.poll(), Lock.tryLock(), Condition.await(), and Thread.sleep(), can take timeout values in nanoseconds. Note that the actual precision of nanoTime() is platform-dependent.
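
A typical use is measuring elapsed time as the difference between two readings. Here is a minimal sketch; the class name NanoTimingDemo and the helper elapsedNanos are made up for the example.

```java
import java.util.concurrent.TimeUnit;

class NanoTimingDemo {

    // Measures how long the action takes, in nanoseconds.
    static long elapsedNanos(Runnable action) {
        long start = System.nanoTime();
        action.run();
        // Only the difference between two readings is meaningful;
        // a single nanoTime() value is not related to wall-clock time.
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        long nanos = elapsedNanos(new Runnable() {
            public void run() {
                try {
                    Thread.sleep(20);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        // TimeUnit converts between granularities
        System.out.println("Elapsed: " + TimeUnit.NANOSECONDS.toMillis(nanos) + " ms");
    }
}
```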
