Akka
Web
DotNet Core
此文件會手把手說明如何在一個Web Server上啟用Akka系統,內容偏向建置SOP,故不會對框架本身多作解釋。閱讀前須對Dot Net Core與Akka.Net有一定程度的了解。
此篇文章會以在Web Server內新增一個與外部PLC溝通的範例去實作。
在Visual Studio 2019新增Asp Dot Net Core Web應用程式 => 選擇Web MVC應用程式(進階設定先不作設置,可以先取消勾選)
Log工具實作,系統建置前先設計Akka Log實體Class,在此我們宣告一個ILog介面可抽換Log實際紀錄的方法。目前此例Log實作只對Akka Log做設計。
/// <summary>
/// Logging abstraction used by the actors so that the concrete log sink can be swapped.
/// Each method takes a short title and the message content.
/// </summary>
public interface ILog
{
/// <summary>Writes an error entry (NLogSend forwards it at Error level).</summary>
void E(string title,string content);
/// <summary>Writes an alert entry (NLogSend forwards it at Warning level).</summary>
void A(string title,string contetnt);
/// <summary>Writes an informational entry (NLogSend forwards it at Info level).</summary>
void I(string title,string content);
/// <summary>Writes a debug entry (NLogSend forwards it at Debug level).</summary>
void D(string title,string content);
}
/// <summary>
/// <see cref="ILog"/> implementation that forwards every entry to an Akka
/// <see cref="ILoggingAdapter"/>, formatted as "【title】:content".
/// </summary>
public class NLogSend : ILog
{
    // May be null: the constructor explicitly allows it, in which case logging is a no-op.
    private readonly ILoggingAdapter _nlog;

    /// <summary>Creates the sender.</summary>
    /// <param name="nlog">Adapter that receives the entries; when null, all calls are silently ignored.</param>
    public NLogSend(ILoggingAdapter nlog = null)
    {
        _nlog = nlog;
    }

    /// <summary>Writes an alert entry at Warning level.</summary>
    public void A(string title, string contetnt)
    {
        _nlog?.Warning(Compose(title, contetnt));
    }

    /// <summary>Writes an entry at Debug level.</summary>
    public void D(string title, string content)
    {
        _nlog?.Debug(Compose(title, content));
    }

    /// <summary>Writes an entry at Error level.</summary>
    public void E(string title, string content)
    {
        _nlog?.Error(Compose(title, content));
    }

    /// <summary>Writes an entry at Info level.</summary>
    public void I(string title, string content)
    {
        _nlog?.Info(Compose(title, content));
    }

    // Builds the shared "【title】:content" message text used by every level.
    private static string Compose(string title, string content)
    {
        return "【" + title + "】" + ":" + content;
    }
}
在NLogSend撰寫完後,接下來在Web Server專案下新建NLog Config檔案。此檔用來設定Web Server裡Akka系統運作時記錄Log的相關規則。
<?xml version="1.0" encoding="utf-8"?>
<!-- Initial NLog configuration skeleton: global switches only. -->
<!-- autoReload / throwConfigExceptions are enabled and NLog's internal log goes to the console at info level. -->
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
autoReload="true"
throwConfigExceptions="true"
internalLogToConsole="true"
internalLogLevel="info">
<!-- No targets are declared yet; the element only marks targets as asynchronous. -->
<targets async="true" >
</targets>
<!-- No routing rules are declared yet. -->
<rules>
</rules>
</nlog>
建置Akka基礎使用的Base Class。這邊我們先根據Akka Life Cycle記錄Actor的生命週期去設計BaseActor,再根據Akka Socket功能額外設置BaseClient與BaseServer Actor,最後再設計一些基礎設置Model與Akka管理功能。
/// <summary>
/// Pair of socket endpoints handed to the TCP actors: the local endpoint is
/// what the server actor binds to, the remote endpoint is what the client
/// actor connects to.
/// </summary>
public class AkkaSocketIP
{
    /// <summary>Endpoint on this machine.</summary>
    public IPEndPoint LocalIpEndPoint { get; set; }

