forked from mirrors/misskey
wip
This commit is contained in:
@@ -5,7 +5,7 @@
|
||||
|
||||
import { Inject, Injectable } from '@nestjs/common';
|
||||
import * as Redis from 'ioredis';
|
||||
import type { BlockingsRepository, ChannelFollowingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository } from '@/models/_.js';
|
||||
import type { BlockingsRepository, ChannelFollowingsRepository, FollowingsRepository, MutingsRepository, RenoteMutingsRepository, MiUserProfile, UserProfilesRepository, UsersRepository, MiFollowing } from '@/models/_.js';
|
||||
import { MemoryKVCache, RedisKVCache } from '@/misc/cache.js';
|
||||
import type { MiLocalUser, MiUser } from '@/models/User.js';
|
||||
import { DI } from '@/di-symbols.js';
|
||||
@@ -25,7 +25,7 @@ export class CacheService implements OnApplicationShutdown {
|
||||
public userBlockingCache: RedisKVCache<Set<string>>;
|
||||
public userBlockedCache: RedisKVCache<Set<string>>; // NOTE: 「被」Blockキャッシュ
|
||||
public renoteMutingsCache: RedisKVCache<Set<string>>;
|
||||
public userFollowingsCache: RedisKVCache<Set<string>>;
|
||||
public userFollowingsCache: RedisKVCache<Record<string, Pick<MiFollowing, 'withReplies'> | undefined>>;
|
||||
public userFollowingChannelsCache: RedisKVCache<Set<string>>;
|
||||
|
||||
constructor(
|
||||
@@ -136,12 +136,18 @@ export class CacheService implements OnApplicationShutdown {
|
||||
fromRedisConverter: (value) => new Set(JSON.parse(value)),
|
||||
});
|
||||
|
||||
this.userFollowingsCache = new RedisKVCache<Set<string>>(this.redisClient, 'userFollowings', {
|
||||
this.userFollowingsCache = new RedisKVCache<Record<string, Pick<MiFollowing, 'withReplies'> | undefined>>(this.redisClient, 'userFollowings', {
|
||||
lifetime: 1000 * 60 * 30, // 30m
|
||||
memoryCacheLifetime: 1000 * 60, // 1m
|
||||
fetcher: (key) => this.followingsRepository.find({ where: { followerId: key }, select: ['followeeId'] }).then(xs => new Set(xs.map(x => x.followeeId))),
|
||||
toRedisConverter: (value) => JSON.stringify(Array.from(value)),
|
||||
fromRedisConverter: (value) => new Set(JSON.parse(value)),
|
||||
fetcher: (key) => this.followingsRepository.find({ where: { followerId: key }, select: ['followeeId', 'withReplies'] }).then(xs => {
|
||||
const obj: Record<string, Pick<MiFollowing, 'withReplies'> | undefined> = {};
|
||||
for (const x of xs) {
|
||||
obj[x.followeeId] = { withReplies: x.withReplies };
|
||||
}
|
||||
return obj;
|
||||
}),
|
||||
toRedisConverter: (value) => JSON.stringify(value),
|
||||
fromRedisConverter: (value) => JSON.parse(value),
|
||||
});
|
||||
|
||||
this.userFollowingChannelsCache = new RedisKVCache<Set<string>>(this.redisClient, 'userFollowingChannels', {
|
||||
|
||||
@@ -805,15 +805,7 @@ export class NoteCreateService implements OnApplicationShutdown {
|
||||
private async pushToTl(note: MiNote, user: { id: MiUser['id']; host: MiUser['host']; }) {
|
||||
const redisPipeline = this.redisClient.pipeline();
|
||||
|
||||
if (note.replyId && note.replyUserId !== note.userId) {
|
||||
if (note.visibility === 'public' || note.visibility === 'home') {
|
||||
redisPipeline.xadd(
|
||||
`userTimelineWithReplies:${user.id}`,
|
||||
'MAXLEN', '~', '200',
|
||||
'*',
|
||||
'note', note.id);
|
||||
}
|
||||
} else if (note.channelId) {
|
||||
if (note.channelId) {
|
||||
const channelFollowings = await this.channelFollowingsRepository.find({
|
||||
where: {
|
||||
followeeId: note.channelId,
|
||||
@@ -845,7 +837,7 @@ export class NoteCreateService implements OnApplicationShutdown {
|
||||
followeeId: user.id,
|
||||
followerHost: IsNull(),
|
||||
},
|
||||
select: ['followerId'],
|
||||
select: ['followerId', 'withReplies'],
|
||||
});
|
||||
|
||||
let userLists = await this.userListJoiningsRepository.find({
|
||||
@@ -857,6 +849,11 @@ export class NoteCreateService implements OnApplicationShutdown {
|
||||
|
||||
// TODO: あまりにも数が多いと redisPipeline.exec に失敗する(理由は不明)ため、3万件程度を目安に分割して実行するようにする
|
||||
for (const following of followings) {
|
||||
// 自分自身以外への返信
|
||||
if (note.replyId && note.replyUserId !== note.userId) {
|
||||
if (!following.withReplies) continue;
|
||||
}
|
||||
|
||||
redisPipeline.xadd(
|
||||
`homeTimeline:${following.followerId}`,
|
||||
'MAXLEN', '~', '200',
|
||||
@@ -878,6 +875,11 @@ export class NoteCreateService implements OnApplicationShutdown {
|
||||
}
|
||||
|
||||
for (const userList of userLists) {
|
||||
// 自分自身以外への返信
|
||||
if (note.replyId && note.replyUserId !== note.userId) {
|
||||
if (!userList.withReplies) continue;
|
||||
}
|
||||
|
||||
redisPipeline.xadd(
|
||||
`userListTimeline:${userList.userListId}`,
|
||||
'MAXLEN', '~', '200',
|
||||
@@ -893,49 +895,60 @@ export class NoteCreateService implements OnApplicationShutdown {
|
||||
}
|
||||
}
|
||||
|
||||
redisPipeline.xadd(
|
||||
`homeTimeline:${user.id}`,
|
||||
'MAXLEN', '~', '200',
|
||||
'*',
|
||||
'note', note.id);
|
||||
|
||||
if (note.fileIds.length > 0) {
|
||||
{ // 自分自身のHTL
|
||||
redisPipeline.xadd(
|
||||
`homeTimelineWithFiles:${user.id}`,
|
||||
'MAXLEN', '~', '100',
|
||||
'*',
|
||||
'note', note.id);
|
||||
}
|
||||
|
||||
if (note.visibility === 'public' || note.visibility === 'home') {
|
||||
redisPipeline.xadd(
|
||||
`userTimeline:${user.id}`,
|
||||
`homeTimeline:${user.id}`,
|
||||
'MAXLEN', '~', '200',
|
||||
'*',
|
||||
'note', note.id);
|
||||
|
||||
if (note.fileIds.length > 0) {
|
||||
redisPipeline.xadd(
|
||||
`userTimelineWithFiles:${user.id}`,
|
||||
`homeTimelineWithFiles:${user.id}`,
|
||||
'MAXLEN', '~', '100',
|
||||
'*',
|
||||
'note', note.id);
|
||||
}
|
||||
}
|
||||
|
||||
if (note.userHost == null) {
|
||||
if (note.visibility === 'public' || note.visibility === 'home') {
|
||||
// 自分自身以外への返信
|
||||
if (note.replyId && note.replyUserId !== note.userId) {
|
||||
redisPipeline.xadd(
|
||||
'localTimeline',
|
||||
'MAXLEN', '~', '1000',
|
||||
`userTimelineWithReplies:${user.id}`,
|
||||
'MAXLEN', '~', '200',
|
||||
'*',
|
||||
'note', note.id);
|
||||
} else {
|
||||
redisPipeline.xadd(
|
||||
`userTimeline:${user.id}`,
|
||||
'MAXLEN', '~', '200',
|
||||
'*',
|
||||
'note', note.id);
|
||||
|
||||
if (note.fileIds.length > 0) {
|
||||
redisPipeline.xadd(
|
||||
'localTimelineWithFiles',
|
||||
'MAXLEN', '~', '500',
|
||||
`userTimelineWithFiles:${user.id}`,
|
||||
'MAXLEN', '~', '100',
|
||||
'*',
|
||||
'note', note.id);
|
||||
}
|
||||
|
||||
if (note.userHost == null) {
|
||||
redisPipeline.xadd(
|
||||
'localTimeline',
|
||||
'MAXLEN', '~', '1000',
|
||||
'*',
|
||||
'note', note.id);
|
||||
|
||||
if (note.fileIds.length > 0) {
|
||||
redisPipeline.xadd(
|
||||
'localTimelineWithFiles',
|
||||
'MAXLEN', '~', '500',
|
||||
'*',
|
||||
'note', note.id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -99,19 +99,19 @@ export class NotificationService implements OnApplicationShutdown {
|
||||
}
|
||||
|
||||
if (recieveConfig?.type === 'following') {
|
||||
const isFollowing = await this.cacheService.userFollowingsCache.fetch(notifieeId).then(followings => followings.has(notifierId));
|
||||
const isFollowing = await this.cacheService.userFollowingsCache.fetch(notifieeId).then(followings => Object.hasOwn(followings, notifierId));
|
||||
if (!isFollowing) {
|
||||
return null;
|
||||
}
|
||||
} else if (recieveConfig?.type === 'follower') {
|
||||
const isFollower = await this.cacheService.userFollowingsCache.fetch(notifierId).then(followings => followings.has(notifieeId));
|
||||
const isFollower = await this.cacheService.userFollowingsCache.fetch(notifierId).then(followings => Object.hasOwn(followings, notifieeId));
|
||||
if (!isFollower) {
|
||||
return null;
|
||||
}
|
||||
} else if (recieveConfig?.type === 'mutualFollow') {
|
||||
const [isFollowing, isFollower] = await Promise.all([
|
||||
this.cacheService.userFollowingsCache.fetch(notifieeId).then(followings => followings.has(notifierId)),
|
||||
this.cacheService.userFollowingsCache.fetch(notifierId).then(followings => followings.has(notifieeId)),
|
||||
this.cacheService.userFollowingsCache.fetch(notifieeId).then(followings => Object.hasOwn(followings, notifierId)),
|
||||
this.cacheService.userFollowingsCache.fetch(notifierId).then(followings => Object.hasOwn(followings, notifieeId)),
|
||||
]);
|
||||
if (!isFollowing && !isFollower) {
|
||||
return null;
|
||||
|
||||
@@ -487,6 +487,7 @@ export class UserEntityService implements OnModuleInit {
|
||||
isMuted: relation.isMuted,
|
||||
isRenoteMuted: relation.isRenoteMuted,
|
||||
notify: relation.following?.notify ?? 'none',
|
||||
withReplies: relation.following?.withReplies ?? false,
|
||||
} : {}),
|
||||
} as Promiseable<Packed<'User'>> as Promiseable<IsMeAndIsUserDetailed<ExpectsMe, D>>;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user