用Java開發IBM Streams應用
InfoSphere Streams 概述
是一個高級計算平臺,幫助用戶開發的應用程序快速攝取、分析和關聯來自數千個實時源的信息。該解決方案可處理非常高的數據吞吐率,最高可達每秒數百萬個事件或消息。該平臺支持流數據的實時處理,支持不斷更新持續查詢的結果,可在仍在移動的數據流中檢測洞察。Streams旨在從一個幾分鐘到幾小時的窗口中的移動信息(數據流)中揭示有意義的模式。該平臺能夠獲取低延遲洞察,并為注重時效的應用程序(比如欺詐檢測或網絡管理)獲取更好的成果,從而提供業務價值。流處理的演示如下圖所示:
Streams 的主要設計目的是:
- 快速響應事件和不斷變化的業務條件與需求。
- 支持以比現有系統快幾個數量級的數據處理速度對數據執行持續分析。
- 快速適應不斷變化的數據形式和類型。
- 管理流模式的高可用性、異構性和分布。
- 為共享的信息提供安全性和信息機密性。
提供了一種編程模型和 IDE 來定義數據來源,還提供了已融合到處理執行單元中的稱為運算符的軟件分析模塊。它還提供了基礎架構來支持從這些組件合成可擴展的流處理應用程序。主要平臺組件包括:
- 運行時環境:這包括平臺服務,以及一個用于在單個主機或一組集成的主機上部署和監視 Streams 應用程序的調度程序。
- 編程模型:您可使用 SPL(Streams Processing Language,流處理語言,一種聲明性語言)來編寫 Streams 應用程序。可使用該語言陳述您的需求,運行時環境會承擔確定如何最佳地服務該請求的責任。在此模型中,一個 Streams 應用程序表示為一個由運算符和連接它們的流組成的圖表。
- 監視工具和管理接口:Streams 應用程序處理數據的速度比普通的操作系統監視實用程序快得多。InfoSphere Streams 提供了可處理此環境的工具。
Streams原生編程語言--SPL
Streams Processing Language (SPL),Streams 的編程語言,是一種分布式數據流合成語言。它是一種類似 C++ 或 Java™ 的可擴展且全功能的語言,支持用戶定義的數據類型。您可以使用 SPL 或原生語言(C++ 或Java)編寫自定義函數。也可以使用 C++ 或 Java 編寫用戶定義的運算符。
Streams 通過SPL將應用程序會描述一個導向圖,該圖由各個互聯且處理多個數據流的運算符組成。數據流可來自系統外部,或者在應用程序內部生成。SPL 程序的基本構建塊包括:
- 流:一個無限的結構化元組序列。它可逐個元組地由運算符使用或通過一個窗口的定義來使用。
- 元組:屬性及其類型的一個結構化列表。流上的每個元組擁有由其流類型指定的形式。
- 流類型:指定元組中每個屬性的名稱和數據類型。
- 窗口:一個有限、有序的元組分組。它可以基于計數、時間、屬性值或標點符號。
- 運算符:SPL 的基礎構建塊,它的運算符會處理來自流的數據并可生成新流。
- 處理元素 (PE):基礎執行單元。一個 PE 可封裝單個運算符或多個合并的運算符。
- 作業:一個已部署好的用來執行的 Streams 應用程序。它由一個或多個 PE 組成。除了一組 PE 之外,SPL 編譯器還會生成一個 ADL(Application Description Language,應用程序描述語言)文件來描述應用程序的結構。該 ADL 文件包含每個 PE 的詳細信息,比如要加載和執行哪個二進制文件,調度限制、流格式和一個內部運算符數據流圖。
Streams編程語言的另一選擇--Java
Java 作為面向對象的高級編程語言,以其使用簡單、完全面象對象、平臺可移植性、健壯的沙盒安全機制、動態性,以及大量可用的開發包等一系列優勢,在互聯網分布式環境下得到了極其廣泛的應用,具有廣泛的用戶基礎。為了Streams用戶重用已有的Java開發技能、保護已有的Java資產,IBM Streams平臺提供了使用 Java 編程語言來構建 Streams 應用程序的框架,具體包括 Java 運算符模型描述文件以及 Java 運算符 API(JavaOp)兩種方式。這兩種方式在一定程度上讓開發人員集成Java功能模塊。
streamsx.topology項目
雖然Streams所提供的Java運算符模型描述文件以及Java運算符API(JavaOp)方式支持了Java代碼調用,但是,傳統的Java是面向對象的編程語言,它只能幫助開發人員實現業務邏輯或重用Java代碼,但它無法以“流處理”的思維,直接進行類似SPL的流應用開發。
streamsx.topology開源項目的出現,豐富了Streams的開發方式,為流應用的開發者提供更多的語言選擇。streamsx.topology項目提供Java Application API,面向流處理應用的將Java封裝成一套類庫,使得開發者完全使用Java和Scala語言并按照“流處理”的思維創建IBM Streams流處理應用。
streamsx.topology開源項目參考網址:
//ibmstreams.github.io/streamsx.topology/
運行streamx.topology的的sample程序
1. 從www.ibm.com/software/data/infosphere/stream-computing/trials.html下載“IBM InfoSphere Streams 4.0 Java API BetaQuickStart VM Image”。Streams Quick StartEdition 是 InfoSphere Streams 的一個免費的、可下載的非生產版本,它沒有數據或時間限制,支持您在自己的獨特環境中試驗流計算,構建一個強大的分析平臺。該平臺能夠處理難以置信的高數據吞吐量,高達每秒數百萬個事件或消息。InfoSphere Streams QuickStart Edition 沒有提供支持選項,僅適用于非生產用途。要獲得相應的支持,請購買 InfoSphereStreams。
2.解壓VM鏡像,并在VMPlayer啟動VM。
該VM已經安裝com.ibm.streamsx.topology工具箱,工具箱位于/home/streamsadmin/streamx.topology/streamsx.topology,包含:
- com.ibm.streamsx.topology- 拓撲工具箱,讓您能采用Java開發您的Streams應用
- samples- 演示JavaApplication API的示例集合。
- 運行示例應用
1)在桌面雙擊InfoSphere Streams Studio (Eclipse)圖標啟動Streams Studio.
2)指定workspace為:/home/streamsadmin/Workspaces/topology/
3) 運行"Hello World" 示例程序:在Project Explorer標簽, 打開src->simple->HelloWorld->HelloWorld.java,代碼如下:
package simple; import com.ibm.streamsx.topology.TStream; import com.ibm.streamsx.topology.Topology; import com.ibm.streamsx.topology.context.StreamsContextFactory; publicclass HelloWorld { publicstaticvoid main(String[] args) throws Exception { /* * Create the container for the topology that will * hold the streams of tuples. */ Topology topology = new Topology("HelloWorld"); /* * Declare a source stream (hw) with String tuples containing two tuples, * "Hello" and "World!". */ TStream<String> hw = topology.strings("Hello", "World!"); /* * Sink hw by printing each of its tuples to System.out. */ hw.print(); if (args.length == 0) StreamsContextFactory.getEmbedded().submit(topology).get(); else StreamsContextFactory.getStreamsContext(args[0]).submit(topology) .get(); } } |
4) 運行"Hello World" 示例程序:右擊HelloWorld.java,選擇Run As-> Run Configurations. 在Run Configurations 'Main' 標簽頁面,確保Main class填 simple.HelloWorld. 在 arguments標簽頁面, 設置Program arguments為EMBEDDED (EMBEDDED表示程序獨立編譯并嵌入到JVM運行,而不依賴Streams運行時環境)。
5) 設置必要參數后,運行該應用您會看到以下的輸出:
Hello
world!
使用Java Application API開發Streams應用
我們創建一個名叫MyGrep的Sample應用,用于指導關鍵字搜索某個文件夾下的文件,搜索到則顯示相應內容所在的行數和內容。具體步驟如下:
1)創建Java項目: File->New->Project->Java->JavaProject,點擊Next,在Create a Java Project填寫MySamples,點擊Next。
2)在Libraies標簽頁:
點擊External Jar按鈕,選擇com.ibm.streams.topology.jar
點擊Add Library按鈕,選擇IBM InfoSphere Streams
點擊Next和Finish完成項目的創建。新創建項目視圖如下圖所示:
3)創建命名空間:右擊src->New->Package->JavaPackage的Name填寫:mysapce
4)創建Java主類:src->右擊myspace->New->Class,在Name填寫:mysapce,確保勾選“public static void main(String[]args)”。確定后生成MyGrep.java。
5)創建Java類:src->右擊myspace->New->Class,在Name填寫:GrepInfo,不要勾選“public static void main(String[]args)”,確定后生成GrepInfo.java。
6)MyGrep.java和GrepInfo.java的代碼內容如下:
MyGrep.java
package myspace; import java.io.ObjectStreamException; import java.util.Arrays; import java.util.concurrent.Future; import com.ibm.streamsx.topology.TStream; import com.ibm.streamsx.topology.Topology; import com.ibm.streamsx.topology.context.StreamsContextFactory; import com.ibm.streamsx.topology.file.FileStreams; import com.ibm.streamsx.topology.function7.Function; publicclass MyGrep { publicstaticvoid main(String[] args) throws Exception { String contextType = args[0]; String directory = args[1]; final String term = args[2]; Topology topology = new Topology("MyGrep"); TStream<String> filePaths = FileStreams.directoryWatcher(topology, directory); TStream<String> lines = FileStreams.textFileReader(filePaths); TStream<GrepInfo> grepInfo = lines.multiTransform( new Function<String, Iterable<GrepInfo>>() { privatestaticfinallongserialVersionUID = 1L; privateintlineNum = 0; @Override public Iterable<GrepInfo> apply(String line) { ++lineNum; if(line.contains(term)){ return Arrays.asList(new GrepInfo(lineNum, line)); } else returnnull; } private Object readResolve() throws ObjectStreamException { returnthis; } }, GrepInfo.class); grepInfo.print(); Future<?> future = StreamsContextFactory.getStreamsContext(contextType) .submit(topology); Thread.sleep(30 * 1000); future.cancel(true); } } |
GrepInfo.java
package myspace; import java.io.Serializable; import com.ibm.streamsx.topology.tuple.Keyable; publicclass GrepInfo implements Keyable<GrepInfo>, Serializable { privatestaticfinallongserialVersionUID = 1L; intlineNum; String lineStr; public GrepInfo(int ln, String ls) { this.lineNum = ln; this.lineStr = ls; } @Override public String toString() { return"Line Num " + lineNum + " : " + lineStr; } @Override public GrepInfo getKey() { // TODO Auto-generated method stub returnnull; } } |
7)運行MyGrep之前,請確保Streams Instance已經啟動,并在/home/streamsadmin/test創建一個文本文件并寫如若干內容。
8)運行程序:右擊MyGrep.java,選擇Run As -> RunConfigurations. 在Run Configurations 'Main' 標簽頁面,確保Project填寫MySamples和Main class填 myspace.MyGrep。
在 arguments標簽頁面, 設置Program arguments為DISTRIBUTED /home/streamsadmin/test China (DISTRIBUTED 表示程序部署到Streams運行時環境,/home/streamsadmin/test是程序搜索關鍵的目錄;China是搜索關鍵字)。
9)查看結果:
在Streams Exploere -> StreamsInstances ->右擊default:<instance>@<Domain>,選擇Show Instance Graph
在Instance Graph窗口,我們能看到MyGrep最終運行圖。右擊最后的Print PE->Show Log->Show PEConsole
在Console將會顯現MyGrep運行的結果
總結
streams.topology開源項目所提供的Java Application API使得Streams開發者對流應用的編程語言有了新的選擇,它能幫助開發者重用Java編程能力,并按照“流處理”的思路簡化流應用的開發過程,讓開發者更專注于業務的處理邏輯而不是流處理的框架。然而,該項目還處于早期階段,很多功能和接口尚未實現;對比成熟的、完善的SPL,Java Application API的功能和成熟性還有很大差距。相信在不久的將來,streams.topology將會逐漸完善并成為IBM Streams平臺的一個重要補充。
詳情請咨詢!
客服熱線:023-66090381