architecture has the power to inspire confidence
“As a student in the 1960s (Ito) followed Modernists like Kenzo Tange as they rebuilt the country’s cultural confidence after the devastation of World War II”
Nicolai Ouroussoff, from his article on Toyo Ito, “Inside His Exteriors”
genius!
“The concept of starling is to have a single server handle reliable, ordered message queues. When you put a cluster of these servers together, with no cross communication, and pick a server at random whenever you do a set or get, you end up with a reliable, loosely ordered message queue.
“In many situations, loose ordering is sufficient. Dropping the requirement on cross communication makes it horizontally scale to infinity and beyond: no multicast, no clustering, no “elections”, no coordination at all. No talking! Shhh”
Robey Pointer
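The starling idea can be sketched in a few lines of python. This is a toy model, not starling's actual code: the class names are hypothetical, and on a miss `get` probes the remaining servers in random order, which goes beyond what the quote says. Each server is strictly FIFO and the servers never talk to each other. Because every `set` lands on a random server, items come back in a "loosely ordered" way.

```python
import random
from collections import deque


class QueueServer:
    """One independent server holding strictly ordered (FIFO) queues."""

    def __init__(self):
        self.queues = {}

    def set(self, queue_name, item):
        self.queues.setdefault(queue_name, deque()).append(item)

    def get(self, queue_name):
        q = self.queues.get(queue_name)
        return q.popleft() if q else None


class LooseQueueCluster:
    """Servers share nothing; every set/get picks a server at random."""

    def __init__(self, num_servers, seed=None):
        self.servers = [QueueServer() for _ in range(num_servers)]
        self.rng = random.Random(seed)

    def set(self, queue_name, item):
        # No coordination: just drop the item on a random server.
        self.rng.choice(self.servers).set(queue_name, item)

    def get(self, queue_name):
        # Assumption: on a miss, try the other servers in random order,
        # so an empty server means "look elsewhere", not "queue empty".
        for server in self.rng.sample(self.servers, len(self.servers)):
            item = server.get(queue_name)
            if item is not None:
                return item
        return None
```

With one server this is an ordinary FIFO queue. With many servers, ordering holds per server but not globally, and adding servers adds capacity without any cross-server traffic.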
hadoop-scale
“Hundreds of gigabytes of data constitute the low end of Hadoop-scale. Actually Hadoop is built to process “web-scale” data on the order of hundreds of gigabytes to terabytes or petabytes. At this scale, it is likely that the input data set will not even fit on a single computer’s hard drive, much less in memory. So Hadoop includes a distributed file system which breaks up input data and sends fractions of the original data to several machines in your cluster to hold.”
notes from cloudera basic training > installing the cloudera hadoop distro locally and on ec2
ref: http://www.cloudera.com/hadoop
installing cloudera distro on a cluster
- motivation: hadoop is complicated to install
- cloudera uses the alternatives system to switch between configurations (a/b testing)
- cloudera has created a “configurator” that will generate an rpm customized to your cluster
– generates the configuration files and a custom installer, which can be used together or separately
- alternatively, you can install an unconfigured distro
- for large scale deployment, use puppet, bcfg2, cfengine, etc. to manage the cluster
– cloudera’s tool can still be used to generate config scripts
- storing data in ebs takes advantage of locality and is much faster than s3
– ebs is more performant than normal hard drives
notes from cloudera basic training > hadoop mapreduce deep dive
//we moved quickly through this, so the notes are sparse
- job
– a full program
- task
– by default, hadoop creates as many map tasks as there are input blocks
– task attempts
— tasks are attempted at least once
— multiple attempts in parallel are performed w/ speculative execution turned on
- tasktracker
– forks a jvm process for each task
- job distribution
– mapreduce programs = jar + xml config
– running a job puts jar and xml in hdfs
- data distribution
– data locality decreases when multiple tasks are running
- mapreduce flow
– client creates jobconf
— identify mapper and reducer classes
— specify inputs/outputs
— set optional settings
– client launches the job via jobclient
— runJob blocks until the job completes
— submitJob is non-blocking
– …
– tasktracker
— periodically queries jobtracker for work
– …
– write for cache coherency (re-use objects in loops(?))
— reusing memory locations => 2x speed-up
— all k/v pairs given by hadoop use this model
//is avro comparable to thrift?
- getting data to mapper
– data sets are specified
– input sets contain at least 1 record and are composed of full blocks
- file input format
– most people use SequenceFileInputFormat
– usually we store all our data in hdfs and then ignore what we don’t need, rather than spending time formatting the data when it’s input
– …
- shuffling
– what happens btwn map and reduce
- write the output
– OutputFormat is analogous to InputFormat
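The "one map task per input block" default noted above can be turned into a quick estimate. This is a simplified model under stated assumptions: the function name and the 64 MB default are hypothetical choices for illustration. A partial last block gets its own task, and an empty file is counted as one task. Real split computation also depends on split-size settings and other details that are ignored here.

```python
DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024  # assumed 64 MB block size


def count_map_tasks(file_sizes, block_size=DEFAULT_BLOCK_SIZE):
    """Estimate map tasks as one task per input block (simplified model)."""
    if block_size <= 0:
        raise ValueError("block_size must be positive")
    tasks = 0
    for size in file_sizes:
        # Integer ceiling of size / block_size; this sketch still counts
        # an empty file as a single task.
        blocks = (size + block_size - 1) // block_size
        tasks += max(1, blocks)
    return tasks
```

For example, a single 1 GB file with 64 MB blocks gives 1024 / 64 = 16 map tasks. Many small files each cost at least one task, which is one reason hadoop prefers a modest number of large files.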
cloudera basic training > the hadoop ecosystem
ref: http://www.cloudera.com/hadoop-training-ecosystem-tour
- google origins
– mapreduce -> hadoop mapreduce
– gfs -> hdfs
– sawzall -> hive, pig (log data warehouses)
– bigtable -> hbase
– chubby -> zookeeper (distributed lock service)
- pig
– “tables” are directories in hadoop
- hive
– uses a subset of sql instead of pig latin
– not good for serving realtime queries
– jdbc interface for hive exists
– pig and hive exercises on cloudera vm
– features for analyzing very large data sets
- hbase
– column-store database based on bigtable
– holds extremely large datasets
– still very young relative to hadoop
– uses hdfs
– fast single-element access
– only supports single-row transactions
– transactions block reads
– all data stored in memory. updates are written as logs to hdfs. limited because hadoop doesn’t have append (yet)
– each row is input to mapreduce
- zookeeper
– uses zab, a paxos-like consensus algorithm
– a distributed consensus engine
– zookeeper may be the method for creating a high-availability namenode
- fuse-dfs
– lets you mount hdfs via linux fuse
– not an alternative file server
– good for easy access to cluster
- hypertable
– competitor to hbase
– used by baidu (chinese search engine)
- kosmosfs
- sqoop
- chukwa
– hadoop log aggregation
- scribe
– general log aggregation
- mahout
– machine learning library
- cassandra
– column store database on a p2p backend
- dumbo
– python library for streaming
cloudera training > mapreduce and hdfs > hdfs
ref: http://www.cloudera.com/hadoop-training-mapreduce-hdfs
- redundant storage of massive amounts of data on unreliable computers
- advantages over existing file systems:
– handles much bigger data sets
– different workload and design priorities
- it’s conceptually comparable (very roughly) to zip file structure
- assumptions
– high component failure rate
– modest number (~1000) of huge (100mb) files
– files are write-once and then appended to
– large streaming reads, instead of seeks
— disks are really good at streaming, but bad at seeking
– high sustained throughput > low latency
- design decisions
– files stored as blocks
— block replication is asynch (this is why there are no updates)
– reliability through replication
– single master (namenode)
— a simple architecture, but also a single point of failure
– no data caching
– data nodes periodically heartbeat w/ the namenode
– creating a file flow: start transaction > define metadata > end transaction
– intermediate files are written to the mapper’s local disk, and then reducers fetch that data
- based on gfs architecture
– all data fetched over http
- metadata
– single namenode stores all metadata in memory
– two data structures on disk
— a snapshot of metadata
— a log of changes since snapshot
– the “secondary namenode”, which has a terrible name (should be something like “namenode helper”), updates the snapshot and informs namenode of new snapshot
– namenode snapshot should be written to an nfs-mounted location, so if the namenode fails, the snapshot will survive
— google has optimized the linux kernel for gfs, but cloudera just uses ext3(?), and others use xfs
– datanodes store opaque file contents in “block” objects on underlying local filesystem
- conclusions
– tolerates failure
– interface is customized for the job, but familiar to developers
– reliably stores terabytes and petabytes of data
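The snapshot + change-log scheme from the metadata notes can be modeled as a toy in python. All names here are hypothetical; in hadoop the two on-disk structures are the fsimage and edits files, but this sketch keeps everything in memory. The namenode logs each change before applying it. A checkpoint, which is the secondary namenode's job, replays the log onto the old snapshot to produce a new snapshot and then starts a fresh log.

```python
import copy


class NamenodeMetadata:
    """Toy model: live namespace in memory plus a snapshot and a change log."""

    def __init__(self):
        self.snapshot = {}   # last checkpointed namespace: path -> block ids
        self.log = []        # changes recorded since that snapshot
        self.namespace = {}  # live metadata, held entirely in memory

    def create_file(self, path, block_ids):
        # Log the change first, then apply it to the in-memory namespace.
        self.log.append(("create", path, list(block_ids)))
        self.namespace[path] = list(block_ids)

    def delete_file(self, path):
        self.log.append(("delete", path, None))
        self.namespace.pop(path, None)


def replay(snapshot, log):
    """Rebuild a namespace by applying logged changes to a copy of a snapshot."""
    state = copy.deepcopy(snapshot)
    for op, path, blocks in log:
        if op == "create":
            state[path] = list(blocks)
        elif op == "delete":
            state.pop(path, None)
    return state


def checkpoint(meta):
    """Merge the log into a new snapshot, then start an empty log."""
    meta.snapshot = replay(meta.snapshot, meta.log)
    meta.log = []
```

The point of the checkpoint is recovery time. After a crash, the namenode only has to replay the changes since the last snapshot, not the full history. It is also why keeping a copy of the snapshot on nfs matters.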
cloudera training > MapReduce and HDFS > map-reduce overview
ref: http://www.cloudera.com/sites/default/files/2-MapReduceAndHDFS.pdf
- borrows from functional programming: map, reduce
- provides an interface for map/reduce; we must implement the interface
- map
– the mapper can emit an arbitrary pair, not necessarily the input key/val
– the mapper runs simultaneously on multiple machines; the first to complete is used
– each map runs in its own jvm
– each run in parallel
– input is usually 64MB – 128MB chunks (results in more streaming)
- reduce
– the number of reduces that run corresponds to the number of output files
– ideally, we want 1 reduce
– run in parallel
- flow
– data store of k/v pairs > map > barrier (shuffle phase) > reduce > result
- chained map-reduce jobs are common
- all values are processed independently
- bottleneck: no reduce can run until all maps are finished
- combiner
– runs immediately after mapper on map node
– can use reducer function if reducer is commutative and associative
- conclusions
– mapreduce is a useful abstraction
– simplifies large scale computations
– lets the programmer focus on the problem and the library handle the details of distribution
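The flow above (map > shuffle barrier > reduce, with an optional combiner) can be illustrated by a single-process python sketch. Word count is an assumed example job. The function names are hypothetical and not hadoop's java api, and hash partitioning stands in for hadoop's default partitioner. Because summing is commutative and associative, the reducer doubles as the combiner.

```python
from collections import defaultdict


def map_fn(_key, line):
    # Emit (word, 1) per word; the input key (e.g. a byte offset) is ignored.
    for word in line.split():
        yield word.lower(), 1


def reduce_fn(word, counts):
    # Summing is commutative and associative, so this also works as a combiner.
    yield word, sum(counts)


def group_by_key(pairs):
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return groups


def run_job(splits, num_reducers=2, use_combiner=True):
    partitions = [defaultdict(list) for _ in range(num_reducers)]
    for split in splits:
        # Map phase: each split is processed independently.
        mapped = [kv for key, line in split for kv in map_fn(key, line)]
        if use_combiner:
            # Combiner: run the reducer locally on this map's output.
            mapped = [kv for k, vs in group_by_key(mapped).items()
                      for kv in reduce_fn(k, vs)]
        # Shuffle: route each key to a reducer by hashing it.
        for k, v in mapped:
            partitions[hash(k) % num_reducers][k].append(v)
    # Barrier: reduces start only after every split has been mapped.
    results = {}
    for part in partitions:  # each partition would become one output file
        for k in sorted(part):
            for out_k, out_v in reduce_fn(k, part[k]):
                results[out_k] = out_v
    return results
```

The combiner changes how much data crosses the shuffle, not the answer. Running with `use_combiner=False` gives the same counts but sends one pair per word occurrence to the reducers.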
hadoop summit 09 > applications track > lightning talks
emi
- hadoop is for performance, not speed
- use activerecord or hibernate for rapid, iterative web dev
- few businesses write map reduce jobs –> use cascading instead
- emi is a ruby shop
- I2P
– feed + pipe script + processing node
– written in a ruby dsl
– can run on a single node or in a cluster
– all data is pushed into S3, which is great cause it’s super cheap
– stack: aws > ec2 + s3 > conductor + processing node + processing center > spring + hadoop > admin + cascading > ruby-based dsl > zookeeper > jms > rest
– deployment via chef
– simple ui (built by engineers, no designer involved)
- cascading supports dsls
- “i help computers learn languages”
- higher accuracy can be achieved using a dependency syntax tree, but this is expensive to produce
- the expectation-maximization algorithm is a cheaper alternative
- easy to parallelize, but not a natural fit for map-reduce
– map-reduce overhead can become a bottleneck
- 15x speed-up using hadoop on 50 processors
- allowing 5% of data to be dropped results in a 22x speed-up w/ no loss in accuracy
- a more complex algorithm, not more data, resulted in better accuracy
- bayesian estimation w/ bilingual pairs, a more complex algo, with only 8000 sentences results in 62% accuracy (after a week of calculation!)
hadoop summit 09 > administration track > chip multi-threading (CMT)
ref: http://developer.yahoo.com/events/hadoopsummit09/
chip multi-threading (CMT)
- a high-throughput architecture targeted at web-serving, oracle, etc
- a “system-on-a-chip”
- 1/2 Terabyte of mem
logical domains
- a hardware-based virtualization supported by Sun hardware
zone
- software-based virtualization feature in solaris




