RabbitMQHandleBackgroundService.cs 6.58 KB
using HHECS.BllModel;
using HHECS.DAQShared.Models;
using RabbitMQ.Client;
using System.Text.Json;

namespace HHECS.DAQServer.Services
{
    /// <summary>
    /// RabbitMQ存储线程
    /// </summary>
    public class RabbitMQHandleBackgroundService : BackgroundService
    {
        private readonly IFreeSql _freeSql;
        private readonly DataCacheService _dataCacheService;
        private readonly ILogger<RabbitMQHandleBackgroundService> _logger;
        private readonly int _limit = 1000;
        private DateTime _lastReloadTime = DateTime.MinValue;

        private List<EquipmentExtend> _equipments = new List<EquipmentExtend>();

        private List<EquipmentTypeExtend> _equipmentTypes = new List<EquipmentTypeExtend>();

        private readonly bool _isProductionEnvironment;

        /// <summary>
        /// 数据刷新时间 间隔
        /// </summary>
        private readonly TimeSpan _reloadTimeSpan = TimeSpan.FromMinutes(1);

        public RabbitMQHandleBackgroundService(IFreeSql freeSql, DataCacheService dataCacheService, IConfiguration configuration, ILogger<RabbitMQHandleBackgroundService> logger)
        {
            _freeSql = freeSql;
            _dataCacheService = dataCacheService;
            _logger = logger;
            _isProductionEnvironment = configuration.GetValue<bool>("IsProductionEnvironment");
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            //失败缓存数据
            var equipmentDataRecordTemps = new List<EquipmentDataRecord>();
            var factory = new ConnectionFactory()
            {
                HostName = "172.16.29.90",
                UserName = "producer",
                Password = "Aa123456",
                VirtualHost = "hhecs.daq"
            };
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    if (_dataCacheService.EquipmentDataRecordQueue.IsEmpty)
                    {
                        await Task.Delay(1000, stoppingToken);
                        continue;
                    }

                    //每分钟刷新一次
                    if ((DateTime.Now - _lastReloadTime) > _reloadTimeSpan)
                    {
                        var result = ReloadData();
                        if (!result.Success)
                        {
                            _logger.LogError($"[{nameof(RabbitMQHandleBackgroundService)}]线程,数据刷新失败:{result.Msg}");
                        }
                        _lastReloadTime = DateTime.Now;
                    }

                    if (_equipments.Count == 0 || _equipmentTypes.Count == 0)
                    {
                        await Task.Delay(1000, stoppingToken);
                        continue;
                    }

                    //存在数据,需要先处理
                    if (equipmentDataRecordTemps.Count > 0)
                    {
                        //异常情况,待完善
                    }

                    using var connection = factory.CreateConnection();
                    var total = 0;
                    do
                    {
                        //队列为空,结束循环
                        if (_dataCacheService.EquipmentDataRecordQueue.IsEmpty)
                        {
                            break;
                        }

                        _dataCacheService.EquipmentDataRecordQueue.TryDequeue(out var data);
                        if (data != null)
                        {
                            var equipmentTypeId = _equipments.Where(x => x.Code == data.EquipmentCode).Select(x => x.EquipmentTypeId).FirstOrDefault();
                            if (equipmentTypeId == default)
                            {
                                continue;
                            }

                            var equipmentTypeCode = _equipmentTypes.Where(x => x.Id == equipmentTypeId).Select(x => x.Code).FirstOrDefault();

                            if (string.IsNullOrWhiteSpace(equipmentTypeCode))
                            {
                                continue;
                            }

                            using var channel = connection.CreateModel();
                            var exchangeName = string.Empty;
                            if (!_isProductionEnvironment)
                            {
                                exchangeName = $"daq.development.{equipmentTypeCode}";
                            }
                            else
                            {
                                exchangeName = $"daq.production.{equipmentTypeCode}";
                            }

                            channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, true);

                            // 数据
                            var body = JsonSerializer.SerializeToUtf8Bytes(data);
                            var properties = channel.CreateBasicProperties();
                            properties.Persistent = true;
                            properties.DeliveryMode = 2;
                            channel.BasicPublish(exchangeName, $"{equipmentTypeCode}.{data.EquipmentCode}", properties, body);
                            total++;
                        }
                    } while (DateTime.Now - _lastReloadTime < _reloadTimeSpan);

                    if (total > 0)
                    {
                        _logger.LogInformation($"成功写入{total}条数据");
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError($"[{nameof(RabbitMQHandleBackgroundService)}]线程异常:{ex.Message}");
                }
            }
        }

        /// <summary>
        /// 刷新数据
        /// </summary>
        private BllResult ReloadData()
        {
            try
            {
                _equipments = _freeSql.Queryable<EquipmentExtend>().ToList(x => new EquipmentExtend
                {
                    Id = x.Id,
                    Code = x.Code,
                    Name = x.Name,
                    EquipmentTypeId = x.EquipmentTypeId,
                });
                _equipmentTypes = _freeSql.Queryable<EquipmentTypeExtend>().ToList(x => new EquipmentTypeExtend
                {
                    Id = x.Id,
                    Code = x.Code,
                    Name = x.Name,
                });
                return BllResultFactory.Success();
            }
            catch (Exception ex)
            {
                return BllResultFactory.Error(ex.Message);
            }
        }
    }
}