2013-08-09 4 views
9

Supponiamo di voler utilizzare una mappa mutabile in Scala per tenere traccia del numero di volte in cui ho visto alcune stringhe. In un contesto thread singolo, questo è facile:Thread: trasformazione sicura di un valore in una mappa mutabile

import scala.collection.mutable.{ Map => MMap } 

class Counter { 
    val counts = MMap.empty[String, Int].withDefaultValue(0) 

    def add(s: String): Unit = counts(s) += 1 
} 

Purtroppo questo non è thread-safe, poiché la get e update non accada atomicamente.

Concurrent maps aggiungere a few atomic operations alla mappa API mutevole, ma non quello che ho bisogno, che sarebbe simile a questa:

def replace(k: A, f: B => B): Option[B] 

So che posso usare ScalaSTM s' TMap:

import scala.concurrent.stm._ 

class Counter { 
    val counts = TMap.empty[String, Int] 

    def add(s: String): Unit = atomic { implicit txn => 
    counts(s) = counts.get(s).getOrElse(0) + 1 
    } 
} 

Ma (per ora) è ancora una dipendenza extra. Altre opzioni includono gli attori (un'altra dipendenza), la sincronizzazione (potenzialmente meno efficiente) o Java atomic references (less idiomatic).

In generale eviterei mappe mutabili a Scala, ma a volte ho bisogno di questo genere di cose, e più recentemente ho usato l'approccio STM (invece di incrociare le dita e sperare che non capisco morso dalla soluzione ingenua).

So che ci sono un certo numero di compromessi qui (dipendenze extra vs. prestazioni rispetto alla chiarezza, ecc.), Ma c'è qualcosa come una risposta "giusta" a questo problema in Scala 2.10?

+1

che dire di un singolo attore Akka che scrive alla mappa mutevole? "Counter.add" invia semplicemente un messaggio ignifugo. Per quanto riguarda le letture, a seconda delle esigenze possono accadere contemporaneamente o passare attraverso l'attore. – gourlaysama

risposta

3

La soluzione più semplice è sicuramente la sincronizzazione. Se non c'è troppa contesa, le prestazioni potrebbero non essere poi così male.

In caso contrario, è possibile provare a implementare la propria implementazione replace simile a STM. Qualcosa di simile potrebbe fare:

object ConcurrentMapOps { 
    private val rng = new util.Random 
    private val MaxReplaceRetryCount = 10 
    private val MinReplaceBackoffTime: Long = 1 
    private val MaxReplaceBackoffTime: Long = 20 
} 
implicit class ConcurrentMapOps[A, B](val m: collection.concurrent.Map[A,B]) { 
    import ConcurrentMapOps._ 
    private def replaceBackoff() { 
    Thread.sleep((MinReplaceBackoffTime + rng.nextFloat * (MaxReplaceBackoffTime - MinReplaceBackoffTime)).toLong) // A bit crude, I know 
    } 

    def replace(k: A, f: B => B): Option[B] = { 
    m.get(k) match { 
     case None => return None 
     case Some(old) => 
     var retryCount = 0 
     while (retryCount <= MaxReplaceRetryCount) { 
      val done = m.replace(k, old, f(old)) 
      if (done) { 
      return Some(old) 
      } 
      else {   
      retryCount += 1 
      replaceBackoff() 
      } 
     } 
     sys.error("Could not concurrently modify map") 
    } 
    } 
} 

Nota che le questioni di collisione sono localizzati ad una determinata chiave. Se due thread accedono alla stessa mappa ma funzionano su chiavi distinte, non si avranno collisioni e l'operazione di sostituzione avrà sempre successo la prima volta. Se viene rilevata una collisione, attendiamo un po '(una quantità casuale di tempo, in modo da ridurre al minimo la probabilità che i thread combacino per sempre con la stessa chiave) e riprovare.

Non posso garantire che questo sia pronto per la produzione (l'ho appena lanciato in questo momento), ma quello potrebbe fare il trucco.

UPDATE: Naturalmente (come Ionuţ G. Stan ha sottolineato), se invece si è aumentare/diminuire un valore, java di ConcurrentHashMap prevede già thoses operazioni in un modo senza blocchi. La mia soluzione di cui sopra si applica se è necessario un metodo più generale replace che utilizzi la funzione di trasformazione come parametro.

+0

Ho notato nel codice della mappa che è passato a ThreadLocalRandom https://github.com/scala/scala/blob/master/src/library/scala/collection/concurrent/TrieMap.scala#L473 –

10

Che ne dici di questo? Supponendo che non hai davvero bisogno di un metodo generale replace adesso, solo un contatore.

import java.util.concurrent.ConcurrentHashMap 
import java.util.concurrent.atomic.AtomicInteger 

object CountedMap { 
    private val counts = new ConcurrentHashMap[String, AtomicInteger] 

    def add(key: String): Int = { 
    val zero = new AtomicInteger(0) 
    val value = Option(counts.putIfAbsent(key, zero)).getOrElse(zero) 
    value.incrementAndGet 
    } 
} 

Si ottengono prestazioni migliori rispetto alla sincronizzazione sull'intera mappa e si ottengono anche incrementi atomici.

+0

Grazie-Sono interessato al caso generale, ma è bello vedere che è così facile. –

+0

Questa è la soluzione giusta e sfrutta le librerie concorrenti Java ad altissime prestazioni. –

+1

Sono curioso di sapere se c'è un motivo per raggiungere ConcurrentHashMap anziché concurrent.TrieMap. Non ho un'opinione, solo che il forum è un annuncio per API. –

2

Stai chiedendo guai se la tua mappa è semplicemente lì seduta come una val.Se soddisfa le vostre caso d'uso, io consiglierei qualcosa come

class Counter { 
    private[this] myCounts = MMap.empty[String, Int].withDefaultValue(0) 
    def counts(s: String) = myCounts.synchronized { myCounts(s) } 
    def add(s: String) = myCounts.synchronized { myCounts(s) += 1 } 
    def getCounts = myCounts.synchronized { Map[String,Int]() ++ myCounts } 
} 

per l'uso a bassa contesa. Per la contesa elevata, dovresti utilizzare una mappa simultanea progettata per supportare tale utilizzo (ad esempio java.util.concurrent.ConcurrentHashMap) e racchiudere i valori in AtomicWhatever.

2

Se siete ok per lavorare con l'interfaccia basata futuro:

trait SingleThreadedExecutionContext { 
    val ec = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()) 
} 

class Counter extends SingleThreadedExecutionContext { 
    private val counts = MMap.empty[String, Int].withDefaultValue(0) 

    def get(s: String): Future[Int] = future(counts(s))(ec) 

    def add(s: String): Future[Unit] = future(counts(s) += 1)(ec) 
} 

test sarà simile:

class MutableMapSpec extends Specification { 

    "thread safe" in { 

    import ExecutionContext.Implicits.global 

    val c = new Counter 
    val testData = Seq.fill(16)("1") 
    await(Future.traverse(testData)(c.add)) 
    await(c.get("1")) mustEqual 16 
    } 
} 
+0

Questo thread non è affatto sicuro. Mentre si garantisce un singolo writer alla volta, è ancora possibile avere discussioni durante la modifica della mappa –

+0

Come ho capito, tutte le operazioni - leggere, scrivere, miste - che utilizzano ec come contesto saranno thread-safe. Ops al di fuori di quel contesto non saranno thread-safe. Sarò felice di sentire dagli altri se questa comprensione è corretta. –

+0

Ma il fatto è che la lettura viene eseguita direttamente: quando si accede a 'c.counts' non si utilizza affatto' ExecutionContext'. –