分类 Python 下的文章

截图 2023-07-19 20-16-47.png

复习一下Python面向对象编程,构建一个简单的ATM类,实现ATM自助取款机的模拟
这是一组渐进式的练习,从最简单的ATM类,到持久性数据保存,构成一系列适合初学者的练习
ATM - Automatic Trasaction Machine
简化模拟,只需要实现下面的一些功能:
1. 登录功能(这里简化,不需要实现)
2. 创建账户
3. 存钱
4. 取钱
5. 退出

第一个版本atm_v1.py

构建基本的ATM类,包含几个方法,初始化一个ATM实例,调用其方法

最主要的功能有存储账户信息、存钱、取钱; 账户信息使用Python 字典数据类型
class ATM:
    def __init__(self):
        self.accounts = {}  # 用于存储账户信息的字典

    def create_account(self, account_number, initial_balance):
        if account_number not in self.accounts:
            self.accounts[account_number] = initial_balance
            print(f"账户 {account_number} 创建成功,初始余额为 {initial_balance} 元。")
        else:
            print("该账户已存在。")

    def check_balance(self, account_number):
        if account_number in self.accounts:
            return self.accounts[account_number]
        else:
            print("该账户不存在。")
            return None

    def deposit(self, account_number, amount):
        if account_number in self.accounts:
            self.accounts[account_number] += amount
            print(f"向账户 {account_number} 存入 {amount} 元。当前余额为 {self.accounts[account_number]} 元。")
        else:
            print("该账户不存在。")

    def withdraw(self, account_number, amount):
        if account_number in self.accounts:
            if self.accounts[account_number] >= amount:
                self.accounts[account_number] -= amount
                print(f"从账户 {account_number} 取出 {amount} 元。当前余额为 {self.accounts[account_number]} 元。")
            else:
                print("余额不足,无法完成取款。")
        else:
            print("该账户不存在。")

# 测试ATM类
atm = ATM()
atm.create_account("123456789", 1000)
atm.deposit("123456789", 500)
atm.withdraw("123456789", 200)
print(atm.check_balance("123456789"))

第二个版本atm_v2.py

增加菜单,初始化ATM实例,一直运行,直到用户退出菜单,这个比较符合正常使用场景
增加几个方法:一个run方法让ATM一直执行,show_menu菜单

class ATM:
    def __init__(self):
        self.accounts = {}

    def create_account(self, account_number, initial_balance):
        if account_number not in self.accounts:
            self.accounts[account_number] = initial_balance
            print(f"账户 {account_number} 创建成功,初始余额为 {initial_balance} 元。")
        else:
            print("该账户已存在。")

    def check_balance(self, account_number):
        if account_number in self.accounts:
            return self.accounts[account_number]
        else:
            print("该账户不存在。")
            return None

    def deposit(self, account_number, amount):
        if account_number in self.accounts:
            self.accounts[account_number] += amount
            print(f"向账户 {account_number} 存入 {amount} 元。当前余额为 {self.accounts[account_number]} 元。")
        else:
            print("该账户不存在。")

    def withdraw(self, account_number, amount):
        if account_number in self.accounts:
            if self.accounts[account_number] >= amount:
                self.accounts[account_number] -= amount
                print(f"从账户 {account_number} 取出 {amount} 元。当前余额为 {self.accounts[account_number]} 元。")
            else:
                print("余额不足,无法完成取款。")
        else:
            print("该账户不存在.")

    def show_menu(self):
        print("===== ATM菜单 =====")
        print("1. 创建账户")
        print("2. 查询余额")
        print("3. 存款")
        print("4. 取款")
        print("0. 退出")

    def run(self):
        while True:
            self.show_menu()
            choice = input("请输入选项数字:")

            if choice == "1":
                account_number = input("请输入账户号码:")
                initial_balance = float(input("请输入初始余额:"))
                self.create_account(account_number, initial_balance)

            elif choice == "2":
                account_number = input("请输入账户号码:")
                balance = self.check_balance(account_number)
                if balance is not None:
                    print(f"账户 {account_number} 的余额为 {balance} 元。")

            elif choice == "3":
                account_number = input("请输入账户号码:")
                amount = float(input("请输入存款金额:"))
                self.deposit(account_number, amount)

            elif choice == "4":
                account_number = input("请输入账户号码:")
                amount = float(input("请输入取款金额:"))
                self.withdraw(account_number, amount)

            elif choice == "0":
                print("感谢使用ATM,再见!")
                break

            else:
                print("无效选项,请重新输入。")

# 实例化ATM对象并运行
atm = ATM()
atm.run()

第三个版本 atm_v3.py

增加了历史记录存储

