using NadekoBot.Core.Services.Impl;
using NLog;
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using NadekoBot.Common.ShardCom;
using StackExchange.Redis;
using Newtonsoft.Json;
using NadekoBot.Extensions;
using NadekoBot.Common.Collections;
using System.Linq;
using System.Collections.Generic;

namespace NadekoBot.Core.Services
{
    /// <summary>
    /// Starts one process per shard, tracks each shard's last reported state in the
    /// redis list "{key}_shardstats", and restarts shards which report a disconnected
    /// state twice in a row or which stop reporting altogether.
    /// </summary>
    public class ShardsCoordinator
    {
        /// <summary>
        /// FIFO queue of shard ids which ignores an id that is already queued.
        /// All members synchronize on a private lock.
        /// </summary>
        private class ShardsCoordinatorQueue
        {
            private readonly object _locker = new object();
            private readonly HashSet<int> _set = new HashSet<int>();
            private readonly Queue<int> _queue = new Queue<int>();

            /// <summary>Number of shard ids currently waiting in the queue.</summary>
            public int Count
            {
                get
                {
                    lock (_locker)
                    {
                        return _queue.Count;
                    }
                }
            }

            /// <summary>Adds <paramref name="i"/> to the end of the queue unless it is already queued.</summary>
            public void Enqueue(int i)
            {
                lock (_locker)
                {
                    if (_set.Add(i))
                        _queue.Enqueue(i);
                }
            }

            /// <summary>Reads the id at the head of the queue without removing it.</summary>
            /// <returns>false when the queue is empty.</returns>
            public bool TryPeek(out int id)
            {
                lock (_locker)
                {
                    return _queue.TryPeek(out id);
                }
            }

            /// <summary>Removes the id at the head of the queue, allowing it to be enqueued again.</summary>
            /// <returns>false when the queue is empty.</returns>
            public bool TryDequeue(out int id)
            {
                lock (_locker)
                {
                    if (_queue.TryDequeue(out id))
                    {
                        _set.Remove(id);
                        return true;
                    }
                }
                return false;
            }
        }

        private readonly BotCredentials _creds;
        private readonly string _key;
        private readonly Process[] _shardProcesses;
        private readonly Logger _log;
        private readonly int _curProcessId;
        private readonly ConnectionMultiplexer _redis;
        private readonly ShardComMessage _defaultShardState;

        // shards waiting to be (re)started by the loop in RunAsync
        private readonly ShardsCoordinatorQueue _shardStartQueue = new ShardsCoordinatorQueue();
        // shards which reported a disconnected state once; a second report triggers a restart
        private readonly ConcurrentHashSet<int> _shardRestartWaitingList = new ConcurrentHashSet<int>();

        /// <summary>
        /// Sets up logging and credentials, connects to redis on 127.0.0.1, resets the
        /// "{key}_shardstats" list, queues the shards for startup and subscribes to the
        /// coordinator's redis channels.
        /// </summary>
        public ShardsCoordinator()
        {
            //load main stuff
            LogSetup.SetupLogger(-1);
            _log = LogManager.GetCurrentClassLogger();
            _creds = new BotCredentials();

            _log.Info("Starting NadekoBot v" + StatsService.BotVersion);

            _key = _creds.RedisKey();
            _redis = ConnectionMultiplexer.Connect("127.0.0.1");

            new RedisImagesCache(_redis, _creds).Reload(); //reload images into redis

            //setup initial shard statuses
            _defaultShardState = new ShardComMessage()
            {
                ConnectionState = Discord.ConnectionState.Disconnected,
                Guilds = 0,
                Time = DateTime.UtcNow
            };
            var db = _redis.GetDatabase();
            //clear previous statuses
            db.KeyDelete(_key + "_shardstats");

            _shardProcesses = new Process[_creds.TotalShards];
            for (int i = 0; i < _creds.TotalShards; i++)
            {
                //add it to the list of shards which should be started
#if DEBUG
                if (i > 0)
                    _shardStartQueue.Enqueue(i);
                else
                    _shardProcesses[i] = Process.GetCurrentProcess();
#else
                _shardStartQueue.Enqueue(i);
#endif
                //set the shard's initial state in redis cache
                var msg = _defaultShardState.Clone();
                msg.ShardId = i;
                //this is to avoid the shard coordinator thinking that
                //the shard is unresponsive while starting up
                var delay = 45;
#if GLOBAL_NADEKO
                delay = 180;
#endif
                msg.Time = DateTime.UtcNow + TimeSpan.FromSeconds(delay * (i + 1));
                db.ListRightPush(_key + "_shardstats",
                    JsonConvert.SerializeObject(msg),
                    flags: CommandFlags.FireAndForget);
            }

            // only the id is needed, so release the Process handle right away
            using (var current = Process.GetCurrentProcess())
            {
                _curProcessId = current.Id;
            }

            //subscribe to shardcoord events
            var sub = _redis.GetSubscriber();

            //send is called when shard status is updated. Every 7.5 seconds atm
            sub.Subscribe(_key + "_shardcoord_send",
                OnDataReceived,
                CommandFlags.FireAndForget);

            //called to stop the shard, although the shard will start again when it finds out it's dead
            sub.Subscribe(_key + "_shardcoord_stop",
                OnStop,
                CommandFlags.FireAndForget);

            //called to kill the bot
            sub.Subscribe(_key + "_die",
                (ch, x) => Environment.Exit(0),
                CommandFlags.FireAndForget);
        }

        /// <summary>
        /// Best-effort kill and dispose of a shard process. Failures are ignored on
        /// purpose: Process.Kill throws when the process has already exited, which is
        /// an expected situation here.
        /// </summary>
        /// <param name="p">Process to terminate; null is ignored.</param>
        private static void KillAndDispose(Process p)
        {
            if (p == null)
                return;
            try { p.Kill(); } catch { }
            try { p.Dispose(); } catch { }
        }

        /// <summary>Handler for the "{key}_shardcoord_stop" channel; the payload is a JSON shard id.</summary>
        private void OnStop(RedisChannel ch, RedisValue data)
        {
            var shardId = JsonConvert.DeserializeObject<int>(data);
            OnStop(shardId);
        }

        /// <summary>
        /// Resets the shard's entry in "{key}_shardstats" to the default (disconnected)
        /// state and kills the shard's process, if any.
        /// </summary>
        /// <param name="shardId">Index of the shard; ids outside [0, shard count) are logged and ignored.</param>
        private void OnStop(int shardId)
        {
            // the id originates from redis messages, so don't trust it to be a valid index
            if (shardId < 0 || shardId >= _shardProcesses.Length)
            {
                _log.Warn("Ignoring stop request for invalid shard id {0}", shardId);
                return;
            }

            var db = _redis.GetDatabase();
            var msg = _defaultShardState.Clone();
            msg.ShardId = shardId;
            db.ListSetByIndex(_key + "_shardstats",
                shardId,
                JsonConvert.SerializeObject(msg),
                CommandFlags.FireAndForget);

            var p = _shardProcesses[shardId];
            _shardProcesses[shardId] = null;
            KillAndDispose(p);
        }

        /// <summary>
        /// Handler for the "{key}_shardcoord_send" channel. Stores the reported state in
        /// "{key}_shardstats" and tracks shards which report being disconnected.
        /// </summary>
        private void OnDataReceived(RedisChannel ch, RedisValue data)
        {
            var msg = JsonConvert.DeserializeObject<ShardComMessage>(data);
            if (msg == null)
                return;

            var db = _redis.GetDatabase();
            //sets the shard state
            db.ListSetByIndex(_key + "_shardstats",
                msg.ShardId,
                data,
                CommandFlags.FireAndForget);

            if (msg.ConnectionState == Discord.ConnectionState.Disconnected
                || msg.ConnectionState == Discord.ConnectionState.Disconnecting)
            {
                _log.Error("!!! SHARD {0} IS IN {1} STATE !!!", msg.ShardId, msg.ConnectionState.ToString());
                OnShardUnavailable(msg.ShardId);
            }
            else
            {
                // remove the shard from the waiting list if it's on it,
                // because it's connected/connecting now
                _shardRestartWaitingList.TryRemove(msg.ShardId);
            }
        }

        /// <summary>
        /// First report of an unavailable shard puts it on the waiting list; a report
        /// while it is already on the list stops the shard and queues it for a start.
        /// </summary>
        private void OnShardUnavailable(int shardId)
        {
            //if the shard is dc'd, add it to the restart waiting list
            if (!_shardRestartWaitingList.Add(shardId))
            {
                //if it's already on the waiting list
                //stop the shard
                OnStop(shardId);
                //add it to the start queue (start the shard)
                _shardStartQueue.Enqueue(shardId);
                //remove it from the waiting list
                _shardRestartWaitingList.TryRemove(shardId);
            }
        }

        /// <summary>
        /// Starts the queued shards one at a time (6 seconds apart) and launches the
        /// background loops which keep restarting disconnected or unresponsive shards.
        /// </summary>
        /// <returns>
        /// A task which completes once the initial start queue has been drained, and
        /// which faults if a shard process could not be started during that phase.
        /// </returns>
        public async Task RunAsync()
        {
            //this task will complete when the initial start of the shards
            //is complete, but the loops below keep running in order to restart shards
            //which are disconnected for too long.
            //RunContinuationsAsynchronously keeps awaiters of this task from
            //running inline on (and stalling) the shard starter loop.
            var tsc = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

            _ = Task.Run(async () =>
            {
                // keep checking for shards which need to be (re)started
                while (true)
                {
                    try
                    {
                        //start a shard which is scheduled for start every 6 seconds
                        while (_shardStartQueue.TryPeek(out var id))
                        {
                            // if the shard is on the waiting list again
                            // remove it since it's starting up now
                            _shardRestartWaitingList.TryRemove(id);

                            //if the task is already completed,
                            //it means the initial shard starting is done,
                            //and this is an auto-restart
                            if (tsc.Task.IsCompleted)
                            {
                                _log.Warn("Auto-restarting shard {0}", id);
                            }

                            var p = StartShard(id);

                            KillAndDispose(_shardProcesses[id]);
                            _shardProcesses[id] = p;
                            _shardStartQueue.TryDequeue(out _);
                            await Task.Delay(6000).ConfigureAwait(false);
                        }
                        tsc.TrySetResult(true);
                    }
                    catch (Exception ex)
                    {
                        _log.Error(ex);
                        // TrySetException only succeeds while the initial startup is still
                        // pending: in that case surface the failure to RunAsync's caller
                        // and stop. Afterwards this is a failed auto-restart; the shard is
                        // still at the head of the queue and is retried on the next pass.
                        if (tsc.TrySetException(ex))
                            return;
                    }
                    await Task.Delay(6000).ConfigureAwait(false);
                }
            });

            //restart unresponsive shards
            _ = Task.Run(async () =>
            {
                //after all shards have started initially
                //(if the initial start failed this await throws and the loop never runs)
                await tsc.Task.ConfigureAwait(false);
                while (true)
                {
                    await Task.Delay(15000).ConfigureAwait(false);
                    try
                    {
                        var db = _redis.GetDatabase();
                        //get all shards which didn't communicate their status in the last 30 seconds.
                        //materialized once so entries are not deserialized again for every enumeration
                        var cutoff = DateTime.UtcNow - TimeSpan.FromSeconds(30);
                        var statuses = db.ListRange(_key + "_shardstats")
                            .Select(x => JsonConvert.DeserializeObject<ShardComMessage>(x))
                            .Where(x => x != null && x.Time < cutoff)
                            .ToList();

                        if (statuses.Count == 0)
                        {
#if DEBUG
                            for (var i = 0; i < _shardProcesses.Length; i++)
                            {
                                var p = _shardProcesses[i];
                                if (p == null || p.HasExited)
                                {
                                    _log.Warn("Scheduling shard {0} for restart because it's process is stopped.", i);
                                    _shardStartQueue.Enqueue(i);
                                }
                            }
#endif
                        }
                        else
                        {
                            foreach (var s in statuses)
                            {
                                OnStop(s.ShardId);
                                _shardStartQueue.Enqueue(s.ShardId);

                                //to prevent shards which are already scheduled for restart to be scheduled again
                                s.Time = DateTime.UtcNow + TimeSpan.FromSeconds(30 * _shardStartQueue.Count);
                                db.ListSetByIndex(_key + "_shardstats", s.ShardId,
                                    JsonConvert.SerializeObject(s), CommandFlags.FireAndForget);
                                _log.Warn("Shard {0} is scheduled for a restart because it's unresponsive.", s.ShardId);
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        // log and keep going: rethrowing here would silently end this
                        // fire-and-forget loop and disable unresponsive-shard detection
                        _log.Error(ex);
                    }
                }
            });

            await tsc.Task.ConfigureAwait(false);
        }

        /// <summary>
        /// Launches the process for the given shard using the run command and the
        /// argument format string from the credentials.
        /// </summary>
        private Process StartShard(int shardId)
        {
            return Process.Start(new ProcessStartInfo()
            {
                FileName = _creds.ShardRunCommand,
                Arguments = string.Format(_creds.ShardRunArguments, shardId, _curProcessId, "")
            });
            // last "" in format is for backwards compatibility
            // because current startup commands have {2} in them probably
        }

        /// <summary>
        /// Runs the coordinator and then waits indefinitely. If the startup fails, the
        /// error is logged, every known shard process is killed and the method returns.
        /// </summary>
        public async Task RunAndBlockAsync()
        {
            try
            {
                await RunAsync().ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                _log.Error(ex);
                foreach (var p in _shardProcesses)
                {
                    KillAndDispose(p);
                }
                return;
            }

            await Task.Delay(-1).ConfigureAwait(false);
        }
    }
}