用Python 抓取股價歷史資料|股票量化交易從零開始(四)

上一篇文章裡,我們已經教你,該如何進行元富證券「下單」跟「行情」API 的認證。

在本章節裡,我們將會教你如何使用元富 API 抓取歷史的成交資料,以及劃出 K 線圖。

立即訂閱電子報,掌握最新資訊!

    電子郵件

    有興趣的主題
    量化交易金融知識台灣股市國內期貨海外期貨虛擬貨幣

    有興趣的量化交易軟體/平台
    不清楚MultiChartsTradingViewPythonXQMT4MT5

    還有什麼詢問的?

    【早鳥報名中】元富證券 X 量化通 模擬量化交易競賽

    好富投 1920x400
    好富投 978x258

    點我了解更多資訊


    Python 抓取股價歷史資料步驟

    步驟一:下載技術指標套件

    首先請到元富 API 專區的網頁,並且在「下載專區」的「技術指標」欄位中,下載「Python」 的技術指標套件。

    請注意,因為目前技術指標的套件,只支援 32 bit 的 Python,所以我們需要先額外設定環境。

    Py 101209161710
    Py 101209161711

    步驟二:環境建置

    套件下載並解壓縮後,將檔案放到我們的工作目錄底下,並用 VS Code 開啟,記得在左上角先打開 Terminal,因為稍後要輸入指令。

    元富python 3

    上面步驟完成後,接著先切換到目錄底下: cd .\Python_tech_analysis\

    再來需要輸入以下指令:

    • 查看可安裝 Python 版本: pyenv install -l
    • 安裝特定 Python 版本: pyenv install 3.8.10-win32
    • 使用3.8.10-win32做為這個專案資料夾的python 版本:pyenv local 3.8.10-win32
    • 確認一下現在的版本:
    元富python 3
    • 接下來創建 venv 環境: python -m venv venv
    • 啟用 venv 虛擬環境: venv\\Scripts\\activate
    元富python 2

    指令輸入完成後,這時我們左邊的目錄應該會跟下圖所示一樣:  

    元富python 3

    接著找到「venv」的資料夾,會看到同一資料夾底下有「example.py」跟一個「.whl」檔案,接著請依照下列指示安裝套件:

    • 安裝套件: pip install .\\tech_analysis_api_v2-0.0.3-py3-none-win32.whl
    元富python 2

    套件安裝完成後,這樣我們就完成環境的建置。

    步驟三:資料抓取測試

    完成環境建置後,接著我們要使用官方的範例,來測試資料的抓取。

    第一步開啟「example.py」檔案,這一部分也可以參考元富的官網說明

    接著在「main function」內,修改自己的帳號密碼(黃色塗鴉處)。

    元富python 3

    並且將「股票代號 2330」後面的日期,改成昨天儲存之後,執行此程式。如果這時候你在「Terminal」看到如下圖的畫面,這樣就表示執行成功。

    元富python 3

    Python 抓取多筆歷史資料&劃K線

    接下來的步驟,我們會使用筆者所撰寫的程式,透過元富 API 撈取台積電的股價資料,並繪製成 K 線圖。

    如果這部分看不懂的話,也可以直接使用文章最後的完整程式碼,只要複製貼上,並且儲存後執行即可。

    首先我們需要額外安裝一些 Python 套件:

    • 升級pip: python3 -m pip install –upgrade pip
    • 可能會有相容性問題所以需要手動安裝Pillow這個套件:pip install –only-binary Pillow Pillow
    • 安裝需要的套件: pip install numpy mplfinance

    註:「numpy」是 python 做資料處理時,很常用且重要的套件;「mplfinance」則是方便我們繪圖用的套件。

    套件安裝完成後,接下來我們在同一個資料夾裡,新增一個新的檔案,至於檔案名稱則隨意,像我是命名為「historyDataAndPlot.py」並貼入下列程式碼:

    from tech_analysis_api_v2.api import TechAnalysis
    from tech_analysis_api_v2.model import *
    
    
    import threading
    import pandas as pd
    from datetime import datetime, timedelta
    import mplfinance as mpf
    
    
    def OnDigitalSSOEvent(aIsOK, aMsg):
        print(f'OnDigitalSSOEvent: {aIsOK} {aMsg}')
    
    
    def OnTAConnStuEvent(aIsOK):
        print(f'OnTAConnStuEvent: {aIsOK}')
        if aIsOK:
            event.set()
    
    
    def OnUpdate(ta_Type: eTA_Type, aResultPre, aResultLast):
       
        if aResultPre != None:
            if ta_Type == eTA_Type.SMA:
                print(f'前K {str(aResultPre)}')
            if ta_Type == eTA_Type.EMA:
                print(f'前K {str(aResultPre)}')
            if ta_Type == eTA_Type.WMA:
                print(f'前K {str(aResultPre)}')
            if ta_Type == eTA_Type.SAR:
                print(f'前K {str(aResultPre)}')
            if ta_Type == eTA_Type.RSI:
                print(f'前K {str(aResultPre)}')
            if ta_Type == eTA_Type.MACD:
                print(f'前K {str(aResultPre)}')
            if ta_Type == eTA_Type.KD:
                print(f'前K {str(aResultPre)}')
            if ta_Type == eTA_Type.CDP:
                print(f'前K {str(aResultPre)}')
        if aResultLast != None:
            if ta_Type == eTA_Type.SMA:
                print(f'最新 Time:{aResultLast.KBar.TimeSn_Dply}, SMA:{aResultLast.Value}')
            if ta_Type == eTA_Type.EMA:
                print(f'最新 Time:{aResultLast.KBar.TimeSn_Dply}, EMA:{aResultLast.Value}')
            if ta_Type == eTA_Type.WMA:
                print(f'最新 Time:{aResultLast.KBar.TimeSn_Dply}, EMA:{aResultLast.Value}')
            if ta_Type == eTA_Type.SAR:
                print(f'最新 Time:{aResultLast.KBar.TimeSn_Dply}, SAR:{aResultLast.SAR}, EPh:{aResultLast.EPh}, EPl:{aResultLast.EPl}, AF:{aResultLast.AF}, RaiseFall:{aResultLast.RaiseFall}')
            if ta_Type == eTA_Type.RSI:
                print(f'最新 Time:{aResultLast.KBar.TimeSn_Dply}, RSI:{aResultLast.RSI}, UpDn:{aResultLast.UpDn}, UpAvg:{aResultLast.UpAvg}, DnAvg:{aResultLast.DnAvg}')
            if ta_Type == eTA_Type.MACD:
                print(f'最新 Time:{aResultLast.KBar.TimeSn_Dply}, DIF:{aResultLast.DIF}, OSC:{aResultLast.OSC}')
            if ta_Type == eTA_Type.KD:
                print(f'最新 Time:{aResultLast.KBar.TimeSn_Dply}, K:{aResultLast.K}, D:{aResultLast.D}')
            if ta_Type == eTA_Type.CDP:
                print(f'最新 Time:{aResultLast.KBar.TimeSn_Dply}, CDP:{aResultLast.CDP}, AH:{aResultLast.AH}, NH:{aResultLast.NH}, AL:{aResultLast.AL}, NL:{aResultLast.NL}')
           
    def OnRcvDone(ta_Type: eTA_Type, aResult):
        if ta_Type == eTA_Type.SMA:
            for x in aResult:
                print(f'回補 {x}')
        if ta_Type == eTA_Type.EMA:
            for x in aResult:
                print(f'回補 {x}')
        if ta_Type == eTA_Type.WMA:
            for x in aResult:
                print(f'回補 {x}')
        if ta_Type == eTA_Type.SAR:
            for x in aResult:
                print(f'回補 {x}')
        if ta_Type == eTA_Type.RSI:
            for x in aResult:
                print(f'回補 {x}')
        if ta_Type == eTA_Type.MACD:
            for x in aResult:
                print(f'回補 {x}')
        if ta_Type == eTA_Type.KD:
            for x in aResult:
                print(f'回補 {x}')
        if ta_Type == eTA_Type.CDP:
            for x in aResult:
                print(f'回補 {x}')
           
    def option():
        ProdID = input("商品代號: ")
        SNK = input("分K(1/3/5): ")
        STA_Type = input("指標(SMA/EMA/WMA/SAR/RSI/MACD/KD/CDP): ")
        DateBegin = input("日期(ex: 20230619): ")
    
    
        NK = eNK_Kind.K_1m
        if SNK == '1':
            NK = eNK_Kind.K_1m
        elif SNK == '3':
            NK = eNK_Kind.K_3m
        elif SNK == '5':
            NK = eNK_Kind.K_5m
    
    
        TA_Type = eTA_Type.SMA
        if STA_Type == 'SMA':
            TA_Type = eTA_Type.SMA
        elif STA_Type == 'EMA':
            TA_Type = eTA_Type.EMA
        elif STA_Type == 'WMA':
            TA_Type = eTA_Type.WMA
        elif STA_Type == 'SAR':
            TA_Type = eTA_Type.SAR
        elif STA_Type == 'RSI':
            TA_Type = eTA_Type.RSI
        elif STA_Type == 'MACD':
            TA_Type = eTA_Type.MACD
        elif STA_Type == 'KD':
            TA_Type = eTA_Type.KD
        elif STA_Type == 'CDP':
            TA_Type = eTA_Type.CDP
    
    
        return TechAnalysis.get_k_setting(ProdID, TA_Type, NK, DateBegin)
    
    
    event = threading.Event()
    
    
    def fetch_historical_data(prod_id='2330', date='20231115'):
        """
        將帳號密碼取代為你的帳號密碼,
        填入商品代號與日期,抓取指定日期的歷史成交資料。
        """
        ta = TechAnalysis(OnDigitalSSOEvent, OnTAConnStuEvent, OnUpdate, OnRcvDone)
        ta.Login('{{你的身分證字號}}', '{{你的密碼}}')  # Replace with your credentials
        event.wait()
        lsBS, sErrMsg = ta.GetHisBS_Stock(prod_id, date)
        if sErrMsg:
            print(f"Error: {sErrMsg}")
            return None
       
        # Converting data to DataFrame
        data = [{
            'ProdID': x.Prod,
            'Match_Time': x.Match_Time,
            'Match_Price': x.Match_Price,
            'Match_Quantity': x.Match_Quantity,
            'Is_TryMatch': x.Is_TryMatch,
            'BS': x.BS
        } for x in lsBS]
    
    
        df = pd.DataFrame(data)
        # 應用轉換
        df['Formatted_Time'] = df['Match_Time'].apply(convert_time)
        return df
    
    
    
    
    
    
    def convert_time(time_val):
        # 先轉成 String比較好處理 因為資料是回復一串數字
        time_str = str(time_val)
         
        time_str = time_str[:6]  
        if '.' in time_str:
            time_str = time_str.replace('.', '')
            time_str = '0' + time_str
    
    
        # 解析時間 小時、分鐘、秒
        hours = int(time_str[:2])
        minutes = int(time_str[2:4])
        seconds = int(time_str[4:6])
        # 返回datetime物件
        return datetime(2023, 11, 15, hours, minutes, seconds)
    
    
    def aggregate_to_ohlc(df, time_frame='5T'):
        """
        將給定的DataFrame轉換為指定時間幀的OHLC數據,並使用最後一個有效時間點的數據填充空白時間段。
    
    
        參數:
        df -- 原始的DataFrame。
        time_frame -- 要聚合的時間幀,默認為'5T'(五分鐘)。
    
    
        返回:
        轉換後的DataFrame,包含OHLC和總交易量。
        """
        df['Formatted_Time'] = pd.to_datetime(df['Formatted_Time'])
        df.set_index('Formatted_Time', inplace=True)
    
    
        # 定義聚合成OHLC的規則
        ohlc_dict = {
            'Match_Price': 'ohlc',
            'Match_Quantity': 'sum'
        }
    
    
        # 聚合資料
        df_ohlc = df.resample(time_frame).apply(ohlc_dict)
        df_ohlc.columns = df_ohlc.columns.droplevel(0)  # 移除多级列名
    
    
        # 13:25 並不會有交易,所以使用前一筆的資料填補
        df_ohlc.fillna(method='pad', inplace=True)
    
    
        return df_ohlc
    
    
    
    
    def main():
        df = fetch_historical_data()
        if df is not None:
            df_ohlc = aggregate_to_ohlc(df)
            print("---------印出OHLC資料---------")
            print(df_ohlc)
        if df_ohlc is not None:
            # 確保你的 DataFrame 索引是日期時間型別(datetime)
            df_ohlc.index = pd.to_datetime(df_ohlc.index)
            # 交易量欄位重新命名為 volume (mplfinance 預設欄位名稱)
            df_ohlc.rename(columns={'Match_Quantity': 'volume'}, inplace=True)
    
    
            mpf.plot(df_ohlc, type='candle', style='charles',
                    title='2330', volume=True)
    main()
    
    

    其實前面大部分的程式碼,都跟之前的範例檔案相同,但在這裡我們重寫抓取歷史資料的部分(fetch_historical_data),並且將其資料整理(aggregate_to_ohlc)成可以畫成 K 線圖的格式(須要有OHLC資料),然後在輸出成圖表(mpf.plot)。

    所以在「terminal」那邊輸入以下執行程式時,你應該會看到如下圖的輸出:

    python historyDataAndPlot.py
    元富python 4

    這樣就代表我們成功抓取資料,並且畫出 K 線圖囉,那麼本章就介紹到這邊!

      電子郵件

      有興趣的主題
      量化交易金融知識台灣股市國內期貨海外期貨虛擬貨幣

      有興趣的量化交易軟體/平台
      不清楚MultiChartsTradingViewPythonXQMT4MT5

      還有什麼詢問的?


      量化通粉絲社群,一起討論程式交易!

      加入LINE社群量化交易討論群」無壓力討論與分享!

      加入臉書社團「程式交易 Taiwan」即時獲取實用的資源!

      量化通
      量化通

      量化通是個致力於全民量化金融教育的社群,我們希望透過由淺入深的內容,帶領大家以正確觀念來實踐自動化的金融投資研究分析。

      文章: 182

      發佈留言

      發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *