Streaming APIを実行する部分

  • タグ:
  • タグはありません
/// <summary>
/// Streaming APIを呼び出します。
/// </summary>
/// <typeparam name="T">APIからの戻り値の型</typeparam>
/// <param name="apiMethod">呼び出すAPI</param>
/// <param name="token">OAuthトークン</param>
/// <returns>APIからのレスポンスをpush通知する<see cref="IObservable{T}"/></returns>
public static IObservable<T> CallApi<T>(this IStreamingApi<T> apiMethod, Token token)
{
    return Observable.Create<T>(observer =>
    {
        HttpWebRequest request = null;

        Scheduler.TaskPool.Schedule(() =>
        {
            try
            {
                request = RequestGenerator.GenerateTwitterApiRequest(
                    apiMethod.RequestUri,
                    HttpMethodType.GET,
                    token,
                    DefaultSetting.Proxy,
                    DefaultSetting.Timeout,
                    DefaultSetting.UserAgent,
                    null,
                    apiMethod.Parameters,
                    null,
                    null
                );

                using (var res = request.GetResponse())//あとでレスポンスも扱うかもしれないので一応定義
                using (var sr = new StreamReader(res.GetResponseStream()))
                {
                    while (!sr.EndOfStream)
                    {
                        var line = sr.ReadLine();

                        if (!string.IsNullOrWhiteSpace(line))
                            observer.OnNext(apiMethod.Parse(line));
                    }

                    observer.OnCompleted();
                }
            }
            catch (WebException ex)
            {
                if (ex.Status != WebExceptionStatus.RequestCanceled)
                {
                    observer.OnError(ex);
                }
            }
            catch (Exception ex)
            {
                observer.OnError(ex);
            }
        });

        return () =>
        {
            if (request != null)
                request.Abort();
        };
    });
}