code with topic

async

并发编程 - Rust 中的异步编程 — Concurrent programming - Asynchronous Programming in Rust

异步编程是一种并发,其高级目标与线程并发(同时做很多事情)相同,但实现方式不同。异步并发和线程并发之间的两大区别是,异步并发完全在程序中进行管理,无需 OS 的帮助,而多任务处理是协作的,而不是抢占式。

并发concurrency:多个任务在一个时间段执行(排序),针对单个cpu,由软件支持(时间片、调度)

并行parallelism:多个任务同时执行,需要多核同时执行多个任务。

并发是组织代码的一种方式,而并行是一种资源。

In threaded systems, each concurrent thread is executed in parallel using as much parallelism as possible. In async systems, there is no strong default: a system may run all tasks in a single thread, it may assign multiple tasks to a single thread and lock that thread to a core (so groups of tasks execute in parallel, but within a group each task executes concurrently, but never in parallel with other tasks within the group), or tasks may be run in parallel with or without limits.

在线程系统中,每个并发线程都使用尽可能多的并行性并行执行。在异步系统中,没有强默认值:系统可以在单个线程中运行所有任务,可以将多个任务分配给单个线程并将该线程锁定到内核(因此任务组并行执行,但在组内每个任务同时执行,但绝不与组内的其他任务并行执行), 或者任务可以并行运行,有限制或没有限制。

async await

术语:

  1. reactor、event loop、driver:管理IO和计时器事件,并与IO交互
  2. scheduler
  3. executor、runtime:结合上面两者,用于运行异步任务,提供api
  4. future:rust中延迟计算的任务,异步并发的基本单位。关于 Rust 中 futures 的一个重要直觉是它们是惰性对象。要完成任何工作,它们必须由外力(通常是异步运行时)向前驱动。

async:Returns a [Future] instead of blocking the current thread.

await:Suspend execution until the result of a [Future] is ready.

future可以组合成更大的future,async可定义future,await可组合

use tokio::{spawn, time::{sleep, Duration}};

#[tokio::main]
async fn main() {
spawn(say_hello());
spawn(say_world());
// Wait for a while to give the tasks time to run.
sleep(Duration::from_millis(1000)).await;
}

async fn say_hello() {
// Wait for a while before printing to make it a more interesting race.
sleep(Duration::from_millis(100)).await;
println!("hello");
}

async fn say_world() {
sleep(Duration::from_millis(100)).await;
println!("world!");
}

例子里hello和world的执行顺序是随机的。spawn将一个future转化为tokio任务执行,做到任务之间的并发

let handle1 = spawn(say_hello());
let handle2 = spawn(say_world());

let _ = handle1.await;
let _ = handle2.await;

println!("!");

代码与上次类似,但我们不是只调用 spawn,而是保存返回的 JoinHandle稍后等待它们。由于我们在退出 main 函数之前等待这些任务完成,因此我们不再需要 main 中的休眠

await JoinHandle 会返回一个 Result(这就是为什么我们在上面的例子中使用 let _ = ...,它避免了关于未使用的 Result 的警告)。如果生成的任务成功完成,则任务的结果将采用 Ok 变体。如果任务 panic 或被中止,则结果将是一个包含 JoinError 文档Err。如果你没有在项目中通过 abort 使用 cancel,那么解包 ``JoinHandle.await 的结果是一种合理的方法,因为这实际上是将恐慌从被生成的任务传播到生成者。

cancellation

如果要取消一个future:

  1. 放弃它
  2. abort中止它
  3. 通过cancellation
  4. 通过函数或宏隐式取消

后果:从编写异步代码(在异步函数、块、futures 等中)的角度来看,代码可能会在任何 await 处停止执行(包括宏中的隐藏代码),并且永远不会再次启动。为了使您的代码正确(特别是要确保取消), 无论它是否正常完成,或者是否在任何等待点终止,它都必须正常工作。

block of async
//声明
async {

};
let s1 = {
let a = 42;
format!("The answer is {a}")
};

let s2 = async {
let q = question().await;
format!("The question is {q}")
};
s2.await;
demo

Demo 1 - 动手学 rust-libp2p

// main.c

use futures::executor::block_on;
//类似.await
// use async_std::task::{sleep};
// use std::time::Duration;

struct Buff {
data: Vec<u8>,
}

async fn rx_socket() -> Buff {
// sleep(Duration::from_secs(2)).await;
Buff {
data: vec![1, 2, 3, 4],
}
}

async fn rx_log(buff: Buff) {
println!("socket 接收:{:?}", buff.data);
}

async fn socket_log() {
let buff = rx_socket().await;
rx_log(buff).await;
}

async fn tx_serial() {
let buf = Buff {
data: vec![11, 22, 33, 44],
};
println!("serial 发送:{:?}", buf.data);
}

async fn async_main() {
let f1 = socket_log();
let f2 = tx_serial();

futures::join!(f1, f2);
}

fn main() {
block_on(async_main());
}


future:一个将来会执行并获得结果的变量,调用poll可以获取结果(如果没到可获取时间,则用轮询获取)

waker:一个轮询中使用的信号量,当轮询到future时,waker作为参数之一传给poll,从而获得future的值

executor:用于管理一批future。单个future用.await驱动,执行器会对所有的future首先poll一次,之后等待wake再去第二次poll

join!:并发的executor若其中一个阻塞,则执行其它的,当全部阻塞时,当前函数会阻塞,让将线程的所有权让给执行器所在的函数(线程)。

select!:任意furure完成就退出。

future对数据的访问:

如果future访问的数据会变化或销毁,那么会有内存错误。为此需要Pin住数据(需要数据实现UnPin,通常会默认实现)

p2p network

concept

libp2p, (short for “library peer-to-peer”) is a peer-to-peer (P2P) networking framework that enables the development of P2P applications. It consists of a collection of protocols, specifications, and libraries that facilitate P2P communication between network participants, known as “peers.”

libp2p 旨在解决传统 P2P 网络方法和这些现有网络模型的局限性,目标是实现分布式 Web。

why libp2p:

  1. 模块化
  2. 广泛的传输配置
  3. 多功能
  4. 安全
  5. 健壮
  6. 弹性
  7. 效率
  8. 穿透NAT
  9. 消息分发和传播
  10. 去中心化

项目:

  1. IPFS+FileCoin
  2. Ethereum
  3. polkadot
  4. lighthouse

很多:谁在使用 libp2p - libp2p — Who uses libp2p - libp2p

addressing

使用multiaddress作为多种寻址方案的统一约定。

例如:/ip4/192.0.2.0/udp/1234 对两种协议及其基本寻址信息进行编码。/ip4/192.0.2.0 告诉我们需要 IPv4 协议的 192.0.2.0 环回地址,/udp/1234 告诉我们要向端口 1234 发送 UDP 数据包。

multiaddr /p2p/QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N 使用 libp2p 的注册协议 id/p2p/ 和我的 IPFS 节点公钥的 multihash 来唯一标识我的本地 IPFS 节点。

基于以上的信息,组合multiaddr:

/ip4/198.51.100.0/tcp/4242/p2p/QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N

假设我有上述对等 ID QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N ,我的公网 IP 是 198.51.100.0。我启动我的 libp2p 应用程序并侦听 TCP 端口 4242 上的连接

这可以扩展为考虑多层寻址和抽象。例如, 用于电路中继的地址将传输地址与多个对等体身份组合在一起,以形成描述“中继电路”的地址:

/ip4/198.51.100.0/tcp/4242/p2p/QmRelay/p2p-circuit/p2p/QmRelayedPeer

protocol

/my-app/amazing-protocol/1.0.1libp2p 协议具有唯一的字符串标识符,这些标识符在首次打开连接时用于协议协商过程。

为了接受链接,libp2p应用程序通过接口(switch,swarm,golang的Host接口)使用ID。使用已注册的协议 ID 标记传入流时,将调用处理程序函数。

binary streams

libp2p 协议所依据的 “媒介” 是具有以下属性的双向二进制流:

  • Bidirectional, reliable delivery of binary data

双向、可靠地交付二进制数据

  • Each side can read and write from the stream at any time
    每一端都可以随时从流中读取和写入

  • Data is read in the same order as it was written
    数据读取顺序与写入数据的顺序相同

  • Can be “half-closed”, that is, closed for writing and open for reading, or closed for reading and open for writing
    可以是“半闭”,即闭写开读,或闭读开写

  • Supports backpressure

    支持背压

  • Readers can’t be flooded by eager writers
    读者不能被热心的作家淹没

同时保证流的安全和多路复用。

protocol negotiation

对流和链接使用的协议达成一致的过程。拨出新的流时,libp2p将发送协议ID,另一端处理ID,ID的协议不被支持时将结束流,支持则侦听对等体将回显协议 ID,作为未来通过流发送的数据将使用商定的协议语义的信号。

match protocol ID and version

两种。一种传入协议 ID 和处理程序函数。如果传入流请求发送与协议 ID 完全匹配,则将使用新流作为参数调用处理程序函数,另一种采用三个参数:协议 ID、协议匹配函数和处理程序函数。

Core protocol

使用protobuf定义消息架构,在二进制消息有效负载前加一个整数,该整数表示有效负载的长度(以字节为单位)。长度编码为 protobuf varint (可变长度整数)。

  1. ping:/ipfs/ping/1.0.0
  2. identify:/ipfs/id/1.0.0
  3. identify/push:/ipfs/id/push/1.0.0
  4. kad-dht:基于kademlia路由算法的分布式哈希表
  5. circuit relay:/libp2p/circuit/relay/0.1.0
