We have received some more questions about Virtuoso's parallel query evaluation model.
In answer, we will explain here how we do search-engine-style processing by writing SPARQL. There is no need for custom procedural code, because the query optimizer does all the partitioning and the equivalent of MapReduce.
The point is that what used to require programming can often be done in a generic query language. The technical detail is that the implementation must be smart enough with respect to parallelizing queries for this to be of practical benefit. But by combining these two things, we are a step closer to the web being the database.
We will show here how we do some joins combining full text, RDF conditions, aggregates, and ORDER BY. The sample task is finding the top 20 entities with "New York" in some attribute value. Then we narrow the search by taking only actors associated with New York. The results are returned in the order of a composite of entity rank and text-match score.
The basic query is:
SELECT ( sql:s_sum_page ( <sql:vector_agg> ( <bif:vector> ( ?c1, ?sm ) ),
                          bif:vector ( 'new', 'york' ) ) ) AS ?res
WHERE
  {
    { SELECT ( <SHORT_OR_LONG::> (?s1) ) AS ?c1
             ( <sql:S_SUM> ( <SHORT_OR_LONG::IRI_RANK> (?s1),
                             <SHORT_OR_LONG::> (?s1textp),
                             <SHORT_OR_LONG::> (?o1),
                             ?sc ) ) AS ?sm
      WHERE
        { ?s1 ?s1textp ?o1 .
          ?o1 bif:contains "new AND york"
            OPTION ( SCORE ?sc )
        }
      ORDER BY DESC
        ( <sql:sum_rank> ( ( <sql:S_SUM> ( <SHORT_OR_LONG::IRI_RANK> (?s1),
                                           <SHORT_OR_LONG::> (?s1textp),
                                           <SHORT_OR_LONG::> (?o1),
                                           ?sc ) ) ) )
      LIMIT 20
    }
  }
This takes some explaining. The basic part is
{ ?s1 ?s1textp ?o1 .
  ?o1 bif:contains "new AND york"
    OPTION ( SCORE ?sc )
}
This just makes tuples where ?s1 is the subject, ?s1textp the property, and ?o1 the literal which contains "New York". For a single ?s1, there can of course be many properties which all contain "New York".
The rest of the query gathers all the "New York"-containing properties of an entity into a single aggregate, and then gets the entity ranks of all such entities.
After this, the aggregates are sorted by a sum of the entity rank and a combined text score calculated from the individual text-match scores between "New York" and the strings containing "New York". The text-hit score is higher if the words repeat often and in close proximity.
The s_sum function is a user-defined aggregate which takes 4 arguments: the rank of the subject of the triple; the predicate of the triple containing the text; the object of the triple containing the text; and the text-match score.
These are grouped by the subject of the triple. After this, the groups are sorted by the sum_rank of the aggregate constructed with s_sum. The sum_rank is a SQL function combining the entity rank with the text scores of the different literals.
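To make the mechanics concrete, here is a minimal sketch, in Python rather than Virtuoso SQL, of what an s_sum-style aggregate and a sum_rank-style combiner do. The accumulator layout and the plain-sum weighting are assumptions for illustration, not the actual implementation.

# Conceptual sketch only; accumulator shape and score combination
# are illustrative assumptions, not Virtuoso's code.

def s_sum_init():
    # One accumulator per group, i.e., per subject ?s1.
    return {"rank": 0.0, "hits": []}

def s_sum_acc(acc, iri_rank, pred, obj, text_score):
    # Called once per "New York"-containing triple of the subject.
    acc["rank"] = iri_rank                      # identical for every row of the group
    acc["hits"].append((pred, obj, text_score))
    return acc

def sum_rank(acc):
    # Combine the entity rank with the text scores of the different
    # literals; a plain sum stands in for the real weighting.
    return acc["rank"] + sum(score for _, _, score in acc["hits"])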
This executes as one would expect: All partitions make a text-index lookup, retrieving the object of the triple. The text-index entries of an object are stored in the same partition as the object. But the entity rank is a property of the subject and is partitioned by the subject; the GROUP BY is also by the subject. Thus the data is produced from all partitions, then streamed into the receiving partitions, determined by the subject. Each receiving partition can then get the rank and group the matches by the subject. Since all these partial aggregates are partitioned by the subject, there is no need to merge them; thus, the top-k sort can be done for each partition separately. Finally, the top 20 of each partition are merged into the global top 20. This is then passed to a final function, s_sum_page, that turns this all into an XML fragment that can be processed with XSLT for inclusion on a web page.
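The data flow can be modeled in a few lines. The sketch below is a single-process stand-in for the cluster, assuming hash partitioning by subject, per-partition aggregation and top-20, and one final merge; the function names, the rank lookup, and the scoring are illustrative assumptions, not engine internals.

import heapq
from collections import defaultdict

N_PART = 4                                    # stand-in for the cluster's partitions

def run(text_hits, iri_rank, k=20):
    # text_hits: (subject, predicate, literal, text_score) rows coming out of
    # the text-index scan of every partition; iri_rank: per-subject rank lookup.
    inboxes = [defaultdict(list) for _ in range(N_PART)]
    for s, p, o, sc in text_hits:             # stream each row to the partition
        inboxes[hash(s) % N_PART][s].append(sc)   # that owns the subject
    local_tops = []
    for inbox in inboxes:                     # each partition aggregates alone:
        scored = [(iri_rank[s] + sum(scores), s)  # rank + combined text score
                  for s, scores in inbox.items()]
        local_tops.append(heapq.nlargest(k, scored))  # per-partition top k
    # the only global step: merge the per-partition top k into the final top k
    return heapq.nlargest(k, (row for top in local_tops for row in top))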
This differs from a text search engine in that the query pipeline can contain arbitrary cross-partition joins. Also, the string "New York" is a common label that occurs in many distinct entities. Thus one text match, to one "document" (here, a literal containing only the string "New York"), will yield many entities, likely all from different partitions.
So, if we only want actors with a mention of "New York", we restate the inner part of the query as:
{ ?s1 ?s1textp ?o1 .
  ?o1 bif:contains "new AND york"
    OPTION ( SCORE ?sc ) .
  ?s1 a <http://umbel.org/umbel/sc/Actor>
}
Whether an entity is an actor can be checked in the same partition as the rank of the entity. Thus the query plan places this check right before the rank lookup. This is natural, since there is no point in getting the rank of something that is not an actor.
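In terms of the model above, the type restriction is just an extra same-partition check ahead of the rank fetch; a hypothetical sketch, with the types lookup standing in for co-located class-membership data:

ACTOR = "http://umbel.org/umbel/sc/Actor"

def score_group(subject, scores, types, iri_rank):
    # types: per-subject class membership, co-located with the rank.
    if ACTOR not in types.get(subject, ()):   # cheap check first ...
        return None                           # ... so non-actors never cost a rank lookup
    return iri_rank[subject] + sum(scores)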
The <SHORT_OR_LONG::sql:func> notation means that we call func, a SQL stored procedure, with the arguments in their internal form. Thus, if a variable bound to an IRI is passed, the SHORT_OR_LONG specifies that it is passed as its internal ID and is not converted into its text form. This is essential, since there is no point in getting the text of half a million IRIs when only 20 at most will be shown in the end.
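The point fits in one function: the pipeline compares and sorts compact internal IDs end to end, and only the winners are translated to text. The id_to_text lookup below is a hypothetical stand-in for the IRI dictionary, not a real API.

def render_top_k(scored_ids, id_to_text, k=20):
    # scored_ids: (score, internal_iri_id) pairs; IDs stay internal until here.
    top = sorted(scored_ids, reverse=True)[:k]
    return [(score, id_to_text[iri_id]) for score, iri_id in top]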
Now, when we run this on a collection of 4.5 billion triples of linked data, once we have the working set, we can get the top 20 "New York" occurrences, with text summaries and all, in just 1.1s, with 12 of 16 cores busy. (The hardware is two boxes with two quad-core Xeon 5345 each.)
If we run this query in two parallel sessions, we get both results in 1.9s, with 14 of 16 cores busy. This gets about 200K "New York" strings, which become about 400K entities with New York somewhere, for which a rank then has to be retrieved. After this, all the possibly-many occurrences of New York in the title, text, and other properties of an entity are aggregated together, resolving into some 220K groups. These are then sorted. Internally, this is over 1.5 million random lookups and some 40MB of traffic between processes. Restricting the type of the entity to actor drops the execution time of one query to 0.8s, because there are then fewer ranks to retrieve and less data to aggregate and sort.
By adding partitions and cores, we scale horizontally, as evaluating the query involves almost no central control, even though data are swapped between partitions. There is some flow control to avoid constructing overly large intermediate results, but generally partitions run independently and asynchronously. In the above case, there is just one fence, at the point where all aggregates are complete so that they can be sorted; otherwise, all is asynchronous.
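Here is a toy model of that single fence, assuming partitions are plain worker tasks: each aggregates and sorts on its own, and the only synchronization is waiting for all partial top-k lists before the global merge. This is a sketch of the control flow, not Virtuoso code.

import heapq
from concurrent.futures import ThreadPoolExecutor

def local_top(groups, k=20):
    # groups: {subject: combined_score}, already local to one partition
    return heapq.nlargest(k, ((sc, s) for s, sc in groups.items()))

def fenced_query(partitions, k=20):
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(local_top, g) for g in partitions]
        tops = [f.result() for f in futures]   # the one fence: wait for all
    return heapq.nlargest(k, (row for top in tops for row in top))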
Doing JOINs between partitions and partitioned GROUP BY/ORDER BY is pretty regular database stuff. Applying this to RDF is a most natural thing.
If we do not parallelize the user-defined aggregate for grouping all the "New York" occurrences, the query takes 8s instead of 1.1s. If we could not put SQL procedures as user-defined aggregates to be parallelized with the query, we'd have to either bring all the data to a central point before taking the top k, which would destroy performance, or write procedures with explicit parallel procedure calls, which is hard, surely too hard for ad hoc queries.
Results of live execution may not be complete on initial load, as this link includes a "Virtuoso Anytime" timeout of 10 seconds. Running against a cold cache, these results may take much longer to return; a warm cache will deliver response times along the lines of those discussed above.
Engineering matters. If we wish to commoditize queries on a lot of data, such intelligence in the DBMS is necessary; it is very unscalable to require people to write procedural code or give query-parallelization hints. If you need to optimize a workload of 10 different transactions, this is of course possible and even desirable; but for the infinity of all search or analysis, this will not happen.