matcher.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651
  1. // Copyright 2017 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package bloombits
  17. import (
  18. "bytes"
  19. "context"
  20. "errors"
  21. "math"
  22. "sort"
  23. "sync"
  24. "sync/atomic"
  25. "time"
  26. "github.com/ethereum/go-ethereum/common/bitutil"
  27. "github.com/ethereum/go-ethereum/crypto"
  28. )
  29. // bloomIndexes represents the bit indexes inside the bloom filter that belong
  30. // to some key.
  31. type bloomIndexes [3]uint
  32. // calcBloomIndexes returns the bloom filter bit indexes belonging to the given key.
  33. func calcBloomIndexes(b []byte) bloomIndexes {
  34. b = crypto.Keccak256(b)
  35. var idxs bloomIndexes
  36. for i := 0; i < len(idxs); i++ {
  37. idxs[i] = (uint(b[2*i])<<8)&2047 + uint(b[2*i+1])
  38. }
  39. return idxs
  40. }
  41. // partialMatches with a non-nil vector represents a section in which some sub-
  42. // matchers have already found potential matches. Subsequent sub-matchers will
  43. // binary AND their matches with this vector. If vector is nil, it represents a
  44. // section to be processed by the first sub-matcher.
  45. type partialMatches struct {
  46. section uint64
  47. bitset []byte
  48. }
  49. // Retrieval represents a request for retrieval task assignments for a given
  50. // bit with the given number of fetch elements, or a response for such a request.
  51. // It can also have the actual results set to be used as a delivery data struct.
  52. //
  53. // The contest and error fields are used by the light client to terminate matching
  54. // early if an error is enountered on some path of the pipeline.
  55. type Retrieval struct {
  56. Bit uint
  57. Sections []uint64
  58. Bitsets [][]byte
  59. Context context.Context
  60. Error error
  61. }
  62. // Matcher is a pipelined system of schedulers and logic matchers which perform
  63. // binary AND/OR operations on the bit-streams, creating a stream of potential
  64. // blocks to inspect for data content.
  65. type Matcher struct {
  66. sectionSize uint64 // Size of the data batches to filter on
  67. filters [][]bloomIndexes // Filter the system is matching for
  68. schedulers map[uint]*scheduler // Retrieval schedulers for loading bloom bits
  69. retrievers chan chan uint // Retriever processes waiting for bit allocations
  70. counters chan chan uint // Retriever processes waiting for task count reports
  71. retrievals chan chan *Retrieval // Retriever processes waiting for task allocations
  72. deliveries chan *Retrieval // Retriever processes waiting for task response deliveries
  73. running uint32 // Atomic flag whether a session is live or not
  74. }
  75. // NewMatcher creates a new pipeline for retrieving bloom bit streams and doing
  76. // address and topic filtering on them. Setting a filter component to `nil` is
  77. // allowed and will result in that filter rule being skipped (OR 0x11...1).
  78. func NewMatcher(sectionSize uint64, filters [][][]byte) *Matcher {
  79. // Create the matcher instance
  80. m := &Matcher{
  81. sectionSize: sectionSize,
  82. schedulers: make(map[uint]*scheduler),
  83. retrievers: make(chan chan uint),
  84. counters: make(chan chan uint),
  85. retrievals: make(chan chan *Retrieval),
  86. deliveries: make(chan *Retrieval),
  87. }
  88. // Calculate the bloom bit indexes for the groups we're interested in
  89. m.filters = nil
  90. for _, filter := range filters {
  91. // Gather the bit indexes of the filter rule, special casing the nil filter
  92. if len(filter) == 0 {
  93. continue
  94. }
  95. bloomBits := make([]bloomIndexes, len(filter))
  96. for i, clause := range filter {
  97. if clause == nil {
  98. bloomBits = nil
  99. break
  100. }
  101. bloomBits[i] = calcBloomIndexes(clause)
  102. }
  103. // Accumulate the filter rules if no nil rule was within
  104. if bloomBits != nil {
  105. m.filters = append(m.filters, bloomBits)
  106. }
  107. }
  108. // For every bit, create a scheduler to load/download the bit vectors
  109. for _, bloomIndexLists := range m.filters {
  110. for _, bloomIndexList := range bloomIndexLists {
  111. for _, bloomIndex := range bloomIndexList {
  112. m.addScheduler(bloomIndex)
  113. }
  114. }
  115. }
  116. return m
  117. }
  118. // addScheduler adds a bit stream retrieval scheduler for the given bit index if
  119. // it has not existed before. If the bit is already selected for filtering, the
  120. // existing scheduler can be used.
  121. func (m *Matcher) addScheduler(idx uint) {
  122. if _, ok := m.schedulers[idx]; ok {
  123. return
  124. }
  125. m.schedulers[idx] = newScheduler(idx)
  126. }
  127. // Start starts the matching process and returns a stream of bloom matches in
  128. // a given range of blocks. If there are no more matches in the range, the result
  129. // channel is closed.
  130. func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) {
  131. // Make sure we're not creating concurrent sessions
  132. if atomic.SwapUint32(&m.running, 1) == 1 {
  133. return nil, errors.New("matcher already running")
  134. }
  135. defer atomic.StoreUint32(&m.running, 0)
  136. // Initiate a new matching round
  137. session := &MatcherSession{
  138. matcher: m,
  139. quit: make(chan struct{}),
  140. kill: make(chan struct{}),
  141. ctx: ctx,
  142. }
  143. for _, scheduler := range m.schedulers {
  144. scheduler.reset()
  145. }
  146. sink := m.run(begin, end, cap(results), session)
  147. // Read the output from the result sink and deliver to the user
  148. session.pend.Add(1)
  149. go func() {
  150. defer session.pend.Done()
  151. defer close(results)
  152. for {
  153. select {
  154. case <-session.quit:
  155. return
  156. case res, ok := <-sink:
  157. // New match result found
  158. if !ok {
  159. return
  160. }
  161. // Calculate the first and last blocks of the section
  162. sectionStart := res.section * m.sectionSize
  163. first := sectionStart
  164. if begin > first {
  165. first = begin
  166. }
  167. last := sectionStart + m.sectionSize - 1
  168. if end < last {
  169. last = end
  170. }
  171. // Iterate over all the blocks in the section and return the matching ones
  172. for i := first; i <= last; i++ {
  173. // Skip the entire byte if no matches are found inside (and we're processing an entire byte!)
  174. next := res.bitset[(i-sectionStart)/8]
  175. if next == 0 {
  176. if i%8 == 0 {
  177. i += 7
  178. }
  179. continue
  180. }
  181. // Some bit it set, do the actual submatching
  182. if bit := 7 - i%8; next&(1<<bit) != 0 {
  183. select {
  184. case <-session.quit:
  185. return
  186. case results <- i:
  187. }
  188. }
  189. }
  190. }
  191. }
  192. }()
  193. return session, nil
  194. }
  195. // run creates a daisy-chain of sub-matchers, one for the address set and one
  196. // for each topic set, each sub-matcher receiving a section only if the previous
  197. // ones have all found a potential match in one of the blocks of the section,
  198. // then binary AND-ing its own matches and forwaring the result to the next one.
  199. //
  200. // The method starts feeding the section indexes into the first sub-matcher on a
  201. // new goroutine and returns a sink channel receiving the results.
  202. func (m *Matcher) run(begin, end uint64, buffer int, session *MatcherSession) chan *partialMatches {
  203. // Create the source channel and feed section indexes into
  204. source := make(chan *partialMatches, buffer)
  205. session.pend.Add(1)
  206. go func() {
  207. defer session.pend.Done()
  208. defer close(source)
  209. for i := begin / m.sectionSize; i <= end/m.sectionSize; i++ {
  210. select {
  211. case <-session.quit:
  212. return
  213. case source <- &partialMatches{i, bytes.Repeat([]byte{0xff}, int(m.sectionSize/8))}:
  214. }
  215. }
  216. }()
  217. // Assemble the daisy-chained filtering pipeline
  218. next := source
  219. dist := make(chan *request, buffer)
  220. for _, bloom := range m.filters {
  221. next = m.subMatch(next, dist, bloom, session)
  222. }
  223. // Start the request distribution
  224. session.pend.Add(1)
  225. go m.distributor(dist, session)
  226. return next
  227. }
