🦖
Wii
  • 原码补码反码
  • Archive
    • Job
      • Learn
      • 算法
      • Company
        • HundunDaxue
      • Company
      • 基础
        • 原码补码反码
      • 项目经验
      • require
    • Hobbies
      • Physics
        • 上帝粒子
        • 概述
        • 时间
      • Movie
        • MovieList
      • Psychology
        • Psychology
        • Chenli
          • ChenliLivingRoom
      • Philosophy
        • Philosophy
        • Conceptions
        • 导言
      • Travel
        • City
          • 昆明
          • 沈阳
        • articles-check-list
      • Sports
        • Swimming
        • Skiing
      • Earth
        • Ocean
          • Biology
      • Read
        • BookList
        • 道德经
        • BookToRead
      • Music
        • sort
      • PickUp
        • SoldiersSortie
    • Care
      • Illness
        • cold
        • 腹泻
        • acne
        • EmotionalControl
        • 咽炎
        • Anemia
      • Foods
        • 破壁机
          • 食谱
      • I
      • WishList
      • WithL
        • MF
          • LY
    • Wfw
      • QA
    • Mac
      • Brew
        • 软件安装目录
      • Usage
        • RunScriptAsRootOnBoot
        • Mac-Config
      • 制作启动U盘
      • Software
        • IntelliJIDEALicenseServer
          • run-license-server
    • PlantUML
      • plantuml
    • Windows
      • Windows常用命令
      • PowerShell
        • powershell 命令
      • Cmder
      • MTP服务驱动无法安装
    • English
      • 英语阅读2016-07-29
      • 英语阅读2016-08-11
    • Tools
      • Plantuml
        • Setup
      • Eclipse
        • Eclipse
        • Eclipse常见问题
      • CommonHotkey
      • Jetbrain
        • JetbrainIDEs
      • VSCode
      • SublimeText
        • 格式化代码
    • I
      • WHATIAM
    • Device
      • Netgear
        • Astrill
      • RPi
        • Hardware
    • AwesomeSoftware
    • RESTful
    • Course
      • 自然辩证法
        • 我国在生态文明建设中存在的困境及解决对策
        • 工程师应该具有的基本道德素养
        • 科学文化与人文文化的关系
      • 英语写作
        • Description
      • 分布式系统
        • 分布式系统概论
      • 英语口语
        • 辩论赛
    • CloudLib
      • 推荐0.1
    • Project
      • README
        • emq
          • Emq架构
        • 启动
        • Hikvision
          • TimeSetting
    • Efficient
    • Neu
      • IpCamera
        • live
        • ffmpeg
    • Matlab
      • Matlab 2016b 破解
    • SchoolWork
      • 学术道德与学术规范
    • git-push
  • Coding
    • Design Pattern
      • 设计模式笔记_四_工厂模式
      • 设计模式笔记_六_命令模式
      • 设计模式笔记_三_装饰者模式
      • 设计模式入门
      • 设计模式笔记_八_模板方法模式
      • 设计模式笔记_一_策略模式
      • 设计模式笔记_五_单件模式
      • 设计模式笔记_七_适配器模式与外观模式
      • 设计模式笔记_二_观察者模式
      • 设计模式笔记_十_状态模式
      • 设计模式笔记_九_迭代器与组合模式
    • C++
      • Notes
        • Practice
          • Logger
        • Thread
          • PosixThreadPrograming
          • ThreadNote
        • Features
          • FuturePromise
          • Lambda
        • STL
          • STLPractice
          • 迭代器
          • UnorderedMapSet
          • Containor
          • STL
          • Vector
        • CMake
          • Startup
          • CMakeExample
          • CMake
          • CMakeUsage
          • CMakeKnowledge
        • Mutex
        • Gdb
          • Gdb
        • LanguageNotes
          • Pointer
          • String
          • Functions
          • 友元
          • IO
          • OOP
          • Exceptions
          • Basic
          • 初始化
          • Random
          • 模板函数
        • Glog
          • glog
        • Thrift
          • Thrift
        • Valgrind
          • valgrind
        • 动态库 & 静态库
        • BookNotes
          • AboutC++
        • LRvalues
      • map
      • protobuf
      • Build
      • Seastar
        • Notes
          • std::move
          • Introduce
          • Install
            • BuildAndInstall
          • Steps
          • cmd
      • Tricks
      • Map
      • CommonOperation
      • FreqAlgorithm
    • Tools
      • Git
        • GitExamples
        • GitUsage
        • GitKnowledge
        • GitIgnoreExample
        • DeleteBigFileFromHistory
      • Vim
        • VimTips
        • 安装
        • Vim-Usage
        • Plugins
        • Vim-Config
      • SVN
        • svn服务器搭建
        • svn
    • Scala
      • Notes
        • Scala-模式匹配
        • Scala: 隐式
        • Scala-符号语法
        • Scala-函数
        • Scala面向对象编程
        • Scala 函数式编程
        • Scala:zipWithIndex
        • Future
        • Scala-语法
        • Scala-基础
      • DateTime
      • 规范
    • Python
      • Notes
        • BookNotes
          • 生成器
          • 垃圾回收机制
          • 数据结构
          • 数据类型
          • RegularExpression
          • 迭代器
          • NetworkProgramming
          • 函数式编程
          • 上下文管理器
          • PythonDataModel
          • 运算符
          • 魔术方法
          • 面向对象
          • 装饰器
          • 模块
          • MultithreadProgramming
          • 异常
          • 函数
        • Modules
          • stack
          • Datetime
          • shutil:文件操作
          • logging
          • urllib
          • Re
          • 容器数据类型
          • TypeError
          • str
          • queue
          • urllib-and-requests
          • Exception
          • path
          • os
        • Others
          • PythonSerialization
          • Python函数的docstring
          • PIL
          • type-cast
          • operations
          • Python-类
          • 组及命名组匹配
          • Package
          • jieba分词
          • logging模块
          • Python
          • print
        • Examples
          • 文件读取写入
          • 命名
          • 递归更改文件为windows合法名称
          • 定制命令行运行方式
          • Python处理Excel文件(xlsx文件格式)
          • 读取ini配置文件
          • tor代理
          • 添加父目录到Path
        • CommonTips
        • CodingStandards
          • python注释
          • PEP-8
      • Django
        • DjangoDocs
          • making queries
          • 设置media路径
          • models
          • manage.py使用
          • template
          • view
          • forms
          • setting.py 文件配置说明
          • nginx-deploy
          • 使用pymysql
          • 自定义tags和filters
          • admin-interface
        • DjangoRestFramework
          • Customer Permissions
          • Serializers
          • FileField绝对路径问题
          • DjangoRestFrameworkNotes
          • ViewSet
        • DjangoNotes
          • Model对象转化为Dict
          • QuerySet
      • Scrapy
        • Scrapy
        • Spider
        • Scrapy安装出错
        • Selector
        • Scrapy模拟登陆
      • Job
        • 字典
      • Pandas
        • pandas
        • PandasExamples
      • VirtualEnv
        • virtualenv
      • Numpy
        • NumpyUsage
        • numpy
      • Matplotlib
        • MatplotlibNotes
        • MatplotlibUsage
      • Database
        • 获取表字段
      • Pip
        • 更改源
      • Scipy
        • scipy
    • Web
      • 插件
        • bootstrap-table
          • bootstrap-table
        • bootstrap
          • 模态框
        • requirejs
          • requirejs
        • toastr
          • toastor
      • Koa
        • Notes
          • KoaNotes
      • SCSS
        • 常用标签
        • Watch
      • Vue
        • Notes
          • 路由
          • 参考
          • 组件
          • Plugins
          • Vuex
          • StartUp
      • 样式
      • CSS
        • CSS
      • 排版
      • Notes
        • 跨域访问
      • Hexo
        • HexoUsage
      • Nodejs
        • Koa
          • jest
          • ParamValidate
        • 仓库镜像
      • Express
        • Express
        • Jade
          • Jade
      • Canvas
        • Canvas
    • Basic
      • Data Structure
        • Heap
          • Heap
        • Tree
          • Tree
        • Benchmark
          • map
      • Boolean
      • MultithreadProgramming
      • Software Engineering
        • UML
          • UML
      • OOP
      • 介绍
    • Antlr
      • Example
      • Grammar
      • Antlr
    • Java
      • Library
        • MyBatis
          • generator
            • mybatis配置详解
          • mybatis-获取自增ID
          • mybatis
          • problems
        • log4j
          • Usage
      • Maven
        • MavenUsage
        • Maven
        • MavenProject
        • 项目RUL路径问题
        • MavenPom
        • Settings
        • PomCommon
        • PomExample
      • Notes
        • Features
          • Reflect
          • Java函数式编程
          • toMap
          • Closeable & AutoCloseable & Flushable
          • Annotations
        • Common
        • ThinkingInJava
          • 控制执行流程
          • 接口
          • 复用类
          • 内部类
          • 操作符
          • 访问权限控制
          • 一切都是对象
          • 多态
          • 初始化与清理
          • 对象导论
        • SwordToOffer
        • Network
        • Thread
          • ThreadPool
        • Basic Library
        • Collections
          • List Interface
        • CommandLine
        • Project Common
        • JavaLang
      • JVM
        • Monitor
          • Jmap
          • mat
          • Jstat
          • Monitor
        • Notes
          • JVM
        • GC
          • GC
          • Shenandoah
            • Shenandoah
        • JVM
    • Algorithm
      • Code
        • LeetCode
          • Python
            • 0000-0050
              • 0005
              • 0030
        • SwordToOffer
          • SwordToOffer
      • AlgorithmSummary
      • Classics
        • string
          • KMP
        • Other
          • FullPermutation
        • 链表
        • Sort
          • Sort
      • Other
        • README
      • Notes
        • Math
          • 两点计算直线方程
    • Go
      • Notes
        • Go Project Layout
        • Install
        • Startup
      • Basic
        • Startup
        • Types
    • JavaScript
      • MasonryLayouts
      • jquery
      • Notes
        • Promise
      • js
    • Android
      • SDK
        • 打开SDK Manager
    • C#
      • WebBrowser
      • c#图片
      • 跨线程访问控件
    • Knowledge
      • 函数式编程
      • 设计框架
    • Rules
      • Rules
    • React
      • ReactNative
        • React Native Navigation
        • 打包Apk
        • ReactNative
      • React
        • README
    • RegExp
      • 正则表达式
    • WeChatApp
      • 登陆
    • Node
      • Notes
        • StartUp
  • Computer Science
    • ICS Security
      • 工控网络
      • 工业控制系统
      • HoneyPot
        • 蜜罐软件
          • Honeyd
      • 工业以太网
      • CNVD
        • 环境及依赖
      • 现有蜜罐系统及工具
      • 工控系统安全措施
      • 蜜网
      • 蜜罐
      • 工控安全相关概念
    • Data Analysis
      • Data Mining
        • Notes
          • Data_Preprocessing
          • 数据预处理
          • 认识数据
          • Mining_Modeling
          • 数据探索
          • Python_Data_Mining_Functions
          • Python数据分析平台搭建
          • Reference_Books
          • 数据分析与挖掘基础
        • Jupyter
          • show
            • mean
      • Hadoop
        • Hadoop权威指南:数据完整性
        • Hadoop权威指南: I/O操作序 - 列化
        • Hadoop权威指南-从Hadoop URL读取数据
        • Hadoop权威指南:FSDataInputStream对象
        • HDFS常用命令
        • Hadoop权威指南:HDFS-数据流
        • Hadoop权威指南:通过distcp并行复制
        • Hadoop权威指南:压缩
        • Hadoop权威指南:HDFS-Hadoop存档
        • 解决使用Idea/Eclipse编写Hadoop程序包依赖问题
        • HDFS
        • Hadoop-命令
        • 简单javaHadoop应用程序从打包到提交运行
        • Hadoop权威指南:HDFS-写入数据
        • Hadoop权威指南:HDFS-目录,查询文件系统,删除文件
        • HadoopInputFormat-OutputFormart
        • Hadoop-HDFS命令行接口
        • Hadoop权威指南-MapReduce应用开发
        • Linux下使用javac编译Hadoop程序
        • Hadoop权威指南:通过FileSystem API读取数据
        • Hadoop专有数据类型
      • Spark
        • Spark计算模型
        • Spark-入门二
        • 安装Hadoop及Spark(Ubuntu 16.04)
        • Spark:核心概念简介
        • Spark:控制日志输入
        • Spark - RDD编程
        • Spark工作机制
        • Spark-一个独立应用
        • Spark
        • Spark:使用Spark Shell的两个示例
    • Linux
      • Notes
        • BuildInCommand
          • ls
          • ip
          • ftp
          • 目录栈操作
          • scp
          • expect
            • expect示例
            • expect手册
            • expect笔记
          • ps
          • vsftpd
          • wget
          • 压缩程序
            • zip_unzip
            • tar
            • p7zip
          • 部署web服务
          • avidemux
          • cat
          • Awk
          • find
          • pssh使用
          • grep
          • sed
          • 路径
          • 通用命令
          • 安装JDK
          • 进程管理
          • network
          • rsync
          • cron
          • 示例
          • 用户管理
          • supervisor
        • Common
        • TestFileProcess
          • 替换文件内容
        • Commonds
        • Permissions
      • Ubuntu
        • Ubuntu 服务器配置部署
        • Ubuntu笔记
        • Ubuntu网络配置
        • Ubuntu 16.04 几个国内源
      • Script
        • ShellProgramming
        • ShellExamples
        • ShellCommands
      • CentOS
        • Centos笔记
        • 源
        • CentOS-Network-Config
        • CentOS-Security
      • Squid
        • BuildByDocker
        • Squid
      • Problem
        • 常见错误
      • Linux
        • Linux-c-cpp
        • Linux
        • Linux-NetworkProgramming
      • Codes
        • cron-test-01
      • Software
        • Shortcut key
        • Anaconda
      • Make
        • tricks
      • Deepin
        • 安装docker
      • SRE
        • CommonCommand
    • Cloud Computing
      • OpenStack
        • Fuel离线安装OpenStack
        • 验证网络
        • OpenStackNotes
    • Network
      • TCP/IP
      • 套接字
      • OSI模型
    • Data mining
      • StartUp
    • Machine Learning
      • Notes
        • 决策树学习-周志华
        • 神经网络-周志华
        • 概念学习和一般到特殊序
        • MachineLearningProblems
        • Math
          • 概率论与数理统计
          • 数学概念
          • KKT条件
          • 最优化问题
          • 优化算法
          • 最小二乘
        • 模型评估与选择
        • 引言
        • 过拟合处理
        • StatisticalLearningMethod
          • 统计学习方法概论
          • 感知机
        • 评估假设
        • Code
          • FeatureEngineering
            • Iris
        • 概念
        • SVM
        • FeatureEngineering
        • 神经网络
        • 决策树学习
        • MachineLearningKnowledge
        • 线性模型
        • 术语概念
        • 拉格朗日乘子法
      • route
      • Jupyter
        • JupyterUsage
      • Anaconda
        • AnacondaUsage
      • Coursera
        • Week01
      • ScikitLearn
        • FitTransform
        • Preprocessing
      • Octave
        • Octave
    • Search
      • Lucene
        • Api
        • Concepts
    • Virtualization Tech
      • Docker
        • dockerNetwork
        • Ubuntu
        • DockerUsage
        • Mac OS
    • Database
      • MySQL
        • Mysql Cluster
        • mysql-cluster
        • mysql
      • 部署phpmyadmin
      • SQL
      • SQL
        • SQLStatement
    • Concepts
      • Other
      • Mohout
      • LDA
    • Distributed System
      • Concepts
        • TODO
        • TODO
    • Recommend System
      • DataPipline
        • DataBus
        • 系统
    • OS
      • OS-Code
      • Notes
        • Introduce
        • ProcessManagement
        • Kernel
    • Deep Learning
      • Code
        • README
      • Notes
        • Conceptions
        • 神经网络
        • LeNet5
        • CNN
      • Tensorflow
        • Notes
          • Tensorflow
          • tensorflow开始
        • Anaconda
    • Media
      • FFmpeg
        • LiveStream
          • run
    • Spider
      • Selenium
        • Selenium
    • IoT
      • emq
        • Authentication
    • Big Data
      • Hadoop
        • MR 作业
  • Architecture
    • Storage
      • Mongodb
        • Mongodb
        • Failed to unlink socket file
      • Pegasus
        • Pegasus
        • ShellTools
      • Rocksdb
        • RocksJava
        • RocksDB
        • 本地缓存
      • Redis
        • Install
        • RedisUsage
      • 基本要素
      • HBase
        • HBase
    • MQ
      • Kafka
        • VersionCompare
        • Deploy
        • cppkafka
        • CommandLineTools
        • OffsetManage
        • Attentions
        • Notes
        • QA
    • Framework
      • Java
        • Dubbo
          • Annotation
          • 简介
        • Spring
          • SpringTest
          • 常见错误
          • TransactionRollback
          • FileUpload
          • SpringMVCNote
          • IoC
          • Start
          • Notes
            • Config
          • Spring
          • springmvc
          • Modules
        • Rose
          • Get request body
        • Netty
          • Netty
        • SpringBoot
          • SpringBoot
      • Esper
        • Documents
          • Keyed Segmented Context
          • 01 - Getting Started
          • 02 - Event Representations
          • 03 - Processing Modal
      • Swagger
        • Swagger
    • RPC
      • Thrift
        • Notes
          • Set fields
          • ThriftTyps
        • BuildFromSource
        • ThriftUsage
        • Install
      • ProtocolBuffer
        • 减少内存拷贝
        • UsageExample
        • Arena
        • ProtocolBuffer
    • Distribution
      • CAP
    • Streaming
      • Spark
        • Init
      • MapReduce
      • Spark
    • Nginx
      • Nginx knowledge
      • nginx-configurtion
      • 403
      • nginx解析php文件时502
    • Governance
      • Consul
      • Zookeeper
        • Zookeeper
      • MicroServiceArchitecture
      • 依赖
    • Conceptions
      • CloudNative
    • Kibana
      • Query
    • Performance Optimizaition
      • Notes
        • Conceptions
        • CPUAffinity
  • Math
    • Probability Theory
      • 一些概率分布
    • Statistics
      • 统计量与估计量
    • Other
      • 排列组合
  • Tools
    • Markdown
      • syntax
    • Jetbrains
      • Jetbrains
    • Zsh
      • Install
  • TODO