    /// <summary>Endpoint of the peer.</summary>
    public IPEndPoint RemoteIpEndPoint { get; set; }
}
/// <summary>
/// Common base for the actors in this sample: keeps the injected logger and
/// writes one log entry for each actor lifecycle hook before delegating to
/// the default implementation.
/// </summary>
public class BaseActor : ReceiveActor
{
    /// <summary>Logger shared with derived actors.</summary>
    protected ILog _log;

    // Name of this actor taken from its path; used as the prefix of lifecycle log entries.
    private readonly string _actorName;

    /// <summary>Stores the logger and captures the actor's own name.</summary>
    /// <param name="log">Logger used for lifecycle and derived-class logging.</param>
    public BaseActor(ILog log)
    {
        _log = log;
        _actorName = Context.Self.Path.Name;
    }

    /// <summary>Logs the start of the actor.</summary>
    protected override void PreStart()
    {
        // A separating space was added so this entry matches the other lifecycle entries.
        _log.I("AThread生命週期", _actorName + " PreStart");
        base.PreStart();
    }

    /// <summary>Logs the upcoming restart together with the failure reason.</summary>
    protected override void PreRestart(Exception reason, object message)
    {
        _log.E("AThread生命週期", _actorName + " PreRestart");
        _log.E("AThread生命週期", "Reason:" + reason.Message);
        base.PreRestart(reason, message);
    }

    /// <summary>Logs that the actor has stopped.</summary>
    protected override void PostStop()
    {
        _log.I("AThread生命週期", _actorName + " PostStop");
        base.PostStop();
    }

    /// <summary>Logs the completed restart together with the failure reason.</summary>
    protected override void PostRestart(Exception reason)
    {
        _log.E("AThread生命週期", _actorName + " PostRestart");
        _log.E("AThread生命週期", "Reason:" + reason.Message);
        base.PostRestart(reason);
    }
}
/// <summary>
/// Base actor for the connecting side of a TCP link. It asks the Akka TCP
/// manager to connect to the configured remote endpoint, registers itself as
/// the handler of the resulting connection and requests a new connection
/// whenever the connection closes or a TCP command fails.
/// </summary>
public class BaseClientActor : BaseActor
{
    public IActorRef SndActor;

    /// <summary>Connection actor captured from the sender of <c>Tcp.Connected</c>.</summary>
    protected IActorRef _connection;

    /// <summary>Name of this actor taken from its path.</summary>
    protected string _actorName;

    private readonly AkkaSocketIP _AkkaSocketIP;

    /// <summary>Requests the first connection and wires up the TCP message handlers.</summary>
    /// <param name="socketIP">Endpoints; only the remote endpoint is used here.</param>
    /// <param name="log">Logger passed on to <see cref="BaseActor"/>.</param>
    public BaseClientActor(AkkaSocketIP socketIP, ILog log) : base(log)
    {
        _AkkaSocketIP = socketIP;
        _actorName = Context.Self.Path.Name;
        Connect();
        Receive<Tcp.Connected>(message => TCPConnected(message));
        Receive<Tcp.ConnectionClosed>(message => TcpConnectionClosed(message));
        Receive<Tcp.CommandFailed>(message => TcpCommandFailed(message));
        Receive<Tcp.Received>(message => TcpReceivedData(message));
    }

    /// <summary>Logs the received message; derived actors override this to process the data.</summary>
    protected virtual void TcpReceivedData(Tcp.Received msg)
    {
        _log.I("TCP接收資料", "Handle_Tcp_Received. message=" + msg.ToString());
        _log.I("TCP接收資料", "ByteString=" + msg.Data.ToString());
        _log.I("TCP接收資料", "Count=" + msg.Data.Count.ToString());
    }

    /// <summary>Logs the new connection, remembers it and registers this actor as its handler.</summary>
    protected virtual void TCPConnected(Tcp.Connected message)
    {
        _log.I("TCP連線成功", " Connected. message=" + message.ToString());
        _log.I("TCP連線成功", " LocalAddress=" + message.LocalAddress.ToString());
        _log.I("TCP連線成功", " RemoteAddress=" + message.RemoteAddress.ToString());
        _connection = Sender;
        _connection.Tell(new Tcp.Register(Self));
    }

