本专栏文章持续更新,新增内容使用蓝色表示。

1. 项目概述

项目名称:IM即时通讯系统

功能:用户上线提示、用户消息广播、查询在线用户、修改用户名、超时强踢、私聊。

目的:对 go 语言基础部分的内容加以巩固。

涉及内容

  • Go语言基础语法与特性

  • TCP/IP网络编程原理

  • Goroutine轻量级并发模型

  • Channel通信与同步机制

  • sync包同步原语实战

2. 项目架构

IM_System/
├── server.go      # 服务端核心逻辑
├── user.go        # 用户管理模块
├── client.go      # 客户端实现
├── main.go        # 服务端启动入口
└── go.mod         # 项目依赖管理

简易示意图如下:

架构说明:系统采用经典的 服务端-客户端 架构,服务端负责连接管理、消息路由和用户状态维护,客户端提供用户交互界面。通过TCP长连接实现实时通信,利用 Go 语言的并发特性高效处理多用户同时在线场景。

并发安全设计

  • 读写锁保护 - 使用 sync.RWMutex 保护共享的 OnlineMap,允许多个读操作或单个写操作

  • goroutine隔离 - 每个用户连接在独立的 goroutine 中处理,避免阻塞主线程

  • Channel通信 - 通过 channel 进行 goroutine 间通信,避免共享内存的竞态条件

  • 超时回收机制 - 自动检测不活跃连接并及时释放资源

3. 代码部分

3.1 server.go - 服务端核心

package main

import (
	"fmt"
	"io"
	"net"
	"runtime"
	"sync"
	"time"
)

const BufSize = 4096

type Server struct {
	Ip   string
	Port int
	// 在线用户列表
	OnlineMap map[string]*User
	mapLock   sync.RWMutex
	// 广播管道
	Message chan string
}

// 创建一个server接口
func NewServer(ip string, port int) *Server {
	server := &Server{
		Ip:        ip,
		Port:      port,
		OnlineMap: make(map[string]*User),
		Message:   make(chan string),
	}
	return server
}

// 监听Message管道
func (this *Server) listenMessage() {
	for {
		select {
		case msg := <-this.Message:
			// 将msg给所有OnlineMap中的user的管道
			this.mapLock.Lock()
			for _, value := range this.OnlineMap {
				value.UserChan <- msg
			}
			this.mapLock.Unlock()
		}
	}
}

// 广播方法
func (this *Server) BroadCast(user *User, msg string) {
	sendMsg := "[" + user.Addr + "]" + user.Name + ":" + msg
	// 交给Message管道
	this.Message <- sendMsg
}

// handler 业务处理
func (this *Server) Handler(conn net.Conn) {
	// 新用户 +1
	user := NewUser(conn, this)
	user.Online()

	// 活跃管道
	IsAlive := make(chan bool)

	// 用户消息广播
	go func() {
		buf := make([]byte, BufSize)
		for {
			n, err := conn.Read(buf)
			if n == 0 {
				user.Offline()
				return
			}
			if err != nil && err != io.EOF {
				fmt.Println("conn.Read err:", err)
				return
			}
			msg := string(buf[0 : n-1]) // 去掉换行符
			user.HandlerMsg(msg)

			// 发消息表示活跃
			IsAlive <- true
		}
	}()

	// 超时强踢
	for {
		select {
		case <-IsAlive:
			// 什么也不做重置定时器
		case <-time.After(time.Second * 10):
			// 踢出用户
			user.SendMsg("由于长时间不活跃,系统已将您强制下线")
			// 等待一小段时间确保消息发送
			time.Sleep(time.Millisecond * 50)
			// 关闭通道
			close(user.UserChan)
			// 退出当前handler
			runtime.Goexit()
		}
	}
}

// 启动服务端的接口
func (this *Server) StartServer() {
	// socket listen
	// 返回的Listener是通信的套接字。
	Listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", this.Ip, this.Port))
	if err != nil {
		fmt.Println("net.Listen error:", err)
		return
	}
	// close listen socket
	defer Listener.Close()

	// 启动监听Message的goroutine
	go this.listenMessage()

	for {
		// accept conn
		conn, err := Listener.Accept()
		if err != nil {
			fmt.Println("Listener.Accept error:", err)
			continue
		}
		// handler
		go this.Handler(conn)
	}
}

服务端采用事件驱动架构,每个客户端连接都会创建一个独立的 goroutine 进行处理。通过 channel 实现消息的广播机制,利用 select 语句处理超时强踢功能,确保系统资源的合理利用。

3.2 user.go - 用户管理

package main

import (
	"log"
	"net"
	"strings"
	"time"
)

type User struct {
	Name     string
	Addr     string
	UserChan chan string
	conn     net.Conn // 通信套接字
	server   *Server  // 属于哪个server
}

// 用户上线
func (this *User) Online() {
	// 加入在线列表
	this.server.mapLock.Lock()
	this.server.OnlineMap[this.Name] = this
	this.server.mapLock.Unlock()

	// 上线提示
	this.server.BroadCast(this, "已上线")
}

// 用户下线
func (this *User) Offline() {
	// 离开在线列表
	this.server.mapLock.Lock()
	delete(this.server.OnlineMap, this.Name)
	this.server.mapLock.Unlock()

	// 下线提示
	this.server.BroadCast(this, "离线")
}

// 用户消息处理
func (this *User) HandlerMsg(msg string) {
	if msg == "who" {
		// 返回所有在线用户
		this.server.mapLock.RLock()
		for _, value := range this.server.OnlineMap {
			msg := value.Name + " 在线"
			this.SendMsg(msg)
		}
		this.SendMsg("")
		this.server.mapLock.RUnlock()
	} else if len(msg) > 7 && msg[0:6] == "rename" {
		// 修改用户名
		newName := msg[7:len(msg)]
		// 检查是否有重名
		this.server.mapLock.Lock()
		_, ok := this.server.OnlineMap[newName]
		if ok {
			this.SendMsg("系统提示:该名称已被占用啦!")
			this.SendMsg("")
		} else {
			delete(this.server.OnlineMap, this.Name)
			this.Name = newName
			this.server.OnlineMap[newName] = this
			this.SendMsg("系统提示:您的用户名已更改为" + this.Name)
			this.SendMsg("")
		}
		this.server.mapLock.Unlock()
	} else if len(msg) > 4 && strings.HasPrefix(msg, "to|") && strings.Count(msg, "|") >= 2 {
		parts := strings.SplitN(msg, "|", 3)
		// 获取用户名
		friendName := parts[1]
		// 获取消息
		friendMsg := parts[2]
		// 根据用户名找到user对象
		this.server.mapLock.RLock()
		value, exit := this.server.OnlineMap[friendName]
		this.server.mapLock.RUnlock()
		// 使用对方的SendMsg将消息发送过去
		if exit {
			value.SendMsg(this.Name + "对您说:" + friendMsg + "\n")
			this.SendMsg("您对" + friendName + "说:" + friendMsg + "\n") // 给自己也发一个确认
		} else {
			this.SendMsg("当前用户" + friendName + "不存在或不在线\n")
		}
	} else {
		this.server.BroadCast(this, msg)
	}
}

// 给当前用户发消息
func (this *User) SendMsg(msg string) {
	select {
	case this.UserChan <- msg:
		// 发送成功
	case <-time.After(time.Second * 1):
		// 发送超时,避免阻塞
		log.Println("发送消息超时:", msg)
	}
}

// 创建 user 的 API
func NewUser(conn net.Conn, server *Server) *User {
	userAddr := conn.RemoteAddr().String()
	user := &User{
		Name:     userAddr,
		Addr:     userAddr,
		UserChan: make(chan string),
		conn:     conn,
		server:   server,
	}

	go user.ListenUser()

	return user
}

// 监听当前user
func (this *User) ListenUser() {
	// 当UserChan通道关闭后,不再进行监听并写入信息
	for msg := range this.UserChan {
		_, err := this.conn.Write([]byte(msg + "\n"))
		if err != nil {
			panic(err)
		}
	}
	// conn在此处关闭
	err := this.conn.Close()
	if err != nil {
		panic(err)
	}
}

User 模块封装了用户的核心行为,包括上线下线状态管理、消息处理逻辑。采用多路复用机制处理不同类型的消息命令,通过读写锁保证并发安全,实现了高效的用户会话管理。

3.3 client.go - 客户端实现

package main

import (
	"flag"
	"fmt"
	"io"
	"net"
	"os"
)

type Client struct {
	serverIP   string
	serverPort int
	name       string
	conn       net.Conn
	mode       int // 客户端所处模式
}

// 新建客户端
func NewClient(ip string, port int) *Client {
	client := &Client{
		serverIP:   ip,
		serverPort: port,
		mode:       -1,
	}
	// 连接server
	conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", ip, port))
	if err != nil {
		fmt.Println("net.Dial error:", err)
		return nil
	}
	client.conn = conn
	return client
}

// 显示菜单的方法
func (client *Client) PrintMenu() bool {
	var mode int
	fmt.Println("1.公聊模式")
	fmt.Println("2.私聊模式")
	fmt.Println("3.更改用户名")
	fmt.Println("0.退出")
	fmt.Scanln(&mode)
	if mode >= 0 && mode <= 3 {
		client.mode = mode
		return true
	} else {
		fmt.Println(">>>>>>请输入合法的数字<<<<<<")
		return false
	}
}

// 公聊模式
func (client *Client) GroupChat() {
	fmt.Println(">>>>>>>输入exit退出<<<<<<<")
	var chatMsg string
	fmt.Println("请输入要发送的消息:")
	fmt.Scanln(&chatMsg)
	for chatMsg != "exit" {
		if len(chatMsg) != 0 {
			sendMsg := chatMsg + "\n"
			_, err := client.conn.Write([]byte(sendMsg))
			if err != nil {
				fmt.Println("client.conn.Write err:", err)
				return
			}
			sendMsg = ""
		}
		fmt.Scanln(&chatMsg)
		fmt.Println("请输入要发送的消息:")
	}
}

// 查询在线用户
func (client *Client) CheckOnlineUser() {
	sendMsg := "who\n"
	_, err := client.conn.Write([]byte(sendMsg))
	if err != nil {
		fmt.Println("client.conn.Write err:", err)
		return
	}
}

// 私聊模式
// 查询在线用户
// 选择用户私聊
func (client *Client) PrivateChat() {
	fmt.Println(">>>>>>>输入exit退出<<<<<<<")
	var friend string
	client.CheckOnlineUser()
	fmt.Println("请选择您的私聊对象:")
	fmt.Scanln(&friend)
	var chatMsg string
	for friend != "exit" {
		fmt.Println("请输入要发送的消息:")
		fmt.Scanln(&chatMsg)
		for chatMsg != "exit" {
			if len(chatMsg) != 0 {
				sendMsg := "to|" + friend + "|" + chatMsg + "\n"
				_, err := client.conn.Write([]byte(sendMsg))
				if err != nil {
					fmt.Println("client.conn.Write err:", err)
					return
				}
				sendMsg = ""
			}
			fmt.Println("\n请输入要发送的消息:")
			fmt.Scanln(&chatMsg)
		}
	}
}

// 更改用户名
func (client *Client) Rename() bool {
	fmt.Println(">>>>>请输入用户名:")
	fmt.Scanln(&client.name)
	sendMsg := "rename|" + client.name + "\n"
	_, err := client.conn.Write([]byte(sendMsg))
	if err != nil {
		fmt.Println("client.conn.Write err:", err)
		return false
	}
	return true
}

var ServerIP string
var ServerPort int

// init函数是在main之前执行的
func init() {
	// 初始化命令行参数
	flag.StringVar(&ServerIP, "ip", "127.0.0.1", "设置ip(默认是127.0.0.1)")
	flag.IntVar(&ServerPort, "port", 8686, "设置port(默认是8686)")
}

// 客户端主业务
func (client *Client) Run() {
	for client.mode != 0 {
		for client.PrintMenu() != true {
		}
		switch client.mode {
		case 1:
			fmt.Println(">>>>>>您已进入公聊频道<<<<<<")
			client.GroupChat()
			break
		case 2:
			fmt.Println(">>>>>>您已进入私聊模式<<<<<<")
			client.PrivateChat()
			break
		case 3:
			fmt.Println(">>>>>>更改用户名模式<<<<<<")
			client.Rename()
			break
		}
	}

}

// 接收服务端回应
func (client *Client) AcceptResponce() {
	io.Copy(os.Stdout, client.conn)
}

func main() {
	// 命令行解析
	flag.Parse()
	client := NewClient(ServerIP, ServerPort)
	if client == nil {
		fmt.Println(">>>>>>>>>客户端连接建立失败<<<<<<<<<<")
		return
	}
	fmt.Println(">>>>>>>>>客户端连接建立成功<<<<<<<<<<")

	go client.AcceptResponce()
	// 启动客户端业务
	client.Run()
}

客户端采用交互式命令行界面,为用户提供友好的操作体验。通过独立的 goroutine 异步接收服务端消息,确保界面响应及时。

3.4 main.go - 程序入口

package main

func main() {
	server := NewServer("127.0.0.1", 8686)
	server.StartServer()
}

4. 部署与使用

4.1 环境准备

# 创建项目目录
mkdir ~/tree/IM_System && cd ~/tree/IM_System

# 初始化项目
go mod init

touch server.go main.go user.go client.go

go mod tidy

4.2 启动服务端

go build -o server main.go server.go user.go

./server

服务端启动后进入监听状态,准备接受客户端连接。默认监听8686端口,可通过修改代码调整监听地址和端口。

4.3 启动客户端

go build -o client client.go

./client

客户端启动后自动连接服务端,显示功能菜单等待用户输入。多开几个终端可以模拟多用户同时在线的场景。

4.4 效果展示

5. 后续可拓展方向

  1. 消息持久化 - 集成数据库存储聊天记录

  2. 用户认证 - 添加注册登录功能

  3. 文件传输 - 支持图片、文件分享

  4. 群组管理 - 创建和管理聊天群组

  5. Web界面 - 开发WebSocket前端界面


如有问题或建议,欢迎在评论区中留言~

Logo

网易智企-云信开发者社区是面向全网开发者的技术交流与服务平台,依托近 29 年 IM、音视频技术积累,提供 IM、RTC、实时对话智能体、云原生、短信等全场景开发资源。

更多推荐