Tuesday, December 31, 2013

Hadoop - Some basic administration

Check HDFS file system integrity:
A file in HDFS can become corrupt if all copies of one or more blocks are unavailable, leaving a hole in the file of up to the block size. If a file or block is corrupted, any attempt to read the file will fail. Hadoop admins should check HDFS integrity from time to time. By default, the "hadoop fsck" tool generates a summary report that lists the overall health of the filesystem. HDFS is considered healthy if, and only if, all files have the minimum number of replicas available.

$ hdfs fsck /
Connecting to namenode via http://rtlnn1.retailigence.com:50070
FSCK started by tony (auth:SIMPLE) from /10.6.70.1 for path / at Mon Dec 30 15:03:29 EST 2013
....................................................................................................
...
...
....................................................................................................
 Total size:    577533796439 B (Total open files size: 96827207 B)
 Total dirs:    3696
 Total files:   31082 (Files currently being written: 16)
 Total blocks (validated):    27626 (avg. block size 20905444 B) (Total open file blocks (not validated): 10)
 Minimally replicated blocks:    27626 (100.0 %)
 Over-replicated blocks:    0 (0.0 %)
 Under-replicated blocks:    0 (0.0 %)
 Mis-replicated blocks:    0 (0.0 %)
 Default replication factor:    3
 Average block replication:    2.9951134
 Corrupt blocks:    0
 Missing replicas:    0 (0.0 %)
 Number of data-nodes:    6
 Number of racks:    1
FSCK ended at Mon Dec 30 15:03:30 EST 2013 in 1013 milliseconds

The filesystem under path '/' is HEALTHY

"Hdfs fsck" report the effective HDFS storage space used, for example, they show the "normal" file size (just like what you see on the local file system) and it doesn't count for replication in HDFS. In the above example, root directory has stored 577533796439 bytes (0.5TB). "hdfs fsck" also tells us that the default replication factor is 3. This means that the total raw HDFS storage space used by these files in "/" is actually
3 x 577533796439 bytes = 1747386582249 bytes (1.58TB)

This 1.58TB is the total storage consumed by files in "/"

The logical size is roughly the same as what "hadoop fs -du -s /" reports:
$ hadoop fs -du -s /
582685550139  /

It is also possible to check only subtrees of the filesystem.
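For example, to check only a single directory tree (the path below is just an illustration; substitute your own):
$ hdfs fsck /user/tony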

The -files, -blocks, and -locations options can be used to figure out exactly which files are affected by missing blocks, as well as which blocks fall on which datanodes.

$ hdfs fsck / -files -blocks -locations
/var/run/cloudera-scm-agent/process/4509-hue-HUE_SERVER/hql/conf/job_jp_request_log.properties 760 bytes, 1 block(s):  OK
0. BP-217549046-198.72.102.208-1367506950143:blk_-4962627322737477711_520143 len=760 repl=3 [10.6.70.35:50010, 10.6.70.30:50010, 10.6.70.33:50010]
/var/run/cloudera-scm-agent/process/4509-hue-HUE_SERVER/hql/conf/job_jp_response_log.properties 761 bytes, 1 block(s):  OK
0. BP-217549046-198.72.102.208-1367506950143:blk_4654615167774951055_520145 len=761 repl=3 [10.6.70.31:50010, 10.6.70.30:50010, 10.6.70.33:50010]
...
...

Status: HEALTHY
 Total size:    577533796439 B (Total open files size: 96827207 B)
 Total dirs:    3697
 Total files:   31082 (Files currently being written: 19)
 Total blocks (validated):    27626 (avg. block size 20905444 B) (Total open file blocks (not validated): 16)
 Minimally replicated blocks:    27626 (100.0 %)
 Over-replicated blocks:    0 (0.0 %)
 Under-replicated blocks:    0 (0.0 %)
 Mis-replicated blocks:    0 (0.0 %)
 Default replication factor:    3
 Average block replication:    2.9951134
 Corrupt blocks:    0
 Missing replicas:    0 (0.0 %)
 Number of data-nodes:    6
 Number of racks:    1
FSCK ended at Mon Dec 30 15:12:19 EST 2013 in 13591 milliseconds

The filesystem under path '/' is HEALTHY

Balancing HDFS Block Data:

One of the most common ways an HDFS filesystem becomes unbalanced is when some nodes of the cluster have colocated clients. A colocated client is an HDFS client running on a node that is also a member of the HDFS cluster. This matters because of HDFS's primary use case, MapReduce: in almost all cases reducers write their output to HDFS, and writing to a local disk is faster than writing to a disk across the network. For this reason the namenode assigns the local machine as the destination of the first replica whenever the HDFS client is running on a datanode. Similarly, when a new (empty) node is added, it holds far less data than the existing nodes, leaving the cluster unbalanced. As a result, an HDFS cluster needs to be rebalanced regularly.

To balance the HDFS file system:
1. Become the HDFS superuser or a user with equivalent privileges (or use sudo -u username when executing commands).
2. Execute hadoop balancer -threshold N to run the balancer in the foreground, where N is the maximum percentage by which each datanode's disk utilization may differ from the cluster-wide average. You can stop the process by pressing CTRL+C, or by executing "kill PID" from another terminal.
3. Monitor the output (or log file, if you choose to run the balancer in the background) to track progress.

For example:
$ hdfs balancer -threshold 25
Time Stamp               Iteration#  Bytes Already Moved  Bytes Left To Move  Bytes Being Moved
Aug 26, 2013 9:29:16 AM           0                 0 KB             5.82 TB              50 GB
Aug 26, 2013 9:46:40 AM           1             49.88 GB             5.76 TB              50 GB
Aug 26, 2013 10:04:27 AM          2             99.67 GB             5.72 TB              50 GB
Aug 26, 2013 10:22:36 AM          3            149.62 GB             5.69 TB              50 GB
...
Aug 27, 2013 3:24:54 AM          61              2.07 TB             4.08 TB              30 GB
Aug 27, 2013 3:42:32 AM          62               2.1 TB             4.05 TB              30 GB
Aug 27, 2013 4:00:19 AM          63              2.13 TB             4.02 TB              30 GB
Aug 27, 2013 4:18:15 AM          64              2.16 TB             3.95 TB              30 GB
...

Note: the balancer needs to be run regularly to keep HDFS performance consistent.
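
If you prefer to run the balancer in the background and keep a log (for instance from a nightly cron job), something along these lines works; the log path and threshold below are only illustrative:
$ nohup hdfs balancer -threshold 10 > /var/log/hadoop-hdfs/balancer.log 2>&1 &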


Failed disk?
With a large number of machines, each with many disks, disk failures are routine. But don't worry: both HDFS and MapReduce are built to tolerate this kind of failure. Disk failures on worker nodes are usually much simpler to handle than those on master nodes.

Technically, Hadoop doesn't detect bad disks. Instead, it checks specific attributes of special directories such as the datanode block directories (dfs.data.dir) and the MapReduce scratch space (mapred.local.dir). A path is considered healthy and available if and only if all of the following are true (a quick manual check is sketched after the list):
1. The specified path is a directory.
2. The directory exists.
3. The directory is readable.
4. The directory is writable.
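
As a rough manual check of a single data directory, the same conditions can be verified from the shell. The path below is just an example; use the directories actually listed in your dfs.data.dir:
$ d=/data/1/dfs/dn   # hypothetical datanode data directory
$ [ -d "$d" ] && [ -r "$d" ] && [ -w "$d" ] && echo "$d looks healthy" || echo "$d failed a check"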

A path that doesn't meet all of the conditions above is reported as failed. Any blocks in the failed directory are assumed to be lost and are removed from the metadata: when the datanode detects one of these failures, it logs the condition and sends an updated block report to the namenode. The namenode then updates the replication count for the affected blocks and creates new replicas of the now under-replicated blocks.

The datanode will shut down if more disks fail than dfs.datanode.failed.volumes.tolerated allows. By default, this parameter is set to zero, which means a single disk failure takes down the entire datanode (a sample configuration sketch follows the steps below). When a disk fails, follow these steps:
1. Stop any Hadoop-related processes (optionally following the decommissioning process for the datanode).
2. Replace any failed disks.
3. Follow the process for adding the node back into the cluster.
4. Run the Hadoop fsck utility to validate the health of HDFS. Over-replicated blocks are normal immediately after a node is reintroduced to the cluster; they are corrected automatically over time.
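
To let a datanode tolerate one bad disk instead of shutting down, dfs.datanode.failed.volumes.tolerated can be raised in hdfs-site.xml. The snippet below is only a sketch and assumes the node has several data disks; adjust the value to your hardware:
<property>
  <!-- number of dfs.data.dir volumes allowed to fail before the datanode stops -->
  <name>dfs.datanode.failed.volumes.tolerated</name>
  <value>1</value>
</property>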


How to kill a MapReduce Job:
Sometimes you need to kill a MapReduce job running in the cluster. Killing a MapReduce job is akin to terminating a SQL query in a relational database; the job, including any outstanding tasks, is abandoned, and the client that initiated it is notified that it has failed to complete. Any temporary map output is discarded. To kill the job:
1. Become the HDFS superuser or a user with equivalent privileges (or use sudo -u username when executing commands).
2. Execute "hadoop job -list" or use the jobtracker web user interface to find the job ID of the job you wish to terminate.
3. Execute "hadoop job -kill jobID" to terminate the job.
4. Confirm that the job is terminated using hadoop job -list or by checking the jobtracker web user interface.
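
For example (the job ID below is made up; use the ID shown by "hadoop job -list"):
$ hadoop job -list
$ hadoop job -kill job_201312301500_0042
$ hadoop job -list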

How to kill a MapReduce Task:
Sometimes a MapReduce task misbehaves; if this happens, you may need to kill that specific task. Killing the task forces the jobtracker to reattempt the work elsewhere. Unfortunately, it's not guaranteed that the jobtracker will select another worker; sometimes the task will be immediately reassigned to the same machine. It does, however, force the task to reenter the scheduler queue, possibly delaying its execution and reducing temporary contention for system resources.

To Kill a task:
1. Become the HDFS superuser, a user with equivalent privileges, or the owner of the MapReduce job (or use sudo -u username when executing commands).
2. Locate the task attempt you wish to kill using hadoop job -list-attempt-ids jobID taskType taskState, where jobID is the job ID of the job that contains the task attempt, taskType is the type of task (such as map, reduce), and taskState is the current state of the task (such as running, completed). Alternatively, the jobtracker web user interface can be used to locate the task attempt ID.
3. Execute hadoop job -kill-task taskAttemptId to kill the task.
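
For example, to kill a running map task attempt (the job and attempt IDs below are made up):
$ hadoop job -list-attempt-ids job_201312301500_0042 map running
$ hadoop job -kill-task attempt_201312301500_0042_m_000007_0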


Dealing with a Blacklisted Tasktracker:
Sometimes a node in a Hadoop cluster misbehaves: it keeps failing the same task, keeps running out of space, and so on. To protect the cluster from misbehaving hosts, MapReduce can temporarily blacklist tasktrackers, removing them from the available pool. The heuristic for blacklisting a tasktracker is simple but effective: any tasktracker with three or more failed tasks from a single job becomes ineligible to receive further tasks for that job. Tasktrackers may be blacklisted at the job level from time to time, usually because of poorly written MapReduce code in which tasks rapidly fail due to an error in logic.

Sometimes the failures persist over multiple MapReduce jobs. If tasks continue to fail on the same tasktracker, the jobtracker adds the host to a global blacklist, and that host will not receive any work for 24 hours. Currently, there is no graceful way to administratively remove a machine from the global blacklist; the jobtracker retains this information in memory, so a less than polite way of forcing it to forgive a host is to restart the jobtracker.
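
On a CDH4 MRv1 cluster that restart might look like the sketch below; the service name varies by distribution (and clusters managed by Cloudera Manager should restart the JobTracker from the Cloudera Manager UI instead), so treat it as an assumption to verify:
$ sudo service hadoop-0.20-mapreduce-jobtracker restart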
