Thursday, May 26, 2011

Using Date and Calendar – At your own risk!

You can find lots of discussion on the ‘net about the problems of the Calendar/Date classes in Java. Here are a few that recently reared their ugly heads in our code:

1. This one is actually not in either of these classes but in formatting our Date for output. Beware of using mm instead of MM with SimpleDateFormat. Yep, one of my team members used the following code:

SimpleDateFormat sdf = new SimpleDateFormat("yyyymmdd");

Surprisingly, everyone in our tests had a birthday of the first of the month! In SimpleDateFormat, mm means minutes and MM means month. I think the right solution would have been to reject mm entirely and use mi for minutes and mo for month. This would solve the confusion.

2. Did you know that it makes a difference whether you use Calendar.HOUR or Calendar.HOUR_OF_DAY? OK, I admit this one is documented in the javadocs, but I still think it is a bit of a surprise that in the afternoon, if you do

cal.set(Calendar.HOUR, 0);

you will get 12:00 PM!

3. Why does Calendar's toString() not print the date in some readable format?

4. Beware that DAY_OF_WEEK is not zero based while MONTH is.
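
The pitfalls in items 1, 2 and 4 are easy to reproduce. Here is a minimal sketch, using only JDK classes. The class name DatePitfalls, its helper methods and the sample date are made up for illustration and are not from our code.

```java
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.Locale;

// Hypothetical demo class, not part of the original code base.
final class DatePitfalls {

    // Formats the calendar's instant with the given pattern, in the calendar's own time zone.
    static String format(String pattern, Calendar cal) {
        SimpleDateFormat sdf = new SimpleDateFormat(pattern, Locale.ROOT);
        sdf.setTimeZone(cal.getTimeZone());
        return sdf.format(cal.getTime());
    }

    // Sets Calendar.HOUR to 0 on an afternoon time and returns the resulting HOUR_OF_DAY.
    static int hourOfDayAfterSettingHourToZero(Calendar afternoon) {
        Calendar cal = (Calendar) afternoon.clone();
        cal.get(Calendar.HOUR_OF_DAY);   // force all fields (including AM_PM) to be computed
        cal.set(Calendar.HOUR, 0);       // HOUR is the 12-hour field, so AM_PM stays PM
        return cal.get(Calendar.HOUR_OF_DAY);
    }

    public static void main(String[] args) {
        // Thursday, May 26, 2011, 15:30:00 in the default time zone
        Calendar cal = new GregorianCalendar(2011, Calendar.MAY, 26, 15, 30, 0);

        System.out.println(format("yyyymmdd", cal)); // minutes land where the month should be
        System.out.println(format("yyyyMMdd", cal)); // MM is the month

        System.out.println(hourOfDayAfterSettingHourToZero(cal)); // 12, i.e. noon rather than midnight

        System.out.println(cal.get(Calendar.MONTH));       // zero based: May is 4
        System.out.println(cal.get(Calendar.DAY_OF_WEEK)); // one based: Thursday is 5
    }
}
```

Using Calendar.HOUR_OF_DAY in the set call instead would give midnight, which is usually what was intended.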

Thursday, April 28, 2011

Using StAX to extract records from an XML file

When processing large XML files, building a DOM of the whole file is impractical, as you are sure to run out of memory. Java 6 includes StAX (it was around earlier, of course), and it is the best solution for building a scalable processor for large XML files. However, it may still be desirable to build a document model when processing an individual record, because decisions in the code may depend on information lower down in the node. For this reason, we use StAX to extract each record from our gargantuan XML files and then pass it to a different class that builds a DOM for processing the record. Here is the very simple code, with an explanation of what we did.


   1: public class StaxExtracter {
   2:     XMLEventReader reader;
   3:     boolean headerRead = false;
   4:  
   5:     public void openStream(InputStream s) throws XMLStreamException {
   6:         XMLInputFactory inputFactory = XMLInputFactory.newInstance();
   7:         reader = inputFactory.createXMLEventReader(s);
   8:     }
   9:  
  10:     public void close() throws XMLStreamException {
  11:         reader.close();
  12:     }
  13:  
  14:     //function to allow us to read part of a document that does not include the header
  15:     public void setHeaderRead(boolean val) {
  16:         headerRead = val;
  17:     }


The above is some boilerplate code needed for our class. The heavy lifting is done by the XMLEventReader, created in the openStream function on line 7.

Following this we have two main functions. The first reads the opening XML tag of the document and any important attributes in that tag, and the second reads an actual record. Since the class is written to read all sub-elements of a record recursively, it is important that we not read the opening tag of our document as a regular record, since that would mean reading our entire document as one record! However, since there may be a need to work on part of a file, I allowed an override on line 15 above so we can skip reading the header. The opening tag is obviously not a complete XML element, so we do some heavier work here, returning the attributes as a map for use by our application, in addition to the raw text, which can be used upstream in our application.

Here is the actual code:


    /**
     * Since header is not a complete element we need to do special treatment so we will read the header attributes
     * into a hashmap to save all the attributes besides returning the raw data
     *
     * @return the rawData of document start and header
     * @throws XMLStreamException
     */
    public String getHeaderAndAttributesAsMap(Map<String, String> attributes) throws XMLStreamException {
        boolean documentStart = false;
        StringBuffer rawData = new StringBuffer();
        while (reader.hasNext() && !headerRead) {
            XMLEvent e = reader.nextEvent();
            if (e.isStartDocument()) {
                documentStart = true;
            } else if (e.isStartElement()) {
                headerRead = true;
                StartElement startElement = e.asStartElement();
                for (Iterator i = startElement.getAttributes(); i.hasNext();) {
                    Attribute attr = (Attribute) i.next();
                    attributes.put(attr.getName().getLocalPart(), attr.getValue());
                }
            }
            //skip whitespace before document
            if (documentStart) {
                rawData.append(e.toString());
            }
        }
        if (!headerRead)
            return null;
        else {
            return rawData.toString();
        }
    }


Now, on to the main function for reading each element of our XML file:


public enum RecordStatus {BODY, NO_DATA, CLOSE_BODY_TAG, END_DOCUMENT}

RecordStatus getNextRecord(StringBuffer recordData) throws XMLStreamException {
    if (!headerRead) {
        throw new RuntimeException("need to read header line before calling this function");
    }
    RecordStatus rc = RecordStatus.NO_DATA;
    String recordName = null;
    boolean finishedReadingElement = false;
    boolean startOfRecord = false;
    while (reader.hasNext() && !finishedReadingElement) {
        XMLEvent e = reader.nextEvent();
        if (e.isStartElement()) {
            if (!startOfRecord) {
                //the first start element we see names the record
                startOfRecord = true;
                recordName = e.asStartElement().getName().getLocalPart();
            }
        } else if (e.isEndElement()) {
            EndElement endElement = e.asEndElement();
            if (endElement.getName().getLocalPart().equals(recordName)) {
                finishedReadingElement = true;
                rc = RecordStatus.BODY;
            } else if (!startOfRecord) {
                //todo we could make this better by adding validation here to save the START_BODY_TAG
                rc = RecordStatus.CLOSE_BODY_TAG;
                finishedReadingElement = true;
            }
        } else if (e.isEndDocument()) {
            rc = RecordStatus.END_DOCUMENT;
        }
        //skip whitespace before element
        if (startOfRecord || rc == RecordStatus.CLOSE_BODY_TAG) {
            if (e.isCharacters())
                //need to re-escape characters so SAX parser won't choke on it
                //(StringEscapeUtils comes from Apache Commons Lang)
                recordData.append(StringEscapeUtils.escapeXml(e.toString()));
            else
                recordData.append(e.toString());
        }
    }
    return rc;
}


We use an enum to give us maximal information about what was read, while returning the actual raw data in a StringBuffer that is allocated outside our function.

Of course, you could do ALL the parsing in your StAX implementation as well, as described here and other places, but this code allows us to continue to use the more elegant DOM for our actual work.
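
To show the whole idea end to end (stream with StAX, then build a small DOM per record), here is a self-contained sketch that uses only JDK classes. It is not our StaxExtracter: the class name RecordSplitter and the sample XML are made up, it treats every direct child of the root element as a record, and it serializes events with an XMLEventWriter instead of appending e.toString(), so the writer takes care of re-escaping. It also assumes a document without namespaces.

```java
import java.io.StringReader;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLEventWriter;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.events.XMLEvent;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

// Hypothetical sketch: each direct child of the root element becomes its own small DOM.
final class RecordSplitter {

    static List<Document> splitRecords(String xml) throws Exception {
        XMLEventReader reader = XMLInputFactory.newInstance().createXMLEventReader(new StringReader(xml));
        XMLOutputFactory outputFactory = XMLOutputFactory.newInstance();
        DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
        List<Document> records = new ArrayList<Document>();

        int depth = 0;               // 1 = inside the root, 2 = inside a record
        StringWriter buffer = null;  // raw text of the record currently being read
        XMLEventWriter writer = null;
        while (reader.hasNext()) {
            XMLEvent e = reader.nextEvent();
            if (e.isStartElement()) {
                depth++;
                if (depth == 2) {    // a direct child of the root starts a new record
                    buffer = new StringWriter();
                    writer = outputFactory.createXMLEventWriter(buffer);
                }
            }
            if (writer != null) {
                writer.add(e);       // copy the event; text is escaped by the writer
            }
            if (e.isEndElement()) {
                if (depth == 2) {    // the record is complete, hand it to DOM
                    writer.flush();
                    writer.close();
                    writer = null;
                    records.add(builder.parse(new InputSource(new StringReader(buffer.toString()))));
                }
                depth--;
            }
        }
        reader.close();
        return records;
    }

    public static void main(String[] args) throws Exception {
        String xml = "<people><person id=\"1\"><name>A &amp; B</name></person>"
                   + "<person id=\"2\"><name>C</name></person></people>";
        for (Document record : splitRecords(xml)) {
            System.out.println(record.getDocumentElement().getAttribute("id")
                    + " -> " + record.getDocumentElement().getTextContent());
        }
    }
}
```

For a really large file you would process each record as soon as it is parsed rather than collecting them in a list; the list is only there to keep the sketch short.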

Monday, February 14, 2011

Java in Hollywood -- Using the Actor Model in Java

We have all been hearing how the programming paradigm has started to change: Moore’s law will no longer bail us out by speeding up our applications, and instead we need to program better to make use of our multiple processors. Many have been pushing the Actor Model as the simplest way to get solid multithreading without locks, etc. And for that reason, we are getting pushed to use Scala and other languages instead of Java. But you can also use these frameworks in your existing Java application, so I figured I would give it a spin using Akka, unfortunately postponing yet again my first application in Scala.
For starters, my app just needs to run a job asynchronously with no response, but this would be enough to familiarize myself with Akka and see how to use it.

So, after downloading Akka, the following dependencies are needed to get started:


Now, we can write a small class. Here is the code to send asynchronous messages and not get a response back.


   1: public class MyActor1 extends UntypedActor {
   2:  
   3:     public void onReceive(Object message) {
   4:         Integer numMessage = (Integer) message;
   5:         System.out.println(Thread.currentThread().getName() + "\tincoming message " + numMessage);
   6:         for (int i=0;i<1000000000;i++) {
   7:         }
   8:         System.out.println(Thread.currentThread().getName() + "\tFinished Message " + numMessage);
   9:         System.out.println(Thread.currentThread().getName() + "\tStopping on thread " + numMessage);
  10:         getContext().stop();
  11:     }
  12:  
  13:     public static void main(String[] args) throws InterruptedException {
  14:  
  15:         for (int i=0;i<50;i++) {
  16:             Actors.actorOf(MyActor1.class).start().sendOneWay(i);
  17:         }
  18:         Thread.sleep(50000);
  19:     }
  20: }


Line 16 shows the code that launches the actor: we "start" it and then send the message. In this case we use sendOneWay, but other methods include sendRequestReply and sendRequestReplyFuture, as you can see in the documentation.

When you run this application you will see that it launches 16 threads and uses them for the 50 jobs. I liked how expressive the API is, both in names like sendOneWay and in using the context to end the activity of this object.

The next step was learning how to customize the behavior of my thread pool, which led to this change to the application:



   1: public class MyActor extends UntypedActor {
   2:  
   3:     public MyActor(MessageDispatcher dispatcher) {
   4:         getContext().setDispatcher(dispatcher);
   5:  
   6:     }
   7:     public void onReceive(Object message) {
   8:         Integer numMessage = (Integer) message;
   9:         System.out.println(Thread.currentThread().getName() + "\tincoming message " + numMessage);
  10:         for (int i=0;i<1000000000;i++) {
  11:         }
  12:         System.out.println(Thread.currentThread().getName() + "\tFinished Message " + numMessage);
  13:         System.out.println(Thread.currentThread().getName() + "\tStopping on thread ");
  14:         getContext().stop();
  15:     }
  16:  
  17:     public static void main(String[] args) throws InterruptedException {
  18:         final MessageDispatcher d = Dispatchers.newExecutorBasedEventDrivenDispatcher("dispatcher").withNewThreadPoolWithLinkedBlockingQueueWithCapacity(50)
  19:             .setCorePoolSize(5)
  20:             .setMaxPoolSize(6)
  21:             .setKeepAliveTimeInMillis(60000)
  22:             .setRejectionPolicy(new ThreadPoolExecutor.CallerRunsPolicy())
  23:             .build();
  24:  
  25:         for (int i=0;i<50;i++) {
  26:             ActorRef actor = Actors.actorOf(new UntypedActorFactory() {
  27:                 public UntypedActor create() {
  28:                     return new MyActor(d);
  29:                 }
  30:             });
  31:             actor.start().sendOneWay(i);
  32:         }
  33:  
  34:         Thread.sleep(50000);
  35:     }
  36: }



As you can see, on line 18 we create the thread pool, which in Akka terminology is called a MessageDispatcher. There are a few different kinds, as mentioned here. This change means that we now need a constructor that receives the dispatcher. (Don’t worry, Spring integration is not a problem, so you can use DI for the actors.)

And voila, we see in the printouts that this is indeed running in multiple threads. Of course, for a one-way message with no reply, there really is no need to use Akka, since the code is pretty straightforward using plain old Spring + Java as follows, where we define our thread pool bean in the application context:



<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" destroy-method="destroy">
    <property name="corePoolSize" value="5" />
    <property name="maxPoolSize" value="10" />
    <property name="queueCapacity" value="25" />
</bean>


Once we have this, doing the same thing just involves calling taskExecutor.execute(bean), where bean is a Runnable. So, in this specific instance, the overhead of adding the Scala and Akka jars would not be justified, but clearly for more significant multithreaded needs, this could be a powerful tool to simplify code and make it very readable.
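
For comparison, here is roughly what the fire-and-forget version looks like with nothing but java.util.concurrent, as a sketch. The pool sizes mirror the bean above. The class name FireAndForget, the 60 second keep-alive and the CallerRunsPolicy (borrowed from the Akka example) are my own assumptions for illustration, not something the Spring bean above configures.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of one-way jobs on a plain JDK thread pool.
final class FireAndForget {

    // Submits the given number of jobs and returns how many of them completed.
    static int runJobs(int jobs) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5,                                          // corePoolSize
                10,                                         // maxPoolSize
                60, TimeUnit.SECONDS,                       // keep-alive for idle extra threads (assumed)
                new LinkedBlockingQueue<Runnable>(25),      // queueCapacity
                new ThreadPoolExecutor.CallerRunsPolicy()); // when saturated, run the job in the caller

        final AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < jobs; i++) {
            final int numMessage = i;
            executor.execute(new Runnable() {
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "\tincoming message " + numMessage);
                    completed.incrementAndGet();
                }
            });
        }

        executor.shutdown();                             // accept no new jobs, finish the queued ones
        executor.awaitTermination(1, TimeUnit.MINUTES);  // wait instead of a fixed Thread.sleep
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed jobs: " + runJobs(50));
    }
}
```

Waiting with awaitTermination rather than sleeping for a fixed time means the program ends as soon as the jobs are done.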