summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorxenog <>2018-10-09 19:38:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2018-10-09 19:38:00 (GMT)
commitca1e7f4ba2946a9106e9a52b92eb02c8101264e0 (patch)
tree0f0698024f64f2ecc824f1bc5059f5fb2e09c27a
parent4969029af589f3fb7806fcdfd2912170665a6f49 (diff)
version 0.7.30.7.3
-rw-r--r--CHANGELOG.md7
-rw-r--r--haskoin-node.cabal4
-rw-r--r--src/Haskoin/Node.hs3
-rw-r--r--src/Network/Haskoin/Node/Chain.hs72
-rw-r--r--src/Network/Haskoin/Node/Common.hs12
-rw-r--r--src/Network/Haskoin/Node/Manager.hs9
-rw-r--r--src/Network/Haskoin/Node/Peer.hs15
7 files changed, 64 insertions, 58 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6f29bf6..226cf11 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
+## 0.8.0
+### Changed
+- Peers are now killed directly instead of through peer manager.
+
+### Removed
+- Chain no longer needs peer manager.
+
## 0.7.2
### Added
- Compatibility with base 4.12.
diff --git a/haskoin-node.cabal b/haskoin-node.cabal
index fd25b87..b63f95a 100644
--- a/haskoin-node.cabal
+++ b/haskoin-node.cabal
@@ -2,10 +2,10 @@
--
-- see: https://github.com/sol/hpack
--
--- hash: 1161a8dd2e01cf78483f75d114e2d6c9fd4aabcd28f9ac7876d6ebc3c2080beb
+-- hash: 452c92d9fa3318d7245a306b30b0d4299de05bf280116ff37a2f38a9f1651d26
name: haskoin-node
-version: 0.7.2
+version: 0.7.3
synopsis: Haskoin Node P2P library for Bitcoin and Bitcoin Cash
description: Bitcoin and Bitcoin Cash peer-to-peer protocol library featuring headers-first synchronisation.
category: Bitcoin, Finance, Network
diff --git a/src/Haskoin/Node.hs b/src/Haskoin/Node.hs
index 9b30943..8b6d50f 100644
--- a/src/Haskoin/Node.hs
+++ b/src/Haskoin/Node.hs
@@ -31,7 +31,7 @@ module Haskoin.Node
, node
, managerGetPeers
, managerGetPeer
- , managerKill
+ , killPeer
, sendMessage
, peerGetPublisher
, peerGetBlocks
@@ -94,7 +94,6 @@ node cfg mgr_inbox ch_inbox = do
let chain_config =
ChainConfig
{ chainConfDB = nodeConfDB cfg
- , chainConfManager = mgr
, chainConfNetwork = nodeConfNet cfg
, chainConfEvents = chain_events
, chainConfTimeout = nodeConfTimeout cfg
diff --git a/src/Network/Haskoin/Node/Chain.hs b/src/Network/Haskoin/Node/Chain.hs
index c762e49..acb17cb 100644
--- a/src/Network/Haskoin/Node/Chain.hs
+++ b/src/Network/Haskoin/Node/Chain.hs
@@ -24,6 +24,9 @@ module Network.Haskoin.Node.Chain
import Control.Monad
import Control.Monad.Logger
import Control.Monad.Reader
+import Control.Monad.Trans.Maybe
+import Data.List
+import Data.Maybe
import Data.String.Conversions
import Data.Text (Text)
import Data.Time.Clock
@@ -31,13 +34,14 @@ import Network.Haskoin.Block
import Network.Haskoin.Network
import Network.Haskoin.Node.Chain.Logic
import Network.Haskoin.Node.Common
+import Network.Socket (SockAddr)
import NQE
import System.Random
import UnliftIO
import UnliftIO.Concurrent
type MonadChain m
- = (MonadLoggerIO m, MonadChainLogic ChainConfig Peer m)
+ = (MonadLoggerIO m, MonadChainLogic ChainConfig (Peer, SockAddr) m)
-- | Launch process to synchronize block headers in current thread.
chain ::
@@ -80,10 +84,12 @@ chainEvent e = do
processHeaders ::
MonadChain m => Peer -> [BlockHeader] -> m ()
-processHeaders p hs = do
- s <- peerString p
+processHeaders p hs = void . runMaybeT $ do
+ a <- peerAddr p >>= \case
+ Just a -> return a
+ Nothing -> MaybeT $ return Nothing
+ let s = peerString (Just a)
net <- chainConfNetwork <$> asks myReader
- mgr <- chainConfManager <$> asks myReader
$(logDebugS) "Chain" $
"Importing " <> cs (show (length hs)) <> " headers from peer " <> s
importHeaders net hs >>= \case
@@ -91,7 +97,7 @@ processHeaders p hs = do
$(logErrorS) "Chain" $
"Could not connect headers sent by peer " <> s <> ": " <>
cs (show e)
- managerKill e p mgr
+ e `killPeer` p
Right done -> do
setLastReceived
best <- getBestBlockHeader
@@ -101,10 +107,10 @@ processHeaders p hs = do
$(logDebugS) "Chain" $
"Finished importing headers from peer: " <> s
MSendHeaders `sendMessage` p
- finishPeer p
+ finishPeer (p, a)
syncNewPeer
syncNotif
- else syncPeer p
+ else syncPeer p a
syncNewPeer :: MonadChain m => m ()
syncNewPeer = do
@@ -115,39 +121,40 @@ syncNewPeer = do
nextPeer >>= \case
Nothing ->
$(logInfoS) "Chain" "Finished syncing against all peers"
- Just p -> syncPeer p
- Just p -> do
- s <- peerString p
- $(logDebugS) "Chain" $ "Already syncing against peer " <> s
+ Just (p, a) -> syncPeer p a
+ Just (_, a) ->
+ $(logDebugS) "Chain" $
+ "Already syncing against peer " <> peerString (Just a)
syncNotif :: MonadChain m => m ()
syncNotif =
notifySynced >>= \x ->
when x $ getBestBlockHeader >>= chainEvent . ChainSynced
-syncPeer :: MonadChain m => Peer -> m ()
-syncPeer p = do
- s <- peerString p
- $(logInfoS) "Chain" $ "Syncing against peer " <> s
+syncPeer :: MonadChain m => Peer -> SockAddr -> m ()
+syncPeer p a = do
+ $(logInfoS) "Chain" $ "Syncing against peer " <> peerString (Just a)
bb <- getBestBlockHeader
- gh <- syncHeaders bb p
+ gh <- syncHeaders bb (p, a)
MGetHeaders gh `sendMessage` p
chainMessage :: MonadChain m => ChainMessage -> m ()
chainMessage (ChainGetBest reply) =
getBestBlockHeader >>= atomically . reply
chainMessage (ChainHeaders p hs) = do
- s <- peerString p
+ s <- peerString <$> peerAddr p
$(logDebugS) "Chain" $
"Processing " <> cs (show (length hs)) <> " headers from peer " <> s
processHeaders p hs
chainMessage (ChainPeerConnected p a) = do
- $(logDebugS) "Chain" $ "Adding new peer to sync queue: " <> cs (show a)
- addPeer p
+ let s = peerString (Just a)
+ $(logDebugS) "Chain" $ "Adding new peer to sync queue: " <> s
+ addPeer (p, a)
syncNewPeer
chainMessage (ChainPeerDisconnected p a) = do
- $(logWarnS) "Chain" $ "Removing a peer from sync queue: " <> cs (show a)
- finishPeer p
+ let s = peerString (Just a)
+ $(logWarnS) "Chain" $ "Removing a peer from sync queue: " <> s
+ finishPeer (p, a)
syncNewPeer
chainMessage (ChainGetAncestor h n reply) =
getAncestor h n >>= atomically . reply
@@ -158,15 +165,15 @@ chainMessage (ChainGetBlock h reply) =
chainMessage (ChainIsSynced reply) =
isSynced >>= atomically . reply
chainMessage ChainPing = do
- ChainConfig {chainConfManager = mgr, chainConfTimeout = to} <- asks myReader
+ ChainConfig {chainConfTimeout = to} <- asks myReader
now <- liftIO getCurrentTime
lastMessage >>= \case
Nothing -> return ()
- Just (p, t)
+ Just ((p, a), t)
| diffUTCTime now t > fromIntegral to -> do
- s <- peerString p
+ let s = peerString (Just a)
$(logErrorS) "Chain" $ "Syncing peer timed out: " <> s
- managerKill PeerTimeout p mgr
+ PeerTimeout `killPeer` p
| otherwise -> return ()
withSyncLoop :: (MonadUnliftIO m, MonadLoggerIO m) => Chain -> m a -> m a
@@ -178,9 +185,12 @@ withSyncLoop ch f = withAsync go $ \a -> link a >> f
liftIO (randomRIO (250 * 1000, 1000 * 1000))
ChainPing `send` ch
-peerString :: MonadChain m => Peer -> m Text
-peerString p = do
- ChainConfig {chainConfManager = mgr} <- asks myReader
- managerGetPeer p mgr >>= \case
- Nothing -> return "[unknown]"
- Just o -> return . cs . show $ onlinePeerAddress o
+peerAddr :: MonadChain m => Peer -> m (Maybe SockAddr)
+peerAddr p =
+ asks chainState >>= readTVarIO >>= \s ->
+ let ps = newPeers s <> maybeToList (fst <$> syncingPeer s)
+ in return $ snd <$> find ((== p) . fst) ps
+
+peerString :: Maybe SockAddr -> Text
+peerString Nothing = "[unknown]"
+peerString (Just a) = cs $ show a
diff --git a/src/Network/Haskoin/Node/Common.hs b/src/Network/Haskoin/Node/Common.hs
index 7031baa..dff6c5d 100644
--- a/src/Network/Haskoin/Node/Common.hs
+++ b/src/Network/Haskoin/Node/Common.hs
@@ -130,9 +130,6 @@ data ManagerConfig = ManagerConfig
data ManagerMessage
= ManagerConnect
-- ^ try to connect to peers
- | ManagerKill !PeerException
- !Peer
- -- ^ kill this peer with supplied exception
| ManagerGetPeers !(Listen [OnlinePeer])
-- ^ get all connected peers
| ManagerGetOnlinePeer !Peer !(Listen (Maybe OnlinePeer))
@@ -151,9 +148,7 @@ data ManagerMessage
-- | Configuration for chain syncing process.
data ChainConfig = ChainConfig
{ chainConfDB :: !DB
- -- ^ RocksDB database handle
- , chainConfManager :: !Manager
- -- ^ peer manager mailbox
+ -- ^ database handle
, chainConfNetwork :: !Network
-- ^ network constants
, chainConfEvents :: !(Listen ChainEvent)
@@ -264,6 +259,7 @@ data PeerEvent
-- | Incoming messages that a peer accepts.
data PeerMessage
= GetPublisher !(Listen (Publisher Message))
+ | KillPeer !PeerException
| SendMessage !Message
-- | Resolve a host and port to a list of 'SockAddr'. May make use DNS resolver.
@@ -318,8 +314,8 @@ managerGetPeer :: MonadIO m => Peer -> Manager -> m (Maybe OnlinePeer)
managerGetPeer p mgr = ManagerGetOnlinePeer p `query` mgr
-- | Ask manager to kill a peer with the provided exception.
-managerKill :: MonadIO m => PeerException -> Peer -> Manager -> m ()
-managerKill e p mgr = ManagerKill e p `send` mgr
+killPeer :: MonadIO m => PeerException -> Peer -> m ()
+killPeer e p = KillPeer e `send` p
-- | Internal function used by manager to check peers periodically.
managerCheck :: MonadIO m => Peer -> Manager -> m ()
diff --git a/src/Network/Haskoin/Node/Manager.hs b/src/Network/Haskoin/Node/Manager.hs
index 7172637..cbf8955 100644
--- a/src/Network/Haskoin/Node/Manager.hs
+++ b/src/Network/Haskoin/Node/Manager.hs
@@ -214,7 +214,6 @@ managerMessage ManagerConnect = do
getNewPeer >>= \case
Nothing -> return ()
Just sa -> connectPeer sa
-managerMessage (ManagerKill e p) = killPeer e p
managerMessage (ManagerPeerDied a e) = processPeerOffline a e
managerMessage ManagerPurgePeers = do
$(logWarnS) "Manager" "Purging connected peers and peer database"
@@ -260,14 +259,6 @@ pingPeer p = do
$(logWarnS) "Manager" $
"Will not ping peer " <> s <> " until handshake complete"
-killPeer :: MonadManager m => PeerException -> Peer -> m ()
-killPeer e p = void . runMaybeT $ do
- b <- asks onlinePeers
- o <- MaybeT . atomically $ findPeer b p
- s <- atomically $ peerString b p
- $(logErrorS) "Manager" $ "Killing peer " <> s <> ": " <> cs (show e)
- onlinePeerAsync o `cancelWith` e
-
processPeerOffline :: MonadManager m => Child -> Maybe SomeException -> m ()
processPeerOffline a e = do
b <- asks onlinePeers
diff --git a/src/Network/Haskoin/Node/Peer.hs b/src/Network/Haskoin/Node/Peer.hs
index 829ec9a..1a6d95e 100644
--- a/src/Network/Haskoin/Node/Peer.hs
+++ b/src/Network/Haskoin/Node/Peer.hs
@@ -47,10 +47,11 @@ peer pc inbox = withConnection a $ \ad -> runReaderT (peer_session ad) pc
a = peerConfAddress pc
go = forever $ receive inbox >>= dispatchMessage pc
net = peerConfNetwork pc
+ p = inboxToMailbox inbox
peer_session ad =
let ins = appSource ad
ons = appSink ad
- src = runConduit $ ins .| inPeerConduit net a .| mapM_C send_msg
+ src = runConduit $ ins .| inPeerConduit net a p .| mapM_C send_msg
snk = outPeerConduit net .| ons
in withAsync src $ \as -> do
link as
@@ -66,36 +67,38 @@ dispatchMessage cfg (SendMessage msg) = do
yield msg
dispatchMessage cfg (GetPublisher reply) =
atomically $ reply (peerConfListen cfg)
+dispatchMessage _ (KillPeer e) =
+ throwIO e
-- | Internal conduit to parse messages coming from peer.
inPeerConduit ::
MonadLoggerIO m
=> Network
-> SockAddr
+ -> Peer
-> ConduitT ByteString Message m ()
-inPeerConduit net a = do
+inPeerConduit net a p = forever $ do
x <- takeCE 24 .| foldC
case decode x of
Left _ -> do
$(logErrorS)
(peerString a)
"Could not decode incoming message header"
- throwIO DecodeHeaderError
+ DecodeHeaderError `killPeer` p
Right (MessageHeader _ _ len _) -> do
when (len > 32 * 2 ^ (20 :: Int)) $ do
$(logErrorS) (peerString a) "Payload too large"
- throwIO $ PayloadTooLarge len
+ PayloadTooLarge len `killPeer` p
y <- takeCE (fromIntegral len) .| foldC
case runGet (getMessage net) $ x `B.append` y of
Left e -> do
$(logErrorS) (peerString a) $
"Cannot decode payload: " <> cs (show e)
- throwIO CannotDecodePayload
+ CannotDecodePayload `killPeer` p
Right msg -> do
$(logDebugS) (peerString a) $
"Incoming: " <> cs (commandToString (msgType msg))
yield msg
- inPeerConduit net a
-- | Outgoing peer conduit to serialize and send messages.
outPeerConduit :: Monad m => Network -> ConduitT Message ByteString m ()