From 4251da0a8634d311473524137c4046631aec4949 Mon Sep 17 00:00:00 2001 From: Master Kwoth Date: Tue, 26 Jan 2016 20:18:48 +0100 Subject: [PATCH] Completely rewrote music. #worth 8 hours --- NadekoBot/Classes/Extensions.cs | 10 + NadekoBot/Classes/Music/MusicControls.cs | 48 ++ NadekoBot/Classes/Music/StreamRequest.cs | 260 +++++++++ NadekoBot/Modules/Music.cs | 647 +++-------------------- NadekoBot/NadekoBot.csproj | 2 + 5 files changed, 391 insertions(+), 576 deletions(-) create mode 100644 NadekoBot/Classes/Music/MusicControls.cs create mode 100644 NadekoBot/Classes/Music/StreamRequest.cs diff --git a/NadekoBot/Classes/Extensions.cs b/NadekoBot/Classes/Extensions.cs index b0759500..0ff856af 100644 --- a/NadekoBot/Classes/Extensions.cs +++ b/NadekoBot/Classes/Extensions.cs @@ -174,5 +174,15 @@ namespace NadekoBot.Extensions // Step 7 return d[n, m]; } + + public static int KiB(this int value) => value * 1024; + public static int KB(this int value) => value * 1000; + + public static int MiB(this int value) => value.KiB() * 1024; + public static int MB(this int value) => value.KB() * 1000; + + public static int GiB(this int value) => value.MiB() * 1024; + public static int GB(this int value) => value.MB() * 1000; + } } diff --git a/NadekoBot/Classes/Music/MusicControls.cs b/NadekoBot/Classes/Music/MusicControls.cs new file mode 100644 index 00000000..0347f516 --- /dev/null +++ b/NadekoBot/Classes/Music/MusicControls.cs @@ -0,0 +1,48 @@ +using Discord; +using Discord.Audio; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace NadekoBot.Classes.Music { + public class MusicControls { + public bool NextSong = false; + public IAudioClient Voice; + public Channel VoiceChannel; + public bool Pause = false; + public List SongQueue = new List(); + public StreamRequest CurrentSong; + + public bool IsPaused { get; internal set; } + + public MusicControls() { + Task.Run(async () => { + while (true) { + try { + if (CurrentSong == null || CurrentSong.State == StreamState.Completed) { + LoadNextSong(); + } + } catch (Exception e) { + Console.WriteLine("Bug in music task run. " + e); + } + await Task.Delay(200); + } + }); + } + + private void LoadNextSong() { + if (SongQueue.Count == 0) { + if (CurrentSong != null) + CurrentSong.Cancel(); + CurrentSong = null; + return; + } + CurrentSong = SongQueue[0]; + SongQueue.RemoveAt(0); + CurrentSong.Start(); + Console.WriteLine("starting"); + } + } +} diff --git a/NadekoBot/Classes/Music/StreamRequest.cs b/NadekoBot/Classes/Music/StreamRequest.cs new file mode 100644 index 00000000..65219452 --- /dev/null +++ b/NadekoBot/Classes/Music/StreamRequest.cs @@ -0,0 +1,260 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Discord; +using Discord.Commands; +using Discord.Audio; +using YoutubeExtractor; +using NadekoBot.Modules; +using System.IO; +using System.Diagnostics; +using NadekoBot.Extensions; +using System.Threading; + +namespace NadekoBot.Classes.Music { + public enum StreamState { + Resolving, + Queued, + Buffering, //not using it atm + Playing, + Completed + } + + public class StreamRequest { + public Channel Channel { get; } + public Server Server { get; } + public User User { get; } + public string Query { get; } + + private MusicStreamer musicStreamer = null; + public StreamState State => musicStreamer?.State ?? StreamState.Resolving; + + public StreamRequest(CommandEventArgs e, string query) { + if (e == null) + throw new ArgumentNullException(nameof(e)); + if (query == null) + throw new ArgumentNullException(nameof(query)); + if (e.User.VoiceChannel == null) + throw new NullReferenceException("Voicechannel is null."); + + this.Server = e.Server; + this.Channel = e.User.VoiceChannel; + this.User = e.User; + this.Query = query; + ResolveStreamLink(); + } + + private Task ResolveStreamLink() => + Task.Run(() => { + Console.WriteLine("Resolving video link"); + var video = DownloadUrlResolver.GetDownloadUrls(Searches.FindYoutubeUrlByKeywords(Query)) + .Where(v => v.AdaptiveType == AdaptiveType.Audio) + .OrderByDescending(v => v.AudioBitrate).FirstOrDefault(); + + if (video == null) // do something with this error + throw new Exception("Could not load any video elements"); + + Title = video.Title; + + musicStreamer = new MusicStreamer(this, video.DownloadUrl, Channel); + OnQueued(); + }); + + internal void Pause() { + throw new NotImplementedException(); + } + + public string Title { get; internal set; } = String.Empty; + + public Action OnQueued = null; + public Action OnBuffering = null; + public Action OnStarted = null; + public Action OnCompleted = null; + + //todo maybe add remove, in order to create remove at position command + + internal void Cancel() { + musicStreamer?.StopPlayback(); + } + + internal Task Start() => + Task.Run(async () => { + Console.WriteLine("Start called."); + + int attemptsLeft = 7; + //wait for up to 7 seconds to resolve a link + while (State == StreamState.Resolving) { + await Task.Delay(1000); + Console.WriteLine("Resolving..."); + if (--attemptsLeft == 0) { + Console.WriteLine("Resolving timed out."); + return; + } + } + try { + await musicStreamer.StartPlayback(); + } catch (Exception ex) { + Console.WriteLine("Error in start playback." + ex); + } + }); + } + + public class MusicStreamer { + private Channel channel; + private DualStream buffer; + + public StreamState State { get; internal set; } + public string Url { get; private set; } + + StreamRequest parent; + private readonly object _bufferLock = new object(); + private CancellationTokenSource cancelSource; + + public MusicStreamer(StreamRequest parent, string directUrl, Channel channel) { + this.parent = parent; + this.channel = channel; + this.buffer = new DualStream(); + this.Url = directUrl; + Console.WriteLine("Created new streamer"); + State = StreamState.Queued; + cancelSource = new CancellationTokenSource(); + } + //todo app will crash if song is too long, should load only next 20-ish seconds + private async Task BufferSong() { + Console.WriteLine("Buffering..."); + //start feeding the buffer + var p = Process.Start(new ProcessStartInfo { + FileName = "ffmpeg", + Arguments = $"-i {Url} -f s16le -ar 48000 -ac 2 pipe:1", + UseShellExecute = false, + RedirectStandardError = false, + RedirectStandardOutput = true, + CreateNoWindow = true, + }); + + + while (true) { + while (buffer.writePos - buffer.readPos > 2.MB() && !cancelSource.IsCancellationRequested) { + Console.WriteLine($"Got over 2MB more, waiting. Data length:{buffer.Length * 1.0f / 1.MB()}MB"); + await Task.Delay(1000); + } + + if (cancelSource.IsCancellationRequested) return; + + if (buffer.Length > 5.MB()) { // if buffer is over 10 MB, create new one + Console.WriteLine("Buffer over 10 megs, clearing."); + + var skip = 2.MB(); + byte[] data = buffer.ToArray().Skip(skip).ToArray(); + + lock (_bufferLock) { + var newWritePos = buffer.writePos - skip; + var newReadPos = buffer.readPos - skip; + var newPos = buffer.Position - skip; + + buffer = new DualStream(); + buffer.Write(data, 0, data.Length); + + buffer.writePos = newWritePos; + buffer.readPos = newReadPos; + buffer.Position = newPos; + } + + } + + var buf = new byte[1024]; + int read = 0; + read = await p.StandardOutput.BaseStream.ReadAsync(buf, 0, 1024); + + if (read == 0) { + Console.WriteLine("Didn't read anything from the stream"); + return; + } + await buffer.WriteAsync(buf, 0, read); + } + } + + internal Task StartPlayback() => + Task.Run(async () => { + Console.WriteLine("Starting playback."); + State = StreamState.Playing; + if (parent.OnBuffering != null) + parent.OnBuffering(); + Task.Run(async () => await BufferSong()); + await Task.Delay(2000,cancelSource.Token); + if (parent.OnStarted != null) + parent.OnStarted(); + Console.WriteLine("Prebuffering complete."); + //for now wait for 3 seconds before starting playback. + + var audio = NadekoBot.client.Audio(); + + var voiceClient = await audio.Join(channel); + int blockSize = 1920 * NadekoBot.client.Audio().Config.Channels; + byte[] voiceBuffer = new byte[blockSize]; + + while (true) { + int readCount = 0; + lock (_bufferLock) { + readCount = buffer.Read(voiceBuffer, 0, voiceBuffer.Length); + } + + if (readCount == 0) { + Console.WriteLine("Nothing to read, stream finished."); + break; + } + + // while (MusicControls.IsPaused && !cancelSource.IsCancellationRequested) + // await Task.Delay(100); + + if (cancelSource.IsCancellationRequested) { + Console.WriteLine("Canceled"); + break; + } + + voiceClient.Send(voiceBuffer, 0, voiceBuffer.Length); + } + + voiceClient.Wait(); + State = StreamState.Completed; + Console.WriteLine("Song completed."); + if (parent.OnCompleted != null) + parent.OnCompleted(); + }); + + internal void StopPlayback() { + Console.WriteLine("Stopping playback"); + State = StreamState.Completed; + if(cancelSource.Token.CanBeCanceled) + cancelSource.Cancel(); + } + } + + public class DualStream : MemoryStream { + public long readPos; + public long writePos; + + public DualStream() : base() { } + public DualStream(byte[] data) : base(data) { } + + public override int Read(byte[] buffer, int offset, int count) { + int read; + lock (this) { + Position = readPos; + read = base.Read(buffer, offset, count); + readPos = Position; + } + return read; + } + public override void Write(byte[] buffer, int offset, int count) { + lock (this) { + Position = writePos; + base.Write(buffer, offset, count); + writePos = Position; + } + } + } + +} diff --git a/NadekoBot/Modules/Music.cs b/NadekoBot/Modules/Music.cs index b14e96f1..897cdbd5 100644 --- a/NadekoBot/Modules/Music.cs +++ b/NadekoBot/Modules/Music.cs @@ -1,6 +1,4 @@ -// Thanks to @Bloodskilled for providing most of the music code from his BooBot -// check out his server https://discord.gg/0aMlLYi2e2V7h2Kr -using System; +using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; @@ -16,70 +14,45 @@ using NadekoBot.Extensions; using System.Net; using System.Globalization; using System.Collections.Concurrent; +using NadekoBot.Classes.Music; namespace NadekoBot.Modules { class Music : DiscordModule { - public class MusicControls { - public bool NextSong = false; - public IAudioClient Voice; - public Channel VoiceChannel; - public bool Pause = false; - public List SongQueue = new List(); - public StreamRequest CurrentSong; - - public MusicControls() { - Task.Run(async () => { - while (true) { - try { - if (CurrentSong == null || CurrentSong.State == StreamTaskState.Completed) { - LoadNextSong(); - } - } catch (Exception e) { - Console.WriteLine("Bug in music task run. " + e); - } - await Task.Delay(200); - - CleanMusicPlayers(); - } - }); - } - - private void LoadNextSong() { - if (SongQueue.Count == 0 || !SongQueue[0].LinkResolved) { - if (CurrentSong != null) - CurrentSong.Cancel(); - CurrentSong = null; - return; - } - CurrentSong = SongQueue[0]; - SongQueue.RemoveAt(0); - CurrentSong.Start(); - return; - } - - } - public static ConcurrentDictionary musicPlayers = new ConcurrentDictionary(); - - public Music() : base() { - //commands.Add(new PlayMusic()); + internal static void CleanMusicPlayers() { + foreach (var mp in musicPlayers + .Where(kvp => kvp.Value.CurrentSong == null + && kvp.Value.SongQueue.Count == 0)) { + var val = mp.Value; + musicPlayers.TryRemove(mp.Key, out val); + } } - //m r,radio - init - //m n,next - next in que - //m p,pause - pauses, call again to unpause - //m yq [key_words] - queue from yt by keywords - //m s,stop - stop - //m sh - shuffle songs - //m pl - current playlist + internal static string GetMusicStats() { + var servers = 0; + var queued = 0; + musicPlayers.ForEach(kvp => { + var mp = kvp.Value; + if (mp.SongQueue.Count > 0 || mp.CurrentSong != null) + queued += mp.SongQueue.Count + 1; + servers++; + }); + + return $"Playing {queued} songs across {servers} servers."; + } + + public Music() : base() { + System.Timers.Timer cleaner = new System.Timers.Timer(); + cleaner.Elapsed += (s, e) => CleanMusicPlayers(); + cleaner.Interval = 10000; + cleaner.Start(); + } public override void Install(ModuleManager manager) { var client = NadekoBot.client; - - manager.CreateCommands("!m", cgb => { //queue all more complex commands commands.ForEach(cmd => cmd.Init(cgb)); @@ -89,12 +62,7 @@ namespace NadekoBot.Modules { .Description("Goes to the next song in the queue.") .Do(e => { if (musicPlayers.ContainsKey(e.Server) == false || (musicPlayers[e.Server]?.CurrentSong) == null) return; - var CurrentSong = musicPlayers[e.Server].CurrentSong; - CurrentSong.Cancel(); - CurrentSong = musicPlayers[e.Server].SongQueue.Take(1).FirstOrDefault(); - if (CurrentSong != null) { - CurrentSong.Start(); - } + musicPlayers[e.Server].CurrentSong.Cancel(); }); cgb.CreateCommand("s") @@ -115,19 +83,55 @@ namespace NadekoBot.Modules { .Description("Pauses the song") .Do(async e => { if (musicPlayers.ContainsKey(e.Server) == false) return; - await e.Send("Not yet implemented."); + await e.Send("This feature is coming VERY soon."); + /* + if (musicPlayers[e.Server].Pause()) + if (musicPlayers[e.Server].IsPaused) + await e.Send("Music player Paused"); + else + await e.Send("Music player unpaused."); + */ }); cgb.CreateCommand("q") .Alias("yq") .Description("Queue a song using keywords or link. **You must be in a voice channel**.\n**Usage**: `!m q Dream Of Venice`") - .Parameter("Query", ParameterType.Unparsed) + .Parameter("query", ParameterType.Unparsed) .Do(async e => { if (musicPlayers.ContainsKey(e.Server) == false) - musicPlayers.TryAdd(e.Server, new MusicControls()); + if (musicPlayers.Count > 25) { + await e.Send($"{e.User.Mention}, playlist supports up to 25 songs. If you think this is not enough, contact the owner.:warning:"); + return; + } + else + musicPlayers.TryAdd(e.Server, new MusicControls()); + var player = musicPlayers[e.Server]; - player.SongQueue.Add(new StreamRequest(NadekoBot.client, e, e.GetArg("Query"))); - await e.Send(":warning: Music is unstable atm, working on a fix. :warning:"); + try { + var sr = new StreamRequest(e, e.GetArg("query")); + Message msg = null; + sr.OnQueued += async() => { + msg = await e.Send($":musical_note:**Queued** {sr.Title}"); + }; + sr.OnCompleted += async () => { + await e.Send($":musical_note:**Finished playing** {sr.Title}"); + }; + sr.OnStarted += async () => { + if (msg == null) + await e.Send($":musical_note:**Started playing** {sr.Title}"); + else + await msg.Edit($":musical_note:**Started playing** {sr.Title}"); + }; + sr.OnBuffering += async () => { + if (msg != null) + msg = await e.Send($":musical_note:**Buffering the song**...{sr.Title}"); + }; + player.SongQueue.Add(sr); + } catch (Exception ex) { + Console.WriteLine(); + await e.Send("Error. :anger:"); + return; + } }); cgb.CreateCommand("lq") @@ -173,514 +177,5 @@ namespace NadekoBot.Modules { }); }); } - internal static void CleanMusicPlayers() { - foreach (var mp in musicPlayers - .Where(kvp => kvp.Value.CurrentSong == null - && kvp.Value.SongQueue.Count == 0)) { - var val = mp.Value; - musicPlayers.TryRemove(mp.Key, out val); - } - } - - internal static string GetMusicStats() { - var servers = 0; - var queued = 0; - musicPlayers.ForEach(kvp => { - var mp = kvp.Value; - if(mp.SongQueue.Count > 0 || mp.CurrentSong != null) - queued += mp.SongQueue.Count + 1; - servers++; - }); - - return $"Playing {queued} songs across {servers} servers."; - - } - } - - enum StreamTaskState { - Queued, - Playing, - Completed, - NotReady - } - - class StreamRequest { - static readonly string[] validFormats = { ".ogg", ".wav", ".mp3", ".webm", ".aac", ".mp4", ".flac" }; - - readonly DiscordClient client; - public readonly Server Server; - public readonly Channel Channel; - public Channel VoiceChannel; - public readonly User User; - public readonly string RequestText; - - const string DefaultTitle = ""; - public string Title = DefaultTitle; - public TimeSpan Length = TimeSpan.FromSeconds(0); - public string FileName; - - public bool LinkResolved = false; - public string StreamUrl; - public bool NetworkDone; - public long TotalSourceBytes; - - Stream bufferingStream; - StreamTask streamTask; - - public StreamTaskState State => streamTask?.State ?? StreamTaskState.Queued; - - - public StreamRequest(DiscordClient client, CommandEventArgs e, string text) { - this.client = client; - Server = e.Server; - Channel = e.Channel; - User = e.User; - RequestText = text.Trim(); - - FileName = "unresolved_" + Uri.EscapeUriString(RequestText); - - Task.Run(() => { - try { - ResolveLink(); - } catch (Exception ex) { - Console.WriteLine("Exception in ResolveLink: " + ex); - } - }); - } - - void ResolveLink() { - var query = RequestText; - try { - var video = DownloadUrlResolver.GetDownloadUrls(Searches.FindYoutubeUrlByKeywords(query)) - .Where(v => v.AdaptiveType == AdaptiveType.Audio) - .OrderByDescending(v => v.AudioBitrate).FirstOrDefault(); - - if (video == null) - throw new Exception("Could not load any video elements"); - - StreamUrl = video.DownloadUrl; - Title = video.Title; - string invalidChars = new string(Path.GetInvalidFileNameChars()) + new string(Path.GetInvalidPathChars()); - foreach (char c in invalidChars) { - FileName = FileName.Replace(c.ToString(), "_"); - } - - StartBuffering(); - LinkResolved = true; - Channel.Send($"{User.Mention}, Queued **{video.Title}**"); - } catch (Exception e) { - // Send a message to the guy that queued that - Channel.SendMessage("This video is unavailable in the country the Bot is running in, or you enter an invalid name or url."); - - Console.WriteLine("Cannot parse youtube url: " + query); - Cancel(); - } - } - - internal void StartBuffering() { - var folder = "StreamBuffers"; - Directory.CreateDirectory(folder); - var fullPath = Path.Combine(folder, FileName); - - FileStream fileStream; - try { - if (File.Exists(fullPath) && new FileInfo(fullPath).Length > 1024 * 2) { - NetworkDone = true; - TotalSourceBytes = new FileInfo(fullPath).Length; - bufferingStream = new FileStream(fullPath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); - - if (Length.TotalSeconds < double.Epsilon) - Length = GetFileLength(fullPath); - return; - } - - // Open a new file to stream into - fileStream = new FileStream(fullPath, FileMode.Create, FileAccess.Write, FileShare.Read); - bufferingStream = new FileStream(fullPath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite); - } catch (Exception ex) { - Console.WriteLine("Exception while creating or opening stream buffers: " + ex); - return; - } - - Task.Run(() => { - int byteCounter = 0; - - try { - var webClient = new WebClient(); - var networkStream = webClient.OpenRead(StreamUrl); - - if (networkStream == null) - return; - - byte[] buffer = new byte[0x1000]; - while (true) { - int read = networkStream.Read(buffer, 0, buffer.Length); - if (read <= 0) - break; - byteCounter += read; - TotalSourceBytes += read; - fileStream.Write(buffer, 0, read); - - if (TotalSourceBytes > 1024 * 2 && Length.TotalSeconds < 0.1) { - Length = GetFileLength(fullPath); - } - } - } catch (Exception ex) { - Console.WriteLine("Exception while buffering (network->file): " + ex); - } - - fileStream.Close(); - NetworkDone = true; - Console.WriteLine("net: done. ({0} read)", byteCounter); - }); - } - - internal void Start() { - if (State != StreamTaskState.Queued) - return; - - Stopwatch resolveTimer = Stopwatch.StartNew(); - - while (resolveTimer.ElapsedMilliseconds < 8000) { - if (bufferingStream != null) - break; - Thread.Sleep(50); - } - - if (bufferingStream == null) { - Console.WriteLine("Buffering stream was not set! Can't play track!"); - streamTask = new StreamTask(client, this, null); - streamTask.CancelStreaming(); - return; - } - - streamTask = new StreamTask(client, this, bufferingStream); - - VoiceChannel = GetVoiceChannelForUser(User); - if (VoiceChannel == null) { - Channel.SendMessage($":warning: {User.Mention} `I can't find you in any voice channel. Join one, then try again...`"); - streamTask.CancelStreaming(); // just to set the state to done - return; - } - - // Go! - streamTask.StartStreaming(); - } - - internal void Cancel() { - if (State == StreamTaskState.Completed) - return; - - if (streamTask == null) - streamTask = new StreamTask(client, this, bufferingStream); - streamTask.CancelStreaming(); - } - - - Channel GetVoiceChannelForUser(User user) { - return client.Servers.SelectMany(s => s.VoiceChannels).FirstOrDefault(c => c.Users.Any(u => u.Id == user.Id)); - } - - public static TimeSpan GetFileLength(string fileName) { - try { - var startInfo = new ProcessStartInfo("ffprobe", $"-i \"{fileName}\" -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1"); - startInfo.RedirectStandardOutput = true; - startInfo.RedirectStandardError = true; - startInfo.CreateNoWindow = true; - startInfo.UseShellExecute = false; - using (var process = Process.Start(startInfo)) { - var lengthLine = process.StandardOutput.ReadLine(); - double result; - if (double.TryParse(lengthLine, NumberStyles.AllowDecimalPoint, CultureInfo.InvariantCulture, out result)) { - int integerPart = (int)Math.Round(result); - return TimeSpan.FromSeconds(integerPart); - } else - return TimeSpan.Zero; - } - } catch (Exception) { - Console.WriteLine("Exception while determining file play-time"); - return TimeSpan.Zero; - } - } - } - class TranscodingTask { - readonly StreamRequest streamRequest; - readonly Stream bufferingStream; - - public long BytesSentToTranscoder { get; private set; } - public DualStream PCMOutput { get; private set; } - public long ReadyBytesLeft => PCMOutput?.writePos - PCMOutput?.readPos ?? 0; - - readonly CancellationTokenSource tokenSource = new CancellationTokenSource(); - - Task transcoderTask; - Task outputTask; - - public TranscodingTask(StreamRequest streamRequest, Stream bufferingStream) { - this.streamRequest = streamRequest; - this.bufferingStream = bufferingStream; - } - - public void Start() { - Task.Run(async () => { - // Wait for some data to arrive - while (true) { - if (streamRequest.NetworkDone) - break; - if (bufferingStream.Length > 1024 * 3) - break; - await Task.Delay(100); - } - - Stream input, pcmOutput; - var ffmpegProcess = GetTranscoderStreams(out input, out pcmOutput); - - PCMOutput = new DualStream(); - - // Keep pumping network stuff into the transcoder - transcoderTask = Task.Run(() => TranscoderFunc(bufferingStream, input, tokenSource.Token), tokenSource.Token); - - // Keep pumping transcoder output into the PCMOutput stream - outputTask = Task.Run(() => OutputFunc(pcmOutput, PCMOutput, tokenSource.Token), tokenSource.Token); - - // Wait until network stuff is all done - while (!streamRequest.NetworkDone) - await Task.Delay(200); - - // Then wait until we sent everything to the transcoder - while (BytesSentToTranscoder < streamRequest.TotalSourceBytes) - await Task.Delay(200); - - // Then wait some more until it did everything and kill it - await Task.Delay(5000); - - try { - tokenSource.Cancel(); - bufferingStream.Close(); - - Console.WriteLine("Killing transcoder..."); - ffmpegProcess.Kill(); - } catch { - } - }); - } - - async Task TranscoderFunc(Stream sourceStream, Stream transcoderInput, CancellationToken cancellationToken) { - try { - byte[] buffer = new byte[1024]; - while (true) { - if (cancellationToken.IsCancellationRequested) - break; - if (BytesSentToTranscoder >= streamRequest.TotalSourceBytes) - break; - - // When there is new stuff available on the network we want to get it instantly - long available = streamRequest.TotalSourceBytes - BytesSentToTranscoder; - - double availableRingSpace = PCMOutput.Length / (double)PCMOutput.Capacity; - - // How much data is in the final output buffer? - // We dont want to transcode too much in advance - if (available > 0) { - int read = await sourceStream.ReadAsync(buffer, 0, (int)Math.Min(available, buffer.LongLength), cancellationToken); - if (read > 0) { - // Write to transcoder - transcoderInput.Write(buffer, 0, read); - BytesSentToTranscoder += read; - } - } else { - // We have enough data transcoded already. Stall a bit so we dont do too much! - await Task.Delay(100); - } - } - } catch (Exception ex) { - Console.WriteLine(ex.ToString()); - } - Console.WriteLine("TranscoderFunc stopped"); - } - - static async Task OutputFunc(Stream sourceStream, DualStream targetBuffer, CancellationToken cancellationToken) { - try { - byte[] buffer = new byte[1024]; - while (!cancellationToken.IsCancellationRequested) { - // When there is new stuff available on the network we want to get it instantly - int read = await sourceStream.ReadAsync(buffer, 0, buffer.Length, cancellationToken); - if (read > 0) { - targetBuffer.Write(buffer, 0, read); - } - } - } catch (Exception ex) { - Console.WriteLine(ex.ToString()); - } - - Console.WriteLine("OutputFunc stopped"); - } - - internal static Process GetTranscoderStreams(out Stream input, out Stream pcmOutput) { - Process p = null; - Exception ex = null; - - try { - p = Process.Start(new ProcessStartInfo { - FileName = "ffmpeg", - Arguments = "-i pipe:0 -f s16le -ar 48000 -ac 2 pipe:1", - UseShellExecute = false, - RedirectStandardError = false, - RedirectStandardOutput = true, - RedirectStandardInput = true, - CreateNoWindow = true, - }); - } catch (Exception exInner) { - ex = exInner; - } - - if (p == null || ex != null) { - input = null; - pcmOutput = null; - Console.WriteLine("Could not start ffmpeg: " + (ex?.Message ?? "")); - return null; - } - - pcmOutput = p.StandardOutput.BaseStream; - input = p.StandardInput.BaseStream; - return p; - } - - public void Cancel() { - tokenSource.Cancel(); - BytesSentToTranscoder = streamRequest.TotalSourceBytes; - } - } - public class DualStream : MemoryStream { - public long readPos; - public long writePos; - public override int Read(byte[] buffer, int offset, int count) { - int read; - lock (this) { - Position = readPos; - read = base.Read(buffer, offset, count); - readPos = Position; - } - return read; - } - public override void Write(byte[] buffer, int offset, int count) { - lock (this) { - Position = writePos; - base.Write(buffer, offset, count); - writePos = Position; - } - } - } - class StreamTask { - readonly DiscordClient client; - readonly StreamRequest streamRequest; - readonly Stream bufferingStream; - - CancellationTokenSource tokenSource; - Task audioTask; - - public StreamTaskState State { get; private set; } - - public StreamTask(DiscordClient client, StreamRequest streamRequest, Stream bufferingStream) { - this.streamRequest = streamRequest; - this.bufferingStream = bufferingStream; - this.client = client; - - State = StreamTaskState.Queued; - } - - public void StartStreaming() { - if (State != StreamTaskState.Queued) - return; - - State = StreamTaskState.Playing; - tokenSource = new CancellationTokenSource(); - audioTask = Task.Run(StreamFunc, tokenSource.Token); - } - - public void CancelStreaming() { - if (State != StreamTaskState.Queued && State != StreamTaskState.Playing) - return; - - tokenSource?.Cancel(false); - audioTask?.Wait(); - State = StreamTaskState.Completed; - } - - async Task StreamFunc() { - CancellationToken cancellationToken = tokenSource.Token; - IAudioClient voiceClient = null; - TranscodingTask streamer = null; - try { - uint byteCounter = 0; - - // Download and read audio from the url - streamer = new TranscodingTask(streamRequest, bufferingStream); - streamer.Start(); - - // Wait until we have at least a few kb transcoded or network stream done - while (true) { - if (streamRequest.NetworkDone) { - await Task.Delay(600); - break; - } - if (streamer.ReadyBytesLeft > 5 * 1024) - break; - await Task.Delay(200); - } - - if (cancellationToken.IsCancellationRequested) - return; - - // Start streaming to voice - await streamRequest.Channel.SendMessage($"Playing **{streamRequest.Title}** [{streamRequest.Length}]"); - - var audioService = client.Audio(); - voiceClient = await audioService.Join(streamRequest.VoiceChannel); - - int blockSize = 1920 * audioService.Config.Channels; - byte[] voiceBuffer = new byte[blockSize]; - var ringBuffer = streamer.PCMOutput; - - Stopwatch timeout = Stopwatch.StartNew(); - while (true) { - var readCount = ringBuffer.Read(voiceBuffer, 0, voiceBuffer.Length); - - if (readCount == 0) { - if (timeout.ElapsedMilliseconds > 1500) { - Console.WriteLine("Audio stream timed out. Disconnecting."); - break; - } - - await Task.Delay(200); - continue; - } - - if (cancellationToken.IsCancellationRequested) - return; - - timeout.Restart(); - - byteCounter += (uint)voiceBuffer.Length; - voiceClient.Send(voiceBuffer, 0, voiceBuffer.Length); - } - - streamer.Cancel(); - - voiceClient.Wait(); - } catch (Exception ex) { - await streamRequest.Channel.SendMessage($":musical_note: {streamRequest.User.Mention} Something went wrong, please report this. :angry: :anger:"); - Console.WriteLine("Exception while playing music: " + ex); - } finally { - if (voiceClient != null) { - await streamRequest.Channel.SendMessage($"Finished playing **{streamRequest.Title}**"); - State = StreamTaskState.Completed; - streamer?.Cancel(); - await voiceClient.Disconnect(); - await Task.Delay(500); - } - } - } - } - + } } diff --git a/NadekoBot/NadekoBot.csproj b/NadekoBot/NadekoBot.csproj index 88491a59..d9a54c9d 100644 --- a/NadekoBot/NadekoBot.csproj +++ b/NadekoBot/NadekoBot.csproj @@ -140,6 +140,8 @@ + +