diff options
Diffstat (limited to 'src/prefetcher/prefetcher_daemon.cc')
-rw-r--r-- | src/prefetcher/prefetcher_daemon.cc | 1367 |
1 files changed, 1367 insertions, 0 deletions
diff --git a/src/prefetcher/prefetcher_daemon.cc b/src/prefetcher/prefetcher_daemon.cc new file mode 100644 index 0000000..f4b9087 --- /dev/null +++ b/src/prefetcher/prefetcher_daemon.cc @@ -0,0 +1,1367 @@ +// Copyright (C) 2019 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "prefetcher/minijail.h" +#include "common/cmd_utils.h" +#include "prefetcher/prefetcher_daemon.h" +#include "prefetcher/session_manager.h" +#include "prefetcher/session.h" + +#include <android-base/logging.h> +#include <android-base/properties.h> + +#include <deque> +#include <iomanip> +#include <string> +#include <sstream> +#include <vector> + +#include <fcntl.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <sys/wait.h> +#include <sys/un.h> +#include <unistd.h> + +namespace iorap::prefetcher { + +// Gate super-spammy IPC logging behind a property. +// This is beyond merely annoying, enabling this logging causes prefetching to be about 1000x slower. +static bool LogVerboseIpc() { + static bool initialized = false; + static bool verbose_ipc; + + if (initialized == false) { + initialized = true; + + verbose_ipc = + ::android::base::GetBoolProperty("iorapd.readahead.verbose_ipc", /*default*/false); + } + + return verbose_ipc; +} + +static const bool kInstallMiniJail = + ::android::base::GetBoolProperty("iorapd.readahead.minijail", /*default*/true); + +static constexpr const char kCommandFileName[] = "/system/bin/iorap.prefetcherd"; + +static constexpr size_t kPipeBufferSize = 1024 * 1024; // matches /proc/sys/fs/pipe-max-size + +using ArgString = const char*; + +std::ostream& operator<<(std::ostream& os, ReadAheadKind ps) { + switch (ps) { + case ReadAheadKind::kFadvise: + os << "fadvise"; + break; + case ReadAheadKind::kMmapLocked: + os << "mmap"; + break; + case ReadAheadKind::kMlock: + os << "mlock"; + break; + default: + os << "<invalid>"; + } + return os; +} + +std::ostream& operator<<(std::ostream& os, CommandChoice choice) { + switch (choice) { + case CommandChoice::kRegisterFilePath: + os << "kRegisterFilePath"; + break; + case CommandChoice::kUnregisterFilePath: + os << "kUnregisterFilePath"; + break; + case CommandChoice::kReadAhead: + os << "kReadAhead"; + break; + case CommandChoice::kExit: + os << "kExit"; + break; + case CommandChoice::kCreateSession: + os << "kCreateSession"; + break; + case CommandChoice::kDestroySession: + os << "kDestroySession"; + break; + case CommandChoice::kDumpSession: + os << "kDumpSession"; + break; + case CommandChoice::kDumpEverything: + os << "kDumpEverything"; + break; + case CommandChoice::kCreateFdSession: + os << "kCreateFdSession"; + break; + default: + CHECK(false) << "forgot to handle this choice"; + break; + } + return os; +} + +std::ostream& operator<<(std::ostream& os, const Command& command) { + os << "Command{"; + os << "choice=" << command.choice << ","; + + bool has_session_id = true; + bool has_id = true; + switch (command.choice) { + case CommandChoice::kDumpEverything: + case CommandChoice::kExit: + has_session_id = false; + FALLTHROUGH_INTENDED; + case CommandChoice::kCreateFdSession: + case CommandChoice::kCreateSession: + case CommandChoice::kDestroySession: + case CommandChoice::kDumpSession: + has_id = false; + break; + default: + break; + } + + if (has_session_id) { + os << "sid=" << command.session_id << ","; + } + + if (has_id) { + os << "id=" << command.id << ","; + } + + switch (command.choice) { + case CommandChoice::kRegisterFilePath: + os << "file_path="; + + if (command.file_path) { + os << *(command.file_path); + } else { + os << "(nullopt)"; + } + break; + case CommandChoice::kUnregisterFilePath: + break; + case CommandChoice::kReadAhead: + os << "read_ahead_kind=" << command.read_ahead_kind << ","; + os << "length=" << command.length << ","; + os << "offset=" << command.offset << ","; + break; + case CommandChoice::kExit: + break; + case CommandChoice::kCreateFdSession: + os << "fd="; + if (command.fd.has_value()) { + os << command.fd.value(); + } else { + os << "(nullopt)"; + } + os << ","; + FALLTHROUGH_INTENDED; + case CommandChoice::kCreateSession: + os << "description="; + if (command.file_path) { + os << "'" << *(command.file_path) << "'"; + } else { + os << "(nullopt)"; + } + break; + case CommandChoice::kDestroySession: + break; + case CommandChoice::kDumpSession: + break; + case CommandChoice::kDumpEverything: + break; + default: + CHECK(false) << "forgot to handle this choice"; + break; + } + + os << "}"; + + return os; +} + +template <typename T> +struct ParseResult { + T value; + char* next_token; + size_t stream_size; + + ParseResult() : value{}, next_token{nullptr}, stream_size{} { + } + + constexpr operator bool() const { + return next_token != nullptr; + } +}; + +// Very spammy: Keep it off by default. Set to true if changing this code. +static constexpr bool kDebugParsingRead = false; + +#define DEBUG_PREAD if (kDebugParsingRead) LOG(VERBOSE) << "ParsingRead " + + + +// Parse a strong type T from a buffer stream. +// If there's insufficient space left to parse the value, an empty ParseResult is returned. +template <typename T> +ParseResult<T> ParsingRead(char* stream, size_t stream_size) { + if (stream == nullptr) { + DEBUG_PREAD << "stream was null"; + return {}; + } + + if constexpr (std::is_same_v<T, std::string>) { + ParseResult<uint32_t> length = ParsingRead<uint32_t>(stream, stream_size); + + if (!length) { + DEBUG_PREAD << "could not find length"; + // Not enough bytes left? + return {}; + } + + ParseResult<std::string> string_result; + string_result.value.reserve(length); + + stream = length.next_token; + stream_size = length.stream_size; + + for (size_t i = 0; i < length.value; ++i) { + ParseResult<char> char_result = ParsingRead<char>(stream, stream_size); + + stream = char_result.next_token; + stream_size = char_result.stream_size; + + if (!char_result) { + DEBUG_PREAD << "too few chars in stream, expected length: " << length.value; + // Not enough bytes left? + return {}; + } + + string_result.value += char_result.value; + + DEBUG_PREAD << "string preliminary is : " << string_result.value; + } + + DEBUG_PREAD << "parsed string to: " << string_result.value; + string_result.next_token = stream; + return string_result; + } else { + if (sizeof(T) > stream_size) { + return {}; + } + + ParseResult<T> result; + result.next_token = stream + sizeof(T); + result.stream_size = stream_size - sizeof(T); + + memcpy(&result.value, stream, sizeof(T)); + + return result; + } +} + +// Convenience overload to chain multiple ParsingRead together. +template <typename T, typename U> +ParseResult<T> ParsingRead(ParseResult<U> result) { + return ParsingRead<T>(result.next_token, result.stream_size); +} + +class CommandParser { + public: + CommandParser(PrefetcherForkParameters params) { + params_ = params; + } + + std::vector<Command> ParseSocketCommands(bool& eof) { + eof = false; + + std::vector<Command> commands_vec; + + std::vector<char> buf_vector; + buf_vector.resize(1024*1024); // 1MB. + char* buf = &buf_vector[0]; + + // Binary only parsing. The higher level code can parse text + // with ifstream if it really wants to. + char* stream = &buf[0]; + size_t stream_size = buf_vector.size(); + + while (true) { + if (stream_size == 0) { + // TODO: reply with an overflow command. + LOG(WARNING) << "prefetcher_daemon command overflow, dropping all commands."; + stream = &buf[0]; + stream_size = buf_vector.size(); + memset(&buf[0], /*c*/0, buf_vector.size()); + } + + if (LogVerboseIpc()) { + LOG(VERBOSE) << "PrefetcherDaemon block recvmsg for commands (fd=" << params_.input_fd << ")"; + } + + ssize_t count; + struct msghdr hdr; + memset(&hdr, 0, sizeof(hdr)); + + { + union { + struct cmsghdr cmh; + char control[CMSG_SPACE(sizeof(int))]; + } control_un; + memset(&control_un, 0, sizeof(control_un)); + + /* Set 'control_un' to describe ancillary data that we want to receive */ + control_un.cmh.cmsg_len = CMSG_LEN(sizeof(int)); /* fd is sizeof(int) */ + control_un.cmh.cmsg_level = SOL_SOCKET; + control_un.cmh.cmsg_type = SCM_CREDENTIALS; + + // the regular message data will be read into stream + struct iovec iov; + memset(&iov, 0, sizeof(iov)); + iov.iov_base = stream; + iov.iov_len = stream_size; + + /* Set hdr fields to describe 'control_un' */ + hdr.msg_control = control_un.control; + hdr.msg_controllen = sizeof(control_un.control); + hdr.msg_iov = &iov; + hdr.msg_iovlen = 1; + hdr.msg_name = nullptr; /* no peer address */ + hdr.msg_namelen = 0; + + count = TEMP_FAILURE_RETRY(recvmsg(params_.input_fd, &hdr, /*flags*/0)); + } + + if (LogVerboseIpc()) { + LOG(VERBOSE) << "PrefetcherDaemon recvmsg " << count << " for stream size:" << stream_size; + } + + if (count < 0) { + PLOG(ERROR) << "failed to recvmsg from input fd"; + break; + // TODO: let the daemon be restarted by higher level code? + } else if (count == 0) { + LOG(WARNING) << "prefetcher_daemon input_fd end-of-file; terminating"; + eof = true; + break; + // TODO: let the daemon be restarted by higher level code? + } + + { + /* Extract fd from ancillary data if present */ + struct cmsghdr* hp; + hp = CMSG_FIRSTHDR(&hdr); + if (hp && + // FIXME: hp->cmsg_len returns an absurdly large value. is it overflowing? + // (hp->cmsg_len == CMSG_LEN(sizeof(int))) && + (hp->cmsg_level == SOL_SOCKET) && + (hp->cmsg_type == SCM_RIGHTS)) { + + int passed_fd = *(int*) CMSG_DATA(hp); + if (LogVerboseIpc()) { + LOG(VERBOSE) << "PrefetcherDaemon received FD " << passed_fd; + } + + // tack the FD into our dequeue. + // we assume the FDs are sent in-order same as the regular iov are sent in-order. + longbuf_fds_.insert(longbuf_fds_.end(), passed_fd); + } else if (hp != nullptr) { + if (LogVerboseIpc()) { + LOG(VERBOSE) << "PrefetcherDaemon::read got CMSG but it wasn't matching SCM_RIGHTS," + << "cmsg_len=" << hp->cmsg_len << "," + << "cmsg_level=" << hp->cmsg_level << "," + << "cmsg_type=" << hp->cmsg_type; + } + } + } + + longbuf_.insert(longbuf_.end(), stream, stream + count); + if (LogVerboseIpc()) { + LOG(VERBOSE) << "PrefetcherDaemon updated longbuf size: " << longbuf_.size(); + } + + // reconstruct a stream of [iov_Command chdr_fd?]* back into [Command]* + { + if (longbuf_.size() == 0) { + break; + } + + std::vector<char> v(longbuf_.begin(), + longbuf_.end()); + + std::vector<int> v_fds{longbuf_fds_.begin(), longbuf_fds_.end()}; + + if (LogVerboseIpc()) { + LOG(VERBOSE) << "PrefetcherDaemon longbuf_ size: " << v.size(); + if (WOULD_LOG(VERBOSE)) { + std::stringstream dump; + dump << std::hex << std::setfill('0'); + for (size_t i = 0; i < v.size(); ++i) { + dump << std::setw(2) << static_cast<unsigned>(v[i]); + } + + LOG(VERBOSE) << "PrefetcherDaemon longbuf_ dump: " << dump.str(); + } + LOG(VERBOSE) << "PrefetcherDaemon longbuf_fds_ size: " << v_fds.size(); + if (WOULD_LOG(VERBOSE)) { + std::stringstream dump; + for (size_t i = 0; i < v_fds.size(); ++i) { + dump << v_fds[i] << ", "; + } + + LOG(VERBOSE) << "PrefetcherDaemon longbuf_fds_ dump: " << dump.str(); + } + + } + + size_t v_fds_off = 0; + size_t consumed_fds_total = 0; + + size_t v_off = 0; + size_t consumed_bytes = std::numeric_limits<size_t>::max(); + size_t consumed_total = 0; + + while (true) { + std::optional<Command> maybe_command; + maybe_command = Command::Read(&v[v_off], v.size() - v_off, &consumed_bytes); + consumed_total += consumed_bytes; + // Normal every time we get to the end of a buffer. + if (!maybe_command) { + if (LogVerboseIpc()) { + LOG(VERBOSE) << "failed to read command, v_off=" << v_off << ",v_size:" << v.size(); + } + break; + } + + if (maybe_command->RequiresFd()) { + if (v_fds_off < v_fds.size()) { + maybe_command->fd = v_fds[v_fds_off++]; + consumed_fds_total++; + if (LogVerboseIpc()) { + LOG(VERBOSE) << "Append the FD to " << *maybe_command; + } + } else { + LOG(WARNING) << "Failed to acquire FD for " << *maybe_command; + } + } + + // in the next pass ignore what we already consumed. + v_off += consumed_bytes; + + // true as long we don't hit the 'break' above. + DCHECK_EQ(v_off, consumed_total); + if (LogVerboseIpc()) { + LOG(VERBOSE) << "success to read command, v_off=" << v_off << ",v_size:" << v.size() + << "," << *maybe_command; + + // Pretty-print a single command for debugging/testing. + LOG(VERBOSE) << *maybe_command; + } + + // add to the commands we parsed. + commands_vec.push_back(*maybe_command); + } + + // erase however many were consumed + longbuf_.erase(longbuf_.begin(), longbuf_.begin() + consumed_total); + + // erase however many FDs were consumed. + longbuf_fds_.erase(longbuf_fds_.begin(), longbuf_fds_.begin() + consumed_fds_total); + } + break; + } + + return commands_vec; + } + + std::vector<Command> ParseCommands(bool& eof) { + eof = false; + + std::vector<Command> commands_vec; + + std::vector<char> buf_vector; + buf_vector.resize(kPipeBufferSize); + char* buf = &buf_vector[0]; + + // Binary only parsing. The higher level code can parse text + // with ifstream if it really wants to. + char* stream = &buf[0]; + size_t stream_size = buf_vector.size(); + + while (true) { + if (stream_size == 0) { + // TODO: reply with an overflow command. + LOG(WARNING) << "prefetcher_daemon command overflow, dropping all commands."; + stream = &buf[0]; + stream_size = buf_vector.size(); + memset(&buf[0], /*c*/0, buf_vector.size()); + } + + if (LogVerboseIpc()) { + LOG(VERBOSE) << "PrefetcherDaemon block read for commands (fd=" << params_.input_fd << ")"; + } + ssize_t count = TEMP_FAILURE_RETRY(read(params_.input_fd, stream, stream_size)); + if (LogVerboseIpc()) { + LOG(VERBOSE) << "PrefetcherDaemon::read " << count << " for stream size:" << stream_size; + } + + if (count < 0) { + PLOG(ERROR) << "failed to read from input fd"; + break; + // TODO: let the daemon be restarted by higher level code? + } else if (count == 0) { + LOG(WARNING) << "prefetcher_daemon input_fd end-of-file; terminating"; + eof = true; + break; + // TODO: let the daemon be restarted by higher level code? + } + + longbuf_.insert(longbuf_.end(), stream, stream + count); + if (LogVerboseIpc()) { + LOG(VERBOSE) << "PrefetcherDaemon updated longbuf size: " << longbuf_.size(); + } + + std::optional<Command> maybe_command; + { + if (longbuf_.size() == 0) { + break; + } + + std::vector<char> v(longbuf_.begin(), + longbuf_.end()); + + if (LogVerboseIpc()) { + LOG(VERBOSE) << "PrefetcherDaemon longbuf_ size: " << v.size(); + if (WOULD_LOG(VERBOSE)) { + std::stringstream dump; + dump << std::hex << std::setfill('0'); + for (size_t i = 0; i < v.size(); ++i) { + dump << std::setw(2) << static_cast<unsigned>(v[i]); + } + + LOG(VERBOSE) << "PrefetcherDaemon longbuf_ dump: " << dump.str(); + } + } + + size_t v_off = 0; + size_t consumed_bytes = std::numeric_limits<size_t>::max(); + size_t consumed_total = 0; + + while (true) { + maybe_command = Command::Read(&v[v_off], v.size() - v_off, &consumed_bytes); + consumed_total += consumed_bytes; + // Normal every time we get to the end of a buffer. + if (!maybe_command) { + if (LogVerboseIpc()) { + LOG(VERBOSE) << "failed to read command, v_off=" << v_off << ",v_size:" << v.size(); + } + break; + } + + // in the next pass ignore what we already consumed. + v_off += consumed_bytes; + + // true as long we don't hit the 'break' above. + DCHECK_EQ(v_off, consumed_total); + if (LogVerboseIpc()) { + LOG(VERBOSE) << "success to read command, v_off=" << v_off << ",v_size:" << v.size() + << "," << *maybe_command; + + // Pretty-print a single command for debugging/testing. + LOG(VERBOSE) << *maybe_command; + } + + // add to the commands we parsed. + commands_vec.push_back(*maybe_command); + } + + // erase however many were consumed + longbuf_.erase(longbuf_.begin(), longbuf_.begin() + consumed_total); + } + break; + } + + return commands_vec; + } + + private: + bool IsTextMode() const { + return params_.format_text; + } + + PrefetcherForkParameters params_; + + // A buffer long enough to contain a lot of buffers. + // This handles reads that only contain a partial command. + std::deque<char> longbuf_; + + // File descriptor buffers. + std::deque<int> longbuf_fds_; +}; + +static constexpr bool kDebugCommandRead = true; + +#define DEBUG_READ if (kDebugCommandRead) LOG(VERBOSE) << "Command::Read " + +std::optional<Command> Command::Read(char* buf, size_t buf_size, /*out*/size_t* consumed_bytes) { + *consumed_bytes = 0; + if (buf == nullptr) { + return std::nullopt; + } + + Command cmd{}; // zero-initialize any unused fields + ParseResult<CommandChoice> parsed_choice = ParsingRead<CommandChoice>(buf, buf_size); + cmd.choice = parsed_choice.value; + + if (!parsed_choice) { + DEBUG_READ << "no choice"; + return std::nullopt; + } + + switch (parsed_choice.value) { + case CommandChoice::kRegisterFilePath: { + ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice); + if (!parsed_session_id) { + DEBUG_READ << "no parsed session id"; + return std::nullopt; + } + + ParseResult<uint32_t> parsed_id = ParsingRead<uint32_t>(parsed_session_id); + if (!parsed_id) { + DEBUG_READ << "no parsed id"; + return std::nullopt; + } + + ParseResult<std::string> parsed_file_path = ParsingRead<std::string>(parsed_id); + + if (!parsed_file_path) { + DEBUG_READ << "no file path"; + return std::nullopt; + } + *consumed_bytes = parsed_file_path.next_token - buf; + + cmd.session_id = parsed_session_id.value; + cmd.id = parsed_id.value; + cmd.file_path = parsed_file_path.value; + + break; + } + case CommandChoice::kUnregisterFilePath: { + ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice); + if (!parsed_session_id) { + DEBUG_READ << "no parsed session id"; + return std::nullopt; + } + + ParseResult<uint32_t> parsed_id = ParsingRead<uint32_t>(parsed_session_id); + if (!parsed_id) { + DEBUG_READ << "no parsed id"; + return std::nullopt; + } + *consumed_bytes = parsed_id.next_token - buf; + + cmd.session_id = parsed_session_id.value; + cmd.id = parsed_id.value; + + break; + } + case CommandChoice::kReadAhead: { + ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice); + if (!parsed_session_id) { + DEBUG_READ << "no parsed session id"; + return std::nullopt; + } + + ParseResult<uint32_t> parsed_id = ParsingRead<uint32_t>(parsed_session_id); + if (!parsed_id) { + DEBUG_READ << "no parsed id"; + return std::nullopt; + } + + ParseResult<ReadAheadKind> parsed_kind = ParsingRead<ReadAheadKind>(parsed_id); + if (!parsed_kind) { + DEBUG_READ << "no parsed kind"; + return std::nullopt; + } + ParseResult<uint64_t> parsed_length = ParsingRead<uint64_t>(parsed_kind); + if (!parsed_length) { + DEBUG_READ << "no parsed length"; + return std::nullopt; + } + ParseResult<uint64_t> parsed_offset = ParsingRead<uint64_t>(parsed_length); + if (!parsed_offset) { + DEBUG_READ << "no parsed offset"; + return std::nullopt; + } + *consumed_bytes = parsed_offset.next_token - buf; + + cmd.session_id = parsed_session_id.value; + cmd.id = parsed_id.value; + cmd.read_ahead_kind = parsed_kind.value; + cmd.length = parsed_length.value; + cmd.offset = parsed_offset.value; + + break; + } + case CommandChoice::kCreateSession: + case CommandChoice::kCreateFdSession: { + ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice); + if (!parsed_session_id) { + DEBUG_READ << "no parsed session id"; + return std::nullopt; + } + + ParseResult<std::string> parsed_description = ParsingRead<std::string>(parsed_session_id); + + if (!parsed_description) { + DEBUG_READ << "no description"; + return std::nullopt; + } + *consumed_bytes = parsed_description.next_token - buf; + + cmd.session_id = parsed_session_id.value; + cmd.file_path = parsed_description.value; + + break; + } + case CommandChoice::kDestroySession: + case CommandChoice::kDumpSession: { + ParseResult<uint32_t> parsed_session_id = ParsingRead<uint32_t>(parsed_choice); + if (!parsed_session_id) { + DEBUG_READ << "no parsed session id"; + return std::nullopt; + } + + *consumed_bytes = parsed_session_id.next_token - buf; + + cmd.session_id = parsed_session_id.value; + + break; + } + case CommandChoice::kExit: + case CommandChoice::kDumpEverything: + *consumed_bytes = parsed_choice.next_token - buf; + // Only need to parse the choice. + break; + default: + LOG(FATAL) << "unrecognized command number " << static_cast<uint32_t>(parsed_choice.value); + break; + } + + return cmd; +} + +bool Command::Write(char* buf, size_t buf_size, /*out*/size_t* produced_bytes) const { + *produced_bytes = 0; + if (buf == nullptr) { + LOG(WARNING) << "null buf, is this expected?"; + return false; + } + + bool has_enough_space = false; + size_t space_requirement = std::numeric_limits<size_t>::max(); + + space_requirement = sizeof(choice); + + switch (choice) { + case CommandChoice::kRegisterFilePath: + space_requirement += sizeof(session_id); + space_requirement += sizeof(id); + space_requirement += sizeof(uint32_t); // string length + + if (!file_path) { + LOG(WARNING) << "Missing file path for kRegisterFilePath"; + return false; + } + + space_requirement += file_path->size(); // string contents + break; + case CommandChoice::kUnregisterFilePath: + space_requirement += sizeof(session_id); + space_requirement += sizeof(id); + break; + case CommandChoice::kReadAhead: + space_requirement += sizeof(session_id); + space_requirement += sizeof(id); + space_requirement += sizeof(read_ahead_kind); + space_requirement += sizeof(length); + space_requirement += sizeof(offset); + break; + case CommandChoice::kCreateSession: + case CommandChoice::kCreateFdSession: + space_requirement += sizeof(session_id); + space_requirement += sizeof(uint32_t); // string length + + if (!file_path) { + LOG(WARNING) << "Missing file path for kCreateSession"; + return false; + } + + space_requirement += file_path->size(); // string contents + break; + case CommandChoice::kDestroySession: + case CommandChoice::kDumpSession: + space_requirement += sizeof(session_id); + break; + case CommandChoice::kExit: + case CommandChoice::kDumpEverything: + // Only need space for the choice. + break; + default: + LOG(FATAL) << "unrecognized command number " << static_cast<uint32_t>(choice); + break; + } + + if (buf_size < space_requirement) { + return false; + } + + *produced_bytes = space_requirement; + + // Always write out the choice. + size_t buf_offset = 0; + + memcpy(&buf[buf_offset], &choice, sizeof(choice)); + buf_offset += sizeof(choice); + + switch (choice) { + case CommandChoice::kRegisterFilePath: + memcpy(&buf[buf_offset], &session_id, sizeof(session_id)); + buf_offset += sizeof(session_id); + memcpy(&buf[buf_offset], &id, sizeof(id)); + buf_offset += sizeof(id); + + { + uint32_t string_length = static_cast<uint32_t>(file_path->size()); + memcpy(&buf[buf_offset], &string_length, sizeof(string_length)); + buf_offset += sizeof(string_length); + } + + DCHECK(file_path.has_value()); + + memcpy(&buf[buf_offset], file_path->c_str(), file_path->size()); + buf_offset += file_path->size(); + break; + case CommandChoice::kUnregisterFilePath: + memcpy(&buf[buf_offset], &session_id, sizeof(session_id)); + buf_offset += sizeof(session_id); + memcpy(&buf[buf_offset], &id, sizeof(id)); + buf_offset += sizeof(id); + break; + case CommandChoice::kReadAhead: + memcpy(&buf[buf_offset], &session_id, sizeof(session_id)); + buf_offset += sizeof(session_id); + memcpy(&buf[buf_offset], &id, sizeof(id)); + buf_offset += sizeof(id); + memcpy(&buf[buf_offset], &read_ahead_kind, sizeof(read_ahead_kind)); + buf_offset += sizeof(read_ahead_kind); + memcpy(&buf[buf_offset], &length, sizeof(length)); + buf_offset += sizeof(length); + memcpy(&buf[buf_offset], &offset, sizeof(offset)); + buf_offset += sizeof(offset); + break; + case CommandChoice::kCreateSession: + case CommandChoice::kCreateFdSession: + memcpy(&buf[buf_offset], &session_id, sizeof(session_id)); + buf_offset += sizeof(session_id); + + { + uint32_t string_length = static_cast<uint32_t>(file_path->size()); + memcpy(&buf[buf_offset], &string_length, sizeof(string_length)); + buf_offset += sizeof(string_length); + } + + DCHECK(file_path.has_value()); + + memcpy(&buf[buf_offset], file_path->c_str(), file_path->size()); + buf_offset += file_path->size(); + + DCHECK_EQ(buf_offset, space_requirement) << *this << ",file_path_size:" << file_path->size(); + DCHECK_EQ(buf_offset, *produced_bytes) << *this; + + break; + case CommandChoice::kDestroySession: + case CommandChoice::kDumpSession: + memcpy(&buf[buf_offset], &session_id, sizeof(session_id)); + buf_offset += sizeof(session_id); + break; + case CommandChoice::kExit: + case CommandChoice::kDumpEverything: + // Only need to write out the choice. + break; + default: + LOG(FATAL) << "should have fallen out in the above switch" + << static_cast<uint32_t>(choice); + break; + } + + DCHECK_EQ(buf_offset, space_requirement) << *this; + DCHECK_EQ(buf_offset, *produced_bytes) << *this; + + return true; +} + +class PrefetcherDaemon::Impl { + public: + std::optional<PrefetcherForkParameters> StartPipesViaFork() { + int pipefds[2]; + if (pipe(&pipefds[0]) != 0) { + PLOG(FATAL) << "Failed to create read/write pipes"; + } + + if (WOULD_LOG(VERBOSE)) { + long pipe_size = static_cast<long>(fcntl(pipefds[0], F_GETPIPE_SZ)); + if (pipe_size < 0) { + PLOG(ERROR) << "Failed to F_GETPIPE_SZ:"; + } + LOG(VERBOSE) << "StartPipesViaFork: default pipe size: " << pipe_size; + } + + for (int i = 0; i < 2; ++i) { + // Default pipe size is usually 64KB. + // Increase to 1MB so that iorapd has to rarely run during prefetching. + if (fcntl(pipefds[i], F_SETPIPE_SZ, kPipeBufferSize) < 0) { + PLOG(FATAL) << "Failed to increase pipe size to max"; + } + } + + pipefd_read_ = pipefds[0]; + pipefd_write_ = pipefds[1]; + + PrefetcherForkParameters params; + params.input_fd = pipefd_read_; + params.output_fd = pipefd_write_; + params.format_text = false; + params.use_sockets = false; + + bool res = StartViaFork(params); + if (res) { + return params; + } else { + return std::nullopt; + } + } + +std::optional<PrefetcherForkParameters> StartSocketViaFork() { + int socket_fds[2]; + if (socketpair(AF_UNIX, SOCK_STREAM, /*protocol*/0, &socket_fds[0]) != 0) { + PLOG(FATAL) << "Failed to create read/write socketpair"; + } + + pipefd_read_ = socket_fds[0]; // iorapd writer, iorap.prefetcherd reader + pipefd_write_ = socket_fds[1]; // iorapd reader, iorap.prefetcherd writer + + PrefetcherForkParameters params; + params.input_fd = pipefd_read_; + params.output_fd = pipefd_write_; + params.format_text = false; + params.use_sockets = true; + + bool res = StartViaFork(params); + if (res) { + return params; + } else { + return std::nullopt; + } + } + + bool StartViaFork(PrefetcherForkParameters params) { + params_ = params; + + forked_ = true; + child_ = fork(); + + if (child_ == -1) { + LOG(FATAL) << "Failed to fork PrefetcherDaemon"; + } else if (child_ > 0) { // we are the caller of this function + LOG(DEBUG) << "forked into iorap.prefetcherd, pid = " << child_; + + return true; + } else { + // we are the child that was forked. + std::stringstream argv; // for logging + std::vector<std::string> argv_vec; + + { + std::stringstream s; + s << "--input-fd"; + argv_vec.push_back(s.str()); + + std::stringstream s2; + s2 << params.input_fd; + argv_vec.push_back(s2.str()); + + argv << " --input-fd" << " " << params.input_fd; + } + + { + std::stringstream s; + s << "--output-fd"; + argv_vec.push_back(s.str()); + + std::stringstream s2; + s2 << params.output_fd; + argv_vec.push_back(s2.str()); + + argv << " --output-fd" << " " << params.output_fd; + } + + + if (params.use_sockets) { + std::stringstream s; + s << "--use-sockets"; + argv_vec.push_back(s.str()); + + argv << " --use-sockets"; + } + + if (WOULD_LOG(VERBOSE)) { + std::stringstream s; + s << "--verbose"; + argv_vec.push_back(s.str()); + + argv << " --verbose"; + } + + std::unique_ptr<ArgString[]> argv_ptr = common::VecToArgv(kCommandFileName, argv_vec); + + LOG(DEBUG) << "fork+exec: " << kCommandFileName << " " + << argv.str(); + execve(kCommandFileName, (char **)argv_ptr.get(), /*envp*/nullptr); + // This should never return. + _exit(EXIT_FAILURE); + } + + DCHECK(false); + return false; + } + + // TODO: Not very useful since this can never return 'true' + // -> in the child we would've already execd which loses all this code. + bool IsDaemon() { + // In the child the pid is always 0. + return child_ > 0; + } + + bool Main(PrefetcherForkParameters params) { + LOG(VERBOSE) << "PrefetcherDaemon::Main " << params; + + CommandParser command_parser{params}; + + Command next_command{}; + + std::vector<Command> many_commands; + + // Ensure alogd is pre-initialized before installing minijail. + LOG(DEBUG) << "Installing minijail"; + + // Install seccomp filter using libminijail. + if (kInstallMiniJail) { + MiniJail(); + } + + while (true) { + bool eof = false; + + if (params.use_sockets) { + // use recvmsg(2). supports receiving FDs. + many_commands = command_parser.ParseSocketCommands(/*out*/eof); + } else { + // use read(2). does not support receiving FDs. + many_commands = command_parser.ParseCommands(/*out*/eof); + } + + if (eof) { + LOG(WARNING) << "PrefetcherDaemon got EOF, terminating"; + return true; + } + + for (auto& command : many_commands) { + if (LogVerboseIpc()) { + LOG(VERBOSE) << "PrefetcherDaemon got command: " << command; + } + + if (command.choice == CommandChoice::kExit) { + LOG(DEBUG) << "PrefetcherDaemon got kExit command, terminating"; + return true; + } + + if (!ReceiveCommand(command)) { + // LOG(WARNING) << "PrefetcherDaemon command processing failure: " << command; + } + + // ReceiveCommand should dup to keep the FD. Avoid leaks. + if (command.fd.has_value()) { + close(*command.fd); + } + } + } + + LOG(VERBOSE) << "PrefetcherDaemon::Main got exit, terminating"; + + return true; + // Terminate. + } + + Impl(PrefetcherDaemon* daemon) { + session_manager_ = SessionManager::CreateManager(SessionKind::kInProcessDirect); + DCHECK(session_manager_ != nullptr); + }; + + ~Impl() { + // Don't do anything if we never called 'StartViaFork' + if (forked_) { + if (!IsDaemon()) { + int status; + waitpid(child_, /*out*/&status, /*options*/0); + } else { + LOG(WARNING) << "execve should have avoided this path"; + // DCHECK(false) << "not possible because the execve would avoid this path"; + } + } + } + + bool SendCommand(const Command& command) { + // Only parent is the sender. + DCHECK(forked_); + //DCHECK(!IsDaemon()); + + char buf[1024]; + size_t stream_size; + if (!command.Write(buf, sizeof(buf), /*out*/&stream_size)) { + PLOG(ERROR) << "Failed to serialize command: " << command; + return false; + } + + if (LogVerboseIpc()) { + LOG(VERBOSE) << "pre-write(fd=" << pipefd_write_ << ", buf=" << buf + << ", size=" << stream_size<< ")"; + } + + if (params_.use_sockets) { + /* iov contains the normal message (Command) */ + struct iovec iov; + memset(&iov, 0, sizeof(iov)); + iov.iov_base = &buf[0]; + iov.iov_len = stream_size; + + struct msghdr msg; + memset(&msg, 0, sizeof(msg)); + + /* point to iov to transmit */ + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + + /* no dest address; socket is connected */ + msg.msg_name = nullptr; + msg.msg_namelen = 0; + + // append a CMSG with SCM_RIGHTS if we have an FD. + if (command.fd.has_value()) { + union { + struct cmsghdr cmh; + char control[CMSG_SPACE(sizeof(int))]; /* sized to hold an fd (int) */ + } control_un; + memset(&control_un, 0, sizeof(control_un)); + + msg.msg_control = &control_un.control[0]; + msg.msg_controllen = sizeof(control_un.control); + + struct cmsghdr *hp; + hp = CMSG_FIRSTHDR(&msg); + hp->cmsg_len = CMSG_LEN(sizeof(int)); + hp->cmsg_level = SOL_SOCKET; + hp->cmsg_type = SCM_RIGHTS; + *((int *) CMSG_DATA(hp)) = *(command.fd); + + DCHECK(command.RequiresFd()) << command; + + if (LogVerboseIpc()) { + LOG(VERBOSE) << "append FD to sendmsg: " << *(command.fd); + } + } + + // TODO: add CMSG for the FD passage. + + if (TEMP_FAILURE_RETRY(sendmsg(pipefd_write_, &msg, /*flags*/0)) < 0) { + PLOG(ERROR) << "Failed to sendmsg command: " << command; + return false; + } + } else { + if (TEMP_FAILURE_RETRY(write(pipefd_write_, buf, stream_size)) < 0) { + PLOG(ERROR) << "Failed to write command: " << command; + return false; + } + } + + if (LogVerboseIpc()) { + LOG(VERBOSE) << "write(fd=" << pipefd_write_ << ", buf=" << buf + << ", size=" << stream_size<< ")"; + } + + // TODO: also read the reply? + return true; + } + + bool ReceiveCommand(const Command& command) { + // Only child is the command receiver. + // DCHECK(IsDaemon()); + + switch (command.choice) { + case CommandChoice::kRegisterFilePath: { + std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id); + + if (!session) { + LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command; + return false; + } + + CHECK(command.file_path.has_value()) << command; + return session->RegisterFilePath(command.id, *command.file_path); + } + case CommandChoice::kUnregisterFilePath: { + std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id); + + if (!session) { + LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command; + return false; + } + + return session->UnregisterFilePath(command.id); + } + case CommandChoice::kReadAhead: { + std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id); + + if (!session) { + LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command; + return false; + } + + return session->ReadAhead(command.id, command.read_ahead_kind, command.length, command.offset); + } + // TODO: unreadahead + case CommandChoice::kExit: { + LOG(WARNING) << "kExit should be handled earlier."; + return true; + } + case CommandChoice::kCreateSession: { + std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id); + if (session != nullptr) { + LOG(ERROR) << "ReceiveCommand: session for ID already exists: " << command; + return false; + } + CHECK(command.file_path.has_value()) << command; + if (session_manager_->CreateSession(command.session_id, /*description*/*command.file_path) + == nullptr) { + LOG(ERROR) << "ReceiveCommand: Failure to kCreateSession: " << command; + return false; + } + return true; + } + case CommandChoice::kDestroySession: { + if (!session_manager_->DestroySession(command.session_id)) { + LOG(ERROR) << "ReceiveCommand: Failure to kDestroySession: " << command; + return false; + } + return true; + } + case CommandChoice::kDumpSession: { + std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id); + + if (!session) { + LOG(ERROR) << "ReceiveCommand: Could not find session for command: " << command; + return false; + } + + // TODO: Consider doing dumpsys support somehow? + session->Dump(LOG_STREAM(DEBUG), /*multiline*/true); + return true; + } + case CommandChoice::kDumpEverything: { + session_manager_->Dump(LOG_STREAM(DEBUG), /*multiline*/true); + break; + } + case CommandChoice::kCreateFdSession: { + std::shared_ptr<Session> session = session_manager_->FindSession(command.session_id); + if (session != nullptr) { + LOG(ERROR) << "ReceiveCommand: session for ID already exists: " << command; + return false; + } + CHECK(command.file_path.has_value()) << command; + CHECK(command.fd.has_value()) << command; + + LOG(VERBOSE) << "ReceiveCommand: kCreateFdSession fd=" << *(command.fd); + + // TODO: Maybe use CreateFdSession instead? + session = + session_manager_->CreateSession(command.session_id, + /*description*/*command.file_path, + command.fd.value()); + if (session == nullptr) { + LOG(ERROR) << "ReceiveCommand: Failure to kCreateFdSession: " << command; + return false; + } + + return session->ProcessFd(*command.fd); + } + } + + return true; + } + + pid_t child_; + bool forked_; + int pipefd_read_; + int pipefd_write_; + PrefetcherForkParameters params_; + // do not ever use an indirect session manager here, as it would cause a lifetime cycle. + std::unique_ptr<SessionManager> session_manager_; // direct only. +}; + +PrefetcherDaemon::PrefetcherDaemon() + : impl_{new Impl{this}} { + LOG(VERBOSE) << "PrefetcherDaemon() constructor"; +} + +bool PrefetcherDaemon::StartViaFork(PrefetcherForkParameters params) { + return impl_->StartViaFork(std::move(params)); +} + + +std::optional<PrefetcherForkParameters> PrefetcherDaemon::StartPipesViaFork() { + return impl_->StartPipesViaFork(); +} + +std::optional<PrefetcherForkParameters> PrefetcherDaemon::StartSocketViaFork() { + return impl_->StartSocketViaFork(); +} + +bool PrefetcherDaemon::Main(PrefetcherForkParameters params) { + return impl_->Main(params); +} + +bool PrefetcherDaemon::SendCommand(const Command& command) { + return impl_->SendCommand(command); +} + +PrefetcherDaemon::~PrefetcherDaemon() { + // required for unique_ptr for incomplete types. +} + +} // namespace iorap::prefetcher |