123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
from bs4 import BeautifulSoup
import requests
import operator
import base64
import time
from Crypto.Cipher import AES
import oss2

# FaaS entry-point names exposed by this module.
faas_function_name__support = 'support'
faas_function_name__handle = 'handle'

# Keys of the FaaS request/response envelope.
const_key_function_id = "faas__function-id"
const_key_request_wrap = "faas__request_wrap"
const_key_params = "faas__req.params"
const_key_context = "faas__req.context"
const_key_result = "faas__invoke_result"
const_key_result_success = "faas__success"
const_key_reload = "faas__reload"
const_key_trace_id = "faas__trace_id"

# Bucket endpoint for the Hong Kong region.
endpoint = 'http://oss-cn-hongkong.aliyuncs.com'
# SECURITY(review): AccessKey id/secret are hardcoded in source control.
# Move them to environment variables or a secrets manager and rotate the
# exposed key pair.
auth = oss2.Auth('LTAI5t5ZyATP7DL4nMdK51eL', 'WnJqm3jpuP0JUMo3to8wzcVHgIrBAB')
bucket = oss2.Bucket(auth, endpoint, 'oss-aliyun-hk')
def support(event):
    """Report whether this handler can process *event*.

    The spider function accepts every incoming event unconditionally.
    """
    return True
class Aes_byte:
    """AES-CBC encrypt/decrypt helper operating on raw bytes.

    Plaintext shorter than a whole block is zero-padded ("\\0") up to a
    multiple of BLOCK_SIZE. The padding is NOT stripped on decrypt — the
    surrounding spider pipeline relies on receiving the raw decrypted bytes.
    """

    def __init__(self, key: bytes, iv: bytes, BLOCK_SIZE: int = 16):
        # AES key; must be 16/24/32 bytes for AES-128/192/256.
        self.key = key
        # CBC initialisation vector; must be one block (16 bytes).
        self.iv = iv
        # Cipher block size in bytes.
        self.BLOCK_SIZE = BLOCK_SIZE

    def __str__(self) -> str:
        # BUGFIX: __str__ must return a string. The original printed a
        # message and implicitly returned None, so str(obj) raised TypeError.
        return "AES加密和解密---字节类型"

    def pad(self, value: bytes) -> bytes:
        """Zero-pad *value* to a whole multiple of BLOCK_SIZE bytes."""
        remainder = len(value) % self.BLOCK_SIZE
        add = self.BLOCK_SIZE - remainder if remainder else 0
        # The filler must be bytes ("\0".encode() in the original) so it can
        # be concatenated with the bytes input.
        return value + b"\0" * add

    def AES_encrypt(self, data: bytes) -> bytes:
        """Encrypt *data* with AES-CBC after zero-padding it."""
        buffer = self.pad(data)
        # A fresh cipher object per call: PyCryptodome CBC objects are
        # stateful and single-use.
        cryptor = AES.new(self.key, AES.MODE_CBC, self.iv)
        return cryptor.encrypt(buffer)

    def AES_decrypt(self, data: bytes) -> bytes:
        """Decrypt *data* with AES-CBC.

        Valid ciphertext is already a whole number of blocks, so pad() is a
        no-op then; it only pads (and thus garbles) malformed input, which is
        the original behavior and is preserved.
        """
        buffer = self.pad(data)
        cryptor = AES.new(self.key, AES.MODE_CBC, self.iv)
        return cryptor.decrypt(buffer)
def handle(faas__request_wrap):
    """Execute one spider task and return the fetched content.

    Args:
        faas__request_wrap: envelope dict carrying the request parameters
            under const_key_params and the invocation context under
            const_key_context. params['task'] must describe the fetch
            (url, requestMethod, headers, charset, resultType, ...).

    Returns:
        For binary result types ('[B'): the OSS object path the payload was
        uploaded to. Otherwise: the fetched page as an HTML string.

    Raises:
        RuntimeError: when no task is defined in the parameters.
    """
    params = faas__request_wrap[const_key_params]
    context = faas__request_wrap[const_key_context]

    # A task definition is mandatory. Using .get() so a missing key raises
    # the intended RuntimeError instead of a bare KeyError.
    if not params.get('task'):
        raise RuntimeError('no task define')
    task = params['task']

    headers = task['headers']
    method = task['requestMethod']
    charset = task['charset']

    req_start_time = time.time()
    r = requests.request(method, task['url'], headers=headers)
    req_cost = time.time() - req_start_time

    process_start_time = time.time()
    content = None
    next_command = None
    if operator.contains(task['resultType'], '[B'):
        # Binary payload (e.g. an HLS .ts media segment).
        video_bytes = r.content
        if task['tsPart']['key']['method'] != 'NONE':
            # Segment is encrypted: decode the base64 key/IV and decrypt.
            key_bytes = base64.b64decode(task['tsPart']['key']['key'])
            iv_bytes = base64.b64decode(task['tsPart']['key']['iv'])
            video_bytes = Aes_byte(key_bytes, iv_bytes).AES_decrypt(video_bytes)
        remote_path = 'spider/' + task['progressId'] + '/0000.ts'
        bucket.put_object(remote_path, video_bytes)
        content = remote_path
        # NOTE(review): the original assigned next_command four times in a
        # row ('upload_to_oss', 'upload_to_disk', 'upload_to_ssh',
        # 'upload_to_webdav'); only the last assignment took effect, and that
        # effective value is preserved here. Confirm which target is intended.
        next_command = 'upload_to_webdav'
        # todo: put the remote path into the context
    else:
        # requests falls back to ISO-8859-1 when the response header lacks a
        # charset, which garbles non-Latin text — force the task's charset.
        r.encoding = charset
        soup = BeautifulSoup(r.text, 'html.parser')
        # BUGFIX: the condition was inverted ('if not soup.title'), which
        # dereferenced .text exactly when <title> was absent and raised
        # AttributeError on titleless pages.
        if soup.title:
            title = soup.title.text  # currently unused; kept for parity
        content = str(soup)

    # BUGFIX: was measured from req_start_time, double-counting the request
    # time; process_start_time was computed but never used.
    process_cost = time.time() - process_start_time
    print('spider task: ' + task['requestMethod'] + ' ' + task['url']
          + ' req_cost=%.2f' % req_cost + ' process_cost=%.2f' % process_cost)

    # NOTE(review): 'result' is assembled but never returned or published
    # (presumably meant for a later mq step — confirm). The function's
    # contract is to return 'content'.
    result = {
        'process_result': {
            'spider': context
        },
        'content': content,
        'next-command': next_command
    }
    return content
|