// DataSyncBackgroundService.cs
using System.Linq;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Rcs.Application.Services;
using Rcs.Domain.Repositories;
using Rcs.Domain.Settings;
using StackExchange.Redis;

namespace Rcs.Infrastructure.Services
{
    /// <summary>
    /// Data sync hosted service: when the application starts, it copies map and robot
    /// data from the repositories into Redis. The class also implements
    /// <see cref="IDataSyncService"/>, so the same sync methods can be called on demand.
    /// @author zzy
    /// </summary>
    public class DataSyncBackgroundService : IHostedService, IDataSyncService
    {
        private readonly ILogger<DataSyncBackgroundService> _logger;
        private readonly IServiceProvider _serviceProvider;
        private readonly IConnectionMultiplexer _redis;
        private readonly AppSettings _settings;

        // JSON serializer options that allow the named floating-point literals (Infinity, NaN).
        private static readonly JsonSerializerOptions _jsonOptions = new()
        {
            NumberHandling = JsonNumberHandling.AllowNamedFloatingPointLiterals
        };

        /// <summary>
        /// Creates the service from its injected dependencies.
        /// </summary>
        /// <param name="logger">Logger used for sync progress and failures.</param>
        /// <param name="serviceProvider">Root provider; a scope is created per sync to resolve repositories.</param>
        /// <param name="redis">Redis connection multiplexer that the data is written through.</param>
        /// <param name="settings">Application settings; the Redis key prefixes are read from them.</param>
        /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
        public DataSyncBackgroundService(
            ILogger<DataSyncBackgroundService> logger,
            IServiceProvider serviceProvider,
            IConnectionMultiplexer redis,
            IOptions<AppSettings> settings)
        {
            _logger = logger ?? throw new ArgumentNullException(nameof(logger));
            _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
            _redis = redis ?? throw new ArgumentNullException(nameof(redis));
            _settings = (settings ?? throw new ArgumentNullException(nameof(settings))).Value;
        }

        /// <summary>
        /// Runs a full data sync when the host starts the service.
        /// Any exception is logged and swallowed, so a failed sync does not propagate to the host.
        /// @author zzy
        /// </summary>
        /// <param name="cancellationToken">Token passed through to the sync operations.</param>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("[数据同步] 数据同步服务启动中...");

            try
            {
                await SyncAllAsync(cancellationToken);
                _logger.LogInformation("[数据同步] 数据同步服务启动完成");
            }
            catch (Exception ex)
            {
                // Best effort: report the failure but do not rethrow.
                _logger.LogError(ex, "[数据同步] 数据同步服务启动失败");
            }
        }

