2016-04-19 20 views
5

Ricevo dati (in streaming) da una fonte esterna (tramite Lightstreamer) nella mia applicazione C#. L'applicazione My C# riceve i dati dal listener. I dati dal listener sono memorizzati in una coda (ConcurrentQueue). La coda viene pulita ogni 0,5 secondi con TryDequeue in un DataTable. Il DataTable verrà quindi copiato in un database SQL utilizzando SqlBulkCopy. Il database SQL elabora i nuovi dati arrivati ​​dalla tabella di staging nella tabella finale. Attualmente ricevo circa 300.000 file al giorno (può aumentare notevolmente nelle prossime settimane) e il mio obiettivo è di rimanere sotto 1 secondo dal momento in cui ricevo i dati finché non sono disponibili nella tabella SQL finale. Attualmente le righe massime per i secondi che devo elaborare sono circa 50 righe.C# modo più rapido per inserire dati nel database SQL

Sfortunatamente, da quando ricevo sempre più dati, la mia logica sta rallentando in termini di prestazioni (ancora molto meno di 1 secondo, ma voglio continuare a migliorare). Il collo di bottiglia principale (fino ad ora) è l'elaborazione dei dati di staging (sul database SQL) nella tabella finale. Per migliorare le prestazioni, vorrei passare alla tabella di staging in una tabella ottimizzata per la memoria. Il tavolo finale è già un tavolo ottimizzato per la memoria, quindi funzioneranno sicuramente bene.

Le mie domande:

  1. C'è un modo per utilizzare SqlBulkCopy (di C#), con tabelle di memoria ottimizzato? (per quanto ne so non c'è ancora modo)
  2. Qualche suggerimento per il modo più veloce per scrivere i dati ricevuti dalla mia applicazione C# nella tabella di staging ottimizzata per la memoria?

EDIT (con la soluzione):

Dopo i commenti/risposte e le valutazioni delle prestazioni ho deciso di rinunciare l'inserimento di massa e utilizzare SQLCommand alla consegna di un IEnumerable con i miei dati come parametro con valori di tabella in una stored procedure compilata nativa per archiviare i dati direttamente nella mia tabella finale ottimizzata per la memoria (così come una copia nella tabella "staging" che ora funge da archivio). Le prestazioni sono aumentate in modo significativo (anche se non ho considerato di parallelizzare gli inserimenti ancora (lo sarà in una fase successiva)).

Ecco parte del codice:

definito dall'utente tipo di tabella di memoria ottimizzato (alla consegna dei dati da C# in SQL (stored procedure):

CREATE TYPE [Staging].[CityIndexIntradayLivePrices] AS TABLE(
    [CityIndexInstrumentID] [int] NOT NULL, 
    [CityIndexTimeStamp] [bigint] NOT NULL, 
    [BidPrice] [numeric](18, 8) NOT NULL, 
    [AskPrice] [numeric](18, 8) NOT NULL, 
    INDEX [IndexCityIndexIntradayLivePrices] NONCLUSTERED 
(
    [CityIndexInstrumentID] ASC, 
    [CityIndexTimeStamp] ASC, 
    [BidPrice] ASC, 
    [AskPrice] ASC 
) 
) 
WITH (MEMORY_OPTIMIZED = ON) 

nativi stored procedure compilate da inserire i dati in tavolo finale e messa in scena (che funge da archivio in questo caso):

