1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
|
/*
* Copyright (C) 2010 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//#define USE_LOG SLAndroidLogLevel_Verbose
#include "sles_allinclusive.h"
#include "android_StreamPlayer.h"
#include <media/IStreamSource.h>
#include <media/IMediaPlayerService.h>
#include <media/stagefright/foundation/ADebug.h>
#include <binder/IPCThreadState.h>
#include "mpeg2ts/ATSParser.h"
//--------------------------------------------------------------------------------------------------
namespace android {
/**
 * Creates a proxy bound to the given Android buffer queue, callback protector and player.
 * No shared memory buffers are considered set until setBuffers() is called.
 *
 * The player is deliberately received as a raw pointer rather than a sp<StreamPlayer>:
 * StreamPlayer passes 'this' from inside its own constructor, and wrapping 'this' in a sp<>
 * at that point would raise the refcount from 0 to 1 and then drop it from 1 to 0, which
 * would run StreamPlayer's destructor from inside its own constructor.
 */
StreamSourceAppProxy::StreamSourceAppProxy(
        IAndroidBufferQueue *androidBufferQueue,
        const sp<CallbackProtector> &callbackProtector,
        StreamPlayer *player /* not const sp<StreamPlayer> &, see above */)
    : mBuffersHasBeenSet(false),
      mAndroidBufferQueue(androidBufferQueue),
      mCallbackProtector(callbackProtector),
      mPlayer(player)
{
    SL_LOGV("StreamSourceAppProxy::StreamSourceAppProxy()");
}
/**
 * Logs the destruction and releases the listener and buffers through disconnect().
 */
StreamSourceAppProxy::~StreamSourceAppProxy()
{
    SL_LOGV("StreamSourceAppProxy::~StreamSourceAppProxy()");

    disconnect();
}
// Item handed to the client's buffer queue callback by pullFromBuffQueue(): a single
// "buffer queue event" item whose data marks the buffer as processed.
// Layout: { item key, item size, item data }.
const SLuint32 StreamSourceAppProxy::kItemProcessed[NB_BUFFEREVENT_ITEM_FIELDS] = {
    SL_ANDROID_ITEMKEY_BUFFERQUEUEEVENT,    /* key  */
    sizeof(SLuint32),                       /* size */
    SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED    /* data */
};
//--------------------------------------------------
// IStreamSource implementation
/**
 * Stores the listener that will later receive buffers and commands.
 * Asserts that the given listener is not NULL and that no listener was stored before.
 */
void StreamSourceAppProxy::setListener(const sp<IStreamListener> &listener)
{
    assert(listener != NULL);

    // mListener is modified under mLock
    Mutex::Autolock autoLock(mLock);
    assert(mListener == NULL);
    mListener = listener;
}
/**
 * Stores the set of shared memory buffers and remembers that they have been provided.
 * Asserts that the buffers had not been set already.
 */
void StreamSourceAppProxy::setBuffers(const Vector<sp<IMemory> > &buffers)
{
    Mutex::Autolock autoLock(mLock);

    assert(!mBuffersHasBeenSet);
    mBuffers = buffers;
    mBuffersHasBeenSet = true;
}
/**
 * Records that the shared memory buffer at the given index can be filled again, then
 * immediately tries to fill it from the Android buffer queue.
 * Does nothing when no buffers are currently set.
 *
 * @param index position of the buffer within mBuffers; checked against mBuffers.size()
 */
void StreamSourceAppProxy::onBufferAvailable(size_t index)
{
    //SL_LOGD("StreamSourceAppProxy::onBufferAvailable(%d)", index);

    bool indexQueued = false;
    {
        Mutex::Autolock autoLock(mLock);
        if (mBuffersHasBeenSet) {
            CHECK_LT(index, mBuffers.size());
#if 0   // enable if needed for debugging
            sp<IMemory> mem = mBuffers.itemAt(index);
            SLAint64 length = (SLAint64) mem->size();
#endif
            mAvailableBuffers.push_back(index);
            indexQueued = true;
            //SL_LOGD("onBufferAvailable() now %d buffers available in queue",
            //         mAvailableBuffers.size());
        }
        // otherwise there are no buffers to push data to from the buffer queue
    }

    if (indexQueued) {
        // a new shared mem buffer is available: try to fill it right away, with mLock released
        pullFromBuffQueue();
    }
}
/**
 * Forwards a stream command, with its optional message, to the listener if there is one.
 * The command is issued with the 'synchronous' argument set to false.
 */
void StreamSourceAppProxy::receivedCmd_l(IStreamListener::Command cmd, const sp<AMessage> &msg)
{
    if (mListener == 0) {
        // no listener is set: the command is dropped
        return;
    }
    mListener->issueCommand(cmd, false /* synchronous */, msg);
}
/**
 * Tells the listener, if there is one, that the shared memory buffer at buffIndex now
 * holds buffLength bytes of data.
 */