增加一个list变量 self.history,用来存储操作记录
class ATM:
    def __init__(self):
        self.accounts = {}
        self.history = []

    def create_account(self, account_number, initial_balance):
        if account_number not in self.accounts:
            self.accounts[account_number] = initial_balance
            self.history.append(f"创建账户 {account_number},初始余额为 {initial_balance} 元。")
            print(f"账户 {account_number} 创建成功,初始余额为 {initial_balance} 元。")
        else:
            print("该账户已存在。")

    def check_balance(self, account_number):
        if account_number in self.accounts:
            return self.accounts[account_number]
        else:
            print("该账户不存在。")
            return None

    def deposit(self, account_number, amount):
        if account_number in self.accounts:
            self.accounts[account_number] += amount
            self.history.append(f"向账户 {account_number} 存入 {amount} 元。当前余额为 {self.accounts[account_number]} 元。")
            print(f"向账户 {account_number} 存入 {amount} 元。当前余额为 {self.accounts[account_number]} 元。")
        else:
            print("该账户不存在。")

    def withdraw(self, account_number, amount):
        if account_number in self.accounts:
            if self.accounts[account_number] >= amount:
                self.accounts[account_number] -= amount
                self.history.append(f"从账户 {account_number} 取出 {amount} 元。当前余额为 {self.accounts[account_number]} 元。")
                print(f"从账户 {account_number} 取出 {amount} 元。当前余额为 {self.accounts[account_number]} 元。")
            else:
                print("余额不足,无法完成取款。")
        else:
            print("该账户不存在.")
    def show_history(self):
        print("===== 操作历史记录 =====")
        for entry in self.history:
            print(entry)
        print("======================")
    
    def show_menu(self):
        print("===== ATM菜单 =====")
        print("1. 创建账户")
        print("2. 查询余额")
        print("3. 存款")
        print("4. 取款")
        print("5. 显示操作历史记录")
        print("0. 退出")

    def run(self):
        while True:
            self.show_menu()
            choice = input("请输入选项数字:")

            if choice == "1":
                account_number = input("请输入账户号码:")
                initial_balance = float(input("请输入初始余额:"))
                self.create_account(account_number, initial_balance)

            elif choice == "2":
                account_number = input("请输入账户号码:")
                balance = self.check_balance(account_number)
                if balance is not None:
                    print(f"账户 {account_number} 的余额为 {balance} 元。")

            elif choice == "3":
                account_number = input("请输入账户号码:")
                amount = float(input("请输入存款金额:"))
                self.deposit(account_number, amount)

            elif choice == "4":
                account_number = input("请输入账户号码:")
                amount = float(input("请输入取款金额:"))
                self.withdraw(account_number, amount)
            
            elif choice == "5":
                self.show_history()

            elif choice == "0":
                print("感谢使用ATM,再见!")
                break

            else:
                print("无效选项,请重新输入。")

# 实例化ATM对象并运行
atm = ATM()
atm.run()

第四个版本atm_v4.py

每个用户的历史记录单独存储
每个用户保存自己的历史记录,可以学习如何使用嵌套的数据类型,字典里面嵌套字典
将history变量放入账户变量self.accounts里面,这样每个账户单独保存各自的操作记录

class ATM:
    def __init__(self):
        self.accounts = {}

    def create_account(self, account_number, initial_balance):
        if account_number not in self.accounts:
            self.accounts[account_number] = {
                "balance": initial_balance,
                "history": []
            }
            self.accounts[account_number]["history"].append(f"创建账户,初始余额为 {initial_balance} 元。")
            print(f"账户 {account_number} 创建成功,初始余额为 {initial_balance} 元。")
        else:
            print("该账户已存在。")

    def check_balance(self, account_number):
        if account_number in self.accounts:
            return self.accounts[account_number]["balance"]
        else:
            print("该账户不存在。")
            return None

    def deposit(self, account_number, amount):
        if account_number in self.accounts:
            self.accounts[account_number]["balance"] += amount
            self.accounts[account_number]["history"].append(f"存入 {amount} 元。当前余额为 {self.accounts[account_number]['balance']} 元。")
            print(f"向账户 {account_number} 存入 {amount} 元。当前余额为 {self.accounts[account_number]['balance']} 元。")
        else:
            print("该账户不存在。")

    def withdraw(self, account_number, amount):
        if account_number in self.accounts:
            if self.accounts[account_number]["balance"] >= amount:
                self.accounts[account_number]["balance"] -= amount
                self.accounts[account_number]["history"].append(f"取出 {amount} 元。当前余额为 {self.accounts[account_number]['balance']} 元。")
                print(f"从账户 {account_number} 取出 {amount} 元。当前余额为 {self.accounts[account_number]['balance']} 元。")
            else:
                print("余额不足,无法完成取款。")
        else:
            print("该账户不存在.")

    def show_history(self, account_number):
        if account_number in self.accounts:
            print(f"===== 账户 {account_number} 操作历史记录 =====")
            for entry in self.accounts[account_number]["history"]:
                print(entry)
            print("==============================")
        else:
            print("该账户不存在。")

    def show_menu(self):
        print("[===== ATM菜单 =====]")
        print("1. 创建账户")
        print("2. 查询余额")
        print("3. 存款")
        print("4. 取款")
        print("5. 显示操作历史记录")
        print("0. 退出")

    def run(self):
        while True:
            self.show_menu()
            choice = input("请输入选项数字:")

            if choice == "1":
                account_number = input("请输入账户号码:")
                initial_balance = float(input("请输入初始余额:"))
                self.create_account(account_number, initial_balance)

            elif choice == "2":
                account_number = input("请输入账户号码:")
                balance = self.check_balance(account_number)
                if balance is not None:
                    print(f"账户 {account_number} 的余额为 {balance} 元。")

            elif choice == "3":
                account_number = input("请输入账户号码:")
                amount = float(input("请输入存款金额:"))
                self.deposit(account_number, amount)

            elif choice == "4":
                account_number = input("请输入账户号码:")
                amount = float(input("请输入取款金额:"))
                self.withdraw(account_number, amount)

            elif choice == "5":
                account_number = input("请输入账户号码:")
                self.show_history(account_number)

            elif choice == "0":
                print("感谢使用ATM,再见!")
                break

            else:
                print("无效选项,请重新输入。")

