/*-
 * See the file LICENSE for redistribution information.
 *
 * Copyright (c) 2005,2008 Oracle.  All rights reserved.
 *
 * $Id: repmgr_windows.c 63573 2008-05-23 21:43:21Z trent.nelson $
 */

#include "db_config.h"

#define __INCLUDE_NETWORKING 1
#include "db_int.h"

/* Convert time-out from microseconds to milliseconds, rounding up. */
#define DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t) (((t) + (US_PER_MS - 1)) / US_PER_MS)

typedef struct __ack_waiter {
        HANDLE event;
        const DB_LSN *lsnp;
        struct __ack_waiter *next_free;
} ACK_WAITER;

#define WAITER_SLOT_IN_USE(w) ((w)->lsnp != NULL)

/*
 * Array slots [0:next_avail-1] are initialized, and either in use or on the
 * free list.  Slots beyond that are virgin territory, whose memory contents
 * could be garbage.  In particular, note that slots [0:next_avail-1] have a
 * Win32 Event Object created for them, which must be freed when cleaning up
 * this data structure.
 *
 * "first_free" points to a list of not-in-use slots threaded through the first
 * section of the array.
 */
struct __ack_waiters_table {
        struct __ack_waiter *array;
        int size;
        int next_avail;
        struct __ack_waiter *first_free;
};

static int allocate_wait_slot __P((ENV *, ACK_WAITER **));
static void free_wait_slot __P((ENV *, ACK_WAITER *));
static int handle_completion __P((ENV *, REPMGR_CONNECTION *));
static int finish_connecting __P((ENV *, REPMGR_CONNECTION *,
    LPWSANETWORKEVENTS));

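/*
 * Start a repmgr background thread: hand the runnable's "run" function to
 * CreateThread and remember the resulting thread handle for later joining.
 */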
int
__repmgr_thread_start(env, runnable)
        ENV *env;
        REPMGR_RUNNABLE *runnable;
{
        HANDLE thread_id;

        runnable->finished = FALSE;

        thread_id = CreateThread(NULL, 0,
            (LPTHREAD_START_ROUTINE)runnable->run, env, 0, NULL);
        if (thread_id == NULL)
                return (GetLastError());
        runnable->thread_id = thread_id;
        return (0);
}

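/*
 * Block until the given repmgr thread has exited.
 */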
int
__repmgr_thread_join(thread)
        REPMGR_RUNNABLE *thread;
{
        if (WaitForSingleObject(thread->thread_id, INFINITE) == WAIT_OBJECT_0)
                return (0);
        return (GetLastError());
}

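/*
 * Put the socket into non-blocking mode.
 */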
int
__repmgr_set_nonblocking(s)
        SOCKET s;
{
        int ret;
        u_long arg;

        arg = 1;                /* any non-zero value */
        if ((ret = ioctlsocket(s, FIONBIO, &arg)) == SOCKET_ERROR)
                return (WSAGetLastError());
        return (0);
}

/*
 * Wake any send()-ing threads waiting for an acknowledgement.
 *
 * !!!
 * Caller must hold the repmgr->mutex, if this thread synchronization is to
 * work properly.
 */
int
__repmgr_wake_waiting_senders(env)
        ENV *env;
{
        ACK_WAITER *slot;
        DB_REP *db_rep;
        int i, ret;

        ret = 0;
        db_rep = env->rep_handle;
        for (i = 0; i < db_rep->waiters->next_avail; i++) {
                slot = &db_rep->waiters->array[i];
                if (!WAITER_SLOT_IN_USE(slot))
                        continue;
                if (__repmgr_is_permanent(env, slot->lsnp))
                        if (!SetEvent(slot->event) && ret == 0)
                                ret = GetLastError();
        }
        return (ret);
}

/*
 * !!!
 * Caller must hold mutex.
 */
int
__repmgr_await_ack(env, lsnp)
        ENV *env;
        const DB_LSN *lsnp;
{
        ACK_WAITER *me;
        DB_REP *db_rep;
        DWORD ret, timeout;

        db_rep = env->rep_handle;

        if ((ret = allocate_wait_slot(env, &me)) != 0)
                goto err;

        timeout = db_rep->ack_timeout > 0 ?
            DB_TIMEOUT_TO_WINDOWS_TIMEOUT(db_rep->ack_timeout) : INFINITE;
        me->lsnp = lsnp;
        if ((ret = SignalObjectAndWait(db_rep->mutex, me->event, timeout,
            FALSE)) == WAIT_FAILED) {
                ret = GetLastError();
        } else if (ret == WAIT_TIMEOUT)
                ret = DB_REP_UNAVAIL;
        else
                DB_ASSERT(env, ret == WAIT_OBJECT_0);

        LOCK_MUTEX(db_rep->mutex);
        free_wait_slot(env, me);

err:
        return (ret);
}

/*
 * !!!
 * Caller must hold the mutex.
 */
static int
allocate_wait_slot(env, resultp)
        ENV *env;
        ACK_WAITER **resultp;
{
        ACK_WAITER *w;
        ACK_WAITERS_TABLE *table;
        DB_REP *db_rep;
        int ret;

        db_rep = env->rep_handle;
        table = db_rep->waiters;
        if (table->first_free == NULL) {
                if (table->next_avail >= table->size) {
                        /*
                         * Grow the array.
                         */
                        table->size *= 2;
                        w = table->array;
                        if ((ret = __os_realloc(env, table->size * sizeof(*w),
                            &w)) != 0)
                                return (ret);
                        table->array = w;
                }
                /*
                 * We get here if, one way or another, we're good to go for
                 * using the next slot (for the first time).
                 */
                w = &table->array[table->next_avail++];
                if ((w->event = CreateEvent(NULL, FALSE, FALSE, NULL)) ==
                    NULL) {
                        /*
                         * Maintain the sanctity of our rule that
                         * [0:next_avail-1] contain valid Event Objects.
                         */
                        --table->next_avail;
                        return (GetLastError());
                }
        } else {
                w = table->first_free;
                table->first_free = w->next_free;
        }
        *resultp = w;
        return (0);
}

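/*
 * Return a wait slot to the free list.  Caller must hold the mutex.
 */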
static void
free_wait_slot(env, slot)
        ENV *env;
        ACK_WAITER *slot;
{
        DB_REP *db_rep;

        db_rep = env->rep_handle;

        slot->lsnp = NULL;      /* show it's not in use */
        slot->next_free = db_rep->waiters->first_free;
        db_rep->waiters->first_free = slot;
}

/* (See requirements described in repmgr_posix.c.) */
int
__repmgr_await_drain(env, conn, timeout)
        ENV *env;
        REPMGR_CONNECTION *conn;
        db_timeout_t timeout;
{
        DB_REP *db_rep;
        db_timespec deadline, delta, now;
        db_timeout_t t;
        DWORD duration, ret;
        int round_up;

        db_rep = env->rep_handle;

        __os_gettime(env, &deadline, 1);
        TIMESPEC_ADD_DB_TIMEOUT(&deadline, timeout);

        while (conn->out_queue_length >= OUT_QUEUE_LIMIT) {
                if (!ResetEvent(conn->drained))
                        return (GetLastError());

                /* How long until the deadline? */
                __os_gettime(env, &now, 1);
                if (timespeccmp(&now, &deadline, >=)) {
                        conn->state = CONN_CONGESTED;
                        return (0);
                }
                delta = deadline;
                timespecsub(&delta, &now);
                round_up = TRUE;
                DB_TIMESPEC_TO_TIMEOUT(t, &delta, round_up);
                duration = DB_TIMEOUT_TO_WINDOWS_TIMEOUT(t);

                ret = SignalObjectAndWait(db_rep->mutex,
                    conn->drained, duration, FALSE);
                LOCK_MUTEX(db_rep->mutex);
                if (ret == WAIT_FAILED)
                        return (GetLastError());
                else if (ret == WAIT_TIMEOUT) {
                        conn->state = CONN_CONGESTED;
                        return (0);
                } else
                        DB_ASSERT(env, ret == WAIT_OBJECT_0);

                if (db_rep->finished)
                        return (0);
                if (conn->state == CONN_DEFUNCT)
                        return (DB_REP_UNAVAIL);
        }
        return (0);
}

/*
 * Creates a manual reset event, which is usually our best choice when we may
 * have multiple threads waiting on a single event.
 */
int
__repmgr_alloc_cond(c)
        cond_var_t *c;
{
        HANDLE event;

        if ((event = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL)
                return (GetLastError());
        *c = event;
        return (0);
}

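/*
 * Release the Win32 event object underlying a condition variable.
 */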
int
__repmgr_free_cond(c)
        cond_var_t *c;
{
        if (CloseHandle(*c))
                return (0);
        return (GetLastError());
}

/*
 * Make resource allocation an all-or-nothing affair, outside of this and the
 * close_sync function.  db_rep->waiters should be non-NULL iff all of these
 * resources have been created.
 */
int
__repmgr_init_sync(env, db_rep)
        ENV *env;
        DB_REP *db_rep;
{
#define INITIAL_ALLOCATION 5    /* arbitrary size */
        ACK_WAITERS_TABLE *table;
        int ret;

        db_rep->signaler = db_rep->queue_nonempty = db_rep->check_election =
            db_rep->mutex = NULL;
        table = NULL;

        if ((db_rep->signaler = CreateEvent(NULL, /* security attr */
            FALSE,              /* (not) of the manual reset variety */
            FALSE,              /* (not) initially signaled */
            NULL)) == NULL)     /* name */
                goto geterr;

        if ((db_rep->queue_nonempty = CreateEvent(NULL, TRUE, FALSE, NULL))
            == NULL)
                goto geterr;

        if ((db_rep->check_election = CreateEvent(NULL, FALSE, FALSE, NULL))
            == NULL)
                goto geterr;

        if ((db_rep->mutex = CreateMutex(NULL, FALSE, NULL)) == NULL)
                goto geterr;

        if ((ret = __os_calloc(env, 1, sizeof(ACK_WAITERS_TABLE), &table))
            != 0)
                goto err;

        if ((ret = __os_calloc(env, INITIAL_ALLOCATION, sizeof(ACK_WAITER),
            &table->array)) != 0)
                goto err;

        table->size = INITIAL_ALLOCATION;
        table->first_free = NULL;
        table->next_avail = 0;

        /* There's a restaurant joke in there somewhere. */
        db_rep->waiters = table;
        return (0);

geterr:
        ret = GetLastError();
err:
        if (db_rep->check_election != NULL)
                CloseHandle(db_rep->check_election);
        if (db_rep->queue_nonempty != NULL)
                CloseHandle(db_rep->queue_nonempty);
        if (db_rep->signaler != NULL)
                CloseHandle(db_rep->signaler);
        if (db_rep->mutex != NULL)
                CloseHandle(db_rep->mutex);
        if (table != NULL)
                __os_free(env, table);
        db_rep->waiters = NULL;
        return (ret);
}

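/*
 * Tear down everything created by __repmgr_init_sync: the per-waiter event
 * objects, the waiters table, and the signaler, queue_nonempty, check_election
 * and mutex handles.  Report the first error encountered, but keep going.
 */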
int
__repmgr_close_sync(env)
        ENV *env;
{
        DB_REP *db_rep;
        int i, ret;

        db_rep = env->rep_handle;
        if (!(REPMGR_SYNC_INITED(db_rep)))
                return (0);

        ret = 0;
        for (i = 0; i < db_rep->waiters->next_avail; i++) {
                if (!CloseHandle(db_rep->waiters->array[i].event) && ret == 0)
                        ret = GetLastError();
        }
        __os_free(env, db_rep->waiters->array);
        __os_free(env, db_rep->waiters);

        if (!CloseHandle(db_rep->check_election) && ret == 0)
                ret = GetLastError();

        if (!CloseHandle(db_rep->queue_nonempty) && ret == 0)
                ret = GetLastError();

        if (!CloseHandle(db_rep->signaler) && ret == 0)
                ret = GetLastError();

        if (!CloseHandle(db_rep->mutex) && ret == 0)
                ret = GetLastError();

        db_rep->waiters = NULL;
        return (ret);
}

/*
 * Performs net-related resource initialization other than memory
 * initialization and allocation.  A valid db_rep->listen_fd acts as the
 * "all-or-nothing" sentinel signifying that these resources are allocated
 * (except that now the new wsa_inited flag may be used to indicate that
 * WSAStartup has already been called).
 */
int
__repmgr_net_init(env, db_rep)
        ENV *env;
        DB_REP *db_rep;
{
        int ret;

        /* Initialize the Windows sockets DLL. */
        if (!db_rep->wsa_inited && (ret = __repmgr_wsa_init(env)) != 0)
                goto err;

        if ((ret = __repmgr_listen(env)) == 0)
                return (0);

        if (WSACleanup() == SOCKET_ERROR) {
                ret = net_errno;
                __db_err(env, ret, "WSACleanup");
        }

err:    db_rep->listen_fd = INVALID_SOCKET;
        return (ret);
}

/*
 * __repmgr_wsa_init --
 *      Initialize the Windows sockets DLL.
 *
 * PUBLIC: int __repmgr_wsa_init __P((ENV *));
 */
int
__repmgr_wsa_init(env)
        ENV *env;
{
        DB_REP *db_rep;
        WSADATA wsaData;
        int ret;

        db_rep = env->rep_handle;

        if ((ret = WSAStartup(MAKEWORD(2, 2), &wsaData)) != 0) {
                __db_err(env, ret, "unable to initialize Windows networking");
                return (ret);
        }
        db_rep->wsa_inited = TRUE;

        return (0);
}

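/*
 * Acquire the repmgr mutex (a Win32 mutex object).
 */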
int
__repmgr_lock_mutex(mutex)
        mgr_mutex_t *mutex;
{
        if (WaitForSingleObject(*mutex, INFINITE) == WAIT_OBJECT_0)
                return (0);
        return (GetLastError());
}

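/*
 * Release the repmgr mutex.
 */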
int
__repmgr_unlock_mutex(mutex)
        mgr_mutex_t *mutex;
{
        if (ReleaseMutex(*mutex))
                return (0);
        return (GetLastError());
}

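/*
 * Signal a condition variable by setting its event object.
 */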
int
__repmgr_signal(v)
        cond_var_t *v;
{
        return (SetEvent(*v) ? 0 : GetLastError());
}

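/*
 * Wake the select loop by setting the "signaler" event, which is waited on
 * as events[0] in __repmgr_select_loop.
 */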
int
__repmgr_wake_main_thread(env)
        ENV *env;
{
        if (!SetEvent(env->rep_handle->signaler))
                return (GetLastError());
        return (0);
}

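/*
 * Gathering ("vectored") write: hand the whole iovec array to WSASend and
 * report how many bytes were transferred.
 */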
int
__repmgr_writev(fd, iovec, buf_count, byte_count_p)
        socket_t fd;
        db_iovec_t *iovec;
        int buf_count;
        size_t *byte_count_p;
{
        DWORD bytes;

        if (WSASend(fd, iovec,
            (DWORD)buf_count, &bytes, 0, NULL, NULL) == SOCKET_ERROR)
                return (net_errno);

        *byte_count_p = (size_t)bytes;
        return (0);
}

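/*
 * Scattering ("vectored") read: the WSARecv counterpart of __repmgr_writev.
 */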
int
__repmgr_readv(fd, iovec, buf_count, xfr_count_p)
        socket_t fd;
        db_iovec_t *iovec;
        int buf_count;
        size_t *xfr_count_p;
{
        DWORD bytes, flags;

        flags = 0;
        if (WSARecv(fd, iovec,
            (DWORD)buf_count, &bytes, &flags, NULL, NULL) == SOCKET_ERROR)
                return (net_errno);

        *xfr_count_p = (size_t)bytes;
        return (0);
}

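/*
 * Main event loop: wait, via WSAWaitForMultipleEvents, on the signaler event,
 * the listening socket's FD_ACCEPT event and one event per connection of
 * interest, then dispatch whichever wakeup occurred, honoring any repmgr
 * time-based events.
 */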
int
__repmgr_select_loop(env)
        ENV *env;
{
        DB_REP *db_rep;
        DWORD nevents, ret;
        DWORD select_timeout;
        REPMGR_CONNECTION *conn, *next;
        REPMGR_CONNECTION *connections[WSA_MAXIMUM_WAIT_EVENTS];
        WSAEVENT events[WSA_MAXIMUM_WAIT_EVENTS];
        db_timespec timeout;
        WSAEVENT listen_event;
        WSANETWORKEVENTS net_events;
        int flow_control, i;

        db_rep = env->rep_handle;

        if ((listen_event = WSACreateEvent()) == WSA_INVALID_EVENT) {
                __db_err(
                    env, net_errno, "can't create event for listen socket");
                return (net_errno);
        }
        if (WSAEventSelect(db_rep->listen_fd, listen_event, FD_ACCEPT) ==
            SOCKET_ERROR) {
                ret = net_errno;
                __db_err(env, ret, "can't enable event for listener");
                goto out;
        }

        LOCK_MUTEX(db_rep->mutex);
        if ((ret = __repmgr_first_try_connections(env)) != 0)
                goto unlock;
        flow_control = FALSE;
        for (;;) {
                /* Start with the two events that we always wait for. */
                events[0] = db_rep->signaler;
                events[1] = listen_event;
                nevents = 2;

                /*
                 * Add an event for each surviving socket that we're interested
                 * in.  (For now [until we implement flow control], that's all
                 * of them, in one form or another.)  Clean up defunct
                 * connections; note that this is the only place where elements
                 * get deleted from this list.
                 * Loop just like TAILQ_FOREACH, except that we need to be
                 * able to unlink a list entry.
                 */
                for (conn = TAILQ_FIRST(&db_rep->connections);
                    conn != NULL;
                    conn = next) {
                        next = TAILQ_NEXT(conn, entries);

                        if (conn->state == CONN_DEFUNCT) {
                                if ((ret = __repmgr_cleanup_connection(env,
                                    conn)) != 0)
                                        goto unlock;
                                continue;
                        }

                        /*
                         * Note that even if we're suffering flow control, we
                         * nevertheless still read if we haven't even yet gotten
                         * a handshake.  Why? (1) Handshakes are important; and
                         * (2) they don't hurt anything flow-control-wise.
                         */
                        if (conn->state == CONN_CONNECTING ||
                            !STAILQ_EMPTY(&conn->outbound_queue) ||
                            (!flow_control || !IS_VALID_EID(conn->eid))) {
                                events[nevents] = conn->event_object;
                                connections[nevents++] = conn;
                        }
                }

                if (__repmgr_compute_timeout(env, &timeout))
                        select_timeout =
                            (DWORD)(timeout.tv_sec * MS_PER_SEC +
                            timeout.tv_nsec / NS_PER_MS);
                else {
                        /* No time-based events to wake us up. */
                        select_timeout = WSA_INFINITE;
                }

                UNLOCK_MUTEX(db_rep->mutex);
                ret = WSAWaitForMultipleEvents(
                    nevents, events, FALSE, select_timeout, FALSE);
                if (db_rep->finished) {
                        ret = 0;
                        goto out;
                }
                LOCK_MUTEX(db_rep->mutex);

                /*
                 * !!!
                 * Note that `ret' remains set as the return code from
                 * WSAWaitForMultipleEvents, above.
                 */
                if (ret >= WSA_WAIT_EVENT_0 &&
                    ret < WSA_WAIT_EVENT_0 + nevents) {
                        switch (i = ret - WSA_WAIT_EVENT_0) {
                        case 0:
                                /* Another thread woke us. */
                                break;
                        case 1:
                                if ((ret = WSAEnumNetworkEvents(
                                    db_rep->listen_fd, listen_event,
                                    &net_events)) == SOCKET_ERROR) {
                                        ret = net_errno;
                                        goto unlock;
                                }
                                DB_ASSERT(env,
                                    net_events.lNetworkEvents & FD_ACCEPT);
                                if ((ret = net_events.iErrorCode[FD_ACCEPT_BIT])
                                    != 0)
                                        goto unlock;
                                if ((ret = __repmgr_accept(env)) != 0)
                                        goto unlock;
                                break;
                        default:
                                if (connections[i]->state != CONN_DEFUNCT &&
                                    (ret = handle_completion(env,
                                    connections[i])) != 0)
                                        goto unlock;
                                break;
                        }
                } else if (ret == WSA_WAIT_TIMEOUT) {
                        if ((ret = __repmgr_check_timeouts(env)) != 0)
                                goto unlock;
                } else if (ret == WSA_WAIT_FAILED) {
                        ret = net_errno;
                        goto unlock;
                }
        }

unlock:
        UNLOCK_MUTEX(db_rep->mutex);
out:
        if (!CloseHandle(listen_event) && ret == 0)
                ret = GetLastError();
        return (ret);
}

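/*
 * Process the network events reported for a connection's event object:
 * finish an in-progress connect, or handle close, write-ready and read-ready
 * notifications, busting the connection on error.
 */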
static int
handle_completion(env, conn)
        ENV *env;
        REPMGR_CONNECTION *conn;
{
        int ret;
        WSANETWORKEVENTS events;

        if ((ret = WSAEnumNetworkEvents(conn->fd, conn->event_object, &events))
            == SOCKET_ERROR) {
                __db_err(env, net_errno, "EnumNetworkEvents");
                STAT(env->rep_handle->region->mstat.st_connection_drop++);
                ret = DB_REP_UNAVAIL;
                goto err;
        }

        if (conn->state == CONN_CONNECTING) {
                if ((ret = finish_connecting(env, conn, &events)) != 0)
                        goto err;
        } else {        /* Check both writing and reading. */
                if (events.lNetworkEvents & FD_CLOSE) {
                        __db_err(env,
                            events.iErrorCode[FD_CLOSE_BIT],
                            "connection closed");
                        STAT(env->rep_handle->
                            region->mstat.st_connection_drop++);
                        ret = DB_REP_UNAVAIL;
                        goto err;
                }

                if (events.lNetworkEvents & FD_WRITE) {
                        if (events.iErrorCode[FD_WRITE_BIT] != 0) {
                                __db_err(env,
                                    events.iErrorCode[FD_WRITE_BIT],
                                    "error writing");
                                STAT(env->rep_handle->
                                    region->mstat.st_connection_drop++);
                                ret = DB_REP_UNAVAIL;
                                goto err;
                        } else if ((ret =
                            __repmgr_write_some(env, conn)) != 0)
                                goto err;
                }

                if (events.lNetworkEvents & FD_READ) {
                        if (events.iErrorCode[FD_READ_BIT] != 0) {
                                __db_err(env,
                                    events.iErrorCode[FD_READ_BIT],
                                    "error reading");
                                STAT(env->rep_handle->
                                    region->mstat.st_connection_drop++);
                                ret = DB_REP_UNAVAIL;
                                goto err;
                        } else if ((ret =
                            __repmgr_read_from_site(env, conn)) != 0)
                                goto err;
                }
        }

err:
        if (ret == DB_REP_UNAVAIL)
                ret = __repmgr_bust_connection(env, conn);
        return (ret);
}

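/*
 * Complete an asynchronous connect attempt once FD_CONNECT is reported.  On
 * success, switch the socket's event mask to FD_READ | FD_CLOSE and propose
 * the protocol version; on failure, fall back to the site's next address, if
 * it has one.
 */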
static int
finish_connecting(env, conn, events)
        ENV *env;
        REPMGR_CONNECTION *conn;
        LPWSANETWORKEVENTS events;
{
        DB_REP *db_rep;
        u_int eid;
        /* char reason[100]; */
        int ret/*, t_ret*/;
        /* DWORD_PTR values[1]; */

        if (!(events->lNetworkEvents & FD_CONNECT))
                return (0);

        conn->state = CONN_CONNECTED;

        if ((ret = events->iErrorCode[FD_CONNECT_BIT]) != 0) {
                /* t_ret = FormatMessage( */
                /* FORMAT_MESSAGE_IGNORE_INSERTS | */
                /* FORMAT_MESSAGE_FROM_SYSTEM | */
                /* FORMAT_MESSAGE_ARGUMENT_ARRAY, */
                /* NULL, ret, 0, (LPTSTR)reason, sizeof(reason), values); */
                /* __db_err(env/\*, ret*\/, "connecting: %s", */
                /* reason); */
                /* LocalFree(reason); */
                __db_err(env, ret, "connecting");
                goto err;
        }

        if (WSAEventSelect(conn->fd, conn->event_object, FD_READ | FD_CLOSE) ==
            SOCKET_ERROR) {
                ret = net_errno;
                __db_err(env, ret, "setting event bits for reading");
                return (ret);
        }

        return (__repmgr_propose_version(env, conn));

err:
        db_rep = env->rep_handle;
        eid = conn->eid;
        DB_ASSERT(env, IS_VALID_EID(eid));

        if (ADDR_LIST_NEXT(&SITE_FROM_EID(eid)->net_addr) == NULL) {
                STAT(db_rep->region->mstat.st_connect_fail++);
                return (DB_REP_UNAVAIL);
        }

        /*
         * Since we're immediately trying the next address in the list, simply
         * disable the failed connection, without the usual recovery.
         */
        DISABLE_CONNECTION(conn);

        ret = __repmgr_connect_site(env, eid);
        DB_ASSERT(env, ret != DB_REP_UNAVAIL);
        return (ret);
}