Add code comments, remove debug logging
This commit is contained in:
parent
1ed3de57d4
commit
b8928b1e57
@ -49,6 +49,31 @@ func callbackifyChanPubSub(receiver, fn reflect.Value) *callback {
|
|||||||
|
|
||||||
retFnType := reflect.FuncOf(append([]reflect.Type{receiver.Type(), contextType}, c.argTypes...), []reflect.Type{subscriptionType, errorType}, false)
|
retFnType := reflect.FuncOf(append([]reflect.Type{receiver.Type(), contextType}, c.argTypes...), []reflect.Type{subscriptionType, errorType}, false)
|
||||||
|
|
||||||
|
// // What follows uses reflection to construct a dynamically typed function equivalent to:
|
||||||
|
// func(receiver <T>, cctx context.Context, args ...<T>) (rpc.Subscription, error) {
|
||||||
|
// notifier, supported := NotifierFromContext(cctx)
|
||||||
|
// if !supported { return Subscription{}, ErrNotificationsUnsupported}
|
||||||
|
// ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
// ch, err := fn()
|
||||||
|
// if err != nil { return Subscription{}, err }
|
||||||
|
// rpcSub := notifier.CreateSubscription()
|
||||||
|
// go func() {
|
||||||
|
// select {
|
||||||
|
// case v, ok := <- ch:
|
||||||
|
// if !ok { return }
|
||||||
|
// notifier.Notify(rpcSub.ID, v)
|
||||||
|
// case <-rpcSub.Err():
|
||||||
|
// cancel()
|
||||||
|
// return
|
||||||
|
// case <-notifier.Closed():
|
||||||
|
// cancel()
|
||||||
|
// return
|
||||||
|
// }
|
||||||
|
// }()
|
||||||
|
// return rpcSub, nil
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
|
||||||
c.fn = reflect.MakeFunc(retFnType, func(args []reflect.Value) ([]reflect.Value) {
|
c.fn = reflect.MakeFunc(retFnType, func(args []reflect.Value) ([]reflect.Value) {
|
||||||
notifier, supported := NotifierFromContext(args[1].Interface().(context.Context))
|
notifier, supported := NotifierFromContext(args[1].Interface().(context.Context))
|
||||||
if !supported {
|
if !supported {
|
||||||
@ -56,9 +81,7 @@ func callbackifyChanPubSub(receiver, fn reflect.Value) *callback {
|
|||||||
}
|
}
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
args[1] = reflect.ValueOf(ctx)
|
args[1] = reflect.ValueOf(ctx)
|
||||||
log.Info("Calling with args", "args", args, "expecting", fntype.NumIn())
|
|
||||||
out := fn.Call(args)
|
out := fn.Call(args)
|
||||||
log.Info("Called with args", "args", args, "out", out)
|
|
||||||
if !out[1].IsNil() {
|
if !out[1].IsNil() {
|
||||||
// This amounts to: if err != nil { return nil, err }
|
// This amounts to: if err != nil { return nil, err }
|
||||||
return []reflect.Value{reflect.Zero(subscriptionType), out[1]}
|
return []reflect.Value{reflect.Zero(subscriptionType), out[1]}
|
||||||
@ -79,17 +102,13 @@ func callbackifyChanPubSub(receiver, fn reflect.Value) *callback {
|
|||||||
switch chosen {
|
switch chosen {
|
||||||
case 0: // val, ok := <-ch
|
case 0: // val, ok := <-ch
|
||||||
if !recvOK {
|
if !recvOK {
|
||||||
log.Info("!recvok, closing")
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Info("Sending value to notifier", "value", val.Interface())
|
|
||||||
notifier.Notify(rpcSub.ID, val.Interface())
|
notifier.Notify(rpcSub.ID, val.Interface())
|
||||||
case 1:
|
case 1:
|
||||||
log.Info("rpcSubErr")
|
|
||||||
cancel()
|
cancel()
|
||||||
return
|
return
|
||||||
case 2:
|
case 2:
|
||||||
log.Info("notifier closed")
|
|
||||||
cancel()
|
cancel()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user