diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 6718d3b58d..7b292ad69d 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -60,9 +60,10 @@ encry { networkChunkSize = 1000 localOnly = false # List of peers we will connecting to - knownPeers = ["172.16.11.11:9001", "172.16.11.12:9001", "172.16.11.13:9001", "172.16.11.14:9001", - "172.16.11.15:9001", "172.16.11.16:9001", "172.16.11.17:9001", "172.16.11.18:9001", - "172.16.11.19:9001", "172.16.11.20:9001"] + knownPeers = [] + #["172.16.11.11:9001", "172.16.11.12:9001", "172.16.11.13:9001", "172.16.11.14:9001", + # "172.16.11.15:9001", "172.16.11.16:9001", "172.16.11.17:9001", "172.16.11.18:9001", + # "172.16.11.19:9001", "172.16.11.20:9001"] # Maximum number of connected peers maxConnections = 20 # Time, after which connection will be closed diff --git a/src/main/scala/encry/network/DeliveryManager.scala b/src/main/scala/encry/network/DeliveryManager.scala index 8287e47666..7b90c8802e 100644 --- a/src/main/scala/encry/network/DeliveryManager.scala +++ b/src/main/scala/encry/network/DeliveryManager.scala @@ -187,7 +187,7 @@ class DeliveryManager(influxRef: Option[ActorRef], s"${previousModifier.map(Algos.encode)}") requestDownload(modifierTypeId, Seq(modifiersId), history, isBlockChainSynced, isMining) - case PeersForSyncInfo(peers) => sendSync(history.syncInfo, peers) + case PeersForSyncInfo(peers) => sendSync(history.getLastSyncInfo, peers) case FullBlockChainIsSynced => context.become(basicMessageHandler(history, isBlockChainSynced = true, isMining, checkModScheduler)) diff --git a/src/main/scala/encry/view/ModifiersCache.scala b/src/main/scala/encry/view/ModifiersCache.scala index f131c512a8..e7a208c736 100644 --- a/src/main/scala/encry/view/ModifiersCache.scala +++ b/src/main/scala/encry/view/ModifiersCache.scala @@ -1,7 +1,7 @@ package encry.view import com.typesafe.scalalogging.StrictLogging -import encry.view.history.History +import encry.view.history.HistoryModifiersValidator import encry.view.history.ValidationError.{FatalValidationError, NonFatalValidationError} import org.encryfoundation.common.modifiers.PersistentModifier import org.encryfoundation.common.modifiers.history.Header @@ -11,7 +11,6 @@ import scala.annotation.tailrec import scala.collection.immutable.SortedMap import scala.collection.concurrent.TrieMap import scala.collection.mutable -import encry.EncryApp.settings object ModifiersCache extends StrictLogging { @@ -30,7 +29,7 @@ object ModifiersCache extends StrictLogging { def contains(key: Key): Boolean = cache.contains(key) - def put(key: Key, value: PersistentModifier, history: History): Unit = if (!contains(key)) { + def put(key: Key, value: PersistentModifier, validator: HistoryModifiersValidator): Unit = if (!contains(key)) { logger.debug(s"Put ${value.encodedId} of type ${value.modifierTypeId} to cache.") cache.put(key, value) value match { @@ -43,8 +42,8 @@ object ModifiersCache extends StrictLogging { case _ => } - if (size > history.settings.node.modifiersCacheSize) cache.find { case (_, modifier) => - history.testApplicable(modifier) match { + if (size > validator.settings.node.modifiersCacheSize) cache.find { case (_, modifier) => + validator.testApplicable(modifier) match { case Right(_) | Left(_: NonFatalValidationError) => false case _ => true } @@ -56,15 +55,15 @@ object ModifiersCache extends StrictLogging { cache.remove(key) } - def popCandidate(history: History): List[PersistentModifier] = synchronized { - findCandidateKey(history).flatMap(k => remove(k)) + def popCandidate(validator: HistoryModifiersValidator): List[PersistentModifier] = synchronized { + findCandidateKey(validator).flatMap(k => remove(k)) } override def toString: String = cache.keys.map(key => Algos.encode(key.toArray)).mkString(",") - def findCandidateKey(history: History): List[Key] = { + def findCandidateKey(validator: HistoryModifiersValidator): List[Key] = { - def isApplicable(key: Key): Boolean = cache.get(key).exists(modifier => history.testApplicable(modifier) match { + def isApplicable(key: Key): Boolean = cache.get(key).exists(modifier => validator.testApplicable(modifier) match { case Left(_: FatalValidationError) => remove(key); false case Right(_) => true case Left(_) => false @@ -82,7 +81,7 @@ object ModifiersCache extends StrictLogging { } def findApplicablePayloadAtHeight(height: Int): List[Key] = { - history.headerIdsAtHeight(height).view.flatMap(history.getHeaderById).collect { + validator.headerIdsAtHeight(height).view.flatMap(validator.getHeaderById).collect { case header: Header if isApplicable(new mutable.WrappedArray.ofByte(header.payloadId)) => new mutable.WrappedArray.ofByte(header.payloadId) } @@ -90,7 +89,7 @@ object ModifiersCache extends StrictLogging { def exhaustiveSearch: List[Key] = List(cache.find { case (k, v) => v match { - case _: Header if history.getBestHeaderId.exists(headerId => headerId sameElements v.parentId) => true + case _: Header if validator.getBestHeaderId.exists(headerId => headerId sameElements v.parentId) => true case _ => val isApplicableMod: Boolean = isApplicable(k) isApplicableMod @@ -98,44 +97,44 @@ object ModifiersCache extends StrictLogging { }).collect { case Some(v) => v._1 } @tailrec - def applicableBestPayloadChain(atHeight: Int = history.getBestBlockHeight, prevKeys: List[Key] = List.empty[Key]): List[Key] = { + def applicableBestPayloadChain(atHeight: Int = validator.getBestBlockHeight, prevKeys: List[Key] = List.empty[Key]): List[Key] = { val payloads = findApplicablePayloadAtHeight(atHeight) if (payloads.nonEmpty) applicableBestPayloadChain(atHeight + 1, prevKeys ++ payloads) else prevKeys } val bestHeadersIds: List[Key] = { - headersCollection.get(history.getBestHeaderHeight + 1) match { + headersCollection.get(validator.getBestHeaderHeight + 1) match { case Some(value) => - headersCollection = headersCollection - (history.getBestHeaderHeight + 1) + headersCollection = headersCollection - (validator.getBestHeaderHeight + 1) logger.debug(s"HeadersCollection size is: ${headersCollection.size}") - logger.debug(s"Drop height ${history.getBestHeaderHeight + 1} in HeadersCollection") + logger.debug(s"Drop height ${validator.getBestHeaderHeight + 1} in HeadersCollection") val res = value.map(cache.get(_)).collect { case Some(v: Header) - if ((v.parentId sameElements history.getBestHeaderId.getOrElse(Array.emptyByteArray)) || - (history.getBestHeaderHeight == history.settings.constants.PreGenesisHeight && + if ((v.parentId sameElements validator.getBestHeaderId.getOrElse(Array.emptyByteArray)) || + (validator.getBestHeaderHeight == validator.settings.constants.PreGenesisHeight && (v.parentId sameElements Header.GenesisParentId) - ) || history.getHeaderById(v.parentId).nonEmpty) && isApplicable(new mutable.WrappedArray.ofByte(v.id)) => + ) || validator.getHeaderById(v.parentId).nonEmpty) && isApplicable(new mutable.WrappedArray.ofByte(v.id)) => logger.debug(s"Find new bestHeader in cache: ${Algos.encode(v.id)}") new mutable.WrappedArray.ofByte(v.id) } value.map(id => new mutable.WrappedArray.ofByte(id)).filterNot(res.contains).foreach(cache.remove) res case None => - logger.debug(s"${history.getBestHeader}") - logger.debug(s"No header in cache at height ${history.getBestHeaderHeight + 1}. " + - s"Trying to find in range [${history.getBestHeaderHeight - history.settings.constants.MaxRollbackDepth}, ${history.getBestHeaderHeight}]") - (history.getBestHeaderHeight - history.settings.constants.MaxRollbackDepth to history.getBestHeaderHeight).flatMap(height => + logger.debug(s"${validator.getBestHeader}") + logger.debug(s"No header in cache at height ${validator.getBestHeaderHeight + 1}. " + + s"Trying to find in range [${validator.getBestHeaderHeight - validator.settings.constants.MaxRollbackDepth}, ${validator.getBestHeaderHeight}]") + (validator.getBestHeaderHeight - validator.settings.constants.MaxRollbackDepth to validator.getBestHeaderHeight).flatMap(height => getHeadersKeysAtHeight(height) ).toList } } if (bestHeadersIds.nonEmpty) bestHeadersIds - else history.headerIdsAtHeight(history.getBestBlockHeight + 1).headOption match { - case Some(id) => history.getHeaderById(id) match { + else validator.headerIdsAtHeight(validator.getBestBlockHeight + 1).headOption match { + case Some(id) => validator.getHeaderById(id) match { case Some(header: Header) if isApplicable(new mutable.WrappedArray.ofByte(header.payloadId)) => List(new mutable.WrappedArray.ofByte(header.payloadId)) - case _ if history.isFullChainSynced => exhaustiveSearch + case _ if validator.isFullChainSynced => exhaustiveSearch case _ => List.empty[Key] } case None if isChainSynced => diff --git a/src/main/scala/encry/view/NodeViewHolder.scala b/src/main/scala/encry/view/NodeViewHolder.scala index 357b431ba2..9062b35dc3 100644 --- a/src/main/scala/encry/view/NodeViewHolder.scala +++ b/src/main/scala/encry/view/NodeViewHolder.scala @@ -24,7 +24,7 @@ import encry.view.NodeViewHolder._ import encry.view.fast.sync.SnapshotHolder.SnapshotManifest.ManifestId import encry.view.fast.sync.SnapshotHolder._ import encry.view.history.storage.HistoryStorage -import encry.view.history.{History, HistoryHeadersProcessor, HistoryPayloadsProcessor} +import encry.view.history._ import encry.view.mempool.MemoryPool.RolledBackTransactions import encry.view.state.UtxoState import encry.view.state.avlTree.AvlTree @@ -52,6 +52,11 @@ class NodeViewHolder(memoryPoolRef: ActorRef, case class NodeView(history: History, state: UtxoState, wallet: EncryWallet) var nodeView: NodeView = restoreState().getOrElse(genesisState) + val validator: HistoryModifiersValidator = new HistoryModifiersValidator( + nodeView.history.historyStorage, + encrySettings, + new NetworkTimeProvider(encrySettings.ntp) + ) context.system.actorSelection("/user/nodeViewSynchronizer") ! ChangedHistory(nodeView.history) dataHolder ! UpdatedHistory(nodeView.history) @@ -96,15 +101,14 @@ class NodeViewHolder(memoryPoolRef: ActorRef, FileUtils.deleteDirectory(new File(s"${encrySettings.directory}/keysTmp")) FileUtils.deleteDirectory(new File(s"${encrySettings.directory}/walletTmp")) logger.info(s"Updated best block in fast sync mod. Updated state height.") - val newHistory = new History with HistoryHeadersProcessor with HistoryPayloadsProcessor { + val newHistory = new History with HistoryAPI with HistoryPayloadsNormalSyncProcessorComponent + with HistoryHeadersDefaultProcessorComponent { override val settings: EncryAppSettings = encrySettings - override var isFullChainSynced: Boolean = settings.node.offlineGeneration - override val timeProvider: NetworkTimeProvider = EncryApp.timeProvider override val historyStorage: HistoryStorage = nodeView.history.historyStorage + override protected[view] var isFullChainSyncedVariable: Boolean = true + override protected[view] var fastSyncInProgressVariable: Boolean = false } - newHistory.fastSyncInProgress.fastSyncVal = false newHistory.blockDownloadProcessor.updateMinimalBlockHeightVar(nodeView.history.blockDownloadProcessor.minimalBlockHeight) - newHistory.isHeadersChainSyncedVar = true updateNodeView( updatedHistory = Some(newHistory), updatedState = Some(state), @@ -119,7 +123,7 @@ class NodeViewHolder(memoryPoolRef: ActorRef, if (isInHistory || isInCache) logger.info(s"Received modifier of type: ${mod.modifierTypeId} ${Algos.encode(mod.id)} " + s"can't be placed into cache cause of: inCache: ${!isInCache}.") - else ModifiersCache.put(key(mod.id), mod, nodeView.history) + else ModifiersCache.put(key(mod.id), mod, validator) computeApplications() case lm: LocallyGeneratedModifier => @@ -166,7 +170,7 @@ class NodeViewHolder(memoryPoolRef: ActorRef, //todo refactor loop def computeApplications(): Unit = { - val mods = ModifiersCache.popCandidate(nodeView.history) + val mods = ModifiersCache.popCandidate(validator) if (mods.nonEmpty) { logger.info(s"mods: ${mods.map(mod => Algos.encode(mod.id))}") mods.foreach(mod => pmodModify(mod)) @@ -244,7 +248,7 @@ class NodeViewHolder(memoryPoolRef: ActorRef, case header: Header => val requiredHeight: Int = header.height - encrySettings.constants.MaxRollbackDepth if (requiredHeight % encrySettings.constants.SnapshotCreationHeight == 0) { - newHis.lastAvailableManifestHeight = requiredHeight + newHis.lastAvailableManifestHeightVariable = requiredHeight logger.info(s"heightOfLastAvailablePayloadForRequest -> ${newHis.lastAvailableManifestHeight}") } case _ => @@ -275,7 +279,7 @@ class NodeViewHolder(memoryPoolRef: ActorRef, context.system.eventStream.publish(SemanticallySuccessfulModifier(modToApply)) if (newHis.getBestHeaderId.exists(bestHeaderId => newHis.getBestBlockId.exists(bId => ByteArrayWrapper(bId) == ByteArrayWrapper(bestHeaderId)) - )) newHis.isFullChainSynced = true + )) newHis.isFullChainSyncedVariable = true influxRef.foreach { ref => logger.info(s"send info 2. about ${newHis.getBestHeaderHeight} | ${newHis.getBestBlockHeight}") ref ! HeightStatistics(newHis.getBestHeaderHeight, stateAfterApply.height) @@ -297,7 +301,7 @@ class NodeViewHolder(memoryPoolRef: ActorRef, } uf.failedMod match { case Some(_) => - uf.history.updateIdsForSyncInfo() + uf.history.calculateNewSyncInfo() updateState(uf.history, uf.state, uf.alternativeProgressInfo.get, uf.suffix, isLocallyGenerated) case None => (uf.history, uf.state, uf.suffix) } @@ -318,7 +322,7 @@ class NodeViewHolder(memoryPoolRef: ActorRef, case Right((historyBeforeStUpdate, progressInfo)) => logger.info(s"Successfully applied modifier ${pmod.encodedId} of type ${pmod.modifierTypeId} on nodeViewHolder to history.") logger.debug(s"Time of applying to history SUCCESS is: ${System.currentTimeMillis() - startAppHistory}. modId is: ${pmod.encodedId}") - if (pmod.modifierTypeId == Header.modifierTypeId) historyBeforeStUpdate.updateIdsForSyncInfo() + if (pmod.modifierTypeId == Header.modifierTypeId) historyBeforeStUpdate.calculateNewSyncInfo() influxRef.foreach { ref => ref ! EndOfApplyingModifier(pmod.id) val isHeader: Boolean = pmod match { @@ -327,7 +331,7 @@ class NodeViewHolder(memoryPoolRef: ActorRef, } ref ! ModifierAppendedToHistory(isHeader, success = true) } - if (historyBeforeStUpdate.fastSyncInProgress.fastSyncVal && pmod.modifierTypeId == Payload.modifierTypeId && + if (historyBeforeStUpdate.fastSyncInProgress && pmod.modifierTypeId == Payload.modifierTypeId && historyBeforeStUpdate.getBestBlockHeight >= historyBeforeStUpdate.lastAvailableManifestHeight) { logger.info(s"nodeView.history.getBestBlockHeight ${historyBeforeStUpdate.getBestBlockHeight}") logger.info(s"nodeView.history.heightOfLastAvailablePayloadForRequest ${historyBeforeStUpdate.lastAvailableManifestHeight}") diff --git a/src/main/scala/encry/view/history/BlockDownloadProcessor.scala b/src/main/scala/encry/view/history/BlockDownloadProcessor.scala index 14495dfc84..5a6a520b72 100644 --- a/src/main/scala/encry/view/history/BlockDownloadProcessor.scala +++ b/src/main/scala/encry/view/history/BlockDownloadProcessor.scala @@ -5,7 +5,7 @@ import org.encryfoundation.common.modifiers.history.Header import org.encryfoundation.common.utils.constants.Constants /** Class that keeps and calculates minimal height for full blocks starting from which we need to download these full - * blocks from the network and keep them in our history. */ + * blocks from the network and keep them in our history. */ case class BlockDownloadProcessor(nodeSettings: NodeSettings, constants: Constants) { private[history] var minimalBlockHeightVar: Int = Int.MaxValue @@ -13,26 +13,26 @@ case class BlockDownloadProcessor(nodeSettings: NodeSettings, constants: Constan def updateMinimalBlockHeightVar(height: Int): Unit = minimalBlockHeightVar = height /** Start height to download full blocks. - * Int.MaxValue when headers chain is not synchronized with the network and no full blocks download needed */ + * Int.MaxValue when headers chain is not synchronized with the network and no full blocks download needed */ def minimalBlockHeight: Int = minimalBlockHeightVar /** Update minimal full block height - * - * @param header - header of new best full block - * @return minimal height to process best full block */ + * + * @param header - header of new best full block + * @return minimal height to process best full block */ def updateBestBlock(header: Header): Int = { minimalBlockHeightVar = minimalBlockHeightAfter(header) minimalBlockHeightVar } - private def minimalBlockHeightAfter(header: Header): Int = { + private def minimalBlockHeightAfter(header: Header): Int = if (minimalBlockHeightVar == Int.MaxValue) { // just synced with the headers chain - determine first full block to apply if (nodeSettings.blocksToKeep < 0) constants.GenesisHeight // keep all blocks in history // TODO: start with the height of UTXO snapshot applied. Start from genesis until this is implemented // Start from config.blocksToKeep blocks back else Math.max(constants.GenesisHeight, header.height - nodeSettings.blocksToKeep + 1) - } else if (nodeSettings.blocksToKeep >= 0) Math.max(header.height - nodeSettings.blocksToKeep + 1, minimalBlockHeightVar) + } else if (nodeSettings.blocksToKeep >= 0) + Math.max(header.height - nodeSettings.blocksToKeep + 1, minimalBlockHeightVar) else constants.GenesisHeight - } } diff --git a/src/main/scala/encry/view/history/FastSyncProcessor.scala b/src/main/scala/encry/view/history/FastSyncProcessor.scala deleted file mode 100644 index 6e71c10856..0000000000 --- a/src/main/scala/encry/view/history/FastSyncProcessor.scala +++ /dev/null @@ -1,26 +0,0 @@ -package encry.view.history - -import cats.syntax.option.none -import encry.consensus.HistoryConsensus.ProgressInfo -import encry.storage.VersionalStorage.{StorageKey, StorageValue, StorageVersion} -import org.encryfoundation.common.modifiers.history.Payload - -trait FastSyncProcessor extends HistoryApi { - - def processPayload(payload: Payload): ProgressInfo = { - val startTime: Long = System.currentTimeMillis() - getBlockByPayload(payload).foreach { block => - logger.info(s"processPayloadFastSync") - historyStorage.bulkInsert(payload.id, Seq(BestBlockKey -> payload.headerId), Seq(payload)) - blockDownloadProcessor.updateBestBlock(block.header) - logger.info(s"BlockDownloadProcessor updated block at height ${block.header.height} successfully") - historyStorage.insert( - StorageVersion @@ validityKey(block.payload.id).untag(StorageKey), - List(block.header.id, block.payload.id).map(id => validityKey(id) -> StorageValue @@ Array(1.toByte)) - ) - logger.info(s"Finished processing block ${block.encodedId}. " + - s"Processing time is ${(System.currentTimeMillis() - startTime) / 1000} s") - } - ProgressInfo(none, Seq.empty, Seq.empty, none) - } -} diff --git a/src/main/scala/encry/view/history/History.scala b/src/main/scala/encry/view/history/History.scala index d2a6154eb3..9038ed4f99 100644 --- a/src/main/scala/encry/view/history/History.scala +++ b/src/main/scala/encry/view/history/History.scala @@ -1,46 +1,37 @@ package encry.view.history import java.io.File +import cats.syntax.either._ import com.typesafe.scalalogging.StrictLogging import encry.consensus.HistoryConsensus.ProgressInfo -import encry.settings._ +import encry.settings.EncryAppSettings import encry.storage.VersionalStorage -import encry.storage.VersionalStorage.{StorageKey, StorageValue, StorageVersion} +import encry.storage.VersionalStorage.{ StorageKey, StorageValue, StorageVersion } import encry.storage.iodb.versionalIODB.IODBHistoryWrapper -import encry.storage.levelDb.versionalLevelDB.{LevelDbFactory, VLDBWrapper, VersionalLevelDBCompanion} +import encry.storage.levelDb.versionalLevelDB.{ LevelDbFactory, VLDBWrapper, VersionalLevelDBCompanion } import encry.utils.NetworkTimeProvider import encry.view.history.storage.HistoryStorage -import io.iohk.iodb.{ByteArrayWrapper, LSMStore} +import io.iohk.iodb.LSMStore import org.encryfoundation.common.modifiers.PersistentModifier -import org.encryfoundation.common.modifiers.history.{Block, Header, Payload} +import org.encryfoundation.common.modifiers.history.{ Block, Header, Payload } import org.encryfoundation.common.utils.Algos import org.encryfoundation.common.utils.TaggedTypes.ModifierId import org.iq80.leveldb.Options -import cats.syntax.either._ -import supertagged.@@ - -/** - * History implementation. It is processing persistent modifiers generated locally or received from the network. - **/ -trait History extends HistoryModifiersValidator with AutoCloseable { - var isFullChainSynced: Boolean +trait History extends HistoryAPI with AutoCloseable { + this: HistoryPayloadsProcessorComponent with HistoryHeadersProcessorComponent => /** Appends modifier to the history if it is applicable. */ def append(modifier: PersistentModifier): Either[Throwable, (History, ProgressInfo)] = { logger.info(s"Trying to append modifier ${Algos.encode(modifier.id)} of type ${modifier.modifierTypeId} to history") Either.catchNonFatal(modifier match { - case header: Header => + case header: Header => logger.info(s"Append header ${header.encodedId} at height ${header.height} to history") - (this, processHeader(header)) - case payload: Payload => (this, processPayload(payload)) + (this, headersProcessor.processHeader(header)) + case payload: Payload => (this, payloadProcessor.processPayload(payload)) }) } - def processHeader(h: Header): ProgressInfo - - def processPayload(payload: Payload): ProgressInfo - /** @return header, that corresponds to modifier */ private def correspondingHeader(modifier: PersistentModifier): Option[Header] = modifier match { case header: Header => Some(header) @@ -49,22 +40,28 @@ trait History extends HistoryModifiersValidator with AutoCloseable { } /** - * Marks modifier and all modifiers in child chains as invalid - * - * @param modifier that is invalid against the State - * @return ProgressInfo with next modifier to try to apply - */ + * Marks modifier and all modifiers in child chains as invalid + * + * @param modifier that is invalid against the State + * @return ProgressInfo with next modifier to try to apply + */ def reportModifierIsInvalid(modifier: PersistentModifier): (History, ProgressInfo) = { logger.info(s"Modifier ${modifier.encodedId} of type ${modifier.modifierTypeId} is marked as invalid") correspondingHeader(modifier) match { case Some(invalidatedHeader) => val invalidatedHeaders: Seq[Header] = continuationHeaderChains(invalidatedHeader, _ => true).flatten.distinct val validityRow: List[(StorageKey, StorageValue)] = invalidatedHeaders - .flatMap(h => Seq(h.id, h.payloadId) - .map(id => validityKey(id) -> StorageValue @@ Array(0.toByte))).toList + .flatMap( + h => + Seq(h.id, h.payloadId) + .map(id => validityKey(id) -> StorageValue @@ Array(0.toByte)) + ) + .toList logger.info(s"Going to invalidate ${invalidatedHeader.encodedId} and ${invalidatedHeaders.map(_.encodedId)}") - val bestHeaderIsInvalidated: Boolean = getBestHeaderId.exists(id => invalidatedHeaders.exists(_.id sameElements id)) - val bestFullIsInvalidated: Boolean = getBestBlockId.exists(id => invalidatedHeaders.exists(_.id sameElements id)) + val bestHeaderIsInvalidated: Boolean = + getBestHeaderId.exists(id => invalidatedHeaders.exists(_.id sameElements id)) + val bestFullIsInvalidated: Boolean = + getBestBlockId.exists(id => invalidatedHeaders.exists(_.id sameElements id)) (bestHeaderIsInvalidated, bestFullIsInvalidated) match { case (false, false) => // Modifiers from best header and best full chain are not involved, no rollback and links change required. @@ -86,17 +83,20 @@ trait History extends HistoryModifiersValidator with AutoCloseable { this -> ProgressInfo(None, Seq.empty, Seq.empty, None) } else { val invalidatedChain: Seq[Block] = getBestBlock.toSeq - .flatMap(f => headerChainBack(getBestBlockHeight + 1, f.header, h => !invalidatedHeaders.contains(h)).headers) + .flatMap( + f => headerChainBack(getBestBlockHeight + 1, f.header, h => !invalidatedHeaders.contains(h)).headers + ) .flatMap(h => getBlockByHeader(h)) .ensuring(_.lengthCompare(1) > 0, "invalidatedChain should contain at least bestFullBlock and parent") val branchPoint: Block = invalidatedChain.head val validChain: Seq[Block] = - continuationHeaderChains(branchPoint.header, h => getBlockByHeader(h).isDefined && !invalidatedHeaders.contains(h)) + continuationHeaderChains(branchPoint.header, + h => getBlockByHeader(h).isDefined && !invalidatedHeaders.contains(h)) .maxBy(chain => scoreOf(chain.last.id).getOrElse(BigInt(0))) .flatMap(h => getBlockByHeader(h)) val changedLinks: Seq[(StorageKey, StorageValue)] = List( - BestBlockKey -> StorageValue @@ validChain.last.id.untag(ModifierId), + BestBlockKey -> StorageValue @@ validChain.last.id.untag(ModifierId), BestHeaderKey -> StorageValue @@ newBestHeader.id.untag(ModifierId) ) val toInsert: List[(StorageKey, StorageValue)] = validityRow ++ changedLinks @@ -115,21 +115,22 @@ trait History extends HistoryModifiersValidator with AutoCloseable { } /** - * Marks modifier as valid - * - * @param modifier that is valid against the State - * @return ProgressInfo with next modifier to try to apply - */ + * Marks modifier as valid + * + * @param modifier that is valid against the State + * @return ProgressInfo with next modifier to try to apply + */ def reportModifierIsValid(modifier: PersistentModifier): History = { logger.info(s"Modifier ${modifier.encodedId} of type ${modifier.modifierTypeId} is marked as valid ") modifier match { case block: Block => val nonMarkedIds: Seq[ModifierId] = Seq(block.header.id, block.payload.id) .filter(id => historyStorage.get(validityKey(id)).isEmpty) - if (nonMarkedIds.nonEmpty) historyStorage.insert( - StorageVersion @@ validityKey(nonMarkedIds.head).untag(StorageKey), - nonMarkedIds.map(id => validityKey(id) -> StorageValue @@ Array(1.toByte)).toList - ) + if (nonMarkedIds.nonEmpty) + historyStorage.insert( + StorageVersion @@ validityKey(nonMarkedIds.head).untag(StorageKey), + nonMarkedIds.map(id => validityKey(id) -> StorageValue @@ Array(1.toByte)).toList + ) this case _ => historyStorage.insert( @@ -143,10 +144,10 @@ trait History extends HistoryModifiersValidator with AutoCloseable { override def close(): Unit = historyStorage.close() def closeStorage(): Unit = historyStorage.close() + } object History extends StrictLogging { - def getHistoryIndexDir(settings: EncryAppSettings): File = { val dir: File = new File(s"${settings.directory}/history/index") dir.mkdirs() @@ -162,12 +163,12 @@ object History extends StrictLogging { def readOrGenerate(settingsEncry: EncryAppSettings, ntp: NetworkTimeProvider): History = { val historyIndexDir: File = getHistoryIndexDir(settingsEncry) //Check what kind of storage in settings: - val vldbInit = settingsEncry.storage.history match { + val versionalDB: VersionalStorage = settingsEncry.storage.history match { case VersionalStorage.IODB => logger.info("Init history with iodb storage") val historyObjectsDir: File = getHistoryObjectsDir(settingsEncry) - val indexStore: LSMStore = new LSMStore(historyIndexDir, keepVersions = 0) - val objectsStore: LSMStore = new LSMStore(historyObjectsDir, keepVersions = 0) + val indexStore: LSMStore = new LSMStore(historyIndexDir, keepVersions = 0) + val objectsStore: LSMStore = new LSMStore(historyObjectsDir, keepVersions = 0) IODBHistoryWrapper(indexStore, objectsStore) case VersionalStorage.LevelDB => logger.info("Init history with levelDB storage") @@ -175,19 +176,21 @@ object History extends StrictLogging { VLDBWrapper(VersionalLevelDBCompanion(levelDBInit, settingsEncry.levelDB)) } if (settingsEncry.snapshotSettings.enableFastSynchronization && !settingsEncry.node.offlineGeneration) - new History with HistoryHeadersProcessor with FastSyncProcessor { - override val settings: EncryAppSettings = settingsEncry - override var isFullChainSynced: Boolean = settings.node.offlineGeneration - override val historyStorage: HistoryStorage = HistoryStorage(vldbInit) - override val timeProvider: NetworkTimeProvider = new NetworkTimeProvider(settingsEncry.ntp) + new History with HistoryAPI with HistoryPayloadsFastSyncProcessorComponent + with HistoryHeadersDefaultProcessorComponent { + override protected[view] val historyStorage: HistoryStorage = HistoryStorage(versionalDB) + override protected[view] val settings: EncryAppSettings = settingsEncry + override protected[view] var isFullChainSyncedVariable: Boolean = settings.node.offlineGeneration + override protected[view] var fastSyncInProgressVariable: Boolean = + settings.snapshotSettings.enableFastSynchronization && !settings.node.offlineGeneration + } else + new History with HistoryAPI with HistoryPayloadsNormalSyncProcessorComponent + with HistoryHeadersDefaultProcessorComponent { + override protected[view] val historyStorage: HistoryStorage = HistoryStorage(versionalDB) + override protected[view] val settings: EncryAppSettings = settingsEncry + override protected[view] var isFullChainSyncedVariable: Boolean = settings.node.offlineGeneration + override protected[view] var fastSyncInProgressVariable: Boolean = + settings.snapshotSettings.enableFastSynchronization && !settings.node.offlineGeneration } - else - new History with HistoryHeadersProcessor with HistoryPayloadsProcessor { - override val settings: EncryAppSettings = settingsEncry - override var isFullChainSynced: Boolean = settings.node.offlineGeneration - override val historyStorage: HistoryStorage = HistoryStorage(vldbInit) - override val timeProvider: NetworkTimeProvider = new NetworkTimeProvider(settingsEncry.ntp) - } - } -} \ No newline at end of file +} diff --git a/src/main/scala/encry/view/history/HistoryAPI.scala b/src/main/scala/encry/view/history/HistoryAPI.scala new file mode 100644 index 0000000000..f2883fdaf8 --- /dev/null +++ b/src/main/scala/encry/view/history/HistoryAPI.scala @@ -0,0 +1,155 @@ +package encry.view.history + +import com.typesafe.scalalogging.StrictLogging +import encry.consensus.HistoryConsensus._ +import encry.modifiers.history.HeaderChain +import org.encryfoundation.common.modifiers.history.Header +import org.encryfoundation.common.network.SyncInfo +import org.encryfoundation.common.utils.TaggedTypes.ModifierId + +import scala.annotation.tailrec +import scala.collection.immutable.HashSet + +trait HistoryAPI extends HistoryReader with StrictLogging { + + def payloadsIdsToDownload(howMany: Int, excluding: HashSet[ModifierId]): List[ModifierId] + + final def lastHeaders(count: Int): HeaderChain = + getBestHeader + .map(headerChainBack(count, _, _ => false)) + .getOrElse(HeaderChain.empty) + + final def getHeaderIds(count: Int, offset: Int = 0): List[ModifierId] = + (offset until (count + offset)) + .flatMap(getBestHeaderIdAtHeight) + .toList + + final def continuationIds(info: SyncInfo, size: Int): List[ModifierId] = + if (getBestHeaderId.isEmpty) info.startingPoints.map(_._2).toList + else if (info.lastHeaderIds.isEmpty) { + val heightFrom: Int = Math.min(getBestHeaderHeight, size - 1) + (for { + startId <- headerIdsAtHeight(heightFrom).headOption + startHeader <- getHeaderById(startId) + } yield headerChainBack(size, startHeader, _ => false)) match { + case Some(value) if value.headers.exists(_.height == settings.constants.GenesisHeight) => + value.headers.map(_.id).toList + case _ => List.empty + } + } else { + val ids: Seq[ModifierId] = info.lastHeaderIds + (for { + lastHeaderInOurBestChain <- ids.view.reverse.find(isInBestChain) + theirHeight <- heightOf(lastHeaderInOurBestChain) + heightFrom = Math.min(getBestHeaderHeight, theirHeight + size) + startId <- getBestHeaderIdAtHeight(heightFrom) + startHeader <- getHeaderById(startId) + } yield + headerChainBack(size, startHeader, h => h.parentId sameElements lastHeaderInOurBestChain).headers + .map(_.id)) match { + case Some(value) => value.toList + case None => List.empty + } + } + + final def getChainToHeader( + fromHeaderOpt: Option[Header], + toHeader: Header + ): (Option[ModifierId], HeaderChain) = + fromHeaderOpt match { + case Some(h1) => + val (prevChain, newChain) = commonBlockThenSuffixes(h1, toHeader) + (prevChain.headOption.map(_.id), newChain.tail) + case None => (None, headerChainBack(toHeader.height + 1, toHeader, _ => false)) + } + + protected[history] final def commonBlockThenSuffixes( + header1: Header, + header2: Header + ): (HeaderChain, HeaderChain) = { + val heightDelta: Int = Math.max(header1.height - header2.height, 0) + + @tailrec def loop( + numberBack: Int, + otherChain: HeaderChain + ): (HeaderChain, HeaderChain) = { + val chains: (HeaderChain, HeaderChain) = commonBlockThenSuffixes(otherChain, header1, numberBack + heightDelta) + if (chains._1.head == chains._2.head) chains + else { + val biggerOther: HeaderChain = headerChainBack(numberBack, otherChain.head, _ => false) ++ otherChain.tail + if (!otherChain.head.isGenesis) loop(biggerOther.length, biggerOther) + else throw new Exception(s"Common point not found for headers $header1 and $header2") + } + } + + def commonBlockThenSuffixes( + otherChain: HeaderChain, + startHeader: Header, + limit: Int + ): (HeaderChain, HeaderChain) = { + def until(h: Header): Boolean = otherChain.exists(_.id sameElements h.id) + + val currentChain: HeaderChain = headerChainBack(limit, startHeader, until) + (currentChain, otherChain.takeAfter(currentChain.head)) + } + + loop(2, HeaderChain(Seq(header2))) + } + + protected[history] final def continuationHeaderChains( + header: Header, + filterCond: Header => Boolean + ): List[List[Header]] = { + @tailrec def loop( + currentHeight: Int, + acc: List[List[Header]] + ): List[List[Header]] = { + val nextHeightHeaders: List[Header] = headerIdsAtHeight(currentHeight + 1).view + .flatMap(getHeaderById) + .filter(filterCond) + .toList + if (nextHeightHeaders.isEmpty) acc.map(_.reverse) + else { + val updatedChains: List[List[Header]] = nextHeightHeaders.flatMap { h: Header => + acc.find((chain: List[Header]) => chain.nonEmpty && (h.parentId sameElements chain.head.id)).map(h +: _) + } + val nonUpdatedChains: List[List[Header]] = + acc.filter((chain: List[Header]) => !nextHeightHeaders.exists(_.parentId sameElements chain.head.id)) + + loop(currentHeight + 1, updatedChains ++ nonUpdatedChains) + } + } + + loop(header.height, List(List(header))) + } + + @tailrec protected[history] final def loopHeightDown( + height: Int, + p: ModifierId => Boolean + ): Option[Header] = + headerIdsAtHeight(height) + .find(p) + .flatMap(getHeaderById) match { + case h @ Some(_) => h + case None if height > settings.constants.GenesisHeight => loopHeightDown(height - 1, p) + case n @ None => n + } + + def compare(si: SyncInfo): HistoryComparisonResult = getLastSyncInfo.lastHeaderIds.lastOption match { + //Our best header is the same as other history best header + case Some(id) if si.lastHeaderIds.lastOption.exists(_ sameElements id) => Equal + //Our best header is in other history best chain, but not at the last position + case Some(id) if si.lastHeaderIds.exists(_ sameElements id) => Older + /* Other history is empty, or our history contains last id from other history */ + case Some(_) if si.lastHeaderIds.isEmpty || si.lastHeaderIds.lastOption.exists(isHeaderDefined) => Younger + case Some(_) => + //Our history contains some ids from other history + if (si.lastHeaderIds.exists(isHeaderDefined)) Fork + //Unknown comparison result + else Unknown + //Both nodes do not keep any blocks + case None if si.lastHeaderIds.isEmpty => Equal + //Our history is empty, other contain some headers + case None => Older + } +} diff --git a/src/main/scala/encry/view/history/HistoryApi.scala b/src/main/scala/encry/view/history/HistoryApi.scala deleted file mode 100644 index a367c509ad..0000000000 --- a/src/main/scala/encry/view/history/HistoryApi.scala +++ /dev/null @@ -1,361 +0,0 @@ -package encry.view.history - -import cats.syntax.option._ -import encry.consensus.HistoryConsensus._ -import encry.consensus._ -import encry.modifiers.history._ -import encry.settings.EncryAppSettings -import encry.utils.NetworkTimeProvider -import encry.view.history.ValidationError.HistoryApiError -import io.iohk.iodb.ByteArrayWrapper -import org.encryfoundation.common.modifiers.history._ -import org.encryfoundation.common.network.SyncInfo -import org.encryfoundation.common.utils.Algos -import org.encryfoundation.common.utils.TaggedTypes.{Difficulty, Height, ModifierId, ModifierTypeId} -import scala.annotation.tailrec -import scala.collection.immutable.HashSet - -trait HistoryApi extends HistoryDBApi { //scalastyle:ignore - - val timeProvider: NetworkTimeProvider - - var headersCacheIndexes: Map[Int, Seq[ModifierId]] = Map.empty[Int, Seq[ModifierId]] - - var headersCache: Map[ByteArrayWrapper, Header] = Map.empty[ByteArrayWrapper, Header] - - var blocksCacheIndexes: Map[Int, Seq[ModifierId]] = Map.empty[Int, Seq[ModifierId]] - - var blocksCache: Map[ByteArrayWrapper, Block] = Map.empty[ByteArrayWrapper, Block] - - private var lastSyncInfo: SyncInfo = SyncInfo(Seq.empty[ModifierId]) - - lazy val blockDownloadProcessor: BlockDownloadProcessor = BlockDownloadProcessor(settings.node, settings.constants) - - var isHeadersChainSyncedVar: Boolean = false - - final case class FastSyncProcessor(localSettings: EncryAppSettings) { - var fastSyncVal: Boolean = settings.snapshotSettings.enableFastSynchronization && !settings.node.offlineGeneration - } - - var lastAvailableManifestHeight: Int = 0 - - lazy val fastSyncInProgress: FastSyncProcessor = FastSyncProcessor(settings) - - def getHeaderById(id: ModifierId): Option[Header] = headersCache - .get(ByteArrayWrapper(id)) - .orElse(blocksCache.get(ByteArrayWrapper(id)).map(_.header)) - .orElse(getHeaderByIdDB(id)) - - def getBlockByHeaderId(id: ModifierId): Option[Block] = - blocksCache - .get(ByteArrayWrapper(id)) - .orElse(headersCache.get(ByteArrayWrapper(id)) - .flatMap(h => getPayloadByIdDB(h.payloadId).map(p => Block(h, p)))) - .orElse(getBlockByHeaderIdDB(id)) - - def getBlockByHeader(header: Header): Option[Block] = blocksCache - .get(ByteArrayWrapper(header.id)) - .orElse(getPayloadByIdDB(header.payloadId).map(p => Block(header, p))) - - def getBestHeader: Option[Header] = getBestHeaderId.flatMap(id => - headersCache - .get(ByteArrayWrapper(id)) - .orElse(blocksCache.get(ByteArrayWrapper(id)).map(_.header)) - .orElse(getHeaderByIdDB(id)) - ) - - def getBestHeaderHeight: Int = getBestHeaderId.flatMap(id => - headersCache.get(ByteArrayWrapper(id)).map(_.height) - .orElse(blocksCache.get(ByteArrayWrapper(id)).map(_.header.height)) - .orElse(getHeightByHeaderId(id)) - ).getOrElse(settings.constants.PreGenesisHeight) - - def getBestBlock: Option[Block] = getBestBlockId.flatMap(id => - blocksCache.get(ByteArrayWrapper(id)) - .orElse(getBlockByHeaderIdDB(id)) - ) - - def getBestBlockHeight: Int = getBestBlockId - .flatMap(id => blocksCache.get(ByteArrayWrapper(id)).map(_.header.height).orElse(getHeightByHeaderId(id))) - .getOrElse(settings.constants.PreGenesisHeight) - - def getHeaderOfBestBlock: Option[Header] = getBestBlockId.flatMap(id => - headersCache.get(ByteArrayWrapper(id)) - .orElse(blocksCache.get(ByteArrayWrapper(id)).map(_.header)) - .orElse(getHeaderByIdDB(id)) - ) - - def getBestHeaderAtHeight(h: Int): Option[Header] = getBestHeaderAtHeightDB(h) - - def getBlockByPayload(payload: Payload): Option[Block] = headersCache - .get(ByteArrayWrapper(payload.headerId)).map(h => Block(h, payload)) - .orElse(blocksCache.get(ByteArrayWrapper(payload.headerId))) - .orElse(getHeaderById(payload.headerId).flatMap(h => Some(Block(h, payload)))) - - def getHeightByHeaderId(id: ModifierId): Option[Int] = headersCache - .get(ByteArrayWrapper(id)).map(_.height) - .orElse(blocksCache.get(ByteArrayWrapper(id)).map(_.header.height)) - .orElse(getHeightByHeaderIdDB(id)) - - def isBestBlockDefined: Boolean = - getBestBlockId.map(id => blocksCache.contains(ByteArrayWrapper(id))).isDefined || - getHeaderOfBestBlock.map(h => isModifierDefined(h.payloadId)).isDefined - - def isBlockDefined(header: Header): Boolean = - blocksCache.get(ByteArrayWrapper(header.id)).isDefined || isModifierDefined(header.payloadId) - - def isHeaderDefined(id: ModifierId): Boolean = - headersCache.get(ByteArrayWrapper(id)).isDefined || - blocksCache.get(ByteArrayWrapper(id)).isDefined || - isModifierDefined(id) - - def getBestHeaderIdAtHeight(h: Int): Option[ModifierId] = getBestHeaderIdAtHeightDB(h) - def headerIdsAtHeight(height: Int): Seq[ModifierId] = headerIdsAtHeightDB(height) - .getOrElse(Seq.empty[ModifierId]) - - def modifierBytesById(id: ModifierId): Option[Array[Byte]] = headersCache - .get(ByteArrayWrapper(id)).map(h => HeaderProtoSerializer.toProto(h).toByteArray) - .orElse(blocksCache.get(ByteArrayWrapper(id)).map(b => BlockProtoSerializer.toProto(b).toByteArray)) - .orElse(modifierBytesByIdDB(id)) - - def lastHeaders(count: Int): HeaderChain = getBestHeader - .map(bestHeader => headerChainBack(count, bestHeader, _ => false)) - .getOrElse(HeaderChain.empty) - - def getHeaderIds(count: Int, offset: Int = 0): Seq[ModifierId] = (offset until (count + offset)) - .flatMap(getBestHeaderIdAtHeight) - - def payloadsIdsToDownload(howMany: Int, excluding: HashSet[ModifierId]): Seq[ModifierId] = { - @tailrec def continuation(height: Int, acc: Seq[ModifierId]): Seq[ModifierId] = - if (acc.lengthCompare(howMany) >= 0) acc - else if (height > lastAvailableManifestHeight && fastSyncInProgress.fastSyncVal) acc - else getBestHeaderIdAtHeight(height).flatMap(getHeaderById) match { - case Some(h) if !excluding.exists(_.sameElements(h.payloadId)) && !isBlockDefined(h) => - continuation(height + 1, acc :+ h.payloadId) - case Some(_) => - continuation(height + 1, acc) - case None => - acc - } - - (for { - bestBlockId <- getBestBlockId - headerLinkedToBestBlock <- getHeaderById(bestBlockId) - } yield headerLinkedToBestBlock) match { - case _ if !isHeadersChainSynced => - Seq.empty - case Some(header) if isInBestChain(header) => - continuation(header.height + 1, Seq.empty) - case Some(header) => - lastBestBlockHeightRelevantToBestChain(header.height) - .map(height => continuation(height + 1, Seq.empty)) - .getOrElse(continuation(blockDownloadProcessor.minimalBlockHeightVar, Seq.empty)) - case None => - continuation(blockDownloadProcessor.minimalBlockHeightVar, Seq.empty) - } - } - - def toDownload(header: Header): Option[(ModifierTypeId, ModifierId)] = - // Already synced and header is not too far back. Download required modifiers - if (header.height >= blockDownloadProcessor.minimalBlockHeight) (Payload.modifierTypeId -> header.payloadId).some - // Headers chain is synced after this header. Start downloading full blocks - else if (!isHeadersChainSynced && isNewHeader(header)) { - isHeadersChainSyncedVar = true - blockDownloadProcessor.updateBestBlock(header) - none - } else none - - def headerChainBack(limit: Int, startHeader: Header, until: Header => Boolean): HeaderChain = { - @tailrec def loop(header: Header, acc: Seq[Header]): Seq[Header] = { - if (acc.length == limit || until(header)) acc - else getHeaderById(header.parentId) match { - case Some(parent: Header) => loop(parent, acc :+ parent) - case None if acc.contains(header) => acc - case _ => acc :+ header - } - } - - if (getBestHeaderId.isEmpty || (limit == 0)) HeaderChain(Seq.empty) - else HeaderChain(loop(startHeader, Seq(startHeader)).reverse) - } - - @tailrec final def loopHeightDown(height: Int, p: ModifierId => Boolean): Option[Header] = headerIdsAtHeight(height) - .find(p) - .flatMap(getHeaderById) match { - case h@Some(_) => h - case None if height > settings.constants.GenesisHeight => loopHeightDown(height - 1, p) - case n@None => n - } - - def requiredDifficultyAfter(parent: Header): Either[HistoryApiError, Difficulty] = { - val requiredHeights: Seq[Height] = PowLinearController.getHeightsForRetargetingAt(Height @@ (parent.height + 1), - settings.constants.EpochLength, settings.constants.RetargetingEpochsQty) - for { - _ <- Either.cond(requiredHeights.lastOption.contains(parent.height), (), - HistoryApiError("Incorrect heights sequence in requiredDifficultyAfter function")) - chain = headerChainBack(requiredHeights.max - requiredHeights.min + 1, parent, (_: Header) => false) - requiredHeaders = (requiredHeights.min to requiredHeights.max) - .zip(chain.headers) - .filter(p => requiredHeights.contains(p._1)) - _ <- Either.cond(requiredHeights.length == requiredHeaders.length, (), - HistoryApiError(s"Missed headers: $requiredHeights != ${requiredHeaders.map(_._1)}")) - } yield PowLinearController.getDifficulty(requiredHeaders, settings.constants.EpochLength, - settings.constants.DesiredBlockInterval, settings.constants.InitialDifficulty) - } - - def syncInfo: SyncInfo = lastSyncInfo - - def updateIdsForSyncInfo(): Unit = - lastSyncInfo = SyncInfo(getBestHeader.map { header: Header => - ((header.height - settings.network.maxInvObjects + 1) to header.height).flatMap { height: Int => - headerIdsAtHeight(height).headOption - }.toList - }.getOrElse(List.empty)) - - def compare(si: SyncInfo): HistoryComparisonResult = lastSyncInfo.lastHeaderIds.lastOption match { - //Our best header is the same as other history best header - case Some(id) if si.lastHeaderIds.lastOption.exists(_ sameElements id) => Equal - //Our best header is in other history best chain, but not at the last position - case Some(id) if si.lastHeaderIds.exists(_ sameElements id) => Older - /* Other history is empty, or our history contains last id from other history */ - case Some(_) if si.lastHeaderIds.isEmpty || si.lastHeaderIds.lastOption.exists(isHeaderDefined) => Younger - case Some(_) => - //Our history contains some ids from other history - if (si.lastHeaderIds.exists(isHeaderDefined)) Fork - //Unknown comparison result - else Unknown - //Both nodes do not keep any blocks - case None if si.lastHeaderIds.isEmpty => Equal - //Our history is empty, other contain some headers - case None => Older - } - - def continuationIds(info: SyncInfo, size: Int): Seq[ModifierId] = - if (getBestHeaderId.isEmpty) info.startingPoints.map(_._2) - else if (info.lastHeaderIds.isEmpty) { - val heightFrom: Int = Math.min(getBestHeaderHeight, size - 1) - (for { - startId <- headerIdsAtHeight(heightFrom).headOption - startHeader <- getHeaderById(startId) - } yield headerChainBack(size, startHeader, _ => false)) match { - case Some(value) if value.headers.exists(_.height == settings.constants.GenesisHeight) => value.headers.map(_.id) - case _ => Seq.empty - } - } else { - val ids: Seq[ModifierId] = info.lastHeaderIds - (for { - lastHeaderInOurBestChain <- ids.view.reverse.find(m => isInBestChain(m)) - theirHeight <- heightOf(lastHeaderInOurBestChain) - heightFrom = Math.min(getBestHeaderHeight, theirHeight + size) - startId <- headerIdsAtHeight(heightFrom).headOption - startHeader <- getHeaderById(startId) - } yield headerChainBack(size, startHeader, h => h.parentId sameElements lastHeaderInOurBestChain) - .headers - .map(_.id)) match { - case Some(value) => value - case None => Seq.empty - } - } - - def commonBlockThenSuffixes(header1: Header, - header2: Header): (HeaderChain, HeaderChain) = { - val heightDelta: Int = Math.max(header1.height - header2.height, 0) - - @scala.annotation.tailrec - def loop(numberBack: Int, otherChain: HeaderChain): (HeaderChain, HeaderChain) = { - val chains: (HeaderChain, HeaderChain) = commonBlockThenSuffixes(otherChain, header1, numberBack + heightDelta) - if (chains._1.head == chains._2.head) chains - else { - val biggerOther: HeaderChain = headerChainBack(numberBack, otherChain.head, _ => false) ++ otherChain.tail - if (!otherChain.head.isGenesis) loop(biggerOther.length, biggerOther) - else throw new Exception(s"Common point not found for headers $header1 and $header2") - } - } - - def commonBlockThenSuffixes(otherChain: HeaderChain, - startHeader: Header, - limit: Int): (HeaderChain, HeaderChain) = { - def until(h: Header): Boolean = otherChain.exists(_.id sameElements h.id) - - val currentChain: HeaderChain = headerChainBack(limit, startHeader, until) - (currentChain, otherChain.takeAfter(currentChain.head)) - } - - loop(2, HeaderChain(Seq(header2))) - } - - def getChainToHeader(fromHeaderOpt: Option[Header], - toHeader: Header): (Option[ModifierId], HeaderChain) = fromHeaderOpt match { - case Some(h1) => - val (prevChain, newChain) = commonBlockThenSuffixes(h1, toHeader) - (prevChain.headOption.map(_.id), newChain.tail) - case None => (None, headerChainBack(toHeader.height + 1, toHeader, _ => false)) - } - - def isNewHeader(header: Header): Boolean = - timeProvider.estimatedTime - header.timestamp < - settings.constants.DesiredBlockInterval.toMillis * settings.constants.NewHeaderTimeMultiplier - - def isHeadersChainSynced: Boolean = isHeadersChainSyncedVar - - def continuationHeaderChains(header: Header, - filterCond: Header => Boolean): Seq[Seq[Header]] = { - @tailrec def loop(currentHeight: Int, acc: Seq[Seq[Header]]): Seq[Seq[Header]] = { - val nextHeightHeaders: Seq[Header] = headerIdsAtHeight(currentHeight + 1) - .view - .flatMap(getHeaderById) - .filter(filterCond) - .toList - if (nextHeightHeaders.isEmpty) acc.map(_.reverse) - else { - val updatedChains: Seq[Seq[Header]] = nextHeightHeaders.flatMap(h => - acc.find(chain => chain.nonEmpty && (h.parentId sameElements chain.head.id)).map(h +: _) - ) - val nonUpdatedChains: Seq[Seq[Header]] = - acc.filter(chain => !nextHeightHeaders.exists(_.parentId sameElements chain.head.id)) - - loop(currentHeight + 1, updatedChains ++ nonUpdatedChains) - } - } - - loop(header.height, Seq(Seq(header))) - } - - def addHeaderToCacheIfNecessary(h: Header): Unit = - if (h.height >= getBestHeaderHeight - settings.constants.MaxRollbackDepth) { - logger.debug(s"Should add ${Algos.encode(h.id)} to header cache") - val newHeadersIdsAtHeaderHeight = headersCacheIndexes.getOrElse(h.height, Seq.empty[ModifierId]) :+ h.id - headersCacheIndexes = headersCacheIndexes + (h.height -> newHeadersIdsAtHeaderHeight) - headersCache = headersCache + (ByteArrayWrapper(h.id) -> h) - // cleanup cache if necessary - if (headersCacheIndexes.size > settings.constants.MaxRollbackDepth) { - headersCacheIndexes.get(getBestHeaderHeight - settings.constants.MaxRollbackDepth).foreach { headersIds => - val wrappedIds = headersIds.map(ByteArrayWrapper.apply) - logger.debug(s"Cleanup header cache from headers: ${headersIds.map(Algos.encode).mkString(",")}") - headersCache = headersCache.filterNot { case (id, _) => wrappedIds.contains(id) } - } - headersCacheIndexes = headersCacheIndexes - (getBestHeaderHeight - settings.constants.MaxRollbackDepth) - } - logger.debug(s"headersCache size: ${headersCache.size}") - logger.debug(s"headersCacheIndexes size: ${headersCacheIndexes.size}") - } - - def addBlockToCacheIfNecessary(b: Block): Unit = - if (!blocksCache.contains(ByteArrayWrapper(b.id)) && (b.header.height >= getBestBlockHeight - settings.constants.MaxRollbackDepth)) { - logger.debug(s"Should add ${Algos.encode(b.id)} to block cache") - val newBlocksIdsAtBlockHeight = blocksCacheIndexes.getOrElse(b.header.height, Seq.empty[ModifierId]) :+ b.id - blocksCacheIndexes = blocksCacheIndexes + (b.header.height -> newBlocksIdsAtBlockHeight) - blocksCache = blocksCache + (ByteArrayWrapper(b.id) -> b) - // cleanup cache if necessary - if (blocksCacheIndexes.size > settings.constants.MaxRollbackDepth) { - blocksCacheIndexes.get(getBestBlockHeight - settings.constants.MaxRollbackDepth).foreach { blocksIds => - val wrappedIds = blocksIds.map(ByteArrayWrapper.apply) - logger.debug(s"Cleanup block cache from headers: ${blocksIds.map(Algos.encode).mkString(",")}") - blocksCache = blocksCache.filterNot { case (id, _) => wrappedIds.contains(id) } - } - blocksCacheIndexes = blocksCacheIndexes - (getBestBlockHeight - settings.constants.MaxRollbackDepth) - } - logger.debug(s"headersCache size: ${blocksCache.size}") - logger.debug(s"headersCacheIndexes size: ${blocksCacheIndexes.size}") - } -} \ No newline at end of file diff --git a/src/main/scala/encry/view/history/HistoryDBApi.scala b/src/main/scala/encry/view/history/HistoryDBApi.scala deleted file mode 100644 index 55427766a8..0000000000 --- a/src/main/scala/encry/view/history/HistoryDBApi.scala +++ /dev/null @@ -1,96 +0,0 @@ -package encry.view.history - -import com.google.common.primitives.Ints -import com.typesafe.scalalogging.StrictLogging -import encry.settings.EncryAppSettings -import encry.storage.VersionalStorage.StorageKey -import encry.view.history.storage.HistoryStorage -import org.encryfoundation.common.modifiers.history.{Block, Header, Payload} -import org.encryfoundation.common.utils.Algos -import org.encryfoundation.common.utils.TaggedTypes.{Height, ModifierId, ModifierTypeId} -import scorex.crypto.hash.Digest32 - -import scala.reflect.ClassTag - -trait HistoryDBApi extends StrictLogging { - - val settings: EncryAppSettings - - val historyStorage: HistoryStorage - - lazy val BestHeaderKey: StorageKey = - StorageKey @@ Array.fill(settings.constants.DigestLength)(Header.modifierTypeId.untag(ModifierTypeId)) - lazy val BestBlockKey: StorageKey = - StorageKey @@ Array.fill(settings.constants.DigestLength)(-1: Byte) - - private def getModifierById[T: ClassTag](id: ModifierId): Option[T] = historyStorage - .modifierById(id) - .collect { case m: T => m } - - def getHeightByHeaderIdDB(id: ModifierId): Option[Int] = historyStorage - .get(headerHeightKey(id)) - .map(Ints.fromByteArray) - - def getHeaderByIdDB(id: ModifierId): Option[Header] = getModifierById[Header](id) - def getPayloadByIdDB(pId: ModifierId): Option[Payload] = getModifierById[Payload](pId) - def getBlockByHeaderDB(header: Header): Option[Block] = getModifierById[Payload](header.payloadId) - .map(payload => Block(header, payload)) - def getBlockByHeaderIdDB(id: ModifierId): Option[Block] = getHeaderByIdDB(id) - .flatMap(h => getModifierById[Payload](h.payloadId).map(p => Block(h, p))) - - def getBestHeaderId: Option[ModifierId] = historyStorage.get(BestHeaderKey).map(ModifierId @@ _) - def getBestHeaderDB: Option[Header] = getBestHeaderId.flatMap(getHeaderByIdDB) - def getBestHeaderHeightDB: Int = getBestHeaderId - .flatMap(getHeightByHeaderIdDB) - .getOrElse(settings.constants.PreGenesisHeight) - - def getBestBlockId: Option[ModifierId] = historyStorage.get(BestBlockKey).map(ModifierId @@ _) - def getBestBlockDB: Option[Block] = getBestBlockId.flatMap(getBlockByHeaderIdDB) - def getBestBlockHeightDB: Int = getBestBlockId - .flatMap(getHeightByHeaderIdDB) - .getOrElse(settings.constants.PreGenesisHeight) - - def modifierBytesByIdDB(id: ModifierId): Option[Array[Byte]] = historyStorage.modifiersBytesById(id) - - def isModifierDefined(id: ModifierId): Boolean = historyStorage.containsMod(id) - - //todo probably rewrite with indexes collection - def lastBestBlockHeightRelevantToBestChain(probablyAt: Int): Option[Int] = (for { - headerId <- getBestHeaderIdAtHeightDB(probablyAt) - header <- getHeaderByIdDB(headerId) if isModifierDefined(header.payloadId) - } yield header.height).orElse(lastBestBlockHeightRelevantToBestChain(probablyAt - 1)) - - def headerIdsAtHeightDB(height: Int): Option[Seq[ModifierId]] = historyStorage - .get(heightIdsKey(height)) - .map(_.grouped(32).map(ModifierId @@ _).toSeq) - - def getBestHeaderIdAtHeightDB(h: Int): Option[ModifierId] = headerIdsAtHeightDB(h).flatMap(_.headOption) - - def getBestHeaderAtHeightDB(h: Int): Option[Header] = getBestHeaderIdAtHeightDB(h).flatMap(getHeaderByIdDB) - - def isInBestChain(h: Header): Boolean = getBestHeaderIdAtHeightDB(h.height) - .exists(_.sameElements(h.id)) - - def isInBestChain(id: ModifierId): Boolean = heightOf(id) - .flatMap(getBestHeaderIdAtHeightDB) - .exists(_.sameElements(id)) - - def getBestHeadersChainScore: BigInt = getBestHeaderId.flatMap(scoreOf).getOrElse(BigInt(0)) //todo ?.getOrElse(BigInt(0))? - - def scoreOf(id: ModifierId): Option[BigInt] = historyStorage - .get(headerScoreKey(id)) - .map(d => BigInt(d)) - - def heightOf(id: ModifierId): Option[Height] = historyStorage - .get(headerHeightKey(id)) - .map(d => Height @@ Ints.fromByteArray(d)) - - def heightIdsKey(height: Int): StorageKey = - StorageKey @@ Algos.hash(Ints.toByteArray(height)).untag(Digest32) - def headerScoreKey(id: ModifierId): StorageKey = - StorageKey @@ Algos.hash("score".getBytes(Algos.charset) ++ id).untag(Digest32) - def headerHeightKey(id: ModifierId): StorageKey = - StorageKey @@ Algos.hash("height".getBytes(Algos.charset) ++ id).untag(Digest32) - def validityKey(id: Array[Byte]): StorageKey = - StorageKey @@ Algos.hash("validity".getBytes(Algos.charset) ++ id).untag(Digest32) -} \ No newline at end of file diff --git a/src/main/scala/encry/view/history/HistoryHeadersDefaultProcessorComponent.scala b/src/main/scala/encry/view/history/HistoryHeadersDefaultProcessorComponent.scala new file mode 100644 index 0000000000..a5f6a206c7 --- /dev/null +++ b/src/main/scala/encry/view/history/HistoryHeadersDefaultProcessorComponent.scala @@ -0,0 +1,100 @@ +package encry.view.history + +import cats.syntax.option._ +import com.google.common.primitives.Ints +import encry.EncryApp.forceStopApplication +import encry.consensus.ConsensusSchemeReaders +import encry.consensus.HistoryConsensus.ProgressInfo +import encry.storage.VersionalStorage.{ StorageKey, StorageValue } +import org.encryfoundation.common.modifiers.history.{ Header, Payload } +import org.encryfoundation.common.utils.TaggedTypes.{ Difficulty, ModifierId, ModifierTypeId } + +trait HistoryHeadersDefaultProcessorComponent extends HistoryHeadersProcessorComponent { + this: HistoryAPI => + + val headersProcessor: HeadersProcessor = new DefaultHeadersProcessor + + class DefaultHeadersProcessor extends HeadersProcessor { + + override def processHeader(h: Header): ProgressInfo = getHeaderInfoUpdate(h) match { + case dataToUpdate: Seq[_] if dataToUpdate.nonEmpty => + historyStorage.bulkInsert(h.id, dataToUpdate, Seq(h)) + getBestHeaderId match { + case Some(bestHeaderId) => + ProgressInfo(none, Seq.empty, if (!bestHeaderId.sameElements(h.id)) Seq.empty else Seq(h), toDownload(h)) + case _ => + forceStopApplication(errorMessage = "Should always have best header after header application") + } + case _ => ProgressInfo(none, Seq.empty, Seq.empty, none) + } + + private def getHeaderInfoUpdate(header: Header): Seq[(StorageKey, StorageValue)] = { + addHeaderToCacheIfNecessary(header) + if (header.isGenesis) { + logger.info(s"Initialize header chain with genesis header ${header.encodedId}") + Seq( + BestHeaderKey -> StorageValue @@ header.id, + heightIdsKey(settings.constants.GenesisHeight) -> StorageValue @@ header.id, + headerHeightKey(header.id) -> StorageValue @@ Ints.toByteArray(settings.constants.GenesisHeight), + headerScoreKey(header.id) -> StorageValue @@ header.difficulty.toByteArray + ) + } else + scoreOf(header.parentId).map { parentScore => + logger.info(s"getHeaderInfoUpdate for header $header") + val score: Difficulty = + Difficulty @@ (parentScore + ConsensusSchemeReaders.consensusScheme.realDifficulty(header)) + val bestHeaderHeight: Int = getBestHeaderHeight + val bestHeadersChainScore: BigInt = getBestHeadersChainScore + val bestRow: Seq[(StorageKey, StorageValue)] = + if ((header.height > bestHeaderHeight) || (header.height == bestHeaderHeight && score > bestHeadersChainScore)) + Seq(BestHeaderKey -> StorageValue @@ header.id.untag(ModifierId)) + else Seq.empty + val scoreRow: (StorageKey, StorageValue) = headerScoreKey(header.id) -> StorageValue @@ score.toByteArray + val heightRow: (StorageKey, StorageValue) = + headerHeightKey(header.id) -> StorageValue @@ Ints.toByteArray(header.height) + val headerIdsRow: Seq[(StorageKey, StorageValue)] = + if ((header.height > bestHeaderHeight) || (header.height == bestHeaderHeight && score > bestHeadersChainScore)) + bestBlockHeaderIdsRow(header, score) + else orphanedBlockHeaderIdsRow(header, score) + Seq(scoreRow, heightRow) ++ bestRow ++ headerIdsRow + }.getOrElse(Seq.empty) + } + + private def bestBlockHeaderIdsRow(h: Header, score: Difficulty): Seq[(StorageKey, StorageValue)] = { + logger.info(s"New best header ${h.encodedId} with score: $score at height ${h.height}") + val self: (StorageKey, StorageValue) = + heightIdsKey(h.height) -> + StorageValue @@ (Seq(h.id) ++ headerIdsAtHeight(h.height).filterNot(_ sameElements h.id)).flatten.toArray + val forkHeaders: Seq[(StorageKey, StorageValue)] = getHeaderById(h.parentId).toList.view + .flatMap(headerChainBack(h.height, _, h => isInBestChain(h)).headers) + .filterNot(isInBestChain) + .map( + header => + heightIdsKey(header.height) -> + StorageValue @@ (Seq(header.id) ++ + headerIdsAtHeight(header.height).filterNot(_ sameElements header.id)).flatten.toArray + ) + .toList + forkHeaders :+ self + } + + private def orphanedBlockHeaderIdsRow(h: Header, score: Difficulty): Seq[(StorageKey, StorageValue)] = { + logger.info(s"New orphaned header ${h.encodedId} at height ${h.height} with score $score") + Seq(heightIdsKey(h.height) -> StorageValue @@ (headerIdsAtHeight(h.height) :+ h.id).flatten.toArray) + } + + private def toDownload(header: Header): Option[(ModifierTypeId, ModifierId)] = + // Already synced and header is not too far back. Download required modifiers + if (header.height >= blockDownloadProcessor.minimalBlockHeight) (Payload.modifierTypeId -> header.payloadId).some + // Headers chain is synced after this header. Start downloading full blocks + else if (!isHeadersChainSynced && isNewHeader(header)) { + isHeadersChainSyncedVariable = true + blockDownloadProcessor.updateBestBlock(header) + none + } else none + + private def isNewHeader(header: Header): Boolean = + timeProvider.estimatedTime - header.timestamp < + settings.constants.DesiredBlockInterval.toMillis * settings.constants.NewHeaderTimeMultiplier + } +} diff --git a/src/main/scala/encry/view/history/HistoryHeadersProcessor.scala b/src/main/scala/encry/view/history/HistoryHeadersProcessor.scala deleted file mode 100644 index b8e414cce8..0000000000 --- a/src/main/scala/encry/view/history/HistoryHeadersProcessor.scala +++ /dev/null @@ -1,80 +0,0 @@ -package encry.view.history - -import cats.syntax.option.none -import com.google.common.primitives.Ints -import encry.EncryApp.forceStopApplication -import encry.consensus.ConsensusSchemeReaders -import encry.consensus.HistoryConsensus.ProgressInfo -import encry.storage.VersionalStorage.{ StorageKey, StorageValue } -import org.encryfoundation.common.modifiers.history.Header -import org.encryfoundation.common.utils.TaggedTypes.{ Difficulty, ModifierId } - -trait HistoryHeadersProcessor extends HistoryApi { - - def processHeader(h: Header): ProgressInfo = getHeaderInfoUpdate(h) match { - case dataToUpdate: Seq[_] if dataToUpdate.nonEmpty => - historyStorage.bulkInsert(h.id, dataToUpdate, Seq(h)) - getBestHeaderId match { - case Some(bestHeaderId) => - ProgressInfo(none, Seq.empty, if (!bestHeaderId.sameElements(h.id)) Seq.empty else Seq(h), toDownload(h)) - case _ => - forceStopApplication(errorMessage = "Should always have best header after header application") - } - case _ => ProgressInfo(none, Seq.empty, Seq.empty, none) - } - - private def getHeaderInfoUpdate(header: Header): Seq[(StorageKey, StorageValue)] = { - addHeaderToCacheIfNecessary(header) - if (header.isGenesis) { - logger.info(s"Initialize header chain with genesis header ${header.encodedId}") - Seq( - BestHeaderKey -> StorageValue @@ header.id, - heightIdsKey(settings.constants.GenesisHeight) -> StorageValue @@ header.id, - headerHeightKey(header.id) -> StorageValue @@ Ints.toByteArray(settings.constants.GenesisHeight), - headerScoreKey(header.id) -> StorageValue @@ header.difficulty.toByteArray - ) - } else - scoreOf(header.parentId).map { parentScore => - logger.info(s"getHeaderInfoUpdate for header $header") - val score: Difficulty = - Difficulty @@ (parentScore + ConsensusSchemeReaders.consensusScheme.realDifficulty(header)) - val bestHeaderHeight: Int = getBestHeaderHeight - val bestHeadersChainScore: BigInt = getBestHeadersChainScore - val bestRow: Seq[(StorageKey, StorageValue)] = - if ((header.height > bestHeaderHeight) || (header.height == bestHeaderHeight && score > bestHeadersChainScore)) - Seq(BestHeaderKey -> StorageValue @@ header.id.untag(ModifierId)) - else Seq.empty - val scoreRow: (StorageKey, StorageValue) = headerScoreKey(header.id) -> StorageValue @@ score.toByteArray - val heightRow: (StorageKey, StorageValue) = - headerHeightKey(header.id) -> StorageValue @@ Ints.toByteArray(header.height) - val headerIdsRow: Seq[(StorageKey, StorageValue)] = - if ((header.height > bestHeaderHeight) || (header.height == bestHeaderHeight && score > bestHeadersChainScore)) - bestBlockHeaderIdsRow(header, score) - else orphanedBlockHeaderIdsRow(header, score) - Seq(scoreRow, heightRow) ++ bestRow ++ headerIdsRow - }.getOrElse(Seq.empty) - } - - private def bestBlockHeaderIdsRow(h: Header, score: Difficulty): Seq[(StorageKey, StorageValue)] = { - logger.info(s"New best header ${h.encodedId} with score: $score at height ${h.height}") - val self: (StorageKey, StorageValue) = - heightIdsKey(h.height) -> - StorageValue @@ (Seq(h.id) ++ headerIdsAtHeight(h.height).filterNot(_ sameElements h.id)).flatten.toArray - val forkHeaders: Seq[(StorageKey, StorageValue)] = getHeaderById(h.parentId).toList.view - .flatMap(headerChainBack(h.height, _, h => isInBestChain(h)).headers) - .filterNot(isInBestChain) - .map( - header => - heightIdsKey(header.height) -> - StorageValue @@ (Seq(header.id) ++ - headerIdsAtHeight(header.height).filterNot(_ sameElements header.id)).flatten.toArray - ) - .toList - forkHeaders :+ self - } - - private def orphanedBlockHeaderIdsRow(h: Header, score: Difficulty): Seq[(StorageKey, StorageValue)] = { - logger.info(s"New orphaned header ${h.encodedId} at height ${h.height} with score $score") - Seq(heightIdsKey(h.height) -> StorageValue @@ (headerIdsAtHeight(h.height) :+ h.id).flatten.toArray) - } -} diff --git a/src/main/scala/encry/view/history/HistoryHeadersProcessorComponent.scala b/src/main/scala/encry/view/history/HistoryHeadersProcessorComponent.scala new file mode 100644 index 0000000000..a63df870e6 --- /dev/null +++ b/src/main/scala/encry/view/history/HistoryHeadersProcessorComponent.scala @@ -0,0 +1,14 @@ +package encry.view.history + +import encry.consensus.HistoryConsensus.ProgressInfo +import org.encryfoundation.common.modifiers.history.Header + +trait HistoryHeadersProcessorComponent extends HistoryAPI { + + val headersProcessor: HeadersProcessor + + trait HeadersProcessor { + def processHeader(h: Header): ProgressInfo + } + +} diff --git a/src/main/scala/encry/view/history/HistoryModifiersValidator.scala b/src/main/scala/encry/view/history/HistoryModifiersValidator.scala index 0741251609..9f2bcb645b 100644 --- a/src/main/scala/encry/view/history/HistoryModifiersValidator.scala +++ b/src/main/scala/encry/view/history/HistoryModifiersValidator.scala @@ -1,18 +1,32 @@ package encry.view.history import cats.syntax.either._ +import com.typesafe.scalalogging.StrictLogging import encry.consensus.EquihashPowScheme +import encry.settings.EncryAppSettings +import encry.utils.NetworkTimeProvider import encry.view.history.ValidationError.FatalValidationError._ import encry.view.history.ValidationError.NonFatalValidationError._ +import encry.view.history.storage.HistoryStorage import org.encryfoundation.common.modifiers.PersistentModifier -import org.encryfoundation.common.modifiers.history.{Header, Payload} -import org.encryfoundation.common.utils.TaggedTypes.{Difficulty, ModifierId} +import org.encryfoundation.common.modifiers.history.{ Header, Payload } +import org.encryfoundation.common.utils.TaggedTypes.{ Difficulty, ModifierId } import org.encryfoundation.common.validation.ModifierSemanticValidity -trait HistoryModifiersValidator extends HistoryApi { +class HistoryModifiersValidator( + val historyStorage: HistoryStorage, + val settings: EncryAppSettings, + timeProvider: NetworkTimeProvider +) extends StrictLogging + with HistoryReader { - lazy val powScheme: EquihashPowScheme = EquihashPowScheme(settings.constants.n, settings.constants.k, settings.constants.Version, - settings.constants.PreGenesisHeight, settings.constants.MaxTarget) + lazy val powScheme: EquihashPowScheme = EquihashPowScheme( + settings.constants.n, + settings.constants.k, + settings.constants.Version, + settings.constants.PreGenesisHeight, + settings.constants.MaxTarget + ) def testApplicable(modifier: PersistentModifier): Either[ValidationError, PersistentModifier] = (modifier match { @@ -20,77 +34,149 @@ trait HistoryModifiersValidator extends HistoryApi { case payload: Payload => validatePayload(payload) case mod => UnknownModifierFatalError(s"Modifier $mod has incorrect type.").asLeft[PersistentModifier] }) match { - case l@Left(value) => logger.info(s"Validation result for ${modifier.encodedId} failed cause $value"); l - case r@Right(_) => r + case l @ Left(value) => logger.info(s"Validation result for ${modifier.encodedId} failed cause $value"); l + case r @ Right(_) => r } private def validateHeader(h: Header): Either[ValidationError, Header] = if (h.isGenesis) genesisBlockHeaderValidator(h) - else getHeaderById(h.parentId) - .map(p => headerValidator(h, p)) - .getOrElse(HeaderNonFatalValidationError(s"Header's ${h.encodedId} parent doesn't contain in history").asLeft[Header]) + else + getHeaderById(h.parentId) + .map(p => headerValidator(h, p)) + .getOrElse( + HeaderNonFatalValidationError(s"Header's ${h.encodedId} parent doesn't contain in history").asLeft[Header] + ) - private def validatePayload(mod: Payload): Either[ValidationError, PersistentModifier] = getHeaderById(mod.headerId) - .map(header => payloadValidator(mod, header, blockDownloadProcessor.minimalBlockHeight)) - .getOrElse(PayloadNonFatalValidationError(s"Header for ${mod.encodedId} doesn't contain in history").asLeft[PersistentModifier]) + private def validatePayload(mod: Payload): Either[ValidationError, PersistentModifier] = + getHeaderById(mod.headerId) + .map(header => payloadValidator(mod, header, blockDownloadProcessor.minimalBlockHeight)) + .getOrElse( + PayloadNonFatalValidationError(s"Header for ${mod.encodedId} doesn't contain in history") + .asLeft[PersistentModifier] + ) private def realDifficulty(h: Header): Difficulty = Difficulty !@@ powScheme.realDifficulty(h) - private def isSemanticallyValid(modifierId: ModifierId): ModifierSemanticValidity = historyStorage - .get(validityKey(modifierId)) match { + private def isSemanticallyValid(modifierId: ModifierId): ModifierSemanticValidity = + historyStorage + .get(validityKey(modifierId)) match { case Some(mod) if mod.headOption.contains(1.toByte) => ModifierSemanticValidity.Valid case Some(mod) if mod.headOption.contains(0.toByte) => ModifierSemanticValidity.Invalid case None if isModifierDefined(modifierId) => ModifierSemanticValidity.Unknown case None => ModifierSemanticValidity.Absent - case mod => logger.error(s"Incorrect validity status: $mod") - ModifierSemanticValidity.Absent - } + case mod => + logger.error(s"Incorrect validity status: $mod") + ModifierSemanticValidity.Absent + } - private def genesisBlockHeaderValidator(h: Header): Either[ValidationError, Header] = for { - _ <- Either.cond(h.parentId.sameElements(Header.GenesisParentId), (), - GenesisBlockFatalValidationError(s"Genesis block with header ${h.encodedId} should has genesis parent id")) - _ <- Either.cond(getBestHeaderId.isEmpty, (), - GenesisBlockFatalValidationError(s"Genesis block with header ${h.encodedId} appended to non-empty history")) - _ <- Either.cond(h.height == settings.constants.GenesisHeight, (), - GenesisBlockFatalValidationError(s"Height of genesis block with header ${h.encodedId} is incorrect")) - } yield h + private def genesisBlockHeaderValidator(h: Header): Either[ValidationError, Header] = + for { + _ <- Either.cond( + h.parentId.sameElements(Header.GenesisParentId), + (), + GenesisBlockFatalValidationError(s"Genesis block with header ${h.encodedId} should has genesis parent id") + ) + _ <- Either.cond( + getBestHeaderId.isEmpty, + (), + GenesisBlockFatalValidationError(s"Genesis block with header ${h.encodedId} appended to non-empty history") + ) + _ <- Either.cond( + h.height == settings.constants.GenesisHeight, + (), + GenesisBlockFatalValidationError(s"Height of genesis block with header ${h.encodedId} is incorrect") + ) + } yield h - private def headerValidator(h: Header, parent: Header): Either[ValidationError, Header] = for { - _ <- Either.cond(h.timestamp > parent.timestamp, (), - HeaderFatalValidationError(s"Header ${h.encodedId} has timestamp ${h.timestamp}" + - s" less than parent's ${parent.timestamp}")) - _ <- Either.cond(h.height == parent.height + 1, (), - HeaderFatalValidationError(s"Header ${h.encodedId} has height ${h.height}" + - s" not greater by 1 than parent's ${parent.height}")) - _ <- Either.cond(!historyStorage.containsMod(h.id), (), - HeaderFatalValidationError(s"Header ${h.encodedId} is already in history")) - _ <- Either.cond(realDifficulty(h) >= h.requiredDifficulty, (), - HeaderFatalValidationError(s"Incorrect real difficulty in header ${h.encodedId}")) - _ <- Either.cond(requiredDifficultyAfter(parent).exists(_ <= h.difficulty), (), - HeaderFatalValidationError(s"Incorrect required difficulty in header ${h.encodedId}")) - _ <- Either.cond(heightOf(h.parentId).exists(h => getBestHeaderHeight - h < settings.constants.MaxRollbackDepth), (), - HeaderFatalValidationError(s"Header ${h.encodedId} has height greater than max roll back depth")) - powSchemeValidationResult = powScheme.verify(h) - _ <- Either.cond(powSchemeValidationResult.isRight, (), - HeaderFatalValidationError(s"Wrong proof-of-work solution in header ${h.encodedId}" + - s" caused: $powSchemeValidationResult")) - _ <- Either.cond(isSemanticallyValid(h.parentId) != ModifierSemanticValidity.Invalid, (), - HeaderFatalValidationError(s"Header ${h.encodedId} is semantically invalid")) - _ <- Either.cond(h.timestamp - timeProvider.estimatedTime <= settings.constants.MaxTimeDrift, (), - HeaderNonFatalValidationError(s"Header ${h.encodedId} with timestamp ${h.timestamp}" + - s" is too far in future from now ${timeProvider.estimatedTime}")) - } yield h + private def headerValidator(h: Header, parent: Header): Either[ValidationError, Header] = + for { + _ <- Either.cond( + h.timestamp > parent.timestamp, + (), + HeaderFatalValidationError( + s"Header ${h.encodedId} has timestamp ${h.timestamp}" + + s" less than parent's ${parent.timestamp}" + ) + ) + _ <- Either.cond( + h.height == parent.height + 1, + (), + HeaderFatalValidationError( + s"Header ${h.encodedId} has height ${h.height}" + + s" not greater by 1 than parent's ${parent.height}" + ) + ) + _ <- Either.cond( + !historyStorage.containsMod(h.id), + (), + HeaderFatalValidationError(s"Header ${h.encodedId} is already in history") + ) + _ <- Either.cond( + realDifficulty(h) >= h.requiredDifficulty, + (), + HeaderFatalValidationError(s"Incorrect real difficulty in header ${h.encodedId}") + ) + _ <- Either.cond( + requiredDifficultyAfter(parent).exists(_ <= h.difficulty), + (), + HeaderFatalValidationError(s"Incorrect required difficulty in header ${h.encodedId}") + ) + _ <- Either.cond( + heightOf(h.parentId).exists(h => getBestHeaderHeight - h < settings.constants.MaxRollbackDepth), + (), + HeaderFatalValidationError(s"Header ${h.encodedId} has height greater than max roll back depth") + ) + powSchemeValidationResult = powScheme.verify(h) + _ <- Either.cond( + powSchemeValidationResult.isRight, + (), + HeaderFatalValidationError( + s"Wrong proof-of-work solution in header ${h.encodedId}" + + s" caused: $powSchemeValidationResult" + ) + ) + _ <- Either.cond( + isSemanticallyValid(h.parentId) != ModifierSemanticValidity.Invalid, + (), + HeaderFatalValidationError(s"Header ${h.encodedId} is semantically invalid") + ) + _ <- Either.cond( + h.timestamp - timeProvider.estimatedTime <= settings.constants.MaxTimeDrift, + (), + HeaderNonFatalValidationError( + s"Header ${h.encodedId} with timestamp ${h.timestamp}" + + s" is too far in future from now ${timeProvider.estimatedTime}" + ) + ) + } yield h private def payloadValidator(m: PersistentModifier, header: Header, - minimalHeight: Int): Either[ValidationError, PersistentModifier] = for { - _ <- Either.cond(!historyStorage.containsMod(m.id), (), - PayloadFatalValidationError(s"Modifier ${m.encodedId} is already in history")) - _ <- Either.cond(header.isRelated(m), (), - PayloadFatalValidationError(s"Modifier ${m.encodedId} does not corresponds to header ${header.encodedId}")) - _ <- Either.cond(isSemanticallyValid(header.id) != ModifierSemanticValidity.Invalid, (), - PayloadFatalValidationError(s"Header ${header.encodedId} for modifier ${m.encodedId} is semantically invalid")) - _ <- Either.cond(header.height >= minimalHeight, (), - PayloadNonFatalValidationError(s"Too old modifier ${m.encodedId}: ${header.height} < $minimalHeight")) - } yield m -} \ No newline at end of file + minimalHeight: Int): Either[ValidationError, PersistentModifier] = + for { + _ <- Either.cond(!historyStorage.containsMod(m.id), + (), + PayloadFatalValidationError(s"Modifier ${m.encodedId} is already in history")) + _ <- Either.cond( + header.isRelated(m), + (), + PayloadFatalValidationError(s"Modifier ${m.encodedId} does not corresponds to header ${header.encodedId}") + ) + _ <- Either.cond( + isSemanticallyValid(header.id) != ModifierSemanticValidity.Invalid, + (), + PayloadFatalValidationError( + s"Header ${header.encodedId} for modifier ${m.encodedId} is semantically invalid" + ) + ) + _ <- Either.cond( + header.height >= minimalHeight, + (), + PayloadNonFatalValidationError(s"Too old modifier ${m.encodedId}: ${header.height} < $minimalHeight") + ) + } yield m + + override protected[view] var isFullChainSyncedVariable: Boolean = false + + override protected[view] var fastSyncInProgressVariable: Boolean = settings.node.offlineGeneration +} diff --git a/src/main/scala/encry/view/history/HistoryPayloadsFastSyncProcessorComponent.scala b/src/main/scala/encry/view/history/HistoryPayloadsFastSyncProcessorComponent.scala new file mode 100644 index 0000000000..371f4b7b85 --- /dev/null +++ b/src/main/scala/encry/view/history/HistoryPayloadsFastSyncProcessorComponent.scala @@ -0,0 +1,76 @@ +package encry.view.history + +import cats.syntax.option._ +import encry.consensus.HistoryConsensus.ProgressInfo +import encry.storage.VersionalStorage.{ StorageKey, StorageValue, StorageVersion } +import org.encryfoundation.common.modifiers.history.Payload +import org.encryfoundation.common.utils.TaggedTypes.ModifierId +import scala.annotation.tailrec +import scala.collection.immutable.HashSet + +trait HistoryPayloadsFastSyncProcessorComponent extends HistoryPayloadsProcessorComponent { + this: HistoryAPI => + + override val payloadProcessor: PayloadProcessor = new FastSyncProcessor + + override def payloadsIdsToDownload( + howMany: Int, + excluding: HashSet[ModifierId] + ): List[ModifierId] = { + @tailrec def continuation( + height: Int, + acc: List[ModifierId] + ): List[ModifierId] = + if (acc.lengthCompare(howMany) >= 0) acc + else if (height > lastAvailableManifestHeight && fastSyncInProgress) acc + else + getBestHeaderIdAtHeight(height).flatMap(getHeaderById) match { + case Some(h) if !excluding.exists(_.sameElements(h.payloadId)) && !isBlockDefined(h) => + continuation(height + 1, acc :+ h.payloadId) + case Some(_) => + continuation(height + 1, acc) + case None => + acc + } + + (for { + bestBlockId <- getBestBlockId + headerLinkedToBestBlock <- getHeaderById(bestBlockId) + } yield headerLinkedToBestBlock) match { + case _ if !isHeadersChainSynced => + List.empty + case Some(header) if isInBestChain(header) => + continuation(header.height + 1, List.empty) + case Some(header) => + lastBestBlockHeightRelevantToBestChain(header.height) + .map(height => continuation(height + 1, List.empty)) + .getOrElse(continuation(blockDownloadProcessor.minimalBlockHeightVar, List.empty)) + case None => + continuation(blockDownloadProcessor.minimalBlockHeightVar, List.empty) + } + } + + class FastSyncProcessor extends PayloadProcessor { + + override def processPayload(payload: Payload): ProgressInfo = { + logger.info(s"Start processing payload ${payload.encodedId} in fast sync node mod.") + val startTime: Long = System.currentTimeMillis() + getBlockByPayload(payload).foreach { block => + logger.info(s"Block exists.") + historyStorage.bulkInsert(payload.id, Seq(BestBlockKey -> payload.headerId), Seq(payload)) + blockDownloadProcessor.updateBestBlock(block.header) + logger.info(s"BlockDownloadProcessor was updated by block at height ${block.header.height} successfully.") + historyStorage.insert( + StorageVersion @@ validityKey(block.payload.id).untag(StorageKey), + List(block.header.id, block.payload.id).map(id => validityKey(id) -> StorageValue @@ Array(1.toByte)) + ) + logger.info( + s"Finished processing block ${block.encodedId} in fast sync node mod. " + + s"Processing time is ${(System.currentTimeMillis() - startTime) / 1000} s." + ) + } + ProgressInfo(none, Seq.empty, Seq.empty, none) + } + } + +} diff --git a/src/main/scala/encry/view/history/HistoryPayloadsNormalSyncProcessorComponent.scala b/src/main/scala/encry/view/history/HistoryPayloadsNormalSyncProcessorComponent.scala new file mode 100644 index 0000000000..80d4f25a0d --- /dev/null +++ b/src/main/scala/encry/view/history/HistoryPayloadsNormalSyncProcessorComponent.scala @@ -0,0 +1,185 @@ +package encry.view.history + +import cats.syntax.either._ +import cats.syntax.option._ +import encry.consensus.HistoryConsensus.ProgressInfo +import encry.modifiers.history.HeaderChain +import org.encryfoundation.common.modifiers.PersistentModifier +import org.encryfoundation.common.modifiers.history.{ Block, Header, Payload } +import org.encryfoundation.common.utils.TaggedTypes.{ Height, ModifierId } +import scala.annotation.tailrec +import scala.collection.immutable.HashSet + +trait HistoryPayloadsNormalSyncProcessorComponent extends HistoryPayloadsProcessorComponent { + this: HistoryAPI => + + override val payloadProcessor: PayloadProcessor = new NormalSyncProcessor + + override def payloadsIdsToDownload( + howMany: Int, + excluding: HashSet[ModifierId] + ): List[ModifierId] = { + @tailrec def continuation( + height: Int, + acc: List[ModifierId] + ): List[ModifierId] = + if (acc.lengthCompare(howMany) >= 0) acc + else + getBestHeaderIdAtHeight(height).flatMap(getHeaderById) match { + case Some(h) if !excluding.exists(_.sameElements(h.payloadId)) && !isBlockDefined(h) => + continuation(height + 1, acc :+ h.payloadId) + case Some(_) => + continuation(height + 1, acc) + case None => + acc + } + + (for { + bestBlockId <- getBestBlockId + headerLinkedToBestBlock <- getHeaderById(bestBlockId) + } yield headerLinkedToBestBlock) match { + case _ if !isHeadersChainSynced => + List.empty + case Some(header) if isInBestChain(header) => + continuation(header.height + 1, List.empty) + case Some(header) => + lastBestBlockHeightRelevantToBestChain(header.height) + .map(height => continuation(height + 1, List.empty)) + .getOrElse(continuation(blockDownloadProcessor.minimalBlockHeightVar, List.empty)) + case None => + continuation(blockDownloadProcessor.minimalBlockHeightVar, List.empty) + } + } + + class NormalSyncProcessor extends PayloadProcessor { + + override def processPayload(payload: Payload): ProgressInfo = { + logger.info(s"Start processing payload ${payload.encodedId} in normal node mod.") + getBlockByPayload(payload).map { block => + logger.info(s"Processing block ${block.header.encodedId}!") + processBlock(block) + }.getOrElse(putToHistory(payload)) + } + + private def processBlock(blockToProcess: Block): ProgressInfo = { + logger.info( + s"Starting processing block to history ||${blockToProcess.encodedId}||${blockToProcess.header.height}||" + ) + val bestFullChain: Seq[Block] = calculateBestFullChain(blockToProcess) + addBlockToCacheIfNecessary(blockToProcess) + bestFullChain.lastOption.map(_.header) match { + case Some(header) if isValidFirstBlock(blockToProcess.header) => + processValidFirstBlock(blockToProcess, header, bestFullChain) + case Some(header) if isBestBlockDefined && isBetterChain(header.id) => + processBetterChain(blockToProcess, header, Seq.empty, settings.node.blocksToKeep) + case Some(_) => + nonBestBlock(blockToProcess) + case None => + logger.debug(s"Best full chain is empty. Returning empty progress info") + ProgressInfo(none, Seq.empty, Seq.empty, none) + } + } + + private def processValidFirstBlock(fullBlock: Block, + newBestHeader: Header, + newBestChain: Seq[Block]): ProgressInfo = { + logger.info(s"Appending ${fullBlock.encodedId} as a valid first block with height ${fullBlock.header.height}") + updateStorage(fullBlock.payload, newBestHeader.id) + ProgressInfo(none, Seq.empty, newBestChain, none) + } + + private def processBetterChain(fullBlock: Block, + newBestHeader: Header, + newBestChain: Seq[Block], + blocksToKeep: Int): ProgressInfo = + getHeaderOfBestBlock.map { header => + val (prevChain: HeaderChain, newChain: HeaderChain) = commonBlockThenSuffixes(header, newBestHeader) + val toRemove: Seq[Block] = prevChain.tail.headers + .flatMap(getBlockByHeader) + val toApply: Seq[Block] = newChain.tail.headers + .flatMap(h => if (h == fullBlock.header) fullBlock.some else getBlockByHeader(h)) + toApply.foreach(addBlockToCacheIfNecessary) + if (toApply.lengthCompare(newChain.length - 1) != 0) nonBestBlock(fullBlock) + else { + //application of this block leads to full chain with higher score + logger.info(s"Appending ${fullBlock.encodedId}|${fullBlock.header.height} as a better chain") + val branchPoint: Option[ModifierId] = toRemove.headOption.map(_ => prevChain.head.id) + val bestHeaderHeight: Int = getBestHeaderHeight + val updateBestHeader: Boolean = + (fullBlock.header.height > bestHeaderHeight) || ( + (fullBlock.header.height == bestHeaderHeight) && + scoreOf(fullBlock.id) + .flatMap(fbScore => getBestHeaderId.flatMap(id => scoreOf(id).map(_ < fbScore))) + .getOrElse(false) + ) + val updatedHeadersAtHeightIds = + newChain.headers.map(header => updatedBestHeaderAtHeightRaw(header.id, Height @@ header.height)).toList + updateStorage(fullBlock.payload, newBestHeader.id, updateBestHeader, updatedHeadersAtHeightIds) + if (blocksToKeep >= 0) { + val lastKept: Int = blockDownloadProcessor.updateBestBlock(fullBlock.header) + val bestHeight: Int = toApply.lastOption.map(_.header.height).getOrElse(0) + val diff: Int = bestHeight - header.height + clipBlockDataAt(((lastKept - diff) until lastKept).filter(_ >= 0)) + } + ProgressInfo(branchPoint, toRemove, toApply, none) + } + }.getOrElse(ProgressInfo(none, Seq.empty, Seq.empty, none)) + + private def nonBestBlock(fullBlock: Block): ProgressInfo = { + //Orphaned block or full chain is not initialized yet + logger.info(s"Process block to history ${fullBlock.encodedId}||${fullBlock.header.height}||") + historyStorage.bulkInsert(fullBlock.payload.id, Seq.empty, Seq(fullBlock.payload)) + ProgressInfo(none, Seq.empty, Seq.empty, none) + } + + private def updatedBestHeaderAtHeightRaw(headerId: ModifierId, height: Height): (Array[Byte], Array[Byte]) = + heightIdsKey(height) -> + (Seq(headerId) ++ + headerIdsAtHeight(height).filterNot(_ sameElements headerId)).flatten.toArray + + private def putToHistory(payload: Payload): ProgressInfo = { + historyStorage.insertObjects(Seq(payload)) + ProgressInfo(none, Seq.empty, Seq.empty, none) + } + + private def isBetterChain(id: ModifierId): Boolean = + (for { + bestFullBlockId <- getBestBlockId + heightOfThisHeader <- getHeightByHeaderId(id) + prevBestScore <- scoreOf(bestFullBlockId) + score <- scoreOf(id) + bestBlockHeight = getBestBlockHeight + } yield + (bestBlockHeight < heightOfThisHeader) || (bestBlockHeight == heightOfThisHeader && score > prevBestScore)) + .getOrElse(false) + + private def calculateBestFullChain(block: Block): Seq[Block] = { + val continuations: Seq[Seq[Header]] = continuationHeaderChains(block.header, h => isBlockDefined(h)).map(_.tail) + logger.debug(s"continuations: ${continuations.map(seq => s"Seq contains: ${seq.length}").mkString(",")}") + val chains: Seq[Seq[Block]] = continuations.map(_.filter(isBlockDefined).flatMap(getBlockByHeader)) + logger.debug(s"Chains: ${chains.map(chain => s"chain contain: ${chain.length}").mkString(",")}") + chains.map(c => block +: c).maxBy(c => scoreOf(c.last.id).get) + } + + private def clipBlockDataAt(heights: Seq[Int]): Either[Throwable, Unit] = Either.catchNonFatal { + val toRemove: Seq[ModifierId] = heights + .flatMap(headerIdsAtHeight) + .flatMap(getHeaderById) + .map(_.payloadId) + historyStorage.removeObjects(toRemove) + } + + private def updateStorage(newModRow: PersistentModifier, + bestFullHeaderId: ModifierId, + updateHeaderInfo: Boolean = false, + additionalIndexes: List[(Array[Byte], Array[Byte])] = List.empty): Unit = { + val indicesToInsert: Seq[(Array[Byte], Array[Byte])] = + if (updateHeaderInfo) Seq(BestBlockKey -> bestFullHeaderId, BestHeaderKey -> bestFullHeaderId) + else Seq(BestBlockKey -> bestFullHeaderId) + historyStorage.bulkInsert(newModRow.id, indicesToInsert ++ additionalIndexes, Seq(newModRow)) + } + + private def isValidFirstBlock(header: Header): Boolean = + header.height == blockDownloadProcessor.minimalBlockHeight && getBestBlockId.isEmpty + } +} diff --git a/src/main/scala/encry/view/history/HistoryPayloadsProcessor.scala b/src/main/scala/encry/view/history/HistoryPayloadsProcessor.scala deleted file mode 100644 index ecdc4be798..0000000000 --- a/src/main/scala/encry/view/history/HistoryPayloadsProcessor.scala +++ /dev/null @@ -1,138 +0,0 @@ -package encry.view.history - -import cats.syntax.either._ -import cats.syntax.option._ -import encry.consensus.HistoryConsensus.ProgressInfo -import encry.modifiers.history.HeaderChain -import org.encryfoundation.common.modifiers.PersistentModifier -import org.encryfoundation.common.modifiers.history.{ Block, Header, Payload } -import org.encryfoundation.common.utils.TaggedTypes.{ Height, ModifierId } - -trait HistoryPayloadsProcessor extends HistoryApi { - - def processPayload(payload: Payload): ProgressInfo = - getBlockByPayload(payload).flatMap { block => - logger.info(s"proc block ${block.header.encodedId}!") - processBlock(block).some - }.getOrElse(putToHistory(payload)) - - private def processBlock(blockToProcess: Block): ProgressInfo = { - logger.info( - s"Starting processing block to history ||${blockToProcess.encodedId}||${blockToProcess.header.height}||" - ) - val bestFullChain: Seq[Block] = calculateBestFullChain(blockToProcess) - addBlockToCacheIfNecessary(blockToProcess) - bestFullChain.lastOption.map(_.header) match { - case Some(header) if isValidFirstBlock(blockToProcess.header) => - processValidFirstBlock(blockToProcess, header, bestFullChain) - case Some(header) if isBestBlockDefined && isBetterChain(header.id) => - processBetterChain(blockToProcess, header, Seq.empty, settings.node.blocksToKeep) - case Some(_) => - nonBestBlock(blockToProcess) - case None => - logger.debug(s"Best full chain is empty. Returning empty progress info") - ProgressInfo(none, Seq.empty, Seq.empty, none) - } - } - - private def processValidFirstBlock(fullBlock: Block, - newBestHeader: Header, - newBestChain: Seq[Block]): ProgressInfo = { - logger.info(s"Appending ${fullBlock.encodedId} as a valid first block with height ${fullBlock.header.height}") - updateStorage(fullBlock.payload, newBestHeader.id) - ProgressInfo(none, Seq.empty, newBestChain, none) - } - - private def processBetterChain(fullBlock: Block, - newBestHeader: Header, - newBestChain: Seq[Block], - blocksToKeep: Int): ProgressInfo = - getHeaderOfBestBlock.map { header => - val (prevChain: HeaderChain, newChain: HeaderChain) = commonBlockThenSuffixes(header, newBestHeader) - val toRemove: Seq[Block] = prevChain.tail.headers - .flatMap(getBlockByHeader) - val toApply: Seq[Block] = newChain.tail.headers - .flatMap(h => if (h == fullBlock.header) fullBlock.some else getBlockByHeader(h)) - toApply.foreach(addBlockToCacheIfNecessary) - if (toApply.lengthCompare(newChain.length - 1) != 0) nonBestBlock(fullBlock) - else { - //application of this block leads to full chain with higher score - logger.info(s"Appending ${fullBlock.encodedId}|${fullBlock.header.height} as a better chain") - val branchPoint: Option[ModifierId] = toRemove.headOption.map(_ => prevChain.head.id) - val bestHeaderHeight: Int = getBestHeaderHeight - val updateBestHeader: Boolean = - (fullBlock.header.height > bestHeaderHeight) || ( - (fullBlock.header.height == bestHeaderHeight) && - scoreOf(fullBlock.id) - .flatMap(fbScore => getBestHeaderId.flatMap(id => scoreOf(id).map(_ < fbScore))) - .getOrElse(false) - ) - val updatedHeadersAtHeightIds = - newChain.headers.map(header => updatedBestHeaderAtHeightRaw(header.id, Height @@ header.height)).toList - updateStorage(fullBlock.payload, newBestHeader.id, updateBestHeader, updatedHeadersAtHeightIds) - if (blocksToKeep >= 0) { - val lastKept: Int = blockDownloadProcessor.updateBestBlock(fullBlock.header) - val bestHeight: Int = toApply.lastOption.map(_.header.height).getOrElse(0) - val diff: Int = bestHeight - header.height - clipBlockDataAt(((lastKept - diff) until lastKept).filter(_ >= 0)) - } - ProgressInfo(branchPoint, toRemove, toApply, none) - } - }.getOrElse(ProgressInfo(none, Seq.empty, Seq.empty, none)) - - private def nonBestBlock(fullBlock: Block): ProgressInfo = { - //Orphaned block or full chain is not initialized yet - logger.info(s"Process block to history ${fullBlock.encodedId}||${fullBlock.header.height}||") - historyStorage.bulkInsert(fullBlock.payload.id, Seq.empty, Seq(fullBlock.payload)) - ProgressInfo(none, Seq.empty, Seq.empty, none) - } - - private def updatedBestHeaderAtHeightRaw(headerId: ModifierId, height: Height): (Array[Byte], Array[Byte]) = - heightIdsKey(height) -> - (Seq(headerId) ++ - headerIdsAtHeight(height).filterNot(_ sameElements headerId)).flatten.toArray - - private def putToHistory(payload: Payload): ProgressInfo = { - historyStorage.insertObjects(Seq(payload)) - ProgressInfo(none, Seq.empty, Seq.empty, none) - } - - private def isBetterChain(id: ModifierId): Boolean = - (for { - bestFullBlockId <- getBestBlockId - heightOfThisHeader <- getHeightByHeaderId(id) - prevBestScore <- scoreOf(bestFullBlockId) - score <- scoreOf(id) - bestBlockHeight = getBestBlockHeight - } yield (bestBlockHeight < heightOfThisHeader) || (bestBlockHeight == heightOfThisHeader && score > prevBestScore)) - .getOrElse(false) - - private def calculateBestFullChain(block: Block): Seq[Block] = { - val continuations: Seq[Seq[Header]] = continuationHeaderChains(block.header, h => isBlockDefined(h)).map(_.tail) - logger.debug(s"continuations: ${continuations.map(seq => s"Seq contains: ${seq.length}").mkString(",")}") - val chains: Seq[Seq[Block]] = continuations.map(_.filter(isBlockDefined).flatMap(getBlockByHeader)) - logger.debug(s"Chains: ${chains.map(chain => s"chain contain: ${chain.length}").mkString(",")}") - chains.map(c => block +: c).maxBy(c => scoreOf(c.last.id).get) - } - - private def clipBlockDataAt(heights: Seq[Int]): Either[Throwable, Unit] = Either.catchNonFatal { - val toRemove: Seq[ModifierId] = heights - .flatMap(headerIdsAtHeight) - .flatMap(getHeaderById) - .map(_.payloadId) - historyStorage.removeObjects(toRemove) - } - - private def updateStorage(newModRow: PersistentModifier, - bestFullHeaderId: ModifierId, - updateHeaderInfo: Boolean = false, - additionalIndexes: List[(Array[Byte], Array[Byte])] = List.empty): Unit = { - val indicesToInsert: Seq[(Array[Byte], Array[Byte])] = - if (updateHeaderInfo) Seq(BestBlockKey -> bestFullHeaderId, BestHeaderKey -> bestFullHeaderId) - else Seq(BestBlockKey -> bestFullHeaderId) - historyStorage.bulkInsert(newModRow.id, indicesToInsert ++ additionalIndexes, Seq(newModRow)) - } - - private def isValidFirstBlock(header: Header): Boolean = - header.height == blockDownloadProcessor.minimalBlockHeight && getBestBlockId.isEmpty -} diff --git a/src/main/scala/encry/view/history/HistoryPayloadsProcessorComponent.scala b/src/main/scala/encry/view/history/HistoryPayloadsProcessorComponent.scala new file mode 100644 index 0000000000..c34808beb7 --- /dev/null +++ b/src/main/scala/encry/view/history/HistoryPayloadsProcessorComponent.scala @@ -0,0 +1,17 @@ +package encry.view.history + +import encry.consensus.HistoryConsensus.ProgressInfo +import org.encryfoundation.common.modifiers.history.Payload +import org.encryfoundation.common.utils.TaggedTypes.ModifierId +import scala.collection.immutable.HashSet + +trait HistoryPayloadsProcessorComponent extends HistoryAPI { + + val payloadProcessor: PayloadProcessor + def payloadsIdsToDownload(howMany: Int, excluding: HashSet[ModifierId]): List[ModifierId] + + trait PayloadProcessor { + def processPayload(payload: Payload): ProgressInfo + } + +} diff --git a/src/main/scala/encry/view/history/HistoryReader.scala b/src/main/scala/encry/view/history/HistoryReader.scala new file mode 100644 index 0000000000..31c038fd54 --- /dev/null +++ b/src/main/scala/encry/view/history/HistoryReader.scala @@ -0,0 +1,291 @@ +package encry.view.history + +import com.google.common.primitives.Ints +import encry.consensus.PowLinearController +import encry.modifiers.history.HeaderChain +import encry.storage.VersionalStorage.StorageKey +import encry.view.history.ValidationError.HistoryApiError +import encry.view.history.utils.instances.ModifierIdWrapper +import encry.view.history.utils.syntax.wrapper._ +import io.iohk.iodb.ByteArrayWrapper +import org.encryfoundation.common.modifiers.history._ +import org.encryfoundation.common.network.SyncInfo +import org.encryfoundation.common.utils.Algos +import org.encryfoundation.common.utils.TaggedTypes.{ Difficulty, Height, ModifierId, ModifierTypeId } +import scorex.crypto.hash.Digest32 + +import scala.annotation.tailrec +import scala.collection.immutable +import scala.reflect.ClassTag + +/** + * The idea of this interface is to use it instead of the history implementation in places where + * full history functionality is excessive but some read only operations is needed. + * Also it is the frame for the history implementation. + */ +trait HistoryReader extends HistoryState { + + //todo was settings.constants.DigestLength, become settings.constants.ModifierIdSize + final lazy val BestHeaderKey: StorageKey = + StorageKey @@ Array.fill(settings.constants.ModifierIdSize)(Header.modifierTypeId.untag(ModifierTypeId)) + + //todo was settings.constants.DigestLength, become settings.constants.ModifierIdSize + final lazy val BestBlockKey: StorageKey = + StorageKey @@ Array.fill(settings.constants.ModifierIdSize)(-1: Byte) + + private final def getModifierById[T: ClassTag](id: ModifierId): Option[T] = + historyStorage + .modifierById(id) + .collect { case m: T => m } + + final def getHeaderById(id: ModifierId): Option[Header] = + headersCache + .get(id.wrap) + .orElse(blocksCache.get(id.wrap).map(_.header)) + .orElse(getModifierById[Header](id)) + + final def getBlockByHeader(header: Header): Option[Block] = + blocksCache + .get(header.id.wrap) + .orElse(getModifierById[Payload](header.payloadId).map(Block(header, _))) + + final def getBlockByHeaderId(id: ModifierId): Option[Block] = + getHeaderById(id) + .flatMap(getBlockByHeader) + + final def getBlockByPayload(payload: Payload): Option[Block] = + headersCache + .get(payload.headerId.wrap) + .map(Block(_, payload)) + .orElse(blocksCache.get(payload.headerId.wrap)) + .orElse(getHeaderById(payload.headerId).map(Block(_, payload))) + + final def getBestHeaderId: Option[ModifierId] = + historyStorage + .get(BestHeaderKey) + .map(ModifierId @@ _) + + final def getBestBlockId: Option[ModifierId] = + historyStorage + .get(BestBlockKey) + .map(ModifierId @@ _) + + final def getBestHeader: Option[Header] = + getBestHeaderId.flatMap( + (id: ModifierId) => + headersCache + .get(id.wrap) + .orElse(blocksCache.get(id.wrap).map(_.header)) + .orElse(getHeaderById(id)) + ) + + final def getBestBlock: Option[Block] = + getBestBlockId.flatMap { id: ModifierId => + blocksCache + .get(id.wrap) + .orElse(getBlockByHeaderId(id)) + } + + private def getHeightByHeaderIdDB(id: ModifierId): Option[Int] = + historyStorage + .get(headerHeightKey(id)) + .map(Ints.fromByteArray) + + final def getHeightByHeaderId(id: ModifierId): Option[Int] = + headersCache + .get(id.wrap) + .map(_.height) + .orElse( + blocksCache + .get(id.wrap) + .map(_.header.height) + ) + .orElse(getHeightByHeaderIdDB(id)) + + final def getBestHeaderHeight: Int = + getBestHeaderId.flatMap { id: ModifierId => + headersCache + .get(id.wrap) + .map(_.height) + .orElse(blocksCache.get(id.wrap).map(_.header.height)) + .orElse(getHeightByHeaderId(id)) + }.getOrElse(settings.constants.PreGenesisHeight) + + final def getBestBlockHeight: Int = + getBestBlockId.flatMap { id: ModifierId => + blocksCache.get(id.wrap).map(_.header.height).orElse(getHeightByHeaderId(id)) + }.getOrElse(settings.constants.PreGenesisHeight) + + final def getHeaderOfBestBlock: Option[Header] = + getBestBlockId.flatMap { id: ModifierId => + headersCache + .get(id.wrap) + .orElse(blocksCache.get(id.wrap).map(_.header)) + .orElse(getHeaderById(id)) + } + + final def headerIdsAtHeight(height: Int): List[ModifierId] = + historyStorage + .get(heightIdsKey(height)) + .map(_.grouped(32).map(ModifierId @@ _).toList) + .getOrElse(List.empty) + + final def getBestHeaderIdAtHeight(h: Int): Option[ModifierId] = + headerIdsAtHeight(h).headOption + + final def getBestHeaderAtHeight(h: Int): Option[Header] = + getBestHeaderIdAtHeight(h).flatMap(getHeaderById) + + final def isModifierDefined(id: ModifierId): Boolean = historyStorage.containsMod(id) + + final def isBestBlockDefined: Boolean = + getBestBlockId.map((id: ModifierId) => blocksCache.contains(id.wrap)).isDefined || + getHeaderOfBestBlock.map((h: Header) => isModifierDefined(h.payloadId)).isDefined + + final def isBlockDefined(header: Header): Boolean = + blocksCache.get(header.id.wrap).isDefined || isModifierDefined(header.payloadId) + + final def isHeaderDefined(id: ModifierId): Boolean = + headersCache.get(id.wrap).isDefined || + blocksCache.get(id.wrap).isDefined || + isModifierDefined(id) + + final def modifierBytesById(id: ModifierId): Option[Array[Byte]] = + headersCache + .get(id.wrap) + .map(HeaderProtoSerializer.toProto(_).toByteArray) + .orElse(blocksCache.get(id.wrap).map(BlockProtoSerializer.toProto(_).toByteArray)) + .orElse(historyStorage.modifiersBytesById(id)) + + final def heightOf(id: ModifierId): Option[Height] = + historyStorage + .get(headerHeightKey(id)) + .map(d => Height @@ Ints.fromByteArray(d)) + + final def isInBestChain(h: Header): Boolean = + getBestHeaderIdAtHeight(h.height) + .exists(_.sameElements(h.id)) + + final def isInBestChain(id: ModifierId): Boolean = + heightOf(id) + .flatMap(getBestHeaderIdAtHeight) + .exists(_.sameElements(id)) + + final def scoreOf(id: ModifierId): Option[BigInt] = + historyStorage + .get(headerScoreKey(id)) + .map(d => BigInt(d)) + + //todo is getOrElse(BigInt(0)) correct? + final def getBestHeadersChainScore: BigInt = getBestHeaderId.flatMap(scoreOf).getOrElse(BigInt(0)) + + final def lastBestBlockHeightRelevantToBestChain(probablyAt: Int): Option[Int] = + (for { + headerId <- getBestHeaderIdAtHeight(probablyAt) + header <- getHeaderById(headerId) if isModifierDefined(header.payloadId) + } yield header.height).orElse(lastBestBlockHeightRelevantToBestChain(probablyAt - 1)) + + protected[view] def calculateNewSyncInfo(): Unit = + lastSyncInfoVariable = SyncInfo(getBestHeader.map { header: Header => + ((header.height - settings.network.maxInvObjects + 1) to header.height).flatMap { height: Int => + headerIdsAtHeight(height).headOption + }.toList + }.getOrElse(List.empty)) + + protected[history] final def headerChainBack( + limit: Int, + startHeader: Header, + until: Header => Boolean + ): HeaderChain = { + @tailrec def loop( + header: Header, + acc: List[Header] + ): List[Header] = + if (acc.length == limit || until(header)) acc + else + getHeaderById(header.parentId) match { + case Some(parent: Header) => loop(parent, acc :+ parent) + case None if acc.contains(header) => acc + case _ => acc :+ header + } + + if (getBestHeaderId.isEmpty || (limit == 0)) HeaderChain(List.empty) + else HeaderChain(loop(startHeader, List(startHeader)).reverse) + } + + final def requiredDifficultyAfter(parent: Header): Either[HistoryApiError, Difficulty] = { + val requiredHeights: IndexedSeq[Height] = PowLinearController + .getHeightsForRetargetingAt( + Height @@ (parent.height + 1), + settings.constants.EpochLength, + settings.constants.RetargetingEpochsQty + ) + .toIndexedSeq + for { + _ <- Either.cond( + requiredHeights.lastOption.contains(parent.height), + (), + HistoryApiError("Incorrect required heights sequence in requiredDifficultyAfter function.") + ) + chain: HeaderChain = headerChainBack(requiredHeights.max - requiredHeights.min + 1, parent, (_: Header) => false) + requiredHeaders: immutable.IndexedSeq[(Int, Header)] = (requiredHeights.min to requiredHeights.max) + .zip(chain.headers) + .filter(p => requiredHeights.contains(p._1)) + _ <- Either.cond( + requiredHeights.length == requiredHeaders.length, + (), + HistoryApiError(s"Missed headers: $requiredHeights != ${requiredHeaders.map(_._1)}.") + ) + } yield + PowLinearController.getDifficulty( + requiredHeaders, + settings.constants.EpochLength, + settings.constants.DesiredBlockInterval, + settings.constants.InitialDifficulty + ) + } + + final def heightIdsKey(height: Int): StorageKey = + StorageKey @@ Algos.hash(Ints.toByteArray(height)).untag(Digest32) + + final def headerScoreKey(id: ModifierId): StorageKey = + StorageKey @@ Algos.hash("score".getBytes(Algos.charset) ++ id).untag(Digest32) + + final def headerHeightKey(id: ModifierId): StorageKey = + StorageKey @@ Algos.hash("height".getBytes(Algos.charset) ++ id).untag(Digest32) + + final def validityKey(id: Array[Byte]): StorageKey = + StorageKey @@ Algos.hash("validity".getBytes(Algos.charset) ++ id).untag(Digest32) + + protected[history] final def addHeaderToCacheIfNecessary(h: Header): Unit = + if (h.height >= getBestHeaderHeight - settings.constants.MaxRollbackDepth) { + val newHeadersIdsAtHeaderHeight: List[ModifierId] = + headersCacheIndexes.getOrElse(h.height, List.empty[ModifierId]) :+ h.id + headersCacheIndexes = headersCacheIndexes + (h.height -> newHeadersIdsAtHeaderHeight) + headersCache = headersCache + (h.id.wrap -> h) + if (headersCacheIndexes.size > settings.constants.MaxRollbackDepth) { + headersCacheIndexes.get(getBestHeaderHeight - settings.constants.MaxRollbackDepth).foreach { + headersIds: List[ModifierId] => + val wrappedIds = headersIds.map(_.wrap) + headersCache = headersCache.filterNot { case (id, _) => wrappedIds.contains(id) } + } + headersCacheIndexes = headersCacheIndexes - (getBestHeaderHeight - settings.constants.MaxRollbackDepth) + } + } + + protected[history] final def addBlockToCacheIfNecessary(b: Block): Unit = + if (!blocksCache.contains(b.id.wrap) && (b.header.height >= getBestBlockHeight - settings.constants.MaxRollbackDepth)) { + val newBlocksIdsAtBlockHeight: List[ModifierId] = + blocksCacheIndexes.getOrElse(b.header.height, List.empty[ModifierId]) :+ b.id + blocksCacheIndexes = blocksCacheIndexes + (b.header.height -> newBlocksIdsAtBlockHeight) + blocksCache = blocksCache + (b.id.wrap -> b) + if (blocksCacheIndexes.size > settings.constants.MaxRollbackDepth) { + blocksCacheIndexes.get(getBestBlockHeight - settings.constants.MaxRollbackDepth).foreach { + blocksIds: List[ModifierId] => + val wrappedIds: List[ByteArrayWrapper] = blocksIds.map(_.wrap) + blocksCache = blocksCache.filterNot { case (id, _) => wrappedIds.contains(id) } + } + blocksCacheIndexes = blocksCacheIndexes - (getBestBlockHeight - settings.constants.MaxRollbackDepth) + } + } +} diff --git a/src/main/scala/encry/view/history/HistoryState.scala b/src/main/scala/encry/view/history/HistoryState.scala new file mode 100644 index 0000000000..1c35c5d7db --- /dev/null +++ b/src/main/scala/encry/view/history/HistoryState.scala @@ -0,0 +1,54 @@ +package encry.view.history + +import encry.settings.EncryAppSettings +import encry.utils.NetworkTimeProvider +import encry.view.history.storage.HistoryStorage +import io.iohk.iodb.ByteArrayWrapper +import org.encryfoundation.common.modifiers.history.{Block, Header} +import org.encryfoundation.common.network.SyncInfo +import org.encryfoundation.common.utils.TaggedTypes.ModifierId + +trait HistoryState { + + protected[view] val historyStorage: HistoryStorage + + protected[view] val settings: EncryAppSettings + + protected[view] var isHeadersChainSyncedVariable: Boolean = false + + protected[view] var isFullChainSyncedVariable: Boolean + + protected[view] var lastAvailableManifestHeightVariable: Int = 0 + + protected[view] var fastSyncInProgressVariable: Boolean + + protected[history] var lastSyncInfoVariable: SyncInfo = SyncInfo(Seq.empty[ModifierId]) + + protected[history] lazy val timeProvider: NetworkTimeProvider = new NetworkTimeProvider(settings.ntp) + + protected[view] lazy val blockDownloadProcessor: BlockDownloadProcessor = + BlockDownloadProcessor(settings.node, settings.constants) + + protected[history] final var headersCacheIndexes: Map[Int, List[ModifierId]] = + Map.empty[Int, List[ModifierId]] + + protected[history] final var blocksCacheIndexes: Map[Int, List[ModifierId]] = + Map.empty[Int, List[ModifierId]] + + protected[history] final var headersCache: Map[ByteArrayWrapper, Header] = + Map.empty[ByteArrayWrapper, Header] + + protected[history] final var blocksCache: Map[ByteArrayWrapper, Block] = + Map.empty[ByteArrayWrapper, Block] + + final def isHeadersChainSynced: Boolean = isHeadersChainSyncedVariable + + final def isFullChainSynced: Boolean = isFullChainSyncedVariable + + final def getLastSyncInfo: SyncInfo = lastSyncInfoVariable + + final def lastAvailableManifestHeight: Int = lastAvailableManifestHeightVariable + + final def fastSyncInProgress: Boolean = fastSyncInProgressVariable + +} diff --git a/src/main/scala/encry/view/history/utils/Wrapper.scala b/src/main/scala/encry/view/history/utils/Wrapper.scala new file mode 100644 index 0000000000..c095341fe5 --- /dev/null +++ b/src/main/scala/encry/view/history/utils/Wrapper.scala @@ -0,0 +1,8 @@ +package encry.view.history.utils + +import io.iohk.iodb.ByteArrayWrapper + +trait Wrapper[T] { + def wrap(t: T): ByteArrayWrapper + def unwrap(b: ByteArrayWrapper): T +} diff --git a/src/main/scala/encry/view/history/utils/instances.scala b/src/main/scala/encry/view/history/utils/instances.scala new file mode 100644 index 0000000000..3ff0382720 --- /dev/null +++ b/src/main/scala/encry/view/history/utils/instances.scala @@ -0,0 +1,13 @@ +package encry.view.history.utils + +import io.iohk.iodb.ByteArrayWrapper +import org.encryfoundation.common.utils.TaggedTypes.ModifierId + +object instances { + + implicit val ModifierIdWrapper: Wrapper[ModifierId] = new Wrapper[ModifierId] { + override def wrap(t: ModifierId): ByteArrayWrapper = ByteArrayWrapper(t) + + override def unwrap(b: ByteArrayWrapper): ModifierId = ModifierId @@ b.data + } +} diff --git a/src/main/scala/encry/view/history/utils/syntax.scala b/src/main/scala/encry/view/history/utils/syntax.scala new file mode 100644 index 0000000000..2f404d1f55 --- /dev/null +++ b/src/main/scala/encry/view/history/utils/syntax.scala @@ -0,0 +1,16 @@ +package encry.view.history.utils + +import io.iohk.iodb.ByteArrayWrapper + +object syntax { + + object wrapper { + implicit class WrapperOps[T](val t: T) extends AnyVal { + def wrap(implicit wrapper: Wrapper[T]): ByteArrayWrapper = wrapper.wrap(t) + } + implicit class UnWrapperOps(val t: ByteArrayWrapper) extends AnyVal { + def unwrap(implicit wrapper: Wrapper[ByteArrayWrapper]): ByteArrayWrapper = wrapper.unwrap(t) + } + } + +} diff --git a/src/test/scala/encry/modifiers/InstanceFactory.scala b/src/test/scala/encry/modifiers/InstanceFactory.scala index 1636d72238..d987c1ea6b 100755 --- a/src/test/scala/encry/modifiers/InstanceFactory.scala +++ b/src/test/scala/encry/modifiers/InstanceFactory.scala @@ -6,7 +6,7 @@ import encry.modifiers.state.Keys import encry.settings.{EncryAppSettings, NodeSettings} import encry.storage.levelDb.versionalLevelDB.{LevelDbFactory, VLDBWrapper, VersionalLevelDBCompanion} import encry.utils.{EncryGenerator, FileHelper, NetworkTimeProvider, TestHelper} -import encry.view.history.{History, HistoryHeadersProcessor, HistoryPayloadsProcessor} +import encry.view.history._ import encry.view.history.storage.HistoryStorage import io.iohk.iodb.LSMStore import org.encryfoundation.common.modifiers.history.{Block, Header, Payload} @@ -157,7 +157,7 @@ trait InstanceFactory extends Keys with EncryGenerator { prevId.getOrElse(history.getBestHeader.map(_.id).getOrElse(Header.GenesisParentId)) val requiredDifficulty: Difficulty = history.getBestHeader.map(parent => history.requiredDifficultyAfter(parent).getOrElse(Difficulty @@ BigInt(0))) - .getOrElse(history.settings.constants.InitialDifficulty) + .getOrElse(settings.constants.InitialDifficulty) val txs = (if (txsQty != 0) genValidPaymentTxs(Scarand.nextInt(txsQty)) else Seq.empty) ++ Seq(coinbaseAt(history.getBestHeaderHeight + 1)) val header = genHeader.copy( @@ -229,11 +229,12 @@ trait InstanceFactory extends Keys with EncryGenerator { val ntp: NetworkTimeProvider = new NetworkTimeProvider(settingsEncry.ntp) - new History with HistoryHeadersProcessor with HistoryPayloadsProcessor { + new History with HistoryAPI with HistoryPayloadsNormalSyncProcessorComponent + with HistoryHeadersDefaultProcessorComponent { override val settings: EncryAppSettings = settingsEncry - override var isFullChainSynced = settings.node.offlineGeneration - override val historyStorage: HistoryStorage = storage - override val timeProvider: NetworkTimeProvider = ntp + override val historyStorage: HistoryStorage = storage + override var isFullChainSyncedVariable: Boolean = false + override var fastSyncInProgressVariable: Boolean = settingsEncry.node.offlineGeneration } } } \ No newline at end of file diff --git a/src/test/scala/encry/network/DeliveryManagerTests/wert.scala b/src/test/scala/encry/network/DeliveryManagerTests/wert.scala new file mode 100644 index 0000000000..c9fb7f587b --- /dev/null +++ b/src/test/scala/encry/network/DeliveryManagerTests/wert.scala @@ -0,0 +1,96 @@ +package encry.network.DeliveryManagerTests + +import akka.actor.{ Actor, ActorRef, ActorSystem, Props } +import encry.modifiers.InstanceFactory +import encry.settings.TestNetSettings +import org.scalatest.{ BeforeAndAfterAll, Matchers, OneInstancePerTest, WordSpecLike } + +class wert + extends WordSpecLike + with BeforeAndAfterAll + with Matchers + with InstanceFactory + with OneInstancePerTest + with TestNetSettings { + + implicit val system: ActorSystem = ActorSystem("SynchronousTestingSpec") + + trait ABC { + def update() = qwerty += 1 + def show = println("qwerty " + qwerty) + var qwerty = 1 + } + + class A extends Actor { + + val f = new ABC {} + + override def receive: Receive = { + case "ui" => + println("ui") + f.update() + case YUI(r) => + println("YUI") + r ! f + } + } + + case class YUI(ref: ActorRef) + + class B extends Actor { + + override def preStart(): Unit = println("qui") + + var m: Option[ABC] = None + + override def receive: Receive = { + case a: ABC => + println(s"ABC _> ABC") + m = Some(a) + case "get" => + println("get") + m.map(_.show) + } + + } + + "test" should { + "ghj" in { + val actorA = system.actorOf(Props(new A), "A") + val actorB = system.actorOf(Props(new B), "B") + + Thread.sleep(1000) + + actorB ! "get" + actorA ! YUI(actorB) + Thread.sleep(1000) + actorB ! "get" + actorA ! "ui" + actorB ! "get" +// actorA ! YUI(actorB) +// Thread.sleep(1000) +// actorB ! "get" + } + } + + + + trait QWE { + var k = 0 + def updateK = k += 1 + var a: Int + } + + class QWEIM extends QWE { + var a: Int = 1 + def update = a += 1 + def show = println("a " + a) + } + + "t" should { + "hgjfkdls" in { + val ag = new QWEIM + ag.updateK + } + } +} diff --git a/src/test/scala/encry/view/history/HistoryComparisionResultTest.scala b/src/test/scala/encry/view/history/HistoryComparisionResultTest.scala index 265dfff90f..134c1f3e87 100644 --- a/src/test/scala/encry/view/history/HistoryComparisionResultTest.scala +++ b/src/test/scala/encry/view/history/HistoryComparisionResultTest.scala @@ -22,7 +22,7 @@ class HistoryComparisionResultTest extends WordSpecLike val updatedHistory: History = blocks.foldLeft(history) { case (hst, block) => hst.append(block.header) - hst.updateIdsForSyncInfo() + hst.calculateNewSyncInfo() hst.append(block.payload) hst.reportModifierIsValid(block) } @@ -38,7 +38,7 @@ class HistoryComparisionResultTest extends WordSpecLike val updatedHistory: History = blocks.take(50).foldLeft(history) { case (hst, block) => hst.append(block.header) - hst.updateIdsForSyncInfo() + hst.calculateNewSyncInfo() hst.append(block.payload) hst.reportModifierIsValid(block) } @@ -54,7 +54,7 @@ class HistoryComparisionResultTest extends WordSpecLike val updatedHistory: History = blocks.foldLeft(history) { case (hst, block) => hst.append(block.header) - hst.updateIdsForSyncInfo() + hst.calculateNewSyncInfo() hst.append(block.payload) hst.reportModifierIsValid(block) } @@ -71,7 +71,7 @@ class HistoryComparisionResultTest extends WordSpecLike val updatedHistory: History = blocks.foldLeft(history) { case (hst, block) => hst.append(block.header) - hst.updateIdsForSyncInfo() + hst.calculateNewSyncInfo() hst.append(block.payload) hst.reportModifierIsValid(block) } @@ -91,7 +91,7 @@ class HistoryComparisionResultTest extends WordSpecLike val updatedHistory: History = fork._1.take(30).foldLeft(history) { case (hst, block) => hst.append(block.header) - hst.updateIdsForSyncInfo() + hst.calculateNewSyncInfo() hst.append(block.payload) hst.reportModifierIsValid(block) } diff --git a/src/test/scala/encry/view/history/ModifiersValidationTest.scala b/src/test/scala/encry/view/history/ModifiersValidationTest.scala index e879864ea8..aa1b6b420c 100644 --- a/src/test/scala/encry/view/history/ModifiersValidationTest.scala +++ b/src/test/scala/encry/view/history/ModifiersValidationTest.scala @@ -2,7 +2,8 @@ package encry.view.history import encry.modifiers.InstanceFactory import encry.network.DeliveryManagerTests.DMUtils.generateBlocks -import encry.settings.{EncryAppSettings, TestNetSettings} +import encry.settings.TestNetSettings +import encry.utils.NetworkTimeProvider import org.encryfoundation.common.modifiers.history.Block import org.scalatest.{Matchers, OneInstancePerTest, WordSpecLike} @@ -15,28 +16,40 @@ class ModifiersValidationTest extends WordSpecLike "Modifiers validator" should { "validate genesis block" in { val newHistory: History = generateDummyHistory(testNetSettings) + val newValidator = new HistoryModifiersValidator( + newHistory.historyStorage, + testNetSettings, + new NetworkTimeProvider(testNetSettings.ntp) + ) val genesisBlock: Block = generateGenesisBlock(testNetSettings.constants.GenesisHeight) - newHistory.testApplicable(genesisBlock.header).isRight shouldBe true + newValidator.testApplicable(genesisBlock.header).isRight shouldBe true newHistory.append(genesisBlock.header) - val updatedHistory: History = newHistory.reportModifierIsValid(genesisBlock.header) - updatedHistory.testApplicable(genesisBlock.payload).isRight shouldBe true + val h: History = newHistory.reportModifierIsValid(genesisBlock.header) + println(h.blockDownloadProcessor.minimalBlockHeight) + println(newValidator.blockDownloadProcessor.minimalBlockHeight) + newValidator.testApplicable(genesisBlock.payload).isRight shouldBe true } "reject incorrect modifiers" in { val blocks: List[Block] = generateBlocks(2, generateDummyHistory(testNetSettings))._2 val newHistory: History = generateDummyHistory(testNetSettings) + val newValidator = new HistoryModifiersValidator( + newHistory.historyStorage, + testNetSettings, + new NetworkTimeProvider(testNetSettings.ntp) + ) blocks.take(1).foldLeft(newHistory) { case (history, block) => - history.testApplicable(block.header).isRight shouldBe true + newValidator.testApplicable(block.header).isRight shouldBe true history.append(block.header) history.reportModifierIsValid(block.header) - history.testApplicable(block.payload).isRight shouldBe true + newValidator.testApplicable(block.payload).isRight shouldBe true history.append(block.payload) history.reportModifierIsValid(block) } blocks.takeRight(1).foldLeft(newHistory) { case (history, block) => - history.testApplicable(block.header).isRight shouldBe false + newValidator.testApplicable(block.header).isRight shouldBe false history.append(block.header) history.reportModifierIsValid(block.header) - history.testApplicable(block.payload).isRight shouldBe true + newValidator.testApplicable(block.payload).isRight shouldBe true history.append(block.payload) history.reportModifierIsValid(block) }