본문 바로가기

Programming/Python

[Python] PyQt5, Watchdog을 이용한 파일 데이터 실시간 전송 프로그램 만들기

반응형

바쁘디바쁜 대학원 생활 속에서 위기의 순간들이 조금은 누그러진 어느날 문득 생각나 오랜만에 블로그를 켜보게 되었다. 나는 사실 블로그를 정보 전달의 목적보다는 나중에 내가 다시 그 일을 수행할 때 시행착오 없이 할 수 있도록 매뉴얼 만들듯이 정리하자는 의미로 처음 시작하게 되었다. 그래서 "나만 알아보면 되지" 라는 마인드로 다소 불친절하게 쓴 경우도 많았다. 그런데 내 생각보다 많은 분들이 온오프라인으로 관심을 가져주시고 피드백도 주셔서, 이에 호응하는 마음으로 연구실 생활하면서 쌓인 작은 경험과 지식을 나누는 일을 조금씩 해보고자 한다.

더불어 기존 포스팅에 부족한 부분이 정말 많이 발견되었다. 이 부분들도 하나씩 개선해서 더 완성도를 높이고자 한다. 많은 관심 바란다.

 

우리 연구실에서 운용중인 자연광 측정장비는 매 분마다 (정확히는 1회 측정 이후 1분 delay) 측정한 태양광의 광특성 정보를 파일로 저장한다. 보통은 이렇게 쌓인 파일 데이터를 하루 단위로 취합하여 데이터 정리를 하고 DB에 삽입하는 과정을 거치는데, 이 과정은 모두 수동으로 이루어지기 때문에 꽤나 불편함이 있었다. 이를 해결하고자 파일 데이터를 실시간으로 전송하는 프로그램을 파이썬으로 개발하게 되었다.

 

사실 작년부터 만들어놓겠다고 계획했었는데, 과제와 논문이 주 임무였기 때문에 한동안 손을 안댔는데, 최근 연구실에서 자연광 관련 개발 프로젝트를 시작하게 되면서 '강력한 필요에 의해' 이틀만에 뚝딱뚝딱 만들었다.

짧은 기간 안에 만들어야 했고, 관련 배경지식에 대한 부분도 풍부하지 않은 채 필요한 부분만 차용해서 개발했기에 아직 많이 부족하다. 따라서 본 포스팅에서는 사용된 패키지 등에 대한 자세한 설명은 자주 생략할 예정이다. 자세한 내용은 크게 중요하지 않고 검색을 통해 충분히 나오기 때문이다. 독자분들의 많은 양해 바라며, 그럼에도 불구하고 독자분들에게 필요한 부분이 있다면 도움이 되었으면 좋겠다.

 

오랜만의 포스팅이라 잡담좀 해봤고 본격적으로 이번 포스팅의 주제인 "파일 데이터 실시간 전송 프로그램"을 만들어보도록 하자.

 

1. Prerequisite 

1.1 watchdog

이번 포스팅에서 해결하려는 문제의 핵심은 파일이 자동으로 생성되면 그 생성을 '감지'할 수 있느냐이다. 파이썬의 'watchdog'은 이를 가능하게 해주는 패키지이다.

official docs: https://pythonhosted.org/watchdog/

 

Watchdog — watchdog 0.8.2 documentation

Watchdog Python API library and shell utilities to monitor file system events. Directory monitoring made easy with A cross-platform API. A shell tool to run commands in response to directory changes. Get started quickly with a simple example in Quickstart.

pythonhosted.org

1.2 PyQt5

사실 지금 만드려고 하는 프로그램이 복잡한 기능을 수행하는 프로그램이 아니므로 그냥 TUI기반으로 만들수도 있지만, 한 번 잘 만들어서 오랫동안 잘 쓰도록 하자는 철학(?)에 따라 GUI기반으로 구현해보기로 했다. 대신, 너무 많은 시간을 들이긴 어렵기 때문에 최대한 간단하면서도 필요한 기능들만 담을 수 있는 가벼운 GUI 프레임워크가 필요했다.

이에 적합한 녀석, 바로 PyQt5이다. 이전에도 몇 번 사용해봤는데 이 만큼 직관적인 코드로 GUI를 구성할 수 있는 프레임워크는 몇 없는 것 같다. 기초적인 동작 원리랑 코드만 봐도 쭉쭉 읽힌다.

official homepage: https://www.riverbankcomputing.com/software/pyqt/intro

 

Riverbank | Software | PyQt | What is PyQt?

What is PyQt? PyQt is a set of Python v2 and v3 bindings for The Qt Company's Qt application framework and runs on all platforms supported by Qt including Windows, OS X, Linux, iOS and Android. PyQt5 supports Qt v5. PyQt4 supports Qt v4 and will build agai

www.riverbankcomputing.com

1.3 Requests

requests는 HTTP 요청(request)을 만드는 패키지이다. REST API에 전송할 요청을 만들 때 내가 애용하는 아주아주 유용한 패키지이다.

official site: https://2.python-requests.org/en/master/

 

Requests: HTTP for Humans™ — Requests 2.22.0 documentation

Requests is the only Non-GMO HTTP library for Python, safe for human consumption. Requests allows you to send organic, grass-fed HTTP/1.1 requests, without the need for manual labor. There’s no need to manually add query strings to your URLs, or to form-en

2.python-requests.org

 

 

준비물을 다 챙겼다면 이제 기능을 하나씩 구현해보자.

 

2. GUI 설계

2.1 필요한 기능 정의

문제 해결을 위해 필요한 기능을 생각나는대로 쭉 나열해보았다.

 

2.1.1 디렉토리에 파일 생성 감지 기능

먼저 필요한 것은 '파일 생성 감지 기능'이다. 이는 watchdog를 이용하여 생각보다 간편하게 할 수 있다.

 

2.1.2 감시할 디렉토리 설정

그 다음은 '어떤 디렉토리를 감시할 것이냐'이다. 이를 위해 디렉토리 경로를 입력하는 입력창이 필요할 것이다.

 

2.1.3 감시 on / off

그다음 주어진 디렉토리의 파일 생성 감지 기능을 켜고 끄는 기능이 있을 것이다. 켜고끄는 기능을 버튼으로 만들되 토글 형식으로 켜고 끄게 구현할 수 있을 것 같다.

 

2.1.4 파일 데이터 서버로 전송

이제 새로 생성된 파일을 감지했으니 해당 파일 데이터를 자동으로 서버로 전송하는 기능이 있어야 할 것이다. 이것이 이 프로그램의 가장 핵심이 되는 기능일 것이다.

 

2.1.5 감시 상태 확인

마지막으로 현재 프로그램이 해당 디렉토리 감시를 하고 있는지 하지 않고 있는지 확인하는 기능이 필요하다. 필수적인 기능은 아니지만 프로그램의 완성도를 좀 더 높이기 위해 사용자에게 현재 상태를 알려줄 필요가 있다.

 

정말 간단하게 만들 것이기 때문에 대충 정리하면 이게 끝.

 

 

2.2 UI 설계

기능이 많지 않기 때문에 UI 설계도 정말 간단하게 구성할 것이다.

 

3. 구현

3.1 UI 구현

기능을 먼저 구현하고 UI를 나중에 구현하는 것이 소프트웨어공학적으로 더욱 좋은 품질의 소프트웨어를 개발할 수 있다는 것을 배웠지만 이 프로그램은 그런거 필요없다. 그만큼 간단한 프로젝트이므로.

그리고 또 눈에 뭐가 보여야 만드는 맛도 있고 하니깐...

 

2.2의 화면을 PyQt5로 구현한 코드는 다음과 같다.

# cas_streamer_gui.py

import sys

from PyQt5.QtGui import QIcon
from PyQt5.QtWidgets import (QApplication, QDesktopWidget, QLabel, QHBoxLayout, QVBoxLayout,
                             QWidget, QMainWindow, QLineEdit, QPushButton)
from debugmodule import Log


