123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896 |
- /*
- * Copyright (C) 2009-2010 Felipe Contreras
- *
- * Author: Felipe Contreras <felipe.contreras@gmail.com>
- *
- * This file may be used under the terms of the GNU Lesser General Public
- * License version 2.1, a copy of which is found in LICENSE included in the
- * packaging of this file.
- */
- #include "gstdspbase.h"
- #include "gstdspvdec.h"
- #include "gstdspbuffer.h"
- #include "gstdspipp.h"
- #include "plugin.h"
- #include "dsp_bridge.h"
- #include <string.h> /* for memcpy */
- #include "util.h"
- #include "log.h"
- #define GST_CAT_DEFAULT gstdsp_debug
- /* gst-dsp errors */
- enum {
- GSTDSP_ERROR_NONE,
- GSTDSP_ERROR_DSP_MMUFAULT,
- GSTDSP_ERROR_DSP_SYSERROR,
- GSTDSP_ERROR_DSP_UNKNOWN,
- GSTDSP_ERROR_OTHER,
- GSTDSP_ERROR_BUSY,
- };
- static inline GstFlowReturn send_buffer(GstDspBase *self, struct td_buffer *tb);
- static inline void
- map_buffer(GstDspBase *self,
- GstBuffer *g_buf,
- struct td_buffer *tb);
- struct td_codec td_fake_codec = {
- .uuid = NULL,
- };
- static inline long
- get_elapsed_eos(GstDspBase *self)
- {
- struct timespec cur;
- long start, elapsed;
- if (!self->eos_start.tv_sec)
- return 0;
- clock_gettime(CLOCK_MONOTONIC, &cur);
- start = self->eos_start.tv_sec * 1000 + self->eos_start.tv_nsec / 1000000;
- elapsed = cur.tv_sec * 1000 + cur.tv_nsec / 1000000 - start;
- return elapsed;
- }
- static inline void
- check_elapsed(GstDspBase *self)
- {
- long elapsed = get_elapsed_eos(self);
- if (elapsed > 500)
- pr_warning(self, "eos took %lu ms", elapsed);
- }
- du_port_t *
- du_port_new(int id,
- int dir)
- {
- du_port_t *p;
- p = calloc(1, sizeof(*p));
- if (!p)
- return NULL;
- p->id = id;
- p->queue = async_queue_new();
- p->dir = dir;
- return p;
- }
- void
- du_port_free(du_port_t *p)
- {
- if (!p)
- return;
- free(p->buffers);
- async_queue_free(p->queue);
- free(p);
- }
- void
- du_port_alloc_buffers(du_port_t *p, guint num_buffers)
- {
- p->num_buffers = num_buffers;
- free(p->buffers);
- p->buffers = calloc(num_buffers, sizeof(*p->buffers));
- for (unsigned i = 0; i < p->num_buffers; i++)
- p->buffers[i].port = p;
- }
- static inline void
- du_port_flush(du_port_t *p)
- {
- guint i;
- struct td_buffer *tb = p->buffers;
- for (i = 0; i < p->num_buffers; i++, tb++) {
- dmm_buffer_t *b = tb->data;
- if (!b)
- continue;
- if (tb->user_data)
- gst_buffer_unref(tb->user_data);
- dmm_buffer_free(b);
- tb->data = NULL;
- }
- async_queue_flush(p->queue);
- }
- static inline void
- g_sem_down_status(GSem *sem,
- const GstFlowReturn *status)
- {
- GstFlowReturn ret = GST_FLOW_OK;
- g_mutex_lock(sem->mutex);
- while (sem->count == 0 &&
- (ret = g_atomic_int_get(status)) == GST_FLOW_OK)
- g_cond_wait(sem->condition, sem->mutex);
- if (ret == GST_FLOW_OK)
- sem->count--;
- g_mutex_unlock(sem->mutex);
- }
- static inline void
- g_sem_signal(GSem *sem)
- {
- g_mutex_lock(sem->mutex);
- g_cond_signal(sem->condition);
- g_mutex_unlock(sem->mutex);
- }
- static inline void
- g_sem_reset(GSem *sem,
- guint count)
- {
- g_mutex_lock(sem->mutex);
- sem->count = count;
- g_mutex_unlock(sem->mutex);
- }
- typedef struct {
- union {
- struct {
- uint32_t buffer_data;
- uint32_t buffer_size;
- uint32_t param_data;
- uint32_t param_size;
- uint32_t buffer_len;
- uint32_t silly_eos;
- uint32_t silly_buf_state;
- uint32_t silly_buf_active;
- uint32_t silly_buf_id;
- uint32_t nb_available_buf;
- uint32_t donot_flush_buf;
- uint32_t donot_invalidate_buf;
- uint32_t reserved;
- uint32_t msg_virt;
- uint32_t buffer_virt;
- uint32_t param_virt;
- uint32_t silly_out_buffer_index;
- uint32_t silly_in_buffer_index;
- uint32_t user_data;
- uint32_t stream_id;
- }v2;
- struct {
- uint32_t buffer_data;
- uint32_t buffer_size;
- uint32_t param_data;
- uint32_t param_size;
- uint32_t buffer_len;
- uint32_t silly_eos;
- uint32_t silly_buf_state;
- uint32_t silly_buf_active;
- uint32_t silly_buf_id;
- uint32_t reserved;
- uint32_t msg_virt;
- uint32_t buffer_virt;
- uint32_t param_virt;
- uint32_t silly_out_buffer_index;
- uint32_t silly_in_buffer_index;
- uint32_t user_data;
- uint32_t stream_id;
- }v0;
- }ver;
- } dsp_comm_t;
- #define DSP_COMM_VER(self,dsp_comm,value) \
- *((self->sn_api>=2)?&(dsp_comm->ver.v2.value):&(dsp_comm->ver.v0.value))
- static GstElementClass *parent_class;
- static inline void
- dsp_unlock(GstDspBase *self, gboolean unlock)
- {
- if (unlock) {
- async_queue_disable(self->ports[0]->queue);
- async_queue_disable(self->ports[1]->queue);
- if (GST_IS_DSP_IPP(self))
- async_queue_disable(((GstDspIpp *) self)->ipp_queue);
- } else {
- async_queue_enable(self->ports[0]->queue);
- async_queue_enable(self->ports[1]->queue);
- if (GST_IS_DSP_IPP(self))
- async_queue_enable(((GstDspIpp *) self)->ipp_queue);
- }
- }
- static inline void
- got_message(GstDspBase *self,
- struct dsp_msg *msg)
- {
- int32_t id;
- uint32_t command_id;
- id = msg->cmd & 0x000000ff;
- command_id = msg->cmd & 0xffffff00;
- switch (command_id) {
- case 0x0600: {
- dmm_buffer_t *b;
- du_port_t *p;
- dsp_comm_t *msg_data;
- dmm_buffer_t *param;
- unsigned i;
- struct td_buffer *tb = NULL;
- for (i = 0; i < ARRAY_SIZE(self->ports); i++)
- if (self->ports[i]->id == id) {
- p = self->ports[i];
- break;
- }
- if (i >= ARRAY_SIZE(self->ports))
- g_error("bad port index: %i", id);
- pr_debug(self, "got %s buffer", id == 0 ? "input" : "output");
- for (i = 0; i < p->num_buffers; i++) {
- if (msg->arg_1 == (uint32_t) p->buffers[i].comm->map) {
- tb = &p->buffers[i];
- break;
- }
- }
- if (!tb)
- g_error("buffer mismatch");
- dmm_buffer_end(tb->comm, tb->comm->size);
- msg_data = tb->comm->data;
- b = (void *) DSP_COMM_VER(self,msg_data,user_data);
- b->len = DSP_COMM_VER(self,msg_data,buffer_len);
- if (G_UNLIKELY(b->len > b->size))
- g_error("wrong buffer size");
- if (tb->pinned)
- dmm_buffer_end(b, b->len);
- else
- dmm_buffer_unmap(b);
- param = (void *) DSP_COMM_VER(self,msg_data,param_virt);
- if (param)
- dmm_buffer_end(param, param->size);
- /* clear time so sn might set its own */
- if (id != 0 && tb->user_data)
- GST_BUFFER_TIMESTAMP(tb->user_data) = GST_CLOCK_TIME_NONE;
- if (p->recv_cb)
- p->recv_cb(self, tb);
- if (id == 0) {
- if (tb->user_data) {
- gst_buffer_unref(tb->user_data);
- tb->user_data = NULL;
- }
- }
- async_queue_push(p->queue, tb);
- break;
- }
- case 0x0500:
- pr_debug(self, "got flush");
- break;
- case 0x0200:
- pr_debug(self, "got stop");
- g_sem_up(self->flush);
- break;
- case 0x0400:
- pr_debug(self, "got alg ctrl");
- dmm_buffer_free(self->alg_ctrl);
- self->alg_ctrl = NULL;
- break;
- case 0x0e00:
- if (msg->arg_1 == 1 && msg->arg_2 == 0x0500) {
- pr_debug(self, "playback completed");
- break;
- }
- if (msg->arg_1 == 1 && (msg->arg_2 & 0x0600) == 0x0600) {
- struct td_codec *codec = self->codec;
- if (codec->update_params)
- codec->update_params(self, self->node, msg->arg_2);
- break;
- }
- pr_warning(self, "DSP event: cmd=0x%04X, arg1=%u, arg2=0x%04X",
- msg->cmd, msg->arg_1, msg->arg_2);
- if ((msg->arg_2 & 0x0F00) == 0x0F00)
- gstdsp_got_error(self, 0, "algo error");
- break;
- default:
- pr_warning(self, "unhandled command: %u", command_id);
- }
- }
- static inline void
- setup_buffers(GstDspBase *self)
- {
- GstBuffer *buf = NULL;
- dmm_buffer_t *b;
- du_port_t *p;
- guint i;
- p = self->ports[0];
- for (i = 0; i < p->num_buffers; i++) {
- p->buffers[i].data = b = dmm_buffer_new(self->dsp_handle, self->proc, p->dir);
- async_queue_push(p->queue, &p->buffers[i]);
- }
- p = self->ports[1];
- for (i = 0; i < p->num_buffers; i++) {
- struct td_buffer *tb = &p->buffers[i];
- tb->data = b = dmm_buffer_new(self->dsp_handle, self->proc, p->dir);
- if (self->use_pad_alloc) {
- GstFlowReturn ret;
- ret = gst_pad_alloc_buffer_and_set_caps(self->srcpad,
- GST_BUFFER_OFFSET_NONE,
- self->output_buffer_size,
- GST_PAD_CAPS(self->srcpad),
- &buf);
- /* might fail if not (yet) linked */
- if (G_UNLIKELY(ret != GST_FLOW_OK)) {
- pr_err(self, "couldn't allocate buffer: %s", gst_flow_get_name(ret));
- dmm_buffer_allocate(b, self->output_buffer_size);
- b->need_copy = true;
- } else {
- map_buffer(self, buf, tb);
- gst_buffer_unref(buf);
- }
- }
- else {
- dmm_buffer_allocate(b, self->output_buffer_size);
- if (self->use_pinned) {
- dmm_buffer_map(b);
- tb->pinned = tb->clean = true;
- }
- }
- self->send_buffer(self, tb);
- }
- }
- static inline void
- pause_task(GstDspBase *self, GstFlowReturn status)
- {
- bool deferred_eos;
- /* synchronize to ensure we are not dropping the EOS event */
- g_mutex_lock(self->ts_mutex);
- (void) g_atomic_int_compare_and_exchange(&self->status, GST_FLOW_OK, status);
- deferred_eos = g_atomic_int_compare_and_exchange(&self->deferred_eos, true, false);
- g_mutex_unlock(self->ts_mutex);
- pr_info(self, "pausing task; reason %s", gst_flow_get_name(status));
- gst_pad_pause_task(self->srcpad);
- /* avoid waiting for buffers that will never come */
- dsp_unlock(self, TRUE);
- /* there's a pending deferred EOS, it's now or never */
- if (deferred_eos) {
- pr_info(self, "send elapsed eos");
- check_elapsed(self);
- self->eos_start.tv_sec = self->eos_start.tv_nsec = 0;
- gst_pad_push_event(self->srcpad, gst_event_new_eos());
- g_atomic_int_set(&self->eos, true);
- }
- }
- static inline GstFlowReturn
- check_status(GstDspBase *self)
- {
- GstFlowReturn ret;
- ret = g_atomic_int_get(&self->status);
- if (G_UNLIKELY(ret != GST_FLOW_OK))
- pause_task(self, ret);
- return ret;
- }
- /* determine timestamp/duration for @out_buf using input @timestamp and @duration */
- static void
- do_timestamp(GstDspBase *self, GstBuffer *out_buf, GstClockTime timestamp, GstClockTime duration)
- {
- /* timestamp checking and heuristics */
- switch (g_atomic_int_get(&self->ts_mode)) {
- case TS_MODE_CHECK_OUT:
- /* maybe SN provided a valid one, fall-back to in ts otherwise */
- if (GST_BUFFER_TIMESTAMP_IS_VALID(out_buf)) {
- timestamp = GST_BUFFER_TIMESTAMP(out_buf);
- duration = GST_BUFFER_DURATION(out_buf);
- pr_debug(self, "SN ts %" GST_TIME_FORMAT, GST_TIME_ARGS(timestamp));
- }
- if (GST_CLOCK_TIME_IS_VALID(self->last_ts) && GST_CLOCK_TIME_IS_VALID(timestamp) &&
- self->last_ts > timestamp) {
- pr_debug(self, "SN ts out-of-order -> interpolate");
- g_atomic_int_set(&self->ts_mode, TS_MODE_INTERPOLATE);
- self->next_ts = GST_CLOCK_TIME_NONE;
- }
- self->last_ts = timestamp;
- break;
- case TS_MODE_INTERPOLATE: {
- gboolean keyframe = !GST_BUFFER_FLAG_IS_SET(out_buf, GST_BUFFER_FLAG_DELTA_UNIT);
- pr_debug(self, "interpolate: keyframe %d, next_ts %" GST_TIME_FORMAT,
- keyframe, GST_TIME_ARGS(self->next_ts));
- if (G_LIKELY(!keyframe && GST_CLOCK_TIME_IS_VALID(self->next_ts))) {
- pr_debug(self, "not keyframe: using interpolated ts");
- timestamp = self->next_ts;
- }
- if (G_LIKELY(GST_CLOCK_TIME_IS_VALID(duration) && GST_CLOCK_TIME_IS_VALID(timestamp)))
- self->next_ts = timestamp + duration;
- break;
- }
- default:
- break;
- }
- GST_BUFFER_TIMESTAMP(out_buf) = timestamp;
- GST_BUFFER_DURATION(out_buf) = duration;
- }
- /* to be called with ts_lock */
- static void
- process_event(GstDspBase *self, GstEvent *event)
- {
- switch (GST_EVENT_TYPE(event)) {
- case GST_EVENT_NEWSEGMENT: {
- GstFormat format;
- gdouble rate, arate;
- gint64 start, stop, time;
- gboolean update;
- GstSegment segment;
- gst_segment_init(&segment, GST_FORMAT_UNDEFINED);
- gst_event_parse_new_segment_full(event, &update, &rate, &arate, &format,
- &start, &stop, &time);
- gst_segment_set_newsegment_full(&segment, update, rate, arate, format,
- start, stop, time);
- GST_DEBUG_OBJECT(self, "applying format %d newsegment %" GST_SEGMENT_FORMAT, format,
- &segment);
- /* avoid (unlikely) format complaints */
- if (format != self->segment.format)
- gst_segment_init(&self->segment, GST_FORMAT_UNDEFINED);
- gst_segment_set_newsegment_full(&self->segment, update, rate, arate,
- format, start, stop, time);
- self->last_ts = GST_CLOCK_TIME_NONE;
- self->next_ts = GST_CLOCK_TIME_NONE;
- break;
- }
- default:
- break;
- }
- }
- static void
- push_events(GstDspBase *self)
- {
- GSList **events;
- gboolean flush_buffer;
- g_mutex_lock(self->ts_mutex);
- events = &self->ts_array[self->ts_out_pos].events;
- flush_buffer = (self->ts_out_pos != self->ts_push_pos);
- while (*events) {
- GstEvent *event;
- event = (*events)->data;
- *events = g_slist_delete_link(*events, *events);
- if (G_LIKELY(!flush_buffer)) {
- process_event(self, event);
- pr_debug(self, "pushing event: %s", GST_EVENT_TYPE_NAME(event));
- gst_pad_push_event(self->srcpad, event);
- } else {
- pr_debug(self, "ignored flushed event: %s", GST_EVENT_TYPE_NAME(event));
- gst_event_unref(event);
- }
- }
- g_mutex_unlock(self->ts_mutex);
- }
- /* some typical familiar code ... */
- /* returns TRUE if buffer is within segment, else FALSE.
- * if Buffer is on segment border, it's timestamp and duration will be clipped */
- static gboolean
- clip_video_buffer(GstDspBase *self, GstBuffer *buf)
- {
- gboolean res = TRUE;
- gint64 cstart, cstop;
- GstClockTime stop, in_ts, in_dur;
- in_ts = GST_BUFFER_TIMESTAMP(buf);
- in_dur = GST_BUFFER_DURATION(buf);
- GST_LOG_OBJECT(self,
- "timestamp:%" GST_TIME_FORMAT " , duration:%" GST_TIME_FORMAT,
- GST_TIME_ARGS(in_ts), GST_TIME_ARGS(in_dur));
- /* can't clip without TIME segment */
- if (G_UNLIKELY(self->segment.format != GST_FORMAT_TIME))
- goto exit;
- /* we need a start time */
- if (G_UNLIKELY(!GST_CLOCK_TIME_IS_VALID(in_ts)))
- goto exit;
- /* generate valid stop, if duration unknown, we have unknown stop */
- stop = GST_CLOCK_TIME_IS_VALID(in_dur) ?
- (in_ts + in_dur) : GST_CLOCK_TIME_NONE;
- /* now clip */
- res = gst_segment_clip(&self->segment, GST_FORMAT_TIME, in_ts, stop,
- &cstart, &cstop);
- if (G_UNLIKELY(!res))
- goto exit;
- /* we're pretty sure the duration of this buffer is not till the end of this
- * segment (which _clip will assume when the stop is -1) */
- if (stop == GST_CLOCK_TIME_NONE)
- cstop = GST_CLOCK_TIME_NONE;
- /* update timestamp and possibly duration if the clipped stop time is
- * valid */
- GST_BUFFER_TIMESTAMP(buf) = cstart;
- if (GST_CLOCK_TIME_IS_VALID(cstop))
- GST_BUFFER_DURATION(buf) = cstop - cstart;
- GST_LOG_OBJECT(self,
- "clipped timestamp:%" GST_TIME_FORMAT " , duration:%" GST_TIME_FORMAT,
- GST_TIME_ARGS(cstart), GST_TIME_ARGS(GST_BUFFER_DURATION(buf)));
- exit:
- GST_LOG_OBJECT(self, "%sdropping", (res ? "not " : ""));
- return res;
- }
- static void
- output_loop(gpointer data)
- {
- GstPad *pad;
- GstDspBase *self;
- GstFlowReturn ret = GST_FLOW_OK;
- GstBuffer *out_buf = NULL;
- dmm_buffer_t *b;
- gboolean flush_buffer;
- gboolean got_eos = FALSE;
- gboolean keyframe = FALSE;
- du_port_t *p;
- struct td_buffer *tb;
- bool handled;
- GstClockTime timestamp, duration;
- pad = data;
- self = GST_DSP_BASE(GST_OBJECT_PARENT(pad));
- p = self->ports[1];
- pr_debug(self, "begin");
- tb = async_queue_pop(p->queue);
- /*
- * queue might have been disabled above, so perhaps tb == NULL,
- * but then right here in between self->status may have been set to
- * OK by e.g. FLUSH_STOP
- */
- if (G_UNLIKELY(!tb)) {
- pr_info(self, "no buffer");
- ret = check_status(self);
- goto nok;
- }
- b = tb->data;
- ret = check_status(self);
- if (G_UNLIKELY(ret != GST_FLOW_OK)) {
- async_queue_push(p->queue, tb);
- goto end;
- }
- if (G_UNLIKELY(self->skip_hack_2 > 0)) {
- self->skip_hack_2--;
- goto leave;
- }
- /* check for too many buffers returned */
- g_mutex_lock(self->ts_mutex);
- if (G_UNLIKELY(b->len && !self->ts_count)) {
- pr_warning(self, "no timestamp; unexpected buffer");
- g_mutex_unlock(self->ts_mutex);
- goto leave;
- }
- g_mutex_unlock(self->ts_mutex);
- /* first clear pending events */
- push_events(self);
- /* a pending reallocation from the previous run */
- if (G_UNLIKELY(!b->data)) {
- dmm_buffer_allocate(b, self->output_buffer_size);
- send_buffer(self, tb);
- goto end;
- }
- if (G_UNLIKELY(!b->len)) {
- /* no need to process this buffer */
- if (G_UNLIKELY(b->skip)) {
- b->skip = FALSE;
- g_mutex_lock(self->ts_mutex);
- flush_buffer = (self->ts_out_pos != self->ts_push_pos);
- self->ts_out_pos = (self->ts_out_pos + 1) % ARRAY_SIZE(self->ts_array);
- self->ts_count--;
- if (G_LIKELY(!flush_buffer))
- self->ts_push_pos = self->ts_out_pos;
- if (G_UNLIKELY(g_atomic_int_get(&self->deferred_eos)) && self->ts_count == 0)
- got_eos = TRUE;
- g_mutex_unlock(self->ts_mutex);
- }
- /* no real frame data, so no need to consume a real frame's ts */
- goto leave;
- }
- g_mutex_lock(self->ts_mutex);
- flush_buffer = (self->ts_out_pos != self->ts_push_pos);
- g_mutex_unlock(self->ts_mutex);
- if (G_UNLIKELY(flush_buffer)) {
- g_mutex_lock(self->ts_mutex);
- pr_debug(self, "ignored flushed output buffer for %" GST_TIME_FORMAT,
- GST_TIME_ARGS((self->ts_array[self->ts_out_pos].time)));
- self->ts_count--;
- self->ts_out_pos = (self->ts_out_pos + 1) % ARRAY_SIZE(self->ts_array);
- if (G_UNLIKELY(g_atomic_int_get(&self->deferred_eos)) && self->ts_count == 0)
- got_eos = TRUE;
- g_mutex_unlock(self->ts_mutex);
- goto leave;
- }
- /* now go after the data, but let's first see if it is keyframe */
- keyframe = tb->keyframe;
- if (self->use_pad_alloc) {
- GstBuffer *new_buf;
- ret = gst_pad_alloc_buffer_and_set_caps(self->srcpad,
- GST_BUFFER_OFFSET_NONE,
- self->output_buffer_size,
- GST_PAD_CAPS(self->srcpad),
- &new_buf);
- if (G_UNLIKELY(ret != GST_FLOW_OK)) {
- pr_info(self, "couldn't allocate buffer: %s", gst_flow_get_name(ret));
- async_queue_push(p->queue, tb);
- goto nok;
- }
- if (tb->user_data) {
- out_buf = tb->user_data;
- tb->user_data = NULL;
- }
- else
- out_buf = new_buf;
- if (b->need_copy) {
- pr_info(self, "copy");
- memcpy(GST_BUFFER_DATA(out_buf), b->data, b->len);
- }
- GST_BUFFER_SIZE(out_buf) = b->len;
- if (out_buf != new_buf) {
- map_buffer(self, new_buf, tb);
- gst_buffer_unref(new_buf);
- }
- }
- else {
- /* this should only happen in overwrite ipp case */
- if (G_UNLIKELY(tb->user_data && !tb->pinned)) {
- /* let's make sure */
- g_assert(GST_IS_DSP_IPP(self));
- g_assert(self->use_pinned);
- out_buf = tb->user_data;
- tb->user_data = NULL;
- pr_debug(self, "re-using output buffer %p", out_buf);
- } else
- out_buf = gst_dsp_buffer_new(self, tb);
- if (!self->use_pinned)
- /* invalidate data to force reallocation */
- b->data = b->allocated_data = NULL;
- }
- if (G_UNLIKELY(self->skip_hack > 0)) {
- self->skip_hack--;
- gst_buffer_unref(out_buf);
- goto leave;
- }
- if (!keyframe)
- GST_BUFFER_FLAGS(out_buf) |= GST_BUFFER_FLAG_DELTA_UNIT;
- g_mutex_lock(self->ts_mutex);
- timestamp = self->ts_array[self->ts_out_pos].time;
- duration = self->ts_array[self->ts_out_pos].duration;
- pr_debug(self, "in ts %" GST_TIME_FORMAT, GST_TIME_ARGS(timestamp));
- self->ts_out_pos = (self->ts_out_pos + 1) % ARRAY_SIZE(self->ts_array);
- self->ts_push_pos = self->ts_out_pos;
- self->ts_count--;
- g_cond_signal(self->ts_cond);
- if (G_UNLIKELY(g_atomic_int_get(&self->deferred_eos)) && self->ts_count == 0)
- got_eos = TRUE;
- #ifdef TS_COUNT
- if (self->ts_count > 2 || self->ts_count < 1)
- pr_info(self, "tsc=%lu", self->ts_count);
- #endif
- g_mutex_unlock(self->ts_mutex);
- if (!GST_CLOCK_TIME_IS_VALID(duration) && self->default_duration) {
- duration = self->default_duration;
- pr_debug(self, "using default duration %" GST_TIME_FORMAT, GST_TIME_ARGS(duration));
- }
- else if (GST_CLOCK_TIME_IS_VALID(duration) && !self->default_duration)
- self->default_duration = duration;
- do_timestamp(self, out_buf, timestamp, duration);
- /* segment clipping */
- if (GST_IS_DSP_VDEC(self)) {
- if (G_UNLIKELY(!clip_video_buffer(self, out_buf))) {
- gst_buffer_unref(out_buf);
- goto leave;
- }
- }
- pr_debug(self, "pushing buffer %" GST_TIME_FORMAT,
- GST_TIME_ARGS(GST_BUFFER_TIMESTAMP(out_buf)));
- ret = gst_pad_push(self->srcpad, out_buf);
- if (G_UNLIKELY(ret != GST_FLOW_OK)) {
- pr_info(self, "pad push failed: %s", gst_flow_get_name(ret));
- goto leave;
- }
- leave:
- handled = tb->pinned && out_buf;
- if (G_UNLIKELY(got_eos)) {
- pr_info(self, "got eos");
- check_elapsed(self);
- self->eos_start.tv_sec = self->eos_start.tv_nsec = 0;
- gst_pad_push_event(self->srcpad, gst_event_new_eos());
- g_atomic_int_set(&self->eos, true);
- g_atomic_int_set(&self->deferred_eos, false);
- ret = GST_FLOW_UNEXPECTED;
- if (self->use_pinned) {
- if (!handled)
- self->send_buffer(self, tb);
- goto nok;
- }
- /*
- * We don't want to allocate data unnecessarily; postpone after
- * EOS and flush.
- */
- if (b->data)
- send_buffer(self, tb);
- else
- /* we'll need to allocate on the next run */
- async_queue_push(p->queue, tb);
- }
- else {
- if (self->use_pinned) {
- if (!handled)
- self->send_buffer(self, tb);
- goto nok;
- }
- if (!b->data)
- dmm_buffer_allocate(b, self->output_buffer_size);
- self->send_buffer(self, tb);
- }
- nok:
- if (G_UNLIKELY(ret != GST_FLOW_OK))
- pause_task(self, ret);
- end:
- pr_debug(self, "end");
- }
- void
- gstdsp_base_flush_buffer(GstDspBase *self)
- {
- struct td_buffer *tb;
- tb = async_queue_pop(self->ports[0]->queue);
- if (!tb)
- return;
- dmm_buffer_allocate(tb->data, 1);
- send_buffer(self, tb);
- }
- void
- gstdsp_post_error(GstDspBase *self,
- const char *message)
- {
- GError *gerror;
- GstMessage *gst_msg;
- if (self->dsp_error == GSTDSP_ERROR_BUSY)
- gerror = g_error_new_literal(GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_BUSY, message);
- else
- gerror = g_error_new_literal(GST_STREAM_ERROR, GST_STREAM_ERROR_FAILED, message);
- gst_msg = gst_message_new_error(GST_OBJECT(self), gerror, NULL);
- gst_element_post_message(GST_ELEMENT(self), gst_msg);
- g_error_free(gerror);
- }
- void
- gstdsp_got_error(GstDspBase *self,
- guint id,
- const char *message)
- {
- pr_err(self, "%s", message);
- self->dsp_error = id;
- gstdsp_post_error(self, message);
- g_atomic_int_set(&self->status, GST_FLOW_ERROR);
- dsp_unlock(self, TRUE);
- }
- static gpointer
- dsp_thread(gpointer data)
- {
- GstDspBase *self = data;
- pr_info(self, "begin");
- while (!self->done) {
- unsigned int index = 0;
- pr_debug(self, "waiting for events");
- if (!dsp_wait_for_events(self->dsp_handle, self->events, 3, &index, 10000)) {
- int dsp_error = GSTDSP_ERROR_OTHER;
- if (errno == ETIME) {
- long elapsed = get_elapsed_eos(self);
- pr_info(self, "timed out waiting for events");
- if (self->eos_timeout && elapsed >= self->eos_timeout) {
- pr_err(self, "eos timed out after %lu ms", elapsed);
- /* wind out of output loop */
- g_atomic_int_set(&self->status, GST_FLOW_UNEXPECTED);
- async_queue_disable(self->ports[1]->queue);
- }
- continue;
- } else if (errno == EBUSY) {
- pr_info(self, "preempted");
- dsp_error = GSTDSP_ERROR_BUSY;
- self->busy = true;
- }
- pr_err(self, "failed waiting for events: %i", errno);
- gstdsp_got_error(self, dsp_error, "unable to get event");
- break;
- }
- if (index == 0) {
- struct dsp_msg msg;
- while (true) {
- if (!dsp_node_get_message(self->dsp_handle, self->node, &msg, 10))
- break;
- pr_debug(self, "got dsp message: 0x%0x 0x%0x 0x%0x",
- msg.cmd, msg.arg_1, msg.arg_2);
- self->got_message(self, &msg);
- }
- }
- else if (index == 1) {
- gstdsp_got_error(self, GSTDSP_ERROR_DSP_MMUFAULT, "got DSP MMUFAULT");
- break;
- }
- else if (index == 2) {
- gstdsp_got_error(self, GSTDSP_ERROR_DSP_SYSERROR, "got DSP SYSERROR");
- break;
- }
- else {
- gstdsp_got_error(self, GSTDSP_ERROR_DSP_UNKNOWN, "wrong event index");
- break;
- }
- }
- pr_info(self, "end");
- return NULL;
- }
- static inline bool
- destroy_node(GstDspBase *self)
- {
- if (self->node) {
- if (!dsp_node_free(self->dsp_handle, self->node)) {
- pr_err(self, "dsp node free failed");
- return false;
- }
- pr_info(self, "dsp node deleted");
- }
- return true;
- }
- static gboolean
- dsp_init(GstDspBase *self)
- {
- int dsp_handle;
- self->dsp_handle = dsp_handle = dsp_open();
- if (dsp_handle < 0) {
- pr_err(self, "dsp open failed");
- return FALSE;
- }
- if (!dsp_attach(dsp_handle, 0, NULL, &self->proc)) {
- pr_err(self, "dsp attach failed");
- goto fail;
- }
- return TRUE;
- fail:
- self->proc = NULL;
- if (self->dsp_handle >= 0) {
- if (dsp_close(dsp_handle) < 0)
- pr_err(self, "dsp close failed");
- self->dsp_handle = -1;
- }
- return FALSE;
- }
- static gboolean
- dsp_deinit(GstDspBase *self)
- {
- gboolean ret = TRUE;
- if (self->dsp_error)
- goto leave;
- leave:
- self->proc = NULL;
- if (self->dsp_handle >= 0) {
- if (dsp_close(self->dsp_handle) < 0) {
- pr_err(self, "dsp close failed");
- ret = FALSE;
- }
- self->dsp_handle = -1;
- }
- return ret;
- }
- static bool
- send_play_message(GstDspBase *self)
- {
- return dsp_send_message(self->dsp_handle, self->node, 0x0100, 0, 0);
- };
- gboolean
- gstdsp_start(GstDspBase *self)
- {
- struct td_codec *codec = self->codec;
- bool ret = true;
- guint i;
- for (i = 0; i < ARRAY_SIZE(self->ports); i++) {
- du_port_t *p = self->ports[i];
- guint j;
- for (j = 0; j < p->num_buffers; j++) {
- struct td_buffer *tb = &p->buffers[j];
- tb->comm = dmm_buffer_new(self->dsp_handle, self->proc, DMA_BIDIRECTIONAL);
- dmm_buffer_allocate(tb->comm, sizeof(*tb->comm));
- dmm_buffer_map(tb->comm);
- }
- }
- if (!dsp_node_run(self->dsp_handle, self->node)) {
- pr_err(self, "dsp node run failed");
- return false;
- }
- pr_info(self, "dsp node running");
- self->events[0] = calloc(1, sizeof(struct dsp_notification));
- if (!dsp_node_register_notify(self->dsp_handle, self->node,
- DSP_NODEMESSAGEREADY, 1,
- self->events[0]))
- {
- pr_err(self, "failed to register for notifications");
- return false;
- }
- self->events[1] = calloc(1, sizeof(struct dsp_notification));
- if (!dsp_register_notify(self->dsp_handle, self->proc,
- DSP_MMUFAULT, 1,
- self->events[1]))
- {
- pr_err(self, "failed to register for DSP_MMUFAULT");
- return false;
- }
- self->events[2] = calloc(1, sizeof(struct dsp_notification));
- if (!dsp_register_notify(self->dsp_handle, self->proc,
- DSP_SYSERROR, 1,
- self->events[2]))
- {
- pr_err(self, "failed to register for DSP_SYSERROR");
- return false;
- }
- pr_info(self, "creating dsp thread");
- self->dsp_thread = g_thread_create(dsp_thread, self, TRUE, NULL);
- gst_pad_start_task(self->srcpad, output_loop, self->srcpad);
- if(!self->send_play_message(self))
- {
- pr_err(self, "failed to send play message");
- return false;
- }
- setup_buffers(self);
- if (self->codec_data) {
- GstBuffer *buf = self->codec_data;
- self->codec_data = NULL;
- if (codec->handle_extra_data)
- ret = codec->handle_extra_data(self, buf);
- else
- ret = gstdsp_send_codec_data(self, buf);
- gst_buffer_unref(buf);
- }
- return ret;
- }
- static bool
- send_stop_message(GstDspBase *self)
- {
- if (dsp_send_message(self->dsp_handle, self->node, 0x0200, 0, 0))
- if (!self->busy && !g_sem_down_timed(self->flush, 2))
- pr_warning(self, "timed out waiting for DSP STOP");
- /** @todo find a way to stop wait_for_events */
- return true;
- };
- static gboolean
- _dsp_stop(GstDspBase *self)
- {
- unsigned long exit_status;
- unsigned i;
- if (!self->node)
- return TRUE;
- if (!self->dsp_error) {
- self->send_stop_message(self);
- self->done = TRUE;
- }
- g_thread_join(self->dsp_thread);
- gst_pad_stop_task(self->srcpad);
- for (i = 0; i < ARRAY_SIZE(self->ports); i++)
- du_port_flush(self->ports[i]);
- for (i = 0; i < ARRAY_SIZE(self->ports); i++) {
- guint j;
- du_port_t *port = self->ports[i];
- for (j = 0; j < port->num_buffers; j++) {
- dmm_buffer_free(port->buffers[j].params);
- port->buffers[j].params = NULL;
- }
- }
- for (i = 0; i < ARRAY_SIZE(self->ts_array); i++) {
- GSList **events = &self->ts_array[i].events;
- if (*events) {
- g_slist_foreach(*events, (GFunc) gst_event_unref, NULL);
- g_slist_free(*events);
- *events = NULL;
- }
- }
- self->ts_in_pos = self->ts_out_pos = self->ts_push_pos = 0;
- self->ts_count = 0;
- self->skip_hack = 0;
- self->skip_hack_2 = 0;
- for (i = 0; i < ARRAY_SIZE(self->events); i++) {
- free(self->events[i]);
- self->events[i] = NULL;
- }
- if (self->alg_ctrl) {
- dmm_buffer_free(self->alg_ctrl);
- self->alg_ctrl = NULL;
- }
- if (self->dsp_error)
- goto leave;
- if (!dsp_node_terminate(self->dsp_handle, self->node, &exit_status))
- pr_err(self, "dsp node terminate failed: 0x%lx", exit_status);
- leave:
- if (!destroy_node(self))
- pr_err(self, "dsp node destroy failed");
- self->node = NULL;
- for (i = 0; i < ARRAY_SIZE(self->ports); i++) {
- du_port_t *p = self->ports[i];
- guint j;
- for (j = 0; j < p->num_buffers; j++) {
- dmm_buffer_free(p->buffers[j].comm);
- p->buffers[j].comm = NULL;
- }
- du_port_alloc_buffers(p, 0);
- }
- pr_info(self, "dsp node terminated");
- return TRUE;
- }
- gboolean gstdsp_need_node_reset(GstDspBase *base, GstCaps *new_caps, gint w, gint h)
- {
- gint width, height;
- GstStructure *struc;
- if (G_UNLIKELY(!base->node))
- return FALSE;
- struc = gst_caps_get_structure(new_caps, 0);
- gst_structure_get_int(struc, "width", &width);
- gst_structure_get_int(struc, "height", &height);
- if (w == width && h == height)
- return FALSE;
- return TRUE;
- }
- gboolean gstdsp_reinit(GstDspBase *self)
- {
- /* deinit */
- g_atomic_int_set(&self->status, GST_FLOW_WRONG_STATE);
- dsp_unlock(self, TRUE);
- /* ends buffer recycling */
- g_mutex_lock(self->pool_mutex);
- self->cycle++;
- g_mutex_unlock(self->pool_mutex);
- /* push optional remaining events */
- gst_pad_pause_task(self->srcpad);
- push_events(self);
- if (!_dsp_stop(self))
- gstdsp_post_error(self, "dsp stop failed");
- if (self->reset)
- self->reset(self);
- gst_caps_replace(&self->tmp_caps, NULL);
- /* init */
- g_atomic_int_set(&self->status, GST_FLOW_OK);
- self->done = FALSE;
- dsp_unlock(self, FALSE);
- return true;
- }
- static inline void
- map_buffer(GstDspBase *self,
- GstBuffer *g_buf,
- struct td_buffer *tb)
- {
- if (gstdsp_map_buffer(self, g_buf, tb->data))
- tb->user_data = g_buf;
- }
- static inline GstFlowReturn send_buffer(GstDspBase *self, struct td_buffer *tb)
- {
- dsp_comm_t *msg_data;
- du_port_t *port = tb->port;
- int index = port->id;
- dmm_buffer_t *buffer = tb->data;
- pr_debug(self, "sending %s buffer", index == 0 ? "input" : "output");
- msg_data = tb->comm->data;
- if (port->send_cb)
- port->send_cb(self, tb);
- if (tb->params)
- dmm_buffer_begin(tb->params, tb->params->size);
- if (tb->pinned) {
- if (G_LIKELY(!tb->clean))
- dmm_buffer_begin(buffer, buffer->len);
- else
- tb->clean = false;
- } else {
- dmm_buffer_map(buffer);
- }
- memset(msg_data, 0, sizeof(*msg_data));
- DSP_COMM_VER(self,msg_data,buffer_data) = (uint32_t) buffer->map;
- DSP_COMM_VER(self,msg_data,buffer_size) = buffer->size;
- DSP_COMM_VER(self,msg_data,stream_id) = port->id;
- DSP_COMM_VER(self,msg_data,buffer_len) = index == 0 ? buffer->len : 0;
- DSP_COMM_VER(self,msg_data,user_data) = (uint32_t) buffer;
- if (tb->params) {
- DSP_COMM_VER(self,msg_data,param_data) = (uint32_t) tb->params->map;
- DSP_COMM_VER(self,msg_data,param_size) = tb->params->len;
- DSP_COMM_VER(self,msg_data,param_virt) = (uint32_t) tb->params;
- }
- dmm_buffer_begin(tb->comm, sizeof(*msg_data));
- dsp_send_message(self->dsp_handle, self->node,
- 0x0600 | port->id, (uint32_t) tb->comm->map, 0);
- return GST_FLOW_OK;
- }
- void
- gstdsp_send_alg_ctrl(GstDspBase *self,
- struct dsp_node *node,
- dmm_buffer_t *b)
- {
- self->alg_ctrl = b;
- dmm_buffer_map(b);
- dsp_send_message(self->dsp_handle, node,
- 0x0400, 3, (uint32_t) b->map);
- }
- static inline bool check_dsp_preemption(GstDspBase *self)
- {
- if (errno == EBUSY) {
- pr_info(self, "preempted");
- self->busy = true;
- gstdsp_got_error(self, GSTDSP_ERROR_BUSY, "dsp init failed");
- return true;
- }
- return false;
- }
- static GstStateChangeReturn
- change_state(GstElement *element,
- GstStateChange transition)
- {
- GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
- GstDspBase *self;
- self = GST_DSP_BASE(element);
- pr_info(self, "%s -> %s",
- gst_element_state_get_name(GST_STATE_TRANSITION_CURRENT(transition)),
- gst_element_state_get_name(GST_STATE_TRANSITION_NEXT(transition)));
- switch (transition) {
- case GST_STATE_CHANGE_NULL_TO_READY:
- if (!dsp_init(self)) {
- if (!check_dsp_preemption(self))
- gstdsp_post_error(self, "dsp init failed");
- }
- break;
- case GST_STATE_CHANGE_READY_TO_PAUSED:
- self->status = GST_FLOW_OK;
- self->done = FALSE;
- dsp_unlock(self, FALSE);
- self->deferred_eos = false;
- self->eos = false;
- self->last_ts = GST_CLOCK_TIME_NONE;
- break;
- case GST_STATE_CHANGE_PAUSED_TO_READY:
- g_atomic_int_set(&self->status, GST_FLOW_WRONG_STATE);
- dsp_unlock(self, TRUE);
- break;
- default:
- break;
- }
- ret = parent_class->change_state(element, transition);
- if (ret == GST_STATE_CHANGE_FAILURE)
- return ret;
- switch (transition) {
- case GST_STATE_CHANGE_PAUSED_TO_READY:
- /* ends buffer recycling */
- g_mutex_lock(self->pool_mutex);
- self->cycle++;
- g_mutex_unlock(self->pool_mutex);
- if (!_dsp_stop(self))
- gstdsp_post_error(self, "dsp stop failed");
- if (self->reset)
- self->reset(self);
- gst_caps_replace(&self->tmp_caps, NULL);
- break;
- case GST_STATE_CHANGE_READY_TO_NULL:
- if (!dsp_deinit(self))
- gstdsp_post_error(self, "dsp deinit failed");
- break;
- default:
- break;
- }
- return ret;
- }
- static inline gboolean
- init_node(GstDspBase *self,
- GstBuffer *buf)
- {
- if (self->parse_func) {
- if (self->codec_data && self->parse_func(self, self->codec_data))
- goto ok;
- if (self->parse_func(self, buf))
- goto ok;
- pr_err(self, "error while parsing");
- }
- ok:
- #ifdef DEBUG
- {
- gchar *str = gst_caps_to_string(self->tmp_caps);
- pr_info(self, "src caps: %s", str);
- g_free(str);
- }
- #endif
- if (!gst_pad_set_caps(self->srcpad, self->tmp_caps)) {
- pr_err(self, "couldn't setup output caps");
- return FALSE;
- }
- if (!self->output_buffer_size)
- return FALSE;
- self->node = self->create_node(self);
- if (!self->node) {
- pr_err(self, "dsp node creation failed");
- return FALSE;
- }
- if (!gstdsp_start(self)) {
- pr_err(self, "dsp start failed");
- return FALSE;
- }
- return TRUE;
- }
- gboolean
- gstdsp_send_codec_data(GstDspBase *self,
- GstBuffer *buf)
- {
- struct td_buffer *tb;
- /*
- * codec-data must make it through as part of setcaps setup,
- * otherwise node will miss (likely vital) config data,
- * Since the port's async_queue might be disabled/flushing,
- * we forcibly pop a buffer here.
- */
- tb = async_queue_pop_forced(self->ports[0]->queue);
- /* there should always be one available, as we are just starting */
- g_assert(tb);
- dmm_buffer_allocate(tb->data, GST_BUFFER_SIZE(buf));
- memcpy(tb->data->data, GST_BUFFER_DATA(buf), GST_BUFFER_SIZE(buf));
- send_buffer(self, tb);
- return TRUE;
- }
- static gboolean base_query(GstPad *pad, GstQuery *query)
- {
- GstDspBase *base = GST_DSP_BASE(GST_PAD_PARENT(pad));
- gboolean res;
- pr_debug(base, "handling %s query",
- gst_query_type_get_name(GST_QUERY_TYPE(query)));
- res = gst_pad_peer_query(base->sinkpad, query);
- if (!res)
- return FALSE;
- switch (GST_QUERY_TYPE(query)) {
- case GST_QUERY_LATENCY: {
- gboolean live;
- GstClockTime min, max;
- GstClockTime frame_duration;
- gst_query_parse_latency(query, &live, &min, &max);
- pr_debug(base, "latency query live=%d, min=%" GST_TIME_FORMAT",max=%" GST_TIME_FORMAT,
- live, GST_TIME_ARGS(min), GST_TIME_ARGS(max));
- if (!base->codec) {
- pr_debug(base, "no codec identified yet, bailing");
- break;
- }
- if (base->default_duration) {
- frame_duration = base->default_duration;
- } else {
- GstClockTime c, first, last;
- unsigned i, count = 0;
- /* find first and last timestamps */
- g_mutex_lock(base->ts_mutex);
- i = base->ts_out_pos;
- first = last = base->ts_array[i].time;
- while (i != base->ts_in_pos) {
- c = base->ts_array[i].time;
- if (c < first)
- first = c;
- if (c > last)
- last = c;
- i = (i + 1) % ARRAY_SIZE(base->ts_array);
- count++;
- }
- g_mutex_unlock(base->ts_mutex);
- if (count > 0)
- frame_duration = (last - first) / count;
- else
- frame_duration = 0;
- /* more than 1s per frame means something's wrong */
- if (frame_duration > GST_SECOND)
- frame_duration = GST_SECOND;
- }
- if (base->codec->get_latency) {
- GstClockTime latency;
- latency = base->codec->get_latency(base, frame_duration / 1000000) * 1000000;
- /* really need to avoid doing stuff with _NONE */
- if (GST_CLOCK_TIME_IS_VALID(min))
- min += latency;
- if (GST_CLOCK_TIME_IS_VALID(max))
- max += latency;
- }
- pr_debug(base, "latency query after live=%d, min=%" GST_TIME_FORMAT",max=%" GST_TIME_FORMAT,
- live, GST_TIME_ARGS(min), GST_TIME_ARGS(max));
- gst_query_set_latency(query, live, min, max);
- break;
- }
- default:
- /* peer handles other queries */
- break;
- }
- return TRUE;
- }
- gboolean
- gstdsp_set_codec_data_caps(GstDspBase *base,
- GstBuffer *buf)
- {
- GstCaps *caps = NULL;
- GstStructure *structure;
- GValue value = { .g_type = 0 };
- caps = gst_pad_get_negotiated_caps(base->srcpad);
- caps = gst_caps_make_writable(caps);
- structure = gst_caps_get_structure(caps, 0);
- g_value_init(&value, GST_TYPE_BUFFER);
- gst_value_set_buffer(&value, buf);
- gst_structure_set_value(structure, "codec_data", &value);
- g_value_unset(&value);
- return gst_pad_take_caps(base->srcpad, caps);
- }
- static GstFlowReturn
- pad_chain(GstPad *pad,
- GstBuffer *buf)
- {
- GstDspBase *self;
- dmm_buffer_t *b;
- GstFlowReturn ret = GST_FLOW_OK;
- du_port_t *p;
- struct td_buffer *tb;
- self = GST_DSP_BASE(GST_OBJECT_PARENT(pad));
- p = self->ports[0];
- pr_debug(self, "begin");
- if (G_UNLIKELY(GST_BUFFER_SIZE(buf) == 0)) {
- /* 0 size buffers are used for fast negotiation in 0.10 */
- pr_debug(self, "Got buffer of size 0, unref and return");
- goto leave;
- }
- if (self->pre_process_buffer)
- self->pre_process_buffer(self, buf);
- if (G_UNLIKELY(!self->node)) {
- if (!init_node(self, buf)) {
- if (!check_dsp_preemption(self))
- gstdsp_post_error(self, "couldn't start node");
- ret = GST_FLOW_ERROR;
- goto leave;
- }
- }
- /*
- * Check a few timestamps to see if we are dealing with PTS or DTS in order to
- * activate the reordering logic or not.
- */
- if (self->ts_mode == TS_MODE_CHECK_IN) {
- if (GST_CLOCK_TIME_IS_VALID(self->last_ts) &&
- GST_BUFFER_TIMESTAMP_IS_VALID(buf) &&
- self->last_ts > GST_BUFFER_TIMESTAMP(buf))
- {
- pr_debug(self, "in ts out-of-order -> sn ts");
- self->last_ts = GST_CLOCK_TIME_NONE;
- g_atomic_int_set(&self->ts_mode, TS_MODE_CHECK_OUT);
- goto next;
- }
- self->last_ts = GST_BUFFER_TIMESTAMP(buf);
- }
- next:
- tb = async_queue_pop(p->queue);
- ret = g_atomic_int_get(&self->status);
- if (ret != GST_FLOW_OK) {
- pr_info(self, "status: %s", gst_flow_get_name(self->status));
- if (tb)
- async_queue_push(p->queue, tb);
- goto leave;
- }
- b = tb->data;
- if (GST_BUFFER_SIZE(buf) >= self->input_buffer_size)
- map_buffer(self, buf, tb);
- else {
- dmm_buffer_allocate(b, self->input_buffer_size);
- b->need_copy = true;
- }
- if (b->need_copy) {
- pr_info(self, "copy");
- memcpy(b->data, GST_BUFFER_DATA(buf), GST_BUFFER_SIZE(buf));
- /* clear state for next time decision */
- b->need_copy = false;
- }
- g_mutex_lock(self->ts_mutex);
- self->ts_array[self->ts_in_pos].time = GST_BUFFER_TIMESTAMP(buf);
- self->ts_array[self->ts_in_pos].duration = GST_BUFFER_DURATION(buf);
- b->ts_index = self->ts_in_pos;
- self->ts_in_pos = (self->ts_in_pos + 1) % ARRAY_SIZE(self->ts_array);
- self->ts_count++;
- g_mutex_unlock(self->ts_mutex);
- ret = self->send_buffer(self, tb);
- if (ret != GST_FLOW_OK) {
- pr_info(self, "status: %s", gst_flow_get_name(self->status));
- if (ret == GST_FLOW_ERROR)
- gstdsp_post_error(self, "sending buffer failed");
- }
- leave:
- gst_buffer_unref(buf);
- pr_debug(self, "end");
- return ret;
- }
- static gboolean
- sink_event(GstDspBase *self,
- GstEvent *event)
- {
- gboolean ret = TRUE;
- pr_info(self, "event: %s", GST_EVENT_TYPE_NAME(event));
- switch (GST_EVENT_TYPE(event)) {
- case GST_EVENT_EOS: {
- bool defer_eos = false;
- g_mutex_lock(self->ts_mutex);
- if (self->ts_count != 0)
- defer_eos = true;
- if (self->status != GST_FLOW_OK)
- defer_eos = false;
- g_atomic_int_set(&self->deferred_eos, defer_eos);
- g_mutex_unlock(self->ts_mutex);
- if (defer_eos) {
- clock_gettime(CLOCK_MONOTONIC, &self->eos_start);
- if (self->flush_buffer)
- self->flush_buffer(self);
- gst_event_unref(event);
- } else {
- ret = gst_pad_push_event(self->srcpad, event);
- g_atomic_int_set(&self->eos, true);
- }
- break;
- }
- case GST_EVENT_FLUSH_START:
- ret = gst_pad_push_event(self->srcpad, event);
- g_atomic_int_set(&self->status, GST_FLOW_WRONG_STATE);
- dsp_unlock(self, TRUE);
- gst_pad_pause_task(self->srcpad);
- break;
- case GST_EVENT_FLUSH_STOP: {
- GSList **events;
- ret = gst_pad_push_event(self->srcpad, event);
- g_atomic_int_set(&self->eos, false);
- g_mutex_lock(self->ts_mutex);
- /*
- * Flush the current list of pending events, just in case
- * somebody is doing something crazy.
- */
- events = &self->ts_array[self->ts_in_pos].events;
- if (*events) {
- g_slist_foreach(*events, (GFunc) gst_event_unref, NULL);
- g_slist_free(*events);
- *events = NULL;
- }
- self->ts_push_pos = self->ts_in_pos;
- pr_debug(self, "flushing next %u buffer(s)",
- self->ts_push_pos - self->ts_out_pos);
- g_atomic_int_set(&self->deferred_eos, false);
- g_mutex_unlock(self->ts_mutex);
- g_atomic_int_set(&self->status, GST_FLOW_OK);
- dsp_unlock(self, FALSE);
- self->last_ts = GST_CLOCK_TIME_NONE;
- self->next_ts = GST_CLOCK_TIME_NONE;
- gst_pad_start_task(self->srcpad, output_loop, self->srcpad);
- break;
- }
- default:
- if (!GST_EVENT_IS_SERIALIZED(event)) {
- ret = gst_pad_push_event(self->srcpad, event);
- } else {
- GSList **events;
- g_mutex_lock(self->ts_mutex);
- pr_debug(self, "storing event");
- events = &self->ts_array[self->ts_in_pos].events;
- *events = g_slist_append(*events, event);
- g_mutex_unlock(self->ts_mutex);
- }
- break;
- }
- return ret;
- }
- static gboolean
- src_event(GstDspBase *self,
- GstEvent *event)
- {
- return gst_pad_push_event(self->sinkpad, event);
- }
- static gboolean
- base_sink_event(GstPad *pad,
- GstEvent *event)
- {
- GstDspBase *self;
- GstDspBaseClass *class;
- gboolean ret = TRUE;
- self = GST_DSP_BASE(gst_pad_get_parent(pad));
- class = GST_DSP_BASE_GET_CLASS(self);
- if (class->sink_event)
- ret = class->sink_event(self, event);
- gst_object_unref(self);
- return ret;
- }
- static gboolean
- base_src_event(GstPad *pad,
- GstEvent *event)
- {
- GstDspBase *self;
- GstDspBaseClass *class;
- gboolean ret = TRUE;
- self = GST_DSP_BASE(gst_pad_get_parent(pad));
- class = GST_DSP_BASE_GET_CLASS(self);
- if (class->src_event)
- ret = class->src_event(self, event);
- gst_object_unref(self);
- return ret;
- }
- static void
- instance_init(GTypeInstance *instance,
- gpointer g_class)
- {
- GstDspBase *self;
- GstElementClass *element_class;
- GstPadTemplate *template;
- element_class = GST_ELEMENT_CLASS(g_class);
- self = GST_DSP_BASE(instance);
- self->ports[0] = du_port_new(0, DMA_TO_DEVICE);
- self->ports[1] = du_port_new(1, DMA_FROM_DEVICE);
- self->ts_mode = TS_MODE_PASS;
- self->got_message = got_message;
- self->send_buffer = send_buffer;
- self->send_play_message = send_play_message;
- self->send_stop_message = send_stop_message;
- template = gst_element_class_get_pad_template(element_class, "sink");
- self->sinkpad = gst_pad_new_from_template(template, "sink");
- gst_pad_set_chain_function(self->sinkpad, pad_chain);
- gst_pad_set_event_function(self->sinkpad, base_sink_event);
- template = gst_element_class_get_pad_template(element_class, "src");
- self->srcpad = gst_pad_new_from_template(template, "src");
- gst_pad_use_fixed_caps(self->srcpad);
- gst_pad_set_event_function(self->srcpad, base_src_event);
- gst_pad_set_query_function(self->srcpad, base_query);
- gst_element_add_pad(GST_ELEMENT(self), self->sinkpad);
- gst_element_add_pad(GST_ELEMENT(self), self->srcpad);
- self->ts_mutex = g_mutex_new();
- self->pool_mutex = g_mutex_new();
- self->ts_cond = g_cond_new();
- self->flush = g_sem_new(0);
- self->eos_timeout = 10000;
- gst_segment_init(&self->segment, GST_FORMAT_UNDEFINED);
- }
- static void
- finalize(GObject *obj)
- {
- GstDspBase *self;
- self = GST_DSP_BASE(obj);
- g_sem_free(self->flush);
- g_mutex_free(self->ts_mutex);
- g_mutex_free(self->pool_mutex);
- g_cond_free(self->ts_cond);
- du_port_free(self->ports[1]);
- du_port_free(self->ports[0]);
- G_OBJECT_CLASS(parent_class)->finalize(obj);
- }
- static void
- class_init(gpointer g_class,
- gpointer class_data)
- {
- GstElementClass *gstelement_class;
- GObjectClass *gobject_class;
- GstDspBaseClass *class;
- parent_class = g_type_class_peek_parent(g_class);
- gstelement_class = GST_ELEMENT_CLASS(g_class);
- gobject_class = G_OBJECT_CLASS(g_class);
- class = GST_DSP_BASE_CLASS(g_class);
- gstelement_class->change_state = change_state;
- gobject_class->finalize = finalize;
- class->sink_event = sink_event;
- class->src_event = src_event;
- }
- GType
- gst_dsp_base_get_type(void)
- {
- static GType type;
- if (G_UNLIKELY(type == 0)) {
- GTypeInfo type_info = {
- .class_size = sizeof(GstDspBaseClass),
- .class_init = class_init,
- .instance_size = sizeof(GstDspBase),
- .instance_init = instance_init,
- };
- type = g_type_register_static(GST_TYPE_ELEMENT, "GstDspBase", &type_info, 0);
- /* cache buffer type */
- gst_dsp_buffer_get_type();
- }
- return type;
- }
|