From fb6d89368e52f6b49a5dadd34c496814cee10b0d Mon Sep 17 00:00:00 2001 From: Master Kwoth Date: Mon, 23 Oct 2017 19:46:59 +0200 Subject: [PATCH] fixed .shardstats and shard will now auto-restart after crashing or getting disconnected for more than 15 seconds --- .../Modules/Administration/SelfCommands.cs | 2 +- NadekoBot.Core/NadekoBot.Core.csproj | 6 + .../Services/Impl/BotCredentials.cs | 2 +- NadekoBot.Core/Services/Impl/StatsService.cs | 6 +- NadekoBot.Core/Services/NadekoBot.cs | 2 +- NadekoBot.Core/Services/ShardsCoordinator.cs | 184 ++++++++++++++++-- NadekoBot.Core/_Extensions/Extensions.cs | 6 +- NadekoBot.sln | 6 +- 8 files changed, 189 insertions(+), 25 deletions(-) diff --git a/NadekoBot.Core/Modules/Administration/SelfCommands.cs b/NadekoBot.Core/Modules/Administration/SelfCommands.cs index b4665b98..5ba0c7e8 100644 --- a/NadekoBot.Core/Modules/Administration/SelfCommands.cs +++ b/NadekoBot.Core/Modules/Administration/SelfCommands.cs @@ -240,7 +240,7 @@ namespace NadekoBot.Modules.Administration .Select(x => { var timeDiff = DateTime.UtcNow - x.Time; - if (timeDiff > TimeSpan.FromSeconds(20)) + if (timeDiff >= TimeSpan.FromSeconds(30)) return $"Shard #{Format.Bold(x.ShardId.ToString())} **UNRESPONSIVE** for {timeDiff.ToString(@"hh\:mm\:ss")}"; return GetText("shard_stats_txt", x.ShardId.ToString(), Format.Bold(x.ConnectionState.ToString()), Format.Bold(x.Guilds.ToString()), timeDiff.ToString(@"hh\:mm\:ss")); diff --git a/NadekoBot.Core/NadekoBot.Core.csproj b/NadekoBot.Core/NadekoBot.Core.csproj index 7e1d5a24..d654b138 100644 --- a/NadekoBot.Core/NadekoBot.Core.csproj +++ b/NadekoBot.Core/NadekoBot.Core.csproj @@ -44,4 +44,10 @@ + + + $(DefineConstants);GLOBAL_NADEKO + $(NoWarn);CS1573;CS1591 + + diff --git a/NadekoBot.Core/Services/Impl/BotCredentials.cs b/NadekoBot.Core/Services/Impl/BotCredentials.cs index b7176b47..767f5bf0 100644 --- a/NadekoBot.Core/Services/Impl/BotCredentials.cs +++ b/NadekoBot.Core/Services/Impl/BotCredentials.cs @@ -79,7 +79,7 @@ namespace NadekoBot.Core.Services.Impl if (string.IsNullOrWhiteSpace(ShardRunCommand)) ShardRunCommand = "dotnet"; if (string.IsNullOrWhiteSpace(ShardRunArguments)) - ShardRunArguments = "run -c Release -- {0} {1} {2}"; + ShardRunArguments = "run -c Release -- {0} {1}"; var portStr = data[nameof(ShardRunPort)]; if (string.IsNullOrWhiteSpace(portStr)) diff --git a/NadekoBot.Core/Services/Impl/StatsService.cs b/NadekoBot.Core/Services/Impl/StatsService.cs index 26cb34dd..742e7e3e 100644 --- a/NadekoBot.Core/Services/Impl/StatsService.cs +++ b/NadekoBot.Core/Services/Impl/StatsService.cs @@ -21,12 +21,12 @@ namespace NadekoBot.Core.Services.Impl private readonly IBotCredentials _creds; private readonly DateTime _started; - public const string BotVersion = "2.0.3"; + public const string BotVersion = "2.0.4"; public string Author => "Kwoth#2560"; public string Library => "Discord.Net"; - public string Heap => - Math.Round((double)GC.GetTotalMemory(false) / 1.MiB(), 2).ToString(CultureInfo.InvariantCulture); + public string Heap => Math.Round((double)GC.GetTotalMemory(false) / 1.MiB(), 2) + .ToString(CultureInfo.InvariantCulture); public double MessagesPerSecond => MessageCounter / GetUptime().TotalSeconds; private long _textChannels; diff --git a/NadekoBot.Core/Services/NadekoBot.cs b/NadekoBot.Core/Services/NadekoBot.cs index 6e974535..e68777cd 100644 --- a/NadekoBot.Core/Services/NadekoBot.cs +++ b/NadekoBot.Core/Services/NadekoBot.cs @@ -115,7 +115,7 @@ namespace NadekoBot var msg = JsonConvert.SerializeObject(data); await sub.PublishAsync(Credentials.RedisKey() + "_shardcoord_send", msg).ConfigureAwait(false); - await Task.Delay(5000); + await Task.Delay(7500); } }); } diff --git a/NadekoBot.Core/Services/ShardsCoordinator.cs b/NadekoBot.Core/Services/ShardsCoordinator.cs index 4f4aa948..25b36a88 100644 --- a/NadekoBot.Core/Services/ShardsCoordinator.cs +++ b/NadekoBot.Core/Services/ShardsCoordinator.cs @@ -7,11 +7,44 @@ using NadekoBot.Common.ShardCom; using StackExchange.Redis; using Newtonsoft.Json; using NadekoBot.Extensions; +using NadekoBot.Common.Collections; +using System.Linq; +using System.Collections.Generic; namespace NadekoBot.Core.Services { public class ShardsCoordinator { + private class ShardsCoordinatorQueue + { + private readonly object _locker = new object(); + private readonly HashSet _set = new HashSet(); + private readonly Queue _queue = new Queue(); + public int Count => _queue.Count; + + public void Enqueue(int i) + { + lock (_locker) + { + if (_set.Add(i)) + _queue.Enqueue(i); + } + } + + public bool TryDequeue(out int id) + { + lock (_locker) + { + if (_queue.TryDequeue(out id)) + { + _set.Remove(id); + return true; + } + } + return false; + } + } + private readonly BotCredentials _creds; private readonly string _key; private readonly Process[] _shardProcesses; @@ -21,6 +54,12 @@ namespace NadekoBot.Core.Services private readonly ConnectionMultiplexer _redis; private ShardComMessage _defaultShardState; + private ShardsCoordinatorQueue _shardStartQueue = + new ShardsCoordinatorQueue(); + + private ConcurrentHashSet _shardRestartWaitingList = + new ConcurrentHashSet(); + public ShardsCoordinator() { //load main stuff @@ -38,13 +77,27 @@ namespace NadekoBot.Core.Services { ConnectionState = Discord.ConnectionState.Disconnected, Guilds = 0, - Time = DateTime.Now - TimeSpan.FromMinutes(1) + Time = DateTime.UtcNow }; var db = _redis.GetDatabase(); + //clear previous statuses + db.KeyDelete(_key + "_shardstats"); + _shardProcesses = new Process[_creds.TotalShards]; for (int i = 0; i < _creds.TotalShards; i++) { + //add it to the list of shards which should be started +#if DEBUG + if (i > 0) + _shardStartQueue.Enqueue(i); +#else + _shardStartQueue.Enqueue(i); +#endif + //set the shard's initial state in redis cache _defaultShardState.ShardId = i; + //this is to avoid the shard coordinator thinking that + //the shard is unresponsive while startup up + _defaultShardState.Time = DateTime.UtcNow + TimeSpan.FromSeconds(20 * i); db.ListRightPush(_key + "_shardstats", JsonConvert.SerializeObject(_defaultShardState), flags: CommandFlags.FireAndForget); @@ -52,20 +105,25 @@ namespace NadekoBot.Core.Services _curProcessId = Process.GetCurrentProcess().Id; - _redis = ConnectionMultiplexer.Connect("127.0.0.1"); + //subscribe to shardcoord events var sub = _redis.GetSubscriber(); + + //send is called when shard status is updated. Every 7.5 seconds atm sub.Subscribe(_key + "_shardcoord_send", OnDataReceived, CommandFlags.FireAndForget); + //restart is called when shzard should be stopped and then started again sub.Subscribe(_key + "_shardcoord_restart", OnRestart, CommandFlags.FireAndForget); + //called to kill the shard sub.Subscribe(_key + "_shardcoord_stop", OnStop, CommandFlags.FireAndForget); + //called kill the bot sub.Subscribe(_key + "_die", (ch, x) => Environment.Exit(0), CommandFlags.FireAndForget); @@ -74,6 +132,11 @@ namespace NadekoBot.Core.Services private void OnStop(RedisChannel ch, RedisValue data) { var shardId = JsonConvert.DeserializeObject(data); + OnStop(shardId); + } + + private void OnStop(int shardId) + { var db = _redis.GetDatabase(); _defaultShardState.ShardId = shardId; db.ListSetByIndex(_key + "_shardstats", @@ -88,8 +151,8 @@ namespace NadekoBot.Core.Services private void OnRestart(RedisChannel ch, RedisValue data) { - OnStop(ch, data); var shardId = JsonConvert.DeserializeObject(data); + OnStop(shardId); _shardProcesses[shardId] = StartShard(shardId); } @@ -99,6 +162,7 @@ namespace NadekoBot.Core.Services if (msg == null) return; var db = _redis.GetDatabase(); + //sets the shard state db.ListSetByIndex(_key + "_shardstats", msg.ShardId, data, @@ -107,24 +171,118 @@ namespace NadekoBot.Core.Services || msg.ConnectionState == Discord.ConnectionState.Disconnecting) { _log.Error("!!! SHARD {0} IS IN {1} STATE !!!", msg.ShardId, msg.ConnectionState.ToString()); + + OnShardUnavailable(msg.ShardId); + } + else + { + // remove the shard from the waiting list if it's on it, + // because it's connected/connecting now + _shardRestartWaitingList.TryRemove(msg.ShardId); } return; } + private void OnShardUnavailable(int shardId) + { + //if the shard is dc'd, add it to the restart waiting list + if (!_shardRestartWaitingList.Add(shardId)) + { + //if it's already on the waiting list + //stop the shard + OnStop(shardId); + //add it to the start queue (start the shard) + _shardStartQueue.Enqueue(shardId); + //remove it from the waiting list + _shardRestartWaitingList.TryRemove(shardId); + } + } + public async Task RunAsync() { - int i = 0; -#if DEBUG - i = 1; -#endif - - for (; i < _creds.TotalShards; i++) + //this task will complete when the initial start of the shards + //is complete, but will keep running in order to restart shards + //which are disconnected for too long + TaskCompletionSource tsc = new TaskCompletionSource(); + var _ = Task.Run(async () => { - var p = StartShard(i); + do + { + //start a shard which is scheduled for start every 6 seconds + while (_shardStartQueue.TryDequeue(out var id)) + { + // if the shard is on the waiting list again + // remove it since it's starting up now - _shardProcesses[i] = p; - await Task.Delay(6000); - } + _shardRestartWaitingList.TryRemove(id); + //if the task is already completed, + //it means the initial shard starting is done, + //and this is an auto-restart + if (tsc.Task.IsCompleted) + { + _log.Warn("Auto-restarting shard {0}", id); + } + var p = StartShard(id); + + _shardProcesses[id] = p; + await Task.Delay(6000).ConfigureAwait(false); + } + tsc.TrySetResult(true); + } + while (true); + // ^ keep checking for shards which need to be restarted + }); + + //restart unresponsive shards + _ = Task.Run(async () => + { + //after all shards have started initially + await tsc.Task.ConfigureAwait(false); + while (true) + { + try + { + var db = _redis.GetDatabase(); + //get all shards which didn't communicate their status in the last 30 seconds + var all = db.ListRange(_creds.RedisKey() + "_shardstats") + .Select(x => JsonConvert.DeserializeObject(x)); + var statuses = all + .Where(x => x.Time < DateTime.UtcNow - TimeSpan.FromSeconds(30)); + + if (!statuses.Any()) + { + for (var i = 0; i < _shardProcesses.Length; i++) + { + var p = _shardProcesses[i]; + if (p == null || p.HasExited) + _shardStartQueue.Enqueue(i); + } + } + else + { + foreach (var s in statuses) + { + OnStop(s.ShardId); + _shardStartQueue.Enqueue(s.ShardId); + + //to prevent shards which are already scheduled for restart to be scheduled again + s.Time = DateTime.UtcNow + TimeSpan.FromSeconds(30 * _shardStartQueue.Count); + db.ListSetByIndex(_key + "_shardstats", s.ShardId, + JsonConvert.SerializeObject(s), CommandFlags.FireAndForget); + _log.Warn("Shard {0} is scheduled for a restart because it's unresponsive.", s.ShardId); + } + } + } + catch (Exception ex) { _log.Error(ex); throw; } + finally + { + await Task.Delay(10000).ConfigureAwait(false); + } + } + }); + + await tsc.Task.ConfigureAwait(false); + return; } private Process StartShard(int shardId) diff --git a/NadekoBot.Core/_Extensions/Extensions.cs b/NadekoBot.Core/_Extensions/Extensions.cs index ff22382a..c4e3c85c 100644 --- a/NadekoBot.Core/_Extensions/Extensions.cs +++ b/NadekoBot.Core/_Extensions/Extensions.cs @@ -58,6 +58,9 @@ namespace NadekoBot.Extensions return new PathCollection(cornerToptLeft, cornerBottomLeft, cornerTopRight, cornerBottomRight); } + /// + /// First 10 characters of teh bot token. + /// public static string RedisKey(this IBotCredentials bc) { return bc.Token.Substring(0, 10); @@ -147,9 +150,6 @@ namespace NadekoBot.Extensions return module; } - //public static async Task> MentionedUsers(this IUserMessage msg) => - - public static void AddRange(this HashSet target, IEnumerable elements) where T : class { foreach (var item in elements) diff --git a/NadekoBot.sln b/NadekoBot.sln index ea86b972..79ae8766 100644 --- a/NadekoBot.sln +++ b/NadekoBot.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 -VisualStudioVersion = 15.0.26730.16 +VisualStudioVersion = 15.0.27004.2005 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{04929013-5BAB-42B0-B9B2-8F2BB8F16AF2}" EndProject @@ -30,8 +30,8 @@ Global {45EC1473-C678-4857-A544-07DFE0D0B478}.Release|Any CPU.Build.0 = Release|Any CPU {A6CCEFBD-DCF2-482C-9643-47664683548F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {A6CCEFBD-DCF2-482C-9643-47664683548F}.Debug|Any CPU.Build.0 = Debug|Any CPU - {A6CCEFBD-DCF2-482C-9643-47664683548F}.GlobalNadeko|Any CPU.ActiveCfg = Debug|Any CPU - {A6CCEFBD-DCF2-482C-9643-47664683548F}.GlobalNadeko|Any CPU.Build.0 = Debug|Any CPU + {A6CCEFBD-DCF2-482C-9643-47664683548F}.GlobalNadeko|Any CPU.ActiveCfg = Release|Any CPU + {A6CCEFBD-DCF2-482C-9643-47664683548F}.GlobalNadeko|Any CPU.Build.0 = Release|Any CPU {A6CCEFBD-DCF2-482C-9643-47664683548F}.Release|Any CPU.ActiveCfg = Release|Any CPU {A6CCEFBD-DCF2-482C-9643-47664683548F}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection