Wednesday, May 16, 2012

Streaming Streaming Streaming ...

Now that I work for MarkLogic I am dealing with more and more "Big" "Data" ... and as usual xmlsh + marklogic is a huge win.   But as I start ramping up my use of large datasets especially large numbers of small documents (millions, hundred million ...) the old tricks dont work quite so well.

For example recently I needed to upload 3 million XML files to a ML server from a relational DB.
My first pass was my favorite tool for this ... xsql + xsplit + ml:put
Since I like to debug stuff as I build it ... the simple way is to do this.

xsql ... > bigfile.xml
xsplit -o xml bigfile.xml
cd xml
ml:put -baseuri /xxx/ -m 100 - -maxthreads 4 *.xml

On my big beefy server box this worked although a bit slowly.   So ok I wanted to now transfer this data to an EC2 instance.  Its "only" 10G of data so I did this

tar -cvzf xml.tar.gz  xml

then transfered the now compressed file to the EC2 machine.
Then on the EC2 machine I tried to replicate the above steps.

tar -xzf xml.tar.gz

I waited ... waited ... waited ... 3 DAYS and it wasnt done yet.   Admitedly this was a medium instance of EC2 but it should have handled this.   The problem seemed to be the system was stuck in 90% system time.
My guess is the age old problem of lots of files in a directory.  Especially over EBS ... it just doesnt perform well.  Its actually exponentially slow to add files to directory once they get big ... particurly nasty when the files are small so the overhead of simply creating a file entry is much bigger then the file IO itself.

So what to do ... I did 2 things ... I restarted the EC2 instance as an m1.xlarge ... ($$$$ ka chink)
Then instead of pre extracting the xml to a directory in whole I used a new feature I recently added to ml:put ...

tar -xzf xml.tar.gz | ml:put -baseuri /xxx/ -m 100 -maxthreads 4 -f - -delete

What this does is let tar still extract the files but it then lists them to stdout.
From there ml:put reads the list of files as they are extracting, batches them up and sends them to MarkLogic then deletes them.    The end result is that there is only about 500 or so files in the xml directory at any one time.   This completed in about half an hour ... about 2000 docs/sec ... much better.
Of course this speedup was due to the larger instance as well as the technique ...

But this gets me thinking ... Why do I need the overhead of writing to a temp directory for this ? Its still adding a significant unnecessary overhead.   I should be able to send a bunch of XML files to ml:put in a stream and use no temporary files.   In fact I should be able to do a full pipeline with no overhead like
   xquery 'for 1 to 10000000 return document {  ... } ' |  ml:put ...

or perhaps

    xsql 'select * from table' |  xsplit -stream | ml:put ....

The core problem here is the lack of a streaming interface for XDM.   In order to send a bunch of XML files (or XDM values) through a stream (or to a file and back) they need to be packaged in something.   Typically wrapped in a root element or maybe zipped or tar'd.

Zip is really lousy for this because its TOC is at the end so you cant stream unpack a zip file.  Tar is good because each file entry is contigous and you can stream unpack them.  But what about cases where I just want to dynamically create (or transform) XML and spit it out like the first example
      xquery 'for 1 to 10000000 return document {  ... } ' |  ml:put ...

If I wrap this in a single document it becomes hard to stream.  ml:put *could* have xsplit builtin  ... but to keep to the tools approach I'd rather split the functionality.   So say I put xsplit into the pipeline like the second example.    How is xsplit to produce *multiple* documents on its output stream in a way that is readable ?   Were back to a serialization format for XDM  (

This is a fundimental problem in traditional XML toolchains.  There is simply no standard and efficient way to stream sequences.   So what to do ?

I'm considering a 3 phase approach.
1) Implement an enhancement to xmlsh commands and pipes such that they can request, produce, and consume sequences through ports.  So for example "xsplit -stream" could output the split documents all to stdout.     But what would this look like ?  How to implement it ?

2) For pipes implement an optional XDM stream pipe.   This would allow streaming of XDM values (including sequences of documents), without serialization directly through the pipe.    This does mean that the pipe might get large if the documents are large ... I may have to limit the pipe to a small number of values.

