// TaskExecutionBackgroundService.cs (14.1 KB)
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Rcs.Application.Services;
using Rcs.Application.Services.PathFind;
using Rcs.Application.Services.PathFind.Models;
using Rcs.Application.Services.Protocol;
using Rcs.Application.Shared;
using Rcs.Domain.Entities;
using Rcs.Domain.Enums;
using Rcs.Domain.Models.VDA5050;
using Rcs.Domain.Repositories;
using Rcs.Domain.Settings;
using Rcs.Infrastructure.PathFinding.Services;
using StackExchange.Redis;
using System.Text.Json;
using TaskStatus = Rcs.Domain.Entities.TaskStatus;

namespace Rcs.Infrastructure.Services
{
    /// <summary>
    /// Background task-execution service: periodically dispatches tasks that are already
    /// assigned to a robot, sending each one through the robot's protocol service.
    /// </summary>
    /// <remarks>
    /// Author: zzy.
    /// The hosted loop runs one dispatch pass every <c>_executionInterval</c> (3 seconds).
    /// <see cref="ExecuteTasksAsync"/> is public, so a single pass can also be invoked directly.
    /// </remarks>
    public class TaskExecutionBackgroundService : BackgroundService, ITaskExecutionService
    {
        private readonly ILogger<TaskExecutionBackgroundService> _logger;
        private readonly IServiceScopeFactory _serviceScopeFactory;

        // NOTE(review): the four dependencies below are injected and stored but not referenced
        // by any member of this class; they are kept so the constructor contract is unchanged.
        private readonly IRobotCacheService _robotCacheService;
        private readonly IAgvPathService _agvPathService;
        private readonly IConnectionMultiplexer _redis;
        private readonly AppSettings _settings;

        // Pause between two dispatch passes of the hosted loop.
        private readonly TimeSpan _executionInterval = TimeSpan.FromSeconds(3);

        // Upper bound on how many tasks a single pass will try to dispatch.
        private const int MaxAssignedTasksPerCycle = 10;

        /// <summary>
        /// Creates the service with its dependencies.
        /// </summary>
        /// <param name="logger">Logger used for all diagnostics of this service.</param>
        /// <param name="serviceScopeFactory">Factory used to open one DI scope per dispatch pass.</param>
        /// <param name="robotCacheService">Robot cache service (stored, currently unused here).</param>
        /// <param name="agvPathService">AGV path service (stored, currently unused here).</param>
        /// <param name="redis">Redis connection multiplexer (stored, currently unused here).</param>
        /// <param name="settings">Application settings; the <c>Value</c> is captured at construction.</param>
        public TaskExecutionBackgroundService(
            ILogger<TaskExecutionBackgroundService> logger,
            IServiceScopeFactory serviceScopeFactory,
            IRobotCacheService robotCacheService,
            IAgvPathService agvPathService,
            IConnectionMultiplexer redis,
            IOptions<AppSettings> settings)
        {
            _logger = logger;
            _serviceScopeFactory = serviceScopeFactory;
            _robotCacheService = robotCacheService;
            _agvPathService = agvPathService;
            _redis = redis;
            _settings = settings.Value;
        }

        /// <summary>
        /// Hosted-service entry point: runs a dispatch pass, waits for the configured interval,
        /// and repeats until <paramref name="stoppingToken"/> is cancelled.
        /// </summary>
        /// <remarks>Author: zzy.</remarks>
        /// <param name="stoppingToken">Token signalled when the host is shutting down.</param>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("[任务执行] 后台任务执行服务已启动");

            try
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    try
                    {
                        await ExecuteTasksAsync(stoppingToken);
                    }
                    catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                    {
                        // Shutdown was requested mid-pass: this is not an error, just leave the loop.
                        break;
                    }
                    catch (Exception ex)
                    {
                        // A failed pass must not kill the service; log it and retry after the delay.
                        _logger.LogError(ex, "[任务执行] 执行过程发生异常");
                    }

                    await Task.Delay(_executionInterval, stoppingToken);
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Task.Delay was cancelled by shutdown; fall through so the stop message is logged.
            }

            _logger.LogInformation("[任务执行] 后台任务执行服务已停止");
        }

        /// <summary>
        /// Runs one dispatch pass: loads Assigned and InProgress tasks that have a robot,
        /// builds the per-robot execution queue and sends each queued task to its robot.
        /// </summary>
        /// <remarks>Author: zzy.</remarks>
        /// <param name="cancellationToken">Token used to cancel repository and protocol calls.</param>
        /// <returns>
        /// A result whose <c>Success</c> is always <c>true</c> and whose <c>ExecutedCount</c> is the
        /// number of tasks that were dispatched successfully in this pass.
        /// </returns>
        public async Task<TaskExecutionResult> ExecuteTasksAsync(CancellationToken cancellationToken = default)
        {
            // One scope per pass so that both repositories are resolved from the same scope.
            using var scope = _serviceScopeFactory.CreateScope();
            var taskRepo = scope.ServiceProvider.GetRequiredService<IRobotTaskRepository>();
            var robotRepo = scope.ServiceProvider.GetRequiredService<IRobotRepository>();

            // 1. Load assigned and in-progress tasks (an in-progress task may still have
            //    sub-tasks waiting to be dispatched).
            var assignedTasks = await taskRepo.GetByStatusAsync(TaskStatus.Assigned, cancellationToken);
            var inProgressTasks = await taskRepo.GetByStatusAsync(TaskStatus.InProgress, cancellationToken);

            var activeTasks = assignedTasks
                .Concat(inProgressTasks)
                .Where(t => t.RobotId.HasValue)
                .ToList();

            if (activeTasks.Count == 0)
            {
                return NothingToExecute();
            }

            var tasksToExecute = await BuildExecutionQueueAsync(activeTasks, robotRepo, taskRepo, cancellationToken);
            if (tasksToExecute.Count == 0)
            {
                return NothingToExecute();
            }

            int executedCount = 0;

            foreach (var task in tasksToExecute)
            {
                if (!task.RobotId.HasValue)
                {
                    continue;
                }

                // 2. Load the robot for this task.
                var robot = await robotRepo.GetByIdFullDataAsync(task.RobotId.Value, cancellationToken);
                if (robot == null)
                {
                    _logger.LogWarning("[任务执行] 任务 {TaskCode} 对应的机器人不存在", task.TaskCode);
                    continue;
                }

                // 3. Re-validate availability: the robot state may have changed since the queue
                //    was built. Skipped silently because this runs on every pass.
                if (!robot.IsRobotAvailable())
                {
                    continue;
                }

                // 4. Skip the robot if any of its other tasks has a sub-task in progress.
                var hasInProgressTask = await HasInProgressTaskAsync(robot.RobotId, task.TaskId, taskRepo, cancellationToken);
                if (hasInProgressTask)
                {
                    continue;
                }

                // 5. Dispatch the task through the protocol service matching the robot.
                var success = await ExecuteTaskByProtocolAsync(task, robot, taskRepo, scope, cancellationToken);
                if (success)
                {
                    executedCount++;
                }
            }

            return new TaskExecutionResult
            {
                Success = true,
                ExecutedCount = executedCount,
                Message = $"本次执行完成,已下发 {executedCount} 个任务"
            };
        }

        /// <summary>
        /// Builds the result returned when a pass finds nothing to dispatch.
        /// </summary>
        private static TaskExecutionResult NothingToExecute()
        {
            return new TaskExecutionResult { Success = true, ExecutedCount = 0, Message = "无待执行任务" };
        }

        /// <summary>
        /// Builds the execution queue: groups the active tasks by robot and picks at most one
        /// candidate task per robot. Candidates are ordered by ascending Priority, then by
        /// straight-line distance from the robot to the task target, then by CreatedAt
        /// (no path-overlap evaluation is performed).
        /// </summary>
        /// <remarks>
        /// Robots that are missing, unavailable, or already have a sub-task in progress are left
        /// out before the <c>MaxAssignedTasksPerCycle</c> cap is applied, so tasks that cannot be
        /// dispatched this pass do not occupy the limited slots and starve dispatchable ones.
        /// </remarks>
        /// <param name="activeTasks">Currently active tasks (Assigned + InProgress).</param>
        /// <param name="robotRepo">Robot repository.</param>
        /// <param name="taskRepo">Task repository.</param>
        /// <param name="cancellationToken">Cancellation token.</param>
        /// <returns>
        /// Tasks to dispatch in this pass, ordered by ascending Priority then CreatedAt and
        /// limited to <c>MaxAssignedTasksPerCycle</c> entries.
        /// </returns>
        private async Task<List<RobotTask>> BuildExecutionQueueAsync(
            IEnumerable<RobotTask> activeTasks,
            IRobotRepository robotRepo,
            IRobotTaskRepository taskRepo,
            CancellationToken cancellationToken)
        {
            var queue = new List<RobotTask>();

            // Group by robot; each robot contributes at most one "next task to dispatch" per pass.
            var taskGroups = activeTasks
                .Where(t => t.RobotId.HasValue)
                .GroupBy(t => t.RobotId!.Value)
                .OrderBy(g => g.Min(t => t.Priority))
                .ThenBy(g => g.Min(t => t.CreatedAt));

            foreach (var taskGroup in taskGroups)
            {
                // A robot counts as busy when any of its sub-tasks is in progress; the parent
                // task status is deliberately not consulted. Checked first because it needs no
                // database round-trip.
                // NOTE(review): relies on SubTasks being populated by GetByStatusAsync - confirm.
                var hasInProgressSubTask = taskGroup.Any(t =>
                    t.SubTasks.Any(st => st.Status == TaskStatus.InProgress));
                if (hasInProgressSubTask)
                {
                    continue;
                }

                var robot = await robotRepo.GetByIdFullDataAsync(taskGroup.Key, cancellationToken);
                if (robot == null)
                {
                    continue;
                }

                // An unavailable robot would be skipped at dispatch time anyway; dropping it here
                // keeps it from consuming one of the capped queue slots.
                if (!robot.IsRobotAvailable())
                {
                    continue;
                }

                var candidateTasks = new List<TaskCandidate>();
                foreach (var task in taskGroup)
                {
                    var taskWithDetails = await taskRepo.GetByIdWithDetailsAsync(task.TaskId, cancellationToken);
                    if (taskWithDetails == null)
                    {
                        continue;
                    }

                    var hasSubTasks = taskWithDetails.SubTasks.Any();

                    // An in-progress task without sub-tasks has nothing further to dispatch.
                    if (!hasSubTasks && taskWithDetails.Status == TaskStatus.InProgress)
                    {
                        continue;
                    }

                    // A task that has sub-tasks but none executable right now is not a candidate.
                    var nextSubTask = taskWithDetails.GetNextExecutableSubTask();
                    if (hasSubTasks && nextSubTask == null)
                    {
                        continue;
                    }

                    var distance = CalculateDistanceToTask(robot, taskWithDetails, nextSubTask);
                    candidateTasks.Add(new TaskCandidate(taskWithDetails, distance));
                }

                if (candidateTasks.Count == 0)
                {
                    continue;
                }

                // Stable ordering by priority, then distance, then creation time.
                var selectedTask = candidateTasks
                    .OrderBy(c => c.Task.Priority)
                    .ThenBy(c => c.Distance)
                    .ThenBy(c => c.Task.CreatedAt)
                    .Select(c => c.Task)
                    .First();

                queue.Add(selectedTask);
            }

            return queue
                .OrderBy(t => t.Priority)
                .ThenBy(t => t.CreatedAt)
                .Take(MaxAssignedTasksPerCycle)
                .ToList();
        }

        /// <summary>
        /// Computes the straight-line distance from the robot to the candidate task's target node.
        /// Returns <see cref="double.MaxValue"/> when the robot position or the target is missing.
        /// </summary>
        /// <remarks>
        /// The robot position is CurrentX/CurrentY, falling back to the robot's MapNode. The target
        /// is the next sub-task's EndNode, falling back to the task's BeginLocation node and then
        /// its EndLocation node.
        /// </remarks>
        /// <param name="robot">Robot entity.</param>
        /// <param name="task">Candidate task.</param>
        /// <param name="nextSubTask">Next executable sub-task of the candidate, if any.</param>
        /// <returns>Straight-line distance, or <see cref="double.MaxValue"/> if it cannot be computed.</returns>
        private static double CalculateDistanceToTask(Robot robot, RobotTask task, RobotSubTask? nextSubTask)
        {
            var sourceX = robot.CurrentX ?? robot.MapNode?.X;
            var sourceY = robot.CurrentY ?? robot.MapNode?.Y;

            if (!sourceX.HasValue || !sourceY.HasValue)
            {
                return double.MaxValue;
            }

            var targetNode = nextSubTask?.EndNode
                ?? task.BeginLocation?.MapNode
                ?? task.EndLocation?.MapNode;
            if (targetNode == null)
            {
                return double.MaxValue;
            }

            var dx = sourceX.Value - targetNode.X;
            var dy = sourceY.Value - targetNode.Y;
            return Math.Sqrt(dx * dx + dy * dy);
        }

        /// <summary>
        /// Ranking context for a single candidate task: the task together with the distance
        /// used to compare it against the robot's other candidates.
        /// </summary>
        private sealed class TaskCandidate
        {
            public TaskCandidate(RobotTask task, double distance)
            {
                Task = task;
                Distance = distance;
            }

            /// <summary>The candidate task (loaded with details).</summary>
            public RobotTask Task { get; }

            /// <summary>Straight-line distance from the robot to the task target.</summary>
            public double Distance { get; }
        }

        /// <summary>
        /// Checks whether the robot has any task other than <paramref name="taskId"/> that
        /// contains a sub-task in the InProgress state.
        /// </summary>
        /// <remarks>Author: zzy.</remarks>
        /// <param name="robotId">Robot whose tasks are inspected.</param>
        /// <param name="taskId">Task to exclude from the check (the one about to be dispatched).</param>
        /// <param name="taskRepo">Task repository.</param>
        /// <param name="cancellationToken">Cancellation token.</param>
        /// <returns><c>true</c> if another task of the robot has an in-progress sub-task.</returns>
        private async Task<bool> HasInProgressTaskAsync(
            Guid robotId,
            Guid taskId,
            IRobotTaskRepository taskRepo,
            CancellationToken cancellationToken)
        {
            var robotTasks = await taskRepo.GetByRobotIdAsync(robotId, cancellationToken);
            return robotTasks.Any(t =>
                t.TaskId != taskId &&
                t.SubTasks.Any(st => st.Status == TaskStatus.InProgress));
        }

        /// <summary>
        /// Dispatches a task to a robot through the protocol service selected by the protocol
        /// service factory, and marks the task as started when the order was accepted.
        /// </summary>
        /// <remarks>
        /// Author: zzy. Refactored to use the protocol factory pattern.
        /// Failures are logged and reported as <c>false</c>; only cancellation requested through
        /// <paramref name="cancellationToken"/> is allowed to propagate.
        /// </remarks>
        /// <param name="task">Task to dispatch (only its id and code are used here).</param>
        /// <param name="robot">Target robot.</param>
        /// <param name="taskRepo">Task repository from the current scope.</param>
        /// <param name="scope">Current DI scope, used to resolve the protocol service factory.</param>
        /// <param name="cancellationToken">Cancellation token.</param>
        /// <returns><c>true</c> when the order was sent and the task state was persisted.</returns>
        private async Task<bool> ExecuteTaskByProtocolAsync(
            RobotTask task,
            Robot robot,
            IRobotTaskRepository taskRepo,
            IServiceScope scope,
            CancellationToken cancellationToken)
        {
            try
            {
                // Reload the task with details through the same scope's repository to avoid
                // cross-scope entity tracking conflicts.
                var taskWithDetails = await taskRepo.GetByIdWithDetailsAsync(task.TaskId, cancellationToken);
                if (taskWithDetails == null)
                {
                    _logger.LogWarning("[任务执行] 任务 {TaskCode} 详情获取失败", task.TaskCode);
                    return false;
                }

                // For VDA robots a task without sub-tasks is rejected before anything is sent.
                if (taskWithDetails.SubTasks.Count <= 0 && robot.ProtocolType == ProtocolType.VDA)
                {
                    _logger.LogWarning("[任务执行] 任务 {TaskCode} 不存在子任务,执行失败", task.TaskCode);
                    return false;
                }

                // Resolve the protocol service for this robot.
                var protocolServiceFactory = scope.ServiceProvider.GetRequiredService<IProtocolServiceFactory>();
                var protocolService = protocolServiceFactory.GetService(robot);

                // Send the order.
                var res = await protocolService.SendOrderAsync(robot, taskWithDetails, cancellationToken);
                if (!res.Success)
                {
                    // A rejected order is an expected outcome, so it is reported directly instead
                    // of being thrown as an exception and caught again below.
                    _logger.LogError("[任务执行] 任务 {TaskCode} 执行失败: {Message}", task.TaskCode, res.Message);
                    return false;
                }

                // NOTE(review): if persisting fails after the order was accepted, the task keeps
                // its previous status and may be sent again on a later pass - confirm that
                // SendOrderAsync tolerates repeated orders.
                taskWithDetails.StartExecution();
                await taskRepo.UpdateAsync(taskWithDetails, cancellationToken);
                await taskRepo.SaveChangesAsync(cancellationToken);
                _logger.LogInformation("[任务执行] 任务 {TaskCode} 已下发给机器人 {RobotCode},协议类型: {ProtocolType}",
                    taskWithDetails.TaskCode, robot.RobotCode, robot.ProtocolType);
                return true;
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Cancellation is not a dispatch failure; let the caller stop the pass.
                throw;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "[任务执行] 任务 {TaskCode} 执行失败", task.TaskCode);
                return false;
            }
        }
    }
}