diff --git a/NadekoBot/Modules/Music.cs b/NadekoBot/Modules/Music.cs index 7da39505..52c59c81 100644 --- a/NadekoBot/Modules/Music.cs +++ b/NadekoBot/Modules/Music.cs @@ -13,11 +13,10 @@ using VideoLibrary; using System.Threading; using System.Diagnostics; using Discord.Legacy; +using System.Net; -namespace NadekoBot.Modules -{ - class Music : DiscordModule - { +namespace NadekoBot.Modules { + class Music : DiscordModule { private static bool exit = true; public static bool NextSong = false; @@ -28,10 +27,9 @@ namespace NadekoBot.Modules public static YouTubeVideo CurrentSong; - public static bool Exit - { + public static bool Exit { get { return exit; } - set { exit = value;} // if i set this to true, break the song and exit the main loop + set { exit = value; } // if i set this to true, break the song and exit the main loop } public Music() : base() { @@ -46,21 +44,17 @@ namespace NadekoBot.Modules //m sh - shuffle songs //m pl - current playlist - public override void Install(ModuleManager manager) - { + public override void Install(ModuleManager manager) { var client = NadekoBot.client; - manager.CreateCommands("!m", cgb => - { + manager.CreateCommands("!m", cgb => { //queue all more complex commands commands.ForEach(cmd => cmd.Init(cgb)); cgb.CreateCommand("n") .Alias("next") .Description("Goes to the next song in the queue.") - .Do(e => - { - if (Voice != null && Exit == false) - { + .Do(e => { + if (Voice != null && Exit == false) { NextSong = true; } }); @@ -68,10 +62,8 @@ namespace NadekoBot.Modules cgb.CreateCommand("s") .Alias("stop") .Description("Completely stops the music and unbinds the bot from the channel.") - .Do(e => - { - if (Voice != null && Exit == false) - { + .Do(e => { + if (Voice != null && Exit == false) { Exit = true; SongQueue = new List(); } @@ -80,18 +72,13 @@ namespace NadekoBot.Modules cgb.CreateCommand("p") .Alias("pause") .Description("Pauses the song") - .Do(async e => - { - if (Voice != null && Exit == false && CurrentSong != null) - { + .Do(async e => { + if (Voice != null && Exit == false && CurrentSong != null) { Pause = !Pause; - if (Pause) - { - await e.Send( "Pausing. Run the command again to resume."); - } - else - { - await e.Send( "Resuming..."); + if (Pause) { + await e.Send("Pausing. Run the command again to resume."); + } else { + await e.Send("Resuming..."); } } }); @@ -115,113 +102,93 @@ namespace NadekoBot.Modules .Alias("yq") .Description("Queue a song using a multi/single word name.\nUsage: `!m q Dream Of Venice`") .Parameter("Query", ParameterType.Unparsed) - .Do(async e => - { + .Do(async e => { var youtube = YouTube.Default; var video = youtube.GetAllVideos(Searches.FindYoutubeUrlByKeywords(e.Args[0])) .Where(v => v.AdaptiveKind == AdaptiveKind.Audio) .OrderByDescending(v => v.AudioBitrate).FirstOrDefault(); - if (video?.Uri != "" && video.Uri != null) - { + if (video?.Uri != "" && video.Uri != null) { SongQueue.Add(video); - await e.Send( "**Queued** " + video.FullName); + await e.Send("**Queued** " + video.FullName); } }); - + cgb.CreateCommand("lq") .Alias("ls").Alias("lp") .Description("Lists up to 10 currently queued songs.") - .Do(async e => - { - await e.Send( SongQueue.Count + " videos currently queued."); - await e.Send( string.Join("\n", SongQueue.Select(v => v.FullName).Take(10))); + .Do(async e => { + await e.Send(SongQueue.Count + " videos currently queued."); + await e.Send(string.Join("\n", SongQueue.Select(v => v.FullName).Take(10))); }); cgb.CreateCommand("sh") .Description("Shuffles the current playlist.") - .Do(async e => - { - if (SongQueue.Count < 2) - { - await e.Send( "Not enough songs in order to perform the shuffle."); + .Do(async e => { + if (SongQueue.Count < 2) { + await e.Send("Not enough songs in order to perform the shuffle."); return; } SongQueue.Shuffle(); - await e.Send( "Songs shuffled!"); + await e.Send("Songs shuffled!"); }); cgb.CreateCommand("radio") .Alias("music") .Description("Binds to a voice and text channel in order to play music.") .Parameter("ChannelName", ParameterType.Unparsed) - .Do(async e => - { - if (Voice != null) return; - VoiceChannel = e.Server.FindChannels(e.GetArg("ChannelName").Trim(), ChannelType.Voice).FirstOrDefault(); - Voice = await client.Audio().Join(VoiceChannel); - Exit = false; - NextSong = false; - Pause = false; - try - { - while (true) - { + .Do(async e => { + if (Voice != null) return; + VoiceChannel = e.Server.FindChannels(e.GetArg("ChannelName").Trim(), ChannelType.Voice).FirstOrDefault(); + Voice = await client.Audio().Join(VoiceChannel); + Exit = false; + NextSong = false; + Pause = false; + try { + while (true) { if (Exit) break; if (SongQueue.Count == 0 || Pause) { Thread.Sleep(100); continue; } if (!LoadNextSong()) break; - await Task.Run(async () => - { - if (Exit) - { + await Task.Run(async () => { + if (Exit) { Voice = null; Exit = false; - await e.Send( "Exiting..."); + await e.Send("Exiting..."); return; } - int blockSize = 3840; - byte[] buffer = new byte[3840]; - - var msg = await e.Send( "Playing " + Music.CurrentSong.FullName + " [00:00]"); + + var streamer = new AudioStreamer(Music.CurrentSong.Uri); + streamer.Start(); + while (streamer.BytesSentToTranscoder < 100 * 0x1000 || streamer.NetworkDone) + await Task.Delay(500); + + int blockSize = 1920 * client.Audio().Config.Channels; + byte[] buffer = new byte[blockSize]; + + var msg = await e.Send("Playing " + Music.CurrentSong.FullName + " [00:00]"); int counter = 0; int byteCount; - using (var stream = GetAudioFileStream(Music.CurrentSong.Uri)) - { - var m = await e.Send("Downloading song..."); - var memStream = new MemoryStream(); - while (true) { - byte[] buff = new byte[0x4000 * 10]; - int read = stream.Read(buff, 0, buff.Length); - if (read <= 0) break; - memStream.Write(buff, 0, read); - } + var m = await e.Send("Downloading song..."); - e.Send("Song downloaded"); - memStream.Position = 0; - while ((byteCount = memStream.Read(buffer, 0, blockSize)) > 0) - { - Voice.Send(buffer, byteCount); - counter += blockSize; - if (NextSong) - { - NextSong = false; - break; - } - if (Exit) - { - Exit = false; - return; - } - while (Pause) Thread.Sleep(100); + while ((byteCount = streamer.PCMOutput.Read(buffer, 0, blockSize)) > 0) { + Voice.Send(buffer, byteCount); + counter += blockSize; + if (NextSong) { + NextSong = false; + break; } + if (Exit) { + Exit = false; + return; + } + while (Pause) Thread.Sleep(100); } }); } - Voice.Wait(); - } - catch (Exception ex) { Console.WriteLine(ex.ToString()); } + Voice.Wait(); + } catch (Exception ex) { Console.WriteLine(ex.ToString()); } await Voice.Disconnect(); Voice = null; VoiceChannel = null; @@ -229,10 +196,8 @@ namespace NadekoBot.Modules }); } - private Stream GetAudioFileStream(string file) - { - Process p = Process.Start(new ProcessStartInfo() - { + private Stream GetAudioFileStream(string file) { + Process p = Process.Start(new ProcessStartInfo() { FileName = "ffmpeg", Arguments = "-i \"" + Uri.EscapeUriString(file) + "\" -f s16le -ar 48000 -af volume=1 -ac 2 pipe:1 ", UseShellExecute = false, @@ -241,8 +206,7 @@ namespace NadekoBot.Modules return p.StandardOutput.BaseStream; } - private bool LoadNextSong() - { + private bool LoadNextSong() { if (SongQueue.Count == 0) { CurrentSong = null; return false; @@ -252,4 +216,136 @@ namespace NadekoBot.Modules return true; } } + + //new stuff + class AudioStreamer { + string sourceUrl; Channel statusTextChannel; + int totalSourceBytes; + public bool NetworkDone { get; private set; } + public int BytesSentToTranscoder { get; private set; } + public Stream PCMOutput { get; private set; } + CancellationTokenSource tokenSource1 = new CancellationTokenSource(); + CancellationTokenSource tokenSource2 = new CancellationTokenSource(); + Task transcoderTask; Task outputTask; + public AudioStreamer(string streamUrl, Channel statusTextChannel = null) { + sourceUrl = streamUrl; + this.statusTextChannel = statusTextChannel; + } + public void Start() { + Task.Run(async () => { + var bufferingStream = GetBufferingStream(sourceUrl); + Console.WriteLine("Buffering video..."); // Wait for some data to arrive + while (bufferingStream.Length < 1000 || NetworkDone) + await Task.Delay(500); + Console.WriteLine("buf done"); + Stream input, pcmOutput; + var ffmpegProcess = GetTranscoderStreams(out input, out pcmOutput); + PCMOutput = new DualStream(); // Keep pumping network stuff into the transcoder + transcoderTask = Task.Run(() => TranscoderFunc(bufferingStream, input, tokenSource1.Token)); + // Keep pumping transcoder output into the PCMOutput stream + outputTask = Task.Run(() => OutputFunc(pcmOutput, PCMOutput, tokenSource2.Token)); + // Wait until network stuff is all done + while (!NetworkDone) await Task.Delay(500); + // Then wait until we sent everything to the transcoder + while (BytesSentToTranscoder < totalSourceBytes) await Task.Delay(500); + // Then wait some more until it did everything and kill it + await Task.Delay(5000); + try { + tokenSource1.Cancel(); + tokenSource2.Cancel(); + Console.WriteLine("Killing transcoder..."); + ffmpegProcess.Kill(); + } catch { } + }); + } + async Task TranscoderFunc(Stream sourceStream, Stream targetStream, CancellationToken cancellationToken) { + byte[] buffer = new byte[0x4000]; + while (!NetworkDone && !cancellationToken.IsCancellationRequested) { + // When there is new stuff available on the network we want to get it instantly + int available = totalSourceBytes - BytesSentToTranscoder; + if (available > 0) { + int read = await sourceStream.ReadAsync(buffer, 0, Math.Min(available, buffer.Length), cancellationToken); + if (read > 0) { targetStream.Write(buffer, 0, read); + BytesSentToTranscoder += read; + } + } + else await Task.Delay(1); + } + Console.WriteLine("TranscoderFunc stopped"); + } + async Task OutputFunc(Stream sourceStream, Stream targetStream, CancellationToken cancellationToken) { + byte[] buffer = new byte[0x4000]; + while (!cancellationToken.IsCancellationRequested) { + // When there is new stuff available on the network we want to get it instantly + int read = await sourceStream.ReadAsync(buffer, 0, buffer.Length, cancellationToken); + if (read > 0) targetStream.Write(buffer, 0, read); + } + Console.WriteLine("OutputFunc stopped"); + } + internal static Process GetTranscoderStreams(out Stream input, out Stream pcmOutput) { + Process p = Process.Start(new ProcessStartInfo { + FileName = "ffmpeg", + Arguments = "-i pipe:0 -f s16le -ar 48000 -ac 2 pipe:1", + UseShellExecute = false, + RedirectStandardOutput = true, + RedirectStandardInput = true, + CreateNoWindow = true, + }); + pcmOutput = p.StandardOutput.BaseStream; + input = p.StandardInput.BaseStream; + return p; + } + Stream GetBufferingStream(string streamUrl) { + var memoryStream = new DualStream(); + Task.Run(() => { + int byteCounter = 0; + try { + var webClient = new WebClient(); + var networkStream = webClient.OpenRead(streamUrl); + if (networkStream == null) return; + byte[] buffer = new byte[0x1000]; + while (true) { + int read = networkStream.Read(buffer, 0, buffer.Length); + if (read <= 0) break; + byteCounter += read; + totalSourceBytes += read; + memoryStream.Write(buffer, 0, read); + } + } catch (Exception ex) { + Console.WriteLine("Exception while reading network stream: " + ex); + } + NetworkDone = true; Console.WriteLine("net: done. ({0} read)", byteCounter); + }); + return memoryStream; + } + async void Write(string message) { + Console.WriteLine(message); + } + public void Cancel() { + tokenSource1.Cancel(); + tokenSource2.Cancel(); + NetworkDone = true; + BytesSentToTranscoder = totalSourceBytes; + } + } + public class DualStream : MemoryStream { + long readPosition; + long writePosition; + public override int Read(byte[] buffer, int offset, int count) { + int read; + lock (this) { + Position = readPosition; + read = base.Read(buffer, offset, count); + readPosition = Position; + } + return read; + } + public override void Write(byte[] buffer, int offset, int count) { + lock (this) { + Position = writePosition; + base.Write(buffer, offset, count); + writePosition = Position; + } + } + } } diff --git a/NadekoBot/NadekoBot.cs b/NadekoBot/NadekoBot.cs index 7cff4077..1c72f15c 100644 --- a/NadekoBot/NadekoBot.cs +++ b/NadekoBot/NadekoBot.cs @@ -66,7 +66,8 @@ namespace NadekoBot //add audio service var audio = client.Services.Add(new AudioService(new AudioServiceConfig() { - Channels = 2 + Channels = 2, + EnableEncryption = false })); //install modules diff --git a/NadekoBot/StatsCollector.cs b/NadekoBot/StatsCollector.cs index c8186a69..37412afe 100644 --- a/NadekoBot/StatsCollector.cs +++ b/NadekoBot/StatsCollector.cs @@ -71,7 +71,7 @@ namespace NadekoBot { await (await NadekoBot.client.GetInvite(code)).Accept(); await e.Send(e.User.Mention + " I joined it, thanks :)"); - DEBUG_LOG("Sucessfuly joined server with code " + code); + DEBUG_LOG("Successfuly joined server with code " + code); DEBUG_LOG("Here is a link for you: discord.gg/" + code); } catch (Exception ex) { @@ -81,9 +81,11 @@ namespace NadekoBot } public static void DEBUG_LOG(string text) { -#pragma warning disable CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed - NadekoBot.client.GetChannel(119365591852122112).Send(text); -#pragma warning restore CS4014 // Because this call is not awaited, execution of the current method continues before the call is completed +#pragma warning disable CS4014 + //NadekoBot.client.GetChannel(119365591852122112).Send(text); + //TODO YOU MIGHT WANT TO CHANGE THIS TO LOOK LIKE THE LINE ABOVE + Console.WriteLine(text); +#pragma warning restore CS4014 } private void StartCollecting() {