summaryrefslogtreecommitdiff
path: root/src/Network/Haskoin/Node/Peer.hs
diff options
context:
space:
mode:
Diffstat (limited to 'src/Network/Haskoin/Node/Peer.hs')
-rw-r--r--src/Network/Haskoin/Node/Peer.hs22
1 files changed, 12 insertions, 10 deletions
diff --git a/src/Network/Haskoin/Node/Peer.hs b/src/Network/Haskoin/Node/Peer.hs
index db2a4f8..a0ef82b 100644
--- a/src/Network/Haskoin/Node/Peer.hs
+++ b/src/Network/Haskoin/Node/Peer.hs
@@ -10,7 +10,6 @@ module Network.Haskoin.Node.Peer
( peer
) where
-import Control.Concurrent.NQE
import Control.Monad
import Control.Monad.Logger
import Control.Monad.Reader
@@ -34,6 +33,7 @@ import Network.Haskoin.Network
import Network.Haskoin.Node.Common
import Network.Haskoin.Transaction
import Network.Socket (SockAddr)
+import NQE
import System.Random
import UnliftIO
@@ -78,18 +78,21 @@ peer pc p =
go = handshake >> exchangePing >> peerLoop
net = peerConfNetwork pc
peerSession hp ad = do
- let src = appSource ad .| inPeerConduit net
+ let src =
+ runConduit $
+ appSource ad .| inPeerConduit net .| conduitMailbox p
snk = outPeerConduit net .| appSink ad
- withSource src p . const $ do
+ withAsync src $ \as -> do
+ link as
pbox <- newTVarIO []
let rd =
PeerReader
- { myConfig = pc
- , mySelf = p
- , myHostPort = hp
- , mySockAddr = na
- , myPending = pbox
- }
+ { myConfig = pc
+ , mySelf = p
+ , myHostPort = hp
+ , mySockAddr = na
+ , myPending = pbox
+ }
runReaderT (runConduit (go .| snk)) rd
handshake :: MonadPeer m => ConduitT () Message m ()
@@ -324,7 +327,6 @@ incoming m = do
inPeerConduit :: MonadIO m => Network -> ConduitT ByteString PeerMessage m ()
inPeerConduit net = do
headerBytes <- CB.take 24
- when (BL.null headerBytes) $ throwIO MessageHeaderEmpty
case decodeLazy headerBytes of
Left e -> throwIO $ DecodeMessageError e
Right (MessageHeader _ _cmd len _) -> do