Coverage for services / program_trading_stream_service.py: 91%

293 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-04-04 15:08 +0000

1import asyncio 

2import json 

3import os 

4import sqlite3 

5import time 

6import threading 

7import logging 

8from contextlib import contextmanager 

9from datetime import datetime 

10from concurrent.futures import ThreadPoolExecutor 

11 

12 

13class ProgramTradingStreamService: 

14 """ 

15 프로그램매매 실시간 데이터의 수신, 저장(SQLite), 클라이언트 전송을 담당하는 서비스. 

16 - 프로그램매매 히스토리: SQLite pt_history 테이블 (버퍼 기반 일괄 저장) 

17 - 스냅샷(차트 데이터): SQLite pt_snapshot 테이블 

18 - 구독 상태: SQLite pt_subscriptions 테이블 (재시작 시 복구) 

19 """ 

20 

21 RETENTION_DAYS = 7 # 데이터 보존 기간 

22 FLUSH_INTERVAL_SEC = 1.0 # 버퍼 플러시 주기 (초) 

23 FLUSH_BATCH_SIZE = 100 # 이 개수 이상이면 즉시 플러시 

24 

25 def __init__(self, logger=None): 

26 self.logger = logger if logger else logging.getLogger(__name__) 

27 

28 # 메모리 캐시 (성능 최적화) 

29 self._pt_history: dict = {} # {code: [data1, data2, ...]} 

30 self.last_data_ts = 0.0 # 마지막 데이터 수신 시간 (Timestamp) 

31 

32 # 클라이언트 스트리밍 

33 self._pt_queues: list = [] # 접속한 클라이언트들의 큐 리스트 

34 self._pt_codes: set = set() # 현재 구독 중인 종목 코드 집합 

35 

36 # SQLite 

37 self._db_path = os.path.join(self._get_base_dir(), "program_trading.db") 

38 self._lock = threading.Lock() 

39 self._conn = None 

40 

41 # 버퍼 기반 일괄 저장 

42 self._write_buffer: list = [] 

43 self._buffer_lock = threading.Lock() 

44 self._flush_task: asyncio.Task = None 

45 self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="pt_db") 

46 

47 # 초기화 

48 os.makedirs(self._get_base_dir(), exist_ok=True) 

49 self._init_db() 

50 self._load_subscribed_codes() 

51 self._load_pt_history() 

52 

53 # --- SQLite 초기화 --- 

54 def _get_base_dir(self): 

55 return "data/program_subscribe" 

56 

57 def _init_db(self): 

58 """SQLite DB 초기화 및 테이블 생성.""" 

59 try: 

60 self._conn = sqlite3.connect(self._db_path, check_same_thread=False) 

61 with self._get_connection() as conn: 

62 conn.execute("PRAGMA journal_mode=WAL") 

63 

64 # 프로그램매매 히스토리 테이블 (모든 필드 분리 및 정수형 최적화) 

65 conn.execute(""" 

66 CREATE TABLE IF NOT EXISTS pt_history ( 

67 id INTEGER PRIMARY KEY AUTOINCREMENT, 

68 code TEXT NOT NULL, 

69 trade_time TEXT, -- 주식체결시간 

70 price INTEGER DEFAULT 0, -- 현재가 

71 rate REAL DEFAULT 0.0, -- 등락률 

72 sell_vol INTEGER, -- 매도체결량 

73 sell_amt INTEGER, -- 매도거래대금 

74 buy_vol INTEGER, -- 매수2체결량 

75 buy_amt INTEGER, -- 매수2거래대금 

76 net_vol INTEGER, -- 순매수체결량 

77 net_amt INTEGER, -- 순매수거래대금 

78 sell_rem INTEGER, -- 매도호가잔량 

79 buy_rem INTEGER, -- 매수호가잔량 

80 net_rem INTEGER, -- 전체순매수호가잔량 

81 created_at REAL NOT NULL 

82 ) 

83 """) 

84 conn.execute("CREATE INDEX IF NOT EXISTS idx_pt_history_code ON pt_history(code)") 

85 conn.execute("CREATE INDEX IF NOT EXISTS idx_pt_history_created_at ON pt_history(created_at)") 

86 

87 # 기존 테이블 마이그레이션: price 컬럼 추가 

88 try: 

89 conn.execute("ALTER TABLE pt_history ADD COLUMN price INTEGER DEFAULT 0") 

90 except sqlite3.OperationalError: 

91 pass 

92 

93 # 기존 테이블 마이그레이션: rate 컬럼 추가 

94 try: 

95 conn.execute("ALTER TABLE pt_history ADD COLUMN rate REAL DEFAULT 0.0") 

96 except sqlite3.OperationalError: 

97 pass 

98 

99 # 구독 상태 테이블 

100 conn.execute(""" 

101 CREATE TABLE IF NOT EXISTS pt_subscriptions ( 

102 code TEXT PRIMARY KEY 

103 ) 

104 """) 

105 

106 # 스냅샷 테이블 

107 conn.execute(""" 

108 CREATE TABLE IF NOT EXISTS pt_snapshot ( 

109 key TEXT PRIMARY KEY, 

110 value TEXT NOT NULL, 

111 updated_at REAL NOT NULL 

112 ) 

113 """) 

114 self.logger.info("ProgramTradingStreamService: SQLite DB 초기화 완료") 

115 except Exception as e: 

116 self.logger.error(f"SQLite DB 초기화 실패: {e}") 

117 

118 @contextmanager 

119 def _get_connection(self): 

120 with self._lock: 

121 try: 

122 yield self._conn 

123 self._conn.commit() 

124 except Exception: 

125 self._conn.rollback() 

126 raise 

127 

128 # --- 데이터 로드 --- 

129 def _load_pt_history(self): 

130 """DB에서 금일 프로그램 매매 이력을 메모리로 로드.""" 

131 try: 

132 today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) 

133 today_ts = today_start.timestamp() 

134 

135 with self._get_connection() as conn: 

136 cursor = conn.execute(""" 

137 SELECT 

138 code, trade_time, price, rate, sell_vol, sell_amt, buy_vol, buy_amt, 

139 net_vol, net_amt, sell_rem, buy_rem, net_rem 

140 FROM pt_history 

141 WHERE created_at >= ? ORDER BY id ASC 

142 """, (today_ts,)) 

143 

144 count = 0 

145 for row in cursor.fetchall(): 

146 (code, trade_time, price, rate, sell_vol, sell_amt, buy_vol, buy_amt, 

147 net_vol, net_amt, sell_rem, buy_rem, net_rem) = row 

148 

149 # 기존 웹 UI 및 타 서비스와 호환되도록 원래 JSON 구조로 완벽히 복원 

150 restored_data = { 

151 "유가증권단축종목코드": code, 

152 "주식체결시간": trade_time, 

153 "price": price, 

154 "rate": rate, 

155 "매도체결량": str(sell_vol), 

156 "매도거래대금": str(sell_amt), 

157 "매수2체결량": str(buy_vol), 

158 "매수2거래대금": str(buy_amt), 

159 "순매수체결량": str(net_vol), 

160 "순매수거래대금": str(net_amt), 

161 "매도호가잔량": str(sell_rem), 

162 "매수호가잔량": str(buy_rem), 

163 "전체순매수호가잔량": str(net_rem) 

164 } 

165 self._pt_history.setdefault(code, []).append(restored_data) 

166 count += 1 

167 

168 if count > 0: 

169 self.logger.info(f"DB에서 {count}건의 히스토리 데이터를 복구했습니다.") 

170 except Exception as e: 

171 self.logger.error(f"히스토리 로드 중 오류: {e}") 

172 

173 def _load_subscribed_codes(self): 

174 """DB에서 구독 상태를 복원.""" 

175 try: 

176 with self._get_connection() as conn: 

177 cursor = conn.execute("SELECT code FROM pt_subscriptions") 

178 codes = [row[0] for row in cursor.fetchall()] 

179 self._pt_codes = set(codes) 

180 if self._pt_codes: 

181 self.logger.info(f"구독 상태 복원: {sorted(self._pt_codes)}") 

182 except Exception as e: 

183 self.logger.error(f"구독 상태 로드 실패: {e}") 

184 

185 # --- 데이터 처리 --- 

186 @staticmethod 

187 def _safe_int(val): 

188 try: 

189 return int(val) 

190 except (TypeError, ValueError): 

191 return 0 

192 

193 @staticmethod 

194 def _safe_float(val): 

195 try: 

196 return float(val) 

197 except (TypeError, ValueError): 

198 return 0.0 

199 

200 def on_data_received(self, data: dict): 

201 """웹소켓 등에서 수신한 데이터를 처리 (버퍼 저장 및 브로드캐스트). 

202 

203 DB 쓰기는 버퍼에 적재 후 백그라운드에서 일괄 처리하여 

204 이벤트 루프 블로킹을 방지한다. 

205 """ 

206 code = data.get('유가증권단축종목코드') 

207 if not code: 

208 return 

209 

210 now = time.time() 

211 if self.last_data_ts > 0 and (now - self.last_data_ts) > 10.0: 

212 self.logger.info(f"실시간 데이터 수신 재개 (Gap: {now - self.last_data_ts:.1f}초)") 

213 

214 self.last_data_ts = now 

215 

216 # 1. 메모리 저장 (기존 프론트엔드/백엔드 호환용 원본 유지) 

217 self._pt_history.setdefault(code, []).append(data) 

218 

219 # 2. 버퍼에 적재 (이벤트 루프 블로킹 방지) 

220 trade_time = data.get('주식체결시간', '') 

221 price = self._safe_int(data.get('price', 0)) 

222 rate = self._safe_float(data.get('rate', 0.0)) 

223 sell_vol = self._safe_int(data.get('매도체결량', 0)) 

224 sell_amt = self._safe_int(data.get('매도거래대금', 0)) 

225 buy_vol = self._safe_int(data.get('매수2체결량', 0)) 

226 buy_amt = self._safe_int(data.get('매수2거래대금', 0)) 

227 net_vol = self._safe_int(data.get('순매수체결량', 0)) 

228 net_amt = self._safe_int(data.get('순매수거래대금', 0)) 

229 sell_rem = self._safe_int(data.get('매도호가잔량', 0)) 

230 buy_rem = self._safe_int(data.get('매수호가잔량', 0)) 

231 net_rem = self._safe_int(data.get('전체순매수호가잔량', 0)) 

232 

233 row = (code, trade_time, price, rate, sell_vol, sell_amt, buy_vol, buy_amt, 

234 net_vol, net_amt, sell_rem, buy_rem, net_rem, now) 

235 with self._buffer_lock: 

236 self._write_buffer.append(row) 

237 

238 # 3. 클라이언트 브로드캐스트 (Dict 대신 Array 전송으로 네트워크 트래픽 80% 절감) 

239 payload = [ 

240 code, 

241 trade_time, 

242 price, 

243 rate, 

244 data.get('change', 0), 

245 data.get('sign', ''), 

246 sell_vol, 

247 buy_vol, 

248 net_vol, 

249 net_amt, 

250 sell_rem, 

251 buy_rem 

252 ] 

253 

254 json_payload = json.dumps(payload, ensure_ascii=False) 

255 for q in list(self._pt_queues): 

256 try: 

257 q.put_nowait(json_payload) 

258 except asyncio.QueueFull: 

259 try: 

260 q.get_nowait() 

261 q.put_nowait(json_payload) 

262 except Exception: 

263 pass 

264 except Exception: 

265 pass 

266 

267 # --- 버퍼 플러시 --- 

268 def _bulk_insert_to_db(self, batch: list): 

269 """동기 벌크 인서트 — ThreadPoolExecutor에서 실행.""" 

270 with self._lock: 

271 try: 

272 self._conn.executemany(""" 

273 INSERT INTO pt_history ( 

274 code, trade_time, price, rate, sell_vol, sell_amt, buy_vol, buy_amt, 

275 net_vol, net_amt, sell_rem, buy_rem, net_rem, created_at 

276 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 

277 """, batch) 

278 self._conn.commit() 

279 except Exception as e: 

280 self.logger.error(f"벌크 인서트 실패: {e}") 

281 try: 

282 self._conn.rollback() 

283 except Exception: 

284 pass 

285 

286 async def _flush_write_buffer(self): 

287 """버퍼를 꺼내 executor에서 벌크 인서트.""" 

288 with self._buffer_lock: 

289 if not self._write_buffer: 289 ↛ 290line 289 didn't jump to line 290 because the condition on line 289 was never true

290 return 

291 batch = list(self._write_buffer) 

292 self._write_buffer.clear() 

293 

294 loop = asyncio.get_event_loop() 

295 await loop.run_in_executor(self._executor, self._bulk_insert_to_db, batch) 

296 

297 def flush_write_buffer_sync(self): 

298 """동기 플러시 — 테스트 또는 종료 시 잔여 데이터를 즉시 DB에 기록.""" 

299 with self._buffer_lock: 

300 if not self._write_buffer: 

301 return 

302 batch = list(self._write_buffer) 

303 self._write_buffer.clear() 

304 self._bulk_insert_to_db(batch) 

305 

306 async def _flush_loop(self): 

307 """주기적으로 버퍼를 플러시하는 백그라운드 태스크.""" 

