// DataSyncBackgroundService.cs (16.6 KB)
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Rcs.Application.Services;
using Rcs.Domain.Entities;
using Rcs.Domain.Repositories;
using Rcs.Domain.Settings;
using StackExchange.Redis;

namespace Rcs.Infrastructure.Services
{
    /// <summary>
    /// Data synchronisation background service. On host start-up it copies map and
    /// robot data from the repositories into Redis; it also implements
    /// <see cref="IDataSyncService"/> so individual maps/robots can be re-synced on demand.
    /// @author zzy
    /// </summary>
    public class DataSyncBackgroundService : IHostedService, IDataSyncService
    {
        private readonly ILogger<DataSyncBackgroundService> _logger;
        private readonly IServiceProvider _serviceProvider;
        private readonly IConnectionMultiplexer _redis;
        private readonly AppSettings _settings;

        // JSON serializer options: allow the named floating-point literals (Infinity, NaN).
        private static readonly JsonSerializerOptions _jsonOptions = new()
        {
            NumberHandling = JsonNumberHandling.AllowNamedFloatingPointLiterals
        };

        /// <summary>
        /// Creates the service.
        /// </summary>
        /// <param name="logger">Logger used for all sync messages.</param>
        /// <param name="serviceProvider">Root provider; a scope is created per sync call to resolve repositories.</param>
        /// <param name="redis">Redis connection multiplexer.</param>
        /// <param name="settings">Application settings; the Redis key prefixes are read from here.</param>
        public DataSyncBackgroundService(
            ILogger<DataSyncBackgroundService> logger,
            IServiceProvider serviceProvider,
            IConnectionMultiplexer redis,
            IOptions<AppSettings> settings)
        {
            _logger = logger;
            _serviceProvider = serviceProvider;
            _redis = redis;
            _settings = settings.Value;
        }

        /// <summary>
        /// Runs a full data sync when the service starts.
        /// Any exception raised by the sync is logged and not rethrown.
        /// @author zzy
        /// </summary>
        /// <param name="cancellationToken">Token forwarded to the sync operations.</param>
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("[数据同步] 数据同步服务启动中...");

            try
            {
                await SyncAllAsync(cancellationToken);
                _logger.LogInformation("[数据同步] 数据同步服务启动完成");
            }
            catch (Exception ex)
            {
                // Best-effort start-up sync: the failure is recorded but does not propagate.
                _logger.LogError(ex, "[数据同步] 数据同步服务启动失败");
            }
        }

