OpenStreetMap (OSM) is one of the largest open datasets. The GeoKnow project, of which OpenLink is a part, is among other things drawing on this resource. The EU FP7 project LOD2 has developed a geospatial drill-down benchmark for RDF, and here we will apply it to PostGIS and Virtuoso in plain SQL.

Generic relational performance is the necessary predecessor of any graph performance, so we will talk SQL at first; the Linked Geodata RDFisms will come later.

In this article, we will look in detail at ETL from PostGIS to Virtuoso via SQL federation. In the TPC-H series, we looked at bulk loading from files shaped 1:1 like the tables, but life is seldom this simple.

Here we will see how to change normalization in schemas, from a denormalized key-value pair structure in PostGIS to a normalized "triple table" in Virtuoso. We will also look at data type conversion, overall data transfer speed, and automatic parallelization.

ETL, even with medium data sizes, as with OSM at a little under 600 GB in PostgreSQL files, is a performance game, like everything in databases. Data must move fast, expressing the transformation logic must be compact, and parallelism must be automatic. Next to nobody can write parallel code, and the few who can are needed somewhere else.

I suppose, without insider knowledge, I would dump the data into CSV; do some sed scripts or the like for the transformation, maybe in Hadoop if the data were really large; and then I would use the target database's bulk load utility. This makes the steps so simple that they can be delegated with some possibility of success. This is what data integration tends to be like. As we saw with the TPC-H bulk load, CSV loading is foolproof, easy, and fast.

Further, I would not make a JDBC program to first read one database and write into another because this would have to be explicitly multithreaded, would have loops, would require use of array parameters in order not to get killed by client server latency, would be liable to run into oddities of JDBC implementations, and so forth. Plus, this could be a few hundred lines long, and the developer would come back with questions like, "Why is it slow?" Well, it is slow because of lock contention, because transactions are not turned off, or something of the sort. No. Shell scripts and bulk load anytime.

Now we will explore a third possibility: vectored stored procedures. It is true that nobody uses stored procedures. They are sooo nineties -- where's the client-side JavaScript? I will introduce a design pattern that runs table-to-table copy and normalization changes, with perfect parallelism and scale-out, in SQL procedures. This will work from the file system as well, since a CSV file can be accessed as a table. In number of code lines, time-to-solution, and run-time performance, this is unbeatable.

Schema Choices

The LOD2 project developed a benchmark for geo retrieval in SPARQL. We have adapted the benchmark to work in SQL against the PostgreSQL OSM schema and a Virtuoso SQL equivalent.

The intent is to run the LOD2 geobench against the planet-wide OSM dataset in PostgreSQL and Virtuoso. With Virtuoso we will also compare scale-out and single server versions.

The PostgreSQL OSM implementation exists in both normalized and denormalized variants. The denormalized variant uses an H-Store (hstore) column type, which is a built-in non-first-normal-form set of key-value pairs that can occur as a column value. In Virtuoso, the equivalent would be to use an array in a column value, but this is not very efficient. Rather, we will go the normalized route, getting outstanding JOIN performance and space efficiency from the column store. Since this is a freestyle race, we take the liberty of borrowing the IRI data type from the RDF side of Virtuoso. This offers a fast mapping between names and integer identifiers, which is especially handy for tags. PostgreSQL likely has some similar encoding as part of the H-Store implementation.

The geometry types are transferred as strings, and then re-parsed into the Virtuoso equivalents. The EWKT syntax is compatible between the systems. The potentially long geometries are stored in a LONG ANY column, and the always short ones (e.g., bounding boxes and points) in an ANY column. In both implementations, there is an R-tree index on the points but not on the linestrings.

We will later look at space consumption and access locality in more detail.

To ETL the PostgreSQL based dataset, we attach the OSM tables as remote tables using Virtuoso's SQL federation (VDB) feature. This is not in the Open Source Edition (VOS) but you can get the same effect by dumping the tables into files, and defining the files as tables with the file-table feature.

The tables which have no need of special transformation go with just an INSERT ... SELECT, like this:

log_enable (2); 
INSERT INTO users 
  SELECT * 
  FROM users1 &

The tables which have special datatypes (like geometries or H-Stores) need a little application logic, like this:

CREATE PROCEDURE copy_ways ()
  {
    log_enable (2);
    RETURN 
      ( SELECT COUNT (ins_ways ( id, 
                                 version, 
                                 user_id, 
                                 tstamp, 
                                 changeset_id,
                                 tags, 
                                 linestring_wkt, 
                                 bbox_wkt
                     )         ) 
         FROM ways1)
    ;
  }

The first line disables logging and makes inserts non-transactional. The rest does the copy. The scan of the remote table is automatically split by ranges of its primary key, so there is no need for explicit parallelism. The ins_ways function is called on each thread, on a whole vector of values for each column. In this way operations are batched together, gaining by locality, and eliminating interpretation overhead.
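
To make the execution model concrete, here is a rough Python analogue of what the paragraph above describes. It is only a sketch under stated assumptions, not how Virtuoso is implemented: the names split_ranges, scan_vectors, ins_vector, and copy_table are hypothetical, the "table" is an in-memory list of (id, name) rows, and a thread pool stands in for the automatic range partitioning. The point is the shape: one key range per thread, and within each thread the per-row function receives whole column vectors rather than single values.

```python
from concurrent.futures import ThreadPoolExecutor


def split_ranges(lo, hi, n_parts):
    """Split the half-open key range [lo, hi) into contiguous sub-ranges."""
    step = max(1, -(-(hi - lo) // n_parts))  # ceiling division
    return [(start, min(start + step, hi)) for start in range(lo, hi, step)]


def scan_vectors(table, key_range, vector_size):
    """Yield one list per column for each batch of rows in key_range."""
    start, end = key_range
    rows = [r for r in table if start <= r[0] < end]
    for i in range(0, len(rows), vector_size):
        chunk = rows[i:i + vector_size]
        ids = [r[0] for r in chunk]
        names = [r[1] for r in chunk]
        yield ids, names


def ins_vector(ids, names):
    """Hypothetical stand-in for a vectored function: it sees a whole
    vector per column and reports how many rows it handled."""
    return len(ids)


def copy_table(table, lo, hi, n_threads=4, vector_size=10_000):
    """Scan each key range on its own thread and sum the rows handled."""
    def work(key_range):
        return sum(ins_vector(ids, names)
                   for ids, names in scan_vectors(table, key_range, vector_size))

    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        return sum(pool.map(work, split_ranges(lo, hi, n_threads)))
```

In the SQL version none of this scaffolding is written by hand; the single SELECT COUNT over the remote table is all the developer sees.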

The ins_ways procedure follows:

CREATE PROCEDURE ins_ways 
  ( IN  id            BIGINT,
    IN  version       INT, 
    IN  user_id       INT, 
    IN  tstamp        DATETIME,
    IN  changeset_id  BIGINT, 
    IN  tags          ANY ARRAY,
    IN  linestring    VARCHAR, 
    IN  bbox          VARCHAR
  )
  {

-- The vectored declaration means that each statement is run on the full input before going to the next.

-- Thus, by default, the insert gets 10K consecutive rows to insert. The conversion functions like st_ewkt_read are also run in a tight loop over a large number of values.

    VECTORED;
    INSERT INTO ways 
      VALUES ( id, 
               version, 
               user_id, 
               tstamp, 
               changeset_id,
               st_ewkt_read
                 ( charset_recode
                     ( linestring, '_WIDE_', 'UTF-8' )
                 ),
               st_ewkt_read
                 ( charset_recode
                     ( bbox, '_WIDE_', 'UTF-8' )
                 )
             ) 
    ;

-- The tags is a vector of strings where each string is a serialization of the H-Store content. split_and_decode splits each string into an array at the delimiter.

    tags := 
      split_and_decode
        ( TRIM
            ( REPLACE
                ( REPLACE
                    ( REPLACE
                        ( REPLACE
                            ( tags, 
                              '"=>"', 
                              '!!!'
                            ), 
                          '&', 
                          '%26'
                        ), 
                      '", "', 
                      '&'
                    ), 
                  '=', 
                  '%3D'
                ), 
              '"'
            )
        );
    NOT VECTORED 
      {
        DECLARE  a1, 
                 b1       VARCHAR 
        ;
        DECLARE  ws, 
                 vs, 
                 ts       ANY ARRAY
        ;
        DECLARE  n_sets, 
                 n_tags, 
                 set_no, 
                 wid, 
                 inx, 
                 pos, 
                 fill     INT
        ;

-- We insert triples of the form tag, way_id, tag_value. For each of these, we reserve an array of 100K elements. We put the values into the array, and insert when full or when all rows of input are done. An insert of 100K values in one go is much faster than inserting 100K values singly, especially on a cluster.

        ws := make_array (100000, 'ANY');
        ts := make_array (100000, 'ANY');
        vs := make_array (100000, 'ANY');
        fill := 0;
        DECLARE tag_arr, 
                str ANY ARRAY;
        n_sets := vec_length (tags);

-- For each row of input to the vectored function:

        FOR ( set_no  :=  0      ; 
              set_no  <   n_sets ; 
              set_no  :=  set_no + 1
            )
          {
            wid     :=  vec_ref (id, set_no);
            tag_arr :=  vec_ref (tags, set_no);
            n_tags  :=  LENGTH (tag_arr);

-- for each tag in the H-Store string:

            FOR ( inx  :=  0; 
                  inx  <   n_tags; 
                  inx  :=  inx + 2)
              {

-- split the tag into a key and a value at the !!! delimiter

                str :=  tag_arr[inx];
                pos :=  strstr(str, '!!!');
                a1  :=  substring(str, 1, pos);
                b1  :=  subseq(str, pos + 3);

-- add to the array of key-value pairs to insert

                way_tag_add (ws, ts, vs, fill, wid, a1, b1);
              }
          }
        way_tag_ins (ws, ts, vs);
      }
  }
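
The chain of REPLACE calls is easier to follow on a concrete value. The following Python sketch mirrors the same steps on one H-Store string as PostgreSQL prints it. It is an illustration under assumptions: parse_hstore_text is a hypothetical name, and the split on '&' followed by percent-decoding only approximates what split_and_decode does in Virtuoso; its exact behavior is not reproduced here.

```python
from urllib.parse import unquote


def parse_hstore_text(s):
    """Turn '"k1"=>"v1", "k2"=>"v2"' into [(k1, v1), (k2, v2)]."""
    s = s.replace('"=>"', '!!!')   # key/value separator becomes !!!
    s = s.replace('&', '%26')      # protect literal ampersands
    s = s.replace('", "', '&')     # pair separator becomes &
    s = s.replace('=', '%3D')      # protect literal equals signs
    s = s.strip('"')               # drop the outermost quotes
    pairs = []
    if not s:
        return pairs
    for item in s.split('&'):
        item = unquote(item)       # undo the %26 and %3D escapes
        pos = item.find('!!!')
        if pos < 0:
            continue               # no separator: not a key-value pair
        pairs.append((item[:pos], item[pos + 3:]))
    return pairs


print(parse_hstore_text('"highway"=>"residential", "name"=>"A&B = C"'))
# prints [('highway', 'residential'), ('name', 'A&B = C')]
```

Note that the scheme relies on the sequences '"=>"' and '", "' not occurring inside keys or values; data that contains them would need a real H-Store parser.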

Now we define the functions for adding a way, key, value triple into the batch, and for inserting the batch.

CREATE PROCEDURE way_tag_ins 
  ( INOUT  ws  ANY ARRAY, 
    INOUT  ts  ANY ARRAY, 
    INOUT  vs  ANY ARRAY
  )
  {

-- given an array of way ids, tag names, and tag values, insert all rows where the tag is not 0. If the tag is empty, call it unknown instead.

-- The __i2id function replaces the tag name with an IRI ID that is persistently mapped to the name. The insert and the tag name-to-id mapping are done as a single operation. This is a single network round trip for each in a cluster setting.

    FOR VECTORED 
      ( IN  wid  INT     := ws, 
        IN  tag  ANY     := ts, 
        IN  val  VARCHAR := vs
      ) 
      {
        IF (tag <>  0)
          {
            IF ('' = tag)
              tag := 'unknown';
            INSERT INTO ways_tags 
              VALUES ( __i2id (tag), wid, val );
          }
      }
  }

CREATE PROCEDURE way_tag_add 
  ( INOUT  ws    ANY ARRAY, 
    INOUT  ts    ANY ARRAY, 
    INOUT  vs    ANY ARRAY, 
    INOUT  fill  INT, 
    IN     wid   INT, 
    INOUT  tg    VARCHAR, 
    INOUT  val   VARCHAR
  )
  {

-- Add at the end of the arrays; if full, insert the content and replace with fresh arrays.

-- The INOUT keyword means call by reference, which is important; you do not want to copy larger arrays, and you want to return new ones to the caller.

    ws[fill] := wid;
    ts[fill] := tg;
    vs[fill] := val;
    fill := fill + 1;
    IF (100000 = fill)
      {
        way_tag_ins (ws, ts, vs);
        fill := 0;
        ws := make_array (100000, 'ANY');
        ts := make_array (100000, 'ANY');
        vs := make_array (100000, 'ANY');
      }
  }
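
The add-and-flush pattern used by way_tag_add and way_tag_ins is generic. As a minimal sketch of the same idea outside the database, assuming a hypothetical TagBatch class and an insert_many callback that stands in for the batched INSERT:

```python
class TagBatch:
    """Collect (way_id, tag, value) rows and hand them on in batches."""

    def __init__(self, insert_many, batch_size=100_000):
        self.insert_many = insert_many  # hypothetical bulk-insert callback
        self.batch_size = batch_size
        self.rows = []

    def add(self, way_id, tag, value):
        """Append one row; flush automatically when the batch is full."""
        self.rows.append((way_id, tag, value))
        if len(self.rows) == self.batch_size:
            self.flush()

    def flush(self):
        """Insert whatever is buffered, renaming empty tags to 'unknown'."""
        if not self.rows:
            return
        batch = [(tag if tag != "" else "unknown", way_id, value)
                 for way_id, tag, value in self.rows]
        self.rows = []                  # start a fresh buffer
        self.insert_many(batch)
```

As in the SQL version, the caller must issue one final flush after the last row, since the last batch is usually only partly full.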

The same logic can be applied to any simple data transformation task. Vectoring and automatic parallelism make sure that there is full platform utilization without explicitly working with threads. The NOT VECTORED {} section allows the procedure to aggregate over all the values in a vector. The FOR VECTORED construct in the insert function, way_tag_ins, switches back into running on a vector composed in the scalar part, so as to get the insert throughput and cluster-friendly message pattern.

Because every non-1NF hack in every application is different, it is not possible to make this fully declarative. But the code is very repetitive, and a skeleton could easily be generated from the schema.

In the next installment, we will analyze the performance of copying the full OpenStreetMap dataset from PostgreSQL to Virtuoso. To be continued...