123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- // Copyright 2016 The go-ethereum Authors
- // This file is part of the go-ethereum library.
- //
- // The go-ethereum library is free software: you can redistribute it and/or modify
- // it under the terms of the GNU Lesser General Public License as published by
- // the Free Software Foundation, either version 3 of the License, or
- // (at your option) any later version.
- //
- // The go-ethereum library is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU Lesser General Public License for more details.
- //
- // You should have received a copy of the GNU Lesser General Public License
- // along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
- package rpc
- import (
- "context"
- "errors"
- "sync"
- )
// Sentinel errors returned by the subscription machinery.
var (
	// ErrNotificationsUnsupported is returned when the connection doesn't
	// support notifications.
	ErrNotificationsUnsupported = errors.New("notifications not supported")

	// ErrSubscriptionNotFound is returned when no subscription exists for
	// the given id.
	ErrSubscriptionNotFound = errors.New("subscription not found")
)
// ID defines a pseudo random number that is used to identify RPC subscriptions.
type ID string

// Subscription is created by a notifier and tied to that notifier. The
// client can use this subscription to wait for an unsubscribe request for
// the client, see Err().
type Subscription struct {
	ID        ID         // identifier of this subscription
	namespace string     // filled in when the subscription is activated
	err       chan error // closed on unsubscribe
}

// Err returns a channel that is closed when the client sends an unsubscribe
// request.
func (sub *Subscription) Err() <-chan error {
	return sub.err
}
// notifierKey is the context key type under which a notifier is stored
// within the connection context.
type notifierKey struct{}
- // Notifier is tight to a RPC connection that supports subscriptions.
- // Server callbacks use the notifier to send notifications.
- type Notifier struct {
- codec ServerCodec
- subMu sync.RWMutex // guards active and inactive maps
- active map[ID]*Subscription
- inactive map[ID]*Subscription
- }
- // newNotifier creates a new notifier that can be used to send subscription
- // notifications to the client.
- func newNotifier(codec ServerCodec) *Notifier {
- return &Notifier{
- codec: codec,
- active: make(map[ID]*Subscription),
- inactive: make(map[ID]*Subscription),
- }
- }
- // NotifierFromContext returns the Notifier value stored in ctx, if any.
- func NotifierFromContext(ctx context.Context) (*Notifier, bool) {
- n, ok := ctx.Value(notifierKey{}).(*Notifier)
- return n, ok
- }
- // CreateSubscription returns a new subscription that is coupled to the
- // RPC connection. By default subscriptions are inactive and notifications
- // are dropped until the subscription is marked as active. This is done
- // by the RPC server after the subscription ID is send to the client.
- func (n *Notifier) CreateSubscription() *Subscription {
- s := &Subscription{ID: NewID(), err: make(chan error)}
- n.subMu.Lock()
- n.inactive[s.ID] = s
- n.subMu.Unlock()
- return s
- }
- // Notify sends a notification to the client with the given data as payload.
- // If an error occurs the RPC connection is closed and the error is returned.
- func (n *Notifier) Notify(id ID, data interface{}) error {
- n.subMu.RLock()
- defer n.subMu.RUnlock()
- sub, active := n.active[id]
- if active {
- notification := n.codec.CreateNotification(string(id), sub.namespace, data)
- if err := n.codec.Write(notification); err != nil {
- n.codec.Close()
- return err
- }
- }
- return nil
- }
- // Closed returns a channel that is closed when the RPC connection is closed.
- func (n *Notifier) Closed() <-chan interface{} {
- return n.codec.Closed()
- }
- // unsubscribe a subscription.
- // If the subscription could not be found ErrSubscriptionNotFound is returned.
- func (n *Notifier) unsubscribe(id ID) error {
- n.subMu.Lock()
- defer n.subMu.Unlock()
- if s, found := n.active[id]; found {
- close(s.err)
- delete(n.active, id)
- return nil
- }
- return ErrSubscriptionNotFound
- }
- // activate enables a subscription. Until a subscription is enabled all
- // notifications are dropped. This method is called by the RPC server after
- // the subscription ID was sent to client. This prevents notifications being
- // send to the client before the subscription ID is send to the client.
- func (n *Notifier) activate(id ID, namespace string) {
- n.subMu.Lock()
- defer n.subMu.Unlock()
- if sub, found := n.inactive[id]; found {
- sub.namespace = namespace
- n.active[id] = sub
- delete(n.inactive, id)
- }
- }
|