    /// <summary>Logs why the connection closed and immediately requests a new one.</summary>
    protected virtual void TcpConnectionClosed(Tcp.ConnectionClosed message)
    {
        _log.I("TCP連線關閉", " Tcp.ConnectionClosed. message=" + message.ToString());
        _log.I("TCP連線關閉", " Message.Cause=" + message.Cause);
        _log.I("TCP連線關閉", " Message.IsAborted=" + message.IsAborted.ToString());
        _log.I("TCP連線關閉", " Message.IsConfirmed=" + message.IsConfirmed.ToString());
        _log.I("TCP連線關閉", " Message.IsErrorClosed=" + message.IsErrorClosed.ToString());
        _log.I("TCP連線關閉", " Message.IsPeerClosed=" + message.IsPeerClosed.ToString());
        Connect();
    }

    /// <summary>Logs the failed command and immediately requests a new connection.</summary>
    protected virtual void TcpCommandFailed(Tcp.CommandFailed message)
    {
        _log.E("TCP操作失敗", " Tcp.CommandFailed. message=" + message.ToString());
        _log.E("TCP操作失敗", " Cmd=" + message.Cmd.ToString());
        _log.E("TCP操作失敗", " Message=" + message.Cmd.FailureMessage);
        // NOTE(review): the retry is issued without any delay, so an unreachable
        // peer presumably causes a continuous connect/fail/log cycle — consider
        // scheduling the retry with a back-off.
        Connect();
    }

    /// <summary>
    /// Asks the Akka TCP manager to connect to the configured remote endpoint.
    /// </summary>
    protected void Connect()
    {
        Context.System.Tcp().Tell(new Tcp.Connect(_AkkaSocketIP.RemoteIpEndPoint));
    }
}
/// <summary>
/// Base actor for the listening side of a TCP link. It asks the Akka TCP
/// manager to bind to the configured local endpoint, registers itself as the
/// handler of every accepted connection and logs the TCP events it receives.
/// </summary>
public class BaseServerActor : BaseActor
{
    /// <summary>Requests the bind and wires up the TCP message handlers.</summary>
    /// <param name="akkaSysIp">Endpoints; only the local endpoint is used here.</param>
    /// <param name="log">Logger passed on to <see cref="BaseActor"/>.</param>
    public BaseServerActor(AkkaSocketIP akkaSysIp, ILog log) : base(log)
    {
        Context.System.Tcp().Tell(new Tcp.Bind(Self, akkaSysIp.LocalIpEndPoint));
        Receive<Tcp.Bound>(message => TcpBound(message));
        Receive<Tcp.Connected>(message => TCPConnected(message));
        Receive<Tcp.CommandFailed>(msg => TCPCommandFail(msg));
        Receive<Tcp.ConnectionClosed>(message => TcpConnectionClosed(message));
        Receive<Tcp.Received>(message => TcpReceivedData(message));
    }

    /// <summary>
    /// Handles the TCP "bound" event by logging the address being listened on.
    /// </summary>
    private void TcpBound(Tcp.Bound msg)
    {
        _log.I("TCP監聽", "Tcp.Bound Success. Listening on " + msg.LocalAddress);
    }

    /// <summary>Logs the received message; derived actors override this to process the data.</summary>
    protected virtual void TcpReceivedData(Tcp.Received msg)
    {
        _log.I("TCP接收資料", "Handle_Tcp_Received. message=" + msg.ToString());
        _log.I("TCP接收資料", "ByteString=" + msg.Data.ToString());
        _log.I("TCP接收資料", "Count=" + msg.Data.Count.ToString());
    }

    /// <summary>Logs the accepted connection and registers this actor as its handler.</summary>
    protected virtual void TCPConnected(Tcp.Connected msg)
    {
        _log.I("TCP已被連線", " Tcp.Connected. message=" + msg.ToString());
        _log.I("TCP已被連線", " message.LocalAddress=" + msg.LocalAddress.ToString());
        _log.I("TCP已被連線", " message.RemoteAddress=" + msg.RemoteAddress.ToString());
        Sender.Tell(new Tcp.Register(Self));
    }

    /// <summary>
    /// Handler registered for <c>Tcp.CommandFailed</c>. It forwards to
    /// <see cref="TcpCommandFailed"/> so the failure is written through the
    /// injected logger instead of the console.
    /// </summary>
    protected virtual void TCPCommandFail(Tcp.CommandFailed msg)
    {
        TcpCommandFailed(msg);
    }

