--- loncom/loncnew 2003/04/18 03:10:36 1.3 +++ loncom/loncnew 2003/06/03 01:59:39 1.7 @@ -2,7 +2,7 @@ # The LearningOnline Network with CAPA # lonc maintains the connections to remote computers # -# $Id: loncnew,v 1.3 2003/04/18 03:10:36 albertel Exp $ +# $Id: loncnew,v 1.7 2003/06/03 01:59:39 foxr Exp $ # # Copyright Michigan State University Board of Trustees # @@ -34,13 +34,22 @@ # - Add timer dispatch. (done) # - Add ability to accept lonc UNIX domain sockets. (done) # - Add ability to create/negotiate lond connections (done). -# - Add general logic for dispatching requests and timeouts. -# - Add support for the lonc/lond requests. +# - Add general logic for dispatching requests and timeouts. (done). +# - Add support for the lonc/lond requests. (done). # - Add logging/status monitoring. # - Add Signal handling - HUP restarts. USR1 status report. -# - Add Configuration file I/O +# - Add Configuration file I/O (done). # - Add Pending request processing on startup. # - Add management/status request interface. +# - Add deferred request capability. +# + +# Change log: +# $Log: loncnew,v $ +# Revision 1.7 2003/06/03 01:59:39 foxr +# complete coding to support deferred transactions. +# +# use lib "/home/httpd/lib/perl/"; use lib "/home/foxr/newloncapa/types"; @@ -49,11 +58,13 @@ use POSIX qw(:signal_h); use IO::Socket; use IO::Socket::INET; use IO::Socket::UNIX; +use IO::Handle; use Socket; use Crypt::IDEA; use LONCAPA::Queue; use LONCAPA::Stack; use LONCAPA::LondConnection; +use LONCAPA::LondTransaction; use LONCAPA::Configuration; use LONCAPA::HashIterator; @@ -95,12 +106,18 @@ my $RemoteHost; # Name of host child i my $UnixSocketDir= "/home/httpd/sockets"; my $IdleConnections = Stack->new(); # Set of idle connections my %ActiveConnections; # Connections to the remote lond. -my %ActiveTransactions; # Transactions in flight. +my %ActiveTransactions; # LondTransactions in flight. my %ActiveClients; # Serial numbers of active clients by socket. my $WorkQueue = Queue->new(); # Queue of pending transactions. -my $ClientQueue = Queue->new(); # Queue of clients causing xactinos. +# my $ClientQueue = Queue->new(); # Queue of clients causing xactinos. my $ConnectionCount = 0; +my $IdleSeconds = 0; # Number of seconds idle. +# +# This disconnected socket makes posible a bit more regular +# code when processing delayed requests: +# +my $NullSocket = IO::Socket->new(); # @@ -155,19 +172,61 @@ sub SocketDump { =pod +=head2 ShowStatus + + Place some text as our pid status. + +=cut +sub ShowStatus { + my $status = shift; + $0 = "lonc: ".$status; +} + +=pod + =head2 Tick Invoked each timer tick. =cut + sub Tick { my $client; + ShowStatus(GetServerHost()." Connection count: ".$ConnectionCount); Debug(6, "Tick"); Debug(6, " Current connection count: ".$ConnectionCount); foreach $client (keys %ActiveClients) { Debug(7, " Have client: with id: ".$ActiveClients{$client}); } + # Is it time to prune connection count: + + + if($IdleConnections->Count() && + ($WorkQueue->Count() == 0)) { # Idle connections and nothing to do? + $IdleSeconds++; + if($IdleSeconds > $IdleTimeout) { # Prune a connection... + $Socket = $IdleConnections->pop(); + KillSocket($Socket); + } + } else { + $IdleSeconds = 0; # Reset idle count if not idle. + } + + # Do we have work in the queue, but no connections to service them? + # If so, try to make some new connections to get things going again. + # + + my $Requests = $WorkQueue->Count(); + if (($ConnectionCount == 0) && ($Requests > 0)) { + my $Connections = ($Requests <= $MaxConnectionCount) ? + $Requests : $MaxConnectionCount; + Debug(1,"Work but no connections, starting ".$Connections." of them"); + for ($i =0; $i < $Connections; $i++) { + MakeLondConnection(); + } + + } } =pod @@ -208,24 +267,20 @@ long enough, it will be shut down and re sub ServerToIdle { my $Socket = shift; # Get the socket. + delete($ActiveTransactions{$Socket}); # Server has no transaction &Debug(6, "Server to idle"); # If there's work to do, start the transaction: - $reqdata = $WorkQueue->dequeue(); - Debug(9, "Queue gave request data: ".$reqdata); + $reqdata = $WorkQueue->dequeue(); # This is a LondTransaction unless($reqdata eq undef) { - my $unixSocket = $ClientQueue->dequeue(); - &Debug(6, "Starting new work request"); - &Debug(7, "Request: ".$reqdata); - - &StartRequest($Socket, $unixSocket, $reqdata); + Debug(9, "Queue gave request data: ".$reqdata->getRequest()); + &StartRequest($Socket, $reqdata); } else { # There's no work waiting, so push the server to idle list. &Debug(8, "No new work requests, server connection going idle"); - delete($ActiveTransactions{$Socket}); $IdleConnections->push($Socket); } } @@ -260,51 +315,55 @@ sub ClientWritable { &Debug(6, "ClientWritable writing".$Data); &Debug(9, "Socket is: ".$Socket); - my $result = $Socket->send($Data, 0); - - # $result undefined: the write failed. - # otherwise $result is the number of bytes written. - # Remove that preceding string from the data. - # If the resulting data is empty, destroy the watcher - # and set up a read event handler to accept the next - # request. - - &Debug(9,"Send result is ".$result." Defined: ".defined($result)); - if(defined($result)) { - &Debug(9, "send result was defined"); - if($result == length($Data)) { # Entire string sent. - &Debug(9, "ClientWritable data all written"); - $Watcher->cancel(); - # - # Set up to read next request from socket: - - my $descr = sprintf("Connection to lonc client %d", - $ActiveClients{$Socket}); - Event->io(cb => \&ClientRequest, - poll => 'r', - desc => $descr, - data => "", - fd => $Socket); - - } else { # Partial string sent. - $Watcher->data(substr($Data, $result)); - } + if($Socket->connected) { + my $result = $Socket->send($Data, 0); - } else { # Error of some sort... - - # Some errnos are possible: - my $errno = $!; - if($errno == POSIX::EWOULDBLOCK || - $errno == POSIX::EAGAIN || - $errno == POSIX::EINTR) { - # No action taken? - } else { # Unanticipated errno. - &Debug(5,"ClientWritable error or peer shutdown".$RemoteHost); - $Watcher->cancel; # Stop the watcher. - $Socket->shutdown(2); # Kill connection - $Socket->close(); # Close the socket. - } + # $result undefined: the write failed. + # otherwise $result is the number of bytes written. + # Remove that preceding string from the data. + # If the resulting data is empty, destroy the watcher + # and set up a read event handler to accept the next + # request. + &Debug(9,"Send result is ".$result." Defined: ".defined($result)); + if(defined($result)) { + &Debug(9, "send result was defined"); + if($result == length($Data)) { # Entire string sent. + &Debug(9, "ClientWritable data all written"); + $Watcher->cancel(); + # + # Set up to read next request from socket: + + my $descr = sprintf("Connection to lonc client %d", + $ActiveClients{$Socket}); + Event->io(cb => \&ClientRequest, + poll => 'r', + desc => $descr, + data => "", + fd => $Socket); + + } else { # Partial string sent. + $Watcher->data(substr($Data, $result)); + } + + } else { # Error of some sort... + + # Some errnos are possible: + my $errno = $!; + if($errno == POSIX::EWOULDBLOCK || + $errno == POSIX::EAGAIN || + $errno == POSIX::EINTR) { + # No action taken? + } else { # Unanticipated errno. + &Debug(5,"ClientWritable error or peer shutdown".$RemoteHost); + $Watcher->cancel; # Stop the watcher. + $Socket->shutdown(2); # Kill connection + $Socket->close(); # Close the socket. + } + + } + } else { + $Watcher->cancel(); # A delayed request...just cancel. } } @@ -325,22 +384,50 @@ Parameters: Socket on which the lond transaction occured. This is a LondConnection. The data received is in the TransactionReply member. -=item Client +=item Transaction -Unix domain socket open on the ultimate client. +The transaction that is being completed. =cut sub CompleteTransaction { &Debug(6,"Complete transaction"); my $Socket = shift; - my $Client = shift; + my $Transaction = shift; + + if (!$Transaction->isDeferred()) { # Normal transaction + my $data = $Socket->GetReply(); # Data to send. + StartClientReply($Transaction, $data); + } else { # Delete deferred transaction file. + unlink $Transaction->getFile(); + } +} +=pod +=head1 StartClientReply + + Initiates a reply to a client where the reply data is a parameter. + +=head2 parameters: + +=item Transaction - my $data = $Socket->GetReply(); # Data to send. + The transaction for which we are responding to the client. + +=item data + + The data to send to apached client. + +=cut +sub StartClientReply { + my $Transaction = shift; + my $data = shift; + + my $Client = $Transaction->getClient(); &Debug(8," Reply was: ".$data); my $Serial = $ActiveClients{$Client}; my $desc = sprintf("Connection to lonc client %d", + $Serial); Event->io(fd => $Client, poll => "w", @@ -348,7 +435,99 @@ sub CompleteTransaction { cb => \&ClientWritable, data => $data); } +=pod +=head2 FailTransaction + + Finishes a transaction with failure because the associated lond socket + disconnected. There are two possibilities: + - The transaction is deferred: in which case we just quietly + delete the transaction since there is no client connection. + - The transaction is 'live' in which case we initiate the sending + of "con_lost" to the client. + +Deleting the transaction means killing it from the +%ActiveTransactions hash. + +Parameters: + +=item client + + The LondTransaction we are failing. + +=cut + +sub FailTransaction { + my $transaction = shift; + my $Lond = $transaction->getServer(); + if (!$client->isDeferred()) { # If the transaction is deferred we'll get to it. + my $client = $transcation->getClient(); + StartClientReply($client, "con_lost"); + } +# not needed, done elsewhere if active. +# delete $ActiveTransactions{$Lond}; + +} + +=pod +=head1 EmptyQueue + + Fails all items in the work queue with con_lost. + Note that each item in the work queue is a transaction. + +=cut +sub EmptyQueue { + while($WorkQueue->Count()) { + my $request = $Workqueue->dequeue(); # This is a transaction + FailTransaction($request); + } +} + +=pod +=head2 KillSocket + +Destroys a socket. This function can be called either when a socket +has died of 'natural' causes or because a socket needs to be pruned due to +idleness. If the socket has died naturally, if there are no longer any +live connections a new connection is created (in case there are transactions +in the queue). If the socket has been pruned, it is never re-created. + +Parameters: + +=item Socket + + The socket to kill off. + +=item Restart + +nonzero if we are allowed to create a new connection. + + +=cut +sub KillSocket { + my $Socket = shift; + + # If the socket came from the active connection set, + # delete its transaction... note that FailTransaction should + # already have been called!!! + # otherwise it came from the idle set. + # + + if(exists($ActiveTransactions{$Socket})) { + delete ($ActiveTransactions{$Socket}); + } + if(exists($ActiveConnections{$Socket})) { + delete($ActiveConnections{$Socket}); + } + $ConnectionCount--; + + # If the connection count has gone to zero and there is work in the + # work queue, the work all gets failed with con_lost. + # + if($ConnectionCount == 0) { + EmptyQueue; + } +} =pod @@ -421,7 +600,16 @@ sub LondReadable { SocketDump(6, $Socket); if($Socket->Readable() != 0) { - # bad return from socket read. + # bad return from socket read. Currently this means that + # The socket has become disconnected. We fail the transaction. + + if(exists($ActiveTransactions{$Socket})) { + Debug(3,"Lond connection lost failing transaction"); + FailTransaction($ActiveTransactions{$Socket}); + } + $Watcher->cancel(); + KillSocket($Socket); + return; } SocketDump(6,$Socket); @@ -461,7 +649,7 @@ sub LondReadable { } $Watcher->cancel(); ServerToIdle($Socket); # Next work unit or idle. - + } elsif ($State eq "SendingRequest") { # We need to be writable for this and probably don't belong # here inthe first place. @@ -546,7 +734,7 @@ sub LondWritable { my @data = $Watcher->data; Debug(6,"LondWritable State = ".$State." data has ".@data." elts.\n"); - my $Socket = $data[0]; # I know there's at least a socket. + my $Socket = $data; # I know there's at least a socket. # Figure out what to do depending on the state of the socket: @@ -557,11 +745,17 @@ sub LondWritable { SocketDump(6,$Socket); if ($State eq "Connected") { - # "init" is being sent... if ($Socket->Writable() != 0) { # The write resulted in an error. + # We'll treat this as if the socket got disconnected: + + $Watcher->cancel(); + KillSocket($Socket); + return; } + # "init" is being sent... + } elsif ($State eq "Initialized") { @@ -577,7 +771,10 @@ sub LondWritable { # we're waiting for the state to change if($Socket->Writable() != 0) { - # Write of the next chunk resulted in an error. + + $Watcher->cancel(); + KillSocket($Socket); + return; } } elsif ($State eq "ChallengeReplied") { @@ -595,8 +792,12 @@ sub LondWritable { if($Socket->Writable() != 0) { # Write resulted in an error. - } + $Watcher->cancel(); + KillSocket($Socket); + return; + + } } elsif ($State eq "ReceivingKey") { # Now we need to wait for the key # to come back from the peer: @@ -609,8 +810,15 @@ sub LondWritable { # peer... write the next chunk: if($Socket->Writable() != 0) { - # Write resulted in an error. + if(exists($ActiveTransactions{$Socket})) { + Debug(3, "Lond connection lost, failing transactions"); + FailTransaction($ActiveTransactions{$Socket}); + } + $Watcher->cancel(); + KillSocket($Socket); + return; + } } elsif ($State eq "ReceivingReply") { @@ -630,6 +838,27 @@ sub LondWritable { } } +=pod + +=cut +sub QueueDelayed { + my $path = "$perlvar{'lonSockDir'}/delayed"; + opendir(DIRHANDLE, $path); + @alldelayed = grep /\.$RemoteHost$/, readdir DIRHANDLE; + closedir(DIRHANDLE); + my $dfname; + my $reqfile + foreach $reqfile (sort @alldelayed) { + $reqfile = $path/$reqfile; + my $Handle = IO::File->new($reqfile); + my $cmd = <$Handle>; + chomp($cmd); + my $Transaction = LondTransaction->new($cmd); + $Transaction->SetDeferred($reqfile); + QueueTransaction($Transaction); + } + +} =pod @@ -651,31 +880,33 @@ sub MakeLondConnection { &GetServerPort()); if($Connection == undef) { # Needs to be more robust later. - die "Failed to make a connection!!".$!."\n"; + Debug(0,"Failed to make a connection with lond."); + } else { + # The connection needs to have writability + # monitored in order to send the init sequence + # that starts the whole authentication/key + # exchange underway. + # + my $Socket = $Connection->GetSocket(); + if($Socket == undef) { + die "did not get a socket from the connection"; + } else { + &Debug(9,"MakeLondConnection got socket: ".$Socket); + } - } - # The connection needs to have writability - # monitored in order to send the init sequence - # that starts the whole authentication/key - # exchange underway. - # - my $Socket = $Connection->GetSocket(); - if($Socket == undef) { - die "did not get a socket from the connection"; - } else { - &Debug(9,"MakeLondConnection got socket: ".$Socket); + + $event = Event->io(fd => $Socket, + poll => 'w', + cb => \&LondWritable, + data => \$Connection, + desc => 'Connection to lond server'); + $ActiveConnections{$Connection} = $event; + + $ConnectionCount++; + if($ConnectionCount == 1) { # First Connection: + QueueDelayed; + } } - - - $event = Event->io(fd => $Socket, - poll => 'w', - cb => \&LondWritable, - data => ($Connection, undef), - desc => 'Connection to lond server'); - $ActiveConnections{$Lond} = $event; - - $ConnectionCount++; - } @@ -706,16 +937,16 @@ The text of the request to send. sub StartRequest { my $Lond = shift; - my $Client = shift; - my $Request = shift; + my $Request = shift; # This is a LondTransaction. - Debug(6, "StartRequest: ".$Request); + Debug(6, "StartRequest: ".$Request->getRequest()); my $Socket = $Lond->GetSocket(); - $ActiveTransactions{$Lond} = $Client; # Socket to relay to client. + $Request->Activate($Lond); + $ActiveTransactions{$Lond} = $Request; - $Lond->InitiateTransaction($Request); + $Lond->InitiateTransaction($Request->getRequest()); $event = Event->io(fd => $Lond->GetSocket(), poll => "w", cb => \&LondWritable, @@ -747,15 +978,15 @@ data to send to the lond. =cut sub QueueTransaction { - my $requestSocket = shift; - my $requestData = shift; - Debug(6,"QueueTransaction: ".$requestData); + my $requestData = shift; # This is a LondTransaction. + my $cmd = $requestData->getRequest(); + + Debug(6,"QueueTransaction: ".$cmd); my $LondSocket = $IdleConnections->pop(); if(!defined $LondSocket) { # Need to queue request. Debug(8,"Must queue..."); - $ClientQueue->enqueue($requestSocket); $WorkQueue->enqueue($requestData); if($ConnectionCount < $MaxConnectionCount) { Debug(4,"Starting additional lond connection"); @@ -763,7 +994,7 @@ sub QueueTransaction { } } else { # Can start the request: Debug(8,"Can start..."); - StartRequest($LondSocket, $requestSocket, $requestData); + StartRequest($LondSocket, $requestData); } } @@ -803,7 +1034,9 @@ sub ClientRequest { $watcher->data($data); if($data =~ /(.*\n)/) { # Request entirely read. Debug(8, "Complete transaction received: ".$data); - QueueTransaction($socket, $data); + my $Transaction = new LondTransaction->new($data); + $Transaction->SetClient($socket); + QueueTransaction($Transaction); $watcher->cancel(); # Done looking for input data. } @@ -865,7 +1098,7 @@ Returns the host whose lond we talk with =cut -sub GetServerHost { # Stub - get this from config. +sub GetServerHost { return $RemoteHost; # Setup by the fork. } @@ -877,7 +1110,7 @@ Returns the lond port number. =cut -sub GetServerPort { # Stub - get this from config. +sub GetServerPort { return $perlvar{londPort}; } @@ -898,7 +1131,7 @@ sub SetupLoncListener { my $socket; my $SocketName = GetLoncSocketPath(); unlink($SocketName); - unless ($socket = IO::Socket::UNIX->new(Local => $SocketName, + unless ($socket =IO::Socket::UNIX->new(Local => $SocketName, Listen => 10, Type => SOCK_STREAM)) { die "Failed to create a lonc listner socket"; @@ -941,7 +1174,11 @@ sub ChildProcess { # Setup the initial server connection: &MakeLondConnection(); - + + if($ConnectionCount == 0) { + Debug(1,"Could not make initial connection..\n"); + Debug(1,"Will retry when there's work to do\n"); + } Debug(9,"Entering event loop"); my $ret = Event::loop(); # Start the main event loop. @@ -960,6 +1197,7 @@ sub CreateChild { if($pid) { # Parent $ChildHash{$pid} = $RemoteHost; } else { # child. + ShowStatus("Connected to ".$RemoteHost); ChildProcess; } @@ -974,6 +1212,34 @@ sub CreateChild { # Each exit gets logged and the child gets restarted. # +# +# Fork and start in new session so hang-up isn't going to +# happen without intent. +# + + + + +ShowStatus("Parent writing pid file:"); +$execdir = $perlvar{'lonDaemons'}; +open (PIDSAVE, ">$execdir/logs/lonc.pid"); +print PIDSAVE "$$\n"; +close(PIDSAVE); + +ShowStatus("Forming new session"); +my $childpid = fork; +if ($childpid != 0) { + sleep 4; # Give child a chacne to break to + exit 0; # a new sesion. +} + +if (POSIX::setsid() < 0) { + print "Could not create new session\n"; + exit -1; +} + +ShowStatus("Forking node servers"); + my $HostIterator = LondConnection::GetHostIterator; while (! $HostIterator->end()) { @@ -984,6 +1250,8 @@ while (! $HostIterator->end()) { # Maintain the population: +ShowStatus("Parent keeping the flock"); + while(1) { $deadchild = wait(); if(exists $ChildHash{$deadchild}) { # need to restart.