由 GitBook 提供支持
在本页
  • Introduction
  • Insert Stream
  • 示例代码
  • Insert and Remove Stream
  • 示例代码
  • Filters and Where-clauses
  • 示例代码
  • Time Windows
  • Time Windows
  • Time Batch
  • Batch windows
  • Aggregation and Grouping
  • Insert and Remove Stream
  • Terms(术语)

这有帮助吗?

  1. Architecture
  2. Framework
  3. Esper
  4. Documents

03 - Processing Modal

上一页02 - Event Representations下一页Swagger

最后更新于4年前

这有帮助吗?

Introduction

Esper的处理模型是持续的: 根据语句对事件流, 试图, 过滤器和输出率, 引擎在处理该语句事件的同时更新监听器(update listeners)和/或语句的订阅者会接收到更新数据.

监听器的接口是com.espertech.esper.client.UpdateListener.

Esper引擎通过把结果放到com.espertech.esper.client.EventBean实例中, 来更新监听器. 典型的监听器实现通过getter方法查询EventBean实例, 以获取语句生成的结果.

EventBean接口的get方法可以被用来通过名称(name)检索结果列. 提供给get方法的属性名(property name)可以查询对象的嵌套, 索引, 数组属性.

EventBean接口的getUnderlying方法允许更新监听器获取underlying event object.

Insert Stream

我们来看一下一个非常简单的EPL语句的输出. 该语句选择一个事件流, 不使用数据窗口(data windows), 也不用过滤器:

select * from Withdrawal

该语句选择所有的Withdrawal事件. 每当引擎处理Withdrawal类型的事件, 或者Withdrawal子类型的事件时, 都会调用所有的更新监听器, 把新事件交给每一个该语句的监听器.

术语插入流(inset stream)表示新事件到达并且进入了一个数据窗口或集合. 上述例子中的Insert Stream是到达的Withdrawal事件, 并且被当做新事件发送给监听器.

下面的图展示了1-6系列Withdrawal事件依次到达. 括号中的数字是Withdrawal数量, 一个事件属性, 用来在接下的例子中对事件进行过滤.

上面的例子中, Esper引擎只对语句的监听者推送了new events, 没有old events.

示例代码

EsperInsertStream.java

package com.bovenson.esper;

import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;

public class EsperInsertStream {

    public static void main(String args[]) {
        EPServiceProvider engine = EPServiceProviderManager.getDefaultProvider();

        engine.getEPAdministrator().getConfiguration().addEventType(EsperSimpleEventBean.class);

        String epl = "select * from " + EsperSimpleEventBean.class.getName();
        EPStatement statement = engine.getEPAdministrator().createEPL(epl);

        // 添加 update listener
        statement.addListener((newData, oldData) -> {
            if (newData != null) {
                String name = (String) newData[0].get("name");
                String data = (String) newData[0].get("data");
                System.out.println(String.format("newData info - Name: %s, Data: %s", name, data));
            } else {
                System.out.println("newData is null.");
            }

            if (oldData != null) {
                String name = (String) oldData[0].get("name");
                String data = (String) oldData[0].get("data");
                System.out.println(String.format("oldData info - Name: %s, Data: %s", name, data));
            } else {
                System.out.println("oldData is null.");
            }
        });

        // 发送事件
        engine.getEPRuntime().sendEvent(new EsperSimpleEventBean("Mike", "Good Bye."));
    }

    static class EsperSimpleEventBean {
        private String name;
        private String data;

        EsperSimpleEventBean(String name, String data) {
            this.name = name;
            this.data = data;
        }

        public String getName() {
            return name;
        }

        public String getData() {
            return data;
        }
    }
}

输出:

newData info - Name: Mike, Data: Good Bye.
oldData is null.

Insert and Remove Stream

长度窗口指示引擎仅保留流的最后N个事件。下面的语句对Withdrawal事件流用了长度窗口. 该声明用于说明数据窗口和进入离开数据窗口事件的概念.

select * from Withdrawal#length(5)

该语句中, 长度窗口是5个事件. Esper引擎把所有到达的Withdrawal事件放到这个长度窗口中. 当长度窗口被塞满之后, 最先到达的Withdrawal事件被推出该窗口. Esper引擎告知监听器: 所有进入长度窗口的事件为new events, 所有离开长度窗口的事件为old events.

术语insert stream表示新事件到达, 术语remove stream表示事件离开数据窗口或者更改集合数值. 在这个例子中, Withdrawal事件离开长度窗口, 并且该事件被当做old events推送给监听者.

下图阐述了当事件到达时, 长度窗口(length window)内容如何改变, 以及展示了事件被推送更新监听器.

像之前一样, 所有新到达的事件被当做new events推送给监听器. 另外, 当W6事件到达, W1事件离开长度窗口被当做old event推送给监听器.

和长度窗口类似, 时间窗口(time window)还将最近的事件保持到给定的时间段. 比如:

select rstream * from Withdrawal#time(30 sec)

该语句将最近30秒内的Withdrawal事件保存在事件窗口中. 随着时间流逝, 事件窗口的事件会被推出, 并被当做old events推送给监听者.

示例代码

package com.bovenson.esper;

import com.espertech.esper.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EsperInsertRemoveStreamLambda {
    private static final Logger log = LoggerFactory.getLogger(EsperInsertRemoveStreamLambda.class);

    public static void main(String args[]) {
        EPServiceProvider engine = EPServiceProviderManager.getDefaultProvider();

        engine.getEPAdministrator().getConfiguration().addEventType(Withdrawal.class);

        String epl = String.format("select irstream * from %s#length(4)", Withdrawal.class.getName());
        // String epl = "select * from Withdrawal.win:length(2)";
        EPStatement statement = engine.getEPAdministrator().createEPL(epl);

        // 添加 update listener
        statement.addListener((EventBean[] newEvents, EventBean[] oldEvents) -> {
            if (newEvents != null) {
                // System.out.println("Length:" + newEvents.length);
                String name = (String) newEvents[0].get("name");
                int amount = (int) newEvents[0].get("amount");
                System.out.println(String.format("newEvents info - Name: %s, Amount: %d", name, amount));
            } else {
                System.out.println("newEvents is null.");
            }

            if (oldEvents != null) {
                String name = (String) oldEvents[0].get("name");
                int amount = (int) oldEvents[0].get("amount");
                System.out.println(String.format("oldEvents info - Name: %s, Amount: %d", name, amount));
            } else {
                System.out.println("oldEvents is null.");
            }
        });

        // 发送事件
        EPRuntime runtime = engine.getEPRuntime();
        runtime.sendEvent(new Withdrawal("W1", 500));
        runtime.sendEvent(new Withdrawal("W2", 100));
        runtime.sendEvent(new Withdrawal("W3", 200));
        runtime.sendEvent(new Withdrawal("W4", 300));
        runtime.sendEvent(new Withdrawal("W5", 50));
        runtime.sendEvent(new Withdrawal("W6", 150));
    }
}

