Sharding over processes almost done

This commit is contained in:
Master Kwoth 2017-06-20 04:23:11 +02:00
parent 4684117654
commit 01cf59d83e
18 changed files with 85 additions and 269 deletions

View File

@ -1,64 +0,0 @@
using Discord;
using Discord.Commands;
using NadekoBot.Attributes;
using NadekoBot.Extensions;
using NadekoBot.Services;
using NadekoBot.Services.Utility;
using System.Collections.Concurrent;
using System.Threading.Tasks;
namespace NadekoBot.Modules.Utility
{
public partial class Utility
{
[Group]
public class CrossServerTextChannel : NadekoSubmodule
{
private readonly CrossServerTextService _service;
public CrossServerTextChannel(CrossServerTextService service)
{
_service = service;
}
[NadekoCommand, Usage, Description, Aliases]
[RequireContext(ContextType.Guild)]
[OwnerOnly]
public async Task Scsc()
{
var token = new NadekoRandom().Next();
var set = new ConcurrentHashSet<ITextChannel>();
if (_service.Subscribers.TryAdd(token, set))
{
set.Add((ITextChannel) Context.Channel);
await ((IGuildUser) Context.User).SendConfirmAsync(GetText("csc_token"), token.ToString())
.ConfigureAwait(false);
}
}
[NadekoCommand, Usage, Description, Aliases]
[RequireContext(ContextType.Guild)]
[RequireUserPermission(GuildPermission.ManageGuild)]
public async Task Jcsc(int token)
{
ConcurrentHashSet<ITextChannel> set;
if (!_service.Subscribers.TryGetValue(token, out set))
return;
set.Add((ITextChannel) Context.Channel);
await ReplyConfirmLocalized("csc_join").ConfigureAwait(false);
}
[NadekoCommand, Usage, Description, Aliases]
[RequireContext(ContextType.Guild)]
[RequireUserPermission(GuildPermission.ManageGuild)]
public async Task Lcsc()
{
foreach (var subscriber in _service.Subscribers)
{
subscriber.Value.TryRemove((ITextChannel) Context.Channel);
}
await ReplyConfirmLocalized("csc_leave").ConfigureAwait(false);
}
}
}
}

View File

@ -37,83 +37,6 @@ namespace NadekoBot.Modules.Utility
_bot = bot; _bot = bot;
} }
//[NadekoCommand, Usage, Description, Aliases]
//[RequireContext(ContextType.Guild)]
//public async Task Midorina([Remainder] string arg)
//{
// var channel = (ITextChannel)Context.Channel;
// var roleNames = arg?.Split(';');
// if (roleNames == null || roleNames.Length == 0)
// return;
// var j = 0;
// var roles = roleNames.Select(x => Context.Guild.Roles.FirstOrDefault(r => String.Compare(r.Name, x, StringComparison.OrdinalIgnoreCase) == 0))
// .Where(x => x != null)
// .Take(10)
// .ToArray();
// var rnd = new NadekoRandom();
// var reactions = new[] { "🎬", "🐧", "🌍", "🌺", "🚀", "☀", "🌲", "🍒", "🐾", "🏀" }
// .OrderBy(x => rnd.Next())
// .ToArray();
// var roleStrings = roles
// .Select(x => $"{reactions[j++]} -> {x.Name}");
// var msg = await Context.Channel.SendConfirmAsync("Pick a Role",
// string.Join("\n", roleStrings)).ConfigureAwait(false);
// for (int i = 0; i < roles.Length; i++)
// {
// try { await msg.AddReactionAsync(reactions[i]).ConfigureAwait(false); }
// catch (Exception ex) { _log.Warn(ex); }
// await Task.Delay(1000).ConfigureAwait(false);
// }
// msg.OnReaction((r) => Task.Run(async () =>
// {
// try
// {
// var usr = r.User.GetValueOrDefault() as IGuildUser;
// if (usr == null)
// return;
// var index = Array.IndexOf<string>(reactions, r.Emoji.Name);
// if (index == -1)
// return;
// await usr.RemoveRolesAsync(roles[index]);
// }
// catch (Exception ex)
// {
// _log.Warn(ex);
// }
// }), (r) => Task.Run(async () =>
// {
// try
// {
// var usr = r.User.GetValueOrDefault() as IGuildUser;
// if (usr == null)
// return;
// var index = Array.IndexOf<string>(reactions, r.Emoji.Name);
// if (index == -1)
// return;
// await usr.RemoveRolesAsync(roles[index]);
// }
// catch (Exception ex)
// {
// _log.Warn(ex);
// }
// }));
//}
[NadekoCommand, Usage, Description, Aliases] [NadekoCommand, Usage, Description, Aliases]
[RequireContext(ContextType.Guild)] [RequireContext(ContextType.Guild)]
[RequireUserPermission(GuildPermission.ManageRoles)] [RequireUserPermission(GuildPermission.ManageRoles)]

View File

@ -65,7 +65,8 @@ namespace NadekoBot
private const string _mutexName = @"Global\nadeko_shards_lock"; private const string _mutexName = @"Global\nadeko_shards_lock";
private readonly Semaphore sem = new Semaphore(1, 1, _mutexName); private readonly Semaphore sem = new Semaphore(1, 1, _mutexName);
public int ShardId { get; } public int ShardId { get; }
private readonly Thread waitForParentKill; public ShardsCoordinator ShardCoord { get; private set; }
private readonly ShardComClient _comClient = new ShardComClient(); private readonly ShardComClient _comClient = new ShardComClient();
public NadekoBot(int shardId, int parentProcessId) public NadekoBot(int shardId, int parentProcessId)
@ -79,22 +80,6 @@ namespace NadekoBot
_log = LogManager.GetCurrentClassLogger(); _log = LogManager.GetCurrentClassLogger();
TerribleElevatedPermissionCheck(); TerribleElevatedPermissionCheck();
waitForParentKill = new Thread(new ThreadStart(() =>
{
try
{
var p = Process.GetProcessById(parentProcessId);
if (p == null)
return;
p.WaitForExit();
}
finally
{
Environment.Exit(10);
}
}));
waitForParentKill.Start();
Credentials = new BotCredentials(); Credentials = new BotCredentials();
Db = new DbService(Credentials); Db = new DbService(Credentials);
@ -121,12 +106,11 @@ namespace NadekoBot
DefaultRunMode = RunMode.Async, DefaultRunMode = RunMode.Async,
}); });
//foundation services
Images = new ImagesService(); Images = new ImagesService();
Currency = new CurrencyService(BotConfig, Db); Currency = new CurrencyService(BotConfig, Db);
GoogleApi = new GoogleApiService(Credentials); GoogleApi = new GoogleApiService(Credentials);
StartSendingData(); SetupShard(shardId, parentProcessId);
#if GLOBAL_NADEKO #if GLOBAL_NADEKO
Client.Log += Client_Log; Client.Log += Client_Log;
@ -152,9 +136,10 @@ namespace NadekoBot
private void AddServices() private void AddServices()
{ {
var startingGuildIdList = Client.Guilds.Select(x => (long)x.Id).ToList();
using (var uow = Db.UnitOfWork) using (var uow = Db.UnitOfWork)
{ {
AllGuildConfigs = uow.GuildConfigs.GetAllGuildConfigs(Client.Guilds.Select(x => (long)x.Id).ToList()).ToImmutableArray(); AllGuildConfigs = uow.GuildConfigs.GetAllGuildConfigs(startingGuildIdList).ToImmutableArray();
} }
Localization = new Localization(BotConfig.Locale, AllGuildConfigs.ToDictionary(x => x.GuildId, x => x.Locale), Db); Localization = new Localization(BotConfig.Locale, AllGuildConfigs.ToDictionary(x => x.GuildId, x => x.Locale), Db);
Strings = new NadekoStrings(Localization); Strings = new NadekoStrings(Localization);
@ -170,8 +155,7 @@ namespace NadekoBot
//module services //module services
//todo 90 - autodiscover, DI, and add instead of manual like this //todo 90 - autodiscover, DI, and add instead of manual like this
#region utility #region utility
var crossServerTextService = new CrossServerTextService(AllGuildConfigs, Client); var remindService = new RemindService(Client, BotConfig, Db, startingGuildIdList);
var remindService = new RemindService(Client, BotConfig, Db);
var repeaterService = new MessageRepeaterService(this, Client, AllGuildConfigs); var repeaterService = new MessageRepeaterService(this, Client, AllGuildConfigs);
var converterService = new ConverterService(Db); var converterService = new ConverterService(Db);
var commandMapService = new CommandMapService(AllGuildConfigs); var commandMapService = new CommandMapService(AllGuildConfigs);
@ -181,7 +165,7 @@ namespace NadekoBot
#endregion #endregion
#region permissions #region permissions
var permissionsService = new PermissionService(Db, BotConfig, CommandHandler); var permissionsService = new PermissionService(Client, Db, BotConfig, CommandHandler);
var blacklistService = new BlacklistService(BotConfig); var blacklistService = new BlacklistService(BotConfig);
var cmdcdsService = new CmdCdService(AllGuildConfigs); var cmdcdsService = new CmdCdService(AllGuildConfigs);
var filterService = new FilterService(Client, AllGuildConfigs); var filterService = new FilterService(Client, AllGuildConfigs);
@ -241,7 +225,6 @@ namespace NadekoBot
.Add<CommandHandler>(CommandHandler) .Add<CommandHandler>(CommandHandler)
.Add<DbService>(Db) .Add<DbService>(Db)
//modules //modules
.Add(crossServerTextService)
.Add(commandMapService) .Add(commandMapService)
.Add(remindService) .Add(remindService)
.Add(repeaterService) .Add(repeaterService)
@ -375,7 +358,10 @@ namespace NadekoBot
public async Task RunAndBlockAsync(params string[] args) public async Task RunAndBlockAsync(params string[] args)
{ {
await RunAsync(args).ConfigureAwait(false); await RunAsync(args).ConfigureAwait(false);
await Task.Delay(-1).ConfigureAwait(false); if (ShardCoord != null)
await ShardCoord.RunAndBlockAsync();
else
await Task.Delay(-1).ConfigureAwait(false);
} }
private void TerribleElevatedPermissionCheck() private void TerribleElevatedPermissionCheck()
@ -392,5 +378,30 @@ namespace NadekoBot
Environment.Exit(2); Environment.Exit(2);
} }
} }
private void SetupShard(int shardId, int parentProcessId)
{
if (shardId != 0)
{
new Thread(new ThreadStart(() =>
{
try
{
var p = Process.GetProcessById(parentProcessId);
if (p == null)
return;
p.WaitForExit();
}
finally
{
Environment.Exit(10);
}
})).Start();
}
else
{
ShardCoord = new ShardsCoordinator();
}
}
} }
} }

View File

@ -4,12 +4,10 @@
{ {
public static void Main(string[] args) public static void Main(string[] args)
{ {
if (args.Length == 0) if (args.Length == 2 && int.TryParse(args[0], out int shardId) && int.TryParse(args[1], out int parentProcessId))
return;
if (args[0].ToLowerInvariant() == "main")
new ShardsCoordinator().RunAndBlockAsync(args).GetAwaiter().GetResult();
else if (int.TryParse(args[0], out int shardId) && int.TryParse(args[1], out int parentProcessId))
new NadekoBot(shardId, parentProcessId).RunAndBlockAsync(args).GetAwaiter().GetResult(); new NadekoBot(shardId, parentProcessId).RunAndBlockAsync(args).GetAwaiter().GetResult();
else
new NadekoBot(0, 0).RunAndBlockAsync(args).GetAwaiter().GetResult();
} }
} }
} }

View File

@ -1,8 +1,7 @@
{ {
"profiles": { "profiles": {
"NadekoBot": { "NadekoBot": {
"commandName": "Project", "commandName": "Project"
"commandLineArgs": "main"
} }
} }
} }

View File

@ -12,9 +12,9 @@ namespace NadekoBot.Services.Database.Repositories
GuildConfig LogSettingsFor(ulong guildId); GuildConfig LogSettingsFor(ulong guildId);
IEnumerable<GuildConfig> OldPermissionsForAll(); IEnumerable<GuildConfig> OldPermissionsForAll();
IEnumerable<GuildConfig> GetAllGuildConfigs(List<long> availableGuilds); IEnumerable<GuildConfig> GetAllGuildConfigs(List<long> availableGuilds);
IEnumerable<FollowedStream> GetAllFollowedStreams(); IEnumerable<FollowedStream> GetAllFollowedStreams(List<long> included);
void SetCleverbotEnabled(ulong id, bool cleverbotEnabled); void SetCleverbotEnabled(ulong id, bool cleverbotEnabled);
IEnumerable<GuildConfig> Permissionsv2ForAll(); IEnumerable<GuildConfig> Permissionsv2ForAll(List<long> include);
GuildConfig GcWithPermissionsv2For(ulong guildId); GuildConfig GcWithPermissionsv2For(ulong guildId);
} }
} }

View File

@ -1,9 +1,11 @@
using NadekoBot.Services.Database.Models; using NadekoBot.Services.Database.Models;
using System.Collections;
using System.Collections.Generic;
namespace NadekoBot.Services.Database.Repositories namespace NadekoBot.Services.Database.Repositories
{ {
public interface IReminderRepository : IRepository<Reminder> public interface IReminderRepository : IRepository<Reminder>
{ {
IEnumerable<Reminder> GetIncludedReminders(List<long> guildIds);
} }
} }

View File

@ -136,9 +136,10 @@ namespace NadekoBot.Services.Database.Repositories.Impl
return query.ToList(); return query.ToList();
} }
public IEnumerable<GuildConfig> Permissionsv2ForAll() public IEnumerable<GuildConfig> Permissionsv2ForAll(List<long> include)
{ {
var query = _set var query = _set
.Where(x => include.Contains((long)x.GuildId))
.Include(gc => gc.Permissions); .Include(gc => gc.Permissions);
return query.ToList(); return query.ToList();
@ -169,8 +170,10 @@ namespace NadekoBot.Services.Database.Repositories.Impl
return config; return config;
} }
public IEnumerable<FollowedStream> GetAllFollowedStreams() => public IEnumerable<FollowedStream> GetAllFollowedStreams(List<long> included) =>
_set.Include(gc => gc.FollowedStreams) _set
.Where(gc => included.Contains((long)gc.GuildId))
.Include(gc => gc.FollowedStreams)
.SelectMany(gc => gc.FollowedStreams) .SelectMany(gc => gc.FollowedStreams)
.ToList(); .ToList();

View File

@ -1,5 +1,8 @@
using NadekoBot.Services.Database.Models; using NadekoBot.Services.Database.Models;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using System;
using System.Collections.Generic;
using System.Linq;
namespace NadekoBot.Services.Database.Repositories.Impl namespace NadekoBot.Services.Database.Repositories.Impl
{ {
@ -8,5 +11,10 @@ namespace NadekoBot.Services.Database.Repositories.Impl
public ReminderRepository(DbContext context) : base(context) public ReminderRepository(DbContext context) : base(context)
{ {
} }
public IEnumerable<Reminder> GetIncludedReminders(List<long> guildIds)
{
return _set.Where(x => guildIds.Contains((long)x.ServerId)).ToList();
}
} }
} }

View File

@ -24,7 +24,7 @@ namespace NadekoBot.Services.Permissions
public ConcurrentDictionary<ulong, PermissionCache> Cache { get; } = public ConcurrentDictionary<ulong, PermissionCache> Cache { get; } =
new ConcurrentDictionary<ulong, PermissionCache>(); new ConcurrentDictionary<ulong, PermissionCache>();
public PermissionService(DbService db, BotConfig bc, CommandHandler cmd) public PermissionService(DiscordSocketClient client, DbService db, BotConfig bc, CommandHandler cmd)
{ {
_log = LogManager.GetCurrentClassLogger(); _log = LogManager.GetCurrentClassLogger();
_db = db; _db = db;
@ -32,18 +32,23 @@ namespace NadekoBot.Services.Permissions
var sw = Stopwatch.StartNew(); var sw = Stopwatch.StartNew();
TryMigratePermissions(bc); TryMigratePermissions(bc);
using (var uow = _db.UnitOfWork)
client.Ready += delegate
{ {
foreach (var x in uow.GuildConfigs.Permissionsv2ForAll()) using (var uow = _db.UnitOfWork)
{ {
Cache.TryAdd(x.GuildId, new PermissionCache() foreach (var x in uow.GuildConfigs.Permissionsv2ForAll(client.Guilds.Select(x => (long)x.Id).ToList()))
{ {
Verbose = x.VerbosePermissions, Cache.TryAdd(x.GuildId, new PermissionCache()
PermRole = x.PermissionRole, {
Permissions = new PermissionsCollection<Permissionv2>(x.Permissions) Verbose = x.VerbosePermissions,
}); PermRole = x.PermissionRole,
Permissions = new PermissionsCollection<Permissionv2>(x.Permissions)
});
}
} }
} return Task.CompletedTask;
};
sw.Stop(); sw.Stop();
_log.Debug($"Loaded in {sw.Elapsed.TotalSeconds:F2}s"); _log.Debug($"Loaded in {sw.Elapsed.TotalSeconds:F2}s");

View File

@ -11,6 +11,7 @@ using System.Threading.Tasks;
namespace NadekoBot.Services.Searches namespace NadekoBot.Services.Searches
{ {
//todo move to the website
public class AnimeSearchService public class AnimeSearchService
{ {
private readonly Timer _anilistTokenRefresher; private readonly Timer _anilistTokenRefresher;

View File

@ -35,7 +35,7 @@ namespace NadekoBot.Services.Searches
IEnumerable<FollowedStream> streams; IEnumerable<FollowedStream> streams;
using (var uow = _db.UnitOfWork) using (var uow = _db.UnitOfWork)
{ {
streams = uow.GuildConfigs.GetAllFollowedStreams(); streams = uow.GuildConfigs.GetAllFollowedStreams(client.Guilds.Select(x => (long)x.Id).ToList());
} }
await Task.WhenAll(streams.Select(async fs => await Task.WhenAll(streams.Select(async fs =>

View File

@ -11,6 +11,7 @@ using System.Threading.Tasks;
namespace NadekoBot.Services.Utility namespace NadekoBot.Services.Utility
{ {
//todo periodically load from the database, update only on shard 0
public class ConverterService public class ConverterService
{ {
public List<ConvertUnit> Units { get; set; } = new List<ConvertUnit>(); public List<ConvertUnit> Units { get; set; } = new List<ConvertUnit>();

View File

@ -12,6 +12,7 @@ using System.Threading.Tasks;
namespace NadekoBot.Services.Utility namespace NadekoBot.Services.Utility
{ {
//todo periodically load from the database, update only on shard 0
public class PatreonRewardsService public class PatreonRewardsService
{ {
private readonly SemaphoreSlim getPledgesLocker = new SemaphoreSlim(1, 1); private readonly SemaphoreSlim getPledgesLocker = new SemaphoreSlim(1, 1);

View File

@ -33,7 +33,7 @@ namespace NadekoBot.Services.Utility
private readonly DiscordSocketClient _client; private readonly DiscordSocketClient _client;
private readonly DbService _db; private readonly DbService _db;
public RemindService(DiscordSocketClient client, BotConfig config, DbService db) public RemindService(DiscordSocketClient client, BotConfig config, DbService db, List<long> guilds)
{ {
_config = config; _config = config;
_client = client; _client = client;
@ -45,7 +45,7 @@ namespace NadekoBot.Services.Utility
List<Reminder> reminders; List<Reminder> reminders;
using (var uow = _db.UnitOfWork) using (var uow = _db.UnitOfWork)
{ {
reminders = uow.Reminders.GetAll().ToList(); reminders = uow.Reminders.GetIncludedReminders(guilds).ToList();
} }
RemindMessageFormat = _config.RemindMessageFormat; RemindMessageFormat = _config.RemindMessageFormat;

View File

@ -1,69 +0,0 @@
using Discord;
using Discord.WebSocket;
using NadekoBot.Extensions;
using NadekoBot.Services.Database.Models;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace NadekoBot.Services.Utility
{
public class CrossServerTextService
{
public readonly ConcurrentDictionary<int, ConcurrentHashSet<ITextChannel>> Subscribers =
new ConcurrentDictionary<int, ConcurrentHashSet<ITextChannel>>();
private DiscordSocketClient _client;
public CrossServerTextService(IEnumerable<GuildConfig> guildConfigs, DiscordSocketClient client)
{
_client = client;
_client.MessageReceived += Client_MessageReceived;
}
private Task Client_MessageReceived(SocketMessage imsg)
{
var _ = Task.Run(async () => {
try
{
if (imsg.Author.IsBot)
return;
var msg = imsg as IUserMessage;
if (msg == null)
return;
var channel = imsg.Channel as ITextChannel;
if (channel == null)
return;
if (msg.Author.Id == _client.CurrentUser.Id) return;
foreach (var subscriber in Subscribers)
{
var set = subscriber.Value;
if (!set.Contains(channel))
continue;
foreach (var chan in set.Except(new[] { channel }))
{
try
{
await chan.SendMessageAsync(GetMessage(channel, (IGuildUser)msg.Author,
msg)).ConfigureAwait(false);
}
catch
{
// ignored
}
}
}
}
catch
{
// ignored
}
});
return Task.CompletedTask;
}
private string GetMessage(ITextChannel channel, IGuildUser user, IUserMessage message) =>
$"**{channel.Guild.Name} | {channel.Name}** `{user.Username}`: " + message.Content.SanitizeMentions();
}
}

View File

@ -39,10 +39,10 @@ namespace NadekoBot
return Task.CompletedTask; return Task.CompletedTask;
} }
public async Task RunAsync(params string[] args) public async Task RunAsync()
{ {
var curProcessId = Process.GetCurrentProcess().Id; var curProcessId = Process.GetCurrentProcess().Id;
for (int i = 0; i < Credentials.TotalShards; i++) for (int i = 1; i < Credentials.TotalShards; i++)
{ {
var p = Process.Start(new ProcessStartInfo() var p = Process.Start(new ProcessStartInfo()
{ {
@ -56,11 +56,11 @@ namespace NadekoBot
} }
} }
public async Task RunAndBlockAsync(params string[] args) public async Task RunAndBlockAsync()
{ {
try try
{ {
await RunAsync(args).ConfigureAwait(false); await RunAsync().ConfigureAwait(false);
} }
catch (Exception ex) catch (Exception ex)
{ {

View File

@ -613,9 +613,6 @@
"utility_convert_not_found": "Cannot convert {0} to {1}: units not found", "utility_convert_not_found": "Cannot convert {0} to {1}: units not found",
"utility_convert_type_error": "Cannot convert {0} to {1}: types of unit are not equal", "utility_convert_type_error": "Cannot convert {0} to {1}: types of unit are not equal",
"utility_created_at": "Created at", "utility_created_at": "Created at",
"utility_csc_join": "Joined cross server channel.",
"utility_csc_leave": "Left cross server channel.",
"utility_csc_token": "This is your CSC token",
"utility_custom_emojis": "Custom emojis", "utility_custom_emojis": "Custom emojis",
"utility_error": "Error", "utility_error": "Error",
"utility_features": "Features", "utility_features": "Features",