mirror of
https://github.com/misskey-dev/misskey.git
synced 2026-05-14 01:45:36 +02:00
fix(backend): Fix and create unit test of CleanRemoteNotesProcessorService (#16368)
* wip
* test(backend): CleanRemoteNotesProcessorService (basic)
* test(backend): CleanRemoteNotesProcessorService (advanced)
* ✌️
* a
* split initiator query
* no order by
* ???
* old → older
This commit is contained in:
@@ -5,9 +5,8 @@
|
||||
|
||||
import { setTimeout } from 'node:timers/promises';
|
||||
import { Inject, Injectable } from '@nestjs/common';
|
||||
import { And, Brackets, In, IsNull, LessThan, MoreThan, Not } from 'typeorm';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
import type { MiMeta, MiNote, NoteFavoritesRepository, NotesRepository, UserNotePiningsRepository } from '@/models/_.js';
|
||||
import type { MiMeta, MiNote, NotesRepository } from '@/models/_.js';
|
||||
import type Logger from '@/logger.js';
|
||||
import { bindThis } from '@/decorators.js';
|
||||
import { IdService } from '@/core/IdService.js';
|
||||
@@ -25,12 +24,6 @@ export class CleanRemoteNotesProcessorService {
|
||||
@Inject(DI.notesRepository)
|
||||
private notesRepository: NotesRepository,
|
||||
|
||||
@Inject(DI.noteFavoritesRepository)
|
||||
private noteFavoritesRepository: NoteFavoritesRepository,
|
||||
|
||||
@Inject(DI.userNotePiningsRepository)
|
||||
private userNotePiningsRepository: UserNotePiningsRepository,
|
||||
|
||||
private idService: IdService,
|
||||
private queueLoggerService: QueueLoggerService,
|
||||
) {
|
||||
@@ -61,6 +54,69 @@ export class CleanRemoteNotesProcessorService {
|
||||
|
||||
const MAX_NOTE_COUNT_PER_QUERY = 50;
|
||||
|
||||
//#retion queries
|
||||
// We use string literals instead of query builder for several reasons:
|
||||
// - for removeCondition, we need to use it in having clause, which is not supported by Brackets.
|
||||
// - for recursive part, we need to preserve the order of columns, but typeorm query builder does not guarantee the order of columns in the result query
|
||||
|
||||
// The condition for removing the notes.
|
||||
// The note must be:
|
||||
// - old enough (older than the newestLimit)
|
||||
// - a remote note (userHost is not null).
|
||||
// - not have clipped
|
||||
// - not have pinned on the user profile
|
||||
// - not has been favorite by any user
|
||||
const removeCondition = 'note.id < :newestLimit'
|
||||
+ ' AND note."clippedCount" = 0'
|
||||
+ ' AND note."userHost" IS NOT NULL'
|
||||
// using both userId and noteId instead of just noteId to use index on user_note_pining table.
|
||||
// This is safe because notes are only pinned by the user who created them.
|
||||
+ ' AND NOT EXISTS(SELECT 1 FROM "user_note_pining" WHERE "noteId" = note."id" AND "userId" = note."userId")'
|
||||
// We cannot use userId trick because users can favorite notes from other users.
|
||||
+ ' AND NOT EXISTS(SELECT 1 FROM "note_favorite" WHERE "noteId" = note."id")'
|
||||
;
|
||||
|
||||
// The initiator query contains the oldest ${MAX_NOTE_COUNT_PER_QUERY} remote non-clipped notes
|
||||
const initiatorQuery = this.notesRepository.createQueryBuilder('note')
|
||||
.select('note.id', 'id')
|
||||
.where(removeCondition)
|
||||
.andWhere('note.id > :cursor')
|
||||
.orderBy('note.id', 'ASC')
|
||||
.limit(MAX_NOTE_COUNT_PER_QUERY);
|
||||
|
||||
// The union query queries the related notes and replies related to the initiator query
|
||||
const unionQuery = `
|
||||
SELECT "note"."id", "note"."replyId", "note"."renoteId", rn."initiatorId"
|
||||
FROM "note" "note"
|
||||
INNER JOIN "related_notes" "rn"
|
||||
ON "note"."replyId" = rn.id
|
||||
OR "note"."renoteId" = rn.id
|
||||
OR "note"."id" = rn."replyId"
|
||||
OR "note"."id" = rn."renoteId"
|
||||
`;
|
||||
|
||||
const selectRelatedNotesFromInitiatorIdsQuery = `
|
||||
SELECT "note"."id" AS "id", "note"."replyId" AS "replyId", "note"."renoteId" AS "renoteId", "note"."id" AS "initiatorId"
|
||||
FROM "note" "note" WHERE "note"."id" IN (:...initiatorIds)
|
||||
`;
|
||||
|
||||
const recursiveQuery = `(${selectRelatedNotesFromInitiatorIdsQuery}) UNION (${unionQuery})`;
|
||||
|
||||
const removableInitiatorNotesQuery = this.notesRepository.createQueryBuilder('note')
|
||||
.select('rn."initiatorId"')
|
||||
.innerJoin('related_notes', 'rn', 'note.id = rn.id')
|
||||
.groupBy('rn."initiatorId"')
|
||||
.having(`bool_and(${removeCondition})`);
|
||||
|
||||
const notesQuery = this.notesRepository.createQueryBuilder('note')
|
||||
.addCommonTableExpression(recursiveQuery, 'related_notes', { recursive: true })
|
||||
.select('note.id', 'id')
|
||||
.addSelect('rn."initiatorId"')
|
||||
.innerJoin('related_notes', 'rn', 'note.id = rn.id')
|
||||
.where(`rn."initiatorId" IN (${removableInitiatorNotesQuery.getQuery()})`)
|
||||
.distinctOn(['note.id']);
|
||||
//#endregion
|
||||
|
||||
const stats = {
|
||||
deletedCount: 0,
|
||||
oldest: null as number | null,
|
||||
@@ -74,77 +130,45 @@ export class CleanRemoteNotesProcessorService {
|
||||
let cursor = '0'; // oldest note ID to start from
|
||||
|
||||
while (true) {
|
||||
//#region check time
|
||||
const batchBeginAt = Date.now();
|
||||
|
||||
// We use string literals instead of query builder for several reasons:
|
||||
// - for removeCondition, we need to use it in having clause, which is not supported by Brackets.
|
||||
// - for recursive part, we need to preserve the order of columns, but typeorm query builder does not guarantee the order of columns in the result query
|
||||
const elapsed = batchBeginAt - startAt;
|
||||
|
||||
// The condition for removing the notes.
|
||||
// The note must be:
|
||||
// - old enough (older than the newestLimit)
|
||||
// - a remote note (userHost is not null).
|
||||
// - not have clipped
|
||||
// - not have pinned on the user profile
|
||||
// - not has been favorite by any user
|
||||
const removeCondition = 'note.id < :newestLimit'
|
||||
+ ' AND note."clippedCount" = 0'
|
||||
+ ' AND note."userHost" IS NOT NULL'
|
||||
// using both userId and noteId instead of just noteId to use index on user_note_pining table.
|
||||
// This is safe because notes are only pinned by the user who created them.
|
||||
+ ' AND NOT EXISTS(SELECT 1 FROM "user_note_pining" WHERE "noteId" = note."id" AND "userId" = note."userId")'
|
||||
// We cannot use userId trick because users can favorite notes from other users.
|
||||
+ ' AND NOT EXISTS(SELECT 1 FROM "note_favorite" WHERE "noteId" = note."id")'
|
||||
;
|
||||
if (elapsed >= maxDuration) {
|
||||
this.logger.info(`Reached maximum duration of ${maxDuration}ms, stopping...`);
|
||||
job.log('Reached maximum duration, stopping cleaning.');
|
||||
job.updateProgress(100);
|
||||
break;
|
||||
}
|
||||
|
||||
// The initiator query contains the oldest ${MAX_NOTE_COUNT_PER_QUERY} remote non-clipped notes
|
||||
const initiatorQuery = `
|
||||
SELECT "note"."id" AS "id", "note"."replyId" AS "replyId", "note"."renoteId" AS "renoteId", "note"."id" AS "initiatorId"
|
||||
FROM "note" "note" WHERE ${removeCondition} AND "note"."id" > :cursor ORDER BY "note"."id" ASC LIMIT ${MAX_NOTE_COUNT_PER_QUERY}`;
|
||||
job.updateProgress((elapsed / maxDuration) * 100);
|
||||
//#endregion
|
||||
|
||||
// The union query queries the related notes and replies related to the initiator query
|
||||
const unionQuery = `
|
||||
SELECT "note"."id", "note"."replyId", "note"."renoteId", rn."initiatorId"
|
||||
FROM "note" "note"
|
||||
INNER JOIN "related_notes" "rn"
|
||||
ON "note"."replyId" = rn.id
|
||||
OR "note"."renoteId" = rn.id
|
||||
OR "note"."id" = rn."replyId"
|
||||
OR "note"."id" = rn."renoteId"
|
||||
`;
|
||||
const recursiveQuery = `(${initiatorQuery}) UNION (${unionQuery})`;
|
||||
|
||||
const removableInitiatorNotesQuery = this.notesRepository.createQueryBuilder('note')
|
||||
.select('rn."initiatorId"')
|
||||
.innerJoin('related_notes', 'rn', 'note.id = rn.id')
|
||||
.groupBy('rn."initiatorId"')
|
||||
.having(`bool_and(${removeCondition})`);
|
||||
|
||||
const notesQuery = this.notesRepository.createQueryBuilder('note')
|
||||
.addCommonTableExpression(recursiveQuery, 'related_notes', { recursive: true })
|
||||
.select('note.id', 'id')
|
||||
.addSelect('rn."initiatorId"')
|
||||
.innerJoin('related_notes', 'rn', 'note.id = rn.id')
|
||||
.where(`rn."initiatorId" IN (${ removableInitiatorNotesQuery.getQuery() })`)
|
||||
.setParameters({ cursor, newestLimit });
|
||||
|
||||
const notes: { id: MiNote['id'], initiatorId: MiNote['id'] }[] = await notesQuery.getRawMany();
|
||||
|
||||
const fetchedCount = notes.length;
|
||||
// First, we fetch the initiator notes that are older than the newestLimit.
|
||||
const initiatorNotes: { id: MiNote['id'] }[] = await initiatorQuery.setParameters({ cursor, newestLimit }).getRawMany();
|
||||
|
||||
// update the cursor to the newest initiatorId found in the fetched notes.
|
||||
// We don't use 'id' since the note can be newer than the initiator note.
|
||||
for (const note of notes) {
|
||||
if (cursor < note.initiatorId) {
|
||||
cursor = note.initiatorId;
|
||||
}
|
||||
const newCursor = initiatorNotes.reduce((max, note) => note.id > max ? note.id : max, cursor);
|
||||
|
||||
if (initiatorNotes.length === 0 || cursor === newCursor || newCursor >= newestLimit) {
|
||||
// If no notes were found or the cursor did not change, we can stop.
|
||||
job.log('No more notes to clean. (no initiator notes found or cursor did not change.)');
|
||||
break;
|
||||
}
|
||||
|
||||
const notes: { id: MiNote['id'], initiatorId: MiNote['id'] }[] = await notesQuery.setParameters({
|
||||
initiatorIds: initiatorNotes.map(note => note.id),
|
||||
newestLimit,
|
||||
}).getRawMany();
|
||||
|
||||
cursor = newCursor;
|
||||
|
||||
if (notes.length > 0) {
|
||||
await this.notesRepository.delete(notes.map(note => note.id));
|
||||
|
||||
for (const note of notes) {
|
||||
const t = this.idService.parse(note.id).date.getTime();
|
||||
for (const { id } of notes) {
|
||||
const t = this.idService.parse(id).date.getTime();
|
||||
if (stats.oldest === null || t < stats.oldest) {
|
||||
stats.oldest = t;
|
||||
}
|
||||
@@ -156,19 +180,14 @@ export class CleanRemoteNotesProcessorService {
|
||||
stats.deletedCount += notes.length;
|
||||
}
|
||||
|
||||
job.log(`Deleted ${notes.length} of ${fetchedCount}; ${Date.now() - batchBeginAt}ms`);
|
||||
job.log(`Deleted ${notes.length} from ${initiatorNotes.length} initiators; ${Date.now() - batchBeginAt}ms`);
|
||||
|
||||
const elapsed = Date.now() - startAt;
|
||||
|
||||
if (elapsed >= maxDuration) {
|
||||
this.logger.info(`Reached maximum duration of ${maxDuration}ms, stopping...`);
|
||||
job.log('Reached maximum duration, stopping cleaning.');
|
||||
job.updateProgress(100);
|
||||
if (initiatorNotes.length < MAX_NOTE_COUNT_PER_QUERY) {
|
||||
// If we fetched less than the maximum, it means there are no more notes to process.
|
||||
job.log(`No more notes to clean. (fewer than MAX_NOTE_COUNT_PER_QUERY =${MAX_NOTE_COUNT_PER_QUERY}.)`);
|
||||
break;
|
||||
}
|
||||
|
||||
job.updateProgress((elapsed / maxDuration) * 100);
|
||||
|
||||
await setTimeout(1000 * 5); // Wait a moment to avoid overwhelming the db
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user