import base64
import hmac
import random
import struct
import time
from hashlib import sha256
from collections import OrderedDict

VERSION = "001"
VERSION_LENGTH = 3
APP_ID_LENGTH = 24

PrivPublishStream = 0

# not exported, do not use directly
privPublishAudioStream = 1
privPublishVideoStream = 2
privPublishDataStream = 3

PrivSubscribeStream = 4


class VolcEngineAccessToken:
    """Access token that can be serialized to, and parsed from, a string.

    The serialized form is ``VERSION + app_id + base64(content)`` where
    ``content`` holds the packed message followed by its HMAC-SHA256
    signature (see ``serialize``).
    """

    def __init__(self, app_id, app_key, room_id, user_id):
        """Initialize the token from its required parameters.

        ``issued_at`` is set to the current Unix time, ``expire_at`` to 0
        (which ``verify`` treats as "never expires") and ``privileges`` to an
        empty dict.
        """
        self.app_id = app_id
        self.app_key = app_key
        self.room_id = room_id
        self.user_id = user_id
        self.issued_at = int(time.time())
        # Draw the nonce from the OS entropy source rather than reseeding the
        # shared module-level generator with the clock: reseeding would
        # clobber any seed the caller set and make the nonce guessable from
        # the issue time.
        self.nonce = random.SystemRandom().randint(1, 99999999)
        self.expire_at = 0
        self.privileges = {}

    def add_privilege(self, privilege, expire_ts):
        """Grant ``privilege`` with expiration timestamp ``expire_ts``.

        Granting ``PrivPublishStream`` also grants the audio, video and data
        publish privileges with the same expiration.
        """
        if self.privileges is None:
            self.privileges = {}
        self.privileges[privilege] = expire_ts
        if privilege == PrivPublishStream:
            self.privileges[privPublishVideoStream] = expire_ts
            self.privileges[privPublishAudioStream] = expire_ts
            self.privileges[privPublishDataStream] = expire_ts

    def expire_time(self, expire_ts):
        """Set the token expire time; the token does not expire by default.

        The token will be invalid after ``expire_ts`` no matter what the
        individual privileges' expire times are.
        """
        self.expire_at = expire_ts

    def pack_msg(self):
        """Return the binary message that gets signed.

        Fields are packed in this order: nonce, issued_at, expire_at,
        room_id, user_id, privileges. The ``pack_*`` helpers are module-level
        functions defined outside this block.
        """
        m = pack_uint32(self.nonce)
        m += pack_uint32(self.issued_at)
        m += pack_uint32(self.expire_at)
        m += pack_string(self.room_id)
        m += pack_string(self.user_id)
        m += pack_map_uint32(self.privileges)
        return m

    def serialize(self):
        """Generate the token string.

        The packed message is signed with HMAC-SHA256 keyed by the UTF-8
        encoded ``app_key``; message and signature are packed together,
        base64-encoded and prefixed with ``VERSION`` and ``app_id``.
        """
        m = self.pack_msg()
        signature = hmac.new(self.app_key.encode('utf-8'), m, sha256).digest()
        content = pack_bytes(m) + pack_bytes(signature)
        return VERSION + self.app_id + base64.b64encode(content).decode('utf-8')

    def verify(self, key):
        """Check whether this token is valid; called by the server side.

        Returns False when ``expire_at`` is positive and already in the past,
        when the token carries no signature (it was not produced by
        ``parse``), or when the signature does not match the HMAC-SHA256 of
        the re-packed message under ``key``. Returns True otherwise.

        As a side effect ``self.app_key`` is set to ``key``.
        """
        if 0 < self.expire_at < int(time.time()):
            return False
        self.app_key = key
        # Only parse() attaches a signature; a locally built token has
        # nothing to verify against, so treat it as invalid instead of
        # raising AttributeError.
        signature = getattr(self, "signature", None)
        if not isinstance(signature, (bytes, bytearray)):
            return False
        expected = hmac.new(
            self.app_key.encode('utf-8'), self.pack_msg(), sha256
        ).digest()
        # Constant-time comparison so the check does not leak how many
        # leading bytes of a forged signature were correct.
        return hmac.compare_digest(expected, signature)


def parse(raw):
    """Retrieve token information from a raw token string.

    Returns a ``VolcEngineAccessToken`` whose fields (and ``signature``) are
    filled in from ``raw``, or None when ``raw`` is too short, carries a
    different version prefix, or cannot be decoded. Decoding failures are
    printed and swallowed. The returned token has an empty ``app_key``; call
    ``verify(key)`` to check its signature.
    """
    try:
        if len(raw) <= VERSION_LENGTH:
            return
        if raw[:VERSION_LENGTH] != VERSION:
            return

        token = VolcEngineAccessToken("", "", "", "")
        token.app_id = raw[VERSION_LENGTH:VERSION_LENGTH + APP_ID_LENGTH]

        content_buf = base64.b64decode(raw[VERSION_LENGTH + APP_ID_LENGTH:])
        # The content is two length-prefixed byte strings: message, signature.
        readbuf = ReadByteBuffer(content_buf)
        msg = readbuf.unpack_bytes()
        token.signature = readbuf.unpack_bytes()

        # Field order mirrors VolcEngineAccessToken.pack_msg().
        msgbuf = ReadByteBuffer(msg)
        token.nonce = msgbuf.unpack_uint32()
        token.issued_at = msgbuf.unpack_uint32()
        token.expire_at = msgbuf.unpack_uint32()
        token.room_id = msgbuf.unpack_string()
        token.user_id = msgbuf.unpack_string()
        token.privileges = msgbuf.unpack_map_uint32()
        return token
    except Exception as e:
        # Best-effort parser for externally supplied strings: report the
        # failure and signal it to the caller with None.
        print("parse error:", str(e))
        return


# NOTE(review): the visible source is cut off at this point, in the middle of
# `def pack_uint16(x): return struct.pack('`. The pack_* helpers and
# ReadByteBuffer used above are not visible and are therefore not reproduced
# here; restore them from the original file.