Streaming APIを実行する部分

  • タグ:
  • タグはありません
/// <summary>
/// Calls a Streaming API endpoint and exposes each parsed response line as an element of a push-based sequence.
/// </summary>
/// <remarks>
/// Nothing is requested until an observer subscribes. For every subscription the request is built and the
/// response is read on <c>Scheduler.TaskPool</c>. Each line that is not null, empty or whitespace-only is passed
/// to <c>apiMethod.Parse</c> and the result is pushed to the observer; the sequence completes when the response
/// stream ends. A <see cref="WebException"/> whose status is <see cref="WebExceptionStatus.RequestCanceled"/> is
/// swallowed; every other exception is forwarded to the observer as an error.
/// Disposing the subscription aborts the request. If the subscription is disposed before the request has been
/// created, the connection is never opened.
/// </remarks>
/// <typeparam name="T">Type of the elements produced by parsing each response line.</typeparam>
/// <param name="apiMethod">Streaming API definition supplying the request URI, the parameters and the line parser.</param>
/// <param name="token">Token handed to the request generator (OAuth credentials according to the original documentation).</param>
/// <returns>An <see cref="IObservable{T}"/> that pushes one parsed element per non-blank response line.</returns>
public static IObservable<T> CallApi<T>(this IStreamingApi<T> apiMethod, Token token)
{
    return Observable.Create<T>(observer =>
    {
        // 'request' and 'disposed' are shared between the task-pool work item and the
        // dispose action, which may run on different threads; 'gate' guards both.
        var gate = new object();
        HttpWebRequest request = null;
        var disposed = false;

        var scheduled = Scheduler.TaskPool.Schedule(() =>
        {
            try
            {
                var created = RequestGenerator.GenerateTwitterApiRequest(
                    apiMethod.RequestUri,
                    HttpMethodType.GET,
                    token,
                    DefaultSetting.Proxy,
                    DefaultSetting.Timeout,
                    DefaultSetting.UserAgent,
                    null,
                    apiMethod.Parameters,
                    null,
                    null
                );

                lock (gate)
                {
                    // The subscriber went away before the request existed, so the dispose
                    // action had nothing to abort. Bail out here instead of opening a
                    // connection that nobody could cancel any more.
                    if (disposed)
                    {
                        return;
                    }

                    request = created;
                }

                using (var res = created.GetResponse())
                using (var sr = new StreamReader(res.GetResponseStream()))
                {
                    while (!sr.EndOfStream)
                    {
                        var line = sr.ReadLine();
                        // Blank lines carry no payload and are skipped.
                        if (!string.IsNullOrWhiteSpace(line))
                        {
                            observer.OnNext(apiMethod.Parse(line));
                        }
                    }

                    observer.OnCompleted();
                }
            }
            catch (WebException ex)
            {
                // RequestCanceled is what Abort() produces, i.e. a deliberate
                // unsubscription rather than a failure.
                if (ex.Status != WebExceptionStatus.RequestCanceled)
                {
                    observer.OnError(ex);
                }
            }
            catch (Exception ex)
            {
                observer.OnError(ex);
            }
        });

        return () =>
        {
            HttpWebRequest pending;
            lock (gate)
            {
                disposed = true;
                pending = request;
            }

            // Cancel the work item in case it has not started running yet.
            scheduled.Dispose();

            // Abort outside the lock so the reader thread is never blocked on 'gate'.
            if (pending != null)
            {
                pending.Abort();
            }
        };
    });
}
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX