인플럭스DB
시간 순서 데이터 저장 및 검색에 최적화 데이터 베이스오전.
다수의 거래와 관련된 데이터를 저장하여 시간별로 분석그것을하는 데 사용됩니다.
InfluxDB JOIN 쿼리
InfluxDB에 트랜잭션 데이터를 저장하고 특정 기간 동안의 트랜잭션 볼륨 및 가격 차이에 대한 데이터를 검색하는 쿼리를 작성했습니다.
다음과 같은 JOIN이 필요합니다.
조인 B ==> 엑스
씨 조인 디 ==> 와이
X 연결 Y ==> 결론
1. 느린 쿼리
먼저 다음과 같은 쿼리를 생성했습니다.
import "join"
import "date"
// 240분전부터 현재까지의 데이터 범위를 지정
// 문제가 되었던 부분: dataset 변수
dataset = from(bucket:"market") |> range(start: -240m) |> filter(fn: (r) => r("_measurement") == "tick")
// 시작 누적거래량
volFirst = dataset |> filter(fn: (r) => r("_field") == "acc_price") |> first()
// 마지막 누적거래량
volLast = dataset |> filter(fn: (r) => r("_field") == "acc_price") |> last()
// 거래량
vol = join.inner(
left: volFirst,
right: volLast,
on: (l, r) => l.product == r.product,
as: (l, r) => ({
l with product: l.product,
volume: if r._value > l._value then r._value - l._value else r._value
})
) |> group()
price = dataset |> filter(fn: (r) => r("_field") == "price")
// 시작 가격
first = price |> first()
// 마지막 가격
last = price |> last()
// 가격 변화: 시작 가격과 마지막 가격의 차이
diff = join.inner(
left: first,
right: last,
on: (l, r) => l.product == r.product,
as: (l, r) => ({
l with product: r.product, diff: r._value - l._value
})
) |> group()
// 거래량의 가격 변화를 상품별로 조인
// 결과 컬럼: (상품, 거래량, 가격 변화)
join.inner(
left: vol,
right: diff,
on: (l, r) => l.product == r.product,
as: (l, r) => ({
product: l.product,
volume: l._value,
diff: r.diff
})
)
변수 저장
결과를 얻기 위한 목표 데이터를 독립 변수라고 합니다.
기록 = from(bucket:”market”) |> range(start: -240m) |> filter(fn: (r) => r(“_measurement”) == “tick”)
쿼리 시간
대상 데이터 기간쿼리 시간은 변경 시 측정되었습니다.
범위(시작: -240m)
시간 범위가 증가하는 비율도 쿼리 속도를 늦춥니다.
| 5 분 | 0.3초 |
| 30 분 | 0.9초 |
| 60분 | 1.5초 |
| 90분 | 2.3초 |
| 120분 | 4초 |
| 240분 | 7.4초 |
이상한??
시간 범위가 작든 크든 처음과 마지막 값의 차이에 따라 필요한 데이터가 결정되므로 중요하지 않다고 생각했습니다.
쿼리 시간도 늘어납니다.
이상하게도 마지막 JOIN 쿼리까지 결과를 보면 모든 것이 빠릅니다. 마지막 조인 쿼리가 가장 많은 시간이 걸립니다.
뭐가 문제인지 정확히 모르겠습니다
만일을 대비하여 레코드 변수를 제거해 보았습니다.
2. 빠른 쿼리
대상 데이터는 dataset이라는 변수로 정의하여 사용하던 것을 제거하였다.
이전에는 시간만 변수에 저장되었으므로 작은 시차가 없습니다.
import "join"
import "date"
now = now()
startTime = date.sub(d: 240m, from: now)
// 시작 누적거래량
volFirst = from(bucket:"market") |> range(start: startTime, stop:now) |> filter(fn: (r) => r("_measurement") == "tick") |> filter(fn: (r) => r("_field") == "acc_price") |> first()
// 마지막 누적거래량
volLast = from(bucket:"market") |> range(start: startTime, stop:now) |> filter(fn: (r) => r("_measurement") == "tick") |> filter(fn: (r) => r("_field") == "acc_price") |> last()
// 거래량
vol = join.inner(
left: volFirst,
right: volLast,
on: (l, r) => l.product == r.product,
as: (l, r) => ({
l with product: l.product,
volume: if r._value > l._value then r._value - l._value else r._value
})
) |> group()
// 시작 가격
first = from(bucket:"market") |> range(start: startTime, stop:now) |> filter(fn: (r) => r("_measurement") == "tick") |> filter(fn: (r) => r("_field") == "price") |> first()
// 마지막 가격
last = from(bucket:"market") |> range(start: startTime, stop:now) |> filter(fn: (r) => r("_measurement") == "tick") |> filter(fn: (r) => r("_field") == "price") |> last()
// 가격 변화: 시작 가격과 마지막 가격의 차이
diff = join.inner(
left: first,
right: last,
on: (l, r) => l.product == r.product,
as: (l, r) => ({
l with product: r.product, diff: r._value - l._value
})
) |> group()
// 거래량의 가격 변화를 상품별로 조인
// 결과 컬럼: (상품, 거래량, 가격 변화)
join.inner(
left: vol,
right: diff,
on: (l, r) => l.product == r.product,
as: (l, r) => ({
product: l.product,
volume: l.volume,
diff: r.diff
})
)
속도 측정
| 5 분 | 0.2초 |
| 30 분 | 0.2초 |
| 60분 | 0.2초 |
| 90분 | 0.2초 |
| 120분 | 0.2초 |
| 240분 | 0.2초 |
시간 범위가 늘어나도 쿼리 시간은 동일하게 유지됩니다. 이것이 제가 원하는 결과였습니다.
위 쿼리에 필요한 작업은 복잡하지 않습니다.
특정 시간 범위 내에서 시작 값과 종료 값만 보는 쿼리입니다.
그러나 데이터셋을 변수에 저장하고 두 개의 JOIN 프로세스에 걸쳐 사용하면 범위가 클수록 느려집니다.
쿼리 최적화에 문제가 있는 것 같습니다.
이 문제는 모든 레코드를 변수에 저장하지 않고 하나씩 지정하여 쿼리를 만들었기 때문에 사라졌습니다.
InfluxDB 문서를 찾아봤지만 원인을 찾기가 쉽지 않았습니다.
서류를 다시 검토해야 합니다.