summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorxenog <>2018-10-09 16:03:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2018-10-09 16:03:00 (GMT)
commit8d64d14eb95814285d20e584048dd49a5129271d (patch)
tree474235397bebd97ab6a55191c6894efbf2279ba6
parenta268a529b02c730442c671f6d78e82c62eaa9ada (diff)
version 0.7.00.7.0
-rw-r--r--CHANGELOG.md22
-rw-r--r--haskoin-node.cabal22
-rw-r--r--src/Haskoin/Node.hs118
-rw-r--r--src/Network/Haskoin/Node/Chain.hs377
-rw-r--r--src/Network/Haskoin/Node/Chain/Logic.hs284
-rw-r--r--src/Network/Haskoin/Node/Common.hs599
-rw-r--r--src/Network/Haskoin/Node/Manager.hs874
-rw-r--r--src/Network/Haskoin/Node/Manager/Logic.hs406
-rw-r--r--src/Network/Haskoin/Node/Peer.hs391
-rw-r--r--test/Haskoin/NodeSpec.hs147
-rw-r--r--test/Spec.hs220
11 files changed, 1829 insertions, 1631 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 40382eb..d9a780c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,28 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
+## 0.7.0
+### Added
+- Versioning for chain and peer database.
+- Automatic purging of chain and peer database when version changes.
+- Add extra timers.
+- Add publishers to every peer.
+
+### Changed
+- Full reimplementation of node API.
+- Simplify peer selection and management.
+- Merge manager and peer events.
+- Rename configuration variables for node.
+- Separate logic from actors for peer manager and chain.
+
+### Removed
+- Remove irrelevant fields from peer information.
+- Remove unreliable peer block head tracking.
+- Remove dependency on deprecated binary conduits.
+- Remove Bloom filter support from manager.
+- Remove unreliable peer request tracking code.
+- Remove separate manager events.
+
## 0.6.1
### Changed
- Fix bug where peer height did not update in certain cases.
diff --git a/haskoin-node.cabal b/haskoin-node.cabal
index 8f7819f..bc2c34b 100644
--- a/haskoin-node.cabal
+++ b/haskoin-node.cabal
@@ -2,10 +2,10 @@
--
-- see: https://github.com/sol/hpack
--
--- hash: 442a53cc8f60132054fb6ad206821818652bc1a8f6f0f0661f2c0609db575380
+-- hash: 0e33d7c5993dd152a270e0957b4aec8b09e67fb8f760b01945022ae62df8ac7b
name: haskoin-node
-version: 0.6.1
+version: 0.7.0
synopsis: Haskoin Node P2P library for Bitcoin and Bitcoin Cash
description: Bitcoin and Bitcoin Cash peer-to-peer protocol library featuring headers-first synchronisation.
category: Bitcoin, Finance, Network
@@ -28,11 +28,13 @@ source-repository head
library
exposed-modules:
Haskoin.Node
- other-modules:
Network.Haskoin.Node.Chain
+ Network.Haskoin.Node.Chain.Logic
Network.Haskoin.Node.Common
Network.Haskoin.Node.Manager
+ Network.Haskoin.Node.Manager.Logic
Network.Haskoin.Node.Peer
+ other-modules:
Paths_haskoin_node
hs-source-dirs:
src
@@ -42,6 +44,8 @@ library
, cereal
, conduit
, conduit-extra
+ , data-default
+ , hashable
, haskoin-core
, monad-logger
, mtl
@@ -52,20 +56,24 @@ library
, rocksdb-haskell
, rocksdb-query
, string-conversions
+ , text
, time
+ , transformers
, unliftio
+ , unordered-containers
default-language: Haskell2010
-test-suite haskoin-node-test
+test-suite spec
type: exitcode-stdio-1.0
main-is: Spec.hs
other-modules:
+ Haskoin.NodeSpec
Paths_haskoin_node
hs-source-dirs:
test
- ghc-options: -threaded -rtsopts -with-rtsopts=-N
build-depends:
- base >=4.7 && <5
+ HUnit
+ , base >=4.7 && <5
, bytestring
, cereal
, haskoin-core
@@ -77,5 +85,7 @@ test-suite haskoin-node-test
, nqe
, random
, rocksdb-haskell
+ , safe
, unliftio
default-language: Haskell2010
+ build-tool-depends: hspec-discover:hspec-discover
diff --git a/src/Haskoin/Node.hs b/src/Haskoin/Node.hs
index 8d150a5..d1a4fbe 100644
--- a/src/Haskoin/Node.hs
+++ b/src/Haskoin/Node.hs
@@ -1,25 +1,38 @@
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
+{-|
+Module : Haskoin.Node
+Copyright : No rights reserved
+License : UNLICENSE
+Maintainer : xenog@protonmail.com
+Stability : experimental
+Portability : POSIX
+
+Integrates peers, manager, and block header synchronization processes.
+
+Messages from peers that aren't consumed by the peer manager or chain are
+forwarded to the listen action provided in the node configuration.
+-}
module Haskoin.Node
- ( Host, Port, HostPort
- , Peer, Chain, Manager
+ ( Host
+ , Port
+ , HostPort
+ , Peer
+ , Chain
+ , Manager
, OnlinePeer(..)
, NodeConfig(..)
, NodeEvent(..)
- , ManagerEvent(..)
, ChainEvent(..)
, PeerEvent(..)
, PeerException(..)
, withNode
- , managerGetPeerVersion
- , managerGetPeerBest
+ , node
, managerGetPeers
, managerGetPeer
, managerKill
- , setManagerFilter
, sendMessage
- , getMerkleBlocks
, peerGetBlocks
, peerGetTxs
, chainGetBlock
@@ -33,46 +46,75 @@ module Haskoin.Node
) where
import Control.Monad.Logger
+import Haskoin
import Network.Haskoin.Node.Chain
import Network.Haskoin.Node.Common
import Network.Haskoin.Node.Manager
import NQE
import UnliftIO
+-- | Launch a node in the background. Pass a 'Manager' and 'Chain' to a
+-- function. Node will stop once the function ends.
withNode ::
- ( MonadLoggerIO m
- , MonadUnliftIO m
- )
+ (MonadLoggerIO m, MonadUnliftIO m)
=> NodeConfig
-> ((Manager, Chain) -> m a)
-> m a
withNode cfg f = do
- c <- newInbox =<< newTQueueIO
- m <- newInbox =<< newTQueueIO
- withAsync (chain (chain_conf c m)) $ \ch ->
- withAsync (manager (manager_conf c m)) $ \mgr -> do
- link ch
- link mgr
- f (m, c)
+ mgr_inbox <- newInbox
+ ch_inbox <- newInbox
+ withAsync (node cfg mgr_inbox ch_inbox) $ \a -> do
+ link a
+ f (inboxToMailbox mgr_inbox, inboxToMailbox ch_inbox)
+
+-- | Launch node process in the foreground.
+node ::
+ ( MonadLoggerIO m
+ , MonadUnliftIO m
+ )
+ => NodeConfig
+ -> Inbox ManagerMessage
+ -> Inbox ChainMessage
+ -> m ()
+node cfg mgr_inbox ch_inbox = do
+ let mgr_config =
+ ManagerConfig
+ { mgrConfMaxPeers = nodeConfMaxPeers cfg
+ , mgrConfDB = nodeConfDB cfg
+ , mgrConfPeers = nodeConfPeers cfg
+ , mgrConfDiscover = nodeConfDiscover cfg
+ , mgrConfNetAddr = nodeConfNetAddr cfg
+ , mgrConfNetwork = nodeConfNet cfg
+ , mgrConfEvents = mgr_events
+ , mgrConfTimeout = nodeConfTimeout cfg
+ }
+ withAsync (manager mgr_config mgr_inbox) $ \mgr_async -> do
+ link mgr_async
+ let chain_config =
+ ChainConfig
+ { chainConfDB = nodeConfDB cfg
+ , chainConfManager = mgr
+ , chainConfNetwork = nodeConfNet cfg
+ , chainConfEvents = chain_events
+ , chainConfTimeout = nodeConfTimeout cfg
+ }
+ chain chain_config ch_inbox
where
- chain_conf c m =
- ChainConfig
- { chainConfDB = database cfg
- , chainConfListener = nodeEvents cfg . ChainEvent
- , chainConfManager = m
- , chainConfChain = c
- , chainConfNetwork = nodeNet cfg
- }
- manager_conf c m =
- ManagerConfig
- { mgrConfMaxPeers = maxPeers cfg
- , mgrConfDB = database cfg
- , mgrConfDiscover = discover cfg
- , mgrConfMgrListener = nodeEvents cfg . ManagerEvent
- , mgrConfPeerListener = nodeEvents cfg . PeerEvent
- , mgrConfNetAddr = netAddress cfg
- , mgrConfPeers = initPeers cfg
- , mgrConfManager = m
- , mgrConfChain = c
- , mgrConfNetwork = nodeNet cfg
- }
+ ch = inboxToMailbox ch_inbox
+ mgr = inboxToMailbox mgr_inbox
+ mgr_events event =
+ case event of
+ PeerMessage p (MHeaders (Headers hcs)) ->
+ ChainHeaders p (map fst hcs) `sendSTM` ch
+ PeerConnected p a -> do
+ ChainPeerConnected p a `sendSTM` ch
+ Event (PeerEvent event) `sendSTM` nodeConfEvents cfg
+ PeerDisconnected p a -> do
+ ChainPeerDisconnected p a `sendSTM` ch
+ Event (PeerEvent event) `sendSTM` nodeConfEvents cfg
+ _ -> Event (PeerEvent event) `sendSTM` nodeConfEvents cfg
+ chain_events event = do
+ Event (ChainEvent event) `sendSTM` nodeConfEvents cfg
+ case event of
+ ChainBestBlock b -> ManagerBestBlock (nodeHeight b) `sendSTM` mgr
+ _ -> return ()
diff --git a/src/Network/Haskoin/Node/Chain.hs b/src/Network/Haskoin/Node/Chain.hs
index 0074f8a..c762e49 100644
--- a/src/Network/Haskoin/Node/Chain.hs
+++ b/src/Network/Haskoin/Node/Chain.hs
@@ -5,269 +5,182 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
-{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE UndecidableInstances #-}
-{-# OPTIONS_GHC -fno-warn-orphans #-}
+{-|
+Module : Network.Haskoin.Node.Chain
+Copyright : No rights reserved
+License : UNLICENSE
+Maintainer : xenog@protonmail.com
+Stability : experimental
+Portability : POSIX
+
+Block chain headers synchronizing process.
+-}
module Network.Haskoin.Node.Chain
-( chain
-) where
+ ( chain
+ ) where
import Control.Monad
import Control.Monad.Logger
import Control.Monad.Reader
-import qualified Data.ByteString as BS
-import Data.Either
-import Data.List (delete, nub)
-import Data.Maybe
-import Data.Serialize
-import Data.String
import Data.String.Conversions
-import Database.RocksDB (DB)
-import Database.RocksDB.Query as R
+import Data.Text (Text)
+import Data.Time.Clock
import Network.Haskoin.Block
import Network.Haskoin.Network
+import Network.Haskoin.Node.Chain.Logic
import Network.Haskoin.Node.Common
import NQE
+import System.Random
import UnliftIO
+import UnliftIO.Concurrent
type MonadChain m
- = ( BlockHeaders m
- , MonadLoggerIO m
- , MonadReader ChainReader m)
-
-data ChainState = ChainState
- { syncingPeer :: !(Maybe Peer)
- , newPeers :: ![Peer]
- , mySynced :: !Bool
- }
-
-data ChainReader = ChainReader
- { headerDB :: !DB
- , myConfig :: !ChainConfig
- , chainState :: !(TVar ChainState)
- }
-
-newtype BlockHeaderKey = BlockHeaderKey BlockHash deriving (Eq, Show)
-
-instance Serialize BlockHeaderKey where
- get = do
- guard . (== 0x90) =<< getWord8
- BlockHeaderKey <$> get
- put (BlockHeaderKey bh) = do
- putWord8 0x90
- put bh
-
-data BestBlockKey = BestBlockKey deriving (Eq, Show)
-
-instance KeyValue BlockHeaderKey BlockNode
-instance KeyValue BestBlockKey BlockNode
-
-instance Serialize BestBlockKey where
- get = do
- guard . (== 0x91) =<< getWord8
- return BestBlockKey
- put BestBlockKey = putWord8 0x91
-
-instance (Monad m, MonadLoggerIO m, MonadReader ChainReader m) =>
- BlockHeaders m where
- addBlockHeader bn = do
- db <- asks headerDB
- insert db (BlockHeaderKey (headerHash (nodeHeader bn))) bn
- getBlockHeader bh = do
- db <- asks headerDB
- retrieve db Nothing (BlockHeaderKey bh)
- getBestBlockHeader = do
- db <- asks headerDB
- retrieve db Nothing BestBlockKey >>= \case
- Nothing -> error "Could not get best block from database"
- Just b -> return b
- setBestBlockHeader bn = do
- db <- asks headerDB
- insert db BestBlockKey bn
- addBlockHeaders bns = do
- db <- asks headerDB
- writeBatch db (map f bns)
- where
- f bn = insertOp (BlockHeaderKey (headerHash (nodeHeader bn))) bn
+ = (MonadLoggerIO m, MonadChainLogic ChainConfig Peer m)
+-- | Launch process to synchronize block headers in current thread.
chain ::
- ( MonadUnliftIO m
- , MonadLoggerIO m
- )
+ (MonadUnliftIO m, MonadLoggerIO m)
=> ChainConfig
+ -> Inbox ChainMessage
-> m ()
-chain cfg = do
+chain cfg inbox = do
st <-
newTVarIO
- ChainState {syncingPeer = Nothing, mySynced = False, newPeers = []}
- let rd =
- ChainReader
- {myConfig = cfg, headerDB = chainConfDB cfg, chainState = st}
- run `runReaderT` rd
+ ChainState
+ { syncingPeer = Nothing
+ , mySynced = False
+ , newPeers = []
+ }
+ let rd = ChainReader {myReader = cfg, myChainDB = db, chainState = st}
+ withSyncLoop ch $ run `runReaderT` rd
where
net = chainConfNetwork cfg
+ db = chainConfDB cfg
+ ch = inboxToMailbox inbox
run = do
- db <- asks headerDB
- m :: Maybe BlockNode <- retrieve db Nothing BestBlockKey
- when (isNothing m) $ do
- addBlockHeader (genesisNode net)
- insert db BestBlockKey (genesisNode net)
- forever $ do
- msg <- receive $ chainConfChain cfg
- processChainMessage msg
-
-processChainMessage :: MonadChain m => ChainMessage -> m ()
-processChainMessage (ChainNewHeaders p hcs) = do
- stb <- asks chainState
- st <- readTVarIO stb
- net <- chainConfNetwork <$> asks myConfig
- let spM = syncingPeer st
- t <- computeTime
- bb <- getBestBlockHeader
- bhsE <- connectBlocks net t (map fst hcs)
- case bhsE of
- Right bhs -> conn bb bhs spM
- Left e -> do
- $(logWarnS) "Chain" $ "Could not connect headers: " <> cs e
- case spM of
- Nothing -> do
- bb' <- getBestBlockHeader
- atomically . modifyTVar stb $ \s ->
- s {newPeers = nub $ p : newPeers s}
- syncHeaders bb' p
- Just sp
- | sp == p -> do
- pstr <- peerString p
- $(logErrorS) "Chain" $
- "Syncing peer " <> pstr <> " sent bad headers"
- mgr <- chainConfManager <$> asks myConfig
- managerKill PeerSentBadHeaders p mgr
- atomically . modifyTVar stb $ \s ->
- s {syncingPeer = Nothing}
- processSyncQueue
- | otherwise ->
- atomically . modifyTVar stb $ \s ->
- s {newPeers = nub $ p : newPeers s}
- where
- synced = do
- st <- asks chainState
- atomically . modifyTVar st $ \s -> s {syncingPeer = Nothing}
- MSendHeaders `sendMessage` p
- processSyncQueue
- upeer bb = do
- mgr <- chainConfManager <$> asks myConfig
- managerSetPeerBest p bb mgr
- conn bb bhs spM = do
- bb' <- getBestBlockHeader
- when (bb /= bb') $ do
+ $(logDebugS) "Chain" "Initializing..."
+ initChainDB net
+ getBestBlockHeader >>= chainEvent . ChainBestBlock
+ $(logInfoS) "Chain" "Initialization complete"
+ forever $ receive inbox >>= chainMessage
+
+chainEvent :: MonadChain m => ChainEvent -> m ()
+chainEvent e = do
+ l <- chainConfEvents <$> asks myReader
+ case e of
+ ChainBestBlock b ->
$(logInfoS) "Chain" $
- "Best header at height " <> cs (show (nodeHeight bb'))
- mgr <- chainConfManager <$> asks myConfig
- managerSetBest bb' mgr
- l <- chainConfListener <$> asks myConfig
- atomically . l $ ChainNewBest bb'
- case length hcs of
- 0 -> do
- upeer bb
- synced
- 2000 ->
- case spM of
- Just sp
- | sp == p -> do
- upeer $ head bhs
- syncHeaders (head bhs) p
- _ -> do
- st <- asks chainState
- atomically . modifyTVar st $ \s ->
- s {newPeers = nub $ p : newPeers s}
- _ -> do
- upeer $ head bhs
- synced
-
-processChainMessage (ChainNewPeer p) = do
- st <- asks chainState
- sp <-
- atomically $ do
- modifyTVar st $ \s -> s {newPeers = nub $ p : newPeers s}
- syncingPeer <$> readTVar st
- case sp of
- Nothing -> processSyncQueue
- Just _ -> return ()
-
--- Getting a new block should trigger an action equivalent to getting a new peer
-processChainMessage (ChainNewBlocks p _) = processChainMessage (ChainNewPeer p)
-
-processChainMessage (ChainRemovePeer p) = do
- st <- asks chainState
- sp <-
- atomically $ do
- modifyTVar st $ \s -> s {newPeers = delete p (newPeers s)}
- syncingPeer <$> readTVar st
- case sp of
- Just p' ->
- when (p == p') $ do
- atomically . modifyTVar st $ \s ->
- s {syncingPeer = Nothing}
- processSyncQueue
- Nothing -> return ()
+ "Best block header at height " <> cs (show (nodeHeight b))
+ ChainSynced b ->
+ $(logInfoS) "Chain" $
+ "Headers now synced at height " <> cs (show (nodeHeight b))
+ atomically $ l e
+
+processHeaders ::
+ MonadChain m => Peer -> [BlockHeader] -> m ()
+processHeaders p hs = do
+ s <- peerString p
+ net <- chainConfNetwork <$> asks myReader
+ mgr <- chainConfManager <$> asks myReader
+ $(logDebugS) "Chain" $
+ "Importing " <> cs (show (length hs)) <> " headers from peer " <> s
+ importHeaders net hs >>= \case
+ Left e -> do
+ $(logErrorS) "Chain" $
+ "Could not connect headers sent by peer " <> s <> ": " <>
+ cs (show e)
+ managerKill e p mgr
+ Right done -> do
+ setLastReceived
+ best <- getBestBlockHeader
+ chainEvent $ ChainBestBlock best
+ if done
+ then do
+ $(logDebugS) "Chain" $
+ "Finished importing headers from peer: " <> s
+ MSendHeaders `sendMessage` p
+ finishPeer p
+ syncNewPeer
+ syncNotif
+ else syncPeer p
+
+syncNewPeer :: MonadChain m => m ()
+syncNewPeer = do
+ $(logDebugS) "Chain" "Attempting to sync against a new peer"
+ getSyncingPeer >>= \case
+ Nothing -> do
+ $(logDebugS) "Chain" "Getting next peer to sync from"
+ nextPeer >>= \case
+ Nothing ->
+ $(logInfoS) "Chain" "Finished syncing against all peers"
+ Just p -> syncPeer p
+ Just p -> do
+ s <- peerString p
+ $(logDebugS) "Chain" $ "Already syncing against peer " <> s
+
+syncNotif :: MonadChain m => m ()
+syncNotif =
+ notifySynced >>= \x ->
+ when x $ getBestBlockHeader >>= chainEvent . ChainSynced
+
+syncPeer :: MonadChain m => Peer -> m ()
+syncPeer p = do
+ s <- peerString p
+ $(logInfoS) "Chain" $ "Syncing against peer " <> s
+ bb <- getBestBlockHeader
+ gh <- syncHeaders bb p
+ MGetHeaders gh `sendMessage` p
-processChainMessage (ChainGetBest reply) =
+chainMessage :: MonadChain m => ChainMessage -> m ()
+chainMessage (ChainGetBest reply) =
getBestBlockHeader >>= atomically . reply
-
-processChainMessage (ChainGetAncestor h n reply) =
+chainMessage (ChainHeaders p hs) = do
+ s <- peerString p
+ $(logDebugS) "Chain" $
+ "Processing " <> cs (show (length hs)) <> " headers from peer " <> s
+ processHeaders p hs
+chainMessage (ChainPeerConnected p a) = do
+ $(logDebugS) "Chain" $ "Adding new peer to sync queue: " <> cs (show a)
+ addPeer p
+ syncNewPeer
+chainMessage (ChainPeerDisconnected p a) = do
+ $(logWarnS) "Chain" $ "Removing a peer from sync queue: " <> cs (show a)
+ finishPeer p
+ syncNewPeer
+chainMessage (ChainGetAncestor h n reply) =
getAncestor h n >>= atomically . reply
-
-processChainMessage (ChainGetSplit r l reply) =
+chainMessage (ChainGetSplit r l reply) =
splitPoint r l >>= atomically . reply
-
-processChainMessage (ChainGetBlock h reply) =
+chainMessage (ChainGetBlock h reply) =
getBlockHeader h >>= atomically . reply
-
-processChainMessage (ChainSendHeaders _) = return ()
-
-processChainMessage (ChainIsSynced reply) = do
- st <- asks chainState
- s <- mySynced <$> readTVarIO st
- atomically (reply s)
-
-processSyncQueue :: MonadChain m => m ()
-processSyncQueue = do
- s <- asks chainState >>= readTVarIO
- when (isNothing (syncingPeer s)) $ getBestBlockHeader >>= go s
+chainMessage (ChainIsSynced reply) =
+ isSynced >>= atomically . reply
+chainMessage ChainPing = do
+ ChainConfig {chainConfManager = mgr, chainConfTimeout = to} <- asks myReader
+ now <- liftIO getCurrentTime
+ lastMessage >>= \case
+ Nothing -> return ()
+ Just (p, t)
+ | diffUTCTime now t > fromIntegral to -> do
+ s <- peerString p
+ $(logErrorS) "Chain" $ "Syncing peer timed out: " <> s
+ managerKill PeerTimeout p mgr
+ | otherwise -> return ()
+
+withSyncLoop :: (MonadUnliftIO m, MonadLoggerIO m) => Chain -> m a -> m a
+withSyncLoop ch f = withAsync go $ \a -> link a >> f
where
- go s bb =
- case newPeers s of
- [] ->
- unless (mySynced s) $ do
- l <- chainConfListener <$> asks myConfig
- st <- asks chainState
- atomically $ do
- l (ChainSynced bb)
- writeTVar st s {mySynced = True}
- p:_ -> syncHeaders bb p
-
-syncHeaders :: MonadChain m => BlockNode -> Peer -> m ()
-syncHeaders bb p = do
- st <- asks chainState
- s <- readTVarIO st
- atomically . writeTVar st $
- s {syncingPeer = Just p, newPeers = delete p (newPeers s)}
- loc <- blockLocator bb
- let m =
- MGetHeaders
- GetHeaders
- { getHeadersVersion = myVersion
- , getHeadersBL = loc
- , getHeadersHashStop =
- fromRight (error "Could not decode zero hash") . decode $
- BS.replicate 32 0
- }
- PeerOutgoing m `send` p
+ go =
+ forever $ do
+ threadDelay =<<
+ liftIO (randomRIO (250 * 1000, 1000 * 1000))
+ ChainPing `send` ch
-peerString :: (MonadChain m, IsString a) => Peer -> m a
+peerString :: MonadChain m => Peer -> m Text
peerString p = do
- mgr <- chainConfManager <$> asks myConfig
- managerGetPeer mgr p >>= \case
+ ChainConfig {chainConfManager = mgr} <- asks myReader
+ managerGetPeer p mgr >>= \case
Nothing -> return "[unknown]"
- Just o -> return $ fromString $ show (onlinePeerAddress o)
+ Just o -> return . cs . show $ onlinePeerAddress o
diff --git a/src/Network/Haskoin/Node/Chain/Logic.hs b/src/Network/Haskoin/Node/Chain/Logic.hs
new file mode 100644
index 0000000..ea34e7f
--- /dev/null
+++ b/src/Network/Haskoin/Node/Chain/Logic.hs
@@ -0,0 +1,284 @@
+{-# LANGUAGE ConstraintKinds #-}
+{-# LANGUAGE ExistentialQuantification #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE UndecidableInstances #-}
+{-# OPTIONS_GHC -fno-warn-orphans #-}
+{-|
+Module : Network.Haskoin.Node.Chain.Logic
+Copyright : No rights reserved
+License : UNLICENSE
+Maintainer : xenog@protonmail.com
+Stability : experimental
+Portability : POSIX
+
+State and code for block header synchronization.
+-}
+module Network.Haskoin.Node.Chain.Logic where
+
+import Control.Arrow
+import Control.Monad
+import Control.Monad.Except
+import Control.Monad.Reader
+import Control.Monad.Trans.Maybe
+import qualified Data.ByteString as B
+import Data.Default
+import Data.List
+import Data.Maybe
+import Data.Serialize as S
+import Data.Time.Clock
+import Data.Time.Clock.POSIX
+import Data.Word
+import Database.RocksDB (DB)
+import qualified Database.RocksDB as R
+import Database.RocksDB.Query as R
+import Haskoin
+import Network.Haskoin.Node.Common
+import UnliftIO
+import UnliftIO.Resource
+
+-- | Version of the database.
+dataVersion :: Word32
+dataVersion = 1
+
+-- | Database key for version.
+data ChainDataVersionKey = ChainDataVersionKey
+ deriving (Eq, Ord, Show)
+
+instance Key ChainDataVersionKey
+instance KeyValue ChainDataVersionKey Word32
+
+instance Serialize ChainDataVersionKey where
+ get = do
+ guard . (== 0x92) =<< S.getWord8
+ return ChainDataVersionKey
+ put ChainDataVersionKey = S.putWord8 0x92
+
+-- | Mutable state for the header chain process.
+data ChainState p = ChainState
+ { syncingPeer :: !(Maybe (p, UTCTime))
+ -- ^ peer to sync against and time of last received message
+ , newPeers :: ![p]
+ -- ^ queue of peers to sync against
+ , mySynced :: !Bool
+ -- ^ has the header chain ever been considered synced?
+ }
+
+-- | Key for block header in database.
+newtype BlockHeaderKey = BlockHeaderKey BlockHash deriving (Eq, Show)
+
+instance Serialize BlockHeaderKey where
+ get = do
+ guard . (== 0x90) =<< getWord8
+ BlockHeaderKey <$> get
+ put (BlockHeaderKey bh) = do
+ putWord8 0x90
+ put bh
+
+-- | Key for best block in database.
+data BestBlockKey = BestBlockKey deriving (Eq, Show)
+
+instance KeyValue BlockHeaderKey BlockNode
+instance KeyValue BestBlockKey BlockNode
+
+instance Serialize BestBlockKey where
+ get = do
+ guard . (== 0x91) =<< getWord8
+ return BestBlockKey
+ put BestBlockKey = putWord8 0x91
+
+-- | Type alias for monad commonly used in this module.
+type MonadChainLogic a p m
+ = (BlockHeaders m, MonadReader (ChainReader a p) m)
+
+-- | Reader for header synchronization code.
+data ChainReader a p = ChainReader
+ { myReader :: !a
+ -- ^ placeholder for upstream data
+ , myChainDB :: !DB
+ -- ^ database handle
+ , chainState :: !(TVar (ChainState p))
+ -- ^ mutable state for header synchronization
+ }
+
+instance (Monad m, MonadIO m, MonadReader (ChainReader a p) m) =>
+ BlockHeaders m where
+ addBlockHeader bn = do
+ db <- asks myChainDB
+ R.insert db (BlockHeaderKey (headerHash (nodeHeader bn))) bn
+ getBlockHeader bh = do
+ db <- asks myChainDB
+ retrieve db def (BlockHeaderKey bh)
+ getBestBlockHeader = do
+ db <- asks myChainDB
+ retrieve db def BestBlockKey >>= \case
+ Nothing -> error "Could not get best block from database"
+ Just b -> return b
+ setBestBlockHeader bn = do
+ db <- asks myChainDB
+ R.insert db BestBlockKey bn
+ addBlockHeaders bns = do
+ db <- asks myChainDB
+ writeBatch db (map f bns)
+ where
+ f bn = insertOp (BlockHeaderKey (headerHash (nodeHeader bn))) bn
+
+-- | Initialize header database. If version is different from current, the
+-- database is purged of conflicting elements first.
+initChainDB :: (MonadChainLogic a p m, MonadUnliftIO m) => Network -> m ()
+initChainDB net = do
+ db <- asks myChainDB
+ ver <- retrieve db def ChainDataVersionKey
+ when (ver /= Just dataVersion) purgeChainDB
+ R.insert db ChainDataVersionKey dataVersion
+ retrieve db def BestBlockKey >>= \b ->
+ when (isNothing (b :: Maybe BlockNode)) $ do
+ addBlockHeader (genesisNode net)
+ setBestBlockHeader (genesisNode net)
+
+-- | Purge database of elements having keys that may conflict with those used in
+-- this module.
+purgeChainDB :: (MonadChainLogic a p m, MonadUnliftIO m) => m ()
+purgeChainDB = do
+ db <- asks myChainDB
+ runResourceT . R.withIterator db def $ \it -> do
+ R.iterSeek it $ B.singleton 0x90
+ recurse_delete it db
+ where
+ recurse_delete it db =
+ R.iterKey it >>= \case
+ Nothing -> return ()
+ Just k
+ | B.head k == 0x90 || B.head k == 0x91 -> do
+ R.delete db def k
+ R.iterNext it
+ recurse_delete it db
+ | otherwise -> return ()
+
+-- | Import a bunch of continuous headers. Returns 'True' if the number of
+-- headers is 2000, which means that there are possibly more headers to sync
+-- from whatever peer delivered these.
+importHeaders ::
+ (MonadIO m, BlockHeaders m)
+ => Network
+ -> [BlockHeader]
+ -> m (Either PeerException Bool)
+importHeaders net hs =
+ runExceptT $ do
+ now <- fromIntegral <$> computeTime
+ lift (connectBlocks net now hs) >>= \case
+ Right _ ->
+ case length hs of
+ 2000 -> return False
+ _ -> return True
+ Left _ ->
+ throwError PeerSentBadHeaders
+
+-- | Check if best block header is in sync with the rest of the block chain by
+-- comparing the best block with the current time, verifying that there are no
+-- peers in the queue to be synced, and no peer is being synced at the moment.
+-- This function will only return 'True' once. It should be used to decide
+-- whether to notify other processes that the header chain has been synced. The
+-- state of the chain will be flipped to synced when this function returns
+-- 'True'.
+notifySynced :: (MonadIO m, MonadChainLogic a p m) => m Bool
+notifySynced =
+ fmap isJust $
+ runMaybeT $ do
+ bb <- getBestBlockHeader
+ now <- liftIO getCurrentTime
+ let bt =
+ posixSecondsToUTCTime . realToFrac . blockTimestamp $
+ nodeHeader bb
+ guard (diffUTCTime now bt < 2 * 60 * 60)
+ st <- asks chainState
+ MaybeT . atomically . runMaybeT $ do
+ s <- lift $ readTVar st
+ guard . isNothing $ syncingPeer s
+ guard . null $ newPeers s
+ guard . not $ mySynced s
+ lift $ writeTVar st s {mySynced = True}
+ return ()
+
+-- | Get next peer to sync against from the queue.
+nextPeer :: (MonadIO m, MonadChainLogic a p m) => m (Maybe p)
+nextPeer = listToMaybe . newPeers <$> (asks chainState >>= readTVarIO)
+
+-- | Set a syncing peer and generate a 'GetHeaders' data structure with a block
+-- locator to send to that peer for syncing.
+syncHeaders ::
+ (Eq p, MonadChainLogic a p m, MonadIO m)
+ => BlockNode
+ -> p
+ -> m GetHeaders
+syncHeaders bb p = do
+ st <- asks chainState
+ now <- liftIO getCurrentTime
+ atomically . modifyTVar st $ \s ->
+ s {syncingPeer = Just (p, now), newPeers = delete p (newPeers s)}
+ loc <- blockLocator bb
+ return
+ GetHeaders
+ { getHeadersVersion = myVersion
+ , getHeadersBL = loc
+ , getHeadersHashStop =
+ "0000000000000000000000000000000000000000000000000000000000000000"
+ }
+
+-- | Set the time of last received data to now if a syncing peer is active.
+setLastReceived :: (MonadChainLogic a p m, MonadIO m) => m ()
+setLastReceived = do
+ st <- asks chainState
+ now <- liftIO getCurrentTime
+ atomically . modifyTVar st $ \s ->
+ s {syncingPeer = second (const now) <$> syncingPeer s}
+
+-- | Add a new peer to the queue of peers to sync against.
+addPeer :: (Eq p, MonadIO m, MonadChainLogic a p m) => p -> m ()
+addPeer p = do
+ st <- asks chainState
+ atomically . modifyTVar st $ \s -> s {newPeers = nub (p : newPeers s)}
+
+-- | Get syncing peer if there is one.
+getSyncingPeer :: (MonadChainLogic a p m, MonadIO m) => m (Maybe p)
+getSyncingPeer = fmap fst . syncingPeer <$> (readTVarIO =<< asks chainState)
+
+-- | Set syncing peer to the pone provided.
+setSyncingPeer :: (MonadChainLogic a p m, MonadIO m) => p -> m ()
+setSyncingPeer p = do
+ now <- liftIO getCurrentTime
+ asks chainState >>= \v ->
+ atomically . modifyTVar v $ \s -> s {syncingPeer = Just (p, now)}
+
+-- | Return 'True' if the chain has ever been considered synced. it will always
+-- return 'True', even if the chain gets out of sync for any reason.
+isSynced :: (MonadChainLogic a p m, MonadIO m) => m Bool
+isSynced = mySynced <$> (asks chainState >>= readTVarIO)
+
+-- | Set chain as synced.
+setSynced :: (MonadChainLogic a p m, MonadIO m) => m ()
+setSynced =
+ asks chainState >>= \v ->
+ atomically . modifyTVar v $ \s -> s {mySynced = True}
+
+-- | Remove a peer from the queue of peers to sync and unset the syncing peer if
+-- it is set to the provided value.
+finishPeer :: (Eq p, MonadIO m, MonadChainLogic a p m) => p -> m ()
+finishPeer p =
+ asks chainState >>= \st ->
+ atomically . modifyTVar st $ \s ->
+ s
+ { newPeers = delete p (newPeers s)
+ , syncingPeer =
+ case syncingPeer s of
+ Just (p', _)
+ | p == p' -> Nothing
+ _ -> syncingPeer s
+ }
+
+-- | Return the syncing peer and time of last communication received, if any.
+lastMessage :: (MonadChainLogic a p m, MonadIO m) => m (Maybe (p, UTCTime))
+lastMessage = syncingPeer <$> (readTVarIO =<< asks chainState)
diff --git a/src/Network/Haskoin/Node/Common.hs b/src/Network/Haskoin/Node/Common.hs
index 64c185f..1bdb0fd 100644
--- a/src/Network/Haskoin/Node/Common.hs
+++ b/src/Network/Haskoin/Node/Common.hs
@@ -2,9 +2,26 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
+{-|
+Module : Network.Haskoin.Node.Common
+Copyright : No rights reserved
+License : UNLICENSE
+Maintainer : xenog@protonmail.com
+Stability : experimental
+Portability : POSIX
+
+Common functions used by Haskoin Node.
+-}
module Network.Haskoin.Node.Common where
-import Data.ByteString (ByteString)
+import Conduit
+import Control.Monad
+import Control.Monad.Trans.Maybe
+import Data.Conduit.Network
+import Data.Function
+import Data.List
+import Data.Maybe
+import Data.String.Conversions
import Data.Time.Clock
import Data.Time.Clock.POSIX
import Data.Word
@@ -13,287 +30,244 @@ import Network.Haskoin.Block
import Network.Haskoin.Constants
import Network.Haskoin.Network
import Network.Haskoin.Transaction
-import Network.Socket (AddrInfo (..), AddrInfoFlag (..),
- Family (..), NameInfoFlag (..),
- SockAddr (..), SocketType (..),
- addrAddress, defaultHints,
- getAddrInfo, getNameInfo)
+import Network.Socket hiding (send)
import NQE
+import System.Random
import Text.Read
import UnliftIO
+-- | Type alias for a combination of hostname and port.
type HostPort = (Host, Port)
+
+-- | Type alias for a hostname.
type Host = String
+
+-- | Type alias for a port number.
type Port = Int
-- | Data structure representing an online peer.
data OnlinePeer = OnlinePeer
- { onlinePeerAddress :: !SockAddr
+ { onlinePeerAddress :: !SockAddr
-- ^ network address
- , onlinePeerConnected :: !Bool
- -- ^ has it finished handshake
- , onlinePeerVersion :: !Word32
+ , onlinePeerVerAck :: !Bool
+ -- ^ got version acknowledgement from peer
+ , onlinePeerConnected :: !Bool
+ -- ^ peer is connected and ready
+ , onlinePeerVersion :: !(Maybe Version)
-- ^ protocol version
- , onlinePeerServices :: !Word64
- -- ^ services field
- , onlinePeerRemoteNonce :: !Word64
- -- ^ random nonce sent by peer
- , onlinePeerUserAgent :: !ByteString
- -- ^ user agent string
- , onlinePeerRelay :: !Bool
- -- ^ peer will relay transactions (BIP-37)
- , onlinePeerBestBlock :: !BlockNode
- -- ^ estimated best block that peer has
- , onlinePeerAsync :: !(Async ())
+ , onlinePeerAsync :: !(Async ())
-- ^ peer asynchronous process
- , onlinePeerMailbox :: !Peer
+ , onlinePeerMailbox :: !Peer
-- ^ peer mailbox
- , onlinePeerNonce :: !Word64
+ , onlinePeerNonce :: !Word64
-- ^ random nonce sent during handshake
- , onlinePeerPings :: ![NominalDiffTime]
+ , onlinePeerPing :: !(Maybe (UTCTime, Word64))
+ -- ^ last sent ping time and nonce
+ , onlinePeerPings :: ![NominalDiffTime]
-- ^ last few ping rountrip duration
}
--- | Mailbox for a peer process.
-type Peer = Inbox PeerMessage
+instance Eq OnlinePeer where
+ (==) = (==) `on` f
+ where
+ f OnlinePeer {onlinePeerMailbox = p} = p
+
+instance Ord OnlinePeer where
+ compare = compare `on` f
+ where
+ f OnlinePeer {onlinePeerPings = pings} = fromMaybe 60 (median pings)
--- | Mailbox for chain headers process.
-type Chain = Inbox ChainMessage
+-- | Mailbox for a peer.
+type Peer = Mailbox PeerMessage
+
+-- | Mailbox for chain header syncing process.
+type Chain = Mailbox ChainMessage
-- | Mailbox for peer manager process.
-type Manager = Inbox ManagerMessage
+type Manager = Mailbox ManagerMessage
--- | Node configuration. Mailboxes for manager and chain processes must be
--- created before launching the node. The node will start those processes and
--- receive any messages sent to those mailboxes.
+-- | General node configuration.
data NodeConfig = NodeConfig
- { maxPeers :: !Int
+ { nodeConfMaxPeers :: !Int
-- ^ maximum number of connected peers allowed
- , database :: !DB
- -- ^ RocksDB database handler
- , initPeers :: ![HostPort]
+ , nodeConfDB :: !DB
+ -- ^ database handler
+ , nodeConfPeers :: ![HostPort]
-- ^ static list of peers to connect to
- , discover :: !Bool
+ , nodeConfDiscover :: !Bool
-- ^ activate peer discovery
- , nodeEvents :: !(Listen NodeEvent)
- -- ^ listener for events originated by the node
- , netAddress :: !NetworkAddress
+ , nodeConfNetAddr :: !NetworkAddress
-- ^ network address for the local host
- , nodeNet :: !Network
+ , nodeConfNet :: !Network
-- ^ network constants
+ , nodeConfEvents :: !(Publisher NodeEvent)
+ -- ^ node events are sent to this publisher
+ , nodeConfTimeout :: !Int
+ -- ^ timeout in seconds
}
--- | Peer manager configuration. Mailbox must be created before starting the
--- process.
+-- | Peer manager configuration.
data ManagerConfig = ManagerConfig
- { mgrConfMaxPeers :: !Int
+ { mgrConfMaxPeers :: !Int
-- ^ maximum number of peers to connect to
- , mgrConfDB :: !DB
- -- ^ RocksDB database handler to store peer information
- , mgrConfPeers :: ![HostPort]
+ , mgrConfDB :: !DB
+ -- ^ database handler to store peer information
+ , mgrConfPeers :: ![HostPort]
-- ^ static list of peers to connect to
- , mgrConfDiscover :: !Bool
+ , mgrConfDiscover :: !Bool
-- ^ activate peer discovery
- , mgrConfMgrListener :: !(Listen ManagerEvent)
- -- ^ listener for events originating from peer manager
- , mgrConfPeerListener :: !(Listen (Peer, PeerEvent))
- -- ^ listener for events originating from individual peers
- , mgrConfNetAddr :: !NetworkAddress
+ , mgrConfNetAddr :: !NetworkAddress
-- ^ network address for the local host
- , mgrConfManager :: !Manager
- -- ^ peer manager mailbox
- , mgrConfChain :: !Chain
- -- ^ chain process mailbox
- , mgrConfNetwork :: !Network
+ , mgrConfNetwork :: !Network
-- ^ network constants
+ , mgrConfEvents :: !(Listen PeerEvent)
+ -- ^ send manager and peer messages to this mailbox
+ , mgrConfTimeout :: !Int
+ -- ^ timeout in seconds
}
--- | Event originating from the node. Aggregates events from the peer manager,
--- chain, and any connected peers.
-data NodeEvent
- = ManagerEvent !ManagerEvent
- -- ^ event originating from peer manager
- | ChainEvent !ChainEvent
- -- ^ event originating from chain process
- | PeerEvent !(Peer, PeerEvent)
- -- ^ event originating from a peer
-
--- | Peer manager event.
-data ManagerEvent
- = ManagerConnect !Peer
- -- ^ a new peer connected and its handshake completed
- | ManagerDisconnect !Peer
- -- ^ a peer disconnected
-
-- | Messages that can be sent to the peer manager.
data ManagerMessage
- = ManagerSetFilter !BloomFilter
- -- ^ set a bloom filter in all peers
- | ManagerSetBest !BlockNode
- -- ^ set our best block
- | ManagerPing
- -- ^ internal timer signal that triggers housekeeping tasks
- | ManagerGetAddr !Peer
- -- ^ peer requests all peers we know about
- | ManagerNewPeers !Peer
- ![NetworkAddressTime]
- -- ^ peer sent list of peers it knows about
+ = ManagerConnect
+ -- ^ try to connect to peers
| ManagerKill !PeerException
!Peer
- -- ^ please kill this peer with supplied exception
- | ManagerSetPeerBest !Peer
- !BlockNode
- -- ^ set best block for this peer
- | ManagerGetPeerBest !Peer
- !(Reply (Maybe BlockNode))
- -- ^ get best block that manager thinks peer has
- | ManagerSetPeerVersion !Peer
- !Version
- -- ^ set version for this peer
- | ManagerGetPeerVersion !Peer
- !(Reply (Maybe Word32))
- -- ^ get protocol version for this peer
- | ManagerGetPeers !(Reply [OnlinePeer])
+ -- ^ kill this peer with supplied exception
+ | ManagerGetPeers !(Listen [OnlinePeer])
-- ^ get all connected peers
- | ManagerGetOnlinePeer !Peer !(Reply (Maybe OnlinePeer))
+ | ManagerGetOnlinePeer !Peer !(Listen (Maybe OnlinePeer))
-- ^ get a peer information
- | ManagerPeerPing !Peer
- !NominalDiffTime
- -- ^ add a peer roundtrip time for this peer
- | PeerStopped !(Async (), Either SomeException ())
- -- ^ peer corresponding to 'Async' has stopped
-
--- | Configuration for the chain process.
+ | ManagerPurgePeers
+ -- ^ delete all known peers
+ | ManagerCheckPeer !Peer
+ -- ^ check this peer
+ | ManagerPeerMessage !Peer !Message
+ -- ^ peer got a message that is forwarded to manager
+ | ManagerPeerDied !Child !(Maybe SomeException)
+ -- ^ child died
+ | ManagerBestBlock !BlockHeight
+ -- ^ set this as our best block
+
+-- | Configuration for chain syncing process.
data ChainConfig = ChainConfig
- { chainConfDB :: !DB
+ { chainConfDB :: !DB
-- ^ RocksDB database handle
- , chainConfListener :: !(Listen ChainEvent)
- -- ^ listener for events originating from the chain process
- , chainConfManager :: !Manager
+ , chainConfManager :: !Manager
-- ^ peer manager mailbox
- , chainConfChain :: !Chain
- -- ^ chain process mailbox
- , chainConfNetwork :: !Network
+ , chainConfNetwork :: !Network
-- ^ network constants
+ , chainConfEvents :: !(Listen ChainEvent)
+ -- ^ send header chain events here
+ , chainConfTimeout :: !Int
+ -- ^ timeout in seconds
}
-- | Messages that can be sent to the chain process.
data ChainMessage
- = ChainNewHeaders !Peer
- ![BlockHeaderCount]
- -- ^ peer sent some block headers
- | ChainNewPeer !Peer
- -- ^ a new peer connected
- | ChainRemovePeer !Peer
- -- ^ a peer disconnected
- | ChainGetBest !(Reply BlockNode)
+ = ChainGetBest !(Listen BlockNode)
-- ^ get best block known
+ | ChainHeaders !Peer
+ ![BlockHeader]
| ChainGetAncestor !BlockHeight
!BlockNode
- !(Reply (Maybe BlockNode))
+ !(Listen (Maybe BlockNode))
-- ^ get ancestor for 'BlockNode' at 'BlockHeight'
| ChainGetSplit !BlockNode
!BlockNode
- !(Reply BlockNode)
+ !(Listen BlockNode)
-- ^ get highest common node
| ChainGetBlock !BlockHash
- !(Reply (Maybe BlockNode))
+ !(Listen (Maybe BlockNode))
-- ^ get a block header
- | ChainNewBlocks !Peer ![BlockHash]
- -- ^ peer sent block inventory
- | ChainSendHeaders !Peer
- -- ^ peer asks for our block headers in the future
- | ChainIsSynced !(Reply Bool)
+ | ChainIsSynced !(Listen Bool)
-- ^ is chain in sync with network?
-
--- | Events originating from chain process.
+ | ChainPing
+ -- ^ internal message for process housekeeping
+ | ChainPeerConnected !Peer
+ !SockAddr
+ -- ^ internal message to notify that a peer has connected
+ | ChainPeerDisconnected !Peer
+ !SockAddr
+ -- ^ internal message to notify that a peer has disconnected
+
+-- | Events originating from chain syncing process.
data ChainEvent
- = ChainNewBest !BlockNode
+ = ChainBestBlock !BlockNode
-- ^ chain has new best block
| ChainSynced !BlockNode
-- ^ chain is in sync with the network
deriving (Eq, Show)
+-- | Chain and peer events generated by the node.
+data NodeEvent
+ = ChainEvent !ChainEvent
+ -- ^ events from the chain syncing process
+ | PeerEvent !PeerEvent
+ -- ^ events from peers and peer manager
+ deriving Eq
+
-- | Configuration for a particular peer.
data PeerConfig = PeerConfig
- { peerConfConnect :: !NetworkAddress
- -- ^ address of remote peer
- , peerConfLocal :: !NetworkAddress
- -- ^ our address to send to peer
- , peerConfManager :: !Manager
- -- ^ peer manager mailbox
- , peerConfChain :: !Chain
- -- ^ chain process mailbox
- , peerConfListener :: !(Listen (Peer, PeerEvent))
- -- ^ listener for peer events
- , peerConfNonce :: !Word64
- -- ^ our random nonce to send to peer
- , peerConfNetwork :: !Network
+ { peerConfListen :: !(Publisher Message)
+ -- ^ Send peer messages to publisher
+ , peerConfNetwork :: !Network
-- ^ network constants
+ , peerConfAddress :: !SockAddr
+ -- ^ peer address
}
-- | Reasons why a peer may stop working.
data PeerException
= PeerMisbehaving !String
- -- ^ peer was a naughty boy
- | DecodeMessageError !String
- -- ^ incoming message could not be decoded
- | CannotDecodePayload !String
+ -- ^ peer is being a naughty boy
+ | DuplicateVersion
+ -- ^ peer sent an extra version message
+ | DecodeHeaderError
+ -- ^ incoming message headers could not be decoded
+ | CannotDecodePayload
-- ^ incoming message payload could not be decoded
| PeerIsMyself
-- ^ nonce for peer matches ours
| PayloadTooLarge !Word32
-- ^ message payload too large
| PeerAddressInvalid
- -- ^ peer address did not parse with 'fromSockAddr'
- | BloomFiltersNotSupported
- -- ^ peer does not support bloom filters
+ -- ^ peer address not valid
| PeerSentBadHeaders
-- ^ peer sent wrong headers
| NotNetworkPeer
- -- ^ peer is SPV and cannot serve blockchain data
+ -- ^ peer cannot serve block chain data
| PeerNoSegWit
-- ^ peer has no segwit support
| PeerTimeout
-- ^ request to peer timed out
+ | PurgingPeer
+ -- ^ peers are being purged
+ | UnknownPeer
+ -- ^ peer is unknown
deriving (Eq, Show)
instance Exception PeerException
--- | Events originating from a peer.
+-- | Events originating from peers and the peer manager.
data PeerEvent
- = TxAvail ![TxHash]
- -- ^ peer sent transaction inventory
- | GotBlock !Block
- -- ^ peer sent a 'Block'
- | GotMerkleBlock !MerkleBlock
- -- ^ peer sent a 'MerkleBlock'
- | GotTx !Tx
- -- ^ peer sent a 'Tx'
- | GotPong !Word64
- -- ^ peer responded to a 'Ping'
- | SendBlocks !GetBlocks
- -- ^ peer is requesting some blocks
- | SendHeaders !GetHeaders
- -- ^ peer is requesting some headers
- | SendData ![InvVector]
- -- ^ per is requesting an inventory
- | TxNotFound !TxHash
- -- ^ peer could not find transaction
- | BlockNotFound !BlockHash
- -- ^ peer could not find block
- | WantMempool
- -- ^ peer wants our mempool
- | Rejected !Reject
- -- ^ peer rejected something we sent
-
--- | Internal type for peer messages.
+ = PeerConnected !Peer
+ !SockAddr
+ -- ^ new peer connected
+ | PeerDisconnected !Peer
+ !SockAddr
+ -- ^ peer disconnected
+ | PeerMessage !Peer
+ !Message
+ -- ^ peer sent a message
+ deriving Eq
+
+-- | Incoming messages that a peer accepts.
data PeerMessage
- = PeerOutgoing !Message
- | PeerIncoming !Message
+ = GetPublisher !(Listen (Publisher Message))
+ | SendMessage !Message
--- | Convert a host and port into a list of matching 'SockAddr'.
-toSockAddr :: (MonadUnliftIO m) => HostPort -> m [SockAddr]
+-- | Resolve a host and port to a list of 'SockAddr'. May make use DNS resolver.
+toSockAddr :: MonadUnliftIO m => HostPort -> m [SockAddr]
toSockAddr (host, port) = go `catch` e
where
go =
@@ -310,141 +284,178 @@ toSockAddr (host, port) = go `catch` e
e :: Monad m => SomeException -> m [SockAddr]
e _ = return []
--- | Convert a 'SockAddr' into a host and port.
+-- | Convert a 'SockAddr' to a a numeric host and port.
fromSockAddr ::
(MonadUnliftIO m) => SockAddr -> m (Maybe HostPort)
fromSockAddr sa = go `catch` e
where
go = do
- (hostM, portM) <- liftIO (getNameInfo flags True True sa)
- return $ (,) <$> hostM <*> (readMaybe =<< portM)
+ (maybe_host, maybe_port) <- liftIO (getNameInfo flags True True sa)
+ return $ (,) <$> maybe_host <*> (readMaybe =<< maybe_port)
flags = [NI_NUMERICHOST, NI_NUMERICSERV]
e :: Monad m => SomeException -> m (Maybe a)
e _ = return Nothing
-- | Integer current time in seconds from 1970-01-01T00:00Z.
-computeTime :: MonadIO m => m Word32
+computeTime :: MonadIO m => m Word64
computeTime = round <$> liftIO getPOSIXTime
-- | Our protocol version.
myVersion :: Word32
myVersion = 70012
--- | Set best block in the manager.
-managerSetBest :: MonadIO m => BlockNode -> Manager -> m ()
-managerSetBest bn mgr = ManagerSetBest bn `send` mgr
-
--- | Set version of peer in manager.
-managerSetPeerVersion :: MonadIO m => Peer -> Version -> Manager -> m ()
-managerSetPeerVersion p v mgr = ManagerSetPeerVersion p v `send` mgr
-
--- | Get version of peer from manager.
-managerGetPeerVersion :: MonadIO m => Peer -> Manager -> m (Maybe Word32)
-managerGetPeerVersion p mgr = ManagerGetPeerVersion p `query` mgr
+-- | Internal function used by peer to send a message to the peer manager.
+managerPeerMessage :: MonadIO m => Peer -> Message -> Manager -> m ()
+managerPeerMessage p msg mgr = ManagerPeerMessage p msg `send` mgr
--- | Get best block for peer from manager.
-managerGetPeerBest :: MonadIO m => Peer -> Manager -> m (Maybe BlockNode)
-managerGetPeerBest p mgr = ManagerGetPeerBest p `query` mgr
-
--- | Set best block for peer in manager.
-managerSetPeerBest :: MonadIO m => Peer -> BlockNode -> Manager -> m ()
-managerSetPeerBest p bn mgr = ManagerSetPeerBest p bn `send` mgr
-
--- | Get list of peers from manager.
-managerGetPeers :: MonadIO m => Manager -> m [OnlinePeer]
+-- | Get list of connected peers from manager.
+managerGetPeers ::
+ MonadIO m => Manager -> m [OnlinePeer]
managerGetPeers mgr = ManagerGetPeers `query` mgr
--- | Get peer information for a peer from manager.
-managerGetPeer :: MonadIO m => Manager -> Peer -> m (Maybe OnlinePeer)
-managerGetPeer mgr p = ManagerGetOnlinePeer p `query` mgr
-
--- | Ask manager to send all known peers to a peer.
-managerGetAddr :: MonadIO m => Peer -> Manager -> m ()
-managerGetAddr p mgr = ManagerGetAddr p `send` mgr
+-- | Get information for an online peer from manager.
+managerGetPeer :: MonadIO m => Peer -> Manager -> m (Maybe OnlinePeer)
+managerGetPeer p mgr = ManagerGetOnlinePeer p `query` mgr
-- | Ask manager to kill a peer with the provided exception.
managerKill :: MonadIO m => PeerException -> Peer -> Manager -> m ()
managerKill e p mgr = ManagerKill e p `send` mgr
--- | Peer sends manager list of known peers.
-managerNewPeers ::
- MonadIO m => Peer -> [NetworkAddressTime] -> Manager -> m ()
-managerNewPeers p as mgr = ManagerNewPeers p as `send` mgr
+-- | Internal function used by manager to check peers periodically.
+managerCheck :: MonadIO m => Peer -> Manager -> m ()
+managerCheck p mgr = ManagerCheckPeer p `send` mgr
+
+-- | Internal function used to ask manager to connect to a new peer.
+managerConnect :: MonadIO m => Manager -> m ()
+managerConnect mgr = ManagerConnect `send` mgr
--- | Set bloom filters in peer manager.
-setManagerFilter :: MonadIO m => BloomFilter -> Manager -> m ()
-setManagerFilter bf mgr = ManagerSetFilter bf `send` mgr
+-- | Set the best block that the manager knows about.
+managerSetBest :: MonadIO m => BlockHeight -> Manager -> m ()
+managerSetBest bh mgr = ManagerBestBlock bh `send` mgr
-- | Send a network message to peer.
sendMessage :: MonadIO m => Message -> Peer -> m ()
-sendMessage msg p = PeerOutgoing msg `send` p
+sendMessage msg p = SendMessage msg `send` p
--- | Upload bloom filter to remote peer.
-peerSetFilter :: MonadIO m => BloomFilter -> Peer -> m ()
-peerSetFilter f p = MFilterLoad (FilterLoad f) `sendMessage` p
-
--- | Request Merkle blocks from peer.
-getMerkleBlocks ::
- (MonadIO m)
- => Peer
- -> [BlockHash]
- -> m ()
-getMerkleBlocks p bhs = PeerOutgoing (MGetData (GetData ivs)) `send` p
- where
- ivs = map (InvVector InvMerkleBlock . getBlockHash) bhs
-
--- | Request full blocks from peer.
+-- | Request full blocks from peer. Will return 'Nothing' if the list of blocks
+-- returned by the peer is incomplete, comes out of order, or a timeout is
+-- reached.
peerGetBlocks ::
- MonadIO m => Network -> Peer -> [BlockHash] -> m ()
-peerGetBlocks net p bhs = PeerOutgoing (MGetData (GetData ivs)) `send` p
+ MonadUnliftIO m
+ => Network
+ -> Int
+ -> Peer
+ -> [BlockHash]
+ -> m (Maybe [Block])
+peerGetBlocks net time p bhs =
+ runMaybeT $ mapM f =<< MaybeT (peerGetData time p (GetData ivs))
where
- con
+ f (Right b) = return b
+ f (Left _) = MaybeT $ return Nothing
+ c
| getSegWit net = InvWitnessBlock
| otherwise = InvBlock
- ivs = map (InvVector con . getBlockHash) bhs
+ ivs = map (InvVector c . getBlockHash) bhs
--- | Request transactions from peer.
-peerGetTxs :: MonadIO m => Network -> Peer -> [TxHash] -> m ()
-peerGetTxs net p ths = PeerOutgoing (MGetData (GetData ivs)) `send` p
+-- | Request transactions from peer. Will return 'Nothing' if the list of
+-- transactions returned by the peer is incomplete, comes out of order, or a
+-- timeout is reached.
+peerGetTxs ::
+ MonadUnliftIO m
+ => Network
+ -> Int
+ -> Peer
+ -> [TxHash]
+ -> m (Maybe [Tx])
+peerGetTxs net time p ths =
+ runMaybeT $ mapM f =<< MaybeT (peerGetData time p (GetData ivs))
where
- con
+ f (Right _) = MaybeT $ return Nothing
+ f (Left t) = return t
+ c
| getSegWit net = InvWitnessTx
| otherwise = InvTx
- ivs = map (InvVector con . getTxHash) ths
-
--- | Build my version structure.
+ ivs = map (InvVector c . getTxHash) ths
+
+-- | Request transactions and/or blocks from peer. Return maybe if any single
+-- inventory fails to be retrieved, if they come out of order, or if timeout is
+-- reached.
+peerGetData ::
+ MonadUnliftIO m => Int -> Peer -> GetData -> m (Maybe [Either Tx Block])
+peerGetData time p gd@(GetData ivs) =
+ runMaybeT $ do
+ pub <- MaybeT $ queryS time GetPublisher p
+ MaybeT $
+ withSubscription pub $ \sub -> do
+ MGetData gd `sendMessage` p
+ r <- liftIO randomIO
+ MPing (Ping r) `sendMessage` p
+ join <$>
+ timeout
+ (time * 1000 * 1000)
+ (runMaybeT (get_thing sub r [] ivs))
+ where
+ get_thing _ _ acc [] = return $ reverse acc
+ get_thing sub r acc hss@(InvVector t h:hs) =
+ receive sub >>= \case
+ MTx tx
+ | is_tx t && getTxHash (txHash tx) == h ->
+ get_thing sub r (Left tx : acc) hs
+ MBlock b@(Block bh _)
+ | is_block t && getBlockHash (headerHash bh) == h ->
+ get_thing sub r (Right b : acc) hs
+ MNotFound (NotFound nvs)
+ | not (null (nvs `union` hs)) -> MaybeT $ return Nothing
+ MPong (Pong r')
+ | r == r' -> MaybeT $ return Nothing
+ _
+ | null acc -> get_thing sub r acc hss
+ | otherwise -> MaybeT $ return Nothing
+ is_tx InvWitnessTx = True
+ is_tx InvTx = True
+ is_tx _ = False
+ is_block InvWitnessBlock = True
+ is_block InvBlock = True
+ is_block _ = False
+
+-- | Ping a peer and await response. Return 'False' if response not received
+-- before timeout.
+peerPing :: MonadUnliftIO m => Int -> Peer -> m Bool
+peerPing time p =
+ fmap isJust . runMaybeT $ do
+ pub <- MaybeT $ queryS time GetPublisher p
+ MaybeT $
+ withSubscription pub $ \sub -> do
+ r <- liftIO randomIO
+ MPing (Ping r) `sendMessage` p
+ receiveMatchS time sub $ \case
+ MPong (Pong r')
+ | r == r' -> Just ()
+ _ -> Nothing
+
+-- | Create version data structure.
buildVersion ::
- MonadIO m
- => Network
+ Network
-> Word64
-> BlockHeight
-> NetworkAddress
-> NetworkAddress
- -> m Version
-buildVersion net nonce height loc rmt = do
- time <- fromIntegral <$> computeTime
- return
- Version
- { version = myVersion
- , services = naServices loc
- , timestamp = time
- , addrRecv = rmt
- , addrSend = loc
- , verNonce = nonce
- , userAgent = VarString (getHaskoinUserAgent net)
- , startHeight = height
- , relay = True
- }
-
--- | Notify chain of a new peer that connected.
-chainNewPeer :: MonadIO m => Peer -> Chain -> m ()
-chainNewPeer p ch = ChainNewPeer p `send` ch
-
--- | Notify chain that a peer has disconnected.
-chainRemovePeer :: MonadIO m => Peer -> Chain -> m ()
-chainRemovePeer p ch = ChainRemovePeer p `send` ch
-
--- | Get a block header from chain process.
+ -> Word64
+ -> Version
+buildVersion net nonce height loc rmt time =
+ Version
+ { version = myVersion
+ , services = naServices loc
+ , timestamp = time
+ , addrRecv = rmt
+ , addrSend = loc
+ , verNonce = nonce
+ , userAgent = VarString (getHaskoinUserAgent net)
+ , startHeight = height
+ , relay = True
+ }
+
+-- | Get a block header from 'Chain' process.
chainGetBlock :: MonadIO m => BlockHash -> Chain -> m (Maybe BlockNode)
chainGetBlock bh ch = ChainGetBlock bh `query` ch
@@ -475,6 +486,14 @@ chainGetSplitBlock ::
MonadIO m => BlockNode -> BlockNode -> Chain -> m BlockNode
chainGetSplitBlock l r c = ChainGetSplit l r `query` c
+-- | Notify chain that a new peer is connected.
+chainPeerConnected :: MonadIO m => Peer -> SockAddr -> Chain -> m ()
+chainPeerConnected p a ch = ChainPeerConnected p a `send` ch
+
+-- | Notify chain that a peer has disconnected.
+chainPeerDisconnected :: MonadIO m => Peer -> SockAddr -> Chain -> m ()
+chainPeerDisconnected p a ch = ChainPeerDisconnected p a `send` ch
+
-- | Is given 'BlockHash' in the main chain?
chainBlockMain :: MonadIO m => BlockHash -> Chain -> m Bool
chainBlockMain bh ch =
@@ -486,3 +505,25 @@ chainBlockMain bh ch =
-- | Is chain in sync with network?
chainIsSynced :: MonadIO m => Chain -> m Bool
chainIsSynced ch = ChainIsSynced `query` ch
+
+-- | Peer sends a bunch of headers to the chain process.
+chainHeaders :: MonadIO m => Peer -> [BlockHeader] -> Chain -> m ()
+chainHeaders p hs ch = ChainHeaders p hs `send` ch
+
+-- | Connect to a socket via TCP.
+withConnection ::
+ MonadUnliftIO m => SockAddr -> (AppData -> m a) -> m a
+withConnection na f =
+ fromSockAddr na >>= \case
+ Nothing -> throwIO PeerAddressInvalid
+ Just (host, port) ->
+ let cset = clientSettings port (cs host)
+ in runGeneralTCPClient cset f
+
+-- | Calculate the median value from a list. The list must not be empty.
+median :: Fractional a => [a] -> Maybe a
+median ls
+ | null ls = Nothing
+ | length ls `mod` 2 == 0 =
+ Just . (/ 2) . sum . take 2 $ drop (length ls `div` 2 - 1) ls
+ | otherwise = Just . head $ drop (length ls `div` 2) ls
diff --git a/src/Network/Haskoin/Node/Manager.hs b/src/Network/Haskoin/Node/Manager.hs
index d580a22..7172637 100644
--- a/src/Network/Haskoin/Node/Manager.hs
+++ b/src/Network/Haskoin/Node/Manager.hs
@@ -1,594 +1,380 @@
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
-{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TupleSections #-}
+{-|
+Module : Network.Haskoin.Node.Manager
+Copyright : No rights reserved
+License : UNLICENSE
+Maintainer : xenog@protonmail.com
+Stability : experimental
+Portability : POSIX
+
+Peer manager process.
+-}
module Network.Haskoin.Node.Manager
( manager
) where
+import Conduit
import Control.Applicative
import Control.Monad
import Control.Monad.Except
import Control.Monad.Logger
import Control.Monad.Reader
-import Data.Bits
-import qualified Data.ByteString as BS
-import Data.Conduit
-import qualified Data.Conduit.Combinators as CC
-import Data.Function
-import Data.List
-import Data.Maybe
-import Data.Serialize (Get, Put, Serialize, get, put)
-import qualified Data.Serialize as S
-import Data.String
+import Control.Monad.Trans.Maybe
import Data.String.Conversions
-import Data.Word
-import Database.RocksDB (DB)
-import Database.RocksDB.Query
+import Data.Time.Clock
+import Database.RocksDB (DB)
import Network.Haskoin.Block
import Network.Haskoin.Constants
import Network.Haskoin.Network
import Network.Haskoin.Node.Common
+import Network.Haskoin.Node.Manager.Logic
import Network.Haskoin.Node.Peer
-import Network.Socket (SockAddr (..))
+import Network.Socket (SockAddr (..))
import NQE
import System.Random
import UnliftIO
import UnliftIO.Concurrent
-import UnliftIO.Resource
-type MonadManager n m
- = ( MonadUnliftIO n
- , MonadLoggerIO n
- , MonadUnliftIO m
- , MonadLoggerIO m
- , MonadReader (ManagerReader n) m)
+-- | Monad used by most functions in this module.
+type MonadManager m = (MonadLoggerIO m, MonadReader ManagerReader m)
-data ManagerReader m = ManagerReader
- { mySelf :: !Manager
- , myChain :: !Chain
- , myConfig :: !ManagerConfig
- , myPeerDB :: !DB
- , myPeerSupervisor :: !(Inbox (SupervisorMessage m))
- , onlinePeers :: !(TVar [OnlinePeer])
- , myBloomFilter :: !(TVar (Maybe BloomFilter))
- , myBestBlock :: !(TVar BlockNode)
+-- | Reader for peer configuration and state.
+data ManagerReader = ManagerReader
+ { myConfig :: !ManagerConfig
+ , mySupervisor :: !Supervisor
+ , myMailbox :: !Manager
+ , myBestBlock :: !(TVar BlockHeight)
+ , onlinePeers :: !(TVar [OnlinePeer])
}
-data Priority
- = PriorityNetwork
- | PrioritySeed
- | PriorityManual
- deriving (Eq, Show, Ord)
-
-data PeerAddress
- = PeerAddress { getPeerAddress :: SockAddr }
- | PeerAddressBase
- deriving (Eq, Show)
-
-instance Serialize Priority where
- get =
- S.getWord8 >>= \case
- 0x00 -> return PriorityManual
- 0x01 -> return PrioritySeed
- 0x02 -> return PriorityNetwork
- _ -> mzero
- put PriorityManual = S.putWord8 0x00
- put PrioritySeed = S.putWord8 0x01
- put PriorityNetwork = S.putWord8 0x02
-
-instance Serialize PeerAddress where
- get = do
- guard . (== 0x81) =<< S.getWord8
- record <|> return PeerAddressBase
- where
- record = do
- getPeerAddress <- decodeSockAddr
- return PeerAddress {..}
- put PeerAddress {..} = do
- S.putWord8 0x81
- encodeSockAddr getPeerAddress
- put PeerAddressBase = S.putWord8 0x81
-
-data PeerTimeAddress
- = PeerTimeAddress { getPeerPrio :: !Priority
- , getPeerBanned :: !Word32
- , getPeerLastConnect :: !Word32
- , getPeerNextConnect :: !Word32
- , getPeerTimeAddress :: !PeerAddress }
- | PeerTimeAddressBase
- deriving (Eq, Show)
-
-instance Key PeerTimeAddress
-instance KeyValue PeerAddress PeerTimeAddress
-instance KeyValue PeerTimeAddress PeerAddress
-
-instance Serialize PeerTimeAddress where
- get = do
- guard . (== 0x80) =<< S.getWord8
- record <|> return PeerTimeAddressBase
- where
- record = do
- getPeerPrio <- S.get
- getPeerBanned <- S.get
- getPeerLastConnect <- (maxBound -) <$> S.get
- getPeerNextConnect <- S.get
- getPeerTimeAddress <- S.get
- return PeerTimeAddress {..}
- put PeerTimeAddress {..} = do
- S.putWord8 0x80
- S.put getPeerPrio
- S.put getPeerBanned
- S.put (maxBound - getPeerLastConnect)
- S.put getPeerNextConnect
- S.put getPeerTimeAddress
- put PeerTimeAddressBase = S.putWord8 0x80
-
-manager :: (MonadUnliftIO m, MonadLoggerIO m) => ManagerConfig -> m ()
-manager cfg = do
- psup <- newInbox =<< newTQueueIO
- withAsync (supervisor (Notify dead) psup []) $ \sup -> do
- link sup
- bb <- chainGetBest $ mgrConfChain cfg
- opb <- newTVarIO []
- bfb <- newTVarIO Nothing
- bbb <- newTVarIO bb
- withConnectLoop (mgrConfManager cfg) $ do
- let rd =
- ManagerReader
- { mySelf = mgrConfManager cfg
- , myChain = mgrConfChain cfg
- , myConfig = cfg
- , myPeerDB = mgrConfDB cfg
- , myPeerSupervisor = psup
- , onlinePeers = opb
- , myBloomFilter = bfb
- , myBestBlock = bbb
- }
- run `runReaderT` rd
+-- | Peer Manager process. In order to fully start it needs to receive a
+-- 'ManageBestBlock' event.
+manager ::
+ (MonadUnliftIO m, MonadLoggerIO m)
+ => ManagerConfig
+ -> Inbox ManagerMessage
+ -> m ()
+manager cfg inbox =
+ withSupervisor (Notify f) $ \sup -> do
+ bb <- newTVarIO 0
+ ob <- newTVarIO []
+ let rd =
+ ManagerReader
+ { myConfig = cfg
+ , mySupervisor = sup
+ , myMailbox = mgr
+ , myBestBlock = bb
+ , onlinePeers = ob
+ }
+ go `runReaderT` rd
where
- dead ex = PeerStopped ex `sendSTM` mgrConfManager cfg
- run = do
- connectNewPeers
- managerLoop
-
-resolvePeers :: (MonadUnliftIO m, MonadManager n m) => m [(SockAddr, Priority)]
-resolvePeers = do
- cfg <- asks myConfig
- let net = mgrConfNetwork cfg
- confPeers <-
- fmap
- (map (, PriorityManual) . concat)
- (mapM toSockAddr (mgrConfPeers cfg))
- if mgrConfDiscover cfg
- then do
- seedPeers <-
- fmap
- (map (, PrioritySeed) . concat)
- (mapM (toSockAddr . (, getDefaultPort net)) (getSeeds net))
- return (confPeers ++ seedPeers)
- else return confPeers
-
-encodeSockAddr :: SockAddr -> Put
-encodeSockAddr (SockAddrInet6 p _ (a, b, c, d) _) = do
- S.putWord32be a
- S.putWord32be b
- S.putWord32be c
- S.putWord32be d
- S.putWord16be (fromIntegral p)
-
-encodeSockAddr (SockAddrInet p a) = do
- S.putWord32be 0x00000000
- S.putWord32be 0x00000000
- S.putWord32be 0x0000ffff
- S.putWord32host a
- S.putWord16be (fromIntegral p)
-
-encodeSockAddr x = error $ "Colud not encode address: " <> show x
+ mgr = inboxToMailbox inbox
+ discover = mgrConfDiscover cfg
+ go = do
+ db <- getManagerDB
+ $(logDebugS) "Manager" "Initializing..."
+ initPeerDB db discover
+ putBestBlock <=< receiveMatch inbox $ \case
+ ManagerBestBlock b -> Just b
+ _ -> Nothing
+ $(logDebugS) "Manager" "Initialization complete"
+ withConnectLoop mgr $ \a -> do
+ link a
+ forever $ receive inbox >>= managerMessage
+ f (a, mex) = ManagerPeerDied a mex `sendSTM` mgr
+
+putBestBlock :: MonadManager m => BlockHeight -> m ()
+putBestBlock bb = asks myBestBlock >>= \b -> atomically $ writeTVar b bb
+
+getBestBlock :: MonadManager m => m BlockHeight
+getBestBlock = asks myBestBlock >>= readTVarIO
+
+getNetwork :: MonadManager m => m Network
+getNetwork = mgrConfNetwork <$> asks myConfig
+
+populatePeerDB :: (MonadUnliftIO m, MonadManager m) => m ()
+populatePeerDB = do
+ ManagerConfig { mgrConfPeers = statics
+ , mgrConfDB = db
+ , mgrConfDiscover = discover
+ } <- asks myConfig
+ cfg_peers <- concat <$> mapM toSockAddr statics
+ forM_ cfg_peers $ newPeerDB db staticPeerScore
+ when discover $ do
+ net <- getNetwork
+ net_seeds <- concat <$> mapM toSockAddr (networkSeeds net)
+ forM_ net_seeds $ newPeerDB db netSeedScore
+
+logConnectedPeers :: MonadManager m => m ()
+logConnectedPeers = do
+ m <- mgrConfMaxPeers <$> asks myConfig
+ l <- length <$> getConnectedPeers
+ $(logInfoS) "Manager" $
+ "Peers connected: " <> cs (show l) <> "/" <> cs (show m)
-decodeSockAddr :: Get SockAddr
-decodeSockAddr = do
- a <- S.getWord32be
- b <- S.getWord32be
- c <- S.getWord32be
- if a == 0x00000000 && b == 0x00000000 && c == 0x0000ffff
- then do
- d <- S.getWord32host
- p <- S.getWord16be
- return $ SockAddrInet (fromIntegral p) d
- else do
- d <- S.getWord32be
- p <- S.getWord16be
- return $ SockAddrInet6 (fromIntegral p) 0 (a, b, c, d) 0
+getManagerDB :: MonadManager m => m DB
+getManagerDB = mgrConfDB <$> asks myConfig
-connectPeer :: MonadManager n m => SockAddr -> m ()
-connectPeer sa = do
- db <- asks myPeerDB
- let k = PeerAddress sa
- retrieve db Nothing k >>= \case
- Nothing -> error "Could not find peer to mark connected"
- Just v -> do
- now <- computeTime
- let v' = v {getPeerLastConnect = now}
- writeBatch db [deleteOp v, insertOp v' k, insertOp k v']
- logPeersConnected
+getOnlinePeers :: MonadManager m => m [OnlinePeer]
+getOnlinePeers = asks onlinePeers >>= readTVarIO
+getConnectedPeers :: MonadManager m => m [OnlinePeer]
+getConnectedPeers = filter onlinePeerConnected <$> getOnlinePeers
-logPeersConnected :: MonadManager n m => m ()
-logPeersConnected = do
- mo <- mgrConfMaxPeers <$> asks myConfig
- ps <- getOnlinePeers
+purgePeers :: (MonadUnliftIO m, MonadManager m) => m ()
+purgePeers = do
+ db <- getManagerDB
+ ops <- getOnlinePeers
+ forM_ ops $ \OnlinePeer {onlinePeerMailbox = p} -> killPeer PurgingPeer p
+ purgePeerDB db
+
+forwardMessage :: MonadManager m => Peer -> Message -> m ()
+forwardMessage p msg = managerEvent $ PeerMessage p msg
+
+managerEvent :: MonadManager m => PeerEvent -> m ()
+managerEvent e = mgrConfEvents <$> asks myConfig >>= \l -> atomically $ l e
+
+managerMessage :: (MonadUnliftIO m, MonadManager m) => ManagerMessage -> m ()
+managerMessage (ManagerPeerMessage p (MVersion v)) = do
+ b <- asks onlinePeers
+ s <- atomically $ peerString b p
+ e <-
+ runExceptT $ do
+ let ua = getVarString $ userAgent v
+ $(logDebugS) "Manager" $
+ "Got version from peer " <> s <> ": " <> cs ua
+ o <- ExceptT . atomically $ setPeerVersion b p v
+ when (onlinePeerConnected o) $ announcePeer p
+ case e of
+ Right () -> do
+ $(logDebugS) "Manager" $ "Version accepted for peer " <> s
+ MVerAck `sendMessage` p
+ Left x -> do
+ $(logErrorS) "Manager" $
+ "Version rejected for peer " <> s <> ": " <> cs (show x)
+ killPeer x p
+managerMessage (ManagerPeerMessage p MVerAck) = do
+ b <- asks onlinePeers
+ s <- atomically $ peerString b p
+ atomically (setPeerVerAck b p) >>= \case
+ Just o -> do
+ $(logDebugS) "Manager" $ "Received verack from peer: " <> s
+ when (onlinePeerConnected o) $ announcePeer p
+ Nothing -> do
+ $(logErrorS) "Manager" $ "Received verack from unknown peer: " <> s
+ killPeer UnknownPeer p
+managerMessage (ManagerPeerMessage p (MAddr (Addr nas))) = do
+ db <- getManagerDB
+ b <- asks onlinePeers
+ s <- atomically $ peerString b p
$(logInfoS) "Manager" $
- "Peers connected: " <> cs (show (length ps)) <> "/" <> cs (show mo)
-
-storePeer :: MonadManager n m => SockAddr -> Priority -> m ()
-storePeer sa prio = do
- db <- asks myPeerDB
- let k = PeerAddress sa
- retrieve db Nothing k >>= \case
+ "Received " <> cs (show (length nas)) <> " peers from " <> s
+ let sas = map (naAddress . snd) nas
+ forM_ sas $ newPeerDB db netPeerScore
+managerMessage (ManagerPeerMessage p msg@(MPong (Pong n))) = do
+ now <- liftIO getCurrentTime
+ b <- asks onlinePeers
+ s <- atomically $ peerString b p
+ atomically (gotPong b n now p) >>= \case
Nothing -> do
- let v =
- PeerTimeAddress
- { getPeerPrio = prio
- , getPeerBanned = 0
- , getPeerLastConnect = 0
- , getPeerNextConnect = 0
- , getPeerTimeAddress = k
- }
- writeBatch
- db
- [insertOp v k, insertOp k v]
- Just v@PeerTimeAddress {..} ->
- when (getPeerPrio < prio) $ do
- let v' = v {getPeerPrio = prio}
- writeBatch
- db
- [deleteOp v, insertOp v' k, insertOp k v']
- Just PeerTimeAddressBase ->
- error "Key for peer is corrupted"
-
-banPeer :: MonadManager n m => SockAddr -> m ()
-banPeer sa = do
- db <- asks myPeerDB
- let k = PeerAddress sa
- retrieve db Nothing k >>= \case
- Nothing -> error "Cannot find peer to be banned"
- Just v -> do
- now <- computeTime
- let v' =
- v
- { getPeerBanned = now
- , getPeerNextConnect = now + 6 * 60 * 60
- }
- when (getPeerPrio v == PriorityNetwork) $ do
- $(logWarnS) "Manager" $ "Banning peer " <> cs (show sa)
- writeBatch db [deleteOp v, insertOp k v', insertOp v' k]
-
-backoffPeer :: MonadManager n m => SockAddr -> m ()
-backoffPeer sa = do
- db <- asks myPeerDB
- onlinePeers <- map onlinePeerAddress <$> getOnlinePeers
- let k = PeerAddress sa
- retrieve db Nothing k >>= \case
- Nothing -> error "Cannot find peer to backoff in database"
- Just v -> do
- now <- computeTime
- r <-
- liftIO . randomRIO $
- if null onlinePeers
- then (90, 300) -- Don't backoff so much if possibly offline
- else (900, 1800)
- let t = max (now + r) (getPeerNextConnect v)
- v' = v {getPeerNextConnect = t}
- when (getPeerPrio v == PriorityNetwork) $ do
+ $(logDebugS) "Manager" $
+ "Forwarding pong " <> cs (show n) <> " from " <> s
+ forwardMessage p msg
+ Just d -> do
+ let ms = d * 1000
+ $(logDebugS) "Manager" $
+ "Ping " <> cs (show n) <> " to " <> s <> " took " <>
+ cs (show ms) <>
+ " milliseconds"
+managerMessage (ManagerPeerMessage p (MPing (Ping n))) = do
+ b <- asks onlinePeers
+ s <- atomically $ peerString b p
+ $(logDebugS) "Manager" $
+ "Responding to ping " <> cs (show n) <> " from " <> s
+ MPong (Pong n) `sendMessage` p
+managerMessage (ManagerPeerMessage p msg) = do
+ b <- asks onlinePeers
+ s <- atomically $ peerString b p
+ let cmd = commandToString $ msgType msg
+ $(logDebugS) "Manager" $
+ "Forwarding message " <> cs cmd <> " from peer " <> s
+ forwardMessage p msg
+managerMessage (ManagerBestBlock h) = putBestBlock h
+managerMessage ManagerConnect = do
+ l <- length <$> getConnectedPeers
+ x <- mgrConfMaxPeers <$> asks myConfig
+ when (l < x) $
+ getNewPeer >>= \case
+ Nothing -> return ()
+ Just sa -> connectPeer sa
+managerMessage (ManagerKill e p) = killPeer e p
+managerMessage (ManagerPeerDied a e) = processPeerOffline a e
+managerMessage ManagerPurgePeers = do
+ $(logWarnS) "Manager" "Purging connected peers and peer database"
+ purgePeers
+managerMessage (ManagerGetPeers reply) =
+ getConnectedPeers >>= atomically . reply
+managerMessage (ManagerGetOnlinePeer p reply) = do
+ b <- asks onlinePeers
+ atomically $ findPeer b p >>= reply
+managerMessage (ManagerCheckPeer p) = checkPeer p
+
+checkPeer :: MonadManager m => Peer -> m ()
+checkPeer p = do
+ ManagerConfig {mgrConfTimeout = to} <- asks myConfig
+ b <- asks onlinePeers
+ s <- atomically $ peerString b p
+ atomically (lastPing b p) >>= \case
+ Nothing -> pingPeer p
+ Just t -> do
+ now <- liftIO getCurrentTime
+ if diffUTCTime now t > fromIntegral to
+ then do
+ $(logErrorS) "Manager" $
+ "Peer " <> s <> " did not respond ping on time"
+ killPeer PeerTimeout p
+ else $(logDebugS) "Manager" $ "peer " <> s <> " awaiting pong"
+
+pingPeer :: MonadManager m => Peer -> m ()
+pingPeer p = do
+ b <- asks onlinePeers
+ s <- atomically $ peerString b p
+ atomically (findPeer b p) >>= \case
+ Nothing -> $(logErrorS) "Manager" $ "Will not ping unknown peer " <> s
+ Just o
+ | onlinePeerConnected o -> do
+ n <- liftIO randomIO
+ now <- liftIO getCurrentTime
+ atomically (setPeerPing b n now p)
+ $(logDebugS) "Manager" $
+ "Sending ping " <> cs (show n) <> " to peer " <> s
+ MPing (Ping n) `sendMessage` p
+ | otherwise ->
$(logWarnS) "Manager" $
- "Backing off peer " <> cs (show sa) <> " for " <>
- cs (show r) <>
- " seconds"
- writeBatch db [deleteOp v, insertOp k v', insertOp v' k]
-
-getNewPeer :: (MonadUnliftIO m, MonadManager n m) => m (Maybe SockAddr)
-getNewPeer = do
- ManagerConfig {..} <- asks myConfig
- online_peers <- map onlinePeerAddress <$> getOnlinePeers
- config_peers <- concat <$> mapM toSockAddr mgrConfPeers
- if mgrConfDiscover
- then do
- db <- asks myPeerDB
- now <- computeTime
- runResourceT . runConduit $
- matching db Nothing PeerTimeAddressBase .|
- CC.filter ((<= now) . getPeerNextConnect . fst) .|
- CC.map (getPeerAddress . snd) .|
- CC.find (not . (`elem` online_peers))
- else return $ find (not . (`elem` online_peers)) config_peers
-
-getConnectedPeers :: MonadManager n m => m [OnlinePeer]
-getConnectedPeers = filter onlinePeerConnected <$> getOnlinePeers
-
-withConnectLoop :: (MonadUnliftIO m, MonadLoggerIO m) => Manager -> m a -> m a
-withConnectLoop mgr f = withAsync go $ const f
+ "Will not ping peer " <> s <> " until handshake complete"
+
+killPeer :: MonadManager m => PeerException -> Peer -> m ()
+killPeer e p = void . runMaybeT $ do
+ b <- asks onlinePeers
+ o <- MaybeT . atomically $ findPeer b p
+ s <- atomically $ peerString b p
+ $(logErrorS) "Manager" $ "Killing peer " <> s <> ": " <> cs (show e)
+ onlinePeerAsync o `cancelWith` e
+
+processPeerOffline :: MonadManager m => Child -> Maybe SomeException -> m ()
+processPeerOffline a e = do
+ b <- asks onlinePeers
+ atomically (findPeerAsync b a) >>= \case
+ Nothing -> log_unknown e
+ Just o -> do
+ let p = onlinePeerMailbox o
+ d = onlinePeerAddress o
+ s <- atomically $ peerString b p
+ if onlinePeerConnected o
+ then do
+ log_disconnected s e
+ managerEvent $ PeerDisconnected p d
+ else log_not_connect s e
+ atomically $ removePeer b p
+ db <- getManagerDB
+ demotePeerDB db (onlinePeerAddress o)
+ logConnectedPeers
where
- go =
- forever $ do
- ManagerPing `send` mgr
- i <- liftIO (randomRIO (30, 90))
- threadDelay (i * 1000 * 1000)
-
-managerLoop :: (MonadUnliftIO m, MonadManager n m) => m ()
-managerLoop =
- forever $ do
- mgr <- asks mySelf
- msg <- receive mgr
- processManagerMessage msg
-
-processManagerMessage ::
- (MonadUnliftIO m, MonadManager n m) => ManagerMessage -> m ()
-
-processManagerMessage (ManagerSetFilter bf) = setFilter bf
-
-processManagerMessage (ManagerSetBest bb) =
- asks myBestBlock >>= atomically . (`writeTVar` bb)
-
-processManagerMessage ManagerPing = connectNewPeers
-
-processManagerMessage (ManagerGetAddr p) = do
- pn <- peerString p
- -- TODO: send list of peers we know about
- $(logWarnS) "Manager" $ "Ignoring address request from peer " <> fromString pn
-
-processManagerMessage (ManagerNewPeers p as) =
- asks myConfig >>= \case
- ManagerConfig {..}
- | not mgrConfDiscover -> return ()
- | otherwise -> do
- pn <- peerString p
- $(logInfoS) "Manager" $
- "Received " <> cs (show (length as)) <>
- " peers from " <>
- fromString pn
- forM_ as $ \(_, na) ->
- let sa = naAddress na
- in storePeer sa PriorityNetwork
-
-processManagerMessage (ManagerKill e p) =
- findPeer p >>= \case
- Nothing -> return ()
- Just op -> do
+ log_unknown Nothing = $(logErrorS) "Manager" "Disconnected unknown peer"
+ log_unknown (Just x) =
+ $(logErrorS) "Manager" $ "Unknown peer died: " <> cs (show x)
+ log_disconnected s Nothing =
+ $(logWarnS) "Manager" $ "Disconnected peer: " <> s
+ log_disconnected s (Just x) =
+ $(logErrorS) "Manager" $ "Peer " <> s <> " died: " <> cs (show x)
+ log_not_connect s Nothing =
+ $(logWarnS) "Manager" $ "Could not connect to peer " <> s
+ log_not_connect s (Just x) =
+ $(logErrorS) "Manager" $
+ "Could not connect to peer " <> s <> ": " <> cs (show x)
+
+announcePeer :: MonadManager m => Peer -> m ()
+announcePeer p = do
+ b <- asks onlinePeers
+ s <- atomically $ peerString b p
+ mgr <- asks myMailbox
+ atomically (findPeer b p) >>= \case
+ Just OnlinePeer {onlinePeerAddress = a, onlinePeerConnected = True} -> do
+ $(logInfoS) "Manager" $ "Handshake completed for peer " <> s
+ managerEvent $ PeerConnected p a
+ logConnectedPeers
+ managerCheck p mgr
+ db <- getManagerDB
+ promotePeerDB db a
+ Just OnlinePeer {onlinePeerConnected = False} ->
$(logErrorS) "Manager" $
- "Killing peer " <> cs (show (onlinePeerAddress op))
- banPeer $ onlinePeerAddress op
- onlinePeerAsync op `cancelWith` e
-
-processManagerMessage (ManagerSetPeerBest p bn) = modifyPeer f p
- where
- f op = op {onlinePeerBestBlock = bn}
-
-processManagerMessage (ManagerGetPeerBest p reply) =
- findPeer p >>= \op -> atomically (reply (fmap onlinePeerBestBlock op))
-
-processManagerMessage (ManagerSetPeerVersion p v) =
- modifyPeer f p >> findPeer p >>= \case
- Nothing -> return ()
- Just op ->
- runExceptT testVersion >>= \case
- Left ex -> do
- banPeer (onlinePeerAddress op)
- onlinePeerAsync op `cancelWith` ex
- Right () -> do
- loadFilter
- askForPeers
- connectPeer (onlinePeerAddress op)
- announcePeer
- where
- f op =
- op
- { onlinePeerVersion = version v
- , onlinePeerServices = services v
- , onlinePeerRemoteNonce = verNonce v
- , onlinePeerUserAgent = getVarString (userAgent v)
- , onlinePeerRelay = relay v
- }
- testVersion = do
- when (services v .&. nodeNetwork == 0) $ throwError NotNetworkPeer
- bfb <- asks myBloomFilter
- bf <- readTVarIO bfb
- when (isJust bf && services v .&. nodeBloom == 0) $
- throwError BloomFiltersNotSupported
- myself <-
- any ((verNonce v ==) . onlinePeerNonce) <$> lift getOnlinePeers
- when myself $ throwError PeerIsMyself
- loadFilter = do
- bfb <- asks myBloomFilter
- bf <- readTVarIO bfb
- case bf of
- Nothing -> return ()
- Just b -> b `peerSetFilter` p
- askForPeers =
- mgrConfDiscover <$> asks myConfig >>= \discover ->
- when discover (MGetAddr `sendMessage` p)
- announcePeer =
- findPeer p >>= \case
- Nothing -> return ()
- Just op
- | onlinePeerConnected op -> return ()
- | otherwise -> do
- $(logInfoS) "Manager" $
- "Connected to " <> cs (show (onlinePeerAddress op))
- l <- mgrConfMgrListener <$> asks myConfig
- atomically (l (ManagerConnect p))
- ch <- asks myChain
- chainNewPeer p ch
- setPeerAnnounced p
-
-processManagerMessage (ManagerGetPeerVersion p reply) =
- fmap onlinePeerVersion <$> findPeer p >>= atomically . reply
-
-processManagerMessage (ManagerGetPeers reply) =
- getPeers >>= atomically . reply
-
-processManagerMessage (ManagerGetOnlinePeer p reply) =
- getOnlinePeer p >>= atomically . reply
-
-processManagerMessage (ManagerPeerPing p i) =
- modifyPeer (\x -> x {onlinePeerPings = take 11 $ i : onlinePeerPings x}) p
+ "Not announcing because not handshaken: " <> s
+ Nothing -> $(logErrorS) "Manager" "Will not announce unknown peer"
-processManagerMessage (PeerStopped (p, _ex)) = do
- opb <- asks onlinePeers
- m <- atomically $ do
- m <- findPeerAsync p opb
- when (isJust m) $ removePeer p opb
- return m
+getNewPeer :: (MonadUnliftIO m, MonadManager m) => m (Maybe SockAddr)
+getNewPeer = do
+ ManagerConfig {mgrConfDB = db} <- asks myConfig
+ exclude <- map onlinePeerAddress <$> getOnlinePeers
+ m <-
+ runMaybeT $
+ MaybeT (getNewPeerDB db exclude) <|>
+ MaybeT (populatePeerDB >> getNewPeerDB db exclude)
case m of
- Just op -> do
- backoffPeer (onlinePeerAddress op)
- processPeerOffline op
- Nothing -> return ()
-
-processPeerOffline :: MonadManager n m => OnlinePeer -> m ()
-processPeerOffline op
- | onlinePeerConnected op = do
- let p = onlinePeerMailbox op
- $(logWarnS) "Manager" $
- "Disconnected peer " <> cs (show (onlinePeerAddress op))
- asks myChain >>= chainRemovePeer p
- l <- mgrConfMgrListener <$> asks myConfig
- atomically (l (ManagerDisconnect p))
- logPeersConnected
- | otherwise =
- $(logWarnS) "Manager" $
- "Could not connect to peer " <> cs (show (onlinePeerAddress op))
-
-getPeers :: MonadManager n m => m [OnlinePeer]
-getPeers = sortBy (compare `on` median . onlinePeerPings) <$> getConnectedPeers
-
-getOnlinePeer :: MonadManager n m => Peer -> m (Maybe OnlinePeer)
-getOnlinePeer p = find ((== p) . onlinePeerMailbox) <$> getConnectedPeers
+ Just a -> return $ Just a
+ Nothing -> return Nothing
-connectNewPeers :: MonadManager n m => m ()
-connectNewPeers = do
- mo <- mgrConfMaxPeers <$> asks myConfig
- ps <- getOnlinePeers
- let n = mo - length ps
- when (null ps) $ do
- ps' <- resolvePeers
- mapM_ (uncurry storePeer) ps'
- go n
+connectPeer :: (MonadUnliftIO m, MonadManager m) => SockAddr -> m ()
+connectPeer sa = do
+ $(logInfoS) "Manager" $ "Connecting to peer: " <> cs (show sa)
+ ManagerConfig {mgrConfNetAddr = ad, mgrConfNetwork = net} <- asks myConfig
+ mgr <- asks myMailbox
+ sup <- asks mySupervisor
+ nonce <- liftIO randomIO
+ bb <- getBestBlock
+ now <- computeTime
+ let rmt = NetworkAddress (srv net) sa
+ ver = buildVersion net nonce bb ad rmt now
+ (inbox, p) <- newMailbox
+ let pc pub =
+ PeerConfig
+ { peerConfListen = pub
+ , peerConfNetwork = net
+ , peerConfAddress = sa
+ }
+ a <- withRunInIO $ \io -> sup `addChild` io (launch mgr pc inbox p)
+ MVersion ver `sendMessage` p
+ b <- asks onlinePeers
+ _ <- atomically $ newOnlinePeer b sa nonce p a
+ return ()
where
- go 0 = return ()
- go n =
- getNewPeer >>= \case
- Nothing -> return ()
- Just sa -> conn sa >> go (n - 1)
- conn sa = do
- ad <- mgrConfNetAddr <$> asks myConfig
- mgr <- asks mySelf
- ch <- asks myChain
- pl <- mgrConfPeerListener <$> asks myConfig
- net <- mgrConfNetwork <$> asks myConfig
- $(logInfoS) "Manager" $ "Connecting to peer " <> cs (show sa)
- nonce <- liftIO randomIO
- let pc =
- PeerConfig
- { peerConfConnect = NetworkAddress (srv net) sa
- , peerConfLocal = ad
- , peerConfManager = mgr
- , peerConfChain = ch
- , peerConfListener = pl
- , peerConfNonce = nonce
- , peerConfNetwork = net
- }
- psup <- asks myPeerSupervisor
- pmbox <- newTBQueueIO 100
- p <- newInbox pmbox
- a <- psup `addChild` peer pc p
- newPeerConnection net sa nonce p a
+ l mgr p m = ManagerPeerMessage p m `sendSTM` mgr
srv net
| getSegWit net = 8
| otherwise = 0
-
-newPeerConnection ::
- MonadManager n m
- => Network
- -> SockAddr
- -> Word64
- -> Peer
- -> Async ()
- -> m ()
-newPeerConnection net sa nonce p a =
- addPeer
- OnlinePeer
- { onlinePeerAddress = sa
- , onlinePeerConnected = False
- , onlinePeerVersion = 0
- , onlinePeerServices = 0
- , onlinePeerRemoteNonce = 0
- , onlinePeerUserAgent = BS.empty
- , onlinePeerRelay = False
- , onlinePeerBestBlock = genesisNode net
- , onlinePeerAsync = a
- , onlinePeerMailbox = p
- , onlinePeerNonce = nonce
- , onlinePeerPings = []
- }
-
-peerString :: MonadManager n m => Peer -> m String
-peerString p = maybe "[unknown]" (show . onlinePeerAddress) <$> findPeer p
-
-setPeerAnnounced :: MonadManager n m => Peer -> m ()
-setPeerAnnounced = modifyPeer (\x -> x {onlinePeerConnected = True})
-
-setFilter :: MonadManager n m => BloomFilter -> m ()
-setFilter bl = do
- bfb <- asks myBloomFilter
- atomically . writeTVar bfb $ Just bl
- ops <- getOnlinePeers
- forM_ ops $ \op ->
- when (onlinePeerConnected op) $
- if acceptsFilters $ onlinePeerServices op
- then bl `peerSetFilter` onlinePeerMailbox op
- else do
- $(logErrorS) "Manager" $
- "Peer " <> cs (show (onlinePeerAddress op)) <>
- "does not support bloom filters"
- banPeer (onlinePeerAddress op)
- onlinePeerAsync op `cancelWith` BloomFiltersNotSupported
-
-findPeer :: MonadManager n m => Peer -> m (Maybe OnlinePeer)
-findPeer p = find ((== p) . onlinePeerMailbox) <$> getOnlinePeers
-
-findPeerAsync :: Async () -> TVar [OnlinePeer] -> STM (Maybe OnlinePeer)
-findPeerAsync a t = find ((== a) . onlinePeerAsync) <$> readTVar t
-
-modifyPeer :: MonadManager n m => (OnlinePeer -> OnlinePeer) -> Peer -> m ()
-modifyPeer f p = modifyOnlinePeers $ map upd
+ launch mgr pc inbox p =
+ withPublisher $ \pub ->
+ bracket (subscribe pub (l mgr p)) (unsubscribe pub) $ \_ ->
+ withPeerLoop p mgr $ \a -> do
+ link a
+ peer (pc pub) inbox
+
+withPeerLoop :: MonadUnliftIO m => Peer -> Manager -> (Async a -> m a) -> m a
+withPeerLoop p mgr = withAsync go
where
- upd op =
- if onlinePeerMailbox op == p
- then f op
- else op
+ go = forever $ do
+ threadDelay =<< liftIO (randomRIO (30 * 1000 * 1000, 90 * 1000 * 1000))
+ ManagerCheckPeer p `send` mgr
-addPeer :: MonadManager n m => OnlinePeer -> m ()
-addPeer op = modifyOnlinePeers $ nubBy f . (op :)
+withConnectLoop :: MonadUnliftIO m => Manager -> (Async a -> m a) -> m a
+withConnectLoop mgr = withAsync go
where
- f = (==) `on` onlinePeerMailbox
-
-removePeer :: Async () -> TVar [OnlinePeer] -> STM ()
-removePeer a t = modifyTVar t $ filter ((/= a) . onlinePeerAsync)
-
-getOnlinePeers :: MonadManager n m => m [OnlinePeer]
-getOnlinePeers = asks onlinePeers >>= readTVarIO
-
-modifyOnlinePeers :: MonadManager n m => ([OnlinePeer] -> [OnlinePeer]) -> m ()
-modifyOnlinePeers f = asks onlinePeers >>= atomically . (`modifyTVar` f)
-
-median :: Fractional a => [a] -> Maybe a
-median ls
- | null ls = Nothing
- | length ls `mod` 2 == 0 =
- Just . (/ 2) . sum . take 2 $ drop (length ls `div` 2 - 1) ls
- | otherwise = Just . head $ drop (length ls `div` 2) ls
+ go = forever $ do
+ ManagerConnect `send` mgr
+ threadDelay =<< liftIO (randomRIO (250 * 1000, 1000 * 1000))
diff --git a/src/Network/Haskoin/Node/Manager/Logic.hs b/src/Network/Haskoin/Node/Manager/Logic.hs
new file mode 100644
index 0000000..b8c5e20
--- /dev/null
+++ b/src/Network/Haskoin/Node/Manager/Logic.hs
@@ -0,0 +1,406 @@
+{-# LANGUAGE ConstraintKinds #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE TupleSections #-}
+{-|
+Module : Network.Haskoin.Manager.Logic
+Copyright : No rights reserved
+License : UNLICENSE
+Maintainer : xenog@protonmail.com
+Stability : experimental
+Portability : POSIX
+
+Logic and data for peer manager.
+-}
+module Network.Haskoin.Node.Manager.Logic where
+
+import Conduit
+import Control.Monad
+import Control.Monad.Except
+import Control.Monad.Trans.Maybe
+import Data.Bits
+import qualified Data.ByteString as B
+import Data.Default
+import Data.List
+import Data.Maybe
+import Data.Serialize (Get, Put, Serialize)
+import Data.Serialize as S
+import Data.String.Conversions
+import Data.Text (Text)
+import Data.Time.Clock
+import Data.Word
+import Database.RocksDB (DB)
+import qualified Database.RocksDB as R
+import Database.RocksDB.Query as R
+import Haskoin
+import Network.Haskoin.Node.Common
+import Network.Socket (SockAddr (..))
+import UnliftIO
+import UnliftIO.Resource as U
+
+-- | Database version.
+versionPeerDB :: Word32
+versionPeerDB = 4
+
+-- | Peer score. Lower is better.
+type Score = Word8
+
+-- | Peer address key in database.
+data PeerAddress
+ = PeerAddress {
+ getPeerAddress :: !SockAddr
+ -- ^ peer network address and port
+ }
+ | PeerAddressBase
+ deriving (Eq, Ord, Show)
+
+-- | Peer score key in database.
+data PeerScore
+ = PeerScore !Score !SockAddr
+ | PeerScoreBase
+ deriving (Eq, Ord, Show)
+
+instance Serialize PeerScore where
+ put (PeerScore s sa) = do
+ putWord8 0x83
+ S.put s
+ encodeSockAddr sa
+ put PeerScoreBase = S.putWord8 0x83
+ get = do
+ guard . (== 0x83) =<< S.getWord8
+ PeerScore <$> S.get <*> decodeSockAddr
+
+instance Key PeerScore
+instance KeyValue PeerScore ()
+
+-- | Database version key.
+data PeerDataVersionKey = PeerDataVersionKey
+ deriving (Eq, Ord, Show)
+
+instance Serialize PeerDataVersionKey where
+ get = do
+ guard . (== 0x82) =<< S.getWord8
+ return PeerDataVersionKey
+ put PeerDataVersionKey = S.putWord8 0x82
+
+instance Key PeerDataVersionKey
+instance KeyValue PeerDataVersionKey Word32
+
+instance Serialize PeerAddress where
+ get = do
+ guard . (== 0x81) =<< S.getWord8
+ PeerAddress <$> decodeSockAddr
+ put PeerAddress {getPeerAddress = a} = do
+ S.putWord8 0x81
+ encodeSockAddr a
+ put PeerAddressBase = S.putWord8 0x81
+
+-- | Peer data.
+data PeerData = PeerData !Score !Bool
+ deriving (Eq, Show, Ord)
+
+instance Serialize PeerData where
+ put (PeerData s p) = S.put s >> S.put p
+ get = PeerData <$> S.get <*> S.get
+
+instance Key PeerAddress
+instance KeyValue PeerAddress PeerData
+
+-- | Update peer score and pass in database.
+updatePeerDB :: MonadIO m => DB -> SockAddr -> Score -> Bool -> m ()
+updatePeerDB db sa score pass =
+ retrieve db def (PeerAddress sa) >>= \case
+ Nothing ->
+ writeBatch
+ db
+ [ insertOp (PeerAddress sa) (PeerData score pass)
+ , insertOp (PeerScore score sa) ()
+ ]
+ Just (PeerData score' _) ->
+ writeBatch
+ db
+ [ deleteOp (PeerScore score' sa)
+ , insertOp (PeerAddress sa) (PeerData score pass)
+ , insertOp (PeerScore score sa) ()
+ ]
+
+-- | Add a peer to database if it is not already there.
+newPeerDB :: MonadIO m => DB -> Score -> SockAddr -> m ()
+newPeerDB db score sa =
+ retrieve db def (PeerAddress sa) >>= \case
+ Just PeerData {} -> return ()
+ Nothing ->
+ writeBatch
+ db
+ [ insertOp (PeerAddress sa) (PeerData score False)
+ , insertOp (PeerScore score sa) ()
+ ]
+
+-- | Initialize peer database, purging it of conflicting records if version
+-- doesn't match current one.
+initPeerDB :: MonadUnliftIO m => DB -> Bool -> m ()
+initPeerDB db discover = do
+ ver <- retrieve db def PeerDataVersionKey
+ when (ver /= Just versionPeerDB || not discover) $ purgePeerDB db
+ R.insert db PeerDataVersionKey versionPeerDB
+
+-- | Purge conflicting records from database.
+purgePeerDB :: MonadUnliftIO m => DB -> m ()
+purgePeerDB db = purge_byte 0x81 >> purge_byte 0x83
+ where
+ purge_byte byte =
+ U.runResourceT . R.withIterator db def $ \it -> do
+ R.iterSeek it $ B.singleton byte
+ recurse_delete it byte
+ recurse_delete it byte =
+ R.iterKey it >>= \case
+ Nothing -> return ()
+ Just k
+ | B.head k == byte -> do
+ R.delete db def k
+ R.iterNext it
+ recurse_delete it byte
+ | otherwise -> return ()
+
+-- | Get static network seeds.
+networkSeeds :: Network -> [HostPort]
+networkSeeds net = map (, getDefaultPort net) (getSeeds net)
+
+-- | Get default score for statically-defined peers.
+staticPeerScore :: Score
+staticPeerScore = maxBound `div` 4
+
+-- | Get default score for network seeds.
+netSeedScore :: Score
+netSeedScore = maxBound `div` 2
+
+-- | Get default score for discovered peers.
+netPeerScore :: Score
+netPeerScore = maxBound `div` 3 * 4
+
+-- | Serialize a network address/port.
+encodeSockAddr :: SockAddr -> Put
+encodeSockAddr (SockAddrInet6 p _ (a, b, c, d) _) = do
+ S.putWord32be a
+ S.putWord32be b
+ S.putWord32be c
+ S.putWord32be d
+ S.putWord16be (fromIntegral p)
+encodeSockAddr (SockAddrInet p a) = do
+ S.putWord32be 0x00000000
+ S.putWord32be 0x00000000
+ S.putWord32be 0x0000ffff
+ S.putWord32host a
+ S.putWord16be (fromIntegral p)
+encodeSockAddr x = error $ "Could not encode address: " <> show x
+
+-- | Deserialize a network address/port.
+decodeSockAddr :: Get SockAddr
+decodeSockAddr = do
+ a <- S.getWord32be
+ b <- S.getWord32be
+ c <- S.getWord32be
+ if a == 0x00000000 && b == 0x00000000 && c == 0x0000ffff
+ then do
+ d <- S.getWord32host
+ p <- S.getWord16be
+ return $ SockAddrInet (fromIntegral p) d
+ else do
+ d <- S.getWord32be
+ p <- S.getWord16be
+ return $ SockAddrInet6 (fromIntegral p) 0 (a, b, c, d) 0
+
+-- | Get database entry for provided peer address.
+getPeerDB :: MonadIO m => DB -> SockAddr -> m (Maybe PeerData)
+getPeerDB db sa = retrieve db def (PeerAddress sa)
+
+-- | Promote a peer by improving its score (decreasing by one).
+promotePeerDB :: MonadIO m => DB -> SockAddr -> m ()
+promotePeerDB db sa =
+ getPeerDB db sa >>= \case
+ Nothing -> return ()
+ Just (PeerData score pass) ->
+ updatePeerDB
+ db
+ sa
+ (if score == 0
+ then score
+ else score - 1)
+ pass
+
+-- | Demote a peer increasing its score by one.
+demotePeerDB :: MonadIO m => DB -> SockAddr -> m ()
+demotePeerDB db sa =
+ getPeerDB db sa >>= \case
+ Nothing -> return ()
+ Just (PeerData score pass) ->
+ updatePeerDB
+ db
+ sa
+ (if score == maxBound
+ then score
+ else score + 1)
+ pass
+
+-- | Get a peer from database. Pass a list of addresses to exclude.
+getNewPeerDB :: MonadUnliftIO m => DB -> [SockAddr] -> m (Maybe SockAddr)
+getNewPeerDB db exclude = go >>= maybe (reset_pass >> go) (return . Just)
+ where
+ go =
+ U.runResourceT . runConduit $
+ matching db def PeerScoreBase .| filterMC filter_pass .|
+ mapC get_address .|
+ filterC (`notElem` exclude) .|
+ headC
+ reset_pass =
+ U.runResourceT . runConduit $
+ matching db def PeerAddressBase .| mapM_C reset_peer_pass
+ reset_peer_pass (PeerAddress addr, PeerData score _) =
+ updatePeerDB db addr score False
+ reset_peer_pass _ = return ()
+ filter_pass (PeerScore _ addr, ()) =
+ retrieve db def (PeerAddress addr) >>= \case
+ Just (PeerData score pass)
+ | not pass -> do
+ updatePeerDB db addr score True
+ return True
+ _ -> return False
+ filter_pass _ = return False
+ get_address (PeerScore _ addr, ()) = addr
+ get_address _ = error "Something is wrong with peer database"
+
+-- | Report receiving a pong from a connected peer. Will store ping roundtrip
+-- time in a window of latest eleven. Peers are returned by the manager in order
+-- of median roundtrip time.
+gotPong ::
+ TVar [OnlinePeer]
+ -> Word64
+ -> UTCTime
+ -> Peer
+ -> STM (Maybe NominalDiffTime)
+gotPong b nonce now p =
+ runMaybeT $ do
+ o <- MaybeT $ findPeer b p
+ (time, old_nonce) <- MaybeT . return $ onlinePeerPing o
+ guard $ nonce == old_nonce
+ let diff = now `diffUTCTime` time
+ lift $
+ insertPeer
+ b
+ o
+ { onlinePeerPing = Nothing
+ , onlinePeerPings = take 11 $ diff : onlinePeerPings o
+ }
+ return diff
+
+-- | Return time of last ping sent to peer, if any.
+lastPing :: TVar [OnlinePeer] -> Peer -> STM (Maybe UTCTime)
+lastPing b p =
+ findPeer b p >>= \case
+ Just OnlinePeer {onlinePeerPing = Just (time, _)} -> return (Just time)
+ _ -> return Nothing
+
+-- | Set nonce and time of last ping sent to peer.
+setPeerPing :: TVar [OnlinePeer] -> Word64 -> UTCTime -> Peer -> STM ()
+setPeerPing b nonce now p =
+ modifyPeer b p $ \o -> o {onlinePeerPing = Just (now, nonce)}
+
+-- | Set version for online peer. Will set the peer connected status to 'True'
+-- if a verack message has already been registered for that peer.
+setPeerVersion ::
+ TVar [OnlinePeer]
+ -> Peer
+ -> Version
+ -> STM (Either PeerException OnlinePeer)
+setPeerVersion b p v =
+ runExceptT $ do
+ when (services v .&. nodeNetwork == 0) $ throwError NotNetworkPeer
+ ops <- lift $ readTVar b
+ when (any ((verNonce v ==) . onlinePeerNonce) ops) $
+ throwError PeerIsMyself
+ lift (findPeer b p) >>= \case
+ Nothing -> throwError UnknownPeer
+ Just o -> do
+ let n =
+ o
+ { onlinePeerVersion = Just v
+ , onlinePeerConnected = onlinePeerVerAck o
+ }
+ lift $ insertPeer b n
+ return n
+
+-- | Register that a verack message was received from a peer.
+setPeerVerAck :: TVar [OnlinePeer] -> Peer -> STM (Maybe OnlinePeer)
+setPeerVerAck b p =
+ runMaybeT $ do
+ o <- MaybeT $ findPeer b p
+ let n =
+ o
+ { onlinePeerVerAck = True
+ , onlinePeerConnected = isJust (onlinePeerVersion o)
+ }
+ lift $ insertPeer b n
+ return n
+
+-- | Create 'OnlinePeer' data structure.
+newOnlinePeer ::
+ TVar [OnlinePeer]
+ -> SockAddr
+ -- ^ peer address
+ -> Word64
+ -- ^ nonce sent to peer
+ -> Peer
+ -- ^ peer mailbox
+ -> Async ()
+ -- ^ peer asynchronous handle
+ -> STM OnlinePeer
+newOnlinePeer b sa nonce peer peer_async = do
+ let op =
+ OnlinePeer
+ { onlinePeerAddress = sa
+ , onlinePeerVerAck = False
+ , onlinePeerConnected = False
+ , onlinePeerVersion = Nothing
+ , onlinePeerAsync = peer_async
+ , onlinePeerMailbox = peer
+ , onlinePeerNonce = nonce
+ , onlinePeerPings = []
+ , onlinePeerPing = Nothing
+ }
+ insertPeer b op
+ return op
+
+-- | Get a human-readable string for the peer address.
+peerString :: TVar [OnlinePeer] -> Peer -> STM Text
+peerString b p =
+ maybe "[unknown]" (cs . show . onlinePeerAddress) <$> findPeer b p
+
+-- | Find a connected peer.
+findPeer :: TVar [OnlinePeer] -> Peer -> STM (Maybe OnlinePeer)
+findPeer b p = find ((== p) . onlinePeerMailbox) <$> readTVar b
+
+-- | Insert or replace a connected peer.
+insertPeer :: TVar [OnlinePeer] -> OnlinePeer -> STM ()
+insertPeer b o = modifyTVar b $ \x -> sort . nub $ o : x
+
+-- | Modify an online peer.
+modifyPeer :: TVar [OnlinePeer] -> Peer -> (OnlinePeer -> OnlinePeer) -> STM ()
+modifyPeer b p f =
+ findPeer b p >>= \case
+ Nothing -> return ()
+ Just o -> insertPeer b $ f o
+
+-- | Remove an online peer.
+removePeer :: TVar [OnlinePeer] -> Peer -> STM ()
+removePeer b p = modifyTVar b $ \x -> filter ((/= p) . onlinePeerMailbox) x
+
+-- | Find online peer by asynchronous handle.
+findPeerAsync :: TVar [OnlinePeer] -> Async () -> STM (Maybe OnlinePeer)
+findPeerAsync b a = find ((== a) . onlinePeerAsync) <$> readTVar b
+
+-- | Remove online peer by asynchronous handle.
+removePeerAsync :: TVar [OnlinePeer] -> Async () -> STM ()
+removePeerAsync b a = modifyTVar b $ \x -> filter ((/= a) . onlinePeerAsync) x
diff --git a/src/Network/Haskoin/Node/Peer.hs b/src/Network/Haskoin/Node/Peer.hs
index a0ef82b..829ec9a 100644
--- a/src/Network/Haskoin/Node/Peer.hs
+++ b/src/Network/Haskoin/Node/Peer.hs
@@ -1,341 +1,106 @@
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
-{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TemplateHaskell #-}
-{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeFamilies #-}
+{-|
+Module : Network.Haskoin.Node.Peer
+Copyright : No rights reserved
+License : UNLICENSE
+Maintainer : xenog@protonmail.com
+Stability : experimental
+Portability : POSIX
+
+Network peer process. Represents a network peer connection locally.
+-}
module Network.Haskoin.Node.Peer
( peer
) where
+import Conduit
import Control.Monad
import Control.Monad.Logger
import Control.Monad.Reader
-import Data.Bits
import Data.ByteString (ByteString)
-import qualified Data.ByteString as BS
-import qualified Data.ByteString.Char8 as C8
-import qualified Data.ByteString.Lazy as BL
-import Data.Conduit
-import qualified Data.Conduit.Binary as CB
+import qualified Data.ByteString as B
import Data.Conduit.Network
-import Data.Maybe
import Data.Serialize
-import Data.String
import Data.String.Conversions
-import Data.Time.Clock
-import Data.Word
-import Network.Haskoin.Block
+import Data.Text (Text)
import Network.Haskoin.Constants
import Network.Haskoin.Network
import Network.Haskoin.Node.Common
-import Network.Haskoin.Transaction
import Network.Socket (SockAddr)
import NQE
-import System.Random
import UnliftIO
-type MonadPeer m = (MonadUnliftIO m, MonadLoggerIO m, MonadReader PeerReader m)
-
-data Pending
- = PendingTx !TxHash
- | PendingBlock !BlockHash
- | PendingMerkle !BlockHash
- | PendingHeaders
- deriving (Show, Eq)
-
-data PeerReader = PeerReader
- { mySelf :: !Peer
- , myConfig :: !PeerConfig
- , mySockAddr :: !SockAddr
- , myHostPort :: !(Host, Port)
- , myPending :: !(TVar [(Pending, Word32)])
- }
-
-time :: Int
-time = 15 * 1000 * 1000
-
-logMsg :: IsString a => Message -> a
-logMsg = fromString . cs . commandToString . msgType
-
-logPeer ::
- (ConvertibleStrings String a, Semigroup a, IsString a) => SockAddr -> a
-logPeer sa = "Peer<" <> cs (show sa) <> ">"
-
-peer :: (MonadUnliftIO m, MonadLoggerIO m) => PeerConfig -> Peer -> m ()
-peer pc p =
- fromSockAddr na >>= \case
- Nothing -> do
- $(logErrorS) (logPeer na) "Invalid network address"
- throwIO PeerAddressInvalid
- Just (host, port) -> do
- let cset = clientSettings port (C8.pack host)
- runGeneralTCPClient cset (peerSession (host, port))
+-- | Run peer process in current thread.
+peer ::
+ (MonadUnliftIO m, MonadLoggerIO m)
+ => PeerConfig
+ -> Inbox PeerMessage
+ -> m ()
+peer pc inbox = withConnection a $ \ad -> runReaderT (peer_session ad) pc
where
- na = naAddress (peerConfConnect pc)
- go = handshake >> exchangePing >> peerLoop
+ a = peerConfAddress pc
+ go = forever $ receive inbox >>= dispatchMessage pc
net = peerConfNetwork pc
- peerSession hp ad = do
- let src =
- runConduit $
- appSource ad .| inPeerConduit net .| conduitMailbox p
- snk = outPeerConduit net .| appSink ad
- withAsync src $ \as -> do
- link as
- pbox <- newTVarIO []
- let rd =
- PeerReader
- { myConfig = pc
- , mySelf = p
- , myHostPort = hp
- , mySockAddr = na
- , myPending = pbox
- }
- runReaderT (runConduit (go .| snk)) rd
-
-handshake :: MonadPeer m => ConduitT () Message m ()
-handshake = do
- p <- asks mySelf
- ch <- peerConfChain <$> asks myConfig
- rmt <- peerConfConnect <$> asks myConfig
- loc <- peerConfLocal <$> asks myConfig
- net <- peerConfNetwork <$> asks myConfig
- nonce <- peerConfNonce <$> asks myConfig
- bb <- chainGetBest ch
- ver <- buildVersion net nonce (nodeHeight bb) loc rmt
- yield $ MVersion ver
- lift (remoteVer p) >>= \case
- v
- | testSegWit net v -> do
- yield MVerAck
- lift (remoteVerAck p)
- mgr <- peerConfManager <$> asks myConfig
- managerSetPeerVersion p v mgr
- | otherwise -> do
- yield . MReject $
- reject MCVersion RejectObsolete "No SegWit support"
- throwIO PeerNoSegWit
- where
- testSegWit net v
- | getSegWit net = services v `testBit` 3
- | otherwise = True
- remoteVer p = do
- m <-
- timeout time . receiveMatch p $ \case
- PeerIncoming (MVersion v) -> Just v
- _ -> Nothing
- case m of
- Just v -> return v
- Nothing -> throwIO PeerTimeout
- remoteVerAck p = do
- m <-
- timeout time . receiveMatch p $ \case
- PeerIncoming MVerAck -> Just ()
- _ -> Nothing
- when (isNothing m) $ throwIO PeerTimeout
-
-peerLoop :: MonadPeer m => ConduitT () Message m ()
-peerLoop =
- forever $ do
- me <- asks mySelf
- m <- lift $ timeout (2 * 60 * 1000 * 1000) (receive me)
- case m of
- Nothing -> exchangePing
- Just msg -> processMessage msg
-
-exchangePing :: MonadPeer m => ConduitT () Message m ()
-exchangePing = do
- lp <- logMe
- i <- liftIO randomIO
- yield $ MPing (Ping i)
- me <- asks mySelf
- mgr <- peerConfManager <$> asks myConfig
- t1 <- liftIO getCurrentTime
- m <-
- lift . timeout time . receiveMatch me $ \case
- PeerIncoming (MPong (Pong j))
- | i == j -> Just ()
- _ -> Nothing
- case m of
- Nothing -> do
- $(logErrorS) lp "Timeout while waiting for pong"
- throwIO PeerTimeout
- Just () -> do
- t2 <- liftIO getCurrentTime
- let d = t2 `diffUTCTime` t1
- $(logDebugS) lp $
- "Roundtrip: " <> cs (show (d * 1000)) <> " ms"
- ManagerPeerPing me d `send` mgr
-
-checkStale :: MonadPeer m => ConduitM () Message m ()
-checkStale = do
- pbox <- asks myPending
- ps <- readTVarIO pbox
- case ps of
- [] -> return ()
- (_, ts):_ -> do
- cur <- computeTime
- when (cur > ts + 30) $ throwIO PeerTimeout
-
-registerOutgoing :: MonadPeer m => Message -> m ()
-registerOutgoing (MGetData (GetData ivs)) = do
- pbox <- asks myPending
- cur <- computeTime
- ms <-
- fmap catMaybes . forM ivs $ \iv ->
- case toPending iv of
- Nothing -> return Nothing
- Just p -> return $ Just (p, cur)
- atomically (modifyTVar pbox (++ ms))
- where
- toPending InvVector {invType = InvTx, invHash = hash} =
- Just (PendingTx (TxHash hash))
- toPending InvVector {invType = InvWitnessTx, invHash = hash} =
- Just (PendingTx (TxHash hash))
- toPending InvVector {invType = InvBlock, invHash = hash} =
- Just (PendingBlock (BlockHash hash))
- toPending InvVector {invType = InvWitnessBlock, invHash = hash} =
- Just (PendingBlock (BlockHash hash))
- toPending InvVector {invType = InvMerkleBlock, invHash = hash} =
- Just (PendingMerkle (BlockHash hash))
- toPending InvVector {invType = InvWitnessMerkleBlock, invHash = hash} =
- Just (PendingMerkle (BlockHash hash))
- toPending _ = Nothing
-registerOutgoing MGetHeaders {} = do
- pbox <- asks myPending
- cur <- computeTime
- atomically (modifyTVar pbox (reverse . ((PendingHeaders, cur) :) . reverse))
-registerOutgoing _ = return ()
-
-registerIncoming :: MonadPeer m => Message -> m ()
-registerIncoming (MNotFound (NotFound ivs)) =
- asks myPending >>= \pbox ->
- atomically (modifyTVar pbox (filter (matchNotFound . fst)))
- where
- matchNotFound (PendingTx (TxHash hash)) =
- InvVector InvTx hash `notElem` ivs &&
- InvVector InvWitnessTx hash `notElem` ivs
- matchNotFound (PendingBlock (BlockHash hash)) =
- InvVector InvBlock hash `notElem` ivs &&
- InvVector InvWitnessBlock hash `notElem` ivs
- matchNotFound (PendingMerkle (BlockHash hash)) =
- InvVector InvBlock hash `notElem` ivs &&
- InvVector InvMerkleBlock hash `notElem` ivs &&
- InvVector InvWitnessMerkleBlock hash `notElem` ivs
- matchNotFound _ = False
-registerIncoming (MTx t) =
- asks myPending >>= \pbox ->
- atomically (modifyTVar pbox (filter ((/= PendingTx (txHash t)) . fst)))
-registerIncoming (MBlock b) =
- asks myPending >>= \pbox ->
- atomically $
- modifyTVar
- pbox
- (filter ((/= PendingBlock (headerHash (blockHeader b))) . fst))
-registerIncoming (MMerkleBlock b) =
- asks myPending >>= \pbox ->
- atomically $
- modifyTVar
- pbox
- (filter ((/= PendingMerkle (headerHash (merkleHeader b))) . fst))
-registerIncoming MHeaders {} =
- asks myPending >>= \pbox ->
- atomically $ modifyTVar pbox (filter ((/= PendingHeaders) . fst))
-registerIncoming _ = return ()
-
-processMessage :: MonadPeer m => PeerMessage -> ConduitM () Message m ()
-processMessage m = do
- checkStale
- case m of
- PeerOutgoing msg -> do
- lift (registerOutgoing msg)
- yield msg
- PeerIncoming msg -> do
- lift (registerIncoming msg)
- incoming msg
-
-logMe ::
- ( ConvertibleStrings String a
- , Semigroup a
- , IsString a
- , MonadReader PeerReader m
- )
- => m a
-logMe = logPeer <$> asks mySockAddr
-
-incoming :: MonadPeer m => Message -> ConduitT () Message m ()
-incoming m = do
- lp <- lift logMe
- p <- asks mySelf
- l <- peerConfListener <$> asks myConfig
- mgr <- peerConfManager <$> asks myConfig
- ch <- peerConfChain <$> asks myConfig
- case m of
- MVersion _ -> do
- $(logErrorS) lp $ "Received duplicate " <> logMsg m
- yield $
- MReject
- Reject
- { rejectMessage = MCVersion
- , rejectCode = RejectDuplicate
- , rejectReason = VarString BS.empty
- , rejectData = BS.empty
- }
- MPing (Ping n) -> yield $ MPong (Pong n)
- MPong (Pong n) -> atomically (l (p, GotPong n))
- MSendHeaders {} -> ChainSendHeaders p `send` ch
- MAlert {} -> $(logWarnS) lp $ "Deprecated " <> logMsg m
- MAddr (Addr as) -> managerNewPeers p as mgr
- MInv (Inv is) -> do
- let ts = [TxHash (invHash i) | i <- is, invType i == InvTx]
- bs =
- [ BlockHash (invHash i)
- | i <- is
- , invType i == InvBlock || invType i == InvMerkleBlock
- ]
- unless (null ts) $ atomically $ l (p, TxAvail ts)
- unless (null bs) $ ChainNewBlocks p bs `send` ch
- MTx tx -> atomically (l (p, GotTx tx))
- MBlock b -> atomically (l (p, GotBlock b))
- MMerkleBlock b -> atomically (l (p, GotMerkleBlock b))
- MHeaders (Headers hcs) -> ChainNewHeaders p hcs `send` ch
- MGetData (GetData d) -> atomically (l (p, SendData d))
- MNotFound (NotFound ns) -> do
- let f (InvVector InvTx hash) = Just (TxNotFound (TxHash hash))
- f (InvVector InvWitnessTx hash) =
- Just (TxNotFound (TxHash hash))
- f (InvVector InvBlock hash) =
- Just (BlockNotFound (BlockHash hash))
- f (InvVector InvWitnessBlock hash) =
- Just (BlockNotFound (BlockHash hash))
- f (InvVector InvMerkleBlock hash) =
- Just (BlockNotFound (BlockHash hash))
- f (InvVector InvWitnessMerkleBlock hash) =
- Just (BlockNotFound (BlockHash hash))
- f _ = Nothing
- events = mapMaybe f ns
- atomically (mapM_ (l . (p, )) events)
- MGetBlocks g -> atomically (l (p, SendBlocks g))
- MGetHeaders h -> atomically (l (p, SendHeaders h))
- MReject r -> atomically (l (p, Rejected r))
- MMempool -> atomically (l (p, WantMempool))
- MGetAddr -> managerGetAddr p mgr
- _ -> $(logWarnS) lp $ "Ignoring message: " <> logMsg m
-
-inPeerConduit :: MonadIO m => Network -> ConduitT ByteString PeerMessage m ()
-inPeerConduit net = do
- headerBytes <- CB.take 24
- case decodeLazy headerBytes of
- Left e -> throwIO $ DecodeMessageError e
- Right (MessageHeader _ _cmd len _) -> do
- when (len > 32 * 2 ^ (20 :: Int)) . throwIO $ PayloadTooLarge len
- payloadBytes <- CB.take (fromIntegral len)
- case runGetLazy (getMessage net) $ headerBytes `BL.append` payloadBytes of
- Left e -> throwIO $ CannotDecodePayload e
- Right msg -> yield $ PeerIncoming msg
- inPeerConduit net
-
+ peer_session ad =
+ let ins = appSource ad
+ ons = appSink ad
+ src = runConduit $ ins .| inPeerConduit net a .| mapM_C send_msg
+ snk = outPeerConduit net .| ons
+ in withAsync src $ \as -> do
+ link as
+ runConduit (go .| snk)
+ send_msg = (`send` peerConfListen pc) . Event
+
+-- | Internal function to dispatch peer messages.
+dispatchMessage ::
+ MonadLoggerIO m => PeerConfig -> PeerMessage -> ConduitT i Message m ()
+dispatchMessage cfg (SendMessage msg) = do
+ $(logDebugS) (peerString (peerConfAddress cfg)) $
+ "Outgoing: " <> cs (commandToString (msgType msg))
+ yield msg
+dispatchMessage cfg (GetPublisher reply) =
+ atomically $ reply (peerConfListen cfg)
+
+-- | Internal conduit to parse messages coming from peer.
+inPeerConduit ::
+ MonadLoggerIO m
+ => Network
+ -> SockAddr
+ -> ConduitT ByteString Message m ()
+inPeerConduit net a = do
+ x <- takeCE 24 .| foldC
+ case decode x of
+ Left _ -> do
+ $(logErrorS)
+ (peerString a)
+ "Could not decode incoming message header"
+ throwIO DecodeHeaderError
+ Right (MessageHeader _ _ len _) -> do
+ when (len > 32 * 2 ^ (20 :: Int)) $ do
+ $(logErrorS) (peerString a) "Payload too large"
+ throwIO $ PayloadTooLarge len
+ y <- takeCE (fromIntegral len) .| foldC
+ case runGet (getMessage net) $ x `B.append` y of
+ Left e -> do
+ $(logErrorS) (peerString a) $
+ "Cannot decode payload: " <> cs (show e)
+ throwIO CannotDecodePayload
+ Right msg -> do
+ $(logDebugS) (peerString a) $
+ "Incoming: " <> cs (commandToString (msgType msg))
+ yield msg
+ inPeerConduit net a
+
+-- | Outgoing peer conduit to serialize and send messages.
outPeerConduit :: Monad m => Network -> ConduitT Message ByteString m ()
outPeerConduit net = awaitForever $ yield . runPut . putMessage net
+
+-- | Peer string for logging
+peerString :: SockAddr -> Text
+peerString a = "Peer{" <> cs (show a) <> "}"
diff --git a/test/Haskoin/NodeSpec.hs b/test/Haskoin/NodeSpec.hs
new file mode 100644
index 0000000..a9d3864
--- /dev/null
+++ b/test/Haskoin/NodeSpec.hs
@@ -0,0 +1,147 @@
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RecordWildCards #-}
+module Haskoin.NodeSpec
+ ( spec
+ ) where
+
+import Control.Monad
+import Control.Monad.Logger
+import Control.Monad.Trans
+import qualified Database.RocksDB as R
+import Haskoin
+import Haskoin.Node
+import Network.Socket (SockAddr (..))
+import NQE
+import Test.Hspec
+import UnliftIO
+
+data TestNode = TestNode
+ { testMgr :: Manager
+ , testChain :: Chain
+ , nodeEvents :: Inbox NodeEvent
+ }
+
+spec :: Spec
+spec = do
+ let net = btcTest
+ describe "peer manager on test network" $ do
+ it "connects to a peer" $
+ withTestNode net "connect-one-peer" $ \TestNode {..} -> do
+ p <- waitForPeer nodeEvents
+ Just OnlinePeer {onlinePeerVersion = Just Version {version = ver}} <-
+ managerGetPeer p testMgr
+ ver `shouldSatisfy` (>= 70002)
+ it "downloads some blocks" $
+ withTestNode net "get-blocks" $ \TestNode {..} -> do
+ let h1 =
+ "000000000babf10e26f6cba54d9c282983f1d1ce7061f7e875b58f8ca47db932"
+ h2 =
+ "00000000851f278a8b2c466717184aae859af5b83c6f850666afbc349cf61577"
+ p <- waitForPeer nodeEvents
+ Just [b1, b2] <- peerGetBlocks net 30 p [h1, h2]
+ headerHash (blockHeader b1) `shouldBe` h1
+ headerHash (blockHeader b2) `shouldBe` h2
+ let testMerkle b =
+ merkleRoot (blockHeader b) `shouldBe`
+ buildMerkleRoot (map txHash (blockTxns b))
+ testMerkle b1
+ testMerkle b2
+ it "connects to two peers" $
+ withTestNode net "connect-peers" $ \TestNode {..} ->
+ replicateM_ 2 (waitForPeer nodeEvents) `shouldReturn` ()
+ it "attempts to get inexistent things" $
+ withTestNode net "download-fail" $ \TestNode {..} -> do
+ let hs =
+ [ "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
+ , "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
+ ]
+ p <- waitForPeer nodeEvents
+ peerGetTxs net 30 p hs `shouldReturn` Nothing
+ describe "chain on test network" $ do
+ it "syncs some headers" $
+ withTestNode net "connect-sync" $ \TestNode {..} -> do
+ let h =
+ "000000009ec921df4bb16aedd11567e27ede3c0b63835b257475d64a059f102b"
+ hs =
+ [ headerHash (getGenesisHeader net)
+ , "0000000005bdbddb59a3cd33b69db94fa67669c41d9d32751512b5d7b68c71cf"
+ , "00000000185b36fa6e406626a722793bea80531515e0b2a99ff05b73738901f1"
+ , "000000001ab69b12b73ccdf46c9fbb4489e144b54f1565e42e481c8405077bdd"
+ ]
+ bns <-
+ replicateM 4 . receiveMatch nodeEvents $ \case
+ ChainEvent (ChainBestBlock bn) -> Just bn
+ _ -> Nothing
+ bb <- chainGetBest testChain
+ nodeHeight bb `shouldSatisfy` (>= 6000)
+ an <-
+ maybe (throwString "No ancestor found") return =<<
+ chainGetAncestor 2357 (last bns) testChain
+ map (headerHash . nodeHeader) bns `shouldBe` hs
+ headerHash (nodeHeader an) `shouldBe` h
+ it "downloads some block parents" $
+ withTestNode net "parents" $ \TestNode {..} -> do
+ let hs =
+ [ "00000000c74a24e1b1f2c04923c514ed88fc785cf68f52ed0ccffd3c6fe3fbd9"
+ , "000000007e5c5f40e495186ac4122f2e4ee25788cc36984a5760c55ecb376cb1"
+ , "00000000a6299059b2bff3479bc569019792e75f3c0f39b10a0bc85eac1b1615"
+ ]
+ [_, bn] <-
+ replicateM 2 $
+ receiveMatch nodeEvents $ \case
+ ChainEvent (ChainBestBlock bn) -> Just bn
+ _ -> Nothing
+ nodeHeight bn `shouldBe` 2000
+ ps <- chainGetParents 1997 bn testChain
+ length ps `shouldBe` 3
+ forM_ (zip ps hs) $ \(p, h) ->
+ headerHash (nodeHeader p) `shouldBe` h
+
+waitForPeer :: MonadIO m => Inbox NodeEvent -> m Peer
+waitForPeer inbox =
+ receiveMatch inbox $ \case
+ PeerEvent (PeerConnected p _) -> Just p
+ _ -> Nothing
+
+
+withTestNode ::
+ MonadUnliftIO m
+ => Network
+ -> String
+ -> (TestNode -> m a)
+ -> m a
+withTestNode net str f =
+ runNoLoggingT $
+ withSystemTempDirectory ("haskoin-node-test-" <> str <> "-") $ \w ->
+ withPublisher $ \pub ->
+ withSubscription pub $ \node_inbox -> do
+ db <-
+ R.open
+ w
+ R.defaultOptions
+ { R.createIfMissing = True
+ , R.compression = R.SnappyCompression
+ }
+ let cfg =
+ NodeConfig
+ { nodeConfMaxPeers = 20
+ , nodeConfDB = db
+ , nodeConfPeers = []
+ , nodeConfDiscover = True
+ , nodeConfNetAddr =
+ NetworkAddress 0 (SockAddrInet 0 0)
+ , nodeConfNet = net
+ , nodeConfEvents = pub
+ , nodeConfTimeout = 10
+ }
+ withNode cfg $ \(mgr, ch) ->
+ lift $
+ f
+ TestNode
+ { testMgr = mgr
+ , testChain = ch
+ , nodeEvents = node_inbox
+ }
diff --git a/test/Spec.hs b/test/Spec.hs
index 59ce7c4..a824f8c 100644
--- a/test/Spec.hs
+++ b/test/Spec.hs
@@ -1,219 +1 @@
-{-# LANGUAGE FlexibleContexts #-}
-{-# LANGUAGE LambdaCase #-}
-{-# LANGUAGE MultiParamTypeClasses #-}
-{-# LANGUAGE OverloadedStrings #-}
-{-# LANGUAGE RecordWildCards #-}
-import Control.Monad
-import Control.Monad.Logger
-import Control.Monad.Trans
-import qualified Data.ByteString as BS
-import Data.Either
-import Data.Maybe
-import Data.Serialize
-import qualified Database.RocksDB as RocksDB
-import Haskoin
-import Haskoin.Node
-import Network.Socket (SockAddr (..))
-import NQE
-import System.Random
-import Test.Hspec
-import UnliftIO
-
-data TestNode = TestNode
- { testMgr :: Manager
- , testChain :: Chain
- , testEvents :: Inbox NodeEvent
- }
-
-main :: IO ()
-main = do
- let net = btcTest
- hspec . describe "peer-to-peer client" $ do
- it "connects to a peer" $
- withTestNode net "connect-one-peer" $ \TestNode {..} -> do
- p <-
- receiveMatch testEvents $ \case
- ManagerEvent (ManagerConnect p) -> Just p
- _ -> Nothing
- v <-
- fromMaybe (error "No version") <$>
- managerGetPeerVersion p testMgr
- v `shouldSatisfy` (>= 70002)
- bb <-
- fromMaybe (error "No best block") <$>
- managerGetPeerBest p testMgr
- bb `shouldBe` genesisNode net
- it "downloads some blocks" $
- withTestNode net "get-blocks" $ \TestNode {..} -> do
- let hs = [h1, h2]
- h1 =
- "000000000babf10e26f6cba54d9c282983f1d1ce7061f7e875b58f8ca47db932"
- h2 =
- "00000000851f278a8b2c466717184aae859af5b83c6f850666afbc349cf61577"
- p <-
- receiveMatch testEvents $ \case
- ManagerEvent (ManagerConnect p) -> Just p
- _ -> Nothing
- peerGetBlocks net p hs
- b1 <-
- receiveMatch testEvents $ \case
- PeerEvent (p', GotBlock b)
- | p == p' -> Just b
- _ -> Nothing
- b2 <-
- receiveMatch testEvents $ \case
- PeerEvent (p', GotBlock b)
- | p == p' -> Just b
- _ -> Nothing
- headerHash (blockHeader b1) `shouldSatisfy` (`elem` hs)
- headerHash (blockHeader b2) `shouldSatisfy` (`elem` hs)
- let testMerkle b =
- merkleRoot (blockHeader b) `shouldBe`
- buildMerkleRoot (map txHash (blockTxns b))
- testMerkle b1
- testMerkle b2
- it "downloads some merkle blocks" $
- withTestNode net "get-merkle-blocks" $ \TestNode {..} -> do
- let a =
- fromJust $
- stringToAddr net "mgpS4Zis8iwNhriKMro1QSGDAbY6pqzRtA"
- k :: PubKey
- k =
- "02c3cface1777c70251cb206f7c80cabeae195dfbeeff0767cbd2a58d22be383da"
- h1 =
- "000000006cf9d53d65522002a01d8c7091c78d644106832bc3da0b7644f94d36"
- h2 =
- "000000000babf10e26f6cba54d9c282983f1d1ce7061f7e875b58f8ca47db932"
- bhs = [h1, h2]
- n <- randomIO
- let f0 = bloomCreate 2 0.001 n BloomUpdateAll
- f1 = bloomInsert f0 $ exportPubKey True k
- f2 = bloomInsert f1 $ encode $ getAddrHash160 a
- f2 `setManagerFilter` testMgr
- p <-
- receiveMatch testEvents $ \case
- ManagerEvent (ManagerConnect p) -> Just p
- _ -> Nothing
- getMerkleBlocks p bhs
- b1 <-
- receiveMatch testEvents $ \case
- PeerEvent (p', GotMerkleBlock b)
- | p == p' -> Just b
- _ -> Nothing
- b2 <-
- receiveMatch testEvents $ \case
- PeerEvent (p', GotMerkleBlock b)
- | p == p' -> Just b
- _ -> Nothing
- liftIO $ do
- a `shouldBe` pubKeyAddr net (wrapPubKey True k)
- b1 `shouldSatisfy` testMerkleRoot net
- b2 `shouldSatisfy` testMerkleRoot net
- it "connects to multiple peers" $
- withTestNode net "connect-peers" $ \TestNode {..} -> do
- replicateM_ 3 $ do
- pc <- receive testEvents
- case pc of
- ManagerEvent (ManagerDisconnect _) ->
- expectationFailure "Received peer disconnection"
- _ -> return ()
- ps <- managerGetPeers testMgr
- length ps `shouldSatisfy` (>= 2)
- it "connects and syncs some headers" $
- withTestNode net "connect-sync" $ \TestNode {..} -> do
- let h =
- "000000009ec921df4bb16aedd11567e27ede3c0b63835b257475d64a059f102b"
- hs =
- [ "0000000005bdbddb59a3cd33b69db94fa67669c41d9d32751512b5d7b68c71cf"
- , "00000000185b36fa6e406626a722793bea80531515e0b2a99ff05b73738901f1"
- , "000000001ab69b12b73ccdf46c9fbb4489e144b54f1565e42e481c8405077bdd"
- ]
- bns <-
- replicateM 3 . receiveMatch testEvents $ \case
- ChainEvent (ChainNewBest bn) -> Just bn
- _ -> Nothing
- bb <- chainGetBest testChain
- an <-
- fromMaybe (error "No ancestor found") <$>
- chainGetAncestor 2357 (last bns) testChain
- map (headerHash . nodeHeader) bns `shouldBe` hs
- nodeHeight bb `shouldSatisfy` (>= 6000)
- headerHash (nodeHeader an) `shouldBe` h
- it "downloads a single block" $
- withTestNode net "download-block" $ \TestNode {..} -> do
- let h =
- "000000009ec921df4bb16aedd11567e27ede3c0b63835b257475d64a059f102b"
- p <-
- receiveMatch testEvents $ \case
- ManagerEvent (ManagerConnect p) -> Just p
- _ -> Nothing
- peerGetBlocks net p [h]
- b <-
- receiveMatch testEvents $ \case
- PeerEvent (p', GotBlock b)
- | p == p' -> Just b
- _ -> Nothing
- headerHash (blockHeader b) `shouldBe` h
- it "attempts to get inexistent things" $
- withTestNode net "download-fail" $ \TestNode {..} -> do
- let h =
- TxHash .
- fromRight (error "We will, we will rock you!") . decode $
- BS.replicate 32 0xaa
- p <-
- receiveMatch testEvents $ \case
- ManagerEvent (ManagerConnect p) -> Just p
- _ -> Nothing
- peerGetTxs net p [h]
- n <-
- receiveMatch testEvents $ \case
- PeerEvent (p', TxNotFound n)
- | p == p' -> Just n
- _ -> Nothing
- n `shouldBe` h
- it "downloads some block parents" $
- withTestNode net "parents" $ \TestNode {..} -> do
- let hs =
- [ "00000000c74a24e1b1f2c04923c514ed88fc785cf68f52ed0ccffd3c6fe3fbd9"
- , "000000007e5c5f40e495186ac4122f2e4ee25788cc36984a5760c55ecb376cb1"
- , "00000000a6299059b2bff3479bc569019792e75f3c0f39b10a0bc85eac1b1615"
- ]
- bn <-
- receiveMatch testEvents $ \case
- ChainEvent (ChainNewBest bn) -> Just bn
- _ -> Nothing
- nodeHeight bn `shouldBe` 2000
- ps <- chainGetParents 1997 bn testChain
- length ps `shouldBe` 3
- forM_ (zip ps hs) $ \(p, h) ->
- headerHash (nodeHeader p) `shouldBe` h
-
-withTestNode ::
- (MonadUnliftIO m)
- => Network
- -> String
- -> (TestNode -> m ())
- -> m ()
-withTestNode net t f =
- runNoLoggingT . withSystemTempDirectory ("haskoin-node-test-" <> t <> "-") $ \w -> do
- events <- newInbox =<< newTQueueIO
- db <-
- RocksDB.open
- w
- RocksDB.defaultOptions
- { RocksDB.createIfMissing = True
- , RocksDB.compression = RocksDB.SnappyCompression
- }
- let cfg =
- NodeConfig
- { maxPeers = 20
- , database = db
- , initPeers = []
- , discover = True
- , nodeEvents = (`sendSTM` events)
- , netAddress = NetworkAddress 0 (SockAddrInet 0 0)
- , nodeNet = net
- }
- withNode cfg $ \(mgr, ch) ->
- lift $
- f TestNode {testMgr = mgr, testChain = ch, testEvents = events}
+{-# OPTIONS_GHC -F -pgmF hspec-discover #-}