    /// <summary>Logs why a connection closed.</summary>
    protected virtual void TcpConnectionClosed(Tcp.ConnectionClosed msg)
    {
        _log.I("TCP連線關閉", " Tcp.ConnectionClosed. message=" + msg.ToString());
        _log.I("TCP連線關閉", " Message.Cause=" + msg.Cause);
        _log.I("TCP連線關閉", " Message.IsAborted=" + msg.IsAborted.ToString());
        _log.I("TCP連線關閉", " Message.IsConfirmed=" + msg.IsConfirmed.ToString());
        _log.I("TCP連線關閉", " Message.IsErrorClosed=" + msg.IsErrorClosed.ToString());
        _log.I("TCP連線關閉", " Message.IsPeerClosed=" + msg.IsPeerClosed.ToString());
    }

    /// <summary>Logs the failed TCP command at error level.</summary>
    protected virtual void TcpCommandFailed(Tcp.CommandFailed msg)
    {
        _log.E("TCP操作失敗", " Tcp.CommandFailed. message=" + msg.ToString());
        _log.E("TCP操作失敗", " Cmd=" + msg.Cmd.ToString());
        _log.E("TCP操作失敗", " Message=" + msg.Cmd.FailureMessage);
    }
}
/// <summary>
/// Builds the HOCON configuration used to create the actor system.
/// </summary>
public static class AkkaPara
{
    /// <summary>
    /// Creates the actor-system configuration from the HOCON template below.
    /// </summary>
    /// <param name="port">Local port substituted into <c>remote.dot-netty.tcp.port</c>.</param>
    /// <param name="publicHostName">Host name or IP substituted into <c>remote.dot-netty.tcp.public-hostname</c>.</param>
    /// <returns>The parsed configuration.</returns>
    public static Config Config(string port, string publicHostName)
    {
        var strConfig = @"
akka
{
loglevel = DEBUG
loggers = [""Akka.Logger.NLog.NLogLogger, Akka.Logger.NLog""]
actor
{
provider = remote
debug
{
receive = on # log any received message
autoreceive = on # log automatically received messages, e.g. PoisonPill
lifecycle = on # log actor lifecycle changes
event-stream = on # log subscription changes for Akka.NET event stream
unhandled = on # log unhandled messages sent to actors
}
}
remote
{
dot-netty.tcp
{
port = {port}
hostname = 0.0.0.0
public-hostname = {publicHostName}
}
}
io
{
tcp
{
direct-buffer-pool
{
buffer-size = 1024
}
max-received-message-size = unlimited
}
}
}";
        // Both placeholders must be substituted; previously {publicHostName}
        // was left in the text and never received the caller's value.
        strConfig = strConfig
            .Replace("{port}", port)
            .Replace("{publicHostName}", publicHostName);
        return ConfigurationFactory.ParseString(strConfig);
    }
}
/// <summary>
/// Entry point to the application's actor system: exposes the system itself
/// and helpers to create and look up actors.
/// </summary>
public interface ISysAkkaManager
{
/// <summary>The actor system being managed.</summary>
ActorSystem ActorSystem { get; }
/// <summary>
/// Creates an actor of type <typeparamref name="T"/>; SysAkkaManager creates it
/// under the actor system and reuses an already registered actor of the same type name.
/// </summary>
IActorRef CreateActor<T>() where T : ActorBase;
/// <summary>
/// Creates an actor of type <typeparamref name="T"/> through the given actor context;
/// SysAkkaManager reuses an already registered actor of the same type name.
/// </summary>
IActorRef CreateChildActor<T>(IUntypedActorContext context) where T : ActorBase;
/// <summary>
/// Returns the actor registered under <paramref name="actName"/>;
/// SysAkkaManager throws ArgumentException when no such actor is registered.
/// </summary>
IActorRef GetActor(string actName);
/// <summary>Returns an actor selection for the given actor path.</summary>
ActorSelection GetActorSelection(string actorPath);
}
/// <summary>
/// Default <see cref="ISysAkkaManager"/>: wraps an actor system and keeps a
/// registry of the actors it created, keyed by the actor type's name.
/// </summary>
public class SysAkkaManager : ISysAkkaManager
{
    // Registry of created actors; the key is typeof(T).Name.
    private readonly Dictionary<string, IActorRef> _actorDics = new Dictionary<string, IActorRef>();

