filter_system_test.go

// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package filters

import (
	"context"
	"fmt"
	"math/big"
	"math/rand"
	"reflect"
	"testing"
	"time"

	ethereum "github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/consensus/ethash"
	"github.com/ethereum/go-ethereum/core"
	"github.com/ethereum/go-ethereum/core/bloombits"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/event"
	"github.com/ethereum/go-ethereum/params"
	"github.com/ethereum/go-ethereum/rpc"
)

// testBackend provides an in-memory backend for the filter API under test,
// wiring each event type to its own event feed.
type testBackend struct {
	mux        *event.TypeMux
	db         ethdb.Database
	sections   uint64
	txFeed     *event.Feed
	rmLogsFeed *event.Feed
	logsFeed   *event.Feed
	chainFeed  *event.Feed
}

func (b *testBackend) ChainDb() ethdb.Database {
	return b.db
}

func (b *testBackend) EventMux() *event.TypeMux {
	return b.mux
}

func (b *testBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) {
	var (
		hash common.Hash
		num  uint64
	)
	if blockNr == rpc.LatestBlockNumber {
		hash = rawdb.ReadHeadBlockHash(b.db)
		number := rawdb.ReadHeaderNumber(b.db, hash)
		if number == nil {
			return nil, nil
		}
		num = *number
	} else {
		num = uint64(blockNr)
		hash = rawdb.ReadCanonicalHash(b.db, num)
	}
	return rawdb.ReadHeader(b.db, hash, num), nil
}

func (b *testBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) {
	if number := rawdb.ReadHeaderNumber(b.db, hash); number != nil {
		return rawdb.ReadReceipts(b.db, hash, *number), nil
	}
	return nil, nil
}

func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
	number := rawdb.ReadHeaderNumber(b.db, hash)
	if number == nil {
		return nil, nil
	}
	receipts := rawdb.ReadReceipts(b.db, hash, *number)

	logs := make([][]*types.Log, len(receipts))
	for i, receipt := range receipts {
		logs[i] = receipt.Logs
	}
	return logs, nil
}

func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
	return b.txFeed.Subscribe(ch)
}

func (b *testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
	return b.rmLogsFeed.Subscribe(ch)
}

func (b *testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
	return b.logsFeed.Subscribe(ch)
}

func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
	return b.chainFeed.Subscribe(ch)
}

func (b *testBackend) BloomStatus() (uint64, uint64) {
	return params.BloomBitsBlocks, b.sections
}

func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
	requests := make(chan chan *bloombits.Retrieval)

	go session.Multiplex(16, 0, requests)
	go func() {
		for {
			// Wait for a service request or a shutdown
			select {
			case <-ctx.Done():
				return

			case request := <-requests:
				task := <-request

				task.Bitsets = make([][]byte, len(task.Sections))
				for i, section := range task.Sections {
					if rand.Int()%4 != 0 { // Handle occasional missing deliveries
						head := rawdb.ReadCanonicalHash(b.db, (section+1)*params.BloomBitsBlocks-1)
						task.Bitsets[i], _ = rawdb.ReadBloomBits(b.db, task.Bit, section, head)
					}
				}
				request <- task
			}
		}
	}()
}
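
// newTestBackend is an illustrative convenience sketch, not referenced by the tests
// below: it captures how each test wires up its fixtures (an in-memory database plus
// one feed per event type). The function name and signature are suggestions only and
// are not part of the original suite.
func newTestBackend(sections uint64) *testBackend {
	return &testBackend{
		mux:        new(event.TypeMux),
		db:         ethdb.NewMemDatabase(),
		sections:   sections,
		txFeed:     new(event.Feed),
		rmLogsFeed: new(event.Feed),
		logsFeed:   new(event.Feed),
		chainFeed:  new(event.Feed),
	}
}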

// TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.
// It creates multiple subscriptions:
// - one at the start that should receive all posted chain events, and a second one (blockHashes)
// - one that is created after a cutoff moment and uninstalled after a second cutoff moment (blockHashes[cutoff1:cutoff2])
// - one that is created after the second cutoff moment (blockHashes[cutoff2:])
func TestBlockSubscription(t *testing.T) {
	t.Parallel()

	var (
		mux         = new(event.TypeMux)
		db          = ethdb.NewMemDatabase()
		txFeed      = new(event.Feed)
		rmLogsFeed  = new(event.Feed)
		logsFeed    = new(event.Feed)
		chainFeed   = new(event.Feed)
		backend     = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
		api         = NewPublicFilterAPI(backend, false)
		genesis     = new(core.Genesis).MustCommit(db)
		chain, _    = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {})
		chainEvents = []core.ChainEvent{}
	)

	for _, blk := range chain {
		chainEvents = append(chainEvents, core.ChainEvent{Hash: blk.Hash(), Block: blk})
	}

	chan0 := make(chan *types.Header)
	sub0 := api.events.SubscribeNewHeads(chan0)
	chan1 := make(chan *types.Header)
	sub1 := api.events.SubscribeNewHeads(chan1)

	go func() { // simulate client
		i1, i2 := 0, 0
		for i1 != len(chainEvents) || i2 != len(chainEvents) {
			select {
			case header := <-chan0:
				if chainEvents[i1].Hash != header.Hash() {
					t.Errorf("sub0 received invalid hash on index %d, want %x, got %x", i1, chainEvents[i1].Hash, header.Hash())
				}
				i1++
			case header := <-chan1:
				if chainEvents[i2].Hash != header.Hash() {
					t.Errorf("sub1 received invalid hash on index %d, want %x, got %x", i2, chainEvents[i2].Hash, header.Hash())
				}
				i2++
			}
		}

		sub0.Unsubscribe()
		sub1.Unsubscribe()
	}()

	time.Sleep(1 * time.Second)
	for _, e := range chainEvents {
		chainFeed.Send(e)
	}

	<-sub0.Err()
	<-sub1.Err()
}

// TestPendingTxFilter tests whether pending tx filters retrieve all pending transactions that are posted to the transaction feed.
func TestPendingTxFilter(t *testing.T) {
	t.Parallel()

	var (
		mux        = new(event.TypeMux)
		db         = ethdb.NewMemDatabase()
		txFeed     = new(event.Feed)
		rmLogsFeed = new(event.Feed)
		logsFeed   = new(event.Feed)
		chainFeed  = new(event.Feed)
		backend    = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
		api        = NewPublicFilterAPI(backend, false)

		transactions = []*types.Transaction{
			types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
			types.NewTransaction(1, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
			types.NewTransaction(2, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
			types.NewTransaction(3, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
			types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
		}

		hashes []common.Hash
	)

	fid0 := api.NewPendingTransactionFilter()

	time.Sleep(1 * time.Second)
	txFeed.Send(core.NewTxsEvent{Txs: transactions})

	timeout := time.Now().Add(1 * time.Second)
	for {
		results, err := api.GetFilterChanges(fid0)
		if err != nil {
			t.Fatalf("Unable to retrieve pending transaction hashes: %v", err)
		}

		h := results.([]common.Hash)
		hashes = append(hashes, h...)
		if len(hashes) >= len(transactions) {
			break
		}
		// check timeout
		if time.Now().After(timeout) {
			break
		}

		time.Sleep(100 * time.Millisecond)
	}

	if len(hashes) != len(transactions) {
		t.Errorf("invalid number of transactions, want %d transaction(s), got %d", len(transactions), len(hashes))
		return
	}
	for i := range hashes {
		if hashes[i] != transactions[i].Hash() {
			t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i])
		}
	}
}
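
// pollFilterChanges is an illustrative sketch, not used by the tests in this file: it
// factors out the poll-until-timeout loop that TestPendingTxFilter above and TestLogFilter
// below both write inline. The name, signature, and the enough callback are assumptions,
// not part of the original suite.
func pollFilterChanges(t *testing.T, api *PublicFilterAPI, id rpc.ID, enough func(interface{}) bool) {
	deadline := time.Now().Add(1 * time.Second)
	for {
		results, err := api.GetFilterChanges(id)
		if err != nil {
			t.Fatalf("Unable to retrieve filter changes: %v", err)
		}
		// Stop once the caller has accumulated enough results or the deadline has passed.
		if enough(results) || time.Now().After(deadline) {
			return
		}
		time.Sleep(100 * time.Millisecond)
	}
}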

// TestLogFilterCreation tests whether a given filter criteria makes sense.
// If not, it must return an error.
func TestLogFilterCreation(t *testing.T) {
	var (
		mux        = new(event.TypeMux)
		db         = ethdb.NewMemDatabase()
		txFeed     = new(event.Feed)
		rmLogsFeed = new(event.Feed)
		logsFeed   = new(event.Feed)
		chainFeed  = new(event.Feed)
		backend    = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
		api        = NewPublicFilterAPI(backend, false)

		testCases = []struct {
			crit    FilterCriteria
			success bool
		}{
			// defaults
			{FilterCriteria{}, true},
			// valid block number range
			{FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, true},
			// "mined" block range to latest
			{FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, true},
			// new mined and pending blocks
			{FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, true},
			// from block "higher" than to block
			{FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(1)}, false},
			// from block "higher" than to block
			{FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(100)}, false},
			// from block "higher" than to block
			{FilterCriteria{FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(100)}, false},
			// from block "higher" than to block
			{FilterCriteria{FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, false},
		}
	)

	for i, test := range testCases {
		_, err := api.NewFilter(test.crit)
		if test.success && err != nil {
			t.Errorf("expected filter creation for case %d to succeed, got %v", i, err)
		}
		if !test.success && err == nil {
			t.Errorf("expected testcase %d to fail with an error", i)
		}
	}
}

// TestInvalidLogFilterCreation tests whether invalid log filter criteria result in an error
// when the filter is created.
func TestInvalidLogFilterCreation(t *testing.T) {
	t.Parallel()

	var (
		mux        = new(event.TypeMux)
		db         = ethdb.NewMemDatabase()
		txFeed     = new(event.Feed)
		rmLogsFeed = new(event.Feed)
		logsFeed   = new(event.Feed)
		chainFeed  = new(event.Feed)
		backend    = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
		api        = NewPublicFilterAPI(backend, false)
	)

	// Different situations where log filter creation should fail.
	// Reason: fromBlock > toBlock
	testCases := []FilterCriteria{
		0: {FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())},
		1: {FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(100)},
		2: {FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(100)},
	}

	for i, test := range testCases {
		if _, err := api.NewFilter(test); err == nil {
			t.Errorf("Expected NewFilter for case #%d to fail", i)
		}
	}
}

// TestLogFilter tests whether log filters match the correct logs that are posted to the event feed.
func TestLogFilter(t *testing.T) {
	t.Parallel()

	var (
		mux        = new(event.TypeMux)
		db         = ethdb.NewMemDatabase()
		txFeed     = new(event.Feed)
		rmLogsFeed = new(event.Feed)
		logsFeed   = new(event.Feed)
		chainFeed  = new(event.Feed)
		backend    = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
		api        = NewPublicFilterAPI(backend, false)

		firstAddr      = common.HexToAddress("0x1111111111111111111111111111111111111111")
		secondAddr     = common.HexToAddress("0x2222222222222222222222222222222222222222")
		thirdAddress   = common.HexToAddress("0x3333333333333333333333333333333333333333")
		notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999")
		firstTopic     = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
		secondTopic    = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222")
		notUsedTopic   = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999")

		// posted twice, once via the logs feed and once as core.PendingLogsEvent
		allLogs = []*types.Log{
			{Address: firstAddr},
			{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 1},
			{Address: secondAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 1},
			{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 2},
			{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 3},
		}

		expectedCase7  = []*types.Log{allLogs[3], allLogs[4], allLogs[0], allLogs[1], allLogs[2], allLogs[3], allLogs[4]}
		expectedCase11 = []*types.Log{allLogs[1], allLogs[2], allLogs[1], allLogs[2]}

		testCases = []struct {
			crit     FilterCriteria
			expected []*types.Log
			id       rpc.ID
		}{
			// match all
			0: {FilterCriteria{}, allLogs, ""},
			// match none due to no matching addresses
			1: {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, ""},
			// match logs based on addresses, ignore topics
			2: {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""},
			// match none due to no matching topics (match with address)
			3: {FilterCriteria{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}}, []*types.Log{}, ""},
			// match logs based on addresses and topics
			4: {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, allLogs[3:5], ""},
			// match logs based on multiple addresses and "or" topics
			5: {FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, allLogs[2:5], ""},
			// logs in the pending block
			6: {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, allLogs[:2], ""},
			// mined logs with block num >= 2 or pending logs
			7: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, expectedCase7, ""},
			// all "mined" logs with block num >= 2
			8: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs[3:], ""},
			// all "mined" logs
			9: {FilterCriteria{ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs, ""},
			// all "mined" logs with 1 <= block num <= 2 and topic secondTopic
			10: {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""},
			// all "mined" and pending logs with topic firstTopic
			11: {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), Topics: [][]common.Hash{{firstTopic}}}, expectedCase11, ""},
			// match all logs due to wildcard topic
			12: {FilterCriteria{Topics: [][]common.Hash{nil}}, allLogs[1:], ""},
		}
	)

	// create all filters
	for i := range testCases {
		testCases[i].id, _ = api.NewFilter(testCases[i].crit)
	}

	// raise events
	time.Sleep(1 * time.Second)
	if nsend := logsFeed.Send(allLogs); nsend == 0 {
		t.Fatal("Should have at least one subscription")
	}
	if err := mux.Post(core.PendingLogsEvent{Logs: allLogs}); err != nil {
		t.Fatal(err)
	}

	for i, tt := range testCases {
		var fetched []*types.Log
		timeout := time.Now().Add(1 * time.Second)
		for { // fetch all expected logs
			results, err := api.GetFilterChanges(tt.id)
			if err != nil {
				t.Fatalf("Unable to fetch logs: %v", err)
			}

			fetched = append(fetched, results.([]*types.Log)...)
			if len(fetched) >= len(tt.expected) {
				break
			}
			// check timeout
			if time.Now().After(timeout) {
				break
			}

			time.Sleep(100 * time.Millisecond)
		}

		if len(fetched) != len(tt.expected) {
			t.Errorf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched))
			return
		}

		for l := range fetched {
			if fetched[l].Removed {
				t.Errorf("expected log not to be removed for log %d in case %d", l, i)
			}
			if !reflect.DeepEqual(fetched[l], tt.expected[l]) {
				t.Errorf("invalid log on index %d for case %d", l, i)
			}
		}
	}
}

// TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event feed.
func TestPendingLogsSubscription(t *testing.T) {
	t.Parallel()

	var (
		mux        = new(event.TypeMux)
		db         = ethdb.NewMemDatabase()
		txFeed     = new(event.Feed)
		rmLogsFeed = new(event.Feed)
		logsFeed   = new(event.Feed)
		chainFeed  = new(event.Feed)
		backend    = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
		api        = NewPublicFilterAPI(backend, false)

		firstAddr      = common.HexToAddress("0x1111111111111111111111111111111111111111")
		secondAddr     = common.HexToAddress("0x2222222222222222222222222222222222222222")
		thirdAddress   = common.HexToAddress("0x3333333333333333333333333333333333333333")
		notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999")
		firstTopic     = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
		secondTopic    = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222")
		thirdTopic     = common.HexToHash("0x3333333333333333333333333333333333333333333333333333333333333333")
		fourthTopic    = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444")
		notUsedTopic   = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999")

		allLogs = []core.PendingLogsEvent{
			{Logs: []*types.Log{{Address: firstAddr, Topics: []common.Hash{}, BlockNumber: 0}}},
			{Logs: []*types.Log{{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 1}}},
			{Logs: []*types.Log{{Address: secondAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 2}}},
			{Logs: []*types.Log{{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 3}}},
			{Logs: []*types.Log{{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 4}}},
			{Logs: []*types.Log{
				{Address: thirdAddress, Topics: []common.Hash{firstTopic}, BlockNumber: 5},
				{Address: thirdAddress, Topics: []common.Hash{thirdTopic}, BlockNumber: 5},
				{Address: thirdAddress, Topics: []common.Hash{fourthTopic}, BlockNumber: 5},
				{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 5},
			}},
		}

		convertLogs = func(pl []core.PendingLogsEvent) []*types.Log {
			var logs []*types.Log
			for _, l := range pl {
				logs = append(logs, l.Logs...)
			}
			return logs
		}

		testCases = []struct {
			crit     ethereum.FilterQuery
			expected []*types.Log
			c        chan []*types.Log
			sub      *Subscription
		}{
			// match all
			{ethereum.FilterQuery{}, convertLogs(allLogs), nil, nil},
			// match none due to no matching addresses
			{ethereum.FilterQuery{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, nil, nil},
			// match logs based on addresses, ignore topics
			{ethereum.FilterQuery{Addresses: []common.Address{firstAddr}}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil},
			// match none due to no matching topics (match with address)
			{ethereum.FilterQuery{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}}, []*types.Log{}, nil, nil},
			// match logs based on addresses and topics
			{ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, append(convertLogs(allLogs[3:5]), allLogs[5].Logs[0]), nil, nil},
			// match logs based on multiple addresses and "or" topics
			{ethereum.FilterQuery{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, append(convertLogs(allLogs[2:5]), allLogs[5].Logs[0]), nil, nil},
			// block numbers are ignored for filters created with New***Filter, these return all logs that match the given criteria when the state changes
			{ethereum.FilterQuery{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil},
			// multiple pending logs, should match only 2 topics from the logs in block 5
			{ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}}, []*types.Log{allLogs[5].Logs[0], allLogs[5].Logs[2]}, nil, nil},
		}
	)

	// Create all subscriptions; this ensures all subscriptions are created before the events are posted.
	// On slow machines this could otherwise lead to missing events when the subscription is created after
	// (some) events are posted.
	for i := range testCases {
		testCases[i].c = make(chan []*types.Log)
		testCases[i].sub, _ = api.events.SubscribeLogs(testCases[i].crit, testCases[i].c)
	}

	for n, test := range testCases {
		i := n
		tt := test
		go func() {
			var fetched []*types.Log
		fetchLoop:
			for {
				logs := <-tt.c
				fetched = append(fetched, logs...)
				if len(fetched) >= len(tt.expected) {
					break fetchLoop
				}
			}

			// Failures here are reported via panic: t.Fatalf must only be called from
			// the goroutine running the test function itself.
			if len(fetched) != len(tt.expected) {
				panic(fmt.Sprintf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched)))
			}

			for l := range fetched {
				if fetched[l].Removed {
					panic(fmt.Sprintf("expected log not to be removed for log %d in case %d", l, i))
				}
				if !reflect.DeepEqual(fetched[l], tt.expected[l]) {
					panic(fmt.Sprintf("invalid log on index %d for case %d", l, i))
				}
			}
		}()
	}

	// raise events
	time.Sleep(1 * time.Second)
	// allLogs are of type core.PendingLogsEvent
	for _, l := range allLogs {
		if err := mux.Post(l); err != nil {
			t.Fatal(err)
		}
	}
}