仮想通貨の自動取引 チャート作成

仮想通貨で勝手に金を稼ぎ出すロボットを作ります。まずは、チャートを作るプログラムを作りました。
BinanceのAPIを使って、価格情報を得ています。

f:id:ryuuseigo:20210410180226p:plain
BTC/USDT
from binance.client import Client
import time
import numpy as np
import pandas as pd
import datetime
from pytz import timezone
import mplfinance as mpf
from matplotlib.ticker import MultipleLocator

class ChartGen(object):
    def __init__(self, start_date):
        API_KEY = '****************************************'
        API_SECRET = '****************************************'
        self.client = Client(API_KEY, API_SECRET)
        self.start_date = start_date # start_dateから現在までのOHLCを取得
    
    def get_df(self):
        klines = self.client.get_historical_klines(symbol='BTCUSDT', interval='1h', start_str=self.start_date)
        # klinesの各要素のlistのカラム
        klines_columns = ['Open time',
                          'Open',
                          'High',
                          'Low',
                          'Close',
                          'Volume',
                          'Close time',
                          'Quote asset volume',
                          'Number of trades',
                          'Taker buy base asset volume',
                          'Taker buy quote asset volume',
                          'Can be ignored']
        klines_df = pd.DataFrame(index=[], columns=klines_columns)
        for i in range(len(klines)):
            # 取得したklineはint型とstring型が混在しているので、全部float型にする
            klines_f = [float(klines[i][j]) for j in range(len(klines[i]))]
            # binance APIのtimestampは13桁であるのに対し、pythonのUNIX時間は10桁であるので、biannceのUNIX時間を1000で割る
            klines_f[0] = datetime.datetime.fromtimestamp(klines_f[0]/1000, datetime.timezone.utc)
            # listをSeriesに変換してから、DateFrameにappendする
            kline_s = pd.Series(klines_f, index=klines_df.columns)
            klines_df = klines_df.append(kline_s, ignore_index=True)
        klines_df.set_index('Open time', inplace=True)
    
        df = klines_df.loc[:,['Open', 'High', 'Low', 'Close', 'Volume']]
        return df


    def get_ma(self, data, n):
        # 単純移動平均の計算
        # nは移動平均の期間(4時間足でn=6なら期間は1日となる)
        
        ma = np.zeros(len(data))
        for i in range(len(data)):
            if i < n-1:
                ma[i] = np.nan # n個のデータが集まるまでは、np.nan
            else:
                ma[i] =  np.mean(data[i-(n-1):i+1])
        return ma
    
    
    def get_ema(self, data, n):
        # 指数移動平均の計算
        # nは移動平均の期間(4時間足でn=6なら期間は1日となる)
        # 先頭から連続するnan以外の欠損値がある場合、計算不可となるから注意
        
        nan_count = np.count_nonzero(np.isnan(data)) # 先頭からのnanの数を予め数えておく
        ema = np.zeros(len(data))
        alpha = 2 / (n + 1)
        for i in range(len(data)):
            if i < nan_count+n-1:
                ema[i] = np.nan # n個のデータが集まるまでは、np.nan
            elif i == nan_count+n:
                ema[i] = np.mean(data[i-(n-1):i+1])
            else:
                ema[i] = ema[i-1] + alpha * (data[i] - ema[i-1])
        return ema
    
    
    def get_bb(self, df, n):
        # ボリジャーバンドの計算
        # 単純移動平均から標準偏差を計算する
        
        ma = np.zeros(len(df))
        bb_std = np.zeros(len(df))
        for i in range(len(df)):
            if i < n-1:
                ma[i] = np.nan
                bb_std[i] = np.nan
            else:
                ma[i] =  np.mean(df['Close'][i-(n-1):i+1])
                bb_std[i] = np.std(df['Close'][i-(n-1):i+1])
                
        bb_dict = {'BB_2sigma_upper': ma + 2*bb_std,
                   'BB_2sigma_lower': ma - 2*bb_std,
                   'BB_3sigma_upper': ma + 3*bb_std,
                   'BB_3sigma_lower': ma - 3*bb_std}
        return bb_dict
    
    
    def get_rsi(self, df, n):
        # RSI(Relative Strength Index)の計算
        
        # diffは終値の差分
        diff = np.zeros((len(df)-1, 2)) # 差分なので0番目の要素は計算されないので要素数-1、さらに上昇と下降で分ける
        for i in range(len(df)-1):
            diff_ = df['Close'][i+1] - df['Close'][i]
            if diff_ > 0:
                diff[i,0] = diff_
            else:
                diff[i,1] = diff_

        diff_ma = [self.get_ma(diff[:,0], n), np.abs(self.get_ma(diff[:,1], n))]
        
        RSI = 100 * diff_ma[0] / (diff_ma[0] + diff_ma[1])
        RSI = np.insert(RSI, 0, np.nan) #要素数が足りないので、先頭にNanを追加
        return RSI


    def get_dmi(self, df):
        # DMI(Directional Movement Index)の計算
        # 現在の相場がボックスかトレンドかの判断に有効な指標
        # +DIは上昇の強さ, -DIは下降の強さ、ADXはトレンドの強さを表す
        # ADXがADX-Rを下から上抜いた時 -> トレンドが強い
        # ADXがADX-Rを上から下抜いた時 -> トレンドが弱い
        
        # +DM, -DMの計算
        pDM = np.zeros(len(df))
        nDM = np.zeros(len(df))
        for i in range(len(df)):
            if i == 0:
                pDM[i] = np.nan # 前日との差分からDMを計算するので先頭はnan
                nDM[i] = np.nan
            else:
                # +DM = 当日の高値 - 前日の高値 ただし、+DM<0の場合は0
                # -DM = 前日の安値 - 当日の安値 ただし、-DM<0の場合は0
                pDM[i] = np.max([df['High'][i] - df['High'][i-1], 0])
                nDM[i] = np.max([df['Low'][i-1] - df['Low'][i], 0])
    
                # 含み足の場合は+DMと-DMそれぞれで0以上の値となる為、値の大きい方を残して他方を0にする
                if nDM[i] >= pDM[i]:
                    pDM[i] = 0
                elif pDM[i] >= nDM[i]:
                    nDM[i] = 0

        # TRの計算
        TR = np.zeros(len(df))
        for i in range(len(df)):
            if i == 0:
                TR[i] = np.nan # 計算に前日のデータを使用するので先頭はnan
            else:
                # (当日の高値 - 当日の安値), (当日の高値 - 前日の終値), (前日の終値 - 当日の安値)
                # 上記の値で最大となるものをTRとする
                TR[i] = np.max([np.abs(df['High'][i]-df['Low'][i]),
                                np.abs(df['High'][i]-df['Close'][i-1]),
                                np.abs(df['Close'][i-1]-df['Low'][i])])

        # +DI, -DIの計算
        # 指数移動平均を使う(関数を使おうとすると逆に複雑なので、そのまま計算)
        pDI = np.zeros(len(df))
        nDI = np.zeros(len(df))
        n = 14 # 移動平均の期間(ワイルダーは期間14を推奨している)
        alpha = 2 / (n +1)
        for i in range(len(df)):
            if i < n-1:
                pDI[i] = np.nan
                nDI[i] = np.nan
            elif i == n:
                pDI[i] = (np.mean(pDM[i-(n-1):i+1]) / np.mean(TR[i-(n-1):i+1])) * 100
                nDI[i] = (np.mean(nDM[i-(n-1):i+1]) / np.mean(TR[i-(n-1):i+1])) * 100
            else:
                pDI[i] =  pDI[i-1] + alpha * (((pDM[i] / TR[i])*100) - pDI[i-1])
                nDI[i] =  nDI[i-1] + alpha * (((nDM[i] / TR[i])*100) - nDI[i-1])

        # ADXの計算
        DX = np.abs(pDI - nDI) / (pDI + nDI) * 100
        
        # DXの指数移動平均でADXを計算
        n_adx = 14 # ADXにおける移動平均の期間はDIとは異なる期間を用いることができる(ワイルダーはDIの計算に使った期間と同じ期間を推奨している)
        ADX = self.get_ema(DX, n_adx)
        # ADX-Rの計算
        n_adx_r = 26 #ADX-Rの移動平均に用いる期間はADXよりも長くする
        ADX_R = self.get_ema(DX, n_adx_r)
        
        DMI_dict = {'+DI': pDI,
                    '-DI': nDI,
                    'ADX': ADX,
                    'ADX-R': ADX_R}
        return DMI_dict


    def get_macd(self, df):
        # MACDの計算
        # MACDは価格の変化に敏感
        # MACDシグナルは価格の変化に鈍感
        # MACDがMACDシグナルを下から上抜いた時 -> 買い
        # MACDがMACDシグナルを上から下抜いた時 -> 売り
        
        # MACDは期間12と期間26の指数移動平均の差分
        ema_12 = self.get_ema(df['Close'], 12)
        ema_26 = self.get_ema(df['Close'], 26)
        MACD = ema_12 - ema_26

        # シグナルの計算
        # シグナルはMACDの期間9の指数移動平均で計算する
        signal = self.get_ema(MACD, 9)
        
        # MACDとシグナルの差を計算
        bar = np.zeros((len(MACD), 2))
        for i in range(len(MACD)):
            diff = MACD[i] - signal[i]
            if diff >= 0:
                bar[i,0] = diff
                bar[i,1] = np.nan
            elif diff < 0:
                bar[i,1] = diff
                bar[i,0] = np.nan
            else:
                bar[i,:] = np.nan
        
        MACD_dict = {'MACD': MACD,
                     'signal': signal,
                     'bar_high': bar[:,0],
                     'bar_low': bar[:,1]}
        return MACD_dict
    
    
    def get_signal(self, df):
        # RSI>70% かつ 2sigma を超えたときは買われすぎ
        # RSI<30% かつ -2sigma を超えたときは売られすぎ
        # ボックス相場で有効な指標
        # ADX < 40% でボックス相場と判断する
        signal = np.zeros((len(df), 2))
        for i in range(len(df)):
            if df['Close'][i] > df['BB_2sigma_upper'][i] and df['RSI'][i] > 70 and df['ADX'][i] < 40:
                signal[i,0] = df['Close'][i] * 1.01
                signal[i,1] = np.nan
            elif df['Close'][i] < df['BB_2sigma_lower'][i] and df['RSI'][i] < 30 and df['ADX'][i] < 40:
                signal[i,1] = df['Close'][i] * 0.99
                signal[i,0] = np.nan
            else:
                signal[i,:] = np.nan

        signal_dict = {'high': signal[:,0],
                       'low': signal[:,1]}
        return signal_dict


    def add_indicator(self, df):
        # データフレームに指標を追加
        df['MA'] = self.get_ma(df['Close'], 6)
        df['EMA'] = self.get_ema(df['Close'], 6)
        
        bb = self.get_bb(df, 6)
        df['BB_2sigma_upper'] = bb['BB_2sigma_upper']
        df['BB_2sigma_lower'] = bb['BB_2sigma_lower']
        df['BB_3sigma_upper'] = bb['BB_3sigma_upper']
        df['BB_3sigma_lower'] = bb['BB_3sigma_lower']
        
        df['RSI'] = self.get_rsi(df, 6)
        df['RSI_70'] = np.full(len(df), 70)
        df['RSI_30'] = np.full(len(df), 30)
        
        DMI = self.get_dmi(df)
        df['+DI'] = DMI['+DI']
        df['-DI'] = DMI['-DI']
        df['ADX'] = DMI['ADX']
        df['ADX-R'] = DMI['ADX-R']
        df['ADX_20'] = np.full(len(df), 20) # 20%<ADX<30%のラインを書く
        df['ADX_30'] = np.full(len(df), 30)
        
        MACD = self.get_macd(df)
        df['MACD'] = MACD['MACD']
        df['MACD_signal'] = MACD['signal']
        df['MACD_bar_high'] = MACD['bar_high']
        df['MACD_bar_low'] = MACD['bar_low']
        
        signal = self.get_signal(df)
        df['signal_high'] = signal['high']
        df['signal_low'] = signal['low']
        return df


    def draw_fig(self):
        df = self.get_df()
        df = self.add_indicator(df)

        fig = mpf.figure(figsize=(12,10),style='binance', tight_layout=True)

        heights = [2,1,1,1,1,1] # fig.add_gridspecで指定する各axの高さの比率
        gs = fig.add_gridspec(6,1, height_ratios=heights) # 3行1列のgridspecを作成

        ax1 = fig.add_subplot(gs[0,0]) # 1行1列にプロット
        ax2 = fig.add_subplot(gs[1,0], sharex=ax1) # 2行1列目にプロットし、sharex=ax1でax1とx軸を共通化
        ax3 = fig.add_subplot(gs[2,0], sharex=ax1)
        ax4 = fig.add_subplot(gs[3,0], sharex=ax1)
        ax5 = fig.add_subplot(gs[4,0], sharex=ax1)
        ax6 = fig.add_subplot(gs[5,0], sharex=ax1)

        adps = [mpf.make_addplot(df['MA'], ax=ax1, color='blue', width=0.5),
                mpf.make_addplot(df['EMA'], ax=ax1, color='red', width=0.5),
                mpf.make_addplot(df['BB_2sigma_upper'], ax=ax1, linestyle='dashdot', color='magenta', width=0.5),
                mpf.make_addplot(df['BB_2sigma_lower'], ax=ax1, linestyle='dashdot', color='cyan', width=0.5),
                mpf.make_addplot(df['BB_3sigma_upper'], ax=ax1, linestyle='dashdot', color='red', width=0.5),
                mpf.make_addplot(df['BB_3sigma_lower'], ax=ax1, linestyle='dashdot', color='blue', width=0.5),
                mpf.make_addplot(df['signal_high'], ax=ax1, type='scatter', markersize=50,marker='v', color='magenta'),
                mpf.make_addplot(df['signal_low'], ax=ax1, type='scatter', markersize=50,marker='^', color='cyan'),                 
                mpf.make_addplot(df['RSI'], ax=ax3, color='green', ylabel='RSI'),
                mpf.make_addplot(df['RSI_70'], ax=ax3, color='red', width=0.5),
                mpf.make_addplot(df['RSI_30'], ax=ax3, color='blue', width=0.5),
                mpf.make_addplot(df['+DI'], ax=ax4, color='green', ylabel='DMI'),
                mpf.make_addplot(df['-DI'], ax=ax4, color='red'),
                mpf.make_addplot(df['ADX'], ax=ax5, color='red', ylabel='ADX'),
                mpf.make_addplot(df['ADX-R'], ax=ax5, color='blue', ylabel='ADX'),
                mpf.make_addplot(df['ADX_20'], ax=ax5, color='gray', width=0.5),
                mpf.make_addplot(df['ADX_30'], ax=ax5, color='gray', width=0.5),
                mpf.make_addplot(df['MACD'], ax=ax6, color='red', ylabel='MACD'),
                mpf.make_addplot(df['MACD_signal'], ax=ax6, color='blue'),
                mpf.make_addplot(df['MACD_bar_high'], type='bar', ax=ax6, color='green'),
                mpf.make_addplot(df['MACD_bar_low'], type='bar', ax=ax6, color='red')]

        vlines = ['2021-04-03 00:00:00']
        hlines = [57941, 58270, 56830]

        mpf.plot(df,ax=ax1,
                 type='candle',
                 ylabel='BTC/USDT',
                 volume=ax2,
                 addplot=adps,
                 vlines=dict(vlines=vlines, colors='gray', linewidths=[0.1]),
                 hlines=dict(hlines=hlines, colors='gray', linewidths=[0.1]),
                 returnfig=True)

        ax1.tick_params(labelbottom=False) # x軸のラベルを表示しない
        ax2.tick_params(labelbottom=False)
        ax3.tick_params(labelbottom=False)
        ax4.tick_params(labelbottom=False)
        ax5.tick_params(labelbottom=False)
        ax6.tick_params(axis='x', labelrotation=90) # x軸のラベルを90度回転
        ax4_legend=['+DI', '-DI']
        ax5_legend=['ADX', 'ADX-R']
        ax6_legend=['MACD', 'signal']
        ax4.legend(ax4_legend, loc='upper left')
        ax5.legend(ax5_legend, loc='upper left')
        ax6.legend(ax6_legend, loc='upper left')

        fig.align_labels([ax1, ax2, ax3, ax4, ax5, ax6])
        
        ax1.xaxis.set_major_locator(MultipleLocator(6))
        ax1.xaxis.set_minor_locator(MultipleLocator(2))
        ax1.grid(True, which='major', linestyle='-')
        ax1.grid(True, which='minor', linestyle='--')
        ax2.grid(True, which='major', linestyle='-')
        ax2.grid(True, which='minor', linestyle='--')
        ax3.grid(True, which='major', linestyle='-')
        ax3.grid(True, which='minor', linestyle='--')
        ax4.grid(True, which='major', linestyle='-')
        ax4.grid(True, which='minor', linestyle='--')
        ax5.grid(True, which='major', linestyle='-')
        ax5.grid(True, which='minor', linestyle='--')
        ax6.grid(True, which='major', linestyle='-')
        ax6.grid(True, which='minor', linestyle='--')        

        mpf.show()
        fig.savefig('chart.png', dpi=300)

if __name__ == "__main__":
    
    start_date = '2021-04-02 00:00:00'
    chart = ChartGen(start_date)
    chart.draw_fig()