    /// <summary>Wraps the given actor system.</summary>
    public SysAkkaManager(ActorSystem actorSystem)
    {
        ActorSystem = actorSystem;
    }

    /// <inheritdoc />
    public ActorSystem ActorSystem { get; }

    /// <inheritdoc />
    public IActorRef CreateActor<T>() where T : ActorBase
        => CreateActor<T>(() => ActorSystem);

    /// <inheritdoc />
    public IActorRef CreateChildActor<T>(IUntypedActorContext context) where T : ActorBase
        => CreateActor<T>(() => context);

    /// <inheritdoc />
    public IActorRef GetActor(string actName)
    {
        if (_actorDics.TryGetValue(actName, out var registered))
        {
            return registered;
        }

        throw new ArgumentException($"It't doesn't has register Action {actName}");
    }

    /// <inheritdoc />
    public ActorSelection GetActorSelection(string actorPath)
        => ActorSystem.ActorSelection(actorPath);

    // Returns the registered actor for T, or creates it through the supplied
    // factory (using DI-built Props) and registers it under the type name.
    private IActorRef CreateActor<T>(Func<IActorRefFactory> func) where T : ActorBase
    {
        var typeName = typeof(T).Name;
        if (_actorDics.TryGetValue(typeName, out var existing))
        {
            return existing;
        }

        var created = func().ActorOf(ActorSystem.DI().Props<T>(), typeName);
        return RegisterActor(typeName, created);
    }

    // Adds the actor to the registry; a name that is already present is rejected.
    private IActorRef RegisterActor(string actName, IActorRef actor)
    {
        if (_actorDics.ContainsKey(actName))
        {
            throw new ArgumentException($"You have been register Action {actName}");
        }

        _actorDics.Add(actName, actor);
        return actor;
    }
}
上述Akka Base Template敲好後,接著實作與外部連線的PLC Akka System
/// <summary>
/// Server-side PLC actor: listens on the configured local endpoint and hands
/// every received payload to the <c>PlcRcvEdit</c> actor as a byte array.
/// </summary>
public class PlcRcv : BaseServerActor
{
    private readonly ISysAkkaManager _akkaManager;

    // Actor that receives the raw bytes of every incoming TCP message.
    private readonly IActorRef _plcRcvEditActor;

    /// <summary>Binds via the base class and looks up the <c>PlcRcvEdit</c> actor.</summary>
    /// <param name="akkaManager">Manager used to look up the already registered <c>PlcRcvEdit</c> actor.</param>
    /// <param name="akkaSysIp">Endpoints passed on to the base class.</param>
    /// <param name="log">Logger; stored by the base class in the inherited <c>_log</c> field.</param>
    public PlcRcv(ISysAkkaManager akkaManager, AkkaSocketIP akkaSysIp, ILog log) : base(akkaSysIp, log)
    {
        // The logger is kept by BaseActor; no separate field is declared here
        // so the inherited protected _log is not hidden.
        _akkaManager = akkaManager;
        _plcRcvEditActor = akkaManager.GetActor(nameof(PlcRcvEdit));
    }