        /// <summary>
        /// Stops the service. Only logs; there is nothing to tear down.
        /// @author zzy
        /// </summary>
        /// <param name="cancellationToken">Unused.</param>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("[数据同步] 数据同步服务已停止");
            return Task.CompletedTask;
        }

        /// <summary>
        /// Syncs all data to Redis: maps first, then robots.
        /// @author zzy
        /// </summary>
        /// <param name="cancellationToken">Cancellation token.</param>
        public async Task SyncAllAsync(CancellationToken cancellationToken = default)
        {
            await SyncMapsAsync(cancellationToken);
            await SyncRobotsAsync(cancellationToken);
        }

        /// <summary>
        /// Syncs every map (including nodes, edges and resources) to Redis and then
        /// overwrites the stored map-ID list with the IDs that were written.
        /// @author zzy
        /// </summary>
        /// <param name="cancellationToken">Cancellation token; checked before each map is processed.</param>
        /// <exception cref="OperationCanceledException">The token was cancelled during the sync.</exception>
        public async Task SyncMapsAsync(CancellationToken cancellationToken = default)
        {
            using var scope = _serviceProvider.CreateScope();
            var mapRepo = scope.ServiceProvider.GetRequiredService<IMapRepository>();
            var db = _redis.GetDatabase();

            var maps = await mapRepo.GetAllAsync(cancellationToken);
            var mapIds = new List<string>();

            foreach (var mapBasic in maps)
            {
                // The Redis write below takes no token, so honour cancellation between maps.
                cancellationToken.ThrowIfCancellationRequested();

                // Re-load the map together with its full related data.
                var map = await mapRepo.GetWithFullDetailsAsync(mapBasic.MapId, cancellationToken);
                if (map == null)
                {
                    continue;
                }

                await WriteMapToRedisAsync(db, map);
                mapIds.Add(map.MapId.ToString());
            }

            // Store the list of map IDs (replaces whatever list was stored before).
            await db.StringSetAsync(_settings.Redis.KeyPrefixes.MapList, JsonSerializer.Serialize(mapIds, _jsonOptions));

            _logger.LogInformation("[数据同步] 已同步 {Count} 个地图数据到Redis(含节点、边、资源)", mapIds.Count);
        }

        /// <summary>
        /// Syncs a single map to Redis (hot reload after the map data was modified).
        /// If the map cannot be found a warning is logged and nothing is written.
        /// @author zzy
        /// </summary>
        /// <param name="mapId">Map ID.</param>
        /// <param name="cancellationToken">Cancellation token.</param>
        public async Task SyncMapAsync(Guid mapId, CancellationToken cancellationToken = default)
        {
            using var scope = _serviceProvider.CreateScope();
            var mapRepo = scope.ServiceProvider.GetRequiredService<IMapRepository>();
            var db = _redis.GetDatabase();

            var map = await mapRepo.GetWithFullDetailsAsync(mapId, cancellationToken);
            if (map == null)
            {
                _logger.LogWarning("[数据同步] 地图 {MapId} 不存在,跳过同步", mapId);
                return;
            }

            await WriteMapToRedisAsync(db, map);

            // Make sure a newly created map also appears in the map-ID list.
            await EnsureMapListedAsync(db, mapId.ToString());

            _logger.LogInformation("[数据同步] 已同步地图 {MapId}({MapCode}) 到Redis", map.MapId, map.MapCode);
        }

        /// <summary>
        /// Syncs a map entity directly to Redis without querying the repository
        /// (for data that has not been persisted yet).
        /// @author zzy
        /// </summary>
        /// <param name="map">Map entity; must carry its related data (nodes / edges / resources).</param>
        /// <exception cref="ArgumentNullException"><paramref name="map"/> is null.</exception>
        public async Task SyncMapAsync(Map map)
        {
            if (map is null)
            {
                throw new ArgumentNullException(nameof(map));
            }

            var db = _redis.GetDatabase();

            await WriteMapToRedisAsync(db, map);

            // Make sure a newly created map also appears in the map-ID list.
            await EnsureMapListedAsync(db, map.MapId.ToString());

            _logger.LogInformation("[数据同步] 已通过实体直接同步地图 {MapId}({MapCode}) 到Redis", map.MapId, map.MapCode);
        }

        /// <summary>
        /// Adds <paramref name="mapIdStr"/> to the JSON-encoded map-ID list stored in Redis
        /// when it is not already present. The list is only rewritten when it changes.
        /// </summary>
        /// <remarks>
        /// NOTE(review): this is a plain read-modify-write on a string key; concurrent callers
        /// could overwrite each other's additions. Behaviour is unchanged from the original code.
        /// </remarks>
        /// <param name="db">Redis database instance.</param>
        /// <param name="mapIdStr">String form of the map ID.</param>
        private async Task EnsureMapListedAsync(IDatabase db, string mapIdStr)
        {
            var mapListJson = await db.StringGetAsync(_settings.Redis.KeyPrefixes.MapList);
            var mapIds = mapListJson.HasValue
                ? JsonSerializer.Deserialize<List<string>>(mapListJson!, _jsonOptions) ?? new List<string>()
                : new List<string>();

            if (mapIds.Contains(mapIdStr))
            {
                return;
            }

            mapIds.Add(mapIdStr);
            await db.StringSetAsync(_settings.Redis.KeyPrefixes.MapList, JsonSerializer.Serialize(mapIds, _jsonOptions));
        }

        /// <summary>
        /// Serializes one map to JSON and stores it under "{Map prefix}:{MapId}".
        /// @author zzy
        /// </summary>
        /// <param name="db">Redis database instance.</param>
        /// <param name="map">Map entity including its related data.</param>
        private async Task WriteMapToRedisAsync(IDatabase db, Map map)
        {
            var key = $"{_settings.Redis.KeyPrefixes.Map}:{map.MapId}";
            var cacheData = BuildMapCacheData(map);
            var json = JsonSerializer.Serialize(cacheData, _jsonOptions);
            await db.StringSetAsync(key, json);
        }

        /// <summary>
        /// Builds the cache representation of a map by copying the map's own fields and
        /// projecting its nodes, edges and resources into their cache types.
        /// @author zzy
        /// </summary>
        /// <param name="map">Map entity including its related data.</param>
        /// <returns>The map cache data.</returns>
        private static MapCacheData BuildMapCacheData(Map map)
        {
            return new MapCacheData
            {
                MapId = map.MapId,
                MapCode = map.MapCode,
                MapName = map.MapName,
                MapType = (int)map.MapType,
                Version = map.Version,
                Active = map.Active,
                Nodes = map.MapNodes.Select(n => new MapNodeCache
                {
                    NodeId = n.NodeId,
                    NodeCode = n.NodeCode,
                    X = n.X,
                    Y = n.Y,
                    Theta = n.Theta,
                    Type = (int)n.Type,
                    Active = n.Active,
                    IsReverseParking = n.IsReverseParking,
                    AllowRotate = n.AllowRotate,
                    MaxCoordinateOffset = n.MaxCoordinateOffset
                }).ToList(),
                Edges = map.MapEdges.Select(e => new MapEdgeCache
                {
                    EdgeId = e.EdgeId,
                    EdgeCode = e.EdgeCode,
                    FromNode = e.FromNode,
                    ToNode = e.ToNode,
                    Length = e.Length,
                    Cost = e.Cost,
                    Active = e.Active,
                    OrientationRads = e.OrientationRads,
                    MaxSpeed = e.MaxSpeed,
                    MaxRadDeviation = e.MaxRadDeviation,
                    IsCurve = e.IsCurve,
                    // Stays null when the edge has no control points.
                    ControlPoints = e.ControlPoints?
                        .Select(p => new PointCache { X = p.X, Y = p.Y })
                        .ToList()
                }).ToList(),
                Resources = map.MapResources.Select(r => new MapResourceCache
                {
                    ResourceId = r.ResourceId,
                    ResourceCode = r.ResourceCode,
                    Type = (int)r.Type,
                    Capacity = r.Capacity,
                    // Stays null when the resource has no location geometry.
                    LocationCoordinates = r.LocationCoordinates?.Coordinates
                        .Select(c => new PointCache { X = c.X, Y = c.Y })
                        .ToList(),
                    MaxSpeed = r.MaxSpeed,
                    CanRotate = r.CanRotate,
                    PreAction1Type = r.PreAction1Type,
                    PreNetActions1 = r.PreNetActions1,
                    PostAction1Type = r.PostAction1Type,
                    PostNetActions1 = r.PostNetActions1,
                    PreAction2Type = r.PreAction2Type,
                    PreNetActions2 = r.PreNetActions2,
                    PostAction2Type = r.PostAction2Type,
                    PostNetActions2 = r.PostNetActions2
                }).ToList()
            };
        }

        /// <summary>
        /// Syncs every robot to Redis (hash structure with separate basic / status / location data).
        /// @author zzy
        /// </summary>
        /// <param name="cancellationToken">Cancellation token; checked before each robot is written.</param>
        /// <exception cref="OperationCanceledException">The token was cancelled during the sync.</exception>
        public async Task SyncRobotsAsync(CancellationToken cancellationToken = default)
        {
            using var scope = _serviceProvider.CreateScope();
            var robotRepo = scope.ServiceProvider.GetRequiredService<IRobotRepository>();
            var db = _redis.GetDatabase();

            var robots = await robotRepo.GetAllAsync(cancellationToken);

            // Count while iterating so the repository result is enumerated exactly once.
            var syncedCount = 0;
            foreach (var robot in robots)
            {
                // The Redis writes take no token, so honour cancellation between robots.
                cancellationToken.ThrowIfCancellationRequested();

                await WriteRobotToRedisAsync(db, robot);
                syncedCount++;
            }

            _logger.LogInformation("[数据同步] 已同步 {Count} 个机器人数据到Redis(Hash结构)", syncedCount);
        }

        /// <summary>
        /// Syncs a single robot to Redis (hot reload after the robot data was modified).
        /// If the robot cannot be found a warning is logged and nothing is written.
        /// @author zzy
        /// </summary>
        /// <param name="robotId">Robot ID.</param>
        /// <param name="cancellationToken">Cancellation token.</param>
        public async Task SyncRobotAsync(Guid robotId, CancellationToken cancellationToken = default)
        {
            using var scope = _serviceProvider.CreateScope();
            var robotRepo = scope.ServiceProvider.GetRequiredService<IRobotRepository>();
            var db = _redis.GetDatabase();

            var robot = await robotRepo.GetByIdFullDataAsync(robotId, cancellationToken);
            if (robot == null)
            {
                _logger.LogWarning("[数据同步] 机器人 {RobotId} 不存在,跳过同步", robotId);
                return;
            }

            await WriteRobotToRedisAsync(db, robot);

            _logger.LogInformation("[数据同步] 已同步机器人 {RobotId}({RobotCode}) 到Redis", robot.RobotId, robot.RobotCode);
        }

        /// <summary>
        /// Syncs a robot entity directly to Redis without querying the repository
        /// (for data that has not been persisted yet).
        /// @author zzy
        /// </summary>
        /// <param name="robot">Robot entity.</param>
        /// <exception cref="ArgumentNullException"><paramref name="robot"/> is null.</exception>
        public async Task SyncRobotAsync(Robot robot)
        {
            if (robot is null)
            {
                throw new ArgumentNullException(nameof(robot));
            }

            var db = _redis.GetDatabase();
            await WriteRobotToRedisAsync(db, robot);
            _logger.LogInformation("[数据同步] 已通过实体直接同步机器人 {RobotId}({RobotCode}) 到Redis", robot.RobotId, robot.RobotCode);
        }

        /// <summary>
        /// Writes one robot to Redis: three hashes (":basic", ":status", ":location") keyed by
        /// "{Robot prefix}:{manufacturer}:{serial number}", plus membership updates for the
        /// all-robots, online and idle sets.
        /// @author zzy
        /// </summary>
        /// <param name="db">Redis database instance.</param>
        /// <param name="robot">Robot entity.</param>
        private async Task WriteRobotToRedisAsync(IDatabase db, Robot robot)
        {
            // Manufacturer + serial number form the robot's identifier in every key and set.
            var robotKey = $"{robot.RobotManufacturer}:{robot.RobotSerialNumber}";
            var keyBase = $"{_settings.Redis.KeyPrefixes.Robot}:{robotKey}";

            // One timestamp for the whole snapshot so status and location agree.
            // Local time in round-trip ("O") format is kept as-is; presumably existing
            // readers parse this value — confirm before switching to UTC.
            var updatedAt = DateTime.Now.ToString("O");

            // Basic data hash.
            await db.HashSetAsync($"{keyBase}:basic", new HashEntry[]
            {
                new("RobotId", robot.RobotId.ToString()),
                new("RobotCode", robot.RobotCode),
                new("RobotName", robot.RobotName),
                new("RobotVersion", robot.RobotVersion),
                new("ProtocolName", robot.ProtocolName),
                new("ProtocolVersion", robot.ProtocolVersion),
                new("ProtocolType", (int)robot.ProtocolType),
                new("RobotManufacturer", robot.RobotManufacturer ?? ""),
                new("RobotSerialNumber", robot.RobotSerialNumber),
                new("RobotType", (int)robot.RobotType),
                new("IpAddress", robot.IpAddress),
                new("CoordinateScale", robot.CoordinateScale),
                new("Active", robot.Active)
            });

            // Status data hash.
            await db.HashSetAsync($"{keyBase}:status", new HashEntry[]
            {
                new("Status", (int)robot.Status),
                new("Online", (int)robot.Online),
                new("BatteryLevel", robot.BatteryLevel ?? 0),
                new("Driving", robot.Driving),
                new("Paused", robot.Paused),
                new("Charging", robot.Charging),
                new("OperatingMode", (int)robot.OperatingMode),
                new("Errors", robot.Errors ?? ""),
                new("UpdatedAt", updatedAt)
            });

            // Location data hash. Missing values are stored as empty strings; numbers use
            // the invariant culture so the stored text does not depend on the host locale.
            await db.HashSetAsync($"{keyBase}:location", new HashEntry[]
            {
                new("MapId", robot.CurrentMapCodeId?.ToString() ?? ""),
                new("NodeId", robot.CurrentNodeId?.ToString() ?? ""),
                new("X", robot.CurrentX?.ToString(System.Globalization.CultureInfo.InvariantCulture) ?? ""),
                new("Y", robot.CurrentY?.ToString(System.Globalization.CultureInfo.InvariantCulture) ?? ""),
                new("Theta", robot.CurrentTheta?.ToString(System.Globalization.CultureInfo.InvariantCulture) ?? ""),
                new("Path", ""),
                new("UpdatedAt", updatedAt)
            });

            // Register the robot in the set of all robots.
            await db.SetAddAsync(_settings.Redis.KeyPrefixes.RobotsSet, robotKey);

            // Keep the online set in step with the robot's current online state.
            if (robot.Online == OnlineStatus.Online)
            {
                await db.SetAddAsync(_settings.Redis.KeyPrefixes.RobotsOnlineSet, robotKey);
            }
            else
            {
                await db.SetRemoveAsync(_settings.Redis.KeyPrefixes.RobotsOnlineSet, robotKey);
            }

            // Keep the idle set in step with the robot's current status.
            if (robot.Status == RobotStatus.Idle)
            {
                await db.SetAddAsync(_settings.Redis.KeyPrefixes.RobotsIdleSet, robotKey);
            }
            else
            {
                await db.SetRemoveAsync(_settings.Redis.KeyPrefixes.RobotsIdleSet, robotKey);
            }
        }
    }
}