Monday, February 14, 2011

Java in Hollywood -- Using the Actor Model in Java

We have all been hearing that the programming paradigm is changing: Moore’s law will no longer bail us out by speeding up our applications, so we need to write programs that make better use of multiple processors. Many have been pushing the Actor Model as the simplest way to get solid multithreading without explicit locks. For that reason, we are being pushed toward Scala and other languages instead of Java. But you can also use these frameworks in an existing Java application, so I figured I would give it a spin using Akka, unfortunately postponing yet again my first application in Scala.
For a start, my app just needs to run a job asynchronously with no response, but that is enough to familiarize myself with Akka and see how to use it.
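
To make the "multithreading without locks" idea concrete before bringing in Akka, here is a minimal sketch of the principle in plain Java. It is not Akka and not how Akka is implemented; CounterActor, tell and askCountAndStop are names I made up for illustration. A single-threaded executor stands in for the actor's mailbox, so the actor's state is only ever touched by one thread.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy actor in plain Java, NOT Akka. All names are invented for illustration.
class CounterActor {
    // The single-threaded executor acts as the mailbox: messages queue up
    // and are handled one at a time, in arrival order.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();

    // Plain field with no lock: only the mailbox thread ever touches it.
    private int count = 0;

    // One-way send: enqueue the message and return immediately.
    void tell(int increment) {
        mailbox.execute(() -> count += increment);
    }

    // Asks for the count through the mailbox, so the question is answered
    // after every message sent before it, then shuts the actor down.
    int askCountAndStop() {
        try {
            return mailbox.submit(() -> count).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            mailbox.shutdown();
        }
    }

    // Four sender threads each send 1000 messages; returns the final count.
    static int demo() {
        CounterActor actor = new CounterActor();
        Thread[] senders = new Thread[4];
        for (int t = 0; t < senders.length; t++) {
            senders[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) {
                    actor.tell(1);
                }
            });
            senders[t].start();
        }
        try {
            for (Thread sender : senders) {
                sender.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return actor.askCountAndStop();
    }

    public static void main(String[] args) {
        System.out.println("count = " + demo()); // prints "count = 4000"
    }
}
```

The senders run concurrently, yet no increment is lost, because the only code that mutates count runs on the mailbox thread.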

So, after downloading Akka, the following dependencies are needed to get started:

Now, we can write a small class. Here is the code to send asynchronous messages and not get a response back.

   1: public class MyActor1 extends UntypedActor {
   2: 
   3:     public void onReceive(Object message) {
   4:         Integer numMessage = (Integer) message;
   5:         System.out.println(Thread.currentThread().getName() + "\tincoming message " + numMessage);
   6:         for (int i=0;i<1000000000;i++) {  // busy loop to simulate a long-running job
   7:         }
   8:         System.out.println(Thread.currentThread().getName() + "\tFinished Message " + numMessage);
   9:         System.out.println(Thread.currentThread().getName() + "\tStopping on thread " + numMessage);
  10:         getContext().stop();  // stop this actor once its single message is handled
  11:     }
  12: 
  13:     public static void main(String[] args) throws InterruptedException {
  14: 
  15:         for (int i=0;i<50;i++) {
  16:             Actors.actorOf(MyActor1.class).start().sendOneWay(i);  // one actor per job, fire-and-forget
  17:         }
  18:        Thread.sleep(50000);  // crude wait so the output of all jobs can be observed
  19:     }
  20: }

Line 16 shows how to launch an actor: we “start” it and then send it the message. In this case we use sendOneWay, but other methods include sendRequestReply and sendRequestReplyFuture, as you can see in the documentation.
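
For anyone who has not met these three styles before, here is a rough analogy using only java.util.concurrent. This is not the Akka API, and the mapping is my reading of the method names: one-way means fire-and-forget, request-reply blocks until the answer arrives, and the future variant hands back a placeholder for the answer. The class and method names below are invented for the sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK analogy, NOT the Akka API: class and method names are invented.
class SendStylesSketch {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Stand-in for whatever the receiver does with a message.
    private static String handle(int message) {
        return "handled " + message;
    }

    // In the spirit of sendOneWay: hand over the message, expect no reply.
    void oneWay(int message) {
        worker.execute(() -> handle(message));
    }

    // In the spirit of sendRequestReplyFuture: return at once with a
    // handle to the reply that will arrive later.
    CompletableFuture<String> requestReplyFuture(int message) {
        return CompletableFuture.supplyAsync(() -> handle(message), worker);
    }

    // In the spirit of sendRequestReply: block the caller until the reply is in.
    String requestReply(int message) {
        return requestReplyFuture(message).join();
    }

    void stop() {
        worker.shutdown();
    }

    // Exercises all three styles and returns the two replies.
    static String demo() {
        SendStylesSketch sketch = new SendStylesSketch();
        try {
            sketch.oneWay(1);
            String blocking = sketch.requestReply(2);
            String viaFuture = sketch.requestReplyFuture(3).join();
            return blocking + ", " + viaFuture;
        } finally {
            sketch.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "handled 2, handled 3"
    }
}
```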

When you run this application you will see that it launches 16 threads and uses them for the 50 jobs. I found the API nicely expressive, both in method names like sendOneWay and in the way you access the context to stop the actor.

The next step was learning how to customize the behavior of my thread pool, which led to this change to the application:

   1: public class MyActor extends UntypedActor {
   2: 
   3:     public MyActor(MessageDispatcher dispatcher) {
   4:         getContext().setDispatcher(dispatcher);  // run this actor on the shared dispatcher
   5: 
   6:     }
   7:     public void onReceive(Object message) {
   8:         Integer numMessage = (Integer) message;
   9:         System.out.println(Thread.currentThread().getName() + "\tincoming message " + numMessage);
  10:         for (int i=0;i<1000000000;i++) {  // busy loop to simulate a long-running job
  11:         }
  12:         System.out.println(Thread.currentThread().getName() + "\tFinished Message " + numMessage);
  13:         System.out.println(Thread.currentThread().getName() + "\tStopping on thread ");
  14:         getContext().stop();  // stop this actor once its single message is handled
  15:     }
  16: 
  17:     public static void main(String[] args) throws InterruptedException {
  18:        final MessageDispatcher d = Dispatchers.newExecutorBasedEventDrivenDispatcher("dispatcher").withNewThreadPoolWithLinkedBlockingQueueWithCapacity(50)
  19:       .setCorePoolSize(5)
  20:       .setMaxPoolSize(6)
  21:       .setKeepAliveTimeInMillis(60000)
  22:       .setRejectionPolicy(new ThreadPoolExecutor.CallerRunsPolicy())
  23:       .build();
  24: 
  25:         for (int i=0;i<50;i++) {
  26:             ActorRef actor = Actors.actorOf(new UntypedActorFactory() {  // factory needed because of the constructor argument
  27:                 public UntypedActor create() {
  28:                     return new MyActor(d);
  29:                 }
  30:             });
  31:             actor.start().sendOneWay(i);
  32:         }
  33: 
  34:        Thread.sleep(50000);  // crude wait so the output of all jobs can be observed
  35:     }
  36: }

As you can see, starting on line 18 we create the thread pool, which in Akka terminology is called a MessageDispatcher. There are a few different kinds, described in the Akka documentation. This change means that the actor now needs a constructor that receives the dispatcher. (Don’t worry: Spring integration is available, so using DI for the actors is not a problem.)
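
The builder settings on lines 18 to 23 read very much like the parameters of java.util.concurrent.ThreadPoolExecutor. Assuming the dispatcher ends up with a pool configured that way (my assumption from the method names and the CallerRunsPolicy argument, not something I verified in the Akka source), the following plain-JDK sketch shows what those numbers mean for 50 jobs. PoolSettingsSketch and its methods are invented names.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Plain-JDK sketch, NOT the Akka dispatcher: the names here are invented.
class PoolSettingsSketch {

    // Runs the given number of short jobs and returns the largest number
    // of threads the pool ever had at the same time.
    static int largestPoolSizeFor(int jobs) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5,                              // core pool size
                6,                              // max pool size
                60000, TimeUnit.MILLISECONDS,   // keep-alive for threads above core
                new LinkedBlockingQueue<>(50),  // bounded queue with capacity 50
                new ThreadPoolExecutor.CallerRunsPolicy()); // overflow runs on the submitting thread
        for (int i = 0; i < jobs; i++) {
            pool.execute(PoolSettingsSketch::simulateWork);
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("jobs did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return pool.getLargestPoolSize();
    }

    // Stand-in for the busy loop in the actor.
    private static void simulateWork() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("largest pool size: " + largestPoolSizeFor(50));
    }
}
```

With 50 jobs this pool never grows past its 5 core threads: a ThreadPoolExecutor only adds threads beyond the core size once the queue is full, and a queue of capacity 50 cannot fill up here. The max size of 6 and the CallerRunsPolicy would only come into play with a larger burst of jobs.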

And voilà, the printouts show that this is indeed running on multiple threads. Of course, for a one-way call with no reply there is really no need for Akka, since the code is pretty straightforward in plain old Spring + Java, where we define our thread pool bean in the application context:

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" destroy-method="destroy">
    <property name="corePoolSize" value="5" />
    <property name="maxPoolSize" value="10" />
    <property name="queueCapacity" value="25" />
</bean>

Once we have this, doing the same thing just involves calling taskExecutor.execute(bean), where bean is a Runnable. So, in this specific instance, the overhead of adding the Scala and Akka jars would not be justified, but for more significant multithreaded needs this could clearly be a powerful tool to simplify code and make it very readable.
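
As a rough sketch of what the calling side could look like, the class below depends only on java.util.concurrent.Executor, which Spring's TaskExecutor interface extends, so the taskExecutor bean could be injected into it. JobLauncher is a hypothetical name, and to keep the example runnable without Spring the demo wires in a plain ThreadPoolExecutor with the same core, max, and queue sizes.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch only: JobLauncherSketch and JobLauncher are invented names.
class JobLauncherSketch {

    // Hypothetical service that a DI container would hand an Executor to.
    static class JobLauncher {
        private final Executor taskExecutor;

        JobLauncher(Executor taskExecutor) {
            this.taskExecutor = taskExecutor;
        }

        // One-way launch: the job runs on a pool thread, nothing is returned.
        void launch(Runnable job) {
            taskExecutor.execute(job);
        }
    }

    // Launches the given number of trivial jobs and returns how many completed.
    static int demo(int jobs) {
        // Stand-in for the Spring bean: core 5, max 10, queue capacity 25.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(25));
        JobLauncher launcher = new JobLauncher(pool);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < jobs; i++) {
            launcher.launch(completed::incrementAndGet);
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("jobs did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed jobs: " + demo(20));
    }
}
```

One thing to keep in mind with a bounded queue like this: a plain ThreadPoolExecutor rejects work by default once both the queue and the pool are full, so a large burst of slow jobs needs either a bigger queue or an explicit rejection policy.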
