123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656 |
- // Copyright 2014 The Go Authors. All rights reserved.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- package runtime
- // This file contains the implementation of Go channels.
- import "unsafe"
- const (
- maxAlign = 8
- hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
- debugChan = false
- )
- // TODO(khr): make hchan.buf an unsafe.Pointer, not a *uint8
- func makechan(t *chantype, size int64) *hchan {
- elem := t.elem
- // compiler checks this but be safe.
- if elem.size >= 1<<16 {
- gothrow("makechan: invalid channel element type")
- }
- if hchanSize%maxAlign != 0 || elem.align > maxAlign {
- gothrow("makechan: bad alignment")
- }
- if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (maxmem-hchanSize)/uintptr(elem.size)) {
- panic("makechan: size out of range")
- }
- var c *hchan
- if elem.kind&kindNoPointers != 0 || size == 0 {
- // Allocate memory in one call.
- // Hchan does not contain pointers interesting for GC in this case:
- // buf points into the same allocation, elemtype is persistent.
- // SudoG's are referenced from their owning thread so they can't be collected.
- // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
- c = (*hchan)(mallocgc(hchanSize+uintptr(size)*uintptr(elem.size), nil, flagNoScan))
- if size > 0 && elem.size != 0 {
- c.buf = (*uint8)(add(unsafe.Pointer(c), hchanSize))
- } else {
- c.buf = (*uint8)(unsafe.Pointer(c)) // race detector uses this location for synchronization
- }
- } else {
- c = new(hchan)
- c.buf = (*uint8)(newarray(elem, uintptr(size)))
- }
- c.elemsize = uint16(elem.size)
- c.elemtype = elem
- c.dataqsiz = uint(size)
- if debugChan {
- print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
- }
- return c
- }
- // chanbuf(c, i) is pointer to the i'th slot in the buffer.
- func chanbuf(c *hchan, i uint) unsafe.Pointer {
- return add(unsafe.Pointer(c.buf), uintptr(i)*uintptr(c.elemsize))
- }
- // entry point for c <- x from compiled code
- //go:nosplit
- func chansend1(t *chantype, c *hchan, elem unsafe.Pointer) {
- chansend(t, c, elem, true, getcallerpc(unsafe.Pointer(&t)))
- }
- /*
- * generic single channel send/recv
- * If block is not nil,
- * then the protocol will not
- * sleep but return if it could
- * not complete.
- *
- * sleep can wake up with g.param == nil
- * when a channel involved in the sleep has
- * been closed. it is easiest to loop and re-run
- * the operation; we'll see that it's now closed.
- */
- func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
- if raceenabled {
- raceReadObjectPC(t.elem, ep, callerpc, funcPC(chansend))
- }
- if c == nil {
- if !block {
- return false
- }
- gopark(nil, nil, "chan send (nil chan)")
- gothrow("unreachable")
- }
- if debugChan {
- print("chansend: chan=", c, "\n")
- }
- if raceenabled {
- racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))
- }
- // Fast path: check for failed non-blocking operation without acquiring the lock.
- //
- // After observing that the channel is not closed, we observe that the channel is
- // not ready for sending. Each of these observations is a single word-sized read
- // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
- // Because a closed channel cannot transition from 'ready for sending' to
- // 'not ready for sending', even if the channel is closed between the two observations,
- // they imply a moment between the two when the channel was both not yet closed
- // and not ready for sending. We behave as if we observed the channel at that moment,
- // and report that the send cannot proceed.
- //
- // It is okay if the reads are reordered here: if we observe that the channel is not
- // ready for sending and then observe that it is not closed, that implies that the
- // channel wasn't closed during the first observation.
- if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
- (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
- return false
- }
- var t0 int64
- if blockprofilerate > 0 {
- t0 = cputicks()
- }
- lock(&c.lock)
- if c.closed != 0 {
- unlock(&c.lock)
- panic("send on closed channel")
- }
- if c.dataqsiz == 0 { // synchronous channel
- sg := c.recvq.dequeue()
- if sg != nil { // found a waiting receiver
- if raceenabled {
- racesync(c, sg)
- }
- unlock(&c.lock)
- recvg := sg.g
- if sg.elem != nil {
- memmove(unsafe.Pointer(sg.elem), ep, uintptr(c.elemsize))
- sg.elem = nil
- }
- recvg.param = unsafe.Pointer(sg)
- if sg.releasetime != 0 {
- sg.releasetime = cputicks()
- }
- goready(recvg)
- return true
- }
- if !block {
- unlock(&c.lock)
- return false
- }
- // no receiver available: block on this channel.
- gp := getg()
- mysg := acquireSudog()
- mysg.releasetime = 0
- if t0 != 0 {
- mysg.releasetime = -1
- }
- mysg.elem = ep
- mysg.waitlink = nil
- gp.waiting = mysg
- mysg.g = gp
- mysg.selectdone = nil
- gp.param = nil
- c.sendq.enqueue(mysg)
- goparkunlock(&c.lock, "chan send")
- // someone woke us up.
- if mysg != gp.waiting {
- gothrow("G waiting list is corrupted!")
- }
- gp.waiting = nil
- if gp.param == nil {
- if c.closed == 0 {
- gothrow("chansend: spurious wakeup")
- }
- panic("send on closed channel")
- }
- gp.param = nil
- if mysg.releasetime > 0 {
- blockevent(int64(mysg.releasetime)-t0, 2)
- }
- releaseSudog(mysg)
- return true
- }
- // asynchronous channel
- // wait for some space to write our data
- var t1 int64
- for c.qcount >= c.dataqsiz {
- if !block {
- unlock(&c.lock)
- return false
- }
- gp := getg()
- mysg := acquireSudog()
- mysg.releasetime = 0
- if t0 != 0 {
- mysg.releasetime = -1
- }
- mysg.g = gp
- mysg.elem = nil
- mysg.selectdone = nil
- c.sendq.enqueue(mysg)
- goparkunlock(&c.lock, "chan send")
- // someone woke us up - try again
- if mysg.releasetime > 0 {
- t1 = mysg.releasetime
- }
- releaseSudog(mysg)
- lock(&c.lock)
- if c.closed != 0 {
- unlock(&c.lock)
- panic("send on closed channel")
- }
- }
- // write our data into the channel buffer
- if raceenabled {
- raceacquire(chanbuf(c, c.sendx))
- racerelease(chanbuf(c, c.sendx))
- }
- memmove(chanbuf(c, c.sendx), ep, uintptr(c.elemsize))
- c.sendx++
- if c.sendx == c.dataqsiz {
- c.sendx = 0
- }
- c.qcount++
- // wake up a waiting receiver
- sg := c.recvq.dequeue()
- if sg != nil {
- recvg := sg.g
- unlock(&c.lock)
- if sg.releasetime != 0 {
- sg.releasetime = cputicks()
- }
- goready(recvg)
- } else {
- unlock(&c.lock)
- }
- if t1 > 0 {
- blockevent(t1-t0, 2)
- }
- return true
- }
- func closechan(c *hchan) {
- if c == nil {
- panic("close of nil channel")
- }
- lock(&c.lock)
- if c.closed != 0 {
- unlock(&c.lock)
- panic("close of closed channel")
- }
- if raceenabled {
- callerpc := getcallerpc(unsafe.Pointer(&c))
- racewritepc(unsafe.Pointer(c), callerpc, funcPC(closechan))
- racerelease(unsafe.Pointer(c))
- }
- c.closed = 1
- // release all readers
- for {
- sg := c.recvq.dequeue()
- if sg == nil {
- break
- }
- gp := sg.g
- sg.elem = nil
- gp.param = nil
- if sg.releasetime != 0 {
- sg.releasetime = cputicks()
- }
- goready(gp)
- }
- // release all writers
- for {
- sg := c.sendq.dequeue()
- if sg == nil {
- break
- }
- gp := sg.g
- sg.elem = nil
- gp.param = nil
- if sg.releasetime != 0 {
- sg.releasetime = cputicks()
- }
- goready(gp)
- }
- unlock(&c.lock)
- }
- // entry points for <- c from compiled code
- //go:nosplit
- func chanrecv1(t *chantype, c *hchan, elem unsafe.Pointer) {
- chanrecv(t, c, elem, true)
- }
- //go:nosplit
- func chanrecv2(t *chantype, c *hchan, elem unsafe.Pointer) (received bool) {
- _, received = chanrecv(t, c, elem, true)
- return
- }
- // chanrecv receives on channel c and writes the received data to ep.
- // ep may be nil, in which case received data is ignored.
- // If block == false and no elements are available, returns (false, false).
- // Otherwise, if c is closed, zeros *ep and returns (true, false).
- // Otherwise, fills in *ep with an element and returns (true, true).
- func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
- // raceenabled: don't need to check ep, as it is always on the stack.
- if debugChan {
- print("chanrecv: chan=", c, "\n")
- }
- if c == nil {
- if !block {
- return
- }
- gopark(nil, nil, "chan receive (nil chan)")
- gothrow("unreachable")
- }
- // Fast path: check for failed non-blocking operation without acquiring the lock.
- //
- // After observing that the channel is not ready for receiving, we observe that the
- // channel is not closed. Each of these observations is a single word-sized read
- // (first c.sendq.first or c.qcount, and second c.closed).
- // Because a channel cannot be reopened, the later observation of the channel
- // being not closed implies that it was also not closed at the moment of the
- // first observation. We behave as if we observed the channel at that moment
- // and report that the receive cannot proceed.
- //
- // The order of operations is important here: reversing the operations can lead to
- // incorrect behavior when racing with a close.
- if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
- c.dataqsiz > 0 && atomicloaduint(&c.qcount) == 0) &&
- atomicload(&c.closed) == 0 {
- return
- }
- var t0 int64
- if blockprofilerate > 0 {
- t0 = cputicks()
- }
- lock(&c.lock)
- if c.dataqsiz == 0 { // synchronous channel
- if c.closed != 0 {
- return recvclosed(c, ep)
- }
- sg := c.sendq.dequeue()
- if sg != nil {
- if raceenabled {
- racesync(c, sg)
- }
- unlock(&c.lock)
- if ep != nil {
- memmove(ep, sg.elem, uintptr(c.elemsize))
- }
- sg.elem = nil
- gp := sg.g
- gp.param = unsafe.Pointer(sg)
- if sg.releasetime != 0 {
- sg.releasetime = cputicks()
- }
- goready(gp)
- selected = true
- received = true
- return
- }
- if !block {
- unlock(&c.lock)
- return
- }
- // no sender available: block on this channel.
- gp := getg()
- mysg := acquireSudog()
- mysg.releasetime = 0
- if t0 != 0 {
- mysg.releasetime = -1
- }
- mysg.elem = ep
- mysg.waitlink = nil
- gp.waiting = mysg
- mysg.g = gp
- mysg.selectdone = nil
- gp.param = nil
- c.recvq.enqueue(mysg)
- goparkunlock(&c.lock, "chan receive")
- // someone woke us up
- if mysg != gp.waiting {
- gothrow("G waiting list is corrupted!")
- }
- gp.waiting = nil
- if mysg.releasetime > 0 {
- blockevent(mysg.releasetime-t0, 2)
- }
- haveData := gp.param != nil
- gp.param = nil
- releaseSudog(mysg)
- if haveData {
- // a sender sent us some data. It already wrote to ep.
- selected = true
- received = true
- return
- }
- lock(&c.lock)
- if c.closed == 0 {
- gothrow("chanrecv: spurious wakeup")
- }
- return recvclosed(c, ep)
- }
- // asynchronous channel
- // wait for some data to appear
- var t1 int64
- for c.qcount <= 0 {
- if c.closed != 0 {
- selected, received = recvclosed(c, ep)
- if t1 > 0 {
- blockevent(t1-t0, 2)
- }
- return
- }
- if !block {
- unlock(&c.lock)
- return
- }
- // wait for someone to send an element
- gp := getg()
- mysg := acquireSudog()
- mysg.releasetime = 0
- if t0 != 0 {
- mysg.releasetime = -1
- }
- mysg.elem = nil
- mysg.g = gp
- mysg.selectdone = nil
- c.recvq.enqueue(mysg)
- goparkunlock(&c.lock, "chan receive")
- // someone woke us up - try again
- if mysg.releasetime > 0 {
- t1 = mysg.releasetime
- }
- releaseSudog(mysg)
- lock(&c.lock)
- }
- if raceenabled {
- raceacquire(chanbuf(c, c.recvx))
- racerelease(chanbuf(c, c.recvx))
- }
- if ep != nil {
- memmove(ep, chanbuf(c, c.recvx), uintptr(c.elemsize))
- }
- memclr(chanbuf(c, c.recvx), uintptr(c.elemsize))
- c.recvx++
- if c.recvx == c.dataqsiz {
- c.recvx = 0
- }
- c.qcount--
- // ping a sender now that there is space
- sg := c.sendq.dequeue()
- if sg != nil {
- gp := sg.g
- unlock(&c.lock)
- if sg.releasetime != 0 {
- sg.releasetime = cputicks()
- }
- goready(gp)
- } else {
- unlock(&c.lock)
- }
- if t1 > 0 {
- blockevent(t1-t0, 2)
- }
- selected = true
- received = true
- return
- }
- // recvclosed is a helper function for chanrecv. Handles cleanup
- // when the receiver encounters a closed channel.
- // Caller must hold c.lock, recvclosed will release the lock.
- func recvclosed(c *hchan, ep unsafe.Pointer) (selected, recevied bool) {
- if raceenabled {
- raceacquire(unsafe.Pointer(c))
- }
- unlock(&c.lock)
- if ep != nil {
- memclr(ep, uintptr(c.elemsize))
- }
- return true, false
- }
- // compiler implements
- //
- // select {
- // case c <- v:
- // ... foo
- // default:
- // ... bar
- // }
- //
- // as
- //
- // if selectnbsend(c, v) {
- // ... foo
- // } else {
- // ... bar
- // }
- //
- func selectnbsend(t *chantype, c *hchan, elem unsafe.Pointer) (selected bool) {
- return chansend(t, c, elem, false, getcallerpc(unsafe.Pointer(&t)))
- }
- // compiler implements
- //
- // select {
- // case v = <-c:
- // ... foo
- // default:
- // ... bar
- // }
- //
- // as
- //
- // if selectnbrecv(&v, c) {
- // ... foo
- // } else {
- // ... bar
- // }
- //
- func selectnbrecv(t *chantype, elem unsafe.Pointer, c *hchan) (selected bool) {
- selected, _ = chanrecv(t, c, elem, false)
- return
- }
- // compiler implements
- //
- // select {
- // case v, ok = <-c:
- // ... foo
- // default:
- // ... bar
- // }
- //
- // as
- //
- // if c != nil && selectnbrecv2(&v, &ok, c) {
- // ... foo
- // } else {
- // ... bar
- // }
- //
- func selectnbrecv2(t *chantype, elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
- // TODO(khr): just return 2 values from this function, now that it is in Go.
- selected, *received = chanrecv(t, c, elem, false)
- return
- }
- func reflect_chansend(t *chantype, c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
- return chansend(t, c, elem, !nb, getcallerpc(unsafe.Pointer(&t)))
- }
- func reflect_chanrecv(t *chantype, c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
- return chanrecv(t, c, elem, !nb)
- }
- func reflect_chanlen(c *hchan) int {
- if c == nil {
- return 0
- }
- return int(c.qcount)
- }
- func reflect_chancap(c *hchan) int {
- if c == nil {
- return 0
- }
- return int(c.dataqsiz)
- }
- func (q *waitq) enqueue(sgp *sudog) {
- sgp.next = nil
- if q.first == nil {
- q.first = sgp
- q.last = sgp
- return
- }
- q.last.next = sgp
- q.last = sgp
- }
- func (q *waitq) dequeue() *sudog {
- for {
- sgp := q.first
- if sgp == nil {
- return nil
- }
- q.first = sgp.next
- sgp.next = nil
- if q.last == sgp {
- q.last = nil
- }
- // if sgp participates in a select and is already signaled, ignore it
- if sgp.selectdone != nil {
- // claim the right to signal
- if *sgp.selectdone != 0 || !cas(sgp.selectdone, 0, 1) {
- continue
- }
- }
- return sgp
- }
- }
- func racesync(c *hchan, sg *sudog) {
- racerelease(chanbuf(c, 0))
- raceacquireg(sg.g, chanbuf(c, 0))
- racereleaseg(sg.g, chanbuf(c, 0))
- raceacquire(chanbuf(c, 0))
- }
|