-
Notifications
You must be signed in to change notification settings - Fork 0
Manipulating ADAM format with Spark
David Lauzon edited this page Apr 14, 2015
·
2 revisions
Start the spark-shell and enter:
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
scala> val adamFile = sqlContext.parquetFile("/adamcloud/SRR062634.adam")
or, if using a file stored on HDFS:
scala> val adamFile = sqlContext.parquetFile("hdfs://hdfs-namenode:9000/SRR062634b.adam")
scala> adamFile.count()
res11: Long = 308846
scala> adamFile.printSchema()
root
|-- contig: struct (nullable = true)
| |-- contigName: string (nullable = true)
| |-- contigLength: long (nullable = true)
| |-- contigMD5: string (nullable = true)
| |-- referenceURL: string (nullable = true)
| |-- assembly: string (nullable = true)
| |-- species: string (nullable = true)
|-- start: long (nullable = true)
|-- oldPosition: long (nullable = true)
|-- end: long (nullable = true)
|-- mapq: integer (nullable = true)
|-- readName: string (nullable = true)
|-- sequence: string (nullable = true)
|-- qual: string (nullable = true)
|-- cigar: string (nullable = true)
|-- oldCigar: string (nullable = true)
|-- basesTrimmedFromStart: integer (nullable = true)
|-- basesTrimmedFromEnd: integer (nullable = true)
|-- readPaired: boolean (nullable = true)
|-- properPair: boolean (nullable = true)
|-- readMapped: boolean (nullable = true)
|-- mateMapped: boolean (nullable = true)
|-- firstOfPair: boolean (nullable = true)
|-- secondOfPair: boolean (nullable = true)
|-- failedVendorQualityChecks: boolean (nullable = true)
|-- duplicateRead: boolean (nullable = true)
|-- readNegativeStrand: boolean (nullable = true)
|-- mateNegativeStrand: boolean (nullable = true)
|-- primaryAlignment: boolean (nullable = true)
|-- secondaryAlignment: boolean (nullable = true)
|-- supplementaryAlignment: boolean (nullable = true)
|-- mismatchingPositions: string (nullable = true)
|-- origQual: string (nullable = true)
|-- attributes: string (nullable = true)
|-- recordGroupName: string (nullable = true)
|-- recordGroupSequencingCenter: string (nullable = true)
|-- recordGroupDescription: string (nullable = true)
|-- recordGroupRunDateEpoch: long (nullable = true)
|-- recordGroupFlowOrder: string (nullable = true)
|-- recordGroupKeySequence: string (nullable = true)
|-- recordGroupLibrary: string (nullable = true)
|-- recordGroupPredictedMedianInsertSize: integer (nullable = true)
|-- recordGroupPlatform: string (nullable = true)
|-- recordGroupPlatformUnit: string (nullable = true)
|-- recordGroupSample: string (nullable = true)
|-- mateAlignmentStart: long (nullable = true)
|-- mateAlignmentEnd: long (nullable = true)
|-- mateContig: struct (nullable = true)
| |-- contigName: string (nullable = true)
| |-- contigLength: long (nullable = true)
| |-- contigMD5: string (nullable = true)
| |-- referenceURL: string (nullable = true)
| |-- assembly: string (nullable = true)
| |-- species: string (nullable = true)
First, register the RDD dataset as SparkSQL table:
scala> adamFile.registerAsTable("adamFile")
scala> val countResult = sqlContext.sql("SELECT COUNT(*) FROM adamFile").collect();
countResult: Array[org.apache.spark.sql.Row] = Array([308846])
scala> countResult.head.getLong(0)
res55: Long = 308846
scala> val row = sqlContext.sql("SELECT * FROM adamFile LIMIT 1").collect()
row: Array[org.apache.spark.sql.Row] = Array([null,null,null,null,null,SRR062634.1810878,GACCTTGGGGCAATTGAGTAGAGGCTGCACATAACATCCATCGCTCAGTCAGGAGCCCAGAGCATAGGCTCGGGAAGGTCTCCCAGCTTGGAGGTGCTCA,GFGGGGGGGGGGGGGEGGFGGGGGGGGFGGGGFGGGGEFGGGGGGGGFEFEGGFGGGGEEGEGBEEEG=EGDED=C?BACACEC=?5???@:??(<868,,*,null,0,0,false,false,false,false,false,false,false,false,false,false,false,false,false,null,null,PU:Z:pu SM:Z:sm NM:i:-1 PL:Z:Illumina RG:Z:FASTQ PG:Z:SNAP LB:Z:lb,FASTQ,null,null,null,null,null,lb,null,Illumina,pu,sm,null,null,null])
scala> // Read the 6th field
scala> row.getString(5)
<console>:17: error: value getString is not a member of Array[org.apache.spark.sql.Row]
row.getString(5)
^
scala> // The row variable is in fact an array.
scala> // You can access the first item of the row via row(0)
scala> row(0).getString(5)
res54: String = SRR062634.1810878
The first command creates an RDD for our query and stores it in the firstTenRDD variable. Note that the actual data is not read until the second command.
The second command tells Spark to run our SQL query, collect the results, and then run the function println on each of them.
scala> val firstTenRDD = sqlContext.sql("SELECT readName, sequence, qual, attributes, recordGroupName FROM adamFile LIMIT 10")
firstTenRDD: org.apache.spark.sql.SchemaRDD =
SchemaRDD[72] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
Limit 10
ParquetTableScan [readName#5,sequence#6,qual#7,attributes#27,recordGroupName#28], (ParquetRelation hdfs://hdfs-namenode:9000/SRR062634b.adam, Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml), org.apache.spark.sql.SQLContext@21e0683e, []), []
scala> firstTenRDD.collect().foreach(println)
[SRR062634.1810878,GACCTTGGGGCAATTGAGTAGAGGCTGCACATAACATCCATCGCTCAGTCAGGAGCCCAGAGCATAGGCTCGGGAAGGTCTCCCAGCTTGGAGGTGCTCA,GFGGGGGGGGGGGGGEGGFGGGGGGGGFGGGGFGGGGEFGGGGGGGGFEFEGGFGGGGEEGEGBEEEG=EGDED=C?BACACEC=?5???@:??(<868,,PU:Z:pu SM:Z:sm NM:i:-1 PL:Z:Illumina RG:Z:FASTQ PG:Z:SNAP LB:Z:lb,FASTQ]
[SRR062634.1810879,GTGGCTCACGCCTGTATTCCCAGCACTTCGGGAGGCCAAGGTGGGCGGATCATAAGGTGAAGAGATCAAGACCATCCTGGCCAACATGGTGAAACCCCAT,CACABBCBCBE@EEEDCCDDEEDEBECCEEEGGEEEEDGGGEFEGGGGGGGGGGGGGGGGGGGGGFGGGGGGGGGGGGGGGGEGGGGGGGGGGGEDFGGG,PU:Z:pu SM:Z:sm NM:i:1 PL:Z:Illumina RG:Z:FASTQ PG:Z:SNAP LB:Z:lb,FASTQ]
[SRR062634.1810880,TGATAATGTGTTTTATAACTTTTTTTTTTTTTTGCGTTCTAGTATGGGATCCAGCCTAGGATTAGGTTTAGTTGTCATGATTCTAGTATCCTTTAACCTG,FFEFFEEEAEGGGGEGGGFGGGGGGGGGGGGGDG=EEC=C@DA==CCC:?A?@BBCC5EC-BB-C6<?70:>?5<:5;>3A?,>+=:>>>3>??>;=6:>,PU:Z:pu SM:Z:sm NM:i:-1 PL:Z:Illumina RG:Z:FASTQ PG:Z:SNAP LB:Z:lb,FASTQ]
[SRR062634.1810881,TGTAAAATATATAATATATAACATATGTGACATGTAATTAATAACATAGTCATATAATATATATTTAATATAGGTATTATACATACAACGTATTATATAT,GEGGGGGGGGGGGGGGGGGGGGGGDGGGGGGGGGGFGGGGGGGGGGGGGEFGGGGGGGGGGGFGGGGGGGFGGGEGGFGGDFDGEGFGEGEEGEGFGDGF,PU:Z:pu SM:Z:sm NM:i:-1 PL:Z:Illumina RG:Z:FASTQ PG:Z:SNAP LB:Z:lb,FASTQ]
[SRR062634.1810882,GGTCTAGATTTGATTCTTAACTTTATTATTGGCTAGTTGTGTAAACCTGGGTAAGTTATGTAACCTTCTGGAGCTTATTAAATTTTTATTAAGCCATTAT,GGGGGGGGGGGGGGGGGGFGGGGGGGGGGGGGGEGEEGDBE??EEEFEFFC-@CAACCCBDEFFDDGDDDE=EEDDDFFAEDBFFFF9?C6B=?BABE:D,PU:Z:pu SM:Z:sm NM:i:-1 PL:Z:Illumina RG:Z:FASTQ PG:Z:SNAP LB:Z:lb,FASTQ]
[SRR062634.1810883,TGAAACAGAGGAGAGACCTCTGCCTGCTGTGTAGTGTTTTTTGAGGAGGTAGCAGAATGACTGTAAGGCTAGCTTAGTACATCTACACAGCCTTAAACTT,GGGGGGGGEGGGGGGGGGGGGGFGGGGGGFGEGGFGEGGGGGCDFEBEEAGGDEGGDGGEEGFEEDEF?EEFF@GEEEEEFEFEEFDFEE?CDCBCACCC,PU:Z:pu SM:Z:sm NM:i:-1 PL:Z:Illumina RG:Z:FASTQ PG:Z:SNAP LB:Z:lb,FASTQ]
[SRR062634.1810884,AAACATGAAGAAATGAGTGAATTTTAGGGAAATTTGCAAGTCGGCTGCAATCATGAACATTGTATCAAGCCAAGTAATGAAAGCCACTCCTGCCTGGGTG,GGGGEGGGGGFGGGGGGGGGGGGGGGGGGFGGGGGGGGFGGGGGDGFGFGDFGGFFFFGGGFEFGFGGFDECDGCDEGGGEGGEEEEFEEEEGFEEFEBE,PU:Z:pu SM:Z:sm NM:i:-1 PL:Z:Illumina RG:Z:FASTQ PG:Z:SNAP LB:Z:lb,FASTQ]
[SRR062634.1810885,GTTTGAACTCCCAATTATTGTAACTCCAAGCTATAATATTAAAAATAAACTGACCTTTAAGAACAAATTTTTGAACTAATGACCTATAGCAAAAGTCATG,GFGFGGGGGGGFGGGGGGFGGEGGGGGGFGFGGGGFGGGGGFGGGGGEGEGGEGEGGGFGGEFGDGEGGGGGFDDGDEDEGFFEGEGGGDDFEEBADBED,PU:Z:pu SM:Z:sm NM:i:-1 PL:Z:Illumina RG:Z:FASTQ PG:Z:SNAP LB:Z:lb,FASTQ]
[SRR062634.1810886,CATCATACTCCTTGATCATACTATAGTAGAATTACTCCTATGTCCACTCCTCCTTCCCACCACTCATGAACTCCAAAACTTAAGGCACTGTGCCTTCTTC,FGGGGGGGFGGGGDDGGFGGGGGGGGGGGGGGGGFGDGGGGGGEGFGGGFGGGGGGGGGDGGGGGGGEEEGGGFEGEFDEGGEEEGEDGGBGEEFEGGFG,PU:Z:pu SM:Z:sm NM:i:-1 PL:Z:Illumina RG:Z:FASTQ PG:Z:SNAP LB:Z:lb,FASTQ]
[SRR062634.1810887,GAAGACAGACAAAATATATAGTTTTTAAGAAATTGATCTTTTGTTTCAAATGTAGGAAGGTCAGTTGTGGAATTTAGGTAAGGCAGCCCGTAGAGCATTT,GFGEEGGEFGGFGGFGGGGGEEFEDGGGEGGEEGEEEGEEEGGBEEBGGGEGFFGFGGGGGGGGGGGGGGFGGGGGGGGGGGGGGGGGGGGGGGGGGGGG,PU:Z:pu SM:Z:sm NM:i:9 PL:Z:Illumina RG:Z:FASTQ PG:Z:SNAP LB:Z:lb,FASTQ]
- Spark Quick Start: first steps with Spark.
- Spark Programming Guide: for understanding how Spark works and how to use the RDD model.
- Spark Scala Doc: useful to see the methods you can call from the spark shell (see methods for Row, ParquetTableScan, and SQLContext).
- SparkSQL Programming Guide