Hadoop borrows the idea of the Linux virtual file system (VFS): it introduces an abstract Hadoop file system and, on top of it, provides a number of concrete file system implementations that satisfy the various data-access needs of applications built on Hadoop.

The Hadoop file system API

Hadoop defines an abstract file system; HDFS is only one concrete implementation of it. The abstract base class of the Hadoop file system is org.apache.hadoop.fs.FileSystem.
The methods of the Hadoop abstract file system fall into two groups (a short usage sketch follows the list):

1. methods that deal with files and directories;
2. methods that read and write file data.
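Before looking at the individual methods, a minimal, hedged usage sketch may help. The code below obtains the configured FileSystem, writes a small file through the abstract API, and reads it back; the class name and the path are made up for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileSystemQuickStart {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();     // picks up fs.default.name from the configuration
    FileSystem fs = FileSystem.get(conf);         // returns the configured concrete file system
    Path file = new Path("/tmp/fs-api-demo.txt"); // example path

    // Write a small file through the abstract API.
    FSDataOutputStream out = fs.create(file, true);
    out.writeUTF("hello, hadoop file system");
    out.close();

    // Read it back; the same code works for HDFS, the local file system, and so on.
    FSDataInputStream in = fs.open(file);
    System.out.println(in.readUTF());
    in.close();
  }
}

Because the client only sees the FileSystem abstraction, switching between HDFS and the local file system is a configuration change, not a code change.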
Operations of the Hadoop abstract file system

| Hadoop FileSystem | Java operation | Linux call | Description |
| --- | --- | --- | --- |
| URL.openStream, FileSystem.open, FileSystem.create, FileSystem.append | URL.openStream | open | Open a file |
| FSDataInputStream.read | InputStream.read | read | Read data from a file |
| FSDataOutputStream.write | OutputStream.write | write | Write data to a file |
| FSDataInputStream.close, FSDataOutputStream.close | InputStream.close, OutputStream.close | close | Close a file |
| FSDataInputStream.seek | RandomAccessFile.seek | lseek | Change the file read/write position |
| FileSystem.getFileStatus, FileSystem.get* | File.get* | stat | Get file/directory attributes |
| FileSystem.set* | File.set* | chmod, etc. | Change file attributes |
| FileSystem.createNewFile | File.createNewFile | create | Create a file |
| FileSystem.delete | File.delete | remove | Delete a file from the file system |
| FileSystem.rename | File.renameTo | rename | Rename a file/directory |
| FileSystem.mkdirs | File.mkdir | mkdir | Create a subdirectory under a given directory |
| FileSystem.delete | File.delete | rmdir | Delete an empty subdirectory |
| FileSystem.listStatus | File.list | readdir | List the entries of a directory |
| FileSystem.getWorkingDirectory | — | getcwd/getwd | Return the current working directory |
| FileSystem.setWorkingDirectory | — | chdir | Change the current working directory |
Through FileSystem.getFileStatus(), the Hadoop abstract file system can obtain all attributes of a file or directory in a single call. The attributes are held in the FileStatus class:

public class FileStatus implements Writable, Comparable {
  private Path path;                 // file path
  private long length;               // file length
  private boolean isdir;             // is this a directory?
  private short block_replication;   // replication factor (an HDFS-specific field)
  private long blocksize;            // block size (an HDFS-specific field)
  private long modification_time;    // last modification time
  private long access_time;          // last access time
  private FsPermission permission;   // permission information
  private String owner;              // file owner
  private String group;              // owner's group
  ……
}

FileStatus implements the Writable interface, so a FileStatus object can be serialized and sent over the network. Returning all of a file's attributes to the client in one call reduces the number of network transfers needed in a distributed system.
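As a hedged usage sketch (the class name and path below are only examples), client code typically fetches a FileStatus once and then reads whichever attributes it needs from the returned object:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileStatusDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());

    // One call fetches every attribute of the path at once.
    FileStatus status = fs.getFileStatus(new Path("/tmp/fs-api-demo.txt")); // example path
    System.out.println("length      = " + status.getLen());
    System.out.println("isDir       = " + status.isDir());
    System.out.println("replication = " + status.getReplication());
    System.out.println("block size  = " + status.getBlockSize());
    System.out.println("modified    = " + status.getModificationTime());
    System.out.println("permission  = " + status.getPermission());
    System.out.println("owner:group = " + status.getOwner() + ":" + status.getGroup());
  }
}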
The complete source of the FileStatus class is as follows:
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

/** Interface that represents the client side information for a file. */
public class FileStatus implements Writable, Comparable {

  private Path path;
  private long length;
  private boolean isdir;
  private short block_replication;
  private long blocksize;
  private long modification_time;
  private long access_time;
  private FsPermission permission;
  private String owner;
  private String group;

  public FileStatus() {
    this(0, false, 0, 0, 0, 0, null, null, null, null);
  }

  // We should deprecate this soon?
  public FileStatus(long length, boolean isdir, int block_replication,
                    long blocksize, long modification_time, Path path) {
    this(length, isdir, block_replication, blocksize, modification_time,
         0, null, null, null, path);
  }

  public FileStatus(long length, boolean isdir, int block_replication,
                    long blocksize, long modification_time, long access_time,
                    FsPermission permission, String owner, String group,
                    Path path) {
    this.length = length;
    this.isdir = isdir;
    this.block_replication = (short)block_replication;
    this.blocksize = blocksize;
    this.modification_time = modification_time;
    this.access_time = access_time;
    this.permission = (permission == null) ? FsPermission.getDefault() : permission;
    this.owner = (owner == null) ? "" : owner;
    this.group = (group == null) ? "" : group;
    this.path = path;
  }

  /* @return the length of this file, in blocks */
  public long getLen() {
    return length;
  }

  /** Is this a directory?
   * @return true if this is a directory */
  public boolean isDir() {
    return isdir;
  }

  /** Get the block size of the file.
   * @return the number of bytes */
  public long getBlockSize() {
    return blocksize;
  }

  /** Get the replication factor of a file.
   * @return the replication factor of a file. */
  public short getReplication() {
    return block_replication;
  }

  /** Get the modification time of the file.
   * @return the modification time of file in milliseconds since January 1, 1970 UTC. */
  public long getModificationTime() {
    return modification_time;
  }

  /** Get the access time of the file.
   * @return the access time of file in milliseconds since January 1, 1970 UTC. */
  public long getAccessTime() {
    return access_time;
  }

  /** Get FsPermission associated with the file.
   * @return permssion. If a filesystem does not have a notion of permissions
   *         or if permissions could not be determined, then default
   *         permissions equivalent of "rwxrwxrwx" is returned. */
  public FsPermission getPermission() {
    return permission;
  }

  /** Get the owner of the file.
   * @return owner of the file. The string could be empty if there is no
   *         notion of owner of a file in a filesystem or if it could not
   *         be determined (rare). */
  public String getOwner() {
    return owner;
  }

  /** Get the group associated with the file.
   * @return group for the file. The string could be empty if there is no
   *         notion of group of a file in a filesystem or if it could not
   *         be determined (rare). */
  public String getGroup() {
    return group;
  }

  public Path getPath() {
    return path;
  }

  /* These are provided so that these values could be loaded lazily
   * by a filesystem (e.g. local file system). */

  /** Sets permission.
   * @param permission if permission is null, default value is set */
  protected void setPermission(FsPermission permission) {
    this.permission = (permission == null) ? FsPermission.getDefault() : permission;
  }

  /** Sets owner.
   * @param owner if it is null, default value is set */
  protected void setOwner(String owner) {
    this.owner = (owner == null) ? "" : owner;
  }

  /** Sets group.
   * @param group if it is null, default value is set */
  protected void setGroup(String group) {
    this.group = (group == null) ? "" : group;
  }

  //
  // Writable
  //
  public void write(DataOutput out) throws IOException {
    Text.writeString(out, getPath().toString());
    out.writeLong(length);
    out.writeBoolean(isdir);
    out.writeShort(block_replication);
    out.writeLong(blocksize);
    out.writeLong(modification_time);
    out.writeLong(access_time);
    permission.write(out);
    Text.writeString(out, owner);
    Text.writeString(out, group);
  }

  public void readFields(DataInput in) throws IOException {
    String strPath = Text.readString(in);
    this.path = new Path(strPath);
    this.length = in.readLong();
    this.isdir = in.readBoolean();
    this.block_replication = in.readShort();
    blocksize = in.readLong();
    modification_time = in.readLong();
    access_time = in.readLong();
    permission.readFields(in);
    owner = Text.readString(in);
    group = Text.readString(in);
  }

  /** Compare this object to another object
   * @param o the object to be compared.
   * @return a negative integer, zero, or a positive integer as this object
   *         is less than, equal to, or greater than the specified object.
   * @throws ClassCastException if the specified object's is not of type FileStatus */
  public int compareTo(Object o) {
    FileStatus other = (FileStatus)o;
    return this.getPath().compareTo(other.getPath());
  }

  /** Compare if this object is equal to another object
   * @param o the object to be compared.
   * @return true if two file status has the same path name; false if not. */
  public boolean equals(Object o) {
    if (o == null) {
      return false;
    }
    if (this == o) {
      return true;
    }
    if (!(o instanceof FileStatus)) {
      return false;
    }
    FileStatus other = (FileStatus)o;
    return this.getPath().equals(other.getPath());
  }

  /** Returns a hash code value for the object, which is defined as
   * the hash code of the path name.
   * @return a hash code value for the path name. */
  public int hashCode() {
    return getPath().hashCode();
  }
}
FileSystem also contains methods that have no counterpart in the Java file API, such as setReplication(), getReplication() and getContentSummary(). They are declared as follows (a small usage example follows the listing):
public boolean setReplication(Path src, short replication) throws IOException {
  return true;
}

public short getReplication(Path src) throws IOException {
  return getFileStatus(src).getReplication();
}

public ContentSummary getContentSummary(Path f) throws IOException {
  FileStatus status = getFileStatus(f);
  if (!status.isDir()) {
    // f is a file
    return new ContentSummary(status.getLen(), 1, 0);
  }
  // f is a directory
  long[] summary = {0, 0, 1};
  for (FileStatus s : listStatus(f)) {
    ContentSummary c = s.isDir() ? getContentSummary(s.getPath()) :
                                   new ContentSummary(s.getLen(), 1, 0);
    summary[0] += c.getLength();
    summary[1] += c.getFileCount();
    summary[2] += c.getDirectoryCount();
  }
  return new ContentSummary(summary[0], summary[1], summary[2]);
}
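As a hedged illustration of the last of these (the class name and directory are just examples), a client can ask for the aggregate size and the file/directory counts under a path in one call:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ContentSummaryDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // Recursively walks the directory tree and aggregates the counters.
    ContentSummary cs = fs.getContentSummary(new Path("/tmp")); // example directory
    System.out.println("total length    = " + cs.getLength());
    System.out.println("file count      = " + cs.getFileCount());
    System.out.println("directory count = " + cs.getDirectoryCount());
  }
}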
What does implementing a concrete Hadoop file system require? The abstract methods of org.apache.hadoop.fs.FileSystem are collected below:
// Get the file system URI
public abstract URI getUri();

// Open a file for reading and return an input stream
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException;

// Create a file and return an output stream
public abstract FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException;

// Append data to an existing file
public abstract FSDataOutputStream append(Path f, int bufferSize,
    Progressable progress) throws IOException;

// Rename a file or directory
public abstract boolean rename(Path src, Path dst) throws IOException;

// Delete a file (the second form can delete a directory recursively)
public abstract boolean delete(Path f) throws IOException;
public abstract boolean delete(Path f, boolean recursive) throws IOException;

// If Path is a directory, list its entries and their attributes;
// if Path is a file, return the file's attributes
public abstract FileStatus[] listStatus(Path f) throws IOException;

// Set the current working directory
public abstract void setWorkingDirectory(Path new_dir);

// Get the current working directory
public abstract Path getWorkingDirectory();

// Create a directory (and any missing parent directories)
public abstract boolean mkdirs(Path f, FsPermission permission) throws IOException;

// Get the attributes of a file or directory
public abstract FileStatus getFileStatus(Path f) throws IOException;
A concrete file system must implement at least these abstract methods.
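To make that requirement concrete, here is a minimal, hedged skeleton of such a subclass. The class name and the "myfs" scheme are invented for illustration, and the method bodies are deliberately left as stubs; a real implementation would of course fill them in.

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

/** Hypothetical skeleton of a concrete file system; "myfs" is an invented scheme. */
public class MyFileSystem extends FileSystem {

  @Override
  public URI getUri() {
    return URI.create("myfs:///");
  }

  @Override
  public FSDataInputStream open(Path f, int bufferSize) throws IOException {
    throw new IOException("open() not implemented yet");
  }

  @Override
  public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite,
      int bufferSize, short replication, long blockSize, Progressable progress)
      throws IOException {
    throw new IOException("create() not implemented yet");
  }

  @Override
  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
      throws IOException {
    throw new IOException("append() not implemented yet");
  }

  @Override
  public boolean rename(Path src, Path dst) throws IOException { return false; }

  @Override
  public boolean delete(Path f) throws IOException { return delete(f, false); }

  @Override
  public boolean delete(Path f, boolean recursive) throws IOException { return false; }

  @Override
  public FileStatus[] listStatus(Path f) throws IOException { return new FileStatus[0]; }

  @Override
  public void setWorkingDirectory(Path new_dir) { /* remember the directory */ }

  @Override
  public Path getWorkingDirectory() { return new Path("/"); }

  @Override
  public boolean mkdirs(Path f, FsPermission permission) throws IOException { return false; }

  @Override
  public FileStatus getFileStatus(Path f) throws IOException {
    throw new FileNotFoundException(f.toString());
  }
}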
The FileSystem class itself is considerably longer: on top of the abstract methods it supplies a large family of concrete convenience methods. Its overall structure, heavily abridged, looks like this:

package org.apache.hadoop.fs;

// (Apache license header and imports omitted)

/****************************************************************
 * An abstract base class for a fairly generic filesystem.  It
 * may be implemented as a distributed filesystem, or as a "local"
 * one that reflects the locally-connected disk.  The local version
 * exists for small Hadoop instances and for testing.
 *
 * All user code that may potentially use the Hadoop Distributed
 * File System should be written to use a FileSystem object.  The
 * Hadoop DFS is a multi-machine system that appears as a single
 * disk.  It's useful because of its fault tolerance and potentially
 * very large capacity.
 *
 * The local implementation is {@link LocalFileSystem} and distributed
 * implementation is DistributedFileSystem.
 ****************************************************************/
public abstract class FileSystem extends Configured implements Closeable {
  private static final String FS_DEFAULT_NAME_KEY = "fs.default.name";

  public static final Log LOG = LogFactory.getLog(FileSystem.class);

  /** FileSystem cache */
  private static final Cache CACHE = new Cache();

  /** The key this instance is stored under in the cache. */
  private Cache.Key key;

  /** Recording statistics per a FileSystem class */
  private static final Map<Class<? extends FileSystem>, Statistics> statisticsTable =
    new IdentityHashMap<Class<? extends FileSystem>, Statistics>();

  /** The statistics for this file system. */
  protected Statistics statistics;

  /** A cache of files that should be deleted when the filesystem is closed
   *  or the JVM is exited. */
  private Set<Path> deleteOnExit = new TreeSet<Path>();

  /** Returns the configured filesystem implementation. */
  public static FileSystem get(Configuration conf) throws IOException {
    return get(getDefaultUri(conf), conf);
  }

  /** Get the default filesystem URI from a configuration. */
  public static URI getDefaultUri(Configuration conf) {
    return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, "file:///")));
  }

  /** Returns the FileSystem for this URI's scheme and authority.  The scheme
   *  of the URI determines a configuration property name, fs.scheme.class,
   *  whose value names the FileSystem class.  The entire URI is passed to
   *  the FileSystem instance's initialize method. */
  public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();
    String authority = uri.getAuthority();

    if (scheme == null) {                        // no scheme: use default FS
      return get(conf);
    }

    if (authority == null) {                     // no authority
      URI defaultUri = getDefaultUri(conf);
      if (scheme.equals(defaultUri.getScheme())    // if scheme matches default
          && defaultUri.getAuthority() != null) {  // & default has authority
        return get(defaultUri, conf);              // return default
      }
    }

    String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
    if (conf.getBoolean(disableCacheName, false)) {
      return createFileSystem(uri, conf);
    }

    return CACHE.get(uri, conf);
  }

  /** Called after a new FileSystem instance is constructed. */
  public void initialize(URI name, Configuration conf) throws IOException {
    statistics = getStatistics(name.getScheme(), getClass());
  }

  /** Opens an FSDataInputStream at the indicated Path. */
  public FSDataInputStream open(Path f) throws IOException {
    return open(f, getConf().getInt("io.file.buffer.size", 4096));
  }

  /** Opens an FSDataOutputStream at the indicated Path.
   *  Files are overwritten by default. */
  public FSDataOutputStream create(Path f) throws IOException {
    return create(f, true);
  }

  // ... many more create()/append() convenience overloads, all of which funnel
  //     into the abstract create(Path, FsPermission, boolean, int, short, long,
  //     Progressable) and append(Path, int, Progressable) methods ...

  /** Creates the given Path as a brand-new zero-length file.  If
   *  create fails, or if it already existed, return false. */
  public boolean createNewFile(Path f) throws IOException {
    if (exists(f)) {
      return false;
    } else {
      create(f, false, getConf().getInt("io.file.buffer.size", 4096)).close();
      return true;
    }
  }

  /** Check if exists.
   *  @param f source file */
  public boolean exists(Path f) throws IOException {
    try {
      return getFileStatus(f) != null;
    } catch (FileNotFoundException e) {
      return false;
    }
  }

  /** Call {@link #mkdirs(Path, FsPermission)} with default permission. */
  public boolean mkdirs(Path f) throws IOException {
    return mkdirs(f, FsPermission.getDefault());
  }

  // ... plus the abstract methods listed earlier (getUri, open, create, append,
  //     rename, delete, listStatus, setWorkingDirectory, getWorkingDirectory,
  //     mkdirs, getFileStatus), and concrete helpers such as:
  //     setDefaultUri(), getLocal(), getNamed()/getName() (deprecated), fixName(),
  //     makeQualified()/checkPath(), the static create()/mkdirs() variants that
  //     also set permissions, getFileBlockLocations(), getReplication()/
  //     setReplication(), deleteOnExit()/processDeleteOnExit(), isFile()/
  //     isDirectory()/getLength(), getContentSummary(), listStatus() with
  //     PathFilters and glob matching via the GlobFilter inner class
  //     (?, *, [abc], [a-b], [^a], \c, {ab,cd}, {ab,c{de,fh}}),
  //     getHomeDirectory(), copyFromLocalFile()/copyToLocalFile()/
  //     moveFromLocalFile()/moveToLocalFile(), startLocalOutput()/
  //     completeLocalOutput(), close()/closeAll(), getUsed(),
  //     getBlockSize()/getDefaultBlockSize()/getDefaultReplication(),
  //     getFileChecksum()/setVerifyChecksum(), and the FileSystem Cache with
  //     per-scheme Statistics.
}

Hadoop input/output streams
Like Java, the Hadoop abstract file system reads and writes files through streams. The classes used for reading and writing file data are FSDataInputStream and FSDataOutputStream, respectively.

1. FSDataInputStream
public class FSDataInputStream extends DataInputStream
    implements Seekable, PositionedReadable {
  ……
}
As the declaration shows, FSDataInputStream extends DataInputStream and implements the Seekable and PositionedReadable interfaces.

The Seekable interface provides random access within a (file) stream. Its functionality resembles the getFilePointer() and seek() methods of RandomAccessFile: it lets the caller reposition where the stream will be read next.

The Seekable interface, with its comments, looks like this (a short example of it in use follows):
/** Interface that supports seeking within a stream. */
public interface Seekable {
  /**
   * Set the current offset to the given position; the next read starts there.
   */
  void seek(long pos) throws IOException;

  /** Return the current offset. */
  long getPos() throws IOException;

  /** Seek to a different copy (replica) of the data. */
  boolean seekToNewSource(long targetPos) throws IOException;
}
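A small, hedged sketch of how seek() and getPos() are used on the FSDataInputStream returned by FileSystem.open(); the class name and path are only examples, and the file is assumed to hold at least 16 bytes.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SeekDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FSDataInputStream in = fs.open(new Path("/tmp/fs-api-demo.txt")); // example path
    try {
      byte[] buf = new byte[16];
      in.readFully(buf);                 // sequential read of the first 16 bytes
      System.out.println("position after read: " + in.getPos());

      in.seek(0);                        // jump back to the beginning...
      in.readFully(buf);                 // ...and read the same bytes again
      System.out.println("position after seek + read: " + in.getPos());
    } finally {
      in.close();
    }
  }
}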
The complete source of the FSDataInputStream class is as follows:
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import java.io.*;

/** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
 * and buffers input through a {@link BufferedInputStream}. */
public class FSDataInputStream extends DataInputStream
    implements Seekable, PositionedReadable {

  public FSDataInputStream(InputStream in) throws IOException {
    super(in);
    if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
      throw new IllegalArgumentException(
          "In is not an instance of Seekable or PositionedReadable");
    }
  }

  public synchronized void seek(long desired) throws IOException {
    ((Seekable)in).seek(desired);
  }

  public long getPos() throws IOException {
    return ((Seekable)in).getPos();
  }

  public int read(long position, byte[] buffer, int offset, int length)
      throws IOException {
    return ((PositionedReadable)in).read(position, buffer, offset, length);
  }

  public void readFully(long position, byte[] buffer, int offset, int length)
      throws IOException {
    ((PositionedReadable)in).readFully(position, buffer, offset, length);
  }

  public void readFully(long position, byte[] buffer) throws IOException {
    ((PositionedReadable)in).readFully(position, buffer, 0, buffer.length);
  }

  public boolean seekToNewSource(long targetPos) throws IOException {
    return ((Seekable)in).seekToNewSource(targetPos);
  }
}
The other interface FSDataInputStream implements is PositionedReadable, which provides a family of methods for reading from a given position in the stream:
// Interface for positioned reads within a stream
public interface PositionedReadable {
  // Read up to `length` bytes, starting at `position` in the stream, into the
  // buffer beginning at `offset`. Note: this does not change the stream's
  // current read position, and it is thread-safe.
  public int read(long position, byte[] buffer, int offset, int length)
    throws IOException;

  // Read exactly `length` bytes, starting at `position`, into the buffer
  // beginning at `offset`.
  public void readFully(long position, byte[] buffer, int offset, int length)
    throws IOException;

  public void readFully(long position, byte[] buffer) throws IOException;
}
None of the three read methods in PositionedReadable changes the stream's current read position, and all of them are thread-safe.
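A hedged sketch of a positioned read: because read/readFully(position, ...) do not move the read position, the sequential read that follows still starts at the beginning of the file. The class name and path are only examples, and the file is assumed to hold at least 18 bytes.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PositionedReadDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FSDataInputStream in = fs.open(new Path("/tmp/fs-api-demo.txt")); // example path
    try {
      byte[] middle = new byte[8];
      in.readFully(10, middle);          // positioned read at offset 10
      System.out.println("position after positioned read: " + in.getPos()); // unchanged

      byte[] head = new byte[8];
      in.readFully(head);                // sequential read, still starts at offset 0
    } finally {
      in.close();
    }
  }
}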
2. FSInputStream

The org.apache.hadoop.fs package also contains the abstract class FSInputStream, in which the methods of Seekable and PositionedReadable reappear as abstract methods.

FSInputStream implements the read() method of PositionedReadable in terms of the Seekable interface's seek() method:
// Implementation of PositionedReadable.read()
public int read(long position, byte[] buffer, int offset, int length)
    throws IOException {
  /*
   * PositionedReadable.read() must be thread-safe, so the method synchronizes
   * on `this`: while it runs, no other caller can be inside these methods, and
   * the current read position reported by Seekable.getPos() cannot change
   * underneath it.
   */
  synchronized (this) {
    long oldPos = getPos();                  // save the current read position (Seekable.getPos())
    int nread = -1;
    try {
      seek(position);                        // move to the requested position (Seekable.seek())
      nread = read(buffer, offset, length);  // read the data (InputStream.read())
    } finally {
      seek(oldPos);                          // restore the position saved before the read (Seekable.seek())
    }
    return nread;
  }
}
The complete source of FSInputStream is as follows:
package org.apache.hadoop.fs;

import java.io.*;

/****************************************************************
 * FSInputStream is a generic old InputStream with a little bit
 * of RAF-style seek ability.
 ****************************************************************/
public abstract class FSInputStream extends InputStream
    implements Seekable, PositionedReadable {
  /**
   * Seek to the given offset from the start of the file.
   * The next read() will be from that location.  Can't
   * seek past the end of the file.
   */
  public abstract void seek(long pos) throws IOException;

  /**
   * Return the current offset from the start of the file
   */
  public abstract long getPos() throws IOException;

  /**
   * Seeks a different copy of the data.  Returns true if
   * found a new source, false otherwise.
   */
  public abstract boolean seekToNewSource(long targetPos) throws IOException;

  public int read(long position, byte[] buffer, int offset, int length)
      throws IOException {
    synchronized (this) {
      long oldPos = getPos();
      int nread = -1;
      try {
        seek(position);
        nread = read(buffer, offset, length);
      } finally {
        seek(oldPos);
      }
      return nread;
    }
  }

  public void readFully(long position, byte[] buffer, int offset, int length)
      throws IOException {
    int nread = 0;
    while (nread < length) {
      int nbytes = read(position+nread, buffer, offset+nread, length-nread);
      if (nbytes < 0) {
        throw new EOFException("End of file reached before reading fully.");
      }
      nread += nbytes;
    }
  }

  public void readFully(long position, byte[] buffer) throws IOException {
    readFully(position, buffer, 0, buffer.length);
  }
}
Note that Hadoop has no corresponding FSOutputStream class.

3. FSDataOutputStream

FSDataOutputStream is the class used for writing data. Like FSDataInputStream it extends DataOutputStream, so it offers methods such as writeInt() and writeChar(), but it is simpler: it does not implement Seekable. In other words, Hadoop file systems do not support random writes; a user cannot move the write pointer back and overwrite existing file content. A user can, however, obtain the stream's current write position through getPos(). To implement getPos(), FSDataOutputStream defines an inner class, PositionCache, which extends FilterOutputStream and overrides the write() methods to track the current write position.

PositionCache is a typical filter stream: it adds getPos() on top of the underlying stream, and it uses FileSystem.Statistics to collect read/write statistics for the file system.
public class FSDataOutputStream extends DataOutputStream implements Syncable {
  private OutputStream wrappedStream;

  private static class PositionCache extends FilterOutputStream {
    private FileSystem.Statistics statistics;
    long position;                                 // current write position of the stream

    public PositionCache(OutputStream out, FileSystem.Statistics stats, long pos)
        throws IOException {
      super(out);
      statistics = stats;
      position = pos;
    }

    public void write(int b) throws IOException {
      out.write(b);
      position++;                                  // update the current position
      if (statistics != null) {
        statistics.incrementBytesWritten(1);       // update the file system statistics
      }
    }

    public void write(byte b[], int off, int len) throws IOException {
      out.write(b, off, len);
      position += len;                             // update position
      if (statistics != null) {
        statistics.incrementBytesWritten(len);
      }
    }

    public long getPos() throws IOException {
      return position;                             // return the current write position
    }

    public void close() throws IOException {
      out.close();
    }
  }

  @Deprecated
  public FSDataOutputStream(OutputStream out) throws IOException {
    this(out, null);
  }

  public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats)
      throws IOException {
    this(out, stats, 0);
  }

  public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats,
                            long startPosition) throws IOException {
    // wrap the stream in a PositionCache and hand it to the DataOutputStream constructor
    super(new PositionCache(out, stats, startPosition));
    wrappedStream = out;
  }

  public long getPos() throws IOException {
    return ((PositionCache)out).getPos();
  }

  public void close() throws IOException {
    out.close();                                   // This invokes PositionCache.close()
  }

  // Returns the underlying output stream. This is used by unit tests.
  public OutputStream getWrappedStream() {
    return wrappedStream;
  }

  /** {@inheritDoc} */
  public void sync() throws IOException {
    if (wrappedStream instanceof Syncable) {
      ((Syncable)wrappedStream).sync();
    }
  }
}
FSDataOutputStream implements the Syncable interface. That interface has a single method, sync(), whose purpose is similar to the Linux sync() system call: it pushes the data buffered in the stream down to the underlying device. A short example follows the interface declaration.
/** This interface declare the sync() operation. */
public interface Syncable {
  /**
   * Synchronize all buffer with the underlying devices.
   * @throws IOException
   */
  public void sync() throws IOException;
}
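To close, a hedged end-to-end sketch of the output side: writing through an FSDataOutputStream, observing the write position tracked by PositionCache via getPos(), and calling sync(). Whether sync() actually forces data onto the device depends on the underlying file system; the class name and path are only examples.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OutputStreamDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FSDataOutputStream out = fs.create(new Path("/tmp/fs-out-demo.txt")); // example path
    try {
      out.writeBytes("hello");
      System.out.println("write position: " + out.getPos()); // 5, tracked by PositionCache

      out.writeBytes(", hadoop");
      System.out.println("write position: " + out.getPos()); // 13

      out.sync();   // ask the wrapped stream to push buffered data to the device
    } finally {
      out.close();
    }
  }
}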