本文根据 http://tf.wiki/ 编写。
使用 keras 实现线性模型
keras 的 model 中可以使用 variables 直接导出模型中的参数,这样就能使用优化器进行非常方便的求导,这跟上一章中的 variables 数组的效果是一样的。
import tensorflow as tf
# 初始化数据集
X = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]]) # 这是一个2x3的矩阵,其实代表的是两个样本,每个样本是一个3维的向量
y = tf.constant([[10.0], [20.0]])
class Linear(tf.keras.Model):
def __init__(self):
# 调用父类的构造函数
super().__init__()
# 创建一个全连接层,这里的全连接层实际上就代表一个线性变换,wX+b
self.dense = tf.keras.layers.Dense(
units=1,
activation=None,
kernel_initializer=tf.zeros_initializer(),
bias_initializer=tf.zeros_initializer()
)
def call(self, input):
# 实现一个类似函数调用的形式,例如 pred=model(X)
output = self.dense(input)
return output
# 以下代码结构与前节类似
model = Linear()
# 定义优化器
optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
for i in range(100):
with tf.GradientTape() as tape:
y_pred = model(X) # 调用模型 y_pred = model(X) 而不是显式写出 y_pred = a * X + b
loss = tf.reduce_mean(tf.square(y_pred - y)) # 损失函数表达式
grads = tape.gradient(loss, model.variables) # 使用 model.variables 这一属性直接获得模型中的所有变量
optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))
print(model.variables)
[<tf.Variable 'linear_2/dense_6/kernel:0' shape=(3, 1) dtype=float32, numpy=
array([[0.40784466],
[1.1910654 ],
[1.9742855 ]], dtype=float32)>, <tf.Variable 'linear_2/dense_6/bias:0' shape=(1,) dtype=float32, numpy=array([0.7832204], dtype=float32)>]
全连接层 (Fully-connected Layer,tf.keras.layers.Dense )是 Keras 中最基础和常用的层之一,对输入矩阵 $A$ 进行 $f(AX+b)$ 的线性变换和激活操作,其中 $f$ 函数就是激活,如果去掉,就相当于一个简单的线性变换。
参数介绍
我们从编写一个最简单的 多层感知机 (Multilayer Perceptron, MLP),或者说 “多层全连接神经网络” 开始,介绍 TensorFlow 的模型编写方式。多层感知机就是一个简单的全连接神经网络。
import numpy as np
class MNISTLoader(tf.keras.Model):
def __init__(self) -> None:
super().__init__()
mnist = tf.keras.datasets.mnist
(self.train_data, self.train_label), (self.test_data, self.test_label) = mnist.load_data()
# MNIST中的图像默认为uint8(0-255的数字)。以下代码将其归一化到0-1之间的浮点数,并在最后增加一维作为颜色通道
# expand_dims()函数的作用是增加一个维度,这里是增加一个颜色通道的维度, axis=-1表示在最后一个维度增加
self.train_data = np.expand_dims(self.train_data.astype(np.float32) / 255.0, axis=-1) # [60000, 28, 28, 1]
self.test_data = np.expand_dims(self.test_data.astype(np.float32) / 255.0, axis=-1) # [10000, 28, 28, 1]
# 调整标签的数据类型
self.train_label = self.train_label.astype(np.int32) # [60000]
self.test_label = self.test_label.astype(np.int32) # [10000]
self.num_train_data, self.num_test_data = self.train_data.shape[0], self.test_data.shape[0]
def get_batch(self, batch_size):
# 随机生成一个batch_size大小的索引列表
# 但是怎么保证每次取得值都不一样?
index = np.random.randint(0, np.shape(self.train_data)[0], batch_size)
return self.train_data[index, :], self.train_label[index]
class MLP(tf.keras.Model):
def __init__(self):
super().__init__()
self.flatten = tf.keras.layers.Flatten() # Flatten层将除第一维(batch_size)以外的维度展平
self.dense1 = tf.keras.layers.Dense(units=100, activation=tf.nn.relu) # 使用ReLU激活函数,该层的参数为100个
self.dense2 = tf.keras.layers.Dense(units=10) # 输出层,参数为10个
def call(self, inputs): # [batch_size, 28, 28, 1],输入的图像是28x28的,颜色通道为1
x = self.flatten(inputs) # [batch_size, 784],将除第一维batch_size以外的维度展平
x = self.dense1(x) # [batch_size, 100],全连接层,参数为100个
x = self.dense2(x) # [batch_size, 10]
output = tf.nn.softmax(x) # 对输出层的输出做softmax,得到的是概率分布,[batch_size, 10]
return output
# 模型训练
model = MLP() # 使用一个多层感知机模型
data_loader = MNISTLoader() # 使用一个数据加载器
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3) # 使用Adam优化器
从 DataLoader 中随机取一批训练数据;
将这批数据送入模型,计算出模型的预测值;
将模型预测值与真实值进行比较,计算损失函数(loss)。这里使用 tf.keras.losses 中的交叉熵函数作为损失函数;
计算损失函数关于模型变量的导数;
将求出的导数值传入优化器,使用优化器的 apply_gradients 方法更新模型参数以最小化损失函数
num_epochs = 5
batch_size = 50
summary_writer = tf.summary.create_file_writer('./tensorboard') # 参数为记录文件所保存的目录
# 计算一共有多少个batch,通过训练集的大小整除每个batch的大小和epoch的大小
num_batches = int(data_loader.num_train_data // batch_size * num_epochs)
for batch_index in range(num_batches):
# 取出一组batch
X, y = data_loader.get_batch(batch_size)
with tf.GradientTape() as tape:
y_pred = model(X)
# 计算loss值
loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
with summary_writer.as_default():
tf.summary.scalar('loss', tf.reduce_mean(loss), step=batch_index)
# 打印loss的值
print("batch {}: loss {}".format(batch_index, loss.numpy()))
# 计算梯度
grads = tape.gradient(loss, model.variables)
# 使用优化器更新参数
optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))
最后,我们使用测试集评估模型的性能。这里,我们使用 tf.keras.metrics 中的 SparseCategoricalAccuracy 评估器来评估模型在测试集上的性能,该评估器能够对模型预测的结果与真实结果进行比较,并输出预测正确的样本数占总样本数的比例。我们迭代测试数据集,每次通过 update_state() 方法向评估器输入两个参数: y_pred 和 y_true ,即模型预测出的结果和真实结果。评估器具有内部变量来保存当前评估指标相关的参数数值(例如当前已传入的累计样本数和当前预测正确的样本数)。迭代结束后,我们使用 result() 方法输出最终的评估指标值(预测正确的样本数占总样本数的比例)。
在以下代码中,我们实例化了一个 tf.keras.metrics.SparseCategoricalAccuracy 评估器,并使用 For 循环迭代分批次传入了测试集数据的预测结果与真实结果,并输出训练后的模型在测试数据集上的准确率。
# 实例化一个稀疏交叉熵评估器
sparse_categorical_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
# 计算需要多少轮才能完成
num_batches = int(data_loader.num_test_data // batch_size)
for batch_index in range(num_batches):
# 计算每次取出的batch的上标下标
start_index, end_index = batch_index * \
batch_size, (batch_index + 1) * batch_size
# 进行一轮预测
y_pred = model.predict(data_loader.test_data[start_index: end_index])
# 更新评估器
sparse_categorical_accuracy.update_state(
y_true=data_loader.test_label[start_index: end_index], y_pred=y_pred)
print("test accuracy: %f" % sparse_categorical_accuracy.result())
test accuracy: 0.975600
class DataLoader():
def __init__(self):
# 从网络上下载数据集
path = tf.keras.utils.get_file('nietzsche.txt',
origin='https://s3.amazonaws.com/text-datasets/nietzsche.txt')
with open(path, encoding='utf-8') as f:
self.raw_text = f.read().lower()
# 将原始文本去重并排序,这里把每个词都看做一个数组元素
self.chars = sorted(list(set(self.raw_text)))
# 创建一个字典,将字符映射到索引
self.char_indices = dict((c, i) for i, c in enumerate(self.chars))
# 创建一个字典,将索引映射到字符
self.indices_char = dict((i, c) for i, c in enumerate(self.chars))
# 将原始文本转换为索引序列
self.text = [self.char_indices[c] for c in self.raw_text]
def get_batch(self, seq_length, batch_size):
seq = []
next_char = []
for i in range(batch_size):
# 随机选择一个起始位置
index = np.random.randint(0, len(self.text) - seq_length)
# 把一个长度为 seq_length 的字符串序列插入 seq 中
seq.append(self.text[index:index+seq_length])
# 记录这个序列的下一个字符
next_char.append(self.text[index+seq_length])
return np.array(seq), np.array(next_char) # [batch_size, seq_length], [num_batch]
RNN 网络的输入与输出
将 input 和 当前 state 输入 RNN,得到 output 和 下一个 stateoutput, state = self.cell(inputs[:, t, :], state)
图示
class RNN(tf.keras.Model):
def __init__(self, num_chars, batch_size, seq_length):
super().__init__()
# one-hot 编码长度
self.num_chars = num_chars
# 序列的长度
self.seq_length = seq_length
# 每次训练的 batch 大小
self.batch_size = batch_size
#
self.cell = tf.keras.layers.LSTMCell(units=256)
# 将处理结果映射到 one-hot 编码的长度上
self.dense = tf.keras.layers.Dense(units=self.num_chars)
def call(self, inputs, from_logits=False):
inputs = tf.one_hot(inputs, depth=self.num_chars) # [batch_size, seq_length, num_chars]
state = self.cell.get_initial_state(batch_size=self.batch_size, dtype=tf.float32) # 获得 RNN 的初始状态
for t in range(self.seq_length):
output, state = self.cell(inputs[:, t, :], state) # 通过当前输入input和前一时刻的状态state,得到输出和当前时刻的状态
logits = self.dense(output)
if from_logits: # from_logits 参数控制输出是否通过 softmax 函数进行归一化
return logits
else:
return tf.nn.softmax(logits)
def predict(self, inputs, temperature=1.):
batch_size, _ = tf.shape(inputs) # [batch_size, seq_length]
logits = self(inputs, from_logits=True) # 调用训练好的RNN模型,预测下一个字符的概率分布
prob = tf.nn.softmax(logits / temperature).numpy() # 使用带 temperature 参数的 softmax 函数获得归一化的概率分布值,如果temperature越大,那么用于分类的数据中最大最小的差值越小,相当于提高了丰富度,反之亦然。
return np.array([np.random.choice(self.num_chars, p=prob[i, :]) # 使用 np.random.choice 函数,choice:Generates a random sample from a given 1-D array,从概率分布 prob 中随机采样一个字符的索引
for i in range(batch_size.numpy())]) # 在预测的概率分布 prob 上进行随机取样,如果不给p的话就是uniform distribution 均匀分布。这里相当于给定数组中每个元素被抽到的概率
num_batches = 1000 # 训练 1000 轮
seq_length = 40 # 序列长度为 40
batch_size = 50 # 每次训练的 batch 大小为 50
learning_rate = 1e-3 # 学习率
data_loader = DataLoader()
# 传入的参数分别为 one-hot 编码的长度,batch 大小和序列长度
# num_chars 为去重之后的chars长度
model = RNN(num_chars=len(data_loader.chars),
batch_size=batch_size, seq_length=seq_length)
# 使用 adam 优化器
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
for batch_index in range(num_batches):
# 取出 seq_length 长度的字符序列,标签为下一个字符,一共取出 batch_size 个
X, y = data_loader.get_batch(seq_length, batch_size)
# 使用 GradientTape 追踪梯度
with tf.GradientTape() as tape:
y_pred = model(X)
# 使用稀疏交叉熵计算损失
loss = tf.keras.losses.sparse_categorical_crossentropy(
y_true=y, y_pred=y_pred)
loss = tf.reduce_mean(loss)
print("batch %d: loss %f" % (batch_index, loss.numpy()))
grads = tape.gradient(loss, model.variables)
optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))
batch 998: loss 2.455446
batch 999: loss 2.231981
X_, _ = data_loader.get_batch(seq_length, 1)
for diversity in [0.2, 0.5, 1.0, 1.2]: # 丰富度(即temperature)分别设置为从小到大的 4 个值
X = X_
print("diversity %f:" % diversity)
for t in range(400):
y_pred = model.predict(X, diversity) # 预测下一个字符的编号
print(data_loader.indices_char[y_pred[0]],
end='', flush=True) # 输出预测的字符
# 将预测的字符接在输入 X 的末尾,并截断 X 的第一个字符,以保证 X 的长度不变
X = np.concatenate(
[X[:, 1:], np.expand_dims(y_pred, axis=1)], axis=-1)
print("\n")
diversity 0.200000:
the the the the the the ind and in the the the the the the the the the the the the the the the the the the the the the he the the the the the the the the the to the alle the the he the the the the the the ald and athe the the the he the the the the the the fore the the the the the the the the the the fore the the the the the the the ind the whe the the the the whe the the the the the the hes of t
diversity 0.500000:
ny ang fore wort deles the wond thare fof the the tho the hethe ald the ind the and and by or hithe merelly the thel the end the it ald indes the ond the halt ind allely of sert the ins io whe thes in the theace the secthe s ion the inte the toully and the sin the soct the heve wo the that and the mer the thit he
mand of the fere alemece the 1an the senon the he mentald the thel tithe hed and the
diversity 1.000000:
lhon
paniywgsthe iistidtbe suplro-sudiigt patl pe
thive sq int, whtues ousiwedingtoransitl pit m p mhht whar al
ecitl enlsiat ime s ce ofhangirsith, anienpee
incietice sopophe
nobts atyt-weet ind al so alf aythe of tale and mbesymect torere ,
?thettyvolldes of fanf ompec?t the whenenuoch onheucelty po toipwmcltobcosiid sod than of whar -lelllathest ino ali, titelsely ye helrminele that dactdiy fun
diversity 1.200000:
e.
atreutl bo
aluomocg sowhan thes, wiveuos cod
amte whisisdihs peally?,othe
pemosed me)derinctinncowaltmepleskt]ind?anytraogec
ofkhe-geg po5y asma os feydeehbe
ctitid ef,ort, sopveeve le wrenllolpqeteesyons bnsstrntt agndicys angassy finooousta4d, andell thvit al mad er wuafiat
fiis dvan ranopindant toote6 ohtio? hedsw withu, asy peltda standyinocu. p eler. ey f thes pivgittey d1e tititiwe. o
深度强化学习
强化学习 (Reinforcement learning,RL)强调如何基于环境而行动,以取得最大化的预期利益。结合了深度学习技术后的强化学习(Deep Reinforcement learning,DRL)更是如虎添翼。近年广为人知的 AlphaGo 即是深度强化学习的典型应用。
import gym
import random
from collections import deque
num_episodes = 500 # 游戏训练的总episode数量
num_exploration_episodes = 100 # 探索过程所占的episode数量
max_len_episode = 1000 # 每个episode的最大回合数
batch_size = 32 # 批次大小
learning_rate = 1e-3 # 学习率
gamma = 1. # 折扣因子
initial_epsilon = 1. # 探索起始时的探索率
final_epsilon = 0.01 # 探索终止时的探索率
class QNetwork(tf.keras.Model):
def __init__(self):
super().__init__()
# units 代表输出的维度,activation 代表激活函数
self.dense1 = tf.keras.layers.Dense(units=24, activation=tf.nn.relu)
self.dense2 = tf.keras.layers.Dense(units=24, activation=tf.nn.relu)
self.dense3 = tf.keras.layers.Dense(units=2)
def call(self, inputs):
x = self.dense1(inputs)
x = self.dense2(x)
x = self.dense3(x)
return x
def predict(self, inputs):
q_values = self(inputs)
# 返回每行最大值的索引,axis的合法值为-1,0,1...到数组的维度-1,例如二维数组的axis合法值为0,1,-1
return tf.argmax(q_values, axis=-1)