Esper学习之八:EPL语法(四)

转载请注明出处:

国庆假期之后的工作周,居然苦逼的只有一个休息日,博客没写成不说,打球还把脚给扭了。不仅如此,这周开始疯狂加班了,所以今天这篇拖了又拖。。。

关于EPL,已经写了三篇了,预估计了一下,除了今天这篇,后面还有5篇左右。大家可别嫌多,官方的文档对EPL的讲解有将近140页,我已经尽量将废话都干掉了,再配合我附上的例子,看我的10篇文章比那140页英文文档肯定舒服多了吧。也请各位原谅我一周一篇的速度,毕竟我还要学习,生活,工作,一个都不能少。

今天讲解的内容包括三块:Order by,Limit,Insert into。大家会SQL的应该很熟悉这三个东西,前两个比较简单,Insert into会有一些差别,篇幅也相对多些。

1.Order by

EPL的Order by和SQL的几乎一模一样,作用都是对输出结果进行排序,但是也有一些需要注意的地方。语法如下:

order by expression [asc | desc] [, expression [asc | desc]] [, …]

expreession表示要排序的字段,asc表示升序排列(从小到大),desc表示降序排列(从大到小)。举个例子:

// 每进入5个事件输出一次,并且先按照name升序排列,再按照age降序排列。select * from User output every 5 events order by name, age desc使用方法很简单,除了和SQL相似的特点外,还有他自己需要注意的几点:

a. 如果不特别说明是升序还是降序,默认情况下按照升序排列。

b. 如果order by的子句中出现了聚合函数,那么该聚合函数必须出现在select的子句中。

c. 出现在select中的expression或者在select中定义的expression,在order by中也有效。

d. 如果order by所在的句子没有join或者没有group by,则排序结果幂等,否则为非幂等。

2. Limit

Limit在EPL中和在SQL中也基本一样,不过SQL中是用具体的数字来表示限制范围,而EPL可以是常量或者变量来表示限制范围。语法如下:

limit row_count [offset offset_count]row_count表示输出多少行,可以是一个整型常量,也可以是一个整型变量,以方便运行时修改。

offset_count表示在当前结果集中跳过n行然后再输出,同样也可以是一个整型变量。如果不使用此参数,则表示跳过0行,即从第一行输出。举例如下:

// 输出结果集的第3行到第10行select uri, count(*) from WebEvent group by uri output snapshot every 1 minute order by count(*) desc limit 8 offset 2除了以上的语法,limit还有一种简化的写法,实际上是参照SQL的标准。limit offset_count[, row_count]两个参数的含义和上面的一样,并且我们将上面的例子改写一下:// 输出结果集的第3行到第10行select uri, count(*) from WebEvent group by uri output snapshot every 1 minute order by count(*) desc limit 2, 8如果这个两个参数是负数会怎么样呢?

row_count为负数,则无限制输出,若为0,则不输出。当row_count是变量表示并且变量为null,则无限制输出。

offset _count是不允许负数的,如果是变量表示,并且变量值为null或者负数,则EPL会把他假设为0。

3. Insert into

3.1 简单用法

EPL的Insert into和SQL的有比较大的区别。SQL是往一张表里插入数据,而EPL是把一个事件流的计算结果放入另一个事件流,然后可以对这个事件流进行别的计算。所以Insert into的一个好处就是可以将是事件流的计算结果不断级联,对于那种需要将上一个业务的结果数据放到下一个业务处理的场景再适合不过了。除此之外,Insert into还有合并多个计算结果的作用。到这里相信大家已经对他越来越好奇了,不急,咱们先来看看语法:

insert [istream | irstream | rstream] into event_stream_name [ (property_name [, property_name] ) ]

event_stream_name定义了事件流的名称,在执行完insert的定义之后,我们可以使用select对这个事件流进行别的计算。

istream | irstream | rstream表示该事件流允许另一个事件的输入/输入和输出/输出数据能够进入(解释好像很绕。。一会儿看例子就能明白了)

property_name表示该事件流里包含的属性名称,多个属性名之间用逗号分割,并且用小括号括起来。

上面的说明可能不是很好理解,咱们先看个例子:

// 将新进入的Asus事件传递到Computer,且Asus的id,size和Computer的cid,csize对应insert into Computer(cid,csize) select id,size from Asus// 第二种写法insert into Computer select id as cid, size as csize Asus 从例子中可以看到,insert into需要配合select进行使用,以表明前一个事件流有哪些计算结果将进入insert into定义的事件流。并且在select中的字段要和insert里的事件流的属性要对应(这里指的对应是数据类型对应,而且属性数量也必须一样)。如果说insert定义的事件流名称在之前已经定义过(insert into中定义的除外),重名是不允许的。

我个人推荐第二种写法,通过as设置的别名即为insert定义的事件流的属性,这样可以避免属性的个数不一致的错误。

刚才说了istream | irstream | rstream的用法,可能有点表述不清楚,这里看一个完整的例子。

/** * * @author luonanqin * */class Asus{private int id;private int size;public int getId(){return id;}public void setId(int id){this.id = id;}public int getSize(){return size;}public void setSize(int size){this.size = size;}public String toString(){return "id: " + id + ", size: " + size;}}class InsertRstreamListener implements UpdateListener{public void update(EventBean[] newEvents, EventBean[] oldEvents){if (newEvents != null){for (int i = 0; i < newEvents.length; i++){Object id = newEvents[i].get("cid");System.out.println("Insert Asus: cid: " + id);}}if (oldEvents != null){for (int i = 0; i < oldEvents.length; i++){Object id = oldEvents[i].get("cid");System.out.println("Remove Asus: cid: " + id);}}System.out.println();}}public class InsertRstreamTest {public static void main(String[] args) throws InterruptedException {EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();EPAdministrator admin = epService.getEPAdministrator();String asus = Asus.class.getName();String insertEPL = "insert rstream into Computer(cid,csize) select id,size from " + asus + ".win:length(1)";String insertSelectEPL = "select cid from Computer.win:length_batch(2)";EPStatement state = admin.createEPL(insertEPL);EPStatement state1 = admin.createEPL(insertSelectEPL);state1.addListener(new InsertRstreamListener());EPRuntime runtime = epService.getEPRuntime();Asus apple1 = new Asus();apple1.setId(1);apple1.setSize(1);System.out.println("Send Asus: " + apple1);runtime.sendEvent(apple1);Asus apple2 = new Asus();apple2.setId(2);apple2.setSize(1);System.out.println("Send Asus: " + apple2);runtime.sendEvent(apple2);Asus apple3 = new Asus();apple3.setId(3);apple3.setSize(3);System.out.println("Send Asus: " + apple3);runtime.sendEvent(apple3);Asus apple4 = new Asus();apple4.setId(4);apple4.setSize(4);System.out.println("Send Asus: " + apple4);runtime.sendEvent(apple4);Asus apple5 = new Asus();apple5.setId(5);apple5.setSize(3);System.out.println("Send Asus: " + apple5);runtime.sendEvent(apple5);Asus apple6 = new Asus();apple6.setId(6);apple6.setSize(4);System.out.println("Send Asus: " + apple6);runtime.sendEvent(apple6);}}执行结果:Send Asus: id: 1, size: 1Send Asus: id: 2, size: 1Send Asus: id: 3, size: 3Insert Asus: cid: 1Insert Asus: cid: 2Send Asus: id: 4, size: 4Send Asus: id: 5, size: 3Insert Asus: cid: 3Insert Asus: cid: 4Send Asus: id: 6, size: 4 这个例子中,insertEPL表示当Asus事件从length为1的view中移除时,把移除的事件放入Computer。insertSelectEPL是对Computer的事件流进行计算,这里只是在每进入两个事件时才输出这两个事件的cid。而rstream在这里的表现,从执行结果中可以看到,在进入id为1 2 3的事件后,insertSelectEPL的监听器被触发,因为id为1和2的事件是在发送了Asus的id为2和3的事件之后被移除了,之后就进入了Computer,并满足了length=2,因此在监听器里看到有id为1和2的事件进入了Computer。

如果不显示指定rstream,则insert into只允许istream的事件流进入Computer。如果指定为irstream,,那么进入的和移除的Asus都会进入到Computer。

那么,不如我们礼貌地保持相对距离,不至于太冷,不至于太痛。

Esper学习之八:EPL语法(四)

相关文章:

你感兴趣的文章:

标签云: