diff options
Diffstat (limited to 'src/perfetto/rx_producer.cc')
-rw-r--r-- | src/perfetto/rx_producer.cc | 100 |
1 files changed, 87 insertions, 13 deletions
diff --git a/src/perfetto/rx_producer.cc b/src/perfetto/rx_producer.cc index 14311a1..824600c 100644 --- a/src/perfetto/rx_producer.cc +++ b/src/perfetto/rx_producer.cc @@ -83,7 +83,7 @@ PerfettoDependencies::Component PerfettoDependencies::CreateComponent() { .bind<PerfettoConsumer, PerfettoConsumerImpl>() .registerProvider([]() /* -> TraceConfig */ { return CreateConfig(kTraceDurationMs, - /*deferred_start*/true, + /*deferred_start*/false, kBufferSize); }); } @@ -199,6 +199,12 @@ class StateChangedSubject { using State = ::perfetto::consumer::State; using Handle = ::perfetto::consumer::Handle; + // Static members to solve use-after-free bug. + // The object is accessed from not only perfetto thread, but also iorap + // thread. Use this global map to manage it. + static std::mutex state_subject_mutex_; + static std::unordered_map<Handle, StateChangedSubject*> state_subject_map_; + StateChangedSubject(const ::perfetto::protos::TraceConfig& trace_config, rxcpp::subscriber<PerfettoStateChange> destination, std::shared_ptr<PerfettoConsumer> perfetto_consumer) @@ -391,7 +397,9 @@ class StateChangedSubject { bound_.store(true); // seq_cst release. } - // Thread safety: Called by libperfetto background thread (same one every time). + + // Called by libperfetto background thread (same one every time) and iorap + // thread. static void CallbackOnStateChanged(Handle handle, State state, void* callback_arg) { LOG(VERBOSE) << "CallbackOnStateChanged(handle=" << handle << ",state=" << state << ",callback_arg=" << callback_arg << ")"; @@ -399,24 +407,30 @@ class StateChangedSubject { // Validate OnStateChanged callback invariants, guaranteed by libperfetto. DCHECK_NE(handle, ::perfetto::consumer::kInvalidHandle); - // Note: Perfetto guarantees this callback always occurs on the same thread, - // so we don't need to do any extra thread synchronization here since we are only mutating - // StateChangedSubject from within this function. - // TODO: the memory ordering guarantees should be explicitly specified in consumer_api.h: // This isn't specific enough: // "The callback will be invoked on an internal thread and must not block." // However looking at the implementation it posts onto a single-thread task runner, // so this must be the case. - StateChangedSubject* state_subject = reinterpret_cast<StateChangedSubject*>(callback_arg); // This current thread owns 'StateChangedSubject', no other threads must access it. // Explicit synchronization is not necessary. - if (!state_subject->OnStateChanged(handle, state)) { - // Clean up the state tracker when we reach a terminal state. - // This means that no future callbacks will occur anymore. - delete state_subject; + { + std::lock_guard<std::mutex> guard(StateChangedSubject::state_subject_mutex_); + auto it = StateChangedSubject::state_subject_map_.find(handle); + // If the object is already deleted, do nothing. + if (it == StateChangedSubject::state_subject_map_.end()) { + return; + } + + StateChangedSubject* state_subject = it->second; + if (!state_subject->OnStateChanged(handle, state)) { + // Clean up the state tracker when we reach a terminal state. + // This means that no future callbacks will occur anymore. + StateChangedSubject::state_subject_map_.erase(it); + delete state_subject; + } } } @@ -447,6 +461,10 @@ class StateChangedSubject { // of just being subject-like? }; +std::mutex StateChangedSubject::state_subject_mutex_; +std::unordered_map<::perfetto::consumer::Handle, + StateChangedSubject*> StateChangedSubject::state_subject_map_; + // Note: The states will be emitted on a separate thread, so e.g. #as_blocking() // needs to be used to avoid dropping everything on the floor. // @@ -484,6 +502,11 @@ static auto /*[observable<State>, shared_ptr<PerfettoConsumerHandle>]*/ return; } + { + std::lock_guard<std::mutex> guard(StateChangedSubject::state_subject_mutex_); + StateChangedSubject::state_subject_map_[handle] = state_subject.get(); + } + std::shared_ptr<PerfettoConsumerHandle> safe_handle{ new PerfettoConsumerHandle{perfetto_consumer, handle}}; @@ -567,11 +590,63 @@ bool BinaryWireProtobuf<T>::WriteStringToFd(int fd) const { return true; } +template <typename T> +std::optional<BinaryWireProtobuf<T>> BinaryWireProtobuf<T>::ReadFullyFromFile( + const std::string& path, + bool follow_symlinks) { + std::vector<std::byte> data; + + int flags = O_RDONLY | O_CLOEXEC | O_BINARY | (follow_symlinks ? 0 : O_NOFOLLOW); + android::base::unique_fd fd(TEMP_FAILURE_RETRY(open(path.c_str(), flags))); + if (fd == -1) { + return std::nullopt; + } + + if (ReadFdToString(fd.get(), /*out*/&data)) { + return BinaryWireProtobuf<T>{std::move(data)}; + } else { + return std::nullopt; + } +} + +template <typename T> +bool BinaryWireProtobuf<T>::operator==(const BinaryWireProtobuf<T>& other) const { + if (data_.size() != other.data_.size()) { + return false; + } + return std::equal(data_.begin(), data_.end(), other.data_.begin()); +} + +template <typename T> +bool BinaryWireProtobuf<T>::ReadFdToString(int fd, /*out*/std::vector<std::byte>* content) { + DCHECK(content != nullptr); + + content->clear(); + + struct stat sb; + if (fstat(fd, /*out*/&sb) != -1 && sb.st_size > 0) { + content->reserve(sb.st_size); + } + + char buf[BUFSIZ]; + auto it = content->begin(); + ssize_t n; + while ((n = TEMP_FAILURE_RETRY(read(fd, &buf[0], sizeof(buf)))) > 0) { + content->insert(it, + reinterpret_cast<std::byte*>(&buf[0]), + reinterpret_cast<std::byte*>(&buf[n])); + + std::advance(/*inout*/it, static_cast<size_t>(n)); + + static_assert(sizeof(char) == sizeof(std::byte), "sanity check for reinterpret cast"); + } + return (n == 0) ? true : false; +} + // explicit template instantiation. template struct BinaryWireProtobuf<::google::protobuf::MessageLite>; // TODO: refactor this not to need the template instantiation. -#if defined(__ANDROID__) // Copy of the 2.6.18 kernel header (linux/ioprio.h) #define IOPRIO_WHO_PROCESS (1) @@ -584,7 +659,6 @@ template struct BinaryWireProtobuf<::google::protobuf::MessageLite>; #define IOPRIO_PRIO_CLASS(mask) ((mask) >> IOPRIO_CLASS_SHIFT) #define IOPRIO_PRIO_DATA(mask) ((mask) & IOPRIO_PRIO_MASK) #define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data) -#endif static int ioprio_get(int which, int who) { return syscall(SYS_ioprio_get, which, who); |