SPAseq: Semantic Complex Event Processing over RDF Graph Streams

Overview

Our goal is to make both offline and online application of SPAseq as simple as possible for other users. This allows them to easily apply SPAseq to their own data streams and facilitates reproduction of our evaluation. Therefore, we provide an easy-to-run executable of SPAseq. The source code of our system will be avialable after the reviewing process.

In our paper, we briefly dicussed all the features of our language; here we describe them with complete examples and uses cases.

V-Shaped Pattern

A query with V-shaped temporal pattern describes the sequence of values that go down to a local minimum, then rising up to a local maximum, which was higher than the starting value.

This uses case was described in detail in pur paper, here we use the stock-marked based query to showcase how it is written in SPAseq.

Query 1

PREFIX  c: <http://example/company#> 
PREFIX  pred: <http://example/>
SELECT ?company ?p1 ?p2 ?p3 ?vol1 ?vol2 ?vol3 
WITHIN  10 SECONDS 
PARTITON BY (?company) 
FROM STREAM  S1 <http://example.org/main> 
WHERE 
SEQ (A, B, C)
{
DEFINE GPM  A ON S1
{ 
?company pred:price ?p1.
?company pred:volume ?vol1. 
} 
DEFINE GPM  B ON S1
{
?company pred:price ?p2.
?company pred:volume ?vol2. 
Filter (?p2 < ?p1 )
}
DEFINE GPM  C ON S1
{
?company pred:price ?p3.
?company pred:volume ?vol3. 
Filter (?p3 > ?p2 && ?p3 > ?p1). 
}
}

The Query 1 showcase three important features of the SPAseq: first, the stateful behviour between different variables (?company, ?p1 and ?p2) among GPM expressions; second, the filter clause as utilised in the SPARQL; third the partition-by caluse. Due to space restriction we do not provide the details of the semantics of partition-by clause. However, it provides an important funtionality for SCEP application: events are typically partitioned by certain attributes and pattern detection usually becomes meaningful after partitioning them. We explain it through an example. Consider the above mentioned query is registered on the stock market stream where events from multiple different companies arrives in the stream. As we are only interested in V-shapped pattern for each distinct company, partitioning the stream my the ?company variable only consider the V-shapped pattern of each distinct mapping of ?company variable. Thus, its semantics are similar to the GROUP BY clause of SPARQL, where the mappings are first gruoped and then aggregates are applied on them. Although pattern-by operator does not add to the expressiveness, it provides a more concrete syntax for complex queries and optimisation opportunities at system level.

Inventory Management

Consider a system monitoring the status (surgical usage, recycling etc.) of equipments in a hospital by using various RFID sensors. Then we can constrcut critical alerts, such that if a surgical tool is washed/recycled and is put back into the use without being first disinfected, then inform the administrator.

The above use case requires the non-occurrence of an event by using a negation operator in the sequence expression (i.e., not disinfected). The results of such query can be composed using a CONSTRUCT clause from SPARQL; the query of this use case is described below.

Query 2

CONSTRUCT
{
?inst :usedIn ?uroomID.
?inst :hasName ?name.
?inst :usedBy ?e.
?e :employeeName ?eNameC.
?e :employeeID ?eIDC.
}
WITHIN  24 hours
FROM STREAM  S1 <http://hospital.org/instrumentRFID>
WHERE
{
SEQ (A, B!, C)
DEFINE GPM A ON S1 {
?inst :hasRoom ?roomID.
?inst :hasName ?name
?inst :hasStatus ?statusA.
Filter (?statusA = ‘recycle’^^xsd:string)
}
DEFINE GPM  B ON S1
?inst :hasStatus ?statusB.
Filter (?statusB = ‘disinfected’^^xsd:string)
}
DEFINE GPM C ON S1 {
?inst :hasStatus ?statusC.
?inst :usedIn ?uroomID.
?inst :usedBy ?e.
Graph <http://www.example.com/background/Knoweldge>
{
?e :employeeID ?eIDC.
?e :employeeName ?eNameC
}
Filter (?statusC = ‘To be used’^^xsd:string)
}
}

Query 2 presents two important features: first, the use of CONSTRUCT clause to produce new stream; second the use of the background knowledge to enrich the response of certain events. Query 2 consists of three GPM expression: the first clause (GPM A ON S1) determines the “recycling” status of an instrument and room ID of such operation. The second expression (GPM B ON S1) uses the stateful variable (?inst) to determine the current status of the same instrument. The third GPM expression (GPM C ON S1) uses the same stateful variable (?inst), and a set of triple patterns for static background knowledge within a GRAPH clause to extract the knowledge about the room and a person involved with such events.

SPAseq Sources