void StreamSourceAppProxy::receivedBuffer_l(size_t buffIndex, size_t buffLength)
{
    if (mListener == 0) {
        // no listener is set: nobody to notify
        return;
    }
    mListener->queueBuffer(buffIndex, buffLength);
}
/**
 * Drops the listener and forgets all shared memory buffers, under mLock.
 */
void StreamSourceAppProxy::disconnect()
{
    Mutex::Autolock autoLock(mLock);

    mListener.clear();
    // mediaserver and client each hold a sp<> to the other, and binder does not push a
    // decremented sp<> reference count to the other process immediately: flush so that the
    // release of sp<IStreamListener> is sent now.
    IPCThreadState::self()->flushCommands();

    // back to the "no buffers set" state
    mBuffersHasBeenSet = false;
    mBuffers.clear();
    mAvailableBuffers.clear();
}
//--------------------------------------------------
// consumption from ABQ: pull from the ABQ, and push to shared memory (media server)
void StreamSourceAppProxy::pullFromBuffQueue() {
if (android::CallbackProtector::enterCbIfOk(mCallbackProtector)) {
size_t bufferId;
void* bufferLoc;
size_t buffSize;
slAndroidBufferQueueCallback callback = NULL;
void* pBufferContext, *pBufferData, *callbackPContext = NULL;
AdvancedBufferHeader *oldFront = NULL;
uint32_t dataSize /* , dataUsed */;
// retrieve data from the buffer queue
interface_lock_exclusive(mAndroidBufferQueue);
// can this read operation cause us to call the buffer queue callback
// (either because there was a command with no data, or all the data has been consumed)
bool queueCallbackCandidate = false;
if (mAndroidBufferQueue->mState.count != 0) {
// SL_LOGD("nbBuffers in ABQ = %u, buffSize=%u",abq->mState.count, buffSize);
assert(mAndroidBufferQueue->mFront != mAndroidBufferQueue->mRear);
oldFront = mAndroidBufferQueue->mFront;
AdvancedBufferHeader *newFront = &oldFront[1];
// consume events when starting to read data from a buffer for the first time
if (oldFront->mDataSizeConsumed == 0) {
// note this code assumes at most one event per buffer; see IAndroidBufferQueue_Enqueue
if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_EOS) {
receivedCmd_l(IStreamListener::EOS);
// EOS has no associated data
queueCallbackCandidate = true;
} else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCONTINUITY) {
receivedCmd_l(IStreamListener::DISCONTINUITY);
} else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCON_NEWPTS) {
sp<AMessage> msg = new AMessage();
msg->setInt64(IStreamListener::kKeyResumeAtPTS,
(int64_t)oldFront->mItems.mTsCmdData.mPts);
receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
} else if (oldFront->mItems.mTsCmdData.mTsCmdCode
& ANDROID_MP2TSEVENT_FORMAT_CHANGE_FULL) {
sp<AMessage> msg = new AMessage();
msg->setInt32(
IStreamListener::kKeyDiscontinuityMask,
ATSParser::DISCONTINUITY_FORMATCHANGE);
receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
} else if (oldFront->mItems.mTsCmdData.mTsCmdCode
& ANDROID_MP2TSEVENT_FORMAT_CHANGE_VIDEO) {
sp<AMessage> msg = new AMessage();
msg->setInt32(
IStreamListener::kKeyDiscontinuityMask,
ATSParser::DISCONTINUITY_VIDEO_FORMAT);
receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
}
// note that here we are intentionally only supporting
// ANDROID_MP2TSEVENT_FORMAT_CHANGE_VIDEO, see IAndroidBufferQueue.c
// some commands may introduce a time discontinuity, reevaluate position if needed
if (oldFront->mItems.mTsCmdData.mTsCmdCode & (ANDROID_MP2TSEVENT_DISCONTINUITY |
ANDROID_MP2TSEVENT_DISCON_NEWPTS | ANDROID_MP2TSEVENT_FORMAT_CHANGE_FULL)) {
const sp<StreamPlayer> player(mPlayer.promote());
if (player != NULL) {
// FIXME see note at onSeek
player->seek(ANDROID_UNKNOWN_TIME);
}
}
oldFront->mItems.mTsCmdData.mTsCmdCode = ANDROID_MP2TSEVENT_NONE;
}
{
// we're going to change the shared mem buffer queue, so lock it
Mutex::Autolock _l(mLock);
if (!mAvailableBuffers.empty()) {
bufferId = *mAvailableBuffers.begin();
CHECK_LT(bufferId, mBuffers.size());
sp<IMemory> mem = mBuffers.itemAt(bufferId);
bufferLoc = mem->pointer();
buffSize = mem->size();
char *pSrc = ((char*)oldFront->mDataBuffer) + oldFront->mDataSizeConsumed;
if (oldFront->mDataSizeConsumed + buffSize < oldFront->mDataSize) {
// more available than requested, copy as much as requested
// consume data: 1/ copy to given destination
memcpy(bufferLoc, pSrc, buffSize);
// 2/ keep track of how much has been consumed
oldFront->mDataSizeConsumed += buffSize;
// 3/ notify shared mem listener that new data is available
receivedBuffer_l(bufferId, buffSize);
mAvailableBuffers.erase(mAvailableBuffers.begin());
} else {
// requested as much available or more: consume the whole of the current
// buffer and move to the next
size_t consumed = oldFront->mDataSize - oldFront->mDataSizeConsumed;
//SL_LOGD("consuming rest of buffer: enqueueing=%u", consumed);
oldFront->mDataSizeConsumed = oldFront->mDataSize;
// move queue to next
if (newFront == &mAndroidBufferQueue->
mBufferArray[mAndroidBufferQueue->mNumBuffers + 1]) {
// reached the end, circle back
newFront = mAndroidBufferQueue->mBufferArray;
}
mAndroidBufferQueue->mFront = newFront;
mAndroidBufferQueue->mState.count--;
mAndroidBufferQueue->mState.index++;
if (consumed > 0) {
// consume data: 1/ copy to given destination
memcpy(bufferLoc, pSrc, consumed);
// 2/ keep track of how much has been consumed
// here nothing to do because we are done with this buffer
// 3/ notify StreamPlayer that new data is available
receivedBuffer_l(bufferId, consumed);
mAvailableBuffers.erase(mAvailableBuffers.begin());
}
// data has been consumed, and the buffer queue state has been updated
// we will notify the client if applicable
queueCallbackCandidate = true;
}
}
if (queueCallbackCandidate) {
if (mAndroidBufferQueue->mCallbackEventsMask &
SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED) {
callback = mAndroidBufferQueue->mCallback;
// save callback data while under lock
callbackPContext = mAndroidBufferQueue->mContext;
pBufferContext = (void *)oldFront->mBufferContext;
pBufferData = (void *)oldFront->mDataBuffer;
dataSize = oldFront->mDataSize;
// here a buffer is only dequeued when fully consumed
//dataUsed = oldFront->mDataSizeConsumed;
}
}
//SL_LOGD("%d buffers available after reading from queue", mAvailableBuffers.size());
if (!mAvailableBuffers.empty()) {
// there is still room in the shared memory, recheck later if we can pull
// data from the buffer queue and write it to shared memory
const sp<StreamPlayer> player(mPlayer.promote());
if (player != NULL) {
player->queueRefilled();
}
}
}
} else { // empty queue
SL_LOGD("ABQ empty, starving!");
}
interface_unlock_exclusive(mAndroidBufferQueue);
// notify client of buffer processed
if (NULL != callback) {
SLresult result = (*callback)(&mAndroidBufferQueue->mItf, callbackPContext,
pBufferContext, pBufferData, dataSize,
dataSize, /* dataUsed */
// no messages during playback other than marking the buffer as processed
(const SLAndroidBufferItem*)(&kItemProcessed) /* pItems */,
NB_BUFFEREVENT_ITEM_FIELDS *sizeof(SLuint32) /* itemsLength */ );
if (SL_RESULT_SUCCESS != result) {
// Reserved for future use
SL_LOGW("Unsuccessful result %d returned from AndroidBufferQueueCallback", result);
}
}
mCallbackProtector->exitCb();
} // enterCbIfOk
}
//--------------------------------------------------------------------------------------------------
/**
 * Builds a player on top of GenericMediaPlayer, together with the StreamSourceAppProxy that
 * is bound to the given Android buffer queue and callback protector. The proxy is given
 * 'this' as its player.
 */
