make push and addLocal atomic
This commit is contained in:
parent
c779e4e8ee
commit
580980d149
@ -432,9 +432,14 @@ func (mp *MessagePool) runLoop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (mp *MessagePool) addLocal(m *types.SignedMessage, msgb []byte) error {
|
||||
func (mp *MessagePool) addLocal(m *types.SignedMessage) error {
|
||||
mp.localAddrs[m.Message.From] = struct{}{}
|
||||
|
||||
msgb, err := m.Serialize()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("error serializing message: %w", err)
|
||||
}
|
||||
|
||||
if err := mp.localMsgs.Put(datastore.NewKey(string(m.Cid().Bytes())), msgb); err != nil {
|
||||
return xerrors.Errorf("persisting local message: %w", err)
|
||||
}
|
||||
@ -507,11 +512,6 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
|
||||
<-mp.addSema
|
||||
}()
|
||||
|
||||
msgb, err := m.Serialize()
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
mp.curTsLk.Lock()
|
||||
publish, err := mp.addTs(m, mp.curTs, true, false)
|
||||
if err != nil {
|
||||
@ -520,18 +520,19 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
|
||||
}
|
||||
mp.curTsLk.Unlock()
|
||||
|
||||
mp.lk.Lock()
|
||||
if err := mp.addLocal(m, msgb); err != nil {
|
||||
mp.lk.Unlock()
|
||||
return cid.Undef, err
|
||||
}
|
||||
mp.lk.Unlock()
|
||||
|
||||
if publish {
|
||||
msgb, err := m.Serialize()
|
||||
if err != nil {
|
||||
return cid.Undef, xerrors.Errorf("error serializing message: %w", err)
|
||||
}
|
||||
|
||||
err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
|
||||
if err != nil {
|
||||
return cid.Undef, xerrors.Errorf("error publishing message: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return m.Cid(), err
|
||||
return m.Cid(), nil
|
||||
}
|
||||
|
||||
func (mp *MessagePool) checkMessage(m *types.SignedMessage) error {
|
||||
@ -670,7 +671,19 @@ func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet, local,
|
||||
return false, err
|
||||
}
|
||||
|
||||
return publish, mp.addLocked(m, !local, untrusted)
|
||||
err = mp.addLocked(m, !local, untrusted)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if local {
|
||||
err = mp.addLocal(m)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("error persisting local message: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return publish, nil
|
||||
}
|
||||
|
||||
func (mp *MessagePool) addLoaded(m *types.SignedMessage) error {
|
||||
@ -837,11 +850,6 @@ func (mp *MessagePool) PushUntrusted(m *types.SignedMessage) (cid.Cid, error) {
|
||||
<-mp.addSema
|
||||
}()
|
||||
|
||||
msgb, err := m.Serialize()
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
mp.curTsLk.Lock()
|
||||
publish, err := mp.addTs(m, mp.curTs, false, true)
|
||||
if err != nil {
|
||||
@ -850,18 +858,19 @@ func (mp *MessagePool) PushUntrusted(m *types.SignedMessage) (cid.Cid, error) {
|
||||
}
|
||||
mp.curTsLk.Unlock()
|
||||
|
||||
mp.lk.Lock()
|
||||
if err := mp.addLocal(m, msgb); err != nil {
|
||||
mp.lk.Unlock()
|
||||
return cid.Undef, err
|
||||
}
|
||||
mp.lk.Unlock()
|
||||
|
||||
if publish {
|
||||
msgb, err := m.Serialize()
|
||||
if err != nil {
|
||||
return cid.Undef, xerrors.Errorf("error serializing message: %w", err)
|
||||
}
|
||||
|
||||
err = mp.api.PubSubPublish(build.MessagesTopic(mp.netName), msgb)
|
||||
if err != nil {
|
||||
return cid.Undef, xerrors.Errorf("error publishing message: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return m.Cid(), err
|
||||
return m.Cid(), nil
|
||||
}
|
||||
|
||||
func (mp *MessagePool) Remove(from address.Address, nonce uint64, applied bool) {
|
||||
|
Loading…
Reference in New Issue
Block a user