实现上传接口
<span class="ne-text">main.go</span>
func main() {
//建立路由规则
//注册了对应于请求路径 /file/upload 的处理函数
http.HandleFunc("/file/upload", handler.UploadHandler)
http.HandleFunc("/file/upload/success", handler.UploadSuccessHandler)
//启动了一个端口为 8080 的 http 服务
err := http.ListenAndServe(":8080", nil)
if err != nil {
fmt.Printf("Failed to listen and serve: %v", err)
}
}
<span class="ne-text">handler处理函数</span>
package handler
import (
"fmt"
"io"
"net/http"
"os"
)
// 处理文件上传
func UploadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
//返回上传页面
f, err := os.ReadFile("./static/view/index.html")
if err != nil {
io.WriteString(w, err.Error())
}
io.WriteString(w, string(f))
} else if r.Method == "POST" {
//接收文件流 存储到本地目录
f, fileHeader, err := r.FormFile("file")
if err != nil {
fmt.Printf("Failed to get file: %v\n", err)
io.WriteString(w, err.Error())
}
defer f.Close()
//读取文件流
newFile, err := os.OpenFile("./upload/"+fileHeader.Filename, os.O_WRONLY|os.O_CREATE, os.ModePerm)
if err != nil {
return
}
defer newFile.Close()
_, err = io.Copy(newFile, f)
if err != nil {
fmt.Printf("Failed to save file: %v\n", err)
io.WriteString(w, err.Error())
}
//重定向,跳转页面
http.Redirect(w, r, "/file/upload/success", http.StatusFound)
}
}
// 上传成功的处理函数
func UploadSuccessHandler(w http.ResponseWriter, r *http.Request) {
io.WriteString(w, "Upload Success")
}
保存文件元信息
保存 文件元信息包括:md5值(作为文件id,保证唯一性)、文件名、上传时间、文件大小、存储路径
- 引入计算文件hash(md5值)的相关处理函数
<span class="ne-text">util.go</span>
package util
import (
"crypto/md5"
"crypto/sha1"
"encoding/hex"
"hash"
"io"
"os"
"path/filepath"
)
//文件流Sha1 结构
type Sha1Stream struct {
_sha1 hash.Hash
}
//更新文件流的Sha1
func (obj *Sha1Stream) Update(data []byte) {
if obj._sha1 == nil {
obj._sha1 = sha1.New()
}
obj._sha1.Write(data)
}
//计长文件流的Sha1
func (obj *Sha1Stream) Sum() string {
return hex.EncodeToString(obj._sha1.Sum([]byte("")))
}
func Sha1(data []byte) string {
_sha1 := sha1.New()
_sha1.Write(data)
return hex.EncodeToString(_sha1.Sum([]byte("")))
}
//文件的Sha1
func FileSha1(file *os.File) string {
_sha1 := sha1.New()
io.Copy(_sha1, file)
return hex.EncodeToString(_sha1.Sum(nil))
}
func MD5(data []byte) string {
_md5 := md5.New()
_md5.Write(data)
return hex.EncodeToString(_md5.Sum([]byte("")))
}
//文件的MD5
func FileMD5(file *os.File) string {
_md5 := md5.New()
io.Copy(_md5, file)
return hex.EncodeToString(_md5.Sum(nil))
}
//路径是否存在
func PathExists(path string) (bool, error) {
_, err := os.Stat(path)
if err == nil {
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
//获取文件大小
func GetFileSize(filename string) int64 {
var result int64
filepath.Walk(filename, func(path string, f os.FileInfo, err error) error {
result = f.Size()
return nil
})
return result
}
- 构建存储元文件信息的结构体,并且构建相关处理函数
package models
// 文件元信息结构
type FileMeta struct {
FileSha1 string //文件的唯一标致
Name string
Size int64
Path string //本地路径
UploadAt string
}
// 定义对象,来存储所有上传文件的原信息
// string 标识 文件的唯一标志 ---FileSha1
var fileMetas map[string]FileMeta
// 初始化函数
func init() {
fileMetas = make(map[string]FileMeta)
}
// 新增/更新文件元信息
func UpdateFileMeta(fileMeta FileMeta) {
fileMetas[fileMeta.FileSha1] = fileMeta
}
// 获取文件元信息
func GetFileMeta(fileSha1 string) FileMeta {
return fileMetas[fileSha1]
}
// 删除文件元信息
func DeleteFileMeta(fileSha1 string) {
delete(fileMetas, fileSha1)
}
- **修改文件上传逻辑 **
<span class="ne-text">handler\handler.go UploadHandler</span>
// 处理文件上传
func UploadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
//返回上传页面
f, err := os.ReadFile("./static/view/index.html")
if err != nil {
io.WriteString(w, err.Error())
}
io.WriteString(w, string(f))
} else if r.Method == "POST" {
//接收文件流 存储到本地目录
f, fileHeader, err := r.FormFile("file")
if err != nil {
fmt.Printf("Failed to get file: %v\n", err)
io.WriteString(w, err.Error())
}
defer f.Close()
//定义文件元信息结构体
fileMeta := models.FileMeta{
Name: fileHeader.Filename,
Path: "./upload/" + fileHeader.Filename,
UploadAt: time.Now().Format("2006-01-02 15:04:05"),
}
//读取文件流
newFile, err := os.Create(fileMeta.Path)
if err != nil {
return
}
defer newFile.Close()
fileMeta.Size, err = io.Copy(newFile, f)
//计算文件的sha1值
newFile.Seek(0, 0)
//sha1值添加到结构体的数据中去
fileMeta.FileSha1 = util.FileSha1(newFile)
//新增文件元信息
models.UpdateFileMeta(fileMeta)
if err != nil {
fmt.Printf("Failed to save file: %v\n", err)
io.WriteString(w, err.Error())
}
//重定向,跳转页面
http.Redirect(w, r, "/file/upload/success", http.StatusFound)
}
}
总结
文件上传流程:
- 获取上传页面
- 选取本地文件,form形式上传文件
- 云端接收到文件流,写入本地存储
- 云端更新文件原信息集合
文件元信息查询接口
- 编写处理查询文件元信息的函数
func GetFileMetaHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
fileHash := r.Form["filehash"][0]
fileMeta := models.GetFileMeta(fileHash)
//返回给客户端
data, err := json.Marshal(fileMeta)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Write(data)
}
<span class="ne-text">main.go</span>
中注册对应于请求路径 /file/meta的处理函数
func main() {
//建立路由规则
//注册了对应于请求路径 /file/upload 的处理函数
http.HandleFunc("/file/upload", handler.UploadHandler)
http.HandleFunc("/file/upload/success", handler.UploadSuccessHandler)
http.HandleFunc("/file/meta", handler.GetFileMetaHandler)
//启动了一个端口为 8080 的 http 服务
err := http.ListenAndServe(":8080", nil)
if err != nil {
fmt.Printf("Failed to listen and serve: %v", err)
}
}
测试
文件下载接口
- 编写处理文件下载的函数
//下载文件的处理函数
func DownloadHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
fileHash := r.Form.Get("filehash")
fileMeta := models.GetFileMeta(fileHash)
//根据本地路径打开 文件
f, err := os.Open(fileMeta.Path)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
defer f.Close()
//读取文件的全部内容
data, err := io.ReadAll(f)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
//添加http的响应头
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Disposition", "attachment; filename=\""+fileMeta.Name+"\"")
w.Write(data)
}
<span class="ne-text">main.go</span>
func main() {
//建立路由规则
//注册了对应于请求路径 /file/upload 的处理函数
http.HandleFunc("/file/upload", handler.UploadHandler)
http.HandleFunc("/file/upload/success", handler.UploadSuccessHandler)
http.HandleFunc("/file/meta", handler.GetFileMetaHandler)
http.HandleFunc("/file/download", handler.DownloadHandler)
//启动了一个端口为 8080 的 http 服务
err := http.ListenAndServe(":8080", nil)
if err != nil {
fmt.Printf("Failed to listen and serve: %v", err)
}
}
文件Meta更新接口和文件删除接口
- 编写更新文件和删除文件的处理
/ 更新文件原信息接口
func FileMetaUpDateHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
//是否支持同命名
opType := r.Form.Get("op")
fileHash := r.Form.Get("filehash")
fileName := r.Form.Get("filename")
if opType != "0" {
//不支持
w.WriteHeader(http.StatusForbidden)
return
}
//只支持post方法
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
fileMeta := models.GetFileMeta(fileHash)
fileMeta.Name = fileName
models.UpdateFileMeta(fileMeta)
w.WriteHeader(http.StatusOK)
//把文件元信息返回给客户端
data, err := json.Marshal(fileMeta)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
w.Write(data)
}
// 文件删除处理函数(删除文件及其元信息)
func FileMetaDeleteHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
fileHash := r.Form.Get("filehash")
//物理删除本地存储的文件
//取出文件的存储位置
fileMeta := models.GetFileMeta(fileHash)
//删除文件
err := os.Remove(fileMeta.Path)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
//删除文件索引信息
models.DeleteFileMeta(fileHash)
w.WriteHeader(http.StatusOK)
}
- 添加路由规则
http.HandleFunc("file/update", handler.FileMetaUpDateHandler)
http.HandleFunc("/file/delete", handler.FileMetaDeleteHandler)
测试
- 更新文件信息
- 删除文件
Mysql主从复制
Mysql主从机的配置
主容器配置
- 创建master容器 (3307)
docker run -d --name master \
-p 3307:3306 \
--privileged=true \
-v /root/docker/mysql/master/log:/var/log/mysql \
-v /root/docker/mysql/master/data:/var/lib/mysql \
-v /root/docker/mysql/master/conf:/etc/mysql/conf.d \
-e MYSQL_ROOT_PASSWORD=root \
mysql:8.4
- master配置
主数据库 my.cnf
[mysqld]
# 唯一ID
server_id=1
# 设置需要复制的数据库(可以不配置)
binlog-do-db=master_slave
# 启用二进制日志 (重要,不配置此项会导致新建表等动作无法复制,只能复制crud)
log-bin=master-bin
# 设置不要复制的数据库
# binlog-ignore-db=testDB
- 重启master ,查看master主状态
// 重启
docker restart master
// 进入master容器
docker exec -it master /bin/bash
// 连接MySQL
mysql -u root -p
// 创建slave账号密码 用于同步使用,%可以替换为固定ip
CREATE USER 'slave'@'%'IDENTIFIED BY '123456';
// 设置slave的权限
//REPLICATION SLAVE:允许从服务器连接到主服务器并复制数据。(授予复制权限)
//REPLICATION CLIENT:允许从服务器读取二进制日志文件,这对于监控复制状态和调试非常有用。(授予读取二进制日志文件的权限)
//ON .:表示对所有数据库和表授权
GRANT REPLICATION SLAVE,REPLICATION CLIENT ON *.* TO 'slave'@'%';
// 刷新权限
flush privileges;
查看master主状态
show master status \G
//如果mysql版本大于8.4,则要使用
SHOW BINARY LOG STATUS // binlog.000002
slave容器配置
- 创建slave容器3308:
docker run -d --name slave -p 3308:3306 --privileged=true \
-v /root/docker/mysql/slave/log:/var/log/mysql \
-v /root/docker/mysql/slave/data:/var/lib/mysql \
-v /root/docker/mysql/slave/conf:/etc/mysql/conf.d \
-e MYSQL_ROOT_PASSWORD=root \
mysql:8.4
- slave配置文件
cd /root/docker/mysql/slave/conf
vim my.cnf
[mysqld]
# 唯一ID
server_id=2
# 启用二进制日志
relay-log=mysql-relay
- 重启slave
// 重启
docker restart slave
// 进入slave容器
docker exec -it slave /bin/bash
// 连接MySQL
mysql -u root -p
- 运行配置slave同步主机
语法:
change master to master_host='宿主机ip', master_user='slave', master_password='123456', master_port=3307,master_log_file='mall-mysql-bin.000001', master_log_pos=154, master_connect_retry=30;
// 参数说明:
master_host:主数据库的IP地址;
master_port:主数据库的运行端口;
master_user:在主数据库创建的用于同步数据的用户账号;
master_password:在主数据库创建的用于同步数据的用户密码;
master_log_file:指定从数据库要复制数据的日志文件,[通过查看主数据的状态show master status \G,获取File参数;
master_log_pos:指定从数据库从哪个位置开始复制数据,通过查看主数据的状态show master status \G,获取Position参数;
master_connect_retry:连接失败重试的时间间隔,单位为秒。
// 执行slave连接master
change master to master_host='47.122.53.44', master_user='slave', master_password='123456', master_port=3307,master_log_file='binlog.000002', master_log_pos=0, master_connect_retry=30;
//mysql版本8版本以上用:
# 可以在连接工具中执行
CHANGE REPLICATION SOURCE TO
source_host='47.122.53.44',
source_port=3307,
source_user='slave',
source_password='123456',
source_log_file='binlog.000002',
source_log_pos=1281;
- 启动slave同步进程:
start replica;
//查看slave状态:
show replica status \G
当Slave_IO_Running和Slave_SQL_Running都为YES的时候就表示主从同步设置成功了。
验证同步是否成功
- 主库创建一个数据库,从库可以查看到。
#创建数据库
create database master_slave default character set utf8;
#创建表
CREATE TABLE tbl_test(`name` varchar(10) not null, `age` int(11) not null)default charset utf8;
#查看表
show tables;
#从库中查看
show databases;
use master_slave
#查看表
show tables;
数据同步成功
数据库设计
文件表
CREATE TABLE `file` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`sha1` char(40) NOT NULL DEFAULT '' COMMENT '文件hash',
`name` varchar(256) NOT NULL DEFAULT '' COMMENT '文件名',
`size` bigint(20) DEFAULT '0' COMMENT '文件大小',
`path` varchar(1024) NOT NULL DEFAULT '' COMMENT '文件存储位置',
`create_at` datetime default NOW() COMMENT '创建日期',
`update_at` datetime default NOW() on update current_timestamp() COMMENT '更新日期',
`status` int(11) NOT NULL DEFAULT '0' COMMENT '状态(可用/禁用/已删除等状态)',
`ext1` int(11) DEFAULT '0' COMMENT '备用字段1',
`ext2` text COMMENT '备用字段2',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_file_hash` (`sha1`),
KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- 创建连接数据库的方法
package mysql
import (
"database/sql"
"fmt"
_ "github.com/go-sql-driver/mysql"
"os"
)
var database *sql.DB
func init() {
database, _ = sql.Open("mysql", "root:xxx@tcp(xxxx:3307)?charset=utf8")
database.SetMaxOpenConns(1000)
err := database.Ping()
if err != nil {
fmt.Println("Failed to connect to database,err:", err)
os.Exit(1)
}
}
// 提供外部访问的方法:返回数据库连接对象
func DBConn() *sql.DB {
return database
}
新增文件信息
- 提供文件上传成功后,将数据插入数据库的方法
// 提供对文件进行访问的方法
// 文件上传完成后,要将数据插入数据库
func FileInsert(fileHash string, name string, size int64, path string) bool {
statement, err := mysql.DBConn().Prepare("insert ignore into file(`sha1`,`name`,`size`,`path`,`status`) " +
"values(?,?,?,?,1)")
if err != nil {
fmt.Println("Failed to create statement", err.Error())
return false
}
defer statement.Close()
result, err := statement.Exec(fileHash, name, size, path)
if err != nil {
fmt.Println("Failed to execute statement", err.Error())
return false
}
if affected, err := result.RowsAffected(); nil == err {
if affected <= 0 {
fmt.Printf("File with hash:%s has been uploaded before", fileHash)
return false
}
return true
}
return false
}
- fileMeta.go中添加新增文件元信息到数据库的函数
// 新增/更新文件元信息(到数据库中)
func UpdateFileMetaToDB(fileMeta FileMeta) {
db.FileInsert(fileMeta.FileSha1, fileMeta.Name, fileMeta.Size, fileMeta.Path)
}
- ** 添加文件新增 路由规则对应的处理函数 **
<span class="ne-text">handler.go</span>
// 处理文件上传
func UploadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
//返回上传页面
f, err := os.ReadFile("./static/view/index.html")
if err != nil {
io.WriteString(w, err.Error())
}
io.WriteString(w, string(f))
} else if r.Method == "POST" {
//接收文件流 存储到本地目录
f, fileHeader, err := r.FormFile("file")
if err != nil {
fmt.Printf("Failed to get file: %v\n", err)
io.WriteString(w, err.Error())
}
defer f.Close()
//定义文件元信息结构体
fileMeta := models.FileMeta{
Name: fileHeader.Filename,
Path: "./upload/" + fileHeader.Filename,
UploadAt: time.Now().Format("2006-01-02 15:04:05"),
}
//读取文件流
newFile, err := os.Create(fileMeta.Path)
if err != nil {
return
}
defer newFile.Close()
fileMeta.Size, err = io.Copy(newFile, f)
if err != nil {
fmt.Printf("Failed to save file: %v\n", err)
io.WriteString(w, err.Error())
}
//计算文件的sha1值
newFile.Seek(0, 0)
//sha1值添加到结构体的数据中去
fileMeta.FileSha1 = util.FileSha1(newFile)
//新增文件元信息
models.UpdateFileMetaToDB(fileMeta)
//重定向,跳转页面
http.Redirect(w, r, "/file/upload/success", http.StatusFound)
}
}
测试
获取文件元信息
- 创建文件
//文件的结构体
type File struct {
Hash string
Name sql.NullString
Size sql.NullInt64
Path sql.NullString
}
<span class="ne-text">file.go</span>
// 从数据库中获取文件元信息
func GetFileMeta(fileHash string) (*File, error) {
statement, err := mysql.DBConn().Prepare(
"select sha1,path,name,size from file where sha1=? and status=1 limit 1")
if err != nil {
fmt.Println("Failed to create statement", err.Error())
return nil, err
}
defer statement.Close()
fileMeta := File{}
err = statement.QueryRow(fileHash).Scan(&fileMeta.Hash, &fileMeta.Path, &fileMeta.Name, &fileMeta.Size)
if err != nil {
if err == sql.ErrNoRows {
//查不到对应记录,返回参数及错误均为nil
return nil, nil
}
fmt.Println(err.Error())
return nil, err
}
return &fileMeta, nil
}
<span class="ne-text">fileMeta.go</span>
// 从数据库中获取文件元信息
func GetFileMetaDB(filesha1 string) (FileMeta, error) {
fileMeta, err := db.GetFileMeta(filesha1)
if err != nil {
return FileMeta{}, err
}
meta := FileMeta{
FileSha1: fileMeta.Hash,
Name: fileMeta.Name.String,
Size: fileMeta.Size.Int64,
Path: fileMeta.Path.String,
}
return meta, nil
}
<span class="ne-text">Handler.go</span>
// 查询文件元信息的函数
func GetFileMetaHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
fileHash := r.Form["filehash"][0]
fileMeta, err := models.GetFileMetaDB(fileHash)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
//返回给客户端
data, err := json.Marshal(fileMeta)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Write(data)
}
账号系统与鉴权
账号系统的功能
- 支持用户注册/登录
- 支持用户session鉴权
- 用户数据资源隔离
Mysql用户表设计
CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`user_name` varchar(64) NOT NULL DEFAULT '' COMMENT '用户名',
`user_pwd` varchar(256) NOT NULL DEFAULT '' COMMENT '用户encoded密码',
`email` varchar(64) DEFAULT '' COMMENT '邮箱',
`phone` varchar(128) DEFAULT '' COMMENT '手机号',
`email_validated` tinyint(1) DEFAULT 0 COMMENT '邮箱是否已验证',
`phone_validated` tinyint(1) DEFAULT 0 COMMENT '手机号是否已验证',
`signup_at` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '注册日期',
`last_active` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后活跃时间戳',
`profile` text COMMENT '用户属性',
`status` int(11) NOT NULL DEFAULT '0' COMMENT '账户状态(启用/禁用/锁定/标记删除等)',
PRIMARY KEY (`id`),
UNIQUE KEY `idx_username` (`user_name`),
KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
用户注册接口
<span class="ne-text">用户注册处理函数 user.go</span>
// 通过用户名和密码完成用户注册操作
func UserSignUp(userName string, password string) bool {
statement, err := mysql.DBConn().Prepare(
"insert ignore into user (`user_name`,`user_pwd`) values (?,?)")
if err != nil {
fmt.Println("mysql statement error:", err.Error())
}
defer statement.Close()
result, err := statement.Exec(userName, password)
if err != nil {
fmt.Println("mysql insert error:", err.Error())
return false
}
//重复注册
if affected, err := result.RowsAffected(); nil == err && affected > 0 {
return true
}
return false
}
<span class="ne-text">UserHandler.go 用户注册请求</span>
处理
// 盐值
const salt = "*#468"
// 处理用户注册请求
func SignUpHandler(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
//返回上传页面
f, err := os.ReadFile("./static/view/signup.html")
if err != nil {
io.WriteString(w, err.Error())
}
io.WriteString(w, string(f))
} else if r.Method == "POST" {
r.ParseForm()
userName := r.Form.Get("username")
password := r.Form.Get("password")
//有效性校验
if len(userName) < 3 || len(password) < 5 {
w.Write([]byte("invalid parameter"))
return
}
//密码加密处理
enc_password := util.Sha1([]byte(password + salt))
//写入数据库中
suc := db.UserSignUp(userName, enc_password)
if suc {
w.Write([]byte("SUCCESS"))
} else {
w.Write([]byte("FAIL"))
}
}
}
- **建立路由规则 **
<span class="ne-text">main.go</span>
//-------user-----
http.HandleFunc("/user/signup", handler.SignUpHandler)
用户登录接口
- 判断密码是否正确
<span class="ne-text">user.go</span>
// 判断密码是否正确
func UserSignIn(userName string, encPwd string) bool {
stmt, err := mysql.DBConn().Prepare("select user_pwd from user where user_name=? limit 1")
if err != nil {
fmt.Println("mysql statement error:", err.Error())
return false
}
defer stmt.Close()
rows, err := stmt.Query(userName)
if err != nil {
fmt.Println("mysql query error:", err.Error())
return false
} else if rows == nil {
fmt.Println("user not found")
return false
}
defer rows.Close()
var user_pwd string
for rows.Next() {
err := rows.Scan(&user_pwd)
if err != nil {
fmt.Println("mysql row scan error:", err.Error())
return false
}
}
if user_pwd == encPwd {
return true
}
return false
}
- 用户登录成功后,会形成token值,要编写用户登录的token值
// 存放用户登录的token值
func UpdateToken(userName string, token string) bool {
stmt, err := mysql.DBConn().Prepare(
"replace into user_token(`user_name`,`user_token`) values (?,?)")
if err != nil {
fmt.Println("mysql statement error:", err.Error())
return false
}
defer stmt.Close()
result, err := stmt.Exec(userName, token)
if err != nil {
return false
}
if affected, err := result.RowsAffected(); nil == err && affected > 0 {
return true
}
fmt.Println("token not found,err", err.Error())
return false
}
- 用户登录请求处理函数
<span class="ne-text">UserHandler.go</span>
// 用户登录处理
func SignInHandler(w http.ResponseWriter, r *http.Request) {
//1.检验用户名及密码
r.ParseForm()
username := r.Form.Get("username")
password := r.Form.Get("password")
encPassword := util.Sha1([]byte(password + salt))
pwdChecked := db.UserSignIn(username, encPassword)
if !pwdChecked {
w.Write([]byte("FAIL"))
return
}
//2.生成访问凭证(token)
token := GenToken(username)
result := db.UpdateToken(username, token)
if !result {
w.Write([]byte("FAIL"))
return
}
//3.登录成功重定向到首页
respMsg := util.RespMsg{
Code: 200,
Msg: "SUCCESS",
Data: struct {
Location string
Username string
Token string
}{
Location: "http://" + r.Host + "/static/view/home.html",
Username: username,
Token: token,
},
}
w.Write(respMsg.JSONBytes())
}
** **生成token的规则函数
func GenToken(username string) string {
//40位字符:md5(username+timestamp+token_salt)+timestamp[:8]
ts := fmt.Sprintf("%x", time.Now().Unix())
tokenPrefix := util.MD5([]byte(username + ts + "_tokensalt"))
//40位token
return tokenPrefix + ts[:8]
}
- 包装返回结果的工具类
<span class="ne-text">resp.go</span>
package util
import (
"encoding/json"
"fmt"
"log"
)
// RespMsg : http响应数据的通用结构
type RespMsg struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data interface{} `json:"data"`
}
// NewRespMsg : 生成response对象
func NewRespMsg(code int, msg string, data interface{}) *RespMsg {
return &RespMsg{
Code: code,
Msg: msg,
Data: data,
}
}
// JSONBytes : 对象转json格式的二进制数组
func (resp *RespMsg) JSONBytes() []byte {
r, err := json.Marshal(resp)
if err != nil {
log.Println(err)
}
return r
}
// JSONString : 对象转json格式的string
func (resp *RespMsg) JSONString() string {
r, err := json.Marshal(resp)
if err != nil {
log.Println(err)
}
return string(r)
}
// GenSimpleRespStream : 只包含code和message的响应体([]byte)
func GenSimpleRespStream(code int, msg string) []byte {
return []byte(fmt.Sprintf(`{"code":%d,"msg":"%s"}`, code, msg))
}
// GenSimpleRespString : 只包含code和message的响应体(string)
func GenSimpleRespString(code int, msg string) string {
return fmt.Sprintf(`{"code":%d,"msg":"%s"}`, code, msg)
}
- 在
<span class="ne-text">main.go主函数中设置 </span><strong><span class="ne-text">静态文件目录</span></strong>
//设置静态文件目录
fs := http.FileServer(http.Dir("./static"))
// 设置访问路径为"/static/",这样访问http://localhost:8080/static/index.html时,会提供index.html文件
http.Handle("/static/", http.StripPrefix("/static/", fs))
获取用户信息接口
<span class="ne-text">user.go 查询用户信息</span>
// 查询用户信息
func GetUserInfo(username string) (User, error) {
user := User{}
stmt, err := mysql.DBConn().Prepare(
"select user_name,signup_at from user where user_name=? limit 1 ")
if err != nil {
fmt.Println("mysql statement error:", err.Error())
return user, err
}
defer stmt.Close()
//执行查询操作
err = stmt.QueryRow(username).Scan(&user.Username, &user.SignupAt)
if err != nil {
fmt.Println("mysql row scan error:", err.Error())
return user, err
}
return user, nil
}
<span class="ne-text">UserHandler.go 获取用户信息的处理函数</span>
func UserInfoHandler(w http.ResponseWriter, r *http.Request) {
//1.解析请求参数
r.ParseForm()
username := r.Form.Get("username")
//token := r.Form.Get("token")
/*//2.验证token是否有效
isTokenValid := IsTokenValid(token)
if !isTokenValid {
w.WriteHeader(http.StatusForbidden)
return
}*/
//3.查询用户信息
userInfo, err := db.GetUserInfo(username)
if err != nil {
w.WriteHeader(http.StatusForbidden)
return
}
//4.组装并响应用户数据
respMsg := util.RespMsg{
Code: 200,
Msg: "SUCCESS",
Data: userInfo,
}
w.Write(respMsg.JSONBytes())
}
// 验证token是否有效:
func IsTokenValid(token string) bool {
if len(token) != 40 {
return false
}
// TODO: 判断token的时效性,是否过期
// TODO: 从数据库表tbl_user_token查询username对应的token信息
// TODO: 对比两个token是否一致
return true
}
- 添加路由规则
http.HandleFunc("/user/info", handler.HTTPInterceptor(handler.UserInfoHandler))
添加拦截器
// 拦截器
func HTTPInterceptor(h http.HandlerFunc) http.HandlerFunc {
return http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
username := r.Form.Get("username")
token := r.Form.Get("token")
//判断参数 是否有效
if len(username) < 3 || !IsTokenValid(token) {
w.WriteHeader(http.StatusForbidden)
return
}
h(w, r)
})
}
使用
http.HandleFunc("/user/info", handler.HTTPInterceptor(handler.UserInfoHandler))
文件的校验值计算
校验算法类型
- CRC(32/64) ------ 循环冗余校验
CRC是一种用于<span class="ne-text">检测数据传输或存储中错误的方法</span>
。它通过计算数据的循环冗余检查值来验证数据的完整性。CRC32生成一个32位的校验值,常用于检测小到中等大小的数据文件。(数据传输的检验 检测数据是否丢失)
- MD5
MD5(Message Digest Algorithm 5)是一种广泛应用的哈希算法,能够将任意长度的数据转换为固定长度的哈希值。在文件校验中,MD5通过计算文件的哈希值,将文件内容转化为唯一的<strong><span class="ne-text">128位</span></strong>
(16字节)哈希值。这个哈希值可以用来验证文件的完整性,确保文件在传输或存储过程中没有被篡改或损坏1。
- SHA1
SHA1(Secure Hash Algorithm 1)是一种加密哈希函数,产生一个<span class="ne-text">160位</span>
的哈希值。
秒传原理
文件秒传是一种文件上传优化技术,核心思想是:如果服务器上已经存在与要上传文件相同的文件,则直接复用已有文件,而不需要再次上传,从而实现"秒传"
通俗的说,你把要上传的东西上传,服务器会先做MD5校验,如果服务器上有一样的东西,它就直接给你个新地址,其实你下载的都是服务器上的同一个文件,想要不秒传,其实只要让MD5改变,就是对文件本身做一下修改(改名字不行),例如一个文本文件,你多加几个字,MD5就变了,就不会秒传了。
场景:
- 用户上传
- 离线下载
- 好友分享
关键点:
- 记录每个文件的hash值(MD5,SHA1等)
- 用户文件关联
服务架构
用户文件表设计与创建
CREATE TABLE `user_file`
(
`id` int(11) NOT NULL PRIMARY KEY AUTO_INCREMENT,
`user_name` varchar(64) NOT NULL,
`sha1` varchar(64) NOT NULL DEFAULT '' COMMENT '文件hash',
`size` bigint(20) DEFAULT '0' COMMENT '文件大小',
`file_name` varchar(256) NOT NULL DEFAULT '' COMMENT '文件名',
`upload_at` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '上传时间',
`last_update` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后修改时间',
`status` int(11) NOT NULL DEFAULT '0' COMMENT '文件状态(0正常1已删除2禁用)',
UNIQUE KEY `idx_user_file` (`user_name`, `sha1`), -- 唯一性约束
KEY `idx_status` (`status`),
KEY `idx_user_id` (`user_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
改造文件上传接口
- 新增用户文件表结构体
// 用户文件表结构体
type UserFile struct {
UserName string
Hash string
FileName string
Size int64
UploadAt string
LastUpdated string
}
- 更新用户文件表函数
// 更新用户文件表
func OnUserFileUploadFinished(userName string, hash string, fileName string, size int64) bool {
stmt, err := mysql.DBConn().Prepare("insert ignore into user_file(`user_name`, `sha1`, `file_name`, `size`, `upload_at`) " +
"values(?, ?, ?, ?, ?)")
if err != nil {
fmt.Println(err.Error())
return false
}
defer stmt.Close()
_, err = stmt.Exec(userName, hash, fileName, size, time.Now())
if err != nil {
return false
}
return true
}
- 更新文件上传接口(FileHandler.go)
// 处理文件上传
func UploadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
//返回上传页面
f, err := os.ReadFile("./static/view/index.html")
if err != nil {
io.WriteString(w, err.Error())
}
io.WriteString(w, string(f))
} else if r.Method == "POST" {
//接收文件流 存储到本地目录
f, fileHeader, err := r.FormFile("file")
if err != nil {
fmt.Printf("Failed to get file: %v\n", err)
io.WriteString(w, err.Error())
}
defer f.Close()
//定义文件元信息结构体
fileMeta := models.FileMeta{
Name: fileHeader.Filename,
Path: "./upload/" + fileHeader.Filename,
UploadAt: time.Now().Format("2006-01-02 15:04:05"),
}
//读取文件流
newFile, err := os.Create(fileMeta.Path)
if err != nil {
return
}
defer newFile.Close()
fileMeta.Size, err = io.Copy(newFile, f)
if err != nil {
fmt.Printf("Failed to save file: %v\n", err)
io.WriteString(w, err.Error())
}
//计算文件的sha1值
newFile.Seek(0, 0)
//sha1值添加到结构体的数据中去
fileMeta.FileSha1 = util.FileSha1(newFile)
//新增文件元信息
models.UpdateFileMetaToDB(fileMeta)
//todo:更新用户文件表信息
r.ParseForm()
userName := r.Form.Get("username")
suc := db.OnUserFileUploadFinished(userName, fileMeta.FileSha1, fileMeta.Name, fileMeta.Size)
if suc {
//重定向,跳转页面
http.Redirect(w, r, "/file/upload/success", http.StatusFound)
} else {
w.Write([]byte("Upload Failed."))
}
}
}
用户查询文件
<span class="ne-text">userfile.go</span>
// 批量获取用户文件信息
func QueryUserFileMetas(username string, limit int) ([]UserFile, error) {
stmt, err := mysql.DBConn().Prepare("select sha1,file_name,size,upload_at,last_update " +
"from user_file where user_name=? limit ?")
if err != nil {
return nil, err
}
defer stmt.Close()
rows, err := stmt.Query(username, limit)
if err != nil {
return nil, err
}
var userFiles []UserFile
//循环查询结果获得 数据
for rows.Next() {
var userFile UserFile
err := rows.Scan(&userFile.Hash, &userFile.FileName, &userFile.Size,
&userFile.UploadAt, &userFile.LastUpdated)
if err != nil {
fmt.Println(err.Error())
break
}
userFiles = append(userFiles, userFile)
}
return userFiles, nil
}
- **更新路由规则对应的处理函数 **
<span class="ne-text">fileHandler.go</span>
// 查询文件元信息的函数
func GetFileMetaHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
limit, _ := strconv.Atoi(r.Form.Get("limit"))
//fileMeta, err := models.GetFileMetaDB(fileHash)
userName := r.Form.Get("username")
userFiles, err := db.QueryUserFileMetas(userName, limit)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
//返回给客户端
data, err := json.Marshal(userFiles)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Write(data)
}
实现秒传判断接口
<span class="ne-text">fileHandler.go</span>
// 尝试秒传接口
func TryFastUploadHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
//1.解析请求参数
username := r.Form.Get("username")
filehash := r.Form.Get("filehash")
filename := r.Form.Get("filename")
size, _ := strconv.Atoi(r.Form.Get("filesize"))
//2.从文件表中查询相同hash的文件记录
fileMeta, err := models.GetFileMetaDB(filehash)
if err != nil {
fmt.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
//3.查不到记录则返回秒传失败
if fileMeta == nil {
//秒传失败
respMsg := util.RespMsg{
Code: -1,
Msg: "秒传失败,请访问普通上传接口",
}
w.Write([]byte(respMsg.JSONBytes()))
return
}
//4.上传过则将文件信息写入用户文件表,返回成功
suc := db.OnUserFileUploadFinished(username, filehash, filename, int64(size))
if suc {
//返回信息
respMsg := util.RespMsg{
Code: 200,
Msg: "秒传成功",
}
w.Write([]byte(respMsg.JSONBytes()))
return
} else {
respMsg := util.RespMsg{
Code: 301,
Msg: "秒传失败,请稍后重试",
}
w.Write(respMsg.JSONBytes())
return
}
}
** **测试
- 添加路由规则
//文件秒传
http.HandleFunc("/file/fastupload", handler.HTTPInterceptor(handler.TryFastUploadHandler))
相同文件冲突处理
- 允许不同用户同时上传同一个文件
- 先完成上传的先入库
- 后上传的只更新用户文件表,并删除已上传的文件
分块上传与断点续传
说明**:**
- 小文件不建议分块上传
- 可以并行上传分块,并且可以无序传输
- 分块上传能极大提高传输效率
- (由于断点续传机制),可以减少传输失败后重试的流量及时间
分块上传流程:
服务架构
Redis缓存存储已经上传的分块文件,在文件上传完成后就会删除redis中的<span class="ne-text">分块缓存信息</span>
因为对****分块数据的操作比较频繁,就要求对存储效率要高****,以保证云端与客户端之间的响应比较及时。因此用内存数据库Redis是一个比较合适的选择
分块上传通用接口
连接Redis
go get github.com/go-redis/redis/v8
package redis
import (
"context"
"fmt"
"github.com/go-redis/redis/v8"
)
var client *redis.Client
func init() {
//创建Redis客户端
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "",
DB: 0,
})
// Ping测试连接
ping, err := client.Ping(context.Background()).Result()
if err != nil {
fmt.Println(ping, err)
}
}
// 提供外部访问的方法:返回redis连接对象
func RedisConn() *redis.Client {
return client
}
分块上传初始化
分块上传之前初始化的方法(mpupload.go)
// 上传文件的初始化信息
type MultipartUploadInfo struct {
FileHash string
FileSize int
UploadId string //唯一标志
ChunkSize int
ChunkCount int //分块的数量 这个文件要分成多少个分块上传
}
// 初始化分块上传
func InitialMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
//1.解析用户参数
r.ParseForm()
username := r.Form.Get("username")
filehash := r.Form.Get("filehash")
filesize, err := strconv.Atoi(r.Form.Get("filesize"))
if err != nil {
w.Write(util.NewRespMsg(305, "params invalid", nil).JSONBytes())
return
}
//2,获得redis的一个连接
client := redis.RedisConn()
defer client.Close()
//3.生成分块上传的初始化信息
uploadInfo := MultipartUploadInfo{
FileHash: filehash,
FileSize: filesize,
UploadId: username + fmt.Sprintf("%x", time.Now().UnixNano()), //假设是当前用户名和当前的时间戳
ChunkSize: 5 * 1024 * 1024, //5MB
ChunkCount: int(math.Ceil(float64(filesize) / (5 * 1024 * 1024))),
}
//4.将出初始化信息写入redis缓存
client.HSet(r.Context(), "MP_"+uploadInfo.UploadId, "chunkCount", uploadInfo.ChunkCount)
client.HSet(r.Context(), "MP_"+uploadInfo.UploadId, "filehash", uploadInfo.FileHash)
client.HSet(r.Context(), "MP_"+uploadInfo.UploadId, "filesize", uploadInfo.FileSize)
//5.将响应初始化数据返回给客户端
w.Write(util.NewRespMsg(200, "OK", uploadInfo).JSONBytes())
}
上传分块
// 上传文件分块
func UploadPartHandler(w http.ResponseWriter, r *http.Request) {
//1.解析用户传递的参数
r.ParseForm()
userName := r.Form.Get("username")
uploadId := r.Form.Get("uploadid")
chunkIndex := r.Form.Get("index") //分块文件索引
//2.获得redis连接
client := redis.RedisConn()
defer client.Close()
//3.获得文件句柄,用于存储分块内容
fileData, err := os.Create("/data/" + uploadId + "/" + chunkIndex)
if err != nil {
w.Write(util.NewRespMsg(-1, "Upload part failed", nil).JSONBytes())
return
}
defer fileData.Close()
buffer := make([]byte, 1024*1024)
for {
//读取文件内容
n, err := r.Body.Read(buffer)
fileData.Write(buffer[:n])
if err != nil {
break
}
}
//4.更新redis缓存状态 (比如每读取完一条分块文件后,更新redis中的分块数量 +1)
client.HSet(r.Context(), "MP_"+uploadId, "chkidx_"+chunkIndex, 1)
//5.返回结果
w.Write(util.NewRespMsg(200, "OK", nil).JSONBytes())
}
通知分块上传(分块上传完成)
// 通知上传合并
func CompleteUploadHandler(w http.ResponseWriter, r *http.Request) {
//1.解析请求参数
r.ParseForm()
uploadId := r.Form.Get("uploadid")
username := r.Form.Get("username")
filehash := r.Form.Get("filehash")
filesize, _ := strconv.Atoi(r.Form.Get("filesize"))
filename := r.Form.Get("index")
//2.获得redis连接
client := redis.RedisConn()
defer client.Close()
//3.通过uploadId查询redis 判断是否所有分块文件上传完成
data, err := client.HGetAll(r.Context(), "MP_"+uploadId).Result()
if err != nil {
w.Write(util.NewRespMsg(-1, "complete upload failed", nil).JSONBytes())
return
}
totalCount := 0
chunkCount := 0
for k, v := range data {
if k == "chunkcount" {
//获取分块数量
totalCount, _ = strconv.Atoi(data["chunkcount"])
} else if strings.HasPrefix(k, "chkidx_") && v == "1" {
chunkCount += 1
}
}
//上传出现问题
if totalCount != chunkCount {
w.Write(util.NewRespMsg(305, "invalid request", nil).JSONBytes())
return
}
//4.todo:合并分块
//5.更新唯一文件表和用户文件表
db.OnFileUploadFinished(filehash, filename, int64(filesize), "")
db.OnUserFileUploadFinished(username, filehash, filename, int64(filesize))
//6.响应处理结果
w.Write(util.NewRespMsg(200, "OK", nil).JSONBytes())
}
接入阿里云OSS
oss官方SDK文档
- 安装oss的相关包
go get github.com/aliyun/aliyun-oss-go-sdk/oss
- **编写oss相关配置 **
<span class="ne-text">config/oss.go</span>
package config
const (
// oss bucket名
BucketName = "cloud-disk-468"
Endpoint = "oss-cn-beijing.aliyuncs.com"
AccessKey = "**"
SecretKey = "99"
)
- 编写oss 连接代码
package oss
import (
"filestore-server/config"
"fmt"
"github.com/aliyun/aliyun-oss-go-sdk/oss"
)
// 全局变量用来存储OSS客户端实例
var client *oss.Client
func OssClient() *oss.Client {
if client != nil {
//还未创建过
return client
}
client, err := oss.New(config.Endpoint, config.AccessKey, config.SecretKey)
if err != nil {
fmt.Println(err)
return nil
}
return client
}
// Bucket:获取bucket存储空间
func OssBucket() *oss.Bucket {
client := OssClient()
if client != nil {
//存在存储对象
bucket, err := client.Bucket(config.BucketName)
if err != nil {
fmt.Println(err)
return nil
}
return bucket
}
return nil
}
上传文件到oss
修改<span class="ne-text">fileHandler.go中的上传文件代码</span>
// 处理文件上传
func UploadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" {
//返回上传页面
f, err := os.ReadFile("./static/view/index.html")
if err != nil {
io.WriteString(w, err.Error())
}
io.WriteString(w, string(f))
} else if r.Method == "POST" {
//接收文件流 存储到本地目录
f, fileHeader, err := r.FormFile("file")
if err != nil {
fmt.Printf("Failed to get file: %v\n", err)
io.WriteString(w, err.Error())
}
defer f.Close()
//定义文件元信息结构体
fileMeta := models.FileMeta{
Name: fileHeader.Filename,
Path: "./upload/" + fileHeader.Filename,
UploadAt: time.Now().Format("2006-01-02 15:04:05"),
}
//读取文件流
newFile, err := os.Create(fileMeta.Path)
if err != nil {
return
}
defer newFile.Close()
fileMeta.Size, err = io.Copy(newFile, f)
if err != nil {
fmt.Printf("Failed to save file: %v\n", err)
io.WriteString(w, err.Error())
}
//计算文件的sha1值
newFile.Seek(0, 0)
//sha1值添加到结构体的数据中去
fileMeta.FileSha1 = util.FileSha1(newFile)
//将文件存储到oss
newFile.Seek(0, 0)
now := time.Now()
year, month, day := now.Date()
formmattedDate := fmt.Sprintf("%d%02d%02d", year, month, day)
ossPath := "oss/" + formmattedDate + "/" + fileMeta.FileSha1
err = oss.OssBucket().PutObject(ossPath, newFile)
if err != nil {
fmt.Println("error:", err)
w.Write([]byte("Upload failed."))
return
}
//更新文件的存储位置 改为oss上的存储位置
fileMeta.Path = ossPath
//新增文件元信息
models.UpdateFileMetaToDB(fileMeta)
//更新用户文件表信息
r.ParseForm()
userName := r.Form.Get("username")
suc := db.OnUserFileUploadFinished(userName, fileMeta.FileSha1, fileMeta.Name, fileMeta.Size)
if suc {
//重定向,跳转页面
http.Redirect(w, r, "/file/upload/success", http.StatusFound)
} else {
w.Write([]byte("Upload Failed."))
}
}
}
下载文件
<span class="ne-text">oss_conn.go</span>
// DownloadUrl:临时授权下载url //返回url
func DownLoadUrl(objectName string) string {
signUrl, err := OssBucket().SignURL(objectName, oss.HTTPGet, 3600)
if err != nil {
fmt.Println(err.Error())
return ""
}
return signUrl
}
<span class="ne-text">fileHandler.go</span>
// 下载oss上的文件:生成文件的下载地址
func DownLoadUrlHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
filehash := r.Form.Get("filehash")
//从文件表查询记录
row, _ := db.GetFileMeta(filehash)
downLoadUrl := oss.DownLoadUrl(row.Path.String)
//将下载地址返回给客户端
w.Write([]byte(downLoadUrl))
}
客户端直传OSS
同步与异步
异步逻辑架构
RabbitMQ工作原理
RabbitMQ关键术语
Exchange工作模式
Fanout
** **
Direct
Topic
编程实战
编写rabbitMQ的连接配置
<span class="ne-text">rabbitMQ.go</span>
package config
const (
// AsyncTransferEnable:是否开启文件异步转移(默认同步)
AsyncTransferEnable = true
//RabbitUrl:服务入口url
RabbitURL = "amqp://guest:guest@47.122.53.44:5672/"
//TransExchangeName:用于文件transfer的交换机
TransExchangeName = "uploadserver.trans"
//TransOSSQueueName:oss转移队列名
TransOSSQueueName = "uploadserver.trans.oss"
//TransOSSErrQueueName:oss转移失败后写入另一个队列的队列名
TransOssErrQueueName = "uploadserver.trans.oss.err"
//TransOssRoutingKey:routingKey
TransOssRoutingKey = "oss"
)
转移队列中消息载体的结构格式
type TransferData struct {
FileHash string
CurrentPath string //当前临时存储位置
DestPath string //存储到oss的路径
DestStoreType string
}
异步转移的MQ生产者
package mq
import (
"filestore-server/config"
"github.com/streadway/amqp"
"log"
)
var conn *amqp.Connection
var channel *amqp.Channel
func initChannel() bool {
//1.判断channel是否已经创建过
if channel != nil {
return true
}
//2.获得rabbitmq的一个连接
conn, err := amqp.Dial(config.RabbitURL)
if err != nil {
log.Println(err.Error())
return false
}
channel, err = conn.Channel()
if err != nil {
log.Println(err.Error())
return false
}
//3.打开一个channel,用于消息的发布与接收
return true
}
// 发布消息
func Publish(exchange string, routingKey string, msg []byte) bool {
//1.判断channel是否正常
if !initChannel() {
return false
}
//2.调用消息发布动作
err := channel.Publish(exchange, routingKey, false, false,
amqp.Publishing{
ContentType: "text/plain",
Body: msg})
if err != nil {
log.Println(err.Error())
return false
}
return true
}
异步转移的MQ消费者
package mq
import "log"
var done chan bool
// 开启监听队列,获取消息
func StartConsume(queueName string, consumerName string, callback func(msg []byte) bool) {
//1.通过channel.Consumer获得消息信道
initChannel()
msgs, err := channel.Consume(queueName, consumerName, true, false, false, false, nil)
if err != nil {
log.Println(err.Error())
return
}
done = make(chan bool)
//2.循环从信道中获取消息
go func() {
for msg := range msgs {
//3.调用callback方法来处理新的消息
processSuc := callback(msg.Body)
if !processSuc {
//todo:写入另外一个等待队列,用于异常情况的重试
}
}
}()
//done没有新的消息过来,则会一直阻塞
<-done
//关闭rabbitMQ通道
channel.Close()
}
架构微服务化
微服务:一种分散治理的开发技术和理念
优缺点:
- 化繁为简,分散治理
- 服务间松耦合,服务内高内聚
- 服务可独立构建/部署/升级,局部更新
- 提高系统容错性,减少系统整体崩溃概率
- 易于实现异构系统
缺点:
- 增加了运维部署的工作量与难度
- 增加了系统间调用逻辑的处理难度
- 日志更难收集和统计
- 额外引入了一些非业务模块服务(服务与注册发现中心、日志收集等等)
引入微服务后的服务架构
Web框架Gin
Gin运行流程:
核心术语:
- Engine:实现了ServeHTTP接口的handler
- MethodTree:根据http请求方法分别维护的路由树(post、get的路由规则)
- RouterGroup:将路由表分组,方便中间件统一处理
- Context:Gin的上下文,在handler之间传递参数
基于Gin改造用户service
路由组是Gin框架中用于组织相关路由的机制,它允许开发者:
- 为一组路由定义公共路径前缀
- 为特定路由集合应用中间件
- 实现API版本控制
- 按功能模块组织代码结构
- 编写路由组
package route
import (
"filestore-server/handler"
"github.com/gin-gonic/gin"
)
func Router() *gin.Engine {
//gin framework,包括Logger,Recovery
router := gin.Default()
//处理静态资源
router.Static("/static", "./static")
//不需要经过验证的就能访问的接口
//-------user-----
router.GET("/user/signup", handler.SignUpHandler)
router.POST("/user/signup", handler.DoSignUpHandler)
router.POST("/user/signin", handler.DoSignInHandler)
router.GET("/user/signin", handler.SignInHandler)
//加入中间件,用于token校验的拦截器
router.Use(handler.HTTPInterceptor())
//Use之后的所有handler都会经过拦截器进行token校验
// 文件存取接口
router.GET("/file/upload", handler.UploadHandler)
router.POST("/file/upload", handler.DoUploadHandler)
router.GET("/file/upload/suc", handler.UploadSucHandler)
router.GET("/file/meta", handler.GetFileMetaHandler)
router.POST("/file/query", handler.FileQueryHandler)
router.GET("/file/download", handler.DownloadHandler)
router.POST("/file/download", handler.DownloadHandler)
router.POST("/file/update", handler.FileMetaUpdateHandler)
router.POST("/file/delete", handler.FileDeleteHandler)
router.POST("/file/downloadurl",
handler.DownloadURLHandler)
// 秒传接口
router.POST("/file/fastupload",
handler.TryFastUploadHandler)
// 分块上传接口
router.POST("/file/mpupload/init",
handler.InitialMultipartUploadHandler)
router.POST("/file/mpupload/uppart",
handler.UploadPartHandler)
router.POST("/file/mpupload/complete",
handler.CompleteUploadHandler)
// 用户相关接口
router.POST("/user/info", handler.UserInfoHandler)
return router
}
- 修改接口处理函数(参数要加上上下文 c *gin.Context)
<span class="ne-text">fileHandler.go</span>
package handler
import (
"encoding/json"
"filestore-server/common"
"filestore-server/config"
"filestore-server/models"
"filestore-server/mq"
util "filestore-server/utils"
"fmt"
"io"
"net/http"
"os"
"strconv"
"time"
"github.com/gin-gonic/gin"
dblayer "filestore-server/db"
"filestore-server/store/oss"
)
// UploadHandler : 处理用户注册请求
func UploadHandler(c *gin.Context) {
data, err := os.ReadFile("./static/view/index.html")
if err != nil {
c.String(404, `网页不存在`)
return
}
c.Header("Content-Type", "text/html; charset=utf-8")
c.String(200, string(data))
}
// DoUploadHandler : 处理文件上传
func DoUploadHandler(c *gin.Context) {
errCode := http.StatusOK
defer func() {
if errCode != http.StatusOK {
c.JSON(http.StatusOK, gin.H{
"code": errCode,
"msg": "Upload failed",
})
}
}()
// 接收文件流及存储到本地目录
file, fileHeader, err := c.Request.FormFile("file")
if err != nil {
fmt.Printf("Failed to get data, err:%s\n", err.Error())
errCode = 305
return
}
defer file.Close()
fileMeta := models.FileMeta{
Name: fileHeader.Filename,
Path: "./upload/" + fileHeader.Filename,
UploadAt: time.Now().Format("2006-01-02 15:04:05"),
}
newFile, err := os.Create(fileMeta.Path)
if err != nil {
fmt.Printf("Failed to create file, err:%s\n", err.Error())
errCode = 306
return
}
defer newFile.Close()
fileMeta.Size, err = io.Copy(newFile, file)
if err != nil {
fmt.Printf("Failed to save data into file, err:%s\n", err.Error())
errCode = 307
return
}
newFile.Seek(0, 0)
fileMeta.FileSha1 = util.FileSha1(newFile)
// 游标重新回到文件头部
newFile.Seek(0, 0)
// 文件写入OSS存储
//将文件存储到oss
newFile.Seek(0, 0)
now := time.Now()
year, month, day := now.Date()
formmattedDate := fmt.Sprintf("%d%02d%02d", year, month, day)
ossPath := "oss/" + formmattedDate + "/" + fileMeta.FileSha1
err = oss.OssBucket().PutObject(ossPath, newFile)
if err != nil {
fmt.Println(err.Error())
errCode = 308
return
}
fileMeta.Path = ossPath
transferData := mq.TransferData{
FileHash: fileMeta.FileSha1,
CurrentPath: fileMeta.Path,
DestPath: ossPath,
DestStoreType: common.StoreOSS,
}
pubData, _ := json.Marshal(transferData)
suc := mq.Publish(config.TransExchangeName, config.TransOssRoutingKey, pubData)
if !suc {
//todo:加入重拾发送消息逻辑
}
//新增文件元信息
models.UpdateFileMetaToDB(fileMeta)
// 更新用户文件表记录
username := c.Request.FormValue("username")
suc = dblayer.OnUserFileUploadFinished(username, fileMeta.FileSha1,
fileMeta.Name, fileMeta.Size)
if suc {
c.Redirect(http.StatusFound, "/static/view/home.html")
} else {
errCode = 309
}
}
// UploadSucHandler : 上传已完成
func UploadSucHandler(c *gin.Context) {
c.JSON(http.StatusOK,
gin.H{
"code": http.StatusOK,
"msg": "Upload Finish!",
})
}
// GetFileMetaHandler : 获取文件元信息
func GetFileMetaHandler(c *gin.Context) {
filehash := c.Request.FormValue("filehash")
//fMeta := meta.GetFileMeta(filehash)
fMeta, err := models.GetFileMetaDB(filehash)
if err != nil {
c.JSON(http.StatusInternalServerError,
gin.H{
"code": 305,
"msg": "Upload failed!",
})
return
}
if fMeta != nil {
data, err := json.Marshal(fMeta)
if err != nil {
c.JSON(http.StatusInternalServerError,
gin.H{
"code": 306,
"msg": "Upload failed!",
})
return
}
c.Data(http.StatusOK, "application/json", data)
} else {
c.JSON(http.StatusOK,
gin.H{
"code": 307,
"msg": "No sub file",
})
}
}
// FileQueryHandler : 查询批量的文件元信息
func FileQueryHandler(c *gin.Context) {
limitCnt, _ := strconv.Atoi(c.Request.FormValue("limit"))
username := c.Request.FormValue("username")
//fileMetas, _ := meta.GetLastFileMetasDB(limitCnt)
userFiles, err := dblayer.QueryUserFileMetas(username, limitCnt)
if err != nil {
c.JSON(http.StatusInternalServerError,
gin.H{
"code": 305,
"msg": "Query failed!",
})
return
}
data, err := json.Marshal(userFiles)
if err != nil {
c.JSON(http.StatusInternalServerError,
gin.H{
"code": 306,
"msg": "Query failed!",
})
return
}
c.Data(http.StatusOK, "application/json", data)
}
// DownloadHandler : 文件下载接口
func DownloadHandler(c *gin.Context) {
fsha1 := c.Request.FormValue("filehash")
fm, _ := models.GetFileMetaDB(fsha1)
c.FileAttachment(fm.Path, fm.Name)
}
// FileMetaUpdateHandler : 更新元信息接口(重命名)
func FileMetaUpdateHandler(c *gin.Context) {
opType := c.Request.FormValue("op")
fileSha1 := c.Request.FormValue("filehash")
newFileName := c.Request.FormValue("filename")
if opType != "0" {
c.Status(http.StatusForbidden)
return
}
curFileMeta := models.GetFileMeta(fileSha1)
curFileMeta.Name = newFileName
models.UpdateFileMeta(curFileMeta)
// TODO: 更新文件表中的元信息记录
data, err := json.Marshal(curFileMeta)
if err != nil {
c.Status(http.StatusInternalServerError)
return
}
c.JSON(http.StatusOK, data)
}
// FileDeleteHandler : 删除文件及元信息
func FileDeleteHandler(c *gin.Context) {
fileSha1 := c.Request.FormValue("filehash")
fMeta := models.GetFileMeta(fileSha1)
// 删除文件
os.Remove(fMeta.Path)
// 删除文件元信息
models.DeleteFileMeta(fileSha1)
// TODO: 删除表文件信息
c.Status(http.StatusOK)
}
// TryFastUploadHandler : 尝试秒传接口
func TryFastUploadHandler(c *gin.Context) {
// 1. 解析请求参数
username := c.Request.FormValue("username")
filehash := c.Request.FormValue("filehash")
filename := c.Request.FormValue("filename")
filesize, _ := strconv.Atoi(c.Request.FormValue("filesize"))
// 2. 从文件表中查询相同hash的文件记录
fileMeta, err := models.GetFileMetaDB(filehash)
if err != nil {
fmt.Println(err.Error())
c.Status(http.StatusInternalServerError)
return
}
// 3. 查不到记录则返回秒传失败
if fileMeta == nil {
resp := util.RespMsg{
Code: 305,
Msg: "秒传失败,请访问普通上传接口",
}
c.Data(http.StatusOK, "application/json", resp.JSONBytes())
return
}
// 4. 上传过则将文件信息写入用户文件表, 返回成功
suc := dblayer.OnUserFileUploadFinished(
username, filehash, filename, int64(filesize))
if suc {
resp := util.RespMsg{
Code: http.StatusOK,
Msg: "秒传成功",
}
c.Data(http.StatusOK, "application/json", resp.JSONBytes())
return
}
resp := util.RespMsg{
Code: -2,
Msg: "秒传失败,请稍后重试",
}
c.Data(http.StatusOK, "application/json", resp.JSONBytes())
return
}
// DownloadURLHandler : 生成文件的下载地址
func DownloadURLHandler(c *gin.Context) {
filehash := c.Request.FormValue("filehash")
// 从文件表查找记录
row, _ := dblayer.GetFileMeta(filehash)
// TODO: 判断文件存在OSS,还是Ceph,还是在本地
downLoadUrl := oss.DownLoadUrl(row.Path.String)
//将下载地址返回给客户端
c.Data(http.StatusOK, "octet-stream", []byte(downLoadUrl))
}
<span class="ne-text">mpupload.go</span>
package handler
import (
"fmt"
"math"
"net/http"
"os"
"path"
"strconv"
"strings"
"time"
"github.com/gin-gonic/gin"
rPool "filestore-server/cache/redis"
dblayer "filestore-server/db"
)
// MultipartUploadInfo : 初始化信息
type MultipartUploadInfo struct {
FileHash string
FileSize int
UploadID string
ChunkSize int
ChunkCount int
}
// InitialMultipartUploadHandler : 初始化分块上传
func InitialMultipartUploadHandler(c *gin.Context) {
// 1. 解析用户请求参数
username := c.Request.FormValue("username")
filehash := c.Request.FormValue("filehash")
filesize, err := strconv.Atoi(c.Request.FormValue("filesize"))
if err != nil {
c.JSON(
http.StatusOK,
gin.H{
"code": -1,
"msg": "params invalid",
})
return
}
// 2. 获得redis的一个连接
client := rPool.RedisConn()
defer client.Close()
// 3. 生成分块上传的初始化信息
uploadInfo := MultipartUploadInfo{
FileHash: filehash,
FileSize: filesize,
UploadID: username + fmt.Sprintf("%x", time.Now().UnixNano()),
ChunkSize: 5 * 1024 * 1024, // 5MB
ChunkCount: int(math.Ceil(float64(filesize) / (5 * 1024 * 1024))),
}
// 4. 将初始化信息写入到redis缓存
client.HSet(c, "MP_"+uploadInfo.UploadID, "chunkCount", uploadInfo.ChunkCount)
client.HSet(c, "MP_"+uploadInfo.UploadID, "filehash", uploadInfo.FileHash)
client.HSet(c, "MP_"+uploadInfo.UploadID, "filesize", uploadInfo.FileSize)
// 5. 将响应初始化数据返回到客户端
c.JSON(
http.StatusOK,
gin.H{
"code": 0,
"msg": "OK",
"data": uploadInfo,
})
}
// UploadPartHandler : 上传文件分块
func UploadPartHandler(c *gin.Context) {
// 1. 解析用户请求参数
// username := c.Request.FormValue("username")
uploadID := c.Request.FormValue("uploadid")
chunkIndex := c.Request.FormValue("index")
// 2. 获得redis连接池中的一个连接
rConn := rPool.RedisConn()
defer rConn.Close()
// 3. 获得文件句柄,用于存储分块内容
fpath := "/data/" + uploadID + "/" + chunkIndex
os.MkdirAll(path.Dir(fpath), 0744)
fd, err := os.Create(fpath)
if err != nil {
c.JSON(
http.StatusOK,
gin.H{
"code": 0,
"msg": "Upload part failed",
"data": nil,
})
return
}
defer fd.Close()
buf := make([]byte, 1024*1024)
for {
n, err := c.Request.Body.Read(buf)
fd.Write(buf[:n])
if err != nil {
break
}
}
// 4. 更新redis缓存状态
rConn.HSet(c, "MP_"+uploadID, "chkidx_"+chunkIndex, 1)
// 5. 返回处理结果到客户端
c.JSON(
http.StatusOK,
gin.H{
"code": 0,
"msg": "OK",
"data": nil,
})
}
// CompleteUploadHandler : 通知上传合并
func CompleteUploadHandler(c *gin.Context) {
// 1. 解析请求参数
upid := c.Request.FormValue("uploadid")
username := c.Request.FormValue("username")
filehash := c.Request.FormValue("filehash")
filesize := c.Request.FormValue("filesize")
filename := c.Request.FormValue("filename")
// 2. 获得redis连接池中的一个连接
rConn := rPool.RedisConn()
defer rConn.Close()
//3.通过uploadId查询redis 判断是否所有分块文件上传完成
data, err := rConn.HGetAll(c, "MP_"+upid).Result()
if err != nil {
c.JSON(
http.StatusOK,
gin.H{
"code": -1,
"msg": "OK",
"data": nil,
})
return
}
totalCount := 0
chunkCount := 0
for k, v := range data {
if k == "chunkcount" {
//获取分块数量
totalCount, _ = strconv.Atoi(data["chunkcount"])
} else if strings.HasPrefix(k, "chkidx_") && v == "1" {
chunkCount += 1
}
}
if totalCount != chunkCount {
c.JSON(
http.StatusOK,
gin.H{
"code": -2,
"msg": "OK",
"data": nil,
})
return
}
// 4. TODO:合并分块
// 5. 更新唯一文件表及用户文件表
fsize, _ := strconv.Atoi(filesize)
dblayer.OnFileUploadFinished(filehash, filename, int64(fsize), "")
dblayer.OnUserFileUploadFinished(username, filehash, filename, int64(fsize))
// 6. 响应处理结果
c.JSON(
http.StatusOK,
gin.H{
"code": 0,
"msg": "OK",
"data": nil,
})
}
<span class="ne-text">UserHandler.go</span>
package handler
import (
"filestore-server/db"
util "filestore-server/utils"
"fmt"
"github.com/gin-gonic/gin"
"net/http"
"time"
)
// 盐值
const salt = "*#468"
// 响应注册页面
func SignUpHandler(c *gin.Context) {
c.Redirect(http.StatusFound, "/static/view/signup.html")
}
// 基于gin框架处理用户注册post请求
func DoSignUpHandler(c *gin.Context) {
userName := c.Request.FormValue("username")
password := c.Request.FormValue("password")
//有效性校验
if len(userName) < 3 || len(password) < 5 {
c.JSON(http.StatusOK, gin.H{
"code": 305,
"msg": "invalid parameter",
})
return
}
//密码加密处理
enc_password := util.Sha1([]byte(password + salt))
//写入数据库中
suc := db.UserSignUp(userName, enc_password)
if suc {
c.JSON(http.StatusOK, gin.H{
"code": 200,
"msg": "注册成功!",
})
} else {
c.JSON(http.StatusOK, gin.H{
"code": 305,
"msg": "注册失败!",
})
}
}
// 用户登录处理
func SignInHandler(c *gin.Context) {
c.Redirect(http.StatusFound, "/static/view/signin.html")
}
// 基于gin框架处理用户登录post请求
func DoSignInHandler(c *gin.Context) {
//1.检验用户名及密码
username := c.Request.FormValue("username")
password := c.Request.FormValue("password")
encPassword := util.Sha1([]byte(password + salt))
pwdChecked := db.UserSignIn(username, encPassword)
if !pwdChecked {
c.JSON(http.StatusOK, gin.H{
"code": 305,
"msg": "登录失败,请检查登录密码!",
})
return
}
//2.生成访问凭证(token)
token := GenToken(username)
result := db.UpdateToken(username, token)
if !result {
c.JSON(http.StatusOK, gin.H{
"code": 305,
"msg": "登录失败!",
})
return
}
//3.登录成功重定向到首页
respMsg := util.RespMsg{
Code: 200,
Msg: "SUCCESS",
Data: struct {
Location string
Username string
Token string
}{
Location: "/static/view/home.html",
Username: username,
Token: token,
},
}
c.Data(http.StatusOK, "octet-stream", respMsg.JSONBytes())
}
// 获取用户信息的处理函数
func UserInfoHandler(c *gin.Context) {
//1.解析请求参数
username := c.Request.FormValue("username")
//token := r.Form.Get("token")
/*//2.验证token是否有效
isTokenValid := IsTokenValid(token)
if !isTokenValid {
w.WriteHeader(http.StatusForbidden)
return
}*/
//3.查询用户信息
userInfo, err := db.GetUserInfo(username)
if err != nil {
c.JSON(http.StatusForbidden,
gin.H{})
return
}
//4.组装并响应用户数据
respMsg := util.RespMsg{
Code: 200,
Msg: "SUCCESS",
Data: userInfo,
}
c.Data(http.StatusOK, "octet-stream", respMsg.JSONBytes())
}
// 生成token
func GenToken(username string) string {
//40位字符:md5(username+timestamp+token_salt)+timestamp[:8]
ts := fmt.Sprintf("%x", time.Now().Unix())
tokenPrefix := util.MD5([]byte(username + ts + "_tokensalt"))
//40位token
return tokenPrefix + ts[:8]
}
// 验证token是否有效:
func IsTokenValid(token string) bool {
if len(token) != 40 {
return false
}
// TODO: 判断token的时效性,是否过期
// TODO: 从数据库表tbl_user_token查询username对应的token信息
// TODO: 对比两个token是否一致
return true
}
- 编写入口main函数
func main() {
router := route.Router()
router.Run(":8080")
}
启动
RPC原理(远程调用)
gRPC框架
Protobuf
一种跨语言和跨平台的数据序列化协议(二进制化协议)
go-micro框架
专注于微服务的一种RPC框架
提供分布式系统相关的接口集合
broker:用于异步通信
codec:消息编码
Registry:服务注册与发现
selector:负载均衡,当客户端向服务器发起请求之后,首先会去服务注册中心去查询服务的注册表