Shard coordinator is not indepentent from the shard 0

This commit is contained in:
Master Kwoth 2017-10-10 18:24:36 +02:00
parent 0bacb1f780
commit db6fa9af1a
14 changed files with 225 additions and 210 deletions

View File

@ -1,19 +0,0 @@
using System;
using System.Threading.Tasks;
using Discord.Commands;
using Discord.WebSocket;
namespace NadekoBot.Common
{
public class Shard0Precondition : PreconditionAttribute
{
public override Task<PreconditionResult> CheckPermissions(ICommandContext context, CommandInfo command, IServiceProvider services)
{
var c = (DiscordSocketClient)context.Client;
if (c.ShardId != 0)
return Task.FromResult(PreconditionResult.FromError("Must be ran from shard #0"));
return Task.FromResult(PreconditionResult.FromSuccess());
}
}
}

View File

@ -1,28 +0,0 @@
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
using Newtonsoft.Json;
using NadekoBot.Services;
using StackExchange.Redis;
namespace NadekoBot.Common.ShardCom
{
public class ShardComClient
{
private readonly IDataCache _cache;
public ShardComClient(IDataCache cache)
{
_cache = cache;
}
public async Task Send(ShardComMessage data)
{
var sub = _cache.Redis.GetSubscriber();
var msg = JsonConvert.SerializeObject(data);
await sub.PublishAsync("shardcoord_send", msg).ConfigureAwait(false);
}
}
}

View File

@ -56,9 +56,29 @@ namespace NadekoBot.Modules.Administration
return;
name = name.ToTitleCase();
if (await _bot.LoadPackage(name))
await ReplyAsync(":ok:");
else
await ReplyAsync(":x:");
}
[NadekoCommand, Usage, Description, Aliases]
[RequireContext(ContextType.Guild)]
[OwnerOnly]
public async Task PackageReload(string name)
{
if (name.Contains(".") || name.Contains("\\") || name.Contains("/") || name.Contains("~"))
return;
name = name.ToTitleCase();
if (await _bot.UnloadPackage(name))
{
await _bot.LoadPackage(name);
await ReplyAsync(":ok:");
}
else
await ReplyAsync(":x:");
}
}
}
}

View File

@ -14,6 +14,8 @@ using Microsoft.EntityFrameworkCore;
using System.Diagnostics;
using NadekoBot.Common.Attributes;
using NadekoBot.Modules.Administration.Services;
using Newtonsoft.Json;
using NadekoBot.Common.ShardCom;
namespace NadekoBot.Modules.Administration
{
@ -30,10 +32,11 @@ namespace NadekoBot.Modules.Administration
private readonly IBotConfigProvider _bc;
private readonly NadekoBot _bot;
private readonly IBotCredentials _creds;
private readonly IDataCache _cache;
public SelfCommands(DbService db, NadekoBot bot, DiscordSocketClient client,
IImagesService images, IBotConfigProvider bc,
IBotCredentials creds)
IBotCredentials creds, IDataCache cache)
{
_db = db;
_client = client;
@ -41,6 +44,7 @@ namespace NadekoBot.Modules.Administration
_bc = bc;
_bot = bot;
_creds = creds;
_cache = cache;
}
[NadekoCommand, Usage, Description, Aliases]
@ -217,28 +221,62 @@ namespace NadekoBot.Modules.Administration
}
//todo 2 shard commands
//[NadekoCommand, Usage, Description, Aliases]
//[Shard0Precondition]
//[OwnerOnly]
//public async Task RestartShard(int shardid)
//{
// if (shardid == 0 || shardid > b)
// {
// await ReplyErrorLocalized("no_shard_id").ConfigureAwait(false);
// return;
// }
// try
// {
// await ReplyConfirmLocalized("shard_reconnecting", Format.Bold("#" + shardid)).ConfigureAwait(false);
// await shard.StartAsync().ConfigureAwait(false);
// await ReplyConfirmLocalized("shard_reconnected", Format.Bold("#" + shardid)).ConfigureAwait(false);
// }
// catch (Exception ex)
// {
// _log.Warn(ex);
// }
//}
[NadekoCommand, Usage, Description, Aliases]
public async Task ShardStats(int page = 1)
{
if (--page < 0)
return;
var db = _cache.Redis.GetDatabase();
var statuses = db.ListRange(_creds.RedisKey() + "_shardstats")
.Select(x => JsonConvert.DeserializeObject<ShardComMessage>(x));
var status = string.Join(", ", statuses
.GroupBy(x => x.ConnectionState)
.Select(x => $"{x.Count()} {x.Key}")
.ToArray());
var allShardStrings = statuses
.Select(x =>
{
var timeDiff = DateTime.UtcNow - x.Time;
if (timeDiff > TimeSpan.FromSeconds(20))
return $"Shard #{Format.Bold(x.ShardId.ToString())} **UNRESPONSIVE** for {timeDiff.ToString(@"hh\:mm\:ss")}";
return GetText("shard_stats_txt", x.ShardId.ToString(),
Format.Bold(x.ConnectionState.ToString()), Format.Bold(x.Guilds.ToString()), timeDiff.ToString(@"hh\:mm\:ss"));
})
.ToArray();
await Context.Channel.SendPaginatedConfirmAsync(_client, page, (curPage) =>
{
var str = string.Join("\n", allShardStrings.Skip(25 * curPage).Take(25));
if (string.IsNullOrWhiteSpace(str))
str = GetText("no_shards_on_page");
return new EmbedBuilder()
.WithAuthor(a => a.WithName(GetText("shard_stats")))
.WithTitle(status)
.WithOkColor()
.WithDescription(str);
}, allShardStrings.Length / 25);
}
[NadekoCommand, Usage, Description, Aliases]
[OwnerOnly]
public async Task RestartShard(int shardid)
{
if (shardid < 0 || shardid >= _creds.TotalShards)
{
await ReplyErrorLocalized("no_shard_id").ConfigureAwait(false);
return;
}
var pub = _cache.Redis.GetSubscriber();
pub.Publish(_creds.RedisKey() + "_shard_restart",
JsonConvert.SerializeObject(_client.ShardId),
StackExchange.Redis.CommandFlags.FireAndForget);
await ReplyConfirmLocalized("shard_reconnecting", Format.Bold("#" + shardid)).ConfigureAwait(false);
}
[NadekoCommand, Usage, Description, Aliases]
[OwnerOnly]

View File

@ -13,7 +13,6 @@ namespace NadekoBot.Services
double MessagesPerSecond { get; }
long TextChannels { get; }
long VoiceChannels { get; }
int GuildCount { get; }
TimeSpan GetUptime();
string GetUptimeString(string separator = ", ");

View File

@ -15,11 +15,8 @@ namespace NadekoBot.Services.Impl
private Logger _log;
public ulong ClientId { get; }
public string GoogleApiKey { get; }
public string MashapeKey { get; }
public string Token { get; }
public ImmutableArray<ulong> OwnerIds { get; }

View File

@ -1,6 +1,8 @@
using Discord;
using Discord.WebSocket;
using NadekoBot.Common.ShardCom;
using NadekoBot.Extensions;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Globalization;
@ -11,6 +13,7 @@ using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using StackExchange.Redis;
namespace NadekoBot.Services.Impl
{
@ -40,15 +43,16 @@ namespace NadekoBot.Services.Impl
private readonly Timer _carbonitexTimer;
private readonly Timer _dataTimer;
private readonly ShardsCoordinator _sc;
private readonly ConnectionMultiplexer _redis;
public int GuildCount =>
_sc?.GuildCount ?? _client.Guilds.Count();
public StatsService(DiscordSocketClient client, CommandHandler cmdHandler, IBotCredentials creds, NadekoBot nadeko)
public StatsService(DiscordSocketClient client, CommandHandler cmdHandler,
IBotCredentials creds, NadekoBot nadeko,
IDataCache cache)
{
_client = client;
_creds = creds;
_sc = nadeko.ShardCoord;
_redis = cache.Redis;
_started = DateTime.UtcNow;
_client.MessageReceived += _ => Task.FromResult(Interlocked.Increment(ref _messageCounter));
@ -142,7 +146,7 @@ namespace NadekoBot.Services.Impl
{
using (var content = new FormUrlEncodedContent(
new Dictionary<string, string> {
{ "servercount", _sc.GuildCount.ToString() },
{ "servercount", nadeko.GuildCount.ToString() },
{ "key", _creds.CarbonKey }}))
{
content.Headers.Clear();
@ -175,7 +179,7 @@ namespace NadekoBot.Services.Impl
using (var content = new FormUrlEncodedContent(
new Dictionary<string, string> {
{ "id", string.Concat(MD5.Create().ComputeHash(Encoding.ASCII.GetBytes(_creds.ClientId.ToString())).Select(x => x.ToString("X2"))) },
{ "guildCount", _sc.GuildCount.ToString() },
{ "guildCount", nadeko.GuildCount.ToString() },
{ "version", BotVersion },
{ "platform", platform }}))
{

View File

@ -29,7 +29,6 @@ namespace NadekoBot
private Logger _log;
public BotCredentials Credentials { get; }
public DiscordSocketClient Client { get; }
public CommandService CommandService { get; }
@ -51,11 +50,15 @@ namespace NadekoBot
public ShardsCoordinator ShardCoord { get; private set; }
private readonly ShardComClient _comClient;
private readonly BotConfig _botConfig;
public IDataCache Cache { get; private set; }
public int GuildCount =>
Cache.Redis.GetDatabase()
.ListRange(Credentials.RedisKey() + "_shardstats")
.Select(x => JsonConvert.DeserializeObject<ShardComMessage>(x))
.Sum(x => x.Guilds);
public NadekoBot(int shardId, int parentProcessId)
{
if (shardId < 0)
@ -83,8 +86,6 @@ namespace NadekoBot
DefaultRunMode = RunMode.Sync,
});
_comClient = new ShardComClient(Cache);
using (var uow = _db.UnitOfWork)
{
_botConfig = uow.BotConfig.GetOrCreate();
@ -105,13 +106,18 @@ namespace NadekoBot
{
while (true)
{
await _comClient.Send(new ShardComMessage()
var data = new ShardComMessage()
{
ConnectionState = Client.ConnectionState,
Guilds = Client.ConnectionState == ConnectionState.Connected ? Client.Guilds.Count : 0,
ShardId = Client.ShardId,
Time = DateTime.UtcNow,
});
};
var sub = Cache.Redis.GetSubscriber();
var msg = JsonConvert.SerializeObject(data);
await sub.PublishAsync(Credentials.RedisKey() + "_shardcoord_send", msg).ConfigureAwait(false);
await Task.Delay(5000);
}
});
@ -288,6 +294,7 @@ namespace NadekoBot
Ready.TrySetResult(true);
HandleStatusChanges();
StartSendingData();
_log.Info($"Shard {Client.ShardId} ready.");
}
@ -303,14 +310,8 @@ namespace NadekoBot
public async Task RunAndBlockAsync(params string[] args)
{
await RunAsync(args).ConfigureAwait(false);
StartSendingData();
if (ShardCoord != null)
await ShardCoord.RunAndBlockAsync();
else
{
await Task.Delay(-1).ConfigureAwait(false);
}
}
private void TerribleElevatedPermissionCheck()
{
@ -329,11 +330,6 @@ namespace NadekoBot
private void SetupShard(int parentProcessId)
{
if (Client.ShardId == 0)
{
ShardCoord = new ShardsCoordinator(Cache);
return;
}
new Thread(new ThreadStart(() =>
{
try

View File

@ -2,73 +2,128 @@
using NLog;
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using NadekoBot.Common.ShardCom;
using StackExchange.Redis;
using Newtonsoft.Json;
using NadekoBot.Extensions;
namespace NadekoBot.Services
{
public class ShardsCoordinator
{
private readonly BotCredentials _creds;
private readonly string _key;
private readonly Process[] _shardProcesses;
public ShardComMessage[] Statuses { get; }
public int GuildCount => Statuses.ToArray()
.Where(x => x != null)
.Sum(x => x.Guilds);
private readonly Logger _log;
private readonly ShardComServer _comServer;
private readonly int _curProcessId;
private readonly ConnectionMultiplexer _redis;
private ShardComMessage _defaultShardState;
public ShardsCoordinator(IDataCache cache)
public ShardsCoordinator()
{
//load main stuff
LogSetup.SetupLogger();
_creds = new BotCredentials();
_shardProcesses = new Process[_creds.TotalShards];
Statuses = new ShardComMessage[_creds.TotalShards];
for (int i = 0; i < Statuses.Length; i++)
{
Statuses[i] = new ShardComMessage();
var s = Statuses[i];
s.ConnectionState = Discord.ConnectionState.Disconnected;
s.Guilds = 0;
s.ShardId = i;
s.Time = DateTime.Now - TimeSpan.FromMinutes(1);
}
_log = LogManager.GetCurrentClassLogger();
_creds = new BotCredentials();
_key = _creds.RedisKey();
_comServer = new ShardComServer(cache);
_comServer.Start();
_comServer.OnDataReceived += _comServer_OnDataReceived;
//setup initial shard statuses
_defaultShardState = new ShardComMessage()
{
ConnectionState = Discord.ConnectionState.Disconnected,
Guilds = 0,
Time = DateTime.Now - TimeSpan.FromMinutes(1)
};
var db = _redis.GetDatabase();
_shardProcesses = new Process[_creds.TotalShards];
for (int i = 0; i < _creds.TotalShards; i++)
{
_defaultShardState.ShardId = i;
db.ListSetByIndex(_key + "shardstats",
i,
JsonConvert.SerializeObject(_defaultShardState),
CommandFlags.FireAndForget);
}
_curProcessId = Process.GetCurrentProcess().Id;
_redis = ConnectionMultiplexer.Connect("127.0.0.1");
var sub = _redis.GetSubscriber();
sub.Subscribe(_key + "_shardcoord_send",
OnDataReceived,
CommandFlags.FireAndForget);
sub.Subscribe(_key + "_shardcoord_restart",
OnRestart,
CommandFlags.FireAndForget);
sub.Subscribe(_key + "_shardcoord_stop",
OnStop,
CommandFlags.FireAndForget);
}
private Task _comServer_OnDataReceived(ShardComMessage msg)
private void OnStop(RedisChannel ch, RedisValue data)
{
Statuses[msg.ShardId] = msg;
if (msg.ConnectionState == Discord.ConnectionState.Disconnected || msg.ConnectionState == Discord.ConnectionState.Disconnecting)
_log.Error("!!! SHARD {0} IS IN {1} STATE", msg.ShardId, msg.ConnectionState.ToString());
return Task.CompletedTask;
var shardId = JsonConvert.DeserializeObject<int>(data);
var db = _redis.GetDatabase();
_defaultShardState.ShardId = shardId;
db.ListSetByIndex(_key + "shardstats",
shardId,
JsonConvert.SerializeObject(_defaultShardState),
CommandFlags.FireAndForget);
var p = _shardProcesses[shardId];
_shardProcesses[shardId] = null;
try { p?.Kill(); } catch { }
try { p?.Dispose(); } catch { }
}
private void OnRestart(RedisChannel ch, RedisValue data)
{
OnStop(ch, data);
var shardId = JsonConvert.DeserializeObject<int>(data);
_shardProcesses[shardId] = StartShard(shardId);
}
private void OnDataReceived(RedisChannel ch, RedisValue data)
{
var msg = JsonConvert.DeserializeObject<ShardComMessage>(data);
if (msg == null)
return;
var db = _redis.GetDatabase();
db.ListSetByIndex(_key + "shardstats",
msg.ShardId,
data,
CommandFlags.FireAndForget);
if (msg.ConnectionState == Discord.ConnectionState.Disconnected
|| msg.ConnectionState == Discord.ConnectionState.Disconnecting)
{
_log.Error("!!! SHARD {0} IS IN {1} STATE !!!", msg.ShardId, msg.ConnectionState.ToString());
}
return;
}
public async Task RunAsync()
{
for (int i = 1; i < _creds.TotalShards; i++)
for (int i = 0; i < _creds.TotalShards; i++)
{
var p = Process.Start(new ProcessStartInfo()
var p = StartShard(i);
_shardProcesses[i] = p;
await Task.Delay(6000);
}
}
private Process StartShard(int shardId)
{
return Process.Start(new ProcessStartInfo()
{
FileName = _creds.ShardRunCommand,
Arguments = string.Format(_creds.ShardRunArguments, i, _curProcessId, "")
Arguments = string.Format(_creds.ShardRunArguments, shardId, _curProcessId, "")
});
// last "" in format is for backwards compatibility
// because current startup commands have {2} in them probably
await Task.Delay(5000);
}
}
public async Task RunAndBlockAsync()
@ -80,14 +135,15 @@ namespace NadekoBot.Services
catch (Exception ex)
{
_log.Error(ex);
}
await Task.Delay(-1);
foreach (var p in _shardProcesses)
{
try { p.Kill(); } catch { }
try { p.Dispose(); } catch { }
}
return;
}
await Task.Delay(-1);
}
}
}

View File

@ -16,24 +16,15 @@ using System.Threading.Tasks;
using NadekoBot.Common.Collections;
using SixLabors.Primitives;
using NadekoBot.Common;
using NadekoBot.Services;
namespace NadekoBot.Extensions
{
public static class Extensions
{
//so ftw
public static bool IsSubclassOfRawGeneric(this Type toCheck, Type generic)
public static string RedisKey(this IBotCredentials bc)
{
while (toCheck != null && toCheck != typeof(object))
{
var cur = toCheck.IsGenericType ? toCheck.GetGenericTypeDefinition() : toCheck;
if (generic == cur)
{
return true;
}
toCheck = toCheck.BaseType;
}
return false;
return bc.Token.Substring(0, 10);
}
public static async Task<string> ReplaceAsync(this Regex regex, string input, Func<Match, Task<string>> replacementFn)
@ -223,7 +214,7 @@ namespace NadekoBot.Extensions
var xOffset = 0;
for (int i = 0; i < imgs.Length; i++)
{
canvas.DrawImage(imgs[i], 100, default(Size), new Point(xOffset, 0));
canvas.DrawImage(imgs[i], 100, default, new Point(xOffset, 0));
xOffset += imgs[i].Bounds.Width;
}

View File

@ -263,7 +263,7 @@ namespace NadekoBot.Modules.Utility.Services
throw new StreamRoleNotFoundException();
await user.RemoveRoleAsync(addRole).ConfigureAwait(false);
_log.Info("Removed stream role from a user {0} in {1} server", user.ToString(), user.Guild.ToString());
_log.Info("Removed stream role from the user {0} in {1} server", user.ToString(), user.Guild.ToString());
}
catch (HttpException ex) when (ex.HttpCode == System.Net.HttpStatusCode.Forbidden)
{
@ -272,7 +272,6 @@ namespace NadekoBot.Modules.Utility.Services
_log.Error(ex);
throw new StreamRolePermissionException();
}
_log.Info("Removed stream role from the user {0} in {1} server", user.ToString(), user.Guild.ToString());
}
}
}

View File

@ -5,11 +5,8 @@ using System.Linq;
using System.Threading.Tasks;
using System.Text;
using NadekoBot.Extensions;
using System.Reflection;
using NadekoBot.Services.Impl;
using System.Net.Http;
using System.Collections.Concurrent;
using System.Threading;
using ImageSharp;
using System.Collections.Generic;
using Newtonsoft.Json;
@ -17,7 +14,6 @@ using Discord.WebSocket;
using System.Diagnostics;
using NadekoBot.Common;
using NadekoBot.Common.Attributes;
using Color = Discord.Color;
using NadekoBot.Services;
namespace NadekoBot.Modules.Utility
@ -27,14 +23,14 @@ namespace NadekoBot.Modules.Utility
private readonly DiscordSocketClient _client;
private readonly IStatsService _stats;
private readonly IBotCredentials _creds;
private readonly ShardsCoordinator _shardCoord;
private readonly NadekoBot _bot;
public Utility(NadekoBot nadeko, DiscordSocketClient client, IStatsService stats, IBotCredentials creds)
{
_client = client;
_stats = stats;
_creds = creds;
_shardCoord = nadeko.ShardCoord;
_bot = nadeko;
}
[NadekoCommand, Usage, Description, Aliases]
@ -214,47 +210,6 @@ namespace NadekoBot.Modules.Utility
await Context.Channel.SendConfirmAsync($"{Context.User.Mention} https://discord.gg/{invite.Code}");
}
[NadekoCommand, Usage, Description, Aliases]
[Shard0Precondition]
public async Task ShardStats(int page = 1)
{
if (--page < 0)
return;
var statuses = _shardCoord.Statuses.ToArray()
.Where(x => x != null);
var status = string.Join(", ", statuses
.GroupBy(x => x.ConnectionState)
.Select(x => $"{x.Count()} {x.Key}")
.ToArray());
var allShardStrings = statuses
.Select(x =>
{
var timeDiff = DateTime.UtcNow - x.Time;
if (timeDiff > TimeSpan.FromSeconds(20))
return $"Shard #{Format.Bold(x.ShardId.ToString())} **UNRESPONSIVE** for {timeDiff.ToString(@"hh\:mm\:ss")}";
return GetText("shard_stats_txt", x.ShardId.ToString(),
Format.Bold(x.ConnectionState.ToString()), Format.Bold(x.Guilds.ToString()), timeDiff.ToString(@"hh\:mm\:ss"));
})
.ToArray();
await Context.Channel.SendPaginatedConfirmAsync(_client, page, (curPage) =>
{
var str = string.Join("\n", allShardStrings.Skip(25 * curPage).Take(25));
if (string.IsNullOrWhiteSpace(str))
str = GetText("no_shards_on_page");
return new EmbedBuilder()
.WithAuthor(a => a.WithName(GetText("shard_stats")))
.WithTitle(status)
.WithOkColor()
.WithDescription(str);
}, allShardStrings.Length / 25);
}
[NadekoCommand, Usage, Description, Aliases]
public async Task Stats()
{
@ -273,7 +228,7 @@ namespace NadekoBot.Modules.Utility
.AddField(efb => efb.WithName(GetText("uptime")).WithValue(_stats.GetUptimeString("\n")).WithIsInline(true))
.AddField(efb => efb.WithName(GetText("presence")).WithValue(
GetText("presence_txt",
_stats.GuildCount, _stats.TextChannels, _stats.VoiceChannels)).WithIsInline(true)));
_bot.GuildCount, _stats.TextChannels, _stats.VoiceChannels)).WithIsInline(true)));
}
[NadekoCommand, Usage, Description, Aliases]

View File

@ -1,4 +1,5 @@
using System.Threading.Tasks;
using NadekoBot.Services;
using System.Threading.Tasks;
namespace NadekoBot
{
@ -6,12 +7,18 @@ namespace NadekoBot
{
public static Task Main(string[] args)
{
if (args.Length == 2 && int.TryParse(args[0], out int shardId) && int.TryParse(args[1], out int parentProcessId))
if (args.Length == 2
&& int.TryParse(args[0], out int shardId)
&& int.TryParse(args[1], out int parentProcessId))
{
return new NadekoBot(shardId, parentProcessId).RunAndBlockAsync(args);
return new NadekoBot(shardId, parentProcessId)
.RunAndBlockAsync(args);
}
else
return new NadekoBot(0, 0).RunAndBlockAsync(args);
{
return new ShardsCoordinator()
.RunAndBlockAsync();
}
}
}
}