/// <summary>
/// Calls a Streaming API.
/// </summary>
/// <remarks>
/// Every subscription builds a new GET request from <paramref name="apiMethod"/> and reads the
/// response on the task-pool scheduler, one line at a time. Blank or whitespace-only lines are
/// skipped; every other line is passed through <c>apiMethod.Parse</c> and pushed to the observer.
/// The sequence completes when the response stream ends. Disposing the subscription aborts the
/// request; a <see cref="WebException"/> whose status is
/// <see cref="WebExceptionStatus.RequestCanceled"/> is not reported as an error.
/// </remarks>
/// <typeparam name="T">Type of the values returned by the API.</typeparam>
/// <param name="apiMethod">The API to call.</param>
/// <param name="token">OAuth token.</param>
/// <returns>An <see cref="IObservable{T}"/> that pushes the responses received from the API.</returns>
public static IObservable<T> CallApi<T>(this IStreamingApi<T> apiMethod, Token token)
{
    return Observable.Create<T>(observer =>
    {
        // The request is created on a worker thread but aborted from whichever thread disposes
        // the subscription, so both the request reference and the disposed flag are guarded.
        var gate = new object();
        HttpWebRequest request = null;
        var disposed = false;

        var work = Scheduler.TaskPool.Schedule(() =>
        {
            try
            {
                var created = RequestGenerator.GenerateTwitterApiRequest(
                    apiMethod.RequestUri,
                    HttpMethodType.GET,
                    token,
                    DefaultSetting.Proxy,
                    DefaultSetting.Timeout,
                    DefaultSetting.UserAgent,
                    null,
                    apiMethod.Parameters,
                    null,
                    null
                );

                lock (gate)
                {
                    if (disposed)
                    {
                        // Unsubscribed before the request existed: nobody could abort it,
                        // so do not open the connection at all.
                        return;
                    }

                    request = created;
                }

                // The response is kept in a variable in case it needs to be inspected later.
                using (var res = created.GetResponse())
                using (var sr = new StreamReader(res.GetResponseStream()))
                {
                    while (!sr.EndOfStream)
                    {
                        var line = sr.ReadLine();
                        if (!string.IsNullOrWhiteSpace(line))
                        {
                            observer.OnNext(apiMethod.Parse(line));
                        }
                    }

                    observer.OnCompleted();
                }
            }
            catch (WebException ex)
            {
                // RequestCanceled is the result of Abort() on unsubscribe, not a failure.
                if (ex.Status != WebExceptionStatus.RequestCanceled)
                {
                    observer.OnError(ex);
                }
            }
            catch (Exception ex)
            {
                observer.OnError(ex);
            }
        });

        return () =>
        {
            HttpWebRequest toAbort;
            lock (gate)
            {
                disposed = true;
                toAbort = request;
            }

            // Cancels the work item if it has not started running yet.
            work.Dispose();

            if (toAbort != null)
            {
                toAbort.Abort();
            }
        };
    });
}