        /// <summary>
        /// Called when the host stops the service; only logs, there is nothing to release.
        /// @author zzy
        /// </summary>
        /// <param name="cancellationToken">Unused.</param>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("[数据同步] 数据同步服务已停止");
            return Task.CompletedTask;
        }

        /// <summary>
        /// Syncs all data to Redis: maps first, then robots.
        /// @author zzy
        /// </summary>
        /// <param name="cancellationToken">Token used to cancel the sync.</param>
        public async Task SyncAllAsync(CancellationToken cancellationToken = default)
        {
            await SyncMapsAsync(cancellationToken);
            await SyncRobotsAsync(cancellationToken);
        }

        /// <summary>
        /// Syncs every map (with its nodes, edges and resources) to Redis.
        /// Each map is stored as one JSON string under "{Map prefix}:{MapId}", and the list of
        /// synced map ids is stored as a JSON array under the configured map-list key.
        /// @author zzy
        /// </summary>
        /// <param name="cancellationToken">Token used to cancel the sync; checked before each map.</param>
        /// <exception cref="OperationCanceledException">The token was cancelled.</exception>
        public async Task SyncMapsAsync(CancellationToken cancellationToken = default)
        {
            using var scope = _serviceProvider.CreateScope();
            var mapRepo = scope.ServiceProvider.GetRequiredService<IMapRepository>();
            var db = _redis.GetDatabase();
            var keyPrefixes = _settings.Redis.KeyPrefixes;

            var maps = await mapRepo.GetAllAsync(cancellationToken);
            var mapIds = new System.Collections.Generic.List<string>();

            foreach (var mapBasic in maps)
            {
                cancellationToken.ThrowIfCancellationRequested();

                // Load the map again together with all of its related data.
                var map = await mapRepo.GetWithFullDetailsAsync(mapBasic.MapId, cancellationToken);
                if (map is null)
                {
                    continue;
                }

                var key = $"{keyPrefixes.Map}:{map.MapId}";
                var cacheData = new MapCacheData
                {
                    MapId = map.MapId,
                    MapCode = map.MapCode,
                    MapName = map.MapName,
                    MapType = (int)map.MapType,
                    Version = map.Version,
                    Active = map.Active,
                    Nodes = map.MapNodes.Select(n => new MapNodeCache
                    {
                        NodeId = n.NodeId,
                        NodeCode = n.NodeCode,
                        X = n.X,
                        Y = n.Y,
                        Theta = n.Theta,
                        Type = (int)n.Type,
                        Active = n.Active,
                        IsReverseParking = n.IsReverseParking,
                        AllowRotate = n.AllowRotate,
                        MaxCoordinateOffset = n.MaxCoordinateOffset
                    }).ToList(),
                    Edges = map.MapEdges.Select(e => new MapEdgeCache
                    {
                        EdgeId = e.EdgeId,
                        EdgeCode = e.EdgeCode,
                        FromNode = e.FromNode,
                        ToNode = e.ToNode,
                        Length = e.Length,
                        Cost = e.Cost,
                        Active = e.Active,
                        OrientationRads = e.OrientationRads,
                        MaxSpeed = e.MaxSpeed,
                        MaxRadDeviation = e.MaxRadDeviation,
                        IsCurve = e.IsCurve,
                        // Stays null when the edge has no control points.
                        ControlPoints = e.ControlPoints?
                            .Select(p => new PointCache { X = p.X, Y = p.Y })
                            .ToList()
                    }).ToList(),
                    Resources = map.MapResources.Select(r => new MapResourceCache
                    {
                        ResourceId = r.ResourceId,
                        ResourceCode = r.ResourceCode,
                        Type = (int)r.Type,
                        Capacity = r.Capacity,
                        // Stays null when the resource has no location geometry.
                        LocationCoordinates = r.LocationCoordinates?.Coordinates
                            .Select(c => new PointCache { X = c.X, Y = c.Y })
                            .ToList(),
                        MaxSpeed = r.MaxSpeed,
                        CanRotate = r.CanRotate,
                        PreAction1Type = r.PreAction1Type,
                        PreNetActions1 = r.PreNetActions1,
                        PostAction1Type = r.PostAction1Type,
                        PostNetActions1 = r.PostNetActions1,
                        PreAction2Type = r.PreAction2Type,
                        PreNetActions2 = r.PreNetActions2,
                        PostAction2Type = r.PostAction2Type,
                        PostNetActions2 = r.PostNetActions2
                    }).ToList()
                };

                var json = JsonSerializer.Serialize(cacheData, _jsonOptions);

                await db.StringSetAsync(key, json);
                mapIds.Add(map.MapId.ToString());
            }

            // Store the list of synced map ids.
            await db.StringSetAsync(keyPrefixes.MapList, JsonSerializer.Serialize(mapIds, _jsonOptions));

            _logger.LogInformation("[数据同步] 已同步 {Count} 个地图数据到Redis(含节点、边、资源)", mapIds.Count);
        }

        /// <summary>
        /// Syncs every robot to Redis, using separate hashes for basic, status and location data.
        /// Robots are identified by "{manufacturer}:{serial number}". Each robot is added to the
        /// robots set, and its membership in the online and idle sets is made to match the
        /// state that was just written (added when online/idle, removed otherwise).
        /// @author zzy
        /// </summary>
        /// <param name="cancellationToken">Token used to cancel the sync; checked before each robot.</param>
        /// <exception cref="OperationCanceledException">The token was cancelled.</exception>
        public async Task SyncRobotsAsync(CancellationToken cancellationToken = default)
        {
            using var scope = _serviceProvider.CreateScope();
            var robotRepo = scope.ServiceProvider.GetRequiredService<IRobotRepository>();
            var db = _redis.GetDatabase();
            var keyPrefixes = _settings.Redis.KeyPrefixes;

            var robots = await robotRepo.GetAllAsync(cancellationToken);

            // Counted during the loop so the repository result is enumerated only once.
            var syncedCount = 0;

            foreach (var robot in robots)
            {
                // The Redis calls below take no token, so cancellation is observed here.
                cancellationToken.ThrowIfCancellationRequested();

                var robotKey = $"{robot.RobotManufacturer}:{robot.RobotSerialNumber}";

                // One timestamp per robot so the status and location hashes agree
                // (local time, round-trip "O" format).
                var updatedAt = DateTime.Now.ToString("O");

                // Basic data hash (manufacturer + serial number is the unique identifier).
                var basicKey = $"{keyPrefixes.Robot}:{robotKey}:basic";
                await db.HashSetAsync(basicKey, new HashEntry[]
                {
                    new("RobotId", robot.RobotId.ToString()),
                    new("RobotCode", robot.RobotCode),
                    new("RobotName", robot.RobotName),
                    new("RobotVersion", robot.RobotVersion),
                    new("ProtocolName", robot.ProtocolName),
                    new("ProtocolVersion", robot.ProtocolVersion),
                    new("ProtocolType", (int)robot.ProtocolType),
                    new("RobotManufacturer", robot.RobotManufacturer ?? ""),
                    new("RobotSerialNumber", robot.RobotSerialNumber),
                    new("RobotType", (int)robot.RobotType),
                    new("IpAddress", robot.IpAddress),
                    new("CoordinateScale", robot.CoordinateScale),
                    new("Active", robot.Active)
                });

                // Status data hash.
                var statusKey = $"{keyPrefixes.Robot}:{robotKey}:status";
                await db.HashSetAsync(statusKey, new HashEntry[]
                {
                    new("Status", (int)robot.Status),
                    new("Online", (int)robot.Online),
                    new("BatteryLevel", robot.BatteryLevel ?? 0),
                    new("Driving", robot.Driving),
                    new("Paused", robot.Paused),
                    new("Charging", robot.Charging),
                    new("OperatingMode", (int)robot.OperatingMode),
                    new("Errors", robot.Errors ?? ""),
                    new("UpdatedAt", updatedAt)
                });

                // Location data hash; missing values are stored as empty strings.
                var locationKey = $"{keyPrefixes.Robot}:{robotKey}:location";
                await db.HashSetAsync(locationKey, new HashEntry[]
                {
                    new("MapId", robot.CurrentMapCodeId?.ToString() ?? ""),
                    new("NodeId", robot.CurrentNodeId?.ToString() ?? ""),
                    new("X", robot.CurrentX?.ToString(System.Globalization.CultureInfo.InvariantCulture) ?? ""),
                    new("Y", robot.CurrentY?.ToString(System.Globalization.CultureInfo.InvariantCulture) ?? ""),
                    new("Theta", robot.CurrentTheta?.ToString(System.Globalization.CultureInfo.InvariantCulture) ?? ""),
                    new("Path", ""),
                    new("UpdatedAt", updatedAt)
                });

                // Register the robot in the set of all robots.
                await db.SetAddAsync(keyPrefixes.RobotsSet, robotKey);

                // Keep the online set in step with the status hash written above. Removing is
                // required because a robot that was online at an earlier sync would otherwise
                // stay in the set indefinitely.
                if (robot.Online == Domain.Entities.OnlineStatus.Online)
                {
                    await db.SetAddAsync(keyPrefixes.RobotsOnlineSet, robotKey);
                }
                else
                {
                    await db.SetRemoveAsync(keyPrefixes.RobotsOnlineSet, robotKey);
                }

                // Same rule for the idle set.
                if (robot.Status == Domain.Entities.RobotStatus.Idle)
                {
                    await db.SetAddAsync(keyPrefixes.RobotsIdleSet, robotKey);
                }
                else
                {
                    await db.SetRemoveAsync(keyPrefixes.RobotsIdleSet, robotKey);
                }

                syncedCount++;
            }

            _logger.LogInformation("[数据同步] 已同步 {Count} 个机器人数据到Redis(Hash结构)", syncedCount);
        }
    }
}