// SegmentRetryBackgroundService.cs
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Rcs.Application.Services;
using Rcs.Application.Services.PathFind;
using Rcs.Application.Services.PathFind.Models;
using Rcs.Application.Services.Protocol;
using Rcs.Domain.Repositories;
using Rcs.Domain.Settings;
using StackExchange.Redis;
using System.Text.Json;
using TaskStatus = Rcs.Domain.Entities.TaskStatus;
namespace Rcs.Infrastructure.Services
{
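/// <summary>
/// Background service that retries sending path segments (polling fallback mechanism).
/// Periodically checks whether a robot has reached the end of its current segment and, if so, tries to resend the current segment.
/// Updated 2026-02-06: supports waiting for network actions; no fallback send is performed while a network action is pending.
/// @author zzy
/// </summary>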
public class SegmentRetryBackgroundService : BackgroundService
{
// Injected collaborators, assigned once in the constructor.
private readonly ILogger<SegmentRetryBackgroundService> _logger;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly IConnectionMultiplexer _redis;
private readonly IRobotCacheService _robotCacheService;
private readonly AppSettings _settings;

// Pause between two polling passes of ExecuteAsync.
private readonly TimeSpan _pollingInterval = TimeSpan.FromSeconds(2);

/// <summary>
/// Creates the service and stores its dependencies.
/// </summary>
/// <param name="logger">Logger used for all "[段重试]" messages.</param>
/// <param name="serviceScopeFactory">Factory used to create a DI scope for scoped services.</param>
/// <param name="redis">Redis connection used to read the path cache.</param>
/// <param name="robotCacheService">Cache service used to look up robot basic and location data.</param>
/// <param name="settings">Application settings; the Redis key prefix for the path cache is read from them.</param>
/// <exception cref="ArgumentNullException">Thrown when any argument is <c>null</c>.</exception>
public SegmentRetryBackgroundService(
    ILogger<SegmentRetryBackgroundService> logger,
    IServiceScopeFactory serviceScopeFactory,
    IConnectionMultiplexer redis,
    IRobotCacheService robotCacheService,
    IOptions<AppSettings> settings)
{
    // Fail fast with a named argument instead of a late NullReferenceException.
    _logger = logger ?? throw new ArgumentNullException(nameof(logger));
    _serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));
    _redis = redis ?? throw new ArgumentNullException(nameof(redis));
    _robotCacheService = robotCacheService ?? throw new ArgumentNullException(nameof(robotCacheService));
    _settings = (settings ?? throw new ArgumentNullException(nameof(settings))).Value;
}
/// <summary>
/// Main loop of the hosted service. Each pass processes segment retries for in-progress tasks,
/// then handles timed-out asynchronous network actions, then waits for the polling interval.
/// The loop ends when <paramref name="stoppingToken"/> is cancelled.
/// </summary>
/// <param name="stoppingToken">Token signalled when the host asks the service to stop.</param>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    _logger.LogInformation("[段重试] 路径段发送重试后台服务已启动,轮询间隔: {Interval}秒", _pollingInterval.TotalSeconds);

    while (!stoppingToken.IsCancellationRequested)
    {
        try
        {
            // Resend segments for tasks that are in progress.
            await ProcessInProgressTasksAsync(stoppingToken);
            // Check for and handle timed-out asynchronous network actions.
            await CheckAndHandleTimeoutNetActionsAsync();
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Cancellation caused by shutdown is expected, not an error.
            break;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "[段重试] 处理重发时发生异常");
        }

        try
        {
            // Kept outside the processing try-block so a failing pass still waits
            // before the next attempt instead of spinning.
            await Task.Delay(_pollingInterval, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // The delay was interrupted by shutdown; leave the loop so the stop message is logged.
            break;
        }
    }

    _logger.LogInformation("[段重试] 路径段发送重试后台服务已停止");
}
/// <summary>
/// Asks the network action execution service to check for and handle timed-out
/// asynchronous network actions, and logs how many were handled when that number is positive.
/// Any exception is logged and swallowed so the caller's loop keeps running.
/// @author zzy
/// Added 2026-02-07
/// </summary>
private async Task CheckAndHandleTimeoutNetActionsAsync()
{
    try
    {
        // The service is resolved from a fresh scope that is disposed when this block exits.
        using (var serviceScope = _serviceScopeFactory.CreateScope())
        {
            var netActions = serviceScope.ServiceProvider.GetRequiredService<INetActionExecutionService>();
            var handled = await netActions.CheckAndHandleTimeoutActionsAsync();

            if (handled > 0)
            {
                _logger.LogInformation("[段重试] 处理了 {Count} 个超时的网络动作", handled);
            }
        }
    }
    catch (Exception error)
    {
        _logger.LogError(error, "[段重试] 检查超时网络动作时发生异常");
    }
}
/// <summary>
/// Loads every task whose status is <c>InProgress</c> and runs the segment-retry check on each
/// task that has a robot assigned. A failure on one task is logged and does not stop the others.
/// Processing stops quietly as soon as <paramref name="ct"/> is cancelled.
/// </summary>
/// <param name="ct">Cancellation token passed to the repository query and to each per-task check.</param>
private async Task ProcessInProgressTasksAsync(CancellationToken ct)
{
    using var scope = _serviceScopeFactory.CreateScope();
    var taskRepo = scope.ServiceProvider.GetRequiredService<IRobotTaskRepository>();
    var protocolServiceFactory = scope.ServiceProvider.GetRequiredService<IProtocolServiceFactory>();

    // Fetch all tasks that are currently in progress.
    var inProgressTasks = await taskRepo.GetByStatusAsync(TaskStatus.InProgress, ct);

    foreach (var task in inProgressTasks)
    {
        // Do not start work on further tasks once shutdown has been requested.
        if (ct.IsCancellationRequested)
        {
            return;
        }

        if (!task.RobotId.HasValue)
        {
            continue;
        }

        try
        {
            await ProcessTaskRetryAsync(task, protocolServiceFactory, ct);
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Shutdown-driven cancellation is expected: stop iterating instead of
            // logging it as an error once per remaining task.
            return;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "[段重试] 处理任务 {TaskCode} 时发生异常", task.TaskCode);
        }
    }
}
/// <summary>
/// Runs the segment-retry check for a single task.
/// Resends the current segment with <c>reExec: true</c> only when all of the following hold:
/// the task has an in-progress sub-task, the robot and path caches are available, no network
/// action is pending, and the robot is at the expected node.
/// Updated 2026-02-06: added the network action state checks.
/// </summary>
/// <param name="task">The in-progress task to inspect.</param>
/// <param name="protocolServiceFactory">Factory used to obtain the protocol service for the robot's protocol type.</param>
/// <param name="ct">Cancellation token passed to the resend call.</param>
private async Task ProcessTaskRetryAsync(
    Domain.Entities.RobotTask task,
    IProtocolServiceFactory protocolServiceFactory,
    CancellationToken ct)
{
    // A task without a robot cannot be retried; return instead of throwing on .Value.
    if (!task.RobotId.HasValue) return;
    var robotId = task.RobotId.Value;

    // Pick the in-progress sub-task with the lowest sequence number.
    var currentSubTask = task.SubTasks
        .OrderBy(s => s.Sequence)
        .FirstOrDefault(st => st.Status == TaskStatus.InProgress);
    if (currentSubTask == null) return;

    // Read the robot's basic and location data from the cache.
    var (robotCache, locationCache) = await GetRobotCacheWithLocationAsync(robotId);
    if (robotCache == null)
    {
        _logger.LogDebug("[段重试] 无法从缓存获取机器人信息: RobotId={RobotId}", robotId);
        return;
    }

    // Read the path cache; it is needed to determine the current segment's endpoint.
    var pathCache = await GetPathCacheAsync(robotId, task.TaskId, currentSubTask.SubTaskId);
    if (pathCache == null)
    {
        // No path cache: per the original author, the task has not started yet or is already finished.
        return;
    }

    // Skip while the path cache itself reports that a network action is being awaited.
    if (pathCache.IsWaitingForNetAction)
    {
        _logger.LogDebug("[段重试] 任务正在等待网络动作完成,跳过兜底发送: RobotId={RobotId}, TaskCode={TaskCode}, Status={Status}",
            robotId, task.TaskCode, pathCache.NetActionStatus);
        return;
    }

    // The scoped service is resolved only here, so the early returns above
    // do not pay for creating a DI scope on every polling pass.
    using var scope = _serviceScopeFactory.CreateScope();
    var netActionExecutionService = scope.ServiceProvider.GetRequiredService<INetActionExecutionService>();

    // Confirm through the service that no network action is in progress.
    var hasInProgressNetAction = await netActionExecutionService.HasInProgressNetActionAsync(
        robotId,
        task.TaskId,
        currentSubTask.SubTaskId,
        pathCache.CurrentJunctionIndex,
        pathCache.CurrentResourceIndex);
    if (hasInProgressNetAction)
    {
        _logger.LogDebug("[段重试] 存在进行中的网络动作,跳过兜底发送: RobotId={RobotId}, TaskCode={TaskCode}",
            robotId, task.TaskCode);
        return;
    }

    // Skip when a network action is still waiting for its asynchronous response.
    var hasWaitingAsync = await netActionExecutionService.HasWaitingAsyncResponseAsync(
        robotId, task.TaskId, currentSubTask.SubTaskId);
    if (hasWaitingAsync)
    {
        _logger.LogDebug("[段重试] 存在等待异步响应的网络动作,跳过兜底发送: RobotId={RobotId}, TaskCode={TaskCode}",
            robotId, task.TaskCode);
        return;
    }

    // Only resend when the robot is at the expected node.
    if (!IsRobotAtCurrentSegmentEndpoint(locationCache, pathCache))
    {
        return;
    }

    _logger.LogInformation("[段重试] 机器人已到达当前段终点,触发重发,机器人: {RobotCode}, 任务: {TaskCode}",
        robotCache.RobotCode, task.TaskCode);

    // Resend the current segment with reExec=true, addressed by manufacturer and serial number.
    // NOTE(review): the original author states that SendNextSegmentAsync takes a distributed lock
    // internally, so no lock is taken here. That is not verifiable from this file.
    var protocolService = protocolServiceFactory.GetService(
        (Domain.Entities.ProtocolType)robotCache.ProtocolType);
    var result = await protocolService.SendNextSegmentAsync(
        robotCache.RobotManufacturer,
        robotCache.RobotSerialNumber,
        ct,
        reExec: true);

    if (result.Success)
    {
        _logger.LogDebug("[段重试] 重发成功,机器人: {RobotCode}", robotCache.RobotCode);
    }
    else
    {
        _logger.LogDebug("[段重试] 重发失败: {Reason}", result.Message);
    }
}
/// <summary>
/// Looks up a robot's basic and location data among all active robots returned by the cache service.
/// </summary>
/// <param name="robotId">Id of the robot to find.</param>
/// <returns>
/// The matching basic and location entries, or <c>(null, null)</c> when no active robot
/// has the given id.
/// </returns>
private async Task<(RobotBasicCache? Basic, RobotLocationCache? Location)> GetRobotCacheWithLocationAsync(Guid robotId)
{
    var allRobots = await _robotCacheService.GetAllActiveRobotCacheAsync();
    var robotIdString = robotId.ToString();

    foreach (var robot in allRobots)
    {
        // Entries without basic data are skipped rather than dereferenced.
        // The comparison ignores case because the textual form of a Guid is case-insensitive.
        if (string.Equals(robot.Basic?.RobotId, robotIdString, StringComparison.OrdinalIgnoreCase))
        {
            return (robot.Basic, robot.Location);
        }
    }

    // No match: return an explicit empty result instead of reading members of a default element.
    return (null, null);
}
/// <summary>
/// Reads the segmented path cache for a robot / task / sub-task from Redis.
/// </summary>
/// <param name="robotId">Robot id, used as the second part of the Redis key.</param>
/// <param name="taskId">Task id, used as the third part of the Redis key.</param>
/// <param name="subTaskId">Sub-task id, used as the last part of the Redis key.</param>
/// <returns>The deserialized path cache, or <c>null</c> when the key holds no value.</returns>
private async Task<VdaSegmentedPathCache?> GetPathCacheAsync(Guid robotId, Guid taskId, Guid subTaskId)
{
    // Key layout: {configured VdaPath prefix}:{robotId}:{taskId}:{subTaskId}
    var prefix = _settings.Redis.KeyPrefixes.VdaPath;
    var redisKey = $"{prefix}:{robotId}:{taskId}:{subTaskId}";

    var database = _redis.GetDatabase();
    var payload = await database.StringGetAsync(redisKey);

    return payload.HasValue
        ? JsonSerializer.Deserialize<VdaSegmentedPathCache>(payload.ToString())
        : null;
}
/// <summary>
/// Tells whether the robot's current node equals the node it is expected to be at:
/// the endpoint of the last sent segment or, when nothing has been sent yet,
/// the start node of the first segment.
/// </summary>
/// <param name="locationCache">Cached robot location; may be <c>null</c>.</param>
/// <param name="pathCache">Segmented path cache used to derive the expected node.</param>
/// <returns>
/// <c>true</c> when both node ids are known and equal; <c>false</c> when either is unknown
/// or they differ.
/// </returns>
private bool IsRobotAtCurrentSegmentEndpoint(RobotLocationCache? locationCache, VdaSegmentedPathCache pathCache)
{
    var actualNode = locationCache?.NodeId;
    if (actualNode == null)
    {
        // No location, or no node id in it.
        return false;
    }

    // The expected node is only computed once the robot's own node is known.
    var expectedNode = GetExpectedRobotPosition(pathCache);
    return expectedNode != null && actualNode == expectedNode;
}
/// <summary>
/// Determines the node the robot is expected to be at.
/// - If a sent segment is found: the endpoint of the last sent segment.
/// - Otherwise: the start node of the first segment of the first resource of the first junction.
/// </summary>
/// <param name="pathCache">Segmented path cache to inspect.</param>
/// <returns>The expected node id, or <c>null</c> when neither can be determined.</returns>
private Guid? GetExpectedRobotPosition(VdaSegmentedPathCache pathCache)
{
    // The right-hand side is only evaluated when no sent segment was found.
    return GetLastSentSegmentEndpoint(pathCache)
        ?? pathCache.JunctionSegments
            .FirstOrDefault()
            ?.ResourceSegments
            .FirstOrDefault()
            ?.Segments
            .FirstOrDefault()
            ?.FromNodeId;
}
/// <summary>
/// Searches backwards for the most recent resource segment that is marked as sent and
/// contains at least one segment, and returns the target node of its last segment.
/// The search starts at the current junction, just before the current resource index,
/// and continues through earlier junctions from their last resource.
/// </summary>
/// <param name="pathCache">Segmented path cache to inspect.</param>
/// <returns>The endpoint node id, or <c>null</c> when no such resource segment exists.</returns>
private Guid? GetLastSentSegmentEndpoint(VdaSegmentedPathCache pathCache)
{
    var currentJunctionIndex = pathCache.CurrentJunctionIndex;
    var junctionIndex = currentJunctionIndex;

    while (junctionIndex >= 0)
    {
        var junction = pathCache.JunctionSegments.ElementAtOrDefault(junctionIndex);
        if (junction != null)
        {
            // In the current junction only resources before the current index are considered;
            // earlier junctions are scanned from their last resource.
            var resourceIndex = junctionIndex == currentJunctionIndex
                ? pathCache.CurrentResourceIndex - 1
                : junction.ResourceSegments.Count - 1;

            while (resourceIndex >= 0)
            {
                var candidate = junction.ResourceSegments.ElementAtOrDefault(resourceIndex);
                if (candidate?.IsSent == true && candidate.Segments.Count > 0)
                {
                    // Endpoint of this resource segment = target node of its final segment.
                    return candidate.Segments.Last().ToNodeId;
                }

                resourceIndex--;
            }
        }

        junctionIndex--;
    }

    return null;
}
}
}