using Confluent.Kafka;
using System.Net;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading;
using Serilog;

namespace CommonLib.MessageBus
{
    /// <summary>
    /// Kafka-backed message bus. Publishes string messages through one shared producer and
    /// runs one dedicated background consumer thread per call to <see cref="SubscribeTopic"/>.
    /// </summary>
    public sealed class Kafka : MessageBusBase, IDisposable
    {
        /// <summary>Bootstrap server used by the parameterless constructor.</summary>
        private const string DefaultHost = "127.0.0.1:9092";

        /// <summary>Upper bound on how long <see cref="Dispose"/> waits for consumer threads to exit.</summary>
        private static readonly TimeSpan ShutdownTimeout = TimeSpan.FromSeconds(1);

        private readonly CancellationTokenSource Source = new CancellationTokenSource();
        private readonly CancellationToken Token;
        private readonly IProducer<Null, string> p;
        private readonly string host = DefaultHost;
        private readonly List<Thread> cthreads = new List<Thread>();
        private bool disposed;

        /// <summary>
        /// Creates a bus whose producer and consumers connect to the default bootstrap server
        /// (<c>127.0.0.1:9092</c>).
        /// </summary>
        public Kafka()
        {
            Token = Source.Token;
            p = CreateProducer(host);
        }

        /// <summary>
        /// Creates a bus whose producer and consumers connect to <paramref name="hostname"/>.
        /// </summary>
        /// <param name="hostname">Value used as the Kafka <c>BootstrapServers</c> setting; also passed to the base constructor.</param>
        public Kafka(string hostname) : base(hostname)
        {
            // The token must be captured here as well: without it the consumer loops would run with
            // CancellationToken.None and Dispose() could never stop them.
            Token = Source.Token;
            host = hostname;
            p = CreateProducer(host);
        }

        /// <summary>Builds the shared producer, using the local machine name as the client id.</summary>
        private static IProducer<Null, string> CreateProducer(string bootstrapServers)
        {
            var config = new ProducerConfig
            {
                BootstrapServers = bootstrapServers,
                ClientId = Dns.GetHostName()
            };
            return new ProducerBuilder<Null, string>(config).Build();
        }

        /// <summary>
        /// Produces <paramref name="message"/> to the topic <paramref name="exchange_channel"/> and
        /// waits for the delivery report.
        /// </summary>
        /// <param name="exchange_channel">Name of the Kafka topic to produce to.</param>
        /// <param name="message">Message value; the message is sent without a key.</param>
        /// <returns>The message value taken from the delivery report.</returns>
        /// <exception cref="Exception">
        /// Delivery failed (<see cref="ProduceException{TKey,TValue}"/>) or an argument was rejected
        /// (<see cref="ArgumentException"/>). The original exception is available as
        /// <see cref="Exception.InnerException"/>.
        /// </exception>
        public async Task<string> PublishTopicsAsync(string exchange_channel, string message)
        {
            try
            {
                var dr = await p.ProduceAsync(exchange_channel, new Message<Null, string> { Value = message })
                                .ConfigureAwait(false);
                return dr.Value;
            }
            catch (ProduceException<Null, string> e)
            {
                // ":l" renders the reason without quotes; passing it as a property (rather than
                // interpolating it into the template) keeps braces in the reason from being parsed.
                Log.Error(e, "Delivery failed: {Reason:l}", e.Error.Reason);
                throw new Exception($"Delivery failed: {e.Error.Reason}", e);
            }
            catch (ArgumentException e)
            {
                Log.Error(e, "Argument Exception: {Reason:l}", e.Message);
                throw new Exception($"Argument Exception: {e.Message}", e);
            }
        }

