using Confluent.Kafka;
using System.Net;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading;
using Serilog;
namespace CommonLib.MessageBus
{
public sealed class Kafka : MessageBusBase, IDisposable
{
CancellationTokenSource Source = new CancellationTokenSource();
CancellationToken Token;
IProducer<Null, string> p;
string host = "127.0.0.1:9092";
List<Thread> cthreads = new List<Thread>();
public Kafka()
{
Token = Source.Token;
var config = new ProducerConfig
{
BootstrapServers = host,
ClientId = Dns.GetHostName()
};
p = new ProducerBuilder<Null, string>(config).Build();
}
public Kafka(string hostname) : base(hostname)
{
host = hostname;
var config = new ProducerConfig
{
BootstrapServers = host,
ClientId = Dns.GetHostName()
};
p = new ProducerBuilder<Null, string>(config).Build();
}
public async Task<string> PublishTopicsAsync(string exchange_channel, string message)
{
try
{
var dr = await p.ProduceAsync(exchange_channel, new Message<Null, string> { Value = message });
return dr.Value;
}
catch (ProduceException<Null, string> e)
{
Log.Error($"Delivery failed: {e.Error.Reason}");
throw new Exception($"Delivery failed: {e.Error.Reason}");
}
catch (ArgumentException e)
{
Log.Error($"Argument Exception: {e.Message}");
throw new Exception($"Argument Exception: {e.Message}");
}
}
public override void PublishTopics(string exchange_channel, string message)
{
var task = Task.Run(async () => await PublishTopicsAsync(exchange_channel, message));
task.Wait();
}
public override void SubscribeTopic(string exchange_channel, TopicCallback callback_method, object user_data = null)
{
Thread TempThread = new Thread(() =>
{
var config = new ConsumerConfig
{
BootstrapServers = host,
GroupId = Guid.NewGuid().ToString(),
GroupInstanceId = "0",
AutoOffsetReset = AutoOffsetReset.Latest
};
var c = new ConsumerBuilder<Ignore, string>(config).Build();
c.Subscribe(exchange_channel);
while (true)
{
try
{
var consumeResult = c.Consume(Token);
callback_method(consumeResult.Message.Value, user_data);
}
catch (Exception ex)
{
if (Token.IsCancellationRequested)
{
c.Unsubscribe();
c.Dispose();
return;
}
else
{
c.Unsubscribe();
c.Dispose();
throw new Exception(ex.Message);
}
}
}
});
TempThread.IsBackground = true;
TempThread.Start();
cthreads.Add(TempThread);
}
public void Dispose()
{
Source.Cancel();
p.Dispose();
Task.Delay(1000).Wait();
}
}
}