/// <summary> /// Streaming APIを呼び出します。 /// </summary> /// <typeparam name="T">APIからの戻り値の型</typeparam> /// <param name="apiMethod">呼び出すAPI</param> /// <param name="token">OAuthトークン</param> /// <returns>APIからのレスポンスをpush通知する<see cref="IObservable{T}"/></returns> public static IObservable<T> CallApi<T>(this IStreamingApi<T> apiMethod, Token token) { return Observable.Create<T>(observer => { HttpWebRequest request = null; Scheduler.TaskPool.Schedule(() => { try { request = RequestGenerator.GenerateTwitterApiRequest( apiMethod.RequestUri, HttpMethodType.GET, token, DefaultSetting.Proxy, DefaultSetting.Timeout, DefaultSetting.UserAgent, null, apiMethod.Parameters, null, null ); using (var res = request.GetResponse())//あとでレスポンスも扱うかもしれないので一応定義 using (var sr = new StreamReader(res.GetResponseStream())) { while (!sr.EndOfStream) { var line = sr.ReadLine(); if (!string.IsNullOrWhiteSpace(line)) observer.OnNext(apiMethod.Parse(line)); } observer.OnCompleted(); } } catch (WebException ex) { if (ex.Status != WebExceptionStatus.RequestCanceled) { observer.OnError(ex); } } catch (Exception ex) { observer.OnError(ex); } }); return () => { if (request != null) request.Abort(); }; }); }