标签 API 下的文章

logo-teal.png

在Python中创建一个简单的API,您可以使用一个轻量级的Web框架,比如Flask。以下是一个示例,展示如何创建一个基本的API,返回一个JSON响应:

Flask构建一个API例子

from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/api/hello', methods=['GET'])
def hello():
    data = {
        'message': '你好,世界!'
    }
    return jsonify(data)

if __name__ == '__main__':
    app.run()

在这个例子中,我们使用Flask(__name__)定义一个Flask应用程序。然后,我们使用@app.route装饰器定义了一个路由/api/hello,指定HTTP方法为GET。当通过GET请求访问/api/hello端点时,将执行hello函数。

在hello函数内部,我们创建一个包含简单消息的字典data。然后,我们使用jsonify函数将字典转换为JSON响应。

最后,我们使用app.run()运行Flask应用程序。

为了测试API,您可以运行Python脚本,它将启动一个本地开发服务器。您可以在http://localhost:5000/api/hello上访问API。响应将是一个包含消息"你好,世界!"的JSON对象。

这是一个基本的示例,帮助您开始使用Flask创建API。您可以添加更多的路由、处理不同的HTTP方法,并根据具体需求执行各种操作。

使用FastAPI构建一个API,随机生成一个偶数

FastAPI 是一个用于构建 API 的现代、快速(高性能)的 web 框架,使用 Python 3.6+ 并基于标准的 Python 类型提示。

from fastapi import FastAPI
from random import randint

app = FastAPI()

@app.get("/api/random_even")
async def generate_random_even():
    number = randint(0, 100)
    if number % 2 != 0:
        number += 1
    return {"random_even": number}

在这个例子中,我们从fastapi模块导入FastAPI,从random模块导入randint。我们使用FastAPI()创建一个FastAPI应用程序。

通过使用@app.get装饰器,我们定义了一个路由/api/random_even,处理该端点的GET请求。在generate_random_even函数内部,我们使用randint(0, 100)生成一个0到100之间的随机数。如果生成的数字不是偶数(即奇数),我们将其加1,使其变为偶数。

最后,我们返回一个包含生成的随机偶数的JSON响应。

要测试这个API,您可以运行Python脚本,FastAPI应用程序将启动一个本地开发服务器。您可以在http://localhost:8000/api/random_even上访问API。每次访问这个端点时,它都会生成一个新的随机偶数。

注意:在运行脚本之前,确保已通过pip install fastapi uvicorn安装了FastAPI及其所需的依赖项。然后,您可以使用uvicorn main:app --reload来启动应用程序,假设脚本的名称为main.py。

pip install fastapi uvicorn
uvicorn main:app --reload

浏览器访问该API

http://localhost:8000/api/random_even

指定ip和端口运行fastapi应用

uvicorn main:app --host 192.168.1.100 --port 8000
uvicorn main:app --host 0.0.0.0.0 --port 7878 ## 使用不同的端口

FastAPI传入参数

from fastapi import FastAPI

app = FastAPI()

@app.get("/users/{user_id}/items/{item_id}")
def get_user_item(user_id: int, item_id: str):
    return {"user_id": user_id, "item_id": item_id}

url指定参数,传入函数

根据函数的参数自动匹配url里参数?

from fastapi import FastAPI

app = FastAPI()

@app.get("/items")
def get_items(item_id: int, q: str):
    return {"item_id": item_id, "q": q}

如何构建自己的汇率API?

拓展?使用现有的api,作一些封装,再转发除去实现自己的api服务

3e904319-2068-47eb-8e93-3f35219bbce0.jpeg

前面汇率实时转换的小程序,使用了api key,因为是免费的,所以有调用次数的限制
找了一些免费、公开、无需key的API,把玩一下看看
这里列了不少免费的API
没有一一尝试,感兴趣可以自行测试一下,免费的,不能用也很正常:)

对于汇率转换小程序,可以改用上面列表里提到的第30个api
|30 |Crypto & Finance |Exchangerate |Currency exchange & crypto rates| https://api.exchangerate.host/latest|

使用requests库,访问该url,如法炮制:

import requests
import json
url = 'https://api.exchangerate.host/latest'
response = requests.get(url)
json_object = json.loads(response.text)
#print(json_object) 
if json_object["success"] == True:
    print(f"日期:{json_object['date']}")
    print(f"基准货币: {json_object['base']}")
    print(f"美元汇率: {json_object['rates']['USD']}")
    print(f"人民币汇率: {json_object['rates']['CNY']}")

对于一个API访问返回的数据,首先打印出来,然后用json格式化工具去查看,比如
json格式化与验证工具, 或者浏览器直接打开URL,现代浏览器会直接以JSON格式显示返回的数据

在Python里面访问这样的数据结构,使用字典数据访问形式:
json_object["result"] == "success" ## 判断结果是否成功

如下图所示,就可以比较清晰看出数据的结构:
格式化的json数据

print(f"基准货币: {json_object['base']}")
print(f"到美元的汇率: {json_object['rates']['USD']}")
print(f"到人民币汇率: {json_object['rates']['CNY']}")

以后就可以将这个API可以集成到自己的应用中...

一般性API访问小结:

  • 了解API基本调用方式
  • 了解API响应的数据结构
  • 将响应的数据专程json对象
  • 访问json对象特定字段,获取需要的数据
  • 根据实际问题实现自己程序逻辑,比如汇率转换的简单计算

练习

给定一个汇率数据API
编写一个完整的应用程序,输入一定数目的美元, 输出所有支持货币的转换金额,以及与美元的汇率
比如,程序实际运行可能如下所示:
请输入输入美元的数量:
日期, 你输入美元 $ xxxx
xxxx美元 换 yyyy 人民币; 汇率:7.2
xxxx美元 换 yyyy 日元; 汇率: 125
...
要求,尽可能输出所有支持的货币
提示:需要手工编写一个货币代码到中文文字描述的映射表,比如CNY表示人民币, USD表示美元等等(可以从网上获取相应的数据)

参考实现

获取货币代码与名称、国家地区的描述
import csv
import requests
from bs4 import BeautifulSoup

# 发起网页请求
url = "https://www.iban.hk/currency-codes"
response = requests.get(url)

# 解析HTML内容
soup = BeautifulSoup(response.text, 'html.parser')

# 找到<tbody>标签
tbody = soup.find('tbody')

# 提取表格数据
data = []
rows = tbody.find_all('tr')
for row in rows:
    cells = row.find_all('td')
    row_data = [cell.get_text(strip=True) for cell in cells]
    data.append(row_data)

# 将数据写入CSV文件
with open('currency_table_data.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerows(data)

将汇率web API封装成一个独立的模块

currency_exchange.py

import requests
import json
import datetime
import csv




class CurrencyExchange:
    def __init__(self, api_url="https://open.er-api.com/v6/latest/USD"):
        self.url = api_url
        self.base = 'USD'
        self.timestamp = None
        self.from_currency = None
        self.to_currency = None
        self.rates = None
        self.currency_code_mapping = None

    def get_currency_code_mapping(self, csv_file="currency_table_data.csv"):
        '''
        获取货币编码与名称的映射,预存在csv文件里
        '''
        self.currency_code_mapping = []
        with open(csv_file, 'r') as csvfile:
            csvreader = csv.reader(csvfile)
            for row in csvreader:
                self.currency_code_mapping.append(tuple(row))
        return self.currency_code_mapping
    
    def get_allrates(self):
        '''
        获取所有支持货币的汇率,相对于base货币,比如USD
        '''
        response = requests.get(self.url)
        json_object = json.loads(response.text)
        if json_object["result"] == "success":
            timestamp = int(json_object['time_last_update_unix'])
            self.timestamp = datetime.datetime.fromtimestamp(timestamp)
            self.rates = json_object["rates"]
            return self.timestamp, self.rates
        else:
            return 0, 0
    
    def get_rate(self, to_currency):
        response = requests.get(self.url)
        json_object = json.loads(response.text)
        if json_object["result"] == "success":
            if to_currency.upper() in json_object["rates"]:
                from_rate = float(json_object['rates'][self.base])
                to_rate = float(json_object['rates'][to_currency])
                from_to_rate = to_rate / from_rate
                timestamp = int(json_object['time_last_update_unix'])
                updated_time = datetime.datetime.fromtimestamp(timestamp)
                return updated_time, from_to_rate
            else:
                return 0,0
        else:
            return 0, 0
    
    def calc(self, usd_amount, to_currency):
        timestamp, rate = self.get_rate(to_currency)

        return timestamp, float(usd_amount) * float(rate)

使用自定义的类

# 引入一个自定义的模块,货币转换的一个类
from currency_exchange import CurrencyExchange

my_currency_exchange = CurrencyExchange()

# 默认该货币转换的类基础货币是美元,转换为浮点数,需要增加try except 异常处理
usd_amount = float(input("输入待转换的美元金额: "))
# 将用户的输入统一转换为大写,这样输入jpy,JPY,jPy,jPY,程序一样可以运行
to_currency = input("输入目标货币的代号: 比如 CNY, JPY, 等等...").upper()
# 查询一下货币代码与名称映射
list_of_currency_code_mapping = my_currency_exchange.get_currency_code_mapping()
to_currency_area = None
to_currency_name = None
to_currency_id = None

# 根据用户输入的货币代码,查表获取对应的国家名称,以及货币名称
for item in list_of_currency_code_mapping:
    if item[2] == to_currency:
        to_currency_area = item[0]
        to_currency_name = item[1]
        to_currency_id = item[3]
        # 给用户一个反馈
        print(f'你输入美元 ${usd_amount}, 目标货币: {to_currency_area} {to_currency_name}')
        break

# 调用自定义类货币转换的 calc方法: 负责将美元转成目标货币的金额
updated_timestamp, total = my_currency_exchange.calc(usd_amount, to_currency)

# 返回出错处理
if updated_timestamp == 0 and total == 0:
    print("查询失败or不支持该货币汇率")
else:
    if to_currency_name is not None:
        print(f"[更新日期:{updated_timestamp}] 美元 ${usd_amount} 可换 {total:.2f} {to_currency_area} {to_currency_name}")
    else:
        print(f"[更新日期:{updated_timestamp}] 美元 ${usd_amount} 可换 {total:.2f} {to_currency}")




其他好玩的API

Next:

下次我们自己写个简单的API看看?

image_XZsE558j_1688837139931_raw.jpg

Python requests 是一个常用的 HTTP 请求库,可以方便地向网站发送 HTTP 请求,并获取响应结果。requests 模块比 urllib 模块更简洁。

使用 requests 发送 HTTP 请求需要先导入 requests 模块:

import requests

导入后就可以发送 HTTP 请求,使用 requests 提供的方法向指定 URL 发送 HTTP 请求,例如:

# 导入 requests 包
import requests

# 发送请求
x = requests.get('https://www.dazuicheng.com/')

# 返回网页内容
print(x.text)

每次调用 requests 请求之后,会返回一个 response 对象,该对象包含了具体的响应信息,如状态码、响应头、响应内容等:

print(response.status_code)  # 获取响应状态码
print(response.headers)  # 获取响应头
print(response.content)  # 获取响应内容

更多响应信息如下:

属性或方法说明
apparent_encoding编码方式
close()关闭与服务器的连接
content返回响应的内容,以字节为单位
cookies返回一个 CookieJar 对象,包含了从服务器发回的 cookie
elapsed返回一个 timedelta 对象,包含了从发送请求到响应到达之间经过的时间量,可以用于测试响应速度。比如 r.elapsed.microseconds 表示响应到达需要多少微秒。
encoding解码 r.text 的编码方式
headers返回响应头,字典格式
history返回包含请求历史的响应对象列表(url)
is_permanent_redirect如果响应是永久重定向的 url,则返回 True,否则返回 False
is_redirect如果响应被重定向,则返回 True,否则返回 False
iter_content()迭代响应
iter_lines()迭代响应的行
json()返回结果的 JSON 对象 (结果需要以 JSON 格式编写的,否则会引发错误)
links返回响应的解析头链接
next返回重定向链中下一个请求的 PreparedRequest 对象
ok检查 "status_code" 的值,如果小于400,则返回 True,如果不小于 400,则返回 False
raise_for_status()如果发生错误,方法返回一个 HTTPError 对象
reason响应状态的描述,比如 "Not Found" 或 "OK"
request返回请求此响应的请求对象
status_code返回 http 的状态码,比如 404 和 200(200 是 OK,404 是 Not Found)
text返回响应的内容,unicode 类型数据
url返回响应的 URL

实例

# 导入 requests 包
import requests
import json

    
# 发送请求
x = requests.get('https://api.ipify.org?format=json')
    
# 返回 http 的状态码
print(x.status_code)
    
# 响应状态的描述
print(x.reason)
    
# 返回编码
print(x.apparent_encoding)

json_object = json.loads(x.text)
print(f"public IP: {json_object['ip']}")

实时汇率转换小程序

import requests
import time
import json
import datetime
import sys


supported_currency_name = {
    0: '欧元',
    1: '美元',
    2: '日元',
    3: '英镑',
    4: '人民币'
}

supported_currency_code = {
    0: 'EUR',
    1: 'USD',
    2: 'JPY',
    3: 'GBP',
    4: 'CNY'
}

def get_rate(from_currency, to_currency):
    url = 'https://v6.exchangerate-api.com/v6/b3c4901ef11a8bcb17276efc/latest/EUR'
    response = requests.get(url)
    json_object = json.loads(response.text)
    if json_object["result"] == "success":
        from_rate = float(json_object['conversion_rates'][from_currency])
        to_rate = float(json_object['conversion_rates'][to_currency])
        from_to_rate = to_rate / from_rate
        timestamp = int(json_object['time_last_update_unix'])
        query_time = datetime.datetime.fromtimestamp(timestamp)
        return query_time, from_to_rate
    else:
        return 0, 0

print("$$$$$$$$欢迎使用汇率转换程序¥¥¥¥¥¥¥¥¥¥")
print(f"支持的货币代码有:{supported_currency_code}")
from_currency_choice = int(input('输入原始货币编号数字:'))
from_currency_code = supported_currency_code[from_currency_choice]
from_currency_name = supported_currency_name[from_currency_choice]
print(f"你输入的初始货币:{from_currency_name}")
print(f"支持的货币代码有:{supported_currency_code}")
to_currency_choice = int(input('输入目标货币编号数字:'))
to_currency_code   = supported_currency_code[to_currency_choice]
to_currency_name = supported_currency_name[to_currency_choice]
print(f"你输入的目标货币:{to_currency_name}")
source_amount = int(input('输入待转换的货币金额: '))

query_time, rate = get_rate(from_currency_code, to_currency_code)
if query_time == 0 and rate == 0:
    print("获取汇率失败...")
    sys.exit()

target_amount = source_amount * rate

print(f"[{query_time}]: {source_amount} {from_currency_name} 可以换 {target_amount:.2f} {to_currency_name}")