はじめに

弊社では全国にエッジ端末を展開して、それに接続された蓄電池のデータ集計や不具合への対処を行っております。
これまで、接続された機器の管理画面には、通常エンジニアがVPNに接続してエッジ端末に入ってローカルポートフォワードを実行することでアクセスしていました。ただ、毎度エンジニアがミーティングに参加して設定の手伝いをするのは時間効率が悪く、非エンジニアでも安全に管理画面にアクセスできるようにするためAWS IoT Greengrassとリモートポートフォワーディングを組み合わせた安全な中継システムを実装しました。

システムアーキテクチャ

構成図

本システムは、Greengrassコンポーネントと中央サーバーの2つの主要部分で構成されています。中央サーバーは3つのコンテナで構成され、それぞれForwardingWeb(一覧画面)、ForwardingApp、ForwardingProxy(Apache + SSH)の役割を担います。
ForwardingProxyコンテナは、コアデバイスからのSSH接続を受け付け、デバイスごとに一意のポートを割り当てます。このポートはApacheのリバースプロキシによって内部的に振り分けられます。

コンポーネントの実装

コンポーネントの役割はエッジデバイス内の特定のポート宛のパケットをSmartLoggerの443ポートに転送することと、エッジデバイス内の特定のポート宛のパケットを、SSHを介して中央サーバーに転送して中央サーバ内からlocalhost:portでアクセスできるようにすることの2つです。

DNATルールを設定して443ポートへパケットを転送するための関数

async def set_dnat_rule(self):
   """DNATルールを設定"""
   try:
       # 既存のルールを削除(エラーは無視)
       remove_cmd = [
           'iptables',
           '-t', 'nat',
           '-D', 'OUTPUT',
           '-p', 'tcp',
           '--dport', LOCAL_BIND_PORT,
           '-j', 'DNAT',
           '--to-destination', f'{self.target_ip}:{self.target_port}'
       ]
       
       process = await asyncio.create_subprocess_exec(
           *remove_cmd,
           stdout=subprocess.PIPE,
           stderr=subprocess.PIPE
       )
       await process.communicate()

       # 新しいルールを追加
       add_cmd = [
           'iptables',
           '-t', 'nat',
           '-I', 'OUTPUT',
           '-p', 'tcp',
           '--dport', LOCAL_BIND_PORT,
           '-j', 'DNAT',
           '--to-destination', f'{self.target_ip}:{self.target_port}'
       ]
       
       process = await asyncio.create_subprocess_exec(
           *add_cmd,
           stdout=subprocess.PIPE,
           stderr=subprocess.PIPE
       )
       
       stdout, stderr = await process.communicate()
       if process.returncode != 0:
           raise Exception(f"iptables command failed: {stderr.decode()}")
       
       Logger.info("DNAT rule set successfully")
       return True

   except Exception as e:
       Logger.error(f"Failed to set DNAT rule: {e}")
       return False

Autosshを利用して中央サーバへリモートポートフォワードを開始する関数

async def start_tunnel(self, timeout_seconds: int = 30):
    try:
        # 鍵はpayloadからbase64デコードしてファイルに保存
        public_key  = base64.b64decode(self.public_key).decode()
        private_key = base64.b64decode(self.private_key).decode()
        with open(self.pubkey_path,"w") as f:
          f.write(public_key)
        with open(self.prvkey_path,"w") as f:
          f.write(private_key)

        os.chmod(self.pubkey_path,0o600)
        os.chmod(self.prvkey_path,0o600)

        start_at   = datetime.datetime.now(TIME_ZONE)
        expires_in = start_at + datetime.timedelta(seconds=timeout_seconds)

        start_at   = start_at.strftime("%Y/%m/%d %H:%M:%S")
        expires_in = expires_in.strftime("%Y/%m/%d %H:%M:%S")

        command = [
            'timeout',
            f'{self.ssh_timeout}s',
            'autossh',
            '-tt',
            '-M', '0',
            '-i', self.prvkey_path, 
            '-v', 
            '-o', 'StrictHostKeyChecking=no',  
            '-o', 'ServerAliveInterval=30', 
            '-o', 'ServerAliveCountMax=3',
            '-o', 'UserKnownHostsFile=/dev/null', 
            '-R', f'{self.remote_bind_port}:{self.target_ip}:{self.target_port}', # よくよく考えたらDNAT不要なので、直接ターゲットのIP/PORTで転送
            '-p', f'{self.ssh_port}',
            f'{self.ssh_user}@{self.ssh_host}',
            f'"sleep {self.ssh_timeout} && echo FORWARDING::{THING_NAME}::{self.remote_bind_port}::{self.target_ip}::{self.target_port}::{start_at}::{expires_in}::BRIDGE"'
        ]

        Logger.info(f"Starting SSH tunnel: {' '.join(command)}")
        self.process = await asyncio.create_subprocess_shell(
            " ".join(command),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )

        await asyncio.sleep(2)
        
        if self.process.returncode is None or self.process.returncode == 0:
            Logger.info("SSH tunnel process started successfully")
            return True, asyncio.ensure_future(self._monitor_process()), None
        else:
            error_msg = f"SSH tunnel failed to start with return code: {self.process.returncode}"
            Logger.error(error_msg)
            Logger.error(self.process.stderr)
            return False, None, error_msg

    except Exception as e:
        error_msg = f"Failed to start tunnel: {e}"
        Logger.error(error_msg)
        return False, None, error_msg

パケットの転送は管理画面から開始ボタンが押下されて特定のトピック(例えば “/portforward/{thing-name}/start” )にメッセージがパブリッシュされたことをコンポーネント側が検知すると開始するようになっています。
また、timeoutコマンドを利用することで制限時間になったら強制的にSSH接続が切断されるようにしています(ウェブサイトから切断ボタンを押しても実際には切断できていなかった、みたいなことがあるとだいぶ危険)

サーバー側の設定

中央サーバでは管理画面を構成するのみではなくどのエッジ端末にどのポートに割り振るのか、動的に処理しています。中央サーバーは8080-8089の10個のポートを内部的に管理し、各接続リクエストに対して以下のロジックで割り当てを行います。

  1. 同一デバイスからの接続要求の場合、既存の割り当てポートを再利用
  2. 未使用ポートがある場合、新規に割り当て
  3. 期限切れポートの自動解放(SSH_TIMEOUTに基づく)
bind_port = nil
    bind_ports = Rails.cache.fetch("remote_bind_ports")
    if bind_ports
      bind_ports.each do |key,vals|
        # 既存のポートで期限切れのものを一旦リセット
        if vals["expired_at"] && vals["expired_at"] < Time.current.to_i
          bind_ports[key] = {
            "location_code" => nil,
            "thing_name"    => nil,
            "target_ip"     => nil,
            "expired_at"    => nil,
            "updated_at"    => nil
          }
        end
      end
      # すでに同じthing,targetに対して転送中の場合は引き続きそれを指定
      bind_port,vals = bind_ports.select {|key,vals| vals["thing_name"] == device.thing_name && vals["target_ip"] == target_ip }.first
      bind_port,vals = bind_ports.select{|k,v| !v["expired_at"]}.first if !bind_port
    else
      bind_ports = self.gen_default_bind_ports
      bind_port  = "8080"

開発環境でのテスト

無事にユーザ認証でログインしてから画面転送を開始してSmartLoggerの管理画面にアクセスできました。