2012-04-21 3 views
7

È necessario implementare un meccanismo di limitazione (richieste al secondo) quando si utilizza HttpWebRequest per effettuare richieste parallele verso un server applicazioni. L'app My C# non deve inviare più di 80 richieste al secondo a un server remoto. Il limite è imposto dagli amministratori del servizio remoto non come limite rigido ma come "SLA" tra la mia piattaforma e la loro.Come limitare il numero di HttpWebRequest al secondo verso un server web?

Come posso controllare il numero di richieste al secondo quando si utilizza HttpWebRequest?

risposta

3

Ho avuto lo stesso problema e non sono riuscito a trovare una soluzione pronta, quindi ne ho creato uno ed eccolo qui. L'idea è di usare un BlockingCollection<T> per aggiungere elementi che necessitano di elaborazione e utilizzare le estensioni reattive per iscriversi con un processore a velocità limitata.

classe Throttle è la versione rinominata del this rate limiter

public static class BlockingCollectionExtensions 
{ 
    // TODO: devise a way to avoid problems if collection gets too big (produced faster than consumed) 
    public static IObservable<T> AsRateLimitedObservable<T>(this BlockingCollection<T> sequence, int items, TimeSpan timePeriod, CancellationToken producerToken) 
    { 
     Subject<T> subject = new Subject<T>(); 

     // this is a dummyToken just so we can recreate the TokenSource 
     // which we will pass the proxy class so it can cancel the task 
     // on disposal 
     CancellationToken dummyToken = new CancellationToken(); 
     CancellationTokenSource tokenSource = CancellationTokenSource.CreateLinkedTokenSource(producerToken, dummyToken); 

     var consumingTask = new Task(() => 
     { 
      using (var throttle = new Throttle(items, timePeriod)) 
      { 
       while (!sequence.IsCompleted) 
       { 
        try 
        { 
         T item = sequence.Take(producerToken); 
         throttle.WaitToProceed(); 
         try 
         { 
          subject.OnNext(item); 
         } 
         catch (Exception ex) 
         { 
          subject.OnError(ex); 
         } 
        } 
        catch (OperationCanceledException) 
        { 
         break; 
        } 
       } 
       subject.OnCompleted(); 
      } 
     }, TaskCreationOptions.LongRunning); 

     return new TaskAwareObservable<T>(subject, consumingTask, tokenSource); 
    } 

    private class TaskAwareObservable<T> : IObservable<T>, IDisposable 
    { 
     private readonly Task task; 
     private readonly Subject<T> subject; 
     private readonly CancellationTokenSource taskCancellationTokenSource; 

     public TaskAwareObservable(Subject<T> subject, Task task, CancellationTokenSource tokenSource) 
     { 
      this.task = task; 
      this.subject = subject; 
      this.taskCancellationTokenSource = tokenSource; 
     } 

     public IDisposable Subscribe(IObserver<T> observer) 
     { 
      var disposable = subject.Subscribe(observer); 
      if (task.Status == TaskStatus.Created) 
       task.Start(); 
      return disposable; 
     } 

     public void Dispose() 
     { 
      // cancel consumption and wait task to finish 
      taskCancellationTokenSource.Cancel(); 
      task.Wait(); 

      // dispose tokenSource and task 
      taskCancellationTokenSource.Dispose(); 
      task.Dispose(); 

      // dispose subject 
      subject.Dispose(); 
     } 
    } 
} 

prova Unità:

class BlockCollectionExtensionsTest 
{ 
    [Fact] 
    public void AsRateLimitedObservable() 
    { 
     const int maxItems = 1; // fix this to 1 to ease testing 
     TimeSpan during = TimeSpan.FromSeconds(1); 

     // populate collection 
     int[] items = new[] { 1, 2, 3, 4 }; 
     BlockingCollection<int> collection = new BlockingCollection<int>(); 
     foreach (var i in items) collection.Add(i); 
     collection.CompleteAdding(); 

     IObservable<int> observable = collection.AsRateLimitedObservable(maxItems, during, CancellationToken.None); 
     BlockingCollection<int> processedItems = new BlockingCollection<int>(); 
     ManualResetEvent completed = new ManualResetEvent(false); 
     DateTime last = DateTime.UtcNow; 
     observable 
      // this is so we'll receive exceptions 
      .ObserveOn(new SynchronizationContext()) 
      .Subscribe(item => 
       { 
        if (item == 1) 
         last = DateTime.UtcNow; 
        else 
        { 
         TimeSpan diff = (DateTime.UtcNow - last); 
         last = DateTime.UtcNow; 

         Assert.InRange(diff.TotalMilliseconds, 
          during.TotalMilliseconds - 30, 
          during.TotalMilliseconds + 30); 
        } 
        processedItems.Add(item); 
       }, 
       () => completed.Set() 
      ); 
     completed.WaitOne(); 
     Assert.Equal(items, processedItems, new CollectionEqualityComparer<int>()); 
    } 
} 
+0

qualcosa di brutto è successo all'URL –

-1

Il mio post originale ha discusso come aggiungere un meccanismo di limitazione a WCF tramite le estensioni del comportamento del client, ma poi è stato sottolineato che ho letto male la domanda (doh!).

In generale, l'approccio può essere quello di verificare con una classe che determina se stiamo violando il limite di velocità o meno. C'è già stato un sacco di discussioni su come controllare le violazioni dei tassi.

Throttling method calls to M requests in N seconds

Se si stanno violando il limite di velocità, poi dormire per un intervallo di correzione e prova di nuovo. In caso contrario, andare avanti e effettuare la chiamata HttpWebRequest.

+0

Nella domanda, non mi riferisco ad un webservice WCF. Si tratta di un semplice utilizzo della classe HttpWebRequest. –

+0

Ah è tardi e avrei dovuto leggere la domanda più da vicino :) Puoi ancora provare l'approccio di prima di effettuare una chiamata a HttpWebRequest, controlla con un'altra classe per assicurarti di non violare le 80 richieste/sec. Aggiornerò il mio codice qui sopra. –

+0

Ha chiesto C# non Java. – SmallChess

0

I metodi di estensione Throttle() e Sample() (On Observable) consentono di regolare una sequenza veloce di eventi in una sequenza "più lenta".

Here is a blog post with an example di Sample(Timespan) che garantisce una velocità massima.

+0

Il problema con Sample() e Throttle() è che saltano/buttano via i campioni per raggiungere la velocità specificata. – georgiosd