输出:

newEvents info - Name: W1, Amount: 500
oldEvents is null.
newEvents info - Name: W2, Amount: 100
oldEvents is null.
newEvents info - Name: W3, Amount: 200
oldEvents is null.
newEvents info - Name: W4, Amount: 300
oldEvents is null.
newEvents info - Name: W5, Amount: 50
oldEvents info - Name: W1, Amount: 500
newEvents info - Name: W6, Amount: 150
oldEvents info - Name: W2, Amount: 100

Filters and Where-clauses

事件流过滤器允许在事件进入数据窗口之前, 对给定的流进行过滤. 下面的语句是一个选择amount>=200的Withdrawal事件的过滤器.

select * from Withdrawal(amount >= 200)#length(5)

利用这个过滤器, 任何amount小于200的Withdrawal事件都不会进入长度窗口, 也因此不会推送给更新监听器.

示例代码

package com.bovenson.esper.example;

import com.bovenson.esper.Withdrawal;
import com.espertech.esper.client.*;

public class EsperFilterExample {
    public static void main(String args[]) {
        EPServiceProvider engine = EPServiceProviderManager.getDefaultProvider();

        engine.getEPAdministrator().getConfiguration().addEventType(Withdrawal.class);

        String epl = String.format("select irstream * from %s(amount >= 200)", Withdrawal.class.getName());
        EPStatement statement = engine.getEPAdministrator().createEPL(epl);

        // 添加 update listener
        statement.addListener((EventBean[] newEvents, EventBean[] oldEvents) -> {
            if (newEvents != null) {
                String name = (String) newEvents[0].get("name");
                int amount = (int) newEvents[0].get("amount");
                System.out.println(String.format("newEvents info - Name: %s, Amount: %d", name, amount));
            } else {
                System.out.println("newEvents is null.");
            }

            if (oldEvents != null) {
                String name = (String) oldEvents[0].get("name");
                int amount = (int) oldEvents[0].get("amount");
                System.out.println(String.format("oldEvents info - Name: %s, Amount: %d", name, amount));
            } else {
                System.out.println("oldEvents is null.");
            }
            System.out.println("**********************************");
        });

        // 发送事件
        EPRuntime runtime = engine.getEPRuntime();
        runtime.sendEvent(new Withdrawal("W1", 500));
        runtime.sendEvent(new Withdrawal("W2", 100));
        runtime.sendEvent(new Withdrawal("W3", 200));
        runtime.sendEvent(new Withdrawal("W4", 300));
        runtime.sendEvent(new Withdrawal("W5", 50));
        runtime.sendEvent(new Withdrawal("W6", 150));
    }
}

