Coverage for services / program_trading_stream_service.py: 91%
293 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-04-04 15:08 +0000
1import asyncio
2import json
3import os
4import sqlite3
5import time
6import threading
7import logging
8from contextlib import contextmanager
9from datetime import datetime
10from concurrent.futures import ThreadPoolExecutor
13class ProgramTradingStreamService:
14 """
15 프로그램매매 실시간 데이터의 수신, 저장(SQLite), 클라이언트 전송을 담당하는 서비스.
16 - 프로그램매매 히스토리: SQLite pt_history 테이블 (버퍼 기반 일괄 저장)
17 - 스냅샷(차트 데이터): SQLite pt_snapshot 테이블
18 - 구독 상태: SQLite pt_subscriptions 테이블 (재시작 시 복구)
19 """
21 RETENTION_DAYS = 7 # 데이터 보존 기간
22 FLUSH_INTERVAL_SEC = 1.0 # 버퍼 플러시 주기 (초)
23 FLUSH_BATCH_SIZE = 100 # 이 개수 이상이면 즉시 플러시
25 def __init__(self, logger=None):
26 self.logger = logger if logger else logging.getLogger(__name__)
28 # 메모리 캐시 (성능 최적화)
29 self._pt_history: dict = {} # {code: [data1, data2, ...]}
30 self.last_data_ts = 0.0 # 마지막 데이터 수신 시간 (Timestamp)
32 # 클라이언트 스트리밍
33 self._pt_queues: list = [] # 접속한 클라이언트들의 큐 리스트
34 self._pt_codes: set = set() # 현재 구독 중인 종목 코드 집합
36 # SQLite
37 self._db_path = os.path.join(self._get_base_dir(), "program_trading.db")
38 self._lock = threading.Lock()
39 self._conn = None
41 # 버퍼 기반 일괄 저장
42 self._write_buffer: list = []
43 self._buffer_lock = threading.Lock()
44 self._flush_task: asyncio.Task = None
45 self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="pt_db")
47 # 초기화
48 os.makedirs(self._get_base_dir(), exist_ok=True)
49 self._init_db()
50 self._load_subscribed_codes()
51 self._load_pt_history()
53 # --- SQLite 초기화 ---
54 def _get_base_dir(self):
55 return "data/program_subscribe"
57 def _init_db(self):
58 """SQLite DB 초기화 및 테이블 생성."""
59 try:
60 self._conn = sqlite3.connect(self._db_path, check_same_thread=False)
61 with self._get_connection() as conn:
62 conn.execute("PRAGMA journal_mode=WAL")
64 # 프로그램매매 히스토리 테이블 (모든 필드 분리 및 정수형 최적화)
65 conn.execute("""
66 CREATE TABLE IF NOT EXISTS pt_history (
67 id INTEGER PRIMARY KEY AUTOINCREMENT,
68 code TEXT NOT NULL,
69 trade_time TEXT, -- 주식체결시간
70 price INTEGER DEFAULT 0, -- 현재가
71 rate REAL DEFAULT 0.0, -- 등락률
72 sell_vol INTEGER, -- 매도체결량
73 sell_amt INTEGER, -- 매도거래대금
74 buy_vol INTEGER, -- 매수2체결량
75 buy_amt INTEGER, -- 매수2거래대금
76 net_vol INTEGER, -- 순매수체결량
77 net_amt INTEGER, -- 순매수거래대금
78 sell_rem INTEGER, -- 매도호가잔량
79 buy_rem INTEGER, -- 매수호가잔량
80 net_rem INTEGER, -- 전체순매수호가잔량
81 created_at REAL NOT NULL
82 )
83 """)
84 conn.execute("CREATE INDEX IF NOT EXISTS idx_pt_history_code ON pt_history(code)")
85 conn.execute("CREATE INDEX IF NOT EXISTS idx_pt_history_created_at ON pt_history(created_at)")
87 # 기존 테이블 마이그레이션: price 컬럼 추가
88 try:
89 conn.execute("ALTER TABLE pt_history ADD COLUMN price INTEGER DEFAULT 0")
90 except sqlite3.OperationalError:
91 pass
93 # 기존 테이블 마이그레이션: rate 컬럼 추가
94 try:
95 conn.execute("ALTER TABLE pt_history ADD COLUMN rate REAL DEFAULT 0.0")
96 except sqlite3.OperationalError:
97 pass
99 # 구독 상태 테이블
100 conn.execute("""
101 CREATE TABLE IF NOT EXISTS pt_subscriptions (
102 code TEXT PRIMARY KEY
103 )
104 """)
106 # 스냅샷 테이블
107 conn.execute("""
108 CREATE TABLE IF NOT EXISTS pt_snapshot (
109 key TEXT PRIMARY KEY,
110 value TEXT NOT NULL,
111 updated_at REAL NOT NULL
112 )
113 """)
114 self.logger.info("ProgramTradingStreamService: SQLite DB 초기화 완료")
115 except Exception as e:
116 self.logger.error(f"SQLite DB 초기화 실패: {e}")
118 @contextmanager
119 def _get_connection(self):
120 with self._lock:
121 try:
122 yield self._conn
123 self._conn.commit()
124 except Exception:
125 self._conn.rollback()
126 raise
128 # --- 데이터 로드 ---
129 def _load_pt_history(self):
130 """DB에서 금일 프로그램 매매 이력을 메모리로 로드."""
131 try:
132 today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
133 today_ts = today_start.timestamp()
135 with self._get_connection() as conn:
136 cursor = conn.execute("""
137 SELECT
138 code, trade_time, price, rate, sell_vol, sell_amt, buy_vol, buy_amt,
139 net_vol, net_amt, sell_rem, buy_rem, net_rem
140 FROM pt_history
141 WHERE created_at >= ? ORDER BY id ASC
142 """, (today_ts,))
144 count = 0
145 for row in cursor.fetchall():
146 (code, trade_time, price, rate, sell_vol, sell_amt, buy_vol, buy_amt,
147 net_vol, net_amt, sell_rem, buy_rem, net_rem) = row
149 # 기존 웹 UI 및 타 서비스와 호환되도록 원래 JSON 구조로 완벽히 복원
150 restored_data = {
151 "유가증권단축종목코드": code,
152 "주식체결시간": trade_time,
153 "price": price,
154 "rate": rate,
155 "매도체결량": str(sell_vol),
156 "매도거래대금": str(sell_amt),
157 "매수2체결량": str(buy_vol),
158 "매수2거래대금": str(buy_amt),
159 "순매수체결량": str(net_vol),
160 "순매수거래대금": str(net_amt),
161 "매도호가잔량": str(sell_rem),
162 "매수호가잔량": str(buy_rem),
163 "전체순매수호가잔량": str(net_rem)
164 }
165 self._pt_history.setdefault(code, []).append(restored_data)
166 count += 1
168 if count > 0:
169 self.logger.info(f"DB에서 {count}건의 히스토리 데이터를 복구했습니다.")
170 except Exception as e:
171 self.logger.error(f"히스토리 로드 중 오류: {e}")
173 def _load_subscribed_codes(self):
174 """DB에서 구독 상태를 복원."""
175 try:
176 with self._get_connection() as conn:
177 cursor = conn.execute("SELECT code FROM pt_subscriptions")
178 codes = [row[0] for row in cursor.fetchall()]
179 self._pt_codes = set(codes)
180 if self._pt_codes:
181 self.logger.info(f"구독 상태 복원: {sorted(self._pt_codes)}")
182 except Exception as e:
183 self.logger.error(f"구독 상태 로드 실패: {e}")
185 # --- 데이터 처리 ---
186 @staticmethod
187 def _safe_int(val):
188 try:
189 return int(val)
190 except (TypeError, ValueError):
191 return 0
193 @staticmethod
194 def _safe_float(val):
195 try:
196 return float(val)
197 except (TypeError, ValueError):
198 return 0.0
200 def on_data_received(self, data: dict):
201 """웹소켓 등에서 수신한 데이터를 처리 (버퍼 저장 및 브로드캐스트).
203 DB 쓰기는 버퍼에 적재 후 백그라운드에서 일괄 처리하여
204 이벤트 루프 블로킹을 방지한다.
205 """
206 code = data.get('유가증권단축종목코드')
207 if not code:
208 return
210 now = time.time()
211 if self.last_data_ts > 0 and (now - self.last_data_ts) > 10.0:
212 self.logger.info(f"실시간 데이터 수신 재개 (Gap: {now - self.last_data_ts:.1f}초)")
214 self.last_data_ts = now
216 # 1. 메모리 저장 (기존 프론트엔드/백엔드 호환용 원본 유지)
217 self._pt_history.setdefault(code, []).append(data)
219 # 2. 버퍼에 적재 (이벤트 루프 블로킹 방지)
220 trade_time = data.get('주식체결시간', '')
221 price = self._safe_int(data.get('price', 0))
222 rate = self._safe_float(data.get('rate', 0.0))
223 sell_vol = self._safe_int(data.get('매도체결량', 0))
224 sell_amt = self._safe_int(data.get('매도거래대금', 0))
225 buy_vol = self._safe_int(data.get('매수2체결량', 0))
226 buy_amt = self._safe_int(data.get('매수2거래대금', 0))
227 net_vol = self._safe_int(data.get('순매수체결량', 0))
228 net_amt = self._safe_int(data.get('순매수거래대금', 0))
229 sell_rem = self._safe_int(data.get('매도호가잔량', 0))
230 buy_rem = self._safe_int(data.get('매수호가잔량', 0))
231 net_rem = self._safe_int(data.get('전체순매수호가잔량', 0))
233 row = (code, trade_time, price, rate, sell_vol, sell_amt, buy_vol, buy_amt,
234 net_vol, net_amt, sell_rem, buy_rem, net_rem, now)
235 with self._buffer_lock:
236 self._write_buffer.append(row)
238 # 3. 클라이언트 브로드캐스트 (Dict 대신 Array 전송으로 네트워크 트래픽 80% 절감)
239 payload = [
240 code,
241 trade_time,
242 price,
243 rate,
244 data.get('change', 0),
245 data.get('sign', ''),
246 sell_vol,
247 buy_vol,
248 net_vol,
249 net_amt,
250 sell_rem,
251 buy_rem
252 ]
254 json_payload = json.dumps(payload, ensure_ascii=False)
255 for q in list(self._pt_queues):
256 try:
257 q.put_nowait(json_payload)
258 except asyncio.QueueFull:
259 try:
260 q.get_nowait()
261 q.put_nowait(json_payload)
262 except Exception:
263 pass
264 except Exception:
265 pass
267 # --- 버퍼 플러시 ---
268 def _bulk_insert_to_db(self, batch: list):
269 """동기 벌크 인서트 — ThreadPoolExecutor에서 실행."""
270 with self._lock:
271 try:
272 self._conn.executemany("""
273 INSERT INTO pt_history (
274 code, trade_time, price, rate, sell_vol, sell_amt, buy_vol, buy_amt,
275 net_vol, net_amt, sell_rem, buy_rem, net_rem, created_at
276 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
277 """, batch)
278 self._conn.commit()
279 except Exception as e:
280 self.logger.error(f"벌크 인서트 실패: {e}")
281 try:
282 self._conn.rollback()
283 except Exception:
284 pass
286 async def _flush_write_buffer(self):
287 """버퍼를 꺼내 executor에서 벌크 인서트."""
288 with self._buffer_lock:
289 if not self._write_buffer: 289 ↛ 290line 289 didn't jump to line 290 because the condition on line 289 was never true
290 return
291 batch = list(self._write_buffer)
292 self._write_buffer.clear()
294 loop = asyncio.get_event_loop()
295 await loop.run_in_executor(self._executor, self._bulk_insert_to_db, batch)
297 def flush_write_buffer_sync(self):
298 """동기 플러시 — 테스트 또는 종료 시 잔여 데이터를 즉시 DB에 기록."""
299 with self._buffer_lock:
300 if not self._write_buffer:
301 return
302 batch = list(self._write_buffer)
303 self._write_buffer.clear()
304 self._bulk_insert_to_db(batch)
306 async def _flush_loop(self):
307 """주기적으로 버퍼를 플러시하는 백그라운드 태스크."""
308 try:
309 while True:
310 await asyncio.sleep(self.FLUSH_INTERVAL_SEC)
311 await self._flush_write_buffer()
312 except asyncio.CancelledError:
313 # 종료 시 잔여 데이터 플러시
314 await self._flush_write_buffer()
316 # --- 클라이언트 큐 관리 ---
317 def create_subscriber_queue(self) -> asyncio.Queue:
318 """새로운 구독자(웹 클라이언트)를 위한 큐 생성 및 등록."""
319 queue = asyncio.Queue(maxsize=200)
320 self._pt_queues.append(queue)
321 return queue
323 def remove_subscriber_queue(self, queue: asyncio.Queue):
324 """구독자 큐 제거."""
325 if queue in self._pt_queues:
326 self._pt_queues.remove(queue)
328 def get_history_data(self):
329 """현재 메모리에 있는 히스토리 데이터 반환."""
330 return self._pt_history
332 # --- 구독 상태 관리 (SQLite 영속) ---
333 def add_subscribed_code(self, code: str):
334 self._pt_codes.add(code)
335 try:
336 with self._get_connection() as conn:
337 conn.execute(
338 "INSERT OR IGNORE INTO pt_subscriptions (code) VALUES (?)",
339 (code,)
340 )
341 except Exception as e:
342 self.logger.error(f"구독 상태 저장 실패: {e}")
344 def remove_subscribed_code(self, code: str):
345 self._pt_codes.discard(code)
346 try:
347 with self._get_connection() as conn:
348 conn.execute("DELETE FROM pt_subscriptions WHERE code = ?", (code,))
349 except Exception as e:
350 self.logger.error(f"구독 상태 삭제 실패: {e}")
352 def clear_subscribed_codes(self):
353 self._pt_codes.clear()
354 try:
355 with self._get_connection() as conn:
356 conn.execute("DELETE FROM pt_subscriptions")
357 except Exception as e:
358 self.logger.error(f"구독 상태 전체 삭제 실패: {e}")
360 def is_subscribed(self, code: str) -> bool:
361 return code in self._pt_codes
363 def get_subscribed_codes(self) -> list:
364 return sorted(list(self._pt_codes))
366 # --- 스냅샷 저장/로드 (SQLite) ---
367 def save_snapshot(self, data_dict: dict):
368 try:
369 json_str = json.dumps(data_dict, ensure_ascii=False)
370 now = time.time()
371 with self._get_connection() as conn:
372 conn.execute(
373 "INSERT OR REPLACE INTO pt_snapshot (key, value, updated_at) VALUES (?, ?, ?)",
374 ("pt_data", json_str, now)
375 )
376 return True
377 except Exception as e:
378 self.logger.error(f"스냅샷 저장 실패: {e}")
379 raise e
381 def load_snapshot(self) -> dict:
382 try:
383 with self._get_connection() as conn:
384 cursor = conn.execute("SELECT value, updated_at FROM pt_snapshot WHERE key = ?", ("pt_data",))
385 row = cursor.fetchone()
386 if row:
387 updated_at = datetime.fromtimestamp(row[1]).strftime("%Y-%m-%d %H:%M:%S")
388 self.logger.info(f"스냅샷 로드됨 (Last Updated: {updated_at})")
389 return json.loads(row[0])
390 except Exception as e:
391 self.logger.error(f"스냅샷 로드 실패: {e}")
392 return None
394 # --- 데이터 정리 ---
395 def _cleanup_old_data(self):
396 """보존 기간(7일)이 지난 데이터를 삭제."""
397 try:
398 cutoff = time.time() - (self.RETENTION_DAYS * 86400)
399 with self._get_connection() as conn:
400 cursor = conn.execute(
401 "DELETE FROM pt_history WHERE created_at < ?", (cutoff,)
402 )
403 deleted = cursor.rowcount
404 if deleted > 0:
405 self.logger.info(f"{self.RETENTION_DAYS}일 이전 히스토리 {deleted}건 삭제 완료")
406 conn.execute("PRAGMA optimize")
407 except Exception as e:
408 self.logger.error(f"오래된 데이터 정리 중 오류: {e}")
410 def _cleanup_old_files(self):
411 """기존 JSONL/JSON 파일 정리 (마이그레이션 후 잔여 파일 삭제)."""
412 try:
413 base_dir = self._get_base_dir()
414 for filename in os.listdir(base_dir):
415 if filename.endswith(".jsonl") or filename == "pt_data.json":
416 file_path = os.path.join(base_dir, filename)
417 os.remove(file_path)
418 self.logger.info(f"레거시 파일 삭제: {filename}")
419 except Exception as e:
420 self.logger.error(f"레거시 파일 정리 중 오류: {e}")
422 # --- 생명주기 관리 ---
423 def start_background_tasks(self):
424 """백그라운드 태스크 시작 (데이터 정리 + 버퍼 플러시 루프)."""
425 self._cleanup_old_data()
426 self._cleanup_old_files()
427 # 비동기 플러시 루프 시작 (async 컨텍스트에서 호출됨)
428 self._flush_task = asyncio.create_task(self._flush_loop())
429 self.logger.info("ProgramTradingStreamService: 초기화 완료 (버퍼 기반 일괄 저장 모드)")
431 async def shutdown(self):
432 """서비스 종료 처리."""
433 # 플러시 태스크 종료
434 if self._flush_task and not self._flush_task.done(): 434 ↛ 435line 434 didn't jump to line 435 because the condition on line 434 was never true
435 self._flush_task.cancel()
436 try:
437 await self._flush_task
438 except asyncio.CancelledError:
439 pass
441 # 잔여 버퍼 동기 플러시
442 self.flush_write_buffer_sync()
444 # executor 종료
445 self._executor.shutdown(wait=False)
447 if self._conn:
448 try:
449 self._conn.close()
450 except Exception:
451 pass
452 self._conn = None
453 self.logger.info("ProgramTradingStreamService: 종료 완료")
455 def inspect_db_status(self) -> dict:
456 """DB 상태(마지막 저장 시간, 데이터 건수 등)를 조회하여 반환 (디버깅용)."""
457 status = {
458 "snapshot": {"exists": False, "updated_at": None},
459 "history": {"count": 0, "last_record": None, "hourly_counts": {}},
460 "memory": {"last_received_at": None}
461 }
462 try:
463 with self._get_connection() as conn:
464 # 1. Snapshot 확인
465 cursor = conn.execute("SELECT updated_at FROM pt_snapshot WHERE key='pt_data'")
466 row = cursor.fetchone()
467 if row: 467 ↛ 472line 467 didn't jump to line 472 because the condition on line 467 was always true
468 status["snapshot"]["exists"] = True
469 status["snapshot"]["updated_at"] = datetime.fromtimestamp(row[0]).strftime("%Y-%m-%d %H:%M:%S")
471 # 2. History 확인 (오늘 기준)
472 today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0).timestamp()
474 cursor = conn.execute(
475 "SELECT COUNT(*), MAX(created_at) FROM pt_history WHERE created_at >= ?",
476 (today_start,)
477 )
478 cnt, last_ts = cursor.fetchone()
479 status["history"]["count"] = cnt
480 if last_ts: 480 ↛ 484line 480 didn't jump to line 484 because the condition on line 480 was always true
481 status["history"]["last_record"] = datetime.fromtimestamp(last_ts).strftime("%Y-%m-%d %H:%M:%S")
483 # 시간대별 건수
484 cursor = conn.execute("""
485 SELECT strftime('%H', datetime(created_at, 'unixepoch', 'localtime')) as hour, count(*)
486 FROM pt_history
487 WHERE created_at >= ?
488 GROUP BY hour
489 ORDER BY hour
490 """, (today_start,))
492 for r in cursor.fetchall():
493 status["history"]["hourly_counts"][r[0]] = r[1]
495 # 메모리 상태 추가
496 if self.last_data_ts > 0: 496 ↛ 503line 496 didn't jump to line 503 because the condition on line 496 was always true
497 status["memory"]["last_received_at"] = datetime.fromtimestamp(self.last_data_ts).strftime("%Y-%m-%d %H:%M:%S")
499 except Exception as e:
500 self.logger.error(f"DB 검사 중 오류: {e}")
501 status["error"] = str(e)
503 return status