summaryrefslogtreecommitdiffstats
path: root/src/perfetto/rx_producer.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/perfetto/rx_producer.cc')
-rw-r--r--src/perfetto/rx_producer.cc100
1 files changed, 87 insertions, 13 deletions
diff --git a/src/perfetto/rx_producer.cc b/src/perfetto/rx_producer.cc
index 14311a1..824600c 100644
--- a/src/perfetto/rx_producer.cc
+++ b/src/perfetto/rx_producer.cc
@@ -83,7 +83,7 @@ PerfettoDependencies::Component PerfettoDependencies::CreateComponent() {
.bind<PerfettoConsumer, PerfettoConsumerImpl>()
.registerProvider([]() /* -> TraceConfig */ {
return CreateConfig(kTraceDurationMs,
- /*deferred_start*/true,
+ /*deferred_start*/false,
kBufferSize);
});
}
@@ -199,6 +199,12 @@ class StateChangedSubject {
using State = ::perfetto::consumer::State;
using Handle = ::perfetto::consumer::Handle;
+ // Static members to solve use-after-free bug.
+ // The object is accessed from not only perfetto thread, but also iorap
+ // thread. Use this global map to manage it.
+ static std::mutex state_subject_mutex_;
+ static std::unordered_map<Handle, StateChangedSubject*> state_subject_map_;
+
StateChangedSubject(const ::perfetto::protos::TraceConfig& trace_config,
rxcpp::subscriber<PerfettoStateChange> destination,
std::shared_ptr<PerfettoConsumer> perfetto_consumer)
@@ -391,7 +397,9 @@ class StateChangedSubject {
bound_.store(true); // seq_cst release.
}
- // Thread safety: Called by libperfetto background thread (same one every time).
+
+ // Called by libperfetto background thread (same one every time) and iorap
+ // thread.
static void CallbackOnStateChanged(Handle handle, State state, void* callback_arg) {
LOG(VERBOSE) << "CallbackOnStateChanged(handle=" << handle << ",state=" << state
<< ",callback_arg=" << callback_arg << ")";
@@ -399,24 +407,30 @@ class StateChangedSubject {
// Validate OnStateChanged callback invariants, guaranteed by libperfetto.
DCHECK_NE(handle, ::perfetto::consumer::kInvalidHandle);
- // Note: Perfetto guarantees this callback always occurs on the same thread,
- // so we don't need to do any extra thread synchronization here since we are only mutating
- // StateChangedSubject from within this function.
-
// TODO: the memory ordering guarantees should be explicitly specified in consumer_api.h:
// This isn't specific enough:
// "The callback will be invoked on an internal thread and must not block."
// However looking at the implementation it posts onto a single-thread task runner,
// so this must be the case.
- StateChangedSubject* state_subject = reinterpret_cast<StateChangedSubject*>(callback_arg);
// This current thread owns 'StateChangedSubject', no other threads must access it.
// Explicit synchronization is not necessary.
- if (!state_subject->OnStateChanged(handle, state)) {
- // Clean up the state tracker when we reach a terminal state.
- // This means that no future callbacks will occur anymore.
- delete state_subject;
+ {
+ std::lock_guard<std::mutex> guard(StateChangedSubject::state_subject_mutex_);
+ auto it = StateChangedSubject::state_subject_map_.find(handle);
+ // If the object is already deleted, do nothing.
+ if (it == StateChangedSubject::state_subject_map_.end()) {
+ return;
+ }
+
+ StateChangedSubject* state_subject = it->second;
+ if (!state_subject->OnStateChanged(handle, state)) {
+ // Clean up the state tracker when we reach a terminal state.
+ // This means that no future callbacks will occur anymore.
+ StateChangedSubject::state_subject_map_.erase(it);
+ delete state_subject;
+ }
}
}
@@ -447,6 +461,10 @@ class StateChangedSubject {
// of just being subject-like?
};
+std::mutex StateChangedSubject::state_subject_mutex_;
+std::unordered_map<::perfetto::consumer::Handle,
+ StateChangedSubject*> StateChangedSubject::state_subject_map_;
+
// Note: The states will be emitted on a separate thread, so e.g. #as_blocking()
// needs to be used to avoid dropping everything on the floor.
//
@@ -484,6 +502,11 @@ static auto /*[observable<State>, shared_ptr<PerfettoConsumerHandle>]*/
return;
}
+ {
+ std::lock_guard<std::mutex> guard(StateChangedSubject::state_subject_mutex_);
+ StateChangedSubject::state_subject_map_[handle] = state_subject.get();
+ }
+
std::shared_ptr<PerfettoConsumerHandle> safe_handle{
new PerfettoConsumerHandle{perfetto_consumer, handle}};
@@ -567,11 +590,63 @@ bool BinaryWireProtobuf<T>::WriteStringToFd(int fd) const {
return true;
}
+template <typename T>
+std::optional<BinaryWireProtobuf<T>> BinaryWireProtobuf<T>::ReadFullyFromFile(
+ const std::string& path,
+ bool follow_symlinks) {
+ std::vector<std::byte> data;
+
+ int flags = O_RDONLY | O_CLOEXEC | O_BINARY | (follow_symlinks ? 0 : O_NOFOLLOW);
+ android::base::unique_fd fd(TEMP_FAILURE_RETRY(open(path.c_str(), flags)));
+ if (fd == -1) {
+ return std::nullopt;
+ }
+
+ if (ReadFdToString(fd.get(), /*out*/&data)) {
+ return BinaryWireProtobuf<T>{std::move(data)};
+ } else {
+ return std::nullopt;
+ }
+}
+
+template <typename T>
+bool BinaryWireProtobuf<T>::operator==(const BinaryWireProtobuf<T>& other) const {
+ if (data_.size() != other.data_.size()) {
+ return false;
+ }
+ return std::equal(data_.begin(), data_.end(), other.data_.begin());
+}
+
+template <typename T>
+bool BinaryWireProtobuf<T>::ReadFdToString(int fd, /*out*/std::vector<std::byte>* content) {
+ DCHECK(content != nullptr);
+
+ content->clear();
+
+ struct stat sb;
+ if (fstat(fd, /*out*/&sb) != -1 && sb.st_size > 0) {
+ content->reserve(sb.st_size);
+ }
+
+ char buf[BUFSIZ];
+ auto it = content->begin();
+ ssize_t n;
+ while ((n = TEMP_FAILURE_RETRY(read(fd, &buf[0], sizeof(buf)))) > 0) {
+ content->insert(it,
+ reinterpret_cast<std::byte*>(&buf[0]),
+ reinterpret_cast<std::byte*>(&buf[n]));
+
+ std::advance(/*inout*/it, static_cast<size_t>(n));
+
+ static_assert(sizeof(char) == sizeof(std::byte), "sanity check for reinterpret cast");
+ }
+ return (n == 0) ? true : false;
+}
+
// explicit template instantiation.
template struct BinaryWireProtobuf<::google::protobuf::MessageLite>;
// TODO: refactor this not to need the template instantiation.
-#if defined(__ANDROID__)
// Copy of the 2.6.18 kernel header (linux/ioprio.h)
#define IOPRIO_WHO_PROCESS (1)
@@ -584,7 +659,6 @@ template struct BinaryWireProtobuf<::google::protobuf::MessageLite>;
#define IOPRIO_PRIO_CLASS(mask) ((mask) >> IOPRIO_CLASS_SHIFT)
#define IOPRIO_PRIO_DATA(mask) ((mask) & IOPRIO_PRIO_MASK)
#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
-#endif
static int ioprio_get(int which, int who) {
return syscall(SYS_ioprio_get, which, who);