Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ encry {
networkChunkSize = 1000
localOnly = false
# List of peers we will connecting to
knownPeers = ["172.16.11.11:9001", "172.16.11.12:9001", "172.16.11.13:9001", "172.16.11.14:9001",
"172.16.11.15:9001", "172.16.11.16:9001", "172.16.11.17:9001", "172.16.11.18:9001",
"172.16.11.19:9001", "172.16.11.20:9001"]
knownPeers = []
#["172.16.11.11:9001", "172.16.11.12:9001", "172.16.11.13:9001", "172.16.11.14:9001",
# "172.16.11.15:9001", "172.16.11.16:9001", "172.16.11.17:9001", "172.16.11.18:9001",
# "172.16.11.19:9001", "172.16.11.20:9001"]
# Maximum number of connected peers
maxConnections = 20
# Time, after which connection will be closed
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/encry/network/DeliveryManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class DeliveryManager(influxRef: Option[ActorRef],
s"${previousModifier.map(Algos.encode)}")
requestDownload(modifierTypeId, Seq(modifiersId), history, isBlockChainSynced, isMining)

case PeersForSyncInfo(peers) => sendSync(history.syncInfo, peers)
case PeersForSyncInfo(peers) => sendSync(history.getLastSyncInfo, peers)

case FullBlockChainIsSynced => context.become(basicMessageHandler(history, isBlockChainSynced = true, isMining, checkModScheduler))

Expand Down
49 changes: 24 additions & 25 deletions src/main/scala/encry/view/ModifiersCache.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package encry.view

import com.typesafe.scalalogging.StrictLogging
import encry.view.history.History
import encry.view.history.HistoryModifiersValidator
import encry.view.history.ValidationError.{FatalValidationError, NonFatalValidationError}
import org.encryfoundation.common.modifiers.PersistentModifier
import org.encryfoundation.common.modifiers.history.Header
Expand All @@ -11,7 +11,6 @@ import scala.annotation.tailrec
import scala.collection.immutable.SortedMap
import scala.collection.concurrent.TrieMap
import scala.collection.mutable
import encry.EncryApp.settings

object ModifiersCache extends StrictLogging {

Expand All @@ -30,7 +29,7 @@ object ModifiersCache extends StrictLogging {

def contains(key: Key): Boolean = cache.contains(key)

def put(key: Key, value: PersistentModifier, history: History): Unit = if (!contains(key)) {
def put(key: Key, value: PersistentModifier, validator: HistoryModifiersValidator): Unit = if (!contains(key)) {
logger.debug(s"Put ${value.encodedId} of type ${value.modifierTypeId} to cache.")
cache.put(key, value)
value match {
Expand All @@ -43,8 +42,8 @@ object ModifiersCache extends StrictLogging {
case _ =>
}

if (size > history.settings.node.modifiersCacheSize) cache.find { case (_, modifier) =>
history.testApplicable(modifier) match {
if (size > validator.settings.node.modifiersCacheSize) cache.find { case (_, modifier) =>
validator.testApplicable(modifier) match {
case Right(_) | Left(_: NonFatalValidationError) => false
case _ => true
}
Expand All @@ -56,15 +55,15 @@ object ModifiersCache extends StrictLogging {
cache.remove(key)
}

def popCandidate(history: History): List[PersistentModifier] = synchronized {
findCandidateKey(history).flatMap(k => remove(k))
def popCandidate(validator: HistoryModifiersValidator): List[PersistentModifier] = synchronized {
findCandidateKey(validator).flatMap(k => remove(k))
}

override def toString: String = cache.keys.map(key => Algos.encode(key.toArray)).mkString(",")

def findCandidateKey(history: History): List[Key] = {
def findCandidateKey(validator: HistoryModifiersValidator): List[Key] = {

def isApplicable(key: Key): Boolean = cache.get(key).exists(modifier => history.testApplicable(modifier) match {
def isApplicable(key: Key): Boolean = cache.get(key).exists(modifier => validator.testApplicable(modifier) match {
case Left(_: FatalValidationError) => remove(key); false
case Right(_) => true
case Left(_) => false
Expand All @@ -82,60 +81,60 @@ object ModifiersCache extends StrictLogging {
}

def findApplicablePayloadAtHeight(height: Int): List[Key] = {
history.headerIdsAtHeight(height).view.flatMap(history.getHeaderById).collect {
validator.headerIdsAtHeight(height).view.flatMap(validator.getHeaderById).collect {
case header: Header if isApplicable(new mutable.WrappedArray.ofByte(header.payloadId)) =>
new mutable.WrappedArray.ofByte(header.payloadId)
}
}.toList

def exhaustiveSearch: List[Key] = List(cache.find { case (k, v) =>
v match {
case _: Header if history.getBestHeaderId.exists(headerId => headerId sameElements v.parentId) => true
case _: Header if validator.getBestHeaderId.exists(headerId => headerId sameElements v.parentId) => true
case _ =>
val isApplicableMod: Boolean = isApplicable(k)
isApplicableMod
}
}).collect { case Some(v) => v._1 }

@tailrec
def applicableBestPayloadChain(atHeight: Int = history.getBestBlockHeight, prevKeys: List[Key] = List.empty[Key]): List[Key] = {
def applicableBestPayloadChain(atHeight: Int = validator.getBestBlockHeight, prevKeys: List[Key] = List.empty[Key]): List[Key] = {
val payloads = findApplicablePayloadAtHeight(atHeight)
if (payloads.nonEmpty) applicableBestPayloadChain(atHeight + 1, prevKeys ++ payloads)
else prevKeys
}

val bestHeadersIds: List[Key] = {
headersCollection.get(history.getBestHeaderHeight + 1) match {
headersCollection.get(validator.getBestHeaderHeight + 1) match {
case Some(value) =>
headersCollection = headersCollection - (history.getBestHeaderHeight + 1)
headersCollection = headersCollection - (validator.getBestHeaderHeight + 1)
logger.debug(s"HeadersCollection size is: ${headersCollection.size}")
logger.debug(s"Drop height ${history.getBestHeaderHeight + 1} in HeadersCollection")
logger.debug(s"Drop height ${validator.getBestHeaderHeight + 1} in HeadersCollection")
val res = value.map(cache.get(_)).collect {
case Some(v: Header)
if ((v.parentId sameElements history.getBestHeaderId.getOrElse(Array.emptyByteArray)) ||
(history.getBestHeaderHeight == history.settings.constants.PreGenesisHeight &&
if ((v.parentId sameElements validator.getBestHeaderId.getOrElse(Array.emptyByteArray)) ||
(validator.getBestHeaderHeight == validator.settings.constants.PreGenesisHeight &&
(v.parentId sameElements Header.GenesisParentId)
) || history.getHeaderById(v.parentId).nonEmpty) && isApplicable(new mutable.WrappedArray.ofByte(v.id)) =>
) || validator.getHeaderById(v.parentId).nonEmpty) && isApplicable(new mutable.WrappedArray.ofByte(v.id)) =>
logger.debug(s"Find new bestHeader in cache: ${Algos.encode(v.id)}")
new mutable.WrappedArray.ofByte(v.id)
}
value.map(id => new mutable.WrappedArray.ofByte(id)).filterNot(res.contains).foreach(cache.remove)
res
case None =>
logger.debug(s"${history.getBestHeader}")
logger.debug(s"No header in cache at height ${history.getBestHeaderHeight + 1}. " +
s"Trying to find in range [${history.getBestHeaderHeight - history.settings.constants.MaxRollbackDepth}, ${history.getBestHeaderHeight}]")
(history.getBestHeaderHeight - history.settings.constants.MaxRollbackDepth to history.getBestHeaderHeight).flatMap(height =>
logger.debug(s"${validator.getBestHeader}")
logger.debug(s"No header in cache at height ${validator.getBestHeaderHeight + 1}. " +
s"Trying to find in range [${validator.getBestHeaderHeight - validator.settings.constants.MaxRollbackDepth}, ${validator.getBestHeaderHeight}]")
(validator.getBestHeaderHeight - validator.settings.constants.MaxRollbackDepth to validator.getBestHeaderHeight).flatMap(height =>
getHeadersKeysAtHeight(height)
).toList
}
}
if (bestHeadersIds.nonEmpty) bestHeadersIds
else history.headerIdsAtHeight(history.getBestBlockHeight + 1).headOption match {
case Some(id) => history.getHeaderById(id) match {
else validator.headerIdsAtHeight(validator.getBestBlockHeight + 1).headOption match {
case Some(id) => validator.getHeaderById(id) match {
case Some(header: Header) if isApplicable(new mutable.WrappedArray.ofByte(header.payloadId)) =>
List(new mutable.WrappedArray.ofByte(header.payloadId))
case _ if history.isFullChainSynced => exhaustiveSearch
case _ if validator.isFullChainSynced => exhaustiveSearch
case _ => List.empty[Key]
}
case None if isChainSynced =>
Expand Down
30 changes: 17 additions & 13 deletions src/main/scala/encry/view/NodeViewHolder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import encry.view.NodeViewHolder._
import encry.view.fast.sync.SnapshotHolder.SnapshotManifest.ManifestId
import encry.view.fast.sync.SnapshotHolder._
import encry.view.history.storage.HistoryStorage
import encry.view.history.{History, HistoryHeadersProcessor, HistoryPayloadsProcessor}
import encry.view.history._
import encry.view.mempool.MemoryPool.RolledBackTransactions
import encry.view.state.UtxoState
import encry.view.state.avlTree.AvlTree
Expand Down Expand Up @@ -52,6 +52,11 @@ class NodeViewHolder(memoryPoolRef: ActorRef,
case class NodeView(history: History, state: UtxoState, wallet: EncryWallet)

var nodeView: NodeView = restoreState().getOrElse(genesisState)
val validator: HistoryModifiersValidator = new HistoryModifiersValidator(
nodeView.history.historyStorage,
encrySettings,
new NetworkTimeProvider(encrySettings.ntp)
)
context.system.actorSelection("/user/nodeViewSynchronizer") ! ChangedHistory(nodeView.history)

dataHolder ! UpdatedHistory(nodeView.history)
Expand Down Expand Up @@ -96,15 +101,14 @@ class NodeViewHolder(memoryPoolRef: ActorRef,
FileUtils.deleteDirectory(new File(s"${encrySettings.directory}/keysTmp"))
FileUtils.deleteDirectory(new File(s"${encrySettings.directory}/walletTmp"))
logger.info(s"Updated best block in fast sync mod. Updated state height.")
val newHistory = new History with HistoryHeadersProcessor with HistoryPayloadsProcessor {
val newHistory = new History with HistoryAPI with HistoryPayloadsNormalSyncProcessorComponent
with HistoryHeadersDefaultProcessorComponent {
override val settings: EncryAppSettings = encrySettings
override var isFullChainSynced: Boolean = settings.node.offlineGeneration
override val timeProvider: NetworkTimeProvider = EncryApp.timeProvider
override val historyStorage: HistoryStorage = nodeView.history.historyStorage
override protected[view] var isFullChainSyncedVariable: Boolean = true
override protected[view] var fastSyncInProgressVariable: Boolean = false
}
newHistory.fastSyncInProgress.fastSyncVal = false
newHistory.blockDownloadProcessor.updateMinimalBlockHeightVar(nodeView.history.blockDownloadProcessor.minimalBlockHeight)
newHistory.isHeadersChainSyncedVar = true
updateNodeView(
updatedHistory = Some(newHistory),
updatedState = Some(state),
Expand All @@ -119,7 +123,7 @@ class NodeViewHolder(memoryPoolRef: ActorRef,
if (isInHistory || isInCache)
logger.info(s"Received modifier of type: ${mod.modifierTypeId} ${Algos.encode(mod.id)} " +
s"can't be placed into cache cause of: inCache: ${!isInCache}.")
else ModifiersCache.put(key(mod.id), mod, nodeView.history)
else ModifiersCache.put(key(mod.id), mod, validator)
computeApplications()

case lm: LocallyGeneratedModifier =>
Expand Down Expand Up @@ -166,7 +170,7 @@ class NodeViewHolder(memoryPoolRef: ActorRef,

//todo refactor loop
def computeApplications(): Unit = {
val mods = ModifiersCache.popCandidate(nodeView.history)
val mods = ModifiersCache.popCandidate(validator)
if (mods.nonEmpty) {
logger.info(s"mods: ${mods.map(mod => Algos.encode(mod.id))}")
mods.foreach(mod => pmodModify(mod))
Expand Down Expand Up @@ -244,7 +248,7 @@ class NodeViewHolder(memoryPoolRef: ActorRef,
case header: Header =>
val requiredHeight: Int = header.height - encrySettings.constants.MaxRollbackDepth
if (requiredHeight % encrySettings.constants.SnapshotCreationHeight == 0) {
newHis.lastAvailableManifestHeight = requiredHeight
newHis.lastAvailableManifestHeightVariable = requiredHeight
logger.info(s"heightOfLastAvailablePayloadForRequest -> ${newHis.lastAvailableManifestHeight}")
}
case _ =>
Expand Down Expand Up @@ -275,7 +279,7 @@ class NodeViewHolder(memoryPoolRef: ActorRef,
context.system.eventStream.publish(SemanticallySuccessfulModifier(modToApply))
if (newHis.getBestHeaderId.exists(bestHeaderId =>
newHis.getBestBlockId.exists(bId => ByteArrayWrapper(bId) == ByteArrayWrapper(bestHeaderId))
)) newHis.isFullChainSynced = true
)) newHis.isFullChainSyncedVariable = true
influxRef.foreach { ref =>
logger.info(s"send info 2. about ${newHis.getBestHeaderHeight} | ${newHis.getBestBlockHeight}")
ref ! HeightStatistics(newHis.getBestHeaderHeight, stateAfterApply.height)
Expand All @@ -297,7 +301,7 @@ class NodeViewHolder(memoryPoolRef: ActorRef,
}
uf.failedMod match {
case Some(_) =>
uf.history.updateIdsForSyncInfo()
uf.history.calculateNewSyncInfo()
updateState(uf.history, uf.state, uf.alternativeProgressInfo.get, uf.suffix, isLocallyGenerated)
case None => (uf.history, uf.state, uf.suffix)
}
Expand All @@ -318,7 +322,7 @@ class NodeViewHolder(memoryPoolRef: ActorRef,
case Right((historyBeforeStUpdate, progressInfo)) =>
logger.info(s"Successfully applied modifier ${pmod.encodedId} of type ${pmod.modifierTypeId} on nodeViewHolder to history.")
logger.debug(s"Time of applying to history SUCCESS is: ${System.currentTimeMillis() - startAppHistory}. modId is: ${pmod.encodedId}")
if (pmod.modifierTypeId == Header.modifierTypeId) historyBeforeStUpdate.updateIdsForSyncInfo()
if (pmod.modifierTypeId == Header.modifierTypeId) historyBeforeStUpdate.calculateNewSyncInfo()
influxRef.foreach { ref =>
ref ! EndOfApplyingModifier(pmod.id)
val isHeader: Boolean = pmod match {
Expand All @@ -327,7 +331,7 @@ class NodeViewHolder(memoryPoolRef: ActorRef,
}
ref ! ModifierAppendedToHistory(isHeader, success = true)
}
if (historyBeforeStUpdate.fastSyncInProgress.fastSyncVal && pmod.modifierTypeId == Payload.modifierTypeId &&
if (historyBeforeStUpdate.fastSyncInProgress && pmod.modifierTypeId == Payload.modifierTypeId &&
historyBeforeStUpdate.getBestBlockHeight >= historyBeforeStUpdate.lastAvailableManifestHeight) {
logger.info(s"nodeView.history.getBestBlockHeight ${historyBeforeStUpdate.getBestBlockHeight}")
logger.info(s"nodeView.history.heightOfLastAvailablePayloadForRequest ${historyBeforeStUpdate.lastAvailableManifestHeight}")
Expand Down
16 changes: 8 additions & 8 deletions src/main/scala/encry/view/history/BlockDownloadProcessor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,34 @@ import org.encryfoundation.common.modifiers.history.Header
import org.encryfoundation.common.utils.constants.Constants

/** Class that keeps and calculates minimal height for full blocks starting from which we need to download these full
* blocks from the network and keep them in our history. */
* blocks from the network and keep them in our history. */
case class BlockDownloadProcessor(nodeSettings: NodeSettings, constants: Constants) {

private[history] var minimalBlockHeightVar: Int = Int.MaxValue

def updateMinimalBlockHeightVar(height: Int): Unit = minimalBlockHeightVar = height

/** Start height to download full blocks.
* Int.MaxValue when headers chain is not synchronized with the network and no full blocks download needed */
* Int.MaxValue when headers chain is not synchronized with the network and no full blocks download needed */
def minimalBlockHeight: Int = minimalBlockHeightVar

/** Update minimal full block height
*
* @param header - header of new best full block
* @return minimal height to process best full block */
*
* @param header - header of new best full block
* @return minimal height to process best full block */
def updateBestBlock(header: Header): Int = {
minimalBlockHeightVar = minimalBlockHeightAfter(header)
minimalBlockHeightVar
}

private def minimalBlockHeightAfter(header: Header): Int = {
private def minimalBlockHeightAfter(header: Header): Int =
if (minimalBlockHeightVar == Int.MaxValue) {
// just synced with the headers chain - determine first full block to apply
if (nodeSettings.blocksToKeep < 0) constants.GenesisHeight // keep all blocks in history
// TODO: start with the height of UTXO snapshot applied. Start from genesis until this is implemented
// Start from config.blocksToKeep blocks back
else Math.max(constants.GenesisHeight, header.height - nodeSettings.blocksToKeep + 1)
} else if (nodeSettings.blocksToKeep >= 0) Math.max(header.height - nodeSettings.blocksToKeep + 1, minimalBlockHeightVar)
} else if (nodeSettings.blocksToKeep >= 0)
Math.max(header.height - nodeSettings.blocksToKeep + 1, minimalBlockHeightVar)
else constants.GenesisHeight
}
}
26 changes: 0 additions & 26 deletions src/main/scala/encry/view/history/FastSyncProcessor.scala

This file was deleted.

Loading