diff --git a/conev.h b/conev.h index b619375..88b091a 100644 --- a/conev.h +++ b/conev.h @@ -84,6 +84,7 @@ struct eval { struct desync_params *dp; uint64_t dp_mask; + int detect; bool mark; // bool restore_ttl; diff --git a/extend.c b/extend.c index e493d42..756aafa 100644 --- a/extend.c +++ b/extend.c @@ -73,7 +73,7 @@ static struct elem_i *cache_get(const union sockaddr_u *dst) return 0; } time_t t = time(0); - if (t > val->time + params.cache_ttl || val->dp == params.dp) { + if (t > val->time + params.cache_ttl) { LOG(LOG_S, "time=%jd, now=%jd, ignore\n", (intmax_t)val->time, (intmax_t)t); mem_delete(params.mempool, (char *)&key, len); return 0; @@ -114,25 +114,25 @@ static struct elem_i *cache_add( int connect_hook(struct poolhd *pool, struct eval *val, const union sockaddr_u *dst, evcb_t next) { - struct desync_params *dp = val->dp, *init_dp; - if (!dp) { + struct desync_params *dp = params.dp; + + if (!val->dp_mask) { struct elem_i *e = cache_get(dst); if (e) { val->dp_mask = e->dp_mask; - dp = e->dp; - } else - dp = params.dp; + val->detect = e->detect; + } } - init_dp = dp; - for (; ; dp = dp->next) { if (!dp) { return -1; } - if ((!dp->detect || dp == init_dp) + if (!(dp->bit & val->dp_mask) + && (!dp->detect || (val->detect & dp->detect)) && check_l34(dp, SOCK_STREAM, dst)) { break; } + val->dp_mask |= dp->bit; } val->dp = dp; @@ -160,12 +160,11 @@ int socket_mod(int fd) } -static int reconnect(struct poolhd *pool, struct eval *val, struct desync_params *dp) +static int reconnect(struct poolhd *pool, struct eval *val) { assert(val->flag == FLAG_CONN); struct eval *client = val->pair; - client->dp = dp; if (connect_hook(pool, client, &val->addr, &on_tunnel)) { return -1; @@ -274,9 +273,6 @@ static bool check_round(const int *nr, int r) static void swop_groups(struct desync_params *dpc, struct desync_params *dpn) { - if (dpc->fail_count <= dpn->fail_count) { - return; - } LOG(LOG_S, "swop: %d <-> %d\n", dpc->id, dpn->id); struct desync_params dpc_cp = *dpc; @@ -306,55 +302,65 @@ static void swop_groups(struct desync_params *dpc, struct desync_params *dpn) if (params.dp == dpc) params.dp = dpn; } - + static int on_trigger(int type, struct poolhd *pool, struct eval *val) { - struct desync_params *dp = val->pair->dp; - dp->fail_count++; - val->pair->dp_mask |= dp->bit; + struct eval *lav = val->pair; - struct buffer *pair_buff = val->pair->sq_buff; - bool can_reconn = ( - pair_buff && pair_buff->lock - && !val->recv_count - && (params.auto_level & AUTO_RECONN) + bool can_reconn = (lav->sq_buff + && (params.auto_level & AUTO_RECONN) ); if (!can_reconn && !(params.auto_level & AUTO_POST)) { return -1; } INIT_ADDR_STR((val->addr)); - for (dp = dp->next; dp; dp = dp->next) { - if (!dp->detect) { - break; - } - if (!(dp->detect & type)) { - continue; - } - if (params.auto_level & AUTO_SORT) { - if (dp->bit & val->pair->dp_mask) - continue; - else - swop_groups(val->pair->dp, dp); - } - LOG(LOG_S, "save: ip=%s, id=%d\n", ADDR_STR, dp->id); - - struct elem_i *e = cache_add(&val->addr, &val->pair->host, val->pair->host_len); - if (e) { - e->dp = dp; - e->dp_mask = val->pair->dp_mask; - } - if (can_reconn) { - return reconnect(pool, val, dp); - } + struct elem_i *cache = cache_add(&val->addr, &lav->host, lav->host_len); + if (!cache) { return -1; } - LOG(LOG_S, "unreach ip: %s\n", ADDR_STR); + lav->dp->fail_count++; + lav->dp_mask |= lav->dp->bit; + lav->detect = type; - struct elem_i *e = cache_add(&val->addr, &val->pair->host, val->pair->host_len); - if (e) { - e->dp = params.dp; - e->dp_mask = 0; + uint64_t uncheked = lav->dp_mask; + struct desync_params *dp = params.dp, *next = 0; + + for (; dp; dp = dp->next) { + if (!uncheked && !dp->detect) { + break; + } + if (!(dp->bit & lav->dp_mask) + && (!dp->detect || (dp->detect & type))) { + next = dp; + break; + } + uncheked &= ~dp->bit; + lav->dp_mask |= dp->bit; + } + if ((params.auto_level & AUTO_SORT) + && !(lav->dp->bit & (cache->dp_mask | uncheked))) + { + if (next && lav->dp->pri > next->pri) + swop_groups(lav->dp, next); + else + lav->dp->pri++; + } + if (!next) { + LOG(LOG_S, "unreach ip: %s\n", ADDR_STR); + cache->dp_mask = 0; + cache->detect = 0; + cache->dp = params.dp; + return -1; + } + LOG(LOG_S, "save: ip=%s, id=%d\n", ADDR_STR, next->id); + + cache->dp_mask |= lav->dp_mask; + cache->detect = lav->detect; + cache->dp = next; + + if (can_reconn) { + return reconnect(pool, val); } return -1; } @@ -389,14 +395,16 @@ static int on_fin(struct poolhd *pool, struct eval *val) static int on_response(struct poolhd *pool, struct eval *val, const char *resp, ssize_t sn) { - struct desync_params *dp = val->pair->dp->next; + struct desync_params *dp = params.dp; char *req = val->pair->sq_buff->data; ssize_t qn = val->pair->sq_buff->size; + val->pair->dp_mask |= val->pair->dp->bit; + for (; dp; dp = dp->next) { - if (!dp->detect) { - return -1; + if (dp->bit & val->pair->dp_mask) { + continue; } if ((dp->detect & DETECT_HTTP_LOCAT) && is_http_redirect(req, qn, resp, sn)) { @@ -409,7 +417,7 @@ static int on_response(struct poolhd *pool, struct eval *val, } } if (dp) { - return reconnect(pool, val, dp); + return on_trigger(dp->detect, pool, val); } return -1; } @@ -447,15 +455,17 @@ static int setup_conn(struct eval *client, const char *buffer, ssize_t n) if (params.cache_file) { save_hostname(client, buffer, n); } - struct desync_params *dp = client->dp, *init_dp = dp; + struct desync_params *dp = client->dp, *init_dp = client->dp; for (; dp; dp = dp->next) { - if ((!dp->detect || dp == init_dp) + if (!(dp->bit & client->dp_mask) + && (!dp->detect || (client->detect & dp->detect)) && (dp == init_dp || check_l34(dp, SOCK_STREAM, &client->pair->addr)) && check_proto_tcp(dp->proto, buffer, n) && (!dp->hosts || check_host(dp->hosts, buffer, n))) { break; } + client->dp_mask |= dp->bit; } if (!dp) { LOG(LOG_E, "drop connection\n"); diff --git a/main.c b/main.c index 4ec7a1b..6da9a38 100644 --- a/main.c +++ b/main.c @@ -1189,7 +1189,7 @@ int main(int argc, char **argv) return -1; } } - if ((params.auto_level & AUTO_SORT) && params.dp_n > 64) { + if ((size_t )params.dp_n > sizeof(dp->bit) * 8) { LOG(LOG_E, "too many groups!\n"); } if (params.baddr.sa.sa_family != AF_INET6) { @@ -1222,7 +1222,7 @@ int main(int argc, char **argv) int status = run(¶ms.laddr); for (dp = params.dp; dp; dp = dp->next) { - LOG(LOG_S, "group: %d, triggered: %d\n", dp->id, dp->fail_count); + LOG(LOG_S, "group: %d, triggered: %d, pri: %d\n", dp->id, dp->fail_count, dp->pri); } if (params.cache_file) { FILE *f; diff --git a/mpool.h b/mpool.h index d9bfd2b..50ddb75 100644 --- a/mpool.h +++ b/mpool.h @@ -47,6 +47,7 @@ struct elem_i { struct desync_params *dp; uint64_t dp_mask; + int detect; time_t time; }; diff --git a/params.h b/params.h index 7eaa119..55ae2ca 100644 --- a/params.h +++ b/params.h @@ -115,6 +115,7 @@ struct desync_params { int id; uint64_t bit; int fail_count; + int pri; struct desync_params *prev; struct desync_params *next;