Ricevo dati (in streaming) da una fonte esterna (tramite Lightstreamer) nella mia applicazione C#. L'applicazione My C# riceve i dati dal listener. I dati dal listener sono memorizzati in una coda (ConcurrentQueue). La coda viene pulita ogni 0,5 secondi con TryDequeue in un DataTable. Il DataTable verrà quindi copiato in un database SQL utilizzando SqlBulkCopy. Il database SQL elabora i nuovi dati arrivati dalla tabella di staging nella tabella finale. Attualmente ricevo circa 300.000 file al giorno (può aumentare notevolmente nelle prossime settimane) e il mio obiettivo è di rimanere sotto 1 secondo dal momento in cui ricevo i dati finché non sono disponibili nella tabella SQL finale. Attualmente le righe massime per i secondi che devo elaborare sono circa 50 righe.C# modo più rapido per inserire dati nel database SQL
Sfortunatamente, da quando ricevo sempre più dati, la mia logica sta rallentando in termini di prestazioni (ancora molto meno di 1 secondo, ma voglio continuare a migliorare). Il collo di bottiglia principale (fino ad ora) è l'elaborazione dei dati di staging (sul database SQL) nella tabella finale. Per migliorare le prestazioni, vorrei passare alla tabella di staging in una tabella ottimizzata per la memoria. Il tavolo finale è già un tavolo ottimizzato per la memoria, quindi funzioneranno sicuramente bene.
Le mie domande:
- C'è un modo per utilizzare SqlBulkCopy (di C#), con tabelle di memoria ottimizzato? (per quanto ne so non c'è ancora modo)
- Qualche suggerimento per il modo più veloce per scrivere i dati ricevuti dalla mia applicazione C# nella tabella di staging ottimizzata per la memoria?
EDIT (con la soluzione):
Dopo i commenti/risposte e le valutazioni delle prestazioni ho deciso di rinunciare l'inserimento di massa e utilizzare SQLCommand alla consegna di un IEnumerable con i miei dati come parametro con valori di tabella in una stored procedure compilata nativa per archiviare i dati direttamente nella mia tabella finale ottimizzata per la memoria (così come una copia nella tabella "staging" che ora funge da archivio). Le prestazioni sono aumentate in modo significativo (anche se non ho considerato di parallelizzare gli inserimenti ancora (lo sarà in una fase successiva)).
Ecco parte del codice:
definito dall'utente tipo di tabella di memoria ottimizzato (alla consegna dei dati da C# in SQL (stored procedure):
CREATE TYPE [Staging].[CityIndexIntradayLivePrices] AS TABLE(
[CityIndexInstrumentID] [int] NOT NULL,
[CityIndexTimeStamp] [bigint] NOT NULL,
[BidPrice] [numeric](18, 8) NOT NULL,
[AskPrice] [numeric](18, 8) NOT NULL,
INDEX [IndexCityIndexIntradayLivePrices] NONCLUSTERED
(
[CityIndexInstrumentID] ASC,
[CityIndexTimeStamp] ASC,
[BidPrice] ASC,
[AskPrice] ASC
)
)
WITH (MEMORY_OPTIMIZED = ON)
nativi stored procedure compilate da inserire i dati in tavolo finale e messa in scena (che funge da archivio in questo caso):
create procedure [Staging].[spProcessCityIndexIntradayLivePricesStaging]
(
@ProcessingID int,
@CityIndexIntradayLivePrices Staging.CityIndexIntradayLivePrices readonly
)
with native_compilation, schemabinding, execute as owner
as
begin atomic
with (transaction isolation level=snapshot, language=N'us_english')
-- store prices
insert into TimeSeries.CityIndexIntradayLivePrices
(
ObjectID,
PerDateTime,
BidPrice,
AskPrice,
ProcessingID
)
select Objects.ObjectID,
CityIndexTimeStamp,
CityIndexIntradayLivePricesStaging.BidPrice,
CityIndexIntradayLivePricesStaging.AskPrice,
@ProcessingID
from @CityIndexIntradayLivePrices CityIndexIntradayLivePricesStaging,
Objects.Objects
where Objects.CityIndexInstrumentID = CityIndexIntradayLivePricesStaging.CityIndexInstrumentID
-- store data in staging table
insert into Staging.CityIndexIntradayLivePricesStaging
(
ImportProcessingID,
CityIndexInstrumentID,
CityIndexTimeStamp,
BidPrice,
AskPrice
)
select @ProcessingID,
CityIndexInstrumentID,
CityIndexTimeStamp,
BidPrice,
AskPrice
from @CityIndexIntradayLivePrices
end
IEnumerable pieno di dalla coda:
private static IEnumerable<SqlDataRecord> CreateSqlDataRecords()
{
// set columns (the sequence is important as the sequence will be accordingly to the sequence of columns in the table-value parameter)
SqlMetaData MetaDataCol1;
SqlMetaData MetaDataCol2;
SqlMetaData MetaDataCol3;
SqlMetaData MetaDataCol4;
MetaDataCol1 = new SqlMetaData("CityIndexInstrumentID", SqlDbType.Int);
MetaDataCol2 = new SqlMetaData("CityIndexTimeStamp", SqlDbType.BigInt);
MetaDataCol3 = new SqlMetaData("BidPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale
MetaDataCol4 = new SqlMetaData("AskPrice", SqlDbType.Decimal, 18, 8); // precision 18, 8 scale
// define sql data record with the columns
SqlDataRecord DataRecord = new SqlDataRecord(new SqlMetaData[] { MetaDataCol1, MetaDataCol2, MetaDataCol3, MetaDataCol4 });
// remove each price row from queue and add it to the sql data record
LightstreamerAPI.PriceDTO PriceDTO = new LightstreamerAPI.PriceDTO();
while (IntradayQuotesQueue.TryDequeue(out PriceDTO))
{
DataRecord.SetInt32(0, PriceDTO.MarketID); // city index market id
DataRecord.SetInt64(1, Convert.ToInt64((PriceDTO.TickDate.Replace(@"\/Date(", "")).Replace(@")\/", ""))); // @ is used to avoid problem with/as escape sequence)
DataRecord.SetDecimal(2, PriceDTO.Bid); // bid price
DataRecord.SetDecimal(3, PriceDTO.Offer); // ask price
yield return DataRecord;
}
}
Manipolazione dei dati ogni 0,5 secondi:
public static void ChildThreadIntradayQuotesHandler(Int32 CityIndexInterfaceProcessingID)
{
try
{
// open new sql connection
using (SqlConnection TimeSeriesDatabaseSQLConnection = new SqlConnection("Data Source=XXX;Initial Catalog=XXX;Integrated Security=SSPI;MultipleActiveResultSets=false"))
{
// open connection
TimeSeriesDatabaseSQLConnection.Open();
// endless loop to keep thread alive
while(true)
{
// ensure queue has rows to process (otherwise no need to continue)
if(IntradayQuotesQueue.Count > 0)
{
// define stored procedure for sql command
SqlCommand InsertCommand = new SqlCommand("Staging.spProcessCityIndexIntradayLivePricesStaging", TimeSeriesDatabaseSQLConnection);
// set command type to stored procedure
InsertCommand.CommandType = CommandType.StoredProcedure;
// define sql parameters (table-value parameter gets data from CreateSqlDataRecords())
SqlParameter ParameterCityIndexIntradayLivePrices = InsertCommand.Parameters.AddWithValue("@CityIndexIntradayLivePrices", CreateSqlDataRecords()); // table-valued parameter
SqlParameter ParameterProcessingID = InsertCommand.Parameters.AddWithValue("@ProcessingID", CityIndexInterfaceProcessingID); // processing id parameter
// set sql db type to structured for table-value paramter (structured = special data type for specifying structured data contained in table-valued parameters)
ParameterCityIndexIntradayLivePrices.SqlDbType = SqlDbType.Structured;
// execute stored procedure
InsertCommand.ExecuteNonQuery();
}
// wait 0.5 seconds
Thread.Sleep(500);
}
}
}
catch (Exception e)
{
// handle error (standard error messages and update processing)
ThreadErrorHandling(CityIndexInterfaceProcessingID, "ChildThreadIntradayQuotesHandler (handler stopped now)", e);
};
}
Guardare TVP (parametro valore tabella) - è possibile utilizzare come un DataReader inverso. https://lennilobel.wordpress.com/2009/07/29/sql-server-2008-table-valued-parameters-and-c-custom-iterators-a-match-made-in-heaven/ – Paparazzi