È necessario utilizzare sqlContext.cacheTable("table_name")
per memorizzarlo nella cache oppure utilizzare la query SQL CACHE TABLE table_name
.
Ecco un esempio. Ho questo file sul HDFS:
1|Alex|[email protected]
2|Paul|[email protected]
3|John|[email protected]
Quindi il codice in PySpark:
people = sc.textFile('hdfs://sparkdemo:8020/people.txt')
people_t = people.map(lambda x: x.split('|')).map(lambda x: Row(id=x[0], name=x[1], email=x[2]))
tbl = sqlContext.inferSchema(people_t)
tbl.registerTempTable('people')
Ora abbiamo un tavolo e si può interrogare:
sqlContext.sql('select * from people').collect()
a persistere esso, abbiamo 3 opzioni:
# 1st - using SQL
sqlContext.sql('CACHE TABLE people').collect()
# 2nd - using SQLContext
sqlContext.cacheTable('people')
sqlContext.sql('select count(*) from people').collect()
# 3rd - using Spark cache underlying RDD
tbl.cache()
sqlContext.sql('select count(*) from people').collect()
1a e 2a opzione a ri preferivano come sarebbero in cache i dati nel formato a colonne in memoria ottimizzata, mentre la terza sarebbe memorizzare nella cache proprio come qualsiasi altro RDD nella moda fila orientata
Quindi, tornando alla tua domanda, ecco una possibile soluzione:
output = sqlContext.sql("SELECT * From people")
output.registerTempTable('people2')
sqlContext.cacheTable('people2')
sqlContext.sql("SELECT count(*) From people2").collect()
"Spark SQL può memorizzare nella cache le tabelle utilizzando un formato colonnare in memoria chiamando spark.catalog.cacheTable (" tableName ") o dataFrame.cache()." (https://spark.apache.org/docs/latest/sql-programming-guide.html#caching-data-in-memory). La documentazione sembra implicare che le opzioni 1a, 2a e 3a sono tutte equivalenti. – asmaier