StreamSplitter.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. /*
  2. * Copyright 2014 The Android Open Source Project
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include <inttypes.h>
  17. #define LOG_TAG "StreamSplitter"
  18. #define ATRACE_TAG ATRACE_TAG_GRAPHICS
  19. //#define LOG_NDEBUG 0
  20. #include <gui/BufferItem.h>
  21. #include <gui/IGraphicBufferConsumer.h>
  22. #include <gui/IGraphicBufferProducer.h>
  23. #include <gui/StreamSplitter.h>
  24. #include <ui/GraphicBuffer.h>
  25. #include <binder/ProcessState.h>
  26. #include <utils/Trace.h>
  27. namespace android {
  28. status_t StreamSplitter::createSplitter(
  29. const sp<IGraphicBufferConsumer>& inputQueue,
  30. sp<StreamSplitter>* outSplitter) {
  31. if (inputQueue == NULL) {
  32. ALOGE("createSplitter: inputQueue must not be NULL");
  33. return BAD_VALUE;
  34. }
  35. if (outSplitter == NULL) {
  36. ALOGE("createSplitter: outSplitter must not be NULL");
  37. return BAD_VALUE;
  38. }
  39. sp<StreamSplitter> splitter(new StreamSplitter(inputQueue));
  40. status_t status = splitter->mInput->consumerConnect(splitter, false);
  41. if (status == NO_ERROR) {
  42. splitter->mInput->setConsumerName(String8("StreamSplitter"));
  43. *outSplitter = splitter;
  44. }
  45. return status;
  46. }
  47. StreamSplitter::StreamSplitter(const sp<IGraphicBufferConsumer>& inputQueue)
  48. : mIsAbandoned(false), mMutex(), mReleaseCondition(),
  49. mOutstandingBuffers(0), mInput(inputQueue), mOutputs(), mBuffers() {}
  50. StreamSplitter::~StreamSplitter() {
  51. mInput->consumerDisconnect();
  52. Vector<sp<IGraphicBufferProducer> >::iterator output = mOutputs.begin();
  53. for (; output != mOutputs.end(); ++output) {
  54. (*output)->disconnect(NATIVE_WINDOW_API_CPU);
  55. }
  56. if (mBuffers.size() > 0) {
  57. ALOGE("%zu buffers still being tracked", mBuffers.size());
  58. }
  59. }
  60. status_t StreamSplitter::addOutput(
  61. const sp<IGraphicBufferProducer>& outputQueue) {
  62. if (outputQueue == NULL) {
  63. ALOGE("addOutput: outputQueue must not be NULL");
  64. return BAD_VALUE;
  65. }
  66. Mutex::Autolock lock(mMutex);
  67. IGraphicBufferProducer::QueueBufferOutput queueBufferOutput;
  68. sp<OutputListener> listener(new OutputListener(this, outputQueue));
  69. IInterface::asBinder(outputQueue)->linkToDeath(listener);
  70. status_t status = outputQueue->connect(listener, NATIVE_WINDOW_API_CPU,
  71. /* producerControlledByApp */ false, &queueBufferOutput);
  72. if (status != NO_ERROR) {
  73. ALOGE("addOutput: failed to connect (%d)", status);
  74. return status;
  75. }
  76. mOutputs.push_back(outputQueue);
  77. return NO_ERROR;
  78. }
  79. void StreamSplitter::setName(const String8 &name) {
  80. Mutex::Autolock lock(mMutex);
  81. mInput->setConsumerName(name);
  82. }
  83. void StreamSplitter::onFrameAvailable(const BufferItem& /* item */) {
  84. ATRACE_CALL();
  85. Mutex::Autolock lock(mMutex);
  86. // The current policy is that if any one consumer is consuming buffers too
  87. // slowly, the splitter will stall the rest of the outputs by not acquiring
  88. // any more buffers from the input. This will cause back pressure on the
  89. // input queue, slowing down its producer.
  90. // If there are too many outstanding buffers, we block until a buffer is
  91. // released back to the input in onBufferReleased
  92. while (mOutstandingBuffers >= MAX_OUTSTANDING_BUFFERS) {
  93. mReleaseCondition.wait(mMutex);
  94. // If the splitter is abandoned while we are waiting, the release
  95. // condition variable will be broadcast, and we should just return
  96. // without attempting to do anything more (since the input queue will
  97. // also be abandoned).
  98. if (mIsAbandoned) {
  99. return;
  100. }
  101. }
  102. ++mOutstandingBuffers;
  103. // Acquire and detach the buffer from the input
  104. BufferItem bufferItem;
  105. status_t status = mInput->acquireBuffer(&bufferItem, /* presentWhen */ 0);
  106. LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
  107. "acquiring buffer from input failed (%d)", status);
  108. ALOGV("acquired buffer %#" PRIx64 " from input",
  109. bufferItem.mGraphicBuffer->getId());
  110. status = mInput->detachBuffer(bufferItem.mBuf);
  111. LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
  112. "detaching buffer from input failed (%d)", status);
  113. // Initialize our reference count for this buffer
  114. mBuffers.add(bufferItem.mGraphicBuffer->getId(),
  115. new BufferTracker(bufferItem.mGraphicBuffer));
  116. IGraphicBufferProducer::QueueBufferInput queueInput(
  117. bufferItem.mTimestamp, bufferItem.mIsAutoTimestamp,
  118. bufferItem.mDataSpace, bufferItem.mCrop,
  119. static_cast<int32_t>(bufferItem.mScalingMode),
  120. bufferItem.mTransform, bufferItem.mIsDroppable,
  121. bufferItem.mFence);
  122. // Attach and queue the buffer to each of the outputs
  123. Vector<sp<IGraphicBufferProducer> >::iterator output = mOutputs.begin();
  124. for (; output != mOutputs.end(); ++output) {
  125. int slot;
  126. status = (*output)->attachBuffer(&slot, bufferItem.mGraphicBuffer);
  127. if (status == NO_INIT) {
  128. // If we just discovered that this output has been abandoned, note
  129. // that, increment the release count so that we still release this
  130. // buffer eventually, and move on to the next output
  131. onAbandonedLocked();
  132. mBuffers.editValueFor(bufferItem.mGraphicBuffer->getId())->
  133. incrementReleaseCountLocked();
  134. continue;
  135. } else {
  136. LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
  137. "attaching buffer to output failed (%d)", status);
  138. }
  139. IGraphicBufferProducer::QueueBufferOutput queueOutput;
  140. status = (*output)->queueBuffer(slot, queueInput, &queueOutput);
  141. if (status == NO_INIT) {
  142. // If we just discovered that this output has been abandoned, note
  143. // that, increment the release count so that we still release this
  144. // buffer eventually, and move on to the next output
  145. onAbandonedLocked();
  146. mBuffers.editValueFor(bufferItem.mGraphicBuffer->getId())->
  147. incrementReleaseCountLocked();
  148. continue;
  149. } else {
  150. LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
  151. "queueing buffer to output failed (%d)", status);
  152. }
  153. ALOGV("queued buffer %#" PRIx64 " to output %p",
  154. bufferItem.mGraphicBuffer->getId(), output->get());
  155. }
  156. }
  157. void StreamSplitter::onBufferReleasedByOutput(
  158. const sp<IGraphicBufferProducer>& from) {
  159. ATRACE_CALL();
  160. Mutex::Autolock lock(mMutex);
  161. sp<GraphicBuffer> buffer;
  162. sp<Fence> fence;
  163. status_t status = from->detachNextBuffer(&buffer, &fence);
  164. if (status == NO_INIT) {
  165. // If we just discovered that this output has been abandoned, note that,
  166. // but we can't do anything else, since buffer is invalid
  167. onAbandonedLocked();
  168. return;
  169. } else {
  170. LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
  171. "detaching buffer from output failed (%d)", status);
  172. }
  173. ALOGV("detached buffer %#" PRIx64 " from output %p",
  174. buffer->getId(), from.get());
  175. const sp<BufferTracker>& tracker = mBuffers.editValueFor(buffer->getId());
  176. // Merge the release fence of the incoming buffer so that the fence we send
  177. // back to the input includes all of the outputs' fences
  178. tracker->mergeFence(fence);
  179. // Check to see if this is the last outstanding reference to this buffer
  180. size_t releaseCount = tracker->incrementReleaseCountLocked();
  181. ALOGV("buffer %#" PRIx64 " reference count %zu (of %zu)", buffer->getId(),
  182. releaseCount, mOutputs.size());
  183. if (releaseCount < mOutputs.size()) {
  184. return;
  185. }
  186. // If we've been abandoned, we can't return the buffer to the input, so just
  187. // stop tracking it and move on
  188. if (mIsAbandoned) {
  189. mBuffers.removeItem(buffer->getId());
  190. return;
  191. }
  192. // Attach and release the buffer back to the input
  193. int consumerSlot;
  194. status = mInput->attachBuffer(&consumerSlot, tracker->getBuffer());
  195. LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
  196. "attaching buffer to input failed (%d)", status);
  197. status = mInput->releaseBuffer(consumerSlot, /* frameNumber */ 0,
  198. EGL_NO_DISPLAY, EGL_NO_SYNC_KHR, tracker->getMergedFence());
  199. LOG_ALWAYS_FATAL_IF(status != NO_ERROR,
  200. "releasing buffer to input failed (%d)", status);
  201. ALOGV("released buffer %#" PRIx64 " to input", buffer->getId());
  202. // We no longer need to track the buffer once it has been returned to the
  203. // input
  204. mBuffers.removeItem(buffer->getId());
  205. // Notify any waiting onFrameAvailable calls
  206. --mOutstandingBuffers;
  207. mReleaseCondition.signal();
  208. }
  209. void StreamSplitter::onAbandonedLocked() {
  210. ALOGE("one of my outputs has abandoned me");
  211. if (!mIsAbandoned) {
  212. mInput->consumerDisconnect();
  213. }
  214. mIsAbandoned = true;
  215. mReleaseCondition.broadcast();
  216. }
  217. StreamSplitter::OutputListener::OutputListener(
  218. const sp<StreamSplitter>& splitter,
  219. const sp<IGraphicBufferProducer>& output)
  220. : mSplitter(splitter), mOutput(output) {}
  221. StreamSplitter::OutputListener::~OutputListener() {}
  222. void StreamSplitter::OutputListener::onBufferReleased() {
  223. mSplitter->onBufferReleasedByOutput(mOutput);
  224. }
  225. void StreamSplitter::OutputListener::binderDied(const wp<IBinder>& /* who */) {
  226. Mutex::Autolock lock(mSplitter->mMutex);
  227. mSplitter->onAbandonedLocked();
  228. }
  229. StreamSplitter::BufferTracker::BufferTracker(const sp<GraphicBuffer>& buffer)
  230. : mBuffer(buffer), mMergedFence(Fence::NO_FENCE), mReleaseCount(0) {}
  231. StreamSplitter::BufferTracker::~BufferTracker() {}
  232. void StreamSplitter::BufferTracker::mergeFence(const sp<Fence>& with) {
  233. mMergedFence = Fence::merge(String8("StreamSplitter"), mMergedFence, with);
  234. }
  235. } // namespace android