        /// <summary>
        /// Synchronous wrapper around <see cref="PublishTopicsAsync"/>; blocks until the delivery
        /// report arrives.
        /// </summary>
        /// <param name="exchange_channel">Name of the Kafka topic to produce to.</param>
        /// <param name="message">Message value to send.</param>
        /// <exception cref="AggregateException">Wraps the exception thrown by <see cref="PublishTopicsAsync"/>.</exception>
        public override void PublishTopics(string exchange_channel, string message)
        {
            // Task.Run keeps the async continuation off the caller's synchronization context, and
            // Wait() (rather than GetAwaiter().GetResult()) keeps failures surfacing as
            // AggregateException, which existing callers may be catching.
            Task.Run(() => PublishTopicsAsync(exchange_channel, message)).Wait();
        }

        /// <summary>
        /// Starts a background thread that consumes <paramref name="exchange_channel"/> and invokes
        /// <paramref name="callback_method"/> for every message until this instance is disposed.
        /// </summary>
        /// <param name="exchange_channel">Name of the Kafka topic to subscribe to.</param>
        /// <param name="callback_method">Invoked on the consumer thread with the message value and <paramref name="user_data"/>.</param>
        /// <param name="user_data">Opaque value handed back to the callback unchanged.</param>
        public override void SubscribeTopic(string exchange_channel, TopicCallback callback_method, object user_data = null)
        {
            var consumerThread = new Thread(() => ConsumeLoop(exchange_channel, callback_method, user_data))
            {
                IsBackground = true
            };
            consumerThread.Start();
            cthreads.Add(consumerThread);
        }

        /// <summary>
        /// Body of a consumer thread: subscribes with a fresh consumer group and dispatches messages
        /// until cancellation is requested or an error occurs.
        /// </summary>
        private void ConsumeLoop(string topic, TopicCallback callback, object userData)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = host,
                // A new random group per subscription combined with AutoOffsetReset.Latest: the
                // consumer has no committed offsets and starts from the end of the topic.
                GroupId = Guid.NewGuid().ToString(),
                GroupInstanceId = "0",
                AutoOffsetReset = AutoOffsetReset.Latest
            };
            var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
            consumer.Subscribe(topic);

            try
            {
                while (true)
                {
                    var consumeResult = consumer.Consume(Token);
                    callback(consumeResult.Message.Value, userData);
                }
            }
            catch (Exception ex)
            {
                // Read the flag before cleanup so the decision reflects the state at failure time.
                bool shuttingDown = Token.IsCancellationRequested;

                // Clean up inside the catch block (not in a finally): the rethrow below is unhandled
                // on this thread, and finally blocks are not guaranteed to run in that case.
                ReleaseConsumer(consumer);

                if (shuttingDown)
                {
                    return;
                }

                Log.Error(ex, "Kafka consumer for topic {Topic} failed", topic);
                // Same exception type and message as before; the original is kept as InnerException.
                throw new Exception(ex.Message, ex);
            }
        }

        /// <summary>Unsubscribes and disposes a consumer, disposing even when unsubscribing fails.</summary>
        private static void ReleaseConsumer(IConsumer<Ignore, string> consumer)
        {
            try
            {
                consumer.Unsubscribe();
            }
            finally
            {
                consumer.Dispose();
            }
        }

        /// <summary>
        /// Signals all consumer threads to stop, disposes the producer and waits (up to one second in
        /// total) for the consumer threads to exit. Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            if (disposed)
            {
                return;
            }
            disposed = true;

            Source.Cancel();
            p.Dispose();

            // Wait for the consumer threads themselves instead of sleeping a fixed second:
            // returns immediately when nothing is subscribed, and never waits longer than before.
            var elapsed = System.Diagnostics.Stopwatch.StartNew();
            foreach (var worker in cthreads)
            {
                // Dispose may be invoked from inside a subscription callback; never join ourselves.
                if (worker == Thread.CurrentThread)
                {
                    continue;
                }

                TimeSpan remaining = ShutdownTimeout - elapsed.Elapsed;
                if (remaining <= TimeSpan.Zero)
                {
                    break;
                }
                worker.Join(remaining);
            }

            // Tokens already handed out keep reporting IsCancellationRequested == true after this.
            Source.Dispose();
        }
    }
}