更新時間:2019-10-16 來源:黑馬程序員 瀏覽量:
1、簡介
Flink CEP是在flink之上實現的復雜事件處理(CEP)庫,它允許我們在事件流中檢測事件的模式,讓我們有機會掌握數據中重要的事項。
本文章主要是介紹了flink cep中可用的api調用,首先介紹Pattern
API,它允許你指定要在事件流中檢測的模式,并介紹匹配事件并對其進行操作。最后分析下CEP庫在處理事件時間延遲問題。【推薦了解大數據培訓課程】
2、使用步驟
(1)首先我們需要引入cep的依賴
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep_2.11</artifactId> <version>1.5.0</version> </dependency>
(2)確定equals()和hashcode()方法
如果使用CEP,需要我們在datastream中的事件實現正確的equals()和hashcode()方法,因為Flink CEP使用他們來比較和匹配事件。
簡單demo代碼:
al input: DataStream[Event] = ... val pattern = Pattern.begin[Event]("start").where(_.getId == 42) .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0) .followedBy("end").where(_.getName == "end") val patternStream = CEP.pattern(input, pattern) val result: DataStream[Alert] = patternStream.select(createAlert(_))
3、Pattern API
Pattern API允許你定義要從輸入流中提取的復雜模式序列。
每個復雜模式序列都是由多個簡單模式組成,簡單模式就是尋找具有相同屬性的單個事件的模式,我們可以先定義一些簡單的模式,然后組合成復雜的序列模式。
可以將模式序列視為此類模式的結構圖,基于用戶指定的條件從一個模式轉換到下一個模式,例如:
event.getName().equals(“start”).
匹配的是一系列輸入事件,通過一系列有效的模式轉換訪問復雜模式圖中的所有模式。注意每個模式必須具有唯一的名稱,以便后續可以使用該名稱來標識匹配的事件。模式名稱中不能包含字符”:”。
下面我們首先介紹如何定義單個模式,然后再將各個模式組合到復雜模式中。
單個模式
Pattern可以是單個,也可以是循環模式,單個模式接收單個事件,而循環模式可以接收多個事件,在模式匹配符號中,模式“a b + c?d”(或“a”,后跟一個或多個“b”,可選地后跟“c”,后跟“d”),a,c ?,和d是單例模式,而b +是循環模式。
默認情況下,模式是單個模式,可以使用Quantifiers將其轉換為循環模式。每個模式可以有一個或多個條件,基于它接收的事件。
Quantifiers
在FlinkCEP中,可以使用以下方法指定循環模式:pattern.oneOrMore(),用于期望一個或多個事件發生的模式(例如之前提到的b+);用于期望給定類型事件的特定出現次數的模式,對于名為start的模式,以下是有效的Quantifiers:
// expecting 4 occurrences start.times(4); // expecting 0 or 4 occurrences start.times(4).optional(); // expecting 2, 3 or 4 occurrences start.times(2, 4); // expecting 2, 3 or 4 occurrences and repeating as many as possible start.times(2, 4).greedy(); // expecting 0, 2, 3 or 4 occurrences start.times(2, 4).optional(); // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible start.times(2, 4).optional().greedy(); // expecting 1 or more occurrences start.oneOrMore(); // expecting 1 or more occurrences and repeating as many as possible start.oneOrMore().greedy(); // expecting 0 or more occurrences start.oneOrMore().optional(); // expecting 0 or more occurrences and repeating as many as possible start.oneOrMore().optional().greedy(); // expecting 2 or more occurrences start.timesOrMore(2); // expecting 2 or more occurrences and repeating as many as possible start.timesOrMore(2).greedy(); // expecting 0, 2 or more occurrences and repeating as many as possible start.timesOrMore(2).optional().greedy();
Conditions條件
每個模式中,從一個模式轉到下一個模式,可以指定其他條件,我們可以使用下面這些條件:
1.傳入事件的屬性,例如其值應大于5,或者大于先前接收的事件的平均值;
2.匹配事件的連續性,例如檢測模式a,b,c序列中不能有任何非匹配事件。
Conditions on Properties關于屬性的條件
可以通過pattern.where(),pattern.or()或pattern.until()方法指定事件屬性的條件,條件可以是iterativeConditions或SimpleConditions。
1.迭代條件
這是最常見的條件類型,你可以指定一個條件,該條件基于先前接收的事件的屬性或器子集的統計信息來接收后續事件。
下面代碼說的是:如果名稱以”foo”開頭同時如果該模式的先前接收的事件的價格總和加上當前事件的價格不超過該值5.0,則迭代條件接收名為”middle”的模式的下一個事件:迭代條件可以很強大,尤其是與循環模式相結合,例如:oneOrMore();
middle.oneOrMore() .subtype(classOf[SubEvent]) .where( (value, ctx) => { lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum value.getName.startsWith("foo") && sum + value.getPrice < 5.0 } )
注意對context.getEventsForPattern()的調用將為給定潛在匹配項查找所有先前接收的事件,此操作代價可能會變化巨大,因此應盡量減少其使用。
2.簡單條件
這種類型的條件時擴展了前面提到的IterativeCondition類,并且僅根據事件本身的屬性決定是否接收事件:
start.where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) { return value.getName().startsWith("foo"); }});
此外還可以通過pattern.subtype(subclass)方法將接收事件的類型限定為初始事件類型的子類型:
start.where(event => event.getName.startsWith("foo"))
組合條件:
如上所示,可以將子類型條件與其他條件組合使用,這適用于所有條件。我們可以通過順序調用where()來任意組合條件。最終結果將是各個條件的結果的邏輯and,要使用or組合條件,可以使用or()方法,如下所示:
pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)
停止條件
在循環模式(oneOrMore()和oneOrMore().optional())的情況下,還可以指定停止條件,例如:接收值大于5的事件,直到其值的總和小于50。
我們看個例子來更好的理解:
給定模式:(a+ until b),b之前,要出現一個或者多個a,
給定輸入的序列:a1,c,a2,b,a3
輸出結果:{a1 a2}{a1}{a2}{a3}
我們可以看到{a1,a2,a3},{a2,a3}兩個并沒有輸出,這就是停止條件的作用。
連續事件的條件
Flink CEP支持事件之間以一下形式連續:
嚴格連續性:希望所有匹配事件一個接一個的出現,中間沒有任何不匹配的事件;
寬松連續性:忽略匹配的事件之間出現不匹配事件,不能忽略兩個事件之間的匹配事件。
非確定性輕松連續性:進一步放寬連續性,允許忽略某些匹配事件的其它匹配。
為了解釋上面的內容,我們舉個例子。假如有個模式序列"a+ b",輸入序列"a1,c,a2,b",不同連續條件下有不同的區別:
嚴格連續性:{a2 b} - 由于c的存在導致a1被廢棄
寬松連續性:{a1,b}和{a1 a2 b} - c被忽略
非確定性寬松連續性:{a1 b}, {a2 b}, 和 {a1 a2 b}
對于循環模式(例如oneOrMore()和times()),默認是寬松的連續性。 如果你想要嚴格的連續性,你必須使用consecutive()顯式指定它, 如果你想要非確定性的松弛連續性,你可以使用allowCombinations()方法。
組合模式
簡介
已經了解了單個模式的樣子,現在是時候看看如何將它們組合成一個完整的模式序列。
模式序列必須以初始模式開始,如下所示:
Patternstart = Pattern.begin("start");
接下來,您可以通過指定它們之間所需的連續條件,為模式序列添加更多模式。
在上一節中,我們描述了Flink支持的不同鄰接模式,即嚴格,寬松和非確定性寬松,以及如何在循環模式中應用它們。 要在連續模式之間應用它們,可以使用:
next() 對應嚴格, followedBy() 對應寬松連續性 followedByAny() 對應非確定性寬松連續性亦或
notNext() 如果不希望一個事件類型緊接著另一個類型出現。 notFollowedBy() 不希望兩個事件之間任何地方出現該事件。 注意
模式序列不能以notFollowedBy()結束。 注意 NOT模式前面不能有可選模式。
// strict contiguity Pattern<Event, ?> strict = start.next("middle").where(...); // relaxed contiguity Pattern<Event, ?> relaxed = start.followedBy("middle").where(...); // non-deterministic relaxed contiguity Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...); // NOT pattern with strict contiguity Pattern<Event, ?> strictNot = start.notNext("not").where(...); // NOT pattern with relaxed contiguity Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);
寬松連續性指的是僅第一個成功匹配的事件會被匹配到,然而非確定性寬松連續性,相同的開始會有多個匹配結果發出。距離,如果一個模式是"a b",給定輸入序列是"a c b1 b2"。對于不同連續性會有不同輸出。
a和b之間嚴格連續性,將會返回{},也即是沒有匹配。因為c的出現導致a,拋棄了。
a和b之間寬松連續性,返回的是{a,b1},因為寬松連續性將會拋棄為匹配成功的元素,直至匹配到下一個要匹配的事件。
a和b之間非確定性寬松連續性,返回的是{a,b1},{a,b2}。
也可以為模式定義時間約束。 例如,可以通過pattern.within()方法定義模式應在10秒內發生。 時間模式支持處理時間和事件時間。 注意模式序列只能有一個時間約束。 如果在不同的單獨模式上定義了多個這樣的約束,則應用最小的約束。
next.within(Time.seconds(10));
可以為begin,followBy,followByAny和next定義一個模式序列作為條件。模式序列將被邏輯地視為匹配條件,而且將返回GroupPattern并且 可對GroupPattern使用oneOrMore(),times(#ofTimes),times(#fromTimes,#toTimes),optional(),consecutive(), allowCombinations()等方法。