filter_system.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// Package filters implements an Ethereum filtering system for block,
// transaction and log events.
  18. package filters
  19. import (
  20. "context"
  21. "errors"
  22. "fmt"
  23. "sync"
  24. "time"
  25. ethereum "github.com/ethereum/go-ethereum"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/core"
  28. "github.com/ethereum/go-ethereum/core/rawdb"
  29. "github.com/ethereum/go-ethereum/core/types"
  30. "github.com/ethereum/go-ethereum/event"
  31. "github.com/ethereum/go-ethereum/log"
  32. "github.com/ethereum/go-ethereum/rpc"
  33. )
  34. // Type determines the kind of filter and is used to put the filter in to
  35. // the correct bucket when added.
  36. type Type byte
  37. const (
  38. // UnknownSubscription indicates an unknown subscription type
  39. UnknownSubscription Type = iota
  40. // LogsSubscription queries for new or removed (chain reorg) logs
  41. LogsSubscription
  42. // PendingLogsSubscription queries for logs in pending blocks
  43. PendingLogsSubscription
  44. // MinedAndPendingLogsSubscription queries for logs in mined and pending blocks.
  45. MinedAndPendingLogsSubscription
  46. // PendingTransactionsSubscription queries tx hashes for pending
  47. // transactions entering the pending state
  48. PendingTransactionsSubscription
  49. // BlocksSubscription queries hashes for blocks that are imported
  50. BlocksSubscription
  51. // LastSubscription keeps track of the last index
  52. LastIndexSubscription
  53. )
  54. const (
  55. // txChanSize is the size of channel listening to NewTxsEvent.
  56. // The number is referenced from the size of tx pool.
  57. txChanSize = 4096
  58. // rmLogsChanSize is the size of channel listening to RemovedLogsEvent.
  59. rmLogsChanSize = 10
  60. // logsChanSize is the size of channel listening to LogsEvent.
  61. logsChanSize = 10
  62. // chainEvChanSize is the size of channel listening to ChainEvent.
  63. chainEvChanSize = 10
  64. )
  65. var (
  66. ErrInvalidSubscriptionID = errors.New("invalid id")
  67. )
  68. type subscription struct {
  69. id rpc.ID
  70. typ Type
  71. created time.Time
  72. logsCrit ethereum.FilterQuery
  73. logs chan []*types.Log
  74. hashes chan []common.Hash
  75. headers chan *types.Header
  76. installed chan struct{} // closed when the filter is installed
  77. err chan error // closed when the filter is uninstalled
  78. }
  79. // EventSystem creates subscriptions, processes events and broadcasts them to the
  80. // subscription which match the subscription criteria.
  81. type EventSystem struct {
  82. mux *event.TypeMux
  83. backend Backend
  84. lightMode bool
  85. lastHead *types.Header
  86. // Subscriptions
  87. txsSub event.Subscription // Subscription for new transaction event
  88. logsSub event.Subscription // Subscription for new log event
  89. rmLogsSub event.Subscription // Subscription for removed log event
  90. chainSub event.Subscription // Subscription for new chain event
  91. pendingLogSub *event.TypeMuxSubscription // Subscription for pending log event
  92. // Channels
  93. install chan *subscription // install filter for event notification
  94. uninstall chan *subscription // remove filter for event notification
  95. txsCh chan core.NewTxsEvent // Channel to receive new transactions event
  96. logsCh chan []*types.Log // Channel to receive new log event
  97. rmLogsCh chan core.RemovedLogsEvent // Channel to receive removed log event
  98. chainCh chan core.ChainEvent // Channel to receive new chain event
  99. }
  100. // NewEventSystem creates a new manager that listens for event on the given mux,
  101. // parses and filters them. It uses the all map to retrieve filter changes. The
  102. // work loop holds its own index that is used to forward events to filters.
  103. //
  104. // The returned manager has a loop that needs to be stopped with the Stop function
  105. // or by stopping the given mux.
  106. func NewEventSystem(mux *event.TypeMux, backend Backend, lightMode bool) *EventSystem {
  107. m := &EventSystem{
  108. mux: mux,
  109. backend: backend,
  110. lightMode: lightMode,
  111. install: make(chan *subscription),
  112. uninstall: make(chan *subscription),
  113. txsCh: make(chan core.NewTxsEvent, txChanSize),
  114. logsCh: make(chan []*types.Log, logsChanSize),
  115. rmLogsCh: make(chan core.RemovedLogsEvent, rmLogsChanSize),
  116. chainCh: make(chan core.ChainEvent, chainEvChanSize),
  117. }
  118. // Subscribe events
  119. m.txsSub = m.backend.SubscribeNewTxsEvent(m.txsCh)
  120. m.logsSub = m.backend.SubscribeLogsEvent(m.logsCh)
  121. m.rmLogsSub = m.backend.SubscribeRemovedLogsEvent(m.rmLogsCh)
  122. m.chainSub = m.backend.SubscribeChainEvent(m.chainCh)
  123. // TODO(rjl493456442): use feed to subscribe pending log event
  124. m.pendingLogSub = m.mux.Subscribe(core.PendingLogsEvent{})
  125. // Make sure none of the subscriptions are empty
  126. if m.txsSub == nil || m.logsSub == nil || m.rmLogsSub == nil || m.chainSub == nil ||
  127. m.pendingLogSub.Closed() {
  128. log.Crit("Subscribe for event system failed")
  129. }
  130. go m.eventLoop()
  131. return m
  132. }
  133. // Subscription is created when the client registers itself for a particular event.
  134. type Subscription struct {
  135. ID rpc.ID
  136. f *subscription
  137. es *EventSystem
  138. unsubOnce sync.Once
  139. }
  140. // Err returns a channel that is closed when unsubscribed.
  141. func (sub *Subscription) Err() <-chan error {
  142. return sub.f.err
  143. }
  144. // Unsubscribe uninstalls the subscription from the event broadcast loop.
  145. func (sub *Subscription) Unsubscribe() {
  146. sub.unsubOnce.Do(func() {
  147. uninstallLoop:
  148. for {
  149. // write uninstall request and consume logs/hashes. This prevents
  150. // the eventLoop broadcast method to deadlock when writing to the
  151. // filter event channel while the subscription loop is waiting for
  152. // this method to return (and thus not reading these events).
  153. select {
  154. case sub.es.uninstall <- sub.f:
  155. break uninstallLoop
  156. case <-sub.f.logs:
  157. case <-sub.f.hashes:
  158. case <-sub.f.headers:
  159. }
  160. }
  161. // wait for filter to be uninstalled in work loop before returning
  162. // this ensures that the manager won't use the event channel which
  163. // will probably be closed by the client asap after this method returns.
  164. <-sub.Err()
  165. })
  166. }
  167. // subscribe installs the subscription in the event broadcast loop.
  168. func (es *EventSystem) subscribe(sub *subscription) *Subscription {
  169. es.install <- sub
  170. <-sub.installed
  171. return &Subscription{ID: sub.id, f: sub, es: es}
  172. }
  173. // SubscribeLogs creates a subscription that will write all logs matching the
  174. // given criteria to the given logs channel. Default value for the from and to
  175. // block is "latest". If the fromBlock > toBlock an error is returned.
  176. func (es *EventSystem) SubscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) (*Subscription, error) {
  177. var from, to rpc.BlockNumber
  178. if crit.FromBlock == nil {
  179. from = rpc.LatestBlockNumber
  180. } else {
  181. from = rpc.BlockNumber(crit.FromBlock.Int64())
  182. }
  183. if crit.ToBlock == nil {
  184. to = rpc.LatestBlockNumber
  185. } else {
  186. to = rpc.BlockNumber(crit.ToBlock.Int64())
  187. }
  188. // only interested in pending logs
  189. if from == rpc.PendingBlockNumber && to == rpc.PendingBlockNumber {
  190. return es.subscribePendingLogs(crit, logs), nil
  191. }
  192. // only interested in new mined logs
  193. if from == rpc.LatestBlockNumber && to == rpc.LatestBlockNumber {
  194. return es.subscribeLogs(crit, logs), nil
  195. }
  196. // only interested in mined logs within a specific block range
  197. if from >= 0 && to >= 0 && to >= from {
  198. return es.subscribeLogs(crit, logs), nil
  199. }
  200. // interested in mined logs from a specific block number, new logs and pending logs
  201. if from >= rpc.LatestBlockNumber && to == rpc.PendingBlockNumber {
  202. return es.subscribeMinedPendingLogs(crit, logs), nil
  203. }
  204. // interested in logs from a specific block number to new mined blocks
  205. if from >= 0 && to == rpc.LatestBlockNumber {
  206. return es.subscribeLogs(crit, logs), nil
  207. }
  208. return nil, fmt.Errorf("invalid from and to block combination: from > to")
  209. }
  210. // subscribeMinedPendingLogs creates a subscription that returned mined and
  211. // pending logs that match the given criteria.
  212. func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
  213. sub := &subscription{
  214. id: rpc.NewID(),
  215. typ: MinedAndPendingLogsSubscription,
  216. logsCrit: crit,
  217. created: time.Now(),
  218. logs: logs,
  219. hashes: make(chan []common.Hash),
  220. headers: make(chan *types.Header),
  221. installed: make(chan struct{}),
  222. err: make(chan error),
  223. }
  224. return es.subscribe(sub)
  225. }
  226. // subscribeLogs creates a subscription that will write all logs matching the
  227. // given criteria to the given logs channel.
  228. func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
  229. sub := &subscription{
  230. id: rpc.NewID(),
  231. typ: LogsSubscription,
  232. logsCrit: crit,
  233. created: time.Now(),
  234. logs: logs,
  235. hashes: make(chan []common.Hash),
  236. headers: make(chan *types.Header),
  237. installed: make(chan struct{}),
  238. err: make(chan error),
  239. }
  240. return es.subscribe(sub)
  241. }
  242. // subscribePendingLogs creates a subscription that writes transaction hashes for
  243. // transactions that enter the transaction pool.
  244. func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan []*types.Log) *Subscription {
  245. sub := &subscription{
  246. id: rpc.NewID(),
  247. typ: PendingLogsSubscription,
  248. logsCrit: crit,
  249. created: time.Now(),
  250. logs: logs,
  251. hashes: make(chan []common.Hash),
  252. headers: make(chan *types.Header),
  253. installed: make(chan struct{}),
  254. err: make(chan error),
  255. }
  256. return es.subscribe(sub)
  257. }
  258. // SubscribeNewHeads creates a subscription that writes the header of a block that is
  259. // imported in the chain.
  260. func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscription {
  261. sub := &subscription{
  262. id: rpc.NewID(),
  263. typ: BlocksSubscription,
  264. created: time.Now(),
  265. logs: make(chan []*types.Log),
  266. hashes: make(chan []common.Hash),
  267. headers: headers,
  268. installed: make(chan struct{}),
  269. err: make(chan error),
  270. }
  271. return es.subscribe(sub)
  272. }
  273. // SubscribePendingTxs creates a subscription that writes transaction hashes for
  274. // transactions that enter the transaction pool.
  275. func (es *EventSystem) SubscribePendingTxs(hashes chan []common.Hash) *Subscription {
  276. sub := &subscription{
  277. id: rpc.NewID(),
  278. typ: PendingTransactionsSubscription,
  279. created: time.Now(),
  280. logs: make(chan []*types.Log),
  281. hashes: hashes,
  282. headers: make(chan *types.Header),
  283. installed: make(chan struct{}),
  284. err: make(chan error),
  285. }
  286. return es.subscribe(sub)
  287. }
  288. type filterIndex map[Type]map[rpc.ID]*subscription
  289. // broadcast event to filters that match criteria.
  290. func (es *EventSystem) broadcast(filters filterIndex, ev interface{}) {
  291. if ev == nil {
  292. return
  293. }
  294. switch e := ev.(type) {
  295. case []*types.Log:
  296. if len(e) > 0 {
  297. for _, f := range filters[LogsSubscription] {
  298. if matchedLogs := filterLogs(e, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
  299. f.logs <- matchedLogs
  300. }
  301. }
  302. }
  303. case core.RemovedLogsEvent:
  304. for _, f := range filters[LogsSubscription] {
  305. if matchedLogs := filterLogs(e.Logs, f.logsCrit.FromBlock, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
  306. f.logs <- matchedLogs
  307. }
  308. }
  309. case *event.TypeMuxEvent:
  310. switch muxe := e.Data.(type) {
  311. case core.PendingLogsEvent:
  312. for _, f := range filters[PendingLogsSubscription] {
  313. if e.Time.After(f.created) {
  314. if matchedLogs := filterLogs(muxe.Logs, nil, f.logsCrit.ToBlock, f.logsCrit.Addresses, f.logsCrit.Topics); len(matchedLogs) > 0 {
  315. f.logs <- matchedLogs
  316. }
  317. }
  318. }
  319. }
  320. case core.NewTxsEvent:
  321. hashes := make([]common.Hash, 0, len(e.Txs))
  322. for _, tx := range e.Txs {
  323. hashes = append(hashes, tx.Hash())
  324. }
  325. for _, f := range filters[PendingTransactionsSubscription] {
  326. f.hashes <- hashes
  327. }
  328. case core.ChainEvent:
  329. for _, f := range filters[BlocksSubscription] {
  330. f.headers <- e.Block.Header()
  331. }
  332. if es.lightMode && len(filters[LogsSubscription]) > 0 {
  333. es.lightFilterNewHead(e.Block.Header(), func(header *types.Header, remove bool) {
  334. for _, f := range filters[LogsSubscription] {
  335. if matchedLogs := es.lightFilterLogs(header, f.logsCrit.Addresses, f.logsCrit.Topics, remove); len(matchedLogs) > 0 {
  336. f.logs <- matchedLogs
  337. }
  338. }
  339. })
  340. }
  341. }
  342. }
  343. func (es *EventSystem) lightFilterNewHead(newHeader *types.Header, callBack func(*types.Header, bool)) {
  344. oldh := es.lastHead
  345. es.lastHead = newHeader
  346. if oldh == nil {
  347. return
  348. }
  349. newh := newHeader
  350. // find common ancestor, create list of rolled back and new block hashes
  351. var oldHeaders, newHeaders []*types.Header
  352. for oldh.Hash() != newh.Hash() {
  353. if oldh.Number.Uint64() >= newh.Number.Uint64() {
  354. oldHeaders = append(oldHeaders, oldh)
  355. oldh = rawdb.ReadHeader(es.backend.ChainDb(), oldh.ParentHash, oldh.Number.Uint64()-1)
  356. }
  357. if oldh.Number.Uint64() < newh.Number.Uint64() {
  358. newHeaders = append(newHeaders, newh)
  359. newh = rawdb.ReadHeader(es.backend.ChainDb(), newh.ParentHash, newh.Number.Uint64()-1)
  360. if newh == nil {
  361. // happens when CHT syncing, nothing to do
  362. newh = oldh
  363. }
  364. }
  365. }
  366. // roll back old blocks
  367. for _, h := range oldHeaders {
  368. callBack(h, true)
  369. }
  370. // check new blocks (array is in reverse order)
  371. for i := len(newHeaders) - 1; i >= 0; i-- {
  372. callBack(newHeaders[i], false)
  373. }
  374. }
  375. // filter logs of a single header in light client mode
  376. func (es *EventSystem) lightFilterLogs(header *types.Header, addresses []common.Address, topics [][]common.Hash, remove bool) []*types.Log {
  377. if bloomFilter(header.Bloom, addresses, topics) {
  378. // Get the logs of the block
  379. ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
  380. defer cancel()
  381. logsList, err := es.backend.GetLogs(ctx, header.Hash())
  382. if err != nil {
  383. return nil
  384. }
  385. var unfiltered []*types.Log
  386. for _, logs := range logsList {
  387. for _, log := range logs {
  388. logcopy := *log
  389. logcopy.Removed = remove
  390. unfiltered = append(unfiltered, &logcopy)
  391. }
  392. }
  393. logs := filterLogs(unfiltered, nil, nil, addresses, topics)
  394. if len(logs) > 0 && logs[0].TxHash == (common.Hash{}) {
  395. // We have matching but non-derived logs
  396. receipts, err := es.backend.GetReceipts(ctx, header.Hash())
  397. if err != nil {
  398. return nil
  399. }
  400. unfiltered = unfiltered[:0]
  401. for _, receipt := range receipts {
  402. for _, log := range receipt.Logs {
  403. logcopy := *log
  404. logcopy.Removed = remove
  405. unfiltered = append(unfiltered, &logcopy)
  406. }
  407. }
  408. logs = filterLogs(unfiltered, nil, nil, addresses, topics)
  409. }
  410. return logs
  411. }
  412. return nil
  413. }
  414. // eventLoop (un)installs filters and processes mux events.
  415. func (es *EventSystem) eventLoop() {
  416. // Ensure all subscriptions get cleaned up
  417. defer func() {
  418. es.pendingLogSub.Unsubscribe()
  419. es.txsSub.Unsubscribe()
  420. es.logsSub.Unsubscribe()
  421. es.rmLogsSub.Unsubscribe()
  422. es.chainSub.Unsubscribe()
  423. }()
  424. index := make(filterIndex)
  425. for i := UnknownSubscription; i < LastIndexSubscription; i++ {
  426. index[i] = make(map[rpc.ID]*subscription)
  427. }
  428. for {
  429. select {
  430. // Handle subscribed events
  431. case ev := <-es.txsCh:
  432. es.broadcast(index, ev)
  433. case ev := <-es.logsCh:
  434. es.broadcast(index, ev)
  435. case ev := <-es.rmLogsCh:
  436. es.broadcast(index, ev)
  437. case ev := <-es.chainCh:
  438. es.broadcast(index, ev)
  439. case ev, active := <-es.pendingLogSub.Chan():
  440. if !active { // system stopped
  441. return
  442. }
  443. es.broadcast(index, ev)
  444. case f := <-es.install:
  445. if f.typ == MinedAndPendingLogsSubscription {
  446. // the type are logs and pending logs subscriptions
  447. index[LogsSubscription][f.id] = f
  448. index[PendingLogsSubscription][f.id] = f
  449. } else {
  450. index[f.typ][f.id] = f
  451. }
  452. close(f.installed)
  453. case f := <-es.uninstall:
  454. if f.typ == MinedAndPendingLogsSubscription {
  455. // the type are logs and pending logs subscriptions
  456. delete(index[LogsSubscription], f.id)
  457. delete(index[PendingLogsSubscription], f.id)
  458. } else {
  459. delete(index[f.typ], f.id)
  460. }
  461. close(f.err)
  462. // System stopped
  463. case <-es.txsSub.Err():
  464. return
  465. case <-es.logsSub.Err():
  466. return
  467. case <-es.rmLogsSub.Err():
  468. return
  469. case <-es.chainSub.Err():
  470. return
  471. }
  472. }
  473. }