输出:

newEvents info - Name: W1, Amount: 500
oldEvents is null.
**********************************
newEvents info - Name: W3, Amount: 200
oldEvents is null.
**********************************
newEvents info - Name: W4, Amount: 300
oldEvents is null.
**********************************

EPL语句中的where子句和having子句, 在更晚的阶段对数据进行过滤, 在此之前, 事件已经到达了数据窗口或其他视图.

下面的语句是使用where子句的示例.

select * from WithDrawal#length(5) where amount >= 200

where子句同时适用于new events 和 old events. 新事件到达时(new events), 只有满足where子句, 事件才会被推送个更新监听器; 当事件(old events)离开数据窗口时, 只有满足where子句的事件, 才会推送给更新监听器.

Time Windows

Time Windows

事件窗口是一个移动的窗口, 基于系统时间扩展特定的时间间隔到刚刚过去的时间. 时间窗口允许我们限制事件的数量.

有这样一个需求: 确定最后4秒提款的每个帐户的平均提款量大于1000的所有帐户, 实现的语句可以这样写:

select account, avg(amount)
from Withdrawal#time(4 sec)
group by account
having amount > 1000

下面的图, 展示了select irstream * from Withdrawal#time(4 sec)的过程.

Time Batch

Time Batch view (批时间处理视图) 缓存事件, 并以一定的时间间隔释放在一个更新中.

