summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorxenog <>2018-09-14 11:33:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2018-09-14 11:33:00 (GMT)
commit800f8153f816ebed430f1e72edc609f5117778f1 (patch)
tree5e49560ee332bf88ad112c00fbe18d59ad3c6a5c
parentd5860e5710f1038afa1d273e1bb18798e2b85f82 (diff)
version 0.6.00.6.0
-rw-r--r--CHANGELOG.md17
-rw-r--r--haskoin-node.cabal11
-rw-r--r--src/Haskoin/Node.hs78
-rw-r--r--src/Network/Haskoin/Node.hs9
-rw-r--r--src/Network/Haskoin/Node/Chain.hs26
-rw-r--r--src/Network/Haskoin/Node/Common.hs185
-rw-r--r--src/Network/Haskoin/Node/Manager.hs58
-rw-r--r--src/Network/Haskoin/Node/Node.hs54
-rw-r--r--src/Network/Haskoin/Node/Peer.hs22
-rw-r--r--test/Spec.hs54
10 files changed, 319 insertions, 195 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b5bab58..654cb0c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,23 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
+## 0.6.0
+### Added
+- Documentation everywhere.
+
+### Changed
+- Make compatible with NQE 0.5.
+- Use supervisor only in peer manager.
+- API quality of life changes.
+- Exposed module is now only `Haskoin.Node`.
+
+### Removed
+- No more direct access to internals.
+
+## 0.5.2
+### Changed
+- Improve dependency definitions.
+
## 0.5.1
### Changed
- Dependency `sec256k1` changes to `secp256k1-haskell`.
diff --git a/haskoin-node.cabal b/haskoin-node.cabal
index 1346148..7492f3b 100644
--- a/haskoin-node.cabal
+++ b/haskoin-node.cabal
@@ -2,10 +2,10 @@
--
-- see: https://github.com/sol/hpack
--
--- hash: 3bc09f3cbc9492d161ea61ffaaec723ba421b2d4f8b183432371f0d07f61a9dd
+-- hash: 8949422a8017ba0fe881ca85e0a4a86b1c1ed21c32203ad158481607273b8149
name: haskoin-node
-version: 0.5.2
+version: 0.6.0
synopsis: Haskoin Node P2P library for Bitcoin and Bitcoin Cash
description: Bitcoin and Bitcoin Cash peer-to-peer protocol library featuring headers-first synchronisation.
category: Bitcoin, Finance, Network
@@ -27,12 +27,11 @@ source-repository head
library
exposed-modules:
- Network.Haskoin.Node
- Network.Haskoin.Node.Common
+ Haskoin.Node
other-modules:
Network.Haskoin.Node.Chain
+ Network.Haskoin.Node.Common
Network.Haskoin.Node.Manager
- Network.Haskoin.Node.Node
Network.Haskoin.Node.Peer
Paths_haskoin_node
hs-source-dirs:
@@ -43,7 +42,6 @@ library
, cereal
, conduit
, conduit-extra
- , hashable
, haskoin-core
, monad-logger
, mtl
@@ -55,7 +53,6 @@ library
, rocksdb-query
, string-conversions
, time
- , unique
, unliftio
default-language: Haskell2010
diff --git a/src/Haskoin/Node.hs b/src/Haskoin/Node.hs
new file mode 100644
index 0000000..8d150a5
--- /dev/null
+++ b/src/Haskoin/Node.hs
@@ -0,0 +1,78 @@
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+module Haskoin.Node
+ ( Host, Port, HostPort
+ , Peer, Chain, Manager
+ , OnlinePeer(..)
+ , NodeConfig(..)
+ , NodeEvent(..)
+ , ManagerEvent(..)
+ , ChainEvent(..)
+ , PeerEvent(..)
+ , PeerException(..)
+ , withNode
+ , managerGetPeerVersion
+ , managerGetPeerBest
+ , managerGetPeers
+ , managerGetPeer
+ , managerKill
+ , setManagerFilter
+ , sendMessage
+ , getMerkleBlocks
+ , peerGetBlocks
+ , peerGetTxs
+ , chainGetBlock
+ , chainGetBest
+ , chainGetAncestor
+ , chainGetParents
+ , chainGetSplitBlock
+ , chainBlockMain
+ , chainIsSynced
+ , myVersion
+ ) where
+
+import Control.Monad.Logger
+import Network.Haskoin.Node.Chain
+import Network.Haskoin.Node.Common
+import Network.Haskoin.Node.Manager
+import NQE
+import UnliftIO
+
+withNode ::
+ ( MonadLoggerIO m
+ , MonadUnliftIO m
+ )
+ => NodeConfig
+ -> ((Manager, Chain) -> m a)
+ -> m a
+withNode cfg f = do
+ c <- newInbox =<< newTQueueIO
+ m <- newInbox =<< newTQueueIO
+ withAsync (chain (chain_conf c m)) $ \ch ->
+ withAsync (manager (manager_conf c m)) $ \mgr -> do
+ link ch
+ link mgr
+ f (m, c)
+ where
+ chain_conf c m =
+ ChainConfig
+ { chainConfDB = database cfg
+ , chainConfListener = nodeEvents cfg . ChainEvent
+ , chainConfManager = m
+ , chainConfChain = c
+ , chainConfNetwork = nodeNet cfg
+ }
+ manager_conf c m =
+ ManagerConfig
+ { mgrConfMaxPeers = maxPeers cfg
+ , mgrConfDB = database cfg
+ , mgrConfDiscover = discover cfg
+ , mgrConfMgrListener = nodeEvents cfg . ManagerEvent
+ , mgrConfPeerListener = nodeEvents cfg . PeerEvent
+ , mgrConfNetAddr = netAddress cfg
+ , mgrConfPeers = initPeers cfg
+ , mgrConfManager = m
+ , mgrConfChain = c
+ , mgrConfNetwork = nodeNet cfg
+ }
diff --git a/src/Network/Haskoin/Node.hs b/src/Network/Haskoin/Node.hs
deleted file mode 100644
index a5fa8fe..0000000
--- a/src/Network/Haskoin/Node.hs
+++ /dev/null
@@ -1,9 +0,0 @@
-module Network.Haskoin.Node
-( module X
-) where
-
-import Network.Haskoin.Node.Chain as X
-import Network.Haskoin.Node.Common as X
-import Network.Haskoin.Node.Manager as X
-import Network.Haskoin.Node.Node as X
-import Network.Haskoin.Node.Peer as X
diff --git a/src/Network/Haskoin/Node/Chain.hs b/src/Network/Haskoin/Node/Chain.hs
index 26e82b9..e5c9050 100644
--- a/src/Network/Haskoin/Node/Chain.hs
+++ b/src/Network/Haskoin/Node/Chain.hs
@@ -13,7 +13,6 @@ module Network.Haskoin.Node.Chain
( chain
) where
-import Control.Concurrent.NQE
import Control.Monad
import Control.Monad.Logger
import Control.Monad.Reader
@@ -29,6 +28,7 @@ import Database.RocksDB.Query as R
import Network.Haskoin.Block
import Network.Haskoin.Network
import Network.Haskoin.Node.Common
+import NQE
import UnliftIO
type MonadChain m
@@ -236,23 +236,13 @@ processSyncQueue = do
where
go s bb =
case newPeers s of
- [] -> do
- t <- computeTime
- let h2 = t - 2 * 60 * 60
- tg = blockTimestamp (nodeHeader bb) > h2
- if tg
- then unless (mySynced s) $ do
- l <- chainConfListener <$> asks myConfig
- st <- asks chainState
- atomically $ do
- l (ChainSynced bb)
- writeTVar st s {mySynced = True}
- else do
- l <- chainConfListener <$> asks myConfig
- st <- asks chainState
- atomically $ do
- l (ChainNotSynced bb)
- writeTVar st s {mySynced = False}
+ [] ->
+ unless (mySynced s) $ do
+ l <- chainConfListener <$> asks myConfig
+ st <- asks chainState
+ atomically $ do
+ l (ChainSynced bb)
+ writeTVar st s {mySynced = True}
p:_ -> syncHeaders bb p
syncHeaders :: MonadChain m => BlockNode -> Peer -> m ()
diff --git a/src/Network/Haskoin/Node/Common.hs b/src/Network/Haskoin/Node/Common.hs
index b0f11d6..64c185f 100644
--- a/src/Network/Haskoin/Node/Common.hs
+++ b/src/Network/Haskoin/Node/Common.hs
@@ -4,10 +4,7 @@
{-# LANGUAGE RankNTypes #-}
module Network.Haskoin.Node.Common where
-import Control.Concurrent.NQE
-import Control.Concurrent.Unique
import Data.ByteString (ByteString)
-import Data.Hashable
import Data.Time.Clock
import Data.Time.Clock.POSIX
import Data.Word
@@ -21,6 +18,7 @@ import Network.Socket (AddrInfo (..), AddrInfoFlag (..),
SockAddr (..), SocketType (..),
addrAddress, defaultHints,
getAddrInfo, getNameInfo)
+import NQE
import Text.Read
import UnliftIO
@@ -28,183 +26,273 @@ type HostPort = (Host, Port)
type Host = String
type Port = Int
+-- | Data structure representing an online peer.
data OnlinePeer = OnlinePeer
{ onlinePeerAddress :: !SockAddr
+ -- ^ network address
, onlinePeerConnected :: !Bool
+ -- ^ has it finished handshake
, onlinePeerVersion :: !Word32
+ -- ^ protocol version
, onlinePeerServices :: !Word64
+ -- ^ services field
, onlinePeerRemoteNonce :: !Word64
+ -- ^ random nonce sent by peer
, onlinePeerUserAgent :: !ByteString
+ -- ^ user agent string
, onlinePeerRelay :: !Bool
+ -- ^ peer will relay transactions (BIP-37)
, onlinePeerBestBlock :: !BlockNode
+ -- ^ estimated best block that peer has
, onlinePeerAsync :: !(Async ())
+ -- ^ peer asynchronous process
, onlinePeerMailbox :: !Peer
+ -- ^ peer mailbox
, onlinePeerNonce :: !Word64
+ -- ^ random nonce sent during handshake
, onlinePeerPings :: ![NominalDiffTime]
+ -- ^ last few ping rountrip duration
}
-data UniqueInbox a = UniqueInbox
- { uniqueInbox :: Inbox a
- , uniqueId :: Unique
- }
-
-type PeerSupervisor m = Inbox (SupervisorMessage m)
-type NodeSupervisor m = Inbox (SupervisorMessage m)
+-- | Mailbox for a peer process.
+type Peer = Inbox PeerMessage
-type Peer = UniqueInbox PeerMessage
+-- | Mailbox for chain headers process.
type Chain = Inbox ChainMessage
-type Manager = Inbox ManagerMessage
-instance Eq (UniqueInbox a) where
- UniqueInbox {uniqueId = a} == UniqueInbox {uniqueId = b} = a == b
-
-instance Hashable (UniqueInbox a) where
- hashWithSalt n UniqueInbox {uniqueId = i} = hashWithSalt n i
- hash UniqueInbox {uniqueId = i} = hash i
-
-instance Mailbox UniqueInbox where
- mailboxEmptySTM UniqueInbox {uniqueInbox = mbox} = mailboxEmptySTM mbox
- sendSTM msg UniqueInbox {uniqueInbox = mbox} = msg `sendSTM` mbox
- receiveSTM UniqueInbox {uniqueInbox = mbox} = receiveSTM mbox
- requeueMsg msg UniqueInbox {uniqueInbox = mbox} = msg `requeueMsg` mbox
+-- | Mailbox for peer manager process.
+type Manager = Inbox ManagerMessage
-data NodeConfig m = NodeConfig
+-- | Node configuration. Mailboxes for manager and chain processes must be
+-- created before launching the node. The node will start those processes and
+-- receive any messages sent to those mailboxes.
+data NodeConfig = NodeConfig
{ maxPeers :: !Int
+ -- ^ maximum number of connected peers allowed
, database :: !DB
+ -- ^ RocksDB database handler
, initPeers :: ![HostPort]
+ -- ^ static list of peers to connect to
, discover :: !Bool
+ -- ^ activate peer discovery
, nodeEvents :: !(Listen NodeEvent)
+ -- ^ listener for events originated by the node
, netAddress :: !NetworkAddress
- , nodeSupervisor :: !(NodeSupervisor m)
- , nodeChain :: !Chain
- , nodeManager :: !Manager
+ -- ^ network address for the local host
, nodeNet :: !Network
+ -- ^ network constants
}
-data ManagerConfig m = ManagerConfig
+-- | Peer manager configuration. Mailbox must be created before starting the
+-- process.
+data ManagerConfig = ManagerConfig
{ mgrConfMaxPeers :: !Int
+ -- ^ maximum number of peers to connect to
, mgrConfDB :: !DB
+ -- ^ RocksDB database handler to store peer information
, mgrConfPeers :: ![HostPort]
+ -- ^ static list of peers to connect to
, mgrConfDiscover :: !Bool
+ -- ^ activate peer discovery
, mgrConfMgrListener :: !(Listen ManagerEvent)
+ -- ^ listener for events originating from peer manager
, mgrConfPeerListener :: !(Listen (Peer, PeerEvent))
+ -- ^ listener for events originating from individual peers
, mgrConfNetAddr :: !NetworkAddress
+ -- ^ network address for the local host
, mgrConfManager :: !Manager
+ -- ^ peer manager mailbox
, mgrConfChain :: !Chain
- , mgrConfPeerSupervisor :: !(PeerSupervisor m)
+ -- ^ chain process mailbox
, mgrConfNetwork :: !Network
+ -- ^ network constants
}
+-- | Event originating from the node. Aggregates events from the peer manager,
+-- chain, and any connected peers.
data NodeEvent
= ManagerEvent !ManagerEvent
+ -- ^ event originating from peer manager
| ChainEvent !ChainEvent
+ -- ^ event originating from chain process
| PeerEvent !(Peer, PeerEvent)
+ -- ^ event originating from a peer
+-- | Peer manager event.
data ManagerEvent
= ManagerConnect !Peer
+ -- ^ a new peer connected and its handshake completed
| ManagerDisconnect !Peer
+ -- ^ a peer disconnected
+-- | Messages that can be sent to the peer manager.
data ManagerMessage
= ManagerSetFilter !BloomFilter
+ -- ^ set a bloom filter in all peers
| ManagerSetBest !BlockNode
+ -- ^ set our best block
| ManagerPing
+ -- ^ internal timer signal that triggers housekeeping tasks
| ManagerGetAddr !Peer
+ -- ^ peer requests all peers we know about
| ManagerNewPeers !Peer
![NetworkAddressTime]
+ -- ^ peer sent list of peers it knows about
| ManagerKill !PeerException
!Peer
+ -- ^ please kill this peer with supplied exception
| ManagerSetPeerBest !Peer
!BlockNode
+ -- ^ set best block for this peer
| ManagerGetPeerBest !Peer
!(Reply (Maybe BlockNode))
+ -- ^ get best block that manager thinks peer has
| ManagerSetPeerVersion !Peer
!Version
+ -- ^ set version for this peer
| ManagerGetPeerVersion !Peer
!(Reply (Maybe Word32))
+ -- ^ get protocol version for this peer
| ManagerGetPeers !(Reply [OnlinePeer])
+ -- ^ get all connected peers
| ManagerGetOnlinePeer !Peer !(Reply (Maybe OnlinePeer))
+ -- ^ get a peer information
| ManagerPeerPing !Peer
!NominalDiffTime
+ -- ^ add a peer roundtrip time for this peer
| PeerStopped !(Async (), Either SomeException ())
+ -- ^ peer corresponding to 'Async' has stopped
+-- | Configuration for the chain process.
data ChainConfig = ChainConfig
{ chainConfDB :: !DB
+ -- ^ RocksDB database handle
, chainConfListener :: !(Listen ChainEvent)
+ -- ^ listener for events originating from the chain process
, chainConfManager :: !Manager
+ -- ^ peer manager mailbox
, chainConfChain :: !Chain
+ -- ^ chain process mailbox
, chainConfNetwork :: !Network
+ -- ^ network constants
}
+-- | Messages that can be sent to the chain process.
data ChainMessage
= ChainNewHeaders !Peer
![BlockHeaderCount]
+ -- ^ peer sent some block headers
| ChainNewPeer !Peer
+ -- ^ a new peer connected
| ChainRemovePeer !Peer
- | ChainGetBest !(BlockNode -> STM ())
+ -- ^ a peer disconnected
+ | ChainGetBest !(Reply BlockNode)
+ -- ^ get best block known
| ChainGetAncestor !BlockHeight
!BlockNode
!(Reply (Maybe BlockNode))
+ -- ^ get ancestor for 'BlockNode' at 'BlockHeight'
| ChainGetSplit !BlockNode
!BlockNode
!(Reply BlockNode)
+ -- ^ get highest common node
| ChainGetBlock !BlockHash
!(Reply (Maybe BlockNode))
+ -- ^ get a block header
| ChainNewBlocks !Peer ![BlockHash]
+ -- ^ peer sent block inventory
| ChainSendHeaders !Peer
+ -- ^ peer asks for our block headers in the future
| ChainIsSynced !(Reply Bool)
+ -- ^ is chain in sync with network?
+-- | Events originating from chain process.
data ChainEvent
= ChainNewBest !BlockNode
+ -- ^ chain has new best block
| ChainSynced !BlockNode
- | ChainNotSynced !BlockNode
+ -- ^ chain is in sync with the network
deriving (Eq, Show)
+-- | Configuration for a particular peer.
data PeerConfig = PeerConfig
{ peerConfConnect :: !NetworkAddress
- , peerConfInitBest :: !BlockNode
+ -- ^ address of remote peer
, peerConfLocal :: !NetworkAddress
+ -- ^ our address to send to peer
, peerConfManager :: !Manager
+ -- ^ peer manager mailbox
, peerConfChain :: !Chain
+ -- ^ chain process mailbox
, peerConfListener :: !(Listen (Peer, PeerEvent))
+ -- ^ listener for peer events
, peerConfNonce :: !Word64
+ -- ^ our random nonce to send to peer
, peerConfNetwork :: !Network
+ -- ^ network constants
}
+-- | Reasons why a peer may stop working.
data PeerException
= PeerMisbehaving !String
+ -- ^ peer was a naughty boy
| DecodeMessageError !String
+ -- ^ incoming message could not be decoded
| CannotDecodePayload !String
- | MessageHeaderEmpty
+ -- ^ incoming message payload could not be decoded
| PeerIsMyself
+ -- ^ nonce for peer matches ours
| PayloadTooLarge !Word32
+ -- ^ message payload too large
| PeerAddressInvalid
+ -- ^ peer address did not parse with 'fromSockAddr'
| BloomFiltersNotSupported
+ -- ^ peer does not support bloom filters
| PeerSentBadHeaders
+ -- ^ peer sent wrong headers
| NotNetworkPeer
+ -- ^ peer is SPV and cannot serve blockchain data
| PeerNoSegWit
+ -- ^ peer has no segwit support
| PeerTimeout
+ -- ^ request to peer timed out
deriving (Eq, Show)
instance Exception PeerException
+-- | Events originating from a peer.
data PeerEvent
= TxAvail ![TxHash]
+ -- ^ peer sent transaction inventory
| GotBlock !Block
+ -- ^ peer sent a 'Block'
| GotMerkleBlock !MerkleBlock
+ -- ^ peer sent a 'MerkleBlock'
| GotTx !Tx
+ -- ^ peer sent a 'Tx'
| GotPong !Word64
+ -- ^ peer responded to a 'Ping'
| SendBlocks !GetBlocks
+ -- ^ peer is requesting some blocks
| SendHeaders !GetHeaders
+ -- ^ peer is requesting some headers
| SendData ![InvVector]
+ -- ^ per is requesting an inventory
| TxNotFound !TxHash
+ -- ^ peer could not find transaction
| BlockNotFound !BlockHash
+ -- ^ peer could not find block
| WantMempool
+ -- ^ peer wants our mempool
| Rejected !Reject
+ -- ^ peer rejected something we sent
+-- | Internal type for peer messages.
data PeerMessage
= PeerOutgoing !Message
| PeerIncoming !Message
+-- | Convert a host and port into a list of matching 'SockAddr'.
toSockAddr :: (MonadUnliftIO m) => HostPort -> m [SockAddr]
toSockAddr (host, port) = go `catch` e
where
@@ -222,6 +310,7 @@ toSockAddr (host, port) = go `catch` e
e :: Monad m => SomeException -> m [SockAddr]
e _ = return []
+-- | Convert a 'SockAddr' into a host and port.
fromSockAddr ::
(MonadUnliftIO m) => SockAddr -> m (Maybe HostPort)
fromSockAddr sa = go `catch` e
@@ -233,52 +322,68 @@ fromSockAddr sa = go `catch` e
e :: Monad m => SomeException -> m (Maybe a)
e _ = return Nothing
+-- | Integer current time in seconds from 1970-01-01T00:00Z.
computeTime :: MonadIO m => m Word32
computeTime = round <$> liftIO getPOSIXTime
+-- | Our protocol version.
myVersion :: Word32
myVersion = 70012
+-- | Set best block in the manager.
managerSetBest :: MonadIO m => BlockNode -> Manager -> m ()
managerSetBest bn mgr = ManagerSetBest bn `send` mgr
+-- | Set version of peer in manager.
managerSetPeerVersion :: MonadIO m => Peer -> Version -> Manager -> m ()
managerSetPeerVersion p v mgr = ManagerSetPeerVersion p v `send` mgr
+-- | Get version of peer from manager.
managerGetPeerVersion :: MonadIO m => Peer -> Manager -> m (Maybe Word32)
managerGetPeerVersion p mgr = ManagerGetPeerVersion p `query` mgr
+-- | Get best block for peer from manager.
managerGetPeerBest :: MonadIO m => Peer -> Manager -> m (Maybe BlockNode)
managerGetPeerBest p mgr = ManagerGetPeerBest p `query` mgr
+-- | Set best block for peer in manager.
managerSetPeerBest :: MonadIO m => Peer -> BlockNode -> Manager -> m ()
managerSetPeerBest p bn mgr = ManagerSetPeerBest p bn `send` mgr
+-- | Get list of peers from manager.
managerGetPeers :: MonadIO m => Manager -> m [OnlinePeer]
managerGetPeers mgr = ManagerGetPeers `query` mgr
+-- | Get peer information for a peer from manager.
managerGetPeer :: MonadIO m => Manager -> Peer -> m (Maybe OnlinePeer)
managerGetPeer mgr p = ManagerGetOnlinePeer p `query` mgr
+-- | Ask manager to send all known peers to a peer.
managerGetAddr :: MonadIO m => Peer -> Manager -> m ()
managerGetAddr p mgr = ManagerGetAddr p `send` mgr
+-- | Ask manager to kill a peer with the provided exception.
managerKill :: MonadIO m => PeerException -> Peer -> Manager -> m ()
managerKill e p mgr = ManagerKill e p `send` mgr
+-- | Peer sends manager list of known peers.
managerNewPeers ::
MonadIO m => Peer -> [NetworkAddressTime] -> Manager -> m ()
managerNewPeers p as mgr = ManagerNewPeers p as `send` mgr
+-- | Set bloom filters in peer manager.
setManagerFilter :: MonadIO m => BloomFilter -> Manager -> m ()
setManagerFilter bf mgr = ManagerSetFilter bf `send` mgr
+-- | Send a network message to peer.
sendMessage :: MonadIO m => Message -> Peer -> m ()
sendMessage msg p = PeerOutgoing msg `send` p
+-- | Upload bloom filter to remote peer.
peerSetFilter :: MonadIO m => BloomFilter -> Peer -> m ()
peerSetFilter f p = MFilterLoad (FilterLoad f) `sendMessage` p
+-- | Request Merkle blocks from peer.
getMerkleBlocks ::
(MonadIO m)
=> Peer
@@ -288,6 +393,7 @@ getMerkleBlocks p bhs = PeerOutgoing (MGetData (GetData ivs)) `send` p
where
ivs = map (InvVector InvMerkleBlock . getBlockHash) bhs
+-- | Request full blocks from peer.
peerGetBlocks ::
MonadIO m => Network -> Peer -> [BlockHash] -> m ()
peerGetBlocks net p bhs = PeerOutgoing (MGetData (GetData ivs)) `send` p
@@ -297,6 +403,7 @@ peerGetBlocks net p bhs = PeerOutgoing (MGetData (GetData ivs)) `send` p
| otherwise = InvBlock
ivs = map (InvVector con . getBlockHash) bhs
+-- | Request transactions from peer.
peerGetTxs :: MonadIO m => Network -> Peer -> [TxHash] -> m ()
peerGetTxs net p ths = PeerOutgoing (MGetData (GetData ivs)) `send` p
where
@@ -305,6 +412,7 @@ peerGetTxs net p ths = PeerOutgoing (MGetData (GetData ivs)) `send` p
| otherwise = InvTx
ivs = map (InvVector con . getTxHash) ths
+-- | Build my version structure.
buildVersion ::
MonadIO m
=> Network
@@ -328,22 +436,28 @@ buildVersion net nonce height loc rmt = do
, relay = True
}
+-- | Notify chain of a new peer that connected.
chainNewPeer :: MonadIO m => Peer -> Chain -> m ()
chainNewPeer p ch = ChainNewPeer p `send` ch
+-- | Notify chain that a peer has disconnected.
chainRemovePeer :: MonadIO m => Peer -> Chain -> m ()
chainRemovePeer p ch = ChainRemovePeer p `send` ch
+-- | Get a block header from chain process.
chainGetBlock :: MonadIO m => BlockHash -> Chain -> m (Maybe BlockNode)
chainGetBlock bh ch = ChainGetBlock bh `query` ch
+-- | Get best block header from chain process.
chainGetBest :: MonadIO m => Chain -> m BlockNode
chainGetBest ch = ChainGetBest `query` ch
+-- | Get ancestor of 'BlockNode' at 'BlockHeight' from chain process.
chainGetAncestor ::
MonadIO m => BlockHeight -> BlockNode -> Chain -> m (Maybe BlockNode)
chainGetAncestor h n c = ChainGetAncestor h n `query` c
+-- | Get parents of 'BlockNode' starting at 'BlockHeight' from chain process.
chainGetParents ::
MonadIO m => BlockHeight -> BlockNode -> Chain -> m [BlockNode]
chainGetParents height top ch = go [] top
@@ -356,10 +470,12 @@ chainGetParents height top ch = go [] top
Nothing -> return acc
Just p -> go (p : acc) p
+-- | Get last common block from chain process.
chainGetSplitBlock ::
MonadIO m => BlockNode -> BlockNode -> Chain -> m BlockNode
chainGetSplitBlock l r c = ChainGetSplit l r `query` c
+-- | Is given 'BlockHash' in the main chain?
chainBlockMain :: MonadIO m => BlockHash -> Chain -> m Bool
chainBlockMain bh ch =
chainGetBest ch >>= \bb ->
@@ -367,5 +483,6 @@ chainBlockMain bh ch =
Nothing -> return False
bm@(Just bn) -> (== bm) <$> chainGetAncestor (nodeHeight bn) bb ch
+-- | Is chain in sync with network?
chainIsSynced :: MonadIO m => Chain -> m Bool
chainIsSynced ch = ChainIsSynced `query` ch
diff --git a/src/Network/Haskoin/Node/Manager.hs b/src/Network/Haskoin/Node/Manager.hs
index f5bde57..d580a22 100644
--- a/src/Network/Haskoin/Node/Manager.hs
+++ b/src/Network/Haskoin/Node/Manager.hs
@@ -11,8 +11,6 @@ module Network.Haskoin.Node.Manager
) where
import Control.Applicative
-import Control.Concurrent.NQE
-import Control.Concurrent.Unique
import Control.Monad
import Control.Monad.Except
import Control.Monad.Logger
@@ -37,6 +35,7 @@ import Network.Haskoin.Network
import Network.Haskoin.Node.Common
import Network.Haskoin.Node.Peer
import Network.Socket (SockAddr (..))
+import NQE
import System.Random
import UnliftIO
import UnliftIO.Concurrent
@@ -49,12 +48,12 @@ type MonadManager n m
, MonadLoggerIO m
, MonadReader (ManagerReader n) m)
-data ManagerReader n = ManagerReader
+data ManagerReader m = ManagerReader
{ mySelf :: !Manager
, myChain :: !Chain
- , myConfig :: !(ManagerConfig n)
+ , myConfig :: !ManagerConfig
, myPeerDB :: !DB
- , myPeerSupervisor :: !(PeerSupervisor n)
+ , myPeerSupervisor :: !(Inbox (SupervisorMessage m))
, onlinePeers :: !(TVar [OnlinePeer])
, myBloomFilter :: !(TVar (Maybe BloomFilter))
, myBestBlock :: !(TVar BlockNode)
@@ -129,26 +128,30 @@ instance Serialize PeerTimeAddress where
S.put getPeerTimeAddress
put PeerTimeAddressBase = S.putWord8 0x80
-manager :: (MonadUnliftIO m, MonadLoggerIO m) => ManagerConfig m -> m ()
+manager :: (MonadUnliftIO m, MonadLoggerIO m) => ManagerConfig -> m ()
manager cfg = do
- bb <- chainGetBest $ mgrConfChain cfg
- opb <- newTVarIO []
- bfb <- newTVarIO Nothing
- bbb <- newTVarIO bb
- withConnectLoop (mgrConfManager cfg) $ do
- let rd =
- ManagerReader
- { mySelf = mgrConfManager cfg
- , myChain = mgrConfChain cfg
- , myConfig = cfg
- , myPeerDB = mgrConfDB cfg
- , myPeerSupervisor = mgrConfPeerSupervisor cfg
- , onlinePeers = opb
- , myBloomFilter = bfb
- , myBestBlock = bbb
- }
- run `runReaderT` rd
+ psup <- newInbox =<< newTQueueIO
+ withAsync (supervisor (Notify dead) psup []) $ \sup -> do
+ link sup
+ bb <- chainGetBest $ mgrConfChain cfg
+ opb <- newTVarIO []
+ bfb <- newTVarIO Nothing
+ bbb <- newTVarIO bb
+ withConnectLoop (mgrConfManager cfg) $ do
+ let rd =
+ ManagerReader
+ { mySelf = mgrConfManager cfg
+ , myChain = mgrConfChain cfg
+ , myConfig = cfg
+ , myPeerDB = mgrConfDB cfg
+ , myPeerSupervisor = psup
+ , onlinePeers = opb
+ , myBloomFilter = bfb
+ , myBestBlock = bbb
+ }
+ run `runReaderT` rd
where
+ dead ex = PeerStopped ex `sendSTM` mgrConfManager cfg
run = do
connectNewPeers
managerLoop
@@ -335,6 +338,7 @@ processManagerMessage ManagerPing = connectNewPeers
processManagerMessage (ManagerGetAddr p) = do
pn <- peerString p
+ -- TODO: send list of peers we know about
$(logWarnS) "Manager" $ "Ignoring address request from peer " <> fromString pn
processManagerMessage (ManagerNewPeers p as) =
@@ -403,7 +407,7 @@ processManagerMessage (ManagerSetPeerVersion p v) =
bf <- readTVarIO bfb
case bf of
Nothing -> return ()
- Just b -> b `peerSetFilter` p
+ Just b -> b `peerSetFilter` p
askForPeers =
mgrConfDiscover <$> asks myConfig >>= \discover ->
when discover (MGetAddr `sendMessage` p)
@@ -487,13 +491,10 @@ connectNewPeers = do
pl <- mgrConfPeerListener <$> asks myConfig
net <- mgrConfNetwork <$> asks myConfig
$(logInfoS) "Manager" $ "Connecting to peer " <> cs (show sa)
- bbb <- asks myBestBlock
- bb <- readTVarIO bbb
nonce <- liftIO randomIO
let pc =
PeerConfig
{ peerConfConnect = NetworkAddress (srv net) sa
- , peerConfInitBest = bb
, peerConfLocal = ad
, peerConfManager = mgr
, peerConfChain = ch
@@ -503,8 +504,7 @@ connectNewPeers = do
}
psup <- asks myPeerSupervisor
pmbox <- newTBQueueIO 100
- uid <- liftIO newUnique
- let p = UniqueInbox {uniqueInbox = Inbox pmbox, uniqueId = uid}
+ p <- newInbox pmbox
a <- psup `addChild` peer pc p
newPeerConnection net sa nonce p a
srv net
diff --git a/src/Network/Haskoin/Node/Node.hs b/src/Network/Haskoin/Node/Node.hs
deleted file mode 100644
index cb6da4b..0000000
--- a/src/Network/Haskoin/Node/Node.hs
+++ /dev/null
@@ -1,54 +0,0 @@
-{-# LANGUAGE FlexibleContexts #-}
-{-# LANGUAGE GADTs #-}
-{-# LANGUAGE MultiParamTypeClasses #-}
-{-# LANGUAGE OverloadedStrings #-}
-{-# LANGUAGE TemplateHaskell #-}
-module Network.Haskoin.Node.Node
- ( node
- ) where
-
-import Control.Concurrent.NQE
-import Control.Monad.Logger
-import Network.Haskoin.Node.Chain
-import Network.Haskoin.Node.Common
-import Network.Haskoin.Node.Manager
-import UnliftIO
-
-node ::
- ( MonadLoggerIO m
- , MonadUnliftIO m
- )
- => NodeConfig m
- -> m ()
-node cfg = do
- psup <- Inbox <$> newTQueueIO
- $(logInfoS) "Node" "Starting..."
- supervisor
- KillAll
- (nodeSupervisor cfg)
- [chain chCfg, manager (mgrCfg psup), peerSup psup]
- where
- peerSup psup = supervisor (Notify deadPeer) psup []
- chCfg =
- ChainConfig
- { chainConfDB = database cfg
- , chainConfListener = nodeEvents cfg . ChainEvent
- , chainConfManager = nodeManager cfg
- , chainConfChain = nodeChain cfg
- , chainConfNetwork = nodeNet cfg
- }
- mgrCfg psup =
- ManagerConfig
- { mgrConfMaxPeers = maxPeers cfg
- , mgrConfDB = database cfg
- , mgrConfDiscover = discover cfg
- , mgrConfMgrListener = nodeEvents cfg . ManagerEvent
- , mgrConfPeerListener = nodeEvents cfg . PeerEvent
- , mgrConfNetAddr = netAddress cfg
- , mgrConfPeers = initPeers cfg
- , mgrConfManager = nodeManager cfg
- , mgrConfChain = nodeChain cfg
- , mgrConfPeerSupervisor = psup
- , mgrConfNetwork = nodeNet cfg
- }
- deadPeer ex = PeerStopped ex `sendSTM` nodeManager cfg
diff --git a/src/Network/Haskoin/Node/Peer.hs b/src/Network/Haskoin/Node/Peer.hs
index db2a4f8..a0ef82b 100644
--- a/src/Network/Haskoin/Node/Peer.hs
+++ b/src/Network/Haskoin/Node/Peer.hs
@@ -10,7 +10,6 @@ module Network.Haskoin.Node.Peer
( peer
) where
-import Control.Concurrent.NQE
import Control.Monad
import Control.Monad.Logger
import Control.Monad.Reader
@@ -34,6 +33,7 @@ import Network.Haskoin.Network
import Network.Haskoin.Node.Common
import Network.Haskoin.Transaction
import Network.Socket (SockAddr)
+import NQE
import System.Random
import UnliftIO
@@ -78,18 +78,21 @@ peer pc p =
go = handshake >> exchangePing >> peerLoop
net = peerConfNetwork pc
peerSession hp ad = do
- let src = appSource ad .| inPeerConduit net
+ let src =
+ runConduit $
+ appSource ad .| inPeerConduit net .| conduitMailbox p
snk = outPeerConduit net .| appSink ad
- withSource src p . const $ do
+ withAsync src $ \as -> do
+ link as
pbox <- newTVarIO []
let rd =
PeerReader
- { myConfig = pc
- , mySelf = p
- , myHostPort = hp
- , mySockAddr = na
- , myPending = pbox
- }
+ { myConfig = pc
+ , mySelf = p
+ , myHostPort = hp
+ , mySockAddr = na
+ , myPending = pbox
+ }
runReaderT (runConduit (go .| snk)) rd
handshake :: MonadPeer m => ConduitT () Message m ()
@@ -324,7 +327,6 @@ incoming m = do
inPeerConduit :: MonadIO m => Network -> ConduitT ByteString PeerMessage m ()
inPeerConduit net = do
headerBytes <- CB.take 24
- when (BL.null headerBytes) $ throwIO MessageHeaderEmpty
case decodeLazy headerBytes of
Left e -> throwIO $ DecodeMessageError e
Right (MessageHeader _ _cmd len _) -> do
diff --git a/test/Spec.hs b/test/Spec.hs
index ab94c4b..59ce7c4 100644
--- a/test/Spec.hs
+++ b/test/Spec.hs
@@ -3,23 +3,18 @@
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
-import Control.Concurrent.NQE
import Control.Monad
import Control.Monad.Logger
import Control.Monad.Trans
-import qualified Data.ByteString as BS
+import qualified Data.ByteString as BS
import Data.Either
import Data.Maybe
import Data.Serialize
-import qualified Database.RocksDB as RocksDB
-import Network.Haskoin.Address
-import Network.Haskoin.Block
-import Network.Haskoin.Constants
-import Network.Haskoin.Keys
-import Network.Haskoin.Network
-import Network.Haskoin.Node
-import Network.Haskoin.Transaction
-import Network.Socket (SockAddr (..))
+import qualified Database.RocksDB as RocksDB
+import Haskoin
+import Haskoin.Node
+import Network.Socket (SockAddr (..))
+import NQE
import System.Random
import Test.Hspec
import UnliftIO
@@ -201,33 +196,24 @@ withTestNode ::
-> m ()
withTestNode net t f =
runNoLoggingT . withSystemTempDirectory ("haskoin-node-test-" <> t <> "-") $ \w -> do
- events <- Inbox <$> liftIO newTQueueIO
- ch <- Inbox <$> liftIO newTQueueIO
- ns <- Inbox <$> liftIO newTQueueIO
- mgr <- Inbox <$> liftIO newTQueueIO
+ events <- newInbox =<< newTQueueIO
db <-
RocksDB.open
w
RocksDB.defaultOptions
- { RocksDB.createIfMissing = True
- , RocksDB.compression = RocksDB.SnappyCompression
- }
+ { RocksDB.createIfMissing = True
+ , RocksDB.compression = RocksDB.SnappyCompression
+ }
let cfg =
NodeConfig
- { maxPeers = 20
- , database = db
- , initPeers = []
- , discover = True
- , nodeEvents = (`sendSTM` events)
- , netAddress = NetworkAddress 0 (SockAddrInet 0 0)
- , nodeSupervisor = ns
- , nodeChain = ch
- , nodeManager = mgr
- , nodeNet = net
- }
- withAsync (node cfg) $ \nd -> do
- link nd
+ { maxPeers = 20
+ , database = db
+ , initPeers = []
+ , discover = True
+ , nodeEvents = (`sendSTM` events)
+ , netAddress = NetworkAddress 0 (SockAddrInet 0 0)
+ , nodeNet = net
+ }
+ withNode cfg $ \(mgr, ch) ->
lift $
- f TestNode {testMgr = mgr, testChain = ch, testEvents = events}
- stopSupervisor ns
- wait nd
+ f TestNode {testMgr = mgr, testChain = ch, testEvents = events}