    /// <summary>Logs the incoming message and forwards its bytes to <c>PlcRcvEdit</c>.</summary>
    protected override void TcpReceivedData(Tcp.Received msg)
    {
        _log.I("接收到訊息"," [Info] Handle_Tcp_Received. message=" + msg.ToString());
        _log.I("接收到訊息"," [Info] Count=" + msg.Data.Count.ToString());
        _plcRcvEditActor.Tell(msg.Data.ToArray());
    }
}
/// <summary>
/// Actor that <c>PlcRcv</c> sends received PLC bytes to; no message handlers
/// are defined yet.
/// </summary>
public class PlcRcvEdit : BaseActor
{
    /// <summary>Passes the logger to the base class, which exposes it as the inherited <c>_log</c>.</summary>
    /// <param name="log">Logger for this actor.</param>
    public PlcRcvEdit(ILog log) : base(log)
    {
        // No own _log field: declaring one would hide the inherited protected field.
    }
}
/// <summary>
/// Client-side PLC actor; all behaviour comes from <c>BaseClientActor</c>.
/// </summary>
public class PlcSnd : BaseClientActor
{
    /// <summary>Passes the endpoints and logger to the base class.</summary>
    /// <param name="akkaSysIp">Endpoints passed on to the base class.</param>
    /// <param name="log">Logger; available afterwards as the inherited <c>_log</c>.</param>
    public PlcSnd(AkkaSocketIP akkaSysIp, ILog log) : base(akkaSysIp, log)
    {
        // No own _log field: declaring one would hide the inherited protected field.
    }
}
/// <summary>
/// Actor holding a reference to the <c>PlcSnd</c> actor; no message handlers
/// are defined yet.
/// </summary>
public class PlcSndEdit : BaseActor
{
    // Reference to the already registered PlcSnd actor.
    private readonly IActorRef _plcSndActor;

    /// <summary>Looks up the <c>PlcSnd</c> actor and passes the logger to the base class.</summary>
    /// <param name="akkaManager">Manager used to look up the <c>PlcSnd</c> actor.</param>
    /// <param name="log">Logger; available afterwards as the inherited <c>_log</c>.</param>
    public PlcSndEdit(ISysAkkaManager akkaManager, ILog log) : base(log)
    {
        // No own _log field: declaring one would hide the inherited protected field.
        _plcSndActor = akkaManager.GetActor(nameof(PlcSnd));
    }
}
在ExternalSys Akka系統建置好後,目前使用的架構方法是使用Akka NLog去紀錄Akka系統的Log,設置方式則透過NLog Config設定準則去更改紀錄Log機制。
在剛剛建置的Akka Plc得知,除了Akka本身機制的Log設置紀錄,我們還須新增PlcRcv, PlcRcvEdit, PlcSnd, PlcSndEdit。
需設置參數區段
<?xml version="1.0" encoding="utf-8"?>
<!-- NLog configuration for the Akka system: one catch-all file plus one file per PLC logger name. -->
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
autoReload="true"
throwConfigExceptions="true"
internalLogToConsole="true"
internalLogLevel="info">
<!-- Base directory shared by every file target below. -->
<variable name="LogPath" value="C:\AkkaWebTemplate"/>
<!-- All file targets use the same archive settings: archiveEvery="Day", archiveAboveSize="1000000", maxArchiveFiles="5". -->
<targets async="true" >
<!-- Catch-all file; the "*" rule routes every logger here. -->
<target name="LogControl"
xsi:type="File"
fileName="${LogPath}/AkkaLog.log"
archiveFileName="${LogPath}/AkkaLog.{#}.log"
archiveNumbering="DateAndSequence"
archiveAboveSize="1000000"
encoding="utf-8"
maxArchiveFiles="5"
archiveDateFormat="yyyyMMdd"
archiveEvery="Day"
/>
<!-- File for the logger named PlcMgrLog. -->
<target name="PlcMgrLog"
xsi:type="File"
fileName="${LogPath}/PlcMgrLog.log"
archiveFileName="${LogPath}/PlcMgrLog.{#}.log"
archiveNumbering="DateAndSequence"
archiveAboveSize="1000000"
encoding="utf-8"
maxArchiveFiles="5"
archiveDateFormat="yyyyMMdd"
archiveEvery="Day"
/>
<!-- File for the logger named PlcRcvLog. -->
<target name="PlcRcvLog"
xsi:type="File"
fileName="${LogPath}/PlcRcvLog.log"
archiveFileName="${LogPath}/PlcRcvLog.{#}.log"
archiveNumbering="DateAndSequence"
archiveAboveSize="1000000"
encoding="utf-8"
maxArchiveFiles="5"
archiveDateFormat="yyyyMMdd"
archiveEvery="Day"
/>
<!-- File for the logger named PlcRcvEditLog. -->
<target name="PlcRcvEditLog"
xsi:type="File"
fileName="${LogPath}/PlcRcvEditLog.log"
archiveFileName="${LogPath}/PlcRcvEditLog.{#}.log"
archiveNumbering="DateAndSequence"
archiveAboveSize="1000000"
encoding="utf-8"
maxArchiveFiles="5"
archiveDateFormat="yyyyMMdd"
archiveEvery="Day"
/>
<!-- File for the logger named PlcSndLog. -->
<target name="PlcSndLog"
xsi:type="File"
fileName="${LogPath}/PlcSndLog.log"
archiveFileName="${LogPath}/PlcSndLog.{#}.log"
archiveNumbering="DateAndSequence"
archiveAboveSize="1000000"
encoding="utf-8"
maxArchiveFiles="5"
archiveDateFormat="yyyyMMdd"
archiveEvery="Day"
/>
<!-- File for the logger named PlcSndEditLog. -->
<target name="PlcSndEditLog"
xsi:type="File"
fileName="${LogPath}/PlcSndEditLog.log"
archiveFileName="${LogPath}/PlcSndEditLog.{#}.log"
archiveNumbering="DateAndSequence"
archiveAboveSize="1000000"
encoding="utf-8"
maxArchiveFiles="5"
archiveDateFormat="yyyyMMdd"
archiveEvery="Day"
/>
</targets>
<!-- Routing: each named logger goes to its own file from Debug level upwards. -->
<!-- NOTE(review): no rule is marked final, so the named loggers presumably also match the "*" rule and are written to LogControl as well; confirm this duplication is intended. -->
<rules>
<logger name="PlcMgrLog" minlevel="Debug" writeTo="PlcMgrLog"/>
<logger name="PlcRcvLog" minlevel="Debug" writeTo="PlcRcvLog" />
<logger name="PlcRcvEditLog" minlevel="Debug" writeTo="PlcRcvEditLog"/>
<logger name="PlcSndLog" minlevel="Debug" writeTo="PlcSndLog" />
<logger name="PlcSndEditLog" minlevel="Debug" writeTo="PlcSndEditLog" />
<logger name="*" minlevel="Debug" writeTo="LogControl" />
</rules>
</nlog>
因為是在Web Server Service下啟用Akka,故需在appsettings.json內設置相關參數
"AkkaConfigure": {
"Name": "AkkaSystem",
"Port": "8200",
"PublicHostIP": "127.0.0.1",
"PLC": {
"RemoteIp": "127.0.0.1",
"RemotePort": "7791",
"LocalIp": "127.0.0.1",
"LocalPort": "9101"
}
}
Akka與外部連結系統建置完成後,接著要在Web Server啟用Akka系統。因為Dot Net Core框架已全面使用Dependency Injection(DI)機制,故Akka的建置需配合(Align)現有框架,也就是需要將Akka註冊到DI Container裡。
需註冊至Container項目
/// <summary>
/// Registers the Akka actor system, the PLC actors and the server engine in
/// the ASP.NET Core service collection, reading its settings from the
/// "AkkaConfigure" configuration section.
/// </summary>
public class AkkaDIService
{
    private readonly IServiceCollection _service;
    private readonly IConfiguration _configuration;

