2023-09-10 18:56:17 +00:00
|
|
|
import { FmtDuration, Log } from "../deps.ts";
|
|
|
|
import { formatUserChat } from "../utils.ts";
|
|
|
|
import { jobStore } from "../db/jobStore.ts";
|
|
|
|
|
|
|
|
const logger = () => Log.getLogger();
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Returns hanged jobs to the queue.
|
|
|
|
*/
|
|
|
|
export async function returnHangedJobs(): Promise<never> {
|
|
|
|
while (true) {
|
|
|
|
try {
|
|
|
|
await new Promise((resolve) => setTimeout(resolve, 5000));
|
|
|
|
const jobs = await jobStore.getBy("status.type", "processing");
|
|
|
|
for (const job of jobs) {
|
|
|
|
if (job.value.status.type !== "processing") continue;
|
2023-09-10 23:58:51 +00:00
|
|
|
// if job wasn't updated for 2 minutes, return it to the queue
|
2023-09-10 18:56:17 +00:00
|
|
|
const timeSinceLastUpdateMs = Date.now() - job.value.status.updatedDate.getTime();
|
2023-09-10 23:58:51 +00:00
|
|
|
if (timeSinceLastUpdateMs > 2 * 60 * 1000) {
|
2023-09-10 18:56:17 +00:00
|
|
|
await job.update({ status: { type: "waiting" } });
|
|
|
|
logger().warning(
|
2023-09-12 01:57:44 +00:00
|
|
|
`Job for ${formatUserChat(job.value)} was returned to the queue because it hanged for ${
|
2023-09-10 18:56:17 +00:00
|
|
|
FmtDuration.format(Math.trunc(timeSinceLastUpdateMs / 1000) * 1000, {
|
|
|
|
ignoreZero: true,
|
|
|
|
})
|
|
|
|
}`,
|
|
|
|
);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} catch (err) {
|
|
|
|
logger().warning(`Returning hanged jobs failed: ${err}`);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|