SegmentRetryBackgroundService.cs 13.2 KB
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Rcs.Application.Services;
using Rcs.Application.Services.PathFind;
using Rcs.Application.Services.PathFind.Models;
using Rcs.Application.Services.Protocol;
using Rcs.Domain.Repositories;
using Rcs.Domain.Settings;
using StackExchange.Redis;
using System.Text.Json;
using TaskStatus = Rcs.Domain.Entities.TaskStatus;

namespace Rcs.Infrastructure.Services
{
    /// <summary>
    /// Path-segment resend background service (polling fallback mechanism).
    /// On every polling pass it inspects each in-progress task. If the robot is standing on the endpoint of
    /// the last dispatched segment (or on the start node of the first segment when nothing has been sent yet),
    /// it asks the protocol service to resend the current segment with <c>reExec: true</c>.
    /// 2026-02-06 update: supports waiting for network actions; no fallback resend happens while a network action is pending.
    /// @author zzy
    /// </summary>
    public class SegmentRetryBackgroundService : BackgroundService
    {
        private readonly ILogger<SegmentRetryBackgroundService> _logger;
        private readonly IServiceScopeFactory _serviceScopeFactory;
        private readonly IConnectionMultiplexer _redis;
        private readonly IRobotCacheService _robotCacheService;
        private readonly AppSettings _settings;

        // Delay between two consecutive polling passes.
        private readonly TimeSpan _pollingInterval = TimeSpan.FromSeconds(2);

        public SegmentRetryBackgroundService(
            ILogger<SegmentRetryBackgroundService> logger,
            IServiceScopeFactory serviceScopeFactory,
            IConnectionMultiplexer redis,
            IRobotCacheService robotCacheService,
            IOptions<AppSettings> settings)
        {
            _logger = logger;
            _serviceScopeFactory = serviceScopeFactory;
            _redis = redis;
            _robotCacheService = robotCacheService;
            _settings = settings.Value;
        }

        /// <summary>
        /// Main polling loop: resend segments for in-progress tasks, then handle timed-out async network actions,
        /// then wait <see cref="_pollingInterval"/>. Non-cancellation failures are logged and the loop continues.
        /// Host shutdown (cancellation of <paramref name="stoppingToken"/>) ends the loop cleanly, without
        /// being logged as an error, so the "stopped" message is always written.
        /// </summary>
        /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("[段重试] 路径段发送重试后台服务已启动,轮询间隔: {Interval}秒", _pollingInterval.TotalSeconds);

            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    // Resend the current segment for in-progress tasks where appropriate.
                    await ProcessInProgressTasksAsync(stoppingToken);

