mirror of
https://github.com/misskey-dev/misskey.git
synced 2026-05-04 22:25:50 +02:00
feat: Job queue inspector (#15856)
* wip
* wip
* Update job-queue.vue
* wip
* wip
* Update job-queue.vue
* wip
* Update job-queue.vue
* wip
* Update QueueService.ts
* Update QueueService.ts
* Update QueueService.ts
* Update job-queue.vue
* wip
* wip
* wip
* Update job-queue.vue
* wip
* Update MkTl.vue
* wip
* Update index.vue
* wip
* wip
* Update MkTl.vue
* 🎨
* jobs search
* wip
* Update job-queue.vue
* wip
* wip
* Update job-queue.vue
* Update job-queue.vue
* Update job-queue.vue
* Update job-queue.vue
* wip
* Update job-queue.job.vue
* wip
* wip
* wip
* Update MkCode.vue
* wip
* Update job-queue.job.vue
* wip
* Update job-queue.job.vue
* Update misskey-js.api.md
* Update CHANGELOG.md
* Update job-queue.job.vue
This commit is contained in:
@@ -44,7 +44,7 @@ import { BakeBufferedReactionsProcessorService } from './processors/BakeBuffered
|
||||
import { CleanProcessorService } from './processors/CleanProcessorService.js';
|
||||
import { AggregateRetentionProcessorService } from './processors/AggregateRetentionProcessorService.js';
|
||||
import { QueueLoggerService } from './QueueLoggerService.js';
|
||||
import { QUEUE, baseQueueOptions } from './const.js';
|
||||
import { QUEUE, baseWorkerOptions } from './const.js';
|
||||
|
||||
// ref. https://github.com/misskey-dev/misskey/pull/7635#issue-971097019
|
||||
function httpRelatedBackoff(attemptsMade: number) {
|
||||
@@ -175,7 +175,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
||||
return processer(job);
|
||||
}
|
||||
}, {
|
||||
...baseQueueOptions(this.config, QUEUE.SYSTEM),
|
||||
...baseWorkerOptions(this.config, QUEUE.SYSTEM),
|
||||
autorun: false,
|
||||
});
|
||||
|
||||
@@ -232,7 +232,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
||||
return processer(job);
|
||||
}
|
||||
}, {
|
||||
...baseQueueOptions(this.config, QUEUE.DB),
|
||||
...baseWorkerOptions(this.config, QUEUE.DB),
|
||||
autorun: false,
|
||||
});
|
||||
|
||||
@@ -264,7 +264,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
||||
return this.deliverProcessorService.process(job);
|
||||
}
|
||||
}, {
|
||||
...baseQueueOptions(this.config, QUEUE.DELIVER),
|
||||
...baseWorkerOptions(this.config, QUEUE.DELIVER),
|
||||
autorun: false,
|
||||
concurrency: this.config.deliverJobConcurrency ?? 128,
|
||||
limiter: {
|
||||
@@ -304,7 +304,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
||||
return this.inboxProcessorService.process(job);
|
||||
}
|
||||
}, {
|
||||
...baseQueueOptions(this.config, QUEUE.INBOX),
|
||||
...baseWorkerOptions(this.config, QUEUE.INBOX),
|
||||
autorun: false,
|
||||
concurrency: this.config.inboxJobConcurrency ?? 16,
|
||||
limiter: {
|
||||
@@ -344,7 +344,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
||||
return this.userWebhookDeliverProcessorService.process(job);
|
||||
}
|
||||
}, {
|
||||
...baseQueueOptions(this.config, QUEUE.USER_WEBHOOK_DELIVER),
|
||||
...baseWorkerOptions(this.config, QUEUE.USER_WEBHOOK_DELIVER),
|
||||
autorun: false,
|
||||
concurrency: 64,
|
||||
limiter: {
|
||||
@@ -384,7 +384,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
||||
return this.systemWebhookDeliverProcessorService.process(job);
|
||||
}
|
||||
}, {
|
||||
...baseQueueOptions(this.config, QUEUE.SYSTEM_WEBHOOK_DELIVER),
|
||||
...baseWorkerOptions(this.config, QUEUE.SYSTEM_WEBHOOK_DELIVER),
|
||||
autorun: false,
|
||||
concurrency: 16,
|
||||
limiter: {
|
||||
@@ -434,7 +434,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
||||
return processer(job);
|
||||
}
|
||||
}, {
|
||||
...baseQueueOptions(this.config, QUEUE.RELATIONSHIP),
|
||||
...baseWorkerOptions(this.config, QUEUE.RELATIONSHIP),
|
||||
autorun: false,
|
||||
concurrency: this.config.relationshipJobConcurrency ?? 16,
|
||||
limiter: {
|
||||
@@ -479,7 +479,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
||||
return processer(job);
|
||||
}
|
||||
}, {
|
||||
...baseQueueOptions(this.config, QUEUE.OBJECT_STORAGE),
|
||||
...baseWorkerOptions(this.config, QUEUE.OBJECT_STORAGE),
|
||||
autorun: false,
|
||||
concurrency: 16,
|
||||
});
|
||||
@@ -512,7 +512,7 @@ export class QueueProcessorService implements OnApplicationShutdown {
|
||||
return this.endedPollNotificationProcessorService.process(job);
|
||||
}
|
||||
}, {
|
||||
...baseQueueOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION),
|
||||
...baseWorkerOptions(this.config, QUEUE.ENDED_POLL_NOTIFICATION),
|
||||
autorun: false,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
* SPDX-License-Identifier: AGPL-3.0-only
|
||||
*/
|
||||
|
||||
import { MetricsTime } from 'bullmq';
|
||||
import { Config } from '@/config.js';
|
||||
import type * as Bull from 'bullmq';
|
||||
|
||||
@@ -27,3 +28,12 @@ export function baseQueueOptions(config: Config, queueName: typeof QUEUE[keyof t
|
||||
prefix: config.redisForJobQueue.prefix ? `${config.redisForJobQueue.prefix}:queue:${queueName}` : `queue:${queueName}`,
|
||||
};
|
||||
}
|
||||
|
||||
export function baseWorkerOptions(config: Config, queueName: typeof QUEUE[keyof typeof QUEUE]): Bull.WorkerOptions {
|
||||
return {
|
||||
...baseQueueOptions(config, queueName),
|
||||
metrics: {
|
||||
maxDataPoints: MetricsTime.ONE_WEEK,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user