class CasStreamerFrame(QMainWindow):
    def __init__(self, title):
        super(CasStreamerFrame, self).__init__()
        self.title = title
        self.tag = 'CasStreamerFrame'

        # create widgets
        self.layout1 = QHBoxLayout()
        self.layout2 = QHBoxLayout()
        self.main_layout = QVBoxLayout()
        self.lbl = QLabel('스트리밍 대상 디렉토리')
        self.ledt = QLineEdit()
        self.lbl_state = QLabel('스트리밍 상태: 멈춤')
        self.btn = QPushButton('시작')
        self.widget = QWidget()

        self.setupUi()
        self.createActions()

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def setupUi(self):
        self.setGeometry(0, 0, 500, 100)
        self.setWindowTitle(self.title)
        self.setWindowIcon(QIcon('icon.png'))
        self.wnd2Center()

        # add widgets
        self.layout1.addWidget(self.lbl)
        self.layout1.addWidget(self.ledt)
        self.layout2.addWidget(self.lbl_state)
        self.layout2.addWidget(self.btn)
        self.main_layout.addLayout(self.layout1)
        self.main_layout.addLayout(self.layout2)

        # set layout and stretch widgets
        self.layout1.setContentsMargins(5, 5, 5, 5)
        self.layout1.setStretchFactor(self.lbl, 3)
        self.layout1.setStretchFactor(self.ledt, 7)
        self.layout2.setContentsMargins(5, 5, 5, 5)
        self.layout2.setStretchFactor(self.lbl_state, 5)
        self.layout2.setStretchFactor(self.btn, 5)
        self.widget.setLayout(self.main_layout)
        self.setCentralWidget(self.widget)

    def createActions(self):
        self.btn.clicked.connect(self.toggle_streaming)

    def toggle_streaming(self):
        pass
 
    def wnd2Center(self):
        # geometry of the main window
        qr = self.frameGeometry()
        # center point of screen
        cp = QDesktopWidget().availableGeometry().center()
        # move rectangle's center point to screen's center point
        qr.moveCenter(cp)
        # top left of rectangle becomes top left of window centering it
        self.move(qr.topLeft())


if __name__ == "__main__":
    app = QApplication(sys.argv)
    window = CasStreamerFrame('WitLab CAS data streamer v1.0 - jake')
    window.show()
    app.exec_()

 

이를 실행한 화면은 다음과 같다. 그림판으로 그린것과 똑같다.

PyQt5에서 GUI를 구성하는 컴포넌트 클래스명은 맨 앞에 모두 Q가 들어간다. 그 중 윈도우를 담당하는 클래스는 QMainWindow이다. 그리고 QMainWindow 안에 구성되는 컴포넌트들의 이름은 QWidget이다. 단순 텍스트를 표시하는 QLabel, 텍스트 입력창을 나타내는 QLineEdit, 버튼을 나타내는 QPushButton은 모두 QWidget의 자식클래스이다. 그리고 이 widget간의 정렬 및 배치를 결정하는 접시 역할을 하는 Q어쩌구Layout 클래스도 있다. QHBoxLayout은 addwidget()으로 추가된 widget을 가로 ('H'orizontal) 방향으로 배치하고, QVBoxLayout은 addwidget()으로 추가된 widget을 세로 ('V'ertical) 방향으로 배치한다. 그리고 각각의 배치된 widget에 대해 크기 조절을 할 수 있는 stretch 개념도 가지고 있다.

아래 그림은 UI 구조를 좀더 자세히 설명한 그림이다. 이 그림을 보면 대략 감이 올 것이고, 이 그림을 이해한 후 코드를 보면 술술 읽힐 것이다. 자세한 설명은 생략한다.

 

 

 

3.2 기능 구현

먼저 필수항목은 아니지만 개인적으로 자주쓰는 클래스 몇개를 가져왔다. 이것은 필요에 의해 개발중인 파이썬 기반 API 라이브러리 프로젝트에서 가져온 일부 코드이다. 별로 대단한 것은 아니다.

# debugmodule.py

class Log:
    def __init__(self):
        pass

    @staticmethod
    def _timestamp():
        import datetime
        return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    @staticmethod
    def _get_header(tag):
        return 'dbg_' + tag + ' ' + Log._timestamp() + '>>'

    @staticmethod
    def d(tag, *content):
        print(Log._get_header(tag), *content)

    @staticmethod
    def e(tag, *content):
        import sys
        print(Log._get_header(tag), *content, file=sys.stderr)


# basemodule.py

