fixed .shardstats and shard will now auto-restart after crashing or getting disconnected for more than 15 seconds

This commit is contained in:
Master Kwoth 2017-10-23 19:46:59 +02:00
parent d596eb1895
commit fb6d89368e
8 changed files with 189 additions and 25 deletions

View File

@ -240,7 +240,7 @@ namespace NadekoBot.Modules.Administration
.Select(x => .Select(x =>
{ {
var timeDiff = DateTime.UtcNow - x.Time; var timeDiff = DateTime.UtcNow - x.Time;
if (timeDiff > TimeSpan.FromSeconds(20)) if (timeDiff >= TimeSpan.FromSeconds(30))
return $"Shard #{Format.Bold(x.ShardId.ToString())} **UNRESPONSIVE** for {timeDiff.ToString(@"hh\:mm\:ss")}"; return $"Shard #{Format.Bold(x.ShardId.ToString())} **UNRESPONSIVE** for {timeDiff.ToString(@"hh\:mm\:ss")}";
return GetText("shard_stats_txt", x.ShardId.ToString(), return GetText("shard_stats_txt", x.ShardId.ToString(),
Format.Bold(x.ConnectionState.ToString()), Format.Bold(x.Guilds.ToString()), timeDiff.ToString(@"hh\:mm\:ss")); Format.Bold(x.ConnectionState.ToString()), Format.Bold(x.Guilds.ToString()), timeDiff.ToString(@"hh\:mm\:ss"));

View File

@ -44,4 +44,10 @@
<ItemGroup> <ItemGroup>
<None Include="Modules\Utility\NadekoBot.Modules.Searches.csproj" /> <None Include="Modules\Utility\NadekoBot.Modules.Searches.csproj" />
</ItemGroup> </ItemGroup>
<PropertyGroup Condition=" '$(Configuration)' == 'GlobalNadeko' ">
<DefineConstants>$(DefineConstants);GLOBAL_NADEKO</DefineConstants>
<NoWarn>$(NoWarn);CS1573;CS1591</NoWarn>
</PropertyGroup>
</Project> </Project>

View File

@ -79,7 +79,7 @@ namespace NadekoBot.Core.Services.Impl
if (string.IsNullOrWhiteSpace(ShardRunCommand)) if (string.IsNullOrWhiteSpace(ShardRunCommand))
ShardRunCommand = "dotnet"; ShardRunCommand = "dotnet";
if (string.IsNullOrWhiteSpace(ShardRunArguments)) if (string.IsNullOrWhiteSpace(ShardRunArguments))
ShardRunArguments = "run -c Release -- {0} {1} {2}"; ShardRunArguments = "run -c Release -- {0} {1}";
var portStr = data[nameof(ShardRunPort)]; var portStr = data[nameof(ShardRunPort)];
if (string.IsNullOrWhiteSpace(portStr)) if (string.IsNullOrWhiteSpace(portStr))

View File

@ -21,12 +21,12 @@ namespace NadekoBot.Core.Services.Impl
private readonly IBotCredentials _creds; private readonly IBotCredentials _creds;
private readonly DateTime _started; private readonly DateTime _started;
public const string BotVersion = "2.0.3"; public const string BotVersion = "2.0.4";
public string Author => "Kwoth#2560"; public string Author => "Kwoth#2560";
public string Library => "Discord.Net"; public string Library => "Discord.Net";
public string Heap => public string Heap => Math.Round((double)GC.GetTotalMemory(false) / 1.MiB(), 2)
Math.Round((double)GC.GetTotalMemory(false) / 1.MiB(), 2).ToString(CultureInfo.InvariantCulture); .ToString(CultureInfo.InvariantCulture);
public double MessagesPerSecond => MessageCounter / GetUptime().TotalSeconds; public double MessagesPerSecond => MessageCounter / GetUptime().TotalSeconds;
private long _textChannels; private long _textChannels;

View File

@ -115,7 +115,7 @@ namespace NadekoBot
var msg = JsonConvert.SerializeObject(data); var msg = JsonConvert.SerializeObject(data);
await sub.PublishAsync(Credentials.RedisKey() + "_shardcoord_send", msg).ConfigureAwait(false); await sub.PublishAsync(Credentials.RedisKey() + "_shardcoord_send", msg).ConfigureAwait(false);
await Task.Delay(5000); await Task.Delay(7500);
} }
}); });
} }

View File

@ -7,11 +7,44 @@ using NadekoBot.Common.ShardCom;
using StackExchange.Redis; using StackExchange.Redis;
using Newtonsoft.Json; using Newtonsoft.Json;
using NadekoBot.Extensions; using NadekoBot.Extensions;
using NadekoBot.Common.Collections;
using System.Linq;
using System.Collections.Generic;
namespace NadekoBot.Core.Services namespace NadekoBot.Core.Services
{ {
public class ShardsCoordinator public class ShardsCoordinator
{ {
private class ShardsCoordinatorQueue
{
private readonly object _locker = new object();
private readonly HashSet<int> _set = new HashSet<int>();
private readonly Queue<int> _queue = new Queue<int>();
public int Count => _queue.Count;
public void Enqueue(int i)
{
lock (_locker)
{
if (_set.Add(i))
_queue.Enqueue(i);
}
}
public bool TryDequeue(out int id)
{
lock (_locker)
{
if (_queue.TryDequeue(out id))
{
_set.Remove(id);
return true;
}
}
return false;
}
}
private readonly BotCredentials _creds; private readonly BotCredentials _creds;
private readonly string _key; private readonly string _key;
private readonly Process[] _shardProcesses; private readonly Process[] _shardProcesses;
@ -21,6 +54,12 @@ namespace NadekoBot.Core.Services
private readonly ConnectionMultiplexer _redis; private readonly ConnectionMultiplexer _redis;
private ShardComMessage _defaultShardState; private ShardComMessage _defaultShardState;
private ShardsCoordinatorQueue _shardStartQueue =
new ShardsCoordinatorQueue();
private ConcurrentHashSet<int> _shardRestartWaitingList =
new ConcurrentHashSet<int>();
public ShardsCoordinator() public ShardsCoordinator()
{ {
//load main stuff //load main stuff
@ -38,13 +77,27 @@ namespace NadekoBot.Core.Services
{ {
ConnectionState = Discord.ConnectionState.Disconnected, ConnectionState = Discord.ConnectionState.Disconnected,
Guilds = 0, Guilds = 0,
Time = DateTime.Now - TimeSpan.FromMinutes(1) Time = DateTime.UtcNow
}; };
var db = _redis.GetDatabase(); var db = _redis.GetDatabase();
//clear previous statuses
db.KeyDelete(_key + "_shardstats");
_shardProcesses = new Process[_creds.TotalShards]; _shardProcesses = new Process[_creds.TotalShards];
for (int i = 0; i < _creds.TotalShards; i++) for (int i = 0; i < _creds.TotalShards; i++)
{ {
//add it to the list of shards which should be started
#if DEBUG
if (i > 0)
_shardStartQueue.Enqueue(i);
#else
_shardStartQueue.Enqueue(i);
#endif
//set the shard's initial state in redis cache
_defaultShardState.ShardId = i; _defaultShardState.ShardId = i;
//this is to avoid the shard coordinator thinking that
//the shard is unresponsive while startup up
_defaultShardState.Time = DateTime.UtcNow + TimeSpan.FromSeconds(20 * i);
db.ListRightPush(_key + "_shardstats", db.ListRightPush(_key + "_shardstats",
JsonConvert.SerializeObject(_defaultShardState), JsonConvert.SerializeObject(_defaultShardState),
flags: CommandFlags.FireAndForget); flags: CommandFlags.FireAndForget);
@ -52,20 +105,25 @@ namespace NadekoBot.Core.Services
_curProcessId = Process.GetCurrentProcess().Id; _curProcessId = Process.GetCurrentProcess().Id;
_redis = ConnectionMultiplexer.Connect("127.0.0.1"); //subscribe to shardcoord events
var sub = _redis.GetSubscriber(); var sub = _redis.GetSubscriber();
//send is called when shard status is updated. Every 7.5 seconds atm
sub.Subscribe(_key + "_shardcoord_send", sub.Subscribe(_key + "_shardcoord_send",
OnDataReceived, OnDataReceived,
CommandFlags.FireAndForget); CommandFlags.FireAndForget);
//restart is called when shzard should be stopped and then started again
sub.Subscribe(_key + "_shardcoord_restart", sub.Subscribe(_key + "_shardcoord_restart",
OnRestart, OnRestart,
CommandFlags.FireAndForget); CommandFlags.FireAndForget);
//called to kill the shard
sub.Subscribe(_key + "_shardcoord_stop", sub.Subscribe(_key + "_shardcoord_stop",
OnStop, OnStop,
CommandFlags.FireAndForget); CommandFlags.FireAndForget);
//called kill the bot
sub.Subscribe(_key + "_die", sub.Subscribe(_key + "_die",
(ch, x) => Environment.Exit(0), (ch, x) => Environment.Exit(0),
CommandFlags.FireAndForget); CommandFlags.FireAndForget);
@ -74,6 +132,11 @@ namespace NadekoBot.Core.Services
private void OnStop(RedisChannel ch, RedisValue data) private void OnStop(RedisChannel ch, RedisValue data)
{ {
var shardId = JsonConvert.DeserializeObject<int>(data); var shardId = JsonConvert.DeserializeObject<int>(data);
OnStop(shardId);
}
private void OnStop(int shardId)
{
var db = _redis.GetDatabase(); var db = _redis.GetDatabase();
_defaultShardState.ShardId = shardId; _defaultShardState.ShardId = shardId;
db.ListSetByIndex(_key + "_shardstats", db.ListSetByIndex(_key + "_shardstats",
@ -88,8 +151,8 @@ namespace NadekoBot.Core.Services
private void OnRestart(RedisChannel ch, RedisValue data) private void OnRestart(RedisChannel ch, RedisValue data)
{ {
OnStop(ch, data);
var shardId = JsonConvert.DeserializeObject<int>(data); var shardId = JsonConvert.DeserializeObject<int>(data);
OnStop(shardId);
_shardProcesses[shardId] = StartShard(shardId); _shardProcesses[shardId] = StartShard(shardId);
} }
@ -99,6 +162,7 @@ namespace NadekoBot.Core.Services
if (msg == null) if (msg == null)
return; return;
var db = _redis.GetDatabase(); var db = _redis.GetDatabase();
//sets the shard state
db.ListSetByIndex(_key + "_shardstats", db.ListSetByIndex(_key + "_shardstats",
msg.ShardId, msg.ShardId,
data, data,
@ -107,24 +171,118 @@ namespace NadekoBot.Core.Services
|| msg.ConnectionState == Discord.ConnectionState.Disconnecting) || msg.ConnectionState == Discord.ConnectionState.Disconnecting)
{ {
_log.Error("!!! SHARD {0} IS IN {1} STATE !!!", msg.ShardId, msg.ConnectionState.ToString()); _log.Error("!!! SHARD {0} IS IN {1} STATE !!!", msg.ShardId, msg.ConnectionState.ToString());
OnShardUnavailable(msg.ShardId);
}
else
{
// remove the shard from the waiting list if it's on it,
// because it's connected/connecting now
_shardRestartWaitingList.TryRemove(msg.ShardId);
} }
return; return;
} }
private void OnShardUnavailable(int shardId)
{
//if the shard is dc'd, add it to the restart waiting list
if (!_shardRestartWaitingList.Add(shardId))
{
//if it's already on the waiting list
//stop the shard
OnStop(shardId);
//add it to the start queue (start the shard)
_shardStartQueue.Enqueue(shardId);
//remove it from the waiting list
_shardRestartWaitingList.TryRemove(shardId);
}
}
public async Task RunAsync() public async Task RunAsync()
{ {
int i = 0; //this task will complete when the initial start of the shards
#if DEBUG //is complete, but will keep running in order to restart shards
i = 1; //which are disconnected for too long
#endif TaskCompletionSource<bool> tsc = new TaskCompletionSource<bool>();
var _ = Task.Run(async () =>
for (; i < _creds.TotalShards; i++)
{ {
var p = StartShard(i); do
{
//start a shard which is scheduled for start every 6 seconds
while (_shardStartQueue.TryDequeue(out var id))
{
// if the shard is on the waiting list again
// remove it since it's starting up now
_shardProcesses[i] = p; _shardRestartWaitingList.TryRemove(id);
await Task.Delay(6000); //if the task is already completed,
//it means the initial shard starting is done,
//and this is an auto-restart
if (tsc.Task.IsCompleted)
{
_log.Warn("Auto-restarting shard {0}", id);
} }
var p = StartShard(id);
_shardProcesses[id] = p;
await Task.Delay(6000).ConfigureAwait(false);
}
tsc.TrySetResult(true);
}
while (true);
// ^ keep checking for shards which need to be restarted
});
//restart unresponsive shards
_ = Task.Run(async () =>
{
//after all shards have started initially
await tsc.Task.ConfigureAwait(false);
while (true)
{
try
{
var db = _redis.GetDatabase();
//get all shards which didn't communicate their status in the last 30 seconds
var all = db.ListRange(_creds.RedisKey() + "_shardstats")
.Select(x => JsonConvert.DeserializeObject<ShardComMessage>(x));
var statuses = all
.Where(x => x.Time < DateTime.UtcNow - TimeSpan.FromSeconds(30));
if (!statuses.Any())
{
for (var i = 0; i < _shardProcesses.Length; i++)
{
var p = _shardProcesses[i];
if (p == null || p.HasExited)
_shardStartQueue.Enqueue(i);
}
}
else
{
foreach (var s in statuses)
{
OnStop(s.ShardId);
_shardStartQueue.Enqueue(s.ShardId);
//to prevent shards which are already scheduled for restart to be scheduled again
s.Time = DateTime.UtcNow + TimeSpan.FromSeconds(30 * _shardStartQueue.Count);
db.ListSetByIndex(_key + "_shardstats", s.ShardId,
JsonConvert.SerializeObject(s), CommandFlags.FireAndForget);
_log.Warn("Shard {0} is scheduled for a restart because it's unresponsive.", s.ShardId);
}
}
}
catch (Exception ex) { _log.Error(ex); throw; }
finally
{
await Task.Delay(10000).ConfigureAwait(false);
}
}
});
await tsc.Task.ConfigureAwait(false);
return;
} }
private Process StartShard(int shardId) private Process StartShard(int shardId)

View File

@ -58,6 +58,9 @@ namespace NadekoBot.Extensions
return new PathCollection(cornerToptLeft, cornerBottomLeft, cornerTopRight, cornerBottomRight); return new PathCollection(cornerToptLeft, cornerBottomLeft, cornerTopRight, cornerBottomRight);
} }
/// <summary>
/// First 10 characters of teh bot token.
/// </summary>
public static string RedisKey(this IBotCredentials bc) public static string RedisKey(this IBotCredentials bc)
{ {
return bc.Token.Substring(0, 10); return bc.Token.Substring(0, 10);
@ -147,9 +150,6 @@ namespace NadekoBot.Extensions
return module; return module;
} }
//public static async Task<IEnumerable<IGuildUser>> MentionedUsers(this IUserMessage msg) =>
public static void AddRange<T>(this HashSet<T> target, IEnumerable<T> elements) where T : class public static void AddRange<T>(this HashSet<T> target, IEnumerable<T> elements) where T : class
{ {
foreach (var item in elements) foreach (var item in elements)

View File

@ -1,7 +1,7 @@
 
Microsoft Visual Studio Solution File, Format Version 12.00 Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15 # Visual Studio 15
VisualStudioVersion = 15.0.26730.16 VisualStudioVersion = 15.0.27004.2005
MinimumVisualStudioVersion = 10.0.40219.1 MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{04929013-5BAB-42B0-B9B2-8F2BB8F16AF2}" Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{04929013-5BAB-42B0-B9B2-8F2BB8F16AF2}"
EndProject EndProject
@ -30,8 +30,8 @@ Global
{45EC1473-C678-4857-A544-07DFE0D0B478}.Release|Any CPU.Build.0 = Release|Any CPU {45EC1473-C678-4857-A544-07DFE0D0B478}.Release|Any CPU.Build.0 = Release|Any CPU
{A6CCEFBD-DCF2-482C-9643-47664683548F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {A6CCEFBD-DCF2-482C-9643-47664683548F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A6CCEFBD-DCF2-482C-9643-47664683548F}.Debug|Any CPU.Build.0 = Debug|Any CPU {A6CCEFBD-DCF2-482C-9643-47664683548F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A6CCEFBD-DCF2-482C-9643-47664683548F}.GlobalNadeko|Any CPU.ActiveCfg = Debug|Any CPU {A6CCEFBD-DCF2-482C-9643-47664683548F}.GlobalNadeko|Any CPU.ActiveCfg = Release|Any CPU
{A6CCEFBD-DCF2-482C-9643-47664683548F}.GlobalNadeko|Any CPU.Build.0 = Debug|Any CPU {A6CCEFBD-DCF2-482C-9643-47664683548F}.GlobalNadeko|Any CPU.Build.0 = Release|Any CPU
{A6CCEFBD-DCF2-482C-9643-47664683548F}.Release|Any CPU.ActiveCfg = Release|Any CPU {A6CCEFBD-DCF2-482C-9643-47664683548F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A6CCEFBD-DCF2-482C-9643-47664683548F}.Release|Any CPU.Build.0 = Release|Any CPU {A6CCEFBD-DCF2-482C-9643-47664683548F}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection