Leaky bucket (처리량 제어기)
Leaky bucket 알고리즘을 활용한 처리량 제어기를 만들어보자.
Leaky bucket 알고리즘이란?
Leaky bucket 알고리즘은 네트워크 트래픽 제어 및 처리량 제한(rate limiting)에 주로 사용되는 방법중 하나이다.
특징
- 대기열은 Queue (FIFO) 구조. 일정량의 요청을 담을 수 있음
- 대기열이 가득 차면 새로운 요청은 버려짐
- 고정된 속도로 요청을 처리함. 과도한 요청이 들어와도 대기열의 처리 속도는 변하지 않음
활용 사례
- API 서비스 제공 시 고객별 호출량 제한
소스코드
구현 소스코드는 다음과 같다.
import { concatMap, Subject, timer } from 'rxjs';
export interface RateLimiter {
run<T>(task: () => Promise<T>): Promise<T>;
}
/**
* 일정 주기 마다 작업을 균일하게 실행하는 처리량 제어기 입니다.
*/
export class LeakyBucketRateLimiter implements RateLimiter {
private readonly interval: number;
private readonly maxQueueSize: number;
private readonly stream = new Subject<() => void>();
private currentQueueSize = 0;
/**
* @param interval 작업간 처리 주기 (ms)
* @param maxQueueSize 작업 대기열 최대 크기
*/
constructor(interval: number, maxQueueSize: number) {
this.interval = interval;
this.maxQueueSize = maxQueueSize;
this.stream
.pipe(
concatMap((task) =>
timer(this.interval).pipe(
concatMap(() => {
task();
this.currentQueueSize--;
return [];
}),
),
),
)
.subscribe();
}
/**
* 대기열에 작업을 예약 합니다. 먼저 예약된 작업이 모두 완료된 후 실행 됩니다.
* @param task 실행할 작업
*/
run<T>(task: () => Promise<T>): Promise<T> {
if (this.maxQueueSize < this.currentQueueSize + 1) {
throw new Error('Rate limit exceeded.');
}
return new Promise<T>((resolve, reject) => {
const lazy = () => {
task().then(resolve, reject);
};
this.currentQueueSize++;
this.stream.next(lazy);
});
}
}
run 함수를 통해 작업을 등록하면 대기열에 추가하고 각 개별 작업들은 interval 값의 간격(밀리초)으로 실행된다. task 파라미터는 작업이 바로 시작되면 안되므로 Lazy Evaluation을 위해 비동기 작업을 반환하는 함수여야한다.
보통 Rate Limiter 구조에서는 queue를 많이 사용하는데 간략하게 코드를 작성하기 위해 rx의 stream을 queue처럼 활용했다. (어짜피 FIFO 역할은 동일함)
// 0.001초마다 작업을 처리하고 대기열에는 최대 10000개의 작업이 들어올 수 있음
const limiter = new LeakyBucketRateLimiter(1, 10000);
limiter.run(() => api_1()); // 0.001초뒤 실행
limiter.run(() => api_2()); // api_1 출발 후 0.001초뒤 실행
limiter.run(() => api_3()); // api_2 출발 후 0.001초뒤 실행
// 작업 api_4, 5, 6이 끝날 때 까지 대기
await Promise.all([
limiter.run(() => api_4()); // api_3 출발 후 0.001초뒤 실행
limiter.run(() => api_5()); // api_4 출발 후 0.001초뒤 실행
limiter.run(() => api_6()); // api_5 출발 후 0.001초뒤 실행
]);
실사용은 위 코드처럼 처리 주기와 최대 대기열 크기를 지정하여 객체를 생성하고, 처리량을 제어할 함수를 집어넣으면 된다. run 함수는 예약한 작업의 비동기 상태를 반환하므로 await을 통해 기다릴 수 있다.
참고 자료
Leaky bucket - Wikipedia
From Wikipedia, the free encyclopedia Network traffic shaping and policing algorithm This article is about the computer algorithm. For the metaphor in economic redistribution, see Leaky bucket (economics). The leaky bucket analogy. Water can be added inter
en.wikipedia.org
'Programming > Algorithm' 카테고리의 다른 글
[Algorithm] 이동 평균 필터 (Moving average filter) (0) | 2021.12.04 |
---|---|
[Algorithm] 선형 보간법 (Linear interpolation) (0) | 2021.04.22 |
[Algorithm] 지구에서 두 점 사이의 중간지점 구하기 (1) | 2020.09.15 |
[Algorithm] 지구에서 두 점 사이의 방위각 구하기 (0) | 2020.09.07 |
[Alogrithm] 지구에서 두 점 사이의 거리 구하기 (2) | 2020.08.25 |
댓글