class Singleton(object):
    _instance = None

    def __new__(cls, *args, **kwargs):
        if not isinstance(cls._instance, Singleton):
            cls._instance = object.__new__(cls, *args, **kwargs)
        return cls._instance

 

Log클래스는 안드로이드의 Log 클래스와 비슷한 기능을 하며 모두 staticmethod이므로 특별한 객체생성없이 간단한 함수처럼 사용 가능하도록 구현했다. 그리고 측정데이터 파일 내용을 실시간 전송하는 기능인 CasEntryStreamer는 프로그램상 단 한개만 존재해야 메모리 누수 등의 문제가 발생하지 않으므로 Singleton 패턴을 사용하였고, 이를 위한 base class를 구현하였다.

그리고 한가지 더 있다면 CasEntry라는 클래스를 볼 수 있다. 우리 연구실의 광측정 장비는 빛을 1회 측정할 경우 수백가지의 광특성 데이터를 산출한다. 이렇게 1회 측정 시 발생하는 데이터는 1개의 파일로 저장되는데 CasEntry 클래스는 이 광특성 데이터 파일 1개에 대한 정보를 담을 수 있는 클래스이다. 이 클래스는 생성자의 인자로 파일의 절대경로를 문자열로 받는다.

 

 

이제 특정 디렉토리에 파일이 생성되는 것을 감지하는 기능을 구현해보자. 

# cas_streamer.py

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from entries.cas_entry import CasEntry
from basemodule import Singleton
from debugmodule import Log
import time


class MyEventHandler(FileSystemEventHandler):
    def __init__(self, observer, dirpath):
        self.observer = observer
        self.dirpath = dirpath
        self.tag = 'MyEventHandler'
        self.wait = 1
        self.retry = 10

    def on_created(self, event):
        # TODO: process when a file created in the selected directory
        if event.event_type == 'created':
            Log.d(self.tag, 'file creation detected. waiting for creating completed for %ds...' % self.wait)
            time.sleep(self.wait)

            entry = CasEntry('')
            for i in range(self.retry):
                entry = CasEntry(event.src_path)
                if entry.valid:
                    break
                else:
                    Log.e(self.tag, 'new file is not valid. retrying...(%d / %d)' % (i + 1, self.retry))
                    time.sleep(1)

            if not entry.valid:
                Log.e(self.tag, 'ISD file parsing error. streaming aborted.')
                return

            Log.d(self.tag, 'send %s ::' % event.src_path, str(entry.get_category())[:50], '...')
            # TODO: send cas entry
            self.send_entry(entry, mode='cas')

        else:
            pass

    def send_entry(self, entry, mode):
        pass


class CasEntryStreamer(Singleton):
    def __init__(self):
        self.observer = None
        self.is_streaming = False

    def set_observer(self, path=None):
        if path:
            self.remote_dirpath = path
        else:
            self.remote_dirpath = ''

        self.observer = Observer()
        event_handler = MyEventHandler(self.observer, self.remote_dirpath)
        self.observer.schedule(event_handler, self.remote_dirpath, recursive=True)

    def streaming_on(self):
        if not self.observer:
            self.set_observer(self.remote_dirpath)
        self.observer.start()
        self.is_streaming = True

    def streaming_off(self):
        self.observer.stop()
        self.observer = None
        self.is_streaming = False

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.streaming_off()

 

파일 생성 감지를 위해서는 Observer와 FileSystemEventHandler라는 클래스를 함께 이용해야 한다. 큰 흐름을 간단히 설명하자면 다음과 같다.

 

  1. 먼저 파일 생성이 감지되면 무슨 일을 할 것인지를 FileSystemEventHandler를 상속하여 정의해야한다. 본 포스팅에서는 FileSystemEventHandler의 자식 클래스인 MyEventHandler를 정의하였다.
  2. 파일이 생성되면 FileSystemEventHandler의 하위 메소드 중 on_created() 메소드가 콜백처럼 호출된다. 인자로는 event 객체가 전달되는데, event.event_type은 발생한 이벤트의 종류, event.src_path는 생성된 파일의 절대경로이다.
  3. on_created()는 파일 신규 생성 이외에 다른 이벤트에도 호출된다. 우리는 신규 생성 이벤트만 감지하고 싶으므로 event.event_type이 'created'인 경우에만 코드를 실행하도록 처리했다.
  4. 광측정장비는 측정에 다소 시간이 걸리기도 한다. 따라서 파일생성 감지 직후 미리 설정된 delay time이 지난 뒤 CasEntry로 객체화한다. 생성된 파일이 엉뚱한 형식일 경우 CasEntry 객체 내부 valid 플래그가 False로 set되므로, 올바른 파일형식이 아니면 미리 설정된 retry 횟수만큼 재시도를 한 후, 그래도 올바른 파일이 아니면 해당 파일 전송을 취소한다.

이러한 과정을 거쳐 선택된 파일은 json으로 직렬화되어 서버로 전송되는데, 이 부분이 또 독립된 send_entry() 메소드로 구현되었다. 상기 코드에서 send_entry() 메소드 부분을 다음과 같이 구현하였다.

 

# cas_streamer.py

...(생략)

    def send_entry(self, entry, mode):
        import requests
        post_data = entry.get_category(category=mode, str_key_type=True)
        response = None
        while not response:
            try:
                Log.d(self.tag, 'method: POST, url: http://xxx.xxx.xxx.xxx:xxxx/api/nl/witlab/cas/stream')
                Log.d(self.tag, 'body:', str(post_data)[:50], '...')
                response = requests.post('http://xxx.xxx.xxx.xxx:xxxx/api/nl/witlab/cas/stream', json=post_data)
                time.sleep(1)
            except Exception as e:
                Log.e(self.tag, 'http post error:', e.__class__.__name__)
                time.sleep(1)

        Log.d(self.tag, 'response: ', response.text)
        
...(생략)

 

send_entry()의 흐름은 다음과 같다.

  1. entry의 get_category()는 entry 객체의 내용을 dictionary 형태로 변환하는 것을 의미한다. 이 entry의 내용을 post_data에 담고, 이를 서버에 POST method의 body로 전달하는데, 서버 단에서도 이에 맞게 구현을 해주어야 한다. 이 내용도 포스팅을 할 기회가 있었으면 좋겠다.
  2. 전송에 실패하거나 서버 오류가 발생하면 프로그램이 뻗는 문제를 해결하기 위해 예외처리를 수행하고 예외가 발생할 때마다 재전송을 할 수 있도록 loop를 구성했다. 정상전송되면 응답 내용을 log로 찍고 끝난다.

마지막으로 실시간 전송 기능을 담당하는 CasEntryStreamer 클래스와 UI를 담당하는 CasStreamerFrame 클래스의 연동을 위해 CasStreamerFrame 클래스의 toggle_streaming() 메소드를 아래와 같이 구현한다.

from cas_streamer import CasEntryStreamer

...(생략)

class CasStreamerFrame(QMainWindow):
    def __init__(self, title):
    	self.streamer = CasEntryStreamer()
        
    	...(생략)
    
    def toggle_streaming(self):
        if self.streamer.is_streaming:
            # streaming off
            self.streamer.streaming_off()

            # change btn caption
            self.lbl_state.setText('스트리밍 상태: 멈춤')
            self.btn.setText('시작')

        else:
            try:
                # setup
                self.streamer.set_observer(self.ledt.text())

                # streaming on
                self.streamer.streaming_on()
            except Exception as e:
                Log.e(self.tag, 'directory path error! :', e.__class__.__name__)
                return

            # change btn caption
            self.lbl_state.setText('스트리밍 상태: 동작중')
            self.btn.setText('정지')

        Log.d(self.tag, self.lbl_state.text(), ',', self.streamer.remote_dirpath)

...(생략)

 

 

 

이렇게 구현은 끝이 났다. 설명하는 재주가 없어 조금 장황했을 수도 있지만 concept 자체는 간단하기 때문에 금방 이해할 수 있으리라 믿는다.

 

 

테스트는 자체적으로 해봤는데 잘 된다. 다만 측정환경이 외부망이고 서버는 학교 내부망에 존재하기 때문에, 실사용 테스트는 아직 해보지 않았다. 서버에 외부로 open된 SSH 포트를 통한 포트포워딩 등을 통해 로깅을 수행할 예정이다.

 

반응형