gstdspbase.c 44 KB


  1. /*
  2. * Copyright (C) 2009-2010 Felipe Contreras
  3. *
  4. * Author: Felipe Contreras <felipe.contreras@gmail.com>
  5. *
  6. * This file may be used under the terms of the GNU Lesser General Public
  7. * License version 2.1, a copy of which is found in LICENSE included in the
  8. * packaging of this file.
  9. */
  10. #include "gstdspbase.h"
  11. #include "gstdspvdec.h"
  12. #include "gstdspbuffer.h"
  13. #include "gstdspipp.h"
  14. #include "plugin.h"
  15. #include "dsp_bridge.h"
  16. #include <string.h> /* for memcpy */
  17. #include "util.h"
  18. #include "log.h"
  19. #define GST_CAT_DEFAULT gstdsp_debug
  20. /* gst-dsp errors */
  21. enum {
  22. GSTDSP_ERROR_NONE,
  23. GSTDSP_ERROR_DSP_MMUFAULT,
  24. GSTDSP_ERROR_DSP_SYSERROR,
  25. GSTDSP_ERROR_DSP_UNKNOWN,
  26. GSTDSP_ERROR_OTHER,
  27. GSTDSP_ERROR_BUSY,
  28. };
  29. static inline GstFlowReturn send_buffer(GstDspBase *self, struct td_buffer *tb);
  30. static inline void
  31. map_buffer(GstDspBase *self,
  32. GstBuffer *g_buf,
  33. struct td_buffer *tb);
  34. struct td_codec td_fake_codec = {
  35. .uuid = NULL,
  36. };
  37. static inline long
  38. get_elapsed_eos(GstDspBase *self)
  39. {
  40. struct timespec cur;
  41. long start, elapsed;
  42. if (!self->eos_start.tv_sec)
  43. return 0;
  44. clock_gettime(CLOCK_MONOTONIC, &cur);
  45. start = self->eos_start.tv_sec * 1000 + self->eos_start.tv_nsec / 1000000;
  46. elapsed = cur.tv_sec * 1000 + cur.tv_nsec / 1000000 - start;
  47. return elapsed;
  48. }
  49. static inline void
  50. check_elapsed(GstDspBase *self)
  51. {
  52. long elapsed = get_elapsed_eos(self);
  53. if (elapsed > 500)
  54. pr_warning(self, "eos took %lu ms", elapsed);
  55. }
  56. du_port_t *
  57. du_port_new(int id,
  58. int dir)
  59. {
  60. du_port_t *p;
  61. p = calloc(1, sizeof(*p));
  62. if (!p)
  63. return NULL;
  64. p->id = id;
  65. p->queue = async_queue_new();
  66. p->dir = dir;
  67. return p;
  68. }
  69. void
  70. du_port_free(du_port_t *p)
  71. {
  72. if (!p)
  73. return;
  74. free(p->buffers);
  75. async_queue_free(p->queue);
  76. free(p);
  77. }
  78. void
  79. du_port_alloc_buffers(du_port_t *p, guint num_buffers)
  80. {
  81. p->num_buffers = num_buffers;
  82. free(p->buffers);
  83. p->buffers = calloc(num_buffers, sizeof(*p->buffers));
  84. for (unsigned i = 0; i < p->num_buffers; i++)
  85. p->buffers[i].port = p;
  86. }
  87. static inline void
  88. du_port_flush(du_port_t *p)
  89. {
  90. guint i;
  91. struct td_buffer *tb = p->buffers;
  92. for (i = 0; i < p->num_buffers; i++, tb++) {
  93. dmm_buffer_t *b = tb->data;
  94. if (!b)
  95. continue;
  96. if (tb->user_data)
  97. gst_buffer_unref(tb->user_data);
  98. dmm_buffer_free(b);
  99. tb->data = NULL;
  100. }
  101. async_queue_flush(p->queue);
  102. }
  103. static inline void
  104. g_sem_down_status(GSem *sem,
  105. const GstFlowReturn *status)
  106. {
  107. GstFlowReturn ret = GST_FLOW_OK;
  108. g_mutex_lock(sem->mutex);
  109. while (sem->count == 0 &&
  110. (ret = g_atomic_int_get(status)) == GST_FLOW_OK)
  111. g_cond_wait(sem->condition, sem->mutex);
  112. if (ret == GST_FLOW_OK)
  113. sem->count--;
  114. g_mutex_unlock(sem->mutex);
  115. }
  116. static inline void
  117. g_sem_signal(GSem *sem)
  118. {
  119. g_mutex_lock(sem->mutex);
  120. g_cond_signal(sem->condition);
  121. g_mutex_unlock(sem->mutex);
  122. }
  123. static inline void
  124. g_sem_reset(GSem *sem,
  125. guint count)
  126. {
  127. g_mutex_lock(sem->mutex);
  128. sem->count = count;
  129. g_mutex_unlock(sem->mutex);
  130. }
  131. typedef struct {
  132. union {
  133. struct {
  134. uint32_t buffer_data;
  135. uint32_t buffer_size;
  136. uint32_t param_data;
  137. uint32_t param_size;
  138. uint32_t buffer_len;
  139. uint32_t silly_eos;
  140. uint32_t silly_buf_state;
  141. uint32_t silly_buf_active;
  142. uint32_t silly_buf_id;
  143. uint32_t nb_available_buf;
  144. uint32_t donot_flush_buf;
  145. uint32_t donot_invalidate_buf;
  146. uint32_t reserved;
  147. uint32_t msg_virt;
  148. uint32_t buffer_virt;
  149. uint32_t param_virt;
  150. uint32_t silly_out_buffer_index;
  151. uint32_t silly_in_buffer_index;
  152. uint32_t user_data;
  153. uint32_t stream_id;
  154. }v2;
  155. struct {
  156. uint32_t buffer_data;
  157. uint32_t buffer_size;
  158. uint32_t param_data;
  159. uint32_t param_size;
  160. uint32_t buffer_len;
  161. uint32_t silly_eos;
  162. uint32_t silly_buf_state;
  163. uint32_t silly_buf_active;
  164. uint32_t silly_buf_id;
  165. uint32_t reserved;
  166. uint32_t msg_virt;
  167. uint32_t buffer_virt;
  168. uint32_t param_virt;
  169. uint32_t silly_out_buffer_index;
  170. uint32_t silly_in_buffer_index;
  171. uint32_t user_data;
  172. uint32_t stream_id;
  173. }v0;
  174. }ver;
  175. } dsp_comm_t;
  176. #define DSP_COMM_VER(self,dsp_comm,value) \
  177. *((self->sn_api>=2)?&(dsp_comm->ver.v2.value):&(dsp_comm->ver.v0.value))
  178. static GstElementClass *parent_class;
  179. static inline void
  180. dsp_unlock(GstDspBase *self, gboolean unlock)
  181. {
  182. if (unlock) {
  183. async_queue_disable(self->ports[0]->queue);
  184. async_queue_disable(self->ports[1]->queue);
  185. if (GST_IS_DSP_IPP(self))
  186. async_queue_disable(((GstDspIpp *) self)->ipp_queue);
  187. } else {
  188. async_queue_enable(self->ports[0]->queue);
  189. async_queue_enable(self->ports[1]->queue);
  190. if (GST_IS_DSP_IPP(self))
  191. async_queue_enable(((GstDspIpp *) self)->ipp_queue);
  192. }
  193. }
  194. static inline void
  195. got_message(GstDspBase *self,
  196. struct dsp_msg *msg)
  197. {
  198. int32_t id;
  199. uint32_t command_id;
  200. id = msg->cmd & 0x000000ff;
  201. command_id = msg->cmd & 0xffffff00;
  202. switch (command_id) {
  203. case 0x0600: {
  204. dmm_buffer_t *b;
  205. du_port_t *p;
  206. dsp_comm_t *msg_data;
  207. dmm_buffer_t *param;
  208. unsigned i;
  209. struct td_buffer *tb = NULL;
  210. for (i = 0; i < ARRAY_SIZE(self->ports); i++)
  211. if (self->ports[i]->id == id) {
  212. p = self->ports[i];
  213. break;
  214. }
  215. if (i >= ARRAY_SIZE(self->ports))
  216. g_error("bad port index: %i", id);
  217. pr_debug(self, "got %s buffer", id == 0 ? "input" : "output");
  218. for (i = 0; i < p->num_buffers; i++) {
  219. if (msg->arg_1 == (uint32_t) p->buffers[i].comm->map) {
  220. tb = &p->buffers[i];
  221. break;
  222. }
  223. }
  224. if (!tb)
  225. g_error("buffer mismatch");
  226. dmm_buffer_end(tb->comm, tb->comm->size);
  227. msg_data = tb->comm->data;
  228. b = (void *) DSP_COMM_VER(self,msg_data,user_data);
  229. b->len = DSP_COMM_VER(self,msg_data,buffer_len);
  230. if (G_UNLIKELY(b->len > b->size))
  231. g_error("wrong buffer size");
  232. if (tb->pinned)
  233. dmm_buffer_end(b, b->len);
  234. else
  235. dmm_buffer_unmap(b);
  236. param = (void *) DSP_COMM_VER(self,msg_data,param_virt);
  237. if (param)
  238. dmm_buffer_end(param, param->size);
  239. /* clear time so sn might set its own */
  240. if (id != 0 && tb->user_data)
  241. GST_BUFFER_TIMESTAMP(tb->user_data) = GST_CLOCK_TIME_NONE;
  242. if (p->recv_cb)
  243. p->recv_cb(self, tb);
  244. if (id == 0) {
  245. if (tb->user_data) {
  246. gst_buffer_unref(tb->user_data);
  247. tb->user_data = NULL;
  248. }
  249. }
  250. async_queue_push(p->queue, tb);
  251. break;
  252. }
  253. case 0x0500:
  254. pr_debug(self, "got flush");
  255. break;
  256. case 0x0200:
  257. pr_debug(self, "got stop");
  258. g_sem_up(self->flush);
  259. break;
  260. case 0x0400:
  261. pr_debug(self, "got alg ctrl");
  262. dmm_buffer_free(self->alg_ctrl);
  263. self->alg_ctrl = NULL;
  264. break;
  265. case 0x0e00:
  266. if (msg->arg_1 == 1 && msg->arg_2 == 0x0500) {
  267. pr_debug(self, "playback completed");
  268. break;
  269. }
  270. if (msg->arg_1 == 1 && (msg->arg_2 & 0x0600) == 0x0600) {
  271. struct td_codec *codec = self->codec;
  272. if (codec->update_params)
  273. codec->update_params(self, self->node, msg->arg_2);
  274. break;
  275. }
  276. pr_warning(self, "DSP event: cmd=0x%04X, arg1=%u, arg2=0x%04X",
  277. msg->cmd, msg->arg_1, msg->arg_2);
  278. if ((msg->arg_2 & 0x0F00) == 0x0F00)
  279. gstdsp_got_error(self, 0, "algo error");
  280. break;
  281. default:
  282. pr_warning(self, "unhandled command: %u", command_id);
  283. }
  284. }
  285. static inline void
  286. setup_buffers(GstDspBase *self)
  287. {
  288. GstBuffer *buf = NULL;
  289. dmm_buffer_t *b;
  290. du_port_t *p;
  291. guint i;
  292. p = self->ports[0];
  293. for (i = 0; i < p->num_buffers; i++) {
  294. p->buffers[i].data = b = dmm_buffer_new(self->dsp_handle, self->proc, p->dir);
  295. async_queue_push(p->queue, &p->buffers[i]);
  296. }
  297. p = self->ports[1];
  298. for (i = 0; i < p->num_buffers; i++) {
  299. struct td_buffer *tb = &p->buffers[i];
  300. tb->data = b = dmm_buffer_new(self->dsp_handle, self->proc, p->dir);
  301. if (self->use_pad_alloc) {
  302. GstFlowReturn ret;
  303. ret = gst_pad_alloc_buffer_and_set_caps(self->srcpad,
  304. GST_BUFFER_OFFSET_NONE,
  305. self->output_buffer_size,
  306. GST_PAD_CAPS(self->srcpad),
  307. &buf);
  308. /* might fail if not (yet) linked */
  309. if (G_UNLIKELY(ret != GST_FLOW_OK)) {
  310. pr_err(self, "couldn't allocate buffer: %s", gst_flow_get_name(ret));
  311. dmm_buffer_allocate(b, self->output_buffer_size);
  312. b->need_copy = true;
  313. } else {
  314. map_buffer(self, buf, tb);
  315. gst_buffer_unref(buf);
  316. }
  317. }
  318. else {
  319. dmm_buffer_allocate(b, self->output_buffer_size);
  320. if (self->use_pinned) {
  321. dmm_buffer_map(b);
  322. tb->pinned = tb->clean = true;
  323. }
  324. }
  325. self->send_buffer(self, tb);
  326. }
  327. }
  328. static inline void
  329. pause_task(GstDspBase *self, GstFlowReturn status)
  330. {
  331. bool deferred_eos;
  332. /* synchronize to ensure we are not dropping the EOS event */
  333. g_mutex_lock(self->ts_mutex);
  334. (void) g_atomic_int_compare_and_exchange(&self->status, GST_FLOW_OK, status);
  335. deferred_eos = g_atomic_int_compare_and_exchange(&self->deferred_eos, true, false);
  336. g_mutex_unlock(self->ts_mutex);
  337. pr_info(self, "pausing task; reason %s", gst_flow_get_name(status));
  338. gst_pad_pause_task(self->srcpad);
  339. /* avoid waiting for buffers that will never come */
  340. dsp_unlock(self, TRUE);
  341. /* there's a pending deferred EOS, it's now or never */
  342. if (deferred_eos) {
  343. pr_info(self, "send elapsed eos");
  344. check_elapsed(self);
  345. self->eos_start.tv_sec = self->eos_start.tv_nsec = 0;
  346. gst_pad_push_event(self->srcpad, gst_event_new_eos());
  347. g_atomic_int_set(&self->eos, true);
  348. }
  349. }
  350. static inline GstFlowReturn
  351. check_status(GstDspBase *self)
  352. {
  353. GstFlowReturn ret;
  354. ret = g_atomic_int_get(&self->status);
  355. if (G_UNLIKELY(ret != GST_FLOW_OK))
  356. pause_task(self, ret);
  357. return ret;
  358. }
  359. /* determine timestamp/duration for @out_buf using input @timestamp and @duration */
  360. static void
  361. do_timestamp(GstDspBase *self, GstBuffer *out_buf, GstClockTime timestamp, GstClockTime duration)
  362. {
  363. /* timestamp checking and heuristics */
  364. switch (g_atomic_int_get(&self->ts_mode)) {
  365. case TS_MODE_CHECK_OUT:
  366. /* maybe SN provided a valid one, fall-back to in ts otherwise */
  367. if (GST_BUFFER_TIMESTAMP_IS_VALID(out_buf)) {
  368. timestamp = GST_BUFFER_TIMESTAMP(out_buf);
  369. duration = GST_BUFFER_DURATION(out_buf);
  370. pr_debug(self, "SN ts %" GST_TIME_FORMAT, GST_TIME_ARGS(timestamp));
  371. }
  372. if (GST_CLOCK_TIME_IS_VALID(self->last_ts) && GST_CLOCK_TIME_IS_VALID(timestamp) &&
  373. self->last_ts > timestamp) {
  374. pr_debug(self, "SN ts out-of-order -> interpolate");
  375. g_atomic_int_set(&self->ts_mode, TS_MODE_INTERPOLATE);
  376. self->next_ts = GST_CLOCK_TIME_NONE;
  377. }
  378. self->last_ts = timestamp;
  379. break;
  380. case TS_MODE_INTERPOLATE: {
  381. gboolean keyframe = !GST_BUFFER_FLAG_IS_SET(out_buf, GST_BUFFER_FLAG_DELTA_UNIT);
  382. pr_debug(self, "interpolate: keyframe %d, next_ts %" GST_TIME_FORMAT,
  383. keyframe, GST_TIME_ARGS(self->next_ts));
  384. if (G_LIKELY(!keyframe && GST_CLOCK_TIME_IS_VALID(self->next_ts))) {
  385. pr_debug(self, "not keyframe: using interpolated ts");
  386. timestamp = self->next_ts;
  387. }
  388. if (G_LIKELY(GST_CLOCK_TIME_IS_VALID(duration) && GST_CLOCK_TIME_IS_VALID(timestamp)))
  389. self->next_ts = timestamp + duration;
  390. break;
  391. }
  392. default:
  393. break;
  394. }
  395. GST_BUFFER_TIMESTAMP(out_buf) = timestamp;
  396. GST_BUFFER_DURATION(out_buf) = duration;
  397. }
  398. /* to be called with ts_lock */
  399. static void
  400. process_event(GstDspBase *self, GstEvent *event)
  401. {
  402. switch (GST_EVENT_TYPE(event)) {
  403. case GST_EVENT_NEWSEGMENT: {
  404. GstFormat format;
  405. gdouble rate, arate;
  406. gint64 start, stop, time;
  407. gboolean update;
  408. GstSegment segment;
  409. gst_segment_init(&segment, GST_FORMAT_UNDEFINED);
  410. gst_event_parse_new_segment_full(event, &update, &rate, &arate, &format,
  411. &start, &stop, &time);
  412. gst_segment_set_newsegment_full(&segment, update, rate, arate, format,
  413. start, stop, time);
  414. GST_DEBUG_OBJECT(self, "applying format %d newsegment %" GST_SEGMENT_FORMAT, format,
  415. &segment);
  416. /* avoid (unlikely) format complaints */
  417. if (format != self->segment.format)
  418. gst_segment_init(&self->segment, GST_FORMAT_UNDEFINED);
  419. gst_segment_set_newsegment_full(&self->segment, update, rate, arate,
  420. format, start, stop, time);
  421. self->last_ts = GST_CLOCK_TIME_NONE;
  422. self->next_ts = GST_CLOCK_TIME_NONE;
  423. break;
  424. }
  425. default:
  426. break;
  427. }
  428. }
  429. static void
  430. push_events(GstDspBase *self)
  431. {
  432. GSList **events;
  433. gboolean flush_buffer;
  434. g_mutex_lock(self->ts_mutex);
  435. events = &self->ts_array[self->ts_out_pos].events;
  436. flush_buffer = (self->ts_out_pos != self->ts_push_pos);
  437. while (*events) {
  438. GstEvent *event;
  439. event = (*events)->data;
  440. *events = g_slist_delete_link(*events, *events);
  441. if (G_LIKELY(!flush_buffer)) {
  442. process_event(self, event);
  443. pr_debug(self, "pushing event: %s", GST_EVENT_TYPE_NAME(event));
  444. gst_pad_push_event(self->srcpad, event);
  445. } else {
  446. pr_debug(self, "ignored flushed event: %s", GST_EVENT_TYPE_NAME(event));
  447. gst_event_unref(event);
  448. }
  449. }
  450. g_mutex_unlock(self->ts_mutex);
  451. }
  452. /* some typical familiar code ... */
  453. /* returns TRUE if buffer is within segment, else FALSE.
  454. * if Buffer is on segment border, it's timestamp and duration will be clipped */
  455. static gboolean
  456. clip_video_buffer(GstDspBase *self, GstBuffer *buf)
  457. {
  458. gboolean res = TRUE;
  459. gint64 cstart, cstop;
  460. GstClockTime stop, in_ts, in_dur;
  461. in_ts = GST_BUFFER_TIMESTAMP(buf);
  462. in_dur = GST_BUFFER_DURATION(buf);
  463. GST_LOG_OBJECT(self,
  464. "timestamp:%" GST_TIME_FORMAT " , duration:%" GST_TIME_FORMAT,
  465. GST_TIME_ARGS(in_ts), GST_TIME_ARGS(in_dur));
  466. /* can't clip without TIME segment */
  467. if (G_UNLIKELY(self->segment.format != GST_FORMAT_TIME))
  468. goto exit;
  469. /* we need a start time */
  470. if (G_UNLIKELY(!GST_CLOCK_TIME_IS_VALID(in_ts)))
  471. goto exit;
  472. /* generate valid stop, if duration unknown, we have unknown stop */
  473. stop = GST_CLOCK_TIME_IS_VALID(in_dur) ?
  474. (in_ts + in_dur) : GST_CLOCK_TIME_NONE;
  475. /* now clip */
  476. res = gst_segment_clip(&self->segment, GST_FORMAT_TIME, in_ts, stop,
  477. &cstart, &cstop);
  478. if (G_UNLIKELY(!res))
  479. goto exit;
  480. /* we're pretty sure the duration of this buffer is not till the end of this
  481. * segment (which _clip will assume when the stop is -1) */
  482. if (stop == GST_CLOCK_TIME_NONE)
  483. cstop = GST_CLOCK_TIME_NONE;
  484. /* update timestamp and possibly duration if the clipped stop time is
  485. * valid */
  486. GST_BUFFER_TIMESTAMP(buf) = cstart;
  487. if (GST_CLOCK_TIME_IS_VALID(cstop))
  488. GST_BUFFER_DURATION(buf) = cstop - cstart;
  489. GST_LOG_OBJECT(self,
  490. "clipped timestamp:%" GST_TIME_FORMAT " , duration:%" GST_TIME_FORMAT,
  491. GST_TIME_ARGS(cstart), GST_TIME_ARGS(GST_BUFFER_DURATION(buf)));
  492. exit:
  493. GST_LOG_OBJECT(self, "%sdropping", (res ? "not " : ""));
  494. return res;
  495. }
  496. static void
  497. output_loop(gpointer data)
  498. {
  499. GstPad *pad;
  500. GstDspBase *self;
  501. GstFlowReturn ret = GST_FLOW_OK;
  502. GstBuffer *out_buf = NULL;
  503. dmm_buffer_t *b;
  504. gboolean flush_buffer;
  505. gboolean got_eos = FALSE;
  506. gboolean keyframe = FALSE;
  507. du_port_t *p;
  508. struct td_buffer *tb;
  509. bool handled;
  510. GstClockTime timestamp, duration;
  511. pad = data;
  512. self = GST_DSP_BASE(GST_OBJECT_PARENT(pad));
  513. p = self->ports[1];
  514. pr_debug(self, "begin");
  515. tb = async_queue_pop(p->queue);
  516. /*
  517. * queue might have been disabled above, so perhaps tb == NULL,
  518. * but then right here in between self->status may have been set to
  519. * OK by e.g. FLUSH_STOP
  520. */
  521. if (G_UNLIKELY(!tb)) {
  522. pr_info(self, "no buffer");
  523. ret = check_status(self);
  524. goto nok;
  525. }
  526. b = tb->data;
  527. ret = check_status(self);
  528. if (G_UNLIKELY(ret != GST_FLOW_OK)) {
  529. async_queue_push(p->queue, tb);
  530. goto end;
  531. }
  532. if (G_UNLIKELY(self->skip_hack_2 > 0)) {
  533. self->skip_hack_2--;
  534. goto leave;
  535. }
  536. /* check for too many buffers returned */
  537. g_mutex_lock(self->ts_mutex);
  538. if (G_UNLIKELY(b->len && !self->ts_count)) {
  539. pr_warning(self, "no timestamp; unexpected buffer");
  540. g_mutex_unlock(self->ts_mutex);
  541. goto leave;
  542. }
  543. g_mutex_unlock(self->ts_mutex);
  544. /* first clear pending events */
  545. push_events(self);
  546. /* a pending reallocation from the previous run */
  547. if (G_UNLIKELY(!b->data)) {
  548. dmm_buffer_allocate(b, self->output_buffer_size);
  549. send_buffer(self, tb);
  550. goto end;
  551. }
  552. if (G_UNLIKELY(!b->len)) {
  553. /* no need to process this buffer */
  554. if (G_UNLIKELY(b->skip)) {
  555. b->skip = FALSE;
  556. g_mutex_lock(self->ts_mutex);
  557. flush_buffer = (self->ts_out_pos != self->ts_push_pos);
  558. self->ts_out_pos = (self->ts_out_pos + 1) % ARRAY_SIZE(self->ts_array);
  559. self->ts_count--;
  560. if (G_LIKELY(!flush_buffer))
  561. self->ts_push_pos = self->ts_out_pos;
  562. if (G_UNLIKELY(g_atomic_int_get(&self->deferred_eos)) && self->ts_count == 0)
  563. got_eos = TRUE;
  564. g_mutex_unlock(self->ts_mutex);
  565. }
  566. /* no real frame data, so no need to consume a real frame's ts */
  567. goto leave;
  568. }
  569. g_mutex_lock(self->ts_mutex);
  570. flush_buffer = (self->ts_out_pos != self->ts_push_pos);
  571. g_mutex_unlock(self->ts_mutex);
  572. if (G_UNLIKELY(flush_buffer)) {
  573. g_mutex_lock(self->ts_mutex);
  574. pr_debug(self, "ignored flushed output buffer for %" GST_TIME_FORMAT,
  575. GST_TIME_ARGS((self->ts_array[self->ts_out_pos].time)));
  576. self->ts_count--;
  577. self->ts_out_pos = (self->ts_out_pos + 1) % ARRAY_SIZE(self->ts_array);
  578. if (G_UNLIKELY(g_atomic_int_get(&self->deferred_eos)) && self->ts_count == 0)
  579. got_eos = TRUE;
  580. g_mutex_unlock(self->ts_mutex);
  581. goto leave;
  582. }
  583. /* now go after the data, but let's first see if it is keyframe */
  584. keyframe = tb->keyframe;
  585. if (self->use_pad_alloc) {
  586. GstBuffer *new_buf;
  587. ret = gst_pad_alloc_buffer_and_set_caps(self->srcpad,
  588. GST_BUFFER_OFFSET_NONE,
  589. self->output_buffer_size,
  590. GST_PAD_CAPS(self->srcpad),
  591. &new_buf);
  592. if (G_UNLIKELY(ret != GST_FLOW_OK)) {
  593. pr_info(self, "couldn't allocate buffer: %s", gst_flow_get_name(ret));
  594. async_queue_push(p->queue, tb);
  595. goto nok;
  596. }
  597. if (tb->user_data) {
  598. out_buf = tb->user_data;
  599. tb->user_data = NULL;
  600. }
  601. else
  602. out_buf = new_buf;
  603. if (b->need_copy) {
  604. pr_info(self, "copy");
  605. memcpy(GST_BUFFER_DATA(out_buf), b->data, b->len);
  606. }
  607. GST_BUFFER_SIZE(out_buf) = b->len;
  608. if (out_buf != new_buf) {
  609. map_buffer(self, new_buf, tb);
  610. gst_buffer_unref(new_buf);
  611. }
  612. }
  613. else {
  614. /* this should only happen in overwrite ipp case */
  615. if (G_UNLIKELY(tb->user_data && !tb->pinned)) {
  616. /* let's make sure */
  617. g_assert(GST_IS_DSP_IPP(self));
  618. g_assert(self->use_pinned);
  619. out_buf = tb->user_data;
  620. tb->user_data = NULL;
  621. pr_debug(self, "re-using output buffer %p", out_buf);
  622. } else
  623. out_buf = gst_dsp_buffer_new(self, tb);
  624. if (!self->use_pinned)
  625. /* invalidate data to force reallocation */
  626. b->data = b->allocated_data = NULL;
  627. }
  628. if (G_UNLIKELY(self->skip_hack > 0)) {
  629. self->skip_hack--;
  630. gst_buffer_unref(out_buf);
  631. goto leave;
  632. }
  633. if (!keyframe)
  634. GST_BUFFER_FLAGS(out_buf) |= GST_BUFFER_FLAG_DELTA_UNIT;
  635. g_mutex_lock(self->ts_mutex);
  636. timestamp = self->ts_array[self->ts_out_pos].time;
  637. duration = self->ts_array[self->ts_out_pos].duration;
  638. pr_debug(self, "in ts %" GST_TIME_FORMAT, GST_TIME_ARGS(timestamp));
  639. self->ts_out_pos = (self->ts_out_pos + 1) % ARRAY_SIZE(self->ts_array);
  640. self->ts_push_pos = self->ts_out_pos;
  641. self->ts_count--;
  642. g_cond_signal(self->ts_cond);
  643. if (G_UNLIKELY(g_atomic_int_get(&self->deferred_eos)) && self->ts_count == 0)
  644. got_eos = TRUE;
  645. #ifdef TS_COUNT
  646. if (self->ts_count > 2 || self->ts_count < 1)
  647. pr_info(self, "tsc=%lu", self->ts_count);
  648. #endif
  649. g_mutex_unlock(self->ts_mutex);
  650. if (!GST_CLOCK_TIME_IS_VALID(duration) && self->default_duration) {
  651. duration = self->default_duration;
  652. pr_debug(self, "using default duration %" GST_TIME_FORMAT, GST_TIME_ARGS(duration));
  653. }
  654. else if (GST_CLOCK_TIME_IS_VALID(duration) && !self->default_duration)
  655. self->default_duration = duration;
  656. do_timestamp(self, out_buf, timestamp, duration);
  657. /* segment clipping */
  658. if (GST_IS_DSP_VDEC(self)) {
  659. if (G_UNLIKELY(!clip_video_buffer(self, out_buf))) {
  660. gst_buffer_unref(out_buf);
  661. goto leave;
  662. }
  663. }
  664. pr_debug(self, "pushing buffer %" GST_TIME_FORMAT,
  665. GST_TIME_ARGS(GST_BUFFER_TIMESTAMP(out_buf)));
  666. ret = gst_pad_push(self->srcpad, out_buf);
  667. if (G_UNLIKELY(ret != GST_FLOW_OK)) {
  668. pr_info(self, "pad push failed: %s", gst_flow_get_name(ret));
  669. goto leave;
  670. }
  671. leave:
  672. handled = tb->pinned && out_buf;
  673. if (G_UNLIKELY(got_eos)) {
  674. pr_info(self, "got eos");
  675. check_elapsed(self);
  676. self->eos_start.tv_sec = self->eos_start.tv_nsec = 0;
  677. gst_pad_push_event(self->srcpad, gst_event_new_eos());
  678. g_atomic_int_set(&self->eos, true);
  679. g_atomic_int_set(&self->deferred_eos, false);
  680. ret = GST_FLOW_UNEXPECTED;
  681. if (self->use_pinned) {
  682. if (!handled)
  683. self->send_buffer(self, tb);
  684. goto nok;
  685. }
  686. /*
  687. * We don't want to allocate data unnecessarily; postpone after
  688. * EOS and flush.
  689. */
  690. if (b->data)
  691. send_buffer(self, tb);
  692. else
  693. /* we'll need to allocate on the next run */
  694. async_queue_push(p->queue, tb);
  695. }
  696. else {
  697. if (self->use_pinned) {
  698. if (!handled)
  699. self->send_buffer(self, tb);
  700. goto nok;
  701. }
  702. if (!b->data)
  703. dmm_buffer_allocate(b, self->output_buffer_size);
  704. self->send_buffer(self, tb);
  705. }
  706. nok:
  707. if (G_UNLIKELY(ret != GST_FLOW_OK))
  708. pause_task(self, ret);
  709. end:
  710. pr_debug(self, "end");
  711. }
  712. void
  713. gstdsp_base_flush_buffer(GstDspBase *self)
  714. {
  715. struct td_buffer *tb;
  716. tb = async_queue_pop(self->ports[0]->queue);
  717. if (!tb)
  718. return;
  719. dmm_buffer_allocate(tb->data, 1);
  720. send_buffer(self, tb);
  721. }
  722. void
  723. gstdsp_post_error(GstDspBase *self,
  724. const char *message)
  725. {
  726. GError *gerror;
  727. GstMessage *gst_msg;
  728. if (self->dsp_error == GSTDSP_ERROR_BUSY)
  729. gerror = g_error_new_literal(GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_BUSY, message);
  730. else
  731. gerror = g_error_new_literal(GST_STREAM_ERROR, GST_STREAM_ERROR_FAILED, message);
  732. gst_msg = gst_message_new_error(GST_OBJECT(self), gerror, NULL);
  733. gst_element_post_message(GST_ELEMENT(self), gst_msg);
  734. g_error_free(gerror);
  735. }
  736. void
  737. gstdsp_got_error(GstDspBase *self,
  738. guint id,
  739. const char *message)
  740. {
  741. pr_err(self, "%s", message);
  742. self->dsp_error = id;
  743. gstdsp_post_error(self, message);
  744. g_atomic_int_set(&self->status, GST_FLOW_ERROR);
  745. dsp_unlock(self, TRUE);
  746. }
  747. static gpointer
  748. dsp_thread(gpointer data)
  749. {
  750. GstDspBase *self = data;
  751. pr_info(self, "begin");
  752. while (!self->done) {
  753. unsigned int index = 0;
  754. pr_debug(self, "waiting for events");
  755. if (!dsp_wait_for_events(self->dsp_handle, self->events, 3, &index, 10000)) {
  756. int dsp_error = GSTDSP_ERROR_OTHER;
  757. if (errno == ETIME) {
  758. long elapsed = get_elapsed_eos(self);
  759. pr_info(self, "timed out waiting for events");
  760. if (self->eos_timeout && elapsed >= self->eos_timeout) {
  761. pr_err(self, "eos timed out after %lu ms", elapsed);
  762. /* wind out of output loop */
  763. g_atomic_int_set(&self->status, GST_FLOW_UNEXPECTED);
  764. async_queue_disable(self->ports[1]->queue);
  765. }
  766. continue;
  767. } else if (errno == EBUSY) {
  768. pr_info(self, "preempted");
  769. dsp_error = GSTDSP_ERROR_BUSY;
  770. self->busy = true;
  771. }
  772. pr_err(self, "failed waiting for events: %i", errno);
  773. gstdsp_got_error(self, dsp_error, "unable to get event");
  774. break;
  775. }
  776. if (index == 0) {
  777. struct dsp_msg msg;
  778. while (true) {
  779. if (!dsp_node_get_message(self->dsp_handle, self->node, &msg, 10))
  780. break;
  781. pr_debug(self, "got dsp message: 0x%0x 0x%0x 0x%0x",
  782. msg.cmd, msg.arg_1, msg.arg_2);
  783. self->got_message(self, &msg);
  784. }
  785. }
  786. else if (index == 1) {
  787. gstdsp_got_error(self, GSTDSP_ERROR_DSP_MMUFAULT, "got DSP MMUFAULT");
  788. break;
  789. }
  790. else if (index == 2) {
  791. gstdsp_got_error(self, GSTDSP_ERROR_DSP_SYSERROR, "got DSP SYSERROR");
  792. break;
  793. }
  794. else {
  795. gstdsp_got_error(self, GSTDSP_ERROR_DSP_UNKNOWN, "wrong event index");
  796. break;
  797. }
  798. }
  799. pr_info(self, "end");
  800. return NULL;
  801. }
  802. static inline bool
  803. destroy_node(GstDspBase *self)
  804. {
  805. if (self->node) {
  806. if (!dsp_node_free(self->dsp_handle, self->node)) {
  807. pr_err(self, "dsp node free failed");
  808. return false;
  809. }
  810. pr_info(self, "dsp node deleted");
  811. }
  812. return true;
  813. }
  814. static gboolean
  815. dsp_init(GstDspBase *self)
  816. {
  817. int dsp_handle;
  818. self->dsp_handle = dsp_handle = dsp_open();
  819. if (dsp_handle < 0) {
  820. pr_err(self, "dsp open failed");
  821. return FALSE;
  822. }
  823. if (!dsp_attach(dsp_handle, 0, NULL, &self->proc)) {
  824. pr_err(self, "dsp attach failed");
  825. goto fail;
  826. }
  827. return TRUE;
  828. fail:
  829. self->proc = NULL;
  830. if (self->dsp_handle >= 0) {
  831. if (dsp_close(dsp_handle) < 0)
  832. pr_err(self, "dsp close failed");
  833. self->dsp_handle = -1;
  834. }
  835. return FALSE;
  836. }
  837. static gboolean
  838. dsp_deinit(GstDspBase *self)
  839. {
  840. gboolean ret = TRUE;
  841. if (self->dsp_error)
  842. goto leave;
  843. leave:
  844. self->proc = NULL;
  845. if (self->dsp_handle >= 0) {
  846. if (dsp_close(self->dsp_handle) < 0) {
  847. pr_err(self, "dsp close failed");
  848. ret = FALSE;
  849. }
  850. self->dsp_handle = -1;
  851. }
  852. return ret;
  853. }
  854. static bool
  855. send_play_message(GstDspBase *self)
  856. {
  857. return dsp_send_message(self->dsp_handle, self->node, 0x0100, 0, 0);
  858. };
  859. gboolean
  860. gstdsp_start(GstDspBase *self)
  861. {
  862. struct td_codec *codec = self->codec;
  863. bool ret = true;
  864. guint i;
  865. for (i = 0; i < ARRAY_SIZE(self->ports); i++) {
  866. du_port_t *p = self->ports[i];
  867. guint j;
  868. for (j = 0; j < p->num_buffers; j++) {
  869. struct td_buffer *tb = &p->buffers[j];
  870. tb->comm = dmm_buffer_new(self->dsp_handle, self->proc, DMA_BIDIRECTIONAL);
  871. dmm_buffer_allocate(tb->comm, sizeof(*tb->comm));
  872. dmm_buffer_map(tb->comm);
  873. }
  874. }
  875. if (!dsp_node_run(self->dsp_handle, self->node)) {
  876. pr_err(self, "dsp node run failed");
  877. return false;
  878. }
  879. pr_info(self, "dsp node running");
  880. self->events[0] = calloc(1, sizeof(struct dsp_notification));
  881. if (!dsp_node_register_notify(self->dsp_handle, self->node,
  882. DSP_NODEMESSAGEREADY, 1,
  883. self->events[0]))
  884. {
  885. pr_err(self, "failed to register for notifications");
  886. return false;
  887. }
  888. self->events[1] = calloc(1, sizeof(struct dsp_notification));
  889. if (!dsp_register_notify(self->dsp_handle, self->proc,
  890. DSP_MMUFAULT, 1,
  891. self->events[1]))
  892. {
  893. pr_err(self, "failed to register for DSP_MMUFAULT");
  894. return false;
  895. }
  896. self->events[2] = calloc(1, sizeof(struct dsp_notification));
  897. if (!dsp_register_notify(self->dsp_handle, self->proc,
  898. DSP_SYSERROR, 1,
  899. self->events[2]))
  900. {
  901. pr_err(self, "failed to register for DSP_SYSERROR");
  902. return false;
  903. }
  904. pr_info(self, "creating dsp thread");
  905. self->dsp_thread = g_thread_create(dsp_thread, self, TRUE, NULL);
  906. gst_pad_start_task(self->srcpad, output_loop, self->srcpad);
  907. if(!self->send_play_message(self))
  908. {
  909. pr_err(self, "failed to send play message");
  910. return false;
  911. }
  912. setup_buffers(self);
  913. if (self->codec_data) {
  914. GstBuffer *buf = self->codec_data;
  915. self->codec_data = NULL;
  916. if (codec->handle_extra_data)
  917. ret = codec->handle_extra_data(self, buf);
  918. else
  919. ret = gstdsp_send_codec_data(self, buf);
  920. gst_buffer_unref(buf);
  921. }
  922. return ret;
  923. }
  924. static bool
  925. send_stop_message(GstDspBase *self)
  926. {
  927. if (dsp_send_message(self->dsp_handle, self->node, 0x0200, 0, 0))
  928. if (!self->busy && !g_sem_down_timed(self->flush, 2))
  929. pr_warning(self, "timed out waiting for DSP STOP");
  930. /** @todo find a way to stop wait_for_events */
  931. return true;
  932. };
  933. static gboolean
  934. _dsp_stop(GstDspBase *self)
  935. {
  936. unsigned long exit_status;
  937. unsigned i;
  938. if (!self->node)
  939. return TRUE;
  940. if (!self->dsp_error) {
  941. self->send_stop_message(self);
  942. self->done = TRUE;
  943. }
  944. g_thread_join(self->dsp_thread);
  945. gst_pad_stop_task(self->srcpad);
  946. for (i = 0; i < ARRAY_SIZE(self->ports); i++)
  947. du_port_flush(self->ports[i]);
  948. for (i = 0; i < ARRAY_SIZE(self->ports); i++) {
  949. guint j;
  950. du_port_t *port = self->ports[i];
  951. for (j = 0; j < port->num_buffers; j++) {
  952. dmm_buffer_free(port->buffers[j].params);
  953. port->buffers[j].params = NULL;
  954. }
  955. }
  956. for (i = 0; i < ARRAY_SIZE(self->ts_array); i++) {
  957. GSList **events = &self->ts_array[i].events;
  958. if (*events) {
  959. g_slist_foreach(*events, (GFunc) gst_event_unref, NULL);
  960. g_slist_free(*events);
  961. *events = NULL;
  962. }
  963. }
  964. self->ts_in_pos = self->ts_out_pos = self->ts_push_pos = 0;
  965. self->ts_count = 0;
  966. self->skip_hack = 0;
  967. self->skip_hack_2 = 0;
  968. for (i = 0; i < ARRAY_SIZE(self->events); i++) {
  969. free(self->events[i]);
  970. self->events[i] = NULL;
  971. }
  972. if (self->alg_ctrl) {
  973. dmm_buffer_free(self->alg_ctrl);
  974. self->alg_ctrl = NULL;
  975. }
  976. if (self->dsp_error)
  977. goto leave;
  978. if (!dsp_node_terminate(self->dsp_handle, self->node, &exit_status))
  979. pr_err(self, "dsp node terminate failed: 0x%lx", exit_status);
  980. leave:
  981. if (!destroy_node(self))
  982. pr_err(self, "dsp node destroy failed");
  983. self->node = NULL;
  984. for (i = 0; i < ARRAY_SIZE(self->ports); i++) {
  985. du_port_t *p = self->ports[i];
  986. guint j;
  987. for (j = 0; j < p->num_buffers; j++) {
  988. dmm_buffer_free(p->buffers[j].comm);
  989. p->buffers[j].comm = NULL;
  990. }
  991. du_port_alloc_buffers(p, 0);
  992. }
  993. pr_info(self, "dsp node terminated");
  994. return TRUE;
  995. }
  996. gboolean gstdsp_need_node_reset(GstDspBase *base, GstCaps *new_caps, gint w, gint h)
  997. {
  998. gint width, height;
  999. GstStructure *struc;
  1000. if (G_UNLIKELY(!base->node))
  1001. return FALSE;
  1002. struc = gst_caps_get_structure(new_caps, 0);
  1003. gst_structure_get_int(struc, "width", &width);
  1004. gst_structure_get_int(struc, "height", &height);
  1005. if (w == width && h == height)
  1006. return FALSE;
  1007. return TRUE;
  1008. }
  1009. gboolean gstdsp_reinit(GstDspBase *self)
  1010. {
  1011. /* deinit */
  1012. g_atomic_int_set(&self->status, GST_FLOW_WRONG_STATE);
  1013. dsp_unlock(self, TRUE);
  1014. /* ends buffer recycling */
  1015. g_mutex_lock(self->pool_mutex);
  1016. self->cycle++;
  1017. g_mutex_unlock(self->pool_mutex);
  1018. /* push optional remaining events */
  1019. gst_pad_pause_task(self->srcpad);
  1020. push_events(self);
  1021. if (!_dsp_stop(self))
  1022. gstdsp_post_error(self, "dsp stop failed");
  1023. if (self->reset)
  1024. self->reset(self);
  1025. gst_caps_replace(&self->tmp_caps, NULL);
  1026. /* init */
  1027. g_atomic_int_set(&self->status, GST_FLOW_OK);
  1028. self->done = FALSE;
  1029. dsp_unlock(self, FALSE);
  1030. return true;
  1031. }
  1032. static inline void
  1033. map_buffer(GstDspBase *self,
  1034. GstBuffer *g_buf,
  1035. struct td_buffer *tb)
  1036. {
  1037. if (gstdsp_map_buffer(self, g_buf, tb->data))
  1038. tb->user_data = g_buf;
  1039. }
  1040. static inline GstFlowReturn send_buffer(GstDspBase *self, struct td_buffer *tb)
  1041. {
  1042. dsp_comm_t *msg_data;
  1043. du_port_t *port = tb->port;
  1044. int index = port->id;
  1045. dmm_buffer_t *buffer = tb->data;
  1046. pr_debug(self, "sending %s buffer", index == 0 ? "input" : "output");
  1047. msg_data = tb->comm->data;
  1048. if (port->send_cb)
  1049. port->send_cb(self, tb);
  1050. if (tb->params)
  1051. dmm_buffer_begin(tb->params, tb->params->size);
  1052. if (tb->pinned) {
  1053. if (G_LIKELY(!tb->clean))
  1054. dmm_buffer_begin(buffer, buffer->len);
  1055. else
  1056. tb->clean = false;
  1057. } else {
  1058. dmm_buffer_map(buffer);
  1059. }
  1060. memset(msg_data, 0, sizeof(*msg_data));
  1061. DSP_COMM_VER(self,msg_data,buffer_data) = (uint32_t) buffer->map;
  1062. DSP_COMM_VER(self,msg_data,buffer_size) = buffer->size;
  1063. DSP_COMM_VER(self,msg_data,stream_id) = port->id;
  1064. DSP_COMM_VER(self,msg_data,buffer_len) = index == 0 ? buffer->len : 0;
  1065. DSP_COMM_VER(self,msg_data,user_data) = (uint32_t) buffer;
  1066. if (tb->params) {
  1067. DSP_COMM_VER(self,msg_data,param_data) = (uint32_t) tb->params->map;
  1068. DSP_COMM_VER(self,msg_data,param_size) = tb->params->len;
  1069. DSP_COMM_VER(self,msg_data,param_virt) = (uint32_t) tb->params;
  1070. }
  1071. dmm_buffer_begin(tb->comm, sizeof(*msg_data));
  1072. dsp_send_message(self->dsp_handle, self->node,
  1073. 0x0600 | port->id, (uint32_t) tb->comm->map, 0);
  1074. return GST_FLOW_OK;
  1075. }
  1076. void
  1077. gstdsp_send_alg_ctrl(GstDspBase *self,
  1078. struct dsp_node *node,
  1079. dmm_buffer_t *b)
  1080. {
  1081. self->alg_ctrl = b;
  1082. dmm_buffer_map(b);
  1083. dsp_send_message(self->dsp_handle, node,
  1084. 0x0400, 3, (uint32_t) b->map);
  1085. }
  1086. static inline bool check_dsp_preemption(GstDspBase *self)
  1087. {
  1088. if (errno == EBUSY) {
  1089. pr_info(self, "preempted");
  1090. self->busy = true;
  1091. gstdsp_got_error(self, GSTDSP_ERROR_BUSY, "dsp init failed");
  1092. return true;
  1093. }
  1094. return false;
  1095. }
  1096. static GstStateChangeReturn
  1097. change_state(GstElement *element,
  1098. GstStateChange transition)
  1099. {
  1100. GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
  1101. GstDspBase *self;
  1102. self = GST_DSP_BASE(element);
  1103. pr_info(self, "%s -> %s",
  1104. gst_element_state_get_name(GST_STATE_TRANSITION_CURRENT(transition)),
  1105. gst_element_state_get_name(GST_STATE_TRANSITION_NEXT(transition)));
  1106. switch (transition) {
  1107. case GST_STATE_CHANGE_NULL_TO_READY:
  1108. if (!dsp_init(self)) {
  1109. if (!check_dsp_preemption(self))
  1110. gstdsp_post_error(self, "dsp init failed");
  1111. }
  1112. break;
  1113. case GST_STATE_CHANGE_READY_TO_PAUSED:
  1114. self->status = GST_FLOW_OK;
  1115. self->done = FALSE;
  1116. dsp_unlock(self, FALSE);
  1117. self->deferred_eos = false;
  1118. self->eos = false;
  1119. self->last_ts = GST_CLOCK_TIME_NONE;
  1120. break;
  1121. case GST_STATE_CHANGE_PAUSED_TO_READY:
  1122. g_atomic_int_set(&self->status, GST_FLOW_WRONG_STATE);
  1123. dsp_unlock(self, TRUE);
  1124. break;
  1125. default:
  1126. break;
  1127. }
  1128. ret = parent_class->change_state(element, transition);
  1129. if (ret == GST_STATE_CHANGE_FAILURE)
  1130. return ret;
  1131. switch (transition) {
  1132. case GST_STATE_CHANGE_PAUSED_TO_READY:
  1133. /* ends buffer recycling */
  1134. g_mutex_lock(self->pool_mutex);
  1135. self->cycle++;
  1136. g_mutex_unlock(self->pool_mutex);
  1137. if (!_dsp_stop(self))
  1138. gstdsp_post_error(self, "dsp stop failed");
  1139. if (self->reset)
  1140. self->reset(self);
  1141. gst_caps_replace(&self->tmp_caps, NULL);
  1142. break;
  1143. case GST_STATE_CHANGE_READY_TO_NULL:
  1144. if (!dsp_deinit(self))
  1145. gstdsp_post_error(self, "dsp deinit failed");
  1146. break;
  1147. default:
  1148. break;
  1149. }
  1150. return ret;
  1151. }
  1152. static inline gboolean
  1153. init_node(GstDspBase *self,
  1154. GstBuffer *buf)
  1155. {
  1156. if (self->parse_func) {
  1157. if (self->codec_data && self->parse_func(self, self->codec_data))
  1158. goto ok;
  1159. if (self->parse_func(self, buf))
  1160. goto ok;
  1161. pr_err(self, "error while parsing");
  1162. }
  1163. ok:
  1164. #ifdef DEBUG
  1165. {
  1166. gchar *str = gst_caps_to_string(self->tmp_caps);
  1167. pr_info(self, "src caps: %s", str);
  1168. g_free(str);
  1169. }
  1170. #endif
  1171. if (!gst_pad_set_caps(self->srcpad, self->tmp_caps)) {
  1172. pr_err(self, "couldn't setup output caps");
  1173. return FALSE;
  1174. }
  1175. if (!self->output_buffer_size)
  1176. return FALSE;
  1177. self->node = self->create_node(self);
  1178. if (!self->node) {
  1179. pr_err(self, "dsp node creation failed");
  1180. return FALSE;
  1181. }
  1182. if (!gstdsp_start(self)) {
  1183. pr_err(self, "dsp start failed");
  1184. return FALSE;
  1185. }
  1186. return TRUE;
  1187. }
  1188. gboolean
  1189. gstdsp_send_codec_data(GstDspBase *self,
  1190. GstBuffer *buf)
  1191. {
  1192. struct td_buffer *tb;
  1193. /*
  1194. * codec-data must make it through as part of setcaps setup,
  1195. * otherwise node will miss (likely vital) config data,
  1196. * Since the port's async_queue might be disabled/flushing,
  1197. * we forcibly pop a buffer here.
  1198. */
  1199. tb = async_queue_pop_forced(self->ports[0]->queue);
  1200. /* there should always be one available, as we are just starting */
  1201. g_assert(tb);
  1202. dmm_buffer_allocate(tb->data, GST_BUFFER_SIZE(buf));
  1203. memcpy(tb->data->data, GST_BUFFER_DATA(buf), GST_BUFFER_SIZE(buf));
  1204. send_buffer(self, tb);
  1205. return TRUE;
  1206. }
  1207. static gboolean base_query(GstPad *pad, GstQuery *query)
  1208. {
  1209. GstDspBase *base = GST_DSP_BASE(GST_PAD_PARENT(pad));
  1210. gboolean res;
  1211. pr_debug(base, "handling %s query",
  1212. gst_query_type_get_name(GST_QUERY_TYPE(query)));
  1213. res = gst_pad_peer_query(base->sinkpad, query);
  1214. if (!res)
  1215. return FALSE;
  1216. switch (GST_QUERY_TYPE(query)) {
  1217. case GST_QUERY_LATENCY: {
  1218. gboolean live;
  1219. GstClockTime min, max;
  1220. GstClockTime frame_duration;
  1221. gst_query_parse_latency(query, &live, &min, &max);
  1222. pr_debug(base, "latency query live=%d, min=%" GST_TIME_FORMAT",max=%" GST_TIME_FORMAT,
  1223. live, GST_TIME_ARGS(min), GST_TIME_ARGS(max));
  1224. if (!base->codec) {
  1225. pr_debug(base, "no codec identified yet, bailing");
  1226. break;
  1227. }
  1228. if (base->default_duration) {
  1229. frame_duration = base->default_duration;
  1230. } else {
  1231. GstClockTime c, first, last;
  1232. unsigned i, count = 0;
  1233. /* find first and last timestamps */
  1234. g_mutex_lock(base->ts_mutex);
  1235. i = base->ts_out_pos;
  1236. first = last = base->ts_array[i].time;
  1237. while (i != base->ts_in_pos) {
  1238. c = base->ts_array[i].time;
  1239. if (c < first)
  1240. first = c;
  1241. if (c > last)
  1242. last = c;
  1243. i = (i + 1) % ARRAY_SIZE(base->ts_array);
  1244. count++;
  1245. }
  1246. g_mutex_unlock(base->ts_mutex);
  1247. if (count > 0)
  1248. frame_duration = (last - first) / count;
  1249. else
  1250. frame_duration = 0;
  1251. /* more than 1s per frame means something's wrong */
  1252. if (frame_duration > GST_SECOND)
  1253. frame_duration = GST_SECOND;
  1254. }
  1255. if (base->codec->get_latency) {
  1256. GstClockTime latency;
  1257. latency = base->codec->get_latency(base, frame_duration / 1000000) * 1000000;
  1258. /* really need to avoid doing stuff with _NONE */
  1259. if (GST_CLOCK_TIME_IS_VALID(min))
  1260. min += latency;
  1261. if (GST_CLOCK_TIME_IS_VALID(max))
  1262. max += latency;
  1263. }
  1264. pr_debug(base, "latency query after live=%d, min=%" GST_TIME_FORMAT",max=%" GST_TIME_FORMAT,
  1265. live, GST_TIME_ARGS(min), GST_TIME_ARGS(max));
  1266. gst_query_set_latency(query, live, min, max);
  1267. break;
  1268. }
  1269. default:
  1270. /* peer handles other queries */
  1271. break;
  1272. }
  1273. return TRUE;
  1274. }
  1275. gboolean
  1276. gstdsp_set_codec_data_caps(GstDspBase *base,
  1277. GstBuffer *buf)
  1278. {
  1279. GstCaps *caps = NULL;
  1280. GstStructure *structure;
  1281. GValue value = { .g_type = 0 };
  1282. caps = gst_pad_get_negotiated_caps(base->srcpad);
  1283. caps = gst_caps_make_writable(caps);
  1284. structure = gst_caps_get_structure(caps, 0);
  1285. g_value_init(&value, GST_TYPE_BUFFER);
  1286. gst_value_set_buffer(&value, buf);
  1287. gst_structure_set_value(structure, "codec_data", &value);
  1288. g_value_unset(&value);
  1289. return gst_pad_take_caps(base->srcpad, caps);
  1290. }
  1291. static GstFlowReturn
  1292. pad_chain(GstPad *pad,
  1293. GstBuffer *buf)
  1294. {
  1295. GstDspBase *self;
  1296. dmm_buffer_t *b;
  1297. GstFlowReturn ret = GST_FLOW_OK;
  1298. du_port_t *p;
  1299. struct td_buffer *tb;
  1300. self = GST_DSP_BASE(GST_OBJECT_PARENT(pad));
  1301. p = self->ports[0];
  1302. pr_debug(self, "begin");
  1303. if (G_UNLIKELY(GST_BUFFER_SIZE(buf) == 0)) {
  1304. /* 0 size buffers are used for fast negotiation in 0.10 */
  1305. pr_debug(self, "Got buffer of size 0, unref and return");
  1306. goto leave;
  1307. }
  1308. if (self->pre_process_buffer)
  1309. self->pre_process_buffer(self, buf);
  1310. if (G_UNLIKELY(!self->node)) {
  1311. if (!init_node(self, buf)) {
  1312. if (!check_dsp_preemption(self))
  1313. gstdsp_post_error(self, "couldn't start node");
  1314. ret = GST_FLOW_ERROR;
  1315. goto leave;
  1316. }
  1317. }
  1318. /*
  1319. * Check a few timestamps to see if we are dealing with PTS or DTS in order to
  1320. * activate the reordering logic or not.
  1321. */
  1322. if (self->ts_mode == TS_MODE_CHECK_IN) {
  1323. if (GST_CLOCK_TIME_IS_VALID(self->last_ts) &&
  1324. GST_BUFFER_TIMESTAMP_IS_VALID(buf) &&
  1325. self->last_ts > GST_BUFFER_TIMESTAMP(buf))
  1326. {
  1327. pr_debug(self, "in ts out-of-order -> sn ts");
  1328. self->last_ts = GST_CLOCK_TIME_NONE;
  1329. g_atomic_int_set(&self->ts_mode, TS_MODE_CHECK_OUT);
  1330. goto next;
  1331. }
  1332. self->last_ts = GST_BUFFER_TIMESTAMP(buf);
  1333. }
  1334. next:
  1335. tb = async_queue_pop(p->queue);
  1336. ret = g_atomic_int_get(&self->status);
  1337. if (ret != GST_FLOW_OK) {
  1338. pr_info(self, "status: %s", gst_flow_get_name(self->status));
  1339. if (tb)
  1340. async_queue_push(p->queue, tb);
  1341. goto leave;
  1342. }
  1343. b = tb->data;
  1344. if (GST_BUFFER_SIZE(buf) >= self->input_buffer_size)
  1345. map_buffer(self, buf, tb);
  1346. else {
  1347. dmm_buffer_allocate(b, self->input_buffer_size);
  1348. b->need_copy = true;
  1349. }
  1350. if (b->need_copy) {
  1351. pr_info(self, "copy");
  1352. memcpy(b->data, GST_BUFFER_DATA(buf), GST_BUFFER_SIZE(buf));
  1353. /* clear state for next time decision */
  1354. b->need_copy = false;
  1355. }
  1356. g_mutex_lock(self->ts_mutex);
  1357. self->ts_array[self->ts_in_pos].time = GST_BUFFER_TIMESTAMP(buf);
  1358. self->ts_array[self->ts_in_pos].duration = GST_BUFFER_DURATION(buf);
  1359. b->ts_index = self->ts_in_pos;
  1360. self->ts_in_pos = (self->ts_in_pos + 1) % ARRAY_SIZE(self->ts_array);
  1361. self->ts_count++;
  1362. g_mutex_unlock(self->ts_mutex);
  1363. ret = self->send_buffer(self, tb);
  1364. if (ret != GST_FLOW_OK) {
  1365. pr_info(self, "status: %s", gst_flow_get_name(self->status));
  1366. if (ret == GST_FLOW_ERROR)
  1367. gstdsp_post_error(self, "sending buffer failed");
  1368. }
  1369. leave:
  1370. gst_buffer_unref(buf);
  1371. pr_debug(self, "end");
  1372. return ret;
  1373. }
  1374. static gboolean
  1375. sink_event(GstDspBase *self,
  1376. GstEvent *event)
  1377. {
  1378. gboolean ret = TRUE;
  1379. pr_info(self, "event: %s", GST_EVENT_TYPE_NAME(event));
  1380. switch (GST_EVENT_TYPE(event)) {
  1381. case GST_EVENT_EOS: {
  1382. bool defer_eos = false;
  1383. g_mutex_lock(self->ts_mutex);
  1384. if (self->ts_count != 0)
  1385. defer_eos = true;
  1386. if (self->status != GST_FLOW_OK)
  1387. defer_eos = false;
  1388. g_atomic_int_set(&self->deferred_eos, defer_eos);
  1389. g_mutex_unlock(self->ts_mutex);
  1390. if (defer_eos) {
  1391. clock_gettime(CLOCK_MONOTONIC, &self->eos_start);
  1392. if (self->flush_buffer)
  1393. self->flush_buffer(self);
  1394. gst_event_unref(event);
  1395. } else {
  1396. ret = gst_pad_push_event(self->srcpad, event);
  1397. g_atomic_int_set(&self->eos, true);
  1398. }
  1399. break;
  1400. }
  1401. case GST_EVENT_FLUSH_START:
  1402. ret = gst_pad_push_event(self->srcpad, event);
  1403. g_atomic_int_set(&self->status, GST_FLOW_WRONG_STATE);
  1404. dsp_unlock(self, TRUE);
  1405. gst_pad_pause_task(self->srcpad);
  1406. break;
  1407. case GST_EVENT_FLUSH_STOP: {
  1408. GSList **events;
  1409. ret = gst_pad_push_event(self->srcpad, event);
  1410. g_atomic_int_set(&self->eos, false);
  1411. g_mutex_lock(self->ts_mutex);
  1412. /*
  1413. * Flush the current list of pending events, just in case
  1414. * somebody is doing something crazy.
  1415. */
  1416. events = &self->ts_array[self->ts_in_pos].events;
  1417. if (*events) {
  1418. g_slist_foreach(*events, (GFunc) gst_event_unref, NULL);
  1419. g_slist_free(*events);
  1420. *events = NULL;
  1421. }
  1422. self->ts_push_pos = self->ts_in_pos;
  1423. pr_debug(self, "flushing next %u buffer(s)",
  1424. self->ts_push_pos - self->ts_out_pos);
  1425. g_atomic_int_set(&self->deferred_eos, false);
  1426. g_mutex_unlock(self->ts_mutex);
  1427. g_atomic_int_set(&self->status, GST_FLOW_OK);
  1428. dsp_unlock(self, FALSE);
  1429. self->last_ts = GST_CLOCK_TIME_NONE;
  1430. self->next_ts = GST_CLOCK_TIME_NONE;
  1431. gst_pad_start_task(self->srcpad, output_loop, self->srcpad);
  1432. break;
  1433. }
  1434. default:
  1435. if (!GST_EVENT_IS_SERIALIZED(event)) {
  1436. ret = gst_pad_push_event(self->srcpad, event);
  1437. } else {
  1438. GSList **events;
  1439. g_mutex_lock(self->ts_mutex);
  1440. pr_debug(self, "storing event");
  1441. events = &self->ts_array[self->ts_in_pos].events;
  1442. *events = g_slist_append(*events, event);
  1443. g_mutex_unlock(self->ts_mutex);
  1444. }
  1445. break;
  1446. }
  1447. return ret;
  1448. }
  1449. static gboolean
  1450. src_event(GstDspBase *self,
  1451. GstEvent *event)
  1452. {
  1453. return gst_pad_push_event(self->sinkpad, event);
  1454. }
  1455. static gboolean
  1456. base_sink_event(GstPad *pad,
  1457. GstEvent *event)
  1458. {
  1459. GstDspBase *self;
  1460. GstDspBaseClass *class;
  1461. gboolean ret = TRUE;
  1462. self = GST_DSP_BASE(gst_pad_get_parent(pad));
  1463. class = GST_DSP_BASE_GET_CLASS(self);
  1464. if (class->sink_event)
  1465. ret = class->sink_event(self, event);
  1466. gst_object_unref(self);
  1467. return ret;
  1468. }
  1469. static gboolean
  1470. base_src_event(GstPad *pad,
  1471. GstEvent *event)
  1472. {
  1473. GstDspBase *self;
  1474. GstDspBaseClass *class;
  1475. gboolean ret = TRUE;
  1476. self = GST_DSP_BASE(gst_pad_get_parent(pad));
  1477. class = GST_DSP_BASE_GET_CLASS(self);
  1478. if (class->src_event)
  1479. ret = class->src_event(self, event);
  1480. gst_object_unref(self);
  1481. return ret;
  1482. }
  1483. static void
  1484. instance_init(GTypeInstance *instance,
  1485. gpointer g_class)
  1486. {
  1487. GstDspBase *self;
  1488. GstElementClass *element_class;
  1489. GstPadTemplate *template;
  1490. element_class = GST_ELEMENT_CLASS(g_class);
  1491. self = GST_DSP_BASE(instance);
  1492. self->ports[0] = du_port_new(0, DMA_TO_DEVICE);
  1493. self->ports[1] = du_port_new(1, DMA_FROM_DEVICE);
  1494. self->ts_mode = TS_MODE_PASS;
  1495. self->got_message = got_message;
  1496. self->send_buffer = send_buffer;
  1497. self->send_play_message = send_play_message;
  1498. self->send_stop_message = send_stop_message;
  1499. template = gst_element_class_get_pad_template(element_class, "sink");
  1500. self->sinkpad = gst_pad_new_from_template(template, "sink");
  1501. gst_pad_set_chain_function(self->sinkpad, pad_chain);
  1502. gst_pad_set_event_function(self->sinkpad, base_sink_event);
  1503. template = gst_element_class_get_pad_template(element_class, "src");
  1504. self->srcpad = gst_pad_new_from_template(template, "src");
  1505. gst_pad_use_fixed_caps(self->srcpad);
  1506. gst_pad_set_event_function(self->srcpad, base_src_event);
  1507. gst_pad_set_query_function(self->srcpad, base_query);
  1508. gst_element_add_pad(GST_ELEMENT(self), self->sinkpad);
  1509. gst_element_add_pad(GST_ELEMENT(self), self->srcpad);
  1510. self->ts_mutex = g_mutex_new();
  1511. self->pool_mutex = g_mutex_new();
  1512. self->ts_cond = g_cond_new();
  1513. self->flush = g_sem_new(0);
  1514. self->eos_timeout = 10000;
  1515. gst_segment_init(&self->segment, GST_FORMAT_UNDEFINED);
  1516. }
  1517. static void
  1518. finalize(GObject *obj)
  1519. {
  1520. GstDspBase *self;
  1521. self = GST_DSP_BASE(obj);
  1522. g_sem_free(self->flush);
  1523. g_mutex_free(self->ts_mutex);
  1524. g_mutex_free(self->pool_mutex);
  1525. g_cond_free(self->ts_cond);
  1526. du_port_free(self->ports[1]);
  1527. du_port_free(self->ports[0]);
  1528. G_OBJECT_CLASS(parent_class)->finalize(obj);
  1529. }
  1530. static void
  1531. class_init(gpointer g_class,
  1532. gpointer class_data)
  1533. {
  1534. GstElementClass *gstelement_class;
  1535. GObjectClass *gobject_class;
  1536. GstDspBaseClass *class;
  1537. parent_class = g_type_class_peek_parent(g_class);
  1538. gstelement_class = GST_ELEMENT_CLASS(g_class);
  1539. gobject_class = G_OBJECT_CLASS(g_class);
  1540. class = GST_DSP_BASE_CLASS(g_class);
  1541. gstelement_class->change_state = change_state;
  1542. gobject_class->finalize = finalize;
  1543. class->sink_event = sink_event;
  1544. class->src_event = src_event;
  1545. }
  1546. GType
  1547. gst_dsp_base_get_type(void)
  1548. {
  1549. static GType type;
  1550. if (G_UNLIKELY(type == 0)) {
  1551. GTypeInfo type_info = {
  1552. .class_size = sizeof(GstDspBaseClass),
  1553. .class_init = class_init,
  1554. .instance_size = sizeof(GstDspBase),
  1555. .instance_init = instance_init,
  1556. };
  1557. type = g_type_register_static(GST_TYPE_ELEMENT, "GstDspBase", &type_info, 0);
  1558. /* cache buffer type */
  1559. gst_dsp_buffer_get_type();
  1560. }
  1561. return type;
  1562. }