For a starter, my app just needs to run a job asynchronously with no response, but this would be enough to familiarize myself with Akka and see how to use it.
So, after downloading Akka, the following dependencies are needed to get started:
Now, we can write a small class. Here is the code to send asynchronous messages and not get a response back.
1: public class MyActor1 extends UntypedActor {
2:
3: public void onReceive(Object message) {
4: Integer numMessage = (Integer) message;
5: System.out.println(Thread.currentThread().getName() + "\tincoming message " + numMessage);
6: for (int i=0;i<1000000000;i++) {
7: }
8: System.out.println(Thread.currentThread().getName() + "\tFinished Message " + numMessage);
9: System.out.println(Thread.currentThread().getName() + "\tStopping on thread " + numMessage);
10: getContext().stop();
11: }
12:
13: public static void main(String[] args) throws InterruptedException {
14:
15: for (int i=0;i<50;i++) {
16: Actors.actorOf(MyActor1.class).start().sendOneWay(i);
17: }
18: Thread.sleep(50000);
19: }
20: }
When you run this application you will see that it launched 16 threads and uses them for the 50 jobs. I found it nice that it was expressive – sendOneWay, as well as to access the context to end the activity of this object.
The next step was learning how to customize the behavior of my thread pool, which led to this change to the application:
1: public class MyActor extends UntypedActor {
2:
3: public MyActor(MessageDispatcher dispatcher) {
4: getContext().setDispatcher(dispatcher);
5:
6: }
7: public void onReceive(Object message) {
8: Integer numMessage = (Integer) message;
9: System.out.println(Thread.currentThread().getName() + "\tincoming message " + numMessage);
10: for (int i=0;i<1000000000;i++) {
11: }
12: System.out.println(Thread.currentThread().getName() + "\tFinished Message " + numMessage);
13: System.out.println(Thread.currentThread().getName() + "\tStopping on thread ");
14: getContext().stop();
15: }
16:
17: public static void main(String[] args) throws InterruptedException {
18: final MessageDispatcher d = Dispatchers.newExecutorBasedEventDrivenDispatcher("dispatcher").withNewThreadPoolWithLinkedBlockingQueueWithCapacity(50)
19: .setCorePoolSize(5)
20: .setMaxPoolSize(6)
21: .setKeepAliveTimeInMillis(60000)
22: .setRejectionPolicy(new ThreadPoolExecutor.CallerRunsPolicy())
23: .build();
24:
25: for (int i=0;i<50;i++) {
26: ActorRef actor = Actors.actorOf(new UntypedActorFactory() {
27: public UntypedActor create() {
28: return new MyActor(d);
29: }
30: });
31: actor.start().sendOneWay(i);
32: }
33:
34: Thread.sleep(50000);
35: }
36: }
As you see, on line 18 we create the ThreadPool, in Akka terminology called a MessageDispatcher. There are a few different kinds as mentioned here. But this change means that we now need a constructor to receive this dispatcher for use. (Don’t worry Spring integration is not a problem to use DI for the actors).
And voila, we see in the printouts that indeed this is running in multiple threads. Of course, for the need of one way with no reply, there really is no need to use Akka since the code is pretty straightforward using plain old Spring + Java as follows, where we define our ThreadPool bean in the application context:
1: <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" destroy-method="destroy">
2: <property name="corePoolSize" value="5" />
3: <property name="maxPoolSize" value="10" />
4: <property name="queueCapacity" value="25" />
5: </bean>
No comments:
Post a Comment