                    // Check for and handle timed-out asynchronous network actions.
                    await CheckAndHandleTimeoutNetActionsAsync();
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Shutdown in progress: this is not an error, just leave the loop.
                    break;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "[段重试] 处理重发时发生异常");
                }

                try
                {
                    await Task.Delay(_pollingInterval, stoppingToken);
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Shutdown requested while waiting; exit so the stop message below is logged.
                    break;
                }
            }

            _logger.LogInformation("[段重试] 路径段发送重试后台服务已停止");
        }

        /// <summary>
        /// Checks for and handles timed-out asynchronous network actions via <see cref="INetActionExecutionService"/>.
        /// Best-effort: any exception is logged and swallowed so the polling loop keeps running.
        /// @author zzy
        /// 2026-02-07 added
        /// </summary>
        private async Task CheckAndHandleTimeoutNetActionsAsync()
        {
            try
            {
                using var scope = _serviceScopeFactory.CreateScope();
                var netActionExecutionService = scope.ServiceProvider.GetRequiredService<INetActionExecutionService>();

                var timeoutCount = await netActionExecutionService.CheckAndHandleTimeoutActionsAsync();
                if (timeoutCount > 0)
                {
                    _logger.LogInformation("[段重试] 处理了 {Count} 个超时的网络动作", timeoutCount);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "[段重试] 检查超时网络动作时发生异常");
            }
        }

        /// <summary>
        /// Loads all tasks with status <see cref="TaskStatus.InProgress"/> and runs the resend check for each
        /// task that has a robot assigned. A failure for one task is logged and does not affect the others.
        /// Cancellation is propagated to the caller instead of being logged per task.
        /// </summary>
        /// <param name="ct">Cancellation token flowed from the host.</param>
        private async Task ProcessInProgressTasksAsync(CancellationToken ct)
        {
            using var scope = _serviceScopeFactory.CreateScope();
            var taskRepo = scope.ServiceProvider.GetRequiredService<IRobotTaskRepository>();
            var protocolServiceFactory = scope.ServiceProvider.GetRequiredService<IProtocolServiceFactory>();

            // Fetch all in-progress tasks.
            var inProgressTasks = await taskRepo.GetByStatusAsync(TaskStatus.InProgress, ct);

            foreach (var task in inProgressTasks)
            {
                // Stop promptly on shutdown rather than attempting (and logging failures for) every remaining task.
                ct.ThrowIfCancellationRequested();

                if (!task.RobotId.HasValue) continue;

                try
                {
                    await ProcessTaskRetryAsync(task, protocolServiceFactory, ct);
                }
                catch (OperationCanceledException) when (ct.IsCancellationRequested)
                {
                    // Let shutdown propagate; it is handled in ExecuteAsync.
                    throw;
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "[段重试] 处理任务 {TaskCode} 时发生异常", task.TaskCode);
                }
            }
        }

        /// <summary>
        /// Runs the resend check for a single task and, if the robot is at the expected position,
        /// resends the current segment with <c>reExec: true</c>.
        /// The resend is skipped when:
        /// - the task has no robot or no in-progress sub-task;
        /// - the robot or the path cache cannot be found;
        /// - a network action is pending (path-cache flag, in-progress action, or awaiting async response);
        /// - the robot is not at the expected node.
        /// 2026-02-06 update: added network action status checks.
        /// </summary>
        /// <param name="task">The in-progress task to inspect.</param>
        /// <param name="protocolServiceFactory">Factory used to pick the protocol service for the robot.</param>
        /// <param name="ct">Cancellation token passed to the resend call.</param>
        private async Task ProcessTaskRetryAsync(
            Domain.Entities.RobotTask task,
            IProtocolServiceFactory protocolServiceFactory,
            CancellationToken ct)
        {
            // The caller already filters tasks without a robot; this guard just unwraps the id once.
            if (task.RobotId is not { } robotId) return;

            // Current in-progress sub-task (lowest sequence first).
            var currentSubTask = task.SubTasks
                .OrderBy(s => s.Sequence)
                .FirstOrDefault(st => st.Status == TaskStatus.InProgress);

            if (currentSubTask is null) return;

            // Robot basic info and location, read from the robot cache service.
            var (robotCache, locationCache) = await GetRobotCacheWithLocationAsync(robotId);
            if (robotCache is null)
            {
                _logger.LogDebug("[段重试] 无法从缓存获取机器人信息: RobotId={RobotId}", robotId);
                return;
            }

            // Path cache for this robot/task/sub-task; used to determine the expected position.
            var pathCache = await GetPathCacheAsync(robotId, task.TaskId, currentSubTask.SubTaskId);
            if (pathCache is null)
            {
                // No path cache in Redis for this robot/task/sub-task: nothing to resend.
                return;
            }

            // The path cache itself says a network action is pending.
            if (pathCache.IsWaitingForNetAction)
            {
                _logger.LogDebug("[段重试] 任务正在等待网络动作完成,跳过兜底发送: RobotId={RobotId}, TaskCode={TaskCode}, Status={Status}",
                    robotId, task.TaskCode, pathCache.NetActionStatus);
                return;
            }

            // Only create a DI scope once the cheap checks above have passed.
            using var scope = _serviceScopeFactory.CreateScope();
            var netActionExecutionService = scope.ServiceProvider.GetRequiredService<INetActionExecutionService>();

            // Double-check with the service whether a network action is in progress at the current position.
            var hasInProgressNetAction = await netActionExecutionService.HasInProgressNetActionAsync(
                robotId,
                task.TaskId,
                currentSubTask.SubTaskId,
                pathCache.CurrentJunctionIndex,
                pathCache.CurrentResourceIndex);

            if (hasInProgressNetAction)
            {
                _logger.LogDebug("[段重试] 存在进行中的网络动作,跳过兜底发送: RobotId={RobotId}, TaskCode={TaskCode}",
                    robotId, task.TaskCode);
                return;
            }

            // Check for network actions that are waiting for an asynchronous response.
            var hasWaitingAsync = await netActionExecutionService.HasWaitingAsyncResponseAsync(
                robotId, task.TaskId, currentSubTask.SubTaskId);

            if (hasWaitingAsync)
            {
                _logger.LogDebug("[段重试] 存在等待异步响应的网络动作,跳过兜底发送: RobotId={RobotId}, TaskCode={TaskCode}",
                    robotId, task.TaskCode);
                return;
            }

            // Only resend when the robot is standing on the expected node.
            if (!IsRobotAtCurrentSegmentEndpoint(locationCache, pathCache))
            {
                return;
            }

            _logger.LogInformation("[段重试] 机器人已到达当前段终点,触发重发,机器人: {RobotCode}, 任务: {TaskCode}",
                robotCache.RobotCode, task.TaskCode);

            // Resend the current segment with reExec=true, addressed by manufacturer/serial number.
            // Per the original author, SendNextSegmentAsync takes a distributed lock internally, so no lock here.
            var protocolService = protocolServiceFactory.GetService(
                (Domain.Entities.ProtocolType)robotCache.ProtocolType);

            var result = await protocolService.SendNextSegmentAsync(
                robotCache.RobotManufacturer,
                robotCache.RobotSerialNumber,
                ct,
                reExec: true);

            if (result.Success)
            {
                _logger.LogDebug("[段重试] 重发成功,机器人: {RobotCode}", robotCache.RobotCode);
            }
            else
            {
                _logger.LogDebug("[段重试] 重发失败: {Reason}", result.Message);
            }
        }

        /// <summary>
        /// Looks up a robot's basic and location info among the active robots returned by
        /// <see cref="IRobotCacheService.GetAllActiveRobotCacheAsync"/>.
        /// The cached robot id string is parsed as a <see cref="Guid"/>, so case and format differences
        /// (e.g. upper-case ids) still match. Entries with a missing basic record or an unparsable id are skipped.
        /// </summary>
        /// <param name="robotId">Id of the robot to find.</param>
        /// <returns>The matching entry's basic and location data, or <c>(null, null)</c> when no entry matches.</returns>
        private async Task<(RobotBasicCache? Basic, RobotLocationCache? Location)> GetRobotCacheWithLocationAsync(Guid robotId)
        {
            var allRobots = await _robotCacheService.GetAllActiveRobotCacheAsync();

            // An explicit loop avoids dereferencing a default/null element when nothing matches.
            foreach (var robot in allRobots)
            {
                if (Guid.TryParse(robot.Basic?.RobotId, out var cachedId) && cachedId == robotId)
                {
                    return (robot.Basic, robot.Location);
                }
            }

            return (null, null);
        }

        /// <summary>
        /// Reads the segmented path cache from Redis.
        /// The key is <c>{VdaPath prefix}:{robotId}:{taskId}:{subTaskId}</c>, and the value is deserialized as JSON.
        /// </summary>
        /// <returns>The deserialized cache, or <c>null</c> when the key is missing or its value is empty.</returns>
        /// <exception cref="JsonException">The stored value is not valid JSON for <see cref="VdaSegmentedPathCache"/>.</exception>
        private async Task<VdaSegmentedPathCache?> GetPathCacheAsync(Guid robotId, Guid taskId, Guid subTaskId)
        {
            var key = $"{_settings.Redis.KeyPrefixes.VdaPath}:{robotId}:{taskId}:{subTaskId}";
            var cacheData = await _redis.GetDatabase().StringGetAsync(key);

            // An empty string would make the JSON deserializer throw; treat it like a missing key.
            if (cacheData.IsNullOrEmpty) return null;

            return JsonSerializer.Deserialize<VdaSegmentedPathCache>(cacheData.ToString());
        }

        /// <summary>
        /// Returns whether the robot is standing on the node where it is expected to wait for the next segment.
        /// That node is the endpoint of the last sent segment, or the start of the first segment if nothing
        /// has been sent yet.
        /// </summary>
        /// <returns><c>false</c> when the robot location or the expected position is unknown.</returns>
        private bool IsRobotAtCurrentSegmentEndpoint(RobotLocationCache? locationCache, VdaSegmentedPathCache pathCache)
        {
            if (locationCache?.NodeId == null) return false;

            // Expected position: endpoint of the last sent segment, or start of the first segment.
            var expectedPosition = GetExpectedRobotPosition(pathCache);
            if (expectedPosition == null) return false;

            return locationCache.NodeId == expectedPosition;
        }

        /// <summary>
        /// Gets the node where the robot is expected to be:
        /// - if some resource segment has been sent, the last segment's <c>ToNodeId</c> of the most recent sent resource;
        /// - otherwise the <c>FromNodeId</c> of the very first segment (robot at the start point).
        /// </summary>
        /// <returns>The expected node id, or <c>null</c> when the path contains no segments.</returns>
        private Guid? GetExpectedRobotPosition(VdaSegmentedPathCache pathCache)
        {
            // Prefer the endpoint of the last sent segment.
            var lastSentEndpoint = GetLastSentSegmentEndpoint(pathCache);
            if (lastSentEndpoint != null)
            {
                return lastSentEndpoint;
            }

            // Nothing sent yet: the robot should be at the start node of the first resource segment.
            var firstJunction = pathCache.JunctionSegments.FirstOrDefault();
            var firstResource = firstJunction?.ResourceSegments.FirstOrDefault();
            var firstSegment = firstResource?.Segments.FirstOrDefault();

            return firstSegment?.FromNodeId;
        }

        /// <summary>
        /// Scans backwards for the most recent sent resource segment and returns its final segment's end node.
        /// In the current junction, only resources before <c>CurrentResourceIndex</c> are considered.
        /// Earlier junctions are scanned from their last resource.
        /// </summary>
        /// <returns>The end node id, or <c>null</c> if no sent, non-empty resource segment is found.</returns>
        private Guid? GetLastSentSegmentEndpoint(VdaSegmentedPathCache pathCache)
        {
            for (int jIndex = pathCache.CurrentJunctionIndex; jIndex >= 0; jIndex--)
            {
                var junction = pathCache.JunctionSegments.ElementAtOrDefault(jIndex);
                if (junction == null) continue;

                int lastResourceIndex = junction.ResourceSegments.Count - 1;
                int rStartIndex = (jIndex == pathCache.CurrentJunctionIndex)
                    ? Math.Min(pathCache.CurrentResourceIndex - 1, lastResourceIndex)
                    : lastResourceIndex;

                for (int rIndex = rStartIndex; rIndex >= 0; rIndex--)
                {
                    var resource = junction.ResourceSegments.ElementAtOrDefault(rIndex);
                    if (resource?.IsSent == true && resource.Segments.Count > 0)
                    {
                        // End node of this resource segment's final segment.
                        return resource.Segments.Last().ToNodeId;
                    }
                }
            }

            return null;
        }
    }
}