diff --git a/conev.h b/conev.h index 8e99c65..31c6fd1 100644 --- a/conev.h +++ b/conev.h @@ -77,6 +77,7 @@ struct eval { ssize_t round_sent; unsigned int round_count; int attempt; + unsigned int part_sent; bool cache; bool mark; // }; diff --git a/desync.c b/desync.c index 7bc4140..f2b32e2 100644 --- a/desync.c +++ b/desync.c @@ -521,7 +521,7 @@ static void tamp(char *buffer, size_t bfsize, ssize_t *n, ssize_t desync(struct poolhd *pool, - struct eval *val, struct buffer *buff, ssize_t *np) + struct eval *val, struct buffer *buff, ssize_t *np, bool *wait) { struct desync_params dp = params.dp[val->pair->attempt]; struct proto_info info = { 0 }; @@ -531,6 +531,7 @@ ssize_t desync(struct poolhd *pool, size_t bfsize = buff->size; ssize_t offset = buff->offset; ssize_t skip = val->pair->round_sent; + unsigned int part_skip = val->pair->part_sent; if (!skip && params.debug) { init_proto_info(buffer, *np, &info); @@ -551,25 +552,37 @@ ssize_t desync(struct poolhd *pool, long lp = offset; struct part part; int i = 0, r = 0; + unsigned int curr_part = 0; + bool need_wait = false; for (; r > 0 || i < dp.parts_n; r--) { if (r <= 0) { part = dp.parts[i]; r = part.r; i++; } + curr_part++; + long pos = gen_offset(part.pos, part.flag, buffer, n, lp, &info); pos += (long )part.s * (part.r - r); - if (skip && pos <= skip && !(part.flag & OFFSET_START)) { + if ((skip && pos <= skip) + && curr_part <= part_skip && !(part.flag & OFFSET_START)) { continue; } - if (offset && pos <= offset) { + if (offset && pos < offset) { continue; } if (pos < 0 || pos > n || pos < lp) { LOG(LOG_E, "split cancel: pos=%ld-%ld, n=%zd\n", lp, pos, n); break; } + + if (need_wait) { + set_timer(pool, val, params.await_int); + *wait = true; + return lp - offset; + } + ssize_t s = 0; if (sock_has_notsent(sfd)) { @@ -605,9 +618,11 @@ ssize_t desync(struct poolhd *pool, break; } LOG(LOG_S, "split: pos=%ld-%ld (%zd), m: %s\n", lp, pos, s, demode_str[part.m]); - + val->pair->part_sent = curr_part; + if (s == ERR_WAIT) { set_timer(pool, val, params.await_int); + *wait = true; return lp - offset; } if (s < 0) { @@ -615,12 +630,23 @@ ssize_t desync(struct poolhd *pool, return lp - offset; } return -1; - } + } else if (s != (pos - lp)) { LOG(LOG_E, "%zd != %ld\n", s, pos - lp); return lp + s - offset; } lp = pos; + + if (params.wait_send) { + if (lp < n) { + set_timer(pool, val, params.await_int); + *wait = true; + return lp - offset; + } + else { + need_wait = true; + } + } } // send all/rest if (lp < n) { diff --git a/desync.h b/desync.h index f6060ae..8917e83 100644 --- a/desync.h +++ b/desync.h @@ -12,7 +12,7 @@ #include #endif -ssize_t desync(struct poolhd *pool, struct eval *val, struct buffer *buff, ssize_t *n); +ssize_t desync(struct poolhd *pool, struct eval *val, struct buffer *buff, ssize_t *n, bool *wait); ssize_t desync_udp(int sfd, char *buffer, ssize_t n, const struct sockaddr *dst, int dp_c); diff --git a/extend.c b/extend.c index 24b18af..8f3c58d 100644 --- a/extend.c +++ b/extend.c @@ -173,6 +173,7 @@ static int reconnect(struct poolhd *pool, struct eval *val, int m) client->buff->offset = 0; client->round_sent = 0; + client->part_sent = 0; return 0; } @@ -404,7 +405,7 @@ static int cancel_setup(struct eval *remote) ssize_t tcp_send_hook(struct poolhd *pool, - struct eval *remote, struct buffer *buff, ssize_t *n) + struct eval *remote, struct buffer *buff, ssize_t *n, bool *wait) { ssize_t sn = -1; int skip = remote->flag != FLAG_CONN; @@ -423,7 +424,7 @@ ssize_t tcp_send_hook(struct poolhd *pool, } else { LOG(LOG_S, "desync TCP: group=%d, round=%d, fd=%d\n", m, r, remote->fd); - sn = desync(pool, remote, buff, n); + sn = desync(pool, remote, buff, n, wait); } } if (skip) { @@ -467,6 +468,7 @@ ssize_t tcp_recv_hook(struct poolhd *pool, if (val->round_sent == 0) { val->round_count++; val->pair->round_sent = 0; + val->pair->part_sent = 0; } if (val->flag == FLAG_CONN && !val->round_sent) { int *nr = params.dp[val->pair->attempt].rounds; diff --git a/extend.h b/extend.h index ed8a0e8..8e9fecb 100644 --- a/extend.h +++ b/extend.h @@ -11,7 +11,7 @@ int connect_hook(struct poolhd *pool, struct eval *val, const union sockaddr_u *dst, evcb_t next); ssize_t tcp_send_hook(struct poolhd *pool, - struct eval *remote, struct buffer *buff, ssize_t *n); + struct eval *remote, struct buffer *buff, ssize_t *n, bool *wait); ssize_t tcp_recv_hook(struct poolhd *pool, struct eval *val, struct buffer *buff); diff --git a/main.c b/main.c index 5d97f80..bdc357a 100644 --- a/main.c +++ b/main.c @@ -178,6 +178,7 @@ const struct option options[] = { {"tlsrec", 1, 0, 'r'}, {"udp-fake", 1, 0, 'a'}, {"def-ttl", 1, 0, 'g'}, + {"wait-send", 0, 0, 'Z'}, // {"await-int", 1, 0, 'W'}, // #ifdef __linux__ {"drop-sack", 0, 0, 'Y'}, @@ -1067,6 +1068,10 @@ int main(int argc, char **argv) dp->drop_sack = 1; break; + case 'Z': + params.wait_send = 1; + break; + case 'W': params.await_int = atoi(optarg); break; diff --git a/params.h b/params.h index 088f5b3..23fba1e 100644 --- a/params.h +++ b/params.h @@ -102,6 +102,7 @@ struct params { int dp_count; struct desync_params *dp; int await_int; + bool wait_send; int def_ttl; bool custom_ttl; diff --git a/proxy.c b/proxy.c index 68512ce..421f1c4 100644 --- a/proxy.c +++ b/proxy.c @@ -662,12 +662,13 @@ int on_tunnel(struct poolhd *pool, struct eval *val, int etype) } n = val->buff->lock - val->buff->offset; - ssize_t sn = tcp_send_hook(pool, pair, val->buff, &val->buff->lock); + bool wait = false; + ssize_t sn = tcp_send_hook(pool, pair, val->buff, &val->buff->lock, &wait); if (sn < 0) { uniperror("send"); return -1; } - if (sn < n) { + if (sn < n || wait) { val->buff->offset += sn; return 0; } @@ -694,19 +695,26 @@ int on_tunnel(struct poolhd *pool, struct eval *val, int etype) if (n < 0) { return -1; } - ssize_t sn = tcp_send_hook(pool, pair, buff, &n); + + bool wait = false; + ssize_t sn = tcp_send_hook(pool, pair, buff, &n, &wait); if (sn < 0) { uniperror("send"); return -1; } - if (sn < n) { - LOG(LOG_S, "send: %zd != %zd (fd=%d)\n", sn, n, pair->fd); + if (sn < n || wait) { + if (sn < n) { + LOG(LOG_S, "send: %zd != %zd (fd=%d)\n", sn, n, pair->fd); + } + else { + LOG(LOG_S, "send: %zd, but not done yet (fd=%d)\n", sn, pair->fd); + } buff->lock = n; buff->offset = sn; if (mod_etype(pool, val, 0) || - mod_etype(pool, pair, !pair->tv_ms ? POLLOUT : 0)) { + mod_etype(pool, pair, !wait ? POLLOUT : 0)) { uniperror("mod_etype"); return -1; }