api.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553
  1. // Copyright 2015 The go-ethereum Authors
  2. // This file is part of the go-ethereum library.
  3. //
  4. // The go-ethereum library is free software: you can redistribute it and/or modify
  5. // it under the terms of the GNU Lesser General Public License as published by
  6. // the Free Software Foundation, either version 3 of the License, or
  7. // (at your option) any later version.
  8. //
  9. // The go-ethereum library is distributed in the hope that it will be useful,
  10. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. // GNU Lesser General Public License for more details.
  13. //
  14. // You should have received a copy of the GNU Lesser General Public License
  15. // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
  16. package filters
  17. import (
  18. "context"
  19. "encoding/json"
  20. "errors"
  21. "fmt"
  22. "math/big"
  23. "sync"
  24. "time"
  25. ethereum "github.com/ethereum/go-ethereum"
  26. "github.com/ethereum/go-ethereum/common"
  27. "github.com/ethereum/go-ethereum/common/hexutil"
  28. "github.com/ethereum/go-ethereum/core/types"
  29. "github.com/ethereum/go-ethereum/ethdb"
  30. "github.com/ethereum/go-ethereum/event"
  31. "github.com/ethereum/go-ethereum/rpc"
  32. )
  33. var (
  34. deadline = 5 * time.Minute // consider a filter inactive if it has not been polled for within deadline
  35. )
  36. // filter is a helper struct that holds meta information over the filter type
  37. // and associated subscription in the event system.
  38. type filter struct {
  39. typ Type
  40. deadline *time.Timer // filter is inactiv when deadline triggers
  41. hashes []common.Hash
  42. crit FilterCriteria
  43. logs []*types.Log
  44. s *Subscription // associated subscription in event system
  45. }
  46. // PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
  47. // information related to the Ethereum protocol such als blocks, transactions and logs.
  48. type PublicFilterAPI struct {
  49. backend Backend
  50. mux *event.TypeMux
  51. quit chan struct{}
  52. chainDb ethdb.Database
  53. events *EventSystem
  54. filtersMu sync.Mutex
  55. filters map[rpc.ID]*filter
  56. }
  57. // NewPublicFilterAPI returns a new PublicFilterAPI instance.
  58. func NewPublicFilterAPI(backend Backend, lightMode bool) *PublicFilterAPI {
  59. api := &PublicFilterAPI{
  60. backend: backend,
  61. mux: backend.EventMux(),
  62. chainDb: backend.ChainDb(),
  63. events: NewEventSystem(backend.EventMux(), backend, lightMode),
  64. filters: make(map[rpc.ID]*filter),
  65. }
  66. go api.timeoutLoop()
  67. return api
  68. }
  69. // timeoutLoop runs every 5 minutes and deletes filters that have not been recently used.
  70. // Tt is started when the api is created.
  71. func (api *PublicFilterAPI) timeoutLoop() {
  72. ticker := time.NewTicker(5 * time.Minute)
  73. for {
  74. <-ticker.C
  75. api.filtersMu.Lock()
  76. for id, f := range api.filters {
  77. select {
  78. case <-f.deadline.C:
  79. f.s.Unsubscribe()
  80. delete(api.filters, id)
  81. default:
  82. continue
  83. }
  84. }
  85. api.filtersMu.Unlock()
  86. }
  87. }
  88. // NewPendingTransactionFilter creates a filter that fetches pending transaction hashes
  89. // as transactions enter the pending state.
  90. //
  91. // It is part of the filter package because this filter can be used through the
  92. // `eth_getFilterChanges` polling method that is also used for log filters.
  93. //
  94. // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilter
  95. func (api *PublicFilterAPI) NewPendingTransactionFilter() rpc.ID {
  96. var (
  97. pendingTxs = make(chan []common.Hash)
  98. pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
  99. )
  100. api.filtersMu.Lock()
  101. api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: pendingTxSub}
  102. api.filtersMu.Unlock()
  103. go func() {
  104. for {
  105. select {
  106. case ph := <-pendingTxs:
  107. api.filtersMu.Lock()
  108. if f, found := api.filters[pendingTxSub.ID]; found {
  109. f.hashes = append(f.hashes, ph...)
  110. }
  111. api.filtersMu.Unlock()
  112. case <-pendingTxSub.Err():
  113. api.filtersMu.Lock()
  114. delete(api.filters, pendingTxSub.ID)
  115. api.filtersMu.Unlock()
  116. return
  117. }
  118. }
  119. }()
  120. return pendingTxSub.ID
  121. }
  122. // NewPendingTransactions creates a subscription that is triggered each time a transaction
  123. // enters the transaction pool and was signed from one of the transactions this nodes manages.
  124. func (api *PublicFilterAPI) NewPendingTransactions(ctx context.Context) (*rpc.Subscription, error) {
  125. notifier, supported := rpc.NotifierFromContext(ctx)
  126. if !supported {
  127. return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
  128. }
  129. rpcSub := notifier.CreateSubscription()
  130. go func() {
  131. txHashes := make(chan []common.Hash, 128)
  132. pendingTxSub := api.events.SubscribePendingTxs(txHashes)
  133. for {
  134. select {
  135. case hashes := <-txHashes:
  136. // To keep the original behaviour, send a single tx hash in one notification.
  137. // TODO(rjl493456442) Send a batch of tx hashes in one notification
  138. for _, h := range hashes {
  139. notifier.Notify(rpcSub.ID, h)
  140. }
  141. case <-rpcSub.Err():
  142. pendingTxSub.Unsubscribe()
  143. return
  144. case <-notifier.Closed():
  145. pendingTxSub.Unsubscribe()
  146. return
  147. }
  148. }
  149. }()
  150. return rpcSub, nil
  151. }
  152. // NewBlockFilter creates a filter that fetches blocks that are imported into the chain.
  153. // It is part of the filter package since polling goes with eth_getFilterChanges.
  154. //
  155. // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newblockfilter
  156. func (api *PublicFilterAPI) NewBlockFilter() rpc.ID {
  157. var (
  158. headers = make(chan *types.Header)
  159. headerSub = api.events.SubscribeNewHeads(headers)
  160. )
  161. api.filtersMu.Lock()
  162. api.filters[headerSub.ID] = &filter{typ: BlocksSubscription, deadline: time.NewTimer(deadline), hashes: make([]common.Hash, 0), s: headerSub}
  163. api.filtersMu.Unlock()
  164. go func() {
  165. for {
  166. select {
  167. case h := <-headers:
  168. api.filtersMu.Lock()
  169. if f, found := api.filters[headerSub.ID]; found {
  170. f.hashes = append(f.hashes, h.Hash())
  171. }
  172. api.filtersMu.Unlock()
  173. case <-headerSub.Err():
  174. api.filtersMu.Lock()
  175. delete(api.filters, headerSub.ID)
  176. api.filtersMu.Unlock()
  177. return
  178. }
  179. }
  180. }()
  181. return headerSub.ID
  182. }
  183. // NewHeads send a notification each time a new (header) block is appended to the chain.
  184. func (api *PublicFilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) {
  185. notifier, supported := rpc.NotifierFromContext(ctx)
  186. if !supported {
  187. return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
  188. }
  189. rpcSub := notifier.CreateSubscription()
  190. go func() {
  191. headers := make(chan *types.Header)
  192. headersSub := api.events.SubscribeNewHeads(headers)
  193. for {
  194. select {
  195. case h := <-headers:
  196. notifier.Notify(rpcSub.ID, h)
  197. case <-rpcSub.Err():
  198. headersSub.Unsubscribe()
  199. return
  200. case <-notifier.Closed():
  201. headersSub.Unsubscribe()
  202. return
  203. }
  204. }
  205. }()
  206. return rpcSub, nil
  207. }
  208. // Logs creates a subscription that fires for all new log that match the given filter criteria.
  209. func (api *PublicFilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subscription, error) {
  210. notifier, supported := rpc.NotifierFromContext(ctx)
  211. if !supported {
  212. return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
  213. }
  214. var (
  215. rpcSub = notifier.CreateSubscription()
  216. matchedLogs = make(chan []*types.Log)
  217. )
  218. logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), matchedLogs)
  219. if err != nil {
  220. return nil, err
  221. }
  222. go func() {
  223. for {
  224. select {
  225. case logs := <-matchedLogs:
  226. for _, log := range logs {
  227. notifier.Notify(rpcSub.ID, &log)
  228. }
  229. case <-rpcSub.Err(): // client send an unsubscribe request
  230. logsSub.Unsubscribe()
  231. return
  232. case <-notifier.Closed(): // connection dropped
  233. logsSub.Unsubscribe()
  234. return
  235. }
  236. }
  237. }()
  238. return rpcSub, nil
  239. }
  240. // FilterCriteria represents a request to create a new filter.
  241. // Same as ethereum.FilterQuery but with UnmarshalJSON() method.
  242. type FilterCriteria ethereum.FilterQuery
  243. // NewFilter creates a new filter and returns the filter id. It can be
  244. // used to retrieve logs when the state changes. This method cannot be
  245. // used to fetch logs that are already stored in the state.
  246. //
  247. // Default criteria for the from and to block are "latest".
  248. // Using "latest" as block number will return logs for mined blocks.
  249. // Using "pending" as block number returns logs for not yet mined (pending) blocks.
  250. // In case logs are removed (chain reorg) previously returned logs are returned
  251. // again but with the removed property set to true.
  252. //
  253. // In case "fromBlock" > "toBlock" an error is returned.
  254. //
  255. // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_newfilter
  256. func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) (rpc.ID, error) {
  257. logs := make(chan []*types.Log)
  258. logsSub, err := api.events.SubscribeLogs(ethereum.FilterQuery(crit), logs)
  259. if err != nil {
  260. return rpc.ID(""), err
  261. }
  262. api.filtersMu.Lock()
  263. api.filters[logsSub.ID] = &filter{typ: LogsSubscription, crit: crit, deadline: time.NewTimer(deadline), logs: make([]*types.Log, 0), s: logsSub}
  264. api.filtersMu.Unlock()
  265. go func() {
  266. for {
  267. select {
  268. case l := <-logs:
  269. api.filtersMu.Lock()
  270. if f, found := api.filters[logsSub.ID]; found {
  271. f.logs = append(f.logs, l...)
  272. }
  273. api.filtersMu.Unlock()
  274. case <-logsSub.Err():
  275. api.filtersMu.Lock()
  276. delete(api.filters, logsSub.ID)
  277. api.filtersMu.Unlock()
  278. return
  279. }
  280. }
  281. }()
  282. return logsSub.ID, nil
  283. }
  284. // GetLogs returns logs matching the given argument that are stored within the state.
  285. //
  286. // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs
  287. func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) {
  288. // Convert the RPC block numbers into internal representations
  289. if crit.FromBlock == nil {
  290. crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
  291. }
  292. if crit.ToBlock == nil {
  293. crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64())
  294. }
  295. // Create and run the filter to get all the logs
  296. filter := New(api.backend, crit.FromBlock.Int64(), crit.ToBlock.Int64(), crit.Addresses, crit.Topics)
  297. logs, err := filter.Logs(ctx)
  298. if err != nil {
  299. return nil, err
  300. }
  301. return returnLogs(logs), err
  302. }
  303. // UninstallFilter removes the filter with the given filter id.
  304. //
  305. // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_uninstallfilter
  306. func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool {
  307. api.filtersMu.Lock()
  308. f, found := api.filters[id]
  309. if found {
  310. delete(api.filters, id)
  311. }
  312. api.filtersMu.Unlock()
  313. if found {
  314. f.s.Unsubscribe()
  315. }
  316. return found
  317. }
  318. // GetFilterLogs returns the logs for the filter with the given id.
  319. // If the filter could not be found an empty array of logs is returned.
  320. //
  321. // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs
  322. func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]*types.Log, error) {
  323. api.filtersMu.Lock()
  324. f, found := api.filters[id]
  325. api.filtersMu.Unlock()
  326. if !found || f.typ != LogsSubscription {
  327. return nil, fmt.Errorf("filter not found")
  328. }
  329. begin := rpc.LatestBlockNumber.Int64()
  330. if f.crit.FromBlock != nil {
  331. begin = f.crit.FromBlock.Int64()
  332. }
  333. end := rpc.LatestBlockNumber.Int64()
  334. if f.crit.ToBlock != nil {
  335. end = f.crit.ToBlock.Int64()
  336. }
  337. // Create and run the filter to get all the logs
  338. filter := New(api.backend, begin, end, f.crit.Addresses, f.crit.Topics)
  339. logs, err := filter.Logs(ctx)
  340. if err != nil {
  341. return nil, err
  342. }
  343. return returnLogs(logs), nil
  344. }
  345. // GetFilterChanges returns the logs for the filter with the given id since
  346. // last time it was called. This can be used for polling.
  347. //
  348. // For pending transaction and block filters the result is []common.Hash.
  349. // (pending)Log filters return []Log.
  350. //
  351. // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges
  352. func (api *PublicFilterAPI) GetFilterChanges(id rpc.ID) (interface{}, error) {
  353. api.filtersMu.Lock()
  354. defer api.filtersMu.Unlock()
  355. if f, found := api.filters[id]; found {
  356. if !f.deadline.Stop() {
  357. // timer expired but filter is not yet removed in timeout loop
  358. // receive timer value and reset timer
  359. <-f.deadline.C
  360. }
  361. f.deadline.Reset(deadline)
  362. switch f.typ {
  363. case PendingTransactionsSubscription, BlocksSubscription:
  364. hashes := f.hashes
  365. f.hashes = nil
  366. return returnHashes(hashes), nil
  367. case LogsSubscription:
  368. logs := f.logs
  369. f.logs = nil
  370. return returnLogs(logs), nil
  371. }
  372. }
  373. return []interface{}{}, fmt.Errorf("filter not found")
  374. }
  375. // returnHashes is a helper that will return an empty hash array case the given hash array is nil,
  376. // otherwise the given hashes array is returned.
  377. func returnHashes(hashes []common.Hash) []common.Hash {
  378. if hashes == nil {
  379. return []common.Hash{}
  380. }
  381. return hashes
  382. }
  383. // returnLogs is a helper that will return an empty log array in case the given logs array is nil,
  384. // otherwise the given logs array is returned.
  385. func returnLogs(logs []*types.Log) []*types.Log {
  386. if logs == nil {
  387. return []*types.Log{}
  388. }
  389. return logs
  390. }
  391. // UnmarshalJSON sets *args fields with given data.
  392. func (args *FilterCriteria) UnmarshalJSON(data []byte) error {
  393. type input struct {
  394. From *rpc.BlockNumber `json:"fromBlock"`
  395. ToBlock *rpc.BlockNumber `json:"toBlock"`
  396. Addresses interface{} `json:"address"`
  397. Topics []interface{} `json:"topics"`
  398. }
  399. var raw input
  400. if err := json.Unmarshal(data, &raw); err != nil {
  401. return err
  402. }
  403. if raw.From != nil {
  404. args.FromBlock = big.NewInt(raw.From.Int64())
  405. }
  406. if raw.ToBlock != nil {
  407. args.ToBlock = big.NewInt(raw.ToBlock.Int64())
  408. }
  409. args.Addresses = []common.Address{}
  410. if raw.Addresses != nil {
  411. // raw.Address can contain a single address or an array of addresses
  412. switch rawAddr := raw.Addresses.(type) {
  413. case []interface{}:
  414. for i, addr := range rawAddr {
  415. if strAddr, ok := addr.(string); ok {
  416. addr, err := decodeAddress(strAddr)
  417. if err != nil {
  418. return fmt.Errorf("invalid address at index %d: %v", i, err)
  419. }
  420. args.Addresses = append(args.Addresses, addr)
  421. } else {
  422. return fmt.Errorf("non-string address at index %d", i)
  423. }
  424. }
  425. case string:
  426. addr, err := decodeAddress(rawAddr)
  427. if err != nil {
  428. return fmt.Errorf("invalid address: %v", err)
  429. }
  430. args.Addresses = []common.Address{addr}
  431. default:
  432. return errors.New("invalid addresses in query")
  433. }
  434. }
  435. // topics is an array consisting of strings and/or arrays of strings.
  436. // JSON null values are converted to common.Hash{} and ignored by the filter manager.
  437. if len(raw.Topics) > 0 {
  438. args.Topics = make([][]common.Hash, len(raw.Topics))
  439. for i, t := range raw.Topics {
  440. switch topic := t.(type) {
  441. case nil:
  442. // ignore topic when matching logs
  443. case string:
  444. // match specific topic
  445. top, err := decodeTopic(topic)
  446. if err != nil {
  447. return err
  448. }
  449. args.Topics[i] = []common.Hash{top}
  450. case []interface{}:
  451. // or case e.g. [null, "topic0", "topic1"]
  452. for _, rawTopic := range topic {
  453. if rawTopic == nil {
  454. // null component, match all
  455. args.Topics[i] = nil
  456. break
  457. }
  458. if topic, ok := rawTopic.(string); ok {
  459. parsed, err := decodeTopic(topic)
  460. if err != nil {
  461. return err
  462. }
  463. args.Topics[i] = append(args.Topics[i], parsed)
  464. } else {
  465. return fmt.Errorf("invalid topic(s)")
  466. }
  467. }
  468. default:
  469. return fmt.Errorf("invalid topic(s)")
  470. }
  471. }
  472. }
  473. return nil
  474. }
  475. func decodeAddress(s string) (common.Address, error) {
  476. b, err := hexutil.Decode(s)
  477. if err == nil && len(b) != common.AddressLength {
  478. err = fmt.Errorf("hex has invalid length %d after decoding", len(b))
  479. }
  480. return common.BytesToAddress(b), err
  481. }
  482. func decodeTopic(s string) (common.Hash, error) {
  483. b, err := hexutil.Decode(s)
  484. if err == nil && len(b) != common.HashLength {
  485. err = fmt.Errorf("hex has invalid length %d after decoding", len(b))
  486. }
  487. return common.BytesToHash(b), err
  488. }