F#與ASP.NET:使用F#實現基于事件的異步模式
在之前的文章F#與ASP.NET:基于事件的異步模式與異步Action中,我們的簡單討論了.NET中兩種異步模型以及它們在異常處理上的區別,并且簡單觀察了ASP.NET MVC 2中異步Action的編寫方式。從中我們得知,ASP.NET MVC 2的異步Action并非使用了傳統基于Begin/End的異步編程模型,而是另一種基于事件的異步模式。
此外,ASP.NET MVC 2對于這種異步模式提供了必要的支持,使此方面的程序設計變得相對簡單一些。但是,簡單的原因主要還是在于已經由其他組件提供了良好的,基于事件的異步模式。那么現在我們就來看看一般我們應該如何來實現這樣的功能,以及F#是如何美化我們的生活的吧。
異步數據傳輸
我們為什么要異步,主要目的之一還是提高I/O操作的伸縮性。I/O操作主要是I/O設備的事情,這些設備在好好工作的時候,是不需要系統花心思進行照看的,它們只要能夠在完成指定工作后通知系統就可以了。這也是異步I/O高效的原理,因為它將程序從等待I/O完成的苦悶中解脫出來,這對用戶或是系統來說都是一件絕好的事情。
那么說到I/O操作,最典型的場景之一便是數據傳輸了。比如有兩個數據流streamIn和streamOut,我們需要異步地從streamIn中讀取數據,并異步地寫入到streamOut中。在這個過程中,我們使用一個相對較小的byte數組作為緩存空間,這樣程序在進行數據傳輸時便不會占用太多內存。
那么,如果現在需要您編寫一個組件完成這樣的數據傳輸工作,并使用標準的基于事件的異步模式釋放出來,您會怎么做?基于事件的異步模式,要求在任務完成時使用事件進行提示。同時在出錯的時候將異常對象保存在事件的參數中。現在我已經幫您寫好了這樣的事件參數:
- public class CompletedEventArgs : EventArgs
- {
- public CompletedEventArgs(Exception ex)
- {
- this.Error = ex;
- }
- public Exception Error { get; private set; }
- }
那么接下來的工作就交給您了,以下代碼僅供參考:
- public class AsyncTransfer
- {
- private Stream m_streamIn;
- private Stream m_streamOut;
- public AsyncTransfer(Stream streamIn, Stream streamOut)
- {
- this.m_streamIn = streamIn;
- this.m_streamOut = streamOut;
- }
- public void StartAsync()
- {
- byte[] buffer = new byte[1024];
- this.m_streamIn.BeginRead(
- buffer, 0, buffer.Length,
- this.EndReadInputStreamCallback, buffer);
- }
- private void EndReadInputStreamCallback(IAsyncResult ar)
- {
- var buffer = (byte[])ar.AsyncState;
- int lengthRead;
- try
- {
- lengthRead = this.m_streamIn.EndRead(ar);
- }
- catch (Exception ex)
- {
- this.OnCompleted(ex);
- return;
- }
- if (lengthRead <= 0)
- {
- this.OnCompleted(null);
- }
- else
- {
- try
- {
- this.m_streamOut.BeginWrite(
- buffer, 0, lengthRead,
- this.EndWriteOutputStreamCallback, buffer);
- }
- catch (Exception ex)
- {
- this.OnCompleted(ex);
- }
- }
- }
- private void EndWriteOutputStreamCallback(IAsyncResult ar)
- {
- try
- {
- this.m_streamOut.EndWrite(ar);
- var buffer = (byte[])ar.AsyncState;
- this.m_streamIn.BeginRead(
- buffer, 0, buffer.Length,
- this.EndReadInputStreamCallback, buffer);
- }
- catch (Exception ex)
- {
- this.OnCompleted(ex);
- }
- }
- private void OnCompleted(Exception ex)
- {
- var handler = this.Completed;
- if (handler != null)
- {
- handler(this, new CompletedEventArgs(ex));
- }
- }
- public event EventHandler<CompletedEventArgs> Completed;
- }
是不是很復雜的樣子?編寫異步程序,基本則意味著要將原本同步的調用拆成兩段:發起及回調,這樣便讓上下文狀態的保存便的困難起來。幸運的是,C#這門語言提供了方便好用的匿名函數語法,這對于編寫一個回調函數來說已經非常容易了。但是,如果需要真正寫一個穩定、安全的異步程序,需要做的事情還有很多。
例如,一次異步操作結束之后會執行一個回調函數,那么如果在這個回調函數中拋出了一個異常那該怎么辦?如果不正確處理這個異常,輕則造成資源泄露,重則造成進程退出。因此在每個回調函數中,您會發現try...catch塊是必不可少的——甚至還需要兩段。
更復雜的可能還是在于邏輯控制上。這樣一個數據傳輸操作很顯然需要循環——讀一段,寫一段。但是由于需要編寫成二段式的異步調用,因此程序的邏輯會被拆得七零八落,我們沒法使用一個while塊包圍整段邏輯,編寫一個異步程序本來就是那么復雜。 #p#
編寫簡單的代理
現在我們已經有了一個異步傳輸數據的組件,就用它來做一些有趣的事情吧。例如,我們可以在ASP.NET應用程序中建立一個簡單的代理,即給定一個URL,在服務器端發起這樣一個請求,并將這個URL的數據傳輸到客戶端來。簡單起見,除了進行數據傳輸之外,我們只需要簡單地輸出Content Type頭信息即可。以下代碼僅供參考:
- public class AsyncWebTransfer
- {
- private WebRequest m_request;
- private WebResponse m_response;
- private HttpContextBase m_context;
- private string m_url;
- public AsyncWebTransfer(HttpContextBase context, string url)
- {
- this.m_context = context;
- this.m_url = url;
- }
- public void StartAsync()
- {
- this.m_request = WebRequest.Create(this.m_url);
- this.m_request.BeginGetResponse(this.EndGetResponseCallback, null);
- }
- private void EndGetResponseCallback(IAsyncResult ar)
- {
- try
- {
- thisthis.m_response = this.m_request.EndGetResponse(ar);
- thisthis.m_context.Response.ContentType = this.m_response.ContentType;
- var streamIn = this.m_response.GetResponseStream();
- var streamOut = this.m_context.Response.OutputStream;
- var transfer = new AsyncTransfer(streamIn, streamOut);
- transfer.Completed += (sender, args) => this.OnCompleted(args.Error);
- transfer.StartAsync();
- }
- catch(Exception ex)
- {
- this.OnCompleted(ex);
- }
- }
- private void OnCompleted(Exception ex)
- {
- if (this.m_response != null)
- {
- this.m_response.Close();
- this.m_response = null;
- }
- var handler = this.Completed;
- if (handler != null)
- {
- handler(this, new CompletedEventArgs(ex));
- }
- }
- public event EventHandler<CompletedEventArgs> Completed;
- }
如果說之前的AsyncTransfer類是基于“Begin/End異步編程模型”實現的基于事件的異步模式,那么AsyncWebTransfer便是基于“基于事件的異步模式”實現的基于事件的異步模式了。嗯,似乎有點繞口,不過我相信這段代碼對您來說還是不難理解的。
使用F#完成異步工作
事實上我已經很久沒有寫過這樣的代碼了,咎其原因還是被F#給寵壞了。嘗試了C# 2.0之后我便拋棄了Java語言,熟悉了C# 3.0之后我用C# 2.0就快寫不了程序了,而使用了F#進行異步編程之后,我就再也沒有使用C#寫過異步操作了。那么我們就來看看F#是如何進行異步數據傳輸的吧:
- let rec transferAsync (streamIn: Stream) (streamOut: Stream) buffer =
- async {
- let! lengthRead = streamIn.AsyncRead(buffer, 0, buffer.Length)
- if lengthRead > 0 then
- do! streamOut.AsyncWrite(buffer, 0, lengthRead)
- do! transferAsync streamIn streamOut buffer
- }
上面的代碼利用了尾遞歸進行不斷地數據傳輸,我們也可以使用傳統的while循環來實現這個功能:
- let transferImperativelyAsync (streamIn: Stream) (streamOut: Stream) buffer =
- async {
- let hasData = ref true
- while (hasData.Value) do
- let! lengthRead = streamIn.AsyncRead(buffer, 0, buffer.Length)
- if lengthRead > 0 then
- do! streamOut.AsyncWrite(buffer, 0, lengthRead)
- else
- hasData := false
- }
有了transferAsync函數,編寫一個資源請求的代理也是幾分鐘的事情:
- let webTransferAsync (context: HttpContextBase) (url: string) =
- async {
- let request = WebRequest.Create(url)
- use! response = request.GetResponseAsync()
- context.Response.ContentType <- response.ContentType
- let streamIn = response.GetResponseStream()
- let streamOut = context.Response.OutputStream
- let buffer = Array.zeroCreate 1024
- do! transferAsync streamIn streamOut buffer
- }
沒錯,就是這么簡單,這就是F#中編寫異步任務方式。在執行這兩個函數時(當然確切地說,是執行這兩個函數所生成的異步工作流),便會在出現“感嘆號”的操作之處自動分成二段式的異步調用,但是在程序的寫法上和同步代碼可謂毫無二致。 #p#
使用F#實現基于事件的異步模式
當然,光有上面的代碼還不夠,因為這樣的代碼無法交給C#代碼來使用,我們還需要將它們封裝成基于事件的異步模式。不過這也非常簡單,使用一個通用的抽象基類即可:
- [<AbstractClass>]
- type AsyncWorker(asyncWork: Async) =
- let completed = new Event()
- [<CLIEvent>]
- member e.Completed = completed.Publish
- member e.StartAsync() =
- Async.StartWithContinuations
- (asyncWork,
- (fun _ -> completed.Trigger(new CompletedEventArgs(null))),
- (fun ex -> completed.Trigger(new CompletedEventArgs(ex))),
- (fun ex -> ex |> ignore))
在使用F#進行面向對象開發時,由于不需要C#的架子代碼,它實現相同的結構一般都會顯得緊湊不少(不過在我看來,C#在進行一般的命令式編程時還是比F#來的方便一些)。在StartAsync方法中,我們使用Async.StartWithContinuations發起一個異步工作流,而這個異步工作流便是從構造函數中傳入的具體任務。StartWithContinuations方法的后三個參數分別是成功時的回調,失敗后的回調,以及任務取消后的回調。您可能會說,難道F#中不需要異常處理,不需要資源釋放嗎?當然需要。只不過:
1) 異常處理已經由StartWithContinuations統一完成了,我們只要按照“同步式”代碼的寫法編寫邏輯,也就是說,從語義上說您可以看作存在一個巨大的try...catch圍繞著整段代碼。
2) 而對于資源釋放來說,您可以發現在webTransferAsync方法中有一個use!指令,這便是告訴F#的異步框架,在整個異步工作流結束之后需要調用這個資源的Dispose方法——沒錯,您可以把它看作是一種能在異步環境下工作的C# using關鍵字。有了AsyncWorker類之后,AsyncTransfer和WebAsyncTransfer類也可輕易實現了:
- type AsyncTransfer(streamIn: Stream, streamOut: Stream) =
- inherit AsyncWorker(
- Transfer.transferAsync streamIn streamOut (Array.zeroCreate 1024))
- type AsyncWebTransfer(context: HttpContextBase, url: string) =
- inherit AsyncWorker(Transfer.webTransferAsync context url)最后,只要在ASP.NET MVC中使用即可:
- public void LoadFsAsync(string url)
- {
- AsyncManager.OutstandingOperations.Increment();
- var transfer = new FSharpAsync.AsyncWebTransfer(HttpContext, url);
- transfer.Completed += (sender, args) =>
- AsyncManager.OutstandingOperations.Decrement();
- transfer.StartAsync();
- }
- public ActionResult LoadFsCompleted()
- {
- return new EmptyResult();
- }
事實上,在ImageController中我還提供了一個LoadAsync及對應的LoadCompleted方法,它們使用的是利用C#實現的AsyncWebTransfer類。猜猜看這樣的代碼長成什么樣?其實只是將上面的AsyncWebTransfer的命名空間改成CSharpAsync而已——F#與其它.NET代碼是真正做到無縫集成的。
總結
這便是F#的偉大之處。時常有朋友會問我為什么對F#有那么大的興趣,我想,如果借助F#可以用十分之一的時間,十分之一的代碼行數,寫出執行效果相同,但可維護性高出好幾倍的程序來。
【編輯推薦】

