We have implemented our system in Java, which means it can simply be run on the Java Virtual Machine (requirement: 64-bit JDK 8 or higher version). We wanted to make sure that users do not have to install any third party libraries. Therefore, we have packaged our system with all necessary dependencies including the complete query parser, input file parser and both query engines. This explains the large size of the JAR. Our system accept the input stream file of the Nx format, where x = Triples, Quads, or any other number. For more details see the specification for the NQuads format, a extension for the N-Triples RDF format. Note that the parser handles any combination (cf. generalised triples or number of N-Triples syntax terms on each line (the number of terms per line can also vary).

Currently we provide the executable jar files for the SPAseq. The details are providede at this link . Our implementation is a command-line application with the following usage:

Usage: java -jar spaseq.jar [-s <STRING>] [-q <STRING>] [-st <String>]
-s,--streamfile <arg> Paths and prefixes of input stream files located in .stream file
-q,--query <arg> Input query file
-st,--streamtype <arg> Stream type for multiple streams, true: for random generation of events, false: for sequential-based generation
-kb,--knbase <arg> optional external KB file in N-Triples format
-h,--help show help


An Example of the usage is as follows:
java -jar spaseq.jar -s ./streamset.stream -q ./SHDQ1.q -st true -kb ./kb.nt

The first option describes how many triples should be parsed into one event. The second option describes if the streams are processed with event-based processing model or with incremental one. Then we have paths for the input file and query, finally the window option describe the size of the window in seconds. The output matches are stored in the ./result/queryResult1.txt. The folder named ./result should be placed in the same directory of the jar

Note: For large reservoir sizes, it makes sense to increase the JVM heap size (eg. -Xmx4096M) and use a concurrent garbage collector (-XX:+UseConcMarkSweepGC) since the concept of repeated creation/destroying of event-data may pose a challenge to garbage collection.

Conjunction and Disjunction

The conjunction operator determines whether two events u and v -- that are matched to graph pattern P1 and P2 -- occurs at the same time. Two events are said to occur at the same time if their timestamps overlaps. The conjunction of events is described in the sequence expression using the operator (<>). That is, the sequence expression SEQ (A ,(B<>C), D) depicts that events matched to the GPM A is immediately followed-by the conjunction of events matched to the GPM B and GPM C, and immediately followed-by and event matched to the GPM C. The following query descibes the conjunction of events for the stock market-based events

PREFIX  c: <http://example/company#> 
PREFIX  pred: <http://example/>
SELECT ?company ?p1 ?p2 ?p3 ?vol1 ?vol2 ?vol3 
WITHIN  10 SECONDS 
PARTITON BY (?company) 
FROM STREAM  S1 <http://example.org/main> 
WHERE 
SEQ (A, (B <> C))
{
DEFINE GPM  A ON S1
{ 
?company pred:price ?p1.
?company pred:volume ?vol1. 
} 
DEFINE GPM  B ON S1
{
?company pred:price ?p2.
?company pred:volume ?vol2. 
Filter (?p2 < ?p1 )
}
DEFINE GPM  C ON S1
{
?company pred:price ?p3.
?company pred:volume ?vol3. 
Filter (?p3 > ?p2 && ?p3 > ?p1). 
}
}

The disjunction operator determines whether an event from a set of events -- related through the disjunction operator -- occurs without having any constraints over the timestamps or order. Thus, a sequence expression with disjunction operators SEQ (A ,(B|C), D) depicts that events matched to the GPM A is immediately followed-by the conjunction of events matched to either GPM B or GPM C, and immediately followed-by and event matched to the GPM D.

PREFIX  c: <http://example/company#> 
PREFIX  pred: <http://example/>
SELECT ?company ?p1 ?p2 ?p3 ?vol1 ?vol2 ?vol3 
WITHIN  10 SECONDS 
PARTITON BY (?company) 
FROM STREAM  S1 <http://example.org/main> 
WHERE 
SEQ (A, (B | C))
{
DEFINE GPM  A ON S1
{ 
?company pred:price ?p1.
?company pred:volume ?vol1. 
} 
DEFINE GPM  B ON S1
{
?company pred:price ?p2.
?company pred:volume ?vol2. 
Filter (?p2 < ?p1 )
}
DEFINE GPM  C ON S1
{
?company pred:price ?p3.
?company pred:volume ?vol3. 
Filter (?p3 > ?p2 && ?p3 > ?p1). 
}
}

Grammar

We have extended the SPARQL 1.1 grammar with the temporal sequence expression as discussed in our paper. The Lexer tokens and the Parser of the SPAseq in ANTLR syntax is avaiable at the following links:
SpaseqLexer.g4
Spaseq.g4

Dataset

The two dataset generators (SMD and SHD) as discussed in the paper are aviilable at these repositories (SMD , SHD ). Each repository contains the details of generating new dataset from the dataset generators.