chan.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
  4. package runtime
// This file contains the implementation of Go channels.
  6. import "unsafe"
  7. const (
  8. maxAlign = 8
  9. hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
  10. debugChan = false
  11. )
  12. // TODO(khr): make hchan.buf an unsafe.Pointer, not a *uint8
  13. func makechan(t *chantype, size int64) *hchan {
  14. elem := t.elem
  15. // compiler checks this but be safe.
  16. if elem.size >= 1<<16 {
  17. gothrow("makechan: invalid channel element type")
  18. }
  19. if hchanSize%maxAlign != 0 || elem.align > maxAlign {
  20. gothrow("makechan: bad alignment")
  21. }
  22. if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (maxmem-hchanSize)/uintptr(elem.size)) {
  23. panic("makechan: size out of range")
  24. }
  25. var c *hchan
  26. if elem.kind&kindNoPointers != 0 || size == 0 {
  27. // Allocate memory in one call.
  28. // Hchan does not contain pointers interesting for GC in this case:
  29. // buf points into the same allocation, elemtype is persistent.
  30. // SudoG's are referenced from their owning thread so they can't be collected.
  31. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
  32. c = (*hchan)(mallocgc(hchanSize+uintptr(size)*uintptr(elem.size), nil, flagNoScan))
  33. if size > 0 && elem.size != 0 {
  34. c.buf = (*uint8)(add(unsafe.Pointer(c), hchanSize))
  35. } else {
  36. c.buf = (*uint8)(unsafe.Pointer(c)) // race detector uses this location for synchronization
  37. }
  38. } else {
  39. c = new(hchan)
  40. c.buf = (*uint8)(newarray(elem, uintptr(size)))
  41. }
  42. c.elemsize = uint16(elem.size)
  43. c.elemtype = elem
  44. c.dataqsiz = uint(size)
  45. if debugChan {
  46. print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
  47. }
  48. return c
  49. }
  50. // chanbuf(c, i) is pointer to the i'th slot in the buffer.
  51. func chanbuf(c *hchan, i uint) unsafe.Pointer {
  52. return add(unsafe.Pointer(c.buf), uintptr(i)*uintptr(c.elemsize))
  53. }
  54. // entry point for c <- x from compiled code
  55. //go:nosplit
  56. func chansend1(t *chantype, c *hchan, elem unsafe.Pointer) {
  57. chansend(t, c, elem, true, getcallerpc(unsafe.Pointer(&t)))
  58. }
  59. /*
  60. * generic single channel send/recv
  61. * If block is not nil,
  62. * then the protocol will not
  63. * sleep but return if it could
  64. * not complete.
  65. *
  66. * sleep can wake up with g.param == nil
  67. * when a channel involved in the sleep has
  68. * been closed. it is easiest to loop and re-run
  69. * the operation; we'll see that it's now closed.
  70. */
  71. func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  72. if raceenabled {
  73. raceReadObjectPC(t.elem, ep, callerpc, funcPC(chansend))
  74. }
  75. if c == nil {
  76. if !block {
  77. return false
  78. }
  79. gopark(nil, nil, "chan send (nil chan)")
  80. gothrow("unreachable")
  81. }
  82. if debugChan {
  83. print("chansend: chan=", c, "\n")
  84. }
  85. if raceenabled {
  86. racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))
  87. }
  88. // Fast path: check for failed non-blocking operation without acquiring the lock.
  89. //
  90. // After observing that the channel is not closed, we observe that the channel is
  91. // not ready for sending. Each of these observations is a single word-sized read
  92. // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
  93. // Because a closed channel cannot transition from 'ready for sending' to
  94. // 'not ready for sending', even if the channel is closed between the two observations,
  95. // they imply a moment between the two when the channel was both not yet closed
  96. // and not ready for sending. We behave as if we observed the channel at that moment,
  97. // and report that the send cannot proceed.
  98. //
  99. // It is okay if the reads are reordered here: if we observe that the channel is not
  100. // ready for sending and then observe that it is not closed, that implies that the
  101. // channel wasn't closed during the first observation.
  102. if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
  103. (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
  104. return false
  105. }
  106. var t0 int64
  107. if blockprofilerate > 0 {
  108. t0 = cputicks()
  109. }
  110. lock(&c.lock)
  111. if c.closed != 0 {
  112. unlock(&c.lock)
  113. panic("send on closed channel")
  114. }
  115. if c.dataqsiz == 0 { // synchronous channel
  116. sg := c.recvq.dequeue()
  117. if sg != nil { // found a waiting receiver
  118. if raceenabled {
  119. racesync(c, sg)
  120. }
  121. unlock(&c.lock)
  122. recvg := sg.g
  123. if sg.elem != nil {
  124. memmove(unsafe.Pointer(sg.elem), ep, uintptr(c.elemsize))
  125. sg.elem = nil
  126. }
  127. recvg.param = unsafe.Pointer(sg)
  128. if sg.releasetime != 0 {
  129. sg.releasetime = cputicks()
  130. }
  131. goready(recvg)
  132. return true
  133. }
  134. if !block {
  135. unlock(&c.lock)
  136. return false
  137. }
  138. // no receiver available: block on this channel.
  139. gp := getg()
  140. mysg := acquireSudog()
  141. mysg.releasetime = 0
  142. if t0 != 0 {
  143. mysg.releasetime = -1
  144. }
  145. mysg.elem = ep
  146. mysg.waitlink = nil
  147. gp.waiting = mysg
  148. mysg.g = gp
  149. mysg.selectdone = nil
  150. gp.param = nil
  151. c.sendq.enqueue(mysg)
  152. goparkunlock(&c.lock, "chan send")
  153. // someone woke us up.
  154. if mysg != gp.waiting {
  155. gothrow("G waiting list is corrupted!")
  156. }
  157. gp.waiting = nil
  158. if gp.param == nil {
  159. if c.closed == 0 {
  160. gothrow("chansend: spurious wakeup")
  161. }
  162. panic("send on closed channel")
  163. }
  164. gp.param = nil
  165. if mysg.releasetime > 0 {
  166. blockevent(int64(mysg.releasetime)-t0, 2)
  167. }
  168. releaseSudog(mysg)
  169. return true
  170. }
  171. // asynchronous channel
  172. // wait for some space to write our data
  173. var t1 int64
  174. for c.qcount >= c.dataqsiz {
  175. if !block {
  176. unlock(&c.lock)
  177. return false
  178. }
  179. gp := getg()
  180. mysg := acquireSudog()
  181. mysg.releasetime = 0
  182. if t0 != 0 {
  183. mysg.releasetime = -1
  184. }
  185. mysg.g = gp
  186. mysg.elem = nil
  187. mysg.selectdone = nil
  188. c.sendq.enqueue(mysg)
  189. goparkunlock(&c.lock, "chan send")
  190. // someone woke us up - try again
  191. if mysg.releasetime > 0 {
  192. t1 = mysg.releasetime
  193. }
  194. releaseSudog(mysg)
  195. lock(&c.lock)
  196. if c.closed != 0 {
  197. unlock(&c.lock)
  198. panic("send on closed channel")
  199. }
  200. }
  201. // write our data into the channel buffer
  202. if raceenabled {
  203. raceacquire(chanbuf(c, c.sendx))
  204. racerelease(chanbuf(c, c.sendx))
  205. }
  206. memmove(chanbuf(c, c.sendx), ep, uintptr(c.elemsize))
  207. c.sendx++
  208. if c.sendx == c.dataqsiz {
  209. c.sendx = 0
  210. }
  211. c.qcount++
  212. // wake up a waiting receiver
  213. sg := c.recvq.dequeue()
  214. if sg != nil {
  215. recvg := sg.g
  216. unlock(&c.lock)
  217. if sg.releasetime != 0 {
  218. sg.releasetime = cputicks()
  219. }
  220. goready(recvg)
  221. } else {
  222. unlock(&c.lock)
  223. }
  224. if t1 > 0 {
  225. blockevent(t1-t0, 2)
  226. }
  227. return true
  228. }
  229. func closechan(c *hchan) {
  230. if c == nil {
  231. panic("close of nil channel")
  232. }
  233. lock(&c.lock)
  234. if c.closed != 0 {
  235. unlock(&c.lock)
  236. panic("close of closed channel")
  237. }
  238. if raceenabled {
  239. callerpc := getcallerpc(unsafe.Pointer(&c))
  240. racewritepc(unsafe.Pointer(c), callerpc, funcPC(closechan))
  241. racerelease(unsafe.Pointer(c))
  242. }
  243. c.closed = 1
  244. // release all readers
  245. for {
  246. sg := c.recvq.dequeue()
  247. if sg == nil {
  248. break
  249. }
  250. gp := sg.g
  251. sg.elem = nil
  252. gp.param = nil
  253. if sg.releasetime != 0 {
  254. sg.releasetime = cputicks()
  255. }
  256. goready(gp)
  257. }
  258. // release all writers
  259. for {
  260. sg := c.sendq.dequeue()
  261. if sg == nil {
  262. break
  263. }
  264. gp := sg.g
  265. sg.elem = nil
  266. gp.param = nil
  267. if sg.releasetime != 0 {
  268. sg.releasetime = cputicks()
  269. }
  270. goready(gp)
  271. }
  272. unlock(&c.lock)
  273. }
  274. // entry points for <- c from compiled code
  275. //go:nosplit
  276. func chanrecv1(t *chantype, c *hchan, elem unsafe.Pointer) {
  277. chanrecv(t, c, elem, true)
  278. }
  279. //go:nosplit
  280. func chanrecv2(t *chantype, c *hchan, elem unsafe.Pointer) (received bool) {
  281. _, received = chanrecv(t, c, elem, true)
  282. return
  283. }
  284. // chanrecv receives on channel c and writes the received data to ep.
  285. // ep may be nil, in which case received data is ignored.
  286. // If block == false and no elements are available, returns (false, false).
  287. // Otherwise, if c is closed, zeros *ep and returns (true, false).
  288. // Otherwise, fills in *ep with an element and returns (true, true).
  289. func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  290. // raceenabled: don't need to check ep, as it is always on the stack.
  291. if debugChan {
  292. print("chanrecv: chan=", c, "\n")
  293. }
  294. if c == nil {
  295. if !block {
  296. return
  297. }
  298. gopark(nil, nil, "chan receive (nil chan)")
  299. gothrow("unreachable")
  300. }
  301. // Fast path: check for failed non-blocking operation without acquiring the lock.
  302. //
  303. // After observing that the channel is not ready for receiving, we observe that the
  304. // channel is not closed. Each of these observations is a single word-sized read
  305. // (first c.sendq.first or c.qcount, and second c.closed).
  306. // Because a channel cannot be reopened, the later observation of the channel
  307. // being not closed implies that it was also not closed at the moment of the
  308. // first observation. We behave as if we observed the channel at that moment
  309. // and report that the receive cannot proceed.
  310. //
  311. // The order of operations is important here: reversing the operations can lead to
  312. // incorrect behavior when racing with a close.
  313. if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
  314. c.dataqsiz > 0 && atomicloaduint(&c.qcount) == 0) &&
  315. atomicload(&c.closed) == 0 {
  316. return
  317. }
  318. var t0 int64
  319. if blockprofilerate > 0 {
  320. t0 = cputicks()
  321. }
  322. lock(&c.lock)
  323. if c.dataqsiz == 0 { // synchronous channel
  324. if c.closed != 0 {
  325. return recvclosed(c, ep)
  326. }
  327. sg := c.sendq.dequeue()
  328. if sg != nil {
  329. if raceenabled {
  330. racesync(c, sg)
  331. }
  332. unlock(&c.lock)
  333. if ep != nil {
  334. memmove(ep, sg.elem, uintptr(c.elemsize))
  335. }
  336. sg.elem = nil
  337. gp := sg.g
  338. gp.param = unsafe.Pointer(sg)
  339. if sg.releasetime != 0 {
  340. sg.releasetime = cputicks()
  341. }
  342. goready(gp)
  343. selected = true
  344. received = true
  345. return
  346. }
  347. if !block {
  348. unlock(&c.lock)
  349. return
  350. }
  351. // no sender available: block on this channel.
  352. gp := getg()
  353. mysg := acquireSudog()
  354. mysg.releasetime = 0
  355. if t0 != 0 {
  356. mysg.releasetime = -1
  357. }
  358. mysg.elem = ep
  359. mysg.waitlink = nil
  360. gp.waiting = mysg
  361. mysg.g = gp
  362. mysg.selectdone = nil
  363. gp.param = nil
  364. c.recvq.enqueue(mysg)
  365. goparkunlock(&c.lock, "chan receive")
  366. // someone woke us up
  367. if mysg != gp.waiting {
  368. gothrow("G waiting list is corrupted!")
  369. }
  370. gp.waiting = nil
  371. if mysg.releasetime > 0 {
  372. blockevent(mysg.releasetime-t0, 2)
  373. }
  374. haveData := gp.param != nil
  375. gp.param = nil
  376. releaseSudog(mysg)
  377. if haveData {
  378. // a sender sent us some data. It already wrote to ep.
  379. selected = true
  380. received = true
  381. return
  382. }
  383. lock(&c.lock)
  384. if c.closed == 0 {
  385. gothrow("chanrecv: spurious wakeup")
  386. }
  387. return recvclosed(c, ep)
  388. }
  389. // asynchronous channel
  390. // wait for some data to appear
  391. var t1 int64
  392. for c.qcount <= 0 {
  393. if c.closed != 0 {
  394. selected, received = recvclosed(c, ep)
  395. if t1 > 0 {
  396. blockevent(t1-t0, 2)
  397. }
  398. return
  399. }
  400. if !block {
  401. unlock(&c.lock)
  402. return
  403. }
  404. // wait for someone to send an element
  405. gp := getg()
  406. mysg := acquireSudog()
  407. mysg.releasetime = 0
  408. if t0 != 0 {
  409. mysg.releasetime = -1
  410. }
  411. mysg.elem = nil
  412. mysg.g = gp
  413. mysg.selectdone = nil
  414. c.recvq.enqueue(mysg)
  415. goparkunlock(&c.lock, "chan receive")
  416. // someone woke us up - try again
  417. if mysg.releasetime > 0 {
  418. t1 = mysg.releasetime
  419. }
  420. releaseSudog(mysg)
  421. lock(&c.lock)
  422. }
  423. if raceenabled {
  424. raceacquire(chanbuf(c, c.recvx))
  425. racerelease(chanbuf(c, c.recvx))
  426. }
  427. if ep != nil {
  428. memmove(ep, chanbuf(c, c.recvx), uintptr(c.elemsize))
  429. }
  430. memclr(chanbuf(c, c.recvx), uintptr(c.elemsize))
  431. c.recvx++
  432. if c.recvx == c.dataqsiz {
  433. c.recvx = 0
  434. }
  435. c.qcount--
  436. // ping a sender now that there is space
  437. sg := c.sendq.dequeue()
  438. if sg != nil {
  439. gp := sg.g
  440. unlock(&c.lock)
  441. if sg.releasetime != 0 {
  442. sg.releasetime = cputicks()
  443. }
  444. goready(gp)
  445. } else {
  446. unlock(&c.lock)
  447. }
  448. if t1 > 0 {
  449. blockevent(t1-t0, 2)
  450. }
  451. selected = true
  452. received = true
  453. return
  454. }
  455. // recvclosed is a helper function for chanrecv. Handles cleanup
  456. // when the receiver encounters a closed channel.
  457. // Caller must hold c.lock, recvclosed will release the lock.
  458. func recvclosed(c *hchan, ep unsafe.Pointer) (selected, recevied bool) {
  459. if raceenabled {
  460. raceacquire(unsafe.Pointer(c))
  461. }
  462. unlock(&c.lock)
  463. if ep != nil {
  464. memclr(ep, uintptr(c.elemsize))
  465. }
  466. return true, false
  467. }
  468. // compiler implements
  469. //
  470. // select {
  471. // case c <- v:
  472. // ... foo
  473. // default:
  474. // ... bar
  475. // }
  476. //
  477. // as
  478. //
  479. // if selectnbsend(c, v) {
  480. // ... foo
  481. // } else {
  482. // ... bar
  483. // }
  484. //
  485. func selectnbsend(t *chantype, c *hchan, elem unsafe.Pointer) (selected bool) {
  486. return chansend(t, c, elem, false, getcallerpc(unsafe.Pointer(&t)))
  487. }
  488. // compiler implements
  489. //
  490. // select {
  491. // case v = <-c:
  492. // ... foo
  493. // default:
  494. // ... bar
  495. // }
  496. //
  497. // as
  498. //
  499. // if selectnbrecv(&v, c) {
  500. // ... foo
  501. // } else {
  502. // ... bar
  503. // }
  504. //
  505. func selectnbrecv(t *chantype, elem unsafe.Pointer, c *hchan) (selected bool) {
  506. selected, _ = chanrecv(t, c, elem, false)
  507. return
  508. }
  509. // compiler implements
  510. //
  511. // select {
  512. // case v, ok = <-c:
  513. // ... foo
  514. // default:
  515. // ... bar
  516. // }
  517. //
  518. // as
  519. //
  520. // if c != nil && selectnbrecv2(&v, &ok, c) {
  521. // ... foo
  522. // } else {
  523. // ... bar
  524. // }
  525. //
  526. func selectnbrecv2(t *chantype, elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
  527. // TODO(khr): just return 2 values from this function, now that it is in Go.
  528. selected, *received = chanrecv(t, c, elem, false)
  529. return
  530. }
  531. func reflect_chansend(t *chantype, c *hchan, elem unsafe.Pointer, nb bool) (selected bool) {
  532. return chansend(t, c, elem, !nb, getcallerpc(unsafe.Pointer(&t)))
  533. }
  534. func reflect_chanrecv(t *chantype, c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) {
  535. return chanrecv(t, c, elem, !nb)
  536. }
  537. func reflect_chanlen(c *hchan) int {
  538. if c == nil {
  539. return 0
  540. }
  541. return int(c.qcount)
  542. }
  543. func reflect_chancap(c *hchan) int {
  544. if c == nil {
  545. return 0
  546. }
  547. return int(c.dataqsiz)
  548. }
  549. func (q *waitq) enqueue(sgp *sudog) {
  550. sgp.next = nil
  551. if q.first == nil {
  552. q.first = sgp
  553. q.last = sgp
  554. return
  555. }
  556. q.last.next = sgp
  557. q.last = sgp
  558. }
  559. func (q *waitq) dequeue() *sudog {
  560. for {
  561. sgp := q.first
  562. if sgp == nil {
  563. return nil
  564. }
  565. q.first = sgp.next
  566. sgp.next = nil
  567. if q.last == sgp {
  568. q.last = nil
  569. }
  570. // if sgp participates in a select and is already signaled, ignore it
  571. if sgp.selectdone != nil {
  572. // claim the right to signal
  573. if *sgp.selectdone != 0 || !cas(sgp.selectdone, 0, 1) {
  574. continue
  575. }
  576. }
  577. return sgp
  578. }
  579. }
  580. func racesync(c *hchan, sg *sudog) {
  581. racerelease(chanbuf(c, 0))
  582. raceacquireg(sg.g, chanbuf(c, 0))
  583. racereleaseg(sg.g, chanbuf(c, 0))
  584. raceacquire(chanbuf(c, 0))
  585. }