using Edge.Core.IndustryStandardInterface.NetworkController;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using static DeviceInfoToAliIotHubViaGateway.App;
namespace DeviceInfoToAliIotHubViaGateway
{
public static class ExtMethod
{
public static string TopicMaker(this string topicFormatStr, string productKey, string deviceName)
{
return topicFormatStr.Replace("{productKey}", productKey).Replace("{deviceName}", deviceName);
}
public static string TopicMaker(this string topicFormatStr, string productKey, string deviceName, string eventName)
{
return topicFormatStr.Replace("{productKey}", productKey).Replace("{deviceName}", deviceName).Replace("{tsl.event.identifier}", eventName);
}
public static string SignWithHMacSHA1(this byte[] target, byte[] key)
{
using (HMACSHA1 hmac = new HMACSHA1(key))
{
var hash = hmac.ComputeHash(target);
return hash.Select(h => h.ToString("X2")).Aggregate((acc, n) => acc + n);
}
}
///
/// Listen on IMqttClientNetworkController's event of OnMessageReceived, until the ->first<- message came,
/// then try Deserialize this message to object T.
/// or, when timedout, a default(T) will return.
///
///
///
///
/// timedout will return a default(T)
///
public static Task WaitFor(this IMqttClientNetworkController client, string topic, int timeout) where T : class
{
int isResponseGot = 0;
var source = new TaskCompletionSource();
EventHandler callback = null;
callback = (a, b) =>
{
if (b.Message.Topic == topic)
{
try
{
if (typeof(T) == typeof(System.String))
{
if (Interlocked.CompareExchange(ref isResponseGot, 1, 0) == 0)
{
var fff = Encoding.UTF8.GetString(b.Message.Message) as T;
source.SetResult(fff);
}
}
else
{
var response = JsonSerializer.Deserialize(Encoding.UTF8.GetString(b.Message.Message));
if (Interlocked.CompareExchange(ref isResponseGot, 1, 0) == 0)
{
source.SetResult(response);
}
}
}
catch (Exception exx)
{
if (Interlocked.CompareExchange(ref isResponseGot, 1, 0) == 0)
{
source.SetException(exx);
}
}
finally
{
client.OnMessageReceived -= callback;
}
}
};
client.OnMessageReceived += callback;
var _ = new System.Timers.Timer(timeout);
_.Elapsed += (__, ___) =>
{
_.Stop();
if (Interlocked.CompareExchange(ref isResponseGot, 1, 0) == 0)
{
source.SetResult(default(T));
}
};
_.Start();
return source.Task;
}
//public static async Task SubscribeAndWaitForAndUnsubscribe(this IMqttClientNetworkController client, string topic,
// int timeout, Mqtt_QosLevel qosLevel) where T : class
//{
// try
// {
// var subResult = await client.SubscribeAsync(0, topic, qosLevel);
// if (subResult)
// {
// var catchUp = client.WaitFor(topic, timeout);
// var r = await catchUp;
// await client.UnsubscribeAsync(0, topic);
// return r;
// }
// }
// catch (Exception exx)
// {
// try
// {
// await client.UnsubscribeAsync(0, topic);
// }
// catch { }
// var source0 = new TaskCompletionSource();
// source0.SetException(exx);
// return await source0.Task;
// }
// try
// {
// await client.UnsubscribeAsync(0, topic);
// }
// catch { }
// var source1 = new TaskCompletionSource();
// source1.SetException(new InvalidOperationException("SubscribeAsync failed with returned with false"));
// return await source1.Task;
//}
///
/// Subscribes on a topic until the ->first<- message came, and then try Deserialize this message to object T.
/// finally, will Unsubscribe from topic.
/// or, when timedout, a default(T) will return.
///
///
///
///
/// timedout will return a default(T)
///
public static async Task SubscribeAndWaitForThenUnsubscribe(this IMqttClientNetworkController client, string topic,
int timeout, Mqtt_QosLevel qosLevel) where T : class
{
try
{
var subResult = await client.SubscribeAsync(0, topic, qosLevel);
if (subResult)
return await client.WaitFor(topic, timeout);
else
{
var source1 = new TaskCompletionSource();
source1.SetException(new InvalidOperationException("SubscribeAsync failed with returned with false"));
return await source1.Task;
}
}
catch (Exception exx)
{
var source0 = new TaskCompletionSource();
source0.SetException(exx);
return await source0.Task;
}
finally
{
await client.UnsubscribeAsync(0, topic);
}
}
public static string WrapToPropertyChangedInLocalJson(this string propertyName, string value, int requestId)
{
return new List>(){
new KeyValuePair(propertyName,value)}.WrapToPropertiesChangedInLocalJson(requestId);
}
//public static string WrapToPropertiesChangedInLocalJson(this IEnumerable> propertyNameAndValues, int requestId)
//{
// var request = "{ \"id\": \"" + requestId + "\", \"version\": \"1.0\", \"params\": {";
// foreach (var nv in propertyNameAndValues)
// {
// string property_CurState_JsonStr = "\"" + nv.Key + "\": {\"value\": \"" + nv.Value + "\",\"time\": " + DateTime.Now.Ticks + "}";
// request += property_CurState_JsonStr + ",";
// }
// // remove last ,
// request = request.Substring(0, request.Length - 1);
// request += "}, \"method\": \"thing.event.property.post\"}";
// return request;
//}
public static string WrapToPropertiesChangedInLocalJson(this IEnumerable> propertyNameAndValues, int requestId)
{
var request = "{ \"id\": \"" + requestId + "\", \"version\": \"1.0\", \"params\": {";
foreach (var nv in propertyNameAndValues)
{
string variableValue = nv.Value.ToString();
if (nv.Value.GetType() == typeof(System.String))
variableValue = "\"" + variableValue + "\"";
string property_CurState_JsonStr = "\"" + nv.Key + "\": {\"value\": " + variableValue + ",\"time\": " + DateTime.Now.Ticks + "}";
request += property_CurState_JsonStr + ",";
}
// remove last ,
request = request.Substring(0, request.Length - 1);
request += "}, \"method\": \"thing.event.property.post\"}";
return request;
}
}
}