QUIC

QUIC 是一种新的传输协议,它提供始终加密的流多路复用 建立在 UDP 之上的连接.

HTTP/2 & HTTP/3 OSI model

它作为加密层参与http3协议,用udp作为传输层协议实现了单次RTT和多个流,从而不需要multiplexing

对比TCP:

  1. HoL阻塞:tcp在阻塞后,之后交付的数据包需要在内核缓冲区中等待,直到收到丢失数据包的重新传输。
    QUIC则只会阻塞一个流
  2. 僵化Ossification:由于 TCP 数据包的 header 没有加密,中间设备可以检查和修改 TCP header 字段,当遇到不理解的内容时可能会意外中断。这使得几乎不可能对 TCP 协议部署任何更改线路格式的更改。
    QUIC尽可能加密,从而避免僵化
  3. 握手:TCP需要两个RTT,一次验证客户端地址,另一次启动加密握手。
    QUIC减少新连接建立到单个 RTT 的延迟,并允许为恢复的连接发送 0 RTT 应用程序数据。
in p2p

libp2p 仅支持双向流,默认使用 TLS 1.3。由于 QUIC 已经提供了加密的流多路复用连接,因此 libp2p 直接使用 QUIC 流,无需任何其他帧。遵循多地址格式,标准 QUIC 连接将如下所示: /ip4/192.0.2.0/udp/65432/quic-v1/

Stream

流是计算机网络中一种有序、可靠的双向数据传输通道。它提供端到端的逻辑连接,确保数据按发送顺序到达接收方。

应用:

协议层 实现形式 特点
TCP 字节流(Byte Stream) 无消息边界,需应用层处理分包
HTTP/2 逻辑消息流(Message Stream) 在单个TCP连接上多路复用多个请求
QUIC 独立流(Isolated Stream) 每个流独立处理,避免队头阻塞
multiplexing

多路复用是在单一物理连接上同时传输多个独立数据流的技术,其核心价值在于:

  • 提升连接利用率:避免频繁建立/断开连接的开销;
  • 降低延迟:并行处理多个数据交互;
  • 资源高效:减少端口和内存占用。
类型 原理 典型协议 优缺点
端口复用 通过不同端口区分服务 HTTP/1.1 简单但扩展性差
帧标识复用 为每个流分配唯一ID标识 HTTP/2, QUIC 高效但需复杂状态管理
信道化复用 物理划分时隙/频段 传统电话网络 资源固定分配,利用率低

协议中的多路复用

协议 多路复用创新 性能提升指标
HTTP/3 基于QUIC的流多路复用 页面加载时间↓35%
gRPC 利用HTTP/2流实现RPC调用复用 QPS↑4倍
WebTransport 基于UDP的流式传输 延迟↓50%
  • 实时视频会议 → 优先选择QUIC协议,利用独立流避免卡顿
  • 微服务通信 → 采用gRPC的HTTP/2流复用提升吞吐量
  • IoT设备管理 → 使用CoAP的块传输实现低功耗多路复用
Project
protocol/floodsub

Floodsub 基本逻辑就是订阅消息发送消息传递消息。每一个节点在收到消息后,都会对连接的全部节点进行发送消息,“一传十,十传百”,真的就是字面上的意思,所以会造成大量的重复发送,是一种以带宽换取传播速度的方式,适合小型网络。

  1. event

    pub struct Behaviour {
    /// Events that need to be yielded to the outside when polling.
    events: VecDeque<ToSwarm<Event, FloodsubRpc>>,

    ///...
    }

    包装了Toswarm的类型,而ToSwarm有以下的内容

    pub enum ToSwarm<TOutEvent, TInEvent> {
    /// Instructs the `Swarm` to return an event when it is being polled.
    GenerateEvent(TOutEvent),


    Dial { opts: DialOpts },

    /// Instructs the [`Swarm`](crate::Swarm) to listen on the provided address.
    ListenOn { opts: ListenOpts },

    /// Instructs the [`Swarm`](crate::Swarm) to remove the listener.
    RemoveListener { id: ListenerId },


    NotifyHandler {
    /// The peer for whom a [`ConnectionHandler`] should be notified.
    peer_id: PeerId,
    /// The options w.r.t. which connection handler to notify of the event.
    handler: NotifyHandler,
    /// The event to send.
    event: TInEvent,
    },


    NewExternalAddrCandidate(Multiaddr),


    ExternalAddrConfirmed(Multiaddr),


    ExternalAddrExpired(Multiaddr),


    CloseConnection {
    /// The peer to disconnect.
    peer_id: PeerId,
    /// Whether to close a specific or all connections to the given peer.
    connection: CloseConnection,
    },


    NewExternalAddrOfPeer { peer_id: PeerId, address: Multiaddr },
    }

    可见event几乎涵盖了节点的功能相关的枚举类型,layer中的很多方法实现的就是将方法对应的事件塞入events队列的尾部。

    应当执行event时则会调用poll函数(见异步编程的future部分)

    #[tracing::instrument(level = "trace", name = "NetworkBehaviour::poll", skip(self))]
    fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
    if let Some(event) = self.events.pop_front() {
    return Poll::Ready(event);
    }

    Poll::Pending
    }
  2. 关注

    pub fn subscribe(&mut self, topic: Topic) -> bool {
    if self.subscribed_topics.iter().any(|t| t.id() == topic.id()) {
    return false;
    }

    for peer in self.connected_peers.keys() {
    self.events.push_back(ToSwarm::NotifyHandler {
    peer_id: *peer,
    handler: NotifyHandler::Any,
    event: FloodsubRpc {
    messages: Vec::new(),
    subscriptions: vec![FloodsubSubscription {
    topic: topic.clone(),
    action: FloodsubSubscriptionAction::Subscribe,
    }],
    },
    });
    }

    self.subscribed_topics.push(topic);
    true
    }

    /// Unsubscribes from a topic.
    ///
    /// Note that this only requires the topic name.
    ///
    /// Returns true if we were subscribed to this topic.
    pub fn unsubscribe(&mut self, topic: Topic) -> bool {
    let Some(pos) = self.subscribed_topics.iter().position(|t| *t == topic) else {
    return false;
    };

    self.subscribed_topics.remove(pos);

    for peer in self.connected_peers.keys() {
    self.events.push_back(ToSwarm::NotifyHandler {
    peer_id: *peer,
    handler: NotifyHandler::Any,
    event: FloodsubRpc {
    messages: Vec::new(),
    subscriptions: vec![FloodsubSubscription {
    topic: topic.clone(),
    action: FloodsubSubscriptionAction::Unsubscribe,
    }],
    },
    });
    }

    true
    }

    解释:

    • handler:NotifyHandler::Any :: floodsub 只需要将消息传递给对等节点一次,而不需要通过所有连接都发送相同的消息。Any不关心具体的连接节点,只关心消息是否传达到对等节点。
chat

参考了官方库的example

里面使用了gossipsub,不同于floodsub适用于小型网络,它更适用于区块链和p2p聊天。

Gossipsub 是一种基于 Gossip(流言)协议发布/订阅(PubSub) 消息传播机制,旨在高效、可靠地在 P2P 网络中广播消息。它结合了 Floodsub(泛洪协议) 的简单性和 Gossip 协议 的扩展性,同时优化了带宽使用和消息传播效率。

  • Mesh 网络(主题网格)
    • 每个主题(Topic)维护一个 Mesh,即一组稳定连接的节点(Peer),用于高效传播消息。
    • Mesh 的大小受 D(期望连接数)参数控制,避免过度连接或孤立节点。
  • Fanout 机制(非订阅发布)
    • 允许节点向未订阅的主题发送消息(如临时广播),并记录目标节点,避免全网络泛洪。
  • Control Messages(控制消息)
    • IHAVE/IWANT:用于请求/提供缺失的消息(类似 BitTorrent 的块交换)。
    • GRAFT/PRUNE:动态调整 Mesh 成员(加入或剔除节点)。
  • 消息缓存(Message Cache)
    • 记录最近的消息 ID,防止重复传播,并支持按需拉取(IWANT 机制)。

topic作为聊天室的范围,聊天节点可以随时加入或离开,IHAVE/IWANT 机制确保即使部分节点离线,消息仍可通过其他节点补全。

上述的特点在这个publish函数中都能找到

  1. 数据转换:对原始数据进行可选的转换(如压缩)

  2. 消息构建:根据发布配置(签名/匿名/随机作者)构建原始消息

  3. 消息验证:检查消息大小和是否为重复消息

  4. 选择接收者

    :根据不同策略选择消息接收节点

    • 如果启用了洪泛发布 (flood_publish),向所有订阅的高分节点发送
    • 如果是正常 Gossipsub 逻辑,则使用 mesh(有主题) 或 fanout(无主题,mesh中随机配凑fanout) 机制
  5. 缓存消息:将消息保存在本地缓存中,用于处理重复消息

  6. 发送消息:向所有选定的接收者发送消息

pub fn publish(
&mut self,
topic: impl Into<TopicHash>,
data: impl Into<Vec<u8>>,
) -> Result<MessageId, PublishError> {
let data = data.into();
let topic = topic.into();

// 对数据进行转换(例如压缩)
let transformed_data = self
.data_transform
.outbound_transform(&topic, data.clone())?;

// 构建原始消息
let raw_message = self.build_raw_message(topic, transformed_data)?;

// 基于原始数据(未转换的)计算消息ID
let msg_id = self.config.message_id(&Message {
source: raw_message.source,
data, // 使用未压缩形式
sequence_number: raw_message.sequence_number,
topic: raw_message.topic.clone(),
});

// 检查消息大小是否超过传输限制
if raw_message.raw_protobuf_len() > self.config.max_transmit_size() {
return Err(PublishError::MessageTooLarge);
}

// 检查消息是否已经发布过
if self.duplicate_cache.contains(&msg_id) {
tracing::warn!(
message=%msg_id,
"Not publishing a message that has already been published"
);
return Err(PublishError::Duplicate);
}

// 选择消息接收者
let mut recipient_peers = HashSet::new();

// 获取所有订阅该主题的节点
if let Some(set) = self.topic_peers.get(&topic_hash) {
if self.config.flood_publish() {
// 如果启用了洪泛发布,则向所有符合条件的节点发送
recipient_peers.extend(set.iter().filter(|p| {
self.explicit_peers.contains(*p)
|| !self.score_below_threshold(p, |ts| ts.publish_threshold).0
}));
} else {
// 正常Gossipsub逻辑:根据mesh或fanout选择接收节点
match self.mesh.get(&raw_message.topic) {
// 如果该主题在我们的mesh中,向mesh中的节点发送
Some(mesh_peers) => {
recipient_peers.extend(mesh_peers);
}
// 否则,使用fanout机制
None => {
// 如果已有fanout节点则使用,否则新建fanout列表
if self.fanout.contains_key(&topic_hash) {
for peer in self.fanout.get(&topic_hash).expect("Topic must exist") {
recipient_peers.insert(*peer);
}
} else {
// 随机选择mesh_n个节点作为fanout
let mesh_n = self.config.mesh_n();
let new_peers = get_random_peers(...);

self.fanout.insert(topic_hash.clone(), new_peers.clone());
recipient_peers.extend(new_peers);
}
// 更新最近发布时间戳
self.fanout_last_pub.insert(topic_hash.clone(), Instant::now());
}
}

// 添加显式对等节点(explicit peers)
// 添加floodsub节点
}
}

// 如果没有接收者,返回错误
if recipient_peers.is_empty() {
return Err(PublishError::InsufficientPeers);
}

// 将消息加入缓存
self.duplicate_cache.insert(msg_id.clone());
self.mcache.put(&msg_id, raw_message.clone());

// 处理匿名消息的特殊情况
if let PublishConfig::RandomAuthor | PublishConfig::Anonymous = self.publish_config {
if !self.config.allow_self_origin() {
self.published_message_ids.insert(msg_id.clone());
}
}

// 向所有选定的节点发送消息
for peer_id in recipient_peers.iter() {
self.send_message(*peer_id, RpcOut::Publish(raw_message.clone()));
}

// 更新指标
if let Some(metrics) = self.metrics.as_mut() {
metrics.register_published_message(&topic_hash);
}

Ok(msg_id)
}

构建chat的流程:

  1. 配置swarm:tokio+tcp+quic+交互逻辑
    交互逻辑使用with_behaviour配置:

    .with_behaviour(|key| {
    // 配置 gossipsub 消息 ID 生成函数
    let message_id_fn = |message: &gossipsub::Message| {
    let mut s = DefaultHasher::new();
    message.data.hash(&mut s);
    gossipsub::MessageId::from(s.finish().to_string())
    };

    // 构建 gossipsub 配置
    let gossipsub_config = gossipsub::ConfigBuilder::default()
    .heartbeat_interval(Duration::from_secs(5))
    .validation_mode(gossipsub::ValidationMode::Permissive)
    .message_id_fn(message_id_fn)
    .build()
    .map_err(io::Error::other)?;

    // 创建 gossipsub 行为
    let gossipsub = gossipsub::Behaviour::new(
    gossipsub::MessageAuthenticity::Signed(key.clone()),
    gossipsub_config,
    )?;

    // 创建 mdns 行为
    let mdns = mdns::tokio::Behaviour::new(mdns::Config::default(), key.public().to_peer_id())?;

    // 返回你的自定义行为组合
    Ok(MyBehaviour {
    gossipsub,
    mdns,
    })
    })?

    其中mdns协议负责在本地网络自动发现其他节点,设置了gossipsub的消息ID生成函数和参数。最后返回的MyBehaviour由程序自定义,通过#[derive(NetworkBehaviour)]统一行为,在3中作为异步事件的参数

  2. 设置topic和stdin流

  3. 进入异步事件循环

    loop {
    select! {
    Ok(Some(line)) = stdin.next_line() => {
    // let message = gossipsub::Message {
    // data: line.as_bytes().to_vec(),
    // sequence_number: None,
    // topic: topic.clone().into(),
    // source: None,
    // };
    if let Err(e) = swarm
    .behaviour_mut().gossipsub
    .publish(topic.clone(), line.as_bytes()) {
    println!("Publish error: {e:?}");
    }
    }
    event = swarm.select_next_some() => match event {
    SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => {
    for (peer_id, _multiaddr) in list {
    println!("mDNS discovered a new peer: {peer_id}");
    swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
    }
    let topic_peers_len = swarm.behaviour().gossipsub.all_peers()
    .count();
    println!("Connected peers for topic 'chat': {} peers", topic_peers_len);
    },
    SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Expired(list))) => {
    for (peer_id, _multiaddr) in list {
    println!("mDNS discover peer has expired: {peer_id}");
    swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer_id);
    }
    },
    SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Message {
    propagation_source: peer_id,
    message_id: id,
    message,
    })) => println!(
    "Got message: '{}' with id: {id} from peer: {peer_id}",
    String::from_utf8_lossy(&message.data),
    ),
    SwarmEvent::NewListenAddr { address, .. } => {
    println!("Local node is listening on {address}");
    }
    _ => {}
    }

    }

    事件循环中的stdin.next_line()event = swarm.select_next_some()具有异步属性,通过select宏统一执行。每次循环直到执行完其中一个后结束。网络事件主要有mdns的discover+expired和gossipsub的消息传输。

有个扩展作业,实现chat的私聊功能:

deepseek给的实现方案如下:

方案 隐私性 实现难度 扩展性 适用场景
消息头标记法(@) 简单 快速原型开发
独立私聊 Topic 中等 中小规模固定群组
端到端加密(E2E) 复杂 高安全性需求
直接连接(Stream) 最高 最复杂 一对一实时通信

按我的直觉来说第一个方案实现的最差,可以被别人伪装,第三个方案像是我会做的。方案二意外的不错。

优化后尝试把2和3结合:

  • 强制订阅检查:发送私聊前,确认对方已订阅目标 Topic(可通过 gossipsub.subscribed_peers(topic) 检查)。
  • 动态重试:如果对方未订阅,先发送控制消息(如 "@B请订阅 /chat/private/A-B"),再重发私聊消息。
  • 限制私聊 Topic 数量:例如,每个 Peer 最多同时维护 10 个私聊 Topic,超时自动清理。
  • 按需订阅:仅在需要通信时订阅 Topic,结束后取消订阅(gossipsub.unsubscribe(topic))。
  • 哈希化 Topic 名称:例如,用 sha256(local_peer_id + remote_peer_id) 生成不可预测的 Topic。
  • 临时 Topic:每次会话生成随机 Topic,并通过安全通道(如方案 3 的 E2E 加密)通知对方。

具体的代码实现可能得有空闲时间的时候来实现

Write a CLI

非常好代码book,这一大节的参考:入门 - Rust 中的命令行应用程序 - Rust CLI 中文

clap = {version = "4.0", features = ["derive"]}以及以下代码

use clap::Parser;
#[derive(Parser)]
struct CLI {
pattern: String,
path: path::PathBuf,
}

//in fn main
let args = CLI::parse();
println!("Pattern: {}", args.pattern);
println!("Path: {:?}", args.path);
//in terminal
cargo run -- aa src/main.rs

执行之后会得到 aa Pattern和后面的Path输出

Error handling

尝试用result + match语句来处理

let my_async_result =     
async {
let addr = args.path.to_str().unwrap();
let content = fs::read_to_string(addr);
match content {
Ok(content) => {
println!("File content: {}", content);
return content
}
Err(e) => {
println!("Error reading file: {}", e);
return String::new()
}
}

}.await;
println!("Async Result: {}", my_async_result);
Ok(())

现在,我们能够访问文件的内容,但在 match 块之后我们无法真正对它做任何事情。为此,我们需要以某种方式处理错误情况。挑战在于 match 块的所有分支都需要返回相同类型的某些内容。可以尝试unwrap(像addr一行)或者?let content = std::fs::read_to_string("test.txt")?;

当然,时间富裕的首选是自定义错误处理,其次使用更成熟的库。

在anyhow库中:

use anyhow::{Context, Result};

fn main() -> Result<()> {
let path = "test.txt";
let content = std::fs::read_to_string(path)
.with_context(|| format!("could not read file `{}`", path))?;
println!("file content: {}", content);
Ok(())
}

由此修改上面的代码:

#[warn(unused_imports)]
use std::{
path,
error::Error,
fs,
io::{self, Write},
env
};
use anyhow::{Context, Result};
use clap::Parser;
#[derive(Parser)]
struct CLI {
pattern: String,
path: path::PathBuf,
}
#[warn(unused_imports)]
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let args = CLI::parse();
println!("Pattern: {}", args.pattern);
println!("Path: {:?}", args.path);
async {
let addr = args.path.clone();
return fs::read_to_string(&addr)
.with_context(|| format!("Failed to read file: {}", addr.display()))
.map(|content| {
let mut count = 0;
for line in content.lines() {
if line.contains(&args.pattern) {
count += 1;
}
}
println!("Pattern '{}' found {} times", args.pattern, count);
})

}.await?;
Ok(())

}

Pattern: aa
Path: “src/main”
Error: Failed to read file: src/main

Caused by:
No such file or directory (os error 2)

可以看见它保留了原始的错误和我们自定义的错误描述

pipe

应通过 stderr 打印错误,以便用户和其他工具更容易将输出管道传输到文件或更多工具。

注意:在大多数操作系统上,程序可以写入两个输出流,stdoutstderrstdout 用于程序的实际输出,而 stderr 允许错误和其他消息与 stdout 分开。这样,输出可以存储到文件中或管道传输到另一个程序,而错误会显示给用户。

在 Rust 中,这是通过 println!eprintln! 实现的,前者打印到 stdout,后者打印到 stderr

如果有转义字符,避免他们改变用户的terminal输出。可以用ansi_term这种crate。

直接调用println会很慢,为此使用一个缓冲区会有用。

use std::io::{self, Write};

let stdout = io::stdout(); // get the global stdout entity
let mut handle = io::BufWriter::new(stdout); // optional: wrap that handle in a buffer
writeln!(handle, "foo: {}", 42); // add `?` if you care about errors here

直接这么写会得到warning

warning: unused Result that must be used
–> src/main.rs:22:5
|
22 | writeln!(handle, “Pattern: {}”, args.pattern);
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: this Result may be an Err variant, which should be handled
= note: #[warn(unused_must_use)] on by default
= note: this warning originates in the macro writeln (in Nightly builds, run with -Z macro-backtrace for more info)

为了去掉warning,锁定 stdout(或 stderr)并使用 writeln! 直接打印到它会有所帮助。这也可以防止系统反复锁定和解锁 stdout。为了简洁可以包装成一个函数

fn print_stdout<T: std::fmt::Display>(s: T) {
let stdout = io::stdout();
let mut handle = io::BufWriter::new(stdout.lock());
writeln!(handle, "{}", s).unwrap();
}
//invoking
print_stdout(format!("Pattern: {}", args.pattern));

进度条

使用 indicatif 板条箱,您可以向您的程序添加进度条和小旋转器。这是一个快速示例

fn main() {
let pb = indicatif::ProgressBar::new(100);
for i in 0..100 {
do_hard_work();
pb.println(format!("[+] finished #{}", i));
pb.inc(1);
}
pb.finish_with_message("done");
}

log

cli需要日志。

向您的应用程序添加简单记录,您需要两件事:log 板条箱(它包含以日志级别命名的宏)和一个实际将日志输出写入某个有用位置的适配器。能够使用日志适配器非常灵活:例如,您可以使用它们将日志写入终端,还可以写入 syslog 或中央日志服务器。

由于我们现在只关心编写 CLI 应用程序,因此一个易于使用的适配器是 env_logger。它被称为“env”记录器,因为您可以使用环境变量来指定要记录应用程序的哪些部分(以及要记录它们的级别)。它将在您的日志消息前面加上时间戳和日志消息来自的模块。由于库也可以使用 log,因此您也可以轻松配置它们的日志输出。

#[warn(unused_imports)]
use std::{
path,
error::Error,
fs,
io::{self, Write},
env
};
use anyhow::{Context, Result};
use clap::Parser;
use log::{info, warn, error, debug, LevelFilter};
use env_logger::{Builder, WriteStyle};
#[derive(Parser)]
struct CLI {
pattern: String,
path: path::PathBuf,
#[clap(long, default_value = "app.log")]
log_file: String,
}

#[warn(unused_imports)]
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let args = CLI::parse();
setup_logger(&args.log_file)?;
info!("搜索模式: {}", args.pattern);
info!("搜索路径: {}", args.path.display());
print_stdout(format!("Pattern: {}", args.pattern));
print_stdout(format!("Path: {}", args.path.display()));
debug!("开始文件读取和搜索操作");
async {
let addr = args.path.clone();
return fs::read_to_string(&addr)
.with_context(|| {
let err_msg = format!("Failed to read file: {}", addr.display());
error!("{}", err_msg);
err_msg
})
.map(|content| {
let mut count = 0;
for (line_num, line) in content.lines().enumerate() {
if line.contains(&args.pattern) {
count += 1;
debug!("在行 {} 找到匹配: {}", line_num + 1, line);
}
}
let result_msg = format!("Pattern '{}' found {} times", args.pattern, count);
info!("{}", result_msg);
print_stdout(result_msg); })

}.await?;
info!("程序正常结束");
Ok(())

}

fn print_stdout<T: std::fmt::Display>(s: T) {
let stdout = io::stdout();
let mut handle = io::BufWriter::new(stdout.lock());
writeln!(handle, "{}", s).unwrap();
}

fn setup_logger(log_file_path: &str) -> Result<(), Box<dyn Error>> {
// 创建日志文件,使用追加模式
let log_file = fs::OpenOptions::new()
.create(true)
.write(true)
.append(true)
.open(log_file_path)?;

// 构建自定义 logger
let mut builder = Builder::new();
builder
.target(env_logger::Target::Pipe(Box::new(log_file)))
.filter_level(LevelFilter::Debug)
.write_style(WriteStyle::Always)
.format_timestamp_secs()
.format_module_path(true)
.init();

info!("日志系统初始化完成,日志文件: {}", log_file_path);
Ok(())
}

整个程序略显臃肿。

test

测试驱动开发可以看看这个

rust有内置的测试框架,只是测试的话不该写在这里。

那么如何测试一个CLI程序?

首先从单元测试开始。

fn find_matches(content: &str, pattern: &str) -> Vec<(usize, String)> {
let mut matches = Vec::new();
for (line_num, line) in content.lines().enumerate() {
if line.contains(pattern) {
info!("找到匹配: {} (行 {})", line, line_num + 1);
matches.push((line_num + 1, line.to_string()));
}
}
matches
}

#[test]
fn find_a_match() {
find_matches("lorem ipsum\ndolor sit amet", "lorem");
assert_eq!(
find_matches("lorem ipsum\ndolor sit amet", "lorem"),
vec![(1, "lorem ipsum".to_string())]
);
assert_eq!(
find_matches("lorem ipsum\ndolor sit amet", "ipsum"),
vec![(1, "lorem ipsum".to_string())]
);
}

此时打印在日志中可以简单捕获。如果是会打印到stdout中的功能,则需要std::io::Writetrait的实现

例如下面的代码:

fn print_stdout_with_handle<T: std::fmt::Display>(s: T, handle: &mut impl Write) {
writeln!(handle, "{}", s).unwrap();
}

#[test]
fn test_find_matches_stdout() {
let mut buffer = Vec::new();
find_matches_stdout("lorem ipsum\ndolor sit amet", "lorem", &mut buffer);
assert_eq!(
String::from_utf8(buffer).unwrap(),
"在行 1 找到匹配:lorem ipsum\n"
);
}

buffer参数的存在能让测试时获得stdout的数据用于测试。

为了易于测试,做到的东西会有:

  1. 确定了我们应用程序的核心部分之一,
  2. 将其放入自己的函数中,
  3. 并使其更加灵活。

当然作为命令行程序也要从用户的角度测试使用,可以尝试assert_fs

use assert_fs::prelude::*;

#[test]
fn find_content_in_file() -> Result<(), Box<dyn std::error::Error>> {
let file = assert_fs::NamedTempFile::new("sample.txt")?;
file.write_str("A test\nActual content\nMore content\nAnother test")?;

let mut cmd = Command::cargo_bin("grrs")?;
cmd.arg("test").arg(file.path());
cmd.assert()
.success()
.stdout(predicate::str::contains("A test\nAnother test"));

Ok(())
}

根据你的程序的性质,你还可以尝试添加更多的测试技术。例如,如果你已经提取了程序的部分内容,并且发现自己在编写大量示例用例作为单元测试,同时试图找出所有边缘情况,你应该研究 proptest。如果你的程序消耗任意文件并解析它们,请尝试编写一个 fuzzer 以查找边缘情况下的错误。

more

处理ctrl+c等unix信号:https://crates.io/crates/signal-hook

使用通道作为线程同步点:https://crates.io/crates/crossbeam-channel

future和streams:如果您正在使用 tokio,您很可能已经使用异步模式和事件驱动设计来编写应用程序。您可以启用 signal-hook 的 tokio-support 功能,而不是直接使用 crossbeam 的通道。这允许您在 signal-hook 的 Signals 类型上调用 .into_async(),以获取一个实现 futures::Stream 的新类型。

配置文件的使用:https://docs.rs/confy/0.3.1/confy/

友好地处理panic:https://crates.io/crates/human-panic

区分使用者是人类还是Terminal:https://doc.rust-lang.net.cn/stable/std/io/trait.IsTerminal.html

为命令行渲染文档:https://crates.io/crates/clap_mangen

thread/process

进程和线程的处理方式存在许多细微的差异。最重要的区别是内存在线程之间共享,而不是在进程之间共享。这意味着进程之间的通信是通过某种消息传递进行的,类似于在不同计算机上运行的程序之间的通信。

process
threads