diff --git a/src/MiningCore.Tests/Blockchain/Bitcoin/BitcoinJobTests.cs b/src/MiningCore.Tests/Blockchain/Bitcoin/BitcoinJobTests.cs index 2282c2ea..399f50a2 100644 --- a/src/MiningCore.Tests/Blockchain/Bitcoin/BitcoinJobTests.cs +++ b/src/MiningCore.Tests/Blockchain/Bitcoin/BitcoinJobTests.cs @@ -1,4 +1,5 @@ using System; +using System.Net; using MiningCore.Blockchain.Bitcoin; using MiningCore.Configuration; using MiningCore.Crypto; @@ -7,6 +8,7 @@ using MiningCore.Extensions; using MiningCore.Stratum; using MiningCore.Tests.Util; +using MiningCore.Time; using NBitcoin; using Newtonsoft.Json; using Xunit; @@ -25,7 +27,7 @@ public class BitcoinJobTests : TestBase [Fact] public void BitcoinJob_Should_Accept_Valid_Share() { - var worker = new StratumClient(); + var worker = new StratumClient(new StandardClock(), new IPEndPoint(IPAddress.Any, 3000), string.Empty); worker.SetContext(new BitcoinWorkerContext { @@ -61,9 +63,9 @@ public void BitcoinJob_Should_Accept_Valid_Share() [Fact] public void BitcoinJob_Should_Not_Accept_Invalid_Share() { - var worker = new StratumClient(); + var worker = new StratumClient(new StandardClock(), new IPEndPoint(IPAddress.Any, 3000), string.Empty); - worker.SetContext(new BitcoinWorkerContext + worker.SetContext(new BitcoinWorkerContext { Difficulty = 0.5, ExtraNonce1 = "01000058", diff --git a/src/MiningCore.Tests/Blockchain/Monero/MoneroJobTests.cs b/src/MiningCore.Tests/Blockchain/Monero/MoneroJobTests.cs index 39f2ac5e..6d440623 100644 --- a/src/MiningCore.Tests/Blockchain/Monero/MoneroJobTests.cs +++ b/src/MiningCore.Tests/Blockchain/Monero/MoneroJobTests.cs @@ -1,7 +1,9 @@ -using MiningCore.Blockchain.Monero; +using System.Net; +using MiningCore.Blockchain.Monero; using MiningCore.Configuration; using MiningCore.Extensions; using MiningCore.Stratum; +using MiningCore.Time; using Newtonsoft.Json; using Xunit; @@ -15,9 +17,9 @@ public class BitcoinJobTests : TestBase [Fact] public void MoneroJob_Should_Accept_Valid_Share() { - var worker = new StratumClient(); + var worker = new StratumClient(new StandardClock(), new IPEndPoint(IPAddress.Any, 3000), string.Empty); - worker.SetContext(new MoneroWorkerContext + worker.SetContext(new MoneroWorkerContext { Difficulty = 1000, }); @@ -39,9 +41,9 @@ public void MoneroJob_Should_Accept_Valid_Share() [Fact] public void MoneroJob_Should_Not_Accept_Invalid_Share() { - var worker = new StratumClient(); + var worker = new StratumClient(new StandardClock(), new IPEndPoint(IPAddress.Any, 3000), string.Empty); - worker.SetContext(new MoneroWorkerContext + worker.SetContext(new MoneroWorkerContext { Difficulty = 1000, }); diff --git a/src/MiningCore/Blockchain/Bitcoin/BitcoinProperties.cs b/src/MiningCore/Blockchain/Bitcoin/BitcoinProperties.cs index 40e4a661..35aa4a81 100644 --- a/src/MiningCore/Blockchain/Bitcoin/BitcoinProperties.cs +++ b/src/MiningCore/Blockchain/Bitcoin/BitcoinProperties.cs @@ -148,6 +148,7 @@ private static BitcoinCoinProperties GetDigiByteProperties(string algorithm) switch(algorithm.ToLower()) { case "sha256d": + case "sha256": return sha256Coin; case "skein": diff --git a/src/MiningCore/Blockchain/CoinMetaData.cs b/src/MiningCore/Blockchain/CoinMetaData.cs index f97494bb..e3ea96b1 100644 --- a/src/MiningCore/Blockchain/CoinMetaData.cs +++ b/src/MiningCore/Blockchain/CoinMetaData.cs @@ -28,6 +28,7 @@ public static class CoinMetaData { CoinType.XMR, new Dictionary { { string.Empty, $"https://chainradar.com/xmr/block/{BlockHeightPH}" }}}, { CoinType.ETN, new Dictionary { { string.Empty, $"https://blockexplorer.electroneum.com/block/{BlockHeightPH}" } }}, { CoinType.LTC, new Dictionary { { string.Empty, $"https://chainz.cryptoid.info/ltc/block.dws?{BlockHeightPH}.htm" } }}, + { CoinType.PPC, new Dictionary { { string.Empty, $"https://chainz.cryptoid.info/ppc/block.dws?{BlockHeightPH}.htm" } }}, { CoinType.BCH, new Dictionary { { string.Empty, $"https://www.blocktrail.com/BCC/block/{BlockHeightPH}" }}}, { CoinType.DASH, new Dictionary { { string.Empty, $"https://chainz.cryptoid.info/dash/block.dws?{BlockHeightPH}.htm" }}}, { CoinType.BTC, new Dictionary { { string.Empty, $"https://blockchain.info/block/{BlockHeightPH}" }}}, @@ -58,6 +59,7 @@ public static class CoinMetaData { CoinType.ETH, "https://etherscan.io/tx/{0}" }, { CoinType.ETC, "https://gastracker.io/tx/{0}" }, { CoinType.LTC, "https://chainz.cryptoid.info/ltc/tx.dws?{0}.htm" }, + { CoinType.PPC, "https://chainz.cryptoid.info/ppc/tx.dws?{0}.htm" }, { CoinType.BCH, "https://www.blocktrail.com/BCC/tx/{0}" }, { CoinType.DASH, "https://chainz.cryptoid.info/dash/tx.dws?{0}.htm" }, { CoinType.BTC, "https://blockchain.info/tx/{0}" }, @@ -88,6 +90,7 @@ public static class CoinMetaData { CoinType.ETH, "https://etherscan.io/address/{0}" }, { CoinType.ETC, "https://gastracker.io/addr/{0}" }, { CoinType.LTC, "https://chainz.cryptoid.info/ltc/address.dws?{0}.htm" }, + { CoinType.PPC, "https://chainz.cryptoid.info/ppc/address.dws?{0}.htm" }, { CoinType.BCH, "https://www.blocktrail.com/BCC/address/{0}" }, { CoinType.DASH, "https://chainz.cryptoid.info/dash/address.dws?{0}.htm" }, { CoinType.BTC, "https://blockchain.info/address/{0}" }, @@ -120,6 +123,7 @@ public static class CoinMetaData { CoinType.ETH, (coin, alg)=> Ethash }, { CoinType.ETC, (coin, alg)=> Ethash }, { CoinType.LTC, BitcoinProperties.GetAlgorithm }, + { CoinType.PPC, BitcoinProperties.GetAlgorithm }, { CoinType.BCH, BitcoinProperties.GetAlgorithm }, { CoinType.DASH, BitcoinProperties.GetAlgorithm }, { CoinType.BTC, BitcoinProperties.GetAlgorithm }, diff --git a/src/MiningCore/Configuration/ClusterConfig.cs b/src/MiningCore/Configuration/ClusterConfig.cs index 523dbaaf..fac144e8 100644 --- a/src/MiningCore/Configuration/ClusterConfig.cs +++ b/src/MiningCore/Configuration/ClusterConfig.cs @@ -116,6 +116,18 @@ public class PoolEndpoint public string Name { get; set; } public double Difficulty { get; set; } public VarDiffConfig VarDiff { get; set; } + + /// + /// Enable Transport layer security (TLS) + /// If set to true, you must specify values for either TlsPemFile or TlsPfxFile + /// If TlsPemFile does not include the private key, TlsKeyFile is also required + /// + public bool Tls { get; set; } + + /// + /// PKCS certificate file + /// + public string TlsPfxFile { get; set; } } public partial class VarDiffConfig diff --git a/src/MiningCore/Configuration/ClusterConfigValidation.cs b/src/MiningCore/Configuration/ClusterConfigValidation.cs index c8db2c09..0ece5314 100644 --- a/src/MiningCore/Configuration/ClusterConfigValidation.cs +++ b/src/MiningCore/Configuration/ClusterConfigValidation.cs @@ -18,8 +18,11 @@ portions of the Software. SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +using System; +using System.IO; using FluentValidation; using System.Linq; +using System.Security.Cryptography.X509Certificates; using FluentValidation.Attributes; namespace MiningCore.Configuration @@ -100,6 +103,33 @@ public PoolEndpointValidator() .GreaterThan(0) .WithMessage("Pool Endpoint: Difficulty missing or invalid"); + RuleFor(j => j.TlsPfxFile) + .NotNull() + .NotEmpty() + .When(j=> j.Tls) + .WithMessage("Pool Endpoint: Tls enabled but neither TlsPemFile nor TlsPfxFile specified"); + + RuleFor(j => j.TlsPfxFile) + .Must(j=> File.Exists(j)) + .When(j => j.Tls) + .WithMessage(j=> $"Pool Endpoint: {j.TlsPfxFile} does not exist"); + + RuleFor(j => j.TlsPfxFile) + .Must(j => + { + try + { + var tlsCert = new X509Certificate2(j); + return tlsCert.HasPrivateKey; + } + catch + { + return false; + } + }) + .When(j => j.Tls) + .WithMessage(j => $"Pool Endpoint: {j.TlsPfxFile} is not valid or does not include the private key and cannot be used"); + RuleFor(j => j.VarDiff) .SetValidator(new VarDiffConfigValidator()) .When(x => x.VarDiff != null); diff --git a/src/MiningCore/Extensions/NumberExtensions.cs b/src/MiningCore/Extensions/NumberExtensions.cs index 6e0e1183..69fd5385 100644 --- a/src/MiningCore/Extensions/NumberExtensions.cs +++ b/src/MiningCore/Extensions/NumberExtensions.cs @@ -1,20 +1,20 @@ -/* +/* Copyright 2017 Coin Foundry (coinfoundry.org) Authors: Oliver Weichhold (oliver@weichhold.com) -Permission is hereby granted, free of charge, to any person obtaining a copy of this software and -associated documentation files (the "Software"), to deal in the Software without restriction, -including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, -and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and +associated documentation files (the "Software"), to deal in the Software without restriction, +including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, +and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: -The above copyright notice and this permission notice shall be included in all copies or substantial +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT -LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. -IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT +LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ diff --git a/src/MiningCore/Mining/PoolBase.cs b/src/MiningCore/Mining/PoolBase.cs index 2342b47e..7eeac7c8 100644 --- a/src/MiningCore/Mining/PoolBase.cs +++ b/src/MiningCore/Mining/PoolBase.cs @@ -435,11 +435,15 @@ public virtual async Task StartAsync() if (poolConfig.EnableInternalStratum == true) { - var ipEndpoints = poolConfig.Ports.Keys - .Select(port => PoolEndpoint2IPEndpoint(port, poolConfig.Ports[port])) + var endpoints = poolConfig.Ports.Keys + .Select(port => + { + var endpointConfig = poolConfig.Ports[port]; + return (PoolEndpoint2IPEndpoint(port, endpointConfig), endpointConfig); + }) .ToArray(); - StartListeners(poolConfig.Id, ipEndpoints); + Start(poolConfig.Id, endpoints); } if(poolConfig.ExternalStratums?.Length > 0) diff --git a/src/MiningCore/MiningCore.csproj b/src/MiningCore/MiningCore.csproj index 74b9172f..34d9a521 100644 --- a/src/MiningCore/MiningCore.csproj +++ b/src/MiningCore/MiningCore.csproj @@ -56,7 +56,6 @@ - diff --git a/src/MiningCore/Program.cs b/src/MiningCore/Program.cs index 0d2bc867..0bf4b1d2 100644 --- a/src/MiningCore/Program.cs +++ b/src/MiningCore/Program.cs @@ -24,6 +24,7 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System.Globalization; using System.IO; using System.Linq; +using System.Net; using System.Reactive; using System.Reactive.Linq; using System.Reactive.Threading.Tasks; @@ -41,6 +42,7 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using MiningCore.Api; using MiningCore.Api.Responses; using MiningCore.Blockchain; +using MiningCore.Blockchain.Bitcoin; using MiningCore.Blockchain.Bitcoin.DaemonResponses; using MiningCore.Blockchain.ZCash; using MiningCore.Configuration; @@ -256,6 +258,9 @@ private static void Bootstrap() var amConf = new MapperConfiguration(cfg => { cfg.AddProfile(new AutoMapperProfile()); }); builder.Register((ctx, parms) => amConf.CreateMapper()); + // Misc + ServicePointManager.SecurityProtocol = SecurityProtocolType.Tls12; + ConfigurePersistence(builder); container = builder.Build(); ConfigureLogging(); diff --git a/src/MiningCore/Properties/launchSettings.json b/src/MiningCore/Properties/launchSettings.json index 72807c19..9218cd2b 100644 --- a/src/MiningCore/Properties/launchSettings.json +++ b/src/MiningCore/Properties/launchSettings.json @@ -1,8 +1,8 @@ -{ - "profiles": { - "MiningCore": { - "commandName": "Project", - "commandLineArgs": "-c config.json" - } - } +{ + "profiles": { + "MiningCore": { + "commandName": "Project", + "commandLineArgs": "-c ..\\..\\..\\config.json" + } + } } \ No newline at end of file diff --git a/src/MiningCore/Stratum/StratumClient.cs b/src/MiningCore/Stratum/StratumClient.cs index 394374a7..666199a8 100644 --- a/src/MiningCore/Stratum/StratumClient.cs +++ b/src/MiningCore/Stratum/StratumClient.cs @@ -20,17 +20,17 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System; using System.Buffers; -using System.Collections.Concurrent; +using System.Diagnostics; using System.IO; using System.Net; using System.Reactive.Disposables; -using Autofac; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; using MiningCore.Buffers; using MiningCore.JsonRpc; using MiningCore.Mining; using MiningCore.Time; using MiningCore.Util; -using NetUV.Core.Handles; using Newtonsoft.Json; using Newtonsoft.Json.Serialization; using NLog; @@ -40,13 +40,20 @@ namespace MiningCore.Stratum { public class StratumClient { + public StratumClient(IMasterClock clock, IPEndPoint endpointConfig, string connectionId) + { + this.clock = clock; + PoolEndpoint = endpointConfig; + ConnectionId = connectionId; + } + private static readonly ILogger logger = LogManager.GetCurrentClassLogger(); private const int MaxInboundRequestLength = 8192; private const int MaxOutboundRequestLength = 0x4000; - private ConcurrentQueue> sendQueue; - private Async sendQueueDrainer; + private readonly IMasterClock clock; + private BufferBlock> sendQueue; private readonly PooledLineBuffer plb = new PooledLineBuffer(logger, MaxInboundRequestLength); private IDisposable subscription; private bool isAlive = true; @@ -59,47 +66,31 @@ public class StratumClient #region API-Surface - public void Init(Loop loop, Tcp tcp, IComponentContext ctx, IMasterClock clock, - IPEndPoint endpointConfig, string connectionId, - Action> onNext, Action onCompleted, Action onError) + public void Start(Stream stream, IPEndPoint remoteEndpoint, Func, Task> onNext, Action onCompleted, Action onError) { - PoolEndpoint = endpointConfig; - ConnectionId = connectionId; - RemoteEndpoint = tcp.GetPeerEndPoint(); - - // initialize send queue - sendQueue = new ConcurrentQueue>(); - sendQueueDrainer = loop.CreateAsync(DrainSendQueue); - sendQueueDrainer.UserToken = tcp; + RemoteEndpoint = remoteEndpoint; + sendQueue = new BufferBlock>(); // cleanup preparation - var sub = Disposable.Create(() => + subscription = Disposable.Create(() => { - if (tcp.IsValid) + if (isAlive) { logger.Debug(() => $"[{ConnectionId}] Last subscriber disconnected from receiver stream"); isAlive = false; - tcp.Shutdown(); + sendQueue.Complete(); + stream.Close(); } }); - // ensure subscription is disposed on loop thread - var disposer = loop.CreateAsync((handle) => - { - sub.Dispose(); - - handle.Dispose(); - }); - - subscription = Disposable.Create(() => { disposer.Send(); }); - // go - Receive(tcp, clock, onNext, onCompleted, onError); + DoReceive(stream, onNext, onCompleted, onError); + DoSend(stream, onError); } - public string ConnectionId { get; private set; } - public IPEndPoint PoolEndpoint { get; private set; } + public string ConnectionId { get; } + public IPEndPoint PoolEndpoint { get; } public IPEndPoint RemoteEndpoint { get; private set; } public DateTime? LastReceive { get; set; } public bool IsAlive { get; set; } = true; @@ -116,44 +107,31 @@ public T GetContextAs() where T: WorkerContextBase public void Respond(T payload, object id) { - Contract.RequiresNonNull(payload, nameof(payload)); - Contract.RequiresNonNull(id, nameof(id)); - Respond(new JsonRpcResponse(payload, id)); } public void RespondError(StratumError code, string message, object id, object result = null, object data = null) { - Contract.RequiresNonNull(message, nameof(message)); - Respond(new JsonRpcResponse(new JsonRpcException((int) code, message, null), id, result)); } public void Respond(JsonRpcResponse response) { - Contract.RequiresNonNull(response, nameof(response)); - Send(response); } public void Notify(string method, T payload) { - Contract.Requires(!string.IsNullOrEmpty(method), $"{nameof(method)} must not be empty"); - Notify(new JsonRpcRequest(method, payload, null)); } public void Notify(JsonRpcRequest request) { - Contract.RequiresNonNull(request, nameof(request)); - Send(request); } public void Send(T payload) { - Contract.RequiresNonNull(payload, nameof(payload)); - if (isAlive) { var buf = ArrayPool.Shared.Rent(MaxOutboundRequestLength); @@ -163,7 +141,6 @@ public void Send(T payload) using (var stream = new MemoryStream(buf, true)) { stream.SetLength(0); - int size; using (var writer = new StreamWriter(stream, StratumConstants.Encoding)) { @@ -172,12 +149,13 @@ public void Send(T payload) // append newline stream.WriteByte(0xa); - size = (int)stream.Position; - } + var cb = (int)stream.Position; - logger.Trace(() => $"[{ConnectionId}] Sending: {StratumConstants.Encoding.GetString(buf, 0, size)}"); + // xmit + sendQueue.Post(new PooledArraySegment(buf, 0, cb)); - SendInternal(new PooledArraySegment(buf, 0, size)); + logger.Trace(() => $"[{ConnectionId}] Sent: {StratumConstants.Encoding.GetString(buf, 0, cb)}"); + } } } @@ -191,10 +169,11 @@ public void Send(T payload) public void Disconnect() { - subscription?.Dispose(); - subscription = null; - - IsAlive = false; + if (subscription != null) + { + subscription.Dispose(); + subscription = null; + } } public void RespondError(object id, int code, string message) @@ -235,88 +214,89 @@ public JsonRpcRequest DeserializeRequest(PooledArraySegment data) #endregion // API-Surface - private void Receive(Tcp tcp, IMasterClock clock, - Action> onNext, Action onCompleted, Action onError) + private async void DoReceive(Stream stream, Func, Task> onNext, Action onCompleted, Action onError) { - tcp.OnRead((handle, buffer) => + var buf = ArrayPool.Shared.Rent(0x10000); + + try { - // onAccept - using (buffer) + while (isAlive) { - if (buffer.Count == 0 || !isAlive) - return; + try + { + var cb = await stream.ReadAsync(buf, 0, buf.Length); + + if (cb == 0 || !isAlive) + { + if(isAlive) + onCompleted(); - LastReceive = clock.Now; + break; + } - plb.Receive(buffer, buffer.Count, - (src, dst, count) => src.ReadBytes(dst, count), - onNext, - onError); - } - }, (handle, ex) => - { - // onError - onError(ex); - }, handle => - { - // onCompleted - isAlive = false; - onCompleted(); + LastReceive = clock.Now; - // release handles - sendQueueDrainer.UserToken = null; - sendQueueDrainer.Dispose(); + await plb.ReceiveAsync(buf, cb, + (src, dst, count) => Array.Copy(src, dst, count), + onNext, + onError); + } - // empty queues - while (sendQueue.TryDequeue(out var fragment)) - fragment.Dispose(); + catch (ObjectDisposedException) + { + Debug.Assert(!isAlive); + break; + } - plb.Dispose(); + catch (Exception ex) + { + if (isAlive) + onError(ex); - handle.CloseHandle(); - }); - } + break; + } + } - private void SendInternal(PooledArraySegment buffer) - { - try - { - sendQueue.Enqueue(buffer); - sendQueueDrainer.Send(); + logger.Trace(() => $"[{ConnectionId}] DoReceive loop exited"); } - catch (ObjectDisposedException) + finally { - buffer.Dispose(); + ArrayPool.Shared.Return(buf); } } - private void DrainSendQueue(Async handle) + private async void DoSend(Stream stream, Action onError) { - try + while(isAlive) { - var tcp = (Tcp)handle.UserToken; - - if (tcp?.IsValid == true && !tcp.IsClosing && tcp.IsWritable && sendQueue != null) + try { - var queueSize = sendQueue.Count; - if (queueSize >= 256) - logger.Warn(() => $"[{ConnectionId}] Send queue backlog now at {queueSize}"); - - while (sendQueue.TryDequeue(out var segment)) + while (await sendQueue.OutputAvailableAsync()) { - using (segment) + using (var segment = await sendQueue.ReceiveAsync()) { - tcp.QueueWrite(segment.Array, 0, segment.Size); + await stream.WriteAsync(segment.Array, segment.Offset, segment.Size); } } } - } - catch (Exception ex) - { - logger.Error(ex); + catch (ObjectDisposedException) + { + Debug.Assert(!isAlive); + break; + } + + catch (Exception ex) + { + if (isAlive) + onError(ex); + + break; + } } + + logger.Trace(() => $"[{ConnectionId}] DoSend loop exited"); } } } diff --git a/src/MiningCore/Stratum/StratumServer.cs b/src/MiningCore/Stratum/StratumServer.cs index ab0dde9e..4997647b 100644 --- a/src/MiningCore/Stratum/StratumServer.cs +++ b/src/MiningCore/Stratum/StratumServer.cs @@ -19,11 +19,15 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ using System; -using System.Collections.Concurrent; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Net; +using System.Net.Security; +using System.Net.Sockets; using System.Reactive; +using System.Security.Authentication; +using System.Security.Cryptography.X509Certificates; using System.Threading; using System.Threading.Tasks; using Autofac; @@ -33,8 +37,6 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using MiningCore.JsonRpc; using MiningCore.Time; using MiningCore.Util; -using NetUV.Core.Handles; -using NetUV.Core.Native; using Newtonsoft.Json; using NLog; using Contract = MiningCore.Contracts.Contract; @@ -56,7 +58,7 @@ protected StratumServer(IComponentContext ctx, IMasterClock clock) protected readonly IComponentContext ctx; protected readonly IMasterClock clock; - protected readonly Dictionary ports = new Dictionary(); + protected readonly Dictionary ports = new Dictionary(); protected ClusterConfig clusterConfig; protected IBanManager banManager; protected bool disableConnectionLogging = false; @@ -64,184 +66,177 @@ protected StratumServer(IComponentContext ctx, IMasterClock clock) protected abstract string LogCat { get; } - public void StartListeners(string id, params IPEndPoint[] stratumPorts) + public void Start(string id, params (IPEndPoint IPEndpoint, PoolEndpoint PoolEndpoint)[] stratumPorts) { Contract.RequiresNonNull(stratumPorts, nameof(stratumPorts)); - // every port gets serviced by a dedicated loop thread - foreach(var endpoint in stratumPorts) + foreach(var port in stratumPorts) { - var thread = new Thread(_ => + Task.Run(async () => { - var loop = new Loop(); + // TLS cert loading + X509Certificate2 tlsCert = null; - try - { - var listener = loop - .CreateTcp() - .NoDelay(true) - .SimultaneousAccepts(false) - .Listen(endpoint, (con, ex) => - { - if (ex == null) - OnClientConnected(con, endpoint, loop); - else - logger.Error(() => $"[{LogCat}] Connection error state: {ex.Message}"); - }); + if (port.PoolEndpoint.Tls) + tlsCert = new X509Certificate2(port.PoolEndpoint.TlsPfxFile); - lock (ports) - { - ports[endpoint.Port] = listener; - } - } + // Setup socket + var server = new Socket(SocketType.Stream, ProtocolType.Tcp); + server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); + server.Bind(port.IPEndpoint); + server.Listen(512); - catch (Exception ex) + lock (ports) { - logger.Error(ex, $"[{LogCat}] {ex}"); - throw; + ports[port.IPEndpoint.Port] = server; } - logger.Info(() => $"[{LogCat}] Stratum port {endpoint.Address}:{endpoint.Port} online"); + var portDesc = tlsCert != null ? " [TLS]" : string.Empty; + logger.Info(() => $"[{LogCat}] Stratum port {port.IPEndpoint.Address}:{port.IPEndpoint.Port} online{portDesc}"); - try + while (true) { - loop.RunDefault(); - } + try + { + var socket = await server.AcceptAsync(); - catch(Exception ex) - { - logger.Error(ex, $"[{LogCat}] {ex}"); - } - }) { Name = $"UvLoopThread {id}:{endpoint.Port}" }; + var remoteEndpoint = (IPEndPoint) socket.RemoteEndPoint; + var connectionId = CorrelationIdGenerator.GetNextId(); + logger.Debug(() => $"[{LogCat}] Accepting connection [{connectionId}] from {remoteEndpoint.Address}:{remoteEndpoint.Port}"); - thread.Start(); - } - } + // get rid of banned clients as early as possible + if (banManager?.IsBanned(remoteEndpoint.Address) == true) + { + logger.Debug(() => $"[{LogCat}] Disconnecting banned ip {remoteEndpoint.Address}"); + socket.Close(); + continue; + } - public void StopListeners() - { - lock(ports) - { - var portValues = ports.Values.ToArray(); + // prepare socket + socket.NoDelay = true; - for(int i = 0; i < portValues.Length; i++) - { - var listener = portValues[i]; + // create stream + var stream = (Stream) new NetworkStream(socket, true); - listener.Shutdown((tcp, ex) => - { - if (tcp?.IsValid == true) - tcp.Dispose(); - }); - } - } - } + // TLS handshake + if (port.PoolEndpoint.Tls) + { + SslStream sslStream = null; - private void OnClientConnected(Tcp con, IPEndPoint endpointConfig, Loop loop) - { - try - { - var remoteEndPoint = con.GetPeerEndPoint(); + try + { + sslStream = new SslStream(stream, false); + await sslStream.AuthenticateAsServerAsync(tlsCert, false, SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12, false); + } - // get rid of banned clients as early as possible - if (banManager?.IsBanned(remoteEndPoint.Address) == true) - { - logger.Debug(() => $"[{LogCat}] Disconnecting banned ip {remoteEndPoint.Address}"); - con.Dispose(); - return; - } + catch (Exception ex) + { + logger.Error(() => $"[{LogCat}] TLS init failed: {ex.Message}: {ex.InnerException.ToString() ?? string.Empty}"); + (sslStream ?? stream).Close(); + continue; + } - var connectionId = CorrelationIdGenerator.GetNextId(); - logger.Debug(() => $"[{LogCat}] Accepting connection [{connectionId}] from {remoteEndPoint.Address}:{remoteEndPoint.Port}"); + stream = sslStream; + } - // setup client connection - con.KeepAlive(true, 1); + // setup client + var client = new StratumClient(clock, port.IPEndpoint, connectionId); - // setup client - var client = new StratumClient(); + lock (clients) + { + clients[connectionId] = client; + } - client.Init(loop, con, ctx, clock, endpointConfig, connectionId, - data => OnReceive(client, data), - () => OnReceiveComplete(client), - ex => OnReceiveError(client, ex)); + OnConnect(client); - // register client - lock(clients) - { - clients[connectionId] = client; - } + client.Start(stream, remoteEndpoint, + data => OnReceiveAsync(client, data), + () => OnReceiveComplete(client), + ex => OnReceiveError(client, ex)); + } - OnConnect(client); + catch (Exception ex) + { + logger.Error(ex, () => Thread.CurrentThread.Name); + } + } + }); } + } - catch(Exception ex) + public void Stop() + { + lock(ports) { - logger.Error(ex, () => nameof(OnClientConnected)); + var portValues = ports.Values.ToArray(); + + for(int i = 0; i < portValues.Length; i++) + { + var socket = portValues[i]; + + socket.Close(); + } } } - protected virtual void OnReceive(StratumClient client, PooledArraySegment data) + protected virtual async Task OnReceiveAsync(StratumClient client, PooledArraySegment data) { - // get off of LibUV event-loop-thread immediately - Task.Run(async () => + using (data) { - using (data) - { - JsonRpcRequest request = null; + JsonRpcRequest request = null; - try + try + { + // boot pre-connected clients + if (banManager?.IsBanned(client.RemoteEndpoint.Address) == true) { - // boot pre-connected clients - if (banManager?.IsBanned(client.RemoteEndpoint.Address) == true) - { - logger.Info(() => $"[{LogCat}] [{client.ConnectionId}] Disconnecting banned client @ {client.RemoteEndpoint.Address}"); - DisconnectClient(client); - return; - } - - // de-serialize - logger.Trace(() => $"[{LogCat}] [{client.ConnectionId}] Received request data: {StratumConstants.Encoding.GetString(data.Array, 0, data.Size)}"); - request = client.DeserializeRequest(data); + logger.Info(() => $"[{LogCat}] [{client.ConnectionId}] Disconnecting banned client @ {client.RemoteEndpoint.Address}"); + DisconnectClient(client); + return; + } - // dispatch - if (request != null) - { - logger.Debug(() => $"[{LogCat}] [{client.ConnectionId}] Dispatching request '{request.Method}' [{request.Id}]"); - await OnRequestAsync(client, new Timestamped(request, clock.Now)); - } + // de-serialize + logger.Trace(() => $"[{LogCat}] [{client.ConnectionId}] Received request data: {StratumConstants.Encoding.GetString(data.Array, 0, data.Size)}"); + request = client.DeserializeRequest(data); - else - logger.Trace(() => $"[{LogCat}] [{client.ConnectionId}] Unable to deserialize request"); + // dispatch + if (request != null) + { + logger.Debug(() => $"[{LogCat}] [{client.ConnectionId}] Dispatching request '{request.Method}' [{request.Id}]"); + await OnRequestAsync(client, new Timestamped(request, clock.Now)); } - catch (JsonReaderException jsonEx) - { - // junk received (no valid json) - logger.Error(() => $"[{LogCat}] [{client.ConnectionId}] Connection json error state: {jsonEx.Message}"); + else + logger.Trace(() => $"[{LogCat}] [{client.ConnectionId}] Unable to deserialize request"); + } - if (clusterConfig.Banning?.BanOnJunkReceive.HasValue == false || clusterConfig.Banning?.BanOnJunkReceive == true) - { - logger.Info(() => $"[{LogCat}] [{client.ConnectionId}] Banning client for sending junk"); - banManager?.Ban(client.RemoteEndpoint.Address, TimeSpan.FromMinutes(30)); - } - } + catch (JsonReaderException jsonEx) + { + // junk received (no valid json) + logger.Error(() => $"[{LogCat}] [{client.ConnectionId}] Connection json error state: {jsonEx.Message}"); - catch (Exception ex) + if (clusterConfig.Banning?.BanOnJunkReceive.HasValue == false || clusterConfig.Banning?.BanOnJunkReceive == true) { - if (request != null) - logger.Error(ex, () => $"[{LogCat}] [{client.ConnectionId}] Error processing request {request.Method} [{request.Id}]"); + logger.Info(() => $"[{LogCat}] [{client.ConnectionId}] Banning client for sending junk"); + banManager?.Ban(client.RemoteEndpoint.Address, TimeSpan.FromMinutes(30)); } } - }); + + catch (Exception ex) + { + if (request != null) + logger.Error(ex, () => $"[{LogCat}] [{client.ConnectionId}] Error processing request {request.Method} [{request.Id}]"); + } + } } protected virtual void OnReceiveError(StratumClient client, Exception ex) { switch (ex) { - case OperationException opEx: + case SocketException opEx: // log everything but ECONNRESET which just indicates the client disconnecting - if (opEx.ErrorCode != ErrorCode.ECONNRESET) + if (opEx.SocketErrorCode != SocketError.ConnectionReset) logger.Error(() => $"[{LogCat}] [{client.ConnectionId}] Connection error state: {ex.Message}"); break; diff --git a/src/MiningCore/Util/PooledLineBuffer.cs b/src/MiningCore/Util/PooledLineBuffer.cs index 1a1d4648..20e60955 100644 --- a/src/MiningCore/Util/PooledLineBuffer.cs +++ b/src/MiningCore/Util/PooledLineBuffer.cs @@ -4,6 +4,7 @@ using System.IO; using System.Linq; using System.Text; +using System.Threading.Tasks; using MiningCore.Buffers; using MiningCore.Extensions; using NLog; @@ -154,5 +155,126 @@ public void Receive(T buffer, int bufferSize, ByteArrayPool.Return(buf); } } + + public async Task ReceiveAsync(T buffer, int bufferSize, + Action readBuffer, + Func, Task> onNext, + Action onError, + bool forceNewLine = false) + { + if (bufferSize == 0) + return; + + // prevent flooding + if (maxLength.HasValue && bufferSize > maxLength) + { + onError(new InvalidDataException($"Incoming data exceeds maximum of {maxLength.Value}")); + return; + } + + var remaining = bufferSize; + var buf = ArrayPool.Shared.Rent(bufferSize); + var prevIndex = 0; + var keepLease = false; + + try + { + // clear left-over contents + if (buf.Length > bufferSize) + Array.Clear(buf, bufferSize, buf.Length - bufferSize); + + // read buffer + readBuffer(buffer, buf, bufferSize); + + // diagnostics + logger.Trace(() => $"recv: {Encoding.GetString(buf, 0, bufferSize)}"); + + while (remaining > 0) + { + // check if we got a newline + var index = buf.IndexOf(0xa, prevIndex, buf.Length - prevIndex); + var found = index != -1; + + if (found || forceNewLine) + { + // fastpath + if (!forceNewLine && index + 1 == bufferSize && recvQueue.Count == 0) + { + var length = index - prevIndex; + + if (length > 0) + { + await onNext(new PooledArraySegment(buf, prevIndex, length)); + keepLease = true; + } + + break; + } + + // assemble line buffer + var queuedLength = recvQueue.Sum(x => x.Size); + var segmentLength = !forceNewLine ? index - prevIndex : bufferSize - prevIndex; + var lineLength = queuedLength + segmentLength; + var line = ArrayPool.Shared.Rent(lineLength); + var offset = 0; + + while (recvQueue.TryDequeue(out var segment)) + { + using (segment) + { + Array.Copy(segment.Array, 0, line, offset, segment.Size); + offset += segment.Size; + } + } + + // append remaining characters + if (segmentLength > 0) + Array.Copy(buf, prevIndex, line, offset, segmentLength); + + // emit + if (lineLength > 0) + await onNext(new PooledArraySegment(line, 0, lineLength)); + + if (forceNewLine) + break; + + prevIndex = index + 1; + remaining -= segmentLength + 1; + continue; + } + + // store + if (prevIndex != 0) + { + var segmentLength = bufferSize - prevIndex; + + if (segmentLength > 0) + { + var fragment = ArrayPool.Shared.Rent(segmentLength); + Array.Copy(buf, prevIndex, fragment, 0, segmentLength); + recvQueue.Enqueue(new PooledArraySegment(fragment, 0, segmentLength)); + } + } + + else + { + recvQueue.Enqueue(new PooledArraySegment(buf, 0, remaining)); + keepLease = true; + } + + // prevent flooding + if (maxLength.HasValue && recvQueue.Sum(x => x.Size) > maxLength.Value) + onError(new InvalidDataException($"Incoming request size exceeds maximum of {maxLength.Value}")); + + break; + } + } + + finally + { + if (!keepLease) + ByteArrayPool.Return(buf); + } + } } }