# 实例化ATM对象并运行
atm = ATM()
atm.run()

继续进化...

第五个版本atm_v5.py

使用pickle持久存储数据,将账户信息保存到本地文件里面
pickle的中文资料,百度搜索
import pickle

class ATM:
    def __init__(self):
        self.accounts = {}
        self.load_data()  # 在初始化时加载之前保存的数据

    def create_account(self, account_number, initial_balance):
        if account_number not in self.accounts:
            self.accounts[account_number] = {
                "balance": initial_balance,
                "history": []
            }
            self.accounts[account_number]["history"].append(f"创建账户,初始余额为 {initial_balance} 元。")
            print(f"账户 {account_number} 创建成功,初始余额为 {initial_balance} 元。")
        else:
            print("该账户已存在。")

    def check_balance(self, account_number):
        if account_number in self.accounts:
            return self.accounts[account_number]["balance"]
        else:
            print("该账户不存在。")
            return None

    def deposit(self, account_number, amount):
        if account_number in self.accounts:
            self.accounts[account_number]["balance"] += amount
            self.accounts[account_number]["history"].append(f"存入 {amount} 元。当前余额为 {self.accounts[account_number]['balance']} 元。")
            print(f"向账户 {account_number} 存入 {amount} 元。当前余额为 {self.accounts[account_number]['balance']} 元。")
        else:
            print("该账户不存在。")

    def withdraw(self, account_number, amount):
        if account_number in self.accounts:
            if self.accounts[account_number]["balance"] >= amount:
                self.accounts[account_number]["balance"] -= amount
                self.accounts[account_number]["history"].append(f"取出 {amount} 元。当前余额为 {self.accounts[account_number]['balance']} 元。")
                print(f"从账户 {account_number} 取出 {amount} 元。当前余额为 {self.accounts[account_number]['balance']} 元。")
            else:
                print("余额不足,无法完成取款。")
        else:
            print("该账户不存在.")

    def show_history(self, account_number):
        if account_number in self.accounts:
            print(f"===== 账户 {account_number} 操作历史记录 =====")
            for entry in self.accounts[account_number]["history"]:
                print(entry)
            print("==============================")
        else:
            print("该账户不存在。")

    def show_menu(self):
        print("[===== ATM菜单 =====]")
        print("1. 创建账户")
        print("2. 查询余额")
        print("3. 存款")
        print("4. 取款")
        print("5. 显示操作历史记录")
        print("0. 退出")

    def load_data(self):
        try:
            with open("accounts_data.pickle", "rb") as file:
                self.accounts = pickle.load(file)
        except FileNotFoundError:
            # 如果文件不存在,可以忽略此错误
            pass

    def save_data(self):
        with open("accounts_data.pickle", "wb") as file:
            pickle.dump(self.accounts, file)

    def run(self):
        while True:
            self.show_menu()
            choice = input("请输入选项数字:")

            if choice == "1":
                account_number = input("请输入账户号码:")
                initial_balance = float(input("请输入初始余额:"))
                self.create_account(account_number, initial_balance)

            elif choice == "2":
                account_number = input("请输入账户号码:")
                balance = self.check_balance(account_number)
                if balance is not None:
                    print(f"账户 {account_number} 的余额为 {balance} 元。")

            elif choice == "3":
                account_number = input("请输入账户号码:")
                amount = float(input("请输入存款金额:"))
                self.deposit(account_number, amount)

            elif choice == "4":
                account_number = input("请输入账户号码:")
                amount = float(input("请输入取款金额:"))
                self.withdraw(account_number, amount)

            elif choice == "5":
                account_number = input("请输入账户号码:")
                self.show_history(account_number)

            elif choice == "0":
                self.save_data()  # 在退出前保存数据
                print("感谢使用ATM,再见!")
                break

            else:
                print("无效选项,请重新输入。")

# 实例化ATM对象并运行
atm = ATM()
atm.run()

第六个版本atm_v4.py

使用加密库,对账户信息文件加密
使用cryptography库
如果没有安装,使用下面命令安装一下:pip install cryptography

from cryptography.fernet import Fernet
import pickle
import cryptography
import os
# 生成密钥并保存到文件中
def generate_key():
    # 密钥文件路径
    key_file_path = "secret.key"

    # 检查文件是否存在,存在就不重新生成key
    if os.path.exists(key_file_path) and os.path.isfile(key_file_path):
        return
    key = Fernet.generate_key()
    with open("secret.key", "wb") as key_file:
        key_file.write(key)

# 加载密钥
def load_key():
    return open("secret.key", "rb").read()

# 加密数据
def encrypt_data(data, key):
    f = Fernet(key)
    return f.encrypt(data)

# 解密数据
def decrypt_data(encrypted_data, key):
    f = Fernet(key)
    return f.decrypt(encrypted_data)

class ATM:
    def __init__(self):
        self.accounts = {}
        self.load_data()  # 在初始化时加载之前保存的数据

    def create_account(self, account_number, initial_balance):
        if account_number not in self.accounts:
            self.accounts[account_number] = {
                "balance": initial_balance,
                "history": []
            }
            self.accounts[account_number]["history"].append(f"创建账户,初始余额为 {initial_balance} 元。")
            print(f"账户 {account_number} 创建成功,初始余额为 {initial_balance} 元。")
        else:
            print("该账户已存在。")

    def check_balance(self, account_number):
        if account_number in self.accounts:
            return self.accounts[account_number]["balance"]
        else:
            print("该账户不存在。")
            return None

    def deposit(self, account_number, amount):
        if account_number in self.accounts:
            self.accounts[account_number]["balance"] += amount
            self.accounts[account_number]["history"].append(f"存入 {amount} 元。当前余额为 {self.accounts[account_number]['balance']} 元。")
            print(f"向账户 {account_number} 存入 {amount} 元。当前余额为 {self.accounts[account_number]['balance']} 元。")
        else:
            print("该账户不存在。")

    def withdraw(self, account_number, amount):
        if account_number in self.accounts:
            if self.accounts[account_number]["balance"] >= amount:
                self.accounts[account_number]["balance"] -= amount
                self.accounts[account_number]["history"].append(f"取出 {amount} 元。当前余额为 {self.accounts[account_number]['balance']} 元。")
                print(f"从账户 {account_number} 取出 {amount} 元。当前余额为 {self.accounts[account_number]['balance']} 元。")
            else:
                print("余额不足,无法完成取款。")
        else:
            print("该账户不存在.")

    def show_history(self, account_number):
        if account_number in self.accounts:
            print(f"===== 账户 {account_number} 操作历史记录 =====")
            for entry in self.accounts[account_number]["history"]:
                print(entry)
            print("==============================")
        else:
            print("该账户不存在。")

    def show_menu(self):
        print("[===== ATM菜单 =====]")
        print("1. 创建账户")
        print("2. 查询余额")
        print("3. 存款")
        print("4. 取款")
        print("5. 显示操作历史记录")
        print("0. 退出")

    def load_data(self):
        try:
            key = load_key()
            with open("accounts_data.pickle", "rb") as file:
                encrypted_data = file.read()
                decrypted_data = decrypt_data(encrypted_data, key)
                self.accounts = pickle.loads(decrypted_data)
        except (FileNotFoundError, pickle.UnpicklingError, cryptography.fernet.InvalidToken):
            # 如果文件不存在或解密失败,可以忽略此错误
            print("fail to load data")

    def save_data(self):
        key = load_key()
        data_to_save = pickle.dumps(self.accounts)
        encrypted_data = encrypt_data(data_to_save, key)
        with open("accounts_data.pickle", "wb") as file:
            file.write(encrypted_data)

    def run(self):
        while True:
            self.show_menu()
            choice = input("请输入选项数字:")

            if choice == "1":
                account_number = input("请输入账户号码:")
                initial_balance = float(input("请输入初始余额:"))
                self.create_account(account_number, initial_balance)

            elif choice == "2":
                account_number = input("请输入账户号码:")
                balance = self.check_balance(account_number)
                if balance is not None:
                    print(f"账户 {account_number} 的余额为 {balance} 元。")

            elif choice == "3":
                account_number = input("请输入账户号码:")
                amount = float(input("请输入存款金额:"))
                self.deposit(account_number, amount)

            elif choice == "4":
                account_number = input("请输入账户号码:")
                amount = float(input("请输入取款金额:"))
                self.withdraw(account_number, amount)

            elif choice == "5":
                account_number = input("请输入账户号码:")
                self.show_history(account_number)

            elif choice == "0":
                self.save_data()  # 在退出前保存数据
                print("感谢使用ATM,再见!")
                break

            else:
                print("无效选项,请重新输入。")

# 生成密钥(仅需执行一次)
generate_key()

# 实例化ATM对象并运行
atm = ATM()
atm.run()

运行画面

[===== ATM菜单 =====]
1. 创建账户
2. 查询余额
3. 存款
4. 取款
5. 显示操作历史记录
0. 退出
请输入选项数字:

截图 2023-07-19 20-17-07.png

使用pygame的gfxdraw模块实现火焰特效
from pygame import gfxdraw

主要使用了pygame里面的gfxdraw模块,对pixel进行操作

import pygame as pg
import sys
from pygame import gfxdraw
from random import randint

WIN_SIZE = WIDTH, HEIGHT = 640, 480
STEPS_BETWEEN_COLORS = 9
COLORS = ['black', 'red', 'orange', 'yellow', 'white']
PIXEL_SIZE = 3

FIRE_REPS = 4
FIRE_WIDTH = WIDTH // (PIXEL_SIZE * FIRE_REPS)
FIRE_HEIGHT = HEIGHT // PIXEL_SIZE


class MyFire:
    def __init__(self, app):
        self.app = app
        self.palette = self.get_palette()
        self.fire_array = self.get_fire_array()
        self.fire_surf = pg.Surface([PIXEL_SIZE * FIRE_WIDTH, HEIGHT])
        self.fire_surf.set_colorkey('black')

        self.logo = pg.image.load('dazuicheng.jpg').convert_alpha()
        self.logo = pg.transform.scale2x(self.logo)
        self.logo_x, self.logo_y = (WIDTH // 2 - self.logo.get_width() // 2,
                                    HEIGHT // 3 - self.logo.get_height() // 2)
        self.logo_start_y = HEIGHT

    def draw_logo(self):
        if self.logo_start_y > self.logo_y:
            self.logo_start_y -= 5
        self.app.screen.blit(self.logo, (self.logo_x, self.logo_start_y))

    def do_fire(self):
        for x in range(FIRE_WIDTH):
            for y in range(1, FIRE_HEIGHT):
                color_index = self.fire_array[y][x]
                if color_index:
                    rnd = randint(0, 3)
                    self.fire_array[y - 1][(x - rnd + 1) % FIRE_WIDTH] = color_index - rnd % 2
                else:
                    self.fire_array[y - 1][x] = 0

    def draw_fire(self):
        self.fire_surf.fill('black')
        for y, row in enumerate(self.fire_array):
            for x, color_index in enumerate(row):
                if color_index:
                    color = self.palette[color_index]
                    gfxdraw.box(self.fire_surf, (x * PIXEL_SIZE, y * PIXEL_SIZE,
                                                  PIXEL_SIZE, PIXEL_SIZE), color)

        for i in range(FIRE_REPS):
            self.app.screen.blit(self.fire_surf, (self.fire_surf.get_width() * i, 0))

    def get_fire_array(self):
        fire_array = [[0 for i in range(FIRE_WIDTH)] for j in range(FIRE_HEIGHT)]
        for i in range(FIRE_WIDTH):
            fire_array[FIRE_HEIGHT - 1][i] = len(self.palette) - 1
        return fire_array

    def draw_palette(self):
        size = 90
        for i, color in enumerate(self.palette):
            pg.draw.rect(self.app.screen, color, (i * size, HEIGHT // 2, size - 5, size - 5))

    @staticmethod
    def get_palette():
        palette = [(0, 0, 0)]
        for i, color in enumerate(COLORS[:-1]):
            c1, c2 = color, COLORS[i + 1]
            for step in range(STEPS_BETWEEN_COLORS):
                c = pg.Color(c1).lerp(c2, (step + 0.5) / STEPS_BETWEEN_COLORS)
                palette.append(c)
        return palette

    def update(self):
        self.do_fire()

    def draw(self):
        self.draw_logo()
        # self.draw_palette()
        self.draw_fire()


class App:
    def __init__(self):
        self.screen = pg.display.set_mode(size=WIN_SIZE)
        self.clock = pg.time.Clock()
        self.dzc_fire = MyFire(self)

    def update(self):
        self.dzc_fire.update()
        self.clock.tick(60)
        pg.display.set_caption(f'{self.clock.get_fps() :.1f}')

    def draw(self):
        self.screen.fill('black')
        self.dzc_fire.draw()
        pg.display.flip()

    def run(self):
        while True:
            for event in pg.event.get():
                if event.type == pg.QUIT:
                    pg.quit()
                    sys.exit()
            self.update()
            self.draw()


if __name__ == '__main__':
    app = App()
    app.run()

效果

截图 2023-07-22 00-26-01.png

logo-teal.png

在Python中创建一个简单的API,您可以使用一个轻量级的Web框架,比如Flask。以下是一个示例,展示如何创建一个基本的API,返回一个JSON响应:

Flask构建一个API例子

from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/api/hello', methods=['GET'])
def hello():
    data = {
        'message': '你好,世界!'
    }
    return jsonify(data)

if __name__ == '__main__':
    app.run()

在这个例子中,我们使用Flask(__name__)定义一个Flask应用程序。然后,我们使用@app.route装饰器定义了一个路由/api/hello,指定HTTP方法为GET。当通过GET请求访问/api/hello端点时,将执行hello函数。

在hello函数内部,我们创建一个包含简单消息的字典data。然后,我们使用jsonify函数将字典转换为JSON响应。

最后,我们使用app.run()运行Flask应用程序。

为了测试API,您可以运行Python脚本,它将启动一个本地开发服务器。您可以在http://localhost:5000/api/hello上访问API。响应将是一个包含消息"你好,世界!"的JSON对象。

这是一个基本的示例,帮助您开始使用Flask创建API。您可以添加更多的路由、处理不同的HTTP方法,并根据具体需求执行各种操作。

使用FastAPI构建一个API,随机生成一个偶数

FastAPI 是一个用于构建 API 的现代、快速(高性能)的 web 框架,使用 Python 3.6+ 并基于标准的 Python 类型提示。

from fastapi import FastAPI
from random import randint

app = FastAPI()

@app.get("/api/random_even")
async def generate_random_even():
    number = randint(0, 100)
    if number % 2 != 0:
        number += 1
    return {"random_even": number}

在这个例子中,我们从fastapi模块导入FastAPI,从random模块导入randint。我们使用FastAPI()创建一个FastAPI应用程序。

通过使用@app.get装饰器,我们定义了一个路由/api/random_even,处理该端点的GET请求。在generate_random_even函数内部,我们使用randint(0, 100)生成一个0到100之间的随机数。如果生成的数字不是偶数(即奇数),我们将其加1,使其变为偶数。

最后,我们返回一个包含生成的随机偶数的JSON响应。

要测试这个API,您可以运行Python脚本,FastAPI应用程序将启动一个本地开发服务器。您可以在http://localhost:8000/api/random_even上访问API。每次访问这个端点时,它都会生成一个新的随机偶数。

注意:在运行脚本之前,确保已通过pip install fastapi uvicorn安装了FastAPI及其所需的依赖项。然后,您可以使用uvicorn main:app --reload来启动应用程序,假设脚本的名称为main.py。

pip install fastapi uvicorn
uvicorn main:app --reload

浏览器访问该API

http://localhost:8000/api/random_even

指定ip和端口运行fastapi应用

uvicorn main:app --host 192.168.1.100 --port 8000
uvicorn main:app --host 0.0.0.0.0 --port 7878 ## 使用不同的端口

FastAPI传入参数

from fastapi import FastAPI

app = FastAPI()

@app.get("/users/{user_id}/items/{item_id}")
def get_user_item(user_id: int, item_id: str):
    return {"user_id": user_id, "item_id": item_id}

url指定参数,传入函数

根据函数的参数自动匹配url里参数?

from fastapi import FastAPI

app = FastAPI()

@app.get("/items")
def get_items(item_id: int, q: str):
    return {"item_id": item_id, "q": q}

如何构建自己的汇率API?

拓展?使用现有的api,作一些封装,再转发除去实现自己的api服务

3e904319-2068-47eb-8e93-3f35219bbce0.jpeg

前面汇率实时转换的小程序,使用了api key,因为是免费的,所以有调用次数的限制
找了一些免费、公开、无需key的API,把玩一下看看
这里列了不少免费的API
没有一一尝试,感兴趣可以自行测试一下,免费的,不能用也很正常:)

对于汇率转换小程序,可以改用上面列表里提到的第30个api
|30 |Crypto & Finance |Exchangerate |Currency exchange & crypto rates| https://api.exchangerate.host/latest|

使用requests库,访问该url,如法炮制:

import requests
import json
url = 'https://api.exchangerate.host/latest'
response = requests.get(url)
json_object = json.loads(response.text)
#print(json_object) 
if json_object["success"] == True:
    print(f"日期:{json_object['date']}")
    print(f"基准货币: {json_object['base']}")
    print(f"美元汇率: {json_object['rates']['USD']}")
    print(f"人民币汇率: {json_object['rates']['CNY']}")

对于一个API访问返回的数据,首先打印出来,然后用json格式化工具去查看,比如
json格式化与验证工具, 或者浏览器直接打开URL,现代浏览器会直接以JSON格式显示返回的数据

在Python里面访问这样的数据结构,使用字典数据访问形式:
json_object["result"] == "success" ## 判断结果是否成功

如下图所示,就可以比较清晰看出数据的结构:
格式化的json数据

print(f"基准货币: {json_object['base']}")
print(f"到美元的汇率: {json_object['rates']['USD']}")
print(f"到人民币汇率: {json_object['rates']['CNY']}")

以后就可以将这个API可以集成到自己的应用中...

一般性API访问小结:

  • 了解API基本调用方式
  • 了解API响应的数据结构
  • 将响应的数据专程json对象
  • 访问json对象特定字段,获取需要的数据
  • 根据实际问题实现自己程序逻辑,比如汇率转换的简单计算

练习

给定一个汇率数据API
编写一个完整的应用程序,输入一定数目的美元, 输出所有支持货币的转换金额,以及与美元的汇率
比如,程序实际运行可能如下所示:
请输入输入美元的数量:
日期, 你输入美元 $ xxxx
xxxx美元 换 yyyy 人民币; 汇率:7.2
xxxx美元 换 yyyy 日元; 汇率: 125
...
要求,尽可能输出所有支持的货币
提示:需要手工编写一个货币代码到中文文字描述的映射表,比如CNY表示人民币, USD表示美元等等(可以从网上获取相应的数据)

参考实现

获取货币代码与名称、国家地区的描述
import csv
import requests
from bs4 import BeautifulSoup

# 发起网页请求
url = "https://www.iban.hk/currency-codes"
response = requests.get(url)

# 解析HTML内容
soup = BeautifulSoup(response.text, 'html.parser')

# 找到<tbody>标签
tbody = soup.find('tbody')

# 提取表格数据
data = []
rows = tbody.find_all('tr')
for row in rows:
    cells = row.find_all('td')
    row_data = [cell.get_text(strip=True) for cell in cells]
    data.append(row_data)

# 将数据写入CSV文件
with open('currency_table_data.csv', 'w', newline='') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerows(data)

将汇率web API封装成一个独立的模块

currency_exchange.py

import requests
import json
import datetime
import csv




class CurrencyExchange:
    def __init__(self, api_url="https://open.er-api.com/v6/latest/USD"):
        self.url = api_url
        self.base = 'USD'
        self.timestamp = None
        self.from_currency = None
        self.to_currency = None
        self.rates = None
        self.currency_code_mapping = None

    def get_currency_code_mapping(self, csv_file="currency_table_data.csv"):
        '''
        获取货币编码与名称的映射,预存在csv文件里
        '''
        self.currency_code_mapping = []
        with open(csv_file, 'r') as csvfile:
            csvreader = csv.reader(csvfile)
            for row in csvreader:
                self.currency_code_mapping.append(tuple(row))
        return self.currency_code_mapping
    
    def get_allrates(self):
        '''
        获取所有支持货币的汇率,相对于base货币,比如USD
        '''
        response = requests.get(self.url)
        json_object = json.loads(response.text)
        if json_object["result"] == "success":
            timestamp = int(json_object['time_last_update_unix'])
            self.timestamp = datetime.datetime.fromtimestamp(timestamp)
            self.rates = json_object["rates"]
            return self.timestamp, self.rates
        else:
            return 0, 0
    
    def get_rate(self, to_currency):
        response = requests.get(self.url)
        json_object = json.loads(response.text)
        if json_object["result"] == "success":
            if to_currency.upper() in json_object["rates"]:
                from_rate = float(json_object['rates'][self.base])
                to_rate = float(json_object['rates'][to_currency])
                from_to_rate = to_rate / from_rate
                timestamp = int(json_object['time_last_update_unix'])
                updated_time = datetime.datetime.fromtimestamp(timestamp)
                return updated_time, from_to_rate
            else:
                return 0,0
        else:
            return 0, 0
    
    def calc(self, usd_amount, to_currency):
        timestamp, rate = self.get_rate(to_currency)

        return timestamp, float(usd_amount) * float(rate)

使用自定义的类

# 引入一个自定义的模块,货币转换的一个类
from currency_exchange import CurrencyExchange

my_currency_exchange = CurrencyExchange()

# 默认该货币转换的类基础货币是美元,转换为浮点数,需要增加try except 异常处理
usd_amount = float(input("输入待转换的美元金额: "))
# 将用户的输入统一转换为大写,这样输入jpy,JPY,jPy,jPY,程序一样可以运行
to_currency = input("输入目标货币的代号: 比如 CNY, JPY, 等等...").upper()
# 查询一下货币代码与名称映射
list_of_currency_code_mapping = my_currency_exchange.get_currency_code_mapping()
to_currency_area = None
to_currency_name = None
to_currency_id = None

# 根据用户输入的货币代码,查表获取对应的国家名称,以及货币名称
for item in list_of_currency_code_mapping:
    if item[2] == to_currency:
        to_currency_area = item[0]
        to_currency_name = item[1]
        to_currency_id = item[3]
        # 给用户一个反馈
        print(f'你输入美元 ${usd_amount}, 目标货币: {to_currency_area} {to_currency_name}')
        break

# 调用自定义类货币转换的 calc方法: 负责将美元转成目标货币的金额
updated_timestamp, total = my_currency_exchange.calc(usd_amount, to_currency)

# 返回出错处理
if updated_timestamp == 0 and total == 0:
    print("查询失败or不支持该货币汇率")
else:
    if to_currency_name is not None:
        print(f"[更新日期:{updated_timestamp}] 美元 ${usd_amount} 可换 {total:.2f} {to_currency_area} {to_currency_name}")
    else:
        print(f"[更新日期:{updated_timestamp}] 美元 ${usd_amount} 可换 {total:.2f} {to_currency}")




其他好玩的API

Next:

下次我们自己写个简单的API看看?

image_XZsE558j_1688837139931_raw.jpg

Python requests 是一个常用的 HTTP 请求库,可以方便地向网站发送 HTTP 请求,并获取响应结果。requests 模块比 urllib 模块更简洁。

使用 requests 发送 HTTP 请求需要先导入 requests 模块:

import requests

导入后就可以发送 HTTP 请求,使用 requests 提供的方法向指定 URL 发送 HTTP 请求,例如:

# 导入 requests 包
import requests

# 发送请求
x = requests.get('https://www.dazuicheng.com/')

# 返回网页内容
print(x.text)

每次调用 requests 请求之后,会返回一个 response 对象,该对象包含了具体的响应信息,如状态码、响应头、响应内容等:

print(response.status_code)  # 获取响应状态码
print(response.headers)  # 获取响应头
print(response.content)  # 获取响应内容

更多响应信息如下:

属性或方法说明
apparent_encoding编码方式
close()关闭与服务器的连接
content返回响应的内容,以字节为单位
cookies返回一个 CookieJar 对象,包含了从服务器发回的 cookie
elapsed返回一个 timedelta 对象,包含了从发送请求到响应到达之间经过的时间量,可以用于测试响应速度。比如 r.elapsed.microseconds 表示响应到达需要多少微秒。
encoding解码 r.text 的编码方式
headers返回响应头,字典格式
history返回包含请求历史的响应对象列表(url)
is_permanent_redirect如果响应是永久重定向的 url,则返回 True,否则返回 False
is_redirect如果响应被重定向,则返回 True,否则返回 False
iter_content()迭代响应
iter_lines()迭代响应的行
json()返回结果的 JSON 对象 (结果需要以 JSON 格式编写的,否则会引发错误)
links返回响应的解析头链接
next返回重定向链中下一个请求的 PreparedRequest 对象
ok检查 "status_code" 的值,如果小于400,则返回 True,如果不小于 400,则返回 False
raise_for_status()如果发生错误,方法返回一个 HTTPError 对象
reason响应状态的描述,比如 "Not Found" 或 "OK"
request返回请求此响应的请求对象
status_code返回 http 的状态码,比如 404 和 200(200 是 OK,404 是 Not Found)
text返回响应的内容,unicode 类型数据
url返回响应的 URL

实例

# 导入 requests 包
import requests
import json

    
# 发送请求
x = requests.get('https://api.ipify.org?format=json')
    
# 返回 http 的状态码
print(x.status_code)
    
# 响应状态的描述
print(x.reason)
    
# 返回编码
print(x.apparent_encoding)

json_object = json.loads(x.text)
print(f"public IP: {json_object['ip']}")

实时汇率转换小程序

import requests
import time
import json
import datetime
import sys


supported_currency_name = {
    0: '欧元',
    1: '美元',
    2: '日元',
    3: '英镑',
    4: '人民币'
}

supported_currency_code = {
    0: 'EUR',
    1: 'USD',
    2: 'JPY',
    3: 'GBP',
    4: 'CNY'
}

def get_rate(from_currency, to_currency):
    url = 'https://v6.exchangerate-api.com/v6/b3c4901ef11a8bcb17276efc/latest/EUR'
    response = requests.get(url)
    json_object = json.loads(response.text)
    if json_object["result"] == "success":
        from_rate = float(json_object['conversion_rates'][from_currency])
        to_rate = float(json_object['conversion_rates'][to_currency])
        from_to_rate = to_rate / from_rate
        timestamp = int(json_object['time_last_update_unix'])
        query_time = datetime.datetime.fromtimestamp(timestamp)
        return query_time, from_to_rate
    else:
        return 0, 0

print("$$$$$$$$欢迎使用汇率转换程序¥¥¥¥¥¥¥¥¥¥")
print(f"支持的货币代码有:{supported_currency_code}")
from_currency_choice = int(input('输入原始货币编号数字:'))
from_currency_code = supported_currency_code[from_currency_choice]
from_currency_name = supported_currency_name[from_currency_choice]
print(f"你输入的初始货币:{from_currency_name}")
print(f"支持的货币代码有:{supported_currency_code}")
to_currency_choice = int(input('输入目标货币编号数字:'))
to_currency_code   = supported_currency_code[to_currency_choice]
to_currency_name = supported_currency_name[to_currency_choice]
print(f"你输入的目标货币:{to_currency_name}")
source_amount = int(input('输入待转换的货币金额: '))

query_time, rate = get_rate(from_currency_code, to_currency_code)
if query_time == 0 and rate == 0:
    print("获取汇率失败...")
    sys.exit()

target_amount = source_amount * rate

print(f"[{query_time}]: {source_amount} {from_currency_name} 可以换 {target_amount:.2f} {to_currency_name}")