StreamPlayer::StreamPlayer(
        const AudioPlayback_Parameters* params,
        bool hasVideo,
        IAndroidBufferQueue *androidBufferQueue,
        const sp<CallbackProtector> &callbackProtector)
    : GenericMediaPlayer(params, hasVideo),
      mAppProxy(new StreamSourceAppProxy(androidBufferQueue, callbackProtector, this)),
      mStopForDestroyCompleted(false)
{
    SL_LOGD("StreamPlayer::StreamPlayer()");
}
/**
 * Logs the destruction and has the app proxy release its listener and buffers.
 */
StreamPlayer::~StreamPlayer()
{
    SL_LOGD("StreamPlayer::~StreamPlayer()");

    mAppProxy->disconnect();
}
/**
 * Dispatches the messages specific to StreamPlayer; any other message is passed on to
 * GenericMediaPlayer.
 */
void StreamPlayer::onMessageReceived(const sp<AMessage> &msg)
{
    switch (msg->what()) {
        case kWhatStopForDestroy:
            onStopForDestroy();
            return;

        case kWhatPullFromAbq:
            onPullFromAndroidBufferQueue();
            return;

        default:
            // not one of ours
            break;
    }

    GenericMediaPlayer::onMessageReceived(msg);
}
/**
 * Posts a kWhatStopForDestroy message and blocks until onStopForDestroy() has reported
 * completion, then runs GenericMediaPlayer::preDestroy().
 */
