2012-07-02 3 views

Mi piace generare più tuple da una singola tupla. Quello che voglio dire è: Ho un file con i seguenti dati in esso.Divisione di una tupla in più tuple nel maiale

>> cat data 
ID | ColumnName1:Value1 | ColumnName2:Value2 

quindi caricarlo con il seguente comando

grunt >> A = load '$data' using PigStorage('|');  
grunt >> dump A;  

Ora voglio dividere questo tupla in due tuple.

(ID, ColumnName1, Value1) 
(ID, ColumnName2, Value2) 

Posso utilizzare UDF insieme a foreach e generare. Qualcosa come il seguente?

grunt >> foreach A generate SOMEUDF(A) 


ingresso tupla: (ID1, column1, column2) uscita: due tuple (ID1, COLUMN1) e (ID2, column2) quindi è List o devo restituire un sacchetto?

public class SPLITTUPPLE extends EvalFunc <List<Tuple>> 
    public List<Tuple> exec(Tuple input) throws IOException { 
     if (input == null || input.size() == 0) 
      return null; 
      // not sure how whether I can create tuples on my own. Looks like I should use TupleFactory. 
      // return list of tuples. 
     }catch(Exception e){ 
      throw WrappedIOException.wrap("Caught exception processing input row ", e); 

Questo approccio è corretto?



È possibile scrivere un UDF o utilizzare uno script PIG con funzioni integrate.

Ad esempio:

-- data should be chararray, PigStorage('|') return bytearray which will not work for this example 
inpt = load '/pig_fun/input/single_tuple_to_multiple.txt' as (line:chararray); 

-- split by | and create a row so we can dereference it later 
splt = foreach inpt generate FLATTEN(STRSPLIT($0, '\\|')) ; 

-- first column is id, rest is converted into a bag and flatten it to make rows 
id_vals = foreach splt generate $0 as id, FLATTEN(TOBAG(*)) as value; 
-- there will be records with (id, id), but id should not have ':' 
id_vals = foreach id_vals generate id, INDEXOF(value, ':') as p, STRSPLIT(value, ':', 2) as vals; 
final = foreach (filter id_vals by p != -1) generate id, FLATTEN(vals) as (col, val); 
dump final; 





spero che aiuta.



Grazie mille. Posso fare la stessa cosa scrivendo un UDF. Aggiorno la domanda – FourOfAKind


Sì, è possibile. Vedi la prossima risposta. – alexeipab


È un grande aiuto. Grazie per il tuo tempo. – FourOfAKind


Ecco la versione UDF. Io preferisco restituire un BORSA:

import java.io.IOException; 

import org.apache.pig.EvalFunc; 
import org.apache.pig.backend.executionengine.ExecException; 
import org.apache.pig.data.BagFactory; 
import org.apache.pig.data.DataBag; 
import org.apache.pig.data.DataType; 
import org.apache.pig.data.Tuple; 
import org.apache.pig.data.TupleFactory; 
import org.apache.pig.impl.logicalLayer.FrontendException; 
import org.apache.pig.impl.logicalLayer.schema.Schema; 

* Converts input chararray "ID|ColumnName1:Value1|ColumnName2:Value2|.." into a bag 
* {(ID, ColumnName1, Value1), (ID, ColumnName2, Value2), ...} 
* Default rows separator is '|' and key value separator is ':'. 
* In this implementation white spaces around separator characters are not removed. 
* ID can be made of any character (including sequence of white spaces). 
* @author 
public class TupleToBagColumnValuePairs extends EvalFunc<DataBag> { 

    private static final TupleFactory tupleFactory = TupleFactory.getInstance(); 
    private static final BagFactory bagFactory = BagFactory.getInstance(); 

    //Row separator character. Default is '|'. 
    private String rowsSeparator; 
    //Column value separator character. Default i 
    private String columnValueSeparator; 

    public TupleToBagColumnValuePairs() { 
     this.rowsSeparator = "\\|"; 
     this.columnValueSeparator = ":"; 

    public TupleToBagColumnValuePairs(String rowsSeparator, String keyValueSeparator) { 
     this.rowsSeparator = rowsSeparator; 
     this.columnValueSeparator = keyValueSeparator; 

    * Creates a tuple with 3 fields (id:chararray, column:chararray, value:chararray) 
    * @param outputBag Output tuples (id, column, value) are added to this bag 
    * @param id 
    * @param column 
    * @param value 
    * @throws ExecException 
    protected void addTuple(DataBag outputBag, String id, String column, String value) throws ExecException { 
     Tuple outputTuple = tupleFactory.newTuple(); 

    * Takes column{separator}value from splitInputLine, splits id into column value and adds them to the outputBag as (id, column, value) 
    * @param outputBag Output tuples (id, column, value) should be added to this bag 
    * @param id 
    * @param splitInputLine format column{separator}value, which start from index 1 
    * @throws ExecException 
    protected void parseColumnValues(DataBag outputBag, String id, 
      String[] splitInputLine) throws ExecException { 
     for (int i = 1; i < splitInputLine.length; i++) { 
      if (splitInputLine[i] != null) { 
       int columnValueSplitIndex = splitInputLine[i].indexOf(this.columnValueSeparator); 
       if (columnValueSplitIndex != -1) { 
        String column = splitInputLine[i].substring(0, columnValueSplitIndex); 
        String value = null; 
        if (columnValueSplitIndex + 1 < splitInputLine[i].length()) { 
         value = splitInputLine[i].substring(columnValueSplitIndex + 1); 
        this.addTuple(outputBag, id, column, value); 
       } else { 
        String column = splitInputLine[i]; 
        this.addTuple(outputBag, id, column, null); 

    * input - contains only one field of type chararray, which will be split by '|' 
    * All inputs that are: null or of length 0 are ignored. 
    public DataBag exec(Tuple input) throws IOException { 
     if (input == null || input.size() != 1 || input.isNull(0)) { 
      return null; 

     String inputLine = (String)input.get(0); 
     String[] splitInputLine = inputLine.split(this.rowsSeparator, -1); 

     if (splitInputLine.length > 1 && splitInputLine[0].length() > 0) { 
      String id = splitInputLine[0]; 
      DataBag outputBag = bagFactory.newDefaultBag();    
      if (splitInputLine.length == 1) { // there is just an id in the line 
       this.addTuple(outputBag, id, null, null); 
      } else { 
       this.parseColumnValues(outputBag, id, splitInputLine); 

      return outputBag; 
     return null; 

    public Schema outputSchema(Schema input) { 
     try { 
      if (input.size() != 1) { 
       throw new RuntimeException("Expected input to have only one field"); 

      Schema.FieldSchema inputFieldSchema = input.getField(0); 
      if (inputFieldSchema.type != DataType.CHARARRAY) { 
       throw new RuntimeException("Expected a CHARARRAY as input"); 

      Schema tupleSchema = new Schema(); 
      tupleSchema.add(new Schema.FieldSchema("id", DataType.CHARARRAY)); 
      tupleSchema.add(new Schema.FieldSchema("column", DataType.CHARARRAY)); 
      tupleSchema.add(new Schema.FieldSchema("value", DataType.CHARARRAY)); 

      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), tupleSchema, DataType.BAG)); 
     } catch (FrontendException exx) { 
      throw new RuntimeException(exx); 


Ecco come è utilizzato in PIG:

register 'path to the jar'; 
define IdColumnValue myPackage.TupleToBagColumnValuePairs(); 

inpt = load '/pig_fun/input/single_tuple_to_multiple.txt' as (line:chararray); 
result = foreach inpt generate FLATTEN(IdColumnValue($0)) as (id1, c2, v2); 
dump result; 

una buona ispirazione per la scrittura UDF con borse vedono DataFu source code by LinkedIn


si potrebbe usare TransposeTupleToBag (UDF da DataFu lib) sull'output di STRSPLIT per ottenere la borsa, quindi FLATTEN per creare una riga separata per colonna originale.