    /// <summary>Keeps the service collection to fill and the configuration to read.</summary>
    public AkkaDIService(IServiceCollection service, IConfiguration configuration)
    {
        _service = service;
        _configuration = configuration;
    }

    /// <summary>Performs all registrations: actor system, PLC actors, then the server engine.</summary>
    public void Inject()
    {
        RegisterActorSystem();
        RegisterPlcActors();
        RegisterServerEngine();
    }

    // Singleton manager; the actor system is created on first resolution and
    // connected to the service provider so actors can be built through DI.
    private void RegisterActorSystem()
    {
        _service.AddSingleton<ISysAkkaManager>(provider =>
        {
            var actSystem = ActorSystem.Create(
                _configuration["AkkaConfigure:Name"],
                AkkaPara.Config(
                    _configuration["AkkaConfigure:Port"],
                    _configuration["AkkaConfigure:PublicHostIP"]));
            actSystem.UseServiceProvider(provider);
            return new SysAkkaManager(actSystem);
        });
    }

    // Scoped factories for each PLC actor type, each with its own named logger.
    private void RegisterPlcActors()
    {
        _service.AddScoped(p =>
            new PlcMgr(p.GetService<ISysAkkaManager>(), GetLog<PlcMgr>(p, "PlcMgrLog")));

        _service.AddScoped(p =>
        {
            // Endpoints are read before the manager is resolved.
            var endpoints = NewIPPoint("PLC");
            return new PlcRcv(p.GetService<ISysAkkaManager>(), endpoints, GetLog<PlcRcv>(p, "PlcRcvLog"));
        });

        _service.AddScoped(p =>
            new PlcRcvEdit(GetLog<PlcRcvEdit>(p, "PlcRcvEditLog")));

        _service.AddScoped(p =>
            new PlcSnd(NewIPPoint("PLC"), GetLog<PlcSnd>(p, "PlcSndLog")));

        _service.AddScoped(p =>
            new PlcSndEdit(p.GetService<ISysAkkaManager>(), GetLog<PlcSndEdit>(p, "PlcSndEditLog")));
    }