// subMatch creates a sub-matcher that filters for a set of addresses or topics, binary OR-s those matches, then
// binary AND-s the result to the daisy-chain input (source) and forwards it to the daisy-chain output.
// The matches of each address/topic are calculated by fetching the given sections of the three bloom bit indexes belonging to
// that address/topic, and binary AND-ing those vectors together.
//
// Two goroutines are started, both tracked by session.pend:
//   - a multiplexer that reads each section from source, sends its number to
//     every bit scheduler of this sub-matcher, then forwards the section to the
//     processor through the process channel;
//   - a processor that, for each forwarded section, reads one bit vector from
//     every scheduler sink, combines them (AND within a clause, OR across
//     clauses, AND with the incoming bitset) and emits the section on the
//     returned channel only if at least one bit remains set.
//
// NOTE(review): the processor assumes each scheduler sink yields vectors in the
// same order section numbers were sent to it (scheduler.run is not visible
// here) — confirm against scheduler.go.
//
// All channels created here are buffered to cap(source).
func (m *Matcher) subMatch(source chan *partialMatches, dist chan *request, bloom []bloomIndexes, session *MatcherSession) chan *partialMatches {
	// Start the concurrent schedulers for each bit required by the bloom filter
	sectionSources := make([][3]chan uint64, len(bloom))
	sectionSinks := make([][3]chan []byte, len(bloom))
	for i, bits := range bloom {
		for j, bit := range bits {
			sectionSources[i][j] = make(chan uint64, cap(source))
			sectionSinks[i][j] = make(chan []byte, cap(source))
			m.schedulers[bit].run(sectionSources[i][j], dist, sectionSinks[i][j], session.quit, &session.pend)
		}
	}
	process := make(chan *partialMatches, cap(source)) // entries from source are forwarded here after fetches have been initiated
	results := make(chan *partialMatches, cap(source))
	session.pend.Add(2)
	go func() {
		// Tear down the goroutine and terminate all source channels
		defer session.pend.Done()
		defer close(process)
		defer func() {
			// Closing the per-bit source channels tells the schedulers that no
			// more section numbers will arrive from this sub-matcher
			for _, bloomSources := range sectionSources {
				for _, bitSource := range bloomSources {
					close(bitSource)
				}
			}
		}()
		// Read sections from the source channel and multiplex into all bit-schedulers
		for {
			select {
			case <-session.quit:
				return
			case subres, ok := <-source:
				// New subresult from previous link
				if !ok {
					return
				}
				// Multiplex the section index to all bit-schedulers
				for _, bloomSources := range sectionSources {
					for _, bitSource := range bloomSources {
						select {
						case <-session.quit:
							return
						case bitSource <- subres.section:
						}
					}
				}
				// Notify the processor that this section will become available
				select {
				case <-session.quit:
					return
				case process <- subres:
				}
			}
		}
	}()
	go func() {
		// Tear down the goroutine and terminate the final sink channel
		defer session.pend.Done()
		defer close(results)
		// Read the source notifications and collect the delivered results
		for {
			select {
			case <-session.quit:
				return
			case subres, ok := <-process:
				// Notified of a section being retrieved
				if !ok {
					return
				}
				// Gather all the sub-results and merge them together
				var orVector []byte
				for _, bloomSinks := range sectionSinks {
					// AND together the three bit vectors of one clause
					var andVector []byte
					for _, bitSink := range bloomSinks {
						var data []byte
						select {
						case <-session.quit:
							return
						case data = <-bitSink:
						}
						if andVector == nil {
							// Work on a private copy: ANDBytes below writes into its
							// first argument, and data was handed over by the scheduler
							andVector = make([]byte, int(m.sectionSize/8))
							copy(andVector, data)
						} else {
							bitutil.ANDBytes(andVector, andVector, data)
						}
					}
					// OR the clause result into the group result
					if orVector == nil {
						orVector = andVector
					} else {
						bitutil.ORBytes(orVector, orVector, andVector)
					}
				}
				// No clauses at all: start from an all-zero vector
				if orVector == nil {
					orVector = make([]byte, int(m.sectionSize/8))
				}
				// Restrict to the blocks still matching after the previous links
				if subres.bitset != nil {
					bitutil.ANDBytes(orVector, orVector, subres.bitset)
				}
				// Only forward sections that still have at least one candidate block
				if bitutil.TestBytes(orVector) {
					select {
					case <-session.quit:
						return
					case results <- &partialMatches{subres.section, orVector}:
					}
				}
			}
		}
	}()
	return results
}
// distributor receives requests from the schedulers and queues them into a set
// of pending requests, which are assigned to retrievers wanting to fulfil them.
//
// All bookkeeping below is local to this goroutine. Retrievers interact with it
// only through the Matcher's retrievers, counters, retrievals and deliveries
// channels. Once session.quit is closed it keeps running until every bit handed
// out to a retriever has been delivered back (allocs == 0), or until
// session.kill forces it to stop.
func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
	defer session.pend.Done()
	var (
		requests   = make(map[uint][]uint64) // Per-bit list of section requests, ordered by section number
		unallocs   = make(map[uint]struct{}) // Bits with pending requests but not allocated to any retriever
		retrievers chan chan uint            // Waiting retrievers (toggled to nil if unallocs is empty)
	)
	var (
		allocs   int            // Number of active allocations to handle graceful shutdown requests
		shutdown = session.quit // Shutdown request channel, will gracefully wait for pending requests
	)
	// assign is a helper method to try to assign a pending bit to an actively
	// listening servicer, or schedule it up for later when one arrives.
	assign := func(bit uint) {
		select {
		case fetcher := <-m.retrievers:
			// A retriever is already blocked in AllocateRetrieval: hand the bit over now
			allocs++
			fetcher <- bit
		default:
			// No retrievers active, start listening for new ones
			retrievers = m.retrievers
			unallocs[bit] = struct{}{}
		}
	}
	for {
		select {
		case <-shutdown:
			// Graceful shutdown requested, wait until all pending requests are honoured
			if allocs == 0 {
				return
			}
			// Disable this case so the closed quit channel is not selected again
			shutdown = nil
		case <-session.kill:
			// Pending requests not honoured in time, hard terminate
			return
		case req := <-dist:
			// New retrieval request arrived to be distributed to some fetcher process.
			// Insert the section into the bit's queue, keeping it sorted ascending.
			queue := requests[req.bit]
			index := sort.Search(len(queue), func(i int) bool { return queue[i] >= req.section })
			requests[req.bit] = append(queue[:index], append([]uint64{req.section}, queue[index:]...)...)
			// If the bit had nothing queued before, allocate it to a waiting
			// fetcher (or mark it unallocated until one arrives)
			if len(queue) == 0 {
				assign(req.bit)
			}
		case fetcher := <-retrievers:
			// New retriever arrived, find the lowest section-ed bit to assign.
			// NOTE(review): requests[idx][0] relies on every bit in unallocs having
			// a non-empty queue.
			bit, best := uint(0), uint64(math.MaxUint64)
			for idx := range unallocs {
				if requests[idx][0] < best {
					bit, best = idx, requests[idx][0]
				}
			}
			// Stop tracking this bit (and alloc notifications if no more work is available)
			delete(unallocs, bit)
			if len(unallocs) == 0 {
				retrievers = nil
			}
			allocs++
			fetcher <- bit
		case fetcher := <-m.counters:
			// New task count request arrives, return number of items.
			// The fetcher first sends the bit over the same channel, then
			// receives the count back.
			fetcher <- uint(len(requests[<-fetcher]))
		case fetcher := <-m.retrievals:
			// New fetcher waiting for tasks to retrieve, assign.
			// len(task.Sections) is the maximum number of sections wanted.
			task := <-fetcher
			if want := len(task.Sections); want >= len(requests[task.Bit]) {
				// Everything fits: hand over the whole queue
				task.Sections = requests[task.Bit]
				delete(requests, task.Bit)
			} else {
				// Hand over the lowest `want` sections and shift the rest down in place
				task.Sections = append(task.Sections[:0], requests[task.Bit][:want]...)
				requests[task.Bit] = append(requests[task.Bit][:0], requests[task.Bit][want:]...)
			}
			fetcher <- task
			// If anything was left unallocated, try to assign to someone else
			if len(requests[task.Bit]) > 0 {
				assign(task.Bit)
			}
		case result := <-m.deliveries:
			// New retrieval task response from fetcher, split out missing sections and
			// deliver complete ones (an empty bitset marks a missing section)
			var (
				sections = make([]uint64, 0, len(result.Sections))
				bitsets  = make([][]byte, 0, len(result.Bitsets))
				missing  = make([]uint64, 0, len(result.Sections))
			)
			for i, bitset := range result.Bitsets {
				if len(bitset) == 0 {
					missing = append(missing, result.Sections[i])
					continue
				}
				sections = append(sections, result.Sections[i])
				bitsets = append(bitsets, bitset)
			}
			m.schedulers[result.Bit].deliver(sections, bitsets)
			allocs--
			// Reschedule missing sections and allocate bit if newly available
			if len(missing) > 0 {
				queue := requests[result.Bit]
				for _, section := range missing {
					index := sort.Search(len(queue), func(i int) bool { return queue[i] >= section })
					queue = append(queue[:index], append([]uint64{section}, queue[index:]...)...)
				}
				requests[result.Bit] = queue
				// The queue consists solely of the re-queued sections, so the bit
				// was idle before and needs to be (re)assigned
				if len(queue) == len(missing) {
					assign(result.Bit)
				}
			}
			// If we're in the process of shutting down, terminate
			if allocs == 0 && shutdown == nil {
				return
			}
		}
	}
}
  458. // MatcherSession is returned by a started matcher to be used as a terminator
  459. // for the actively running matching operation.
  460. type MatcherSession struct {
  461. matcher *Matcher
  462. closer sync.Once // Sync object to ensure we only ever close once
  463. quit chan struct{} // Quit channel to request pipeline termination
  464. kill chan struct{} // Term channel to signal non-graceful forced shutdown
  465. ctx context.Context // Context used by the light client to abort filtering
  466. err atomic.Value // Global error to track retrieval failures deep in the chain
  467. pend sync.WaitGroup
  468. }
  469. // Close stops the matching process and waits for all subprocesses to terminate
  470. // before returning. The timeout may be used for graceful shutdown, allowing the
  471. // currently running retrievals to complete before this time.
  472. func (s *MatcherSession) Close() {
  473. s.closer.Do(func() {
  474. // Signal termination and wait for all goroutines to tear down
  475. close(s.quit)
  476. time.AfterFunc(time.Second, func() { close(s.kill) })
  477. s.pend.Wait()
  478. })
  479. }
  480. // Error returns any failure encountered during the matching session.
  481. func (s *MatcherSession) Error() error {
  482. if err := s.err.Load(); err != nil {
  483. return err.(error)
  484. }
  485. return nil
  486. }
  487. // AllocateRetrieval assigns a bloom bit index to a client process that can either
  488. // immediately reuest and fetch the section contents assigned to this bit or wait
  489. // a little while for more sections to be requested.
  490. func (s *MatcherSession) AllocateRetrieval() (uint, bool) {
  491. fetcher := make(chan uint)
  492. select {
  493. case <-s.quit:
  494. return 0, false
  495. case s.matcher.retrievers <- fetcher:
  496. bit, ok := <-fetcher
  497. return bit, ok
  498. }
  499. }
  500. // PendingSections returns the number of pending section retrievals belonging to
  501. // the given bloom bit index.
  502. func (s *MatcherSession) PendingSections(bit uint) int {
  503. fetcher := make(chan uint)
  504. select {
  505. case <-s.quit:
  506. return 0
  507. case s.matcher.counters <- fetcher:
  508. fetcher <- bit
  509. return int(<-fetcher)
  510. }
  511. }
  512. // AllocateSections assigns all or part of an already allocated bit-task queue
  513. // to the requesting process.
  514. func (s *MatcherSession) AllocateSections(bit uint, count int) []uint64 {
  515. fetcher := make(chan *Retrieval)
  516. select {
  517. case <-s.quit:
  518. return nil
  519. case s.matcher.retrievals <- fetcher:
  520. task := &Retrieval{
  521. Bit: bit,
  522. Sections: make([]uint64, count),
  523. }
  524. fetcher <- task
  525. return (<-fetcher).Sections
  526. }
  527. }
  528. // DeliverSections delivers a batch of section bit-vectors for a specific bloom
  529. // bit index to be injected into the processing pipeline.
  530. func (s *MatcherSession) DeliverSections(bit uint, sections []uint64, bitsets [][]byte) {
  531. select {
  532. case <-s.kill:
  533. return
  534. case s.matcher.deliveries <- &Retrieval{Bit: bit, Sections: sections, Bitsets: bitsets}:
  535. }
  536. }
  537. // Multiplex polls the matcher session for rerieval tasks and multiplexes it into
  538. // the reuested retrieval queue to be serviced together with other sessions.
  539. //
  540. // This method will block for the lifetime of the session. Even after termination
  541. // of the session, any request in-flight need to be responded to! Empty responses
  542. // are fine though in that case.
  543. func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) {
  544. for {
  545. // Allocate a new bloom bit index to retrieve data for, stopping when done
  546. bit, ok := s.AllocateRetrieval()
  547. if !ok {
  548. return
  549. }
  550. // Bit allocated, throttle a bit if we're below our batch limit
  551. if s.PendingSections(bit) < batch {
  552. select {
  553. case <-s.quit:
  554. // Session terminating, we can't meaningfully service, abort
  555. s.AllocateSections(bit, 0)
  556. s.DeliverSections(bit, []uint64{}, [][]byte{})
  557. return
  558. case <-time.After(wait):
  559. // Throttling up, fetch whatever's available
  560. }
  561. }
  562. // Allocate as much as we can handle and request servicing
  563. sections := s.AllocateSections(bit, batch)
  564. request := make(chan *Retrieval)
  565. select {
  566. case <-s.quit:
  567. // Session terminating, we can't meaningfully service, abort
  568. s.DeliverSections(bit, sections, make([][]byte, len(sections)))
  569. return
  570. case mux <- request:
  571. // Retrieval accepted, something must arrive before we're aborting
  572. request <- &Retrieval{Bit: bit, Sections: sections, Context: s.ctx}
  573. result := <-request
  574. if result.Error != nil {
  575. s.err.Store(result.Error)
  576. s.Close()
  577. }
  578. s.DeliverSections(result.Bit, result.Sections, result.Bitsets)
  579. }
  580. }
  581. }