filter.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. // Copyright 2014 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package filters
  17. import (
  18. "context"
  19. "math/big"
  20. "github.com/ethereum/go-ethereum/common"
  21. "github.com/ethereum/go-ethereum/core"
  22. "github.com/ethereum/go-ethereum/core/bloombits"
  23. "github.com/ethereum/go-ethereum/core/types"
  24. "github.com/ethereum/go-ethereum/ethdb"
  25. "github.com/ethereum/go-ethereum/event"
  26. "github.com/ethereum/go-ethereum/rpc"
  27. )
  28. type Backend interface {
  29. ChainDb() ethdb.Database
  30. EventMux() *event.TypeMux
  31. HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
  32. GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
  33. GetLogs(ctx context.Context, blockHash common.Hash) ([][]*types.Log, error)
  34. SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
  35. SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
  36. SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription
  37. SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription
  38. BloomStatus() (uint64, uint64)
  39. ServiceFilter(ctx context.Context, session *bloombits.MatcherSession)
  40. }
  41. // Filter can be used to retrieve and filter logs.
  42. type Filter struct {
  43. backend Backend
  44. db ethdb.Database
  45. begin, end int64
  46. addresses []common.Address
  47. topics [][]common.Hash
  48. matcher *bloombits.Matcher
  49. }
  50. // New creates a new filter which uses a bloom filter on blocks to figure out whether
  51. // a particular block is interesting or not.
  52. func New(backend Backend, begin, end int64, addresses []common.Address, topics [][]common.Hash) *Filter {
  53. // Flatten the address and topic filter clauses into a single bloombits filter
  54. // system. Since the bloombits are not positional, nil topics are permitted,
  55. // which get flattened into a nil byte slice.
  56. var filters [][][]byte
  57. if len(addresses) > 0 {
  58. filter := make([][]byte, len(addresses))
  59. for i, address := range addresses {
  60. filter[i] = address.Bytes()
  61. }
  62. filters = append(filters, filter)
  63. }
  64. for _, topicList := range topics {
  65. filter := make([][]byte, len(topicList))
  66. for i, topic := range topicList {
  67. filter[i] = topic.Bytes()
  68. }
  69. filters = append(filters, filter)
  70. }
  71. // Assemble and return the filter
  72. size, _ := backend.BloomStatus()
  73. return &Filter{
  74. backend: backend,
  75. begin: begin,
  76. end: end,
  77. addresses: addresses,
  78. topics: topics,
  79. db: backend.ChainDb(),
  80. matcher: bloombits.NewMatcher(size, filters),
  81. }
  82. }
  83. // Logs searches the blockchain for matching log entries, returning all from the
  84. // first block that contains matches, updating the start of the filter accordingly.
  85. func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) {
  86. // Figure out the limits of the filter range
  87. header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber)
  88. if header == nil {
  89. return nil, nil
  90. }
  91. head := header.Number.Uint64()
  92. if f.begin == -1 {
  93. f.begin = int64(head)
  94. }
  95. end := uint64(f.end)
  96. if f.end == -1 {
  97. end = head
  98. }
  99. // Gather all indexed logs, and finish with non indexed ones
  100. var (
  101. logs []*types.Log
  102. err error
  103. )
  104. size, sections := f.backend.BloomStatus()
  105. if indexed := sections * size; indexed > uint64(f.begin) {
  106. if indexed > end {
  107. logs, err = f.indexedLogs(ctx, end)
  108. } else {
  109. logs, err = f.indexedLogs(ctx, indexed-1)
  110. }
  111. if err != nil {
  112. return logs, err
  113. }
  114. }
  115. rest, err := f.unindexedLogs(ctx, end)
  116. logs = append(logs, rest...)
  117. return logs, err
  118. }
  119. // indexedLogs returns the logs matching the filter criteria based on the bloom
  120. // bits indexed available locally or via the network.
  121. func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
  122. // Create a matcher session and request servicing from the backend
  123. matches := make(chan uint64, 64)
  124. session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches)
  125. if err != nil {
  126. return nil, err
  127. }
  128. defer session.Close()
  129. f.backend.ServiceFilter(ctx, session)
  130. // Iterate over the matches until exhausted or context closed
  131. var logs []*types.Log
  132. for {
  133. select {
  134. case number, ok := <-matches:
  135. // Abort if all matches have been fulfilled
  136. if !ok {
  137. err := session.Error()
  138. if err == nil {
  139. f.begin = int64(end) + 1
  140. }
  141. return logs, err
  142. }
  143. f.begin = int64(number) + 1
  144. // Retrieve the suggested block and pull any truly matching logs
  145. header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number))
  146. if header == nil || err != nil {
  147. return logs, err
  148. }
  149. found, err := f.checkMatches(ctx, header)
  150. if err != nil {
  151. return logs, err
  152. }
  153. logs = append(logs, found...)
  154. case <-ctx.Done():
  155. return logs, ctx.Err()
  156. }
  157. }
  158. }
  159. // indexedLogs returns the logs matching the filter criteria based on raw block
  160. // iteration and bloom matching.
  161. func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) {
  162. var logs []*types.Log
  163. for ; f.begin <= int64(end); f.begin++ {
  164. header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin))
  165. if header == nil || err != nil {
  166. return logs, err
  167. }
  168. if bloomFilter(header.Bloom, f.addresses, f.topics) {
  169. found, err := f.checkMatches(ctx, header)
  170. if err != nil {
  171. return logs, err
  172. }
  173. logs = append(logs, found...)
  174. }
  175. }
  176. return logs, nil
  177. }
  178. // checkMatches checks if the receipts belonging to the given header contain any log events that
  179. // match the filter criteria. This function is called when the bloom filter signals a potential match.
  180. func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) {
  181. // Get the logs of the block
  182. logsList, err := f.backend.GetLogs(ctx, header.Hash())
  183. if err != nil {
  184. return nil, err
  185. }
  186. var unfiltered []*types.Log
  187. for _, logs := range logsList {
  188. unfiltered = append(unfiltered, logs...)
  189. }
  190. logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
  191. if len(logs) > 0 {
  192. // We have matching logs, check if we need to resolve full logs via the light client
  193. if logs[0].TxHash == (common.Hash{}) {
  194. receipts, err := f.backend.GetReceipts(ctx, header.Hash())
  195. if err != nil {
  196. return nil, err
  197. }
  198. unfiltered = unfiltered[:0]
  199. for _, receipt := range receipts {
  200. unfiltered = append(unfiltered, receipt.Logs...)
  201. }
  202. logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics)
  203. }
  204. return logs, nil
  205. }
  206. return nil, nil
  207. }
  208. func includes(addresses []common.Address, a common.Address) bool {
  209. for _, addr := range addresses {
  210. if addr == a {
  211. return true
  212. }
  213. }
  214. return false
  215. }
  216. // filterLogs creates a slice of logs matching the given criteria.
  217. func filterLogs(logs []*types.Log, fromBlock, toBlock *big.Int, addresses []common.Address, topics [][]common.Hash) []*types.Log {
  218. var ret []*types.Log
  219. Logs:
  220. for _, log := range logs {
  221. if fromBlock != nil && fromBlock.Int64() >= 0 && fromBlock.Uint64() > log.BlockNumber {
  222. continue
  223. }
  224. if toBlock != nil && toBlock.Int64() >= 0 && toBlock.Uint64() < log.BlockNumber {
  225. continue
  226. }
  227. if len(addresses) > 0 && !includes(addresses, log.Address) {
  228. continue
  229. }
  230. // If the to filtered topics is greater than the amount of topics in logs, skip.
  231. if len(topics) > len(log.Topics) {
  232. continue Logs
  233. }
  234. for i, topics := range topics {
  235. match := len(topics) == 0 // empty rule set == wildcard
  236. for _, topic := range topics {
  237. if log.Topics[i] == topic {
  238. match = true
  239. break
  240. }
  241. }
  242. if !match {
  243. continue Logs
  244. }
  245. }
  246. ret = append(ret, log)
  247. }
  248. return ret
  249. }
  250. func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]common.Hash) bool {
  251. if len(addresses) > 0 {
  252. var included bool
  253. for _, addr := range addresses {
  254. if types.BloomLookup(bloom, addr) {
  255. included = true
  256. break
  257. }
  258. }
  259. if !included {
  260. return false
  261. }
  262. }
  263. for _, sub := range topics {
  264. included := len(sub) == 0 // empty rule set == wildcard
  265. for _, topic := range sub {
  266. if types.BloomLookup(bloom, topic) {
  267. included = true
  268. break
  269. }
  270. }
  271. if !included {
  272. return false
  273. }
  274. }
  275. return true
  276. }