    // Scoped factory for the engine that starts the actors.
    private void RegisterServerEngine()
    {
        _service.AddScoped(provider =>
            new AkkaServerEngine(provider.GetService<ISysAkkaManager>()));
    }

    // Builds an ILog backed by an Akka logging adapter with the given logger name.
    private ILog GetLog<T>(IServiceProvider context, string nlogName)
    {
        var actorSystem = context.GetService<ISysAkkaManager>().ActorSystem;
        return new NLogSend(Logging.GetLogger(actorSystem, nlogName));
    }

    // Reads the local and remote IP/port of the named sub-section of
    // "AkkaConfigure" and turns them into endpoints.
    private AkkaSocketIP NewIPPoint(string sysName)
    {
        IPEndPoint ReadEndPoint(string ipKey, string portKey)
        {
            return new IPEndPoint(IPAddress.Parse(_configuration[ipKey]), int.Parse(_configuration[portKey]));
        }

        var result = new AkkaSocketIP();
        result.LocalIpEndPoint = ReadEndPoint($"AkkaConfigure:{sysName}:LocalIp", $"AkkaConfigure:{sysName}:LocalPort");
        result.RemoteIpEndPoint = ReadEndPoint($"AkkaConfigure:{sysName}:RemoteIp", $"AkkaConfigure:{sysName}:RemotePort");
        return result;
    }
}
/// <summary>
/// Starts the Akka side of the application: constructing the engine creates
/// the <c>PlcMgr</c> actor through the manager.
/// </summary>
public class AkkaServerEngine
{
    /// <summary>Creates the <c>PlcMgr</c> actor.</summary>
    /// <param name="akkaManager">Manager used to create the actor.</param>
    public AkkaServerEngine(ISysAkkaManager akkaManager)
        => akkaManager.CreateActor<PlcMgr>();
}
Akka DI相關設置Class與啟動Engine建置完後,將Akka DI Class與Engine加入Startup。
在ConfigureServices新增AkkaDIService,並呼叫Inject() Method。接著在Configure啟用AkkaServerEngine。Web Server啟動時,在建立Middleware Pipeline的過程中就會啟動Akka System。
// Application configuration; passed to AkkaDIService when services are registered.
private readonly IConfiguration _configuration;
// Hosting environment supplied by the framework.
private readonly IWebHostEnvironment _environment;

/// <summary>Stores the configuration and hosting environment supplied by the host.</summary>
public Startup(IConfiguration configuration, IWebHostEnvironment environment)
{
    (_configuration, _environment) = (configuration, environment);
}
/// <summary>Registers MVC and all Akka-related services.</summary>
public void ConfigureServices(IServiceCollection services)
{
    services.AddControllersWithViews();
    new AkkaDIService(services, _configuration).Inject();
}

/// <summary>
/// Resolves the Akka server engine so the Akka system is started while the
/// request pipeline is being set up.
/// </summary>
public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IServiceProvider serviceProvider)
{
    // NOTE(review): only the Akka part of this method is shown in the article;
    // the template's middleware registrations presumably belong here as well.

    // Enable Akka
    var serverEngine = serviceProvider.GetService<AkkaServerEngine>();
}
Web啟動後,可用第三方工具SocketTest3測試是否可以跟Web Service裡的Akka連線,以及檢查Log路徑下是否有成功產生NLog檔案
Akka Log檔案產生