create procedure [Staging].[spProcessCityIndexIntradayLivePricesStaging] 
(
    @ProcessingID int, 
    @CityIndexIntradayLivePrices Staging.CityIndexIntradayLivePrices readonly 
) 
with native_compilation, schemabinding, execute as owner 
as 
begin atomic 
with (transaction isolation level=snapshot, language=N'us_english') 


    -- store prices 

    insert into TimeSeries.CityIndexIntradayLivePrices 
    (
     ObjectID, 
     PerDateTime, 
     BidPrice, 
     AskPrice, 
     ProcessingID 
    ) 
    select Objects.ObjectID, 
    CityIndexTimeStamp, 
    CityIndexIntradayLivePricesStaging.BidPrice, 
    CityIndexIntradayLivePricesStaging.AskPrice, 
    @ProcessingID 
    from @CityIndexIntradayLivePrices CityIndexIntradayLivePricesStaging, 
    Objects.Objects 
    where Objects.CityIndexInstrumentID = CityIndexIntradayLivePricesStaging.CityIndexInstrumentID 


    -- store data in staging table 

    insert into Staging.CityIndexIntradayLivePricesStaging 
    (
     ImportProcessingID, 
     CityIndexInstrumentID, 
     CityIndexTimeStamp, 
     BidPrice, 
     AskPrice 
    ) 
    select @ProcessingID, 
    CityIndexInstrumentID, 
    CityIndexTimeStamp, 
    BidPrice, 
    AskPrice 
    from @CityIndexIntradayLivePrices 


end 

IEnumerable pieno di dalla coda:

private static IEnumerable<SqlDataRecord> CreateSqlDataRecords() 
{ 


    // set columns (the sequence is important as the sequence will be accordingly to the sequence of columns in the table-value parameter) 

    SqlMetaData MetaDataCol1; 
    SqlMetaData MetaDataCol2; 
    SqlMetaData MetaDataCol3; 
    SqlMetaData MetaDataCol4; 

    MetaDataCol1 = new SqlMetaData("CityIndexInstrumentID", SqlDbType.Int); 
    MetaDataCol2 = new SqlMetaData("CityIndexTimeStamp", SqlDbType.BigInt); 
    MetaDataCol3 = new SqlMetaData("BidPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale 
    MetaDataCol4 = new SqlMetaData("AskPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale 


    // define sql data record with the columns 

    SqlDataRecord DataRecord = new SqlDataRecord(new SqlMetaData[] { MetaDataCol1, MetaDataCol2, MetaDataCol3, MetaDataCol4 }); 


    // remove each price row from queue and add it to the sql data record 

    LightstreamerAPI.PriceDTO PriceDTO = new LightstreamerAPI.PriceDTO(); 

    while (IntradayQuotesQueue.TryDequeue(out PriceDTO)) 
    { 

     DataRecord.SetInt32(0, PriceDTO.MarketID); // city index market id 
     DataRecord.SetInt64(1, Convert.ToInt64((PriceDTO.TickDate.Replace(@"\/Date(", "")).Replace(@")\/", ""))); // @ is used to avoid problem with/as escape sequence) 
     DataRecord.SetDecimal(2, PriceDTO.Bid); // bid price 
     DataRecord.SetDecimal(3, PriceDTO.Offer); // ask price 

     yield return DataRecord; 

    } 


} 

Manipolazione dei dati ogni 0,5 secondi:

public static void ChildThreadIntradayQuotesHandler(Int32 CityIndexInterfaceProcessingID) 
{ 


    try 
    { 

     // open new sql connection 

     using (SqlConnection TimeSeriesDatabaseSQLConnection = new SqlConnection("Data Source=XXX;Initial Catalog=XXX;Integrated Security=SSPI;MultipleActiveResultSets=false")) 
     { 


      // open connection 

      TimeSeriesDatabaseSQLConnection.Open(); 


      // endless loop to keep thread alive 

      while(true) 
      { 


       // ensure queue has rows to process (otherwise no need to continue) 

       if(IntradayQuotesQueue.Count > 0) 
       { 


        // define stored procedure for sql command 

        SqlCommand InsertCommand = new SqlCommand("Staging.spProcessCityIndexIntradayLivePricesStaging", TimeSeriesDatabaseSQLConnection); 


        // set command type to stored procedure 

        InsertCommand.CommandType = CommandType.StoredProcedure; 


        // define sql parameters (table-value parameter gets data from CreateSqlDataRecords()) 

        SqlParameter ParameterCityIndexIntradayLivePrices = InsertCommand.Parameters.AddWithValue("@CityIndexIntradayLivePrices", CreateSqlDataRecords()); // table-valued parameter 
        SqlParameter ParameterProcessingID = InsertCommand.Parameters.AddWithValue("@ProcessingID", CityIndexInterfaceProcessingID); // processing id parameter 


        // set sql db type to structured for table-value paramter (structured = special data type for specifying structured data contained in table-valued parameters) 

        ParameterCityIndexIntradayLivePrices.SqlDbType = SqlDbType.Structured; 


        // execute stored procedure 

        InsertCommand.ExecuteNonQuery(); 


       } 


       // wait 0.5 seconds 

       Thread.Sleep(500); 


      } 

     } 

    } 
    catch (Exception e) 
    { 

     // handle error (standard error messages and update processing) 

     ThreadErrorHandling(CityIndexInterfaceProcessingID, "ChildThreadIntradayQuotesHandler (handler stopped now)", e); 

    }; 


} 
+0

Guardare TVP (parametro valore tabella) - è possibile utilizzare come un DataReader inverso. https://lennilobel.wordpress.com/2009/07/29/sql-server-2008-table-valued-parameters-and-c-custom-iterators-a-match-made-in-heaven/ – Paparazzi

risposta

2

Utilizzare SQL Server 2016 (RTM non è ancora, ma è già molto meglio di 2014, quando si tratta di tabelle di memoria ottimizzato). Quindi usa uno memory-optimized table variable o fai esplodere un bel po 'di chiamate native stored procedure in una transazione, ognuna facendo un solo inserto, a seconda di cosa è più veloce nel tuo scenario (questo varia).Alcune cose a cui prestare attenzione:

  • Fare inserimenti multipli in una transazione è fondamentale per risparmiare sui viaggi di andata e ritorno della rete. Mentre le operazioni in memoria sono molto veloci, SQL Server deve ancora confermare ogni operazione.
  • A seconda di come si producono i dati, è possibile che parallelizzare gli inserti possa velocizzare le cose (non esagerare, si colpirà rapidamente il punto di saturazione). Non cercare di essere molto intelligente qui; leva async/await e/o Parallel.ForEach.
  • Se passi un parametro con valori di tabella, il modo più semplice per farlo è quello di passare un valore di parametro a DataTable, ma questo non è il modo più efficiente per farlo, vale a dire passare un IEnumerable<SqlDataRecord>. È possibile utilizzare un metodo iteratore per generare i valori, quindi viene allocata solo una quantità costante di memoria.

Dovrai sperimentare un po 'per trovare il modo ottimale di passare i dati; questo dipende molto dalle dimensioni dei tuoi dati e da come lo stai ottenendo.

+0

Attualmente sono utilizzando SQL Server 2014 e sono stato in grado di implementare la mia soluzione con esso (anche se ho dovuto fare alcuni piccoli compromessi). Ma considererò SQL Server 2016 il prima possibile. IEnumerable funziona alla grande, più velocemente di DataTable. L'uso di parametri con valori di tabella ottimizzati per la memoria e la stored procedure compilata nativa ha ridotto il carico di lavoro del database molto. – Reboon

0

Batch i dati dalla tabella di staging alla tabella finale nei conteggi delle righe inferiori a 5k, in genere utilizzo 4k e non li inserisco in una transazione. Invece, implementare le transazioni programmatiche, se necessario. Rimanendo sotto 5k righe inserite mantiene il numero di blocchi di riga da escalation in un lock di tabella, che deve attendere fino a quando tutti gli altri escono dalla tabella.

0

Sei sicuro che la tua logica rallenti e non le transazioni effettive nel database? Entity Framework ad esempio è "sensibile", per mancanza di un termine migliore, quando si tenta di inserire una tonnellata di righe e diventa piuttosto lento.

C'è una libreria di terze parti, BulkInsert, su CodePlex, che ho usato ed è abbastanza bello fare l'inserimento di massa di dati: https://efbulkinsert.codeplex.com/

si potrebbe anche scrivere il proprio metodo di estensione sul DbContext se EF che fa anche questo potrebbe essere basato sul conteggio dei record. Qualunque cosa al di sotto di 5000 righe usa Save(), qualsiasi cosa su cui puoi invocare la tua logica di inserimento di massa.