R and Hadoop Integrated Processing Environment

PowerPoint PPT Presentation
R and Hadoop Integrated Processing Environment. Using RHIPE for Data Management. R and Large Data: the .Rdata format is poor for large or numerous objects; attach loads all variables into memory; there is no metadata. Interfaces exist to large data formats: HDF5, NetCDF.

Presentation Transcript

Slide 1

R and Hadoop Integrated Processing Environment (RHIPE): Using RHIPE for Data Management

Slide 2

R and Large Data

The .Rdata format is poor for large or numerous objects. Using attach loads all variables into memory. There is no metadata. Interfaces exist to large data formats such as HDF5 and NetCDF. To compute with large data we need well-designed storage formats.

Slide 3

R and HPC

Plenty of choices. On a single computer: snow, Rmpi, multicore. Across a cluster: snow, Rmpi, rsge. Data must be in memory; these packages distribute computation across nodes. They need a separate infrastructure for load balancing and recovery, and the computation is not aware of the location of the data.

Slide 4

Computing With Data

Scenario: data can be divided into subsets. Compute across subsets, produce side effects (displays) for subsets, and combine the results. It is not enough to store files across a distributed file system (NFS, LustreFS, GFS, etc.): the compute environment must consider the cost of network access.
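
A minimal local sketch of this divide, compute, combine pattern in base R. It assumes a small in-memory data frame stands in for the distributed data; the column names dest and delay and all values are made up for illustration.

```r
# Toy data standing in for a large, partitionable data set (hypothetical values)
flights <- data.frame(
  dest  = c("SEA", "SEA", "ABE", "ABE", "ABE"),
  delay = c(10, 30, 5, 15, 40),
  stringsAsFactors = FALSE
)

# Divide: one subset per destination (split() sorts the groups by name)
subsets <- split(flights, flights$dest)

# Compute: a per-subset summary (a display could be produced here instead)
per_subset <- lapply(names(subsets), function(d) {
  s <- subsets[[d]]
  data.frame(dest = d, n = nrow(s), mean_delay = mean(s$delay),
             stringsAsFactors = FALSE)
})

# Combine: stack the per-subset results into one table
combined <- do.call(rbind, per_subset)
print(combined)
```

On a cluster, each subset would live on different machines. That is why the location of the data matters to the compute step.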

Slide 5

Using Hadoop DFS to Store

An open source implementation of the Google FS: a distributed file system across computers. Files are divided into blocks, replicated and stored across the cluster. Clients need not be aware of the striping. It targets write once, read many: high-throughput reads.

Slide 6

[Figure: HDFS architecture. A client's file is split into blocks (Block 1, Block 2, Block 3); the Namenode tracks the blocks, which are replicated across Datanode 1, Datanode 2 and Datanode 3.]
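
The block arithmetic can be sketched as follows. The sketch makes two assumptions: a 128 MB block size (the example figure used on a later slide) and a replication factor of 3 (the usual HDFS default). The helper hdfs_blocks is hypothetical. Real replica placement is decided by the Namenode, not computed like this.

```r
# Number of HDFS blocks a file occupies, and total stored block copies.
# block_mb = 128 and replication = 3 are assumed defaults for illustration.
hdfs_blocks <- function(file_mb, block_mb = 128, replication = 3) {
  n_blocks <- ceiling(file_mb / block_mb)   # the last block may be partly filled
  list(blocks = n_blocks, stored_copies = n_blocks * replication)
}

# Example: roughly 12 GB of airline data, expressed in MB
airline <- hdfs_blocks(12 * 1024)
print(airline)
```

Each of those blocks becomes one map task on a later slide, so the block size also sets the unit of parallel work.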

Slide 7

MapReduce

One approach to programming with large data. A powerful tapply: tapply(x, fac, g) applies g to the elements of x corresponding to each unique level of fac. MapReduce can do much more, and works on gigabytes of data and across computers.
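
A small tapply example on a plain vector, with values chosen arbitrarily. Here g is sum and fac has two levels, so one sum is returned per level.

```r
# tapply(x, fac, g): apply g to the elements of x belonging to each level of fac
x   <- c(3, 1, 4, 1, 5, 9, 2, 6)
fac <- c("a", "b", "a", "b", "a", "b", "a", "b")

totals <- tapply(x, fac, sum)   # named 1-d array: one total per level
print(totals)
```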

Slide 8

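MapReduce in R

If R could, it would do:

Map:
    imd    <- lapply(input, function(j) list(key = K1(j), value = V1(j)))
    keys   <- unlist(lapply(imd, "[[", 1))
    values <- lapply(imd, "[[", 2)
Reduce:
    tapply(values, keys, function(v) list(key = K2(v), value = V2(v)))

Here K1, V1, K2 and V2 are placeholders for user-supplied functions; tapply passes each key's group of values to the reduce function.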
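
The same idea can be run locally in base R. This sketch imitates map, shuffle/sort and reduce in a single process, using word counting as a stand-in task. It does not touch Hadoop or RHIPE; split() plays the role of the shuffle.

```r
# A local, single-process imitation of map -> shuffle/sort -> reduce.
input <- c("the cat sat", "the dog sat", "the end")

# Map: each record becomes a list of key/value pairs (word, 1)
imd <- unlist(lapply(input, function(line) {
  lapply(strsplit(line, " ")[[1]], function(w) list(key = w, value = 1))
}), recursive = FALSE)

keys   <- vapply(imd, function(kv) kv$key, character(1))
values <- lapply(imd, function(kv) kv$value)

# Shuffle/sort: group the values by key (split() sorts the keys)
grouped <- split(values, keys)

# Reduce: one call per distinct key
result <- lapply(names(grouped), function(k) {
  list(key = k, value = sum(unlist(grouped[[k]])))
})

counts <- setNames(vapply(result, function(kv) kv$value, numeric(1)),
                   vapply(result, function(kv) kv$key, character(1)))
print(counts)
```

In Hadoop the map calls, the shuffle and the reduce calls run on different machines. The key/value contract between the steps is the same.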

Slide 9

[Figure: MapReduce data flow. Map: the file is divided into records of (K, V) pairs, and for each record a (key, value) pair is returned. The pairs are sorted and shuffled. Reduce: for each key, reduce its (K, V) pairs, then write (K, V) to disk.]

Slide 10

R and Hadoop

Manipulate large data sets using MapReduce in the R language. Though not native Java, it is still relatively fast. It can write and save a variety of R objects: atomic vectors, lists and attributes … data frames, factors, etc.

Slide 11

Everything is a key-value pair, and keys need not be unique.

Map, per block: run the user's setup R expression, then for each key-value pair in the block run the user's R map expression. Each block is a task, and tasks are run in parallel (the number is configurable).

Reducer: run the user's setup R expression, then for each key: while a new value exists, get the new value and do something with it. Each reducer iterates through its keys, and reducers run in parallel.
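
A local imitation of the reducer loop described above, where a key's values may arrive in several batches. This is a sketch, not RHIPE's implementation: run_reducer and batch_size are hypothetical names. The pre, reduce and post stages mirror the reduce expression used on later slides.

```r
# For one key: 'pre' runs once, 'reduce' runs once per batch of incoming
# values, and 'post' returns the emitted key/value pair.
run_reducer <- function(key, values, batch_size = 2) {
  summ <- 0                                          # pre
  batch_id <- ceiling(seq_along(values) / batch_size)
  for (batch in split(values, batch_id)) {
    summ <- sum(summ, unlist(batch))                 # reduce, once per batch
  }
  list(key = key, value = summ)                      # post: the emitted pair
}

# A reducer iterates through its keys; here two keys, one after another
shuffled <- list(SEA = list(4, 1, 3, 2, 5), ABE = list(7, 9))
out <- lapply(names(shuffled), function(k) run_reducer(k, shuffled[[k]]))
str(out)
```

Because the running total is updated batch by batch, a key's full list of values never has to be in memory at once.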

Slide 12

Airline Data

Flight information for every flight over many years: ~12 GB of data, 120MN rows, e.g.
1987,10,29,4,1644,1558,1833,1750,PS,1892,NA,109,112,NA,43,46,SEA,..

Slide 13

Save Airline as R Data Frames 1

Some setup code, run once for every block of e.g. 128MB (the Hadoop block size):

    setup <- expression({
        # split "HHMM" strings into (hour, minute); other lengths become 0:00
        convertHHMM <- function(s) {
            t(sapply(s, function(r) {
                l <- nchar(r)
                if (l == 4) c(substr(r, 1, 2), substr(r, 3, 4))
                else if (l == 3) c(substr(r, 1, 1), substr(r, 2, 3))
                else c('0', '0')
            }))
        }
    })
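
An arithmetic alternative to convertHHMM, offered as a sketch rather than the code used here. It reads HHMM as an integer and takes hours = value %/% 100 and minutes = value %% 100. It is vectorized, but it behaves differently on odd input. One- or two-digit values are read as minutes past midnight instead of 0:00, and missing or non-numeric values become NA instead of "0", "0". hhmm_to_hm is a hypothetical name.

```r
# Vectorized HHMM split: integer division for hours, remainder for minutes.
hhmm_to_hm <- function(s) {
  v <- suppressWarnings(as.integer(s))   # "NA" or junk -> NA
  cbind(hour = v %/% 100L, min = v %% 100L)
}

hm <- hhmm_to_hm(c("1644", "558", "NA", "5"))
print(hm)
```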

Slide 14

Save Airline as R Data Frames 2

Read lines and store N rows as data frames:

    map <- expression({
        # the header line returns NULL and is dropped by rbind; other lines are split on commas
        y <- do.call("rbind", lapply(map.values, function(r) {
            if (substr(r, 1, 4) != 'Year') strsplit(r, ",")[[1]]
        }))
        mu <- rep(1, nrow(y))
        yr <- y[, 1]; mn <- y[, 2]; dy <- y[, 3]
        hr <- convertHHMM(y[, 5])
        depart <- ISOdatetime(year = yr, month = mn, day = dy,
                              hour = hr[, 1], min = hr[, 2], sec = mu)
        ....
        ....
Cont'd

Slide 15

Save Airline as R Data Frames 2

Read lines and store N rows as data frames:

    map <- expression({
        .... From previous page ....
        d <- data.frame(depart = depart, sdepart = sdepart,
                        arrive = arrive, sarrive = sarrive,
                        carrier = y[, 9], origin = y[, 17],
                        dest = y[, 18], dist = y[, 19],
                        cancelled = y[, 22], stringsAsFactors = FALSE)
        rhcollect(map.keys[[1]], d)
    })

The key is irrelevant for us.
Cont'd

Slide 16

Save Airline as R Data Frames 3

Run:

    z <- rhmr(map = map, setup = setup, inout = c("text", "sequence"),
              ifolder = "/air/", ofolder = "/airline")
    rhex(z)

Slide 17

Quantile Plot of Delay

120MN delay times; display 1K quantiles. For discrete data it is quite possible to calculate exact quantiles: build a frequency table of the distinct delay values, sort on the delay value and read off the quantiles.
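
A sketch of the exact-quantile step in base R. It assumes the frequency table has already been collected as parallel vectors of distinct values and counts. It uses the inverse-CDF definition (type 1 in R's quantile()), returning the smallest value whose cumulative share reaches p. freq_quantile is a hypothetical helper and the numbers are made up.

```r
# Exact quantiles from a frequency table, without expanding it back
# into one element per observation.
freq_quantile <- function(value, freq, probs) {
  o <- order(value)
  value <- value[o]
  cum <- cumsum(freq[o]) / sum(freq)
  # smallest value whose cumulative proportion reaches p
  vapply(probs, function(p) value[which(cum >= p)[1]], numeric(1))
}

delay <- c(0, 5, 10, 60)
freq  <- c(50, 30, 15, 5)
q <- freq_quantile(delay, freq, c(0.25, 0.5, 0.9, 1))
print(q)
```

Only the table of distinct values has to fit in memory, not the 120MN individual delays.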

Slide 18

Quantile Plot of Delay

    map <- expression({
        r <- do.call("rbind", map.values)
        # delay in seconds: actual minus scheduled arrival time
        delay <- as.vector(r[, 'arrive']) - as.vector(r[, 'sarrive'])
        delay <- delay[delay >= 0]
        unq <- table(delay)
        # emit (delay value, count) for each distinct delay in this block
        for (n in names(unq)) rhcollect(as.numeric(n), unq[n])
    })
    reduce <- expression(
        pre    = { summ <- 0 },
        reduce = { summ <- sum(summ, unlist(reduce.values)) },
        post   = { rhcollect(reduce.key, summ) }
    )
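
A local illustration of why combiner = TRUE is safe for this job. Summing counts is associative, so adding up per-block frequency tables gives the same result as one table over all the delays. The delay values and the split into three blocks are made up.

```r
# Per-block tables, then a sum per key, compared with one global table.
delays <- c(0, 5, 5, 10, 0, 60, 5, 0)
blocks <- list(delays[1:3], delays[4:6], delays[7:8])

# "Map": one frequency table per block, flattened to (key, count) rows
pairs <- do.call(rbind, lapply(blocks, function(b) {
  unq <- table(b)
  data.frame(key = as.numeric(names(unq)), count = as.vector(unq))
}))

# "Reduce" (or combiner): sum the counts for each key
combined <- tapply(pairs$count, pairs$key, sum)

direct <- table(delays)
stopifnot(all(names(combined) == names(direct)),
          all(as.vector(combined) == as.vector(direct)))
print(combined)
```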

Slide 19

Quantile Plot of Delay

Run:

    z <- rhmr(map = map, reduce = reduce, ifolder = "/airline/", ofolder = '/tmp/f',
              inout = c('sequence', 'sequence'), combiner = TRUE,
              mapred = list(rhipe_map_buff_size = 5))
    rhex(z)

Read in the results and save them as a data frame:

    res <- rhread("/tmp/f", doloc = FALSE)
    tb <- data.frame(delay = unlist(lapply(res, "[[", 1)),
                     freq  = unlist(lapply(res, "[[", 2)))

Slide 21

Conditioning

We can create the panels, but need to stitch them together. A small change to the map:

    map <- expression({
        r <- do.call("rbind", map.values)
        r$delay <- as.vector(r[, 'arrive']) - as.vector(r[, 'sarrive'])
        r <- r[r$delay >= 0, , drop = FALSE]
        r$cond <- r[, 'dest']
        mu <- split(r$delay, r$cond)
        # key is (destination, delay value); value is its count in this block
        for (dst in names(mu)) {
            unq <- table(mu[[dst]])
            for (n in names(unq)) rhcollect(list(dst, as.numeric(n)), unq[n])
        }
    })

Slide 22

Conditioning

After reading in the data (a list of records), each record looks like

    list(list("ABE", 7980), 15)

We can get a table, ready for display:

      dest delay freq
    1  ABE  7980   15
    2  ABE 61800    4
    3  ABE 35280    5
    4  ABE 56160    1
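
A sketch of that conversion for the four records shown. It assumes each element of the rhread result has the form list(list(dest, delay), freq), as above. Real counts may arrive as one-element tables rather than plain numbers; vapply still accepts those because they are integer vectors of length 1.

```r
# Records of the form list(list(dest, delay), freq), as on the slide
res <- list(
  list(list("ABE", 7980), 15),
  list(list("ABE", 61800), 4),
  list(list("ABE", 35280), 5),
  list(list("ABE", 56160), 1)
)

# Pull each field out with vapply so types are checked
tb <- data.frame(
  dest  = vapply(res, function(r) r[[1]][[1]], character(1)),
  delay = vapply(res, function(r) r[[1]][[2]], numeric(1)),
  freq  = vapply(res, function(r) r[[2]], numeric(1)),
  stringsAsFactors = FALSE
)
print(tb)
```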

Slide 23

Running a FF Design

We have an algorithm to detect keystrokes in an SSH TCP/IP stream. It accepts 8 tuning parameters: what are the optimal values? Each parameter has 3 levels, so we construct a 3^(8-3) fractional factorial (FF) design which spans the design space: 243 trials, each trial an application of the algorithm to 1817 connections (for a given set of parameters).
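
One way such a design can be built, as a sketch. It takes a full factorial in five base parameters (3^5 = 243 runs) and defines the remaining three as sums of base columns modulo 3. The generators chosen here are illustrative; the slides do not say which ones were used.

```r
# 3^(8-3) regular fractional factorial: full 3^5 in p1..p5, plus three
# added parameters defined as sums of base columns mod 3 (illustrative generators).
design <- expand.grid(p1 = 0:2, p2 = 0:2, p3 = 0:2, p4 = 0:2, p5 = 0:2)
design$p6 <- (design$p1 + design$p2 + design$p3) %% 3
design$p7 <- (design$p1 + design$p4 + design$p5) %% 3
design$p8 <- (design$p2 + design$p4) %% 3

nrow(design)            # 243 runs, versus 3^8 = 6561 for the full factorial
table(design$p8)        # each level of an added parameter appears 81 times
```

Each row is one trial. The levels 0, 1 and 2 would be mapped to each parameter's three settings before running the algorithm.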

Slide 24

Running a FF Design

1809 connections in 94MB: 439,587 algorithm applications (1809 × 243). Approaches: run each connection 243 times (1809 in parallel)? Slow, because running time is heavily skewed. Better: chunk the 439,587 applications.

Slide 25

Chunk == 1, send data to reducers

    m2 <- expression({
        lapply(seq_along(map.keys), function(r) {
            key <- map.keys[[r]]
            value <- map.values[[r]]
            # one output pair per (connection, parameter set)
            apply(para3.r, 1, function(j) {
                rhcollect(list(k = key, p = j), value)
            })
        })
    })

map.values is a list of connection data, map.keys are the connection identifiers, and para3.r holds the 243 parameter sets (one per row).
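
To see what this map emits, here is a local imitation with made-up data: two connections and three parameter sets instead of 1809 and 243. rhcollect is replaced by a hypothetical stand-in that just records each pair. Every (connection, parameter set) combination becomes its own key, and each connection's data is copied once per parameter set. That is why this layout multiplies the volume of intermediate data.

```r
# Made-up inputs: connection ids, connection data, and parameter sets (rows)
map.keys   <- list("conn1", "conn2")
map.values <- list(data.frame(t = 1:3), data.frame(t = 1:5))
para3.r    <- matrix(c(1, 2, 3, 10, 20, 30), nrow = 3,
                     dimnames = list(NULL, c("alpha", "beta")))

# Hypothetical stand-in for RHIPE's rhcollect: record each emitted pair
emitted <- list()
rhcollect <- function(key, value) {
  emitted[[length(emitted) + 1]] <<- list(key = key, value = value)
  invisible(NULL)
}

# The map body from the slide, unchanged apart from running locally
invisible(lapply(seq_along(map.keys), function(r) {
  key   <- map.keys[[r]]
  value <- map.values[[r]]
  apply(para3.r, 1, function(j) rhcollect(list(k = key, p = j), value))
}))

length(emitted)   # connections x parameter sets
```

With the real inputs this gives 1809 × 243 = 439,587 pairs, one per algorithm application.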

Slide 26

Reduce: apply algorithm

    r2 <- expression(
        reduce = {
            value <- reduce.values[[1]]
            params <- as.list(reduce.key$p)
            # time one application of the detector for this (connection, parameters) key
            tt <- system.time(v <- ks.detect(value, debug = FALSE, params = params,
                                             dorules = FALSE))
            rhcounter('param', '_all_', 1)
            rhcollect(unlist(params),
                      list(hash = reduce.key$k, numks = v$numks, time = tt))
        })

rhcounter updates "counters" that are visible on the Jobtracker website and returned to R as a list.

Slide 27

FF Design … cont'd

Sequential running time: 80 days. Across 72 cores: ~32 hrs. Across 320 cores (EC2 cluster, 80 c1.medium instances): 6.5 hrs ($100). A smarter block size would improve performance.
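
A quick check of the speedups quoted above. Dividing the 80-day sequential time by the core count gives the ideal wall-clock time, and ideal / observed gives a rough parallel efficiency. This treats the quoted times as exact, which the "~" on the 72-core figure says they are not.

```r
# Ideal (perfectly parallel) hours versus the reported hours
seq_hours  <- 80 * 24            # 80 days sequential, in hours
cores      <- c(72, 320)
observed   <- c(32, 6.5)         # reported hours (the 72-core figure is approximate)
ideal      <- seq_hours / cores
efficiency <- ideal / observed

print(data.frame(cores, ideal_hours = round(ideal, 1), observed,
                 efficiency = round(efficiency, 2)))
```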

Slide 28

FF Design … cont'd

Catch: the map turns 95MB into 3.5GB! (37X). Solution: use the Fair Scheduler and submit (rhex) 243 separate MapReduce jobs, each of them map-only. Upon completion, one more MapReduce combines the results. This will use all cores and save on data transfer. Problem: RHIPE can launch MapReduce jobs asynchronously, but cannot wait on their completion.

Slide 29

Large Data

Now we have 1.2MN connections across 140GB of data, stored as ~1.4MN R data frames; each connection is stored as multiple data frames of 10K packets. Apply the algorithm to every connection:

    m2 <- expression({
        # parameters arrive serialized in an environment variable
        params <- unserialize(charToRaw(Sys.getenv("myparams")))
        lapply(seq_along(map.keys), function(r) {
            key <- map.keys[[r]]
            value <- map.values[[r]]
            v <- ks.detect(value, debug = FALSE, params = params, dorules = FALSE)
            …
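
How the parameters might travel through the myparams variable, as a sketch. Serialize the R object to ASCII, store the text in the environment, and rebuild it with the same unserialize(charToRaw(...)) call used above. The parameter names are made up, and how the variable is set on the task nodes is not shown on the slide.

```r
# Hypothetical parameter set
params <- list(window = 10L, threshold = 0.5, method = "fast")

# Encode: ASCII serialization yields a raw vector with no embedded nuls
encoded <- rawToChar(serialize(params, connection = NULL, ascii = TRUE))
Sys.setenv(myparams = encoded)

# Decode inside the task, exactly as in the map expression above
decoded <- unserialize(charToRaw(Sys.getenv("myparams")))
print(identical(decoded, params))
```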

Slide 30

Large Data

We can't apply the algorithm to huge connections: they take forever to load into memory. For each of the 1.2MN connections, save the first (by time) 1500 packets. Use a combiner: this runs the reduce code on the map machine, saving on network transfer and on the data needed in memory.

Slide 31

Large Data

Map:

    lapply(seq_along(map.values), function(r) {
        v <- map.values[[r]]
        k <- map.keys[[r]]
        # the 1500 earliest packets in this chunk
        first1500 <- v[order(v$timeOfPacket)[1:min(nrow(v), 1500)], ]
        rhcollect(k[1], first1500)
    })

Reduce (also used as the combiner):

    r <- expression(
        pre = { first1500 <- NULL },
        reduce = {
            first1500 <- rbind(first1500, do.call(rbind, reduce.values))
            first1500 <- first1500[order(first1500$timeOfPacket)[1:min(nrow(first1500), 1500)], ]
        },
        post = { rhcollect(reduce.key, first1500) }
    )

This works in stages for the same reason that min(x, y, z) = min(x, min(y, z)).
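
A local check of why this reduce can run as a combiner. Taking the earliest k rows per chunk, and then the earliest k of those, gives the same rows as taking the earliest k overall. The data are random, k is 5 instead of 1500, and first_k is a hypothetical helper using head(order(...), k).

```r
set.seed(1)
k <- 5
# Distinct packet times (sampling without replacement avoids ties)
packets <- data.frame(timeOfPacket = sample(1:1000, 40), size = 1:40)

# Earliest k rows by time
first_k <- function(v, k) v[head(order(v$timeOfPacket), k), , drop = FALSE]

# Combiner stage: first k within each of 4 chunks, then reduce the partials
chunks  <- split(packets, rep(1:4, each = 10))
partial <- do.call(rbind, lapply(chunks, first_k, k = k))
staged  <- first_k(partial, k)

# Single-stage answer for comparison
direct <- first_k(packets, k)
print(identical(staged$timeOfPacket, direct$timeOfPacket))
```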

Slide 32

Large Data

Using tcpdump, Python, R and RHIPE to collect network data. Data collection in moving 5-day windows (tcpdump). Convert pcap files to text and store them on HDFS (Python/C). Convert to R data frames (RHIPE). Summarize and store first