简单语句 select * from Withdrawal#time_batch(4 sec) 图示如下:

Batch windows

批处理窗口.

对事件进行分组的内置数据窗口是win:time_batch和win:length_batch视图等。

win:time_batch数据窗口收集在给定时间间隔内到达的事件,并在时间间隔结束时将收集的事件作为批处理发送给侦听器。

win:length_batch数据窗口收集给定数量的事件,并在收集给定数量的事件时将收集的事件作为批处理发送给侦听器。

以包含时间窗口的语句: select account, amount from Withdrawal#time_batch(1 sec)为例: 该语句收集一秒钟内到达的事件, 在一秒结束时Esper引擎把收集到的事件作为new events(insert stream)推送给监听器, 同时把上个时间间隔收集到的事件作为old events(remove stream)推送给监听器.

对于包含聚合函数或者group by子句的语句, Esper引擎把每个分组统一的聚合结果推送给监听器. 例如:

select sum(amount) as mysum from Withdrawal#time_batch(1 sec)

Aggregation and Grouping

聚合及分组.

Insert and Remove Stream

Terms(术语)

  • length window: 长度窗口

  • insert stream: 插入流

  • update listener: 更新监听器

  • time window: 时间窗口

对于结果交付, Esper提供了另一个可供选择的, 强类型的, 高性能的, 原生的方法: 订阅者对象(Subscriber object). 订阅者对象是查询结果和一个Java对象的直接绑定. 这个Java对象是一个POJO, 通过方法调用来接受语句的查询结果. 该订阅者类不需要实现一个接口或者继承一个类. .

查看更多