// GenericDeviceProcessor.cs
  1. using Edge.Core.Parser;
  2. using Edge.Core.Parser.BinaryParser.MessageEntity;
  3. using Edge.Core.Processor.Communicator;
  4. using Edge.Core.Processor.Dispatcher.Attributes;
  5. using Microsoft.Extensions.DependencyInjection;
  6. using Microsoft.Extensions.Logging;
  7. using Microsoft.Extensions.Logging.Abstractions;
  8. using System;
  9. using System.Collections.Concurrent;
  10. using System.Collections.Generic;
  11. using System.Diagnostics;
  12. using System.Linq;
  13. using System.Text;
  14. using System.Threading;
  15. using System.Threading.Tasks;
  16. namespace Edge.Core.Processor
  17. {
  /// <summary>
  /// Connects a device handler to a communicator: messages raised by the communicator are
  /// stored on the context's incoming side and passed to the handler, and messages raised by
  /// the context's outgoing side are written back through the communicator.
  /// When the "Performance" logger has Trace enabled at construction time, elapsed-time
  /// entries are collected in memory and flushed to that logger every 30 seconds.
  /// </summary>
  /// <typeparam name="TRaw">Raw data type argument shared with the communicator and context.</typeparam>
  /// <typeparam name="TMessage">Message type; must derive from <see cref="MessageBase"/>.</typeparam>
  [MetaPartsDescriptor(
      "lang-zh-cn:通用型设备处理器lang-en-us:GenericDeviceProcessor",
      "lang-zh-cn:通用型设备处理器lang-en-us:Generic processor for comm with devices")]
  public class GenericDeviceProcessor<TRaw, TMessage> : IDeviceProcessor<TRaw, TMessage> where TMessage : MessageBase
  {
      #region performance related
      /// <summary>Interval between two flushes of the buffered performance entries.</summary>
      private const double PerfFlushIntervalMilliseconds = 30 * 1000;

      // Per-instance logger: a shared static would be overwritten by every constructor call.
      private readonly ILogger perfLogger;

      // The four members below stay null unless Trace was enabled when the instance was built.
      private readonly Stopwatch perfWatch_From_CommOnDataReceived_To_HandlerProcessed;
      private readonly Stopwatch perfWatch_From_CommOnDataReceived_To_CommOnDataWriting;
      private readonly ConcurrentQueue<string> perfPeriodLogStrings;
      private readonly System.Timers.Timer perfPeriodToLogFileTimer;

      // Set by Stop() so a flush that is still running does not re-arm the timer afterwards.
      private volatile bool stopped;
      #endregion

      // Serializes handling of incoming messages.
      private readonly object syncObject = new object();

      public string MetaConfigName { get; set; }

      /// <summary>The context handed to the handler; derived processors may replace it.</summary>
      public IContext<TRaw, TMessage> Context { get; protected set; }

      /// <summary>Not assigned anywhere in this class, so it reads as null here.</summary>
      public IList<IInterceptor<TRaw, TMessage>> Interceptors { get; }

      /// <summary>The communicator used for all reads and writes.</summary>
      public ICommunicator<TRaw, TMessage> Communicator { get; }

      /// <summary>
      /// Builds the context and subscribes to the communicator and outgoing events.
      /// </summary>
      /// <param name="handler">Handler that processes every incoming message.</param>
      /// <param name="communicator">Communicator to read from and write to; must not be null.</param>
      /// <param name="services">
      /// Optional service provider. When not null it must be able to resolve
      /// <see cref="ILoggerFactory"/>; when null, performance logging is disabled.
      /// </param>
      /// <exception cref="ArgumentNullException"><paramref name="communicator"/> is null.</exception>
      public GenericDeviceProcessor(IDeviceHandler<TRaw, TMessage> handler, ICommunicator<TRaw, TMessage> communicator, IServiceProvider services)
      {
          // Fail fast, before any timer exists, instead of hitting a NullReferenceException
          // half way through the wiring below.
          if (communicator == null)
          {
              throw new ArgumentNullException(nameof(communicator));
          }

          this.perfLogger = services != null
              ? services.GetRequiredService<ILoggerFactory>().CreateLogger("Performance")
              : NullLogger.Instance;

          if (this.perfLogger.IsEnabled(LogLevel.Trace))
          {
              this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed = new Stopwatch();
              this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting = new Stopwatch();
              this.perfPeriodLogStrings = new ConcurrentQueue<string>();
              this.perfPeriodToLogFileTimer = new System.Timers.Timer(PerfFlushIntervalMilliseconds);
              this.perfPeriodToLogFileTimer.Elapsed += this.OnPerfFlushTimerElapsed;
          }

          this.Communicator = communicator;
          var incoming = new HistoryKeepIncoming<TMessage>(10);
          var outgoing = new Outgoing<TRaw, TMessage>(incoming, services);
          this.Context = new Context<TRaw, TMessage>(this, handler, communicator, incoming, outgoing);

          // Forward everything the outgoing side wants to send to the communicator.
          this.Context.Outgoing.OnWriting += (s, a) =>
          {
              if (a.ExtraControlParameter != null)
              {
                  this.Communicator.Write(a.Message, a.ExtraControlParameter);
              }
              else
              {
                  this.Communicator.Write(a.Message);
              }
          };

          // NOTE(review): both handlers below capture the `outgoing` created above. Derived
          // processors in this file replace Context with a different outgoing object, which
          // these handlers will not notify - confirm that this is intended.
          this.Communicator.OnConnected += (s, a) =>
          {
              outgoing.OnConnect();
              // NOTE(review): the result of SendQRCodeAsync is not awaited or observed here.
              handler.SendQRCodeAsync();
          };
          this.Communicator.OnDisconnected += (s, a) =>
          {
              outgoing.OnDisconnect();
          };

          this.Communicator.OnDataReceived += (s, a) =>
          {
              lock (this.syncObject)
              {
                  if (this.perfLogger.IsEnabled(LogLevel.Trace))
                  {
                      this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed?.Restart();
                      this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting?.Restart();
                  }

                  // Assigning Message may flip DisablePropagate back to true, in which case
                  // the handler is skipped for this message.
                  this.Context.Incoming.DisablePropagate = false;
                  this.Context.Incoming.Message = a.Message;
                  if (!this.Context.Incoming.DisablePropagate)
                  {
                      handler.Process(this.Context);
                  }

                  if (this.perfLogger.IsEnabled(LogLevel.Trace))
                  {
                      this.EnqueuePerfEntry(
                          "From_CommOnDataReceived_To_HandlerProcessed",
                          this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed);
                  }
              }
          };

          this.Communicator.OnRawDataWriting += (s, a) =>
          {
              if (this.perfLogger.IsEnabled(LogLevel.Trace))
              {
                  this.EnqueuePerfEntry(
                      "From_CommOnDataReceived_To_CommOnDataWriting",
                      this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting);
              }
          };

          // Started last so that a failure in the wiring above cannot leave a timer running.
          this.perfPeriodToLogFileTimer?.Start();
      }

      /// <summary>
      /// Initializes the handler with the current context and then starts the communicator.
      /// </summary>
      /// <returns>The result reported by the communicator's Start.</returns>
      public async Task<bool> Start()
      {
          this.Context.Handler.Init(this.Context);
          return await this.Communicator.Start();
      }

      /// <summary>
      /// Stops the performance flush timer and disposes the communicator, the context and,
      /// when it is disposable, the handler.
      /// </summary>
      /// <returns>A completed task whose result is always true.</returns>
      public Task<bool> Stop()
      {
          // Raise the flag first: a flush running right now must not restart the timer.
          this.stopped = true;
          this.perfPeriodToLogFileTimer?.Stop();
          this.perfPeriodToLogFileTimer?.Dispose();

          this.Communicator.Dispose();
          this.Context.Dispose();
          if (this.Context.Handler is IDisposable dp)
          {
              dp.Dispose();
          }

          return Task.FromResult(true);
      }

      /// <summary>Delegates to the handler's Test method.</summary>
      /// <param name="parameters">Arguments passed through unchanged.</param>
      public Task Test(params object[] parameters)
      {
          return this.Context.Handler.Test(parameters);
      }

      /// <summary>
      /// Buffers one performance entry made of the current local time, the stage name and the
      /// stopwatch's elapsed milliseconds (-1 when the stopwatch does not exist).
      /// </summary>
      private void EnqueuePerfEntry(string stage, Stopwatch watch)
      {
          this.perfPeriodLogStrings?.Enqueue(DateTime.Now.ToString("HH:mm:ss.fff") + " - "
              + " " + stage + " elapsed " + (watch?.ElapsedMilliseconds ?? -1));
      }

      /// <summary>
      /// Timer callback: writes all buffered performance entries as one Trace message.
      /// </summary>
      private void OnPerfFlushTimerElapsed(object sender, System.Timers.ElapsedEventArgs e)
      {
          try
          {
              // Pause the timer so a slow flush cannot overlap with the next tick.
              this.perfPeriodToLogFileTimer.Stop();

              if (!this.perfLogger.IsEnabled(LogLevel.Trace))
              {
                  this.perfWatch_From_CommOnDataReceived_To_HandlerProcessed?.Stop();
                  this.perfWatch_From_CommOnDataReceived_To_CommOnDataWriting?.Stop();
                  return;
              }

              StringBuilder sb = null;
              while (this.perfPeriodLogStrings.TryDequeue(out string line))
              {
                  if (sb == null)
                  {
                      sb = new StringBuilder();
                  }

                  sb.AppendLine(line);
              }

              if (sb != null && sb.Length > 0)
              {
                  this.perfLogger.LogTrace(sb.ToString());
              }
          }
          finally
          {
              // Always re-arm, also when Trace is currently off: if Trace is switched on again
              // later, entries are buffered again and must still be drained.
              this.RestartPerfFlushTimer();
          }
      }

      /// <summary>Re-arms the flush timer unless the processor has been stopped.</summary>
      private void RestartPerfFlushTimer()
      {
          if (this.stopped)
          {
              return;
          }

          try
          {
              this.perfPeriodToLogFileTimer.Start();
          }
          catch (ObjectDisposedException)
          {
              // Stop() disposed the timer between the check above and Start(); nothing to do.
          }
      }
  }
  /// <summary>
  /// Processor for the scenario where FC acts as master and actively polls the devices.
  /// After the base constructor has run, the context is rebuilt around the same incoming
  /// object but with a <c>TimeWindowWithActivePollingOutgoing</c> as its outgoing side.
  /// </summary>
  /// <typeparam name="TRaw">Raw data type argument shared with the communicator and context.</typeparam>
  /// <typeparam name="TMessage">Message type; must derive from <see cref="MessageBase"/>.</typeparam>
  [MetaPartsDescriptor(
      "lang-zh-cn:能自动发送消息的设备处理器lang-en-us:AutoPollingDeviceProcessor",
      "lang-zh-cn:自动发送消息型处理器lang-en-us:Auto send polling message processor for comm with devices")]
  public class HalfDuplexActivePollingDeviceProcessor<TRaw, TMessage> : GenericDeviceProcessor<TRaw, TMessage> where TMessage : MessageBase
  {
      /// <summary>
      /// Creates the processor and swaps in a context that uses an active-polling outgoing side.
      /// </summary>
      /// <param name="handler">Handler that processes incoming messages.</param>
      /// <param name="communicator">Communicator used for reading and writing.</param>
      /// <param name="autoPollingInterval">
      /// Polling interval handed to the outgoing side; the unit is not visible here - TODO confirm.
      /// </param>
      /// <param name="services">Service provider passed to the base class and the outgoing side.</param>
      [ParamsJsonSchemas("HalfDuplexActivePollingDeviceProcessorCtorSchema")]
      public HalfDuplexActivePollingDeviceProcessor(IDeviceHandler<TRaw, TMessage> handler, ICommunicator<TRaw, TMessage> communicator, int autoPollingInterval, IServiceProvider services)
          : base(handler, communicator, services)
      {
          // NOTE(review): the context created by the base constructor is replaced here without
          // being disposed - confirm that the discarded context holds nothing that needs cleanup.
          var sharedIncoming = base.Context.Incoming;
          var pollingOutgoing = new TimeWindowWithActivePollingOutgoing<TRaw, TMessage>(sharedIncoming, autoPollingInterval, services);
          base.Context = new Context<TRaw, TMessage>(this, handler, communicator, sharedIncoming, pollingOutgoing);

          // The new outgoing side needs its own subscription to reach the communicator.
          this.Context.Outgoing.OnWriting += (sender, args) =>
          {
              if (args.ExtraControlParameter == null)
              {
                  this.Communicator.Write(args.Message);
                  return;
              }

              this.Communicator.Write(args.Message, args.ExtraControlParameter);
          };
      }
  }
  /// <summary>
  /// Processor for the scenario where FC acts as slave and is polled by the devices.
  /// After the base constructor has run, the context is rebuilt around the same incoming
  /// object but with a <c>TimeWindowWithNegativePollingOutgoing</c> as its outgoing side.
  /// </summary>
  /// <typeparam name="TRaw">Raw data type argument shared with the communicator and context.</typeparam>
  /// <typeparam name="TMessage">Message type; must derive from <see cref="MessageBase"/>.</typeparam>
  public class HalfDuplexNegativePollingDeviceProcessor<TRaw, TMessage> : GenericDeviceProcessor<TRaw, TMessage> where TMessage : MessageBase
  {
      /// <summary>
      /// Creates the processor and swaps in a context that uses a negative-polling outgoing side.
      /// </summary>
      /// <param name="handler">Handler that processes incoming messages.</param>
      /// <param name="communicator">Communicator used for reading and writing.</param>
      /// <param name="services">Service provider passed to the base class and the outgoing side.</param>
      public HalfDuplexNegativePollingDeviceProcessor(IDeviceHandler<TRaw, TMessage> handler, ICommunicator<TRaw, TMessage> communicator, IServiceProvider services)
          : base(handler, communicator, services)
      {
          // NOTE(review): the context created by the base constructor is replaced here without
          // being disposed - confirm that the discarded context holds nothing that needs cleanup.
          var sharedIncoming = base.Context.Incoming;
          var polledOutgoing = new TimeWindowWithNegativePollingOutgoing<TRaw, TMessage>(sharedIncoming, services);
          base.Context = new Context<TRaw, TMessage>(this, handler, communicator, sharedIncoming, polledOutgoing);

          // The new outgoing side needs its own subscription to reach the communicator.
          this.Context.Outgoing.OnWriting += (sender, args) =>
          {
              if (args.ExtraControlParameter == null)
              {
                  this.Communicator.Write(args.Message);
                  return;
              }

              this.Communicator.Write(args.Message, args.ExtraControlParameter);
          };
      }
  }
  208. }