308 try: 

309 while True: 

310 await asyncio.sleep(self.FLUSH_INTERVAL_SEC) 

311 await self._flush_write_buffer() 

312 except asyncio.CancelledError: 

313 # 종료 시 잔여 데이터 플러시 

314 await self._flush_write_buffer() 

315 

316 # --- 클라이언트 큐 관리 --- 

317 def create_subscriber_queue(self) -> asyncio.Queue: 

318 """새로운 구독자(웹 클라이언트)를 위한 큐 생성 및 등록.""" 

319 queue = asyncio.Queue(maxsize=200) 

320 self._pt_queues.append(queue) 

321 return queue 

322 

323 def remove_subscriber_queue(self, queue: asyncio.Queue): 

324 """구독자 큐 제거.""" 

325 if queue in self._pt_queues: 

326 self._pt_queues.remove(queue) 

327 

328 def get_history_data(self): 

329 """현재 메모리에 있는 히스토리 데이터 반환.""" 

330 return self._pt_history 

331 

332 # --- 구독 상태 관리 (SQLite 영속) --- 

333 def add_subscribed_code(self, code: str): 

334 self._pt_codes.add(code) 

335 try: 

336 with self._get_connection() as conn: 

337 conn.execute( 

338 "INSERT OR IGNORE INTO pt_subscriptions (code) VALUES (?)", 

339 (code,) 

340 ) 

341 except Exception as e: 

342 self.logger.error(f"구독 상태 저장 실패: {e}") 

343 

344 def remove_subscribed_code(self, code: str): 

345 self._pt_codes.discard(code) 

346 try: 

347 with self._get_connection() as conn: 

348 conn.execute("DELETE FROM pt_subscriptions WHERE code = ?", (code,)) 

349 except Exception as e: 

350 self.logger.error(f"구독 상태 삭제 실패: {e}") 

351 

352 def clear_subscribed_codes(self): 

353 self._pt_codes.clear() 

354 try: 

355 with self._get_connection() as conn: 

356 conn.execute("DELETE FROM pt_subscriptions") 

357 except Exception as e: 

358 self.logger.error(f"구독 상태 전체 삭제 실패: {e}") 

359 

360 def is_subscribed(self, code: str) -> bool: 

361 return code in self._pt_codes 

362 

363 def get_subscribed_codes(self) -> list: 

364 return sorted(list(self._pt_codes)) 

365 

366 # --- 스냅샷 저장/로드 (SQLite) --- 

367 def save_snapshot(self, data_dict: dict): 

368 try: 

369 json_str = json.dumps(data_dict, ensure_ascii=False) 

370 now = time.time() 

371 with self._get_connection() as conn: 

372 conn.execute( 

373 "INSERT OR REPLACE INTO pt_snapshot (key, value, updated_at) VALUES (?, ?, ?)", 

374 ("pt_data", json_str, now) 

375 ) 

376 return True 

377 except Exception as e: 

378 self.logger.error(f"스냅샷 저장 실패: {e}") 

379 raise e 

380 

381 def load_snapshot(self) -> dict: 

382 try: 

383 with self._get_connection() as conn: 

384 cursor = conn.execute("SELECT value, updated_at FROM pt_snapshot WHERE key = ?", ("pt_data",)) 

385 row = cursor.fetchone() 

386 if row: 

387 updated_at = datetime.fromtimestamp(row[1]).strftime("%Y-%m-%d %H:%M:%S") 

388 self.logger.info(f"스냅샷 로드됨 (Last Updated: {updated_at})") 

389 return json.loads(row[0]) 

390 except Exception as e: 

391 self.logger.error(f"스냅샷 로드 실패: {e}") 

392 return None 

393 

394 # --- 데이터 정리 --- 

395 def _cleanup_old_data(self): 

396 """보존 기간(7일)이 지난 데이터를 삭제.""" 

397 try: 

398 cutoff = time.time() - (self.RETENTION_DAYS * 86400) 

399 with self._get_connection() as conn: 

400 cursor = conn.execute( 

401 "DELETE FROM pt_history WHERE created_at < ?", (cutoff,) 

402 ) 

403 deleted = cursor.rowcount 

404 if deleted > 0: 

405 self.logger.info(f"{self.RETENTION_DAYS}일 이전 히스토리 {deleted}건 삭제 완료") 

406 conn.execute("PRAGMA optimize") 

407 except Exception as e: 

408 self.logger.error(f"오래된 데이터 정리 중 오류: {e}") 

409 

410 def _cleanup_old_files(self): 

411 """기존 JSONL/JSON 파일 정리 (마이그레이션 후 잔여 파일 삭제).""" 

412 try: 

413 base_dir = self._get_base_dir() 

414 for filename in os.listdir(base_dir): 

415 if filename.endswith(".jsonl") or filename == "pt_data.json": 

416 file_path = os.path.join(base_dir, filename) 

417 os.remove(file_path) 

418 self.logger.info(f"레거시 파일 삭제: {filename}") 

419 except Exception as e: 

420 self.logger.error(f"레거시 파일 정리 중 오류: {e}") 

421 

422 # --- 생명주기 관리 --- 

423 def start_background_tasks(self): 

424 """백그라운드 태스크 시작 (데이터 정리 + 버퍼 플러시 루프).""" 

425 self._cleanup_old_data() 

426 self._cleanup_old_files() 

427 # 비동기 플러시 루프 시작 (async 컨텍스트에서 호출됨) 

428 self._flush_task = asyncio.create_task(self._flush_loop()) 

429 self.logger.info("ProgramTradingStreamService: 초기화 완료 (버퍼 기반 일괄 저장 모드)") 

430 

431 async def shutdown(self): 

432 """서비스 종료 처리.""" 

433 # 플러시 태스크 종료 

434 if self._flush_task and not self._flush_task.done(): 434 ↛ 435line 434 didn't jump to line 435 because the condition on line 434 was never true

435 self._flush_task.cancel() 

436 try: 

437 await self._flush_task 

438 except asyncio.CancelledError: 

439 pass 

440 

441 # 잔여 버퍼 동기 플러시 

442 self.flush_write_buffer_sync() 

443 

444 # executor 종료 

445 self._executor.shutdown(wait=False) 

446 

447 if self._conn: 

448 try: 

449 self._conn.close() 

450 except Exception: 

451 pass 

452 self._conn = None 

453 self.logger.info("ProgramTradingStreamService: 종료 완료") 

454 

455 def inspect_db_status(self) -> dict: 

456 """DB 상태(마지막 저장 시간, 데이터 건수 등)를 조회하여 반환 (디버깅용).""" 

457 status = { 

458 "snapshot": {"exists": False, "updated_at": None}, 

459 "history": {"count": 0, "last_record": None, "hourly_counts": {}}, 

460 "memory": {"last_received_at": None} 

461 } 

462 try: 

463 with self._get_connection() as conn: 

464 # 1. Snapshot 확인 

465 cursor = conn.execute("SELECT updated_at FROM pt_snapshot WHERE key='pt_data'") 

466 row = cursor.fetchone() 

467 if row: 467 ↛ 472line 467 didn't jump to line 472 because the condition on line 467 was always true

468 status["snapshot"]["exists"] = True 

469 status["snapshot"]["updated_at"] = datetime.fromtimestamp(row[0]).strftime("%Y-%m-%d %H:%M:%S") 

470 

471 # 2. History 확인 (오늘 기준) 

472 today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0).timestamp() 

473 

474 cursor = conn.execute( 

475 "SELECT COUNT(*), MAX(created_at) FROM pt_history WHERE created_at >= ?", 

476 (today_start,) 

477 ) 

478 cnt, last_ts = cursor.fetchone() 

479 status["history"]["count"] = cnt 

480 if last_ts: 480 ↛ 484line 480 didn't jump to line 484 because the condition on line 480 was always true

481 status["history"]["last_record"] = datetime.fromtimestamp(last_ts).strftime("%Y-%m-%d %H:%M:%S") 

482 

483 # 시간대별 건수 

484 cursor = conn.execute(""" 

485 SELECT strftime('%H', datetime(created_at, 'unixepoch', 'localtime')) as hour, count(*) 

486 FROM pt_history 

487 WHERE created_at >= ? 

488 GROUP BY hour 

489 ORDER BY hour 

490 """, (today_start,)) 

491 

492 for r in cursor.fetchall(): 

493 status["history"]["hourly_counts"][r[0]] = r[1] 

494 

495 # 메모리 상태 추가 

496 if self.last_data_ts > 0: 496 ↛ 503line 496 didn't jump to line 503 because the condition on line 496 was always true

497 status["memory"]["last_received_at"] = datetime.fromtimestamp(self.last_data_ts).strftime("%Y-%m-%d %H:%M:%S") 

498 

499 except Exception as e: 

500 self.logger.error(f"DB 검사 중 오류: {e}") 

501 status["error"] = str(e) 

502 

503 return status