Skip to content

Commit 75f933d

Browse files
committed
Change fifo to pipe in shim like go shim
add io monitor to inspect io fd change Signed-off-by: jokemanfire <hu.dingyang@zte.com.cn>
1 parent 217f0ee commit 75f933d

File tree

7 files changed

+326
-33
lines changed

7 files changed

+326
-33
lines changed

crates/runc-shim/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ license.workspace = true
1515
repository.workspace = true
1616
homepage.workspace = true
1717

18+
1819
[[bin]]
1920
# Overwrite the binary name so it can be referred as "io.containerd.runc.v2-rs" from containerd.
2021
# Note: the runtime's binary name must start with "io.containerd.runc" in order to
@@ -36,10 +37,12 @@ serde.workspace = true
3637
serde_json.workspace = true
3738
time.workspace = true
3839
uuid.workspace = true
40+
lazy_static = "1.4.0"
3941

4042
# Async dependencies
4143
async-trait.workspace = true
4244
tokio = { workspace = true, features = ["full"] }
45+
futures.workspace = true
4346

4447
[target.'cfg(target_os = "linux")'.dependencies]
4548
cgroups-rs.workspace = true

crates/runc-shim/src/common.rs

+23-12
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@ use std::{
2929

3030
use containerd_shim::{
3131
api::{ExecProcessRequest, Options},
32-
io_error, other, other_error,
33-
util::IntoOption,
34-
Error,
32+
io_error, other, other_error, Error,
3533
};
3634
use log::{debug, warn};
3735
use nix::{
@@ -43,7 +41,7 @@ use nix::{
4341
};
4442
use oci_spec::runtime::{LinuxNamespaceType, Spec};
4543
use runc::{
46-
io::{Io, NullIo, FIFO},
44+
io::{IOOption, Io, NullIo, PipedIo},
4745
options::GlobalOpts,
4846
Runc, Spawner,
4947
};
@@ -76,8 +74,8 @@ pub struct ProcessIO {
7674

7775
pub fn create_io(
7876
id: &str,
79-
_io_uid: u32,
80-
_io_gid: u32,
77+
io_uid: u32,
78+
io_gid: u32,
8179
stdio: &Stdio,
8280
) -> containerd_shim::Result<ProcessIO> {
8381
let mut pio = ProcessIO::default();
@@ -100,19 +98,32 @@ pub fn create_io(
10098

10199
if scheme == FIFO_SCHEME {
102100
debug!(
103-
"create named pipe io for container {}, stdin: {}, stdout: {}, stderr: {}",
101+
"create pipe io for container {}, stdin: {}, stdout: {}, stderr: {}",
104102
id,
105103
stdio.stdin.as_str(),
106104
stdio.stdout.as_str(),
107105
stdio.stderr.as_str()
108106
);
109-
let io = FIFO {
110-
stdin: stdio.stdin.to_string().none_if(|x| x.is_empty()),
111-
stdout: stdio.stdout.to_string().none_if(|x| x.is_empty()),
112-
stderr: stdio.stderr.to_string().none_if(|x| x.is_empty()),
107+
108+
// let io = FIFO {
109+
// stdin: stdio.stdin.to_string().none_if(|x| x.is_empty()),
110+
// stdout: stdio.stdout.to_string().none_if(|x| x.is_empty()),
111+
// stderr: stdio.stderr.to_string().none_if(|x| x.is_empty()),
112+
// };
113+
// pio.copy = false;
114+
115+
if stdio.stdin.is_empty() {
116+
debug!("stdin is empty");
117+
}
118+
let opts = IOOption {
119+
open_stdin: !stdio.stdin.is_empty(),
120+
open_stdout: !stdio.stdout.is_empty(),
121+
open_stderr: !stdio.stderr.is_empty(),
113122
};
123+
let io = PipedIo::new(io_uid, io_gid, &opts).unwrap();
124+
pio.copy = true;
125+
114126
pio.io = Some(Arc::new(io));
115-
pio.copy = false;
116127
}
117128
Ok(pio)
118129
}

crates/runc-shim/src/processes.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use tokio::{
3737
sync::oneshot::{channel, Receiver, Sender},
3838
};
3939

40-
use crate::io::Stdio;
40+
use crate::{common::ProcessIO, io::Stdio};
4141

4242
#[async_trait]
4343
pub trait Process {
@@ -71,6 +71,7 @@ pub struct ProcessTemplate<S> {
7171
pub state: Status,
7272
pub id: String,
7373
pub stdio: Stdio,
74+
pub io: Option<Arc<ProcessIO>>,
7475
pub pid: i32,
7576
pub exit_code: i32,
7677
pub exited_at: Option<OffsetDateTime>,
@@ -86,6 +87,7 @@ impl<S> ProcessTemplate<S> {
8687
state: Status::CREATED,
8788
id: id.to_string(),
8889
stdio,
90+
io: None,
8991
pid: 0,
9092
exit_code: 0,
9193
exited_at: None,

crates/runc-shim/src/runc.rs

+123-10
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use oci_spec::runtime::{LinuxResources, Process};
4949
use runc::{Command, Runc, Spawner};
5050
use tokio::{
5151
fs::{remove_file, File, OpenOptions},
52-
io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, BufReader},
52+
io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader},
5353
};
5454

5555
use super::{
@@ -63,6 +63,7 @@ use crate::{
6363
CreateConfig, Log, ProcessIO, ShimExecutor, INIT_PID_FILE, LOG_JSON_FILE,
6464
},
6565
io::Stdio,
66+
service::add_monitor_io,
6667
};
6768

6869
pub type ExecProcess = ProcessTemplate<RuncExecLifecycle>;
@@ -163,8 +164,10 @@ impl RuncFactory {
163164
(Some(s), None)
164165
} else {
165166
let pio = create_io(&id, opts.io_uid, opts.io_gid, stdio)?;
166-
create_opts.io = pio.io.as_ref().cloned();
167-
(None, Some(pio))
167+
let ref_pio = Arc::new(pio);
168+
create_opts.io = ref_pio.io.clone();
169+
init.io = Some(ref_pio.clone());
170+
(None, Some(ref_pio))
168171
};
169172

170173
let resp = init
@@ -178,6 +181,22 @@ impl RuncFactory {
178181
}
179182
return Err(runtime_error(bundle, e, "OCI runtime create failed").await);
180183
}
184+
if !init.stdio.stdin.is_empty() {
185+
let stdin_clone = init.stdio.stdin.clone();
186+
let stdin_w = init.stdin.clone();
187+
// Open the write side in advance to make sure read side will not block,
188+
// open it in another thread otherwise it will block too.
189+
tokio::spawn(async move {
190+
if let Ok(stdin_w_file) = OpenOptions::new()
191+
.write(true)
192+
.open(stdin_clone.as_str())
193+
.await
194+
{
195+
let mut lock_guard = stdin_w.lock().unwrap();
196+
*lock_guard = Some(stdin_w_file);
197+
}
198+
});
199+
}
181200
copy_io_or_console(init, socket, pio, init.lifecycle.exit_signal.clone()).await?;
182201
let pid = read_file_to_str(pid_path).await?.parse::<i32>()?;
183202
init.pid = pid;
@@ -232,6 +251,7 @@ impl ProcessFactory<ExecProcess> for RuncExecFactory {
232251
stderr: req.stderr.to_string(),
233252
terminal: req.terminal,
234253
},
254+
io: None,
235255
pid: 0,
236256
exit_code: 0,
237257
exited_at: None,
@@ -299,6 +319,15 @@ impl ProcessLifecycle<InitProcess> for RuncInitLifecycle {
299319
);
300320
}
301321
}
322+
// close pipe read
323+
if !p.stdio.is_null() {
324+
if let Some(c) = p.io.clone() {
325+
if let Some(io) = c.io.clone() {
326+
io.close_read_side();
327+
}
328+
}
329+
}
330+
debug!("Do close io complete");
302331
self.exit_signal.signal();
303332
Ok(())
304333
}
@@ -394,8 +423,10 @@ impl ProcessLifecycle<ExecProcess> for RuncExecLifecycle {
394423
(Some(s), None)
395424
} else {
396425
let pio = create_io(&p.id, self.io_uid, self.io_gid, &p.stdio)?;
397-
exec_opts.io = pio.io.as_ref().cloned();
398-
(None, Some(pio))
426+
let ref_pio = Arc::new(pio);
427+
exec_opts.io = ref_pio.io.clone();
428+
p.io = Some(ref_pio.clone());
429+
(None, Some(ref_pio))
399430
};
400431
//TODO checkpoint support
401432
let exec_result = self
@@ -457,6 +488,15 @@ impl ProcessLifecycle<ExecProcess> for RuncExecLifecycle {
457488

458489
async fn delete(&self, p: &mut ExecProcess) -> Result<()> {
459490
self.exit_signal.signal();
491+
//close pipe read
492+
if !p.stdio.is_null() {
493+
if let Some(c) = p.io.clone() {
494+
if let Some(io) = c.io.clone() {
495+
io.close_read_side();
496+
}
497+
}
498+
}
499+
debug!("Do close io complete");
460500
let exec_pid_path = Path::new(self.bundle.as_str()).join(format!("{}.pid", p.id));
461501
remove_file(exec_pid_path).await.unwrap_or_default();
462502
Ok(())
@@ -495,7 +535,7 @@ async fn copy_console(
495535
.open(stdio.stdin.as_str())
496536
.await
497537
.map_err(io_error!(e, "failed to open stdin"))?;
498-
spawn_copy(stdin, console_stdin, exit_signal.clone(), None::<fn()>);
538+
spawn_copy_no_recvs(stdin, console_stdin, exit_signal.clone(), None::<fn()>);
499539
}
500540

501541
if !stdio.stdout.is_empty() {
@@ -516,7 +556,7 @@ async fn copy_console(
516556
.open(stdio.stdout.as_str())
517557
.await
518558
.map_err(io_error!(e, "open stdout for read"))?;
519-
spawn_copy(
559+
spawn_copy_no_recvs(
520560
console_stdout,
521561
stdout,
522562
exit_signal,
@@ -535,6 +575,10 @@ pub async fn copy_io(pio: &ProcessIO, stdio: &Stdio, exit_signal: Arc<ExitSignal
535575
if !pio.copy {
536576
return Ok(());
537577
};
578+
let mut rs = add_monitor_io(pio.io.clone().unwrap()).await;
579+
//change owner of rs stdin no need spawn_copy
580+
let stdout_recv = rs.remove(0);
581+
let stderr_recv = rs.remove(0);
538582
if let Some(io) = &pio.io {
539583
if let Some(w) = io.stdin() {
540584
debug!("copy_io: pipe stdin from {}", stdio.stdin.as_str());
@@ -544,7 +588,7 @@ pub async fn copy_io(pio: &ProcessIO, stdio: &Stdio, exit_signal: Arc<ExitSignal
544588
.open(stdio.stdin.as_str())
545589
.await
546590
.map_err(io_error!(e, "open stdin"))?;
547-
spawn_copy(stdin, w, exit_signal.clone(), None::<fn()>);
591+
spawn_copy_no_recvs(stdin, w, exit_signal.clone(), None::<fn()>);
548592
}
549593
}
550594

@@ -568,8 +612,10 @@ pub async fn copy_io(pio: &ProcessIO, stdio: &Stdio, exit_signal: Arc<ExitSignal
568612
stdout,
569613
exit_signal.clone(),
570614
Some(move || {
615+
debug!("stdout exit.....................");
571616
drop(stdout_r);
572617
}),
618+
stdout_recv,
573619
);
574620
}
575621
}
@@ -594,8 +640,10 @@ pub async fn copy_io(pio: &ProcessIO, stdio: &Stdio, exit_signal: Arc<ExitSignal
594640
stderr,
595641
exit_signal,
596642
Some(move || {
643+
debug!("stderr exit.....................");
597644
drop(stderr_r);
598645
}),
646+
stderr_recv,
599647
);
600648
}
601649
}
@@ -604,7 +652,71 @@ pub async fn copy_io(pio: &ProcessIO, stdio: &Stdio, exit_signal: Arc<ExitSignal
604652
Ok(())
605653
}
606654

607-
fn spawn_copy<R, W, F>(from: R, to: W, exit_signal: Arc<ExitSignal>, on_close: Option<F>)
655+
fn spawn_copy<R, W, F>(
656+
from: R,
657+
to: W,
658+
exit_signal: Arc<ExitSignal>,
659+
on_close: Option<F>,
660+
mut r: tokio::sync::mpsc::Receiver<i64>,
661+
) where
662+
R: AsyncRead + Send + Unpin + 'static,
663+
W: AsyncWrite + Send + Unpin + 'static,
664+
F: FnOnce() + Send + 'static,
665+
{
666+
let mut src = from;
667+
let mut dst = to;
668+
669+
tokio::spawn(async move {
670+
let mut buffer: Vec<u8> = vec![0u8; 1024];
671+
//Change to loop and use time out, to make sure the read_buf will not hangon forever
672+
loop {
673+
let mut if_cn = true;
674+
let result = tokio::time::timeout(tokio::time::Duration::from_secs(5), async {
675+
tokio::select! {
676+
_ = exit_signal.wait() =>{
677+
debug!("container exit");
678+
if_cn = false;
679+
}
680+
r = src.read_buf(&mut buffer) => {
681+
match r{
682+
//Read n=0 but read_buf not close means pipe close
683+
Ok(n) => {
684+
if n == 0{
685+
if_cn = false;
686+
}else{
687+
//Need sure the dist write complete?
688+
let d_w = dst.write_all(&buffer).await;
689+
if d_w.is_err(){
690+
if_cn = false;
691+
}
692+
buffer.clear();
693+
}
694+
},
695+
Err(_) => {
696+
debug!("read exit");
697+
if_cn = false;
698+
},
699+
}
700+
}
701+
c = r.recv() =>{
702+
debug!("fd error io exit!! recv {:?}",c);
703+
if_cn = false;
704+
}
705+
}
706+
});
707+
//Timeout will continue unitl recv the io close
708+
let _ = result.await;
709+
if !if_cn {
710+
break;
711+
}
712+
}
713+
if let Some(f) = on_close {
714+
f();
715+
}
716+
});
717+
}
718+
719+
fn spawn_copy_no_recvs<R, W, F>(from: R, to: W, exit_signal: Arc<ExitSignal>, on_close: Option<F>)
608720
where
609721
R: AsyncRead + Send + Unpin + 'static,
610722
W: AsyncWrite + Send + Unpin + 'static,
@@ -632,7 +744,7 @@ where
632744
async fn copy_io_or_console<P>(
633745
p: &mut ProcessTemplate<P>,
634746
socket: Option<ConsoleSocket>,
635-
pio: Option<ProcessIO>,
747+
pio: Option<Arc<ProcessIO>>,
636748
exit_signal: Arc<ExitSignal>,
637749
) -> Result<()> {
638750
if p.stdio.terminal {
@@ -670,6 +782,7 @@ impl Spawner for ShimExecutor {
670782
}
671783
};
672784
let pid = child.id().unwrap();
785+
673786
let (stdout, stderr, exit_code) = tokio::join!(
674787
read_std(child.stdout),
675788
read_std(child.stderr),

0 commit comments

Comments
 (0)