1   package eu.fbk.knowledgestore.filestore;
2   
3   import java.io.IOException;
4   import java.io.InputStream;
5   import java.io.OutputStream;
6   
7   import javax.annotation.Nullable;
8   
9   import com.google.common.base.MoreObjects;
10  import com.google.common.base.Preconditions;
11  import com.google.common.base.Throwables;
12  import com.google.common.collect.AbstractIterator;
13  
14  import org.apache.hadoop.fs.FileStatus;
15  import org.apache.hadoop.fs.FileSystem;
16  import org.apache.hadoop.fs.Path;
17  import org.slf4j.Logger;
18  import org.slf4j.LoggerFactory;
19  
20  import eu.fbk.knowledgestore.data.Data;
21  import eu.fbk.knowledgestore.data.Stream;
22  
23  /**
24   * A {@code FileStore} implementation based on the Hadoop API.
25   * <p>
26   * An {@code HadoopFileStore} stores its files in an Hadoop {@link FileSystem}, under a certain,
27   * configurable root path; the filesystem can be any of the filesystems supported by the Hadoop
28   * API, including the local (raw) filesystem and the distributed HDFS filesystem.
29   * </p>
30   * <p>
 * Files are stored in a two-level directory structure, where first level directories reflect
32   * the MIME types of stored files, and second level directories are buckets of files whose name is
33   * obtained by hashing the filename; buckets are used in order to equally split a large number of
34   * files in several subdirectories, overcoming possible filesystem limitations in terms of maximum
35   * number of files storable in a directory.
36   * </p>
37   */
38  public class HadoopFileStore implements FileStore {
39  
40      private static final Logger LOGGER = LoggerFactory.getLogger(HadoopFileStore.class);
41  
42      private static final String DEFAULT_PATH = "files";
43  
44      private final FileSystem fileSystem;
45  
46      private final Path rootPath;
47  
48      /**
49       * Creates a new {@code HadoopFileStore} storing files in the {@code FileSystem} and under the
50       * {@code rootPath} specified.
51       *
52       * @param fileSystem
53       *            the file system, not null
54       * @param path
55       *            the root path where to store files, possibly relative to the filesystem working
56       *            directory; if null, the default root path {@code files} will be used
57       */
58      public HadoopFileStore(final FileSystem fileSystem, @Nullable final String path) {
59          this.fileSystem = Preconditions.checkNotNull(fileSystem);
60          this.rootPath = new Path(MoreObjects.firstNonNull(path, DEFAULT_PATH))
61                  .makeQualified(this.fileSystem); // resolve wrt workdir
62          LOGGER.info("{} configured, path={}", getClass().getSimpleName(), this.rootPath);
63      }
64  
65      @Override
66      public void init() throws IOException {
67          if (!this.fileSystem.exists(this.rootPath)) {
68              this.fileSystem.mkdirs(this.rootPath);
69          }
70      }
71  
72      @Override
73      public InputStream read(final String fileName) throws FileMissingException, IOException {
74          final Path path = getFullPath(fileName);
75          try {
76              final InputStream stream = this.fileSystem.open(path);
77              if (LOGGER.isDebugEnabled()) {
78                  LOGGER.debug("Reading file " + getRelativePath(path));
79              }
80              return stream;
81          } catch (final IOException ex) {
82              if (!this.fileSystem.exists(path)) {
83                  throw new FileMissingException(fileName, "Cannot read non-existing file");
84              }
85              throw ex;
86          }
87      }
88  
89      @Override
90      public OutputStream write(final String fileName) throws FileExistsException, IOException {
91          final Path path = getFullPath(fileName);
92          try {
93              final OutputStream stream = this.fileSystem.create(path, false);
94              if (LOGGER.isDebugEnabled()) {
95                  LOGGER.debug("Creating file " + getRelativePath(path));
96              }
97              return stream;
98          } catch (final IOException ex) {
99              if (this.fileSystem.exists(path)) {
100                 throw new FileExistsException(fileName, "Cannot overwrite file");
101             }
102             throw ex;
103         }
104     }
105 
106     @Override
107     public void delete(final String fileName) throws FileMissingException, IOException {
108         final Path path = getFullPath(fileName);
109         boolean deleted = false;
110         try {
111             deleted = this.fileSystem.delete(path, false);
112             if (deleted) {
113                 final Path parent = path.getParent();
114                 if (this.fileSystem.listStatus(parent).length == 0) {
115                     this.fileSystem.delete(parent, false);
116                 }
117             }
118 
119         } finally {
120             if (!deleted && !this.fileSystem.exists(path)) {
121                 throw new FileMissingException(fileName, "Cannot delete non-existing file.");
122             }
123         }
124         if (LOGGER.isDebugEnabled()) {
125             LOGGER.debug("Deleted file " + getRelativePath(path));
126         }
127     }
128 
129     @Override
130     public Stream<String> list() throws IOException {
131         return Stream.create(new HadoopIterator());
132     }
133 
134     @Override
135     public void close() {
136         // Nothing to do here. FileSystems are cached and closed by Hadoop at shutdown.
137     }
138 
139     @Override
140     public String toString() {
141         return getClass().getSimpleName();
142     }
143 
144     private Path getFullPath(final String fileName) {
145         final String typeDirectory = MoreObjects.firstNonNull(Data.extensionToMimeType(fileName),
146                 "application/octet-stream").replace('/', '_');
147         final String bucketDirectory = Data.hash(fileName).substring(0, 2);
148         return new Path(this.rootPath, typeDirectory + "/" + bucketDirectory + "/" + fileName);
149     }
150 
151     private String getRelativePath(final Path path) {
152         return path.toString().substring(this.rootPath.toString().length());
153     }
154 
155     private class HadoopIterator extends AbstractIterator<String> {
156 
157         private final FileStatus[] typeDirectories;
158 
159         private FileStatus[] bucketDirectories;
160 
161         private FileStatus[] files;
162 
163         private int typeIndex;
164 
165         private int bucketIndex;
166 
167         private int fileIndex;
168 
169         HadoopIterator() throws IOException {
170             this.typeDirectories = HadoopFileStore.this.fileSystem
171                     .listStatus(HadoopFileStore.this.rootPath);
172             this.bucketDirectories = new FileStatus[] {};
173             this.files = new FileStatus[] {};
174         }
175 
176         @Override
177         protected String computeNext() {
178             try {
179                 while (true) {
180                     if (this.fileIndex < this.files.length) {
181                         final FileStatus file = this.files[this.fileIndex++];
182                         if (!file.isDir()) {
183                             return file.getPath().getName();
184                         }
185                     } else if (this.bucketIndex < this.bucketDirectories.length) {
186                         final FileStatus bucketDirectory;
187                         bucketDirectory = this.bucketDirectories[this.bucketIndex++];
188                         if (bucketDirectory.isDir()) {
189                             this.files = HadoopFileStore.this.fileSystem
190                                     .listStatus(bucketDirectory.getPath());
191                             this.fileIndex = 0;
192                         }
193                     } else if (this.typeIndex < this.typeDirectories.length) {
194                         final FileStatus typeDirectory;
195                         typeDirectory = this.typeDirectories[this.typeIndex++];
196                         if (typeDirectory.isDir()) {
197                             this.bucketDirectories = HadoopFileStore.this.fileSystem
198                                     .listStatus(typeDirectory.getPath());
199                             this.bucketIndex = 0;
200                         }
201                     } else {
202                         return endOfData();
203                     }
204                 }
205             } catch (final Throwable ex) {
206                 throw Throwables.propagate(ex);
207             }
208         }
209 
210     }
211 
212 }