summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorxenog <>2018-09-09 21:27:00 (GMT)
committerhdiff <hdiff@hdiff.luite.com>2018-09-09 21:27:00 (GMT)
commit506131a45496f6ed8a628f95aea18860e0ca93c9 (patch)
tree2efd540b8ce426ca32a60d144f7685f195c8541c
parentb8555cf91d969104a2cdcc97cde92939e6d2d53f (diff)
version 0.5.00.5.0
-rw-r--r--CHANGELOG.md23
-rw-r--r--Network/Haskoin/Node/BlockChain.hs713
-rw-r--r--Network/Haskoin/Node/Checkpoints.hs28
-rw-r--r--Network/Haskoin/Node/HeaderTree.hs656
-rw-r--r--Network/Haskoin/Node/HeaderTree/Model.hs27
-rw-r--r--Network/Haskoin/Node/HeaderTree/Types.hs30
-rw-r--r--Network/Haskoin/Node/Peer.hs720
-rw-r--r--Network/Haskoin/Node/STM.hs412
-rw-r--r--README.md4
-rw-r--r--haskoin-node.cabal186
-rw-r--r--src/Network/Haskoin/Node.hs9
-rw-r--r--src/Network/Haskoin/Node/Chain.hs281
-rw-r--r--src/Network/Haskoin/Node/Common.hs371
-rw-r--r--src/Network/Haskoin/Node/Manager.hs594
-rw-r--r--src/Network/Haskoin/Node/Node.hs54
-rw-r--r--src/Network/Haskoin/Node/Peer.hs339
-rw-r--r--test/Spec.hs233
-rw-r--r--tests/Main.hs13
-rw-r--r--tests/Network/Haskoin/Node/Tests.hs12
-rw-r--r--tests/Network/Haskoin/Node/Units.hs252
20 files changed, 1990 insertions, 2967 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..1158595
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,23 @@
+# Changelog
+All notable changes to this project will be documented in this file.
+
+The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
+and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
+
+## 0.5.0
+### Added
+- New `CHANGELOG.md` file.
+- Use `nqe` for concurrency.
+- Peer discovery.
+- RocksDB peer and block header storage.
+- Support for Merkle blocks.
+
+### Changed
+- Split out of former `haskoin` repository.
+- Use hpack and `package.yaml`.
+- Old `haskoin-node` package now renamed to `old-haskoin-node` and deprecated.
+
+### Removed
+- Removed Old Haskoin Node package completely.
+- Removed Stylish Haskell configuration file.
+- Remvoed `haskoin-core` and `haskoin-wallet` packages from this repository.
diff --git a/Network/Haskoin/Node/BlockChain.hs b/Network/Haskoin/Node/BlockChain.hs
deleted file mode 100644
index eaca0d0..0000000
--- a/Network/Haskoin/Node/BlockChain.hs
+++ /dev/null
@@ -1,713 +0,0 @@
-{-# LANGUAGE MultiParamTypeClasses #-}
-{-# LANGUAGE TemplateHaskell #-}
-module Network.Haskoin.Node.BlockChain where
-
-import Control.Concurrent (threadDelay)
-import Control.Concurrent.Async.Lifted (link, mapConcurrently,
- waitAnyCancel, withAsync)
-import Control.Concurrent.STM (STM, atomically, isEmptyTMVar,
- putTMVar, readTVar, retry,
- takeTMVar, tryReadTMVar,
- tryTakeTMVar)
-import Control.Concurrent.STM.Lock (locked)
-import qualified Control.Concurrent.STM.Lock as Lock (with)
-import Control.Concurrent.STM.TBMChan (isEmptyTBMChan, readTBMChan)
-import Control.Exception.Lifted (throw)
-import Control.Monad (forM, forM_, forever, unless,
- void, when)
-import Control.Monad.Logger (MonadLoggerIO, logDebug,
- logError, logInfo, logWarn)
-import Control.Monad.Reader (ask, asks)
-import Control.Monad.Trans (MonadIO, lift, liftIO)
-import Control.Monad.Trans.Control (MonadBaseControl, liftBaseOp_)
-import qualified Data.ByteString.Char8 as C (unpack)
-import Data.Conduit (Source, yield)
-import Data.List (nub)
-import qualified Data.Map as M (delete, keys, lookup,
- null)
-import Data.Maybe (listToMaybe)
-import qualified Data.Sequence as S (length)
-import Data.String.Conversions (cs)
-import Data.Text (pack)
-import Data.Time.Clock.POSIX (getPOSIXTime)
-import Data.Unique (hashUnique)
-import Data.Word (Word32)
-import Network.Haskoin.Block
-import Network.Haskoin.Node
-import Network.Haskoin.Node.HeaderTree
-import Network.Haskoin.Node.Peer
-import Network.Haskoin.Node.STM
-import Network.Haskoin.Transaction
-import System.Random (randomIO)
-
-startSPVNode :: (MonadLoggerIO m, MonadBaseControl IO m)
- => [PeerHost]
- -> BloomFilter
- -> Int
- -> NodeT m ()
-startSPVNode hosts bloom elems = do
- $(logDebug) "Setting our bloom filter in the node"
- atomicallyNodeT $ sendBloomFilter bloom elems
- $(logDebug) $ pack $ unwords
- [ "Starting SPV node with", show $ length hosts, "hosts" ]
- withAsync (void $ mapConcurrently startReconnectPeer hosts) $ \a1 -> do
- link a1
- $(logInfo) "Starting the initial header sync"
- headerSync
- $(logInfo) "Initial header sync complete"
- $(logDebug) "Starting the tickle processing thread"
- withAsync processTickles $ \a2 -> link a2 >> do
- _ <- liftIO $ waitAnyCancel [a1, a2]
- return ()
- $(logDebug) "Exiting SPV-node thread"
-
--- Source of all transaction broadcasts
-txSource :: (MonadLoggerIO m, MonadBaseControl IO m)
- => Source (NodeT m) Tx
-txSource = do
- chan <- lift $ asks sharedTxChan
- $(logDebug) "Waiting to receive a transaction..."
- resM <- liftIO $ atomically $ readTBMChan chan
- case resM of
- Just (pid, ph, tx) -> do
- $(logInfo) $ formatPid pid ph $ unwords
- [ "Received transaction broadcast", cs $ txHashToHex $ txHash tx ]
- yield tx >> txSource
- _ -> $(logError) "Tx channel closed unexpectedly"
-
-handleGetData :: (MonadLoggerIO m, MonadBaseControl IO m)
- => (TxHash -> m (Maybe Tx))
- -> NodeT m ()
-handleGetData handler = forever $ do
- $(logDebug) "Waiting for GetData transaction requests..."
- -- Wait for tx GetData requests to be available
- txids <- atomicallyNodeT $ do
- datMap <- readTVarS sharedTxGetData
- if M.null datMap then lift retry else return $ M.keys datMap
- forM (nub txids) $ \tid -> lift (handler tid) >>= \txM -> do
- $(logDebug) $ pack $ unwords
- [ "Processing GetData txid request", cs $ txHashToHex tid ]
- pidsM <- atomicallyNodeT $ do
- datMap <- readTVarS sharedTxGetData
- writeTVarS sharedTxGetData $ M.delete tid datMap
- return $ M.lookup tid datMap
- case (txM, pidsM) of
- -- Send the transaction to the required peers
- (Just tx, Just pids) -> forM_ pids $ \(pid, ph) -> do
- $(logDebug) $ formatPid pid ph $ unwords
- [ "Sending tx", cs $ txHashToHex tid, "to peer" ]
- atomicallyNodeT $ trySendMessage pid $ MTx tx
- _ -> return ()
-
-broadcastTxs :: (MonadLoggerIO m, MonadBaseControl IO m)
- => [TxHash]
- -> NodeT m ()
-broadcastTxs txids = do
- forM_ txids $ \tid -> $(logInfo) $ pack $ unwords
- [ "Transaction INV broadcast:", cs $ txHashToHex tid ]
- -- Broadcast an INV message for new transactions
- let msg = MInv $ Inv $ map (InvVector InvTx . getTxHash) txids
- atomicallyNodeT $ sendMessageAll msg
-
-rescanTs :: Timestamp -> NodeT STM ()
-rescanTs ts = do
- rescanTMVar <- asks sharedRescan
- lift $ do
- -- Make sure the TMVar is empty
- _ <- tryTakeTMVar rescanTMVar
- putTMVar rescanTMVar $ Left ts
-
-rescanHeight :: BlockHeight -> NodeT STM ()
-rescanHeight h = do
- rescanTMVar <- asks sharedRescan
- lift $ do
- -- Make sure the TMVar is empty
- _ <- tryTakeTMVar rescanTMVar
- putTMVar rescanTMVar $ Right h
-
--- Wait for the next merkle batch to be available. This function will check for
--- rescans.
-merkleDownload
- :: (MonadLoggerIO m, MonadBaseControl IO m)
- => BlockHash
- -> Word32
- -> NodeT m
- (BlockChainAction, Source (NodeT m) (Either (MerkleBlock, MerkleTxs) Tx))
-merkleDownload walletHash batchSize = do
- -- Store the best block received from the wallet for information only.
- -- This will be displayed in `hw status`
- merkleSyncedActions walletHash
- walletBlockM <- runSqlNodeT $ getBlockByHash walletHash
- walletBlock <- case walletBlockM of
- Just walletBlock -> do
- atomicallyNodeT $ writeTVarS sharedBestBlock walletBlock
- return walletBlock
- Nothing ->
- error "Could not find wallet best block in headers"
- rescanTMVar <- asks sharedRescan
- -- Wait either for a new block to arrive or a rescan to be triggered
- $(logDebug) "Waiting for a new block or a rescan..."
- resE <- atomicallyNodeT $ orElseNodeT
- (fmap Left $ lift $ takeTMVar rescanTMVar)
- (const (Right ()) <$> waitNewBlock walletHash)
- resM <- case resE of
- -- A rescan was triggered
- Left valE -> do
- $(logInfo) $ pack $ unwords
- [ "Got rescan request", show valE ]
- -- Wait until rescan conditions are met
- newValE <- waitRescan rescanTMVar valE
- $(logDebug) $ pack $ unwords
- [ "Rescan condition reached:", show newValE ]
- case newValE of
- Left ts -> tryMerkleDwnTimestamp ts batchSize
- Right _ -> tryMerkleDwnHeight walletBlock batchSize
- -- Continue download from a hash
- Right _ -> tryMerkleDwnBlock walletBlock batchSize
- case resM of
- Just res -> return res
- _ -> do
- $(logWarn) "Invalid merkleDownload result. Retrying ..."
- -- Sleep 10 seconds and retry
- liftIO $ threadDelay $ 10*1000000
- merkleDownload walletHash batchSize
- where
- waitRescan rescanTMVar valE = do
- resE <- atomicallyNodeT $ orElseNodeT
- (fmap Left (lift $ takeTMVar rescanTMVar))
- (waitVal valE >> return (Right valE))
- case resE of
- Left newValE -> waitRescan rescanTMVar newValE
- Right res -> return res
- waitVal valE = case valE of
- Left ts -> waitFastCatchup ts
- Right h -> waitHeight h
-
--- | Perform some actions only when headers have been synced.
-merkleSyncedActions
- :: (MonadLoggerIO m, MonadBaseControl IO m)
- => BlockHash -- ^ Wallet best block
- -> NodeT m ()
-merkleSyncedActions walletHash =
- asks sharedSyncLock >>= \lock -> liftBaseOp_ (Lock.with lock) $ do
- -- Check if we are synced
- (synced, mempool, header) <- atomicallyNodeT $ do
- header <- readTVarS sharedBestHeader
- synced <- areBlocksSynced walletHash
- mempool <- readTVarS sharedMempool
- return (synced, mempool, header)
- when synced $ do
- $(logInfo) $ pack $ unwords
- [ "Merkle blocks are in sync with the"
- , "network at height", show walletHash
- ]
- -- Prune side chains
- bestBlock <- runSqlNodeT $ pruneChain header
- atomicallyNodeT $ do
- -- Update shared best header after pruning
- writeTVarS sharedBestHeader bestBlock
- writeTVarS sharedMerklePeer Nothing
- -- Do a mempool sync on the first merkle sync
- unless mempool $ do
- atomicallyNodeT $ do
- sendMessageAll MMempool
- writeTVarS sharedMempool True
- $(logInfo) "Requesting a mempool sync"
-
--- Wait for headers to catch up to the given height
-waitHeight :: BlockHeight -> NodeT STM ()
-waitHeight height = do
- node <- readTVarS sharedBestHeader
- -- Check if we passed the timestamp condition
- unless (height < nodeBlockHeight node) $ lift retry
-
--- Wait for headers to catch up to the given timestamp
-waitFastCatchup :: Timestamp -> NodeT STM ()
-waitFastCatchup ts = do
- node <- readTVarS sharedBestHeader
- -- Check if we passed the timestamp condition
- unless (ts < blockTimestamp (nodeHeader node)) $
- lift retry
-
--- Wait for a new block to be available for download
-waitNewBlock :: BlockHash -> NodeT STM ()
-waitNewBlock bh = do
- node <- readTVarS sharedBestHeader
- -- We have more merkle blocks to download
- unless (bh /= nodeHash node) $
- lift retry
-
-tryMerkleDwnHeight
- :: (MonadLoggerIO m, MonadBaseControl IO m)
- => NodeBlock
- -> Word32
- -> NodeT m (Maybe (BlockChainAction,
- Source (NodeT m) (Either (MerkleBlock, MerkleTxs) Tx)))
-tryMerkleDwnHeight block batchSize = do
- $(logInfo) $ pack $ unwords
- [ "Requesting merkle blocks at height", show $ nodeBlockHeight block
- , "with batch size", show batchSize
- ]
- -- Request height - 1 as we want to start downloading at height
- nodeM <- runSqlNodeT $ getParentBlock block
- case nodeM of
- Just bn ->
- tryMerkleDwnBlock bn batchSize
- _ -> do
- $(logDebug) $ pack $ unwords
- [ "Can't download merkle blocks."
- , "Waiting for headers to sync ..."
- ]
- return Nothing
-
-tryMerkleDwnTimestamp
- :: (MonadLoggerIO m, MonadBaseControl IO m)
- => Timestamp
- -> Word32
- -> NodeT m (Maybe (BlockChainAction,
- Source (NodeT m) (Either (MerkleBlock, MerkleTxs) Tx)))
-tryMerkleDwnTimestamp ts batchSize = do
- $(logInfo) $ pack $ unwords
- [ "Requesting merkle blocks after timestamp", show ts
- , "with batch size", show batchSize
- ]
- nodeM <- runSqlNodeT $ getBlockAfterTime ts
- case nodeM of
- Just bh ->
- tryMerkleDwnBlock bh batchSize
- _ -> do
- $(logDebug) $ pack $ unwords
- [ "Can't download merkle blocks."
- , "Waiting for headers to sync ..."
- ]
- return Nothing
-
-tryMerkleDwnBlock
- :: (MonadLoggerIO m, MonadBaseControl IO m)
- => NodeBlock
- -> Word32
- -> NodeT m (Maybe (BlockChainAction,
- Source (NodeT m) (Either (MerkleBlock, MerkleTxs) Tx)))
-tryMerkleDwnBlock bh batchSize = do
- $(logDebug) $ pack $ unwords
- [ "Requesting merkle download from block"
- , cs $ blockHashToHex (nodeHash bh)
- , "and batch size", show batchSize
- ]
- -- Get the list of merkle blocks to download from our headers
- best <- atomicallyNodeT $ readTVarS sharedBestHeader
- action <- runSqlNodeT $ getBlockWindow best bh batchSize
- case actionNodes action of
- [] -> do
- $(logError) "BlockChainAction was empty"
- return Nothing
- ns -> do
- -- Wait for a peer available for merkle download
- (pid, PeerSession{..}) <- waitMerklePeer $
- nodeBlockHeight $ last ns
-
- $(logDebug) $ formatPid pid peerSessionHost $ unwords
- [ "Found merkle downloading peer with score"
- , show peerSessionScore
- ]
-
- let source = peerMerkleDownload pid peerSessionHost action
- return $ Just (action, source)
- where
- waitMerklePeer height = atomicallyNodeT $ do
- pidM <- readTVarS sharedHeaderPeer
- allPeers <- getPeersAtHeight (>= height)
- let f (pid,_) = Just pid /= pidM
- -- Filter out the peer syncing headers (if there is one)
- peers = filter f allPeers
- case listToMaybe peers of
- Just res@(pid,_) -> do
- writeTVarS sharedMerklePeer $ Just pid
- return res
- _ -> lift retry
-
-peerMerkleDownload
- :: (MonadLoggerIO m, MonadBaseControl IO m)
- => PeerId
- -> PeerHost
- -> BlockChainAction
- -> Source (NodeT m) (Either (MerkleBlock, MerkleTxs) Tx)
-peerMerkleDownload pid ph action = do
- let bids = map nodeHash $ actionNodes action
- vs = map (InvVector InvMerkleBlock . getBlockHash) bids
- $(logInfo) $ formatPid pid ph $ unwords
- [ "Requesting", show $ length bids, "merkle block(s)" ]
- nonce <- liftIO randomIO
- -- Request a merkle batch download
- sessM <- lift . atomicallyNodeT $ do
- _ <- trySendMessage pid $ MGetData $ GetData vs
- -- Send a ping to have a recognizable end message for
- -- the last merkle block download.
- _ <- trySendMessage pid $ MPing $ Ping nonce
- tryGetPeerSession pid
- case sessM of
- Just PeerSession{..} -> checkOrder peerSessionMerkleChan bids
- _ -> lift . atomicallyNodeT $
- writeTVarS sharedMerklePeer Nothing
- where
- -- Build a source that that will check the order of the received merkle
- -- blocks against the initial request. If merkle blocks are sent out of
- -- order, the source will close and the peer will be flagged as
- -- misbehaving. The source will also close once all the requested merkle
- -- blocks have been received from the peer.
- checkOrder _ [] = lift . atomicallyNodeT $
- writeTVarS sharedMerklePeer Nothing
- checkOrder chan (bid:bids) = do
- -- Read the channel or disconnect the peer after waiting for 2 minutes
- resM <- lift $ raceTimeout 120
- (disconnectPeer pid ph)
- (liftIO . atomically $ readTBMChan chan)
- case resM of
- -- Forward transactions
- Right (Just res@(Right _)) ->
- yield res >> checkOrder chan (bid:bids)
- Right (Just res@(Left (MerkleBlock mHead _ _ _, _))) -> do
- let mBid = headerHash mHead
- $(logDebug) $ formatPid pid ph $ unwords
- [ "Processing merkle block", cs $ blockHashToHex mBid ]
- -- Check if we were expecting this merkle block
- if mBid == bid
- then yield res >> checkOrder chan bids
- else lift $ do
- atomicallyNodeT $ writeTVarS sharedMerklePeer Nothing
- -- If we were not expecting this merkle block, do not
- -- yield the merkle block and close the source
- misbehaving pid ph moderateDoS $ unwords
- [ "Peer sent us merkle block hash"
- , cs $ blockHashToHex $ headerHash mHead
- , "but we expected merkle block hash"
- , cs $ blockHashToHex bid
- ]
- -- Not sure how to recover from this situation.
- -- Disconnect the peer. TODO: Is there a way to recover
- -- without buffering the whole batch in memory and
- -- re-order it?
- disconnectPeer pid ph
- -- The channel closed. Stop here.
- _ -> do
- $(logWarn) $ formatPid pid ph
- "Merkle channel closed unexpectedly"
- lift $ atomicallyNodeT $ writeTVarS sharedMerklePeer Nothing
-
-processTickles :: (MonadLoggerIO m, MonadBaseControl IO m)
- => NodeT m ()
-processTickles = forever $ do
- $(logDebug) $ pack "Waiting for a block tickle ..."
- (pid, ph, tickle) <- atomicallyNodeT waitTickle
- $(logInfo) $ formatPid pid ph $ unwords
- [ "Received block tickle", cs $ blockHashToHex tickle ]
- heightM <- fmap nodeBlockHeight <$> runSqlNodeT (getBlockByHash tickle)
- case heightM of
- Just height -> do
- $(logInfo) $ formatPid pid ph $ unwords
- [ "The block tickle", cs $ blockHashToHex tickle
- , "is already connected"
- ]
- updatePeerHeight pid ph height
- _ -> do
- $(logDebug) $ formatPid pid ph $ unwords
- [ "The tickle", cs $ blockHashToHex tickle
- , "is unknown. Requesting a peer header sync."
- ]
- peerHeaderSyncFull pid ph `catchAny` const (disconnectPeer pid ph)
- newHeightM <-
- fmap nodeBlockHeight <$> runSqlNodeT (getBlockByHash tickle)
- case newHeightM of
- Just height -> do
- $(logInfo) $ formatPid pid ph $ unwords
- [ "The block tickle", cs $ blockHashToHex tickle
- , "was connected successfully"
- ]
- updatePeerHeight pid ph height
- _ -> $(logWarn) $ formatPid pid ph $ unwords
- [ "Could not find the height of block tickle"
- , cs $ blockHashToHex tickle
- ]
- where
- updatePeerHeight pid ph height = do
- $(logInfo) $ formatPid pid ph $ unwords
- [ "Updating peer height to", show height ]
- atomicallyNodeT $ do
- modifyPeerSession pid $ \s ->
- s{ peerSessionHeight = height }
- updateNetworkHeight
-
-waitTickle :: NodeT STM (PeerId, PeerHost, BlockHash)
-waitTickle = do
- tickleChan <- asks sharedTickleChan
- resM <- lift $ readTBMChan tickleChan
- case resM of
- Just res -> return res
- _ -> throw $ NodeException "tickle channel closed unexpectedly"
-
-syncedHeight :: MonadIO m => NodeT m (Bool, Word32)
-syncedHeight = atomicallyNodeT $ do
- synced <- areHeadersSynced
- ourHeight <- nodeBlockHeight <$> readTVarS sharedBestHeader
- return (synced, ourHeight)
-
-headerSync :: (MonadLoggerIO m, MonadBaseControl IO m)
- => NodeT m ()
-headerSync = do
- -- Start the header sync
- $(logDebug) "Syncing more headers. Finding the best peer..."
- (pid, PeerSession{..}) <- atomicallyNodeT $ do
- peers <- getPeersAtNetHeight
- case listToMaybe peers of
- Just res@(pid,_) -> do
- -- Save the header syncing peer
- writeTVarS sharedHeaderPeer $ Just pid
- return res
- _ -> lift retry
-
- $(logDebug) $ formatPid pid peerSessionHost $ unwords
- [ "Found best header syncing peer with score"
- , show peerSessionScore
- ]
-
- -- Run a maximum of 10 header downloads with this peer.
- -- Then we re-evaluate the best peer
- continue <- catchAny (peerHeaderSyncLimit pid peerSessionHost 10) $
- \e -> do
- $(logError) $ pack $ unwords ["peerHeaderSyncLimit:", show e]
- disconnectPeer pid peerSessionHost >> return True
-
- -- Reset the header syncing peer
- atomicallyNodeT $ writeTVarS sharedHeaderPeer Nothing
-
- -- Check if we should continue the header sync
- if continue then headerSync else do
- (synced, ourHeight) <- syncedHeight
- if synced
- then do
- $(logInfo) $ formatPid pid peerSessionHost $ unwords
- [ "Block headers are in sync with the"
- , "network at height", show ourHeight
- ]
- -- Continue the download if we are not yet synced
- else headerSync
-
-peerHeaderSyncLimit :: (MonadLoggerIO m, MonadBaseControl IO m)
- => PeerId
- -> PeerHost
- -> Int
- -> NodeT m Bool
-peerHeaderSyncLimit pid ph initLimit
- | initLimit < 1 = error "Limit must be at least 1"
- | otherwise = go initLimit Nothing
- where
- go limit prevM = peerHeaderSync pid ph prevM >>= \actionM -> case actionM of
- Just action ->
- -- If we received a side chain or a known chain, we want to
- -- continue szncing from this peer even if the limit has been
- -- reached.
- if limit > 1 || isSideChain action || isKnownChain action
- then go (limit - 1) actionM
- -- We got a Just, so we can continue the download from
- -- this peer
- else return True
- _ -> return False
-
--- Sync all the headers from a given peer
-peerHeaderSyncFull :: (MonadLoggerIO m, MonadBaseControl IO m)
- => PeerId
- -> PeerHost
- -> NodeT m ()
-peerHeaderSyncFull pid ph =
- go Nothing
- where
- go prevM = peerHeaderSync pid ph prevM >>= \actionM -> case actionM of
- Just _ -> go actionM
- Nothing -> do
- (synced, ourHeight) <- syncedHeight
- when synced $ $(logInfo) $ formatPid pid ph $ unwords
- [ "Block headers are in sync with the"
- , "network at height", show ourHeight
- ]
-
-areBlocksSynced :: BlockHash -> NodeT STM Bool
-areBlocksSynced walletHash = do
- headersSynced <- areHeadersSynced
- bestHeader <- readTVarS sharedBestHeader
- return $ headersSynced && walletHash == nodeHash bestHeader
-
--- Check if the block headers are synced with the network height
-areHeadersSynced :: NodeT STM Bool
-areHeadersSynced = do
- ourHeight <- nodeBlockHeight <$> readTVarS sharedBestHeader
- netHeight <- readTVarS sharedNetworkHeight
- -- If netHeight == 0 then we did not connect to any peers yet
- return $ ourHeight >= netHeight && netHeight > 0
-
--- | Sync one batch of headers from the given peer. Accept the result of a
--- previous peerHeaderSync to correctly compute block locators in the
--- presence of side chains.
-peerHeaderSync :: (MonadLoggerIO m, MonadBaseControl IO m)
- => PeerId
- -> PeerHost
- -> Maybe BlockChainAction
- -> NodeT m (Maybe BlockChainAction)
-peerHeaderSync pid ph prevM = do
- $(logDebug) $ formatPid pid ph "Waiting for the HeaderSync lock"
- -- Aquire the header syncing lock
- lock <- asks sharedSyncLock
- liftBaseOp_ (Lock.with lock) $ do
-
- best <- atomicallyNodeT $ readTVarS sharedBestHeader
-
- -- Retrieve the block locator
- loc <- case prevM of
- Just (KnownChain ns) -> do
- $(logDebug) $ formatPid pid ph "Building a known chain locator"
- runSqlNodeT $ blockLocator $ last ns
- Just (SideChain ns) -> do
- $(logDebug) $ formatPid pid ph "Building a side chain locator"
- runSqlNodeT $ blockLocator $ last ns
- Just (BestChain ns) -> do
- $(logDebug) $ formatPid pid ph "Building a best chain locator"
- runSqlNodeT $ blockLocator $ last ns
- Just (ChainReorg _ _ ns) -> do
- $(logDebug) $ formatPid pid ph "Building a reorg locator"
- runSqlNodeT $ blockLocator $ last ns
- Nothing -> do
- $(logDebug) $ formatPid pid ph "Building a locator to best"
- runSqlNodeT $ blockLocator best
-
- $(logDebug) $ formatPid pid ph $ unwords
- [ "Requesting headers with block locator of size"
- , show $ length loc
- , "Start block:", cs $ blockHashToHex $ head loc
- , "End block:", cs $ blockHashToHex $ last loc
- ]
-
- -- Send a GetHeaders message to the peer
- atomicallyNodeT $ sendMessage pid $ MGetHeaders $ GetHeaders 0x01 loc z
-
- $(logDebug) $ formatPid pid ph "Waiting 2 minutes for headers..."
-
- -- Wait 120 seconds for a response or time out
- continueE <- raceTimeout 120 (disconnectPeer pid ph) (waitHeaders best)
-
- -- Return True if we can continue syncing from this peer
- return $ either (const Nothing) id continueE
- where
- z = "0000000000000000000000000000000000000000000000000000000000000000"
- -- Wait for the headers to be available
- waitHeaders best = do
- (rPid, headers) <- atomicallyNodeT $ takeTMVarS sharedHeaders
- if rPid == pid
- then processHeaders best headers
- else waitHeaders best
- processHeaders _ (Headers []) = do
- $(logDebug) $ formatPid pid ph
- "Received empty headers. Finished downloading headers."
- -- Do not continue the header download
- return Nothing
- processHeaders best (Headers hs) = do
- $(logDebug) $ formatPid pid ph $ unwords
- [ "Received", show $ length hs, "headers."
- , "Start blocks:", cs $ blockHashToHex $ headerHash $ fst $ head hs
- , "End blocks:", cs $ blockHashToHex $ headerHash $ fst $ last hs
- ]
- now <- round <$> liftIO getPOSIXTime
- actionE <- runSqlNodeT $ connectHeaders best (map fst hs) now
- case actionE of
- Left err -> do
- misbehaving pid ph severeDoS err
- return Nothing
- Right action -> case actionNodes action of
- [] -> do
- $(logWarn) $ formatPid pid ph $ unwords
- [ "Received an empty blockchain action:", show action ]
- return Nothing
- nodes -> do
- $(logDebug) $ formatPid pid ph $ unwords
- [ "Received", show $ length nodes
- , "nodes in the action"
- ]
- let height = nodeBlockHeight $ last nodes
- case action of
- KnownChain _ ->
- $(logInfo) $ formatPid pid ph $ unwords
- [ "KnownChain headers received"
- , "up to height", show height
- ]
- SideChain _ ->
- $(logInfo) $ formatPid pid ph $ unwords
- [ "SideChain headers connected successfully"
- , "up to height", show height
- ]
- -- Headers extend our current best head
- _ -> do
- $(logInfo) $ formatPid pid ph $ unwords
- [ "Best headers connected successfully"
- , "up to height", show height
- ]
- atomicallyNodeT $
- writeTVarS sharedBestHeader $ last nodes
- -- If we received less than 2000 headers, we are done
- -- syncing from this peer and we return Nothing.
- return $ if length hs < 2000
- then Nothing
- else Just action
-
-nodeStatus :: NodeT STM NodeStatus
-nodeStatus = do
- nodeStatusPeers <- mapM peerStatus =<< getPeers
- SharedNodeState{..} <- ask
- lift $ do
- best <- readTVar sharedBestBlock
- header <- readTVar sharedBestHeader
- let nodeStatusBestBlock = nodeHash best
- nodeStatusBestBlockHeight = nodeBlockHeight best
- nodeStatusBestHeader = nodeHash header
- nodeStatusBestHeaderHeight = nodeBlockHeight header
- nodeStatusNetworkHeight <-
- readTVar sharedNetworkHeight
- nodeStatusBloomSize <-
- maybe 0 (S.length . bloomData . fst) <$> readTVar sharedBloomFilter
- nodeStatusHeaderPeer <-
- fmap hashUnique <$> readTVar sharedHeaderPeer
- nodeStatusMerklePeer <-
- fmap hashUnique <$> readTVar sharedMerklePeer
- nodeStatusHaveHeaders <-
- not <$> isEmptyTMVar sharedHeaders
- nodeStatusHaveTickles <-
- not <$> isEmptyTBMChan sharedTickleChan
- nodeStatusHaveTxs <-
- not <$> isEmptyTBMChan sharedTxChan
- nodeStatusGetData <-
- M.keys <$> readTVar sharedTxGetData
- nodeStatusRescan <-
- tryReadTMVar sharedRescan
- nodeStatusMempool <-
- readTVar sharedMempool
- nodeStatusSyncLock <-
- locked sharedSyncLock
- return NodeStatus{..}
-
-peerStatus :: (PeerId, PeerSession) -> NodeT STM PeerStatus
-peerStatus (pid, PeerSession{..}) = do
- hostM <- getHostSession peerSessionHost
- let peerStatusPeerId = hashUnique pid
- peerStatusHost = peerSessionHost
- peerStatusConnected = peerSessionConnected
- peerStatusHeight = peerSessionHeight
- peerStatusProtocol = version <$> peerSessionVersion
- peerStatusUserAgent =
- C.unpack . getVarString . userAgent <$> peerSessionVersion
- peerStatusPing = show <$> peerSessionScore
- peerStatusDoSScore = peerHostSessionScore <$> hostM
- peerStatusLog = peerHostSessionLog <$> hostM
- peerStatusReconnectTimer = peerHostSessionReconnect <$> hostM
- lift $ do
- peerStatusHaveMerkles <- not <$> isEmptyTBMChan peerSessionMerkleChan
- peerStatusHaveMessage <- not <$> isEmptyTBMChan peerSessionChan
- peerStatusPingNonces <- readTVar peerSessionPings
- return PeerStatus{..}
-
diff --git a/Network/Haskoin/Node/Checkpoints.hs b/Network/Haskoin/Node/Checkpoints.hs
deleted file mode 100644
index 059941e..0000000
--- a/Network/Haskoin/Node/Checkpoints.hs
+++ /dev/null
@@ -1,28 +0,0 @@
-module Network.Haskoin.Node.Checkpoints
-( checkpointMap
-, checkpointList
-, verifyCheckpoint
-) where
-
-import qualified Data.IntMap.Strict as M (IntMap, fromList, lookup)
-
-import Network.Haskoin.Block
-import Network.Haskoin.Constants
-
--- | Checkpoints from bitcoind reference implementation /src/checkpoints.cpp
--- presented as an IntMap.
-checkpointMap :: M.IntMap BlockHash
-checkpointMap = M.fromList checkpointList
-
--- | Checkpoints from bitcoind reference implementation /src/checkpoints.cpp
--- presented as a list.
-checkpointList :: [(Int, BlockHash)]
-checkpointList = checkpoints
-
--- | Verify that a block hash at a given height either matches an existing
--- checkpoint or is not a checkpoint.
-verifyCheckpoint :: Int -> BlockHash -> Bool
-verifyCheckpoint height hash = case M.lookup height checkpointMap of
- Just value -> hash == value
- Nothing -> True
-
diff --git a/Network/Haskoin/Node/HeaderTree.hs b/Network/Haskoin/Node/HeaderTree.hs
deleted file mode 100644
index 639a4c3..0000000
--- a/Network/Haskoin/Node/HeaderTree.hs
+++ /dev/null
@@ -1,656 +0,0 @@
-{-# LANGUAGE KindSignatures #-}
-{-# LANGUAGE MultiParamTypeClasses #-}
-{-# LANGUAGE RankNTypes #-}
-module Network.Haskoin.Node.HeaderTree
-( BlockChainAction(..)
-, BlockHeight
-, NodeBlock
-, Timestamp
-, initHeaderTree
-, migrateHeaderTree
-, getBestBlock
-, getHeads
-, getBlockByHash
-, getParentBlock
-, getBlockWindow
-, getBlockAfterTime
-, getChildBlocks
-, getBlockByHeight
-, getBlocksByHeight
-, getBlocksFromHeight
-, getBlocksAtHeight
-, putBlock
-, putBlocks
-, genesisBlock
-, splitBlock
-, splitChains
-, nodeBlock
-, nodeBlockHeight
-, nodeHash
-, nodeHeader
-, nodePrev
-, nodeTimestamp
-, nodeWork
-, nodeHeight
-, nodeChain
-, isBestChain
-, isChainReorg
-, isSideChain
-, isKnownChain
-, connectHeader
-, connectHeaders
-, blockLocator
-, pruneChain
-) where
-
-import Control.Monad (foldM, forM, unless,
- when, (<=<))
-import Control.Monad.State (evalStateT, get, put)
-import Control.Monad.Trans (MonadIO, lift)
-import Control.Monad.Trans.Either (EitherT, left,
- runEitherT)
-import Data.Bits (shiftL)
-import qualified Data.ByteString as BS (reverse, take)
-import Data.Function (on)
-import Data.List (find, maximumBy, sort)
-import Data.Maybe (fromMaybe, isNothing,
- listToMaybe, mapMaybe)
-import Data.Serialize (decode, encode)
-import Data.String.Conversions (cs)
-import Data.Word (Word32)
-import Database.Esqueleto (Esqueleto, Value, asc,
- delete, from, groupBy,
- in_, insertMany_, limit,
- max_, not_, orderBy,
- select, set, unValue,
- update, val, valList,
- where_, (!=.), (&&.),
- (<=.), (=.), (==.),
- (>.), (>=.), (^.),
- (||.))
-import Database.Persist (Entity (..), insert_)
-import Database.Persist.Sql (SqlPersistT)
-import Network.Haskoin.Block
-import Network.Haskoin.Constants
-import Network.Haskoin.Crypto
-import Network.Haskoin.Node.Checkpoints
-import Network.Haskoin.Node.HeaderTree.Model
-import Network.Haskoin.Node.HeaderTree.Types
-import Network.Haskoin.Util
-
-data BlockChainAction
- = BestChain { actionNodes :: ![NodeBlock] }
- | ChainReorg { actionSplitNode :: !NodeBlock
- , actionOldNodes :: ![NodeBlock]
- , actionNodes :: ![NodeBlock]
- }
- | SideChain { actionNodes :: ![NodeBlock] }
- | KnownChain { actionNodes :: ![NodeBlock] }
- deriving (Show, Eq)
-
-type MinWork = Word32
-
-shortHash :: BlockHash -> ShortHash
-shortHash = fromRight . decode . BS.take 8 . getHash256 . getBlockHash
-
-nodeHeader :: NodeBlock -> BlockHeader
-nodeHeader = getNodeHeader . nodeBlockHeader
-
-nodeHash :: NodeBlock -> BlockHash
-nodeHash = headerHash . nodeHeader
-
-nodePrev :: NodeBlock -> BlockHash
-nodePrev = prevBlock . nodeHeader
-
-nodeTimestamp :: NodeBlock -> Timestamp
-nodeTimestamp = blockTimestamp . nodeHeader
-
-nodeWork :: NodeBlock -> Work
-nodeWork = nodeBlockWork
-
-nodeHeight :: NodeBlock -> BlockHeight
-nodeHeight = nodeBlockHeight
-
-nodeChain :: NodeBlock -> Word32
-nodeChain = nodeBlockChain
-
--- | Number of blocks on average between difficulty cycles (2016 blocks).
-diffInterval :: Word32
-diffInterval = targetTimespan `div` targetSpacing
-
--- | Genesis block.
-genesisBlock :: NodeBlock
-genesisBlock = NodeBlock
- { nodeBlockHash = shortHash $ headerHash genesisHeader
- , nodeBlockHeader = NodeHeader genesisHeader
- , nodeBlockWork = 1.0
- , nodeBlockHeight = 0
- , nodeBlockChain = 0
- }
-
--- | Initialize the block header chain by inserting the genesis block if it
--- doesn't already exist.
-initHeaderTree :: MonadIO m => SqlPersistT m ()
-initHeaderTree = do
- nodeM <- getBlockByHash $ headerHash genesisHeader
- when (isNothing nodeM) $ putBlock genesisBlock
-
-getVerifyParams
- :: MonadIO m
- => BlockHeader
- -> EitherT String (SqlPersistT m)
- (NodeBlock, [Timestamp], Timestamp, Word32, Maybe Word32)
-getVerifyParams bh = do
- parentM <- lift $ getBlockByHash $ prevBlock bh
- parent <- maybe (left "Could not get parent node") return parentM
- checkPointM <- fmap nodeBlockHeight <$> lift lastSeenCheckpoint
- diffBlockM <- lift $ getBlockByHeight parent $
- nodeBlockHeight parent `div` diffInterval * diffInterval
- diffTime <- maybe (left "Could not get difficulty change block")
- (return . nodeTimestamp)
- diffBlockM
- medianBlocks <- lift $ map nodeTimestamp <$>
- getBlocksFromHeight parent 11 (min 0 $ nodeBlockHeight parent - 10)
- minWork <- lift $ findMinWork parent
- return (parent, medianBlocks, diffTime, minWork, checkPointM)
-
-findMinWork :: MonadIO m => NodeBlock -> SqlPersistT m MinWork
-findMinWork bn
- | isMinWork bn = return $ blockBits $ nodeHeader bn
- | otherwise = getParentBlock bn >>=
- maybe (return $ blockBits $ nodeHeader bn) findMinWork
-
-isMinWork :: NodeBlock -> Bool
-isMinWork bn
- | not allowMinDifficultyBlocks = True
- | nodeBlockHeight bn `mod` diffInterval == 0 = True
- | blockBits (nodeHeader bn) /= encodeCompact powLimit = True
- | otherwise = False
-
-splitKnown :: MonadIO m
- => [BlockHeader]
- -> SqlPersistT m ([NodeBlock], [BlockHeader])
-splitKnown hs = do
- (kno, unk) <- foldM f ([], []) hs
- return (reverse kno, reverse unk)
- where
- f (kno, []) n = do
- bnM <- getBlockByHash (headerHash n)
- case bnM of
- Nothing -> return (kno, [n])
- Just bn -> return (bn:kno, [])
- f (kno, unk) n = return (kno, n:unk)
-
--- | Connect a block header to this block header chain. Corresponds to bitcoind
--- function ProcessBlockHeader and AcceptBlockHeader in main.cpp.
-connectHeader :: MonadIO m
- => NodeBlock
- -> BlockHeader
- -> Timestamp
- -> SqlPersistT m (Either String BlockChainAction)
-connectHeader best bh ts = runEitherT $ do
- (kno, _) <- lift $ splitKnown [bh]
- case kno of
- [] -> do
- (parent, medians, diffTime, minWork, cpM) <- getVerifyParams bh
- chain <- lift $ getChain parent
- let bn = nodeBlock parent chain bh
- liftEither $
- verifyBlockHeader parent medians diffTime cpM minWork ts bh
- lift $ putBlock bn
- lift $ evalNewChain best [bn]
- _ -> return $ KnownChain kno
-
--- | A more efficient way of connecting a list of block headers than connecting
--- them individually. The list of block headers have must form a valid chain.
-connectHeaders :: MonadIO m
- => NodeBlock
- -> [BlockHeader]
- -> Timestamp
- -> SqlPersistT m (Either String BlockChainAction)
-connectHeaders _ [] _ = runEitherT $ left "Nothing to connect"
-connectHeaders best bhs ts = runEitherT $ do
- unless (validChain bhs) $ left "Block headers do not form a valid chain"
- (kno, unk) <- lift $ splitKnown bhs
- case unk of
- [] -> return $ KnownChain kno
- (bh:_) -> do
- (parent, medians, diffTime, minWork, cpM) <- getVerifyParams bh
- chain <- lift $ getChain parent
- nodes <- (`evalStateT` (parent, diffTime, medians, minWork)) $
- forM unk $ \b -> do
- (p, d, ms, mw) <- get
- lift $ liftEither $ verifyBlockHeader p ms d cpM mw ts b
- let bn = nodeBlock p chain b
- d' = if nodeBlockHeight bn `mod` diffInterval == 0
- then blockTimestamp b
- else d
- ms' = blockTimestamp b : if length ms == 11
- then tail ms
- else ms
- mw' = if isMinWork bn then blockBits b else mw
- put (bn, d', ms', mw')
- return bn
- lift $ putBlocks nodes
- lift $ evalNewChain best nodes
- where
- validChain (a:b:xs) = prevBlock b == headerHash a && validChain (b:xs)
- validChain [_] = True
- validChain _ = False
-
--- | Returns True if the action is a best chain.
-isBestChain :: BlockChainAction -> Bool
-isBestChain (BestChain _) = True
-isBestChain _ = False
-
--- | Returns True if the action is a chain reorg.
-isChainReorg :: BlockChainAction -> Bool
-isChainReorg ChainReorg{} = True
-isChainReorg _ = False
-
--- | Returns True if the action is a side chain.
-isSideChain :: BlockChainAction -> Bool
-isSideChain (SideChain _) = True
-isSideChain _ = False
-
--- | Returns True if the action is a known chain.
-isKnownChain :: BlockChainAction -> Bool
-isKnownChain (KnownChain _) = True
-isKnownChain _ = False
-
--- | Returns a BlockLocator object for a given block hash.
-blockLocator :: MonadIO m => NodeBlock -> SqlPersistT m BlockLocator
-blockLocator node = do
- nodes <- getBlocksByHeight node bs
- return $ map nodeHash nodes
- where
- h = nodeBlockHeight node
- f x s = (fst x - s, fst x > s)
- bs = (++ [0]) $ map fst $ takeWhile snd $
- [(h - x, x < h) | x <- [0..9]] ++
- scanl f (h - 10, h > 10) [2 ^ (x :: Word32) | x <- [1..]]
-
--- | Verify block header conforms to protocol.
-verifyBlockHeader :: NodeBlock -- ^ Parent block header
- -> [Timestamp] -- ^ Timestamps of previous 11 blocks
- -> Timestamp -- ^ Previous difficulty change
- -> Maybe Word32 -- ^ Height of most recent checkpoint
- -> MinWork -- ^ Last MinWork (e.g. Testnet3)
- -> Timestamp -- ^ Current time
- -> BlockHeader -- ^ Block header to validate
- -> Either String ()
--- TODO: Add DOS return values
-verifyBlockHeader par mts dt cp mw ts bh = do
- unless (isValidPOW bh) $
- Left "Invalid proof of work"
-
- unless (blockTimestamp bh <= ts + 2 * 60 * 60) $
- Left "Invalid header timestamp"
-
- let nextWork = nextWorkRequired par dt mw bh
- unless (blockBits bh == nextWork) $
- Left "Incorrect work transition (bits)"
-
- let sortedMedians = sort mts
- medianTime = sortedMedians !! (length sortedMedians `div` 2)
- when (blockTimestamp bh <= medianTime) $
- Left "Block timestamp is too early"
-
- let newHeight = nodeBlockHeight par + 1
- unless (maybe True (fromIntegral newHeight >) cp) $
- Left "Rewriting pre-checkpoint chain"
-
- unless (verifyCheckpoint (fromIntegral newHeight) (headerHash bh)) $
- Left "Rejected by checkpoint lock-in"
-
- -- All block of height 227836 or more use version 2 in prodnet
- -- TODO: Find out the value here for testnet
- when (networkName == "prodnet"
- && blockVersion bh == 1
- && nodeBlockHeight par + 1 >= 227836) $
- Left "Rejected version 1 block"
-
--- | Create a block node data structure from a block header.
-nodeBlock :: NodeBlock -- ^ Parent block node
- -> Word32 -- ^ Chain number for new node
- -> BlockHeader
- -> NodeBlock
-nodeBlock parent chain bh = NodeBlock
- { nodeBlockHash = shortHash $ headerHash bh
- , nodeBlockHeader = NodeHeader bh
- , nodeBlockWork = newWork
- , nodeBlockHeight = height
- , nodeBlockChain = chain
- }
- where
- newWork = nodeBlockWork parent + fromIntegral
- (headerWork bh `div` headerWork genesisHeader)
- height = nodeBlockHeight parent + 1
-
--- | Return blockchain action to connect given block with best block. Count will
--- limit the amount of blocks building up from split point towards the best
--- block.
-getBlockWindow :: MonadIO m
- => NodeBlock -- ^ Best block
- -> NodeBlock -- ^ Start of window
- -> Word32 -- ^ Window count
- -> SqlPersistT m BlockChainAction
-getBlockWindow best node cnt = do
- (_, old, new) <- splitChains (node, 0) (best, cnt)
- return $ if null old then BestChain new else ChainReorg node old new
-
--- | Find the split point between two nodes. It also returns the two partial
--- chains leading from the split point to the respective nodes. Tuples must
--- contain a block node and the count of nodes that should be returned from the
--- split towards that block. 0 means all.
-splitChains :: MonadIO m
- => (NodeBlock, Word32)
- -> (NodeBlock, Word32)
- -> SqlPersistT m (NodeBlock, [NodeBlock], [NodeBlock])
-splitChains (l, ln) (r, rn) = do
- sn <- splitBlock l r
- (split:ls) <- getBlocksFromHeight l ln (nodeBlockHeight sn)
- rs <- getBlocksFromHeight r rn (nodeBlockHeight sn + 1)
- return (split, ls, rs)
-
--- | Finds the parent of a block.
-getParentBlock :: MonadIO m
- => NodeBlock
- -> SqlPersistT m (Maybe NodeBlock)
-getParentBlock node
- | nodeBlockHeight node == 0 = return Nothing
- | otherwise = getBlockByHash p
- where
- p = nodePrev node
-
--- | Get all children for a block
-getChildBlocks :: MonadIO m
- => BlockHash
- -> SqlPersistT m [NodeBlock]
-getChildBlocks h = do
- ch <- (+1) . nodeBlockHeight . fromMaybe e <$> getBlockByHash h
- filter ((==h) . nodePrev) <$> getBlocksAtHeight ch
- where
- e = error $ "Cannot find block hash " ++ cs (blockHashToHex h)
-
-
--- | Get the last checkpoint that we have seen.
-lastSeenCheckpoint :: MonadIO m
- => SqlPersistT m (Maybe NodeBlock)
-lastSeenCheckpoint =
- fmap listToMaybe $ getBlocksByHash $ map snd $ reverse checkpointList
-
--- | Returns the work required for a block header given the previous block. This
--- coresponds to bitcoind function GetNextWorkRequired in main.cpp.
-nextWorkRequired :: NodeBlock
- -> Timestamp
- -> MinWork
- -> BlockHeader
- -> Word32
-nextWorkRequired par ts mw bh
- -- Genesis block
- | nodeBlockHeight par == 0 = encodeCompact powLimit
- -- Only change the difficulty once per interval
- | (nodeBlockHeight par + 1) `mod` diffInterval /= 0 =
- if allowMinDifficultyBlocks
- then minPOW
- else blockBits $ nodeHeader par
- | otherwise = workFromInterval ts (nodeHeader par)
- where
- delta = targetSpacing * 2
- minPOW
- | blockTimestamp bh > nodeTimestamp par + delta = encodeCompact powLimit
- | otherwise = mw
-
--- | Computes the work required for the next block given a timestamp and the
--- current block. The timestamp should come from the block that matched the
--- last jump in difficulty (spaced out by 2016 blocks in prodnet).
-workFromInterval :: Timestamp -> BlockHeader -> Word32
-workFromInterval ts lastB
- | newDiff > powLimit = encodeCompact powLimit
- | otherwise = encodeCompact newDiff
- where
- t = fromIntegral $ blockTimestamp lastB - ts
- actualTime
- | t < targetTimespan `div` 4 = targetTimespan `div` 4
- | t > targetTimespan * 4 = targetTimespan * 4
- | otherwise = t
- lastDiff = decodeCompact $ blockBits lastB
- newDiff = lastDiff * toInteger actualTime `div` toInteger targetTimespan
-
--- | Returns True if the difficulty target (bits) of the header is valid and the
--- proof of work of the header matches the advertised difficulty target. This
--- function corresponds to the function CheckProofOfWork from bitcoind in
--- main.cpp.
-isValidPOW :: BlockHeader -> Bool
-isValidPOW bh
- | target <= 0 || target > powLimit = False
- | otherwise = headerPOW bh <= fromIntegral target
- where
- target = decodeCompact $ blockBits bh
-
--- | Returns the proof of work of a block header as an Integer number.
-headerPOW :: BlockHeader -> Integer
-headerPOW = bsToInteger . BS.reverse . encode . headerHash
-
--- | Returns the work represented by this block. Work is defined as the number
--- of tries needed to solve a block in the average case with respect to the
--- target.
-headerWork :: BlockHeader -> Integer
-headerWork bh =
- fromIntegral $ largestHash `div` (target + 1)
- where
- target = decodeCompact (blockBits bh)
- largestHash = 1 `shiftL` 256
-
-{- Persistent backend -}
-
-chainPathQuery :: forall (expr :: * -> *) (query :: * -> *) backend.
- Esqueleto query expr backend
- => expr (Entity NodeBlock)
- -> [NodeBlock]
- -> expr (Value Bool)
-chainPathQuery _ [] = error "Monsters, monsters everywhere"
-
-chainPathQuery t [NodeBlock{..}] =
- t ^. NodeBlockHeight <=. val nodeBlockHeight &&.
- t ^. NodeBlockChain ==. val nodeBlockChain
-
-chainPathQuery t (n1:bs@(n2:_)) = chainPathQuery t bs ||.
- ( t ^. NodeBlockHeight <=. val (nodeBlockHeight n1)
- &&. t ^. NodeBlockHeight >. val (nodeBlockHeight n2)
- &&. t ^. NodeBlockChain ==. val (nodeBlockChain n1)
- )
-
-getHeads :: MonadIO m => SqlPersistT m [NodeBlock]
-getHeads = fmap (map (entityVal . snd)) $ select $ from $ \t -> do
- groupBy $ t ^. NodeBlockChain
- return (max_ (t ^. NodeBlockHeight), t)
-
--- | Chain for new block building on a parent node
-getChain :: MonadIO m
- => NodeBlock -- ^ Parent node
- -> SqlPersistT m Word32
-getChain parent = do
- maxHeightM <- fmap (unValue <=< listToMaybe) $ select $ from $ \t -> do
- where_ $ t ^. NodeBlockChain ==. val (nodeBlockChain parent)
- return $ max_ $ t ^. NodeBlockHeight
- let maxHeight = fromMaybe (error "That chain does not exist") maxHeightM
- if maxHeight == nodeBlockHeight parent
- then return $ nodeBlockChain parent
- else do
- maxChainM <- fmap (unValue <=< listToMaybe) $ select $ from $ \t ->
- return $ max_ $ t ^. NodeBlockChain
- let maxChain = fromMaybe (error "Ran out of chains") maxChainM
- return $ maxChain + 1
-
-getPivots :: MonadIO m => NodeBlock -> SqlPersistT m [NodeBlock]
-getPivots = go []
- where
- go acc b
- | nodeBlockChain b == 0 = return $ genesisBlock : b : acc
- | otherwise = do
- l <- fromMaybe (error "Houston, we have a problem") <$>
- getChainLowest b
- c <- fromMaybe (error "Ground Control to Major Tom") <$>
- getParentBlock l
- go (b:acc) c
-
-getChainLowest :: MonadIO m => NodeBlock -> SqlPersistT m (Maybe NodeBlock)
-getChainLowest nb = fmap (listToMaybe . map entityVal) $
- select $ from $ \t -> do
- where_ $ t ^. NodeBlockChain ==. val (nodeBlockChain nb)
- orderBy [ asc $ t ^. NodeBlockHeight ]
- limit 1
- return t
-
--- | Get node height and chain common to both given.
-splitBlock :: MonadIO m
- => NodeBlock
- -> NodeBlock
- -> SqlPersistT m NodeBlock
-splitBlock l r = if nodeBlockChain l == nodeBlockChain r
- then if nodeBlockHeight l < nodeBlockHeight r
- then return l
- else return r
- else do
- pivotsL <- getPivots l
- pivotsR <- getPivots r
- let ns = zip pivotsL pivotsR
- f (x,y) = nodeBlockChain x == nodeBlockChain y
- (one, two) = last $ takeWhile f ns
- if nodeBlockHeight one < nodeBlockHeight two
- then return one
- else return two
-
--- | Put single block in database.
-putBlock :: MonadIO m => NodeBlock -> SqlPersistT m ()
-putBlock = insert_
-
--- | Put multiple blocks in database.
-putBlocks :: MonadIO m => [NodeBlock] -> SqlPersistT m ()
-putBlocks = mapM_ insertMany_ . f
- where
- f [] = []
- f xs = let (xs',xxs) = splitAt 50 xs in xs' : f xxs
-
-getBestBlock :: MonadIO m => SqlPersistT m NodeBlock
-getBestBlock =
- maximumBy (compare `on` nodeBlockWork) <$> getHeads
-
-getBlockByHash :: MonadIO m => BlockHash -> SqlPersistT m (Maybe NodeBlock)
-getBlockByHash h =
- fmap (listToMaybe . map entityVal) $ select $ from $ \t -> do
- where_ $ t ^. NodeBlockHash ==. val (shortHash h)
- return t
-
--- | Get multiple blocks corresponding to given hashes
-getBlocksByHash :: MonadIO m
- => [BlockHash]
- -> SqlPersistT m [NodeBlock]
-getBlocksByHash hashes = do
- nodes <- fmap (map entityVal) $ select $ from $ \t -> do
- where_ $ t ^. NodeBlockHash `in_` valList (map shortHash hashes)
- return t
- return $ mapMaybe
- (\h -> find ((== shortHash h) . nodeBlockHash) nodes)
- hashes
-
--- | Get ancestor of specified block at given height.
-getBlockByHeight :: MonadIO m
- => NodeBlock -- ^ Best block
- -> BlockHeight
- -> SqlPersistT m (Maybe NodeBlock)
-getBlockByHeight block height = do
- forks <- reverse <$> getPivots block
- fmap (listToMaybe . map entityVal) $ select $ from $ \t -> do
- where_ $ chainPathQuery t forks &&.
- t ^. NodeBlockHeight ==. val height
- return t
-
--- | Get ancestors for specified block at given heights.
-getBlocksByHeight :: MonadIO m
- => NodeBlock -- ^ Best block
- -> [BlockHeight]
- -> SqlPersistT m [NodeBlock]
-getBlocksByHeight best heights = do
- forks <- reverse <$> getPivots best
- nodes <- fmap (map entityVal) $ select $ from $ \t -> do
- where_ $ chainPathQuery t forks &&.
- t ^. NodeBlockHeight `in_` valList heights
- return t
- return $ mapMaybe (\h -> find ((==h) . nodeBlockHeight) nodes) heights
-
--- | Get a range of block headers building up to specified block. If
--- specified height is too large, an empty list will be returned.
-getBlocksFromHeight :: MonadIO m
- => NodeBlock -- ^ Best block
- -> Word32 -- ^ Count (0 for all)
- -> BlockHeight -- ^ Height from (including)
- -> SqlPersistT m [NodeBlock]
-getBlocksFromHeight block cnt height = do
- forks <- reverse <$> getPivots block
- fmap (map entityVal) $ select $ from $ \t -> do
- where_ $ chainPathQuery t forks &&.
- t ^. NodeBlockHeight >=. val height
- when (cnt > 0) $ limit $ fromIntegral cnt
- return t
-
--- | Get node immediately at or after timestamp in main chain.
-getBlockAfterTime :: MonadIO m => Timestamp -> SqlPersistT m (Maybe NodeBlock)
-getBlockAfterTime ts = do
- n@NodeBlock{..} <- getBestBlock
- f genesisBlock n
- where
- f l r | nodeTimestamp r < ts =
- return Nothing
- | nodeTimestamp l >= ts =
- return $ Just l
- | (nodeBlockHeight r - nodeBlockHeight l) `div` 2 == 0 =
- return $ Just r
- | otherwise = do
- let rh = nodeBlockHeight r
- lh = nodeBlockHeight l
- mh = rh - (rh - lh) `div` 2
- m <- fromMaybe (error "My God, it’s full of stars!") <$>
- getBlockByHeight r mh
- if nodeTimestamp m > ts then f l m else f m r
-
--- | Get blocks at specified height in all chains.
-getBlocksAtHeight :: MonadIO m => BlockHeight -> SqlPersistT m [NodeBlock]
-getBlocksAtHeight height = fmap (map entityVal) $ select $ from $ \t -> do
- where_ $ t ^. NodeBlockHeight ==. val height
- return t
-
--- | Evaluate block action for provided best block and chain of new blocks.
-evalNewChain :: MonadIO m
- => NodeBlock
- -> [NodeBlock]
- -> SqlPersistT m BlockChainAction
-evalNewChain _ [] = error "You find yourself in the dungeon of missing blocks"
-evalNewChain best newNodes
- | buildsOnBest = do
- return $ BestChain newNodes
- | nodeBlockWork (last newNodes) > nodeBlockWork best = do
- (split, old, new) <- splitChains (best, 0) (head newNodes, 0)
- return $ ChainReorg split old (new ++ tail newNodes)
- | otherwise = do
- (split, _, new) <- splitChains (best, 0) (head newNodes, 0)
- case new of
- [] -> return $ KnownChain newNodes
- _ -> return $ SideChain $ split : new ++ tail newNodes
- where
- buildsOnBest = nodePrev (head newNodes) == nodeHash best
-
--- | Remove all other chains from database and return updated best block node.
-pruneChain :: MonadIO m
- => NodeBlock
- -> SqlPersistT m NodeBlock
-pruneChain best = if (nodeBlockChain best == 0) then return best else do
- forks <- reverse <$> getPivots best
- delete $ from $ \t -> where_ $ not_ (chainPathQuery t forks)
- update $ \t -> do
- set t [ NodeBlockChain =. val 0 ]
- where_ $ t ^. NodeBlockHeight <=. val (nodeBlockHeight best)
- &&. t ^. NodeBlockChain !=. val 0
- return best{ nodeBlockChain = 0 }
diff --git a/Network/Haskoin/Node/HeaderTree/Model.hs b/Network/Haskoin/Node/HeaderTree/Model.hs
deleted file mode 100644
index 6e7e507..0000000
--- a/Network/Haskoin/Node/HeaderTree/Model.hs
+++ /dev/null
@@ -1,27 +0,0 @@
-{-# LANGUAGE EmptyDataDecls #-}
-{-# LANGUAGE GADTs #-}
-{-# LANGUAGE GeneralizedNewtypeDeriving #-}
-{-# LANGUAGE MultiParamTypeClasses #-}
-{-# LANGUAGE QuasiQuotes #-}
-{-# LANGUAGE TemplateHaskell #-}
-{-# LANGUAGE TypeFamilies #-}
-module Network.Haskoin.Node.HeaderTree.Model where
-
-import Data.Word (Word32)
-import Database.Persist.TH (mkMigrate, mkPersist,
- persistLowerCase, share,
- sqlSettings)
-import Network.Haskoin.Node.HeaderTree.Types
-
-share [mkPersist sqlSettings, mkMigrate "migrateHeaderTree"] [persistLowerCase|
-NodeBlock
- hash ShortHash
- header NodeHeader maxlen=80
- work Work
- height BlockHeight
- chain Word32
- UniqueHash hash
- UniqueChain chain height
- deriving Show
- deriving Eq
-|]
diff --git a/Network/Haskoin/Node/HeaderTree/Types.hs b/Network/Haskoin/Node/HeaderTree/Types.hs
deleted file mode 100644
index bc50154..0000000
--- a/Network/Haskoin/Node/HeaderTree/Types.hs
+++ /dev/null
@@ -1,30 +0,0 @@
-module Network.Haskoin.Node.HeaderTree.Types where
-
-import Data.Serialize (decode, encode)
-import Data.String (fromString)
-import Data.Word (Word32, Word64)
-import Database.Persist (PersistField (..), PersistValue (..),
- SqlType (..))
-import Database.Persist.Sql (PersistFieldSql (..))
-import Network.Haskoin.Block
-
-type BlockHeight = Word32
-type ShortHash = Word64
-type Timestamp = Word32
-type Work = Double
-
-newtype NodeHeader = NodeHeader { getNodeHeader :: BlockHeader }
- deriving (Show, Eq)
-
-{- SQL database backend for HeaderTree -}
-
-instance PersistField NodeHeader where
- toPersistValue = PersistByteString . encode . getNodeHeader
- fromPersistValue (PersistByteString bs) =
- case decode bs of
- Right x -> Right (NodeHeader x)
- Left e -> Left (fromString e)
- fromPersistValue _ = Left "Invalid persistent block header"
-
-instance PersistFieldSql NodeHeader where
- sqlType _ = SqlBlob
diff --git a/Network/Haskoin/Node/Peer.hs b/Network/Haskoin/Node/Peer.hs
deleted file mode 100644
index 516bc0c..0000000
--- a/Network/Haskoin/Node/Peer.hs
+++ /dev/null
@@ -1,720 +0,0 @@
-{-# LANGUAGE MultiParamTypeClasses #-}
-{-# LANGUAGE TemplateHaskell #-}
-module Network.Haskoin.Node.Peer where
-
-import Control.Concurrent (killThread, myThreadId,
- threadDelay)
-import Control.Concurrent.Async.Lifted (link, race, waitAnyCancel,
- waitCatch, withAsync)
-import Control.Concurrent.STM (STM, atomically, modifyTVar',
- newTVarIO, readTVar, retry,
- swapTVar)
-import Control.Concurrent.STM.TBMChan (TBMChan, closeTBMChan,
- newTBMChan, writeTBMChan)
-import Control.Exception (AsyncException(ThreadKilled))
-import Control.Exception.Lifted (finally, fromException, throw,
- throwIO)
-import Control.Monad (forM_, forever, join, unless,
- when)
-import Control.Monad.Logger (MonadLoggerIO, logDebug,
- logError, logInfo, logWarn)
-import Control.Monad.Reader (asks)
-import Control.Monad.State (StateT, evalStateT, get, put)
-import Control.Monad.Trans (MonadIO, lift, liftIO)
-import Control.Monad.Trans.Control (MonadBaseControl)
-import Data.Bits (testBit)
-import qualified Data.ByteString as BS (ByteString, append,
- null)
-import qualified Data.ByteString.Char8 as C (pack)
-import qualified Data.ByteString.Lazy as BL (toStrict)
-import Data.Conduit (Conduit, Sink, awaitForever,
- yield, ($$), ($=))
-import qualified Data.Conduit.Binary as CB (take)
-import Data.Conduit.Network (appSink, appSource,
- clientSettings,
- runGeneralTCPClient)
-import Data.Conduit.TMChan (sourceTBMChan)
-import Data.List (nub, sort, sortBy)
-import qualified Data.Map as M (assocs, elems, fromList,
- keys, lookup, unionWith)
-import Data.Maybe (fromMaybe, isJust,
- listToMaybe)
-import Data.Serialize (decode, encode)
-import Data.String.Conversions (cs)
-import Data.Text (Text, pack)
-import Data.Time.Clock (diffUTCTime, getCurrentTime)
-import Data.Time.Clock.POSIX (getPOSIXTime)
-import Data.Unique (hashUnique, newUnique)
-import Data.Word (Word32)
-import Network.Haskoin.Block
-import Network.Haskoin.Constants
-import Network.Haskoin.Node
-import Network.Haskoin.Node.HeaderTree
-import Network.Haskoin.Node.STM
-import Network.Haskoin.Transaction
-import Network.Haskoin.Util
-import Network.Socket (SockAddr (SockAddrInet))
-import System.Random (randomIO)
-
--- TODO: Move constants elsewhere ?
-minProtocolVersion :: Word32
-minProtocolVersion = 70001
-
--- Start a reconnecting peer that will idle once the connection is established
--- and the handshake is performed.
-startPeer :: (MonadLoggerIO m, MonadBaseControl IO m)
- => PeerHost
- -> NodeT m ()
-startPeer ph@PeerHost{..} = do
- -- Create a new unique ID for this peer
- pid <- liftIO newUnique
- -- Start the peer with the given PID
- startPeerPid pid ph
-
--- Start a peer that will try to reconnect when the connection is closed. The
--- reconnections are performed using an expoential backoff time. This function
--- blocks until the peer cannot reconnect (either the peer is banned or we
--- already have a peer connected to the given peer host).
-startReconnectPeer :: (MonadLoggerIO m, MonadBaseControl IO m)
- => PeerHost
- -> NodeT m ()
-startReconnectPeer ph@PeerHost{..} = do
- -- Create a new unique ID for this peer
- pid <- liftIO newUnique
- -- Wait if there is a reconnection timeout
- maybeWaitReconnect pid
- -- Launch the peer
- withAsync (startPeerPid pid ph) $ \a -> do
- resE <- liftIO $ waitCatch a
- reconnect <- case resE of
- Left se -> do
- $(logError) $ formatPid pid ph $ unwords
- [ "Peer thread stopped with exception:", show se ]
- return $ case fromException se of
- Just NodeExceptionBanned -> False
- Just NodeExceptionConnected -> False
- Just (NodeExceptionInvalidPeer _) -> False
- _ -> fromException se /= Just ThreadKilled
- Right _ -> do
- $(logDebug) $ formatPid pid ph "Peer thread stopped"
- return True
- -- Try to reconnect
- when reconnect $ startReconnectPeer ph
- where
- maybeWaitReconnect pid = do
- reconnect <- atomicallyNodeT $ do
- sessM <- getHostSession ph
- case sessM of
- Just PeerHostSession{..} -> do
- -- Compute the new reconnection time (max 15 minutes)
- let reconnect = min 900 $ 2 * peerHostSessionReconnect
- -- Save the reconnection time
- modifyHostSession ph $ \s ->
- s{ peerHostSessionReconnect = reconnect }
- return reconnect
- _ -> return 0
-
- when (reconnect > 0) $ do
- $(logInfo) $ formatPid pid ph $ unwords
- [ "Reconnecting peer in", show reconnect, "seconds" ]
- -- Wait for some time before calling a reconnection
- liftIO $ threadDelay $ reconnect * 1000000
-
--- Start a peer with with the given peer host/peer id and initiate the
--- network protocol handshake. This function will block until the peer
--- connection is closed or an exception is raised.
-startPeerPid :: (MonadLoggerIO m, MonadBaseControl IO m)
- => PeerId
- -> PeerHost
- -> NodeT m ()
-startPeerPid pid ph@PeerHost{..} = do
- -- Check if the peer host is banned
- banned <- atomicallyNodeT $ isPeerHostBanned ph
- when banned $ do
- $(logWarn) $ formatPid pid ph "Failed to start banned host"
- liftIO $ throwIO NodeExceptionBanned
-
- -- Check if the peer host is already connected
- connected <- atomicallyNodeT $ isPeerHostConnected ph
- when connected $ do
- $(logWarn) $ formatPid pid ph "This host is already connected"
- liftIO $ throwIO NodeExceptionConnected
-
- tid <- liftIO myThreadId
- chan <- liftIO . atomically $ newTBMChan 1024
- mChan <- liftIO . atomically $ newTBMChan 1024
- pings <- liftIO $ newTVarIO []
- atomicallyNodeT $ do
- newPeerSession pid PeerSession
- { peerSessionConnected = False
- , peerSessionVersion = Nothing
- , peerSessionHeight = 0
- , peerSessionChan = chan
- , peerSessionHost = ph
- , peerSessionThreadId = tid
- , peerSessionMerkleChan = mChan
- , peerSessionPings = pings
- , peerSessionScore = Nothing
- }
- newHostSession ph PeerHostSession
- { peerHostSessionScore = 0
- , peerHostSessionReconnect = 1
- , peerHostSessionLog = []
- }
-
- $(logDebug) $ formatPid pid ph "Starting a new client TCP connection"
-
- -- Start the client TCP connection
- let c = clientSettings peerPort $ C.pack peerHost
- runGeneralTCPClient c (peerTCPClient chan) `finally` cleanupPeer
- return ()
- where
- peerTCPClient chan ad = do
- -- Conduit for receiving messages from the remote host
- let recvMsg = appSource ad $$ decodeMessage pid ph
- -- Conduit for sending messages to the remote host
- sendMsg = sourceTBMChan chan $= encodeMessage $$ appSink ad
-
- withAsync (evalStateT recvMsg Nothing) $ \a1 -> link a1 >> do
- $(logDebug) $ formatPid pid ph
- "Receiving message thread started..."
- withAsync sendMsg $ \a2 -> link a2 >> do
- $(logDebug) $ formatPid pid ph
- "Sending message thread started..."
- -- Perform the peer handshake before we continue
- -- Timeout after 2 minutes
- resE <- raceTimeout 120 (disconnectPeer pid ph)
- (peerHandshake pid ph chan)
- case resE of
- Left _ -> $(logError) $ formatPid pid ph
- "Peer timed out during the connection handshake"
- _ -> do
- -- Send the bloom filter if we have one
- $(logDebug) $ formatPid pid ph
- "Sending the bloom filter if we have one"
- atomicallyNodeT $ do
- bloomM <- readTVarS sharedBloomFilter
- case bloomM of
- Just (bloom, _) ->
- sendMessage pid $
- MFilterLoad $ FilterLoad bloom
- _ -> return ()
- withAsync (peerPing pid ph) $ \a3 -> link a3 >> do
- $(logDebug) $ formatPid pid ph "Ping thread started"
- _ <- liftIO $ waitAnyCancel [a1, a2, a3]
- $(logDebug) $ formatPid pid ph "Exiting peer TCP thread"
- return ()
-
- cleanupPeer = do
- $(logWarn) $ formatPid pid ph "Peer is closing. Running cleanup..."
- atomicallyNodeT $ do
- -- Remove the header syncing peer if necessary
- hPidM <- readTVarS sharedHeaderPeer
- when (hPidM == Just pid) $ writeTVarS sharedHeaderPeer Nothing
- -- Remove the merkle syncing peer if necessary
- mPidM <- readTVarS sharedMerklePeer
- when (mPidM == Just pid) $ writeTVarS sharedMerklePeer Nothing
- -- Remove the session and close the channels
- sessM <- removePeerSession pid
- case sessM of
- Just PeerSession{..} -> lift $ do
- closeTBMChan peerSessionChan
- closeTBMChan peerSessionMerkleChan
- _ -> return ()
- -- Update the network height
- updateNetworkHeight
-
--- Return True if the PeerHost is banned
-isPeerHostBanned :: PeerHost -> NodeT STM Bool
-isPeerHostBanned ph = do
- hostMap <- readTVarS sharedHostMap
- case M.lookup ph hostMap of
- Just sessTVar -> do
- sess <- lift $ readTVar sessTVar
- return $ isHostScoreBanned $ peerHostSessionScore sess
- _ -> return False
-
--- Returns True if we have a peer connected to that PeerHost already
-isPeerHostConnected :: PeerHost -> NodeT STM Bool
-isPeerHostConnected ph = do
- peerMap <- readTVarS sharedPeerMap
- sess <- lift $ mapM readTVar $ M.elems peerMap
- return $ ph `elem` map peerSessionHost sess
-
--- | Decode messages sent from the remote host and send them to the peers main
--- message queue for processing. If we receive invalid messages, this function
--- will also notify the PeerManager about a misbehaving remote host.
-decodeMessage
- :: (MonadLoggerIO m, MonadBaseControl IO m)
- => PeerId
- -> PeerHost
- -> Sink BS.ByteString (StateT (Maybe (MerkleBlock, MerkleTxs)) (NodeT m)) ()
-decodeMessage pid ph = do
- -- Message header is always 24 bytes
- headerBytes <- BL.toStrict <$> CB.take 24
- -- If headerBytes is empty, the conduit has disconnected and we need to
- -- exit (not recurse). Otherwise, we go into an infinite loop here.
- unless (BS.null headerBytes) $ do
- -- Introspection required to know the length of the payload
- case decode headerBytes of
- Left err -> lift . lift $ misbehaving pid ph moderateDoS $ unwords
- [ "Could not decode message header:", err
- , "Bytes:", cs (encodeHex headerBytes)
- ]
- Right (MessageHeader _ cmd len _) -> do
- $(logDebug) $ formatPid pid ph $ unwords
- [ "Received message header of type", show cmd ]
- payloadBytes <- BL.toStrict <$> CB.take (fromIntegral len)
- case decode $ headerBytes `BS.append` payloadBytes of
- Left err -> lift . lift $ misbehaving pid ph moderateDoS $
- unwords [ "Could not decode message payload:", err ]
- Right msg -> lift $ processMessage pid ph msg
- decodeMessage pid ph
-
--- Handle a message from a peer
-processMessage :: (MonadLoggerIO m, MonadBaseControl IO m)
- => PeerId
- -> PeerHost
- -> Message
- -> StateT (Maybe (MerkleBlock, MerkleTxs)) (NodeT m) ()
-processMessage pid ph msg = checkMerkleEnd >> case msg of
- MVersion v -> lift $ do
- $(logDebug) $ formatPid pid ph "Processing MVersion message"
- join . atomicallyNodeT $ do
- oldVerM <- peerSessionVersion <$> getPeerSession pid
- case oldVerM of
- Just _ -> do
- _ <- trySendMessage pid $ MReject $ reject
- MCVersion RejectDuplicate "Duplicate version message"
- return $
- misbehaving pid ph minorDoS "Duplicate version message"
- Nothing -> do
- modifyPeerSession pid $ \s ->
- s{ peerSessionVersion = Just v }
- return $ return ()
- $(logDebug) $ formatPid pid ph "Done processing MVersion message"
- MPing (Ping nonce) -> lift $ do
- $(logDebug) $ formatPid pid ph "Processing MPing message"
- -- Just reply to the Ping with a Pong message
- _ <- atomicallyNodeT $ trySendMessage pid $ MPong $ Pong nonce
- return ()
- MPong (Pong nonce) -> lift $ do
- $(logDebug) $ formatPid pid ph "Processing MPong message"
- atomicallyNodeT $ do
- PeerSession{..} <- getPeerSession pid
- -- Add the Pong response time
- lift $ modifyTVar' peerSessionPings (++ [nonce])
- MHeaders h -> lift $ do
- $(logDebug) $ formatPid pid ph "Processing MHeaders message"
- _ <- atomicallyNodeT $ tryPutTMVarS sharedHeaders (pid, h)
- return ()
- MInv inv -> lift $ do
- $(logDebug) $ formatPid pid ph "Processing MInv message"
- processInvMessage pid ph inv
- MGetData (GetData inv) -> do
- $(logDebug) $ formatPid pid ph "Processing MGetData message"
- let txlist = filter ((== InvTx) . invType) inv
- txids = nub $ map (TxHash . invHash) txlist
- $(logDebug) $ formatPid pid ph $ unlines $
- "Received GetData request for transactions"
- : map ((" " ++) . cs . txHashToHex) txids
- -- Add the txids to the GetData request map
- mapTVar <- asks sharedTxGetData
- liftIO . atomically $ modifyTVar' mapTVar $ \datMap ->
- let newMap = M.fromList $ map (\tid -> (tid, [(pid, ph)])) txids
- in M.unionWith (\x -> nub . (x ++)) newMap datMap
- MTx tx -> do
- $(logDebug) $ formatPid pid ph "Processing MTx message"
- PeerSession{..} <- lift . atomicallyNodeT $ getPeerSession pid
- txChan <- lift $ asks sharedTxChan
- get >>= \merkleM -> case merkleM of
- Just (_, mTxs) -> if txHash tx `elem` mTxs
- then do
- $(logDebug) $ formatPid pid ph $ unwords
- [ "Received merkle tx", cs $ txHashToHex $ txHash tx ]
- liftIO . atomically $
- writeTBMChan peerSessionMerkleChan $ Right tx
- else do
- $(logDebug) $ formatPid pid ph $ unwords
- [ "Received tx broadcast (ending a merkle block)"
- , cs $ txHashToHex $ txHash tx
- ]
- endMerkle
- liftIO . atomically $ writeTBMChan txChan (pid, ph, tx)
- _ -> do
- $(logDebug) $ formatPid pid ph $ unwords
- [ "Received tx broadcast", cs $ txHashToHex $ txHash tx ]
- liftIO . atomically $ writeTBMChan txChan (pid, ph, tx)
- MMerkleBlock mb@(MerkleBlock mHead ntx hs fs) -> do
- $(logDebug) $ formatPid pid ph "Processing MMerkleBlock message"
- case extractMatches fs hs (fromIntegral ntx) of
- Left err -> lift $ misbehaving pid ph severeDoS $ unwords
- [ "Received an invalid merkle block:", err ]
- Right (decodedRoot, mTxs) ->
- -- Make sure that the merkle roots match
- if decodedRoot == merkleRoot mHead
- then do
- $(logDebug) $ formatPid pid ph $ unwords
- [ "Received valid merkle block"
- , cs $ blockHashToHex $ headerHash mHead
- ]
- forM_ mTxs $ \h ->
- $(logDebug) $ formatPid pid ph $ unwords
- [ "Matched merkle tx:", cs $ txHashToHex h ]
- if null mTxs
- -- Deliver the merkle block
- then lift . atomicallyNodeT $ do
- PeerSession{..} <- getPeerSession pid
- lift $ writeTBMChan peerSessionMerkleChan $
- Left (mb, [])
- -- Buffer the merkle block until we received all txs
- else put $ Just (mb, mTxs)
- else lift $ misbehaving pid ph severeDoS
- "Received a merkle block with an invalid merkle root"
- _ -> return () -- Ignore other requests
- where
- checkMerkleEnd = unless (isTxMsg msg) endMerkle
- endMerkle = get >>= \merkleM -> case merkleM of
- Just (mb, mTxs) -> do
- lift . atomicallyNodeT $ do
- PeerSession{..} <- getPeerSession pid
- lift $ writeTBMChan peerSessionMerkleChan $ Left (mb, mTxs)
- put Nothing
- _ -> return ()
- isTxMsg (MTx _) = True
- isTxMsg _ = False
-
-processInvMessage :: (MonadLoggerIO m, MonadBaseControl IO m)
- => PeerId
- -> PeerHost
- -> Inv
- -> NodeT m ()
-processInvMessage pid ph (Inv vs) = case tickleM of
- Just tickle -> do
- $(logDebug) $ formatPid pid ph $ unwords
- [ "Received block tickle", cs $ blockHashToHex tickle ]
- tickleChan <- asks sharedTickleChan
- liftIO $ atomically $ writeTBMChan tickleChan (pid, ph, tickle)
- _ -> do
- unless (null txlist) $ do
- forM_ txlist $ \tid -> $(logDebug) $ formatPid pid ph $ unwords
- [ "Received transaction INV", cs (txHashToHex tid) ]
- -- We simply request the transactions.
- -- TODO: Should we do something more elaborate here?
- atomicallyNodeT $ sendMessage pid $ MGetData $ GetData $
- map (InvVector InvTx . getTxHash) txlist
- unless (null blocklist) $ do
- $(logDebug) $ formatPid pid ph $ unlines $
- "Received block INV"
- : map ((" " ++) . cs . blockHashToHex) blocklist
- -- We ignore block INVs as we do headers-first sync
- return ()
- where
- -- Single blockhash INV is a tickle
- tickleM = case blocklist of
- [h] -> if null txlist then Just h else Nothing
- _ -> Nothing
- txlist :: [TxHash]
- txlist = map (TxHash . invHash) $
- filter ((== InvTx) . invType) vs
- blocklist :: [BlockHash]
- blocklist = map (BlockHash . invHash) $ filter ((== InvBlock) . invType) vs
-
--- | Encode message that are being sent to the remote host.
-encodeMessage :: MonadLoggerIO m
- => Conduit Message (NodeT m) BS.ByteString
-encodeMessage = awaitForever $ yield . encode
-
-peerPing :: (MonadLoggerIO m, MonadBaseControl IO m)
- => PeerId
- -> PeerHost
- -> NodeT m ()
-peerPing pid ph = forever $ do
- $(logDebug) $ formatPid pid ph
- "Waiting until the peer is available for sending pings..."
- atomicallyNodeT $ waitPeerAvailable pid
-
- nonce <- liftIO randomIO
- nonceTVar <- atomicallyNodeT $ do
- PeerSession{..} <- getPeerSession pid
- sendMessage pid $ MPing $ Ping nonce
- return peerSessionPings
-
- $(logDebug) $ formatPid pid ph $ unwords
- [ "Waiting for Ping nonce", show nonce ]
- -- Wait 120 seconds for the pong or time out
- startTime <- liftIO getCurrentTime
- resE <- raceTimeout 120 (killPeer nonce) (waitPong nonce nonceTVar)
- case resE of
- Right _ -> do
- endTime <- liftIO getCurrentTime
- (diff, score) <- atomicallyNodeT $ do
- PeerSession{..} <- getPeerSession pid
- -- Compute the ping time and the new score
- let diff = diffUTCTime endTime startTime
- score = 0.5 * diff + 0.5 * fromMaybe diff peerSessionScore
- -- Save the score in the peer session unless the peer is busy
- modifyPeerSession pid $ \s -> s{ peerSessionScore = Just score }
- return (diff, score)
- $(logDebug) $ formatPid pid ph $ unwords
- [ "Got response to ping", show nonce
- , "with time", show diff, "and score", show score
- ]
- _ -> return ()
-
- -- Sleep 30 seconds before sending the next ping
- liftIO $ threadDelay $ 30 * 1000000
- where
- -- Wait for the Pong message of our Ping nonce to arrive
- waitPong nonce nonceTVar = do
- ns <- liftIO . atomically $ do
- ns <- swapTVar nonceTVar []
- if null ns then retry else return ns
- unless (nonce `elem` ns) $ waitPong nonce nonceTVar
- killPeer nonce = do
- $(logWarn) $ formatPid pid ph $ concat
- [ "Did not receive a timely reply for Ping ", show nonce
- , ". Reconnecting the peer."
- ]
- disconnectPeer pid ph
-
-isBloomDisabled :: Version -> Bool
-isBloomDisabled ver = version ver >= 70011 && not (services ver `testBit` 2)
-
-peerHandshake :: (MonadLoggerIO m, MonadBaseControl IO m)
- => PeerId
- -> PeerHost
- -> TBMChan Message
- -> NodeT m ()
-peerHandshake pid ph chan = do
- ourVer <- buildVersion
- $(logDebug) $ formatPid pid ph "Sending our version message"
- liftIO . atomically $ writeTBMChan chan $ MVersion ourVer
- -- Wait for the peer version message to arrive
- $(logDebug) $ formatPid pid ph "Waiting for the peers version message..."
- peerVer <- atomicallyNodeT $ waitPeerVersion pid
- $(logInfo) $ formatPid pid ph $ unlines
- [ unwords [ "Connected to peer host"
- , show $ naAddress $ addrSend peerVer
- ]
- , unwords [ " version :", show $ version peerVer ]
- , unwords [ " subVer :", show $ userAgent peerVer ]
- , unwords [ " services :", show $ services peerVer ]
- , unwords [ " time :", show $ timestamp peerVer ]
- , unwords [ " blocks :", show $ startHeight peerVer ]
- ]
-
- -- Check the protocol version
- go peerVer $ do
- atomicallyNodeT $ do
- -- Save the peers height and update the network height
- modifyPeerSession pid $ \s ->
- s{ peerSessionHeight = startHeight peerVer
- , peerSessionConnected = True
- }
- updateNetworkHeight
- -- Reset the reconnection timer (exponential backoff)
- modifyHostSession ph $ \s ->
- s{ peerHostSessionReconnect = 1 }
- -- ACK the version message
- lift $ writeTBMChan chan MVerAck
- $(logDebug) $ formatPid pid ph "Handshake complete"
- where
- go ver action
- | version ver < minProtocolVersion =
- misbehaving pid ph severeDoS $ unwords
- [ "Connected to a peer speaking protocol version"
- , show $ version ver
- , "but we require at least"
- , show minProtocolVersion
- ]
- | isBloomDisabled ver =
- misbehaving pid ph severeDoS "Peer does not support bloom filters"
- | otherwise = action
- buildVersion = do
- -- TODO: Get our correct IP here
- let add = NetworkAddress 1 $ SockAddrInet 0 0
- ua = VarString haskoinUserAgent
- time <- floor <$> liftIO getPOSIXTime
- rdmn <- liftIO randomIO -- nonce
- height <- nodeBlockHeight <$> atomicallyNodeT (readTVarS sharedBestHeader)
- return Version { version = 70011
- , services = 5
- , timestamp = time
- , addrRecv = add
- , addrSend = add
- , verNonce = rdmn
- , userAgent = ua
- , startHeight = height
- , relay = False
- }
-
--- Wait for the version message of a peer and return it
-waitPeerVersion :: PeerId -> NodeT STM Version
-waitPeerVersion pid = do
- PeerSession{..} <- getPeerSession pid
- case peerSessionVersion of
- Just ver -> return ver
- _ -> lift retry
-
--- Delete the session of a peer and send a kill signal to the peers thread.
--- Unless the peer is banned, the peer will try to reconnect.
-disconnectPeer :: (MonadLoggerIO m)
- => PeerId
- -> PeerHost
- -> NodeT m ()
-disconnectPeer pid ph = do
- sessM <- atomicallyNodeT $ tryGetPeerSession pid
- case sessM of
- Just PeerSession{..} -> do
- $(logDebug) $ formatPid pid ph "Killing the peer thread"
- liftIO $ killThread peerSessionThreadId
- _ -> return ()
-
-{- Peer utility functions -}
-
---- Wait until the given peer is not syncing headers or merkle blocks
-waitPeerAvailable :: PeerId -> NodeT STM ()
-waitPeerAvailable pid = do
- hPidM <- readTVarS sharedHeaderPeer
- mPidM <- readTVarS sharedMerklePeer
- when (Just pid `elem` [hPidM, mPidM]) $ lift retry
-
--- Wait for a non-empty bloom filter to be available
-waitBloomFilter :: NodeT STM BloomFilter
-waitBloomFilter =
- maybe (lift retry) (return . fst) =<< readTVarS sharedBloomFilter
-
-sendBloomFilter :: BloomFilter -> Int -> NodeT STM ()
-sendBloomFilter bloom elems = unless (isBloomEmpty bloom) $ do
- oldBloomM <- readTVarS sharedBloomFilter
- let oldElems = maybe 0 snd oldBloomM
- -- Only update the bloom filter if the number of elements is larger
- when (elems > oldElems) $ do
- writeTVarS sharedBloomFilter $ Just (bloom, elems)
- sendMessageAll $ MFilterLoad $ FilterLoad bloom
-
--- Returns the median height of all the peers
-getMedianHeight :: NodeT STM BlockHeight
-getMedianHeight = do
- hs <- map (peerSessionHeight . snd) <$> getConnectedPeers
- let (_,ms) = splitAt (length hs `div` 2) $ sort hs
- return $ fromMaybe 0 $ listToMaybe ms
-
--- Set the network height to the median height of all peers.
-updateNetworkHeight :: NodeT STM ()
-updateNetworkHeight = writeTVarS sharedNetworkHeight =<< getMedianHeight
-
-getPeers :: NodeT STM [(PeerId, PeerSession)]
-getPeers = do
- peerMap <- readTVarS sharedPeerMap
- lift $ mapM f $ M.assocs peerMap
- where
- f (pid, sess) = (,) pid <$> readTVar sess
-
-getConnectedPeers :: NodeT STM [(PeerId, PeerSession)]
-getConnectedPeers = filter (peerSessionConnected . snd) <$> getPeers
-
--- Returns a peer that is connected, at the network height and
--- with the best score.
-getPeersAtNetHeight :: NodeT STM [(PeerId, PeerSession)]
-getPeersAtNetHeight = do
- -- Find the current network height
- height <- readTVarS sharedNetworkHeight
- getPeersAtHeight (== height)
-
--- Find the best peer at the given height
-getPeersAtHeight :: (BlockHeight -> Bool)
- -> NodeT STM [(PeerId, PeerSession)]
-getPeersAtHeight cmpHeight = do
- peers <- filter f <$> getPeers
- -- Choose the peer with the best score
- return $ sortBy s peers
- where
- f (_, p) =
- peerSessionConnected p && -- Only connected peers
- isJust (peerSessionScore p) && -- Only peers with scores
- cmpHeight (peerSessionHeight p) -- Only peers at the required height
- s (_,a) (_,b) = peerSessionScore a `compare` peerSessionScore b
-
--- Send a message to a peer only if it is connected. It returns True on
--- success.
-trySendMessage :: PeerId -> Message -> NodeT STM Bool
-trySendMessage pid msg = do
- sessM <- tryGetPeerSession pid
- lift $ case sessM of
- Just PeerSession{..} ->
- if peerSessionConnected
- then writeTBMChan peerSessionChan msg >> return True
- else return False -- The peer is not yet connected
- _ -> return False -- The peer does not exist
-
--- Send a message to a peer only if it is connected. It returns True on
--- success. Throws an exception if the peer does not exist or is not connected.
-sendMessage :: PeerId -> Message -> NodeT STM ()
-sendMessage pid msg = do
- PeerSession{..} <- getPeerSession pid
- if peerSessionConnected
- then lift $ writeTBMChan peerSessionChan msg
- else throw $ NodeExceptionPeerNotConnected $ ShowPeerId pid
-
--- Send a message to all connected peers.
-sendMessageAll :: Message -> NodeT STM ()
-sendMessageAll msg = do
- peerMap <- readTVarS sharedPeerMap
- forM_ (M.keys peerMap) $ \pid -> trySendMessage pid msg
-
-getNetworkHeight :: NodeT STM BlockHeight
-getNetworkHeight = readTVarS sharedNetworkHeight
-
-misbehaving :: (MonadLoggerIO m)
- => PeerId
- -> PeerHost
- -> (PeerHostScore -> PeerHostScore)
- -> String
- -> NodeT m ()
-misbehaving pid ph f msg = do
- sessM <- atomicallyNodeT $ do
- modifyHostSession ph $ \s ->
- s{ peerHostSessionScore = f $! peerHostSessionScore s
- , peerHostSessionLog = msg : peerHostSessionLog s
- }
- getHostSession ph
- case sessM of
- Just PeerHostSession{..} -> do
- $(logWarn) $ formatPid pid ph $ unlines
- [ "Misbehaving peer"
- , unwords [ " Score:", show peerHostSessionScore ]
- , unwords [ " Reason:", msg ]
- ]
- when (isHostScoreBanned peerHostSessionScore) $
- disconnectPeer pid ph
- _ -> return ()
-
-{- Run header tree database action -}
-
--- runHeaderTree :: MonadIO m => ReaderT L.DB IO a -> NodeT m a
--- runHeaderTree action = undefined
-
-{- Utilities -}
-
-raceTimeout :: (MonadIO m, MonadBaseControl IO m)
- => Int
- -- ^ Timeout value in seconds
- -> m a
- -- ^ Action to run if the main action times out
- -> m b
- -- ^ Action to run until the time runs out
- -> m (Either a b)
-raceTimeout sec cleanup action = do
- resE <- race (liftIO $ threadDelay (sec * 1000000)) action
- case resE of
- Right res -> return $ Right res
- Left _ -> fmap Left cleanup
-
-formatPid :: PeerId -> PeerHost -> String -> Text
-formatPid pid ph str = pack $ concat
- [ "[Peer ", show $ hashUnique pid
- , " | ", peerHostString ph, "] ", str
- ]
-
diff --git a/Network/Haskoin/Node/STM.hs b/Network/Haskoin/Node/STM.hs
deleted file mode 100644
index 818c490..0000000
--- a/Network/Haskoin/Node/STM.hs
+++ /dev/null
@@ -1,412 +0,0 @@
-{-# LANGUAGE KindSignatures #-}
-{-# LANGUAGE MultiParamTypeClasses #-}
-{-# LANGUAGE RankNTypes #-}
-{-# LANGUAGE TemplateHaskell #-}
-module Network.Haskoin.Node.STM where
-
-import Control.Concurrent (ThreadId)
-import Control.Concurrent.STM (STM, TMVar, TVar, atomically,
- isEmptyTMVar, modifyTVar',
- newEmptyTMVarIO, newTVar,
- newTVarIO, orElse, putTMVar,
- readTMVar, readTVar,
- takeTMVar, tryPutTMVar,
- tryReadTMVar, writeTVar)
-import Control.Concurrent.STM.Lock (Lock)
-import qualified Control.Concurrent.STM.Lock as Lock (new)
-import Control.Concurrent.STM.TBMChan (TBMChan, closeTBMChan,
- newTBMChan)
-import Control.DeepSeq (NFData (..))
-import Control.Exception.Lifted (Exception, SomeException,
- catch, fromException, throw)
-import Control.Monad ((<=<))
-import Control.Monad.Logger (MonadLoggerIO, logDebug)
-import Control.Monad.Reader (ReaderT, ask, asks,
- runReaderT)
-import Control.Monad.Trans (MonadIO, lift, liftIO)
-import Control.Monad.Trans.Control (MonadBaseControl)
-import Data.Aeson.TH (deriveJSON)
-import qualified Data.Map.Strict as M (Map, delete, empty,
- insert, lookup)
-import Data.Maybe (isJust)
-import Data.Time.Clock (NominalDiffTime)
-import Data.Typeable (Typeable)
-import Data.Unique (Unique, hashUnique)
-import Data.Word (Word32, Word64)
-import Database.Persist.Sql (ConnectionPool, SqlBackend,
- SqlPersistT, runSqlConn,
- runSqlPool)
-import Network.Haskoin.Block
-import Network.Haskoin.Node
-import Network.Haskoin.Node.HeaderTree
-import Network.Haskoin.Transaction
-import Network.Haskoin.Util
-
-{- Type aliases -}
-
-type MerkleTxs = [TxHash]
-type NodeT = ReaderT SharedNodeState
-type PeerId = Unique
-type PeerHostScore = Word32
-
-newtype ShowPeerId = ShowPeerId { getShowPeerId :: PeerId }
- deriving (Eq)
-
-instance Show ShowPeerId where
- show = show . hashUnique . getShowPeerId
-
-runSql :: (MonadBaseControl IO m)
- => SqlPersistT m a
- -> Either SqlBackend ConnectionPool
- -> m a
-runSql f (Left conn) = runSqlConn f conn
-runSql f (Right pool) = runSqlPool f pool
-
-runSqlNodeT :: (MonadBaseControl IO m) => SqlPersistT m a -> NodeT m a
-runSqlNodeT f = asks sharedSqlBackend >>= lift . runSql f
-
-getNodeState :: (MonadLoggerIO m, MonadBaseControl IO m)
- => Either SqlBackend ConnectionPool
- -> m SharedNodeState
-getNodeState sharedSqlBackend = do
- -- Initialize the HeaderTree
- $(logDebug) "Initializing the HeaderTree and NodeState"
- best <- runSql (initHeaderTree >> getBestBlock) sharedSqlBackend
- liftIO $ do
- sharedPeerMap <- newTVarIO M.empty
- sharedHostMap <- newTVarIO M.empty
- sharedNetworkHeight <- newTVarIO 0
- sharedHeaders <- newEmptyTMVarIO
- sharedHeaderPeer <- newTVarIO Nothing
- sharedMerklePeer <- newTVarIO Nothing
- sharedSyncLock <- atomically Lock.new
- sharedTickleChan <- atomically $ newTBMChan 1024
- sharedTxChan <- atomically $ newTBMChan 1024
- sharedTxGetData <- newTVarIO M.empty
- sharedRescan <- newEmptyTMVarIO
- sharedMempool <- newTVarIO False
- sharedBloomFilter <- newTVarIO Nothing
- -- Find our best node in the HeaderTree
- sharedBestHeader <- newTVarIO best
- sharedBestBlock <- newTVarIO genesisBlock
- return SharedNodeState{..}
-
-runNodeT :: Monad m => NodeT m a -> SharedNodeState -> m a
-runNodeT = runReaderT
-
-withNodeT :: (MonadLoggerIO m, MonadBaseControl IO m)
- => NodeT m a
- -> Either SqlBackend ConnectionPool
- -> m a
-withNodeT action sql = runNodeT action =<< getNodeState sql
-
-atomicallyNodeT :: MonadIO m => NodeT STM a -> NodeT m a
-atomicallyNodeT action = liftIO . atomically . runReaderT action =<< ask
-
-{- PeerHost Session -}
-
-data PeerHostSession = PeerHostSession
- { peerHostSessionScore :: !PeerHostScore
- , peerHostSessionReconnect :: !Int
- , peerHostSessionLog :: ![String]
- -- ^ Important host log messages that should appear in status command
- }
-
-instance NFData PeerHostSession where
- rnf PeerHostSession{..} =
- rnf peerHostSessionScore `seq`
- rnf peerHostSessionReconnect `seq`
- rnf peerHostSessionLog
-
-{- Shared Peer STM Type -}
-
-data SharedNodeState = SharedNodeState
- { sharedPeerMap :: !(TVar (M.Map PeerId (TVar PeerSession)))
- -- ^ Map of all active peers and their sessions
- , sharedHostMap :: !(TVar (M.Map PeerHost (TVar PeerHostSession)))
- -- ^ The peer that is currently syncing the block headers
- , sharedNetworkHeight :: !(TVar BlockHeight)
- -- ^ The current height of the network
- , sharedHeaders :: !(TMVar (PeerId, Headers))
- -- ^ Block headers sent from a peer
- , sharedHeaderPeer :: !(TVar (Maybe PeerId))
- -- ^ Peer currently syncing headers
- , sharedMerklePeer :: !(TVar (Maybe PeerId))
- -- ^ Peer currently downloading merkle blocks
- , sharedSyncLock :: !Lock
- -- ^ Lock on the header syncing process
- , sharedBestHeader :: !(TVar NodeBlock)
- -- ^ Our best block header
- , sharedBestBlock :: !(TVar NodeBlock)
- -- ^ Our best merkle block's height
- , sharedTxGetData :: !(TVar (M.Map TxHash [(PeerId, PeerHost)]))
- -- ^ List of Tx GetData requests
- , sharedBloomFilter :: !(TVar (Maybe (BloomFilter, Int)))
- -- ^ Bloom filter
- , sharedTickleChan :: !(TBMChan (PeerId, PeerHost, BlockHash))
- -- ^ Channel containing all the block tickles received from peers
- , sharedTxChan :: !(TBMChan (PeerId, PeerHost, Tx))
- -- ^ Transaction channel
- , sharedRescan :: !(TMVar (Either Timestamp BlockHeight))
- -- ^ Rescan requests from a timestamp or from a block height
- , sharedMempool :: !(TVar Bool)
- -- ^ Did we do a Mempool sync ?
- , sharedSqlBackend :: !(Either SqlBackend ConnectionPool)
- }
-
-{- Peer Data -}
-
-type PingNonce = Word64
-
--- Data stored about a peer
-data PeerSession = PeerSession
- { peerSessionConnected :: !Bool
- -- ^ True if the peer is connected (completed the handshake)
- , peerSessionVersion :: !(Maybe Version)
- -- ^ Contains the version message that we received from the peer
- , peerSessionHeight :: !BlockHeight
- -- ^ Current known height of the peer
- , peerSessionChan :: !(TBMChan Message)
- -- ^ Message channel to send messages to the peer
- , peerSessionHost :: !PeerHost
- -- ^ Host to which this peer is connected
- , peerSessionThreadId :: !ThreadId
- -- ^ Peer ThreadId
- , peerSessionMerkleChan :: !(TBMChan (Either (MerkleBlock, MerkleTxs) Tx))
- -- ^ Merkle block/Merkle transaction channel
- , peerSessionPings :: !(TVar [PingNonce])
- -- ^ Time at which we requested pings
- , peerSessionScore :: !(Maybe NominalDiffTime)
- -- ^ Ping scores for this peer (round trip times)
- }
-
-instance NFData PeerSession where
- rnf PeerSession{..} =
- rnf peerSessionConnected `seq`
- rnf peerSessionVersion `seq`
- rnf peerSessionHeight `seq`
- peerSessionChan `seq`
- rnf peerSessionHost `seq`
- peerSessionThreadId `seq` ()
-
-{- Peer Hosts -}
-
-data PeerHost = PeerHost
- { peerHost :: !String
- , peerPort :: !Int
- }
- deriving (Eq, Ord)
-
-$(deriveJSON (dropFieldLabel 4) ''PeerHost)
-
-peerHostString :: PeerHost -> String
-peerHostString PeerHost{..} = concat [ peerHost, ":", show peerPort ]
-
-instance NFData PeerHost where
- rnf PeerHost{..} =
- rnf peerHost `seq`
- rnf peerPort
-
-{- Node Status -}
-
-data PeerStatus = PeerStatus
- -- Regular fields
- { peerStatusPeerId :: !Int
- , peerStatusHost :: !PeerHost
- , peerStatusConnected :: !Bool
- , peerStatusHeight :: !BlockHeight
- , peerStatusProtocol :: !(Maybe Word32)
- , peerStatusUserAgent :: !(Maybe String)
- , peerStatusPing :: !(Maybe String)
- , peerStatusDoSScore :: !(Maybe PeerHostScore)
- -- Debug fields
- , peerStatusHaveMerkles :: !Bool
- , peerStatusHaveMessage :: !Bool
- , peerStatusPingNonces :: ![PingNonce]
- , peerStatusReconnectTimer :: !(Maybe Int)
- , peerStatusLog :: !(Maybe [String])
- }
-
-$(deriveJSON (dropFieldLabel 10) ''PeerStatus)
-
-data NodeStatus = NodeStatus
- -- Regular fields
- { nodeStatusPeers :: ![PeerStatus]
- , nodeStatusNetworkHeight :: !BlockHeight
- , nodeStatusBestHeader :: !BlockHash
- , nodeStatusBestHeaderHeight :: !BlockHeight
- , nodeStatusBestBlock :: !BlockHash
- , nodeStatusBestBlockHeight :: !BlockHeight
- , nodeStatusBloomSize :: !Int
- -- Debug fields
- , nodeStatusHeaderPeer :: !(Maybe Int)
- , nodeStatusMerklePeer :: !(Maybe Int)
- , nodeStatusHaveHeaders :: !Bool
- , nodeStatusHaveTickles :: !Bool
- , nodeStatusHaveTxs :: !Bool
- , nodeStatusGetData :: ![TxHash]
- , nodeStatusRescan :: !(Maybe (Either Timestamp BlockHeight))
- , nodeStatusMempool :: !Bool
- , nodeStatusSyncLock :: !Bool
- }
-
-$(deriveJSON (dropFieldLabel 10) ''NodeStatus)
-
-{- Getters / Setters -}
-
-tryGetPeerSession :: PeerId -> NodeT STM (Maybe PeerSession)
-tryGetPeerSession pid = do
- peerMap <- readTVarS sharedPeerMap
- case M.lookup pid peerMap of
- Just sessTVar -> fmap Just $ lift $ readTVar sessTVar
- _ -> return Nothing
-
-getPeerSession :: PeerId -> NodeT STM PeerSession
-getPeerSession pid = do
- sessM <- tryGetPeerSession pid
- case sessM of
- Just sess -> return sess
- _ -> throw $ NodeExceptionInvalidPeer $ ShowPeerId pid
-
-newPeerSession :: PeerId -> PeerSession -> NodeT STM ()
-newPeerSession pid sess = do
- peerMapTVar <- asks sharedPeerMap
- peerMap <- lift $ readTVar peerMapTVar
- case M.lookup pid peerMap of
- Just _ -> return ()
- Nothing -> do
- sessTVar <- lift $ newTVar sess
- let newMap = M.insert pid sessTVar peerMap
- lift $ writeTVar peerMapTVar $! newMap
-
-modifyPeerSession :: PeerId -> (PeerSession -> PeerSession) -> NodeT STM ()
-modifyPeerSession pid f = do
- peerMap <- readTVarS sharedPeerMap
- case M.lookup pid peerMap of
- Just sessTVar -> lift $ modifyTVar' sessTVar f
- _ -> return ()
-
-removePeerSession :: PeerId -> NodeT STM (Maybe PeerSession)
-removePeerSession pid = do
- peerMapTVar <- asks sharedPeerMap
- peerMap <- lift $ readTVar peerMapTVar
- -- Close the peer TBMChan
- sessM <- case M.lookup pid peerMap of
- Just sessTVar -> lift $ do
- sess@PeerSession{..} <- readTVar sessTVar
- closeTBMChan peerSessionChan
- return $ Just sess
- _ -> return Nothing
- -- Remove the peer from the peerMap
- let newMap = M.delete pid peerMap
- lift $ writeTVar peerMapTVar $! newMap
- return sessM
-
-getHostSession :: PeerHost
- -> NodeT STM (Maybe PeerHostSession)
-getHostSession ph = do
- hostMap <- readTVarS sharedHostMap
- lift $ case M.lookup ph hostMap of
- Just hostSessionTVar -> Just <$> readTVar hostSessionTVar
- _ -> return Nothing
-
-modifyHostSession :: PeerHost
- -> (PeerHostSession -> PeerHostSession)
- -> NodeT STM ()
-modifyHostSession ph f = do
- hostMap <- readTVarS sharedHostMap
- case M.lookup ph hostMap of
- Just hostSessionTVar -> lift $ modifyTVar' hostSessionTVar f
- _ -> newHostSession ph $!
- f PeerHostSession { peerHostSessionScore = 0
- , peerHostSessionReconnect = 1
- , peerHostSessionLog = []
- }
-
-newHostSession :: PeerHost -> PeerHostSession -> NodeT STM ()
-newHostSession ph session = do
- hostMapTVar <- asks sharedHostMap
- hostMap <- lift $ readTVar hostMapTVar
- case M.lookup ph hostMap of
- Just _ -> return ()
- Nothing -> lift $ do
- hostSessionTVar <- newTVar session
- let newHostMap = M.insert ph hostSessionTVar hostMap
- writeTVar hostMapTVar $! newHostMap
-
-{- Host DOS Scores -}
-
-bannedScore :: PeerHostScore
-bannedScore = 100
-
-minorDoS :: PeerHostScore -> PeerHostScore
-minorDoS = (+ 1)
-
-moderateDoS :: PeerHostScore -> PeerHostScore
-moderateDoS = (+ 10)
-
-severeDoS :: PeerHostScore -> PeerHostScore
-severeDoS = (+ bannedScore)
-
-isHostScoreBanned :: PeerHostScore -> Bool
-isHostScoreBanned = (>= bannedScore)
-
-{- STM Utilities -}
-
-orElseNodeT :: NodeT STM a -> NodeT STM a -> NodeT STM a
-orElseNodeT a b = do
- s <- ask
- lift $ runNodeT a s `orElse` runNodeT b s
-
-{- TVar Utilities -}
-
-readTVarS :: (SharedNodeState -> TVar a) -> NodeT STM a
-readTVarS = lift . readTVar <=< asks
-
-writeTVarS :: (SharedNodeState -> TVar a) -> a -> NodeT STM ()
-writeTVarS f val = lift . flip writeTVar val =<< asks f
-
-{- TMVar Utilities -}
-
-takeTMVarS :: (SharedNodeState -> TMVar a) -> NodeT STM a
-takeTMVarS = lift . takeTMVar <=< asks
-
-readTMVarS :: (SharedNodeState -> TMVar a) -> NodeT STM a
-readTMVarS = lift . readTMVar <=< asks
-
-tryReadTMVarS :: (SharedNodeState -> TMVar a) -> NodeT STM (Maybe a)
-tryReadTMVarS = lift . tryReadTMVar <=< asks
-
-putTMVarS :: (SharedNodeState -> TMVar a) -> a -> NodeT STM ()
-putTMVarS f val = lift . flip putTMVar val =<< asks f
-
-tryPutTMVarS :: (SharedNodeState -> TMVar a) -> a -> NodeT STM Bool
-tryPutTMVarS f val = lift . flip tryPutTMVar val =<< asks f
-
-swapTMVarS :: (SharedNodeState -> TMVar a) -> a -> NodeT STM ()
-swapTMVarS f val = lift . flip putTMVar val =<< asks f
-
-isEmptyTMVarS :: (SharedNodeState -> TMVar a) -> NodeT STM Bool
-isEmptyTMVarS f = lift . isEmptyTMVar =<< asks f
-
-data NodeException
- = NodeExceptionBanned
- | NodeExceptionConnected
- | NodeExceptionInvalidPeer !ShowPeerId
- | NodeExceptionPeerNotConnected !ShowPeerId
- | NodeException !String
- deriving (Show, Typeable)
-
-instance Exception NodeException
-
-isNodeException :: SomeException -> Bool
-isNodeException se = isJust (fromException se :: Maybe NodeException)
-
-catchAny :: MonadBaseControl IO m
- => m a -> (SomeException -> m a) -> m a
-catchAny = catch
-
-catchAny_ :: MonadBaseControl IO m
- => m () -> m ()
-catchAny_ = flip catchAny $ \_ -> return ()
-
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..73f0f08
--- /dev/null
+++ b/README.md
@@ -0,0 +1,4 @@
+# Haskoin Node
+
+Haskoin Node is a peer-to-peer library for Bitcoin and Bitcoin Cash. It uses a RocksDB database to store blockchain headers and peers.
+
diff --git a/haskoin-node.cabal b/haskoin-node.cabal
index 7f91309..654879b 100644
--- a/haskoin-node.cabal
+++ b/haskoin-node.cabal
@@ -1,110 +1,88 @@
-name: haskoin-node
-version: 0.4.2
-synopsis:
- Implementation of a Bitoin node.
-description:
- haskoin-node provides an implementation of the Bitcoin network protocol
- that allows you to synchronize headers (with SPV validation) and download
- merkle blocks and full blocks. This package can be used to implement
- wallets or other Bitcoin components that require talking to the Bitcoin
- network. It provides the following features:
- .
- * Implementation of the Bitcoin network protocol
- * Headertree implementation with SPV verification
- * Headers-first synchronization
- * Merkle block download from peers with bloom filters
- * Full block download from peers
- .
- A wallet implementation using this package is available in haskoin-wallet.
+-- This file has been generated from package.yaml by hpack version 0.28.2.
+--
+-- see: https://github.com/sol/hpack
+--
+-- hash: d91f6e4382f00bd57a9888e1f5c251960be102facc20627c259d1c072a5a50e8
-homepage: http://github.com/haskoin/haskoin
-bug-reports: http://github.com/haskoin/haskoin/issues
-tested-with: GHC==8.0.2
-stability: stable
-license: PublicDomain
-license-file: UNLICENSE
-author: Philippe Laprade, Jean-Pierre Rupp
-maintainer: xenog@protonmail.com
-category: Bitcoin, Finance, Network
-build-type: Simple
-cabal-version: >= 1.9.2
+name: haskoin-node
+version: 0.5.0
+synopsis: Haskoin Node P2P library for Bitcoin and Bitcoin Cash
+description: Bitcoin and Bitcoin Cash peer-to-peer protocol library featuring headers-first synchronisation.
+category: Bitcoin, Finance, Network
+homepage: http://github.com/haskoin/haskoin#readme
+bug-reports: http://github.com/haskoin/haskoin/issues
+author: Jean-Pierre Rupp
+maintainer: xenog@protonmail.com
+license: PublicDomain
+license-file: UNLICENSE
+build-type: Simple
+cabal-version: >= 1.10
+extra-source-files:
+ CHANGELOG.md
+ README.md
source-repository head
- type: git
- location: git://github.com/haskoin/haskoin.git
+ type: git
+ location: https://github.com/haskoin/haskoin.git
library
- exposed-modules: Network.Haskoin.Node.HeaderTree
- Network.Haskoin.Node.Checkpoints
- Network.Haskoin.Node.Peer
- Network.Haskoin.Node.BlockChain
- Network.Haskoin.Node.STM
- other-modules: Network.Haskoin.Node.HeaderTree.Types
- Network.Haskoin.Node.HeaderTree.Model
+ exposed-modules:
+ Network.Haskoin.Node
+ Network.Haskoin.Node.Common
+ other-modules:
+ Network.Haskoin.Node.Chain
+ Network.Haskoin.Node.Manager
+ Network.Haskoin.Node.Node
+ Network.Haskoin.Node.Peer
+ Paths_haskoin_node
+ hs-source-dirs:
+ src
+ build-depends:
+ base >=4.7 && <5
+ , bytestring
+ , cereal
+ , conduit
+ , conduit-extra
+ , hashable
+ , haskoin-core
+ , hspec
+ , monad-logger
+ , mtl
+ , network
+ , nqe
+ , random
+ , resourcet
+ , rocksdb-haskell
+ , rocksdb-query
+ , string-conversions
+ , time
+ , unique
+ , unliftio
+ default-language: Haskell2010
- extensions: OverloadedStrings
- FlexibleInstances
- FlexibleContexts
- RecordWildCards
- DeriveDataTypeable
-
- build-depends: aeson >= 0.7 && < 1.1
- , async >= 2.0 && < 2.2
- , base >= 4.8 && < 5
- , bytestring >= 0.10 && < 0.11
- , concurrent-extra >= 0.7 && < 0.8
- , cereal >= 0.5 && < 0.6
- , conduit >= 1.2 && < 1.3
- , conduit-extra >= 1.1 && < 1.2
- , containers >= 0.5 && < 0.6
- , data-default >= 0.5 && < 0.8
- , deepseq >= 1.4 && < 1.5
- , either >= 4.3 && < 4.5
- , esqueleto >= 2.4 && < 2.6
- , exceptions >= 0.8 && < 0.9
- , haskoin-core >= 0.3 && < 0.5
- , largeword >= 1.2.4 && < 1.3
- , lifted-async >= 0.2 && < 0.10
- , lifted-base >= 0.2 && < 0.3
- , monad-control >= 1.0 && < 1.1
- , monad-logger >= 0.3 && < 0.4
- , mtl >= 2.2 && < 2.3
- , network >= 2.4 && < 2.7
- , persistent >= 2.2 && < 2.7
- , persistent-template >= 2.1 && < 2.6
- , resource-pool >= 0.2 && < 0.3
- , random >= 1.0 && < 1.2
- , stm >= 2.4 && < 2.5
- , stm-chans >= 3.0 && < 3.1
- , stm-conduit >= 2.5 && < 3.1
- , string-conversions >= 0.4 && < 0.5
- , text >= 0.11 && < 1.3
- , time >= 1.4 && < 1.7
-
- ghc-options: -Wall
-
-test-suite test-haskoin-node
- type: exitcode-stdio-1.0
- main-is: Main.hs
-
- extensions: EmptyDataDecls
-
- other-modules: Network.Haskoin.Node.Tests
- Network.Haskoin.Node.Units
-
- build-depends: base >= 4.8 && < 5
- , haskoin-core
- , haskoin-node
- , HUnit >= 1.2 && < 1.6
- , QuickCheck >= 2.6 && < 2.10
- , monad-logger >= 0.3 && < 0.4
- , mtl >= 2.2 && < 2.3
- , persistent >= 2.2 && < 2.7
- , persistent-sqlite >= 2.2 && < 2.7
- , resourcet >= 1.1 && < 1.2
- , test-framework >= 0.8 && < 0.9
- , test-framework-quickcheck2 >= 0.3 && < 0.4
- , test-framework-hunit >= 0.3 && < 0.4
-
- ghc-options: -Wall
- hs-source-dirs: tests
+test-suite haskoin-node-test
+ type: exitcode-stdio-1.0
+ main-is: Spec.hs
+ other-modules:
+ Paths_haskoin_node
+ hs-source-dirs:
+ test
+ ghc-options: -threaded -rtsopts -with-rtsopts=-N
+ build-depends:
+ base >=4.7 && <5
+ , bytestring
+ , cereal
+ , haskoin-core
+ , haskoin-node
+ , hspec
+ , monad-logger
+ , mtl
+ , network
+ , nqe
+ , random
+ , resourcet
+ , rocksdb-haskell
+ , rocksdb-query
+ , string-conversions
+ , unliftio
+ default-language: Haskell2010
diff --git a/src/Network/Haskoin/Node.hs b/src/Network/Haskoin/Node.hs
new file mode 100644
index 0000000..a5fa8fe
--- /dev/null
+++ b/src/Network/Haskoin/Node.hs
@@ -0,0 +1,9 @@
+module Network.Haskoin.Node
+( module X
+) where
+
+import Network.Haskoin.Node.Chain as X
+import Network.Haskoin.Node.Common as X
+import Network.Haskoin.Node.Manager as X
+import Network.Haskoin.Node.Node as X
+import Network.Haskoin.Node.Peer as X
diff --git a/src/Network/Haskoin/Node/Chain.hs b/src/Network/Haskoin/Node/Chain.hs
new file mode 100644
index 0000000..26e82b9
--- /dev/null
+++ b/src/Network/Haskoin/Node/Chain.hs
@@ -0,0 +1,281 @@
+{-# LANGUAGE ConstraintKinds #-}
+{-# LANGUAGE ExistentialQuantification #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE FlexibleInstances #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+{-# LANGUAGE TemplateHaskell #-}
+{-# LANGUAGE UndecidableInstances #-}
+{-# OPTIONS_GHC -fno-warn-orphans #-}
+module Network.Haskoin.Node.Chain
+( chain
+) where
+
+import Control.Concurrent.NQE
+import Control.Monad
+import Control.Monad.Logger
+import Control.Monad.Reader
+import qualified Data.ByteString as BS
+import Data.Either
+import Data.List (delete, nub)
+import Data.Maybe
+import Data.Serialize
+import Data.String
+import Data.String.Conversions
+import Database.RocksDB (DB)
+import Database.RocksDB.Query as R
+import Network.Haskoin.Block
+import Network.Haskoin.Network
+import Network.Haskoin.Node.Common
+import UnliftIO
+
+type MonadChain m
+ = ( BlockHeaders m
+ , MonadLoggerIO m
+ , MonadReader ChainReader m)
+
+data ChainState = ChainState
+ { syncingPeer :: !(Maybe Peer)
+ , newPeers :: ![Peer]
+ , mySynced :: !Bool
+ }
+
+data ChainReader = ChainReader
+ { headerDB :: !DB
+ , myConfig :: !ChainConfig
+ , chainState :: !(TVar ChainState)
+ }
+
+newtype BlockHeaderKey = BlockHeaderKey BlockHash deriving (Eq, Show)
+
+instance Serialize BlockHeaderKey where
+ get = do
+ guard . (== 0x90) =<< getWord8
+ BlockHeaderKey <$> get
+ put (BlockHeaderKey bh) = do
+ putWord8 0x90
+ put bh
+
+data BestBlockKey = BestBlockKey deriving (Eq, Show)
+
+instance KeyValue BlockHeaderKey BlockNode
+instance KeyValue BestBlockKey BlockNode
+
+instance Serialize BestBlockKey where
+ get = do
+ guard . (== 0x91) =<< getWord8
+ return BestBlockKey
+ put BestBlockKey = putWord8 0x91
+
+instance (Monad m, MonadLoggerIO m, MonadReader ChainReader m) =>
+ BlockHeaders m where
+ addBlockHeader bn = do
+ db <- asks headerDB
+ insert db (BlockHeaderKey (headerHash (nodeHeader bn))) bn
+ getBlockHeader bh = do
+ db <- asks headerDB
+ retrieve db Nothing (BlockHeaderKey bh)
+ getBestBlockHeader = do
+ db <- asks headerDB
+ retrieve db Nothing BestBlockKey >>= \case
+ Nothing -> error "Could not get best block from database"
+ Just b -> return b
+ setBestBlockHeader bn = do
+ db <- asks headerDB
+ insert db BestBlockKey bn
+ addBlockHeaders bns = do
+ db <- asks headerDB
+ writeBatch db (map f bns)
+ where
+ f bn = insertOp (BlockHeaderKey (headerHash (nodeHeader bn))) bn
+
+chain ::
+ ( MonadUnliftIO m
+ , MonadLoggerIO m
+ )
+ => ChainConfig
+ -> m ()
+chain cfg = do
+ st <-
+ newTVarIO
+ ChainState {syncingPeer = Nothing, mySynced = False, newPeers = []}
+ let rd =
+ ChainReader
+ {myConfig = cfg, headerDB = chainConfDB cfg, chainState = st}
+ run `runReaderT` rd
+ where
+ net = chainConfNetwork cfg
+ run = do
+ db <- asks headerDB
+ m :: Maybe BlockNode <- retrieve db Nothing BestBlockKey
+ when (isNothing m) $ do
+ addBlockHeader (genesisNode net)
+ insert db BestBlockKey (genesisNode net)
+ forever $ do
+ msg <- receive $ chainConfChain cfg
+ processChainMessage msg
+
+processChainMessage :: MonadChain m => ChainMessage -> m ()
+processChainMessage (ChainNewHeaders p hcs) = do
+ stb <- asks chainState
+ st <- readTVarIO stb
+ net <- chainConfNetwork <$> asks myConfig
+ let spM = syncingPeer st
+ t <- computeTime
+ bb <- getBestBlockHeader
+ bhsE <- connectBlocks net t (map fst hcs)
+ case bhsE of
+ Right bhs -> conn bb bhs spM
+ Left e -> do
+ $(logWarnS) "Chain" $ "Could not connect headers: " <> cs e
+ case spM of
+ Nothing -> do
+ bb' <- getBestBlockHeader
+ atomically . modifyTVar stb $ \s ->
+ s {newPeers = nub $ p : newPeers s}
+ syncHeaders bb' p
+ Just sp
+ | sp == p -> do
+ pstr <- peerString p
+ $(logErrorS) "Chain" $
+ "Syncing peer " <> pstr <> " sent bad headers"
+ mgr <- chainConfManager <$> asks myConfig
+ managerKill PeerSentBadHeaders p mgr
+ atomically . modifyTVar stb $ \s ->
+ s {syncingPeer = Nothing}
+ processSyncQueue
+ | otherwise ->
+ atomically . modifyTVar stb $ \s ->
+ s {newPeers = nub $ p : newPeers s}
+ where
+ synced = do
+ st <- asks chainState
+ atomically . modifyTVar st $ \s -> s {syncingPeer = Nothing}
+ MSendHeaders `sendMessage` p
+ processSyncQueue
+ upeer bb = do
+ mgr <- chainConfManager <$> asks myConfig
+ managerSetPeerBest p bb mgr
+ conn bb bhs spM = do
+ bb' <- getBestBlockHeader
+ when (bb /= bb') $ do
+ $(logInfoS) "Chain" $
+ "Best header at height " <> cs (show (nodeHeight bb'))
+ mgr <- chainConfManager <$> asks myConfig
+ managerSetBest bb' mgr
+ l <- chainConfListener <$> asks myConfig
+ atomically . l $ ChainNewBest bb'
+ case length hcs of
+ 0 -> synced
+ 2000 ->
+ case spM of
+ Just sp
+ | sp == p -> do
+ upeer $ head bhs
+ syncHeaders (head bhs) p
+ _ -> do
+ st <- asks chainState
+ atomically . modifyTVar st $ \s ->
+ s {newPeers = nub $ p : newPeers s}
+ _ -> do
+ upeer $ head bhs
+ synced
+
+processChainMessage (ChainNewPeer p) = do
+ st <- asks chainState
+ sp <-
+ atomically $ do
+ modifyTVar st $ \s -> s {newPeers = nub $ p : newPeers s}
+ syncingPeer <$> readTVar st
+ case sp of
+ Nothing -> processSyncQueue
+ Just _ -> return ()
+
+-- Getting a new block should trigger an action equivalent to getting a new peer
+processChainMessage (ChainNewBlocks p _) = processChainMessage (ChainNewPeer p)
+
+processChainMessage (ChainRemovePeer p) = do
+ st <- asks chainState
+ sp <-
+ atomically $ do
+ modifyTVar st $ \s -> s {newPeers = delete p (newPeers s)}
+ syncingPeer <$> readTVar st
+ case sp of
+ Just p' ->
+ when (p == p') $ do
+ atomically . modifyTVar st $ \s ->
+ s {syncingPeer = Nothing}
+ processSyncQueue
+ Nothing -> return ()
+
+processChainMessage (ChainGetBest reply) =
+ getBestBlockHeader >>= atomically . reply
+
+processChainMessage (ChainGetAncestor h n reply) =
+ getAncestor h n >>= atomically . reply
+
+processChainMessage (ChainGetSplit r l reply) =
+ splitPoint r l >>= atomically . reply
+
+processChainMessage (ChainGetBlock h reply) =
+ getBlockHeader h >>= atomically . reply
+
+processChainMessage (ChainSendHeaders _) = return ()
+
+processChainMessage (ChainIsSynced reply) = do
+ st <- asks chainState
+ s <- mySynced <$> readTVarIO st
+ atomically (reply s)
+
+processSyncQueue :: MonadChain m => m ()
+processSyncQueue = do
+ s <- asks chainState >>= readTVarIO
+ when (isNothing (syncingPeer s)) $ getBestBlockHeader >>= go s
+ where
+ go s bb =
+ case newPeers s of
+ [] -> do
+ t <- computeTime
+ let h2 = t - 2 * 60 * 60
+ tg = blockTimestamp (nodeHeader bb) > h2
+ if tg
+ then unless (mySynced s) $ do
+ l <- chainConfListener <$> asks myConfig
+ st <- asks chainState
+ atomically $ do
+ l (ChainSynced bb)
+ writeTVar st s {mySynced = True}
+ else do
+ l <- chainConfListener <$> asks myConfig
+ st <- asks chainState
+ atomically $ do
+ l (ChainNotSynced bb)
+ writeTVar st s {mySynced = False}
+ p:_ -> syncHeaders bb p
+
+syncHeaders :: MonadChain m => BlockNode -> Peer -> m ()
+syncHeaders bb p = do
+ st <- asks chainState
+ s <- readTVarIO st
+ atomically . writeTVar st $
+ s {syncingPeer = Just p, newPeers = delete p (newPeers s)}
+ loc <- blockLocator bb
+ let m =
+ MGetHeaders
+ GetHeaders
+ { getHeadersVersion = myVersion
+ , getHeadersBL = loc
+ , getHeadersHashStop =
+ fromRight (error "Could not decode zero hash") . decode $
+ BS.replicate 32 0
+ }
+ PeerOutgoing m `send` p
+
+peerString :: (MonadChain m, IsString a) => Peer -> m a
+peerString p = do
+ mgr <- chainConfManager <$> asks myConfig
+ managerGetPeer mgr p >>= \case
+ Nothing -> return "[unknown]"
+ Just o -> return $ fromString $ show (onlinePeerAddress o)
diff --git a/src/Network/Haskoin/Node/Common.hs b/src/Network/Haskoin/Node/Common.hs
new file mode 100644
index 0000000..b0f11d6
--- /dev/null
+++ b/src/Network/Haskoin/Node/Common.hs
@@ -0,0 +1,371 @@
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE RankNTypes #-}
+module Network.Haskoin.Node.Common where
+
+import Control.Concurrent.NQE
+import Control.Concurrent.Unique
+import Data.ByteString (ByteString)
+import Data.Hashable
+import Data.Time.Clock
+import Data.Time.Clock.POSIX
+import Data.Word
+import Database.RocksDB (DB)
+import Network.Haskoin.Block
+import Network.Haskoin.Constants
+import Network.Haskoin.Network
+import Network.Haskoin.Transaction
+import Network.Socket (AddrInfo (..), AddrInfoFlag (..),
+ Family (..), NameInfoFlag (..),
+ SockAddr (..), SocketType (..),
+ addrAddress, defaultHints,
+ getAddrInfo, getNameInfo)
+import Text.Read
+import UnliftIO
+
+type HostPort = (Host, Port)
+type Host = String
+type Port = Int
+
+data OnlinePeer = OnlinePeer
+ { onlinePeerAddress :: !SockAddr
+ , onlinePeerConnected :: !Bool
+ , onlinePeerVersion :: !Word32
+ , onlinePeerServices :: !Word64
+ , onlinePeerRemoteNonce :: !Word64
+ , onlinePeerUserAgent :: !ByteString
+ , onlinePeerRelay :: !Bool
+ , onlinePeerBestBlock :: !BlockNode
+ , onlinePeerAsync :: !(Async ())
+ , onlinePeerMailbox :: !Peer
+ , onlinePeerNonce :: !Word64
+ , onlinePeerPings :: ![NominalDiffTime]
+ }
+
+data UniqueInbox a = UniqueInbox
+ { uniqueInbox :: Inbox a
+ , uniqueId :: Unique
+ }
+
+type PeerSupervisor m = Inbox (SupervisorMessage m)
+type NodeSupervisor m = Inbox (SupervisorMessage m)
+
+type Peer = UniqueInbox PeerMessage
+type Chain = Inbox ChainMessage
+type Manager = Inbox ManagerMessage
+
+instance Eq (UniqueInbox a) where
+ UniqueInbox {uniqueId = a} == UniqueInbox {uniqueId = b} = a == b
+
+instance Hashable (UniqueInbox a) where
+ hashWithSalt n UniqueInbox {uniqueId = i} = hashWithSalt n i
+ hash UniqueInbox {uniqueId = i} = hash i
+
+instance Mailbox UniqueInbox where
+ mailboxEmptySTM UniqueInbox {uniqueInbox = mbox} = mailboxEmptySTM mbox
+ sendSTM msg UniqueInbox {uniqueInbox = mbox} = msg `sendSTM` mbox
+ receiveSTM UniqueInbox {uniqueInbox = mbox} = receiveSTM mbox
+ requeueMsg msg UniqueInbox {uniqueInbox = mbox} = msg `requeueMsg` mbox
+
+data NodeConfig m = NodeConfig
+ { maxPeers :: !Int
+ , database :: !DB
+ , initPeers :: ![HostPort]
+ , discover :: !Bool
+ , nodeEvents :: !(Listen NodeEvent)
+ , netAddress :: !NetworkAddress
+ , nodeSupervisor :: !(NodeSupervisor m)
+ , nodeChain :: !Chain
+ , nodeManager :: !Manager
+ , nodeNet :: !Network
+ }
+
+data ManagerConfig m = ManagerConfig
+ { mgrConfMaxPeers :: !Int
+ , mgrConfDB :: !DB
+ , mgrConfPeers :: ![HostPort]
+ , mgrConfDiscover :: !Bool
+ , mgrConfMgrListener :: !(Listen ManagerEvent)
+ , mgrConfPeerListener :: !(Listen (Peer, PeerEvent))
+ , mgrConfNetAddr :: !NetworkAddress
+ , mgrConfManager :: !Manager
+ , mgrConfChain :: !Chain
+ , mgrConfPeerSupervisor :: !(PeerSupervisor m)
+ , mgrConfNetwork :: !Network
+ }
+
+data NodeEvent
+ = ManagerEvent !ManagerEvent
+ | ChainEvent !ChainEvent
+ | PeerEvent !(Peer, PeerEvent)
+
+data ManagerEvent
+ = ManagerConnect !Peer
+ | ManagerDisconnect !Peer
+
+data ManagerMessage
+ = ManagerSetFilter !BloomFilter
+ | ManagerSetBest !BlockNode
+ | ManagerPing
+ | ManagerGetAddr !Peer
+ | ManagerNewPeers !Peer
+ ![NetworkAddressTime]
+ | ManagerKill !PeerException
+ !Peer
+ | ManagerSetPeerBest !Peer
+ !BlockNode
+ | ManagerGetPeerBest !Peer
+ !(Reply (Maybe BlockNode))
+ | ManagerSetPeerVersion !Peer
+ !Version
+ | ManagerGetPeerVersion !Peer
+ !(Reply (Maybe Word32))
+ | ManagerGetPeers !(Reply [OnlinePeer])
+ | ManagerGetOnlinePeer !Peer !(Reply (Maybe OnlinePeer))
+ | ManagerPeerPing !Peer
+ !NominalDiffTime
+ | PeerStopped !(Async (), Either SomeException ())
+
+data ChainConfig = ChainConfig
+ { chainConfDB :: !DB
+ , chainConfListener :: !(Listen ChainEvent)
+ , chainConfManager :: !Manager
+ , chainConfChain :: !Chain
+ , chainConfNetwork :: !Network
+ }
+
+data ChainMessage
+ = ChainNewHeaders !Peer
+ ![BlockHeaderCount]
+ | ChainNewPeer !Peer
+ | ChainRemovePeer !Peer
+ | ChainGetBest !(BlockNode -> STM ())
+ | ChainGetAncestor !BlockHeight
+ !BlockNode
+ !(Reply (Maybe BlockNode))
+ | ChainGetSplit !BlockNode
+ !BlockNode
+ !(Reply BlockNode)
+ | ChainGetBlock !BlockHash
+ !(Reply (Maybe BlockNode))
+ | ChainNewBlocks !Peer ![BlockHash]
+ | ChainSendHeaders !Peer
+ | ChainIsSynced !(Reply Bool)
+
+data ChainEvent
+ = ChainNewBest !BlockNode
+ | ChainSynced !BlockNode
+ | ChainNotSynced !BlockNode
+ deriving (Eq, Show)
+
+data PeerConfig = PeerConfig
+ { peerConfConnect :: !NetworkAddress
+ , peerConfInitBest :: !BlockNode
+ , peerConfLocal :: !NetworkAddress
+ , peerConfManager :: !Manager
+ , peerConfChain :: !Chain
+ , peerConfListener :: !(Listen (Peer, PeerEvent))
+ , peerConfNonce :: !Word64
+ , peerConfNetwork :: !Network
+ }
+
+data PeerException
+ = PeerMisbehaving !String
+ | DecodeMessageError !String
+ | CannotDecodePayload !String
+ | MessageHeaderEmpty
+ | PeerIsMyself
+ | PayloadTooLarge !Word32
+ | PeerAddressInvalid
+ | BloomFiltersNotSupported
+ | PeerSentBadHeaders
+ | NotNetworkPeer
+ | PeerNoSegWit
+ | PeerTimeout
+ deriving (Eq, Show)
+
+instance Exception PeerException
+
+data PeerEvent
+ = TxAvail ![TxHash]
+ | GotBlock !Block
+ | GotMerkleBlock !MerkleBlock
+ | GotTx !Tx
+ | GotPong !Word64
+ | SendBlocks !GetBlocks
+ | SendHeaders !GetHeaders
+ | SendData ![InvVector]
+ | TxNotFound !TxHash
+ | BlockNotFound !BlockHash
+ | WantMempool
+ | Rejected !Reject
+
+data PeerMessage
+ = PeerOutgoing !Message
+ | PeerIncoming !Message
+
+toSockAddr :: (MonadUnliftIO m) => HostPort -> m [SockAddr]
+toSockAddr (host, port) = go `catch` e
+ where
+ go =
+ fmap (map addrAddress) . liftIO $
+ getAddrInfo
+ (Just
+ defaultHints
+ { addrFlags = [AI_ADDRCONFIG]
+ , addrSocketType = Stream
+ , addrFamily = AF_INET
+ })
+ (Just host)
+ (Just (show port))
+ e :: Monad m => SomeException -> m [SockAddr]
+ e _ = return []
+
+fromSockAddr ::
+ (MonadUnliftIO m) => SockAddr -> m (Maybe HostPort)
+fromSockAddr sa = go `catch` e
+ where
+ go = do
+ (hostM, portM) <- liftIO (getNameInfo flags True True sa)
+ return $ (,) <$> hostM <*> (readMaybe =<< portM)
+ flags = [NI_NUMERICHOST, NI_NUMERICSERV]
+ e :: Monad m => SomeException -> m (Maybe a)
+ e _ = return Nothing
+
+computeTime :: MonadIO m => m Word32
+computeTime = round <$> liftIO getPOSIXTime
+
+myVersion :: Word32
+myVersion = 70012
+
+managerSetBest :: MonadIO m => BlockNode -> Manager -> m ()
+managerSetBest bn mgr = ManagerSetBest bn `send` mgr
+
+managerSetPeerVersion :: MonadIO m => Peer -> Version -> Manager -> m ()
+managerSetPeerVersion p v mgr = ManagerSetPeerVersion p v `send` mgr
+
+managerGetPeerVersion :: MonadIO m => Peer -> Manager -> m (Maybe Word32)
+managerGetPeerVersion p mgr = ManagerGetPeerVersion p `query` mgr
+
+managerGetPeerBest :: MonadIO m => Peer -> Manager -> m (Maybe BlockNode)
+managerGetPeerBest p mgr = ManagerGetPeerBest p `query` mgr
+
+managerSetPeerBest :: MonadIO m => Peer -> BlockNode -> Manager -> m ()
+managerSetPeerBest p bn mgr = ManagerSetPeerBest p bn `send` mgr
+
+managerGetPeers :: MonadIO m => Manager -> m [OnlinePeer]
+managerGetPeers mgr = ManagerGetPeers `query` mgr
+
+managerGetPeer :: MonadIO m => Manager -> Peer -> m (Maybe OnlinePeer)
+managerGetPeer mgr p = ManagerGetOnlinePeer p `query` mgr
+
+managerGetAddr :: MonadIO m => Peer -> Manager -> m ()
+managerGetAddr p mgr = ManagerGetAddr p `send` mgr
+
+managerKill :: MonadIO m => PeerException -> Peer -> Manager -> m ()
+managerKill e p mgr = ManagerKill e p `send` mgr
+
+managerNewPeers ::
+ MonadIO m => Peer -> [NetworkAddressTime] -> Manager -> m ()
+managerNewPeers p as mgr = ManagerNewPeers p as `send` mgr
+
+setManagerFilter :: MonadIO m => BloomFilter -> Manager -> m ()
+setManagerFilter bf mgr = ManagerSetFilter bf `send` mgr
+
+sendMessage :: MonadIO m => Message -> Peer -> m ()
+sendMessage msg p = PeerOutgoing msg `send` p
+
+peerSetFilter :: MonadIO m => BloomFilter -> Peer -> m ()
+peerSetFilter f p = MFilterLoad (FilterLoad f) `sendMessage` p
+
+getMerkleBlocks ::
+ (MonadIO m)
+ => Peer
+ -> [BlockHash]
+ -> m ()
+getMerkleBlocks p bhs = PeerOutgoing (MGetData (GetData ivs)) `send` p
+ where
+ ivs = map (InvVector InvMerkleBlock . getBlockHash) bhs
+
+peerGetBlocks ::
+ MonadIO m => Network -> Peer -> [BlockHash] -> m ()
+peerGetBlocks net p bhs = PeerOutgoing (MGetData (GetData ivs)) `send` p
+ where
+ con
+ | getSegWit net = InvWitnessBlock
+ | otherwise = InvBlock
+ ivs = map (InvVector con . getBlockHash) bhs
+
+peerGetTxs :: MonadIO m => Network -> Peer -> [TxHash] -> m ()
+peerGetTxs net p ths = PeerOutgoing (MGetData (GetData ivs)) `send` p
+ where
+ con
+ | getSegWit net = InvWitnessTx
+ | otherwise = InvTx
+ ivs = map (InvVector con . getTxHash) ths
+
+buildVersion ::
+ MonadIO m
+ => Network
+ -> Word64
+ -> BlockHeight
+ -> NetworkAddress
+ -> NetworkAddress
+ -> m Version
+buildVersion net nonce height loc rmt = do
+ time <- fromIntegral <$> computeTime
+ return
+ Version
+ { version = myVersion
+ , services = naServices loc
+ , timestamp = time
+ , addrRecv = rmt
+ , addrSend = loc
+ , verNonce = nonce
+ , userAgent = VarString (getHaskoinUserAgent net)
+ , startHeight = height
+ , relay = True
+ }
+
+chainNewPeer :: MonadIO m => Peer -> Chain -> m ()
+chainNewPeer p ch = ChainNewPeer p `send` ch
+
+chainRemovePeer :: MonadIO m => Peer -> Chain -> m ()
+chainRemovePeer p ch = ChainRemovePeer p `send` ch
+
+chainGetBlock :: MonadIO m => BlockHash -> Chain -> m (Maybe BlockNode)
+chainGetBlock bh ch = ChainGetBlock bh `query` ch
+
+chainGetBest :: MonadIO m => Chain -> m BlockNode
+chainGetBest ch = ChainGetBest `query` ch
+
+chainGetAncestor ::
+ MonadIO m => BlockHeight -> BlockNode -> Chain -> m (Maybe BlockNode)
+chainGetAncestor h n c = ChainGetAncestor h n `query` c
+
+chainGetParents ::
+ MonadIO m => BlockHeight -> BlockNode -> Chain -> m [BlockNode]
+chainGetParents height top ch = go [] top
+ where
+ go acc b
+ | height >= nodeHeight b = return acc
+ | otherwise = do
+ m <- chainGetBlock (prevBlock $ nodeHeader b) ch
+ case m of
+ Nothing -> return acc
+ Just p -> go (p : acc) p
+
+chainGetSplitBlock ::
+ MonadIO m => BlockNode -> BlockNode -> Chain -> m BlockNode
+chainGetSplitBlock l r c = ChainGetSplit l r `query` c
+
+chainBlockMain :: MonadIO m => BlockHash -> Chain -> m Bool
+chainBlockMain bh ch =
+ chainGetBest ch >>= \bb ->
+ chainGetBlock bh ch >>= \case
+ Nothing -> return False
+ bm@(Just bn) -> (== bm) <$> chainGetAncestor (nodeHeight bn) bb ch
+
+chainIsSynced :: MonadIO m => Chain -> m Bool
+chainIsSynced ch = ChainIsSynced `query` ch
diff --git a/src/Network/Haskoin/Node/Manager.hs b/src/Network/Haskoin/Node/Manager.hs
new file mode 100644
index 0000000..f5bde57
--- /dev/null
+++ b/src/Network/Haskoin/Node/Manager.hs
@@ -0,0 +1,594 @@
+{-# LANGUAGE ConstraintKinds #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RecordWildCards #-}
+{-# LANGUAGE TemplateHaskell #-}
+{-# LANGUAGE TupleSections #-}
+module Network.Haskoin.Node.Manager
+ ( manager
+ ) where
+
+import Control.Applicative
+import Control.Concurrent.NQE
+import Control.Concurrent.Unique
+import Control.Monad
+import Control.Monad.Except
+import Control.Monad.Logger
+import Control.Monad.Reader
+import Data.Bits
+import qualified Data.ByteString as BS
+import Data.Conduit
+import qualified Data.Conduit.Combinators as CC
+import Data.Function
+import Data.List
+import Data.Maybe
+import Data.Serialize (Get, Put, Serialize, get, put)
+import qualified Data.Serialize as S
+import Data.String
+import Data.String.Conversions
+import Data.Word
+import Database.RocksDB (DB)
+import Database.RocksDB.Query
+import Network.Haskoin.Block
+import Network.Haskoin.Constants
+import Network.Haskoin.Network
+import Network.Haskoin.Node.Common
+import Network.Haskoin.Node.Peer
+import Network.Socket (SockAddr (..))
+import System.Random
+import UnliftIO
+import UnliftIO.Concurrent
+import UnliftIO.Resource
+
+type MonadManager n m
+ = ( MonadUnliftIO n
+ , MonadLoggerIO n
+ , MonadUnliftIO m
+ , MonadLoggerIO m
+ , MonadReader (ManagerReader n) m)
+
+data ManagerReader n = ManagerReader
+ { mySelf :: !Manager
+ , myChain :: !Chain
+ , myConfig :: !(ManagerConfig n)
+ , myPeerDB :: !DB
+ , myPeerSupervisor :: !(PeerSupervisor n)
+ , onlinePeers :: !(TVar [OnlinePeer])
+ , myBloomFilter :: !(TVar (Maybe BloomFilter))
+ , myBestBlock :: !(TVar BlockNode)
+ }
+
+data Priority
+ = PriorityNetwork
+ | PrioritySeed
+ | PriorityManual
+ deriving (Eq, Show, Ord)
+
+data PeerAddress
+ = PeerAddress { getPeerAddress :: SockAddr }
+ | PeerAddressBase
+ deriving (Eq, Show)
+
+instance Serialize Priority where
+ get =
+ S.getWord8 >>= \case
+ 0x00 -> return PriorityManual
+ 0x01 -> return PrioritySeed
+ 0x02 -> return PriorityNetwork
+ _ -> mzero
+ put PriorityManual = S.putWord8 0x00
+ put PrioritySeed = S.putWord8 0x01
+ put PriorityNetwork = S.putWord8 0x02
+
+instance Serialize PeerAddress where
+ get = do
+ guard . (== 0x81) =<< S.getWord8
+ record <|> return PeerAddressBase
+ where
+ record = do
+ getPeerAddress <- decodeSockAddr
+ return PeerAddress {..}
+ put PeerAddress {..} = do
+ S.putWord8 0x81
+ encodeSockAddr getPeerAddress
+ put PeerAddressBase = S.putWord8 0x81
+
+data PeerTimeAddress
+ = PeerTimeAddress { getPeerPrio :: !Priority
+ , getPeerBanned :: !Word32
+ , getPeerLastConnect :: !Word32
+ , getPeerNextConnect :: !Word32
+ , getPeerTimeAddress :: !PeerAddress }
+ | PeerTimeAddressBase
+ deriving (Eq, Show)
+
+instance Key PeerTimeAddress
+instance KeyValue PeerAddress PeerTimeAddress
+instance KeyValue PeerTimeAddress PeerAddress
+
+instance Serialize PeerTimeAddress where
+ get = do
+ guard . (== 0x80) =<< S.getWord8
+ record <|> return PeerTimeAddressBase
+ where
+ record = do
+ getPeerPrio <- S.get
+ getPeerBanned <- S.get
+ getPeerLastConnect <- (maxBound -) <$> S.get
+ getPeerNextConnect <- S.get
+ getPeerTimeAddress <- S.get
+ return PeerTimeAddress {..}
+ put PeerTimeAddress {..} = do
+ S.putWord8 0x80
+ S.put getPeerPrio
+ S.put getPeerBanned
+ S.put (maxBound - getPeerLastConnect)
+ S.put getPeerNextConnect
+ S.put getPeerTimeAddress
+ put PeerTimeAddressBase = S.putWord8 0x80
+
+manager :: (MonadUnliftIO m, MonadLoggerIO m) => ManagerConfig m -> m ()
+manager cfg = do
+ bb <- chainGetBest $ mgrConfChain cfg
+ opb <- newTVarIO []
+ bfb <- newTVarIO Nothing
+ bbb <- newTVarIO bb
+ withConnectLoop (mgrConfManager cfg) $ do
+ let rd =
+ ManagerReader
+ { mySelf = mgrConfManager cfg
+ , myChain = mgrConfChain cfg
+ , myConfig = cfg
+ , myPeerDB = mgrConfDB cfg
+ , myPeerSupervisor = mgrConfPeerSupervisor cfg
+ , onlinePeers = opb
+ , myBloomFilter = bfb
+ , myBestBlock = bbb
+ }
+ run `runReaderT` rd
+ where
+ run = do
+ connectNewPeers
+ managerLoop
+
+resolvePeers :: (MonadUnliftIO m, MonadManager n m) => m [(SockAddr, Priority)]
+resolvePeers = do
+ cfg <- asks myConfig
+ let net = mgrConfNetwork cfg
+ confPeers <-
+ fmap
+ (map (, PriorityManual) . concat)
+ (mapM toSockAddr (mgrConfPeers cfg))
+ if mgrConfDiscover cfg
+ then do
+ seedPeers <-
+ fmap
+ (map (, PrioritySeed) . concat)
+ (mapM (toSockAddr . (, getDefaultPort net)) (getSeeds net))
+ return (confPeers ++ seedPeers)
+ else return confPeers
+
+encodeSockAddr :: SockAddr -> Put
+encodeSockAddr (SockAddrInet6 p _ (a, b, c, d) _) = do
+ S.putWord32be a
+ S.putWord32be b
+ S.putWord32be c
+ S.putWord32be d
+ S.putWord16be (fromIntegral p)
+
+encodeSockAddr (SockAddrInet p a) = do
+ S.putWord32be 0x00000000
+ S.putWord32be 0x00000000
+ S.putWord32be 0x0000ffff
+ S.putWord32host a
+ S.putWord16be (fromIntegral p)
+
+encodeSockAddr x = error $ "Colud not encode address: " <> show x
+
+decodeSockAddr :: Get SockAddr
+decodeSockAddr = do
+ a <- S.getWord32be
+ b <- S.getWord32be
+ c <- S.getWord32be
+ if a == 0x00000000 && b == 0x00000000 && c == 0x0000ffff
+ then do
+ d <- S.getWord32host
+ p <- S.getWord16be
+ return $ SockAddrInet (fromIntegral p) d
+ else do
+ d <- S.getWord32be
+ p <- S.getWord16be
+ return $ SockAddrInet6 (fromIntegral p) 0 (a, b, c, d) 0
+
+connectPeer :: MonadManager n m => SockAddr -> m ()
+connectPeer sa = do
+ db <- asks myPeerDB
+ let k = PeerAddress sa
+ retrieve db Nothing k >>= \case
+ Nothing -> error "Could not find peer to mark connected"
+ Just v -> do
+ now <- computeTime
+ let v' = v {getPeerLastConnect = now}
+ writeBatch db [deleteOp v, insertOp v' k, insertOp k v']
+ logPeersConnected
+
+
+logPeersConnected :: MonadManager n m => m ()
+logPeersConnected = do
+ mo <- mgrConfMaxPeers <$> asks myConfig
+ ps <- getOnlinePeers
+ $(logInfoS) "Manager" $
+ "Peers connected: " <> cs (show (length ps)) <> "/" <> cs (show mo)
+
+storePeer :: MonadManager n m => SockAddr -> Priority -> m ()
+storePeer sa prio = do
+ db <- asks myPeerDB
+ let k = PeerAddress sa
+ retrieve db Nothing k >>= \case
+ Nothing -> do
+ let v =
+ PeerTimeAddress
+ { getPeerPrio = prio
+ , getPeerBanned = 0
+ , getPeerLastConnect = 0
+ , getPeerNextConnect = 0
+ , getPeerTimeAddress = k
+ }
+ writeBatch
+ db
+ [insertOp v k, insertOp k v]
+ Just v@PeerTimeAddress {..} ->
+ when (getPeerPrio < prio) $ do
+ let v' = v {getPeerPrio = prio}
+ writeBatch
+ db
+ [deleteOp v, insertOp v' k, insertOp k v']
+ Just PeerTimeAddressBase ->
+ error "Key for peer is corrupted"
+
+banPeer :: MonadManager n m => SockAddr -> m ()
+banPeer sa = do
+ db <- asks myPeerDB
+ let k = PeerAddress sa
+ retrieve db Nothing k >>= \case
+ Nothing -> error "Cannot find peer to be banned"
+ Just v -> do
+ now <- computeTime
+ let v' =
+ v
+ { getPeerBanned = now
+ , getPeerNextConnect = now + 6 * 60 * 60
+ }
+ when (getPeerPrio v == PriorityNetwork) $ do
+ $(logWarnS) "Manager" $ "Banning peer " <> cs (show sa)
+ writeBatch db [deleteOp v, insertOp k v', insertOp v' k]
+
+backoffPeer :: MonadManager n m => SockAddr -> m ()
+backoffPeer sa = do
+ db <- asks myPeerDB
+ onlinePeers <- map onlinePeerAddress <$> getOnlinePeers
+ let k = PeerAddress sa
+ retrieve db Nothing k >>= \case
+ Nothing -> error "Cannot find peer to backoff in database"
+ Just v -> do
+ now <- computeTime
+ r <-
+ liftIO . randomRIO $
+ if null onlinePeers
+ then (90, 300) -- Don't backoff so much if possibly offline
+ else (900, 1800)
+ let t = max (now + r) (getPeerNextConnect v)
+ v' = v {getPeerNextConnect = t}
+ when (getPeerPrio v == PriorityNetwork) $ do
+ $(logWarnS) "Manager" $
+ "Backing off peer " <> cs (show sa) <> " for " <>
+ cs (show r) <>
+ " seconds"
+ writeBatch db [deleteOp v, insertOp k v', insertOp v' k]
+
+getNewPeer :: (MonadUnliftIO m, MonadManager n m) => m (Maybe SockAddr)
+getNewPeer = do
+ ManagerConfig {..} <- asks myConfig
+ online_peers <- map onlinePeerAddress <$> getOnlinePeers
+ config_peers <- concat <$> mapM toSockAddr mgrConfPeers
+ if mgrConfDiscover
+ then do
+ db <- asks myPeerDB
+ now <- computeTime
+ runResourceT . runConduit $
+ matching db Nothing PeerTimeAddressBase .|
+ CC.filter ((<= now) . getPeerNextConnect . fst) .|
+ CC.map (getPeerAddress . snd) .|
+ CC.find (not . (`elem` online_peers))
+ else return $ find (not . (`elem` online_peers)) config_peers
+
+getConnectedPeers :: MonadManager n m => m [OnlinePeer]
+getConnectedPeers = filter onlinePeerConnected <$> getOnlinePeers
+
+withConnectLoop :: (MonadUnliftIO m, MonadLoggerIO m) => Manager -> m a -> m a
+withConnectLoop mgr f = withAsync go $ const f
+ where
+ go =
+ forever $ do
+ ManagerPing `send` mgr
+ i <- liftIO (randomRIO (30, 90))
+ threadDelay (i * 1000 * 1000)
+
+managerLoop :: (MonadUnliftIO m, MonadManager n m) => m ()
+managerLoop =
+ forever $ do
+ mgr <- asks mySelf
+ msg <- receive mgr
+ processManagerMessage msg
+
+processManagerMessage ::
+ (MonadUnliftIO m, MonadManager n m) => ManagerMessage -> m ()
+
+processManagerMessage (ManagerSetFilter bf) = setFilter bf
+
+processManagerMessage (ManagerSetBest bb) =
+ asks myBestBlock >>= atomically . (`writeTVar` bb)
+
+processManagerMessage ManagerPing = connectNewPeers
+
+processManagerMessage (ManagerGetAddr p) = do
+ pn <- peerString p
+ $(logWarnS) "Manager" $ "Ignoring address request from peer " <> fromString pn
+
+processManagerMessage (ManagerNewPeers p as) =
+ asks myConfig >>= \case
+ ManagerConfig {..}
+ | not mgrConfDiscover -> return ()
+ | otherwise -> do
+ pn <- peerString p
+ $(logInfoS) "Manager" $
+ "Received " <> cs (show (length as)) <>
+ " peers from " <>
+ fromString pn
+ forM_ as $ \(_, na) ->
+ let sa = naAddress na
+ in storePeer sa PriorityNetwork
+
+processManagerMessage (ManagerKill e p) =
+ findPeer p >>= \case
+ Nothing -> return ()
+ Just op -> do
+ $(logErrorS) "Manager" $
+ "Killing peer " <> cs (show (onlinePeerAddress op))
+ banPeer $ onlinePeerAddress op
+ onlinePeerAsync op `cancelWith` e
+
+processManagerMessage (ManagerSetPeerBest p bn) = modifyPeer f p
+ where
+ f op = op {onlinePeerBestBlock = bn}
+
+processManagerMessage (ManagerGetPeerBest p reply) =
+ findPeer p >>= \op -> atomically (reply (fmap onlinePeerBestBlock op))
+
+processManagerMessage (ManagerSetPeerVersion p v) =
+ modifyPeer f p >> findPeer p >>= \case
+ Nothing -> return ()
+ Just op ->
+ runExceptT testVersion >>= \case
+ Left ex -> do
+ banPeer (onlinePeerAddress op)
+ onlinePeerAsync op `cancelWith` ex
+ Right () -> do
+ loadFilter
+ askForPeers
+ connectPeer (onlinePeerAddress op)
+ announcePeer
+ where
+ f op =
+ op
+ { onlinePeerVersion = version v
+ , onlinePeerServices = services v
+ , onlinePeerRemoteNonce = verNonce v
+ , onlinePeerUserAgent = getVarString (userAgent v)
+ , onlinePeerRelay = relay v
+ }
+ testVersion = do
+ when (services v .&. nodeNetwork == 0) $ throwError NotNetworkPeer
+ bfb <- asks myBloomFilter
+ bf <- readTVarIO bfb
+ when (isJust bf && services v .&. nodeBloom == 0) $
+ throwError BloomFiltersNotSupported
+ myself <-
+ any ((verNonce v ==) . onlinePeerNonce) <$> lift getOnlinePeers
+ when myself $ throwError PeerIsMyself
+ loadFilter = do
+ bfb <- asks myBloomFilter
+ bf <- readTVarIO bfb
+ case bf of
+ Nothing -> return ()
+ Just b -> b `peerSetFilter` p
+ askForPeers =
+ mgrConfDiscover <$> asks myConfig >>= \discover ->
+ when discover (MGetAddr `sendMessage` p)
+ announcePeer =
+ findPeer p >>= \case
+ Nothing -> return ()
+ Just op
+ | onlinePeerConnected op -> return ()
+ | otherwise -> do
+ $(logInfoS) "Manager" $
+ "Connected to " <> cs (show (onlinePeerAddress op))
+ l <- mgrConfMgrListener <$> asks myConfig
+ atomically (l (ManagerConnect p))
+ ch <- asks myChain
+ chainNewPeer p ch
+ setPeerAnnounced p
+
+processManagerMessage (ManagerGetPeerVersion p reply) =
+ fmap onlinePeerVersion <$> findPeer p >>= atomically . reply
+
+processManagerMessage (ManagerGetPeers reply) =
+ getPeers >>= atomically . reply
+
+processManagerMessage (ManagerGetOnlinePeer p reply) =
+ getOnlinePeer p >>= atomically . reply
+
+processManagerMessage (ManagerPeerPing p i) =
+ modifyPeer (\x -> x {onlinePeerPings = take 11 $ i : onlinePeerPings x}) p
+
+processManagerMessage (PeerStopped (p, _ex)) = do
+ opb <- asks onlinePeers
+ m <- atomically $ do
+ m <- findPeerAsync p opb
+ when (isJust m) $ removePeer p opb
+ return m
+ case m of
+ Just op -> do
+ backoffPeer (onlinePeerAddress op)
+ processPeerOffline op
+ Nothing -> return ()
+
+processPeerOffline :: MonadManager n m => OnlinePeer -> m ()
+processPeerOffline op
+ | onlinePeerConnected op = do
+ let p = onlinePeerMailbox op
+ $(logWarnS) "Manager" $
+ "Disconnected peer " <> cs (show (onlinePeerAddress op))
+ asks myChain >>= chainRemovePeer p
+ l <- mgrConfMgrListener <$> asks myConfig
+ atomically (l (ManagerDisconnect p))
+ logPeersConnected
+ | otherwise =
+ $(logWarnS) "Manager" $
+ "Could not connect to peer " <> cs (show (onlinePeerAddress op))
+
+getPeers :: MonadManager n m => m [OnlinePeer]
+getPeers = sortBy (compare `on` median . onlinePeerPings) <$> getConnectedPeers
+
+getOnlinePeer :: MonadManager n m => Peer -> m (Maybe OnlinePeer)
+getOnlinePeer p = find ((== p) . onlinePeerMailbox) <$> getConnectedPeers
+
+connectNewPeers :: MonadManager n m => m ()
+connectNewPeers = do
+ mo <- mgrConfMaxPeers <$> asks myConfig
+ ps <- getOnlinePeers
+ let n = mo - length ps
+ when (null ps) $ do
+ ps' <- resolvePeers
+ mapM_ (uncurry storePeer) ps'
+ go n
+ where
+ go 0 = return ()
+ go n =
+ getNewPeer >>= \case
+ Nothing -> return ()
+ Just sa -> conn sa >> go (n - 1)
+ conn sa = do
+ ad <- mgrConfNetAddr <$> asks myConfig
+ mgr <- asks mySelf
+ ch <- asks myChain
+ pl <- mgrConfPeerListener <$> asks myConfig
+ net <- mgrConfNetwork <$> asks myConfig
+ $(logInfoS) "Manager" $ "Connecting to peer " <> cs (show sa)
+ bbb <- asks myBestBlock
+ bb <- readTVarIO bbb
+ nonce <- liftIO randomIO
+ let pc =
+ PeerConfig
+ { peerConfConnect = NetworkAddress (srv net) sa
+ , peerConfInitBest = bb
+ , peerConfLocal = ad
+ , peerConfManager = mgr
+ , peerConfChain = ch
+ , peerConfListener = pl
+ , peerConfNonce = nonce
+ , peerConfNetwork = net
+ }
+ psup <- asks myPeerSupervisor
+ pmbox <- newTBQueueIO 100
+ uid <- liftIO newUnique
+ let p = UniqueInbox {uniqueInbox = Inbox pmbox, uniqueId = uid}
+ a <- psup `addChild` peer pc p
+ newPeerConnection net sa nonce p a
+ srv net
+ | getSegWit net = 8
+ | otherwise = 0
+
+newPeerConnection ::
+ MonadManager n m
+ => Network
+ -> SockAddr
+ -> Word64
+ -> Peer
+ -> Async ()
+ -> m ()
+newPeerConnection net sa nonce p a =
+ addPeer
+ OnlinePeer
+ { onlinePeerAddress = sa
+ , onlinePeerConnected = False
+ , onlinePeerVersion = 0
+ , onlinePeerServices = 0
+ , onlinePeerRemoteNonce = 0
+ , onlinePeerUserAgent = BS.empty
+ , onlinePeerRelay = False
+ , onlinePeerBestBlock = genesisNode net
+ , onlinePeerAsync = a
+ , onlinePeerMailbox = p
+ , onlinePeerNonce = nonce
+ , onlinePeerPings = []
+ }
+
+peerString :: MonadManager n m => Peer -> m String
+peerString p = maybe "[unknown]" (show . onlinePeerAddress) <$> findPeer p
+
+setPeerAnnounced :: MonadManager n m => Peer -> m ()
+setPeerAnnounced = modifyPeer (\x -> x {onlinePeerConnected = True})
+
+setFilter :: MonadManager n m => BloomFilter -> m ()
+setFilter bl = do
+ bfb <- asks myBloomFilter
+ atomically . writeTVar bfb $ Just bl
+ ops <- getOnlinePeers
+ forM_ ops $ \op ->
+ when (onlinePeerConnected op) $
+ if acceptsFilters $ onlinePeerServices op
+ then bl `peerSetFilter` onlinePeerMailbox op
+ else do
+ $(logErrorS) "Manager" $
+ "Peer " <> cs (show (onlinePeerAddress op)) <>
+ "does not support bloom filters"
+ banPeer (onlinePeerAddress op)
+ onlinePeerAsync op `cancelWith` BloomFiltersNotSupported
+
+findPeer :: MonadManager n m => Peer -> m (Maybe OnlinePeer)
+findPeer p = find ((== p) . onlinePeerMailbox) <$> getOnlinePeers
+
+findPeerAsync :: Async () -> TVar [OnlinePeer] -> STM (Maybe OnlinePeer)
+findPeerAsync a t = find ((== a) . onlinePeerAsync) <$> readTVar t
+
+modifyPeer :: MonadManager n m => (OnlinePeer -> OnlinePeer) -> Peer -> m ()
+modifyPeer f p = modifyOnlinePeers $ map upd
+ where
+ upd op =
+ if onlinePeerMailbox op == p
+ then f op
+ else op
+
+addPeer :: MonadManager n m => OnlinePeer -> m ()
+addPeer op = modifyOnlinePeers $ nubBy f . (op :)
+ where
+ f = (==) `on` onlinePeerMailbox
+
+removePeer :: Async () -> TVar [OnlinePeer] -> STM ()
+removePeer a t = modifyTVar t $ filter ((/= a) . onlinePeerAsync)
+
+getOnlinePeers :: MonadManager n m => m [OnlinePeer]
+getOnlinePeers = asks onlinePeers >>= readTVarIO
+
+modifyOnlinePeers :: MonadManager n m => ([OnlinePeer] -> [OnlinePeer]) -> m ()
+modifyOnlinePeers f = asks onlinePeers >>= atomically . (`modifyTVar` f)
+
+median :: Fractional a => [a] -> Maybe a
+median ls
+ | null ls = Nothing
+ | length ls `mod` 2 == 0 =
+ Just . (/ 2) . sum . take 2 $ drop (length ls `div` 2 - 1) ls
+ | otherwise = Just . head $ drop (length ls `div` 2) ls
diff --git a/src/Network/Haskoin/Node/Node.hs b/src/Network/Haskoin/Node/Node.hs
new file mode 100644
index 0000000..cb6da4b
--- /dev/null
+++ b/src/Network/Haskoin/Node/Node.hs
@@ -0,0 +1,54 @@
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE GADTs #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE TemplateHaskell #-}
+module Network.Haskoin.Node.Node
+ ( node
+ ) where
+
+import Control.Concurrent.NQE
+import Control.Monad.Logger
+import Network.Haskoin.Node.Chain
+import Network.Haskoin.Node.Common
+import Network.Haskoin.Node.Manager
+import UnliftIO
+
+node ::
+ ( MonadLoggerIO m
+ , MonadUnliftIO m
+ )
+ => NodeConfig m
+ -> m ()
+node cfg = do
+ psup <- Inbox <$> newTQueueIO
+ $(logInfoS) "Node" "Starting..."
+ supervisor
+ KillAll
+ (nodeSupervisor cfg)
+ [chain chCfg, manager (mgrCfg psup), peerSup psup]
+ where
+ peerSup psup = supervisor (Notify deadPeer) psup []
+ chCfg =
+ ChainConfig
+ { chainConfDB = database cfg
+ , chainConfListener = nodeEvents cfg . ChainEvent
+ , chainConfManager = nodeManager cfg
+ , chainConfChain = nodeChain cfg
+ , chainConfNetwork = nodeNet cfg
+ }
+ mgrCfg psup =
+ ManagerConfig
+ { mgrConfMaxPeers = maxPeers cfg
+ , mgrConfDB = database cfg
+ , mgrConfDiscover = discover cfg
+ , mgrConfMgrListener = nodeEvents cfg . ManagerEvent
+ , mgrConfPeerListener = nodeEvents cfg . PeerEvent
+ , mgrConfNetAddr = netAddress cfg
+ , mgrConfPeers = initPeers cfg
+ , mgrConfManager = nodeManager cfg
+ , mgrConfChain = nodeChain cfg
+ , mgrConfPeerSupervisor = psup
+ , mgrConfNetwork = nodeNet cfg
+ }
+ deadPeer ex = PeerStopped ex `sendSTM` nodeManager cfg
diff --git a/src/Network/Haskoin/Node/Peer.hs b/src/Network/Haskoin/Node/Peer.hs
new file mode 100644
index 0000000..db2a4f8
--- /dev/null
+++ b/src/Network/Haskoin/Node/Peer.hs
@@ -0,0 +1,339 @@
+{-# LANGUAGE ConstraintKinds #-}
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE TemplateHaskell #-}
+{-# LANGUAGE TupleSections #-}
+{-# LANGUAGE TypeFamilies #-}
+module Network.Haskoin.Node.Peer
+ ( peer
+ ) where
+
+import Control.Concurrent.NQE
+import Control.Monad
+import Control.Monad.Logger
+import Control.Monad.Reader
+import Data.Bits
+import Data.ByteString (ByteString)
+import qualified Data.ByteString as BS
+import qualified Data.ByteString.Char8 as C8
+import qualified Data.ByteString.Lazy as BL
+import Data.Conduit
+import qualified Data.Conduit.Binary as CB
+import Data.Conduit.Network
+import Data.Maybe
+import Data.Serialize
+import Data.String
+import Data.String.Conversions
+import Data.Time.Clock
+import Data.Word
+import Network.Haskoin.Block
+import Network.Haskoin.Constants
+import Network.Haskoin.Network
+import Network.Haskoin.Node.Common
+import Network.Haskoin.Transaction
+import Network.Socket (SockAddr)
+import System.Random
+import UnliftIO
+
+type MonadPeer m = (MonadUnliftIO m, MonadLoggerIO m, MonadReader PeerReader m)
+
+data Pending
+ = PendingTx !TxHash
+ | PendingBlock !BlockHash
+ | PendingMerkle !BlockHash
+ | PendingHeaders
+ deriving (Show, Eq)
+
+data PeerReader = PeerReader
+ { mySelf :: !Peer
+ , myConfig :: !PeerConfig
+ , mySockAddr :: !SockAddr
+ , myHostPort :: !(Host, Port)
+ , myPending :: !(TVar [(Pending, Word32)])
+ }
+
+time :: Int
+time = 15 * 1000 * 1000
+
+logMsg :: IsString a => Message -> a
+logMsg = fromString . cs . commandToString . msgType
+
+logPeer ::
+ (ConvertibleStrings String a, Semigroup a, IsString a) => SockAddr -> a
+logPeer sa = "Peer<" <> cs (show sa) <> ">"
+
+peer :: (MonadUnliftIO m, MonadLoggerIO m) => PeerConfig -> Peer -> m ()
+peer pc p =
+ fromSockAddr na >>= \case
+ Nothing -> do
+ $(logErrorS) (logPeer na) "Invalid network address"
+ throwIO PeerAddressInvalid
+ Just (host, port) -> do
+ let cset = clientSettings port (C8.pack host)
+ runGeneralTCPClient cset (peerSession (host, port))
+ where
+ na = naAddress (peerConfConnect pc)
+ go = handshake >> exchangePing >> peerLoop
+ net = peerConfNetwork pc
+ peerSession hp ad = do
+ let src = appSource ad .| inPeerConduit net
+ snk = outPeerConduit net .| appSink ad
+ withSource src p . const $ do
+ pbox <- newTVarIO []
+ let rd =
+ PeerReader
+ { myConfig = pc
+ , mySelf = p
+ , myHostPort = hp
+ , mySockAddr = na
+ , myPending = pbox
+ }
+ runReaderT (runConduit (go .| snk)) rd
+
+handshake :: MonadPeer m => ConduitT () Message m ()
+handshake = do
+ p <- asks mySelf
+ ch <- peerConfChain <$> asks myConfig
+ rmt <- peerConfConnect <$> asks myConfig
+ loc <- peerConfLocal <$> asks myConfig
+ net <- peerConfNetwork <$> asks myConfig
+ nonce <- peerConfNonce <$> asks myConfig
+ bb <- chainGetBest ch
+ ver <- buildVersion net nonce (nodeHeight bb) loc rmt
+ yield $ MVersion ver
+ lift (remoteVer p) >>= \case
+ v
+ | testSegWit net v -> do
+ yield MVerAck
+ lift (remoteVerAck p)
+ mgr <- peerConfManager <$> asks myConfig
+ managerSetPeerVersion p v mgr
+ | otherwise -> do
+ yield . MReject $
+ reject MCVersion RejectObsolete "No SegWit support"
+ throwIO PeerNoSegWit
+ where
+ testSegWit net v
+ | getSegWit net = services v `testBit` 3
+ | otherwise = True
+ remoteVer p = do
+ m <-
+ timeout time . receiveMatch p $ \case
+ PeerIncoming (MVersion v) -> Just v
+ _ -> Nothing
+ case m of
+ Just v -> return v
+ Nothing -> throwIO PeerTimeout
+ remoteVerAck p = do
+ m <-
+ timeout time . receiveMatch p $ \case
+ PeerIncoming MVerAck -> Just ()
+ _ -> Nothing
+ when (isNothing m) $ throwIO PeerTimeout
+
+peerLoop :: MonadPeer m => ConduitT () Message m ()
+peerLoop =
+ forever $ do
+ me <- asks mySelf
+ m <- lift $ timeout (2 * 60 * 1000 * 1000) (receive me)
+ case m of
+ Nothing -> exchangePing
+ Just msg -> processMessage msg
+
+exchangePing :: MonadPeer m => ConduitT () Message m ()
+exchangePing = do
+ lp <- logMe
+ i <- liftIO randomIO
+ yield $ MPing (Ping i)
+ me <- asks mySelf
+ mgr <- peerConfManager <$> asks myConfig
+ t1 <- liftIO getCurrentTime
+ m <-
+ lift . timeout time . receiveMatch me $ \case
+ PeerIncoming (MPong (Pong j))
+ | i == j -> Just ()
+ _ -> Nothing
+ case m of
+ Nothing -> do
+ $(logErrorS) lp "Timeout while waiting for pong"
+ throwIO PeerTimeout
+ Just () -> do
+ t2 <- liftIO getCurrentTime
+ let d = t2 `diffUTCTime` t1
+ $(logDebugS) lp $
+ "Roundtrip: " <> cs (show (d * 1000)) <> " ms"
+ ManagerPeerPing me d `send` mgr
+
+checkStale :: MonadPeer m => ConduitM () Message m ()
+checkStale = do
+ pbox <- asks myPending
+ ps <- readTVarIO pbox
+ case ps of
+ [] -> return ()
+ (_, ts):_ -> do
+ cur <- computeTime
+ when (cur > ts + 30) $ throwIO PeerTimeout
+
+registerOutgoing :: MonadPeer m => Message -> m ()
+registerOutgoing (MGetData (GetData ivs)) = do
+ pbox <- asks myPending
+ cur <- computeTime
+ ms <-
+ fmap catMaybes . forM ivs $ \iv ->
+ case toPending iv of
+ Nothing -> return Nothing
+ Just p -> return $ Just (p, cur)
+ atomically (modifyTVar pbox (++ ms))
+ where
+ toPending InvVector {invType = InvTx, invHash = hash} =
+ Just (PendingTx (TxHash hash))
+ toPending InvVector {invType = InvWitnessTx, invHash = hash} =
+ Just (PendingTx (TxHash hash))
+ toPending InvVector {invType = InvBlock, invHash = hash} =
+ Just (PendingBlock (BlockHash hash))
+ toPending InvVector {invType = InvWitnessBlock, invHash = hash} =
+ Just (PendingBlock (BlockHash hash))
+ toPending InvVector {invType = InvMerkleBlock, invHash = hash} =
+ Just (PendingMerkle (BlockHash hash))
+ toPending InvVector {invType = InvWitnessMerkleBlock, invHash = hash} =
+ Just (PendingMerkle (BlockHash hash))
+ toPending _ = Nothing
+registerOutgoing MGetHeaders {} = do
+ pbox <- asks myPending
+ cur <- computeTime
+ atomically (modifyTVar pbox (reverse . ((PendingHeaders, cur) :) . reverse))
+registerOutgoing _ = return ()
+
+registerIncoming :: MonadPeer m => Message -> m ()
+registerIncoming (MNotFound (NotFound ivs)) =
+ asks myPending >>= \pbox ->
+ atomically (modifyTVar pbox (filter (matchNotFound . fst)))
+ where
+ matchNotFound (PendingTx (TxHash hash)) =
+ InvVector InvTx hash `notElem` ivs &&
+ InvVector InvWitnessTx hash `notElem` ivs
+ matchNotFound (PendingBlock (BlockHash hash)) =
+ InvVector InvBlock hash `notElem` ivs &&
+ InvVector InvWitnessBlock hash `notElem` ivs
+ matchNotFound (PendingMerkle (BlockHash hash)) =
+ InvVector InvBlock hash `notElem` ivs &&
+ InvVector InvMerkleBlock hash `notElem` ivs &&
+ InvVector InvWitnessMerkleBlock hash `notElem` ivs
+ matchNotFound _ = False
+registerIncoming (MTx t) =
+ asks myPending >>= \pbox ->
+ atomically (modifyTVar pbox (filter ((/= PendingTx (txHash t)) . fst)))
+registerIncoming (MBlock b) =
+ asks myPending >>= \pbox ->
+ atomically $
+ modifyTVar
+ pbox
+ (filter ((/= PendingBlock (headerHash (blockHeader b))) . fst))
+registerIncoming (MMerkleBlock b) =
+ asks myPending >>= \pbox ->
+ atomically $
+ modifyTVar
+ pbox
+ (filter ((/= PendingMerkle (headerHash (merkleHeader b))) . fst))
+registerIncoming MHeaders {} =
+ asks myPending >>= \pbox ->
+ atomically $ modifyTVar pbox (filter ((/= PendingHeaders) . fst))
+registerIncoming _ = return ()
+
+processMessage :: MonadPeer m => PeerMessage -> ConduitM () Message m ()
+processMessage m = do
+ checkStale
+ case m of
+ PeerOutgoing msg -> do
+ lift (registerOutgoing msg)
+ yield msg
+ PeerIncoming msg -> do
+ lift (registerIncoming msg)
+ incoming msg
+
+logMe ::
+ ( ConvertibleStrings String a
+ , Semigroup a
+ , IsString a
+ , MonadReader PeerReader m
+ )
+ => m a
+logMe = logPeer <$> asks mySockAddr
+
+incoming :: MonadPeer m => Message -> ConduitT () Message m ()
+incoming m = do
+ lp <- lift logMe
+ p <- asks mySelf
+ l <- peerConfListener <$> asks myConfig
+ mgr <- peerConfManager <$> asks myConfig
+ ch <- peerConfChain <$> asks myConfig
+ case m of
+ MVersion _ -> do
+ $(logErrorS) lp $ "Received duplicate " <> logMsg m
+ yield $
+ MReject
+ Reject
+ { rejectMessage = MCVersion
+ , rejectCode = RejectDuplicate
+ , rejectReason = VarString BS.empty
+ , rejectData = BS.empty
+ }
+ MPing (Ping n) -> yield $ MPong (Pong n)
+ MPong (Pong n) -> atomically (l (p, GotPong n))
+ MSendHeaders {} -> ChainSendHeaders p `send` ch
+ MAlert {} -> $(logWarnS) lp $ "Deprecated " <> logMsg m
+ MAddr (Addr as) -> managerNewPeers p as mgr
+ MInv (Inv is) -> do
+ let ts = [TxHash (invHash i) | i <- is, invType i == InvTx]
+ bs =
+ [ BlockHash (invHash i)
+ | i <- is
+ , invType i == InvBlock || invType i == InvMerkleBlock
+ ]
+ unless (null ts) $ atomically $ l (p, TxAvail ts)
+ unless (null bs) $ ChainNewBlocks p bs `send` ch
+ MTx tx -> atomically (l (p, GotTx tx))
+ MBlock b -> atomically (l (p, GotBlock b))
+ MMerkleBlock b -> atomically (l (p, GotMerkleBlock b))
+ MHeaders (Headers hcs) -> ChainNewHeaders p hcs `send` ch
+ MGetData (GetData d) -> atomically (l (p, SendData d))
+ MNotFound (NotFound ns) -> do
+ let f (InvVector InvTx hash) = Just (TxNotFound (TxHash hash))
+ f (InvVector InvWitnessTx hash) =
+ Just (TxNotFound (TxHash hash))
+ f (InvVector InvBlock hash) =
+ Just (BlockNotFound (BlockHash hash))
+ f (InvVector InvWitnessBlock hash) =
+ Just (BlockNotFound (BlockHash hash))
+ f (InvVector InvMerkleBlock hash) =
+ Just (BlockNotFound (BlockHash hash))
+ f (InvVector InvWitnessMerkleBlock hash) =
+ Just (BlockNotFound (BlockHash hash))
+ f _ = Nothing
+ events = mapMaybe f ns
+ atomically (mapM_ (l . (p, )) events)
+ MGetBlocks g -> atomically (l (p, SendBlocks g))
+ MGetHeaders h -> atomically (l (p, SendHeaders h))
+ MReject r -> atomically (l (p, Rejected r))
+ MMempool -> atomically (l (p, WantMempool))
+ MGetAddr -> managerGetAddr p mgr
+ _ -> $(logWarnS) lp $ "Ignoring message: " <> logMsg m
+
+inPeerConduit :: MonadIO m => Network -> ConduitT ByteString PeerMessage m ()
+inPeerConduit net = do
+ headerBytes <- CB.take 24
+ when (BL.null headerBytes) $ throwIO MessageHeaderEmpty
+ case decodeLazy headerBytes of
+ Left e -> throwIO $ DecodeMessageError e
+ Right (MessageHeader _ _cmd len _) -> do
+ when (len > 32 * 2 ^ (20 :: Int)) . throwIO $ PayloadTooLarge len
+ payloadBytes <- CB.take (fromIntegral len)
+ case runGetLazy (getMessage net) $ headerBytes `BL.append` payloadBytes of
+ Left e -> throwIO $ CannotDecodePayload e
+ Right msg -> yield $ PeerIncoming msg
+ inPeerConduit net
+
+outPeerConduit :: Monad m => Network -> ConduitT Message ByteString m ()
+outPeerConduit net = awaitForever $ yield . runPut . putMessage net
diff --git a/test/Spec.hs b/test/Spec.hs
new file mode 100644
index 0000000..ab94c4b
--- /dev/null
+++ b/test/Spec.hs
@@ -0,0 +1,233 @@
+{-# LANGUAGE FlexibleContexts #-}
+{-# LANGUAGE LambdaCase #-}
+{-# LANGUAGE MultiParamTypeClasses #-}
+{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE RecordWildCards #-}
+import Control.Concurrent.NQE
+import Control.Monad
+import Control.Monad.Logger
+import Control.Monad.Trans
+import qualified Data.ByteString as BS
+import Data.Either
+import Data.Maybe
+import Data.Serialize
+import qualified Database.RocksDB as RocksDB
+import Network.Haskoin.Address
+import Network.Haskoin.Block
+import Network.Haskoin.Constants
+import Network.Haskoin.Keys
+import Network.Haskoin.Network
+import Network.Haskoin.Node
+import Network.Haskoin.Transaction
+import Network.Socket (SockAddr (..))
+import System.Random
+import Test.Hspec
+import UnliftIO
+
+data TestNode = TestNode
+ { testMgr :: Manager
+ , testChain :: Chain
+ , testEvents :: Inbox NodeEvent
+ }
+
+main :: IO ()
+main = do
+ let net = btcTest
+ hspec . describe "peer-to-peer client" $ do
+ it "connects to a peer" $
+ withTestNode net "connect-one-peer" $ \TestNode {..} -> do
+ p <-
+ receiveMatch testEvents $ \case
+ ManagerEvent (ManagerConnect p) -> Just p
+ _ -> Nothing
+ v <-
+ fromMaybe (error "No version") <$>
+ managerGetPeerVersion p testMgr
+ v `shouldSatisfy` (>= 70002)
+ bb <-
+ fromMaybe (error "No best block") <$>
+ managerGetPeerBest p testMgr
+ bb `shouldBe` genesisNode net
+ it "downloads some blocks" $
+ withTestNode net "get-blocks" $ \TestNode {..} -> do
+ let hs = [h1, h2]
+ h1 =
+ "000000000babf10e26f6cba54d9c282983f1d1ce7061f7e875b58f8ca47db932"
+ h2 =
+ "00000000851f278a8b2c466717184aae859af5b83c6f850666afbc349cf61577"
+ p <-
+ receiveMatch testEvents $ \case
+ ManagerEvent (ManagerConnect p) -> Just p
+ _ -> Nothing
+ peerGetBlocks net p hs
+ b1 <-
+ receiveMatch testEvents $ \case
+ PeerEvent (p', GotBlock b)
+ | p == p' -> Just b
+ _ -> Nothing
+ b2 <-
+ receiveMatch testEvents $ \case
+ PeerEvent (p', GotBlock b)
+ | p == p' -> Just b
+ _ -> Nothing
+ headerHash (blockHeader b1) `shouldSatisfy` (`elem` hs)
+ headerHash (blockHeader b2) `shouldSatisfy` (`elem` hs)
+ let testMerkle b =
+ merkleRoot (blockHeader b) `shouldBe`
+ buildMerkleRoot (map txHash (blockTxns b))
+ testMerkle b1
+ testMerkle b2
+ it "downloads some merkle blocks" $
+ withTestNode net "get-merkle-blocks" $ \TestNode {..} -> do
+ let a =
+ fromJust $
+ stringToAddr net "mgpS4Zis8iwNhriKMro1QSGDAbY6pqzRtA"
+ k :: PubKey
+ k =
+ "02c3cface1777c70251cb206f7c80cabeae195dfbeeff0767cbd2a58d22be383da"
+ h1 =
+ "000000006cf9d53d65522002a01d8c7091c78d644106832bc3da0b7644f94d36"
+ h2 =
+ "000000000babf10e26f6cba54d9c282983f1d1ce7061f7e875b58f8ca47db932"
+ bhs = [h1, h2]
+ n <- randomIO
+ let f0 = bloomCreate 2 0.001 n BloomUpdateAll
+ f1 = bloomInsert f0 $ exportPubKey True k
+ f2 = bloomInsert f1 $ encode $ getAddrHash160 a
+ f2 `setManagerFilter` testMgr
+ p <-
+ receiveMatch testEvents $ \case
+ ManagerEvent (ManagerConnect p) -> Just p
+ _ -> Nothing
+ getMerkleBlocks p bhs
+ b1 <-
+ receiveMatch testEvents $ \case
+ PeerEvent (p', GotMerkleBlock b)
+ | p == p' -> Just b
+ _ -> Nothing
+ b2 <-
+ receiveMatch testEvents $ \case
+ PeerEvent (p', GotMerkleBlock b)
+ | p == p' -> Just b
+ _ -> Nothing
+ liftIO $ do
+ a `shouldBe` pubKeyAddr net (wrapPubKey True k)
+ b1 `shouldSatisfy` testMerkleRoot net
+ b2 `shouldSatisfy` testMerkleRoot net
+ it "connects to multiple peers" $
+ withTestNode net "connect-peers" $ \TestNode {..} -> do
+ replicateM_ 3 $ do
+ pc <- receive testEvents
+ case pc of
+ ManagerEvent (ManagerDisconnect _) ->
+ expectationFailure "Received peer disconnection"
+ _ -> return ()
+ ps <- managerGetPeers testMgr
+ length ps `shouldSatisfy` (>= 2)
+ it "connects and syncs some headers" $
+ withTestNode net "connect-sync" $ \TestNode {..} -> do
+ let h =
+ "000000009ec921df4bb16aedd11567e27ede3c0b63835b257475d64a059f102b"
+ hs =
+ [ "0000000005bdbddb59a3cd33b69db94fa67669c41d9d32751512b5d7b68c71cf"
+ , "00000000185b36fa6e406626a722793bea80531515e0b2a99ff05b73738901f1"
+ , "000000001ab69b12b73ccdf46c9fbb4489e144b54f1565e42e481c8405077bdd"
+ ]
+ bns <-
+ replicateM 3 . receiveMatch testEvents $ \case
+ ChainEvent (ChainNewBest bn) -> Just bn
+ _ -> Nothing
+ bb <- chainGetBest testChain
+ an <-
+ fromMaybe (error "No ancestor found") <$>
+ chainGetAncestor 2357 (last bns) testChain
+ map (headerHash . nodeHeader) bns `shouldBe` hs
+ nodeHeight bb `shouldSatisfy` (>= 6000)
+ headerHash (nodeHeader an) `shouldBe` h
+ it "downloads a single block" $
+ withTestNode net "download-block" $ \TestNode {..} -> do
+ let h =
+ "000000009ec921df4bb16aedd11567e27ede3c0b63835b257475d64a059f102b"
+ p <-
+ receiveMatch testEvents $ \case
+ ManagerEvent (ManagerConnect p) -> Just p
+ _ -> Nothing
+ peerGetBlocks net p [h]
+ b <-
+ receiveMatch testEvents $ \case
+ PeerEvent (p', GotBlock b)
+ | p == p' -> Just b
+ _ -> Nothing
+ headerHash (blockHeader b) `shouldBe` h
+ it "attempts to get inexistent things" $
+ withTestNode net "download-fail" $ \TestNode {..} -> do
+ let h =
+ TxHash .
+ fromRight (error "We will, we will rock you!") . decode $
+ BS.replicate 32 0xaa
+ p <-
+ receiveMatch testEvents $ \case
+ ManagerEvent (ManagerConnect p) -> Just p
+ _ -> Nothing
+ peerGetTxs net p [h]
+ n <-
+ receiveMatch testEvents $ \case
+ PeerEvent (p', TxNotFound n)
+ | p == p' -> Just n
+ _ -> Nothing
+ n `shouldBe` h
+ it "downloads some block parents" $
+ withTestNode net "parents" $ \TestNode {..} -> do
+ let hs =
+ [ "00000000c74a24e1b1f2c04923c514ed88fc785cf68f52ed0ccffd3c6fe3fbd9"
+ , "000000007e5c5f40e495186ac4122f2e4ee25788cc36984a5760c55ecb376cb1"
+ , "00000000a6299059b2bff3479bc569019792e75f3c0f39b10a0bc85eac1b1615"
+ ]
+ bn <-
+ receiveMatch testEvents $ \case
+ ChainEvent (ChainNewBest bn) -> Just bn
+ _ -> Nothing
+ nodeHeight bn `shouldBe` 2000
+ ps <- chainGetParents 1997 bn testChain
+ length ps `shouldBe` 3
+ forM_ (zip ps hs) $ \(p, h) ->
+ headerHash (nodeHeader p) `shouldBe` h
+
+withTestNode ::
+ (MonadUnliftIO m)
+ => Network
+ -> String
+ -> (TestNode -> m ())
+ -> m ()
+withTestNode net t f =
+ runNoLoggingT . withSystemTempDirectory ("haskoin-node-test-" <> t <> "-") $ \w -> do
+ events <- Inbox <$> liftIO newTQueueIO
+ ch <- Inbox <$> liftIO newTQueueIO
+ ns <- Inbox <$> liftIO newTQueueIO
+ mgr <- Inbox <$> liftIO newTQueueIO
+ db <-
+ RocksDB.open
+ w
+ RocksDB.defaultOptions
+ { RocksDB.createIfMissing = True
+ , RocksDB.compression = RocksDB.SnappyCompression
+ }
+ let cfg =
+ NodeConfig
+ { maxPeers = 20
+ , database = db
+ , initPeers = []
+ , discover = True
+ , nodeEvents = (`sendSTM` events)
+ , netAddress = NetworkAddress 0 (SockAddrInet 0 0)
+ , nodeSupervisor = ns
+ , nodeChain = ch
+ , nodeManager = mgr
+ , nodeNet = net
+ }
+ withAsync (node cfg) $ \nd -> do
+ link nd
+ lift $
+ f TestNode {testMgr = mgr, testChain = ch, testEvents = events}
+ stopSupervisor ns
+ wait nd
diff --git a/tests/Main.hs b/tests/Main.hs
deleted file mode 100644
index 656fa5e..0000000
--- a/tests/Main.hs
+++ /dev/null
@@ -1,13 +0,0 @@
-module Main where
-
-import Test.Framework (defaultMain)
-
-import qualified Network.Haskoin.Node.Tests (tests)
-import qualified Network.Haskoin.Node.Units (tests)
-
-main :: IO ()
-main = defaultMain
- ( Network.Haskoin.Node.Tests.tests
- ++ Network.Haskoin.Node.Units.tests
- )
-
diff --git a/tests/Network/Haskoin/Node/Tests.hs b/tests/Network/Haskoin/Node/Tests.hs
deleted file mode 100644
index 2173de4..0000000
--- a/tests/Network/Haskoin/Node/Tests.hs
+++ /dev/null
@@ -1,12 +0,0 @@
-module Network.Haskoin.Node.Tests (tests) where
-
-import Test.Framework (Test, testGroup)
--- import Test.Framework.Providers.QuickCheck2 (testProperty)
-
-tests :: [Test]
-tests =
- [ testGroup "Serialize & de-serialize haskoin node types to JSON"
- [
- ]
- ]
-
diff --git a/tests/Network/Haskoin/Node/Units.hs b/tests/Network/Haskoin/Node/Units.hs
deleted file mode 100644
index 9d28dff..0000000
--- a/tests/Network/Haskoin/Node/Units.hs
+++ /dev/null
@@ -1,252 +0,0 @@
-{-# LANGUAGE OverloadedStrings #-}
-module Network.Haskoin.Node.Units where
-import Control.Monad (forM_, when)
-import Control.Monad.Logger (NoLoggingT)
-import Control.Monad.Trans (MonadIO, liftIO)
-import Control.Monad.Trans.Resource (ResourceT)
-import Data.Maybe (fromJust, isNothing,
- maybeToList)
-import Data.Word (Word32)
-import Database.Persist.Sqlite (SqlPersistT,
- runMigrationSilent, runSqlite)
-import Network.Haskoin.Block
-import Network.Haskoin.Constants
-import Network.Haskoin.Node.HeaderTree
-import Test.Framework (Test, testGroup)
-import Test.Framework.Providers.HUnit (testCase)
-import Test.HUnit (Assertion, assertBool,
- assertEqual, assertFailure)
-
--- TODO: Make sure that evalNewChain for a partially overlapping best chain
--- properly evaluates to BestChain.
-
-type App = SqlPersistT (NoLoggingT (ResourceT IO))
-
-tests :: [Test]
-tests =
- [ testGroup "Header Tree"
- [ testCase "Initalization successful" $ runUnit initialize
- , testCase "Add second block" $ runUnit addSecondBlock
- , testCase "Blockchain head correct" $ runUnit blockChainHead
- , testCase "Find fork node" $ runUnit forkNode
- , testCase "Find fork node (non-head)" $ runUnit forkNodeNonHead
- , testCase "Find fork node (same chain)" $ runUnit forkNodeSameChain
- , testCase "Get best chain" $ runUnit getBestChain
- , testCase "Get side chain" $ runUnit getSideChain
- , testCase "Nodes at height" $ runUnit getNodesHeight
- , testCase "Block locator to head" $ runUnit blockLocatorToHead
- , testCase "Block locator to non-head" $ runUnit blockLocatorToNode
- , testCase "Find split node" $ runUnit splitNode
- ]
- ]
-
-initialize :: App ()
-initialize = do
- initHeaderTree
- bM <- getBlockByHash (headerHash genesisHeader)
- liftIO $ assertEqual "Genesis node in header tree" (Just genesisBlock) bM
- hs <- getHeads
- liftIO $ assertEqual "Genesis node is only head" [genesisBlock] hs
- bh <- getBestBlock
- liftIO $ assertEqual "Genesis node matches best header" genesisBlock bh
-
-addSecondBlock :: App ()
-addSecondBlock = do
- initHeaderTree
- let block = head chain0
- liftIO $ assertEqual "Block builds on genesis block"
- (headerHash genesisHeader)
- (nodePrev block)
- putBlock block
- block' <- getBlockByHash $ nodeHash block
- liftIO $ assertEqual "Block can be retrieved" (Just block) block'
-
-blockChainHead :: App ()
-blockChainHead = mockBlockChain >> do
- heads <- getHeads
- liftIO $ assertEqual "Heads match"
- [last chain0, last chain1, last chain2, last chain3]
- heads
- bh <- getBestBlock
- liftIO $ assertEqual "Best block has correct hash"
- (nodeHash $ last chain3) (nodeHash bh)
- liftIO $ assertEqual "Best block height is right"
- (nodeBlockHeight $ last chain3) (nodeBlockHeight bh)
-
-forkNode :: App ()
-forkNode = mockBlockChain >> do
- let l = last chain2
- r = last chain3
- bn <- splitBlock l r
-
- liftIO $ assertEqual "Split block are correct"
- (chain0 !! 1) bn
-
- commonLM <- getBlockByHeight l $ nodeBlockHeight bn
- when (isNothing commonLM) $ liftIO $
- assertFailure "Could not find fork on left side"
- let commonL = fromJust commonLM
-
- commonRM <- getBlockByHeight r $ nodeBlockHeight bn
- when (isNothing commonRM) $ liftIO $
- assertFailure "Could not find fork on right side"
- let commonR = fromJust commonRM
-
- firstLM <- getBlockByHeight l (nodeBlockHeight bn + 1)
- when (isNothing firstLM) $ liftIO $
- assertFailure "Could not find fork child on left side"
- let firstL = fromJust firstLM
-
- firstRM <- getBlockByHeight r (nodeBlockHeight bn + 1)
- when (isNothing firstLM) $ liftIO $
- assertFailure "Could not find fork child on right side"
- let firstR = fromJust firstRM
-
- liftIO $ assertEqual "Fork node is same in both sides" commonL commonR
- liftIO $ assertEqual "Fork node connect with left side"
- (nodeHash commonL)
- (nodePrev firstL)
- liftIO $ assertEqual "Fork node connect with right side"
- (nodeHash commonR)
- (nodePrev firstR)
- liftIO $ assertBool "After-fork chains diverge" $ firstL /= firstR
- liftIO $ assertEqual "Fork node matches hardcoded one"
- (chain0 !! 1) commonL
-
-forkNodeNonHead :: App ()
-forkNodeNonHead = mockBlockChain >> do
- let l = chain2 !! 1
- r = chain1 !! 1
- height <- nodeBlockHeight <$> splitBlock l r
- splitM <- getBlockByHeight l height
- liftIO $ assertEqual "Fork node is correct" (Just $ chain1 !! 1) splitM
-
-forkNodeSameChain :: App ()
-forkNodeSameChain = mockBlockChain >> do
- let l = chain3 !! 5
- r = chain3 !! 3
- height <- nodeBlockHeight <$> splitBlock l r
- splitM <- getBlockByHeight r height
- liftIO $ assertEqual "Fork node is correct" (Just $ chain3 !! 3) splitM
-
-getBestChain :: App ()
-getBestChain = mockBlockChain >> do
- h <- getBestBlock
- ch <- getBlocksFromHeight h 0 0
- liftIO $ assertEqual "Best chain correct" bch ch
- where
- bch = genesisBlock : take 2 chain0 ++ chain3
-
-getSideChain :: App ()
-getSideChain = mockBlockChain >> do
- ch <- getBlocksFromHeight (chain2 !! 1) 0 0
- liftIO $ assertEqual "Side chain correct" sch ch
- where
- sch = genesisBlock :
- take 3 chain0 ++ take 2 chain1 ++ take 2 chain2
-
-getNodesHeight :: App ()
-getNodesHeight = mockBlockChain >> do
- ns <- getBlocksAtHeight 3
- liftIO $ assertEqual "Nodes at height match" hns ns
- where
- hns = [chain0 !! 2, head chain3]
-
-blockLocatorToHead :: App ()
-blockLocatorToHead = do
- mockBlockChain
- putBlocks bs
- h <- getBestBlock
- liftIO $ assertEqual "Head matches" (last bs) h
- ls <- blockLocator h
- liftIO $ assertEqual "Last is genesis"
- (last ls)
- (headerHash genesisHeader)
- liftIO $ assertEqual "First is current head"
- (head ls)
- (nodeHash h)
- last10 <- map nodeHash . reverse <$>
- getBlocksFromHeight h 0 (nodeBlockHeight h - 9)
- liftIO $ assertEqual "Last ten blocks contiguous"
- last10
- (take 10 ls)
- let h10 = nodeBlockHeight h - 10
- bhs <- map (nodeHash . fromJust) <$>
- mapM (getBlockByHeight h)
- [h10, h10 - 2, h10 - 6, h10 - 14, h10 - 30, h10 - 62]
- liftIO $ assertEqual "All block hashes correct"
- (last10 ++ bhs ++ [headerHash genesisHeader])
- ls
- where
- bs = manyBlocks $ last chain1
-
-blockLocatorToNode :: App ()
-blockLocatorToNode = do
- mockBlockChain
- putBlocks bs
- n <- fromJust <$> getBlockByHash (nodeHash $ chain3 !! 4)
- ls <- blockLocator n
- xs <- map nodeHash . reverse <$>
- getBlocksFromHeight n 0 0
- liftIO $ assertEqual "Block locator for non-head node is correct" xs ls
- where
- bs = manyBlocks $ last chain1
-
-splitNode :: App ()
-splitNode = do
- mockBlockChain
- (split, ls, rs) <- splitChains (last chain2, 0) (last chain3, 0)
- liftIO $ assertEqual "Split node correct" (chain0 !! 1) split
- liftIO $ assertEqual "Left correct"
- ([chain0 !! 2] ++ take 2 chain1 ++ chain2)
- ls
- liftIO $ assertEqual "Right correct" chain3 rs
-
-runUnit :: App () -> Assertion
-runUnit action = runSqlite ":memory:" $ do
- _ <- runMigrationSilent migrateHeaderTree
- action
-
-mockBlockChain :: MonadIO m => SqlPersistT m ()
-mockBlockChain = do
- initHeaderTree
- forM_ (concat [chain0, chain1, chain2, chain3]) putBlock
-
-manyBlocks :: NodeBlock -> [NodeBlock]
-manyBlocks b =
- tail $ reverse $ foldBlock (Just b) $ zip [18..117] (repeat 4)
-
-
-chain0 :: [NodeBlock]
-chain0 =
- tail $ reverse $ foldBlock Nothing $ zip [1..4] (repeat 0)
-
-chain1 :: [NodeBlock]
-chain1 =
- tail $ reverse $ foldBlock (Just $ chain0 !! 2) $ zip [5..7] (repeat 1)
-
-chain2 :: [NodeBlock]
-chain2 =
- tail $ reverse $ foldBlock (Just $ chain1 !! 1) $ zip [8..10] (repeat 2)
-
-chain3 :: [NodeBlock]
-chain3 =
- tail $ reverse $ foldBlock (Just $ chain0 !! 1) $ zip [11..17] (repeat 3)
-
-foldBlock :: Maybe NodeBlock -> [(Word32, Word32)] -> [NodeBlock]
-foldBlock nM =
- foldl f (maybeToList nM)
- where
- f [] _ = [genesisBlock]
- f ls@(l:_) (n, chain) = mockBlock l chain n : ls
-
-mockBlock :: NodeBlock -> Word32 -> Word32 -> NodeBlock
-mockBlock parent chain n = nodeBlock parent chain bh
- where
- bh = createBlockHeader
- (blockVersion $ nodeHeader parent)
- (nodeHash parent)
- "0000000000000000000000000000000000000000000000000000000000000000"
- (nodeTimestamp parent + 600)
- (blockBits $ nodeHeader parent)
- n