void StreamPlayer::preDestroy()
{
    // FIXME NuPlayerDriver is currently not thread-safe, so stop() must be called by looper
    (new AMessage(kWhatStopForDestroy, this))->post();

    {
        // wait for onStopForDestroy() to set the completion flag and signal the condition
        Mutex::Autolock autoLock(mStopForDestroyLock);
        while (!mStopForDestroyCompleted) {
            mStopForDestroyCondition.wait(mStopForDestroyLock);
        }
    }

    // some of what was done above is repeated by GenericMediaPlayer::preDestroy;
    // that's benign
    GenericMediaPlayer::preDestroy();
}
/**
 * Handler for kWhatStopForDestroy: stops and releases the media player if there is one,
 * then wakes up the thread waiting in preDestroy().
 */
void StreamPlayer::onStopForDestroy()
{
    if (mPlayer != 0) {
        mPlayer->stop();
        // mPlayer->setDataSource(NULL) is left out: it causes a CHECK failure in Nuplayer
        mPlayer->setVideoSurfaceTexture(NULL);
        mPlayer->disconnect();
        mPlayer.clear();

        {
            // FIXME ugh make this a method
            Mutex::Autolock preparedLock(mPreparedPlayerLock);
            mPreparedPlayer.clear();
        }
    }

    // report completion whether or not there was a player to stop
    {
        Mutex::Autolock completedLock(mStopForDestroyLock);
        mStopForDestroyCompleted = true;
    }
    mStopForDestroyCondition.signal();
}
/**
 * Asynchronous notification that the Android buffer queue (ABQ) was refilled.
 * Posts a kWhatPullFromAbq message, whose handling pulls from the ABQ and pushes to shared
 * memory (to the media server).
 */
void StreamPlayer::queueRefilled()
{
    (new AMessage(kWhatPullFromAbq, this))->post();
}
/**
 * Called when the user of StreamPlayer has cleared its AndroidBufferQueue.
 * Intentionally empty: the shared memory queue has no clear() to mirror that operation.
 */
void StreamPlayer::appClear_l()
{
    // no-op
}
//--------------------------------------------------
// Event handlers
/**
 * Creates the media player through the media player service and sets the app proxy as its
 * data source, then runs GenericMediaPlayer::onPrepare().
 * When no player could be set up, kFlagPreparedUnsuccessfully is added to mStateFlags.
 */
void StreamPlayer::onPrepare()
{
    SL_LOGD("StreamPlayer::onPrepare()");

    sp<IMediaPlayerService> mediaPlayerService(getMediaPlayerService());
    if (mediaPlayerService != NULL) {
        mPlayer = mediaPlayerService->create(mPlayerClient /*IMediaPlayerClient*/,
                mPlaybackParams.sessionId);
        if (mPlayer != NULL) {
            if (mPlayer->setDataSource(mAppProxy /*IStreamSource*/) != NO_ERROR) {
                SL_LOGE("setDataSource failed");
                // a player without a data source is of no use
                mPlayer.clear();
            }
        } else {
            SL_LOGE("media player service failed to create player by app proxy");
        }
    }

    // covers every failure above, including the media player service being NULL
    if (mPlayer == NULL) {
        mStateFlags |= kFlagPreparedUnsuccessfully;
    }

    GenericMediaPlayer::onPrepare();
    SL_LOGD("StreamPlayer::onPrepare() done");
}
/**
 * Requests a pull from the Android buffer queue, then runs GenericMediaPlayer::onPlay().
 */
void StreamPlayer::onPlay()
{
    SL_LOGD("StreamPlayer::onPlay()");

    // have StreamAppProxy consume from the queue (again, if the player had starved the
    // shared memory)
    queueRefilled();

    GenericMediaPlayer::onPlay();
}
/**
 * Handler for kWhatPullFromAbq: has the app proxy pull from the Android buffer queue.
 */
void StreamPlayer::onPullFromAndroidBufferQueue()
{
    SL_LOGD("StreamPlayer::onPullFromAndroidBufferQueue()");

    mAppProxy->pullFromBuffQueue();
}
} // namespace android
|