3) Implement some kind of text serialization for sequences.   Essentially back to  ... although I am not sure I like my proposal so much in the face of this use case.   The original proposal does not consider streaming as the major use case.   However the use cases it was designed for should overlap with streaming.  I'm not even sure I need to support most of XDM ... falling back to what XProc does (streams of documents) may be sufficient although I abhor the restriction on purely theoretical grounds.  But the fact is any text serialization of XDM will be lossy.  It is just a matter of drawing the line somewhere, and maybe the most valuable use case is drawing the line at documents.

Well back to the drawing board.  I'd like to implement this but still so many open issues !!!
Comments welcome.


  1. Looking at another use case, consider "feeds" for example the Twitter feed

    This seems to be available only in JSON (not sure yet but using .xml doesnt work) and the JSON is spit out one per line with newlines encoded.

    Pulling from a URL like:
    will produce an endless stream of data. It would be very useful to easily handle this and turn it into a stream of XML documents ... again using what format, what protocol ... and no, JSON Lovers, JSON doesnt define a streaming serialization either it just happens twitter uses LF to seperate JSON documents.

  2. Why not do the XML the same way? Just concatenate valid XML documents. When you've read to the end of the root element of your document, consider anything you get afterward to be the start of a new document.

  3. Great question ! I wish it were that simple.
    The problem is I would have to write my own XML parser to do that. All existing XML parsers (at least the common ones in Java) abort painfully if there is any text after the last ">" ... I think this is actually part of the specs for XML documents but not sure why it has to be this way. But it is. So in order to intercept this and allow simple concatenation of documents you need to intercept the stream of the XML parser and detect the end of the document and simulate a EOF, then pick up where you left off for the next parse.
    This is not easy to do without reimplementing a full parser ... Unless you have some magic string value separating documents which is invalid in XML.
    This is the guts of the rationale for XDM streaming as I've been thinking about the last few years ... how to have a format that is efficient to split out to separate pieces while reusing existing parsers. (see link in original post )

  4. Huh - re: Twitter, I sorta figured it would be encoded using JSONP, where each document in the stream is a fully valid Javascript function call, and so the whole stream would be valid Javascript code, but nope - just newline separated hashes.

    It seems like specifying a new format may be a bigger burden, but the partial/decontextualized data features may make it totally worth it for reasons beyond the streaming, so what do I know. :)

    Although if I were just focused on the problem of "I need to stream XML unknown and potentially endless documents from one place to another", I'd write a SAX parser and enough glue to build a DOM out of it and have it bail out as soon as I get an endElement and don't have any parent elements left. (Maybe eat whitespace after/leading up to the next ?xml tag.) Should make it brutally easy to implement in other languages/frameworks too.

  5. Why twitter doesn't use JSONP is or arrays for that matter is because this is a "FEED" there is no end to it. Thus there is no point you can say "Send this off to be executed". Although only trivially different, this is a whole new kind of beast over document oriented data. Stream oriented data needs to be processed as it comes in without waiting for any kind of end ...

    Your suggestion for a SAX parser is good .. but then that means you have to write a SAX parser or a StAX parser or whatever. Its not hard to write a simple one but it is hard to write a *good* one ... then you have to replicate it in every language and get every tool to use your parser. Even then very few (any?) tools know what to do with anything but the first document anyway. So you dont win :(

    This is why I belive that document streaming (JSON or XML ...) needs to be one layer different (above or below depending on if your standing on your head). So existing toolkits and parsers can be reused and the infrastructure can handle the streaming part without having to rewrite the other bits.

    Just an FYI , I put the sample JSON I got from twitter into a file and loaded it with my favorite JSON viewer. It quietly ignored all but the first object ...

    So even with the pure JSON stuff you need a way to split up the input *before* it gets to the parsers ... in this case its easy (line feed) but for arbitrary JSON or XML its not. (they may have embedded linefeeds.


Due to comment spam, moderation is turned on. I will approve all non-spam comments.