diff options
Diffstat (limited to 'finder/finder.go')
-rw-r--r-- | finder/finder.go | 1399 |
1 files changed, 1399 insertions, 0 deletions
diff --git a/finder/finder.go b/finder/finder.go new file mode 100644 index 00000000..ad85ee9a --- /dev/null +++ b/finder/finder.go @@ -0,0 +1,1399 @@ +// Copyright 2017 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package finder + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + "runtime" + "sort" + "strings" + "sync" + "sync/atomic" + "time" + + "android/soong/fs" + "errors" +) + +// This file provides a Finder struct that can quickly search for files satisfying +// certain criteria. +// This Finder gets its speed partially from parallelism and partially from caching. +// If a Stat call returns the same result as last time, then it means Finder +// can skip the ReadDir call for that dir. + +// The primary data structure used by the finder is the field Finder.nodes , +// which is a tree of nodes of type *pathMap . +// Each node represents a directory on disk, along with its stats, subdirectories, +// and contained files. + +// The common use case for the Finder is that the caller creates a Finder and gives +// it the same query that was given to it in the previous execution. +// In this situation, the major events that take place are: +// 1. The Finder begins to load its db +// 2. The Finder begins to stat the directories mentioned in its db (using multiple threads) +// Calling Stat on each of these directories is generally a large fraction of the total time +// 3. The Finder begins to construct a separate tree of nodes in each of its threads +// 4. The Finder merges the individual node trees into the main node tree +// 5. The Finder may call ReadDir a few times if there are a few directories that are out-of-date +// These ReadDir calls might prompt additional Stat calls, etc +// 6. The Finder waits for all loading to complete +// 7. The Finder searches the cache for files matching the user's query (using multiple threads) + +// These are the invariants regarding concurrency: +// 1. The public methods of Finder are threadsafe. +// The public methods are only performance-optimized for one caller at a time, however. +// For the moment, multiple concurrent callers shouldn't expect any better performance than +// multiple serial callers. +// 2. While building the node tree, only one thread may ever access the <children> collection of a +// *pathMap at once. +// a) The thread that accesses the <children> collection is the thread that discovers the +// children (by reading them from the cache or by having received a response to ReadDir). +// 1) Consequently, the thread that discovers the children also spawns requests to stat +// subdirs. +// b) Consequently, while building the node tree, no thread may do a lookup of its +// *pathMap via filepath because another thread may be adding children to the +// <children> collection of an ancestor node. Additionally, in rare cases, another thread +// may be removing children from an ancestor node if the children were only discovered to +// be irrelevant after calling ReadDir (which happens if a prune-file was just added). +// 3. No query will begin to be serviced until all loading (both reading the db +// and scanning the filesystem) is complete. +// Tests indicate that it only takes about 10% as long to search the in-memory cache as to +// generate it, making this not a huge loss in performance. +// 4. The parsing of the db and the initial setup of the pathMap tree must complete before +// beginning to call listDirSync (because listDirSync can create new entries in the pathMap) + +// see cmd/finder.go or finder_test.go for usage examples + +// Update versionString whenever making a backwards-incompatible change to the cache file format +const versionString = "Android finder version 1" + +// a CacheParams specifies which files and directories the user wishes be scanned and +// potentially added to the cache +type CacheParams struct { + // WorkingDirectory is used as a base for any relative file paths given to the Finder + WorkingDirectory string + + // RootDirs are the root directories used to initiate the search + RootDirs []string + + // ExcludeDirs are directory names that if encountered are removed from the search + ExcludeDirs []string + + // PruneFiles are file names that if encountered prune their entire directory + // (including siblings) + PruneFiles []string + + // IncludeFiles are file names to include as matches + IncludeFiles []string +} + +// a cacheConfig stores the inputs that determine what should be included in the cache +type cacheConfig struct { + CacheParams + + // FilesystemView is a unique identifier telling which parts of which file systems + // are readable by the Finder. In practice its value is essentially username@hostname. + // FilesystemView is set to ensure that a cache file copied to another host or + // found by another user doesn't inadvertently get reused. + FilesystemView string +} + +func (p *cacheConfig) Dump() ([]byte, error) { + bytes, err := json.Marshal(p) + return bytes, err +} + +// a cacheMetadata stores version information about the cache +type cacheMetadata struct { + // The Version enables the Finder to determine whether it can even parse the file + // If the version changes, the entire cache file must be regenerated + Version string + + // The CacheParams enables the Finder to determine whether the parameters match + // If the CacheParams change, the Finder can choose how much of the cache file to reuse + // (although in practice, the Finder will probably choose to ignore the entire file anyway) + Config cacheConfig +} + +type Logger interface { + Output(calldepth int, s string) error +} + +// the Finder is the main struct that callers will want to use +type Finder struct { + // configuration + DbPath string + numDbLoadingThreads int + numSearchingThreads int + cacheMetadata cacheMetadata + logger Logger + filesystem fs.FileSystem + + // temporary state + threadPool *threadPool + mutex sync.Mutex + + // non-temporary state + modifiedFlag int32 + nodes pathMap +} + +// New creates a new Finder for use +func New(cacheParams CacheParams, filesystem fs.FileSystem, + logger Logger, dbPath string) *Finder { + + numThreads := runtime.NumCPU() * 2 + numDbLoadingThreads := numThreads + numSearchingThreads := numThreads + + metadata := cacheMetadata{ + Version: versionString, + Config: cacheConfig{ + CacheParams: cacheParams, + FilesystemView: filesystem.ViewId(), + }, + } + + finder := &Finder{ + numDbLoadingThreads: numDbLoadingThreads, + numSearchingThreads: numSearchingThreads, + cacheMetadata: metadata, + logger: logger, + filesystem: filesystem, + + nodes: *newPathMap("/"), + DbPath: dbPath, + } + + finder.loadFromFilesystem() + + finder.verbosef("Done parsing db\n") + return finder +} + +// FindNamed searches for every cached file +func (f *Finder) FindAll() []string { + return f.FindAt("/") +} + +// FindNamed searches for every cached file under <rootDir> +func (f *Finder) FindAt(rootDir string) []string { + filter := func(entries DirEntries) (dirNames []string, fileNames []string) { + return entries.DirNames, entries.FileNames + } + return f.FindMatching(rootDir, filter) +} + +// FindNamed searches for every cached file named <fileName> +func (f *Finder) FindNamed(fileName string) []string { + return f.FindNamedAt("/", fileName) +} + +// FindNamedAt searches under <rootPath> for every file named <fileName> +// The reason a caller might use FindNamedAt instead of FindNamed is if they want +// to limit their search to a subset of the cache +func (f *Finder) FindNamedAt(rootPath string, fileName string) []string { + filter := func(entries DirEntries) (dirNames []string, fileNames []string) { + matches := []string{} + for _, foundName := range entries.FileNames { + if foundName == fileName { + matches = append(matches, foundName) + } + } + return entries.DirNames, matches + + } + return f.FindMatching(rootPath, filter) +} + +// FindFirstNamed searches for every file named <fileName> +// Whenever it finds a match, it stops search subdirectories +func (f *Finder) FindFirstNamed(fileName string) []string { + return f.FindFirstNamedAt("/", fileName) +} + +// FindFirstNamedAt searches for every file named <fileName> +// Whenever it finds a match, it stops search subdirectories +func (f *Finder) FindFirstNamedAt(rootPath string, fileName string) []string { + filter := func(entries DirEntries) (dirNames []string, fileNames []string) { + matches := []string{} + for _, foundName := range entries.FileNames { + if foundName == fileName { + matches = append(matches, foundName) + } + } + + if len(matches) > 0 { + return []string{}, matches + } + return entries.DirNames, matches + } + return f.FindMatching(rootPath, filter) +} + +// FindMatching is the most general exported function for searching for files in the cache +// The WalkFunc will be invoked repeatedly and is expected to modify the provided DirEntries +// in place, removing file paths and directories as desired. +// WalkFunc will be invoked potentially many times in parallel, and must be threadsafe. +func (f *Finder) FindMatching(rootPath string, filter WalkFunc) []string { + // set up some parameters + scanStart := time.Now() + var isRel bool + workingDir := f.cacheMetadata.Config.WorkingDirectory + + isRel = !filepath.IsAbs(rootPath) + if isRel { + rootPath = filepath.Join(workingDir, rootPath) + } + + rootPath = filepath.Clean(rootPath) + + // ensure nothing else is using the Finder + f.verbosef("FindMatching waiting for finder to be idle\n") + f.lock() + defer f.unlock() + + node := f.nodes.GetNode(rootPath, false) + if node == nil { + f.verbosef("No data for path %v ; apparently not included in cache params: %v\n", + rootPath, f.cacheMetadata.Config.CacheParams) + // path is not found; don't do a search + return []string{} + } + + // search for matching files + f.verbosef("Finder finding %v using cache\n", rootPath) + results := f.findInCacheMultithreaded(node, filter, f.numSearchingThreads) + + // format and return results + if isRel { + for i := 0; i < len(results); i++ { + results[i] = strings.Replace(results[i], workingDir+"/", "", 1) + } + } + sort.Strings(results) + f.verbosef("Found %v files under %v in %v using cache\n", + len(results), rootPath, time.Since(scanStart)) + return results +} + +// Shutdown saves the contents of the Finder to its database file +func (f *Finder) Shutdown() { + f.verbosef("Shutting down\n") + if f.wasModified() { + err := f.dumpDb() + if err != nil { + f.verbosef("%v\n", err) + } + } else { + f.verbosef("Skipping dumping unmodified db\n") + } +} + +// End of public api + +// joinCleanPaths is like filepath.Join but is faster because +// joinCleanPaths doesn't have to support paths ending in "/" or containing ".." +func joinCleanPaths(base string, leaf string) string { + if base == "" { + return leaf + } + if base == "/" { + return base + leaf + } + if leaf == "" { + return base + } + return base + "/" + leaf +} + +func (f *Finder) verbosef(format string, args ...interface{}) { + f.logger.Output(2, fmt.Sprintf(format, args...)) +} + +// loadFromFilesystem populates the in-memory cache based on the contents of the filesystem +func (f *Finder) loadFromFilesystem() { + f.threadPool = newThreadPool(f.numDbLoadingThreads) + + err := f.startFromExternalCache() + if err != nil { + f.startWithoutExternalCache() + } + + startTime := time.Now() + f.verbosef("Waiting for pending requests to complete\n") + f.threadPool.Wait() + f.verbosef("Is idle after %v\n", time.Now().Sub(startTime)) + f.threadPool = nil +} + +func (f *Finder) startFind(path string) { + if !filepath.IsAbs(path) { + path = filepath.Join(f.cacheMetadata.Config.WorkingDirectory, path) + } + node := f.nodes.GetNode(path, true) + f.statDirAsync(node) +} + +func (f *Finder) lock() { + f.mutex.Lock() +} + +func (f *Finder) unlock() { + f.mutex.Unlock() +} + +// a statResponse is the relevant portion of the response from the filesystem to a Stat call +type statResponse struct { + ModTime int64 + Inode uint64 + Device uint64 +} + +// a pathAndStats stores a path and its stats +type pathAndStats struct { + statResponse + + Path string +} + +// a dirFullInfo stores all of the relevant information we know about a directory +type dirFullInfo struct { + pathAndStats + + FileNames []string +} + +// a PersistedDirInfo is the information about a dir that we save to our cache on disk +type PersistedDirInfo struct { + // These field names are short because they are repeated many times in the output json file + P string // path + T int64 // modification time + I uint64 // inode number + F []string // relevant filenames contained +} + +// a PersistedDirs is the information that we persist for a group of dirs +type PersistedDirs struct { + // the device on which each directory is stored + Device uint64 + // the common root path to which all contained dirs are relative + Root string + // the directories themselves + Dirs []PersistedDirInfo +} + +// a CacheEntry is the smallest unit that can be read and parsed from the cache (on disk) at a time +type CacheEntry []PersistedDirs + +// a DirEntries lists the files and directories contained directly within a specific directory +type DirEntries struct { + Path string + + // elements of DirNames are just the dir names; they don't include any '/' character + DirNames []string + // elements of FileNames are just the file names; they don't include '/' character + FileNames []string +} + +// a WalkFunc is the type that is passed into various Find functions for determining which +// directories the caller wishes be walked. The WalkFunc is expected to decide which +// directories to walk and which files to consider as matches to the original query. +type WalkFunc func(DirEntries) (dirs []string, files []string) + +// a mapNode stores the relevant stats about a directory to be stored in a pathMap +type mapNode struct { + statResponse + FileNames []string +} + +// a pathMap implements the directory tree structure of nodes +type pathMap struct { + mapNode + + path string + + children map[string]*pathMap + + // number of descendent nodes, including self + approximateNumDescendents int +} + +func newPathMap(path string) *pathMap { + result := &pathMap{path: path, children: make(map[string]*pathMap, 4), + approximateNumDescendents: 1} + return result +} + +// GetNode returns the node at <path> +func (m *pathMap) GetNode(path string, createIfNotFound bool) *pathMap { + if len(path) > 0 && path[0] == '/' { + path = path[1:] + } + + node := m + for { + if path == "" { + return node + } + + index := strings.Index(path, "/") + var firstComponent string + if index >= 0 { + firstComponent = path[:index] + path = path[index+1:] + } else { + firstComponent = path + path = "" + } + + child, found := node.children[firstComponent] + + if !found { + if createIfNotFound { + child = node.newChild(firstComponent) + } else { + return nil + } + } + + node = child + } +} + +func (m *pathMap) newChild(name string) (child *pathMap) { + path := joinCleanPaths(m.path, name) + newChild := newPathMap(path) + m.children[name] = newChild + + return m.children[name] +} + +func (m *pathMap) UpdateNumDescendents() int { + count := 1 + for _, child := range m.children { + count += child.approximateNumDescendents + } + m.approximateNumDescendents = count + return count +} + +func (m *pathMap) UpdateNumDescendentsRecursive() { + for _, child := range m.children { + child.UpdateNumDescendentsRecursive() + } + m.UpdateNumDescendents() +} + +func (m *pathMap) MergeIn(other *pathMap) { + for key, theirs := range other.children { + ours, found := m.children[key] + if found { + ours.MergeIn(theirs) + } else { + m.children[key] = theirs + } + } + if other.ModTime != 0 { + m.mapNode = other.mapNode + } + m.UpdateNumDescendents() +} + +func (m *pathMap) DumpAll() []dirFullInfo { + results := []dirFullInfo{} + m.dumpInto("", &results) + return results +} + +func (m *pathMap) dumpInto(path string, results *[]dirFullInfo) { + *results = append(*results, + dirFullInfo{ + pathAndStats{statResponse: m.statResponse, Path: path}, + m.FileNames}, + ) + for key, child := range m.children { + childPath := joinCleanPaths(path, key) + if len(childPath) == 0 || childPath[0] != '/' { + childPath = "/" + childPath + } + child.dumpInto(childPath, results) + } +} + +// a semaphore can be locked by up to <capacity> callers at once +type semaphore struct { + pool chan bool +} + +func newSemaphore(capacity int) *semaphore { + return &semaphore{pool: make(chan bool, capacity)} +} + +func (l *semaphore) Lock() { + l.pool <- true +} + +func (l *semaphore) Unlock() { + <-l.pool +} + +// A threadPool runs goroutines and supports throttling and waiting. +// Without throttling, Go may exhaust the maximum number of various resources, such as +// threads or file descriptors, and crash the program. +type threadPool struct { + receivedRequests sync.WaitGroup + activeRequests semaphore +} + +func newThreadPool(maxNumConcurrentThreads int) *threadPool { + return &threadPool{ + receivedRequests: sync.WaitGroup{}, + activeRequests: *newSemaphore(maxNumConcurrentThreads), + } +} + +// Run requests to run the given function in its own goroutine +func (p *threadPool) Run(function func()) { + p.receivedRequests.Add(1) + // If Run() was called from within a goroutine spawned by this threadPool, + // then we may need to return from Run() before having capacity to actually + // run <function>. + // + // It's possible that the body of <function> contains a statement (such as a syscall) + // that will cause Go to pin it to a thread, or will contain a statement that uses + // another resource that is in short supply (such as a file descriptor), so we can't + // actually run <function> until we have capacity. + // + // However, the semaphore used for synchronization is implemented via a channel and + // shouldn't require a new thread for each access. + go func() { + p.activeRequests.Lock() + function() + p.activeRequests.Unlock() + p.receivedRequests.Done() + }() +} + +// Wait waits until all goroutines are done, just like sync.WaitGroup's Wait +func (p *threadPool) Wait() { + p.receivedRequests.Wait() +} + +func (f *Finder) serializeCacheEntry(dirInfos []dirFullInfo) ([]byte, error) { + // group each dirFullInfo by its Device, to avoid having to repeat it in the output + dirsByDevice := map[uint64][]PersistedDirInfo{} + for _, entry := range dirInfos { + _, found := dirsByDevice[entry.Device] + if !found { + dirsByDevice[entry.Device] = []PersistedDirInfo{} + } + dirsByDevice[entry.Device] = append(dirsByDevice[entry.Device], + PersistedDirInfo{P: entry.Path, T: entry.ModTime, I: entry.Inode, F: entry.FileNames}) + } + + cacheEntry := CacheEntry{} + + for device, infos := range dirsByDevice { + // find common prefix + prefix := "" + if len(infos) > 0 { + prefix = infos[0].P + } + for _, info := range infos { + for !strings.HasPrefix(info.P+"/", prefix+"/") { + prefix = filepath.Dir(prefix) + } + } + // remove common prefix + for i := range infos { + suffix := strings.Replace(infos[i].P, prefix, "", 1) + if len(suffix) > 0 && suffix[0] == '/' { + suffix = suffix[1:] + } + infos[i].P = suffix + } + + // turn the map (keyed by device) into a list of structs with labeled fields + // this is to improve readability of the output + cacheEntry = append(cacheEntry, PersistedDirs{Device: device, Root: prefix, Dirs: infos}) + } + + // convert to json. + // it would save some space to use a different format than json for the db file, + // but the space and time savings are small, and json is easy for humans to read + bytes, err := json.Marshal(cacheEntry) + return bytes, err +} + +func (f *Finder) parseCacheEntry(bytes []byte) ([]dirFullInfo, error) { + var cacheEntry CacheEntry + err := json.Unmarshal(bytes, &cacheEntry) + if err != nil { + return nil, err + } + + // convert from a CacheEntry to a []dirFullInfo (by copying a few fields) + capacity := 0 + for _, element := range cacheEntry { + capacity += len(element.Dirs) + } + nodes := make([]dirFullInfo, capacity) + count := 0 + for _, element := range cacheEntry { + for _, dir := range element.Dirs { + path := joinCleanPaths(element.Root, dir.P) + + nodes[count] = dirFullInfo{ + pathAndStats: pathAndStats{ + statResponse: statResponse{ + ModTime: dir.T, Inode: dir.I, Device: element.Device, + }, + Path: path}, + FileNames: dir.F} + count++ + } + } + return nodes, nil +} + +// We use the following separator byte to distinguish individually parseable blocks of json +// because we know this separator won't appear in the json that we're parsing. +// +// The newline byte can only appear in a UTF-8 stream if the newline character appears, because: +// - The newline character is encoded as "0000 1010" in binary ("0a" in hex) +// - UTF-8 dictates that bytes beginning with a "0" bit are never emitted as part of a multibyte +// character. +// +// We know that the newline character will never appear in our json string, because: +// - If a newline character appears as part of a data string, then json encoding will +// emit two characters instead: '\' and 'n'. +// - The json encoder that we use doesn't emit the optional newlines between any of its +// other outputs. +const lineSeparator = byte('\n') + +func (f *Finder) readLine(reader *bufio.Reader) ([]byte, error) { + return reader.ReadBytes(lineSeparator) +} + +// validateCacheHeader reads the cache header from cacheReader and tells whether the cache is compatible with this Finder +func (f *Finder) validateCacheHeader(cacheReader *bufio.Reader) bool { + cacheVersionBytes, err := f.readLine(cacheReader) + if err != nil { + f.verbosef("Failed to read database header; database is invalid\n") + return false + } + if len(cacheVersionBytes) > 0 && cacheVersionBytes[len(cacheVersionBytes)-1] == lineSeparator { + cacheVersionBytes = cacheVersionBytes[:len(cacheVersionBytes)-1] + } + cacheVersionString := string(cacheVersionBytes) + currentVersion := f.cacheMetadata.Version + if cacheVersionString != currentVersion { + f.verbosef("Version changed from %q to %q, database is not applicable\n", cacheVersionString, currentVersion) + return false + } + + cacheParamBytes, err := f.readLine(cacheReader) + if err != nil { + f.verbosef("Failed to read database search params; database is invalid\n") + return false + } + + if len(cacheParamBytes) > 0 && cacheParamBytes[len(cacheParamBytes)-1] == lineSeparator { + cacheParamBytes = cacheParamBytes[:len(cacheParamBytes)-1] + } + + currentParamBytes, err := f.cacheMetadata.Config.Dump() + if err != nil { + panic("Finder failed to serialize its parameters") + } + cacheParamString := string(cacheParamBytes) + currentParamString := string(currentParamBytes) + if cacheParamString != currentParamString { + f.verbosef("Params changed from %q to %q, database is not applicable\n", cacheParamString, currentParamString) + return false + } + return true +} + +// loadBytes compares the cache info in <data> to the state of the filesystem +// loadBytes returns a map representing <data> and also a slice of dirs that need to be re-walked +func (f *Finder) loadBytes(id int, data []byte) (m *pathMap, dirsToWalk []string, err error) { + + helperStartTime := time.Now() + + cachedNodes, err := f.parseCacheEntry(data) + if err != nil { + return nil, nil, fmt.Errorf("Failed to parse block %v: %v\n", id, err.Error()) + } + + unmarshalDate := time.Now() + f.verbosef("Unmarshaled %v objects for %v in %v\n", + len(cachedNodes), id, unmarshalDate.Sub(helperStartTime)) + + tempMap := newPathMap("/") + stats := make([]statResponse, len(cachedNodes)) + + for i, node := range cachedNodes { + // check the file system for an updated timestamp + stats[i] = f.statDirSync(node.Path) + } + + dirsToWalk = []string{} + for i, cachedNode := range cachedNodes { + updated := stats[i] + // save the cached value + container := tempMap.GetNode(cachedNode.Path, true) + container.mapNode = mapNode{statResponse: updated} + + // if the metadata changed and the directory still exists, then + // make a note to walk it later + if !f.isInfoUpToDate(cachedNode.statResponse, updated) && updated.ModTime != 0 { + f.setModified() + // make a note that the directory needs to be walked + dirsToWalk = append(dirsToWalk, cachedNode.Path) + } else { + container.mapNode.FileNames = cachedNode.FileNames + } + } + // count the number of nodes to improve our understanding of the shape of the tree, + // thereby improving parallelism of subsequent searches + tempMap.UpdateNumDescendentsRecursive() + + f.verbosef("Statted inodes of block %v in %v\n", id, time.Now().Sub(unmarshalDate)) + return tempMap, dirsToWalk, nil +} + +// startFromExternalCache loads the cache database from disk +// startFromExternalCache waits to return until the load of the cache db is complete, but +// startFromExternalCache does not wait for all every listDir() or statDir() request to complete +func (f *Finder) startFromExternalCache() (err error) { + startTime := time.Now() + dbPath := f.DbPath + + // open cache file and validate its header + reader, err := f.filesystem.Open(dbPath) + if err != nil { + return errors.New("No data to load from database\n") + } + bufferedReader := bufio.NewReader(reader) + if !f.validateCacheHeader(bufferedReader) { + return errors.New("Cache header does not match") + } + f.verbosef("Database header matches, will attempt to use database %v\n", f.DbPath) + + // read the file and spawn threads to process it + nodesToWalk := [][]*pathMap{} + mainTree := newPathMap("/") + + // read the blocks and stream them into <blockChannel> + type dataBlock struct { + id int + err error + data []byte + } + blockChannel := make(chan dataBlock, f.numDbLoadingThreads) + readBlocks := func() { + index := 0 + for { + // It takes some time to unmarshal the input from json, so we want + // to unmarshal it in parallel. In order to find valid places to + // break the input, we scan for the line separators that we inserted + // (for this purpose) when we dumped the database. + data, err := f.readLine(bufferedReader) + var response dataBlock + done := false + if err != nil && err != io.EOF { + response = dataBlock{id: index, err: err, data: nil} + done = true + } else { + done = (err == io.EOF) + response = dataBlock{id: index, err: nil, data: data} + } + blockChannel <- response + index++ + duration := time.Since(startTime) + f.verbosef("Read block %v after %v\n", index, duration) + if done { + f.verbosef("Read %v blocks in %v\n", index, duration) + close(blockChannel) + return + } + } + } + go readBlocks() + + // Read from <blockChannel> and stream the responses into <resultChannel>. + type workResponse struct { + id int + err error + tree *pathMap + updatedDirs []string + } + resultChannel := make(chan workResponse) + processBlocks := func() { + numProcessed := 0 + threadPool := newThreadPool(f.numDbLoadingThreads) + for { + // get a block to process + block, received := <-blockChannel + if !received { + break + } + + if block.err != nil { + resultChannel <- workResponse{err: block.err} + break + } + numProcessed++ + // wait until there is CPU available to process it + threadPool.Run( + func() { + processStartTime := time.Now() + f.verbosef("Starting to process block %v after %v\n", + block.id, processStartTime.Sub(startTime)) + tempMap, updatedDirs, err := f.loadBytes(block.id, block.data) + var response workResponse + if err != nil { + f.verbosef( + "Block %v failed to parse with error %v\n", + block.id, err) + response = workResponse{err: err} + } else { + response = workResponse{ + id: block.id, + err: nil, + tree: tempMap, + updatedDirs: updatedDirs, + } + } + f.verbosef("Processed block %v in %v\n", + block.id, time.Since(processStartTime), + ) + resultChannel <- response + }, + ) + } + threadPool.Wait() + f.verbosef("Finished processing %v blocks in %v\n", + numProcessed, time.Since(startTime)) + close(resultChannel) + } + go processBlocks() + + // Read from <resultChannel> and use the results + combineResults := func() (err error) { + for { + result, received := <-resultChannel + if !received { + break + } + if err != nil { + // In case of an error, wait for work to complete before + // returning the error. This ensures that any subsequent + // work doesn't need to compete for resources (and possibly + // fail due to, for example, a filesystem limit on the number of + // concurrently open files) with past work. + continue + } + if result.err != nil { + err = result.err + continue + } + // update main tree + mainTree.MergeIn(result.tree) + // record any new directories that we will need to Stat() + updatedNodes := make([]*pathMap, len(result.updatedDirs)) + for j, dir := range result.updatedDirs { + node := mainTree.GetNode(dir, false) + updatedNodes[j] = node + } + nodesToWalk = append(nodesToWalk, updatedNodes) + } + return err + } + err = combineResults() + if err != nil { + return err + } + + f.nodes = *mainTree + + // after having loaded the entire db and therefore created entries for + // the directories we know of, now it's safe to start calling ReadDir on + // any updated directories + for i := range nodesToWalk { + f.listDirsAsync(nodesToWalk[i]) + } + f.verbosef("Loaded db and statted its contents in %v\n", time.Since(startTime)) + return err +} + +// startWithoutExternalCache starts scanning the filesystem according to the cache config +// startWithoutExternalCache should be called if startFromExternalCache is not applicable +func (f *Finder) startWithoutExternalCache() { + configDirs := f.cacheMetadata.Config.RootDirs + + // clean paths + candidates := make([]string, len(configDirs)) + for i, dir := range configDirs { + candidates[i] = filepath.Clean(dir) + } + // remove duplicates + dirsToScan := make([]string, 0, len(configDirs)) + for _, candidate := range candidates { + include := true + for _, included := range dirsToScan { + if included == "/" || strings.HasPrefix(candidate+"/", included+"/") { + include = false + break + } + } + if include { + dirsToScan = append(dirsToScan, candidate) + } + } + + // start searching finally + for _, path := range dirsToScan { + f.verbosef("Starting find of %v\n", path) + f.startFind(path) + } +} + +// isInfoUpToDate tells whether <new> can confirm that results computed at <old> are still valid +func (f *Finder) isInfoUpToDate(old statResponse, new statResponse) (equal bool) { + if old.Inode != new.Inode { + return false + } + if old.ModTime != new.ModTime { + return false + } + if old.Device != new.Device { + return false + } + return true +} + +func (f *Finder) wasModified() bool { + return atomic.LoadInt32(&f.modifiedFlag) > 0 +} + +func (f *Finder) setModified() { + var newVal int32 + newVal = 1 + atomic.StoreInt32(&f.modifiedFlag, newVal) +} + +// sortedDirEntries exports directory entries to facilitate dumping them to the external cache +func (f *Finder) sortedDirEntries() []dirFullInfo { + startTime := time.Now() + nodes := make([]dirFullInfo, 0) + for _, node := range f.nodes.DumpAll() { + if node.ModTime != 0 { + nodes = append(nodes, node) + } + } + discoveryDate := time.Now() + f.verbosef("Generated %v cache entries in %v\n", len(nodes), discoveryDate.Sub(startTime)) + less := func(i int, j int) bool { + return nodes[i].Path < nodes[j].Path + } + sort.Slice(nodes, less) + sortDate := time.Now() + f.verbosef("Sorted %v cache entries in %v\n", len(nodes), sortDate.Sub(discoveryDate)) + + return nodes +} + +// serializeDb converts the cache database into a form to save to disk +func (f *Finder) serializeDb() ([]byte, error) { + // sort dir entries + var entryList = f.sortedDirEntries() + + // Generate an output file that can be conveniently loaded using the same number of threads + // as were used in this execution (because presumably that will be the number of threads + // used in the next execution too) + + // generate header + header := []byte{} + header = append(header, []byte(f.cacheMetadata.Version)...) + header = append(header, lineSeparator) + configDump, err := f.cacheMetadata.Config.Dump() + if err != nil { + return nil, err + } + header = append(header, configDump...) + + // serialize individual blocks in parallel + numBlocks := f.numDbLoadingThreads + if numBlocks > len(entryList) { + numBlocks = len(entryList) + } + blocks := make([][]byte, 1+numBlocks) + blocks[0] = header + blockMin := 0 + wg := sync.WaitGroup{} + var errLock sync.Mutex + + for i := 1; i <= numBlocks; i++ { + // identify next block + blockMax := len(entryList) * i / numBlocks + block := entryList[blockMin:blockMax] + + // process block + wg.Add(1) + go func(index int, block []dirFullInfo) { + byteBlock, subErr := f.serializeCacheEntry(block) + f.verbosef("Serialized block %v into %v bytes\n", index, len(byteBlock)) + if subErr != nil { + f.verbosef("%v\n", subErr.Error()) + errLock.Lock() + err = subErr + errLock.Unlock() + } else { + blocks[index] = byteBlock + } + wg.Done() + }(i, block) + + blockMin = blockMax + } + + wg.Wait() + + if err != nil { + return nil, err + } + + content := bytes.Join(blocks, []byte{lineSeparator}) + + return content, nil +} + +// dumpDb saves the cache database to disk +func (f *Finder) dumpDb() error { + startTime := time.Now() + f.verbosef("Dumping db\n") + + tempPath := f.DbPath + ".tmp" + + bytes, err := f.serializeDb() + if err != nil { + return err + } + serializeDate := time.Now() + f.verbosef("Serialized db in %v\n", serializeDate.Sub(startTime)) + // dump file and atomically move + err = f.filesystem.WriteFile(tempPath, bytes, 0777) + if err != nil { + return err + } + err = f.filesystem.Rename(tempPath, f.DbPath) + if err != nil { + return err + } + + f.verbosef("Wrote db in %v\n", time.Now().Sub(serializeDate)) + return nil +} + +func (f *Finder) statDirAsync(dir *pathMap) { + node := dir + path := dir.path + f.threadPool.Run( + func() { + updatedStats := f.statDirSync(path) + + if !f.isInfoUpToDate(node.statResponse, updatedStats) { + node.mapNode = mapNode{ + statResponse: updatedStats, + FileNames: []string{}, + } + f.setModified() + if node.statResponse.ModTime != 0 { + // modification time was updated, so re-scan for + // child directories + f.listDirAsync(dir) + } + } + }, + ) +} + +func (f *Finder) statDirSync(path string) statResponse { + + fileInfo, err := f.filesystem.Lstat(path) + + var stats statResponse + if err != nil { + // in case of a failure to stat the directory, treat the directory as missing (modTime = 0) + return stats + } + modTime := fileInfo.ModTime() + stats = statResponse{} + inode, err := f.filesystem.InodeNumber(fileInfo) + if err != nil { + panic(fmt.Sprintf("Could not get inode number of %v: %v\n", path, err.Error())) + } + stats.Inode = inode + device, err := f.filesystem.DeviceNumber(fileInfo) + if err != nil { + panic(fmt.Sprintf("Could not get device number of %v: %v\n", path, err.Error())) + } + stats.Device = device + permissionsChangeTime, err := f.filesystem.PermTime(fileInfo) + + if err != nil { + panic(fmt.Sprintf("Could not get permissions modification time (CTime) of %v: %v\n", path, err.Error())) + } + // We're only interested in knowing whether anything about the directory + // has changed since last check, so we use the latest of the two + // modification times (content modification (mtime) and + // permission modification (ctime)) + if permissionsChangeTime.After(modTime) { + modTime = permissionsChangeTime + } + stats.ModTime = modTime.UnixNano() + + return stats +} + +// pruneCacheCandidates removes the items that we don't want to include in our persistent cache +func (f *Finder) pruneCacheCandidates(items *DirEntries) { + + for _, fileName := range items.FileNames { + for _, abortedName := range f.cacheMetadata.Config.PruneFiles { + if fileName == abortedName { + items.FileNames = []string{} + items.DirNames = []string{} + return + } + } + } + + // remove any files that aren't the ones we want to include + writeIndex := 0 + for _, fileName := range items.FileNames { + // include only these files + for _, includedName := range f.cacheMetadata.Config.IncludeFiles { + if fileName == includedName { + items.FileNames[writeIndex] = fileName + writeIndex++ + break + } + } + } + // resize + items.FileNames = items.FileNames[:writeIndex] + + writeIndex = 0 + for _, dirName := range items.DirNames { + items.DirNames[writeIndex] = dirName + // ignore other dirs that are known to not be inputs to the build process + include := true + for _, excludedName := range f.cacheMetadata.Config.ExcludeDirs { + if dirName == excludedName { + // don't include + include = false + break + } + } + if include { + writeIndex++ + } + } + // resize + items.DirNames = items.DirNames[:writeIndex] +} + +func (f *Finder) listDirsAsync(nodes []*pathMap) { + f.threadPool.Run( + func() { + for i := range nodes { + f.listDirSync(nodes[i]) + } + }, + ) +} + +func (f *Finder) listDirAsync(node *pathMap) { + f.threadPool.Run( + func() { + f.listDirSync(node) + }, + ) +} + +func (f *Finder) listDirSync(dir *pathMap) { + path := dir.path + children, err := f.filesystem.ReadDir(path) + + if err != nil { + // if listing the contents of the directory fails (presumably due to + // permission denied), then treat the directory as empty + children = []os.FileInfo{} + } + + var subdirs []string + var subfiles []string + + for _, child := range children { + linkBits := child.Mode() & os.ModeSymlink + isLink := linkBits != 0 + if child.IsDir() { + if !isLink { + // Skip symlink dirs. + // We don't have to support symlink dirs because + // that would cause duplicates. + subdirs = append(subdirs, child.Name()) + } + } else { + // We do have to support symlink files because the link name might be + // different than the target name + // (for example, Android.bp -> build/soong/root.bp) + subfiles = append(subfiles, child.Name()) + } + + } + parentNode := dir + + entry := &DirEntries{Path: path, DirNames: subdirs, FileNames: subfiles} + f.pruneCacheCandidates(entry) + + // create a pathMap node for each relevant subdirectory + relevantChildren := map[string]*pathMap{} + for _, subdirName := range entry.DirNames { + childNode, found := parentNode.children[subdirName] + // if we already knew of this directory, then we already have a request pending to Stat it + // if we didn't already know of this directory, then we must Stat it now + if !found { + childNode = parentNode.newChild(subdirName) + f.statDirAsync(childNode) + } + relevantChildren[subdirName] = childNode + } + // Note that in rare cases, it's possible that we're reducing the set of + // children via this statement, if these are all true: + // 1. we previously had a cache that knew about subdirectories of parentNode + // 2. the user created a prune-file (described in pruneCacheCandidates) + // inside <parentNode>, which specifies that the contents of parentNode + // are to be ignored. + // The fact that it's possible to remove children here means that *pathMap structs + // must not be looked up from f.nodes by filepath (and instead must be accessed by + // direct pointer) until after every listDirSync completes + parentNode.FileNames = entry.FileNames + parentNode.children = relevantChildren + +} + +// listMatches takes a node and a function that specifies which subdirectories and +// files to include, and listMatches returns the matches +func (f *Finder) listMatches(node *pathMap, + filter WalkFunc) (subDirs []*pathMap, filePaths []string) { + entries := DirEntries{ + FileNames: node.FileNames, + } + entries.DirNames = make([]string, 0, len(node.children)) + for childName := range node.children { + entries.DirNames = append(entries.DirNames, childName) + } + + dirNames, fileNames := filter(entries) + + subDirs = []*pathMap{} + filePaths = make([]string, 0, len(fileNames)) + for _, fileName := range fileNames { + filePaths = append(filePaths, joinCleanPaths(node.path, fileName)) + } + subDirs = make([]*pathMap, 0, len(dirNames)) + for _, childName := range dirNames { + child, ok := node.children[childName] + if ok { + subDirs = append(subDirs, child) + } + } + + return subDirs, filePaths +} + +// findInCacheMultithreaded spawns potentially multiple goroutines with which to search the cache. +func (f *Finder) findInCacheMultithreaded(node *pathMap, filter WalkFunc, + approxNumThreads int) []string { + + if approxNumThreads < 2 { + // Done spawning threads; process remaining directories + return f.findInCacheSinglethreaded(node, filter) + } + + totalWork := 0 + for _, child := range node.children { + totalWork += child.approximateNumDescendents + } + childrenResults := make(chan []string, len(node.children)) + + subDirs, filePaths := f.listMatches(node, filter) + + // process child directories + for _, child := range subDirs { + numChildThreads := approxNumThreads * child.approximateNumDescendents / totalWork + childProcessor := func(child *pathMap) { + childResults := f.findInCacheMultithreaded(child, filter, numChildThreads) + childrenResults <- childResults + } + // If we're allowed to use more than 1 thread to process this directory, + // then instead we use 1 thread for each subdirectory. + // It would be strange to spawn threads for only some subdirectories. + go childProcessor(child) + } + + // collect results + for i := 0; i < len(subDirs); i++ { + childResults := <-childrenResults + filePaths = append(filePaths, childResults...) + } + close(childrenResults) + + return filePaths +} + +// findInCacheSinglethreaded synchronously searches the cache for all matching file paths +// note findInCacheSinglethreaded runs 2X to 4X as fast by being iterative rather than recursive +func (f *Finder) findInCacheSinglethreaded(node *pathMap, filter WalkFunc) []string { + if node == nil { + return []string{} + } + + nodes := []*pathMap{node} + matches := []string{} + + for len(nodes) > 0 { + currentNode := nodes[0] + nodes = nodes[1:] + + subDirs, filePaths := f.listMatches(currentNode, filter) + + nodes = append(nodes, subDirs...) + + matches = append(matches, filePaths...) + } + return matches +} |