123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582 |
- // Copyright 2016 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package filters
- import (
- "context"
- "fmt"
- "math/big"
- "math/rand"
- "reflect"
- "testing"
- "time"
- ethereum "github.com/ethereum/go-ethereum"
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/consensus/ethash"
- "github.com/ethereum/go-ethereum/core"
- "github.com/ethereum/go-ethereum/core/bloombits"
- "github.com/ethereum/go-ethereum/core/rawdb"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/ethdb"
- "github.com/ethereum/go-ethereum/event"
- "github.com/ethereum/go-ethereum/params"
- "github.com/ethereum/go-ethereum/rpc"
- )
- type testBackend struct {
- mux *event.TypeMux
- db ethdb.Database
- sections uint64
- txFeed *event.Feed
- rmLogsFeed *event.Feed
- logsFeed *event.Feed
- chainFeed *event.Feed
- }
- func (b *testBackend) ChainDb() ethdb.Database {
- return b.db
- }
- func (b *testBackend) EventMux() *event.TypeMux {
- return b.mux
- }
- func (b *testBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) {
- var (
- hash common.Hash
- num uint64
- )
- if blockNr == rpc.LatestBlockNumber {
- hash = rawdb.ReadHeadBlockHash(b.db)
- number := rawdb.ReadHeaderNumber(b.db, hash)
- if number == nil {
- return nil, nil
- }
- num = *number
- } else {
- num = uint64(blockNr)
- hash = rawdb.ReadCanonicalHash(b.db, num)
- }
- return rawdb.ReadHeader(b.db, hash, num), nil
- }
- func (b *testBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) {
- if number := rawdb.ReadHeaderNumber(b.db, hash); number != nil {
- return rawdb.ReadReceipts(b.db, hash, *number), nil
- }
- return nil, nil
- }
- func (b *testBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) {
- number := rawdb.ReadHeaderNumber(b.db, hash)
- if number == nil {
- return nil, nil
- }
- receipts := rawdb.ReadReceipts(b.db, hash, *number)
- logs := make([][]*types.Log, len(receipts))
- for i, receipt := range receipts {
- logs[i] = receipt.Logs
- }
- return logs, nil
- }
- func (b *testBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
- return b.txFeed.Subscribe(ch)
- }
- func (b *testBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription {
- return b.rmLogsFeed.Subscribe(ch)
- }
- func (b *testBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription {
- return b.logsFeed.Subscribe(ch)
- }
- func (b *testBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription {
- return b.chainFeed.Subscribe(ch)
- }
- func (b *testBackend) BloomStatus() (uint64, uint64) {
- return params.BloomBitsBlocks, b.sections
- }
- func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) {
- requests := make(chan chan *bloombits.Retrieval)
- go session.Multiplex(16, 0, requests)
- go func() {
- for {
- // Wait for a service request or a shutdown
- select {
- case <-ctx.Done():
- return
- case request := <-requests:
- task := <-request
- task.Bitsets = make([][]byte, len(task.Sections))
- for i, section := range task.Sections {
- if rand.Int()%4 != 0 { // Handle occasional missing deliveries
- head := rawdb.ReadCanonicalHash(b.db, (section+1)*params.BloomBitsBlocks-1)
- task.Bitsets[i], _ = rawdb.ReadBloomBits(b.db, task.Bit, section, head)
- }
- }
- request <- task
- }
- }
- }()
- }
- // TestBlockSubscription tests if a block subscription returns block hashes for posted chain events.
- // It creates multiple subscriptions:
- // - one at the start and should receive all posted chain events and a second (blockHashes)
- // - one that is created after a cutoff moment and uninstalled after a second cutoff moment (blockHashes[cutoff1:cutoff2])
- // - one that is created after the second cutoff moment (blockHashes[cutoff2:])
- func TestBlockSubscription(t *testing.T) {
- t.Parallel()
- var (
- mux = new(event.TypeMux)
- db = ethdb.NewMemDatabase()
- txFeed = new(event.Feed)
- rmLogsFeed = new(event.Feed)
- logsFeed = new(event.Feed)
- chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false)
- genesis = new(core.Genesis).MustCommit(db)
- chain, _ = core.GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 10, func(i int, gen *core.BlockGen) {})
- chainEvents = []core.ChainEvent{}
- )
- for _, blk := range chain {
- chainEvents = append(chainEvents, core.ChainEvent{Hash: blk.Hash(), Block: blk})
- }
- chan0 := make(chan *types.Header)
- sub0 := api.events.SubscribeNewHeads(chan0)
- chan1 := make(chan *types.Header)
- sub1 := api.events.SubscribeNewHeads(chan1)
- go func() { // simulate client
- i1, i2 := 0, 0
- for i1 != len(chainEvents) || i2 != len(chainEvents) {
- select {
- case header := <-chan0:
- if chainEvents[i1].Hash != header.Hash() {
- t.Errorf("sub0 received invalid hash on index %d, want %x, got %x", i1, chainEvents[i1].Hash, header.Hash())
- }
- i1++
- case header := <-chan1:
- if chainEvents[i2].Hash != header.Hash() {
- t.Errorf("sub1 received invalid hash on index %d, want %x, got %x", i2, chainEvents[i2].Hash, header.Hash())
- }
- i2++
- }
- }
- sub0.Unsubscribe()
- sub1.Unsubscribe()
- }()
- time.Sleep(1 * time.Second)
- for _, e := range chainEvents {
- chainFeed.Send(e)
- }
- <-sub0.Err()
- <-sub1.Err()
- }
- // TestPendingTxFilter tests whether pending tx filters retrieve all pending transactions that are posted to the event mux.
- func TestPendingTxFilter(t *testing.T) {
- t.Parallel()
- var (
- mux = new(event.TypeMux)
- db = ethdb.NewMemDatabase()
- txFeed = new(event.Feed)
- rmLogsFeed = new(event.Feed)
- logsFeed = new(event.Feed)
- chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false)
- transactions = []*types.Transaction{
- types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
- types.NewTransaction(1, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
- types.NewTransaction(2, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
- types.NewTransaction(3, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
- types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
- }
- hashes []common.Hash
- )
- fid0 := api.NewPendingTransactionFilter()
- time.Sleep(1 * time.Second)
- txFeed.Send(core.NewTxsEvent{Txs: transactions})
- timeout := time.Now().Add(1 * time.Second)
- for {
- results, err := api.GetFilterChanges(fid0)
- if err != nil {
- t.Fatalf("Unable to retrieve logs: %v", err)
- }
- h := results.([]common.Hash)
- hashes = append(hashes, h...)
- if len(hashes) >= len(transactions) {
- break
- }
- // check timeout
- if time.Now().After(timeout) {
- break
- }
- time.Sleep(100 * time.Millisecond)
- }
- if len(hashes) != len(transactions) {
- t.Errorf("invalid number of transactions, want %d transactions(s), got %d", len(transactions), len(hashes))
- return
- }
- for i := range hashes {
- if hashes[i] != transactions[i].Hash() {
- t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), hashes[i])
- }
- }
- }
- // TestLogFilterCreation test whether a given filter criteria makes sense.
- // If not it must return an error.
- func TestLogFilterCreation(t *testing.T) {
- var (
- mux = new(event.TypeMux)
- db = ethdb.NewMemDatabase()
- txFeed = new(event.Feed)
- rmLogsFeed = new(event.Feed)
- logsFeed = new(event.Feed)
- chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false)
- testCases = []struct {
- crit FilterCriteria
- success bool
- }{
- // defaults
- {FilterCriteria{}, true},
- // valid block number range
- {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2)}, true},
- // "mined" block range to pending
- {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, true},
- // new mined and pending blocks
- {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, true},
- // from block "higher" than to block
- {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(1)}, false},
- // from block "higher" than to block
- {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(100)}, false},
- // from block "higher" than to block
- {FilterCriteria{FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(100)}, false},
- // from block "higher" than to block
- {FilterCriteria{FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, false},
- }
- )
- for i, test := range testCases {
- _, err := api.NewFilter(test.crit)
- if test.success && err != nil {
- t.Errorf("expected filter creation for case %d to success, got %v", i, err)
- }
- if !test.success && err == nil {
- t.Errorf("expected testcase %d to fail with an error", i)
- }
- }
- }
- // TestInvalidLogFilterCreation tests whether invalid filter log criteria results in an error
- // when the filter is created.
- func TestInvalidLogFilterCreation(t *testing.T) {
- t.Parallel()
- var (
- mux = new(event.TypeMux)
- db = ethdb.NewMemDatabase()
- txFeed = new(event.Feed)
- rmLogsFeed = new(event.Feed)
- logsFeed = new(event.Feed)
- chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false)
- )
- // different situations where log filter creation should fail.
- // Reason: fromBlock > toBlock
- testCases := []FilterCriteria{
- 0: {FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())},
- 1: {FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(100)},
- 2: {FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(100)},
- }
- for i, test := range testCases {
- if _, err := api.NewFilter(test); err == nil {
- t.Errorf("Expected NewFilter for case #%d to fail", i)
- }
- }
- }
- // TestLogFilter tests whether log filters match the correct logs that are posted to the event feed.
- func TestLogFilter(t *testing.T) {
- t.Parallel()
- var (
- mux = new(event.TypeMux)
- db = ethdb.NewMemDatabase()
- txFeed = new(event.Feed)
- rmLogsFeed = new(event.Feed)
- logsFeed = new(event.Feed)
- chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false)
- firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
- secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
- thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333")
- notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999")
- firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
- secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222")
- notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999")
- // posted twice, once as vm.Logs and once as core.PendingLogsEvent
- allLogs = []*types.Log{
- {Address: firstAddr},
- {Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 1},
- {Address: secondAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 1},
- {Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 2},
- {Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 3},
- }
- expectedCase7 = []*types.Log{allLogs[3], allLogs[4], allLogs[0], allLogs[1], allLogs[2], allLogs[3], allLogs[4]}
- expectedCase11 = []*types.Log{allLogs[1], allLogs[2], allLogs[1], allLogs[2]}
- testCases = []struct {
- crit FilterCriteria
- expected []*types.Log
- id rpc.ID
- }{
- // match all
- 0: {FilterCriteria{}, allLogs, ""},
- // match none due to no matching addresses
- 1: {FilterCriteria{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, ""},
- // match logs based on addresses, ignore topics
- 2: {FilterCriteria{Addresses: []common.Address{firstAddr}}, allLogs[:2], ""},
- // match none due to no matching topics (match with address)
- 3: {FilterCriteria{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}}, []*types.Log{}, ""},
- // match logs based on addresses and topics
- 4: {FilterCriteria{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, allLogs[3:5], ""},
- // match logs based on multiple addresses and "or" topics
- 5: {FilterCriteria{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, allLogs[2:5], ""},
- // logs in the pending block
- 6: {FilterCriteria{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, allLogs[:2], ""},
- // mined logs with block num >= 2 or pending logs
- 7: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64())}, expectedCase7, ""},
- // all "mined" logs with block num >= 2
- 8: {FilterCriteria{FromBlock: big.NewInt(2), ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs[3:], ""},
- // all "mined" logs
- 9: {FilterCriteria{ToBlock: big.NewInt(rpc.LatestBlockNumber.Int64())}, allLogs, ""},
- // all "mined" logs with 1>= block num <=2 and topic secondTopic
- 10: {FilterCriteria{FromBlock: big.NewInt(1), ToBlock: big.NewInt(2), Topics: [][]common.Hash{{secondTopic}}}, allLogs[3:4], ""},
- // all "mined" and pending logs with topic firstTopic
- 11: {FilterCriteria{FromBlock: big.NewInt(rpc.LatestBlockNumber.Int64()), ToBlock: big.NewInt(rpc.PendingBlockNumber.Int64()), Topics: [][]common.Hash{{firstTopic}}}, expectedCase11, ""},
- // match all logs due to wildcard topic
- 12: {FilterCriteria{Topics: [][]common.Hash{nil}}, allLogs[1:], ""},
- }
- )
- // create all filters
- for i := range testCases {
- testCases[i].id, _ = api.NewFilter(testCases[i].crit)
- }
- // raise events
- time.Sleep(1 * time.Second)
- if nsend := logsFeed.Send(allLogs); nsend == 0 {
- t.Fatal("Shoud have at least one subscription")
- }
- if err := mux.Post(core.PendingLogsEvent{Logs: allLogs}); err != nil {
- t.Fatal(err)
- }
- for i, tt := range testCases {
- var fetched []*types.Log
- timeout := time.Now().Add(1 * time.Second)
- for { // fetch all expected logs
- results, err := api.GetFilterChanges(tt.id)
- if err != nil {
- t.Fatalf("Unable to fetch logs: %v", err)
- }
- fetched = append(fetched, results.([]*types.Log)...)
- if len(fetched) >= len(tt.expected) {
- break
- }
- // check timeout
- if time.Now().After(timeout) {
- break
- }
- time.Sleep(100 * time.Millisecond)
- }
- if len(fetched) != len(tt.expected) {
- t.Errorf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched))
- return
- }
- for l := range fetched {
- if fetched[l].Removed {
- t.Errorf("expected log not to be removed for log %d in case %d", l, i)
- }
- if !reflect.DeepEqual(fetched[l], tt.expected[l]) {
- t.Errorf("invalid log on index %d for case %d", l, i)
- }
- }
- }
- }
- // TestPendingLogsSubscription tests if a subscription receives the correct pending logs that are posted to the event feed.
- func TestPendingLogsSubscription(t *testing.T) {
- t.Parallel()
- var (
- mux = new(event.TypeMux)
- db = ethdb.NewMemDatabase()
- txFeed = new(event.Feed)
- rmLogsFeed = new(event.Feed)
- logsFeed = new(event.Feed)
- chainFeed = new(event.Feed)
- backend = &testBackend{mux, db, 0, txFeed, rmLogsFeed, logsFeed, chainFeed}
- api = NewPublicFilterAPI(backend, false)
- firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
- secondAddr = common.HexToAddress("0x2222222222222222222222222222222222222222")
- thirdAddress = common.HexToAddress("0x3333333333333333333333333333333333333333")
- notUsedAddress = common.HexToAddress("0x9999999999999999999999999999999999999999")
- firstTopic = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
- secondTopic = common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222")
- thirdTopic = common.HexToHash("0x3333333333333333333333333333333333333333333333333333333333333333")
- fourthTopic = common.HexToHash("0x4444444444444444444444444444444444444444444444444444444444444444")
- notUsedTopic = common.HexToHash("0x9999999999999999999999999999999999999999999999999999999999999999")
- allLogs = []core.PendingLogsEvent{
- {Logs: []*types.Log{{Address: firstAddr, Topics: []common.Hash{}, BlockNumber: 0}}},
- {Logs: []*types.Log{{Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 1}}},
- {Logs: []*types.Log{{Address: secondAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 2}}},
- {Logs: []*types.Log{{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 3}}},
- {Logs: []*types.Log{{Address: thirdAddress, Topics: []common.Hash{secondTopic}, BlockNumber: 4}}},
- {Logs: []*types.Log{
- {Address: thirdAddress, Topics: []common.Hash{firstTopic}, BlockNumber: 5},
- {Address: thirdAddress, Topics: []common.Hash{thirdTopic}, BlockNumber: 5},
- {Address: thirdAddress, Topics: []common.Hash{fourthTopic}, BlockNumber: 5},
- {Address: firstAddr, Topics: []common.Hash{firstTopic}, BlockNumber: 5},
- }},
- }
- convertLogs = func(pl []core.PendingLogsEvent) []*types.Log {
- var logs []*types.Log
- for _, l := range pl {
- logs = append(logs, l.Logs...)
- }
- return logs
- }
- testCases = []struct {
- crit ethereum.FilterQuery
- expected []*types.Log
- c chan []*types.Log
- sub *Subscription
- }{
- // match all
- {ethereum.FilterQuery{}, convertLogs(allLogs), nil, nil},
- // match none due to no matching addresses
- {ethereum.FilterQuery{Addresses: []common.Address{{}, notUsedAddress}, Topics: [][]common.Hash{nil}}, []*types.Log{}, nil, nil},
- // match logs based on addresses, ignore topics
- {ethereum.FilterQuery{Addresses: []common.Address{firstAddr}}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil},
- // match none due to no matching topics (match with address)
- {ethereum.FilterQuery{Addresses: []common.Address{secondAddr}, Topics: [][]common.Hash{{notUsedTopic}}}, []*types.Log{}, nil, nil},
- // match logs based on addresses and topics
- {ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, append(convertLogs(allLogs[3:5]), allLogs[5].Logs[0]), nil, nil},
- // match logs based on multiple addresses and "or" topics
- {ethereum.FilterQuery{Addresses: []common.Address{secondAddr, thirdAddress}, Topics: [][]common.Hash{{firstTopic, secondTopic}}}, append(convertLogs(allLogs[2:5]), allLogs[5].Logs[0]), nil, nil},
- // block numbers are ignored for filters created with New***Filter, these return all logs that match the given criteria when the state changes
- {ethereum.FilterQuery{Addresses: []common.Address{firstAddr}, FromBlock: big.NewInt(2), ToBlock: big.NewInt(3)}, append(convertLogs(allLogs[:2]), allLogs[5].Logs[3]), nil, nil},
- // multiple pending logs, should match only 2 topics from the logs in block 5
- {ethereum.FilterQuery{Addresses: []common.Address{thirdAddress}, Topics: [][]common.Hash{{firstTopic, fourthTopic}}}, []*types.Log{allLogs[5].Logs[0], allLogs[5].Logs[2]}, nil, nil},
- }
- )
- // create all subscriptions, this ensures all subscriptions are created before the events are posted.
- // on slow machines this could otherwise lead to missing events when the subscription is created after
- // (some) events are posted.
- for i := range testCases {
- testCases[i].c = make(chan []*types.Log)
- testCases[i].sub, _ = api.events.SubscribeLogs(testCases[i].crit, testCases[i].c)
- }
- for n, test := range testCases {
- i := n
- tt := test
- go func() {
- var fetched []*types.Log
- fetchLoop:
- for {
- logs := <-tt.c
- fetched = append(fetched, logs...)
- if len(fetched) >= len(tt.expected) {
- break fetchLoop
- }
- }
- if len(fetched) != len(tt.expected) {
- panic(fmt.Sprintf("invalid number of logs for case %d, want %d log(s), got %d", i, len(tt.expected), len(fetched)))
- }
- for l := range fetched {
- if fetched[l].Removed {
- panic(fmt.Sprintf("expected log not to be removed for log %d in case %d", l, i))
- }
- if !reflect.DeepEqual(fetched[l], tt.expected[l]) {
- panic(fmt.Sprintf("invalid log on index %d for case %d", l, i))
- }
- }
- }()
- }
- // raise events
- time.Sleep(1 * time.Second)
- // allLogs are type of core.PendingLogsEvent
- for _, l := range allLogs {
- if err := mux.Post(l); err != nil {
- t.Fatal(err)
- }
- }
- }
|