整个回调流程是这样的:
服务端完成任务后发送状态到 SQS:
1 2 3 4 5 6 7 8 9
| def send_callback(task_id, status): callback_message = { "taskId": task_id, "status": status, "outputDir": output_location, } send_sqs_message(status_callback_queue_url, callback_message)
|
AWS Lambda 函数监听回调队列:
1 2 3 4 5 6 7 8 9 10 11 12
| def lambda_handler(event, context): for record in event['Records']: message_body = json.loads(record['body']) url = "https://backend.apob.ai/callback/content/event" response = requests.post( url, headers={'Content-Type': 'application/json'}, data=json.dumps(message_body) )
|
Java 后端接收回调:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @RestController @RequestMapping("/callback") public class ContentGenerationCallbackController { @PostMapping("/content/event") public void handleCallback(@RequestBody ContentGenerationCallback callback) { Task task = taskService.getTaskByTaskId(callback.getTaskId()); Content content = contentService.getContentById(task.getResourceId()); AbstractContentCallbackHandler<ContentGenerationCallbackDetail> handler = contentCallbackHandlerRegistry.getHandler(content.getType()); switch (callback.getStatus().toLowerCase()) { case "running": handler.handleRunningStatus(callbackDetail); break; case "failed": handler.handleFailureStatus(callbackDetail); break; case "success": handler.handleSuccessStatus(callbackDetail); break; } } }
|
完整流程示意:
1 2 3 4 5 6 7 8 9 10 11
| Service完成任务 ↓ 发送状态到SQS回调队列 ↓ Lambda函数触发 ↓ 转发回调到Java后端API ↓ 后端根据内容类型分发处理 ↓ 更新任务状态/处理结果
|
这样设计的好处:
1. 解耦服务端和后端
2. 保证消息的可靠投递
3. 支持失败重试
4. 可以处理复杂的任务依赖关系